目录
- 一.并发和并行的区别
- 1.并发
- 2.并行
- 二,进程
- 1.概念
- 2.创建一个进程
- 3.进程间数据不共享
- 4.进程间通信(IPC)
- 三,线程
- 1.线程间资源共享
- 2.互斥锁
- 3.守护线程
- 进程 (Process) 与 线程 (Thread) 核心对比表
- 四,协程
- 1.如何定义协程
- 2.和普通函数的区别,协程的作用
一.并发和并行的区别
在开始之前,先了解并发和并行的概念,搞清楚计算机是怎么‘同时’处理多任务的。
1.并发
- 定义:在同一个时间段内,多个任务向前推进。但是细看可能并不是同时运行的,操作系统通过时间片轮转,在多个任务之间快速地进行切换。
2.并行
- 定义:在同一个时刻,多个任务真正在不同的物理核心上执行。这需要多核cpu的硬件支持。
并发是“应对”多件事的能力,并行是“同时做”多件事的能力。在 Python 网络编程中,我们绝大多数时候是在处理“并发”问题。
二,进程
把操作系统看成一座城市,进程就是一家家独立的工厂。
1.概念
- 进程是操作系统分配资源的最小单位,比如说电脑上同时打开了qq和微信,这就是两个独立的进程。
- 进程间不共享数据,它会拷贝一份在自己的虚拟内存空间中使用
- 进程的优点是极其稳定,一个进程崩溃不会影响其他进程,但是这导致它启动慢,开销大。
2.创建一个进程
如何创建一个进程?
我们需要导入multiprocessing模块,如下:
importmultiprocessingimporttimedefworker():"""子进程执行的函数"""print("子进程正在运行")time.sleep(2)print("子进程结束")if__name__=='__main__':# 1. 创建进程p=multiprocessing.Process(target=worker)# 2. 启动进程p.start()# 3. 等待进程完成(可选)p.join()print("主进程结束")核心三步:
- 创建:multiprocessing.Process(target=函数名)
- 启动:p.start()
- 等待:p.join() (阻塞主进程,直到子进程执行完毕)
关于multiprocessing.Process()还有其它参数,如下:
- target = 目标函数
- args = 函数参数,元组形式
- kwargs = 函数参数,字典形式
- daemon = bool值,是否设置为守护进程,默认为False,也就是主进程会等待子进程结束后才彻底关闭,为True则不管任务是否完成,只要主进程结束子进程立马结束(在python中是这样,但是java中貌似会在执行极小的时间后关闭。
限制:必须在调用 start() 方法之前设置才有效。
注意,在 Windows 系统上,multiprocessing 创建新进程时,会重新导入(import)主模块文件。如果创建进程的代码没有放在 ifname== ‘main’: 保护下,会导致无限递归创建进程,最终报错或程序崩溃。
3.进程间数据不共享
我们先举一个例子:
frommultiprocessingimportProcessimporttime# 定义一个全局列表data_list=[]defadd_data():"""子进程执行的任务"""globaldata_listprint("【子进程】开始向列表添加数据...")data_list.append("Python")data_list.append("FastAPI")data_list.append("TCP")print(f"【子进程】当前的列表内容是:{data_list}")print("【子进程】任务结束。")if__name__=="__main__":print(f"【主进程】初始列表内容:{data_list}")# 创建子进程p=Process(target=add_data)# 启动子进程p.start()# 等待子进程结束p.join()print("-"*30)print(f"【主进程】子进程运行完后,现在的列表内容:{data_list}")iflen(data_list)==0:print("【结论】验证成功:子进程修改的是自己的内存副本,主进程的数据未受影响。")else:print("【结论】验证失败。")运行的结果为:
可以看到,子进程对全局资源的修改仅在自己的内存副本中生效,对主进程完全没有影响。
每一个进程都拥有自己独立的虚拟内存空间。当子进程启动时,它虽然会像‘克隆’一样复制一份父进程的数据快照,但一旦克隆完成,两者就完全无关。进程 A 里的变量 x 对进程 B 来说是物理隔离的,B 既看不见也改不了。这种内存隔离机制确保了系统的稳定性——即使一个子进程因为数据污染而崩溃,也不会波及到主进程或其他进程的安全。
4.进程间通信(IPC)
既然进程间内存是隔离的,那么他们怎么交换数据呢?
最常用的方法是使用Queue(消息队列):
frommultiprocessingimportProcess,Queueimportos,time# 生产者:往队列里放数据defproducer(q):foritemin["数据A","数据B"]:time.sleep(1)q.put(item)print(f"进程{os.getpid()}已发送:{item}")# 消费者:从队列里拿数据defconsumer(q):whileTrue:item=q.get()print(f"进程{os.getpid()}已收到:{item}")ifitem=="数据B":breakif__name__=="__main__":q=Queue()# 创建公共队列p1=Process(target=producer,args=(q,))p2=Process(target=consumer,args=(q,))p1.start()p2.start()p1.join()# 主进程等待子进程结束p2.join()像这样就可以实现进程间的数据交互,Queue的特点是先进先出,而且multiprocessing内部自带了锁机制,这意味着即使有多个进程同时往里塞数据或拿数据,队列也能保证数据不会乱套,也不会出现多个进程抢到同一条数据的情况。
还可以用Queue的参数设置最大长度,如Queue(5)
如果队列已满,再put程序就会阻塞,为空时同理,get时阻塞
可以像下面这样避免卡死:
try:# 尝试放入,如果队列满了立刻抛出异常,不等待q.put_nowait("数据6")# 或者:尝试等待 3 秒,3 秒后还没位置就报错# q.put("数据6", timeout=3)except:print("队列已满,无法继续放入数据!")三,线程
如果进程是独立的工厂,那么线程就是工厂里的工人。
线程是操作系统资源调度的最小单位。
1.线程间资源共享
一个进程内的所有线程,共享该进程的全部资源(如全局变量、内存空间)。这样的好处是轻量级,创建和切换速度很快,通信方便,但是也会有缺陷,一个线程崩溃可能导致整个进程瘫痪。
而且因为大伙都在同一个内存空间,如果不加控制,就会发生竞态条件,多个线程同时修改一个变量,导致数据错乱。
importthreadingimporttimeimportos count=0defa():globalcountprint(f"[线程A] 开始执行,线程ID:{threading.current_thread().name}")foriinrange(1000000):count+=1print(f"[线程A] 执行完成,count ={count}")defb():globalcountprint(f"[线程B] 开始执行,线程ID:{threading.current_thread().name}")foriinrange(1000000):count+=1print(f"[线程B] 执行完成,count ={count}")defmain():print(f"[主线程] 开始,线程ID:{threading.current_thread().name}")print(f"[主线程] 初始 count ={count}\n")t1=threading.Thread(target=a,name='Thread-A')t2=threading.Thread(target=b,name='Thread-B')print(f"[主线程] 启动线程...")t1.start()t2.start()print(f"[主线程] 立即打印 count ={count}(此时子线程还在运行)\n")t1.join()t2.join()print(f"\n[主线程] 所有子线程完成,最终 count ={count}")if__name__=="__main__":main()如果用python3.10之后的版本,由于GIL(全局解释器锁)的切换机制变得更加“稳重”了,所以可能看起来没有问题,但是这么写其实是不安全的,可以用下面的代码模拟出现的问题:
importthreadingimporttime count=0defwork():globalcountfor_inrange(1000):# 次数不需要多,关键看切换temp=count# 强制切换线程,模拟在读取和写入之间发生了极其漫长的“网络延迟”或“系统调度”time.sleep(0)count=temp+1if__name__=='__main__':t1=threading.Thread(target=work)t2=threading.Thread(target=work)t1.start()t2.start()t1.join()t2.join()print(f"预期结果: 2000")print(f"实际结果:{count}")可以看到每次运行的时候count都不一样,原因就是如果线程t1和t2在同一时刻拿到了count这个资源,比如说count为0时,t1对它加1,t2也拿到了为0的count对它加1,两个线程都返回count=1,那么这就加了两次但是count只为1,所以我们需要互斥锁来保证这种操作不会出错。
2.互斥锁
importthreadingimporttime count=0lock=threading.Lock()#这里创建了一把锁defwork():globalcountfor_inrange(1000):withlock:#获取锁,执行完自动释放temp=count time.sleep(0)count=temp+1if__name__=='__main__':t1=threading.Thread(target=work)t2=threading.Thread(target=work)t1.start()t2.start()t1.join()t2.join()print(f"预期结果: 2000")print(f"实际结果:{count}")互斥锁有两种写法:
# 方式1:使用 with 语句(推荐,自动释放锁)withlock:temp=count count=temp+1# 离开 with 块时,锁自动释放# 方式2:手动加锁和解锁lock.acquire()# 获取锁try:temp=count count=temp+1finally:lock.release()# 释放锁线程A: 获取锁 → 读取count → 计算 → 写入count → 释放锁
线程B: ------------------------------------------------------------获取锁 → 读取count → 计算 → 写入count → 释放锁
这样无论多少次结果都是2000。
不过这样会降低运行速度。
3.守护线程
比如说qq,我们打开几个聊天页面后肯定希望它们会随着qq这个进程的关闭而关闭的,这个时候就需要设置线程为守护线程。
其实就是在创建线程的时候用daemon设置一下就行,之前还有setdaemon方法,但是已经过时了。
t=threading.Thread(target=background_task,daemon=True)#或者用t.daemon直接设置进程 (Process) 与 线程 (Thread) 核心对比表
| 维度 | 进程 (Process) | 线程 (Thread) |
|---|---|---|
| 定义 | 操作系统分配资源的最小单位 | 操作系统调度的最小单位 |
| 角色比喻 | 独立的工厂 | 工厂里的工人 |
| 资源消耗 | 重(拥有独立的内存空间) | 轻(共享所属进程的资源) |
| 通信成本 | 高(需 IPC,如队列、管道) | 低(直接读写同一进程的变量) |
| 稳定性 | 强(互不影响,一个崩了其他照跑) | 弱(一崩全崩,主线程会受牵连) |
| 安全机制 | 天然隔离,数据安全 | 需加锁(Lock)防止数据竞争 |
| Python 并行 | 支持多核并行(每个进程独立 GIL) | 受 GIL 限制(仅能多核并发) |
| 适用场景 | CPU 密集型(科学计算、视频渲染) | IO 密集型(TCP 服务、网络爬虫) |
四,协程
协程是单线程内的“轻量级任务”,由用户态事件循环协作式调度;操作系统只看得到线程,并以抢占式方式分配 CPU 给各线程。
可以看到,进程包含线程,线程包含事件循环,事件循环调度协程
1.如何定义协程
使用async关键字修饰的函数就是协程,举个简单的例子:
importasyncioasyncdefsay_hello(name,delay):"""模拟一个异步任务,等待指定时间后打印问候语"""print(f"{name}开始执行...")awaitasyncio.sleep(delay)print(f"{name}完成!")returnf"{name}的结果"asyncdefmain():"""主协程函数"""print("=== 协程示例 ===\n")# 创建多个协程任务task1=say_hello("任务1",2)task2=say_hello("任务2",1)task3=say_hello("任务3",3)# 并发执行所有任务print("开始并发执行任务...\n")results=awaitasyncio.gather(task1,task2,task3)print("\n所有任务完成!")print("结果:",results)# 运行协程if__name__=="__main__":asyncio.run(main())- async def - 定义协程函数,这些函数不会立即执行,而是返回一个协程对象(变量是对协程对象的引用)
- await - 等待异步操作完成,在等待期间可以切换到其他协程执行
- asyncio.gather() - 并发执行多个协程,等待所有任务完成
- asyncio.run() - 运行主协程的入口点
event loop就像一个大脑,面对很多可以执行的任务(task),然后决定执行哪个任务。
在python的asyncio中,同时执行的任务只能有一个,需要每一个协程主动告诉event loop,我这边结束了,其它地方可以开始了。
所有用async修饰的函数本身是一个coroutine function,当它调用的时候(用括号)返回的是一个coroutine object。他不会运行这个函数里面的程序。
那么如何运行呢?
需要两件事,第一进入async模式也就是进入event loop里面控制整个程序的状态, 第二就是把coroutine变成task。
用下面这个例子来说明:
importasyncioimporttimeasyncdefsay_hello(delay,what):awaitasyncio.sleep(delay)print(what)asyncdefmain():print(f'{time.strftime("%X")}')awaitsay_hello(1,'hello')print(f'{time.strftime("%X")}')awaitsay_hello(2,'world')print(f'{time.strftime("%X")}')asyncio.run(main())asyncio.run(coroutine)这句代码相当于在线程中创建了一个event loop,并把传进去的这个coroutine object 变成了一个task放到eventloop中,eventloop一看现在只有一个task,就执行它,进到main函数中。
而await coroutine这句,相当于又把coroutine变成了一个task丢到event loop里,并说我现在要这个task执行完。
在例子中,就是进到了第一个say_hello函数里,而asyncio.sleep(delay)这句其实也是一个coroutine,所以还要等这个task执行完,也就是1秒后才打印‘hello’。
现在eventloop中只有2个task,分别是say_hello(由await say_hello创建)和内部的asyncio.sleep(由await asyncio.sleep创建)。但实际执行时,当进入say_hello后,遇到await asyncio.sleep(1),事件循环会挂起当前的say_hello任务,并尝试运行其他就绪的任务。此时没有其他任务,所以事件循环就处于空闲等待状态,直到1秒后sleep完成,事件循环再重新唤醒say_hello任务,继续执行打印’hello’。
随后say_hello任务结束,控制权返回给main中的await say_hello(1,‘hello’),main得以继续,打印第二个时间戳,然后遇到第二个await say_hello(2,‘world’),同样再经历一次挂起、等待、恢复的过程。
整个程序按顺序完成了两个延时操作,总耗时约3秒,但并没有体现并发。如果想真正并发执行多个任务,就需要用asyncio.create_task将多个协程提前包装成任务,然后用asyncio.gather或await asyncio.wait等来同时等待它们,这样事件循环才能在sleep时切换执行另一个任务,从而节省总时间。
如下:
importasyncioimporttimeasyncdefsay_hello(delay,what):awaitasyncio.sleep(delay)print(what)asyncdefmain():task1=asyncio.create_task(say_hello(1,'hello'))task2=asyncio.create_task(say_hello(2,'world'))print(f'{time.strftime("%X")}')awaittask1awaittask2print(f'{time.strftime("%X")}')asyncio.run(main())create_task相当于提前把task1,task2注册到eventloop里面了,当我们执行await task的时候,event loop就会有三个task,就是执行task1时,task2已经在event loop中,就几乎可以在等task1 的 1 秒的时候同时执行task2,最终我们会发现这里只需要2秒就运行完了。
不过如果有多个task,我们就需要gather了。
importasyncioimporttimeasyncdefsay_hello(delay,what):awaitasyncio.sleep(delay)print(what)returnf'{delay}-{what}'asyncdefmain():task1=asyncio.create_task(say_hello(1,'hello'))task2=asyncio.create_task(say_hello(2,'world'))print(f'{time.strftime("%X")}')# 使用 gather 并发执行并收集返回值results=awaitasyncio.gather(task1,task2)print(f'{time.strftime("%X")}')print(f'返回值:{results}')# 也可以分别访问每个任务的返回值print(f'task1 的返回值:{results[0]}')print(f'task2 的返回值:{results[1]}')asyncio.run(main())gather返回的并不是一个coroutine,而是future,也是可以填在await后面的。而且最终的返回结果可以看到是一个list,里面包含了各个task的返回值,而且是有序的。
gather还可以直接传coroutine而不是task,可以少几行代码。
这样写的好处是可以更直观地看到多个task并发执行以及更方便。
2.和普通函数的区别,协程的作用
如果我们要从3个网站获取数据。
用普通函数执行:
importtimedefdownload_sync(url):print(f"开始下载{url}")time.sleep(2)# 模拟网络延迟,阻塞线程print(f"完成下载{url}")returnf"数据 from{url}"defmain_sync():urls=["a.com","b.com","c.com"]start=time.time()results=[]forurlinurls:result=download_sync(url)# 一个一个来,等2秒results.append(result)print(f"全部下载完成,耗时{time.time()-start:.2f}秒")main_sync()- 总耗时 = 2秒 × 3 = 6秒
- 在 time.sleep(2) 期间,线程完全阻塞,不能做任何其他事。
而使用协程后:
importasyncioasyncdefdownload_async(url):print(f"开始下载{url}")awaitasyncio.sleep(2)# 异步等待,主动让出控制权print(f"完成下载{url}")returnf"数据 from{url}"asyncdefmain_async():urls=["a.com","b.com","c.com"]start=time.time()# 创建3个任务,让它们并发执行#asyncio.create_task() 函数用于将协程对象包装成 Task 对象并调度其执行。tasks=[asyncio.create_task(download_async(url))forurlinurls]results=awaitasyncio.gather(*tasks)# 等待所有任务完成print(f"全部下载完成,耗时{time.time()-start:.2f}秒")asyncio.run(main_async())- 总耗时 ≈ 2秒(取最长的那个任务)
- 三个 await asyncio.sleep(2) 几乎同时开始等待,事件循环在等待期间不阻塞,所以总时间只有 2 秒。
总的来说,协程最大的作用是在单线程内,用极低的成本实现高并发 I/O 操作,大幅提升程序吞吐量。