403Webshell
Server IP : 103.119.228.120  /  Your IP : 52.14.209.100
Web Server : Apache
System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64
User : nobody ( 99)
PHP Version : 5.6.40
Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime
MySQL : ON |  cURL : ON |  WGET : ON |  Perl : ON |  Python : ON |  Sudo : ON |  Pkexec : ON
Directory :  /lib/mysqlsh/lib/python3.9/site-packages/oci/

Upload File :
current_dir [ Writeable] document_root [ Writeable]

 

Command :


[ Back ]     

Current File : /lib/mysqlsh/lib/python3.9/site-packages/oci/util.py
# coding: utf-8
# Copyright (c) 2016, 2024, Oracle and/or its affiliates.  All rights reserved.
# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.

from __future__ import absolute_import

import base64
import datetime
import json
import re
import os
import os.path

import pytz

from oci._vendor import six
import oci.exceptions

try:
    from urllib.parse import urlparse
except ImportError:
    from urlparse import urlparse

from io import SEEK_SET
import logging

logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE_VALUE_NAME = 'instance_principal'
DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE = 'delegation_token_with_instance_principal'
RESOURCE_PRINCIPAL_AUTHENTICATION_TYPE = 'resource_principal'
DELEGATION_TOKEN_FILE_FIELD_NAME = 'delegation_token_file'
AUTHENTICATION_TYPE_FIELD_NAME = 'authentication_type'
MEBIBYTE = 1024 * 1024
DEFAULT_BUFFER_SIZE = 100 * MEBIBYTE
DEFAULT_PART_SIZE = 128 * MEBIBYTE

try:
    # PY3+
    import collections.abc as abc
except ImportError:
    # PY2
    import collections as abc

missing_attr = object()


def to_dict(obj):
    """Helper to flatten models into dicts for rendering.

    The following conversions are applied:

    * datetime.date, datetime.datetime, datetime.time
      are converted into ISO8601 UTC strings
    """
    # Shortcut strings so they don't count as Iterables
    if isinstance(obj, six.string_types):
        return obj
    elif obj is NONE_SENTINEL:
        return None
    elif isinstance(obj, (datetime.datetime, datetime.time)):
        # always use UTC
        if not obj.tzinfo:
            obj = pytz.utc.localize(obj)
        if isinstance(obj, datetime.datetime):
            # only datetime.datetime takes a separator
            return obj.isoformat(sep="T")
        return obj.isoformat()
    elif isinstance(obj, datetime.date):
        # datetime.date doesn't have a timezone
        return obj.isoformat()
    elif isinstance(obj, abc.Mapping):
        return {k: to_dict(v) for k, v in six.iteritems(obj)}
    elif isinstance(obj, abc.Iterable):
        return [to_dict(v) for v in obj]
    # Not a string, datetime, dict, list, or model - return directly
    elif not hasattr(obj, "swagger_types"):
        return obj

    # Collect attrs from obj according to swagger_types into a dict
    as_dict = {}
    for key in six.iterkeys(obj.swagger_types):
        value = getattr(obj, key, missing_attr)
        if value is not missing_attr:
            as_dict[key] = to_dict(value)
    return as_dict


def formatted_flat_dict(model):
    """Returns a string of the model flattened as a dict, sorted"""
    as_dict = to_dict(model)
    return json.dumps(
        as_dict,
        indent=2,
        sort_keys=True
    )


def value_allowed_none_or_none_sentinel(value_to_test, allowed_values):
    return value_to_test is None or value_to_test is NONE_SENTINEL or value_to_test in allowed_values


def file_content_as_launch_instance_user_data(file_path):
    """
    Takes a file path and returns a Base64-encoded string which can be provided as the value of the ``user_data`` key
    in the ``metadata`` dictionary when launching an instance(see :py:class:`~oci.core.models.LaunchInstanceDetails`
    for more information).

    :param str file_path:
      The path to the file whose contents will be Base64-encoded

    :return: The Base64-encoded string
    :rtype: str
    """

    full_path = os.path.expandvars(os.path.expanduser(file_path))
    with open(full_path, 'rb') as f:
        file_contents = f.read()

    return base64.b64encode(file_contents).decode('utf-8')


class Sentinel(object):
    """Named singletons for clear docstrings.
    Also used to differentiate an explicit param of None from a lack of argument.

    .. code-block:: pycon

        >>> missing = Sentinel("Missing", False)
        >>> also_missing = Sentinel("Missing", False)
        >>> assert missing is also_missing
        >>> repr(missing)
        <Missing>
        >>> assert bool(missing) is False
    """
    _symbols = {}

    def __new__(cls, name, truthy=True):
        sentinel = Sentinel._symbols.get(name, None)
        if sentinel is None:
            sentinel = Sentinel._symbols[name] = super(Sentinel, cls).__new__(cls)
        elif sentinel.truthy is not truthy:
            raise ValueError("Tried to get existing Sentinel {!r} with wrong truthy value".format(sentinel))
        return sentinel

    def __init__(self, name, truthy=True):
        self.name = name
        self.truthy = truthy

    def __repr__(self):
        # Sentinel("Missing") -> <Missing>
        return "<{}>".format(self.name)

    def __bool__(self):
        return self.truthy
    # PY2 Compatibility
    __nonzero__ = __bool__


NONE_SENTINEL = Sentinel(name='None', truthy=False)

WAIT_RESOURCE_NOT_FOUND = Sentinel(name='WaitResourceNotFound', truthy=False)


def _get_signer_from_delegation_token_instance_principal(config):
    # Import the signer inside the function to avoid circular imports during initialization
    from oci.auth.signers import InstancePrincipalsDelegationTokenSigner

    signer_kwargs = {}

    delegation_token_file_path = config.get(DELEGATION_TOKEN_FILE_FIELD_NAME)

    if delegation_token_file_path is None:
        raise oci.exceptions.InvalidConfig('ERROR: {} was not provided.'.format(DELEGATION_TOKEN_FILE_FIELD_NAME))

    expanded_delegation_token_file_path = os.path.expanduser(delegation_token_file_path)
    if not os.path.isfile(expanded_delegation_token_file_path):
        raise oci.exceptions.InvalidConfig('ERROR: delegation token file not found at {}'.format(expanded_delegation_token_file_path))

    with open(expanded_delegation_token_file_path, mode="r") as f:
        delegation_token = f.read().strip()
    if delegation_token is None:
        raise oci.exceptions.InvalidConfig('ERROR: delegation_token was not provided.')
    signer_kwargs['delegation_token'] = delegation_token
    # Return signer with delegation token
    return InstancePrincipalsDelegationTokenSigner(**signer_kwargs)


def _get_signer_from_resource_principal(config):
    # Import the signer inside the function to avoid circular imports during initialization
    from oci.auth.signers import get_resource_principal_signer

    return get_resource_principal_signer()


# This map can be easily extended to accommodate support for more auth types through the config file
AUTH_TYPE_TO_SIGNER_FUNCTION_MAP = {
    DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE: _get_signer_from_delegation_token_instance_principal,
    RESOURCE_PRINCIPAL_AUTHENTICATION_TYPE: _get_signer_from_resource_principal,
}


def get_signer_from_authentication_type(config):
    # This is currently made for allowing SDK to run seamlessly on cloud shell
    auth_type = get_authentication_type_from_config(config)

    # Get the signer function from map
    signer_function = AUTH_TYPE_TO_SIGNER_FUNCTION_MAP.get(auth_type)
    return signer_function(config)


def get_authentication_type_from_config(config):
    auth_type = config.get(AUTHENTICATION_TYPE_FIELD_NAME)
    if auth_type is None:
        raise ValueError("{} not provided".format(AUTHENTICATION_TYPE_FIELD_NAME))

    # Currently the SDK supports only the cloud shell use case, this can be extended for other auth types
    if auth_type == INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE_VALUE_NAME:
        if DELEGATION_TOKEN_FILE_FIELD_NAME in config:
            return DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE
        else:
            raise oci.exceptions.InvalidConfig("The authentication type {} requires config values for the keys {}".format(DELEGATION_TOKEN_WITH_INSTANCE_PRINCIPAL_AUTHENTICATION_TYPE, DELEGATION_TOKEN_FILE_FIELD_NAME))
    else:
        raise oci.exceptions.InvalidConfig("The authentication type {} is not supported".format(auth_type))


def back_up_body_calculate_stream_content_length(stream, buffer_limit=DEFAULT_BUFFER_SIZE):
    # Note: Need to read in chunks, older version of Python will fail to read more than 2GB at once.
    # We pull data in chunks (128MB at a time) from the stream until there is no more
    logger.warning("Reading the stream to calculate its content-length. Process may freeze for very big streams. Consider passing in content length for big objects")
    try:
        keep_reading = True
        part_size = DEFAULT_PART_SIZE
        content_length = 0
        content = ""
        byte_content = b''
        while keep_reading:
            if hasattr(stream, 'buffer'):
                content = stream.buffer.read(part_size)
            elif hasattr(stream, 'read'):
                content = stream.read(part_size)
            else:
                raise TypeError("Stream object does not contain a 'read' property. Cannot auto calculate content length, please pass in content length")
            if len(content) == 0:
                keep_reading = False
            byte_content += content
            content_length += len(content)
            if (buffer_limit and content_length > buffer_limit):
                raise BufferError("Stream size is greater than the buffer_limit, please pass in a bigger buffer_limit or pass in content length to the request")
        return {"content_length": content_length, "byte_content": byte_content}
    except (IOError, OSError):
        raise TypeError("Stream object's content length cannot be calculated, please pass in content length")


# This helper function checks if an object can have it's content length auto-calculated by the Request library
def is_content_length_calculable_by_req_util(o):
    if hasattr(o, '__len__') or hasattr(o, 'len') or hasattr(o, 'fileno') or hasattr(o, 'tell'):
        return True
    logger.warning("Request did not contain content-length and stream object does not contain fileno or tell property. Stream object will be read to calculate content-length")
    return False


def extract_service_endpoint(endpoint_with_base_path):
    """
    Takes a Service Endpoint with base-path embedded and extracts the Service Endpoint from it.

    :param str endpoint_with_base_path:
    Service Endpoint with base-path embedded

    :return: The Service Endpoint without base-path.
    :rtype: str
    """
    parsed_endpoint = urlparse(endpoint_with_base_path)
    return parsed_endpoint.scheme + r'://' + parsed_endpoint.netloc


def should_record_body_position_for_retry(func_ref, **func_kwargs):
    func_name = func_ref.__name__
    # TODO: remove Python 2 requirements, use qualname
    if func_name == 'call_api':
        body = func_kwargs.get('body')
        # A file-like object body should be treated differently for retry
        if body and hasattr(body, 'read'):
            return True
        return False
    return False


def record_body_position_for_rewind(body):
    is_body_rewindable = True
    if getattr(body, 'tell', None) is not None:
        try:
            # Attempt to record current body position
            body_position = body.tell()
        except (IOError, OSError):
            # If we cannot record the current body position for a file-like body, then we should not retry
            is_body_rewindable = False
            body_position = None
            logger.warning("Unable to record body position for rewinding. This request will not be retried/rewound")
    else:
        # If the body does not support tell, then don't retry
        is_body_rewindable = False
        body_position = None
        logger.warning("Unable to record body position for rewinding. This request will not be retried/rewound")
    return is_body_rewindable, body_position


def rewind_body(body, body_position):
    if getattr(body, 'seek', None) is not None:
        try:
            body.seek(body_position, SEEK_SET)
        except (IOError, OSError):
            # If we're unable to reset the body position, then we should not retry
            logger.warning("Unable to reset body position for rewinding. This request will not be retried/rewound")
            return False
        return True
    # if the body does not support seek, then we should not retry
    logger.warning("Unable to reset body position for rewinding. This request will not be retried/rewound")
    return False


def read_stream_for_signing(signing_algorithm, body):
    bytes_read = 0
    try:
        while True:
            chunk = ""
            if hasattr(body, "read"):
                chunk = body.read(DEFAULT_PART_SIZE)
            elif hasattr(body, "buffer"):
                chunk = body.buffer.read(DEFAULT_PART_SIZE)
            if len(chunk) == 0:
                break
            bytes_read += len(chunk)
            signing_algorithm.update(chunk)
    except (IOError, OSError):
        logger.warning("Unable to read stream body for signing")
        bytes_read = -1
    return bytes_read


def camel_to_snake(name):
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()


def camel_to_snake_keys(dictionary):
    outdict = {}
    for k, v in dictionary.items():
        outdict[camel_to_snake(k)] = v
    return outdict

Youez - 2016 - github.com/yon3zu
LinuXploit