Getting started with Golang Task/Job Queue

Installation

taskq supports 2 last Go versions and requires a Go version with modulesopen in new window support. So before installing taskq, make sure to initialize a Go module:

go mod init github.com/my/repo

And then install taskq/v3 (note v3 in the import; omitting it is a popular mistake):

go get github.com/vmihailenco/taskq/v3

Producer and consumer

With asynchronous tasks, you typically split your app into the two separate processes:

  • Producer accepts requests from customers and adds tasks to queues.
  • Consumer fetches tasks from the queues and processes them.

This way you can:

  • Isolate producers and consumers from each other, for example, producers will continue working if consumers start crashing.
  • Scale producers and consumers separately.
  • Have different configs, for example, use large network timeouts in consumers.

For details, see redisexampleopen in new window that demonstrates this approach using Redis backend.

Backends

To get started, you need to create a queue factory, for example, using Redis as a backend:

import "github.com/vmihailenco/taskq/v3/redisq"

var QueueFactory = redisq.NewFactory()

Or SQS:

import "github.com/vmihailenco/taskq/v3/azsqs"

var QueueFactory = azsqs.NewFactory()

Or IronMQ:

import "github.com/vmihailenco/taskq/v3/ironmq"

var QueueFactory = ironmq.NewFactory()

Creating queues and tasks

Using that factory, you can define queues:

var MainQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
    Name:  "api-worker",
    Redis: Redis, // go-redis client
})

Next, you need to register a task that will be used to process jobs:

import "github.com/vmihailenco/taskq/v3"

var CountTask = taskq.RegisterTask(&taskq.TaskOptions{
    Name: "counter",
    Handler: func() error {
        IncrLocalCounter()
        return nil
    },
})

Producing tasks

Having a queue and a task, you can start producing jobs:

ctx := context.Background()

for {
	// Add the task without any args.
	err := MainQueue.Add(CountTask.WithArgs(ctx))
	if err != nil {
		panic(err)
	}
	time.Sleep(time.Second)
}

Consuming tasks

To consume jobs, you can just start the queue:

ctx := context.Background()

if err := MainQueue.Consumer().Start(ctx); err != nil {
    log.Fatal(err)
}

API overview

t := myQueue.RegisterTask(&taskq.TaskOptions{
    Name:    "greeting",
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },
})

// Say "Hello World".
err := myQueue.Add(t.WithArgs(context.Background(), "World"))
if err != nil {
    panic(err)
}

// Say "Hello World" with 1 hour delay.
msg := t.WithArgs(ctx, "World")
msg.Delay = time.Hour
_ = myQueue.Add(msg)

// Say "Hello World" once.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World")
    msg.Name = "hello-world" // unique
    _ = myQueue.Add(msg)
}

// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World")
    msg.Name = "hello-world"
    msg.Delay = time.Hour
    _ = myQueue.Add(msg)
}

// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour)
    _ = myQueue.Add(msg)
}

// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour, "World", "europe")
    _ = myQueue.Add(msg)
}

Handlers

A Handler and FallbackHandler are supplied to RegisterTask in the TaskOptions.

There are three permitted types of signature:

  1. A zero-argument function
  2. A function whose arguments are assignable in type from those which are passed in the message
  3. A function which takes a single *Message argument

If a task is registered with a handler that takes a Go context.Context as its first argument then when that handler is invoked it will be passed the same Context that was passed to Consumer.Start(ctx). This can be used to transmit a signal to abort to all tasks being processed:

var AbortableTask = MainQueue.RegisterTask(&taskq.TaskOptions{
    Name: "SomethingLongwinded",
    Handler: func(ctx context.Context) error {
        for range time.Tick(time.Second) {
            select {
                case <-ctx.Done():
                    return ctx.Err()
                default:
                    fmt.Println("Wee!")
            }
        }
        return nil
    },
})

Message deduplication

If a Message has a Name then this will be used as unique identifier and messages with the same name will be deduplicated (i.e. not processed again) within a 24 hour period (or possibly longer if not evicted from local cache after that period). Where Name is omitted then non deduplication occurs and each message will be processed. Task's WithMessage and WithArgs both produces messages with no Name so will not be deduplicated. OnceWithArgs sets a name based off a consistent hash of the arguments and a quantised period of time (i.e. 'this hour', 'today') passed to OnceWithArgs a period. This guarantees that the same function will not be called with the same arguments during `period'.

Custom message delay

If error returned by handler implements Delay() time.Duration interface then that delay is used to postpone message processing.

type RateLimitError string

func (e RateLimitError) Error() string {
    return string(e)
}

func (RateLimitError) Delay() time.Duration {
    return time.Hour
}

func handler() error {
    return RateLimitError("calm down")
}