To improve the user experience, I’ll be implementing a work queue to handle tasks that take a long time but don’t need to finish before the API returns a successful response. The classic example is image processing: we want to tell the user that everything is okay as soon as the file has finished uploading, rather than making them wait for thumbnails to be created or other background processing to take place. Instead, as soon as the upload is complete we’ll add a message to a queue and return a successful response to the user. A separate process listening for messages from the queue will then do the rest of the processing asynchronously.
There are a number of options for how exactly to implement a background work queue. Queues can be implemented on a cloud service like AWS SQS, or a database like Redis (the excellent Sidekiq library for Ruby runs on Redis), or even something more heavy-duty like PostgreSQL or MySQL. There is also purpose-built software like beanstalkd. Since we’re using Go we could also just spawn a goroutine to do the processing and then immediately return the response. This last option would be fine except that it isn’t durable (i.e., if the server restarts with a job in flight then we’d lose that job). We could add some additional processing to create a record in a database before spawning the goroutine and then search for incomplete jobs on startup, periodically, or similar, but then we’ve just (re)invented a traditional work queue. Worrying about it now could be perceived as a premature optimization, but spawning a goroutine in the webserver process also means we can’t distribute the work to a different server (Raspberry Pi). That could hurt the performance of the webserver or leave us unable to process the queue in a reasonable timeframe.
I’ve decided that the solution to this problem is to use a real work queue. As you’ve probably gathered from the title of this post, the backend will use RabbitMQ. RabbitMQ is technically a message queue, but by using only a subset of its functionality we can build a simple work queue on top of it. This also seems to be an officially supported use case, since the project offers a tutorial on how to implement it.
In this article we’ll focus on the worker part; adding items to the queue is beyond the scope of what I want to cover. We’ll start with (a slightly modified version of) the official tutorial on how to implement a work queue:
package main

import (
	"bytes"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Panic(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Panic(err)
	}
	defer ch.Close()

	// the rabbitmq example declares the queue here, we'll assume that
	// you either do that or, like me, you declare your queues as part of
	// an infrastructure-as-code using e.g., ansible or terraform

	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	if err != nil {
		log.Panic(err)
	}

	msgs, err := ch.Consume(
		"qname", // queue
		"",      // consumer
		false,   // auto-ack
		false,   // exclusive
		false,   // no-local
		false,   // no-wait
		nil,     // args
	)
	if err != nil {
		log.Panic(err)
	}

	var forever chan struct{}

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			dotCount := bytes.Count(d.Body, []byte("."))
			t := time.Duration(dotCount)
			time.Sleep(t * time.Second)
			log.Printf("Done")
			d.Ack(false)
		}
	}()

	log.Printf("Waiting for messages. To exit press CTRL^C")
	<-forever
}
This works fine except that if you press Ctrl^C while a job is in flight you
cancel the job immediately. That’s also mostly okay because without the Ack
the job will eventually be re-queued, but a more graceful solution is to stop
processing new messages and then give a brief window to allow any in-flight jobs
to finish.
The code below has been mostly cobbled together following examples from a few different sources:
- An example from the official RabbitMQ Go client: https://github.com/rabbitmq/amqp091-go/blob/main/_examples/consumer/consumer.go
- A gist that I found on GitHub that claims to do a graceful shutdown: https://gist.github.com/spksoft/59d1cb62c664d818a2500dab1e873761
- The now-defunct Graceful-Shutdown library: https://github.com/mramshaw/Graceful-Shutdown/blob/master/graceful_shutdown.go
- The graceful shutdown example from gin-gonic: https://gin-gonic.com/docs/examples/graceful-restart-or-stop/
I’ve added inline comments to the code which should explain what is happening and why.
package main

import (
	"bytes"
	"context"
	"log"
	"os/signal"
	"syscall"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// create a context that is cancelled when the OS sends SIGINT or
	// SIGTERM
	ctx, stop := signal.NotifyContext(context.Background(),
		syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Panic(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Panic(err)
	}
	defer ch.Close()

	// the rabbitmq example declares the queue here, we'll assume that
	// you either do that or, like me, you declare your queues as part of
	// an infrastructure-as-code using e.g., ansible or terraform

	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	if err != nil {
		log.Panic(err)
	}

	msgs, err := ch.Consume(
		"qname",  // queue
		"worker", // consumer tag, named so that we can cancel it below
		false,    // auto-ack
		false,    // exclusive
		false,    // no-local
		false,    // no-wait
		nil,      // args
	)
	if err != nil {
		log.Panic(err)
	}

	// we'll use this channel to know when we're done processing any
	// in-flight messages when exiting the app
	done := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			dotCount := bytes.Count(d.Body, []byte("."))
			t := time.Duration(dotCount)
			time.Sleep(t * time.Second)
			log.Printf("Done")
			if err := d.Ack(false); err != nil {
				log.Printf("failed to ack message: %v", err)
			}
		}
		// the consumer has been cancelled and msgs is closed, so no
		// more messages will be processed. notify our "done" channel
		// that we are indeed done processing messages
		done <- true
	}()

	log.Printf("Waiting for messages. To exit press CTRL^C")

	// block until we receive an interrupt signal
	<-ctx.Done()

	// we've intercepted the interrupt signal, restore normal signal
	// handling so that a second Ctrl^C kills the process immediately
	stop()
	log.Println("received sigint/sigterm, shutting down...")
	log.Println("press Ctrl^C again to force shutdown")

	// cancel the consumer by its tag (not the queue name). this stops
	// new deliveries and closes msgs, but leaves the AMQP channel open so
	// that the in-flight job can still be acked. the deferred ch.Close()
	// cleans up the channel once we're done waiting
	if err := ch.Cancel("worker", false); err != nil {
		log.Panic(err)
	}

	// if we get a message on the "done" channel it means that we
	// successfully finished the job that we had in-flight (if there was
	// one) and exited the for loop. if that job takes longer than our
	// timeout then we'll get a message on the timeout (time.After) channel
	// instead
	select {
	case <-time.After(5 * time.Second):
		log.Println("timed out waiting for jobs to finish")
	case <-done:
		log.Println("finished processing all jobs")
	}
}
You can test this out by running the code, sending some messages that take shorter or longer than the timeout, and pressing Ctrl^C at different times during the processing. With enough time for the job to finish, the worker exits as soon as the last message is processed (or immediately if no job is in flight). Without enough time, the worker exits anyway once the timeout expires. And if you press Ctrl^C a second time while waiting for a job to finish, the worker exits immediately.