Server IP : 103.119.228.120 / Your IP : 3.15.211.71 Web Server : Apache System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64 User : nobody ( 99) PHP Version : 5.6.40 Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib/mysqlsh/lib/python3.9/site-packages/oci/_vendor/jwt/ |
Upload File : |
# coding: utf-8 # Modified Work: Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. # This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license. # Original Work: Copyright (c) 2015 José Padilla import binascii import json import warnings try: # import required by mypy to perform type checking, not used for normal execution from typing import Callable, Dict, List, Optional, Union # NOQA except ImportError: pass from .algorithms import ( Algorithm, get_default_algorithms, has_crypto, requires_cryptography # NOQA ) from .compat import Mapping, binary_type, string_types, text_type from .exceptions import ( DecodeError, InvalidAlgorithmError, InvalidSignatureError, InvalidTokenError ) from .utils import base64url_decode, base64url_encode, force_bytes, merge_dict class PyJWS(object): header_typ = 'JWT' def __init__(self, algorithms=None, options=None): self._algorithms = get_default_algorithms() self._valid_algs = (set(algorithms) if algorithms is not None else set(self._algorithms)) # Remove algorithms that aren't on the whitelist for key in list(self._algorithms.keys()): if key not in self._valid_algs: del self._algorithms[key] if not options: options = {} self.options = merge_dict(self._get_default_options(), options) @staticmethod def _get_default_options(): return { 'verify_signature': True } def register_algorithm(self, alg_id, alg_obj): """ Registers a new Algorithm for use when creating and verifying tokens. """ if alg_id in self._algorithms: raise ValueError('Algorithm already has a handler.') if not isinstance(alg_obj, Algorithm): raise TypeError('Object is not of type `Algorithm`') self._algorithms[alg_id] = alg_obj self._valid_algs.add(alg_id) def unregister_algorithm(self, alg_id): """ Unregisters an Algorithm for use when creating and verifying tokens Throws KeyError if algorithm is not registered. """ if alg_id not in self._algorithms: raise KeyError('The specified algorithm could not be removed' ' because it is not registered.') del self._algorithms[alg_id] self._valid_algs.remove(alg_id) def get_algorithms(self): """ Returns a list of supported values for the 'alg' parameter. """ return list(self._valid_algs) def encode(self, payload, # type: Union[Dict, bytes] key, # type: str algorithm='HS256', # type: str headers=None, # type: Optional[Dict] json_encoder=None # type: Optional[Callable] ): segments = [] if algorithm is None: algorithm = 'none' if algorithm not in self._valid_algs: pass # Header header = {'typ': self.header_typ, 'alg': algorithm} if headers: self._validate_headers(headers) header.update(headers) json_header = force_bytes( json.dumps( header, separators=(',', ':'), cls=json_encoder ) ) segments.append(base64url_encode(json_header)) segments.append(base64url_encode(payload)) # Segments signing_input = b'.'.join(segments) try: alg_obj = self._algorithms[algorithm] key = alg_obj.prepare_key(key) signature = alg_obj.sign(signing_input, key) except KeyError: if not has_crypto and algorithm in requires_cryptography: raise NotImplementedError( "Algorithm '%s' could not be found. Do you have cryptography " "installed?" % algorithm ) else: raise NotImplementedError('Algorithm not supported') segments.append(base64url_encode(signature)) return b'.'.join(segments) def decode(self, jwt, # type: str key='', # type: str verify=True, # type: bool algorithms=None, # type: List[str] options=None, # type: Dict **kwargs): merged_options = merge_dict(self.options, options) verify_signature = merged_options['verify_signature'] if verify_signature and not algorithms: warnings.warn( 'It is strongly recommended that you pass in a ' + 'value for the "algorithms" argument when calling decode(). ' + 'This argument will be mandatory in a future version.', DeprecationWarning ) payload, signing_input, header, signature = self._load(jwt) if not verify: warnings.warn('The verify parameter is deprecated. ' 'Please use verify_signature in options instead.', DeprecationWarning, stacklevel=2) elif verify_signature: self._verify_signature(payload, signing_input, header, signature, key, algorithms) return payload def get_unverified_header(self, jwt): """Returns back the JWT header parameters as a dict() Note: The signature is not verified so the header parameters should not be fully trusted until signature verification is complete """ headers = self._load(jwt)[2] self._validate_headers(headers) return headers def _load(self, jwt): if isinstance(jwt, text_type): jwt = jwt.encode('utf-8') if not issubclass(type(jwt), binary_type): raise DecodeError("Invalid token type. Token must be a {0}".format( binary_type)) try: signing_input, crypto_segment = jwt.rsplit(b'.', 1) header_segment, payload_segment = signing_input.split(b'.', 1) except ValueError: raise DecodeError('Not enough segments') try: header_data = base64url_decode(header_segment) except (TypeError, binascii.Error): raise DecodeError('Invalid header padding') try: header = json.loads(header_data.decode('utf-8')) except ValueError as e: raise DecodeError('Invalid header string: %s' % e) if not isinstance(header, Mapping): raise DecodeError('Invalid header string: must be a json object') try: payload = base64url_decode(payload_segment) except (TypeError, binascii.Error): raise DecodeError('Invalid payload padding') try: signature = base64url_decode(crypto_segment) except (TypeError, binascii.Error): raise DecodeError('Invalid crypto padding') return (payload, signing_input, header, signature) def _verify_signature(self, payload, signing_input, header, signature, key='', algorithms=None): alg = header.get('alg') if algorithms is not None and alg not in algorithms: raise InvalidAlgorithmError('The specified alg value is not allowed') try: alg_obj = self._algorithms[alg] key = alg_obj.prepare_key(key) if not alg_obj.verify(signing_input, key, signature): raise InvalidSignatureError('Signature verification failed') except KeyError: raise InvalidAlgorithmError('Algorithm not supported') def _validate_headers(self, headers): if 'kid' in headers: self._validate_kid(headers['kid']) def _validate_kid(self, kid): if not isinstance(kid, string_types): raise InvalidTokenError('Key ID header parameter must be a string') _jws_global_obj = PyJWS() encode = _jws_global_obj.encode decode = _jws_global_obj.decode register_algorithm = _jws_global_obj.register_algorithm unregister_algorithm = _jws_global_obj.unregister_algorithm get_unverified_header = _jws_global_obj.get_unverified_header