403Webshell
Server IP : 103.119.228.120  /  Your IP : 3.136.236.178
Web Server : Apache
System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64
User : nobody ( 99)
PHP Version : 5.6.40
Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime
MySQL : ON |  cURL : ON |  WGET : ON |  Perl : ON |  Python : ON |  Sudo : ON |  Pkexec : ON
Directory :  /usr/lib/mysqlsh/lib/python3.9/site-packages/oci/encryption/

Upload File :
current_dir [ Writeable] document_root [ Writeable]

 

Command :


[ Back ]     

Current File : /usr/lib/mysqlsh/lib/python3.9/site-packages/oci/encryption/key_providers.py
# coding: utf-8
# Copyright (c) 2016, 2024, Oracle and/or its affiliates.  All rights reserved.
# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.

from oci._vendor import six

import abc
import base64
from cryptography.hazmat.primitives.ciphers import algorithms

from oci.exceptions import ServiceError
from oci.key_management.models import KeyShape, GenerateKeyDetails, DecryptDataDetails
from oci.key_management import KmsCryptoClient, KmsManagementClient, KmsVaultClient
from oci.encryption.internal.utils import (
    convert_to_str,
    verify_crc32_checksum,
    raise_runtime_error_from
)


@six.add_metaclass(abc.ABCMeta)
class MasterKeyProvider(object):
    """
    Abstract interface for objects that hand out MasterKeys to be used
    when encrypting and decrypting.

    Concrete providers must implement both methods declared below.
    """

    @abc.abstractmethod
    def get_primary_master_key(self):
        """
        Return the primary master key held by this provider.

        :rtype: oci.encryption.MasterKey
        """

    @abc.abstractmethod
    def get_master_key(self, **kwargs):
        """
        Return the master key selected by the supplied keyword arguments.

        :rtype: oci.encryption.MasterKey
        """


class KMSMasterKeyProvider(MasterKeyProvider):
    def __init__(self, config, kms_master_keys=None, **kwargs):
        """
        Create a provider that vends KMSMasterKeys.

        :param dict config: (required)
            An OCI config dict used to create underlying clients to talk to OCI KMS.
            Note, the 'region' in this config must match the region that the key / vault
            exist in otherwise they will not be found.

        :param list[KMSMasterKey] kms_master_keys: (optional)
            A list of KMSMasterKeys. Currently a max of 1 master key is supported;
            when present, that key becomes the primary master key.
            For decryption, you can use a KMSMasterKeyProvider with no master keys.

        :param signer: (optional)
            The signer to use when signing requests made by the service client. The default is to use a :py:class:`~oci.signer.Signer` based on the values
            provided in the config parameter.

            One use case for this parameter is for `Instance Principals authentication <https://docs.cloud.oracle.com/Content/Identity/Tasks/callingservicesfrominstances.htm>`__
            by passing an instance of :py:class:`~oci.auth.signers.InstancePrincipalsSecurityTokenSigner` as the value for this keyword argument
        :type signer: :py:class:`~oci.signer.AbstractBaseSigner`

        :raises ValueError: if more than one master key is supplied, or if neither
            a config nor a signer is supplied
        """
        if kms_master_keys is not None and len(kms_master_keys) > 1:
            raise ValueError(
                "Only one KMS master key is currently supported for KMSMasterKeyProvider"
            )

        # The first (and only) supplied key, if any, acts as the primary key.
        self.primary_master_key = kms_master_keys[0] if kms_master_keys else None

        self.config = config
        self.signer = kwargs.get('signer')
        if not (self.config or self.signer):
            raise ValueError(
                "Either a config or signer must be passed in"
            )

    def get_primary_master_key(self):
        """
        Return the primary master key, or None when the provider was built without one.
        """
        return self.primary_master_key

    def get_master_key(self, **kwargs):
        """
        Get a KMSMasterKey based on the provided parameters.

        When the requested identifier matches the primary master key, that key is
        returned as-is. Otherwise a new KMSMasterKey is constructed from this
        provider's config, the provider's signer (if any) and the keyword
        arguments given here.

        :param str master_key_id: (required)
            The OCID of this master key

        :param str vault_id:
            The OCID of the vault this master key resides in. Not needed when the
            primary master key matches; required by KMSMasterKey whenever a new
            key has to be constructed.

        :param str region: (optional)
            The region this master key resides in

        :raises ValueError: if master_key_id is missing or empty
        """
        requested_key_id = kwargs.get("master_key_id")
        if not requested_key_id:
            raise ValueError("keyword argument master_key_id must not be None")

        primary = self.primary_master_key
        if primary and primary.get_identifier() == requested_key_id:
            return primary

        # The provider's signer, when set, takes precedence over one passed in kwargs.
        if self.signer:
            kwargs["signer"] = self.signer

        return KMSMasterKey(config=self.config, **kwargs)


@six.add_metaclass(abc.ABCMeta)
class MasterKey(object):
    """
    Abstract interface for a MasterKey resource that takes part in
    encryption and decryption operations.

    Concrete master keys must implement every method declared below.
    """

    @abc.abstractmethod
    def generate_data_encryption_key(self, algorithm):
        """
        Produce a data encryption key (DEK) for the given algorithm using this
        MasterKey.  The DataEncryptionKey that comes back carries a copy of the
        DEK encrypted under this MasterKey.

        :param oci.encryption.algorithms.Algorithm algorithm: (required)
            The algorithm the key will be used for.

        :rtype: oci.encryption.key_providers.DataEncryptionKey
        """

    @abc.abstractmethod
    def decrypt(self, bytes_to_decrypt):
        """
        Decrypt bytes that were encrypted under this master key and return the result.

        :param bytes bytes_to_decrypt: (required)
            The bytes to decrypt using this MasterKey.

        :rtype: bytes
        """

    @abc.abstractmethod
    def get_identifier(self):
        """
        Return an identifier for this MasterKey.

        :rtype: str
        """


class KMSMasterKey(MasterKey):
    def __init__(self, config, master_key_id, vault_id, **kwargs):
        """
        Represents a MasterKey contained in the OCI Key Management Service.

        Construction fetches the vault through a KmsVaultClient and then creates the
        management and crypto clients against the endpoints reported for that vault.

        :param dict config: (required)
            An OCI config dict used to create underlying clients to talk to OCI KMS.
            Note, the 'region' in this config must match the region that the key / vault
            exist in otherwise they will not be found.
            A falsy value (None or an empty dict) is accepted when a signer is
            supplied; it is then treated as an empty dict.

        :param str master_key_id: (required)
            The OCID of the KMS master key

        :param str vault_id: (required)
            The OCID of the vault containing the master key

        :param signer: (optional)
            The signer to use when signing requests made by the service client. The default is to use a :py:class:`~oci.signer.Signer` based on the values
            provided in the config parameter.

            One use case for this parameter is for `Instance Principals authentication <https://docs.cloud.oracle.com/Content/Identity/Tasks/callingservicesfrominstances.htm>`__
            by passing an instance of :py:class:`~oci.auth.signers.InstancePrincipalsSecurityTokenSigner` as the value for this keyword argument
        :type signer: :py:class:`~oci.signer.AbstractBaseSigner`

        :param str region: (optional)
            The region this master key resides in

        :raises ValueError: if neither a config nor a signer is supplied
        """
        signer = kwargs.get("signer")
        if not config and not signer:
            raise ValueError(
                "Either a config or signer must be passed in"
            )

        # A falsy config can only reach this point together with a signer.
        # Normalise it to an empty dict so the region lookup below does not
        # raise TypeError on `"region" in None`.
        if not config:
            config = {}

        self.region = None
        # Get region from **kwargs, config, or signer (in that order of precedence)
        if kwargs.get('region'):
            self.region = kwargs.get("region")
        elif "region" in config:
            self.region = config["region"]
        elif signer:
            self.region = signer.region

        kms_vault_client = KmsVaultClient(config, **kwargs)
        # There is a chance that caller specified a region and differs from the config or signer's region
        kms_vault_client.base_client.set_region(self.region)

        try:
            vault = kms_vault_client.get_vault(vault_id).data
        except ServiceError as service_error:
            message = "Failed to access vaultId: {vault_id} while targeting region: {region}.".format(
                vault_id=vault_id, region=self.region
            )
            # NOTE(review): presumably always raises; otherwise `vault` is unbound below - confirm.
            raise_runtime_error_from(message, service_error)

        # Both clients target the endpoints advertised by the vault itself.
        self.kms_management_client = KmsManagementClient(
            config, service_endpoint=vault.management_endpoint, **kwargs
        )

        self.kms_crypto_client = KmsCryptoClient(
            config, service_endpoint=vault.crypto_endpoint, **kwargs
        )

        self.master_key_id = master_key_id
        self.vault_id = vault.id

    def generate_data_encryption_key(self, algorithm):
        """
        Asks KMS to generate a data encryption key (DEK) under this master key.

        :param oci.encryption.algorithms.Algorithm algorithm: (required)
            The algorithm the key will be used for. Its ``algorithm.name`` must be
            AES and its ``key_len`` is used as the requested key length.

        :rtype: oci.encryption.key_providers.DataEncryptionKey

        :raises ValueError: if the algorithm is not AES
        """
        # only AES is currently supported for client side encryption
        if algorithm.algorithm.name != algorithms.AES.name:
            raise ValueError(
                "Only AES is currently supported for client side encryption with KMS master key"
            )

        dek_key_shape = KeyShape()
        dek_key_shape.algorithm = KeyShape.ALGORITHM_AES
        dek_key_shape.length = algorithm.key_len

        generate_key_details = GenerateKeyDetails()
        # The plaintext DEK is requested alongside the ciphertext copy.
        generate_key_details.include_plaintext_key = True
        generate_key_details.key_id = self.master_key_id
        generate_key_details.key_shape = dek_key_shape

        try:
            generated_key = self.kms_crypto_client.generate_data_encryption_key(
                generate_key_details
            ).data
        except ServiceError as service_error:
            message = "Failed to generate data encryption key using masterKeyId: {master_key_id} while targeting vault: {vault_id}.".format(
                master_key_id=self.master_key_id,
                vault_id=self.vault_id
            )
            raise_runtime_error_from(message, service_error)

        # The response fields are base64 decoded into raw bytes here.
        dek_plaintext_bytes = base64.b64decode(generated_key.plaintext)
        dek_ciphertext_bytes = base64.b64decode(generated_key.ciphertext)

        return DataEncryptionKey(
            plaintext_key_bytes=dek_plaintext_bytes,
            encrypted_key_bytes=dek_ciphertext_bytes,
            plaintext_key_checksum=generated_key.plaintext_checksum,
        )

    def decrypt(self, bytes_to_decrypt):
        """
        Decrypts and returns data that was encrypted under this master key.

        The input is base64 encoded before being sent to KMS, and the checksum
        reported by KMS is verified against the base64-decoded plaintext.

        :param bytes bytes_to_decrypt: (required)
            The bytes to decrypt using this MasterKey.

        :return: the ``plaintext`` field of the KMS decrypt response, unmodified.
            NOTE(review): it is base64 decoded only for the checksum check, so
            callers presumably receive the base64-encoded form rather than raw
            bytes - confirm against callers.
        """
        decrypt_data_details = DecryptDataDetails()

        # KMS API expects the key base64 encoded
        decrypt_data_details.ciphertext = convert_to_str(
            base64.b64encode(bytes_to_decrypt)
        )
        decrypt_data_details.key_id = self.master_key_id

        try:
            decrypted_data = self.kms_crypto_client.decrypt(decrypt_data_details).data
        except ServiceError as service_error:
            message = "Failed to decrypt data encryption key using masterKeyId: {master_key_id} while targeting vault: {vault_id}.".format(
                master_key_id=self.master_key_id,
                vault_id=self.vault_id
            )
            raise_runtime_error_from(message, service_error)

        verify_crc32_checksum(
            base64.b64decode(decrypted_data.plaintext),
            decrypted_data.plaintext_checksum,
        )

        return decrypted_data.plaintext

    def get_identifier(self):
        """
        Returns the OCID of this master key in the OCI Key Management Service.

        :rtype: str
        """
        return self.master_key_id


class DataEncryptionKey(object):
    """
    Holds a data encryption key used to encrypt and decrypt payloads, in both
    its plaintext form and its form encrypted under a master key.
    """

    def __init__(
        self, plaintext_key_bytes, encrypted_key_bytes, plaintext_key_checksum=None
    ):
        """
        :param bytes plaintext_key_bytes:
            The bytes of the data encryption key in plaintext

        :param bytes encrypted_key_bytes:
            The bytes of the data encryption key encrypted under a master key

        :param str plaintext_key_checksum:
            The crc32 checksum of the plaintext data encryption key. When a
            truthy value is given, the plaintext key bytes are verified against it.
        """
        self.plaintext_key_checksum = plaintext_key_checksum
        self.encrypted_key_bytes = encrypted_key_bytes
        self.plaintext_key_bytes = plaintext_key_bytes

        # No checksum supplied (None or empty): nothing to verify.
        if not plaintext_key_checksum:
            return

        verify_crc32_checksum(plaintext_key_bytes, plaintext_key_checksum)

Youez - 2016 - github.com/yon3zu
LinuXploit