diff --git a/Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py b/Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py index dca5058b01dcd..ed457b8a57d9d 100644 --- a/Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py +++ b/Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py @@ -39,7 +39,9 @@ import asyncio import json +import os from typing import Any +from urllib.parse import urlparse import websockets from mcp.server.fastmcp import FastMCP @@ -51,9 +53,10 @@ class WorkflowConnection: """Holds WebSocket connection and buffered state for one DPL workflow.""" - def __init__(self, port: int, name: str): - self.port = port + def __init__(self, *, url: str, name: str, extra_headers: dict[str, str] | None = None): + self.url = url self.name = name + self.extra_headers = extra_headers or {} self.ws: Any = None self.reader_task: asyncio.Task | None = None self.snapshot: dict = {} @@ -83,8 +86,11 @@ async def ensure_connected(self) -> None: except Exception: pass - url = f"ws://localhost:{self.port}/status" - self.ws = await websockets.connect(url, subprotocols=["dpl"]) + self.ws = await websockets.connect( + self.url, + subprotocols=["dpl"], + additional_headers=self.extra_headers if self.extra_headers else None, + ) if self.reader_task is None or self.reader_task.done(): self.reader_task = asyncio.create_task(self._reader()) @@ -178,7 +184,8 @@ async def connect(port: int = 0, pid: int = 0, name: str = "") -> str: old = _workflows[wf_name] await old.close() - conn = WorkflowConnection(port, wf_name) + url = f"ws://localhost:{port}/status" + conn = WorkflowConnection(url=url, name=wf_name) await conn.ensure_connected() _workflows[wf_name] = conn @@ -189,6 +196,48 @@ async def connect(port: int = 0, pid: int = 0, name: str = "") -> str: ) +@mcp.tool() +async def connect_hyperloop(url: str, name: str = "", token: str = "") -> str: + """Connect to a DPL workflow running on Hyperloop via the remote proxy. + + Accepts a URL like: + https://alimonitor.cern.ch/train-workdir/remote-gui/remote_proxy.html?/ + + and remaps it to the local WebSocket proxy endpoint. + + Args: + url: The remote_proxy.html URL from alimonitor. + name: Optional human-friendly name for this workflow. + token: Hyperloop auth token. Falls back to HYPERLOOP_TOKEN env var. + """ + token = token or os.environ.get("HYPERLOOP_TOKEN", "") + if not token: + return "No token provided and HYPERLOOP_TOKEN environment variable is not set." + + parsed = urlparse(url) + path_suffix = parsed.query # everything after '?' + if not path_suffix: + return f"Cannot parse token/port from URL: {url}" + + ws_url = f"ws://localhost:8888/remote-mcp/o2/{path_suffix}/status" + wf_name = name or path_suffix.split("/")[-1] + + if wf_name in _workflows: + old = _workflows[wf_name] + await old.close() + + headers = {"Authorization": f"Bearer {token}"} + conn = WorkflowConnection(url=ws_url, name=wf_name, extra_headers=headers) + await conn.ensure_connected() + _workflows[wf_name] = conn + + devices = conn.snapshot.get("devices", []) + return ( + f"Connected to Hyperloop workflow '{wf_name}' via {ws_url} " + f"({len(devices)} device(s))." + ) + + @mcp.tool() async def disconnect(workflow: str) -> str: """Disconnect from a DPL workflow and release its resources.