From 39381768e8f7fa7a2644eebb5ecd0a84ad28261a Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Mon, 22 Jun 2026 12:53:37 -0400 Subject: [PATCH 01/14] Add scoped event-time director models Add scoped event-time model storage and inference so the dragonfly-dally event-time surrogate can use separate ML models for switch LPs while retaining the existing director request flow. Fix ZeroMQ director request argument handling so command handlers parse the argument-count prefix exactly once. This prevents client IDs such as 1 from being mistaken for a second argument count and dropped from training/inference requests. Restore the ZeroMQ director build path by compiling director-client.C when USE_ZMQML is enabled and propagating the USE_ZMQML compile definition to downstream targets. Clean up the director-client merge conflict around global ZMQ latency statistics and keep the cumulative MPI-reduced DIR_STATS output format. Expose a latency-recording hook so event-time inference requests from dragonfly-dally are included in the shared ZMQ request statistics. Update the event-time workflow to use START_ITER and END_ITER template variables and save/load the scoped event-time model directory rather than a single model file. --- CODES-compile-instructions.sh | 104 ++-- codes/surrogate/director-client.h | 6 +- .../kb.dfdally-72-zeromq-director.conf.in | 16 +- src/CMakeLists.txt | 78 +-- src/networks/model-net/dragonfly-dally.C | 33 +- src/surrogate/director-client.C | 113 ++-- src/surrogate/zmqml/model/mliterationtime.py | 59 +- src/surrogate/zmqml/zmqmlserver.py | 521 +++++++++++++++--- 8 files changed, 692 insertions(+), 238 deletions(-) diff --git a/CODES-compile-instructions.sh b/CODES-compile-instructions.sh index 23f862ae..79d64c3f 100644 --- a/CODES-compile-instructions.sh +++ b/CODES-compile-instructions.sh @@ -3,9 +3,9 @@ set -euo pipefail set -x # Switches -swm_enable=0 +swm_enable=1 union_enable=0 -torch_enable=0 +torch_enable=1 # Uncomment below for MPICH #export PATH=/usr/local/mpich-4.1.2/bin/:"$PATH" @@ -37,6 +37,17 @@ else echo "Using existing ross checkout: $(realpath ross)" fi + +if [ "$torch_enable" = 1 ]; then + make_args_codes=( + "${make_args_codes[@]}" + ) +else + make_args_codes=( + "${make_args_codes[@]}" + ) +fi + if [ $swm_enable = 1 ]; then if [ ! -d argobots/.git ]; then git clone https://github.com/pmodels/argobots --depth=1 @@ -192,53 +203,41 @@ fi -# Make system pkg-config metadata visible even when Conda's pkg-config is active. -# This is needed for libzmq.pc on systems where ZeroMQ is installed through the OS -# but the active Conda environment's pkg-config only searches Conda pkgconfig dirs. -if ! pkg-config --exists libzmq 2>/dev/null; then - for pcdir in \ - /usr/lib/x86_64-linux-gnu/pkgconfig \ - /usr/lib64/pkgconfig \ - /usr/lib/pkgconfig \ - /usr/local/lib/pkgconfig \ - /usr/local/lib64/pkgconfig \ - /opt/homebrew/lib/pkgconfig \ - /usr/share/pkgconfig - do - if [ -d "$pcdir" ]; then - export PKG_CONFIG_PATH="$pcdir:${PKG_CONFIG_PATH:-}" - fi - done -fi +if [ "$torch_enable" = 1 ]; then + # Make system pkg-config metadata visible even when Conda's pkg-config is active. + # This is needed for libzmq.pc on systems where ZeroMQ is installed through the OS + # but the active Conda environment's pkg-config only searches Conda pkgconfig dirs. + if ! pkg-config --exists libzmq 2>/dev/null; then + for pcdir in \ + /usr/lib/x86_64-linux-gnu/pkgconfig \ + /usr/lib64/pkgconfig \ + /usr/lib/pkgconfig \ + /usr/local/lib/pkgconfig \ + /usr/local/lib64/pkgconfig \ + /opt/homebrew/lib/pkgconfig \ + /usr/share/pkgconfig + do + if [ -d "$pcdir" ]; then + export PKG_CONFIG_PATH="$pcdir:${PKG_CONFIG_PATH:-}" + fi + done + fi + + if ! pkg-config --exists libzmq 2>/dev/null; then + echo "WARNING: pkg-config still cannot find libzmq.pc." >&2 + echo " If ZMQML requester support fails to build, install the ZeroMQ development package" >&2 + echo " or set PKG_CONFIG_PATH to the directory containing libzmq.pc." >&2 + fi -if ! pkg-config --exists libzmq 2>/dev/null; then - echo "WARNING: pkg-config still cannot find libzmq.pc." >&2 - echo " If ZMQML fails to build, install the ZeroMQ development package" >&2 - echo " or set PKG_CONFIG_PATH to the directory containing libzmq.pc." >&2 + # Build local ZMQML requester library required by director-client.C. + pushd codes/src/surrogate/zmqml + make clean + make + test -f libzmqmlrequester.so + test -f zmqmlrequester.h + popd fi -# Build local ZMQML requester library required by director-client.C -pushd codes/src/surrogate/zmqml -make clean -make -test -f libzmqmlrequester.so -test -f zmqmlrequester.h -popd - -# Make imported zmqmlrequester target visible to doc/example and tests. -python3 - <<'INNERPY' -from pathlib import Path -cm = Path("codes/src/CMakeLists.txt") -text = cm.read_text() -old = "add_library(zmqmlrequester SHARED IMPORTED )" -new = "add_library(zmqmlrequester SHARED IMPORTED GLOBAL)" -if old in text: - cm.write_text(text.replace(old, new)) -elif new in text: - pass -else: - raise SystemExit("Could not find zmqmlrequester imported target line in codes/src/CMakeLists.txt") -INNERPY mkdir -p codes/build pushd codes/build @@ -368,10 +367,8 @@ make_args_codes=( -DCMAKE_USE_WIN32_THREADS_INIT=0 -DCMAKE_BUILD_TYPE=Debug -DBUILD_TESTING=ON -DCMAKE_INSTALL_PREFIX="$(realpath bin)" - -DZMQML_BUILD_PATH="$(realpath "$CUR_DIR/codes/src/surrogate/zmqml")" - -DZeroMQ_INCLUDE_DIR=/usr/include - -DZeroMQ_LIBRARY=/usr/lib/x86_64-linux-gnu/libzmq.so ) + if [ $swm_enable = 1 ]; then make_args_codes=( "${make_args_codes[@]}" @@ -390,6 +387,10 @@ if [ "$torch_enable" = 1 ]; then "${make_args_codes[@]}" -DUSE_TORCH=true -DTorch_DIR="${torch_dir}" + -DUSE_ZMQML=true + -DZMQML_BUILD_PATH="$(realpath "$CUR_DIR/codes/src/surrogate/zmqml")" + -DZeroMQ_INCLUDE_DIR=/usr/include + -DZeroMQ_LIBRARY=/usr/lib/x86_64-linux-gnu/libzmq.so ) if [ -n "${CUDA_HOME:-}" ]; then @@ -412,7 +413,10 @@ if [ "$torch_enable" = 1 ]; then ) fi else - make_args_codes=("${make_args_codes[@]}" -DUSE_TORCH=false) + make_args_codes=( + "${make_args_codes[@]}" + -DUSE_TORCH=false + ) fi cmake .. "${make_args_codes[@]}" diff --git a/codes/surrogate/director-client.h b/codes/surrogate/director-client.h index aaea3d09..b9c4b3fd 100644 --- a/codes/surrogate/director-client.h +++ b/codes/surrogate/director-client.h @@ -124,9 +124,8 @@ extern "C" { // const char* data); -extern void director_lp_register_model(const char*); - - +extern void director_lp_register_model(const char *); +extern void director_record_external_zmq_latency(double processing_sec, double total_sec); /* extern void director_parse_args(char *args, int **args_array, int *length); static void director_issue_codes_event(director_state * s, tw_lpid nw_lpid, int dir_registered_event_type, tw_stime ts, tw_lp* lp); @@ -142,5 +141,4 @@ extern void dir_test_finalize(director_state* s, tw_lp* lp); #ifdef __cplusplus } #endif - #endif diff --git a/doc/example/kb.dfdally-72-zeromq-director.conf.in b/doc/example/kb.dfdally-72-zeromq-director.conf.in index 656959c4..fdb77ec7 100644 --- a/doc/example/kb.dfdally-72-zeromq-director.conf.in +++ b/doc/example/kb.dfdally-72-zeromq-director.conf.in @@ -23,19 +23,19 @@ LPGROUPS DIRECTOR { - start_iter="${DIRECTOR_START_ITER}"; - end_iter="${DIRECTOR_END_ITER}"; + start_iter="${START_ITER}"; + end_iter="${END_ITER}"; # Optional one-shot pause/retrain/resume pipeline. # First implementation is intended for --synch=1. - retrain_enabled="${DIRECTOR_RETRAIN_ENABLED}"; - retrain_iter="${DIRECTOR_RETRAIN_ITER}"; - retrain_save_path="${DIRECTOR_RETRAIN_SAVE_PATH}"; + retrain_enabled="${RETRAIN_ENABLED}"; + retrain_iter="${RETRAIN_ITER}"; + retrain_save_path="${RETRAIN_SAVE_PATH}"; # Optional second surrogate window after retraining. - second_surrogate_enabled="${DIRECTOR_SECOND_SURROGATE_ENABLED}"; - second_start_iter="${DIRECTOR_SECOND_START_ITER}"; - second_end_iter="${DIRECTOR_SECOND_END_ITER}"; + second_surrogate_enabled="${SECOND_SURROGATE_ENABLED}"; + second_start_iter="${SECOND_START_ITER}"; + second_end_iter="${SECOND_END_ITER}"; # Common modes: # diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 99430538..4e834b90 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,3 +1,4 @@ +option(USE_ZMQML "Enable ZeroMQ ML requester support" OFF) cmake_print_variables(CMAKE_CURRENT_SOURCE_DIR) find_package(FLEX REQUIRED) @@ -124,68 +125,38 @@ if(USE_TORCH) list(APPEND LIBS_TO_LINK ${TORCH_LIBRARIES}) endif() -# ZMQML / director-client (opt-in). When USE_ZMQML=ON, callers must -# point ZMQML_BUILD_PATH at a directory containing libzmqmlrequester.so -# (build it via src/surrogate/zmqml/Makefile, or set ZMQML_BUILD_PATH to -# wherever you installed it). When OFF (the default), CODES builds with -# no surrogate/director-client linkage; configs that reference -# "dir-nw-lp" will fail at runtime because the LP type isn't registered. -option(USE_ZMQML "Build the director-client + zmqml surrogate integration" OFF) +# ZMQML requester support if(USE_ZMQML) - if(NOT ZMQML_BUILD_PATH) - message(FATAL_ERROR - "USE_ZMQML=ON but ZMQML_BUILD_PATH is unset.\n" - "Build src/surrogate/zmqml/libzmqmlrequester.so first, then " - "reconfigure with -DZMQML_BUILD_PATH=.") - endif() list(APPEND SRCS surrogate/director-client.C) -endif() -add_library(codes STATIC ${SRCS}) - -list(APPEND LIBS_TO_LINK ${MPI_C_LIBRARIES}) -target_include_directories(codes INTERFACE ${MPI_C_INCLUDE_PATH}) + if(NOT DEFINED ZMQML_BUILD_PATH) + message(FATAL_ERROR "USE_ZMQML is ON, but ZMQML_BUILD_PATH is not defined.") + endif() -# set(LIBS_TO_LINK -# PkgConfig::ROSS -# ${DUMPI_LIB} -# PkgConfig::ARGOBOTS -# PkgConfig::SWM -# ) + if(NOT EXISTS "${ZMQML_BUILD_PATH}/libzmqmlrequester.so") + message(FATAL_ERROR "USE_ZMQML is ON, but ${ZMQML_BUILD_PATH}/libzmqmlrequester.so does not exist. Re-run CODES-compile-instructions.sh so the local requester library is built before configuring CODES.") + endif() -#LINK DUMPI -# target_link_libraries(codes PUBLIC ${DUPMI_LIB}) -if(USE_DUMPI) - target_include_directories(codes PUBLIC ${DUMPI_INCLUDE}) -endif() + pkg_check_modules(PC_ZeroMQ QUIET zmq) + find_path(ZeroMQ_INCLUDE_DIR NAMES zmq.hpp PATHS ${PC_ZeroMQ_INCLUDE_DIRS}) + find_library(ZeroMQ_LIBRARY NAMES zmq PATHS ${PC_ZeroMQ_LIBRARY_DIRS}) -#LINK ARGOBOTS, SWM and UNION -# target_link_libraries(codes PUBLIC PkgConfig::ARGOBOTS) -if(USE_ONLINE) - if(USE_SWM) - target_include_directories(codes PUBLIC ${ARGOBOTS_INCLUDE_DIRS}) - # target_link_libraries(codes PUBLIC PkgConfig::SWM) - target_include_directories(codes PUBLIC ${SWM_INCLUDE_DIRS}) + if(NOT ZeroMQ_LIBRARY) + message(FATAL_ERROR "USE_ZMQML is ON, but libzmq was not found.") endif() - if(USE_UNION) - target_include_directories(codes PUBLIC ${ARGOBOTS_INCLUDE_DIRS}) - # target_link_libraries(codes PUBLIC PkgConfig::SWM) - target_include_directories(codes PUBLIC ${SWM_INCLUDE_DIRS}) - target_include_directories(codes PUBLIC ${UNION_INCLUDE_DIRS}) - endif() -endif() -if(USE_ZMQML) add_library(zmqmlrequester SHARED IMPORTED GLOBAL) set_target_properties(zmqmlrequester PROPERTIES IMPORTED_LOCATION "${ZMQML_BUILD_PATH}/libzmqmlrequester.so" INTERFACE_INCLUDE_DIRECTORIES "${ZMQML_BUILD_PATH}") - target_compile_definitions(codes PUBLIC USE_ZMQML) endif() #LINK ROSS # target_link_libraries(codes PUBLIC #{pkgcfg_lib_ROSS_ROSS}) # target_link_libraries(codes PUBLIC PkgConfig::ROSS) + +add_library(codes ${SRCS}) + target_include_directories(codes PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${ROSS_INCLUDE_DIRS} @@ -194,11 +165,16 @@ target_include_directories(codes PUBLIC ${PROJECT_SOURCE_DIR}/codes ${PROJECT_SOURCE_DIR}/src ${PROJECT_SOURCE_DIR}/src/modelconfig - $<$:$> ) target_link_libraries(codes PUBLIC ${LIBS_TO_LINK}) +if(USE_ZMQML) + target_compile_definitions(codes PUBLIC USE_ZMQML) + target_include_directories(codes PUBLIC "${ZMQML_BUILD_PATH}" ${ZeroMQ_INCLUDE_DIR}) + target_link_libraries(codes PUBLIC zmqmlrequester ${ZeroMQ_LIBRARY}) +endif() + get_target_property(CODES_INCLUDE_DIRS codes INCLUDE_DIRECTORIES) cmake_print_variables(CODES_INCLUDE_DIRS) @@ -227,18 +203,12 @@ if(USE_DUMPI) list(APPEND CODES_TARGETS model-net-dumpi-traces-dump) endif() -# ZMQ — only resolved + linked when USE_ZMQML is on; otherwise nothing -# in the codes library calls into libzmq. -if(USE_ZMQML) - pkg_check_modules(PC_ZeroMQ QUIET zmq) - find_path(ZeroMQ_INCLUDE_DIR NAMES zmq.hpp PATHS ${PC_ZeroMQ_INCLUDE_DIRS}) - find_library(ZeroMQ_LIBRARY NAMES zmq PATHS ${PC_ZeroMQ_LIBRARY_DIRS}) -endif() - foreach(tar IN LISTS CODES_TARGETS) target_include_directories(${tar} PUBLIC ${CODES_INCLUDE_DIRS} ${ROSS_INCLUDE_DIRS}) target_link_libraries(${tar} PUBLIC codes ${LIBS_TO_LINK}) + if(USE_ZMQML) + target_include_directories(${tar} PUBLIC "${ZMQML_BUILD_PATH}" ${ZeroMQ_INCLUDE_DIR}) target_link_libraries(${tar} PUBLIC zmqmlrequester ${ZeroMQ_LIBRARY}) endif() endforeach() diff --git a/src/networks/model-net/dragonfly-dally.C b/src/networks/model-net/dragonfly-dally.C index b4c064cf..f97e71c8 100644 --- a/src/networks/model-net/dragonfly-dally.C +++ b/src/networks/model-net/dragonfly-dally.C @@ -22,6 +22,7 @@ #include "codes/model-net-method.h" #include "codes/model-net-lp.h" #include "codes/surrogate/init.h" +#include "codes/surrogate/director-client.h" #ifdef USE_TORCH #include "codes/surrogate/packet-latency-predictor/torch-jit.h" #endif @@ -64,11 +65,18 @@ * resolve it to null.) */ #ifdef USE_ZMQML -extern std::vector zmqml_director_request(const std::string& surrogate_family, - const std::string& surrogate_backend, - const std::string& operation, - const std::vector& args, - const std::string& bindata); +extern std::vector zmqml_director_request( + const std::string& surrogate_family, + const std::string& surrogate_backend, + const std::string& operation, + const std::vector& args, + const std::string& bindata +) __attribute__((weak)); + +extern "C" void director_record_external_zmq_latency( + double processing_sec, + double total_sec +) __attribute__((weak)); extern void director_record_zmq_latency_stats(const char* label, const std::vector& ret, @@ -297,7 +305,20 @@ static std::vector dfdally_event_time_director_request_with_latency (double)(finish.tv_nsec - start.tv_nsec) / 1000000000.0; #ifdef USE_ZMQML - director_record_zmq_latency_stats(label, ret, local_latency_sec); + double zmq_processing_time = 0.0; + + if (ret.size() > 1) { + char *endptr = NULL; + double parsed = strtod(ret[1].c_str(), &endptr); + + if (endptr != ret[1].c_str() && isfinite(parsed) && parsed >= 0.0) { + zmq_processing_time = parsed; + } + } + + if (director_record_external_zmq_latency) { + director_record_external_zmq_latency(zmq_processing_time, local_latency_sec); + } #endif return ret; diff --git a/src/surrogate/director-client.C b/src/surrogate/director-client.C index 3b115dc0..5b658d07 100644 --- a/src/surrogate/director-client.C +++ b/src/surrogate/director-client.C @@ -23,7 +23,7 @@ #define DIR_ZMQ_CMD_LENGTH 64 #define DIR_ZMQ_ARG_LENGTH 2048 -#define DIR_MAX_PREDICTION 5 +#define DIR_MAX_PREDICTION 1 #define DIR_MAX_TRAINING_RECORDS 10 /* * The Python iteration-time model currently uses history_len=2 and horizon=3, @@ -86,42 +86,70 @@ std::vector director_client_request_family(const char* surrogate_fa int surrogate_enabled = 0; int inferencing_enabled = 1; - -void director_record_zmq_latency_stats(const char* label, const std::vector& ret, - double local_latency_sec) { - (void)label; - +static void director_record_zmq_latency_values( + double processing_sec, + double total_sec) +{ if (evaluate_perf != 1) { return; } - director_zmq_total_elapsed_times.push_back(local_latency_sec); + if (!std::isfinite(processing_sec) || processing_sec < 0.0) { + processing_sec = 0.0; + } + + if (!std::isfinite(total_sec) || total_sec < 0.0) { + total_sec = 0.0; + } + director_zmq_processing_times.push_back(processing_sec); + director_zmq_total_elapsed_times.push_back(total_sec); +} + +static void director_record_zmq_latency_stats( + const char* label, + const std::vector& ret, + double local_latency_sec) +{ double zmq_processing_time = 0.0; + if (ret.size() > 1) { - try { - zmq_processing_time = std::stod(ret[1]); - } catch (...) { - if (director_debug_prints) { - fprintf(stderr, - "[DIR] Warning: could not parse zmq processing time from reply field " - "ret[1]=%s\n", - ret[1].c_str()); - fflush(stderr); - } + char *endptr = NULL; + double parsed = strtod(ret[1].c_str(), &endptr); + + if (endptr != ret[1].c_str() && std::isfinite(parsed) && parsed >= 0.0) { + zmq_processing_time = parsed; + } else if (director_debug_prints) { + fprintf( + stderr, + "[DIR] Warning: could not parse zmq processing time from reply field ret[1]=%s request=%s\n", + ret[1].c_str(), + label ? label : "" + ); + fflush(stderr); } } else if (director_debug_prints) { - fprintf(stderr, - "[DIR] Warning: zmq reply too short for perf timing: ret.size()=%llu request=%s\n", - (unsigned long long)ret.size(), label ? label : ""); + fprintf( + stderr, + "[DIR] Warning: zmq reply too short for perf timing: ret.size()=%llu request=%s\n", + (unsigned long long)ret.size(), + label ? label : "" + ); fflush(stderr); } - director_zmq_processing_times.push_back(zmq_processing_time); + director_record_zmq_latency_values(zmq_processing_time, local_latency_sec); } +extern "C" void director_record_external_zmq_latency( + double processing_sec, + double total_sec) +{ + director_record_zmq_latency_values(processing_sec, total_sec); +} -static int director_surrogate_family_is(const char* family) { +static int director_surrogate_family_is(const char *family) +{ return strcmp(director_config_global.surrogate_family, family) == 0; } @@ -333,7 +361,7 @@ static void director_send_iteration_records_now(int client_id, int training_cycl * * training_cycle_id is currently used only for local debug output. */ - sprintf(args, "%d;%d;%d;", 3, client_id, num_records); + sprintf(args, "%d;%d;%d;", 2, client_id, num_records); struct timespec start, end; clock_gettime(CLOCK_MONOTONIC, &start); @@ -1246,31 +1274,29 @@ void director_event_handler_commit(director_state* s, tw_bf* bf, director_messag static void director_reduce_and_print_zmq_latency_stat(const char* stat_name, const std::vector& local_values) { - unsigned long long local_count = (unsigned long long)local_values.size(); - + unsigned long long local_count = 0; double local_sum = 0.0; double local_sq_sum = 0.0; double local_min = std::numeric_limits::infinity(); double local_max = -std::numeric_limits::infinity(); for (double value : local_values) { - local_sum += value; - local_sq_sum += value * value; - - if (value < local_min) { - local_min = value; + if (!std::isfinite(value)) { + continue; } - if (value > local_max) { - local_max = value; - } + local_count++; + local_sum += value; + local_sq_sum += value * value; + local_min = std::min(local_min, value); + local_max = std::max(local_max, value); } unsigned long long global_count = 0; double global_sum = 0.0; double global_sq_sum = 0.0; - double global_min = 0.0; - double global_max = 0.0; + double global_min = std::numeric_limits::infinity(); + double global_max = -std::numeric_limits::infinity(); MPI_Reduce(&local_count, &global_count, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_CODES); @@ -1293,14 +1319,14 @@ static void director_reduce_and_print_zmq_latency_stat(const char* stat_name, double variance = global_sq_sum / (double)global_count - mean * mean; /* - * Floating-point roundoff can make variance slightly negative when - * values are very close together. + * Floating-point roundoff can make variance slightly negative when values + * are very close together. */ if (variance < 0.0 && variance > -1.0e-18) { variance = 0.0; } - double stddev = sqrt(variance); + double stddev = variance > 0.0 ? sqrt(variance) : 0.0; std::cout << std::setprecision(9) << std::fixed << "==DIR_STATS " << stat_name << ": requests = " << global_count << ", mean = " << mean << ", min = " << global_min @@ -1335,7 +1361,9 @@ static void director_print_zmq_latency_stats_once(void) { director_zmq_total_elapsed_times); } -void director_finalize(director_state* s, tw_lp* lp) { + +void director_finalize(director_state* s, tw_lp* lp) +{ director_print_zmq_latency_stats_once(); /* @@ -1362,11 +1390,10 @@ tw_lptype dir_lp = {(init_f)director_init, (map_f)codes_mapping, sizeof(director_state)}; -extern void director_lp_register_model(const char* dir_lp_name) { +extern void director_lp_register_model(const char * dir_lp_name){ int num_dir_per_mgrp = codes_mapping_get_lp_count("MODELNET_GRP", 1, "dir-nw-lp", NULL, 0); - if (num_dir_per_mgrp > 0) { - lp_type_register(dir_lp_name, &dir_lp); // DIRECTOR addition - register type - //printf("\n==DIR: Registered\n"); + if(num_dir_per_mgrp > 0){ + lp_type_register(dir_lp_name, &dir_lp); } } diff --git a/src/surrogate/zmqml/model/mliterationtime.py b/src/surrogate/zmqml/model/mliterationtime.py index 6fda3461..ce9877ce 100644 --- a/src/surrogate/zmqml/model/mliterationtime.py +++ b/src/surrogate/zmqml/model/mliterationtime.py @@ -302,6 +302,51 @@ def _predict_once(self, client_id: int, history: List[float], iteration: int) -> return np.asarray(cleaned, dtype=np.float64) + def _global_fallback_prediction(self, requested_horizon: int) -> List[float]: + requested_horizon = max(1, int(requested_horizon)) + + recent_values: List[float] = [] + for model in self.models.values(): + if model.records: + recent_values.extend(model.records[-max(1, self.history_len):]) + + recent_values = _as_positive_finite(recent_values) + + if recent_values: + value = float(np.median(np.asarray(recent_values, dtype=np.float64))) + elif self.y_mean is not None and len(self.y_mean) > 0: + value = float(self.y_mean[0]) + else: + # Match the older iteration-time fallback scale instead of using 1.0. + value = 2_000_000.0 + + if not np.isfinite(value) or value <= 0.0: + value = 2_000_000.0 + + return [value for _ in range(requested_horizon)] + + def _global_fallback_prediction(self, requested_horizon: int) -> List[float]: + requested_horizon = max(1, int(requested_horizon)) + + recent_values: List[float] = [] + for model in self.models.values(): + if model.records: + recent_values.extend(model.records[-max(1, self.history_len):]) + + recent_values = _as_positive_finite(recent_values) + + if recent_values: + value = float(np.median(np.asarray(recent_values, dtype=np.float64))) + elif self.y_mean is not None and len(self.y_mean) > 0: + value = float(self.y_mean[0]) + else: + value = 2_000_000.0 + + if not np.isfinite(value) or value <= 0.0: + value = 2_000_000.0 + + return [value for _ in range(requested_horizon)] + def predict(self, client_id: int, requested_horizon: int | None = None) -> List[float]: client_id = int(client_id) requested_horizon = int(requested_horizon or self.horizon) @@ -310,7 +355,15 @@ def predict(self, client_id: int, requested_horizon: int | None = None) -> List[ model = self.get(client_id) if not model.records: - return model._fallback_prediction(requested_horizon) + fallback = self._global_fallback_prediction(requested_horizon) + if self.debug: + print( + "[IterationTimeModelRegistry] predict global-fallback " + f"client={client_id} requested_horizon={requested_horizon} " + f"trained={int(self.trained)} predictions={fallback}", + flush=True, + ) + return fallback # Predict in chunks if requested_horizon > self.horizon. out: List[float] = [] @@ -326,6 +379,9 @@ def predict(self, client_id: int, requested_horizon: int | None = None) -> List[ history.append(float(value)) iteration += 1 + if not out: + out = self._global_fallback_prediction(requested_horizon) + if self.debug: print( "[IterationTimeModelRegistry] predict " @@ -337,6 +393,7 @@ def predict(self, client_id: int, requested_horizon: int | None = None) -> List[ return out + def save(self, path: str) -> None: out_path = Path(path) if out_path.parent: diff --git a/src/surrogate/zmqml/zmqmlserver.py b/src/surrogate/zmqml/zmqmlserver.py index fb4b7944..9631c106 100755 --- a/src/surrogate/zmqml/zmqmlserver.py +++ b/src/surrogate/zmqml/zmqmlserver.py @@ -20,6 +20,8 @@ from model.mliterationtime import IterationTimeModelRegistry from model.mleventtime import EventTimeModel import csv +import io +import pickle from pathlib import Path import os @@ -37,19 +39,262 @@ launch_id = count(start=1) # unique for launched thread launched_threads = {} # id:obj. keep track of active threads. remove the thread once it finished -training_records = {} # client_id:[] -iteration_time_models = IterationTimeModelRegistry( - history_len=int(os.environ.get("ZMQML_ITERATION_HISTORY_LEN", "4")), - horizon=int(os.environ.get("ZMQML_ITERATION_HORIZON", "30")), - ridge_alpha=float(os.environ.get("ZMQML_ITERATION_RIDGE_ALPHA", "1.0")), - train_stride=int(os.environ.get("ZMQML_ITERATION_TRAIN_STRIDE", "3")), -) -event_time_model = EventTimeModel( - min_rows=int(os.environ.get("ZMQML_EVENT_TIME_MIN_ROWS", "32")), - max_epochs=int(os.environ.get("ZMQML_EVENT_TIME_EPOCHS", "80")), - lr=float(os.environ.get("ZMQML_EVENT_TIME_LR", "0.001")), - hidden_dim=int(os.environ.get("ZMQML_EVENT_TIME_HIDDEN_DIM", "64")), -) +training_records = {} # client_id -> [] + +ITERATION_MODEL_KWARGS = { + "history_len": int(os.environ.get("ZMQML_ITERATION_HISTORY_LEN", "4")), + "horizon": int(os.environ.get("ZMQML_ITERATION_HORIZON", "30")), + "ridge_alpha": float(os.environ.get("ZMQML_ITERATION_RIDGE_ALPHA", "1.0")), + "train_stride": int(os.environ.get("ZMQML_ITERATION_TRAIN_STRIDE", "3")), +} + +EVENT_TIME_MODEL_KWARGS = { + "min_rows": int(os.environ.get("ZMQML_EVENT_TIME_MIN_ROWS", "32")), + "max_epochs": int(os.environ.get("ZMQML_EVENT_TIME_EPOCHS", "80")), + "lr": float(os.environ.get("ZMQML_EVENT_TIME_LR", "0.001")), + "hidden_dim": int(os.environ.get("ZMQML_EVENT_TIME_HIDDEN_DIM", "64")), +} + +DEFAULT_TERMINAL_MODEL_SCOPE = "terminal" +DEFAULT_TERMINAL_MODEL_KEY = "global" + +# The current dragonfly-dally event-time rows use numeric LP-type fields. +# By default, current_lp_type=0 is treated as a terminal LP; all other LP +# types get switch-local models. Override if the enum changes. +EVENT_TIME_TERMINAL_LP_TYPES = { + token.strip() + for token in os.environ.get("ZMQML_EVENT_TIME_TERMINAL_LP_TYPES", "0").split(",") + if token.strip() +} + + +def normalize_model_identity(model_scope: str | None = None, model_key: str | None = None) -> tuple[str, str, str]: + scope = str(model_scope or "").strip() + key = str(model_key or "").strip() + + if not scope: + scope = DEFAULT_TERMINAL_MODEL_SCOPE + + if scope in ("terminal", "term", "client", "global"): + scope = DEFAULT_TERMINAL_MODEL_SCOPE + key = DEFAULT_TERMINAL_MODEL_KEY + elif scope in ("router", "switch", "switch-lp", "router-lp"): + scope = "switch" + if not key: + key = "unknown" + else: + if not key: + key = DEFAULT_TERMINAL_MODEL_KEY + + model_id = f"{scope}:{key}" + return scope, key, model_id + + +def model_identity_from_real_args( + real_args: list[str], + *, + offset: int, + default_scope: str = DEFAULT_TERMINAL_MODEL_SCOPE, + default_key: str = DEFAULT_TERMINAL_MODEL_KEY, +) -> tuple[str, str, str]: + if len(real_args) >= offset + 2: + return normalize_model_identity(real_args[offset], real_args[offset + 1]) + return normalize_model_identity(default_scope, default_key) + + + +class ScopedIterationTimeModelRegistry: + """Compatibility facade that keeps iteration-time unscoped. + + Event-time uses scoped models, but iteration-time should remain the original + client/app-level model. The surrounding unified Director request code may + still pass model_scope/model_key arguments; this facade deliberately ignores + them and routes all iteration-time records/inference to one plain + IterationTimeModelRegistry. + """ + + def __init__(self, kwargs: dict): + self.kwargs = dict(kwargs) + self.registry = IterationTimeModelRegistry(**self.kwargs) + self.debug = False + + def set_debug(self, enabled: bool) -> None: + self.debug = bool(enabled) + self.registry.set_debug(self.debug) + + def get_registry(self, model_scope: str | None = None, model_key: str | None = None) -> IterationTimeModelRegistry: + return self.registry + + def get(self, client_id: int, model_scope: str | None = None, model_key: str | None = None): + return self.registry.get(client_id) + + def set_client_app_id(self, client_id: int, app_id: int, model_scope: str | None = None, model_key: str | None = None) -> None: + self.registry.set_client_app_id(client_id, app_id) + + def predict(self, client_id: int, requested_horizon: int | None = None, model_scope: str | None = None, model_key: str | None = None): + return self.registry.predict(client_id, requested_horizon) + + def train_or_update(self, model_scope: str | None = None, model_key: str | None = None) -> bool: + return self.registry.train_or_update() + + def save(self, path: str) -> None: + self.registry.save(path) + + def load(self, path: str) -> None: + self.registry.load(path) + self.registry.set_debug(self.debug) + + def status(self, model_scope: str | None = None, model_key: str | None = None) -> dict: + client_ids = sorted(self.registry.models.keys()) + total_clients = len(client_ids) + trained_clients = 0 + total_records = 0 + client_summaries = [] + + for client_id in client_ids: + model = self.registry.get(client_id) + records = len(model.records) + trained = bool(model.trained) + total_records += records + trained_clients += int(trained) + client_summaries.append( + f"{client_id}:records={records},trained={int(trained)}" + ) + + return { + "total_clients": str(total_clients), + "trained_clients": str(trained_clients), + "total_records": str(total_records), + "clients": ";".join(client_summaries), + } + +class ScopedEventTimeModelRegistry: + def __init__(self, kwargs: dict): + self.kwargs = dict(kwargs) + self.models: dict[str, EventTimeModel] = {} + self.debug = False + self.model_versions: dict[str, int] = {} + + def set_debug(self, enabled: bool) -> None: + self.debug = bool(enabled) + for model in self.models.values(): + model.set_debug(self.debug) + + def get(self, model_scope: str | None = None, model_key: str | None = None) -> EventTimeModel: + _, _, model_id = normalize_model_identity(model_scope, model_key) + if model_id not in self.models: + model = EventTimeModel(**self.kwargs) + model.set_debug(self.debug) + self.models[model_id] = model + self.model_versions.setdefault(model_id, 0) + return self.models[model_id] + + def model_id(self, model_scope: str | None = None, model_key: str | None = None) -> str: + return normalize_model_identity(model_scope, model_key)[2] + + def train_or_update(self, model_scope: str | None = None, model_key: str | None = None) -> bool: + if model_scope in (None, "", "all", "*"): + trained_any = False + for model_id, model in self.models.items(): + trained = model.train_or_update() + if trained: + self.model_versions[model_id] = self.model_versions.get(model_id, 0) + 1 + trained_any = trained or trained_any + return trained_any + + model_id = self.model_id(model_scope, model_key) + model = self.get(model_scope, model_key) + trained = model.train_or_update() + if trained: + self.model_versions[model_id] = self.model_versions.get(model_id, 0) + 1 + return trained + + @staticmethod + def _safe_filename(model_id: str) -> str: + return model_id.replace("/", "_").replace(":", "__") + + def save(self, path: str | Path, model_scope: str | None = None, model_key: str | None = None) -> None: + path = Path(path) + + if model_scope not in (None, "", "all", "*"): + model = self.get(model_scope, model_key) + if path.parent: + path.parent.mkdir(parents=True, exist_ok=True) + model.save(path) + return + + path.mkdir(parents=True, exist_ok=True) + manifest = { + "format": "scoped-event-time-directory-v1", + "models": {}, + } + + for model_id, model in sorted(self.models.items()): + filename = self._safe_filename(model_id) + ".pt" + model.save(path / filename) + manifest["models"][model_id] = filename + + (path / "manifest.json").write_text(json.dumps(manifest, indent=2, sort_keys=True)) + + def load(self, path: str | Path, model_scope: str | None = None, model_key: str | None = None) -> None: + path = Path(path) + + if path.is_dir(): + manifest_path = path / "manifest.json" + if not manifest_path.exists(): + raise FileNotFoundError(f"missing scoped event-time manifest: {manifest_path}") + + manifest = json.loads(manifest_path.read_text()) + self.models.clear() + self.model_versions.clear() + for model_id, filename in manifest.get("models", {}).items(): + scope, key = model_id.split(":", 1) + model = self.get(scope, key) + model.load(path / filename) + model.set_debug(self.debug) + self.model_versions[model_id] = self.model_versions.get(model_id, 0) + 1 + return + + # Backward-compatible load of an old single event-time model file. + model = self.get(model_scope, model_key) + model.load(path) + model.set_debug(self.debug) + model_id = self.model_id(model_scope, model_key) + self.model_versions[model_id] = self.model_versions.get(model_id, 0) + 1 + + def status(self, model_scope: str | None = None, model_key: str | None = None) -> dict[str, str]: + if model_scope in (None, "", "all", "*"): + model_ids = sorted(self.models) + else: + model_ids = [self.model_id(model_scope, model_key)] + + total_rows = 0 + trained_models = 0 + entries = [] + + for model_id in model_ids: + model = self.models.get(model_id) + if model is None: + continue + total_rows += len(model.rows) + trained_models += int(bool(model.trained)) + entries.append( + f"{model_id}:rows={len(model.rows)},trained={int(model.trained)},examples={model.training_examples}" + ) + + return { + "model_count": str(len(model_ids)), + "trained_models": str(trained_models), + "total_rows": str(total_rows), + "models": ";".join(entries), + } + + +iteration_time_models = IterationTimeModelRegistry(**ITERATION_MODEL_KWARGS) +event_time_models = ScopedEventTimeModelRegistry(EVENT_TIME_MODEL_KWARGS) + +# Compatibility alias for old helper code that still references event_time_model. +event_time_model = event_time_models.get() iteration_model_path = os.environ.get("ZMQML_ITERATION_MODEL_PATH", "").strip() event_time_model_path = os.environ.get("ZMQML_EVENT_TIME_MODEL_PATH", "").strip() event_time_record_log_path = os.environ.get("ZMQML_EVENT_TIME_RECORD_LOG_PATH", "").strip() @@ -83,10 +328,11 @@ ) if event_time_model_path: - event_time_model.load(event_time_model_path) + event_time_models.load(event_time_model_path) + event_time_model = event_time_models.get() event_time_model_version = 1 print( - f"[zmqmlserver] loaded event-time model: {event_time_model_path}", + f"[zmqmlserver] loaded event-time model(s): {event_time_model_path}", flush=True, ) @@ -228,7 +474,7 @@ def set_director_debug_prints(args): director_debug_prints = raw in ("1", "true", "yes", "on", "enabled") iteration_time_models.set_debug(director_debug_prints) - event_time_model.set_debug(director_debug_prints) + event_time_models.set_debug(director_debug_prints) if director_debug_prints: print(f"[zmqmlserver] director_debug_prints=1", flush=True) @@ -335,13 +581,17 @@ def receivedata(args, bindata): # # receive training records # + def receiverecords(args, bindata): status = "failed" st = time.time() - num_args = int(args[0]) # 1st arg is num of args - client = int(args[1]) # 2nd arg is client id - num_records = int(args[2]) # 3rd arg is num records + real_args = _real_command_args(args) + if len(real_args) < 2: + return ("failed", time.time() - st) + + client = int(real_args[0]) + num_records = int(real_args[1]) records_str = str(bindata.decode('utf-8')) records_str = records_str.strip() @@ -361,17 +611,10 @@ def receiverecords(args, bindata): training_records[client].extend(parsed_records) - # Keep the raw records available for offline/pretraining workflows. - # By default this preserves the old behavior and trains immediately. - # Set ZMQML_AUTO_TRAIN_ON_RECORDS=0 for pure-PDES collection or - # frozen pretrained inference runs. if parsed_records: append_record_log(client, parsed_records) model = iteration_time_models.get(client) - # Enrich the ML model with app_id metadata when available. - # The C++ protocol still sends client + timing values, while the Python - # server infers app_id from ZMQML_APP_ALLOC_PATH. app_id = client_app_map.get(client, -1) if "client_app_map" in globals() else -1 if hasattr(model, "set_app_id"): @@ -400,19 +643,17 @@ def receiverecords(args, bindata): return (status, elapsed_time) -# -# do inference to get predictions -# def launch_iteration_time_inferencing(args, bindata): status = "failed" st = time.time() - num_args = int(args[0]) # 1st arg is num of args - client = int(args[1]) # 2nd arg is client id - num_steps = int(args[2]) # 3rd arg is num steps to predict + real_args = _real_command_args(args) + if len(real_args) < 2: + return ("failed", time.time() - st, "") + + client = int(real_args[0]) + num_steps = int(real_args[1]) - # Optional recent-context payload. The normal path uses records previously - # received through send-records, but accepting context keeps the API flexible. records_str = str(bindata.decode('utf-8')) records_str = records_str.strip() @@ -448,11 +689,6 @@ def launch_iteration_time_inferencing(args, bindata): elapsed_time = time.time() - st return (status, elapsed_time, inferences_str) - - - - - def event_time_payload_has_header(payload: str) -> bool: for line in payload.splitlines(): line = line.strip() @@ -502,27 +738,101 @@ def append_event_time_record_log(payload: str) -> None: f.write("\n") +def event_time_model_identity_from_row(row: dict) -> tuple[str, str, str]: + raw_lp_type = str(row.get("current_lp_type", "")).strip() + raw_gid = str(row.get("current_lp_gid", "")).strip() + + if raw_lp_type in EVENT_TIME_TERMINAL_LP_TYPES: + return normalize_model_identity("terminal", "global") + + return normalize_model_identity("switch", raw_gid or "unknown") + + +def iter_event_time_rows_from_payload(raw_payload: str): + payload = event_time_payload_with_header(raw_payload) + if not payload.strip(): + return + + reader = csv.DictReader(io.StringIO(payload)) + if reader.fieldnames: + reader.fieldnames = [str(name).strip().lstrip("#").strip() for name in reader.fieldnames] + + for row in reader: + clean = {str(k).strip().lstrip("#").strip(): v for k, v in row.items()} + yield clean + + def receive_event_time_records(args, bindata): st = time.time() raw_payload = bindata.decode("utf-8", errors="replace").strip() - payload = event_time_payload_with_header(raw_payload) - loaded_rows = event_time_model.add_records_text(payload) if payload else 0 + loaded_rows = 0 + + # Important performance rule: + # Do NOT call EventTimeModel.add_records_text(...) once per row. + # Event-time batches can contain 65K+ rows, and per-row parsing/routing makes + # pure-PDES collection much slower than the old single-global model path. + # + # Instead, parse once, group rows by scoped model id, then call + # add_records_text(...) once per scoped model per C++ batch. + grouped_rows: dict[str, list[dict]] = {} + grouped_identity: dict[str, tuple[str, str]] = {} + + for row in iter_event_time_rows_from_payload(raw_payload) or []: + model_scope, model_key, model_id = event_time_model_identity_from_row(row) + grouped_rows.setdefault(model_id, []).append(row) + grouped_identity[model_id] = (model_scope, model_key) + + per_model_loaded: dict[str, int] = {} + + for model_id, rows in grouped_rows.items(): + model_scope, model_key = grouped_identity[model_id] + model = event_time_models.get(model_scope, model_key) + + if not rows: + continue + + # Build one CSV payload for this model. Preserve the field order from + # the first row and include any later extra keys defensively. + fieldnames = list(rows[0].keys()) + seen = set(fieldnames) + for row in rows[1:]: + for key in row.keys(): + if key not in seen: + fieldnames.append(key) + seen.add(key) + + buf = io.StringIO() + writer = csv.DictWriter(buf, fieldnames=fieldnames, extrasaction="ignore") + writer.writeheader() + writer.writerows(rows) + + accepted = model.add_records_text(buf.getvalue()) + loaded_rows += accepted + per_model_loaded[model_id] = accepted + + if accepted > 0 and event_time_auto_train_on_records: + model.train_or_update() if loaded_rows > 0: append_event_time_record_log(raw_payload) - if event_time_auto_train_on_records: - event_time_model.train_or_update() - director_debug( - f"[event-time records] loaded_rows={loaded_rows} " - f"total_rows={len(event_time_model.rows)} " - f"trained={int(event_time_model.trained)}" - ) + if director_debug_prints: + # Keep this compact. Printing the full per_model dict for 100+ switch + # models is noisy and can itself become expensive. + nonempty_models = sum(1 for v in per_model_loaded.values() if v > 0) + min_rows = min(per_model_loaded.values()) if per_model_loaded else 0 + max_rows = max(per_model_loaded.values()) if per_model_loaded else 0 + sample = sorted(per_model_loaded.items())[:8] + print( + f"[event-time records] loaded_rows={loaded_rows} " + f"models={nonempty_models} min_rows_per_model={min_rows} " + f"max_rows_per_model={max_rows} sample={sample}", + flush=True, + ) return ("done", time.time() - st, loaded_rows) - def load_event_time_records_csv_command(args): st = time.time() real_args = _real_command_args(args) @@ -542,22 +852,44 @@ def load_event_time_records_csv_command(args): "error": f"event-time records path does not exist: {path}", } - loaded_rows = event_time_model.load_csv(path) + loaded_rows = 0 + files = sorted(path.rglob("*")) if path.is_dir() else [path] - return { + for child in files: + if not child.is_file() or child.suffix.lower() not in (".csv", ".txt", ".log"): + continue + status, _et, child_rows = receive_event_time_records(["0"], child.read_text().encode("utf-8")) + if status == "done": + loaded_rows += int(child_rows) + + ret = { "status": "done", "et": str(time.time() - st), "path": str(path), "loaded_rows": str(loaded_rows), - "total_rows": str(len(event_time_model.rows)), } + ret.update(event_time_models.status()) + return ret def train_event_time_model_command(args): global event_time_model_version st = time.time() - trained = event_time_model.train_or_update() + real_args = _real_command_args(args) + + target = real_args[0] if real_args else "all" + if target in ("", "all", "*"): + trained = event_time_models.train_or_update("all", "") + model_scope = "all" + model_key = "" + model_id = "all" + elif len(real_args) >= 2: + model_scope, model_key, model_id = normalize_model_identity(real_args[0], real_args[1]) + trained = event_time_models.train_or_update(model_scope, model_key) + else: + model_scope, model_key, model_id = normalize_model_identity("switch", target) + trained = event_time_models.train_or_update(model_scope, model_key) if trained: event_time_model_version += 1 @@ -565,18 +897,17 @@ def train_event_time_model_command(args): ret = { "status": "done" if trained else "failed", "et": str(time.time() - st), + "target": model_id, "model_version": str(event_time_model_version), } - ret.update(event_time_model.status()) + ret.update(event_time_models.status(model_scope, model_key)) if not trained: - ret["error"] = "event-time model was not trained; load enough generic event-time rows first" + ret["error"] = "event-time model was not trained; load enough scoped event-time rows first" print( - f"[event-time model-train-command] trained={int(trained)} " - f"rows={len(event_time_model.rows)} " - f"training_examples={event_time_model.training_examples} " - f"model_version={event_time_model_version}", + f"[event-time model-train-command] target={model_id} trained={int(trained)} " + f"model_version={event_time_model_version} status={event_time_models.status(model_scope, model_key)}", flush=True, ) @@ -595,21 +926,29 @@ def save_event_time_model_command(args): } model_path = Path(real_args[0]) - if model_path.parent: - model_path.parent.mkdir(parents=True, exist_ok=True) + target = real_args[1] if len(real_args) >= 2 else "all" - event_time_model.save(model_path) + if target in ("", "all", "*"): + event_time_models.save(model_path, "all", "") + model_id = "all" + elif len(real_args) >= 3: + model_scope, model_key, model_id = normalize_model_identity(real_args[1], real_args[2]) + event_time_models.save(model_path, model_scope, model_key) + else: + model_scope, model_key, model_id = normalize_model_identity("switch", target) + event_time_models.save(model_path, model_scope, model_key) return { "status": "done", "et": str(time.time() - st), "path": str(model_path), + "target": model_id, "model_version": str(event_time_model_version), } def load_event_time_model_command(args): - global event_time_model_version + global event_time_model_version, event_time_model st = time.time() real_args = _real_command_args(args) @@ -629,25 +968,50 @@ def load_event_time_model_command(args): "error": f"model path does not exist: {model_path}", } - event_time_model.load(model_path) + target = real_args[1] if len(real_args) >= 2 else "all" + + if target in ("", "all", "*"): + event_time_models.load(model_path) + model_id = "all" + elif len(real_args) >= 3: + model_scope, model_key, model_id = normalize_model_identity(real_args[1], real_args[2]) + event_time_models.load(model_path, model_scope, model_key) + else: + model_scope, model_key, model_id = normalize_model_identity("switch", target) + event_time_models.load(model_path, model_scope, model_key) + + event_time_model = event_time_models.get() event_time_model_version += 1 return { "status": "done", "et": str(time.time() - st), "path": str(model_path), + "target": model_id, "model_version": str(event_time_model_version), } def event_time_model_status_command(args): st = time.time() + real_args = _real_command_args(args) + + if not real_args or real_args[0] in ("", "all", "*"): + model_scope = "all" + model_key = "" + model_id = "all" + elif len(real_args) >= 2: + model_scope, model_key, model_id = normalize_model_identity(real_args[0], real_args[1]) + else: + model_scope, model_key, model_id = normalize_model_identity("switch", real_args[0]) + ret = { "status": "done", "et": str(time.time() - st), + "target": model_id, "model_version": str(event_time_model_version), } - ret.update(event_time_model.status()) + ret.update(event_time_models.status(model_scope, model_key)) return ret @@ -663,15 +1027,24 @@ def launch_event_time_inferencing(args, bindata): requested_count = 1 payload = bindata.decode("utf-8", errors="replace").strip() - predictions = event_time_model.predict_from_text( + rows = list(iter_event_time_rows_from_payload(payload) or []) + + if rows: + model_scope, model_key, model_id = event_time_model_identity_from_row(rows[0]) + else: + model_scope, model_key, model_id = normalize_model_identity() + + model = event_time_models.get(model_scope, model_key) + predictions = model.predict_from_text( payload, requested_count=max(1, requested_count), ) predictions_str = " ".join(str(float(x)) for x in predictions) director_debug( - f"[event-time inference] requested_count={requested_count} " - f"payload_bytes={len(payload)} predictions={predictions_str}" + f"[event-time inference] model={model_id} requested_count={requested_count} " + f"payload_bytes={len(payload)} trained={int(model.trained)} " + f"rows={len(model.rows)} predictions={predictions_str}" ) return ("done", time.time() - st, predictions_str) @@ -695,6 +1068,7 @@ def _real_command_args(args): return out + def train_iteration_time_model_command(args): global iteration_model_version @@ -875,7 +1249,6 @@ def load_iteration_records_csv_command(args): "loaded_clients": str(len(loaded_clients)), } - def load_iteration_time_model_command(args): global iteration_model_version @@ -916,6 +1289,7 @@ def load_iteration_time_model_command(args): } + def iteration_time_model_status_command(args): st = time.time() real_args = _real_command_args(args) @@ -960,8 +1334,6 @@ def iteration_time_model_status_command(args): "clients": ";".join(per_client), } - -# Backwards-compatible wrapper for the old command name. def launch_surrogate_inferencing(args, bindata): return launch_iteration_time_inferencing(args, bindata) @@ -991,7 +1363,12 @@ def director_request_command(msg, bindata): family = str(msg.get("surrogate_family", "iteration-time")).strip() operation = str(msg.get("operation", "")).strip() backend = str(msg.get("surrogate_backend", "")).strip() - args = _real_command_args(msg.get("args", [])) + # Do not normalize here. Each command handler normalizes exactly once. + # + # Normalizing here and again in receiverecords()/inference breaks client 1: + # ["2", "1", "9"] -> ["1", "9"] -> ["9"] + # so the server drops client 1 records and later returns empty predictions. + args = msg.get("args", []) operation_aliases = { "status": "model-status", From 0318344e96a45fef549765a354c2a9bce5e82e7d Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Mon, 22 Jun 2026 12:55:02 -0400 Subject: [PATCH 02/14] Set swm and torch to disabled by default --- CODES-compile-instructions.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CODES-compile-instructions.sh b/CODES-compile-instructions.sh index 79d64c3f..2b4f781e 100644 --- a/CODES-compile-instructions.sh +++ b/CODES-compile-instructions.sh @@ -3,9 +3,9 @@ set -euo pipefail set -x # Switches -swm_enable=1 +swm_enable=0 union_enable=0 -torch_enable=1 +torch_enable=0 # Uncomment below for MPICH #export PATH=/usr/local/mpich-4.1.2/bin/:"$PATH" From 83ea73ea793a07a879e55bf7da4d6a0aa5674d1d Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Mon, 22 Jun 2026 15:52:10 -0400 Subject: [PATCH 03/14] Stale code cleanup --- src/networks/model-net/dragonfly-dally.C | 41 +++++-------- src/surrogate/zmqml/zmqmlserver.py | 77 +----------------------- 2 files changed, 18 insertions(+), 100 deletions(-) diff --git a/src/networks/model-net/dragonfly-dally.C b/src/networks/model-net/dragonfly-dally.C index f97e71c8..5581063a 100644 --- a/src/networks/model-net/dragonfly-dally.C +++ b/src/networks/model-net/dragonfly-dally.C @@ -2716,14 +2716,8 @@ static void dragonfly_read_config(const char* anno, dragonfly_param* params) { char event_time_inference_enabled_str[MAX_NAME_LENGTH]; event_time_inference_enabled_str[0] = '\0'; - char const* inferencing_enabled_env = getenv("INFERENCING_ENABLED"); - if (inferencing_enabled_env && strlen(inferencing_enabled_env) > 0) { - snprintf(event_time_inference_enabled_str, sizeof(event_time_inference_enabled_str), "%s", - inferencing_enabled_env); - } else { - configuration_get_value(&config, "DIRECTOR", "inferencing_enabled", anno, - event_time_inference_enabled_str, MAX_NAME_LENGTH); - } + configuration_get_value(&config, "DIRECTOR", "inferencing_enabled", anno, + event_time_inference_enabled_str, MAX_NAME_LENGTH); /* * Do not expose a separate event-time inference flag. @@ -2758,14 +2752,8 @@ static void dragonfly_read_config(const char* anno, dragonfly_param* params) { char event_time_training_enabled_str[MAX_NAME_LENGTH]; event_time_training_enabled_str[0] = '\0'; - char const* training_enabled_env = getenv("TRAINING_ENABLED"); - if (training_enabled_env && strlen(training_enabled_env) > 0) { - snprintf(event_time_training_enabled_str, sizeof(event_time_training_enabled_str), "%s", - training_enabled_env); - } else { - configuration_get_value(&config, "DIRECTOR", "training_enabled", anno, - event_time_training_enabled_str, MAX_NAME_LENGTH); - } + configuration_get_value(&config, "DIRECTOR", "training_enabled", anno, + event_time_training_enabled_str, MAX_NAME_LENGTH); event_time_surrogate_family_selected = strcmp(event_time_surrogate_family_str, "event-time") == 0; @@ -2786,15 +2774,6 @@ static void dragonfly_read_config(const char* anno, dragonfly_param* params) { event_time_zmq_flush_registered = 1; } - if (dfdally_surrogate_debug_prints) { - fprintf(stderr, - "[event-time records] family=%s training_enabled=%s send_to_zmq=%d batch_size=%d\n", - event_time_surrogate_family_str, event_time_training_enabled_str, - event_time_training_records_enabled, event_time_zmq_batch_size); - fflush(stderr); - } - - // START Surrogate configuration char enable_str[MAX_NAME_LENGTH]; enable_str[0] = '\0'; @@ -2819,6 +2798,18 @@ static void dragonfly_read_config(const char* anno, dragonfly_param* params) { dfdally_surrogate_debug_prints = dfdally_string_is_true(debug_prints_str); + if(dfdally_surrogate_debug_prints) { + fprintf( + stderr, + "[event-time records] family=%s training_enabled=%s send_to_zmq=%d batch_size=%d\n", + event_time_surrogate_family_str, + event_time_training_enabled_str, + event_time_training_records_enabled, + event_time_zmq_batch_size + ); + fflush(stderr); + } + // if surrogate mode has been set up if (enable_network_surrogate) { struct network_surrogate_config surr_conf = { diff --git a/src/surrogate/zmqml/zmqmlserver.py b/src/surrogate/zmqml/zmqmlserver.py index 9631c106..4e65588c 100755 --- a/src/surrogate/zmqml/zmqmlserver.py +++ b/src/surrogate/zmqml/zmqmlserver.py @@ -103,70 +103,6 @@ def model_identity_from_real_args( -class ScopedIterationTimeModelRegistry: - """Compatibility facade that keeps iteration-time unscoped. - - Event-time uses scoped models, but iteration-time should remain the original - client/app-level model. The surrounding unified Director request code may - still pass model_scope/model_key arguments; this facade deliberately ignores - them and routes all iteration-time records/inference to one plain - IterationTimeModelRegistry. - """ - - def __init__(self, kwargs: dict): - self.kwargs = dict(kwargs) - self.registry = IterationTimeModelRegistry(**self.kwargs) - self.debug = False - - def set_debug(self, enabled: bool) -> None: - self.debug = bool(enabled) - self.registry.set_debug(self.debug) - - def get_registry(self, model_scope: str | None = None, model_key: str | None = None) -> IterationTimeModelRegistry: - return self.registry - - def get(self, client_id: int, model_scope: str | None = None, model_key: str | None = None): - return self.registry.get(client_id) - - def set_client_app_id(self, client_id: int, app_id: int, model_scope: str | None = None, model_key: str | None = None) -> None: - self.registry.set_client_app_id(client_id, app_id) - - def predict(self, client_id: int, requested_horizon: int | None = None, model_scope: str | None = None, model_key: str | None = None): - return self.registry.predict(client_id, requested_horizon) - - def train_or_update(self, model_scope: str | None = None, model_key: str | None = None) -> bool: - return self.registry.train_or_update() - - def save(self, path: str) -> None: - self.registry.save(path) - - def load(self, path: str) -> None: - self.registry.load(path) - self.registry.set_debug(self.debug) - - def status(self, model_scope: str | None = None, model_key: str | None = None) -> dict: - client_ids = sorted(self.registry.models.keys()) - total_clients = len(client_ids) - trained_clients = 0 - total_records = 0 - client_summaries = [] - - for client_id in client_ids: - model = self.registry.get(client_id) - records = len(model.records) - trained = bool(model.trained) - total_records += records - trained_clients += int(trained) - client_summaries.append( - f"{client_id}:records={records},trained={int(trained)}" - ) - - return { - "total_clients": str(total_clients), - "trained_clients": str(trained_clients), - "total_records": str(total_records), - "clients": ";".join(client_summaries), - } class ScopedEventTimeModelRegistry: def __init__(self, kwargs: dict): @@ -293,7 +229,6 @@ def status(self, model_scope: str | None = None, model_key: str | None = None) - iteration_time_models = IterationTimeModelRegistry(**ITERATION_MODEL_KWARGS) event_time_models = ScopedEventTimeModelRegistry(EVENT_TIME_MODEL_KWARGS) -# Compatibility alias for old helper code that still references event_time_model. event_time_model = event_time_models.get() iteration_model_path = os.environ.get("ZMQML_ITERATION_MODEL_PATH", "").strip() event_time_model_path = os.environ.get("ZMQML_EVENT_TIME_MODEL_PATH", "").strip() @@ -303,10 +238,7 @@ def status(self, model_scope: str | None = None, model_key: str | None = None) - app_alloc_path = os.environ.get("ZMQML_APP_ALLOC_PATH", "").strip() auto_train_on_records = os.environ.get( - "ZMQML_AUTO_TRAIN_ON_RECORDS", "1" -).strip().lower() in ("1", "true", "yes", "on") -event_time_auto_train_on_records = os.environ.get( - "ZMQML_EVENT_TIME_AUTO_TRAIN_ON_RECORDS", "0" + "ZMQML_AUTO_TRAIN_ON_RECORDS", "0" ).strip().lower() in ("1", "true", "yes", "on") iteration_model_version = 0 @@ -811,7 +743,7 @@ def receive_event_time_records(args, bindata): loaded_rows += accepted per_model_loaded[model_id] = accepted - if accepted > 0 and event_time_auto_train_on_records: + if accepted > 0 and auto_train_on_records: model.train_or_update() if loaded_rows > 0: @@ -1363,11 +1295,6 @@ def director_request_command(msg, bindata): family = str(msg.get("surrogate_family", "iteration-time")).strip() operation = str(msg.get("operation", "")).strip() backend = str(msg.get("surrogate_backend", "")).strip() - # Do not normalize here. Each command handler normalizes exactly once. - # - # Normalizing here and again in receiverecords()/inference breaks client 1: - # ["2", "1", "9"] -> ["1", "9"] -> ["9"] - # so the server drops client 1 records and later returns empty predictions. args = msg.get("args", []) operation_aliases = { From 8c2a44a6f07170b3d861771803127946ecae750a Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Tue, 23 Jun 2026 10:48:13 -0400 Subject: [PATCH 04/14] Fix argument count for director surrogate --- src/surrogate/director-client.C | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/surrogate/director-client.C b/src/surrogate/director-client.C index 5b658d07..a588ef3f 100644 --- a/src/surrogate/director-client.C +++ b/src/surrogate/director-client.C @@ -1005,8 +1005,8 @@ void director_get_surrogate_prediction(director_state* s, tw_bf* bf, director_me char args[DIR_ZMQ_ARG_LENGTH]; sprintf(commandstr, "%s", "director-request"); - sprintf(args, "%d;%llu;%d;", 3, (unsigned long long)s->director_id, - DIR_MAX_PREDICTION); // num-of-args;num-record + sprintf(args, "%d;%llu;%d;", 2, (unsigned long long)s->director_id, + DIR_MAX_PREDICTION); // num-of-args;client-id;num-predictions // The Python side primarily uses records previously sent through // send-records. Keep the payload empty for now rather than sending From c22b9684b1e961e777839050f5a7d04d306f0346 Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Tue, 23 Jun 2026 11:04:17 -0400 Subject: [PATCH 05/14] Fix clang format issues --- codes/surrogate/director-client.h | 2 +- src/networks/model-net/dragonfly-dally.C | 43 ++++++++------------- src/surrogate/director-client.C | 49 ++++++++---------------- 3 files changed, 35 insertions(+), 59 deletions(-) diff --git a/codes/surrogate/director-client.h b/codes/surrogate/director-client.h index b9c4b3fd..fa563c6a 100644 --- a/codes/surrogate/director-client.h +++ b/codes/surrogate/director-client.h @@ -124,7 +124,7 @@ extern "C" { // const char* data); -extern void director_lp_register_model(const char *); +extern void director_lp_register_model(const char*); extern void director_record_external_zmq_latency(double processing_sec, double total_sec); /* extern void director_parse_args(char *args, int **args_array, int *length); diff --git a/src/networks/model-net/dragonfly-dally.C b/src/networks/model-net/dragonfly-dally.C index 5581063a..780b39ed 100644 --- a/src/networks/model-net/dragonfly-dally.C +++ b/src/networks/model-net/dragonfly-dally.C @@ -65,18 +65,13 @@ * resolve it to null.) */ #ifdef USE_ZMQML -extern std::vector zmqml_director_request( - const std::string& surrogate_family, - const std::string& surrogate_backend, - const std::string& operation, - const std::vector& args, - const std::string& bindata -) __attribute__((weak)); - -extern "C" void director_record_external_zmq_latency( - double processing_sec, - double total_sec -) __attribute__((weak)); +extern std::vector +zmqml_director_request(const std::string& surrogate_family, const std::string& surrogate_backend, + const std::string& operation, const std::vector& args, + const std::string& bindata) __attribute__((weak)); + +extern "C" void director_record_external_zmq_latency(double processing_sec, double total_sec) + __attribute__((weak)); extern void director_record_zmq_latency_stats(const char* label, const std::vector& ret, @@ -127,7 +122,7 @@ extern void director_record_zmq_latency_stats(const char* label, #define ALWAYS_DETERMINISTIC_NETWORK 0 #define num_chunks_for(message_size, chunk_size) \ - ((message_size) ? ((message_size) + (chunk_size) - 1) / (chunk_size) : 1) + ((message_size) ? ((message_size) + (chunk_size)-1) / (chunk_size) : 1) /* handles terminal and router events like packet generate/send/receive/buffer */ typedef struct terminal_state terminal_state; @@ -308,7 +303,7 @@ static std::vector dfdally_event_time_director_request_with_latency double zmq_processing_time = 0.0; if (ret.size() > 1) { - char *endptr = NULL; + char* endptr = NULL; double parsed = strtod(ret[1].c_str(), &endptr); if (endptr != ret[1].c_str() && isfinite(parsed) && parsed >= 0.0) { @@ -2717,7 +2712,7 @@ static void dragonfly_read_config(const char* anno, dragonfly_param* params) { event_time_inference_enabled_str[0] = '\0'; configuration_get_value(&config, "DIRECTOR", "inferencing_enabled", anno, - event_time_inference_enabled_str, MAX_NAME_LENGTH); + event_time_inference_enabled_str, MAX_NAME_LENGTH); /* * Do not expose a separate event-time inference flag. @@ -2753,7 +2748,7 @@ static void dragonfly_read_config(const char* anno, dragonfly_param* params) { event_time_training_enabled_str[0] = '\0'; configuration_get_value(&config, "DIRECTOR", "training_enabled", anno, - event_time_training_enabled_str, MAX_NAME_LENGTH); + event_time_training_enabled_str, MAX_NAME_LENGTH); event_time_surrogate_family_selected = strcmp(event_time_surrogate_family_str, "event-time") == 0; @@ -2798,15 +2793,11 @@ static void dragonfly_read_config(const char* anno, dragonfly_param* params) { dfdally_surrogate_debug_prints = dfdally_string_is_true(debug_prints_str); - if(dfdally_surrogate_debug_prints) { - fprintf( - stderr, - "[event-time records] family=%s training_enabled=%s send_to_zmq=%d batch_size=%d\n", - event_time_surrogate_family_str, - event_time_training_enabled_str, - event_time_training_records_enabled, - event_time_zmq_batch_size - ); + if (dfdally_surrogate_debug_prints) { + fprintf(stderr, + "[event-time records] family=%s training_enabled=%s send_to_zmq=%d batch_size=%d\n", + event_time_surrogate_family_str, event_time_training_enabled_str, + event_time_training_records_enabled, event_time_zmq_batch_size); fflush(stderr); } @@ -7275,7 +7266,7 @@ static void router_packet_receive(router_state* s, tw_bf* bf, terminal_dally_mes if (output_chan >= s->params->num_vcs || output_chan < 0) tw_error(TW_LOC, "\n Output channel %d great than available VCs %d", output_chan, s->params->num_vcs - 1); - //cur_chunk->msg.packet_ID, output_chan, output_port, s->router_id, dest_router_id, cur_chunk->msg.path_type, src_grp_id, dest_grp_id, msg->src_terminal_id); + //cur_chunk->msg.packet_ID, output_chan, output_port, s->router_id, dest_router_id, cur_chunk->msg.path_type, src_grp_id, dest_grp_id, msg->src_terminal_id); #if DEBUG == 1 if (cur_chunk->msg.packet_ID == LLU(TRACK_PKT) && cur_chunk->msg.src_terminal_id == T_ID) diff --git a/src/surrogate/director-client.C b/src/surrogate/director-client.C index a588ef3f..b6f50757 100644 --- a/src/surrogate/director-client.C +++ b/src/surrogate/director-client.C @@ -86,10 +86,7 @@ std::vector director_client_request_family(const char* surrogate_fa int surrogate_enabled = 0; int inferencing_enabled = 1; -static void director_record_zmq_latency_values( - double processing_sec, - double total_sec) -{ +static void director_record_zmq_latency_values(double processing_sec, double total_sec) { if (evaluate_perf != 1) { return; } @@ -106,50 +103,39 @@ static void director_record_zmq_latency_values( director_zmq_total_elapsed_times.push_back(total_sec); } -static void director_record_zmq_latency_stats( - const char* label, - const std::vector& ret, - double local_latency_sec) -{ +static void director_record_zmq_latency_stats(const char* label, + const std::vector& ret, + double local_latency_sec) { double zmq_processing_time = 0.0; if (ret.size() > 1) { - char *endptr = NULL; + char* endptr = NULL; double parsed = strtod(ret[1].c_str(), &endptr); if (endptr != ret[1].c_str() && std::isfinite(parsed) && parsed >= 0.0) { zmq_processing_time = parsed; } else if (director_debug_prints) { - fprintf( - stderr, - "[DIR] Warning: could not parse zmq processing time from reply field ret[1]=%s request=%s\n", - ret[1].c_str(), - label ? label : "" - ); + fprintf(stderr, + "[DIR] Warning: could not parse zmq processing time from reply field ret[1]=%s " + "request=%s\n", + ret[1].c_str(), label ? label : ""); fflush(stderr); } } else if (director_debug_prints) { - fprintf( - stderr, - "[DIR] Warning: zmq reply too short for perf timing: ret.size()=%llu request=%s\n", - (unsigned long long)ret.size(), - label ? label : "" - ); + fprintf(stderr, + "[DIR] Warning: zmq reply too short for perf timing: ret.size()=%llu request=%s\n", + (unsigned long long)ret.size(), label ? label : ""); fflush(stderr); } director_record_zmq_latency_values(zmq_processing_time, local_latency_sec); } -extern "C" void director_record_external_zmq_latency( - double processing_sec, - double total_sec) -{ +extern "C" void director_record_external_zmq_latency(double processing_sec, double total_sec) { director_record_zmq_latency_values(processing_sec, total_sec); } -static int director_surrogate_family_is(const char *family) -{ +static int director_surrogate_family_is(const char* family) { return strcmp(director_config_global.surrogate_family, family) == 0; } @@ -1362,8 +1348,7 @@ static void director_print_zmq_latency_stats_once(void) { } -void director_finalize(director_state* s, tw_lp* lp) -{ +void director_finalize(director_state* s, tw_lp* lp) { director_print_zmq_latency_stats_once(); /* @@ -1390,9 +1375,9 @@ tw_lptype dir_lp = {(init_f)director_init, (map_f)codes_mapping, sizeof(director_state)}; -extern void director_lp_register_model(const char * dir_lp_name){ +extern void director_lp_register_model(const char* dir_lp_name) { int num_dir_per_mgrp = codes_mapping_get_lp_count("MODELNET_GRP", 1, "dir-nw-lp", NULL, 0); - if(num_dir_per_mgrp > 0){ + if (num_dir_per_mgrp > 0) { lp_type_register(dir_lp_name, &dir_lp); } } From d5f784dace3b5ee64e8154a946b38808b1737257 Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Tue, 23 Jun 2026 11:15:45 -0400 Subject: [PATCH 06/14] Formatted files with clang-format-20 This commit formats files with clang-format-20 ,which is used by the CI, instead of just clang-format. --- src/networks/model-net/dragonfly-dally.C | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/networks/model-net/dragonfly-dally.C b/src/networks/model-net/dragonfly-dally.C index 780b39ed..d5470dfa 100644 --- a/src/networks/model-net/dragonfly-dally.C +++ b/src/networks/model-net/dragonfly-dally.C @@ -122,7 +122,7 @@ extern void director_record_zmq_latency_stats(const char* label, #define ALWAYS_DETERMINISTIC_NETWORK 0 #define num_chunks_for(message_size, chunk_size) \ - ((message_size) ? ((message_size) + (chunk_size)-1) / (chunk_size) : 1) + ((message_size) ? ((message_size) + (chunk_size) - 1) / (chunk_size) : 1) /* handles terminal and router events like packet generate/send/receive/buffer */ typedef struct terminal_state terminal_state; @@ -7266,7 +7266,7 @@ static void router_packet_receive(router_state* s, tw_bf* bf, terminal_dally_mes if (output_chan >= s->params->num_vcs || output_chan < 0) tw_error(TW_LOC, "\n Output channel %d great than available VCs %d", output_chan, s->params->num_vcs - 1); - //cur_chunk->msg.packet_ID, output_chan, output_port, s->router_id, dest_router_id, cur_chunk->msg.path_type, src_grp_id, dest_grp_id, msg->src_terminal_id); + //cur_chunk->msg.packet_ID, output_chan, output_port, s->router_id, dest_router_id, cur_chunk->msg.path_type, src_grp_id, dest_grp_id, msg->src_terminal_id); #if DEBUG == 1 if (cur_chunk->msg.packet_ID == LLU(TRACK_PKT) && cur_chunk->msg.src_terminal_id == T_ID) From e9379682be37e7da223cd9e18618295f03fabe05 Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Tue, 23 Jun 2026 11:19:48 -0400 Subject: [PATCH 07/14] Add clang-format major release version to README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 92b61cf4..56820db6 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,7 @@ This repo uses [clang-format](https://clang.llvm.org/docs/ClangFormat.html) to k - **Emacs:** see [clang-format.el](https://clang.llvm.org/docs/ClangFormat.html#emacs-integration). To reformat a file manually: `clang-format -i path/to/file.c`. CI runs `clang-format --dry-run --Werror` on every PR and rejects any drift, so PRs with unformatted code don't merge. +Note: The CI uses clang-format major release version 20, so you should format your files with that version. ### Determinism From 9ab9deb920ffb2f6d0858b9b8a46bfc5a51091c5 Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Wed, 1 Jul 2026 14:47:29 -0400 Subject: [PATCH 08/14] Add tests for director surrogate simulations --- .github/workflows/zmqml-hybrid.yml | 143 ++++++++++++++++++ src/surrogate/zmqml/Makefile | 2 +- src/surrogate/zmqml/zmqmlrequester.cpp | 11 +- tests/CMakeLists.txt | 7 + tests/zmqml-event-time-hybrid-workflow.sh | 138 +++++++++++++++++ tests/zmqml-iteration-time-hybrid-workflow.sh | 125 +++++++++++++++ 6 files changed, 424 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/zmqml-hybrid.yml create mode 100755 tests/zmqml-event-time-hybrid-workflow.sh create mode 100755 tests/zmqml-iteration-time-hybrid-workflow.sh diff --git a/.github/workflows/zmqml-hybrid.yml b/.github/workflows/zmqml-hybrid.yml new file mode 100644 index 00000000..0fac8283 --- /dev/null +++ b/.github/workflows/zmqml-hybrid.yml @@ -0,0 +1,143 @@ +name: ZMQML Hybrid + +on: + pull_request: + branches: + - master + push: + branches: + - master + +env: + ROSS_REF: 9b6ccb18f9b9db438bf41b5b221d0ef16a4dac48 + ZMQML_IMAGE: ghcr.io/codes-org/codes-ci-full:latest + +jobs: + zmqml-hybrid: + name: zmqml hybrid workflows + runs-on: ubuntu-24.04 + + steps: + - name: Checkout CODES + uses: actions/checkout@v4 + with: + path: codes + + - name: Checkout ROSS + uses: actions/checkout@v4 + with: + repository: ROSS-org/ROSS + ref: ${{ github.event_name == 'schedule' && 'master' || env.ROSS_REF }} + path: ross + + - name: Pull full dependency image + run: docker pull "$ZMQML_IMAGE" + + - name: Create Docker network + run: docker network create codes-zmqml-ci + + - name: Start ZMQML server container + run: | + mkdir -p "$PWD/zmqml-artifacts" + + docker run -d \ + --name zmqml-server \ + --network codes-zmqml-ci \ + -v "$PWD/codes:/work/codes" \ + -v "$PWD/zmqml-artifacts:/work/zmqml-artifacts" \ + -w /work/codes/src/surrogate/zmqml \ + -e ZMQML_ITERATION_HISTORY_LEN=2 \ + -e ZMQML_ITERATION_HORIZON=3 \ + -e ZMQML_ITERATION_TRAIN_STRIDE=1 \ + -e ZMQML_EVENT_TIME_MIN_ROWS=4 \ + -e ZMQML_EVENT_TIME_EPOCHS=2 \ + -e ZMQML_RECORD_LOG_PATH=/work/zmqml-artifacts/iteration-records.csv \ + -e ZMQML_EVENT_TIME_RECORD_LOG_PATH=/work/zmqml-artifacts/event-time-records.csv \ + "$ZMQML_IMAGE" \ + bash -lc 'apt-get update && apt-get install -y python3-zmq python3-numpy python3-sklearn python3-pandas python3-pip + python3 -c 'import importlib.util, subprocess, sys; sys.exit(0) if importlib.util.find_spec("torch") else subprocess.check_call([sys.executable, "-m", "pip", "install", "--break-system-packages", "torch", "--index-url", "https://download.pytorch.org/whl/cpu"])' python3-numpy python3-sklearn python3-pip && python3 -c 'import importlib.util, subprocess, sys; sys.exit(0) if importlib.util.find_spec("torch") else subprocess.check_call([sys.executable, "-m", "pip", "install", "--break-system-packages", "torch", "--index-url", "https://download.pytorch.org/whl/cpu"])' && exec python3 -u zmqmlserver.py' + + - name: Build ROSS and CODES with ZMQML + run: | + docker run --rm \ + --name codes-zmqml-build \ + --network codes-zmqml-ci \ + -v "$PWD/codes:/work/codes" \ + -v "$PWD/ross:/work/ross" \ + -v "$PWD/ross-install:/work/ross-install" \ + -w /work \ + "$ZMQML_IMAGE" \ + bash -euxo pipefail -c ' + cmake -S ross -B ross/build -G Ninja \ + -DCMAKE_BUILD_TYPE=Debug \ + -DROSS_BUILD_MODELS=ON \ + -DCMAKE_INSTALL_PREFIX=/work/ross-install + + cmake --build ross/build --target install -j + + cd /work/codes/src/surrogate/zmqml + make clean + make + + cd /work/codes + rm -rf build + cmake -S . -B build -G Ninja \ + -DCMAKE_BUILD_TYPE=Debug \ + -DBUILD_TESTING=ON \ + -DUSE_ONLINE=ON \ + -DUSE_SWM=ON \ + -DUSE_TORCH=ON \ + -DUSE_ZMQML=ON \ + -DZMQML_BUILD_PATH=/work/codes/src/surrogate/zmqml \ + -DCMAKE_C_COMPILER=mpicc \ + -DCMAKE_CXX_COMPILER=mpicxx \ + -DROSS_PKG_CONFIG_PATH=/work/ross-install/lib/pkgconfig \ + -DTorch_DIR="$(python3 -c "import torch; print(torch.utils.cmake_prefix_path)")/Torch" + + cmake --build build -j + ' + + - name: Run ZMQML hybrid tests + run: | + docker run --rm \ + --name codes-zmqml-tests \ + --network codes-zmqml-ci \ + -v "$PWD/codes:/work/codes" \ + -v "$PWD/zmqml-artifacts:/work/zmqml-artifacts" \ + -w /work/codes \ + -e ZMQML_ENDPOINT=tcp://zmqml-server:5555 \ + -e ZMQML_TEST_NP=1 \ + "$ZMQML_IMAGE" \ + bash -euxo pipefail -c ' + apt-get update + apt-get install -y python3-zmq + + ctest --test-dir build \ + -R "zmqml-(iteration-time|event-time)-hybrid-workflow.sh" \ + --output-on-failure \ + --timeout 600 \ + -VV + ' + + - name: Dump ZMQML server logs + if: always() + run: docker logs zmqml-server || true + + - name: Stop ZMQML server + if: always() + run: | + docker rm -f zmqml-server || true + docker network rm codes-zmqml-ci || true + + - name: Upload logs on failure + if: failure() + uses: actions/upload-artifact@v4 + with: + name: zmqml-hybrid-logs + path: | + codes/build/Testing/Temporary/LastTest.log + codes/build/Testing/Temporary/LastTestsFailed.log + codes/build/testing-output/** + zmqml-artifacts/** + if-no-files-found: ignore + retention-days: 14 diff --git a/src/surrogate/zmqml/Makefile b/src/surrogate/zmqml/Makefile index b4abcfab..c760d699 100644 --- a/src/surrogate/zmqml/Makefile +++ b/src/surrogate/zmqml/Makefile @@ -7,7 +7,7 @@ TARGETS=libzmqmlrequester.so demozmqmlrequester all: $(TARGETS) libzmqmlrequester.so: zmqmlrequester.o - $(CXX) -shared -o $@ $^ $(LDFLAGS) + $(CXX) -shared -Wl,-soname,libzmqmlrequester.so -o $@ $^ $(LDFLAGS) zmqmlrequester.o: zmqmlrequester.cpp zmqmlrequester.h $(CXX) $(CXXFLAGS) -fPIC -c $< -o $@ diff --git a/src/surrogate/zmqml/zmqmlrequester.cpp b/src/surrogate/zmqml/zmqmlrequester.cpp index 13452b62..af631363 100644 --- a/src/surrogate/zmqml/zmqmlrequester.cpp +++ b/src/surrogate/zmqml/zmqmlrequester.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include "rapidjson/document.h" @@ -18,7 +19,15 @@ using namespace std; using namespace rapidjson; -static string endpoint = "tcp://localhost:5555"; +static string endpoint_from_env() { + const char* env = getenv("ZMQML_ENDPOINT"); + if (env && env[0] != '\0') { + return string(env); + } + return "tcp://localhost:5555"; +} + +static string endpoint = endpoint_from_env(); static int debug = 0; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 6390965c..dfb8130f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -109,6 +109,13 @@ if(USE_UNION) ) endif() +if(USE_ZMQML) + list(APPEND test-shell-files + zmqml-iteration-time-hybrid-workflow.sh + zmqml-event-time-hybrid-workflow.sh + ) +endif() + foreach(testname ${test-shell-files}) add_test(NAME ${testname} COMMAND "${CMAKE_CURRENT_BINARY_DIR}/run-test.sh" "${CMAKE_CURRENT_SOURCE_DIR}/${testname}" diff --git a/tests/zmqml-event-time-hybrid-workflow.sh b/tests/zmqml-event-time-hybrid-workflow.sh new file mode 100755 index 00000000..e356bee9 --- /dev/null +++ b/tests/zmqml-event-time-hybrid-workflow.sh @@ -0,0 +1,138 @@ +#!/bin/bash +set -euo pipefail + +if [[ -z "${bindir:-}" ]]; then + echo "bindir variable not set" + exit 1 +fi + +if [[ -z "${srcdir:-}" ]]; then + echo "srcdir variable not set" + exit 1 +fi + +endpoint="${ZMQML_ENDPOINT:-tcp://localhost:5555}" +np="${ZMQML_TEST_NP:-1}" + +expfolder="$PWD" +artifacts="$expfolder/zmqml-event-artifacts" +mkdir -p "$artifacts" + +train_workdir="$artifacts/train-run" +hybrid_workdir="$artifacts/hybrid-run" +mkdir -p "$train_workdir" "$hybrid_workdir" + +cp "$bindir/doc/example/kb.dfdally-72-milc-small.json" "$expfolder/" +cp "$bindir/doc/example/kb.dfdally-72-milc-small.alloc.conf" "$expfolder/" + +# Keep the event-time CI test small. The stock 10-iteration MILC run +# processes millions of events and makes the hybrid event-time inference +# phase too large for a smoke test. +python3 - "$expfolder/kb.dfdally-72-milc-small.json" <<'PYJSON' +import json +import sys +from pathlib import Path + +path = Path(sys.argv[1]) +data = json.loads(path.read_text()) +data["jobs"]["cfg"]["iteration_cnt"] = 4 +path.write_text(json.dumps(data, indent=2) + "\n") +PYJSON + +printf '72 %s/kb.dfdally-72-milc-small.json 1 0\n' "$expfolder" \ + > "$expfolder/kb.dfdally-72-milc-small.workload.conf" + +make_event_conf() { + local out="$1" + local inferencing="$2" + local training="$3" + + SURROGATE_BACKEND=dragonfly-dally \ + SURROGATE_FAMILY=event-time \ + START_ITER=1 \ + END_ITER=2 \ + RETRAIN_ENABLED=0 \ + RETRAIN_ITER=-1 \ + RETRAIN_SAVE_PATH="" \ + SECOND_SURROGATE_ENABLED=0 \ + SECOND_START_ITER=100000 \ + SECOND_END_ITER=100001 \ + INFERENCING_ENABLED="$inferencing" \ + SURROGATE_ENABLED=1 \ + TRAINING_ENABLED="$training" \ + DIRECTOR_DEBUG_PRINTS=1 \ + SHUTDOWN_ZMQML_SERVER_ON_FINALIZE=0 \ + envsubst < "$bindir/doc/example/kb.dfdally-72-event-time-director.template.conf.in" > "$out" +} + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family event-time \ + status + +# Phase 1: high-fidelity event-time record collection. +make_event_conf "$expfolder/event-time-train.conf" 0 1 + +( + cd "$train_workdir" + ZMQML_ENDPOINT="$endpoint" \ + mpirun -np "$np" "$bindir/src/model-net-mpi-replay" \ + --synch=1 \ + --workload_type=swm-online \ + --disable_compute=0 \ + --payload_sz=4096 \ + --workload_conf_file="$expfolder/kb.dfdally-72-milc-small.workload.conf" \ + --alloc_file="$expfolder/kb.dfdally-72-milc-small.alloc.conf" \ + -- "$expfolder/event-time-train.conf" +) > "$artifacts/event-time-train.out" \ + 2> "$artifacts/event-time-train.err" + +grep 'Net Events Processed' "$artifacts/event-time-train.out" +grep -E '\[event-time records\].*send_to_zmq=1' "$artifacts/event-time-train.err" + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family event-time \ + train \ + > "$artifacts/event-time-train-model.json" + +grep '"status": "done"' "$artifacts/event-time-train-model.json" + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family event-time \ + save "$artifacts/event-time-models" \ + > "$artifacts/event-time-save-model.json" + +test -f "$artifacts/event-time-models/manifest.json" +grep '"status": "done"' "$artifacts/event-time-save-model.json" + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family event-time \ + load "$artifacts/event-time-models" \ + > "$artifacts/event-time-load-model.json" + +grep '"status": "done"' "$artifacts/event-time-load-model.json" + +# Phase 2: hybrid event-time inference. +make_event_conf "$expfolder/event-time-hybrid.conf" 1 0 + +( + cd "$hybrid_workdir" + ZMQML_ENDPOINT="$endpoint" \ + mpirun -np "$np" "$bindir/src/model-net-mpi-replay" \ + --synch=1 \ + --workload_type=swm-online \ + --disable_compute=0 \ + --payload_sz=4096 \ + --workload_conf_file="$expfolder/kb.dfdally-72-milc-small.workload.conf" \ + --alloc_file="$expfolder/kb.dfdally-72-milc-small.alloc.conf" \ + -- "$expfolder/event-time-hybrid.conf" +) > "$artifacts/event-time-hybrid.out" \ + 2> "$artifacts/event-time-hybrid.err" + +grep 'Net Events Processed' "$artifacts/event-time-hybrid.out" +grep -E '\[event-time surrogate\] active=1|\[event-time inference cache-(hit|miss)\]' "$artifacts/event-time-hybrid.err" + +exit 0 diff --git a/tests/zmqml-iteration-time-hybrid-workflow.sh b/tests/zmqml-iteration-time-hybrid-workflow.sh new file mode 100755 index 00000000..bdd3bba0 --- /dev/null +++ b/tests/zmqml-iteration-time-hybrid-workflow.sh @@ -0,0 +1,125 @@ +#!/bin/bash +set -euo pipefail + +if [[ -z "${bindir:-}" ]]; then + echo "bindir variable not set" + exit 1 +fi + +if [[ -z "${srcdir:-}" ]]; then + echo "srcdir variable not set" + exit 1 +fi + +endpoint="${ZMQML_ENDPOINT:-tcp://localhost:5555}" +np="${ZMQML_TEST_NP:-1}" + +expfolder="$PWD" +artifacts="$expfolder/zmqml-iteration-artifacts" +mkdir -p "$artifacts" + +train_workdir="$artifacts/train-run" +hybrid_workdir="$artifacts/hybrid-run" +mkdir -p "$train_workdir" "$hybrid_workdir" + +cp "$bindir/doc/example/kb.dfdally-72-milc-small.workload.conf" "$expfolder/" +cp "$bindir/doc/example/kb.dfdally-72-milc-small.json" "$expfolder/" +cp "$bindir/doc/example/kb.dfdally-72-milc-small.alloc.conf" "$expfolder/" + +make_conf() { + local out="$1" + local start_iter="$2" + local end_iter="$3" + local inferencing="$4" + local surrogate="$5" + local training="$6" + + START_ITER="$start_iter" \ + END_ITER="$end_iter" \ + RETRAIN_ENABLED=0 \ + RETRAIN_ITER=-1 \ + RETRAIN_SAVE_PATH="" \ + SECOND_SURROGATE_ENABLED=0 \ + SECOND_START_ITER=100000 \ + SECOND_END_ITER=100001 \ + INFERENCING_ENABLED="$inferencing" \ + SURROGATE_ENABLED="$surrogate" \ + TRAINING_ENABLED="$training" \ + DIRECTOR_DEBUG_PRINTS=1 \ + SHUTDOWN_ZMQML_SERVER_ON_FINALIZE=0 \ + PACKET_SIZE=4096 \ + CHUNK_SIZE=64 \ + envsubst < "$bindir/doc/example/kb.dfdally-72-zeromq-director.template.conf.in" > "$out" +} + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family iteration-time \ + status + +# Phase 1: high-fidelity/director data collection. Inferencing is off, training +# records are sent to the external ZMQML server. +make_conf "$expfolder/iteration-train.conf" 0 10 0 1 1 + +( + cd "$train_workdir" + ZMQML_ENDPOINT="$endpoint" \ + mpirun -np "$np" "$bindir/src/model-net-mpi-replay" \ + --synch=1 \ + --workload_type=swm-online \ + --disable_compute=0 \ + --payload_sz=4096 \ + --workload_conf_file="$expfolder/kb.dfdally-72-milc-small.workload.conf" \ + --alloc_file="$expfolder/kb.dfdally-72-milc-small.alloc.conf" \ + -- "$expfolder/iteration-train.conf" +) > "$artifacts/iteration-train.out" \ + 2> "$artifacts/iteration-train.err" + +grep 'Net Events Processed' "$artifacts/iteration-train.out" + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family iteration-time \ + train \ + > "$artifacts/iteration-train-model.json" + +grep '"status": "done"' "$artifacts/iteration-train-model.json" + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family iteration-time \ + save "$artifacts/iteration-time-model.pt" \ + > "$artifacts/iteration-save-model.json" + +test -s "$artifacts/iteration-time-model.pt" +grep '"status": "done"' "$artifacts/iteration-save-model.json" + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family iteration-time \ + load "$artifacts/iteration-time-model.pt" \ + > "$artifacts/iteration-load-model.json" + +grep '"status": "done"' "$artifacts/iteration-load-model.json" + +# Phase 2: hybrid inference. Training is off; inference is on. +make_conf "$expfolder/iteration-hybrid.conf" 3 8 1 1 0 + +( + cd "$hybrid_workdir" + ZMQML_ENDPOINT="$endpoint" \ + mpirun -np "$np" "$bindir/src/model-net-mpi-replay" \ + --synch=1 \ + --workload_type=swm-online \ + --disable_compute=0 \ + --payload_sz=4096 \ + --workload_conf_file="$expfolder/kb.dfdally-72-milc-small.workload.conf" \ + --alloc_file="$expfolder/kb.dfdally-72-milc-small.alloc.conf" \ + -- "$expfolder/iteration-hybrid.conf" +) > "$artifacts/iteration-hybrid.out" \ + 2> "$artifacts/iteration-hybrid.err" + +grep 'Net Events Processed' "$artifacts/iteration-hybrid.out" +grep -E '\[DIR\] iteration-time predictions director_id=[0-9]+ count=[1-9][0-9]* values=' "$artifacts/iteration-hybrid.err" + +exit 0 From aae002516a6acb1eb658c630fb166abf0d5f1e05 Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Thu, 2 Jul 2026 11:19:31 -0400 Subject: [PATCH 09/14] Fix ZMQML hybrid workflow --- .github/workflows/zmqml-hybrid.yml | 40 +++++++++++++++++++----------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/.github/workflows/zmqml-hybrid.yml b/.github/workflows/zmqml-hybrid.yml index 0fac8283..ac963ba8 100644 --- a/.github/workflows/zmqml-hybrid.yml +++ b/.github/workflows/zmqml-hybrid.yml @@ -27,7 +27,7 @@ jobs: uses: actions/checkout@v4 with: repository: ROSS-org/ROSS - ref: ${{ github.event_name == 'schedule' && 'master' || env.ROSS_REF }} + ref: ${{ env.ROSS_REF }} path: ross - name: Pull full dependency image @@ -54,8 +54,18 @@ jobs: -e ZMQML_RECORD_LOG_PATH=/work/zmqml-artifacts/iteration-records.csv \ -e ZMQML_EVENT_TIME_RECORD_LOG_PATH=/work/zmqml-artifacts/event-time-records.csv \ "$ZMQML_IMAGE" \ - bash -lc 'apt-get update && apt-get install -y python3-zmq python3-numpy python3-sklearn python3-pandas python3-pip - python3 -c 'import importlib.util, subprocess, sys; sys.exit(0) if importlib.util.find_spec("torch") else subprocess.check_call([sys.executable, "-m", "pip", "install", "--break-system-packages", "torch", "--index-url", "https://download.pytorch.org/whl/cpu"])' python3-numpy python3-sklearn python3-pip && python3 -c 'import importlib.util, subprocess, sys; sys.exit(0) if importlib.util.find_spec("torch") else subprocess.check_call([sys.executable, "-m", "pip", "install", "--break-system-packages", "torch", "--index-url", "https://download.pytorch.org/whl/cpu"])' && exec python3 -u zmqmlserver.py' + bash -euxo pipefail -c ' + apt-get update + apt-get install -y python3-zmq python3-numpy python3-sklearn python3-pandas python3-pip gettext-base + + python3 -c "import importlib.util, subprocess, sys; sys.exit(0) if importlib.util.find_spec(\"torch\") else subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--break-system-packages\", \"torch\", \"--index-url\", \"https://download.pytorch.org/whl/cpu\"])" + + exec python3 -u zmqmlserver.py + ' + + sleep 5 + docker ps --filter name=zmqml-server + docker logs zmqml-server - name: Build ROSS and CODES with ZMQML run: | @@ -68,6 +78,10 @@ jobs: -w /work \ "$ZMQML_IMAGE" \ bash -euxo pipefail -c ' + apt-get update + apt-get install -y python3-zmq python3-numpy python3-sklearn python3-pandas python3-pip gettext-base + python3 -c "import importlib.util, subprocess, sys; sys.exit(0) if importlib.util.find_spec(\"torch\") else subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--break-system-packages\", \"torch\", \"--index-url\", \"https://download.pytorch.org/whl/cpu\"])" + cmake -S ross -B ross/build -G Ninja \ -DCMAKE_BUILD_TYPE=Debug \ -DROSS_BUILD_MODELS=ON \ @@ -75,23 +89,19 @@ jobs: cmake --build ross/build --target install -j - cd /work/codes/src/surrogate/zmqml - make clean - make - cd /work/codes rm -rf build + cmake -S . -B build -G Ninja \ -DCMAKE_BUILD_TYPE=Debug \ -DBUILD_TESTING=ON \ - -DUSE_ONLINE=ON \ - -DUSE_SWM=ON \ - -DUSE_TORCH=ON \ - -DUSE_ZMQML=ON \ - -DZMQML_BUILD_PATH=/work/codes/src/surrogate/zmqml \ + -DCODES_USE_SWM=ON \ + -DCODES_USE_TORCH=ON \ + -DCODES_USE_ZEROMQ=ON \ -DCMAKE_C_COMPILER=mpicc \ -DCMAKE_CXX_COMPILER=mpicxx \ -DROSS_PKG_CONFIG_PATH=/work/ross-install/lib/pkgconfig \ + -DCMAKE_PREFIX_PATH="/opt/swm;/opt/argobots" \ -DTorch_DIR="$(python3 -c "import torch; print(torch.utils.cmake_prefix_path)")/Torch" cmake --build build -j @@ -107,15 +117,17 @@ jobs: -w /work/codes \ -e ZMQML_ENDPOINT=tcp://zmqml-server:5555 \ -e ZMQML_TEST_NP=1 \ + -e ZMQML_CTL_TIMEOUT=30 \ "$ZMQML_IMAGE" \ bash -euxo pipefail -c ' apt-get update - apt-get install -y python3-zmq + apt-get install -y python3-zmq python3-numpy python3-sklearn python3-pandas python3-pip gettext-base + python3 -c "import importlib.util, subprocess, sys; sys.exit(0) if importlib.util.find_spec(\"torch\") else subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--break-system-packages\", \"torch\", \"--index-url\", \"https://download.pytorch.org/whl/cpu\"])" ctest --test-dir build \ -R "zmqml-(iteration-time|event-time)-hybrid-workflow.sh" \ --output-on-failure \ - --timeout 600 \ + --timeout 1200 \ -VV ' From 47a8dec98f70fb06fdd8b0c29b1737ccf32b6735 Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Thu, 2 Jul 2026 11:26:41 -0400 Subject: [PATCH 10/14] Fix ROSS path in ZMQML workflow --- .github/workflows/zmqml-hybrid.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/zmqml-hybrid.yml b/.github/workflows/zmqml-hybrid.yml index ac963ba8..402f5abf 100644 --- a/.github/workflows/zmqml-hybrid.yml +++ b/.github/workflows/zmqml-hybrid.yml @@ -100,8 +100,7 @@ jobs: -DCODES_USE_ZEROMQ=ON \ -DCMAKE_C_COMPILER=mpicc \ -DCMAKE_CXX_COMPILER=mpicxx \ - -DROSS_PKG_CONFIG_PATH=/work/ross-install/lib/pkgconfig \ - -DCMAKE_PREFIX_PATH="/opt/swm;/opt/argobots" \ + -DCMAKE_PREFIX_PATH="/work/ross-install;/opt/swm;/opt/argobots" \ -DTorch_DIR="$(python3 -c "import torch; print(torch.utils.cmake_prefix_path)")/Torch" cmake --build build -j From 74f89a5fe6e89554647af5cf4e45cb4e70d239a4 Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Thu, 2 Jul 2026 11:32:37 -0400 Subject: [PATCH 11/14] Register ZMQML tests with ZeroMQ --- .github/workflows/zmqml-hybrid.yml | 7 +++++++ tests/CMakeLists.txt | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/zmqml-hybrid.yml b/.github/workflows/zmqml-hybrid.yml index 402f5abf..6791ee6f 100644 --- a/.github/workflows/zmqml-hybrid.yml +++ b/.github/workflows/zmqml-hybrid.yml @@ -123,6 +123,13 @@ jobs: apt-get install -y python3-zmq python3-numpy python3-sklearn python3-pandas python3-pip gettext-base python3 -c "import importlib.util, subprocess, sys; sys.exit(0) if importlib.util.find_spec(\"torch\") else subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--break-system-packages\", \"torch\", \"--index-url\", \"https://download.pytorch.org/whl/cpu\"])" + ctest --test-dir build -N \ + -R "zmqml-(iteration-time|event-time)-hybrid-workflow.sh" \ + | tee /tmp/zmqml-ctest-list.txt + + grep -E "Test #[0-9]+: zmqml-(iteration-time|event-time)-hybrid-workflow.sh" \ + /tmp/zmqml-ctest-list.txt + ctest --test-dir build \ -R "zmqml-(iteration-time|event-time)-hybrid-workflow.sh" \ --output-on-failure \ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e9e369b1..66518754 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -262,7 +262,7 @@ if(USE_UNION) list(APPEND test-shell-files ${union-shell-files}) endif() -if(USE_ZMQML) +if(USE_ZEROMQ) list(APPEND test-shell-files zmqml-iteration-time-hybrid-workflow.sh zmqml-event-time-hybrid-workflow.sh From 26f7cd270571941934c84d5067aaf4e361766ddc Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Thu, 2 Jul 2026 13:46:46 -0400 Subject: [PATCH 12/14] Validate ZMQML server evidence --- .github/workflows/zmqml-hybrid.yml | 37 ++++++++++++++++++- tests/zmqml-event-time-hybrid-workflow.sh | 23 ++++++++++++ tests/zmqml-iteration-time-hybrid-workflow.sh | 23 ++++++++++++ 3 files changed, 81 insertions(+), 2 deletions(-) diff --git a/.github/workflows/zmqml-hybrid.yml b/.github/workflows/zmqml-hybrid.yml index 6791ee6f..564481e8 100644 --- a/.github/workflows/zmqml-hybrid.yml +++ b/.github/workflows/zmqml-hybrid.yml @@ -137,6 +137,39 @@ jobs: -VV ' + - name: Validate ZMQML server logs + run: | + mkdir -p "$PWD/zmqml-artifacts" + docker logs zmqml-server 2>&1 | tee "$PWD/zmqml-artifacts/zmqml-server.log" + + require_log() { + local pattern="$1" + local description="$2" + + if ! grep -nE "$pattern" "$PWD/zmqml-artifacts/zmqml-server.log"; then + echo "::error::Missing server-side ZMQML evidence: $description" + exit 1 + fi + } + + require_log '\[zmqmlserver\] director_debug_prints=1' \ + 'simulation configured the server debug flag' + + require_log '\[iteration-time records\]' \ + 'iteration-time records reached the server' + + require_log '\[iteration-time inference\].*predictions=' \ + 'iteration-time inference reached the server and returned predictions' + + require_log '\[event-time records\]' \ + 'event-time records reached the server' + + require_log '\[event-time inference\].*predictions=' \ + 'event-time inference reached the server and returned predictions' + + test -s "$PWD/zmqml-artifacts/iteration-records.csv" + test -s "$PWD/zmqml-artifacts/event-time-records.csv" + - name: Dump ZMQML server logs if: always() run: docker logs zmqml-server || true @@ -147,8 +180,8 @@ jobs: docker rm -f zmqml-server || true docker network rm codes-zmqml-ci || true - - name: Upload logs on failure - if: failure() + - name: Upload ZMQML logs + if: always() uses: actions/upload-artifact@v4 with: name: zmqml-hybrid-logs diff --git a/tests/zmqml-event-time-hybrid-workflow.sh b/tests/zmqml-event-time-hybrid-workflow.sh index e356bee9..18e545cb 100755 --- a/tests/zmqml-event-time-hybrid-workflow.sh +++ b/tests/zmqml-event-time-hybrid-workflow.sh @@ -98,6 +98,29 @@ python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ grep '"status": "done"' "$artifacts/event-time-train-model.json" +python3 - "$artifacts/event-time-train-model.json" <<'PY_EVENT_TRAIN_CHECK' +import json +import sys +from pathlib import Path + +path = Path(sys.argv[1]) +data = json.loads(path.read_text()) + +total_rows = int(data.get("total_rows", "0")) +trained_models = int(data.get("trained_models", "0")) + +if total_rows <= 0: + raise SystemExit(f"event-time train reported no server-side rows: {data}") + +if trained_models <= 0: + raise SystemExit(f"event-time train reported no trained models: {data}") + +print( + "event-time server train status: " + f"total_rows={total_rows} trained_models={trained_models}" +) +PY_EVENT_TRAIN_CHECK + python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ --endpoint "$endpoint" \ --family event-time \ diff --git a/tests/zmqml-iteration-time-hybrid-workflow.sh b/tests/zmqml-iteration-time-hybrid-workflow.sh index bdd3bba0..70566dcc 100755 --- a/tests/zmqml-iteration-time-hybrid-workflow.sh +++ b/tests/zmqml-iteration-time-hybrid-workflow.sh @@ -85,6 +85,29 @@ python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ grep '"status": "done"' "$artifacts/iteration-train-model.json" +python3 - "$artifacts/iteration-train-model.json" <<'PY_ITER_TRAIN_CHECK' +import json +import sys +from pathlib import Path + +path = Path(sys.argv[1]) +data = json.loads(path.read_text()) + +total_records = int(data.get("total_records", "0")) +trained_clients = int(data.get("trained_clients", "0")) + +if total_records <= 0: + raise SystemExit(f"iteration-time train reported no server-side records: {data}") + +if trained_clients <= 0: + raise SystemExit(f"iteration-time train reported no trained clients: {data}") + +print( + "iteration-time server train status: " + f"total_records={total_records} trained_clients={trained_clients}" +) +PY_ITER_TRAIN_CHECK + python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ --endpoint "$endpoint" \ --family iteration-time \ From 3fa3f8cd80157b83277bcd2e4e56c8f9f67fe47f Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Thu, 2 Jul 2026 14:07:16 -0400 Subject: [PATCH 13/14] Make ZMQML hybrid tests opt-in --- .github/workflows/zmqml-hybrid.yml | 1 + tests/CMakeLists.txt | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/zmqml-hybrid.yml b/.github/workflows/zmqml-hybrid.yml index 564481e8..e041e31d 100644 --- a/.github/workflows/zmqml-hybrid.yml +++ b/.github/workflows/zmqml-hybrid.yml @@ -98,6 +98,7 @@ jobs: -DCODES_USE_SWM=ON \ -DCODES_USE_TORCH=ON \ -DCODES_USE_ZEROMQ=ON \ + -DCODES_ENABLE_ZMQML_HYBRID_TESTS=ON \ -DCMAKE_C_COMPILER=mpicc \ -DCMAKE_CXX_COMPILER=mpicxx \ -DCMAKE_PREFIX_PATH="/work/ross-install;/opt/swm;/opt/argobots" \ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 66518754..4a660ae6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -2,6 +2,10 @@ enable_testing() configure_file(run-test.sh.in run-test.sh) +option(CODES_ENABLE_ZMQML_HYBRID_TESTS + "Register ZMQML hybrid workflow tests that require a running zmqmlserver.py" + OFF) + # codes_add_equivalence_test — register an equivalence/determinism test. # # Runs a model binary two or more times and asserts a marker line (default @@ -262,7 +266,7 @@ if(USE_UNION) list(APPEND test-shell-files ${union-shell-files}) endif() -if(USE_ZEROMQ) +if(USE_ZEROMQ AND CODES_ENABLE_ZMQML_HYBRID_TESTS) list(APPEND test-shell-files zmqml-iteration-time-hybrid-workflow.sh zmqml-event-time-hybrid-workflow.sh From 85ca8c20d60dea04bff985355d56ff85428e5495 Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Thu, 2 Jul 2026 15:09:17 -0400 Subject: [PATCH 14/14] Fix compilation via script --- CMakeLists.txt | 53 +++++++++++++++++++++++++++++++++++ CODES-compile-instructions.sh | 13 ++++++++- 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d308e9dd..7c3bad1d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -77,6 +77,59 @@ set(ARGOBOTS_PKG_CONFIG_PATH "" CACHE PATH "DEPRECATED: use CMAKE_PREFIX_PATH. W # dirs, the MPI dependency, and link libraries (linked in src/CMakeLists.txt). find_package(ROSS CONFIG REQUIRED) +# Compatibility for older ROSS CMake package configs. +# Some ROSS installs provide ROSSConfig.cmake but do not define the modern +# imported target ROSS::ROSS. This CODES tree links against ROSS::ROSS, so +# synthesize that target from the installed ROSS prefix when needed. +if(NOT TARGET ROSS::ROSS) + message(WARNING "ROSS package did not define ROSS::ROSS; creating compatibility imported target.") + + if(DEFINED ROSS_DIR) + get_filename_component(_ROSS_CONFIG_DIR "${ROSS_DIR}" ABSOLUTE) + else() + set(_ROSS_CONFIG_DIR "") + endif() + + # In this install layout, ROSSConfig.cmake is under: + # /lib/ROSSConfig.cmake + # so the prefix is one directory above ROSS_DIR. + get_filename_component(_ROSS_PREFIX "${_ROSS_CONFIG_DIR}/.." ABSOLUTE) + + find_library(_ROSS_COMPAT_LIBRARY + NAMES ROSS ross + PATHS + "${_ROSS_PREFIX}/lib" + "${_ROSS_CONFIG_DIR}" + NO_DEFAULT_PATH + ) + + find_path(_ROSS_COMPAT_INCLUDE_DIR + NAMES ross.h + PATHS + "${_ROSS_PREFIX}/include" + "${CMAKE_CURRENT_SOURCE_DIR}/../ross/core" + "$ENV{HOME}/ross/core" + NO_DEFAULT_PATH + ) + + if(NOT _ROSS_COMPAT_LIBRARY) + message(FATAL_ERROR "Could not locate ROSS library for compatibility target. Checked ${_ROSS_PREFIX}/lib and ${_ROSS_CONFIG_DIR}.") + endif() + + if(NOT _ROSS_COMPAT_INCLUDE_DIR) + message(FATAL_ERROR "Could not locate ross.h for compatibility target. Checked ${_ROSS_PREFIX}/include and ~/ross/core.") + endif() + + add_library(ROSS::ROSS UNKNOWN IMPORTED) + set_target_properties(ROSS::ROSS PROPERTIES + IMPORTED_LOCATION "${_ROSS_COMPAT_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${_ROSS_COMPAT_INCLUDE_DIR}" + ) + + message(STATUS "Using compatibility ROSS::ROSS library: ${_ROSS_COMPAT_LIBRARY}") + message(STATUS "Using compatibility ROSS::ROSS include dir: ${_ROSS_COMPAT_INCLUDE_DIR}") +endif() + # PkgConfig discovers the optional SWM/UNION/ARGOBOTS deps below (as imported # targets). The recommended way to point at a non-standard install is # CMAKE_PREFIX_PATH (pkg_check_modules searches /lib/pkgconfig etc. under diff --git a/CODES-compile-instructions.sh b/CODES-compile-instructions.sh index 80b3925d..6d425687 100644 --- a/CODES-compile-instructions.sh +++ b/CODES-compile-instructions.sh @@ -333,12 +333,23 @@ INNERPY fi fi -cmake_prefix_path="$(realpath "$CUR_DIR/ross/build/bin")" +ross_config="$(find "$CUR_DIR/ross/build" \( -name ROSSConfig.cmake -o -name ross-config.cmake \) | head -n 1)" +if [ -z "$ross_config" ]; then + echo "ERROR: Could not find built ROSSConfig.cmake under $CUR_DIR/ross/build." >&2 + echo " Try rebuilding ROSS or check whether make install completed." >&2 + exit 1 +fi + +ross_dir="$(dirname "$ross_config")" +echo "Using ROSS_DIR=${ross_dir}" + +cmake_prefix_path="${ross_dir}" if [ "$torch_enable" = 1 ]; then cmake_prefix_path="${cmake_prefix_path};${torch_cmake_prefix}" fi make_args_codes=( + -DROSS_DIR="${ross_dir}" -DCMAKE_PREFIX_PATH="${cmake_prefix_path}" -DCMAKE_C_FLAGS="-g -Wall" -DCMAKE_CXX_FLAGS="-g -Wall"