403Webshell
Server IP : 103.119.228.120  /  Your IP : 3.14.246.52
Web Server : Apache
System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64
User : nobody ( 99)
PHP Version : 5.6.40
Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime
MySQL : ON |  cURL : ON |  WGET : ON |  Perl : ON |  Python : ON |  Sudo : ON |  Pkexec : ON
Directory :  /lib/mysqlsh/lib/python3.9/site-packages/oci/

Upload File :
current_dir [ Writeable] document_root [ Writeable]

 

Command :


[ Back ]     

Current File : /lib/mysqlsh/lib/python3.9/site-packages/oci/regions.py
# coding: utf-8
# Copyright (c) 2016, 2024, Oracle and/or its affiliates.  All rights reserved.
# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.

from __future__ import absolute_import
import os
import json
import logging
from enum import Enum

from . import regions_definitions
from . import service_endpoints
from oci.exceptions import InvalidAlloyConfig
from oci._vendor import six
from oci._vendor import requests
from oci._vendor.requests.exceptions import HTTPError, ConnectionError, RetryError
from oci._vendor.requests.adapters import HTTPAdapter
from oci._vendor.urllib3.util.retry import Retry

REGIONS_SHORT_NAMES = regions_definitions.REGIONS_SHORT_NAMES
REGION_REALMS = regions_definitions.REGION_REALMS
REALMS = regions_definitions.REALMS
REGIONS = regions_definitions.REGIONS

SERVICE_ENDPOINTS = service_endpoints.SERVICE_ENDPOINTS
SERVICE_ENDPOINTS['auth'] = 'https://auth.{domain}'

DOMAIN_FORMAT = "{region}.{secondLevelDomain}"

OCI_REGION_METADATA_ENV_VAR_NAME = "OCI_REGION_METADATA"

REALM_KEY_PROPERTY_NAME = "realmKey"
REALM_DOMAIN_COMPONENT_PROPERTY_NAME = "realmDomainComponent"
REGION_KEY_PROPERTY_NAME = "regionKey"
REGION_IDENTIFIER_PROPERTY_NAME = "regionIdentifier"

# Region Metadata schema
REGION_METADATA_KEYS = (REALM_KEY_PROPERTY_NAME,
                        REALM_DOMAIN_COMPONENT_PROPERTY_NAME,
                        REGION_KEY_PROPERTY_NAME,
                        REGION_IDENTIFIER_PROPERTY_NAME)

# Constants for getting region info from IMDS
METADATA_URL_BASE = 'http://169.254.169.254/opc/v2'
GET_REGION_URL = '{}/instance/regionInfo'.format(METADATA_URL_BASE)
METADATA_AUTH_HEADERS = {'Authorization': 'Bearer Oracle'}

# Constants for regions config file
REGIONS_CONFIG_FILE_PATH = os.path.join('~', '.oci', 'regions-config.json')

# Enumerate region information sources outside of lookups
ExternalSources = Enum('ExternalSources', ['REGIONS_CFG_FILE', 'ENV_VAR', 'IMDS', 'SECOND_LEVEL_DOMAIN_FALLBACK'])

# Constant env name for second level domain fallback.
DEFAULT_REALM_ENV_VARNAME = "OCI_DEFAULT_REALM"

# Variable for second_level_domain_fallback, initially empty string, but will be set to fallback if env.DEFAULT_REALM_ENV_VARNAME exists
second_level_domain_fallback = ""

# Dict to track if we have read the ExternalSources
# Reading from IMDS is opt-in and can be enabled by calling enable_instance_metadata_service()
_has_been_read_external_sources = {
    ExternalSources.REGIONS_CFG_FILE: False,
    ExternalSources.ENV_VAR: False,
    ExternalSources.IMDS: True,
    ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK: False
}

# Alloy config
_IS_ALLOY_MODE = False
OCI_ALLOY_REGION_COEXIST_ENV_VAR_NAME = "OCI_ALLOY_REGION_COEXIST"
_IS_ALLOY_REGION_COEXIST = False

ALLOY_PROVIDER_NAME = None

logger = logging.getLogger(__name__)


def is_region(region_name):
    region = region_name.lower()

    if region in REGIONS_SHORT_NAMES:
        return False
    if region not in REGIONS:
        return _check_and_add_region_metadata(region)
    return True


def is_region_short_name(region):
    region = region.lower()
    if region in REGIONS_SHORT_NAMES:
        return True

    if region in REGIONS:
        return False

    if _check_and_add_region_metadata(region):
        # Above call will return true if the requested region is now known, after considering additional sources
        # Check is needed if region short code was passed
        if region in REGIONS_SHORT_NAMES:
            return True

    return False


def get_region_from_short_name(region):
    region = region.lower()
    if not is_region_short_name(region):
        logger.debug("Could not find region information in lookups and any other sources")
        # Return the input string to allow fallback to OC1
        return region
    # Return region from lookup
    return REGIONS_SHORT_NAMES[region]


def get_realm_from_region(region):
    region = region.lower()
    # If short name is passed, get the long name
    # get_region_from_short_name also adds metadata from other sources
    region = get_region_from_short_name(region)
    if not is_region(region):
        logger.debug("Cannot find region information in lookups and other sources, defaulting to OC1 realm")
        return "oc1"
    else:
        return REGION_REALMS[region]


def endpoint_for(service, region=None, endpoint=None, service_endpoint_template=None, endpoint_service_name=None):
    """Returns the base URl for a service, either in the given region or at the specified endpoint.

    If endpoint and region are provided, endpoint is used.

    If the region information is not available in the existing maps, following sources are checked in order:
    1. Regions Configuration File at ~/.oci/regions-config.json
    2. Region Metadata Environment variable
    3. Instance Metadata Service

    Lookup from Instance Metadata Service is disabled by default. To enable, call enable_instance_metadata_service()

    The region metadata schema is:
    {
        "realmKey" : string,
        "realmDomainComponent" : string,
        "regionKey" : string,
        "regionIdentifier" : string
    }

    For example,  for the Sydney OC1 region, the schema would be filled out as follows:
    {
        "realmKey" : "OC1",
        "realmDomainComponent" : "oraclecloud.com",
        "regionKey" : "SYD",
        "regionIdentifier" : "ap-sydney-1"
     }

     If the region still cannot be resolved, we fall back to OC1 realm.

     If alloy mode is enabled, and OCI_ALLOY_REGION_COEXIST is not set, only the alloy regions are available. If you want
     to access OCI and alloy regions at the same time, please set OCI_ALLOY_REGION_COEXIST to True. Fallback to OC1 realm
     is not available when OCI_ALLOY_REGION_COEXIST is not set.

    """
    if not (endpoint or region):
        raise ValueError("Must supply either a region or an endpoint.")

    if endpoint:
        # endpoint takes priority
        return _format_endpoint(service, endpoint, service_endpoint_template, endpoint_service_name)

    region = region.lower()
    # If unable to find region information from existing maps, check the other sources and add
    if not (region in REGIONS or region in REGIONS_SHORT_NAMES):
        _check_and_add_region_metadata(region)
    return _endpoint_for(service, region, service_endpoint_template, endpoint_service_name)


def skip_instance_metadata_service():
    logger.debug("Disabling region metadata lookup from IMDS")
    _set_source_has_been_read(ExternalSources.IMDS)


def enable_instance_metadata_service():
    logger.debug("Enabling region metadata lookup from IMDS")
    _set_source_has_been_read(ExternalSources.IMDS, False)


def add_region_schema_for_alloy(region_metadata):
    """
    This method is called from the alloy_config.py file to register an alloy region.
    Args:
        region_metadata: dict
    """
    # If we are here, then alloy-config.json has already been processed by _process_region_metadata_from_alloy_config,
    # since that method is called while initialization. It also processes _IS_ALLOY_REGION_COEXIST and resets region info
    # and external sources accordingly, except for regions added through alloy-config.json file. Check coexists and
    # reset accordingly
    if not _IS_ALLOY_REGION_COEXIST:
        _reset_region_info()
    _add_alloy_regions(region_metadata)


def _set_source_has_been_read(source, value=True):
    _has_been_read_external_sources[source] = value


def _get_source_has_been_read(source):
    return _has_been_read_external_sources[source]


def _reset_region_info():
    logger.debug("Resetting regions information")
    global REGIONS_SHORT_NAMES, REGION_REALMS, REALMS, REGIONS
    REGIONS_SHORT_NAMES = {}
    REGION_REALMS = {}
    REALMS = {}
    REGIONS = []


def _add_region_information_to_override(region_metadata):
    if _check_valid_schema(region_metadata):
        region_metadata = {k: v.lower() for k, v in six.iteritems(region_metadata)}
        REGIONS_SHORT_NAMES[region_metadata[REGION_KEY_PROPERTY_NAME]] = region_metadata[
            REGION_IDENTIFIER_PROPERTY_NAME]
        REGION_REALMS[region_metadata[REGION_IDENTIFIER_PROPERTY_NAME]] = region_metadata[REALM_KEY_PROPERTY_NAME]
        REALMS[region_metadata[REALM_KEY_PROPERTY_NAME]] = region_metadata[REALM_DOMAIN_COMPONENT_PROPERTY_NAME]
        REGIONS.append(region_metadata[REGION_IDENTIFIER_PROPERTY_NAME])


def _add_alloy_regions(alloy_regions):
    logger.debug("Adding alloy regions from {}".format(alloy_regions))
    if isinstance(alloy_regions, list):
        for region_metadata in alloy_regions:
            _add_region_information_to_override(region_metadata)
    elif isinstance(alloy_regions, dict):
        _add_region_information_to_override(alloy_regions)
    else:
        raise TypeError("Invalid type for alloy regions. Please use a dictionary or a list of dictionaries")


def _process_region_metadata_from_alloy_config(alloy_regions_list):
    global _IS_ALLOY_REGION_COEXIST
    _IS_ALLOY_REGION_COEXIST = os.getenv(OCI_ALLOY_REGION_COEXIST_ENV_VAR_NAME)
    if _IS_ALLOY_REGION_COEXIST and _IS_ALLOY_REGION_COEXIST.lower() in ["true", "1"]:
        _IS_ALLOY_REGION_COEXIST = True
    else:
        _IS_ALLOY_REGION_COEXIST = False

    # If IS_ALLOY_REGION_COEXIST is false and alloy is true then set all sources except OCI_DEFAULT_REALM has been read,
    # reset regions data structures, add alloy regions
    if _IS_ALLOY_MODE and not _IS_ALLOY_REGION_COEXIST:
        _reset_region_info()
        for source in _has_been_read_external_sources:
            _set_source_has_been_read(source)
        _set_source_has_been_read(ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK, False)
        _add_alloy_regions(alloy_regions_list)

    # If coexist is true and alloy is true, add alloy regions and override existing regions with alloy config
    if _IS_ALLOY_MODE and _IS_ALLOY_REGION_COEXIST:
        _add_alloy_regions(alloy_regions_list)


def _check_and_add_region_metadata(region):
    if not _get_source_has_been_read(ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK):
        global second_level_domain_fallback
        second_level_domain_fallback = os.environ.get(DEFAULT_REALM_ENV_VARNAME)
        _set_source_has_been_read(ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK)

    # Follow the hierarchy of sources
    if _set_region_metadata_from_cfg_file(region):
        logger.debug("Added metadata information for {} region from regions config file".format(region))
        return True
    elif _set_region_metadata_from_env_var(region):
        logger.debug("Added metadata information for {} region from environment variable".format(region))
        return True
    elif _set_region_from_instance_metadata_service(region):
        logger.debug("Added metadata information for {} region from IMDS".format(region))
        return True
    elif '.' in region:
        logger.debug("Unknown regionId '{}', contains '.', will use it as domain (<service>.<domain>)".format(region))
        return False
    elif second_level_domain_fallback:
        logger.debug("Unknown regionId '{}', default realm is defined, will fallback to '{}'".format(region, second_level_domain_fallback))
        return False
    elif _IS_ALLOY_MODE and not _IS_ALLOY_REGION_COEXIST:
        logger.debug("You're using the {provider} Alloy configuration file, the region you're targeting is not declared"
                     " in this config file. Please check if this is the correct region you're targeting or contact the {provider} "
                     "cloud provider for help. If you want to target both OCI regions and {provider} regions, please set the "
                     "OCI_ALLOY_REGION_COEXIST env var to True.".format(provider=ALLOY_PROVIDER_NAME))

    else:
        logger.debug("Unknown regionId '{}', will assume it's in Realm OC1".format(region))
    return False


def _set_region_metadata_from_cfg_file(region):
    # Check if region information from regions cfg file has already been read
    if _get_source_has_been_read(ExternalSources.REGIONS_CFG_FILE):
        # Return False to allow fallback
        return False

    # Set source as read
    _set_source_has_been_read(ExternalSources.REGIONS_CFG_FILE)

    expanded_file_location = os.path.expanduser(REGIONS_CONFIG_FILE_PATH)
    if os.path.isfile(expanded_file_location):
        logger.debug("Regions configuration file found")
        try:
            with open(expanded_file_location, 'r') as regions_config_file:
                regions_metadata_raw = regions_config_file.read()
        except (OSError, IOError) as e:
            logger.debug("Reading regions configuration file failed because of error: {}".format(e))
            return False

        # Try importing JSONDecodeError for Python 2 and Python 3 compatibility
        try:
            from json.decoder import JSONDecodeError
        except ImportError:
            JSONDecodeError = ValueError
        try:
            regions_metadata_array = json.loads(regions_metadata_raw)
        except JSONDecodeError as e:
            # Unable to parse the json array
            logger.debug("Decoding JSON array from regions configuration file failed because of error: {}".format(e))
            return False

        added = False
        for region_metadata in regions_metadata_array:
            if _check_valid_schema(region_metadata):
                if _add_region_information_to_lookup(region_metadata, region):
                    added = True
        return added
    return False


def _set_region_metadata_from_env_var(region):
    # Check if region information from env var has already been read
    if _get_source_has_been_read(ExternalSources.ENV_VAR):
        # Return False to allow fallback
        return False

    # Set source as read
    _set_source_has_been_read(ExternalSources.ENV_VAR)

    if os.environ.get(OCI_REGION_METADATA_ENV_VAR_NAME):
        # Try importing JSONDecodeError for Python 2 and Python 3 compatibility
        try:
            from json.decoder import JSONDecodeError
        except ImportError:
            JSONDecodeError = ValueError
        try:
            region_metadata = json.loads(os.environ.get(OCI_REGION_METADATA_ENV_VAR_NAME))
        except JSONDecodeError as e:
            # Unable to parse the env variable
            logger.debug("Decoding JSON from environment variable failed because of error: {}".format(e))
            return False

        if _check_valid_schema(region_metadata):
            return _add_region_information_to_lookup(region_metadata, region)

    return False


def _set_region_from_instance_metadata_service(region):
    # Check if region information from IMDS has already been read
    if _get_source_has_been_read(ExternalSources.IMDS):
        # Return False to allow fallback
        return False

    # Set source as read
    _set_source_has_been_read(ExternalSources.IMDS)

    try:

        retry_strategy = Retry(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            read=0,
            connect=3,
            status=3
        )
        s = requests.Session()
        s.mount(GET_REGION_URL, HTTPAdapter(max_retries=retry_strategy))
        response = s.get(GET_REGION_URL, timeout=(10, 60), headers=METADATA_AUTH_HEADERS)
        region_metadata_raw = response.text
        logger.debug("Region metadata blob from IMDS is: {}".format(region_metadata_raw))
    except (HTTPError, ConnectionError, RetryError) as e:
        logger.debug("Failed to call IMDS service because of error: {}".format(e))
        return False

    if response.status_code != 200:
        logger.debug("HTTP Get Failed: URL: {}, Status: {}, Message: {}".format(GET_REGION_URL, response.status_code, response.text))
        return False

    try:
        from json.decoder import JSONDecodeError
    except ImportError:
        JSONDecodeError = ValueError
    try:
        region_metadata = json.loads(region_metadata_raw)
    except JSONDecodeError as e:
        # Unable to parse the raw response
        logger.debug("Decoding JSON from IMDS service because of error: {}".format(e))
        return False

    if _check_valid_schema(region_metadata):
        return _add_region_information_to_lookup(region_metadata, region)

    return False


def _check_valid_schema(region_metadata):
    for key in REGION_METADATA_KEYS:
        if key not in region_metadata:
            logger.debug("Key {} not found in region metadata".format(key))
            return False
        if region_metadata[key] == "":
            logger.debug("Value for key {} in region metadata is empty".format(key))
            return False
    return True


def _add_region_information_to_lookup(region_metadata, region=None):
    # Check if the region metadata has information about the requested region
    # Add the region information from region_metadata to the existing lookups
    region_metadata = {k: v.lower() for k, v in six.iteritems(region_metadata)}
    if region_metadata[REGION_KEY_PROPERTY_NAME] not in REGIONS_SHORT_NAMES:
        REGIONS_SHORT_NAMES[region_metadata[REGION_KEY_PROPERTY_NAME]] = region_metadata[REGION_IDENTIFIER_PROPERTY_NAME]

    if region_metadata[REGION_IDENTIFIER_PROPERTY_NAME] not in REGION_REALMS:
        REGION_REALMS[region_metadata[REGION_IDENTIFIER_PROPERTY_NAME]] = region_metadata[REALM_KEY_PROPERTY_NAME]

    if region_metadata[REALM_KEY_PROPERTY_NAME] not in REALMS:
        REALMS[region_metadata[REALM_KEY_PROPERTY_NAME]] = region_metadata[REALM_DOMAIN_COMPONENT_PROPERTY_NAME]

    if region_metadata[REGION_IDENTIFIER_PROPERTY_NAME] not in REGIONS:
        REGIONS.append(region_metadata[REGION_IDENTIFIER_PROPERTY_NAME])

    # Return True if the requested region is now known
    if region == region_metadata[REGION_IDENTIFIER_PROPERTY_NAME] or region == region_metadata[REGION_KEY_PROPERTY_NAME]:
        return True

    return False


def _endpoint_for(service, region=None, service_endpoint_template=None, endpoint_service_name=None):
    # Convert short code to region identifier
    if region in REGIONS_SHORT_NAMES:
        region = REGIONS_SHORT_NAMES[region]
    # for backwards compatibility, if region already has a '.'
    # then consider it the full domain and do not append '.oraclecloud.com'
    if '.' in region:
        if endpoint_service_name:
            endpoint = 'https://{}.{}'.format(endpoint_service_name, region)
            logger.debug("Construction endpoint from endpoint service name {} and region {} with '.' as domain. Endpoint: {}".format(endpoint_service_name, region, endpoint))
            return endpoint
        endpoint = _format_endpoint(service, region, service_endpoint_template, endpoint_service_name)
        return endpoint
    else:
        # get endpoint from template
        return _endpoint(service, region, service_endpoint_template)


def _endpoint(service, region, service_endpoint_template=None):
    if service_endpoint_template:
        endpoint = service_endpoint_template

        if region:
            endpoint = endpoint.replace("{region}", region)
        return endpoint.replace("{secondLevelDomain}", _second_level_domain(region))
    else:
        return _format_endpoint(service, DOMAIN_FORMAT.format(region=region, secondLevelDomain=_second_level_domain(region)))


def _second_level_domain(region):
    if not _get_source_has_been_read(ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK):
        global second_level_domain_fallback
        second_level_domain_fallback = os.environ.get(DEFAULT_REALM_ENV_VARNAME)
        _set_source_has_been_read(ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK)

    if region in REGION_REALMS.keys():
        realm = REGION_REALMS[region]
    elif second_level_domain_fallback:
        # if second_level_domain_fallback exists, default to using "unknown" realm
        # second_level_domain_fallback as the second level domain
        REALMS["unknown"] = second_level_domain_fallback
        realm = "unknown"
    else:
        # default realm to OC1 when available
        if 'oc1' not in REALMS:
            raise InvalidAlloyConfig("Fallback to default OCI region is blocked. This is likely happening if you are "
                                     "trying to access an alloy region not defined in the alloy region sources, and "
                                     "the env var OCI_ALLOY_REGION_COEXIST is not set to True.")
        realm = 'oc1'
    return REALMS[realm]


def _format_endpoint(service, domain, service_endpoint_template=None, endpoint_service_name=None):
    url_format = None

    if service.lower() not in SERVICE_ENDPOINTS:
        if service_endpoint_template is None:
            if endpoint_service_name:
                # No service_endpoint_template, but the spec specified the "service" portion of the endpoint in the spec
                url_format = "https://" + endpoint_service_name + ".{domain}"
            else:
                raise ValueError("Unknown service {!r}".format(service))
        else:
            # If there is no entry in SERVICE_ENDPOINTS and there is
            # a service_enpoint_template attempt to convert from new service template
            # to old template.
            # From "https://service.{region}.{subdomain}" to "https://service.{domain}"
            pos = service_endpoint_template.find("{region}")
            if pos != -1:
                url_format = service_endpoint_template[:pos] + "{domain}"
    else:
        url_format = SERVICE_ENDPOINTS[service.lower()]

    if url_format is None:
        raise ValueError("Unable to format endpoint for service, {} , and service_endpoint_template, {}".format(service, service_endpoint_template))

    endpoint = url_format.format(domain=domain)

    return endpoint

Youez - 2016 - github.com/yon3zu
LinuXploit