621a897bec
More conversations now route to VLM as primary. 9B VLM has 262K context window and 88 tok/s average — well suited for moderate conversations. Dense absorbs overflow and heavy reasoning.
632 lines
29 KiB
Python
632 lines
29 KiB
Python
import os, json, time, logging, traceback, threading, queue, statistics, math
|
|
import requests, redis
|
|
from flask import Flask, request, jsonify, Response, stream_with_context
|
|
|
|
# --- Service endpoints (each one can be overridden through the environment) ---
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
GPU_MOE_URL = os.getenv("GPU_MOE_URL", "http://192.168.68.15:8080/v1")
GPU_DENSE_URL = os.getenv("GPU_DENSE_URL", "http://192.168.68.8:8080/v1")
GPU_LIGHT_URL = os.getenv("GPU_LIGHT_URL", "http://192.168.68.110:8080/v1")

# Telemetry sidecar URL per model id (queried by check_gpu_health).
GPU_SIDECARS = {
    "qwen3.6-35B-A3B": "http://192.168.68.15:8090",
    "qwen3.6-27B-code": "http://192.168.68.8:8090",
    "qwen3.5-9b-vlm": "http://192.168.68.110:8090",
}

# Inference endpoint base URL per model id.
GPU_URLS = {
    "qwen3.6-35B-A3B": GPU_MOE_URL,
    "qwen3.6-27B-code": GPU_DENSE_URL,
    "qwen3.5-9b-vlm": GPU_LIGHT_URL,
}

# Maximum concurrent requests per GPU (based on llama.cpp --parallel).
GPU_MAX_CONCURRENT = {
    "qwen3.6-35B-A3B": 2,   # 2 slots (Dense-first routing reduces thermal load)
    "qwen3.6-27B-code": 1,  # 1 slot (24GB VRAM saturated at 256K ctx)
    "qwen3.5-9b-vlm": 2,    # 2 slots (12GB VRAM, 4GB headroom)
}

# Context window sizes in tokens; used to derive the compaction signals.
GPU_CONTEXT = {
    "qwen3.6-35B-A3B": 262144,
    "qwen3.6-27B-code": 196608,
    "qwen3.5-9b-vlm": 262144,
}

# Models each subscription tier is allowed to be routed to.
TIER_MODELS = {
    "starter": ["qwen3.5-9b-vlm"],
    "professional": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"],
    "enterprise": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"],
}
|
|
# Static API-key registry: bearer token -> tier and agent name.
# NOTE(review): credentials live in source here; consider moving them to the
# environment or a secret store.
API_KEYS = {
    "sk-syslog-local-master-key": dict(tier="enterprise", agent="admin"),
    "sk-syslog-abiba": dict(tier="enterprise", agent="Abiba"),
    "sk-syslog-mumuni": dict(tier="enterprise", agent="Mumuni"),
    "sk-syslog-tanko": dict(tier="enterprise", agent="Tanko"),
    "sk-syslog-koby": dict(tier="enterprise", agent="Koby"),
    "sk-syslog-kagenz0": dict(tier="enterprise", agent="Kagenz0"),
    "sk-syslog-koonimo": dict(tier="enterprise", agent="Koonimo"),
    "sk-starter-abc123": dict(tier="starter", agent="test-starter"),
    "sk-professional-xyz789": dict(tier="professional", agent="test-pro"),
}
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s [ROUTER] %(levelname)s %(message)s")
log = logging.getLogger("router")

# Redis is optional: when it cannot be reached at start-up, `r` is None and
# every Redis-backed feature (in-flight counters, route stats, perf records,
# session tracking) is skipped. The failure is logged so the degraded mode is
# visible; the URL is not logged because it may embed credentials.
try:
    r = redis.from_url(REDIS_URL, decode_responses=True)
    r.ping()
except Exception as exc:
    log.warning("Redis unavailable (%s); running without counters and metrics", exc)
    r = None
|
|
|
|
|
|
def counter_audit_loop():
    """Every 30s, check GPU slots and reset counters if all slots are idle.

    For each entry in GPU_URLS the server's ``/slots`` endpoint is queried.
    When it answers 200 and no slot reports ``is_processing``, a positive
    ``active:<model>`` counter in Redis is reset to 0 and the reset is logged.
    Runs forever; does nothing while Redis is unavailable.
    """
    while True:
        time.sleep(30)
        if not r:
            continue
        for model, url in GPU_URLS.items():
            try:
                resp = requests.get(
                    url.replace("/v1", "") + "/slots",
                    headers={"Authorization": "Bearer not-needed"},
                    timeout=5,
                )
                if resp.status_code != 200:
                    continue
                slots = resp.json()
                if any(s.get("is_processing", False) for s in slots):
                    continue
                key = "active:" + model
                current = int(r.get(key) or 0)
                if current > 0:
                    r.set(key, 0)
                    log.info("AUDIT: Reset stuck counter for %s (was %d)", model, current)
            except Exception as exc:
                # Best-effort audit: keep looping, but leave a trace instead of
                # swallowing the failure silently.
                log.debug("AUDIT: slot check failed for %s: %s", model, exc)
|
|
|
|
# Start the counter audit in a daemon thread as soon as the module loads.
_audit_thread = threading.Thread(target=counter_audit_loop, daemon=True)
_audit_thread.start()

app = Flask(__name__)

# Registry of per-client queues for the /stream SSE endpoint, guarded by sse_lock.
sse_subscribers = []
sse_lock = threading.Lock()
|
|
|
|
def gpu_active_count(model):
    """Return the in-flight request count stored for *model* (0 without Redis)."""
    if not r:
        return 0
    stored = r.get("active:" + model)
    return int(stored or 0)
|
|
|
|
def gpu_incr(model):
    """Increment the in-flight counter for *model*; no-op when Redis is unavailable."""
    if not r:
        return
    r.incr("active:" + model)
|
|
|
|
def gpu_decr(model):
    """Decrement the in-flight counter for *model*, clamping it at zero.

    No-op when Redis is unavailable.
    """
    if not r:
        return
    key = "active:" + model
    remaining = r.decr(key)
    if remaining and int(remaining) < 0:
        # never go negative
        r.set(key, 0)
|
|
|
|
def check_gpu_health(model, sidecar_timeout=5, gpu_timeout=3):
    """Probe the sidecar and the inference endpoint for *model*.

    Args:
        model: model id used as key into GPU_SIDECARS and GPU_URLS.
        sidecar_timeout: timeout in seconds for the sidecar request.
        gpu_timeout: timeout in seconds for the ``/health`` request.

    Returns:
        ``{"status": "unknown"}`` when no sidecar is configured for the model;
        ``{"status": "down"}`` when the sidecar does not answer 200 or the
        probe raises; otherwise a dict with the sidecar telemetry fields and
        ``status`` set to "healthy", or "down" when the ``/health`` endpoint
        fails or is not 200. ``vram_warning`` is True at 95% VRAM usage or more.
    """
    url = GPU_SIDECARS.get(model)
    if not url:
        return {"status": "unknown"}
    try:
        resp = requests.get(url, timeout=sidecar_timeout)
        if resp.status_code == 200:
            d = resp.json()
            used_mb = d.get("vram_used_mb")
            total_mb = d.get("vram_total_mb")
            # Treat missing/null VRAM figures as 0 / 1 so that a sidecar
            # reporting null does not mark a responsive GPU as down.
            pct = ((used_mb or 0) / max(total_mb or 1, 1)) * 100
            # VRAM usage != saturation; busy slots are handled by is_gpu_busy()
            status = "healthy"
            vram_warning = pct >= 95
            # Also check that the llama.cpp endpoint is actually responding
            gpu_url = GPU_URLS.get(model, "")
            try:
                hr = requests.get(
                    gpu_url.replace("/v1", "") + "/health",
                    headers={"Authorization": "Bearer not-needed"},
                    timeout=gpu_timeout,
                )
                if hr.status_code != 200:
                    status = "down"
            except Exception as exc:
                log.debug("HEALTH: endpoint probe failed for %s: %s", model, exc)
                status = "down"
            return {
                "status": status,
                "vram_warning": vram_warning,
                "vram_used_mb": used_mb,
                "vram_total_mb": total_mb,
                "vram_pct": round(pct, 1),
                "temp_c": d.get("temp_c"),
                "gpu_util_pct": d.get("gpu_util_pct"),
                "gpu_name": d.get("gpu_name"),
                "power_w": d.get("power_w"),
                "power_limit_w": d.get("power_limit_w"),
            }
    except Exception as exc:
        log.debug("HEALTH: sidecar probe failed for %s: %s", model, exc)
    return {"status": "down"}
|
|
|
|
def available_models():
    """Return the model ids whose health status is "healthy" or "saturated"."""
    usable = []
    for name in GPU_URLS:
        if check_gpu_health(name)["status"] in ("healthy", "saturated"):
            usable.append(name)
    return usable
|
|
|
|
def estimate_tokens(msgs):
    """Estimate the token count of *msgs* as an integer.

    Uses the length of the JSON serialisation divided by 3.5 (closer to real
    tokenizer ratios for dense text), rounded down. Values that are not JSON
    serialisable are stringified via ``default=str``.

    The result is converted to ``int`` because floor division by a float
    yields a float (e.g. ``10.0``); stored in Redis that becomes ``"10.0"``,
    which ``int()`` cannot parse when it is read back.
    """
    return int(len(json.dumps(msgs, default=str)) // 3.5)
|
|
|
|
def store_perf_record(model, agent, tier, reason, queue_ms, inference_ms, prompt_tokens, completion_tokens, stream):
    """Store a detailed per-request performance record in Redis for analytics.

    The same JSON record is pushed onto four capped lists: the global
    ``perf:recent`` list (last 500) and the per-model, per-reason and
    per-agent lists (last 200 each). Best-effort: does nothing without Redis
    and ignores any error raised while recording.
    """
    if not r:
        return
    try:
        total_ms = queue_ms + inference_ms
        if inference_ms > 0 and completion_tokens > 0:
            tps = completion_tokens / (inference_ms / 1000)
        else:
            tps = 0
        record = {
            "ts": time.time(),
            "model": model,
            "agent": agent,
            "tier": tier,
            "reason": reason,
            "queue_ms": round(queue_ms, 1),
            "inference_ms": round(inference_ms, 1),
            "total_ms": round(total_ms, 1),
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "tokens_per_sec": round(tps, 1),
            "stream": stream,
        }
        rec = json.dumps(record)
        # (key prefix, key suffix, index of the last element kept)
        destinations = (
            ("perf:recent", "", 499),
            ("perf:model:", model, 199),
            ("perf:reason:", reason, 199),
            ("perf:agent:", agent, 199),
        )
        for prefix, suffix, last_index in destinations:
            key = prefix + suffix
            r.lpush(key, rec)
            r.ltrim(key, 0, last_index)
    except Exception:
        pass
|
|
|
|
def is_gpu_busy(model):
    """Return True when *model* has reached its configured concurrency limit.

    Models missing from GPU_MAX_CONCURRENT are assumed to have a limit of 1.
    """
    capacity = GPU_MAX_CONCURRENT.get(model, 1)
    return gpu_active_count(model) >= capacity
|
|
|
|
def select_best_gpu(candidates, reason):
    """Pick a GPU from *candidates*, honouring their order.

    The first candidate that is not busy wins and is returned with *reason*.
    When every candidate is busy, the least-loaded one (first on ties) is
    returned with reason ``"load_balanced_" + reason``. Returns None when
    there is nothing to choose from.
    """
    missing = object()
    free = next((name for name in candidates if not is_gpu_busy(name)), missing)
    if free is not missing:
        return {"model": free, "reason": reason}

    # All busy — pick least loaded
    loads = [(gpu_active_count(name), name) for name in candidates]
    if not loads:
        return None
    lowest, winner = min(loads, key=lambda pair: pair[0])
    if lowest < 999 and winner:
        return {"model": winner, "reason": "load_balanced_" + reason}
    return None
|
|
|
|
def route(rd, tier):
    """Choose the model that should serve request body *rd* for *tier*.

    Returns a dict with ``model`` and ``reason``; ``saturated: True`` is added
    when no permitted GPU is available or every available one is at capacity.
    Decision order: saturation check, explicit ``model`` field, routing hints,
    then the size-based tiers below.
    """
    vlm, dense, moe = "qwen3.5-9b-vlm", "qwen3.6-27B-code", "qwen3.6-35B-A3B"

    messages = rd.get("messages", [])
    est_tokens = estimate_tokens(messages)
    has_system = any(m.get("role") == "system" for m in messages)
    turns = sum(1 for m in messages if m.get("role") in ("user", "assistant"))
    hints = rd.get("routing_hints", {})
    allowed = TIER_MODELS.get(tier, ["qwen3.5-9b-vlm"])
    avail = [m for m in available_models() if m in allowed]

    if not avail:
        return {"model": allowed[0], "reason": "all_saturated", "saturated": True}
    # Check if all available GPUs are at max capacity
    if all(is_gpu_busy(m) for m in avail):
        return {"model": avail[0], "reason": "all_saturated", "saturated": True}

    requested = rd.get("model", "auto")
    if requested != "auto":
        target = requested if requested in avail else avail[0]
        # If the explicit model is busy, check whether another one can take it
        if is_gpu_busy(target) and requested in allowed:
            alternatives = [m for m in avail if m != target and m in allowed]
            if alternatives:
                pick = select_best_gpu(alternatives, "explicit")
                if pick:
                    return pick
        return {"model": target, "reason": "explicit"}

    if hints:
        priority = hints.get("priority")
        if priority == "speed" and vlm in avail:
            return select_best_gpu([vlm], "hint_speed") or {"model": "qwen3.5-9b-vlm", "reason": "hint_speed"}
        if priority == "quality" and moe in avail:
            return select_best_gpu([moe], "hint_quality") or {"model": "qwen3.6-35B-A3B", "reason": "hint_quality"}

    first_msg = messages[0].get("content", "") if messages else ""
    # Non-string content is given a word count of 99.
    words = len(first_msg.split()) if isinstance(first_msg, str) else 99

    def in_order(*names):
        """Keep only the available models, in the given preference order."""
        return [m for m in names if m in avail]

    def vlm_first(reason, fallback_reason):
        """Use the VLM when it has a free slot, else fall back to Dense, then MoE."""
        if not is_gpu_busy(vlm):
            return {"model": "qwen3.5-9b-vlm", "reason": reason}
        return select_best_gpu(in_order(dense, moe), fallback_reason)

    # TIER 1: Lightweight — single-turn short queries → VLM (fastest)
    if not has_system and turns <= 1 and est_tokens <= 500 and words <= 100 and vlm in avail:
        pick = vlm_first("lightweight", "lightweight_fallback")
        if pick:
            return pick

    # TIER 2: Simple conversations — VLM primary (up to 10K tok), Dense second
    if est_tokens <= 10000 and turns <= 10 and vlm in avail:
        pick = vlm_first("simple_conv", "simple_conv_fallback")
        if pick:
            return pick

    # TIER 3: Medium complexity — Dense primary (speed), MoE fallback
    if est_tokens <= 25000 and turns <= 15:
        pick = select_best_gpu(in_order(dense, vlm, moe), "medium")
        if pick:
            return pick

    # TIER 4: Heavy reasoning — Dense first (thermal), MoE fallback (192K/262K context)
    if est_tokens > 25000 or turns > 15:
        pick = select_best_gpu(in_order(dense, moe, vlm), "heavy_reasoning")
        if pick:
            return pick

    # TIER 5: Default — balanced distribution: Dense first (speed), MoE second (capacity)
    pick = select_best_gpu(in_order(dense, vlm, moe), "default")
    if pick:
        return pick
    return {"model": avail[0], "reason": "last_resort"}
|
|
|
|
# Typographic characters mapped to ASCII stand-ins before stripping non-ASCII.
_ASCII_STAND_INS = {
    0x2014: "-",    # em dash
    0x2013: "-",    # en dash
    0x2018: "'",    # left single quote
    0x2019: "'",    # right single quote
    0x201C: '"',    # left double quote
    0x201D: '"',    # right double quote
    0x2026: "...",  # ellipsis
    0x00A0: " ",    # no-break space
}


def clean_unicode(text):
    """Return *text* reduced to ASCII; non-string values are returned unchanged.

    Common typographic punctuation is replaced by an ASCII equivalent; every
    remaining non-ASCII character is dropped.
    """
    if not isinstance(text, str):
        return text
    substituted = text.translate(_ASCII_STAND_INS)
    return substituted.encode("ascii", "ignore").decode("ascii")
|
|
|
|
def clean_response(d):
    """Recursively apply clean_unicode to every string value inside *d*.

    Dicts and lists are rebuilt (dict keys are left untouched); any other
    value is returned as-is.
    """
    if isinstance(d, str):
        return clean_unicode(d)
    if isinstance(d, list):
        return [clean_response(item) for item in d]
    if isinstance(d, dict):
        return {key: clean_response(value) for key, value in d.items()}
    return d
|
|
|
|
def get_metrics():
    """Build the dashboard snapshot: GPU health, in-flight counts and route stats.

    Route counters and the 50 most recent routes are read from Redis when it
    is available; any Redis error leaves those sections as collected so far.
    """
    d = {
        "gpus": [],
        "route_counts": {},
        "agent_counts": {},
        "tier_counts": {},
        "recent": [],
        "timestamp": time.time(),
        "active_requests": {},
    }
    passthrough = ("vram_used_mb", "vram_total_mb", "vram_pct", "temp_c",
                   "gpu_util_pct", "power_w", "power_limit_w")
    for m in GPU_URLS:
        h = check_gpu_health(m)
        entry = {"id": m, "gpu_name": h.get("gpu_name", m), "status": h.get("status")}
        for field in passthrough:
            entry[field] = h.get(field)
        entry["active_requests"] = gpu_active_count(m)
        entry["max_concurrent"] = GPU_MAX_CONCURRENT.get(m, 1)
        d["gpus"].append(entry)
        d["active_requests"][m] = gpu_active_count(m)

    if r:
        try:
            for m in GPU_URLS:
                d["route_counts"][m] = int(r.get("routes:" + m) or 0)
            for info in API_KEYS.values():
                agent_name = info["agent"]
                hits = int(r.get("routes:agent:" + agent_name) or 0)
                if hits > 0:
                    d["agent_counts"][agent_name] = hits
            for tier_name in TIER_MODELS:
                d["tier_counts"][tier_name] = int(r.get("routes:tier:" + tier_name) or 0)
            raw = r.lrange("routes:recent", 0, 49)
            d["recent"] = [json.loads(item) for item in raw] if raw else []
        except Exception:
            pass
    return d
|
|
|
|
def bcast():
    """Push a fresh metrics snapshot to every connected SSE subscriber.

    Returns immediately when nobody is subscribed: get_metrics() probes every
    GPU over HTTP, which is wasted work (and added request latency) when there
    is no listener. The snapshot is built outside the lock so subscribers can
    register or leave while the probes are running.
    """
    with sse_lock:
        if not sse_subscribers:
            return
    payload = json.dumps(get_metrics())
    with sse_lock:
        dead = []
        for q in sse_subscribers:
            try:
                q.put(payload)
            except Exception:
                dead.append(q)
        for q in dead:
            sse_subscribers.remove(q)
|
|
|
|
# Maximum number of seconds a request may wait for a GPU slot before a 503.
QUEUE_TIMEOUT = int(os.getenv("QUEUE_TIMEOUT", "30"))
|
|
|
|
def _parse_stream_timings(chunk):
    """Return the ``timings`` dict found in an SSE text chunk, or {} when none parses."""
    for line in chunk.splitlines():
        line = line.strip()
        payload = line[len("data:"):].strip() if line.startswith("data:") else line
        if not payload.startswith("{") or '"timings"' not in payload:
            continue
        try:
            timings = json.loads(payload).get("timings", {})
        except (ValueError, AttributeError):
            continue
        if isinstance(timings, dict) and timings:
            return timings
    return {}


def _context_status(model, used_tokens):
    """Return (remaining_tokens, remaining_pct, warning_label) for *model*'s context window."""
    window = GPU_CONTEXT.get(model, 65536)
    remaining = window - used_tokens
    pct = remaining / window * 100
    if pct < 5:
        warning = "compact_urgent"
    elif pct < 15:
        warning = "compact_recommended"
    elif pct < 30:
        warning = "compact_soon"
    else:
        warning = "ok"
    return remaining, pct, warning


@app.route("/v1/chat/completions", methods=["POST"])
def chat():
    """Authenticate, route and proxy a chat-completion request to a GPU backend.

    Flow: validate the bearer key, wait (up to the queue timeout) while the
    router reports saturation, forward the body to the selected backend,
    record metrics, and return the backend's answer with context headers.
    Streamed answers are buffered completely before being sent so that the
    timings in the stream can be recorded.

    Responses: 401 invalid key, 400 body is not a JSON object, 503 queue
    timeout, 502 backend returned non-200, 504 backend timeout, 500 otherwise.

    The in-flight counter is incremented once per routed request and released
    exactly once, after the backend's response body has been fully received.
    """
    model = None
    agent = "unknown"
    slot_held = False

    def release_slot():
        # Idempotent, so the explicit release and the `finally` safety net
        # can never decrement twice.
        nonlocal slot_held
        if slot_held:
            slot_held = False
            gpu_decr(model)

    try:
        ak = request.headers.get("Authorization", "").replace("Bearer ", "")
        if not ak or ak not in API_KEYS:
            log.warning("AUTH_REJECTED: no/invalid API key from %s", request.remote_addr)
            return jsonify({"error": "Unauthorized — valid API key required"}), 401
        ki = API_KEYS[ak]
        tier, agent = ki["tier"], ki["agent"]

        rd = request.get_json(force=True, silent=True)
        if not isinstance(rd, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        # Allow the agent to override the queue timeout via header; a
        # malformed value falls back to the configured default.
        try:
            q_timeout = int(request.headers.get("X-Queue-Timeout", QUEUE_TIMEOUT))
        except (TypeError, ValueError):
            q_timeout = QUEUE_TIMEOUT

        # int(): the estimate may be a float, which must not reach Redis or
        # the response headers as e.g. "123.0".
        prompt_est = int(estimate_tokens(rd.get("messages", [])))

        # Cross-turn context tracking: keep the largest estimate per session
        session_id = request.headers.get("X-Session-Id", "")
        session_tokens = 0
        if session_id and r:
            try:
                # float() first so values previously stored as "123.0" still parse
                prev = int(float(r.get("session:" + session_id) or 0))
                session_tokens = max(prev, prompt_est)  # context only grows
                r.set("session:" + session_id, session_tokens, ex=86400)  # TTL 24h
            except Exception as exc:
                log.debug("SESSION: tracking failed for %s: %s", session_id, exc)

        d = route(rd, tier)
        queue_start = time.time()

        # Queue loop: wait for a GPU slot instead of an immediate 503
        while d.get("saturated"):
            elapsed = time.time() - queue_start
            if elapsed > q_timeout:
                resp = jsonify({"error": "All GPUs saturated", "queued_s": round(elapsed, 1), "retry_after_s": 5})
                resp.headers["Retry-After"] = "5"
                log.warning("QUEUE_TIMEOUT: %s waited %.1fs, all GPUs saturated", agent, elapsed)
                return resp, 503
            time.sleep(0.5)  # poll every 500ms
            d = route(rd, tier)

        queue_ms = (time.time() - queue_start) * 1000
        if queue_ms > 500:
            log.info("QUEUED: %s waited %.0fms before slot opened", agent, queue_ms)

        model, reason = d["model"], d["reason"]
        url = GPU_URLS[model]
        is_stream = rd.get("stream", False)

        gpu_incr(model)
        slot_held = True

        log.info("ROUTE: %s -> %s (%s) stream=%s active=%d/%d", agent, model, reason, is_stream,
                 gpu_active_count(model), GPU_MAX_CONCURRENT.get(model, 1))
        if r:
            try:
                r.incr("routes:" + model)
                r.incr("routes:tier:" + tier)
                r.incr("routes:agent:" + agent)
                # UTC hour bucket, matching the gmtime-based keys that
                # /metrics/timeseries reads.
                r.incr("ts:" + model + ":" + time.strftime("%Y%m%d%H", time.gmtime()))
                r.lpush("routes:recent", json.dumps({"ts": time.time(), "model": model, "reason": reason,
                                                     "tier": tier, "agent": agent, "queue_ms": round(queue_ms, 1)}))
                r.ltrim("routes:recent", 0, 999)
            except Exception as exc:
                log.debug("STATS: route counters not updated: %s", exc)

        start = time.time()
        resp = requests.post(url + "/chat/completions", json=rd,
                             headers={"Content-Type": "application/json", "Authorization": "Bearer not-needed"},
                             timeout=300, stream=is_stream)
        if resp.status_code != 200:
            resp.close()
            return jsonify({"error": "GPU error " + str(resp.status_code)}), 502

        chunks = []
        stream_timings = {}
        if is_stream:
            # Buffer the stream to capture timings from the final SSE chunk.
            # Event streams are UTF-8; without this Requests falls back to
            # ISO-8859-1 for text/* responses that carry no charset.
            resp.encoding = "utf-8"
            for raw in resp.iter_content(chunk_size=None, decode_unicode=True):
                if not raw:
                    continue
                cleaned = clean_unicode(raw)
                chunks.append(cleaned)
                if not stream_timings and '"timings"' in cleaned and '"predicted_n"' in cleaned:
                    stream_timings = _parse_stream_timings(cleaned)

        # The body has been received in full at this point (streamed or not),
        # so the latency covers generation and the slot can be released.
        lat = int((time.time() - start) * 1000)
        release_slot()

        if is_stream:
            # Store the perf record with real token counts when the stream had them
            if stream_timings:
                store_perf_record(model, agent, tier, reason, queue_ms,
                                  stream_timings.get("predicted_ms", lat),
                                  stream_timings.get("prompt_n", 0),
                                  stream_timings.get("predicted_n", 0), True)
            else:
                store_perf_record(model, agent, tier, reason, queue_ms, lat, prompt_est, 0, True)

            def gen():
                for c in chunks:
                    yield c

            bcast()
            ctx_remaining, ctx_pct, ctx_warning = _context_status(model, max(session_tokens, prompt_est))
            sse_resp = Response(stream_with_context(gen()), mimetype="text/event-stream")
            sse_resp.headers["X-Context-Remaining"] = str(max(0, ctx_remaining))
            sse_resp.headers["X-Context-Warning"] = ctx_warning
            sse_resp.headers["X-Context-Model"] = model
            return sse_resp

        data = clean_response(resp.json())
        for c in data.get("choices", []):
            msg = c.get("message", {})
            # Surface reasoning text when the model produced no regular content
            if not msg.get("content") and msg.get("reasoning_content"):
                msg["content"] = msg["reasoning_content"]

        # Extract performance data from the llama.cpp response
        usage = data.get("usage", {})
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)
        store_perf_record(model, agent, tier, reason, queue_ms, lat, prompt_tokens, completion_tokens, False)

        ctx_remaining, ctx_pct, ctx_warning = _context_status(model, max(session_tokens, prompt_est))
        data["routing"] = {"model": model, "reason": reason, "gpu": url, "tier": tier, "agent": agent,
                           "latency_ms": lat, "queue_ms": round(queue_ms, 1),
                           "active_gpu": gpu_active_count(model),
                           "context_remaining": max(0, ctx_remaining),
                           "context_pct": round(ctx_pct, 1), "context_warning": ctx_warning}
        resp = jsonify(data)
        resp.headers["X-Context-Remaining"] = str(max(0, ctx_remaining))
        resp.headers["X-Context-Warning"] = ctx_warning
        resp.headers["X-Context-Model"] = model
        bcast()
        return resp
    except requests.Timeout:
        log.error("TIMEOUT: %s -> %s", agent, model)
        return jsonify({"error": "timeout"}), 504
    except Exception as e:
        log.error("Error: %s\n%s", e, traceback.format_exc())
        return jsonify({"error": str(e)}), 500
    finally:
        # Safety net for every exit path; no-op when already released or
        # when no slot was ever taken.
        release_slot()
|
|
|
|
@app.route("/metrics/performance")
def performance():
    """Per-request performance analytics with percentiles per model/reason/agent.

    Query parameters: ``window`` (hours, default 24) and ``model`` (model id or
    "all"). Reads the ``perf:recent`` list from Redis, skipping entries that
    cannot be parsed. Returns 503 without Redis and 500 with the error text
    when anything else fails.
    """
    if not r:
        return jsonify({"error": "Redis unavailable"}), 503
    try:
        window_hours = int(request.args.get("window", "24"))
        model_filter = request.args.get("model", "all")

        # Load recent records
        cutoff = time.time() - (window_hours * 3600)
        records = []
        for raw_rec in r.lrange("perf:recent", 0, -1):
            try:
                rec = json.loads(raw_rec)
                if rec["ts"] >= cutoff:
                    records.append(rec)
            except Exception:
                # Skip malformed entries (a bare except would also trap
                # SystemExit and KeyboardInterrupt).
                continue

        # Filter by model if specified
        if model_filter != "all":
            records = [rec for rec in records if rec["model"] == model_filter]

        if not records:
            return jsonify({"models": [], "reasons": [], "agents": [], "summary": {"total_requests": 0}})

        def pct(values, p):
            if len(values) < 2:
                return round(values[0], 1) if values else 0
            return round(statistics.quantiles(sorted(values), n=100, method='inclusive')[min(p - 1, 98)], 1)

        def avg(values):
            return round(statistics.mean(values), 1) if values else 0

        def group_by(field):
            groups = {}
            for rec in records:
                groups.setdefault(rec[field], []).append(rec)
            return groups

        def by_count_desc(groups):
            return sorted(groups.items(), key=lambda item: -len(item[1]))

        # Per-model stats
        model_stats = []
        for m, recs in sorted(group_by("model").items()):
            latencies = [rec["total_ms"] for rec in recs]
            tps_vals = [rec["tokens_per_sec"] for rec in recs if rec["tokens_per_sec"] > 0]
            # Queue statistics are computed over non-streamed requests only
            queue_times = [rec["queue_ms"] for rec in recs if not rec["stream"]]
            streamed = sum(1 for rec in recs if rec["stream"])
            model_stats.append({
                "model": m,
                "count": len(recs),
                "stream_pct": round(streamed / len(recs) * 100, 1),
                "latency": {
                    "avg": avg(latencies),
                    "p50": pct(latencies, 50),
                    "p95": pct(latencies, 95),
                    "p99": pct(latencies, 99),
                },
                "throughput": {
                    "avg_tokens_per_sec": avg(tps_vals),
                    "p50": pct(tps_vals, 50),
                    "p95": pct(tps_vals, 95),
                },
                "queue": {
                    "avg_ms": avg(queue_times),
                    "p95_ms": pct(queue_times, 95),
                } if queue_times else None,
            })

        # Per-reason stats
        reason_stats = []
        for rsn, recs in by_count_desc(group_by("reason")):
            latencies = [rec["total_ms"] for rec in recs]
            reason_stats.append({
                "reason": rsn,
                "count": len(recs),
                "avg_total_ms": avg(latencies),
                "p95_total_ms": pct(latencies, 95),
            })

        # Per-agent stats
        agent_stats = []
        for ag, recs in by_count_desc(group_by("agent")):
            latencies = [rec["total_ms"] for rec in recs]
            tps_vals = [rec["tokens_per_sec"] for rec in recs if rec["tokens_per_sec"] > 0]
            agent_stats.append({
                "agent": ag,
                "count": len(recs),
                "avg_total_ms": avg(latencies),
                "avg_tokens_per_sec": avg(tps_vals),
            })

        all_lat = [rec["total_ms"] for rec in records]
        all_tps = [rec["tokens_per_sec"] for rec in records if rec["tokens_per_sec"] > 0]
        summary = {
            "total_requests": len(records),
            "window_hours": window_hours,
            "latency": {
                "avg_ms": avg(all_lat),
                "p50_ms": pct(all_lat, 50),
                "p95_ms": pct(all_lat, 95),
                "p99_ms": pct(all_lat, 99),
            },
            "throughput_avg_tps": avg(all_tps),
        }

        return jsonify({"models": model_stats, "reasons": reason_stats, "agents": agent_stats, "summary": summary})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/metrics/scatter")
def scatter():
    """Return individual data points for scatter plots (prompt_tokens vs latency).

    Query parameters: ``window`` (hours, default 24) and ``model`` (model id or
    "all"). Entries of ``perf:recent`` that cannot be parsed or lack a required
    field are skipped. Returns 503 without Redis, 500 with the error text on
    any other failure.
    """
    if not r:
        return jsonify({"error": "Redis unavailable"}), 503
    try:
        window_hours = int(request.args.get("window", "24"))
        model_filter = request.args.get("model", "all")
        cutoff = time.time() - (window_hours * 3600)
        points = []
        for raw_rec in r.lrange("perf:recent", 0, -1):
            try:
                rec = json.loads(raw_rec)
                wanted = model_filter == "all" or rec["model"] == model_filter
                if rec["ts"] >= cutoff and wanted:
                    points.append({
                        "model": rec["model"],
                        "agent": rec["agent"],
                        "reason": rec["reason"],
                        "prompt_tokens": int(rec.get("prompt_tokens", 0)),
                        "completion_tokens": rec.get("completion_tokens", 0),
                        "inference_ms": round(rec["inference_ms"], 1),
                        "tokens_per_sec": rec.get("tokens_per_sec", 0),
                        "stream": rec.get("stream", False),
                    })
            except Exception:
                # Skip malformed entries (a bare except would also trap
                # SystemExit and KeyboardInterrupt).
                continue
        return jsonify({"points": points, "count": len(points)})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/v1/models")
def models():
    """List the configured models with their current status and GPU name.

    Each GPU is probed once, with short timeouts, and both fields are taken
    from that single result so they always describe the same probe.
    """
    entries = []
    for m in GPU_URLS:
        h = check_gpu_health(m, sidecar_timeout=1.5, gpu_timeout=1)
        entries.append({
            "id": m,
            "object": "model",
            "owned_by": "syslog",
            "status": h.get("status"),
            "gpu": h.get("gpu_name"),
        })
    return jsonify({"object": "list", "data": entries})
|
|
|
|
@app.route("/health")
def health():
    """Report router health: Redis connectivity and per-GPU status.

    ``available_models`` is derived from the probes already made for the
    ``gpus`` section, so each GPU is probed once per request and the two
    sections of the response cannot disagree.
    """
    gpus = {}
    for m in GPU_URLS:
        h = check_gpu_health(m, sidecar_timeout=1.5, gpu_timeout=1)
        h["active_requests"] = gpu_active_count(m)
        h["max_concurrent"] = GPU_MAX_CONCURRENT.get(m, 1)
        gpus[m] = h
    available = [m for m, h in gpus.items() if h.get("status") in ("healthy", "saturated")]
    return jsonify({
        "status": "healthy",
        "redis": "connected" if r else "down",
        "gpus": gpus,
        "available_models": available,
    })
|
|
|
|
@app.route("/metrics")
def metrics():
    """Return the current metrics snapshot as JSON."""
    snapshot = get_metrics()
    return jsonify(snapshot)
|
|
|
|
@app.route("/metrics/timeseries")
def metrics_timeseries():
    """Return per-model request counts bucketed over time.

    ``period=day`` gives the last 24 hourly buckets and ``period=week`` the
    last 7 daily buckets; any other value gives the last 30 daily buckets.
    Counts come from the ``ts:<model>:<YYYYMMDDHH>`` counters in Redis, with
    daily buckets summing their 24 hourly counters. Without Redis, ``models``
    is empty.
    """
    period = request.args.get("period", "day")
    if period == "day":
        step, count, bucket_fmt, label_fmt = 3600, 24, "%Y%m%d%H", "%H:00"
    elif period == "week":
        step, count, bucket_fmt, label_fmt = 86400, 7, "%Y%m%d", "%a"
    else:
        step, count, bucket_fmt, label_fmt = 86400, 30, "%Y%m%d", "%m/%d"

    # Read the clock once so buckets and labels always refer to the same
    # instants, even when the request straddles an hour boundary.
    now = time.time()
    moments = [time.gmtime(now - i * step) for i in range(count - 1, -1, -1)]
    buckets = [time.strftime(bucket_fmt, t) for t in moments]
    data = {"models": {}, "labels": [time.strftime(label_fmt, t) for t in moments]}

    # Every period except "day" uses daily buckets. Keying this on the bucket
    # type (not on the literal names "week"/"month") keeps unknown period
    # values from reading daily buckets as if they were hourly keys.
    daily = period != "day"
    if r:
        for model in GPU_URLS:
            counts = []
            for bucket in buckets:
                if daily:
                    keys = ["ts:" + model + ":" + bucket + "{:02d}".format(hh) for hh in range(24)]
                    # One MGET instead of 24 sequential GET round trips
                    total = sum(int(v or 0) for v in r.mget(keys))
                else:
                    total = int(r.get("ts:" + model + ":" + bucket) or 0)
                counts.append(total)
            data["models"][model] = counts
    return jsonify(data)
|
|
|
|
@app.route("/stream")
def stream():
    """Server-sent-events feed of metrics snapshots.

    Each client gets its own queue, registered in sse_subscribers. A snapshot
    is sent on connect; afterwards the client receives whatever is pushed to
    its queue, or a freshly built snapshot when nothing arrives within 3
    seconds. The queue is unregistered when the generator finishes.
    """
    def ev():
        mailbox = queue.Queue()
        with sse_lock:
            sse_subscribers.append(mailbox)
        try:
            yield "data: " + json.dumps(get_metrics()) + "\n\n"
            while True:
                try:
                    payload = mailbox.get(timeout=3)
                except queue.Empty:
                    payload = json.dumps(get_metrics())
                yield "data: " + payload + "\n\n"
        except GeneratorExit:
            pass
        finally:
            with sse_lock:
                if mailbox in sse_subscribers:
                    sse_subscribers.remove(mailbox)

    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
        "Access-Control-Allow-Origin": "*",
    }
    return Response(stream_with_context(ev()), mimetype="text/event-stream", headers=sse_headers)
|
|
|
|
# Script entry point: serve on all interfaces, port 9000, without debug mode.
if __name__ == "__main__":
    log.info("Router on :9000 (load-aware)")
    app.run(debug=False, port=9000, host="0.0.0.0")
|