From 28fc57c5c7f681f5e84f36c03fdc722fea67b4b6 Mon Sep 17 00:00:00 2001 From: Abiba Date: Tue, 19 May 2026 15:03:47 +0000 Subject: [PATCH] May 19, 2026: Full harness update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Model migration: gemma-4-E4B → qwen3.5-9b-vlm - Dashboard reorder: Usage Over Time + GPU Metrics to top - Router counter leak fix (gpu_decr in except handler) - VLM slot upgrade 1→2 - Automated maintenance cron job - LiteLLM config update --- .gitignore | 3 + dashboard/Dockerfile | 7 + dashboard/dashboard.py | 232 +++++++++++++++++++ dashboard/requirements.txt | 2 + docker-compose.yml | 99 ++++++++ docker-compose.yml.bak | 97 ++++++++ litellm_config.yaml | 25 ++ maintenance.sh | 37 +++ nginx/nginx.conf | 88 +++++++ router/Dockerfile | 9 + router/http_patch.py | 90 ++++++++ router/requirements.txt | 3 + router/router.py | 341 +++++++++++++++++++++++++++ router/router.py.bak.20260518074236 | 342 ++++++++++++++++++++++++++++ router/ts_patch.py | 80 +++++++ 15 files changed, 1455 insertions(+) create mode 100644 .gitignore create mode 100644 dashboard/Dockerfile create mode 100644 dashboard/dashboard.py create mode 100644 dashboard/requirements.txt create mode 100644 docker-compose.yml create mode 100644 docker-compose.yml.bak create mode 100644 litellm_config.yaml create mode 100755 maintenance.sh create mode 100644 nginx/nginx.conf create mode 100644 router/Dockerfile create mode 100644 router/http_patch.py create mode 100644 router/requirements.txt create mode 100644 router/router.py create mode 100644 router/router.py.bak.20260518074236 create mode 100644 router/ts_patch.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..eeb7737 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.git +__pycache__/ +*.pyc diff --git a/dashboard/Dockerfile b/dashboard/Dockerfile new file mode 100644 index 0000000..2e207e3 --- /dev/null +++ b/dashboard/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.12-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY dashboard.py . +EXPOSE 3000 +CMD ["python", "dashboard.py"] diff --git a/dashboard/dashboard.py b/dashboard/dashboard.py new file mode 100644 index 0000000..ecda580 --- /dev/null +++ b/dashboard/dashboard.py @@ -0,0 +1,232 @@ +"""SyslogAI Harness Dashboard — Modern Design.""" +import os, json, time, queue, threading +import requests +from flask import Flask, request, render_template_string, Response, stream_with_context + +ROUTER_METRICS = os.environ.get("ROUTER_METRICS_URL", "http://router:9000/metrics") +app = Flask(__name__) +sse_subscribers = []; sse_lock = threading.Lock() + +def fetch_state(): + try: + r = requests.get(ROUTER_METRICS, timeout=5) + if r.status_code == 200: return r.json() + except Exception: pass + return {"gpus":[],"route_counts":{},"agent_counts":{},"recent":[],"timestamp":time.time()} + +def broadcast_loop(): + while True: + time.sleep(3) + data = fetch_state(); payload = json.dumps(data) + with sse_lock: + dead = [q for q in sse_subscribers if not q.put(payload)] + for q in dead: sse_subscribers.remove(q) +threading.Thread(target=broadcast_loop, daemon=True).start() + +DASHBOARD_HTML = r""" + + + +SyslogAI Harness + + + + + + +
+
+
⚡ SyslogAI Harness
+
+ + live · +
+
+
+
0
Requests
+
0
Active
+
0
Agents
+
+
+ +
+ +
+ Usage Over Time +
+ + + +
+
+
GPU Metrics
+ + +
Loading...
+
Loading...
+
Loading...
+ + +
Queue Status
+
Model Distribution
+
Agent Activity
+ + +
Live Stream
+
+ + +
TimeAgentModelReasonTier
+
+
+ + + +""" + +@app.route("/") +def dashboard(): return render_template_string(DASHBOARD_HTML) + +@app.route("/api/state") +def api_state(): return fetch_state() + +@app.route("/api/timeseries") +def api_timeseries(): + period = request.args.get("period", "day") + try: + r = requests.get("http://router:9000/metrics/timeseries?period=" + period, timeout=5) + if r.status_code == 200: return r.json() + except Exception: pass + return {"models": {}, "labels": []} + +@app.route("/api/stream") +def api_stream(): + def ev(): + q = queue.Queue() + with sse_lock: sse_subscribers.append(q) + try: + yield "data: "+json.dumps(fetch_state())+"\n\n" + while True: + try: msg = q.get(timeout=3); yield "data: "+msg+"\n\n" + except queue.Empty: yield "data: "+json.dumps(fetch_state())+"\n\n" + except GeneratorExit: pass + finally: + with sse_lock: + if q in sse_subscribers: sse_subscribers.remove(q) + return Response(stream_with_context(ev()), mimetype="text/event-stream", headers={"Cache-Control":"no-cache","X-Accel-Buffering":"no","Access-Control-Allow-Origin":"*"}) + +@app.route("/health") +def health(): return {"status":"healthy","service":"harness-dashboard"} + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=3000, debug=False) diff --git a/dashboard/requirements.txt b/dashboard/requirements.txt new file mode 100644 index 0000000..b1d9136 --- /dev/null +++ b/dashboard/requirements.txt @@ -0,0 +1,2 @@ +flask==3.1.* +requests==2.32.* diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..7ded8b3 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,99 @@ +version: '3.8' + +services: + redis: + image: redis:7-alpine + container_name: harness-redis + restart: unless-stopped + ports: + - "127.0.0.1:6379:6379" + volumes: + - redis-data:/data + command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 3s + retries: 5 + + router: + build: ./router + container_name: harness-router + restart: unless-stopped + ports: + - "127.0.0.1:9000:9000" + environment: + - REDIS_URL=redis://redis:6379 + - GPU_MOE_URL=http://192.168.68.15:8080/v1 + - GPU_DENSE_URL=http://192.168.68.8:8080/v1 + - GPU_LIGHT_URL=http://192.168.68.110:8080/v1 + healthcheck: + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:9000/health')"] + interval: 15s + timeout: 5s + retries: 3 + depends_on: + redis: + condition: service_healthy + + litellm: + image: ghcr.io/berriai/litellm:main-stable + command: ["--config", "/app/config.yaml", "--port", "4000"] + container_name: harness-litellm + restart: unless-stopped + ports: + - "127.0.0.1:8081:4000" + volumes: + - ./litellm_config.yaml:/app/config.yaml + environment: + - LITELLM_MASTER_KEY=sk-sys...-key + extra_hosts: + - "host.docker.internal:host-gateway" + healthcheck: + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:4000/health/liveliness')"] + interval: 15s + timeout: 5s + retries: 3 + depends_on: + redis: + condition: service_healthy + + nginx: + image: nginx:alpine + container_name: harness-nginx + restart: unless-stopped + ports: + - "80:80" + volumes: + - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro + healthcheck: + test: ["CMD", "curl", "-f", "http://127.0.0.1/health"] + interval: 15s + timeout: 5s + retries: 3 + depends_on: + - litellm + - dashboard + + dashboard: + build: ./dashboard + container_name: harness-dashboard + restart: unless-stopped + ports: + - "127.0.0.1:3000:3000" + environment: + - REDIS_URL=redis://redis:6379 + - GPU_SIDECARS=192.168.68.15:8090,192.168.68.8:8090,192.168.68.110:8090 + healthcheck: + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3000/health')"] + interval: 15s + timeout: 5s + retries: 3 + depends_on: + - redis + +volumes: + redis-data: + +# LiteLLM command override to load config +# (appended to fix config loading issue) diff --git a/docker-compose.yml.bak b/docker-compose.yml.bak new file mode 100644 index 0000000..ceb2bbb --- /dev/null +++ b/docker-compose.yml.bak @@ -0,0 +1,97 @@ +version: '3.8' + +services: + redis: + image: redis:7-alpine + container_name: harness-redis + restart: unless-stopped + ports: + - "127.0.0.1:6379:6379" + volumes: + - redis-data:/data + command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 3s + retries: 5 + + router: + build: ./router + container_name: harness-router + restart: unless-stopped + ports: + - "9000:9000" + environment: + - REDIS_URL=redis://redis:6379 + - GPU_MOE_URL=http://192.168.68.15:8080/v1 + - GPU_DENSE_URL=http://192.168.68.8:8080/v1 + - GPU_LIGHT_URL=http://192.168.68.110:8080/v1 + healthcheck: + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:9000/health')"] + interval: 15s + timeout: 5s + retries: 3 + depends_on: + redis: + condition: service_healthy + + litellm: + image: ghcr.io/berriai/litellm:main-stable + command: ["--config", "/app/config.yaml", "--port", "4000"] + container_name: harness-litellm + restart: unless-stopped + ports: + - "8081:4000" + volumes: + - ./litellm_config.yaml:/app/config.yaml + environment: + - LITELLM_MASTER_KEY=sk-syslog-local-master-key + healthcheck: + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:4000/health/liveliness')"] + interval: 15s + timeout: 5s + retries: 3 + depends_on: + redis: + condition: service_healthy + + nginx: + image: nginx:alpine + container_name: harness-nginx + restart: unless-stopped + ports: + - "80:80" + volumes: + - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro + healthcheck: + test: ["CMD", "curl", "-f", "http://127.0.0.1/health"] + interval: 15s + timeout: 5s + retries: 3 + depends_on: + - litellm + - dashboard + + dashboard: + build: ./dashboard + container_name: harness-dashboard + restart: unless-stopped + ports: + - "3000:3000" + environment: + - REDIS_URL=redis://redis:6379 + - GPU_SIDECARS=192.168.68.15:8090,192.168.68.8:8090,192.168.68.110:8090 + healthcheck: + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3000/health')"] + interval: 15s + timeout: 5s + retries: 3 + depends_on: + - redis + +volumes: + redis-data: + +# LiteLLM command override to load config +# (appended to fix config loading issue) diff --git a/litellm_config.yaml b/litellm_config.yaml new file mode 100644 index 0000000..b7bd2d2 --- /dev/null +++ b/litellm_config.yaml @@ -0,0 +1,25 @@ +model_list: + - model_name: qwen3.6-35B-A3B + litellm_params: + model: openai/qwen3.6-35B-A3B + api_base: http://192.168.68.15:8080/v1 + api_key: "not-needed" + + - model_name: qwen3.6-27B-code + litellm_params: + model: openai/qwen3.6-27B-code-text + api_base: http://192.168.68.8:8080/v1 + api_key: "not-needed" + + - model_name: qwen3.5-9b-vlm + litellm_params: + model: openai/qwen3.5-9b-vlm + api_base: http://192.168.68.110:8080/v1 + api_key: "not-needed" + +general_settings: + master_key: sk-syslog-local-master-key + +litellm_settings: + drop_params: true + request_timeout: 120 diff --git a/maintenance.sh b/maintenance.sh new file mode 100755 index 0000000..e72225b --- /dev/null +++ b/maintenance.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# SyslogAI Harness — Automated Maintenance +# Runs daily via cron + +LOG="/var/log/harness-maintenance.log" +echo "=== $(date) ===" >> "$LOG" + +# 1. Clean Redis timeseries keys older than 60 days +CUTOFF=$(date -d "60 days ago" +%Y%m%d%H) +echo "Redis: removing ts:* keys older than $CUTOFF" >> "$LOG" +DELETED=0 +for key in $(docker exec harness-redis redis-cli KEYS "ts:*" 2>/dev/null); do + TS=$(echo "$key" | grep -oP '\d{10}$') + if [ -n "$TS" ] && [ "$TS" -lt "$CUTOFF" ] 2>/dev/null; then + docker exec harness-redis redis-cli DEL "$key" > /dev/null 2>&1 + DELETED=$((DELETED + 1)) + fi +done +echo "Redis: deleted $DELETED stale timeseries keys" >> "$LOG" + +# 2. Log stale model keys (leftover from migrations) +STALE=$(docker exec harness-redis redis-cli KEYS "*gemma*" 2>/dev/null) +if [ -n "$STALE" ]; then + echo "WARNING: stale gemma keys found: $STALE" >> "$LOG" +fi + +# 3. Prune Docker build cache (older than 7 days) +echo "Docker: pruning build cache" >> "$LOG" +docker builder prune -f --filter until=168h >> "$LOG" 2>&1 + +# 4. Log container health status +docker ps --format "table {{.Names}}\t{{.Status}}\t{{.RunningFor}}" >> "$LOG" 2>&1 + +# 5. Log Redis memory +docker exec harness-redis redis-cli INFO memory | grep used_memory_human >> "$LOG" 2>&1 + +echo "" >> "$LOG" diff --git a/nginx/nginx.conf b/nginx/nginx.conf new file mode 100644 index 0000000..83e5640 --- /dev/null +++ b/nginx/nginx.conf @@ -0,0 +1,88 @@ +worker_processes auto; +error_log /var/log/nginx/error.log warn; +pid /var/run/nginx.pid; + +events { worker_connections 1024; } + +http { + include /etc/nginx/mime.types; + default_type application/octet-stream; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" rt=$request_time'; + access_log /var/log/nginx/access.log main; + error_log /var/log/nginx/error.log; + sendfile on; + keepalive_timeout 65; + + upstream router_api { server router:9000; } + upstream dashboard_ui { server dashboard:3000; } + upstream litellm_backend { server litellm:4000; } + + server { + listen 80; + + # Security headers + add_header X-Content-Type-Options nosniff always; + add_header X-Frame-Options SAMEORIGIN always; + add_header X-XSS-Protection "1; mode=block" always; + + # Disable buffering for SSE streams + proxy_buffering off; + + # API — through router + location /v1/ { + proxy_pass http://router_api; + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header Authorization $http_authorization; + proxy_connect_timeout 10s; + proxy_read_timeout 600s; + proxy_buffering off; + } + + # SSE streaming endpoint + location /stream { + proxy_pass http://router_api; + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header Connection ""; + proxy_buffering off; + chunked_transfer_encoding off; + } + + # Dashboard API proxy for SSE + location /api/ { + proxy_pass http://dashboard_ui; + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_buffering off; + } + + # LiteLLM debug + location /litellm/ { + rewrite ^/litellm/(.*) /$1 break; + proxy_pass http://litellm_backend; + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header Authorization $http_authorization; + } + + # Dashboard + location / { + proxy_pass http://dashboard_ui; + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_buffering off; + } + + location /health { + proxy_pass http://router_api/health; + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + } + } +} diff --git a/router/Dockerfile b/router/Dockerfile new file mode 100644 index 0000000..b5d85f7 --- /dev/null +++ b/router/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.12-slim + +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY router.py . + +EXPOSE 9000 +CMD ["python", "router.py"] diff --git a/router/http_patch.py b/router/http_patch.py new file mode 100644 index 0000000..4d9b249 --- /dev/null +++ b/router/http_patch.py @@ -0,0 +1,90 @@ +# Insert streaming support before the gpu_resp call +import re +with open('/opt/inference-harness/router/router.py') as f: + code = f.read() + +# Find the gpu_resp block and replace with streaming-aware version +old = ''' start = time.time() + gpu_resp = requests.post( + gpu_url + "/chat/completions", + json=req_data, + headers={"Content-Type": "application/json", "Authorization": "Bearer not-needed"}, + timeout=120, + ) + latency_ms = int((time.time() - start) * 1000) + + if gpu_resp.status_code != 200: + log.error("GPU error: %s %s", gpu_resp.status_code, gpu_resp.text[:200]) + return jsonify({"error": "GPU backend returned " + str(gpu_resp.status_code)}), 502 + + response_data = gpu_resp.json() + response_data = fix_reasoning_content(response_data) + + response_data["routing"] = { + "model": model, "reason": reason, "gpu": gpu_url, + "tier": tier, "agent": agent, "latency_ms": latency_ms, + } + + return jsonify(response_data)''' + +new = ''' start = time.time() + is_stream = req_data.get("stream", False) + + gpu_resp = requests.post( + gpu_url + "/chat/completions", + json=req_data, + headers={"Content-Type": "application/json", "Authorization": "Bearer not-needed"}, + timeout=120, + stream=is_stream, + ) + latency_ms = int((time.time() - start) * 1000) + + if gpu_resp.status_code != 200: + log.error("GPU error: %s %s", gpu_resp.status_code, gpu_resp.text[:200]) + return jsonify({"error": "GPU backend returned " + str(gpu_resp.status_code)}), 502 + + if is_stream: + # Stream response back to client + def generate(): + first = True + for line in gpu_resp.iter_lines(decode_unicode=True): + if line: + if first and line.startswith("data: "): + # Inject routing into first chunk + try: + chunk = json.loads(line[6:]) + chunk["routing"] = { + "model": model, "reason": reason, "gpu": gpu_url, + "tier": tier, "agent": agent, "latency_ms": latency_ms, + } + yield "data: " + json.dumps(chunk) + "\n\n" + first = False + continue + except Exception: + pass + yield line + "\n" + yield "data: [DONE]\n\n" + return Response(stream_with_context(generate()), mimetype="text/event-stream") + + response_data = gpu_resp.json() + response_data = fix_reasoning_content(response_data) + + response_data["routing"] = { + "model": model, "reason": reason, "gpu": gpu_url, + "tier": tier, "agent": agent, "latency_ms": latency_ms, + } + + return jsonify(response_data)''' + +code = code.replace(old, new) + +# Add missing import +if 'from flask import Flask, request, jsonify' in code: + code = code.replace( + 'from flask import Flask, request, jsonify', + 'from flask import Flask, request, jsonify, Response, stream_with_context' + ) + +with open('/opt/inference-harness/router/router.py', 'w') as f: + f.write(code) +print('Streaming support added') diff --git a/router/requirements.txt b/router/requirements.txt new file mode 100644 index 0000000..2ec289b --- /dev/null +++ b/router/requirements.txt @@ -0,0 +1,3 @@ +flask==3.1.* +redis==5.2.* +requests==2.32.* diff --git a/router/router.py b/router/router.py new file mode 100644 index 0000000..2843fb3 --- /dev/null +++ b/router/router.py @@ -0,0 +1,341 @@ +import os, json, time, logging, traceback, threading, queue +import requests, redis +from flask import Flask, request, jsonify, Response, stream_with_context + +REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379") +GPU_MOE_URL = os.environ.get("GPU_MOE_URL", "http://192.168.68.15:8080/v1") +GPU_DENSE_URL = os.environ.get("GPU_DENSE_URL", "http://192.168.68.8:8080/v1") +GPU_LIGHT_URL = os.environ.get("GPU_LIGHT_URL", "http://192.168.68.110:8080/v1") + +GPU_SIDECARS = { + "qwen3.6-35B-A3B": "http://192.168.68.15:8090", + "qwen3.6-27B-code": "http://192.168.68.8:8090", + "qwen3.5-9b-vlm": "http://192.168.68.110:8090", +} +GPU_URLS = { + "qwen3.6-35B-A3B": GPU_MOE_URL, + "qwen3.6-27B-code": GPU_DENSE_URL, + "qwen3.5-9b-vlm": GPU_LIGHT_URL, +} +# Max concurrent requests per GPU (based on llama.cpp --parallel) +GPU_MAX_CONCURRENT = { + "qwen3.6-35B-A3B": 2, # 2 slots + "qwen3.6-27B-code": 2, # 2 slots + "qwen3.5-9b-vlm": 2, # 2 slots (12GB VRAM, 4GB headroom) +} + +TIER_MODELS = { + "starter": ["qwen3.5-9b-vlm"], + "professional": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"], + "enterprise": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"], +} +API_KEYS = { + "sk-syslog-local-master-key": {"tier": "enterprise", "agent": "admin"}, + "sk-syslog-abiba": {"tier": "enterprise", "agent": "Abiba"}, + "sk-syslog-mumuni": {"tier": "enterprise", "agent": "Mumuni"}, + "sk-syslog-tanko": {"tier": "enterprise", "agent": "Tanko"}, + "sk-syslog-koby": {"tier": "enterprise", "agent": "Koby"}, + "sk-syslog-kagenz0": {"tier": "enterprise", "agent": "Kagenz0"}, + "sk-syslog-koonimo": {"tier": "enterprise", "agent": "Koonimo"}, + "sk-starter-abc123": {"tier": "starter", "agent": "test-starter"}, + "sk-professional-xyz789": {"tier": "professional", "agent": "test-pro"}, +} + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [ROUTER] %(levelname)s %(message)s") +log = logging.getLogger("router") +try: r = redis.from_url(REDIS_URL, decode_responses=True); r.ping() +except Exception: r = None + + +def counter_audit_loop(): + """Every 30s, check GPU slots and reset counters if all slots idle.""" + while True: + time.sleep(30) + if not r: continue + for model, url in GPU_URLS.items(): + try: + resp = requests.get(url.replace("/v1","") + "/slots", + headers={"Authorization": "Bearer not-needed"}, timeout=5) + if resp.status_code == 200: + slots = resp.json() + all_idle = all(not s.get("is_processing", False) for s in slots) + if all_idle: + current = int(r.get("active:" + model) or 0) + if current > 0: + r.set("active:" + model, 0) + log.info("AUDIT: Reset stuck counter for %s (was %d)", model, current) + except Exception: + pass + +threading.Thread(target=counter_audit_loop, daemon=True).start() + +app = Flask(__name__) +sse_subscribers = []; sse_lock = threading.Lock() + +def gpu_active_count(model): + """Get number of in-flight requests for a GPU.""" + if r: + return int(r.get("active:" + model) or 0) + return 0 + +def gpu_incr(model): + if r: r.incr("active:" + model) + +def gpu_decr(model): + if r: + v = r.decr("active:" + model) + if v and int(v) < 0: + r.set("active:" + model, 0) # never go negative + +def check_gpu_health(model): + url = GPU_SIDECARS.get(model) + if not url: return {"status": "unknown"} + try: + resp = requests.get(url, timeout=5) + if resp.status_code == 200: + d = resp.json() + pct = (d.get("vram_used_mb",0) / max(d.get("vram_total_mb",1), 1)) * 100 + status = "healthy" if pct < 90 else "saturated" + # Also check if llama.cpp endpoint is actually responding + gpu_url = GPU_URLS.get(model, "") + try: + hr = requests.get(gpu_url.replace("/v1","") + "/health", headers={"Authorization": "Bearer not-needed"}, timeout=3) + if hr.status_code != 200: + status = "down" + except Exception: + status = "down" + return {"status": status, "vram_used_mb": d.get("vram_used_mb"), "vram_total_mb": d.get("vram_total_mb"), "vram_pct": round(pct,1), "temp_c": d.get("temp_c"), "gpu_util_pct": d.get("gpu_util_pct"), "gpu_name": d.get("gpu_name"), "power_w": d.get("power_w"), "power_limit_w": d.get("power_limit_w")} + except Exception: pass + return {"status": "down"} + +def available_models(): return [m for m in GPU_URLS if check_gpu_health(m)["status"] in ("healthy","saturated")] + +def estimate_tokens(msgs): return sum(len(str(m.get("content",""))) for m in msgs) // 4 + +def is_gpu_busy(model): + """Check if GPU is at or near max concurrent capacity.""" + active = gpu_active_count(model) + max_c = GPU_MAX_CONCURRENT.get(model, 1) + return active >= max_c + +def select_best_gpu(candidates, reason): + """Pick the best GPU from candidates, preferring least-loaded.""" + best = None + best_load = 999 + for m in candidates: + load = gpu_active_count(m) + if load < best_load: + best_load = load + best = m + if best: + actual_reason = reason + if is_gpu_busy(best): + actual_reason = "load_balanced_" + reason + return {"model": best, "reason": actual_reason} + return None + +def route(rd, tier): + msgs = rd.get("messages",[]); t = estimate_tokens(msgs) + sys = any(m.get("role")=="system" for m in msgs) + turns = len([m for m in msgs if m.get("role") in ("user","assistant")]) + hints = rd.get("routing_hints",{}) + allowed = TIER_MODELS.get(tier, ["qwen3.5-9b-vlm"]) + avail = [m for m in available_models() if m in allowed] + if not avail: return {"model": allowed[0], "reason": "all_saturated", "saturated": True} + + req = rd.get("model","auto") + if req != "auto": + target = req if req in avail else avail[0] + # If explicit model is busy, check if another can take it + if is_gpu_busy(target) and req in allowed: + alts = [m for m in avail if m != target and m in allowed] + if alts: + alt = select_best_gpu(alts, "explicit") + if alt: return alt + return {"model": target, "reason": "explicit"} + + if hints: + if hints.get("priority")=="speed" and "qwen3.5-9b-vlm" in avail: + return select_best_gpu(["qwen3.5-9b-vlm"], "hint_speed") or {"model":"qwen3.5-9b-vlm","reason":"hint_speed"} + if hints.get("priority")=="quality" and "qwen3.6-27B-code" in avail: + return select_best_gpu(["qwen3.6-27B-code"], "hint_quality") or {"model":"qwen3.6-27B-code","reason":"hint_quality"} + + # Heavy -> dense (but fall back to MoE if dense is busy) + if t > 4000 or sys or turns > 6: + candidates = ["qwen3.6-27B-code","qwen3.6-35B-A3B","qwen3.5-9b-vlm"] + candidates = [m for m in candidates if m in avail] + result = select_best_gpu(candidates, "heavy_reasoning") + if result: return result + + # Ultra-light -> VLM + first_msg = msgs[0].get("content","") if msgs else "" + words = len(first_msg.split()) if isinstance(first_msg, str) else 99 + if words <= 3 and turns <= 1 and not sys and "qwen3.5-9b-vlm" in avail: + if not is_gpu_busy("qwen3.5-9b-vlm"): + return {"model":"qwen3.5-9b-vlm","reason":"ultra_light"} + + # Default: MoE, fall back to dense if MoE is busy + if "qwen3.6-35B-A3B" in avail: + if is_gpu_busy("qwen3.6-35B-A3B") and "qwen3.6-27B-code" in avail: + return {"model": "qwen3.6-27B-code", "reason": "load_balanced_default"} + return {"model":"qwen3.6-35B-A3B","reason":"default_moe"} + + return {"model":avail[0],"reason":"fallback"} + +def clean_unicode(text): + if not isinstance(text, str): return text + text = text.replace(chr(0x2014), "-"); text = text.replace(chr(0x2013), "-") + text = text.replace(chr(0x2018), "'"); text = text.replace(chr(0x2019), "'") + text = text.replace(chr(0x201C), '"'); text = text.replace(chr(0x201D), '"') + text = text.replace(chr(0x2026), "..."); text = text.replace(chr(0x00A0), " ") + return text.encode("ascii", "ignore").decode("ascii") + +def clean_response(d): + if isinstance(d, dict): return {k: clean_response(v) for k,v in d.items()} + if isinstance(d, list): return [clean_response(v) for v in d] + if isinstance(d, str): return clean_unicode(d) + return d + +def get_metrics(): + d = {"gpus":[],"route_counts":{},"agent_counts":{},"tier_counts":{},"recent":[],"timestamp":time.time(),"active_requests":{}} + for m in GPU_URLS: + h = check_gpu_health(m) + d["gpus"].append({"id":m,"gpu_name":h.get("gpu_name",m),"status":h.get("status"),"vram_used_mb":h.get("vram_used_mb"),"vram_total_mb":h.get("vram_total_mb"),"vram_pct":h.get("vram_pct"),"temp_c":h.get("temp_c"),"gpu_util_pct":h.get("gpu_util_pct"),"power_w":h.get("power_w"),"power_limit_w":h.get("power_limit_w"),"active_requests":gpu_active_count(m), "max_concurrent": GPU_MAX_CONCURRENT.get(m, 1)}) + d["active_requests"][m] = gpu_active_count(m) + if r: + try: + for m in GPU_URLS: d["route_counts"][m] = int(r.get("routes:"+m) or 0) + for k,v in API_KEYS.items(): + c = int(r.get("routes:agent:"+v["agent"]) or 0) + if c>0: d["agent_counts"][v["agent"]] = c + for t in TIER_MODELS: d["tier_counts"][t] = int(r.get("routes:tier:"+t) or 0) + raw = r.lrange("routes:recent",0,49) + d["recent"] = [json.loads(x) for x in raw] if raw else [] + except Exception: pass + return d + +def bcast(): + data = get_metrics(); payload = json.dumps(data) + with sse_lock: + dead = [] + for q in sse_subscribers: + try: q.put(payload) + except Exception: dead.append(q) + for q in dead: sse_subscribers.remove(q) + +@app.route("/v1/chat/completions", methods=["POST"]) +def chat(): + try: + rd = request.get_json(force=True) + ak = request.headers.get("Authorization","").replace("Bearer ","") + ki = API_KEYS.get(ak, {"tier":"starter","agent":"unknown"}) + tier, agent = ki["tier"], ki["agent"] + d = route(rd, tier) + if d.get("saturated"): + resp = jsonify({"error": "All GPUs saturated", "retry_after_s": 5}) + resp.headers["Retry-After"] = "5" + return resp, 503 + model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]] + is_stream = rd.get("stream", False) + + gpu_incr(model) + + log.info("ROUTE: %s -> %s (%s) stream=%s active=%d/%d", agent, model, reason, is_stream, gpu_active_count(model), GPU_MAX_CONCURRENT.get(model,1)) + if r: + try: + r.incr("routes:"+model); r.incr("routes:tier:"+tier); r.incr("routes:agent:"+agent) + r.incr("ts:"+model+":"+time.strftime("%Y%m%d%H")) + r.lpush("routes:recent", json.dumps({"ts":time.time(),"model":model,"reason":reason,"tier":tier,"agent":agent})) + r.ltrim("routes:recent",0,999) + except Exception: pass + start = time.time() + resp = requests.post(url+"/chat/completions", json=rd, + headers={"Content-Type":"application/json","Authorization":"Bearer not-needed"}, timeout=300, stream=is_stream) + lat = int((time.time()-start)*1000) + gpu_decr(model) + + if resp.status_code != 200: return jsonify({"error":"GPU error "+str(resp.status_code)}), 502 + if is_stream: + def gen(): + for raw in resp.iter_content(chunk_size=None, decode_unicode=True): + if raw: yield clean_unicode(raw) + bcast() + return Response(stream_with_context(gen()), mimetype="text/event-stream") + data = clean_response(resp.json()) + for c in data.get("choices",[]): + msg = c.get("message",{}) + if not msg.get("content") and msg.get("reasoning_content"): + msg["content"] = msg["reasoning_content"] + data["routing"] = {"model":model,"reason":reason,"gpu":url,"tier":tier,"agent":agent,"latency_ms":lat,"active_gpu":gpu_active_count(model)} + bcast() + return jsonify(data) + except requests.Timeout: + gpu_decr(model) + log.error("TIMEOUT: %s -> %s", agent, model) + return jsonify({"error":"timeout"}), 504 + except Exception as e: + gpu_decr(model) + log.error("Error: %s\n%s", e, traceback.format_exc()) + return jsonify({"error":str(e)}), 500 + +@app.route("/v1/models") +def models(): return jsonify({"object":"list","data":[{"id":m,"object":"model","owned_by":"syslog","status":check_gpu_health(m).get("status"),"gpu":check_gpu_health(m).get("gpu_name")} for m in GPU_URLS]}) + +@app.route("/health") +def health(): + gpus = {} + for m in GPU_URLS: + h = check_gpu_health(m) + h["active_requests"] = gpu_active_count(m) + h["max_concurrent"] = GPU_MAX_CONCURRENT.get(m, 1) + gpus[m] = h + return jsonify({"status":"healthy","redis":"connected" if r else "down","gpus":gpus,"available_models":available_models()}) + +@app.route("/metrics") +def metrics(): return jsonify(get_metrics()) + +@app.route("/metrics/timeseries") +def metrics_timeseries(): + period = request.args.get("period", "day"); models_list = list(GPU_URLS.keys()) + data = {"models": {}, "labels": []} + if period == "day": + buckets = [time.strftime("%Y%m%d%H", time.gmtime(time.time()-h*3600)) for h in range(23,-1,-1)] + data["labels"] = [time.strftime("%H:00", time.gmtime(time.time()-h*3600)) for h in range(23,-1,-1)] + elif period == "week": + buckets = [time.strftime("%Y%m%d", time.gmtime(time.time()-d*86400)) for d in range(6,-1,-1)] + data["labels"] = [time.strftime("%a", time.gmtime(time.time()-d*86400)) for d in range(6,-1,-1)] + else: + buckets = [time.strftime("%Y%m%d", time.gmtime(time.time()-d*86400)) for d in range(29,-1,-1)] + data["labels"] = [time.strftime("%m/%d", time.gmtime(time.time()-d*86400)) for d in range(29,-1,-1)] + if r: + for model in models_list: + counts = [] + for bucket in buckets: + total = 0 + if period in ("week","month"): + for hh in range(24): total += int(r.get("ts:"+model+":"+bucket+"{:02d}".format(hh)) or 0) + else: total = int(r.get("ts:"+model+":"+bucket) or 0) + counts.append(total) + data["models"][model] = counts + return jsonify(data) + +@app.route("/stream") +def stream(): + def ev(): + q = queue.Queue() + with sse_lock: sse_subscribers.append(q) + try: + yield "data: "+json.dumps(get_metrics())+"\n\n" + while True: + try: yield "data: "+q.get(timeout=3)+"\n\n" + except queue.Empty: yield "data: "+json.dumps(get_metrics())+"\n\n" + except GeneratorExit: pass + finally: + with sse_lock: + if q in sse_subscribers: sse_subscribers.remove(q) + return Response(stream_with_context(ev()), mimetype="text/event-stream", + headers={"Cache-Control":"no-cache","X-Accel-Buffering":"no","Access-Control-Allow-Origin":"*"}) + +if __name__ == "__main__": + log.info("Router on :9000 (load-aware)") + app.run(host="0.0.0.0", port=9000, debug=False) diff --git a/router/router.py.bak.20260518074236 b/router/router.py.bak.20260518074236 new file mode 100644 index 0000000..46d38d9 --- /dev/null +++ b/router/router.py.bak.20260518074236 @@ -0,0 +1,342 @@ +import os, json, time, logging, traceback, threading, queue +import requests, redis +from flask import Flask, request, jsonify, Response, stream_with_context + +REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379") +GPU_MOE_URL = os.environ.get("GPU_MOE_URL", "http://192.168.68.15:8080/v1") +GPU_DENSE_URL = os.environ.get("GPU_DENSE_URL", "http://192.168.68.8:8080/v1") +GPU_LIGHT_URL = os.environ.get("GPU_LIGHT_URL", "http://192.168.68.110:8080/v1") + +GPU_SIDECARS = { + "qwen3.6-35B-A3B": "http://192.168.68.15:8090", + "qwen3.6-27B-code": "http://192.168.68.8:8090", + "qwen3.5-9b-vlm": "http://192.168.68.110:8090", +} +GPU_URLS = { + "qwen3.6-35B-A3B": GPU_MOE_URL, + "qwen3.6-27B-code": GPU_DENSE_URL, + "qwen3.5-9b-vlm": GPU_LIGHT_URL, +} +# Max concurrent requests per GPU (based on llama.cpp --parallel) +GPU_MAX_CONCURRENT = { + "qwen3.6-35B-A3B": 2, # 2 slots + "qwen3.6-27B-code": 2, # 2 slots + "qwen3.5-9b-vlm": 1, # 1 slot +} + +TIER_MODELS = { + "starter": ["qwen3.5-9b-vlm"], + "professional": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"], + "enterprise": ["qwen3.6-35B-A3B", "qwen3.6-27B-code", "qwen3.5-9b-vlm"], +} +API_KEYS = { + "sk-syslog-local-master-key": {"tier": "enterprise", "agent": "admin"}, + "sk-syslog-abiba": {"tier": "enterprise", "agent": "Abiba"}, + "sk-syslog-mumuni": {"tier": "enterprise", "agent": "Mumuni"}, + "sk-syslog-tanko": {"tier": "enterprise", "agent": "Tanko"}, + "sk-syslog-koby": {"tier": "enterprise", "agent": "Koby"}, + "sk-syslog-kagenz0": {"tier": "enterprise", "agent": "Kagenz0"}, + "sk-syslog-koonimo": {"tier": "enterprise", "agent": "Koonimo"}, + "sk-starter-abc123": {"tier": "starter", "agent": "test-starter"}, + "sk-professional-xyz789": {"tier": "professional", "agent": "test-pro"}, +} + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [ROUTER] %(levelname)s %(message)s") +log = logging.getLogger("router") +try: r = redis.from_url(REDIS_URL, decode_responses=True); r.ping() +except Exception: r = None + + +def counter_audit_loop(): + """Every 30s, check GPU slots and reset counters if all slots idle.""" + while True: + time.sleep(30) + if not r: continue + for model, url in GPU_URLS.items(): + try: + resp = requests.get(url.replace("/v1","") + "/slots", + headers={"Authorization": "Bearer not-needed"}, timeout=5) + if resp.status_code == 200: + slots = resp.json() + all_idle = all(not s.get("is_processing", False) for s in slots) + if all_idle: + current = int(r.get("active:" + model) or 0) + if current > 0: + r.set("active:" + model, 0) + log.info("AUDIT: Reset stuck counter for %s (was %d)", model, current) + except Exception: + pass + +threading.Thread(target=counter_audit_loop, daemon=True).start() + +app = Flask(__name__) +sse_subscribers = []; sse_lock = threading.Lock() + +def gpu_active_count(model): + """Get number of in-flight requests for a GPU.""" + if r: + return int(r.get("active:" + model) or 0) + return 0 + +def gpu_incr(model): + if r: r.incr("active:" + model) + +def gpu_decr(model): + if r: + v = r.decr("active:" + model) + if v and int(v) < 0: + r.set("active:" + model, 0) # never go negative + +def check_gpu_health(model): + url = GPU_SIDECARS.get(model) + if not url: return {"status": "unknown"} + try: + resp = requests.get(url, timeout=5) + if resp.status_code == 200: + d = resp.json() + pct = (d.get("vram_used_mb",0) / max(d.get("vram_total_mb",1), 1)) * 100 + status = "healthy" if pct < 90 else "saturated" + # Also check if llama.cpp endpoint is actually responding + gpu_url = GPU_URLS.get(model, "") + try: + hr = requests.get(gpu_url.replace("/v1","") + "/health", headers={"Authorization": "Bearer not-needed"}, timeout=3) + if hr.status_code != 200: + status = "down" + except Exception: + status = "down" + return {"status": status, "vram_used_mb": d.get("vram_used_mb"), "vram_total_mb": d.get("vram_total_mb"), "vram_pct": round(pct,1), "temp_c": d.get("temp_c"), "gpu_util_pct": d.get("gpu_util_pct"), "gpu_name": d.get("gpu_name"), "power_w": d.get("power_w"), "power_limit_w": d.get("power_limit_w")} + except Exception: pass + return {"status": "down"} + +def available_models(): return [m for m in GPU_URLS if check_gpu_health(m)["status"] in ("healthy","saturated")] + +def estimate_tokens(msgs): return sum(len(str(m.get("content",""))) for m in msgs) // 4 + +def is_gpu_busy(model): + """Check if GPU is at or near max concurrent capacity.""" + active = gpu_active_count(model) + max_c = GPU_MAX_CONCURRENT.get(model, 1) + return active >= max_c + +def select_best_gpu(candidates, reason): + """Pick the best GPU from candidates, preferring least-loaded.""" + best = None + best_load = 999 + for m in candidates: + load = gpu_active_count(m) + if load < best_load: + best_load = load + best = m + if best: + actual_reason = reason + if is_gpu_busy(best): + actual_reason = "load_balanced_" + reason + return {"model": best, "reason": actual_reason} + return None + +def route(rd, tier): + msgs = rd.get("messages",[]); t = estimate_tokens(msgs) + sys = any(m.get("role")=="system" for m in msgs) + turns = len([m for m in msgs if m.get("role") in ("user","assistant")]) + hints = rd.get("routing_hints",{}) + allowed = TIER_MODELS.get(tier, ["qwen3.5-9b-vlm"]) + avail = [m for m in available_models() if m in allowed] + if not avail: return {"model": allowed[0], "reason": "all_saturated", "saturated": True} + + req = rd.get("model","auto") + if req != "auto": + target = req if req in avail else avail[0] + # If explicit model is busy, check if another can take it + if is_gpu_busy(target) and req in allowed: + alts = [m for m in avail if m != target and m in allowed] + if alts: + alt = select_best_gpu(alts, "explicit") + if alt: return alt + return {"model": target, "reason": "explicit"} + + if hints: + if hints.get("priority")=="speed" and "qwen3.5-9b-vlm" in avail: + return select_best_gpu(["qwen3.5-9b-vlm"], "hint_speed") or {"model":"qwen3.5-9b-vlm","reason":"hint_speed"} + if hints.get("priority")=="quality" and "qwen3.6-27B-code" in avail: + return select_best_gpu(["qwen3.6-27B-code"], "hint_quality") or {"model":"qwen3.6-27B-code","reason":"hint_quality"} + + # Heavy -> dense (but fall back to MoE if dense is busy) + if t > 4000 or sys or turns > 6: + candidates = ["qwen3.6-27B-code","qwen3.6-35B-A3B","qwen3.5-9b-vlm"] + candidates = [m for m in candidates if m in avail] + result = select_best_gpu(candidates, "heavy_reasoning") + if result: return result + + # Ultra-light -> VLM + first_msg = msgs[0].get("content","") if msgs else "" + words = len(first_msg.split()) if isinstance(first_msg, str) else 99 + if words <= 3 and turns <= 1 and not sys and "qwen3.5-9b-vlm" in avail: + if not is_gpu_busy("qwen3.5-9b-vlm"): + return {"model":"qwen3.5-9b-vlm","reason":"ultra_light"} + + # Default: MoE, fall back to dense if MoE is busy + if "qwen3.6-35B-A3B" in avail: + if is_gpu_busy("qwen3.6-35B-A3B") and "qwen3.6-27B-code" in avail: + return {"model": "qwen3.6-27B-code", "reason": "load_balanced_default"} + return {"model":"qwen3.6-35B-A3B","reason":"default_moe"} + + return {"model":avail[0],"reason":"fallback"} + +def clean_unicode(text): + if not isinstance(text, str): return text + text = text.replace(chr(0x2014), "-"); text = text.replace(chr(0x2013), "-") + text = text.replace(chr(0x2018), "'"); text = text.replace(chr(0x2019), "'") + text = text.replace(chr(0x201C), '"'); text = text.replace(chr(0x201D), '"') + text = text.replace(chr(0x2026), "..."); text = text.replace(chr(0x00A0), " ") + return text.encode("ascii", "ignore").decode("ascii") + +def clean_response(d): + if isinstance(d, dict): return {k: clean_response(v) for k,v in d.items()} + if isinstance(d, list): return [clean_response(v) for v in d] + if isinstance(d, str): return clean_unicode(d) + return d + +def get_metrics(): + d = {"gpus":[],"route_counts":{},"agent_counts":{},"tier_counts":{},"recent":[],"timestamp":time.time(),"active_requests":{}} + for m in GPU_URLS: + h = check_gpu_health(m) + d["gpus"].append({"id":m,"gpu_name":h.get("gpu_name",m),"status":h.get("status"),"vram_used_mb":h.get("vram_used_mb"),"vram_total_mb":h.get("vram_total_mb"),"vram_pct":h.get("vram_pct"),"temp_c":h.get("temp_c"),"gpu_util_pct":h.get("gpu_util_pct"),"power_w":h.get("power_w"),"power_limit_w":h.get("power_limit_w"),"active_requests":gpu_active_count(m), "max_concurrent": GPU_MAX_CONCURRENT.get(m, 1)}) + d["active_requests"][m] = gpu_active_count(m) + if r: + try: + for m in GPU_URLS: d["route_counts"][m] = int(r.get("routes:"+m) or 0) + for k,v in API_KEYS.items(): + c = int(r.get("routes:agent:"+v["agent"]) or 0) + if c>0: d["agent_counts"][v["agent"]] = c + for t in TIER_MODELS: d["tier_counts"][t] = int(r.get("routes:tier:"+t) or 0) + raw = r.lrange("routes:recent",0,49) + d["recent"] = [json.loads(x) for x in raw] if raw else [] + except Exception: pass + return d + +def bcast(): + data = get_metrics(); payload = json.dumps(data) + with sse_lock: + dead = [] + for q in sse_subscribers: + try: q.put(payload) + except Exception: dead.append(q) + for q in dead: sse_subscribers.remove(q) + +@app.route("/v1/chat/completions", methods=["POST"]) +def chat(): + try: + rd = request.get_json(force=True) + ak = request.headers.get("Authorization","").replace("Bearer ","") + ki = API_KEYS.get(ak, {"tier":"starter","agent":"unknown"}) + tier, agent = ki["tier"], ki["agent"] + d = route(rd, tier) + if d.get("saturated"): + resp = jsonify({"error": "All GPUs saturated", "retry_after_s": 5}) + resp.headers["Retry-After"] = "5" + return resp, 503 + model, reason, url = d["model"], d["reason"], GPU_URLS[d["model"]] + is_stream = rd.get("stream", False) + + gpu_incr(model) + decremented = False + + log.info("ROUTE: %s -> %s (%s) stream=%s active=%d/%d", agent, model, reason, is_stream, gpu_active_count(model), GPU_MAX_CONCURRENT.get(model,1)) + if r: + try: + r.incr("routes:"+model); r.incr("routes:tier:"+tier); r.incr("routes:agent:"+agent) + r.incr("ts:"+model+":"+time.strftime("%Y%m%d%H")) + r.lpush("routes:recent", json.dumps({"ts":time.time(),"model":model,"reason":reason,"tier":tier,"agent":agent})) + r.ltrim("routes:recent",0,999) + except Exception: pass + start = time.time() + resp = requests.post(url+"/chat/completions", json=rd, + headers={"Content-Type":"application/json","Authorization":"Bearer not-needed"}, timeout=300, stream=is_stream) + lat = int((time.time()-start)*1000) + gpu_decr(model) + decremented = True # Release slot + + if resp.status_code != 200: return jsonify({"error":"GPU error "+str(resp.status_code)}), 502 + if is_stream: + def gen(): + for raw in resp.iter_content(chunk_size=None, decode_unicode=True): + if raw: yield clean_unicode(raw) + bcast() + return Response(stream_with_context(gen()), mimetype="text/event-stream") + data = clean_response(resp.json()) + for c in data.get("choices",[]): + msg = c.get("message",{}) + if not msg.get("content") and msg.get("reasoning_content"): + msg["content"] = msg["reasoning_content"] + data["routing"] = {"model":model,"reason":reason,"gpu":url,"tier":tier,"agent":agent,"latency_ms":lat,"active_gpu":gpu_active_count(model)} + bcast() + return jsonify(data) + if not decremented: + try: gpu_decr(model) + except: pass + except requests.Timeout: + return jsonify({"error":"timeout"}), 504 + log.error("Error: %s\n%s", e, traceback.format_exc()) + return jsonify({"error":str(e)}), 500 + +@app.route("/v1/models") +def models(): return jsonify({"object":"list","data":[{"id":m,"object":"model","owned_by":"syslog","status":check_gpu_health(m).get("status"),"gpu":check_gpu_health(m).get("gpu_name")} for m in GPU_URLS]}) + +@app.route("/health") +def health(): + gpus = {} + for m in GPU_URLS: + h = check_gpu_health(m) + h["active_requests"] = gpu_active_count(m) + h["max_concurrent"] = GPU_MAX_CONCURRENT.get(m, 1) + gpus[m] = h + return jsonify({"status":"healthy","redis":"connected" if r else "down","gpus":gpus,"available_models":available_models()}) + +@app.route("/metrics") +def metrics(): return jsonify(get_metrics()) + +@app.route("/metrics/timeseries") +def metrics_timeseries(): + period = request.args.get("period", "day"); models_list = list(GPU_URLS.keys()) + data = {"models": {}, "labels": []} + if period == "day": + buckets = [time.strftime("%Y%m%d%H", time.gmtime(time.time()-h*3600)) for h in range(23,-1,-1)] + data["labels"] = [time.strftime("%H:00", time.gmtime(time.time()-h*3600)) for h in range(23,-1,-1)] + elif period == "week": + buckets = [time.strftime("%Y%m%d", time.gmtime(time.time()-d*86400)) for d in range(6,-1,-1)] + data["labels"] = [time.strftime("%a", time.gmtime(time.time()-d*86400)) for d in range(6,-1,-1)] + else: + buckets = [time.strftime("%Y%m%d", time.gmtime(time.time()-d*86400)) for d in range(29,-1,-1)] + data["labels"] = [time.strftime("%m/%d", time.gmtime(time.time()-d*86400)) for d in range(29,-1,-1)] + if r: + for model in models_list: + counts = [] + for bucket in buckets: + total = 0 + if period in ("week","month"): + for hh in range(24): total += int(r.get("ts:"+model+":"+bucket+"{:02d}".format(hh)) or 0) + else: total = int(r.get("ts:"+model+":"+bucket) or 0) + counts.append(total) + data["models"][model] = counts + return jsonify(data) + +@app.route("/stream") +def stream(): + def ev(): + q = queue.Queue() + with sse_lock: sse_subscribers.append(q) + try: + yield "data: "+json.dumps(get_metrics())+"\n\n" + while True: + try: yield "data: "+q.get(timeout=3)+"\n\n" + except queue.Empty: yield "data: "+json.dumps(get_metrics())+"\n\n" + except GeneratorExit: pass + finally: + with sse_lock: + if q in sse_subscribers: sse_subscribers.remove(q) + return Response(stream_with_context(ev()), mimetype="text/event-stream", + headers={"Cache-Control":"no-cache","X-Accel-Buffering":"no","Access-Control-Allow-Origin":"*"}) + +if __name__ == "__main__": + log.info("Router on :9000 (load-aware)") + app.run(host="0.0.0.0", port=9000, debug=False) diff --git a/router/ts_patch.py b/router/ts_patch.py new file mode 100644 index 0000000..9554bd8 --- /dev/null +++ b/router/ts_patch.py @@ -0,0 +1,80 @@ +# Add time-series tracking and endpoint to router +with open('/opt/inference-harness/router/router.py') as f: + code = f.read() + +# Add time-series tracking in the chat handler (after Redis incr) +old_track = '''r.incr('routes:'+model); r.incr('routes:tier:'+tier); r.incr('routes:agent:'+agent) + r.lpush('routes:recent', json.dumps''' +new_track = '''r.incr('routes:'+model); r.incr('routes:tier:'+tier); r.incr('routes:agent:'+agent) + # Time-series: hourly bucket + hour_key = 'ts:'+model+':'+time.strftime('%Y%m%d%H') + r.incr(hour_key) + r.expire(hour_key, 86400*31) # keep 31 days + r.lpush('routes:recent', json.dumps''' + +code = code.replace(old_track, new_track) + +# Add /metrics/timeseries endpoint before if __name__ +ts_endpoint = ''' +@app.route('/metrics/timeseries') +def metrics_timeseries(): + period = request.args.get('period', 'day') + models = list(GPU_URLS.keys()) + data = {'models': {}, 'labels': []} + + if period == 'day': + # Last 24 hours, hourly buckets + buckets = [] + for h in range(23, -1, -1): + t = time.time() - h * 3600 + buckets.append(time.strftime('%Y%m%d%H', time.gmtime(t))) + data['labels'] = [time.strftime('%H:00', time.gmtime(time.time() - h*3600)) for h in range(23, -1, -1)] + elif period == 'week': + # Last 7 days, daily buckets + buckets = [] + for d in range(6, -1, -1): + t = time.time() - d * 86400 + buckets.append(time.strftime('%Y%m%d', time.gmtime(t))) + data['labels'] = [time.strftime('%a', time.gmtime(time.time() - d*86400)) for d in range(6, -1, -1)] + else: + # Month — last 30 days, 3-day buckets + buckets = [] + for d in range(29, -1, -3): + t = time.time() - d * 86400 + buckets.append(time.strftime('%Y%m%d', time.gmtime(t))) + data['labels'] = [time.strftime('%m/%d', time.gmtime(time.time() - d*86400)) for d in range(29, -1, -3)] + + if r: + for model in models: + counts = [] + for bucket in buckets: + if period == 'month': + # Sum 3 consecutive days per bucket + total = 0 + base = time.strptime(bucket, '%Y%m%d') + for offset in range(3): + d = time.strftime('%Y%m%d', time.gmtime(time.mktime(base) + offset*86400)) + total += int(r.get('ts:'+model+':'+d) or 0) + # Also check hourly keys for today + for hh in range(24): + total += int(r.get('ts:'+model+':'+d+'{:02d}'.format(hh)) or 0) + counts.append(total) + else: + key = 'ts:'+model+':'+bucket + if period == 'week': + # Sum all hours in the day + total = sum(int(r.get(key+'{:02d}'.format(h)) or 0) for h in range(24)) + else: + total = int(r.get(key) or 0) + counts.append(total) + data['models'][model] = counts + + return jsonify(data) +''' + +# Insert before if __name__ +code = code.replace(if __name__ == __main__:, ts_endpoint + nif __name__ == __main__:) + +with open('/opt/inference-harness/router/router.py', 'w') as f: + f.write(code) +print('Time-series tracking and endpoint added')