# Commit f519a3fa60
# System messages are common in agent conversations but don't indicate a heavy
# workload. Only token count (>4000) and turn count (>8) now trigger heavy
# routing, so simple conversations with system prompts can route to the VLM.
# File: 378 lines, 17 KiB, Python
import os, json, time, logging, traceback, threading, queue
|
|
import requests, redis
|
|
from flask import Flask, request, jsonify, Response, stream_with_context
|
|
|
|
# Service endpoints; each can be overridden through the environment.
REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379")
GPU_MOE_URL = os.environ.get("GPU_MOE_URL", "http://192.168.68.15:8080/v1")
GPU_DENSE_URL = os.environ.get("GPU_DENSE_URL", "http://192.168.68.8:8080/v1")
GPU_LIGHT_URL = os.environ.get("GPU_LIGHT_URL", "http://192.168.68.110:8080/v1")

# Per-model sidecar endpoints queried by check_gpu_health() for VRAM/temperature stats.
# NOTE(review): these hosts are hard-coded and do not follow the GPU_*_URL overrides.
GPU_SIDECARS = {
    "qwen3.6-35B-A3B": "http://192.168.68.15:8090",
    "qwen3.6-27B-code": "http://192.168.68.8:8090",
    "qwen3.5-9b-vlm": "http://192.168.68.110:8090",
}

# Per-model inference base URLs (requests are posted to <url>/chat/completions).
GPU_URLS = {
    "qwen3.6-35B-A3B": GPU_MOE_URL,
    "qwen3.6-27B-code": GPU_DENSE_URL,
    "qwen3.5-9b-vlm": GPU_LIGHT_URL,
}

# Max concurrent requests per GPU (based on llama.cpp --parallel)
GPU_MAX_CONCURRENT = {
    "qwen3.6-35B-A3B": 2,  # 2 slots
    "qwen3.6-27B-code": 2,  # 2 slots
    "qwen3.5-9b-vlm": 2,  # 2 slots (12GB VRAM, 4GB headroom)
}

# Models each subscription tier is allowed to be routed to.
TIER_MODELS = {
    "starter": ["qwen3.5-9b-vlm"],
    "professional": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"],
    "enterprise": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"],
}

# API key -> tier/agent mapping.
# SECURITY NOTE(review): credentials are committed in source; consider loading
# them from the environment or a secrets store instead.
API_KEYS = {
    "sk-syslog-local-master-key": {"tier": "enterprise", "agent": "admin"},
    "sk-syslog-abiba": {"tier": "enterprise", "agent": "Abiba"},
    "sk-syslog-mumuni": {"tier": "enterprise", "agent": "Mumuni"},
    "sk-syslog-tanko": {"tier": "enterprise", "agent": "Tanko"},
    "sk-syslog-koby": {"tier": "enterprise", "agent": "Koby"},
    "sk-syslog-kagenz0": {"tier": "enterprise", "agent": "Kagenz0"},
    "sk-syslog-koonimo": {"tier": "enterprise", "agent": "Koonimo"},
    "sk-starter-abc123": {"tier": "starter", "agent": "test-starter"},
    "sk-professional-xyz789": {"tier": "professional", "agent": "test-pro"},
}
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s [ROUTER] %(levelname)s %(message)s")
log = logging.getLogger("router")

# Redis is optional: when the connection or the initial ping fails, `r` is set
# to None and the rest of the module checks `if r` before touching it.
try:
    r = redis.from_url(REDIS_URL, decode_responses=True)
    r.ping()
except Exception as exc:
    log.warning("Redis unavailable (%s); continuing without it", exc)
    r = None
|
|
|
|
|
|
def counter_audit_loop():
    """Every 30s, check GPU slots and reset counters if all slots idle.

    Runs forever; intended to be the target of a background thread. For each
    model it queries `<base>/slots` and, when every reported slot has
    `is_processing` false while the Redis `active:<model>` counter is above
    zero, resets that counter to 0. Does nothing while Redis is unavailable.
    """
    while True:
        time.sleep(30)
        if not r:
            continue
        for model, url in GPU_URLS.items():
            try:
                resp = requests.get(
                    url.replace("/v1", "") + "/slots",
                    headers={"Authorization": "Bearer not-needed"},
                    timeout=5,
                )
                if resp.status_code != 200:
                    continue
                slots = resp.json()
                # An empty slot list is no evidence of idleness (all([]) is True),
                # so only reset when at least one slot was reported.
                if not slots or any(s.get("is_processing", False) for s in slots):
                    continue
                key = "active:" + model
                current = int(r.get(key) or 0)
                if current > 0:
                    r.set(key, 0)
                    log.info("AUDIT: Reset stuck counter for %s (was %d)", model, current)
            except Exception as exc:
                # Best-effort audit: never let one failing backend kill the loop.
                log.debug("AUDIT: slot check failed for %s: %s", model, exc)
|
|
|
|
# Run the counter audit in the background; daemon=True so it does not keep
# the process alive on shutdown.
threading.Thread(target=counter_audit_loop, daemon=True).start()

app = Flask(__name__)

# One queue per connected /stream client; guarded by sse_lock.
sse_subscribers = []
sse_lock = threading.Lock()
|
|
|
|
def gpu_active_count(model):
    """Get number of in-flight requests for a GPU.

    Reads the Redis key `active:<model>`; a missing key counts as 0.
    Returns 0 when Redis is unavailable.
    """
    if not r:
        return 0
    return int(r.get("active:" + model) or 0)
|
|
|
|
def gpu_incr(model):
    """Increment the in-flight counter `active:<model>` (no-op without Redis)."""
    if r:
        r.incr("active:" + model)
|
|
|
|
def gpu_decr(model):
    """Decrement the in-flight counter `active:<model>` (no-op without Redis).

    If the decrement takes the counter below zero it is reset to 0.
    """
    if not r:
        return
    key = "active:" + model
    v = r.decr(key)
    if v and int(v) < 0:
        r.set(key, 0)  # never go negative
|
|
|
|
def check_gpu_health(model):
    """Probe one model's sidecar and inference endpoint and summarize its state.

    Returns a dict whose "status" is one of:
      - "unknown":   no sidecar is configured for `model`
      - "healthy":   sidecar answered and VRAM usage is below 90%
      - "saturated": sidecar answered and VRAM usage is 90% or more
      - "down":      the sidecar or the `<base>/health` endpoint failed
    When the sidecar answered with 200, the dict also carries the VRAM,
    temperature, utilization, name and power fields it reported.
    """
    sidecar_url = GPU_SIDECARS.get(model)
    if not sidecar_url:
        return {"status": "unknown"}
    try:
        resp = requests.get(sidecar_url, timeout=5)
        if resp.status_code == 200:
            d = resp.json()
            pct = (d.get("vram_used_mb", 0) / max(d.get("vram_total_mb", 1), 1)) * 100
            status = "healthy" if pct < 90 else "saturated"

            # Also check if llama.cpp endpoint is actually responding
            gpu_url = GPU_URLS.get(model, "")
            try:
                hr = requests.get(
                    gpu_url.replace("/v1", "") + "/health",
                    headers={"Authorization": "Bearer not-needed"},
                    timeout=3,
                )
                if hr.status_code != 200:
                    status = "down"
            except requests.RequestException as exc:
                log.debug("HEALTH: %s inference endpoint unreachable: %s", model, exc)
                status = "down"

            return {
                "status": status,
                "vram_used_mb": d.get("vram_used_mb"),
                "vram_total_mb": d.get("vram_total_mb"),
                "vram_pct": round(pct, 1),
                "temp_c": d.get("temp_c"),
                "gpu_util_pct": d.get("gpu_util_pct"),
                "gpu_name": d.get("gpu_name"),
                "power_w": d.get("power_w"),
                "power_limit_w": d.get("power_limit_w"),
            }
    except Exception as exc:
        # Unreachable sidecar or malformed payload: report the GPU as down.
        log.debug("HEALTH: %s sidecar check failed: %s", model, exc)
    return {"status": "down"}
|
|
|
|
def available_models():
    """Return the model ids whose health status is "healthy" or "saturated"."""
    usable = []
    for model_id in GPU_URLS:
        if check_gpu_health(model_id)["status"] in ("healthy", "saturated"):
            usable.append(model_id)
    return usable
|
|
|
|
def estimate_tokens(msgs):
    """Roughly estimate the token count of a message list.

    Sums the character length of each message's stringified "content" and
    divides by 4. Missing or null content contributes nothing (a null
    content is not counted as the text "None").
    """
    return sum(len(str(m.get("content") or "")) for m in msgs) // 4
|
|
|
|
def is_gpu_busy(model):
    """Return True when the GPU's in-flight count has reached its max concurrency.

    Models missing from GPU_MAX_CONCURRENT are treated as having one slot.
    """
    return gpu_active_count(model) >= GPU_MAX_CONCURRENT.get(model, 1)
|
|
|
|
def select_best_gpu(candidates, reason):
    """Pick the best GPU from candidates, preferring least-loaded.

    Returns {"model": ..., "reason": ...} for the candidate with the fewest
    in-flight requests (the earliest candidate wins ties), or None when
    `candidates` is empty. If even the chosen GPU is at capacity, the reason
    is prefixed with "load_balanced_".
    """
    if not candidates:
        return None
    best = min(candidates, key=gpu_active_count)
    actual_reason = "load_balanced_" + reason if is_gpu_busy(best) else reason
    return {"model": best, "reason": actual_reason}
|
|
|
|
def route(rd, tier):
    """Choose a backend model for a chat request.

    Args:
        rd: parsed request body; reads "messages", "model" and "routing_hints".
        tier: subscription tier name, used to look up the allowed models.

    Returns:
        A dict with "model" and "reason". When none of the tier's models is
        currently available, the dict also has "saturated": True and names
        the tier's first model.
    """
    VLM, DENSE, MOE = "qwen3.5-9b-vlm", "qwen3.6-27B-code", "qwen3.6-35B-A3B"

    msgs = rd.get("messages", [])
    tokens = estimate_tokens(msgs)
    has_system = any(m.get("role") == "system" for m in msgs)
    turns = sum(1 for m in msgs if m.get("role") in ("user", "assistant"))
    hints = rd.get("routing_hints", {})
    allowed = TIER_MODELS.get(tier, [VLM])
    avail = [m for m in available_models() if m in allowed]
    if not avail:
        return {"model": allowed[0], "reason": "all_saturated", "saturated": True}

    # Explicit model request: honor it when available, otherwise use the
    # first available model.
    req = rd.get("model", "auto")
    if req != "auto":
        target = req if req in avail else avail[0]
        # If explicit model is busy, check if another can take it
        if is_gpu_busy(target) and req in allowed:
            alts = [m for m in avail if m != target]
            if alts:
                alt = select_best_gpu(alts, "explicit")
                if alt:
                    return alt
        return {"model": target, "reason": "explicit"}

    # Caller-supplied routing hints.
    if hints:
        if hints.get("priority") == "speed" and VLM in avail:
            return select_best_gpu([VLM], "hint_speed") or {"model": VLM, "reason": "hint_speed"}
        if hints.get("priority") == "quality" and DENSE in avail:
            return select_best_gpu([DENSE], "hint_quality") or {"model": DENSE, "reason": "hint_quality"}

    first_msg = msgs[0].get("content", "") if msgs else ""
    # Non-string content (e.g. a list of parts) is given a nominal 99 words.
    words = len(first_msg.split()) if isinstance(first_msg, str) else 99

    # TIER 1: Lightweight - single-turn short queries -> VLM first
    if not has_system and turns <= 1 and words <= 100 and VLM in avail:
        if not is_gpu_busy(VLM):
            return {"model": VLM, "reason": "lightweight"}
        # VLM busy - fall back to Dense, then MoE
        fallback = [m for m in (DENSE, MOE) if m in avail]
        result = select_best_gpu(fallback, "lightweight_fallback")
        if result:
            return result

    # TIER 2: Simple conversations - short context, any prompt -> VLM preferred
    if tokens <= 1000 and turns <= 4 and VLM in avail:
        if not is_gpu_busy(VLM):
            return {"model": VLM, "reason": "simple_conv"}
        # VLM busy - try Dense
        if DENSE in avail and not is_gpu_busy(DENSE):
            return {"model": DENSE, "reason": "simple_conv_fallback"}

    # TIER 3: Heavy reasoning - large context or very long conversations
    if tokens > 4000 or turns > 8:
        candidates = [m for m in (DENSE, MOE, VLM) if m in avail]
        result = select_best_gpu(candidates, "heavy_reasoning")
        if result:
            return result

    # TIER 4: Default - Dense preferred for medium tasks, MoE as workhorse, VLM as overflow
    if turns <= 6 and tokens <= 4000:
        # Medium complexity - try Dense first, then MoE, then VLM
        candidates = [m for m in (DENSE, MOE, VLM) if m in avail]
        result = select_best_gpu(candidates, "medium_task")
        if result:
            return result

    # Fallback - best available
    if MOE in avail and not is_gpu_busy(MOE):
        return {"model": MOE, "reason": "default_moe"}
    result = select_best_gpu(list(avail), "fallback")
    if result:
        return result
    return {"model": avail[0], "reason": "last_resort"}
|
|
|
|
# Typographic characters mapped to ASCII stand-ins before non-ASCII is dropped.
_ASCII_PUNCTUATION = str.maketrans({
    "\u2014": "-",    # em dash
    "\u2013": "-",    # en dash
    "\u2018": "'",    # left single quote
    "\u2019": "'",    # right single quote
    "\u201c": '"',    # left double quote
    "\u201d": '"',    # right double quote
    "\u2026": "...",  # ellipsis
    "\u00a0": " ",    # no-break space
})


def clean_unicode(text):
    """Return `text` reduced to ASCII.

    Common typographic punctuation is replaced with an ASCII equivalent;
    every other non-ASCII character is dropped. Non-string values are
    returned unchanged.
    """
    if not isinstance(text, str):
        return text
    return text.translate(_ASCII_PUNCTUATION).encode("ascii", "ignore").decode("ascii")
|
|
|
|
def clean_response(d):
    """Recursively apply clean_unicode to every string value in `d`.

    Walks dict values and list items; dict keys and non-string leaves are
    left untouched. Returns a new structure for dicts and lists.
    """
    if isinstance(d, dict):
        return {k: clean_response(v) for k, v in d.items()}
    if isinstance(d, list):
        return [clean_response(v) for v in d]
    if isinstance(d, str):
        return clean_unicode(d)
    return d
|
|
|
|
def get_metrics():
    """Build the metrics snapshot served by /metrics and pushed to /stream.

    Returns a dict with per-GPU health and load ("gpus", "active_requests"),
    routing counters read from Redis ("route_counts", "agent_counts",
    "tier_counts"), up to the 50 most recent routing records ("recent") and a
    "timestamp". Redis-backed sections stay empty when Redis is unavailable
    or a read fails.
    """
    d = {
        "gpus": [],
        "route_counts": {},
        "agent_counts": {},
        "tier_counts": {},
        "recent": [],
        "timestamp": time.time(),
        "active_requests": {},
    }
    for m in GPU_URLS:
        h = check_gpu_health(m)
        active = gpu_active_count(m)  # read once so both views agree
        d["gpus"].append({
            "id": m,
            "gpu_name": h.get("gpu_name", m),
            "status": h.get("status"),
            "vram_used_mb": h.get("vram_used_mb"),
            "vram_total_mb": h.get("vram_total_mb"),
            "vram_pct": h.get("vram_pct"),
            "temp_c": h.get("temp_c"),
            "gpu_util_pct": h.get("gpu_util_pct"),
            "power_w": h.get("power_w"),
            "power_limit_w": h.get("power_limit_w"),
            "active_requests": active,
            "max_concurrent": GPU_MAX_CONCURRENT.get(m, 1),
        })
        d["active_requests"][m] = active
    if r:
        try:
            for m in GPU_URLS:
                d["route_counts"][m] = int(r.get("routes:" + m) or 0)
            for info in API_KEYS.values():
                agent = info["agent"]
                c = int(r.get("routes:agent:" + agent) or 0)
                if c > 0:
                    d["agent_counts"][agent] = c
            for tier_name in TIER_MODELS:
                d["tier_counts"][tier_name] = int(r.get("routes:tier:" + tier_name) or 0)
            raw = r.lrange("routes:recent", 0, 49)
            d["recent"] = [json.loads(x) for x in raw] if raw else []
        except Exception as exc:
            # Metrics are best-effort; return whatever was collected so far.
            log.debug("METRICS: Redis read failed: %s", exc)
    return d
|
|
|
|
def bcast():
    """Push a fresh metrics snapshot to every connected /stream subscriber.

    Returns early when nobody is subscribed, so the snapshot (which runs the
    GPU health checks) is only built when someone will receive it.
    """
    with sse_lock:
        if not sse_subscribers:
            return
    payload = json.dumps(get_metrics())
    with sse_lock:
        dead = []
        for q in sse_subscribers:
            try:
                q.put(payload)
            except Exception:
                dead.append(q)
        # Remove after the loop so the list is not mutated while iterating.
        for q in dead:
            sse_subscribers.remove(q)
|
|
|
|
# Max seconds a request may wait in the queue loop before the router answers 503.
# Read from the QUEUE_TIMEOUT environment variable; defaults to 30.
QUEUE_TIMEOUT = int(os.environ.get("QUEUE_TIMEOUT", "30"))
|
|
|
|
@app.route("/v1/chat/completions", methods=["POST"])
def chat():
    """Route a chat completion request to a GPU backend and relay the answer.

    Resolves the caller's tier from the Authorization bearer key (unknown
    keys are treated as tier "starter"), picks a model via route(), waits up
    to the queue timeout while routing reports "saturated", then forwards
    the request body to the chosen backend.

    Responses:
        200 - backend JSON with an added "routing" section, or an SSE stream
        502 - backend answered with a non-200 status
        503 - still saturated after the queue timeout
        504 - backend request timed out
        500 - any other error
    """
    model = None
    agent = "unknown"
    slot_held = False

    def release_slot():
        """Give the in-flight slot back exactly once, and only if it was taken."""
        nonlocal slot_held
        if slot_held:
            slot_held = False
            gpu_decr(model)

    try:
        rd = request.get_json(force=True)
        ak = request.headers.get("Authorization", "").replace("Bearer ", "")
        ki = API_KEYS.get(ak, {"tier": "starter", "agent": "unknown"})
        tier, agent = ki["tier"], ki["agent"]

        # Allow agent to override queue timeout via header; a malformed value
        # falls back to the default instead of failing the request.
        try:
            q_timeout = int(request.headers.get("X-Queue-Timeout", str(QUEUE_TIMEOUT)))
        except ValueError:
            q_timeout = QUEUE_TIMEOUT

        d = route(rd, tier)
        queue_start = time.time()

        # Queue loop: wait for a GPU slot instead of immediate 503
        while d.get("saturated"):
            elapsed = time.time() - queue_start
            if elapsed > q_timeout:
                resp = jsonify({"error": "All GPUs saturated", "queued_s": round(elapsed, 1), "retry_after_s": 5})
                resp.headers["Retry-After"] = "5"
                log.warning("QUEUE_TIMEOUT: %s waited %.1fs, all GPUs saturated", agent, elapsed)
                return resp, 503
            time.sleep(0.5)  # poll every 500ms
            d = route(rd, tier)

        waited = time.time() - queue_start
        if waited > 0.5:
            log.info("QUEUED: %s waited %.1fs before slot opened", agent, waited)

        model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]]
        is_stream = rd.get("stream", False)

        gpu_incr(model)
        slot_held = True

        log.info("ROUTE: %s -> %s (%s) stream=%s active=%d/%d", agent, model, reason, is_stream,
                 gpu_active_count(model), GPU_MAX_CONCURRENT.get(model, 1))
        if r:
            try:
                r.incr("routes:" + model)
                r.incr("routes:tier:" + tier)
                r.incr("routes:agent:" + agent)
                # Hour bucket in UTC, matching the keys /metrics/timeseries reads.
                r.incr("ts:" + model + ":" + time.strftime("%Y%m%d%H", time.gmtime()))
                r.lpush("routes:recent", json.dumps({"ts": time.time(), "model": model, "reason": reason, "tier": tier, "agent": agent}))
                r.ltrim("routes:recent", 0, 999)
            except Exception:
                pass  # routing stats are best-effort

        start = time.time()
        resp = requests.post(
            url + "/chat/completions",
            json=rd,
            headers={"Content-Type": "application/json", "Authorization": "Bearer not-needed"},
            timeout=300,
            stream=is_stream,
        )
        lat = int((time.time() - start) * 1000)

        # A successful stream keeps its slot until the body has been relayed;
        # everything else is finished with the GPU at this point.
        if not is_stream or resp.status_code != 200:
            release_slot()

        if resp.status_code != 200:
            resp.close()
            return jsonify({"error": "GPU error " + str(resp.status_code)}), 502

        if is_stream:
            def finish_stream():
                """Close the upstream connection and free the slot (idempotent)."""
                resp.close()
                release_slot()

            def gen():
                try:
                    for raw in resp.iter_content(chunk_size=None, decode_unicode=True):
                        if raw:
                            yield clean_unicode(raw)
                finally:
                    finish_stream()
                bcast()

            out = Response(stream_with_context(gen()), mimetype="text/event-stream")
            # Backstop for responses closed before the generator ever ran.
            out.call_on_close(finish_stream)
            return out

        data = clean_response(resp.json())
        for c in data.get("choices", []):
            msg = c.get("message", {})
            # Surface reasoning text when the model returned no regular content.
            if not msg.get("content") and msg.get("reasoning_content"):
                msg["content"] = msg["reasoning_content"]
        data["routing"] = {"model": model, "reason": reason, "gpu": url, "tier": tier, "agent": agent,
                           "latency_ms": lat, "active_gpu": gpu_active_count(model)}
        bcast()
        return jsonify(data)
    except requests.Timeout:
        release_slot()
        log.error("TIMEOUT: %s -> %s", agent, model)
        return jsonify({"error": "timeout"}), 504
    except Exception as e:
        release_slot()
        log.error("Error: %s\n%s", e, traceback.format_exc())
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/v1/models")
def models():
    """List the configured models with their current health status and GPU name."""
    entries = []
    for m in GPU_URLS:
        h = check_gpu_health(m)  # one probe per model, reused for both fields
        entries.append({
            "id": m,
            "object": "model",
            "owned_by": "syslog",
            "status": h.get("status"),
            "gpu": h.get("gpu_name"),
        })
    return jsonify({"object": "list", "data": entries})
|
|
|
|
@app.route("/health")
def health():
    """Report router health: Redis connectivity, per-GPU state and usable models."""
    gpus = {}
    for m in GPU_URLS:
        h = check_gpu_health(m)
        h["active_requests"] = gpu_active_count(m)
        h["max_concurrent"] = GPU_MAX_CONCURRENT.get(m, 1)
        gpus[m] = h
    # Same rule as available_models(), derived from the probes already made
    # so each GPU is only checked once per request.
    available = [m for m, h in gpus.items() if h.get("status") in ("healthy", "saturated")]
    return jsonify({
        "status": "healthy",
        "redis": "connected" if r else "down",
        "gpus": gpus,
        "available_models": available,
    })
|
|
|
|
@app.route("/metrics")
def metrics():
    """Return the current metrics snapshot as JSON."""
    return jsonify(get_metrics())
|
|
|
|
@app.route("/metrics/timeseries")
def metrics_timeseries():
    """Return per-model request counts bucketed over time.

    Query parameter `period`:
        "day"  - 24 hourly buckets (default)
        "week" - 7 daily buckets
        anything else - 30 daily buckets
    Daily buckets are the sum of that day's 24 hourly counters. Buckets are
    computed in UTC. "models" stays empty when Redis is unavailable.
    """
    period = request.args.get("period", "day")
    now = time.time()  # single reference instant so keys and labels line up

    if period == "day":
        stamps = [time.gmtime(now - h * 3600) for h in range(23, -1, -1)]
        key_fmt, label_fmt, daily = "%Y%m%d%H", "%H:00", False
    elif period == "week":
        stamps = [time.gmtime(now - d * 86400) for d in range(6, -1, -1)]
        key_fmt, label_fmt, daily = "%Y%m%d", "%a", True
    else:
        stamps = [time.gmtime(now - d * 86400) for d in range(29, -1, -1)]
        key_fmt, label_fmt, daily = "%Y%m%d", "%m/%d", True

    buckets = [time.strftime(key_fmt, s) for s in stamps]
    data = {"models": {}, "labels": [time.strftime(label_fmt, s) for s in stamps]}

    if r:
        for model in GPU_URLS:
            prefix = "ts:" + model + ":"
            counts = []
            for bucket in buckets:
                if daily:
                    # Counters are stored per hour; fetch the day's 24 keys at once.
                    keys = [prefix + bucket + "{:02d}".format(hh) for hh in range(24)]
                    total = sum(int(v or 0) for v in r.mget(keys))
                else:
                    total = int(r.get(prefix + bucket) or 0)
                counts.append(total)
            data["models"][model] = counts
    return jsonify(data)
|
|
|
|
@app.route("/stream")
def stream():
    """Server-sent events feed of metrics snapshots.

    Each client gets its own queue registered in sse_subscribers. The feed
    sends one snapshot immediately, then relays whatever bcast() enqueues;
    if nothing arrives within 3 seconds it sends a freshly built snapshot.
    The queue is unregistered when the generator ends.
    """
    def ev():
        q = queue.Queue()
        with sse_lock:
            sse_subscribers.append(q)
        try:
            yield "data: " + json.dumps(get_metrics()) + "\n\n"
            while True:
                try:
                    payload = q.get(timeout=3)
                except queue.Empty:
                    payload = json.dumps(get_metrics())
                yield "data: " + payload + "\n\n"
        except GeneratorExit:
            pass  # client went away; cleanup happens below
        finally:
            with sse_lock:
                if q in sse_subscribers:
                    sse_subscribers.remove(q)

    return Response(
        stream_with_context(ev()),
        mimetype="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no", "Access-Control-Allow-Origin": "*"},
    )
|
|
|
|
# Script entry point: serve on all interfaces, port 9000.
if __name__ == "__main__":
    log.info("Router on :9000 (load-aware)")
    app.run(host="0.0.0.0", port=9000, debug=False)
|