From 5b99b167124c3cb613b721af67b4626d716c2722 Mon Sep 17 00:00:00 2001 From: Abiba Date: Tue, 19 May 2026 15:55:13 +0000 Subject: [PATCH] feat: add request queuing to router (replaces hard 503) --- router/router.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/router/router.py b/router/router.py index 2843fb3..c3674fb 100644 --- a/router/router.py +++ b/router/router.py @@ -223,6 +223,8 @@ def bcast(): except Exception: dead.append(q) for q in dead: sse_subscribers.remove(q) +QUEUE_TIMEOUT = int(os.environ.get("QUEUE_TIMEOUT", "30")) # max seconds to queue before 503 + @app.route("/v1/chat/completions", methods=["POST"]) def chat(): try: @@ -230,11 +232,27 @@ def chat(): ak = request.headers.get("Authorization","").replace("Bearer ","") ki = API_KEYS.get(ak, {"tier":"starter","agent":"unknown"}) tier, agent = ki["tier"], ki["agent"] + + # Allow agent to override queue timeout via header + q_timeout = int(request.headers.get("X-Queue-Timeout", str(QUEUE_TIMEOUT))) + d = route(rd, tier) - if d.get("saturated"): - resp = jsonify({"error": "All GPUs saturated", "retry_after_s": 5}) - resp.headers["Retry-After"] = "5" - return resp, 503 + queue_start = time.time() + + # Queue loop: wait for a GPU slot instead of immediate 503 + while d.get("saturated"): + elapsed = time.time() - queue_start + if elapsed > q_timeout: + resp = jsonify({"error": "All GPUs saturated", "queued_s": round(elapsed,1), "retry_after_s": 5}) + resp.headers["Retry-After"] = "5" + log.warning("QUEUE_TIMEOUT: %s waited %.1fs, all GPUs saturated", agent, elapsed) + return resp, 503 + time.sleep(0.5) # poll every 500ms + d = route(rd, tier) + + waited = time.time() - queue_start + if waited > 0.5: + log.info("QUEUED: %s waited %.1fs before slot opened", agent, waited) model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]] is_stream = rd.get("stream", False)