This is the pattern MLX-MMO uses in GitHub issue threads when teams outgrow a single asyncio.run() script. It adds concurrency limits, retries, and guaranteed profile stop — the three gaps that cause zombie profiles and API bans.
Architecture
Job Queue → Worker (semaphore) → Launch MLX profile → CDP attach
→ Health check URL → User automation → Stop profile (finally)
Full worker module
import asyncio
import os
import random
from contextlib import asynccontextmanager
import httpx
from playwright.async_api import async_playwright
MLX_BASE = os.environ.get("MLX_API", "https://api.multilogin.com")
TOKEN = os.environ["MLX_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}"}
LAUNCH_SEM = asyncio.Semaphore(int(os.environ.get("MLX_MAX_PARALLEL", "4")))
async def _backoff(attempt: int) -> None:
await asyncio.sleep(min(60, (2 ** attempt) + random.random()))
@asynccontextmanager
async def multilogin_session(profile_id: str):
async with LAUNCH_SEM:
async with httpx.AsyncClient(base_url=MLX_BASE, timeout=90) as client:
cdp_url = None
for attempt in range(5):
try:
r = await client.post(
"/profile/start",
headers=HEADERS,
json={"profile_id": profile_id, "headless": False},
)
if r.status_code == 429:
await _backoff(attempt)
continue
r.raise_for_status()
cdp_url = r.json().get("cdp_url") or r.json().get("wsUrl")
if cdp_url:
break
await asyncio.sleep(2)
except httpx.HTTPError:
await _backoff(attempt)
if not cdp_url:
raise RuntimeError(f"CDP unavailable for {profile_id}")
pw = await async_playwright().start()
browser = None
try:
browser = await pw.chromium.connect_over_cdp(cdp_url, timeout=60_000)
ctx = browser.contexts[0] if browser.contexts else await browser.new_context()
page = ctx.pages[0] if ctx.pages else await ctx.new_page()
yield page
finally:
if browser:
await browser.close()
await pw.stop()
for attempt in range(3):
try:
await client.post(
"/profile/stop",
headers=HEADERS,
json={"profile_id": profile_id},
)
break
except httpx.HTTPError:
await _backoff(attempt)
async def health_check(page, url: str = "https://example.com") -> None:
resp = await page.goto(url, wait_until="domcontentloaded", timeout=45_000)
if not resp or resp.status >= 400:
raise RuntimeError(f"Health check failed: {resp.status if resp else 'no response'}")
async def run_job(profile_id: str, target_url: str) -> str:
async with multilogin_session(profile_id) as page:
await health_check(page)
await page.goto(target_url, wait_until="domcontentloaded")
return await page.title()
async def main():
profiles = os.environ["MLX_PROFILE_IDS"].split(",")
tasks = [run_job(pid.strip(), "https://example.com") for pid in profiles]
results = await asyncio.gather(*tasks, return_exceptions=True)
for pid, res in zip(profiles, results):
print(pid, "OK" if isinstance(res, str) else f"ERR: {res}")
if __name__ == "__main__":
asyncio.run(main())
Operational knobs
| Env var | Default | Purpose |
|---|---|---|
MLX_MAX_PARALLEL | 4 | Semaphore cap per worker VM |
MLX_PROFILE_IDS | — | Comma-separated queue batch |
MLX_API | api.multilogin.com | Override for launcher/local API |
Related
Disclosure: MLX-MMO affiliated with Multilogin. Adapt endpoints to your API version. SAAS50 / MIN50.