# Python source file: 400 lines, 17 KiB
import asyncio
import time
import unittest

from llamacpp_ha.slot_tracker import SlotTracker
|
|
|
|
|
|
class TestSlotTracker(unittest.IsolatedAsyncioTestCase):
    """Exercise SlotTracker slot accounting, model limits and sticky windows."""

    async def test_acquire_when_free(self):
        """Acquiring on a backend with spare capacity is reflected in usage()."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        await tracker.acquire("http://b")
        acquired, total = tracker.usage("http://b")
        self.assertEqual(acquired, 1)
        self.assertEqual(total, 2)

    async def test_has_free_slot(self):
        """has_free_slot turns False once the only slot has been acquired."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 1)
        self.assertTrue(tracker.has_free_slot("http://b"))
        await tracker.acquire("http://b")
        self.assertFalse(tracker.has_free_slot("http://b"))

    async def test_timeout_when_full(self):
        """A second acquire on a full backend runs into the enclosing timeout."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 1)
        await tracker.acquire("http://b")
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0.05):
                await tracker.acquire("http://b")

    async def test_release_unblocks_waiter(self):
        """A pending acquire completes after the held slot is released."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 1)
        await tracker.acquire("http://b")

        results = []

        async def waiter():
            # Generous timeout: the waiter should be woken by release(), and the
            # timeout only keeps a regression from hanging the test run.
            async with asyncio.timeout(2.0):
                await tracker.acquire("http://b")
            results.append(True)

        task = asyncio.create_task(waiter())
        # Give the waiter a chance to start waiting before the slot is released.
        await asyncio.sleep(0.05)
        await tracker.release("http://b")
        await task
        self.assertEqual(results, [True])

    async def test_release_below_zero(self):
        """Releasing with nothing acquired leaves the acquired count at zero."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 1)
        await tracker.release("http://b")
        acquired, _ = tracker.usage("http://b")
        self.assertEqual(acquired, 0)

    def test_set_capacity_increase(self):
        """A later set_capacity call replaces the previously reported total."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 1)
        tracker.set_capacity("http://b", 3)
        _, total = tracker.usage("http://b")
        self.assertEqual(total, 3)

    def test_unknown_url_defaults(self):
        """An unconfigured URL reports a free slot and a usage of (0, 1)."""
        tracker = SlotTracker()
        self.assertTrue(tracker.has_free_slot("http://unknown"))
        acquired, total = tracker.usage("http://unknown")
        self.assertEqual(acquired, 0)
        self.assertEqual(total, 1)

    async def test_acquire_zero_timeout_succeeds_then_fails(self):
        """With a zero timeout, acquire works while free and times out when full."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 1)
        async with asyncio.timeout(0):
            await tracker.acquire("http://b")
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0):
                await tracker.acquire("http://b")

    async def test_release_decrements(self):
        """Each release lowers the acquired count by one."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        await tracker.acquire("http://b")
        await tracker.acquire("http://b")
        acquired, _ = tracker.usage("http://b")
        self.assertEqual(acquired, 2)
        await tracker.release("http://b")
        acquired, _ = tracker.usage("http://b")
        self.assertEqual(acquired, 1)

    # ------------------------------------------------------------------
    # Model-aware tests
    # ------------------------------------------------------------------

    def test_can_accept_respects_max_models(self):
        """With max_models=1 and nothing active, the first model is accepted."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 4)
        tracker.set_max_models("http://b", 1)
        self.assertTrue(tracker.can_accept("http://b", "model-a"))

    async def test_max_models_blocks_second_model(self):
        """With max_models=1, an active model blocks a different model."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 4)
        tracker.set_max_models("http://b", 1)
        await tracker.acquire("http://b", "model-a")
        # model-a is still accepted (same model, slot available)
        self.assertTrue(tracker.can_accept("http://b", "model-a"))
        # model-b is blocked (max_models=1 already reached)
        self.assertFalse(tracker.can_accept("http://b", "model-b"))

    async def test_max_models_unblocks_after_release(self):
        """Releasing the active model lets a different model be accepted."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 4)
        tracker.set_max_models("http://b", 1)
        await tracker.acquire("http://b", "model-a")
        self.assertFalse(tracker.can_accept("http://b", "model-b"))
        await tracker.release("http://b", "model-a")
        self.assertTrue(tracker.can_accept("http://b", "model-b"))

    async def test_active_model_set(self):
        """active_model_set follows acquires and releases of distinct models."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 4)
        self.assertEqual(tracker.active_model_set("http://b"), frozenset())
        await tracker.acquire("http://b", "model-a")
        self.assertEqual(tracker.active_model_set("http://b"), frozenset({"model-a"}))
        await tracker.acquire("http://b", "model-b")
        self.assertEqual(
            tracker.active_model_set("http://b"), frozenset({"model-a", "model-b"})
        )
        await tracker.release("http://b", "model-a")
        self.assertEqual(tracker.active_model_set("http://b"), frozenset({"model-b"}))

    async def test_acquire_tracks_active_models(self):
        """A model stays in the active set until its last slot is released."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 4)
        await tracker.acquire("http://b", "model-a")
        await tracker.acquire("http://b", "model-a")
        acquired, _ = tracker.usage("http://b")
        self.assertEqual(acquired, 2)
        self.assertEqual(tracker.active_model_set("http://b"), frozenset({"model-a"}))
        await tracker.release("http://b", "model-a")
        self.assertEqual(tracker.active_model_set("http://b"), frozenset({"model-a"}))
        await tracker.release("http://b", "model-a")
        self.assertEqual(tracker.active_model_set("http://b"), frozenset())

    async def test_reset_acquired_clears_state(self):
        """reset_acquired zeroes the acquired count and empties the model set."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        await tracker.acquire("http://b", "model-a")
        await tracker.acquire("http://b", "model-a")
        acquired, _ = tracker.usage("http://b")
        self.assertEqual(acquired, 2)
        await tracker.reset_acquired("http://b")
        acquired, _ = tracker.usage("http://b")
        self.assertEqual(acquired, 0)
        self.assertEqual(tracker.active_model_set("http://b"), frozenset())

    async def test_reset_acquired_unblocks_waiters(self):
        """A waiter blocked on a full backend proceeds after reset_acquired."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 1)
        tracker.set_max_models("http://b", 1)
        await tracker.acquire("http://b", "model-a")

        unblocked = []

        async def waiter():
            # The timeout only guards against a hang if the waiter is never woken.
            async with asyncio.timeout(2.0):
                await tracker.acquire("http://b", "model-b")
            unblocked.append(True)

        task = asyncio.create_task(waiter())
        await asyncio.sleep(0.05)
        # The waiter must still be blocked before the reset happens.
        self.assertFalse(unblocked)
        await tracker.reset_acquired("http://b")
        await task
        self.assertEqual(unblocked, [True])

    async def test_max_models_none_allows_any(self):
        """max_models=None places no cap on the number of distinct models."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 4)
        tracker.set_max_models("http://b", None)
        await tracker.acquire("http://b", "model-a")
        self.assertTrue(tracker.can_accept("http://b", "model-b"))
        self.assertTrue(tracker.can_accept("http://b", "model-c"))

    # ------------------------------------------------------------------
    # Global model limit tests
    # ------------------------------------------------------------------

    def test_global_limit_allows_before_any_acquire(self):
        """A global limit does not block any backend while nothing is acquired."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b1", 4)
        tracker.set_capacity("http://b2", 4)
        tracker.set_global_model_limit("bigmodel", 1)
        self.assertTrue(tracker.can_accept("http://b1", "bigmodel"))
        self.assertTrue(tracker.can_accept("http://b2", "bigmodel"))

    async def test_global_limit_blocks_all_backends_after_acquire(self):
        """Reaching the global limit on one backend blocks the model everywhere."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b1", 4)
        tracker.set_capacity("http://b2", 4)
        tracker.set_global_model_limit("bigmodel", 1)
        await tracker.acquire("http://b1", "bigmodel")
        self.assertFalse(tracker.can_accept("http://b1", "bigmodel"))
        self.assertFalse(tracker.can_accept("http://b2", "bigmodel"))

    async def test_global_limit_releases_across_backends(self):
        """Releasing on one backend frees the global budget for another backend."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b1", 4)
        tracker.set_capacity("http://b2", 4)
        tracker.set_global_model_limit("bigmodel", 1)
        await tracker.acquire("http://b1", "bigmodel")
        self.assertFalse(tracker.can_accept("http://b2", "bigmodel"))
        await tracker.release("http://b1", "bigmodel")
        self.assertTrue(tracker.can_accept("http://b2", "bigmodel"))

    async def test_global_limit_acquire_fails_with_zero_timeout(self):
        """acquire on another backend times out once the global limit is reached."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b1", 4)
        tracker.set_capacity("http://b2", 4)
        tracker.set_global_model_limit("bigmodel", 1)
        await tracker.acquire("http://b1", "bigmodel")
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0):
                await tracker.acquire("http://b2", "bigmodel")

    async def test_global_limit_does_not_affect_other_models(self):
        """A global limit on one model leaves other models acceptable."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b1", 4)
        tracker.set_global_model_limit("bigmodel", 1)
        await tracker.acquire("http://b1", "bigmodel")
        self.assertTrue(tracker.can_accept("http://b1", "othermodel"))

    async def test_global_limit_allows_up_to_cap(self):
        """A global limit of 2 admits two acquisitions and rejects a third."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b1", 4)
        tracker.set_global_model_limit("bigmodel", 2)
        await tracker.acquire("http://b1", "bigmodel")
        self.assertTrue(tracker.can_accept("http://b1", "bigmodel"))
        await tracker.acquire("http://b1", "bigmodel")
        self.assertFalse(tracker.can_accept("http://b1", "bigmodel"))

    async def test_global_limit_usage(self):
        """global_model_usage returns (in use, limit), or None for unlimited models."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b1", 4)
        tracker.set_global_model_limit("bigmodel", 2)
        self.assertEqual(tracker.global_model_usage("bigmodel"), (0, 2))
        await tracker.acquire("http://b1", "bigmodel")
        self.assertEqual(tracker.global_model_usage("bigmodel"), (1, 2))
        self.assertIsNone(tracker.global_model_usage("othermodel"))

    # ------------------------------------------------------------------
    # Warm-hold / model_unload_delay tests
    # ------------------------------------------------------------------

    async def test_sticky_window_blocks_other_model(self):
        """After model-a fully drains, another model is rejected during the delay."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        tracker.set_model_unload_delay(60.0)
        await tracker.acquire("http://b", "model-a")
        await tracker.release("http://b", "model-a")
        # Window active: model-b should be rejected
        self.assertFalse(tracker.can_accept("http://b", "model-b"))

    async def test_sticky_window_allows_same_model(self):
        """The model that was just released is still accepted during the delay."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        tracker.set_model_unload_delay(60.0)
        await tracker.acquire("http://b", "model-a")
        await tracker.release("http://b", "model-a")
        self.assertTrue(tracker.can_accept("http://b", "model-a"))

    async def test_sticky_window_expires(self):
        """Another model becomes acceptable once the unload delay has elapsed."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        tracker.set_model_unload_delay(0.05)
        await tracker.acquire("http://b", "model-a")
        await tracker.release("http://b", "model-a")
        self.assertFalse(tracker.can_accept("http://b", "model-b"))
        # Sleep for longer than the 0.05 s delay configured above.
        await asyncio.sleep(0.1)
        self.assertTrue(tracker.can_accept("http://b", "model-b"))

    async def test_sticky_window_not_started_when_delay_zero(self):
        """A zero unload delay never blocks a different model after release."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        tracker.set_model_unload_delay(0.0)
        await tracker.acquire("http://b", "model-a")
        await tracker.release("http://b", "model-a")
        self.assertTrue(tracker.can_accept("http://b", "model-b"))

    async def test_sticky_window_not_started_while_slots_remain(self):
        """Window must not start until ALL slots for the model drain."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 4)
        tracker.set_model_unload_delay(60.0)
        await tracker.acquire("http://b", "model-a")
        await tracker.acquire("http://b", "model-a")
        await tracker.release("http://b", "model-a")  # one slot still held
        self.assertTrue(tracker.can_accept("http://b", "model-b"))

    async def test_reset_acquired_clears_sticky_state(self):
        """reset_acquired lifts an active sticky window for the backend."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        tracker.set_model_unload_delay(60.0)
        await tracker.acquire("http://b", "model-a")
        await tracker.release("http://b", "model-a")
        self.assertFalse(tracker.can_accept("http://b", "model-b"))
        await tracker.reset_acquired("http://b")
        self.assertTrue(tracker.can_accept("http://b", "model-b"))

    async def test_reset_acquired_updates_global_counts(self):
        """reset_acquired on one backend returns its share of the global budget."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b1", 4)
        tracker.set_capacity("http://b2", 4)
        tracker.set_global_model_limit("bigmodel", 1)
        await tracker.acquire("http://b1", "bigmodel")
        self.assertFalse(tracker.can_accept("http://b2", "bigmodel"))
        await tracker.reset_acquired("http://b1")
        self.assertTrue(tracker.can_accept("http://b2", "bigmodel"))
        self.assertEqual(tracker.global_model_usage("bigmodel"), (0, 1))

    # ------------------------------------------------------------------
    # waive_sticky_if_idle tests
    # ------------------------------------------------------------------

    async def test_waive_sticky_clears_window_for_unrepresented_model(self):
        """Waiving for a different model returns True and unblocks that model."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        tracker.set_model_unload_delay(60.0)
        await tracker.acquire("http://b", "model-a")
        await tracker.release("http://b", "model-a")
        self.assertFalse(tracker.can_accept("http://b", "model-b"))

        waived = tracker.waive_sticky_if_idle("http://b", "model-b")
        self.assertTrue(waived)
        self.assertTrue(tracker.can_accept("http://b", "model-b"))

    async def test_waive_sticky_noop_when_no_free_slot(self):
        """waive_sticky_if_idle returns False while the only slot is held."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 1)
        tracker.set_model_unload_delay(60.0)
        await tracker.acquire("http://b", "model-a")
        await tracker.release("http://b", "model-a")
        # Now exhaust the slot
        # NOTE(review): the original author's comment says the sticky window is
        # cleared on re-acquire, so this may hit the "no window" path rather than
        # the "no free slot" guard -- confirm against SlotTracker.
        await tracker.acquire("http://b", "model-a")
        waived = tracker.waive_sticky_if_idle("http://b", "model-b")
        self.assertFalse(waived)

    async def test_waive_sticky_noop_for_same_model(self):
        """waive_sticky_if_idle must not clear the window for the sticky model itself."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        tracker.set_model_unload_delay(60.0)
        await tracker.acquire("http://b", "model-a")
        await tracker.release("http://b", "model-a")
        waived = tracker.waive_sticky_if_idle("http://b", "model-a")
        self.assertFalse(waived)
        self.assertTrue(tracker.can_accept("http://b", "model-a"))

    def test_waive_sticky_noop_when_max_models_also_blocks(self):
        """Do not waive if max_models would still block the requesting model.

        The sticky window is only set when active_models empties, so this scenario
        can only be created by direct state manipulation (not through normal acquire/release).
        The guard is still present in waive_sticky_if_idle for defensive correctness.
        """
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        tracker.set_max_models("http://b", 1)
        state = tracker._ensure("http://b")
        # Manually inject: sticky=model-a, active_models={model-c: 1}
        state.sticky_model = "model-a"
        state.sticky_until = time.monotonic() + 60.0
        state.active_models["model-c"] = 1
        state.acquired = 1

        waived = tracker.waive_sticky_if_idle("http://b", "model-b")
        self.assertFalse(waived)

    def test_waive_sticky_noop_when_no_active_window(self):
        """Returns False and has no effect when there is no sticky window."""
        tracker = SlotTracker()
        tracker.set_capacity("http://b", 2)
        waived = tracker.waive_sticky_if_idle("http://b", "model-b")
        self.assertFalse(waived)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the tests via unittest when this file is executed as a script
    # rather than imported.
    unittest.main()
|