54a4f26db7
Heavy tier keeps MoE primary (workhorse for >25K tok). Default tier routes Dense → VLM → MoE to prevent MoE overload. MoE had 5 timeouts in 15 min when Default pushed overflow to it.
632 lines
29 KiB
Python
632 lines
29 KiB
Python
import os, json, time, logging, traceback, threading, queue, statistics, math
|
|
import requests, redis
|
|
from flask import Flask, request, jsonify, Response, stream_with_context
|
|
|
|
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")

# OpenAI-compatible llama.cpp base URLs, one per GPU host; each can be overridden
# through the environment variable of the same name.
GPU_MOE_URL = os.getenv("GPU_MOE_URL", "http://192.168.68.15:8080/v1")
GPU_DENSE_URL = os.getenv("GPU_DENSE_URL", "http://192.168.68.8:8080/v1")
GPU_LIGHT_URL = os.getenv("GPU_LIGHT_URL", "http://192.168.68.110:8080/v1")

# Telemetry sidecar per model (check_gpu_health reads VRAM/temp/power from it).
# NOTE(review): these are not derived from the GPU_*_URL overrides above, so moving a
# model to another host via env also requires editing this table.
GPU_SIDECARS = dict([
    ("qwen3.6-35B-A3B", "http://192.168.68.15:8090"),
    ("qwen3.6-27B-code", "http://192.168.68.8:8090"),
    ("qwen3.5-9b-vlm", "http://192.168.68.110:8090"),
])

# Model id -> inference base URL. Iteration order is the order GPUs are probed and
# listed (health, metrics, and route()'s avail[0] fallbacks).
GPU_URLS = dict([
    ("qwen3.6-35B-A3B", GPU_MOE_URL),
    ("qwen3.6-27B-code", GPU_DENSE_URL),
    ("qwen3.5-9b-vlm", GPU_LIGHT_URL),
])
|
|
# In-flight requests each GPU accepts before routing treats it as busy; mirrors the
# llama.cpp --parallel slot count on each host.
GPU_MAX_CONCURRENT = {
    "qwen3.6-35B-A3B": 2,   # MoE: 2 slots (Dense-first routing reduces thermal load)
    "qwen3.6-27B-code": 2,  # Dense: 2 slots (128K context frees VRAM)
    "qwen3.5-9b-vlm": 2,    # VLM: 2 slots (12GB VRAM, 4GB headroom)
}

# Context window per model in tokens (2**18 = 256K, 2**17 = 128K); chat() compares
# estimated usage against these to emit compaction warnings.
GPU_CONTEXT = {
    "qwen3.6-35B-A3B": 2 ** 18,
    "qwen3.6-27B-code": 2 ** 17,
    "qwen3.5-9b-vlm": 2 ** 18,
}

# Models each subscription tier may be routed to. route() falls back to the first
# entry when none of them is available.
_ALL_MODELS = ("qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm")
TIER_MODELS = {
    "starter": ["qwen3.5-9b-vlm"],
    "professional": list(_ALL_MODELS),
    "enterprise": list(_ALL_MODELS),
}
|
|
# API key -> {"tier": <TIER_MODELS key>, "agent": <name used in logs and metrics>}.
# NOTE(security): these built-in keys are committed to source control. Deployments
# should set ROUTER_API_KEYS to a JSON object of the same shape; it REPLACES this
# table, so the committed keys stop working once the override is in place.
API_KEYS = {
    "sk-syslog-local-master-key": {"tier": "enterprise", "agent": "admin"},
    "sk-syslog-abiba": {"tier": "enterprise", "agent": "Abiba"},
    "sk-syslog-mumuni": {"tier": "enterprise", "agent": "Mumuni"},
    "sk-syslog-tanko": {"tier": "enterprise", "agent": "Tanko"},
    "sk-syslog-koby": {"tier": "enterprise", "agent": "Koby"},
    "sk-syslog-kagenz0": {"tier": "enterprise", "agent": "Kagenz0"},
    "sk-syslog-koonimo": {"tier": "enterprise", "agent": "Koonimo"},
    "sk-starter-abc123": {"tier": "starter", "agent": "test-starter"},
    "sk-professional-xyz789": {"tier": "professional", "agent": "test-pro"},
}

_api_keys_override = os.environ.get("ROUTER_API_KEYS")
if _api_keys_override:
    _parsed_keys = json.loads(_api_keys_override)
    # Fail fast at startup: chat() indexes ["tier"]/["agent"] on every entry it accepts.
    if not isinstance(_parsed_keys, dict) or not all(
        isinstance(v, dict) and "tier" in v and "agent" in v for v in _parsed_keys.values()
    ):
        raise ValueError('ROUTER_API_KEYS must be a JSON object: {"<key>": {"tier": ..., "agent": ...}}')
    API_KEYS = _parsed_keys
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s [ROUTER] %(levelname)s %(message)s")
log = logging.getLogger("router")

# Shared Redis client (or None). It backs the per-GPU in-flight counters, route stats
# and perf records; the helpers below check `if r`, so without Redis the router still
# proxies traffic but load tracking and metrics are disabled. Say so in the log rather
# than degrading silently. (The URL itself is not logged: it may carry a password.)
try:
    r = redis.from_url(REDIS_URL, decode_responses=True)
    r.ping()
except Exception as exc:
    r = None
    log.warning("Redis unavailable (%s); GPU load tracking and metrics are disabled", exc)
|
|
|
|
|
|
def counter_audit_loop():
    """Every 30s, reset a model's in-flight counter when its llama.cpp server is idle.

    For each model, fetches ``<base>/slots`` from llama.cpp. If no slot reports
    ``is_processing`` while the router's ``active:<model>`` counter is still > 0, the
    counter is reset to 0 and the reset is logged. This repairs counters left
    elevated by requests whose decrement never ran. Never returns; intended to
    run in a daemon thread.

    NOTE(review): assumes /slots returns a list of slot objects carrying
    ``is_processing`` -- confirm against the deployed llama.cpp version.
    NOTE: a request already counted by gpu_incr() but not yet picked up by
    llama.cpp is also wiped by a reset. gpu_decr() never lets the counter go
    negative, so this self-heals, but the GPU can briefly look less loaded than
    it is.
    """
    while True:
        time.sleep(30)
        if not r:
            continue
        for model, url in GPU_URLS.items():
            try:
                resp = requests.get(url.replace("/v1", "") + "/slots",
                                    headers={"Authorization": "Bearer not-needed"}, timeout=5)
                if resp.status_code != 200:
                    continue
                slots = resp.json()
                if not all(not s.get("is_processing", False) for s in slots):
                    continue
                current = int(r.get("active:" + model) or 0)
                if current > 0:
                    r.set("active:" + model, 0)
                    log.info("AUDIT: Reset stuck counter for %s (was %d)", model, current)
            except Exception as exc:
                # Best effort: an unreachable GPU or Redis blip must not kill the
                # audit thread, but leave a trace for debugging.
                log.debug("AUDIT: slot check failed for %s: %s", model, exc)
|
|
|
|
# Start the counter janitor at import time; as a daemon thread it never blocks
# interpreter shutdown.
threading.Thread(daemon=True, target=counter_audit_loop).start()

app = Flask(__name__)

# Per-client message queues for the /stream SSE endpoint, plus the lock that guards
# the list (request threads broadcast while SSE generators register/unregister).
sse_subscribers, sse_lock = [], threading.Lock()
|
|
|
|
def gpu_active_count(model):
    """Return how many requests the router currently has in flight on *model*'s GPU.

    Reads the shared ``active:<model>`` Redis counter; 0 when Redis is unavailable
    or the key is unset.
    """
    if not r:
        return 0
    return int(r.get("active:" + model) or 0)
|
|
|
|
def gpu_incr(model):
    """Count one more in-flight request on *model*'s GPU (no-op without Redis)."""
    if not r:
        return
    r.incr("active:" + model)
|
|
|
|
def gpu_decr(model):
    """Count one in-flight request on *model*'s GPU as finished; never go below zero.

    A decrement that lands below zero can happen, e.g. after counter_audit_loop()
    reset the key while this request was still running. It is undone with a
    compensating INCR instead of ``SET 0``. Both commands are atomic, but SET would
    also erase any increments other workers made between our DECR and the SET,
    under-counting live requests.
    """
    if not r:
        return
    key = "active:" + model
    if int(r.decr(key)) < 0:
        r.incr(key)
|
|
|
|
def check_gpu_health(model, sidecar_timeout=5, gpu_timeout=3):
    """Probe one GPU via its telemetry sidecar and its llama.cpp ``/health`` endpoint.

    Args:
        model: model id (key of GPU_SIDECARS / GPU_URLS).
        sidecar_timeout: seconds to wait for the sidecar.
        gpu_timeout: seconds to wait for llama.cpp ``/health``.

    Returns:
        ``{"status": "unknown"}`` if *model* has no sidecar configured.
        ``{"status": "down"}`` if the sidecar errors, times out or answers non-200.
        Otherwise the sidecar telemetry plus ``status`` ("healthy", or "down" when
        llama.cpp's /health fails) and ``vram_warning`` (VRAM use >= 95%).
    """
    url = GPU_SIDECARS.get(model)
    if not url:
        return {"status": "unknown"}
    try:
        resp = requests.get(url, timeout=sidecar_timeout)
        if resp.status_code != 200:
            return {"status": "down"}
        d = resp.json()
        # The sidecar may report a VRAM field as null. Treat that as 0 used / unknown
        # total instead of letting the TypeError mark a working GPU as down.
        used_mb = d.get("vram_used_mb") or 0
        total_mb = d.get("vram_total_mb") or 1
        pct = used_mb / max(total_mb, 1) * 100
        # VRAM usage != saturation; busy slots are handled by is_gpu_busy().
        status = "healthy"
        # A live sidecar does not prove llama.cpp is serving, so check it directly.
        gpu_url = GPU_URLS.get(model, "")
        try:
            hr = requests.get(gpu_url.replace("/v1", "") + "/health",
                              headers={"Authorization": "Bearer not-needed"}, timeout=gpu_timeout)
            if hr.status_code != 200:
                status = "down"
        except Exception:
            status = "down"
        return {
            "status": status,
            "vram_warning": pct >= 95,
            "vram_used_mb": d.get("vram_used_mb"),
            "vram_total_mb": d.get("vram_total_mb"),
            "vram_pct": round(pct, 1),
            "temp_c": d.get("temp_c"),
            "gpu_util_pct": d.get("gpu_util_pct"),
            "gpu_name": d.get("gpu_name"),
            "power_w": d.get("power_w"),
            "power_limit_w": d.get("power_limit_w"),
        }
    except Exception:
        return {"status": "down"}
|
|
|
|
def available_models():
    """Return the model ids whose GPU probe reports them usable, in GPU_URLS order."""
    def _usable(model):
        return check_gpu_health(model)["status"] in ("healthy", "saturated")
    return list(filter(_usable, GPU_URLS))
|
|
|
|
def estimate_tokens(msgs):
    """Estimate the token count of a chat message list.

    Uses the serialized JSON length / 3.5 (closer to real tokenizer ratios for dense
    text than the usual /4). Non-JSON values are serialized with ``str()``.

    Returns an int. The result is persisted in Redis for session tracking and read
    back with ``int()``, which rejects float strings such as "1234.0". A float here
    made every later lookup fail, so session tracking was silently disabled.
    """
    return int(len(json.dumps(msgs, default=str)) // 3.5)
|
|
|
|
def store_perf_record(model, agent, tier, reason, queue_ms, inference_ms, prompt_tokens, completion_tokens, stream):
    """Store a per-request performance record in Redis for the /metrics/* analytics.

    The JSON record is pushed onto four capped lists: ``perf:recent`` (last 500)
    and ``perf:model:<model>``, ``perf:reason:<reason>``, ``perf:agent:<agent>``
    (last 200 each). ``tokens_per_sec`` is completion tokens over inference time,
    or 0 when either is zero. Best effort: no-op without Redis, and failures are
    logged at debug level rather than raised into the request.
    """
    if not r:
        return
    try:
        total_ms = queue_ms + inference_ms
        tps = completion_tokens / (inference_ms / 1000) if inference_ms > 0 and completion_tokens > 0 else 0
        rec = json.dumps({
            "ts": time.time(),
            "model": model, "agent": agent, "tier": tier, "reason": reason,
            "queue_ms": round(queue_ms, 1),
            "inference_ms": round(inference_ms, 1),
            "total_ms": round(total_ms, 1),
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "tokens_per_sec": round(tps, 1),
            "stream": stream,
        })
        # One pipelined round trip instead of eight sequential commands on the
        # request path; no MULTI/EXEC needed since each list is independent.
        pipe = r.pipeline(transaction=False)
        for key, keep in (("perf:recent", 500),
                          ("perf:model:" + model, 200),
                          ("perf:reason:" + reason, 200),
                          ("perf:agent:" + agent, 200)):
            pipe.lpush(key, rec)
            pipe.ltrim(key, 0, keep - 1)
        pipe.execute()
    except Exception as exc:
        log.debug("perf record not stored for %s: %s", model, exc)
|
|
|
|
def is_gpu_busy(model):
    """True once *model* has as many in-flight requests as it has llama.cpp slots.

    Models missing from GPU_MAX_CONCURRENT are treated as single-slot.
    """
    in_flight = gpu_active_count(model)
    capacity = GPU_MAX_CONCURRENT.get(model, 1)
    return not in_flight < capacity
|
|
|
|
def select_best_gpu(candidates, reason):
    """Pick a GPU from *candidates*, treating their order as a preference list.

    The first candidate that is not busy wins and keeps *reason*. If every
    candidate is busy, the least-loaded one is returned, with ties going to the
    earlier candidate, and reason ``"load_balanced_" + reason``. Returns None only
    when *candidates* is empty.
    """
    for m in candidates:
        if not is_gpu_busy(m):
            return {"model": m, "reason": reason}
    if not candidates:
        return None
    # min() keeps the first of equally loaded candidates, preserving preference order.
    # There is deliberately no sentinel cap: a drifted counter must not make this
    # return None and push the caller into a later routing tier.
    best = min(candidates, key=gpu_active_count)
    return {"model": best, "reason": "load_balanced_" + reason}
|
|
|
|
def route(rd, tier):
    """Choose a GPU for chat request *rd* among the models *tier* may use.

    Returns ``{"model", "reason"}``. When no allowed GPU can take the request right
    now, the dict also has ``"saturated": True``; chat() then polls route() until a
    slot frees up or its queue timeout expires.

    Precedence:
      1. an explicit ``model`` in the request (moved elsewhere only if that GPU is busy);
      2. ``routing_hints.priority`` of "speed" (VLM) or "quality" (MoE);
      3. size/shape tiers based on estimated tokens, turn count and first-message words.
    """
    msgs = rd.get("messages", [])
    t = estimate_tokens(msgs)
    has_system = any(m.get("role") == "system" for m in msgs)
    turns = len([m for m in msgs if m.get("role") in ("user", "assistant")])
    hints = rd.get("routing_hints", {})
    # Client-controlled field: ignore anything that is not an object instead of
    # crashing on hints.get() below.
    if not isinstance(hints, dict):
        hints = {}
    allowed = TIER_MODELS.get(tier, ["qwen3.5-9b-vlm"])
    avail = [m for m in available_models() if m in allowed]
    # Nothing allowed is up: report saturation so chat() keeps polling.
    if not avail:
        return {"model": allowed[0], "reason": "all_saturated", "saturated": True}
    # Check if all available GPUs are at max capacity
    if all(is_gpu_busy(m) for m in avail):
        return {"model": avail[0], "reason": "all_saturated", "saturated": True}

    # Explicit model: honour it when available (otherwise use the first available
    # GPU). If that GPU is busy, another allowed GPU with a free slot takes it.
    req = rd.get("model", "auto")
    if req != "auto":
        target = req if req in avail else avail[0]
        if is_gpu_busy(target) and req in allowed:
            alts = [m for m in avail if m != target and m in allowed]
            if alts:
                alt = select_best_gpu(alts, "explicit")
                if alt:
                    return alt
        return {"model": target, "reason": "explicit"}

    if hints:
        if hints.get("priority") == "speed" and "qwen3.5-9b-vlm" in avail:
            return select_best_gpu(["qwen3.5-9b-vlm"], "hint_speed") or {"model": "qwen3.5-9b-vlm", "reason": "hint_speed"}
        if hints.get("priority") == "quality" and "qwen3.6-35B-A3B" in avail:
            return select_best_gpu(["qwen3.6-35B-A3B"], "hint_quality") or {"model": "qwen3.6-35B-A3B", "reason": "hint_quality"}

    first_msg = msgs[0].get("content", "") if msgs else ""
    # Non-string content (e.g. a list of content parts) counts as 99 words, which
    # still passes the <= 100 word lightweight check below.
    words = len(first_msg.split()) if isinstance(first_msg, str) else 99

    # TIER 1: Lightweight -- single-turn short queries -> VLM (fastest)
    if not has_system and turns <= 1 and t <= 500 and words <= 100 and "qwen3.5-9b-vlm" in avail:
        if not is_gpu_busy("qwen3.5-9b-vlm"):
            return {"model": "qwen3.5-9b-vlm", "reason": "lightweight"}
        # VLM busy -- Dense is faster for short queries than MoE
        fallback = [m for m in ["qwen3.6-27B-code", "qwen3.6-35B-A3B"] if m in avail]
        result = select_best_gpu(fallback, "lightweight_fallback")
        if result:
            return result

    # TIER 2: Simple conversations -- VLM primary (up to 15K tok), fastest for moderate chat
    if t <= 15000 and turns <= 12 and "qwen3.5-9b-vlm" in avail:
        if not is_gpu_busy("qwen3.5-9b-vlm"):
            return {"model": "qwen3.5-9b-vlm", "reason": "simple_conv"}
        # VLM busy -- fall back to Dense, then MoE
        fallback = [m for m in ["qwen3.6-27B-code", "qwen3.6-35B-A3B"] if m in avail]
        result = select_best_gpu(fallback, "simple_conv_fallback")
        if result:
            return result

    # TIER 3: Medium complexity -- Dense primary, VLM then MoE fallback
    if t <= 25000:
        candidates = [m for m in ["qwen3.6-27B-code", "qwen3.5-9b-vlm", "qwen3.6-35B-A3B"] if m in avail]
        result = select_best_gpu(candidates, "medium")
        if result:
            return result

    # TIER 4: Heavy reasoning -- MoE primary (workhorse), Dense then VLM fallback
    if t > 25000:
        candidates = [m for m in ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"] if m in avail]
        result = select_best_gpu(candidates, "heavy_reasoning")
        if result:
            return result

    # TIER 5: Default -- Dense -> VLM -> MoE. Safety net: tiers 3 and 4 together
    # cover every token count, so this runs only if select_best_gpu() gave None there.
    candidates = [m for m in ["qwen3.6-27B-code", "qwen3.5-9b-vlm", "qwen3.6-35B-A3B"] if m in avail]
    result = select_best_gpu(candidates, "default")
    if result:
        return result
    return {"model": avail[0], "reason": "last_resort"}
|
|
|
|
# Typographic characters with an obvious ASCII spelling; clean_unicode() drops every
# other non-ASCII character.
_ASCII_PUNCTUATION = str.maketrans({
    "\u2014": "-",    # em dash
    "\u2013": "-",    # en dash
    "\u2018": "'",    # left single quote
    "\u2019": "'",    # right single quote
    "\u201c": '"',    # left double quote
    "\u201d": '"',    # right double quote
    "\u2026": "...",  # horizontal ellipsis
    "\u00a0": " ",    # no-break space
})


def clean_unicode(text):
    """Fold typographic punctuation to ASCII, then strip all remaining non-ASCII.

    Non-string values are returned unchanged.
    """
    if not isinstance(text, str):
        return text
    folded = text.translate(_ASCII_PUNCTUATION)
    return folded.encode("ascii", "ignore").decode("ascii")
|
|
|
|
def clean_response(d):
    """Recursively run clean_unicode() over every string value in a JSON-like object.

    Dicts and lists are rebuilt (dict keys are left as-is); all other values,
    tuples included, pass through untouched.
    """
    if isinstance(d, str):
        return clean_unicode(d)
    if isinstance(d, dict):
        return {key: clean_response(value) for key, value in d.items()}
    if isinstance(d, list):
        return list(map(clean_response, d))
    return d
|
|
|
|
def get_metrics():
    """Build the dashboard snapshot served by /metrics and pushed to /stream clients.

    Probes every GPU with check_gpu_health() (default timeouts). When Redis is up,
    it also adds route counters per model/agent/tier and the 50 most recent routing
    decisions. A Redis read failure leaves the remaining sections at their empty
    defaults instead of failing the whole snapshot.
    """
    d = {"gpus": [], "route_counts": {}, "agent_counts": {}, "tier_counts": {}, "recent": [],
         "timestamp": time.time(), "active_requests": {}}
    for m in GPU_URLS:
        h = check_gpu_health(m)
        # Read the live counter once so the per-GPU entry and active_requests agree.
        active = gpu_active_count(m)
        d["gpus"].append({
            "id": m,
            "gpu_name": h.get("gpu_name", m),
            "status": h.get("status"),
            "vram_used_mb": h.get("vram_used_mb"),
            "vram_total_mb": h.get("vram_total_mb"),
            "vram_pct": h.get("vram_pct"),
            "temp_c": h.get("temp_c"),
            "gpu_util_pct": h.get("gpu_util_pct"),
            "power_w": h.get("power_w"),
            "power_limit_w": h.get("power_limit_w"),
            "active_requests": active,
            "max_concurrent": GPU_MAX_CONCURRENT.get(m, 1),
        })
        d["active_requests"][m] = active
    if r:
        try:
            for m in GPU_URLS:
                d["route_counts"][m] = int(r.get("routes:" + m) or 0)
            for info in API_KEYS.values():
                count = int(r.get("routes:agent:" + info["agent"]) or 0)
                if count > 0:
                    d["agent_counts"][info["agent"]] = count
            for t in TIER_MODELS:
                d["tier_counts"][t] = int(r.get("routes:tier:" + t) or 0)
            raw = r.lrange("routes:recent", 0, 49)
            d["recent"] = [json.loads(x) for x in raw] if raw else []
        except Exception as exc:
            log.debug("metrics: Redis read failed: %s", exc)
    return d
|
|
|
|
def bcast():
    """Push a fresh metrics snapshot to every connected /stream subscriber.

    get_metrics() probes every GPU over HTTP, and bcast() runs at the end of every
    chat completion. So return immediately when no dashboard is listening instead
    of paying for those probes on each request. Subscribers whose queue rejects the
    payload are dropped.
    """
    with sse_lock:
        if not sse_subscribers:
            return
    # Build the payload outside the lock: it involves network I/O.
    payload = json.dumps(get_metrics())
    with sse_lock:
        dead = []
        for q in sse_subscribers:
            try:
                q.put(payload)
            except Exception:
                dead.append(q)
        for q in dead:
            sse_subscribers.remove(q)
|
|
|
|
# Default number of seconds a chat request waits for a free GPU slot before chat()
# answers 503; clients may override it per request with the X-Queue-Timeout header.
QUEUE_TIMEOUT = int(os.getenv("QUEUE_TIMEOUT", "30"))
|
|
|
|
def _queue_timeout_from_headers(headers):
    """Return the X-Queue-Timeout header as int seconds, or QUEUE_TIMEOUT if absent/invalid."""
    raw = headers.get("X-Queue-Timeout")
    if raw is None:
        return QUEUE_TIMEOUT
    try:
        return int(raw)
    except ValueError:
        log.warning("Ignoring invalid X-Queue-Timeout header: %r", raw)
        return QUEUE_TIMEOUT


def _track_session_tokens(session_id, msgs):
    """Return the largest context size seen for this session and persist it (24h TTL).

    Context only grows across turns of a session, so the stored value is the max of
    the previous value and the current estimate. Returns 0 without a session id or
    Redis; Redis errors are logged and do not fail the request.
    """
    tokens = 0
    if not (session_id and r):
        return tokens
    key = "session:" + session_id
    try:
        # float() first: older versions stored estimates as float strings ("1234.0").
        prev = int(float(r.get(key) or 0))
        tokens = max(prev, int(estimate_tokens(msgs)))
        r.set(key, tokens, ex=86400)  # TTL 24h
    except Exception as exc:
        log.debug("session tracking failed for %s: %s", session_id, exc)
    return tokens


def _context_signal(model, session_tokens, msgs):
    """Return (tokens_remaining, pct_remaining, warning) for *model*'s context window.

    Usage is the larger of the session's tracked size and this request's estimate.
    The warning escalates from "ok" through compact_soon (<30% left) and
    compact_recommended (<15%) to compact_urgent (<5%).
    """
    window = GPU_CONTEXT.get(model, 65536)
    remaining = int(window - max(session_tokens, estimate_tokens(msgs)))
    pct = remaining / window * 100
    if pct < 5:
        warning = "compact_urgent"
    elif pct < 15:
        warning = "compact_recommended"
    elif pct < 30:
        warning = "compact_soon"
    else:
        warning = "ok"
    return remaining, pct, warning


def _parse_stream_timings(chunks):
    """Return the last llama.cpp ``timings`` object in buffered SSE text, or {}.

    iter_content(chunk_size=None) yields data as it arrives, so chunks are not
    guaranteed to align with SSE events: one chunk may hold several ``data:`` lines
    or half of one. The text is therefore re-joined and parsed line by line.
    """
    timings = {}
    for line in "".join(chunks).splitlines():
        line = line.strip()
        if not line.startswith("data:"):
            continue
        body = line[len("data:"):].strip()
        if not body.startswith("{") or '"timings"' not in body:
            continue
        try:
            found = json.loads(body).get("timings")
        except ValueError:
            continue
        if isinstance(found, dict) and "predicted_n" in found:
            timings = found
    return timings


def _record_route(model, reason, tier, agent, queue_ms):
    """Best-effort Redis bookkeeping for the dashboard: totals, hourly series, recent list."""
    if not r:
        return
    try:
        r.incr("routes:" + model)
        r.incr("routes:tier:" + tier)
        r.incr("routes:agent:" + agent)
        # Hourly bucket in the server's local time (strftime without a time tuple).
        r.incr("ts:" + model + ":" + time.strftime("%Y%m%d%H"))
        r.lpush("routes:recent", json.dumps({"ts": time.time(), "model": model, "reason": reason, "tier": tier,
                                             "agent": agent, "queue_ms": round(queue_ms, 1)}))
        r.ltrim("routes:recent", 0, 999)
    except Exception as exc:
        log.debug("route stats not recorded: %s", exc)


@app.route("/v1/chat/completions", methods=["POST"])
def chat():
    """OpenAI-compatible chat endpoint: pick a GPU, queue while all are busy, proxy.

    Request headers:
        Authorization: ``Bearer <key>`` from API_KEYS (required).
        X-Queue-Timeout: seconds to wait for a free GPU (default QUEUE_TIMEOUT).
        X-Session-Id: optional; enables cross-turn context tracking.

    Responses:
        401 for a bad key; 400 when the body is not a JSON object; 503 with
        Retry-After when the queue times out; 502 when the upstream answers
        non-200; 504 on upstream timeout; 500 otherwise. Successful responses carry
        X-Context-* headers, and non-streaming ones also get a ``routing`` object.

    The chosen GPU's in-flight counter is incremented once and decremented exactly
    once, after the upstream body has been fully read. For streaming requests that
    means after the whole stream is buffered, so the slot stays counted while
    llama.cpp is still generating.
    """
    agent, model = "unknown", None  # bound up-front so the error handlers can always log
    try:
        rd = request.get_json(force=True, silent=True)
        ak = request.headers.get("Authorization", "").replace("Bearer ", "")
        if not ak or ak not in API_KEYS:
            log.warning("AUTH_REJECTED: no/invalid API key from %s", request.remote_addr)
            return jsonify({"error": "Unauthorized — valid API key required"}), 401
        ki = API_KEYS[ak]
        tier, agent = ki["tier"], ki["agent"]
        if not isinstance(rd, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        # Allow agent to override queue timeout via header
        q_timeout = _queue_timeout_from_headers(request.headers)
        # Cross-turn context tracking: accumulate tokens per session
        session_tokens = _track_session_tokens(request.headers.get("X-Session-Id", ""), rd.get("messages", []))

        d = route(rd, tier)
        queue_start = time.time()
        # Queue loop: wait for a GPU slot instead of immediate 503
        while d.get("saturated"):
            elapsed = time.time() - queue_start
            if elapsed > q_timeout:
                resp = jsonify({"error": "All GPUs saturated", "queued_s": round(elapsed, 1), "retry_after_s": 5})
                resp.headers["Retry-After"] = "5"
                log.warning("QUEUE_TIMEOUT: %s waited %.1fs, all GPUs saturated", agent, elapsed)
                return resp, 503
            time.sleep(0.5)  # poll every 500ms
            d = route(rd, tier)

        queue_ms = (time.time() - queue_start) * 1000
        if queue_ms > 500:
            log.info("QUEUED: %s waited %.0fms before slot opened", agent, queue_ms)
        model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]]
        is_stream = rd.get("stream", False)

        gpu_incr(model)
        try:
            log.info("ROUTE: %s -> %s (%s) stream=%s active=%d/%d", agent, model, reason, is_stream,
                     gpu_active_count(model), GPU_MAX_CONCURRENT.get(model, 1))
            _record_route(model, reason, tier, agent, queue_ms)
            start = time.time()
            upstream = requests.post(url + "/chat/completions", json=rd,
                                     headers={"Content-Type": "application/json", "Authorization": "Bearer not-needed"},
                                     timeout=300, stream=is_stream)
            try:
                if upstream.status_code != 200:
                    return jsonify({"error": "GPU error " + str(upstream.status_code)}), 502
                if is_stream:
                    # For text/* responses without a charset, requests falls back to
                    # ISO-8859-1, which mangles UTF-8 (curly quotes would vanish instead
                    # of being folded by clean_unicode). Decode as UTF-8 explicitly.
                    upstream.encoding = "utf-8"
                    # Buffer the whole stream so timings can be read from its final event.
                    chunks = [clean_unicode(raw)
                              for raw in upstream.iter_content(chunk_size=None, decode_unicode=True) if raw]
                    lat = int((time.time() - start) * 1000)
                    upstream_json = None
                else:
                    lat = int((time.time() - start) * 1000)
                    upstream_json = upstream.json()
            finally:
                upstream.close()  # also releases the pooled connection on the 502 path
        finally:
            gpu_decr(model)

        if is_stream:
            stream_timings = _parse_stream_timings(chunks)
            if stream_timings:
                # Store perf record with real token counts from the stream
                store_perf_record(model, agent, tier, reason, queue_ms,
                                  stream_timings.get("predicted_ms", lat),
                                  stream_timings.get("prompt_n", 0),
                                  stream_timings.get("predicted_n", 0), True)
            else:
                store_perf_record(model, agent, tier, reason, queue_ms, lat,
                                  estimate_tokens(rd.get("messages", [])), 0, True)
            ctx_remaining, _, ctx_warning = _context_signal(model, session_tokens, rd.get("messages", []))

            def gen():
                # Replay the buffered stream, then refresh dashboards once delivered.
                yield from chunks
                bcast()

            sse_resp = Response(stream_with_context(gen()), mimetype="text/event-stream")
            sse_resp.headers["X-Context-Remaining"] = str(max(0, ctx_remaining))
            sse_resp.headers["X-Context-Warning"] = ctx_warning
            sse_resp.headers["X-Context-Model"] = model
            return sse_resp

        data = clean_response(upstream_json)
        for c in data.get("choices", []):
            msg = c.get("message", {})
            # Empty content but non-empty reasoning_content: surface the latter as
            # content so clients that only read `content` still get text.
            if not msg.get("content") and msg.get("reasoning_content"):
                msg["content"] = msg["reasoning_content"]
        # Extract performance data from llama.cpp response
        usage = data.get("usage", {})
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)
        store_perf_record(model, agent, tier, reason, queue_ms, lat, prompt_tokens, completion_tokens, False)
        ctx_remaining, ctx_pct, ctx_warning = _context_signal(model, session_tokens, rd.get("messages", []))
        data["routing"] = {"model": model, "reason": reason, "gpu": url, "tier": tier, "agent": agent,
                           "latency_ms": lat, "queue_ms": round(queue_ms, 1),
                           "active_gpu": gpu_active_count(model),
                           "context_remaining": max(0, ctx_remaining), "context_pct": round(ctx_pct, 1),
                           "context_warning": ctx_warning}
        resp = jsonify(data)
        resp.headers["X-Context-Remaining"] = str(max(0, ctx_remaining))
        resp.headers["X-Context-Warning"] = ctx_warning
        resp.headers["X-Context-Model"] = model
        bcast()
        return resp
    except requests.Timeout:
        # The slot counter was already released by the finally above (or never taken).
        log.error("TIMEOUT: %s -> %s", agent, model)
        return jsonify({"error": "timeout"}), 504
    except Exception as e:
        log.error("Error: %s\n%s", e, traceback.format_exc())
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/metrics/performance")
def performance():
    """Per-request performance analytics with percentiles per model/reason/agent.

    Query params:
        window: look-back in hours (default 24).
        model: restrict to one model id, or "all" (default).

    Reads the global ``perf:recent`` list (see store_perf_record) and returns
    ``{"models", "reasons", "agents", "summary"}``. Returns 503 without Redis, and
    500 with the error text on unexpected failures.
    """
    if not r:
        return jsonify({"error": "Redis unavailable"}), 503
    try:
        window_hours = int(request.args.get("window", "24"))
        model_filter = request.args.get("model", "all")

        # Load records inside the window; malformed entries are skipped, not fatal.
        cutoff = time.time() - (window_hours * 3600)
        records = []
        for x in r.lrange("perf:recent", 0, -1):
            try:
                rec = json.loads(x)
                if rec["ts"] >= cutoff:
                    records.append(rec)
            except (ValueError, KeyError, TypeError):
                continue

        if model_filter != "all":
            # `rec`, not `r`: don't reuse the module-level Redis client's name.
            records = [rec for rec in records if rec["model"] == model_filter]

        if not records:
            return jsonify({"models": [], "reasons": [], "agents": [], "summary": {"total_requests": 0}})

        def pct(values, p):
            """Return the p-th percentile (1-99) of *values* rounded to 0.1; 0 if empty."""
            if len(values) < 2:
                # statistics.quantiles() needs at least two data points.
                return round(values[0], 1) if values else 0
            return round(statistics.quantiles(sorted(values), n=100, method="inclusive")[min(p - 1, 98)], 1)

        def group_by(field):
            """Bucket records by rec[field], keeping keys in first-seen order."""
            groups = {}
            for rec in records:
                groups.setdefault(rec[field], []).append(rec)
            return groups

        def positive_tps(recs):
            """Throughput samples, ignoring records with no measured rate (0)."""
            return [rec["tokens_per_sec"] for rec in recs if rec["tokens_per_sec"] > 0]

        # Per-model stats, alphabetical by model id.
        models = []
        for m, recs in sorted(group_by("model").items()):
            latencies = [rec["total_ms"] for rec in recs]
            tps_vals = positive_tps(recs)
            # NOTE(review): queue stats only use non-streaming requests; confirm that
            # excluding streamed requests is intended.
            queue_times = [rec["queue_ms"] for rec in recs if not rec["stream"]]
            models.append({
                "model": m,
                "count": len(recs),
                "stream_pct": round(len([rec for rec in recs if rec["stream"]]) / len(recs) * 100, 1),
                "latency": {
                    "avg": round(statistics.mean(latencies), 1),
                    "p50": pct(latencies, 50),
                    "p95": pct(latencies, 95),
                    "p99": pct(latencies, 99),
                },
                "throughput": {
                    "avg_tokens_per_sec": round(statistics.mean(tps_vals), 1) if tps_vals else 0,
                    "p50": pct(tps_vals, 50) if tps_vals else 0,
                    "p95": pct(tps_vals, 95) if tps_vals else 0,
                },
                "queue": {
                    "avg_ms": round(statistics.mean(queue_times), 1) if queue_times else 0,
                    "p95_ms": pct(queue_times, 95) if queue_times else 0,
                } if queue_times else None,
            })

        # Per-reason stats, busiest first (ties keep first-seen order).
        reasons = []
        for rsn, recs in sorted(group_by("reason").items(), key=lambda item: -len(item[1])):
            latencies = [rec["total_ms"] for rec in recs]
            reasons.append({
                "reason": rsn,
                "count": len(recs),
                "avg_total_ms": round(statistics.mean(latencies), 1),
                "p95_total_ms": pct(latencies, 95),
            })

        # Per-agent stats, busiest first (ties keep first-seen order).
        agents = []
        for ag, recs in sorted(group_by("agent").items(), key=lambda item: -len(item[1])):
            latencies = [rec["total_ms"] for rec in recs]
            tps_vals = positive_tps(recs)
            agents.append({
                "agent": ag,
                "count": len(recs),
                "avg_total_ms": round(statistics.mean(latencies), 1),
                "avg_tokens_per_sec": round(statistics.mean(tps_vals), 1) if tps_vals else 0,
            })

        all_lat = [rec["total_ms"] for rec in records]
        all_tps = positive_tps(records)
        summary = {
            "total_requests": len(records),
            "window_hours": window_hours,
            "latency": {
                "avg_ms": round(statistics.mean(all_lat), 1),
                "p50_ms": pct(all_lat, 50),
                "p95_ms": pct(all_lat, 95),
                "p99_ms": pct(all_lat, 99),
            },
            "throughput_avg_tps": round(statistics.mean(all_tps), 1) if all_tps else 0,
        }

        return jsonify({"models": models, "reasons": reasons, "agents": agents, "summary": summary})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/metrics/scatter")
def scatter():
    """Return individual perf records as points for scatter plots (prompt_tokens vs latency).

    Query params: ``window`` (hours, default 24) and ``model`` (id or "all").
    Malformed entries in ``perf:recent`` are skipped. Returns 503 without Redis,
    and 500 with the error text on unexpected failures.
    """
    if not r:
        return jsonify({"error": "Redis unavailable"}), 503
    try:
        window_hours = int(request.args.get("window", "24"))
        model_filter = request.args.get("model", "all")
        cutoff = time.time() - (window_hours * 3600)
        points = []
        for x in r.lrange("perf:recent", 0, -1):
            # Narrow catch (no bare except): only data errors from a bad record are
            # skipped; KeyboardInterrupt/SystemExit still propagate.
            try:
                rec = json.loads(x)
                if rec["ts"] >= cutoff and (model_filter == "all" or rec["model"] == model_filter):
                    points.append({
                        "model": rec["model"],
                        "agent": rec["agent"],
                        "reason": rec["reason"],
                        "prompt_tokens": int(rec.get("prompt_tokens", 0)),
                        "completion_tokens": rec.get("completion_tokens", 0),
                        "inference_ms": round(rec["inference_ms"], 1),
                        "tokens_per_sec": rec.get("tokens_per_sec", 0),
                        "stream": rec.get("stream", False),
                    })
            except (ValueError, KeyError, TypeError):
                continue
        return jsonify({"points": points, "count": len(points)})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/v1/models")
def models():
    """OpenAI-style model list, each entry annotated with live GPU status and name.

    Each GPU is probed exactly once, with short timeouts to keep the listing
    responsive. The same probe supplies both ``status`` and ``gpu``, so the two
    fields can no longer disagree and the per-listing HTTP calls are halved.
    """
    data = []
    for m in GPU_URLS:
        h = check_gpu_health(m, sidecar_timeout=1.5, gpu_timeout=1)
        data.append({"id": m, "object": "model", "owned_by": "syslog",
                     "status": h.get("status"), "gpu": h.get("gpu_name")})
    return jsonify({"object": "list", "data": data})
|
|
|
|
@app.route("/health")
def health():
    """Router health: per-GPU probe results with load, Redis state and usable models.

    Availability is derived from the probes made here, using the same
    "healthy"/"saturated" criterion as available_models(). Calling
    available_models() would probe every GPU a second time with the slower default
    timeouts (up to 8s per GPU), which defeats the short timeouts used by this
    endpoint.
    """
    gpus = {}
    for m in GPU_URLS:
        h = check_gpu_health(m, sidecar_timeout=1.5, gpu_timeout=1)
        h["active_requests"] = gpu_active_count(m)
        h["max_concurrent"] = GPU_MAX_CONCURRENT.get(m, 1)
        gpus[m] = h
    available = [m for m, h in gpus.items() if h.get("status") in ("healthy", "saturated")]
    return jsonify({"status": "healthy", "redis": "connected" if r else "down", "gpus": gpus,
                    "available_models": available})
|
|
|
|
@app.route("/metrics")
def metrics():
    """Return the current dashboard snapshot (same payload as the /stream events)."""
    snapshot = get_metrics()
    return jsonify(snapshot)
|
|
|
|
@app.route("/metrics/timeseries")
def metrics_timeseries():
    """Route counts per model over time for the dashboard charts.

    Query param ``period``: "day" gives the last 24 hourly buckets, "week" the
    last 7 daily buckets, and anything else the last 30 daily buckets. Returns
    ``{"labels": [...], "models": {model: [count per bucket]}}``; counts are
    filled only when Redis is available.

    The route counters are written as ``ts:<model>:<YYYYmmddHH>`` via
    time.strftime() without a time tuple, i.e. in the server's *local* time zone.
    Buckets and labels are therefore built from time.localtime() too; gmtime
    misaligns every bucket whenever TZ != UTC.
    """
    period = request.args.get("period", "day")
    models_list = list(GPU_URLS.keys())
    data = {"models": {}, "labels": []}
    now = time.time()  # one reference instant so every bucket/label pair agrees
    if period == "day":
        stamps = [time.localtime(now - h * 3600) for h in range(23, -1, -1)]
        buckets = [time.strftime("%Y%m%d%H", st) for st in stamps]
        data["labels"] = [time.strftime("%H:00", st) for st in stamps]
    elif period == "week":
        stamps = [time.localtime(now - d * 86400) for d in range(6, -1, -1)]
        buckets = [time.strftime("%Y%m%d", st) for st in stamps]
        data["labels"] = [time.strftime("%a", st) for st in stamps]
    else:
        stamps = [time.localtime(now - d * 86400) for d in range(29, -1, -1)]
        buckets = [time.strftime("%Y%m%d", st) for st in stamps]
        data["labels"] = [time.strftime("%m/%d", st) for st in stamps]
    if r:
        # Daily buckets are the sum of their 24 hourly counters. This follows the
        # bucket format chosen above, so unknown periods (daily buckets) sum too.
        daily = period != "day"
        for model in models_list:
            counts = []
            for bucket in buckets:
                prefix = "ts:" + model + ":" + bucket
                keys = [prefix + "{:02d}".format(hh) for hh in range(24)] if daily else [prefix]
                # One MGET round trip per bucket instead of one GET per hour.
                counts.append(sum(int(v or 0) for v in r.mget(keys)))
            data["models"][model] = counts
    return jsonify(data)
|
|
|
|
@app.route("/stream")
def stream():
    """Server-sent events feed of dashboard metrics.

    Sends a snapshot immediately. After that it relays payloads pushed by bcast(),
    and sends its own fresh snapshot whenever 3s pass with nothing pushed. The
    client's queue is unregistered when the stream ends or the client disconnects.
    """
    def ev():
        inbox = queue.Queue()
        with sse_lock:
            sse_subscribers.append(inbox)
        try:
            yield "data: " + json.dumps(get_metrics()) + "\n\n"
            while True:
                try:
                    message = inbox.get(timeout=3)
                except queue.Empty:
                    message = json.dumps(get_metrics())
                yield "data: " + message + "\n\n"
        except GeneratorExit:
            pass
        finally:
            with sse_lock:
                if inbox in sse_subscribers:
                    sse_subscribers.remove(inbox)

    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no", "Access-Control-Allow-Origin": "*"}
    return Response(stream_with_context(ev()), mimetype="text/event-stream", headers=sse_headers)
|
|
|
|
if __name__ == "__main__":
    # Built-in Flask server on all interfaces, port 9000, debug off.
    log.info("Router on :9000 (load-aware)")
    app.run(debug=False, host="0.0.0.0", port=9000)
|