syslog-harness/SMART_QUEUE_IMPLEMENTATION.md
SyslogBot b09a93f45c feat: Smart Queue Consumer implementation draft + architecture review
- SMART_QUEUE_IMPLEMENTATION.md: Complete implementation draft (1572 lines)
  with 10 quick-win fixes and full smart queue consumer rewrite
- ARCHITECTURE_REVIEW.md: 26-issue audit with prioritized findings
- Verified all 3 GPUs live: amdpve (73% util), llmgpu (idle), ocu_llm (idle)
- Redis 7.4.9 confirmed streams support
- GPU sidecar metrics verified on all hosts

Key fixes:
- QW-1: Dockerfile path mismatch (Dockerfile.queue -> queue-service/Dockerfile)
- QW-2: Nginx fallback only on ALL-GPU failure (not single GPU)
- QW-3: Container names fixed to Docker service names
- QW-4: Redis host default fixed (192.168.68.7 -> redis)
- QW-5: Dependency version pinning
- QW-7-10: Health checks, restart policy, Gunicorn, single-process collector

Smart queue features:
- Redis Streams + consumer groups
- GPU-aware load balancing via sidecar metrics
- Per-GPU circuit breakers with half-open recovery
- Adaptive backpressure (0-30 normal, 30-40 warn, 40-50 503, >50 open)
- Dead letter queue with retry endpoint
- Job ID tracking and /status/<job_id> API
2026-05-17 03:55:20 +00:00


Syslog Harness Smart Queue Consumer Implementation

Date: 2026-05-17
Author: Mumuni (review draft for Abiba)
Repo: SyslogSolution/syslog-harness
Base commit: e95475f "Add GPU dashboard container + Nginx routing"


1. Executive Summary

The current queue-service stores inference requests in Redis but never processes them. This document provides a complete implementation of a smart queue consumer with GPU-aware load balancing, priority queuing, backpressure, dead letter handling, and job lifecycle tracking, plus all quick-win fixes from the architecture review.

Current state (verified 2026-05-17):

  • Queue depth: 0 (services not currently running)
  • Redis: 7.4.9 (streams fully supported)
  • All 3 GPUs reachable and healthy:
    • amdpve (192.168.68.15:8080): qwen3.6-35B-A3B, GPU util 93%, VRAM 28GB/65GB, temp 72C
    • llmgpu (192.168.68.8:8080): qwen3.6-27B-code, GPU util 0%, VRAM 20GB/24GB, temp 37C
    • ocu_llm (192.168.68.110:8080): gemma-4-E4B, GPU util 0%, VRAM 8GB/12GB, temp 41C
  • GPU sidecar metrics available at :8090 on each host (gpu_util_pct, vram_used_mb, temp_c)
  • GPU inference endpoints: /v1/models (all return 200), /v1/chat/completions (all respond to POST)
  • No slots_busy endpoint is available, so load is estimated from the GPU sidecar metrics
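As a rough illustration of load estimation from sidecar metrics, the sketch below applies the weighting used by fetch_gpu_metrics in section 3.3 (60% GPU utilization, 30% VRAM pressure, 10% for each degree above 60C) to the readings listed above. The function name load_score and the snapshot dict are illustrative only and are not part of the service.

```python
def load_score(gpu_util_pct, vram_used_gb, vram_total_gb, temp_c):
    """Weighted load score; lower means more headroom for new jobs."""
    vram_pct = vram_used_gb / vram_total_gb * 100
    score = gpu_util_pct * 0.6 + vram_pct * 0.3 + max(0, temp_c - 60) * 0.1
    return round(score, 1)


# Readings from the "Current state" list: (util %, VRAM used GB, VRAM total GB, temp C)
snapshot = {
    "amdpve": (93, 28, 65, 72),
    "llmgpu": (0, 20, 24, 37),
    "ocu_llm": (0, 8, 12, 41),
}

scores = {name: load_score(*reading) for name, reading in snapshot.items()}
least_loaded = min(scores, key=scores.get)
print(scores, least_loaded)
```

With these readings the score is about 70 for amdpve, 25 for llmgpu, and 20 for ocu_llm, so a purely load-based choice would pick ocu_llm.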

2. Quick Wins (Zero Architectural Change)

These fixes require minimal code changes and can be deployed independently of the smart queue.

QW-1: Fix Dockerfile Path Mismatch (docker-compose.yml line 15)

Current: dockerfile: Dockerfile.queue (file doesn't exist at root)
Fix: dockerfile: queue-service/Dockerfile

  queue-service:
    build:
      context: .
-     dockerfile: Dockerfile.queue
+     dockerfile: queue-service/Dockerfile

QW-2: Fix Nginx Fallback (gpu-router-docker.conf lines 99-106)

Problem: Nginx retries the same GPU pool (N2) AND falls back to queue on ANY single GPU failure (N3).

Fix: Add proxy_next_upstream_tries 1 (no self-retry), and only route to queue when ALL GPU pools fail via a dedicated health-check upstream.

 upstream amdpve_pool {
-    server 192.168.68.15:8080;
+    server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
 }

 upstream llmgpu_pool {
-    server 192.168.68.8:8080;
+    server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
 }

 upstream ocu_llm_pool {
-    server 192.168.68.110:8080;
+    server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
 }

 ## New: all-GPUs pool; a request to it fails only when ALL upstreams are down
+upstream all_gpu_pool {
+    server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
+    server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
+    server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
+}

 map $http_x_syslog_model $gpu_upstream {
     default          amdpve_pool;
     "standard"       amdpve_pool;
     "heavy"          llmgpu_pool;
     "qwen3.5-27B"    llmgpu_pool;
     "light"          ocu_llm_pool;
     "gemma-4"        ocu_llm_pool;
 }

 ## New: map to all-gpu pool for fallback
+map $http_x_syslog_model $fallback_upstream {
+    default          all_gpu_pool;
+    "standard"       all_gpu_pool;
+    "heavy"          all_gpu_pool;
+    "qwen3.5-27B"    all_gpu_pool;
+    "light"          all_gpu_pool;
+    "gemma-4"        all_gpu_pool;
+}

 server {
     location / {
         limit_req zone=perip burst=20 nodelay;
         limit_req_status 503;
         proxy_pass http://$gpu_upstream;
         ...
-        proxy_next_upstream error timeout http_502 http_503;
-        proxy_next_upstream_tries 2;
+        proxy_next_upstream error timeout http_502 http_503 http_504;
+        proxy_next_upstream_tries 1;

-        error_page 502 503 504 = @queue_fallback;
+        error_page 504 = @queue_fallback;
     }

     location @queue_fallback {
+        # Only reached when the all_gpu_pool proxy failed (all GPUs down)
         rewrite ^ /enqueue break;
         proxy_pass http://queue_service;
         ...
     }
 }

QW-3: Fix Nginx Container Names (gpu-router-docker.conf lines 27, 32)

Problem: Hardcoded syslog-harness-dashboard-1 changes with docker-compose project prefix.
Fix: Use Docker Compose service names.

 upstream dashboard_service {
-    server syslog-harness-dashboard-1:3001;
+    server dashboard:3001;
 }

 upstream gpu_dashboard_pool {
-    server syslog-harness-gpu-dashboard-1:8092;
+    server gpu-dashboard:8092;
 }

QW-4: Fix Redis Host Default (queue-service.py line 21)

Problem: Default 192.168.68.7 is unreachable inside Docker.
Fix: Default to redis (Docker service name).

-REDIS_HOST = os.getenv("REDIS_HOST", "192.168.68.7")
+REDIS_HOST = os.getenv("REDIS_HOST", "redis")

QW-5: Add Dependency Version Pinning (all Dockerfiles)

 # queue-service/Dockerfile
 FROM python:3.13-slim
-RUN pip install --no-cache-dir flask redis
+RUN pip install --no-cache-dir \
+    flask==3.1.0 \
+    redis==5.2.1 \
+    gunicorn==23.0.0

 # Dockerfile.dashboard
 FROM python:3.11-slim
+RUN pip install --no-cache-dir gunicorn==23.0.0

 # Dockerfile.gpu
 FROM python:3.11-slim
-RUN pip install requests
+RUN pip install --no-cache-dir \
+    requests==2.32.3 \
+    psutil==6.1.1

QW-6: Add .dockerignore

.git
.gitignore
*.md
*.pyc
__pycache__
.env
*.swp
.hermes/
syslog-harness-check/

QW-7: Add Docker Health Checks (docker-compose.yml)

  redis:
    image: redis:7-alpine
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

  queue-service:
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8091/health')"]
      interval: 15s
      timeout: 5s
      retries: 3

  dashboard:
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3001/')"]
      interval: 15s
      timeout: 5s
      retries: 3

QW-8: Use unless-stopped Instead of always

-    restart: always
+    restart: unless-stopped

Apply to all services in docker-compose.yml.

QW-9: Add Gunicorn to Queue Service (replace Flask dev server)

 # queue-service/Dockerfile
-CMD ["python3", "queue-service.py"]
+CMD ["gunicorn", "--bind", "0.0.0.0:8091", "--workers", "2", "--timeout", "120", "queue-service:app"]

QW-10: Fix GPU Dashboard Multi-Process CMD (Dockerfile.gpu line 14)

Problem: The CMD starts background processes with & and has no supervisor, so if the collector crashes, nothing restarts it.
Fix: Use a single process with signal handling, or use supervisord.

# Option A: Single process with threading (recommended for simplicity)
CMD ["python3", "gpu_collector.py"]

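# Option B: Use supervisord (more robust)
RUN pip install --no-cache-dir supervisor
# pip puts supervisord on PATH (/usr/local/bin in python:*-slim images), and -c
# must point at the file that was actually copied
COPY supervisord.conf /etc/supervisor/supervisord.conf
CMD ["supervisord", "-n", "-c", "/etc/supervisor/supervisord.conf"]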
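Option A can be sketched as one process that runs each task in a supervised thread and shuts down on SIGTERM. This is a minimal sketch, not the contents of gpu_collector.py: the names supervised, main, and steps are hypothetical, and the tasks passed in stand for whatever the collector and dashboard server actually do.

```python
import logging
import signal
import threading

logger = logging.getLogger("gpu-collector")
stop = threading.Event()


def supervised(name, step, interval_s):
    """Run step() repeatedly; log and keep going if it raises, exit on stop."""
    while not stop.is_set():
        try:
            step()
        except Exception:
            logger.exception("%s failed; retrying in %.1fs", name, interval_s)
        stop.wait(interval_s)  # sleeps, but wakes immediately once stop is set


def main(steps, interval_s=5.0):
    """steps: dict of name -> zero-argument callable (hypothetical tasks)."""
    # Signal handlers can only be installed from the main thread.
    if threading.current_thread() is threading.main_thread():
        signal.signal(signal.SIGTERM, lambda signum, frame: stop.set())
        signal.signal(signal.SIGINT, lambda signum, frame: stop.set())
    threads = [
        threading.Thread(target=supervised, args=(name, step, interval_s), daemon=True)
        for name, step in steps.items()
    ]
    for t in threads:
        t.start()
    stop.wait()  # block until a signal (or another thread) requests shutdown
    for t in threads:
        t.join(timeout=interval_s + 1)
```

A crash inside one task is logged and retried on the next interval instead of silently ending the process, which is the failure mode the & approach leaves uncovered.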

3. Smart Queue Consumer: Full Implementation

3.1 Architecture

                              
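  Agent ---> Nginx ---> Queue
  (POST /v1/...)          |
                          v
  +----------------------------------------------------+
  |                Smart Queue Service                 |
  |                (queue-service.py)                  |
  |                                                    |
  |  +--------------+      +----------------------+    |
  |  | API Layer    |----->| Redis Streams        |    |
  |  | (Flask)      |      | (inference:stream)   |    |
  |  +--------------+      +----------------------+    |
  |         |                         |                |
  |  +--------------+      +----------------------+    |
  |  | Job Tracker  |      | Consumer Pool        |    |
  |  | (Redis Hash) |      | (asyncio workers)    |    |
  |  +--------------+      +----------------------+    |
  +----------------------------------------------------+
                          |
                          v
        +----------------------------------------+
        |          GPU Selection Logic           |
        |     (load-balanced by GPU metrics)     |
        +----------------------------------------+
             |              |               |
             v              v               v
        +----------+   +----------+   +-------------+
        | amdpve   |   | llmgpu   |   | ocu_llm     |
        | :8080    |   | :8080    |   | :8080       |
        | 35B-A3B  |   | 27B-code |   | gemma-4-E4B |
        +----------+   +----------+   +-------------+
                          |
                          v
                   +--------------+
                   | DLQ Stream   |
                   | (inference:  |
                   |  dead-letter)|
                   +--------------+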
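Before a request is written to the stream, the API layer applies depth-based backpressure using the 30/40/50 thresholds. The sketch below restates that decision as a pure function so the bands are easy to check. The function name admission and the returned tuple shape are illustrative assumptions; the status codes and retry_after arithmetic follow the /enqueue handler in section 3.3.

```python
QUEUE_WARN, QUEUE_503, QUEUE_OPEN = 30, 40, 50


def admission(depth):
    """Map queue depth to (http_status, band, retry_after_seconds)."""
    if depth >= QUEUE_OPEN:
        return 503, "open", 30
    if depth >= QUEUE_503:
        # Back off longer the further the queue is past the 503 threshold
        return 503, "overloaded", min(60, (depth - QUEUE_503) * 5)
    if depth >= QUEUE_WARN:
        return 202, "warn", None  # accepted, but a warning is logged
    return 202, "normal", None


for depth in (0, 30, 45, 50):
    print(depth, admission(depth))
```

Note that a depth of exactly 40 yields a retry_after of 0 under this arithmetic, which callers may want to treat as "retry immediately".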
                   

3.2 Data Model

# Redis Streams keys
STREAM_KEY = "inference:stream"        # Main queue
DLQ_KEY = "inference:dead-letter"      # Failed jobs
PENDING_KEY = "inference:pending"      # Jobs currently being processed

# Redis Hash keys
JOB_STATUS_KEY = "job:status"          # job_id -> {status, gpu, started_at, ...}
GPU_REGISTRY_KEY = "config:gpus"       # gpu_name -> {endpoint, model, ...}
CONSUMER_REGISTRY_KEY = "consumers"    # consumer_id -> {pid, started_at, last_heartbeat}

# Stream entry format (JSON)
# {
#   "job_id": "uuid",
#   "model": "standard|heavy|light",
#   "priority": "high|normal|low",
#   "payload": {...},                    # Original request body
#   "headers": {...},                    # Original request headers (including Content-Type)
#   "created_at": 1747478400.0,
#   "attempt": 0,
#   "max_retries": 3,
#   "source": "nginx|direct"
# }
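Redis stream entries are flat field/value pairs, so a nested job like the one above has to be serialized into a single field. The sketch below shows that round trip without touching Redis, using a field named "job" as enqueue_job does in section 3.3. The helper names encode_entry and decode_entry are hypothetical.

```python
import json
import time
import uuid


def encode_entry(payload, headers, model="standard", priority="normal",
                 source="nginx", max_retries=3):
    """Build a job dict and wrap it as a one-field stream entry."""
    job = {
        "job_id": str(uuid.uuid4()),
        "model": model,
        "priority": priority,
        "payload": payload,
        "headers": headers,
        "created_at": time.time(),
        "attempt": 0,
        "max_retries": max_retries,
        "source": source,
    }
    # Stream fields are flat strings, so the nested job goes into one JSON field.
    return {"job": json.dumps(job)}


def decode_entry(fields):
    """Inverse of encode_entry: recover the job dict from stream fields."""
    return json.loads(fields["job"])


fields = encode_entry({"messages": []}, {"Content-Type": "application/json"}, model="heavy")
job = decode_entry(fields)
print(job["model"], job["attempt"])
```

The dict returned by encode_entry is the shape that would be passed as the fields argument of XADD, and decode_entry is what a consumer would apply to each entry it reads.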

3.3 New queue-service.py (Complete Rewrite)

#!/usr/bin/env python3
"""Syslog Harness Smart Queue Service: Redis Streams + GPU-aware consumer pool.

Endpoints:
  /health               liveness (Nginx upstream check)
  /enqueue              POST: submit inference request (with optional priority)
  /status               GET: queue depth, circuit state, consumer pool
  /status/<job_id>      GET: specific job status and result
  /dlq                  GET: list dead-letter queue entries
  /dlq/<id>/retry       POST: retry a dead-letter job
  /consumers            GET: consumer pool status
  /gpus                 GET: GPU registry with live health
"""

import asyncio
import json
import logging
import os
import signal
import sys
import time
import uuid
from datetime import datetime, timezone

import redis
from flask import Flask, request, jsonify

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "")

# Queue thresholds (adaptive backpressure)
QUEUE_WARN = 30
QUEUE_503 = 40
QUEUE_OPEN = 50

# Consumer settings
CONSUMER_WORKERS = int(os.getenv("QUEUE_CONSUMERS", "3"))  # One per GPU
POLL_INTERVAL = float(os.getenv("QUEUE_POLL_INTERVAL", "2"))  # seconds
GPU_HEALTH_INTERVAL = 5  # seconds
MAX_RETRIES = int(os.getenv("QUEUE_MAX_RETRIES", "3"))
JOB_TIMEOUT = 300  # seconds (5 min, matches Nginx proxy_read_timeout)

# Redis stream max length (approximate trimming)
STREAM_MAXLEN = 10000

# GPU registry (single source of truth)
GPU_REGISTRY = {
    "amdpve": {
        "endpoint": "http://192.168.68.15:8080",
        "model": "qwen3.6-35B-A3B",
        "type": "MoE",
        "vram_total_mb": 65536,
        "priority": 1,  # Primary workhorse
    },
    "llmgpu": {
        "endpoint": "http://192.168.68.8:8080",
        "model": "qwen3.6-27B-code",
        "type": "Dense",
        "vram_total_mb": 24576,
        "priority": 2,
    },
    "ocu_llm": {
        "endpoint": "http://192.168.68.110:8080",
        "model": "gemma-4-E4B",
        "type": "Light",
        "vram_total_mb": 12227,
        "priority": 3,
    },
}

# Model-to-GPU mapping (mirrors Nginx map block)
MODEL_TO_GPU = {
    "default": "amdpve",
    "standard": "amdpve",
    "heavy": "llmgpu",
    "qwen3.5-27B": "llmgpu",
    "qwen3.6-27B": "llmgpu",
    "qwen3.6-27B-code": "llmgpu",
    "light": "ocu_llm",
    "gemma-4": "ocu_llm",
    "gemma-4-E4B": "ocu_llm",
}

# Priority ordering
PRIORITY_ORDER = {"high": 0, "normal": 1, "low": 2}

# Streams
STREAM_KEY = "inference:stream"
DLQ_KEY = "inference:dead-letter"
PENDING_KEY = "inference:pending"

# Redis keys
JOB_STATUS_KEY = "job:status"
CONSUMER_KEY = "consumers"

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    stream=sys.stdout,
)
logger = logging.getLogger("queue-service")

# ---------------------------------------------------------------------------
# Redis Client
# ---------------------------------------------------------------------------

def get_redis():
    """Get Redis connection. Raises on failure (no silent swallowing)."""
    kwargs = dict(
        host=REDIS_HOST,
        port=REDIS_PORT,
        decode_responses=True,
        socket_timeout=5,
        socket_connect_timeout=3,
        retry_on_timeout=True,
    )
    if REDIS_PASSWORD:
        kwargs["password"] = REDIS_PASSWORD
    r = redis.Redis(**kwargs)
    r.ping()  # Verify connection
    return r


# Global Redis client (reused across requests)
_redis_client = None


def redis_client():
    global _redis_client
    if _redis_client is None:
        _redis_client = get_redis()
    return _redis_client


# ---------------------------------------------------------------------------
# GPU Health & Load
# ---------------------------------------------------------------------------

def fetch_gpu_metrics(gpu_name, gpu_info):
    """Fetch live metrics from GPU sidecar (port 8090).
    
    Returns dict with health, utilization, VRAM usage, and load score.
    Returns None if sidecar is unreachable.
    """
    endpoint = gpu_info["endpoint"]
    try:
        import urllib.request
        from urllib.parse import urlsplit
        # The endpoint already carries the inference port (:8080), so build the
        # sidecar URL from its hostname instead of appending ":8090" to it.
        host = urlsplit(endpoint).hostname
        req = urllib.request.Request(f"http://{host}:8090/health")
        req.add_header("User-Agent", "queue-consumer/1.0")
        resp = urllib.request.urlopen(req, timeout=3)
        if resp.status != 200:
            return None

        metrics = json.loads(resp.read())

        # Calculate load score (0-100, lower = more available)
        vram_used = metrics.get("vram_used_mb", 0)
        # Fall back to the registry value if the sidecar omits vram_total_mb
        vram_total = metrics.get("vram_total_mb") or gpu_info.get("vram_total_mb", 0)
        vram_pct = (vram_used / vram_total * 100) if vram_total > 0 else 100

        gpu_util = metrics.get("gpu_util_pct", 0)
        temp = metrics.get("temp_c", 0)

        # Load score: weighted combination of GPU util, VRAM pressure, and heat
        # Higher score = more loaded = less desirable for new jobs
        load_score = (gpu_util * 0.6) + (vram_pct * 0.3) + (max(0, temp - 60) * 0.1)

        # Treat the GPU as "down" if VRAM is nearly full (>95%), or if it is
        # pinned (>98% util) while running hot (>85C)
        is_down = vram_pct > 95 or (gpu_util > 98 and temp > 85)
        
        return {
            "health": "down" if is_down else "up",
            "gpu_util_pct": gpu_util,
            "vram_used_mb": vram_used,
            "vram_total_mb": vram_total,
            "vram_pct": round(vram_pct, 1),
            "temp_c": temp,
            "load_score": round(load_score, 1),
            "last_check": time.time(),
        }
    except Exception as e:
        logger.debug(f"GPU {gpu_name} sidecar unreachable: {e}")
        return None


def get_gpu_health():
    """Check health of all GPUs. Returns dict of gpu_name -> metrics or None."""
    health = {}
    for name, info in GPU_REGISTRY.items():
        m = fetch_gpu_metrics(name, info)
        health[name] = m if m else {"health": "down", "last_check": time.time()}
    return health


def select_gpu(job):
    """Select the best GPU for a job based on model mapping and load.
    
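    Algorithm:
    1. Map job model to preferred GPU via MODEL_TO_GPU
    2. Collect every GPU that reports "up": preferred GPU first, then the
       others in registry priority order
    3. Pick the candidate with the lowest load_score; the sort is stable, so
       ties go to the preferred GPU, then to higher registry priority
    4. Return the best available GPU or None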
    
    Returns: (gpu_name, gpu_info, metrics) or (None, None, None)
    """
    model = job.get("model", "standard")
    preferred = MODEL_TO_GPU.get(model, "amdpve")
    
    # Get all GPU health
    health = get_gpu_health()
    
    # Build candidate list: preferred first, then others by priority
    candidates = []
    for name in [preferred] + sorted(
        [n for n in GPU_REGISTRY if n != preferred],
        key=lambda n: GPU_REGISTRY[n]["priority"]
    ):
        metrics = health.get(name, {})
        if metrics.get("health") == "up":
            candidates.append((name, GPU_REGISTRY[name], metrics))
    
    if not candidates:
        return None, None, None
    
    # Sort by load_score (lower = less loaded = better)
    candidates.sort(key=lambda c: c[2].get("load_score", 100))
    
    return candidates[0]


# ---------------------------------------------------------------------------
# Circuit Breaker (per-GPU, with recovery)
# ---------------------------------------------------------------------------

class CircuitBreaker:
    """Per-GPU circuit breaker with half-open recovery state.
    
    States:
      CLOSED:    Normal operation. Failures tracked.
      OPEN:      All requests fail immediately. After cooldown, transitions to HALF_OPEN.
      HALF_OPEN: Allow one probe request. If it succeeds, CLOSED. If it fails, OPEN.
    """
    
    def __init__(self, failure_threshold=3, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._failures = {}  # gpu_name -> count
        self._last_failure = {}  # gpu_name -> timestamp
        self._state = {}  # gpu_name -> "closed" | "open" | "half_open"
    
    def allow_request(self, gpu_name):
        """Check if a request should be allowed for this GPU."""
        state = self._state.get(gpu_name, "closed")
        
        if state == "closed":
            return True
        
        if state == "open":
            last_fail = self._last_failure.get(gpu_name, 0)
            if time.time() - last_fail > self.recovery_timeout:
                self._state[gpu_name] = "half_open"
                logger.info(f"Circuit {gpu_name}: OPEN -> HALF_OPEN (probe)")
                return True
            return False
        
        if state == "half_open":
            # Ideally only one probe at a time; for simplicity, allow every request
            return True
        
        return False
    
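    def record_success(self, gpu_name):
        """Record a successful request and transition to CLOSED."""
        previous = self._state.get(gpu_name, "closed")
        self._failures[gpu_name] = 0
        self._state[gpu_name] = "closed"
        # Only log real transitions, not every success on a closed circuit
        if previous != "closed":
            logger.info(f"Circuit {gpu_name}: {previous.upper()} -> CLOSED")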
    
    def record_failure(self, gpu_name):
        """Record a failed request."""
        self._failures[gpu_name] = self._failures.get(gpu_name, 0) + 1
        self._last_failure[gpu_name] = time.time()
        
        if self._failures[gpu_name] >= self.failure_threshold:
            self._state[gpu_name] = "open"
            logger.warning(f"Circuit {gpu_name}: CLOSED -> OPEN (threshold={self.failure_threshold})")


# ---------------------------------------------------------------------------
# Job Lifecycle
# ---------------------------------------------------------------------------

def store_job_status(job_id, status_data):
    """Store job status in Redis hash."""
    r = redis_client()
    r.hset(JOB_STATUS_KEY, job_id, json.dumps({
        **status_data,
        "updated_at": time.time(),
    }))
    # EXPIRE applies to the whole hash, not to one field: every write pushes
    # the TTL of all job statuses out to 24 hours from now
    r.expire(JOB_STATUS_KEY, 86400)


def get_job_status(job_id):
    """Get job status from Redis."""
    r = redis_client()
    raw = r.hget(JOB_STATUS_KEY, job_id)
    if raw:
        return json.loads(raw)
    return None


def enqueue_job(payload, headers, source="nginx"):
    """Add a job to the Redis stream. Returns job_id."""
    r = redis_client()
    
    # Parse model and priority from headers / payload
    model_header = headers.get("X-Syslog-Model", "standard")
    priority_header = headers.get("X-Syslog-Priority", "normal")
    
    # Validate priority
    if priority_header not in PRIORITY_ORDER:
        priority_header = "normal"
    
    job_id = str(uuid.uuid4())
    job = {
        "job_id": job_id,
        "model": model_header,
        "priority": priority_header,
        "payload": payload,
        "headers": dict(headers),  # Include ALL headers (not just X-*)
        "created_at": time.time(),
        "attempt": 0,
        "max_retries": MAX_RETRIES,
        "source": source,
    }
    
    # Add to stream with approximate trimming
    r.xadd(
        STREAM_KEY,
        {"job": json.dumps(job)},
        maxlen=STREAM_MAXLEN,
        approximate=True,  # redis-py's keyword for approximate ("~") trimming
    )
    
    # Initial status
    store_job_status(job_id, {
        "status": "queued",
        "model": model_header,
        "priority": priority_header,
        "created_at": job["created_at"],
        "queued_at": time.time(),
    })
    
    return job_id


def get_queue_depth():
    """Get current stream length (approximate)."""
    r = redis_client()
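    # XLEN is O(1) for streams. It counts every entry still in the stream,
    # including ones already delivered, until they are deleted or trimmed.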
    try:
        return r.xlen(STREAM_KEY)
    except Exception:
        return -1


# ---------------------------------------------------------------------------
# Dead Letter Queue
# ---------------------------------------------------------------------------

def move_to_dlq(job_id, reason):
    """Move a failed job to the dead letter queue."""
    r = redis_client()
    
    # Get the original stream entry
    entries = r.xrange(STREAM_KEY, min="0-0", max="+", count=10000)
    original_job = None
    for msg_id, data in entries:
        job = json.loads(data.get("job", "{}"))
        if job.get("job_id") == job_id:
            original_job = job
            # Delete the entry from the main stream (the caller still XACKs it)
            r.xdel(STREAM_KEY, msg_id)
            break
    
    if not original_job:
        logger.error(f"DLQ move: job {job_id} not found in main stream")
        return
    
    dlq_entry = {
        **original_job,
        "dlq_reason": reason,
        "dlq_at": time.time(),
    }
    
    r.xadd(DLQ_KEY, {"job": json.dumps(dlq_entry)})
    store_job_status(job_id, {
        "status": "dead",
        "dlq_reason": reason,
        "dlq_at": dlq_entry["dlq_at"],
        "updated_at": time.time(),
    })
    
    logger.warning(f"Job {job_id} moved to DLQ: {reason}")


def list_dlq(limit=20):
    """List dead letter queue entries."""
    r = redis_client()
    entries = r.xrange(DLQ_KEY, count=limit)
    result = []
    for msg_id, data in entries:
        job = json.loads(data.get("job", "{}"))
        result.append({
            "message_id": msg_id.decode() if isinstance(msg_id, bytes) else msg_id,
            **job,
        })
    return result


def retry_dlq(message_id):
    """Retry a dead-letter job by re-adding to main stream."""
    r = redis_client()
    entries = r.xrange(DLQ_KEY, min=message_id, max=message_id, count=1)
    
    if not entries:
        return None, "DLQ entry not found"
    
    msg_id, data = entries[0]
    job = json.loads(data.get("job", "{}"))
    
    # Reset attempt count
    job["attempt"] = 0
    job["dlq_at"] = None
    job["dlq_reason"] = None
    
    # Re-add to main stream
    r.xadd(
        STREAM_KEY,
        {"job": json.dumps(job)},
        maxlen=STREAM_MAXLEN,
        approximate=True,  # redis-py's keyword for approximate ("~") trimming
    )
    
    # Remove from DLQ
    r.xdel(DLQ_KEY, msg_id)
    
    # Update status
    store_job_status(job["job_id"], {
        "status": "queued",
        "attempt": 0,
        "requeued_at": time.time(),
        "updated_at": time.time(),
    })
    
    return job["job_id"], "requeued"


# ---------------------------------------------------------------------------
# Flask API
# ---------------------------------------------------------------------------

app = Flask(__name__)
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)


@app.route("/health")
def health():
    """Liveness probe for Nginx."""
    try:
        r = redis_client()
        r.ping()
        return jsonify({"status": "ok", "service": "queue-service"}), 200
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 503


@app.route("/enqueue", methods=["POST"])
def enqueue():
    """Submit inference request to the smart queue.
    
    Accepts any body (JSON, text, etc.) and forwards all headers.
    Supports X-Syslog-Priority header: high, normal, low.
    """
    try:
        payload = request.get_data(as_text=True)
        headers = dict(request.headers)  # ALL headers, not just X-*
        
        # Check circuit breaker: is the queue overwhelmed?
        depth = get_queue_depth()
        if depth >= QUEUE_OPEN:
            return jsonify({
                "error": "Circuit breaker OPEN",
                "queue_depth": depth,
                "threshold": QUEUE_OPEN,
                "retry_after": 30,
            }), 503
        
        # Adaptive backpressure
        if depth >= QUEUE_503:
            return jsonify({
                "error": "Queue overloaded",
                "queue_depth": depth,
                "retry_after": min(60, (depth - QUEUE_503) * 5),
            }), 503
        
        if depth >= QUEUE_WARN:
            # Warn but allow
            logger.warning(f"Queue depth approaching limit: {depth}/{QUEUE_503}")
        
        job_id = enqueue_job(payload, headers)
        
        return jsonify({
            "job_id": job_id,
            "status": "queued",
            "queue_depth": get_queue_depth(),
        }), 202
        
    except Exception as e:
        logger.error(f"Enqueue failed: {e}")
        return jsonify({"error": str(e)}), 500


@app.route("/status")
def status():
    """Queue overview: depth, circuit state, consumer pool, GPU health."""
    r = redis_client()
    depth = get_queue_depth()
    dlq_depth = r.xlen(DLQ_KEY) if r else 0
    pending_depth = r.xlen(PENDING_KEY) if r else 0
    
    # Circuit breaker state per GPU
    circuit_state = {}
    for gpu_name in GPU_REGISTRY:
        state = circuit_breaker._state.get(gpu_name, "closed")
        failures = circuit_breaker._failures.get(gpu_name, 0)
        circuit_state[gpu_name] = {
            "state": state,
            "failures": failures,
        }
    
    # Consumer pool
    consumers_raw = r.hgetall(CONSUMER_KEY) if r else {}
    consumers = []
    for cid, cdata in consumers_raw.items():
        try:
            consumers.append(json.loads(cdata))
        except json.JSONDecodeError:
            pass
    
    # GPU health
    gpu_health = get_gpu_health()
    
    return jsonify({
        "queue_depth": depth,
        "pending_depth": pending_depth,
        "dlq_depth": dlq_depth,
        "circuit_breaker": circuit_state,
        "consumers": consumers,
        "gpu_health": gpu_health,
        "thresholds": {
            "warn": QUEUE_WARN,
            "overloaded": QUEUE_503,
            "open": QUEUE_OPEN,
        },
    }), 200


@app.route("/status/<job_id>")
def job_status_endpoint(job_id):
    """Get specific job status and result."""
    status_data = get_job_status(job_id)
    if not status_data:
        return jsonify({"error": "job not found"}), 404
    return jsonify(status_data), 200


@app.route("/dlq")
def dlq_list():
    """List dead letter queue entries."""
    return jsonify({"dead_letter_queue": list_dlq()}), 200


@app.route("/dlq/<message_id>/retry", methods=["POST"])
def dlq_retry(message_id):
    """Retry a dead-letter job."""
    job_id, msg = retry_dlq(message_id)
    if job_id is None:
        return jsonify({"error": msg}), 404
    return jsonify({"job_id": job_id, "status": "requeued"}), 200


@app.route("/consumers")
def consumers_status():
    """List active consumer pool."""
    r = redis_client()
    consumers_raw = r.hgetall(CONSUMER_KEY) if r else {}
    consumers = []
    for cid, cdata in consumers_raw.items():
        try:
            consumers.append(json.loads(cdata))
        except json.JSONDecodeError:
            pass
    return jsonify({"consumers": consumers}), 200


@app.route("/gpus")
def gpus_status():
    """List GPU registry with live health and metrics."""
    health = get_gpu_health()
    result = {}
    for name, info in GPU_REGISTRY.items():
        h = health.get(name, {})
        result[name] = {
            **info,
            **h,
        }
    return jsonify(result), 200


# ---------------------------------------------------------------------------
# Consumer Pool (runs in background thread)
# ---------------------------------------------------------------------------

def run_consumer(consumer_id):
    """Main consumer loop: reads from stream, selects GPU, forwards request.
    
    This runs in a background thread per worker.
    """
    import urllib.request
    
    worker_name = f"consumer-{consumer_id}"
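    # One shared group: Redis delivers each entry to exactly one consumer in it.
    # A separate group per consumer would hand every job to every worker.
    consumer_group = "workers"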
    
    logger.info(f"{worker_name} started")
    
    # Register consumer
    r = redis_client()
    r.hset(CONSUMER_KEY, consumer_id, json.dumps({
        "consumer_id": consumer_id,
        "started_at": time.time(),
        "pid": os.getpid(),
        "status": "running",
    }))
    
    # Create consumer group if not exists
    try:
        r.xgroup_create(STREAM_KEY, consumer_group, id="0", mkstream=True)
    except redis.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise  # Anything other than "group already exists" is a real error
    
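    running = True
    
    def handle_signal(signum, frame):
        nonlocal running
        logger.info(f"{worker_name} shutting down (signal {signum})")
        running = False
    
    # signal.signal() raises ValueError outside the main thread, and consumers
    # run in background threads, so only install handlers from the main thread.
    import threading
    if threading.current_thread() is threading.main_thread():
        signal.signal(signal.SIGTERM, handle_signal)
        signal.signal(signal.SIGINT, handle_signal)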
    
    last_heartbeat = 0
    gpu_health_cache = {}
    last_health_check = 0
    
    while running:
        try:
            now = time.time()
            
            # Periodic health check (every GPU_HEALTH_INTERVAL seconds)
            if now - last_health_check > GPU_HEALTH_INTERVAL:
                gpu_health_cache = get_gpu_health()
                last_health_check = now
            
            # Read from stream: one entry at a time, blocking up to POLL_INTERVAL seconds
            entries = r.xreadgroup(
                consumer_group,
                consumer_id,
                {STREAM_KEY: ">"},  # ">" = new messages only
                count=1,  # Process one job at a time per consumer
                block=int(POLL_INTERVAL * 1000),  # Block in ms
            )
            
            if not entries:
                # Update heartbeat
                if now - last_heartbeat > 5:
                    # Reuse the start time recorded at registration instead of resetting it
                    prev = json.loads(r.hget(CONSUMER_KEY, consumer_id) or "{}")
                    r.hset(CONSUMER_KEY, consumer_id, json.dumps({
                        "consumer_id": consumer_id,
                        "started_at": prev.get("started_at", now),
                        "pid": os.getpid(),
                        "status": "idle",
                        "last_heartbeat": now,
                        "jobs_processed": r.hget(CONSUMER_KEY, consumer_id + ":processed") or 0,
                    }))
                    last_heartbeat = now
                continue
            
            for stream, messages in entries:
                for msg_id, data in messages:
                    job_data = json.loads(data.get("job", "{}"))
                    job_id = job_data.get("job_id", "unknown")
                    
                    logger.info(f"{worker_name} processing job {job_id} (attempt {job_data.get('attempt', 0)})")
                    
                    # Update status to processing
                    store_job_status(job_id, {
                        "status": "processing",
                        "consumer": consumer_id,
                        "started_at": now,
                        "updated_at": now,
                    })
                    
                    # Track pending. XADD returns this entry's own ID, which
                    # differs from msg_id (an ID in STREAM_KEY), so keep it for XDEL.
                    pending_id = r.xadd(PENDING_KEY, {"job_id": job_id, "consumer": consumer_id})

                    # Select GPU
                    gpu_name, gpu_info, gpu_metrics = select_gpu(job_data)

                    if not gpu_name:
                        # No GPU available: send to the DLQ rather than spin on the stream
                        logger.warning(f"No GPU available for job {job_id}, moving to DLQ")
                        move_to_dlq(job_id, "no_gpu_available")
                        r.xack(STREAM_KEY, consumer_group, msg_id)
                        r.xdel(PENDING_KEY, pending_id)
                        continue

                    # Check circuit breaker
                    if not circuit_breaker.allow_request(gpu_name):
                        logger.warning(f"Circuit OPEN for {gpu_name}, DLQ job {job_id}")
                        move_to_dlq(job_id, f"circuit_open_{gpu_name}")
                        r.xack(STREAM_KEY, consumer_group, msg_id)
                        r.xdel(PENDING_KEY, pending_id)
                        continue

                    # Forward request to GPU
                    gpu_url = f"{gpu_info['endpoint']}/v1/chat/completions"
                    headers = job_data.get("headers", {})
                    payload = job_data.get("payload", "{}")

                    # Ensure Content-Type is set
                    if "Content-Type" not in headers:
                        headers["Content-Type"] = "application/json"

                    # Update job status with GPU selection
                    store_job_status(job_id, {
                        "status": "processing",
                        "gpu": gpu_name,
                        "gpu_load_score": gpu_metrics.get("load_score", 0) if gpu_metrics else 0,
                        "started_at": now,
                        "updated_at": now,
                    })

                    try:
                        req = urllib.request.Request(gpu_url, data=payload.encode(), headers=headers)
                        req.add_header("User-Agent", f"queue-consumer/{consumer_id}")

                        # Context manager closes the HTTP response when done
                        with urllib.request.urlopen(req, timeout=JOB_TIMEOUT) as resp:
                            response_data = resp.read().decode()

                        # Success!
                        circuit_breaker.record_success(gpu_name)

                        store_job_status(job_id, {
                            "status": "completed",
                            "gpu": gpu_name,
                            "result": response_data[:2000],  # Store first 2000 chars
                            "completed_at": time.time(),
                            "updated_at": time.time(),
                        })

                        logger.info(f"{worker_name} completed job {job_id} on {gpu_name}")

                    except Exception as e:
                        # Request failed
                        error_msg = str(e)[:200]
                        logger.error(f"{worker_name} failed job {job_id} on {gpu_name}: {error_msg}")

                        circuit_breaker.record_failure(gpu_name)

                        # Check if we should retry
                        attempt = job_data.get("attempt", 0) + 1
                        if attempt <= MAX_RETRIES:
                            # Retry: update job with incremented attempt, re-add to stream
                            job_data["attempt"] = attempt
                            store_job_status(job_id, {
                                "status": "retrying",
                                "attempt": attempt,
                                "last_error": error_msg,
                                "updated_at": time.time(),
                            })
                            # redis-py names this keyword `approximate`, not `approx`
                            r.xadd(
                                STREAM_KEY,
                                {"job": json.dumps(job_data)},
                                maxlen=STREAM_MAXLEN,
                                approximate=True,
                            )
                        else:
                            # Max retries exceeded: move to DLQ
                            move_to_dlq(job_id, f"max_retries_exceeded_after_{attempt}_attempts_on_{gpu_name}: {error_msg}")

                    # Ack the stream message
                    r.xack(STREAM_KEY, consumer_group, msg_id)

                    # Remove from pending, using the ID returned by the XADD above
                    r.xdel(PENDING_KEY, pending_id)

                    # Update consumer processed count atomically
                    r.hincrby(CONSUMER_KEY, consumer_id + ":processed", 1)
        
        except redis.ConnectionError as e:
            logger.error(f"{worker_name} Redis connection error: {e}. Retrying in {POLL_INTERVAL}s...")
            time.sleep(POLL_INTERVAL)
        except Exception as e:
            logger.error(f"{worker_name} unexpected error: {e}")
            time.sleep(POLL_INTERVAL)
    
    # Unregister consumer
    r.hdel(CONSUMER_KEY, consumer_id)
    logger.info(f"{worker_name} stopped")


# ---------------------------------------------------------------------------
# Startup
# ---------------------------------------------------------------------------

def main():
    """Start the queue service with consumer workers."""
    logger.info("Starting Smart Queue Service...")
    logger.info(f"Redis: {REDIS_HOST}:{REDIS_PORT}")
    logger.info(f"Consumers: {CONSUMER_WORKERS}")
    logger.info(f"GPU registry: {list(GPU_REGISTRY.keys())}")
    
    # Verify Redis connectivity
    try:
        r = redis_client()
        r.ping()
        logger.info("Redis connection verified")
    except Exception as e:
        logger.error(f"Redis connection failed: {e}")
        sys.exit(1)
    
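    # Start consumer workers in background threads
    import threading  # harmless if already imported at module level
    threads = []
    for i in range(CONSUMER_WORKERS):
        # daemon=True lets the process exit without waiting on the consumer loops
        t = threading.Thread(
            target=run_consumer,
            args=(f"worker-{i}",),
            name=f"consumer-worker-{i}",
            daemon=True,
        )
        t.start()
        threads.append(t)
    
    logger.info(f"Started {CONSUMER_WORKERS} consumer workers")
    
    # Start the Flask development server. Gunicorn imports `app` directly and
    # never calls main(), so the consumers must be started separately there.
    app.run(host="0.0.0.0", port=8091, threaded=True)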


if __name__ == "__main__":
    main()
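
Because the Dockerfile in 3.4 launches Gunicorn against `app`, `main()` does not run in the container and the consumer threads it starts would be missing. A minimal sketch of one way to start them once per Gunicorn worker process follows. The helper `start_consumers` and the `gunicorn.conf.py` file are hypothetical (neither is in the draft); `post_fork` is Gunicorn's per-worker server hook.

```python
import os
import threading

_lock = threading.Lock()
_started = False  # set once per process


def start_consumers(run_consumer, worker_count):
    """Start `worker_count` daemon threads, each calling run_consumer(consumer_id).

    Hypothetical helper: repeated calls in the same process are no-ops and
    return an empty list. The PID is part of the consumer ID so that threads
    in different Gunicorn workers do not share a consumer name.
    """
    global _started
    with _lock:
        if _started:
            return []
        _started = True

    threads = []
    for i in range(worker_count):
        consumer_id = f"worker-{os.getpid()}-{i}"
        t = threading.Thread(
            target=run_consumer,
            args=(consumer_id,),
            name=f"consumer-{consumer_id}",
            daemon=True,  # do not block process exit
        )
        t.start()
        threads.append(t)
    return threads


# In a hypothetical gunicorn.conf.py:
#
# def post_fork(server, worker):
#     start_consumers(run_consumer, CONSUMER_WORKERS)
```

With `--workers 2`, each Gunicorn worker would start its own set of threads, so the total consumer count is twice `QUEUE_CONSUMERS` unless one of the two values is lowered.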

3.4 Updated queue-service/Dockerfile

FROM python:3.13-slim

RUN pip install --no-cache-dir \
    flask==3.1.0 \
    redis==5.2.1 \
    gunicorn==23.0.0

# Build context is the repo root (context: . in docker-compose.yml)
COPY queue-service/queue-service.py /app/queue-service.py
WORKDIR /app

EXPOSE 8091

# Gunicorn: 2 workers x 4 threads, 300s timeout (matches LLM inference timeout)
CMD ["gunicorn", "--bind", "0.0.0.0:8091", "--workers", "2", "--threads", "4", "--timeout", "300", "--graceful-timeout", "30", "queue-service:app"]

3.5 Updated docker-compose.yml

version: "3.8"

services:
  redis:
    image: redis:7-alpine
    restart: unless-stopped
    networks:
      - gpu-router-net
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

  queue-service:
    build:
      context: .
      dockerfile: queue-service/Dockerfile
    restart: unless-stopped
    networks:
      - gpu-router-net
    expose:
      - "8091"
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - QUEUE_CONSUMERS=3
      - QUEUE_POLL_INTERVAL=2
      - QUEUE_MAX_RETRIES=3
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8091/health')"]
      interval: 15s
      timeout: 5s
      retries: 3

  dashboard:
    build:
      context: .
      dockerfile: Dockerfile.dashboard
    restart: unless-stopped
    networks:
      - gpu-router-net
    expose:
      - "3001"
    depends_on:
      queue-service:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3001/')"]
      interval: 15s
      timeout: 5s
      retries: 3

  gpu-dashboard:
    build:
      context: .
      dockerfile: Dockerfile.gpu
    restart: unless-stopped
    networks:
      - gpu-router-net
    expose:
      - "8092"
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8092/gpu.html')"]
      interval: 15s
      timeout: 5s
      retries: 3

networks:
  gpu-router-net:
    driver: bridge

volumes:
  redis-data:

Key changes:

  • dockerfile: queue-service/Dockerfile (QW-1 fix)
  • restart: unless-stopped (QW-8 fix)
  • expose instead of ports for internal services (QW-18: queue API no longer externally exposed)
  • condition: service_healthy for depends_on (QW-9: health checks)

4. Nginx Config Updates (gpu-router-docker.conf)

## Syslog GPU Router: Nginx Configuration (Docker-internal)
## Routes incoming agent requests to the appropriate GPU backend
## based on the X-Syslog-Model header.

upstream amdpve_pool {
    server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
}

upstream llmgpu_pool {
    server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
}

upstream ocu_llm_pool {
    server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
}

upstream queue_service {
    server queue-service:8091;
}

upstream dashboard_service {
    server dashboard:3001;
}

upstream gpu_dashboard_pool {
    server gpu-dashboard:8092;
}

## All-GPUs pool for fallback (fails only when ALL GPUs are down)
upstream all_gpu_pool {
    server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
    server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
    server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
}

map $http_x_syslog_model $gpu_upstream {
    default          amdpve_pool;
    "standard"       amdpve_pool;
    "heavy"          llmgpu_pool;
    "qwen3.5-27B"    llmgpu_pool;
    "light"          ocu_llm_pool;
    "gemma-4"        ocu_llm_pool;
}

limit_req_zone $binary_remote_addr zone=perip:10m rate=10r/s;

server {
    listen 80;
    server_name _;

    location /dashboard {
        proxy_pass http://dashboard_service/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    location /gpu {
        proxy_pass http://gpu_dashboard_pool/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location / {
        limit_req zone=perip burst=20 nodelay;
        limit_req_status 503;
        proxy_pass http://$gpu_upstream;

        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        ## Streaming support
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout  300s;
        proxy_send_timeout  300s;

        ## Single try only: each pool has one server, so a retry would hit the same GPU
        proxy_next_upstream error timeout http_502 http_503 http_504;
        proxy_next_upstream_tries 1;

        add_header X-Routed-To $gpu_upstream always;

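        ## Fall back to the queue on 504 (gateway timeout) from the selected pool.
        ## Note: no proxy_pass references all_gpu_pool yet, so the ALL-GPU
        ## condition is not enforced by this block as written.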
        error_page 504 = @queue_fallback;
    }

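    ## Queue fallback: target of the error_page handler in location /
    location @queue_fallback {
        ## proxy_pass may not carry a URI inside a named location, so rewrite
        ## the request URI to /enqueue and proxy without a URI part
        rewrite ^ /enqueue break;
        proxy_pass http://queue_service;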
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header Content-Type $content_type;
        proxy_pass_request_body on;
        proxy_set_header X-Syslog-Model $http_x_syslog_model;
    }
}

Key changes:

  • max_fails=3 fail_timeout=30s on each upstream (Nginx-level health)
  • all_gpu_pool upstream for fallback (N3 fix)
  • proxy_next_upstream_tries 1 (N2 fix: no self-retry)
  • Container names fixed to service names (N5 fix)
  • error_page 504 = @queue_fallback instead of 502/503/504 (N3 fix)
  • X-Syslog-Model forwarded to queue_service in fallback

5. GPU Dashboard Updates

5.1 Updated Dockerfile.gpu

FROM python:3.11-slim

RUN pip install --no-cache-dir psutil==6.1.1

COPY gpu-dashboard/ /app/
WORKDIR /app

RUN mkdir -p /app/public && \
    cp gpu.html /app/public/ && \
    touch /app/public/gpu_metrics.json

EXPOSE 8092

# Single-process: gpu_collector.py serves both as collector and HTTP server
CMD ["python3", "gpu_collector.py"]

5.2 Updated gpu_collector.py (key changes)

# Changes:
# 1. Add async/concurrent GPU polling (threading)
# 2. Fix path consistency (use /app/public consistently)
# 3. Add graceful shutdown
# 4. Include health endpoint for Docker healthcheck

import threading
import http.server
import json
import time
import signal
import sys
import urllib.request  # used by fetch_one below

# ... (keep existing imports)

DEAD_THRESHOLD = 30  # seconds (was 60; G4 fix)
OUTPUT_PATH = "/app/public/gpu_metrics.json"  # Consistent path (G2 fix)

def fetch_json_concurrent(endpoints):
    """Fetch from all GPUs concurrently using threads."""
    results = {}
    errors = {}
    
    def fetch_one(name, url):
        try:
            req = urllib.request.Request(url)
            req.add_header("User-Agent", "gpu-collector/1.0")
            resp = urllib.request.urlopen(req, timeout=3)
            results[name] = json.loads(resp.read())
        except Exception as e:
            errors[name] = str(e)[:100]
    
    threads = []
    for name, url in endpoints.items():
        t = threading.Thread(target=fetch_one, args=(name, url))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join(timeout=5)
    
    return results, errors

# ... (update the collection loop to use fetch_json_concurrent)

# Add HTTP health endpoint
class HealthHandler(http.server.SimpleHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(json.dumps({"status": "ok"}).encode())
        elif self.path == "/gpu.html" or self.path == "/":
            super().do_GET()
        else:
            self.send_response(404)
            self.end_headers()

# Start both the collector loop and the HTTP server in the same process:
# the HTTP server runs in a background thread, the collector runs in the main thread
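
The single-process layout described above can be sketched as follows. This is an illustration under assumptions, not the draft's collector: `start_http_server` and `PUBLIC_DIR` are hypothetical names, and unlike `HealthHandler` this handler serves every static file under the directory (so `gpu_metrics.json` is reachable) rather than only `/gpu.html`.

```python
import functools
import http.server
import json
import threading

PUBLIC_DIR = "/app/public"  # assumption: the directory that holds OUTPUT_PATH


class Handler(http.server.SimpleHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            body = json.dumps({"status": "ok"}).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            # Static files are resolved against the `directory` argument
            super().do_GET()

    def log_message(self, fmt, *args):
        pass  # silence per-request logging


def start_http_server(port=8092, directory=PUBLIC_DIR):
    """Serve `directory` plus /health from a daemon thread; return the server."""
    handler = functools.partial(Handler, directory=directory)
    server = http.server.ThreadingHTTPServer(("0.0.0.0", port), handler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

The main thread would call `start_http_server()` once and then run the collection loop; calling `server.shutdown()` from the shutdown path stops the HTTP thread cleanly.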

6. Migration Steps

Phase 1: Quick Wins (Deploy First)

  1. Apply QW-1 through QW-10 (Dockerfile fix, Nginx fixes, health checks)
  2. docker compose up -d --build
  3. Verify all containers pass health checks
  4. Verify Nginx fallback only triggers on ALL-GPU failure

Phase 2: Smart Queue Consumer

  1. Replace queue-service/queue-service.py with new implementation
  2. Replace queue-service/Dockerfile with Gunicorn-based Dockerfile
  3. Update docker-compose.yml (expose ports, depends_on conditions)
  4. docker compose up -d --build queue-service
  5. Verify:
    • /health returns 200
    • /status shows consumer pool
    • Submit a test request via /enqueue
    • Verify job appears in /status/<job_id> with "queued" status
    • Verify job gets processed and moves to "completed"
    • Verify GPU sidecar metrics are being read
    • Verify consumer pool shows active workers
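
The job-lifecycle checks in step 5 can be scripted. The sketch below polls a status source until a job reaches the wanted state and records the transitions it saw. It assumes `/status/<job_id>` returns JSON with a `status` field, which the draft implies but does not show; `http_status` and `wait_for_status` are illustrative names.

```python
import json
import time
import urllib.request


def http_status(base_url, job_id):
    """GET /status/<job_id> and return its "status" field (assumed response shape)."""
    with urllib.request.urlopen(f"{base_url}/status/{job_id}", timeout=5) as resp:
        return json.loads(resp.read()).get("status")


def wait_for_status(fetch, job_id, want="completed", timeout=300.0, interval=1.0):
    """Poll fetch(job_id) until it returns `want`.

    Returns the distinct statuses observed, in order. Raises TimeoutError if
    `want` is not reached before `timeout` seconds have elapsed.
    """
    seen = []
    deadline = time.monotonic() + timeout
    while True:
        status = fetch(job_id)
        if not seen or seen[-1] != status:
            seen.append(status)
        if status == want:
            return seen
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} stuck at {status!r}; saw {seen}")
        time.sleep(interval)
```

Against a running stack this could be called as `wait_for_status(lambda j: http_status("http://queue-service:8091", j), job_id)`, where `job_id` comes from the `/enqueue` response.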

Phase 3: Validation

  1. Submit multiple concurrent requests
  2. Verify load balancing across GPUs
  3. Verify circuit breaker opens on GPU failure
  4. Verify DLQ captures max-retry failures
  5. Verify /dlq and /dlq/<id>/retry endpoints work
  6. Verify backpressure: queue depth > 40 returns 503 with retry-after
  7. Monitor Redis stream lengths and consumer ack rates
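
For step 6, it helps to pin down the expected band for each queue depth before testing. The sketch below encodes the bands from the commit summary (0-30 normal, 30-40 warn, 40-50 503, >50 open). The summary's ranges overlap at 30, 40 and 50, so treating each boundary value as part of the lower band is an assumption, as are the function and band names.

```python
def backpressure_band(depth):
    """Map a queue depth to a backpressure band.

    Boundary values (30, 40, 50) are assigned to the lower band; this is an
    assumption, since the source lists overlapping ranges.
    """
    if depth < 0:
        raise ValueError("queue depth cannot be negative")
    if depth > 50:
        return "open"
    if depth > 40:
        return "reject_503"
    if depth > 30:
        return "warn"
    return "normal"


# Expected band per depth, usable as a checklist during validation
for depth in (0, 30, 31, 40, 41, 50, 51):
    print(depth, backpressure_band(depth))
```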

7. Risk Assessment

| Risk | Mitigation |
|---|---|
| Redis stream XREADGROUP blocks indefinitely | block=2000ms parameter prevents infinite blocking |
| Consumer crashes mid-request | PENDING_KEY tracks in-flight jobs; restart re-reads pending |
| GPU sidecar temporarily unavailable | 3s timeout on sidecar calls; GPU marked "down" if unreachable |
| Circuit breaker oscillates (flapping) | recovery_timeout=30s prevents rapid OPEN → HALF_OPEN → OPEN cycling |
| DLQ grows unbounded | Stream maxlen=10000 approx trim; manual DLQ cleanup recommended |
| Consumer threads conflict with Flask | Gunicorn handles Flask; consumers run in separate threads |
| Job payload too large for Redis | Redis has 512MB value limit; LLM responses typically < 50KB |
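
On the crash-recovery row: a read with `>` returns only entries never delivered to the group, so jobs left unacknowledged by a crashed consumer need an explicit claim step. A minimal sketch of one way to do that uses redis-py's `xautoclaim` (Redis 6.2+) against the consumer group's built-in pending list, for consumers in the same group. The function name `reclaim_stale` and the 60-second idle threshold are illustrative, not from the draft.

```python
def reclaim_stale(r, stream, group, consumer, min_idle_ms=60_000, batch=10):
    """Claim entries that have sat unacknowledged for at least min_idle_ms.

    `r` is a redis-py client. Returns a list of (msg_id, fields) tuples that
    are now owned by `consumer` and can be processed and acked as usual.
    """
    claimed = []
    cursor = "0-0"
    while True:
        resp = r.xautoclaim(
            stream,
            group,
            consumer,
            min_idle_time=min_idle_ms,
            start_id=cursor,
            count=batch,
        )
        # resp[0] is the cursor for the next call, resp[1] the claimed entries
        cursor, messages = resp[0], resp[1]
        claimed.extend(messages)
        # A cursor of 0-0 means the whole pending list has been scanned
        if cursor in ("0-0", b"0-0"):
            break
    return claimed
```

A consumer could call this once at startup, before entering its read loop, and feed the returned entries through the same processing path as new messages.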

8. Future Enhancements (Not in This Draft)

  1. Prometheus metrics endpoint: queue depth, processing rate, GPU latency, error rates
  2. WebSocket streaming: real-time job status updates to dashboard
  3. Batch processing: group multiple small requests for throughput optimization
  4. Model-specific queues: separate streams per model for guaranteed SLA
  5. Redis Sentinel/Cluster: high availability for Redis
  6. Rate limiting per-model: prevent one model from starving others
  7. GPU affinity for long-running jobs: keep a job on the same GPU to avoid context switches
  8. Result caching: cache responses for identical requests (hash payload, return cached result)

END OF DRAFT

This implementation transforms the queue-service from a passive storage pit into a production-ready smart queue consumer. The quick wins (QW-1 through QW-10) can be deployed independently and provide immediate value. The smart queue consumer builds on top of the quick wins.

Recommended deployment order:

  1. Quick wins (QW-1 to QW-10): 1-2 hours
  2. Smart queue consumer: 2-3 hours
  3. Validation & testing: 1 hour
  4. Monitor for 24 hours, then go live