When a queue worker exhausts max_attempts, the job lands in mlx:dlq and disappears from the ops view unless something consumes that list. This recipe classifies failures, alerts humans, and supports safe replay. It is a companion to the webhook receiver.

Architecture

Worker fails (max_attempts reached) → classify error → LPUSH mlx:dlq
DLQ consumer                        → route on error_class → alert / auto-retry / hold
Replay CLI                          → LPUSH mlx:jobs (reset attempt=1)
Observability                       → dlq_depth, replay_total metrics

DLQ entry schema

{
  "job_id": "uuid",
  "original_job": { "...full job json..." },
  "failed_at": "2026-06-17T14:22:00Z",
  "attempt": 3,
  "error_class": "transient|permanent|ban",
  "error_message": "profile start timeout",
  "worker_id": "worker-07",
  "profile_id": "mlx-profile-uuid"
}
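Consumers read these entries back from Redis as JSON strings, so it can help to decode them into a typed record and reject malformed entries before any routing happens. The sketch below mirrors the schema above; the `DLQEntry` class and `parse_dlq_entry` function are hypothetical names, not part of any existing module.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

# Allowed values for error_class, taken from the schema above.
ERROR_CLASSES = {"transient", "permanent", "ban"}


@dataclass
class DLQEntry:
    job_id: str
    original_job: dict
    failed_at: datetime
    attempt: int
    error_class: str
    error_message: str
    worker_id: str
    profile_id: Optional[str]


def parse_dlq_entry(raw: str) -> DLQEntry:
    """Decode one mlx:dlq list element and reject unknown error classes early."""
    data = json.loads(raw)
    if data["error_class"] not in ERROR_CLASSES:
        raise ValueError(f"unknown error_class: {data['error_class']!r}")
    return DLQEntry(
        job_id=data["job_id"],
        original_job=data["original_job"],
        # fromisoformat() only accepts a trailing "Z" on Python 3.11+,
        # so normalise it to "+00:00" for older interpreters.
        failed_at=datetime.fromisoformat(data["failed_at"].replace("Z", "+00:00")),
        attempt=int(data["attempt"]),
        error_class=data["error_class"],
        error_message=data["error_message"],
        worker_id=data["worker_id"],
        profile_id=data.get("profile_id"),
    )
```

Validating at the edge keeps a typo such as `"error_class": "transiant"` from silently falling through to whatever default branch the consumer has.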

Push to DLQ (worker side)

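import json
import os
import redis
from datetime import datetime, timezone

r = redis.Redis(decode_responses=True)

# Assumption: each worker process reads its ID from the environment (e.g. "worker-07").
WORKER_ID = os.environ.get("WORKER_ID", "worker-unknown")

def move_to_dlq(job: dict, error: Exception, error_class: str):
    """Record a job that exhausted max_attempts in mlx:dlq, then free its profile lease."""
    entry = {
        "job_id": job["job_id"],
        "original_job": job,
        "failed_at": datetime.now(timezone.utc).isoformat(),
        "attempt": job.get("attempt", 1),
        "error_class": error_class,
        "error_message": str(error)[:500],  # cap very long error messages
        "worker_id": WORKER_ID,
        "profile_id": job.get("profile_id"),
    }
    r.lpush("mlx:dlq", json.dumps(entry))
    # Release only after the entry is safely in the DLQ.
    # release_lease() is the worker's profile-lease helper (not shown here).
    if entry["profile_id"]:
        release_lease(entry["profile_id"], WORKER_ID)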
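The recipe does not show where the worker decides between an in-queue retry and the DLQ. A minimal sketch, assuming `attempt` is 1-based and counts the attempt that just failed (so `"attempt": 3` in the schema means the third try failed). `handle_failure` is a hypothetical helper; the push and dead-letter actions are passed in as callables so the decision can be tested without Redis.

```python
from typing import Callable


def handle_failure(
    job: dict,
    error: Exception,
    max_attempts: int,
    requeue: Callable[[dict], None],
    dead_letter: Callable[[dict, Exception], None],
) -> str:
    """Retry in-queue until max_attempts is reached, then hand the job to the DLQ."""
    attempt = job.get("attempt", 1)
    if attempt < max_attempts:
        job["attempt"] = attempt + 1  # the next try runs with a higher attempt number
        requeue(job)
        return "requeued"
    # Attempts exhausted: in the worker this is where move_to_dlq() fits.
    dead_letter(job, error)
    return "dead_lettered"
```

In the worker you might pass `lambda j: r.lpush("mlx:jobs", json.dumps(j))` as `requeue` and `lambda j, e: move_to_dlq(j, e, classify_error(e))` as `dead_letter`.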

Error classification

Class     | Examples                                         | Action
transient | HTTP 502, CDP timeout, lease conflict            | Auto-retry after backoff (max 1 DLQ retry)
permanent | Invalid profile_id, auth 401, task schema error  | Alert ops; hold until manual fix
ban       | 403 on platform, captcha loop, account suspended | Tag CMDB tier=burn; no auto-replay

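import re

def classify_error(exc: Exception) -> str:
    msg = str(exc).lower()
    # Check ban first: a ban message may also contain words like "not found".
    # \b...\b stops "403"/"401" from matching inside job IDs or other numbers.
    if re.search(r"\b403\b", msg) or any(x in msg for x in ("banned", "suspended", "captcha")):
        return "ban"
    if re.search(r"\b401\b", msg) or any(x in msg for x in ("invalid profile", "not found", "schema")):
        return "permanent"
    # Everything else (502s, CDP timeouts, lease conflicts) is treated as transient.
    return "transient"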
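Substring matching on the message is fragile. If your HTTP client's exception exposes a status code (how you read it depends on the library), a sketch that prefers the status and only falls back to keywords might look like this. The status-to-class mapping (403 → ban, 401/404 → permanent, 5xx → transient) is an assumption drawn from the table above, and `classify_with_status` is a hypothetical name.

```python
import re
from typing import Optional


def classify_with_status(status: Optional[int], message: str) -> str:
    """Map an HTTP status (if known) to an error_class, else fall back to the message text."""
    if status == 403:
        return "ban"
    if status in (401, 404):
        return "permanent"
    if status is not None and status >= 500:
        return "transient"  # e.g. the HTTP 502 case in the table
    msg = message.lower()
    # \b stops "403"/"401" from matching inside longer numbers such as IDs.
    if re.search(r"\b403\b", msg) or any(k in msg for k in ("banned", "suspended", "captcha")):
        return "ban"
    if re.search(r"\b401\b", msg) or any(k in msg for k in ("invalid profile", "not found", "schema")):
        return "permanent"
    return "transient"
```

A structured status wins over the message, so a 502 whose body happens to say "not found" is still retried rather than held.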

DLQ consumer loop

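import asyncio
import json
import redis.asyncio as aioredis

# Async client, so BRPOP waits without blocking the event loop.
ar = aioredis.Redis(decode_responses=True)

MAX_DLQ_RETRIES = 1     # "max 1 DLQ retry" from the classification table
BACKOFF_SECONDS = 300   # 5 min backoff
_pending = set()        # keep references so scheduled retries aren't garbage-collected

async def requeue_later(job: dict, delay: float):
    await asyncio.sleep(delay)
    await ar.lpush("mlx:jobs", json.dumps(job))

# tag_cmdb_burn() and notify_slack() are your CMDB and Slack helpers (not shown).
async def dlq_consumer():
    while True:
        item = await ar.brpop("mlx:dlq", timeout=10)
        if item is None:  # BRPOP returns None when the timeout expires
            continue
        _, raw = item
        entry = json.loads(raw)

        if entry["error_class"] == "ban":
            tag_cmdb_burn(entry["profile_id"])
            notify_slack(f"🔴 BAN signal: {entry['job_id']}", entry)
            await ar.lpush("mlx:dlq:archive", raw)
            continue

        job = entry["original_job"]
        retries = job.get("dlq_retries", 0)
        if entry["error_class"] == "transient" and retries < MAX_DLQ_RETRIES:
            job["attempt"] = 1
            job["dlq_retries"] = retries + 1  # travels with the job if it fails again
            # Schedule instead of sleeping inline, so one backoff doesn't stall the loop.
            # Pending retries are lost if the process exits before they fire.
            task = asyncio.create_task(requeue_later(job, BACKOFF_SECONDS))
            _pending.add(task)
            task.add_done_callback(_pending.discard)
            continue

        # Permanent, or transient with its DLQ retry used up: alert and hold.
        notify_slack(f"⚠️ DLQ hold ({entry['error_class']}): {entry['job_id']}", entry)
        await ar.lpush("mlx:dlq:hold", raw)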
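The routing rules in the consumer are easier to unit-test if they live in a pure function that returns an action and a target list, with the Redis calls left to the loop. A minimal sketch; `decide` is a hypothetical name, and `dlq_retries` is an assumed counter on the job (not part of the schema above) used to enforce the "max 1 DLQ retry" rule.

```python
from typing import Tuple

MAX_DLQ_RETRIES = 1  # "max 1 DLQ retry" from the classification table


def decide(entry: dict, max_dlq_retries: int = MAX_DLQ_RETRIES) -> Tuple[str, str]:
    """Return (action, target_key) for one DLQ entry without touching Redis."""
    if entry["error_class"] == "ban":
        return ("archive", "mlx:dlq:archive")  # never auto-replayed
    retries = entry["original_job"].get("dlq_retries", 0)
    if entry["error_class"] == "transient" and retries < max_dlq_retries:
        return ("retry", "mlx:jobs")
    # Permanent errors, and transient ones that already used their retry.
    return ("hold", "mlx:dlq:hold")
```

Without a counter like this, a transient job that keeps failing would cycle between mlx:jobs and mlx:dlq indefinitely.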

Manual replay CLI

# replay one job from the hold queue (falls back to the archive)
python -m mlx_ops replay-dlq --job-id 550e8400-e29b-41d4-a716-446655440000

# list hold queue depth
python -m mlx_ops dlq-stats
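def replay_job(job_id: str) -> bool:
    """Move one entry from the hold (or archive) list back onto mlx:jobs."""
    for key in ("mlx:dlq:hold", "mlx:dlq:archive"):
        # Linear scan: fine while hold/archive lists stay small.
        for raw in r.lrange(key, 0, -1):
            entry = json.loads(raw)
            if entry["job_id"] == job_id:
                # Remove first and requeue only if LREM actually deleted the entry,
                # so two concurrent replays of the same job can't enqueue it twice.
                if not r.lrem(key, 1, raw):
                    return False  # another replay already claimed it
                job = entry["original_job"]
                job["attempt"] = 1
                r.lpush("mlx:jobs", json.dumps(job))
                return True
    return False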
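The internals of the mlx_ops module are not shown. One way the two subcommands could be wired with argparse is sketched below; `build_parser`, `dlq_stats`, and `run` are hypothetical, and the client and replay function are injected so the wiring can be exercised without a Redis server.

```python
import argparse

DLQ_KEYS = ("mlx:dlq", "mlx:dlq:hold", "mlx:dlq:archive")


def dlq_stats(client) -> dict:
    """Return the length of each DLQ list; `client` is anything with an llen(key) method."""
    return {key: client.llen(key) for key in DLQ_KEYS}


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="mlx_ops")
    sub = parser.add_subparsers(dest="command", required=True)
    replay = sub.add_parser("replay-dlq", help="replay one job from the hold/archive lists")
    replay.add_argument("--job-id", required=True)
    sub.add_parser("dlq-stats", help="print DLQ list depths")
    return parser


def run(argv, client, replay_fn) -> str:
    """Parse argv and dispatch to the replay function or the stats query."""
    args = build_parser().parse_args(argv)
    if args.command == "replay-dlq":
        ok = replay_fn(args.job_id)
        return "replayed" if ok else f"job {args.job_id} not found in hold/archive"
    stats = dlq_stats(client)
    return "\n".join(f"{key}: {depth}" for key, depth in stats.items())
```

In production you would pass `redis.Redis(decode_responses=True)` as `client` and `replay_job` as `replay_fn`. With the consumer running, mlx:dlq itself should hover near zero; a growing mlx:dlq usually means the consumer is down.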

Metrics & alerts
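The architecture lists two metrics, dlq_depth and replay_total. A stdlib-only sketch that renders them in the Prometheus text exposition format, plus a simple depth threshold check for alerting, is below. The `queue` label name, the HELP text, and the threshold are assumptions; in practice the official Python client library (prometheus_client) can handle the exposition for you.

```python
def render_metrics(depths: dict, replay_total: int) -> str:
    """Render dlq_depth (one gauge per list) and replay_total in Prometheus text format."""
    lines = [
        "# HELP dlq_depth Number of entries in each DLQ list.",
        "# TYPE dlq_depth gauge",
    ]
    for key, depth in sorted(depths.items()):
        lines.append(f'dlq_depth{{queue="{key}"}} {depth}')
    lines += [
        "# HELP replay_total Jobs replayed from the DLQ since process start.",
        "# TYPE replay_total counter",
        f"replay_total {replay_total}",
    ]
    return "\n".join(lines) + "\n"


def depth_alerts(depths: dict, max_depth: int) -> list:
    """Return the lists whose depth exceeds max_depth, sorted by key."""
    return sorted(key for key, depth in depths.items() if depth > max_depth)
```

Alerting on the hold list's depth, rather than on every individual failure, keeps Slack quiet during a short burst of permanent errors while still flagging a backlog that needs a manual fix.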

Disclosure: MLX-MMO is affiliated with Multilogin.