API Reference¶
This page lists the public symbols exported by localqueue and
localqueue.retry.
localqueue¶
Queue classes¶
PersistentQueue¶
Persistent queue with queue.Queue-style methods and explicit message methods.
Constructor options:
| Option | Meaning |
|---|---|
| name | queue name inside the store |
| store | queue store instance |
| store_path | explicit path for the SQLite queue store |
| lease_timeout | seconds before an inflight message is redelivered |
| lease_policy | lease and visibility-timeout behavior; defaults from lease_timeout |
| maxsize | maximum number of ready messages; 0 means unbounded |
| retry_defaults | Tenacity retry keyword defaults inherited by workers |
| semantics | descriptive queue semantics; defaults to LOCAL_AT_LEAST_ONCE |
| locality_policy | locality behavior; defaults to LOCAL_QUEUE_PLACEMENT |
| acknowledgement_policy | acknowledgement behavior; defaults to EXPLICIT_ACKNOWLEDGEMENT |
| dead_letter_policy | dead-letter behavior; defaults to DEAD_LETTER_QUEUE |
| deduplication_policy | deduplication behavior; defaults to DEDUPE_KEY_SUPPORT |
| consumption_policy | consumption behavior; defaults to PULL_CONSUMPTION |
| dispatch_policy | in-process dispatch behavior; defaults to NO_DISPATCHER |
| notification_policy | wake-up notification behavior; defaults to NO_NOTIFICATION |
| delivery_policy | delivery behavior; defaults to AT_LEAST_ONCE_DELIVERY |
| ordering_policy | ready-message ordering behavior; defaults to FIFO_READY_ORDERING |
| routing_policy | message routing behavior; defaults to POINT_TO_POINT_ROUTING |
| subscription_policy | subscription fanout behavior; defaults to NO_SUBSCRIPTIONS |
| backpressure | strategy object for capacity checks; defaults from maxsize |
| policy_set | reusable bundle of queue policies; conflicts with explicit policy options |
Core methods:
| Method | Meaning |
|---|---|
| put(item, block=True, timeout=None, delay=0.0, priority=0) | enqueue an item |
| put_nowait(item) | enqueue without blocking |
| get(block=True, timeout=None, leased_by=None) | get only the value |
| get_nowait() | get without blocking |
| get_message(block=True, timeout=None, leased_by=None) | lease and return a QueueMessage |
| inspect(message_id) | read one message without leasing it |
| ack(message) | remove a message permanently |
| release(message, delay=0.0, error=None) | return a message to ready delivery, optionally recording the failure |
| dead_letter(message, error=None) | move a message out of normal delivery, optionally recording the failure |
| task_done() | acknowledge the oldest unfinished get() message |
| join() | wait for unfinished get() messages |
| qsize() | count ready messages |
| stats() | count ready, delayed, inflight, dead, and total messages |
| dead_letters(limit=None) | list dead-letter messages |
| requeue_dead(message, delay=0.0) | return a dead-letter message to ready delivery |
| prune_dead_letters(older_than) | remove dead letters older than the given age |
| count_dead_letters_older_than(older_than) | count dead letters older than the given age without removing them |
| empty() | whether there are no ready messages |
| full() | whether ready capacity is reached |
| purge() | remove all queue records |
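The lease and acknowledgement flow behind get_message(), ack(), and release() can be sketched with a minimal in-memory model. This is an illustration of the at-least-once contract only, not the library's store implementation; the class and field names here are invented for the sketch.

```python
import time
import uuid

# Sketch of the lease/ack lifecycle: messages move ready -> inflight
# on get_message(), and become deliverable again if the lease expires
# before ack(). Not the real SQLite-backed implementation.
class SketchQueue:
    def __init__(self, lease_timeout=30.0):
        self.lease_timeout = lease_timeout
        self.messages = {}  # id -> {"value", "state", "leased_until"}

    def put(self, value):
        mid = str(uuid.uuid4())
        self.messages[mid] = {"value": value, "state": "ready", "leased_until": None}
        return mid

    def get_message(self, now=None):
        now = time.time() if now is None else now
        for mid, msg in self.messages.items():
            expired = msg["state"] == "inflight" and msg["leased_until"] <= now
            if msg["state"] == "ready" or expired:
                msg["state"] = "inflight"
                msg["leased_until"] = now + self.lease_timeout
                return mid, msg["value"]
        return None

    def ack(self, mid):
        del self.messages[mid]  # acknowledged messages leave normal storage

    def release(self, mid):
        self.messages[mid].update(state="ready", leased_until=None)
```

If a consumer crashes after get_message() but before ack(), the lease eventually expires and the same message is handed to the next consumer, which is what makes the default delivery at-least-once.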
QueueSemantics¶
Descriptive value object for the queueing concepts implemented by a
configuration. The default LOCAL_AT_LEAST_ONCE describes the current
PersistentQueue behavior: local storage, at-least-once delivery,
point-to-point routing, pull consumption, ready-order delivery, leases,
acknowledgements, dead letters, dedupe-key support, no subscriptions, and no
notifications.
QueuePolicySet¶
Reusable bundle for queue policies. Pass policy_set= to PersistentQueue when
you want to keep locality, leases, acknowledgements, dead letters, delivery,
ordering, routing, consumption, dispatch, notifications, semantics, and
backpressure choices together as one configuration object. Explicit constructor
options remain available, but passing one alongside the same option inside the
policy set is a conflict, so configuration stays unambiguous.
QueuePolicySet.at_least_once(...), QueuePolicySet.at_most_once(...), and
QueuePolicySet.effectively_once(...) build common delivery bundles with
optional locality, lease, acknowledgement, dead-letter, deduplication,
consumption, dispatch, notification, ordering, routing, subscription, and
backpressure policies. The effectively-once factory also accepts idempotency,
result, and commit policies.
QoS¶
Small enum for fluent queue specs. The built-in values are
QoS.AT_LEAST_ONCE, QoS.AT_MOST_ONCE, and QoS.EFFECTIVELY_ONCE.
QueueSpec¶
Fluent builder for common queue and worker settings. Use it when you want one
object to describe queue delivery defaults plus worker retry and pacing
defaults, then build the runtime objects explicitly with build_queue() and
build_worker_config(). PersistentQueue.from_spec(spec) and
PersistentQueue(spec=spec) are the matching queue entry points when you want
the queue type to own construction. Use with_name(...) when one base spec
should be reused across multiple queue names.
with_qos(...) maps to the queue delivery policy, with_dead_letter_queue()
maps to the queue dead-letter policy, with_retry(...) stores retry defaults
for the queue and worker config, and with_dead_letter_on_failure(...),
with_release_delay(...), with_min_interval(...), and
with_circuit_breaker(...) map to
PersistentWorkerConfig.
LocalityPolicy¶
Protocol for naming where queue state lives relative to the process using the queue.
LocalQueuePlacement¶
Locality policy used by default. It describes the current queue behavior: queue state is co-located with the process host and no network boundary is crossed to operate on the queue store.
RemoteQueuePlacement¶
Locality policy for queue definitions backed by a remote boundary. It names
locality="remote" in the queue configuration and makes the placement explicit
without turning the built-in local store into a distributed broker.
LeasePolicy¶
Protocol for naming how messages are leased while a consumer is processing them.
FixedLeaseTimeout¶
Lease policy used by default. FixedLeaseTimeout(timeout=30.0) means an
inflight message can be redelivered after the fixed visibility timeout expires.
The lease_timeout= constructor option remains the simple shortcut for this
policy.
AcknowledgementPolicy¶
Protocol for naming how completed messages are acknowledged.
ExplicitAcknowledgement¶
Acknowledgement policy used by default. It describes the current queue behavior: consumers explicitly acknowledge successful work, and acknowledged messages are removed from normal queue storage.
DeadLetterPolicy¶
Protocol for naming how failed messages leave normal delivery.
DeadLetterQueue¶
Dead-letter policy used by default. It describes the current queue behavior: failed messages can be moved to inspectable dead-letter storage and later requeued.
DeduplicationPolicy¶
Protocol for naming whether the queue tracks stable dedupe keys.
DedupeKeySupport¶
Deduplication policy used by default. It describes the current queue behavior:
messages may carry a stable dedupe_key and duplicate keys can reuse the same
stored message.
NoDeduplication¶
Deduplication policy for queues that should not accept stable dedupe keys. It
names the deduplication=False contract explicitly while leaving the rest of
the queue configuration unchanged.
AtLeastOnceDelivery¶
Delivery policy used by default. It describes the current queue behavior: messages are leased before handling, acknowledged after successful handling, and redelivered if the lease expires before acknowledgement.
AtMostOnceDelivery¶
Delivery policy for workflows that prefer losing a message over processing it
more than once. The queue removes the message before returning it from get() or
get_message(), so handler failures are not redelivered or dead-lettered.
EffectivelyOnceDelivery¶
Delivery policy for idempotent workflows. It keeps the at-least-once processing
mechanics, but requires dedupe_key on put() so producers always provide a
stable identity for the work item. This is the base contract for higher-level
effectively-once features such as idempotency ledgers or cached results.
Pass idempotency_store= when you want to attach an explicit store interface for
dedupe/result coordination. When an attached store already marks a
dedupe_key as succeeded, worker helpers acknowledge the duplicate delivery and
skip handler execution. Until a future ResultPolicy exists, that short-circuit
path returns None by default. Pass result_policy=ReturnStoredResult() to
persist the successful handler result inline in the idempotency ledger and
return it on duplicate delivery. Pass result_store= to ReturnStoredResult
when you want result storage to live outside the idempotency ledger.
Pass commit_policy= when you want to name the coordination model explicitly.
LocalAtomicCommit is the default and matches the current local flow: the
worker records the final result and then acknowledges the queue message. The
other built-in policies are descriptive ports for outbox, two-phase, and
saga-style coordination.
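The duplicate short-circuit described above can be sketched with a plain dict standing in for the idempotency ledger. The function and record shapes here are invented for the sketch; the real protocol lives in localqueue.idempotency, and the stored "result" corresponds to what ReturnStoredResult would keep.

```python
# Sketch of the effectively-once short-circuit: a ledger keyed by
# dedupe_key records success, so a duplicate delivery is acknowledged
# without re-running the handler and replays the stored result.
def handle_once(ledger, dedupe_key, handler, payload):
    record = ledger.get(dedupe_key)
    if record is not None and record["status"] == "succeeded":
        # Duplicate delivery: skip the handler. Without a result
        # policy this returns None; with one, the cached result.
        return record.get("result")
    result = handler(payload)
    ledger[dedupe_key] = {"status": "succeeded", "result": result}
    return result
```

With NoResultPolicy the record would carry no result and the duplicate path returns None; ReturnStoredResult corresponds to persisting and replaying "result".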
NoResultPolicy¶
Default result policy for EffectivelyOnceDelivery. It keeps ledger state, but
does not persist or replay handler results.
ReturnStoredResult¶
Result policy for EffectivelyOnceDelivery that stores successful handler
results inline in the idempotency ledger and returns the cached value when a
duplicate delivery is skipped. When result_store= is attached, the policy saves
the handler result there and only keeps a result_key in the idempotency
ledger.
CommitPolicy¶
Protocol for naming how a successful handler result is coordinated with queue acknowledgement and external side effects.
LocalAtomicCommit, TransactionalOutboxCommit, TwoPhaseCommit, SagaCommit¶
Built-in commit policy variants. LocalAtomicCommit is the default. The other
variants are explicit ports for outbox, two-phase, and saga-style coordination.
TransactionalOutboxCommit accepts outbox_store= for the durable outbox
envelope.
TwoPhaseCommit accepts prepare_store= and commit_store= for explicit
prepare/commit envelopes.
SagaCommit accepts saga_store= for forward and compensation envelopes.
localqueue.results¶
ResultStore¶
Protocol for loading, saving, and deleting cached worker results by key.
MemoryResultStore, SQLiteResultStore, LMDBResultStore¶
Built-in result store adapters for in-memory, SQLite, and LMDB-backed cached worker results.
PullConsumption¶
Consumption policy used by default. It describes the current queue behavior:
workers explicitly request messages with get(), get_message(), or the worker
helpers. Producers only enqueue work; they do not invoke handlers directly.
PushConsumption¶
Consumption policy for workflows that model push-based delivery, where a
producer or dispatcher invokes handlers instead of workers polling for messages.
It names the concept with pattern="push" and can be used in policy sets while
the current built-in worker helpers remain pull-based.
DispatchPolicy¶
Protocol for naming how in-process handlers are invoked after messages are enqueued.
NoDispatcher¶
Dispatch policy used by default. Producers only persist messages; workers or callers still pull work explicitly.
CallbackDispatcher¶
Dispatch policy for local push workflows. It calls one or more in-process
handlers after put() persists a message. This is not cross-process
notification; external wake-up mechanisms should be modeled by a notification
adapter.
NotificationPolicy¶
Protocol for naming how a queue wakes listeners after a message is persisted.
NoNotification¶
Notification policy used by default. The queue remains quiet after put()
unless an explicit notification policy is attached.
CallbackNotification¶
Notification policy for local push or polling hybrids. It calls in-process listeners after persistence, which can be used to wake a worker loop or refresh an in-memory subscriber. Cross-process wake-up still belongs in a separate adapter.
InProcessNotification¶
Notification policy backed by threading.Event. put() sets the event after
the message is persisted, and local threads can call wait() and clear() on
the policy to coordinate producer wake-up with consumer polling. It is
in-process only and does not wake other processes.
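The threading.Event coordination that InProcessNotification describes can be sketched directly with the stdlib. This shows the wake-up pattern only, not the policy's internals; the producer and consumer functions are invented for the sketch.

```python
import threading

# Sketch of event-based wake-up: the producer sets the event after
# persisting an item, and the consumer waits on it instead of
# busy-polling an empty queue.
wake = threading.Event()
items = []

def produce(value):
    items.append(value)   # stands in for put() persisting the message
    wake.set()            # wake any waiting consumer

def consume(timeout=1.0):
    if not items and not wake.wait(timeout):
        return None       # timed out with nothing to consume
    wake.clear()
    return items.pop(0)
```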
PointToPointRouting¶
Routing policy used by default. It describes the current queue behavior: each message is leased to one consumer at a time, and publishing a message does not fan it out to multiple independent subscriber queues.
PublishSubscribeRouting¶
Routing policy for workflows that model publish/subscribe fanout. It names the
concept explicitly with pattern="publish-subscribe" and fanout=True. When
combined with a subscription policy that names subscribers, put() fans out
one durable message into each physical subscriber queue.
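The fanout contract can be sketched as one durable copy written into each physical subscriber queue. Plain lists stand in for queues here; the subscriber names reuse the documentation's events.billing and events.audit example.

```python
# Sketch of publish/subscribe fanout: publishing to the logical topic
# writes one independent copy into every subscriber's queue.
def fan_out(subscriber_queues, message):
    for subscriber_queue in subscriber_queues.values():
        subscriber_queue.append(message)

queues = {"events.billing": [], "events.audit": []}
fan_out(queues, {"event": "invoice.created"})
```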
SubscriptionPolicy¶
Protocol for naming whether a queue definition has subscribers and whether messages should be fanned out.
NoSubscriptions¶
Subscription policy used by default. It keeps point-to-point queue definitions simple: there are no named subscribers and no fanout contract.
StaticFanoutSubscriptions¶
Subscription policy for publish/subscribe definitions with a fixed subscriber
set. Combined with PublishSubscribeRouting, it materializes one physical queue
per subscriber, for example events.billing and events.audit.
FifoReadyOrdering¶
Ordering policy used by default. It describes the current store ordering:
messages become eligible by available_at, and messages with the same
availability keep enqueue order.
PriorityOrdering¶
Ordering policy for queues that should deliver higher-priority ready messages
first. Pass ordering_policy=PriorityOrdering() to PersistentQueue, then use
put(..., priority=n) with non-negative integer priorities. Higher numbers are
delivered before lower numbers when messages are available at the same time.
Messages with the same priority keep enqueue order.
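The ordering rule, higher priority first and enqueue order within a priority, can be sketched with heapq. This is an illustration of the rule, not the store's actual query.

```python
import heapq
import itertools

# Sketch of priority-then-FIFO ordering: heapq is a min-heap, so push
# (-priority, sequence) tuples to deliver higher priorities first and
# preserve enqueue order within the same priority.
_seq = itertools.count()
heap = []

def put(item, priority=0):
    heapq.heappush(heap, (-priority, next(_seq), item))

def get():
    return heapq.heappop(heap)[2]
```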
BestEffortOrdering¶
Ordering policy for workflows that do not require stable delivery order. It
names guarantee="best-effort" in the queue configuration and avoids promising
ready-before-delayed, same-timestamp stability, or priority ordering.
BoundedBackpressure¶
Capacity strategy used by PersistentQueue.full() and blocking put() calls.
BoundedBackpressure(maxsize=0) is unbounded. Positive values cap the number of
ready messages, matching the existing maxsize constructor option. The default
overflow="block" preserves queue.Queue-style producer behavior.
RejectingBackpressure¶
Capacity strategy for producers that should fail immediately when the queue is
full. It uses overflow="reject", so put() raises queue.Full instead of
waiting for capacity even when block=True.
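The reject behavior matches the stdlib's non-blocking path: once capacity is reached the producer gets queue.Full immediately rather than waiting. A bounded queue.Queue demonstrates the same producer-side contract.

```python
import queue

# Sketch of rejecting backpressure on a bounded stdlib queue:
# put_nowait() raises queue.Full once capacity is reached, mirroring
# overflow="reject" even when the caller asked to block.
q = queue.Queue(maxsize=1)
q.put_nowait("first")
try:
    q.put_nowait("second")
    rejected = False
except queue.Full:
    rejected = True
```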
PersistentWorkerConfig¶
Reusable configuration for persistent_worker() and persistent_async_worker().
It keeps worker behavior and retry options together so the same policy can be
shared by multiple queues.
```python
from localqueue import PersistentWorkerConfig
from tenacity import retry_if_exception_type, wait_fixed

config = PersistentWorkerConfig(
    max_tries=3,
    wait=wait_fixed(1),
    retry=retry_if_exception_type(ConnectionError),
    dead_letter_on_failure=False,
    release_delay=30,
)
```
Constructor options:
| Option | Meaning |
|---|---|
| dead_letter_on_failure | dead-letter final handler failures when True |
| dead_letter_on_exhaustion | compatibility alias for dead_letter_on_failure |
| release_delay | delay used when releasing failed messages |
| min_interval | minimum seconds to wait between worker message starts |
| circuit_breaker_failures | consecutive recoverable failures before pausing the worker |
| circuit_breaker_cooldown | pause duration after the breaker opens |
| **retry_kwargs | forwarded to PersistentRetrying |
Queue-level retry defaults can also be attached to PersistentQueue and are
merged into worker retry kwargs before explicit worker overrides. That keeps
shared queue policies close to the queue definition while still letting a
worker override a specific retry parameter when needed.
min_interval is a per-worker rate limit. Set it when a handler talks to an
external service that should not be hit back-to-back. The circuit-breaker pair
(circuit_breaker_failures + circuit_breaker_cooldown) opens after repeated
recoverable failures and pauses the worker before it fetches the next message.
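The breaker pair can be sketched as a consecutive-failure counter plus a cooldown timestamp. The class and method names here are invented for the sketch; they illustrate the open/pause/close cycle, not the worker helpers' internals.

```python
import time

# Sketch of the worker circuit breaker: enough consecutive recoverable
# failures open the breaker, and the worker pauses until the cooldown
# expires before fetching the next message.
class Breaker:
    def __init__(self, failures=5, cooldown=30.0):
        self.failures = failures
        self.cooldown = cooldown
        self.consecutive = 0
        self.open_until = 0.0

    def record(self, ok, now=None):
        now = time.time() if now is None else now
        if ok:
            self.consecutive = 0  # any success resets the streak
        else:
            self.consecutive += 1
            if self.consecutive >= self.failures:
                self.open_until = now + self.cooldown

    def is_open(self, now=None):
        now = time.time() if now is None else now
        return now < self.open_until
```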
QueueMessage¶
Dataclass returned by put() and get_message().
| Field | Meaning |
|---|---|
| id | generated message id |
| value | Python value stored in the queue |
| queue | queue name |
| state | current message state: ready, inflight, or dead |
| attempts | delivery attempt count |
| created_at | creation timestamp |
| available_at | earliest delivery timestamp |
| priority | non-negative priority; higher values are delivered first with PriorityOrdering |
| leased_until | lease expiration timestamp, if inflight |
| leased_by | optional worker id that currently owns the lease, if inflight |
| dedupe_key | optional idempotency key used to reuse the same stored message |
| attempt_history | list of lease and outcome events recorded for this message |
| last_error | structured error from the most recent failed processing attempt, if recorded |
| failed_at | timestamp for last_error, if recorded |
For localqueue queue exec failures, last_error also includes command,
exit_code, stdout, and stderr fields so command workers can be inspected
from queue inspect and queue dead. dedupe_key is returned on inspection
and lets repeated enqueues reuse the same message until it is acknowledged or
cleaned up. attempt_history shows the lease and terminal events that led to
the current state.
QueueStats¶
Dataclass returned by stats().
| Field | Meaning |
|---|---|
| ready | messages available for immediate delivery |
| delayed | ready-state messages whose available_at is in the future |
| inflight | leased messages not yet acknowledged, released, or dead-lettered |
| dead | dead-letter messages hidden from normal delivery |
| total | all messages still stored for the queue |
| by_worker_id | current inflight counts grouped by leased_by |
| leases_by_worker_id | historical lease counts grouped by leased_by, a coarse throughput proxy |
| last_seen_by_worker_id | most recent heartbeat timestamp for each recorded worker id |
| oldest_ready_age_seconds | age of the oldest ready message currently waiting |
| oldest_inflight_age_seconds | age of the oldest current inflight lease |
| average_inflight_age_seconds | average age across current inflight leases |
Worker decorators¶
persistent_worker(queue, config=None, **retry_kwargs)¶
Builds a queue consumer around PersistentRetrying. The leased message id is
used as the persistent retry key. Worker handlers receive message.value as
their first argument.
```python
from localqueue import PersistentWorkerConfig, persistent_worker

config = PersistentWorkerConfig(max_tries=3)

@persistent_worker(queue, config=config)
def handle(payload: dict) -> None:
    ...
```
Direct keyword arguments are still accepted and override values from config.
persistent_async_worker(queue, config=None, **retry_kwargs)¶
Async equivalent backed by PersistentAsyncRetrying.
Options:
| Option | Meaning |
|---|---|
| dead_letter_on_failure | dead-letter final handler failures when True |
| dead_letter_on_exhaustion | compatibility alias for dead_letter_on_failure |
| release_delay | delay used when releasing failed messages |
| **retry_kwargs | forwarded to PersistentRetrying |
Queue stores¶
QueueStore¶
Protocol for custom queue stores.
SQLiteQueueStore¶
SQLite-backed queue store. This is the default backend. Records are serialized as versioned JSON; values must be JSON-serializable.
The SQLite store tracks its on-disk schema version with PRAGMA user_version.
Current releases migrate older compatible versions and reject future versions
they do not know how to migrate yet.
LMDBQueueStore¶
LMDB-backed queue store. Records are serialized as versioned JSON; values must be JSON-serializable.
MemoryQueueStore¶
Thread-safe in-memory queue store for tests.
QueueStoreLockedError¶
Raised when LMDB reports that the queue store is locked by another process.
Install localqueue[cli] when you want the CLI entry points, and
localqueue[lmdb] when you want the LMDB queue store backend.
localqueue.idempotency¶
IdempotencyRecord¶
Value object stored by idempotency adapters. It tracks status,
first_seen_at, optional completed_at, optional result_key, and free-form
metadata.
IdempotencyStore¶
Protocol for loading, saving, deleting, and pruning idempotency records by stable key.
Built-in stores¶
MemoryIdempotencyStore, SQLiteIdempotencyStore, and LMDBIdempotencyStore.
localqueue.retry¶
Retry decorators¶
persistent_retry(**kwargs)¶
Creates a decorator backed by PersistentRetrying.
```python
from localqueue.retry import key_from_argument, persistent_retry

@persistent_retry(key_fn=key_from_argument("job_id"), max_tries=3)
def run(job_id: str) -> None:
    ...
```
persistent_async_retry(**kwargs)¶
Creates a decorator backed by PersistentAsyncRetrying.
```python
from localqueue.retry import key_from_argument, persistent_async_retry

@persistent_async_retry(key_fn=key_from_argument("job_id"), max_tries=3)
async def run(job_id: str) -> None:
    ...
```
Both decorators require key= or key_fn= and accept the persistent options below plus Tenacity options such as stop, wait, retry, before, after, before_sleep, retry_error_callback, and reraise.
| Option | Meaning |
|---|---|
store |
attempt store instance |
store_path |
explicit path for a SQLite attempt-store file |
key |
fixed retry key |
key_fn |
function that derives a retry key from the call |
clear_on_success |
delete the attempt record after success |
max_tries |
alias for stop_after_attempt(max_tries) |
Key helpers¶
key_from_argument(name)¶
Creates a key_fn that reads the retry key from a named function argument.
```python
from localqueue.retry import key_from_argument, persistent_retry

@persistent_retry(key_fn=key_from_argument("job_id"))
def run(*, job_id: str) -> None:
    ...
```
key_from_attr(argument_name, attribute_name, *, prefix=None)¶
Creates a key_fn that reads an attribute from a named function argument. Pass
prefix= when the same store may contain keys for different task domains.
```python
from localqueue.retry import key_from_attr, persistent_retry

@persistent_retry(key_fn=key_from_attr("task", "id", prefix="video"))
def run(task: VideoTask) -> None:
    ...
```
idempotency_key_from_id(argument_name, *, prefix=None)¶
Shortcut for key_from_attr(argument_name, "id", prefix=prefix).
```python
from localqueue.retry import idempotency_key_from_id, persistent_retry

@persistent_retry(key_fn=idempotency_key_from_id("task", prefix="video"))
def run(task: VideoTask) -> None:
    ...
```
Retry classes¶
PersistentRetrying¶
Synchronous retryer. It composes Tenacity's Retrying internally and exposes the
same call/decorator flow.
```python
from localqueue.retry import PersistentRetrying, key_from_argument

retryer = PersistentRetrying(key_fn=key_from_argument("job_id"), max_tries=5)
retryer(fn, "job:1")
```
Methods:
| Method | Meaning |
|---|---|
get_record(key) |
load the persisted RetryRecord, if any |
reset(key) |
delete the persisted retry record |
copy(**kwargs) |
copy the retryer, preserving persistent settings |
PersistentAsyncRetrying¶
Async retryer. It accepts coroutine functions and supports coroutine Tenacity callbacks and strategies where Tenacity supports them.
```python
from localqueue.retry import PersistentAsyncRetrying, key_from_argument

retryer = PersistentAsyncRetrying(key_fn=key_from_argument("job_id"), max_tries=5)
await retryer(fn, "job:1")
```
PersistentRetryExhausted¶
Raised before the wrapped function is called when a persisted key is already exhausted.
Attributes:
| Attribute | Meaning |
|---|---|
| key | exhausted retry key |
| attempts | persisted attempt count |
Attempt stores¶
RetryRecord¶
Dataclass stored per retry key.
| Field | Type | Meaning |
|---|---|---|
| attempts | int | number of failed attempts recorded |
| first_attempt_at | float | Unix timestamp of the first persisted attempt |
| exhausted | bool | whether the retry budget is exhausted |
close_default_store(all_threads=False)¶
Close the default retry store for the current thread. Pass all_threads=True
at process shutdown to close known factory-created default stores.
AttemptStore¶
Protocol for custom attempt stores.
```python
class AttemptStore:
    def load(self, key: str) -> RetryRecord | None: ...
    def save(self, key: str, record: RetryRecord) -> None: ...
    def delete(self, key: str) -> None: ...
    def prune_exhausted(self, *, older_than: float, now: float) -> int: ...
```
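A minimal conforming store can be written against that protocol. This is a sketch assuming the documented RetryRecord fields (attempts, first_attempt_at, exhausted); the class name is invented, and the record objects are stored as-is.

```python
import threading

# Sketch of an in-memory AttemptStore: a locked dict keyed by retry
# key. prune_exhausted drops exhausted records whose first attempt is
# older than the cutoff and returns how many were removed.
class DictAttemptStore:
    def __init__(self):
        self._lock = threading.Lock()
        self._records = {}

    def load(self, key):
        with self._lock:
            return self._records.get(key)

    def save(self, key, record):
        with self._lock:
            self._records[key] = record

    def delete(self, key):
        with self._lock:
            self._records.pop(key, None)

    def prune_exhausted(self, *, older_than, now):
        with self._lock:
            cutoff = now - older_than
            stale = [k for k, r in self._records.items()
                     if r.exhausted and r.first_attempt_at < cutoff]
            for k in stale:
                del self._records[k]
            return len(stale)
```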
LMDBAttemptStore¶
LMDB-backed attempt store.
```python
from localqueue.retry import LMDBAttemptStore

store = LMDBAttemptStore("/var/lib/my-worker/retries")
```
SQLiteAttemptStore¶
SQLite-backed attempt store. This is the default backend and does not require
LMDB's native dependency. Retention cleanup uses indexed exhausted and
first_attempt_at columns instead of scanning every serialized retry record.
```python
from localqueue.retry import SQLiteAttemptStore

store = SQLiteAttemptStore("/var/lib/my-worker/retries.sqlite3")
```
MemoryAttemptStore¶
Thread-safe in-memory attempt store for tests and local scenarios.
AttemptStoreLockedError¶
Raised when LMDB reports that the attempt store is locked by another process.