first commit

This commit is contained in:
2026-04-13 23:42:02 +02:00
parent ba7e8ba122
commit 5763f35220
22 changed files with 1194 additions and 0 deletions

8
tests/conftest.py Normal file
View File

@@ -0,0 +1,8 @@
from __future__ import annotations
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))

112
tests/fake_mcp_server.py Normal file
View File

@@ -0,0 +1,112 @@
from __future__ import annotations
import json
import sys
TOOLS = [
{
"name": "get_file_contents",
"title": "Get file contents",
"description": "Read file contents at a given ref.",
"inputSchema": {
"type": "object",
"properties": {
"owner": {"type": "string"},
"repo": {"type": "string"},
"ref": {"type": "string"},
"filePath": {"type": "string"},
"withLines": {"type": "boolean"},
},
"required": ["owner", "repo", "ref", "filePath"],
"additionalProperties": False,
},
},
{
"name": "delete_file",
"title": "Delete file",
"description": "Dangerous write tool.",
"inputSchema": {
"type": "object",
"properties": {"filePath": {"type": "string"}},
"required": ["filePath"],
},
},
]
def send(message: dict) -> None:
sys.stdout.write(json.dumps(message, separators=(",", ":")) + "\n")
sys.stdout.flush()
for line in sys.stdin:
raw = line.strip()
if not raw:
continue
msg = json.loads(raw)
method = msg.get("method")
msg_id = msg.get("id")
if method == "initialize":
send(
{
"jsonrpc": "2.0",
"id": msg_id,
"result": {
"protocolVersion": msg["params"]["protocolVersion"],
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": "fake-mcp", "version": "1.0"},
},
}
)
continue
if method == "notifications/initialized":
continue
if method == "tools/list":
cursor = (msg.get("params") or {}).get("cursor")
if not cursor:
send(
{
"jsonrpc": "2.0",
"id": msg_id,
"result": {"tools": [TOOLS[0]], "nextCursor": "page-2"},
}
)
else:
send(
{
"jsonrpc": "2.0",
"id": msg_id,
"result": {"tools": [TOOLS[1]]},
}
)
continue
if method == "tools/call":
params = msg.get("params") or {}
name = params["name"]
arguments = params.get("arguments") or {}
if name == "get_file_contents":
payload = {
"content": [
{
"type": "text",
"text": "README line\n" * 3000,
}
],
"structuredContent": {
"owner": arguments.get("owner"),
"repo": arguments.get("repo"),
"ref": arguments.get("ref"),
"filePath": arguments.get("filePath"),
},
"isError": False,
}
send({"jsonrpc": "2.0", "id": msg_id, "result": payload})
continue
send({"jsonrpc": "2.0", "id": msg_id, "error": {"code": -32601, "message": f"Unknown tool: {name}"}})
continue
send({"jsonrpc": "2.0", "id": msg_id, "error": {"code": -32601, "message": f"Unknown method: {method}"}})

View File

@@ -0,0 +1,132 @@
from __future__ import annotations
import json
import shlex
import sys
from pathlib import Path
from fastapi.testclient import TestClient
from pyMCPBroker.app import create_app
from pyMCPBroker.broker import Broker
from pyMCPBroker.config import load_config
def make_config(tmp_path: Path) -> Path:
server = Path(__file__).with_name("fake_mcp_server.py")
command = f"{shlex.quote(sys.executable)} {shlex.quote(str(server))}"
config = {
"backends": {
"gitea": {
"backend": "stdio",
"command": command,
"tool_filter": ["get_*", "!delete_*"],
"tool_overrides": {
"get_file_contents": {
"summary": "Read one file from a repository",
"max_output_chars": 1200,
"example_args": {
"owner": "myorg",
"repo": "demo-repo",
"ref": "main",
"filePath": "README.md",
},
}
},
}
},
"tree": {
"path": "/",
"type": "node",
"summary": "Root",
"children": [
{
"path": "/repo",
"type": "node",
"summary": "Repository operations",
"children": [
{
"path": "/repo/read",
"type": "node",
"summary": "Read repository data",
"children": [
{
"path": "/repo/read/get_file",
"type": "tool",
"summary": "Read one file",
"backend_ref": "gitea",
"tool_name": "get_file_contents",
}
],
}
],
}
],
},
}
path = tmp_path / "config.json"
path.write_text(json.dumps(config), encoding="utf-8")
return path
def test_meta_end_to_end(tmp_path: Path) -> None:
cfg = load_config(make_config(tmp_path))
broker = Broker(cfg)
app = create_app(broker)
with TestClient(app) as client:
r = client.post("/meta_tree", json={"path": "/"})
assert r.status_code == 200
assert r.json()["children"][0]["path"] == "/repo"
r = client.post("/meta_desc", json={"path": "/repo/read/get_file"})
body = r.json()
assert body["summary"] == "Read one file from a repository"
assert body["args_schema"]["required"] == ["owner", "repo", "ref", "filePath"]
assert body["example_args"]["repo"] == "demo-repo"
r = client.post(
"/meta_call",
json={
"path": "/repo/read/get_file",
"args": {"owner": "acme", "repo": "demo", "ref": "main", "filePath": "README.md"},
},
)
body = r.json()
assert body["ok"] is True
assert body["result"]["truncated"] is True
assert body["result"]["result"]["structuredContent"]["repo"] == "demo"
def test_meta_call_invalid_args(tmp_path: Path) -> None:
cfg = load_config(make_config(tmp_path))
broker = Broker(cfg)
app = create_app(broker)
with TestClient(app) as client:
r = client.post("/meta_call", json={"path": "/repo/read/get_file", "args": {"owner": "acme"}})
body = r.json()
assert r.status_code == 400
assert body["error_code"] == "invalid_arguments"
assert "required property" in body["message"]
def test_meta_tree_rejects_tool_path(tmp_path: Path) -> None:
cfg = load_config(make_config(tmp_path))
broker = Broker(cfg)
app = create_app(broker)
with TestClient(app) as client:
r = client.post("/meta_tree", json={"path": "/repo/read/get_file"})
assert r.status_code == 400
assert r.json()["error_code"] == "not_a_node"
def test_secret_auth(tmp_path: Path) -> None:
cfg = load_config(make_config(tmp_path))
broker = Broker(cfg)
app = create_app(broker, shared_secret="sekret")
with TestClient(app) as client:
assert client.post("/meta_tree", json={"path": "/"}).status_code == 401
assert client.post("/meta_tree", json={"path": "/"}, headers={"Authorization": "Bearer sekret"}).status_code == 200

23
tests/test_config.py Normal file
View File

@@ -0,0 +1,23 @@
from __future__ import annotations
import json
from pathlib import Path
import pytest
from pyMCPBroker.config import load_config
def test_env_substitution(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("BROKER_CMD", "python fake.py")
path = tmp_path / "config.json"
path.write_text(json.dumps({"backends": {"x": {"backend": "stdio", "command": "${BROKER_CMD}"}}, "tree": {"path": "/", "type": "node", "children": []}}), encoding="utf-8")
cfg = load_config(path)
assert cfg.backends["x"].command == "python fake.py"
def test_env_missing_raises(tmp_path: Path) -> None:
path = tmp_path / "config.json"
path.write_text(json.dumps({"backends": {"x": {"backend": "stdio", "command": "${MISSING_VAR}"}}, "tree": {"path": "/", "type": "node", "children": []}}), encoding="utf-8")
with pytest.raises(ValueError):
load_config(path)

10
tests/test_filters.py Normal file
View File

@@ -0,0 +1,10 @@
from pyMCPBroker.filters import filter_names, is_allowed
def test_filter_allow_then_deny() -> None:
patterns = ["get_*", "list_*", "!get_secret*"]
assert is_allowed("get_file", patterns)
assert is_allowed("list_repos", patterns)
assert not is_allowed("get_secret_token", patterns)
assert not is_allowed("delete_file", patterns)
assert filter_names(["get_file", "get_secret_token", "delete_file"], patterns) == ["get_file"]