Go语言的并发编程进阶
并发编程基础
Go语言的并发编程基于goroutine和channel,这使得并发编程变得简单而高效。本文将介绍Go语言并发编程的进阶概念和技巧,帮助开发者编写更复杂、更高效的并发程序。
高级通道操作
通道的关闭
package main import ( "fmt" "time" ) func main() { ch := make(chan int, 5) // 发送数据 go func() { for i := 0; i < 5; i++ { ch <- i time.Sleep(100 * time.Millisecond) } close(ch) // 关闭通道 }() // 接收数据 for value := range ch { fmt.Println("Received:", value) } fmt.Println("Channel closed") }带缓冲的通道
package main import ( "fmt" "time" ) func main() { // 创建带缓冲的通道 ch := make(chan int, 3) // 发送数据 go func() { for i := 0; i < 5; i++ { ch <- i fmt.Println("Sent:", i) } close(ch) }() // 接收数据 time.Sleep(500 * time.Millisecond) for value := range ch { fmt.Println("Received:", value) time.Sleep(200 * time.Millisecond) } }选择器(Select)
基本使用
package main import ( "fmt" "time" ) func main() { ch1 := make(chan string) ch2 := make(chan string) go func() { time.Sleep(1 * time.Second) ch1 <- "Message from channel 1" }() go func() { time.Sleep(2 * time.Second) ch2 <- "Message from channel 2" }() for i := 0; i < 2; i++ { select { case msg1 := <-ch1: fmt.Println("Received:", msg1) case msg2 := <-ch2: fmt.Println("Received:", msg2) case <-time.After(3 * time.Second): fmt.Println("Timeout") } } }非阻塞操作
package main import ( "fmt" "time" ) func main() { ch := make(chan int) // 非阻塞发送 select { case ch <- 1: fmt.Println("Sent successfully") default: fmt.Println("Channel is full, cannot send") } // 非阻塞接收 select { case value := <-ch: fmt.Println("Received:", value) default: fmt.Println("No data available") } // 启动goroutine发送数据 go func() { time.Sleep(100 * time.Millisecond) ch <- 2 }() // 带超时的接收 select { case value := <-ch: fmt.Println("Received:", value) case <-time.After(500 * time.Millisecond): fmt.Println("Timeout") } }并发模式
工作池模式
package main import ( "fmt" "sync" "time" ) func worker(id int, jobs <-chan int, results chan<- int) { for job := range jobs { fmt.Printf("Worker %d processing job %d\n", id, job) time.Sleep(500 * time.Millisecond) results <- job * 2 } } func main() { const numJobs = 10 const numWorkers = 3 jobs := make(chan int, numJobs) results := make(chan int, numJobs) // 启动工作池 var wg sync.WaitGroup for i := 1; i <= numWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() worker(workerID, jobs, results) }(i) } // 发送任务 for i := 1; i <= numJobs; i++ { jobs <- i } close(jobs) // 等待所有工作完成 wg.Wait() close(results) // 收集结果 for result := range results { fmt.Println("Result:", result) } }扇入扇出模式
package main import ( "fmt" "sync" "time" ) func producer(id int, out chan<- int) { for i := 0; i < 5; i++ { value := id*10 + i out <- value fmt.Printf("Producer %d produced %d\n", id, value) time.Sleep(100 * time.Millisecond) } } func fanIn(channels ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup for _, ch := range channels { wg.Add(1) go func(c <-chan int) { defer wg.Done() for value := range c { out <- value } }(ch) } go func() { wg.Wait() close(out) }() return out } func main() { ch1 := make(chan int) ch2 := make(chan int) ch3 := make(chan int) // 启动生产者 go producer(1, ch1) go producer(2, ch2) go producer(3, ch3) // 扇入 merged := fanIn(ch1, ch2, ch3) // 消费合并后的数据 for value := range merged { fmt.Println("Consumed:", value) } }互斥锁和读写锁
互斥锁
package main import ( "fmt" "sync" "time" ) type Counter struct { value int mu sync.Mutex } func (c *Counter) Increment() { c.mu.Lock() defer c.mu.Unlock() c.value++ } func (c *Counter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.value } func main() { counter := &Counter{} var wg sync.WaitGroup // 启动多个goroutine递增计数器 for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() counter.Increment() }() } wg.Wait() fmt.Println("Final value:", counter.Value()) }读写锁
package main import ( "fmt" "sync" "time" ) type DataStore struct { data map[string]string rwmu sync.RWMutex } func (ds *DataStore) Set(key, value string) { ds.rwmu.Lock() defer ds.rwmu.Unlock() ds.data[key] = value } func (ds *DataStore) Get(key string) string { ds.rwmu.RLock() defer ds.rwmu.RUnlock() return ds.data[key] } func main() { ds := &DataStore{data: make(map[string]string)} var wg sync.WaitGroup // 启动多个读goroutine for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 100; j++ { key := fmt.Sprintf("key%d", j%10) value := ds.Get(key) fmt.Printf("Reader %d got %s: %s\n", id, key, value) time.Sleep(10 * time.Millisecond) } }(i) } // 启动写goroutine for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 20; j++ { key := fmt.Sprintf("key%d", j%10) value := fmt.Sprintf("value%d-%d", id, j) ds.Set(key, value) fmt.Printf("Writer %d set %s: %s\n", id, key, value) time.Sleep(50 * time.Millisecond) } }(i) } wg.Wait() fmt.Println("Done") }原子操作
package main import ( "fmt" "sync" "sync/atomic" ) func main() { var counter int64 var wg sync.WaitGroup // 启动多个goroutine递增计数器 for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() atomic.AddInt64(&counter, 1) }() } wg.Wait() fmt.Println("Final value:", atomic.LoadInt64(&counter)) }上下文(Context)
基本使用
package main import ( "context" "fmt" "time" ) func worker(ctx context.Context, id int) { for { select { case <-ctx.Done(): fmt.Printf("Worker %d received cancel signal\n", id) return default: fmt.Printf("Worker %d working\n", id) time.Sleep(500 * time.Millisecond) } } } func main() { // 创建可取消的上下文 ctx, cancel := context.WithCancel(context.Background()) // 启动多个工作goroutine for i := 1; i <= 3; i++ { go worker(ctx, i) } // 运行一段时间后取消 time.Sleep(2 * time.Second) fmt.Println("Cancelling work") cancel() // 等待一段时间观察结果 time.Sleep(1 * time.Second) fmt.Println("Done") }带超时的上下文
package main import ( "context" "fmt" "time" ) func worker(ctx context.Context, id int) { for { select { case <-ctx.Done(): fmt.Printf("Worker %d received cancel signal: %v\n", id, ctx.Err()) return default: fmt.Printf("Worker %d working\n", id) time.Sleep(500 * time.Millisecond) } } } func main() { // 创建带超时的上下文 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() // 启动工作goroutine go worker(ctx, 1) // 等待 time.Sleep(3 * time.Second) fmt.Println("Done") }示例:并发下载器
package main import ( "context" "fmt" "io" "net/http" "os" "path/filepath" "sync" "time" ) func downloadFile(ctx context.Context, url string, dest string, wg *sync.WaitGroup) { defer wg.Done() // 创建目标目录 dir := filepath.Dir(dest) if err := os.MkdirAll(dir, 0755); err != nil { fmt.Printf("Error creating directory: %v\n", err) return } // 创建HTTP请求 req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { fmt.Printf("Error creating request: %v\n", err) return } // 发送请求 client := &http.Client{Timeout: 30 * time.Second} resp, err := client.Do(req) if err != nil { fmt.Printf("Error downloading %s: %v\n", url, err) return } defer resp.Body.Close() // 检查响应状态 if resp.StatusCode != http.StatusOK { fmt.Printf("Error downloading %s: status code %d\n", url, resp.StatusCode) return } // 创建目标文件 file, err := os.Create(dest) if err != nil { fmt.Printf("Error creating file: %v\n", err) return } defer file.Close() // 复制内容 _, err = io.Copy(file, resp.Body) if err != nil { fmt.Printf("Error saving file: %v\n", err) return } fmt.Printf("Downloaded %s to %s\n", url, dest) } func main() { // 下载任务 tasks := []struct { url string dest string }{ {"https://example.com", "downloads/example.html"}, {"https://golang.org", "downloads/golang.html"}, {"https://github.com", "downloads/github.html"}, {"https://stackoverflow.com", "downloads/stackoverflow.html"}, {"https://go.dev", "downloads/go.html"}, } // 创建上下文 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() // 启动下载 var wg sync.WaitGroup for _, task := range tasks { wg.Add(1) go downloadFile(ctx, task.url, task.dest, &wg) } // 等待所有下载完成 wg.Wait() fmt.Println("All downloads completed") }并发编程最佳实践
- 优先使用通道进行goroutine间通信
- 使用context管理goroutine的生命周期
- 避免使用共享内存,优先使用通道传递数据
- 使用互斥锁和读写锁保护共享资源
- 对于简单的计数器,使用原子操作
- 合理设置通道缓冲区大小
- 使用工作池模式管理并发任务
- 避免goroutine泄漏
总结
Go语言的并发编程功能强大且易于使用,通过掌握高级通道操作、选择器、并发模式和同步原语,可以编写高效、可靠的并发程序。在实际开发中,应该根据具体场景选择合适的并发模式,确保程序的正确性和性能。