13518219792

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

创新互联Python教程:threading—-基于线程的并行

threading —- 基于线程的并行

源代码: Lib/threading.py


This module constructs higher-level threading interfaces on top of the lower level _thread module.

在 3.7 版更改: 这个模块曾经为可选项,但现在总是可用。

参见

concurrent.futures.ThreadPoolExecutor offers a higher level interface to push tasks to a background thread without blocking execution of the calling thread, while still being able to retrieve their results when needed.

queue provides a thread-safe interface for exchanging data between running threads.

asyncio offers an alternative approach to achieving task level concurrency without requiring the use of multiple operating system threads.

备注

在 python 2.x 系列中,此模块包含有某些方法和函数 camelCase 形式的名称。 它们在 Python 3.10 中已弃用,但为了与 Python 2.5 及更旧版本的兼容性而仍受到支持。

CPython 实现细节: 在 CPython 中,由于存在 全局解释器锁,同一时刻只有一个线程可以执行 Python 代码(虽然某些性能导向的库可能会去除此限制)。 如果你想让你的应用更好地利用多核心计算机的计算资源,推荐你使用 multiprocessing 或 concurrent.futures.ProcessPoolExecutor。 但是,如果你想要同时运行多个 I/O 密集型任务,则多线程仍然是一个合适的模型。

Availability: not Emscripten, not WASI.

This module does not work or is not available on WebAssembly platforms wasm32-emscripten and wasm32-wasi. See WebAssembly platforms for more information.

这个模块定义了以下函数:

threading.active_count()

返回当前存活的 Thread 对象的数量。 返回值与 enumerate() 所返回的列表长度一致。

函数 activeCount 是此函数的已弃用别名。

threading.current_thread()

返回当前对应调用者的控制线程的 Thread 对象。如果调用者的控制线程不是利用 threading 创建,会返回一个功能受限的虚拟线程对象。

函数 currentThread 是此函数的已弃用别名。

threading.excepthook(args, /)

处理由 Thread.run() 引发的未捕获异常。

args 参数具有以下属性:

如果 exc_type 为 SystemExit,则异常会被静默地忽略。 在其他情况下,异常将被打印到 sys.stderr。

如果此函数引发了异常,则会调用 sys.excepthook() 来处理它。

threading.excepthook() 可以被重载以控制由 Thread.run() 引发的未捕获异常的处理方式。

使用定制钩子存放 exc_value 可能会创建引用循环。 它应当在不再需要异常时被显式地清空以打破引用循环。

如果一个对象正在被销毁,那么使用自定义的钩子储存 thread 可能会将其复活。请在自定义钩子生效后避免储存 thread,以避免对象的复活。

参见

sys.excepthook() 处理未捕获的异常。

3.8 新版功能.

threading.__excepthook__

保存 threading.excepthook() 的原始值。 它被保存以便在原始值碰巧被已损坏或替代对象所替换的情况下可被恢复。

3.10 新版功能.

threading.get_ident()

返回当前线程的 “线程标识符”。它是一个非零的整数。它的值没有直接含义,主要是用作 magic cookie,比如作为含有线程相关数据的字典的索引。线程标识符可能会在线程退出,新线程创建时被复用。

3.3 新版功能.

threading.get_native_id()

返回内核分配给当前线程的原生集成线程 ID。 这是一个非负整数。 它的值可被用来在整个系统中唯一地标识这个特定线程(直到线程终结,在那之后该值可能会被 OS 回收再利用)。

可用性: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX。

3.8 新版功能.

threading.enumerate()

返回当前所有存活的 Thread 对象的列表。 该列表包括守护线程以及 current_thread() 创建的空线程。 它不包括已终结的和尚未开始的线程。 但是,主线程将总是结果的一部分,即使是在已终结的时候。

threading.main_thread()

返回主 Thread 对象。一般情况下,主线程是Python解释器开始时创建的线程。

3.4 新版功能.

threading.settrace(func)

为所有 threading 模块开始的线程设置追踪函数。在每个线程的 run() 方法被调用前,func 会被传递给 sys.settrace() 。

threading.gettrace()

返回由 settrace() 设置的跟踪函数。

3.10 新版功能.

threading.setprofile(func)

为所有 threading 模块开始的线程设置性能测试函数。在每个线程的 run() 方法被调用前,func 会被传递给 sys.setprofile() 。

threading.getprofile()

返回由 setprofile() 设置的性能分析函数。

3.10 新版功能.

threading.stack_size([size])

返回创建线程时使用的堆栈大小。可选参数 size 指定之后新建的线程的堆栈大小,而且一定要是0(根据平台或者默认配置)或者最小是32,768(32KiB)的一个正整数。如果 size 没有指定,默认是0。如果不支持改变线程堆栈大小,会抛出 RuntimeError 错误。如果指定的堆栈大小不合法,会抛出 ValueError 错误并且不会修改堆栈大小。32KiB是当前最小的能保证解释器有足够堆栈空间的堆栈大小。需要注意的是部分平台对于堆栈大小会有特定的限制,例如要求大于32KiB的堆栈大小或者需要根据系统内存页面的整数倍进行分配 - 应当查阅平台文档有关详细信息(4KiB页面比较普遍,在没有更具体信息的情况下,建议的方法是使用4096的倍数作为堆栈大小)。

Availability: Windows, pthreads.

Unix platforms with POSIX threads support.

这个模块同时定义了以下常量:

threading.TIMEOUT_MAX

阻塞函数( Lock.acquire(), RLock.acquire(), Condition.wait(), …)中形参 timeout 允许的最大值。传入超过这个值的 timeout 会抛出 OverflowError 异常。

3.2 新版功能.

这个模块定义了许多类,详见以下部分。

该模块的设计基于 Java的线程模型。 但是,在Java里面,锁和条件变量是每个对象的基础特性,而在Python里面,这些被独立成了单独的对象。 Python 的 Thread 类只是 Java 的 Thread 类的一个子集;目前还没有优先级,没有线程组,线程还不能被销毁、停止、暂停、恢复或中断。 Java 的 Thread 类的静态方法在实现时会映射为模块级函数。

下述方法的执行都是原子性的。

线程本地数据

线程本地数据是特定线程的数据。管理线程本地数据,只需要创建一个 local (或者一个子类型)的实例并在实例中储存属性:

 
 
 
 
  1. mydata = threading.local()
  2. mydata.x = 1

在不同的线程中,实例的值会不同。

class threading.local

一个代表线程本地数据的类。

更多相关细节和大量示例,参见 _threading_local 模块的文档。

线程对象

Thread 类代表在独立控制线程运行的活动。有两种方式指定活动:传递一个可调用对象给构造函数或者在子类重载 run() 方法。其它方法不应该在子类被(除了构造函数)重载。换句话说,只能 重载这个类的 __init__() 和 run() 方法。

当线程对象一旦被创建,其活动必须通过调用线程的 start() 方法开始。 这会在独立的控制线程中发起调用 run() 方法。

一旦线程活动开始,该线程会被认为是 ‘存活的’ 。当它的 run() 方法终结了(不管是正常的还是抛出未被处理的异常),就不是’存活的’。 is_alive() 方法用于检查线程是否存活。

其他线程可以调用一个线程的 join() 方法。这会阻塞调用该方法的线程,直到被调用 join() 方法的线程终结。

线程有名字。名字可以传递给构造函数,也可以通过 name 属性读取或者修改。

如果 run() 方法引发了异常,则会调用 threading.excepthook() 来处理它。 在默认情况下,threading.excepthook() 会静默地忽略 SystemExit。

一个线程可以被标记成一个“守护线程”。 这个标识的意义是,当剩下的线程都是守护线程时,整个 Python 程序将会退出。 初始值继承于创建线程。 这个标识可以通过 daemon 特征属性或者 daemon 构造器参数来设置。

备注

守护线程在程序关闭时会突然关闭。他们的资源(例如已经打开的文档,数据库事务等等)可能没有被正确释放。如果你想你的线程正常停止,设置他们成为非守护模式并且使用合适的信号机制,例如: Event。

有个 “主线程” 对象;这对应Python程序里面初始的控制线程。它不是一个守护线程。

There is the possibility that “dummy thread objects” are created. These are thread objects corresponding to “alien threads”, which are threads of control started outside the threading module, such as directly from C code. Dummy thread objects have limited functionality; they are always considered alive and daemonic, and cannot be joined. They are never deleted, since it is impossible to detect the termination of alien threads.

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, **, daemon=None*)

应当始终使用关键字参数调用此构造函数。 参数如下:

group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。

target 是用于 run() 方法调用的可调用对象。默认是 None,表示不需要调用任何方法。

name 是线程名称。 在默认情况下,会以 “Thread-N“ 的形式构造唯一名称,其中 N 为一个较小的十进制数值,或是 “Thread-N (target)” 的形式,其中 “target” 为 target.__name__,如果指定了 target 参数的话。

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs 是用于调用目标函数的关键字参数字典。默认是 {}

如果不是 Nonedaemon 参数将显式地设置该线程是否为守护模式。 如果是 None (默认值),线程将继承当前线程的守护模式属性。

如果子类型重载了构造函数,它一定要确保在做任何事前,先发起调用基类构造器(Thread.__init__())。

在 3.10 版更改: 使用 target 名称,如果 name 参数被省略的话。

在 3.3 版更改: 加入 daemon 参数。

锁对象

原始锁是一个在锁定时不属于特定线程的同步基元组件。在Python中,它是能用的最低级的同步基元组件,由 _thread 扩展模块直接实现。

原始锁处于 “锁定” 或者 “非锁定” 两种状态之一。它被创建时为非锁定状态。它有两个基本方法, acquire() 和 release() 。当状态为非锁定时, acquire() 将状态改为 锁定 并立即返回。当状态是锁定时, acquire() 将阻塞至其他线程调用 release() 将其改为非锁定状态,然后 acquire() 调用重置其为锁定状态并返回。 release() 只在锁定状态下调用; 它将状态改为非锁定并立即返回。如果尝试释放一个非锁定的锁,则会引发 RuntimeError 异常。

锁同样支持 上下文管理协议。

当多个线程在 acquire() 等待状态转变为未锁定被阻塞,然后 release() 重置状态为未锁定时,只有一个线程能继续执行;至于哪个等待线程继续执行没有定义,并且会根据实现而不同。

所有方法的执行都是原子性的。

class threading.Lock

实现原始锁对象的类。一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。

需要注意的是 Lock 其实是一个工厂函数,返回平台支持的具体锁类中最有效的版本的实例。

递归锁对象

重入锁是一个可以被同一个线程多次获取的同步基元组件。在内部,它在基元锁的锁定/非锁定状态上附加了 “所属线程” 和 “递归等级” 的概念。在锁定状态下,某些线程拥有锁 ; 在非锁定状态下, 没有线程拥有它。

若要锁定锁,线程调用其 acquire() 方法;一旦线程拥有了锁,方法将返回。若要解锁,线程调用 release() 方法。 acquire()/release() 对可以嵌套;只有最终 release() (最外面一对的 release() ) 将锁解开,才能让其他线程继续处理 acquire() 阻塞。

递归锁也支持 上下文管理协议。

class threading.RLock

此类实现了重入锁对象。重入锁必须由获取它的线程释放。一旦线程获得了重入锁,同一个线程再次获取它将不阻塞;线程必须在每次获取它时释放一次。

需要注意的是 RLock 其实是一个工厂函数,返回平台支持的具体递归锁类中最有效的版本的实例。

条件对象

条件变量总是与某种类型的锁对象相关联,锁对象可以通过传入获得,或者在缺省的情况下自动创建。当多个条件变量需要共享同一个锁时,传入一个锁很有用。锁是条件对象的一部分,你不必单独地跟踪它。

条件变量遵循 上下文管理协议 :使用 with 语句会在它包围的代码块内获取关联的锁。 acquire() 和 release() 方法也能调用关联锁的相关方法。

其它方法必须在持有关联的锁的情况下调用。 wait() 方法释放锁,然后阻塞直到其它线程调用 notify() 方法或 notify_all() 方法唤醒它。一旦被唤醒, wait() 方法重新获取锁并返回。它也可以指定超时时间。

The notify() method wakes up one of the threads waiting for the condition variable, if any are waiting. The notify_all() method wakes up all threads waiting for the condition variable.

注意: notify() 方法和 notify_all() 方法并不会释放锁,这意味着被唤醒的线程不会立即从它们的 wait() 方法调用中返回,而是会在调用了 notify() 方法或 notify_all() 方法的线程最终放弃了锁的所有权后返回。

使用条件变量的典型编程风格是将锁用于同步某些共享状态的权限,那些对状态的某些特定改变感兴趣的线程,它们重复调用 wait() 方法,直到看到所期望的改变发生;而对于修改状态的线程,它们将当前状态改变为可能是等待者所期待的新状态后,调用 notify() 方法或者 notify_all() 方法。例如,下面的代码是一个通用的无限缓冲区容量的生产者-消费者情形:

 
 
 
 
  1. # Consume one item
  2. with cv:
  3. while not an_item_is_available():
  4. cv.wait()
  5. get_an_available_item()
  6. # Produce one item
  7. with cv:
  8. make_an_item_available()
  9. cv.notify()

使用 while 循环检查所要求的条件成立与否是有必要的,因为 wait() 方法可能要经过不确定长度的时间后才会返回,而此时导致 notify() 方法调用的那个条件可能已经不再成立。这是多线程编程所固有的问题。 wait_for() 方法可自动化条件检查,并简化超时计算。

 
 
 
 
  1. # Consume an item
  2. with cv:
  3. cv.wait_for(an_item_is_available)
  4. get_an_available_item()

选择 notify() 还是 notify_all() ,取决于一次状态改变是只能被一个还是能被多个等待线程所用。例如在一个典型的生产者-消费者情形中,添加一个项目到缓冲区只需唤醒一个消费者线程。

class threading.Condition(lock=None)

实现条件变量对象的类。一个条件变量对象允许一个或多个线程在被其它线程所通知之前进行等待。

如果给出了非 Nonelock 参数,则它必须为 Lock 或者 RLock 对象,并且它将被用作底层锁。否则,将会创建新的 RLock 对象,并将其用作底层锁。

在 3.3 版更改: 从工厂函数变为类。

信号量对象

这是计算机科学史上最古老的同步原语之一,早期的荷兰科学家 Edsger W. Dijkstra 发明了它。(他使用名称 P()V() 而不是 acquire() 和 release() )。

一个信号量管理一个内部计数器,该计数器因 acquire() 方法的调用而递减,因 release() 方法的调用而递增。 计数器的值永远不会小于零;当 acquire() 方法发现计数器为零时,将会阻塞,直到其它线程调用 release() 方法。

信号量对象也支持 上下文管理协议 。

class threading.Semaphore(value=1)

该类实现信号量对象。信号量对象管理一个原子性的计数器,代表 release() 方法的调用次数减去 acquire() 的调用次数再加上一个初始值。如果需要, acquire() 方法将会阻塞直到可以返回而不会使得计数器变成负数。在没有显式给出 value 的值时,默认为1。

可选参数 value 赋予内部计数器初始值,默认值为 1 。如果 value 被赋予小于0的值,将会引发 ValueError 异常。

在 3.3 版更改: 从工厂函数变为类。

class threading.BoundedSemaphore(value=1)

该类实现有界信号量。有界信号量通过检查以确保它当前的值不会超过初始值。如果超过了初始值,将会引发 ValueError 异常。在大多情况下,信号量用于保护数量有限的资源。如果信号量被释放的次数过多,则表明出现了错误。没有指定时, value 的值默认为1。

在 3.3 版更改: 从工厂函数变为类。

Semaphore 例子

信号量通常用于保护数量有限的资源,例如数据库服务器。在资源数量固定的任何情况下,都应该使用有界信号量。在生成任何工作线程前,应该在主线程中初始化信号量。

 
 
 
 
  1. maxconnections = 5
  2. # ...
  3. pool_sema = BoundedSemaphore(value=maxconnections)

工作线程生成后,当需要连接服务器时,这些线程将调用信号量的 acquire 和 release 方法:

 
 
 
 
  1. with pool_sema:
  2. conn = connectdb()
  3. try:
  4. # ... use connection ...
  5. finally:
  6. conn.close()

使用有界信号量能减少这种编程错误:信号量的释放次数多于其请求次数。

事件对象

这是线程之间通信的最简单机制之一:一个线程发出事件信号,而其他线程等待该信号。

一个事件对象管理一个内部标识,调用 set() 方法可将其设置为 true ,调用 clear() 方法可将其设置为 false ,调用 wait() 方法将进入阻塞直到标识为 true 。

class threading.Event

实现事件对象的类。事件对象管理一个内部标识,调用 set() 方法可将其设置为true。调用 clear() 方法可将其设置为 false 。调用 wait() 方法将进入阻塞直到标识为true。这个标识初始时为 false 。

在 3.3 版更改: 从工厂函数变为类。

定时器对象

此类表示一个操作应该在等待一定的时间之后运行 —- 相当于一个定时器。 Timer 类是 Thread 类的子类,因此可以像一个自定义线程一样工作。

与线程一样,通过调用 start() 方法启动定时器。而 cancel() 方法可以停止计时器(在计时结束前), 定时器在执行其操作之前等待的时间间隔可能与用户指定的时间间隔不完全相同。

例如:

 
 
 
 
  1. def hello():
  2. print("hello, world")
  3. t = Timer(30.0, hello)
  4. t.start() # after 30 seconds, "hello, world" will be printed

class threading.Timer(interval, function, args=None, kwargs=None)

创建一个定时器,在经过 interval 秒的间隔事件后,将会用参数 args 和关键字参数 kwargs 调用 function。如果 argsNone (默认值),则会使用一个空列表。如果 kwargsNone (默认值),则会使用一个空字典。

在 3.3 版更改: 从工厂函数变为类。

栅栏对象

3.2 新版功能.

栅栏类提供一个简单的同步原语,用于应对固定数量的线程需要彼此相互等待的情况。线程调用 wait() 方法后将阻塞,直到所有线程都调用了 wait() 方法。此时所有线程将被同时释放。

栅栏对象可以被多次使用,但进程的数量不能改变。

这是一个使用简便的方法实现客户端进程与服务端进程同步的例子:

 
 
 
 
  1. b = Barrier(2, timeout=5)
  2. def server():
  3. start_server()
  4. b.wait()
  5. while True:
  6. connection = accept_connection()
  7. process_server_connection(connection)
  8. def client():
  9. b.wait()
  10. while True:
  11. connection = make_connection()
  12. process_client_connection(connection)

class threading.Barrier(parties, action=None, timeout=None)

创建一个需要 parties 个线程的栅栏对象。如果提供了可调用的 action 参数,它会在所有线程被释放时在其中一个线程中自动调用。 timeout 是默认的超时时间,如果没有在 wait() 方法中指定超时时间的话。

exception threading.BrokenBarrierError

异常类,是 RuntimeError 异常的子类,在 Barrier 对象重置时仍有线程阻塞时和对象进入破损态时被引发。

with 语句中使用锁、条件和信号量

这个模块提供的带有 acquire()release() 方法的对象,可以被用作 with 语句的上下文管理器。当进入语句块时 acquire() 方法会被调用,退出语句块时 release() 会被调用。因此,以下片段:

 
 
 
 
  1. with some_lock:
  2. # do something...

相当于:

 
 
 
 
  1. some_lock.acquire()
  2. try:
  3. # do something...
  4. finally:
  5. some_lock.release()

现在 Lock 、 RLock 、 Condition 、 Semaphore 和 BoundedSemaphore 对象可以用作 with 语句的上下文管理器。


分享标题:创新互联Python教程:threading—-基于线程的并行
URL分享:http://cdbrznjsb.com/article/cohchhs.html

其他资讯

让你的专属顾问为你服务