b09a93f45c
- SMART_QUEUE_IMPLEMENTATION.md: Complete implementation draft (1572 lines) with 10 quick-win fixes and full smart queue consumer rewrite - ARCHITECTURE_REVIEW.md: 26-issue audit with prioritized findings - Verified all 3 GPUs live: amdpve (73% util), llmgpu (idle), ocu_llm (idle) - Redis 7.4.9 confirmed streams support - GPU sidecar metrics verified on all hosts Key fixes: - QW-1: Dockerfile path mismatch (Dockerfile.queue -> queue-service/Dockerfile) - QW-2: Nginx fallback only on ALL-GPU failure (not single GPU) - QW-3: Container names fixed to Docker service names - QW-4: Redis host default fixed (192.168.68.7 -> redis) - QW-5: Dependency version pinning - QW-7-10: Health checks, restart policy, Gunicorn, single-process collector Smart queue features: - Redis Streams + consumer groups - GPU-aware load balancing via sidecar metrics - Per-GPU circuit breakers with half-open recovery - Adaptive backpressure (0-30 normal, 30-40 warn, 40-50 503, >50 open) - Dead letter queue with retry endpoint - Job ID tracking and /status/<job_id> API
122 lines
3.2 KiB
Python
122 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Syslog Inference Queue Service — Circuit breaker + request queuing.
|
|
|
|
Port: 8091
|
|
Endpoints:
|
|
/health — liveness probe (Nginx upstream check)
|
|
/enqueue — POST inference request into queue (fallback from Nginx)
|
|
/status — GET queue depth + circuit breaker state + GPU health
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from flask import Flask, request, jsonify
|
|
|
|
# Flask application object; the @app.route decorators in this module
# register their handlers on it.
app = Flask(__name__)
|
|
|
|
# ---------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------

# Redis connection target. Both values may be overridden through the
# environment (REDIS_HOST / REDIS_PORT).
# NOTE(review): the default host is a hard-coded LAN address — confirm it
# matches the deployment this service actually runs in.
REDIS_HOST = os.environ.get("REDIS_HOST", "192.168.68.7")
REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))

# Name of the Redis list that holds queued inference requests.
QUEUE_KEY = "inference:requests"

# Queue-depth thresholds for the circuit state: at or above WARN the
# circuit is reported as "warn"; at or above OPEN new requests are refused.
CIRCUIT_WARN_THRESHOLD = 30
CIRCUIT_OPEN_THRESHOLD = 50

# GPU endpoints for draining, keyed by name, each given as "host:port".
GPUS = dict(
    amdpve="192.168.68.15:8080",
    llmgpu="192.168.68.8:8080",
    ocu_llm="192.168.68.110:8080",
)
|
|
|
|
|
|
def get_redis():
    """Return a Redis client that has been verified reachable, or ``None``.

    Constructing ``redis.Redis`` does not open a connection, so building
    the client alone can never detect an unreachable server. The client is
    therefore pinged before it is handed out, which lets callers rely on a
    ``None`` result to mean "Redis unavailable".

    Returns:
        A ``redis.Redis`` client (with ``decode_responses=True``) that
        answered a PING, or ``None`` if the ``redis`` package is not
        installed or the server could not be reached.
    """
    try:
        # Imported lazily so the module still loads without redis installed.
        import redis
    except ImportError:
        return None

    # Bound how long a dead or black-holed Redis host can stall a request.
    timeout_seconds = 2
    try:
        client = redis.Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            decode_responses=True,
            socket_connect_timeout=timeout_seconds,
            socket_timeout=timeout_seconds,
        )
        client.ping()
    except (redis.RedisError, OSError):
        return None
    return client
|
|
|
|
|
|
def get_queue_depth(r):
    """Return the number of requests currently held in the queue.

    Args:
        r: Redis client; only its ``llen`` method is used.

    Returns:
        The length of the ``QUEUE_KEY`` list, or ``0`` if the lookup
        raised. The failure is logged so that a ``0`` caused by a Redis
        error can be told apart from a genuinely empty queue.
    """
    try:
        return r.llen(QUEUE_KEY)
    except Exception:
        # Best-effort by design: callers expect a number, never an
        # exception. Record the failure instead of hiding it.
        app.logger.exception("Could not read depth of queue %r", QUEUE_KEY)
        return 0
|
|
|
|
|
|
def check_gpu_health(endpoint, timeout=3):
    """Probe a GPU server and report whether it answers successfully.

    Issues ``GET http://<endpoint>/v1/models`` with a
    ``queue-service/1.0`` User-Agent header.

    Args:
        endpoint: Server address in ``host:port`` form.
        timeout: Seconds to wait for the probe before giving up.

    Returns:
        ``True`` only when the response status is 200. Any other status
        and any error (refused connection, timeout, malformed address,
        HTTP error) yields ``False``; this function never raises.
    """
    try:
        probe = urllib.request.Request(
            f"http://{endpoint}/v1/models",
            headers={"User-Agent": "queue-service/1.0"},
        )
        # The context manager closes the response (and its socket) even
        # on the success path, so repeated probes do not leak connections.
        with urllib.request.urlopen(probe, timeout=timeout) as resp:
            return resp.status == 200
    except Exception:
        # Health probe boundary: every failure simply means "not healthy".
        return False
|
|
|
|
|
|
@app.route("/health")
def health():
    """Liveness probe for the Nginx upstream check.

    Always answers HTTP 200 with a small JSON body; it does not touch
    Redis or the GPU endpoints.
    """
    body = {"status": "ok", "service": "queue-service"}
    return jsonify(body), 200
|
|
|
|
|
|
@app.route("/enqueue", methods=["POST"])
def enqueue():
    """Store an inference request in the Redis queue.

    Fallback endpoint — Nginx calls this when all GPU upstreams are down.

    The raw request body, the request headers whose names start with
    ``X-``, and the enqueue timestamp are serialised to JSON and appended
    to the ``QUEUE_KEY`` list.

    Returns:
        * 202 with ``status``, ``position`` (queue length right after this
          push) and ``circuit`` (``"warn"`` or ``"closed"``) on success.
        * 503 with ``"Redis unavailable"`` when no Redis client could be
          obtained or the push itself failed.
        * 503 with ``"Circuit breaker OPEN"`` when the queue depth has
          reached ``CIRCUIT_OPEN_THRESHOLD``.
    """
    r = get_redis()
    if not r:
        return jsonify({"error": "Redis unavailable"}), 503

    depth = get_queue_depth(r)
    if depth >= CIRCUIT_OPEN_THRESHOLD:
        return jsonify({
            "error": "Circuit breaker OPEN",
            "queue_depth": depth,
            "threshold": CIRCUIT_OPEN_THRESHOLD,
        }), 503

    # Store the request in queue.
    entry = json.dumps({
        "payload": request.get_data(as_text=True),
        "headers": {k: v for k, v in request.headers if k.startswith("X-")},
        "queued_at": time.time(),
    })

    try:
        # RPUSH returns the list length after the push, so the position
        # comes from the same atomic operation rather than a second LLEN
        # that could race with other writers or fail independently.
        new_depth = r.rpush(QUEUE_KEY, entry)
    except Exception:
        # Request boundary: report a Redis failure as 503 instead of
        # letting it surface as an unhandled 500.
        app.logger.exception("Failed to push request onto %r", QUEUE_KEY)
        return jsonify({"error": "Redis unavailable"}), 503

    return jsonify({
        "status": "queued",
        "position": new_depth,
        "circuit": "warn" if new_depth >= CIRCUIT_WARN_THRESHOLD else "closed",
    }), 202
|
|
|
|
|
|
@app.route("/status")
def status():
    """Report queue depth, circuit breaker state and GPU health as JSON.

    ``queue_depth`` is -1 when no Redis client could be obtained. Each
    entry in ``GPUS`` is probed in turn and reported as "up" or "down".
    """
    r = get_redis()
    if r:
        depth = get_queue_depth(r)
    else:
        depth = -1

    # NOTE(review): a depth of -1 (Redis unavailable) is below both
    # thresholds, so the circuit is reported as "closed" in that case.
    if depth >= CIRCUIT_OPEN_THRESHOLD:
        circuit = "open"
    elif depth >= CIRCUIT_WARN_THRESHOLD:
        circuit = "warn"
    else:
        circuit = "closed"

    gpu_health = {
        name: ("up" if check_gpu_health(endpoint) else "down")
        for name, endpoint in GPUS.items()
    }

    thresholds = {
        "warn": CIRCUIT_WARN_THRESHOLD,
        "open": CIRCUIT_OPEN_THRESHOLD,
    }
    return jsonify({
        "queue_depth": depth,
        "circuit_breaker": circuit,
        "gpu_health": gpu_health,
        "thresholds": thresholds,
    })
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed directly as a script: start Flask's built-in server,
    # listening on every interface on port 8091.
    app.run(port=8091, host="0.0.0.0")
|