# Syslog Harness: Smart Queue Consumer Implementation

**Date:** 2026-05-17

**Author:** Mumuni (review draft for Abiba)

**Repo:** SyslogSolution/syslog-harness

**Base commit:** `e95475f` "Add GPU dashboard container + Nginx routing"

---

## 1. Executive Summary

The current queue-service stores inference requests in Redis but **never processes them**. This document provides a complete implementation of a **smart queue consumer** with GPU-aware load balancing, priority queuing, backpressure, dead-letter handling, and job lifecycle tracking, plus all quick-win fixes from the architecture review.

**Current state (verified 2026-05-17):**

- Queue depth: 0 (services not currently running)
- Redis: 7.4.9 (streams fully supported)
- All 3 GPUs reachable and healthy:
  - `amdpve` (192.168.68.15:8080): qwen3.6-35B-A3B, GPU util 93%, VRAM 28GB/65GB, temp 72C
  - `llmgpu` (192.168.68.8:8080): qwen3.6-27B-code, GPU util 0%, VRAM 20GB/24GB, temp 37C
  - `ocu_llm` (192.168.68.110:8080): gemma-4-E4B, GPU util 0%, VRAM 8GB/12GB, temp 41C
- GPU sidecar metrics available at `:8090` on each host (`gpu_util_pct`, `vram_used_mb`, `temp_c`)
- GPU inference endpoints: `/v1/models` (all return 200), `/v1/chat/completions` (all respond to POST)
- No `slots_busy` endpoint; load is estimated from the GPU sidecar metrics instead

---

## 2. Quick Wins (Zero Architectural Change)

These fixes require minimal code changes and can be deployed independently of the smart queue.

### QW-1: Fix Dockerfile Path Mismatch (docker-compose.yml line 15)

**Current:** `dockerfile: Dockerfile.queue` (the file doesn't exist at the repo root)

**Fix:** `dockerfile: queue-service/Dockerfile`

```diff
   queue-service:
     build:
       context: .
-      dockerfile: Dockerfile.queue
+      dockerfile: queue-service/Dockerfile
```

### QW-2: Fix Nginx Fallback (gpu-router-docker.conf lines 99-106)

**Problem:** Nginx retries the same GPU pool (N2) AND falls back to the queue on ANY single GPU failure (N3).
**Fix:** Give each upstream server passive health checks (`max_fails`/`fail_timeout`, which are `server` parameters), set `proxy_next_upstream_tries 1` (no self-retry), and route to the queue only after a dedicated all-GPU upstream has also failed.

```diff
 upstream amdpve_pool {
-    server 192.168.68.15:8080;
+    server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
 }

 upstream llmgpu_pool {
-    server 192.168.68.8:8080;
+    server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
 }

 upstream ocu_llm_pool {
-    server 192.168.68.110:8080;
+    server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
 }

+## New: all-GPU pool; the queue is used only when every server in it fails
+upstream all_gpu_pool {
+    server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
+    server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
+    server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
+}

 map $http_x_syslog_model $gpu_upstream {
     default        amdpve_pool;
     "standard"     amdpve_pool;
     "heavy"        llmgpu_pool;
     "qwen3.5-27B"  llmgpu_pool;
     "light"        ocu_llm_pool;
     "gemma-4"      ocu_llm_pool;
 }

 server {
+    ## Let @all_gpus hand off to @queue_fallback (nested error_page)
+    recursive_error_pages on;
+
     location / {
         limit_req zone=perip burst=20 nodelay;
         limit_req_status 503;

         proxy_pass http://$gpu_upstream;
         ...
-        proxy_next_upstream error timeout http_502 http_503;
-        proxy_next_upstream_tries 2;
+        proxy_next_upstream error timeout http_502 http_503 http_504;
+        proxy_next_upstream_tries 1;

-        error_page 502 503 504 = @queue_fallback;
+        ## 503 is excluded because limit_req also answers with 503
+        error_page 502 504 = @all_gpus;
     }

+    location @all_gpus {
+        proxy_pass http://all_gpu_pool;
+        ...
+        proxy_next_upstream error timeout http_502 http_503 http_504;
+        proxy_next_upstream_tries 3;
+        error_page 502 504 = @queue_fallback;
+    }
+
     location @queue_fallback {
+        ## Only reached when the all_gpu_pool attempt also failed (all GPUs down)
         rewrite ^ /enqueue break;
         proxy_pass http://queue_service;
         ...
     }
 }
```

### QW-3: Fix Nginx Container Names (gpu-router-docker.conf lines 27, 32)

**Problem:** The hardcoded name `syslog-harness-dashboard-1` changes with the docker-compose project prefix.

**Fix:** Use Docker Compose service names.
```diff
 upstream dashboard_service {
-    server syslog-harness-dashboard-1:3001;
+    server dashboard:3001;
 }

 upstream gpu_dashboard_pool {
-    server syslog-harness-gpu-dashboard-1:8092;
+    server gpu-dashboard:8092;
 }
```

### QW-4: Fix Redis Host Default (queue-service.py line 21)

**Problem:** The default `192.168.68.7` is unreachable inside Docker.

**Fix:** Default to `redis` (the Docker Compose service name).

```diff
-REDIS_HOST = os.getenv("REDIS_HOST", "192.168.68.7")
+REDIS_HOST = os.getenv("REDIS_HOST", "redis")
```

### QW-5: Add Dependency Version Pinning (all Dockerfiles)

```diff
 # queue-service/Dockerfile
 FROM python:3.13-slim
-RUN pip install --no-cache-dir flask redis
+RUN pip install --no-cache-dir \
+    flask==3.1.0 \
+    redis==5.2.1 \
+    gunicorn==23.0.0

 # Dockerfile.dashboard
 FROM python:3.11-slim
+RUN pip install --no-cache-dir gunicorn==23.0.0

 # Dockerfile.gpu
 FROM python:3.11-slim
-RUN pip install requests
+RUN pip install --no-cache-dir \
+    requests==2.32.3 \
+    psutil==6.1.1
```

### QW-6: Add `.dockerignore`

```
.git
.gitignore
*.md
*.pyc
__pycache__
.env
*.swp
.hermes/
syslog-harness-check/
```

### QW-7: Add Docker Health Checks (docker-compose.yml)

```yaml
  redis:
    image: redis:7-alpine
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

  queue-service:
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8091/health')"]
      interval: 15s
      timeout: 5s
      retries: 3

  dashboard:
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3001/')"]
      interval: 15s
      timeout: 5s
      retries: 3
```

### QW-8: Use `unless-stopped` Instead of `always`

```diff
-    restart: always
+    restart: unless-stopped
```

Apply to all services in `docker-compose.yml`.
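The QW-7 probes work because `urllib.request.urlopen` raises for any non-2xx response, so the queue-service check fails whenever `/health` answers 503 (Redis unreachable), and the uncaught exception makes `python3 -c` exit with status 1. The one-liners set no timeout, though, so a hung endpoint is only cut off by Docker's own `timeout`. A minimal sketch of the same probe with an explicit timeout, assuming a hypothetical helper named `probe` that returns the exit code Docker expects (0 healthy, 1 unhealthy):

```python
import http.client
import urllib.request


def probe(url: str, timeout: float = 3.0) -> int:
    """Return a Docker HEALTHCHECK exit code: 0 = healthy, 1 = unhealthy.

    urlopen() returns only for 2xx responses (redirects are followed).
    A 4xx/5xx status raises HTTPError, a refused connection raises URLError,
    and a slow endpoint raises TimeoutError; all three are OSError subclasses.
    Malformed responses raise http.client.HTTPException.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout):
            return 0
    except (OSError, http.client.HTTPException):
        return 1
```

In a container this could be wrapped as `sys.exit(probe("http://localhost:8091/health"))`; the design choice is simply to turn every failure mode into exit code 1 while bounding how long a single check can take.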
### QW-9: Add Gunicorn to Queue Service (replace Flask dev server)

```diff
 # queue-service/Dockerfile
-CMD ["python3", "queue-service.py"]
+CMD ["gunicorn", "--bind", "0.0.0.0:8091", "--workers", "2", "--timeout", "120", "queue-service:app"]
```

### QW-10: Fix GPU Dashboard Multi-Process CMD (Dockerfile.gpu line 14)

**Problem:** The CMD starts background processes with `&` and no supervisor: if the collector crashes, nothing restarts it.

**Fix:** Use a single process with signal handling, or use `supervisord`.

```dockerfile
# Option A: Single process with threading (recommended for simplicity)
CMD ["python3", "gpu_collector.py"]

# Option B: Use supervisord (more robust)
# pip installs supervisord to /usr/local/bin and ships no default config,
# so the repo's supervisord.conf is used as the main config file
RUN pip install --no-cache-dir supervisor
COPY supervisord.conf /etc/supervisord.conf
CMD ["supervisord", "-n", "-c", "/etc/supervisord.conf"]
```

---

## 3. Smart Queue Consumer: Full Implementation

### 3.1 Architecture

```
Agent (POST /v1/...)
   |
   v
 Nginx ---(fallback)---> Smart Queue Service (queue-service.py)
                           |
                           +-- Queue API Layer (Flask)
                           |      +--> Redis Streams (inference:stream)
                           |      +--> Job Tracker (Redis Hash)
                           |
                           +-- Consumer Pool (worker threads, reads inference:stream)
                                  |
                                  v
                           GPU Selection Logic (load-balanced by GPU metrics)
                             |               |               |
                             v               v               v
                          amdpve          llmgpu          ocu_llm
                          :8080           :8080           :8080
                          35B-A3B         27B-code        gemma-4-E4B

 Failed jobs ---> DLQ Stream (inference:dead-letter)
```

### 3.2 Data Model

```python
# Redis Streams keys
STREAM_KEY = "inference:stream"        # Main queue
DLQ_KEY = "inference:dead-letter"      # Failed jobs
PENDING_KEY = "inference:pending"      # Jobs currently being processed
CONSUMER_GROUP = "inference-workers"   # Shared consumer group (one delivery per job)

# Redis Hash keys
JOB_STATUS_KEY = "job:status"          # job_id -> {status, gpu, started_at, ...}
GPU_REGISTRY_KEY = "config:gpus"       # gpu_name -> {endpoint, model, ...}
CONSUMER_REGISTRY_KEY = "consumers"    # consumer_id -> {pid, started_at, last_heartbeat}

# Stream entry format (JSON string stored in the entry's "job" field)
# {
#   "job_id": "uuid",
#   "model": "standard|heavy|light",
#   "priority": "high|normal|low",
#   "payload": "{...}",          # Original request body (raw text)
#   "headers": {...},            # Original request headers (including Content-Type)
#   "created_at": 1747478400.0,
#   "attempt": 0,
#   "max_retries": 3,
#   "source": "nginx|direct"
# }
```

### 3.3 New `queue-service.py` (Complete Rewrite)

```python
#!/usr/bin/env python3
"""Syslog Harness: Smart Queue Service

Redis Streams + GPU-aware consumer pool.

Endpoints:
    /health                   liveness (Nginx upstream check)
    /enqueue                  POST: submit inference request (with optional priority)
    /status                   GET: queue depth, circuit state, consumer pool
    /status/<job_id>          GET: specific job status and result
    /dlq                      GET: list dead-letter queue entries
    /dlq/<message_id>/retry   POST: retry a dead-letter job
    /consumers                GET: consumer pool status
    /gpus                     GET: GPU registry with live health
"""

import json
import logging
import os
import signal
import sys
import threading
import time
import urllib.parse
import urllib.request
import uuid

import redis
from flask import Flask, jsonify, request

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "")

# Queue thresholds (adaptive backpressure)
QUEUE_WARN = 30
QUEUE_503 = 40
QUEUE_OPEN = 50

# Consumer settings
CONSUMER_WORKERS = int(os.getenv("QUEUE_CONSUMERS", "3"))     # One per GPU
POLL_INTERVAL = float(os.getenv("QUEUE_POLL_INTERVAL", "2"))  # seconds (keep < socket_timeout)
GPU_HEALTH_INTERVAL = 5    # seconds; GPU health results are cached this long
MAX_RETRIES = int(os.getenv("QUEUE_MAX_RETRIES", "3"))
JOB_TIMEOUT = 300          # seconds (5 min, matches Nginx proxy_read_timeout)
JOB_STATUS_TTL = 86400     # seconds; each job's status record expires after 24 h

# Redis stream max length (approximate trimming)
STREAM_MAXLEN = 10000

# GPU registry (single source of truth)
GPU_REGISTRY = {
    "amdpve": {
        "endpoint": "http://192.168.68.15:8080",
        "model": "qwen3.6-35B-A3B",
        "type": "MoE",
        "vram_total_mb": 65536,
        "priority": 1,  # Primary workhorse
    },
    "llmgpu": {
        "endpoint": "http://192.168.68.8:8080",
        "model": "qwen3.6-27B-code",
        "type": "Dense",
        "vram_total_mb": 24576,
        "priority": 2,
    },
    "ocu_llm": {
        "endpoint": "http://192.168.68.110:8080",
        "model": "gemma-4-E4B",
        "type": "Light",
        "vram_total_mb": 12227,
        "priority": 3,
    },
}

# Model-to-GPU mapping (mirrors the Nginx map block)
MODEL_TO_GPU = {
    "default": "amdpve",
    "standard": "amdpve",
    "heavy": "llmgpu",
    "qwen3.5-27B": "llmgpu",
    "qwen3.6-27B": "llmgpu",
    "qwen3.6-27B-code": "llmgpu",
    "light": "ocu_llm",
    "gemma-4": "ocu_llm",
    "gemma-4-E4B": "ocu_llm",
}

# Priority ordering (validated and recorded on each job; consumers currently
# read the single stream in FIFO order)
PRIORITY_ORDER = {"high": 0, "normal": 1, "low": 2}

# Streams
STREAM_KEY = "inference:stream"
DLQ_KEY = "inference:dead-letter"
PENDING_KEY = "inference:pending"
CONSUMER_GROUP = "inference-workers"  # Shared by all consumers: one delivery per job

# Redis keys
JOB_STATUS_KEY = "job:status"
CONSUMER_KEY = "consumers"

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    stream=sys.stdout,
)
logger = logging.getLogger("queue-service")


# ---------------------------------------------------------------------------
# Redis Client
# ---------------------------------------------------------------------------
def get_redis():
    """Get a Redis connection. Raises on failure (no silent swallowing)."""
    kwargs = dict(
        host=REDIS_HOST,
        port=REDIS_PORT,
        decode_responses=True,
        socket_timeout=5,
        socket_connect_timeout=3,
        retry_on_timeout=True,
    )
    if REDIS_PASSWORD:
        kwargs["password"] = REDIS_PASSWORD
    r = redis.Redis(**kwargs)
    r.ping()  # Verify connection
    return r


# Global Redis client (thread-safe connection pool, reused across requests)
_redis_client = None


def redis_client():
    global _redis_client
    if _redis_client is None:
        _redis_client = get_redis()
    return _redis_client


# ---------------------------------------------------------------------------
# GPU Health & Load
# ---------------------------------------------------------------------------
# load_score above which the preferred GPU is treated as busy (tunable)
GPU_BUSY_SCORE = 85

_health_lock = threading.Lock()
_health_cache = {"data": None, "fetched_at": 0.0}


def sidecar_url(gpu_info):
    """Build the sidecar URL (port 8090) from the inference endpoint's host."""
    host = urllib.parse.urlsplit(gpu_info["endpoint"]).hostname
    return f"http://{host}:8090/health"


def fetch_gpu_metrics(gpu_name, gpu_info):
    """Fetch live metrics from the GPU sidecar (port 8090).

    Returns a dict with health, utilization, VRAM usage, and load score,
    or None if the sidecar is unreachable.
    """
    try:
        req = urllib.request.Request(sidecar_url(gpu_info))
        req.add_header("User-Agent", "queue-consumer/1.0")
        # urlopen raises on non-2xx responses; handled below
        with urllib.request.urlopen(req, timeout=3) as resp:
            metrics = json.loads(resp.read())

        vram_used = metrics.get("vram_used_mb", 0)
        # The sidecar may not report total VRAM; fall back to the registry value
        vram_total = metrics.get("vram_total_mb") or gpu_info.get("vram_total_mb", 0)
        vram_pct = (vram_used / vram_total * 100) if vram_total > 0 else 100
        gpu_util = metrics.get("gpu_util_pct", 0)
        temp = metrics.get("temp_c", 0)

        # Load score (roughly 0-100, lower = more available): weighted GPU util,
        # VRAM pressure, and a small penalty for temperatures above 60C
        load_score = (gpu_util * 0.6) + (vram_pct * 0.3) + (max(0, temp - 60) * 0.1)

        # Treat the GPU as "down" if VRAM is nearly full, or if it is both
        # saturated and running hot
        is_down = vram_pct > 95 or (gpu_util > 98 and temp > 85)

        return {
            "health": "down" if is_down else "up",
            "gpu_util_pct": gpu_util,
            "vram_used_mb": vram_used,
            "vram_total_mb": vram_total,
            "vram_pct": round(vram_pct, 1),
            "temp_c": temp,
            "load_score": round(load_score, 1),
            "last_check": time.time(),
        }
    except Exception as e:
        logger.debug(f"GPU {gpu_name} sidecar unreachable: {e}")
        return None


def get_gpu_health(max_age=GPU_HEALTH_INTERVAL):
    """Return gpu_name -> metrics for all GPUs, cached for max_age seconds."""
    with _health_lock:
        cached = _health_cache["data"]
        if cached is not None and time.time() - _health_cache["fetched_at"] < max_age:
            return cached
        health = {}
        for name, info in GPU_REGISTRY.items():
            m = fetch_gpu_metrics(name, info)
            health[name] = m if m else {"health": "down", "last_check": time.time()}
        _health_cache["data"] = health
        _health_cache["fetched_at"] = time.time()
        return health


def select_gpu(job):
    """Select the best GPU for a job based on model mapping and load.

    Algorithm:
        1. Map the job's model to a preferred GPU via MODEL_TO_GPU
        2. If the preferred GPU is up and below GPU_BUSY_SCORE, use it
        3. Otherwise pick the least-loaded healthy GPU (ties broken by priority)
        4. If no GPU is up, return (None, None, None)

    Returns: (gpu_name, gpu_info, metrics) or (None, None, None)
    """
    model = job.get("model", "standard")
    preferred = MODEL_TO_GPU.get(model, MODEL_TO_GPU["default"])
    health = get_gpu_health()

    pref = health.get(preferred, {})
    if pref.get("health") == "up" and pref.get("load_score", 100) < GPU_BUSY_SCORE:
        return preferred, GPU_REGISTRY[preferred], pref

    candidates = [
        (name, GPU_REGISTRY[name], m)
        for name, m in health.items()
        if m.get("health") == "up"
    ]
    if not candidates:
        return None, None, None

    # Lower load_score = less loaded = better
    candidates.sort(key=lambda c: (c[2].get("load_score", 100), c[1]["priority"]))
    return candidates[0]


# ---------------------------------------------------------------------------
# Circuit Breaker (per-GPU, with recovery)
# ---------------------------------------------------------------------------
class CircuitBreaker:
    """Per-GPU circuit breaker with half-open recovery state.

    States:
        CLOSED     Normal operation; consecutive failures are counted.
        OPEN       Requests are rejected immediately. After recovery_timeout
                   seconds, the next request moves the circuit to HALF_OPEN.
        HALF_OPEN  A single probe request is allowed. Success -> CLOSED;
                   failure -> OPEN.

    Shared by all consumer threads in a process, so updates are locked.
    State is per process (not shared between gunicorn workers).
    """

    def __init__(self, failure_threshold=3, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._lock = threading.Lock()
        self._failures = {}      # gpu_name -> consecutive failure count
        self._last_failure = {}  # gpu_name -> timestamp
        self._state = {}         # gpu_name -> "closed" | "open" | "half_open"

    def allow_request(self, gpu_name):
        """Check whether a request should be allowed for this GPU."""
        with self._lock:
            state = self._state.get(gpu_name, "closed")
            if state == "closed":
                return True
            if state == "open":
                last_fail = self._last_failure.get(gpu_name, 0)
                if time.time() - last_fail > self.recovery_timeout:
                    self._state[gpu_name] = "half_open"
                    logger.info(f"Circuit {gpu_name}: OPEN -> HALF_OPEN (probe)")
                    return True  # This caller sends the single probe
                return False
            # half_open: a probe is already in flight; reject until it resolves
            return False

    def record_success(self, gpu_name):
        """Record a successful request and close the circuit."""
        with self._lock:
            previous = self._state.get(gpu_name, "closed")
            self._failures[gpu_name] = 0
            self._state[gpu_name] = "closed"
        if previous != "closed":
            logger.info(f"Circuit {gpu_name}: {previous.upper()} -> CLOSED")

    def record_failure(self, gpu_name):
        """Record a failed request; open the circuit at the threshold or on a failed probe."""
        with self._lock:
            previous = self._state.get(gpu_name, "closed")
            self._failures[gpu_name] = self._failures.get(gpu_name, 0) + 1
            self._last_failure[gpu_name] = time.time()
            opened = previous != "open" and (
                previous == "half_open" or self._failures[gpu_name] >= self.failure_threshold
            )
            if opened:
                self._state[gpu_name] = "open"
        if opened:
            logger.warning(f"Circuit {gpu_name}: {previous.upper()} -> OPEN (threshold={self.failure_threshold})")


# ---------------------------------------------------------------------------
# Job Lifecycle
# ---------------------------------------------------------------------------
def store_job_status(job_id, status_data):
    """Merge status_data into the job's status record and refresh its TTL."""
    r = redis_client()
    record = get_job_status(job_id) or {}
    record.update(status_data)
    record["updated_at"] = time.time()
    r.hset(JOB_STATUS_KEY, job_id, json.dumps(record))
    # Per-field expiry (HEXPIRE, Redis >= 7.4): EXPIRE on the hash key would
    # drop every job's status at once
    r.execute_command("HEXPIRE", JOB_STATUS_KEY, JOB_STATUS_TTL, "FIELDS", 1, job_id)


def get_job_status(job_id):
    """Get job status from Redis."""
    r = redis_client()
    raw = r.hget(JOB_STATUS_KEY, job_id)
    if raw:
        return json.loads(raw)
    return None


def enqueue_job(payload, headers, source="nginx"):
    """Add a job to the Redis stream. Returns job_id."""
    r = redis_client()

    # Parse model and priority from headers
    model_header = headers.get("X-Syslog-Model", "standard")
    priority_header = headers.get("X-Syslog-Priority", "normal")

    # Validate priority
    if priority_header not in PRIORITY_ORDER:
        priority_header = "normal"

    job_id = str(uuid.uuid4())
    job = {
        "job_id": job_id,
        "model": model_header,
        "priority": priority_header,
        "payload": payload,
        "headers": dict(headers),  # All headers; hop-by-hop ones are dropped when forwarding
        "created_at": time.time(),
        "attempt": 0,
        "max_retries": MAX_RETRIES,
        "source": source,
    }

    # Add to stream with approximate trimming
    r.xadd(STREAM_KEY, {"job": json.dumps(job)}, maxlen=STREAM_MAXLEN, approximate=True)

    # Initial status
    store_job_status(job_id, {
        "status": "queued",
        "model": model_header,
        "priority": priority_header,
        "created_at": job["created_at"],
        "queued_at": time.time(),
    })

    return job_id


def get_queue_depth():
    """Current stream length (backlog + in-flight; consumers delete finished entries)."""
    r = redis_client()
    try:
        return r.xlen(STREAM_KEY)  # XLEN is O(1) for streams
    except Exception:
        return -1


# ---------------------------------------------------------------------------
# Dead Letter Queue
# ---------------------------------------------------------------------------
def move_to_dlq(job_id, reason):
    """Move a failed job from the main stream to the dead-letter queue.

    The caller is still responsible for XACKing the message in the consumer group.
    """
    r = redis_client()

    # Find the job's entry in the main stream and delete it
    original_job = None
    for msg_id, data in r.xrange(STREAM_KEY, min="-", max="+", count=STREAM_MAXLEN):
        job = json.loads(data.get("job", "{}"))
        if job.get("job_id") == job_id:
            original_job = job
            r.xdel(STREAM_KEY, msg_id)
            break

    if not original_job:
        logger.error(f"DLQ move: job {job_id} not found in main stream")
        return

    dlq_entry = {
        **original_job,
        "dlq_reason": reason,
        "dlq_at": time.time(),
    }
    r.xadd(DLQ_KEY, {"job": json.dumps(dlq_entry)})

    store_job_status(job_id, {
        "status": "dead",
        "dlq_reason": reason,
        "dlq_at": dlq_entry["dlq_at"],
    })
    logger.warning(f"Job {job_id} moved to DLQ: {reason}")


def list_dlq(limit=20):
    """List dead-letter queue entries."""
    r = redis_client()
    result = []
    for msg_id, data in r.xrange(DLQ_KEY, count=limit):
        job = json.loads(data.get("job", "{}"))
        result.append({
            "message_id": msg_id.decode() if isinstance(msg_id, bytes) else msg_id,
            **job,
        })
    return result


def retry_dlq(message_id):
    """Retry a dead-letter job by re-adding it to the main stream."""
    r = redis_client()
    entries = r.xrange(DLQ_KEY, min=message_id, max=message_id, count=1)
    if not entries:
        return None, "DLQ entry not found"

    msg_id, data = entries[0]
    job = json.loads(data.get("job", "{}"))

    # Reset attempt count and DLQ metadata
    job["attempt"] = 0
    job["dlq_at"] = None
    job["dlq_reason"] = None

    # Re-add to main stream, then remove from DLQ
    r.xadd(STREAM_KEY, {"job": json.dumps(job)}, maxlen=STREAM_MAXLEN, approximate=True)
    r.xdel(DLQ_KEY, msg_id)

    store_job_status(job["job_id"], {
        "status": "queued",
        "attempt": 0,
        "dlq_reason": None,
        "requeued_at": time.time(),
    })
    return job["job_id"], "requeued"


# ---------------------------------------------------------------------------
# Flask API
# ---------------------------------------------------------------------------
app = Flask(__name__)
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)


@app.route("/health")
def health():
    """Liveness probe for Nginx and Docker."""
    try:
        redis_client().ping()
        return jsonify({"status": "ok", "service": "queue-service"}), 200
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 503


@app.route("/enqueue", methods=["POST"])
def enqueue():
    """Submit an inference request to the smart queue.

    Accepts any body (JSON, text, etc.) and stores all request headers.
    Supports the X-Syslog-Priority header: high, normal, low.
    """
    try:
        payload = request.get_data(as_text=True)
        headers = dict(request.headers)  # All headers, not just X-*

        depth = get_queue_depth()

        # Hard limit: the queue is overwhelmed, reject immediately
        if depth >= QUEUE_OPEN:
            return jsonify({
                "error": "Circuit breaker OPEN",
                "queue_depth": depth,
                "threshold": QUEUE_OPEN,
                "retry_after": 30,
            }), 503

        # Adaptive backpressure: retry_after grows with depth (5 s steps, max 60 s)
        if depth >= QUEUE_503:
            return jsonify({
                "error": "Queue overloaded",
                "queue_depth": depth,
                "retry_after": min(60, (depth - QUEUE_503 + 1) * 5),
            }), 503

        if depth >= QUEUE_WARN:
            # Warn but allow
            logger.warning(f"Queue depth {depth} (warn={QUEUE_WARN}, reject={QUEUE_503})")

        job_id = enqueue_job(payload, headers)

        return jsonify({
            "job_id": job_id,
            "status": "queued",
            "queue_depth": get_queue_depth(),
        }), 202

    except Exception as e:
        logger.error(f"Enqueue failed: {e}")
        return jsonify({"error": str(e)}), 500


def list_consumers(r):
    """Decode the consumer registry hash into a list of records."""
    consumers = []
    for _cid, cdata in r.hgetall(CONSUMER_KEY).items():
        try:
            consumers.append(json.loads(cdata))
        except json.JSONDecodeError:
            pass
    return consumers


@app.route("/status")
def status():
    """Queue overview: depth, circuit state, consumer pool, GPU health."""
    r = redis_client()

    # Circuit breaker state per GPU (state of the process serving this request)
    circuit_state = {
        gpu_name: {
            "state": circuit_breaker._state.get(gpu_name, "closed"),
            "failures": circuit_breaker._failures.get(gpu_name, 0),
        }
        for gpu_name in GPU_REGISTRY
    }

    return jsonify({
        "queue_depth": get_queue_depth(),
        "pending_depth": r.xlen(PENDING_KEY),
        "dlq_depth": r.xlen(DLQ_KEY),
        "circuit_breaker": circuit_state,
        "consumers": list_consumers(r),
        "gpu_health": get_gpu_health(),
        "thresholds": {
            "warn": QUEUE_WARN,
            "overloaded": QUEUE_503,
            "open": QUEUE_OPEN,
        },
    }), 200


@app.route("/status/<job_id>")
def job_status_endpoint(job_id):
    """Get a specific job's status and result."""
    status_data = get_job_status(job_id)
    if not status_data:
        return jsonify({"error": "job not found"}), 404
    return jsonify(status_data), 200


@app.route("/dlq")
def dlq_list():
    """List dead-letter queue entries."""
    return jsonify({"dead_letter_queue": list_dlq()}), 200


@app.route("/dlq/<message_id>/retry", methods=["POST"])
def dlq_retry(message_id):
    """Retry a dead-letter job."""
    job_id, msg = retry_dlq(message_id)
    if job_id is None:
        return jsonify({"error": msg}), 404
    return jsonify({"job_id": job_id, "status": "requeued"}), 200


@app.route("/consumers")
def consumers_status():
    """List the active consumer pool."""
    return jsonify({"consumers": list_consumers(redis_client())}), 200


@app.route("/gpus")
def gpus_status():
    """List the GPU registry with live health and metrics."""
    health = get_gpu_health()
    result = {name: {**info, **health.get(name, {})} for name, info in GPU_REGISTRY.items()}
    return jsonify(result), 200


# ---------------------------------------------------------------------------
# Consumer Pool (background threads)
# ---------------------------------------------------------------------------
_shutdown = threading.Event()  # Set to stop consumer loops after the current job
_consumers_started = False

# Headers not copied to the GPU request: hop-by-hop headers, Host and
# Content-Length (urllib sets them), Accept-Encoding (urllib does not decompress)
SKIP_FORWARD_HEADERS = {
    "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
    "te", "trailer", "transfer-encoding", "upgrade",
    "host", "content-length", "accept-encoding",
}


def ensure_consumer_group(r):
    """Create the shared consumer group (and the stream) if missing."""
    try:
        r.xgroup_create(STREAM_KEY, CONSUMER_GROUP, id="0", mkstream=True)
    except redis.ResponseError as e:
        if "BUSYGROUP" not in str(e):  # BUSYGROUP = group already exists
            raise


def forward_job(job_data, gpu_name, gpu_info, gpu_metrics, consumer_id):
    """POST a job to the selected GPU and record success, retry, or DLQ."""
    r = redis_client()
    job_id = job_data.get("job_id", "unknown")
    gpu_url = f"{gpu_info['endpoint']}/v1/chat/completions"
    payload = job_data.get("payload") or "{}"

    headers = {
        k: v for k, v in job_data.get("headers", {}).items()
        if k.lower() not in SKIP_FORWARD_HEADERS
    }
    # Ensure Content-Type is set
    if not any(k.lower() == "content-type" for k in headers):
        headers["Content-Type"] = "application/json"

    # Update job status with the GPU selection
    store_job_status(job_id, {
        "status": "processing",
        "gpu": gpu_name,
        "gpu_load_score": gpu_metrics.get("load_score", 0) if gpu_metrics else 0,
    })

    try:
        req = urllib.request.Request(gpu_url, data=payload.encode(), headers=headers, method="POST")
        req.add_header("User-Agent", f"queue-consumer/{consumer_id}")
        with urllib.request.urlopen(req, timeout=JOB_TIMEOUT) as resp:
            response_data = resp.read().decode()
    except Exception as e:
        # Request failed (connection error, timeout, or non-2xx status)
        error_msg = str(e)[:200]
        logger.error(f"consumer-{consumer_id} failed job {job_id} on {gpu_name}: {error_msg}")
        circuit_breaker.record_failure(gpu_name)

        attempt = job_data.get("attempt", 0) + 1
        if attempt <= MAX_RETRIES:
            # Retry: re-add to the stream with the incremented attempt count
            job_data["attempt"] = attempt
            store_job_status(job_id, {"status": "retrying", "attempt": attempt, "last_error": error_msg})
            r.xadd(STREAM_KEY, {"job": json.dumps(job_data)}, maxlen=STREAM_MAXLEN, approximate=True)
        else:
            # Max retries exceeded: move to DLQ
            move_to_dlq(job_id, f"max_retries_exceeded_after_{attempt}_attempts_on_{gpu_name}: {error_msg}")
        return

    # Success
    circuit_breaker.record_success(gpu_name)
    store_job_status(job_id, {
        "status": "completed",
        "gpu": gpu_name,
        "result": response_data[:2000],  # Store first 2000 chars
        "completed_at": time.time(),
    })
    logger.info(f"consumer-{consumer_id} completed job {job_id} on {gpu_name}")


def run_consumer(consumer_id):
    """Main consumer loop: read from the stream, select a GPU, forward the request.

    Runs in a background thread. All consumers join CONSUMER_GROUP, so Redis
    delivers each stream entry to exactly one of them. A job interrupted by a
    crash stays unacknowledged in the group's pending list (XPENDING).
    """
    worker_name = f"consumer-{consumer_id}"
    started_at = time.time()
    jobs_processed = 0
    last_heartbeat = 0.0

    r = redis_client()
    ensure_consumer_group(r)

    def heartbeat(status):
        """Write this consumer's record into the CONSUMER_KEY hash."""
        r.hset(CONSUMER_KEY, consumer_id, json.dumps({
            "consumer_id": consumer_id,
            "pid": os.getpid(),
            "started_at": started_at,
            "status": status,
            "last_heartbeat": time.time(),
            "jobs_processed": jobs_processed,
        }))

    heartbeat("running")
    logger.info(f"{worker_name} started")

    while not _shutdown.is_set():
        try:
            # Block up to POLL_INTERVAL waiting for one never-delivered entry (">")
            entries = r.xreadgroup(
                CONSUMER_GROUP,
                consumer_id,
                {STREAM_KEY: ">"},
                count=1,                          # One job at a time per consumer
                block=int(POLL_INTERVAL * 1000),  # Block in ms
            )

            if not entries:
                if time.time() - last_heartbeat > 5:
                    heartbeat("idle")
                    last_heartbeat = time.time()
                continue

            for _stream, messages in entries:
                for msg_id, data in messages:
                    job_data = json.loads(data.get("job", "{}"))
                    job_id = job_data.get("job_id", "unknown")
                    logger.info(f"{worker_name} processing job {job_id} (attempt {job_data.get('attempt', 0)})")

                    store_job_status(job_id, {
                        "status": "processing",
                        "consumer": consumer_id,
                        "started_at": time.time(),
                    })

                    # Track in-flight work; keep the entry ID so it can be removed later
                    pending_id = r.xadd(PENDING_KEY, {"job_id": job_id, "consumer": consumer_id})

                    gpu_name, gpu_info, gpu_metrics = select_gpu(job_data)
                    if not gpu_name:
                        logger.warning(f"No GPU available for job {job_id}, moving to DLQ")
                        move_to_dlq(job_id, "no_gpu_available")
                    elif not circuit_breaker.allow_request(gpu_name):
                        logger.warning(f"Circuit OPEN for {gpu_name}, moving job {job_id} to DLQ")
                        move_to_dlq(job_id, f"circuit_open_{gpu_name}")
                    else:
                        forward_job(job_data, gpu_name, gpu_info, gpu_metrics, consumer_id)

                    # Ack, then delete the entry so XLEN counts only backlog + in-flight jobs
                    r.xack(STREAM_KEY, CONSUMER_GROUP, msg_id)
                    r.xdel(STREAM_KEY, msg_id)
                    r.xdel(PENDING_KEY, pending_id)
                    jobs_processed += 1

        except redis.ConnectionError as e:
            logger.error(f"{worker_name} Redis connection error: {e}. Retrying in {POLL_INTERVAL}s...")
            time.sleep(POLL_INTERVAL)
        except Exception as e:
            logger.error(f"{worker_name} unexpected error: {e}")
            time.sleep(POLL_INTERVAL)

    # Unregister consumer
    r.hdel(CONSUMER_KEY, consumer_id)
    logger.info(f"{worker_name} stopped")


# ---------------------------------------------------------------------------
# Startup
# ---------------------------------------------------------------------------
def start_consumers():
    """Start CONSUMER_WORKERS daemon threads, at most once per process."""
    global _consumers_started
    if _consumers_started:
        return
    _consumers_started = True
    for i in range(CONSUMER_WORKERS):
        consumer_id = f"worker-{os.getpid()}-{i}"  # Unique across gunicorn workers
        threading.Thread(target=run_consumer, args=(consumer_id,), name=consumer_id, daemon=True).start()
    logger.info(f"Started {CONSUMER_WORKERS} consumer threads (pid {os.getpid()})")


def main():
    """Development entry point: Flask dev server plus consumer threads."""
    logger.info("Starting Smart Queue Service...")
    logger.info(f"Redis: {REDIS_HOST}:{REDIS_PORT}")
    logger.info(f"Consumers: {CONSUMER_WORKERS}")
    logger.info(f"GPU registry: {list(GPU_REGISTRY.keys())}")

    # Verify Redis connectivity
    try:
        redis_client().ping()
        logger.info("Redis connection verified")
    except Exception as e:
        logger.error(f"Redis connection failed: {e}")
        sys.exit(1)

    # Signal handlers can only be installed from the main thread
    def handle_signal(signum, frame):
        logger.info(f"Received signal {signum}, shutting down")
        _shutdown.set()
        sys.exit(0)

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    start_consumers()
    app.run(host="0.0.0.0", port=8091, threaded=True)


# gunicorn imports this module in each worker and never calls main(), so
# consumers are started at import time there. QUEUE_START_CONSUMERS=0 disables
# this (e.g. to run the API alone).
if __name__ != "__main__" and os.getenv("QUEUE_START_CONSUMERS", "1") == "1":
    start_consumers()


if __name__ == "__main__":
    main()
```

### 3.4 Updated `queue-service/Dockerfile`

```dockerfile
FROM python:3.13-slim

RUN pip install --no-cache-dir \
    flask==3.1.0 \
    redis==5.2.1 \
    gunicorn==23.0.0

COPY queue-service.py /app/queue-service.py
WORKDIR /app

EXPOSE 8091

# Gunicorn: 2 workers x 4 threads, 300s timeout (matches LLM inference timeout).
# Each worker process also starts QUEUE_CONSUMERS consumer threads on import.
CMD ["gunicorn", "--bind", "0.0.0.0:8091", "--workers", "2", "--threads", "4", "--timeout", "300", "--graceful-timeout", "30", "queue-service:app"]
```

### 3.5 Updated `docker-compose.yml`

```yaml
version: "3.8"

services:
  redis:
    image: redis:7-alpine
    restart: unless-stopped
    networks:
      - gpu-router-net
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

  queue-service:
    build:
      context: .
      dockerfile: queue-service/Dockerfile
    restart: unless-stopped
    networks:
      - gpu-router-net
    expose:
      - "8091"
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - QUEUE_CONSUMERS=3
      - QUEUE_POLL_INTERVAL=2
      - QUEUE_MAX_RETRIES=3
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8091/health')"]
      interval: 15s
      timeout: 5s
      retries: 3

  dashboard:
    build:
      context: .
      dockerfile: Dockerfile.dashboard
    restart: unless-stopped
    networks:
      - gpu-router-net
    expose:
      - "3001"
    depends_on:
      queue-service:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3001/')"]
      interval: 15s
      timeout: 5s
      retries: 3

  gpu-dashboard:
    build:
      context: .
      dockerfile: Dockerfile.gpu
    restart: unless-stopped
    networks:
      - gpu-router-net
    expose:
      - "8092"
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8092/gpu.html')"]
      interval: 15s
      timeout: 5s
      retries: 3

networks:
  gpu-router-net:
    driver: bridge

volumes:
  redis-data:
```

**Key changes:**

- `dockerfile: queue-service/Dockerfile` (QW-1 fix)
- `restart: unless-stopped` (QW-8 fix)
- `expose` instead of `ports` for internal services (QW-18: queue API no longer externally exposed)
- `condition: service_healthy` for `depends_on` (QW-7: health checks)

---

## 4. Nginx Config Updates (gpu-router-docker.conf)

```nginx
## Syslog GPU Router: Nginx Configuration (Docker-internal)
## Routes incoming agent requests to the appropriate GPU backend
## based on the X-Syslog-Model header.
upstream amdpve_pool {
    server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
}

upstream llmgpu_pool {
    server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
}

upstream ocu_llm_pool {
    server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
}

upstream queue_service {
    server queue-service:8091;
}

upstream dashboard_service {
    server dashboard:3001;
}

upstream gpu_dashboard_pool {
    server gpu-dashboard:8092;
}

## All-GPUs pool for fallback (fails only when ALL GPUs are down)
upstream all_gpu_pool {
    server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
    server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
    server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
}

map $http_x_syslog_model $gpu_upstream {
    default        amdpve_pool;
    "standard"     amdpve_pool;
    "heavy"        llmgpu_pool;
    "qwen3.5-27B"  llmgpu_pool;
    "light"        ocu_llm_pool;
    "gemma-4"      ocu_llm_pool;
}

limit_req_zone $binary_remote_addr zone=perip:10m rate=10r/s;

server {
    listen 80;
    server_name _;

    ## Allow the two-step fallback below (@all_gpu_fallback, then @queue_fallback)
    recursive_error_pages on;

    location /dashboard {
        proxy_pass http://dashboard_service/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    location /gpu {
        proxy_pass http://gpu_dashboard_pool/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location / {
        limit_req zone=perip burst=20 nodelay;
        limit_req_status 503;

        proxy_pass http://$gpu_upstream;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        ## Streaming support
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 300s;
        proxy_send_timeout 300s;

        ## Single try against the model's own pool (no self-retry)
        proxy_next_upstream error timeout http_502 http_503 http_504;
        proxy_next_upstream_tries 1;

        add_header X-Routed-To $gpu_upstream always;

        ## On GPU failure (502 unreachable, 504 timeout), spill over to the other GPUs.
        ## 503 is excluded so limit_req rejections are not rerouted.
        error_page 502 504 = @all_gpu_fallback;
    }

    ## Spill-over to any GPU that is still up
    location @all_gpu_fallback {
        proxy_pass http://all_gpu_pool;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        proxy_buffering off;
        proxy_read_timeout 300s;
        proxy_send_timeout 300s;

        proxy_next_upstream error timeout http_502 http_503 http_504;
        proxy_next_upstream_tries 3;

        ## Only fall back to the queue when every GPU in all_gpu_pool failed
        error_page 502 504 = @queue_fallback;
    }

    ## Queue fallback: only reached when ALL GPUs are down
    location @queue_fallback {
        ## proxy_pass may not carry a URI inside a named location, so rewrite first
        rewrite ^ /enqueue break;
        proxy_pass http://queue_service;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header Content-Type $content_type;
        proxy_pass_request_body on;
        proxy_set_header X-Syslog-Model $http_x_syslog_model;
    }
}
```

**Key changes:**

- `max_fails=3 fail_timeout=30s` on each upstream (Nginx-level health)
- `all_gpu_pool` upstream, tried via `@all_gpu_fallback` before the queue (N3 fix)
- `proxy_next_upstream_tries 1` (N2 fix: no self-retry)
- Container names fixed to service names (N5 fix)
- `error_page 502 504` chains `@all_gpu_fallback` → `@queue_fallback`, enabled by `recursive_error_pages on`; 503 is excluded so rate-limited requests are not rerouted (N3 fix)
- `@queue_fallback` uses `rewrite` plus a URI-less `proxy_pass`, because Nginx rejects a URI part in `proxy_pass` inside a named location
- `X-Syslog-Model` forwarded to queue_service in fallback

---

## 5. GPU Dashboard Updates

### 5.1 Updated `Dockerfile.gpu`

```dockerfile
FROM python:3.11-slim

RUN pip install --no-cache-dir psutil==6.1.1

COPY gpu-dashboard/ /app/
WORKDIR /app

RUN mkdir -p /app/public && \
    cp gpu.html /app/public/ && \
    touch /app/public/gpu_metrics.json

EXPOSE 8092

# Single process: gpu_collector.py serves as both collector and HTTP server
CMD ["python3", "gpu_collector.py"]
```

### 5.2 Updated `gpu_collector.py` (key changes)

```python
# Changes:
# 1. Add concurrent GPU polling (threading)
# 2. Fix path consistency (use /app/public consistently)
# 3. Add graceful shutdown
# 4. Include health endpoint for Docker healthcheck

import threading
import http.server
import json
import time
import signal
import sys
import urllib.request
# ... (keep existing imports)

DEAD_THRESHOLD = 30  # seconds (was 60; G4 fix)
OUTPUT_PATH = "/app/public/gpu_metrics.json"  # Consistent path (G2 fix)


def fetch_json_concurrent(endpoints):
    """Fetch from all GPUs concurrently using threads.

    Returns (results, errors), both keyed by GPU name.
    """
    results = {}
    errors = {}

    def fetch_one(name, url):
        try:
            req = urllib.request.Request(url)
            req.add_header("User-Agent", "gpu-collector/1.0")
            with urllib.request.urlopen(req, timeout=3) as resp:
                results[name] = json.loads(resp.read())
        except Exception as e:
            errors[name] = str(e)[:100]

    threads = []
    for name, url in endpoints.items():
        t = threading.Thread(target=fetch_one, args=(name, url), daemon=True)
        threads.append(t)
        t.start()
    for t in threads:
        t.join(timeout=5)
    return results, errors


# ... (update the collection loop to use fetch_json_concurrent)


# HTTP handler: health endpoint plus static files from /app/public
class HealthHandler(http.server.SimpleHTTPRequestHandler):
    def __init__(self, *args, **kwargs):
        # Serve from the same directory the collector writes to (G2 fix)
        super().__init__(*args, directory="/app/public", **kwargs)

    def do_GET(self):
        route = self.path.split("?", 1)[0]
        if route == "/health":
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(json.dumps({"status": "ok"}).encode())
        elif route in ("/", "/gpu.html", "/gpu_metrics.json"):
            if route == "/":
                self.path = "/gpu.html"
            super().do_GET()
        else:
            self.send_response(404)
            self.end_headers()


# Run both in one process: the HTTP server in a background thread,
# the collector loop in the main thread.
```

---

## 6. Migration Steps

### Phase 1: Quick Wins (Deploy First)

1. Apply QW-1 through QW-10 (Dockerfile fix, Nginx fixes, health checks)
2. `docker compose up -d --build`
3. Verify all containers pass health checks
4. Verify Nginx fallback only triggers on ALL-GPU failure

### Phase 2: Smart Queue Consumer

1. Replace `queue-service/queue-service.py` with the new implementation
2. Replace `queue-service/Dockerfile` with the Gunicorn-based Dockerfile
3. Update `docker-compose.yml` (`expose` instead of `ports`, `depends_on` health conditions)
4. `docker compose up -d --build queue-service`
5. Verify:
   - `/health` returns 200
   - `/status` shows the consumer pool
   - A test request submitted via `/enqueue` is accepted
   - The job appears in `/status/` with "queued" status
   - The job gets processed and moves to "completed"
   - GPU sidecar metrics are being read
   - The consumer pool shows active workers

### Phase 3: Validation

1. Submit multiple concurrent requests
2. Verify load balancing across GPUs
3. Verify the circuit breaker opens on GPU failure
4. Verify the DLQ captures max-retry failures
5. Verify the `/dlq` and `/dlq//retry` endpoints work
6. Verify backpressure: queue depth > 40 returns 503 with retry-after
7. Monitor Redis stream lengths and consumer ack rates

---

## 7. Risk Assessment

| Risk | Mitigation |
|---|---|
| Redis stream `XREADGROUP` blocks indefinitely | `block=2000` (milliseconds) prevents infinite blocking |
| Consumer crashes mid-request | `PENDING_KEY` tracks in-flight jobs; restart re-reads pending |
| GPU sidecar temporarily unavailable | 3s timeout on sidecar calls; GPU marked "down" if unreachable |
| Circuit breaker oscillates (flapping) | `recovery_timeout=30s` prevents rapid OPEN → HALF_OPEN → OPEN cycling |
| DLQ grows unbounded | Stream `maxlen=10000` with approximate trimming; manual DLQ cleanup recommended |
| Consumer threads conflict with Flask | Gunicorn serves Flask; consumers run in separate daemon threads (under Gunicorn they must be started per worker, since `main()` is not called) |
| Job payload too large for Redis | Redis has a 512MB value limit; LLM responses are typically < 50KB |

---

## 8. Future Enhancements (Not in This Draft)

1. **Prometheus metrics endpoint**: queue depth, processing rate, GPU latency, error rates
2. **WebSocket streaming**: real-time job status updates to the dashboard
3. **Batch processing**: group multiple small requests for throughput
4. **Model-specific queues**: separate streams per model for guaranteed SLAs
5. **Redis Sentinel/Cluster**: high availability for Redis
6. **Per-model rate limiting**: prevent one model from starving others
7. **GPU affinity for long-running jobs**: keep a job on the same GPU to avoid context switches
8. **Result caching**: cache responses for identical requests (hash the payload, return the cached result)

---

**END OF DRAFT**

This implementation turns the queue-service from passive storage into a production-ready smart queue consumer. The quick wins (QW-1 through QW-10) can be deployed independently and provide immediate value; the smart queue consumer builds on top of them.

**Recommended deployment order:**

1. Quick wins (QW-1 to QW-10): 1-2 hours
2. Smart queue consumer: 2-3 hours
3. Validation & testing: 1 hour
4. Monitor for 24 hours, then go live
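Step 2 of this order depends on the consumer threads actually starting. With the Dockerfile CMD in §3.4, Gunicorn imports `queue-service:app` and never calls `main()`, so no consumers run. One possible fix, sketched below under assumptions, is a Gunicorn config file that starts the consumers from the `post_worker_init` server hook. The file name `gunicorn.conf.py` (Gunicorn's default config path, relative to its working directory `/app`), the helper `start_consumer_threads`, and the `CONSUMERS_PER_WORKER` setting are hypothetical; the sketch also assumes `run_consumer(worker_name)` has the signature used in `main()`. The file would still need to be copied into the image.

```python
# gunicorn.conf.py (hypothetical; Gunicorn loads ./gunicorn.conf.py by default,
# which is /app/gunicorn.conf.py given WORKDIR /app in the queue-service image)
import importlib
import os
import threading

# Assumption: reuse QUEUE_CONSUMERS from docker-compose.yml as a per-worker count
CONSUMERS_PER_WORKER = int(os.environ.get("QUEUE_CONSUMERS", "3"))


def start_consumer_threads(target, count, prefix="worker"):
    """Start `count` daemon threads, each running target(name); return them."""
    threads = []
    for i in range(count):
        t = threading.Thread(target=target, args=(f"{prefix}-{i}",), daemon=True)
        t.start()
        threads.append(t)
    return threads


def post_worker_init(worker):
    """Gunicorn server hook: runs in each worker after it has loaded the app."""
    # The app module is already in sys.modules here, so this is a lookup rather
    # than a second import. Threads are created in the worker process, never in
    # the master, because threads do not survive fork().
    svc = importlib.import_module("queue-service")
    # Include the PID so consumer names stay unique across Gunicorn workers.
    start_consumer_threads(
        svc.run_consumer,
        CONSUMERS_PER_WORKER,
        prefix=f"pid{os.getpid()}-worker",
    )
```

With `--workers 2`, this yields two sets of consumers (6 threads at the default of 3), so `QUEUE_CONSUMERS` may need halving if the total matters. Daemon threads stop abruptly when a worker exits; in-flight jobs would then rely on the `PENDING_KEY` recovery path described in the risk table.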