# coding: utf-8
# Copyright (c) 2016, 2024, Oracle and/or its affiliates.  All rights reserved.
# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.

from . import auth_utils
from .security_token_container import SecurityTokenContainer
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.hazmat.primitives.hashes import SHA1

from oci._vendor import requests

import oci.retry
import oci.signer
import threading
import logging
import pprint


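class X509FederationClient(object):
    """
    Retrieves, caches and refreshes a security token from Auth Service using an X509 certificate.

    The token request is signed with the private key behind ``leaf_certificate_retriever`` and carries the
    public half of ``session_key_supplier``'s key pair; every request later made with the token has to be
    signed with the matching session private key. The last token is cached and only re-requested once it
    is (close to) expired, so callers should go through :meth:`get_security_token`.
    """

    REQUIRED_INIT_KWARGS = [
        'federation_endpoint',
        'tenancy_id',
        'session_key_supplier',
        'leaf_certificate_retriever'
    ]

    # (connect, read) timeouts in seconds for the token request. The request runs while the refresh
    # lock is held, so a hung federation endpoint must never block a caller indefinitely.
    REQUEST_TIMEOUT = (10, 60)

    def __init__(self, **kwargs):
        """
        A client which can be used to retrieve a token from Auth Service. It needs the following supplied to it:

            - The endpoint for Auth Service
            - Our tenancy OCID
            - A session key supplier so that we can send its public key as part of the token request. The private key
            in the session key supplier should be used to sign all requests made with the token
            - The certificate (via leaf_certificate_retriever) which will be used to sign the requests to Auth Service.

        Optionally, intermediate certificates (if present) can be supplied as part of the request to Auth Service.

        The client has knowledge of its last requested token and can re-request the token if it is expired (otherwise
        it will vend the last requested token if it is not expired).

        :param str federation_endpoint:
            The Auth Service endpoint from which to retrieve the token.

        :param str tenancy_id:
            The OCID of the tenancy whose resources will be interacted with by users of the token. For the default
            (instance principal) purpose this is also checked against the tenancy in the leaf certificate on every refresh.

        :param SessionKeySupplier session_key_supplier:
            A SessionKeySupplier that can vend a public and private key. The public key will be sent as part of the token
            request and the private key should be used to sign all requests made with the token vended by this client.

        :param CertificateRetriever leaf_certificate_retriever:
            The certificate which will be used to sign requests to Auth Service.

        :param list[CertificateRetriever] intermediate_certificate_retrievers: (optional)
            A list of retrievers which can be used to fetch intermediate certificates which can be sent as part of the Auth Service request. This is an optional parameter

        :param cert_bundle_verify: (optional)
            If we need a specific cert bundle in order to perform verification against the federation endpoint, this parameter is the path to that bundle. Alternatively,
            False can be passed to disable verification.

            Disabling verification means the identity of the federation endpoint is not checked at all: anyone able to
            intercept the connection can answer the (signed) token request with a token of their choosing, and this
            client has no way to detect that. Leave verification on outside of controlled test environments.
        :type cert_bundle_verify: str or Boolean

        :param obj retry_strategy: (optional)
            A retry strategy to apply to calls made by this client. This should be one of the strategies available in
            the :py:mod:`~oci.retry` module. A convenience :py:data:`~oci.retry.DEFAULT_RETRY_STRATEGY` is also available and
            will be used if no explicit retry strategy is specified.

            The specifics of the default retry strategy are described `here <https://docs.oracle.com/en-us/iaas/tools/python/latest/sdk_behaviors/retries.html>`__.

            To have this operation explicitly not perform any retries, pass an instance of :py:class:`~oci.retry.NoneRetryStrategy`.

        :param bool log_requests: (optional)
            If set to True, the request URL and the response status, URL, headers and reason are logged at DEBUG level
            when retrieving the token from the federation endpoint. The response body (which carries the token) is
            never logged.

        :raises TypeError: if any of the required keyword arguments is missing or falsy.
        """

        # A required argument that is present but empty (e.g. '') is treated the same as an absent one.
        missing_keys = [
            required for required in self.REQUIRED_INIT_KWARGS
            if not kwargs.get(required)
        ]
        if missing_keys:
            raise TypeError('The following required arguments were not provided: {}'.format(', '.join(missing_keys)))

        self.federation_endpoint = kwargs['federation_endpoint']
        self.tenancy_id = kwargs['tenancy_id']
        self.session_key_supplier = kwargs['session_key_supplier']
        self.leaf_certificate_retriever = kwargs['leaf_certificate_retriever']

        # The default (instance principal) purpose is None; any other purpose is forwarded verbatim to Auth Service.
        self.purpose = kwargs.get('purpose')

        # Request/response logging is strictly opt-in: the logger stays disabled unless log_requests is set.
        self.logger = logging.getLogger("{}.{}".format(__name__, id(self)))
        self.logger.addHandler(logging.NullHandler())
        if kwargs.get('log_requests'):
            self.logger.disabled = False
            self.logger.setLevel(logging.DEBUG)
        else:
            self.logger.disabled = True

        self.intermediate_certificate_retrievers = kwargs.get('intermediate_certificate_retrievers') or []

        # Handed straight to requests as ``verify``. None defers to the session/environment default, a str is a
        # CA bundle path, and False turns TLS verification off entirely (see the docstring caveat above).
        self.cert_bundle_verify = kwargs.get('cert_bundle_verify', None)
        self._refresh_lock = threading.Lock()

        self.retry_strategy = kwargs.get('retry_strategy') or oci.retry.DEFAULT_RETRY_STRATEGY

        self.requests_session = requests.Session()

    def refresh_security_token(self):
        """Unconditionally request a new token from Auth Service and return it (as a string)."""
        return self._refresh_security_token_inner()

    def get_security_token(self):
        """
        Return the cached token if it is still valid (with jitter), otherwise fetch and return a fresh one.

        The cached-token check deliberately runs outside the refresh lock; a stale read at worst causes one
        extra refresh, which is then serialized by the lock.
        """
        cached = getattr(self, 'security_token', None)
        if cached is not None and cached.valid_with_jitter():
            return cached.security_token

        return self._refresh_security_token_inner()

    def _refresh_security_token_inner(self):
        """
        Refresh the session key and all certificates, then request a new token, all under the refresh lock.

        :return: the new token string (``self.security_token`` holds the full container).
        :raises RuntimeError: for the default (instance principal) purpose, if the tenancy OCID in the refreshed
            leaf certificate no longer matches the tenancy this client was created with.
        """
        with self._refresh_lock:
            self.session_key_supplier.refresh()
            self.leaf_certificate_retriever.refresh()
            # For the default (instance principal) purpose the tenancy is pinned at construction: a certificate
            # that now claims a different tenancy must not be used to mint a token.
            if self.purpose is None:
                updated_tenancy_id = auth_utils.get_tenancy_id_from_certificate(
                    self.leaf_certificate_retriever.get_certificate_as_certificate())
                if updated_tenancy_id != self.tenancy_id:
                    raise RuntimeError('Unexpected update of tenancy OCID in the leaf certificate. Previous tenancy: {}, Updated: {}'.format(self.tenancy_id, updated_tenancy_id))

            for retriever in self.intermediate_certificate_retrievers:
                retriever.refresh()

            self.retry_strategy.make_retrying_call(self._get_security_token_from_auth_service)
            return self.security_token.security_token

    def _get_security_token_from_auth_service(self):
        """
        POST a signed token request to the federation endpoint and store the result in ``self.security_token``.

        Invoked through the retry strategy, so it may run several times for a single refresh.

        :raises RuntimeError: on a 2xx response that is not a JSON object or carries no ``token``.
        :raises oci.exceptions.ServiceError: on any non-2xx response; the retry strategy decides from the status code.
        """
        request_payload = self._build_token_request_payload()
        signer = self._build_request_signer()

        self.logger.debug("Requesting token from : %s " % (self.federation_endpoint))
        response = self.requests_session.post(
            self.federation_endpoint,
            json=request_payload,
            auth=signer,
            verify=self.cert_bundle_verify,
            timeout=self.REQUEST_TIMEOUT)
        # Only response metadata is logged; the body carries the token and is intentionally never logged.
        self.logger.debug("Receiving token response......\n{}\n".format(pprint.pformat(
            {"status_code": response.status_code, "url": response.url, "header": dict(response.headers.items()),
                "reason": response.reason}, indent=2)))

        token = self._parse_token_response(response)
        self.security_token = SecurityTokenContainer(self.session_key_supplier, token)

    def _build_token_request_payload(self):
        """Assemble the JSON body of the token request: leaf certificate, session public key, optional purpose and chain."""
        public_key_pem = self.session_key_supplier.get_key_pair()['public'].public_bytes(
            Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
        request_payload = {
            'certificate': auth_utils.sanitize_certificate_string(self.leaf_certificate_retriever.get_certificate_raw()),
            'publicKey': auth_utils.sanitize_certificate_string(public_key_pem)
        }
        # The default (instance principal) purpose is None and is simply omitted from the request.
        if self.purpose is not None:
            request_payload['purpose'] = self.purpose

        if self.intermediate_certificate_retrievers:
            request_payload['intermediateCertificates'] = [
                auth_utils.sanitize_certificate_string(retriever.get_certificate_raw())
                for retriever in self.intermediate_certificate_retrievers
            ]
        return request_payload

    def _build_request_signer(self):
        """
        Build the signer for the token request from the leaf certificate's private key.

        The key is identified by the tenancy OCID plus the certificate's SHA-1 fingerprint (upper-case hex,
        colon separated). SHA-1 only names the certificate here, in the key-id format AuthTokenRequestSigner
        produces; it provides no integrity guarantee and is not what secures the request.
        """
        certificate = self.leaf_certificate_retriever.get_certificate_as_certificate()
        fingerprint = ":".join("{:02X}".format(ch) for ch in bytearray(certificate.fingerprint(SHA1())))
        return AuthTokenRequestSigner(self.tenancy_id, fingerprint, self.leaf_certificate_retriever)

    def _parse_token_response(self, response):
        """
        Validate the Auth Service response and return the token string it carries.

        :param response: the requests response of the token request.
        :raises RuntimeError: if a 2xx response is not a JSON object or has no ``token`` key.
        :raises oci.exceptions.ServiceError: for every non-2xx response, whether or not its body is JSON.
        """
        try:
            parsed_response = response.json()
        except ValueError:
            parsed_response = None

        # A JSON body that is not an object (e.g. a bare list or string) is treated like an unparseable one; the
        # original ``.get`` on it would otherwise raise AttributeError instead of a ServiceError.
        if not isinstance(parsed_response, dict):
            error_text = 'Unable to parse response from auth service ({}): {}'.format(self.federation_endpoint, response.text)

            # If the response was a 2xx but unparseable, raise it straight away because it implies a potential service issue. If
            # we have a non-2xx but it is not parseable that is a more ambiguous scenario (e.g. could have been an issue with a
            # proxy or LB and those generally won't emit a JSON response) so throw it out a ServiceError so it can fall into
            # retry logic (depending on the error code)
            if response.ok:
                raise RuntimeError(error_text)
            raise oci.exceptions.ServiceError(
                response.status_code,
                response.reason,
                response.headers,
                error_text
            )

        if not response.ok:
            raise oci.exceptions.ServiceError(
                response.status_code,
                parsed_response.get('code'),
                response.headers,
                parsed_response.get('message')
            )

        if 'token' not in parsed_response:
            raise RuntimeError('Could not find token in response from auth service ({}): {}'.format(self.federation_endpoint, parsed_response))
        return parsed_response['token']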

class AuthTokenRequestSigner(oci.signer.AbstractBaseSigner):
    """
    Signs the token request X509FederationClient sends to Auth Service. Not intended for general use.

    The signing key is the private key vended by the certificate retriever and ``api_key`` is built as
    ``<tenancy_id>/fed-x509/<fingerprint>``; the ``date`` and ``(request-target)`` headers are always
    covered by the signature, and ``content-length``, ``content-type`` and ``x-content-sha256`` are
    covered on requests that carry a body.
    """

    def __init__(self, tenancy_id, fingerprint, private_key_certificate_retriever):
        self.private_key_certificate_retriever = private_key_certificate_retriever
        self.api_key = '{}/fed-x509/{}'.format(tenancy_id, fingerprint)

        always_signed = ["date", "(request-target)"]
        signed_with_body = ["content-length", "content-type", "x-content-sha256"]
        signing_key = self.private_key_certificate_retriever.get_private_key()
        self.create_signers(self.api_key, signing_key, always_signed, signed_with_body)
