Python async API for creating and managing queues in Postgres


Postgres is not the best solution for storing and managing queues, but sometimes there is no other choice.

async-pq can handle millions of entries and thousands of requests in a single queue. It is used in production, so it is well tested and stable.

Install

> pip install async-pq

Quick start

To work with async-pq, we need the asyncpg library:

import asyncpg

conn = await asyncpg.connect('postgresql://postgres@localhost/test')

The QueueFabric.find_queue method will create the needed tables (one for requests and one for messages) in the database if the queue is new. There is also an is_exists_queue method for situations when you need to know whether the queue already exists or will be created as a new one.

from async_pq import Queue, QueueFabric

queue: Queue = await QueueFabric(conn).find_queue('items')
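
Putting the pieces together, a minimal end-to-end sketch could look like the following. This is only an illustration: the connection URL and the queue name 'items' are placeholders, and the await calls must run inside a coroutine (here started with asyncio.run):

import asyncio
import asyncpg

from async_pq import Queue, QueueFabric

async def main():
    # connect to Postgres and get (or create) the 'items' queue
    conn = await asyncpg.connect('postgresql://postgres@localhost/test')
    queue: Queue = await QueueFabric(conn).find_queue('items')
    # ... work with the queue here ...
    await conn.close()

asyncio.run(main())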

Operations with the queue

Put new items (they should be serialized JSON strings) into the queue:

await queue.put('{"id":1,"data":[1,2,3]}', '{"id":2,"data":[3,2,6]}')
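
Since put expects already-serialized strings, a small sketch (not part of the library) is to build them with json.dumps first:

import json

# serialize Python objects to JSON strings before putting them in the queue
items = [{"id": 1, "data": [1, 2, 3]}, {"id": 2, "data": [3, 2, 6]}]
await queue.put(*(json.dumps(item) for item in items))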

Pop items from the queue with a given limit. This creates one request and returns its id, which is useful when you want to use the acknowledgement pattern:

# If with_ack=False (the default in versions after 0.2.1), messages are acknowledged in place automatically
request_id, data = await queue.pop(limit=2, with_ack=True)

To acknowledge a request, use the ack method:

# returns False if the request was not found or is already acked
is_acked: bool = await queue.ack(request_id)

Or, vice versa, to unacknowledge it:

# returns False if the request was not found or is already acked
is_unacked: bool = await queue.unack(request_id)
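
A typical consumer built on this acknowledgement pattern might look like the sketch below. It assumes that pop returns an empty sequence of messages when the queue is empty, and process_one is a hypothetical handler for a single decoded message:

import asyncio
import json

from async_pq import Queue

async def consume(queue: Queue):
    while True:
        # take up to 10 messages and keep them unacknowledged until processing succeeds
        request_id, data = await queue.pop(limit=10, with_ack=True)
        if not data:
            await asyncio.sleep(1)  # nothing to do right now
            continue
        try:
            for raw in data:
                await process_one(json.loads(raw))  # hypothetical handler
        except Exception:
            # mark the request as not acknowledged so its messages can go back to the queue
            await queue.unack(request_id)
        else:
            await queue.ack(request_id)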

You can return unacknowledged messages that are older than timeout seconds back to the queue (the default limit is 1000 requests):

requests_number = await queue.return_unacked(timeout=300, limit=500)

Clean the queue (delete acknowledged messages) so the database does not get overfilled with old data (by default, messages of up to limit=1000 requests are cleaned):

requests_number = await queue.clean_acked_queue(limit=500)
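
Both maintenance calls can be combined into a small periodic background task, for example (the interval and limits here are arbitrary):

import asyncio

from async_pq import Queue

async def maintenance(queue: Queue):
    while True:
        # put messages from stale unacknowledged requests back into the queue
        returned = await queue.return_unacked(timeout=300, limit=500)
        # drop messages of already acknowledged requests
        cleaned = await queue.clean_acked_queue(limit=500)
        print(f'returned {returned} requests, cleaned {cleaned} requests')
        await asyncio.sleep(60)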