程序员社区

使用channel控制程序并发数

需求:有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太 高,因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控 制并发数。

package main

import (
    "fmt"
    "time"
)

var limit = make(chan int, 3)

func main() {
    for _, w := range work() {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    select {
    case <-time.After(10 * time.Second):
    }
}

func work() []func() {
    var works []func()
    for i := 0; i < 20; i++ {
        works = append(works, func() {
            time.Sleep(1 * time.Second)
            fmt.Println("do work")
        })
    }
    return works
}

该程序在执行works切片里面的work时,每次最多只能执行limit个并发任务(work),因为channel的缓冲已满,必须要等接收之后才能继续发送数据。这样就达到了控制并发数的目的。程序中加入了time.Sleep等待时间,便于查看效果。
这里需要注意的是,如果w()函数执行的时候出现panic的话,limit就无法接收,需要使用defer来实现:

如果没有defer处理程序会panic:

package main

import (
    "fmt"
    "time"
)

var limit = make(chan int, 3)

func main() {
    for i, w := range work() {
        go func(f int) {
            limit <- 1
            w(f)
            <-limit
        }(i)
    }
    select {
    case <-time.After(10 * time.Second):
    }
}

func work() []func(int) {
    var works []func(int)
    for i := 0; i < 20; i++ {
        works = append(works, func(f int) {
            if f == 6 {
                panic("work panic")
            }
            time.Sleep(1 * time.Second)
            fmt.Printf("do work %d\n", f)
        })
    }
    return works
}

output:

do work 1
do work 0
do work 2
do work 4
do work 5
do work 3
panic: work panic

添加defer处理panic:

package main

import (
    "fmt"
    "time"
)

var limit = make(chan int, 3)

func main() {
    for i, w := range work() {
        go func(f int) {
            defer func() {
                if err := recover(); err != nil {
                    fmt.Println(err)
                    <-limit
                }
            }()
            limit <- 1
            w(f)
            <-limit
        }(i)
    }
    select {
    case <-time.After(10 * time.Second):
    }
}

func work() []func(int) {
    var works []func(int)
    for i := 0; i < 20; i++ {
        works = append(works, func(f int) {
            if f == 6 {
                panic("work panic")
            }
            time.Sleep(1 * time.Second)
            fmt.Printf("do work %d\n", f)
        })
    }
    return works
}

output:

do work 3
do work 4
do work 2
work panic
do work 12
do work 7
do work 5
do work 10
do work 9
do work 8
do work 0
do work 16
do work 11
do work 14
do work 13
do work 15
do work 17
do work 19
do work 18
do work 1

发生panic的work6,没有成功执行。

赞(0) 打赏
未经允许不得转载:IDEA激活码 » 使用channel控制程序并发数

一个分享Java & Python知识的社区