import os, json, time, logging, traceback, threading, queue import requests, redis from flask import Flask, request, jsonify, Response, stream_with_context REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379") GPU_MOE_URL = os.environ.get("GPU_MOE_URL", "http://192.168.68.15:8080/v1") GPU_DENSE_URL = os.environ.get("GPU_DENSE_URL", "http://192.168.68.8:8080/v1") GPU_LIGHT_URL = os.environ.get("GPU_LIGHT_URL", "http://192.168.68.110:8080/v1") GPU_SIDECARS = { "qwen3.6-35B-A3B": "http://192.168.68.15:8090", "qwen3.6-27B-code": "http://192.168.68.8:8090", "qwen3.5-9b-vlm": "http://192.168.68.110:8090", } GPU_URLS = { "qwen3.6-35B-A3B": GPU_MOE_URL, "qwen3.6-27B-code": GPU_DENSE_URL, "qwen3.5-9b-vlm": GPU_LIGHT_URL, } # Max concurrent requests per GPU (based on llama.cpp --parallel) GPU_MAX_CONCURRENT = { "qwen3.6-35B-A3B": 2, # 2 slots "qwen3.6-27B-code": 1, # 1 slot (24GB VRAM saturated at 256K ctx) "qwen3.5-9b-vlm": 2, # 2 slots (12GB VRAM, 4GB headroom) } # Context window sizes (tokens) — used for compaction signals GPU_CONTEXT = { "qwen3.6-35B-A3B": 262144, "qwen3.6-27B-code": 262144, "qwen3.5-9b-vlm": 262144, } TIER_MODELS = { "starter": ["qwen3.5-9b-vlm"], "professional": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"], "enterprise": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"], } API_KEYS = { "sk-syslog-local-master-key": {"tier": "enterprise", "agent": "admin"}, "sk-syslog-abiba": {"tier": "enterprise", "agent": "Abiba"}, "sk-syslog-mumuni": {"tier": "enterprise", "agent": "Mumuni"}, "sk-syslog-tanko": {"tier": "enterprise", "agent": "Tanko"}, "sk-syslog-koby": {"tier": "enterprise", "agent": "Koby"}, "sk-syslog-kagenz0": {"tier": "enterprise", "agent": "Kagenz0"}, "sk-syslog-koonimo": {"tier": "enterprise", "agent": "Koonimo"}, "sk-starter-abc123": {"tier": "starter", "agent": "test-starter"}, "sk-professional-xyz789": {"tier": "professional", "agent": "test-pro"}, } logging.basicConfig(level=logging.INFO, format="%(asctime)s [ROUTER] %(levelname)s %(message)s") log = logging.getLogger("router") try: r = redis.from_url(REDIS_URL, decode_responses=True); r.ping() except Exception: r = None def counter_audit_loop(): """Every 30s, check GPU slots and reset counters if all slots idle.""" while True: time.sleep(30) if not r: continue for model, url in GPU_URLS.items(): try: resp = requests.get(url.replace("/v1","") + "/slots", headers={"Authorization": "Bearer not-needed"}, timeout=5) if resp.status_code == 200: slots = resp.json() all_idle = all(not s.get("is_processing", False) for s in slots) if all_idle: current = int(r.get("active:" + model) or 0) if current > 0: r.set("active:" + model, 0) log.info("AUDIT: Reset stuck counter for %s (was %d)", model, current) except Exception: pass threading.Thread(target=counter_audit_loop, daemon=True).start() app = Flask(__name__) sse_subscribers = []; sse_lock = threading.Lock() def gpu_active_count(model): """Get number of in-flight requests for a GPU.""" if r: return int(r.get("active:" + model) or 0) return 0 def gpu_incr(model): if r: r.incr("active:" + model) def gpu_decr(model): if r: v = r.decr("active:" + model) if v and int(v) < 0: r.set("active:" + model, 0) # never go negative def check_gpu_health(model, sidecar_timeout=5, gpu_timeout=3): url = GPU_SIDECARS.get(model) if not url: return {"status": "unknown"} try: resp = requests.get(url, timeout=sidecar_timeout) if resp.status_code == 200: d = resp.json() pct = (d.get("vram_used_mb",0) / max(d.get("vram_total_mb",1), 1)) * 100 status = "healthy" if pct < 90 else "saturated" # Also check if llama.cpp endpoint is actually responding gpu_url = GPU_URLS.get(model, "") try: hr = requests.get(gpu_url.replace("/v1","") + "/health", headers={"Authorization": "Bearer not-needed"}, timeout=gpu_timeout) if hr.status_code != 200: status = "down" except Exception: status = "down" return {"status": status, "vram_used_mb": d.get("vram_used_mb"), "vram_total_mb": d.get("vram_total_mb"), "vram_pct": round(pct,1), "temp_c": d.get("temp_c"), "gpu_util_pct": d.get("gpu_util_pct"), "gpu_name": d.get("gpu_name"), "power_w": d.get("power_w"), "power_limit_w": d.get("power_limit_w")} except Exception: pass return {"status": "down"} def available_models(): return [m for m in GPU_URLS if check_gpu_health(m)["status"] in ("healthy","saturated")] def estimate_tokens(msgs): """Estimate token count from messages. Uses JSON length / 3.5 (closer to real tokenizer ratios for dense text).""" return len(json.dumps(msgs, default=str)) // 3.5 def is_gpu_busy(model): """Check if GPU is at or near max concurrent capacity.""" active = gpu_active_count(model) max_c = GPU_MAX_CONCURRENT.get(model, 1) return active >= max_c def select_best_gpu(candidates, reason): """Pick the best GPU from candidates IN ORDER — first non-busy one wins.""" for m in candidates: if not is_gpu_busy(m): return {"model": m, "reason": reason} # All busy — pick least loaded best = None best_load = 999 for m in candidates: load = gpu_active_count(m) if load < best_load: best_load = load best = m if best: return {"model": best, "reason": "load_balanced_" + reason} return None def route(rd, tier): msgs = rd.get("messages",[]); t = estimate_tokens(msgs) sys = any(m.get("role")=="system" for m in msgs) turns = len([m for m in msgs if m.get("role") in ("user","assistant")]) hints = rd.get("routing_hints",{}) allowed = TIER_MODELS.get(tier, ["qwen3.5-9b-vlm"]) avail = [m for m in available_models() if m in allowed] if not avail: return {"model": allowed[0], "reason": "all_saturated", "saturated": True} # Check if all available GPUs are at max capacity if all(is_gpu_busy(m) for m in avail): return {"model": avail[0], "reason": "all_saturated", "saturated": True} req = rd.get("model","auto") if req != "auto": target = req if req in avail else avail[0] # If explicit model is busy, check if another can take it if is_gpu_busy(target) and req in allowed: alts = [m for m in avail if m != target and m in allowed] if alts: alt = select_best_gpu(alts, "explicit") if alt: return alt return {"model": target, "reason": "explicit"} if hints: if hints.get("priority")=="speed" and "qwen3.5-9b-vlm" in avail: return select_best_gpu(["qwen3.5-9b-vlm"], "hint_speed") or {"model":"qwen3.5-9b-vlm","reason":"hint_speed"} if hints.get("priority")=="quality" and "qwen3.6-27B-code" in avail: return select_best_gpu(["qwen3.6-27B-code"], "hint_quality") or {"model":"qwen3.6-27B-code","reason":"hint_quality"} first_msg = msgs[0].get("content","") if msgs else "" words = len(first_msg.split()) if isinstance(first_msg, str) else 99 # TIER 1: Lightweight — single-turn short queries → VLM first if not sys and turns <= 1 and words <= 100 and "qwen3.5-9b-vlm" in avail: if not is_gpu_busy("qwen3.5-9b-vlm"): return {"model":"qwen3.5-9b-vlm","reason":"lightweight"} # VLM busy — fall back to Dense, then MoE fallback = [m for m in ["qwen3.6-35B-A3B","qwen3.6-27B-code"] if m in avail] result = select_best_gpu(fallback, "lightweight_fallback") if result: return result # TIER 2: Simple conversations — short context, any prompt → VLM preferred if t <= 1000 and turns <= 4 and "qwen3.5-9b-vlm" in avail: if not is_gpu_busy("qwen3.5-9b-vlm"): return {"model":"qwen3.5-9b-vlm","reason":"simple_conv"} # VLM busy — try Dense if "qwen3.6-27B-code" in avail and not is_gpu_busy("qwen3.6-27B-code"): return {"model":"qwen3.6-27B-code","reason":"simple_conv_fallback"} # TIER 3: Heavy reasoning — extremely large context or very long conversations if t > 50000 or turns > 25: # MoE first (131K context handles heavy sessions), then Dense (98K reasoning), then Light (131K fallback) candidates = [m for m in ["qwen3.6-35B-A3B","qwen3.6-27B-code","qwen3.5-9b-vlm"] if m in avail] result = select_best_gpu(candidates, "heavy_reasoning") if result: return result # TIER 4: Default — MoE first, VLM helps, Dense last (slow) if t <= 50000: candidates = [m for m in ["qwen3.6-35B-A3B","qwen3.5-9b-vlm","qwen3.6-27B-code"] if m in avail] result = select_best_gpu(candidates, "default") if result: return result # Fallback — best available if "qwen3.6-35B-A3B" in avail and not is_gpu_busy("qwen3.6-35B-A3B"): return {"model":"qwen3.6-35B-A3B","reason":"default_moe"} result = select_best_gpu([m for m in avail], "fallback") if result: return result return {"model":avail[0],"reason":"last_resort"} def clean_unicode(text): if not isinstance(text, str): return text text = text.replace(chr(0x2014), "-"); text = text.replace(chr(0x2013), "-") text = text.replace(chr(0x2018), "'"); text = text.replace(chr(0x2019), "'") text = text.replace(chr(0x201C), '"'); text = text.replace(chr(0x201D), '"') text = text.replace(chr(0x2026), "..."); text = text.replace(chr(0x00A0), " ") return text.encode("ascii", "ignore").decode("ascii") def clean_response(d): if isinstance(d, dict): return {k: clean_response(v) for k,v in d.items()} if isinstance(d, list): return [clean_response(v) for v in d] if isinstance(d, str): return clean_unicode(d) return d def get_metrics(): d = {"gpus":[],"route_counts":{},"agent_counts":{},"tier_counts":{},"recent":[],"timestamp":time.time(),"active_requests":{}} for m in GPU_URLS: h = check_gpu_health(m) d["gpus"].append({"id":m,"gpu_name":h.get("gpu_name",m),"status":h.get("status"),"vram_used_mb":h.get("vram_used_mb"),"vram_total_mb":h.get("vram_total_mb"),"vram_pct":h.get("vram_pct"),"temp_c":h.get("temp_c"),"gpu_util_pct":h.get("gpu_util_pct"),"power_w":h.get("power_w"),"power_limit_w":h.get("power_limit_w"),"active_requests":gpu_active_count(m), "max_concurrent": GPU_MAX_CONCURRENT.get(m, 1)}) d["active_requests"][m] = gpu_active_count(m) if r: try: for m in GPU_URLS: d["route_counts"][m] = int(r.get("routes:"+m) or 0) for k,v in API_KEYS.items(): c = int(r.get("routes:agent:"+v["agent"]) or 0) if c>0: d["agent_counts"][v["agent"]] = c for t in TIER_MODELS: d["tier_counts"][t] = int(r.get("routes:tier:"+t) or 0) raw = r.lrange("routes:recent",0,49) d["recent"] = [json.loads(x) for x in raw] if raw else [] except Exception: pass return d def bcast(): data = get_metrics(); payload = json.dumps(data) with sse_lock: dead = [] for q in sse_subscribers: try: q.put(payload) except Exception: dead.append(q) for q in dead: sse_subscribers.remove(q) QUEUE_TIMEOUT = int(os.environ.get("QUEUE_TIMEOUT", "30")) # max seconds to queue before 503 @app.route("/v1/chat/completions", methods=["POST"]) def chat(): try: rd = request.get_json(force=True) ak = request.headers.get("Authorization","").replace("Bearer ","") if not ak or ak not in API_KEYS: log.warning("AUTH_REJECTED: no/invalid API key from %s", request.remote_addr) return jsonify({"error": "Unauthorized — valid API key required"}), 401 ki = API_KEYS[ak] tier, agent = ki["tier"], ki["agent"] # Allow agent to override queue timeout via header q_timeout = int(request.headers.get("X-Queue-Timeout", str(QUEUE_TIMEOUT))) # Cross-turn context tracking: accumulate tokens per session session_id = request.headers.get("X-Session-Id", "") session_tokens = 0 if session_id and r: try: prev = int(r.get("session:" + session_id) or 0) current = estimate_tokens(rd.get("messages",[])) session_tokens = max(prev, current) # context only grows r.set("session:" + session_id, session_tokens, ex=86400) # TTL 24h except Exception: pass d = route(rd, tier) queue_start = time.time() # Queue loop: wait for a GPU slot instead of immediate 503 while d.get("saturated"): elapsed = time.time() - queue_start if elapsed > q_timeout: resp = jsonify({"error": "All GPUs saturated", "queued_s": round(elapsed,1), "retry_after_s": 5}) resp.headers["Retry-After"] = "5" log.warning("QUEUE_TIMEOUT: %s waited %.1fs, all GPUs saturated", agent, elapsed) return resp, 503 time.sleep(0.5) # poll every 500ms d = route(rd, tier) waited = time.time() - queue_start if waited > 0.5: log.info("QUEUED: %s waited %.1fs before slot opened", agent, waited) model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]] is_stream = rd.get("stream", False) gpu_incr(model) log.info("ROUTE: %s -> %s (%s) stream=%s active=%d/%d", agent, model, reason, is_stream, gpu_active_count(model), GPU_MAX_CONCURRENT.get(model,1)) if r: try: r.incr("routes:"+model); r.incr("routes:tier:"+tier); r.incr("routes:agent:"+agent) r.incr("ts:"+model+":"+time.strftime("%Y%m%d%H")) r.lpush("routes:recent", json.dumps({"ts":time.time(),"model":model,"reason":reason,"tier":tier,"agent":agent})) r.ltrim("routes:recent",0,999) except Exception: pass start = time.time() resp = requests.post(url+"/chat/completions", json=rd, headers={"Content-Type":"application/json","Authorization":"Bearer not-needed"}, timeout=300, stream=is_stream) lat = int((time.time()-start)*1000) gpu_decr(model) if resp.status_code != 200: return jsonify({"error":"GPU error "+str(resp.status_code)}), 502 if is_stream: def gen(): for raw in resp.iter_content(chunk_size=None, decode_unicode=True): if raw: yield clean_unicode(raw) bcast() ctx_remaining = GPU_CONTEXT.get(model, 65536) - max(session_tokens, estimate_tokens(rd.get("messages",[]))) ctx_pct = ctx_remaining / GPU_CONTEXT.get(model, 65536) * 100 ctx_warning = "compact_urgent" if ctx_pct < 5 else ("compact_recommended" if ctx_pct < 15 else ("compact_soon" if ctx_pct < 30 else "ok")) sse_resp = Response(stream_with_context(gen()), mimetype="text/event-stream") sse_resp.headers["X-Context-Remaining"] = str(max(0, ctx_remaining)) sse_resp.headers["X-Context-Warning"] = ctx_warning sse_resp.headers["X-Context-Model"] = model return sse_resp data = clean_response(resp.json()) for c in data.get("choices",[]): msg = c.get("message",{}) if not msg.get("content") and msg.get("reasoning_content"): msg["content"] = msg["reasoning_content"] ctx_remaining = GPU_CONTEXT.get(model, 65536) - max(session_tokens, estimate_tokens(rd.get("messages",[]))) ctx_pct = ctx_remaining / GPU_CONTEXT.get(model, 65536) * 100 ctx_warning = "compact_urgent" if ctx_pct < 5 else ("compact_recommended" if ctx_pct < 15 else ("compact_soon" if ctx_pct < 30 else "ok")) data["routing"] = {"model":model,"reason":reason,"gpu":url,"tier":tier,"agent":agent,"latency_ms":lat,"active_gpu":gpu_active_count(model),"context_remaining": max(0, ctx_remaining),"context_pct": round(ctx_pct,1),"context_warning": ctx_warning} resp = jsonify(data) resp.headers["X-Context-Remaining"] = str(max(0, ctx_remaining)) resp.headers["X-Context-Warning"] = ctx_warning resp.headers["X-Context-Model"] = model bcast() return resp except requests.Timeout: gpu_decr(model) log.error("TIMEOUT: %s -> %s", agent, model) return jsonify({"error":"timeout"}), 504 except Exception as e: gpu_decr(model) log.error("Error: %s\n%s", e, traceback.format_exc()) return jsonify({"error":str(e)}), 500 @app.route("/v1/models") def models(): def _h(m): return check_gpu_health(m, sidecar_timeout=1.5, gpu_timeout=1) return jsonify({"object":"list","data":[{"id":m,"object":"model","owned_by":"syslog","status":_h(m).get("status"),"gpu":_h(m).get("gpu_name")} for m in GPU_URLS]}) @app.route("/health") def health(): gpus = {} for m in GPU_URLS: h = check_gpu_health(m, sidecar_timeout=1.5, gpu_timeout=1) h["active_requests"] = gpu_active_count(m) h["max_concurrent"] = GPU_MAX_CONCURRENT.get(m, 1) gpus[m] = h return jsonify({"status":"healthy","redis":"connected" if r else "down","gpus":gpus,"available_models":available_models()}) @app.route("/metrics") def metrics(): return jsonify(get_metrics()) @app.route("/metrics/timeseries") def metrics_timeseries(): period = request.args.get("period", "day"); models_list = list(GPU_URLS.keys()) data = {"models": {}, "labels": []} if period == "day": buckets = [time.strftime("%Y%m%d%H", time.gmtime(time.time()-h*3600)) for h in range(23,-1,-1)] data["labels"] = [time.strftime("%H:00", time.gmtime(time.time()-h*3600)) for h in range(23,-1,-1)] elif period == "week": buckets = [time.strftime("%Y%m%d", time.gmtime(time.time()-d*86400)) for d in range(6,-1,-1)] data["labels"] = [time.strftime("%a", time.gmtime(time.time()-d*86400)) for d in range(6,-1,-1)] else: buckets = [time.strftime("%Y%m%d", time.gmtime(time.time()-d*86400)) for d in range(29,-1,-1)] data["labels"] = [time.strftime("%m/%d", time.gmtime(time.time()-d*86400)) for d in range(29,-1,-1)] if r: for model in models_list: counts = [] for bucket in buckets: total = 0 if period in ("week","month"): for hh in range(24): total += int(r.get("ts:"+model+":"+bucket+"{:02d}".format(hh)) or 0) else: total = int(r.get("ts:"+model+":"+bucket) or 0) counts.append(total) data["models"][model] = counts return jsonify(data) @app.route("/stream") def stream(): def ev(): q = queue.Queue() with sse_lock: sse_subscribers.append(q) try: yield "data: "+json.dumps(get_metrics())+"\n\n" while True: try: yield "data: "+q.get(timeout=3)+"\n\n" except queue.Empty: yield "data: "+json.dumps(get_metrics())+"\n\n" except GeneratorExit: pass finally: with sse_lock: if q in sse_subscribers: sse_subscribers.remove(q) return Response(stream_with_context(ev()), mimetype="text/event-stream", headers={"Cache-Control":"no-cache","X-Accel-Buffering":"no","Access-Control-Allow-Origin":"*"}) if __name__ == "__main__": log.info("Router on :9000 (load-aware)") app.run(host="0.0.0.0", port=9000, debug=False)