一种情景
虽然互斥锁可以解决资源竞争的问题,但如果有以下一种场景:生产者并非持续生产,而是在某一段时间内不生产,此阶段消费者还是会不停的竞争锁,然后发现此时并不需要消费,再释放锁,再竞争获取锁,再释放,会产生服务器资源的消耗。
使用条件变量
此时可以引入条件变量,当条件变量满足的时候,通知其他线程进行竞争锁,否则让其他线程进入睡眠状态,这样在条件变量不符的情况下,就不会出现持续竞争的情形了,降低了服务器资源的消耗。
python代码实现condition功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| import threading import time import logging
logging.basicConfig(level=logging.DEBUG, format="(%(threadName)s) %(message)s',")
def producer(con: threading.Condition): logging.debug("Producer thread started") with con: logging.debug("Making resource available, sleep 5s") time.sleep(5) logging.debug("Notify all the consumers") con.notify_all()
def consumer(con: threading.Condition): logging.debug("Consumer thread started") with con: logging.debug("Consumer waiting") con.wait() logging.debug('Consumer consumed the resource')
if __name__ == '__main__': condition = threading.Condition()
pd = threading.Thread(name='producer', target=producer, args=(condition,))
cs_list = [] cs_num = 5 for i in range(cs_num): cs_list.append(threading.Thread(name='consumer_{}'.format(i), target=consumer, args=(condition,)))
for c in cs_list: c.start() time.sleep(1)
pd.start()
|
condition结合实际情况的用途
- 引入一个公共变量。
- 该公共变量作为condition的判断依据
- producer判断是否符合condition,如果符合,则唤醒其他进程,此时其他进程进行竞争,抢到锁的进行处理,处理完后释放锁,没抢到锁的,继续进入睡眠状态。
- producer加锁操作变量,操作完后通知其他进程,然后解锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| import threading import logging import time import random
logging.basicConfig(level=logging.DEBUG, format="(%(threadName)s) %(message)s',")
resource = [] sleep_time = 2
def add_resource(num): logging.debug("producer add item {} in resource".format(num)) time.sleep(sleep_time) resource.append(num)
def consume_resource(re): logging.debug("consume from resource") logging.debug("consume {}".format(re[0])) resource.remove(re[0])
def consumer(re, cn: threading.Condition): cn.acquire() while True: try: consume_resource(re) except: logging.debug("resource is empty") logging.debug("wait until be notified") cn.wait()
def producer(cn: threading.Condition): r = random.randint(2, 10) logging.debug("random num is {}".format(r))
for i in range(0, r): logging.debug("produce item, add {} in resource in 2s".format(i)) time.sleep(sleep_time)
logging.debug("try to get lock") cn.acquire()
try: add_resource(i) cn.notify_all() finally: cn.release()
if __name__ == '__main__': con = threading.Condition() pd = threading.Thread(name="producer", target=producer, args=(con,))
cs_list = [] cs_num = 3 for i in range(1, cs_num): cs_list.append(threading.Thread(name="cs_{}".format(i), target=consumer, args=(resource, con,)))
for c in cs_list: c.start() time.sleep(sleep_time)
pd.start()
|