Server IP : 103.119.228.120 / Your IP : 18.119.163.95 Web Server : Apache System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64 User : nobody ( 99) PHP Version : 5.6.40 Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /lib/mysqlsh/lib/python3.9/concurrent/futures/ |
Upload File : |
# Copyright 2009 Brian Quinlan. All Rights Reserved. # Licensed to PSF under a Contributor Agreement. """Implements ThreadPoolExecutor.""" __author__ = 'Brian Quinlan (brian@sweetapp.com)' from concurrent.futures import _base import itertools import queue import threading import types import weakref import os _threads_queues = weakref.WeakKeyDictionary() _shutdown = False # Lock that ensures that new workers are not created while the interpreter is # shutting down. Must be held while mutating _threads_queues and _shutdown. _global_shutdown_lock = threading.Lock() def _python_exit(): global _shutdown with _global_shutdown_lock: _shutdown = True items = list(_threads_queues.items()) for t, q in items: q.put(None) for t, q in items: t.join() # Register for `_python_exit()` to be called just before joining all # non-daemon threads. This is used instead of `atexit.register()` for # compatibility with subinterpreters, which no longer support daemon threads. # See bpo-39812 for context. threading._register_atexit(_python_exit) # At fork, reinitialize the `_global_shutdown_lock` lock in the child process if hasattr(os, 'register_at_fork'): os.register_at_fork(before=_global_shutdown_lock.acquire, after_in_child=_global_shutdown_lock._at_fork_reinit, after_in_parent=_global_shutdown_lock.release) class _WorkItem(object): def __init__(self, future, fn, args, kwargs): self.future = future self.fn = fn self.args = args self.kwargs = kwargs def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(*self.args, **self.kwargs) except BaseException as exc: self.future.set_exception(exc) # Break a reference cycle with the exception 'exc' self = None else: self.future.set_result(result) __class_getitem__ = classmethod(types.GenericAlias) def _worker(executor_reference, work_queue, initializer, initargs): if initializer is not None: try: initializer(*initargs) except BaseException: _base.LOGGER.critical('Exception in initializer:', exc_info=True) executor = executor_reference() if executor is not None: executor._initializer_failed() return try: while True: work_item = work_queue.get(block=True) if work_item is not None: work_item.run() # Delete references to object. See issue16284 del work_item # attempt to increment idle count executor = executor_reference() if executor is not None: executor._idle_semaphore.release() del executor continue executor = executor_reference() # Exit if: # - The interpreter is shutting down OR # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: # Flag the executor as shutting down as early as possible if it # is not gc-ed yet. if executor is not None: executor._shutdown = True # Notice other workers work_queue.put(None) return del executor except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True) class BrokenThreadPool(_base.BrokenExecutor): """ Raised when a worker thread in a ThreadPoolExecutor failed initializing. """ class ThreadPoolExecutor(_base.Executor): # Used to assign unique thread names when thread_name_prefix is not supplied. _counter = itertools.count().__next__ def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. initializer: A callable used to initialize worker threads. initargs: A tuple of arguments to pass to the initializer. """ if max_workers is None: # ThreadPoolExecutor is often used to: # * CPU bound task which releases GIL # * I/O bound task (which releases GIL, of course) # # We use cpu_count + 4 for both types of tasks. # But we limit it to 32 to avoid consuming surprisingly large resource # on many core machine. max_workers = min(32, (os.cpu_count() or 1) + 4) if max_workers <= 0: raise ValueError("max_workers must be greater than 0") if initializer is not None and not callable(initializer): raise TypeError("initializer must be a callable") self._max_workers = max_workers self._work_queue = queue.SimpleQueue() self._idle_semaphore = threading.Semaphore(0) self._threads = set() self._broken = False self._shutdown = False self._shutdown_lock = threading.Lock() self._thread_name_prefix = (thread_name_prefix or ("ThreadPoolExecutor-%d" % self._counter())) self._initializer = initializer self._initargs = initargs def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock, _global_shutdown_lock: if self._broken: raise BrokenThreadPool(self._broken) if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') if _shutdown: raise RuntimeError('cannot schedule new futures after ' 'interpreter shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): # if idle threads are available, don't spin new threads if self._idle_semaphore.acquire(timeout=0): return # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) num_threads = len(self._threads) if num_threads < self._max_workers: thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads) t = threading.Thread(name=thread_name, target=_worker, args=(weakref.ref(self, weakref_cb), self._work_queue, self._initializer, self._initargs)) t.start() self._threads.add(t) _threads_queues[t] = self._work_queue def _initializer_failed(self): with self._shutdown_lock: self._broken = ('A thread initializer failed, the thread pool ' 'is not usable anymore') # Drain work queue and mark pending futures failed while True: try: work_item = self._work_queue.get_nowait() except queue.Empty: break if work_item is not None: work_item.future.set_exception(BrokenThreadPool(self._broken)) def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: self._shutdown = True if cancel_futures: # Drain all work items from the queue, and then cancel their # associated futures. while True: try: work_item = self._work_queue.get_nowait() except queue.Empty: break if work_item is not None: work_item.future.cancel() # Send a wake-up to prevent threads calling # _work_queue.get(block=True) from permanently blocking. self._work_queue.put(None) if wait: for t in self._threads: t.join() shutdown.__doc__ = _base.Executor.shutdown.__doc__