news 2026/3/20 6:26:58

【C/C++】线程池详解

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【C/C++】线程池详解

线程池详解 (Thread Pool Deep Dive)

什么是线程池?(What is a Thread Pool?)

线程池是一种多线程处理模式,它预先创建一定数量的线程,将任务放入队列中,由空闲的线程从队列中取出任务并执行。

为什么需要线程池?

频繁创建和销毁线程的开销很大。线程池通过复用已创建的线程来避免这种开销,同时可以控制并发线程的数量,防止系统资源耗尽。

English Translation: A thread pool pre-creates a fixed number of threads. Tasks are placed in a queue, and idle threads pick tasks from the queue to execute. This avoids the overhead of frequent thread creation/destruction and controls the level of concurrency.


架构概览 (Architecture Overview)

┌─────────────────────────────────────────────────────────────────────────────┐ │ ThreadPool │ │ │ │ ┌─────────────┐ ┌─────────────────────────────────┐ │ │ │ Main Thread │ │ Task Queue │ │ │ │ │ submit │ ┌──────┬──────┬──────┬──────┐ │ │ │ │ pool. │────────▶│ │ Task │ Task │ Task │ Task │ │ │ │ │ submit(f) │ │ │ 1 │ 2 │ 3 │ 4 │ │ │ │ │ │ │ └──────┴──────┴──────┴──────┘ │ │ │ └─────────────┘ └───────────────┬─────────────────┘ │ │ │ │ │ │ fetch task │ │ ▼ │ │ ┌───────────────────────────────┐ │ │ │ Worker Threads │ │ │ │ │ │ │ │ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │ │ │ W1 │ │ W2 │ │ W3 │ ... │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─────┘ └─────┘ └─────┘ │ │ │ │ │ │ │ └───────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ Synchronization Primitives │ │ │ │ │ │ │ │ std::mutex mutex_ std::condition_variable cv_ │ │ │ │ (保护队列访问) (线程间通信) │ │ │ │ (Protects queue) (Thread communication) │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘

核心组件 (Core Components)

┌────────────────────────────────────────────────────────────────┐ │ ThreadPool 成员变量 │ │ (Member Variables) │ ├────────────────────────────────────────────────────────────────┤ │ │ │ workers_ 任务队列 同步原语 │ │ 工作线程 tasks_ mutex_ + cv_ │ │ │ │ ┌─────────┐ ┌─────────────┐ ┌─────────────────┐ │ │ │ thread │ │ std::queue │ │ std::mutex │ │ │ │ thread │ │ <function │ │ │ │ │ │ thread │ │ <void()>> │ │ std::condition │ │ │ │ thread │ │ │ │ _variable │ │ │ └─────────┘ └─────────────┘ └─────────────────┘ │ │ │ │ stop_ │ │ 停止标志 (bool) │ │ │ └────────────────────────────────────────────────────────────────┘

任务提交流程 (Task Submit Flow)

Main Thread ThreadPool │ │ │ pool.submit(task) │ │───────────────────────────────────────▶│ │ │ │ ┌─────────────┴─────────────┐ │ │ 1. 获取 mutex_ 锁 │ │ │ Acquire mutex_ lock │ │ ├───────────────────────────┤ │ │ 2. 检查 stop_ 标志 │ │ │ Check stop_ flag │ │ ├───────────────────────────┤ │ │ 3. 任务入队 │ │ │ Push task to queue │ │ ├───────────────────────────┤ │ │ 4. 释放锁 │ │ │ Release lock │ │ ├───────────────────────────┤ │ │ 5. cv_.notify_one() │ │ │ Wake one worker │ │ └─────────────┬─────────────┘ │ │ │◀───────────────────────────────────────│ │ return │

工作线程生命周期 (Worker Thread Lifecycle)

┌──────────────────┐ │ Start │ │ 线程启动 │ └────────┬─────────┘ │ ▼ ┌──────────────────────────────┐ │ 获取锁 (Acquire Lock) │ └──────────────┬───────────────┘ │ ▼ ┌──────────────────────────────┐ │ cv_.wait(lock, predicate) │◀─────────────┐ │ │ │ │ 等待条件: │ │ │ stop_ || !tasks_.empty() │ │ │ │ │ │ Wait for: │ │ │ stop OR queue not empty │ │ └──────────────┬───────────────┘ │ │ │ │ (被唤醒 woken up) │ ▼ │ ┌──────────────────────────────┐ │ │ stop_ && tasks_.empty() ? │ │ └──────────────┬───────────────┘ │ │ │ ┌────────────┴────────────┐ │ │ YES │ NO │ ▼ ▼ │ ┌───────────────────┐ ┌──────────────────────────┐ │ │ return │ │ 取出任务 │ │ │ 线程退出 │ │ Fetch task from queue │ │ └───────────────────┘ └────────────┬─────────────┘ │ │ │ ▼ │ ┌──────────────────────────┐ │ │ 释放锁 │ │ │ Release lock │ │ └────────────┬─────────────┘ │ │ │ ▼ │ ┌──────────────────────────┐ │ │ 执行任务 (无锁) │ │ │ Execute task (no lock) │ │ └────────────┬─────────────┘ │ │ │ └─────────────────┘

关闭流程 (Shutdown Flow)

Main Thread Worker Threads │ │ │ ~ThreadPool() │ (waiting on cv_) │ │ ▼ │ ┌─────────────────┐ │ │ 获取锁 │ │ │ Acquire lock │ │ └───────┬─────────┘ │ │ │ ▼ │ ┌─────────────────┐ │ │ stop_ = true │ │ └───────┬─────────┘ │ │ │ ▼ │ ┌─────────────────┐ │ │ 释放锁 │ │ │ Release lock │ │ └───────┬─────────┘ │ │ │ ▼ │ ┌─────────────────┐ cv_.notify_all() │ │ 唤醒所有线程 │ ──────────────────────▶ │ │ Wake all │ │ └───────┬─────────┘ │ │ ▼ │ ┌─────────────────┐ │ │ 检查条件 │ │ │ Check condition │ │ │ │ │ │ stop_=true │ │ │ queue empty? │ │ └────────┬────────┘ │ │ │ ▼ │ ┌─────────────────┐ │ │ 是 → 退出 │ │ │ Yes → return │ │ │ │ │ │ 否 → 处理剩余 │ │ │ No → process │ │ │ remaining tasks │ │ └────────┬────────┘ │ │ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ │ join() 所有线程 │◀───────────────│ 线程结束 │ │ Wait for all │ │ Thread exits │ └───────┬─────────┘ └─────────────────┘ │ ▼ ┌─────────────────┐ │ 析构完成 │ │ Destructor done │ └─────────────────┘

条件变量等待机制 (Condition Variable Wait Mechanism)

┌───────────────────────────────────────────────────────────────────────────┐ │ │ │ cv_.wait(lock, predicate) 内部实现等价于: │ │ Internally equivalent to: │ │ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ while (!predicate()) { // 循环检查条件 │ │ │ │ cv_.wait(lock); // Loop until predicate is true │ │ │ │ } │ │ │ │ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ ├───────────────────────────────────────────────────────────────────────────┤ │ │ │ 为什么需要循环? (Why loop?) │ │ │ │ 情况1: 虚假唤醒 (Spurious Wakeup) │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Thread A: waiting... → (spurious wakeup!) → 没有任务! │ │ │ │ → No task! │ │ │ │ │ │ │ │ 如果用 if: 会尝试访问空队列 → 崩溃! │ │ │ │ With if: would access empty queue → crash! │ │ │ │ │ │ │ │ 如果用 while: 重新检查条件 → 继续等待 │ │ │ │ With while: recheck condition → continue waiting │ │ │ │ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ │ 情况2: 任务被抢走 (Stolen Task) │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Time Thread A Thread B Queue │ │ │ │ ──── ──────── ──────── ───── │ │ │ │ 1 waiting waiting empty │ │ │ │ 2 (notified) (also wakes) [task] │ │ │ │ 3 gets lock waiting [task] │ │ │ │ 4 takes task gets lock empty │ │ │ │ 5 working queue empty! empty │ │ │ │ │ │ │ │ Thread B 必须重新检查,否则访问空队列 │ │ │ │ Thread B must recheck or it accesses empty queue │ │ │ │ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ └───────────────────────────────────────────────────────────────────────────┘

锁的作用域问题 (Lock Scope Issues)

正确 vs 错误 (Correct vs Wrong)

┌─────────────────────────────────┐ ┌─────────────────────────────────┐ │ ❌ 错误 (WRONG) │ │ ✅ 正确 (CORRECT) │ ├─────────────────────────────────┤ ├─────────────────────────────────┤ │ │ │ │ │ void worker() { │ │ void worker() { │ │ while (true) { │ │ while (true) { │ │ unique_lock lk(mutex_); │ │ function<void()> task; │ │ cv_.wait(lk, ...); │ │ { │ │ task = move(queue.front); │ │ unique_lock lk(mutex_); │ │ queue.pop(); │ │ cv_.wait(lk, ...); │ │ │ │ task = move(queue.front);│ │ task(); ← 持有锁执行! │ │ queue.pop(); │ │ (Hold lock!) │ │ } ← 锁在此释放 │ │ } │ │ (Lock released here) │ │ } │ │ task(); ← 无锁执行 │ │ │ │ (No lock held) │ │ │ │ } │ │ │ │ } │ └─────────────────────────────────┘ └─────────────────────────────────┘ │ │ ▼ ▼ ┌─────────────────────────────────┐ ┌─────────────────────────────────┐ │ 结果: 所有任务串行执行 │ │ 结果: 任务真正并行执行 │ │ Result: All tasks serialized │ │ Result: True parallel execution│ │ │ │ │ │ W1: ████████ │ │ W1: ████ │ │ W2: ████████ │ │ W2: ████ │ │ W3: ████████ │ │ W3: ████ │ │ W4: ████ │ W4: ████ │ │ │ │ │ │ 时间 ──────────────────────▶ │ │ 时间 ────▶ │ └─────────────────────────────────┘ └─────────────────────────────────┘

析构函数死锁问题 (Destructor Deadlock)

┌───────────────────────────────────────────────────────────────────────────┐ │ ❌ 死锁场景 (Deadlock) │ ├───────────────────────────────────────────────────────────────────────────┤ │ │ │ Main Thread Worker Thread │ │ │ │ │ │ ▼ │ │ │ ┌─────────────┐ │ │ │ │ lock(mutex) │ │ │ │ └──────┬──────┘ │ │ │ │ │ │ │ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ stop = true │ │ want lock │ │ │ └──────┬──────┘ │ 想要获取锁 │ │ │ │ └──────┬──────┘ │ │ ▼ │ │ │ ┌─────────────┐ │ │ │ │ join() │ ─────等待─────┐ │ │ │ │ 等待线程结束 │ │ │ │ │ └─────────────┘ │ │ │ │ │ │ │ │ ▼ ▼ │ │ ┌─────────────────────────┐ │ │ │ │ │ │ │ 永远等待 (Forever) │ │ │ │ 💀 DEADLOCK 💀 │ │ │ │ │ │ │ └─────────────────────────┘ │ │ │ └───────────────────────────────────────────────────────────────────────────┘ ┌───────────────────────────────────────────────────────────────────────────┐ │ ✅ 正确做法 (Correct) │ ├───────────────────────────────────────────────────────────────────────────┤ │ │ │ ~ThreadPool() { │ │ { ◀─── 作用域块 │ │ lock_guard lk(mutex); Scoped block │ │ stop_ = true; │ │ } ◀─── 锁在此释放 │ │ cv_.notify_all(); Lock released here │ │ for (auto& t : workers_) │ │ t.join(); ◀─── 此时不持有锁 │ │ } No lock held here │ │ │ └───────────────────────────────────────────────────────────────────────────┘

shared_ptr 捕获机制 (shared_ptr Capture in Lambda)

┌───────────────────────────────────────────────────────────────────────────┐ │ submitWithFuture() 内存管理 │ │ Memory Management │ ├───────────────────────────────────────────────────────────────────────────┤ │ │ │ auto task = make_shared<packaged_task<T()>>(f); │ │ │ │ ┌────────────────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ task (shared_ptr) │ │ │ │ │ │ │ │ │ │ ref_count = 1 │ │ │ │ ▼ │ │ │ │ ┌─────────────────────┐ │ │ │ │ │ packaged_task │ │ │ │ │ │ on heap │ │ │ │ │ └─────────────────────┘ │ │ │ │ │ │ │ └────────────────────────────────────────────────────────────────┘ │ │ │ │ tasks_.push([task]() { (*task)(); }); ◀─── 按值捕获 │ │ Capture by value │ │ │ │ ┌────────────────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ task (local) lambda (in queue) │ │ │ │ │ │ │ │ │ │ │ ref_count = 2 │ │ │ │ │ │ │ │ │ │ │ ▼ ▼ │ │ │ │ ┌─────────────────────────────┐ │ │ │ │ │ packaged_task │ │ │ │ │ │ on heap │ │ │ │ │ └─────────────────────────────┘ │ │ │ │ │ │ │ └────────────────────────────────────────────────────────────────┘ │ │ │ │ return fut; // submit() 返回,局部 task 销毁 │ │ // submit() returns, local task destroyed │ │ │ │ ┌────────────────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ lambda (in queue) │ │ │ │ │ │ │ │ │ │ ref_count = 1 ◀─── 仍然存活! │ │ │ │ │ Still alive! │ │ │ │ ▼ │ │ │ │ ┌─────────────────────┐ │ │ │ │ │ packaged_task │ │ │ │ │ └─────────────────────┘ │ │ │ │ │ │ │ └────────────────────────────────────────────────────────────────┘ │ │ │ ├───────────────────────────────────────────────────────────────────────────┤ │ │ │ ❌ 如果用引用捕获 [&task]: │ │ If captured by reference: │ │ │ │ submit() 返回 → task 销毁 → lambda 持有悬空指针 → 💀 崩溃 │ │ submit returns → task destroyed → lambda has dangling ptr → crash │ │ │ └───────────────────────────────────────────────────────────────────────────┘

完整实现 (Complete Implementation)

#include<condition_variable>#include<functional>#include<future>#include<memory>#include<mutex>#include<queue>#include<thread>#include<vector>#include<stdexcept>classThreadPool{public:// 构造函数:创建 n 个工作线程// Constructor: spawn n worker threadsexplicitThreadPool(size_t n):stop_(false){for(size_t i=0;i<n;i++){// 每个线程运行 worker() 函数// Each thread runs the worker() functionworkers_.emplace_back([this]{worker();});}}// 禁止拷贝和赋值// Disable copy and assignmentThreadPool(constThreadPool&)=delete;ThreadPool&operator=(constThreadPool&)=delete;// 析构函数:优雅关闭// Destructor: graceful shutdown~ThreadPool(){// 设置停止标志(需要加锁)// Set stop flag (requires lock){std::lock_guard<std::mutex>lk(mutex_);stop_=true;}// 唤醒所有等待的线程// Wake up all waiting threadscv_.notify_all();// 等待所有线程结束// Wait for all threads to finishfor(auto&t:workers_){t.join();}}// 提交任务(基础版本)// Submit task (basic version)voidsubmit(std::function<void()>task){{std::lock_guard<std::mutex>lk(mutex_);// 如果已经停止,拒绝新任务// Reject new tasks if already stoppedif(stop_){throwstd::runtime_error("submit on stopped ThreadPool");}tasks_.push(std::move(task));}// 通知一个等待的线程// Notify one waiting threadcv_.notify_one();}// 提交任务(带返回值版本)// Submit task (with return value)template<typenameF>autosubmitWithFuture(F&&f)->std::future<decltype(f())>{usingReturnType=decltype(f());// 将任务包装成 packaged_task// Wrap the task in a packaged_taskautotask=std::make_shared<std::packaged_task<ReturnType()>>(std::forward<F>(f));// 获取 future 用于获取结果// Get future to retrieve the result laterstd::future<ReturnType>fut=task->get_future();{std::lock_guard<std::mutex>lk(mutex_);if(stop_){throwstd::runtime_error("submit on stopped ThreadPool");}// 将 packaged_task 包装成 void() 函数// Wrap packaged_task into a void() function// 注意:task 是按值捕获的 shared_ptr// Note: task is a shared_ptr captured by valuetasks_.push([task](){(*task)();});}cv_.notify_one();returnfut;}private:// 工作线程函数// Worker thread functionvoidworker(){while(true){std::function<void()>task;{std::unique_lock<std::mutex>lk(mutex_);// 等待条件:有任务或者需要停止// Wait condition: has task or need to stop// 使用谓词形式自动处理虚假唤醒// Predicate form automatically handles spurious wakeupscv_.wait(lk,[this]{returnstop_||!tasks_.empty();});// 如果停止且队列为空,退出线程// If stopped and queue empty, exit threadif(stop_&&tasks_.empty()){return;}// 取出任务// Fetch tasktask=std::move(tasks_.front());tasks_.pop();}// 锁已释放,执行任务// Lock released, execute task// 这样其他线程可以同时取任务// This allows other threads to fetch tasks concurrentlytask();}}std::vector<std::thread>workers_;// 工作线程std::queue<std::function<void()>>tasks_;// 任务队列std::mutex mutex_;// 保护共享数据std::condition_variable cv_;// 线程间通信boolstop_;// 停止标志};

使用示例 (Usage Example)

#include<iostream>#include<chrono>intmain(){// 创建 4 个工作线程的线程池// Create a thread pool with 4 workersThreadPoolpool(4);// 提交简单任务// Submit simple tasksfor(inti=0;i<8;i++){pool.submit([i]{std::cout<<"Task "<<i<<" running on thread "<<std::this_thread::get_id()<<std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));});}// 提交带返回值的任务// Submit tasks with return valuesautofuture1=pool.submitWithFuture([]{return42;});autofuture2=pool.submitWithFuture([]{returnstd::string("hello from thread pool");});// 获取结果(会阻塞直到任务完成)// Get results (blocks until task completes)std::cout<<"Result 1: "<<future1.get()<<std::endl;std::cout<<"Result 2: "<<future2.get()<<std::endl;return0;// pool 析构时会等待所有任务完成// pool destructor waits for all tasks to complete}

总结 (Summary)

┌───────────────────────────────────────────────────────────────────────────┐ │ 关键要点 (Key Takeaways) │ ├───────────────────────────────────────────────────────────────────────────┤ │ │ │ ✅ 使用 while 循环或谓词等待条件变量 │ │ Use while loop or predicate for condition variable wait │ │ │ │ ✅ 执行任务前释放锁,允许真正的并行 │ │ Release lock before executing task for true parallelism │ │ │ │ ✅ 析构时先释放锁,再 join 线程 │ │ Release lock before joining threads in destructor │ │ │ │ ✅ Lambda 捕获 shared_ptr 时用值捕获,避免悬空引用 │ │ Capture shared_ptr by value in lambda to avoid dangling reference │ │ │ │ ✅ 检查停止标志,优雅处理关闭后的提交请求 │ │ Check stop flag, gracefully handle submissions after shutdown │ │ │ │ ✅ 禁止拷贝构造和赋值 │ │ Disable copy constructor and assignment │ │ │ └───────────────────────────────────────────────────────────────────────────┘
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/14 13:50:25

告别截图!Antigravity 集成 Figma MCP 打造像素级还原的 AI 编程体验

🚀 告别截图!Antigravity 集成 Figma MCP 打造像素级还原的 AI 编程体验 作为开发者,你是不是经常截图发给 AI 让它写 CSS,结果写出来的效果总是“差强人意”?👀 间距不对、颜色有色差、字体搞错…… 今天我们要玩点高端的!利用 MCP (Model Context Protocol) 协议,…

作者头像 李华
网站建设 2026/3/16 2:13:07

实测对比后!千笔,最受欢迎的AI论文写作软件

你是否曾为论文选题发愁&#xff1f;是否在深夜面对空白文档无从下笔&#xff1f;是否反复修改却总觉得表达不够专业&#xff1f;论文写作不仅是对知识的考验&#xff0c;更是对耐心和效率的挑战。对于无数本科生来说&#xff0c;从开题到定稿&#xff0c;每一步都充满压力。而…

作者头像 李华
网站建设 2026/3/20 1:34:40

MySQL之InnoDB单表推荐2000W记录缘由

MySQL之InnoDB单表推荐2000W记录缘由 一、概述 在MySQL数据库设计实践中&#xff0c;业界普遍推荐InnoDB单表记录数控制在2000万&#xff08;20M&#xff09;左右。这个数值并非MySQL的硬性限制&#xff0c;而是基于性能、维护成本和架构设计等多个维度综合考量的经验值。本文将…

作者头像 李华
网站建设 2026/3/15 11:28:34

lvgl v8之label使用示例

void lvgl_test() {lv_obj_t* label1 = lv_label_create(lv_scr_act());

作者头像 李华
网站建设 2026/3/19 17:03:43

导师严选! AI论文软件 千笔ai写作 VS 文途AI,继续教育写作者必备!

随着人工智能技术的迅猛迭代与普及&#xff0c;AI辅助写作工具已逐步渗透到高校学术写作场景中&#xff0c;成为专科生、本科生、研究生完成毕业论文不可或缺的辅助手段。越来越多面临毕业论文压力的学生&#xff0c;开始依赖各类AI工具简化写作流程、提升创作效率。但与此同时…

作者头像 李华