python conditon and RLock

本文分析python中条件变量Conditon和可重入锁RLock的底层实现。完整的源代码部分在**/usr/lib/python3.10/threading.py**(Ubuntu 22.04,其他版本的Linux系统应该也类似)。

Condition

条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:

  • wait:一个线程等待条件变量的条件成立而挂起自身
  • notify:一个线程使得条件变量的条件成立而唤醒等待线程

我们可以理解为:一个线程通过wait等待某个条件成立,在这之前它自动挂起,直到另一个线程通过调用notify通知其条件已成立,解除挂起。python中实现条件变量的是一个Condition类,其源码注解中有对该类的说明:

1
2
3
4
5
6
7
8
9
10
"""Class that implements a condition variable.

A condition variable allows one or more threads to wait until they are
notified by another thread.

If the lock argument is given and not None, it must be a Lock or RLock
object, and it is used as the underlying lock. Otherwise, a new RLock object
is created and used as the underlying lock.

"""

init

即初始化一个条件变量,主要功能步骤为:

  • 接收一个可选参数lock,即用户给定的锁,也可以选择让condition自己准备一个锁,默认情况下condition会选择一个RLock。该锁一般称为underlying lock
  • export该锁成员的一些方法
  • 初始化一个deque类型的成员_waiters,当线程调用该条件变量的wait方法时会使用到。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def __init__(self, lock=None):
# 如果没有传入lock,则默认使用一个RLock
if lock is None:
lock = RLock()
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
# 初始化一个deque
self._waiters = _deque()

wait

线程通过调用该方法,挂起自身直到被唤醒(其他线程调用notify)或超时(通过自己传入的timeout参数决定)。wait执行的步骤:

  • 获取一个锁(名为waiter),并acquire
  • 将其加入到condition的deque中
  • 尝试重新获取该锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.

If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.

This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.

When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof).

When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.

"""
# wait时必须尝试获取到_lock
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire() # 获取新分配的锁
self._waiters.append(waiter) # 入队_waiters
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:葡萄
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state)
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass

notify

线程通过notify,使得条件变量的条件成立,从而换醒因等待条件变量而挂起的线程。notify方法接受一个可选的参数n,指明唤醒的线程个数,其默认值为1。其执行步骤为:

  • 当指定的唤醒线程数目n不为0,取_waiters的队首元素(一个锁)waiter,并调用其release方法,唤醒等待该锁的线程,即将waiter入队的线程。并从队列中移除该锁。循环直至n为0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def notify(self, n=1):
"""Wake up one or more threads waiting on this condition, if any.

If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.

This method wakes up at most n of the threads waiting for the condition
variable; it is a no-op if no threads are waiting.

"""
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
waiters = self._waiters
while waiters and n > 0:
waiter = waiters[0] # 获取队列首元素并release
try:
waiter.release()
except RuntimeError:
# gh-92530: The previous call of notify() released the lock,
# but was interrupted before removing it from the queue.
# It can happen if a signal handler raises an exception,
# like CTRL+C which raises KeyboardInterrupt.
pass
else:
n -= 1
try:
waiters.remove(waiter) # 移除该锁
except ValueError:
pass

总结

python中的条件变量,通过一个互斥锁_lock以及一个等待队列_waiters,管理线程的wait和notify。每个线程调用wait和notify方法时,需要获取underlying lock,然后才能对_waiters进行操作。对于wait方法,其分配并获取一个锁,放入waiters队列中,并尝试再次获取该锁,从而阻塞;对于notify,其从waiters中出队指定的锁数,并release这些锁,唤醒将这些锁入队后再阻塞在锁上的线程。

RLock

即可重入锁,与一般锁不同的是:一个线程如果尝试获取其已经拥有的锁,则会阻塞造成死锁。而可重入锁可以避免此种情况,需要注意的是,可重入锁需要手动释放锁,且加锁次数和释放次数要相同。python中的可重入锁由**_RLock**实现,其源码中的注解如下:

1
2
3
4
5
6
7
8
"""This class implements reentrant lock objects.

A reentrant lock must be released by the thread that acquired it. Once a
thread has acquired a reentrant lock, the same thread may acquire it
again without blocking; the thread must release it once for each time it
has acquired it.

"""

init

初始化一个RLock,使用了三个成员:_block(锁本身)、_owner(锁的拥有者,初始为None)以及_count(获取锁的次数,recursion level)。

1
2
3
4
def __init__(self):
self._block = _allocate_lock()
self._owner = None
self._count = 0

acquire

尝试获取RLock,接受两个参数:blocking(默认为True)和timeout(默认为-1)。具体执行情况有:

  • 当线程已经获取了该锁,则递增recursion level然后返回。
  • 当其他线程拥有锁,则阻塞至获取锁或超时(timeout为正浮点数时)

源程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def acquire(self, blocking=True, timeout=-1):
"""Acquire a lock, blocking or non-blocking.

When invoked without arguments: if this thread already owns the lock,
increment the recursion level by one, and return immediately. Otherwise,
if another thread owns the lock, block until the lock is unlocked. Once
the lock is unlocked (not owned by any thread), then grab ownership, set
the recursion level to one, and return. If more than one thread is
blocked waiting until the lock is unlocked, only one at a time will be
able to grab ownership of the lock. There is no return value in this
case.

When invoked with the blocking argument set to true, do the same thing
as when called without arguments, and return true.

When invoked with the blocking argument set to false, do not block. If a
call without an argument would block, return false immediately;
otherwise, do the same thing as when called without arguments, and
return true.

When invoked with the floating-point timeout argument set to a positive
value, block for at most the number of seconds specified by timeout
and as long as the lock cannot be acquired. Return true if the lock has
been acquired, false if the timeout has elapsed.

"""
me = get_ident()
if self._owner == me: # 如果已经拥有该锁,递增_count
self._count += 1
return 1
rc = self._block.acquire(blocking, timeout) # 尝试获取锁
if rc:
self._owner = me
self._count = 1

release

拥有锁的线程调用release递减获取的次数,次数递减至0则释放锁。只有拥有锁的线程可以调用,否则会触发运行时异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def release(self):
"""Release a lock, decrementing the recursion level.

If after the decrement it is zero, reset the lock to unlocked (not owned
by any thread), and if any other threads are blocked waiting for the
lock to become unlocked, allow exactly one of them to proceed. If after
the decrement the recursion level is still nonzero, the lock remains
locked and owned by the calling thread.

Only call this method when the calling thread owns the lock. A
RuntimeError is raised if this method is called when the lock is
unlocked.

There is no return value.

"""
if self._owner != get_ident(): # 检查是否当前线程是锁的拥有者
raise RuntimeError("cannot release un-acquired lock")
self._count = count = self._count - 1 # 递减count
if not count: # count为0则释放锁
self._owner = None
self._block.release()