syslog-harness/SMART_QUEUE_IMPLEMENTATION.md
SyslogBot b09a93f45c feat: Smart Queue Consumer implementation draft + architecture review
- SMART_QUEUE_IMPLEMENTATION.md: Complete implementation draft (1572 lines)
  with 10 quick-win fixes and full smart queue consumer rewrite
- ARCHITECTURE_REVIEW.md: 26-issue audit with prioritized findings
- Verified all 3 GPUs live: amdpve (73% util), llmgpu (idle), ocu_llm (idle)
- Redis 7.4.9 confirmed streams support
- GPU sidecar metrics verified on all hosts

Key fixes:
- QW-1: Dockerfile path mismatch (Dockerfile.queue -> queue-service/Dockerfile)
- QW-2: Nginx fallback only on ALL-GPU failure (not single GPU)
- QW-3: Container names fixed to Docker service names
- QW-4: Redis host default fixed (192.168.68.7 -> redis)
- QW-5: Dependency version pinning
- QW-7-10: Health checks, restart policy, Gunicorn, single-process collector

Smart queue features:
- Redis Streams + consumer groups
- GPU-aware load balancing via sidecar metrics
- Per-GPU circuit breakers with half-open recovery
- Adaptive backpressure (0-30 normal, 30-40 warn, 40-50 503, >50 open)
- Dead letter queue with retry endpoint
- Job ID tracking and /status/<job_id> API
2026-05-17 03:55:20 +00:00


# Syslog Harness Smart Queue Consumer Implementation
**Date:** 2026-05-17
**Author:** Mumuni (review draft for Abiba)
**Repo:** SyslogSolution/syslog-harness
**Base commit:** `e95475f` "Add GPU dashboard container + Nginx routing"
---
## 1. Executive Summary
The current queue-service stores inference requests in Redis but **never processes them**. This document provides a complete implementation of a **smart queue consumer** with GPU-aware load balancing, priority queuing, backpressure, dead letter handling, and job lifecycle tracking, plus all quick-win fixes from the architecture review.
**Current state (verified 2026-05-17):**
- Queue depth: 0 (services not currently running)
- Redis: 7.4.9 (streams fully supported)
- All 3 GPUs reachable and healthy:
  - `amdpve` (192.168.68.15:8080): qwen3.6-35B-A3B, GPU util 93%, VRAM 28GB/65GB, temp 72C
  - `llmgpu` (192.168.68.8:8080): qwen3.6-27B-code, GPU util 0%, VRAM 20GB/24GB, temp 37C
  - `ocu_llm` (192.168.68.110:8080): gemma-4-E4B, GPU util 0%, VRAM 8GB/12GB, temp 41C
- GPU sidecar metrics available at `:8090` on each host (gpu_util_pct, vram_used_mb, temp_c)
- GPU inference endpoints: `/v1/models` (all return 200), `/v1/chat/completions` (all respond to POST)
- No `slots_busy` endpoint; load is estimated from the GPU sidecar metrics instead
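
As a rough illustration of how the sidecar metrics can stand in for a `slots_busy` endpoint, the sketch below applies the load-score weighting used by `fetch_gpu_metrics` in section 3.3 (60% GPU utilization, 30% VRAM pressure, 10% temperature above 60C) to the snapshot listed above. The helper name `load_score` and the `SNAPSHOT` table are illustrative assumptions, not part of the repository.

```python
def load_score(gpu_util_pct, vram_used_gb, vram_total_gb, temp_c):
    """Weighted load score; a lower score means the GPU is more available."""
    vram_pct = vram_used_gb / vram_total_gb * 100 if vram_total_gb > 0 else 100
    score = gpu_util_pct * 0.6 + vram_pct * 0.3 + max(0, temp_c - 60) * 0.1
    return round(score, 1)


# Hypothetical snapshot built from the figures in the list above:
# (gpu_util_pct, vram_used_gb, vram_total_gb, temp_c)
SNAPSHOT = {
    "amdpve": (93, 28, 65, 72),
    "llmgpu": (0, 20, 24, 37),
    "ocu_llm": (0, 8, 12, 41),
}

scores = {name: load_score(*metrics) for name, metrics in SNAPSHOT.items()}
# Least-loaded GPU first
ranked = sorted(scores, key=scores.get)
print(scores, ranked)
```

With this snapshot the idle GPUs rank ahead of `amdpve`, and VRAM pressure alone separates `ocu_llm` from `llmgpu`.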
---
## 2. Quick Wins (Zero Architectural Change)
These fixes require minimal code changes and can be deployed independently of the smart queue.
### QW-1: Fix Dockerfile Path Mismatch (docker-compose.yml line 15)
**Current:** `dockerfile: Dockerfile.queue` (file doesn't exist at root)
**Fix:** `dockerfile: queue-service/Dockerfile`
```diff
 queue-service:
   build:
     context: .
-    dockerfile: Dockerfile.queue
+    dockerfile: queue-service/Dockerfile
```
### QW-2: Fix Nginx Fallback (gpu-router-docker.conf lines 99-106)
**Problem:** Nginx retries the same GPU pool (N2) AND falls back to queue on ANY single GPU failure (N3).
**Fix:** Add `proxy_next_upstream_tries 1` (no self-retry), and only route to queue when ALL GPU pools fail via a dedicated health-check upstream.
```diff
 upstream amdpve_pool {
-    server 192.168.68.15:8080;
+    server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
 }
 upstream llmgpu_pool {
-    server 192.168.68.8:8080;
+    server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
 }
 upstream ocu_llm_pool {
-    server 192.168.68.110:8080;
+    server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
 }
## New: all-GPUs pool; its health check fails only when ALL upstreams are down
+upstream all_gpu_pool {
+    server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
+    server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
+    server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
+}
map $http_x_syslog_model $gpu_upstream {
default amdpve_pool;
"standard" amdpve_pool;
"heavy" llmgpu_pool;
"qwen3.5-27B" llmgpu_pool;
"light" ocu_llm_pool;
"gemma-4" ocu_llm_pool;
}
## New: map to all-gpu pool for fallback
+map $http_x_syslog_model $fallback_upstream {
+ default all_gpu_pool;
+ "standard" all_gpu_pool;
+ "heavy" all_gpu_pool;
+ "qwen3.5-27B" all_gpu_pool;
+ "light" all_gpu_pool;
+ "gemma-4" all_gpu_pool;
+}
server {
location / {
limit_req zone=perip burst=20 nodelay;
- limit_req_status 503;
+ limit_req_status 503;
- proxy_pass http://$gpu_upstream;
+ proxy_pass http://$gpu_upstream;
...
- proxy_next_upstream error timeout http_502 http_503;
- proxy_next_upstream_tries 2;
+ proxy_next_upstream error timeout http_502 http_503 http_504;
+ proxy_next_upstream_tries 1;
- error_page 502 503 504 = @queue_fallback;
+ error_page 504 = @queue_fallback;
}
location @queue_fallback {
+ # Only reached when the all_gpu_pool proxy failed (all GPUs down)
rewrite ^ /enqueue break;
proxy_pass http://queue_service;
...
}
}
```
### QW-3: Fix Nginx Container Names (gpu-router-docker.conf lines 27, 32)
**Problem:** Hardcoded `syslog-harness-dashboard-1` changes with docker-compose project prefix.
**Fix:** Use Docker Compose service names.
```diff
upstream dashboard_service {
- server syslog-harness-dashboard-1:3001;
+ server dashboard:3001;
}
upstream gpu_dashboard_pool {
- server syslog-harness-gpu-dashboard-1:8092;
+ server gpu-dashboard:8092;
}
```
### QW-4: Fix Redis Host Default (queue-service.py line 21)
**Problem:** Default `192.168.68.7` is unreachable inside Docker.
**Fix:** Default to `redis` (Docker service name).
```diff
-REDIS_HOST = os.getenv("REDIS_HOST", "192.168.68.7")
+REDIS_HOST = os.getenv("REDIS_HOST", "redis")
```
### QW-5: Add Dependency Version Pinning (all Dockerfiles)
```diff
# queue-service/Dockerfile
-FROM python:3.13-slim
-RUN pip install --no-cache-dir flask redis
+FROM python:3.13-slim
+RUN pip install --no-cache-dir \
+ flask==3.1.0 \
+ redis==5.2.1 \
+ gunicorn==23.0.0
# Dockerfile.dashboard
-FROM python:3.11-slim
+FROM python:3.11-slim
+RUN pip install --no-cache-dir gunicorn==23.0.0
# Dockerfile.gpu
-FROM python:3.11-slim
-RUN pip install requests
+FROM python:3.11-slim
+RUN pip install --no-cache-dir \
+ requests==2.32.3 \
+ psutil==6.1.1
```
### QW-6: Add `.dockerignore`
```
.git
.gitignore
*.md
*.pyc
__pycache__
.env
*.swp
.hermes/
syslog-harness-check/
```
### QW-7: Add Docker Health Checks (docker-compose.yml)
```yaml
redis:
  image: redis:7-alpine
  restart: unless-stopped
  healthcheck:
    test: ["CMD", "redis-cli", "ping"]
    interval: 10s
    timeout: 3s
    retries: 3
queue-service:
  healthcheck:
    test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8091/health')"]
    interval: 15s
    timeout: 5s
    retries: 3
dashboard:
  healthcheck:
    test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3001/')"]
    interval: 15s
    timeout: 5s
    retries: 3
```
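
The `python3 -c` one-liners above work because `urllib.request.urlopen` raises on connection errors and on 4xx/5xx responses, which makes the health check command exit non-zero. The same logic can be sketched as a small standalone probe; the function names `probe` and `main` are hypothetical and no such script exists in the repository.

```python
import urllib.error
import urllib.request


def probe(url, timeout=5.0):
    """Return True if url answers with an HTTP 2xx status within timeout seconds."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return 200 <= resp.status < 300
    except (urllib.error.URLError, OSError, ValueError):
        # HTTPError (4xx/5xx) is a URLError subclass; refused connections and
        # timeouts surface as URLError or OSError; ValueError covers bad URLs.
        return False


def main(argv):
    """Exit-code style wrapper: 0 when healthy, 1 otherwise."""
    url = argv[1] if len(argv) > 1 else "http://localhost:8091/health"
    return 0 if probe(url) else 1
```

A script built this way would be wired up as `sys.exit(main(sys.argv))` and referenced from the `test:` entry instead of the inline one-liner.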
### QW-8: Use `unless-stopped` Instead of `always`
```diff
- restart: always
+ restart: unless-stopped
```
Apply to all services in docker-compose.yml.
### QW-9: Add Gunicorn to Queue Service (replace Flask dev server)
```diff
# queue-service/Dockerfile
-CMD ["python3", "queue-service.py"]
+CMD ["gunicorn", "--bind", "0.0.0.0:8091", "--workers", "2", "--timeout", "120", "queue-service:app"]
```
### QW-10: Fix GPU Dashboard Multi-Process CMD (Dockerfile.gpu line 14)
**Problem:** The CMD starts processes in the background with `&` and no supervisor; if the collector crashes, nothing restarts it.
**Fix:** Use a single process with signal handling, or use `supervisord`.
```dockerfile
# Option A: Single process with threading (recommended for simplicity)
CMD ["python3", "gpu_collector.py"]
# Option B: Use supervisord (more robust)
# pip installs the supervisord binary on PATH (/usr/local/bin in python images),
# and the config must be copied to the path passed to -c.
RUN pip install --no-cache-dir supervisor
COPY supervisord.conf /etc/supervisor/supervisord.conf
CMD ["supervisord", "-n", "-c", "/etc/supervisor/supervisord.conf"]
```
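
Option A could look roughly like the sketch below: the collector runs in a daemon thread inside the same process, errors in one collection cycle are logged rather than killing the loop, and a `threading.Event` gives the main thread a way to stop it. The names `collector_loop` and `start_collector` are assumptions for illustration; the actual layout of `gpu_collector.py` is not shown in this draft.

```python
import threading


def collector_loop(stop, collect, interval=5.0, on_error=print):
    """Call collect() every `interval` seconds until `stop` is set.

    Exceptions from collect() are reported and the loop keeps running, so a
    single failed cycle does not end metric collection.
    """
    while not stop.is_set():
        try:
            collect()
        except Exception as exc:
            on_error(f"collector error: {exc}")
        # wait() returns early when stop is set, so shutdown is prompt
        stop.wait(interval)


def start_collector(collect, interval=5.0, on_error=print):
    """Start the collector in a daemon thread; returns (stop_event, thread)."""
    stop = threading.Event()
    thread = threading.Thread(
        target=collector_loop,
        args=(stop, collect, interval, on_error),
        name="gpu-collector",
        daemon=True,
    )
    thread.start()
    return stop, thread
```

The main thread would then serve the dashboard and call `stop.set()` from its SIGTERM handler. If the whole process dies, the container's `restart` policy takes over the role of the supervisor.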
---
## 3. Smart Queue Consumer Full Implementation
### 3.1 Architecture
```
Agent (POST /v1/...) -> Nginx -> Queue

Smart Queue Service (queue-service.py)
  API Layer (Flask)        -> Redis Streams (inference:stream)
  Job Tracker (Redis Hash) <- Consumer Pool (background worker threads)
  Consumer Pool            -> GPU Selection Logic (load-balanced by GPU metrics)
                                -> amdpve   :8080  35B-A3B
                                -> llmgpu   :8080  27B-code
                                -> ocu_llm  :8080  gemma-4-E4B
  Failed jobs              -> DLQ Stream (inference:dead-letter)
```
### 3.2 Data Model
```python
# Redis Streams keys
STREAM_KEY = "inference:stream" # Main queue
DLQ_KEY = "inference:dead-letter" # Failed jobs
PENDING_KEY = "inference:pending" # Jobs currently being processed
# Redis Hash keys
JOB_STATUS_KEY = "job:status" # job_id -> {status, gpu, started_at, ...}
GPU_REGISTRY_KEY = "config:gpus" # gpu_name -> {endpoint, model, ...}
CONSUMER_REGISTRY_KEY = "consumers" # consumer_id -> {pid, started_at, last_heartbeat}
# Stream entry format (JSON)
# {
# "job_id": "uuid",
# "model": "standard|heavy|light",
# "priority": "high|normal|low",
# "payload": {...}, # Original request body
# "headers": {...}, # Original request headers (including Content-Type)
# "created_at": 1747478400.0,
# "attempt": 0,
# "max_retries": 3,
# "source": "nginx|direct"
# }
```
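
To make the entry format concrete, the following sketch builds one stream entry in the shape documented above and reads it back. The whole job is stored as a single JSON string under a `job` field, matching how section 3.3 calls `xadd`. The helper names `make_stream_entry` and `parse_stream_entry` are illustrative assumptions.

```python
import json
import time
import uuid

VALID_PRIORITIES = ("high", "normal", "low")


def make_stream_entry(payload, headers, model="standard", priority="normal",
                      source="nginx", max_retries=3):
    """Build the field mapping for one stream entry (a single 'job' field)."""
    if priority not in VALID_PRIORITIES:
        priority = "normal"  # unknown priorities fall back to normal
    job = {
        "job_id": str(uuid.uuid4()),
        "model": model,
        "priority": priority,
        "payload": payload,        # original request body
        "headers": dict(headers),  # original request headers
        "created_at": time.time(),
        "attempt": 0,
        "max_retries": max_retries,
        "source": source,
    }
    return {"job": json.dumps(job)}


def parse_stream_entry(fields):
    """Decode the 'job' field of a stream entry back into a dict."""
    return json.loads(fields["job"])
```

Keeping the job as one JSON field avoids flattening nested headers into stream fields, at the cost of not being able to filter on individual attributes inside Redis.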
### 3.3 New `queue-service.py` (Complete Rewrite)
```python
#!/usr/bin/env python3
"""Syslog Harness Smart Queue Service: Redis Streams + GPU-aware consumer pool.

Endpoints:
    /health            GET:  liveness (Nginx upstream check)
    /enqueue           POST: submit inference request (with optional priority)
    /status            GET:  queue depth, circuit state, consumer pool
    /status/<job_id>   GET:  specific job status and result
    /dlq               GET:  list dead-letter queue entries
    /dlq/<id>/retry    POST: retry a dead-letter job
    /consumers         GET:  consumer pool status
    /gpus              GET:  GPU registry with live health
"""
import asyncio
import json
import logging
import os
import signal
import sys
import time
import uuid
from datetime import datetime, timezone
import redis
from flask import Flask, request, jsonify
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "")
# Queue thresholds (adaptive backpressure)
QUEUE_WARN = 30
QUEUE_503 = 40
QUEUE_OPEN = 50
# Consumer settings
CONSUMER_WORKERS = int(os.getenv("QUEUE_CONSUMERS", "3")) # One per GPU
POLL_INTERVAL = float(os.getenv("QUEUE_POLL_INTERVAL", "2")) # seconds
GPU_HEALTH_INTERVAL = 5 # seconds
MAX_RETRIES = int(os.getenv("QUEUE_MAX_RETRIES", "3"))
JOB_TIMEOUT = 300 # seconds (5 min, matches Nginx proxy_read_timeout)
# Redis stream max length (approximate trimming)
STREAM_MAXLEN = 10000
# GPU registry (single source of truth)
GPU_REGISTRY = {
"amdpve": {
"endpoint": "http://192.168.68.15:8080",
"model": "qwen3.6-35B-A3B",
"type": "MoE",
"vram_total_mb": 65536,
"priority": 1, # Primary workhorse
},
"llmgpu": {
"endpoint": "http://192.168.68.8:8080",
"model": "qwen3.6-27B-code",
"type": "Dense",
"vram_total_mb": 24576,
"priority": 2,
},
"ocu_llm": {
"endpoint": "http://192.168.68.110:8080",
"model": "gemma-4-E4B",
"type": "Light",
"vram_total_mb": 12227,
"priority": 3,
},
}
# Model-to-GPU mapping (mirrors Nginx map block)
MODEL_TO_GPU = {
"default": "amdpve",
"standard": "amdpve",
"heavy": "llmgpu",
"qwen3.5-27B": "llmgpu",
"qwen3.6-27B": "llmgpu",
"qwen3.6-27B-code": "llmgpu",
"light": "ocu_llm",
"gemma-4": "ocu_llm",
"gemma-4-E4B": "ocu_llm",
}
# Priority ordering
PRIORITY_ORDER = {"high": 0, "normal": 1, "low": 2}
# Streams
STREAM_KEY = "inference:stream"
DLQ_KEY = "inference:dead-letter"
PENDING_KEY = "inference:pending"
# Redis keys
JOB_STATUS_KEY = "job:status"
CONSUMER_KEY = "consumers"
# Logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
stream=sys.stdout,
)
logger = logging.getLogger("queue-service")
# ---------------------------------------------------------------------------
# Redis Client
# ---------------------------------------------------------------------------
def get_redis():
    """Get Redis connection. Raises on failure (no silent swallowing)."""
    kwargs = dict(
        host=REDIS_HOST,
        port=REDIS_PORT,
        decode_responses=True,
        socket_timeout=5,
        socket_connect_timeout=3,
        retry_on_timeout=True,
    )
    if REDIS_PASSWORD:
        kwargs["password"] = REDIS_PASSWORD
    r = redis.Redis(**kwargs)
    r.ping()  # Verify connection
    return r


# Global Redis client (reused across requests)
_redis_client = None


def redis_client():
    global _redis_client
    if _redis_client is None:
        _redis_client = get_redis()
    return _redis_client
# ---------------------------------------------------------------------------
# GPU Health & Load
# ---------------------------------------------------------------------------
def fetch_gpu_metrics(gpu_name, gpu_info):
    """Fetch live metrics from GPU sidecar (port 8090).

    Returns dict with health, utilization, VRAM usage, and load score.
    Returns None if sidecar is unreachable.
    """
    import urllib.request
    from urllib.parse import urlsplit

    # The registry endpoint already includes the inference port (:8080), so
    # build the sidecar URL from the hostname only.
    host = urlsplit(gpu_info["endpoint"]).hostname
    try:
        # Check sidecar health for GPU metrics
        req = urllib.request.Request(f"http://{host}:8090/health")
        req.add_header("User-Agent", "queue-consumer/1.0")
        with urllib.request.urlopen(req, timeout=3) as resp:
            if resp.status != 200:
                return None
            metrics = json.loads(resp.read())
        # Calculate load score (0-100, lower = more available)
        vram_used = metrics.get("vram_used_mb", 0)
        # Fall back to the registry total when the sidecar does not report one
        vram_total = metrics.get("vram_total_mb") or gpu_info.get("vram_total_mb", 0)
        vram_pct = (vram_used / vram_total * 100) if vram_total > 0 else 100
        gpu_util = metrics.get("gpu_util_pct", 0)
        temp = metrics.get("temp_c", 0)
        # Load score: weighted combination of GPU util and VRAM pressure
        # Higher score = more loaded = less desirable for new jobs
        load_score = (gpu_util * 0.6) + (vram_pct * 0.3) + (max(0, temp - 60) * 0.1)
        # Treat the GPU as "down" when VRAM is nearly full, or when it is
        # both saturated (>98% util) and overheating (>85C)
        is_down = vram_pct > 95 or (gpu_util > 98 and temp > 85)
        return {
            "health": "down" if is_down else "up",
            "gpu_util_pct": gpu_util,
            "vram_used_mb": vram_used,
            "vram_total_mb": vram_total,
            "vram_pct": round(vram_pct, 1),
            "temp_c": temp,
            "load_score": round(load_score, 1),
            "last_check": time.time(),
        }
    except Exception as e:
        logger.debug(f"GPU {gpu_name} sidecar unreachable: {e}")
        return None
def get_gpu_health():
    """Check health of all GPUs. Returns dict of gpu_name -> metrics or None."""
    health = {}
    for name, info in GPU_REGISTRY.items():
        m = fetch_gpu_metrics(name, info)
        health[name] = m if m else {"health": "down", "last_check": time.time()}
    return health


def select_gpu(job):
    """Select the best GPU for a job based on model mapping and load.

    Algorithm:
        1. Map job model to preferred GPU via MODEL_TO_GPU
        2. Check if preferred GPU is up and has capacity
        3. If preferred is down/busy, try fallback GPUs in priority order
        4. Return the best available GPU or None

    Returns: (gpu_name, gpu_info, metrics) or (None, None, None)
    """
    model = job.get("model", "standard")
    preferred = MODEL_TO_GPU.get(model, "amdpve")
    # Get all GPU health
    health = get_gpu_health()
    # Build candidate list: preferred first, then others by priority
    candidates = []
    for name in [preferred] + sorted(
        [n for n in GPU_REGISTRY if n != preferred],
        key=lambda n: GPU_REGISTRY[n]["priority"],
    ):
        metrics = health.get(name, {})
        if metrics.get("health") == "up":
            candidates.append((name, GPU_REGISTRY[name], metrics))
    if not candidates:
        return None, None, None
    # Sort by load_score (lower = less loaded = better). The sort is stable,
    # so on equal scores the preferred GPU stays first.
    candidates.sort(key=lambda c: c[2].get("load_score", 100))
    return candidates[0]
# ---------------------------------------------------------------------------
# Circuit Breaker (per-GPU, with recovery)
# ---------------------------------------------------------------------------
class CircuitBreaker:
    """Per-GPU circuit breaker with half-open recovery state.

    States:
        CLOSED     Normal operation. Failures tracked.
        OPEN       All requests fail immediately. After cooldown, transitions to HALF_OPEN.
        HALF_OPEN  Allow probe requests. If one succeeds, CLOSED. If it fails, OPEN.
    """

    def __init__(self, failure_threshold=3, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._failures = {}      # gpu_name -> count
        self._last_failure = {}  # gpu_name -> timestamp
        self._state = {}         # gpu_name -> "closed" | "open" | "half_open"

    def allow_request(self, gpu_name):
        """Check if a request should be allowed for this GPU."""
        state = self._state.get(gpu_name, "closed")
        if state == "closed":
            return True
        if state == "open":
            last_fail = self._last_failure.get(gpu_name, 0)
            if time.time() - last_fail > self.recovery_timeout:
                self._state[gpu_name] = "half_open"
                logger.info(f"Circuit {gpu_name}: OPEN -> HALF_OPEN (probe)")
                return True
            return False
        if state == "half_open":
            # For simplicity, probes are not limited to one at a time
            return True
        return False

    def record_success(self, gpu_name):
        """Record a successful request and transition to CLOSED."""
        previous = self._state.get(gpu_name, "closed")
        self._failures[gpu_name] = 0
        self._state[gpu_name] = "closed"
        if previous != "closed":
            logger.info(f"Circuit {gpu_name}: {previous.upper()} -> CLOSED")

    def record_failure(self, gpu_name):
        """Record a failed request."""
        self._failures[gpu_name] = self._failures.get(gpu_name, 0) + 1
        self._last_failure[gpu_name] = time.time()
        if self._failures[gpu_name] >= self.failure_threshold:
            previous = self._state.get(gpu_name, "closed")
            self._state[gpu_name] = "open"
            logger.warning(
                f"Circuit {gpu_name}: {previous.upper()} -> OPEN "
                f"(threshold={self.failure_threshold})"
            )
# ---------------------------------------------------------------------------
# Job Lifecycle
# ---------------------------------------------------------------------------
def store_job_status(job_id, status_data):
    """Store job status in Redis hash."""
    r = redis_client()
    r.hset(JOB_STATUS_KEY, job_id, json.dumps({
        **status_data,
        "updated_at": time.time(),
    }))
    # EXPIRE applies to the whole hash, not to a single field: every write
    # resets the TTL, so all job statuses expire together 24 hours after
    # the last update.
    r.expire(JOB_STATUS_KEY, 86400)


def get_job_status(job_id):
    """Get job status from Redis."""
    r = redis_client()
    raw = r.hget(JOB_STATUS_KEY, job_id)
    if raw:
        return json.loads(raw)
    return None
def enqueue_job(payload, headers, source="nginx"):
    """Add a job to the Redis stream. Returns job_id."""
    r = redis_client()
    # Parse model and priority from headers / payload
    model_header = headers.get("X-Syslog-Model", "standard")
    priority_header = headers.get("X-Syslog-Priority", "normal")
    # Validate priority
    if priority_header not in PRIORITY_ORDER:
        priority_header = "normal"
    job_id = str(uuid.uuid4())
    job = {
        "job_id": job_id,
        "model": model_header,
        "priority": priority_header,
        "payload": payload,
        "headers": dict(headers),  # Include ALL headers (not just X-*)
        "created_at": time.time(),
        "attempt": 0,
        "max_retries": MAX_RETRIES,
        "source": source,
    }
    # Add to stream with approximate trimming
    # (redis-py names this keyword `approximate`, not `approx`)
    r.xadd(
        STREAM_KEY,
        {"job": json.dumps(job)},
        maxlen=STREAM_MAXLEN,
        approximate=True,
    )
    # Initial status
    store_job_status(job_id, {
        "status": "queued",
        "model": model_header,
        "priority": priority_header,
        "created_at": job["created_at"],
        "queued_at": time.time(),
    })
    return job_id


def get_queue_depth():
    """Get current stream length, or -1 if Redis cannot be queried."""
    r = redis_client()
    # XLEN is O(1) for streams
    try:
        return r.xlen(STREAM_KEY)
    except Exception:
        return -1
# ---------------------------------------------------------------------------
# Dead Letter Queue
# ---------------------------------------------------------------------------
def move_to_dlq(job_id, reason):
    """Move a failed job to the dead letter queue."""
    r = redis_client()
    # Find the original stream entry by scanning for its job_id
    entries = r.xrange(STREAM_KEY, min="0-0", max="+", count=10000)
    original_job = None
    for msg_id, data in entries:
        job = json.loads(data.get("job", "{}"))
        if job.get("job_id") == job_id:
            original_job = job
            # Delete the entry from the main stream
            r.xdel(STREAM_KEY, msg_id)
            break
    if not original_job:
        logger.error(f"DLQ move: job {job_id} not found in main stream")
        return
    dlq_entry = {
        **original_job,
        "dlq_reason": reason,
        "dlq_at": time.time(),
    }
    r.xadd(DLQ_KEY, {"job": json.dumps(dlq_entry)})
    store_job_status(job_id, {
        "status": "dead",
        "dlq_reason": reason,
        "dlq_at": dlq_entry["dlq_at"],
        "updated_at": time.time(),
    })
    logger.warning(f"Job {job_id} moved to DLQ: {reason}")


def list_dlq(limit=20):
    """List dead letter queue entries."""
    r = redis_client()
    entries = r.xrange(DLQ_KEY, count=limit)
    result = []
    for msg_id, data in entries:
        job = json.loads(data.get("job", "{}"))
        result.append({
            "message_id": msg_id.decode() if isinstance(msg_id, bytes) else msg_id,
            **job,
        })
    return result


def retry_dlq(message_id):
    """Retry a dead-letter job by re-adding to main stream."""
    r = redis_client()
    entries = r.xrange(DLQ_KEY, min=message_id, max=message_id, count=1)
    if not entries:
        return None, "DLQ entry not found"
    msg_id, data = entries[0]
    job = json.loads(data.get("job", "{}"))
    # Reset attempt count
    job["attempt"] = 0
    job["dlq_at"] = None
    job["dlq_reason"] = None
    # Re-add to main stream
    # (redis-py names this keyword `approximate`, not `approx`)
    r.xadd(
        STREAM_KEY,
        {"job": json.dumps(job)},
        maxlen=STREAM_MAXLEN,
        approximate=True,
    )
    # Remove from DLQ
    r.xdel(DLQ_KEY, msg_id)
    # Update status
    store_job_status(job["job_id"], {
        "status": "queued",
        "attempt": 0,
        "requeued_at": time.time(),
        "updated_at": time.time(),
    })
    return job["job_id"], "requeued"
# ---------------------------------------------------------------------------
# Flask API
# ---------------------------------------------------------------------------
app = Flask(__name__)
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)


@app.route("/health")
def health():
    """Liveness probe for Nginx."""
    try:
        r = redis_client()
        r.ping()
        return jsonify({"status": "ok", "service": "queue-service"}), 200
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)}), 503


@app.route("/enqueue", methods=["POST"])
def enqueue():
    """Submit inference request to the smart queue.

    Accepts any body (JSON, text, etc.) and forwards all headers.
    Supports X-Syslog-Priority header: high, normal, low.
    """
    try:
        payload = request.get_data(as_text=True)
        headers = dict(request.headers)  # ALL headers, not just X-*
        # Check circuit breaker: is the queue overwhelmed?
        depth = get_queue_depth()
        if depth >= QUEUE_OPEN:
            return jsonify({
                "error": "Circuit breaker OPEN",
                "queue_depth": depth,
                "threshold": QUEUE_OPEN,
                "retry_after": 30,
            }), 503
        # Adaptive backpressure
        if depth >= QUEUE_503:
            return jsonify({
                "error": "Queue overloaded",
                "queue_depth": depth,
                "retry_after": min(60, (depth - QUEUE_503) * 5),
            }), 503
        if depth >= QUEUE_WARN:
            # Warn but allow
            logger.warning(f"Queue depth approaching limit: {depth}/{QUEUE_503}")
        job_id = enqueue_job(payload, headers)
        return jsonify({
            "job_id": job_id,
            "status": "queued",
            "queue_depth": get_queue_depth(),
        }), 202
    except Exception as e:
        logger.error(f"Enqueue failed: {e}")
        return jsonify({"error": str(e)}), 500


@app.route("/status")
def status():
    """Queue overview: depth, circuit state, consumer pool, GPU health."""
    r = redis_client()
    depth = get_queue_depth()
    dlq_depth = r.xlen(DLQ_KEY) if r else 0
    pending_depth = r.xlen(PENDING_KEY) if r else 0
    # Circuit breaker state per GPU
    circuit_state = {}
    for gpu_name in GPU_REGISTRY:
        state = circuit_breaker._state.get(gpu_name, "closed")
        failures = circuit_breaker._failures.get(gpu_name, 0)
        circuit_state[gpu_name] = {
            "state": state,
            "failures": failures,
        }
    # Consumer pool
    consumers_raw = r.hgetall(CONSUMER_KEY) if r else {}
    consumers = []
    for cid, cdata in consumers_raw.items():
        try:
            consumers.append(json.loads(cdata))
        except json.JSONDecodeError:
            pass
    # GPU health
    gpu_health = get_gpu_health()
    return jsonify({
        "queue_depth": depth,
        "pending_depth": pending_depth,
        "dlq_depth": dlq_depth,
        "circuit_breaker": circuit_state,
        "consumers": consumers,
        "gpu_health": gpu_health,
        "thresholds": {
            "warn": QUEUE_WARN,
            "overloaded": QUEUE_503,
            "open": QUEUE_OPEN,
        },
    }), 200


@app.route("/status/<job_id>")
def job_status_endpoint(job_id):
    """Get specific job status and result."""
    status_data = get_job_status(job_id)
    if not status_data:
        return jsonify({"error": "job not found"}), 404
    return jsonify(status_data), 200


@app.route("/dlq")
def dlq_list():
    """List dead letter queue entries."""
    return jsonify({"dead_letter_queue": list_dlq()}), 200


@app.route("/dlq/<message_id>/retry", methods=["POST"])
def dlq_retry(message_id):
    """Retry a dead-letter job."""
    job_id, msg = retry_dlq(message_id)
    if job_id is None:
        return jsonify({"error": msg}), 404
    return jsonify({"job_id": job_id, "status": "requeued"}), 200


@app.route("/consumers")
def consumers_status():
    """List active consumer pool."""
    r = redis_client()
    consumers_raw = r.hgetall(CONSUMER_KEY) if r else {}
    consumers = []
    for cid, cdata in consumers_raw.items():
        try:
            consumers.append(json.loads(cdata))
        except json.JSONDecodeError:
            pass
    return jsonify({"consumers": consumers}), 200


@app.route("/gpus")
def gpus_status():
    """List GPU registry with live health and metrics."""
    health = get_gpu_health()
    result = {}
    for name, info in GPU_REGISTRY.items():
        h = health.get(name, {})
        result[name] = {
            **info,
            **h,
        }
    return jsonify(result), 200
# ---------------------------------------------------------------------------
# Consumer Pool (runs in background thread)
# ---------------------------------------------------------------------------
def run_consumer(consumer_id, stop_event=None):
    """Main consumer loop: reads from stream, selects GPU, forwards request.

    This runs in a background thread per worker. Pass a threading.Event as
    stop_event to request a clean shutdown; signal handlers are not installed
    here because Python only allows signal.signal() in the main thread.
    """
    import urllib.request

    worker_name = f"consumer-{consumer_id}"
    # All workers share ONE consumer group so each job is delivered to exactly
    # one worker; a group per worker would deliver every job to every worker.
    consumer_group = "workers"
    logger.info(f"{worker_name} started")
    r = redis_client()
    started_at = time.time()
    jobs_processed = 0

    def heartbeat(status):
        """Write this worker's registry record (one JSON value per consumer)."""
        r.hset(CONSUMER_KEY, consumer_id, json.dumps({
            "consumer_id": consumer_id,
            "started_at": started_at,
            "pid": os.getpid(),
            "status": status,
            "last_heartbeat": time.time(),
            "jobs_processed": jobs_processed,
        }))

    # Register consumer
    heartbeat("running")
    # Create consumer group if not exists
    try:
        r.xgroup_create(STREAM_KEY, consumer_group, id="0", mkstream=True)
    except redis.ResponseError:
        pass  # Group already exists (BUSYGROUP)
    last_heartbeat = 0
    gpu_health_cache = {}
    last_health_check = 0
    while stop_event is None or not stop_event.is_set():
        try:
            now = time.time()
            # Periodic health check (every GPU_HEALTH_INTERVAL seconds)
            if now - last_health_check > GPU_HEALTH_INTERVAL:
                gpu_health_cache = get_gpu_health()
                last_health_check = now
            # Read from stream (blocks up to POLL_INTERVAL, 1 job at a time)
            entries = r.xreadgroup(
                consumer_group,
                consumer_id,
                {STREAM_KEY: ">"},  # ">" = new messages only
                count=1,  # Process one job at a time per consumer
                block=int(POLL_INTERVAL * 1000),  # Block in ms
            )
            if not entries:
                # Update heartbeat
                if now - last_heartbeat > 5:
                    heartbeat("idle")
                    last_heartbeat = now
                continue
            for stream, messages in entries:
                for msg_id, data in messages:
                    job_data = json.loads(data.get("job", "{}"))
                    job_id = job_data.get("job_id", "unknown")
                    logger.info(f"{worker_name} processing job {job_id} (attempt {job_data.get('attempt', 0)})")
                    # Update status to processing
                    store_job_status(job_id, {
                        "status": "processing",
                        "consumer": consumer_id,
                        "started_at": now,
                        "updated_at": now,
                    })
                    # Track pending; keep the returned ID so the entry can be
                    # deleted later (it differs from the main stream msg_id)
                    pending_id = r.xadd(PENDING_KEY, {"job_id": job_id, "consumer": consumer_id})
                    # Select GPU
                    gpu_name, gpu_info, gpu_metrics = select_gpu(job_data)
                    if not gpu_name:
                        # No GPU available: send to DLQ
                        logger.warning(f"No GPU available for job {job_id}, moving to DLQ")
                        move_to_dlq(job_id, "no_gpu_available")
                        r.xack(STREAM_KEY, consumer_group, msg_id)
                        r.xdel(PENDING_KEY, pending_id)
                        continue
                    # Check circuit breaker
                    if not circuit_breaker.allow_request(gpu_name):
                        logger.warning(f"Circuit OPEN for {gpu_name}, DLQ job {job_id}")
                        move_to_dlq(job_id, f"circuit_open_{gpu_name}")
                        r.xack(STREAM_KEY, consumer_group, msg_id)
                        r.xdel(PENDING_KEY, pending_id)
                        continue
                    # Forward request to GPU
                    gpu_url = f"{gpu_info['endpoint']}/v1/chat/completions"
                    headers = job_data.get("headers", {})
                    payload = job_data.get("payload", "{}")
                    # Ensure Content-Type is set
                    if "Content-Type" not in headers:
                        headers["Content-Type"] = "application/json"
                    # Update job status with GPU selection
                    store_job_status(job_id, {
                        "status": "processing",
                        "gpu": gpu_name,
                        "gpu_load_score": gpu_metrics.get("load_score", 0) if gpu_metrics else 0,
                        "started_at": now,
                        "updated_at": now,
                    })
                    try:
                        req = urllib.request.Request(gpu_url, data=payload.encode(), headers=headers)
                        req.add_header("User-Agent", f"queue-consumer/{consumer_id}")
                        resp = urllib.request.urlopen(req, timeout=JOB_TIMEOUT)
                        response_data = resp.read().decode()
                        # Success!
                        circuit_breaker.record_success(gpu_name)
                        store_job_status(job_id, {
                            "status": "completed",
                            "gpu": gpu_name,
                            "result": response_data[:2000],  # Store first 2000 chars
                            "completed_at": time.time(),
                            "updated_at": time.time(),
                        })
                        logger.info(f"{worker_name} completed job {job_id} on {gpu_name}")
                    except Exception as e:
                        # Request failed
                        error_msg = str(e)[:200]
                        logger.error(f"{worker_name} failed job {job_id} on {gpu_name}: {error_msg}")
                        circuit_breaker.record_failure(gpu_name)
                        # Check if we should retry
                        attempt = job_data.get("attempt", 0) + 1
                        if attempt <= MAX_RETRIES:
                            # Retry: update job with incremented attempt, re-add to stream
                            job_data["attempt"] = attempt
                            store_job_status(job_id, {
                                "status": "retrying",
                                "attempt": attempt,
                                "last_error": error_msg,
                                "updated_at": time.time(),
                            })
                            # (redis-py names this keyword `approximate`)
                            r.xadd(
                                STREAM_KEY,
                                {"job": json.dumps(job_data)},
                                maxlen=STREAM_MAXLEN,
                                approximate=True,
                            )
                        else:
                            # Max retries exceeded: send to DLQ
                            move_to_dlq(job_id, f"max_retries_exceeded_after_{attempt}_attempts_on_{gpu_name}: {error_msg}")
                    # Ack the stream message, then delete it so XLEN (used for
                    # queue depth and backpressure) counts only the backlog
                    r.xack(STREAM_KEY, consumer_group, msg_id)
                    r.xdel(STREAM_KEY, msg_id)
                    # Remove from pending
                    r.xdel(PENDING_KEY, pending_id)
                    # Update consumer processed count
                    jobs_processed += 1
        except redis.ConnectionError as e:
            logger.error(f"{worker_name} Redis connection error: {e}. Retrying in {POLL_INTERVAL}s...")
            time.sleep(POLL_INTERVAL)
        except Exception as e:
            logger.error(f"{worker_name} unexpected error: {e}")
            time.sleep(POLL_INTERVAL)
    # Unregister consumer
    r.hdel(CONSUMER_KEY, consumer_id)
    logger.info(f"{worker_name} stopped")
# ---------------------------------------------------------------------------
# Startup
# ---------------------------------------------------------------------------
def main():
    """Start the queue service with consumer workers."""
    import threading  # asyncio has no start_new_thread; use real threads

    logger.info("Starting Smart Queue Service...")
    logger.info(f"Redis: {REDIS_HOST}:{REDIS_PORT}")
    logger.info(f"Consumers: {CONSUMER_WORKERS}")
    logger.info(f"GPU registry: {list(GPU_REGISTRY.keys())}")
    # Verify Redis connectivity
    try:
        r = redis_client()
        r.ping()
        logger.info("Redis connection verified")
    except Exception as e:
        logger.error(f"Redis connection failed: {e}")
        sys.exit(1)
    # Start consumer workers in background (daemon) threads
    threads = []
    for i in range(CONSUMER_WORKERS):
        t = threading.Thread(
            target=run_consumer,
            args=(f"worker-{i}",),
            name=f"consumer-worker-{i}",
            daemon=True,
        )
        t.start()
        threads.append(t)
    logger.info(f"Started {CONSUMER_WORKERS} consumer workers")
    # Start the Flask dev server. Gunicorn imports `app` directly and never
    # calls main(), so under Gunicorn the consumer threads are NOT started
    # by this function.
    app.run(host="0.0.0.0", port=8091, threaded=True)


if __name__ == "__main__":
    main()
```
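
The adaptive backpressure in `/enqueue` reduces to a pure function of queue depth, which makes the thresholds easy to check in isolation. The sketch below mirrors the constants and `retry_after` arithmetic from the listing above; the function name `admission_decision` and the returned dict shape are assumptions for illustration.

```python
QUEUE_WARN = 30
QUEUE_503 = 40
QUEUE_OPEN = 50


def admission_decision(depth):
    """Classify a queue depth into the four backpressure bands."""
    if depth >= QUEUE_OPEN:
        # Circuit open: reject with a fixed retry hint
        return {"accept": False, "state": "open", "http_status": 503, "retry_after": 30}
    if depth >= QUEUE_503:
        # Overloaded: reject, retry hint grows 5s per queued job, capped at 60s
        retry_after = min(60, (depth - QUEUE_503) * 5)
        return {"accept": False, "state": "overloaded", "http_status": 503, "retry_after": retry_after}
    if depth >= QUEUE_WARN:
        # Warn band: still accepted, but worth logging
        return {"accept": True, "state": "warn", "http_status": 202, "retry_after": None}
    return {"accept": True, "state": "normal", "http_status": 202, "retry_after": None}


for depth in (0, 30, 40, 45, 50):
    print(depth, admission_decision(depth))
```

One consequence of the formula is that a request rejected at exactly `QUEUE_503` gets `retry_after` of 0, so clients may retry immediately at the band boundary.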
### 3.4 Updated `queue-service/Dockerfile`
```dockerfile
FROM python:3.13-slim
RUN pip install --no-cache-dir \
flask==3.1.0 \
redis==5.2.1 \
gunicorn==23.0.0
COPY queue-service.py /app/queue-service.py
WORKDIR /app
EXPOSE 8091
# Gunicorn: 2 workers, 300s timeout (matches LLM inference timeout)
CMD ["gunicorn", "--bind", "0.0.0.0:8091", "--workers", "2", "--threads", "4", "--timeout", "300", "--graceful-timeout", "30", "queue-service:app"]
```
### 3.5 Updated `docker-compose.yml`
```yaml
version: "3.8"
services:
  redis:
    image: redis:7-alpine
    restart: unless-stopped
    networks:
      - gpu-router-net
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3
  queue-service:
    build:
      context: .
      dockerfile: queue-service/Dockerfile
    restart: unless-stopped
    networks:
      - gpu-router-net
    expose:
      - "8091"
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - QUEUE_CONSUMERS=3
      - QUEUE_POLL_INTERVAL=2
      - QUEUE_MAX_RETRIES=3
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8091/health')"]
      interval: 15s
      timeout: 5s
      retries: 3
  dashboard:
    build:
      context: .
      dockerfile: Dockerfile.dashboard
    restart: unless-stopped
    networks:
      - gpu-router-net
    expose:
      - "3001"
    depends_on:
      queue-service:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3001/')"]
      interval: 15s
      timeout: 5s
      retries: 3
  gpu-dashboard:
    build:
      context: .
      dockerfile: Dockerfile.gpu
    restart: unless-stopped
    networks:
      - gpu-router-net
    expose:
      - "8092"
    healthcheck:
      test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8092/gpu.html')"]
      interval: 15s
      timeout: 5s
      retries: 3
networks:
  gpu-router-net:
    driver: bridge
volumes:
  redis-data:
```
**Key changes:**
- `dockerfile: queue-service/Dockerfile` (QW-1 fix)
- `restart: unless-stopped` (QW-8 fix)
- `expose` instead of `ports` for internal services (QW-18: queue API no longer externally exposed)
- `condition: service_healthy` for depends_on (QW-9: health checks)
---
## 4. Nginx Config Updates (gpu-router-docker.conf)
```nginx
## Syslog GPU Router Nginx Configuration (Docker-internal)
## Routes incoming agent requests to the appropriate GPU backend
## based on the X-Syslog-Model header.
upstream amdpve_pool {
server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
}
upstream llmgpu_pool {
server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
}
upstream ocu_llm_pool {
server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
}
upstream queue_service {
server queue-service:8091;
}
upstream dashboard_service {
server dashboard:3001;
}
upstream gpu_dashboard_pool {
server gpu-dashboard:8092;
}
## All-GPUs pool for fallback (fails only when ALL GPUs are down)
upstream all_gpu_pool {
server 192.168.68.15:8080 max_fails=3 fail_timeout=30s;
server 192.168.68.8:8080 max_fails=3 fail_timeout=30s;
server 192.168.68.110:8080 max_fails=3 fail_timeout=30s;
}
map $http_x_syslog_model $gpu_upstream {
default amdpve_pool;
"standard" amdpve_pool;
"heavy" llmgpu_pool;
"qwen3.5-27B" llmgpu_pool;
"light" ocu_llm_pool;
"gemma-4" ocu_llm_pool;
}
limit_req_zone $binary_remote_addr zone=perip:10m rate=10r/s;
server {
listen 80;
server_name _;
location /dashboard {
proxy_pass http://dashboard_service/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
location /gpu {
proxy_pass http://gpu_dashboard_pool/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location / {
limit_req zone=perip burst=20 nodelay;
limit_req_status 503;
proxy_pass http://$gpu_upstream;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
## Streaming support
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 300s;
proxy_send_timeout 300s;
## Only retry on error/timeout; single try (same GPU pool)
proxy_next_upstream error timeout http_502 http_503 http_504;
proxy_next_upstream_tries 1;
add_header X-Routed-To $gpu_upstream always;
## Only fall back to queue when the ALL-GPU pool fails
error_page 504 = @queue_fallback;
}
## Queue fallback only reached when ALL GPUs are down
location @queue_fallback {
## proxy_pass may not carry a URI inside a named location, so rewrite first
rewrite ^ /enqueue break;
proxy_pass http://queue_service;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Content-Type $content_type;
proxy_pass_request_body on;
proxy_set_header X-Syslog-Model $http_x_syslog_model;
}
}
```
**Key changes:**
- `max_fails=3 fail_timeout=30s` on each upstream (Nginx-level health)
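- `all_gpu_pool` upstream defined for fallback (N3 fix); no `location` in this config proxies to it yet, so it still has to be wired in before the all-GPU behavior takes effect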
- `proxy_next_upstream_tries 1` (N2 fix: no self-retry)
- Container names fixed to service names (N5 fix)
- `error_page 504 = @queue_fallback` instead of 502/503/504 (N3 fix)
- `X-Syslog-Model` forwarded to queue_service in fallback
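
The `map` block and the `X-Routed-To` response header make routing easy to check from a test client. Below is a small sketch of the expected mapping in Python, useful for comparing against the header Nginx returns; `expected_pool` is a hypothetical helper name, and the table is copied from the `map` block above.

```python
# Mirror of the Nginx `map $http_x_syslog_model $gpu_upstream` block.
# Keys are lower-cased because Nginx matches map strings case-insensitively.
MODEL_TO_POOL = {
    "standard": "amdpve_pool",
    "heavy": "llmgpu_pool",
    "qwen3.5-27b": "llmgpu_pool",
    "light": "ocu_llm_pool",
    "gemma-4": "ocu_llm_pool",
}
DEFAULT_POOL = "amdpve_pool"


def expected_pool(model):
    """Return the upstream name Nginx should report in X-Routed-To for `model`."""
    if not model:
        return DEFAULT_POOL  # header missing or empty: the map's `default` entry
    return MODEL_TO_POOL.get(model.lower(), DEFAULT_POOL)
```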
---
## 5. GPU Dashboard Updates
### 5.1 Updated `Dockerfile.gpu`
```dockerfile
FROM python:3.11-slim
RUN pip install --no-cache-dir psutil==6.1.1
COPY gpu-dashboard/ /app/
WORKDIR /app
RUN mkdir -p /app/public && \
cp gpu.html /app/public/ && \
touch /app/public/gpu_metrics.json
EXPOSE 8092
# Single-process: gpu_collector.py serves both as collector and HTTP server
CMD ["python3", "gpu_collector.py"]
```
### 5.2 Updated `gpu_collector.py` (key changes)
```python
# Changes:
# 1. Add concurrent GPU polling (threading)
# 2. Fix path consistency (use /app/public consistently)
# 3. Add graceful shutdown
# 4. Include health endpoint for Docker healthcheck
import threading
import http.server
import json
import time
import signal
import sys
import urllib.request
# ... (keep existing imports)

DEAD_THRESHOLD = 30  # seconds (was 60; G4 fix)
OUTPUT_PATH = "/app/public/gpu_metrics.json"  # Consistent path (G2 fix)


def fetch_json_concurrent(endpoints):
    """Fetch from all GPUs concurrently using threads."""
    results = {}
    errors = {}

    def fetch_one(name, url):
        try:
            req = urllib.request.Request(url)
            req.add_header("User-Agent", "gpu-collector/1.0")
            with urllib.request.urlopen(req, timeout=3) as resp:
                results[name] = json.loads(resp.read())
        except Exception as e:
            errors[name] = str(e)[:100]

    threads = []
    for name, url in endpoints.items():
        t = threading.Thread(target=fetch_one, args=(name, url), daemon=True)
        threads.append(t)
        t.start()
    for t in threads:
        t.join(timeout=5)
    return results, errors

# ... (update the collection loop to use fetch_json_concurrent)


# Add HTTP health endpoint
class HealthHandler(http.server.SimpleHTTPRequestHandler):
    def __init__(self, *args, **kwargs):
        # Serve static files from /app/public, where gpu.html and
        # gpu_metrics.json live (the default is the working directory).
        super().__init__(*args, directory="/app/public", **kwargs)

    def do_GET(self):
        path = self.path.split("?", 1)[0]  # ignore any query string
        if path == "/health":
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(json.dumps({"status": "ok"}).encode())
        elif path in ("/", "/gpu.html", "/gpu_metrics.json"):
            super().do_GET()
        else:
            self.send_response(404)
            self.end_headers()

# Start both the collector loop and the HTTP server in the same process
# (HTTP server runs in a separate thread, collector runs in main)
```
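The excerpt above lists graceful shutdown and the single-process layout as changes but does not show them. The following sketch illustrates one way to arrange it, assuming hypothetical helper names (`serve_in_background`, `run_collector`, `install_signal_handlers`) and a caller-supplied `collect_once` function; the real collector loop may differ. A typical call order would be `install_signal_handlers()`, then `server = serve_in_background(HealthHandler, 8092)`, then `run_collector(collect_once, interval)`, and finally `server.shutdown()`.

```python
import http.server
import signal
import threading

stop_event = threading.Event()


def serve_in_background(handler_cls, port, host="0.0.0.0"):
    """Start a ThreadingHTTPServer in a daemon thread and return the server."""
    server = http.server.ThreadingHTTPServer((host, port), handler_cls)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return server


def run_collector(collect_once, interval, stop=stop_event):
    """Call `collect_once()` every `interval` seconds until `stop` is set."""
    while not stop.is_set():
        collect_once()
        stop.wait(interval)  # returns early once a shutdown is requested


def install_signal_handlers(stop=stop_event):
    """Translate SIGTERM/SIGINT into a cooperative stop request."""
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, lambda signum, frame: stop.set())
```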
---
## 6. Migration Steps
### Phase 1: Quick Wins (Deploy First)
1. Apply QW-1 through QW-10 (Dockerfile fix, Nginx fixes, health checks)
2. `docker compose up -d --build`
3. Verify all containers pass health checks
4. Verify Nginx fallback only triggers on ALL-GPU failure
### Phase 2: Smart Queue Consumer
1. Replace `queue-service/queue-service.py` with new implementation
2. Replace `queue-service/Dockerfile` with Gunicorn-based Dockerfile
3. Update `docker-compose.yml` (expose ports, depends_on conditions)
4. `docker compose up -d --build queue-service`
5. Verify:
- `/health` returns 200
- `/status` shows consumer pool
- Submit a test request via `/enqueue`
- Verify job appears in `/status/<job_id>` with "queued" status
- Verify job gets processed and moves to "completed"
- Verify GPU sidecar metrics are being read
- Verify consumer pool shows active workers
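
The enqueue-then-poll checks above can be scripted. This is a minimal sketch, assuming the `/status/<job_id>` response is JSON with a `status` field and that `failed` is a terminal state alongside `completed`; both are assumptions, as are the helper names `http_json` and `wait_for_job`. Since port 8091 is only exposed on the Docker network, it would need to run from a container attached to `gpu-router-net`.

```python
import json
import time
import urllib.request


def http_json(url, payload=None, headers=None, timeout=10):
    """POST `payload` as JSON (or GET when payload is None) and decode the JSON reply."""
    data = json.dumps(payload).encode() if payload is not None else None
    req = urllib.request.Request(url, data=data, headers=headers or {})
    if data is not None:
        req.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())


def wait_for_job(base_url, job_id, fetch=http_json, poll=1.0, deadline=300.0):
    """Poll /status/<job_id> and return the distinct statuses seen, in order."""
    end = time.monotonic() + deadline
    seen = []
    while time.monotonic() < end:
        status = fetch(f"{base_url}/status/{job_id}").get("status")  # assumed field name
        if not seen or seen[-1] != status:
            seen.append(status)
        if status in ("completed", "failed"):  # "failed" is an assumed terminal state
            return seen
        time.sleep(poll)
    last = seen[-1] if seen else "unknown"
    raise TimeoutError(f"job {job_id} still {last} after {deadline}s")
```

A healthy run should report a sequence that starts with `queued` and ends with `completed`. The job ID would come from the `/enqueue` response, whose exact shape is not shown in this section.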
### Phase 3: Validation
1. Submit multiple concurrent requests
2. Verify load balancing across GPUs
3. Verify circuit breaker opens on GPU failure
4. Verify DLQ captures max-retry failures
5. Verify `/dlq` and `/dlq/<id>/retry` endpoints work
6. Verify backpressure: queue depth > 40 returns 503 with retry-after
7. Monitor Redis stream lengths and consumer ack rates
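
For step 6, it helps to state the expected behavior per queue depth before testing. The sketch below encodes the four backpressure bands from the design summary (0-30 normal, 30-40 warn, 40-50 503, >50 open). How the boundary values 30, 40, and 50 are classified is an assumption here, chosen so that a depth strictly above 40 yields a 503, matching the wording of step 6; `backpressure_state` and `expects_503` are hypothetical names.

```python
def backpressure_state(depth):
    """Classify a queue depth into one of the four backpressure bands."""
    if depth > 50:
        return "open"    # circuit open: stop accepting work
    if depth > 40:
        return "reject"  # respond 503 with a Retry-After header
    if depth > 30:
        return "warn"    # accept, but log a warning
    return "normal"


def expects_503(depth):
    """True when a request at this depth should be refused with a 503."""
    return backpressure_state(depth) == "reject"
```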
---
## 7. Risk Assessment
| Risk | Mitigation |
|---|---|
| Redis stream XREADGROUP blocks indefinitely | `block=2000` (milliseconds) bounds each `XREADGROUP` call, so a read never blocks indefinitely |
| Consumer crashes mid-request | `PENDING_KEY` tracks in-flight jobs; restart re-reads pending |
| GPU sidecar temporarily unavailable | 3s timeout on sidecar calls; GPU marked "down" if unreachable |
| Circuit breaker oscillates (flapping) | `recovery_timeout=30s` prevents rapid OPEN → HALF_OPEN → OPEN cycling |
| DLQ grows unbounded | Stream is trimmed approximately to 10,000 entries (`MAXLEN ~ 10000`); periodic manual DLQ cleanup is still recommended |
| Consumer threads conflict with Flask | Gunicorn handles Flask; consumers run in separate threads |
| Job payload too large for Redis | Redis has 512MB value limit; LLM responses typically < 50KB |
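
To make the flapping mitigation concrete, here is a minimal sketch of a three-state breaker with a recovery timeout. It is an illustration under stated assumptions, not the draft's breaker: `failure_threshold` and the injectable `clock` are hypothetical parameters, and only `recovery_timeout` comes from the table above. While OPEN, no request is allowed until `recovery_timeout` seconds have passed; a failed probe in HALF_OPEN re-opens the breaker and restarts the timer.

```python
import time


class CircuitBreaker:
    """Three-state breaker: CLOSED -> OPEN -> HALF_OPEN -> CLOSED or OPEN."""

    def __init__(self, failure_threshold=3, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None
        self.state = "CLOSED"

    def allow(self):
        """Return True if a request may be sent to this GPU right now."""
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.recovery_timeout:
                return False
            # Timeout elapsed: allow probe traffic until a result is recorded.
            self.state = "HALF_OPEN"
        return True

    def record_success(self):
        """A request succeeded: reset the failure count and close the breaker."""
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self):
        """A request failed: open on a failed probe or once the threshold is hit."""
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = self.clock()
```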
---
## 8. Future Enhancements (Not in This Draft)
1. **Prometheus metrics endpoint:** queue depth, processing rate, GPU latency, error rates
2. **WebSocket streaming:** real-time job status updates to dashboard
3. **Batch processing:** group multiple small requests for throughput optimization
4. **Model-specific queues:** separate streams per model for guaranteed SLA
5. **Redis Sentinel/Cluster:** high availability for Redis
6. **Rate limiting per-model:** prevent one model from starving others
7. **GPU affinity for long-running jobs:** keep a job on the same GPU to avoid context switches
8. **Result caching:** cache responses for identical requests (hash payload, return cached result)
---
**END OF DRAFT**
This implementation transforms the queue-service from a passive storage pit into a production-ready smart queue consumer. The quick wins (QW-1 through QW-10) can be deployed independently and provide immediate value. The smart queue consumer builds on top of the quick wins.
**Recommended deployment order:**
1. Quick wins (QW-1 to QW-10): 1-2 hours
2. Smart queue consumer: 2-3 hours
3. Validation & testing: 1 hour
4. Monitor for 24 hours, then go live