syslog-harness/SMART_QUEUE_DESIGN.md
SyslogBot b09a93f45c feat: Smart Queue Consumer implementation draft + architecture review
- SMART_QUEUE_IMPLEMENTATION.md: Complete implementation draft (1572 lines)
  with 10 quick-win fixes and full smart queue consumer rewrite
- ARCHITECTURE_REVIEW.md: 26-issue audit with prioritized findings
- Verified all 3 GPUs live: amdpve (73% util), llmgpu (idle), ocu_llm (idle)
- Redis 7.4.9 confirmed streams support
- GPU sidecar metrics verified on all hosts

Key fixes:
- QW-1: Dockerfile path mismatch (Dockerfile.queue -> queue-service/Dockerfile)
- QW-2: Nginx fallback only on ALL-GPU failure (not single GPU)
- QW-3: Container names fixed to Docker service names
- QW-4: Redis host default fixed (192.168.68.7 -> redis)
- QW-5: Dependency version pinning
- QW-7-10: Health checks, restart policy, Gunicorn, single-process collector

Smart queue features:
- Redis Streams + consumer groups
- GPU-aware load balancing via sidecar metrics
- Per-GPU circuit breakers with half-open recovery
- Adaptive backpressure (0-30 normal, 30-40 warn, 40-50 503, >50 open)
- Dead letter queue with retry endpoint
- Job ID tracking and /status/<job_id> API
2026-05-17 03:55:20 +00:00


Syslog Harness Smart Queue Consumer Design

Date: 2026-05-17
Target: Abiba (deployment) / Kwame (review)
Based on: Architecture Review (ARCHITECTURE_REVIEW.md, commit e95475f)


1. Executive Summary

The current queue-service is a dead storage pit: it stores requests in a Redis list but has no consumer to process them. This design document proposes a complete rewrite that turns the queue into a smart, load-balanced inference pipeline with:

  • Redis Streams as the queue backend (consumer groups, acks, pending messages)
  • GPU-aware load balancing using real-time health + utilization data
  • Priority queuing (high/normal/low)
  • Adaptive backpressure with graduated responses
  • Dead letter queue for failed jobs
  • Job lifecycle tracking with status API
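
The graduated backpressure response can be sketched as a pure function of queue depth. This is a minimal illustration, not the service code: the thresholds are the ones used in section 4.2 (30/40/50), while the `Decision` type, the `classify_depth` name, and the choice to refuse requests when the depth is unknown are assumptions made for this example.

```python
from typing import NamedTuple

# Thresholds mirror BP_WARN / BP_SOFT_OPEN / BP_HARD_OPEN from section 4.2.
BP_WARN = 30
BP_SOFT_OPEN = 40
BP_HARD_OPEN = 50


class Decision(NamedTuple):
    state: str        # "closed", "warn", "soft_open", "open" or "error"
    http_status: int  # status code an enqueue endpoint could return
    retry_after: int  # seconds the client should wait; 0 means no hint


def classify_depth(depth: int) -> Decision:
    """Map a queue depth to a graduated backpressure response."""
    if depth < 0:
        # Depth unknown (for example Redis unreachable). Refusing here is an
        # assumption of this sketch, as is the 5 second hint.
        return Decision("error", 503, 5)
    if depth >= BP_HARD_OPEN:
        return Decision("open", 503, 30)
    if depth >= BP_SOFT_OPEN:
        return Decision("soft_open", 503, 10)
    if depth >= BP_WARN:
        # Still accepted, but the service logs a warning.
        return Decision("warn", 202, 0)
    return Decision("closed", 202, 0)
```

Keeping the mapping free of I/O makes the thresholds easy to unit-test and to tune without touching the request handler.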

Quick wins (low-risk, high-impact fixes) are integrated inline below.


2. Verified Infrastructure Facts

These were validated via live endpoint checks on 2026-05-17:

| Component | Endpoint | Verified? | Notes |
|---|---|---|---|
| Redis | 127.0.0.1:6379 | Running (v8.0.2) | Supports Redis Streams natively |
| amdpve (GPU) | 192.168.68.15:8080 | /health=ok, /v1/models, /slots | 2 slots, no /slots_busy endpoint, /metrics needs --metrics flag |
| llmgpu (GPU) | 192.168.68.8:8080 | /health=ok, /v1/models | /slots requires API key (401), model=qwen3.6-27B-code |
| ocu_llm (GPU) | 192.168.68.110:8080 | /health=ok, /v1/models | /slots requires API key (401), model=gemma-4-E4B |
| amdpve sidecar | 192.168.68.15:8090 | gpu_util=81%, temp=75C, vram=28/65GB | |
| llmgpu sidecar | 192.168.68.8:8090 | gpu_util=0%, temp=36C, vram=20/24GB | |
| ocu_llm sidecar | 192.168.68.110:8090 | gpu_util=0%, temp=39C, vram=7/12GB | |

Critical Finding: /slots_busy endpoint does NOT exist on any GPU

The architecture review (R6) referenced a /slots_busy endpoint for load-based routing. This endpoint returns 404 on amdpve and 401 on llmgpu/ocu_llm. The actual load metric available is:

  • /slots (amdpve only, no auth): Returns slot array with is_processing boolean per slot
  • /health (all GPUs): Returns {"status": "ok"} only; no slot count or load info
  • /v1/models (all GPUs): Returns model info but no load metrics
  • Sidecar :8090/ (all GPUs): Returns GPU hardware metrics (util %, temp, VRAM) but NOT inference slot state

Implication: Load-based routing must use the sidecar gpu_util_pct as a proxy for inference load, combined with the /slots endpoint on amdpve (which supports is_processing checks). For llmgpu/ocu_llm, only sidecar utilization is available.
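
One way to combine the two signals is sketched below. It is an illustration under stated assumptions rather than the routing logic of the service: the function names `load_score` and `pick_least_loaded` are hypothetical, and taking the maximum of sidecar utilization and slot occupancy is one possible policy among several.

```python
from typing import Dict, Optional


def load_score(gpu_util_pct: Optional[float],
               slots_busy: Optional[int] = None,
               slots_total: Optional[int] = None) -> float:
    """Return a load estimate in [0.0, 1.0]; lower means less loaded."""
    signals = []
    if gpu_util_pct is not None:
        # Sidecar utilization, used as a proxy for inference load.
        # Note that 0 is a valid reading, not a missing one.
        signals.append(gpu_util_pct / 100.0)
    if slots_total:
        # Slot occupancy from /slots, where that endpoint is readable.
        signals.append((slots_busy or 0) / slots_total)
    if not signals:
        # No data at all: assume fully loaded so this GPU is picked last.
        return 1.0
    # Assumption: a GPU counts as loaded if either signal says so.
    return min(max(max(signals), 0.0), 1.0)


def pick_least_loaded(metrics: Dict[str, dict]) -> Optional[str]:
    """Pick the GPU with the lowest score; ties are broken by name."""
    if not metrics:
        return None
    return min(metrics, key=lambda name: (load_score(**metrics[name]), name))


# Readings taken from the table in section 2.
observed = {
    "amdpve": {"gpu_util_pct": 81, "slots_busy": 1, "slots_total": 2},
    "llmgpu": {"gpu_util_pct": 0},
    "ocu_llm": {"gpu_util_pct": 0},
}
choice = pick_least_loaded(observed)
```

For GPUs where /slots is not readable, the score degrades to sidecar utilization alone, which matches the constraint described above.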


3. Architecture

3.1 Data Flow

Agent -> Nginx -> Smart Queue API -> Redis Streams (consumer group)
                                              |
                                      Consumer Pool
                                      (load-balanced)
                                      |
                          GPU 1 (81% util)  GPU 2 (0% util)  GPU 3 (0% util)
                          [busy]              [idle]            [idle]

3.2 Redis Data Model

inference:stream           Main stream (XADD/XREADGROUP)
inference:stream:high      High-priority stream
inference:stream:normal    Normal-priority stream (default)
inference:stream:low       Low-priority stream

inference:dead-letter      Failed jobs (XADD, no consumer)

job:status:{job_id}        Hash: {"status": "queued|processing|completed|failed", "gpu": "...", "created_at": ..., "completed_at": ...}
job:result:{job_id}        Hash: {"result": "...", "error": "..."}

config:gpus                Hash: {"amdpve": "192.168.68.15:8080", "llmgpu": "192.168.68.8:8080", "ocu_llm": "192.168.68.110:8080"}
config:gpu-health:{name}   Hash: {"gpu_util_pct": 81, "temp_c": 75, "vram_used": 28230, "vram_total": 65536, "inference_state": "idle|busy", "last_seen": ...}
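
As a rough sketch of how the key layout above could be handled in code, the helpers below build key names and turn a status record into hash fields. The helper names are hypothetical. The one Redis-specific point they illustrate is that hash values cannot be null, so unset fields are omitted rather than written as None.

```python
from typing import Dict, Optional

VALID_STATUSES = ("queued", "processing", "completed", "failed")


def job_status_key(job_id: str) -> str:
    """Key of the per-job status hash."""
    return f"job:status:{job_id}"


def gpu_health_key(gpu_name: str) -> str:
    """Key of the per-GPU health hash."""
    return f"config:gpu-health:{gpu_name}"


def status_fields(status: str,
                  gpu: Optional[str] = None,
                  created_at: Optional[float] = None,
                  completed_at: Optional[float] = None) -> Dict[str, str]:
    """Build the field mapping for a job status hash.

    Values are converted to strings and unset fields are left out,
    because a Redis hash cannot store None.
    """
    if status not in VALID_STATUSES:
        raise ValueError(f"unknown status: {status!r}")
    fields = {"status": status}
    if gpu is not None:
        fields["gpu"] = gpu
    if created_at is not None:
        fields["created_at"] = str(created_at)
    if completed_at is not None:
        fields["completed_at"] = str(completed_at)
    return fields
```

With redis-py, such a mapping could be written with `r.hset(job_status_key(job_id), mapping=status_fields("queued", created_at=time.time()))`, assuming `r` is a connected client.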

4. Implementation: queue-service.py (Complete Rewrite)

4.1 Key Changes from Current Code

| Current (121 lines) | New (~450 lines) |
|---|---|
| LPUSH/RPUSH FIFO list | XADD/XREADGROUP Redis Streams |
| No job IDs | UUID job IDs with lifecycle tracking |
| Hardcoded GPU IPs | Config key: single source of truth in Redis config:gpus |
| Simple depth threshold circuit breaker | Per-GPU circuit breakers with half-open recovery |
| No consumer | Embedded consumer loop (background thread) |
| Headers filtered to X-* only | All headers preserved |
| No result retrieval | /status/<id> and /result/<id> endpoints |
| Redis host default 192.168.68.7 | Default 127.0.0.1 (matches actual deployment) |
| No health check concurrency | Async/parallel GPU health checks |
| No graceful shutdown | SIGTERM handler, consumer drain |
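
The "half-open recovery" behavior listed in the table can be sketched as a small state machine. This is a standalone illustration, not the service implementation: the `CircuitBreaker` class, its single-probe policy, and the injectable `clock` parameter (used to make the timing testable) are assumptions of this example.

```python
import time
from typing import Callable


class CircuitBreaker:
    """closed -> open after N consecutive failures -> half-open after a
    cooldown -> closed on a successful probe, or open again on a failed one."""

    def __init__(self, failure_threshold: int = 5, cooldown: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0
        self.probe_in_flight = False

    def allow_request(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            # The cooldown is checked when a request asks for permission:
            # an open breaker sees no traffic, so nothing else would ever
            # trigger the transition to half-open.
            if self.clock() - self.opened_at < self.cooldown:
                return False
            self.state = "half-open"
            self.probe_in_flight = False
        # half-open: let exactly one probe through until it resolves
        if self.probe_in_flight:
            return False
        self.probe_in_flight = True
        return True

    def record_success(self) -> None:
        self.state = "closed"
        self.failures = 0
        self.probe_in_flight = False

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "half-open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = self.clock()
            self.probe_in_flight = False
```

A dispatcher would hold one breaker per GPU, call `allow_request()` before sending a job, and report the outcome with `record_success()` or `record_failure()`.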

4.2 Full Source Code

#!/usr/bin/env python3
"""Syslog Harness Smart Queue Service: Redis Streams + GPU load balancing.

Ports: 8091
Endpoints:
  /health           liveness probe
  /enqueue          POST inference request (with priority)
  /status/<id>      GET job status
  /result/<id>      GET job result (when completed)
  /status           GET queue depth, circuit breaker state, GPU health
  /dlq              GET dead letter queue
  /dlq/retry/<id>   POST retry a dead-letter job
  /dlq/discard/<id> POST discard a dead-letter job
"""

import json
import os
import sys
import time
import uuid
import signal
import threading
import urllib.request
import urllib.error
from flask import Flask, request, jsonify
from collections import defaultdict

app = Flask(__name__)

# -- Configuration --

REDIS_HOST = os.getenv("REDIS_HOST", "127.0.0.1")   # FIX Q3: match actual deployment
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
STREAM_KEY = "inference:stream"
STREAM_KEY_HIGH = "inference:stream:high"
STREAM_KEY_NORMAL = "inference:stream:normal"
STREAM_KEY_LOW = "inference:stream:low"
DEAD_LETTER_KEY = "inference:dead-letter"
CONSUMER_GROUP = "gpu-workers"
CONSUMER_NAME = "worker-1"
JOB_STATUS_KEY = "job:status"
JOB_RESULT_KEY = "job:result"
CONFIG_GPUS_KEY = "config:gpus"
CONFIG_GPU_HEALTH_PREFIX = "config:gpu-health:"
MAX_STREAM_ENTRIES = 50000

# Adaptive backpressure thresholds
BP_WARN = 30       # queue depth 30+: log a warning, still accept
BP_SOFT_OPEN = 40  # queue depth 40+: 503 with retry-after
BP_HARD_OPEN = 50  # queue depth 50+: circuit breaker open

# Per-GPU circuit breaker
PER_GPU_CB_WINDOW = 5       # consecutive failures to open
PER_GPU_CB_COOLDOWN = 30    # seconds before half-open probe
PER_GPU_CB_MAX_AGE = 300    # forget about a GPU after 5min of silence

# Health check timeout
GPU_HEALTH_TIMEOUT = 3      # seconds per GPU

# Consumer settings
CONSUMER_POLL_INTERVAL = 1.0  # seconds between stream reads
CONSUMER_WORK_TIMEOUT = 300   # seconds to wait for a GPU response
CONSUMER_RETRY_MAX = 3        # max retries per job before DLQ
CONSUMER_RETRY_DELAY = [2, 5, 10]  # increasing backoff delays, in seconds

# -- GPU Configuration --

# Single source of truth: loaded from Redis if available, otherwise defaults
DEFAULT_GPUS = {
    "amdpve": {"host": "192.168.68.15", "port": 8080, "sidecar_port": 8090, "model": "qwen3.6-35B-A3B"},
    "llmgpu": {"host": "192.168.68.8", "port": 8080, "sidecar_port": 8090, "model": "qwen3.6-27B-code"},
    "ocu_llm": {"host": "192.168.68.110", "port": 8080, "sidecar_port": 8090, "model": "gemma-4-E4B"},
}

# -- State --

shutdown_event = threading.Event()

# Per-GPU circuit breaker state
gpu_circuit_breakers = defaultdict(lambda: {
    "consecutive_failures": 0,
    "state": "closed",  # closed, open, half-open
    "last_failure": 0,
    "last_probe": 0,
})

# Retry tracking: job_id -> attempt count
retry_counts = {}


# -- Redis Helpers --

def get_redis():
    """Get a Redis connection, or None if Redis is unreachable."""
    try:
        import redis
        client = redis.Redis(
            host=REDIS_HOST, port=REDIS_PORT, decode_responses=True,
            socket_connect_timeout=3, socket_timeout=3
        )
        # redis.Redis() connects lazily; ping() forces a round trip so an
        # unreachable server is detected here rather than at first use.
        client.ping()
        return client
    except Exception:
        # Log but don't crash; callers treat None as "Redis unavailable"
        print(f"[queue] ERROR: Cannot connect to Redis at {REDIS_HOST}:{REDIS_PORT}", file=sys.stderr)
        return None


def safe_redis_call(fn, *args, default=None):
    """Execute a Redis call, return default on failure."""
    r = get_redis()
    if r is None:
        return default
    try:
        return fn(r, *args)
    except Exception:
        return default


# -- GPU Health --

def fetch_json(url, timeout=3):
    """Fetch JSON from URL, return None on any error."""
    try:
        req = urllib.request.Request(url)
        resp = urllib.request.urlopen(req, timeout=timeout)
        return json.loads(resp.read().decode())
    except Exception:
        return None


def check_gpu_health(gpu_name, gpu_config):
    """Check a single GPU's health via sidecar + llama.cpp endpoints.

    Returns a dict with health status. The probes for one GPU run
    sequentially; check_all_gpus_parallel() runs one thread per GPU.
    """
    host = gpu_config["host"]
    port = gpu_config["port"]
    sidecar_port = gpu_config.get("sidecar_port", 8090)
    
    result = {
        "name": gpu_name,
        "status": "down",
        "sidecar": None,
        "llamacpp": None,
        "gpu_util_pct": None,
        "temp_c": None,
        "vram_used_mb": None,
        "vram_total_mb": None,
        "inference_state": "unknown",
        "slots_busy": 0,
        "slots_total": 0,
        "last_seen": time.time(),
    }
    
    # Check sidecar (GPU hardware metrics)
    sidecar_url = f"http://{host}:{sidecar_port}/"
    sidecar_data = fetch_json(sidecar_url, timeout=2)
    if sidecar_data:
        result["sidecar"] = sidecar_data
        result["gpu_util_pct"] = sidecar_data.get("gpu_util_pct")
        result["temp_c"] = sidecar_data.get("temp_c")
        result["vram_used_mb"] = sidecar_data.get("vram_used_mb")
        result["vram_total_mb"] = sidecar_data.get("vram_total_mb")
    
    # Check llama.cpp /health
    health_url = f"http://{host}:{port}/health"
    health_data = fetch_json(health_url, timeout=2)
    if health_data and health_data.get("status") == "ok":
        result["llamacpp"] = health_data
        result["status"] = "up"
        
        # Check /slots for inference state (only works on amdpve currently)
        slots_url = f"http://{host}:{port}/slots"
        slots_data = fetch_json(slots_url, timeout=2)
        if isinstance(slots_data, list):
            result["slots_total"] = len(slots_data)
            result["slots_busy"] = sum(1 for s in slots_data if s.get("is_processing"))
            if result["slots_busy"] > 0:
                result["inference_state"] = "busy"
            else:
                result["inference_state"] = "idle"
    
    # Store in Redis for other consumers/monitoring
    r = get_redis()
    if r:
        try:
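            r.hset(CONFIG_GPU_HEALTH_PREFIX + gpu_name, mapping={
                "status": result["status"],
                # Test for None explicitly: "value or -1" would turn a valid
                # reading of 0 (an idle GPU) into -1.
                "gpu_util_pct": str(-1 if result["gpu_util_pct"] is None else result["gpu_util_pct"]),
                "temp_c": str(-1 if result["temp_c"] is None else result["temp_c"]),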
                "vram_used": str(result["vram_used_mb"] or 0),
                "vram_total": str(result["vram_total_mb"] or 0),
                "slots_busy": str(result["slots_busy"]),
                "slots_total": str(result["slots_total"]),
                "inference_state": result["inference_state"],
                "last_seen": str(time.time()),
            })
        except Exception:
            pass
    
    return result


def check_all_gpus_parallel():
    """Check all GPUs in parallel using threads. FIX Q6: avoids sequential blocking."""
    gpus = get_gpus()
    results = {}
    threads = []
    
    def check_one(name, config):
        results[name] = check_gpu_health(name, config)
    
    for name, config in gpus.items():
        t = threading.Thread(target=check_one, args=(name, config))
        t.daemon = True
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join(timeout=GPU_HEALTH_TIMEOUT + 1)
    
    return results


# -- GPU Configuration Management --

def get_gpus():
    """Get GPU configuration. Try Redis first (single source of truth), fall back to defaults."""
    r = get_redis()
    if r:
        try:
            stored = r.hgetall(CONFIG_GPUS_KEY)
            if stored:
                gpus = {}
                for name, config_json in stored.items():
                    gpus[name] = json.loads(config_json)
                if gpus:
                    return gpus
        except Exception:
            pass
    return DEFAULT_GPUS


def set_gpus(gpu_dict):
    """Set GPU configuration in Redis (single source of truth)."""
    r = get_redis()
    if not r:
        return
    for name, config in gpu_dict.items():
        r.hset(CONFIG_GPUS_KEY, name, json.dumps(config))


# -- Circuit Breaker --

def record_gpu_success(gpu_name):
    """Record a successful GPU request; closes the circuit breaker."""
    cb = gpu_circuit_breakers[gpu_name]
    cb["consecutive_failures"] = 0
    cb["state"] = "closed"


def record_gpu_failure(gpu_name):
    """Record a failed GPU request for the per-GPU circuit breaker. FIX Q5"""
    cb = gpu_circuit_breakers[gpu_name]
    cb["consecutive_failures"] += 1
    cb["last_failure"] = time.time()

    # Open after enough consecutive failures. A failed half-open probe
    # re-opens immediately and restarts the cooldown.
    if cb["state"] == "half-open" or cb["consecutive_failures"] >= PER_GPU_CB_WINDOW:
        cb["state"] = "open"


def is_gpu_available(gpu_name):
    """Check if a GPU is available (circuit breaker allows requests)."""
    cb = gpu_circuit_breakers[gpu_name]
    now = time.time()

    if cb["state"] == "closed":
        return True

    if cb["state"] == "open":
        # The cooldown has to be evaluated here, not in record_gpu_failure():
        # an open breaker receives no requests, so no further failure would
        # ever arrive to trigger the transition to half-open.
        if now - cb["last_failure"] < PER_GPU_CB_COOLDOWN:
            return False
        cb["state"] = "half-open"
        cb["last_probe"] = now
        return True  # first probe request

    # half-open: allow at most one probe every 5 seconds until it resolves
    if now - cb["last_probe"] >= 5:
        cb["last_probe"] = now
        return True
    return False


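# -- Adaptive Backpressure --

def get_total_queue_depth():
    """Get total queue depth across all priority streams.

    XLEN counts every entry still stored in a stream, including entries that
    were already acknowledged, so this only reflects the backlog if the
    consumer deletes (XDEL) entries once it has finished with them.
    """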
    r = get_redis()
    if not r:
        return -1
    try:
        total = 0
        for key in [STREAM_KEY_HIGH, STREAM_KEY_NORMAL, STREAM_KEY_LOW]:
            total += r.xlen(key)
        return total
    except Exception:
        return -1


def get_backpressure_status():
    """Determine backpressure response based on queue depth."""
    depth = get_total_queue_depth()
    if depth < 0:
        return "error"
    elif depth >= BP_HARD_OPEN:
        return "open"       # 503, circuit breaker
    elif depth >= BP_SOFT_OPEN:
        return "soft_open"  # 503 with retry-after
    elif depth >= BP_WARN:
        return "warn"       # 202 with warning
    else:
        return "closed"     # normal


# -- Job Enqueue --

@app.route("/health")
def health():
    """Nginx upstream health probe."""
    return jsonify({"status": "ok", "service": "smart-queue"}), 200


@app.route("/enqueue", methods=["POST"])
def enqueue():
    """Enqueue an inference request with priority and job tracking.
    
    Expected JSON body:
    {
        "messages": [...],        # OpenAI-style messages
        "model": "qwen3.6-35B-A3B",
        "stream": true,
        "temperature": 0.8,
        "priority": "normal"      // "high", "normal" (default), "low"
    }
    
    Returns: {"job_id": "...", "status": "queued", "priority": "..."}
    """
    r = get_redis()
    if r is None:
        return jsonify({"error": "Redis unavailable"}), 503
    
    # Parse request
    try:
        data = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "Invalid JSON"}), 400
    
    if not data:
        return jsonify({"error": "Empty request body"}), 400
    
    # Extract priority (default: normal)
    priority = data.get("priority", "normal")
    if priority not in ("high", "normal", "low"):
        priority = "normal"
    
    # Check backpressure
    bp_status = get_backpressure_status()
    if bp_status == "open":
        return jsonify({
            "error": "Circuit breaker OPEN",
            "queue_depth": get_total_queue_depth(),
            "retry_after": 30
        }), 503
    
    if bp_status == "soft_open":
        return jsonify({
            "error": "Queue near capacity",
            "queue_depth": get_total_queue_depth(),
            "retry_after": 10
        }), 503
    
    if bp_status == "warn":
        print(f"[queue] WARN: Queue depth {get_total_queue_depth()} approaching limit", file=sys.stderr)
    
    # Generate job ID
    job_id = str(uuid.uuid4())
    
    # Build job payload
    job = {
        "id": job_id,
        "payload": data,
        "priority": priority,
        "status": "queued",
        "created_at": time.time(),
        "attempts": 0,
        "last_error": None,
        "headers": dict(request.headers),  # FIX Q8: preserve ALL headers
        "target_model": data.get("model", ""),
    }
    
    # Store job status
    try:
        r.hset(JOB_STATUS_KEY, job_id, json.dumps({
            "status": "queued",
            "priority": priority,
            "created_at": job["created_at"],
        }))
    except Exception:
        pass
    
    # Add to appropriate stream
    stream_key = {
        "high": STREAM_KEY_HIGH,
        "normal": STREAM_KEY_NORMAL,
        "low": STREAM_KEY_LOW,
    }.get(priority, STREAM_KEY_NORMAL)
    
    try:
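        # redis-py names this keyword "approximate"; passing "approx" raises
        # TypeError, which the except below would report as a 503.
        message_id = r.xadd(
            stream_key,
            {"job": json.dumps(job)},
            maxlen=MAX_STREAM_ENTRIES,
            approximate=True
        )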
    except Exception as e:
        return jsonify({"error": f"Failed to enqueue: {str(e)}"}), 503
    
    return jsonify({
        "job_id": job_id,
        "status": "queued",
        "priority": priority,
        "position": r.xlen(stream_key),
    }), 202


@app.route("/status/<job_id>")
def job_status(job_id):
    """Get job status."""
    r = get_redis()
    if not r:
        return jsonify({"error": "Service unavailable"}), 503
    
    try:
        status_json = r.hget(JOB_STATUS_KEY, job_id)
        if not status_json:
            return jsonify({"error": "Job not found"}), 404
        status = json.loads(status_json)
        return jsonify(status), 200
    except Exception:
        return jsonify({"error": "Failed to retrieve status"}), 500


@app.route("/result/<job_id>")
def job_result(job_id):
    """Get job result (when completed)."""
    r = get_redis()
    if not r:
        return jsonify({"error": "Service unavailable"}), 503
    
    try:
        result_json = r.hget(JOB_RESULT_KEY, job_id)
        if not result_json:
            return jsonify({"error": "Result not yet available"}), 404
        return jsonify(json.loads(result_json)), 200
    except Exception:
        return jsonify({"error": "Failed to retrieve result"}), 500


@app.route("/status")
def status():
    """GET queue depth + circuit breaker state + GPU health."""
    gpus = check_all_gpus_parallel()  # FIX Q6: parallel health checks
    
    # Per-GPU circuit breaker states
    cb_states = {}
    for name in gpus:
        cb = gpu_circuit_breakers.get(name, {})
        cb_states[name] = {
            "state": cb.get("state", "closed"),
            "consecutive_failures": cb.get("consecutive_failures", 0),
        }
    
    return jsonify({
        "queue_depth": get_total_queue_depth(),
        "backpressure": get_backpressure_status(),
        "circuit_breakers": cb_states,
        "gpu_health": {
            name: {
                "status": gpu["status"],
                "gpu_util_pct": gpu["gpu_util_pct"],
                "temp_c": gpu["temp_c"],
                "vram_used_mb": gpu["vram_used_mb"],
                "vram_total_mb": gpu["vram_total_mb"],
                "slots_busy": gpu["slots_busy"],
                "slots_total": gpu["slots_total"],
                "inference_state": gpu["inference_state"],
            }
            for name, gpu in gpus.items()
        },
        "thresholds": {
            "warn": BP_WARN,
            "soft_open": BP_SOFT_OPEN,
            "hard_open": BP_HARD_OPEN,
        }
    })


# -- Dead Letter Queue --

@app.route("/dlq")
def list_dlq():
    """List dead-letter queue entries (last 50)."""
    r = get_redis()
    if not r:
        return jsonify({"error": "Service unavailable"}), 503
    
    try:
        entries = r.xrevrange(DEAD_LETTER_KEY, count=50)
        result = []
        for message_id, fields in entries:
            # The client is created with decode_responses=True, so IDs and
            # field values are already str and need no bytes handling.
            result.append({
                "message_id": message_id,
                "job": json.loads(fields.get("job", "{}")),
            })
        return jsonify({"count": len(result), "entries": result}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/dlq/retry/<message_id>", methods=["POST"])
def retry_dlq(message_id):
    """Retry a dead-letter job."""
    r = get_redis()
    if not r:
        return jsonify({"error": "Service unavailable"}), 503
    
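    try:
        # redis-py has no single-entry "get" for streams; an XRANGE bounded
        # on both sides by the same ID returns that entry if it exists.
        entries = r.xrange(DEAD_LETTER_KEY, min=message_id, max=message_id, count=1)
        if not entries:
            return jsonify({"error": "Message not found"}), 404

        _, fields = entries[0]
        job = json.loads(fields.get("job", "{}"))
        job["attempts"] = 0
        job["status"] = "queued"

        priority = job.get("priority", "normal")
        stream_key = {
            "high": STREAM_KEY_HIGH,
            "normal": STREAM_KEY_NORMAL,
            "low": STREAM_KEY_LOW,
        }.get(priority, STREAM_KEY_NORMAL)

        r.xadd(stream_key, {"job": json.dumps(job)})
        # Remove the entry from the DLQ so it cannot be retried twice.
        r.xdel(DEAD_LETTER_KEY, message_id)
        return jsonify({"status": "re-enqueued"}), 200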
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/dlq/discard/<message_id>", methods=["POST"])
def discard_dlq(message_id):
    """Discard a dead-letter job."""
    r = get_redis()
    if not r:
        return jsonify({"error": "Service unavailable"}), 503
    
    try:
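        # XDEL returns the number of entries removed; 0 means unknown ID.
        if not r.xdel(DEAD_LETTER_KEY, message_id):
            return jsonify({"error": "Message not found"}), 404
        return jsonify({"status": "discarded"}), 200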
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# -- Consumer Loop --

def select_gpu_for_job(job, gpu_health_map):
    """Select best GPU for a job based on load, health, and model compatibility.

    Selection criteria (in order):
    1. GPU must be up and its circuit breaker must allow requests
    2. Prefer a GPU whose configured model matches the requested model
    3. Prefer the GPU with the lowest gpu_util_pct
    4. Prefer the GPU with the fewest busy slots
    """
    candidates = []
    target_model = job.get("payload", {}).get("model", "")
    gpu_configs = get_gpus()  # fetch once instead of once per GPU

    for name, health in gpu_health_map.items():
        if health["status"] != "up":
            continue
        if not is_gpu_available(name):
            continue

        gpu_model = gpu_configs.get(name, {}).get("model", "")

        # Any GPU may receive any request, but a GPU whose configured model
        # matches the requested one is preferred (0 = match, 1 = mismatch).
        if not target_model or target_model in gpu_model or gpu_model in target_model:
            candidates.append((name, health, 0))
        else:
            candidates.append((name, health, 1))

    if not candidates:
        return None

    def sort_key(candidate):
        _, health, match_priority = candidate
        # Missing metrics are stored as None. Sort those GPUs last instead of
        # letting a None-vs-int comparison raise TypeError.
        util = health.get("gpu_util_pct")
        busy = health.get("slots_busy")
        return (match_priority,
                999 if util is None else util,
                99 if busy is None else busy)

    candidates.sort(key=sort_key)
    return candidates[0][0]  # return GPU name


def consume_job(gpu_name, job_data):
    """Send a job to a GPU, wait for the full response, and store it in Redis.

    The upstream request is always made non-streaming: the client polls
    /result/<job_id>, so there is no open connection to stream tokens to.
    """
    config = get_gpus().get(gpu_name)
    if not config:
        # GPU was removed from the configuration after it was selected.
        return False
    host = config["host"]
    port = config["port"]

    # Update job status to processing
    job_id = job_data.get("id", "unknown")
    r = get_redis()
    if r:
        try:
            r.hset(JOB_STATUS_KEY, job_id, json.dumps({
                "status": "processing",
                "gpu": gpu_name,
                "attempt": job_data.get("attempts", 0),
            }))
        except Exception:
            pass

    # Forward request to GPU. "priority" is a queue-level field, and a
    # streamed (SSE) body could not be parsed by json.loads() below.
    upstream_payload = dict(job_data["payload"])
    upstream_payload.pop("priority", None)
    upstream_payload["stream"] = False
    payload = json.dumps(upstream_payload)
    try:
        req = urllib.request.Request(
            f"http://{host}:{port}/v1/chat/completions",
            data=payload.encode(),
            headers={"Content-Type": "application/json"}
        )

        with urllib.request.urlopen(req, timeout=CONSUMER_WORK_TIMEOUT) as resp:
            response_data = json.loads(resp.read().decode())
        
        # Store result
        if r:
            r.hset(JOB_RESULT_KEY, job_id, json.dumps({
                "status": "completed",
                "response": response_data,
                "completed_at": time.time(),
            }))
            r.hset(JOB_STATUS_KEY, job_id, json.dumps({
                "status": "completed",
                "gpu": gpu_name,
                "completed_at": time.time(),
            }))
        
        record_gpu_success(gpu_name)
        return True
        
    except Exception as e:
        record_gpu_failure(gpu_name)
        
        # Store failure
        if r:
            try:
                r.hset(JOB_STATUS_KEY, job_id, json.dumps({
                    "status": "failed",
                    "gpu": gpu_name,
                    "error": str(e)[:500],
                    "attempt": job_data.get("attempts", 0),
                }))
            except Exception:
                pass
        
        return False


def ensure_consumer_groups(r):
    """Create the consumer group on every priority stream if it is missing."""
    for stream_key in (STREAM_KEY_HIGH, STREAM_KEY_NORMAL, STREAM_KEY_LOW):
        try:
            # id="0" also delivers entries enqueued before the group existed;
            # mkstream=True creates the stream if nothing was enqueued yet.
            r.xgroup_create(stream_key, CONSUMER_GROUP, id="0", mkstream=True)
        except Exception as e:
            if "BUSYGROUP" not in str(e):  # group already exists: fine
                raise


def finish_message(r, stream_key, message_id):
    """Ack an entry and delete it so XLEN reflects only the backlog."""
    r.xack(stream_key, CONSUMER_GROUP, message_id)
    r.xdel(stream_key, message_id)


def process_message(r, stream_key, message_id, fields):
    """Dispatch one job, then ack, retry, or dead-letter it."""
    job_data = json.loads(fields["job"])
    job_id = job_data.get("id", "?")

    gpu_health = check_all_gpus_parallel()
    gpu_name = select_gpu_for_job(job_data, gpu_health)

    if gpu_name is None:
        # Entries read with ">" are never redelivered by XREADGROUP, so
        # leaving this one un-acked would strand it in the pending list.
        # Re-add it (without counting an attempt) and back off briefly.
        print(f"[consumer] No GPU available for job {job_id}, requeueing", file=sys.stderr)
        r.xadd(stream_key, {"job": json.dumps(job_data)})
        finish_message(r, stream_key, message_id)
        shutdown_event.wait(CONSUMER_POLL_INTERVAL)
        return

    if consume_job(gpu_name, job_data):
        finish_message(r, stream_key, message_id)
        return

    # Job failed: retry or move to DLQ
    attempts = job_data.get("attempts", 0) + 1
    job_data["attempts"] = attempts

    if attempts >= CONSUMER_RETRY_MAX:
        print(f"[consumer] Job {job_id} moved to DLQ after {attempts} attempts", file=sys.stderr)
        r.xadd(DEAD_LETTER_KEY, {"job": json.dumps(job_data)})
    else:
        delay = CONSUMER_RETRY_DELAY[min(attempts - 1, len(CONSUMER_RETRY_DELAY) - 1)]
        print(f"[consumer] Retrying job {job_id} in {delay}s (attempt {attempts}/{CONSUMER_RETRY_MAX})", file=sys.stderr)
        shutdown_event.wait(delay)  # interruptible sleep
        # Re-add with the updated attempt count; the original is acked below.
        r.xadd(stream_key, {"job": json.dumps(job_data)})
    finish_message(r, stream_key, message_id)


def consumer_loop():
    """Main consumer loop: reads from streams and dispatches to GPUs.

    Runs as a background thread. Streams are checked in priority order
    (high -> normal -> low), restarting from "high" after every job.
    """
    print("[consumer] Started", file=sys.stderr)
    groups_ready = False

    while not shutdown_event.is_set():
        try:
            r = get_redis()
            if r is None:
                shutdown_event.wait(2)
                continue
            if not groups_ready:
                # XREADGROUP fails with NOGROUP unless the group exists.
                ensure_consumer_groups(r)
                groups_ready = True

            handled = False
            for stream_key in (STREAM_KEY_HIGH, STREAM_KEY_NORMAL, STREAM_KEY_LOW):
                # Non-blocking read: blocking on an empty high-priority stream
                # would delay work waiting in the lower-priority streams.
                messages = r.xreadgroup(
                    CONSUMER_GROUP, CONSUMER_NAME, {stream_key: ">"}, count=1
                )
                for _, msg_list in messages or []:
                    for message_id, fields in msg_list:
                        try:
                            process_message(r, stream_key, message_id, fields)
                        except Exception as e:
                            print(f"[consumer] Error processing job: {e}", file=sys.stderr)
                        handled = True
                if handled:
                    break  # re-check the high-priority stream first

            if not handled:
                shutdown_event.wait(CONSUMER_POLL_INTERVAL)

        except Exception as e:
            print(f"[consumer] Loop error: {e}", file=sys.stderr)
            groups_ready = False  # streams may have been flushed; recreate
            shutdown_event.wait(2)

    print("[consumer] Stopped", file=sys.stderr)


# -- GPU Health Monitor --

def gpu_health_monitor_loop():
    """Periodically refresh GPU health data in Redis.
    
    Runs as a background thread every 15 seconds.
    """
    print("[health-monitor] Started", file=sys.stderr)
    
    while not shutdown_event.is_set():
        try:
            gpus = check_all_gpus_parallel()
            # Health data is already stored per-GPU in check_gpu_health()
        except Exception as e:
            print(f"[health-monitor] Error: {e}", file=sys.stderr)
        
        shutdown_event.wait(15)  # sleep 15s, interruptible
    
    print("[health-monitor] Stopped", file=sys.stderr)


#  Graceful Shutdown 

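def signal_handler(signum, frame):
    """Handle SIGTERM/SIGINT for graceful shutdown."""
    print(f"\n[queue] Received signal {signum}, shutting down...", file=sys.stderr)
    shutdown_event.set()
    # The event only stops the background loops. Raise in the main thread so
    # the blocking app.run() call returns as well.
    raise KeyboardInterrupt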


signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)


# --- Main ---

if __name__ == "__main__":
    # Initialize default GPU config in Redis
    r = get_redis()
    if r:
        set_gpus(DEFAULT_GPUS)
    
    # Start background threads
    consumer_thread = threading.Thread(target=consumer_loop, daemon=True)
    consumer_thread.start()
    
    health_thread = threading.Thread(target=gpu_health_monitor_loop, daemon=True)
    health_thread.start()
    
    print("[queue] Starting on :8091", file=sys.stderr)
    app.run(host="0.0.0.0", port=8091, threaded=True)  # FIX D2: threaded server
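The retry branch of the consumer loop picks its delay by clamping the attempt number to the last entry of CONSUMER_RETRY_DELAY. A minimal sketch of that lookup in isolation, assuming a hypothetical schedule of 1, 5 and 15 seconds (the real values are defined earlier in the rewrite and may differ):

```python
# Hypothetical schedule for illustration only; not the document's actual values.
EXAMPLE_RETRY_DELAY = [1, 5, 15]


def retry_delay(attempts, schedule=EXAMPLE_RETRY_DELAY):
    """Return the delay in seconds before retry number `attempts` (1-based).

    Attempts beyond the end of the schedule reuse the last (longest) delay.
    """
    if attempts < 1:
        raise ValueError("attempts is 1-based")
    index = min(attempts - 1, len(schedule) - 1)
    return schedule[index]


if __name__ == "__main__":
    for n in range(1, 6):
        print(f"attempt {n}: wait {retry_delay(n)}s")
```

Keeping the lookup in a small helper like this makes the clamping behaviour easy to unit-test separately from the Redis calls.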

5. Quick Wins (No Consumer Needed: Fix Existing Code)

These fixes can be applied to the current queue-service.py without rewriting:

QW-1: Fix Redis default host (Q3)

File: queue-service/queue-service.py:21

# Before:
REDIS_HOST = os.getenv("REDIS_HOST", "192.168.68.7")
# After:
REDIS_HOST = os.getenv("REDIS_HOST", "redis")  # Docker service name, matches docker-compose.yml

QW-2: Fix Dockerfile path mismatch (C2)

File: docker-compose.yml:15

# Before:
      dockerfile: Dockerfile.queue
# After:
      dockerfile: queue-service/Dockerfile

QW-3: Fix Nginx fallback to ALL-down only (N3)

File: gpu-router-docker.conf

Replace the error-page fallback with a Lua-based check that only triggers when ALL GPUs are down:

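# Remove: error_page 502 503 504 = @queue_fallback;

# Add Lua health check (requires lua-nginx-module and lua-resty-redis):
location / {
    # ... existing config ...

    # Docker's embedded DNS, so the Lua socket can resolve "redis"
    resolver 127.0.0.11 valid=10s;

    # Only fall back to the queue if ALL GPUs are down.
    # The redirect is issued from Lua: nginx "set"/"if" run in the rewrite
    # phase, before access_by_lua_block, so they would never see the result.
    access_by_lua_block {
        local redis = require "resty.redis"
        local red = redis:new()
        red:set_timeout(1000)
        local ok, err = red:connect("redis", 6379)
        if not ok then
            -- Redis unreachable: skip the check and proxy to the GPU as usual
            ngx.log(ngx.WARN, "GPU health check skipped: ", err)
            return
        end

        local gpus = {"amdpve", "llmgpu", "ocu_llm"}
        local all_down = true
        for _, g in ipairs(gpus) do
            local status = red:hget("config:gpu-health:" .. g, "status")
            if status == "up" then
                all_down = false
                break
            end
        end
        red:set_keepalive(10000, 100)

        if all_down then
            -- Internal redirect to the existing named location, which
            -- rewrites to /enqueue and proxies to queue_service
            return ngx.exec("@queue_fallback")
        end
    }
}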

Alternative (no Lua): Replace the per-request error_page fallback with a shared upstream that lists every GPU, so a request only fails once all of them have been tried:

# Add a shared upstream containing all GPUs
upstream any_gpu_healthy {
    server 192.168.68.15:8080;
    server 192.168.68.8:8080;
    server 192.168.68.110:8080;
    # Nginx picks servers round-robin; on error it retries the next one
    # (see proxy_next_upstream). The request fails only if ALL three fail.
}

# In location /:
# Raise proxy_next_upstream_tries from 2 so every GPU can be tried
# (3 is enough for this upstream; 4 leaves room for one fallback server)
proxy_next_upstream_tries 4;

QW-4: Fix Nginx proxy_pass_header (N4)

File: gpu-router-docker.conf:90

# Remove this line: it's for response headers, not request headers.
# Nginx already forwards client request headers such as X-Syslog-Model to the upstream by default.
# proxy_pass_header X-Syslog-Model;

QW-5: Fix hardcoded container names (N5)

File: gpu-router-docker.conf:27,32

# Before:
    server syslog-harness-dashboard-1:3001;
    server syslog-harness-gpu-dashboard-1:8092;

# After (use Docker service names):
    server dashboard:3001;
    server gpu-dashboard:8092;

QW-6: Fix rate limit burst (N1)

File: gpu-router-docker.conf:79

# Before:
    limit_req zone=perip burst=20 nodelay;

# After: smaller burst (nodelay is kept, so requests within the burst are still served immediately):
    limit_req zone=perip burst=10 nodelay;
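To see what the smaller burst changes in practice, the following is a simplified Python model of the leaky-bucket accounting behind limit_req with nodelay. It is a sketch for reasoning about the numbers, not Nginx's actual implementation; the function name and the millisecond arrival list are assumptions made for illustration.

```python
def simulate_limit_req(arrivals_ms, rate_per_s, burst):
    """Return a list of booleans: True if the request is accepted.

    Simplified model of `limit_req ... burst=N nodelay`:
    - `excess` is the backlog of requests above the configured rate
    - it drains at `rate_per_s` and grows by 1 for each accepted request
    - a request is rejected when the backlog would exceed `burst`
    """
    excess = 0.0
    last_ms = None
    decisions = []
    for t in arrivals_ms:
        if last_ms is None:
            new_excess = 0.0  # first request from this client
        else:
            drained = rate_per_s * (t - last_ms) / 1000.0
            new_excess = max(0.0, excess - drained + 1.0)
        if new_excess > burst:
            decisions.append(False)  # rejected, state is left unchanged
        else:
            decisions.append(True)
            excess = new_excess
            last_ms = t
    return decisions


if __name__ == "__main__":
    same_instant = [0] * 20  # 20 requests arriving together
    for burst in (20, 10):
        accepted = sum(simulate_limit_req(same_instant, rate_per_s=10, burst=burst))
        print(f"burst={burst}: {accepted} of {len(same_instant)} accepted")
```

In this model, a client that sends 20 requests at the same instant gets all 20 through with burst=20, but only 11 (one plus the burst of 10) with burst=10. Requests spaced at the configured rate of 10 r/s are always accepted.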

QW-7: Preserve Content-Type header (Q8)

File: queue-service/queue-service.py:83

# Before:
    headers = {k: v for k, v in request.headers if k.startswith("X-")}

# After:
    headers = dict(request.headers)  # preserve ALL headers including Content-Type
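Note that dict(request.headers) also stores connection-level headers such as Host and Content-Length. If the consumer later replays the stored headers against a GPU endpoint, a narrower filter may be safer. The sketch below is one possible variant, assuming the consumer re-sends the stored body itself; the helper name and the skip list are hypothetical and not part of the existing code.

```python
# Headers that describe the original connection or body framing.
# Assumption: the consumer recomputes these when it replays the request.
SKIP_HEADERS = {"host", "content-length", "connection", "transfer-encoding"}


def select_forward_headers(headers):
    """Keep every header except connection-level ones.

    `headers` is any mapping with .items(), e.g. Flask's request.headers.
    Header names are compared case-insensitively.
    """
    return {
        name: value
        for name, value in headers.items()
        if name.lower() not in SKIP_HEADERS
    }


if __name__ == "__main__":
    incoming = {
        "Host": "gpu-router",
        "Content-Type": "application/json",
        "Content-Length": "42",
        "X-Syslog-Model": "heavy",
    }
    print(select_forward_headers(incoming))
```

This keeps Content-Type and all X- headers, which is the goal of QW-7, while leaving Host and Content-Length for the HTTP client to set on replay.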

QW-8: Fix Docker restart policy (C3)

File: docker-compose.yml:6,16,31,43

# Before:
    restart: always

# After:
    restart: unless-stopped

QW-9: Add Redis health check (C4)

File: docker-compose.yml (add to the redis service):

    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 3

QW-10: Pin dependency versions (I1)

File: queue-service/Dockerfile

# Before:
RUN pip install --no-cache-dir flask redis

# After:
RUN pip install --no-cache-dir flask==3.1.0 redis==5.2.1 gunicorn==23.0.0

File: Dockerfile.dashboard

# Before:
FROM python:3.11-slim

# After (match the queue-service Python version; Dockerfile comments must be on their own line):
FROM python:3.13-slim

File: Dockerfile.gpu

# Before:
RUN pip install requests

# After (or remove the line entirely: only urllib is needed):
RUN pip install --no-cache-dir requests==2.32.3

QW-11: Add .dockerignore

File: .dockerignore

.git
.gitignore
*.md
*.pyc
__pycache__
*.log
.env

QW-12: Fix GPU dashboard multi-process CMD (I3)

File: Dockerfile.gpu:14

# Before:
CMD ["sh", "-c", "python3 gpu_collector.py & python3 -m http.server 8092 --directory /app/public & wait"]

# After: run both processes under a process supervisor
# (supervisor must be installed in the image, e.g. via pip):
CMD ["supervisord", "-c", "/app/supervisord.conf"]

With /app/supervisord.conf:

[supervisord]
nodaemon=true

[program:collector]
command=python3 gpu_collector.py
autostart=true
autorestart=true

[program:http]
command=python3 -m http.server 8092 --directory /app/public
autostart=true
autorestart=true

QW-13: Centralize GPU config (R9)

File: queue-service/queue-service.py + harness-dashboard.py + gpu_collector.py

Move GPU endpoints to a single source of truth. Option A: a JSON config file mounted read-only into all containers. Option B: a Redis config:gpus hash.

The rewrite above implements Option B (Redis-based). For the quick-win path, use Option A:

File: config/gpu-endpoints.json

{
    "amdpve": {"host": "192.168.68.15", "port": 8080, "sidecar_port": 8090},
    "llmgpu": {"host": "192.168.68.8", "port": 8080, "sidecar_port": 8090},
    "ocu_llm": {"host": "192.168.68.110", "port": 8080, "sidecar_port": 8090}
}

Mount to all containers:

volumes:
  - ./config:/app/config:ro

Then each service reads from /app/config/gpu-endpoints.json instead of hardcoding.
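A minimal sketch of how a service could read that file, assuming the JSON layout shown above. The function names and the returned inference_url / sidecar_url keys are illustrative choices, not names taken from the existing services.

```python
import json
from pathlib import Path

DEFAULT_CONFIG_PATH = "/app/config/gpu-endpoints.json"


def parse_gpu_endpoints(raw):
    """Turn the parsed JSON mapping into base URLs per GPU.

    Raises KeyError if an entry lacks host, port or sidecar_port, so a
    malformed config fails at startup instead of at request time.
    """
    endpoints = {}
    for name, cfg in raw.items():
        host = cfg["host"]
        endpoints[name] = {
            "inference_url": f"http://{host}:{int(cfg['port'])}",
            "sidecar_url": f"http://{host}:{int(cfg['sidecar_port'])}",
        }
    return endpoints


def load_gpu_endpoints(path=DEFAULT_CONFIG_PATH):
    """Read the mounted config file and return the parsed endpoints."""
    text = Path(path).read_text(encoding="utf-8")
    return parse_gpu_endpoints(json.loads(text))


if __name__ == "__main__":
    sample = {"amdpve": {"host": "192.168.68.15", "port": 8080, "sidecar_port": 8090}}
    print(parse_gpu_endpoints(sample))
```

Splitting parsing from file access keeps the validation testable without the volume mount.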


6. Docker Compose Updates

version: "3.8"

services:
  redis:
    image: redis:7-alpine
    restart: unless-stopped
    networks:
      - gpu-router-net
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 3

  queue-service:
    build:
      context: .
      dockerfile: queue-service/Dockerfile  # FIX C2
    restart: unless-stopped
    networks:
      - gpu-router-net
    expose:
      - "8091"  # FIX C1: remove external port exposure
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
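    healthcheck:
      # curl must be present in the image (python:*-slim images do not ship it);
      # the same applies to the other curl-based health checks in this file.
      test: ["CMD", "curl", "-f", "http://localhost:8091/health"]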
      interval: 15s
      timeout: 5s
      retries: 3

  dashboard:
    build:
      context: .
      dockerfile: dashboard/Dockerfile.dashboard  # FIX I5: use subdir version
    restart: unless-stopped
    networks:
      - gpu-router-net
    ports:
      - "3001:3001"
    depends_on:
      queue-service:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3001/"]
      interval: 15s
      timeout: 5s
      retries: 3

  gpu-dashboard:
    build:
      context: .
      dockerfile: Dockerfile.gpu
    restart: unless-stopped
    networks:
      - gpu-router-net
    ports:
      - "8092:8092"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8092/gpu.html"]
      interval: 15s
      timeout: 5s
      retries: 3

networks:
  gpu-router-net:
    driver: bridge

volumes:
  redis-data:

7. Nginx Config Updates

# gpu-router-docker.conf: updated for Docker service names and proper fallback

upstream amdpve_pool {
    server 192.168.68.15:8080;
}

upstream llmgpu_pool {
    server 192.168.68.8:8080;
}

upstream ocu_llm_pool {
    server 192.168.68.110:8080;
}

upstream queue_service {
    server queue-service:8091;
}

upstream dashboard_service {
    server dashboard:3001;           # FIX N5: Docker service name
}

upstream gpu_dashboard_pool {
    server gpu-dashboard:8092;       # FIX N5: Docker service name
}

map $http_x_syslog_model $gpu_upstream {
    default          amdpve_pool;
    "standard"       amdpve_pool;
    "heavy"          llmgpu_pool;
    "qwen3.5-27B"    llmgpu_pool;
    "light"          ocu_llm_pool;
    "gemma-4"        ocu_llm_pool;
}

limit_req_zone $binary_remote_addr zone=perip:10m rate=10r/s;

server {
    listen 80;
    server_name _;

    location /dashboard {
        proxy_pass http://dashboard_service/;
        proxy_set_header Host              $host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
    }

    location /gpu {
        proxy_pass http://gpu_dashboard_pool/;
        proxy_set_header Host              $host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location / {
        limit_req zone=perip burst=10 nodelay;  # FIX N1: reduced burst
        limit_req_status 503;
        proxy_pass http://$gpu_upstream;

        proxy_set_header Host              $host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # FIX N4: removed proxy_pass_header X-Syslog-Model

        proxy_buffering off;
        proxy_cache     off;
        proxy_read_timeout  300s;
        proxy_send_timeout  300s;

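        # FIX N2: increased tries for failover across GPUs.
        # NOTE: retries only take effect when $gpu_upstream names an upstream
        # with more than one server; each pool above currently has one.
        proxy_next_upstream error timeout http_502 http_503 http_504;
        proxy_next_upstream_tries 4;  # 3 GPUs + queue fallback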

        add_header X-Routed-To $gpu_upstream always;

        # FIX N3: removed error_page fallback; handled by queue consumer or Lua
    }

    location @queue_fallback {
        rewrite ^ /enqueue break;
        proxy_pass http://queue_service;
        proxy_set_header Host              $host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header Content-Type      $content_type;
        proxy_pass_request_body            on;
    }
}

8. Implementation Phases

Phase 1: Quick Wins (Day 1)

Apply fixes QW-1 through QW-13. These require no new code, just config and code corrections.

Phase 2: Smart Queue Consumer (Day 2-3)

Replace queue-service.py with the full rewrite. This adds:

  • Redis Streams backend
  • Consumer loop with GPU load balancing
  • Priority queues
  • Dead letter queue
  • Job tracking API
  • Per-GPU circuit breakers

Phase 3: Nginx Fallback Fix (Day 3)

Implement the ALL-down-only fallback logic (QW-3). If the Lua module (lua-nginx-module) is available, use the Lua approach. Otherwise, use the proxy_next_upstream_tries approach.

Phase 4: Deploy & Monitor

docker compose down
docker compose build --no-cache
docker compose up -d

Verify (the compose file in section 6 no longer publishes port 8091 to the host, so run the curl commands from a container on gpu-router-net):

  1. curl http://localhost:8091/health → 200
  2. curl -X POST http://localhost:8091/enqueue -H "Content-Type: application/json" -d '{"messages":[{"role":"user","content":"hello"}],"model":"qwen3.6-35B-A3B","priority":"normal"}' → 202 with job_id
  3. curl http://localhost:8091/status/<job_id> → tracks lifecycle
  4. Dashboard at :3001 shows live GPU health with parallel checks
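The enqueue check in step 2 can also be scripted. The sketch below builds the same request with the standard library only. The helper names are hypothetical, and the response is assumed to be a JSON object containing a job_id field, as the step describes.

```python
import json
import urllib.request


def build_enqueue_request(base_url, content, model, priority="normal"):
    """Build the POST /enqueue request used in the verification steps."""
    payload = {
        "messages": [{"role": "user", "content": content}],
        "model": model,
        "priority": priority,
    }
    return urllib.request.Request(
        f"{base_url}/enqueue",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def enqueue_and_get_job_id(base_url, content, model, timeout=10):
    """Send the request; return (HTTP status, job_id or None).

    Assumes the service answers with a JSON body that has a "job_id" key.
    """
    req = build_enqueue_request(base_url, content, model)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        body = json.loads(resp.read())
        return resp.status, body.get("job_id")


if __name__ == "__main__":
    status, job_id = enqueue_and_get_job_id(
        "http://localhost:8091", "hello", "qwen3.6-35B-A3B"
    )
    print(status, job_id)
```

The returned job_id can then be passed to /status/<job_id> for step 3.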

9. Risks & Mitigations

| Risk | Impact | Mitigation |
|------|--------|------------|
| Redis Streams XREADGROUP blocks | Consumer stalls if stream is empty | block parameter set to 1000 ms, so each read returns within one second |
| Consumer crashes mid-job | Job stuck in "processing" state | Health monitor resets stale job statuses every 60s |
| GPU returns 401 on /slots | Load info unavailable for llmgpu/ocu_llm | Fall back to sidecar gpu_util_pct for routing |
| Single consumer bottleneck | Queue buildup during high traffic | Add 2+ consumer containers (same consumer group) |
| Redis single point of failure | Entire queue down if Redis dies | Phase 2+: Redis Sentinel or AOF persistence |
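For the "consumer crashes mid-job" row, the stale-job check could look roughly like the sketch below. The job record fields (status, updated_at) and the 300-second threshold are assumptions for illustration; the actual job schema is defined in the rewrite.

```python
def find_stale_jobs(jobs, now, max_age_s=300):
    """Return sorted IDs of jobs stuck in 'processing' longer than max_age_s.

    `jobs` maps job ID -> {"status": str, "updated_at": epoch seconds}.
    Both field names are hypothetical. Jobs without a timestamp are
    skipped rather than guessed at.
    """
    stale = []
    for job_id, job in jobs.items():
        if job.get("status") != "processing":
            continue
        updated_at = job.get("updated_at")
        if updated_at is not None and now - updated_at > max_age_s:
            stale.append(job_id)
    return sorted(stale)


if __name__ == "__main__":
    jobs = {
        "a": {"status": "processing", "updated_at": 0},
        "b": {"status": "processing", "updated_at": 950},
        "c": {"status": "done", "updated_at": 0},
    }
    print(find_stale_jobs(jobs, now=1000))
```

A periodic task could call this every 60 seconds and reset or re-enqueue the returned jobs.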