403Webshell
Server IP : 103.119.228.120  /  Your IP : 52.15.185.147
Web Server : Apache
System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64
User : nobody ( 99)
PHP Version : 5.6.40
Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime
MySQL : ON |  cURL : ON |  WGET : ON |  Perl : ON |  Python : ON |  Sudo : ON |  Pkexec : ON
Directory :  /usr/local/ssl/lib/mysqlsh/lib/python3.9/site-packages/oci/encryption/internal/

Upload File :
current_dir [ Writeable] document_root [ Writeable]

 

Command :


[ Back ]     

Current File : /usr/local/ssl/lib/mysqlsh/lib/python3.9/site-packages/oci/encryption/internal/serialization.py
# coding: utf-8
# Copyright (c) 2016, 2024, Oracle and/or its affiliates.  All rights reserved.
# This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.

import struct
import json

from oci.encryption.internal.utils import (
    convert_to_bytes,
    convert_to_str,
    convert_bytes_to_base64_encoded_string,
    convert_base64_encoded_string_to_bytes,
)

from oci.encryption.internal.models import (
    EncryptedDataHeader,
    EncryptedDataHeaderDataEncryptionKey,
)

# Version tag packed as the first field of every serialized header.
SERIALIZATION_FORMAT_VERSION = 1

# Top-level keys of the JSON metadata document carried inside the header.
METADATA_KEY_ENCRYPTED_CONTENT_FORMAT = "encryptedContentFormat"
METADATA_KEY_ENCRYPTED_DATA_KEYS = "encryptedDataKeys"
METADATA_KEY_IV = "iv"
METADATA_KEY_ALGORITHM_ID = "algorithmId"
METADATA_KEY_ADDITIONAL_AUTHENTICATED_DATA = "additionalAuthenticatedData"

# Keys of each entry in the "encryptedDataKeys" list.
ENCRYPTED_DATA_KEY_MASTER_KEY_ID = "masterKeyId"
ENCRYPTED_DATA_KEY_VAULT_ID = "vaultId"
ENCRYPTED_DATA_KEY_ENCRYPTED_DATA_KEY = "encryptedDataKey"
ENCRYPTED_DATA_KEY_REGION = "region"

# Template for the struct format of the whole header; the placeholder is
# filled in with the JSON metadata length before packing.
# docs: https://docs.python.org/3.8/library/struct.html
STRUCT_HEADER_FORMAT = "".join(
    [
        ">",  # big endian
        "H",  # serialization format version ID
        "I",  # JSON metadata length
        "{json_metadata_length}s",  # JSON metadata
    ]
)


def serialize_header(encrypted_data_header):
    """Pack an encrypted data header into its binary form.

    The result is built with ``struct.pack`` using ``STRUCT_HEADER_FORMAT``:
    the serialization format version, the length of the JSON metadata, and
    the JSON metadata itself.

    :param encrypted_data_header: object exposing ``encrypted_data_keys``,
        ``encrypted_content_format``, ``iv_bytes``, ``algorithm_id`` and
        ``additional_authenticated_data_bytes``.
    :return: the packed header produced by ``struct.pack``.
    """
    # One JSON object per data encryption key; the raw key bytes are
    # base64-encoded so they can be embedded in JSON.
    data_key_entries = [
        {
            ENCRYPTED_DATA_KEY_MASTER_KEY_ID: data_key.master_key_id,
            ENCRYPTED_DATA_KEY_VAULT_ID: data_key.vault_id,
            ENCRYPTED_DATA_KEY_ENCRYPTED_DATA_KEY: convert_bytes_to_base64_encoded_string(
                data_key.encrypted_data_key_bytes
            ),
            ENCRYPTED_DATA_KEY_REGION: data_key.region,
        }
        for data_key in encrypted_data_header.encrypted_data_keys
    ]

    metadata_json = json.dumps(
        {
            METADATA_KEY_ENCRYPTED_CONTENT_FORMAT: encrypted_data_header.encrypted_content_format,
            METADATA_KEY_ENCRYPTED_DATA_KEYS: data_key_entries,
            METADATA_KEY_IV: convert_bytes_to_base64_encoded_string(
                encrypted_data_header.iv_bytes
            ),
            METADATA_KEY_ALGORITHM_ID: encrypted_data_header.algorithm_id,
            METADATA_KEY_ADDITIONAL_AUTHENTICATED_DATA: convert_to_str(
                encrypted_data_header.additional_authenticated_data_bytes
            ),
        }
    )

    # NOTE(review): this is the character count of the JSON text. It matches
    # the packed byte count only if convert_to_bytes yields one byte per
    # character (json.dumps emits ASCII-only output by default) -- confirm.
    metadata_length = len(metadata_json)

    return struct.pack(
        STRUCT_HEADER_FORMAT.format(json_metadata_length=metadata_length),
        SERIALIZATION_FORMAT_VERSION,
        metadata_length,
        convert_to_bytes(metadata_json),
    )


def deserialize_header_from_stream(ciphertext_stream):
    """Read and parse an encrypted data header from a stream.

    Three reads are made, in order: the serialization format version
    (big-endian unsigned short), the JSON metadata length (big-endian
    unsigned int), and then that many bytes of JSON metadata.

    :param ciphertext_stream: object with a ``read(size)`` method returning
        bytes, positioned at the first byte of the header.
    :return: an ``EncryptedDataHeader`` built from the parsed metadata.
    :raises ValueError: if the serialization format version is not
        ``SERIALIZATION_FORMAT_VERSION``, the metadata is not valid JSON, or
        a required metadata key is missing or null.
    :raises struct.error: if the stream yields fewer bytes than a field
        requires.
    """
    version_format = ">H"
    length_format = ">I"

    # get serialization format version
    version_bytes = ciphertext_stream.read(struct.calcsize(version_format))
    (serialization_format_version,) = struct.unpack_from(
        version_format, version_bytes
    )

    if serialization_format_version != SERIALIZATION_FORMAT_VERSION:
        raise ValueError(
            "Could not deserialize header with unrecognized serialization format version: {}".format(
                serialization_format_version
            )
        )

    # get json metadata length
    length_bytes = ciphertext_stream.read(struct.calcsize(length_format))
    (json_metadata_length,) = struct.unpack_from(length_format, length_bytes)

    # get json metadata; unpacking (rather than using the raw read) keeps the
    # struct.error on a truncated stream
    chunk_format = "{}s".format(json_metadata_length)
    metadata_chunk = ciphertext_stream.read(struct.calcsize(chunk_format))
    (json_metadata_bytes,) = struct.unpack_from(chunk_format, metadata_chunk)

    json_metadata = convert_to_str(json_metadata_bytes)

    try:
        metadata = json.loads(json_metadata)
    except ValueError as e:
        raise ValueError(
            "Could not parse metadata inside header. Error: {}".format(str(e))
        )

    required_top_level_keys = [
        METADATA_KEY_IV,
        METADATA_KEY_ALGORITHM_ID,
        METADATA_KEY_ADDITIONAL_AUTHENTICATED_DATA,
    ]

    required_encrypted_data_key_keys = [
        ENCRYPTED_DATA_KEY_MASTER_KEY_ID,
        ENCRYPTED_DATA_KEY_VAULT_ID,
        ENCRYPTED_DATA_KEY_ENCRYPTED_DATA_KEY,
        ENCRYPTED_DATA_KEY_REGION,
    ]

    # A top-level key is invalid when absent, null, or an empty list.
    missing_or_none_top_level_keys = []
    for required_key in required_top_level_keys:
        value = metadata.get(required_key) if required_key in metadata else None
        if value is None or (isinstance(value, list) and len(value) == 0):
            missing_or_none_top_level_keys.append(required_key)
    if missing_or_none_top_level_keys:
        raise ValueError(
            "Invalid header. The following metadata keys must be present and not null: {}.".format(
                ", ".join(missing_or_none_top_level_keys)
            )
        )

    encrypted_data_keys_raw = metadata.get(METADATA_KEY_ENCRYPTED_DATA_KEYS)
    if encrypted_data_keys_raw is None:
        # Without this guard the loop below fails with an opaque TypeError
        # ("'NoneType' object is not iterable") on a malformed header.
        raise ValueError(
            "Invalid header. The following metadata keys must be present and not null: {}.".format(
                METADATA_KEY_ENCRYPTED_DATA_KEYS
            )
        )

    encrypted_data_keys = []
    for encrypted_data_key_raw in encrypted_data_keys_raw:
        # Each data key entry must carry every required key with a non-null value.
        missing_or_none_dek_keys = [
            required_key
            for required_key in required_encrypted_data_key_keys
            if (required_key not in encrypted_data_key_raw)
            or (encrypted_data_key_raw.get(required_key, None) is None)
        ]
        if missing_or_none_dek_keys:
            raise ValueError(
                "Invalid header. The following metadata keys must be present and not null in each encrypted data key: {}.".format(
                    ", ".join(missing_or_none_dek_keys)
                )
            )

        encrypted_data_keys.append(
            EncryptedDataHeaderDataEncryptionKey(
                master_key_id=encrypted_data_key_raw.get(
                    ENCRYPTED_DATA_KEY_MASTER_KEY_ID
                ),
                vault_id=encrypted_data_key_raw.get(ENCRYPTED_DATA_KEY_VAULT_ID),
                encrypted_data_key_bytes=convert_base64_encoded_string_to_bytes(
                    encrypted_data_key_raw.get(ENCRYPTED_DATA_KEY_ENCRYPTED_DATA_KEY)
                ),
                region=encrypted_data_key_raw.get(ENCRYPTED_DATA_KEY_REGION),
            )
        )

    header = EncryptedDataHeader(
        encrypted_data_keys=encrypted_data_keys,
        iv_bytes=convert_base64_encoded_string_to_bytes(metadata.get(METADATA_KEY_IV)),
        algorithm_id=metadata.get(METADATA_KEY_ALGORITHM_ID),
        additional_authenticated_data_bytes=convert_to_bytes(
            metadata.get(METADATA_KEY_ADDITIONAL_AUTHENTICATED_DATA)
        ),
    )

    return header

Youez - 2016 - github.com/yon3zu
LinuXploit