Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 54 additions & 5 deletions Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@

import asyncio
import json
import os
from typing import Any
from urllib.parse import urlparse

import websockets
from mcp.server.fastmcp import FastMCP
Expand All @@ -51,9 +53,10 @@
class WorkflowConnection:
"""Holds WebSocket connection and buffered state for one DPL workflow."""

def __init__(self, port: int, name: str):
self.port = port
def __init__(self, *, url: str, name: str, extra_headers: dict[str, str] | None = None):
self.url = url
self.name = name
self.extra_headers = extra_headers or {}
self.ws: Any = None
self.reader_task: asyncio.Task | None = None
self.snapshot: dict = {}
Expand Down Expand Up @@ -83,8 +86,11 @@ async def ensure_connected(self) -> None:
except Exception:
pass

url = f"ws://localhost:{self.port}/status"
self.ws = await websockets.connect(url, subprotocols=["dpl"])
self.ws = await websockets.connect(
self.url,
subprotocols=["dpl"],
additional_headers=self.extra_headers if self.extra_headers else None,
)
if self.reader_task is None or self.reader_task.done():
self.reader_task = asyncio.create_task(self._reader())

Expand Down Expand Up @@ -178,7 +184,8 @@ async def connect(port: int = 0, pid: int = 0, name: str = "") -> str:
old = _workflows[wf_name]
await old.close()

conn = WorkflowConnection(port, wf_name)
url = f"ws://localhost:{port}/status"
conn = WorkflowConnection(url=url, name=wf_name)
await conn.ensure_connected()
_workflows[wf_name] = conn

Expand All @@ -189,6 +196,48 @@ async def connect(port: int = 0, pid: int = 0, name: str = "") -> str:
)


@mcp.tool()
async def connect_hyperloop(url: str, name: str = "", token: str = "") -> str:
"""Connect to a DPL workflow running on Hyperloop via the remote proxy.

Accepts a URL like:
https://alimonitor.cern.ch/train-workdir/remote-gui/remote_proxy.html?<token>/<port>

and remaps it to the local WebSocket proxy endpoint.

Args:
url: The remote_proxy.html URL from alimonitor.
name: Optional human-friendly name for this workflow.
token: Hyperloop auth token. Falls back to HYPERLOOP_TOKEN env var.
"""
token = token or os.environ.get("HYPERLOOP_TOKEN", "")
if not token:
return "No token provided and HYPERLOOP_TOKEN environment variable is not set."

parsed = urlparse(url)
path_suffix = parsed.query # everything after '?'
if not path_suffix:
return f"Cannot parse token/port from URL: {url}"

ws_url = f"ws://localhost:8888/remote-mcp/o2/{path_suffix}/status"
wf_name = name or path_suffix.split("/")[-1]

if wf_name in _workflows:
old = _workflows[wf_name]
await old.close()

headers = {"Authorization": f"Bearer {token}"}
conn = WorkflowConnection(url=ws_url, name=wf_name, extra_headers=headers)
await conn.ensure_connected()
_workflows[wf_name] = conn

devices = conn.snapshot.get("devices", [])
return (
f"Connected to Hyperloop workflow '{wf_name}' via {ws_url} "
f"({len(devices)} device(s))."
)


@mcp.tool()
async def disconnect(workflow: str) -> str:
"""Disconnect from a DPL workflow and release its resources.
Expand Down