2.9. 并发编程

2.9.1. 启动与停止线程


import time

def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()


class CountDown(Thread):
    def __init__(self, n: int) -> None:
        super().__init__()
        self.n = n

    def run(self) -> None:
        while self.n > 0:
            print('T-minus', self.n)
            self.n -= 1
            time.sleep(5)

c = CountDown(5)
c.start()

2.9.2. 判断线程是否已经启动


import time
from threading import Thread, Event

def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()


class CountDown(Thread):
    def __init__(self, n: int,event) -> None:
        super().__init__()
        self.n = n

    def run(self) -> None:
        print('Countdown is running 1')
        event.set()
        while self.n > 0:
            print('T-minus', self.n)
            self.n -= 1
            time.sleep(5)

event = Event()
c = CountDown(5,event)
c.start()

event.wait()
print('Countdown is running 2 ')

2.9.3. 线程间通信

import random
import time 
def producer(q):
    while True:
        q.put(random.randint(0, 1000))
        time.sleep(0.5)

def consumer(q):
    while True:
        print(q.get())
        time.sleep(0.5)

if __name__ == '__main__':
    from queue import Queue
    q = Queue()
    from threading import Thread
    t1 = Thread(target=consumer, args=(q,))
    t2 = Thread(target=producer, args=(q,))
    t1.start()
    t2.start()
    

2.9.4. 线程访问加锁

from threading import Lock
class ShareCount(object):
    def __init__(self,start_cnt) -> None:
        self.start_cnt = start_cnt
        self.lock = Lock()

    def inc(self) -> None:
        with self.lock:
            self.start_cnt += 1

    def dec(self) -> None:
        self.lock.acquire()
        self.start_cnt -= 1
        self.lock.release()

    def get(self) -> int:
       
        return self.start_cnt
        

2.9.5. 防止死锁的加锁机制

可以对资源进行排序,加锁顺序按照顺序来申请。 避免交叉等待。

2.9.6. 保存线程的状态信息

每个 threading.local() 实例为每个线程维护着一个单独的实例字典。 所有普通实例操作比如获取、修改和删除值仅仅操作这个字典。 每个线程使用一个独立的字典就可以保证数据的隔离了。

2.9.7. 创建一个线程池

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url):
    u = requests.get(url)
    return u.text

pool = ThreadPoolExecutor(10)
# Submit work to the pool
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')

# Get the results back
x = a.result()
y = b.result()

print(x,y)

2.9.8. 定义一个Actor任务

from queue import Queue
from threading import Event, Thread

class ActorExit(Exception):
    pass
class Actor(object):
    def __init__(self) -> None:
        self.queue = Queue()
        self._terminated = Event ()
        self.task = None

    def close(self):
        self.queue.put(ActorExit)

    def send(self, msg):
        self.queue.put(msg)

    def recv(self):
        msg = self.queue.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def start(self):
        # self.event.set()
        self.task = Thread(target=self._bootstrap)
        self.task.daemon = True
        self.task.start()
        pass

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()
                

    def run(self):
        raise NotImplementedError

    def join(self):
        self._terminated.wait()

class PrintActor(Actor):
    def run(self):
        while True:
            msg = self.recv()
            print('Got:', msg)


p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()

2.9.9. 实现消息发布订阅模式

from collections import defaultdict
from contextlib import contextmanager
class Exchange(object):
    def __init__(self) -> None:
        self.subscribers = set()

    def attach(self,task):
        self.subscribers.add(task)

    def detach(self,task):
        self.subscribers.remove(task)

    def send(self,msg):
        for subscriber in self.subscribers:
            subscriber.send(msg)

    def attach_tasks(self,*tasks):
        for task in tasks:
            self.attach(task)
    def detach_tasks(self,*tasks):
        for task in tasks:
            self.detach(task)
    def subscribe(self,task):
        self.attach(task)
    
    @contextmanager
    def subscribe_tasks(self,*tasks):
        self.attach_tasks(*tasks)
        try:
            yield
        finally:
            self.detach_tasks(*tasks)

    def clear(self):
        self.subscribers.clear()
    def get_subs(self): 
        return self.subscribers

_exchanges = defaultdict(Exchange)
def get_exchange(name):
    return _exchanges[name]

class Task:
    def send(self, msg):
        print(msg)

d = get_exchange('d')
t1 = Task()
t2 = Task()
d.attach(t1)
d.attach(t2)
d.send('hello')
d.send('world')
d.detach(t1)
print(d.get_subs())
d.clear()
print(d.get_subs())


with d.subscribe_tasks(t1,t2):
    d.send('hello')
    d.send('world')

print(d.get_subs())



2.9.10. 使用生成器代替线程

from collections import deque
def count_down(n):
    while n > 0:
        print('T-minus', n)
        yield
        n -= 1
def count_up(n):
    x = 0
    while x < n:
        print('Counting up', x)
        yield
        x += 1

class TaskScheduler:
    def __init__(self) -> None:
        self.tasks =deque()

    def new_task(self,task):
        self.tasks.append(task)

    def run(self):
        while self.tasks:
            task = self.tasks.popleft()
            try:
                next(task)
                self.tasks.append(task)
            except StopIteration:
                pass

sched = TaskScheduler()
sched.new_task(count_down(10))
sched.new_task(count_down(5))
sched.new_task(count_up(15))
sched.run()

2.9.11. 多个线程队列轮询

一个套接字被传给 select() 或类似的一个轮询数据到达的函数