From b849cd3395cbe95ab09694babdaebbe79bbdc4a0 Mon Sep 17 00:00:00 2001 From: Abiba Date: Mon, 25 May 2026 16:50:45 +0000 Subject: [PATCH] feat: per-request performance tracking + /metrics/performance endpoint router/router.py (+158 lines): - store_perf_record(): captures queue_ms, inference_ms, prompt_tokens, completion_tokens, tokens_per_sec per request in Redis - Per-model, per-reason, per-agent rolling windows (last 200-500) - /metrics/performance?window=N endpoint with percentiles (p50/p95/p99) for latency, throughput, and queue time per model/reason/agent - Queue time now surfaced in routing metadata and routes:recent - Streaming requests tracked with estimated prompt tokens nginx/nginx.conf: - Added /metrics/ proxy pass to router_api Enables model performance comparison and routing tier validation. --- nginx/nginx.conf | 7 ++ router/router.py | 170 +++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 171 insertions(+), 6 deletions(-) diff --git a/nginx/nginx.conf b/nginx/nginx.conf index 83e5640..da33ca2 100644 --- a/nginx/nginx.conf +++ b/nginx/nginx.conf @@ -78,6 +78,13 @@ http { proxy_buffering off; } + # Performance analytics + location /metrics/ { + proxy_pass http://router_api; + proxy_http_version 1.1; + proxy_set_header Host $host; + } + location /health { proxy_pass http://router_api/health; proxy_http_version 1.1; diff --git a/router/router.py b/router/router.py index c1a239e..06eeb60 100644 --- a/router/router.py +++ b/router/router.py @@ -1,4 +1,4 @@ -import os, json, time, logging, traceback, threading, queue +import os, json, time, logging, traceback, threading, queue, statistics, math import requests, redis from flask import Flask, request, jsonify, Response, stream_with_context @@ -122,6 +122,38 @@ def estimate_tokens(msgs): """Estimate token count from messages. 
def store_perf_record(model, agent, tier, reason, queue_ms, inference_ms, prompt_tokens, completion_tokens, stream):
    """Store one per-request performance record in Redis for analytics.

    The record is JSON-encoded once and pushed onto four capped lists:
    ``perf:recent`` (newest 500 entries) and ``perf:model:<model>``,
    ``perf:reason:<reason>``, ``perf:agent:<agent>`` (newest 200 each).

    Args:
        model: Name of the model that served the request.
        agent: Name of the calling agent.
        tier: Routing tier of the request.
        reason: Routing reason string.
        queue_ms: Milliseconds the request waited before being dispatched.
        inference_ms: Milliseconds spent on the backend round-trip.
        prompt_tokens: Prompt token count (callers may pass an estimate).
        completion_tokens: Completion token count; 0 when unknown.
        stream: True when the request was served as a stream.

    Returns:
        None. This is best-effort: it does nothing when the module-level
        Redis client ``r`` is falsy, and any failure is logged rather than
        raised so that metrics can never break request handling.
    """
    if not r:
        return
    try:
        # A backend may report a token count as null; treat that as "unknown"
        # (0) instead of losing the whole record to a TypeError below.
        prompt_tokens = prompt_tokens or 0
        completion_tokens = completion_tokens or 0

        total_ms = queue_ms + inference_ms
        if inference_ms > 0 and completion_tokens > 0:
            tps = completion_tokens / (inference_ms / 1000)
        else:
            # No completion tokens or no measured time: throughput is undefined.
            tps = 0

        rec = json.dumps({
            "ts": time.time(),
            "model": model, "agent": agent, "tier": tier, "reason": reason,
            "queue_ms": round(queue_ms, 1),
            "inference_ms": round(inference_ms, 1),
            "total_ms": round(total_ms, 1),
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "tokens_per_sec": round(tps, 1),
            "stream": stream
        })

        # (list key, maximum number of entries kept)
        targets = (
            ("perf:recent", 500),
            ("perf:model:" + model, 200),
            ("perf:reason:" + reason, 200),
            ("perf:agent:" + agent, 200),
        )
        # Batch all LPUSH/LTRIM pairs into one round-trip; this runs on the
        # request path, so eight sequential calls are avoidable latency.
        # NOTE(review): assumes `r` is a redis-py client exposing pipeline().
        pipe = r.pipeline(transaction=False)
        for key, cap in targets:
            pipe.lpush(key, rec)
            pipe.ltrim(key, 0, cap - 1)
        pipe.execute()
    except Exception as exc:
        # Best-effort by design, but leave a trace instead of failing silently.
        log.warning("perf record not stored: %s", exc)
r.incr("ts:"+model+":"+time.strftime("%Y%m%d%H")) - r.lpush("routes:recent", json.dumps({"ts":time.time(),"model":model,"reason":reason,"tier":tier,"agent":agent})) + r.lpush("routes:recent", json.dumps({"ts":time.time(),"model":model,"reason":reason,"tier":tier,"agent":agent,"queue_ms": round(queue_ms,1)})) r.ltrim("routes:recent",0,999) except Exception: pass start = time.time() @@ -323,6 +355,8 @@ def chat(): def gen(): for raw in resp.iter_content(chunk_size=None, decode_unicode=True): if raw: yield clean_unicode(raw) + # Streaming: can't get token counts without parsing stream, store latency + estimated tokens + store_perf_record(model, agent, tier, reason, queue_ms, lat, estimate_tokens(rd.get("messages",[])), 0, True) bcast() ctx_remaining = GPU_CONTEXT.get(model, 65536) - max(session_tokens, estimate_tokens(rd.get("messages",[]))) ctx_pct = ctx_remaining / GPU_CONTEXT.get(model, 65536) * 100 @@ -337,10 +371,17 @@ def chat(): msg = c.get("message",{}) if not msg.get("content") and msg.get("reasoning_content"): msg["content"] = msg["reasoning_content"] + # Extract performance data from llama.cpp response + usage = data.get("usage", {}) + timings = data.get("timings", {}) + prompt_tokens = usage.get("prompt_tokens", 0) + completion_tokens = usage.get("completion_tokens", 0) + inference_ms = lat # total GPU round-trip + store_perf_record(model, agent, tier, reason, queue_ms, inference_ms, prompt_tokens, completion_tokens, False) ctx_remaining = GPU_CONTEXT.get(model, 65536) - max(session_tokens, estimate_tokens(rd.get("messages",[]))) ctx_pct = ctx_remaining / GPU_CONTEXT.get(model, 65536) * 100 ctx_warning = "compact_urgent" if ctx_pct < 5 else ("compact_recommended" if ctx_pct < 15 else ("compact_soon" if ctx_pct < 30 else "ok")) - data["routing"] = {"model":model,"reason":reason,"gpu":url,"tier":tier,"agent":agent,"latency_ms":lat,"active_gpu":gpu_active_count(model),"context_remaining": max(0, ctx_remaining),"context_pct": 
def _percentile(values, p):
    """Return the p-th percentile of ``values`` rounded to 0.1, or 0 if empty."""
    if not values:
        return 0
    if len(values) == 1:
        # statistics.quantiles() raises StatisticsError for a single data
        # point on Python < 3.13; the only sensible percentile is the value.
        return round(values[0], 1)
    cuts = statistics.quantiles(values, n=100, method="inclusive")
    # cuts holds 99 cut points: index 0 is p1, index 98 is p99.
    return round(cuts[max(0, min(p - 1, 98))], 1)


def _mean_or_zero(values):
    """Return the mean of ``values`` rounded to 0.1, or 0 if empty."""
    return round(statistics.mean(values), 1) if values else 0


def _group_records(records, key):
    """Group record dicts into ``{record[key]: [records...]}``."""
    groups = {}
    for rec in records:
        groups.setdefault(rec[key], []).append(rec)
    return groups


@app.route("/metrics/performance")
def performance():
    """Per-request performance analytics with percentiles per model/reason/agent.

    Query parameters:
        window: Look-back window in hours (integer, default 24).
        model: Only include records for this model (default "all").

    Records are read from the ``perf:recent`` Redis list, so at most as many
    requests as that list retains are considered, whatever the window.

    Returns:
        A JSON object with ``models``, ``reasons``, ``agents`` and
        ``summary``. Responds 400 when ``window`` is not an integer, 503 when
        the Redis client is unavailable, and 500 on any unexpected error.
    """
    if not r:
        return jsonify({"error": "Redis unavailable"}), 503

    try:
        window_hours = int(request.args.get("window", "24"))
    except (TypeError, ValueError):
        # A malformed query parameter is a client error, not a server error.
        return jsonify({"error": "window must be an integer number of hours"}), 400
    model_filter = request.args.get("model", "all")

    try:
        # Load recent records that fall inside the window.
        cutoff = time.time() - (window_hours * 3600)
        records = []
        for raw in r.lrange("perf:recent", 0, -1):
            try:
                rec = json.loads(raw)
                if rec["ts"] >= cutoff:
                    records.append(rec)
            except (ValueError, KeyError, TypeError):
                # Skip entries that are not valid JSON records.
                continue

        # Filter by model if specified.
        if model_filter != "all":
            records = [rec for rec in records if rec["model"] == model_filter]

        if not records:
            return jsonify({"models": [], "reasons": [], "agents": [], "summary": {"total_requests": 0}})

        # Per-model stats, ordered by model name.
        models = []
        for m, recs in sorted(_group_records(records, "model").items()):
            latencies = [rec["total_ms"] for rec in recs]
            tps_vals = [rec["tokens_per_sec"] for rec in recs if rec["tokens_per_sec"] > 0]
            # NOTE(review): queue statistics deliberately keep the original
            # behaviour of covering non-streaming requests only, although
            # queue_ms is recorded for streams too - confirm this is intended.
            queue_times = [rec["queue_ms"] for rec in recs if not rec["stream"]]
            stream_count = sum(1 for rec in recs if rec["stream"])
            models.append({
                "model": m,
                "count": len(recs),
                "stream_pct": round(stream_count / len(recs) * 100, 1),
                "latency": {
                    "avg": _mean_or_zero(latencies),
                    "p50": _percentile(latencies, 50),
                    "p95": _percentile(latencies, 95),
                    "p99": _percentile(latencies, 99)
                },
                "throughput": {
                    "avg_tokens_per_sec": _mean_or_zero(tps_vals),
                    "p50": _percentile(tps_vals, 50),
                    "p95": _percentile(tps_vals, 95),
                },
                "queue": {
                    "avg_ms": _mean_or_zero(queue_times),
                    "p95_ms": _percentile(queue_times, 95),
                } if queue_times else None
            })

        # Per-reason stats, most frequent reason first.
        reasons = []
        reason_groups = _group_records(records, "reason")
        for rsn, recs in sorted(reason_groups.items(), key=lambda kv: -len(kv[1])):
            latencies = [rec["total_ms"] for rec in recs]
            reasons.append({
                "reason": rsn,
                "count": len(recs),
                "avg_total_ms": _mean_or_zero(latencies),
                "p95_total_ms": _percentile(latencies, 95)
            })

        # Per-agent stats, most active agent first.
        agents = []
        agent_groups = _group_records(records, "agent")
        for ag, recs in sorted(agent_groups.items(), key=lambda kv: -len(kv[1])):
            latencies = [rec["total_ms"] for rec in recs]
            tps_vals = [rec["tokens_per_sec"] for rec in recs if rec["tokens_per_sec"] > 0]
            agents.append({
                "agent": ag,
                "count": len(recs),
                "avg_total_ms": _mean_or_zero(latencies),
                "avg_tokens_per_sec": _mean_or_zero(tps_vals)
            })

        all_lat = [rec["total_ms"] for rec in records]
        all_tps = [rec["tokens_per_sec"] for rec in records if rec["tokens_per_sec"] > 0]
        summary = {
            "total_requests": len(records),
            "window_hours": window_hours,
            "latency": {
                "avg_ms": _mean_or_zero(all_lat),
                "p50_ms": _percentile(all_lat, 50),
                "p95_ms": _percentile(all_lat, 95),
                "p99_ms": _percentile(all_lat, 99)
            },
            "throughput_avg_tps": _mean_or_zero(all_tps)
        }

        return jsonify({"models": models, "reasons": reasons, "agents": agents, "summary": summary})
    except Exception as e:
        # Same logging shape as chat() so failures here are diagnosable.
        log.error("Error: %s\n%s", e, traceback.format_exc())
        return jsonify({"error": str(e)}), 500