13518219792

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

5分钟完全掌握Python协程

 1. 协程相关的概念

专注于为中小企业提供网站建设、做网站服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业徽县免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了上千家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

1.1 进程和线程

进程(Process)是应用程序启动的实例,拥有代码、数据和文件和独立的内存空间,是操作系统最小资源管理单元。每个进程下面有一个或者多个线程(Thread),来负责执行程序的计算,是最小的执行单元。

重点是:操作系统会负责进程的资源的分配;控制权主要在操作系统。另一方面,线程做为任务的执行单元,有新建、可运行runnable(调用start方法,进入调度池,等待获取cpu使用权)、运行running(得到cpu使用权开始执行程序) 阻塞blocked(放弃了cpu 使用权,再次等待) 死亡dead5中不同的状态。线程的转态也是由操作系统进行控制。线程如果存在资源共享的情况下,就需要加锁,比如生产者和消费者模式,生产者生产数据多共享队列,消费者从共享队列中消费数据。

线程和进程在得到和放弃cpu使用权时,cpu使用权的切换都需损耗性能,因为某个线程为了能够在再次获得cpu使用权时能继续执行任务,必须记住上一次执行的所有状态。另外线程还有锁的问题。

1.2 并行和并发

并行和并发,听起来都像是同时执行不同的任务。但是这个同时的含义是不一样的。

上面说的多核cpu可能同时执行,这里的可能是和操作系统调度有关,如果操作系统调度到同一个cpu,那就需要cpu进行上下文切换。当然多核情况下,操作系统调度会尽可能考虑不同cpu。

这里的上下文切换可以理解为需要保留不同执行任务的状态和数据。所有的并发处理都有排队等候,唤醒,执行至少三个这样的步骤

1.3 协程

我们知道线程的提出是为了能够在多核cpu的情况下,达到并行的目的。而且线程的执行完全是操作系统控制的。而协程(Coroutine)是线程下的,控制权在于用户,本质是为了能让多组过程能不独自占用完所有资源,在一个线程内交叉执行,达到高并发的目的。

协程的优势:

协程和线程区别:

我们姑且也过一遍这些文字上的概念,show your code的时候再联系起来,就会更清晰的。

2. python中的线程

python中的线程由于历史原因,即使在多核cpu的情况下并不能达真正的并行。这个原因就是全局解释器锁GIL(global interpreter lock),准确的说GIL不是python的特性,而是cpython引入的一个概念。cpython解释器在解析多线程时,会上GIL锁,保证同一时刻只有一个线程获取CPU使用权。

考虑下如果有两个线程A和B同时引用一个对象obj,这个时候obj的引用计数为2;A打算撤销对obj的引用,完成第一步时引用计数减去1时,这时发生了线程切换,A挂起等待,还没执行销毁对象操作。B进入运行状态,这个时候B也对obj撤销引用,并完成引用计数减1,销毁对象,这个时候obj的引用数为0,释放内存。如果此时A重新唤醒,要继续销毁对象,可是这个时候已经没有对象了。所以为了保证不出现数据污染,才引入GIL。

每个线程使用前都会去获取GIL权限,使用完释放GIL权限。释放线程的时机由python的另一个机制check_interval来决定。

在多核cpu时,因为需要获取和释放GIL锁,会存在性能上额外的损耗。特别是由于调度控制的原因,比如一个线程释放了锁,调度接着又分配cpu资源给同一个线程,该线程发起申请时,又重新获得GIL,而其他线程实际上都在等待,白白浪费了申请和释放锁的操作耗时。

python中的线程比较适合I/O密集型的操作(磁盘IO或者网络IO)。

 
 
 
 
  1. import os 
  2. import time 
  3. import sys 
  4. from concurrent import futures 
  5. def to_do(info):   
  6.     for i in range(100000000): 
  7.         pass 
  8.     return info[0] 
  9. MAX_WORKERS = 10 
  10. param_list = [] 
  11. for i in range(5): 
  12.     param_list.append(('text%s' % i, 'info%s' % i)) 
  13. workers = min(MAX_WORKERS, len(param_list)) 
  14. # with 默认会等所有任务都完成才返回,所以这里会阻塞 
  15. with futures.ThreadPoolExecutor(workers) as executor: 
  16.     results = executor.map(to_do, sorted(param_list)) 
  17. # 打印所有 
  18. for result in results: 
  19.     print(result) 
  20. # 非阻塞的方式,适合不需要返回结果的情况 
  21. workers = min(MAX_WORKERS, len(param_list)) 
  22. executor = futures.ThreadPoolExecutor(workers) 
  23. results = [] 
  24. for idx, param in enumerate(param_list): 
  25.     result = executor.submit(to_do, param) 
  26.     results.append(result) 
  27.     print('result %s' % idx) 
  28. # 手动等待所有任务完成 
  29. executor.shutdown() 
  30. print('='*10) 
  31. for result in results: 
  32.     print(result.result())

3. python中的进程

python提供的multiprocessing包来规避GIL的缺点,实现在多核cpu上并行的目的。multiprocessing还提供进程之间数据和内存共享的机制。这里介绍的concurrent.futures的实现。用法和线程基本一样,ThreadPoolExecutor改成ProcessPoolExecutor

 
 
 
 
  1. import os 
  2. import time 
  3. import sys 
  4. from concurrent import futures 
  5. def to_do(info):   
  6.     for i in range(10000000): 
  7.         pass 
  8.     return info[0] 
  9. start_time = time.time() 
  10. MAX_WORKERS = 10 
  11. param_list = [] 
  12. for i in range(5): 
  13.     param_list.append(('text%s' % i, 'info%s' % i)) 
  14. workers = min(MAX_WORKERS, len(param_list)) 
  15. # with 默认会等所有任务都完成才返回,所以这里会阻塞 
  16. with futures.ProcessPoolExecutor(workers) as executor: 
  17.     results = executor.map(to_do, sorted(param_list))   
  18.  # 打印所有 
  19. for result in results: 
  20.     print(result) 
  21. print(time.time()-start_time) 
  22. # 耗时0.3704512119293213s, 而线程版本需要14.935384511947632s

4. python中的协程

4.1 简单协程

我们先来看下python是怎么实现协程的。答案是yield。以下例子的功能是实现计算移动平均数

 
 
 
 
  1. from collections import namedtuple 
  2. Result = namedtuple('Result', 'count average') 
  3. # 协程函数 
  4. def averager(): 
  5.     total = 0.0 
  6.     count = 0 
  7.     average = None 
  8.     while True: 
  9.         term = yield None  # 暂停,等待主程序传入数据唤醒 
  10.         if term is None: 
  11.             break  # 决定是否退出 
  12.         total += term 
  13.         count += 1 
  14.         average = total/count # 累计状态,包括上一次的状态 
  15.     return Result(count, average) 
  16. # 协程的触发 
  17. coro_avg = averager() 
  18. # 预激活协程 
  19. next(coro_avg) 
  20. # 调用者给协程提供数据 
  21. coro_avg.send(10) 
  22. coro_avg.send(30) 
  23. coro_avg.send(6.5) 
  24. try: 
  25.     coro_avg.send(None) 
  26. except StopIteration as exc: # 执行完成,会抛出StopIteration异常,返回值包含在异常的属性value里 
  27.     result = exc.value 
  28. print(result)

yield关键字有两个含义:产出和让步;  把yield的右边的值产出给调用方,同时做出让步,暂停执行,让程序继续执行。

上面的例子可知

我们来回顾下1.3中协程的概念:本质是为了能让多组过程能不独自占用完所有资源,在一个线程内交叉执行,达到高并发的目的。。上面的例子怎么解释呢?

4.2 asyncio协程应用包

asyncio即异步I/O, 如在高并发(如百万并发)网络请求。异步I/O即你发起一个I/O操作不必等待执行结束,可以做其他事情。asyncio底层是协程的方式来实现的。我们先来看一个例子,了解下asyncio的五脏六腑。

 
 
 
 
  1. import time 
  2. import asyncio 
  3. now = lambda : time.time() 
  4. # async定义协程 
  5. async def do_some_work(x): 
  6.     print("waiting:",x) 
  7.     # await挂起阻塞, 相当于yield, 通常是耗时操作 
  8.     await asyncio.sleep(x) 
  9.     return "Done after {}s".format(x) 
  10. # 回调函数,和yield产出类似功能 
  11. def callback(future): 
  12.     print("callback:",future.result()) 
  13. start = now() 
  14. tasks = [] 
  15. for i in range(1, 4): 
  16.     # 定义多个协程,同时预激活 
  17.     coroutine = do_some_work(i) 
  18.     task = asyncio.ensure_future(coroutine) 
  19.     task.add_done_callback(callback) 
  20.     tasks.append(task) 
  21. # 定一个循环事件列表,把任务协程放在里面, 
  22. loop = asyncio.get_event_loop() 
  23. try:
  24.      # 异步执行协程,直到所有操作都完成, 也可以通过asyncio.gather来收集多个任务 
  25.     loop.run_until_complete(asyncio.wait(tasks)) 
  26.     for task in tasks: 
  27.         print("Task ret:",task.result()) 
  28. except KeyboardInterrupt as e: # 协程任务的状态控制 
  29.     print(asyncio.Task.all_tasks()) 
  30.     for task in asyncio.Task.all_tasks(): 
  31.         print(task.cancel()) 
  32.     loop.stop() 
  33.     loop.run_forever() 
  34. finally: 
  35.     loop.close() 
  36. print("Time:", now()-start)

上面涉及到的几个概念:

再来看一个http下载的例子,比如你想下载5个不同的url(同样的,你想接收外部的百万的请求)

 
 
 
 
  1. import time 
  2. import asyncio 
  3. from aiohttp import ClientSession 
  4. tasks = [] 
  5. url = "https://www.baidu.com/{}" 
  6. async def hello(url): 
  7.     async with ClientSession() as session:
  8.         async with session.get(url) as response: 
  9.             response = await response.read() 
  10. #            print(response) 
  11.             print('Hello World:%s' % time.time())
  12. if __name__ == '__main__': 
  13.     loop = asyncio.get_event_loop() 
  14.     for i in range(5): 
  15.         task = asyncio.ensure_future(hello(url.format(i))) 
  16.         tasks.append(task) 
  17.     loop.run_until_complete(asyncio.wait(tasks))

4.3 协程的应用场景

5. 总结

本文分享关于python协程的概念和asyncio包的初步使用情况,同时也介绍了基本的相关概念,如进程、线程、并发、并行等。希望对你有帮助,欢迎交流(@mintel)。简要总结如下:


当前文章:5分钟完全掌握Python协程
URL分享:http://cdbrznjsb.com/article/dpsgohd.html

其他资讯

让你的专属顾问为你服务