feat: add request queuing to router (replaces hard 503)
This commit is contained in:
+20
-2
@@ -223,6 +223,8 @@ def bcast():
|
||||
except Exception: dead.append(q)
|
||||
for q in dead: sse_subscribers.remove(q)
|
||||
|
||||
QUEUE_TIMEOUT = int(os.environ.get("QUEUE_TIMEOUT", "30")) # max seconds to queue before 503
|
||||
|
||||
@app.route("/v1/chat/completions", methods=["POST"])
|
||||
def chat():
|
||||
try:
|
||||
@@ -230,11 +232,27 @@ def chat():
|
||||
ak = request.headers.get("Authorization","").replace("Bearer ","")
|
||||
ki = API_KEYS.get(ak, {"tier":"starter","agent":"unknown"})
|
||||
tier, agent = ki["tier"], ki["agent"]
|
||||
|
||||
# Allow agent to override queue timeout via header
|
||||
q_timeout = int(request.headers.get("X-Queue-Timeout", str(QUEUE_TIMEOUT)))
|
||||
|
||||
d = route(rd, tier)
|
||||
if d.get("saturated"):
|
||||
resp = jsonify({"error": "All GPUs saturated", "retry_after_s": 5})
|
||||
queue_start = time.time()
|
||||
|
||||
# Queue loop: wait for a GPU slot instead of immediate 503
|
||||
while d.get("saturated"):
|
||||
elapsed = time.time() - queue_start
|
||||
if elapsed > q_timeout:
|
||||
resp = jsonify({"error": "All GPUs saturated", "queued_s": round(elapsed,1), "retry_after_s": 5})
|
||||
resp.headers["Retry-After"] = "5"
|
||||
log.warning("QUEUE_TIMEOUT: %s waited %.1fs, all GPUs saturated", agent, elapsed)
|
||||
return resp, 503
|
||||
time.sleep(0.5) # poll every 500ms
|
||||
d = route(rd, tier)
|
||||
|
||||
waited = time.time() - queue_start
|
||||
if waited > 0.5:
|
||||
log.info("QUEUED: %s waited %.1fs before slot opened", agent, waited)
|
||||
model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]]
|
||||
is_stream = rd.get("stream", False)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user