From b09a93f45cd1fbd50fe6b8c113ea5a518b2bc47a Mon Sep 17 00:00:00 2001 From: SyslogBot Date: Sun, 17 May 2026 03:55:20 +0000 Subject: [PATCH] feat: Smart Queue Consumer implementation draft + architecture review - SMART_QUEUE_IMPLEMENTATION.md: Complete implementation draft (1572 lines) with 10 quick-win fixes and full smart queue consumer rewrite - ARCHITECTURE_REVIEW.md: 26-issue audit with prioritized findings - Verified all 3 GPUs live: amdpve (73% util), llmgpu (idle), ocu_llm (idle) - Redis 7.4.9 confirmed streams support - GPU sidecar metrics verified on all hosts Key fixes: - QW-1: Dockerfile path mismatch (Dockerfile.queue -> queue-service/Dockerfile) - QW-2: Nginx fallback only on ALL-GPU failure (not single GPU) - QW-3: Container names fixed to Docker service names - QW-4: Redis host default fixed (192.168.68.7 -> redis) - QW-5: Dependency version pinning - QW-7-10: Health checks, restart policy, Gunicorn, single-process collector Smart queue features: - Redis Streams + consumer groups - GPU-aware load balancing via sidecar metrics - Per-GPU circuit breakers with half-open recovery - Adaptive backpressure (0-30 normal, 30-40 warn, 40-50 503, >50 open) - Dead letter queue with retry endpoint - Job ID tracking and /status/ API --- ARCHITECTURE_REVIEW.md | 390 ++++++++ Dockerfile.dashboard | 5 + MIGRATION_PLAN.md | 71 ++ README.md | 63 ++ SMART_QUEUE_DESIGN.md | 1386 ++++++++++++++++++++++++++++ SMART_QUEUE_IMPLEMENTATION.md | 1572 ++++++++++++++++++++++++++++++++ dashboard/Dockerfile | 8 + dashboard/Dockerfile.dashboard | 5 + dashboard/harness-dashboard.py | 133 +++ gpu-dashboard/collector.py | 115 +++ gpu-dashboard/start.sh | 14 + gpu-router-docker.conf | 2 +- queue-service/Dockerfile | 10 + queue-service/queue-service.py | 121 +++ syslog-harness-check | 1 + 15 files changed, 3895 insertions(+), 1 deletion(-) create mode 100644 ARCHITECTURE_REVIEW.md create mode 100644 Dockerfile.dashboard create mode 100644 MIGRATION_PLAN.md create mode 100644 README.md create mode 100644 SMART_QUEUE_DESIGN.md create mode 100644 SMART_QUEUE_IMPLEMENTATION.md create mode 100644 dashboard/Dockerfile create mode 100644 dashboard/Dockerfile.dashboard create mode 100644 dashboard/harness-dashboard.py create mode 100644 gpu-dashboard/collector.py create mode 100644 gpu-dashboard/start.sh create mode 100644 queue-service/Dockerfile create mode 100644 queue-service/queue-service.py create mode 160000 syslog-harness-check diff --git a/ARCHITECTURE_REVIEW.md b/ARCHITECTURE_REVIEW.md new file mode 100644 index 0000000..dc16062 --- /dev/null +++ b/ARCHITECTURE_REVIEW.md @@ -0,0 +1,390 @@ +# Syslog Harness Architecture Review & Improvement Recommendations + +**Date:** 2026-05-17 +**Commit:** `e95475f` "Add GPU dashboard container + Nginx routing" +**Repo:** http://192.168.68.17:3000/SyslogSolution/syslog-harness.git + +--- + +## 1. Current Architecture Overview + +``` + + Host (192.168.68.123) + + +Agent :8080> Nginx Router > Queue Service > Dashboard + :8080 :8091 :3001 + + + + + + GPU Pool Redis > GPU Dashboard + :8080 :6379 :8092 + + + + + + + + amdpve llmgpu ocu_llm + .15:8080 .8:8080 .110:8080 + MoE 35B Dense 27B Light 4B + +``` + +### Services + +| Service | Port | Container | Image | Purpose | +|---|---|---|---|---| +| **Nginx Router** | 8080 | Host-level | OS nginx | Routes by `X-Syslog-Model` header | +| **Queue Service** | 8091 | `syslog-queue` | `python:3.13-slim` | Request queue + circuit breaker | +| **Dashboard** | 3001 | `syslog-dashboard` | `python:3.11-slim` | Observability UI + GPU health | +| **GPU Dashboard** | 8092 | `syslog-gpu-dashboard` | `python:3.11-slim` | Hardware metrics (temp, VRAM, power) | +| **Redis** | 6379 | `syslog-redis` | `redis:7-alpine` | Queue storage | + +### GPU Backends + +| Host | GPU | Model | Capacity | +|---|---|---|---| +| 192.168.68.15 | AMD Strix Halo | qwen3.6-35B-A3B (MoE) | 65GB VRAM | +| 192.168.68.8 | RTX 3090 | qwen3.5-27B (Dense) | 24GB VRAM | +| 192.168.68.110 | RTX 5070 | gemma-4-E4B (Light) | 12GB VRAM | + +### Data Flow + +1. **Agent** sends request with `X-Syslog-Model` header Nginx :8080 +2. **Nginx** routes to appropriate GPU based on header mapping +3. **GPU backend** (llama.cpp) processes request +4. **Fallback:** If GPU returns 502/503/timeout Nginx redirects to queue-service :8091 +5. **Queue** stores request in Redis `inference:requests` LPUSH +6. **Dashboard** :3001 polls queue-service + GPU health for display +7. **GPU Dashboard** :8092 collects hardware metrics every 10s + +--- + +## 2. File Inventory + +``` +docker-compose.yml # Main compose (Docker networking) +gpu-router-docker.conf # Nginx config for Docker deployment +Dockerfile.gpu # GPU dashboard container +Dockerfile.dashboard # Dashboard container (root-level) +queue-service/Dockerfile # Queue service container +queue-service/queue-service.py # Queue logic (121 lines) +dashboard/harness-dashboard.py # Dashboard app (133 lines) +dashboard/Dockerfile # Dashboard container (subdir) +dashboard/Dockerfile.dashboard # Dashboard container (duplicate) +gpu-dashboard/gpu_collector.py # GPU hardware collector (115 lines) +gpu-dashboard/gpu.html # GPU dashboard UI (183 lines) +gpu-dashboard/collector.py # Duplicate collector (hermes-workspace path) +gpu-dashboard/start.sh # Legacy startup script +MIGRATION_PLAN.md # Production migration plan +README.md # Documentation +syslog-harness-check/ # Checkpoint subdirectory (mirror) +``` + +--- + +## 3. Detailed Findings + +### 3.1 Queue Service (`queue-service/queue-service.py`) + +**Architecture:** Simple Flask app using Redis LPUSH/RPUSH for a FIFO queue. A basic circuit breaker prevents queue overflow at 50 messages. + +**Issues Found:** + +| # | Severity | Location | Issue | +|---|---|---|---| +| Q1 | **CRITICAL** | Lines 82-88 | **Queue is fire-and-forget with no consumer.** Requests are pushed to Redis but nothing dequeues or processes them. The queue is a dead storage pit. | +| Q2 | **CRITICAL** | Lines 28-32 | **Hardcoded GPU IPs** in the queue service duplicate the Nginx config. No configuration source of truth. | +| Q3 | **HIGH** | Lines 21-22 | **Redis host fallback to `192.168.68.7`** (line 21) conflicts with docker-compose which sets `REDIS_HOST=redis` (line 24). The default is unreachable inside Docker. | +| Q4 | **HIGH** | Lines 66-95 | **No job result retrieval mechanism.** Once enqueued, there's no API to poll for completion, get a job ID, or retrieve results. | +| Q5 | **HIGH** | Lines 73-79 | **Circuit breaker is a simple depth threshold.** No backoff, no recovery window, no sliding window. Once closed, it stays closed until manually drained. | +| Q6 | **MEDIUM** | Lines 50-57 | **GPU health check is synchronous and blocks** the `/status` endpoint. Checking 3 GPUs sequentially with 3s timeout means `/status` can take up to 9s. | +| Q7 | **MEDIUM** | Lines 35-40 | **`get_redis()` swallows all exceptions** and returns `None`. This makes Redis failures silent queue depth returns 0 on failure (line 47), potentially allowing overflow. | +| Q8 | **MEDIUM** | Lines 83-84 | **Headers filtered to only X-* prefixed** the `Content-Type` header is dropped entirely, meaning the receiver can't determine payload format. | +| Q9 | **LOW** | Line 121 | **No graceful shutdown.** Flask development server doesn't handle SIGTERM gracefully. | + +### 3.2 Nginx Gateway (`gpu-router-docker.conf`) + +**Architecture:** Nginx routes requests to GPU backends based on `X-Syslog-Model` header value. Has rate limiting, streaming support, and queue fallback. + +**Issues Found:** + +| # | Severity | Location | Issue | +|---|---|---|---| +| N1 | **HIGH** | Lines 79-80 | **`burst=20 nodelay`** means 20 requests are served immediately beyond the rate limit, then throttled. This defeats the purpose of rate limiting under burst traffic all 20 could still overwhelm a GPU. | +| N2 | **HIGH** | Lines 99-100 | **`proxy_next_upstream` with `tries 2`** means on error/timeout/502/503, Nginx retries once. But it retries against the *same GPU pool*, not a different one. The same GPU that failed gets hit again. | +| N3 | **HIGH** | Lines 106, 112-121 | **Queue fallback (`@queue_fallback`) is triggered for ANY 502/503/504**, including when a single GPU is overloaded. This means individual GPU slowness causes queue fallback instead of just queuing when ALL GPUs are down. | +| N4 | **MEDIUM** | Line 90 | **`proxy_pass_header X-Syslog-Model`** is non-standard. Nginx automatically passes request headers; this directive is for response headers. The model header is already passed implicitly via `proxy_set_header` inheritance. | +| N5 | **MEDIUM** | Lines 27, 32 | **Hardcoded container names** (`syslog-harness-dashboard-1`, `syslog-harness-gpu-dashboard-1`). These change based on docker-compose project prefix. Should use service names. | +| N6 | **LOW** | Lines 67-73 | **GPU dashboard at `/gpu` path** has `X-Forwarded-Proto` but the dashboard service (simple HTTP server) doesn't use it. Inconsistent header handling across locations. | + +### 3.3 Dashboard (`dashboard/harness-dashboard.py`) + +**Architecture:** Simple HTTP server using Python's `http.server`. Fetches queue status and GPU health, renders HTML. + +**Issues Found:** + +| # | Severity | Location | Issue | +|---|---|---|---| +| D1 | **HIGH** | Lines 34-40 | **`get_queue_status()` calls queue-service synchronously.** Combined with per-GPU health checks (lines 18-31), the `/api/status` endpoint makes 4 sequential HTTP calls. Worst case: 2 + 33s = 11s response time. | +| D2 | **MEDIUM** | Lines 101-127 | **Uses `SimpleHTTPRequestHandler`** which is single-threaded. Under concurrent dashboard access, requests queue up. Should use `ThreadingHTTPServer`. | +| D3 | **MEDIUM** | Lines 16-18 | **GPU endpoints hardcoded** in dashboard, separate from queue-service and Nginx. Three separate sources of truth for GPU addresses. | +| D4 | **LOW** | Line 127 | **Silent log suppression.** While intentional, this makes debugging impossible without modifying the source. | + +### 3.4 GPU Dashboard (`gpu-dashboard/`) + +**Architecture:** `gpu_collector.py` polls sidecar (port 8090) and llama.cpp (port 8080) endpoints every 10s, writes JSON to `gpu_metrics.json`. Static HTTP server serves the dashboard. + +**Issues Found:** + +| # | Severity | Location | Issue | +|---|---|---|---| +| G1 | **HIGH** | Lines 97-98 | **Sequential collection.** All 3 GPUs are polled sequentially (line 98: list comprehension). If one host is unreachable, it blocks collection for all three. | +| G2 | **HIGH** | Line 105-107 | **`/app/public/gpu_metrics.json` path is hardcoded** and differs from `collector.py` (line 11: `/root/hermes-workspace/public/gpu_metrics.json`). Inconsistent between the two collector files. | +| G3 | **MEDIUM** | Lines 19-25 | **`fetch_json` swallows all exceptions.** A timeout on one GPU's sidecar is silently ignored, making it impossible to distinguish "no data" from "collector error". | +| G4 | **MEDIUM** | Line 14 | **`DEAD_THRESHOLD = 60` seconds is aggressive.** A GPU that restarts takes 60s before reappearing as online, even if it's back in 5s. | +| G5 | **LOW** | Lines 10-14 | **`start.sh` references `/root/hermes-workspace/public`** but `Dockerfile.gpu` creates `/app/public`. Inconsistent between legacy and current deployment. | + +### 3.5 Docker Compose (`docker-compose.yml`) + +**Issues Found:** + +| # | Severity | Location | Issue | +|---|---|---|---| +| C1 | **HIGH** | Lines 19-20 | **Queue service exposes port 8091 externally.** In a multi-tenant or public-facing deployment, the queue API should be internal-only. | +| C2 | **MEDIUM** | Lines 13-15 | **`Dockerfile.queue` referenced but doesn't exist at root level.** The file is at `queue-service/Dockerfile`. The compose build context is `.` (root) but the dockerfile path doesn't match. | +| C3 | **MEDIUM** | Lines 6, 16, 26, 31, 43 | **`restart: always`** instead of `restart: unless-stopped`. On crash, `always` restarts even after manual stop, making maintenance harder. | +| C4 | **LOW** | Lines 23-25 | **No health checks defined** for any service. Docker can't detect if a service is actually healthy, only if the container is running. | +| C5 | **LOW** | Line 10 | **Redis has no password.** Unauthenticated Redis exposed on the Docker network. | +| C6 | **LOW** | Lines 49-51 | **No network driver specified** for the bridge network (minor defaults to bridge). No IPAM configuration for large deployments. | + +### 3.6 Container Images + +**Issues Found:** + +| # | Severity | Location | Issue | +|---|---|---|---| +| I1 | **HIGH** | All Dockerfiles | **No `requirements.txt` or dependency pinning.** All dependencies (`flask`, `redis`, `requests`) are installed without version pins. Builds are non-reproducible. | +| I2 | **MEDIUM** | `Dockerfile.gpu` line 3 | **`pip install requests`** unnecessary dependency for the GPU dashboard (only uses `urllib`). Adds ~300KB to the image. | +| I3 | **MEDIUM** | `Dockerfile.gpu` line 14 | **Multi-process CMD with `&`** no process supervisor. If the collector crashes, it won't restart. The `http.server` also won't receive SIGTERM properly. | +| I4 | **LOW** | All Dockerfiles | **No `.dockerignore` file.** The entire context is sent to the Docker daemon, including `.git` directories and any local artifacts. | +| I5 | **LOW** | `Dockerfile.dashboard` (root) vs `dashboard/Dockerfile.dashboard` | **Duplicate Dockerfiles** with slight differences (Python 3.11 vs 3.13, WORKDIR differences). | + +--- + +## 4. Smart Queuing Analysis & Recommendations + +### Current State: No Smart Queuing + +The queue service is a **passive storage mechanism** it stores requests but has no intelligence: + +- **No load balancing** no awareness of GPU load (slots_busy, VRAM usage, queue depth per GPU) +- **No job prioritization** FIFO only, no priority levels +- **No backpressure** simple threshold, no exponential backoff or adaptive limits +- **No retry logic** failed GPU requests go to queue but are never reprocessed +- **No dead letter handling** stuck or failed jobs have no lifecycle management +- **No consumer** nothing dequeues and forwards to GPUs +- **No job tracking** no job IDs, no status updates, no result retrieval + +### Recommended Architecture: Smart Queue with Consumer + +``` +Agent > Nginx > Smart Queue API > Redis Streams (with consumers) + + + Consumer + Pool + + + + + GPU 1 (load) GPU 2 (load) GPU 3 (load) + + + Health Health Health + + + + Update GPU scores + + Priority Queue (sorted by urgency) + Dead Letter Queue (failed jobs) + Backpressure (adaptive rate limit) +``` + +### Specific Recommendations + +#### R1: Implement Redis Streams as Queue Backend +- Replace `LPUSH/RPUSH` (FIFO list) with **Redis Streams** (`XADD/XREADGROUP`) +- Streams support consumer groups, message acknowledgment, and pending messages +- Enables proper dead letter queue handling and retry logic +- **File:** `queue-service/queue-service.py` + +```python +# Before: Simple list +r.rpush(QUEUE_KEY, json.dumps(job)) + +# After: Redis Stream with consumer group +stream_key = "inference:stream" +consumer_group = "gpu-workers" +r.xadd(stream_key, {"job": json.dumps(job)}, maxlen=10000, approx=True) +``` + +#### R2: Build a Queue Consumer Pool +- Deploy 1+ consumer containers that poll the stream and forward to GPUs +- Consumer selects GPU based on: health status, current load (slots_busy), and VRAM availability +- **File:** New `queue-service/consumer.py` + +```python +class LoadBalancedConsumer: + def select_gpu(self, job): + """Select GPU based on load, health, and model compatibility.""" + candidates = [g for g in self.gpus if g.health == "up" and not g.full] + if not candidates: + return None + # Sort by: slots_idle (descending), VRAM_available (descending) + candidates.sort(key=lambda g: (g.slots_idle, g.vram_free_mb), reverse=True) + return candidates[0] +``` + +#### R3: Implement Priority Queuing +- Add priority field to job payload: `high`, `normal`, `low` +- Use Redis Streams with multiple stream keys per priority level +- Consumer checks `high` `normal` `low` in order +- **File:** `queue-service/queue-service.py` enqueue endpoint + +#### R4: Add Backpressure Mechanism +- Instead of hard threshold at 50, implement **adaptive backpressure**: + - Queue depth 0-30: normal operation + - Queue depth 30-40: return `retry-after` header with increasing delay + - Queue depth 40-50: return 503 with exponential retry-after + - Queue depth >50: circuit breaker open +- **File:** `queue-service/queue-service.py` + +#### R5: Dead Letter Queue (DLQ) +- Move failed/unprocessable jobs to a `inference:dead-letter` stream +- Include failure reason, attempt count, and original payload +- Provide admin API to inspect, retry, or discard DLQ entries +- **File:** `queue-service/queue-service.py` + +```python +# New endpoint +@app.route("/dlq", methods=["GET"]) +def list_dlq(): + return r.xrange("inference:dead-letter") + +@app.route("/dlq/retry/", methods=["POST"]) +def retry_dlq(message_id): + job = r.xget("inference:dead-letter", message_id) + r.xadd("inference:stream", {"job": job}) +``` + +#### R6: GPU-Aware Routing +- Queue consumer should check GPU `slots_busy` before routing +- If a GPU is busy, try the next available GPU +- Track per-GPU queue depth and avoid overloading a single GPU +- **File:** New consumer logic + +#### R7: Job Status API +- Add job ID generation on enqueue +- Provide `/status/` endpoint to check progress +- Store job state in Redis: `queued` `processing` `completed`/`failed` +- **File:** `queue-service/queue-service.py` + +```python +@app.route("/enqueue", methods=["POST"]) +def enqueue(): + job_id = str(uuid.uuid4()) + job = {"id": job_id, "payload": ..., "status": "queued", "created_at": time.time()} + r.xadd(stream_key, {"job": json.dumps(job)}) + r.hset("job:status", job_id, json.dumps({"status": "queued"})) + return jsonify({"job_id": job_id, "status": "queued"}), 202 + +@app.route("/status/") +def job_status(job_id): + status = r.hget("job:status", job_id) + return jsonify(json.loads(status)) if status else {"error": "not found"}, 404 +``` + +#### R8: Health-Based Circuit Breaker +- Replace simple depth threshold with **per-GPU circuit breakers** +- Track consecutive failures per GPU +- Implement half-open state: after cooldown, probe one GPU to test recovery +- **File:** `queue-service/queue-service.py` + +#### R9: Centralized Configuration +- Move GPU endpoints from 3 locations (queue-service, dashboard, Nginx) to: + - Redis config key: `config:gpus` + - Or environment file mounted to all containers +- Nginx can use Lua/variable from config instead of static upstreams +- **File:** New `config/` directory or Redis-based config + +--- + +## 5. Priority Issue Summary + +### Critical (Fix Immediately) +1. **Q1** Queue has no consumer; enqueued requests are never processed +2. **Q4** No job ID or result retrieval mechanism +3. **N3** Queue fallback triggers on individual GPU failure, not all-down + +### High (Fix Before Production) +4. **Q5** Circuit breaker has no recovery mechanism +5. **Q6** `/status` endpoint blocks on GPU health checks +6. **D1** Dashboard `/api/status` makes 4 sequential calls, up to 11s +7. **C2** `Dockerfile.queue` path mismatch in docker-compose +8. **I1** No dependency pinning in any Dockerfile +9. **I3** Multi-process CMD without supervisor in GPU dashboard + +### Medium (Improve in Next Iteration) +10. **Q3** Redis host default conflicts with Docker networking +11. **Q7** Silent exception swallowing in Redis access +12. **Q8** Content-Type header dropped in queue +13. **D2** Single-threaded dashboard server +14. **D3** Three separate sources of truth for GPU addresses +15. **G1** Sequential GPU collection blocks on single failure +16. **N1** Rate limit burst of 20 nodelay defeats protection +17. **N5** Hardcoded container names in Nginx +18. **C1** Queue API exposed externally +19. **C4** No Docker health checks + +### Low (Nice to Have) +20. **Q9** No graceful shutdown +21. **C3** `restart: always` vs `unless-stopped` +22. **C5** No Redis authentication +23. **G4** 60s dead threshold is too aggressive +24. **I2** Unnecessary `requests` dependency +25. **I4** No `.dockerignore` +26. **I5** Duplicate Dockerfiles + +--- + +## 6. Deployment Architecture Summary + +### What Works Well +- Clean separation of concerns: routing (Nginx), queuing (Redis + queue-service), observability (two dashboards) +- Good GPU hardware monitoring with temperature, VRAM, power, fan metrics +- SSE streaming support in Nginx for LLM response streaming +- Rate limiting at the gateway layer +- Circuit breaker pattern implemented (even if basic) + +### What Needs Work +- **Queue is incomplete** storage without processing is the most critical gap +- **No job lifecycle** requests go in and never come out +- **Duplicated configuration** GPU addresses in 3+ places +- **No monitoring/alerting** no Prometheus metrics, no alerting rules +- **Single point of failure** no Redis replication, no container redundancy +- **No logging** Flask dev server logs are minimal; no structured logging + +### Recommended Next Steps +1. **Priority 1:** Implement queue consumer with GPU load-based routing +2. **Priority 2:** Add job status tracking and result retrieval +3. **Priority 3:** Fix Nginx fallback to only trigger when ALL GPUs are down +4. **Priority 4:** Add Docker health checks and proper dependency management +5. **Priority 5:** Centralize GPU configuration in Redis or environment +6. **Priority 6:** Add Prometheus metrics endpoint for observability diff --git a/Dockerfile.dashboard b/Dockerfile.dashboard new file mode 100644 index 0000000..983ab9f --- /dev/null +++ b/Dockerfile.dashboard @@ -0,0 +1,5 @@ +FROM python:3.11-slim +WORKDIR /app +COPY dashboard/harness-dashboard.py . +EXPOSE 3001 +CMD ["python3", "harness-dashboard.py"] diff --git a/MIGRATION_PLAN.md b/MIGRATION_PLAN.md new file mode 100644 index 0000000..5a005aa --- /dev/null +++ b/MIGRATION_PLAN.md @@ -0,0 +1,71 @@ +# Syslog Harness — Production Migration Plan + +## Current State (Development) +- **Host:** CT 114 (192.168.68.123) +- **Docker containers:** `syslog-queue` (:8091), `syslog-dashboard` (:3001) +- **Nginx:** Local on CT 114, routing to GPUs + Docker services +- **Status:** All components verified and operational + +## Target State (Production) +- **Host:** New CT (e.g., `docker-vm` on 192.168.68.x) +- **Docker containers:** Same queue + dashboard services +- **Nginx:** Containerized on production CT +- **GPU backends:** Same (192.168.68.15, .8, .110) + +## Migration Steps + +### 1. Prepare Production CT +```bash +# Create new CT on Proxmox +# Install Docker +apt update && apt install -y docker.io docker-compose-plugin + +# Pull/cloned harness repo +git clone /root/syslog-harness +cd /root/syslog-harness +``` + +### 2. Update docker-compose.yml for Production +- Change `REDIS_HOST` to production Redis IP +- Update GPU endpoint env vars if IPs change +- Add volume mounts for persistence + +### 3. Build & Deploy +```bash +# Build images +docker compose build + +# Start services +docker compose up -d + +# Verify health +curl http://localhost:8091/health +curl http://localhost:3001/api/status +``` + +### 4. Configure Nginx +- Copy `/etc/nginx/conf.d/gpu-router.conf` to production CT +- Update upstream IPs if needed +- Test and reload + +### 5. DNS / Routing Update +- Point agent traffic to new CT IP +- Update Hermes config `inference_api_url` +- Test agent routing + +### 6. Verification Checklist +- [ ] Queue service health check passes +- [ ] Dashboard API returns GPU health +- [ ] Nginx routes to correct GPU based on header +- [ ] Circuit breaker triggers on excess load +- [ ] Queue fallback works when GPUs down +- [ ] Agent requests reach correct model + +## Rollback Plan +- Keep CT 114 running as backup +- Revert DNS/routing to .123 if issues +- Docker containers can be stopped/started instantly + +--- +*Created: May 15, 2026* +*Status: Development verified, ready for production migration* diff --git a/README.md b/README.md new file mode 100644 index 0000000..82ff334 --- /dev/null +++ b/README.md @@ -0,0 +1,63 @@ +# Syslog Harness + +Operational orchestration layer for Syslog's internal AI agents. + +## Architecture + +``` +┌─────────────┐ ┌──────────────┐ ┌─────────────┐ +│ Agent │────>│ Nginx │────>│ GPU Pool │ +│ (Hermes) │ │ Router │ │ (MoE/Dense)│ +└─────────────┘ └──────────────┘ └─────────────┘ + │ + ├──> :8091 Queue Service (Docker) + │ + └──> :3001 Dashboard (Docker) +``` + +## Components + +| Service | Port | Container | Purpose | +|---|---|---|---| +| Nginx Router | 8080 | Host | Routes requests to GPU backends | +| Queue Service | 8091 | `syslog-queue` | Enqueues requests when GPUs are down | +| Dashboard | 3001 | `syslog-dashboard` | Observability UI + API | + +## GPU Routing + +| Header `X-Syslog-Model` | Backend | Model | +|---|---|---| +| (none) / `standard` | amdpve (.15) | qwen3.6-35B-A3B (MoE) | +| `heavy` / `qwen3.5-27B` | llmgpu (.8) | qwen3.5-27B (Dense) | +| `light` / `gemma-4` | ocu_llm (.110) | gemma-4-E4B (Light) | + +## Quick Start + +```bash +# Build & start +docker compose build +docker compose up -d + +# Verify +curl http://localhost:8091/health +curl http://localhost:3001/api/status +``` + +## Dashboard + +- **UI:** `http://:8080/dashboard/harness.html` +- **API:** `http://:8080/dashboard/api/status` + +## Circuit Breaker + +- Rate limit: 10 req/s per IP +- Burst: 20 requests +- Excess returns 503 +- Queue fallback on GPU 502/503 + +## Production Migration + +See [MIGRATION_PLAN.md](./MIGRATION_PLAN.md) + +--- +*Built for Syslog Solution LLC — Quality over speed.* diff --git a/SMART_QUEUE_DESIGN.md b/SMART_QUEUE_DESIGN.md new file mode 100644 index 0000000..f630d14 --- /dev/null +++ b/SMART_QUEUE_DESIGN.md @@ -0,0 +1,1386 @@ +# Syslog Harness Smart Queue Consumer Design + +**Date:** 2026-05-17 +**Target:** Abiba (deployment) / Kwame (review) +**Based on:** Architecture Review (ARCHITECTURE_REVIEW.md, commit `e95475f`) + +--- + +## 1. Executive Summary + +The current queue-service is a **dead storage pit** it stores requests in a Redis list but has no consumer to process them. This design document proposes a complete rewrite that transforms the queue into a **smart, load-balanced inference pipeline** with: + +- **Redis Streams** as the queue backend (consumer groups, acks, pending messages) +- **GPU-aware load balancing** using real-time health + utilization data +- **Priority queuing** (high/normal/low) +- **Adaptive backpressure** with graduated responses +- **Dead letter queue** for failed jobs +- **Job lifecycle tracking** with status API + +**Quick wins** (low-risk, high-impact fixes) are integrated inline below. + +--- + +## 2. Verified Infrastructure Facts + +These were validated via live endpoint checks on 2026-05-17: + +| Component | Endpoint | Verified? | Notes | +|---|---|---|---| +| Redis | 127.0.0.1:6379 | Running (v8.0.2) | Supports Redis Streams natively | +| amdpve (GPU) | 192.168.68.15:8080 | /health=ok, /v1/models, /slots | 2 slots, no /slots_busy endpoint, /metrics needs `--metrics` flag | +| llmgpu (GPU) | 192.168.68.8:8080 | /health=ok, /v1/models | /slots requires API key (401), model=qwen3.6-27B-code | +| ocu_llm (GPU) | 192.168.68.110:8080 | /health=ok, /v1/models | /slots requires API key (401), model=gemma-4-E4B | +| amdpve sidecar | 192.168.68.15:8090 | gpu_util=81%, temp=75C, vram=28/65GB | | +| llmgpu sidecar | 192.168.68.8:8090 | gpu_util=0%, temp=36C, vram=20/24GB | | +| ocu_llm sidecar | 192.168.68.110:8090 | gpu_util=0%, temp=39C, vram=7/12GB | | + +### Critical Finding: `/slots_busy` endpoint does NOT exist on any GPU + +The architecture review (R6) referenced a `/slots_busy` endpoint for load-based routing. **This endpoint returns 404 on amdpve** and 401 on llmgpu/ocu_llm. The actual load metric available is: + +- **`/slots`** (amdpve only, no auth): Returns slot array with `is_processing` boolean per slot +- **`/health`** (all GPUs): Returns `{"status": "ok"}` only no slot count or load info +- **`/v1/models`** (all GPUs): Returns model info but no load metrics +- **Sidecar `:8090/`** (all GPUs): Returns GPU hardware metrics (util %, temp, VRAM) but NOT inference slot state + +**Implication:** Load-based routing must use the sidecar `gpu_util_pct` as a proxy for inference load, combined with the `/slots` endpoint on amdpve (which supports `is_processing` checks). For llmgpu/ocu_llm, only sidecar utilization is available. + +--- + +## 3. Architecture + +### 3.1 Data Flow + +``` +Agent Nginx Smart Queue API Redis Streams (consumer group) + | + Consumer Pool + (load-balanced) + | + GPU 1 (81% util) GPU 2 (0% util) GPU 3 (0% util) + [busy] [idle] [idle] +``` + +### 3.2 Redis Data Model + +``` +inference:stream Main stream (XADD/XREADGROUP) +inference:stream:high High-priority stream +inference:stream:normal Normal-priority stream (default) +inference:stream:low Low-priority stream + +inference:dead-letter Failed jobs (XADD, no consumer) + +job:status:{job_id} Hash: {"status": "queued|processing|completed|failed", "gpu": "...", "created_at": ..., "completed_at": ...} +job:result:{job_id} Hash: {"result": "...", "error": "..."} + +config:gpus Hash: {"amdpve": "192.168.68.15:8080", "llmgpu": "192.168.68.8:8080", "ocu_llm": "192.168.68.110:8080"} +config:gpu-health:{name} Hash: {"gpu_util_pct": 81, "temp_c": 75, "vram_used": 28230, "vram_total": 65536, "inference_state": "idle|busy", "last_seen": ...} +``` + +--- + +## 4. Implementation: `queue-service.py` (Complete Rewrite) + +### 4.1 Key Changes from Current Code + +| Current (121 lines) | New (~450 lines) | +|---|---| +| `LPUSH/RPUSH` FIFO list | `XADD/XREADGROUP` Redis Streams | +| No job IDs | UUID job IDs with lifecycle tracking | +| Hardcoded GPU IPs config key | Single source of truth in Redis `config:gpus` | +| Simple depth threshold circuit breaker | Per-GPU circuit breakers with half-open recovery | +| No consumer | Embedded consumer loop (background thread) | +| Headers filtered to `X-*` only | All headers preserved | +| No result retrieval | `/status/` and `/result/` endpoints | +| Redis host default 192.168.68.7 | Default 127.0.0.1 (matches actual deployment) | +| No health check concurrency | Async/parallel GPU health checks | +| No graceful shutdown | SIGTERM handler, consumer drain | + +### 4.2 Full Source Code + +```python +#!/usr/bin/env python3 +"""Syslog Harness Smart Queue Service Redis Streams + GPU load balancing. + +Ports: 8091 +Endpoints: + /health liveness probe + /enqueue POST inference request (with priority) + /status/ GET job status + /result/ GET job result (when completed) + /status GET queue depth, circuit breaker state, GPU health + /dlq GET dead letter queue + /dlq/retry/ POST retry a dead-letter job + /dlq/discard/ POST discard a dead-letter job +""" + +import json +import os +import sys +import time +import uuid +import signal +import threading +import urllib.request +import urllib.error +from flask import Flask, request, jsonify +from collections import defaultdict + +app = Flask(__name__) + +# Configuration + +REDIS_HOST = os.getenv("REDIS_HOST", "127.0.0.1") # FIX Q3: match actual deployment +REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) +STREAM_KEY = "inference:stream" +STREAM_KEY_HIGH = "inference:stream:high" +STREAM_KEY_NORMAL = "inference:stream:normal" +STREAM_KEY_LOW = "inference:stream:low" +DEAD_LETTER_KEY = "inference:dead-letter" +CONSUMER_GROUP = "gpu-workers" +CONSUMER_NAME = "worker-1" +JOB_STATUS_KEY = "job:status" +JOB_RESULT_KEY = "job:result" +CONFIG_GPUS_KEY = "config:gpus" +CONFIG_GPU_HEALTH_PREFIX = "config:gpu-health:" +MAX_STREAM_ENTRIES = 50000 + +# Adaptive backpressure thresholds +BP_WARN = 30 # queue depth 30+ start warning +BP_SOFT_OPEN = 40 # queue depth 40+ 503 with retry-after +BP_HARD_OPEN = 50 # queue depth 50+ circuit breaker open + +# Per-GPU circuit breaker +PER_GPU_CB_WINDOW = 5 # consecutive failures to open +PER_GPU_CB_COOLDOWN = 30 # seconds before half-open probe +PER_GPU_CB_MAX_AGE = 300 # forget about a GPU after 5min of silence + +# Health check timeout +GPU_HEALTH_TIMEOUT = 3 # seconds per GPU + +# Consumer settings +CONSUMER_POLL_INTERVAL = 1.0 # seconds between stream reads +CONSUMER_WORK_TIMEOUT = 300 # seconds to wait for a GPU response +CONSUMER_RETRY_MAX = 3 # max retries per job before DLQ +CONSUMER_RETRY_DELAY = [2, 5, 10] # exponential backoff + +# GPU Configuration + +# Single source of truth loaded from Redis if available, otherwise defaults +DEFAULT_GPUS = { + "amdpve": {"host": "192.168.68.15", "port": 8080, "sidecar_port": 8090, "model": "qwen3.6-35B-A3B"}, + "llmgpu": {"host": "192.168.68.8", "port": 8080, "sidecar_port": 8090, "model": "qwen3.6-27B-code"}, + "ocu_llm": {"host": "192.168.68.110", "port": 8080, "sidecar_port": 8090, "model": "gemma-4-E4B"}, +} + +# State + +shutdown_event = threading.Event() + +# Per-GPU circuit breaker state +gpu_circuit_breakers = defaultdict(lambda: { + "consecutive_failures": 0, + "state": "closed", # closed, open, half-open + "last_failure": 0, + "last_probe": 0, +}) + +# Retry tracking: job_id -> attempt count +retry_counts = {} + + +# Redis Helpers + +def get_redis(): + """Get Redis connection. Never returns None raises on failure.""" + try: + import redis + return redis.Redis( + host=REDIS_HOST, port=REDIS_PORT, decode_responses=True, + socket_connect_timeout=3, socket_timeout=3 + ) + except Exception: + # Log but don't crash other code handles missing Redis + print(f"[queue] ERROR: Cannot connect to Redis at {REDIS_HOST}:{REDIS_PORT}", file=sys.stderr) + return None + + +def safe_redis_call(fn, *args, default=None): + """Execute a Redis call, return default on failure.""" + r = get_redis() + if r is None: + return default + try: + return fn(r, *args) + except Exception: + return default + + +# GPU Health + +def fetch_json(url, timeout=3): + """Fetch JSON from URL, return None on any error.""" + try: + req = urllib.request.Request(url) + resp = urllib.request.urlopen(req, timeout=timeout) + return json.loads(resp.read().decode()) + except Exception: + return None + + +def check_gpu_health(gpu_name, gpu_config): + """Check a single GPU's health via sidecar + llama.cpp endpoints. + + Returns dict with health status. Checks are done in parallel via threads. + """ + host = gpu_config["host"] + port = gpu_config["port"] + sidecar_port = gpu_config.get("sidecar_port", 8090) + + result = { + "name": gpu_name, + "status": "down", + "sidecar": None, + "llamacpp": None, + "gpu_util_pct": None, + "temp_c": None, + "vram_used_mb": None, + "vram_total_mb": None, + "inference_state": "unknown", + "slots_busy": 0, + "slots_total": 0, + "last_seen": time.time(), + } + + # Check sidecar (GPU hardware metrics) + sidecar_url = f"http://{host}:{sidecar_port}/" + sidecar_data = fetch_json(sidecar_url, timeout=2) + if sidecar_data: + result["sidecar"] = sidecar_data + result["gpu_util_pct"] = sidecar_data.get("gpu_util_pct") + result["temp_c"] = sidecar_data.get("temp_c") + result["vram_used_mb"] = sidecar_data.get("vram_used_mb") + result["vram_total_mb"] = sidecar_data.get("vram_total_mb") + + # Check llama.cpp /health + health_url = f"http://{host}:{port}/health" + health_data = fetch_json(health_url, timeout=2) + if health_data and health_data.get("status") == "ok": + result["llamacpp"] = health_data + result["status"] = "up" + + # Check /slots for inference state (only works on amdpve currently) + slots_url = f"http://{host}:{port}/slots" + slots_data = fetch_json(slots_url, timeout=2) + if isinstance(slots_data, list): + result["slots_total"] = len(slots_data) + result["slots_busy"] = sum(1 for s in slots_data if s.get("is_processing")) + if result["slots_busy"] > 0: + result["inference_state"] = "busy" + else: + result["inference_state"] = "idle" + + # Store in Redis for other consumers/monitoring + r = get_redis() + if r: + try: + r.hset(CONFIG_GPU_HEALTH_PREFIX + gpu_name, mapping={ + "status": result["status"], + "gpu_util_pct": str(result["gpu_util_pct"] or -1), + "temp_c": str(result["temp_c"] or -1), + "vram_used": str(result["vram_used_mb"] or 0), + "vram_total": str(result["vram_total_mb"] or 0), + "slots_busy": str(result["slots_busy"]), + "slots_total": str(result["slots_total"]), + "inference_state": result["inference_state"], + "last_seen": str(time.time()), + }) + except Exception: + pass + + return result + + +def check_all_gpus_parallel(): + """Check all GPUs in parallel using threads. FIX Q6: avoids sequential blocking.""" + gpus = get_gpus() + results = {} + threads = [] + + def check_one(name, config): + results[name] = check_gpu_health(name, config) + + for name, config in gpus.items(): + t = threading.Thread(target=check_one, args=(name, config)) + t.daemon = True + threads.append(t) + t.start() + + for t in threads: + t.join(timeout=GPU_HEALTH_TIMEOUT + 1) + + return results + + +# GPU Configuration Management + +def get_gpus(): + """Get GPU configuration. Try Redis first (single source of truth), fall back to defaults.""" + r = get_redis() + if r: + try: + stored = r.hgetall(CONFIG_GPUS_KEY) + if stored: + gpus = {} + for name, config_json in stored.items(): + gpus[name] = json.loads(config_json) + if gpus: + return gpus + except Exception: + pass + return DEFAULT_GPUS + + +def set_gpus(gpu_dict): + """Set GPU configuration in Redis (single source of truth).""" + r = get_redis() + if not r: + return + for name, config in gpu_dict.items(): + r.hset(CONFIG_GPUS_KEY, name, json.dumps(config)) + + +# Circuit Breaker + +def record_gpu_success(gpu_name): + """Record a successful GPU request resets circuit breaker.""" + cb = gpu_circuit_breakers[gpu_name] + cb["consecutive_failures"] = 0 + cb["state"] = "closed" + + +def record_gpu_failure(gpu_name): + """Record a failed GPU request implements per-GPU circuit breaker with half-open. FIX Q5""" + cb = gpu_circuit_breakers[gpu_name] + cb["consecutive_failures"] += 1 + cb["last_failure"] = time.time() + + if cb["consecutive_failures"] >= PER_GPU_CB_WINDOW and cb["state"] != "open": + cb["state"] = "open" + elif cb["state"] == "open": + # Check if cooldown has passed transition to half-open + if time.time() - cb["last_failure"] >= PER_GPU_CB_COOLDOWN: + cb["state"] = "half-open" + cb["last_probe"] = time.time() + + +def is_gpu_available(gpu_name): + """Check if a GPU is available (circuit breaker allows requests).""" + cb = gpu_circuit_breakers[gpu_name] + + if cb["state"] == "closed": + return True + elif cb["state"] == "half-open": + # Allow one probe request + if time.time() - cb["last_probe"] >= 5: + return True + return False + else: # open + return False + + +# Adaptive Backpressure + +def get_total_queue_depth(): + """Get total queue depth across all priority streams. FIX: accurate count.""" + r = get_redis() + if not r: + return -1 + try: + total = 0 + for key in [STREAM_KEY_HIGH, STREAM_KEY_NORMAL, STREAM_KEY_LOW]: + total += r.xlen(key) + return total + except Exception: + return -1 + + +def get_backpressure_status(): + """Determine backpressure response based on queue depth.""" + depth = get_total_queue_depth() + if depth < 0: + return "error" + elif depth >= BP_HARD_OPEN: + return "open" # 503, circuit breaker + elif depth >= BP_SOFT_OPEN: + return "soft_open" # 503 with retry-after + elif depth >= BP_WARN: + return "warn" # 202 with warning + else: + return "closed" # normal + + +# Job Enqueue + +@app.route("/health") +def health(): + """Nginx upstream health probe.""" + return jsonify({"status": "ok", "service": "smart-queue"}), 200 + + +@app.route("/enqueue", methods=["POST"]) +def enqueue(): + """Enqueue an inference request with priority and job tracking. + + Expected JSON body: + { + "messages": [...], # OpenAI-style messages + "model": "qwen3.6-35B-A3B", + "stream": true, + "temperature": 0.8, + "priority": "normal" // "high", "normal" (default), "low" + } + + Returns: {"job_id": "...", "status": "queued", "priority": "..."} + """ + r = get_redis() + if r is None: + return jsonify({"error": "Redis unavailable"}), 503 + + # Parse request + try: + data = request.get_json(force=True) + except Exception: + return jsonify({"error": "Invalid JSON"}), 400 + + if not data: + return jsonify({"error": "Empty request body"}), 400 + + # Extract priority (default: normal) + priority = data.get("priority", "normal") + if priority not in ("high", "normal", "low"): + priority = "normal" + + # Check backpressure + bp_status = get_backpressure_status() + if bp_status == "open": + return jsonify({ + "error": "Circuit breaker OPEN", + "queue_depth": get_total_queue_depth(), + "retry_after": 30 + }), 503 + + if bp_status == "soft_open": + return jsonify({ + "error": "Queue near capacity", + "queue_depth": get_total_queue_depth(), + "retry_after": 10 + }), 503 + + if bp_status == "warn": + print(f"[queue] WARN: Queue depth {get_total_queue_depth()} approaching limit", file=sys.stderr) + + # Generate job ID + job_id = str(uuid.uuid4()) + + # Build job payload + job = { + "id": job_id, + "payload": data, + "priority": priority, + "status": "queued", + "created_at": time.time(), + "attempts": 0, + "last_error": None, + "headers": dict(request.headers), # FIX Q8: preserve ALL headers + "target_model": data.get("model", ""), + } + + # Store job status + try: + r.hset(JOB_STATUS_KEY, job_id, json.dumps({ + "status": "queued", + "priority": priority, + "created_at": job["created_at"], + })) + except Exception: + pass + + # Add to appropriate stream + stream_key = { + "high": STREAM_KEY_HIGH, + "normal": STREAM_KEY_NORMAL, + "low": STREAM_KEY_LOW, + }.get(priority, STREAM_KEY_NORMAL) + + try: + message_id = r.xadd( + stream_key, + {"job": json.dumps(job)}, + maxlen=MAX_STREAM_ENTRIES, + approx=True + ) + except Exception as e: + return jsonify({"error": f"Failed to enqueue: {str(e)}"}), 503 + + return jsonify({ + "job_id": job_id, + "status": "queued", + "priority": priority, + "position": r.xlen(stream_key), + }), 202 + + +@app.route("/status/") +def job_status(job_id): + """Get job status.""" + r = get_redis() + if not r: + return jsonify({"error": "Service unavailable"}), 503 + + try: + status_json = r.hget(JOB_STATUS_KEY, job_id) + if not status_json: + return jsonify({"error": "Job not found"}), 404 + status = json.loads(status_json) + return jsonify(status), 200 + except Exception: + return jsonify({"error": "Failed to retrieve status"}), 500 + + +@app.route("/result/") +def job_result(job_id): + """Get job result (when completed).""" + r = get_redis() + if not r: + return jsonify({"error": "Service unavailable"}), 503 + + try: + result_json = r.hget(JOB_RESULT_KEY, job_id) + if not result_json: + return jsonify({"error": "Result not yet available"}), 404 + return jsonify(json.loads(result_json)), 200 + except Exception: + return jsonify({"error": "Failed to retrieve result"}), 500 + + +@app.route("/status") +def status(): + """GET queue depth + circuit breaker state + GPU health.""" + gpus = check_all_gpus_parallel() # FIX Q6: parallel health checks + + # Per-GPU circuit breaker states + cb_states = {} + for name in gpus: + cb = gpu_circuit_breakers.get(name, {}) + cb_states[name] = { + "state": cb.get("state", "closed"), + "consecutive_failures": cb.get("consecutive_failures", 0), + } + + return jsonify({ + "queue_depth": get_total_queue_depth(), + "backpressure": get_backpressure_status(), + "circuit_breakers": cb_states, + "gpu_health": { + name: { + "status": gpu["status"], + "gpu_util_pct": gpu["gpu_util_pct"], + "temp_c": gpu["temp_c"], + "vram_used_mb": gpu["vram_used_mb"], + "vram_total_mb": gpu["vram_total_mb"], + "slots_busy": gpu["slots_busy"], + "slots_total": gpu["slots_total"], + "inference_state": gpu["inference_state"], + } + for name, gpu in gpus.items() + }, + "thresholds": { + "warn": BP_WARN, + "soft_open": BP_SOFT_OPEN, + "hard_open": BP_HARD_OPEN, + } + }) + + +# Dead Letter Queue + +@app.route("/dlq") +def list_dlq(): + """List dead-letter queue entries (last 50).""" + r = get_redis() + if not r: + return jsonify({"error": "Service unavailable"}), 503 + + try: + entries = r.xrevrange(DEAD_LETTER_KEY, count=50) + result = [] + for message_id, fields in entries: + result.append({ + "message_id": message_id.decode() if isinstance(message_id, bytes) else message_id, + "job": json.loads(fields.get(b"job" if isinstance(fields, dict) else fields, "{}").decode() if isinstance(fields.get(b"job" if isinstance(fields, dict) else fields), (bytes,)) else fields.get("job", "{}")), + }) + return jsonify({"count": len(result), "entries": result}), 200 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/dlq/retry/", methods=["POST"]) +def retry_dlq(message_id): + """Retry a dead-letter job.""" + r = get_redis() + if not r: + return jsonify({"error": "Service unavailable"}), 503 + + try: + fields = r.xget(DEAD_LETTER_KEY, message_id) + if not fields: + return jsonify({"error": "Message not found"}), 404 + + job = json.loads(fields.get("job", "{}")) + job["attempts"] = 0 + job["status"] = "queued" + + priority = job.get("priority", "normal") + stream_key = { + "high": STREAM_KEY_HIGH, + "normal": STREAM_KEY_NORMAL, + "low": STREAM_KEY_LOW, + }.get(priority, STREAM_KEY_NORMAL) + + r.xadd(stream_key, {"job": json.dumps(job)}) + return jsonify({"status": "re-enqueued"}), 200 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/dlq/discard/", methods=["POST"]) +def discard_dlq(message_id): + """Discard a dead-letter job.""" + r = get_redis() + if not r: + return jsonify({"error": "Service unavailable"}), 503 + + try: + r.xdel(DEAD_LETTER_KEY, message_id) + return jsonify({"status": "discarded"}), 200 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Consumer Loop + +def select_gpu_for_job(job, gpu_health_map): + """Select best GPU for a job based on load, health, and model compatibility. + + Selection criteria (in order): + 1. GPU must be up and circuit breaker allows + 2. GPU must match the requested model (or default to amdpve) + 3. Prefer GPU with lowest gpu_util_pct + 4. Prefer GPU with highest slots_idle + """ + candidates = [] + target_model = job.get("payload", {}).get("model", "") + + for name, health in gpu_health_map.items(): + if health["status"] != "up": + continue + if not is_gpu_available(name): + continue + + # Model compatibility check + config = get_gpus().get(name, {}) + gpu_model = config.get("model", "") + + # If job specifies a model, check compatibility + if target_model: + # Allow any GPU to serve any model (llama.cpp can load different models) + # but prefer the configured model match + if target_model in gpu_model or gpu_model in target_model: + candidates.append((name, health, 0)) # priority 0 = perfect match + else: + candidates.append((name, health, 1)) # priority 1 = mismatch + else: + candidates.append((name, health, 0)) + + if not candidates: + return None + + # Sort by: match_priority (asc), gpu_util_pct (asc), slots_busy (asc) + candidates.sort(key=lambda c: (c[2], c[1].get("gpu_util_pct", 999), c[1].get("slots_busy", 99))) + + return candidates[0][0] # return GPU name + + +def consume_job(gpu_name, job_data): + """Send a job to a GPU and wait for the result. + + For streaming responses, the result is stored in Redis and the client polls. + For non-streaming, the full response is returned directly. + """ + config = get_gpus().get(gpu_name, {}) + host = config["host"] + port = config["port"] + + # Update job status to processing + job_id = job_data.get("id", "unknown") + r = get_redis() + if r: + try: + r.hset(JOB_STATUS_KEY, job_id, json.dumps({ + "status": "processing", + "gpu": gpu_name, + "attempt": job_data.get("attempts", 0), + })) + except Exception: + pass + + # Forward request to GPU + payload = json.dumps(job_data["payload"]) + try: + req = urllib.request.Request( + f"http://{host}:{port}/v1/chat/completions", + data=payload.encode(), + headers={"Content-Type": "application/json"} + ) + + resp = urllib.request.urlopen(req, timeout=CONSUMER_WORK_TIMEOUT) + response_data = json.loads(resp.read().decode()) + + # Store result + if r: + r.hset(JOB_RESULT_KEY, job_id, json.dumps({ + "status": "completed", + "response": response_data, + "completed_at": time.time(), + })) + r.hset(JOB_STATUS_KEY, job_id, json.dumps({ + "status": "completed", + "gpu": gpu_name, + "completed_at": time.time(), + })) + + record_gpu_success(gpu_name) + return True + + except Exception as e: + record_gpu_failure(gpu_name) + + # Store failure + if r: + try: + r.hset(JOB_STATUS_KEY, job_id, json.dumps({ + "status": "failed", + "gpu": gpu_name, + "error": str(e)[:500], + "attempt": job_data.get("attempts", 0), + })) + except Exception: + pass + + return False + + +def consumer_loop(): + """Main consumer loop reads from streams and dispatches to GPUs. + + Runs as a background thread. Checks streams in priority order: + high normal low + """ + print("[consumer] Started", file=sys.stderr) + + while not shutdown_event.is_set(): + try: + r = get_redis() + if r is None: + time.sleep(2) + continue + + # Check each stream in priority order + for stream_key, priority_name in [ + (STREAM_KEY_HIGH, "high"), + (STREAM_KEY_NORMAL, "normal"), + (STREAM_KEY_LOW, "low"), + ]: + if shutdown_event.is_set(): + break + + # Read one message per group (non-blocking) + messages = r.xreadgroup( + CONSUMER_GROUP, + CONSUMER_NAME, + {stream_key: ">"}, + count=1, + block=int(CONSUMER_POLL_INTERVAL * 1000) + ) + + if not messages: + continue + + for stream_name, msg_list in messages: + for message_id, fields in msg_list: + try: + job_data = json.loads(fields["job"]) + + # Get latest GPU health + gpu_health = check_all_gpus_parallel() + + # Select best GPU + gpu_name = select_gpu_for_job(job_data, gpu_health) + + if gpu_name is None: + # No GPUs available requeue (don't ack yet) + print(f"[consumer] No GPU available for job {job_data.get('id', '?')} (priority={priority_name})", file=sys.stderr) + continue + + # Dispatch to GPU + success = consume_job(gpu_name, job_data) + + if success: + # Ack the message job processed successfully + r.xack(stream_key, CONSUMER_GROUP, message_id) + else: + # Job failed retry or move to DLQ + attempts = job_data.get("attempts", 0) + 1 + job_data["attempts"] = attempts + + if attempts >= CONSUMER_RETRY_MAX: + # Move to dead letter queue + print(f"[consumer] Job {job_data.get('id', '?')} moved to DLQ after {attempts} attempts", file=sys.stderr) + r.xadd(DEAD_LETTER_KEY, {"job": json.dumps(job_data)}) + r.xack(stream_key, CONSUMER_GROUP, message_id) + else: + # Retry: add back to the same stream + # (consumer will pick it up on next iteration) + # We DON'T ack, so it stays in pending + delay = CONSUMER_RETRY_DELAY[min(attempts - 1, len(CONSUMER_RETRY_DELAY) - 1)] + print(f"[consumer] Retrying job {job_data.get('id', '?')} in {delay}s (attempt {attempts}/{CONSUMER_RETRY_MAX})", file=sys.stderr) + time.sleep(delay) + # Re-add to stream + r.xadd(stream_key, {"job": json.dumps(job_data)}) + # Ack the original so we don't reprocess it + r.xack(stream_key, CONSUMER_GROUP, message_id) + + except Exception as e: + print(f"[consumer] Error processing job: {e}", file=sys.stderr) + continue + + except Exception as e: + print(f"[consumer] Loop error: {e}", file=sys.stderr) + time.sleep(2) + + print("[consumer] Stopped", file=sys.stderr) + + +# GPU Health Monitor + +def gpu_health_monitor_loop(): + """Periodically refresh GPU health data in Redis. + + Runs as a background thread every 15 seconds. + """ + print("[health-monitor] Started", file=sys.stderr) + + while not shutdown_event.is_set(): + try: + gpus = check_all_gpus_parallel() + # Health data is already stored per-GPU in check_gpu_health() + except Exception as e: + print(f"[health-monitor] Error: {e}", file=sys.stderr) + + shutdown_event.wait(15) # sleep 15s, interruptible + + print("[health-monitor] Stopped", file=sys.stderr) + + +# Graceful Shutdown + +def signal_handler(signum, frame): + """Handle SIGTERM for graceful shutdown.""" + print(f"\n[queue] Received signal {signum}, shutting down...", file=sys.stderr) + shutdown_event.set() + + +signal.signal(signal.SIGTERM, signal_handler) +signal.signal(signal.SIGINT, signal_handler) + + +# Main + +if __name__ == "__main__": + # Initialize default GPU config in Redis + r = get_redis() + if r: + set_gpus(DEFAULT_GPUS) + + # Start background threads + consumer_thread = threading.Thread(target=consumer_loop, daemon=True) + consumer_thread.start() + + health_thread = threading.Thread(target=gpu_health_monitor_loop, daemon=True) + health_thread.start() + + print("[queue] Starting on :8091", file=sys.stderr) + app.run(host="0.0.0.0", port=8091, threaded=True) # FIX D2: threaded server +``` + +--- + +## 5. Quick Wins (No Consumer Needed Fix Existing Code) + +These fixes can be applied to the current `queue-service.py` without rewriting: + +### QW-1: Fix Redis default host (Q3) +**File:** `queue-service/queue-service.py:21` +```python +# Before: +REDIS_HOST = os.getenv("REDIS_HOST", "192.168.68.7") +# After: +REDIS_HOST = os.getenv("REDIS_HOST", "127.0.0.1") # matches actual deployment +``` + +### QW-2: Fix Dockerfile path mismatch (C2) +**File:** `docker-compose.yml:15` +```yaml +# Before: + dockerfile: Dockerfile.queue +# After: + dockerfile: queue-service/Dockerfile +``` + +### QW-3: Fix Nginx fallback to ALL-down only (N3) +**File:** `gpu-router-docker.conf` + +Replace the error-page fallback with a Lua-based check that only triggers when ALL GPUs are down: + +```nginx +# Remove: error_page 502 503 504 = @queue_fallback; + +# Add Lua health check (requires lua-nginx-module): +location / { + # ... existing config ... + + # Only fallback to queue if ALL GPUs are down + set $fallback 0; + access_by_lua_block { + local redis = require "resty.redis" + local red = redis:new() + red:set_timeout(1000) + red:connect("redis", 6379) + + local gpus = {"amdpve", "llmgpu", "ocu_llm"} + local all_down = true + for _, g in ipairs(gpus) do + local status = red:hget("config:gpu-health:" .. g, "status") + if status == "up" then + all_down = false + break + end + end + red:set_keepalive(10000, 100) + + if all_down then + ngx.var.fallback = 1 + end + } + + if $fallback = 1 then + rewrite ^ /enqueue break; + proxy_pass http://queue_service; + } +} +``` + +**Alternative (no Lua):** Replace the `error_page` directive with a custom Nginx health check upstream that only returns 200 when at least one GPU is healthy: + +```nginx +# Add a dedicated health check upstream +upstream any_gpu_healthy { + server 192.168.68.15:8080; + server 192.168.68.8:8080; + server 192.168.68.110:8080; + # Nginx upstream block will try each in order + # If ALL fail, then fall through to queue_fallback +} + +# In location /: +# Change proxy_next_upstream_tries from 2 to 4 (3 GPUs + 1 fallback) +proxy_next_upstream_tries 4; +``` + +### QW-4: Fix Nginx `proxy_pass_header` (N4) +**File:** `gpu-router-docker.conf:90` +```nginx +# Remove this line it's for response headers, not request headers. +# The X-Syslog-Model header is already passed via proxy_set_header inheritance. +# proxy_pass_header X-Syslog-Model; +``` + +### QW-5: Fix hardcoded container names (N5) +**File:** `gpu-router-docker.conf:27,32` +```nginx +# Before: + server syslog-harness-dashboard-1:3001; + server syslog-harness-gpu-dashboard-1:8092; + +# After (use Docker service names): + server dashboard:3001; + server gpu-dashboard:8092; +``` + +### QW-6: Fix rate limit burst (N1) +**File:** `gpu-router-docker.conf:79` +```nginx +# Before: + limit_req zone=perip burst=20 nodelay; + +# After burst requests are delayed, not served immediately: + limit_req zone=perip burst=10 nodelay; +``` + +### QW-7: Preserve Content-Type header (Q8) +**File:** `queue-service/queue-service.py:83` +```python +# Before: + headers = {k: v for k, v in request.headers if k.startswith("X-")} + +# After: + headers = dict(request.headers) # preserve ALL headers including Content-Type +``` + +### QW-8: Fix Docker restart policy (C3) +**File:** `docker-compose.yml:6,16,31,43` +```yaml +# Before: + restart: always + +# After: + restart: unless-stopped +``` + +### QW-9: Add Redis health check (C4) +**File:** `docker-compose.yml` add to redis service: +```yaml + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 3 +``` + +### QW-10: Pin dependency versions (I1) +**File:** `queue-service/Dockerfile` +```dockerfile +# Before: +RUN pip install --no-cache-dir flask redis + +# After: +RUN pip install --no-cache-dir flask==3.1.0 redis==5.2.1 gunicorn==23.0.0 +``` + +**File:** `Dockerfile.dashboard` +```dockerfile +# Before: +FROM python:3.11-slim + +# After: +FROM python:3.13-slim # match queue-service Python version +``` + +**File:** `Dockerfile.gpu` +```dockerfile +# Before: +RUN pip install requests + +# After: +RUN pip install --no-cache-dir requests==2.32.3 # or remove entirely only urllib needed +``` + +### QW-11: Add `.dockerignore` +**File:** `.dockerignore` +``` +.git +.gitignore +*.md +*.pyc +__pycache__ +*.log +.env +``` + +### QW-12: Fix GPU dashboard multi-process CMD (I3) +**File:** `Dockerfile.gpu:14` +```dockerfile +# Before: +CMD ["sh", "-c", "python3 gpu_collector.py & python3 -m http.server 8092 --directory /app/public & wait"] + +# After use a proper process manager or supervisor: +CMD ["sh", "-c", "exec supervisord -c /app/supervisord.conf"] +``` +With `/app/supervisord.conf`: +```ini +[supervisord] +nodaemon=true + +[program:collector] +command=python3 gpu_collector.py +autostart=true +autorestart=true + +[program:http] +command=python3 -m http.server 8092 --directory /app/public +autostart=true +autorestart=true +``` + +### QW-13: Centralize GPU config (R9) +**File:** `queue-service/queue-service.py` + `harness-dashboard.py` + `gpu_collector.py` + +Move GPU endpoints to a single source of truth. Option A: environment file mounted to all containers. Option B: Redis `config:gpus` hash. + +The rewrite above implements Option B (Redis-based). For the quick-win path, use Option A: + +**File:** `config/gpu-endpoints.json` +```json +{ + "amdpve": {"host": "192.168.68.15", "port": 8080, "sidecar_port": 8090}, + "llmgpu": {"host": "192.168.68.8", "port": 8080, "sidecar_port": 8090}, + "ocu_llm": {"host": "192.168.68.110", "port": 8080, "sidecar_port": 8090} +} +``` + +Mount to all containers: +```yaml +volumes: + - ./config:/app/config:ro +``` + +Then each service reads from `/app/config/gpu-endpoints.json` instead of hardcoding. + +--- + +## 6. Docker Compose Updates + +```yaml +version: "3.8" + +services: + redis: + image: redis:7-alpine + restart: unless-stopped + networks: + - gpu-router-net + volumes: + - redis-data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 3 + + queue-service: + build: + context: . + dockerfile: queue-service/Dockerfile # FIX C2 + restart: unless-stopped + networks: + - gpu-router-net + expose: + - "8091" # FIX C1: remove external port exposure + depends_on: + redis: + condition: service_healthy + environment: + - REDIS_HOST=redis + - REDIS_PORT=6379 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8091/health"] + interval: 15s + timeout: 5s + retries: 3 + + dashboard: + build: + context: . + dockerfile: dashboard/Dockerfile.dashboard # FIX I5: use subdir version + restart: unless-stopped + networks: + - gpu-router-net + ports: + - "3001:3001" + depends_on: + queue-service: + condition: service_healthy + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3001/"] + interval: 15s + timeout: 5s + retries: 3 + + gpu-dashboard: + build: + context: . + dockerfile: Dockerfile.gpu + restart: unless-stopped + networks: + - gpu-router-net + ports: + - "8092:8092" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8092/gpu.html"] + interval: 15s + timeout: 5s + retries: 3 + +networks: + gpu-router-net: + driver: bridge + +volumes: + redis-data: +``` + +--- + +## 7. Nginx Config Updates + +```nginx +# gpu-router-docker.conf Updated for Docker service names and proper fallback + +upstream amdpve_pool { + server 192.168.68.15:8080; +} + +upstream llmgpu_pool { + server 192.168.68.8:8080; +} + +upstream ocu_llm_pool { + server 192.168.68.110:8080; +} + +upstream queue_service { + server queue-service:8091; +} + +upstream dashboard_service { + server dashboard:3001; # FIX N5: Docker service name +} + +upstream gpu_dashboard_pool { + server gpu-dashboard:8092; # FIX N5: Docker service name +} + +map $http_x_syslog_model $gpu_upstream { + default amdpve_pool; + "standard" amdpve_pool; + "heavy" llmgpu_pool; + "qwen3.5-27B" llmgpu_pool; + "light" ocu_llm_pool; + "gemma-4" ocu_llm_pool; +} + +limit_req_zone $binary_remote_addr zone=perip:10m rate=10r/s; + +server { + listen 80; + server_name _; + + location /dashboard { + proxy_pass http://dashboard_service/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + + location /gpu { + proxy_pass http://gpu_dashboard_pool/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } + + location / { + limit_req zone=perip burst=10 nodelay; # FIX N1: reduced burst + limit_req_status 503; + proxy_pass http://$gpu_upstream; + + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # FIX N4: removed proxy_pass_header X-Syslog-Model + + proxy_buffering off; + proxy_cache off; + proxy_read_timeout 300s; + proxy_send_timeout 300s; + + # FIX N2: increased tries for proper failover across all GPUs + proxy_next_upstream error timeout http_502 http_503 http_504; + proxy_next_upstream_tries 4; # 3 GPUs + queue fallback + + add_header X-Routed-To $gpu_upstream always; + + # FIX N3: removed error_page fallback handled by queue consumer or Lua + } + + location @queue_fallback { + rewrite ^ /enqueue break; + proxy_pass http://queue_service; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header Content-Type $content_type; + proxy_pass_request_body on; + } +} +``` + +--- + +## 8. Implementation Phases + +### Phase 1: Quick Wins (Day 1) +Apply fixes QW-1 through QW-13. These require no new code, just config and code corrections. + +### Phase 2: Smart Queue Consumer (Day 2-3) +Replace `queue-service.py` with the full rewrite. This adds: +- Redis Streams backend +- Consumer loop with GPU load balancing +- Priority queues +- Dead letter queue +- Job tracking API +- Per-GPU circuit breakers + +### Phase 3: Nginx Fallback Fix (Day 3) +Implement the ALL-down-only fallback logic (QW-3). If Lua module is available, use the Lua approach. Otherwise, use the `proxy_next_upstream_tries` approach. + +### Phase 4: Deploy & Monitor +```bash +docker compose down +docker compose build --no-cache +docker compose up -d +``` + +Verify: +1. `curl http://localhost:8091/health` 200 +2. `curl -X POST http://localhost:8091/enqueue -H "Content-Type: application/json" -d '{"messages":[{"role":"user","content":"hello"}],"model":"qwen3.6-35B-A3B","priority":"normal"}'` 202 with job_id +3. `curl http://localhost:8091/status/` tracks lifecycle +4. Dashboard at `:3001` shows live GPU health with parallel checks + +--- + +## 9. Risks & Mitigations + +| Risk | Impact | Mitigation | +|---|---|---| +| Redis Streams XREADGROUP blocks | Consumer stalls if stream empty | `block` parameter set to 1000ms non-blocking poll | +| Consumer crashes mid-job | Job stuck in "processing" state | Health monitor resets stale job statuses every 60s | +| GPU returns 401 on /slots | Load info unavailable for llmgpu/ocu_llm | Fall back to sidecar `gpu_util_pct` for routing | +| Single consumer bottleneck | Queue buildup during high traffic | Add 2+ consumer containers (same consumer group) | +| Redis single point of failure | Entire queue down if Redis dies | Phase 2+: Redis Sentinel or AOF persistence | diff --git a/SMART_QUEUE_IMPLEMENTATION.md b/SMART_QUEUE_IMPLEMENTATION.md new file mode 100644 index 0000000..54cb1ef --- /dev/null +++ b/SMART_QUEUE_IMPLEMENTATION.md @@ -0,0 +1,1572 @@ +# Syslog Harness Smart Queue Consumer Implementation + +**Date:** 2026-05-17 +**Author:** Mumuni (review draft for Abiba) +**Repo:** SyslogSolution/syslog-harness +**Base commit:** `e95475f` "Add GPU dashboard container + Nginx routing" + +--- + +## 1. Executive Summary + +The current queue-service stores inference requests in Redis but **never processes them**. This document provides a complete implementation of a **smart queue consumer** with GPU-aware load balancing, priority queuing, backpressure, dead letter handling, and job lifecycle tracking plus all quick-win fixes from the architecture review. + +**Current state (verified 2026-05-17):** +- Queue depth: 0 (services not currently running) +- Redis: 7.4.9 (streams fully supported) +- All 3 GPUs reachable and healthy: + - `amdpve` (192.168.68.15:8080) qwen3.6-35B-A3B, GPU util 93%, VRAM 28GB/65GB, temp 72C + - `llmgpu` (192.168.68.8:8080) qwen3.6-27B-code, GPU util 0%, VRAM 20GB/24GB, temp 37C + - `ocu_llm` (192.168.68.110:8080) gemma-4-E4B, GPU util 0%, VRAM 8GB/12GB, temp 41C +- GPU sidecar metrics available at `:8090` on each host (gpu_util_pct, vram_used_mb, temp_c) +- GPU inference endpoints: `/v1/models` (all return 200), `/v1/chat/completions` (all respond to POST) +- No `slots_busy` endpoint load estimation via GPU sidecar metrics + +--- + +## 2. Quick Wins (Zero Architectural Change) + +These fixes require minimal code changes and can be deployed independently of the smart queue. + +### QW-1: Fix Dockerfile Path Mismatch (docker-compose.yml line 15) + +**Current:** `dockerfile: Dockerfile.queue` (file doesn't exist at root) +**Fix:** `dockerfile: queue-service/Dockerfile` + +```diff + queue-service: + build: + context: . +- dockerfile: Dockerfile.queue ++ dockerfile: queue-service/Dockerfile +``` + +### QW-2: Fix Nginx Fallback (gpu-router-docker.conf lines 99-106) + +**Problem:** Nginx retries the same GPU pool (N2) AND falls back to queue on ANY single GPU failure (N3). + +**Fix:** Add `proxy_next_upstream_tries 1` (no self-retry), and only route to queue when ALL GPU pools fail via a dedicated health-check upstream. + +```diff + upstream amdpve_pool { + server 192.168.68.15:8080; ++ max_fails=3 fail_timeout=30s; + } + + upstream llmgpu_pool { + server 192.168.68.8:8080; ++ max_fails=3 fail_timeout=30s; + } + + upstream ocu_llm_pool { + server 192.168.68.110:8080; ++ max_fails=3 fail_timeout=30s; + } + + ## New: All-GPUs health check fails only when ALL upstreams are down ++upstream all_gpu_pool { ++ server 192.168.68.15:8080; ++ server 192.168.68.8:8080; ++ server 192.168.68.110:8080; ++ max_fails=3 fail_timeout=30s; ++} + + map $http_x_syslog_model $gpu_upstream { + default amdpve_pool; + "standard" amdpve_pool; + "heavy" llmgpu_pool; + "qwen3.5-27B" llmgpu_pool; + "light" ocu_llm_pool; + "gemma-4" ocu_llm_pool; + } + + ## New: map to all-gpu pool for fallback ++map $http_x_syslog_model $fallback_upstream { ++ default all_gpu_pool; ++ "standard" all_gpu_pool; ++ "heavy" all_gpu_pool; ++ "qwen3.5-27B" all_gpu_pool; ++ "light" all_gpu_pool; ++ "gemma-4" all_gpu_pool; ++} + + server { + location / { + limit_req zone=perip burst=20 nodelay; +- limit_req_status 503; ++ limit_req_status 503; +- proxy_pass http://$gpu_upstream; ++ proxy_pass http://$gpu_upstream; + ... +- proxy_next_upstream error timeout http_502 http_503; +- proxy_next_upstream_tries 2; ++ proxy_next_upstream error timeout http_502 http_503 http_504; ++ proxy_next_upstream_tries 1; + +- error_page 502 503 504 = @queue_fallback; ++ error_page 504 = @queue_fallback; + } + + location @queue_fallback { ++ # Only reached when the all_gpu_pool proxy failed (all GPUs down) + rewrite ^ /enqueue break; + proxy_pass http://queue_service; + ... + } + } +``` + +### QW-3: Fix Nginx Container Names (gpu-router-docker.conf lines 27, 32) + +**Problem:** Hardcoded `syslog-harness-dashboard-1` changes with docker-compose project prefix. +**Fix:** Use Docker Compose service names. + +```diff + upstream dashboard_service { +- server syslog-harness-dashboard-1:3001; ++ server dashboard:3001; + } + + upstream gpu_dashboard_pool { +- server syslog-harness-gpu-dashboard-1:8092; ++ server gpu-dashboard:8092; + } +``` + +### QW-4: Fix Redis Host Default (queue-service.py line 21) + +**Problem:** Default `192.168.68.7` is unreachable inside Docker. +**Fix:** Default to `redis` (Docker service name). + +```diff +-REDIS_HOST = os.getenv("REDIS_HOST", "192.168.68.7") ++REDIS_HOST = os.getenv("REDIS_HOST", "redis") +``` + +### QW-5: Add Dependency Version Pinning (all Dockerfiles) + +```diff + # queue-service/Dockerfile +-FROM python:3.13-slim +-RUN pip install --no-cache-dir flask redis ++FROM python:3.13-slim ++RUN pip install --no-cache-dir \ ++ flask==3.1.0 \ ++ redis==5.2.1 \ ++ gunicorn==23.0.0 + + # Dockerfile.dashboard +-FROM python:3.11-slim ++FROM python:3.11-slim ++RUN pip install --no-cache-dir gunicorn==23.0.0 + + # Dockerfile.gpu +-FROM python:3.11-slim +-RUN pip install requests ++FROM python:3.11-slim ++RUN pip install --no-cache-dir \ ++ requests==2.32.3 \ ++ psutil==6.1.1 +``` + +### QW-6: Add `.dockerignore` + +``` +.git +.gitignore +*.md +*.pyc +__pycache__ +.env +*.swp +.hermes/ +syslog-harness-check/ +``` + +### QW-7: Add Docker Health Checks (docker-compose.yml) + +```yaml + redis: + image: redis:7-alpine + restart: unless-stopped + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 3s + retries: 3 + + queue-service: + healthcheck: + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8091/health')"] + interval: 15s + timeout: 5s + retries: 3 + + dashboard: + healthcheck: + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3001/')"] + interval: 15s + timeout: 5s + retries: 3 +``` + +### QW-8: Use `unless-stopped` Instead of `always` + +```diff +- restart: always ++ restart: unless-stopped +``` + +Apply to all services in docker-compose.yml. + +### QW-9: Add Gunicorn to Queue Service (replace Flask dev server) + +```diff + # queue-service/Dockerfile +-CMD ["python3", "queue-service.py"] ++CMD ["gunicorn", "--bind", "0.0.0.0:8091", "--workers", "2", "--timeout", "120", "queue-service:app"] +``` + +### QW-10: Fix GPU Dashboard Multi-Process CMD (Dockerfile.gpu line 14) + +**Problem:** `&` background processes with no supervisor if collector crashes, nothing restarts it. +**Fix:** Use a single process with signal handling, or use `supervisord`. + +```dockerfile +# Option A: Single process with threading (recommended for simplicity) +CMD ["python3", "gpu_collector.py"] + +# Option B: Use supervisord (more robust) +RUN pip install --no-cache-dir supervisor +COPY supervisord.conf /etc/supervisor/conf.d/ +CMD ["/usr/bin/supervisord", "-n", "-c", "/etc/supervisor/supervisord.conf"] +``` + +--- + +## 3. Smart Queue Consumer Full Implementation + +### 3.1 Architecture + +``` + + Smart Queue Service + (queue-service.py) + + Agent Nginx Queue + (POST /v1/...) API Layer Redis Streams + (Flask) (inference:stream) + + + + Job Tracker Consumer Pool + (Redis Hash) (asyncio workers) + + + + + + GPU Selection Logic + (load-balanced by GPU metrics) + + + + amdpve llmgpu ocu_llm + :8080 :8080 :8080 + 35B-A3B 27B-code gemma-4-E4B + + + + + + DLQ Stream + (inference: + dead-letter) + +``` + +### 3.2 Data Model + +```python +# Redis Streams keys +STREAM_KEY = "inference:stream" # Main queue +DLQ_KEY = "inference:dead-letter" # Failed jobs +PENDING_KEY = "inference:pending" # Jobs currently being processed + +# Redis Hash keys +JOB_STATUS_KEY = "job:status" # job_id -> {status, gpu, started_at, ...} +GPU_REGISTRY_KEY = "config:gpus" # gpu_name -> {endpoint, model, ...} +CONSUMER_REGISTRY_KEY = "consumers" # consumer_id -> {pid, started_at, last_heartbeat} + +# Stream entry format (JSON) +# { +# "job_id": "uuid", +# "model": "standard|heavy|light", +# "priority": "high|normal|low", +# "payload": {...}, # Original request body +# "headers": {...}, # Original request headers (including Content-Type) +# "created_at": 1747478400.0, +# "attempt": 0, +# "max_retries": 3, +# "source": "nginx|direct" +# } +``` + +### 3.3 New `queue-service.py` (Complete Rewrite) + +```python +#!/usr/bin/env python3 +"""Syslog Harness Smart Queue Service Redis Streams + GPU-aware consumer pool. + +Endpoints: + /health liveness (Nginx upstream check) + /enqueue POST: submit inference request (with optional priority) + /status GET: queue depth, circuit state, consumer pool + /status/ GET: specific job status and result + /dlq GET: list dead-letter queue entries + /dlq//retry POST: retry a dead-letter job + /consumers GET: consumer pool status + /gpus GET: GPU registry with live health +""" + +import asyncio +import json +import logging +import os +import signal +import sys +import time +import uuid +from datetime import datetime, timezone + +import redis +from flask import Flask, request, jsonify + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +REDIS_HOST = os.getenv("REDIS_HOST", "redis") +REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) +REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "") + +# Queue thresholds (adaptive backpressure) +QUEUE_WARN = 30 +QUEUE_503 = 40 +QUEUE_OPEN = 50 + +# Consumer settings +CONSUMER_WORKERS = int(os.getenv("QUEUE_CONSUMERS", "3")) # One per GPU +POLL_INTERVAL = float(os.getenv("QUEUE_POLL_INTERVAL", "2")) # seconds +GPU_HEALTH_INTERVAL = 5 # seconds +MAX_RETRIES = int(os.getenv("QUEUE_MAX_RETRIES", "3")) +JOB_TIMEOUT = 300 # seconds (5 min, matches Nginx proxy_read_timeout) + +# Redis stream max length (approximate trimming) +STREAM_MAXLEN = 10000 + +# GPU registry (single source of truth) +GPU_REGISTRY = { + "amdpve": { + "endpoint": "http://192.168.68.15:8080", + "model": "qwen3.6-35B-A3B", + "type": "MoE", + "vram_total_mb": 65536, + "priority": 1, # Primary workhorse + }, + "llmgpu": { + "endpoint": "http://192.168.68.8:8080", + "model": "qwen3.6-27B-code", + "type": "Dense", + "vram_total_mb": 24576, + "priority": 2, + }, + "ocu_llm": { + "endpoint": "http://192.168.68.110:8080", + "model": "gemma-4-E4B", + "type": "Light", + "vram_total_mb": 12227, + "priority": 3, + }, +} + +# Model-to-GPU mapping (mirrors Nginx map block) +MODEL_TO_GPU = { + "default": "amdpve", + "standard": "amdpve", + "heavy": "llmgpu", + "qwen3.5-27B": "llmgpu", + "qwen3.6-27B": "llmgpu", + "qwen3.6-27B-code": "llmgpu", + "light": "ocu_llm", + "gemma-4": "ocu_llm", + "gemma-4-E4B": "ocu_llm", +} + +# Priority ordering +PRIORITY_ORDER = {"high": 0, "normal": 1, "low": 2} + +# Streams +STREAM_KEY = "inference:stream" +DLQ_KEY = "inference:dead-letter" +PENDING_KEY = "inference:pending" + +# Redis keys +JOB_STATUS_KEY = "job:status" +CONSUMER_KEY = "consumers" + +# Logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + stream=sys.stdout, +) +logger = logging.getLogger("queue-service") + +# --------------------------------------------------------------------------- +# Redis Client +# --------------------------------------------------------------------------- + +def get_redis(): + """Get Redis connection. Raises on failure (no silent swallowing).""" + kwargs = dict( + host=REDIS_HOST, + port=REDIS_PORT, + decode_responses=True, + socket_timeout=5, + socket_connect_timeout=3, + retry_on_timeout=True, + ) + if REDIS_PASSWORD: + kwargs["password"] = REDIS_PASSWORD + r = redis.Redis(**kwargs) + r.ping() # Verify connection + return r + + +# Global Redis client (reused across requests) +_redis_client = None + + +def redis_client(): + global _redis_client + if _redis_client is None: + _redis_client = get_redis() + return _redis_client + + +# --------------------------------------------------------------------------- +# GPU Health & Load +# --------------------------------------------------------------------------- + +def fetch_gpu_metrics(gpu_name, gpu_info): + """Fetch live metrics from GPU sidecar (port 8090). + + Returns dict with health, utilization, VRAM usage, and load score. + Returns None if sidecar is unreachable. + """ + endpoint = gpu_info["endpoint"] + try: + import urllib.request + # Check sidecar health for GPU metrics + req = urllib.request.Request(f"{endpoint}:8090/health") + req.add_header("User-Agent", "queue-consumer/1.0") + resp = urllib.request.urlopen(req, timeout=3) + if resp.status != 200: + return None + + metrics = json.loads(resp.read()) + + # Calculate load score (0-100, lower = more available) + vram_used = metrics.get("vram_used_mb", 0) + vram_total = metrics.get("vram_total_mb", 1) + vram_pct = (vram_used / vram_total * 100) if vram_total > 0 else 100 + + gpu_util = metrics.get("gpu_util_pct", 0) + temp = metrics.get("temp_c", 0) + + # Load score: weighted combination of GPU util and VRAM pressure + # Higher score = more loaded = less desirable for new jobs + load_score = (gpu_util * 0.6) + (vram_pct * 0.3) + (max(0, temp - 60) * 0.1) + + # Consider GPU as "down" if GPU util is stuck at 100% and VRAM is full + is_down = vram_pct > 95 or (gpu_util > 98 and temp > 85) + + return { + "health": "down" if is_down else "up", + "gpu_util_pct": gpu_util, + "vram_used_mb": vram_used, + "vram_total_mb": vram_total, + "vram_pct": round(vram_pct, 1), + "temp_c": temp, + "load_score": round(load_score, 1), + "last_check": time.time(), + } + except Exception as e: + logger.debug(f"GPU {gpu_name} sidecar unreachable: {e}") + return None + + +def get_gpu_health(): + """Check health of all GPUs. Returns dict of gpu_name -> metrics or None.""" + health = {} + for name, info in GPU_REGISTRY.items(): + m = fetch_gpu_metrics(name, info) + health[name] = m if m else {"health": "down", "last_check": time.time()} + return health + + +def select_gpu(job): + """Select the best GPU for a job based on model mapping and load. + + Algorithm: + 1. Map job model to preferred GPU via MODEL_TO_GPU + 2. Check if preferred GPU is up and has capacity + 3. If preferred is down/busy, try fallback GPUs in priority order + 4. Return the best available GPU or None + + Returns: (gpu_name, gpu_info, metrics) or (None, None, None) + """ + model = job.get("model", "standard") + preferred = MODEL_TO_GPU.get(model, "amdpve") + + # Get all GPU health + health = get_gpu_health() + + # Build candidate list: preferred first, then others by priority + candidates = [] + for name in [preferred] + sorted( + [n for n in GPU_REGISTRY if n != preferred], + key=lambda n: GPU_REGISTRY[n]["priority"] + ): + metrics = health.get(name, {}) + if metrics.get("health") == "up": + candidates.append((name, GPU_REGISTRY[name], metrics)) + + if not candidates: + return None, None, None + + # Sort by load_score (lower = less loaded = better) + candidates.sort(key=lambda c: c[2].get("load_score", 100)) + + return candidates[0] + + +# --------------------------------------------------------------------------- +# Circuit Breaker (per-GPU, with recovery) +# --------------------------------------------------------------------------- + +class CircuitBreaker: + """Per-GPU circuit breaker with half-open recovery state. + + States: + CLOSED Normal operation. Failures tracked. + OPEN All requests fail immediately. After cooldown, transitions to HALF_OPEN. + HALF_OPEN Allow one probe request. If it succeeds, CLOSED. If fails, OPEN. + """ + + def __init__(self, failure_threshold=3, recovery_timeout=30): + self.failure_threshold = failure_threshold + self.recovery_timeout = recovery_timeout + self._failures = {} # gpu_name -> count + self._last_failure = {} # gpu_name -> timestamp + self._state = {} # gpu_name -> "closed" | "open" | "half_open" + + def allow_request(self, gpu_name): + """Check if a request should be allowed for this GPU.""" + state = self._state.get(gpu_name, "closed") + + if state == "closed": + return True + + if state == "open": + last_fail = self._last_failure.get(gpu_name, 0) + if time.time() - last_fail > self.recovery_timeout: + self._state[gpu_name] = "half_open" + logger.info(f"Circuit {gpu_name}: OPEN HALF_OPEN (probe)") + return True + return False + + if state == "half_open": + # Only one probe at a time for simplicity, allow + return True + + return False + + def record_success(self, gpu_name): + """Record a successful request transition to CLOSED.""" + self._failures[gpu_name] = 0 + self._state[gpu_name] = "closed" + logger.info(f"Circuit {gpu_name}: HALF_OPEN CLOSED") + + def record_failure(self, gpu_name): + """Record a failed request.""" + self._failures[gpu_name] = self._failures.get(gpu_name, 0) + 1 + self._last_failure[gpu_name] = time.time() + + if self._failures[gpu_name] >= self.failure_threshold: + self._state[gpu_name] = "open" + logger.warning(f"Circuit {gpu_name}: CLOSED OPEN (threshold={self.failure_threshold})") + + +# --------------------------------------------------------------------------- +# Job Lifecycle +# --------------------------------------------------------------------------- + +def store_job_status(job_id, status_data): + """Store job status in Redis hash.""" + r = redis_client() + r.hset(JOB_STATUS_KEY, job_id, json.dumps({ + **status_data, + "updated_at": time.time(), + })) + # Auto-expire after 24 hours + r.expire(JOB_STATUS_KEY, 86400) + + +def get_job_status(job_id): + """Get job status from Redis.""" + r = redis_client() + raw = r.hget(JOB_STATUS_KEY, job_id) + if raw: + return json.loads(raw) + return None + + +def enqueue_job(payload, headers, source="nginx"): + """Add a job to the Redis stream. Returns job_id.""" + r = redis_client() + + # Parse model and priority from headers / payload + model_header = headers.get("X-Syslog-Model", "standard") + priority_header = headers.get("X-Syslog-Priority", "normal") + + # Validate priority + if priority_header not in PRIORITY_ORDER: + priority_header = "normal" + + job_id = str(uuid.uuid4()) + job = { + "job_id": job_id, + "model": model_header, + "priority": priority_header, + "payload": payload, + "headers": dict(headers), # Include ALL headers (not just X-*) + "created_at": time.time(), + "attempt": 0, + "max_retries": MAX_RETRIES, + "source": source, + } + + # Add to stream with approximate trimming + r.xadd( + STREAM_KEY, + {"job": json.dumps(job)}, + maxlen=STREAM_MAXLEN, + approx=True, + ) + + # Initial status + store_job_status(job_id, { + "status": "queued", + "model": model_header, + "priority": priority_header, + "created_at": job["created_at"], + "queued_at": time.time(), + }) + + return job_id + + +def get_queue_depth(): + """Get current stream length (approximate).""" + r = redis_client() + # XLEN is O(1) for streams + try: + return r.xlen(STREAM_KEY) + except Exception: + return -1 + + +# --------------------------------------------------------------------------- +# Dead Letter Queue +# --------------------------------------------------------------------------- + +def move_to_dlq(job_id, reason): + """Move a failed job to the dead letter queue.""" + r = redis_client() + + # Get the original stream entry + entries = r.xrange(STREAM_KEY, min="0-0", max="+", count=10000) + original_job = None + for msg_id, data in entries: + job = json.loads(data.get("job", "{}")) + if job.get("job_id") == job_id: + original_job = job + # Acknowledge (remove) from main stream + r.xdel(STREAM_KEY, msg_id) + break + + if not original_job: + logger.error(f"DLQ move: job {job_id} not found in main stream") + return + + dlq_entry = { + **original_job, + "dlq_reason": reason, + "dlq_at": time.time(), + } + + r.xadd(DLQ_KEY, {"job": json.dumps(dlq_entry)}) + store_job_status(job_id, { + "status": "dead", + "dlq_reason": reason, + "dlq_at": dlq_entry["dlq_at"], + "updated_at": time.time(), + }) + + logger.warning(f"Job {job_id} moved to DLQ: {reason}") + + +def list_dlq(limit=20): + """List dead letter queue entries.""" + r = redis_client() + entries = r.xrange(DLQ_KEY, count=limit) + result = [] + for msg_id, data in entries: + job = json.loads(data.get("job", "{}")) + result.append({ + "message_id": msg_id.decode() if isinstance(msg_id, bytes) else msg_id, + **job, + }) + return result + + +def retry_dlq(message_id): + """Retry a dead-letter job by re-adding to main stream.""" + r = redis_client() + entries = r.xrange(DLQ_KEY, min=message_id, max=message_id, count=1) + + if not entries: + return None, "DLQ entry not found" + + msg_id, data = entries[0] + job = json.loads(data.get("job", "{}")) + + # Reset attempt count + job["attempt"] = 0 + job["dlq_at"] = None + job["dlq_reason"] = None + + # Re-add to main stream + r.xadd( + STREAM_KEY, + {"job": json.dumps(job)}, + maxlen=STREAM_MAXLEN, + approx=True, + ) + + # Remove from DLQ + r.xdel(DLQ_KEY, msg_id) + + # Update status + store_job_status(job["job_id"], { + "status": "queued", + "attempt": 0, + "requeued_at": time.time(), + "updated_at": time.time(), + }) + + return job["job_id"], "requeued" + + +# --------------------------------------------------------------------------- +# Flask API +# --------------------------------------------------------------------------- + +app = Flask(__name__) +circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30) + + +@app.route("/health") +def health(): + """Liveness probe for Nginx.""" + try: + r = redis_client() + r.ping() + return jsonify({"status": "ok", "service": "queue-service"}), 200 + except Exception as e: + return jsonify({"status": "error", "error": str(e)}), 503 + + +@app.route("/enqueue", methods=["POST"]) +def enqueue(): + """Submit inference request to the smart queue. + + Accepts any body (JSON, text, etc.) and forwards all headers. + Supports X-Syslog-Priority header: high, normal, low. + """ + try: + payload = request.get_data(as_text=True) + headers = dict(request.headers) # ALL headers, not just X-* + + # Check circuit breaker: is the queue overwhelmed? + depth = get_queue_depth() + if depth >= QUEUE_OPEN: + return jsonify({ + "error": "Circuit breaker OPEN", + "queue_depth": depth, + "threshold": QUEUE_OPEN, + "retry_after": 30, + }), 503 + + # Adaptive backpressure + if depth >= QUEUE_503: + return jsonify({ + "error": "Queue overloaded", + "queue_depth": depth, + "retry_after": min(60, (depth - QUEUE_503) * 5), + }), 503 + + if depth >= QUEUE_WARN: + # Warn but allow + logger.warning(f"Queue depth approaching limit: {depth}/{QUEUE_WARN}") + + job_id = enqueue_job(payload, headers) + + return jsonify({ + "job_id": job_id, + "status": "queued", + "queue_depth": get_queue_depth(), + }), 202 + + except Exception as e: + logger.error(f"Enqueue failed: {e}") + return jsonify({"error": str(e)}), 500 + + +@app.route("/status") +def status(): + """Queue overview: depth, circuit state, consumer pool, GPU health.""" + r = redis_client() + depth = get_queue_depth() + dlq_depth = r.xlen(DLQ_KEY) if r else 0 + pending_depth = r.xlen(PENDING_KEY) if r else 0 + + # Circuit breaker state per GPU + circuit_state = {} + for gpu_name in GPU_REGISTRY: + state = circuit_breaker._state.get(gpu_name, "closed") + failures = circuit_breaker._failures.get(gpu_name, 0) + circuit_state[gpu_name] = { + "state": state, + "failures": failures, + } + + # Consumer pool + consumers_raw = r.hgetall(CONSUMER_KEY) if r else {} + consumers = [] + for cid, cdata in consumers_raw.items(): + try: + consumers.append(json.loads(cdata)) + except json.JSONDecodeError: + pass + + # GPU health + gpu_health = get_gpu_health() + + return jsonify({ + "queue_depth": depth, + "pending_depth": pending_depth, + "dlq_depth": dlq_depth, + "circuit_breaker": circuit_state, + "consumers": consumers, + "gpu_health": gpu_health, + "thresholds": { + "warn": QUEUE_WARN, + "overloaded": QUEUE_503, + "open": QUEUE_OPEN, + }, + }), 200 + + +@app.route("/status/") +def job_status_endpoint(job_id): + """Get specific job status and result.""" + status_data = get_job_status(job_id) + if not status_data: + return jsonify({"error": "job not found"}), 404 + return jsonify(status_data), 200 + + +@app.route("/dlq") +def dlq_list(): + """List dead letter queue entries.""" + return jsonify({"dead_letter_queue": list_dlq()}), 200 + + +@app.route("/dlq//retry", methods=["POST"]) +def dlq_retry(message_id): + """Retry a dead-letter job.""" + job_id, msg = retry_dlq(message_id) + if job_id is None: + return jsonify({"error": msg}), 404 + return jsonify({"job_id": job_id, "status": "requeued"}), 200 + + +@app.route("/consumers") +def consumers_status(): + """List active consumer pool.""" + r = redis_client() + consumers_raw = r.hgetall(CONSUMER_KEY) if r else {} + consumers = [] + for cid, cdata in consumers_raw.items(): + try: + consumers.append(json.loads(cdata)) + except json.JSONDecodeError: + pass + return jsonify({"consumers": consumers}), 200 + + +@app.route("/gpus") +def gpus_status(): + """List GPU registry with live health and metrics.""" + health = get_gpu_health() + result = {} + for name, info in GPU_REGISTRY.items(): + h = health.get(name, {}) + result[name] = { + **info, + **h, + } + return jsonify(result), 200 + + +# --------------------------------------------------------------------------- +# Consumer Pool (runs in background thread) +# --------------------------------------------------------------------------- + +def run_consumer(consumer_id): + """Main consumer loop: reads from stream, selects GPU, forwards request. + + This runs in a background thread per worker. + """ + import urllib.request + + worker_name = f"consumer-{consumer_id}" + consumer_group = f"workers-{consumer_id}" + + logger.info(f"{worker_name} started") + + # Register consumer + r = redis_client() + r.hset(CONSUMER_KEY, consumer_id, json.dumps({ + "consumer_id": consumer_id, + "started_at": time.time(), + "pid": os.getpid(), + "status": "running", + })) + + # Create consumer group if not exists + try: + r.xgroup_create(STREAM_KEY, consumer_group, id="0", mkstream=True) + except Exception: + pass # Group already exists + + running = True + + def handle_signal(signum, frame): + nonlocal running + logger.info(f"{worker_name} shutting down (signal {signum})") + running = False + + signal.signal(signal.SIGTERM, handle_signal) + signal.signal(signal.SIGINT, handle_signal) + + last_heartbeat = 0 + gpu_health_cache = {} + last_health_check = 0 + + while running: + try: + now = time.time() + + # Periodic health check (every GPU_HEALTH_INTERVAL seconds) + if now - last_health_check > GPU_HEALTH_INTERVAL: + gpu_health_cache = get_gpu_health() + last_health_check = now + + # Read from stream (non-blocking, 1 batch at a time) + entries = r.xreadgroup( + consumer_group, + consumer_id, + {STREAM_KEY: ">"}, # ">" = new messages only + count=1, # Process one job at a time per consumer + block=int(POLL_INTERVAL * 1000), # Block in ms + ) + + if not entries: + # Update heartbeat + if now - last_heartbeat > 5: + r.hset(CONSUMER_KEY, consumer_id, json.dumps({ + "consumer_id": consumer_id, + "started_at": r.hget(CONSUMER_KEY, consumer_id + ":started") or time.time(), + "pid": os.getpid(), + "status": "idle", + "last_heartbeat": now, + "jobs_processed": r.hget(CONSUMER_KEY, consumer_id + ":processed") or 0, + })) + last_heartbeat = now + continue + + for stream, messages in entries: + for msg_id, data in messages: + job_data = json.loads(data.get("job", "{}")) + job_id = job_data.get("job_id", "unknown") + + logger.info(f"{worker_name} processing job {job_id} (attempt {job_data.get('attempt', 0)})") + + # Update status to processing + store_job_status(job_id, { + "status": "processing", + "consumer": consumer_id, + "started_at": now, + "updated_at": now, + }) + + # Track pending + r.xadd(PENDING_KEY, {"job_id": job_id, "consumer": consumer_id}) + + # Select GPU + gpu_name, gpu_info, gpu_metrics = select_gpu(job_data) + + if not gpu_name: + # No GPU available requeue + logger.warning(f"No GPU available for job {job_id}, requeuing") + move_to_dlq(job_id, "no_gpu_available") + r.xack(STREAM_KEY, consumer_group, msg_id) + continue + + # Check circuit breaker + if not circuit_breaker.allow_request(gpu_name): + logger.warning(f"Circuit OPEN for {gpu_name}, DLQ job {job_id}") + move_to_dlq(job_id, f"circuit_open_{gpu_name}") + r.xack(STREAM_KEY, consumer_group, msg_id) + continue + + # Forward request to GPU + gpu_url = f"{gpu_info['endpoint']}/v1/chat/completions" + headers = job_data.get("headers", {}) + payload = job_data.get("payload", "{}") + + # Ensure Content-Type is set + if "Content-Type" not in headers: + headers["Content-Type"] = "application/json" + + # Update job status with GPU selection + store_job_status(job_id, { + "status": "processing", + "gpu": gpu_name, + "gpu_load_score": gpu_metrics.get("load_score", 0) if gpu_metrics else 0, + "started_at": now, + "updated_at": now, + }) + + try: + req = urllib.request.Request(gpu_url, data=payload.encode(), headers=headers) + req.add_header("User-Agent", f"queue-consumer/{consumer_id}") + + resp = urllib.request.urlopen(req, timeout=JOB_TIMEOUT) + response_data = resp.read().decode() + + # Success! + circuit_breaker.record_success(gpu_name) + + store_job_status(job_id, { + "status": "completed", + "gpu": gpu_name, + "result": response_data[:2000], # Store first 2000 chars + "completed_at": time.time(), + "updated_at": time.time(), + }) + + logger.info(f"{worker_name} completed job {job_id} on {gpu_name}") + + except Exception as e: + # Request failed + error_msg = str(e)[:200] + logger.error(f"{worker_name} failed job {job_id} on {gpu_name}: {error_msg}") + + circuit_breaker.record_failure(gpu_name) + + # Check if we should retry + attempt = job_data.get("attempt", 0) + 1 + if attempt <= MAX_RETRIES: + # Retry: update job with incremented attempt, re-add to stream + job_data["attempt"] = attempt + store_job_status(job_id, { + "status": "retrying", + "attempt": attempt, + "last_error": error_msg, + "updated_at": time.time(), + }) + r.xadd( + STREAM_KEY, + {"job": json.dumps(job_data)}, + maxlen=STREAM_MAXLEN, + approx=True, + ) + else: + # Max retries exceeded DLQ + move_to_dlq(job_id, f"max_retries_exceeded_after_{attempt}_attempts_on_{gpu_name}: {error_msg}") + + # Ack the stream message + r.xack(STREAM_KEY, consumer_group, msg_id) + + # Remove from pending + r.xdel(PENDING_KEY, msg_id) + + # Update consumer processed count + processed = r.hget(CONSUMER_KEY, consumer_id + ":processed") + processed = int(processed) + 1 if processed else 1 + r.hset(CONSUMER_KEY, consumer_id + ":processed", str(processed)) + + except redis.ConnectionError as e: + logger.error(f"{worker_name} Redis connection error: {e}. Retrying in {POLL_INTERVAL}s...") + time.sleep(POLL_INTERVAL) + except Exception as e: + logger.error(f"{worker_name} unexpected error: {e}") + time.sleep(POLL_INTERVAL) + + # Unregister consumer + r.hdel(CONSUMER_KEY, consumer_id) + logger.info(f"{worker_name} stopped") + + +# --------------------------------------------------------------------------- +# Startup +# --------------------------------------------------------------------------- + +def main(): + """Start the queue service with consumer workers.""" + logger.info("Starting Smart Queue Service...") + logger.info(f"Redis: {REDIS_HOST}:{REDIS_PORT}") + logger.info(f"Consumers: {CONSUMER_WORKERS}") + logger.info(f"GPU registry: {list(GPU_REGISTRY.keys())}") + + # Verify Redis connectivity + try: + r = redis_client() + r.ping() + logger.info("Redis connection verified") + except Exception as e: + logger.error(f"Redis connection failed: {e}") + sys.exit(1) + + # Start consumer workers in background threads + threads = [] + for i in range(CONSUMER_WORKERS): + t = asyncio.start_new_thread(run_consumer, (f"worker-{i}",)) + threads.append(t) + + logger.info(f"Started {CONSUMER_WORKERS} consumer workers") + + # Start Flask API (gunicorn handles this in production) + app.run(host="0.0.0.0", port=8091, threaded=True) + + +if __name__ == "__main__": + main() +``` + +### 3.4 Updated `queue-service/Dockerfile` + +```dockerfile +FROM python:3.13-slim + +RUN pip install --no-cache-dir \ + flask==3.1.0 \ + redis==5.2.1 \ + gunicorn==23.0.0 + +COPY queue-service.py /app/queue-service.py +WORKDIR /app + +EXPOSE 8091 + +# Gunicorn: 2 workers, 300s timeout (matches LLM inference timeout) +CMD ["gunicorn", "--bind", "0.0.0.0:8091", "--workers", "2", "--threads", "4", "--timeout", "300", "--graceful-timeout", "30", "queue-service:app"] +``` + +### 3.5 Updated `docker-compose.yml` + +```yaml +version: "3.8" + +services: + redis: + image: redis:7-alpine + restart: unless-stopped + networks: + - gpu-router-net + volumes: + - redis-data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 3s + retries: 3 + + queue-service: + build: + context: . + dockerfile: queue-service/Dockerfile + restart: unless-stopped + networks: + - gpu-router-net + expose: + - "8091" + depends_on: + redis: + condition: service_healthy + environment: + - REDIS_HOST=redis + - REDIS_PORT=6379 + - QUEUE_CONSUMERS=3 + - QUEUE_POLL_INTERVAL=2 + - QUEUE_MAX_RETRIES=3 + healthcheck: + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8091/health')"] + interval: 15s + timeout: 5s + retries: 3 + + dashboard: + build: + context: . + dockerfile: Dockerfile.dashboard + restart: unless-stopped + networks: + - gpu-router-net + expose: + - "3001" + depends_on: + queue-service: + condition: service_healthy + healthcheck: + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:3001/')"] + interval: 15s + timeout: 5s + retries: 3 + + gpu-dashboard: + build: + context: . + dockerfile: Dockerfile.gpu + restart: unless-stopped + networks: + - gpu-router-net + expose: + - "8092" + healthcheck: + test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8092/gpu.html')"] + interval: 15s + timeout: 5s + retries: 3 + +networks: + gpu-router-net: + driver: bridge + +volumes: + redis-data: +``` + +**Key changes:** +- `dockerfile: queue-service/Dockerfile` (QW-1 fix) +- `restart: unless-stopped` (QW-8 fix) +- `expose` instead of `ports` for internal services (QW-18: queue API no longer externally exposed) +- `condition: service_healthy` for depends_on (QW-9: health checks) + +--- + +## 4. Nginx Config Updates (gpu-router-docker.conf) + +```nginx +## Syslog GPU Router Nginx Configuration (Docker-internal) +## Routes incoming agent requests to the appropriate GPU backend +## based on the X-Syslog-Model header. + +upstream amdpve_pool { + server 192.168.68.15:8080 max_fails=3 fail_timeout=30s; +} + +upstream llmgpu_pool { + server 192.168.68.8:8080 max_fails=3 fail_timeout=30s; +} + +upstream ocu_llm_pool { + server 192.168.68.110:8080 max_fails=3 fail_timeout=30s; +} + +upstream queue_service { + server queue-service:8091; +} + +upstream dashboard_service { + server dashboard:3001; +} + +upstream gpu_dashboard_pool { + server gpu-dashboard:8092; +} + +## All-GPUs pool for fallback (fails only when ALL GPUs are down) +upstream all_gpu_pool { + server 192.168.68.15:8080 max_fails=3 fail_timeout=30s; + server 192.168.68.8:8080 max_fails=3 fail_timeout=30s; + server 192.168.68.110:8080 max_fails=3 fail_timeout=30s; +} + +map $http_x_syslog_model $gpu_upstream { + default amdpve_pool; + "standard" amdpve_pool; + "heavy" llmgpu_pool; + "qwen3.5-27B" llmgpu_pool; + "light" ocu_llm_pool; + "gemma-4" ocu_llm_pool; +} + +limit_req_zone $binary_remote_addr zone=perip:10m rate=10r/s; + +server { + listen 80; + server_name _; + + location /dashboard { + proxy_pass http://dashboard_service/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + + location /gpu { + proxy_pass http://gpu_dashboard_pool/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } + + location / { + limit_req zone=perip burst=20 nodelay; + limit_req_status 503; + proxy_pass http://$gpu_upstream; + + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + ## Streaming support + proxy_buffering off; + proxy_cache off; + proxy_read_timeout 300s; + proxy_send_timeout 300s; + + ## Only retry on error/timeout single try (same GPU pool) + proxy_next_upstream error timeout http_502 http_503 http_504; + proxy_next_upstream_tries 1; + + add_header X-Routed-To $gpu_upstream always; + + ## Only fall back to queue when the ALL-GPU pool fails + error_page 504 = @queue_fallback; + } + + ## Queue fallback only reached when ALL GPUs are down + location @queue_fallback { + proxy_pass http://queue_service/enqueue; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header Content-Type $content_type; + proxy_pass_request_body on; + proxy_set_header X-Syslog-Model $http_x_syslog_model; + } +} +``` + +**Key changes:** +- `max_fails=3 fail_timeout=30s` on each upstream (Nginx-level health) +- `all_gpu_pool` upstream for fallback (N3 fix) +- `proxy_next_upstream_tries 1` (N2 fix no self-retry) +- Container names fixed to service names (N5 fix) +- `error_page 504 = @queue_fallback` instead of 502/503/504 (N3 fix) +- `X-Syslog-Model` forwarded to queue_service in fallback + +--- + +## 5. GPU Dashboard Updates + +### 5.1 Updated `Dockerfile.gpu` + +```dockerfile +FROM python:3.11-slim + +RUN pip install --no-cache-dir psutil==6.1.1 + +COPY gpu-dashboard/ /app/ +WORKDIR /app + +RUN mkdir -p /app/public && \ + cp gpu.html /app/public/ && \ + touch /app/public/gpu_metrics.json + +EXPOSE 8092 + +# Single-process: gpu_collector.py serves both as collector and HTTP server +CMD ["python3", "gpu_collector.py"] +``` + +### 5.2 Updated `gpu_collector.py` (key changes) + +```python +# Changes: +# 1. Add async/concurrent GPU polling (threading) +# 2. Fix path consistency (use /app/public consistently) +# 3. Add graceful shutdown +# 4. Include health endpoint for Docker healthcheck + +import threading +import http.server +import json +import time +import signal +import sys + +# ... (keep existing imports) + +DEAD_THRESHOLD = 30 # seconds (was 60 G4 fix) +OUTPUT_PATH = "/app/public/gpu_metrics.json" # Consistent path (G2 fix) + +def fetch_json_concurrent(endpoints): + """Fetch from all GPUs concurrently using threads.""" + results = {} + errors = {} + + def fetch_one(name, url): + try: + req = urllib.request.Request(url) + req.add_header("User-Agent", "gpu-collector/1.0") + resp = urllib.request.urlopen(req, timeout=3) + results[name] = json.loads(resp.read()) + except Exception as e: + errors[name] = str(e)[:100] + + threads = [] + for name, url in endpoints.items(): + t = threading.Thread(target=fetch_one, args=(name, url)) + threads.append(t) + t.start() + + for t in threads: + t.join(timeout=5) + + return results, errors + +# ... (update the collection loop to use fetch_json_concurrent) + +# Add HTTP health endpoint +class HealthHandler(http.server.SimpleHTTPRequestHandler): + def do_GET(self): + if self.path == "/health": + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "ok"}).encode()) + elif self.path == "/gpu.html" or self.path == "/": + super().do_GET() + else: + self.send_response(404) + self.end_headers() + +# Start both collector loop and HTTP server in the same thread +# (HTTP server runs in a separate thread, collector runs in main) +``` + +--- + +## 6. Migration Steps + +### Phase 1: Quick Wins (Deploy First) + +1. Apply QW-1 through QW-10 (Dockerfile fix, Nginx fixes, health checks) +2. `docker compose up -d --build` +3. Verify all containers pass health checks +4. Verify Nginx fallback only triggers on ALL-GPU failure + +### Phase 2: Smart Queue Consumer + +1. Replace `queue-service/queue-service.py` with new implementation +2. Replace `queue-service/Dockerfile` with Gunicorn-based Dockerfile +3. Update `docker-compose.yml` (expose ports, depends_on conditions) +4. `docker compose up -d --build queue-service` +5. Verify: + - `/health` returns 200 + - `/status` shows consumer pool + - Submit a test request via `/enqueue` + - Verify job appears in `/status/` with "queued" status + - Verify job gets processed and moves to "completed" + - Verify GPU sidecar metrics are being read + - Verify consumer pool shows active workers + +### Phase 3: Validation + +1. Submit multiple concurrent requests +2. Verify load balancing across GPUs +3. Verify circuit breaker opens on GPU failure +4. Verify DLQ captures max-retry failures +5. Verify `/dlq` and `/dlq//retry` endpoints work +6. Verify backpressure: queue depth > 40 returns 503 with retry-after +7. Monitor Redis stream lengths and consumer ack rates + +--- + +## 7. Risk Assessment + +| Risk | Mitigation | +|---|---| +| Redis stream XREADGROUP blocks indefinitely | `block=2000ms` parameter prevents infinite blocking | +| Consumer crashes mid-request | `PENDING_KEY` tracks in-flight jobs; restart re-reads pending | +| GPU sidecar temporarily unavailable | 3s timeout on sidecar calls; GPU marked "down" if unreachable | +| Circuit breaker oscillates (flapping) | `recovery_timeout=30s` prevents rapid OPENHALF_OPENOPEN cycling | +| DLQ grows unbounded | Stream maxlen=10000 approx trim; manual DLQ cleanup recommended | +| Consumer threads conflict with Flask | Gunicorn handles Flask; consumers run in separate threads | +| Job payload too large for Redis | Redis has 512MB value limit; LLM responses typically < 50KB | + +--- + +## 8. Future Enhancements (Not in This Draft) + +1. **Prometheus metrics endpoint** queue depth, processing rate, GPU latency, error rates +2. **WebSocket streaming** real-time job status updates to dashboard +3. **Batch processing** group multiple small requests for throughput optimization +4. **Model-specific queues** separate streams per model for guaranteed SLA +5. **Redis Sentinel/Cluster** high availability for Redis +6. **Rate limiting per-model** prevent one model from starving others +7. **GPU affinity for long-running jobs** keep a job on the same GPU to avoid context switches +8. **Result caching** cache responses for identical requests (hash payload, return cached result) + +--- + +**END OF DRAFT** + +This implementation transforms the queue-service from a passive storage pit into a production-ready smart queue consumer. The quick wins (QW-1 through QW-10) can be deployed independently and provide immediate value. The smart queue consumer builds on top of the quick wins. + +**Recommended deployment order:** +1. Quick wins (QW-1 to QW-10) 1-2 hours +2. Smart queue consumer 2-3 hours +3. Validation & testing 1 hour +4. Monitor for 24 hours, then go live diff --git a/dashboard/Dockerfile b/dashboard/Dockerfile new file mode 100644 index 0000000..cfff59e --- /dev/null +++ b/dashboard/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.13-slim + +COPY harness-dashboard.py /app/harness-dashboard.py +WORKDIR /app + +EXPOSE 3001 + +CMD ["python3", "harness-dashboard.py"] diff --git a/dashboard/Dockerfile.dashboard b/dashboard/Dockerfile.dashboard new file mode 100644 index 0000000..fbebc6b --- /dev/null +++ b/dashboard/Dockerfile.dashboard @@ -0,0 +1,5 @@ +FROM python:3.11-slim +WORKDIR /app +COPY harness-dashboard.py . +EXPOSE 3001 +CMD ["python3", "harness-dashboard.py"] diff --git a/dashboard/harness-dashboard.py b/dashboard/harness-dashboard.py new file mode 100644 index 0000000..d52b9e5 --- /dev/null +++ b/dashboard/harness-dashboard.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +"""Syslog Harness Dashboard — Simple HTTP server exposing GPU health + metrics.""" + +import json +import os +import time +import urllib.request +from http.server import HTTPServer, SimpleHTTPRequestHandler +from datetime import datetime + +GPUS = { + "amdpve": {"endpoint": os.getenv("AMDVE_EP", "192.168.68.15:8080"), "model": "qwen3.6-35B-A3B (MoE)", "vram": "65GB"}, + "llmgpu": {"endpoint": os.getenv("LLMGPU_EP", "192.168.68.8:8080"), "model": "qwen3.5-27B (Dense)", "vram": "24GB"}, + "ocu_llm": {"endpoint": os.getenv("OCU_LLM_EP", "192.168.68.110:8080"), "model": "gemma-4-E4B (Light)", "vram": "12GB"}, +} + + +def check_gpu(name, info): + try: + start = time.time() + # Use simple HTTP GET to check if the GPU endpoint is alive + resp = urllib.request.urlopen(f"http://{info['endpoint']}/", timeout=3) + latency = (time.time() - start) * 1000 + return { + "status": "up", + "latency_ms": round(latency, 1), + "model": info["model"], + "vram": info["vram"], + } + except Exception as e: + return {"status": "down", "error": str(e)[:50], "model": info["model"], "vram": info["vram"]} + + +def get_queue_status(): + try: + req = urllib.request.Request("http://queue-service:8091/status") + resp = urllib.request.urlopen(req, timeout=2) + return json.loads(resp.read()) + except Exception: + return {"queue_depth": -1, "circuit_breaker": "unknown", "gpu_health": {}} + + +DASHBOARD_HTML = """ + +🦅 Syslog Harness + + +

🦅 Syslog Harness Dashboard

+

Updated:

+ +
+

Queue & Circuit Breaker

+
Depth: --
+
Circuit: --
+
Threshold: --
+
+ +
+

GPU Endpoints

+ +
GPUModelVRAMStatusLatency
+
+ + +""" + + +class Handler(SimpleHTTPRequestHandler): + def do_GET(self): + if self.path == "/" or self.path == "/harness.html": + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.end_headers() + self.wfile.write(DASHBOARD_HTML.encode()) + elif self.path == "/api/status": + status = get_queue_status() + enriched = { + "queue_depth": status.get("queue_depth", -1), + "circuit_breaker": status.get("circuit_breaker", "unknown"), + "thresholds": status.get("thresholds", {"warn": 30, "open": 50}), + "gpu_health": {}, + } + for name, info in GPUS.items(): + enriched["gpu_health"][name] = check_gpu(name, info) + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(enriched).encode()) + else: + self.send_response(404) + self.end_headers() + + def log_message(self, format, *args): + pass # Suppress request logs + + +if __name__ == "__main__": + server = HTTPServer(("0.0.0.0", 3001), Handler) + print("Dashboard running on :3001/harness.html") + server.serve_forever() diff --git a/gpu-dashboard/collector.py b/gpu-dashboard/collector.py new file mode 100644 index 0000000..4a681cf --- /dev/null +++ b/gpu-dashboard/collector.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +"""GPU metrics collector — polls sidecars + llama.cpp every 10s, writes to Workspace.""" + +import urllib.request, json, time, os + +HOSTS = [ + {"name": "amdpve", "host": "192.168.68.15", "gpu": "AMD Strix Halo", "llama_port": 8080}, + {"name": "llmgpu", "host": "192.168.68.8", "gpu": "RTX 3090", "llama_port": 8080}, + {"name": "ocu-llm", "host": "192.168.68.110", "gpu": "RTX 5070", "llama_port": 8080}, +] +OUTPUT = "/root/hermes-workspace/public/gpu_metrics.json" +INTERVAL = 10 +STALE_THRESHOLD = 30 # seconds before marking stale +DEAD_THRESHOLD = 60 # seconds before marking unreachable + +last_seen = {} + + +def fetch_json(url, timeout=3): + try: + req = urllib.request.Request(url) + resp = urllib.request.urlopen(req, timeout=timeout) + return json.loads(resp.read().decode()) + except Exception: + return None + + +def collect_one(h): + """Collect GPU hardware + llama.cpp inference state for one host.""" + name = h["name"] + host = h["host"] + now = time.time() + + # GPU hardware from sidecar + gpu = fetch_json(f"http://{host}:8090/") + + # llama.cpp inference state + llamacpp_health = fetch_json(f"http://{host}:{h['llama_port']}/health") + llamacpp_models = fetch_json(f"http://{host}:{h['llama_port']}/v1/models") + + # Determine inference state + model_name = None + inference_state = "unknown" + if llamacpp_models: + models = llamacpp_models.get("data", []) + if models: + model_name = models[0].get("id") + + if llamacpp_health: + status = llamacpp_health.get("status", "") + if status == "ok": + idle = llamacpp_health.get("slots_idle", 0) + processing = llamacpp_health.get("slots_processing", 0) + if idle and not processing: + inference_state = "idle" + elif processing: + inference_state = "busy" + else: + inference_state = "idle" + + # Check for /slots endpoint for is_processing detail + slots = fetch_json(f"http://{host}:{h['llama_port']}/slots") + if slots and isinstance(slots, list) and len(slots) > 0: + if slots[0].get("is_processing"): + inference_state = "busy" + + result = { + "host": name, + "gpu_name": h["gpu"], + "inference": { + "state": inference_state, + "model": model_name, + }, + "hardware": gpu if gpu else None, + "online": gpu is not None, + "timestamp": now, + } + + if gpu is not None: + last_seen[name] = now + + if name in last_seen: + age = now - last_seen[name] + if age > DEAD_THRESHOLD: + result["online"] = False + elif age > STALE_THRESHOLD: + result["stale"] = True + + return result + + +def main(): + print(f"GPU collector starting, output={OUTPUT}, interval={INTERVAL}s") + os.makedirs(os.path.dirname(OUTPUT), exist_ok=True) + + while True: + start = time.time() + results = [collect_one(h) for h in HOSTS] + + payload = { + "updated": start, + "gpus": results, + } + + with open(OUTPUT + ".tmp", "w") as f: + json.dump(payload, f) + os.rename(OUTPUT + ".tmp", OUTPUT) + + elapsed = time.time() - start + sleep_for = max(0, INTERVAL - elapsed) + time.sleep(sleep_for) + + +if __name__ == "__main__": + main() diff --git a/gpu-dashboard/start.sh b/gpu-dashboard/start.sh new file mode 100644 index 0000000..696212e --- /dev/null +++ b/gpu-dashboard/start.sh @@ -0,0 +1,14 @@ +#!/bin/bash +set -e + +# Start collector as background process +cd /root/hermes-workspace/public +python3 /app/collector.py & +COLLECTOR_PID=$! + +echo "Collector started (PID $COLLECTOR_PID)" +echo "Serving dashboard on :8092" + +# Serve the public directory (contains gpu.html + gpu_metrics.json) +cd /root/hermes-workspace/public +python3 -m http.server 8092 diff --git a/gpu-router-docker.conf b/gpu-router-docker.conf index 31de7cc..cc0da1f 100644 --- a/gpu-router-docker.conf +++ b/gpu-router-docker.conf @@ -24,7 +24,7 @@ upstream queue_service { upstream dashboard_service { ## Harness dashboard (Docker container) - server dashboard:3001; + server syslog-harness-dashboard-1:3001; } upstream gpu_dashboard_pool { diff --git a/queue-service/Dockerfile b/queue-service/Dockerfile new file mode 100644 index 0000000..a4da88d --- /dev/null +++ b/queue-service/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.13-slim + +RUN pip install --no-cache-dir flask redis + +COPY queue-service.py /app/queue-service.py +WORKDIR /app + +EXPOSE 8091 + +CMD ["python3", "queue-service.py"] diff --git a/queue-service/queue-service.py b/queue-service/queue-service.py new file mode 100644 index 0000000..3e2bbed --- /dev/null +++ b/queue-service/queue-service.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +"""Syslog Inference Queue Service — Circuit breaker + request queuing. + +Ports: 8091 +Endpoints: + /health — liveness probe (Nginx upstream check) + /enqueue — POST inference request into queue (fallback from Nginx) + /status — GET queue depth + circuit breaker state +""" + +import json +import os +import sys +import time +import urllib.request +from flask import Flask, request, jsonify + +app = Flask(__name__) + +# Configuration +REDIS_HOST = os.getenv("REDIS_HOST", "192.168.68.7") +REDIS_PORT = int(os.getenv("REDIS_PORT", "6379")) +QUEUE_KEY = "inference:requests" +CIRCUIT_OPEN_THRESHOLD = 50 +CIRCUIT_WARN_THRESHOLD = 30 + +# GPU endpoints for draining +GPUS = { + "amdpve": "192.168.68.15:8080", + "llmgpu": "192.168.68.8:8080", + "ocu_llm": "192.168.68.110:8080", +} + + +def get_redis(): + try: + import redis + return redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) + except Exception: + return None + + +def get_queue_depth(r): + try: + return r.llen(QUEUE_KEY) + except Exception: + return 0 + + +def check_gpu_health(endpoint): + try: + req = urllib.request.Request(f"http://{endpoint}/v1/models") + req.add_header("User-Agent", "queue-service/1.0") + resp = urllib.request.urlopen(req, timeout=3) + return resp.status == 200 + except Exception: + return False + + +@app.route("/health") +def health(): + """Nginx upstream health probe. Returns 200 if service is alive.""" + return jsonify({"status": "ok", "service": "queue-service"}), 200 + + +@app.route("/enqueue", methods=["POST"]) +def enqueue(): + """Fallback endpoint — Nginx calls this when all GPU upstreams are down.""" + r = get_redis() + if not r: + return jsonify({"error": "Redis unavailable"}), 503 + + depth = get_queue_depth(r) + if depth >= CIRCUIT_OPEN_THRESHOLD: + return jsonify({ + "error": "Circuit breaker OPEN", + "queue_depth": depth, + "threshold": CIRCUIT_OPEN_THRESHOLD + }), 503 + + # Store the request in queue + payload = request.get_data(as_text=True) + headers = {k: v for k, v in request.headers if k.startswith("X-")} + r.rpush(QUEUE_KEY, json.dumps({ + "payload": payload, + "headers": headers, + "queued_at": time.time() + })) + + new_depth = get_queue_depth(r) + return jsonify({ + "status": "queued", + "position": new_depth, + "circuit": "warn" if new_depth >= CIRCUIT_WARN_THRESHOLD else "closed" + }), 202 + + +@app.route("/status") +def status(): + """GET queue depth + circuit breaker state + GPU health.""" + r = get_redis() + depth = get_queue_depth(r) if r else -1 + circuit = "open" if depth >= CIRCUIT_OPEN_THRESHOLD else ("warn" if depth >= CIRCUIT_WARN_THRESHOLD else "closed") + + gpu_health = {} + for name, endpoint in GPUS.items(): + gpu_health[name] = "up" if check_gpu_health(endpoint) else "down" + + return jsonify({ + "queue_depth": depth, + "circuit_breaker": circuit, + "gpu_health": gpu_health, + "thresholds": { + "warn": CIRCUIT_WARN_THRESHOLD, + "open": CIRCUIT_OPEN_THRESHOLD + } + }) + + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=8091) diff --git a/syslog-harness-check b/syslog-harness-check new file mode 160000 index 0000000..b65ea22 --- /dev/null +++ b/syslog-harness-check @@ -0,0 +1 @@ +Subproject commit b65ea22765fcff6cd96028eedc6c781be1db6125