403Webshell
Server IP : 103.119.228.120  /  Your IP : 18.116.12.7
Web Server : Apache
System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64
User : nobody ( 99)
PHP Version : 5.6.40
Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime
MySQL : ON |  cURL : ON |  WGET : ON |  Perl : ON |  Python : ON |  Sudo : ON |  Pkexec : ON
Directory :  /usr/local/ssl/lib/mysqlsh/lib/python3.9/site-packages/oci/

Upload File :
current_dir [ Writeable] document_root [ Writeable]

 

Command :


[ Back ]     

Current File : /usr/local/ssl/lib/mysqlsh/lib/python3.9/site-packages/oci/base_client.py
# coding: utf-8
# Copyright (c) 2016, 2024, Oracle and/or its affiliates.  All rights reserved.
# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.

from __future__ import absolute_import
import json
import logging
import platform

import circuitbreaker
import pytz
import random
import os
import re
import string
import uuid
import copy
# This was added to address thread safety issues with datetime.strptime
# See https://bugs.python.org/issue7980.
import _strptime  # noqa: F401
from datetime import date, datetime, timezone
from timeit import default_timer as timer
from ._vendor import requests, six, urllib3, sseclient
from dateutil.parser import parse
from dateutil import tz

import functools
from six.moves.http_client import HTTPResponse

from . import constants, exceptions, regions, retry
from .auth import signers
from .config import get_config_value_or_default, validate_config
from .request import Request
from .response import Response
from .circuit_breaker import CircuitBreakerStrategy, NoCircuitBreakerStrategy
from circuitbreaker import CircuitBreaker, CircuitBreakerMonitor
from .version import __version__
from .util import NONE_SENTINEL, Sentinel, extract_service_endpoint
# Sentinel used in this module to mark a value that was not supplied at all
# (e.g. a header or collection parameter that is absent rather than set).
missing = Sentinel("Missing")
# Name of the environment variable whose value is appended to the user agent string.
APPEND_USER_AGENT_ENV_VAR_NAME = "OCI_SDK_APPEND_USER_AGENT"
OCI_REALM_SPECIFIC_SERVICE_ENDPOINT_TEMPLATE_ENABLED = "OCI_REALM_SPECIFIC_SERVICE_ENDPOINT_TEMPLATE_ENABLED"
# Read once at import time; later changes to the environment are not picked up.
APPEND_USER_AGENT = os.environ.get(APPEND_USER_AGENT_ENV_VAR_NAME)
# Sent as the client-info header and used as the prefix of the user agent.
USER_INFO = "Oracle-PythonSDK/{}".format(__version__)

# Patterns matching type strings of the form "dict(str, <value type>)" and "list[<item type>]";
# the single capture group is the inner type.
DICT_VALUE_TYPE_REGEX = re.compile(r'dict\(str, (.+?)\)$')  # noqa: W605
LIST_ITEM_TYPE_REGEX = re.compile(r'list\[(.+?)\]$')  # noqa: W605
# Appended to connection/request error messages raised by the client.
TROUBLESHOOT_URL = 'https://docs.oracle.com/en-us/iaas/Content/API/Concepts/sdk_troubleshooting.htm'

# Expect header is enabled by default
enable_expect_header = True
# The default is the (non-string) value True, so when the variable is unset the
# string check below is skipped and the feature stays enabled.
expect_header_env_var = os.environ.get('OCI_PYSDK_USING_EXPECT_HEADER', True)
# Only the literal value "false" (compared case-insensitively) disables the Expect header.
if isinstance(expect_header_env_var, six.string_types) and expect_header_env_var.lower() == "false":
    enable_expect_header = False


def merge_type_mappings(*dictionaries):
    """
    Combine any number of mappings into one new dict.

    Mappings are applied left to right, so when the same key appears more
    than once the value from the right-most mapping wins. None of the inputs
    is modified.

    :param dictionaries: Mappings to combine.
    :return: A new dict holding the union of all entries.
    """
    combined = {}
    for mapping in dictionaries:
        combined.update(mapping)
    return combined


def build_user_agent(extra=""):
    """
    Build the user agent string sent with every request.

    The result has the form ``<USER_INFO> (python <version>; <machine>-<system>) <extra>``
    with surrounding whitespace stripped. When APPEND_USER_AGENT is set, its
    value is appended after a single space.

    :param extra: (optional) Additional text for the user agent. A falsy value is treated as empty.
    :return: The user agent string.
    """
    suffix = extra or ""
    base_agent = "{} (python {}; {}-{}) {}".format(
        USER_INFO,
        platform.python_version(),
        platform.machine(),
        platform.system(),
        suffix
    ).strip()
    if not APPEND_USER_AGENT:
        return base_agent
    return base_agent + " {}".format(APPEND_USER_AGENT)


def utc_now():
    """
    Return the current UTC time formatted as a log-line prefix.

    The result is ``" <YYYY-MM-DD HH:MM:SS[.ffffff]>: "`` with no UTC offset
    suffix.

    :return: The prefix string, with a leading space and a trailing ``": "``.
    """
    # datetime.utcnow() is deprecated; take an aware "now" in UTC and drop the
    # tzinfo so the rendered text stays identical to the old naive value.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return " " + str(now) + ": "


def is_http_log_enabled(is_enabled):
    """
    Switch the http client's wire-level debug output on or off.

    This sets ``debuglevel`` on the ``HTTPConnection`` class itself, so the
    change is process-wide rather than per client.

    :param is_enabled: Truthy to set debuglevel to 1, falsy to set it to 0.
    """
    six.moves.http_client.HTTPConnection.debuglevel = 1 if is_enabled else 0


def _sanitize_headers_for_requests(headers):
    """
    Stringify numeric header values in place.

    Requests does not accept int or float header values, so they are converted
    with ``str``. Bools are converted too, because bool is a subclass of int.

    :param headers: Mapping of header name to value; modified in place.
    :return: The same ``headers`` object.
    """
    numeric_types = tuple(six.integer_types) + (float,)
    # Only values of existing keys are reassigned, so the mapping's size does
    # not change while it is being iterated.
    for name, value in six.iteritems(headers):
        if isinstance(value, numeric_types):
            headers[name] = str(value)
    return headers


STREAM_RESPONSE_TYPE = 'stream'
BYTES_RESPONSE_TYPE = 'bytes'
SSE_RESPONSE_HEADER_VALUE = 'text/event-stream'
APPLICATION_JSON_CONTENT_HEADER_VALUE = 'application/json'

# Default timeout value(second)
DEFAULT_CONNECTION_TIMEOUT = 10.0
DEFAULT_READ_TIMEOUT = 60.0

# The keys here correspond to the Swagger collection format values described here: https://swagger.io/docs/specification/2-0/describing-parameters/
# and the values represent delimiters we'll use between values of the collection when placing those values in the query string.
#
# Note that the 'multi' type has no delimiter since in the query string we'll want to repeat the same query string param key but
# with different values each time (e.g. myKey=val1&myKey=val2), whereas for the other types we will only pass in a single
# key=value in the query string, where the "value" is the members of the collection joined with the given delimiter.
VALID_COLLECTION_FORMAT_TYPES = {
    'multi': None,
    'csv': ',',
    'tsv': '\t',
    'ssv': ' ',
    'pipes': '|'
}


def _read_all_headers(fp):
    """
    Consume header lines from ``fp`` up to and including the blank line.

    Used to discard any headers that follow an interim ``100 Continue``
    status line.

    :param fp: Binary file-like object positioned just after a status line.
    """
    while True:
        current = fp.readline()
        # b'\r\n' is the blank line that ends the header section. b'' means
        # the stream reached EOF (e.g. the peer closed the connection);
        # without this check the loop would spin forever.
        if current == b'\r\n' or not current:
            return


def _to_bytes(input_buffer):
    """
    Join the chunks of ``input_buffer`` into one CRLF-separated byte string.

    Text chunks are encoded as UTF-8; all other chunks are used as-is.

    :param input_buffer: Iterable of text and/or bytes chunks.
    :return: The chunks joined with ``b"\\r\\n"``.
    """
    encoded_chunks = [
        piece.encode('utf-8') if isinstance(piece, six.text_type) else piece
        for piece in input_buffer
    ]
    return b"\r\n".join(encoded_chunks)


def _is_100_continue(line):
    """
    Tell whether ``line`` is an HTTP ``100`` status line.

    :param line: Raw status line as bytes, e.g. ``b"HTTP/1.1 100 Continue\\r\\n"``.
    :return: True when the line has at least three whitespace-separated
        fields, the first starts with ``HTTP/`` and the second is ``100``;
        False otherwise.
    """
    fields = line.split(None, 2)
    if len(fields) < 3:
        return False
    protocol, status_code = fields[0], fields[1]
    return protocol.startswith(b'HTTP/') and status_code == b'100'


class OCIHTTPResponse(HTTPResponse):
    """
    HTTPResponse that can be primed with a status line that was already read.

    The required ``status_tuple`` keyword argument is either None or a
    ``(version, status, reason)`` tuple. When a tuple is given, the first call
    to ``_read_status`` returns it instead of reading from the socket; every
    later call reads from the socket as usual.
    """

    def __init__(self, *args, **kwargs):
        # Remove our own keyword before delegating; the base class does not accept it.
        self._status_tuple = kwargs.pop('status_tuple')
        HTTPResponse.__init__(self, *args, **kwargs)

    def _read_status(self):
        primed = self._status_tuple
        if primed is None:
            return HTTPResponse._read_status(self)
        # Hand the primed status out exactly once.
        self._status_tuple = None
        return primed


class OCIConnection(urllib3.connection.VerifiedHTTPSConnection):
    """
    HTTPS connection with ``Expect: 100-continue`` support.

    When a request carries the header ``expect: 100-continue``, the request
    line and headers are sent first and the connection waits up to 3 seconds
    for the server to answer:

    * a ``100`` status line: the remaining interim headers are discarded and
      the body is sent;
    * any other well-formed status line: the body is not sent and the status
      line already read is handed to the response object;
    * nothing within 3 seconds: the body is sent anyway.

    Debug output uses ``print`` and is gated on ``self.debuglevel``.
    """

    def __init__(self, *args, **kwargs):
        super(OCIConnection, self).__init__(*args, **kwargs)
        # Kept so the stock response class can be restored for requests that
        # do not use the Expect header.
        self._original_response_cls = self.response_class
        # True once a final (non-100) status line has been read early; while
        # it is set, send() is a no-op.
        self._response_received = False
        self._using_expect_header = False
        self.logger = logging.getLogger("{}.{}".format(__name__, id(self)))

    def _send_request(self, method, url, body, headers, *args, **kwargs):
        """Record whether this request uses the Expect header, then delegate to the base class."""
        self._response_received = False
        if headers.get('expect', '') == '100-continue':
            if self.debuglevel > 0:
                print('Using Expect header...')
            self._using_expect_header = True
        else:
            if self.debuglevel > 0:
                print('Not using Expect header...')
            self._using_expect_header = False
            self.response_class = self._original_response_cls
        rval = super(OCIConnection, self)._send_request(
            method, url, body, headers, *args, **kwargs)
        # NOTE(review): this attribute is not read anywhere in this class; it is
        # kept in case code outside this file relies on it.
        self._expect_header_set = False
        return rval

    def _send_output(self, message_body=None, *args, **kwargs):
        """
        Send the buffered request line and headers, then the body.

        With the Expect header in use, the body is held back until the server
        answers or the 3 second wait expires.
        """
        # Two empty entries produce the blank line that ends the header section.
        self._buffer.extend((b"", b""))
        msg = _to_bytes(self._buffer)
        del self._buffer[:]

        if not self._using_expect_header:
            # Without the handshake a bytes body can go out together with the headers.
            if isinstance(message_body, bytes):
                msg += message_body
                message_body = None

        self.send(msg)

        if self._using_expect_header:
            if self.debuglevel > 0:
                print('Waiting 3 seconds for 100-continue response...')
            if urllib3.util.wait_for_read(self.sock, 3):
                self._handle_expect_response(message_body)
                return

        if message_body is not None:
            # This point is also reached for non-bytes bodies of requests that
            # never used the Expect header, so only mention the timeout when a
            # wait actually took place.
            if self.debuglevel > 0 and self._using_expect_header:
                print('Timeout waiting for 100-continue response, sending message body...')
            self.send(message_body)

    def _handle_expect_response(self, message_body):
        """
        Read the server's early status line and act on it.

        A line that is neither a 100 response nor a three-field HTTP status
        line results in no further action here: the body is not sent.
        """
        # Unbuffered, so nothing beyond the lines consumed here is read off the socket.
        fp = self.sock.makefile('rb', 0)
        try:
            line = fp.readline()
            parts = line.split(None, 2)
            if self.debuglevel > 0:
                print(line)
            if _is_100_continue(line):
                if self.debuglevel > 0:
                    print('Received 100-continue response, sending message body...')
                _read_all_headers(fp)
                self._send_message_body(message_body)
            elif len(parts) == 3 and parts[0].startswith(b'HTTP/'):
                if self.debuglevel > 0:
                    print('Received non-100-continue response, abort request...')
                # The status line has already been consumed from the socket, so
                # pass it to the response object instead of letting it re-read.
                status_tuple = (parts[0].decode('ascii'),
                                int(parts[1]), parts[2].decode('ascii'))
                response_class = functools.partial(
                    OCIHTTPResponse, status_tuple=status_tuple)
                self.response_class = response_class
                self._response_received = True
        finally:
            fp.close()

    def _send_message_body(self, message_body):
        """Send the body if there is one."""
        if message_body is not None:
            self.send(message_body)

    def send(self, str):
        """Send data unless a final response has already been received for this request."""
        if self._response_received:
            return
        return super(OCIConnection, self).send(str)


class OCIConnectionPool(urllib3.HTTPSConnectionPool):
    """ HTTPConnectionPool with 100 Continue support. """
    # The string above must be the first statement in the class body to be
    # picked up as the class docstring.
    ConnectionCls = OCIConnection


# Replace the HTTPS connection pool with OCIConnectionPool unless the env var `OCI_PYSDK_USING_EXPECT_HEADER` is set
# to "false" (compared case-insensitively)
if enable_expect_header:
    urllib3.poolmanager.pool_classes_by_scheme["https"] = OCIConnectionPool


class BaseClient(object):
    primitive_type_map = {
        'int': int,
        'float': float,
        'str': six.u,
        'bool': bool,
        'date': date,
        'datetime': datetime,
        "object": object
    }

    ALLOW_CONTROL_CHARACTERS = False

    def __init__(self, service, config, signer, type_mapping, **kwargs):
        """
        Set up endpoint, HTTP session, logging and (optionally) a circuit breaker.

        :param service: Service name, passed to ``regions.endpoint_for`` when the endpoint is derived from a region.
        :param config: Configuration mapping. This method reads the keys ``region``, ``endpoint``,
            ``additional_user_agent`` and ``log_requests``.
        :param signer: Request signer. Its ``region`` attribute, when present, is used if ``config`` has no region.
        :param type_mapping: Mapping of complex type names to types; merged with ``primitive_type_map``.
        :param kwargs: Optional settings read here: ``regional_client`` (default True), ``base_path``,
            ``service_endpoint_template``, ``service_endpoint_template_per_realm``, ``endpoint_service_name``,
            ``allow_control_chars``, ``client_level_realm_specific_endpoint_template_enabled``,
            ``service_endpoint``, ``timeout``, ``skip_deserialization`` and ``circuit_breaker_strategy``.
        :raises exceptions.MissingEndpointForNonRegionalServiceClientError: For a non-regional client
            created without ``service_endpoint``.
        :raises TypeError: When ``circuit_breaker_strategy`` is given but is not a CircuitBreakerStrategy.
        """
        # NOTE(review): presumably raises on an invalid config - confirm against validate_config.
        validate_config(config, signer=signer)
        self.signer = signer

        self.config = config
        # Default to true (is a regional client) if there is nothing explicitly set. Regional
        # clients allow us to call set_region and that'll also set the endpoint. For non-regional
        # clients we require an endpoint
        self.regional_client = kwargs.get('regional_client', True)

        # _base_path must be set before the first assignment to self.endpoint,
        # because the endpoint setter reads it.
        self._endpoint = None
        self._base_path = kwargs.get('base_path')
        self.service_endpoint_template = kwargs.get('service_endpoint_template')
        self.service_endpoint_template_per_realm = kwargs.get('service_endpoint_template_per_realm')
        self.endpoint_service_name = kwargs.get('endpoint_service_name')

        # By default self._allow_control_chars will be None. The user would need to explicitly set it to True or False
        self._allow_control_chars = kwargs.get('allow_control_chars')

        self._client_level_realm_specific_endpoint_template_enabled = kwargs.get('client_level_realm_specific_endpoint_template_enabled')  # default this to None as it should be an opt-in feature

        if self.regional_client:
            if kwargs.get('service_endpoint'):
                # An explicit endpoint takes precedence over anything derived from the region.
                self.endpoint = kwargs.get('service_endpoint')
            else:
                # Region lookup order: non-empty config['region'] first, then signer.region.
                region_to_use = None
                if 'region' in config and config['region']:
                    region_to_use = config.get('region')
                elif hasattr(signer, 'region'):
                    region_to_use = signer.region

                endpoint_template = self.handle_service_endpoint_template(region_to_use, self.service_endpoint_template, self.service_endpoint_template_per_realm)

                self.endpoint = regions.endpoint_for(
                    service,
                    service_endpoint_template=endpoint_template,
                    region=region_to_use,
                    endpoint=config.get('endpoint'),
                    endpoint_service_name=self.endpoint_service_name)
        else:
            if not kwargs.get('service_endpoint'):
                raise exceptions.MissingEndpointForNonRegionalServiceClientError('An endpoint must be provided for a non-regional service client')
            self.endpoint = kwargs.get('service_endpoint')

        self.service = service
        self.complex_type_mappings = type_mapping
        self.type_mappings = merge_type_mappings(self.primitive_type_map, type_mapping)
        self.session = requests.Session()

        # If the user doesn't specify timeout explicitly we would use default timeout.
        # An explicit timeout=None is kept as None rather than replaced by the default.
        self.timeout = kwargs.get('timeout') if 'timeout' in kwargs else (DEFAULT_CONNECTION_TIMEOUT, DEFAULT_READ_TIMEOUT)

        self.user_agent = build_user_agent(get_config_value_or_default(config, "additional_user_agent"))

        # One logger per client instance, named after this module and the instance id.
        self.logger = logging.getLogger("{}.{}".format(__name__, id(self)))
        self.logger.addHandler(logging.NullHandler())
        if get_config_value_or_default(config, "log_requests"):
            self.logger.disabled = False
            self.logger.setLevel(logging.DEBUG)
            # Sets debuglevel on the HTTPConnection class, so it affects every client in the process.
            is_http_log_enabled(True)
        else:
            self.logger.disabled = True
            is_http_log_enabled(False)

        self.skip_deserialization = kwargs.get('skip_deserialization')

        # Circuit Breaker at client level
        self.circuit_breaker_strategy = kwargs.get('circuit_breaker_strategy')
        self.circuit_breaker_name = None
        # Log if Circuit Breaker Strategy is not enabled or if using Default Circuit breaker Strategy
        if self.circuit_breaker_strategy is None or isinstance(self.circuit_breaker_strategy, NoCircuitBreakerStrategy):
            self.logger.debug('No circuit breaker strategy enabled!')
        else:
            # Enable Circuit breaker if a valid circuit breaker strategy is available
            if not isinstance(self.circuit_breaker_strategy, CircuitBreakerStrategy):
                raise TypeError('Invalid Circuit Breaker Strategy!')
            # A strategy without a name gets a random one, so it is not shared with other clients by name.
            self.circuit_breaker_name = str(uuid.uuid4()) if self.circuit_breaker_strategy.name is None else self.circuit_breaker_strategy.name
            # Re-use Circuit breaker if sharing a Circuit Breaker Strategy.
            circuit_breaker = CircuitBreakerMonitor.get(self.circuit_breaker_name)
            if circuit_breaker is None:
                circuit_breaker = CircuitBreaker(
                    failure_threshold=self.circuit_breaker_strategy.failure_threshold,
                    recovery_timeout=self.circuit_breaker_strategy.recovery_timeout,
                    expected_exception=self.circuit_breaker_strategy.expected_exception,
                    name=self.circuit_breaker_name
                )
            # Equivalent to decorating the request function with Circuit Breaker
            # (the wrapped callable is stored on the instance and shadows the class method).
            self.request = circuit_breaker(self.request)
        self.logger.debug('Endpoint: {}'.format(self._endpoint))

    def handle_service_endpoint_template(self, region_id, service_endpoint_template, service_endpoint_template_per_realm):
        """
        Choose the endpoint template to use for a region.

        The realm-specific template is returned only when realm-specific
        templates are allowed (per ``should_allow_template_per_realm``), the
        region is listed in ``regions.REGION_REALMS``, and a template exists
        for that region's realm. Otherwise the default template is returned.

        :param region_id: Region identifier; may be None.
        :param service_endpoint_template: Default endpoint template.
        :param service_endpoint_template_per_realm: (optional) Mapping of realm to endpoint template.
            None or an empty mapping means there are no realm-specific templates.
        :return: The selected endpoint template.
        """
        should_enable_realm_template = self.should_allow_template_per_realm()

        # The per-realm mapping comes from an optional constructor kwarg and is
        # None when not supplied; a membership test on None would raise TypeError.
        if should_enable_realm_template and service_endpoint_template_per_realm:
            if region_id in regions.REGION_REALMS:
                realm = regions.REGION_REALMS[region_id]
                if realm in service_endpoint_template_per_realm:
                    return service_endpoint_template_per_realm[realm]

        return service_endpoint_template

    @property
    def endpoint(self):
        return self._endpoint

    @endpoint.setter
    def endpoint(self, endpoint):
        if self._base_path == '/':
            # If it's just the root path then use the endpoint as-is
            self._endpoint = endpoint
        elif self._base_path and not (endpoint.endswith(self._base_path) or endpoint.endswith('{}/'.format(self._base_path))):
            # Account for formats like https://iaas.us-phoenix-1.oraclecloud.com/20160918 and
            # https://iaas.us-phoenix-1.oraclecloud.com/20160918/ as they should both be fine
            self._endpoint = '{}{}'.format(endpoint, self._base_path)
        else:
            self._endpoint = endpoint

    def get_endpoint(self):
        """
        Return the service endpoint extracted from the client's current endpoint.

        :return: Whatever ``extract_service_endpoint`` returns for the stored endpoint.
        """
        current_endpoint = self._endpoint
        return extract_service_endpoint(current_endpoint)

    def set_region(self, region):
        if self.regional_client:
            service_endpoint_template = self.handle_service_endpoint_template(region, self.service_endpoint_template, self.service_endpoint_template_per_realm)  # check what template to use
            self.endpoint = regions.endpoint_for(self.service, service_endpoint_template=service_endpoint_template, region=region, endpoint_service_name=self.endpoint_service_name)
        else:
            raise TypeError('Setting the region is not allowed for non-regional service clients. You must instead set the endpoint')

    @property
    def allow_control_chars(self):
        return self._allow_control_chars

    @allow_control_chars.setter
    def allow_control_chars(self, bool):
        self._allow_control_chars = bool

    @property
    def client_level_realm_specific_endpoint_template_enabled(self):
        """Client-level opt-in for realm-specific endpoint templates (None when never set)."""
        return self._client_level_realm_specific_endpoint_template_enabled

    @client_level_realm_specific_endpoint_template_enabled.setter
    def client_level_realm_specific_endpoint_template_enabled(self, value):
        # The parameter was previously named ``bool``, which shadowed the builtin.
        # Setters are invoked through assignment, so the name is not part of the public interface.
        self._client_level_realm_specific_endpoint_template_enabled = value

        # Recalculate the endpoint since service endpoint template per realm is toggled.
        # Region lookup order: non-empty config['region'] first, then signer.region.
        # NOTE(review): the endpoint is always rebuilt from region/config here, so an
        # endpoint assigned directly beforehand is replaced - confirm this is intended.
        region_to_use = None
        if 'region' in self.config and self.config['region']:
            region_to_use = self.config.get('region')
        elif hasattr(self.signer, 'region'):
            region_to_use = self.signer.region

        service_endpoint_template = self.handle_service_endpoint_template(region_to_use, self.service_endpoint_template, self.service_endpoint_template_per_realm)

        self.endpoint = regions.endpoint_for(
            self.service,
            service_endpoint_template=service_endpoint_template,
            region=region_to_use,
            endpoint=self.config.get('endpoint'),
            endpoint_service_name=self.endpoint_service_name)

    def is_instance_principal_or_resource_principal_signer(self):
        """
        Tell whether the client's signer is one of the instance principal or resource principal signer types.

        :return: True if ``self.signer`` is an instance of any of the signer classes listed below, else False.
        """
        principal_signer_types = (
            signers.InstancePrincipalsSecurityTokenSigner,
            signers.ResourcePrincipalsFederationSigner,
            signers.EphemeralResourcePrincipalSigner,
            signers.EphemeralResourcePrincipalV21Signer,
            signers.NestedResourcePrincipals,
            signers.OkeWorkloadIdentityResourcePrincipalSigner,
        )
        return isinstance(self.signer, principal_signer_types)

    def call_api(self, resource_path, method,
                 path_params=None,
                 query_params=None,
                 header_params=None,
                 body=None,
                 response_type=None,
                 enforce_content_headers=True,
                 allow_control_chars=None,
                 operation_name=None,
                 api_reference_link=None,
                 required_arguments=None):
        """
        Makes the HTTP request and return the deserialized data.

        :param resource_path: Path to the resource (e.g. /instance)
        :param method: HTTP method
        :param path_params: (optional) Path parameters in the url.
        :param query_params: (optional) Query parameters in the url.
        :param header_params: (optional) Request header params.
        :param body: (optional) Request body.
        :param response_type: (optional) Response data type.
        :param enforce_content_headers: (optional) Whether content headers should be added for
            PUT and POST requests when not present.  Defaults to True.
        :param allow_control_chars: (optional) Boolean that allows whether or not the response object can contain control chars
        :param operation_name: (optional) String that represents the operational name of the API call.
        :param api_reference_link: (optional) String that represents the link to the API reference page for this operation.
        :param required_arguments: (optional) Names of the operation's required arguments. Only these names are
            eligible to fill ``{serviceParam}`` placeholders in the endpoint. Defaults to no names.
        :return: A Response object, or throw in the case of an error.

        """
        # Use None as the default instead of a shared mutable list.
        if required_arguments is None:
            required_arguments = []

        if header_params:
            # Remove expect header if user has disabled it, or if the operation is not PUT, POST or PATCH
            if not enable_expect_header or method.lower() not in ["put", "post", "patch"]:
                # Header names are matched case-insensitively, but removal needs the caller's exact key.
                map_lowercase_header_params_keys_to_actual_keys = {k.lower(): k for k in header_params}
                if "expect" in map_lowercase_header_params_keys_to_actual_keys:
                    header_params.pop(map_lowercase_header_params_keys_to_actual_keys.get("expect"), None)

            header_params = self.sanitize_for_serialization(header_params)

        header_params = header_params or {}

        # All the headers have been prepared for serialization at this point
        header_params = _sanitize_headers_for_requests(header_params)

        header_params[constants.HEADER_CLIENT_INFO] = USER_INFO
        header_params[constants.HEADER_USER_AGENT] = self.user_agent

        # Generate a request id only when the caller did not supply one.
        if header_params.get(constants.HEADER_REQUEST_ID, missing) is missing:
            header_params[constants.HEADER_REQUEST_ID] = self.build_request_id()

        # This allows for testing with "fake" database resources.
        opc_host_serial = os.environ.get('OCI_DB_OPC_HOST_SERIAL')
        if opc_host_serial:
            header_params['opc-host-serial'] = opc_host_serial

        if path_params:
            path_params = self.sanitize_for_serialization(path_params)
            # Substitute each {name} placeholder in the resource path with its URL-quoted value.
            for k, v in path_params.items():
                replacement = six.moves.urllib.parse.quote(str(self.to_path_value(v)))
                resource_path = resource_path.\
                    replace('{' + k + '}', replacement)

        if query_params:
            query_params = self.process_query_params(query_params)

        # Only JSON bodies are serialized here; any other body is passed through unchanged.
        if body is not None and header_params.get('content-type') == 'application/json':
            body = self.sanitize_for_serialization(body)
            body = json.dumps(body)

        # Here is where we will change our endpoint with the path / query params if a {serviceParam} exists on the endpoint
        endpoint = self.handle_service_params_in_endpoint(path_params, query_params, required_arguments)
        url = endpoint + resource_path

        request = Request(
            method=method,
            url=url,
            query_params=query_params,
            header_params=header_params,
            body=body,
            response_type=response_type,
            enforce_content_headers=enforce_content_headers
        )

        if self.is_instance_principal_or_resource_principal_signer():
            # For principal-based signers a 401 triggers one security token
            # refresh and one retry; any other error, or a second failure, propagates.
            call_attempts = 0
            while call_attempts < 2:
                try:
                    return self.request(request, allow_control_chars, operation_name, api_reference_link)
                except exceptions.ServiceError as e:
                    call_attempts += 1
                    if e.status == 401 and call_attempts < 2:
                        self.signer.refresh_security_token()
                    else:
                        raise
        else:
            start = timer()
            response = self.request(request, allow_control_chars, operation_name, api_reference_link)
            end = timer()
            self.logger.debug('time elapsed for request: {}'.format(str(end - start)))
            return response

    def map_service_params_to_values(self, service_params_url, path_params, query_params, required_arguments):
        """
        Map each ``{serviceParam}`` placeholder of an endpoint to its replacement text.

        A placeholder is filled only when its name is listed in
        ``required_arguments`` and has a value in ``path_params`` (checked
        first) or ``query_params``. All other placeholders map to an empty
        string so they disappear from the url. A placeholder written as
        ``{name+Dot}`` keeps the ``+Dot`` suffix in its key, and its value gets
        a trailing ``.`` when one is found.

        :param service_params_url: The part of the endpoint holding the placeholders,
            e.g. ``"{namespaceName+Dot}"``.
        :param path_params: (optional) Mapping of path parameter names to values; may be None.
        :param query_params: (optional) Mapping of query parameter names to values; may be None.
        :param required_arguments: (optional) Names that may be used to fill placeholders; may be None.
        :return: Dict mapping placeholder text (without braces) to its replacement string.
        """
        # Callers pass None for parameters that were not supplied; membership
        # tests on None would raise TypeError.
        path_params = path_params or {}
        query_params = query_params or {}
        required_arguments = required_arguments or ()

        service_params_map = {}
        # Drop the dots and opening braces, then split on closing braces to get the bare placeholder names.
        service_params = set(filter(None, ("".join(service_params_url.replace(".", "").split("{"))).split("}")))
        for service_param in service_params:
            # Check if there is a +Dot, if so we need to append a dot after the value
            should_append_dot = "+Dot" in service_param
            if should_append_dot:
                service_param = service_param[:service_param.find("+Dot")]  # remove the +Dot in service param
            map_key = service_param + "+Dot" if should_append_dot else service_param

            if service_param in required_arguments and (service_param in path_params or service_param in query_params):
                value = path_params[service_param] if service_param in path_params else query_params[service_param]
                if should_append_dot:
                    value += "."
            else:
                # If service_param was not found, we do not want to include it as part of the url.
                value = ""
            service_params_map[map_key] = value

        return service_params_map

    def handle_service_params_in_endpoint(self, path_params, query_params, required_arguments):
        endpoint = self.endpoint
        start_idx = len("https://")
        if endpoint[start_idx] == "{":  # If the character after https:// is a "{", we have service params
            end_idx = endpoint.rfind("}")
            service_params_url = endpoint[start_idx:end_idx + 1]
            service_params_map = self.map_service_params_to_values(service_params_url, path_params, query_params, required_arguments)

            for service_param, value in service_params_map.items():
                service_param = "{" + service_param + "}"
                endpoint = endpoint.replace(service_param, value)

        return endpoint

    def generate_collection_format_param(self, param_value, collection_format_type):
        """
        Render a collection-valued parameter according to its collection format.

        :param param_value: The collection of values, or the ``missing`` sentinel.
        :param collection_format_type: One of the keys of VALID_COLLECTION_FORMAT_TYPES.
        :return: ``missing`` when ``param_value`` is ``missing``; ``param_value`` unchanged for
            the 'multi' format; otherwise the values joined with the format's delimiter.
        :raises ValueError: If ``collection_format_type`` is not a valid format type.
        """
        # An absent parameter stays absent, whatever the format type is.
        if param_value is missing:
            return missing

        format_is_known = collection_format_type in VALID_COLLECTION_FORMAT_TYPES
        if not format_is_known:
            raise ValueError('Invalid collection format type {}. Valid types are: {}'.format(collection_format_type, list(VALID_COLLECTION_FORMAT_TYPES.keys())))

        # 'multi' repeats the key once per value, so the collection is passed through as-is.
        if collection_format_type == 'multi':
            return param_value

        delimiter = VALID_COLLECTION_FORMAT_TYPES[collection_format_type]
        return delimiter.join(param_value)

    def process_query_params(self, query_params):
        """
        Turn query parameters into the flat mapping that is handed to the requests library.

        The parameters are first passed through ``sanitize_for_serialization``; each
        resulting value is then handled by kind:

        * bool: converted to the lowercase string 'true' / 'false', because Python
          would otherwise capitalize it in the query string.
        * list: kept as a list. The requests library natively supports lists as
          multivalued params
          (http://docs.python-requests.org/en/master/api/#requests.Session.params).
        * dict (for example tags): exploded into one entry per inner key, named
          ``<param>.<inner key>``. Inner values are stored unchanged, so

              "dictTags": { "tag1": ["val1", "val2", "val3"], "tag2": ["val1"] }
              "dictTagsExists": { "tag3": True, "tag4": True }

          becomes

              "dictTags.tag1": ["val1", "val2", "val3"], "dictTags.tag2": ["val1"]
              "dictTagsExists.tag3": True, "dictTagsExists.tag4": True

        * anything else ("simple" values such as "stuff": "things"): passed through
          ``to_path_value``.

        :param query_params: Mapping of query parameter names to values.
        :return: A new dict of processed query parameters.
        """
        sanitized_params = self.sanitize_for_serialization(query_params)

        flattened = {}
        for name, value in sanitized_params.items():
            if isinstance(value, bool):
                flattened[name] = 'true' if value else 'false'
            elif isinstance(value, list):
                # Multivalued parameter: assign the list to the query string key directly.
                flattened[name] = value
            elif isinstance(value, dict):
                for sub_key, sub_value in value.items():
                    flattened['{}.{}'.format(name, sub_key)] = sub_value
            else:
                flattened[name] = self.to_path_value(value)

        return flattened

    def request(self, request, allow_control_chars=None, operation_name=None, api_reference_link=None):
        """
        Sends a prepared request through the client's session and wraps the HTTP response in a Response object.

        :param request: The request to send. Its method, url, query_params, header_params, body,
            response_type and enforce_content_headers attributes are read here.
        :param allow_control_chars: (optional) Request level setting passed through to response deserialization.
        :param operation_name: (optional) Passed through to the service error raised for a non-2xx status.
        :param api_reference_link: (optional) Passed through to the service error raised for a non-2xx status.

        :return: A Response built from the status code, the response headers, the deserialized data and the request.

        :raises exceptions.ConnectTimeout: when the session raises requests.exceptions.ConnectTimeout.
        :raises exceptions.RequestException: when the session raises any other requests.exceptions.RequestException.

        For a status code outside 200-299 the error is raised through raise_transient_service_error or
        raise_service_error.
        """
        self.logger.info(utc_now() + "Request: %s %s" % (str(request.method), request.url))

        # Remember the circuit breaker state before the call so a state change can be reported afterwards.
        initial_circuit_breaker_state = None
        if self.circuit_breaker_name:
            initial_circuit_breaker_state = CircuitBreakerMonitor.get(self.circuit_breaker_name).state
            if initial_circuit_breaker_state != circuitbreaker.STATE_CLOSED:
                self.logger.debug("Circuit Breaker State is {}!".format(initial_circuit_breaker_state))

        # Switch to the signer's without_content_headers variant when the request does not enforce content headers.
        signer = self.signer
        if not request.enforce_content_headers:
            signer = signer.without_content_headers

        stream = False
        if request.response_type == STREAM_RESPONSE_TYPE:
            stream = True
        # If there is a possibility that the response could return a stream, then assume it is a stream by default
        # This will allow us to NOT consume the entire response in case of a streaming response
        if SSE_RESPONSE_HEADER_VALUE in request.header_params.get("accept", "empty"):
            stream = True

        try:
            start = timer()
            response = self.session.request(
                request.method,
                request.url,
                auth=signer,
                params=request.query_params,
                headers=request.header_params,
                data=request.body,
                stream=stream,
                timeout=self.timeout)
            end = timer()
            # NOTE(review): this subscript raises KeyError when the request id header is absent, and that
            # KeyError is not handled by the except clauses below - confirm callers always set the header.
            if request.header_params[constants.HEADER_REQUEST_ID]:
                self.logger.debug(utc_now() + 'time elapsed for request {}: {}'.format(request.header_params[constants.HEADER_REQUEST_ID], str(end - start)))
            if response and hasattr(response, 'elapsed'):
                self.logger.debug(utc_now() + "time elapsed in response: " + str(response.elapsed))
        except requests.exceptions.ConnectTimeout as e:
            # Append the endpoint and the troubleshooting link to the error args before re-raising as the SDK exception type.
            if not e.args:
                e.args = ('',)
            e.args = e.args + ("Request Endpoint: " + request.method + " " + request.url + " See {} for help troubleshooting this error, or contact support and provide this full error message.".format(TROUBLESHOOT_URL),)
            raise exceptions.ConnectTimeout(e)
        except requests.exceptions.RequestException as e:
            # Same enrichment as above for every other requests failure.
            if not e.args:
                e.args = ('',)
            e.args = e.args + ("Request Endpoint: " + request.method + " " + request.url + " See {} for help troubleshooting this error, or contact support and provide this full error message.".format(TROUBLESHOOT_URL),)
            raise exceptions.RequestException(e)

        response_type = request.response_type
        self.logger.debug(utc_now() + "Response status: %s" % str(response.status_code))

        # Raise Service Error or Transient Service Error
        if not 200 <= response.status_code <= 299:
            # Details handed to the raised error alongside the service code and message.
            target_service = self.service
            request_endpoint = request.method + " " + request.url
            client_version = USER_INFO
            timestamp = datetime.now(timezone.utc).isoformat()

            service_code, message, deserialized_data = self.get_deserialized_service_code_and_message(response, allow_control_chars)
            if response.status_code == 413 and service_code == 'RequestEntityTooLarge':
                self.logger.warning("Recieved a 413/RequestEntityTooLarge from {}, resetting session".format(target_service))
                _ = response.content  # Read the response content to enable closing the socket.
                response.close()
                # Replace the session with a shallow copy of itself and close the old one.
                new_session = copy.copy(self.session)
                self.session.close()
                self.session = new_session
            if isinstance(self.circuit_breaker_strategy, CircuitBreakerStrategy) and self.circuit_breaker_strategy.is_transient_error(response.status_code, service_code):
                # Report a circuit breaker state change that happened while this request was in flight.
                new_circuit_breaker_state = CircuitBreakerMonitor.get(self.circuit_breaker_name).state
                if initial_circuit_breaker_state != new_circuit_breaker_state:
                    self.logger.warning("Circuit Breaker state changed from {} to {}".format(initial_circuit_breaker_state, new_circuit_breaker_state))
                self.raise_transient_service_error(request, response, service_code, message, operation_name, api_reference_link, target_service, request_endpoint, client_version, timestamp, deserialized_data)
            else:
                self.raise_service_error(request, response, service_code, message, operation_name, api_reference_link, target_service, request_endpoint, client_version, timestamp, deserialized_data)

        # Both raise_* helpers above always raise, so only 2xx responses reach this point.
        if stream:
            if response.headers.get("content-type", "empty").lower() == SSE_RESPONSE_HEADER_VALUE:
                self.logger.warning("Received SSE response, returning an SSE client")
                # Return the SSE response as received
                deserialized_data = sseclient.SSEClient(response)
            elif response_type and response_type != STREAM_RESPONSE_TYPE and response.headers.get("content-type", "empty").lower() == APPLICATION_JSON_CONTENT_HEADER_VALUE:
                # If the response is non-streaming (in case of SSE), proceed with regular deserialization
                deserialized_data = self.deserialize_response_data(response.content, response_type, allow_control_chars)
            else:
                # Don't unpack a streaming response body
                deserialized_data = response
        elif response_type == BYTES_RESPONSE_TYPE:
            # Don't deserialize data responses.
            deserialized_data = response.content
        elif response_type:
            deserialized_data = self.deserialize_response_data(response.content, response_type, allow_control_chars)
        else:
            # No response type declared: the body is not read and no data is returned.
            deserialized_data = None

        resp = Response(response.status_code, response.headers, deserialized_data, request)
        self.logger.debug(utc_now() + "Response returned")
        return resp

    # Builds a random request ID (an uppercase, dash-free UUID4 string).
    def build_request_id(self):
        """
        Generates a new random request identifier.

        :return: A UUID4 rendered as 32 uppercase hexadecimal characters without dashes.
        """
        return uuid.uuid4().hex.upper()

    def add_opc_retry_token_if_needed(self, header_params, retry_token_length=30):
        """
        Adds an 'opc-retry-token' header unless one is already present.

        The token is made of ASCII letters and digits picked with random.SystemRandom.

        :param header_params: Mutable mapping of request headers, updated in place.
        :param retry_token_length: Number of characters in the generated token.
        """
        if 'opc-retry-token' in header_params:
            # A token supplied by the caller is left untouched.
            return

        alphabet = string.ascii_letters + string.digits
        picker = random.SystemRandom()
        token_chars = [picker.choice(alphabet) for _ in range(retry_token_length)]
        header_params['opc-retry-token'] = ''.join(token_chars)

    @staticmethod
    def add_opc_client_retries_header(header_params):
        if 'opc-client-retries' not in header_params:
            header_params['opc-client-retries'] = "true"

    def to_path_value(self, obj):
        """
        Renders a value as a string for inclusion in a URL path.

        A list (or list subclass) becomes a comma separated string with every element
        converted through str(), so lists of non-string values such as ints no longer raise
        a TypeError. Any other value is converted through str().

        No url-encoding (quoting) is performed by this method.

        :param obj: list or single value.

        :return string: the string form of the value.
        """
        if isinstance(obj, list):
            # str.join only accepts strings, so convert each element first.
            return ','.join(str(item) for item in obj)
        return str(obj)

    def sanitize_for_serialization(self, obj, declared_type=None, field_name=None):
        """
        Builds a JSON POST object, validating obj against declared_type when one is given.

        If obj is str, int, float, bool, None, return directly.
        If obj is NONE_SENTINEL, return None.
        If obj is list or tuple, return a list with each element sanitized.
        If obj is datetime.datetime
            convert to a UTC string in iso8601 format ending in 'Z' (a value without tzinfo is treated as UTC).
        If obj is datetime.date
            convert to string in iso8601 format.
        If obj is dict, return a new dict with each value sanitized.
        If obj is swagger model, return a dict of its sanitized non-None attributes keyed by the attribute_map names.

        :param obj: The data to serialize.
        :param declared_type: (optional) Swagger type string obj is expected to conform to, for example
            'str', 'list[...]', 'dict(...)' or a key of self.complex_type_mappings.
        :param field_name: (optional) Name of the field being serialized, used only in error messages.
        :return: The serialized form of data.
        :raises TypeError: when obj does not match declared_type, or when obj is of an unsupported
            type that has no swagger_types attribute.
        """
        # Values of these types are returned unchanged.
        types = (six.string_types, six.integer_types, float, bool, type(None))

        # Both 'int' and 'float' accept floats as well as integers.
        declared_swagger_type_to_acceptable_python_types = {
            'str': six.string_types,
            'bool': bool,
            'int': (float, six.integer_types),
            'float': (float, six.integer_types)
        }

        # if there is a declared type for this obj, then validate that obj is of that type. None types (either None or the NONE_SENTINEL) are not validated but
        # instead passed through
        if declared_type and not self.is_none_or_none_sentinel(obj):
            if declared_type.startswith('dict(') and not isinstance(obj, dict):
                self.raise_type_error_serializing_model(field_name, obj, declared_type)
            elif declared_type.startswith('list[') and not (isinstance(obj, list) or isinstance(obj, tuple)):
                self.raise_type_error_serializing_model(field_name, obj, declared_type)
            elif declared_type in self.complex_type_mappings:
                # if its supposed to be one of our models, it can either be an instance of that model OR a dict
                if not isinstance(obj, dict) and not isinstance(obj, self.complex_type_mappings[declared_type]):
                    self.raise_type_error_serializing_model(field_name, obj, declared_type)
            elif declared_type in declared_swagger_type_to_acceptable_python_types and not isinstance(obj, declared_swagger_type_to_acceptable_python_types[declared_type]):
                # if its a primitive with corresponding acceptable python types, validate that obj is an instance of one of those acceptable types
                self.raise_type_error_serializing_model(field_name, obj, declared_type)

        if isinstance(obj, types):
            return obj
        elif obj is NONE_SENTINEL:
            return None
        elif isinstance(obj, list) or isinstance(obj, tuple):
            # Each element is validated against the list's item type; '[*]' marks list elements in error messages.
            return [self.sanitize_for_serialization(
                sub_obj,
                self.extract_list_item_type_from_swagger_type(declared_type) if declared_type else None,
                field_name + '[*]' if field_name else None)
                for sub_obj in obj]
        elif isinstance(obj, datetime):
            # datetime is tested before date because every datetime is also a date.
            if not obj.tzinfo:
                obj = pytz.utc.localize(obj)
            return obj.astimezone(pytz.utc).isoformat().replace('+00:00', 'Z')
        elif isinstance(obj, date):
            return obj.isoformat()
        else:
            if isinstance(obj, dict):
                obj_dict = obj

                keys_to_types_and_field_name = None

                # if there is a declared type, then we can use that to validate the types of values in the dict
                if declared_type:
                    # dict_value_type is None when declared_type does not match the dict(...) pattern,
                    # in which case the values are sanitized without type validation.
                    dict_value_type = self.extract_dict_value_type_from_swagger_type(declared_type)
                    keys_to_types_and_field_name = {k: (dict_value_type, k) for k in obj_dict}
            else:
                # at this point we are assuming it is one of our models with swagger_types so explicitly throw if its not to give a better error
                if not hasattr(obj, 'swagger_types'):
                    raise TypeError('Not able to serialize data: {} of type: {} in field: {}'.format(str(obj), type(obj).__name__, field_name))

                # Attributes whose value is None are left out of the serialized dict.
                obj_dict = {obj.attribute_map[attr]: getattr(obj, attr)
                            for attr, _ in obj.swagger_types.items()
                            if getattr(obj, attr) is not None}

                # Maps each serialized key to (declared swagger type, python attribute name).
                keys_to_types_and_field_name = {obj.attribute_map[attr]: (swagger_type, attr) for attr, swagger_type in six.iteritems(obj.swagger_types)}

            sanitized_dict = {}
            for key, val in six.iteritems(obj_dict):
                value_declared_type = None
                inner_field_name = key
                if keys_to_types_and_field_name:
                    value_declared_type = keys_to_types_and_field_name[key][0]
                    inner_field_name = keys_to_types_and_field_name[key][1]

                # Prefix with the parent field name so error messages carry the full dotted path.
                inner_field_name = '{}.{}'.format(field_name, inner_field_name) if field_name else inner_field_name
                sanitized_dict[key] = self.sanitize_for_serialization(val, value_declared_type, inner_field_name)

            return sanitized_dict

    def is_none_or_none_sentinel(self, obj):
        """
        Tells whether obj is None or the NONE_SENTINEL marker.

        :param obj: The value to inspect.
        :return: True when obj is None or is NONE_SENTINEL, False otherwise.
        """
        if obj is None:
            return True
        return obj is NONE_SENTINEL

    def raise_type_error_serializing_model(self, field_name, obj, declared_type):
        """
        Raises a TypeError describing a mismatch between a value and its declared type.

        :param field_name: Name of the offending field.
        :param obj: The value that failed validation.
        :param declared_type: The type the value was expected to have.
        :raises TypeError: always.
        """
        template = 'Field {} with value {} was expected to be of type {} but was of type {}'
        rendered_value = str(obj)
        actual_type_name = type(obj).__name__
        raise TypeError(template.format(field_name, rendered_value, declared_type, actual_type_name))

    def extract_dict_value_type_from_swagger_type(self, swagger_type):
        """
        Extracts the value type from a dict swagger type string using DICT_VALUE_TYPE_REGEX.

        :param swagger_type: The swagger type string to search.
        :return: The first captured group of the match, or None when the pattern is not found.
        """
        found = DICT_VALUE_TYPE_REGEX.search(swagger_type)
        return found.group(1) if found else None

    def extract_list_item_type_from_swagger_type(self, swagger_type):
        """
        Extracts the item type from a list swagger type string using LIST_ITEM_TYPE_REGEX.

        :param swagger_type: The swagger type string to search.
        :return: The first captured group of the match, or None when the pattern is not found.
        """
        found = LIST_ITEM_TYPE_REGEX.search(swagger_type)
        return found.group(1) if found else None

    def raise_service_error(self, request, response, service_code, message, operation_name=None, api_reference_link=None, target_service=None, request_endpoint=None, client_version=None, timestamp=None, deserialized_data=None):
        """
        Raises exceptions.ServiceError for the given response.

        The status code and headers are taken from response; every other argument is
        forwarded to the exception unchanged.

        :raises exceptions.ServiceError: always.
        """
        error_details = dict(
            original_request=request,
            operation_name=operation_name,
            api_reference_link=api_reference_link,
            target_service=target_service,
            request_endpoint=request_endpoint,
            client_version=client_version,
            timestamp=timestamp,
            deserialized_data=deserialized_data)
        raise exceptions.ServiceError(response.status_code, service_code, response.headers, message, **error_details)

    def raise_transient_service_error(self, request, response, service_code, message, operation_name=None, api_reference_link=None, target_service=None, request_endpoint=None, client_version=None, timestamp=None, deserialized_data=None):
        """
        Raises exceptions.TransientServiceError for the given response.

        The status code and headers are taken from response; every other argument is
        forwarded to the exception unchanged.

        :raises exceptions.TransientServiceError: always.
        """
        error_details = dict(
            original_request=request,
            operation_name=operation_name,
            api_reference_link=api_reference_link,
            target_service=target_service,
            request_endpoint=request_endpoint,
            client_version=client_version,
            timestamp=timestamp,
            deserialized_data=deserialized_data)
        raise exceptions.TransientServiceError(response.status_code, service_code, response.headers, message, **error_details)

    def get_deserialized_service_code_and_message(self, response, allow_control_chars=None):
        """
        Pulls the service error code and message out of an error response body.

        :param response: The HTTP response whose content is deserialized as 'object'.
        :param allow_control_chars: (optional) Passed through to deserialize_response_data.
        :return: A (service_code, message, deserialized_data) tuple. When the body does not
            deserialize to a dict, service_code is None and message is the deserialized body itself.
        """
        payload = self.deserialize_response_data(response.content, 'object', allow_control_chars)

        if not isinstance(payload, dict):
            # The body could not be turned into a dict (for example it was not valid JSON).
            # It may still hold useful error information, so surface it as the message
            # rather than dropping it.
            return None, payload, payload

        return payload.get('code'), payload.get('message'), payload

    def deserialize_response_data(self, response_data, response_type, allow_control_chars=None):
        """
        Turns a raw response body into a deserialized object.

        :param response_data: The response body as bytes.
        :param response_type: class literal for
            deserialized object, or string of class name.
        :param allow_control_chars: boolean to allow control character in a response. Defaults to None

        :return: The deserialized object, or the decoded (and JSON-parsed where possible) body
            when self.skip_deserialization is set.
        """
        # The body arrives as bytes (response.content), so decode it before parsing.
        payload = response_data.decode('utf8')

        try:
            permit_control_chars = self.should_allow_control_chars(allow_control_chars)

            # json's strict mode rejects control characters, hence the negation.
            parsed = json.loads(payload, strict=not permit_control_chars)
            # Everything is parsed as JSON first. For a declared "str" response the parsed value
            # is only adopted when it is itself text (six.text_type); this strips the embedded
            # quotes of a JSON string (as in the get_namespace response). An API may declare a
            # string response whose content happens to be a valid JSON object; in that case the
            # decoded text is kept, because later steps expect a string.
            if response_type != "str" or type(parsed) == six.text_type:
                payload = parsed
        except ValueError:
            # Not valid JSON: carry on with the decoded text.
            pass

        if not self.skip_deserialization:
            started = timer()
            result = self.__deserialize(payload, response_type)
            finished = timer()
            self.logger.debug(utc_now() + 'python SDK time elapsed for deserializing: {}'.format(str(finished - started)))
            return result

        return payload

    def __deserialize(self, data, cls):
        """
        Deserialize a dict, list, or str into an object.

        Dispatches on the declared type name: 'list[...]' and 'dict(..., ...)' are handled
        recursively element by element, a name found in self.type_mappings is resolved to a
        class (following get_subtype when the class defines it), and any other name is passed
        with the data to __deserialize_primitive.

        :param data: dict, list or str
        :param cls: string of class name

        :return: object, or None when data is None.
        """
        if data is None:
            return None

        if cls.startswith('list['):
            # Pull the item type out of 'list[ItemType]' and deserialize every element with it.
            sub_kls = re.match(r'list\[(.*)\]', cls).group(1)  # noqa: W605
            return [self.__deserialize(sub_data, sub_kls)
                    for sub_data in data]

        if cls.startswith('dict('):
            # Only the value type (second group) of 'dict(KeyType, ValueType)' is used; keys are kept as is.
            sub_kls = re.match(r'dict\(([^,]*), (.*)\)', cls).group(2)  # noqa: W605
            return {k: self.__deserialize(v, sub_kls)
                    for k, v in data.items()}

        # Enums are not present in type mappings, and they are strings, so we need to call  __deserialize_primitive()
        if cls in self.type_mappings:
            cls = self.type_mappings[cls]
        else:
            return self.__deserialize_primitive(data, cls)

        if hasattr(cls, 'get_subtype'):
            # Use the discriminator value to get the correct subtype.
            cls = cls.get_subtype(data)  # get_subtype returns a str
            cls = self.type_mappings[cls]

        # NOTE(review): six.u is a function rather than a type; presumably it stands in for the
        # text type in type_mappings - confirm against the mapping definition.
        if cls in [int, float, six.u, bool]:
            return self.__deserialize_primitive(data, cls)
        elif cls == object:
            # Untyped payloads are returned exactly as parsed.
            return data
        elif cls == date:
            return self.__deserialize_date(data)
        elif cls == datetime:
            return self.__deserialize_datetime(data)
        else:
            return self.__deserialize_model(data, cls)

    def __deserialize_primitive(self, data, cls):
        """
        Deserializes string to primitive type.

        :param data: str.
        :param cls: class literal.

        :return: int, float, str, bool.
        """
        try:
            value = cls(data)
        except UnicodeEncodeError:
            value = six.u(data)
        except TypeError:
            value = data
        return value

    def __deserialize_date(self, string):
        """
        Parses a string into a date.

        :param string: str.
        :return: date, or the input string unchanged when parsing raises ImportError.
        :raises Exception: when the string cannot be parsed (parsing raised ValueError).
        """
        try:
            parsed = parse(string)
            return parsed.date()
        except ImportError:
            return string
        except ValueError:
            raise Exception("Failed to parse `{0}` into a date object".format(string))

    def __deserialize_datetime(self, string):
        """
        Parses a string into a datetime.

        The string should be in iso8601 datetime format. The fixed
        "%Y-%m-%dT%H:%M:%S.%fZ" layout is tried first and yields a UTC-aware datetime;
        any other layout is handed to the general purpose parser.

        :param string: str.
        :return: datetime, or the input string unchanged when parsing raises ImportError.
        :raises Exception: when neither parser accepts the string.
        """
        try:
            # A successful strptime means the value carried the trailing 'Z', so the
            # resulting naive datetime is UTC and gets tagged accordingly.
            return datetime.strptime(string, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=tz.tzutc())
        except ValueError:
            # Not in the fixed layout: fall back to the general parser.
            try:
                return parse(string)
            except ImportError:
                return string
            except ValueError:
                raise Exception("Failed to parse `{0}` into a datetime object".format(string))
        except ImportError:
            return string

    def __deserialize_model(self, data, cls):
        """
        Deserializes list or dict to model.

        :param data: dict, list.
        :param cls: class literal.
        :return: model object.
        """
        instance = cls()

        for attr, attr_type in instance.swagger_types.items():
            property = instance.attribute_map[attr]
            if property in data:
                value = data[property]
                setattr(instance, attr, self.__deserialize(value, attr_type))

        return instance

    @staticmethod
    def get_preferred_retry_strategy(operation_retry_strategy, client_retry_strategy):
        """
        Gets the preferred Retry Strategy for the client
        :param operation_retry_strategy: (optional) Operation level Retry Strategy
        :param client_retry_strategy: (optional) Client level Retry Strategy
        :return: Preferred Retry Strategy.
        """
        retry_strategy = None
        if operation_retry_strategy:
            retry_strategy = operation_retry_strategy
        elif client_retry_strategy:
            retry_strategy = client_retry_strategy
        elif retry.GLOBAL_RETRY_STRATEGY:
            retry_strategy = retry.GLOBAL_RETRY_STRATEGY
        return retry_strategy

    def should_allow_control_chars(self, allow_control_chars):
        request_configuration = allow_control_chars
        client_configuration = self._allow_control_chars
        global_configuration = BaseClient.ALLOW_CONTROL_CHARACTERS

        # Check at the request level
        if request_configuration is not None:
            return request_configuration

        # Check at the client level
        if client_configuration is not None:
            return client_configuration

        # Check at the global level
        if global_configuration is True:
            return True

        return False

    def should_allow_template_per_realm(self):
        """
        Returns a boolean of whether or not we should use realm specific endpoint templates

        Precedence in descending order: Client / Environment / Default (False)
        """
        client_setting = self.client_level_realm_specific_endpoint_template_enabled
        # Environment values are strings (or None when the variable is unset).
        env_setting = os.environ.get(OCI_REALM_SPECIFIC_SERVICE_ENDPOINT_TEMPLATE_ENABLED)

        # An explicit client level setting wins.
        if client_setting is not None:
            return client_setting

        # Without an environment setting either, the feature stays disabled.
        if env_setting is None:
            return False

        return env_setting.lower() == 'true'

Youez - 2016 - github.com/yon3zu
LinuXploit