403Webshell
Server IP : 103.119.228.120  /  Your IP : 3.133.152.26
Web Server : Apache
System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64
User : nobody ( 99)
PHP Version : 5.6.40
Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime
MySQL : ON |  cURL : ON |  WGET : ON |  Perl : ON |  Python : ON |  Sudo : ON |  Pkexec : ON
Directory :  /usr/local/ssl/lib/mysqlsh/lib/python3.9/site-packages/oci/auth/

Upload File :
current_dir [ Writeable] document_root [ Writeable]

 

Command :


[ Back ]     

Current File : /usr/local/ssl/lib/mysqlsh/lib/python3.9/site-packages/oci/auth/certificate_retriever.py
# coding: utf-8
# Copyright (c) 2016, 2024, Oracle and/or its affiliates.  All rights reserved.
# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from oci._vendor import requests
from oci._vendor.requests.exceptions import HTTPError
from oci.exceptions import ServiceError

import oci.retry
import os.path
from oci._vendor import six
import threading
import logging
import pprint

# A retry strategy for use when calling the metadata endpoint on an instance to retrieve certificates.
# This retry strategy will retry on 404, 429 and 5xx exceptions (the metadata endpoint does not throw errors like an
# OCI service) up to 3 times with ~30-31 seconds wait, for a max of 3 minutes
INSTANCE_METADATA_URL_CERTIFICATE_RETRIEVER_RETRY_STRATEGY = (
    oci.retry.RetryStrategyBuilder(service_error_check=True,
                                   service_error_retry_config={-1: [], 429: [], 404: []},
                                   service_error_retry_on_any_5xx=True,
                                   retry_max_wait_between_calls_seconds=31,
                                   retry_base_sleep_time_seconds=30,
                                   retry_exponential_growth_factor=1,
                                   total_elapsed_time_seconds=180
                                   )
    .add_max_attempts(3).add_total_elapsed_time().get_retry_strategy())


# An abstract class which defines the interface via which certificates (and their corresponding private key) can be retrieved.
# Implementors should define their own concrete fetching mechanism (e.g. a URL, from a file, from provided PEM-format strings)
# but they are expected to be able to vend information in various ways:
#
#   - A dictionary containing all certificate and private key information. This is defined as:
#       - certificate: the certificate PEM string
#       - private_key_pem: the private key PEM string
#       - private_key: the private key as a cryptography.io X509 certificate object
#   - The certificate as a cryptography.io X509 certificate object
#   - The certificate PEM string
#   - The private key PEM string
#   - The private key as a cryptography.io X509 certificate object
#
# Certificates are optionally refreshable via a refresh() method. Where the refresh may not make sense (e.g. if the PEM-format strings
# are provided directly), implementors should make refersh() a no-op rather than throwing an exception.
class AbstractCertificateRetriever(object):
    def __init__(self, **kwargs):
        self.certificate_and_private_key = {
            'certificate': None,
            'private_key_pem': None,
            'private_key': None
        }

    # Refreshes the certificates held by this object (e.g. because they have been rotated)
    def refresh(self):
        raise NotImplementedError('Subclasses should implement this')

    # Returns the certificate_and_private_key dictionary contained by this object
    def get_certificate_and_private_key(self):
        raise NotImplementedError('Subclasses should implement this')

    # Retrieves the certificate as a cryptography.x509.Certificate
    def get_certificate_as_certificate(self):
        raise NotImplementedError('Subclasses should implement this')

    # Retrieves a string containing the certificate contents
    def get_certificate_raw(self):
        raise NotImplementedError('Subclasses should implement this')

    # Retrievea a string containing the PEM-encoded private key
    def get_private_key_pem(self):
        raise NotImplementedError('Subclasses should implement this')

    # Retrieves the private key as a cryptography private key
    def get_private_key(self):
        raise NotImplementedError('Subclasses should implement this')


class UrlBasedCertificateRetriever(AbstractCertificateRetriever):
    """
    A certificate retriever which reads PEM-format strings from URLs.

    :param str certificate_url:
        The URL from which to retrieve a certificate. It is assumed that what we retrieve is the PEM-formatted string for the certificate.
        This is mandatory

    :param str private_key_url: (optional)
        The URL from which to retrieve the private key corresponding to certificate_url (if any). It is assumed that what we retrieve is the PEM-formatted string for
        the private key.

    :param str passphrase: (optional)
        The passphrase of the private key (if any).

    :param obj retry_strategy: (optional)
        A retry strategy to use when retrieving the certificate and private key from the URLs provided to this class. This should be one of the strategies available in
        the :py:mod:`~oci.retry` module. By default this retriever will not perform any retries.

    :param bool log_requests: (optional)
        log_request if set to True will log the request url and response data when retrieving
        the certificate & corresponding private key (if there is one defined for this retriever)

    **Note:** This class is used internally, it is not recommended for user's direct use.
    """

    READ_CHUNK_BYTES = 1024 * 1024  # A mebibyte

    def __init__(self, **kwargs):
        super(UrlBasedCertificateRetriever, self).__init__()

        if 'certificate_url' not in kwargs:
            raise TypeError('certificate_url must be supplied as a keyword argument')

        self.cert_url = kwargs['certificate_url']
        self.private_key_url = kwargs.get('private_key_url')
        self.passphrase = kwargs.get('passphrase')
        self.retry_strategy = kwargs.get('retry_strategy')

        self.logger = logging.getLogger("{}.{}".format(__name__, id(self)))
        self.logger.addHandler(logging.NullHandler())

        if kwargs.get('log_requests'):
            self.logger.disabled = False
            self.logger.setLevel(logging.DEBUG)
        else:
            self.logger.disabled = True

        if self.passphrase and isinstance(self.passphrase, six.text_type):
            self.passphrase = self.passphrase.encode('ascii')

        self._refresh_lock = threading.Lock()
        self.requests_session = requests.Session()
        if kwargs.get('headers', None):
            self.requests_session.headers.update(kwargs.get('headers'))

        self.refresh()

    def refresh(self):
        """
        Refresh the token by making a call to Identity for a new token.
        Returns the new token.
        """
        self._refresh_lock.acquire()
        try:
            if self.retry_strategy:
                self.retry_strategy.make_retrying_call(self._refresh_inner)
            else:
                self._refresh_inner()
        finally:
            self._refresh_lock.release()

    def get_certificate_and_private_key(self):
        """
        Returns the certificate_and_private_key dictionary contained by this object
        """
        self._refresh_lock.acquire()
        ret_val = self.certificate_and_private_key.copy()
        self._refresh_lock.release()

        return ret_val

    def get_certificate_as_certificate(self):
        """
        Retrieves the certificate as a cryptography.x509.Certificate
        """
        raw_cert = self.get_certificate_raw()
        if raw_cert:
            return x509.load_pem_x509_certificate(raw_cert, default_backend())
        else:
            return None

    def get_certificate_raw(self):
        """
        Retrieves a string containing the certificate contents
        """
        self._refresh_lock.acquire()
        certificate = self.certificate_and_private_key.copy()['certificate']
        self._refresh_lock.release()

        return certificate

    def get_private_key_pem(self):
        """
        Retrievea a string containing the PEM-encoded private key
        """
        self._refresh_lock.acquire()
        private_key = self.certificate_and_private_key.copy()['private_key_pem']
        self._refresh_lock.release()

        return private_key

    def get_private_key(self):
        """
        Retrieves the private key as a cryptography private key
        """
        self._refresh_lock.acquire()
        private_key = self.certificate_and_private_key.copy()['private_key']
        self._refresh_lock.release()

        return private_key

    def _refresh_inner(self):
        """
        Refreshes the certificate and its corresponding private key (if there is one defined for this retriever).
        This method represents the unit of retrying for the certificate retriever. It is intentionally coarse
        grained (e.g. if we retrieve the certificate but fail to retrieve the private key then we'll retry and
        retrieve both the certificate and private key again) to try and best maintain consistency in the data.

        For example, if we had separate retries for the certificate and the private key, in the scenario where
        a certificate was successfully retrieved but the private key failed, the private key we successfully
        retrieved upon retry may not relate to the certificate that we retrieved (e.g. because of rotation). This
        is still a risk in coarse grained retries, but hopefully a smaller one.
        """
        import oci.signer

        downloaded_certificate = six.BytesIO()
        self.logger.debug("Requesting certificate from : %s " % (self.cert_url))
        response = self.requests_session.get(self.cert_url, stream=True, timeout=(10, 60))
        self.logger.debug("Receiving certificate response......\n{}\n".format(pprint.pformat(
            {"status_code": response.status_code, "url": response.url, "header": dict(response.headers.items()),
                "reason": response.reason}, indent=2)))

        try:
            response.raise_for_status()
        except HTTPError as e:
            raise ServiceError(e.response.status_code, e.errno, e.response.headers, str(e))

        for chunk in response.raw.stream(self.READ_CHUNK_BYTES, decode_content=False):
            downloaded_certificate.write(chunk)

        self.certificate_and_private_key['certificate'] = downloaded_certificate.getvalue().strip()
        downloaded_certificate.close()
        if isinstance(self.certificate_and_private_key['certificate'], six.text_type):
            self.certificate_and_private_key['certificate'] = self.certificate_and_private_key['certificate'].encode('ascii')

        self._check_valid_certificate_string(self.certificate_and_private_key['certificate'])

        if self.private_key_url:
            downloaded_private_key_raw = six.BytesIO()
            self.logger.debug("Requesting private key from : %s " % (self.private_key_url))
            response = self.requests_session.get(self.private_key_url, stream=True, timeout=(10, 60))

            self.logger.debug("Receiving private key response......\n{}\n".format(pprint.pformat(
                {"status_code": response.status_code, "url": response.url, "header": dict(response.headers.items()),
                    "reason": response.reason}, indent=2)))

            try:
                response.raise_for_status()
            except HTTPError as e:
                raise ServiceError(e.response.status_code, e.errno, e.response.headers, str(e))

            for chunk in response.raw.stream(self.READ_CHUNK_BYTES, decode_content=False):
                downloaded_private_key_raw.write(chunk)

            self.certificate_and_private_key['private_key_pem'] = downloaded_private_key_raw.getvalue().strip()
            downloaded_private_key_raw.close()

            if isinstance(self.certificate_and_private_key['private_key_pem'], six.text_type):
                self.certificate_and_private_key['private_key_pem'] = self.certificate_and_private_key['private_key_pem'].encode('ascii')

            try:
                self.certificate_and_private_key['private_key'] = oci.signer.load_private_key(
                    self.certificate_and_private_key['private_key_pem'],
                    self.passphrase
                )
            except oci.exceptions.InvalidPrivateKey:
                raise InvalidCertificateFromInstanceMetadataError(
                    certificate_type='private_key',
                    certificate_raw=self.certificate_and_private_key['private_key_pem']
                )

    def _check_valid_certificate_string(self, certificate_string_to_check):
        """
        Determines whether a given string is a valid certificate. Valid in this context means that it
        can be parsed into a cryptography.io X509 certificate object. If the string is not valid then
        this method will throw an exception.

        :param str certificate_string_to_check:
            The certificate string to check. If it is valid then it should be a PEM-formatted string
            and able to be parsed into a cryptography.io X509 certificate object
        """
        try:
            x509.load_pem_x509_certificate(certificate_string_to_check, default_backend())
        except ValueError:
            raise InvalidCertificateFromInstanceMetadataError(
                certificate_type='certificate',
                certificate_raw=certificate_string_to_check
            )


class PEMStringCertificateRetriever(AbstractCertificateRetriever):
    """
    A certificate retriever which is provided PEM format strings directly. This retriever is non-refreshable, and calling refresh() is a no-op.

    :param str certificate_pem:
        The PEM-formatted string of the certificate. This is mandatory.

    :param str private_key_pem (optional):
        The PEM-formatted string of the private key corresponding to certificate_pem (if any).

    :param str passphrase (optional):
        The passphrase of the private key (if any).

    **Note:** This class is used internally, it is not recommended for user's direct use.
    """

    def __init__(self, **kwargs):
        import oci.signer

        super(PEMStringCertificateRetriever, self).__init__()

        if 'certificate_pem' not in kwargs:
            raise TypeError('certificate_pem must be supplied as a keyword argument')

        if isinstance(kwargs['certificate_pem'], six.text_type):
            self.certificate_and_private_key['certificate'] = kwargs['certificate_pem'].encode('ascii')
        else:
            self.certificate_and_private_key['certificate'] = kwargs['certificate_pem']

        if 'private_key_pem' in kwargs and kwargs['private_key_pem']:
            if isinstance(kwargs['private_key_pem'], six.text_type):
                self.certificate_and_private_key['private_key_pem'] = kwargs['private_key_pem'].encode('ascii')
            else:
                self.certificate_and_private_key['private_key_pem'] = kwargs['private_key_pem']

            if 'passphrase' in kwargs and kwargs['passphrase']:
                passphrase = kwargs['passphrase'].encode('ascii')
            else:
                passphrase = None

            self.certificate_and_private_key['private_key'] = oci.signer.load_private_key(
                self.certificate_and_private_key['private_key_pem'],
                passphrase
            )

    def refresh(self):
        """
        Since these are just the string, there is no refresh as such. A new object should be created
        if the strings change
        """
        pass

    def get_certificate_and_private_key(self):
        """
        Returns the certificate_and_private_key dictionary contained by this object
        """
        return self.certificate_and_private_key.copy()

    def get_certificate_as_certificate(self):
        """
        Retrieves the certificate as a cryptography.x509.Certificate
        """
        return x509.load_pem_x509_certificate(self.certificate_and_private_key['certificate'], default_backend())

    def get_certificate_raw(self):
        """
        Retrieves a string containing the certificate contents
        """
        return self.certificate_and_private_key['certificate']

    def get_private_key_pem(self):
        """
        Retrieve a a string containing the PEM-encoded private key
        """
        return self.certificate_and_private_key['private_key_pem']

    def get_private_key(self):
        """
        Retrieves the private key as a cryptography private key
        """
        return self.certificate_and_private_key['private_key']


class FileBasedCertificateRetriever(PEMStringCertificateRetriever):
    """
    A specialization of PEMStringCertificateRetriever which reads certificates from a file. This retriever is non-refreshable, and calling refresh() is a no-op.

    :param str certificate_file_path:
        The file path from which to retrieve a certificate. It is assumed that what we retrieve is the PEM-formatted string for the certificate.
        This is mandatory

    :param str private_key_pem_file_path (optional):
        The file path from which to retrieve the private key corresponding to certificate_file_path (if any). It is assumed that what we retrieve is the PEM-formatted string for
        the private key.

    :param str passphrase (optional):
        The passphrase of the private key (if any).

    **Note:** This class is used internally, it is not recommended for user's direct use.
    """

    def __init__(self, **kwargs):
        if 'certificate_file_path' not in kwargs:
            raise TypeError('certificate_file_path must be supplied as a keyword argument')

        if 'private_key_pem_file_path' in kwargs:
            private_key_pem = self._load_data_from_file(kwargs['private_key_pem_file_path'])
        else:
            private_key_pem = None

        parent_class_kwargs = {
            'certificate_pem': self._load_data_from_file(kwargs['certificate_file_path']),
            'private_key_pem': private_key_pem,
            'passphrase': kwargs.get('passphrase')
        }

        super(FileBasedCertificateRetriever, self).__init__(**parent_class_kwargs)

    def _load_data_from_file(self, filename):
        filename = os.path.expanduser(filename)
        with open(filename, mode="rb") as f:
            cert_data = f.read().strip()

        return cert_data


class InvalidCertificateFromInstanceMetadataError(Exception):
    def __init__(self, certificate_type='certificate', certificate_raw=None):
        self.certificate_raw = certificate_raw
        self.certificate_type = certificate_type
        super(InvalidCertificateFromInstanceMetadataError, self).__init__({
            'message': 'Invalid certificate returned from instance metadata. Expected a PEM-formatted string',
            'certificate_type': self.certificate_type,
            'certificate_raw': self.certificate_raw
        })

Youez - 2016 - github.com/yon3zu
LinuXploit