When a queue worker exhausts max_attempts, the job lands in mlx:dlq and drops out of ops view unless you build a handler. This recipe classifies failures, alerts humans, and supports safe replay. It is a companion to the webhook receiver.
Architecture
Worker fail (max_attempts) → LPUSH mlx:dlq
DLQ consumer → classify error → alert / auto-retry / hold
Replay CLI → LPUSH mlx:jobs (reset attempt=1)
Observability → dlq_depth, replay_total metrics
DLQ entry schema
{
  "job_id": "uuid",
  "original_job": { "...full job json..." },
  "failed_at": "2026-06-17T14:22:00Z",
  "attempt": 3,
  "error_class": "transient|permanent|ban",
  "error_message": "profile start timeout",
  "worker_id": "worker-07",
  "profile_id": "mlx-profile-uuid"
}
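A consumer may want to reject malformed entries before acting on them. The following is a minimal sketch, assuming every field in the schema above is required and that error_class takes exactly one of the three listed values. The helper name validate_dlq_entry is hypothetical and not part of the recipe.

```python
import json

# Field names taken from the DLQ entry schema; treating all of them as
# required is an assumption of this sketch.
REQUIRED_FIELDS = {
    "job_id", "original_job", "failed_at", "attempt",
    "error_class", "error_message", "worker_id", "profile_id",
}
ERROR_CLASSES = {"transient", "permanent", "ban"}


def validate_dlq_entry(raw: str) -> dict:
    """Parse a raw DLQ payload; raise ValueError if it does not match the schema."""
    try:
        entry = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"not valid JSON: {exc}") from exc
    if not isinstance(entry, dict):
        raise ValueError("entry must be a JSON object")
    missing = REQUIRED_FIELDS - entry.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    if entry["error_class"] not in ERROR_CLASSES:
        raise ValueError(f"unknown error_class: {entry['error_class']!r}")
    return entry
```

Entries that fail validation could be pushed to a separate list for inspection rather than dropped, so that nothing disappears silently.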
Push to DLQ (worker side)
import json
from datetime import datetime, timezone

import redis

r = redis.Redis(decode_responses=True)

# WORKER_ID and release_lease() are expected to come from the queue worker module.
def move_to_dlq(job: dict, error: Exception, error_class: str):
    """Record a job that exhausted max_attempts on the dead-letter queue."""
    entry = {
        "job_id": job["job_id"],
        "original_job": job,
        "failed_at": datetime.now(timezone.utc).isoformat(),
        "attempt": job.get("attempt", 1),
        "error_class": error_class,
        "error_message": str(error)[:500],  # truncate long tracebacks
        "worker_id": WORKER_ID,
        "profile_id": job.get("profile_id"),
    }
    r.lpush("mlx:dlq", json.dumps(entry))
    # Free the profile so another worker can lease it.
    release_lease(job["profile_id"], WORKER_ID)
Error classification
| Class | Examples | Action |
|---|---|---|
| transient | HTTP 502, CDP timeout, lease conflict | Auto-retry after backoff (max 1 DLQ retry) |
| permanent | Invalid profile_id, auth 401, task schema error | Alert ops; hold until manual fix |
| ban | 403 on platform, captcha loop, account suspended | Tag CMDB tier=burn; no auto-replay |
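def classify_error(exc: Exception) -> str:
    """Map an exception to a DLQ error class by substring match on its message."""
    msg = str(exc).lower()
    # Ban signals are checked first so they win over permanent and transient.
    if any(x in msg for x in ("403", "banned", "suspended", "captcha")):
        return "ban"
    if any(x in msg for x in ("401", "invalid profile", "not found")):
        return "permanent"
    # Anything unrecognized is treated as transient.
    return "transient"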
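Plain substring matching can misfire: a message such as "connect to 10.0.0.5:4030 timed out" contains "403" and would be classed as a ban. One possible refinement, sketched here under the assumption that status codes and keywords appear as separate words in the message, is to match on word boundaries. The name classify_message is hypothetical.

```python
import re

# Order matters: ban signals are checked before permanent ones,
# mirroring classify_error above.
_RULES = [
    ("ban", re.compile(r"\b(403|banned|suspended|captcha)\b")),
    ("permanent", re.compile(r"\b(401|invalid profile|not found)\b")),
]


def classify_message(message: str) -> str:
    """Return 'ban', 'permanent', or 'transient' using word-boundary matches."""
    text = message.lower()
    for error_class, pattern in _RULES:
        if pattern.search(text):
            return error_class
    return "transient"
```

The trade-off is that keywords glued to other word characters, for example "account_suspended", no longer match, so the rules would need to follow the message formats your workers actually emit.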
DLQ consumer loop
import asyncio

async def dlq_consumer():
    while True:
        # BRPOP returns None on timeout, so check before unpacking.
        # The blocking call runs in a thread to keep the event loop free.
        item = await asyncio.to_thread(r.brpop, "mlx:dlq", timeout=10)
        if item is None:
            continue
        _, raw = item
        entry = json.loads(raw)

        if entry["error_class"] == "ban":
            tag_cmdb_burn(entry["profile_id"])
            notify_slack(f"🔴 BAN signal: {entry['job_id']}", entry)
            r.lpush("mlx:dlq:archive", raw)
            continue

        if entry["error_class"] == "transient":
            job = entry["original_job"]
            job["attempt"] = 1
            # 5 min backoff; the consumer handles no other entries meanwhile
            await asyncio.sleep(300)
            r.lpush("mlx:jobs", json.dumps(job))
            continue

        # permanent: alert and hold for manual fix
        notify_slack(f"⚠️ DLQ permanent: {entry['job_id']}", entry)
        r.lpush("mlx:dlq:hold", raw)
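The classification table caps transient failures at one DLQ retry, but the consumer loop above resets attempt to 1 without recording that a retry already happened, so a job that keeps failing could cycle indefinitely. A minimal sketch of one way to enforce the cap follows. It assumes a counter field named dlq_retries carried on the job itself; that field and the function route_entry are hypothetical and not part of the schema above.

```python
import copy

MAX_DLQ_RETRIES = 1  # the "max 1 DLQ retry" from the classification table


def route_entry(entry: dict) -> tuple[str, dict]:
    """Return (destination_key, payload) for a DLQ entry without touching Redis."""
    error_class = entry["error_class"]
    if error_class == "ban":
        return "mlx:dlq:archive", entry
    if error_class == "transient":
        # Copy so the original entry stays intact for auditing.
        job = copy.deepcopy(entry["original_job"])
        retries = job.get("dlq_retries", 0)  # hypothetical counter field
        if retries < MAX_DLQ_RETRIES:
            job["dlq_retries"] = retries + 1
            job["attempt"] = 1
            return "mlx:jobs", job
    # permanent, or transient with its DLQ retry already used
    return "mlx:dlq:hold", entry
```

Keeping the routing decision in a pure function makes it easy to unit test; the consumer would then push json.dumps(payload) to the returned key and send alerts as before.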
Manual replay CLI
# replay one job from hold queue
python -m mlx_ops replay-dlq --job-id 550e8400-e29b-41d4-a716-446655440000
# list hold queue depth
python -m mlx_ops dlq-stats
def replay_job(job_id: str) -> bool:
    """Requeue a held or archived job by ID; return False if it is not found."""
    for key in ("mlx:dlq:hold", "mlx:dlq:archive"):
        # Linear scan: fine for small queues, slow for very large ones.
        for raw in r.lrange(key, 0, -1):
            entry = json.loads(raw)
            if entry["job_id"] == job_id:
                job = entry["original_job"]
                job["attempt"] = 1
                r.lpush("mlx:jobs", json.dumps(job))
                r.lrem(key, 1, raw)
                return True
    return False
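The two commands shown above could be wired to replay_job with argparse from the standard library. This is a sketch only: the structure of the mlx_ops module is not shown in this recipe, so the handlers are passed in as arguments, and the exit code of 1 for a job that is not found is an assumption.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Define the replay-dlq and dlq-stats subcommands."""
    parser = argparse.ArgumentParser(prog="mlx_ops")
    sub = parser.add_subparsers(dest="command", required=True)

    replay = sub.add_parser("replay-dlq", help="requeue one job from hold or archive")
    replay.add_argument("--job-id", required=True)

    sub.add_parser("dlq-stats", help="print the depth of each DLQ list")
    return parser


def main(argv, replay_job, dlq_stats) -> int:
    """Dispatch a parsed command; handlers are injected so the sketch needs no Redis."""
    args = build_parser().parse_args(argv)
    if args.command == "replay-dlq":
        # Assumed convention: non-zero exit when the job ID is not found.
        return 0 if replay_job(args.job_id) else 1
    print(dlq_stats())
    return 0
```

In a real entry point, main would be called with sys.argv[1:], the replay_job function above, and a stats function that reads the list lengths from Redis.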
Metrics & alerts
- mlx_dlq_depth: gauge on LLEN mlx:dlq + hold + archive
- mlx_dlq_replay_total{result}: counter on replay success/fail
- Alert when dlq_depth > 10 for 15 min (see observability guide)
- Never auto-replay ban class without human ack
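The depth gauge and the "above 10 for 15 min" rule can be sketched in plain Python. In practice the alert would more likely live in your monitoring system's rule language; this sketch only illustrates the logic. The names dlq_depth and SustainedThreshold are hypothetical, and client stands for any object exposing llen(), such as a redis.Redis instance.

```python
DLQ_KEYS = ("mlx:dlq", "mlx:dlq:hold", "mlx:dlq:archive")


def dlq_depth(client) -> int:
    """Sum LLEN over the three DLQ lists."""
    return sum(client.llen(key) for key in DLQ_KEYS)


class SustainedThreshold:
    """Fire only after the value has stayed above the threshold for hold_seconds."""

    def __init__(self, threshold: int = 10, hold_seconds: float = 15 * 60):
        self.threshold = threshold
        self.hold_seconds = hold_seconds
        self._breach_started = None  # timestamp of the first sample above threshold

    def update(self, value: int, now: float) -> bool:
        """Record one sample taken at time `now` (seconds); return True if the alert should fire."""
        if value <= self.threshold:
            # Any dip back to or below the threshold resets the timer.
            self._breach_started = None
            return False
        if self._breach_started is None:
            self._breach_started = now
        return now - self._breach_started >= self.hold_seconds
```

A scraper loop would call update(dlq_depth(r), time.monotonic()) on each tick and notify when it returns True.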
Related
Queue worker
Webhook receiver
Observability
Ban recovery runbook
Debug runbook
CDP reconnect
Code hub
Disclosure: MLX-MMO is affiliated with Multilogin.