477 lines
18 KiB
Python
477 lines
18 KiB
Python
import asyncio
|
|
import unittest
|
|
|
|
from llamacpp_ha.config import BackendConfig, ProxyConfig
|
|
from llamacpp_ha.policies import RoundRobinPolicy
|
|
from llamacpp_ha.queue import QueueEntry, RequestQueue
|
|
from llamacpp_ha.registry import BackendRegistry, BackendState
|
|
from llamacpp_ha.scheduler import Scheduler
|
|
from llamacpp_ha.session_store import SessionStore
|
|
from llamacpp_ha.slot_tracker import SlotTracker
|
|
|
|
|
|
def _make_state(url: str, models: list[str] | None = None) -> BackendState:
    """Build a live ``BackendState`` for the backend at *url*.

    The state advertises *models*; when *models* is ``None`` or empty it
    falls back to the single model ``"m1"``.
    """
    advertised = models if models else ["m1"]
    return BackendState(config=BackendConfig(url=url), live=True, models=advertised)
|
|
|
|
|
|
def _entry(**kwargs) -> QueueEntry:
    """Create a ``QueueEntry`` from *kwargs* with a fresh pending future attached.

    The future is created on the currently running event loop, so this must
    be called from within a coroutine (or other running-loop context).
    """
    queued = QueueEntry(**kwargs)
    loop = asyncio.get_running_loop()
    queued.future = loop.create_future()
    return queued
|
|
|
|
|
|
class TestScheduler(unittest.IsolatedAsyncioTestCase):
    """Dispatch tests for ``Scheduler``, driven mostly through ``_dispatch_all``."""

    # Upper bound (seconds) for a set-up slot acquisition; a hang in
    # ``SlotTracker.acquire`` then surfaces as a TimeoutError instead of a
    # stuck test run.
    _ACQUIRE_TIMEOUT = 1.0

    def _make_scheduler(
        self,
        live_backends=None,
        model_affinity_sched_bonus: int = 0,
        queue_aging_equalization: float = 30.0,
    ):
        """Wire up a ``Scheduler`` with fresh collaborators.

        Each state in *live_backends* is injected directly into the registry
        and given a slot capacity of 2 (tests override this as needed).

        Returns:
            ``(scheduler, queue, registry, slot_tracker, session_store)``.
        """
        backends = live_backends or []
        cfg = ProxyConfig(backends=[BackendConfig(url=b.url) for b in backends])
        registry = BackendRegistry(cfg)
        # Replace the registry's states with the prepared ones, then rebuild
        # its index so the injected states are the ones it serves.
        for state in backends:
            registry._states[state.url] = state
        registry._rebuild_index()

        slot_tracker = SlotTracker()
        for state in backends:
            slot_tracker.set_capacity(state.url, 2)

        session_store = SessionStore()
        queue = RequestQueue()
        scheduler = Scheduler(
            queue=queue,
            registry=registry,
            slot_tracker=slot_tracker,
            session_store=session_store,
            policy=RoundRobinPolicy(),
            model_affinity_sched_bonus=model_affinity_sched_bonus,
            queue_aging_equalization=queue_aging_equalization,
        )
        return scheduler, queue, registry, slot_tracker, session_store

    async def _occupy_slot(self, slots, url, *model):
        """Acquire one slot on *url*, bounded by ``_ACQUIRE_TIMEOUT`` seconds.

        Any extra positional argument (the model id, when a test supplies
        one) is forwarded unchanged to ``slots.acquire``.
        """
        async with asyncio.timeout(self._ACQUIRE_TIMEOUT):
            await slots.acquire(url, *model)

    async def test_dispatches_entry_to_live_backend(self):
        """A queued entry is resolved with the only live backend."""
        b1 = _make_state("http://b1")
        scheduler, queue, *_ = self._make_scheduler([b1])

        entry = _entry(model_id="m1")
        await queue.enqueue(entry)
        await scheduler._dispatch_all()

        self.assertTrue(entry.future.done())
        self.assertEqual(entry.future.result().url, "http://b1")

    async def test_skips_full_backends(self):
        """An entry stays pending when the only backend has no free slot."""
        b1 = _make_state("http://b1")
        scheduler, queue, _, slots, _ = self._make_scheduler([b1])
        slots.set_capacity("http://b1", 1)
        await self._occupy_slot(slots, "http://b1")

        entry = _entry(model_id="m1")
        await queue.enqueue(entry)
        await scheduler._dispatch_all()

        self.assertFalse(entry.future.done())

    async def test_session_affinity_preferred(self):
        """A session's preferred backend is chosen when it is available."""
        b1 = _make_state("http://b1")
        b2 = _make_state("http://b2")
        scheduler, queue, _, _, sessions = self._make_scheduler([b1, b2])

        await sessions.get_or_create("sess1")
        await sessions.update("sess1", preferred_backend="http://b2")

        entry = _entry(model_id="m1", session_id="sess1")
        await queue.enqueue(entry)
        await scheduler._dispatch_all()

        self.assertTrue(entry.future.done())
        self.assertEqual(entry.future.result().url, "http://b2")

    async def test_session_affinity_fallback_when_full(self):
        """When the preferred backend is full, the other backend is used."""
        b1 = _make_state("http://b1")
        b2 = _make_state("http://b2")
        scheduler, queue, _, slots, sessions = self._make_scheduler([b1, b2])
        slots.set_capacity("http://b2", 1)
        await self._occupy_slot(slots, "http://b2")

        await sessions.get_or_create("sess1")
        await sessions.update("sess1", preferred_backend="http://b2")

        entry = _entry(model_id="m1", session_id="sess1")
        await queue.enqueue(entry)
        await scheduler._dispatch_all()

        self.assertTrue(entry.future.done())
        self.assertEqual(entry.future.result().url, "http://b1")

    async def test_no_live_backends_entry_stays(self):
        """With no backends at all, the entry remains queued and pending."""
        scheduler, queue, *_ = self._make_scheduler([])

        entry = _entry(model_id="m1")
        await queue.enqueue(entry)
        await scheduler._dispatch_all()

        self.assertFalse(entry.future.done())
        self.assertEqual(await queue.depth(), 1)

    def test_notify_slot_released_wakes_queue(self):
        """``notify_slot_released`` sets the queue's wakeup event."""
        b1 = _make_state("http://b1")
        scheduler, queue, *_ = self._make_scheduler([b1])
        scheduler.notify_slot_released()
        self.assertTrue(queue.wakeup_event.is_set())

    async def test_cancelled_future_cleaned_up(self):
        """An entry whose future was cancelled is gone from the queue after dispatch."""
        b1 = _make_state("http://b1")
        scheduler, queue, *_ = self._make_scheduler([b1])

        entry = _entry(model_id="m1")
        await queue.enqueue(entry)
        entry.future.cancel()

        await scheduler._dispatch_all()

        self.assertEqual(await queue.depth(), 0)

    async def test_round_robin_across_backends(self):
        """Four sequential dispatches are split evenly across two backends."""
        b1 = _make_state("http://b1")
        b2 = _make_state("http://b2")
        scheduler, queue, *_ = self._make_scheduler([b1, b2])

        results = []
        for _ in range(4):
            e = _entry(model_id="m1")
            await queue.enqueue(e)
            await scheduler._dispatch_all()
            results.append(e.future.result().url)

        self.assertEqual(results.count("http://b1"), 2)
        self.assertEqual(results.count("http://b2"), 2)

    async def test_slot_released_then_dispatch(self):
        """A blocked entry is dispatched once the occupied slot is released."""
        b1 = _make_state("http://b1")
        scheduler, queue, _, slots, _ = self._make_scheduler([b1])
        slots.set_capacity("http://b1", 1)
        await self._occupy_slot(slots, "http://b1")

        entry = _entry(model_id="m1")
        await queue.enqueue(entry)
        await scheduler._dispatch_all()
        self.assertFalse(entry.future.done())

        await slots.release("http://b1")
        scheduler.notify_slot_released()
        await scheduler._dispatch_all()
        self.assertTrue(entry.future.done())

    # ------------------------------------------------------------------
    # max_models / preemption prevention
    # ------------------------------------------------------------------

    async def test_max_models_blocks_second_model_on_same_backend(self):
        """With max_models=1 and m1 in flight, a request for m2 stays queued."""
        b1 = _make_state("http://b1")
        # Only the queue and slot tracker from the helper are reused; the
        # helper-built registry knows b1 with just the default model "m1".
        _, queue, _, slots, _ = self._make_scheduler([b1])
        slots.set_capacity("http://b1", 4)
        slots.set_max_models("http://b1", 1)

        # Occupy a slot with m1
        await self._occupy_slot(slots, "http://b1", "m1")

        # Build a registry in which b1 advertises both m1 and m2, so the m2
        # request below has a candidate backend and can only be held back
        # by the max_models limit.
        backend_cfg = BackendConfig(url="http://b1", model_ids=["m1", "m2"])
        registry = BackendRegistry(ProxyConfig(backends=[backend_cfg]))
        registry._states["http://b1"] = BackendState(
            config=backend_cfg,
            live=True,
            models=["m1", "m2"],
        )
        registry._rebuild_index()

        sched2 = Scheduler(
            queue=queue,
            registry=registry,
            slot_tracker=slots,
            session_store=SessionStore(),
            policy=RoundRobinPolicy(),
        )

        # Request for m2 should stay queued
        entry = _entry(model_id="m2")
        await queue.enqueue(entry)
        await sched2._dispatch_all()
        self.assertFalse(entry.future.done(), "m2 should be blocked by max_models=1")

    async def test_max_models_allows_same_model(self):
        """max_models=1 does not block another request for the active model."""
        b1 = _make_state("http://b1")
        scheduler, queue, _, slots, _ = self._make_scheduler([b1])
        slots.set_capacity("http://b1", 4)
        slots.set_max_models("http://b1", 1)
        await self._occupy_slot(slots, "http://b1", "m1")

        entry = _entry(model_id="m1")
        await queue.enqueue(entry)
        await scheduler._dispatch_all()
        self.assertTrue(entry.future.done(), "same model should still be dispatchable")

    # ------------------------------------------------------------------
    # Priority scheduling
    # ------------------------------------------------------------------

    async def test_pure_fifo_when_bonus_zero(self):
        """Default (bonus=0): blocked head-of-queue does not prevent later entries."""
        b1 = _make_state("http://b1", models=["m1"])
        b2 = _make_state("http://b2", models=["m2"])
        scheduler, queue, _, slots, _ = self._make_scheduler([b1, b2])
        slots.set_capacity("http://b1", 1)
        slots.set_capacity("http://b2", 1)

        await self._occupy_slot(slots, "http://b1", "m1")
        await self._occupy_slot(slots, "http://b2", "m2")

        e_m1 = _entry(model_id="m1")
        e_m2 = _entry(model_id="m2")
        await queue.enqueue(e_m1)
        await queue.enqueue(e_m2)

        # Free only b2, so the head entry (m1) is still blocked on b1.
        await slots.release("http://b2", "m2")
        await scheduler._dispatch_all()

        self.assertFalse(e_m1.future.done())
        self.assertTrue(e_m2.future.done())

    async def test_warm_bonus_applies_when_only_one_model_in_queue(self):
        """With bonus>0 and a single model type in the queue, the warm model fills both free slots."""
        b1 = _make_state("http://b1", models=["m1"])
        scheduler, queue, _, slots, _ = self._make_scheduler(
            [b1], model_affinity_sched_bonus=10
        )
        # capacity=3: one slot occupied, two free
        slots.set_capacity("http://b1", 3)
        await self._occupy_slot(slots, "http://b1", "m1")

        # Two m1 requests — no competing model, diversity pass is a no-op
        e1 = _entry(model_id="m1")
        e2 = _entry(model_id="m1")
        await queue.enqueue(e1)
        await queue.enqueue(e2)

        await scheduler._dispatch_all()

        # Both dispatched: warm model freely fills available slots when nothing else is waiting
        self.assertTrue(e1.future.done())
        self.assertTrue(e2.future.done())

    async def test_warm_model_does_not_block_unrepresented_model(self):
        """With bonus>0, an unrepresented cold model gets the diversity slot before a warm one."""
        b1 = _make_state("http://b1", models=["m1", "m2"])
        scheduler, queue, _, slots, _ = self._make_scheduler(
            [b1], model_affinity_sched_bonus=10
        )
        # capacity=2: one slot occupied by m1, one free — only one more can be dispatched
        slots.set_capacity("http://b1", 2)

        # m1 is in-flight on b1 (warm); m2 is cold and unrepresented
        await self._occupy_slot(slots, "http://b1", "m1")

        # Queue: cold m2 request arrives first, warm m1 request arrives second
        e_m2 = _entry(model_id="m2")
        e_m1 = _entry(model_id="m1")
        await queue.enqueue(e_m2)
        await queue.enqueue(e_m1)

        await scheduler._dispatch_all()

        # Diversity pass gives the free slot to m2 (unrepresented) regardless of warm bonus
        self.assertTrue(e_m2.future.done())
        self.assertFalse(e_m1.future.done())

    # ------------------------------------------------------------------
    # Model diversity (unrepresented-first pass)
    # ------------------------------------------------------------------

    async def test_diversity_dispatches_cold_model_over_warm(self):
        """With one free slot, the diversity pass gives it to a cold model, not another warm request."""
        b1 = _make_state("http://b1", models=["m1", "m2"])
        scheduler, queue, _, slots, _ = self._make_scheduler(
            [b1], model_affinity_sched_bonus=10
        )
        # 2 slots: 1 occupied by m1 (warm), 1 free
        slots.set_capacity("http://b1", 2)
        await self._occupy_slot(slots, "http://b1", "m1")

        # Queue m1 (gets warm_bonus=10) then m2 (cold, bonus=0)
        e_m1 = _entry(model_id="m1")
        e_m2 = _entry(model_id="m2")
        await queue.enqueue(e_m1)
        await queue.enqueue(e_m2)

        await scheduler._dispatch_all()

        # m2 dispatched by diversity pass despite lower priority; m1 has no free slot left
        self.assertTrue(e_m2.future.done(), "cold m2 should win the diversity slot")
        self.assertFalse(e_m1.future.done(), "warm m1 has no slot left after diversity pass")

    async def test_diversity_loads_each_model_on_separate_backend(self):
        """With b1 busy serving m1 and only b2 free, cold m2 lands on b2 and m1 keeps waiting."""
        b1 = _make_state("http://b1", models=["m1", "m2"])
        b2 = _make_state("http://b2", models=["m1", "m2"])
        scheduler, queue, _, slots, _ = self._make_scheduler(
            [b1, b2], model_affinity_sched_bonus=10
        )
        slots.set_capacity("http://b1", 1)
        slots.set_capacity("http://b2", 1)

        # m1 is warm on b1; m2 is cold; only b2 is free
        await self._occupy_slot(slots, "http://b1", "m1")

        e_m1 = _entry(model_id="m1")
        e_m2 = _entry(model_id="m2")
        await queue.enqueue(e_m1)
        await queue.enqueue(e_m2)

        await scheduler._dispatch_all()

        # Diversity pass dispatches m2 to b2; m1 cannot follow because no free slot remains
        self.assertTrue(e_m2.future.done())
        self.assertEqual(e_m2.future.result().url, "http://b2")
        self.assertFalse(e_m1.future.done(), "b1 is full, b2 taken by m2")

    async def test_diversity_respects_max_models(self):
        """max_models=1 prevents the diversity pass from loading a second model simultaneously."""
        b1 = _make_state("http://b1", models=["m1", "m2"])
        scheduler, queue, _, slots, _ = self._make_scheduler(
            [b1], model_affinity_sched_bonus=10
        )
        slots.set_capacity("http://b1", 2)
        slots.set_max_models("http://b1", 1)
        await self._occupy_slot(slots, "http://b1", "m1")

        e_m1 = _entry(model_id="m1")
        e_m2 = _entry(model_id="m2")
        await queue.enqueue(e_m1)
        await queue.enqueue(e_m2)

        await scheduler._dispatch_all()

        # Diversity pass tries m2 but can_accept returns False (max_models=1 with m1 active)
        # Priority pass dispatches m1 (same model, warm, 1 slot free)
        self.assertTrue(e_m1.future.done())
        self.assertFalse(e_m2.future.done())

    async def test_diversity_skipped_when_bonus_zero(self):
        """Pure FIFO mode (bonus=0): no diversity pass, FIFO order holds."""
        b1 = _make_state("http://b1", models=["m1", "m2"])
        scheduler, queue, _, slots, _ = self._make_scheduler([b1])
        slots.set_capacity("http://b1", 2)
        await self._occupy_slot(slots, "http://b1", "m1")

        # m1 arrives first (FIFO should serve it first), m2 is cold
        e_m1 = _entry(model_id="m1")
        e_m2 = _entry(model_id="m2")
        await queue.enqueue(e_m1)
        await queue.enqueue(e_m2)

        await scheduler._dispatch_all()

        # FIFO: m1 dispatched first (it arrived first), m2 has no slot left
        self.assertTrue(e_m1.future.done())
        self.assertFalse(e_m2.future.done())

    async def test_loop_retries_after_sticky_window_expires(self):
        """Scheduler loop dispatches a blocked entry once the sticky window expires."""
        b1 = _make_state("http://b1", models=["m1", "m2"])
        scheduler, queue, _, slots, _ = self._make_scheduler([b1])
        slots.set_capacity("http://b1", 2)
        slots.set_model_unload_delay(0.05)

        # Acquire and release m1 — starts a 0.05 s sticky window that blocks m2.
        await self._occupy_slot(slots, "http://b1", "m1")
        await slots.release("http://b1", "m1")

        entry = _entry(model_id="m2")
        await queue.enqueue(entry)
        await scheduler._dispatch_all()
        self.assertFalse(entry.future.done(), "m2 should be blocked by sticky window")

        # Run the loop with a short interval so it retries before the test times out.
        scheduler._wakeup_interval = 0.1
        scheduler.start()
        try:
            async with asyncio.timeout(1.0):
                result = await entry.future
        finally:
            # Always stop the background loop, even if the wait timed out.
            await scheduler.stop()

        self.assertEqual(result.url, "http://b1")

    async def test_diversity_waives_sticky_for_unrepresented_model(self):
        """Diversity pass clears a sticky window blocking an unrepresented model."""
        b1 = _make_state("http://b1", models=["m1", "m2"])
        scheduler, queue, _, slots, _ = self._make_scheduler(
            [b1], model_affinity_sched_bonus=10
        )
        slots.set_capacity("http://b1", 2)
        slots.set_model_unload_delay(60.0)

        # m1 was active and released — sticky window now blocks m2
        await self._occupy_slot(slots, "http://b1", "m1")
        await slots.release("http://b1", "m1")
        self.assertFalse(slots.can_accept("http://b1", "m2"))

        e_m1 = _entry(model_id="m1")
        e_m2 = _entry(model_id="m2")
        await queue.enqueue(e_m1)
        await queue.enqueue(e_m2)

        await scheduler._dispatch_all()

        # Diversity pass waives sticky for m2 and dispatches it
        self.assertTrue(e_m2.future.done())
        self.assertEqual(e_m2.future.result().url, "http://b1")

    async def test_aging_overtakes_warm_bonus(self):
        """After equalization time, an aged cold request outranks the warm bonus."""
        # NOTE(review): the diversity tests in this class expect a cold,
        # unrepresented m2 to win the single free slot even without any
        # aging, so these assertions may hold regardless of the age score —
        # confirm this scenario really isolates the aging behaviour.
        b1 = _make_state("http://b1", models=["m1", "m2"])
        # equalization=0.1s so the short sleep below is enough to age m2
        scheduler, queue, _, slots, _ = self._make_scheduler(
            [b1], model_affinity_sched_bonus=10, queue_aging_equalization=0.1
        )
        # capacity=2: one slot occupied by m1, one free — only one more can be dispatched
        slots.set_capacity("http://b1", 2)

        await self._occupy_slot(slots, "http://b1", "m1")

        # m2 request arrives and waits long enough to exceed the bonus
        e_m2 = _entry(model_id="m2")
        await queue.enqueue(e_m2)
        await asyncio.sleep(0.15)  # age_score > bonus after equalization

        # m1 warm request arrives after m2 has already aged past equalization
        e_m1 = _entry(model_id="m1")
        await queue.enqueue(e_m1)

        await scheduler._dispatch_all()

        # m2's age_score now exceeds the warm bonus → m2 dispatched first
        self.assertTrue(e_m2.future.done())
        self.assertFalse(e_m1.future.done())
|
|
|
|
|
|
if __name__ == "__main__":
    # Allow running this test module directly as a script; unittest then
    # discovers and runs the test cases defined in this file.
    unittest.main()
|