# Syslog Harness Smart Queue Consumer Design

**Date:** 2026-05-17
**Target:** Abiba (deployment) / Kwame (review)
**Based on:** Architecture Review (ARCHITECTURE_REVIEW.md, commit `e95475f`)

---

## 1. Executive Summary

The current queue-service is a **dead storage pit**: it stores requests in a Redis list but has no consumer to process them. This design document proposes a complete rewrite that turns the queue into a **smart, load-balanced inference pipeline** with:

- **Redis Streams** as the queue backend (consumer groups, acks, pending messages)
- **GPU-aware load balancing** using real-time health and utilization data
- **Priority queuing** (high/normal/low)
- **Adaptive backpressure** with graduated responses
- **Dead letter queue** for failed jobs
- **Job lifecycle tracking** with a status API

**Quick wins** (low-risk, high-impact fixes to the existing code) are listed in Section 5; the matching fixes are also tagged inline (`FIX ...`) in the rewrite.

---

## 2. Verified Infrastructure Facts

These were validated via live endpoint checks on 2026-05-17:

| Component | Endpoint | Verified status | Notes |
|---|---|---|---|
| Redis | 127.0.0.1:6379 | Running (v8.0.2) | Supports Redis Streams natively |
| amdpve (GPU) | 192.168.68.15:8080 | `/health`=ok, `/v1/models`, `/slots` | 2 slots; no `/slots_busy` endpoint; `/metrics` needs the `--metrics` flag |
| llmgpu (GPU) | 192.168.68.8:8080 | `/health`=ok, `/v1/models` | `/slots` requires an API key (401); model=qwen3.6-27B-code |
| ocu_llm (GPU) | 192.168.68.110:8080 | `/health`=ok, `/v1/models` | `/slots` requires an API key (401); model=gemma-4-E4B |
| amdpve sidecar | 192.168.68.15:8090 | gpu_util=81%, temp=75C, vram=28/65GB | |
| llmgpu sidecar | 192.168.68.8:8090 | gpu_util=0%, temp=36C, vram=20/24GB | |
| ocu_llm sidecar | 192.168.68.110:8090 | gpu_util=0%, temp=39C, vram=7/12GB | |

### Critical Finding: `/slots_busy` endpoint is NOT usable on any GPU

The architecture review (R6) referenced a `/slots_busy` endpoint for load-based routing. **This endpoint returns 404 on amdpve** and 401 on llmgpu/ocu_llm.
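Because neither a busy-slot counter nor an unauthenticated `/slots` is available on every host, per-GPU load has to be estimated from whichever signal each host exposes (the available endpoints are listed next). A minimal sketch, assuming the `/slots` payload is a JSON array of slot objects with an `is_processing` boolean (as observed on amdpve) and that the sidecar reports `gpu_util_pct` in percent; `estimate_load` is a hypothetical helper, not part of the existing code:

```python
from typing import Optional


def estimate_load(slots: Optional[list], gpu_util_pct: Optional[float]) -> Optional[float]:
    """Return a 0.0-1.0 load estimate for one GPU, or None if nothing is known.

    Hypothetical helper. Assumed preference order:
      1. llama.cpp /slots array: fraction of slots with is_processing == True
         (only reachable without auth on amdpve)
      2. sidecar gpu_util_pct, scaled to 0-1 and clamped
    """
    if isinstance(slots, list) and slots:
        busy = sum(1 for slot in slots if slot.get("is_processing"))
        return busy / len(slots)
    if gpu_util_pct is not None:
        return max(0.0, min(1.0, gpu_util_pct / 100.0))
    return None  # no /slots and no sidecar data: load unknown


# amdpve-style host: /slots reachable, one of two slots processing
print(estimate_load([{"is_processing": True}, {"is_processing": False}], 81))
# llmgpu-style host: /slots returned 401, so only the sidecar reading is used
print(estimate_load(None, 0))
```

If one of amdpve's two slots were processing, it would score 0.5 even at 81% utilization, while llmgpu at 0% utilization scores 0.0. Letting slot occupancy override utilization is an assumption of this sketch; the Section 4.2 rewrite instead sorts on `gpu_util_pct` first and busy slots second.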
The load metrics actually available are:

- **`/slots`** (amdpve only, no auth): returns a slot array with an `is_processing` boolean per slot
- **`/health`** (all GPUs): returns `{"status": "ok"}` only; no slot count or load info
- **`/v1/models`** (all GPUs): returns model info but no load metrics
- **Sidecar `:8090/`** (all GPUs): returns GPU hardware metrics (util %, temp, VRAM) but NOT inference slot state

**Implication:** Load-based routing must use the sidecar `gpu_util_pct` as a proxy for inference load, combined with the `/slots` endpoint on amdpve (which supports `is_processing` checks). For llmgpu/ocu_llm, only sidecar utilization is available.

---

## 3. Architecture

### 3.1 Data Flow

```
Agent
  |
  v
Nginx
  |
  v
Smart Queue API
  |
  v
Redis Streams (consumer group)
  |
  v
Consumer Pool (load-balanced)
  |
  +--> GPU 1 (81% util) [busy]
  +--> GPU 2 (0% util)  [idle]
  +--> GPU 3 (0% util)  [idle]
```

### 3.2 Redis Data Model

```
inference:stream            Reserved main stream key (STREAM_KEY); the code below only uses the three priority streams
inference:stream:high       High-priority stream
inference:stream:normal     Normal-priority stream (default)
inference:stream:low        Low-priority stream
inference:dead-letter       Failed jobs (XADD, no consumer)
job:status                  Hash: field = job_id, value = JSON {"status": "queued|processing|completed|failed", "gpu": "...", "created_at": ..., "completed_at": ..., "error": "..."}
job:result                  Hash: field = job_id, value = JSON {"status": "completed", "response": {...}, "completed_at": ...}
config:gpus                 Hash: field = GPU name, value = JSON {"host": "192.168.68.15", "port": 8080, "sidecar_port": 8090, "model": "..."}
config:gpu-health:{name}    Hash: {"status": "up|down", "gpu_util_pct": 81, "temp_c": 75, "vram_used": 28230, "vram_total": 65536, "slots_busy": ..., "slots_total": ..., "inference_state": "idle|busy|unknown", "last_seen": ...}
```

---

## 4. Implementation: `queue-service.py` (Complete Rewrite)

### 4.1 Key Changes from Current Code

| Current (121 lines) | New (~450 lines) |
|---|---|
| `LPUSH/RPUSH` FIFO list | `XADD/XREADGROUP` Redis Streams |
| No job IDs | UUID job IDs with lifecycle tracking |
| Hardcoded GPU IPs and config key | Single source of truth in Redis `config:gpus` |
| Simple depth-threshold circuit breaker | Per-GPU circuit breakers with half-open recovery |
| No consumer | Embedded consumer loop (background thread) |
| Headers filtered to `X-*` only | All headers preserved |
| No result retrieval | `/status/<job_id>` and `/result/<job_id>` endpoints |
| Redis host default 192.168.68.7 | Default 127.0.0.1 (matches actual deployment) |
| No health check concurrency | Parallel GPU health checks (one thread per GPU) |
| No graceful shutdown | SIGTERM handler, consumer drain |

### 4.2 Full Source Code

```python
#!/usr/bin/env python3
"""Syslog Harness Smart Queue Service: Redis Streams + GPU load balancing.

Port: 8091

Endpoints:
    /health                     liveness probe
    /enqueue                    POST inference request (with priority)
    /status/<job_id>            GET job status
    /result/<job_id>            GET job result (when completed)
    /status                     GET queue depth, circuit breaker state, GPU health
    /dlq                        GET dead letter queue
    /dlq/retry/<message_id>     POST retry a dead-letter job
    /dlq/discard/<message_id>   POST discard a dead-letter job
"""

import json
import os
import sys
import time
import uuid
import signal
import threading
import urllib.request
import urllib.error
from collections import defaultdict

from flask import Flask, request, jsonify

app = Flask(__name__)

# Configuration
REDIS_HOST = os.getenv("REDIS_HOST", "127.0.0.1")  # FIX Q3: match actual deployment
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
STREAM_KEY = "inference:stream"
STREAM_KEY_HIGH = "inference:stream:high"
STREAM_KEY_NORMAL = "inference:stream:normal"
STREAM_KEY_LOW = "inference:stream:low"
DEAD_LETTER_KEY = "inference:dead-letter"
CONSUMER_GROUP = "gpu-workers"
CONSUMER_NAME = os.getenv("CONSUMER_NAME", "worker-1")  # must be unique per consumer replica
JOB_STATUS_KEY = "job:status"
JOB_RESULT_KEY = "job:result"
CONFIG_GPUS_KEY = "config:gpus"
CONFIG_GPU_HEALTH_PREFIX = "config:gpu-health:"
MAX_STREAM_ENTRIES = 50000

# Adaptive backpressure thresholds
BP_WARN = 30       # queue depth 30+: start warning
BP_SOFT_OPEN = 40  # queue depth 40+: 503 with retry-after
BP_HARD_OPEN = 50  # queue depth 50+: circuit breaker open

# Per-GPU circuit breaker
PER_GPU_CB_WINDOW = 5     # consecutive failures to open
PER_GPU_CB_COOLDOWN = 30  # seconds before a half-open probe
PER_GPU_CB_MAX_AGE = 300  # forget about a GPU after 5 min of silence (not used yet)

# Health check timeout
GPU_HEALTH_TIMEOUT = 3  # seconds per GPU

# Consumer settings
CONSUMER_POLL_INTERVAL = 1.0       # seconds to block on each stream read
CONSUMER_WORK_TIMEOUT = 300        # seconds to wait for a GPU response
CONSUMER_RETRY_MAX = 3             # max attempts per job before DLQ
CONSUMER_RETRY_DELAY = [2, 5, 10]  # backoff delay (seconds) per retry

# GPU Configuration
# Single source of truth: loaded from Redis if available, otherwise these defaults
DEFAULT_GPUS = {
    "amdpve": {"host": "192.168.68.15", "port": 8080, "sidecar_port": 8090, "model": "qwen3.6-35B-A3B"},
    "llmgpu": {"host": "192.168.68.8", "port": 8080, "sidecar_port": 8090, "model": "qwen3.6-27B-code"},
    "ocu_llm": {"host": "192.168.68.110", "port": 8080, "sidecar_port": 8090, "model": "gemma-4-E4B"},
}

# State
shutdown_event = threading.Event()

# Per-GPU circuit breaker state
gpu_circuit_breakers = defaultdict(lambda: {
    "consecutive_failures": 0,
    "state": "closed",  # closed, open, half-open
    "last_failure": 0,
    "last_probe": 0,
})

# Retry tracking: job_id -> attempt count (currently unused; attempts travel inside the job)
retry_counts = {}


# Redis Helpers

def get_redis():
    """Return a Redis client, or None if the client cannot be created.

    redis.Redis() connects lazily, so an unreachable server only surfaces
    on the first command; callers must still handle connection errors.
    """
    try:
        import redis
        return redis.Redis(
            host=REDIS_HOST, port=REDIS_PORT, decode_responses=True,
            socket_connect_timeout=3, socket_timeout=3,
        )
    except Exception:
        # Log but don't crash; callers handle a missing client
        print(f"[queue] ERROR: Cannot create Redis client for {REDIS_HOST}:{REDIS_PORT}", file=sys.stderr)
        return None


def safe_redis_call(fn, *args, default=None):
    """Execute a Redis call, return default on failure."""
    r = get_redis()
    if r is None:
        return default
    try:
        return fn(r, *args)
    except Exception:
        return default


# GPU Health

def fetch_json(url, timeout=3):
    """Fetch JSON from URL, return None on any error (including 401/404)."""
    try:
        req = urllib.request.Request(url)
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except Exception:
        return None


def check_gpu_health(gpu_name, gpu_config):
    """Check a single GPU's health via the sidecar and llama.cpp endpoints.

    Returns a dict with health status. The three checks for one GPU run
    sequentially; check_all_gpus_parallel() runs one call per GPU in its own thread.
    """
    host = gpu_config["host"]
    port = gpu_config["port"]
    sidecar_port = gpu_config.get("sidecar_port", 8090)

    result = {
        "name": gpu_name,
        "status": "down",
        "sidecar": None,
        "llamacpp": None,
        "gpu_util_pct": None,
        "temp_c": None,
        "vram_used_mb": None,
        "vram_total_mb": None,
        "inference_state": "unknown",
        "slots_busy": 0,
        "slots_total": 0,
        "last_seen": time.time(),
    }

    # Check sidecar (GPU hardware metrics)
    sidecar_url = f"http://{host}:{sidecar_port}/"
    sidecar_data = fetch_json(sidecar_url, timeout=2)
    if sidecar_data:
        result["sidecar"] = sidecar_data
        result["gpu_util_pct"] = sidecar_data.get("gpu_util_pct")
        result["temp_c"] = sidecar_data.get("temp_c")
        result["vram_used_mb"] = sidecar_data.get("vram_used_mb")
        result["vram_total_mb"] = sidecar_data.get("vram_total_mb")

    # Check llama.cpp /health
    health_url = f"http://{host}:{port}/health"
    health_data = fetch_json(health_url, timeout=2)
    if health_data and health_data.get("status") == "ok":
        result["llamacpp"] = health_data
        result["status"] = "up"

    # Check /slots for inference state (only works on amdpve currently;
    # llmgpu/ocu_llm answer 401, so fetch_json() returns None there)
    slots_url = f"http://{host}:{port}/slots"
    slots_data = fetch_json(slots_url, timeout=2)
    if isinstance(slots_data, list):
        result["slots_total"] = len(slots_data)
        result["slots_busy"] = sum(1 for s in slots_data if s.get("is_processing"))
        result["inference_state"] = "busy" if result["slots_busy"] > 0 else "idle"

    def _str_or(value, fallback):
        # Use "is None", not "or": a real reading of 0 (e.g. 0% util) must be kept
        return str(fallback if value is None else value)

    # Store in Redis for other consumers/monitoring
    r = get_redis()
    if r:
        try:
            r.hset(CONFIG_GPU_HEALTH_PREFIX + gpu_name, mapping={
                "status": result["status"],
                "gpu_util_pct": _str_or(result["gpu_util_pct"], -1),
                "temp_c": _str_or(result["temp_c"], -1),
                "vram_used": _str_or(result["vram_used_mb"], 0),
                "vram_total": _str_or(result["vram_total_mb"], 0),
                "slots_busy": str(result["slots_busy"]),
                "slots_total": str(result["slots_total"]),
                "inference_state": result["inference_state"],
                "last_seen": str(time.time()),
            })
        except Exception:
            pass

    return result


def check_all_gpus_parallel():
    """Check all GPUs in parallel using threads. FIX Q6: avoids sequential blocking.

    GPUs whose checks do not finish within the join timeout are omitted from the result.
    """
    gpus = get_gpus()
    results = {}
    threads = []

    def check_one(name, config):
        results[name] = check_gpu_health(name, config)

    for name, config in gpus.items():
        t = threading.Thread(target=check_one, args=(name, config), daemon=True)
        threads.append(t)
        t.start()

    for t in threads:
        t.join(timeout=GPU_HEALTH_TIMEOUT + 1)

    return dict(results)


# GPU Configuration Management

def get_gpus():
    """Get GPU configuration. Try Redis first (single source of truth), fall back to defaults."""
    r = get_redis()
    if r:
        try:
            stored = r.hgetall(CONFIG_GPUS_KEY)
            if stored:
                return {name: json.loads(config_json) for name, config_json in stored.items()}
        except Exception:
            pass
    return DEFAULT_GPUS


def set_gpus(gpu_dict):
    """Set GPU configuration in Redis (single source of truth)."""
    r = get_redis()
    if not r:
        return
    for name, config in gpu_dict.items():
        r.hset(CONFIG_GPUS_KEY, name, json.dumps(config))


# Circuit Breaker

def record_gpu_success(gpu_name):
    """Record a successful GPU request; resets the circuit breaker."""
    cb = gpu_circuit_breakers[gpu_name]
    cb["consecutive_failures"] = 0
    cb["state"] = "closed"


def record_gpu_failure(gpu_name):
    """Record a failed GPU request; implements a per-GPU circuit breaker with half-open state.
    FIX Q5."""
    cb = gpu_circuit_breakers[gpu_name]
    cb["consecutive_failures"] += 1
    cb["last_failure"] = time.time()

    if cb["state"] == "half-open":
        # The probe request failed: re-open and restart the cooldown
        cb["state"] = "open"
    elif cb["consecutive_failures"] >= PER_GPU_CB_WINDOW:
        cb["state"] = "open"


def is_gpu_available(gpu_name):
    """Check if a GPU is available (circuit breaker allows requests)."""
    cb = gpu_circuit_breakers[gpu_name]
    now = time.time()
    if cb["state"] == "open":
        # An open breaker receives no traffic, so no new failure will ever be
        # recorded; the open -> half-open transition has to happen here.
        if now - cb["last_failure"] < PER_GPU_CB_COOLDOWN:
            return False
        cb["state"] = "half-open"
        cb["last_probe"] = 0
    if cb["state"] == "half-open":
        # Allow at most one probe request every 5 seconds
        if now - cb["last_probe"] >= 5:
            cb["last_probe"] = now
            return True
        return False
    return True  # closed


# Adaptive Backpressure

def get_total_queue_depth():
    """Get total queue depth across all priority streams.

    XLEN also counts entries that were already acknowledged, so this is only
    accurate if the consumer deletes stream entries after acking them.
    """
    r = get_redis()
    if not r:
        return -1
    try:
        return sum(r.xlen(key) for key in (STREAM_KEY_HIGH, STREAM_KEY_NORMAL, STREAM_KEY_LOW))
    except Exception:
        return -1


def get_backpressure_status():
    """Determine backpressure response based on queue depth."""
    depth = get_total_queue_depth()
    if depth < 0:
        return "error"
    elif depth >= BP_HARD_OPEN:
        return "open"       # 503, circuit breaker
    elif depth >= BP_SOFT_OPEN:
        return "soft_open"  # 503 with retry-after
    elif depth >= BP_WARN:
        return "warn"       # 202, warning logged
    else:
        return "closed"     # normal


# Job Enqueue

@app.route("/health")
def health():
    """Nginx upstream health probe."""
    return jsonify({"status": "ok", "service": "smart-queue"}), 200


@app.route("/enqueue", methods=["POST"])
def enqueue():
    """Enqueue an inference request with priority and job tracking.

    Expected JSON body:
        {
            "messages": [...],          # OpenAI-style messages
            "model": "qwen3.6-35B-A3B",
            "stream": true,
            "temperature": 0.8,
            "priority": "normal"        # "high", "normal" (default), "low"
        }

    Returns: {"job_id": "...", "status": "queued", "priority": "...", "position": N}
    """
    r = get_redis()
    if r is None:
        return jsonify({"error": "Redis unavailable"}), 503

    # Parse request
    try:
        data = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "Invalid JSON"}), 400
    if not isinstance(data, dict) or not data:
        return jsonify({"error": "Request body must be a non-empty JSON object"}), 400

    # Extract priority (default: normal)
    priority = data.get("priority", "normal")
    if priority not in ("high", "normal", "low"):
        priority = "normal"

    # Check backpressure
    bp_status = get_backpressure_status()
    if bp_status == "open":
        return jsonify({
            "error": "Circuit breaker OPEN",
            "queue_depth": get_total_queue_depth(),
            "retry_after": 30,
        }), 503
    if bp_status == "soft_open":
        return jsonify({
            "error": "Queue near capacity",
            "queue_depth": get_total_queue_depth(),
            "retry_after": 10,
        }), 503
    if bp_status == "warn":
        print(f"[queue] WARN: Queue depth {get_total_queue_depth()} approaching limit", file=sys.stderr)

    # Generate job ID
    job_id = str(uuid.uuid4())

    # Build job payload
    job = {
        "id": job_id,
        "payload": data,
        "priority": priority,
        "status": "queued",
        "created_at": time.time(),
        "attempts": 0,
        "last_error": None,
        "headers": dict(request.headers),  # FIX Q8: preserve ALL headers
        "target_model": data.get("model", ""),
    }

    # Store job status
    try:
        r.hset(JOB_STATUS_KEY, job_id, json.dumps({
            "status": "queued",
            "priority": priority,
            "created_at": job["created_at"],
        }))
    except Exception:
        pass

    # Add to the stream for this priority
    stream_key = {
        "high": STREAM_KEY_HIGH,
        "normal": STREAM_KEY_NORMAL,
        "low": STREAM_KEY_LOW,
    }.get(priority, STREAM_KEY_NORMAL)

    try:
        r.xadd(
            stream_key,
            {"job": json.dumps(job)},
            maxlen=MAX_STREAM_ENTRIES,
            approximate=True,  # redis-py keyword ("approx" is not accepted): MAXLEN ~
        )
        position = r.xlen(stream_key)  # approximate: entries currently in this stream
    except Exception as e:
        return jsonify({"error": f"Failed to enqueue: {e}"}), 503

    return jsonify({
        "job_id": job_id,
        "status": "queued",
        "priority": priority,
        "position": position,
    }), 202


@app.route("/status/<job_id>")
def job_status(job_id):
    """Get job status."""
    r = get_redis()
    if not r:
        return jsonify({"error": "Service unavailable"}), 503
    try:
        status_json = r.hget(JOB_STATUS_KEY, job_id)
        if not status_json:
            return jsonify({"error": "Job not found"}), 404
        return jsonify(json.loads(status_json)), 200
    except Exception:
        return jsonify({"error": "Failed to retrieve status"}), 500


@app.route("/result/<job_id>")
def job_result(job_id):
    """Get job result (when completed)."""
    r = get_redis()
    if not r:
        return jsonify({"error": "Service unavailable"}), 503
    try:
        result_json = r.hget(JOB_RESULT_KEY, job_id)
        if not result_json:
            return jsonify({"error": "Result not yet available"}), 404
        return jsonify(json.loads(result_json)), 200
    except Exception:
        return jsonify({"error": "Failed to retrieve result"}), 500


@app.route("/status")
def status():
    """GET queue depth + circuit breaker state + GPU health."""
    gpus = check_all_gpus_parallel()  # FIX Q6: parallel health checks

    # Per-GPU circuit breaker states
    cb_states = {}
    for name in gpus:
        cb = gpu_circuit_breakers.get(name, {})
        cb_states[name] = {
            "state": cb.get("state", "closed"),
            "consecutive_failures": cb.get("consecutive_failures", 0),
        }

    return jsonify({
        "queue_depth": get_total_queue_depth(),
        "backpressure": get_backpressure_status(),
        "circuit_breakers": cb_states,
        "gpu_health": {
            name: {
                "status": gpu["status"],
                "gpu_util_pct": gpu["gpu_util_pct"],
                "temp_c": gpu["temp_c"],
                "vram_used_mb": gpu["vram_used_mb"],
                "vram_total_mb": gpu["vram_total_mb"],
                "slots_busy": gpu["slots_busy"],
                "slots_total": gpu["slots_total"],
                "inference_state": gpu["inference_state"],
            }
            for name, gpu in gpus.items()
        },
        "thresholds": {
            "warn": BP_WARN,
            "soft_open": BP_SOFT_OPEN,
            "hard_open": BP_HARD_OPEN,
        },
    })


# Dead Letter Queue

@app.route("/dlq")
def list_dlq():
    """List dead-letter queue entries (newest 50)."""
    r = get_redis()
    if not r:
        return jsonify({"error": "Service unavailable"}), 503
    try:
        # decode_responses=True, so message IDs and field names are already str
        entries = r.xrevrange(DEAD_LETTER_KEY, count=50)
        result = [
            {"message_id": message_id, "job": json.loads(fields.get("job", "{}"))}
            for message_id, fields in entries
        ]
        return jsonify({"count": len(result), "entries": result}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/dlq/retry/<message_id>", methods=["POST"])
def retry_dlq(message_id):
    """Retry a dead-letter job: re-enqueue it and remove it from the DLQ."""
    r = get_redis()
    if not r:
        return jsonify({"error": "Service unavailable"}), 503
    try:
        # redis-py has no single-entry stream getter; XRANGE with min == max fetches one entry
        entries = r.xrange(DEAD_LETTER_KEY, min=message_id, max=message_id)
        if not entries:
            return jsonify({"error": "Message not found"}), 404
        _, fields = entries[0]
        job = json.loads(fields.get("job", "{}"))
        job["attempts"] = 0
        job["status"] = "queued"
        priority = job.get("priority", "normal")
        stream_key = {
            "high": STREAM_KEY_HIGH,
            "normal": STREAM_KEY_NORMAL,
            "low": STREAM_KEY_LOW,
        }.get(priority, STREAM_KEY_NORMAL)
        r.xadd(stream_key, {"job": json.dumps(job)})
        r.xdel(DEAD_LETTER_KEY, message_id)  # avoid re-enqueuing the same entry twice
        return jsonify({"status": "re-enqueued"}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/dlq/discard/<message_id>", methods=["POST"])
def discard_dlq(message_id):
    """Discard a dead-letter job."""
    r = get_redis()
    if not r:
        return jsonify({"error": "Service unavailable"}), 503
    try:
        r.xdel(DEAD_LETTER_KEY, message_id)
        return jsonify({"status": "discarded"}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# Consumer Loop

def select_gpu_for_job(job, gpu_health_map):
    """Select the best GPU for a job based on load, health, and model compatibility.

    Selection criteria (in order):
    1. GPU must be up and its circuit breaker must allow requests
    2. Prefer GPUs whose configured model matches the requested model
       (mismatched GPUs remain eligible as a fallback)
    3. Prefer GPU with the lowest gpu_util_pct (unknown utilization sorts last)
    4. Prefer GPU with the fewest busy slots
    """
    candidates = []
    target_model = job.get("payload", {}).get("model", "")
    gpus = get_gpus()

    for name, health in gpu_health_map.items():
        if health["status"] != "up":
            continue
        if not is_gpu_available(name):
            continue

        # Model compatibility check (substring match in either direction)
        gpu_model = gpus.get(name, {}).get("model", "")
        if not target_model:
            match_rank = 0
        elif gpu_model and (target_model in gpu_model or gpu_model in target_model):
            match_rank = 0  # configured model matches
        else:
            match_rank = 1  # mismatch: allowed, but ranked after matches
        candidates.append((name, health, match_rank))

    if not candidates:
        return None

    def sort_key(candidate):
        _, health, match_rank = candidate
        util = health.get("gpu_util_pct")
        # gpu_util_pct is None when the sidecar is unreachable; sort those last
        return (match_rank, util if util is not None else 999, health.get("slots_busy", 99))

    candidates.sort(key=sort_key)
    return candidates[0][0]  # GPU name


def consume_job(gpu_name, job_data):
    """Send a job to a GPU and wait for the result.

    The request is always sent with stream=false: the complete response is
    stored in Redis (job:result) and the client polls /result/<job_id>.
    Returns True on success, False on failure.
    """
    config = get_gpus().get(gpu_name, {})
    host = config["host"]
    port = config["port"]

    # Update job status to processing
    job_id = job_data.get("id", "unknown")
    r = get_redis()
    if r:
        try:
            r.hset(JOB_STATUS_KEY, job_id, json.dumps({
                "status": "processing",
                "gpu": gpu_name,
                "attempt": job_data.get("attempts", 0),
            }))
        except Exception:
            pass

    # Forward request to GPU. Force stream=false (an SSE stream would fail to
    # parse as one JSON document) and drop the queue-only "priority" field.
    payload = dict(job_data["payload"])
    payload["stream"] = False
    payload.pop("priority", None)

    try:
        req = urllib.request.Request(
            f"http://{host}:{port}/v1/chat/completions",
            data=json.dumps(payload).encode(),
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=CONSUMER_WORK_TIMEOUT) as resp:
            response_data = json.loads(resp.read().decode())

        # Store result
        if r:
            now = time.time()
            r.hset(JOB_RESULT_KEY, job_id, json.dumps({
                "status": "completed",
                "response": response_data,
                "completed_at": now,
            }))
            r.hset(JOB_STATUS_KEY, job_id, json.dumps({
                "status": "completed",
                "gpu": gpu_name,
                "completed_at": now,
            }))

        record_gpu_success(gpu_name)
        return True

    except Exception as e:
        record_gpu_failure(gpu_name)
        # Store failure
        if r:
            try:
                r.hset(JOB_STATUS_KEY, job_id, json.dumps({
                    "status": "failed",
                    "gpu": gpu_name,
                    "error": str(e)[:500],
                    "attempt": job_data.get("attempts", 0),
                }))
            except Exception:
                pass
        return False


def consumer_loop():
    """Main consumer loop: reads from streams and dispatches to GPUs.

    Runs as a background thread. Checks streams in priority order: high, normal, low.
    """
    print("[consumer] Started", file=sys.stderr)

    while not shutdown_event.is_set():
        try:
            r = get_redis()
            if r is None:
                time.sleep(2)
                continue

            # Check each stream in priority order
            for stream_key, priority_name in [
                (STREAM_KEY_HIGH, "high"),
                (STREAM_KEY_NORMAL, "normal"),
                (STREAM_KEY_LOW, "low"),
            ]:
                if shutdown_event.is_set():
                    break

                # Read one new message; blocks up to CONSUMER_POLL_INTERVAL if the stream is empty
                messages = r.xreadgroup(
                    CONSUMER_GROUP, CONSUMER_NAME,
                    {stream_key: ">"},
                    count=1,
                    block=int(CONSUMER_POLL_INTERVAL * 1000),
                )
                if not messages:
                    continue

                for _stream_name, msg_list in messages:
                    for message_id, fields in msg_list:
                        try:
                            job_data = json.loads(fields["job"])

                            # Get latest GPU health
                            gpu_health = check_all_gpus_parallel()

                            # Select best GPU
                            gpu_name = select_gpu_for_job(job_data, gpu_health)

                            if gpu_name is None:
                                # No GPU available. A message read with ">" and never acked is
                                # not redelivered by later ">" reads, so re-add it and ack the original.
                                print(f"[consumer] No GPU available for job {job_data.get('id', '?')} (priority={priority_name})", file=sys.stderr)
                                shutdown_event.wait(5)  # avoid a hot loop while all GPUs are down
                                r.xadd(stream_key, {"job": json.dumps(job_data)})
                                r.xack(stream_key, CONSUMER_GROUP, message_id)
                                r.xdel(stream_key, message_id)
                                continue

                            # Dispatch to GPU
                            success = consume_job(gpu_name, job_data)

                            if success:
                                # Ack the message, then delete it so XLEN-based depth stays accurate
                                r.xack(stream_key, CONSUMER_GROUP, message_id)
                                r.xdel(stream_key, message_id)
                            else:
                                # Job failed: retry or move to DLQ
                                attempts = job_data.get("attempts", 0) + 1
                                job_data["attempts"] = attempts

                                if attempts >= CONSUMER_RETRY_MAX:
                                    # Move to dead letter queue
                                    print(f"[consumer] Job {job_data.get('id', '?')} moved to DLQ after {attempts} attempts", file=sys.stderr)
                                    r.xadd(DEAD_LETTER_KEY, {"job": json.dumps(job_data)})
                                    r.xack(stream_key, CONSUMER_GROUP, message_id)
                                    r.xdel(stream_key, message_id)
                                else:
                                    # Retry: re-add the job (with the updated attempt count) to the
                                    # same stream, then ack the original entry below
                                    delay = CONSUMER_RETRY_DELAY[min(attempts - 1, len(CONSUMER_RETRY_DELAY) - 1)]
                                    print(f"[consumer] Retrying job {job_data.get('id', '?')} in {delay}s (attempt {attempts}/{CONSUMER_RETRY_MAX})", file=sys.stderr)
                                    shutdown_event.wait(delay)  # interruptible sleep
                                    # Re-add to stream
                                    r.xadd(stream_key, {"job": json.dumps(job_data)})
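                                    # Ack the original so it is not reprocessed, and delete it
                                    # so it no longer counts toward XLEN-based queue depth
                                    r.xack(stream_key, CONSUMER_GROUP, message_id)
                                    r.xdel(stream_key, message_id)

                        except Exception as e:
                            print(f"[consumer] Error processing job: {e}", file=sys.stderr)
                            continue

        except Exception as e:
            print(f"[consumer] Loop error: {e}", file=sys.stderr)
            time.sleep(2)

    print("[consumer] Stopped", file=sys.stderr)


# GPU Health Monitor

def gpu_health_monitor_loop():
    """Periodically refresh GPU health data in Redis.

    Runs as a background thread every 15 seconds.
    """
    print("[health-monitor] Started", file=sys.stderr)
    while not shutdown_event.is_set():
        try:
            # check_gpu_health() already writes each GPU's health hash to Redis
            check_all_gpus_parallel()
        except Exception as e:
            print(f"[health-monitor] Error: {e}", file=sys.stderr)
        shutdown_event.wait(15)  # sleep 15 s, interruptible
    print("[health-monitor] Stopped", file=sys.stderr)


# Graceful Shutdown

def signal_handler(signum, frame):
    """Handle SIGTERM/SIGINT for graceful shutdown."""
    print(f"\n[queue] Received signal {signum}, shutting down...", file=sys.stderr)
    shutdown_event.set()
    # Raising in the main thread stops app.run(); the finally block in
    # __main__ then gives the consumer a short window to finish.
    raise SystemExit(0)


signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)


# Main

if __name__ == "__main__":
    r = get_redis()
    if r:
        try:
            # Seed the GPU config only if it is missing, so edits made in Redis survive restarts
            if not r.exists(CONFIG_GPUS_KEY):
                set_gpus(DEFAULT_GPUS)
            # XREADGROUP fails with NOGROUP unless the group exists; mkstream creates the stream too
            for key in (STREAM_KEY_HIGH, STREAM_KEY_NORMAL, STREAM_KEY_LOW):
                try:
                    r.xgroup_create(key, CONSUMER_GROUP, id="0", mkstream=True)
                except Exception as e:
                    if "BUSYGROUP" not in str(e):  # BUSYGROUP = group already exists
                        raise
        except Exception as e:
            print(f"[queue] WARN: Redis initialisation failed: {e}", file=sys.stderr)

    # Start background threads
    consumer_thread = threading.Thread(target=consumer_loop, daemon=True)
    consumer_thread.start()

    health_thread = threading.Thread(target=gpu_health_monitor_loop, daemon=True)
    health_thread.start()

    print("[queue] Starting on :8091", file=sys.stderr)
    try:
        app.run(host="0.0.0.0", port=8091, threaded=True)  # FIX D2: threaded server
    finally:
        # Consumer drain: short grace period, below Docker's default 10 s stop timeout
        shutdown_event.set()
        consumer_thread.join(timeout=8)
```

---

## 5. Quick Wins (No Consumer Needed: Fix Existing Code)

These fixes can be applied to the current code and configuration without the rewrite:

### QW-1: Fix Redis default host (Q3)

**File:** `queue-service/queue-service.py:21`

```python
# Before:
REDIS_HOST = os.getenv("REDIS_HOST", "192.168.68.7")
# After:
REDIS_HOST = os.getenv("REDIS_HOST", "127.0.0.1")  # matches actual deployment
```

### QW-2: Fix Dockerfile path mismatch (C2)

**File:** `docker-compose.yml:15`

```yaml
# Before:
dockerfile: Dockerfile.queue
# After:
dockerfile: queue-service/Dockerfile
```

### QW-3: Fix Nginx fallback to ALL-down only (N3)

**File:** `gpu-router-docker.conf`

Replace the error-page fallback with a Lua-based check that only triggers when ALL GPUs are down:

```nginx
# Remove:
# error_page 502 503 504 = @queue_fallback;

# Add a Lua health check (requires lua-nginx-module plus lua-resty-redis, e.g. OpenResty).
# Resolving the "redis" hostname from Lua needs a resolver in the server block;
# 127.0.0.11 is Docker's embedded DNS:
resolver 127.0.0.11 valid=30s;

location / {
    # ... existing config ...

    # Only fall back to the queue if ALL GPUs report "down".
    # Use rewrite_by_lua_block, not access_by_lua_block: an nginx `if` is evaluated
    # in the rewrite phase, before any access-phase handler could set a flag.
    set $target $gpu_upstream;
    rewrite_by_lua_block {
        local redis = require "resty.redis"
        local red = redis:new()
        red:set_timeout(1000)
        local ok, err = red:connect("redis", 6379)
        if not ok then
            -- Redis unreachable: keep normal GPU routing instead of queueing everything
            ngx.log(ngx.WARN, "GPU health lookup failed: ", err)
            return
        end
        local all_down = true
        for _, g in ipairs({"amdpve", "llmgpu", "ocu_llm"}) do
            local status = red:hget("config:gpu-health:" .. g, "status")
            -- a missing hash (ngx.null) counts as "not known to be down"
            if status ~= "down" then
                all_down = false
                break
            end
        end
        red:set_keepalive(10000, 100)
        if all_down then
            ngx.var.target = "queue_service"
            ngx.req.set_uri("/enqueue", false)
        end
    }
    proxy_pass http://$target;
}
```

**Alternative (no Lua):** Put all three GPUs in one upstream so Nginx itself fails over between them, and keep the `error_page` fallback so the queue is only reached after every server in that upstream has failed. `proxy_next_upstream_tries` only counts servers inside the upstream that `proxy_pass` selected, so the queue can never be one of the "tries":

```nginx
# Dedicated upstream containing every GPU
upstream any_gpu_healthy {
    server 192.168.68.15:8080;
    server 192.168.68.8:8080;
    server 192.168.68.110:8080;
    # Nginx picks servers round-robin and moves to the next one on failure
}

# In location / (this bypasses the X-Syslog-Model map while in use):
proxy_pass http://any_gpu_healthy;
proxy_next_upstream error timeout http_502 http_503 http_504;
proxy_next_upstream_tries 3;               # one attempt per GPU
error_page 502 503 504 = @queue_fallback;  # reached once all three have failed
# Add `proxy_intercept_errors on;` if 5xx responses returned by a GPU should also fall back
```

### QW-4: Fix Nginx `proxy_pass_header` (N4)

**File:** `gpu-router-docker.conf:90`

```nginx
# Remove this line: proxy_pass_header applies to response headers, not request headers.
# Client request headers, including X-Syslog-Model, are forwarded upstream by default.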
# proxy_pass_header X-Syslog-Model;
```

### QW-5: Fix hardcoded container names (N5)

**File:** `gpu-router-docker.conf:27,32`

```nginx
# Before:
server syslog-harness-dashboard-1:3001;
server syslog-harness-gpu-dashboard-1:8092;
# After (use Docker service names):
server dashboard:3001;
server gpu-dashboard:8092;
```

### QW-6: Fix rate limit burst (N1)

**File:** `gpu-router-docker.conf:79`

```nginx
# Before:
limit_req zone=perip burst=20 nodelay;
# After: smaller burst allowance. With `nodelay`, requests within the burst are
# still served immediately; drop `nodelay` if they should be delayed instead.
limit_req zone=perip burst=10 nodelay;
```

### QW-7: Preserve Content-Type header (Q8)

**File:** `queue-service/queue-service.py:83`

```python
# Before:
headers = {k: v for k, v in request.headers if k.startswith("X-")}
# After:
headers = dict(request.headers)  # preserve ALL headers including Content-Type
```

### QW-8: Fix Docker restart policy (C3)

**File:** `docker-compose.yml:6,16,31,43`

```yaml
# Before:
restart: always
# After:
restart: unless-stopped
```

### QW-9: Add Redis health check (C4)

**File:** `docker-compose.yml` (add to the redis service):

```yaml
healthcheck:
  test: ["CMD", "redis-cli", "ping"]
  interval: 10s
  timeout: 5s
  retries: 3
```

### QW-10: Pin dependency versions (I1)

**File:** `queue-service/Dockerfile`

```dockerfile
# Before:
RUN pip install --no-cache-dir flask redis
# After:
RUN pip install --no-cache-dir flask==3.1.0 redis==5.2.1 gunicorn==23.0.0
```

**File:** `Dockerfile.dashboard`

```dockerfile
# Before:
FROM python:3.11-slim
# After:
FROM python:3.13-slim  # match queue-service Python version
```

**File:** `Dockerfile.gpu`

```dockerfile
# Before:
RUN pip install requests
# After:
RUN pip install --no-cache-dir requests==2.32.3  # or remove it entirely if only urllib is needed
```

### QW-11: Add `.dockerignore`

**File:** `.dockerignore`

```
.git
.gitignore
*.md
*.pyc
__pycache__
*.log
.env
```

### QW-12: Fix GPU dashboard multi-process CMD (I3)

**File:** `Dockerfile.gpu:14`

```dockerfile
# Before:
CMD ["sh", "-c", "python3 gpu_collector.py & python3 -m http.server 8092 --directory /app/public & wait"]
# After: use a process manager such as supervisord (it must be installed in the image,
# e.g. `pip install supervisor`):
CMD ["sh", "-c", "exec supervisord -c /app/supervisord.conf"]
```

With `/app/supervisord.conf`:

```ini
[supervisord]
nodaemon=true

[program:collector]
command=python3 gpu_collector.py
directory=/app
autostart=true
autorestart=true

[program:http]
command=python3 -m http.server 8092 --directory /app/public
autostart=true
autorestart=true
```

### QW-13: Centralize GPU config (R9)

**File:** `queue-service/queue-service.py` + `harness-dashboard.py` + `gpu_collector.py`

Move GPU endpoints to a single source of truth:

- **Option A:** a JSON config file mounted into all containers.
- **Option B:** the Redis `config:gpus` hash.

The rewrite above implements Option B (Redis-based). For the quick-win path, use Option A:

**File:** `config/gpu-endpoints.json`

```json
{
  "amdpve": {"host": "192.168.68.15", "port": 8080, "sidecar_port": 8090},
  "llmgpu": {"host": "192.168.68.8", "port": 8080, "sidecar_port": 8090},
  "ocu_llm": {"host": "192.168.68.110", "port": 8080, "sidecar_port": 8090}
}
```

Mount it into all containers:

```yaml
volumes:
  - ./config:/app/config:ro
```

Each service then reads `/app/config/gpu-endpoints.json` instead of hardcoding endpoints.

---

## 6. Docker Compose Updates

```yaml
version: "3.8"

services:
  redis:
    image: redis:7-alpine
    restart: unless-stopped
    networks:
      - gpu-router-net
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 3

  queue-service:
    build:
      context: .
      dockerfile: queue-service/Dockerfile  # FIX C2
    restart: unless-stopped
    networks:
      - gpu-router-net
    expose:
      - "8091"  # FIX C1: remove external port exposure
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    healthcheck:
      # curl must be installed in the image (python:*-slim images do not include it)
      test: ["CMD", "curl", "-f", "http://localhost:8091/health"]
      interval: 15s
      timeout: 5s
      retries: 3

  dashboard:
    build:
      context: .
      dockerfile: dashboard/Dockerfile.dashboard  # FIX I5: use subdir version
    restart: unless-stopped
    networks:
      - gpu-router-net
    ports:
      - "3001:3001"
    depends_on:
      queue-service:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3001/"]
      interval: 15s
      timeout: 5s
      retries: 3

  gpu-dashboard:
    build:
      context: .
      dockerfile: Dockerfile.gpu
    restart: unless-stopped
    networks:
      - gpu-router-net
    ports:
      - "8092:8092"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8092/gpu.html"]
      interval: 15s
      timeout: 5s
      retries: 3

networks:
  gpu-router-net:
    driver: bridge

volumes:
  redis-data:
```

---

## 7. Nginx Config Updates

```nginx
# gpu-router-docker.conf: updated for Docker service names and proper fallback

upstream amdpve_pool {
    server 192.168.68.15:8080;
}

upstream llmgpu_pool {
    server 192.168.68.8:8080;
}

upstream ocu_llm_pool {
    server 192.168.68.110:8080;
}

# The Docker service names below resolve only when nginx runs on gpu-router-net.
upstream queue_service {
    server queue-service:8091;
}

upstream dashboard_service {
    server dashboard:3001;  # FIX N5: Docker service name
}

upstream gpu_dashboard_pool {
    server gpu-dashboard:8092;  # FIX N5: Docker service name
}

map $http_x_syslog_model $gpu_upstream {
    default        amdpve_pool;
    "standard"     amdpve_pool;
    "heavy"        llmgpu_pool;
    "qwen3.5-27B"  llmgpu_pool;
    "light"        ocu_llm_pool;
    "gemma-4"      ocu_llm_pool;
}

limit_req_zone $binary_remote_addr zone=perip:10m rate=10r/s;

server {
    listen 80;
    server_name _;

    # Trailing slash on the location avoids "//" in the proxied URI
    # (e.g. /dashboard/x -> /x); nginx redirects /dashboard to /dashboard/.
    location /dashboard/ {
        proxy_pass http://dashboard_service/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    location /gpu/ {
        proxy_pass http://gpu_dashboard_pool/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location / {
        limit_req zone=perip burst=10 nodelay;  # FIX N1: reduced burst
        limit_req_status 503;

        proxy_pass http://$gpu_upstream;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        # FIX N4: removed proxy_pass_header X-Syslog-Model

        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 300s;
        proxy_send_timeout 300s;

        # FIX N2: retry on upstream errors.
        # NOTE: proxy_next_upstream only retries other servers in the *selected*
        # upstream block. Each *_pool above has a single server, so cross-GPU
        # failover requires listing the other GPUs as `backup` servers per pool.
        proxy_next_upstream error timeout http_502 http_503 http_504;
        proxy_next_upstream_tries 4;  # cap on attempts within the selected pool

        add_header X-Routed-To $gpu_upstream always;
        # FIX N3: removed error_page fallback; handled by queue consumer or Lua
    }

    # Named location: reachable only via error_page or a Lua handler,
    # since the error_page reference was removed under FIX N3.
    location @queue_fallback {
        rewrite ^ /enqueue break;
        proxy_pass http://queue_service;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Content-Type $content_type;
        proxy_pass_request_body on;
    }
}
```

---

## 8. Implementation Phases

### Phase 1: Quick Wins (Day 1)

Apply fixes QW-1 through QW-13. These need no new modules, only config changes and small code corrections.

### Phase 2: Smart Queue Consumer (Day 2-3)

Replace `queue-service.py` with the full rewrite. This adds:

- Redis Streams backend
- Consumer loop with GPU load balancing
- Priority queues
- Dead letter queue
- Job tracking API
- Per-GPU circuit breakers

### Phase 3: Nginx Fallback Fix (Day 3)

Implement the fallback that triggers only when all GPUs are down (QW-3). If the Lua module is available, use the Lua approach; otherwise, use the `proxy_next_upstream_tries` approach.

### Phase 4: Deploy & Monitor

```bash
docker compose down
docker compose build --no-cache
docker compose up -d
```

Verify (after FIX C1, port 8091 is no longer published to the host, so the queue checks run inside the `queue-service` container):

1. `docker compose exec queue-service curl http://localhost:8091/health` returns 200
2. `docker compose exec queue-service curl -X POST http://localhost:8091/enqueue -H "Content-Type: application/json" -d '{"messages":[{"role":"user","content":"hello"}],"model":"qwen3.6-35B-A3B","priority":"normal"}'` returns 202 with a `job_id`
3. `docker compose exec queue-service curl http://localhost:8091/status/` tracks the job lifecycle
4. The dashboard at `:3001` shows live GPU health with parallel checks

---

## 9. Risks & Mitigations

| Risk | Impact | Mitigation |
|---|---|---|
| Redis Streams `XREADGROUP` blocks | Consumer stalls if the stream is empty | `block` set to 1000 ms, so each poll returns within about a second instead of waiting indefinitely |
| Consumer crashes mid-job | Job stuck in "processing" state | Health monitor resets stale job statuses every 60 s |
| GPU returns 401 on `/slots` | Load info unavailable for llmgpu/ocu_llm | Fall back to sidecar `gpu_util_pct` for routing |
| Single consumer bottleneck | Queue buildup during high traffic | Add 2+ consumer containers (same consumer group) |
| Redis single point of failure | Entire queue down if Redis dies | Phase 2+: Redis Sentinel (failover) and/or AOF persistence (durability) |
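The third mitigation (routing on sidecar `gpu_util_pct` when `/slots` returns 401) can be sketched as a pure selection function. This is a minimal sketch under stated assumptions, not the rewrite's actual router: it assumes health records shaped after the `config:gpu-health:{name}` hash from section 3.2, plus a hypothetical `slots_busy` list carrying the per-slot `is_processing` flags from `/slots` (only amdpve exposes these; `None` marks GPUs where `/slots` is unavailable). `GpuHealth`, `pick_gpu`, `STALE_AFTER_S`, `BUSY_UTIL_PCT` and both threshold values are illustrative, not part of the design.

```python
"""Hypothetical GPU picker: /slots as an eligibility gate, sidecar util as the ranking."""
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Mapping, Optional

# Assumed thresholds, not values from the design.
STALE_AFTER_S = 30.0   # ignore GPUs whose health record is older than this
BUSY_UTIL_PCT = 90.0   # treat sidecar utilization at or above this as saturated


@dataclass
class GpuHealth:
    gpu_util_pct: float                      # sidecar :8090 metric (available on all GPUs)
    last_seen: float                         # Unix time of the last successful health poll
    slots_busy: Optional[list[bool]] = None  # /slots is_processing flags; None if /slots is unavailable


def pick_gpu(health: Mapping[str, GpuHealth], now: Optional[float] = None) -> Optional[str]:
    """Return the least-utilized eligible GPU name, or None if no GPU qualifies."""
    now = time.time() if now is None else now
    candidates: list[tuple[float, str]] = []
    for name, h in health.items():
        if now - h.last_seen > STALE_AFTER_S:
            continue  # stale health data: skip rather than route blindly
        if h.slots_busy is not None and all(h.slots_busy):
            continue  # every slot is processing (an empty list also means no free slot)
        if h.gpu_util_pct >= BUSY_UTIL_PCT:
            continue  # sidecar proxy says the GPU is saturated
        candidates.append((h.gpu_util_pct, name))
    # Lowest utilization wins; ties break on the name so the choice is deterministic.
    return min(candidates)[1] if candidates else None
```

With the section 2 snapshot (amdpve at 81% with one of its two slots free, llmgpu and ocu_llm at 0%), `pick_gpu` returns `llmgpu`; if llmgpu's record goes stale, it returns `ocu_llm`. Using utilization as the single ranking key keeps all three GPUs comparable even though only amdpve reports slot state; the trade-off, as section 2 notes, is that `gpu_util_pct` is only a proxy for inference load.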