package main
import (
"fmt"
"sync"
)
type Job interface {
Do()
}
type Worker struct {
id int
jobChannel chan Job
done chan bool
}
type Pool struct {
workers []*Worker
jobQueue chan Job
wg sync.WaitGroup
}
func NewWorker(id int, wg *sync.WaitGroup) *Worker {
worker := Worker{
id: id,
jobChannel: make(chan Job),
done: make(chan bool),
}
go func() {
for job := range worker.jobChannel {
job.Do()
}
wg.Done()
}()
return &worker
}
func NewPool(numWorkers int) *Pool {
pool := Pool{
workers: make([]*Worker, numWorkers),
jobQueue: make(chan Job),
}
for i := 0; i < numWorkers; i++ {
pool.workers[i] = NewWorker(i, &pool.wg)
}
go pool.run()
return &pool
}
func (w *Worker) Start(job Job) {
w.jobChannel <- job
}
func (w *Worker) Stop() {
close(w.jobChannel)
<-w.done
}
func (p *Pool) run() {
for job := range p.jobQueue {
worker := p.getAvailableWorker()
worker.Start(job)
}
for _, worker := range p.workers {
worker.Stop()
}
p.wg.Done()
}
func (p *Pool) getAvailableWorker() *Worker {
for {
for _, worker := range p.workers {
select {
case <-worker.done:
default:
return worker
}
}
}
}
func (p *Pool) Submit(job Job) {
p.wg.Add(1)
p.jobQueue <- job
}
func (p *Pool) Shutdown() {
close(p.jobQueue)
p.wg.Wait()
}
type PrintJob struct {
id int
}
func (pj PrintJob) Do() {
fmt.Printf("Printing job %d\n", pj.id)
}
func main() {
pool := NewPool(5)
for i := 0; i < 10; i++ {
pool.Submit(PrintJob{id: i})
}
pool.Shutdown()
}
1
perfectlife 2023-06-15 16:25:06 +08:00
这段代码实现了一个简单的线程池( goroutine pool )。
在这个线程池中,有多个 goroutine (即上面代码中的 Worker ),它们不断地从一个 job 队列(即 Pool 结构体中的 jobQueue )中获取任务,并执行相应的操作。同时,这个线程池还提供了一个 Submit 方法,可以将具体的任务(即实现了 Job 接口的结构体)提交到 job 队列中。当所有的任务都被执行完毕后,可以调用 Shutdown 方法来关闭整个线程池。 在这段代码中,我们定义了一个 PrintJob 结构体,它实现了 Job 接口的 Do() 方法,在该方法中输出一段指定格式的字符串。我们使用这个 PrintJob 来模拟一个需要复杂处理的任务,然后将 10 个这样的任务提交到线程池中进行处理。 需要注意的是,在线程池中,由于所有的 goroutine 是并行运行的,因此无法保证任务的执行顺序和完成时间。如果需要控制任务的顺序或者依赖关系,就需要在代码中增加相应的控制逻辑。 |
2
lincanbin 2023-06-15 16:29:35 +08:00
你问问 ChatGPT 吧
|
3
wchhm 2023-06-15 16:36:48 +08:00
@perfectlife 这位老哥应该就是问了 GPT 吧
|
4
perfectlife 2023-06-15 16:45:38 +08:00
@wchhm 对,简单粗暴一点
|
5
raphaell2e 2023-06-15 18:55:58 +08:00
异步任务
|