Mumuni review action items: health checks for all containers, version pinning, 503+Retry-After on all-GPU saturation
This commit is contained in:
+37
-4
@@ -46,6 +46,29 @@ log = logging.getLogger("router")
|
||||
try: r = redis.from_url(REDIS_URL, decode_responses=True); r.ping()
|
||||
except Exception: r = None
|
||||
|
||||
|
||||
def counter_audit_loop():
|
||||
"""Every 30s, check GPU slots and reset counters if all slots idle."""
|
||||
while True:
|
||||
time.sleep(30)
|
||||
if not r: continue
|
||||
for model, url in GPU_URLS.items():
|
||||
try:
|
||||
resp = requests.get(url.replace("/v1","") + "/slots",
|
||||
headers={"Authorization": "Bearer not-needed"}, timeout=5)
|
||||
if resp.status_code == 200:
|
||||
slots = resp.json()
|
||||
all_idle = all(not s.get("is_processing", False) for s in slots)
|
||||
if all_idle:
|
||||
current = int(r.get("active:" + model) or 0)
|
||||
if current > 0:
|
||||
r.set("active:" + model, 0)
|
||||
log.info("AUDIT: Reset stuck counter for %s (was %d)", model, current)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
threading.Thread(target=counter_audit_loop, daemon=True).start()
|
||||
|
||||
app = Flask(__name__)
|
||||
sse_subscribers = []; sse_lock = threading.Lock()
|
||||
|
||||
@@ -118,7 +141,7 @@ def route(rd, tier):
|
||||
hints = rd.get("routing_hints",{})
|
||||
allowed = TIER_MODELS.get(tier, ["gemma-4-E4B"])
|
||||
avail = [m for m in available_models() if m in allowed]
|
||||
if not avail: return {"model": allowed[0], "reason": "all_saturated"}
|
||||
if not avail: return {"model": allowed[0], "reason": "all_saturated", "saturated": True}
|
||||
|
||||
req = rd.get("model","auto")
|
||||
if req != "auto":
|
||||
@@ -207,10 +230,16 @@ def chat():
|
||||
ak = request.headers.get("Authorization","").replace("Bearer ","")
|
||||
ki = API_KEYS.get(ak, {"tier":"starter","agent":"unknown"})
|
||||
tier, agent = ki["tier"], ki["agent"]
|
||||
d = route(rd, tier); model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]]
|
||||
d = route(rd, tier)
|
||||
if d.get("saturated"):
|
||||
resp = jsonify({"error": "All GPUs saturated", "retry_after_s": 5})
|
||||
resp.headers["Retry-After"] = "5"
|
||||
return resp, 503
|
||||
model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]]
|
||||
is_stream = rd.get("stream", False)
|
||||
|
||||
gpu_incr(model) # Track active request
|
||||
gpu_incr(model)
|
||||
decremented = False
|
||||
|
||||
log.info("ROUTE: %s -> %s (%s) stream=%s active=%d/%d", agent, model, reason, is_stream, gpu_active_count(model), GPU_MAX_CONCURRENT.get(model,1))
|
||||
if r:
|
||||
@@ -224,7 +253,8 @@ def chat():
|
||||
resp = requests.post(url+"/chat/completions", json=rd,
|
||||
headers={"Content-Type":"application/json","Authorization":"Bearer not-needed"}, timeout=300, stream=is_stream)
|
||||
lat = int((time.time()-start)*1000)
|
||||
gpu_decr(model) # Release slot
|
||||
gpu_decr(model)
|
||||
decremented = True # Release slot
|
||||
|
||||
if resp.status_code != 200: return jsonify({"error":"GPU error "+str(resp.status_code)}), 502
|
||||
if is_stream:
|
||||
@@ -241,6 +271,9 @@ def chat():
|
||||
data["routing"] = {"model":model,"reason":reason,"gpu":url,"tier":tier,"agent":agent,"latency_ms":lat,"active_gpu":gpu_active_count(model)}
|
||||
bcast()
|
||||
return jsonify(data)
|
||||
if not decremented:
|
||||
try: gpu_decr(model)
|
||||
except: pass
|
||||
except requests.Timeout:
|
||||
return jsonify({"error":"timeout"}), 504
|
||||
log.error("Error: %s\n%s", e, traceback.format_exc())
|
||||
|
||||
Reference in New Issue
Block a user