import os, json, time, logging, traceback, threading, queue import requests, redis from flask import Flask, request, jsonify, Response, stream_with_context REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379") GPU_MOE_URL = os.environ.get("GPU_MOE_URL", "http://192.168.68.15:8080/v1") GPU_DENSE_URL = os.environ.get("GPU_DENSE_URL", "http://192.168.68.8:8080/v1") GPU_LIGHT_URL = os.environ.get("GPU_LIGHT_URL", "http://192.168.68.110:8080/v1") GPU_SIDECARS = { "qwen3.6-35B-A3B": "http://192.168.68.15:8090", "qwen3.6-27B-code": "http://192.168.68.8:8090", "gemma-4-E4B": "http://192.168.68.110:8090", } GPU_URLS = { "qwen3.6-35B-A3B": GPU_MOE_URL, "qwen3.6-27B-code": GPU_DENSE_URL, "gemma-4-E4B": GPU_LIGHT_URL, } TIER_MODELS = { "starter": ["gemma-4-E4B"], "professional": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "gemma-4-E4B"], "enterprise": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "gemma-4-E4B"], } API_KEYS = { "sk-syslog-local-master-key": {"tier": "enterprise", "agent": "admin"}, "sk-syslog-abiba": {"tier": "enterprise", "agent": "Abiba"}, "sk-syslog-mumuni": {"tier": "enterprise", "agent": "Mumuni"}, "sk-syslog-tanko": {"tier": "enterprise", "agent": "Tanko"}, "sk-syslog-koby": {"tier": "enterprise", "agent": "Koby"}, "sk-syslog-kagenz0": {"tier": "enterprise", "agent": "Kagenz0"}, "sk-starter-abc123": {"tier": "starter", "agent": "test-starter"}, "sk-professional-xyz789": {"tier": "professional", "agent": "test-pro"}, } logging.basicConfig(level=logging.INFO, format="%(asctime)s [ROUTER] %(levelname)s %(message)s") log = logging.getLogger("router") try: r = redis.from_url(REDIS_URL, decode_responses=True); r.ping() except Exception: r = None app = Flask(__name__) sse_subscribers = []; sse_lock = threading.Lock() def check_gpu_health(model): url = GPU_SIDECARS.get(model) if not url: return {"status": "unknown"} try: resp = requests.get(url, timeout=5) if resp.status_code == 200: d = resp.json() pct = (d.get("vram_used_mb",0) / max(d.get("vram_total_mb",1), 1)) * 100 return {"status": "healthy" if pct < 90 else "saturated", "vram_used_mb": d.get("vram_used_mb"), "vram_total_mb": d.get("vram_total_mb"), "vram_pct": round(pct,1), "temp_c": d.get("temp_c"), "gpu_util_pct": d.get("gpu_util_pct"), "gpu_name": d.get("gpu_name"), "power_w": d.get("power_w"), "power_limit_w": d.get("power_limit_w")} except Exception: pass return {"status": "down"} def available_models(): return [m for m in GPU_URLS if check_gpu_health(m)["status"] in ("healthy","saturated")] def estimate_tokens(msgs): return sum(len(str(m.get("content",""))) for m in msgs) // 4 def route(rd, tier): msgs = rd.get("messages",[]); t = estimate_tokens(msgs) sys = any(m.get("role")=="system" for m in msgs) turns = len([m for m in msgs if m.get("role") in ("user","assistant")]) hints = rd.get("routing_hints",{}) allowed = TIER_MODELS.get(tier, ["gemma-4-E4B"]) avail = [m for m in available_models() if m in allowed] if not avail: return {"model": allowed[0], "reason": "all_saturated"} req = rd.get("model","auto") if req != "auto": return {"model": req if req in avail else avail[0], "reason": "explicit"} if hints: if hints.get("priority")=="speed" and "gemma-4-E4B" in avail: return {"model":"gemma-4-E4B","reason":"hint_speed"} if hints.get("priority")=="quality" and "qwen3.6-27B-code" in avail: return {"model":"qwen3.6-27B-code","reason":"hint_quality"} if t > 4000 or sys or turns > 6: for m in ["qwen3.6-27B-code","qwen3.6-35B-A3B","gemma-4-E4B"]: if m in avail: return {"model":m,"reason":"heavy_reasoning"} first_msg = msgs[0].get("content","") if msgs else "" words = len(first_msg.split()) if isinstance(first_msg, str) else 99 if words <= 3 and turns <= 1 and not sys and "gemma-4-E4B" in avail: return {"model":"gemma-4-E4B","reason":"ultra_light"} if "qwen3.6-35B-A3B" in avail: return {"model":"qwen3.6-35B-A3B","reason":"default_moe"} return {"model":avail[0],"reason":"fallback"} def clean_unicode(text): if not isinstance(text, str): return text return text.replace("\u2014","-").replace("\u2013","-").replace("\u2018",").replace(u2019,").replace("\u201c",').replace(u201d,').replace("\u2026","...").replace("\u00a0"," ") def clean_response(d): if isinstance(d, dict): return {k: clean_response(v) for k,v in d.items()} if isinstance(d, list): return [clean_response(v) for v in d] if isinstance(d, str): return clean_unicode(d) return d def get_metrics(): d = {"gpus":[],"route_counts":{},"agent_counts":{},"tier_counts":{},"recent":[],"timestamp":time.time()} for m in GPU_URLS: h = check_gpu_health(m) d["gpus"].append({"id":m,"gpu_name":h.get("gpu_name",m),"status":h.get("status"),"vram_used_mb":h.get("vram_used_mb"),"vram_total_mb":h.get("vram_total_mb"),"vram_pct":h.get("vram_pct"),"temp_c":h.get("temp_c"),"gpu_util_pct":h.get("gpu_util_pct"),"power_w":h.get("power_w"),"power_limit_w":h.get("power_limit_w")}) if r: try: for m in GPU_URLS: d["route_counts"][m] = int(r.get("routes:"+m) or 0) for k,v in API_KEYS.items(): c = int(r.get("routes:agent:"+v["agent"]) or 0) if c>0: d["agent_counts"][v["agent"]] = c for t in TIER_MODELS: d["tier_counts"][t] = int(r.get("routes:tier:"+t) or 0) raw = r.lrange("routes:recent",0,49) d["recent"] = [json.loads(x) for x in raw] if raw else [] except Exception: pass return d def bcast(): data = get_metrics(); payload = json.dumps(data) with sse_lock: dead = [] for q in sse_subscribers: try: q.put(payload) except Exception: dead.append(q) for q in dead: sse_subscribers.remove(q) @app.route("/v1/chat/completions", methods=["POST"]) def chat(): try: rd = request.get_json(force=True) ak = request.headers.get("Authorization","").replace("Bearer ","") ki = API_KEYS.get(ak, {"tier":"starter","agent":"unknown"}) tier, agent = ki["tier"], ki["agent"] d = route(rd, tier); model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]] is_stream = rd.get("stream", False) log.info("ROUTE: %s -> %s (%s) stream=%s", agent, model, reason, is_stream) if r: try: r.incr("routes:"+model); r.incr("routes:tier:"+tier); r.incr("routes:agent:"+agent) r.incr("ts:"+model+":"+time.strftime("%Y%m%d%H")) r.lpush("routes:recent", json.dumps({"ts":time.time(),"model":model,"reason":reason,"tier":tier,"agent":agent})) r.ltrim("routes:recent",0,999) except Exception: pass start = time.time() resp = requests.post(url+"/chat/completions", json=rd, headers={"Content-Type":"application/json","Authorization":"Bearer not-needed"}, timeout=120, stream=is_stream) lat = int((time.time()-start)*1000) if resp.status_code != 200: return jsonify({"error":"GPU error "+str(resp.status_code)}), 502 if is_stream: def gen(): for raw in resp.iter_content(chunk_size=None, decode_unicode=True): if raw: yield clean_unicode(raw) bcast() return Response(stream_with_context(gen()), mimetype="text/event-stream") data = clean_response(resp.json()) for c in data.get("choices",[]): msg = c.get("message",{}) if not msg.get("content") and msg.get("reasoning_content"): msg["content"] = msg["reasoning_content"] data["routing"] = {"model":model,"reason":reason,"gpu":url,"tier":tier,"agent":agent,"latency_ms":lat} bcast() return jsonify(data) except requests.Timeout: return jsonify({"error":"timeout"}), 504 except Exception as e: log.error("Error: %s\n%s", e, traceback.format_exc()) return jsonify({"error":str(e)}), 500 @app.route("/v1/models") def models(): return jsonify({"object":"list","data":[{"id":m,"object":"model","owned_by":"syslog","status":check_gpu_health(m).get("status"),"gpu":check_gpu_health(m).get("gpu_name")} for m in GPU_URLS]}) @app.route("/health") def health(): return jsonify({"status":"healthy","redis":"connected" if r else "down","gpus":{m:check_gpu_health(m) for m in GPU_URLS},"available_models":available_models()}) @app.route("/metrics") def metrics(): return jsonify(get_metrics()) @app.route("/metrics/timeseries") def metrics_timeseries(): period = request.args.get("period", "day"); models_list = list(GPU_URLS.keys()) data = {"models": {}, "labels": []} if period == "day": buckets = [time.strftime("%Y%m%d%H", time.gmtime(time.time()-h*3600)) for h in range(23,-1,-1)] data["labels"] = [time.strftime("%H:00", time.gmtime(time.time()-h*3600)) for h in range(23,-1,-1)] elif period == "week": buckets = [time.strftime("%Y%m%d", time.gmtime(time.time()-d*86400)) for d in range(6,-1,-1)] data["labels"] = [time.strftime("%a", time.gmtime(time.time()-d*86400)) for d in range(6,-1,-1)] else: buckets = [time.strftime("%Y%m%d", time.gmtime(time.time()-d*86400)) for d in range(29,-1,-1)] data["labels"] = [time.strftime("%m/%d", time.gmtime(time.time()-d*86400)) for d in range(29,-1,-1)] if r: for model in models_list: counts = [] for bucket in buckets: total = 0 if period in ("week","month"): for hh in range(24): total += int(r.get("ts:"+model+":"+bucket+"{:02d}".format(hh)) or 0) else: total = int(r.get("ts:"+model+":"+bucket) or 0) counts.append(total) data["models"][model] = counts return jsonify(data) @app.route("/stream") def stream(): def ev(): q = queue.Queue() with sse_lock: sse_subscribers.append(q) try: yield "data: "+json.dumps(get_metrics())+"\n\n" while True: try: yield "data: "+q.get(timeout=3)+"\n\n" except queue.Empty: yield "data: "+json.dumps(get_metrics())+"\n\n" except GeneratorExit: pass finally: with sse_lock: if q in sse_subscribers: sse_subscribers.remove(q) return Response(stream_with_context(ev()), mimetype="text/event-stream", headers={"Cache-Control":"no-cache","X-Accel-Buffering":"no","Access-Control-Allow-Origin":"*"}) if __name__ == "__main__": log.info("Router on :9000") app.run(host="0.0.0.0", port=9000, debug=False)