Persistent Retries¶
localqueue.retry is for code that already receives its work but still needs a
retry budget that survives process restarts.
It is the durable retry layer used by queue workers and by code with its own
delivery mechanism. It wraps Tenacity's Retrying and AsyncRetrying classes:
you configure stop, wait, retry, callbacks, and retry_with() the same way, and
the wrapper adds a durable attempt record before each stop decision.
Best-fit retry workflows:
- cron jobs that call fragile external services
- HTTP handlers or consumers that need durable retry budgets per logical job
- scripts where retry state must outlive the current process
Use this layer when the missing piece is persistent attempt tracking. If you need ack, release, leases, dead-letter records, or queue inspection, start with Persistent queues instead.
The main design constraint is stable identity: each logical job needs a stable retry key so the same work does not retry forever under a new process.
Decorator API¶
from localqueue.retry import key_from_argument, persistent_retry
from tenacity import retry_if_exception_type, stop_after_attempt, wait_fixed
@persistent_retry(
    key_fn=key_from_argument("job_id"),
    stop=stop_after_attempt(3),
    wait=wait_fixed(1),
    retry=retry_if_exception_type(ConnectionError),
)
def sync_task(job_id: str) -> str:
    return call_remote_service(job_id)
For async functions, use persistent_async_retry.
from localqueue.retry import key_from_argument, persistent_async_retry
from tenacity import stop_after_attempt
@persistent_async_retry(
    key_fn=key_from_argument("job_id"),
    stop=stop_after_attempt(3),
)
async def async_task(job_id: str) -> dict:
    return await fetch_payload(job_id)
Retry policies by exception type¶
Use the retry predicate to make the policy match the failure mode. Keep permanent validation errors out of the retry set and reserve retries for transient failures.
Transient network failures¶
Retry connection problems and timeout-like failures when the remote side is temporarily unavailable.
from localqueue.retry import key_from_argument, persistent_retry
from tenacity import retry_if_exception_type, stop_after_attempt, wait_fixed
@persistent_retry(
    key_fn=key_from_argument("job_id"),
    stop=stop_after_attempt(5),
    wait=wait_fixed(2),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
)
def send_webhook(job_id: str) -> None:
    post_webhook(job_id)
File-system or storage contention¶
Retry contention errors when the local store is busy and the operation can be retried safely.
from localqueue.retry import key_from_argument, persistent_retry
from tenacity import retry_if_exception_type, stop_after_attempt, wait_fixed
@persistent_retry(
    key_fn=key_from_argument("job_id"),
    stop=stop_after_attempt(4),
    wait=wait_fixed(1),
    retry=retry_if_exception_type(OSError),
)
def write_report(job_id: str) -> None:
    persist_report(job_id)
Permanent validation failures¶
Do not add validation errors to the retry predicate. Let them fail fast so the recorded attempt history stays about recoverable failures, not bad input.
from localqueue.retry import key_from_argument, persistent_retry
from tenacity import retry_if_exception_type, stop_after_attempt
@persistent_retry(
    key_fn=key_from_argument("job_id"),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type(ConnectionError),
)
def process_invoice(job_id: str, amount: int) -> None:
    if amount <= 0:
        raise ValueError("amount must be positive")
    charge_card(job_id, amount)
ValueError in this example is not retried. Only the transient connection
failure is.
Exhausted retry records can be cleaned up later with the CLI:
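localqueue retry prune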
This removes only exhausted retry records older than the requested age. It does not touch active records that may still be in use.
If you want a default retention policy, set retry_record_ttl_seconds with
localqueue config set. Then localqueue retry prune --dry-run can preview the
configured age and localqueue retry prune can use it directly.
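A sketch of that workflow, assuming the usual config set KEY VALUE argument shape (only the setting name and the subcommands are documented above):
localqueue config set retry_record_ttl_seconds 604800  # 7 days
localqueue retry prune --dry-run
localqueue retry prune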
Rate limiting and breaker behavior live on the worker side, not inside the
retry wrapper. Use min_interval to slow down a worker between messages and
circuit_breaker_failures plus circuit_breaker_cooldown to pause a noisy
worker after repeated recoverable failures.
Use Tenacity wait for spacing between retry attempts inside one call. Use
release_delay when a worker gives the message back to the queue and you want
the next delivery to be delayed. If the handler runtime can approach the lease
limit, increase lease_timeout so the store does not reclaim the message too
early. Those three knobs solve different timing problems.
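For the first knob, any Tenacity wait strategy passes through the wrapper; a minimal sketch with exponential backoff (poll_status and its body are placeholders):
from localqueue.retry import key_from_argument, persistent_retry
from tenacity import stop_after_attempt, wait_exponential

@persistent_retry(
    key_fn=key_from_argument("job_id"),
    stop=stop_after_attempt(6),
    # Spacing between attempts inside this one call; release_delay and
    # lease_timeout are separate worker-side settings, not wrapper options.
    wait=wait_exponential(multiplier=1, max=30),
)
def poll_status(job_id: str) -> None:
    ...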
Retry keys¶
Each persistent retry needs a stable key. Calls without key= or key_fn= raise
ValueError before the wrapped function is called. Pass key= when the retryer
is bound to one logical job.
from localqueue.retry import PersistentRetrying
retryer = PersistentRetrying(key="invoice:1001", max_tries=5)
retryer(generate_invoice)
Use a documented key factory when the key must be derived from call data.
from localqueue.retry import (
    idempotency_key_from_id,
    key_from_argument,
    key_from_attr,
    persistent_retry,
)

def retry_key(fn, args, kwargs) -> str:
    tenant = kwargs["tenant_id"]
    invoice = kwargs["invoice_id"]
    return f"{tenant}:invoice:{invoice}"

@persistent_retry(key_fn=retry_key, max_tries=5)
def export_invoice(*, tenant_id: str, invoice_id: str) -> None:
    ...

@persistent_retry(key_fn=key_from_argument("job_id"), max_tries=5)
def export_job(*, job_id: str) -> None:
    ...

@persistent_retry(key_fn=key_from_attr("task", "id", prefix="video"), max_tries=5)
def process_video(task: VideoTask) -> None:
    ...

@persistent_retry(
    key_fn=idempotency_key_from_id("task", prefix="video"),
    max_tries=5,
)
def transcode_video(task: VideoTask) -> None:
    ...
If no key can be derived, the wrapper raises ValueError.
Attempt budgets¶
max_tries= is a convenience alias for stop=stop_after_attempt(...).
from localqueue.retry import key_from_argument, persistent_retry
@persistent_retry(key_fn=key_from_argument("payment_id"), max_tries=4)
def charge_card(payment_id: str) -> None:
    ...
Do not pass max_tries= and stop= together. If both are supplied, localqueue raises ValueError because there would be two sources of truth for the retry budget.
When a retry budget is exhausted, the attempt record is marked as exhausted. A later call with the same key raises PersistentRetryExhausted before calling the wrapped function again.
from localqueue.retry import PersistentRetryExhausted
try:
    charge_card("payment:123")
except PersistentRetryExhausted as exc:
    print(exc.key, exc.attempts)
Stores¶
The default attempt store is SQLite at
$XDG_DATA_HOME/localqueue/retries.sqlite3. When XDG_DATA_HOME is not set,
the fallback is ~/.local/share/localqueue/retries.sqlite3.
Pass store_path= to put the SQLite attempt-store file at a custom location; it uses the same backend as the default.
from localqueue.retry import PersistentRetrying, key_from_argument
retryer = PersistentRetrying(
    store_path="/var/lib/my-worker/retries.sqlite3",
    max_tries=5,
    key_fn=key_from_argument("job_id"),
)
Provide a store instance with store= when you need LMDB, full control, or
in-memory tests. Install localqueue[lmdb] when you want the LMDB backend.
from localqueue.retry import LMDBAttemptStore, key_from_argument, persistent_retry
store = LMDBAttemptStore("/var/lib/my-worker/retries")
@persistent_retry(
    store=store,
    key_fn=key_from_argument("job_id"),
    max_tries=2,
)
def flaky(job_id: str) -> str:
    ...
The CLI retry_store_path setting and queue process --retry-store-path use a
SQLite file path. queue process and queue exec keep one retry-store
connection open for the worker loop and close it when the loop exits.
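For example (the queue name and overall invocation shape here are illustrative; only --retry-store-path itself is documented):
localqueue queue process jobs --retry-store-path /var/lib/my-worker/retries.sqlite3
For in-memory tests, use MemoryAttemptStore: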
from localqueue.retry import MemoryAttemptStore, key_from_argument, persistent_retry
store = MemoryAttemptStore()
@persistent_retry(
    store=store,
    key_fn=key_from_argument("job_id"),
    max_tries=2,
    wait=lambda state: 0,
)
def flaky(job_id: str) -> str:
    ...
store= and store_path= are mutually exclusive.
Default retry stores are thread-local. Call close_default_store() when a
long-lived thread is shutting down and you want to close its default SQLite
connection explicitly. At process shutdown, close_default_store(all_threads=True)
closes known factory-created default stores.
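A minimal shutdown hook, assuming close_default_store is importable from localqueue.retry:
import atexit

from localqueue.retry import close_default_store

# Close every factory-created default store when the process exits.
atexit.register(close_default_store, all_threads=True)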
State and callbacks¶
Tenacity callbacks and strategies receive a state object that behaves like Tenacity's RetryCallState, with persistent attempt numbering.
attempt_number includes attempts loaded from the store. start_time and seconds_since_start are based on the first persisted attempt for the key.
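A sketch of a before_sleep callback that reads the persistent numbering (before_sleep passes through to Tenacity as described above; sync_job and its body are placeholders):
from localqueue.retry import key_from_argument, persistent_retry
from tenacity import RetryCallState, stop_after_attempt, wait_fixed

def log_attempt(state: RetryCallState) -> None:
    # attempt_number also counts attempts persisted by earlier processes.
    print(f"attempt {state.attempt_number}, "
          f"{state.seconds_since_start:.1f}s since the first persisted attempt")

@persistent_retry(
    key_fn=key_from_argument("job_id"),
    stop=stop_after_attempt(5),
    wait=wait_fixed(1),
    before_sleep=log_attempt,
)
def sync_job(job_id: str) -> None:
    ...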
Clearing and resetting¶
Successful calls clear retry state by default.
from localqueue.retry import PersistentRetrying, key_from_argument
retryer = PersistentRetrying(key_fn=key_from_argument("job_id"), max_tries=3)
result = retryer(process, "job:1")
Use clear_on_success=False when you want to inspect records after success.
retryer = PersistentRetrying(
    key_fn=key_from_argument("job_id"),
    max_tries=3,
    clear_on_success=False,
)
Retryers also expose helpers for operational workflows.
Low-level API¶
Use PersistentRetrying directly when decorators are not a good fit.
from localqueue.retry import PersistentRetrying, key_from_argument
retryer = PersistentRetrying(key_fn=key_from_argument("job_id"), max_tries=5)
result = retryer(run_job, "job:123", {"priority": "high"})
Use PersistentAsyncRetrying for coroutine functions.
from localqueue.retry import PersistentAsyncRetrying, key_from_argument
retryer = PersistentAsyncRetrying(key_fn=key_from_argument("job_id"), max_tries=5)
result = await retryer(run_async_job, "job:123")
The decorated function keeps Tenacity's retry_with() pattern.
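For example, a one-off call with a larger budget, reusing sync_task from the first example:
from tenacity import stop_after_attempt

# retry_with() returns a copy of the wrapped callable with the overridden
# policy; the original decoration on sync_task is unchanged.
sync_task.retry_with(stop=stop_after_attempt(10))("job:42")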