docs(Go concurrency): 添加 Go 并发模型文档侧边栏配置
- 在侧边栏配置文件中新增“Go并发模型”章节 - 包含 Goroutine 与 GPM 调度模型等相关多个文档链接 - 移除“08Go 并发入门Goroutine 基础 GPM 线程模型实战.md”文件内容 - 优化文档结构,支持并发模型内容折叠显示与导航
This commit is contained in:
@@ -95,6 +95,21 @@ export default sidebar({
|
||||
"07从零实现 Mini 日志库.md",
|
||||
],
|
||||
},
|
||||
{
|
||||
text: "Go并发模型",
|
||||
icon: "mdi:run-fast",
|
||||
collapsible: true,
|
||||
prefix: "Go并发模型/",
|
||||
children: [
|
||||
"08Goroutine与GPM调度模型.md",
|
||||
"09Channel与单向Channel.md",
|
||||
"10select与超时控制.md",
|
||||
"11context取消与超时.md",
|
||||
"12Mutex与WaitGroup.md",
|
||||
"13atomic原子操作.md",
|
||||
"14并发爬虫实战.md",
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
title: Go 并发入门:Goroutine 基础与 GPM 调度模型实战解析
|
||||
icon: go
|
||||
---
|
||||
title: Goroutine与GPM调度模型
|
||||
icon: mdi:hexagon-multiple-outline
|
||||
date: 2025-12-11
|
||||
category:
|
||||
- Go
|
||||
341
src/programming/backend/go/Go并发模型/09Channel与单向Channel.md
Normal file
341
src/programming/backend/go/Go并发模型/09Channel与单向Channel.md
Normal file
@@ -0,0 +1,341 @@
|
||||
---
|
||||
title: Channel与单向Channel
|
||||
icon: mdi:pipe
|
||||
date: 2025-12-12
|
||||
category:
|
||||
- Go
|
||||
- 并发编程
|
||||
tag:
|
||||
- channel
|
||||
- 无缓冲 channel
|
||||
- 有缓冲 channel
|
||||
- 单向 channel
|
||||
star: true
|
||||
---
|
||||
|
||||
本篇是你 Go 并发学习的 **第 9 天:专注 Channel 通信**,结合你的环境给出可直接实操的练习示例。
|
||||
|
||||
- **操作系统**:Linux Mint XFCE
|
||||
- **Go 版本**:go1.22.2 linux/amd64
|
||||
- **项目目录示例**:`/home/liumangmang/GolandProjects/go-channel-practice`
|
||||
|
||||
<!-- more -->
|
||||
|
||||
---
|
||||
|
||||
## 📌 标题
|
||||
|
||||
# Go 并发进阶:Channel(无缓冲/有缓冲)与单向 Channel 实战
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 1:创建练习项目
|
||||
|
||||
在终端中执行:
|
||||
|
||||
```bash
|
||||
cd /home/liumangmang/GolandProjects
|
||||
mkdir go-channel-practice && cd go-channel-practice
|
||||
go mod init go-channel-practice
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 2:无缓冲 channel 基础示例
|
||||
|
||||
创建 `unbuffered.go`:
|
||||
|
||||
```bash
|
||||
nano unbuffered.go
|
||||
```
|
||||
|
||||
粘贴以下代码,体验 **无缓冲 channel 的同步特性**:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ch := make(chan string) // 无缓冲 channel
|
||||
|
||||
go func() {
|
||||
fmt.Println("[Sender] 准备发送数据...")
|
||||
ch <- "hello from goroutine" // 这里会阻塞,直到有人接收
|
||||
fmt.Println("[Sender] 数据发送完毕")
|
||||
}()
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
fmt.Println("[Main] 1 秒后开始接收数据...")
|
||||
|
||||
msg := <-ch // 接收数据,同时解除发送方阻塞
|
||||
fmt.Println("[Main] 收到:", msg)
|
||||
|
||||
fmt.Println("[Main] 程序结束")
|
||||
}
|
||||
```
|
||||
|
||||
### 运行 & 观察
|
||||
|
||||
```bash
|
||||
go run unbuffered.go
|
||||
```
|
||||
|
||||
你大概率会看到类似输出(时间顺序很关键):
|
||||
|
||||
```text
|
||||
[Sender] 准备发送数据...
|
||||
[Main] 1 秒后开始接收数据...
|
||||
[Main] 收到: hello from goroutine
|
||||
[Sender] 数据发送完毕
|
||||
[Main] 程序结束
|
||||
```
|
||||
|
||||
**关键理解:**
|
||||
|
||||
- **无缓冲 channel:发送和接收必须“同时就位”** 才能完成一次传输。
|
||||
- `ch <- value` 会阻塞,直到有 `value := <-ch` 在等待。
|
||||
- 这一特性非常适合做 **goroutine 间的同步**。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 3:有缓冲 channel 示例
|
||||
|
||||
创建 `buffered.go`:
|
||||
|
||||
```bash
|
||||
nano buffered.go
|
||||
```
|
||||
|
||||
粘贴以下代码,体验 **有缓冲 channel 的“容量”效果**:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ch := make(chan int, 3) // 容量为 3 的有缓冲 channel
|
||||
|
||||
fmt.Println("[Main] 开始发送 3 个元素...")
|
||||
ch <- 1
|
||||
fmt.Println("[Main] 已发送 1")
|
||||
ch <- 2
|
||||
fmt.Println("[Main] 已发送 2")
|
||||
ch <- 3
|
||||
fmt.Println("[Main] 已发送 3 (已满)")
|
||||
|
||||
// 再发送一个会怎样?
|
||||
go func() {
|
||||
fmt.Println("[Sender] 尝试发送第 4 个元素(会阻塞,直到有接收者)...")
|
||||
ch <- 4
|
||||
fmt.Println("[Sender] 第 4 个元素发送成功")
|
||||
}()
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
fmt.Println("[Main] 开始接收...")
|
||||
for i := 0; i < 4; i++ {
|
||||
v := <-ch
|
||||
fmt.Println("[Main] 收到:", v)
|
||||
}
|
||||
|
||||
fmt.Println("[Main] 程序结束")
|
||||
}
|
||||
```
|
||||
|
||||
### 运行 & 观察
|
||||
|
||||
```bash
|
||||
go run buffered.go
|
||||
```
|
||||
|
||||
**核心对比:**
|
||||
|
||||
- 有缓冲 channel 在 **未填满之前**,发送不会阻塞。
|
||||
- 当缓冲区 **满了之后**,发送会阻塞,直到有接收方读取。
|
||||
- 这样可以在 **异步生产/消费** 场景中,减少 goroutine 的等待时间。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 4:无缓冲 vs 有缓冲 对比小结
|
||||
|
||||
| 特性 | 无缓冲 channel | 有缓冲 channel |
|
||||
|----------------|--------------------------|------------------------------------|
|
||||
| 创建方式 | `make(chan T)` | `make(chan T, N)` |
|
||||
| 是否有缓存 | 否 | 是(容量为 N) |
|
||||
| 发送是否阻塞 | 一定阻塞,直到有人接收 | 未满时不阻塞,满了才阻塞 |
|
||||
| 适用场景 | 强同步、事件通知 | 异步队列、生产者-消费者、缓冲数据 |
|
||||
|
||||
> **实践建议**:
|
||||
> - 如果你想表达“发送和接收必须同步发生”,优先用 **无缓冲 channel**。
|
||||
> - 如果你想“暂存一些数据,解耦生产和消费速度”,考虑用 **有缓冲 channel**。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 5:单向 channel(只发 / 只收)
|
||||
|
||||
单向 channel 不是新类型,而是对 **函数参数** 做的“能力限制”,用于表达更清晰的意图:
|
||||
|
||||
- **只发送**:`chan<- T`
|
||||
- **只接收**:`<-chan T`
|
||||
|
||||
创建 `directional.go`:
|
||||
|
||||
```bash
|
||||
nano directional.go
|
||||
```
|
||||
|
||||
粘贴以下代码:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import "fmt"
|
||||
|
||||
// 只负责发送数据
|
||||
func producer(out chan<- int) {
|
||||
for i := 1; i <= 5; i++ {
|
||||
fmt.Println("[Producer] 发送:", i)
|
||||
out <- i
|
||||
}
|
||||
fmt.Println("[Producer] 关闭 channel")
|
||||
close(out) // 只有发送方才能关闭
|
||||
}
|
||||
|
||||
// 只负责接收数据
|
||||
func consumer(in <-chan int) {
|
||||
for v := range in { // 直到 channel 被关闭
|
||||
fmt.Println("[Consumer] 接收:", v)
|
||||
}
|
||||
fmt.Println("[Consumer] channel 已关闭,接收结束")
|
||||
}
|
||||
|
||||
func main() {
|
||||
ch := make(chan int)
|
||||
|
||||
go producer(ch) // ch 在这里被当作 只发送 channel 使用
|
||||
consumer(ch) // ch 在这里被当作 只接收 channel 使用
|
||||
}
|
||||
```
|
||||
|
||||
### 运行
|
||||
|
||||
```bash
|
||||
go run directional.go
|
||||
```
|
||||
|
||||
你会看到生产者发送 1~5,消费者依次接收,并在 channel 关闭后退出循环。
|
||||
|
||||
### 单向 channel 的价值
|
||||
|
||||
- **约束函数的职责**:
|
||||
- `producer` 只能发送(写),不能接收(读)。
|
||||
- `consumer` 只能接收(读),不能发送(写)。
|
||||
- **提高可读性**:别人一看函数签名就知道它的用途。
|
||||
- **减少误用**:编译器会阻止错误使用(比如在只读 channel 上发送)。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 6:综合小实验:有缓冲 + 单向 channel
|
||||
|
||||
创建 `pipeline.go`,实现一个简单“生产者 → 处理者 → 消费者”的流水线:
|
||||
|
||||
```bash
|
||||
nano pipeline.go
|
||||
```
|
||||
|
||||
粘贴以下代码:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import "fmt"
|
||||
|
||||
// 生产者:产生 1~5
|
||||
func producer(out chan<- int) {
|
||||
for i := 1; i <= 5; i++ {
|
||||
fmt.Println("[Producer] 发送:", i)
|
||||
out <- i
|
||||
}
|
||||
close(out)
|
||||
}
|
||||
|
||||
// 处理者:把数字放大 10 倍
|
||||
func multiplier(in <-chan int, out chan<- int) {
|
||||
for v := range in {
|
||||
fmt.Println("[Multiplier] 接收:", v)
|
||||
out <- v * 10
|
||||
}
|
||||
close(out)
|
||||
}
|
||||
|
||||
// 消费者:打印结果
|
||||
func consumer(in <-chan int) {
|
||||
for v := range in {
|
||||
fmt.Println("[Consumer] 最终结果:", v)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
ch1 := make(chan int, 2) // 有缓冲,减轻 producer 阻塞
|
||||
ch2 := make(chan int, 2)
|
||||
|
||||
go producer(ch1)
|
||||
go multiplier(ch1, ch2)
|
||||
consumer(ch2)
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run pipeline.go
|
||||
```
|
||||
|
||||
这个例子同时用到了:
|
||||
|
||||
- 有缓冲 channel(`ch1`, `ch2`)
|
||||
- 单向 channel 函数参数
|
||||
- `range channel` + `close` 实现优雅退出
|
||||
|
||||
---
|
||||
|
||||
## ⚠️ 常见坑与排查方式
|
||||
|
||||
- **坑 1:fatal error: all goroutines are asleep - deadlock!**
|
||||
- 常见原因:
|
||||
- 无缓冲 channel 上只有发送,没有接收(或反之)。
|
||||
- 有缓冲 channel 被写满后,没有消费者读取。
|
||||
- 排查建议:
|
||||
- 检查每一个 `ch <-` 是否对应至少一个接收方。
|
||||
- 检查是否在合适的地方 `close(channel)` 让 `range` 能够退出。
|
||||
|
||||
- **坑 2:错误关闭 channel**
|
||||
- 只有 **发送方** 应该关闭 channel。
|
||||
- 不要在多个 goroutine 中同时 `close` 同一个 channel。
|
||||
|
||||
- **坑 3:误用单向 channel**
|
||||
- 记住:单向 channel 多用在 **函数参数**,一般不会在变量定义时直接写成单向。
|
||||
|
||||
---
|
||||
|
||||
## 📚 今日小结与思考题
|
||||
|
||||
- **你已经掌握:**
|
||||
- 无缓冲 channel:发送接收必须配对,同步传递数据。
|
||||
- 有缓冲 channel:可以暂存 N 个元素,适合异步场景。
|
||||
- 单向 channel:通过类型约束函数职责,提升代码可读性与安全性。
|
||||
|
||||
- **思考 & 练习:**
|
||||
1. 修改 `buffered.go`,把缓冲区从 3 改成 1、10,观察阻塞行为的变化。
|
||||
2. 在 `pipeline.go` 中增加一个新的阶段,例如:过滤掉奇数或小于 30 的数字。
|
||||
3. 尝试使用 `time.Sleep` 人为制造“生产速度远快于消费”的情况,观察有缓冲 channel 的效果。
|
||||
|
||||
如果你愿意,下一天我们可以继续练 **`select` 多路复用 + 超时控制 + context 取消**,构建更真实的并发场景 🚀
|
||||
399
src/programming/backend/go/Go并发模型/10select与超时控制.md
Normal file
399
src/programming/backend/go/Go并发模型/10select与超时控制.md
Normal file
@@ -0,0 +1,399 @@
|
||||
---
|
||||
title: select与超时控制
|
||||
icon: mdi:source-branch
|
||||
date: 2025-12-13
|
||||
category:
|
||||
- Go
|
||||
- 并发编程
|
||||
tag:
|
||||
- select
|
||||
- channel 超时
|
||||
- 多路复用
|
||||
- context
|
||||
star: true
|
||||
---
|
||||
|
||||
本篇是你 Go 并发学习的 **第 10 天:select + default、多路复用与 channel 超时控制**,内容会尽量结合 Java 的类比来帮助理解。
|
||||
|
||||
- **你的背景**:Java 程序员
|
||||
- **操作系统**:Linux Mint XFCE
|
||||
- **Go 版本**:go1.22.2 linux/amd64
|
||||
- **项目目录示例**:`/home/liumangmang/GolandProjects/go-select-practice`
|
||||
|
||||
<!-- more -->
|
||||
|
||||
---
|
||||
|
||||
## 📌 标题
|
||||
|
||||
# Go 并发进阶:select、多路复用与 channel 超时控制
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 1:创建练习项目
|
||||
|
||||
在终端中执行:
|
||||
|
||||
```bash
|
||||
cd /home/liumangmang/GolandProjects
|
||||
mkdir go-select-practice && cd go-select-practice
|
||||
go mod init go-select-practice
|
||||
```
|
||||
|
||||
> 如果你更喜欢放在之前的项目里(比如第 9 天的 `go-channel-practice`),直接在原项目中新建几个 `.go` 文件也可以,Go 模块不必每次重建。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 2:select 基本用法 —— 在多个 channel 之间“抢先处理”
|
||||
|
||||
**Java 类比:**
|
||||
|
||||
- 把 **goroutine** 想成 Java 的 **Thread / Runnable**。
|
||||
- 把 **channel** 想成 Java 的 **BlockingQueue**(带类型、安全阻塞)。
|
||||
- 而 **`select`** 有点像:
|
||||
- 同时在多个 BlockingQueue 上做 `take()` / `poll()`,哪个先有数据就先处理。
|
||||
- 再带一点 `switch` 语法糖的味道。
|
||||
|
||||
创建 `select_basic.go`:
|
||||
|
||||
```bash
|
||||
nano select_basic.go
|
||||
```
|
||||
|
||||
粘贴下面代码:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ch1 := make(chan string)
|
||||
ch2 := make(chan string)
|
||||
|
||||
// 模拟两个不同来源的“数据源”
|
||||
go func() {
|
||||
time.Sleep(1 * time.Second)
|
||||
ch1 <- "result from ch1 (1s)"
|
||||
}()
|
||||
|
||||
go func() {
|
||||
time.Sleep(2 * time.Second)
|
||||
ch2 <- "result from ch2 (2s)"
|
||||
}()
|
||||
|
||||
fmt.Println("等待 ch1 或 ch2 的结果...(谁先来处理谁)")
|
||||
|
||||
select {
|
||||
case v := <-ch1:
|
||||
fmt.Println("收到 ch1:", v)
|
||||
case v := <-ch2:
|
||||
fmt.Println("收到 ch2:", v)
|
||||
}
|
||||
|
||||
fmt.Println("main 结束")
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run select_basic.go
|
||||
```
|
||||
|
||||
一般会输出:
|
||||
|
||||
```text
|
||||
等待 ch1 或 ch2 的结果...(谁先来处理谁)
|
||||
收到 ch1: result from ch1 (1s)
|
||||
main 结束
|
||||
```
|
||||
|
||||
**关键点:**
|
||||
|
||||
- `select` 会**同时监听**多个 channel:
|
||||
- 哪个 `case` 可以立刻执行(比如有数据可读),就选哪个。
|
||||
- 多个 `case` 都准备好了时,会随机选一个(避免饥饿)。
|
||||
- 从 Java 视角:省掉了你手动写一堆 `if (queue1 有数据) {}`、`else if (queue2 有数据) {}` 的轮询代码。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 3:select + default —— 非阻塞轮询(Java 中的 `poll()`)
|
||||
|
||||
有时你不想在 `select` 上一直**阻塞等待**,而是:
|
||||
|
||||
- 如果当前没数据,就先去干点别的事情(日志、监控、心跳)。
|
||||
|
||||
这时可以用 `default` 分支,它在 **没有任何 case 就绪时立刻执行**。
|
||||
|
||||
创建 `select_default.go`:
|
||||
|
||||
```bash
|
||||
nano select_default.go
|
||||
```
|
||||
|
||||
粘贴下面代码:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ch := make(chan int)
|
||||
|
||||
go func() {
|
||||
for i := 1; i <= 5; i++ {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
ch <- i
|
||||
}
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case v, ok := <-ch:
|
||||
if !ok {
|
||||
fmt.Println("channel 已关闭,退出循环")
|
||||
return
|
||||
}
|
||||
fmt.Println("收到:", v)
|
||||
default:
|
||||
// 没有数据可读时,做点“其他事”
|
||||
fmt.Println("没有新数据,先忙点别的...")
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run select_default.go
|
||||
```
|
||||
|
||||
你会看到“没有新数据...”和“收到: x”交替出现。
|
||||
|
||||
**Java 类比:**
|
||||
|
||||
- 默认 `BlockingQueue.take()` 是阻塞的,对应 Go 里**没有 default 的 select**。
|
||||
- 如果你在 Java 中用 `queue.poll(0, TimeUnit.MILLISECONDS)` 或直接 `queue.poll()` 非阻塞拿一次,就类似 Go 的:
|
||||
|
||||
```go
|
||||
select {
|
||||
case v := <-ch:
|
||||
// 有数据
|
||||
default:
|
||||
// 没有数据,立即返回
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 4:使用 select + time.After 实现超时
|
||||
|
||||
在 Java 里你可能写过:
|
||||
|
||||
- `future.get(2, TimeUnit.SECONDS)`
|
||||
- `socket.setSoTimeout(2000)`
|
||||
- 或者用 `ScheduledExecutorService` 做超时控制。
|
||||
|
||||
在 Go 里,一个非常常见的模式是:
|
||||
|
||||
- 使用 `time.After(duration)` 得到一个 **在 duration 后会“自动发送当前时间”的 channel**。
|
||||
- 然后在 `select` 里加一个 `case <-time.After(...)` 分支。
|
||||
|
||||
创建 `select_timeout.go`:
|
||||
|
||||
```bash
|
||||
nano select_timeout.go
|
||||
```
|
||||
|
||||
粘贴下面代码:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 模拟一个可能很慢的操作
|
||||
func slowOperation() (string, error) {
|
||||
time.Sleep(3 * time.Second) // 假设真的很慢
|
||||
return "slow result", nil
|
||||
}
|
||||
|
||||
func doWithTimeout(timeout time.Duration) (string, error) {
|
||||
resultCh := make(chan string, 1)
|
||||
errCh := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
res, err := slowOperation()
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
resultCh <- res
|
||||
}()
|
||||
|
||||
select {
|
||||
case res := <-resultCh:
|
||||
return res, nil
|
||||
case err := <-errCh:
|
||||
return "", err
|
||||
case <-time.After(timeout):
|
||||
return "", errors.New("操作超时")
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
fmt.Println("开始调用,最大等待 2 秒...")
|
||||
res, err := doWithTimeout(2 * time.Second)
|
||||
if err != nil {
|
||||
fmt.Println("失败:", err)
|
||||
return
|
||||
}
|
||||
fmt.Println("成功:", res)
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run select_timeout.go
|
||||
```
|
||||
|
||||
你会发现 `slowOperation` 需要 3 秒,但我们只等 2 秒就返回了“操作超时”。
|
||||
|
||||
**Java 类比:**
|
||||
|
||||
- `doWithTimeout` 很像你在 Java 里自己封装的:
|
||||
|
||||
```java
|
||||
Future<String> f = executor.submit(this::slowOperation);
|
||||
try {
|
||||
return f.get(2, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
// 超时
|
||||
}
|
||||
```
|
||||
|
||||
- 不同点是:Go 用 `goroutine + channel + select` 组合来表达这个超时逻辑。
|
||||
|
||||
> 思考:当前实现里,超时后 goroutine 仍在后台跑完 `slowOperation`,只是结果被我们“丢掉”了。想真正取消它,就需要 `context`。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 5:配合 context 做“可取消”的超时任务
|
||||
|
||||
在 Java 里,如果你用的是 `Future`,可以 `future.cancel(true)` 尝试中断线程;如果用的是 Reactor / RxJava,会有 `dispose()`、`cancel()`。
|
||||
|
||||
在 Go 里,常见是用 `context.Context`+`select` 做“优雅取消”。
|
||||
|
||||
创建 `select_context.go`:
|
||||
|
||||
```bash
|
||||
nano select_context.go
|
||||
```
|
||||
|
||||
粘贴下面代码:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 模拟一个可被取消的操作
|
||||
func doWork(ctx context.Context) error {
|
||||
for i := 1; i <= 5; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// context 被取消或超时
|
||||
fmt.Println("doWork 被取消:", ctx.Err())
|
||||
return ctx.Err()
|
||||
default:
|
||||
fmt.Println("工作中 step", i)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
fmt.Println("doWork 正常完成")
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
fmt.Println("开始工作,最长 3 秒...")
|
||||
|
||||
if err := doWork(ctx); err != nil {
|
||||
fmt.Println("结束,原因:", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("结束: 正常完成")
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run select_context.go
|
||||
```
|
||||
|
||||
你会看到大约做 3 步左右,`context` 超时导致 `doWork` 提前退出,而不是做完 5 步。
|
||||
|
||||
**Java 类比:**
|
||||
|
||||
- 可以类比为:
|
||||
- 上游代码持有一个“取消令牌”(类似 RxJava 的 `Disposable`,或一般框架里的 `CancellationToken`)。
|
||||
- 业务逻辑中每一段都检查一下“是否被取消”。
|
||||
- Go 的 `context.Context` 就是标准化的“取消信号 + 截止时间 + 额外参数”的组合。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 6:从 Java 视角整体对比思维方式
|
||||
|
||||
- **goroutine vs Thread**:
|
||||
- goroutine 更轻,多数时候你可以“想到就开”,由 runtime 调度。
|
||||
- **channel vs BlockingQueue**:
|
||||
- 都是“带阻塞语义的安全队列”,区别是 channel 更轻量,类型约束更强。
|
||||
- **select vs 手写轮询**:
|
||||
- Java 中你可能会在多个队列/Socket 上手动轮询;
|
||||
- Go 中用 `select` 就能自然表达“谁先就绪就先处理谁”。
|
||||
- **time.After / context.WithTimeout vs Future.get(timeout)**:
|
||||
- 目的相同:都在说“最多等这么久,超过就算失败/超时”。
|
||||
- 表达方式不同:Go 借助 channel 统一到 `select` 上来处理。
|
||||
|
||||
> 心法:**把“等待某件事发生”统一建模为“等待某个 channel 有消息 / 关闭”,然后用 `select` 组合多个等待条件。**
|
||||
|
||||
---
|
||||
|
||||
## 📚 今日小结与练习
|
||||
|
||||
- **你已经掌握:**
|
||||
- `select` 在多个 channel 之间抢先处理。
|
||||
- `select + default` 实现非阻塞轮询(类似 Java 的 `poll()`)。
|
||||
- `select + time.After` 实现超时控制(类似 `Future.get(timeout)`)。
|
||||
- `select + context` 实现可取消、可超时的长时间任务。
|
||||
|
||||
- **推荐练习:**
|
||||
1. 修改 `select_basic.go`,让 `ch2` 更快(1s)而 `ch1` 更慢(2s),体会 `select` 会优先哪个。
|
||||
2. 在 `select_timeout.go` 中增加“重试 3 次再放弃”的逻辑。
|
||||
3. 在 `select_context.go` 中,让 `doWork` 再开启子 goroutine,并把同一个 `ctx` 传下去,体会“整条调用链都能感知到取消”。
|
||||
|
||||
如果你愿意,**第 11 天** 我们可以基于这些内容,带你实现一个完整的 **worker pool / 任务分发系统**,对标 Java 里的 `ThreadPoolExecutor`,帮助你把 Go 并发和 Java 并发打通。
|
||||
273
src/programming/backend/go/Go并发模型/11context取消与超时.md
Normal file
273
src/programming/backend/go/Go并发模型/11context取消与超时.md
Normal file
@@ -0,0 +1,273 @@
|
||||
---
|
||||
title: context取消与超时
|
||||
icon: mdi:timer-cancel-outline
|
||||
date: 2025-12-14
|
||||
category:
|
||||
- Go
|
||||
- 并发编程
|
||||
tag:
|
||||
- context
|
||||
- 取消任务
|
||||
- 超时控制
|
||||
- Java Future 对比
|
||||
star: true
|
||||
---
|
||||
|
||||
本篇是你 Go 并发学习的 **第 11 天:context 取消任务、超时、派生 context**,会大量结合 **Java Future / CompletableFuture / 线程中断** 来对比讲解。
|
||||
|
||||
- **你的背景**:Java 程序员
|
||||
- **操作系统**:Linux Mint XFCE
|
||||
- **Go 版本**:go1.22.2 linux/amd64
|
||||
- **项目目录示例**:`/home/liumangmang/GolandProjects/go-context-practice`
|
||||
|
||||
<!-- more -->
|
||||
|
||||
---
|
||||
|
||||
## 📌 标题
|
||||
|
||||
# Go 并发进阶:context 取消任务、超时与派生 Context(对标 Java Future)
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 1:创建练习项目
|
||||
|
||||
```bash
|
||||
cd /home/liumangmang/GolandProjects
|
||||
mkdir go-context-practice && cd go-context-practice
|
||||
go mod init go-context-practice
|
||||
```
|
||||
|
||||
> 你也可以复用前几天的项目,只要在同一个 Go module 里新建 `.go` 文件即可。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 2:context.WithCancel —— 类比 Java 的“取消令牌”
|
||||
|
||||
**Java 类比:**
|
||||
|
||||
- 好比你手里有一个 `CancellationToken`,传给各个线程;
|
||||
- 当某个时刻调用 `token.cancel()`,所有线程都会检测到“被取消了”,然后主动退出。
|
||||
|
||||
创建 `cancel_basic.go`:
|
||||
|
||||
```bash
|
||||
nano cancel_basic.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 模拟一个可被取消的循环任务
|
||||
func worker(ctx context.Context, name string) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println(name, "收到取消信号:", ctx.Err())
|
||||
return
|
||||
default:
|
||||
fmt.Println(name, "还在干活...")
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go worker(ctx, "worker-1")
|
||||
go worker(ctx, "worker-2")
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
fmt.Println("main: 决定取消所有 worker")
|
||||
cancel() // 发出取消信号
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
fmt.Println("main 结束")
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run cancel_basic.go
|
||||
```
|
||||
|
||||
你会看到两个 worker 一开始持续打印“还在干活...”,当 `cancel()` 被调用后,都会打印“收到取消信号”。
|
||||
|
||||
> 对比 Java:相当于你在循环里不断检查 `Thread.currentThread().isInterrupted()`,一旦发现中断标记就退出;只不过 Go 用的是 `ctx.Done()` channel 来统一表达取消。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 3:context.WithTimeout / context.WithDeadline —— 类比 Future.get(timeout)
|
||||
|
||||
**Java 类比:**
|
||||
|
||||
- `future.get(2, TimeUnit.SECONDS)`:超过 2 秒还没完成就抛 TimeoutException;
|
||||
- Go 里常见模式:用 `context.WithTimeout` 传给函数,由函数内部决定是否提前结束。
|
||||
|
||||
创建 `timeout_with_context.go`:
|
||||
|
||||
```bash
|
||||
nano timeout_with_context.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 模拟一个可能很慢的操作
|
||||
func slowJob(ctx context.Context) error {
|
||||
for i := 1; i <= 5; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println("slowJob 被取消:", ctx.Err())
|
||||
return ctx.Err()
|
||||
default:
|
||||
fmt.Println("slowJob 进行中 step", i)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
fmt.Println("slowJob 正常完成")
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
// 最多给 slowJob 3 秒时间
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
fmt.Println("开始执行 slowJob,超时时间 3 秒...")
|
||||
if err := slowJob(ctx); err != nil {
|
||||
fmt.Println("结束,原因:", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("结束:正常完成")
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run timeout_with_context.go
|
||||
```
|
||||
|
||||
你会看到 `slowJob` 大约只执行 3 步,就因为 context 超时而退出。
|
||||
|
||||
> 对比 Java:就像 `future.get(3, TimeUnit.SECONDS)` 抛了 TimeoutException,只不过在 Go 里,**业务逻辑自己决定如何处理超时**(比如打印日志、回滚状态、释放资源等)。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 4:派生 Context:上下游“任务树”的取消传播
|
||||
|
||||
**Java 类比:**
|
||||
|
||||
- Web 请求入口创建一个“请求上下文”;
|
||||
- 请求处理过程中再创建各种“子任务”,都共享相同的取消信号;
|
||||
- 当用户取消请求时,整棵调用树都应该尽快结束。
|
||||
|
||||
创建 `derived_context.go`:
|
||||
|
||||
```bash
|
||||
nano derived_context.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func subTask(ctx context.Context, name string, d time.Duration) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println(name, "提前被取消:", ctx.Err())
|
||||
case <-time.After(d):
|
||||
fmt.Println(name, "完成,用时", d)
|
||||
}
|
||||
}
|
||||
|
||||
func mainTask(ctx context.Context) {
|
||||
// 从上游 ctx 派生两个子 context
|
||||
ctx1, cancel1 := context.WithCancel(ctx)
|
||||
defer cancel1()
|
||||
|
||||
ctx2, cancel2 := context.WithTimeout(ctx, 2*time.Second)
|
||||
defer cancel2()
|
||||
|
||||
go subTask(ctx1, "subTask-1", 5*time.Second)
|
||||
go subTask(ctx2, "subTask-2", 5*time.Second)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
fmt.Println("mainTask: 主动取消 subTask-1 的 ctx1")
|
||||
cancel1()
|
||||
|
||||
// 等待一会儿,看 subTask-2 是否因超时被取消
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
|
||||
func main() {
|
||||
root := context.Background()
|
||||
fmt.Println("开始 mainTask...")
|
||||
mainTask(root)
|
||||
fmt.Println("main 结束")
|
||||
}
|
||||
```
|
||||
|
||||
观察输出:
|
||||
|
||||
- `subTask-1` 会因为 `cancel1()` 被提前取消;
|
||||
- `subTask-2` 会因为自己的 `WithTimeout` 超时而被取消;
|
||||
- 两个子 context 都是从同一个上游 `ctx`(这里是 Background)派生出来的。
|
||||
|
||||
> 心法:**不要在 goroutine 里“平白无故”创建 Background context,而是尽量从上游传下来的 ctx 派生。**
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 5:最佳实践 & Java 对照
|
||||
|
||||
- **函数签名规范:**
|
||||
- Go:`func DoSomething(ctx context.Context, req *Request) (*Response, error)`
|
||||
- Java 中常见:`doSomething(Request request, CancellationToken token)` 或基于框架内置上下文。
|
||||
|
||||
- **不要把 ctx 存成 struct 字段**:
|
||||
- ctx 是“请求级别”的东西,应该顺着调用链传,不要挂在全局变量或长生命周期对象上。
|
||||
|
||||
- **取消是“协作式”的**:
|
||||
- 无论是 Java 的线程中断,还是 Go 的 context,都**不会强杀你的逻辑**,只是提供一个“你该停了”的信号。
|
||||
- 业务代码必须自己写 `select { case <-ctx.Done(): ... }` 这样的检查。
|
||||
|
||||
---
|
||||
|
||||
## 📚 今日小结与练习
|
||||
|
||||
- **你已经掌握:**
|
||||
- `context.WithCancel`:类似“取消令牌”,适合手动取消整条任务链。
|
||||
- `context.WithTimeout` / `WithDeadline`:携带超时信息,某个时刻之后自动取消。
|
||||
- 派生 context:从上游 request 派生子 context,实现“树状任务”的统一退出。
|
||||
|
||||
- **练习建议:**
|
||||
1. 把第 10 天的 `select_timeout.go` 改造为使用 `context.WithTimeout` 实现超时控制。
|
||||
2. 写一个“批量 HTTP 请求”的小工具,给每个请求传入相同的 `ctx`,当任意一个失败时取消全部。
|
||||
3. 在“并发爬虫”(第 14 天)中,为每个批次增加总超时时间,超时自动停止抓取。
|
||||
326
src/programming/backend/go/Go并发模型/12Mutex与WaitGroup.md
Normal file
326
src/programming/backend/go/Go并发模型/12Mutex与WaitGroup.md
Normal file
@@ -0,0 +1,326 @@
|
||||
---
|
||||
title: Mutex与WaitGroup
|
||||
icon: mdi:lock-outline
|
||||
date: 2025-12-15
|
||||
category:
|
||||
- Go
|
||||
- 并发编程
|
||||
tag:
|
||||
- sync.Mutex
|
||||
- sync.RWMutex
|
||||
- sync.WaitGroup
|
||||
- Java 锁对比
|
||||
star: true
|
||||
---
|
||||
|
||||
本篇是你 Go 并发学习的 **第 12 天:sync 包中的 Mutex/RWMutex、WaitGroup**,重点对比 Java 的 `synchronized`、`ReentrantLock`、`ReadWriteLock`、`CountDownLatch` 等概念。
|
||||
|
||||
- **你的背景**:Java 程序员
|
||||
- **操作系统**:Linux Mint XFCE
|
||||
- **Go 版本**:go1.22.2 linux/amd64
|
||||
- **项目目录示例**:`/home/liumangmang/GolandProjects/go-sync-practice`
|
||||
|
||||
<!-- more -->
|
||||
|
||||
---
|
||||
|
||||
## 📌 标题
|
||||
|
||||
# Go 并发基础:sync 包 Mutex/RWMutex 与 WaitGroup 实战(对标 Java 锁与 CountDownLatch)
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 1:创建练习项目
|
||||
|
||||
```bash
|
||||
cd /home/liumangmang/GolandProjects
|
||||
mkdir go-sync-practice && cd go-sync-practice
|
||||
go mod init go-sync-practice
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 2:不加锁的共享变量问题(数据竞争)
|
||||
|
||||
先来看一个**错误的写法**,感受一下“数据竞争”(race condition):
|
||||
|
||||
创建 `race_counter.go`:
|
||||
|
||||
```bash
|
||||
nano race_counter.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var wg sync.WaitGroup
|
||||
counter := 0
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < 1000; j++ {
|
||||
counter++
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
fmt.Println("期待的结果:", 1000*1000)
|
||||
fmt.Println("实际结果:", counter)
|
||||
}
|
||||
```
|
||||
|
||||
运行多几次:
|
||||
|
||||
```bash
|
||||
go run race_counter.go
|
||||
```
|
||||
|
||||
你会发现 **实际结果几乎总是小于 1_000_000**,说明出现了数据竞争。
|
||||
|
||||
> Java 类比:这就像在多个线程里对一个 `int` 直接 `counter++`,而没有用 `synchronized` 或 `AtomicInteger` 一样。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 3:使用 sync.Mutex 保护共享数据
|
||||
|
||||
Go 的 `sync.Mutex` 就像 Java 的 `ReentrantLock` 或 `synchronized`,用于**互斥访问**某段临界区。
|
||||
|
||||
创建 `mutex_counter.go`:
|
||||
|
||||
```bash
|
||||
nano mutex_counter.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
counter := 0
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < 1000; j++ {
|
||||
mu.Lock()
|
||||
counter++
|
||||
mu.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
fmt.Println("期待的结果:", 1000*1000)
|
||||
fmt.Println("实际结果:", counter)
|
||||
}
|
||||
```
|
||||
|
||||
再运行几次:
|
||||
|
||||
```bash
|
||||
go run mutex_counter.go
|
||||
```
|
||||
|
||||
你会看到结果稳定为 1_000_000。
|
||||
|
||||
> 心法:**只要有“多个 goroutine 改同一份数据”的情况,就要考虑加锁或者使用 channel / atomic。**
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 4:sync.RWMutex —— 读多写少场景的优化
|
||||
|
||||
**Java 类比:**
|
||||
|
||||
- 类似 `ReentrantReadWriteLock`:
|
||||
- 多个读可以并发;
|
||||
- 写是独占的。
|
||||
|
||||
创建 `rwmutex_cache.go`:
|
||||
|
||||
```bash
|
||||
nano rwmutex_cache.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Cache struct {
|
||||
mu sync.RWMutex
|
||||
data map[string]string
|
||||
}
|
||||
|
||||
func (c *Cache) Get(key string) string {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.data[key]
|
||||
}
|
||||
|
||||
func (c *Cache) Set(key, value string) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.data[key] = value
|
||||
}
|
||||
|
||||
func main() {
|
||||
c := &Cache{data: make(map[string]string)}
|
||||
c.Set("foo", "bar")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// 多个读 goroutine
|
||||
for i := 0; i < 5; i++ {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
for j := 0; j < 5; j++ {
|
||||
v := c.Get("foo")
|
||||
fmt.Printf("reader-%d 第 %d 次读到: %s\n", id, j, v)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// 一个写 goroutine
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
fmt.Println("writer: 更新 foo -> baz")
|
||||
c.Set("foo", "baz")
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run rwmutex_cache.go
|
||||
```
|
||||
|
||||
你会看到在写之前,读到的都是 `bar`,写之后读到 `baz`;多个 reader 可以并行,写时会短暂阻塞读。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 5:sync.WaitGroup —— 对标 Java 的 CountDownLatch
|
||||
|
||||
第 8 天你已经见过 `WaitGroup`,这里从 Java 角度再强化一下:
|
||||
|
||||
- **Java CountDownLatch:**
|
||||
- `CountDownLatch latch = new CountDownLatch(N);`
|
||||
- 每个任务 `latch.countDown()`;
|
||||
- 主线程 `latch.await()`。
|
||||
- **Go sync.WaitGroup:**
|
||||
- `wg.Add(N)`;
|
||||
- 每个 goroutine `defer wg.Done()`;
|
||||
- 主 goroutine `wg.Wait()`。
|
||||
|
||||
创建 `waitgroup_like_latch.go`:
|
||||
|
||||
```bash
|
||||
nano waitgroup_like_latch.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func worker(id int, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
fmt.Printf("worker-%d 开始工作\n", id)
|
||||
time.Sleep(time.Duration(id) * 300 * time.Millisecond)
|
||||
fmt.Printf("worker-%d 完成\n", id)
|
||||
}
|
||||
|
||||
func main() {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
n := 3
|
||||
wg.Add(n)
|
||||
|
||||
for i := 1; i <= n; i++ {
|
||||
go worker(i, &wg)
|
||||
}
|
||||
|
||||
fmt.Println("main: 等待所有 worker 完成...")
|
||||
wg.Wait()
|
||||
fmt.Println("main: 全部完成")
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run waitgroup_like_latch.go
|
||||
```
|
||||
|
||||
**注意事项:**
|
||||
|
||||
- `Add` 一般在启动 goroutine 之前调用,避免“刚启动 goroutine 就已经 Done 完了”的竞态。
|
||||
- `WaitGroup` 只负责等待 **数量归零**,不区分成功/失败;如果需要结果,要自己用 channel 或别的结构传递。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 6:锁 vs channel:什么时候用哪个?
|
||||
|
||||
给你一个简单的判断思路:
|
||||
|
||||
- **偏向用锁(Mutex/RWMutex)的场景:**
|
||||
- 多 goroutine 操作一份 **嵌套数据结构**(map、树、对象图)。
|
||||
- 逻辑已经很面向“共享内存 + 加锁”思维,迁移成本低。
|
||||
- **偏向用 channel 的场景:**
|
||||
- 更像“任务队列”“消息传递”:一个 goroutine 生产数据,另一个消费。
|
||||
- 更容易建模为“流水线 / 队列 / 事件流”。
|
||||
|
||||
> 心法:**Go 官方更提倡“不要通过共享内存来通信,而是通过通信来共享内存”。但作为 Java 背景,短期内完全用锁也没问题,慢慢再向 channel 思维过渡。**
|
||||
|
||||
---
|
||||
|
||||
## 📚 今日小结与练习
|
||||
|
||||
- **你已经掌握:**
|
||||
- 不加锁的共享变量会产生数据竞争(race condition)。
|
||||
- 用 `sync.Mutex` 保护临界区,保证加减的原子性。
|
||||
- 用 `sync.RWMutex` 在读多写少时提升并发度。
|
||||
- 用 `sync.WaitGroup` 类比 Java `CountDownLatch` 等待一批任务完成。
|
||||
|
||||
- **练习建议:**
|
||||
1. 在 `race_counter.go` 上运行 `go run -race`,体验 Go 自带的 data race 检测器(需要安装完整 Go 工具链)。
|
||||
2. 改写缓存例子,加入 `LoadOrStore` 风格的逻辑:若 key 不存在则写入初始值。
|
||||
3. 将一个你熟悉的 Java 多线程 demo(比如多线程累加)改写成 Go 版本,分别用 Mutex 和 channel 实现一次,对比代码风格。
|
||||
334
src/programming/backend/go/Go并发模型/13atomic原子操作.md
Normal file
334
src/programming/backend/go/Go并发模型/13atomic原子操作.md
Normal file
@@ -0,0 +1,334 @@
|
||||
---
|
||||
title: atomic原子操作
|
||||
icon: mdi:atom
|
||||
date: 2025-12-16
|
||||
category:
|
||||
- Go
|
||||
- 并发编程
|
||||
tag:
|
||||
- sync/atomic
|
||||
- 原子操作
|
||||
- CPU 占用
|
||||
- 性能分析
|
||||
star: true
|
||||
---
|
||||
|
||||
本篇是你 Go 并发学习的 **第 13 天:atomic 包、CPU 占用分析**,会对比 Java 里的 `AtomicInteger`、自旋、CPU 100% 等问题。
|
||||
|
||||
- **你的背景**:Java 程序员
|
||||
- **操作系统**:Linux Mint XFCE
|
||||
- **Go 版本**:go1.22.2 linux/amd64
|
||||
- **项目目录示例**:`/home/liumangmang/GolandProjects/go-atomic-cpu`
|
||||
|
||||
<!-- more -->
|
||||
|
||||
---
|
||||
|
||||
## 📌 标题
|
||||
|
||||
# Go 并发进阶:sync/atomic 原子操作与 CPU 占用分析
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 1:创建练习项目
|
||||
|
||||
```bash
|
||||
cd /home/liumangmang/GolandProjects
|
||||
mkdir go-atomic-cpu && cd go-atomic-cpu
|
||||
go mod init go-atomic-cpu
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 2:用 sync/atomic 做计数器(对标 Java AtomicInteger)
|
||||
|
||||
在第 12 天,你用 `Mutex` 做过并发计数。这次我们用 `sync/atomic` 实现一个 **无锁计数器**。
|
||||
|
||||
创建 `atomic_counter.go`:
|
||||
|
||||
```bash
|
||||
nano atomic_counter.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var wg sync.WaitGroup
|
||||
var counter int64 // 注意必须是 int64/uint64 等特定类型
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < 1000; j++ {
|
||||
atomic.AddInt64(&counter, 1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
fmt.Println("期待的结果:", 1000*1000)
|
||||
fmt.Println("实际结果:", counter)
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run atomic_counter.go
|
||||
```
|
||||
|
||||
你会看到结果稳定为 1_000_000,且没有使用 Mutex。
|
||||
|
||||
**Java 类比:**
|
||||
|
||||
- `AtomicInteger.incrementAndGet()` / `addAndGet()`。
|
||||
- 适用于 **非常简单的数值操作** 场景:加减、CAS 更新等;
|
||||
- 若逻辑稍复杂,就更适合 Mutex 或 channel。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 3:CompareAndSwap 模式(CAS)
|
||||
|
||||
CAS 是很多无锁算法的基础:
|
||||
|
||||
- “如果现在的值仍然等于旧值,就更新成新值;否则更新失败,再重试”。
|
||||
|
||||
创建 `atomic_cas.go`:
|
||||
|
||||
```bash
|
||||
nano atomic_cas.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var value int64 = 0
|
||||
|
||||
success := atomic.CompareAndSwapInt64(&value, 0, 42)
|
||||
fmt.Println("第一次 CAS 是否成功:", success, "当前值:", value)
|
||||
|
||||
success = atomic.CompareAndSwapInt64(&value, 0, 100)
|
||||
fmt.Println("第二次 CAS 是否成功:", success, "当前值:", value)
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run atomic_cas.go
|
||||
```
|
||||
|
||||
输出类似:
|
||||
|
||||
```text
|
||||
第一次 CAS 是否成功: true 当前值: 42
|
||||
第二次 CAS 是否成功: false 当前值: 42
|
||||
```
|
||||
|
||||
> 类比 Java:`compareAndSet(expected, update)`。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 4:CPU 100% 的典型错误写法(忙等)
|
||||
|
||||
**Java 中的坑:**
|
||||
|
||||
- `while (!flag) {}` 忙等,CPU 飙升。
|
||||
|
||||
Go 中一样会踩:
|
||||
|
||||
创建 `cpu_busy_loop.go`:
|
||||
|
||||
```bash
|
||||
nano cpu_busy_loop.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var stop int32 = 0
|
||||
|
||||
go func() {
|
||||
for atomic.LoadInt32(&stop) == 0 {
|
||||
// 忙等:什么也不干,不让出 CPU
|
||||
}
|
||||
fmt.Println("worker 退出")
|
||||
}()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
fmt.Println("main: 设置 stop=1")
|
||||
atomic.StoreInt32(&stop, 1)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
```
|
||||
|
||||
运行时,另外开一个终端,用 `top` 或 `htop` 看 CPU 占用,你会发现某个 Go 进程占了一整个核。
|
||||
|
||||
> 这是典型的“忙等”(busy waiting),会让 CPU 始终 100%。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 5:用 channel 或适当让出 CPU 降低占用
|
||||
|
||||
更好的方式:
|
||||
|
||||
- 要么用 channel 等待(阻塞时不占用 CPU);
|
||||
- 要么在循环中适当 `time.Sleep` 或 `runtime.Gosched()` 让出时间片。
|
||||
|
||||
### 方案一:用 channel 代替忙等
|
||||
|
||||
创建 `cpu_channel_wait.go`:
|
||||
|
||||
```bash
|
||||
nano cpu_channel_wait.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
stop := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-stop:
|
||||
fmt.Println("worker 收到停止信号,退出")
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
fmt.Println("main: 关闭 stop channel")
|
||||
close(stop)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
```
|
||||
|
||||
这里 goroutine 会 **阻塞在 channel 上**,几乎不占 CPU。
|
||||
|
||||
### 方案二:在忙等中适当 Sleep(不推荐但可对比)
|
||||
|
||||
```go
|
||||
for atomic.LoadInt32(&stop) == 0 {
|
||||
time.Sleep(1 * time.Millisecond) // 或 runtime.Gosched()
|
||||
}
|
||||
```
|
||||
|
||||
> 总体建议:**优先用 channel / select / context 表达等待,不要手写忙等。**
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 6:简单的 CPU 占用分析流程
|
||||
|
||||
在生产项目中,你可能会遇到:
|
||||
|
||||
- 程序 CPU 突然飙高;
|
||||
- 某些 goroutine 死循环或高频自旋。
|
||||
|
||||
这里给一个最小的实践路径(Linux 环境):
|
||||
|
||||
1. **用 top 找到进程**:
|
||||
|
||||
```bash
|
||||
top
|
||||
```
|
||||
|
||||
观察哪个 `go-xxx` 占用 CPU 很高。
|
||||
|
||||
2. **使用 runtime/pprof(简单版)**:
|
||||
|
||||
为了不复杂化,这里只演示最基本的写 CPU profile 到文件(需要有一定 Go 测试基础时再实践):
|
||||
|
||||
- 在 main 中引入:
|
||||
|
||||
```go
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"runtime/pprof"
|
||||
)
|
||||
|
||||
func main() {
|
||||
f, err := os.Create("cpu.prof")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
pprof.StartCPUProfile(f)
|
||||
defer pprof.StopCPUProfile()
|
||||
|
||||
// ... 你的业务逻辑 ...
|
||||
}
|
||||
```
|
||||
|
||||
- 运行程序一段时间后退出,会生成 `cpu.prof`;
|
||||
- 使用:
|
||||
|
||||
```bash
|
||||
go tool pprof cpu.prof
|
||||
```
|
||||
|
||||
然后在交互界面里用 `top`, `list` 等命令看哪些函数最耗 CPU。
|
||||
|
||||
> 等你对 Go 更熟悉后,可以进一步学习 `net/http/pprof` + 浏览器可视化分析。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 7:atomic vs Mutex:如何选择?
|
||||
|
||||
- **优先考虑 Mutex 或 channel**:
|
||||
- 代码更容易理解;
|
||||
- 出问题时更容易排查。
|
||||
- **在极少数性能敏感“数值累加/标记位”场景下**用 atomic:
|
||||
- 如:计数器、状态标志、统计指标;
|
||||
- 原子操作可以减少锁竞争,但可读性下降。
|
||||
|
||||
> Java 中也是类似:大多数场景用 `synchronized` / `ReentrantLock` 足够,只有在热点计数器或高性能队列里才大量用 atomic + CAS。
|
||||
|
||||
---
|
||||
|
||||
## 📚 今日小结与练习
|
||||
|
||||
- **你已经掌握:**
|
||||
- `sync/atomic` 的基础用法:`Add*`、`Load*`、`Store*`、`CompareAndSwap*`。
|
||||
- 忙等循环会导致 CPU 高占用,应尽量用 channel/ctx/select 等结构化方式等待。
|
||||
- 初步知道如何用 `top` 与 `pprof` 分析 CPU 占用高的问题。
|
||||
|
||||
- **练习建议:**
|
||||
1. 修改 `atomic_counter.go`,在计数完成后再开几个 goroutine 做只读统计,使用 `atomic.LoadInt64` 读取结果。
|
||||
2. 把第 12 天用 Mutex 的计数器改为 atomic 版本,对比两种写法的复杂度和可读性。
|
||||
3. 尝试在一个“错误示例”中加入 pprof,刻意制造一个死循环,生成 `cpu.prof` 后用 `go tool pprof` 看一下 `top` 函数列表。
|
||||
330
src/programming/backend/go/Go并发模型/14并发爬虫实战.md
Normal file
330
src/programming/backend/go/Go并发模型/14并发爬虫实战.md
Normal file
@@ -0,0 +1,330 @@
|
||||
---
|
||||
title: 并发爬虫实战
|
||||
icon: mdi:spider-web
|
||||
date: 2025-12-17
|
||||
category:
|
||||
- Go
|
||||
- 并发编程
|
||||
tag:
|
||||
- 并发爬虫
|
||||
- worker pool
|
||||
- goroutine
|
||||
- channel
|
||||
star: true
|
||||
---
|
||||
|
||||
本篇是你 Go 并发学习的 **第 14 天:实战项目——并发爬虫**。
|
||||
|
||||
- 目标:用 `goroutine + channel + WaitGroup + context` 实现一个简易的 **worker pool 爬虫**。
|
||||
- 对标:Java 里的 `ThreadPoolExecutor + BlockingQueue + Future`。
|
||||
|
||||
- **你的背景**:Java 程序员
|
||||
- **操作系统**:Linux Mint XFCE
|
||||
- **Go 版本**:go1.22.2 linux/amd64
|
||||
- **项目目录示例**:`/home/liumangmang/GolandProjects/go-crawler`
|
||||
|
||||
<!-- more -->
|
||||
|
||||
---
|
||||
|
||||
## 📌 标题
|
||||
|
||||
# Go 并发实战:用 goroutine + channel 写一个简易并发爬虫(对标 Java ThreadPoolExecutor)
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 1:创建项目
|
||||
|
||||
```bash
|
||||
cd /home/liumangmang/GolandProjects
|
||||
mkdir go-crawler && cd go-crawler
|
||||
go mod init go-crawler
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 2:设计思路(Java 对比版)
|
||||
|
||||
**Java 常见写法:**
|
||||
|
||||
- 使用 `ExecutorService pool = Executors.newFixedThreadPool(N)`;
|
||||
- 提交一批 URL 爬取任务 `pool.submit(() -> fetch(url))`;
|
||||
- 通过 `Future` 或回调收集结果。
|
||||
|
||||
**Go 对应思路:**
|
||||
|
||||
- 一个 `jobs` channel:放要爬的 URL(像 Java 的任务队列)。
|
||||
- 若干 worker goroutine:从 `jobs` 里取 URL,执行 `fetch`,把结果写到 `results` channel。
|
||||
- `WaitGroup` 等待所有 worker 结束;
|
||||
- 可选:`context` 控制总超时 / 取消。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 3:最小可用版并发爬虫
|
||||
|
||||
创建 `crawler_basic.go`:
|
||||
|
||||
```bash
|
||||
nano crawler_basic.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// 抓取页面并提取 <title>
|
||||
func fetchTitle(url string) (string, error) {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
re := regexp.MustCompile("(?is)<title>(.*?)</title>")
|
||||
matches := re.FindSubmatch(body)
|
||||
if len(matches) >= 2 {
|
||||
return string(matches[1]), nil
|
||||
}
|
||||
return "(no title)", nil
|
||||
}
|
||||
|
||||
// worker 从 jobs 读取 URL,写结果到 results
|
||||
func worker(id int, jobs <-chan string, results chan<- string, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
for url := range jobs { // channel 关闭后,range 自动结束
|
||||
title, err := fetchTitle(url)
|
||||
if err != nil {
|
||||
results <- fmt.Sprintf("[worker-%d] %s ERROR: %v", id, url, err)
|
||||
continue
|
||||
}
|
||||
results <- fmt.Sprintf("[worker-%d] %s => %s", id, url, title)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
urls := []string{
|
||||
"https://golang.org",
|
||||
"https://go.dev",
|
||||
"https://www.baidu.com",
|
||||
"https://www.bing.com",
|
||||
}
|
||||
|
||||
jobs := make(chan string)
|
||||
results := make(chan string)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
workerCount := 3
|
||||
|
||||
// 启动 worker
|
||||
for i := 1; i <= workerCount; i++ {
|
||||
wg.Add(1)
|
||||
go worker(i, jobs, results, &wg)
|
||||
}
|
||||
|
||||
// 发送任务
|
||||
go func() {
|
||||
for _, url := range urls {
|
||||
jobs <- url
|
||||
}
|
||||
close(jobs) // 不再有新任务
|
||||
}()
|
||||
|
||||
// 单独 goroutine 负责在所有 worker 结束后关闭 results
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(results)
|
||||
}()
|
||||
|
||||
// 主 goroutine 消费结果
|
||||
for res := range results {
|
||||
fmt.Println(res)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
运行:
|
||||
|
||||
```bash
|
||||
go run crawler_basic.go
|
||||
```
|
||||
|
||||
你会看到多个 `worker-x` 交错打印各个 URL 的 Title,这就是最基本的“并发爬虫 + worker pool”模型。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 4:加入 context 控制总超时
|
||||
|
||||
有时你不希望爬虫无限等待,而是设置一个**总超时时间**,到了就整体停止。
|
||||
|
||||
创建 `crawler_with_context.go`:
|
||||
|
||||
```bash
|
||||
nano crawler_with_context.go
|
||||
```
|
||||
|
||||
粘贴:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func fetchTitleCtx(ctx context.Context, url string) (string, error) {
|
||||
// 使用带 context 的请求
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
re := regexp.MustCompile("(?is)<title>(.*?)</title>")
|
||||
matches := re.FindSubmatch(body)
|
||||
if len(matches) >= 2 {
|
||||
return string(matches[1]), nil
|
||||
}
|
||||
return "(no title)", nil
|
||||
}
|
||||
|
||||
func workerCtx(ctx context.Context, id int, jobs <-chan string, results chan<- string, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
results <- fmt.Sprintf("[worker-%d] 收到取消信号: %v", id, ctx.Err())
|
||||
return
|
||||
case url, ok := <-jobs:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// 为每个请求单独派生一个带超时的 ctx(例如单个请求 3 秒)
|
||||
reqCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
title, err := fetchTitleCtx(reqCtx, url)
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
results <- fmt.Sprintf("[worker-%d] %s ERROR: %v", id, url, err)
|
||||
continue
|
||||
}
|
||||
results <- fmt.Sprintf("[worker-%d] %s => %s", id, url, title)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
urls := []string{
|
||||
"https://golang.org",
|
||||
"https://go.dev",
|
||||
"https://www.baidu.com",
|
||||
"https://www.bing.com",
|
||||
}
|
||||
|
||||
jobs := make(chan string)
|
||||
results := make(chan string)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
workerCount := 3
|
||||
|
||||
// 整体爬虫最多运行 5 秒
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
for i := 1; i <= workerCount; i++ {
|
||||
wg.Add(1)
|
||||
go workerCtx(ctx, i, jobs, results, &wg)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for _, url := range urls {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case jobs <- url:
|
||||
}
|
||||
}
|
||||
close(jobs)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(results)
|
||||
}()
|
||||
|
||||
for res := range results {
|
||||
fmt.Println(res)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
> 如果你刻意加入一些“很慢甚至不响应”的 URL,就能看到 **context 超时导致 worker 退出** 的效果。
|
||||
|
||||
---
|
||||
|
||||
## ✅ 步骤 5:与 Java ThreadPoolExecutor 的对照
|
||||
|
||||
- **worker pool = 固定大小线程池**:
|
||||
- Java:`newFixedThreadPool(N)`;
|
||||
- Go:启动 N 个 goroutine,统一从 `jobs` channel 取任务。
|
||||
|
||||
- **任务队列**:
|
||||
- Java:`BlockingQueue<Runnable>`;
|
||||
- Go:`chan string`(放 URL),`chan Result`(放结果)。
|
||||
|
||||
- **关闭流程**:
|
||||
- Java:`pool.shutdown();` + `awaitTermination(...)`;
|
||||
- Go:关闭 `jobs` channel + `WaitGroup.Wait()`,再关闭 `results`。
|
||||
|
||||
- **超时 / 取消**:
|
||||
- Java:`Future.get(timeout)` / 自己维护取消令牌;
|
||||
- Go:`context.WithTimeout` / `WithCancel` + 在 goroutine 中 `select <-ctx.Done()`。
|
||||
|
||||
> 心法:**把 ThreadPoolExecutor 想成“封装好的 worker pool + 队列 + 管理接口”,而在 Go 里你是直接用 goroutine + channel 原材料自己搭一个。**
|
||||
|
||||
---
|
||||
|
||||
## 📚 今日小结与扩展练习
|
||||
|
||||
- **你已经完成:**
|
||||
- 用 goroutine + channel + WaitGroup 实现了一个简易并发爬虫;
|
||||
- 用 context 控制了总超时时间和单个请求超时时间;
|
||||
- 理解了 worker pool 模式与 Java 线程池的对应关系。
|
||||
|
||||
- **扩展练习:**
|
||||
1. 给爬虫增加“深度”:解析页面中的链接,做一层或两层的递归抓取(注意不要无限爬)。
|
||||
2. 给每个 URL 定义一个结果结构体 `struct { URL, Title string; Err error }`,通过 `results <- result` 返回,更贴近真实项目。
|
||||
3. 增加最大并发限制(例如最多同时 5 个请求),通过调整 worker 数量和 channel 容量体会性能差异。
|
||||
|
||||
到这里,你已经把 Go 并发从概念(goroutine、GPM、channel、select)一路练到实战(worker pool 并发爬虫)。接下来可以根据兴趣,深入某块(比如高性能网络、微服务、消息队列消费者等),用相同的并发模型继续扩展。
|
||||
Reference in New Issue
Block a user