change scheduling strategy to hibrid priority queue with aging

This commit is contained in:
2026-05-19 20:57:38 +02:00
parent 0f5aabbf15
commit b379faebdb
10 changed files with 138 additions and 164 deletions

View File

@@ -6,7 +6,8 @@
"Bash(python -m pytest tests/test_slot_tracker.py -v)",
"Bash(python -m pytest tests/test_scheduler.py -v)",
"Bash(python -m pytest tests/test_monitor.py::TestMonitorEndpoints::test_monitor_data_structure -v)",
"Bash(python -m pytest -q)"
"Bash(python -m pytest -q)",
"Bash(python -m pytest tests/test_config.py -q)"
]
}
}

View File

@@ -7,7 +7,8 @@
"session_idle_ttl": 300,
"default_slot_capacity": 1,
"default_max_models": 1,
"max_queue_skip": 4,
"model_affinity_sched_bonus": 10,
"queue_aging_equalization": 30.0,
"model_unload_delay": 3.0,
"model_limits": {
"my-very-large-model": 1

View File

@@ -38,12 +38,15 @@ class ProxyConfig(BaseSettings):
# Fallback max_models applied to any backend that does not set its own.
# None = unlimited. Set to 1 globally when all backends are llama.cpp.
default_max_models: int | None = None
# How many queue positions a model-affinity request may skip ahead.
# 0 = pure FIFO (default). N > 0 enables reordering: the scheduler looks
# for a request matching an already-active model and can promote it up to N
# positions; each bypassed entry accumulates a skip count and is immune to
# further skipping once it reaches N.
max_queue_skip: int = 0
# Priority bonus added to requests whose model is currently warm (active or in
# the warm-hold window) on an available backend. 0 = pure FIFO.
# Works together with queue_aging_equalization: after that many seconds a
# waiting request's age score catches up to the bonus, restoring FIFO order.
model_affinity_sched_bonus: int = 0
# Seconds after which an aging request's accumulated score equals
# model_affinity_sched_bonus, ensuring starvation is impossible.
# Only meaningful when model_affinity_sched_bonus > 0.
queue_aging_equalization: float = 30.0
# Seconds to keep a backend sticky to its last model after all slots drain.
# Prevents unnecessary model swaps for follow-up requests (e.g. title/suggestion
# generation) that arrive shortly after the main response. 0 = disabled.

View File

@@ -1,12 +1,14 @@
from __future__ import annotations
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from fastapi import APIRouter
from fastapi.responses import HTMLResponse, JSONResponse
from .queue import RequestQueue
from .queue import QueueEntry, RequestQueue
from .registry import BackendRegistry
from .session_store import SessionStore
from .slot_tracker import SlotTracker
@@ -113,7 +115,7 @@ _HTML = """<!DOCTYPE html>
<h2>Queue</h2>
<table>
<thead><tr><th>Request ID</th><th>Model</th><th>Session</th><th>Wait (s)</th><th>Est. Tokens</th><th>Skips</th></tr></thead>
<thead><tr><th>Request ID</th><th>Model</th><th>Session</th><th>Wait (s)</th><th>Est. Tokens</th><th class="num">Priority</th></tr></thead>
<tbody id="queue-body"><tr><td colspan="6" class="empty">Queue is empty</td></tr></tbody>
</table>
@@ -182,7 +184,8 @@ _HTML = """<!DOCTYPE html>
qBody.innerHTML = data.queue.map(e => {
const tok = e.estimated_tokens != null ? esc(e.estimated_tokens) : '<span class="empty">-</span>';
const sid = e.session_id ? esc(e.session_id) : '<span class="empty">-</span>';
return `<tr><td>${esc(e.request_id.slice(0,12))}</td><td>${esc(e.model_id||'-')}</td><td>${sid}</td><td>${esc(e.wait_seconds.toFixed(2))}</td><td>${tok}</td><td>${esc(e.skip_count)}</td></tr>`;
const pri = e.priority != null ? `<span class="slots">${e.priority.toFixed(1)}</span>` : '<span class="empty">-</span>';
return `<tr><td>${esc(e.request_id.slice(0,12))}</td><td>${esc(e.model_id||'-')}</td><td>${sid}</td><td>${esc(e.wait_seconds.toFixed(2))}</td><td>${tok}</td><td class="num">${pri}</td></tr>`;
}).join('');
}
@@ -245,6 +248,7 @@ def build_router(
request_queue: RequestQueue,
session_store: SessionStore,
stats: ProxyStats,
priority_fn: Callable[[QueueEntry], float] | None = None,
) -> APIRouter:
router = APIRouter()
@@ -272,7 +276,7 @@ def build_router(
}
)
queue_snapshot = await request_queue.snapshot()
queue_snapshot = await request_queue.snapshot(score_fn=priority_fn)
session_count = await session_store.count()
sessions_by_model = await session_store.count_by_model()
live_count = sum(1 for s in states if s.live)

View File

@@ -354,7 +354,8 @@ def create_app(config: ProxyConfig) -> FastAPI:
slot_tracker=slot_tracker,
session_store=session_store,
policy=RoundRobinPolicy(),
max_queue_skip=config.max_queue_skip,
model_affinity_sched_bonus=config.model_affinity_sched_bonus,
queue_aging_equalization=config.queue_aging_equalization,
)
@asynccontextmanager
@@ -391,6 +392,7 @@ def create_app(config: ProxyConfig) -> FastAPI:
request_queue=request_queue,
session_store=session_store,
stats=stats,
priority_fn=scheduler._priority if config.model_affinity_sched_bonus > 0 else None,
)
)

View File

@@ -17,10 +17,6 @@ class QueueEntry:
future: asyncio.Future | None = field(default=None)
# Populated by scheduler at dispatch time
assigned_backend: str | None = None
# Incremented each time a later entry is dispatched ahead of this one via
# model-affinity reordering. When it reaches max_queue_skip the entry
# becomes immune to further skipping (starvation prevention).
skip_count: int = 0
@property
def wait_seconds(self) -> float:
@@ -59,19 +55,20 @@ class RequestQueue:
async with self._lock:
return len(self._entries)
async def snapshot(self) -> list[dict]:
async def snapshot(self, score_fn=None) -> list[dict]:
async with self._lock:
return [
{
rows = []
for e in self._entries:
d: dict = {
"request_id": e.request_id,
"model_id": e.model_id,
"session_id": (e.session_id or "")[:8] or None,
"wait_seconds": round(e.wait_seconds, 2),
"estimated_tokens": e.estimated_tokens,
"skip_count": e.skip_count,
"priority": round(score_fn(e), 1) if score_fn else None,
}
for e in self._entries
]
rows.append(d)
return rows
def notify(self) -> None:
"""Wake the scheduler without holding the lock."""

View File

@@ -18,36 +18,34 @@ class Scheduler:
Dispatch order
--------------
When max_queue_skip == 0 (default) the queue is pure FIFO.
When model_affinity_sched_bonus == 0 (default) the queue is pure FIFO.
When max_queue_skip > N the scheduler runs a two-phase dispatch on every
wakeup:
When model_affinity_sched_bonus > 0 each entry gets an effective priority:
Phase 1 — model-affinity promotion
The scheduler scans the queue looking for entries whose model is already
in-flight on a free backend. It promotes those entries ahead of earlier
entries that would require a model switch. For every entry that is
bypassed, its skip_count is incremented. Scanning stops as soon as an
entry with skip_count >= max_queue_skip is encountered (that entry is
frozen at the head and must be served next, preventing starvation).
priority = warm_bonus + age_score
Phase 2 — standard FIFO
Remaining entries are dispatched in arrival order to any backend that
can accept the model (slot free, max_models constraint satisfied).
warm_bonus = model_affinity_sched_bonus when the requested model is
currently warm (active or in warm-hold window) on a free
backend; 0 otherwise. This makes back-to-back requests for
an already-loaded model cheaper to serve.
age_score = wait_seconds × (bonus / queue_aging_equalization)
grows linearly with queue time. After queue_aging_equalization
seconds a cold request's age_score equals the warm bonus,
guaranteeing starvation is impossible.
Entries are sorted by priority (descending) on every dispatch cycle.
Python's sort is stable, so equal-priority entries stay in arrival order
(FIFO within the same priority band).
Session affinity
----------------
Session affinity is applied inside _resolve_backend as a hint: if the
preferred backend is in the candidate set it is chosen first. Affinity
is re-pinned to whichever backend ultimately serves the request; it is
never queued on a preferred backend when a free alternative exists.
preferred backend is in the candidate set it is chosen first.
Preemption prevention
---------------------
SlotTracker.can_accept() enforces the per-backend max_models limit. A
backend with max_models=1 that is serving model-A will reject model-B
requests until all model-A slots are released, preventing llama.cpp from
preempting the in-flight generation.
SlotTracker.can_accept() enforces the per-backend max_models limit.
"""
def __init__(
@@ -57,14 +55,21 @@ class Scheduler:
slot_tracker: SlotTracker,
session_store: SessionStore,
policy: RoutingPolicy,
max_queue_skip: int = 0,
model_affinity_sched_bonus: int = 0,
queue_aging_equalization: float = 30.0,
) -> None:
self._queue = queue
self._registry = registry
self._slots = slot_tracker
self._sessions = session_store
self._policy = policy
self._max_queue_skip = max_queue_skip
self._affinity_bonus = model_affinity_sched_bonus
# Points per second so that age_score == bonus after queue_aging_equalization s.
self._aging_rate: float = (
model_affinity_sched_bonus / queue_aging_equalization
if model_affinity_sched_bonus > 0 and queue_aging_equalization > 0
else 0.0
)
self._task: asyncio.Task | None = None
def notify_slot_released(self) -> None:
@@ -85,6 +90,25 @@ class Scheduler:
self._queue.wakeup_event.clear()
await self._dispatch_all()
# ------------------------------------------------------------------
# Priority
# ------------------------------------------------------------------
def _priority(self, entry: QueueEntry) -> float:
age_score = entry.wait_seconds * self._aging_rate
warm_bonus = self._affinity_bonus if self._is_warm_for(entry) else 0
return age_score + warm_bonus
def _is_warm_for(self, entry: QueueEntry) -> bool:
"""True if the requested model is warm on at least one free backend."""
if not entry.model_id:
return False
return any(
self._slots.is_warm(b.url, entry.model_id)
and self._slots.can_accept(b.url, entry.model_id)
for b in self._registry.get_backends_for_model(entry.model_id)
)
# ------------------------------------------------------------------
# Dispatch
# ------------------------------------------------------------------
@@ -92,7 +116,6 @@ class Scheduler:
async def _dispatch_all(self) -> None:
entries = await self._queue.pending()
# Prune stale futures first so they don't count in skip logic.
for entry in entries:
if entry.future is None or entry.future.done():
await self._queue.remove(entry)
@@ -101,70 +124,18 @@ class Scheduler:
if not entries:
return
dispatched: set[int] = set()
if self._affinity_bonus > 0:
entries = sorted(entries, key=self._priority, reverse=True)
if self._max_queue_skip > 0:
await self._affinity_pass(entries, dispatched)
# Standard FIFO pass for all remaining live entries.
for entry in entries:
if id(entry) in dispatched:
continue
if entry.future is None or entry.future.done():
await self._queue.remove(entry)
continue
if await self._try_dispatch(entry):
dispatched.add(id(entry))
await self._queue.remove(entry)
async def _affinity_pass(
self, entries: list[QueueEntry], dispatched: set[int]
) -> None:
"""Phase 1: promote entries whose model is already active on a free backend.
Stops scanning as soon as an entry with skip_count >= max_queue_skip is
encountered — that entry is frozen and must be served by the FIFO pass.
"""
for i, entry in enumerate(entries):
if id(entry) in dispatched:
continue
if entry.skip_count >= self._max_queue_skip:
break # frozen head-of-line; FIFO pass must handle it next
if await self._try_dispatch_affinity(entry):
dispatched.add(id(entry))
await self._queue.remove(entry)
# Bump skip_count for every earlier entry we bypassed.
for j in range(i):
if id(entries[j]) not in dispatched:
entries[j].skip_count += 1
async def _try_dispatch_affinity(self, entry: QueueEntry) -> bool:
"""Dispatch only to a backend that already has entry.model_id in-flight.
Skipped when a completely idle backend exists — piling onto a warm backend
while another is idle reduces effective capacity without KV-cache benefit
(different conversations don't share KV cache across sessions).
"""
if not entry.model_id:
return False
live_backends = self._registry.get_backends_for_model(entry.model_id)
if any(
self._slots.usage(b.url)[0] == 0 and self._slots.can_accept(b.url, entry.model_id)
for b in live_backends
):
return False
active_backends = [
b for b in live_backends
if entry.model_id in self._slots.active_model_set(b.url)
and self._slots.can_accept(b.url, entry.model_id)
]
if not active_backends:
return False
return await self._acquire_and_resolve(entry, active_backends)
async def _try_dispatch(self, entry: QueueEntry) -> bool:
"""Standard dispatch: any live backend that can accept this model."""
"""Dispatch to any live backend that can accept this model."""
if entry.model_id:
live_backends = self._registry.get_backends_for_model(entry.model_id)
else:

View File

@@ -86,6 +86,15 @@ class SlotTracker:
return True
return self._can_acquire(state, model_id)
def is_warm(self, url: str, model_id: str) -> bool:
"""True if model_id is active or in warm-hold window on this backend."""
state = self._slots.get(url)
if state is None:
return False
if model_id in state.active_models:
return True
return state.sticky_model == model_id and time.monotonic() < state.sticky_until
def active_model_set(self, url: str) -> frozenset[str]:
"""Models currently in-flight on this backend."""
state = self._slots.get(url)

View File

@@ -49,7 +49,8 @@ class TestProxyConfig(unittest.TestCase):
self.assertEqual(cfg.session_idle_ttl, 300.0)
self.assertEqual(cfg.backends, [])
self.assertIsNone(cfg.default_max_models)
self.assertEqual(cfg.max_queue_skip, 0)
self.assertEqual(cfg.model_affinity_sched_bonus, 0)
self.assertEqual(cfg.queue_aging_equalization, 30.0)
def test_from_file_minimal(self):
path = self._write_config(
@@ -71,7 +72,8 @@ class TestProxyConfig(unittest.TestCase):
"slot_wait_timeout": 60,
"session_idle_ttl": 600,
"default_max_models": 2,
"max_queue_skip": 5,
"model_affinity_sched_bonus": 10,
"queue_aging_equalization": 30.0,
"backends": [
{"url": "http://b1", "api_key": "secret", "model_ids": ["m1"], "max_models": 1},
{"url": "http://b2/"},
@@ -85,7 +87,8 @@ class TestProxyConfig(unittest.TestCase):
self.assertEqual(cfg.api_keys, ["key1", "key2"])
self.assertEqual(cfg.poll_interval, 10)
self.assertEqual(cfg.default_max_models, 2)
self.assertEqual(cfg.max_queue_skip, 5)
self.assertEqual(cfg.model_affinity_sched_bonus, 10)
self.assertEqual(cfg.queue_aging_equalization, 30.0)
self.assertEqual(cfg.backends[0].api_key, "secret")
self.assertEqual(cfg.backends[0].model_ids, ["m1"])
self.assertEqual(cfg.backends[0].max_models, 1)

View File

@@ -22,7 +22,12 @@ def _entry(**kwargs) -> QueueEntry:
class TestScheduler(unittest.IsolatedAsyncioTestCase):
def _make_scheduler(self, live_backends=None, max_queue_skip: int = 0):
def _make_scheduler(
self,
live_backends=None,
model_affinity_sched_bonus: int = 0,
queue_aging_equalization: float = 30.0,
):
cfg = ProxyConfig(backends=[BackendConfig(url=b.url) for b in (live_backends or [])])
registry = BackendRegistry(cfg)
for state in (live_backends or []):
@@ -41,7 +46,8 @@ class TestScheduler(unittest.IsolatedAsyncioTestCase):
slot_tracker=slot_tracker,
session_store=session_store,
policy=RoundRobinPolicy(),
max_queue_skip=max_queue_skip,
model_affinity_sched_bonus=model_affinity_sched_bonus,
queue_aging_equalization=queue_aging_equalization,
)
return scheduler, queue, registry, slot_tracker, session_store
@@ -212,108 +218,85 @@ class TestScheduler(unittest.IsolatedAsyncioTestCase):
self.assertTrue(entry.future.done(), "same model should still be dispatchable")
# ------------------------------------------------------------------
# N-skip reordering
# Priority scheduling
# ------------------------------------------------------------------
async def test_no_reorder_when_max_queue_skip_zero(self):
"""Default FIFO: model-B request is not promoted over model-A."""
async def test_pure_fifo_when_bonus_zero(self):
"""Default (bonus=0): blocked head-of-queue does not prevent later entries."""
b1 = _make_state("http://b1", models=["m1"])
b2 = _make_state("http://b2", models=["m2"])
scheduler, queue, _, slots, _ = self._make_scheduler([b1, b2], max_queue_skip=0)
scheduler, queue, _, slots, _ = self._make_scheduler([b1, b2])
slots.set_capacity("http://b1", 1)
slots.set_capacity("http://b2", 1)
# Fill b1; b2 is free with m2 active
async with asyncio.timeout(1.0):
await slots.acquire("http://b1", "m1")
async with asyncio.timeout(1.0):
await slots.acquire("http://b2", "m2")
# Queue: [m1-request (blocked), m2-request (could go to b2)]
e_m1 = _entry(model_id="m1")
e_m2 = _entry(model_id="m2")
await queue.enqueue(e_m1)
await queue.enqueue(e_m2)
# Release b2 slot so m2 can be served
await slots.release("http://b2", "m2")
await scheduler._dispatch_all()
# m2 can be dispatched even with max_queue_skip=0 because _dispatch_all
# scans all entries (not strict head-of-line per model)
self.assertFalse(e_m1.future.done())
self.assertTrue(e_m2.future.done())
# skip_count must NOT be bumped when max_queue_skip=0
self.assertEqual(e_m1.skip_count, 0)
async def test_affinity_promotes_matching_model(self):
"""With max_queue_skip>0, a matching model gets promoted."""
b1 = _make_state("http://b1", models=["m1"])
scheduler, queue, _, slots, _ = self._make_scheduler([b1], max_queue_skip=3)
async def test_warm_model_gets_priority(self):
"""With bonus>0, a warm-model request is dispatched before a cold one."""
b1 = _make_state("http://b1", models=["m1", "m2"])
scheduler, queue, _, slots, _ = self._make_scheduler(
[b1], model_affinity_sched_bonus=10
)
# capacity=2: one slot occupied by m1, one free — only one more can be dispatched
slots.set_capacity("http://b1", 2)
# b1 already has m1 in-flight
# m1 is in-flight on b1 (warm); m2 is cold
async with asyncio.timeout(1.0):
await slots.acquire("http://b1", "m1")
# Queue: [m2-entry (no affinity), m1-entry (affinity match)]
e_other = _entry(model_id="m2") # no backend serves m2
# Queue: cold m2 request arrives first, warm m1 request arrives second
e_m2 = _entry(model_id="m2")
e_m1 = _entry(model_id="m1")
await queue.enqueue(e_other)
await queue.enqueue(e_m2)
await queue.enqueue(e_m1)
await scheduler._dispatch_all()
# m1 entry is promoted (affinity pass), m2 stays (no backend)
# m1 has warm bonus → higher priority → dispatched first despite arriving later
self.assertTrue(e_m1.future.done())
self.assertFalse(e_other.future.done())
# e_other was bypassed once
self.assertEqual(e_other.skip_count, 1)
self.assertFalse(e_m2.future.done())
async def test_affinity_skips_when_idle_backend_available(self):
"""Warm-model routing is bypassed when a completely idle backend exists."""
b1 = _make_state("http://b1", models=["m1"])
b2 = _make_state("http://b2", models=["m1"])
scheduler, queue, _, slots, _ = self._make_scheduler([b1, b2], max_queue_skip=3)
async def test_aging_overtakes_warm_bonus(self):
"""After equalization time, an aged cold request outranks the warm bonus."""
b1 = _make_state("http://b1", models=["m1", "m2"])
# equalization=0.1s so aging is fast enough to test synchronously
scheduler, queue, _, slots, _ = self._make_scheduler(
[b1], model_affinity_sched_bonus=10, queue_aging_equalization=0.1
)
# capacity=2: one slot occupied by m1, one free — only one more can be dispatched
slots.set_capacity("http://b1", 2)
# b2 is warm (m1 active, 1/2 slots used); b1 is completely idle (0/2)
async with asyncio.timeout(1.0):
await slots.acquire("http://b2", "m1")
entry = _entry(model_id="m1")
await queue.enqueue(entry)
await scheduler._dispatch_all()
self.assertTrue(entry.future.done())
# Affinity pass must not force the request onto the warm backend (b2).
# Round-robin picks b1 first (b1 is index 0 in the registry), which is
# correct: b1 is idle and should absorb the load.
self.assertEqual(entry.future.result().url, "http://b1")
async def test_skip_count_caps_reordering(self):
"""Once skip_count reaches max_queue_skip the entry freezes at head."""
b1 = _make_state("http://b1", models=["m1"])
scheduler, queue, _, slots, _ = self._make_scheduler([b1], max_queue_skip=2)
slots.set_capacity("http://b1", 4)
# b1 has m1 active
async with asyncio.timeout(1.0):
await slots.acquire("http://b1", "m1")
e_other = _entry(model_id="m2")
e_other.skip_count = 2 # already at limit — must not be bypassed
# m2 request arrives and waits long enough to exceed the bonus
e_m2 = _entry(model_id="m2")
await queue.enqueue(e_m2)
await asyncio.sleep(0.15) # age_score > bonus after equalization
# m1 warm request arrives after m2 has already aged past equalization
e_m1 = _entry(model_id="m1")
await queue.enqueue(e_other)
await queue.enqueue(e_m1)
await scheduler._dispatch_all()
# Affinity pass stops at e_other (skip_count >= max_queue_skip),
# so e_m1 is NOT promoted via affinity. Both get a chance in FIFO pass.
# e_other (m2) has no backend → stays. e_m1 gets dispatched in FIFO pass.
self.assertTrue(e_m1.future.done())
# skip_count must NOT increase further (entry was frozen)
self.assertEqual(e_other.skip_count, 2)
# m2's age_score now exceeds the warm bonus → m2 dispatched first
self.assertTrue(e_m2.future.done())
self.assertFalse(e_m1.future.done())
if __name__ == "__main__":