Requirement: sometimes you need to run a few hundred scheduled tasks, for example offline computations run once a day per city. The concurrency, however, cannot be too high, because the tasks depend on third-party resources that rate-limit requests. A channel can be used to control the number of concurrent tasks.
package main

import (
	"fmt"
	"time"
)

// limit is a buffered channel used as a counting semaphore:
// at most 3 tasks can hold a slot at any given time.
var limit = make(chan int, 3)

func main() {
	for _, w := range work() {
		go func(w func()) {
			limit <- 1 // acquire a slot; blocks while 3 tasks are already running
			w()
			<-limit // release the slot
		}(w) // pass w explicitly so each goroutine gets its own copy (required before Go 1.22)
	}
	// crude wait so main does not exit before the goroutines finish
	select {
	case <-time.After(10 * time.Second):
	}
}

func work() []func() {
	var works []func()
	for i := 0; i < 20; i++ {
		works = append(works, func() {
			time.Sleep(1 * time.Second)
			fmt.Println("do work")
		})
	}
	return works
}
func work() []func() {
var works []func()
for i := 0; i < 20; i++ {
works = append(works, func() {
time.Sleep(1 * time.Second)
fmt.Println("do work")
})
}
return works
}
When this program runs the tasks in the works slice, at most cap(limit) of them (here 3) execute concurrently: once the channel's buffer is full, a send blocks until another goroutine receives from it, which is exactly what bounds the concurrency. The time.Sleep call is only there to make the effect easy to observe.
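A variation on the pattern above, shown as a minimal sketch (the function and variable names here are my own, not from the original): acquiring the slot *before* spawning the goroutine bounds not only the number of tasks running but also the number of goroutines alive at once, which matters when the task list is large.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll runs n tasks with at most maxConc running concurrently,
// acquiring a semaphore slot before each goroutine is spawned.
// It returns the number of completed tasks and the peak concurrency observed.
func runAll(n, maxConc int) (done int64, peak int64) {
	limit := make(chan struct{}, maxConc)
	var wg sync.WaitGroup
	var running int64
	for i := 0; i < n; i++ {
		limit <- struct{}{} // blocks here, so at most maxConc goroutines exist at once
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-limit }() // release the slot even if the task panics
			cur := atomic.AddInt64(&running, 1)
			// record the highest concurrency level ever seen
			for {
				p := atomic.LoadInt64(&peak)
				if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) {
					break
				}
			}
			atomic.AddInt64(&running, -1)
			atomic.AddInt64(&done, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&done), atomic.LoadInt64(&peak)
}

func main() {
	done, peak := runAll(20, 3)
	fmt.Printf("completed %d tasks, peak concurrency %d\n", done, peak)
}
```

Acquiring before the `go` statement also provides natural backpressure: the loop itself stalls when all slots are taken, instead of eagerly launching every goroutine up front.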
Note that if w() panics, the program crashes and the goroutine never reaches <-limit, so the semaphore slot is not released; handling this requires a defer that calls recover. Without the defer, the program panics:
package main

import (
	"fmt"
	"time"
)

var limit = make(chan int, 3)

func main() {
	for i, w := range work() {
		go func(f int, w func(int)) {
			limit <- 1
			w(f)
			<-limit // never reached if w(f) panics
		}(i, w) // pass the loop variables explicitly so each goroutine gets its own copies
	}
	select {
	case <-time.After(10 * time.Second):
	}
}

func work() []func(int) {
	var works []func(int)
	for i := 0; i < 20; i++ {
		works = append(works, func(f int) {
			if f == 6 {
				panic("work panic")
			}
			time.Sleep(1 * time.Second)
			fmt.Printf("do work %d\n", f)
		})
	}
	return works
}
output:
do work 1
do work 0
do work 2
do work 4
do work 5
do work 3
panic: work panic
With a defer added to handle the panic:
package main

import (
	"fmt"
	"time"
)

var limit = make(chan int, 3)

func main() {
	for i, w := range work() {
		go func(f int, w func(int)) {
			defer func() {
				if err := recover(); err != nil {
					fmt.Println(err)
					<-limit // release the slot that the panicking task still holds
				}
			}()
			limit <- 1
			w(f)
			<-limit
		}(i, w) // pass the loop variables explicitly so each goroutine gets its own copies
	}
	select {
	case <-time.After(10 * time.Second):
	}
}

func work() []func(int) {
	var works []func(int)
	for i := 0; i < 20; i++ {
		works = append(works, func(f int) {
			if f == 6 {
				panic("work panic")
			}
			time.Sleep(1 * time.Second)
			fmt.Printf("do work %d\n", f)
		})
	}
	return works
}
output:
do work 3
do work 4
do work 2
work panic
do work 12
do work 7
do work 5
do work 10
do work 9
do work 8
do work 0
do work 16
do work 11
do work 14
do work 13
do work 15
do work 17
do work 19
do work 18
do work 1
Work 6, which panicked, did not complete; the other 19 tasks all ran, and the leaked slot was reclaimed so the rest of the pool kept making progress.
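The fixed 10-second time.After wait is fragile: if the tasks take longer, the program exits before they finish, and otherwise it wastes time. A sync.WaitGroup waits exactly until every task is done. The sketch below (the run function and its names are my own, not from the original) combines this with a single defer that releases the slot on both the normal and the panic path, since deferred calls run even while a panic unwinds.

```go
package main

import (
	"fmt"
	"sync"
)

var limit = make(chan int, 3)

// run executes all tasks with bounded concurrency and returns the
// number of tasks that finished without panicking.
func run(tasks []func(int)) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	ok := 0
	for i, w := range tasks {
		wg.Add(1)
		go func(f int, w func(int)) {
			defer wg.Done()
			limit <- 1
			defer func() { <-limit }() // runs on panic too, so the slot never leaks
			defer func() {
				if err := recover(); err != nil {
					fmt.Println("recovered:", err)
				}
			}()
			w(f)
			mu.Lock()
			ok++ // only reached when w(f) returned normally
			mu.Unlock()
		}(i, w)
	}
	wg.Wait() // blocks until every task has finished or panicked
	return ok
}

func main() {
	tasks := make([]func(int), 20)
	for i := range tasks {
		tasks[i] = func(f int) {
			if f == 6 {
				panic("work panic")
			}
		}
	}
	fmt.Println("succeeded:", run(tasks)) // prints "succeeded: 19"
}
```

Because the defers run last-in, first-out, the recover fires first, then the slot is released, then wg.Done is called, so a panicking task is indistinguishable from a finished one as far as the pool is concerned.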