A simple task processor for Amazon SQS

pysqsx

Quickstart

For this demonstration, we will run ElasticMQ locally with Docker:

docker run --name pysqsx-elasticmq -p 9324:9324 -d softwaremill/elasticmq-native
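
Before running the scripts below, it can help to confirm that the container is accepting connections on the published port. The following is a minimal sketch using only the standard library; the helper name is_port_open is hypothetical and not part of sqsx, and it only checks that a TCP connection succeeds, not that ElasticMQ is fully ready.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        # create_connection raises OSError (refused, timed out, unreachable) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

With the container from the command above running, is_port_open("localhost", 9324) is expected to return True.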

Install the package:

pip install sqsx

Working with sqsx.Queue

We use sqsx.Queue when we need to schedule tasks by name and consume them with a handler registered for each name.

Now let's create a script that adds a new task and then consumes it:

# file script.py
import logging

import boto3

from sqsx import Queue

# configure the logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('botocore').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)

# create the sqs_client
queue_url = "http://localhost:9324/000000000000/tests"
queue_name = "tests"
sqs_client = boto3.client(
    "sqs",
    endpoint_url="http://localhost:9324",
    region_name="elasticmq",
    aws_secret_access_key="x",
    aws_access_key_id="x",
    use_ssl=False,
)

# create the new sqs queue
sqs_client.create_queue(QueueName=queue_name)

# create the sqsx.Queue
queue = Queue(url=queue_url, sqs_client=sqs_client)

# add a new task
queue.add_task("my_task", a=1, b=2, c=3)

# create the task handler, which must be a simple function like this
def task_handler(context: dict, a: int, b: int, c: int):
    print(f"context={context}, a={a}, b={b}, c={c}")

# register the handler for the "my_task" task name
queue.add_task_handler("my_task", task_handler)

# start consuming messages; press ctrl+c to stop and exit gracefully
queue.consume_messages()

Running the script:

python script.py
INFO:sqsx.queue:Starting consuming tasks, queue_url=http://localhost:9324/000000000000/tests
context={'queue_url': 'http://localhost:9324/000000000000/tests', 'task_name': 'my_task', 'sqs_message': {'MessageId': '42513c2d-ac93-4701-bb63-83b45e6fe2ca', 'ReceiptHandle': '42513c2d-ac93-4701-bb63-83b45e6fe2ca#6eb5443b-a2eb-454e-8619-86f6d2e67561', 'MD5OfBody': '8087eb7436895841c5d646156a8a469f', 'Body': 'eyJrd2FyZ3MiOiB7ImEiOiAxLCAiYiI6IDIsICJjIjogM319', 'Attributes': {'SentTimestamp': '1702573178736', 'ApproximateReceiveCount': '1', 'ApproximateFirstReceiveTimestamp': '1702573178740', 'SenderId': '127.0.0.1'}, 'MD5OfMessageAttributes': '5346f2cd7c539a880febaf9112a86921', 'MessageAttributes': {'TaskName': {'StringValue': 'my_task', 'DataType': 'String'}}}}, a=1, b=2, c=3
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
^CINFO:sqsx.queue:Starting graceful shutdown process
INFO:sqsx.queue:Stopping consuming tasks, queue_url=http://localhost:9324/000000000000/tests
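
In the output above, the Body field of the SQS message is not human-readable. Judging from this sample, it appears to be base64-encoded JSON that holds the keyword arguments passed to add_task. The sketch below decodes the value copied from the log using only the standard library; the helper name decode_task_body is hypothetical, and the encoding is inferred from this one message rather than taken from the sqsx source.

```python
import base64
import json


def decode_task_body(body: str) -> dict:
    """Decode a base64-encoded JSON task body into a dict."""
    return json.loads(base64.b64decode(body))


# Body value copied from the example output above
body = "eyJrd2FyZ3MiOiB7ImEiOiAxLCAiYiI6IDIsICJjIjogM319"
payload = decode_task_body(body)
print(payload)  # {'kwargs': {'a': 1, 'b': 2, 'c': 3}}
```

The decoded kwargs match the a=1, b=2, c=3 arguments that the task handler received, which can be useful when inspecting messages directly in the queue.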

Working with sqsx.RawQueue

We use sqsx.RawQueue when a single handler should consume every message in the queue.

Now let's create a script that adds a new message and then consumes it:

# file raw_script.py
import logging

import boto3

from sqsx import RawQueue

# configure the logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('botocore').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)

# create the sqs_client
queue_url = "http://localhost:9324/000000000000/tests"
queue_name = "tests"
sqs_client = boto3.client(
    "sqs",
    endpoint_url="http://localhost:9324",
    region_name="elasticmq",
    aws_secret_access_key="x",
    aws_access_key_id="x",
    use_ssl=False,
)

# create the new sqs queue
sqs_client.create_queue(QueueName=queue_name)

# create the message handler, which must be a simple function like this
def message_handler(queue_url: str, sqs_message: dict):
    print(f"queue_url={queue_url}, sqs_message={sqs_message}")

# create the sqsx.RawQueue
queue = RawQueue(url=queue_url, message_handler_function=message_handler, sqs_client=sqs_client)

# add a new message
queue.add_message(
    message_body="My Message",
    message_attributes={"Attr1": {"DataType": "String", "StringValue": "Attr1"}},
)

# start consuming messages; press ctrl+c to stop and exit gracefully
queue.consume_messages()

Running the script:

python raw_script.py
INFO:sqsx.queue:Starting consuming tasks, queue_url=http://localhost:9324/000000000000/tests
queue_url=http://localhost:9324/000000000000/tests, sqs_message={'MessageId': 'fb2ed6cf-9346-4ded-8cfe-4fc297f95928', 'ReceiptHandle': 'fb2ed6cf-9346-4ded-8cfe-4fc297f95928#bd9f27a6-0a73-4d27-9c1e-0947f21d3c02', 'MD5OfBody': '069840f6917e85a02167febb964f0041', 'Body': 'My Message', 'Attributes': {'SentTimestamp': '1702573585302', 'ApproximateReceiveCount': '1', 'ApproximateFirstReceiveTimestamp': '1702573585306', 'SenderId': '127.0.0.1'}, 'MD5OfMessageAttributes': '90f34a800b9d242c1b32320e4a3ed630', 'MessageAttributes': {'Attr1': {'StringValue': 'Attr1', 'DataType': 'String'}}}
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
^CINFO:sqsx.queue:Starting graceful shutdown process
INFO:sqsx.queue:Stopping consuming tasks, queue_url=http://localhost:9324/000000000000/tests
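
Since the raw handler receives the whole SQS message as a dict, a handler usually needs to pick out the body and individual attributes. The sketch below shows one way to do that for a message shaped like the one in the output above. The helper get_string_attribute is hypothetical and not part of sqsx, and the sample message is trimmed to the fields it uses.

```python
def get_string_attribute(sqs_message: dict, name: str, default=None):
    """Return the StringValue of a message attribute, or default if absent."""
    attributes = sqs_message.get("MessageAttributes", {})
    return attributes.get(name, {}).get("StringValue", default)


def message_handler(queue_url: str, sqs_message: dict):
    # same signature as the handler in raw_script.py
    attr1 = get_string_attribute(sqs_message, "Attr1")
    print(f"body={sqs_message['Body']}, Attr1={attr1}")


# Trimmed copy of the message from the example output above
sample_message = {
    "Body": "My Message",
    "MessageAttributes": {
        "Attr1": {"StringValue": "Attr1", "DataType": "String"},
    },
}

message_handler("http://localhost:9324/000000000000/tests", sample_message)
```

Using .get with defaults keeps the handler from raising KeyError when a message arrives without attributes.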
