worker API

worker

package

API reference for the worker package.

T
type

Task

Task is the unit of work executed by the pool.

pkg/worker/worker.go:9-9
// Task is a function that receives a context.Context and returns an error.
type Task func(context.Context) error
S
struct

Pool

Pool is a fixed-size worker pool with graceful shutdown.

pkg/worker/worker.go:12-19
type Pool struct

Methods

worker
Method

worker pulls tasks from the tasks channel until done is closed.

func (*Pool) worker()
{
	defer p.wg.Done()
	for {
		select {
		case task := <-p.tasks:
			_ = task(p.ctx)
		case <-p.done:
			return
		}
	}
}
Submit
Method

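Submit hands a task to a worker for execution. Because the task channel is unbuffered, Submit blocks until a worker is free to receive the task or the pool is shut down. Returns true once a worker has accepted the task. Returns false if the pool is shut down or shutting down; the task is not executed in that case.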

Parameters

task Task

Returns

bool
func (*Pool) Submit(task Task) bool
{
	select {
	case p.tasks <- task:
		return true
	case <-p.done:
		return false
	}
}
Shutdown
Method

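Shutdown stops accepting new tasks and waits for in-progress work to finish. The pool's context is cancelled only after every worker has exited, so running tasks are not interrupted. Calling Shutdown more than once is safe; the shutdown sequence runs only once. The task channel is unbuffered, so no tasks can be pending pickup when done is closed — any Submit in progress will observe done and return false.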

func (*Pool) Shutdown()
{
	p.once.Do(func() {
		close(p.done)
		p.wg.Wait()
		p.cancel()
	})
}

Fields

Name Type Description
wg sync.WaitGroup
tasks chan Task
done chan struct{}
cancel context.CancelFunc
ctx context.Context
once sync.Once
F
function

NewPool

NewPool creates a fixed-size worker pool of n goroutines. If n is less than or equal to zero, a single worker is started.

Parameters

n
int

Returns

*Pool

pkg/worker/worker.go:30-47
func NewPool(n int) *Pool

{
	if n <= 0 {
		n = 1
	}

	ctx, cancel := context.WithCancel(context.Background())
	p := &Pool{
		tasks:  make(chan Task),
		done:   make(chan struct{}),
		cancel: cancel,
		ctx:    ctx,
	}
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go p.worker()
	}
	return p
}

Example

pool := worker.NewPool(4)
defer pool.Shutdown()
pool.Submit(func(ctx context.Context) error {
	return doWork(ctx)
})