pysqsx
A simple task processor for Amazon SQS.
quickstart
For this demonstration we will run ElasticMQ locally using Docker:
docker run --name pysqsx-elasticmq -p 9324:9324 -d softwaremill/elasticmq-native
Install the package:
pip install sqsx
Now let's create a script that adds a task to the queue and then consumes it:
# file script.py
import logging
import boto3
from sqsx import Queue
# configure the logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('botocore').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
# create the sqs_client
queue_url = "http://localhost:9324/000000000000/tests"
queue_name = "tests"
sqs_client = boto3.client(
    "sqs",
    endpoint_url="http://localhost:9324",
    region_name="elasticmq",
    aws_secret_access_key="x",
    aws_access_key_id="x",
    use_ssl=False,
)
# create the new sqs queue
sqs_client.create_queue(QueueName=queue_name)
# create the sqsx.Queue
queue = Queue(url=queue_url, sqs_client=sqs_client)
# add a new task
queue.add_task("my_task", a=1, b=2, c=3)
# create the task handler, the first argument must be the context
def task_handler(context, a, b, c):
    print(f"context={context}, a={a}, b={b}, c={c}")
# add a new task handler
queue.add_task_handler("my_task", task_handler)
# start consuming tasks; press ctrl+c to stop and exit gracefully
queue.consume_tasks()
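The handler contract used in the script (a task name mapped to a function that receives the context first, followed by the task's keyword arguments) can be illustrated without SQS at all. The sketch below is only a conceptual model, not how sqsx is implemented: `handlers`, `register` and `dispatch` are hypothetical names introduced for illustration.

```python
from typing import Any, Callable, Dict

# Hypothetical registry, not part of sqsx: maps a task name to its handler.
handlers: Dict[str, Callable[..., Any]] = {}


def register(task_name: str, handler: Callable[..., Any]) -> None:
    """Remember which function should process tasks with this name."""
    handlers[task_name] = handler


def dispatch(context: Dict[str, Any], kwargs: Dict[str, Any]) -> Any:
    """Look up the handler by task name and pass the context as first argument."""
    handler = handlers[context["task_name"]]
    return handler(context, **kwargs)


def task_handler(context, a, b, c):
    # Same signature as in script.py: context first, then the task's kwargs.
    return f"task={context['task_name']}, sum={a + b + c}"


register("my_task", task_handler)
result = dispatch({"task_name": "my_task"}, {"a": 1, "b": 2, "c": 3})
print(result)
```

Under this model, a task whose keyword arguments do not match the handler signature would raise a `TypeError`, so the arguments given to `add_task` and the handler parameters should be kept in sync.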
Running the script:
python script.py
INFO:sqsx.queue:Starting consuming tasks, queue_url=http://localhost:9324/000000000000/tests
context={'queue_url': 'http://localhost:9324/000000000000/tests', 'task_name': 'my_task', 'sqs_message': {'MessageId': '0c126462-0184-485b-b66f-77ed0b6f3780', 'ReceiptHandle': '0c126462-0184-485b-b66f-77ed0b6f3780#2959620c-7ead-4e12-80a7-672530c43f26', 'MD5OfBody': '8087eb7436895841c5d646156a8a469f', 'Body': 'eyJrd2FyZ3MiOiB7ImEiOiAxLCAiYiI6IDIsICJjIjogM319', 'Attributes': {'SentTimestamp': '1702527171600', 'ApproximateReceiveCount': '1', 'ApproximateFirstReceiveTimestamp': '1702527171603', 'SenderId': '127.0.0.1'}, 'MD5OfMessageAttributes': '5346f2cd7c539a880febaf9112a86921', 'MessageAttributes': {'TaskName': {'StringValue': 'my_task', 'DataType': 'String'}}}}, a=1, b=2, c=3
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
^CINFO:sqsx.queue:Starting graceful shutdown process
INFO:sqsx.queue:Stopping consuming tasks, queue_url=http://localhost:9324/000000000000/tests
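The `Body` field of the `sqs_message` in the output above appears to be base64-encoded JSON holding the task's keyword arguments. A minimal sketch of decoding it with the standard library follows; the string is copied from the sample output, and the encoding is inferred from that output rather than from documented behaviour, so it may differ in other versions.

```python
import base64
import json

# Body value copied from the sample output above.
body = "eyJrd2FyZ3MiOiB7ImEiOiAxLCAiYiI6IDIsICJjIjogM319"

# Decode base64 to bytes, then parse the bytes as JSON.
payload = json.loads(base64.b64decode(body))
print(payload)
```

This can be handy when inspecting messages directly in the queue, for example while debugging a handler that never fires.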