feat: add request queuing to router (replaces hard 503 on saturation)
When all GPUs are saturated, requests now enter a queue loop (poll every 500ms) instead of immediately returning 503. Configurable via QUEUE_TIMEOUT env var (default 30s) or X-Queue-Timeout header per-request. This prevents agent failures from cluster saturation — agents wait for a slot instead of crashing on fallback.
This commit is contained in:
+20
-2
@@ -223,6 +223,8 @@ def bcast():
|
|||||||
except Exception: dead.append(q)
|
except Exception: dead.append(q)
|
||||||
for q in dead: sse_subscribers.remove(q)
|
for q in dead: sse_subscribers.remove(q)
|
||||||
|
|
||||||
|
QUEUE_TIMEOUT = int(os.environ.get("QUEUE_TIMEOUT", "30")) # max seconds to queue before 503
|
||||||
|
|
||||||
@app.route("/v1/chat/completions", methods=["POST"])
|
@app.route("/v1/chat/completions", methods=["POST"])
|
||||||
def chat():
|
def chat():
|
||||||
try:
|
try:
|
||||||
@@ -230,11 +232,27 @@ def chat():
|
|||||||
ak = request.headers.get("Authorization","").replace("Bearer ","")
|
ak = request.headers.get("Authorization","").replace("Bearer ","")
|
||||||
ki = API_KEYS.get(ak, {"tier":"starter","agent":"unknown"})
|
ki = API_KEYS.get(ak, {"tier":"starter","agent":"unknown"})
|
||||||
tier, agent = ki["tier"], ki["agent"]
|
tier, agent = ki["tier"], ki["agent"]
|
||||||
|
|
||||||
|
# Allow agent to override queue timeout via header
|
||||||
|
q_timeout = int(request.headers.get("X-Queue-Timeout", str(QUEUE_TIMEOUT)))
|
||||||
|
|
||||||
d = route(rd, tier)
|
d = route(rd, tier)
|
||||||
if d.get("saturated"):
|
queue_start = time.time()
|
||||||
resp = jsonify({"error": "All GPUs saturated", "retry_after_s": 5})
|
|
||||||
|
# Queue loop: wait for a GPU slot instead of immediate 503
|
||||||
|
while d.get("saturated"):
|
||||||
|
elapsed = time.time() - queue_start
|
||||||
|
if elapsed > q_timeout:
|
||||||
|
resp = jsonify({"error": "All GPUs saturated", "queued_s": round(elapsed,1), "retry_after_s": 5})
|
||||||
resp.headers["Retry-After"] = "5"
|
resp.headers["Retry-After"] = "5"
|
||||||
|
log.warning("QUEUE_TIMEOUT: %s waited %.1fs, all GPUs saturated", agent, elapsed)
|
||||||
return resp, 503
|
return resp, 503
|
||||||
|
time.sleep(0.5) # poll every 500ms
|
||||||
|
d = route(rd, tier)
|
||||||
|
|
||||||
|
waited = time.time() - queue_start
|
||||||
|
if waited > 0.5:
|
||||||
|
log.info("QUEUED: %s waited %.1fs before slot opened", agent, waited)
|
||||||
model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]]
|
model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]]
|
||||||
is_stream = rd.get("stream", False)
|
is_stream = rd.get("stream", False)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user