Files
syslog-harness/router/router.py
T
Abiba 6efd5ff51c feat: context-aware routing + compaction signals
- Added GPU_CONTEXT map (MoE 131K, VLM 131K, Dense 65K)
- Heavy tier now prefers MoE/VLM (131K) over Dense (65K) for large requests
- Response headers: X-Context-Remaining, X-Context-Model
- Routing data includes context_remaining field
- Agents can use this to trigger compaction when nearing limits
2026-05-19 21:13:56 +00:00

397 lines
18 KiB
Python

import os, json, time, logging, traceback, threading, queue
import requests, redis
from flask import Flask, request, jsonify, Response, stream_with_context
# --- Endpoints (each overridable through an environment variable) -----------
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
GPU_MOE_URL = os.getenv("GPU_MOE_URL", "http://192.168.68.15:8080/v1")
GPU_DENSE_URL = os.getenv("GPU_DENSE_URL", "http://192.168.68.8:8080/v1")
GPU_LIGHT_URL = os.getenv("GPU_LIGHT_URL", "http://192.168.68.110:8080/v1")

# Per-model sidecar endpoints polled by check_gpu_health() for VRAM/temperature stats.
GPU_SIDECARS = {
    "qwen3.6-35B-A3B": "http://192.168.68.15:8090",
    "qwen3.6-27B-code": "http://192.168.68.8:8090",
    "qwen3.5-9b-vlm": "http://192.168.68.110:8090",
}

# Per-model inference base URLs (the "/v1" API root of each GPU server).
GPU_URLS = {
    "qwen3.6-35B-A3B": GPU_MOE_URL,
    "qwen3.6-27B-code": GPU_DENSE_URL,
    "qwen3.5-9b-vlm": GPU_LIGHT_URL,
}

# Max concurrent requests per GPU (based on llama.cpp --parallel)
GPU_MAX_CONCURRENT = {
    "qwen3.6-35B-A3B": 2,   # 2 slots
    "qwen3.6-27B-code": 2,  # 2 slots
    "qwen3.5-9b-vlm": 2,    # 2 slots (12GB VRAM, 4GB headroom)
}

# Context window sizes (tokens) — used for compaction signals
GPU_CONTEXT = {
    "qwen3.6-35B-A3B": 131072,
    "qwen3.6-27B-code": 65536,
    "qwen3.5-9b-vlm": 131072,
}

# Models each subscription tier may be routed to.
TIER_MODELS = {
    "starter": ["qwen3.5-9b-vlm"],
    "professional": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"],
    "enterprise": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"],
}

# Bearer token -> tier and agent name used for authorisation and stats.
# NOTE(review): these credentials are committed in source; consider loading
# them from the environment or a secret store instead.
API_KEYS = {
    "sk-syslog-local-master-key": {"tier": "enterprise", "agent": "admin"},
    "sk-syslog-abiba": {"tier": "enterprise", "agent": "Abiba"},
    "sk-syslog-mumuni": {"tier": "enterprise", "agent": "Mumuni"},
    "sk-syslog-tanko": {"tier": "enterprise", "agent": "Tanko"},
    "sk-syslog-koby": {"tier": "enterprise", "agent": "Koby"},
    "sk-syslog-kagenz0": {"tier": "enterprise", "agent": "Kagenz0"},
    "sk-syslog-koonimo": {"tier": "enterprise", "agent": "Koonimo"},
    "sk-starter-abc123": {"tier": "starter", "agent": "test-starter"},
    "sk-professional-xyz789": {"tier": "professional", "agent": "test-pro"},
}
logging.basicConfig(level=logging.INFO, format="%(asctime)s [ROUTER] %(levelname)s %(message)s")
log = logging.getLogger("router")

# Shared Redis client for the active-request counters and routing statistics.
# If connecting or the initial ping fails, `r` is set to None and every caller
# that tests `if r:` skips its Redis work.
try:
    r = redis.from_url(REDIS_URL, decode_responses=True)
    r.ping()
except Exception as exc:
    # Best-effort: keep serving without Redis, but report it instead of
    # failing silently.
    log.warning("Redis unavailable (%s); counters and routing stats disabled", exc)
    r = None
def counter_audit_loop():
    """Every 30s, check GPU slots and reset counters if all slots idle.

    Runs forever. For each entry in GPU_URLS it fetches ``<base>/slots``
    (the URL with "/v1" removed) and, when no slot reports
    ``is_processing``, sets a positive ``active:<model>`` Redis counter
    back to 0. Iterations are skipped while no Redis client is available.
    Failures for one model are logged at debug level and do not stop the
    loop.
    """
    while True:
        time.sleep(30)
        if not r:
            continue
        for model, url in GPU_URLS.items():
            try:
                resp = requests.get(url.replace("/v1", "") + "/slots",
                                    headers={"Authorization": "Bearer not-needed"}, timeout=5)
                if resp.status_code != 200:
                    continue
                slots = resp.json()
                if any(s.get("is_processing", False) for s in slots):
                    continue
                # Every slot is idle, so a positive counter is stale.
                current = int(r.get("active:" + model) or 0)
                if current > 0:
                    r.set("active:" + model, 0)
                    log.info("AUDIT: Reset stuck counter for %s (was %d)", model, current)
            except Exception as exc:
                # Best-effort audit: record the failure and move on to the next model.
                log.debug("AUDIT: slot check failed for %s: %s", model, exc)
# Run the counter audit in the background; daemon=True so it never blocks exit.
threading.Thread(daemon=True, target=counter_audit_loop).start()

app = Flask(__name__)

# Queues of connected /stream clients, guarded by sse_lock.
sse_subscribers = []
sse_lock = threading.Lock()
def gpu_active_count(model):
    """Return the in-flight request count recorded for ``model``.

    Reads the ``active:<model>`` Redis key. A missing key, or having no
    Redis client at all, counts as 0.
    """
    if not r:
        return 0
    stored = r.get("active:" + model)
    return int(stored or 0)
def gpu_incr(model):
    """Add one to the ``active:<model>`` counter; does nothing without Redis."""
    if not r:
        return
    r.incr("active:" + model)
def gpu_decr(model):
    """Subtract one from the ``active:<model>`` counter, clamping at zero.

    Does nothing without a Redis client. If the decrement leaves the counter
    below zero it is written back as 0.
    """
    if not r:
        return
    key = "active:" + model
    remaining = r.decr(key)
    if remaining and int(remaining) < 0:
        r.set(key, 0)  # never go negative
def check_gpu_health(model):
    """Probe the sidecar and inference endpoint of ``model``.

    Returns:
        ``{"status": "unknown"}`` when the model has no sidecar URL;
        ``{"status": "down"}`` when the sidecar does not answer 200 or any
        error occurs; otherwise a dict with ``status`` ("healthy",
        "saturated" at 90% VRAM use or more, or "down" when the ``/health``
        probe of the inference server fails) plus the sidecar's VRAM,
        temperature, utilisation, name and power readings.
    """
    sidecar = GPU_SIDECARS.get(model)
    if not sidecar:
        return {"status": "unknown"}
    try:
        reply = requests.get(sidecar, timeout=5)
        if reply.status_code != 200:
            return {"status": "down"}
        stats = reply.json()
        used = stats.get("vram_used_mb", 0)
        total = max(stats.get("vram_total_mb", 1), 1)  # guard against division by zero
        pct = (used / total) * 100
        status = "healthy" if pct < 90 else "saturated"
        # Also check if llama.cpp endpoint is actually responding
        gpu_url = GPU_URLS.get(model, "")
        try:
            probe = requests.get(
                gpu_url.replace("/v1", "") + "/health",
                headers={"Authorization": "Bearer not-needed"},
                timeout=3,
            )
            responding = probe.status_code == 200
        except Exception:
            responding = False
        if not responding:
            status = "down"
        return {
            "status": status,
            "vram_used_mb": stats.get("vram_used_mb"),
            "vram_total_mb": stats.get("vram_total_mb"),
            "vram_pct": round(pct, 1),
            "temp_c": stats.get("temp_c"),
            "gpu_util_pct": stats.get("gpu_util_pct"),
            "gpu_name": stats.get("gpu_name"),
            "power_w": stats.get("power_w"),
            "power_limit_w": stats.get("power_limit_w"),
        }
    except Exception:
        # Any failure while probing or parsing is reported as a down GPU.
        return {"status": "down"}
def available_models():
    """List the models in GPU_URLS whose health status is "healthy" or "saturated"."""
    usable = []
    for name in GPU_URLS:
        if check_gpu_health(name)["status"] in ("healthy", "saturated"):
            usable.append(name)
    return usable
def estimate_tokens(msgs):
    """Roughly estimate the token count of a message list.

    Sums the length of ``str(content)`` over all messages (a missing
    "content" counts as an empty string) and divides by four, rounding down.
    """
    total_chars = 0
    for message in msgs:
        total_chars += len(str(message.get("content", "")))
    return total_chars // 4
def is_gpu_busy(model):
    """Return True when ``model`` already has as many in-flight requests as
    its GPU_MAX_CONCURRENT limit (1 when the model has no entry)."""
    in_flight = gpu_active_count(model)
    limit = GPU_MAX_CONCURRENT.get(model, 1)
    return in_flight >= limit
def select_best_gpu(candidates, reason):
    """Choose a model from ``candidates``, honouring their order.

    The first candidate that is not busy is returned with ``reason`` as given.
    When every candidate is busy, the one with the fewest in-flight requests
    (earliest wins a tie) is returned with the reason prefixed by
    "load_balanced_". Returns None when there is nothing to pick.
    """
    for name in candidates:
        if is_gpu_busy(name):
            continue
        return {"model": name, "reason": reason}
    # All busy — pick least loaded
    loads = [(gpu_active_count(name), name) for name in candidates]
    if not loads:
        return None
    lowest, pick = min(loads, key=lambda pair: pair[0])
    if lowest >= 999 or not pick:
        return None
    return {"model": pick, "reason": "load_balanced_" + reason}
def route(rd, tier):
    """Choose which GPU model should serve a chat request.

    Args:
        rd: Parsed request body. Reads "messages", "routing_hints" and
            "model" (defaults to "auto").
        tier: Key into TIER_MODELS; an unknown tier is limited to
            "qwen3.5-9b-vlm".

    Returns:
        A dict with "model" and "reason". When none of the tier's models
        passes the health check, the dict also carries "saturated": True
        and names the tier's first model.

    The checks run in order and the first match wins: explicit model,
    routing hints, then tiers 1-4 based on estimated tokens and turn count.
    """
    msgs = rd.get("messages",[]); t = estimate_tokens(msgs)
    # True when any message has the "system" role.
    sys = any(m.get("role")=="system" for m in msgs)
    turns = len([m for m in msgs if m.get("role") in ("user","assistant")])
    hints = rd.get("routing_hints",{})
    allowed = TIER_MODELS.get(tier, ["qwen3.5-9b-vlm"])
    # available_models() probes every GPU, so this line performs network calls.
    avail = [m for m in available_models() if m in allowed]
    if not avail: return {"model": allowed[0], "reason": "all_saturated", "saturated": True}
    req = rd.get("model","auto")
    if req != "auto":
        # A requested model that is not available falls back to the first available one.
        target = req if req in avail else avail[0]
        # If explicit model is busy, check if another can take it
        if is_gpu_busy(target) and req in allowed:
            alts = [m for m in avail if m != target and m in allowed]
            if alts:
                alt = select_best_gpu(alts, "explicit")
                if alt: return alt
        return {"model": target, "reason": "explicit"}
    if hints:
        # "speed" maps to the VLM model and "quality" to the Dense model, when available.
        if hints.get("priority")=="speed" and "qwen3.5-9b-vlm" in avail:
            return select_best_gpu(["qwen3.5-9b-vlm"], "hint_speed") or {"model":"qwen3.5-9b-vlm","reason":"hint_speed"}
        if hints.get("priority")=="quality" and "qwen3.6-27B-code" in avail:
            return select_best_gpu(["qwen3.6-27B-code"], "hint_quality") or {"model":"qwen3.6-27B-code","reason":"hint_quality"}
    # Word count of the FIRST message only; non-string content counts as 99 words.
    first_msg = msgs[0].get("content","") if msgs else ""
    words = len(first_msg.split()) if isinstance(first_msg, str) else 99
    # TIER 1: Lightweight — single-turn short queries → VLM first
    if not sys and turns <= 1 and words <= 100 and "qwen3.5-9b-vlm" in avail:
        if not is_gpu_busy("qwen3.5-9b-vlm"):
            return {"model":"qwen3.5-9b-vlm","reason":"lightweight"}
        # VLM busy — fall back to MoE, then Dense (the order of the list below)
        fallback = [m for m in ["qwen3.6-35B-A3B","qwen3.6-27B-code"] if m in avail]
        result = select_best_gpu(fallback, "lightweight_fallback")
        if result: return result
    # TIER 2: Simple conversations — short context, any prompt → VLM preferred
    if t <= 1000 and turns <= 4 and "qwen3.5-9b-vlm" in avail:
        if not is_gpu_busy("qwen3.5-9b-vlm"):
            return {"model":"qwen3.5-9b-vlm","reason":"simple_conv"}
        # VLM busy — try Dense
        if "qwen3.6-27B-code" in avail and not is_gpu_busy("qwen3.6-27B-code"):
            return {"model":"qwen3.6-27B-code","reason":"simple_conv_fallback"}
    # TIER 3: Heavy reasoning — extremely large context or very long conversations
    if t > 50000 or turns > 25:
        # Prefer models with larger context windows (MoE/VLM at 131K, Dense at 65K)
        # NOTE(review): candidates are only ordered, not filtered by GPU_CONTEXT,
        # so Dense can still be chosen for an estimate above 65536 — confirm intended.
        candidates = [m for m in ["qwen3.6-35B-A3B","qwen3.5-9b-vlm","qwen3.6-27B-code"] if m in avail]
        result = select_best_gpu(candidates, "heavy_reasoning")
        if result: return result
    # TIER 4: Default — MoE first, VLM helps, Dense last (slow)
    if t <= 50000:
        candidates = [m for m in ["qwen3.6-35B-A3B","qwen3.5-9b-vlm","qwen3.6-27B-code"] if m in avail]
        result = select_best_gpu(candidates, "default")
        if result: return result
    # Fallback — best available
    # NOTE(review): looks unreachable while avail only ever holds the three
    # models listed above — confirm before relying on it.
    if "qwen3.6-35B-A3B" in avail and not is_gpu_busy("qwen3.6-35B-A3B"):
        return {"model":"qwen3.6-35B-A3B","reason":"default_moe"}
    result = select_best_gpu([m for m in avail], "fallback")
    if result: return result
    return {"model":avail[0],"reason":"last_resort"}
def clean_unicode(text):
    """Reduce a string to plain ASCII.

    Em/en dashes become "-", curly single and double quotes become straight
    quotes, the ellipsis character becomes "...", and a non-breaking space
    becomes a normal space. Every other non-ASCII character is dropped.
    Non-string values are returned unchanged.
    """
    if not isinstance(text, str):
        return text
    ascii_equivalents = {
        0x2014: "-",    # em dash
        0x2013: "-",    # en dash
        0x2018: "'",    # left single quote
        0x2019: "'",    # right single quote
        0x201C: '"',    # left double quote
        0x201D: '"',    # right double quote
        0x2026: "...",  # horizontal ellipsis
        0x00A0: " ",    # non-breaking space
    }
    normalised = text.translate(ascii_equivalents)
    return normalised.encode("ascii", "ignore").decode("ascii")
def clean_response(d):
    """Recursively apply clean_unicode to every string value in ``d``.

    Dicts and lists are rebuilt with cleaned values (dict keys are left as
    they are); any other value is returned unchanged.
    """
    if isinstance(d, str):
        return clean_unicode(d)
    if isinstance(d, list):
        return [clean_response(item) for item in d]
    if isinstance(d, dict):
        cleaned = {}
        for key, value in d.items():
            cleaned[key] = clean_response(value)
        return cleaned
    return d
def get_metrics():
    """Build the metrics snapshot served by /metrics and pushed over SSE.

    The snapshot always holds per-GPU health and active-request counts. When
    Redis is available it also holds route counts per model, agent and tier,
    and up to 50 recent routing records. A Redis or decoding error leaves
    whatever was collected so far in place.
    """
    snapshot = {
        "gpus": [],
        "route_counts": {},
        "agent_counts": {},
        "tier_counts": {},
        "recent": [],
        "timestamp": time.time(),
        "active_requests": {},
    }
    for name in GPU_URLS:
        info = check_gpu_health(name)
        snapshot["gpus"].append({
            "id": name,
            "gpu_name": info.get("gpu_name", name),
            "status": info.get("status"),
            "vram_used_mb": info.get("vram_used_mb"),
            "vram_total_mb": info.get("vram_total_mb"),
            "vram_pct": info.get("vram_pct"),
            "temp_c": info.get("temp_c"),
            "gpu_util_pct": info.get("gpu_util_pct"),
            "power_w": info.get("power_w"),
            "power_limit_w": info.get("power_limit_w"),
            "active_requests": gpu_active_count(name),
            "max_concurrent": GPU_MAX_CONCURRENT.get(name, 1),
        })
        snapshot["active_requests"][name] = gpu_active_count(name)
    if not r:
        return snapshot
    try:
        for name in GPU_URLS:
            snapshot["route_counts"][name] = int(r.get("routes:" + name) or 0)
        for key_info in API_KEYS.values():
            agent_name = key_info["agent"]
            hits = int(r.get("routes:agent:" + agent_name) or 0)
            if hits > 0:
                # Agents with no routed requests are left out.
                snapshot["agent_counts"][agent_name] = hits
        for tier_name in TIER_MODELS:
            snapshot["tier_counts"][tier_name] = int(r.get("routes:tier:" + tier_name) or 0)
        raw = r.lrange("routes:recent", 0, 49)
        snapshot["recent"] = [json.loads(entry) for entry in raw] if raw else []
    except Exception:
        pass
    return snapshot
def bcast():
    """Push a fresh metrics snapshot to every connected SSE subscriber.

    The snapshot is serialised once and put on each subscriber queue while
    holding sse_lock; queues whose ``put`` raises are removed from the list.
    """
    payload = json.dumps(get_metrics())
    with sse_lock:
        failed = []
        for subscriber in sse_subscribers:
            try:
                subscriber.put(payload)
            except Exception:
                failed.append(subscriber)
        for subscriber in failed:
            sse_subscribers.remove(subscriber)
# Max seconds a request may wait for a GPU before the router answers 503.
# Read from the QUEUE_TIMEOUT environment variable, defaulting to 30.
QUEUE_TIMEOUT = int(os.getenv("QUEUE_TIMEOUT", "30"))
@app.route("/v1/chat/completions", methods=["POST"])
def chat():
    """Authenticate, route and proxy a chat completion request to a GPU.

    Flow: check the bearer token against API_KEYS, parse the JSON body, ask
    route() for a model (polling every 500 ms while it reports "saturated"),
    then forward the body to ``<gpu url>/chat/completions``.

    Headers read:
        Authorization: ``Bearer <key>``.
        X-Queue-Timeout: optional max seconds to wait for a GPU; a missing
            or non-integer value falls back to QUEUE_TIMEOUT.

    Responses:
        200: the upstream JSON (ASCII-cleaned, with a "routing" object
             added) or, for ``"stream": true``, the upstream event stream.
             Both carry X-Context-Remaining and X-Context-Model headers.
        400: the body is not a JSON object.
        401: missing or unknown API key.
        502: the GPU answered with a non-200 status.
        503: every GPU stayed saturated for longer than the queue timeout.
        504: the upstream request timed out.
        500: any other error.

    The ``active:<model>`` counter is incremented once per routed request
    and released exactly once: after the upstream call for JSON responses,
    at end of stream for streaming responses, or on error.
    """
    agent, model = "unknown", None
    counted = False  # True while this request holds a slot in active:<model>

    def release():
        """Give the slot back at most once, however the request ends."""
        nonlocal counted
        if counted:
            counted = False
            gpu_decr(model)

    try:
        ak = request.headers.get("Authorization", "").replace("Bearer ", "")
        if not ak or ak not in API_KEYS:
            log.warning("AUTH_REJECTED: no/invalid API key from %s", request.remote_addr)
            return jsonify({"error": "Unauthorized — valid API key required"}), 401
        ki = API_KEYS[ak]
        tier, agent = ki["tier"], ki["agent"]

        # silent=True yields None instead of raising on a malformed body.
        rd = request.get_json(force=True, silent=True)
        if not isinstance(rd, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        # Allow agent to override queue timeout via header
        try:
            q_timeout = int(request.headers.get("X-Queue-Timeout", QUEUE_TIMEOUT))
        except (TypeError, ValueError):
            q_timeout = QUEUE_TIMEOUT

        d = route(rd, tier)
        queue_start = time.time()
        # Queue loop: wait for a GPU slot instead of immediate 503
        while d.get("saturated"):
            elapsed = time.time() - queue_start
            if elapsed > q_timeout:
                busy = jsonify({"error": "All GPUs saturated", "queued_s": round(elapsed, 1), "retry_after_s": 5})
                busy.headers["Retry-After"] = "5"
                log.warning("QUEUE_TIMEOUT: %s waited %.1fs, all GPUs saturated", agent, elapsed)
                return busy, 503
            time.sleep(0.5)  # poll every 500ms
            d = route(rd, tier)
        waited = time.time() - queue_start
        if waited > 0.5:
            log.info("QUEUED: %s waited %.1fs before slot opened", agent, waited)

        model, reason = d["model"], d["reason"]
        url = GPU_URLS[model]
        is_stream = bool(rd.get("stream", False))
        gpu_incr(model)
        counted = True
        log.info("ROUTE: %s -> %s (%s) stream=%s active=%d/%d", agent, model, reason, is_stream,
                 gpu_active_count(model), GPU_MAX_CONCURRENT.get(model, 1))

        # `r` here must stay the module-level Redis client: assigning to a
        # local named `r` anywhere in this function would shadow it.
        if r:
            try:
                r.incr("routes:" + model); r.incr("routes:tier:" + tier); r.incr("routes:agent:" + agent)
                # Hourly bucket in UTC, matching the gmtime-based keys that
                # metrics_timeseries() reads.
                r.incr("ts:" + model + ":" + time.strftime("%Y%m%d%H", time.gmtime()))
                r.lpush("routes:recent", json.dumps({"ts": time.time(), "model": model, "reason": reason, "tier": tier, "agent": agent}))
                r.ltrim("routes:recent", 0, 999)
            except Exception:
                pass  # statistics are best-effort and must not fail the request

        start = time.time()
        upstream = requests.post(url + "/chat/completions", json=rd,
                                 headers={"Content-Type": "application/json", "Authorization": "Bearer not-needed"},
                                 timeout=300, stream=is_stream)
        lat = int((time.time() - start) * 1000)
        if upstream.status_code != 200:
            upstream.close()
            release()
            return jsonify({"error": "GPU error " + str(upstream.status_code)}), 502

        ctx_remaining = GPU_CONTEXT.get(model, 65536) - estimate_tokens(rd.get("messages", []))

        if is_stream:
            def gen():
                try:
                    for raw in upstream.iter_content(chunk_size=None, decode_unicode=True):
                        if raw:
                            yield clean_unicode(raw)
                finally:
                    # With stream=True the POST returns once headers arrive,
                    # so the GPU slot is only free when the body is drained
                    # or the client goes away.
                    upstream.close()
                    release()
                bcast()

            stream_resp = Response(stream_with_context(gen()), mimetype="text/event-stream")
            stream_resp.headers["X-Context-Remaining"] = str(max(0, ctx_remaining))
            stream_resp.headers["X-Context-Model"] = model
            return stream_resp

        # Non-streaming: the whole body has been read, so the slot is free.
        release()
        data = clean_response(upstream.json())
        for c in data.get("choices", []):
            msg = c.get("message", {})
            # Fall back to the reasoning text when the answer itself is empty.
            if not msg.get("content") and msg.get("reasoning_content"):
                msg["content"] = msg["reasoning_content"]
        data["routing"] = {"model": model, "reason": reason, "gpu": url, "tier": tier, "agent": agent,
                           "latency_ms": lat, "active_gpu": gpu_active_count(model),
                           "context_remaining": max(0, ctx_remaining)}
        out = jsonify(data)
        out.headers["X-Context-Remaining"] = str(max(0, ctx_remaining))
        out.headers["X-Context-Model"] = model
        bcast()
        return out
    except requests.Timeout:
        release()
        log.error("TIMEOUT: %s -> %s", agent, model)
        return jsonify({"error": "timeout"}), 504
    except Exception as e:
        release()
        log.error("Error: %s\n%s", e, traceback.format_exc())
        return jsonify({"error": str(e)}), 500
@app.route("/v1/models")
def models():
    """List every configured model with its current status and GPU name.

    Each model is probed once per request, so "status" and "gpu" always come
    from the same health check.
    """
    entries = []
    for m in GPU_URLS:
        h = check_gpu_health(m)
        entries.append({
            "id": m,
            "object": "model",
            "owned_by": "syslog",
            "status": h.get("status"),
            "gpu": h.get("gpu_name"),
        })
    return jsonify({"object": "list", "data": entries})
@app.route("/health")
def health():
    """Report router health: Redis connectivity, per-GPU health with active
    and maximum request counts, and the list of currently available models."""
    report = {}
    for name in GPU_URLS:
        info = check_gpu_health(name)
        info["active_requests"] = gpu_active_count(name)
        info["max_concurrent"] = GPU_MAX_CONCURRENT.get(name, 1)
        report[name] = info
    redis_state = "connected" if r else "down"
    return jsonify({
        "status": "healthy",
        "redis": redis_state,
        "gpus": report,
        "available_models": available_models(),
    })
@app.route("/metrics")
def metrics():
    """Return the current metrics snapshot as JSON."""
    snapshot = get_metrics()
    return jsonify(snapshot)
@app.route("/metrics/timeseries")
def metrics_timeseries():
    """Return per-model request counts over time for charting.

    Query parameter ``period``:
        "day"   - 24 hourly buckets, labelled "HH:00".
        "week"  - 7 daily buckets, labelled by weekday.
        anything else is treated as "month" - 30 daily buckets, labelled "MM/DD".

    Buckets are computed in UTC, oldest first. Hourly counts are read from
    the ``ts:<model>:<YYYYMMDDHH>`` Redis keys; daily buckets sum the 24
    hourly keys of that day. Without Redis, "models" is empty.
    """
    period = request.args.get("period", "day")
    if period not in ("day", "week"):
        # Unknown values get the 30-day view, summed per day like "month".
        period = "month"

    # One timestamp for the whole response so buckets and labels stay aligned.
    now = time.time()
    if period == "day":
        stamps = [time.gmtime(now - h * 3600) for h in range(23, -1, -1)]
        bucket_fmt, label_fmt = "%Y%m%d%H", "%H:00"
    elif period == "week":
        stamps = [time.gmtime(now - d * 86400) for d in range(6, -1, -1)]
        bucket_fmt, label_fmt = "%Y%m%d", "%a"
    else:
        stamps = [time.gmtime(now - d * 86400) for d in range(29, -1, -1)]
        bucket_fmt, label_fmt = "%Y%m%d", "%m/%d"
    buckets = [time.strftime(bucket_fmt, s) for s in stamps]
    data = {"models": {}, "labels": [time.strftime(label_fmt, s) for s in stamps]}

    if r:
        hourly = period == "day"
        for model in GPU_URLS:
            prefix = "ts:" + model + ":"
            counts = []
            for bucket in buckets:
                if hourly:
                    total = int(r.get(prefix + bucket) or 0)
                else:
                    total = sum(int(r.get(prefix + bucket + "{:02d}".format(hh)) or 0) for hh in range(24))
                counts.append(total)
            data["models"][model] = counts
    return jsonify(data)
@app.route("/stream")
def stream():
    """Server-sent-events feed of metrics snapshots.

    Each client gets its own queue registered in sse_subscribers. It receives
    one snapshot immediately, then whatever bcast() pushes, or a freshly
    computed snapshot whenever nothing arrives within 3 seconds. The queue is
    unregistered when the generator ends.
    """
    def ev():
        inbox = queue.Queue()
        with sse_lock:
            sse_subscribers.append(inbox)
        try:
            yield "data: " + json.dumps(get_metrics()) + "\n\n"
            while True:
                try:
                    payload = inbox.get(timeout=3)
                except queue.Empty:
                    payload = json.dumps(get_metrics())
                yield "data: " + payload + "\n\n"
        except GeneratorExit:
            pass
        finally:
            with sse_lock:
                if inbox in sse_subscribers:
                    sse_subscribers.remove(inbox)

    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
        "Access-Control-Allow-Origin": "*",
    }
    return Response(stream_with_context(ev()), mimetype="text/event-stream", headers=sse_headers)
if __name__ == "__main__":
    # Direct execution: serve on all interfaces, port 9000, with Flask debug off.
    log.info("Router on :9000 (load-aware)")
    app.run(debug=False, host="0.0.0.0", port=9000)