diff --git a/pyproject.toml b/pyproject.toml index 34dd23b..af278d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "llamacpp-ha" -version = "0.9.0" +version = "0.10.0" description = "Smart load balancer for llama.cpp servers" requires-python = ">=3.13" dependencies = [ diff --git a/src/llamacpp_ha/monitor.py b/src/llamacpp_ha/monitor.py index 1971b55..9b12170 100644 --- a/src/llamacpp_ha/monitor.py +++ b/src/llamacpp_ha/monitor.py @@ -115,7 +115,7 @@ _HTML = """

Queue

- +
Request IDModelSessionWait (s)Est. TokensPriority
Request IDModelSessionAgeEst. TokensPriority
Queue is empty
@@ -141,6 +141,13 @@ _HTML = """ return n >= 1000 ? (n/1000).toFixed(1) + 'k' : String(n); } + function fmtAge(s) { + if (s < 60) return s.toFixed(1) + 's'; + const m = Math.floor(s / 60); + const r = Math.floor(s % 60); + return m + 'm ' + String(r).padStart(2, '0') + 's'; + } + function render(data) { document.getElementById('uptime').textContent = data.uptime; document.getElementById('total-req').textContent = data.total_requests; @@ -185,7 +192,7 @@ _HTML = """ const tok = e.estimated_tokens != null ? esc(e.estimated_tokens) : '-'; const sid = e.session_id ? esc(e.session_id) : '-'; const pri = e.priority != null ? `${e.priority.toFixed(1)}` : '-'; - return `${esc(e.request_id.slice(0,12))}${esc(e.model_id||'-')}${sid}${esc(e.wait_seconds.toFixed(2))}${tok}${pri}`; + return `${esc(e.request_id.slice(0,12))}${esc(e.model_id||'-')}${sid}${fmtAge(e.wait_seconds)}${tok}${pri}`; }).join(''); } diff --git a/src/llamacpp_ha/scheduler.py b/src/llamacpp_ha/scheduler.py index a5ad97a..b318057 100644 --- a/src/llamacpp_ha/scheduler.py +++ b/src/llamacpp_ha/scheduler.py @@ -20,21 +20,28 @@ class Scheduler: -------------- When model_affinity_sched_bonus == 0 (default) the queue is pure FIFO. - When model_affinity_sched_bonus > 0 each entry gets an effective priority: + When model_affinity_sched_bonus > 0 each dispatch cycle runs two passes: - priority = warm_bonus + age_score + 1. Diversity pass — for each distinct model that has NO active or warm + backend, the highest-priority entry for that model is dispatched first. + This guarantees that N distinct models in the queue will occupy N + backends (up to available capacity) before any model gets a second slot. + Respects SlotTracker.can_accept() so max_models still applies. - warm_bonus = model_affinity_sched_bonus when the requested model is - currently warm (active or in warm-hold window) on a free - backend; 0 otherwise. This makes back-to-back requests for - an already-loaded model cheaper to serve. + 2. 
Priority pass — remaining entries are dispatched by effective priority: - age_score = wait_seconds × (bonus / queue_aging_equalization) - grows linearly with queue time. After queue_aging_equalization - seconds a cold request's age_score equals the warm bonus, - guaranteeing starvation is impossible. + priority = warm_bonus + age_score + + warm_bonus = model_affinity_sched_bonus when the requested model is + currently warm (active or in warm-hold window) on any + backend; 0 otherwise. + + age_score = wait_seconds × (bonus / queue_aging_equalization) + grows linearly with queue time. After + queue_aging_equalization seconds a cold request's + age_score equals the warm bonus, guaranteeing starvation + is impossible. - Entries are sorted by priority (descending) on every dispatch cycle. Python's sort is stable, so equal-priority entries stay in arrival order (FIFO within the same priority band). @@ -137,6 +144,7 @@ class Scheduler: if self._affinity_bonus > 0: entries = sorted(entries, key=self._priority, reverse=True) + entries = await self._dispatch_unrepresented(entries) for entry in entries: if entry.future is None or entry.future.done(): @@ -145,6 +153,31 @@ class Scheduler: if await self._try_dispatch(entry): await self._queue.remove(entry) + async def _dispatch_unrepresented( + self, entries: list[QueueEntry] + ) -> list[QueueEntry]: + """Diversity pass: dispatch one entry per model with no active/warm backend. + + Processes entries in caller-supplied priority order. Returns entries + that were not dispatched here, preserving their relative order. 
+ """ + dispatched_models: set[str] = set() + remaining: list[QueueEntry] = [] + for entry in entries: + model = entry.model_id + if not model or self._is_warm_for(entry) or model in dispatched_models: + remaining.append(entry) + continue + if entry.future is None or entry.future.done(): + await self._queue.remove(entry) + continue + if await self._try_dispatch(entry): + await self._queue.remove(entry) + dispatched_models.add(model) + else: + remaining.append(entry) + return remaining + async def _try_dispatch(self, entry: QueueEntry) -> bool: """Dispatch to any live backend that can accept this model.""" if entry.model_id: diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 82343c6..852357b 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -245,8 +245,31 @@ class TestScheduler(unittest.IsolatedAsyncioTestCase): self.assertFalse(e_m1.future.done()) self.assertTrue(e_m2.future.done()) - async def test_warm_model_gets_priority(self): - """With bonus>0, a warm-model request is dispatched before a cold one.""" + async def test_warm_bonus_applies_when_only_one_model_in_queue(self): + """With bonus>0 and a single model type in the queue, the warm model fills both free slots.""" + b1 = _make_state("http://b1", models=["m1"]) + scheduler, queue, _, slots, _ = self._make_scheduler( + [b1], model_affinity_sched_bonus=10 + ) + # capacity=3: one slot occupied, two free + slots.set_capacity("http://b1", 3) + async with asyncio.timeout(1.0): + await slots.acquire("http://b1", "m1") + + # Two m1 requests — no competing model, diversity pass is a no-op + e1 = _entry(model_id="m1") + e2 = _entry(model_id="m1") + await queue.enqueue(e1) + await queue.enqueue(e2) + + await scheduler._dispatch_all() + + # Both dispatched: warm model freely fills available slots when nothing else is waiting + self.assertTrue(e1.future.done()) + self.assertTrue(e2.future.done()) + + async def test_warm_model_does_not_block_unrepresented_model(self): + """With 
bonus>0, an unrepresented cold model gets the diversity slot before a warm one.""" b1 = _make_state("http://b1", models=["m1", "m2"]) scheduler, queue, _, slots, _ = self._make_scheduler( [b1], model_affinity_sched_bonus=10 @@ -254,7 +277,7 @@ class TestScheduler(unittest.IsolatedAsyncioTestCase): # capacity=2: one slot occupied by m1, one free — only one more can be dispatched slots.set_capacity("http://b1", 2) - # m1 is in-flight on b1 (warm); m2 is cold + # m1 is in-flight on b1 (warm); m2 is cold and unrepresented async with asyncio.timeout(1.0): await slots.acquire("http://b1", "m1") @@ -266,7 +289,103 @@ class TestScheduler(unittest.IsolatedAsyncioTestCase): await scheduler._dispatch_all() - # m1 has warm bonus → higher priority → dispatched first despite arriving later + # Diversity pass gives the free slot to m2 (unrepresented) regardless of warm bonus + self.assertTrue(e_m2.future.done()) + self.assertFalse(e_m1.future.done()) + + # ------------------------------------------------------------------ + # Model diversity (unrepresented-first pass) + # ------------------------------------------------------------------ + + async def test_diversity_dispatches_cold_model_over_warm(self): + """With one free slot, the diversity pass gives it to a cold model, not another warm request.""" + b1 = _make_state("http://b1", models=["m1", "m2"]) + scheduler, queue, _, slots, _ = self._make_scheduler( + [b1], model_affinity_sched_bonus=10 + ) + # 2 slots: 1 occupied by m1 (warm), 1 free + slots.set_capacity("http://b1", 2) + async with asyncio.timeout(1.0): + await slots.acquire("http://b1", "m1") + + # Queue m1 (gets warm_bonus=10) then m2 (cold, bonus=0) + e_m1 = _entry(model_id="m1") + e_m2 = _entry(model_id="m2") + await queue.enqueue(e_m1) + await queue.enqueue(e_m2) + + await scheduler._dispatch_all() + + # m2 dispatched by diversity pass despite lower priority; m1 has no free slot left + self.assertTrue(e_m2.future.done(), "cold m2 should win the diversity slot") + 
+ self.assertFalse(e_m1.future.done(), "warm m1 has no slot left after diversity pass") + + async def test_diversity_loads_each_model_on_separate_backend(self): + """With m1 occupying b1 and only b2 free, the cold model m2 is dispatched to b2 rather than a second m1 request.""" + b1 = _make_state("http://b1", models=["m1", "m2"]) + b2 = _make_state("http://b2", models=["m1", "m2"]) + scheduler, queue, _, slots, _ = self._make_scheduler( + [b1, b2], model_affinity_sched_bonus=10 + ) + slots.set_capacity("http://b1", 1) + slots.set_capacity("http://b2", 1) + + # m1 is warm on b1; m2 is cold; only b2 is free + async with asyncio.timeout(1.0): + await slots.acquire("http://b1", "m1") + + e_m1 = _entry(model_id="m1") + e_m2 = _entry(model_id="m2") + await queue.enqueue(e_m1) + await queue.enqueue(e_m2) + + await scheduler._dispatch_all() + + # Diversity pass dispatches m2 to b2; the priority pass then cannot dispatch m1 — no free slot remains + self.assertTrue(e_m2.future.done()) + self.assertEqual(e_m2.future.result().url, "http://b2") + self.assertFalse(e_m1.future.done(), "b1 is full, b2 taken by m2") + + async def test_diversity_respects_max_models(self): + """max_models=1 prevents the diversity pass from loading a second model simultaneously.""" + b1 = _make_state("http://b1", models=["m1", "m2"]) + scheduler, queue, _, slots, _ = self._make_scheduler( + [b1], model_affinity_sched_bonus=10 + ) + slots.set_capacity("http://b1", 2) + slots.set_max_models("http://b1", 1) + async with asyncio.timeout(1.0): + await slots.acquire("http://b1", "m1") + + e_m1 = _entry(model_id="m1") + e_m2 = _entry(model_id="m2") + await queue.enqueue(e_m1) + await queue.enqueue(e_m2) + + await scheduler._dispatch_all() + + # Diversity pass tries m2 but can_accept returns False (max_models=1 with m1 active) + # Priority pass dispatches m1 (same model, warm, 1 slot free) + self.assertTrue(e_m1.future.done()) + self.assertFalse(e_m2.future.done()) + + async def test_diversity_skipped_when_bonus_zero(self): + """Pure FIFO mode (bonus=0): 
no diversity pass, FIFO order holds.""" + b1 = _make_state("http://b1", models=["m1", "m2"]) + scheduler, queue, _, slots, _ = self._make_scheduler([b1]) + slots.set_capacity("http://b1", 2) + async with asyncio.timeout(1.0): + await slots.acquire("http://b1", "m1") + + # m1 arrives first (FIFO should serve it first), m2 is cold + e_m1 = _entry(model_id="m1") + e_m2 = _entry(model_id="m2") + await queue.enqueue(e_m1) + await queue.enqueue(e_m2) + + await scheduler._dispatch_all() + + # FIFO: m1 dispatched first (it arrived first), m2 has no slot left self.assertTrue(e_m1.future.done()) self.assertFalse(e_m2.future.done())