goroutine原理(Not finishing)
1. 设计思路
1.1. 设计描述
- 启动服务之时先初始化一个 Goroutine Pool 池,这个 Pool 维护了一个类似栈的 LIFO 队列 ,里面存放负责处理任务的 Worker
- 然后在 client 端提交 task 到 Pool 中之后,在 Pool 内部,接收 task 之后的核心操作是
- 检查当前 Worker 队列中是否有可用的 Worker,如果有,取出执行当前的 task;
- 没有可用的 Worker,判断当前在运行的 Worker 是否已超过该 Pool 的容量
- 每个 Worker 执行完任务之后,放回 Pool 的队列中等待
1.2. Pool struct
type sig struct{}
type f func() error
// pool从客户端获取任务,它限制goroutines总数,并且回收再使用
type Pool struct {
capacity int32
running int32
expiryDuration time.Duration
workers []*Worker
release chan sig
lock sync.Mutex
once sync.Once
}
1.3. 初始化 Pool 并启动定期清理过期 worker 任务
func NewPool(size int) (*Pool, error) {
return NewTimingPool(size, DefaultCleanIntervalTime)
}
func NewTimingPool(size, expiry int) (*Pool, error) {
if size <= 0 {
return nil, ErrInvalidPoolSize
}
if expiry <= 0 {
return nil, ErrInvalidPoolExpiry
}
p := &Pool{
capacity: int32(size),
freeSignal: make(chan sig, math.MaxInt32),
release: make(chan sig, 1),
expiryDuration: time.Duration(expiry) * time.Second,
}
p.monitorAndClear()
return p, nil
}
1.4. 提交任务到 Pool
- 第一个 if 判断当前 Pool 是否已被关闭,若是则不再接受新任务,否则获取一个 Pool 中可用的 worker,绑定该
task
执行。
func (p *Pool) Submit(task f) error {
if len(p.release) > 0 {
return ErrPoolClosed
}
w := p.getWorker()
w.task <- task
return nil
}
1.5. 获取可用 worker(核心)
func (p *Pool) getWorker() *Worker {
var w *Worker
waiting := false
p.lock.Lock()
idleWorkers := p.workers
n := len(idleWorkers) - 1
if n < 0 {
waiting = p.Running() >= p.Cap()
} else {
w = idleWorkers[n]
idleWorkers[n] = nil
p.workers = idleWorkers[:n]
}
p.lock.Unlock()
if waiting {
for {
p.lock.Lock()
idleWorkers = p.workers
l := len(idleWorkers) - 1
if l < 0 {
p.lock.Unlock()
continue
}
w = idleWorkers[l]
idleWorkers[l] = nil
p.workers = idleWorkers[:l]
p.lock.Unlock()
break
}
} else if w == nil {
w = &Worker{
pool: p,
task: make(chan f, 1),
}
w.run()
p.incRunning()
}
return w
}
1.6. 执行任务
- 结合前面的
p.Submit(task f)
和 p.getWorker()
,提交任务到 Pool 之后,获取一个可用 worker
- 每新建一个 worker 实例之时都需要调用
w.run()
启动一个 goroutine 监听 worker 的任务列表 task
,一有任务提交进来就执行;
- 所以,当调用 worker 的
sendTask(task f)
方法提交任务到 worker 的任务队列之后,马上就可以被接收并执行
- 当任务执行完之后,会调用
w.pool.putWorker(w *Worker)
方法将这个已经执行完任务的 worker 从当前任务解绑放回 Pool 中,以供下个任务可以使用
- 至此,一个任务从提交到完成的过程就此结束,Pool 调度将进入下一个循环。
type Worker struct {
pool *Pool
task chan f
recycleTime time.Time
}
func (w *Worker) run() {
go func() {
for f := range w.task {
if f == nil {
w.pool.decRunning()
return
}
f()
w.pool.putWorker(w)
}
}()
}
1.7. worker回收(goroutine 复用)
func (p *Pool) putWorker(worker *Worker) {
worker.recycleTime = time.Now()
p.lock.Lock()
p.workers = append(p.workers, worker)
p.lock.Unlock()
}
1.8. 动态扩容或者缩小池容量
func (p *Pool) ReSize(size int) {
if size == p.Cap() {
return
}
atomic.StoreInt32(&p.capacity, int32(size))
diff := p.Running() - size
if diff > 0 {
for i := 0; i < diff; i++ {
p.getWorker().task <- nil
}
}
}
1.9. 定期清理过期 Worker
- 定期检查空闲 worker 队列中是否有已过期的 worker 并清理
- 因为采用了 LIFO 后进先出队列存放空闲 worker,所以该队列默认已经是按照 worker 的最后运行时间由远及近排序
- 可以方便地按顺序取出空闲队列中的每个 worker 并判断它们的最后运行时间与当前时间之差是否超过设置的过期时长
- 若是,则清理掉该 goroutine,释放该 worker,并且将剩下的未过期 worker 重新分配到当前 Pool 的空闲 worker 队列中,进一步节省系统资源
func (p *Pool) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration)
for range heartbeat.C {
currentTime := time.Now()
p.lock.Lock()
idleWorkers := p.workers
if len(idleWorkers) == 0 && p.Running() == 0 && len(p.release) > 0 {
p.lock.Unlock()
return
}
n := 0
for i, w := range idleWorkers {
if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
break
}
n = i
w.task <- nil
idleWorkers[i] = nil
}
n++
if n >= len(idleWorkers) {
p.workers = idleWorkers[:0]
} else {
p.workers = idleWorkers[n:]
}
p.lock.Unlock()
}
}
2. pool使用
2.1. 公共池
package main
import (
"fmt"
"sync"
"time"
"github.com/panjf2000/ants/v2"
)
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
}
func main() {
defer ants.Release()
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")
}
2.2. 方法绑定池
package main
import (
"fmt"
"github.com/panjf2000/ants/v2"
"sync"
)
func myFunc(i interface{}) {
fmt.Printf("run with %d\n", i)
}
func main() {
defer ants.Release()
var wg sync.WaitGroup
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
for i := 0; i < 1000; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
}