news 2025/12/18 8:19:13

从概念开始开始C++管道编程

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
从概念开始开始C++管道编程

第一章:管道编程的核心概念

1.1 什么是管道?

管道是UNIX和类UNIX系统中最古老、最基础的进程间通信(IPC)机制之一。你可以将它想象成现实世界中的水管:数据像水流一样从一个进程"流"向另一个进程。

核心特征

  • 半双工通信:数据只能单向流动(要么从A到B,要么从B到A)
  • 字节流导向:没有消息边界,数据是连续的字节流
  • 基于文件描述符:使用与文件操作相同的接口
  • 内核缓冲区:数据在内核缓冲区中暂存

1.2 管道的工作原理

让我们通过一个简单的比喻来理解管道的工作原理:

想象两个进程要通过管道通信:

进程A(写端) → [内核缓冲区] → 进程B(读端)

内核缓冲区的作用

  1. 当进程A写入数据时,数据先进入内核缓冲区
  2. 进程B从缓冲区读取数据
  3. 如果缓冲区空,读操作会阻塞(等待数据)
  4. 如果缓冲区满,写操作会阻塞(等待空间)

匿名管道的关键限制

  • 只能用于有"亲缘关系"的进程间通信(通常是父子进程或兄弟进程)
  • 生命周期随进程结束而结束
  • 无法在无关进程间使用

第二章:入门实践——创建第一个管道

2.1 理解文件描述符

在深入代码之前,必须理解文件描述符的概念:

// 每个进程都有这三个标准文件描述符:// 0 - 标准输入(stdin) → 通常从键盘读取// 1 - 标准输出(stdout) → 通常输出到屏幕// 2 - 标准错误(stderr) → 错误信息输出// 当创建管道时,系统会分配两个新的文件描述符:// pipefd[0] - 用于读取的端// pipefd[1] - 用于写入的端

2.2 创建第一个管道程序

让我们从最简单的例子开始:

#include<iostream>#include<unistd.h>// pipe(), fork(), read(), write()#include<string.h>// strlen()#include<sys/wait.h>// wait()intmain(){intpipefd[2];// 管道文件描述符数组charbuffer[100];// 步骤1:创建管道// pipe() 返回0表示成功,-1表示失败if(pipe(pipefd)==-1){std::cerr<<"管道创建失败"<<std::endl;return1;}// 步骤2:创建子进程pid_t pid=fork();if(pid==-1){std::cerr<<"进程创建失败"<<std::endl;return1;}if(pid==0){// 子进程代码// 关闭不需要的写端close(pipefd[1]);// 从管道读取数据intbytes_read=read(pipefd[0],buffer,sizeof(buffer));if(bytes_read>0){std::cout<<"子进程收到: "<<buffer<<std::endl;}close(pipefd[0]);return0;}else{// 父进程代码// 关闭不需要的读端close(pipefd[0]);constchar*message="Hello from parent!";// 向管道写入数据write(pipefd[1],message,strlen(message));// 关闭写端,表示数据发送完毕close(pipefd[1]);// 等待子进程结束wait(nullptr);}return0;}

2.3 关键原理分析

为什么需要关闭不用的描述符?

  1. 资源管理:每个进程都有文件描述符限制,及时关闭避免泄漏
  2. 正确终止:读进程需要知道何时没有更多数据
    • 所有写端关闭 → 读端返回0(EOF)
    • 否则读端会一直等待

管道的阻塞行为

  • 读阻塞:当管道空且仍有写端打开时,读操作会阻塞
  • 写阻塞:当管道满(默认64KB),写操作会阻塞
  • 非阻塞模式:可以通过fcntl()设置O_NONBLOCK

第三章:中级应用——双向通信与复杂管道

3.1 实现双向通信

单个管道只能单向通信,要实现双向通信,我们需要两个管道:

#include<iostream>#include<unistd.h>#include<string>classBidirectionalPipe{private:intparent_to_child[2];// 父→子管道intchild_to_parent[2];// 子→父管道public:BidirectionalPipe(){// 创建两个管道if(pipe(parent_to_child)==-1||pipe(child_to_parent)==-1){throwstd::runtime_error("管道创建失败");}}~BidirectionalPipe(){closeAll();}voidparentWrite(conststd::string&message){write(parent_to_child[1],message.c_str(),message.length());}std::stringparentRead(){charbuffer[256];intn=read(child_to_parent[0],buffer,sizeof(buffer)-1);if(n>0){buffer[n]='\0';returnstd::string(buffer);}return"";}voidchildWrite(conststd::string&message){write(child_to_parent[1],message.c_str(),message.length());}std::stringchildRead(){charbuffer[256];intn=read(parent_to_child[0],buffer,sizeof(buffer)-1);if(n>0){buffer[n]='\0';returnstd::string(buffer);}return"";}voidcloseParentSide(){close(parent_to_child[1]);// 关闭父进程的写端close(child_to_parent[0]);// 关闭父进程的读端}voidcloseChildSide(){close(parent_to_child[0]);// 关闭子进程的读端close(child_to_parent[1]);// 关闭子进程的写端}private:voidcloseAll(){close(parent_to_child[0]);close(parent_to_child[1]);close(child_to_parent[0]);close(child_to_parent[1]);}};

3.2 管道链的实现

管道链是UNIX shell中|操作符的基础,让我们实现一个简单的版本:

#include<vector>#include<array>classPipeline{private:// 存储多个命令std::vector<std::vector<std::string>>commands;public:voidaddCommand(conststd::vector<std::string>&cmd){commands.push_back(cmd);}voidexecute(){std::vector<int>prev_pipe_read;// 前一个管道的读端for(size_t i=0;i<commands.size();++i){intpipefd[2];// 如果不是最后一个命令,创建管道if(i<commands.size()-1){if(pipe(pipefd)==-1){throwstd::runtime_error("管道创建失败");}}pid_t pid=fork();if(pid==0){// 子进程代码// 设置输入重定向(从上一个管道读取)if(!prev_pipe_read.empty()){dup2(prev_pipe_read[0],STDIN_FILENO);close(prev_pipe_read[0]);}// 设置输出重定向(写入下一个管道)if(i<commands.size()-1){dup2(pipefd[1],STDOUT_FILENO);close(pipefd[0]);close(pipefd[1]);}// 准备exec参数std::vector<char*>args;for(constauto&arg:commands[i]){args.push_back(const_cast<char*>(arg.c_str()));}args.push_back(nullptr);// 执行命令execvp(args[0],args.data());// exec失败才执行到这里exit(1);}else{// 父进程代码// 关闭不再需要的描述符if(!prev_pipe_read.empty()){close(prev_pipe_read[0]);}if(i<commands.size()-1){close(pipefd[1]);// 父进程不需要写端prev_pipe_read={pipefd[0]};// 保存读端用于下一个进程}}}// 父进程等待所有子进程for(size_t i=0;i<commands.size();++i){wait(nullptr);}}};// 使用示例intmain(){Pipeline pipeline;// 模拟: ls -l | grep ".cpp" | wc -lpipeline.addCommand({"ls","-l"});pipeline.addCommand({"grep","\\.cpp"});pipeline.addCommand({"wc","-l"});pipeline.execute();return0;}

3.3 命名管道(FIFO)的深入理解

命名管道与匿名管道的区别

特性匿名管道命名管道(FIFO)
持久性进程结束即消失文件系统中有实体文件
进程关系必须有亲缘关系任意进程都可访问
创建方式pipe()系统调用mkfifo()函数
访问控制基于文件描述符继承基于文件权限

创建和使用命名管道

#include<iostream>#include<fcntl.h>#include<sys/stat.h>#include<unistd.h>classNamedPipe{private:std::string path;intfd;public:NamedPipe(conststd::string&pipePath):path(pipePath){// 创建命名管道(如果不存在)if(mkfifo(path.c_str(),0666)==-1){// 如果已存在,忽略EEXIST错误if(errno!=EEXIST){throwstd::runtime_error("无法创建命名管道");}}}// 作为读取者打开voidopenForReading(boolnonblock=false){intflags=O_RDONLY;if(nonblock)flags|=O_NONBLOCK;fd=open(path.c_str(),flags);if(fd==-1){throwstd::runtime_error("无法打开命名管道进行读取");}}// 作为写入者打开voidopenForWriting(boolnonblock=false){intflags=O_WRONLY;if(nonblock)flags|=O_NONBLOCK;fd=open(path.c_str(),flags);if(fd==-1){throwstd::runtime_error("无法打开命名管道进行写入");}}// 读取数据std::stringreadData(size_t max_size=1024){charbuffer[max_size];ssize_t bytes=read(fd,buffer,max_size-1);if(bytes>0){buffer[bytes]='\0';returnstd::string(buffer);}return"";}// 写入数据voidwriteData(conststd::string&data){write(fd,data.c_str(),data.length());}~NamedPipe(){if(fd!=-1){close(fd);}// 可以选择是否删除管道文件// unlink(path.c_str());}};

第四章:高级主题——性能与并发

4.1 非阻塞管道操作

非阻塞管道在某些场景下非常有用,比如同时监控多个管道:

#include<fcntl.h>classNonBlockingPipe{private:intpipefd[2];public:NonBlockingPipe(){if(pipe(pipefd)==-1){throwstd::runtime_error("管道创建失败");}// 设置为非阻塞模式setNonBlocking(pipefd[0]);setNonBlocking(pipefd[1]);}private:voidsetNonBlocking(intfd){intflags=fcntl(fd,F_GETFL,0);if(flags==-1){throwstd::runtime_error("获取文件状态失败");}if(fcntl(fd,F_SETFL,flags|O_NONBLOCK)==-1){throwstd::runtime_error("设置非阻塞模式失败");}}public:// 非阻塞读取booltryRead(std::string&result){charbuffer[1024];ssize_t bytes=read(pipefd[0],buffer,sizeof(buffer)-1);if(bytes>0){buffer[bytes]='\0';result=buffer;returntrue;}elseif(bytes==-1&&errno==EAGAIN){// 没有数据可读(非阻塞模式)returnfalse;}returnfalse;// 错误或EOF}};

4.2 使用select实现多路复用

当需要同时监控多个管道时,select是一个非常有效的工具:

#include<sys/select.h>#include<vector>classPipeMonitor{private:std::vector<int>read_fds;// 需要监控的读描述符public:voidaddPipe(intread_fd){read_fds.push_back(read_fd);}// 监控所有管道,返回有数据可读的管道列表std::vector<int>monitor(inttimeout_sec=0){fd_set read_set;FD_ZERO(&read_set);intmax_fd=0;for(intfd:read_fds){FD_SET(fd,&read_set);if(fd>max_fd)max_fd=fd;}structtimevaltimeout;timeout.tv_sec=timeout_sec;timeout.tv_usec=0;// 使用select等待数据intready=select(max_fd+1,&read_set,nullptr,nullptr,timeout_sec>=0?&timeout:nullptr);std::vector<int>ready_fds;if(ready>0){for(intfd:read_fds){if(FD_ISSET(fd,&read_set)){ready_fds.push_back(fd);}}}returnready_fds;}};

4.3 零拷贝技术:splice()

Linux提供了高级的系统调用来优化管道性能,避免不必要的数据拷贝:

#include<fcntl.h>classHighPerformancePipe{private:intpipefd[2];public:HighPerformancePipe(){if(pipe(pipefd)==-1){throwstd::runtime_error("管道创建失败");}}// 使用splice实现零拷贝数据传输// 将数据从一个文件描述符直接移动到管道ssize_ttransferFrom(intsource_fd,size_t len){// splice从source_fd读取数据,直接写入管道// 避免了用户空间的内存拷贝returnsplice(source_fd,nullptr,// 源文件描述符pipefd[1],nullptr,// 目标管道写端len,// 传输长度SPLICE_F_MOVE|SPLICE_F_MORE);}// 将数据从管道直接传输到目标文件描述符ssize_ttransferTo(intdest_fd,size_t len){returnsplice(pipefd[0],nullptr,// 源管道读端dest_fd,nullptr,// 目标文件描述符len,SPLICE_F_MOVE|SPLICE_F_MORE);}};

第五章:最佳实践与错误处理

5.1 RAII包装器

为了避免资源泄漏,使用RAII(资源获取即初始化)模式管理管道:

#include<memory>classPipeRAII{private:intpipefd[2];boolvalid;public:PipeRAII():valid(false){if(pipe(pipefd)==0){valid=true;}}~PipeRAII(){if(valid){close(pipefd[0]);close(pipefd[1]);}}// 删除拷贝构造函数和赋值运算符PipeRAII(constPipeRAII&)=delete;PipeRAII&operator=(constPipeRAII&)=delete;// 允许移动语义PipeRAII(PipeRAII&&other)noexcept:pipefd{other.pipefd[0],other.pipefd[1]},valid(other.valid){other.valid=false;}intreadEnd()const{returnvalid?pipefd[0]:-1;}intwriteEnd()const{returnvalid?pipefd[1]:-1;}explicitoperatorbool()const{returnvalid;}};// 使用智能指针管理classSafePipeManager{private:std::unique_ptr<PipeRAII>pipe;public:SafePipeManager():pipe(std::make_unique<PipeRAII>()){if(!*pipe){throwstd::runtime_error("管道创建失败");}}voidsendData(conststd::string&data){if(pipe){write(pipe->writeEnd(),data.c_str(),data.length());}}};

5.2 常见错误与处理

classRobustPipe{private:intpipefd[2];// 安全读取函数ssize_tsafeRead(void*buf,size_t count){ssize_t bytes_read;do{bytes_read=read(pipefd[0],buf,count);}while(bytes_read==-1&&errno==EINTR);// 处理信号中断returnbytes_read;}// 安全写入函数ssize_tsafeWrite(constvoid*buf,size_t count){ssize_t bytes_written;size_t total_written=0;constchar*ptr=static_cast<constchar*>(buf);while(total_written<count){do{bytes_written=write(pipefd[1],ptr+total_written,count-total_written);}while(bytes_written==-1&&errno==EINTR);if(bytes_written==-1){// 处理真正的错误if(errno==EPIPE){std::cerr<<"管道断裂:读端已关闭"<<std::endl;}return-1;}total_written+=bytes_written;}returntotal_written;}public:RobustPipe(){if(pipe(pipefd)==-1){// 检查具体错误switch(errno){caseEMFILE:throwstd::runtime_error("进程文件描述符耗尽");caseENFILE:throwstd::runtime_error("系统文件描述符耗尽");default:throwstd::runtime_error("未知管道创建错误");}}// 设置管道缓冲区大小(可选)intsize=65536;// 64KBfcntl(pipefd[0],F_SETPIPE_SZ,size);}};

第六章:实战应用案例

6.1 日志收集系统

#include<thread>#include<queue>#include<mutex>#include<condition_variable>classLogCollector{private:intlog_pipe[2];std::queue<std::string>log_queue;std::mutex queue_mutex;std::condition_variable queue_cv;std::thread worker_thread;boolrunning;voidworker(){charbuffer[4096];while(running){ssize_t bytes=read(log_pipe[0],buffer,sizeof(buffer)-1);if(bytes>0){buffer[bytes]='\0';std::stringlog_entry(buffer);{std::lock_guard<std::mutex>lock(queue_mutex);log_queue.push(log_entry);}queue_cv.notify_one();}}}public:LogCollector():running(true){if(pipe(log_pipe)==-1){throwstd::runtime_error("日志管道创建失败");}worker_thread=std::thread(&LogCollector::worker,this);}~LogCollector(){running=false;close(log_pipe[1]);// 关闭写端,使读端退出if(worker_thread.joinable()){worker_thread.join();}close(log_pipe[0]);}// 写入日志voidlog(conststd::string&message){write(log_pipe[1],message.c_str(),message.length());}// 获取日志(线程安全)std::stringgetLog(){std::unique_lock<std::mutex>lock(queue_mutex);queue_cv.wait(lock,[this]{return!log_queue.empty();});std::string log=log_queue.front();log_queue.pop();returnlog;}};

总结

管道编程是C++系统编程的重要部分,掌握它需要:

  1. 理解基本原理:文件描述符、缓冲区、阻塞行为
  2. 掌握核心API:pipe(), fork(), dup2(), read(), write()
  3. 学会高级技术:非阻塞IO、多路复用、零拷贝
  4. 遵循最佳实践:RAII管理、错误处理、资源清理

管道不仅是一种技术,更是一种设计哲学——它鼓励我们创建模块化、可组合的程序,这正是UNIX哲学的核心理念之一。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2025/12/17 2:23:06

模型推理 单多轮推理,gpu推理,lora推理和vllm(附代码示例)

模型推理 单多轮推理&#xff0c;gpu推理&#xff0c;lora推理和vllm 一、大语言模型推理基础 1. 推理与训练的核心差异 维度 模型训练 模型推理 硬件需求 需强大GPU集群、海量存储 硬件需求较低&#xff0c;支持CPU/GPU 计算逻辑 反向传播梯度下降&#xff0c;计算量大 仅前…

作者头像 李华
网站建设 2025/12/17 2:22:50

通信系统仿真:数字调制与解调技术_(34).同步技术

同步技术 同步技术在通信系统中起着至关重要的作用&#xff0c;特别是在数字调制与解调过程中。同步技术的目的是确保发送端和接收端在时间、频率和相位上保持一致&#xff0c;从而实现高效和准确的数据传输。本节将详细介绍同步技术的原理和内容&#xff0c;并通过具体的软件开…

作者头像 李华
网站建设 2025/12/17 2:22:02

介观交通流仿真软件:Aimsun Next_(7).行人行为模型

行人行为模型 行人行为模型在交通仿真中扮演着重要角色&#xff0c;尤其是在城市交通、大型活动、公共交通站点等场景中。Aimsun Next 提供了强大的行人行为建模功能&#xff0c;可以模拟行人在不同环境中的行为&#xff0c;包括行进、避让、等待等。本节将详细介绍如何在 Aim…

作者头像 李华
网站建设 2025/12/17 2:21:39

介观交通流仿真软件:DynusT_(1).DynusT基础介绍

DynusT基础介绍 1. DynusT概述 DynusT&#xff08;Dynamic Network User Equilibrium Model&#xff09;是一款介观交通流仿真软件&#xff0c;用于模拟城市交通网络中的动态交通流。与宏观数学模型和微观仿真软件相比&#xff0c;DynusT在时间和空间分辨率之间取得了平衡&…

作者头像 李华
网站建设 2025/12/17 2:21:11

so eazy!使用Netty和动态代理一键实现一个简单的RPC

RPC&#xff08;remote procedure call&#xff09;远程过程调用RPC是为了在分布式应用中&#xff0c;两台主机的Java进程进行通信&#xff0c;当A主机调用B主机的方法时&#xff0c;过程简洁&#xff0c;就像是调用自己进程里的方法一样。 RPC框架的职责就是&#xff0c;封装好…

作者头像 李华
网站建设 2025/12/17 2:21:09

Java性能优化通用方法都在这了!

作为一个程序员&#xff0c;性能优化是常有的事情&#xff0c;不管你是刚入行的小白还是已经入坑了很久的小秃头都会经历很多不同层次的性能优化——小到代码审查大到整个系统设计的优化&#xff01;大势所趋之下&#xff0c;如何让自己的优化方向精准到性能瓶颈的那个点以及尽…

作者头像 李华