Server IP : 103.119.228.120 / Your IP : 52.14.209.100 Web Server : Apache System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64 User : nobody ( 99) PHP Version : 5.6.40 Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /lib/mysqlsh/lib/python3.9/site-packages/oci/ |
Upload File : |
# coding: utf-8
# Copyright (c) 2016, 2024, Oracle and/or its affiliates.  All rights reserved.
# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.

from __future__ import absolute_import
import base64
import datetime
import json
import re
import os
import os.path
import pytz
from oci._vendor import six
import oci.exceptions

try:
    from urllib.parse import urlparse
except ImportError:
    from urlparse import urlparse

from io import SEEK_SET
import logging

logger = logging.getLogger(__name__)

INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE_VALUE_NAME = 'instance_principal'
DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE = 'delegation_token_with_instance_principal'
RESOURCE_PRINCIPAL_AUTHENTICATION_TYPE = 'resource_principal'
DELEGATION_TOKEN_FILE_FIELD_NAME = 'delegation_token_file'
AUTHENTICATION_TYPE_FIELD_NAME = 'authentication_type'
MEBIBYTE = 1024 * 1024
DEFAULT_BUFFER_SIZE = 100 * MEBIBYTE
DEFAULT_PART_SIZE = 128 * MEBIBYTE

try:
    # PY3+
    import collections.abc as abc
except ImportError:
    # PY2
    import collections as abc

# Unique marker used by to_dict() to tell "attribute absent" apart from
# "attribute present with value None".
missing_attr = object()


def to_dict(obj):
    """Helper to flatten models into dicts for rendering.

    The following conversions are applied:

    * datetime.date, datetime.datetime, datetime.time
      are converted into ISO8601 UTC strings

    :param obj: The value to flatten. Strings are returned unchanged,
        ``NONE_SENTINEL`` becomes ``None``, mappings and other iterables are
        converted recursively, and objects exposing ``swagger_types`` are
        converted attribute by attribute.

    :return: A structure made of the converted values (dicts, lists and the
        leaf values themselves).
    """
    # Shortcut strings so they don't count as Iterables
    if isinstance(obj, six.string_types):
        return obj
    elif obj is NONE_SENTINEL:
        return None
    elif isinstance(obj, (datetime.datetime, datetime.time)):
        # always use UTC
        if not obj.tzinfo:
            obj = pytz.utc.localize(obj)
        if isinstance(obj, datetime.datetime):
            # only datetime.datetime takes a separator
            return obj.isoformat(sep="T")
        return obj.isoformat()
    elif isinstance(obj, datetime.date):
        # datetime.date doesn't have a timezone
        return obj.isoformat()
    elif isinstance(obj, abc.Mapping):
        return {k: to_dict(v) for k, v in six.iteritems(obj)}
    elif isinstance(obj, abc.Iterable):
        return [to_dict(v) for v in obj]
    # Not a string, datetime, dict, list, or model - return directly
    elif not hasattr(obj, "swagger_types"):
        return obj

    # Collect attrs from obj according to swagger_types into a dict
    as_dict = {}
    for key in six.iterkeys(obj.swagger_types):
        value = getattr(obj, key, missing_attr)
        if value is not missing_attr:
            as_dict[key] = to_dict(value)
    return as_dict


def formatted_flat_dict(model):
    """Returns a string of the model flattened as a dict, sorted"""
    as_dict = to_dict(model)
    return json.dumps(
        as_dict,
        indent=2,
        sort_keys=True
    )


def value_allowed_none_or_none_sentinel(value_to_test, allowed_values):
    """
    Checks whether a value is ``None``, the ``NONE_SENTINEL`` singleton, or a
    member of ``allowed_values``.

    :param value_to_test: The value to check
    :param allowed_values: A container supporting the ``in`` operator

    :return: True if the value is acceptable, False otherwise
    :rtype: bool
    """
    return value_to_test is None or value_to_test is NONE_SENTINEL or value_to_test in allowed_values


def file_content_as_launch_instance_user_data(file_path):
    """
    Takes a file path and returns a Base64-encoded string which can be provided as the value of the ``user_data`` key
    in the ``metadata`` dictionary when launching an instance(see :py:class:`~oci.core.models.LaunchInstanceDetails`
    for more information).

    :param str file_path:
        The path to the file whose contents will be Base64-encoded

    :return: The Base64-encoded string
    :rtype: str
    """
    # Expand "~" first, then environment variables, before opening the file.
    full_path = os.path.expandvars(os.path.expanduser(file_path))
    with open(full_path, 'rb') as f:
        file_contents = f.read()

    return base64.b64encode(file_contents).decode('utf-8')


class Sentinel(object):
    """Named singletons for clear docstrings.
    Also used to differentiate an explicit param of None from a lack of argument.

    .. code-block:: pycon

        >>> missing = Sentinel("Missing", False)
        >>> also_missing = Sentinel("Missing", False)
        >>> assert missing is also_missing
        >>> repr(missing)
        '<Missing>'
        >>> assert bool(missing) is False
    """
    # Registry of already-created sentinels, keyed by name.
    _symbols = {}

    def __new__(cls, name, truthy=True):
        """Return the sentinel registered under ``name``, creating it if needed.

        :raises ValueError: if a sentinel with this name already exists with a
            different ``truthy`` value.
        """
        sentinel = Sentinel._symbols.get(name, None)
        if sentinel is None:
            sentinel = Sentinel._symbols[name] = super(Sentinel, cls).__new__(cls)
        elif sentinel.truthy is not truthy:
            raise ValueError("Tried to get existing Sentinel {!r} with wrong truthy value".format(sentinel))
        return sentinel

    def __init__(self, name, truthy=True):
        self.name = name
        self.truthy = truthy

    def __repr__(self):
        # Sentinel("Missing") -> <Missing>
        return "<{}>".format(self.name)

    def __bool__(self):
        return self.truthy

    # PY2 Compatibility
    __nonzero__ = __bool__


NONE_SENTINEL = Sentinel(name='None', truthy=False)

WAIT_RESOURCE_NOT_FOUND = Sentinel(name='WaitResourceNotFound', truthy=False)


def _get_signer_from_delegation_token_instance_principal(config):
    """
    Builds an ``InstancePrincipalsDelegationTokenSigner`` from the delegation
    token file named in the config.

    :param dict config: Config containing the ``delegation_token_file`` key

    :return: The signer constructed with the token read from that file

    :raises oci.exceptions.InvalidConfig: if the key is absent, the file does
        not exist, or the file contains no token.
    """
    # Import the signer inside the function to avoid circular imports during initialization
    from oci.auth.signers import InstancePrincipalsDelegationTokenSigner

    signer_kwargs = {}

    delegation_token_file_path = config.get(DELEGATION_TOKEN_FILE_FIELD_NAME)

    if delegation_token_file_path is None:
        raise oci.exceptions.InvalidConfig('ERROR: {} was not provided.'.format(DELEGATION_TOKEN_FILE_FIELD_NAME))

    expanded_delegation_token_file_path = os.path.expanduser(delegation_token_file_path)
    if not os.path.isfile(expanded_delegation_token_file_path):
        raise oci.exceptions.InvalidConfig('ERROR: delegation token file not found at {}'.format(expanded_delegation_token_file_path))

    with open(expanded_delegation_token_file_path, mode="r") as f:
        delegation_token = f.read().strip()

    # read().strip() always yields a str, never None, so test for emptiness:
    # an empty or whitespace-only file means no token was provided.
    if not delegation_token:
        raise oci.exceptions.InvalidConfig('ERROR: delegation_token was not provided.')
    signer_kwargs['delegation_token'] = delegation_token

    # Return signer with delegation token
    return InstancePrincipalsDelegationTokenSigner(**signer_kwargs)


def _get_signer_from_resource_principal(config):
    """
    Returns the signer produced by ``get_resource_principal_signer()``.

    :param dict config: Unused; accepted so every entry in
        ``AUTH_TYPE_TO_SIGNER_FUNCTION_MAP`` has the same call signature.
    """
    # Import the signer inside the function to avoid circular imports during initialization
    from oci.auth.signers import get_resource_principal_signer

    return get_resource_principal_signer()


# This map can be easily extended to accommodate support for more auth types through the config file
AUTH_TYPE_TO_SIGNER_FUNCTION_MAP = {
    DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE: _get_signer_from_delegation_token_instance_principal,
    RESOURCE_PRINCIPAL_AUTHENTICATION_TYPE: _get_signer_from_resource_principal,
}


def get_signer_from_authentication_type(config):
    """
    Resolves the authentication type from the config and returns the signer
    built by the matching function in ``AUTH_TYPE_TO_SIGNER_FUNCTION_MAP``.

    :param dict config: Config containing the ``authentication_type`` key

    :return: The signer returned by the selected signer function
    """
    # This is currently made for allowing SDK to run seamlessly on cloud shell
    auth_type = get_authentication_type_from_config(config)

    # Get the signer function from map
    signer_function = AUTH_TYPE_TO_SIGNER_FUNCTION_MAP.get(auth_type)
    return signer_function(config)


def get_authentication_type_from_config(config):
    """
    Determines the effective authentication type described by the config.

    :param dict config: Config to inspect

    :return: ``DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE``
        when the config selects ``instance_principal`` and provides a
        ``delegation_token_file`` key
    :rtype: str

    :raises ValueError: if ``authentication_type`` is not present
    :raises oci.exceptions.InvalidConfig: if the delegation token file key is
        missing, or the authentication type is anything else
    """
    auth_type = config.get(AUTHENTICATION_TYPE_FIELD_NAME)
    if auth_type is None:
        raise ValueError("{} not provided".format(AUTHENTICATION_TYPE_FIELD_NAME))

    # Currently the SDK supports only the cloud shell use case, this can be extended for other auth types
    if auth_type == INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE_VALUE_NAME:
        if DELEGATION_TOKEN_FILE_FIELD_NAME in config:
            return DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE
        else:
            raise oci.exceptions.InvalidConfig("The authentication type {} requires config values for the keys {}".format(DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE, DELEGATION_TOKEN_FILE_FIELD_NAME))
    else:
        raise oci.exceptions.InvalidConfig("The authentication type {} is not supported".format(auth_type))


def back_up_body_calculate_stream_content_length(stream, buffer_limit=DEFAULT_BUFFER_SIZE):
    """
    Reads a stream to the end, keeping the data read, to work out its length.

    :param stream: Object exposing ``buffer.read`` or ``read``; ``buffer`` is
        preferred when both exist
    :param int buffer_limit: Maximum number of bytes to accumulate. A falsy
        value disables the limit.

    :return: Dict with ``content_length`` (total length read) and
        ``byte_content`` (the accumulated data)
    :rtype: dict

    :raises TypeError: if the stream has neither ``buffer`` nor ``read``, or
        if reading raises ``IOError``/``OSError``
    :raises BufferError: if more than ``buffer_limit`` bytes are read
    """
    # Note: Need to read in chunks, older version of Python will fail to read more than 2GB at once.
    # We pull data in chunks (128MB at a time) from the stream until there is no more
    logger.warning("Reading the stream to calculate its content-length. Process may freeze for very big streams. Consider passing in content length for big objects")
    try:
        keep_reading = True
        part_size = DEFAULT_PART_SIZE
        content_length = 0
        byte_content = b''
        while keep_reading:
            if hasattr(stream, 'buffer'):
                content = stream.buffer.read(part_size)
            elif hasattr(stream, 'read'):
                content = stream.read(part_size)
            else:
                raise TypeError("Stream object does not contain a 'read' property. Cannot auto calculate content length, please pass in content length")
            if len(content) == 0:
                # An empty read marks end of stream; finish this pass and stop.
                keep_reading = False
            byte_content += content
            content_length += len(content)
            if (buffer_limit and content_length > buffer_limit):
                raise BufferError("Stream size is greater than the buffer_limit, please pass in a bigger buffer_limit or pass in content length to the request")
        return {"content_length": content_length, "byte_content": byte_content}
    except (IOError, OSError):
        raise TypeError("Stream object's content length cannot be calculated, please pass in content length")


# This helper function checks if an object can have it's content length auto-calculated by the Request library
def is_content_length_calculable_by_req_util(o):
    """
    Reports whether ``o`` exposes any of ``__len__``, ``len``, ``fileno`` or
    ``tell``. Logs a warning when none of them is present.

    :param o: The request body object to inspect

    :return: True if at least one of those attributes exists, else False
    :rtype: bool
    """
    if hasattr(o, '__len__') or hasattr(o, 'len') or hasattr(o, 'fileno') or hasattr(o, 'tell'):
        return True
    logger.warning("Request did not contain content-length and stream object does not contain fileno or tell property. Stream object will be read to calculate content-length")
    return False


def extract_service_endpoint(endpoint_with_base_path):
    """
    Takes a Service Endpoint with base-path embedded and extracts the Service Endpoint from it.

    :param str endpoint_with_base_path:
        Service Endpoint with base-path embedded

    :return: The Service Endpoint without base-path.
    :rtype: str
    """
    parsed_endpoint = urlparse(endpoint_with_base_path)
    return parsed_endpoint.scheme + r'://' + parsed_endpoint.netloc


def should_record_body_position_for_retry(func_ref, **func_kwargs):
    """
    Decides whether the body position must be recorded before a retryable call.

    :param func_ref: The callable about to be invoked; only its ``__name__``
        is inspected
    :param func_kwargs: Keyword arguments for that call; ``body`` is inspected

    :return: True only when the function is named ``call_api`` and its
        ``body`` is truthy and has a ``read`` attribute
    :rtype: bool
    """
    func_name = func_ref.__name__  # TODO: remove Python 2 requirements, use qualname
    if func_name == 'call_api':
        body = func_kwargs.get('body')
        # A file-like object body should be treated differently for retry
        if body and hasattr(body, 'read'):
            return True
        return False
    return False


def record_body_position_for_rewind(body):
    """
    Captures the current position of a body so it can be rewound later.

    :param body: The request body; its ``tell`` method is used when present

    :return: Tuple ``(is_body_rewindable, body_position)``. The position is
        ``None`` and the flag is False when ``tell`` is missing or raises
        ``IOError``/``OSError``.
    :rtype: tuple
    """
    is_body_rewindable = True
    if getattr(body, 'tell', None) is not None:
        try:
            # Attempt to record current body position
            body_position = body.tell()
        except (IOError, OSError):
            # If we cannot record the current body position for a file-like body, then we should not retry
            is_body_rewindable = False
            body_position = None
            logger.warning("Unable to record body position for rewinding. This request will not be retried/rewound")
    else:
        # If the body does not support tell, then don't retry
        is_body_rewindable = False
        body_position = None
        logger.warning("Unable to record body position for rewinding. This request will not be retried/rewound")
    return is_body_rewindable, body_position


def rewind_body(body, body_position):
    """
    Seeks the body back to a previously recorded position.

    :param body: The request body; its ``seek`` method is used when present
    :param body_position: Absolute position to seek to (``SEEK_SET``)

    :return: True if the seek succeeded; False if ``seek`` is missing or
        raises ``IOError``/``OSError``
    :rtype: bool
    """
    if getattr(body, 'seek', None) is not None:
        try:
            body.seek(body_position, SEEK_SET)
        except (IOError, OSError):
            # If we're unable to reset the body position, then we should not retry
            logger.warning("Unable to reset body position for rewinding. This request will not be retried/rewound")
            return False
        return True
    # if the body does not support seek, then we should not retry
    logger.warning("Unable to reset body position for rewinding. This request will not be retried/rewound")
    return False


def read_stream_for_signing(signing_algorithm, body):
    """
    Feeds a stream body to a signing algorithm in chunks of ``DEFAULT_PART_SIZE``.

    :param signing_algorithm: Object whose ``update(chunk)`` is called for
        every non-empty chunk
    :param body: Object exposing ``read`` or ``buffer.read``; ``read`` is
        preferred when both exist

    :return: Total length read, or -1 if reading raised ``IOError``/``OSError``
    :rtype: int
    """
    bytes_read = 0
    try:
        while True:
            # Default to an empty chunk so a body with neither attribute ends the loop.
            chunk = ""
            if hasattr(body, "read"):
                chunk = body.read(DEFAULT_PART_SIZE)
            elif hasattr(body, "buffer"):
                chunk = body.buffer.read(DEFAULT_PART_SIZE)
            if len(chunk) == 0:
                break
            bytes_read += len(chunk)
            signing_algorithm.update(chunk)
    except (IOError, OSError):
        logger.warning("Unable to read stream body for signing")
        bytes_read = -1
    return bytes_read


def camel_to_snake(name):
    """
    Converts a camelCase or PascalCase identifier to snake_case.

    :param str name: The identifier to convert

    :return: The lower-cased, underscore-separated identifier
    :rtype: str
    """
    # First pass splits before a capital followed by lowercase letters, the
    # second before a capital that follows a lowercase letter or digit.
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1).lower()


def camel_to_snake_keys(dictionary):
    """
    Returns a new dict whose keys are the snake_case form of the input keys.

    :param dict dictionary: Mapping with camelCase string keys

    :return: New dict with converted keys and the original values
    :rtype: dict
    """
    return {camel_to_snake(k): v for k, v in dictionary.items()}