news 2026/3/28 4:54:46

30、Python 并发编程:线程、进程与守护进程全解析

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
30、Python 并发编程:线程、进程与守护进程全解析

Python 并发编程:线程、进程与守护进程全解析

1. 线程编程基础

在 Python 中,线程是实现并发的一种重要方式。以下是一个简单的线程池示例代码:

worker.start() #spawn pool of arping threads for i in range(num_arp_threads): worker = Thread(target=arping, args=(i, out_queue)) worker.setDaemon(True) worker.start() print "Main Thread Waiting" #ensures that program does not exit until both queues have been emptied in_queue.join() out_queue.join() print "Done"

运行这段代码,输出如下:

python2.5 ping_thread_basic_2.py Main Thread Waiting Thread 0: Pinging 10.0.1.1 Thread 1: Pinging 10.0.1.3 Thread 2: Pinging 10.0.1.11 Thread 0: Pinging 10.0.1.51 IP Address: 10.0.1.1 | Mac Address: [00:00:00:00:00:01] IP Address: 10.0.1.51 | Mac Address: [00:00:00:80:E8:02] IP Address: 10.0.1.3 | Mac Address: [00:00:00:07:E4:03] 10.0.1.11: did not respond Done

通过添加线程池和队列,我们对第一个示例的行为进行了扩展。使用队列模块可以让线程的使用更加简单和安全,这是一项非常重要的技术。

2. 线程的定时延迟

Python 的threading.Timer提供了一种方便的方式来实现线程内函数的定时执行。以下是示例代码:

#!/usr/bin/env python from threading import Timer import sys import time import copy #simple error handling if len(sys.argv) != 2: print "Must enter an interval" sys.exit(1) #our function that we will run def hello(): print "Hello, I just got called after a %s sec delay" % call_time #we spawn our time delayed thread here delay = sys.argv[1] call_time = copy.copy(delay) #we copy the delay to use later t = Timer(int(delay), hello) t.start() #we validate that we are not blocked, and that the main program continues print "waiting %s seconds to run function" % delay for x in range(int(delay)): print "Main program is still running for %s more sec" % delay delay = int(delay) - 1 time.sleep(1)

运行此代码,我们可以看到主线程会继续运行,而函数会在指定的延迟后执行:

[ngift@Macintosh-6][H:10468][J:0]# python thread_timer.py 5 waiting 5 seconds to run function Main program is still running for 5 more sec Main program is still running for 4 more sec Main program is still running for 3 more sec Main program is still running for 2 more sec Main program is still running for 1 more sec Hello, I just got called after a 5 sec delay
3. 线程事件处理程序

我们可以将延迟线程技术应用到实际场景中,例如监控两个目录的文件名变化。以下是一个线程化的目录同步工具示例:

#!/usr/bin/env python from threading import Timer import sys import time import copy import os from subprocess import call class EventLoopDelaySpawn(object): """An Event Loop Class That Spawns a Method in a Delayed Thread""" def __init__(self, poll=10, wait=1, verbose=True, dir1="/tmp/dir1", dir2="/tmp/dir2"): self.poll = int(poll) self.wait = int(wait) self.verbose = verbose self.dir1 = dir1 self.dir2 = dir2 def poller(self): """Creates Poll Interval""" time.sleep(self.poll) if self.verbose: print "Polling at %s sec interval" % self.poll def action(self): if self.verbose: print "waiting %s seconds to run Action" % self.wait ret = call("rsync -av --delete %s/ %s" % (self.dir1, self.dir2), shell=True) def eventHandler(self): #if two directories contain same file names if os.listdir(self.dir1) != os.listdir(self.dir2): print os.listdir(self.dir1) t = Timer((self.wait), self.action) t.start() if self.verbose: print "Event Registered" else: if self.verbose: print "No Event Registered" def run(self): """Runs an event loop with a delayed action method""" try: while True: self.eventHandler() self.poller() except Exception, err: print "Error: %s " % err finally: sys.exit(0) E = EventLoopDelaySpawn() E.run()

延迟机制虽然不是严格必需的,但它可以带来一些好处,例如在发现其他事件时取消线程操作。

4. 进程编程

线程并不是 Python 中处理并发的唯一方式,进程在某些方面具有优势。由于全局解释器锁(GIL)的存在,Python 中的线程在同一时间只能有一个真正运行,并且只能使用一个处理器。因此,在需要大量使用 CPU 的情况下,使用进程是更好的选择。

以下是进程的优缺点对比:
| 对比项 | 线程 | 进程 |
| ---- | ---- | ---- |
| 可扩展性 | 受 GIL 限制,难以扩展到多个处理器 | 可以扩展到多个处理器 |
| 资源共享 | 共享全局状态 | 完全独立,通信需要更多努力 |
| 适用场景 | 适合 I/O 密集型任务 | 适合 CPU 密集型任务 |

5. 处理模块

处理模块(processing module)是一个用于 Python 的包,它支持使用标准库的线程模块的 API 来生成进程。以下是一个处理模块的入门示例:

#!/usr/bin/env python from processing import Process, Queue import time def f(q): x = q.get() print "Process number %s, sleeps for %s seconds" % (x,x) time.sleep(x) print "Process number %s finished" % x q = Queue() for i in range(10): q.put(i) i = Process(target=f, args=[q]) i.start() print "main process joins on queue" i.join() print "Main Program finished"

运行此代码的输出如下:

[ngift@Macintosh-7][H:11199][J:0]# python processing1.py Process number 0, sleeps for 0 seconds Process number 0 finished Process number 1, sleeps for 1 seconds Process number 2, sleeps for 2 seconds Process number 3, sleeps for 3 seconds Process number 4, sleeps for 4 seconds main process joins on queue Process number 5, sleeps for 5 seconds Process number 6, sleeps for 6 seconds Process number 8, sleeps for 8 seconds Process number 7, sleeps for 7 seconds Process number 9, sleeps for 9 seconds Process number 1 finished Process number 2 finished Process number 3 finished Process number 4 finished Process number 5 finished Process number 6 finished Process number 7 finished Process number 8 finished Process number 9 finished Main Program finished

以下是该程序的执行流程图:

graph TD; A[初始化队列] --> B[将元素放入队列]; B --> C[创建并启动进程]; C --> D[主进程等待队列]; D --> E[进程从队列获取元素]; E --> F[进程执行任务]; F --> G[进程完成任务]; G --> H[主程序结束];

我们还可以使用处理模块实现基于进程的 ping 扫描,示例代码如下:

#!/usr/bin/env python from processing import Process, Queue, Pool import time import subprocess from IPy import IP import sys q = Queue() ips = IP("10.0.1.0/24") def f(i,q): while True: if q.empty(): sys.exit() print "Process Number: %s" % i ip = q.get() ret = subprocess.call("ping -c 1 %s" % ip, shell=True, stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT) if ret == 0: print "%s: is alive" % ip else: print "Process Number: %s didn’t find a response for %s " % (i, ip) for ip in ips: q.put(ip) #q.put("192.168.1.1") for i in range(50): p = Process(target=f, args=[i,q]) p.start() print "main process joins on queue" p.join() print "Main Program finished"

这个代码与之前的线程代码类似,但在处理模块中,每个进程会在一个无限循环中从队列中获取元素。当队列为空时,进程会退出。

6. Python 进程调度

在掌握了 Python 中处理进程的多种方式后,接下来探讨如何调度这些进程。使用传统的 cron 来运行 Python 进程是非常合适的。

许多 POSIX 系统中的 cron 有一个不错的新特性,即调度目录。现在我们通常使用这种方式,只需将 Python 脚本放入四个默认目录之一:/etc/cron.daily/etc/cron.hourly/etc/cron.monthly/etc/cron.weekly即可。

以往,很多系统管理员会编写传统的磁盘使用情况邮件脚本。例如,将一个 Bash 脚本放在/etc/cron.daily目录下,内容如下:

df -h | mail -s "Nightly Disk Usage Report" staff@example.com

不过,使用 Python 脚本会是更好的选择。以下是一个基于 cron 的磁盘报告邮件 Python 脚本示例:

import smtplib import subprocess import string p = subprocess.Popen("df -h", shell=True, stdout=subprocess.PIPE) MSG = p.stdout.read() FROM = "guru-python-sysadmin@example.com" TO = "staff@example.com" SUBJECT = "Nightly Disk Usage Report" msg = string.join(( "From: %s" % FROM, "To: %s" % TO, "Subject: %s" % SUBJECT, "", MSG), "\r\n") server = smtplib.SMTP('localhost') server.sendmail(FROM, TO, msg) server.quit()

这个脚本的操作步骤如下:
1. 使用subprocess.Popen读取df命令的标准输出。
2. 创建FromToSubject变量。
3. 将这些字符串连接起来创建邮件消息。
4. 设置发件 SMTP 服务器为本地主机,并将之前设置的变量传递给server.sendmail()

通常的使用方式是将该脚本放在/etc/cron.daily/nightly_disk_report.py目录下。对于 Python 新手来说,可以将此脚本作为模板代码,快速实现一些有趣的功能。

7. 守护进程化

在 Unix 系统中,处理守护进程是一项常见任务。守护进程通常被认为是在后台运行且没有控制终端的任务。很多人可能认为在命令末尾加&或者使用Ctrl - zbg命令可以将进程变成守护进程,但实际上这些操作只是将进程放到后台,并没有使进程脱离 shell 进程,也没有与控制终端分离。守护进程有三个特征:在后台运行、与启动它的进程分离、没有控制终端。普通的 shell 作业控制只能实现第一个特征。

以下是一个定义daemonize()函数的代码,它可以使调用代码成为守护进程,该代码来自相关的 Python 代码示例:

import sys, os def daemonize (stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): # Perform first fork. try: pid = os.fork( ) if pid > 0: sys.exit(0) # Exit first parent. except OSError, e: sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror)) sys.exit(1) # Decouple from parent environment. os.chdir("/") os.umask(0) os.setsid( ) # Perform second fork. try: pid = os.fork( ) if pid > 0: sys.exit(0) # Exit second parent. except OSError, e: sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror)) sys.exit(1) # The process is now daemonized, redirect standard file descriptors. for f in sys.stdout, sys.stderr: f.flush( ) si = file(stdin, 'r') so = file(stdout, 'a+') se = file(stderr, 'a+', 0) os.dup2(si.fileno( ), sys.stdin.fileno( )) os.dup2(so.fileno( ), sys.stdout.fileno( )) os.dup2(se.fileno( ), sys.stderr.fileno( ))

该函数的执行步骤如下:
1.第一次 fork:调用os.fork()后会有两个相同的进程运行。检查pid,如果pid为正,说明处于父进程,父进程退出。若出现异常,进程也会退出。
2.与父环境解耦
- 使用os.chdir("/")将工作目录更改为根目录/,确保守护进程在一个始终存在的目录中运行,避免影响文件系统的卸载。
- 使用os.umask(0)将文件模式创建掩码设置为最宽松,防止继承的掩码对守护进程创建文件的权限产生不良影响。
- 使用os.setsid()创建一个新的会话,使进程成为新会话的领导者和新进程组的领导者,并且没有控制终端,避免受到终端的作业控制影响。
3.第二次 fork:再次进行fork操作,使最终的进程不能成为会话领导者,进一步确保进程不会获取控制终端。
4.重定向标准文件描述符:刷新标准输出和标准错误,然后将标准输入、输出和错误重定向到指定的文件(默认是/dev/null)。

以下是一个使用守护进程化函数的示例:

from daemonize import daemonize import time import sys def mod_5_watcher(): start_time = time.time() end_time = start_time + 20 while time.time() < end_time: now = time.time() if int(now) % 5 == 0: sys.stderr.write('Mod 5 at %s\n' % now) else: sys.stdout.write('No mod 5 at %s\n' % now) time.sleep(1) if __name__ == '__main__': daemonize(stdout='/tmp/stdout.log', stderr='/tmp/stderr.log') mod_5_watcher()

这个脚本首先将自身守护进程化,并指定使用/tmp/stdout.log作为标准输出,/tmp/stderr.log作为标准错误。然后在接下来的 20 秒内监控时间,每秒检查一次。如果时间(以秒为单位)能被 5 整除,则写入标准错误;否则写入标准输出。运行该脚本后,我们可以在相应的日志文件中看到结果。

运行脚本后,会立即出现一个新的命令提示符:

jmjones@dinkgutsy:code$ python use_daemonize.py jmjones@dinkgutsy:code$

查看结果文件:

jmjones@dinkgutsy:code$ cat /tmp/stdout.log No mod 5 at 1207272453.18 No mod 5 at 1207272454.18

综上所述,Python 提供了丰富的工具和方法来处理并发编程,包括线程、进程、进程调度和守护进程化等。通过合理运用这些技术,我们可以编写出高效、稳定的 Python 程序。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/20 11:57:03

38、Python编程实用指南:从基础到高级应用

Python编程实用指南&#xff1a;从基础到高级应用1. 回调函数与函数对象回调函数和函数传递的概念可能对一些人来说比较陌生&#xff0c;但深入了解它是很有价值的。在Python中&#xff0c;函数是“一等公民”&#xff0c;这意味着可以像操作对象一样传递和处理函数&#xff0c…

作者头像 李华
网站建设 2026/3/22 22:33:30

460. LFU 缓存

问题描述&#xff1a; 请你为 最不经常使用&#xff08;LFU&#xff09;缓存算法设计并实现数据结构。 实现 LFUCache 类&#xff1a; LFUCache(int capacity) - 用数据结构的容量 capacity 初始化对象int get(int key) - 如果键 key 存在于缓存中&#xff0c;则获取键的值&…

作者头像 李华
网站建设 2026/3/21 7:17:20

Betaflight 2025.12性能突破:智能飞控固件的全方位升级指南

Betaflight 2025.12性能突破&#xff1a;智能飞控固件的全方位升级指南 【免费下载链接】betaflight Open Source Flight Controller Firmware 项目地址: https://gitcode.com/gh_mirrors/be/betaflight 穿越机爱好者们翘首以盼的Betaflight 2025.12版本正式发布&#x…

作者头像 李华
网站建设 2026/3/13 1:10:14

42、Xenomai实时系统:从传统RTOS迁移到Linux的解决方案

Xenomai实时系统:从传统RTOS迁移到Linux的解决方案 1. Xenomai简介 Xenomai是一个实时子系统,能与Linux内核紧密集成,为应用程序提供可预测的响应时间。它基于双内核方法,一个小的协内核与Linux在同一硬件上并行运行。在主机内核支持内存管理单元(MMU)保护时,Xenomai支…

作者头像 李华
网站建设 2026/3/21 16:36:38

43、深入了解Xenomai实时系统

深入了解Xenomai实时系统 1. 核心代码分析 以下是一段关键代码,其主要功能是等待消息并处理超时和中断情况: task = vrtx_current_task(); /** Set up a few status bits the VRTX way, so that inquiries* about the task state will return proper information.*/ task-…

作者头像 李华
网站建设 2026/3/16 16:07:31

基于51单片机的蓝牙智能台灯设计

基于51单片机的蓝牙智能台灯设计 &#xff08;程序&#xff0b;原理图&#xff0b;设计报告&#xff09; 功能介绍 具体功能&#xff1a; 1、当人靠近时&#xff0c;灯亮起&#xff0c;如果人靠得太近&#xff0c;蜂鸣器会发出警报&#xff0c;一段时间后如果没有人&#xff…

作者头像 李华