3cbf38e3e2
Agent conversations with system prompts easily exceed 4000 tokens, forcing everything to Dense. Now only truly heavy work triggers Dense. Most agent convos will route to MoE (default) instead.
381 lines
17 KiB
Python
381 lines
17 KiB
Python
import os, json, time, logging, traceback, threading, queue
|
|
import requests, redis
|
|
from flask import Flask, request, jsonify, Response, stream_with_context
|
|
|
|
# Service endpoints; each can be overridden through the environment.
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
# OpenAI-compatible llama.cpp base URLs (defaults end in /v1), one per GPU host.
GPU_MOE_URL = os.getenv("GPU_MOE_URL", "http://192.168.68.15:8080/v1")
GPU_DENSE_URL = os.getenv("GPU_DENSE_URL", "http://192.168.68.8:8080/v1")
GPU_LIGHT_URL = os.getenv("GPU_LIGHT_URL", "http://192.168.68.110:8080/v1")
|
|
|
|
# Model id -> hardware-telemetry sidecar URL polled by check_gpu_health()
# (expects vram_used_mb / vram_total_mb / temp_c / ... in its JSON reply).
# Each URL is overridable via the environment, like the GPU_*_URL settings,
# so a model moved to another host can have its sidecar moved with it instead
# of being health-checked on the old machine. Defaults are unchanged.
GPU_SIDECARS = {
    "qwen3.6-35B-A3B": os.environ.get("GPU_MOE_SIDECAR_URL", "http://192.168.68.15:8090"),
    "qwen3.6-27B-code": os.environ.get("GPU_DENSE_SIDECAR_URL", "http://192.168.68.8:8090"),
    "qwen3.5-9b-vlm": os.environ.get("GPU_LIGHT_SIDECAR_URL", "http://192.168.68.110:8090"),
}
|
|
# Model id -> OpenAI-compatible llama.cpp base URL. Callers strip "/v1" with
# .replace("/v1", "") to reach the server's /health and /slots endpoints.
# Insertion order matters: available_models(), get_metrics() and the router's
# final "fallback" selection iterate this dict in order, so the MoE model is
# considered first.
GPU_URLS = {
    "qwen3.6-35B-A3B": GPU_MOE_URL,    # MoE model
    "qwen3.6-27B-code": GPU_DENSE_URL,  # dense model
    "qwen3.5-9b-vlm": GPU_LIGHT_URL,   # small vision-language model
}
|
|
# Max concurrent requests per GPU, mirroring each llama.cpp server's --parallel
# slot count. Every GPU currently runs 2 slots (the 9B VLM box has 12GB VRAM,
# leaving about 4GB of headroom at that setting). is_gpu_busy() treats a model
# missing from this table as having 1 slot.
GPU_MAX_CONCURRENT = dict.fromkeys(
    ("qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"),
    2,
)
|
|
|
|
# Every model the router can serve, in routing-preference order.
_FULL_FLEET = ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"]

# Subscription tier -> models that tier may be routed to. Order matters:
# route() falls back to allowed[0] when nothing is available.
TIER_MODELS = {
    "starter": ["qwen3.5-9b-vlm"],
    "professional": list(_FULL_FLEET),  # independent copies, one per tier
    "enterprise": list(_FULL_FLEET),
}
|
|
# Built-in API-key table: key -> {"tier": <TIER_MODELS key>, "agent": <name used in logs/metrics>}.
# NOTE(review): these are live credentials committed to source (the test-* keys
# are accepted too). Prefer supplying the table through ROUTER_API_KEYS and
# rotating these.
_DEFAULT_API_KEYS = {
    "sk-syslog-local-master-key": {"tier": "enterprise", "agent": "admin"},
    "sk-syslog-abiba": {"tier": "enterprise", "agent": "Abiba"},
    "sk-syslog-mumuni": {"tier": "enterprise", "agent": "Mumuni"},
    "sk-syslog-tanko": {"tier": "enterprise", "agent": "Tanko"},
    "sk-syslog-koby": {"tier": "enterprise", "agent": "Koby"},
    "sk-syslog-kagenz0": {"tier": "enterprise", "agent": "Kagenz0"},
    "sk-syslog-koonimo": {"tier": "enterprise", "agent": "Koonimo"},
    "sk-starter-abc123": {"tier": "starter", "agent": "test-starter"},
    "sk-professional-xyz789": {"tier": "professional", "agent": "test-pro"},
}


def _load_api_keys(defaults):
    """Return the API-key table the router should accept.

    If the ROUTER_API_KEYS environment variable is set (non-empty), it must be
    a JSON object with the same shape as *defaults*. It replaces the built-in
    table entirely, so compromised defaults can be retired without a code
    change. Otherwise *defaults* is returned as-is.

    Raises:
        ValueError: ROUTER_API_KEYS is not valid JSON or does not map every
            key to an object with "tier" and "agent" (json.JSONDecodeError is
            a ValueError subclass). Failing at startup beats per-request
            KeyErrors in chat().
    """
    raw = os.environ.get("ROUTER_API_KEYS")
    if not raw:
        return defaults
    table = json.loads(raw)
    if not isinstance(table, dict) or not all(
        isinstance(info, dict) and "tier" in info and "agent" in info
        for info in table.values()
    ):
        raise ValueError('ROUTER_API_KEYS must map each key to {"tier": ..., "agent": ...}')
    return table


API_KEYS = _load_api_keys(_DEFAULT_API_KEYS)
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s [ROUTER] %(levelname)s %(message)s")
log = logging.getLogger("router")

# Redis stores the per-GPU in-flight counters and the route statistics. If it
# is unreachable at startup, r stays None and every Redis-backed helper
# degrades to a no-op: counters read as 0 and metrics omit route statistics.
# NOTE(review): the client is created only here, so a Redis that comes up
# later is not picked up without a restart.
try:
    r = redis.from_url(REDIS_URL, decode_responses=True)
    r.ping()
except Exception:
    # Previously swallowed silently. Log it so a missing Redis is visible; the
    # URL is deliberately not logged because it may embed a password.
    log.warning("Redis unavailable at startup; load counters and route metrics disabled", exc_info=True)
    r = None
|
|
|
|
|
|
def counter_audit_loop(interval=30):
    """Periodically reset in-flight counters that have drifted above zero.

    Every *interval* seconds (default 30), polls each llama.cpp server's
    ``/slots`` endpoint. When every reported slot has ``is_processing`` false,
    a positive ``active:<model>`` counter in Redis is treated as stuck and
    reset to 0. Does nothing while Redis is unavailable. Never returns, so run
    it on a daemon thread.

    NOTE(review): a request that has incremented its counter but not yet
    reached the GPU can be wiped by a reset. gpu_decr() clamps at zero, so the
    counter under-reports briefly rather than going negative.
    """
    while True:
        time.sleep(interval)
        if not r:
            continue
        for model, url in GPU_URLS.items():
            try:
                resp = requests.get(url.replace("/v1", "") + "/slots",
                                    headers={"Authorization": "Bearer not-needed"}, timeout=5)
                if resp.status_code != 200:
                    continue
                slots = resp.json()
                if not isinstance(slots, list):
                    continue  # unexpected payload (presumably an error object); don't guess
                if any(s.get("is_processing", False) for s in slots):
                    continue
                current = int(r.get("active:" + model) or 0)
                if current > 0:
                    r.set("active:" + model, 0)
                    log.info("AUDIT: Reset stuck counter for %s (was %d)", model, current)
            except Exception:
                # Best effort: one unreachable GPU must not stop the audit of the
                # others. Logged at debug instead of silently ignored.
                log.debug("AUDIT: slot check failed for %s", model, exc_info=True)
|
|
|
|
# Launch the stuck-counter auditor at import time. As a daemon thread it never
# blocks interpreter shutdown.
_counter_audit_thread = threading.Thread(target=counter_audit_loop, daemon=True)
_counter_audit_thread.start()
|
|
|
|
app = Flask(__name__)

# Live /stream clients: one queue per connected dashboard, fed by bcast().
# The list is only touched while holding sse_lock.
sse_subscribers = []
sse_lock = threading.Lock()
|
|
|
|
def gpu_active_count(model):
    """Return the number of requests currently in flight on *model*'s GPU.

    Reads the Redis counter ``active:<model>``. A missing key counts as 0, and
    the result is always 0 when Redis is unavailable.
    """
    if not r:
        return 0
    stored = r.get("active:" + model)
    return int(stored or 0)
|
|
|
|
def gpu_incr(model):
    """Record one more in-flight request on *model*'s GPU; no-op without Redis."""
    if not r:
        return
    r.incr("active:" + model)
|
|
|
|
def gpu_decr(model):
    """Record one finished request on *model*'s GPU; no-op without Redis.

    If the decrement drops below zero (e.g. after the audit loop reset the
    counter), the key is put back to 0.
    """
    if not r:
        return
    counter_key = "active:" + model
    remaining = r.decr(counter_key)
    if remaining and int(remaining) < 0:
        r.set(counter_key, 0)  # never go negative
|
|
|
|
def _llama_health_ok(model):
    """Return True when *model*'s llama.cpp server answers GET /health with HTTP 200."""
    base = GPU_URLS.get(model, "").replace("/v1", "")
    try:
        hr = requests.get(base + "/health", headers={"Authorization": "Bearer not-needed"}, timeout=3)
    except requests.RequestException:
        return False
    return hr.status_code == 200


def check_gpu_health(model):
    """Probe one GPU: sidecar hardware telemetry plus the llama.cpp /health endpoint.

    Returns a dict whose ``status`` is:

    - ``"unknown"``: *model* has no entry in GPU_SIDECARS;
    - ``"healthy"`` / ``"saturated"``: the sidecar answered 200, VRAM use is
      below / at-or-above 90%, and llama.cpp /health returned 200;
    - ``"down"``: anything else.

    Whenever the sidecar answers 200 with parseable telemetry, the result also
    carries its VRAM, temperature, utilisation, name and power fields (even if
    llama.cpp itself is down).

    NOTE(review): this makes up to two blocking HTTP calls (5s + 3s timeouts)
    and is invoked several times per routed request; consider caching if it
    shows up in latency.
    """
    url = GPU_SIDECARS.get(model)
    if not url:
        return {"status": "unknown"}
    try:
        resp = requests.get(url, timeout=5)
        if resp.status_code != 200:
            return {"status": "down"}
        d = resp.json()
        # Null/missing telemetry counts as 0 MB used instead of raising and
        # marking an otherwise-serving GPU "down". Total is floored at 1 MB to
        # avoid division by zero.
        used_mb = d.get("vram_used_mb") or 0
        pct = (used_mb / max(d.get("vram_total_mb") or 0, 1)) * 100
    except Exception:
        # Unreachable sidecar or malformed payload: report down, but leave a trace.
        log.debug("HEALTH: sidecar probe failed for %s", model, exc_info=True)
        return {"status": "down"}

    status = "healthy" if pct < 90 else "saturated"
    # The sidecar only reports hardware; also require the model server to respond.
    if not _llama_health_ok(model):
        status = "down"
    return {
        "status": status,
        "vram_used_mb": d.get("vram_used_mb"),
        "vram_total_mb": d.get("vram_total_mb"),
        "vram_pct": round(pct, 1),
        "temp_c": d.get("temp_c"),
        "gpu_util_pct": d.get("gpu_util_pct"),
        "gpu_name": d.get("gpu_name"),
        "power_w": d.get("power_w"),
        "power_limit_w": d.get("power_limit_w"),
    }
|
|
|
|
def available_models():
    """Return the models whose GPU health is routable, in GPU_URLS order.

    "Routable" means check_gpu_health() reported "healthy" or "saturated";
    "down" and "unknown" GPUs are excluded. Probes every GPU on each call.
    """
    routable = ("healthy", "saturated")
    return [
        model
        for model in GPU_URLS
        if check_gpu_health(model)["status"] in routable
    ]
|
|
|
|
def estimate_tokens(msgs):
    """Roughly estimate the prompt size of chat *msgs* in tokens (~4 characters each).

    Only textual content is counted:

    - string ``content`` counts its length;
    - missing or ``None`` content (e.g. tool-call turns) counts 0. It was
      previously stringified to "None";
    - OpenAI-style content-part lists count the ``text`` of each part (or
      bare string parts). Image parts no longer contribute their base64
      payload, which used to push image requests into the "heavy" tier;
    - any other content type falls back to ``len(str(content))``.
    """
    chars = 0
    for m in msgs:
        content = m.get("content")
        if content is None:
            continue
        if isinstance(content, str):
            chars += len(content)
        elif isinstance(content, list):
            for part in content:
                if isinstance(part, str):
                    chars += len(part)
                elif isinstance(part, dict) and isinstance(part.get("text"), str):
                    chars += len(part["text"])
        else:
            chars += len(str(content))
    return chars // 4
|
|
|
|
def is_gpu_busy(model):
    """Return True when *model*'s in-flight count has reached its slot limit.

    The limit comes from GPU_MAX_CONCURRENT and defaults to 1 for unlisted models.
    """
    return gpu_active_count(model) >= GPU_MAX_CONCURRENT.get(model, 1)
|
|
|
|
def select_best_gpu(candidates, reason):
    """Pick a GPU from *candidates*, honouring their order of preference.

    The first candidate that is not busy (see is_gpu_busy) wins and is tagged
    with *reason*. If every candidate is busy, the one with the fewest
    in-flight requests is returned, tagged ``"load_balanced_" + reason``; on
    ties the earliest candidate wins.

    Returns:
        ``{"model": ..., "reason": ...}``, or None only when *candidates* is empty.
    """
    for m in candidates:
        if not is_gpu_busy(m):
            return {"model": m, "reason": reason}
    if not candidates:
        return None
    # All busy: least-loaded wins. min() keeps the first minimum (same
    # tie-breaking as before). Unlike the old 999 sentinel, it can't return
    # None when every load is very high.
    best = min(candidates, key=gpu_active_count)
    return {"model": best, "reason": "load_balanced_" + reason}
|
|
|
|
def route(rd, tier):
    """Choose which GPU/model should serve a chat-completion request.

    Args:
        rd: parsed request body; reads "messages", "routing_hints" and "model".
        tier: subscription tier used to look up the allowed models in TIER_MODELS.

    Returns:
        dict with "model" and "reason". It additionally carries
        "saturated": True when no allowed model passed the health filter;
        chat() then re-polls this function until its queue timeout expires.

    Decision order: explicit model -> routing hints -> TIER 1 lightweight ->
    TIER 2 simple conversation -> TIER 3 heavy -> TIER 4 default (<= 4000
    estimated tokens) -> fallback (MoE if free, else first free / least-loaded).
    """
    msgs = rd.get("messages",[]); t = estimate_tokens(msgs)  # t: rough token estimate of all messages
    # NOTE(review): local `sys` shadows the stdlib module name; harmless here.
    sys = any(m.get("role")=="system" for m in msgs)
    turns = len([m for m in msgs if m.get("role") in ("user","assistant")])
    hints = rd.get("routing_hints",{})
    allowed = TIER_MODELS.get(tier, ["qwen3.5-9b-vlm"])  # unknown tier -> VLM only
    # available_models() health-probes every GPU; keep only this tier's models.
    avail = [m for m in available_models() if m in allowed]
    # NOTE(review): avail filters on health status only ("healthy"/"saturated"),
    # so this fires when every allowed GPU is down/unknown -- not when all slots
    # are busy (busy GPUs are load-balanced below).
    if not avail: return {"model": allowed[0], "reason": "all_saturated", "saturated": True}

    # Explicit model: use it when healthy and allowed, otherwise the first available model.
    req = rd.get("model","auto")
    if req != "auto":
        target = req if req in avail else avail[0]
        # If explicit model is busy, check if another can take it
        # NOTE(review): only when req itself is allowed for the tier; a disallowed
        # request stays pinned to avail[0] even if that GPU is busy.
        if is_gpu_busy(target) and req in allowed:
            alts = [m for m in avail if m != target and m in allowed]
            if alts:
                alt = select_best_gpu(alts, "explicit")
                if alt: return alt
        return {"model": target, "reason": "explicit"}

    # Caller hints: "speed" -> VLM, "quality" -> Dense (when that model is available).
    if hints:
        if hints.get("priority")=="speed" and "qwen3.5-9b-vlm" in avail:
            return select_best_gpu(["qwen3.5-9b-vlm"], "hint_speed") or {"model":"qwen3.5-9b-vlm","reason":"hint_speed"}
        if hints.get("priority")=="quality" and "qwen3.6-27B-code" in avail:
            return select_best_gpu(["qwen3.6-27B-code"], "hint_quality") or {"model":"qwen3.6-27B-code","reason":"hint_quality"}

    # Word count of the first message; non-string content (e.g. a parts list) counts as 99 words.
    first_msg = msgs[0].get("content","") if msgs else ""
    words = len(first_msg.split()) if isinstance(first_msg, str) else 99

    # TIER 1: Lightweight — single-turn short queries (no system prompt, <= 100 words) → VLM first
    if not sys and turns <= 1 and words <= 100 and "qwen3.5-9b-vlm" in avail:
        if not is_gpu_busy("qwen3.5-9b-vlm"):
            return {"model":"qwen3.5-9b-vlm","reason":"lightweight"}
        # VLM busy — fall back to MoE, then Dense (list order below)
        fallback = [m for m in ["qwen3.6-35B-A3B","qwen3.6-27B-code"] if m in avail]
        result = select_best_gpu(fallback, "lightweight_fallback")
        if result: return result

    # TIER 2: Simple conversations — short context (<= 1000 est. tokens, <= 4 turns), any prompt → VLM preferred
    if t <= 1000 and turns <= 4 and "qwen3.5-9b-vlm" in avail:
        if not is_gpu_busy("qwen3.5-9b-vlm"):
            return {"model":"qwen3.5-9b-vlm","reason":"simple_conv"}
        # VLM busy — try Dense (only if it is free; otherwise continue to later tiers)
        if "qwen3.6-27B-code" in avail and not is_gpu_busy("qwen3.6-27B-code"):
            return {"model":"qwen3.6-27B-code","reason":"simple_conv_fallback"}

    # TIER 3: Heavy reasoning — very large context or very long conversations → Dense first
    if t > 12000 or turns > 15:
        candidates = [m for m in ["qwen3.6-27B-code","qwen3.6-35B-A3B","qwen3.5-9b-vlm"] if m in avail]
        result = select_best_gpu(candidates, "heavy_reasoning")
        if result: return result

    # TIER 4: Default — MoE first, VLM helps, Dense last (slow)
    if t <= 4000:
        candidates = [m for m in ["qwen3.6-35B-A3B","qwen3.5-9b-vlm","qwen3.6-27B-code"] if m in avail]
        result = select_best_gpu(candidates, "default")
        if result: return result

    # Fallback — best available (reached for 4000 < t <= 12000 with <= 15 turns, among others)
    if "qwen3.6-35B-A3B" in avail and not is_gpu_busy("qwen3.6-35B-A3B"):
        return {"model":"qwen3.6-35B-A3B","reason":"default_moe"}
    # avail follows GPU_URLS order, so this is first-free / least-loaded in that order.
    result = select_best_gpu([m for m in avail], "fallback")
    if result: return result
    # Only reached if select_best_gpu returned None for a non-empty list.
    return {"model":avail[0],"reason":"last_resort"}
|
|
|
|
# Typographic characters folded to ASCII look-alikes. Anything else outside
# ASCII is dropped by the final encode in clean_unicode().
_ASCII_FOLD = str.maketrans({
    "\u2014": "-",    # em dash
    "\u2013": "-",    # en dash
    "\u2018": "'",    # left single quotation mark
    "\u2019": "'",    # right single quotation mark
    "\u201c": '"',    # left double quotation mark
    "\u201d": '"',    # right double quotation mark
    "\u2026": "...",  # horizontal ellipsis
    "\u00a0": " ",    # no-break space
})


def clean_unicode(text):
    """Reduce *text* to plain ASCII.

    Dashes, curly quotes, the ellipsis and no-break spaces become ASCII
    equivalents. Every other non-ASCII character is removed. Non-string
    values are returned unchanged.
    """
    if isinstance(text, str):
        folded = text.translate(_ASCII_FOLD)
        return folded.encode("ascii", "ignore").decode("ascii")
    return text


def clean_response(d):
    """Apply clean_unicode to every string value inside nested dicts and lists.

    Dict keys and non-container, non-string values (numbers, None, tuples, ...)
    are left as they are. Returns new dicts/lists rather than mutating *d*.
    """
    if isinstance(d, str):
        return clean_unicode(d)
    if isinstance(d, list):
        return [clean_response(item) for item in d]
    if isinstance(d, dict):
        return {key: clean_response(value) for key, value in d.items()}
    return d
|
|
|
|
def get_metrics():
    """Build the dashboard snapshot served by /metrics and pushed over /stream.

    Returns a dict with:

    - ``gpus``: one entry per GPU_URLS model with check_gpu_health() telemetry,
      the current in-flight count and its ``max_concurrent`` limit;
    - ``active_requests``: model -> in-flight count (same values as in ``gpus``);
    - ``route_counts`` / ``agent_counts`` / ``tier_counts``: Redis route
      counters (agents with a zero count are omitted);
    - ``recent``: up to the 50 newest routing decisions from ``routes:recent``;
    - ``timestamp``: ``time.time()`` when the snapshot started.

    The Redis-backed sections stay empty without Redis and may be partially
    filled if a Redis read fails midway.
    """
    d = {"gpus": [], "route_counts": {}, "agent_counts": {}, "tier_counts": {}, "recent": [], "timestamp": time.time(), "active_requests": {}}
    for m in GPU_URLS:
        h = check_gpu_health(m)
        # Read the counter once so both views of it report the same value.
        active = gpu_active_count(m)
        d["gpus"].append({
            "id": m,
            "gpu_name": h.get("gpu_name", m),
            "status": h.get("status"),
            "vram_used_mb": h.get("vram_used_mb"),
            "vram_total_mb": h.get("vram_total_mb"),
            "vram_pct": h.get("vram_pct"),
            "temp_c": h.get("temp_c"),
            "gpu_util_pct": h.get("gpu_util_pct"),
            "power_w": h.get("power_w"),
            "power_limit_w": h.get("power_limit_w"),
            "active_requests": active,
            "max_concurrent": GPU_MAX_CONCURRENT.get(m, 1),
        })
        d["active_requests"][m] = active
    if r:
        try:
            for m in GPU_URLS:
                d["route_counts"][m] = int(r.get("routes:" + m) or 0)
            for info in API_KEYS.values():
                c = int(r.get("routes:agent:" + info["agent"]) or 0)
                if c > 0:
                    d["agent_counts"][info["agent"]] = c
            for t in TIER_MODELS:
                d["tier_counts"][t] = int(r.get("routes:tier:" + t) or 0)
            raw = r.lrange("routes:recent", 0, 49)
            d["recent"] = [json.loads(x) for x in raw] if raw else []
        except Exception:
            # Metrics are best effort, but don't hide the failure completely.
            log.debug("METRICS: failed to read route stats from Redis", exc_info=True)
    return d
|
|
|
|
def bcast():
    """Push a fresh metrics snapshot to every connected /stream subscriber.

    Returns immediately when nobody is subscribed. get_metrics() health-probes
    every GPU over HTTP, and bcast() runs inline at the end of each completion,
    so building an unused snapshot used to add that probe latency to every
    request.
    """
    with sse_lock:
        if not sse_subscribers:
            return
    # Build the snapshot outside the lock: it performs blocking HTTP calls.
    payload = json.dumps(get_metrics())
    with sse_lock:
        dead = []
        for q in sse_subscribers:
            try:
                q.put(payload)
            except Exception:
                dead.append(q)
        for q in dead:
            sse_subscribers.remove(q)
|
|
|
|
# Default number of seconds a chat request waits for a routable GPU before the
# router answers 503; a caller can override it per request via X-Queue-Timeout.
QUEUE_TIMEOUT = int(os.getenv("QUEUE_TIMEOUT", "30"))  # max seconds to queue before 503
|
|
|
|
def _parse_queue_timeout(raw):
    """Return the queue timeout (seconds) for one request.

    *raw* is the optional X-Queue-Timeout header value. A missing or
    non-integer value falls back to QUEUE_TIMEOUT instead of raising (a
    malformed header used to surface as an unhandled 500).
    """
    if raw is None:
        return QUEUE_TIMEOUT
    try:
        return int(raw)
    except ValueError:
        return QUEUE_TIMEOUT


def _make_slot_release(model):
    """Return a callable that gives back one in-flight slot on *model*, at most once.

    chat() has several exit paths (error handlers, end of a stream, response
    close); the guard keeps one request from decrementing the counter twice.
    """
    done = threading.Event()

    def release():
        if not done.is_set():
            done.set()
            gpu_decr(model)

    return release


def _record_route(model, reason, tier, agent):
    """Best-effort Redis bookkeeping of one routing decision for the dashboards."""
    if not r:
        return
    try:
        r.incr("routes:" + model)
        r.incr("routes:tier:" + tier)
        r.incr("routes:agent:" + agent)
        # Hourly bucket; time.strftime without a struct uses local time.
        r.incr("ts:" + model + ":" + time.strftime("%Y%m%d%H"))
        r.lpush("routes:recent", json.dumps({"ts": time.time(), "model": model, "reason": reason, "tier": tier, "agent": agent}))
        r.ltrim("routes:recent", 0, 999)
    except Exception:
        log.debug("Failed to record route stats for %s", model, exc_info=True)


@app.route("/v1/chat/completions", methods=["POST"])
def chat():
    """Proxy an OpenAI-style chat completion to the GPU chosen by route().

    Flow:
    1. Authenticate the Bearer key against API_KEYS.
    2. Parse the JSON body.
    3. Pick a GPU, re-polling route() every 0.5s while it reports
       "saturated", up to the queue timeout.
    4. Reserve an in-flight slot and forward the request.
    5. Return either the cleaned JSON body (with an added "routing" section)
       or the upstream SSE stream.

    Status codes: 400 invalid JSON body, 401 bad key, 502 upstream non-200,
    503 queue timeout, 504 upstream timeout, 500 anything else.

    The in-flight slot is released exactly once:
    - non-streaming requests: right after the upstream call returns;
    - streaming requests: only when the stream ends or the response is closed
      (it used to be released as soon as headers arrived, under-counting load).
    """
    agent = model = None
    release = None  # set once a slot has been reserved
    try:
        ak = request.headers.get("Authorization", "").replace("Bearer ", "")
        if not ak or ak not in API_KEYS:
            log.warning("AUTH_REJECTED: no/invalid API key from %s", request.remote_addr)
            return jsonify({"error": "Unauthorized — valid API key required"}), 401
        ki = API_KEYS[ak]
        tier, agent = ki["tier"], ki["agent"]

        rd = request.get_json(force=True, silent=True)
        if not isinstance(rd, dict):
            return jsonify({"error": "Invalid JSON body"}), 400

        # Allow agent to override queue timeout via header
        q_timeout = _parse_queue_timeout(request.headers.get("X-Queue-Timeout"))

        d = route(rd, tier)
        queue_start = time.time()
        # Queue loop: wait for a GPU slot instead of immediate 503
        while d.get("saturated"):
            elapsed = time.time() - queue_start
            if elapsed > q_timeout:
                resp = jsonify({"error": "All GPUs saturated", "queued_s": round(elapsed, 1), "retry_after_s": 5})
                resp.headers["Retry-After"] = "5"
                log.warning("QUEUE_TIMEOUT: %s waited %.1fs, all GPUs saturated", agent, elapsed)
                return resp, 503
            time.sleep(0.5)  # poll every 500ms
            d = route(rd, tier)

        waited = time.time() - queue_start
        if waited > 0.5:
            log.info("QUEUED: %s waited %.1fs before slot opened", agent, waited)
        model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]]
        is_stream = rd.get("stream", False)

        gpu_incr(model)
        release = _make_slot_release(model)
        log.info("ROUTE: %s -> %s (%s) stream=%s active=%d/%d", agent, model, reason, is_stream, gpu_active_count(model), GPU_MAX_CONCURRENT.get(model, 1))
        _record_route(model, reason, tier, agent)

        start = time.time()
        resp = requests.post(url + "/chat/completions", json=rd,
                             headers={"Content-Type": "application/json", "Authorization": "Bearer not-needed"},
                             timeout=300, stream=is_stream)
        lat = int((time.time() - start) * 1000)
        if not is_stream:
            release()  # body already downloaded, so the GPU slot is free

        if resp.status_code != 200:
            resp.close()
            release()
            return jsonify({"error": "GPU error " + str(resp.status_code)}), 502

        if is_stream:
            # SSE is UTF-8, but requests decodes text/* without a charset as
            # ISO-8859-1. Smart quotes/dashes would become mojibake that
            # clean_unicode() strips instead of folding to ASCII.
            resp.encoding = "utf-8"

            def gen():
                try:
                    for raw in resp.iter_content(chunk_size=None, decode_unicode=True):
                        if raw:
                            yield clean_unicode(raw)
                finally:
                    # Runs on normal end, client disconnect and upstream errors alike.
                    resp.close()
                    release()
                bcast()

            out = Response(stream_with_context(gen()), mimetype="text/event-stream")
            # Safety net for a client that disconnects before the first chunk,
            # when gen() never starts and its finally never runs.
            out.call_on_close(resp.close)
            out.call_on_close(release)
            return out

        data = clean_response(resp.json())
        for c in data.get("choices", []):
            msg = c.get("message", {})
            # Empty content but reasoning_content present: expose the latter as content.
            if not msg.get("content") and msg.get("reasoning_content"):
                msg["content"] = msg["reasoning_content"]
        data["routing"] = {"model": model, "reason": reason, "gpu": url, "tier": tier, "agent": agent, "latency_ms": lat, "active_gpu": gpu_active_count(model)}
        bcast()
        return jsonify(data)

    except requests.Timeout:
        if release is not None:
            release()
        log.error("TIMEOUT: %s -> %s", agent, model)
        return jsonify({"error": "timeout"}), 504
    except Exception as e:
        if release is not None:
            release()
        log.error("Error: %s\n%s", e, traceback.format_exc())
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/v1/models")
def models():
    """OpenAI-compatible model list, annotated with each GPU's health status and name.

    Each GPU is probed once per call (it used to be probed twice, once for
    "status" and once for "gpu").
    """
    data = []
    for m in GPU_URLS:
        h = check_gpu_health(m)
        data.append({"id": m, "object": "model", "owned_by": "syslog", "status": h.get("status"), "gpu": h.get("gpu_name")})
    return jsonify({"object": "list", "data": data})
|
|
|
|
@app.route("/health")
def health():
    """Router health: per-GPU probe results plus in-flight counters.

    The top-level "status" is always "healthy" (the router answered).
    "available_models" lists, in GPU_URLS order, the GPUs whose status is
    "healthy" or "saturated". It is derived from the probes already made here
    rather than probing every GPU a second time via available_models().
    """
    gpus = {}
    for m in GPU_URLS:
        h = check_gpu_health(m)
        h["active_requests"] = gpu_active_count(m)
        h["max_concurrent"] = GPU_MAX_CONCURRENT.get(m, 1)
        gpus[m] = h
    routable = [m for m, h in gpus.items() if h.get("status") in ("healthy", "saturated")]
    return jsonify({"status": "healthy", "redis": "connected" if r else "down", "gpus": gpus, "available_models": routable})
|
|
|
|
@app.route("/metrics")
def metrics():
    """Return the current get_metrics() snapshot as JSON."""
    snapshot = get_metrics()
    return jsonify(snapshot)
|
|
|
|
@app.route("/metrics/timeseries")
def metrics_timeseries():
    """Per-model request counts over a recent window, for dashboard charts.

    Query parameter ``period``:

    - ``day`` (default): 24 hourly buckets, labelled "HH:00";
    - ``week``: 7 daily buckets, labelled with the abbreviated weekday;
    - anything else: 30 daily buckets, labelled "MM/DD".

    Counts come from the hourly ``ts:<model>:<YYYYMMDDHH>`` counters that the
    chat endpoint stamps with ``time.strftime`` (local time), so buckets are
    also computed in local time. The old ``gmtime`` buckets were offset from
    the stored keys whenever the server TZ was not UTC. A daily bucket is the
    sum of its 24 hourly counters; previously an unrecognised ``period`` read
    non-existent per-day keys and always reported zeros.

    Returns ``{"models": {model: [count, ...]}, "labels": [...]}``; "models"
    is empty when Redis is unavailable.
    """
    period = request.args.get("period", "day")
    models_list = list(GPU_URLS.keys())
    data = {"models": {}, "labels": []}
    now = time.time()
    hourly = period == "day"
    if hourly:
        stamps = [time.localtime(now - h * 3600) for h in range(23, -1, -1)]
        buckets = [time.strftime("%Y%m%d%H", s) for s in stamps]
        data["labels"] = [time.strftime("%H:00", s) for s in stamps]
    else:
        days, label_fmt = (7, "%a") if period == "week" else (30, "%m/%d")
        stamps = [time.localtime(now - d * 86400) for d in range(days - 1, -1, -1)]
        buckets = [time.strftime("%Y%m%d", s) for s in stamps]
        data["labels"] = [time.strftime(label_fmt, s) for s in stamps]
    if r:
        # Hour suffixes per bucket: none for hourly buckets, 00..23 for daily ones.
        hours = [""] if hourly else ["{:02d}".format(hh) for hh in range(24)]
        step = len(hours)
        for model in models_list:
            keys = ["ts:" + model + ":" + bucket + hh for bucket in buckets for hh in hours]
            # One MGET per model instead of one GET per key (up to 720 per model).
            values = r.mget(keys)
            data["models"][model] = [
                sum(int(v or 0) for v in values[i:i + step])
                for i in range(0, len(values), step)
            ]
    return jsonify(data)
|
|
|
|
@app.route("/stream")
def stream():
    """Server-Sent Events feed of router metrics for the dashboard.

    Each client registers its own queue in sse_subscribers. A snapshot is
    sent immediately; after that the client receives whatever bcast() pushes,
    or a freshly computed snapshot after 3 seconds without one. The queue is
    unregistered when the generator ends or is closed.
    """
    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no", "Access-Control-Allow-Origin": "*"}

    def ev():
        inbox = queue.Queue()
        with sse_lock:
            sse_subscribers.append(inbox)
        try:
            yield "data: " + json.dumps(get_metrics()) + "\n\n"
            while True:
                try:
                    payload = inbox.get(timeout=3)
                except queue.Empty:
                    payload = json.dumps(get_metrics())
                yield "data: " + payload + "\n\n"
        except GeneratorExit:
            pass
        finally:
            with sse_lock:
                if inbox in sse_subscribers:
                    sse_subscribers.remove(inbox)

    return Response(stream_with_context(ev()), mimetype="text/event-stream", headers=sse_headers)
|
|
|
|
if __name__ == "__main__":
    # Flask's built-in server: listen on all interfaces, port 9000, debug off.
    bind_host, bind_port = "0.0.0.0", 9000
    log.info("Router on :9000 (load-aware)")
    app.run(host=bind_host, port=bind_port, debug=False)
|