"""Load-aware LLM router: configuration, GPU load/health tracking and routing.

This part of the module holds the static configuration (GPU endpoints,
per-tier model lists, API keys), the Redis-backed in-flight counters, the
sidecar/llama.cpp health probe, and the ``route`` decision function.
"""

import json
import logging
import math
import os
import queue
import statistics
import threading
import time
import traceback

import redis
import requests
from flask import Flask, Response, jsonify, request, stream_with_context

REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379")
GPU_MOE_URL = os.environ.get("GPU_MOE_URL", "http://192.168.68.15:8080/v1")
GPU_DENSE_URL = os.environ.get("GPU_DENSE_URL", "http://192.168.68.8:8080/v1")
GPU_LIGHT_URL = os.environ.get("GPU_LIGHT_URL", "http://192.168.68.110:8080/v1")

# Telemetry sidecar endpoint per model (queried by check_gpu_health).
GPU_SIDECARS = {
    "qwen3.6-35B-A3B": "http://192.168.68.15:8090",
    "qwen3.6-27B-code": "http://192.168.68.8:8090",
    "qwen3.5-9b-vlm": "http://192.168.68.110:8090",
}

# Inference base URL per model.
GPU_URLS = {
    "qwen3.6-35B-A3B": GPU_MOE_URL,
    "qwen3.6-27B-code": GPU_DENSE_URL,
    "qwen3.5-9b-vlm": GPU_LIGHT_URL,
}

# Max concurrent requests per GPU (based on llama.cpp --parallel)
GPU_MAX_CONCURRENT = {
    "qwen3.6-35B-A3B": 2,   # 2 slots
    "qwen3.6-27B-code": 1,  # 1 slot (24GB VRAM saturated at 256K ctx)
    "qwen3.5-9b-vlm": 2,    # 2 slots (12GB VRAM, 4GB headroom)
}

# Context window sizes (tokens) — used for compaction signals
GPU_CONTEXT = {
    "qwen3.6-35B-A3B": 262144,
    "qwen3.6-27B-code": 196608,
    "qwen3.5-9b-vlm": 262144,
}

# Models each subscription tier is allowed to be routed to.
TIER_MODELS = {
    "starter": ["qwen3.5-9b-vlm"],
    "professional": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"],
    "enterprise": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"],
}

# NOTE(review): API keys are hard-coded in source. Consider loading them from
# the environment or a secret store so they are not committed with the code.
API_KEYS = {
    "sk-syslog-local-master-key": {"tier": "enterprise", "agent": "admin"},
    "sk-syslog-abiba": {"tier": "enterprise", "agent": "Abiba"},
    "sk-syslog-mumuni": {"tier": "enterprise", "agent": "Mumuni"},
    "sk-syslog-tanko": {"tier": "enterprise", "agent": "Tanko"},
    "sk-syslog-koby": {"tier": "enterprise", "agent": "Koby"},
    "sk-syslog-kagenz0": {"tier": "enterprise", "agent": "Kagenz0"},
    "sk-syslog-koonimo": {"tier": "enterprise", "agent": "Koonimo"},
    "sk-starter-abc123": {"tier": "starter", "agent": "test-starter"},
    "sk-professional-xyz789": {"tier": "professional", "agent": "test-pro"},
}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [ROUTER] %(levelname)s %(message)s",
)
log = logging.getLogger("router")

# Redis is optional: when it cannot be reached, `r` is None and every
# Redis-backed feature (counters, perf records, sessions) is skipped.
try:
    r = redis.from_url(REDIS_URL, decode_responses=True)
    r.ping()
except Exception:
    log.warning("Redis unavailable; running without shared counters")
    r = None


def counter_audit_loop():
    """Every 30s, check GPU slots and reset counters if all slots idle.

    Runs forever; intended to be started on a daemon thread. A counter is
    reset only when the GPU's ``/slots`` endpoint answers 200 and no slot
    reports ``is_processing``. Any error for one GPU is ignored so the loop
    keeps auditing the others.
    """
    while True:
        time.sleep(30)
        if not r:
            continue
        for model, url in GPU_URLS.items():
            try:
                resp = requests.get(
                    url.replace("/v1", "") + "/slots",
                    headers={"Authorization": "Bearer not-needed"},
                    timeout=5,
                )
                if resp.status_code != 200:
                    continue
                slots = resp.json()
                if not all(not s.get("is_processing", False) for s in slots):
                    continue
                current = int(r.get("active:" + model) or 0)
                if current > 0:
                    r.set("active:" + model, 0)
                    log.info("AUDIT: Reset stuck counter for %s (was %d)", model, current)
            except Exception:
                # Best-effort audit: never let one failing GPU stop the loop.
                log.debug("AUDIT: slot check failed for %s", model, exc_info=True)


threading.Thread(target=counter_audit_loop, daemon=True).start()

app = Flask(__name__)
sse_subscribers = []
sse_lock = threading.Lock()


def gpu_active_count(model):
    """Get number of in-flight requests for a GPU (0 when Redis is absent)."""
    if r:
        return int(r.get("active:" + model) or 0)
    return 0


def gpu_incr(model):
    """Increment the in-flight counter for ``model`` (no-op without Redis)."""
    if r:
        r.incr("active:" + model)


def gpu_decr(model):
    """Decrement the in-flight counter for ``model``, clamping it at zero."""
    if r:
        v = r.decr("active:" + model)
        if v and int(v) < 0:
            r.set("active:" + model, 0)  # never go negative


def check_gpu_health(model, sidecar_timeout=5, gpu_timeout=3):
    """Probe a GPU's telemetry sidecar and its llama.cpp ``/health`` endpoint.

    Args:
        model: Model id used as key into ``GPU_SIDECARS`` / ``GPU_URLS``.
        sidecar_timeout: Timeout in seconds for the sidecar request.
        gpu_timeout: Timeout in seconds for the ``/health`` request.

    Returns:
        ``{"status": "unknown"}`` when the model has no sidecar configured;
        ``{"status": "down"}`` when the sidecar does not answer 200 or any
        error occurs; otherwise a dict with ``status`` ("healthy", or "down"
        when ``/health`` fails or is not 200), ``vram_warning`` and the
        telemetry fields copied from the sidecar payload.
    """
    url = GPU_SIDECARS.get(model)
    if not url:
        return {"status": "unknown"}
    try:
        resp = requests.get(url, timeout=sidecar_timeout)
        if resp.status_code == 200:
            d = resp.json()
            pct = (d.get("vram_used_mb", 0) / max(d.get("vram_total_mb", 1), 1)) * 100
            # VRAM usage != saturation; busy slots handled by is_gpu_busy()
            status = "healthy"
            vram_warning = pct >= 95
            # Also check if llama.cpp endpoint is actually responding
            gpu_url = GPU_URLS.get(model, "")
            try:
                hr = requests.get(
                    gpu_url.replace("/v1", "") + "/health",
                    headers={"Authorization": "Bearer not-needed"},
                    timeout=gpu_timeout,
                )
                if hr.status_code != 200:
                    status = "down"
            except Exception:
                status = "down"
            return {
                "status": status,
                "vram_warning": vram_warning,
                "vram_used_mb": d.get("vram_used_mb"),
                "vram_total_mb": d.get("vram_total_mb"),
                "vram_pct": round(pct, 1),
                "temp_c": d.get("temp_c"),
                "gpu_util_pct": d.get("gpu_util_pct"),
                "gpu_name": d.get("gpu_name"),
                "power_w": d.get("power_w"),
                "power_limit_w": d.get("power_limit_w"),
            }
    except Exception:
        # Unreachable sidecar or malformed payload is reported as "down".
        log.debug("Health probe failed for %s", model, exc_info=True)
    return {"status": "down"}


def available_models():
    """Return the model ids whose health status is "healthy" or "saturated"."""
    return [m for m in GPU_URLS if check_gpu_health(m)["status"] in ("healthy", "saturated")]


def estimate_tokens(msgs):
    """Estimate token count from messages.

    Uses JSON length / 3.5 (closer to real tokenizer ratios for dense text).

    Returns:
        The estimate as an ``int``. It must be an integer because the value
        is written to Redis and parsed back with ``int()``, and is emitted in
        response headers; a float such as ``123.0`` breaks both.
    """
    return int(len(json.dumps(msgs, default=str)) / 3.5)


def store_perf_record(model, agent, tier, reason, queue_ms, inference_ms,
                      prompt_tokens, completion_tokens, stream):
    """Store detailed performance record in Redis for analytics.

    The record is pushed to four capped lists: ``perf:recent`` (500 entries)
    and per-model, per-reason and per-agent lists (200 entries each). Does
    nothing without Redis; Redis errors are swallowed so analytics can never
    fail a request.
    """
    if not r:
        return
    try:
        total_ms = queue_ms + inference_ms
        if inference_ms > 0 and completion_tokens > 0:
            tps = completion_tokens / (inference_ms / 1000)
        else:
            tps = 0
        rec = json.dumps({
            "ts": time.time(),
            "model": model,
            "agent": agent,
            "tier": tier,
            "reason": reason,
            "queue_ms": round(queue_ms, 1),
            "inference_ms": round(inference_ms, 1),
            "total_ms": round(total_ms, 1),
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "tokens_per_sec": round(tps, 1),
            "stream": stream,
        })
        # (list key, index of last entry to keep)
        targets = (
            ("perf:recent", 499),             # Global recent list (last 500)
            ("perf:model:" + model, 199),     # Per-model list (last 200)
            ("perf:reason:" + reason, 199),   # Per-reason list (last 200)
            ("perf:agent:" + agent, 199),     # Per-agent list (last 200)
        )
        for key, last in targets:
            r.lpush(key, rec)
            r.ltrim(key, 0, last)
    except Exception:
        log.debug("Failed to store perf record", exc_info=True)


def is_gpu_busy(model):
    """Return True when the GPU's in-flight count has reached its slot limit."""
    return gpu_active_count(model) >= GPU_MAX_CONCURRENT.get(model, 1)


def select_best_gpu(candidates, reason):
    """Pick the best GPU from candidates IN ORDER — first non-busy one wins.

    When every candidate is busy, the least-loaded one is chosen (earliest in
    ``candidates`` on ties) and the reason is prefixed with
    ``load_balanced_``.

    Returns:
        ``{"model": ..., "reason": ...}``, or ``None`` if ``candidates`` is
        empty.
    """
    for m in candidates:
        if not is_gpu_busy(m):
            return {"model": m, "reason": reason}
    if not candidates:
        return None
    # All busy — pick least loaded
    best = min(candidates, key=gpu_active_count)
    return {"model": best, "reason": "load_balanced_" + reason}


def route(rd, tier):
    """Choose the GPU/model that should serve a chat-completion request.

    Args:
        rd: Parsed request body; reads ``messages``, ``model`` and
            ``routing_hints``.
        tier: Caller's tier, used to look up allowed models in
            ``TIER_MODELS`` (unknown tiers get the light model only).

    Returns:
        A dict with ``model`` and ``reason``. When no allowed model is
        available, or all available ones are at capacity, the dict also has
        ``"saturated": True`` so the caller can queue and retry.
    """
    vlm, dense, moe = "qwen3.5-9b-vlm", "qwen3.6-27B-code", "qwen3.6-35B-A3B"

    msgs = rd.get("messages", [])
    est_tokens = estimate_tokens(msgs)
    has_system = any(m.get("role") == "system" for m in msgs)
    turns = len([m for m in msgs if m.get("role") in ("user", "assistant")])
    hints = rd.get("routing_hints", {})
    allowed = TIER_MODELS.get(tier, [vlm])
    avail = [m for m in available_models() if m in allowed]

    if not avail:
        return {"model": allowed[0], "reason": "all_saturated", "saturated": True}
    # Check if all available GPUs are at max capacity
    if all(is_gpu_busy(m) for m in avail):
        return {"model": avail[0], "reason": "all_saturated", "saturated": True}

    req = rd.get("model", "auto")
    if req != "auto":
        # An unavailable (or disallowed) explicit model falls back to the
        # first available one.
        target = req if req in avail else avail[0]
        # If explicit model is busy, check if another can take it
        if is_gpu_busy(target) and req in allowed:
            alts = [m for m in avail if m != target and m in allowed]
            if alts:
                alt = select_best_gpu(alts, "explicit")
                if alt:
                    return alt
        return {"model": target, "reason": "explicit"}

    if hints:
        if hints.get("priority") == "speed" and vlm in avail:
            return select_best_gpu([vlm], "hint_speed") or {"model": vlm, "reason": "hint_speed"}
        if hints.get("priority") == "quality" and dense in avail:
            return select_best_gpu([dense], "hint_quality") or {"model": dense, "reason": "hint_quality"}

    first_msg = msgs[0].get("content", "") if msgs else ""
    # Non-string content (e.g. structured parts) is treated as 99 words.
    words = len(first_msg.split()) if isinstance(first_msg, str) else 99

    # TIER 1: Lightweight — single-turn short queries → VLM first
    if not has_system and turns <= 1 and words <= 100 and vlm in avail:
        if not is_gpu_busy(vlm):
            return {"model": vlm, "reason": "lightweight"}
        # VLM busy — fall back to MoE, then Dense
        fallback = [m for m in [moe, dense] if m in avail]
        result = select_best_gpu(fallback, "lightweight_fallback")
        if result:
            return result

    # TIER 2: Simple conversations — short context, any prompt → VLM preferred
    if est_tokens <= 1000 and turns <= 4 and vlm in avail:
        if not is_gpu_busy(vlm):
            return {"model": vlm, "reason": "simple_conv"}
        # VLM busy — try Dense
        if dense in avail and not is_gpu_busy(dense):
            return {"model": dense, "reason": "simple_conv_fallback"}

    # TIER 3: Heavy reasoning — extremely large context or very long conversations
    if est_tokens > 50000 or turns > 25:
        # MoE first, then Dense, then Light
        candidates = [m for m in [moe, dense, vlm] if m in avail]
        result = select_best_gpu(candidates, "heavy_reasoning")
        if result:
            return result

    # TIER 4: Default — MoE first, VLM helps, Dense last (slow)
    if est_tokens <= 50000:
        candidates = [m for m in [moe, vlm, dense] if m in avail]
        result = select_best_gpu(candidates, "default")
        if result:
            return result

    # Fallback — best available
    if moe in avail and not is_gpu_busy(moe):
        return {"model": moe, "reason": "default_moe"}
    result = select_best_gpu(list(avail), "fallback")
    if result:
        return result
    return {"model": avail[0], "reason": "last_resort"}


# Typographic characters mapped to ASCII look-alikes before the ASCII filter.
_UNICODE_REPLACEMENTS = str.maketrans({
    chr(0x2014): "-",    # em dash
    chr(0x2013): "-",    # en dash
    chr(0x2018): "'",    # left single quote
    chr(0x2019): "'",    # right single quote
    chr(0x201C): '"',    # left double quote
    chr(0x201D): '"',    # right double quote
    chr(0x2026): "...",  # ellipsis
    chr(0x00A0): " ",    # non-breaking space
})


def clean_unicode(text):
    """Return ``text`` reduced to ASCII; non-strings are returned unchanged.

    Common typographic punctuation is first replaced by ASCII equivalents;
    every remaining non-ASCII character is then dropped.
    """
    if not isinstance(text, str):
        return text
    return text.translate(_UNICODE_REPLACEMENTS).encode("ascii", "ignore").decode("ascii")
def clean_response(d):
    """Recursively apply ``clean_unicode`` to every string in a JSON-like value.

    Dicts and lists are rebuilt (dict keys are left untouched); any other
    non-string value is returned as-is.
    """
    if isinstance(d, dict):
        return {k: clean_response(v) for k, v in d.items()}
    if isinstance(d, list):
        return [clean_response(v) for v in d]
    if isinstance(d, str):
        return clean_unicode(d)
    return d


def get_metrics():
    """Build the dashboard metrics snapshot.

    Returns:
        A dict with per-GPU health/telemetry (``gpus``), in-flight counts
        (``active_requests``), Redis-backed route counters per model, agent
        and tier, the 50 most recent routing decisions, and a ``timestamp``.
        Counter sections stay empty when Redis is absent or fails.
    """
    d = {"gpus": [], "route_counts": {}, "agent_counts": {}, "tier_counts": {},
         "recent": [], "timestamp": time.time(), "active_requests": {}}
    for m in GPU_URLS:
        h = check_gpu_health(m)
        d["gpus"].append({
            "id": m,
            "gpu_name": h.get("gpu_name", m),
            "status": h.get("status"),
            "vram_used_mb": h.get("vram_used_mb"),
            "vram_total_mb": h.get("vram_total_mb"),
            "vram_pct": h.get("vram_pct"),
            "temp_c": h.get("temp_c"),
            "gpu_util_pct": h.get("gpu_util_pct"),
            "power_w": h.get("power_w"),
            "power_limit_w": h.get("power_limit_w"),
            "active_requests": gpu_active_count(m),
            "max_concurrent": GPU_MAX_CONCURRENT.get(m, 1),
        })
        d["active_requests"][m] = gpu_active_count(m)
    if r:
        try:
            for m in GPU_URLS:
                d["route_counts"][m] = int(r.get("routes:" + m) or 0)
            for info in API_KEYS.values():
                c = int(r.get("routes:agent:" + info["agent"]) or 0)
                if c > 0:
                    d["agent_counts"][info["agent"]] = c
            for t in TIER_MODELS:
                d["tier_counts"][t] = int(r.get("routes:tier:" + t) or 0)
            raw = r.lrange("routes:recent", 0, 49)
            d["recent"] = [json.loads(x) for x in raw] if raw else []
        except Exception:
            # Metrics are best-effort; return whatever was collected so far.
            log.debug("Failed to read route counters", exc_info=True)
    return d


def bcast():
    """Push a fresh metrics snapshot to every ``/stream`` subscriber queue."""
    payload = json.dumps(get_metrics())
    with sse_lock:
        dead = []
        for q in sse_subscribers:
            try:
                q.put(payload)
            except Exception:
                dead.append(q)
        for q in dead:
            sse_subscribers.remove(q)


QUEUE_TIMEOUT = int(os.environ.get("QUEUE_TIMEOUT", "30"))  # max seconds to queue before 503


class _GpuSlot:
    """One-shot handle on a GPU's in-flight counter.

    ``release`` decrements the counter at most once no matter how many code
    paths call it, so error handlers can release unconditionally.
    """

    def __init__(self, model):
        self.model = model
        self._held = False

    def acquire(self):
        gpu_incr(self.model)
        self._held = True

    def release(self):
        if self._held:
            self._held = False
            gpu_decr(self.model)


def _track_session_tokens(session_id, current_tokens):
    """Return the largest token estimate seen for a session and persist it.

    Returns 0 when there is no session id or no Redis. The stored value is
    read via ``float`` first so entries previously written as e.g. "123.0"
    are still understood.
    """
    if not (session_id and r):
        return 0
    total = 0
    try:
        prev = int(float(r.get("session:" + session_id) or 0))
        total = max(prev, current_tokens)  # context only grows
        r.set("session:" + session_id, total, ex=86400)  # TTL 24h
    except Exception:
        log.debug("Session token tracking failed", exc_info=True)
    return total


def _context_status(model, used_tokens):
    """Return ``(remaining_tokens, remaining_pct, warning)`` for a model.

    ``warning`` is "compact_urgent" below 5% remaining, "compact_recommended"
    below 15%, "compact_soon" below 30%, otherwise "ok".
    """
    window = GPU_CONTEXT.get(model, 65536)
    remaining = window - used_tokens
    pct_left = remaining / window * 100
    if pct_left < 5:
        warning = "compact_urgent"
    elif pct_left < 15:
        warning = "compact_recommended"
    elif pct_left < 30:
        warning = "compact_soon"
    else:
        warning = "ok"
    return remaining, pct_left, warning


def _record_route(model, reason, tier, agent, queue_ms):
    """Update Redis routing counters and the recent-routes list (best-effort)."""
    if not r:
        return
    try:
        r.incr("routes:" + model)
        r.incr("routes:tier:" + tier)
        r.incr("routes:agent:" + agent)
        # Hour bucket in UTC, matching how /metrics/timeseries reads it back.
        r.incr("ts:" + model + ":" + time.strftime("%Y%m%d%H", time.gmtime()))
        r.lpush("routes:recent", json.dumps({
            "ts": time.time(), "model": model, "reason": reason,
            "tier": tier, "agent": agent, "queue_ms": round(queue_ms, 1)}))
        r.ltrim("routes:recent", 0, 999)
    except Exception:
        log.debug("Failed to record route", exc_info=True)


@app.route("/v1/chat/completions", methods=["POST"])
def chat():
    """Authenticate, route and proxy a chat-completion request to a GPU.

    Flow: validate the bearer key, parse the JSON body, queue (polling every
    500 ms) while ``route`` reports saturation, then forward the request to
    the selected GPU. Responses carry ``X-Context-*`` headers; non-streaming
    responses also get a ``routing`` section.

    Returns:
        401 for a missing/unknown key, 400 for a non-object body, 503 when
        the queue timeout expires, 502 when the GPU answers non-200, 504 on
        upstream timeout, 500 on any other error, otherwise the proxied
        response (JSON or ``text/event-stream``).
    """
    ak = request.headers.get("Authorization", "").replace("Bearer ", "")
    if not ak or ak not in API_KEYS:
        log.warning("AUTH_REJECTED: no/invalid API key from %s", request.remote_addr)
        return jsonify({"error": "Unauthorized — valid API key required"}), 401
    ki = API_KEYS[ak]
    tier, agent = ki["tier"], ki["agent"]

    rd = request.get_json(force=True, silent=True)
    if not isinstance(rd, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # Allow agent to override queue timeout via header; ignore garbage values.
    try:
        q_timeout = int(request.headers.get("X-Queue-Timeout", QUEUE_TIMEOUT))
    except (TypeError, ValueError):
        q_timeout = QUEUE_TIMEOUT

    model = "unrouted"        # only used for logging until routing succeeds
    slot = None               # GPU slot handle once acquired
    upstream = None           # requests.Response from the GPU
    stream_handed_off = False  # True once the SSE generator owns cleanup
    try:
        msgs = rd.get("messages", [])
        request_tokens = int(estimate_tokens(msgs))
        # Cross-turn context tracking: accumulate tokens per session
        session_tokens = _track_session_tokens(
            request.headers.get("X-Session-Id", ""), request_tokens)

        d = route(rd, tier)
        queue_start = time.time()
        # Queue loop: wait for a GPU slot instead of immediate 503
        while d.get("saturated"):
            elapsed = time.time() - queue_start
            if elapsed > q_timeout:
                busy = jsonify({"error": "All GPUs saturated",
                                "queued_s": round(elapsed, 1), "retry_after_s": 5})
                busy.headers["Retry-After"] = "5"
                log.warning("QUEUE_TIMEOUT: %s waited %.1fs, all GPUs saturated", agent, elapsed)
                return busy, 503
            time.sleep(0.5)  # poll every 500ms
            d = route(rd, tier)
        queue_ms = (time.time() - queue_start) * 1000
        if queue_ms > 500:
            log.info("QUEUED: %s waited %.0fms before slot opened", agent, queue_ms)

        model, reason = d["model"], d["reason"]
        url = GPU_URLS[model]
        is_stream = rd.get("stream", False)

        slot = _GpuSlot(model)
        slot.acquire()
        log.info("ROUTE: %s -> %s (%s) stream=%s active=%d/%d", agent, model, reason,
                 is_stream, gpu_active_count(model), GPU_MAX_CONCURRENT.get(model, 1))
        _record_route(model, reason, tier, agent, queue_ms)

        start = time.time()
        upstream = requests.post(
            url + "/chat/completions", json=rd,
            headers={"Content-Type": "application/json", "Authorization": "Bearer not-needed"},
            timeout=300, stream=is_stream)
        lat = int((time.time() - start) * 1000)

        if upstream.status_code != 200:
            return jsonify({"error": "GPU error " + str(upstream.status_code)}), 502

        ctx_remaining, ctx_pct, ctx_warning = _context_status(
            model, max(session_tokens, request_tokens))

        if is_stream:
            def finish_stream():
                # Runs when the stream ends or the client goes away; both
                # calls are safe to repeat.
                upstream.close()
                slot.release()

            def gen():
                try:
                    for raw in upstream.iter_content(chunk_size=None, decode_unicode=True):
                        if raw:
                            yield clean_unicode(raw)
                finally:
                    finish_stream()

            # Streaming: can't get token counts without parsing stream,
            # store latency + estimated tokens
            store_perf_record(model, agent, tier, reason, queue_ms, lat,
                              request_tokens, 0, True)
            bcast()
            sse_resp = Response(stream_with_context(gen()), mimetype="text/event-stream")
            # Covers the case where the generator is closed before it ever
            # starts (its ``finally`` would not run then).
            sse_resp.call_on_close(finish_stream)
            sse_resp.headers["X-Context-Remaining"] = str(max(0, ctx_remaining))
            sse_resp.headers["X-Context-Warning"] = ctx_warning
            sse_resp.headers["X-Context-Model"] = model
            # The GPU keeps working until the stream is consumed, so the slot
            # stays held and is released by finish_stream, not here.
            stream_handed_off = True
            return sse_resp

        # Non-streaming: the GPU is done as soon as the body has arrived.
        slot.release()
        data = clean_response(upstream.json())
        for c in data.get("choices", []):
            msg = c.get("message", {})
            # Surface reasoning text when the model produced no regular content.
            if not msg.get("content") and msg.get("reasoning_content"):
                msg["content"] = msg["reasoning_content"]

        # Extract performance data from llama.cpp response
        usage = data.get("usage", {})
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)
        inference_ms = lat  # total GPU round-trip
        store_perf_record(model, agent, tier, reason, queue_ms, inference_ms,
                          prompt_tokens, completion_tokens, False)

        data["routing"] = {
            "model": model, "reason": reason, "gpu": url, "tier": tier, "agent": agent,
            "latency_ms": lat, "queue_ms": round(queue_ms, 1),
            "active_gpu": gpu_active_count(model),
            "context_remaining": max(0, ctx_remaining),
            "context_pct": round(ctx_pct, 1),
            "context_warning": ctx_warning,
        }
        out = jsonify(data)
        out.headers["X-Context-Remaining"] = str(max(0, ctx_remaining))
        out.headers["X-Context-Warning"] = ctx_warning
        out.headers["X-Context-Model"] = model
        bcast()
        return out
    except requests.Timeout:
        log.error("TIMEOUT: %s -> %s", agent, model)
        return jsonify({"error": "timeout"}), 504
    except Exception as e:
        log.error("Error: %s\n%s", e, traceback.format_exc())
        return jsonify({"error": str(e)}), 500
    finally:
        # Single cleanup point: releases the slot exactly once and closes the
        # upstream connection on every path that did not hand off a stream.
        if not stream_handed_off:
            if slot is not None:
                slot.release()
            if upstream is not None:
                upstream.close()


def _percentile(values, p):
    """Return the ``p``-th percentile (inclusive method) rounded to 0.1.

    Returns 0 for an empty list and the value itself for a single sample
    (``statistics.quantiles`` rejects single-element input on older Pythons).
    """
    if not values:
        return 0
    if len(values) == 1:
        return round(values[0], 1)
    cuts = statistics.quantiles(sorted(values), n=100, method="inclusive")
    return round(cuts[min(p - 1, 98)], 1)


def _group_by(records, key):
    """Group record dicts into ``{record[key]: [records...]}``."""
    groups = {}
    for rec in records:
        groups.setdefault(rec[key], []).append(rec)
    return groups


@app.route("/metrics/performance")
def performance():
    """Per-request performance analytics with percentiles per model/reason/agent.

    Query args: ``window`` (hours, default 24) and ``model`` (default "all").
    Returns 503 without Redis, 400 for a non-integer ``window`` and 500 on
    any other error.
    """
    if not r:
        return jsonify({"error": "Redis unavailable"}), 503
    try:
        window_hours = int(request.args.get("window", "24"))
    except ValueError:
        return jsonify({"error": "window must be an integer number of hours"}), 400
    try:
        model_filter = request.args.get("model", "all")

        # Load recent records
        cutoff = time.time() - (window_hours * 3600)
        records = []
        for x in r.lrange("perf:recent", 0, -1):
            try:
                rec = json.loads(x)
                if rec["ts"] >= cutoff:
                    records.append(rec)
            except (ValueError, KeyError, TypeError):
                continue  # skip malformed entries

        # Filter by model if specified
        if model_filter != "all":
            records = [rec for rec in records if rec["model"] == model_filter]

        if not records:
            return jsonify({"models": [], "reasons": [], "agents": [],
                            "summary": {"total_requests": 0}})

        # Per-model stats
        model_stats = []
        for m, recs in sorted(_group_by(records, "model").items()):
            latencies = [rec["total_ms"] for rec in recs]
            tps_vals = [rec["tokens_per_sec"] for rec in recs if rec["tokens_per_sec"] > 0]
            # Queue stats are computed over non-streaming requests only.
            queue_times = [rec["queue_ms"] for rec in recs if not rec["stream"]]
            stream_count = len([rec for rec in recs if rec["stream"]])
            model_stats.append({
                "model": m,
                "count": len(recs),
                "stream_pct": round(stream_count / len(recs) * 100, 1),
                "latency": {
                    "avg": round(statistics.mean(latencies), 1),
                    "p50": _percentile(latencies, 50),
                    "p95": _percentile(latencies, 95),
                    "p99": _percentile(latencies, 99),
                },
                "throughput": {
                    "avg_tokens_per_sec": round(statistics.mean(tps_vals), 1) if tps_vals else 0,
                    "p50": _percentile(tps_vals, 50) if tps_vals else 0,
                    "p95": _percentile(tps_vals, 95) if tps_vals else 0,
                },
                "queue": {
                    "avg_ms": round(statistics.mean(queue_times), 1),
                    "p95_ms": _percentile(queue_times, 95),
                } if queue_times else None,
            })

        # Per-reason stats, most frequent first
        reasons = []
        for rsn, recs in sorted(_group_by(records, "reason").items(), key=lambda x: -len(x[1])):
            latencies = [rec["total_ms"] for rec in recs]
            reasons.append({
                "reason": rsn,
                "count": len(recs),
                "avg_total_ms": round(statistics.mean(latencies), 1),
                "p95_total_ms": _percentile(latencies, 95),
            })

        # Per-agent stats, most frequent first
        agents = []
        for ag, recs in sorted(_group_by(records, "agent").items(), key=lambda x: -len(x[1])):
            latencies = [rec["total_ms"] for rec in recs]
            tps_vals = [rec["tokens_per_sec"] for rec in recs if rec["tokens_per_sec"] > 0]
            agents.append({
                "agent": ag,
                "count": len(recs),
                "avg_total_ms": round(statistics.mean(latencies), 1),
                "avg_tokens_per_sec": round(statistics.mean(tps_vals), 1) if tps_vals else 0,
            })

        all_lat = [rec["total_ms"] for rec in records]
        all_tps = [rec["tokens_per_sec"] for rec in records if rec["tokens_per_sec"] > 0]
        summary = {
            "total_requests": len(records),
            "window_hours": window_hours,
            "latency": {
                "avg_ms": round(statistics.mean(all_lat), 1),
                "p50_ms": _percentile(all_lat, 50),
                "p95_ms": _percentile(all_lat, 95),
                "p99_ms": _percentile(all_lat, 99),
            },
            "throughput_avg_tps": round(statistics.mean(all_tps), 1) if all_tps else 0,
        }
        return jsonify({"models": model_stats, "reasons": reasons,
                        "agents": agents, "summary": summary})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/v1/models")
def models():
    """List configured models with their current health status and GPU name."""
    entries = []
    for m in GPU_URLS:
        # Probe once per model; each probe is up to two network round trips.
        h = check_gpu_health(m, sidecar_timeout=1.5, gpu_timeout=1)
        entries.append({"id": m, "object": "model", "owned_by": "syslog",
                        "status": h.get("status"), "gpu": h.get("gpu_name")})
    return jsonify({"object": "list", "data": entries})


@app.route("/health")
def health():
    """Report router health: Redis connectivity and per-GPU status/load."""
    gpus = {}
    for m in GPU_URLS:
        h = check_gpu_health(m, sidecar_timeout=1.5, gpu_timeout=1)
        h["active_requests"] = gpu_active_count(m)
        h["max_concurrent"] = GPU_MAX_CONCURRENT.get(m, 1)
        gpus[m] = h
    # Derived from the probes above (same rule as available_models()) so the
    # endpoint does not probe every GPU a second time.
    available = [m for m, h in gpus.items() if h["status"] in ("healthy", "saturated")]
    return jsonify({"status": "healthy", "redis": "connected" if r else "down",
                    "gpus": gpus, "available_models": available})


@app.route("/metrics")
def metrics():
    """Return the current metrics snapshot as JSON."""
    return jsonify(get_metrics())


@app.route("/metrics/timeseries")
def metrics_timeseries():
    """Return per-model request counts bucketed over time (UTC).

    ``period=day`` yields 24 hourly buckets, ``period=week`` 7 daily buckets,
    and any other value 30 daily buckets. Daily buckets are the sum of that
    day's 24 hourly counters. Without Redis, ``models`` is empty.
    """
    period = request.args.get("period", "day")
    if period not in ("day", "week"):
        # Anything else is the 30-day view; normalising here keeps the bucket
        # format and the key lookup below in agreement.
        period = "month"

    now = time.time()
    if period == "day":
        offsets = [h * 3600 for h in range(23, -1, -1)]
        bucket_fmt, label_fmt = "%Y%m%d%H", "%H:00"
    elif period == "week":
        offsets = [d * 86400 for d in range(6, -1, -1)]
        bucket_fmt, label_fmt = "%Y%m%d", "%a"
    else:
        offsets = [d * 86400 for d in range(29, -1, -1)]
        bucket_fmt, label_fmt = "%Y%m%d", "%m/%d"

    stamps = [time.gmtime(now - off) for off in offsets]
    buckets = [time.strftime(bucket_fmt, s) for s in stamps]
    data = {"models": {}, "labels": [time.strftime(label_fmt, s) for s in stamps]}

    if r:
        for model in GPU_URLS:
            counts = []
            for bucket in buckets:
                if period == "day":
                    total = int(r.get("ts:" + model + ":" + bucket) or 0)
                else:
                    total = 0
                    for hh in range(24):
                        total += int(r.get("ts:" + model + ":" + bucket + "{:02d}".format(hh)) or 0)
                counts.append(total)
            data["models"][model] = counts
    return jsonify(data)


@app.route("/stream")
def stream():
    """Server-sent-events feed of metrics snapshots.

    Sends a snapshot immediately, then forwards every payload published by
    ``bcast``; if nothing arrives within 3 seconds a fresh snapshot is sent
    instead. The subscriber queue is unregistered when the client leaves.
    """
    def ev():
        q = queue.Queue()
        with sse_lock:
            sse_subscribers.append(q)
        try:
            yield "data: " + json.dumps(get_metrics()) + "\n\n"
            while True:
                try:
                    yield "data: " + q.get(timeout=3) + "\n\n"
                except queue.Empty:
                    yield "data: " + json.dumps(get_metrics()) + "\n\n"
        finally:
            with sse_lock:
                if q in sse_subscribers:
                    sse_subscribers.remove(q)

    return Response(
        stream_with_context(ev()),
        mimetype="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no",
                 "Access-Control-Allow-Origin": "*"},
    )


if __name__ == "__main__":
    log.info("Router on :9000 (load-aware)")
    app.run(host="0.0.0.0", port=9000, debug=False)