#!/usr/bin/env python3
"""Syslog Inference Queue Service — Circuit breaker + request queuing.

Ports: 8091

Endpoints:
    /health  — liveness probe (Nginx upstream check)
    /enqueue — POST inference request into queue (fallback from Nginx)
    /status  — GET queue depth + circuit breaker state + GPU health
"""

import json
import logging
import os
import sys
import time
import urllib.request

from flask import Flask, request, jsonify

app = Flask(__name__)
logger = logging.getLogger(__name__)

# Configuration
REDIS_HOST = os.getenv("REDIS_HOST", "192.168.68.7")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
# Upper bound (seconds) on Redis connect/read so a dead Redis cannot hang
# a request handler indefinitely.
REDIS_TIMEOUT_SECONDS = 2
QUEUE_KEY = "inference:requests"
CIRCUIT_OPEN_THRESHOLD = 50
CIRCUIT_WARN_THRESHOLD = 30

# GPU endpoints for draining
GPUS = {
    "amdpve": "192.168.68.15:8080",
    "llmgpu": "192.168.68.8:8080",
    "ocu_llm": "192.168.68.110:8080",
}

# Lazily created client shared across requests, so each request does not
# build a fresh connection pool.
_redis_client = None


def get_redis():
    """Return a Redis client that answered a PING, or None.

    None is returned when the ``redis`` package cannot be imported or when
    the server does not respond. ``redis.Redis(...)`` connects lazily, so
    the explicit ``ping()`` is what actually detects an unreachable server.
    """
    global _redis_client
    try:
        import redis
    except ImportError:
        logger.error("redis package is not installed")
        return None

    if _redis_client is None:
        _redis_client = redis.Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            decode_responses=True,
            socket_connect_timeout=REDIS_TIMEOUT_SECONDS,
            socket_timeout=REDIS_TIMEOUT_SECONDS,
        )

    try:
        _redis_client.ping()
    except redis.RedisError as exc:
        logger.warning(
            "Redis unreachable at %s:%s: %s", REDIS_HOST, REDIS_PORT, exc
        )
        return None
    return _redis_client


def get_queue_depth(r):
    """Return the length of the request queue, or -1 if it cannot be read.

    A negative value means "unknown"; it is deliberately distinct from 0 so
    that a Redis failure is never mistaken for an empty queue.
    """
    try:
        return r.llen(QUEUE_KEY)
    except Exception:
        # Broad on purpose: ``redis`` is imported lazily, so its exception
        # types are not in scope here, and this is a best-effort probe.
        logger.exception("Failed to read queue depth")
        return -1


def check_gpu_health(endpoint):
    """Return True if ``http://<endpoint>/v1/models`` answers with HTTP 200.

    Any failure (connection error, timeout, non-2xx status raised by
    urllib as an exception) is reported as False.
    """
    req = urllib.request.Request(f"http://{endpoint}/v1/models")
    req.add_header("User-Agent", "queue-service/1.0")
    try:
        # The context manager closes the response so sockets are not leaked.
        with urllib.request.urlopen(req, timeout=3) as resp:
            return resp.status == 200
    except Exception:
        # Health probe boundary: every failure simply means "down".
        return False


def _circuit_state(depth):
    """Map a queue depth to a circuit breaker state name for /status."""
    if depth < 0:
        return "unknown"
    if depth >= CIRCUIT_OPEN_THRESHOLD:
        return "open"
    if depth >= CIRCUIT_WARN_THRESHOLD:
        return "warn"
    return "closed"


def _redis_unavailable():
    """Build the 503 response used whenever Redis cannot be used."""
    return jsonify({"error": "Redis unavailable"}), 503


@app.route("/health")
def health():
    """Nginx upstream health probe. Returns 200 if service is alive."""
    return jsonify({"status": "ok", "service": "queue-service"}), 200


@app.route("/enqueue", methods=["POST"])
def enqueue():
    """Fallback endpoint — Nginx calls this when all GPU upstreams are down.

    Responses:
        202 — request stored; body carries queue position and circuit state.
        503 — Redis is unavailable, or the queue depth has reached
              CIRCUIT_OPEN_THRESHOLD (circuit breaker open).
    """
    r = get_redis()
    if r is None:
        return _redis_unavailable()

    depth = get_queue_depth(r)
    if depth < 0:
        return _redis_unavailable()
    if depth >= CIRCUIT_OPEN_THRESHOLD:
        return jsonify({
            "error": "Circuit breaker OPEN",
            "queue_depth": depth,
            "threshold": CIRCUIT_OPEN_THRESHOLD,
        }), 503

    # Store the request in queue
    payload = request.get_data(as_text=True)
    headers = {k: v for k, v in request.headers if k.startswith("X-")}
    entry = json.dumps({
        "payload": payload,
        "headers": headers,
        "queued_at": time.time(),
    })
    try:
        # RPUSH returns the list length after the push, which is this
        # request's position; no second LLEN round trip is needed.
        new_depth = r.rpush(QUEUE_KEY, entry)
    except Exception:
        # Broad on purpose: redis exception types are not in scope here.
        logger.exception("Failed to enqueue request")
        return _redis_unavailable()

    return jsonify({
        "status": "queued",
        "position": new_depth,
        "circuit": "warn" if new_depth >= CIRCUIT_WARN_THRESHOLD else "closed",
    }), 202


@app.route("/status")
def status():
    """GET queue depth + circuit breaker state + GPU health.

    ``queue_depth`` is -1 and ``circuit_breaker`` is "unknown" when Redis
    cannot be reached.
    """
    r = get_redis()
    depth = get_queue_depth(r) if r is not None else -1
    gpu_health = {
        name: "up" if check_gpu_health(endpoint) else "down"
        for name, endpoint in GPUS.items()
    }
    return jsonify({
        "queue_depth": depth,
        "circuit_breaker": _circuit_state(depth),
        "gpu_health": gpu_health,
        "thresholds": {
            "warn": CIRCUIT_WARN_THRESHOLD,
            "open": CIRCUIT_OPEN_THRESHOLD,
        },
    })


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8091)