Writing an SQS Consumer in Python

This tutorial will walk you through the process of creating a production-ready SQS consumer in Python. You can view the code on GitHub here. We will cover the following topics:

  • Consuming messages
  • Handling exceptions
  • Exiting gracefully from a signal
  • Long polling
  • Monitoring queue health
  • Replaying messages from a dead letter queue

Consuming messages

Let's start with a basic SQS consumer:

sqs_consumer.py
import boto3

sqs = boto3.resource("sqs")
queue = sqs.get_queue_by_name(QueueName="your-queue-name")

def process_message(message_body):
    print(f"processing message: {sqs_message}")
    # do what you want with the message here
    pass

if __name__ == "__main__":
    while True:
        messages = queue.receive_messages()
        for message in messages:
            process_message(message.body)
            message.delete()

You can run this consumer with python -m sqs_consumer. For it to work, you'll need to install boto3 (pip install boto3) and configure it with your AWS credentials. You'll also need an SQS queue.
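
If you don't have a queue yet, here's a minimal sketch of creating one with boto3 (the queue name is a placeholder, and creating the queue in the AWS console works just as well):

create_queue.py
import boto3

sqs = boto3.resource("sqs")

# one-off helper: create a test queue to consume from
queue = sqs.create_queue(QueueName="your-queue-name")
print(f"created queue: {queue.url}")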

Handling exceptions

Currently, if an error occurs while processing a message, the whole consumer will crash. Instead, we'd like messages to be retried 3 times and then end up in a dead letter queue. You can read more about dead letter queues in the AWS docs here.

By default, SQS makes messages invisible for 30 seconds after they have been read (the visibility timeout). If you have a redrive policy set up with a maxReceiveCount of 3, SQS will automatically move a message to the dead letter queue after 3 unsuccessful receives. Therefore, we can handle errors by simply not deleting the message if an exception is thrown:

sqs_consumer.py
...
if __name__ == "__main__":
    while True:
        messages = queue.receive_messages()
        for message in messages:
            try:
                process_message(message.body)
            except Exception as e:
                print(f"exception while processing message: {repr(e)}")
                continue

            message.delete()
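
For the retry-then-dead-letter behavior to work, the source queue needs a redrive policy pointing at the dead letter queue. If you haven't set one up yet, here's a rough sketch of doing it with boto3 (a hypothetical one-off script; queue names are placeholders, and the AWS console works just as well):

setup_redrive.py
import json

import boto3

sqs = boto3.resource("sqs")
queue = sqs.get_queue_by_name(QueueName="your-queue-name")
dlq = sqs.get_queue_by_name(QueueName="dead-letter-queue")

# move messages to the dead letter queue after 3 unsuccessful receives
queue.set_attributes(Attributes={
    "RedrivePolicy": json.dumps({
        "deadLetterTargetArn": dlq.attributes["QueueArn"],
        "maxReceiveCount": "3",
    })
})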

Exiting gracefully from a signal

If we're deploying our application to production, our consumer must be able to exit gracefully. If your deployment infrastructure is anything like mine, your consumer will be sent a SIGTERM or SIGINT signal during each deployment. We want to finish handling the current message and then exit the while loop, rather than being forced to exit mid-execution. We can use the signal module from the Python standard library for this. Let's write a class to tell our consumer if we've received an exit signal:

sqs_consumer.py
...
from signal import signal, SIGINT, SIGTERM
...

class SignalHandler:
    def __init__(self):
        self.received_signal = False
        signal(SIGINT, self._signal_handler)
        signal(SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        print(f"handling signal {signum}, exiting gracefully")
        self.received_signal = True
...

if __name__ == "__main__":
    signal_handler = SignalHandler()
    while not signal_handler.received_signal:
        messages = queue.receive_messages()
...

Now if you run the consumer and hit ctrl+c, you'll see it finish the current iteration of the while loop before exiting.

Long polling

Currently, the consumer uses a lot of CPU and makes a lot of empty API calls. This is because we are using short polling instead of long polling. With short polling, SQS responds right away, even if the query found no messages. With long polling, SQS waits to respond until at least one message is found, or a maximum time threshold is reached. Amazon has a great explanation of long polling vs short polling here. Let's modify our consumer to use long polling:

sqs_consumer.py
...
if __name__ == "__main__":
    signal_handler = SignalHandler()
    while not signal_handler.received_signal:
        messages = queue.receive_messages(
            MaxNumberOfMessages=10,
            WaitTimeSeconds=1
        )
...

Now Amazon will return up to 10 messages at a time (the maximum for a single receive call), and wait at most 1 second before returning an empty response if no messages are found. SQS supports wait times of up to 20 seconds; keeping it short means the loop still checks for exit signals reasonably often.

Monitoring queue health

To ensure that our queue stays healthy in production, there are two things we want to alert on:

  1. If the number of messages on the queue exceeds some threshold
  2. If there are messages on the dead letter queue

You can use Datadog's SQS integration to get queue length metrics automatically, but unfortunately those metrics are delayed by several minutes, which isn't great. We want to be alerted right away if something is wrong. Let's write a function that sends the queue length as a custom metric to Datadog:

sqs_consumer.py
...
from datadog import statsd
...

queue = sqs.get_queue_by_name(QueueName="your-queue-name")
dlq = sqs.get_queue_by_name(QueueName="dead-letter-queue")
...

def send_queue_metrics(sqs_queue):
    sqs_queue.load()
    queue_name: str = sqs_queue.attributes["QueueArn"].split(":")[-1]
    queue_length: int = int(sqs_queue.attributes["ApproximateNumberOfMessages"])
    statsd.gauge("sqs.queue.message_count", queue_length, tags=[f"queue:{queue_name}"])
...

if __name__ == "__main__":
    signal_handler = SignalHandler()
    while not signal_handler.received_signal:
        send_queue_metrics(queue)
        send_queue_metrics(dlq)
        messages = queue.receive_messages(
            MaxNumberOfMessages=10,
            WaitTimeSeconds=1
        )
...
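
With this metric flowing, the two alerts from the start of this section can be expressed as Datadog monitor queries along these lines (thresholds and queue names are placeholders):

max(last_5m):max:sqs.queue.message_count{queue:your-queue-name} > 1000
max(last_5m):max:sqs.queue.message_count{queue:dead-letter-queue} > 0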

Optionally, we can use a decorator to send queue metrics every 15 seconds instead of every iteration of the while loop:

sqs_consumer.py
...
import time
...

def wait(seconds: int):
    def decorator(fun):
        last_run = time.monotonic()

        def new_fun(*args, **kwargs):
            nonlocal last_run
            now = time.monotonic()
            if now - last_run > seconds:
                last_run = now
                return fun(*args, **kwargs)

        return new_fun

    return decorator
...

@wait(seconds=15)
def send_queue_metrics(sqs_queue):
...

Replaying messages from the dead letter queue

Inevitably messages will end up on the dead letter queue for one reason or another. To deal with this, we'll need to be able to reprocess those failed messages. Let's add a second script for this. Luckily we've written reusable pieces of code, so this is going to be easy! Here's our new script replay_dlq.py:

replay_dlq.py
import os
import boto3
from common import SignalHandler, process_message, queue_length

sqs = boto3.resource("sqs")
dlq = sqs.get_queue_by_name(QueueName=os.environ["SQS_DEAD_LETTER_QUEUE_NAME"])

if __name__ == "__main__":
    signal_handler = SignalHandler()
    while queue_length(dlq) and not signal_handler.received_signal:
        print(f"queue length: {queue_length(dlq)}")
        messages = dlq.receive_messages(
            MaxNumberOfMessages=10,
            WaitTimeSeconds=1
        )
        for message in messages:
            process_message(message.body)
            message.delete()

I've done a little refactoring here and put SignalHandler, queue_length, and process_message in a shared file common.py.
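
For reference, here's a sketch of what common.py contains: the SignalHandler and process_message we wrote earlier, plus a small queue_length helper we haven't seen yet (a reasonable implementation simply reads ApproximateNumberOfMessages):

common.py
from signal import signal, SIGINT, SIGTERM


class SignalHandler:
    def __init__(self):
        self.received_signal = False
        signal(SIGINT, self._signal_handler)
        signal(SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        print(f"handling signal {signum}, exiting gracefully")
        self.received_signal = True


def process_message(message_body):
    print(f"processing message: {message_body}")
    # do what you want with the message here


def queue_length(sqs_queue) -> int:
    # re-fetch attributes so the count is reasonably fresh
    sqs_queue.load()
    return int(sqs_queue.attributes["ApproximateNumberOfMessages"])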

A couple of things to note:

  • We're not putting the failed messages back on the original queue. That's another valid approach, but this way if there's still a bug we'll see the failure right away instead of having to wait for the usual 3 retries with 30 seconds between each retry.
  • We also removed the try/except around process_message(message.body). This is also to make sure that if there's still a bug, it blows up in our face instead of failing silently.
  • In the while loop condition we're checking while queue_length(dlq). This will make the script exit once the dead letter queue is empty.

Closing remarks

Optionally, you could use asyncio, threading, or the multiprocessing library to increase concurrency and process messages more quickly. In my experience, however, it's easier to just scale up the number of consumer processes using the infrastructure already in place to scale services.
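
If you do want in-process concurrency, here's a rough sketch (my illustration, not code from the repo) that hands each message in a received batch to a thread pool, keeping the delete-on-success pattern from before:

from concurrent.futures import ThreadPoolExecutor

def handle_message(message):
    try:
        process_message(message.body)
    except Exception as e:
        print(f"exception while processing message: {repr(e)}")
        return
    message.delete()

# inside the polling loop, replace the sequential for loop with this;
# exiting the with-block waits for the whole batch to finish
with ThreadPoolExecutor(max_workers=10) as executor:
    executor.map(handle_message, messages)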

You can find the code on GitHub. Any questions? Feel free to email me at perandre@mailia.co and I'll help ya out.