Server IP : 103.119.228.120 / Your IP : 18.216.70.205 Web Server : Apache System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64 User : nobody ( 99) PHP Version : 5.6.40 Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib/mysqlsh/lib/python3.9/site-packages/oci/encryption/internal/ |
Upload File : |
# coding: utf-8
# Copyright (c) 2016, 2024, Oracle and/or its affiliates.  All rights reserved.
# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.

import struct
import json

from oci.encryption.internal.utils import (
    convert_to_bytes,
    convert_to_str,
    convert_bytes_to_base64_encoded_string,
    convert_base64_encoded_string_to_bytes,
)
from oci.encryption.internal.models import (
    EncryptedDataHeader,
    EncryptedDataHeaderDataEncryptionKey,
)

# Version written at the start of every serialized header; deserialization
# rejects any other value.
SERIALIZATION_FORMAT_VERSION = 0x0001

# Top-level keys of the JSON metadata document stored in the header.
METADATA_KEY_ENCRYPTED_CONTENT_FORMAT = "encryptedContentFormat"
METADATA_KEY_ENCRYPTED_DATA_KEYS = "encryptedDataKeys"
METADATA_KEY_IV = "iv"
METADATA_KEY_ALGORITHM_ID = "algorithmId"
METADATA_KEY_ADDITIONAL_AUTHENTICATED_DATA = "additionalAuthenticatedData"

# Keys of each entry in the "encryptedDataKeys" list.
ENCRYPTED_DATA_KEY_MASTER_KEY_ID = "masterKeyId"
ENCRYPTED_DATA_KEY_VAULT_ID = "vaultId"
ENCRYPTED_DATA_KEY_ENCRYPTED_DATA_KEY = "encryptedDataKey"
ENCRYPTED_DATA_KEY_REGION = "region"

# docs: https://docs.python.org/3.8/library/struct.html
STRUCT_HEADER_FORMAT = (
    ">"  # big endian
    "H"  # serialization format version ID
    "I"  # JSON metadata length
    "{json_metadata_length}s"  # JSON metadata
)

_SHORT_FORMAT = ">H"
_UNSIGNED_INT_FORMAT = ">I"


def serialize_header(encrypted_data_header):
    """Pack an encrypted-data header into its binary wire form.

    The result is laid out according to ``STRUCT_HEADER_FORMAT``: a big-endian
    unsigned short holding ``SERIALIZATION_FORMAT_VERSION``, a big-endian
    unsigned int holding the length of the JSON metadata, and then the JSON
    metadata itself.

    :param encrypted_data_header: object exposing ``encrypted_data_keys``,
        ``encrypted_content_format``, ``iv_bytes``, ``algorithm_id`` and
        ``additional_authenticated_data_bytes``; each entry of
        ``encrypted_data_keys`` exposes ``master_key_id``, ``vault_id``,
        ``encrypted_data_key_bytes`` and ``region``.
    :return: the packed header as returned by ``struct.pack``.
    """
    encrypted_data_keys = [
        {
            ENCRYPTED_DATA_KEY_MASTER_KEY_ID: data_key.master_key_id,
            ENCRYPTED_DATA_KEY_VAULT_ID: data_key.vault_id,
            ENCRYPTED_DATA_KEY_ENCRYPTED_DATA_KEY: convert_bytes_to_base64_encoded_string(
                data_key.encrypted_data_key_bytes
            ),
            ENCRYPTED_DATA_KEY_REGION: data_key.region,
        }
        for data_key in encrypted_data_header.encrypted_data_keys
    ]

    metadata = {
        METADATA_KEY_ENCRYPTED_CONTENT_FORMAT: encrypted_data_header.encrypted_content_format,
        METADATA_KEY_ENCRYPTED_DATA_KEYS: encrypted_data_keys,
        METADATA_KEY_IV: convert_bytes_to_base64_encoded_string(
            encrypted_data_header.iv_bytes
        ),
        METADATA_KEY_ALGORITHM_ID: encrypted_data_header.algorithm_id,
        # Unlike the IV and data keys, the AAD is stored via convert_to_str,
        # not base64.
        METADATA_KEY_ADDITIONAL_AUTHENTICATED_DATA: convert_to_str(
            encrypted_data_header.additional_authenticated_data_bytes
        ),
    }

    json_header_as_string = json.dumps(metadata)
    json_header_as_bytes = convert_to_bytes(json_header_as_string)

    # Size the length field from the payload that is actually packed.
    # json.dumps escapes non-ASCII by default, so this matches the string
    # length, but the byte length is what the reader needs.
    json_metadata_length = len(json_header_as_bytes)
    header_format = STRUCT_HEADER_FORMAT.format(
        json_metadata_length=json_metadata_length
    )

    return struct.pack(
        header_format,
        SERIALIZATION_FORMAT_VERSION,
        json_metadata_length,
        json_header_as_bytes,
    )


def _missing_or_none_keys(container, required_keys, reject_empty_lists=False):
    """Return the required keys that are absent from, or None in, container.

    With ``reject_empty_lists`` set, a key whose value is an empty list is
    reported as well.
    """
    missing = []
    for required_key in required_keys:
        if required_key not in container:
            missing.append(required_key)
            continue
        value = container.get(required_key, None)
        if value is None:
            missing.append(required_key)
        elif reject_empty_lists and isinstance(value, list) and len(value) == 0:
            missing.append(required_key)
    return missing


def deserialize_header_from_stream(ciphertext_stream):
    """Read and parse a serialized header from the front of a stream.

    Consumes exactly the header bytes from ``ciphertext_stream`` (format
    version, metadata length, JSON metadata), leaving the stream positioned
    immediately after the header.

    :param ciphertext_stream: object with a ``read(size)`` method returning
        bytes.
    :return: an ``EncryptedDataHeader`` built from the parsed metadata.
    :raises ValueError: if the format version is not
        ``SERIALIZATION_FORMAT_VERSION``, the metadata is not valid JSON, or a
        required metadata key is missing or null.
    :raises struct.error: if the stream ends before a complete header could
        be read.
    """
    # get serialization format version
    version_bytes = ciphertext_stream.read(struct.calcsize(_SHORT_FORMAT))
    (serialization_format_version,) = struct.unpack_from(
        _SHORT_FORMAT, version_bytes
    )

    if serialization_format_version != SERIALIZATION_FORMAT_VERSION:
        raise ValueError(
            "Could not deserialize header with unrecognized serialization format version: {}".format(
                serialization_format_version
            )
        )

    # get json metadata length
    length_bytes = ciphertext_stream.read(struct.calcsize(_UNSIGNED_INT_FORMAT))
    (json_metadata_length,) = struct.unpack_from(
        _UNSIGNED_INT_FORMAT, length_bytes
    )

    # get json metadata
    chunk_format = "{}s".format(json_metadata_length)
    metadata_chunk = ciphertext_stream.read(struct.calcsize(chunk_format))
    (json_metadata_bytes,) = struct.unpack_from(chunk_format, metadata_chunk)

    json_metadata = convert_to_str(json_metadata_bytes)
    try:
        metadata = json.loads(json_metadata)
    except ValueError as e:
        raise ValueError(
            "Could not parse metadata inside header. Error: {}".format(str(e))
        )

    required_top_level_keys = [
        METADATA_KEY_IV,
        METADATA_KEY_ALGORITHM_ID,
        METADATA_KEY_ADDITIONAL_AUTHENTICATED_DATA,
    ]

    required_encrypted_data_key_keys = [
        ENCRYPTED_DATA_KEY_MASTER_KEY_ID,
        ENCRYPTED_DATA_KEY_VAULT_ID,
        ENCRYPTED_DATA_KEY_ENCRYPTED_DATA_KEY,
        ENCRYPTED_DATA_KEY_REGION,
    ]

    missing_or_none_top_level_keys = _missing_or_none_keys(
        metadata, required_top_level_keys, reject_empty_lists=True
    )
    if missing_or_none_top_level_keys:
        raise ValueError(
            "Invalid header. The following metadata keys must be present and not null: {}.".format(
                ", ".join(missing_or_none_top_level_keys)
            )
        )

    encrypted_data_keys_raw = metadata.get(METADATA_KEY_ENCRYPTED_DATA_KEYS)
    if encrypted_data_keys_raw is None:
        # Without this guard a header lacking the data-key list would fail
        # below with an unrelated "NoneType is not iterable" TypeError.
        raise ValueError(
            "Invalid header. The following metadata keys must be present and not null: {}.".format(
                METADATA_KEY_ENCRYPTED_DATA_KEYS
            )
        )

    encrypted_data_keys = []
    for encrypted_data_key_raw in encrypted_data_keys_raw:
        missing_or_none_dek_keys = _missing_or_none_keys(
            encrypted_data_key_raw, required_encrypted_data_key_keys
        )
        if missing_or_none_dek_keys:
            raise ValueError(
                "Invalid header. The following metadata keys must be present and not null in each encrypted data key: {}.".format(
                    ", ".join(missing_or_none_dek_keys)
                )
            )

        encrypted_data_keys.append(
            EncryptedDataHeaderDataEncryptionKey(
                master_key_id=encrypted_data_key_raw.get(
                    ENCRYPTED_DATA_KEY_MASTER_KEY_ID
                ),
                vault_id=encrypted_data_key_raw.get(ENCRYPTED_DATA_KEY_VAULT_ID),
                encrypted_data_key_bytes=convert_base64_encoded_string_to_bytes(
                    encrypted_data_key_raw.get(ENCRYPTED_DATA_KEY_ENCRYPTED_DATA_KEY)
                ),
                region=encrypted_data_key_raw.get(ENCRYPTED_DATA_KEY_REGION),
            )
        )

    header = EncryptedDataHeader(
        encrypted_data_keys=encrypted_data_keys,
        iv_bytes=convert_base64_encoded_string_to_bytes(metadata.get(METADATA_KEY_IV)),
        algorithm_id=metadata.get(METADATA_KEY_ALGORITHM_ID),
        additional_authenticated_data_bytes=convert_to_bytes(
            metadata.get(METADATA_KEY_ADDITIONAL_AUTHENTICATED_DATA)
        ),
    )

    return header