Server IP : 103.119.228.120 / Your IP : 3.133.145.17 Web Server : Apache System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64 User : nobody ( 99) PHP Version : 5.6.40 Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib/mysqlsh/lib/python3.9/site-packages/oci/ |
Upload File : |
# coding: utf-8 # Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved. # This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license. from __future__ import absolute_import import os import json import logging from enum import Enum from . import regions_definitions from . import service_endpoints from oci.exceptions import InvalidAlloyConfig from oci._vendor import six from oci._vendor import requests from oci._vendor.requests.exceptions import HTTPError, ConnectionError, RetryError from oci._vendor.requests.adapters import HTTPAdapter from oci._vendor.urllib3.util.retry import Retry REGIONS_SHORT_NAMES = regions_definitions.REGIONS_SHORT_NAMES REGION_REALMS = regions_definitions.REGION_REALMS REALMS = regions_definitions.REALMS REGIONS = regions_definitions.REGIONS SERVICE_ENDPOINTS = service_endpoints.SERVICE_ENDPOINTS SERVICE_ENDPOINTS['auth'] = 'https://auth.{domain}' DOMAIN_FORMAT = "{region}.{secondLevelDomain}" OCI_REGION_METADATA_ENV_VAR_NAME = "OCI_REGION_METADATA" REALM_KEY_PROPERTY_NAME = "realmKey" REALM_DOMAIN_COMPONENT_PROPERTY_NAME = "realmDomainComponent" REGION_KEY_PROPERTY_NAME = "regionKey" REGION_IDENTIFIER_PROPERTY_NAME = "regionIdentifier" # Region Metadata schema REGION_METADATA_KEYS = (REALM_KEY_PROPERTY_NAME, REALM_DOMAIN_COMPONENT_PROPERTY_NAME, REGION_KEY_PROPERTY_NAME, REGION_IDENTIFIER_PROPERTY_NAME) # Constants for getting region info from IMDS METADATA_URL_BASE = 'http://169.254.169.254/opc/v2' GET_REGION_URL = '{}/instance/regionInfo'.format(METADATA_URL_BASE) METADATA_AUTH_HEADERS = {'Authorization': 'Bearer Oracle'} # Constants for regions config file REGIONS_CONFIG_FILE_PATH = os.path.join('~', '.oci', 'regions-config.json') # Enumerate region information sources outside of lookups ExternalSources = Enum('ExternalSources', ['REGIONS_CFG_FILE', 'ENV_VAR', 'IMDS', 'SECOND_LEVEL_DOMAIN_FALLBACK']) # Constant env name for second level domain fallback. DEFAULT_REALM_ENV_VARNAME = "OCI_DEFAULT_REALM" # Variable for second_level_domain_fallback, initially empty string, but will be set to fallback if env.DEFAULT_REALM_ENV_VARNAME exists second_level_domain_fallback = "" # Dict to track if we have read the ExternalSources # Reading from IMDS is opt-in and can be enabled by calling enable_instance_metadata_service() _has_been_read_external_sources = { ExternalSources.REGIONS_CFG_FILE: False, ExternalSources.ENV_VAR: False, ExternalSources.IMDS: True, ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK: False } # Alloy config _IS_ALLOY_MODE = False OCI_ALLOY_REGION_COEXIST_ENV_VAR_NAME = "OCI_ALLOY_REGION_COEXIST" _IS_ALLOY_REGION_COEXIST = False ALLOY_PROVIDER_NAME = None logger = logging.getLogger(__name__) def is_region(region_name): region = region_name.lower() if region in REGIONS_SHORT_NAMES: return False if region not in REGIONS: return _check_and_add_region_metadata(region) return True def is_region_short_name(region): region = region.lower() if region in REGIONS_SHORT_NAMES: return True if region in REGIONS: return False if _check_and_add_region_metadata(region): # Above call will return true if the requested region is now known, after considering additional sources # Check is needed if region short code was passed if region in REGIONS_SHORT_NAMES: return True return False def get_region_from_short_name(region): region = region.lower() if not is_region_short_name(region): logger.debug("Could not find region information in lookups and any other sources") # Return the input string to allow fallback to OC1 return region # Return region from lookup return REGIONS_SHORT_NAMES[region] def get_realm_from_region(region): region = region.lower() # If short name is passed, get the long name # get_region_from_short_name also adds metadata from other sources region = get_region_from_short_name(region) if not is_region(region): logger.debug("Cannot find region information in lookups and other sources, defaulting to OC1 realm") return "oc1" else: return REGION_REALMS[region] def endpoint_for(service, region=None, endpoint=None, service_endpoint_template=None, endpoint_service_name=None): """Returns the base URl for a service, either in the given region or at the specified endpoint. If endpoint and region are provided, endpoint is used. If the region information is not available in the existing maps, following sources are checked in order: 1. Regions Configuration File at ~/.oci/regions-config.json 2. Region Metadata Environment variable 3. Instance Metadata Service Lookup from Instance Metadata Service is disabled by default. To enable, call enable_instance_metadata_service() The region metadata schema is: { "realmKey" : string, "realmDomainComponent" : string, "regionKey" : string, "regionIdentifier" : string } For example, for the Sydney OC1 region, the schema would be filled out as follows: { "realmKey" : "OC1", "realmDomainComponent" : "oraclecloud.com", "regionKey" : "SYD", "regionIdentifier" : "ap-sydney-1" } If the region still cannot be resolved, we fall back to OC1 realm. If alloy mode is enabled, and OCI_ALLOY_REGION_COEXIST is not set, only the alloy regions are available. If you want to access OCI and alloy regions at the same time, please set OCI_ALLOY_REGION_COEXIST to True. Fallback to OC1 realm is not available when OCI_ALLOY_REGION_COEXIST is not set. """ if not (endpoint or region): raise ValueError("Must supply either a region or an endpoint.") if endpoint: # endpoint takes priority return _format_endpoint(service, endpoint, service_endpoint_template, endpoint_service_name) region = region.lower() # If unable to find region information from existing maps, check the other sources and add if not (region in REGIONS or region in REGIONS_SHORT_NAMES): _check_and_add_region_metadata(region) return _endpoint_for(service, region, service_endpoint_template, endpoint_service_name) def skip_instance_metadata_service(): logger.debug("Disabling region metadata lookup from IMDS") _set_source_has_been_read(ExternalSources.IMDS) def enable_instance_metadata_service(): logger.debug("Enabling region metadata lookup from IMDS") _set_source_has_been_read(ExternalSources.IMDS, False) def add_region_schema_for_alloy(region_metadata): """ This method is called from the alloy_config.py file to register an alloy region. Args: region_metadata: dict """ # If we are here, then alloy-config.json has already been processed by _process_region_metadata_from_alloy_config, # since that method is called while initialization. It also processes _IS_ALLOY_REGION_COEXIST and resets region info # and external sources accordingly, except for regions added through alloy-config.json file. Check coexists and # reset accordingly if not _IS_ALLOY_REGION_COEXIST: _reset_region_info() _add_alloy_regions(region_metadata) def _set_source_has_been_read(source, value=True): _has_been_read_external_sources[source] = value def _get_source_has_been_read(source): return _has_been_read_external_sources[source] def _reset_region_info(): logger.debug("Resetting regions information") global REGIONS_SHORT_NAMES, REGION_REALMS, REALMS, REGIONS REGIONS_SHORT_NAMES = {} REGION_REALMS = {} REALMS = {} REGIONS = [] def _add_region_information_to_override(region_metadata): if _check_valid_schema(region_metadata): region_metadata = {k: v.lower() for k, v in six.iteritems(region_metadata)} REGIONS_SHORT_NAMES[region_metadata[REGION_KEY_PROPERTY_NAME]] = region_metadata[ REGION_IDENTIFIER_PROPERTY_NAME] REGION_REALMS[region_metadata[REGION_IDENTIFIER_PROPERTY_NAME]] = region_metadata[REALM_KEY_PROPERTY_NAME] REALMS[region_metadata[REALM_KEY_PROPERTY_NAME]] = region_metadata[REALM_DOMAIN_COMPONENT_PROPERTY_NAME] REGIONS.append(region_metadata[REGION_IDENTIFIER_PROPERTY_NAME]) def _add_alloy_regions(alloy_regions): logger.debug("Adding alloy regions from {}".format(alloy_regions)) if isinstance(alloy_regions, list): for region_metadata in alloy_regions: _add_region_information_to_override(region_metadata) elif isinstance(alloy_regions, dict): _add_region_information_to_override(alloy_regions) else: raise TypeError("Invalid type for alloy regions. Please use a dictionary or a list of dictionaries") def _process_region_metadata_from_alloy_config(alloy_regions_list): global _IS_ALLOY_REGION_COEXIST _IS_ALLOY_REGION_COEXIST = os.getenv(OCI_ALLOY_REGION_COEXIST_ENV_VAR_NAME) if _IS_ALLOY_REGION_COEXIST and _IS_ALLOY_REGION_COEXIST.lower() in ["true", "1"]: _IS_ALLOY_REGION_COEXIST = True else: _IS_ALLOY_REGION_COEXIST = False # If IS_ALLOY_REGION_COEXIST is false and alloy is true then set all sources except OCI_DEFAULT_REALM has been read, # reset regions data structures, add alloy regions if _IS_ALLOY_MODE and not _IS_ALLOY_REGION_COEXIST: _reset_region_info() for source in _has_been_read_external_sources: _set_source_has_been_read(source) _set_source_has_been_read(ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK, False) _add_alloy_regions(alloy_regions_list) # If coexist is true and alloy is true, add alloy regions and override existing regions with alloy config if _IS_ALLOY_MODE and _IS_ALLOY_REGION_COEXIST: _add_alloy_regions(alloy_regions_list) def _check_and_add_region_metadata(region): if not _get_source_has_been_read(ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK): global second_level_domain_fallback second_level_domain_fallback = os.environ.get(DEFAULT_REALM_ENV_VARNAME) _set_source_has_been_read(ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK) # Follow the hierarchy of sources if _set_region_metadata_from_cfg_file(region): logger.debug("Added metadata information for {} region from regions config file".format(region)) return True elif _set_region_metadata_from_env_var(region): logger.debug("Added metadata information for {} region from environment variable".format(region)) return True elif _set_region_from_instance_metadata_service(region): logger.debug("Added metadata information for {} region from IMDS".format(region)) return True elif '.' in region: logger.debug("Unknown regionId '{}', contains '.', will use it as domain (<service>.<domain>)".format(region)) return False elif second_level_domain_fallback: logger.debug("Unknown regionId '{}', default realm is defined, will fallback to '{}'".format(region, second_level_domain_fallback)) return False elif _IS_ALLOY_MODE and not _IS_ALLOY_REGION_COEXIST: logger.debug("You're using the {provider} Alloy configuration file, the region you're targeting is not declared" " in this config file. Please check if this is the correct region you're targeting or contact the {provider} " "cloud provider for help. If you want to target both OCI regions and {provider} regions, please set the " "OCI_ALLOY_REGION_COEXIST env var to True.".format(provider=ALLOY_PROVIDER_NAME)) else: logger.debug("Unknown regionId '{}', will assume it's in Realm OC1".format(region)) return False def _set_region_metadata_from_cfg_file(region): # Check if region information from regions cfg file has already been read if _get_source_has_been_read(ExternalSources.REGIONS_CFG_FILE): # Return False to allow fallback return False # Set source as read _set_source_has_been_read(ExternalSources.REGIONS_CFG_FILE) expanded_file_location = os.path.expanduser(REGIONS_CONFIG_FILE_PATH) if os.path.isfile(expanded_file_location): logger.debug("Regions configuration file found") try: with open(expanded_file_location, 'r') as regions_config_file: regions_metadata_raw = regions_config_file.read() except (OSError, IOError) as e: logger.debug("Reading regions configuration file failed because of error: {}".format(e)) return False # Try importing JSONDecodeError for Python 2 and Python 3 compatibility try: from json.decoder import JSONDecodeError except ImportError: JSONDecodeError = ValueError try: regions_metadata_array = json.loads(regions_metadata_raw) except JSONDecodeError as e: # Unable to parse the json array logger.debug("Decoding JSON array from regions configuration file failed because of error: {}".format(e)) return False added = False for region_metadata in regions_metadata_array: if _check_valid_schema(region_metadata): if _add_region_information_to_lookup(region_metadata, region): added = True return added return False def _set_region_metadata_from_env_var(region): # Check if region information from env var has already been read if _get_source_has_been_read(ExternalSources.ENV_VAR): # Return False to allow fallback return False # Set source as read _set_source_has_been_read(ExternalSources.ENV_VAR) if os.environ.get(OCI_REGION_METADATA_ENV_VAR_NAME): # Try importing JSONDecodeError for Python 2 and Python 3 compatibility try: from json.decoder import JSONDecodeError except ImportError: JSONDecodeError = ValueError try: region_metadata = json.loads(os.environ.get(OCI_REGION_METADATA_ENV_VAR_NAME)) except JSONDecodeError as e: # Unable to parse the env variable logger.debug("Decoding JSON from environment variable failed because of error: {}".format(e)) return False if _check_valid_schema(region_metadata): return _add_region_information_to_lookup(region_metadata, region) return False def _set_region_from_instance_metadata_service(region): # Check if region information from IMDS has already been read if _get_source_has_been_read(ExternalSources.IMDS): # Return False to allow fallback return False # Set source as read _set_source_has_been_read(ExternalSources.IMDS) try: retry_strategy = Retry( total=3, status_forcelist=[429, 500, 502, 503, 504], read=0, connect=3, status=3 ) s = requests.Session() s.mount(GET_REGION_URL, HTTPAdapter(max_retries=retry_strategy)) response = s.get(GET_REGION_URL, timeout=(10, 60), headers=METADATA_AUTH_HEADERS) region_metadata_raw = response.text logger.debug("Region metadata blob from IMDS is: {}".format(region_metadata_raw)) except (HTTPError, ConnectionError, RetryError) as e: logger.debug("Failed to call IMDS service because of error: {}".format(e)) return False if response.status_code != 200: logger.debug("HTTP Get Failed: URL: {}, Status: {}, Message: {}".format(GET_REGION_URL, response.status_code, response.text)) return False try: from json.decoder import JSONDecodeError except ImportError: JSONDecodeError = ValueError try: region_metadata = json.loads(region_metadata_raw) except JSONDecodeError as e: # Unable to parse the raw response logger.debug("Decoding JSON from IMDS service because of error: {}".format(e)) return False if _check_valid_schema(region_metadata): return _add_region_information_to_lookup(region_metadata, region) return False def _check_valid_schema(region_metadata): for key in REGION_METADATA_KEYS: if key not in region_metadata: logger.debug("Key {} not found in region metadata".format(key)) return False if region_metadata[key] == "": logger.debug("Value for key {} in region metadata is empty".format(key)) return False return True def _add_region_information_to_lookup(region_metadata, region=None): # Check if the region metadata has information about the requested region # Add the region information from region_metadata to the existing lookups region_metadata = {k: v.lower() for k, v in six.iteritems(region_metadata)} if region_metadata[REGION_KEY_PROPERTY_NAME] not in REGIONS_SHORT_NAMES: REGIONS_SHORT_NAMES[region_metadata[REGION_KEY_PROPERTY_NAME]] = region_metadata[REGION_IDENTIFIER_PROPERTY_NAME] if region_metadata[REGION_IDENTIFIER_PROPERTY_NAME] not in REGION_REALMS: REGION_REALMS[region_metadata[REGION_IDENTIFIER_PROPERTY_NAME]] = region_metadata[REALM_KEY_PROPERTY_NAME] if region_metadata[REALM_KEY_PROPERTY_NAME] not in REALMS: REALMS[region_metadata[REALM_KEY_PROPERTY_NAME]] = region_metadata[REALM_DOMAIN_COMPONENT_PROPERTY_NAME] if region_metadata[REGION_IDENTIFIER_PROPERTY_NAME] not in REGIONS: REGIONS.append(region_metadata[REGION_IDENTIFIER_PROPERTY_NAME]) # Return True if the requested region is now known if region == region_metadata[REGION_IDENTIFIER_PROPERTY_NAME] or region == region_metadata[REGION_KEY_PROPERTY_NAME]: return True return False def _endpoint_for(service, region=None, service_endpoint_template=None, endpoint_service_name=None): # Convert short code to region identifier if region in REGIONS_SHORT_NAMES: region = REGIONS_SHORT_NAMES[region] # for backwards compatibility, if region already has a '.' # then consider it the full domain and do not append '.oraclecloud.com' if '.' in region: if endpoint_service_name: endpoint = 'https://{}.{}'.format(endpoint_service_name, region) logger.debug("Construction endpoint from endpoint service name {} and region {} with '.' as domain. Endpoint: {}".format(endpoint_service_name, region, endpoint)) return endpoint endpoint = _format_endpoint(service, region, service_endpoint_template, endpoint_service_name) return endpoint else: # get endpoint from template return _endpoint(service, region, service_endpoint_template) def _endpoint(service, region, service_endpoint_template=None): if service_endpoint_template: endpoint = service_endpoint_template if region: endpoint = endpoint.replace("{region}", region) return endpoint.replace("{secondLevelDomain}", _second_level_domain(region)) else: return _format_endpoint(service, DOMAIN_FORMAT.format(region=region, secondLevelDomain=_second_level_domain(region))) def _second_level_domain(region): if not _get_source_has_been_read(ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK): global second_level_domain_fallback second_level_domain_fallback = os.environ.get(DEFAULT_REALM_ENV_VARNAME) _set_source_has_been_read(ExternalSources.SECOND_LEVEL_DOMAIN_FALLBACK) if region in REGION_REALMS.keys(): realm = REGION_REALMS[region] elif second_level_domain_fallback: # if second_level_domain_fallback exists, default to using "unknown" realm # second_level_domain_fallback as the second level domain REALMS["unknown"] = second_level_domain_fallback realm = "unknown" else: # default realm to OC1 when available if 'oc1' not in REALMS: raise InvalidAlloyConfig("Fallback to default OCI region is blocked. This is likely happening if you are " "trying to access an alloy region not defined in the alloy region sources, and " "the env var OCI_ALLOY_REGION_COEXIST is not set to True.") realm = 'oc1' return REALMS[realm] def _format_endpoint(service, domain, service_endpoint_template=None, endpoint_service_name=None): url_format = None if service.lower() not in SERVICE_ENDPOINTS: if service_endpoint_template is None: if endpoint_service_name: # No service_endpoint_template, but the spec specified the "service" portion of the endpoint in the spec url_format = "https://" + endpoint_service_name + ".{domain}" else: raise ValueError("Unknown service {!r}".format(service)) else: # If there is no entry in SERVICE_ENDPOINTS and there is # a service_enpoint_template attempt to convert from new service template # to old template. # From "https://service.{region}.{subdomain}" to "https://service.{domain}" pos = service_endpoint_template.find("{region}") if pos != -1: url_format = service_endpoint_template[:pos] + "{domain}" else: url_format = SERVICE_ENDPOINTS[service.lower()] if url_format is None: raise ValueError("Unable to format endpoint for service, {} , and service_endpoint_template, {}".format(service, service_endpoint_template)) endpoint = url_format.format(domain=domain) return endpoint