Server IP : 103.119.228.120 / Your IP : 13.59.87.145 Web Server : Apache System : Linux v8.techscape8.com 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64 User : nobody ( 99) PHP Version : 5.6.40 Disable Function : shell_exec,symlink,system,exec,proc_get_status,proc_nice,proc_terminate,define_syslog_variables,syslog,openlog,closelog,escapeshellcmd,passthru,ocinum cols,ini_alter,leak,listen,chgrp,apache_note,apache_setenv,debugger_on,debugger_off,ftp_exec,dl,dll,myshellexec,proc_open,socket_bind,proc_close,escapeshellarg,parse_ini_filepopen,fpassthru,exec,passthru,escapeshellarg,escapeshellcmd,proc_close,proc_open,ini_alter,popen,show_source,proc_nice,proc_terminate,proc_get_status,proc_close,pfsockopen,leak,apache_child_terminate,posix_kill,posix_mkfifo,posix_setpgid,posix_setsid,posix_setuid,dl,symlink,shell_exec,system,dl,passthru,escapeshellarg,escapeshellcmd,myshellexec,c99_buff_prepare,c99_sess_put,fpassthru,getdisfunc,fx29exec,fx29exec2,is_windows,disp_freespace,fx29sh_getupdate,fx29_buff_prepare,fx29_sess_put,fx29shexit,fx29fsearch,fx29ftpbrutecheck,fx29sh_tools,fx29sh_about,milw0rm,imagez,sh_name,myshellexec,checkproxyhost,dosyayicek,c99_buff_prepare,c99_sess_put,c99getsource,c99sh_getupdate,c99fsearch,c99shexit,view_perms,posix_getpwuid,posix_getgrgid,posix_kill,parse_perms,parsesort,view_perms_color,set_encoder_input,ls_setcheckboxall,ls_reverse_all,rsg_read,rsg_glob,selfURL,dispsecinfo,unix2DosTime,addFile,system,get_users,view_size,DirFiles,DirFilesWide,DirPrintHTMLHeaders,GetFilesTotal,GetTitles,GetTimeTotal,GetMatchesCount,GetFileMatchesCount,GetResultFiles,fs_copy_dir,fs_copy_obj,fs_move_dir,fs_move_obj,fs_rmdir,SearchText,getmicrotime MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib64/python2.7/multiprocessing/ |
Upload File : |
# # Module implementing synchronization primitives # # multiprocessing/synchronize.py # # Copyright (c) 2006-2008, R Oudkerk # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of author nor the names of any contributors may be # used to endorse or promote products derived from this software # without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # __all__ = [ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' ] import threading import os import sys from time import time as _time, sleep as _sleep import _multiprocessing from multiprocessing.process import current_process from multiprocessing.util import Finalize, register_after_fork, debug from multiprocessing.forking import assert_spawning, Popen # Try to import the mp.synchronize module cleanly, if it fails # raise ImportError for platforms lacking a working sem_open implementation. # See issue 3770 try: from _multiprocessing import SemLock except (ImportError): raise ImportError("This platform lacks a functioning sem_open" + " implementation, therefore, the required" + " synchronization primitives needed will not" + " function, see issue 3770.") # # Constants # RECURSIVE_MUTEX, SEMAPHORE = range(2) SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX # # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock` # class SemLock(object): def __init__(self, kind, value, maxvalue): sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue) debug('created semlock with handle %s' % sl.handle) self._make_methods() if sys.platform != 'win32': def _after_fork(obj): obj._semlock._after_fork() register_after_fork(self, _after_fork) def _make_methods(self): self.acquire = self._semlock.acquire self.release = self._semlock.release def __enter__(self): return self._semlock.__enter__() def __exit__(self, *args): return self._semlock.__exit__(*args) def __getstate__(self): assert_spawning(self) sl = self._semlock return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue) def __setstate__(self, state): self._semlock = _multiprocessing.SemLock._rebuild(*state) debug('recreated blocker with handle %r' % state[0]) self._make_methods() # # Semaphore # class Semaphore(SemLock): def __init__(self, value=1): SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX) def get_value(self): return self._semlock._get_value() def __repr__(self): try: value = self._semlock._get_value() except Exception: value = 'unknown' return '<Semaphore(value=%s)>' % value # # Bounded semaphore # class BoundedSemaphore(Semaphore): def __init__(self, value=1): SemLock.__init__(self, SEMAPHORE, value, value) def __repr__(self): try: value = self._semlock._get_value() except Exception: value = 'unknown' return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \ (value, self._semlock.maxvalue) # # Non-recursive lock # class Lock(SemLock): def __init__(self): SemLock.__init__(self, SEMAPHORE, 1, 1) def __repr__(self): try: if self._semlock._is_mine(): name = current_process().name if threading.current_thread().name != 'MainThread': name += '|' + threading.current_thread().name elif self._semlock._get_value() == 1: name = 'None' elif self._semlock._count() > 0: name = 'SomeOtherThread' else: name = 'SomeOtherProcess' except Exception: name = 'unknown' return '<Lock(owner=%s)>' % name # # Recursive lock # class RLock(SemLock): def __init__(self): SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1) def __repr__(self): try: if self._semlock._is_mine(): name = current_process().name if threading.current_thread().name != 'MainThread': name += '|' + threading.current_thread().name count = self._semlock._count() elif self._semlock._get_value() == 1: name, count = 'None', 0 elif self._semlock._count() > 0: name, count = 'SomeOtherThread', 'nonzero' else: name, count = 'SomeOtherProcess', 'nonzero' except Exception: name, count = 'unknown', 'unknown' return '<RLock(%s, %s)>' % (name, count) # # Condition variable # class Condition(object): def __init__(self, lock=None): self._lock = lock or RLock() self._sleeping_count = Semaphore(0) self._woken_count = Semaphore(0) self._wait_semaphore = Semaphore(0) self._make_methods() def __getstate__(self): assert_spawning(self) return (self._lock, self._sleeping_count, self._woken_count, self._wait_semaphore) def __setstate__(self, state): (self._lock, self._sleeping_count, self._woken_count, self._wait_semaphore) = state self._make_methods() def __enter__(self): return self._lock.__enter__() def __exit__(self, *args): return self._lock.__exit__(*args) def _make_methods(self): self.acquire = self._lock.acquire self.release = self._lock.release def __repr__(self): try: num_waiters = (self._sleeping_count._semlock._get_value() - self._woken_count._semlock._get_value()) except Exception: num_waiters = 'unkown' return '<Condition(%s, %s)>' % (self._lock, num_waiters) def wait(self, timeout=None): assert self._lock._semlock._is_mine(), \ 'must acquire() condition before using wait()' # indicate that this thread is going to sleep self._sleeping_count.release() # release lock count = self._lock._semlock._count() for i in xrange(count): self._lock.release() try: # wait for notification or timeout self._wait_semaphore.acquire(True, timeout) finally: # indicate that this thread has woken self._woken_count.release() # reacquire lock for i in xrange(count): self._lock.acquire() def notify(self): assert self._lock._semlock._is_mine(), 'lock is not owned' assert not self._wait_semaphore.acquire(False) # to take account of timeouts since last notify() we subtract # woken_count from sleeping_count and rezero woken_count while self._woken_count.acquire(False): res = self._sleeping_count.acquire(False) assert res if self._sleeping_count.acquire(False): # try grabbing a sleeper self._wait_semaphore.release() # wake up one sleeper self._woken_count.acquire() # wait for the sleeper to wake # rezero _wait_semaphore in case a timeout just happened self._wait_semaphore.acquire(False) def notify_all(self): assert self._lock._semlock._is_mine(), 'lock is not owned' assert not self._wait_semaphore.acquire(False) # to take account of timeouts since last notify*() we subtract # woken_count from sleeping_count and rezero woken_count while self._woken_count.acquire(False): res = self._sleeping_count.acquire(False) assert res sleepers = 0 while self._sleeping_count.acquire(False): self._wait_semaphore.release() # wake up one sleeper sleepers += 1 if sleepers: for i in xrange(sleepers): self._woken_count.acquire() # wait for a sleeper to wake # rezero wait_semaphore in case some timeouts just happened while self._wait_semaphore.acquire(False): pass # # Event # class Event(object): def __init__(self): self._cond = Condition(Lock()) self._flag = Semaphore(0) def is_set(self): self._cond.acquire() try: if self._flag.acquire(False): self._flag.release() return True return False finally: self._cond.release() def set(self): self._cond.acquire() try: self._flag.acquire(False) self._flag.release() self._cond.notify_all() finally: self._cond.release() def clear(self): self._cond.acquire() try: self._flag.acquire(False) finally: self._cond.release() def wait(self, timeout=None): self._cond.acquire() try: if self._flag.acquire(False): self._flag.release() else: self._cond.wait(timeout) if self._flag.acquire(False): self._flag.release() return True return False finally: self._cond.release()