0%

互斥锁条件变量

一种情景

虽然互斥锁可以解决资源竞争的问题,但如果有以下一种场景:生产者并非持续生产,而是在某一段时间内不生产,此阶段消费者还是会不停的竞争锁,然后发现此时并不需要消费,再释放锁,再竞争获取锁,再释放,会产生服务器资源的消耗。

使用条件变量

此时可以引入条件变量,当条件变量满足的时候,通知其他线程进行竞争锁,否则让其他线程进入睡眠状态,这样在条件变量不符的情况下,就不会出现持续竞争的情形了,降低了服务器资源的消耗。

python代码实现condition功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format="(%(threadName)s) %(message)s',")


def producer(con: threading.Condition):
logging.debug("Producer thread started")
# with上下文,自动包含了acquire() and release()方法
with con:
logging.debug("Making resource available, sleep 5s")
# 等待5s。模拟生产
time.sleep(5)
logging.debug("Notify all the consumers")
# 唤醒consumer
con.notify_all()


def consumer(con: threading.Condition):
logging.debug("Consumer thread started")
with con:
logging.debug("Consumer waiting")
# wait()方法释放锁,并且进入阻塞等待状态,直到下一次被notify() 或者 notify_all().
con.wait()
logging.debug('Consumer consumed the resource')


if __name__ == '__main__':
# 实例化一个condition
condition = threading.Condition()

# producer线程创建
pd = threading.Thread(name='producer', target=producer, args=(condition,))

# consumer线程创建
cs_list = []
cs_num = 5
for i in range(cs_num):
cs_list.append(threading.Thread(name='consumer_{}'.format(i), target=consumer, args=(condition,)))

# consumer线程启动
for c in cs_list:
c.start()
time.sleep(1)

# producer线程启动
pd.start()

condition结合实际情况的用途

  1. 引入一个公共变量。
  2. 该公共变量作为condition的判断依据
  3. producer判断是否符合condition,如果符合,则唤醒其他进程,此时其他进程进行竞争,抢到锁的进行处理,处理完后释放锁,没抢到锁的,继续进入睡眠状态。
  4. producer加锁操作变量,操作完后通知其他进程,然后解锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import threading
import logging
import time
import random

logging.basicConfig(level=logging.DEBUG, format="(%(threadName)s) %(message)s',")

# resource为公共资源
resource = []
sleep_time = 2


# 定义producer 添加resource
def add_resource(num):
logging.debug("producer add item {} in resource".format(num))
time.sleep(sleep_time)
resource.append(num)


# 定义consumer 消费resource
def consume_resource(re):
logging.debug("consume from resource")
logging.debug("consume {}".format(re[0]))
resource.remove(re[0])


# 定义consumer
def consumer(re, cn: threading.Condition):
# 获取锁
cn.acquire()
while True:
# 如果消费失败,则调用wait方法,线程睡眠,且释放锁,等待被notify
try:
consume_resource(re)
except:
logging.debug("resource is empty")
logging.debug("wait until be notified")
cn.wait()


def producer(cn: threading.Condition):
r = random.randint(2, 10)
logging.debug("random num is {}".format(r))

for i in range(0, r):
logging.debug("produce item, add {} in resource in 2s".format(i))
time.sleep(sleep_time)

logging.debug("try to get lock")
# 获取锁
cn.acquire()

try:
# 尝试添加资源。并且通知所有其他线程
add_resource(i)
cn.notify_all()
# 无论如何最终都释放锁
finally:
cn.release()


if __name__ == '__main__':
con = threading.Condition()
pd = threading.Thread(name="producer", target=producer, args=(con,))

cs_list = []
cs_num = 3
for i in range(1, cs_num):
cs_list.append(threading.Thread(name="cs_{}".format(i), target=consumer, args=(resource, con,)))

for c in cs_list:
c.start()
time.sleep(sleep_time)

pd.start()

坚持原创技术分享,您的支持将鼓励我继续创作!