diff --git a/.github/workflows/zmqml-hybrid.yml b/.github/workflows/zmqml-hybrid.yml new file mode 100644 index 00000000..e041e31d --- /dev/null +++ b/.github/workflows/zmqml-hybrid.yml @@ -0,0 +1,195 @@ +name: ZMQML Hybrid + +on: + pull_request: + branches: + - master + push: + branches: + - master + +env: + ROSS_REF: 9b6ccb18f9b9db438bf41b5b221d0ef16a4dac48 + ZMQML_IMAGE: ghcr.io/codes-org/codes-ci-full:latest + +jobs: + zmqml-hybrid: + name: zmqml hybrid workflows + runs-on: ubuntu-24.04 + + steps: + - name: Checkout CODES + uses: actions/checkout@v4 + with: + path: codes + + - name: Checkout ROSS + uses: actions/checkout@v4 + with: + repository: ROSS-org/ROSS + ref: ${{ env.ROSS_REF }} + path: ross + + - name: Pull full dependency image + run: docker pull "$ZMQML_IMAGE" + + - name: Create Docker network + run: docker network create codes-zmqml-ci + + - name: Start ZMQML server container + run: | + mkdir -p "$PWD/zmqml-artifacts" + + docker run -d \ + --name zmqml-server \ + --network codes-zmqml-ci \ + -v "$PWD/codes:/work/codes" \ + -v "$PWD/zmqml-artifacts:/work/zmqml-artifacts" \ + -w /work/codes/src/surrogate/zmqml \ + -e ZMQML_ITERATION_HISTORY_LEN=2 \ + -e ZMQML_ITERATION_HORIZON=3 \ + -e ZMQML_ITERATION_TRAIN_STRIDE=1 \ + -e ZMQML_EVENT_TIME_MIN_ROWS=4 \ + -e ZMQML_EVENT_TIME_EPOCHS=2 \ + -e ZMQML_RECORD_LOG_PATH=/work/zmqml-artifacts/iteration-records.csv \ + -e ZMQML_EVENT_TIME_RECORD_LOG_PATH=/work/zmqml-artifacts/event-time-records.csv \ + "$ZMQML_IMAGE" \ + bash -euxo pipefail -c ' + apt-get update + apt-get install -y python3-zmq python3-numpy python3-sklearn python3-pandas python3-pip gettext-base + + python3 -c "import importlib.util, subprocess, sys; sys.exit(0) if importlib.util.find_spec(\"torch\") else subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--break-system-packages\", \"torch\", \"--index-url\", \"https://download.pytorch.org/whl/cpu\"])" + + exec python3 -u zmqmlserver.py + ' + + sleep 5 + docker ps --filter name=zmqml-server + docker logs zmqml-server + + - name: Build ROSS and CODES with ZMQML + run: | + docker run --rm \ + --name codes-zmqml-build \ + --network codes-zmqml-ci \ + -v "$PWD/codes:/work/codes" \ + -v "$PWD/ross:/work/ross" \ + -v "$PWD/ross-install:/work/ross-install" \ + -w /work \ + "$ZMQML_IMAGE" \ + bash -euxo pipefail -c ' + apt-get update + apt-get install -y python3-zmq python3-numpy python3-sklearn python3-pandas python3-pip gettext-base + python3 -c "import importlib.util, subprocess, sys; sys.exit(0) if importlib.util.find_spec(\"torch\") else subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--break-system-packages\", \"torch\", \"--index-url\", \"https://download.pytorch.org/whl/cpu\"])" + + cmake -S ross -B ross/build -G Ninja \ + -DCMAKE_BUILD_TYPE=Debug \ + -DROSS_BUILD_MODELS=ON \ + -DCMAKE_INSTALL_PREFIX=/work/ross-install + + cmake --build ross/build --target install -j + + cd /work/codes + rm -rf build + + cmake -S . -B build -G Ninja \ + -DCMAKE_BUILD_TYPE=Debug \ + -DBUILD_TESTING=ON \ + -DCODES_USE_SWM=ON \ + -DCODES_USE_TORCH=ON \ + -DCODES_USE_ZEROMQ=ON \ + -DCODES_ENABLE_ZMQML_HYBRID_TESTS=ON \ + -DCMAKE_C_COMPILER=mpicc \ + -DCMAKE_CXX_COMPILER=mpicxx \ + -DCMAKE_PREFIX_PATH="/work/ross-install;/opt/swm;/opt/argobots" \ + -DTorch_DIR="$(python3 -c "import torch; print(torch.utils.cmake_prefix_path)")/Torch" + + cmake --build build -j + ' + + - name: Run ZMQML hybrid tests + run: | + docker run --rm \ + --name codes-zmqml-tests \ + --network codes-zmqml-ci \ + -v "$PWD/codes:/work/codes" \ + -v "$PWD/zmqml-artifacts:/work/zmqml-artifacts" \ + -w /work/codes \ + -e ZMQML_ENDPOINT=tcp://zmqml-server:5555 \ + -e ZMQML_TEST_NP=1 \ + -e ZMQML_CTL_TIMEOUT=30 \ + "$ZMQML_IMAGE" \ + bash -euxo pipefail -c ' + apt-get update + apt-get install -y python3-zmq python3-numpy python3-sklearn python3-pandas python3-pip gettext-base + python3 -c "import importlib.util, subprocess, sys; sys.exit(0) if importlib.util.find_spec(\"torch\") else subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"--break-system-packages\", \"torch\", \"--index-url\", \"https://download.pytorch.org/whl/cpu\"])" + + ctest --test-dir build -N \ + -R "zmqml-(iteration-time|event-time)-hybrid-workflow.sh" \ + | tee /tmp/zmqml-ctest-list.txt + + grep -E "Test #[0-9]+: zmqml-(iteration-time|event-time)-hybrid-workflow.sh" \ + /tmp/zmqml-ctest-list.txt + + ctest --test-dir build \ + -R "zmqml-(iteration-time|event-time)-hybrid-workflow.sh" \ + --output-on-failure \ + --timeout 1200 \ + -VV + ' + + - name: Validate ZMQML server logs + run: | + mkdir -p "$PWD/zmqml-artifacts" + docker logs zmqml-server 2>&1 | tee "$PWD/zmqml-artifacts/zmqml-server.log" + + require_log() { + local pattern="$1" + local description="$2" + + if ! grep -nE "$pattern" "$PWD/zmqml-artifacts/zmqml-server.log"; then + echo "::error::Missing server-side ZMQML evidence: $description" + exit 1 + fi + } + + require_log '\[zmqmlserver\] director_debug_prints=1' \ + 'simulation configured the server debug flag' + + require_log '\[iteration-time records\]' \ + 'iteration-time records reached the server' + + require_log '\[iteration-time inference\].*predictions=' \ + 'iteration-time inference reached the server and returned predictions' + + require_log '\[event-time records\]' \ + 'event-time records reached the server' + + require_log '\[event-time inference\].*predictions=' \ + 'event-time inference reached the server and returned predictions' + + test -s "$PWD/zmqml-artifacts/iteration-records.csv" + test -s "$PWD/zmqml-artifacts/event-time-records.csv" + + - name: Dump ZMQML server logs + if: always() + run: docker logs zmqml-server || true + + - name: Stop ZMQML server + if: always() + run: | + docker rm -f zmqml-server || true + docker network rm codes-zmqml-ci || true + + - name: Upload ZMQML logs + if: always() + uses: actions/upload-artifact@v4 + with: + name: zmqml-hybrid-logs + path: | + codes/build/Testing/Temporary/LastTest.log + codes/build/Testing/Temporary/LastTestsFailed.log + codes/build/testing-output/** + zmqml-artifacts/** + if-no-files-found: ignore + retention-days: 14 diff --git a/CMakeLists.txt b/CMakeLists.txt index d308e9dd..7c3bad1d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -77,6 +77,59 @@ set(ARGOBOTS_PKG_CONFIG_PATH "" CACHE PATH "DEPRECATED: use CMAKE_PREFIX_PATH. W # dirs, the MPI dependency, and link libraries (linked in src/CMakeLists.txt). find_package(ROSS CONFIG REQUIRED) +# Compatibility for older ROSS CMake package configs. +# Some ROSS installs provide ROSSConfig.cmake but do not define the modern +# imported target ROSS::ROSS. This CODES tree links against ROSS::ROSS, so +# synthesize that target from the installed ROSS prefix when needed. +if(NOT TARGET ROSS::ROSS) + message(WARNING "ROSS package did not define ROSS::ROSS; creating compatibility imported target.") + + if(DEFINED ROSS_DIR) + get_filename_component(_ROSS_CONFIG_DIR "${ROSS_DIR}" ABSOLUTE) + else() + set(_ROSS_CONFIG_DIR "") + endif() + + # In this install layout, ROSSConfig.cmake is under: + # /lib/ROSSConfig.cmake + # so the prefix is one directory above ROSS_DIR. + get_filename_component(_ROSS_PREFIX "${_ROSS_CONFIG_DIR}/.." ABSOLUTE) + + find_library(_ROSS_COMPAT_LIBRARY + NAMES ROSS ross + PATHS + "${_ROSS_PREFIX}/lib" + "${_ROSS_CONFIG_DIR}" + NO_DEFAULT_PATH + ) + + find_path(_ROSS_COMPAT_INCLUDE_DIR + NAMES ross.h + PATHS + "${_ROSS_PREFIX}/include" + "${CMAKE_CURRENT_SOURCE_DIR}/../ross/core" + "$ENV{HOME}/ross/core" + NO_DEFAULT_PATH + ) + + if(NOT _ROSS_COMPAT_LIBRARY) + message(FATAL_ERROR "Could not locate ROSS library for compatibility target. Checked ${_ROSS_PREFIX}/lib and ${_ROSS_CONFIG_DIR}.") + endif() + + if(NOT _ROSS_COMPAT_INCLUDE_DIR) + message(FATAL_ERROR "Could not locate ross.h for compatibility target. Checked ${_ROSS_PREFIX}/include and ~/ross/core.") + endif() + + add_library(ROSS::ROSS UNKNOWN IMPORTED) + set_target_properties(ROSS::ROSS PROPERTIES + IMPORTED_LOCATION "${_ROSS_COMPAT_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${_ROSS_COMPAT_INCLUDE_DIR}" + ) + + message(STATUS "Using compatibility ROSS::ROSS library: ${_ROSS_COMPAT_LIBRARY}") + message(STATUS "Using compatibility ROSS::ROSS include dir: ${_ROSS_COMPAT_INCLUDE_DIR}") +endif() + # PkgConfig discovers the optional SWM/UNION/ARGOBOTS deps below (as imported # targets). The recommended way to point at a non-standard install is # CMAKE_PREFIX_PATH (pkg_check_modules searches /lib/pkgconfig etc. under diff --git a/CODES-compile-instructions.sh b/CODES-compile-instructions.sh index 80b3925d..6d425687 100644 --- a/CODES-compile-instructions.sh +++ b/CODES-compile-instructions.sh @@ -333,12 +333,23 @@ INNERPY fi fi -cmake_prefix_path="$(realpath "$CUR_DIR/ross/build/bin")" +ross_config="$(find "$CUR_DIR/ross/build" \( -name ROSSConfig.cmake -o -name ross-config.cmake \) | head -n 1)" +if [ -z "$ross_config" ]; then + echo "ERROR: Could not find built ROSSConfig.cmake under $CUR_DIR/ross/build." >&2 + echo " Try rebuilding ROSS or check whether make install completed." >&2 + exit 1 +fi + +ross_dir="$(dirname "$ross_config")" +echo "Using ROSS_DIR=${ross_dir}" + +cmake_prefix_path="${ross_dir}" if [ "$torch_enable" = 1 ]; then cmake_prefix_path="${cmake_prefix_path};${torch_cmake_prefix}" fi make_args_codes=( + -DROSS_DIR="${ross_dir}" -DCMAKE_PREFIX_PATH="${cmake_prefix_path}" -DCMAKE_C_FLAGS="-g -Wall" -DCMAKE_CXX_FLAGS="-g -Wall" diff --git a/README.md b/README.md index 40513f74..13ca5c5e 100644 --- a/README.md +++ b/README.md @@ -259,6 +259,7 @@ This repo uses [clang-format](https://clang.llvm.org/docs/ClangFormat.html) to k - **Emacs:** see [clang-format.el](https://clang.llvm.org/docs/ClangFormat.html#emacs-integration). To reformat a file manually: `clang-format -i path/to/file.c`. CI runs `clang-format --dry-run --Werror` on every PR and rejects any drift, so PRs with unformatted code don't merge. +Note: The CI uses clang-format major release version 20, so you should format your files with that version. ### Determinism diff --git a/codes/surrogate/director-client.h b/codes/surrogate/director-client.h index aaea3d09..fa563c6a 100644 --- a/codes/surrogate/director-client.h +++ b/codes/surrogate/director-client.h @@ -125,8 +125,7 @@ extern "C" { extern void director_lp_register_model(const char*); - - +extern void director_record_external_zmq_latency(double processing_sec, double total_sec); /* extern void director_parse_args(char *args, int **args_array, int *length); static void director_issue_codes_event(director_state * s, tw_lpid nw_lpid, int dir_registered_event_type, tw_stime ts, tw_lp* lp); @@ -142,5 +141,4 @@ extern void dir_test_finalize(director_state* s, tw_lp* lp); #ifdef __cplusplus } #endif - #endif diff --git a/doc/example/kb.dfdally-72-zeromq-director.conf.in b/doc/example/kb.dfdally-72-zeromq-director.conf.in index 656959c4..fdb77ec7 100644 --- a/doc/example/kb.dfdally-72-zeromq-director.conf.in +++ b/doc/example/kb.dfdally-72-zeromq-director.conf.in @@ -23,19 +23,19 @@ LPGROUPS DIRECTOR { - start_iter="${DIRECTOR_START_ITER}"; - end_iter="${DIRECTOR_END_ITER}"; + start_iter="${START_ITER}"; + end_iter="${END_ITER}"; # Optional one-shot pause/retrain/resume pipeline. # First implementation is intended for --synch=1. - retrain_enabled="${DIRECTOR_RETRAIN_ENABLED}"; - retrain_iter="${DIRECTOR_RETRAIN_ITER}"; - retrain_save_path="${DIRECTOR_RETRAIN_SAVE_PATH}"; + retrain_enabled="${RETRAIN_ENABLED}"; + retrain_iter="${RETRAIN_ITER}"; + retrain_save_path="${RETRAIN_SAVE_PATH}"; # Optional second surrogate window after retraining. - second_surrogate_enabled="${DIRECTOR_SECOND_SURROGATE_ENABLED}"; - second_start_iter="${DIRECTOR_SECOND_START_ITER}"; - second_end_iter="${DIRECTOR_SECOND_END_ITER}"; + second_surrogate_enabled="${SECOND_SURROGATE_ENABLED}"; + second_start_iter="${SECOND_START_ITER}"; + second_end_iter="${SECOND_END_ITER}"; # Common modes: # diff --git a/src/networks/model-net/dragonfly-dally.cxx b/src/networks/model-net/dragonfly-dally.cxx index 03d6549c..546905ca 100644 --- a/src/networks/model-net/dragonfly-dally.cxx +++ b/src/networks/model-net/dragonfly-dally.cxx @@ -24,6 +24,7 @@ #include "codes/model-net-lp.h" #include "codes/surrogate/init.h" #if CODES_HAVE_TORCH +#include "codes/surrogate/director-client.h" #include "codes/surrogate/packet-latency-predictor/torch-jit.h" #endif #include "codes/net/dragonfly-dally.h" @@ -64,16 +65,15 @@ * undefined weak symbol with no providing library; Linux/ELF happens to * resolve it to null.) */ + #if CODES_HAVE_ZEROMQ +extern "C" void director_record_external_zmq_latency(double processing_sec, double total_sec) + __attribute__((weak)); extern std::vector zmqml_director_request(const std::string& surrogate_family, const std::string& surrogate_backend, const std::string& operation, const std::vector& args, const std::string& bindata); - -extern void director_record_zmq_latency_stats(const char* label, - const std::vector& ret, - double local_latency_sec); #endif @@ -298,7 +298,20 @@ static std::vector dfdally_event_time_director_request_with_latency (double)(finish.tv_nsec - start.tv_nsec) / 1000000000.0; #if CODES_HAVE_ZEROMQ - director_record_zmq_latency_stats(label, ret, local_latency_sec); + double zmq_processing_time = 0.0; + + if (ret.size() > 1) { + char* endptr = NULL; + double parsed = strtod(ret[1].c_str(), &endptr); + + if (endptr != ret[1].c_str() && isfinite(parsed) && parsed >= 0.0) { + zmq_processing_time = parsed; + } + } + + if (director_record_external_zmq_latency) { + director_record_external_zmq_latency(zmq_processing_time, local_latency_sec); + } #endif return ret; @@ -2696,14 +2709,8 @@ static void dragonfly_read_config(const char* anno, dragonfly_param* params) { char event_time_inference_enabled_str[MAX_NAME_LENGTH]; event_time_inference_enabled_str[0] = '\0'; - char const* inferencing_enabled_env = getenv("INFERENCING_ENABLED"); - if (inferencing_enabled_env && strlen(inferencing_enabled_env) > 0) { - snprintf(event_time_inference_enabled_str, sizeof(event_time_inference_enabled_str), "%s", - inferencing_enabled_env); - } else { - configuration_get_value(&config, "DIRECTOR", "inferencing_enabled", anno, - event_time_inference_enabled_str, MAX_NAME_LENGTH); - } + configuration_get_value(&config, "DIRECTOR", "inferencing_enabled", anno, + event_time_inference_enabled_str, MAX_NAME_LENGTH); /* * Do not expose a separate event-time inference flag. @@ -2738,14 +2745,8 @@ static void dragonfly_read_config(const char* anno, dragonfly_param* params) { char event_time_training_enabled_str[MAX_NAME_LENGTH]; event_time_training_enabled_str[0] = '\0'; - char const* training_enabled_env = getenv("TRAINING_ENABLED"); - if (training_enabled_env && strlen(training_enabled_env) > 0) { - snprintf(event_time_training_enabled_str, sizeof(event_time_training_enabled_str), "%s", - training_enabled_env); - } else { - configuration_get_value(&config, "DIRECTOR", "training_enabled", anno, - event_time_training_enabled_str, MAX_NAME_LENGTH); - } + configuration_get_value(&config, "DIRECTOR", "training_enabled", anno, + event_time_training_enabled_str, MAX_NAME_LENGTH); event_time_surrogate_family_selected = strcmp(event_time_surrogate_family_str, "event-time") == 0; @@ -2766,15 +2767,6 @@ static void dragonfly_read_config(const char* anno, dragonfly_param* params) { event_time_zmq_flush_registered = 1; } - if (dfdally_surrogate_debug_prints) { - fprintf(stderr, - "[event-time records] family=%s training_enabled=%s send_to_zmq=%d batch_size=%d\n", - event_time_surrogate_family_str, event_time_training_enabled_str, - event_time_training_records_enabled, event_time_zmq_batch_size); - fflush(stderr); - } - - // START Surrogate configuration char enable_str[MAX_NAME_LENGTH]; enable_str[0] = '\0'; @@ -2799,6 +2791,14 @@ static void dragonfly_read_config(const char* anno, dragonfly_param* params) { dfdally_surrogate_debug_prints = dfdally_string_is_true(debug_prints_str); + if (dfdally_surrogate_debug_prints) { + fprintf(stderr, + "[event-time records] family=%s training_enabled=%s send_to_zmq=%d batch_size=%d\n", + event_time_surrogate_family_str, event_time_training_enabled_str, + event_time_training_records_enabled, event_time_zmq_batch_size); + fflush(stderr); + } + // if surrogate mode has been set up if (enable_network_surrogate) { struct network_surrogate_config surr_conf = { diff --git a/src/surrogate/director-client.cxx b/src/surrogate/director-client.cxx index f865b0f1..43535792 100644 --- a/src/surrogate/director-client.cxx +++ b/src/surrogate/director-client.cxx @@ -23,7 +23,7 @@ #define DIR_ZMQ_CMD_LENGTH 64 #define DIR_ZMQ_ARG_LENGTH 2048 -#define DIR_MAX_PREDICTION 5 +#define DIR_MAX_PREDICTION 1 #define DIR_MAX_TRAINING_RECORDS 10 /* * The Python iteration-time model currently uses history_len=2 and horizon=3, @@ -86,29 +86,40 @@ std::vector director_client_request_family(const char* surrogate_fa int surrogate_enabled = 0; int inferencing_enabled = 1; - -void director_record_zmq_latency_stats(const char* label, const std::vector& ret, - double local_latency_sec) { - (void)label; - +static void director_record_zmq_latency_values(double processing_sec, double total_sec) { if (evaluate_perf != 1) { return; } - director_zmq_total_elapsed_times.push_back(local_latency_sec); + if (!std::isfinite(processing_sec) || processing_sec < 0.0) { + processing_sec = 0.0; + } + + if (!std::isfinite(total_sec) || total_sec < 0.0) { + total_sec = 0.0; + } + director_zmq_processing_times.push_back(processing_sec); + director_zmq_total_elapsed_times.push_back(total_sec); +} + +static void director_record_zmq_latency_stats(const char* label, + const std::vector& ret, + double local_latency_sec) { double zmq_processing_time = 0.0; + if (ret.size() > 1) { - try { - zmq_processing_time = std::stod(ret[1]); - } catch (...) { - if (director_debug_prints) { - fprintf(stderr, - "[DIR] Warning: could not parse zmq processing time from reply field " - "ret[1]=%s\n", - ret[1].c_str()); - fflush(stderr); - } + char* endptr = NULL; + double parsed = strtod(ret[1].c_str(), &endptr); + + if (endptr != ret[1].c_str() && std::isfinite(parsed) && parsed >= 0.0) { + zmq_processing_time = parsed; + } else if (director_debug_prints) { + fprintf(stderr, + "[DIR] Warning: could not parse zmq processing time from reply field ret[1]=%s " + "request=%s\n", + ret[1].c_str(), label ? label : ""); + fflush(stderr); } } else if (director_debug_prints) { fprintf(stderr, @@ -117,9 +128,12 @@ void director_record_zmq_latency_stats(const char* label, const std::vectordirector_id, - DIR_MAX_PREDICTION); // num-of-args;num-record + sprintf(args, "%d;%llu;%d;", 2, (unsigned long long)s->director_id, + DIR_MAX_PREDICTION); // num-of-args;client-id;num-predictions // The Python side primarily uses records previously sent through // send-records. Keep the payload empty for now rather than sending @@ -1246,31 +1260,29 @@ void director_event_handler_commit(director_state* s, tw_bf* bf, director_messag static void director_reduce_and_print_zmq_latency_stat(const char* stat_name, const std::vector& local_values) { - unsigned long long local_count = (unsigned long long)local_values.size(); - + unsigned long long local_count = 0; double local_sum = 0.0; double local_sq_sum = 0.0; double local_min = std::numeric_limits::infinity(); double local_max = -std::numeric_limits::infinity(); for (double value : local_values) { - local_sum += value; - local_sq_sum += value * value; - - if (value < local_min) { - local_min = value; + if (!std::isfinite(value)) { + continue; } - if (value > local_max) { - local_max = value; - } + local_count++; + local_sum += value; + local_sq_sum += value * value; + local_min = std::min(local_min, value); + local_max = std::max(local_max, value); } unsigned long long global_count = 0; double global_sum = 0.0; double global_sq_sum = 0.0; - double global_min = 0.0; - double global_max = 0.0; + double global_min = std::numeric_limits::infinity(); + double global_max = -std::numeric_limits::infinity(); MPI_Reduce(&local_count, &global_count, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_CODES); @@ -1293,14 +1305,14 @@ static void director_reduce_and_print_zmq_latency_stat(const char* stat_name, double variance = global_sq_sum / (double)global_count - mean * mean; /* - * Floating-point roundoff can make variance slightly negative when - * values are very close together. + * Floating-point roundoff can make variance slightly negative when values + * are very close together. */ if (variance < 0.0 && variance > -1.0e-18) { variance = 0.0; } - double stddev = sqrt(variance); + double stddev = variance > 0.0 ? sqrt(variance) : 0.0; std::cout << std::setprecision(9) << std::fixed << "==DIR_STATS " << stat_name << ": requests = " << global_count << ", mean = " << mean << ", min = " << global_min @@ -1335,6 +1347,7 @@ static void director_print_zmq_latency_stats_once(void) { director_zmq_total_elapsed_times); } + void director_finalize(director_state* s, tw_lp* lp) { director_print_zmq_latency_stats_once(); @@ -1365,8 +1378,7 @@ tw_lptype dir_lp = {(init_f)director_init, extern void director_lp_register_model(const char* dir_lp_name) { int num_dir_per_mgrp = codes_mapping_get_lp_count("MODELNET_GRP", 1, "dir-nw-lp", NULL, 0); if (num_dir_per_mgrp > 0) { - lp_type_register(dir_lp_name, &dir_lp); // DIRECTOR addition - register type - //printf("\n==DIR: Registered\n"); + lp_type_register(dir_lp_name, &dir_lp); } } diff --git a/src/surrogate/zmqml/CMakeLists.txt b/src/surrogate/zmqml/CMakeLists.txt index 82265c79..03e52353 100644 --- a/src/surrogate/zmqml/CMakeLists.txt +++ b/src/surrogate/zmqml/CMakeLists.txt @@ -9,14 +9,27 @@ add_library(zmqmlrequester STATIC zmqmlrequester.cpp) # PUBLIC libzmq: consumers (codes, and the executables that link codes) need the -# zmq_* symbols on their final link line. zmq.hpp travels with PkgConfig::ZeroMQ. -target_link_libraries(zmqmlrequester PUBLIC PkgConfig::ZeroMQ) +# zmq_* symbols on their final link line. +# +# Do not link the imported PkgConfig::ZeroMQ target directly here. On some +# systems pkg-config expands libzmq include dirs to invalid paths such as +# /usr/lib/include, and CMake rejects imported targets that contain non-existent +# INTERFACE_INCLUDE_DIRECTORIES. Use the raw pkg-config library variables and +# the validated zmq.hpp include directory found by the top-level CMake instead. +if(ZeroMQ_LIBRARY_DIRS) + target_link_directories(zmqmlrequester PUBLIC ${ZeroMQ_LIBRARY_DIRS}) +endif() + +target_link_libraries(zmqmlrequester PUBLIC ${ZeroMQ_LIBRARIES}) target_include_directories(zmqmlrequester # zmqmlrequester.h is the public surface — director-client.cxx includes it. - PUBLIC $ + PUBLIC + $ + ${ZMQ_CPP_INCLUDE_DIR} # rapidjson is header-only and used only inside zmqmlrequester.cpp. - PRIVATE ${RapidJSON_INCLUDE_DIR}) + PRIVATE + ${RapidJSON_INCLUDE_DIR}) # keep the requester PIC so it can be archived into one. set_target_properties(zmqmlrequester PROPERTIES POSITION_INDEPENDENT_CODE ON) diff --git a/src/surrogate/zmqml/model/mliterationtime.py b/src/surrogate/zmqml/model/mliterationtime.py index 6fda3461..ce9877ce 100644 --- a/src/surrogate/zmqml/model/mliterationtime.py +++ b/src/surrogate/zmqml/model/mliterationtime.py @@ -302,6 +302,51 @@ def _predict_once(self, client_id: int, history: List[float], iteration: int) -> return np.asarray(cleaned, dtype=np.float64) + def _global_fallback_prediction(self, requested_horizon: int) -> List[float]: + requested_horizon = max(1, int(requested_horizon)) + + recent_values: List[float] = [] + for model in self.models.values(): + if model.records: + recent_values.extend(model.records[-max(1, self.history_len):]) + + recent_values = _as_positive_finite(recent_values) + + if recent_values: + value = float(np.median(np.asarray(recent_values, dtype=np.float64))) + elif self.y_mean is not None and len(self.y_mean) > 0: + value = float(self.y_mean[0]) + else: + # Match the older iteration-time fallback scale instead of using 1.0. + value = 2_000_000.0 + + if not np.isfinite(value) or value <= 0.0: + value = 2_000_000.0 + + return [value for _ in range(requested_horizon)] + + def _global_fallback_prediction(self, requested_horizon: int) -> List[float]: + requested_horizon = max(1, int(requested_horizon)) + + recent_values: List[float] = [] + for model in self.models.values(): + if model.records: + recent_values.extend(model.records[-max(1, self.history_len):]) + + recent_values = _as_positive_finite(recent_values) + + if recent_values: + value = float(np.median(np.asarray(recent_values, dtype=np.float64))) + elif self.y_mean is not None and len(self.y_mean) > 0: + value = float(self.y_mean[0]) + else: + value = 2_000_000.0 + + if not np.isfinite(value) or value <= 0.0: + value = 2_000_000.0 + + return [value for _ in range(requested_horizon)] + def predict(self, client_id: int, requested_horizon: int | None = None) -> List[float]: client_id = int(client_id) requested_horizon = int(requested_horizon or self.horizon) @@ -310,7 +355,15 @@ def predict(self, client_id: int, requested_horizon: int | None = None) -> List[ model = self.get(client_id) if not model.records: - return model._fallback_prediction(requested_horizon) + fallback = self._global_fallback_prediction(requested_horizon) + if self.debug: + print( + "[IterationTimeModelRegistry] predict global-fallback " + f"client={client_id} requested_horizon={requested_horizon} " + f"trained={int(self.trained)} predictions={fallback}", + flush=True, + ) + return fallback # Predict in chunks if requested_horizon > self.horizon. out: List[float] = [] @@ -326,6 +379,9 @@ def predict(self, client_id: int, requested_horizon: int | None = None) -> List[ history.append(float(value)) iteration += 1 + if not out: + out = self._global_fallback_prediction(requested_horizon) + if self.debug: print( "[IterationTimeModelRegistry] predict " @@ -337,6 +393,7 @@ def predict(self, client_id: int, requested_horizon: int | None = None) -> List[ return out + def save(self, path: str) -> None: out_path = Path(path) if out_path.parent: diff --git a/src/surrogate/zmqml/zmqmlrequester.cpp b/src/surrogate/zmqml/zmqmlrequester.cpp index 13452b62..af631363 100644 --- a/src/surrogate/zmqml/zmqmlrequester.cpp +++ b/src/surrogate/zmqml/zmqmlrequester.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include "rapidjson/document.h" @@ -18,7 +19,15 @@ using namespace std; using namespace rapidjson; -static string endpoint = "tcp://localhost:5555"; +static string endpoint_from_env() { + const char* env = getenv("ZMQML_ENDPOINT"); + if (env && env[0] != '\0') { + return string(env); + } + return "tcp://localhost:5555"; +} + +static string endpoint = endpoint_from_env(); static int debug = 0; diff --git a/src/surrogate/zmqml/zmqmlserver.py b/src/surrogate/zmqml/zmqmlserver.py index fb4b7944..4e65588c 100755 --- a/src/surrogate/zmqml/zmqmlserver.py +++ b/src/surrogate/zmqml/zmqmlserver.py @@ -20,6 +20,8 @@ from model.mliterationtime import IterationTimeModelRegistry from model.mleventtime import EventTimeModel import csv +import io +import pickle from pathlib import Path import os @@ -37,19 +39,197 @@ launch_id = count(start=1) # unique for launched thread launched_threads = {} # id:obj. keep track of active threads. remove the thread once it finished -training_records = {} # client_id:[] -iteration_time_models = IterationTimeModelRegistry( - history_len=int(os.environ.get("ZMQML_ITERATION_HISTORY_LEN", "4")), - horizon=int(os.environ.get("ZMQML_ITERATION_HORIZON", "30")), - ridge_alpha=float(os.environ.get("ZMQML_ITERATION_RIDGE_ALPHA", "1.0")), - train_stride=int(os.environ.get("ZMQML_ITERATION_TRAIN_STRIDE", "3")), -) -event_time_model = EventTimeModel( - min_rows=int(os.environ.get("ZMQML_EVENT_TIME_MIN_ROWS", "32")), - max_epochs=int(os.environ.get("ZMQML_EVENT_TIME_EPOCHS", "80")), - lr=float(os.environ.get("ZMQML_EVENT_TIME_LR", "0.001")), - hidden_dim=int(os.environ.get("ZMQML_EVENT_TIME_HIDDEN_DIM", "64")), -) +training_records = {} # client_id -> [] + +ITERATION_MODEL_KWARGS = { + "history_len": int(os.environ.get("ZMQML_ITERATION_HISTORY_LEN", "4")), + "horizon": int(os.environ.get("ZMQML_ITERATION_HORIZON", "30")), + "ridge_alpha": float(os.environ.get("ZMQML_ITERATION_RIDGE_ALPHA", "1.0")), + "train_stride": int(os.environ.get("ZMQML_ITERATION_TRAIN_STRIDE", "3")), +} + +EVENT_TIME_MODEL_KWARGS = { + "min_rows": int(os.environ.get("ZMQML_EVENT_TIME_MIN_ROWS", "32")), + "max_epochs": int(os.environ.get("ZMQML_EVENT_TIME_EPOCHS", "80")), + "lr": float(os.environ.get("ZMQML_EVENT_TIME_LR", "0.001")), + "hidden_dim": int(os.environ.get("ZMQML_EVENT_TIME_HIDDEN_DIM", "64")), +} + +DEFAULT_TERMINAL_MODEL_SCOPE = "terminal" +DEFAULT_TERMINAL_MODEL_KEY = "global" + +# The current dragonfly-dally event-time rows use numeric LP-type fields. +# By default, current_lp_type=0 is treated as a terminal LP; all other LP +# types get switch-local models. Override if the enum changes. +EVENT_TIME_TERMINAL_LP_TYPES = { + token.strip() + for token in os.environ.get("ZMQML_EVENT_TIME_TERMINAL_LP_TYPES", "0").split(",") + if token.strip() +} + + +def normalize_model_identity(model_scope: str | None = None, model_key: str | None = None) -> tuple[str, str, str]: + scope = str(model_scope or "").strip() + key = str(model_key or "").strip() + + if not scope: + scope = DEFAULT_TERMINAL_MODEL_SCOPE + + if scope in ("terminal", "term", "client", "global"): + scope = DEFAULT_TERMINAL_MODEL_SCOPE + key = DEFAULT_TERMINAL_MODEL_KEY + elif scope in ("router", "switch", "switch-lp", "router-lp"): + scope = "switch" + if not key: + key = "unknown" + else: + if not key: + key = DEFAULT_TERMINAL_MODEL_KEY + + model_id = f"{scope}:{key}" + return scope, key, model_id + + +def model_identity_from_real_args( + real_args: list[str], + *, + offset: int, + default_scope: str = DEFAULT_TERMINAL_MODEL_SCOPE, + default_key: str = DEFAULT_TERMINAL_MODEL_KEY, +) -> tuple[str, str, str]: + if len(real_args) >= offset + 2: + return normalize_model_identity(real_args[offset], real_args[offset + 1]) + return normalize_model_identity(default_scope, default_key) + + + + +class ScopedEventTimeModelRegistry: + def __init__(self, kwargs: dict): + self.kwargs = dict(kwargs) + self.models: dict[str, EventTimeModel] = {} + self.debug = False + self.model_versions: dict[str, int] = {} + + def set_debug(self, enabled: bool) -> None: + self.debug = bool(enabled) + for model in self.models.values(): + model.set_debug(self.debug) + + def get(self, model_scope: str | None = None, model_key: str | None = None) -> EventTimeModel: + _, _, model_id = normalize_model_identity(model_scope, model_key) + if model_id not in self.models: + model = EventTimeModel(**self.kwargs) + model.set_debug(self.debug) + self.models[model_id] = model + self.model_versions.setdefault(model_id, 0) + return self.models[model_id] + + def model_id(self, model_scope: str | None = None, model_key: str | None = None) -> str: + return normalize_model_identity(model_scope, model_key)[2] + + def train_or_update(self, model_scope: str | None = None, model_key: str | None = None) -> bool: + if model_scope in (None, "", "all", "*"): + trained_any = False + for model_id, model in self.models.items(): + trained = model.train_or_update() + if trained: + self.model_versions[model_id] = self.model_versions.get(model_id, 0) + 1 + trained_any = trained or trained_any + return trained_any + + model_id = self.model_id(model_scope, model_key) + model = self.get(model_scope, model_key) + trained = model.train_or_update() + if trained: + self.model_versions[model_id] = self.model_versions.get(model_id, 0) + 1 + return trained + + @staticmethod + def _safe_filename(model_id: str) -> str: + return model_id.replace("/", "_").replace(":", "__") + + def save(self, path: str | Path, model_scope: str | None = None, model_key: str | None = None) -> None: + path = Path(path) + + if model_scope not in (None, "", "all", "*"): + model = self.get(model_scope, model_key) + if path.parent: + path.parent.mkdir(parents=True, exist_ok=True) + model.save(path) + return + + path.mkdir(parents=True, exist_ok=True) + manifest = { + "format": "scoped-event-time-directory-v1", + "models": {}, + } + + for model_id, model in sorted(self.models.items()): + filename = self._safe_filename(model_id) + ".pt" + model.save(path / filename) + manifest["models"][model_id] = filename + + (path / "manifest.json").write_text(json.dumps(manifest, indent=2, sort_keys=True)) + + def load(self, path: str | Path, model_scope: str | None = None, model_key: str | None = None) -> None: + path = Path(path) + + if path.is_dir(): + manifest_path = path / "manifest.json" + if not manifest_path.exists(): + raise FileNotFoundError(f"missing scoped event-time manifest: {manifest_path}") + + manifest = json.loads(manifest_path.read_text()) + self.models.clear() + self.model_versions.clear() + for model_id, filename in manifest.get("models", {}).items(): + scope, key = model_id.split(":", 1) + model = self.get(scope, key) + model.load(path / filename) + model.set_debug(self.debug) + self.model_versions[model_id] = self.model_versions.get(model_id, 0) + 1 + return + + # Backward-compatible load of an old single event-time model file. + model = self.get(model_scope, model_key) + model.load(path) + model.set_debug(self.debug) + model_id = self.model_id(model_scope, model_key) + self.model_versions[model_id] = self.model_versions.get(model_id, 0) + 1 + + def status(self, model_scope: str | None = None, model_key: str | None = None) -> dict[str, str]: + if model_scope in (None, "", "all", "*"): + model_ids = sorted(self.models) + else: + model_ids = [self.model_id(model_scope, model_key)] + + total_rows = 0 + trained_models = 0 + entries = [] + + for model_id in model_ids: + model = self.models.get(model_id) + if model is None: + continue + total_rows += len(model.rows) + trained_models += int(bool(model.trained)) + entries.append( + f"{model_id}:rows={len(model.rows)},trained={int(model.trained)},examples={model.training_examples}" + ) + + return { + "model_count": str(len(model_ids)), + "trained_models": str(trained_models), + "total_rows": str(total_rows), + "models": ";".join(entries), + } + + +iteration_time_models = IterationTimeModelRegistry(**ITERATION_MODEL_KWARGS) +event_time_models = ScopedEventTimeModelRegistry(EVENT_TIME_MODEL_KWARGS) + +event_time_model = event_time_models.get() iteration_model_path = os.environ.get("ZMQML_ITERATION_MODEL_PATH", "").strip() event_time_model_path = os.environ.get("ZMQML_EVENT_TIME_MODEL_PATH", "").strip() event_time_record_log_path = os.environ.get("ZMQML_EVENT_TIME_RECORD_LOG_PATH", "").strip() @@ -58,10 +238,7 @@ app_alloc_path = os.environ.get("ZMQML_APP_ALLOC_PATH", "").strip() auto_train_on_records = os.environ.get( - "ZMQML_AUTO_TRAIN_ON_RECORDS", "1" -).strip().lower() in ("1", "true", "yes", "on") -event_time_auto_train_on_records = os.environ.get( - "ZMQML_EVENT_TIME_AUTO_TRAIN_ON_RECORDS", "0" + "ZMQML_AUTO_TRAIN_ON_RECORDS", "0" ).strip().lower() in ("1", "true", "yes", "on") iteration_model_version = 0 @@ -83,10 +260,11 @@ ) if event_time_model_path: - event_time_model.load(event_time_model_path) + event_time_models.load(event_time_model_path) + event_time_model = event_time_models.get() event_time_model_version = 1 print( - f"[zmqmlserver] loaded event-time model: {event_time_model_path}", + f"[zmqmlserver] loaded event-time model(s): {event_time_model_path}", flush=True, ) @@ -228,7 +406,7 @@ def set_director_debug_prints(args): director_debug_prints = raw in ("1", "true", "yes", "on", "enabled") iteration_time_models.set_debug(director_debug_prints) - event_time_model.set_debug(director_debug_prints) + event_time_models.set_debug(director_debug_prints) if director_debug_prints: print(f"[zmqmlserver] director_debug_prints=1", flush=True) @@ -335,13 +513,17 @@ def receivedata(args, bindata): # # receive training records # + def receiverecords(args, bindata): status = "failed" st = time.time() - num_args = int(args[0]) # 1st arg is num of args - client = int(args[1]) # 2nd arg is client id - num_records = int(args[2]) # 3rd arg is num records + real_args = _real_command_args(args) + if len(real_args) < 2: + return ("failed", time.time() - st) + + client = int(real_args[0]) + num_records = int(real_args[1]) records_str = str(bindata.decode('utf-8')) records_str = records_str.strip() @@ -361,17 +543,10 @@ def receiverecords(args, bindata): training_records[client].extend(parsed_records) - # Keep the raw records available for offline/pretraining workflows. - # By default this preserves the old behavior and trains immediately. - # Set ZMQML_AUTO_TRAIN_ON_RECORDS=0 for pure-PDES collection or - # frozen pretrained inference runs. if parsed_records: append_record_log(client, parsed_records) model = iteration_time_models.get(client) - # Enrich the ML model with app_id metadata when available. - # The C++ protocol still sends client + timing values, while the Python - # server infers app_id from ZMQML_APP_ALLOC_PATH. app_id = client_app_map.get(client, -1) if "client_app_map" in globals() else -1 if hasattr(model, "set_app_id"): @@ -400,19 +575,17 @@ def receiverecords(args, bindata): return (status, elapsed_time) -# -# do inference to get predictions -# def launch_iteration_time_inferencing(args, bindata): status = "failed" st = time.time() - num_args = int(args[0]) # 1st arg is num of args - client = int(args[1]) # 2nd arg is client id - num_steps = int(args[2]) # 3rd arg is num steps to predict + real_args = _real_command_args(args) + if len(real_args) < 2: + return ("failed", time.time() - st, "") + + client = int(real_args[0]) + num_steps = int(real_args[1]) - # Optional recent-context payload. The normal path uses records previously - # received through send-records, but accepting context keeps the API flexible. records_str = str(bindata.decode('utf-8')) records_str = records_str.strip() @@ -448,11 +621,6 @@ def launch_iteration_time_inferencing(args, bindata): elapsed_time = time.time() - st return (status, elapsed_time, inferences_str) - - - - - def event_time_payload_has_header(payload: str) -> bool: for line in payload.splitlines(): line = line.strip() @@ -502,27 +670,101 @@ def append_event_time_record_log(payload: str) -> None: f.write("\n") +def event_time_model_identity_from_row(row: dict) -> tuple[str, str, str]: + raw_lp_type = str(row.get("current_lp_type", "")).strip() + raw_gid = str(row.get("current_lp_gid", "")).strip() + + if raw_lp_type in EVENT_TIME_TERMINAL_LP_TYPES: + return normalize_model_identity("terminal", "global") + + return normalize_model_identity("switch", raw_gid or "unknown") + + +def iter_event_time_rows_from_payload(raw_payload: str): + payload = event_time_payload_with_header(raw_payload) + if not payload.strip(): + return + + reader = csv.DictReader(io.StringIO(payload)) + if reader.fieldnames: + reader.fieldnames = [str(name).strip().lstrip("#").strip() for name in reader.fieldnames] + + for row in reader: + clean = {str(k).strip().lstrip("#").strip(): v for k, v in row.items()} + yield clean + + def receive_event_time_records(args, bindata): st = time.time() raw_payload = bindata.decode("utf-8", errors="replace").strip() - payload = event_time_payload_with_header(raw_payload) - loaded_rows = event_time_model.add_records_text(payload) if payload else 0 + loaded_rows = 0 + + # Important performance rule: + # Do NOT call EventTimeModel.add_records_text(...) once per row. + # Event-time batches can contain 65K+ rows, and per-row parsing/routing makes + # pure-PDES collection much slower than the old single-global model path. + # + # Instead, parse once, group rows by scoped model id, then call + # add_records_text(...) once per scoped model per C++ batch. + grouped_rows: dict[str, list[dict]] = {} + grouped_identity: dict[str, tuple[str, str]] = {} + + for row in iter_event_time_rows_from_payload(raw_payload) or []: + model_scope, model_key, model_id = event_time_model_identity_from_row(row) + grouped_rows.setdefault(model_id, []).append(row) + grouped_identity[model_id] = (model_scope, model_key) + + per_model_loaded: dict[str, int] = {} + + for model_id, rows in grouped_rows.items(): + model_scope, model_key = grouped_identity[model_id] + model = event_time_models.get(model_scope, model_key) + + if not rows: + continue + + # Build one CSV payload for this model. Preserve the field order from + # the first row and include any later extra keys defensively. + fieldnames = list(rows[0].keys()) + seen = set(fieldnames) + for row in rows[1:]: + for key in row.keys(): + if key not in seen: + fieldnames.append(key) + seen.add(key) + + buf = io.StringIO() + writer = csv.DictWriter(buf, fieldnames=fieldnames, extrasaction="ignore") + writer.writeheader() + writer.writerows(rows) + + accepted = model.add_records_text(buf.getvalue()) + loaded_rows += accepted + per_model_loaded[model_id] = accepted + + if accepted > 0 and auto_train_on_records: + model.train_or_update() if loaded_rows > 0: append_event_time_record_log(raw_payload) - if event_time_auto_train_on_records: - event_time_model.train_or_update() - director_debug( - f"[event-time records] loaded_rows={loaded_rows} " - f"total_rows={len(event_time_model.rows)} " - f"trained={int(event_time_model.trained)}" - ) + if director_debug_prints: + # Keep this compact. Printing the full per_model dict for 100+ switch + # models is noisy and can itself become expensive. + nonempty_models = sum(1 for v in per_model_loaded.values() if v > 0) + min_rows = min(per_model_loaded.values()) if per_model_loaded else 0 + max_rows = max(per_model_loaded.values()) if per_model_loaded else 0 + sample = sorted(per_model_loaded.items())[:8] + print( + f"[event-time records] loaded_rows={loaded_rows} " + f"models={nonempty_models} min_rows_per_model={min_rows} " + f"max_rows_per_model={max_rows} sample={sample}", + flush=True, + ) return ("done", time.time() - st, loaded_rows) - def load_event_time_records_csv_command(args): st = time.time() real_args = _real_command_args(args) @@ -542,22 +784,44 @@ def load_event_time_records_csv_command(args): "error": f"event-time records path does not exist: {path}", } - loaded_rows = event_time_model.load_csv(path) + loaded_rows = 0 + files = sorted(path.rglob("*")) if path.is_dir() else [path] - return { + for child in files: + if not child.is_file() or child.suffix.lower() not in (".csv", ".txt", ".log"): + continue + status, _et, child_rows = receive_event_time_records(["0"], child.read_text().encode("utf-8")) + if status == "done": + loaded_rows += int(child_rows) + + ret = { "status": "done", "et": str(time.time() - st), "path": str(path), "loaded_rows": str(loaded_rows), - "total_rows": str(len(event_time_model.rows)), } + ret.update(event_time_models.status()) + return ret def train_event_time_model_command(args): global event_time_model_version st = time.time() - trained = event_time_model.train_or_update() + real_args = _real_command_args(args) + + target = real_args[0] if real_args else "all" + if target in ("", "all", "*"): + trained = event_time_models.train_or_update("all", "") + model_scope = "all" + model_key = "" + model_id = "all" + elif len(real_args) >= 2: + model_scope, model_key, model_id = normalize_model_identity(real_args[0], real_args[1]) + trained = event_time_models.train_or_update(model_scope, model_key) + else: + model_scope, model_key, model_id = normalize_model_identity("switch", target) + trained = event_time_models.train_or_update(model_scope, model_key) if trained: event_time_model_version += 1 @@ -565,18 +829,17 @@ def train_event_time_model_command(args): ret = { "status": "done" if trained else "failed", "et": str(time.time() - st), + "target": model_id, "model_version": str(event_time_model_version), } - ret.update(event_time_model.status()) + ret.update(event_time_models.status(model_scope, model_key)) if not trained: - ret["error"] = "event-time model was not trained; load enough generic event-time rows first" + ret["error"] = "event-time model was not trained; load enough scoped event-time rows first" print( - f"[event-time model-train-command] trained={int(trained)} " - f"rows={len(event_time_model.rows)} " - f"training_examples={event_time_model.training_examples} " - f"model_version={event_time_model_version}", + f"[event-time model-train-command] target={model_id} trained={int(trained)} " + f"model_version={event_time_model_version} status={event_time_models.status(model_scope, model_key)}", flush=True, ) @@ -595,21 +858,29 @@ def save_event_time_model_command(args): } model_path = Path(real_args[0]) - if model_path.parent: - model_path.parent.mkdir(parents=True, exist_ok=True) + target = real_args[1] if len(real_args) >= 2 else "all" - event_time_model.save(model_path) + if target in ("", "all", "*"): + event_time_models.save(model_path, "all", "") + model_id = "all" + elif len(real_args) >= 3: + model_scope, model_key, model_id = normalize_model_identity(real_args[1], real_args[2]) + event_time_models.save(model_path, model_scope, model_key) + else: + model_scope, model_key, model_id = normalize_model_identity("switch", target) + event_time_models.save(model_path, model_scope, model_key) return { "status": "done", "et": str(time.time() - st), "path": str(model_path), + "target": model_id, "model_version": str(event_time_model_version), } def load_event_time_model_command(args): - global event_time_model_version + global event_time_model_version, event_time_model st = time.time() real_args = _real_command_args(args) @@ -629,25 +900,50 @@ def load_event_time_model_command(args): "error": f"model path does not exist: {model_path}", } - event_time_model.load(model_path) + target = real_args[1] if len(real_args) >= 2 else "all" + + if target in ("", "all", "*"): + event_time_models.load(model_path) + model_id = "all" + elif len(real_args) >= 3: + model_scope, model_key, model_id = normalize_model_identity(real_args[1], real_args[2]) + event_time_models.load(model_path, model_scope, model_key) + else: + model_scope, model_key, model_id = normalize_model_identity("switch", target) + event_time_models.load(model_path, model_scope, model_key) + + event_time_model = event_time_models.get() event_time_model_version += 1 return { "status": "done", "et": str(time.time() - st), "path": str(model_path), + "target": model_id, "model_version": str(event_time_model_version), } def event_time_model_status_command(args): st = time.time() + real_args = _real_command_args(args) + + if not real_args or real_args[0] in ("", "all", "*"): + model_scope = "all" + model_key = "" + model_id = "all" + elif len(real_args) >= 2: + model_scope, model_key, model_id = normalize_model_identity(real_args[0], real_args[1]) + else: + model_scope, model_key, model_id = normalize_model_identity("switch", real_args[0]) + ret = { "status": "done", "et": str(time.time() - st), + "target": model_id, "model_version": str(event_time_model_version), } - ret.update(event_time_model.status()) + ret.update(event_time_models.status(model_scope, model_key)) return ret @@ -663,15 +959,24 @@ def launch_event_time_inferencing(args, bindata): requested_count = 1 payload = bindata.decode("utf-8", errors="replace").strip() - predictions = event_time_model.predict_from_text( + rows = list(iter_event_time_rows_from_payload(payload) or []) + + if rows: + model_scope, model_key, model_id = event_time_model_identity_from_row(rows[0]) + else: + model_scope, model_key, model_id = normalize_model_identity() + + model = event_time_models.get(model_scope, model_key) + predictions = model.predict_from_text( payload, requested_count=max(1, requested_count), ) predictions_str = " ".join(str(float(x)) for x in predictions) director_debug( - f"[event-time inference] requested_count={requested_count} " - f"payload_bytes={len(payload)} predictions={predictions_str}" + f"[event-time inference] model={model_id} requested_count={requested_count} " + f"payload_bytes={len(payload)} trained={int(model.trained)} " + f"rows={len(model.rows)} predictions={predictions_str}" ) return ("done", time.time() - st, predictions_str) @@ -695,6 +1000,7 @@ def _real_command_args(args): return out + def train_iteration_time_model_command(args): global iteration_model_version @@ -875,7 +1181,6 @@ def load_iteration_records_csv_command(args): "loaded_clients": str(len(loaded_clients)), } - def load_iteration_time_model_command(args): global iteration_model_version @@ -916,6 +1221,7 @@ def load_iteration_time_model_command(args): } + def iteration_time_model_status_command(args): st = time.time() real_args = _real_command_args(args) @@ -960,8 +1266,6 @@ def iteration_time_model_status_command(args): "clients": ";".join(per_client), } - -# Backwards-compatible wrapper for the old command name. def launch_surrogate_inferencing(args, bindata): return launch_iteration_time_inferencing(args, bindata) @@ -991,7 +1295,7 @@ def director_request_command(msg, bindata): family = str(msg.get("surrogate_family", "iteration-time")).strip() operation = str(msg.get("operation", "")).strip() backend = str(msg.get("surrogate_backend", "")).strip() - args = _real_command_args(msg.get("args", [])) + args = msg.get("args", []) operation_aliases = { "status": "model-status", diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 70ab98e5..4a660ae6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -2,6 +2,10 @@ enable_testing() configure_file(run-test.sh.in run-test.sh) +option(CODES_ENABLE_ZMQML_HYBRID_TESTS + "Register ZMQML hybrid workflow tests that require a running zmqmlserver.py" + OFF) + # codes_add_equivalence_test — register an equivalence/determinism test. # # Runs a model binary two or more times and asserts a marker line (default @@ -262,6 +266,13 @@ if(USE_UNION) list(APPEND test-shell-files ${union-shell-files}) endif() +if(USE_ZEROMQ AND CODES_ENABLE_ZMQML_HYBRID_TESTS) + list(APPEND test-shell-files + zmqml-iteration-time-hybrid-workflow.sh + zmqml-event-time-hybrid-workflow.sh + ) +endif() + foreach(testname ${test-shell-files}) add_test(NAME ${testname} COMMAND "${CMAKE_CURRENT_BINARY_DIR}/run-test.sh" "${CMAKE_CURRENT_SOURCE_DIR}/${testname}" diff --git a/tests/zmqml-event-time-hybrid-workflow.sh b/tests/zmqml-event-time-hybrid-workflow.sh new file mode 100755 index 00000000..18e545cb --- /dev/null +++ b/tests/zmqml-event-time-hybrid-workflow.sh @@ -0,0 +1,161 @@ +#!/bin/bash +set -euo pipefail + +if [[ -z "${bindir:-}" ]]; then + echo "bindir variable not set" + exit 1 +fi + +if [[ -z "${srcdir:-}" ]]; then + echo "srcdir variable not set" + exit 1 +fi + +endpoint="${ZMQML_ENDPOINT:-tcp://localhost:5555}" +np="${ZMQML_TEST_NP:-1}" + +expfolder="$PWD" +artifacts="$expfolder/zmqml-event-artifacts" +mkdir -p "$artifacts" + +train_workdir="$artifacts/train-run" +hybrid_workdir="$artifacts/hybrid-run" +mkdir -p "$train_workdir" "$hybrid_workdir" + +cp "$bindir/doc/example/kb.dfdally-72-milc-small.json" "$expfolder/" +cp "$bindir/doc/example/kb.dfdally-72-milc-small.alloc.conf" "$expfolder/" + +# Keep the event-time CI test small. The stock 10-iteration MILC run +# processes millions of events and makes the hybrid event-time inference +# phase too large for a smoke test. +python3 - "$expfolder/kb.dfdally-72-milc-small.json" <<'PYJSON' +import json +import sys +from pathlib import Path + +path = Path(sys.argv[1]) +data = json.loads(path.read_text()) +data["jobs"]["cfg"]["iteration_cnt"] = 4 +path.write_text(json.dumps(data, indent=2) + "\n") +PYJSON + +printf '72 %s/kb.dfdally-72-milc-small.json 1 0\n' "$expfolder" \ + > "$expfolder/kb.dfdally-72-milc-small.workload.conf" + +make_event_conf() { + local out="$1" + local inferencing="$2" + local training="$3" + + SURROGATE_BACKEND=dragonfly-dally \ + SURROGATE_FAMILY=event-time \ + START_ITER=1 \ + END_ITER=2 \ + RETRAIN_ENABLED=0 \ + RETRAIN_ITER=-1 \ + RETRAIN_SAVE_PATH="" \ + SECOND_SURROGATE_ENABLED=0 \ + SECOND_START_ITER=100000 \ + SECOND_END_ITER=100001 \ + INFERENCING_ENABLED="$inferencing" \ + SURROGATE_ENABLED=1 \ + TRAINING_ENABLED="$training" \ + DIRECTOR_DEBUG_PRINTS=1 \ + SHUTDOWN_ZMQML_SERVER_ON_FINALIZE=0 \ + envsubst < "$bindir/doc/example/kb.dfdally-72-event-time-director.template.conf.in" > "$out" +} + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family event-time \ + status + +# Phase 1: high-fidelity event-time record collection. +make_event_conf "$expfolder/event-time-train.conf" 0 1 + +( + cd "$train_workdir" + ZMQML_ENDPOINT="$endpoint" \ + mpirun -np "$np" "$bindir/src/model-net-mpi-replay" \ + --synch=1 \ + --workload_type=swm-online \ + --disable_compute=0 \ + --payload_sz=4096 \ + --workload_conf_file="$expfolder/kb.dfdally-72-milc-small.workload.conf" \ + --alloc_file="$expfolder/kb.dfdally-72-milc-small.alloc.conf" \ + -- "$expfolder/event-time-train.conf" +) > "$artifacts/event-time-train.out" \ + 2> "$artifacts/event-time-train.err" + +grep 'Net Events Processed' "$artifacts/event-time-train.out" +grep -E '\[event-time records\].*send_to_zmq=1' "$artifacts/event-time-train.err" + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family event-time \ + train \ + > "$artifacts/event-time-train-model.json" + +grep '"status": "done"' "$artifacts/event-time-train-model.json" + +python3 - "$artifacts/event-time-train-model.json" <<'PY_EVENT_TRAIN_CHECK' +import json +import sys +from pathlib import Path + +path = Path(sys.argv[1]) +data = json.loads(path.read_text()) + +total_rows = int(data.get("total_rows", "0")) +trained_models = int(data.get("trained_models", "0")) + +if total_rows <= 0: + raise SystemExit(f"event-time train reported no server-side rows: {data}") + +if trained_models <= 0: + raise SystemExit(f"event-time train reported no trained models: {data}") + +print( + "event-time server train status: " + f"total_rows={total_rows} trained_models={trained_models}" +) +PY_EVENT_TRAIN_CHECK + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family event-time \ + save "$artifacts/event-time-models" \ + > "$artifacts/event-time-save-model.json" + +test -f "$artifacts/event-time-models/manifest.json" +grep '"status": "done"' "$artifacts/event-time-save-model.json" + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family event-time \ + load "$artifacts/event-time-models" \ + > "$artifacts/event-time-load-model.json" + +grep '"status": "done"' "$artifacts/event-time-load-model.json" + +# Phase 2: hybrid event-time inference. +make_event_conf "$expfolder/event-time-hybrid.conf" 1 0 + +( + cd "$hybrid_workdir" + ZMQML_ENDPOINT="$endpoint" \ + mpirun -np "$np" "$bindir/src/model-net-mpi-replay" \ + --synch=1 \ + --workload_type=swm-online \ + --disable_compute=0 \ + --payload_sz=4096 \ + --workload_conf_file="$expfolder/kb.dfdally-72-milc-small.workload.conf" \ + --alloc_file="$expfolder/kb.dfdally-72-milc-small.alloc.conf" \ + -- "$expfolder/event-time-hybrid.conf" +) > "$artifacts/event-time-hybrid.out" \ + 2> "$artifacts/event-time-hybrid.err" + +grep 'Net Events Processed' "$artifacts/event-time-hybrid.out" +grep -E '\[event-time surrogate\] active=1|\[event-time inference cache-(hit|miss)\]' "$artifacts/event-time-hybrid.err" + +exit 0 diff --git a/tests/zmqml-iteration-time-hybrid-workflow.sh b/tests/zmqml-iteration-time-hybrid-workflow.sh new file mode 100755 index 00000000..70566dcc --- /dev/null +++ b/tests/zmqml-iteration-time-hybrid-workflow.sh @@ -0,0 +1,148 @@ +#!/bin/bash +set -euo pipefail + +if [[ -z "${bindir:-}" ]]; then + echo "bindir variable not set" + exit 1 +fi + +if [[ -z "${srcdir:-}" ]]; then + echo "srcdir variable not set" + exit 1 +fi + +endpoint="${ZMQML_ENDPOINT:-tcp://localhost:5555}" +np="${ZMQML_TEST_NP:-1}" + +expfolder="$PWD" +artifacts="$expfolder/zmqml-iteration-artifacts" +mkdir -p "$artifacts" + +train_workdir="$artifacts/train-run" +hybrid_workdir="$artifacts/hybrid-run" +mkdir -p "$train_workdir" "$hybrid_workdir" + +cp "$bindir/doc/example/kb.dfdally-72-milc-small.workload.conf" "$expfolder/" +cp "$bindir/doc/example/kb.dfdally-72-milc-small.json" "$expfolder/" +cp "$bindir/doc/example/kb.dfdally-72-milc-small.alloc.conf" "$expfolder/" + +make_conf() { + local out="$1" + local start_iter="$2" + local end_iter="$3" + local inferencing="$4" + local surrogate="$5" + local training="$6" + + START_ITER="$start_iter" \ + END_ITER="$end_iter" \ + RETRAIN_ENABLED=0 \ + RETRAIN_ITER=-1 \ + RETRAIN_SAVE_PATH="" \ + SECOND_SURROGATE_ENABLED=0 \ + SECOND_START_ITER=100000 \ + SECOND_END_ITER=100001 \ + INFERENCING_ENABLED="$inferencing" \ + SURROGATE_ENABLED="$surrogate" \ + TRAINING_ENABLED="$training" \ + DIRECTOR_DEBUG_PRINTS=1 \ + SHUTDOWN_ZMQML_SERVER_ON_FINALIZE=0 \ + PACKET_SIZE=4096 \ + CHUNK_SIZE=64 \ + envsubst < "$bindir/doc/example/kb.dfdally-72-zeromq-director.template.conf.in" > "$out" +} + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family iteration-time \ + status + +# Phase 1: high-fidelity/director data collection. Inferencing is off, training +# records are sent to the external ZMQML server. +make_conf "$expfolder/iteration-train.conf" 0 10 0 1 1 + +( + cd "$train_workdir" + ZMQML_ENDPOINT="$endpoint" \ + mpirun -np "$np" "$bindir/src/model-net-mpi-replay" \ + --synch=1 \ + --workload_type=swm-online \ + --disable_compute=0 \ + --payload_sz=4096 \ + --workload_conf_file="$expfolder/kb.dfdally-72-milc-small.workload.conf" \ + --alloc_file="$expfolder/kb.dfdally-72-milc-small.alloc.conf" \ + -- "$expfolder/iteration-train.conf" +) > "$artifacts/iteration-train.out" \ + 2> "$artifacts/iteration-train.err" + +grep 'Net Events Processed' "$artifacts/iteration-train.out" + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family iteration-time \ + train \ + > "$artifacts/iteration-train-model.json" + +grep '"status": "done"' "$artifacts/iteration-train-model.json" + +python3 - "$artifacts/iteration-train-model.json" <<'PY_ITER_TRAIN_CHECK' +import json +import sys +from pathlib import Path + +path = Path(sys.argv[1]) +data = json.loads(path.read_text()) + +total_records = int(data.get("total_records", "0")) +trained_clients = int(data.get("trained_clients", "0")) + +if total_records <= 0: + raise SystemExit(f"iteration-time train reported no server-side records: {data}") + +if trained_clients <= 0: + raise SystemExit(f"iteration-time train reported no trained clients: {data}") + +print( + "iteration-time server train status: " + f"total_records={total_records} trained_clients={trained_clients}" +) +PY_ITER_TRAIN_CHECK + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family iteration-time \ + save "$artifacts/iteration-time-model.pt" \ + > "$artifacts/iteration-save-model.json" + +test -s "$artifacts/iteration-time-model.pt" +grep '"status": "done"' "$artifacts/iteration-save-model.json" + +python3 "$srcdir/src/surrogate/zmqml/zmqmlctl.py" \ + --endpoint "$endpoint" \ + --family iteration-time \ + load "$artifacts/iteration-time-model.pt" \ + > "$artifacts/iteration-load-model.json" + +grep '"status": "done"' "$artifacts/iteration-load-model.json" + +# Phase 2: hybrid inference. Training is off; inference is on. +make_conf "$expfolder/iteration-hybrid.conf" 3 8 1 1 0 + +( + cd "$hybrid_workdir" + ZMQML_ENDPOINT="$endpoint" \ + mpirun -np "$np" "$bindir/src/model-net-mpi-replay" \ + --synch=1 \ + --workload_type=swm-online \ + --disable_compute=0 \ + --payload_sz=4096 \ + --workload_conf_file="$expfolder/kb.dfdally-72-milc-small.workload.conf" \ + --alloc_file="$expfolder/kb.dfdally-72-milc-small.alloc.conf" \ + -- "$expfolder/iteration-hybrid.conf" +) > "$artifacts/iteration-hybrid.out" \ + 2> "$artifacts/iteration-hybrid.err" + +grep 'Net Events Processed' "$artifacts/iteration-hybrid.out" +grep -E '\[DIR\] iteration-time predictions director_id=[0-9]+ count=[1-9][0-9]* values=' "$artifacts/iteration-hybrid.err" + +exit 0