This is the pattern MLX-MMO shares in GitHub issue threads when teams outgrow a single asyncio.run() script. It adds concurrency limits, retries, and a guaranteed profile stop — closing the three gaps that cause zombie profiles and API bans.

Architecture

Job Queue → Worker (semaphore) → Launch MLX profile → CDP attach
         → Health check URL → User automation → Stop profile (finally)

Full worker module

import asyncio
import logging
import os
import random
from contextlib import asynccontextmanager

import httpx
from playwright.async_api import async_playwright

# Root URL of the Multilogin API; MLX_API overrides it (e.g. a launcher or local endpoint).
MLX_BASE = os.getenv("MLX_API", "https://api.multilogin.com")
# API token; there is no fallback, so an unset MLX_TOKEN raises KeyError at import time.
TOKEN = os.environ["MLX_TOKEN"]
# Authorization header attached to every profile start/stop request.
HEADERS = {"Authorization": "Bearer " + TOKEN}
# Shared semaphore capping how many sessions run at once (MLX_MAX_PARALLEL, default 4).
LAUNCH_SEM = asyncio.Semaphore(int(os.getenv("MLX_MAX_PARALLEL", "4")))


async def _backoff(attempt: int, *, cap: float = 60.0) -> None:
    """Sleep for an exponentially growing, jittered delay before a retry.

    The delay is ``2 ** attempt`` seconds plus up to one second of random
    jitter, and never exceeds ``cap``.

    Args:
        attempt: Zero-based retry counter; larger values wait longer.
        cap: Upper bound on the delay in seconds (default 60).
    """
    # Clamp the exponent: past 2**32 seconds the cap wins for any realistic value,
    # and an unbounded exponent eventually raises OverflowError when the huge int
    # is added to the float jitter.
    exponent = min(attempt, 32)
    delay = min(cap, (2 ** exponent) + random.random())
    await asyncio.sleep(delay)


# Retry budgets for the profile start and stop calls.
_START_ATTEMPTS = 5
_STOP_ATTEMPTS = 3


async def _stop_profile(client, profile_id: str) -> None:
    """Ask the API to stop ``profile_id``, retrying transient failures.

    Best effort: ``httpx.HTTPError`` is swallowed so the call is safe inside a
    ``finally`` block. A warning is logged when the stop is rejected or cannot
    be confirmed, because that profile may still be running.

    Args:
        client: The ``httpx.AsyncClient`` used for the start request.
        profile_id: Identifier sent as ``profile_id`` in the request body.
    """
    log = logging.getLogger(__name__)
    for attempt in range(_STOP_ATTEMPTS):
        try:
            r = await client.post(
                "/profile/stop",
                headers=HEADERS,
                json={"profile_id": profile_id},
            )
        except httpx.HTTPError:
            pass  # transport failure: fall through to the retry below
        else:
            status = r.status_code
            # Only rate limiting (429) and server errors (5xx) are worth retrying.
            if status != 429 and status < 500:
                if status >= 400:
                    log.warning(
                        "Stop for profile %s rejected with HTTP %s", profile_id, status
                    )
                return
        if attempt < _STOP_ATTEMPTS - 1:
            await _backoff(attempt)
    log.warning(
        "Could not confirm stop for profile %s; it may still be running", profile_id
    )


@asynccontextmanager
async def multilogin_session(profile_id: str):
    """Start a profile, attach Playwright over CDP, and yield a page.

    The semaphore ``LAUNCH_SEM`` is held for the whole session, so it caps the
    number of concurrently running sessions, not just concurrent launches.

    ``POST /profile/start`` is tried up to five times: HTTP 429 and
    ``httpx.HTTPError`` back off exponentially, and a successful response that
    carries neither ``cdp_url`` nor ``wsUrl`` is retried after two seconds.

    Cleanup runs in nested ``finally`` blocks so that each step executes even
    if an earlier one raises: browser close, Playwright stop, then the profile
    stop request. The stop request is sent whenever a start request was
    accepted, including when no CDP URL was ever returned or attaching failed.
    A start that fails at transport level (e.g. a timeout) is not followed by
    a stop.

    Args:
        profile_id: Identifier sent as ``profile_id`` to the start/stop endpoints.

    Yields:
        The first page of the browser's first context, creating a context
        and/or a page when none exists yet.

    Raises:
        RuntimeError: If no CDP URL was obtained after all start attempts.
    """
    async with LAUNCH_SEM:
        async with httpx.AsyncClient(base_url=MLX_BASE, timeout=90) as client:
            start_accepted = False
            try:
                cdp_url = None
                for attempt in range(_START_ATTEMPTS):
                    # Sleeping after the final attempt would only delay the error
                    # while still holding the semaphore.
                    has_retry_left = attempt < _START_ATTEMPTS - 1
                    try:
                        r = await client.post(
                            "/profile/start",
                            headers=HEADERS,
                            json={"profile_id": profile_id, "headless": False},
                        )
                        if r.status_code == 429:
                            if has_retry_left:
                                await _backoff(attempt)
                            continue
                        r.raise_for_status()
                        # From here on the profile may be running, so it must be stopped.
                        start_accepted = True
                        payload = r.json()
                        cdp_url = payload.get("cdp_url") or payload.get("wsUrl")
                        if cdp_url:
                            break
                        if has_retry_left:
                            await asyncio.sleep(2)
                    except httpx.HTTPError:
                        if has_retry_left:
                            await _backoff(attempt)
                if not cdp_url:
                    raise RuntimeError(f"CDP unavailable for {profile_id}")

                pw = await async_playwright().start()
                try:
                    browser = await pw.chromium.connect_over_cdp(cdp_url, timeout=60_000)
                    try:
                        ctx = browser.contexts[0] if browser.contexts else await browser.new_context()
                        page = ctx.pages[0] if ctx.pages else await ctx.new_page()
                        yield page
                    finally:
                        await browser.close()
                finally:
                    await pw.stop()
            finally:
                if start_accepted:
                    await _stop_profile(client, profile_id)


async def health_check(page, url: str = "https://example.com") -> None:
    """Navigate ``page`` to ``url`` and fail loudly if the load looks unhealthy.

    Args:
        page: Object exposing an awaitable ``goto(url, wait_until=..., timeout=...)``.
        url: Address to probe.

    Raises:
        RuntimeError: If navigation yields no response or a status of 400 or above.
    """
    response = await page.goto(url, wait_until="domcontentloaded", timeout=45_000)
    if response and response.status < 400:
        return
    detail = response.status if response else "no response"
    raise RuntimeError(f"Health check failed: {detail}")


async def run_job(profile_id: str, target_url: str) -> str:
    """Open a session for ``profile_id``, verify it, and return the target page title.

    Args:
        profile_id: Profile passed to ``multilogin_session``.
        target_url: Page to load once the health check has passed.

    Returns:
        The title of the page loaded from ``target_url``.
    """
    async with multilogin_session(profile_id) as page:
        # Probe the default health-check URL before touching the real target.
        await health_check(page)
        await page.goto(target_url, wait_until="domcontentloaded")
        title = await page.title()
    return title


async def main():
    """Run one job per profile ID in ``MLX_PROFILE_IDS`` and print each outcome.

    ``MLX_PROFILE_IDS`` is read as a comma-separated list. Surrounding
    whitespace is stripped and empty entries (trailing comma, blank value) are
    skipped, so no job is launched with an empty profile ID. Jobs run
    concurrently; a failing job is reported as ``ERR`` without cancelling the
    others.

    Raises:
        KeyError: If ``MLX_PROFILE_IDS`` is not set.
    """
    raw_ids = os.environ["MLX_PROFILE_IDS"]
    # Normalize once so the ID that is run is also the ID that is reported.
    profiles = [pid for pid in (part.strip() for part in raw_ids.split(",")) if pid]
    tasks = [run_job(pid, "https://example.com") for pid in profiles]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for pid, res in zip(profiles, results):
        print(pid, "OK" if isinstance(res, str) else f"ERR: {res}")


if __name__ == "__main__":
    asyncio.run(main())

Operational knobs

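| Env var | Default | Purpose |
| --- | --- | --- |
| MLX_MAX_PARALLEL | 4 | Semaphore cap per worker VM |
| MLX_PROFILE_IDS | (required) | Comma-separated queue batch |
| MLX_API | https://api.multilogin.com | Override for launcher/local API |
| MLX_TOKEN | (required) | Bearer token sent in the Authorization header |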

Related

Disclosure: MLX-MMO is affiliated with Multilogin. Adapt endpoints to your API version. SAAS50 / MIN50.