beb2d1790a
Mumuni's Ollama client probes /v1/props for model discovery and /v1/models/<id> for per-model details. Previously both returned 404, causing client retries. Now returns proper model properties and details.
392 lines
17 KiB
Python
392 lines
17 KiB
Python
import os, json, time, logging, traceback, threading, queue
|
|
import requests, redis
|
|
from flask import Flask, request, jsonify, Response, stream_with_context
|
|
|
|
REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379")
|
|
GPU_MOE_URL = os.environ.get("GPU_MOE_URL", "http://192.168.68.15:8080/v1")
|
|
GPU_DENSE_URL = os.environ.get("GPU_DENSE_URL", "http://192.168.68.8:8080/v1")
|
|
GPU_LIGHT_URL = os.environ.get("GPU_LIGHT_URL", "http://192.168.68.110:8080/v1")
|
|
|
|
GPU_SIDECARS = {
|
|
"qwen3.6-35B-A3B": "http://192.168.68.15:8090",
|
|
"qwen3.6-27B-code": "http://192.168.68.8:8090",
|
|
"qwen3.5-9b-vlm": "http://192.168.68.110:8090",
|
|
}
|
|
GPU_URLS = {
|
|
"qwen3.6-35B-A3B": GPU_MOE_URL,
|
|
"qwen3.6-27B-code": GPU_DENSE_URL,
|
|
"qwen3.5-9b-vlm": GPU_LIGHT_URL,
|
|
}
|
|
# Max concurrent requests per GPU (based on llama.cpp --parallel)
|
|
GPU_MAX_CONCURRENT = {
|
|
"qwen3.6-35B-A3B": 2, # 2 slots
|
|
"qwen3.6-27B-code": 2, # 2 slots
|
|
"qwen3.5-9b-vlm": 2, # 2 slots (12GB VRAM, 4GB headroom)
|
|
}
|
|
|
|
TIER_MODELS = {
|
|
"starter": ["qwen3.5-9b-vlm"],
|
|
"professional": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"],
|
|
"enterprise": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"],
|
|
}
|
|
API_KEYS = {
|
|
"sk-syslog-local-master-key": {"tier": "enterprise", "agent": "admin"},
|
|
"sk-syslog-abiba": {"tier": "enterprise", "agent": "Abiba"},
|
|
"sk-syslog-mumuni": {"tier": "enterprise", "agent": "Mumuni"},
|
|
"sk-syslog-tanko": {"tier": "enterprise", "agent": "Tanko"},
|
|
"sk-syslog-koby": {"tier": "enterprise", "agent": "Koby"},
|
|
"sk-syslog-kagenz0": {"tier": "enterprise", "agent": "Kagenz0"},
|
|
"sk-syslog-koonimo": {"tier": "enterprise", "agent": "Koonimo"},
|
|
"sk-starter-abc123": {"tier": "starter", "agent": "test-starter"},
|
|
"sk-professional-xyz789": {"tier": "professional", "agent": "test-pro"},
|
|
}
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s [ROUTER] %(levelname)s %(message)s")
|
|
log = logging.getLogger("router")
|
|
try: r = redis.from_url(REDIS_URL, decode_responses=True); r.ping()
|
|
except Exception: r = None
|
|
|
|
|
|
def counter_audit_loop():
|
|
"""Every 30s, check GPU slots and reset counters if all slots idle."""
|
|
while True:
|
|
time.sleep(30)
|
|
if not r: continue
|
|
for model, url in GPU_URLS.items():
|
|
try:
|
|
resp = requests.get(url.replace("/v1","") + "/slots",
|
|
headers={"Authorization": "Bearer not-needed"}, timeout=5)
|
|
if resp.status_code == 200:
|
|
slots = resp.json()
|
|
all_idle = all(not s.get("is_processing", False) for s in slots)
|
|
if all_idle:
|
|
current = int(r.get("active:" + model) or 0)
|
|
if current > 0:
|
|
r.set("active:" + model, 0)
|
|
log.info("AUDIT: Reset stuck counter for %s (was %d)", model, current)
|
|
except Exception:
|
|
pass
|
|
|
|
threading.Thread(target=counter_audit_loop, daemon=True).start()
|
|
|
|
app = Flask(__name__)
|
|
sse_subscribers = []; sse_lock = threading.Lock()
|
|
|
|
def gpu_active_count(model):
|
|
"""Get number of in-flight requests for a GPU."""
|
|
if r:
|
|
return int(r.get("active:" + model) or 0)
|
|
return 0
|
|
|
|
def gpu_incr(model):
|
|
if r: r.incr("active:" + model)
|
|
|
|
def gpu_decr(model):
|
|
if r:
|
|
v = r.decr("active:" + model)
|
|
if v and int(v) < 0:
|
|
r.set("active:" + model, 0) # never go negative
|
|
|
|
def check_gpu_health(model):
|
|
url = GPU_SIDECARS.get(model)
|
|
if not url: return {"status": "unknown"}
|
|
try:
|
|
resp = requests.get(url, timeout=5)
|
|
if resp.status_code == 200:
|
|
d = resp.json()
|
|
pct = (d.get("vram_used_mb",0) / max(d.get("vram_total_mb",1), 1)) * 100
|
|
status = "healthy" if pct < 90 else "saturated"
|
|
# Also check if llama.cpp endpoint is actually responding
|
|
gpu_url = GPU_URLS.get(model, "")
|
|
try:
|
|
hr = requests.get(gpu_url.replace("/v1","") + "/health", headers={"Authorization": "Bearer not-needed"}, timeout=3)
|
|
if hr.status_code != 200:
|
|
status = "down"
|
|
except Exception:
|
|
status = "down"
|
|
return {"status": status, "vram_used_mb": d.get("vram_used_mb"), "vram_total_mb": d.get("vram_total_mb"), "vram_pct": round(pct,1), "temp_c": d.get("temp_c"), "gpu_util_pct": d.get("gpu_util_pct"), "gpu_name": d.get("gpu_name"), "power_w": d.get("power_w"), "power_limit_w": d.get("power_limit_w")}
|
|
except Exception: pass
|
|
return {"status": "down"}
|
|
|
|
def available_models(): return [m for m in GPU_URLS if check_gpu_health(m)["status"] in ("healthy","saturated")]
|
|
|
|
def estimate_tokens(msgs): return sum(len(str(m.get("content",""))) for m in msgs) // 4
|
|
|
|
def is_gpu_busy(model):
|
|
"""Check if GPU is at or near max concurrent capacity."""
|
|
active = gpu_active_count(model)
|
|
max_c = GPU_MAX_CONCURRENT.get(model, 1)
|
|
return active >= max_c
|
|
|
|
def select_best_gpu(candidates, reason):
|
|
"""Pick the best GPU from candidates, preferring least-loaded."""
|
|
best = None
|
|
best_load = 999
|
|
for m in candidates:
|
|
load = gpu_active_count(m)
|
|
if load < best_load:
|
|
best_load = load
|
|
best = m
|
|
if best:
|
|
actual_reason = reason
|
|
if is_gpu_busy(best):
|
|
actual_reason = "load_balanced_" + reason
|
|
return {"model": best, "reason": actual_reason}
|
|
return None
|
|
|
|
def route(rd, tier):
|
|
msgs = rd.get("messages",[]); t = estimate_tokens(msgs)
|
|
sys = any(m.get("role")=="system" for m in msgs)
|
|
turns = len([m for m in msgs if m.get("role") in ("user","assistant")])
|
|
hints = rd.get("routing_hints",{})
|
|
allowed = TIER_MODELS.get(tier, ["qwen3.5-9b-vlm"])
|
|
avail = [m for m in available_models() if m in allowed]
|
|
if not avail: return {"model": allowed[0], "reason": "all_saturated", "saturated": True}
|
|
|
|
req = rd.get("model","auto")
|
|
if req != "auto":
|
|
target = req if req in avail else avail[0]
|
|
# If explicit model is busy, check if another can take it
|
|
if is_gpu_busy(target) and req in allowed:
|
|
alts = [m for m in avail if m != target and m in allowed]
|
|
if alts:
|
|
alt = select_best_gpu(alts, "explicit")
|
|
if alt: return alt
|
|
return {"model": target, "reason": "explicit"}
|
|
|
|
if hints:
|
|
if hints.get("priority")=="speed" and "qwen3.5-9b-vlm" in avail:
|
|
return select_best_gpu(["qwen3.5-9b-vlm"], "hint_speed") or {"model":"qwen3.5-9b-vlm","reason":"hint_speed"}
|
|
if hints.get("priority")=="quality" and "qwen3.6-27B-code" in avail:
|
|
return select_best_gpu(["qwen3.6-27B-code"], "hint_quality") or {"model":"qwen3.6-27B-code","reason":"hint_quality"}
|
|
|
|
# Heavy -> dense (but fall back to MoE if dense is busy)
|
|
if t > 4000 or sys or turns > 6:
|
|
candidates = ["qwen3.6-27B-code","qwen3.6-35B-A3B","qwen3.5-9b-vlm"]
|
|
candidates = [m for m in candidates if m in avail]
|
|
result = select_best_gpu(candidates, "heavy_reasoning")
|
|
if result: return result
|
|
|
|
# Ultra-light -> VLM
|
|
first_msg = msgs[0].get("content","") if msgs else ""
|
|
words = len(first_msg.split()) if isinstance(first_msg, str) else 99
|
|
if words <= 3 and turns <= 1 and not sys and "qwen3.5-9b-vlm" in avail:
|
|
if not is_gpu_busy("qwen3.5-9b-vlm"):
|
|
return {"model":"qwen3.5-9b-vlm","reason":"ultra_light"}
|
|
|
|
# Default: MoE, fall back to dense if MoE is busy
|
|
if "qwen3.6-35B-A3B" in avail:
|
|
if is_gpu_busy("qwen3.6-35B-A3B") and "qwen3.6-27B-code" in avail:
|
|
return {"model": "qwen3.6-27B-code", "reason": "load_balanced_default"}
|
|
return {"model":"qwen3.6-35B-A3B","reason":"default_moe"}
|
|
|
|
return {"model":avail[0],"reason":"fallback"}
|
|
|
|
def clean_unicode(text):
|
|
if not isinstance(text, str): return text
|
|
text = text.replace(chr(0x2014), "-"); text = text.replace(chr(0x2013), "-")
|
|
text = text.replace(chr(0x2018), "'"); text = text.replace(chr(0x2019), "'")
|
|
text = text.replace(chr(0x201C), '"'); text = text.replace(chr(0x201D), '"')
|
|
text = text.replace(chr(0x2026), "..."); text = text.replace(chr(0x00A0), " ")
|
|
return text.encode("ascii", "ignore").decode("ascii")
|
|
|
|
def clean_response(d):
|
|
if isinstance(d, dict): return {k: clean_response(v) for k,v in d.items()}
|
|
if isinstance(d, list): return [clean_response(v) for v in d]
|
|
if isinstance(d, str): return clean_unicode(d)
|
|
return d
|
|
|
|
def get_metrics():
|
|
d = {"gpus":[],"route_counts":{},"agent_counts":{},"tier_counts":{},"recent":[],"timestamp":time.time(),"active_requests":{}}
|
|
for m in GPU_URLS:
|
|
h = check_gpu_health(m)
|
|
d["gpus"].append({"id":m,"gpu_name":h.get("gpu_name",m),"status":h.get("status"),"vram_used_mb":h.get("vram_used_mb"),"vram_total_mb":h.get("vram_total_mb"),"vram_pct":h.get("vram_pct"),"temp_c":h.get("temp_c"),"gpu_util_pct":h.get("gpu_util_pct"),"power_w":h.get("power_w"),"power_limit_w":h.get("power_limit_w"),"active_requests":gpu_active_count(m), "max_concurrent": GPU_MAX_CONCURRENT.get(m, 1)})
|
|
d["active_requests"][m] = gpu_active_count(m)
|
|
if r:
|
|
try:
|
|
for m in GPU_URLS: d["route_counts"][m] = int(r.get("routes:"+m) or 0)
|
|
for k,v in API_KEYS.items():
|
|
c = int(r.get("routes:agent:"+v["agent"]) or 0)
|
|
if c>0: d["agent_counts"][v["agent"]] = c
|
|
for t in TIER_MODELS: d["tier_counts"][t] = int(r.get("routes:tier:"+t) or 0)
|
|
raw = r.lrange("routes:recent",0,49)
|
|
d["recent"] = [json.loads(x) for x in raw] if raw else []
|
|
except Exception: pass
|
|
return d
|
|
|
|
def bcast():
|
|
data = get_metrics(); payload = json.dumps(data)
|
|
with sse_lock:
|
|
dead = []
|
|
for q in sse_subscribers:
|
|
try: q.put(payload)
|
|
except Exception: dead.append(q)
|
|
for q in dead: sse_subscribers.remove(q)
|
|
|
|
QUEUE_TIMEOUT = int(os.environ.get("QUEUE_TIMEOUT", "30")) # max seconds to queue before 503
|
|
|
|
@app.route("/v1/chat/completions", methods=["POST"])
|
|
def chat():
|
|
try:
|
|
rd = request.get_json(force=True)
|
|
ak = request.headers.get("Authorization","").replace("Bearer ","")
|
|
ki = API_KEYS.get(ak, {"tier":"starter","agent":"unknown"})
|
|
tier, agent = ki["tier"], ki["agent"]
|
|
|
|
# Allow agent to override queue timeout via header
|
|
q_timeout = int(request.headers.get("X-Queue-Timeout", str(QUEUE_TIMEOUT)))
|
|
|
|
d = route(rd, tier)
|
|
queue_start = time.time()
|
|
|
|
# Queue loop: wait for a GPU slot instead of immediate 503
|
|
while d.get("saturated"):
|
|
elapsed = time.time() - queue_start
|
|
if elapsed > q_timeout:
|
|
resp = jsonify({"error": "All GPUs saturated", "queued_s": round(elapsed,1), "retry_after_s": 5})
|
|
resp.headers["Retry-After"] = "5"
|
|
log.warning("QUEUE_TIMEOUT: %s waited %.1fs, all GPUs saturated", agent, elapsed)
|
|
return resp, 503
|
|
time.sleep(0.5) # poll every 500ms
|
|
d = route(rd, tier)
|
|
|
|
waited = time.time() - queue_start
|
|
if waited > 0.5:
|
|
log.info("QUEUED: %s waited %.1fs before slot opened", agent, waited)
|
|
model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]]
|
|
is_stream = rd.get("stream", False)
|
|
|
|
gpu_incr(model)
|
|
|
|
log.info("ROUTE: %s -> %s (%s) stream=%s active=%d/%d", agent, model, reason, is_stream, gpu_active_count(model), GPU_MAX_CONCURRENT.get(model,1))
|
|
if r:
|
|
try:
|
|
r.incr("routes:"+model); r.incr("routes:tier:"+tier); r.incr("routes:agent:"+agent)
|
|
r.incr("ts:"+model+":"+time.strftime("%Y%m%d%H"))
|
|
r.lpush("routes:recent", json.dumps({"ts":time.time(),"model":model,"reason":reason,"tier":tier,"agent":agent}))
|
|
r.ltrim("routes:recent",0,999)
|
|
except Exception: pass
|
|
start = time.time()
|
|
resp = requests.post(url+"/chat/completions", json=rd,
|
|
headers={"Content-Type":"application/json","Authorization":"Bearer not-needed"}, timeout=300, stream=is_stream)
|
|
lat = int((time.time()-start)*1000)
|
|
gpu_decr(model)
|
|
|
|
if resp.status_code != 200: return jsonify({"error":"GPU error "+str(resp.status_code)}), 502
|
|
if is_stream:
|
|
def gen():
|
|
for raw in resp.iter_content(chunk_size=None, decode_unicode=True):
|
|
if raw: yield clean_unicode(raw)
|
|
bcast()
|
|
return Response(stream_with_context(gen()), mimetype="text/event-stream")
|
|
data = clean_response(resp.json())
|
|
for c in data.get("choices",[]):
|
|
msg = c.get("message",{})
|
|
if not msg.get("content") and msg.get("reasoning_content"):
|
|
msg["content"] = msg["reasoning_content"]
|
|
data["routing"] = {"model":model,"reason":reason,"gpu":url,"tier":tier,"agent":agent,"latency_ms":lat,"active_gpu":gpu_active_count(model)}
|
|
bcast()
|
|
return jsonify(data)
|
|
except requests.Timeout:
|
|
gpu_decr(model)
|
|
log.error("TIMEOUT: %s -> %s", agent, model)
|
|
return jsonify({"error":"timeout"}), 504
|
|
except Exception as e:
|
|
gpu_decr(model)
|
|
log.error("Error: %s\n%s", e, traceback.format_exc())
|
|
return jsonify({"error":str(e)}), 500
|
|
|
|
@app.route("/v1/models")
|
|
def models(): return jsonify({"object":"list","data":[{"id":m,"object":"model","owned_by":"syslog","status":check_gpu_health(m).get("status"),"gpu":check_gpu_health(m).get("gpu_name")} for m in GPU_URLS]})
|
|
|
|
@app.route("/v1/props")
|
|
def props():
|
|
"""Ollama-compatible model properties endpoint."""
|
|
props = {}
|
|
for m in GPU_URLS:
|
|
h = check_gpu_health(m)
|
|
props[m] = {
|
|
"status": h.get("status", "unknown"),
|
|
"gpu": h.get("gpu_name", m),
|
|
"max_concurrent": GPU_MAX_CONCURRENT.get(m, 1),
|
|
"active_requests": gpu_active_count(m),
|
|
"vram_used_mb": h.get("vram_used_mb"),
|
|
"vram_total_mb": h.get("vram_total_mb"),
|
|
}
|
|
return jsonify({"models": props})
|
|
|
|
@app.route("/v1/models/<model_id>")
|
|
def model_detail(model_id):
|
|
"""Single model detail — Ollama-compatible."""
|
|
if model_id in GPU_URLS:
|
|
h = check_gpu_health(model_id)
|
|
return jsonify({
|
|
"id": model_id,
|
|
"object": "model",
|
|
"owned_by": "syslog",
|
|
"status": h.get("status"),
|
|
"gpu": h.get("gpu_name"),
|
|
"max_concurrent": GPU_MAX_CONCURRENT.get(model_id, 1),
|
|
"active_requests": gpu_active_count(model_id),
|
|
})
|
|
return jsonify({"error": "model not found"}), 404
|
|
|
|
@app.route("/health")
|
|
def health():
|
|
gpus = {}
|
|
for m in GPU_URLS:
|
|
h = check_gpu_health(m)
|
|
h["active_requests"] = gpu_active_count(m)
|
|
h["max_concurrent"] = GPU_MAX_CONCURRENT.get(m, 1)
|
|
gpus[m] = h
|
|
return jsonify({"status":"healthy","redis":"connected" if r else "down","gpus":gpus,"available_models":available_models()})
|
|
|
|
@app.route("/metrics")
|
|
def metrics(): return jsonify(get_metrics())
|
|
|
|
@app.route("/metrics/timeseries")
|
|
def metrics_timeseries():
|
|
period = request.args.get("period", "day"); models_list = list(GPU_URLS.keys())
|
|
data = {"models": {}, "labels": []}
|
|
if period == "day":
|
|
buckets = [time.strftime("%Y%m%d%H", time.gmtime(time.time()-h*3600)) for h in range(23,-1,-1)]
|
|
data["labels"] = [time.strftime("%H:00", time.gmtime(time.time()-h*3600)) for h in range(23,-1,-1)]
|
|
elif period == "week":
|
|
buckets = [time.strftime("%Y%m%d", time.gmtime(time.time()-d*86400)) for d in range(6,-1,-1)]
|
|
data["labels"] = [time.strftime("%a", time.gmtime(time.time()-d*86400)) for d in range(6,-1,-1)]
|
|
else:
|
|
buckets = [time.strftime("%Y%m%d", time.gmtime(time.time()-d*86400)) for d in range(29,-1,-1)]
|
|
data["labels"] = [time.strftime("%m/%d", time.gmtime(time.time()-d*86400)) for d in range(29,-1,-1)]
|
|
if r:
|
|
for model in models_list:
|
|
counts = []
|
|
for bucket in buckets:
|
|
total = 0
|
|
if period in ("week","month"):
|
|
for hh in range(24): total += int(r.get("ts:"+model+":"+bucket+"{:02d}".format(hh)) or 0)
|
|
else: total = int(r.get("ts:"+model+":"+bucket) or 0)
|
|
counts.append(total)
|
|
data["models"][model] = counts
|
|
return jsonify(data)
|
|
|
|
@app.route("/stream")
|
|
def stream():
|
|
def ev():
|
|
q = queue.Queue()
|
|
with sse_lock: sse_subscribers.append(q)
|
|
try:
|
|
yield "data: "+json.dumps(get_metrics())+"\n\n"
|
|
while True:
|
|
try: yield "data: "+q.get(timeout=3)+"\n\n"
|
|
except queue.Empty: yield "data: "+json.dumps(get_metrics())+"\n\n"
|
|
except GeneratorExit: pass
|
|
finally:
|
|
with sse_lock:
|
|
if q in sse_subscribers: sse_subscribers.remove(q)
|
|
return Response(stream_with_context(ev()), mimetype="text/event-stream",
|
|
headers={"Cache-Control":"no-cache","X-Accel-Buffering":"no","Access-Control-Allow-Origin":"*"})
|
|
|
|
if __name__ == "__main__":
|
|
log.info("Router on :9000 (load-aware)")
|
|
app.run(host="0.0.0.0", port=9000, debug=False)
|