#!/usr/bin/env python3
"""Syslog Inference Queue Service — Circuit breaker + request queuing.

Ports: 8091
Endpoints:
  /health   — liveness probe (Nginx upstream check)
  /enqueue  — POST inference request into queue (fallback from Nginx)
  /status   — GET queue depth + circuit breaker state + GPU health
"""

import http.client
import json
import logging
import os
import sys
import time
import urllib.request

from flask import Flask, request, jsonify

app = Flask(__name__)
logger = logging.getLogger(__name__)

# Configuration
REDIS_HOST = os.getenv("REDIS_HOST", "192.168.68.7")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
# Upper bound (seconds) for connecting to and talking to Redis, so that an
# unreachable Redis cannot stall a request worker indefinitely.
REDIS_TIMEOUT_SECONDS = 2.0
QUEUE_KEY = "inference:requests"
CIRCUIT_OPEN_THRESHOLD = 50
CIRCUIT_WARN_THRESHOLD = 30
# Timeout (seconds) for a single GPU health probe.
GPU_PROBE_TIMEOUT_SECONDS = 3

# GPU endpoints for draining
GPUS = {
    "amdpve": "192.168.68.15:8080",
    "llmgpu": "192.168.68.8:8080",
    "ocu_llm": "192.168.68.110:8080",
}

# Lazily created Redis client shared by all requests (see get_redis()).
_redis_client = None


def get_redis():
    """Return the shared Redis client, or None if it cannot be created.

    The client is created on first use and then reused, instead of building
    a new client (and connection pool) for every request.

    NOTE: redis-py opens its connection lazily, on the first command, so a
    non-None return value does not prove that the server is reachable.
    Callers must still handle failures of the individual commands.
    """
    global _redis_client
    if _redis_client is not None:
        return _redis_client
    try:
        # Imported here so the service can still start (and answer /health)
        # when the redis package is missing.
        import redis

        _redis_client = redis.Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            decode_responses=True,
            socket_connect_timeout=REDIS_TIMEOUT_SECONDS,
            socket_timeout=REDIS_TIMEOUT_SECONDS,
        )
    except Exception:
        logger.exception("Could not create Redis client")
        return None
    return _redis_client


def _try_queue_depth(r):
    """Return the length of the request queue, or None if Redis fails."""
    try:
        return r.llen(QUEUE_KEY)
    except Exception:
        # Boundary with an external service: log the cause and let the
        # caller decide how to degrade.
        logger.warning("Could not read queue depth from Redis", exc_info=True)
        return None


def get_queue_depth(r):
    """Return the length of the request queue, or 0 if it cannot be read.

    Best-effort helper kept for callers that only need a number.  Because a
    failure is indistinguishable from an empty queue here, the endpoints in
    this module use _try_queue_depth() instead.
    """
    depth = _try_queue_depth(r)
    return 0 if depth is None else depth


def check_gpu_health(endpoint):
    """Return True if GET http://<endpoint>/v1/models answers with HTTP 200.

    Any network or HTTP failure (connection refused, timeout, error status,
    malformed response) is reported as False; the probe never raises for
    those.
    """
    req = urllib.request.Request(f"http://{endpoint}/v1/models")
    req.add_header("User-Agent", "queue-service/1.0")
    try:
        # The context manager closes the response so probes do not leak
        # sockets.
        with urllib.request.urlopen(
            req, timeout=GPU_PROBE_TIMEOUT_SECONDS
        ) as resp:
            return resp.status == 200
    except (OSError, http.client.HTTPException, ValueError):
        # URLError, HTTPError and socket timeouts are all OSError subclasses.
        return False


def _circuit_state(depth):
    """Map a queue depth to a circuit breaker state name.

    A negative depth means the depth could not be determined and yields
    "unknown" rather than pretending the circuit is closed.
    """
    if depth < 0:
        return "unknown"
    if depth >= CIRCUIT_OPEN_THRESHOLD:
        return "open"
    if depth >= CIRCUIT_WARN_THRESHOLD:
        return "warn"
    return "closed"


@app.route("/health")
def health():
    """Nginx upstream health probe. Returns 200 if service is alive."""
    return jsonify({"status": "ok", "service": "queue-service"}), 200


@app.route("/enqueue", methods=["POST"])
def enqueue():
    """Fallback endpoint — Nginx calls this when all GPU upstreams are down.

    Appends the request body and its ``X-`` headers to the Redis queue.

    Returns:
        202 with the queue position when the request was queued.
        503 when Redis is unavailable or the circuit breaker is open
        (queue depth at or above CIRCUIT_OPEN_THRESHOLD).
    """
    r = get_redis()
    if r is None:
        return jsonify({"error": "Redis unavailable"}), 503

    depth = _try_queue_depth(r)
    if depth is None:
        # Fail closed: without a readable depth the breaker cannot be
        # evaluated, so do not accept the request.
        return jsonify({"error": "Redis unavailable"}), 503
    if depth >= CIRCUIT_OPEN_THRESHOLD:
        return jsonify({
            "error": "Circuit breaker OPEN",
            "queue_depth": depth,
            "threshold": CIRCUIT_OPEN_THRESHOLD,
        }), 503

    # Store the request in queue
    payload = request.get_data(as_text=True)
    headers = {k: v for k, v in request.headers if k.startswith("X-")}
    entry = json.dumps({
        "payload": payload,
        "headers": headers,
        "queued_at": time.time(),
    })
    try:
        # RPUSH returns the list length after the push, which is exactly this
        # request's position; no second round trip is needed.
        new_depth = r.rpush(QUEUE_KEY, entry)
    except Exception:
        logger.exception("Could not push request onto the Redis queue")
        return jsonify({"error": "Redis unavailable"}), 503

    return jsonify({
        "status": "queued",
        "position": new_depth,
        "circuit": "warn" if new_depth >= CIRCUIT_WARN_THRESHOLD else "closed",
    }), 202


@app.route("/status")
def status():
    """GET queue depth + circuit breaker state + GPU health.

    ``queue_depth`` is -1 and ``circuit_breaker`` is "unknown" when the
    depth cannot be read from Redis.
    """
    r = get_redis()
    depth = _try_queue_depth(r) if r is not None else None
    if depth is None:
        depth = -1

    gpu_health = {
        name: "up" if check_gpu_health(endpoint) else "down"
        for name, endpoint in GPUS.items()
    }

    return jsonify({
        "queue_depth": depth,
        "circuit_breaker": _circuit_state(depth),
        "gpu_health": gpu_health,
        "thresholds": {
            "warn": CIRCUIT_WARN_THRESHOLD,
            "open": CIRCUIT_OPEN_THRESHOLD,
        },
    })


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8091)