Today I Learned: 12/09/2023 - Extracting a FastAPI JSON payload from an AWS SQS or SNS->SQS message triggering an AWS Lambda
I have deployed a few asynchronous systems using FastAPI to submit the JSON payload received by the endpoint to an AWS SQS queue or AWS SNS Topic -> SQS queue chain and then process that payload in an AWS Lambda function, triggered by messages arriving in the SQS queue. It hasn’t been as obvious to me as I originally expected what the messages will look like when delivered to the Lambda function so I’ve gone through some trial and error to figure out how to get the JSON payload data into a usable form in the Lambda function. A few useful things I’ve learned:
Batch message processing
When a Lambda is triggered by messages in an SQS queue, the function is invoked on a batch basis. The messages are supplied as a list of one or more JSON dictionaries in the Records key of the event argument supplied as the first argument to the Lambda function’s entrypoint (the argument and variable names here are used in the AWS Python examples, your mileage may vary for other languages). A simple Python example to get each SQS message in turn:
for record in event["Records"]:
# record is a dict
print(record)
SQS -> Lambda
For cases where the FastAPI payload is placed directly into an SQS queue, the JSON payload dictonary is converted to a JSON-valid string and submitted to the SQS queue in the MessageBody key of the SQS SendMessage method. On the lambda side, the payload data is in the body key of each record and can be extracted like so:
data = json.loads(record["body"])
SNS -> SQS -> Lambda
For cases where the FastAPI payload is published to an SNS topic and then posted into an SQS queue from SNS, the JSON payload dictionary is again converted to a JSON-valid string and published to the SNS topic in the Message key of the SNS Topic Publish method. On the lambda side, the payload data needs a few more steps to extract once it has passed through SNS into SQS:
- The body key of each record holds the SNS topic data as a JSON-valid string.
- The Message key inside that JSON holds the payload published from FastAPI, again as a JSON-valid string.
- This inner JSON is itself a JSON valid string which can be converted to a dictionary, which will match the FastAPI payload.
A simple example may help:
Publishing this JSON payload:
{"a": "b",
"c": {"d": "e"}}
to an SNS topic outputting to an SQS queue triggering a Lambda function results in records with a body value like this arriving at the Lambda:
json.loads(record["body"])
{'Type': 'Notification',
'MessageId': 'GUID',
'TopicArn': 'arn:aws:sns:ARN',
'Message': '"{\"a\":\"b\",\"c\":{\"d\":\"e\"}}"',
'Timestamp': '2023-09-12T20:34:23.609Z',
'SignatureVersion': '1',
'Signature': 'BASE64',
'SigningCertURL': 'URL',
'UnsubscribeURL': 'URL'}
The Message value is an escape-laden JSON-valid string. Converting it to JSON yields a simpler string:
json.loads(json.loads(record["body"])["Message"])
'{"a":"b","c":{"d":"e"}}'
Converting that string to JSON yields the original payload. An example of doing this for a batch of messages:
for record in event["Records"]:
data = json.loads(json.loads(json.loads(record["body"])["Message"]))
data.keys()
("a", "c")
Deleting messages
To prevent messages being processed more than once, the Lambda function should remove them once they have been successfully processed - for Python Lambdas boto3 is always included and can be used to delete messages.