"""Class that implements a condition variable. A condition variable allows one or more threads to wait until they are notified by another thread. If the lock argument is given and not None, it must be a Lock or RLock object, and it is used as the underlying lock. Otherwise, a new RLock object is created and used as the underlying lock. """
def wait(self, timeout=None):
    """Wait until notified or until a timeout occurs.

    If the calling thread has not acquired the lock when this method is
    called, a RuntimeError is raised.

    This method releases the underlying lock, and then blocks until it is
    awakened by a notify() or notify_all() call for the same condition
    variable in another thread, or until the optional timeout occurs. Once
    awakened or timed out, it re-acquires the lock and returns.

    When the timeout argument is present and not None, it should be a
    floating point number specifying a timeout for the operation in seconds
    (or fractions thereof).

    When the underlying lock is an RLock, it is not released using its
    release() method, since this may not actually unlock the lock when it
    was acquired multiple times recursively. Instead, an internal interface
    of the RLock class is used, which really unlocks it even when it has
    been recursively acquired several times. Another internal interface is
    then used to restore the recursion level when the lock is reacquired.

    """
    # wait() requires that the caller already holds _lock
    if not self._is_owned():
        raise RuntimeError("cannot wait on un-acquired lock")
    waiter = _allocate_lock()
    waiter.acquire()  # acquire the newly allocated lock
    self._waiters.append(waiter)  # enqueue it in _waiters
    saved_state = self._release_save()
    gotit = False
    try:    # restore state no matter what (e.g., KeyboardInterrupt)
        if timeout is None:
            waiter.acquire()
            gotit = True
        else:
            if timeout > 0:
                gotit = waiter.acquire(True, timeout)
            else:
                gotit = waiter.acquire(False)
        return gotit
    finally:
        self._acquire_restore(saved_state)
        if not gotit:
            try:
                self._waiters.remove(waiter)
            except ValueError:
                pass
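The three behaviours the wait() docstring describes (RuntimeError without the lock, a False return on timeout, and the full release of a recursively acquired RLock) can be checked with a small sketch against the public threading.Condition API. The helper function names below are hypothetical and exist only for illustration.

```python
import threading


def wait_without_lock_raises():
    """wait() on a condition whose lock is not held raises RuntimeError."""
    cond = threading.Condition()
    try:
        cond.wait(timeout=0.01)
    except RuntimeError:
        return True
    return False


def wait_times_out():
    """With nobody calling notify(), wait(timeout) returns False."""
    cond = threading.Condition()
    with cond:
        return cond.wait(timeout=0.01)


def notified_despite_nested_acquire():
    """wait() fully releases an RLock that is held at recursion level 2.

    If it did not, the notifier thread could never enter `with cond`.
    """
    cond = threading.Condition()  # default underlying lock is an RLock
    ready = []

    def notifier():
        with cond:
            ready.append(True)
            cond.notify()

    with cond:
        with cond:  # recursion level is now 2
            t = threading.Thread(target=notifier)
            t.start()
            # wait_for() loops over wait() until the predicate is truthy.
            woke = cond.wait_for(lambda: ready, timeout=5)
    t.join()
    return bool(woke)
```

The last helper uses wait_for() rather than a bare wait() so that the check does not depend on whether the notifier runs before or after the waiter blocks.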
def notify(self, n=1):
    """Wake up one or more threads waiting on this condition, if any.

    If the calling thread has not acquired the lock when this method is
    called, a RuntimeError is raised.

    This method wakes up at most n of the threads waiting for the condition
    variable; it is a no-op if no threads are waiting.

    """
    if not self._is_owned():
        raise RuntimeError("cannot notify on un-acquired lock")
    waiters = self._waiters
    while waiters and n > 0:
        waiter = waiters[0]  # take the first waiter in the queue and release it
        try:
            waiter.release()
        except RuntimeError:
            # gh-92530: The previous call of notify() released the lock,
            # but was interrupted before removing it from the queue.
            # It can happen if a signal handler raises an exception,
            # like CTRL+C which raises KeyboardInterrupt.
            pass
        else:
            n -= 1
        try:
            waiters.remove(waiter)  # remove that lock from the queue
        except ValueError:
            pass
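As a usage illustration of wait() and notify() together, here is a minimal producer/consumer sketch built on the public threading.Condition API. The function run_demo and its local names are hypothetical, and a single consumer is assumed so that the result order is deterministic.

```python
import threading
from collections import deque


def run_demo(n_items=5):
    """Pass n_items integers from a producer thread to a consumer thread."""
    cond = threading.Condition()
    queue = deque()
    received = []

    def consumer():
        for _ in range(n_items):
            with cond:
                # Re-check the predicate in a loop: a wakeup only means
                # "something may have changed", not "an item is available".
                while not queue:
                    cond.wait()
                received.append(queue.popleft())

    def producer():
        for i in range(n_items):
            with cond:
                queue.append(i)
                cond.notify()  # wake at most one waiting thread

    t_consumer = threading.Thread(target=consumer)
    t_producer = threading.Thread(target=producer)
    t_consumer.start()
    t_producer.start()
    t_producer.join()
    t_consumer.join()
    return received
```

Calling notify() when no thread is waiting is harmless, which is why the producer can notify unconditionally after each append.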
"""This class implements reentrant lock objects. A reentrant lock must be released by the thread that acquired it. Once a thread has acquired a reentrant lock, the same thread may acquire it again without blocking; the thread must release it once for each time it has acquired it. """
def acquire(self, blocking=True, timeout=-1):
    """Acquire a lock, blocking or non-blocking.

    When invoked without arguments: if this thread already owns the lock,
    increment the recursion level by one, and return immediately. Otherwise,
    if another thread owns the lock, block until the lock is unlocked. Once
    the lock is unlocked (not owned by any thread), then grab ownership, set
    the recursion level to one, and return. If more than one thread is
    blocked waiting until the lock is unlocked, only one at a time will be
    able to grab ownership of the lock. There is no return value in this
    case.

    When invoked with the blocking argument set to true, do the same thing
    as when called without arguments, and return true.

    When invoked with the blocking argument set to false, do not block. If a
    call without an argument would block, return false immediately;
    otherwise, do the same thing as when called without arguments, and
    return true.

    When invoked with the floating-point timeout argument set to a positive
    value, block for at most the number of seconds specified by timeout
    and as long as the lock cannot be acquired. Return true if the lock has
    been acquired, false if the timeout has elapsed.

    """
    me = get_ident()
    if self._owner == me:  # this thread already owns the lock: increment _count
        self._count += 1
        return 1
    rc = self._block.acquire(blocking, timeout)  # try to acquire the lock
    if rc:
        self._owner = me
        self._count = 1
    return rc
def release(self):
    """Release a lock, decrementing the recursion level.

    If after the decrement it is zero, reset the lock to unlocked (not owned
    by any thread), and if any other threads are blocked waiting for the
    lock to become unlocked, allow exactly one of them to proceed. If after
    the decrement the recursion level is still nonzero, the lock remains
    locked and owned by the calling thread.

    Only call this method when the calling thread owns the lock. A
    RuntimeError is raised if this method is called when the lock is
    unlocked.

    There is no return value.

    """
    if self._owner != get_ident():  # check that the current thread owns the lock
        raise RuntimeError("cannot release un-acquired lock")
    self._count = count = self._count - 1  # decrement the recursion count
    if not count:  # count reached 0: actually release the lock
        self._owner = None
        self._block.release()
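The recursion-level bookkeeping in acquire() and release() can be observed from outside with a short sketch using the public threading.RLock API. The helper names are hypothetical; the sketch probes the lock from a second thread with a non-blocking acquire after each step.

```python
import threading


def other_thread_can_acquire(lock):
    """Return True if a different thread can take the lock without blocking."""
    outcome = []

    def attempt():
        got = lock.acquire(blocking=False)
        if got:
            lock.release()  # give it back so the caller's state is unchanged
        outcome.append(got)

    t = threading.Thread(target=attempt)
    t.start()
    t.join()
    return outcome[0]


def nested_acquire_demo():
    lock = threading.RLock()
    results = []

    lock.acquire()
    lock.acquire()  # same thread: recursion level 2, does not block
    results.append(other_thread_can_acquire(lock))  # still owned

    lock.release()  # recursion level 1, lock remains owned
    results.append(other_thread_can_acquire(lock))

    lock.release()  # recursion level 0, lock is really unlocked
    results.append(other_thread_can_acquire(lock))
    return results


def release_unowned_raises():
    """Releasing an RLock the calling thread does not own raises RuntimeError."""
    try:
        threading.RLock().release()
    except RuntimeError:
        return True
    return False
```

The lock becomes available to other threads only after the second release(), matching the "release it once for each time it has acquired it" rule in the class docstring.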