From a802b22b7a0b0ae7c84b0bbe361fa421de88b1c3 Mon Sep 17 00:00:00 2001 From: "Lorenzo L. Romero" Date: Sat, 9 May 2026 20:24:26 -0400 Subject: [PATCH 1/3] Add patch registry service to fne --- configs/fne-config.example.yml | 11 + src/common/network/BaseNetwork.h | 1 + src/common/network/RTPFNEHeader.h | 3 +- src/fne/PatchStatusRegistry.cpp | 405 ++++++++++++++++++++++++++++ src/fne/PatchStatusRegistry.h | 101 +++++++ src/fne/network/MetadataNetwork.cpp | 64 +++++ src/fne/network/TrafficNetwork.cpp | 105 ++++++++ src/fne/network/TrafficNetwork.h | 35 +++ 8 files changed, 724 insertions(+), 1 deletion(-) create mode 100644 src/fne/PatchStatusRegistry.cpp create mode 100644 src/fne/PatchStatusRegistry.h diff --git a/configs/fne-config.example.yml b/configs/fne-config.example.yml index 2fb1c1198..5c221faf5 100644 --- a/configs/fne-config.example.yml +++ b/configs/fne-config.example.yml @@ -120,6 +120,17 @@ master: # spanning tree updates.) spanningTreeFastReconnect: true + # Console patch status registry configuration. + patchStatus: + # Flag indicating whether or not console patch status publishing is enabled. + enabled: true + # Default TTL, in seconds, for console patch status updates that do not specify one. + defaultTtlSeconds: 15 + # Minimum accepted TTL, in seconds, for console patch status updates. + minTtlSeconds: 5 + # Maximum accepted TTL, in seconds, for console patch status updates. + maxTtlSeconds: 300 + # Flag indicating whether or not peer pinging will be reported. reportPeerPing: true diff --git a/src/common/network/BaseNetwork.h b/src/common/network/BaseNetwork.h index e6538f00a..a40230119 100644 --- a/src/common/network/BaseNetwork.h +++ b/src/common/network/BaseNetwork.h @@ -69,6 +69,7 @@ #define TAG_TRANSFER_ACT_LOG "TRNSLOG" #define TAG_TRANSFER_DIAG_LOG "TRNSDIAG" #define TAG_TRANSFER_STATUS "TRNSSTS" +#define TAG_TRANSFER_PATCH_STATUS "TRNSPTCH" #define TAG_ANNOUNCE "ANNC" #define TAG_PEER_REPLICA "REPL" diff --git a/src/common/network/RTPFNEHeader.h b/src/common/network/RTPFNEHeader.h index 3b8422768..9e52e8b83 100644 --- a/src/common/network/RTPFNEHeader.h +++ b/src/common/network/RTPFNEHeader.h @@ -100,6 +100,7 @@ namespace network TRANSFER_SUBFUNC_ACTIVITY = 0x01U, //!< Activity Log Transfer TRANSFER_SUBFUNC_DIAG = 0x02U, //!< Diagnostic Log Transfer TRANSFER_SUBFUNC_STATUS = 0x03U, //!< Status Transfer + TRANSFER_SUBFUNC_PATCH_STATUS = 0x04U, //!< Console Patch Status Transfer ANNC_SUBFUNC_GRP_AFFIL = 0x00U, //!< Announce Group Affiliation ANNC_SUBFUNC_UNIT_REG = 0x01U, //!< Announce Unit Registration @@ -215,4 +216,4 @@ namespace network } // namespace frame } // namespace network -#endif // __RTP_FNE_HEADER_H__ \ No newline at end of file +#endif // __RTP_FNE_HEADER_H__ diff --git a/src/fne/PatchStatusRegistry.cpp b/src/fne/PatchStatusRegistry.cpp new file mode 100644 index 000000000..132ef4840 --- /dev/null +++ b/src/fne/PatchStatusRegistry.cpp @@ -0,0 +1,405 @@ +// SPDX-License-Identifier: GPL-2.0-only +/* + * Digital Voice Modem - Converged FNE Software + * GPLv2 Open Source. Use is subject to license terms. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright (C) 2026 DVMProject Authors + * + */ +#include "fne/PatchStatusRegistry.h" + +#include "common/Log.h" + +#include +#include +#include +#include +#include + +constexpr uint32_t PatchStatusRegistry::DEFAULT_TTL_SECONDS; +constexpr uint32_t PatchStatusRegistry::MIN_TTL_SECONDS; +constexpr uint32_t PatchStatusRegistry::MAX_TTL_SECONDS; +constexpr uint32_t PatchStatusRegistry::MAX_WAIT_MS; + +PatchStatusRegistry::PatchStatusRegistry() : + m_mutex(), + m_revisionChanged(), + m_peerPatches(), + m_revision(0U), + m_defaultTtlSeconds(DEFAULT_TTL_SECONDS), + m_minTtlSeconds(MIN_TTL_SECONDS), + m_maxTtlSeconds(MAX_TTL_SECONDS) +{ + /* stub */ +} + +void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds) +{ + if (minTtlSeconds == 0U) + minTtlSeconds = MIN_TTL_SECONDS; + if (maxTtlSeconds < minTtlSeconds) + maxTtlSeconds = minTtlSeconds; + + std::lock_guard guard(m_mutex); + m_minTtlSeconds = minTtlSeconds; + m_maxTtlSeconds = maxTtlSeconds; + m_defaultTtlSeconds = std::max(m_minTtlSeconds, std::min(defaultTtlSeconds, m_maxTtlSeconds)); +} + +bool PatchStatusRegistry::publish(json::object& request, json::object& response, std::string& errorMessage) +{ + if (!request["peerId"].is()) { + errorMessage = "peerId was not a valid integer"; + return false; + } + + if (!request["patches"].is()) { + errorMessage = "patches was not a valid array"; + return false; + } + + PeerPatchSnapshot incoming; + incoming.peerId = request["peerId"].get(); + if (incoming.peerId == 0U) { + errorMessage = "peerId cannot be zero"; + return false; + } + + if (request["peerName"].is()) + incoming.peerName = request["peerName"].get(); + + if (request["sequence"].is()) + incoming.sequence = request["sequence"].get(); + + uint32_t ttlSeconds = defaultTtlSeconds(); + if (request["ttlSeconds"].is()) + ttlSeconds = request["ttlSeconds"].get(); + ttlSeconds = clampTtl(ttlSeconds); + + incoming.updatedAt = nowMs(); + incoming.expiresAt = incoming.updatedAt + (static_cast(ttlSeconds) * 1000U); + + json::array patches = request["patches"].get(); + for (json::value& value : patches) { + if (!value.is()) { + errorMessage = "patches contained a non-object entry"; + return false; + } + + json::object patchObj = value.get(); + PatchRecord patch; + if (!parsePatch(patchObj, patch, errorMessage)) + return false; + + incoming.patches.push_back(patch); + } + + cleanupExpired(); + + { + std::lock_guard guard(m_mutex); + if (incoming.patches.empty()) + m_peerPatches.erase(incoming.peerId); + else + m_peerPatches[incoming.peerId] = incoming; + bumpRevisionLocked(); + + response = snapshotLocked(); + response["acceptedPeerId"].set(incoming.peerId); + response["ttlSeconds"].set(ttlSeconds); + } + + m_revisionChanged.notify_all(); + return true; +} + +bool PatchStatusRegistry::removePeer(uint32_t peerId) +{ + if (peerId == 0U) + return false; + + bool removed = false; + { + std::lock_guard guard(m_mutex); + removed = m_peerPatches.erase(peerId) > 0U; + if (removed) + bumpRevisionLocked(); + } + + if (removed) + m_revisionChanged.notify_all(); + + return removed; +} + +uint32_t PatchStatusRegistry::cleanupExpired() +{ + uint32_t removed = 0U; + bool changed = false; + uint64_t now = nowMs(); + + { + std::lock_guard guard(m_mutex); + for (auto it = m_peerPatches.begin(); it != m_peerPatches.end();) { + if (it->second.expiresAt <= now) { + it = m_peerPatches.erase(it); + removed++; + changed = true; + } + else { + ++it; + } + } + + if (changed) + bumpRevisionLocked(); + } + + if (changed) + m_revisionChanged.notify_all(); + + return removed; +} + +json::object PatchStatusRegistry::snapshot() +{ + cleanupExpired(); + + std::lock_guard guard(m_mutex); + return snapshotLocked(); +} + +json::object PatchStatusRegistry::waitForChanges(uint64_t sinceRevision, uint32_t waitMs) +{ + waitMs = std::min(waitMs, MAX_WAIT_MS); + cleanupExpired(); + + std::unique_lock lock(m_mutex); + if (waitMs > 0U && sinceRevision >= m_revision) { + m_revisionChanged.wait_for(lock, std::chrono::milliseconds(waitMs), [&]() { + return m_revision > sinceRevision; + }); + } + + return snapshotLocked(); +} + +uint64_t PatchStatusRegistry::revision() const +{ + std::lock_guard guard(m_mutex); + return m_revision; +} + +uint32_t PatchStatusRegistry::defaultTtlSeconds() const +{ + std::lock_guard guard(m_mutex); + return m_defaultTtlSeconds; +} + +uint32_t PatchStatusRegistry::minTtlSeconds() const +{ + std::lock_guard guard(m_mutex); + return m_minTtlSeconds; +} + +uint32_t PatchStatusRegistry::maxTtlSeconds() const +{ + std::lock_guard guard(m_mutex); + return m_maxTtlSeconds; +} + +uint64_t PatchStatusRegistry::nowMs() +{ + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); +} + +std::string PatchStatusRegistry::normalizeMode(const std::string& mode) +{ + std::string normalized = mode; + std::transform(normalized.begin(), normalized.end(), normalized.begin(), [](unsigned char c) { + return static_cast(std::tolower(c)); + }); + return normalized; +} + +std::string PatchStatusRegistry::buildTalkgroupKey(const PatchMember& member) +{ + std::ostringstream ss; + ss << normalizeMode(member.mode) << ':' << member.tgid << ':' << static_cast(member.slot); + return ss.str(); +} + +json::object PatchStatusRegistry::memberToJson(const PatchMember& member) +{ + json::object obj = json::object(); + obj["system"].set(member.system); + obj["mode"].set(member.mode); + obj["tgid"].set(member.tgid); + obj["slot"].set(member.slot); + obj["key"].set(buildTalkgroupKey(member)); + return obj; +} + +json::object PatchStatusRegistry::patchToJson(const PatchRecord& patch) +{ + json::object obj = json::object(); + obj["patchId"].set(patch.patchId); + obj["active"].set(patch.active); + obj["oneWay"].set(patch.oneWay); + + json::array members = json::array(); + for (const PatchMember& member : patch.members) + members.push_back(json::value(memberToJson(member))); + obj["members"].set(members); + + return obj; +} + +json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& peer) +{ + json::object obj = json::object(); + obj["peerId"].set(peer.peerId); + obj["peerName"].set(peer.peerName); + obj["sequence"].set(peer.sequence); + obj["updatedAt"].set(peer.updatedAt); + obj["expiresAt"].set(peer.expiresAt); + + json::array patches = json::array(); + for (const PatchRecord& patch : peer.patches) + patches.push_back(json::value(patchToJson(patch))); + obj["patches"].set(patches); + + return obj; +} + +bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const +{ + if (obj["patchId"].is()) + patch.patchId = obj["patchId"].get(); + + if (obj["active"].is()) + patch.active = obj["active"].get(); + + if (obj["oneWay"].is()) + patch.oneWay = obj["oneWay"].get(); + + if (!obj["members"].is()) { + errorMessage = "patch members was not a valid array"; + return false; + } + + json::array members = obj["members"].get(); + for (json::value& value : members) { + if (!value.is()) { + errorMessage = "patch members contained a non-object entry"; + return false; + } + + json::object memberObj = value.get(); + PatchMember member; + if (!parseMember(memberObj, member, errorMessage)) + return false; + + patch.members.push_back(member); + } + + return true; +} + +bool PatchStatusRegistry::parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const +{ + if (obj["system"].is()) + member.system = obj["system"].get(); + + if (obj["mode"].is()) + member.mode = normalizeMode(obj["mode"].get()); + else + member.mode = "unknown"; + + if (!obj["tgid"].is()) { + errorMessage = "patch member tgid was not a valid integer"; + return false; + } + + member.tgid = obj["tgid"].get(); + if (member.tgid == 0U) { + errorMessage = "patch member tgid cannot be zero"; + return false; + } + + if (obj["slot"].is()) + member.slot = obj["slot"].get(); + else if (obj["slot"].is()) { + uint32_t slot = obj["slot"].get(); + if (slot > std::numeric_limits::max()) { + errorMessage = "patch member slot was out of range"; + return false; + } + member.slot = static_cast(slot); + } + + return true; +} + +uint32_t PatchStatusRegistry::clampTtl(uint32_t ttlSeconds) const +{ + std::lock_guard guard(m_mutex); + return std::max(m_minTtlSeconds, std::min(ttlSeconds, m_maxTtlSeconds)); +} + +json::object PatchStatusRegistry::snapshotLocked() const +{ + json::object response = json::object(); + response["revision"].set(m_revision); + + json::array peers = json::array(); + json::array patches = json::array(); + json::object byTalkgroup = json::object(); + + for (const auto& entry : m_peerPatches) { + const PeerPatchSnapshot& peer = entry.second; + peers.push_back(json::value(peerSnapshotToJson(peer))); + + for (const PatchRecord& patch : peer.patches) { + json::object patchObj = patchToJson(patch); + patchObj["peerId"].set(peer.peerId); + patchObj["peerName"].set(peer.peerName); + patchObj["updatedAt"].set(peer.updatedAt); + patchObj["expiresAt"].set(peer.expiresAt); + patches.push_back(json::value(patchObj)); + + for (const PatchMember& member : patch.members) { + std::string key = buildTalkgroupKey(member); + json::array entries = json::array(); + if (byTalkgroup[key].is()) + entries = byTalkgroup[key].get(); + + json::object tgPatch = json::object(); + tgPatch["peerId"].set(peer.peerId); + tgPatch["peerName"].set(peer.peerName); + tgPatch["patchId"].set(patch.patchId); + tgPatch["active"].set(patch.active); + tgPatch["oneWay"].set(patch.oneWay); + tgPatch["updatedAt"].set(peer.updatedAt); + tgPatch["expiresAt"].set(peer.expiresAt); + tgPatch["member"].set(memberToJson(member)); + entries.push_back(json::value(tgPatch)); + byTalkgroup[key].set(entries); + } + } + } + + response["peers"].set(peers); + response["patches"].set(patches); + response["byTalkgroup"].set(byTalkgroup); + return response; +} + +void PatchStatusRegistry::bumpRevisionLocked() +{ + m_revision++; + if (m_revision == 0U) + m_revision = 1U; +} diff --git a/src/fne/PatchStatusRegistry.h b/src/fne/PatchStatusRegistry.h new file mode 100644 index 000000000..0537e6cd2 --- /dev/null +++ b/src/fne/PatchStatusRegistry.h @@ -0,0 +1,101 @@ +// SPDX-License-Identifier: GPL-2.0-only +/* + * Digital Voice Modem - Converged FNE Software + * GPLv2 Open Source. Use is subject to license terms. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright (C) 2026 DVMProject Authors + * + */ +/** + * @file PatchStatusRegistry.h + * @ingroup fne + */ +#if !defined(__FNE_PATCH_STATUS_REGISTRY_H__) +#define __FNE_PATCH_STATUS_REGISTRY_H__ + +#include "fne/Defines.h" +#include "common/json/json.h" + +#include +#include +#include +#include +#include +#include + +/** + * @brief In-memory registry for console-advertised patch status. + * @ingroup fne + */ +class HOST_SW_API PatchStatusRegistry { +public: + static constexpr uint32_t DEFAULT_TTL_SECONDS = 15U; + static constexpr uint32_t MIN_TTL_SECONDS = 5U; + static constexpr uint32_t MAX_TTL_SECONDS = 300U; + static constexpr uint32_t MAX_WAIT_MS = 30000U; + + PatchStatusRegistry(); + ~PatchStatusRegistry() = default; + + void configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds); + + bool publish(json::object& request, json::object& response, std::string& errorMessage); + bool removePeer(uint32_t peerId); + uint32_t cleanupExpired(); + + json::object snapshot(); + json::object waitForChanges(uint64_t sinceRevision, uint32_t waitMs); + + uint64_t revision() const; + uint32_t defaultTtlSeconds() const; + uint32_t minTtlSeconds() const; + uint32_t maxTtlSeconds() const; + +private: + struct PatchMember { + std::string system; + std::string mode; + uint32_t tgid = 0U; + uint8_t slot = 0U; + }; + + struct PatchRecord { + std::string patchId; + bool active = true; + bool oneWay = false; + std::vector members; + }; + + struct PeerPatchSnapshot { + uint32_t peerId = 0U; + std::string peerName; + uint32_t sequence = 0U; + uint64_t updatedAt = 0U; + uint64_t expiresAt = 0U; + std::vector patches; + }; + + static uint64_t nowMs(); + static std::string normalizeMode(const std::string& mode); + static std::string buildTalkgroupKey(const PatchMember& member); + static json::object memberToJson(const PatchMember& member); + static json::object patchToJson(const PatchRecord& patch); + static json::object peerSnapshotToJson(const PeerPatchSnapshot& peer); + + bool parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const; + bool parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const; + uint32_t clampTtl(uint32_t ttlSeconds) const; + json::object snapshotLocked() const; + void bumpRevisionLocked(); + + mutable std::mutex m_mutex; + std::condition_variable m_revisionChanged; + std::unordered_map m_peerPatches; + uint64_t m_revision; + uint32_t m_defaultTtlSeconds; + uint32_t m_minTtlSeconds; + uint32_t m_maxTtlSeconds; +}; + +#endif // __FNE_PATCH_STATUS_REGISTRY_H__ diff --git a/src/fne/network/MetadataNetwork.cpp b/src/fne/network/MetadataNetwork.cpp index 560faaeb8..bc58a09fb 100644 --- a/src/fne/network/MetadataNetwork.cpp +++ b/src/fne/network/MetadataNetwork.cpp @@ -12,6 +12,7 @@ #include "common/Log.h" #include "common/Utils.h" #include "network/MetadataNetwork.h" +#include "common/json/json.h" #include "fne/ActivityLog.h" #include "HostFNE.h" @@ -375,6 +376,69 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req) } break; + case NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS: // Console Patch Status Transfer + { + if (pktPeerId > 0 && validPeerId) { + FNEPeerConnection* connection = network->m_peers[pktPeerId]; + if (connection != nullptr) { + if (!network->patchStatusEnabled()) { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); + break; + } + + std::string ip = udp::Socket::address(req->address); + + // Only authenticated console peers may publish or request patch registry state. + if (req->length <= 11U) { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET); + break; + } + + if (connection->connected() && connection->address() == ip && connection->peerClass() == PEER_CONN_CLASS_CONSOLE) { + DECLARE_UINT8_ARRAY(rawPayload, req->length - 11U); + ::memcpy(rawPayload, req->buffer + 11U, req->length - 11U); + std::string payload(rawPayload, rawPayload + (req->length - 11U)); + + json::value v; + std::string err = json::parse(v, payload); + if (!err.empty() || !v.is()) { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET); + break; + } + + json::object reqObj = v.get(); + std::string type = "snapshot"; + if (reqObj["type"].is()) + type = reqObj["type"].get(); + + if (type == "request") { + json::object snapshot = network->patchStatusRegistry().snapshot(); + network->writePatchStatusToPeer(pktPeerId, snapshot); + break; + } + + // The authenticated peer identity is authoritative; do not allow spoofed peer IDs. + reqObj["peerId"].set(pktPeerId); + if (!reqObj["peerName"].is() || reqObj["peerName"].get().empty()) + reqObj["peerName"].set(connection->identity()); + + json::object response = json::object(); + std::string errorMessage; + if (!network->patchStatusRegistry().publish(reqObj, response, errorMessage)) { + LogWarning(LOG_MASTER, "PEER %u (%s) invalid patch status payload, %s", pktPeerId, connection->identWithQualifier().c_str(), errorMessage.c_str()); + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET); + break; + } + + network->writePatchStatusToConsoles(response); + } + else { + network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); + } + } + } + } + break; default: network->writePeerNAK(peerId, network->createStreamId(), TAG_TRANSFER, NET_CONN_NAK_ILLEGAL_PACKET); Utils::dump("Unknown transfer opcode from the peer", req->buffer, req->length); diff --git a/src/fne/network/TrafficNetwork.cpp b/src/fne/network/TrafficNetwork.cpp index 176a80aef..ab06bb227 100644 --- a/src/fne/network/TrafficNetwork.cpp +++ b/src/fne/network/TrafficNetwork.cpp @@ -109,6 +109,8 @@ TrafficNetwork::TrafficNetwork(HostFNE* host, const std::string& address, uint16 m_maintainenceTimer(1000U, pingTime), m_updateLookupTimer(1000U, (updateLookupTime * 60U)), m_haUpdateTimer(1000U, FIXED_HA_UPDATE_INTERVAL), + m_patchStatusRegistry(), + m_patchStatusEnabled(true), m_softConnLimit(0U), m_enableSpanningTree(true), m_logSpanningTreeChanges(false), @@ -229,6 +231,13 @@ void TrafficNetwork::setOptions(yaml::Node& conf, bool printOptions) m_logSpanningTreeChanges = conf["logSpanningTreeChanges"].as(false); m_spanningTreeFastReconnect = conf["spanningTreeFastReconnect"].as(true); + yaml::Node patchStatusConf = conf["patchStatus"]; + m_patchStatusEnabled = patchStatusConf["enabled"].as(true); + uint32_t patchStatusDefaultTtl = patchStatusConf["defaultTtlSeconds"].as(PatchStatusRegistry::DEFAULT_TTL_SECONDS); + uint32_t patchStatusMinTtl = patchStatusConf["minTtlSeconds"].as(PatchStatusRegistry::MIN_TTL_SECONDS); + uint32_t patchStatusMaxTtl = patchStatusConf["maxTtlSeconds"].as(PatchStatusRegistry::MAX_TTL_SECONDS); + m_patchStatusRegistry.configure(patchStatusDefaultTtl, patchStatusMinTtl, patchStatusMaxTtl); + // always force disable ADJ_STS_BCAST to neighbor FNE peers if the all option // is enabled if (m_disallowAdjStsBcast) { @@ -350,6 +359,12 @@ void TrafficNetwork::setOptions(yaml::Node& conf, bool printOptions) LogInfo(" Enable Peer Spanning Tree: %s", m_enableSpanningTree ? "yes" : "no"); LogInfo(" Log Spanning Tree Changes: %s", m_logSpanningTreeChanges ? "yes" : "no"); LogInfo(" Spanning Tree Allow Fast Reconnect: %s", m_spanningTreeFastReconnect ? "yes" : "no"); + LogInfo(" Console Patch Status Enabled: %s", m_patchStatusEnabled ? "yes" : "no"); + if (m_patchStatusEnabled) { + LogInfo(" Console Patch Status Default TTL: %us", m_patchStatusRegistry.defaultTtlSeconds()); + LogInfo(" Console Patch Status Minimum TTL: %us", m_patchStatusRegistry.minTtlSeconds()); + LogInfo(" Console Patch Status Maximum TTL: %us", m_patchStatusRegistry.maxTtlSeconds()); + } LogInfo(" Disable adjacent site broadcasts to any peers: %s", m_disallowAdjStsBcast ? "yes" : "no"); if (m_disallowAdjStsBcast) { LogWarning(LOG_MASTER, "NOTICE: All P25 ADJ_STS_BCAST messages will be blocked and dropped!"); @@ -636,6 +651,8 @@ void TrafficNetwork::clock(uint32_t ms) } // cleanup possibly stale data calls + if (m_patchStatusEnabled && m_patchStatusRegistry.cleanupExpired() > 0U) + writePatchStatusToConsoles(m_patchStatusRegistry.snapshot()); m_tagDMR->packetData()->cleanupStale(); m_tagP25->packetData()->cleanupStale(); @@ -1571,6 +1588,8 @@ void TrafficNetwork::taskNetworkRx(NetPacketRequest* req) // spin up a thread and send metadata over to peer network->peerMetadataUpdate(peerId); + if (network->m_patchStatusEnabled && connection->peerClass() == PEER_CONN_CLASS_CONSOLE) + network->writePatchStatusToPeer(peerId, network->patchStatusRegistry().snapshot()); } } } @@ -2304,6 +2323,10 @@ void TrafficNetwork::erasePeer(uint32_t peerId) } } + // erase any console patch status records for this peer + if (m_patchStatusEnabled && m_patchStatusRegistry.removePeer(peerId)) + writePatchStatusToConsoles(m_patchStatusRegistry.snapshot()); + // erase any HA parameters for this peer { auto it = std::find_if(m_peerReplicaHAParams.begin(), m_peerReplicaHAParams.end(), [&](auto& x) { return x.peerId == peerId; }); @@ -2408,6 +2431,88 @@ json::object TrafficNetwork::fneConnObject(uint32_t peerId, FNEPeerConnection *c return peerObj; } +/* Helper to send patch status state to one console peer. */ + +bool TrafficNetwork::writePatchStatusToPeer(uint32_t peerId, json::object obj) +{ + if (peerId == 0U) + return false; + if (!m_patchStatusEnabled) + return false; + + bool ret = false; + m_peers.shared_lock(); + auto it = std::find_if(m_peers.begin(), m_peers.end(), [&](PeerMapPair x) { return x.first == peerId; }); + if (it != m_peers.end() && it->second != nullptr) + ret = writePatchStatusPayload(it->second, obj); + m_peers.shared_unlock(); + + return ret; +} + +/* Helper to broadcast patch status state to connected console peers. */ + +void TrafficNetwork::writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId) +{ + if (!m_patchStatusEnabled) + return; + + m_peers.shared_lock(); + if (m_peers.size() == 0U) { + m_peers.shared_unlock(); + return; + } + + for (auto peer : m_peers) { + if (peer.first == exceptPeerId) + continue; + if (peer.second == nullptr) + continue; + if (!peer.second->connected() || peer.second->peerClass() != PEER_CONN_CLASS_CONSOLE) + continue; + + writePatchStatusPayload(peer.second, obj); + } + m_peers.shared_unlock(); +} + +/* Helper to serialize and queue a patch status transfer payload. */ + +bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json::object obj) +{ + if (connection == nullptr) + return false; + if (!m_patchStatusEnabled) + return false; + if (!connection->connected()) + return false; + if (connection->peerClass() != PEER_CONN_CLASS_CONSOLE) + return false; + + obj["type"].set("registry"); + json::value v = json::value(obj); + std::string payload = std::string(v.serialize()); + uint32_t len = static_cast(payload.length()); + if ((len + 11U) > DATA_PACKET_LENGTH) { + LogError(LOG_MASTER, "PEER %u (%s) patch status registry payload too large, len = %u", connection->id(), connection->identWithQualifier().c_str(), len); + return false; + } + + uint8_t buffer[DATA_PACKET_LENGTH]; + ::memset(buffer, 0x00U, DATA_PACKET_LENGTH); + ::memcpy(buffer + 11U, payload.c_str(), len); + + sockaddr_storage addr = connection->socketStorage(); + uint32_t addrLen = connection->sockStorageLen(); + + if (m_debug) { + LogDebug(LOG_MASTER, "PEER %u (%s) sending patch status registry, len = %u", connection->id(), connection->identWithQualifier().c_str(), len); + } + + return m_frameQueue->write(buffer, len + 11U, createStreamId(), connection->id(), m_peerId, + { NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS }, RTP_END_OF_CALL_SEQ, addr, addrLen); +} + /* Helper to reset a peer connection. */ bool TrafficNetwork::resetPeer(uint32_t peerId) diff --git a/src/fne/network/TrafficNetwork.h b/src/fne/network/TrafficNetwork.h index 27d7f4d39..f234df879 100644 --- a/src/fne/network/TrafficNetwork.h +++ b/src/fne/network/TrafficNetwork.h @@ -41,6 +41,7 @@ #include "fne/network/FNEPeerConnection.h" #include "fne/network/SpanningTree.h" #include "fne/network/HAParameters.h" +#include "fne/PatchStatusRegistry.h" #include "fne/CryptoContainer.h" #include @@ -274,6 +275,29 @@ namespace network * @return json::object */ json::object fneConnObject(uint32_t peerId, FNEPeerConnection* conn); + /** + * @brief Gets the console patch status registry. + * @return PatchStatusRegistry& Patch status registry. + */ + PatchStatusRegistry& patchStatusRegistry() { return m_patchStatusRegistry; } + /** + * @brief Flag indicating whether console patch status handling is enabled. + * @returns bool True, if enabled. + */ + bool patchStatusEnabled() const { return m_patchStatusEnabled; } + /** + * @brief Sends patch status registry state to one console peer. + * @param peerId Destination peer ID. + * @param obj Patch status JSON payload. + * @returns bool True, if the message was queued, otherwise false. + */ + bool writePatchStatusToPeer(uint32_t peerId, json::object obj); + /** + * @brief Broadcasts patch status registry state to connected console peers. + * @param obj Patch status JSON payload. + * @param exceptPeerId Optional peer ID to skip. + */ + void writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId = 0U); /** * @brief Helper to reset a peer connection. @@ -359,6 +383,9 @@ namespace network Timer m_updateLookupTimer; Timer m_haUpdateTimer; + PatchStatusRegistry m_patchStatusRegistry; + bool m_patchStatusEnabled; + uint32_t m_softConnLimit; bool m_enableSpanningTree; @@ -793,6 +820,14 @@ namespace network */ bool writePeerNAK(uint32_t peerId, const char* tag, NET_CONN_NAK_REASON reason, sockaddr_storage& addr, uint32_t addrLen); + /** + * @brief Serializes and queues a patch status transfer payload. + * @param connection Destination connection. + * @param obj Patch status JSON payload. + * @returns bool True, if message was queued, otherwise false. + */ + bool writePatchStatusPayload(FNEPeerConnection* connection, json::object obj); + /* ** Internal KMM Callback. */ From 9c3601462f67a4ed82d75fd2213935b2b1a36480 Mon Sep 17 00:00:00 2001 From: "Lorenzo L. Romero" Date: Sun, 17 May 2026 10:47:20 -0400 Subject: [PATCH 2/3] FNE patch status registry replication. Add docs to new patch registry functions. Replace unordered map with mutex guarded map. --- build-windows.ps1 | 23 ++++ docs/WINDOWS_BUILD.md | 30 +++++ src/common/network/RTPFNEHeader.h | 1 + src/fne/HostFNE.cpp | 10 ++ src/fne/HostFNE.h | 7 ++ src/fne/PatchStatusRegistry.cpp | 58 +++++++++ src/fne/PatchStatusRegistry.h | 181 ++++++++++++++++++++++++---- src/fne/network/MetadataNetwork.cpp | 85 +++++++++++++ src/fne/network/MetadataNetwork.h | 1 + src/fne/network/PeerNetwork.cpp | 62 ++++++++++ src/fne/network/PeerNetwork.h | 33 ++++- src/fne/network/TrafficNetwork.cpp | 106 +++++++++++++++- src/fne/network/TrafficNetwork.h | 19 +++ 13 files changed, 592 insertions(+), 24 deletions(-) create mode 100644 build-windows.ps1 create mode 100644 docs/WINDOWS_BUILD.md diff --git a/build-windows.ps1 b/build-windows.ps1 new file mode 100644 index 000000000..e41ac4c1f --- /dev/null +++ b/build-windows.ps1 @@ -0,0 +1,23 @@ +param( + [string]$Generator = "Ninja", + [string]$BuildDir = "out\\build\\windows", + [string]$Config = "RelWithDebInfo" +) + +function Abort($msg) { + Write-Error $msg + exit 1 +} + +if (-not (Get-Command cmake -ErrorAction SilentlyContinue)) { Abort "cmake not found on PATH. Install CMake and retry." } +if ($Generator -eq "Ninja" -and -not (Get-Command ninja -ErrorAction SilentlyContinue)) { Abort "ninja not found on PATH. Install Ninja and retry." } + +Write-Host "Configuring (Generator=$Generator, BuildDir=$BuildDir, Config=$Config)" +cmake -S . -B $BuildDir -G $Generator -DCOMPILE_WIN32=ON -DCMAKE_BUILD_TYPE=$Config +if ($LASTEXITCODE -ne 0) { Abort "CMake configure failed." } + +Write-Host "Building" +cmake --build $BuildDir --config $Config -- -v +if ($LASTEXITCODE -ne 0) { Abort "Build failed." } + +Write-Host "Build finished. Artifacts are in: $BuildDir" \ No newline at end of file diff --git a/docs/WINDOWS_BUILD.md b/docs/WINDOWS_BUILD.md new file mode 100644 index 000000000..5fc004131 --- /dev/null +++ b/docs/WINDOWS_BUILD.md @@ -0,0 +1,30 @@ +Windows build prerequisites and steps for dvmhost + +Prerequisites +- Visual Studio 2019/2022 (Desktop development with C++ workload) or at least the MSVC build tools. +- CMake 3.16 or newer +- Ninja (recommended generator) +- Git +- (Optional) OpenSSL for Windows if you need SSL; not required by default when building with `-DCOMPILE_WIN32=ON`. + +Quick steps +1. Open a Developer command prompt (e.g. "x64 Native Tools Command Prompt for VS 2022") or run the MSVC environment so compilers are on PATH. +2. Ensure `cmake` and `ninja` are on PATH. +3. From the repository root: + +```powershell +# create an out/build directory and configure +mkdir -p out\build\windows +cmake -S . -B out\build\windows -G Ninja -DCOMPILE_WIN32=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo + +# build +cmake --build out\build\windows --config RelWithDebInfo +``` + +Notes +- The project provides `CMakeSettings.json` presets for Visual Studio Code/CMake Tools which enable `COMPILE_WIN32` and use Ninja. +- TUI support is disabled on Windows by design; some utilities that require ncurses will not be available when `COMPILE_WIN32=ON`. +- If you prefer Visual Studio IDE: open the CMake project in Visual Studio, select the provided configuration (see `CMakeSettings.json`) and build. +- If you need OpenSSL for Windows, install a binary distribution and set `-DOPENSSL_ROOT_DIR="C:/path/to/openssl"` when invoking `cmake`. + +If you want, run `.uild-windows.ps1` from a Developer PowerShell prompt — it will run the configure+build steps automatically. \ No newline at end of file diff --git a/src/common/network/RTPFNEHeader.h b/src/common/network/RTPFNEHeader.h index 9e52e8b83..67563259b 100644 --- a/src/common/network/RTPFNEHeader.h +++ b/src/common/network/RTPFNEHeader.h @@ -115,6 +115,7 @@ namespace network REPL_ACT_PEER_LIST = 0xA2U, //!< FNE Replication Active Peer List Transfer REPL_HA_PARAMS = 0xA3U, //!< FNE Replication HA Parameters + REPL_PATCH_STATUS = 0xA4U, //!< FNE Replication Patch Status Transfer NET_TREE_LIST = 0x00U, //!< FNE Network Tree List NET_TREE_DISC = 0x01U //!< FNE Network Tree Disconnect diff --git a/src/fne/HostFNE.cpp b/src/fne/HostFNE.cpp index cb85d22fa..52f969d5b 100644 --- a/src/fne/HostFNE.cpp +++ b/src/fne/HostFNE.cpp @@ -899,6 +899,7 @@ bool HostFNE::createPeerNetworks() network->setNetTreeDiscCallback(std::bind(&HostFNE::processNetworkTreeDisconnect, this, std::placeholders::_1, std::placeholders::_2)); network->setNotifyPeerReplicaCallback(std::bind(&HostFNE::processPeerReplicaNotify, this, std::placeholders::_1)); + network->setPatchStatusCallback(std::bind(&HostFNE::processPeerPatchStatus, this, std::placeholders::_1, std::placeholders::_2)); network->enable(enabled); if (enabled) { @@ -1194,3 +1195,12 @@ void HostFNE::processPeerReplicaNotify(network::PeerNetwork* peerNetwork) m_network->setPeerReplica(true); } } + +/* Processes peer patch status replication. */ + +void HostFNE::processPeerPatchStatus(network::PeerNetwork* peerNetwork, json::object obj) +{ + if (m_network != nullptr && peerNetwork != nullptr) { + m_network->processReplicatedPatchStatus(peerNetwork->getPeerId(), obj); + } +} diff --git a/src/fne/HostFNE.h b/src/fne/HostFNE.h index 83d38a4e1..ef9d706b9 100644 --- a/src/fne/HostFNE.h +++ b/src/fne/HostFNE.h @@ -17,6 +17,7 @@ #define __HOST_FNE_H__ #include "Defines.h" +#include "common/json/json.h" #include "common/lookups/RadioIdLookup.h" #include "common/lookups/TalkgroupRulesLookup.h" #include "common/lookups/PeerListLookup.h" @@ -265,6 +266,12 @@ class HOST_SW_API HostFNE { * @param peerNetwork Peer network instance. */ void processPeerReplicaNotify(network::PeerNetwork* peerNetwork); + /** + * @brief Processes peer patch status replication. + * @param peerNetwork Peer network instance. + * @param obj Patch status JSON payload. + */ + void processPeerPatchStatus(network::PeerNetwork* peerNetwork, json::object obj); }; #endif // __HOST_FNE_H__ diff --git a/src/fne/PatchStatusRegistry.cpp b/src/fne/PatchStatusRegistry.cpp index 132ef4840..eb5870dc0 100644 --- a/src/fne/PatchStatusRegistry.cpp +++ b/src/fne/PatchStatusRegistry.cpp @@ -22,6 +22,8 @@ constexpr uint32_t PatchStatusRegistry::MIN_TTL_SECONDS; constexpr uint32_t PatchStatusRegistry::MAX_TTL_SECONDS; constexpr uint32_t PatchStatusRegistry::MAX_WAIT_MS; +/* Initializes a new instance of the PatchStatusRegistry class. */ + PatchStatusRegistry::PatchStatusRegistry() : m_mutex(), m_revisionChanged(), @@ -34,6 +36,8 @@ PatchStatusRegistry::PatchStatusRegistry() : /* stub */ } +/* Configures the accepted TTL range for patch status records. */ + void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds) { if (minTtlSeconds == 0U) @@ -47,6 +51,8 @@ void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlS m_defaultTtlSeconds = std::max(m_minTtlSeconds, std::min(defaultTtlSeconds, m_maxTtlSeconds)); } +/* Publishes a complete patch snapshot for one console peer. */ + bool PatchStatusRegistry::publish(json::object& request, json::object& response, std::string& errorMessage) { if (!request["peerId"].is()) { @@ -69,6 +75,9 @@ bool PatchStatusRegistry::publish(json::object& request, json::object& response, if (request["peerName"].is()) incoming.peerName = request["peerName"].get(); + if (request["originFnePeerId"].is()) + incoming.originFnePeerId = request["originFnePeerId"].get(); + if (request["sequence"].is()) incoming.sequence = request["sequence"].get(); @@ -99,6 +108,14 @@ bool PatchStatusRegistry::publish(json::object& request, json::object& response, { std::lock_guard guard(m_mutex); + auto existing = m_peerPatches.find(incoming.peerId); + if (existing != m_peerPatches.end() && incoming.sequence > 0U && existing->second.sequence > incoming.sequence) { + response = snapshotLocked(); + response["acceptedPeerId"].set(incoming.peerId); + response["ttlSeconds"].set(ttlSeconds); + return true; + } + if (incoming.patches.empty()) m_peerPatches.erase(incoming.peerId); else @@ -114,6 +131,8 @@ bool PatchStatusRegistry::publish(json::object& request, json::object& response, return true; } +/* Removes all patch records associated with a console peer. */ + bool PatchStatusRegistry::removePeer(uint32_t peerId) { if (peerId == 0U) @@ -133,6 +152,8 @@ bool PatchStatusRegistry::removePeer(uint32_t peerId) return removed; } +/* Removes expired patch status records. */ + uint32_t PatchStatusRegistry::cleanupExpired() { uint32_t removed = 0U; @@ -162,6 +183,8 @@ uint32_t PatchStatusRegistry::cleanupExpired() return removed; } +/* Creates a complete JSON snapshot of the registry. */ + json::object PatchStatusRegistry::snapshot() { cleanupExpired(); @@ -170,6 +193,8 @@ json::object PatchStatusRegistry::snapshot() return snapshotLocked(); } +/* Waits for registry changes after the supplied revision. */ + json::object PatchStatusRegistry::waitForChanges(uint64_t sinceRevision, uint32_t waitMs) { waitMs = std::min(waitMs, MAX_WAIT_MS); @@ -185,36 +210,48 @@ json::object PatchStatusRegistry::waitForChanges(uint64_t sinceRevision, uint32_ return snapshotLocked(); } +/* Gets the current registry revision. */ + uint64_t PatchStatusRegistry::revision() const { std::lock_guard guard(m_mutex); return m_revision; } +/* Gets the configured default patch status TTL. */ + uint32_t PatchStatusRegistry::defaultTtlSeconds() const { std::lock_guard guard(m_mutex); return m_defaultTtlSeconds; } +/* Gets the configured minimum patch status TTL. */ + uint32_t PatchStatusRegistry::minTtlSeconds() const { std::lock_guard guard(m_mutex); return m_minTtlSeconds; } +/* Gets the configured maximum patch status TTL. */ + uint32_t PatchStatusRegistry::maxTtlSeconds() const { std::lock_guard guard(m_mutex); return m_maxTtlSeconds; } +/* Gets the current system time in milliseconds. */ + uint64_t PatchStatusRegistry::nowMs() { return std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); } +/* Normalizes a mode string for key generation. */ + std::string PatchStatusRegistry::normalizeMode(const std::string& mode) { std::string normalized = mode; @@ -224,6 +261,8 @@ std::string PatchStatusRegistry::normalizeMode(const std::string& mode) return normalized; } +/* Builds a stable talkgroup lookup key for a patch member. */ + std::string PatchStatusRegistry::buildTalkgroupKey(const PatchMember& member) { std::ostringstream ss; @@ -231,6 +270,8 @@ std::string PatchStatusRegistry::buildTalkgroupKey(const PatchMember& member) return ss.str(); } +/* Serializes a patch member to JSON. */ + json::object PatchStatusRegistry::memberToJson(const PatchMember& member) { json::object obj = json::object(); @@ -242,6 +283,8 @@ json::object PatchStatusRegistry::memberToJson(const PatchMember& member) return obj; } +/* Serializes a patch record to JSON. */ + json::object PatchStatusRegistry::patchToJson(const PatchRecord& patch) { json::object obj = json::object(); @@ -257,10 +300,13 @@ json::object PatchStatusRegistry::patchToJson(const PatchRecord& patch) return obj; } +/* Serializes a peer patch snapshot to JSON. */ + json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& peer) { json::object obj = json::object(); obj["peerId"].set(peer.peerId); + obj["originFnePeerId"].set(peer.originFnePeerId); obj["peerName"].set(peer.peerName); obj["sequence"].set(peer.sequence); obj["updatedAt"].set(peer.updatedAt); @@ -274,6 +320,8 @@ json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& pe return obj; } +/* Parses one patch record from JSON. */ + bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const { if (obj["patchId"].is()) @@ -308,6 +356,8 @@ bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std: return true; } +/* Parses one patch member from JSON. */ + bool PatchStatusRegistry::parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const { if (obj["system"].is()) @@ -343,12 +393,16 @@ bool PatchStatusRegistry::parseMember(json::object& obj, PatchMember& member, st return true; } +/* Clamps a TTL value into the configured TTL range. */ + uint32_t PatchStatusRegistry::clampTtl(uint32_t ttlSeconds) const { std::lock_guard guard(m_mutex); return std::max(m_minTtlSeconds, std::min(ttlSeconds, m_maxTtlSeconds)); } +/* Creates a registry snapshot while the registry mutex is held. */ + json::object PatchStatusRegistry::snapshotLocked() const { json::object response = json::object(); @@ -365,6 +419,7 @@ json::object PatchStatusRegistry::snapshotLocked() const for (const PatchRecord& patch : peer.patches) { json::object patchObj = patchToJson(patch); patchObj["peerId"].set(peer.peerId); + patchObj["originFnePeerId"].set(peer.originFnePeerId); patchObj["peerName"].set(peer.peerName); patchObj["updatedAt"].set(peer.updatedAt); patchObj["expiresAt"].set(peer.expiresAt); @@ -378,6 +433,7 @@ json::object PatchStatusRegistry::snapshotLocked() const json::object tgPatch = json::object(); tgPatch["peerId"].set(peer.peerId); + tgPatch["originFnePeerId"].set(peer.originFnePeerId); tgPatch["peerName"].set(peer.peerName); tgPatch["patchId"].set(patch.patchId); tgPatch["active"].set(patch.active); @@ -397,6 +453,8 @@ json::object PatchStatusRegistry::snapshotLocked() const return response; } +/* Advances the registry revision while the registry mutex is held. */ + void PatchStatusRegistry::bumpRevisionLocked() { m_revision++; diff --git a/src/fne/PatchStatusRegistry.h b/src/fne/PatchStatusRegistry.h index 0537e6cd2..3c8ee02ca 100644 --- a/src/fne/PatchStatusRegistry.h +++ b/src/fne/PatchStatusRegistry.h @@ -19,83 +19,220 @@ #include #include +#include #include #include -#include #include /** * @brief In-memory registry for console-advertised patch status. + * + * The registry owns all patch status records published by console peers and + * replicated from neighboring FNEs. All access to the backing storage is + * serialized through the registry mutex; callers receive JSON snapshots and + * never receive direct references or iterators into the registry storage. + * * @ingroup fne */ class HOST_SW_API PatchStatusRegistry { public: + /** + * @brief Default number of seconds before a patch status record expires. + */ static constexpr uint32_t DEFAULT_TTL_SECONDS = 15U; + /** + * @brief Minimum accepted patch status TTL in seconds. + */ static constexpr uint32_t MIN_TTL_SECONDS = 5U; + /** + * @brief Maximum accepted patch status TTL in seconds. + */ static constexpr uint32_t MAX_TTL_SECONDS = 300U; + /** + * @brief Maximum wait time for long-poll style change requests. + */ static constexpr uint32_t MAX_WAIT_MS = 30000U; + /** + * @brief Initializes a new instance of the PatchStatusRegistry class. + */ PatchStatusRegistry(); + /** + * @brief Finalizes an instance of the PatchStatusRegistry class. + */ ~PatchStatusRegistry() = default; + /** + * @brief Configures the accepted TTL range for patch status records. + * @param defaultTtlSeconds Default TTL used when a publish request omits a TTL. + * @param minTtlSeconds Minimum accepted TTL. + * @param maxTtlSeconds Maximum accepted TTL. + */ void configure(uint32_t defaultTtlSeconds, uint32_t minTtlSeconds, uint32_t maxTtlSeconds); + /** + * @brief Publishes a complete patch snapshot for one console peer. + * @param request JSON request containing peerId, peerName, optional sequence, optional ttlSeconds, and patches. + * @param response JSON registry snapshot returned after the publish is applied. + * @param errorMessage Validation error text populated when the request is invalid. + * @returns bool True, if the publish request was valid and applied, otherwise false. + */ bool publish(json::object& request, json::object& response, std::string& errorMessage); + /** + * @brief Removes all patch records associated with a console peer. + * @param peerId Console peer ID whose records should be removed. + * @returns bool True, if records were removed, otherwise false. + */ bool removePeer(uint32_t peerId); + /** + * @brief Removes expired patch status records. + * @returns uint32_t Number of peer records removed. + */ uint32_t cleanupExpired(); + /** + * @brief Creates a complete JSON snapshot of the registry. + * @returns json::object Registry snapshot. + */ json::object snapshot(); + /** + * @brief Waits for registry changes after the supplied revision. + * @param sinceRevision Revision already known to the caller. + * @param waitMs Maximum wait time in milliseconds. + * @returns json::object Registry snapshot after change or timeout. + */ json::object waitForChanges(uint64_t sinceRevision, uint32_t waitMs); + /** + * @brief Gets the current registry revision. + * @returns uint64_t Registry revision number. + */ uint64_t revision() const; + /** + * @brief Gets the configured default patch status TTL. + * @returns uint32_t Default TTL in seconds. + */ uint32_t defaultTtlSeconds() const; + /** + * @brief Gets the configured minimum patch status TTL. + * @returns uint32_t Minimum TTL in seconds. + */ uint32_t minTtlSeconds() const; + /** + * @brief Gets the configured maximum patch status TTL. + * @returns uint32_t Maximum TTL in seconds. + */ uint32_t maxTtlSeconds() const; private: + /** + * @brief Represents a talkgroup member participating in a patch. + */ struct PatchMember { - std::string system; - std::string mode; - uint32_t tgid = 0U; - uint8_t slot = 0U; + std::string system; //!< System name for the member. + std::string mode; //!< Digital mode for the member. + uint32_t tgid = 0U; //!< Talkgroup ID for the member. + uint8_t slot = 0U; //!< Timeslot for the member, if applicable. }; + /** + * @brief Represents one active console patch. + */ struct PatchRecord { - std::string patchId; - bool active = true; - bool oneWay = false; - std::vector members; + std::string patchId; //!< Console-defined patch ID. + bool active = true; //!< Flag indicating whether the patch is active. + bool oneWay = false; //!< Flag indicating whether the patch is one-way. + std::vector members; //!< Talkgroup members in the patch. }; + /** + * @brief Represents a console peer's complete patch status snapshot. + */ struct PeerPatchSnapshot { - uint32_t peerId = 0U; - std::string peerName; - uint32_t sequence = 0U; - uint64_t updatedAt = 0U; - uint64_t expiresAt = 0U; - std::vector patches; + uint32_t peerId = 0U; //!< Console peer ID. + uint32_t originFnePeerId = 0U; //!< FNE peer ID where this status originated. + std::string peerName; //!< Console peer display name. + uint32_t sequence = 0U; //!< Console-defined sequence number. + uint64_t updatedAt = 0U; //!< Time this snapshot was accepted. + uint64_t expiresAt = 0U; //!< Time this snapshot expires. + std::vector patches; //!< Complete patch list for this console peer. }; + /** + * @brief Gets the current system time in milliseconds. + * @returns uint64_t Current time in milliseconds. + */ static uint64_t nowMs(); + /** + * @brief Normalizes a mode string for key generation. + * @param mode Mode string. + * @returns std::string Normalized mode string. + */ static std::string normalizeMode(const std::string& mode); + /** + * @brief Builds a stable talkgroup lookup key for a patch member. + * @param member Patch member. + * @returns std::string Talkgroup key. + */ static std::string buildTalkgroupKey(const PatchMember& member); + /** + * @brief Serializes a patch member to JSON. + * @param member Patch member. + * @returns json::object JSON patch member. + */ static json::object memberToJson(const PatchMember& member); + /** + * @brief Serializes a patch record to JSON. + * @param patch Patch record. + * @returns json::object JSON patch record. + */ static json::object patchToJson(const PatchRecord& patch); + /** + * @brief Serializes a peer patch snapshot to JSON. + * @param peer Peer patch snapshot. + * @returns json::object JSON peer patch snapshot. + */ static json::object peerSnapshotToJson(const PeerPatchSnapshot& peer); + /** + * @brief Parses one patch record from JSON. + * @param obj JSON patch object. + * @param patch Parsed patch record. + * @param errorMessage Validation error text. + * @returns bool True, if parsed successfully, otherwise false. + */ bool parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const; + /** + * @brief Parses one patch member from JSON. + * @param obj JSON patch member object. + * @param member Parsed patch member. + * @param errorMessage Validation error text. + * @returns bool True, if parsed successfully, otherwise false. + */ bool parseMember(json::object& obj, PatchMember& member, std::string& errorMessage) const; + /** + * @brief Clamps a TTL value into the configured TTL range. + * @param ttlSeconds TTL in seconds. + * @returns uint32_t Clamped TTL in seconds. + */ uint32_t clampTtl(uint32_t ttlSeconds) const; + /** + * @brief Creates a registry snapshot while the registry mutex is held. + * @returns json::object Registry snapshot. + */ json::object snapshotLocked() const; + /** + * @brief Advances the registry revision while the registry mutex is held. + */ void bumpRevisionLocked(); - mutable std::mutex m_mutex; - std::condition_variable m_revisionChanged; - std::unordered_map m_peerPatches; - uint64_t m_revision; - uint32_t m_defaultTtlSeconds; - uint32_t m_minTtlSeconds; - uint32_t m_maxTtlSeconds; + mutable std::mutex m_mutex; //!< Mutex guarding registry state. + std::condition_variable m_revisionChanged; //!< Condition variable signaled when revision changes. + std::map m_peerPatches; //!< Peer patch snapshots keyed by console peer ID. + uint64_t m_revision; //!< Monotonic registry revision. + uint32_t m_defaultTtlSeconds; //!< Default TTL in seconds. + uint32_t m_minTtlSeconds; //!< Minimum TTL in seconds. + uint32_t m_maxTtlSeconds; //!< Maximum TTL in seconds. }; #endif // __FNE_PATCH_STATUS_REGISTRY_H__ diff --git a/src/fne/network/MetadataNetwork.cpp b/src/fne/network/MetadataNetwork.cpp index bc58a09fb..c57024128 100644 --- a/src/fne/network/MetadataNetwork.cpp +++ b/src/fne/network/MetadataNetwork.cpp @@ -419,6 +419,7 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req) // The authenticated peer identity is authoritative; do not allow spoofed peer IDs. reqObj["peerId"].set(pktPeerId); + reqObj["originFnePeerId"].set(network->m_peerId); if (!reqObj["peerName"].is() || reqObj["peerName"].get().empty()) reqObj["peerName"].set(connection->identity()); @@ -431,6 +432,7 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req) } network->writePatchStatusToConsoles(response); + network->replicatePatchStatus(reqObj); } else { network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); @@ -633,6 +635,89 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req) } } } + else if (req->fneHeader.getSubFunction() == NET_SUBFUNC::REPL_PATCH_STATUS) { // Peer Replication Patch Status + if (peerId > 0 && (network->m_peers.find(peerId) != network->m_peers.end())) { + FNEPeerConnection* connection = network->m_peers[peerId]; + if (connection != nullptr) { + std::string ip = udp::Socket::address(req->address); + + // validate peer (simple validation really) + if (connection->connected() && connection->address() == ip && connection->peerClass() == PEER_CONN_CLASS_NEIGHBOR && + connection->isReplica()) { + DECLARE_UINT8_ARRAY(rawPayload, req->length); + ::memcpy(rawPayload, req->buffer, req->length); + + if (mdNetwork->m_peerPatchStatusPkt.find(peerId) == mdNetwork->m_peerPatchStatusPkt.end()) { + mdNetwork->m_peerPatchStatusPkt.insert(peerId, MetadataNetwork::PacketBufferEntry()); + + MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId]; + pkt.buffer = new PacketBuffer(true, "Peer Replication, Patch Status"); + pkt.streamId = streamId; + + pkt.locked = false; + } else { + MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId]; + if (!pkt.locked && pkt.streamId != streamId) { + LogError(LOG_REPL, "PEER %u (%s) Peer Replication, Patch Status, stream ID mismatch, expected %u, got %u", peerId, + connection->identWithQualifier().c_str(), pkt.streamId, streamId); + pkt.buffer->clear(); + pkt.streamId = streamId; + } + + if (pkt.streamId != streamId) { + // otherwise drop the packet + break; + } + } + + MetadataNetwork::PacketBufferEntry& pkt = mdNetwork->m_peerPatchStatusPkt[peerId]; + if (pkt.locked) { + while (pkt.locked) + Thread::sleep(1U); + } + + pkt.locked = true; + + uint32_t decompressedLen = 0U; + uint8_t* decompressed = nullptr; + + if (pkt.buffer->decode(rawPayload, &decompressed, &decompressedLen)) { + mdNetwork->m_peerPatchStatusPkt.lock(); + std::string payload(decompressed + 8U, decompressed + decompressedLen); + + json::value v; + std::string err = json::parse(v, payload); + if (!err.empty() || !v.is()) { + LogError(LOG_REPL, "PEER %u (%s) error parsing patch status replication, %s", peerId, connection->identWithQualifier().c_str(), err.c_str()); + pkt.buffer->clear(); + pkt.streamId = 0U; + if (decompressed != nullptr) { + delete[] decompressed; + } + mdNetwork->m_peerPatchStatusPkt.unlock(); + mdNetwork->m_peerPatchStatusPkt.erase(peerId); + break; + } + + network->processReplicatedPatchStatus(peerId, v.get()); + + pkt.buffer->clear(); + delete pkt.buffer; + pkt.streamId = 0U; + if (decompressed != nullptr) { + delete[] decompressed; + } + mdNetwork->m_peerPatchStatusPkt.unlock(); + mdNetwork->m_peerPatchStatusPkt.erase(peerId); + } else { + pkt.locked = false; + } + } else { + network->writePeerNAK(peerId, 0U, TAG_PEER_REPLICA, NET_CONN_NAK_FNE_UNAUTHORIZED); + } + } + } + } break; case NET_FUNC::NET_TREE: diff --git a/src/fne/network/MetadataNetwork.h b/src/fne/network/MetadataNetwork.h index 62629a2ec..dcc6dda4a 100644 --- a/src/fne/network/MetadataNetwork.h +++ b/src/fne/network/MetadataNetwork.h @@ -117,6 +117,7 @@ namespace network bool locked; }; concurrent::unordered_map m_peerReplicaActPkt; + concurrent::unordered_map m_peerPatchStatusPkt; concurrent::unordered_map m_peerTreeListPkt; ThreadPool m_threadPool; diff --git a/src/fne/network/PeerNetwork.cpp b/src/fne/network/PeerNetwork.cpp index b2d517cb1..477ad9856 100644 --- a/src/fne/network/PeerNetwork.cpp +++ b/src/fne/network/PeerNetwork.cpp @@ -48,6 +48,7 @@ PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t loc m_analogCallback(nullptr), m_netTreeDiscCallback(nullptr), m_peerReplicaCallback(nullptr), + m_patchStatusCallback(nullptr), m_masterPeerId(0U), m_pidLookup(nullptr), m_peerReplica(false), @@ -55,6 +56,7 @@ PeerNetwork::PeerNetwork(const std::string& address, uint16_t port, uint16_t loc m_tgidPkt(true, "Peer Replication, TGID List"), m_ridPkt(true, "Peer Replication, RID List"), m_pidPkt(true, "Peer Replication, PID List"), + m_patchStatusPkt(true, "Peer Replication, Patch Status"), m_threadPool(WORKER_CNT, "peer"), m_prevSpanningTreeChildren(0U), m_nakFallOver(false), @@ -233,6 +235,40 @@ bool PeerNetwork::writeHAParams(std::vector& haParams) return false; } +/* Writes a complete console patch status update upstream. */ + +bool PeerNetwork::writePatchStatus(json::object obj) +{ + if (!m_peerReplica) + return false; + + obj["type"].set("publish"); + json::value v = json::value(obj); + std::string json = std::string(v.serialize()); + + size_t len = json.length() + 9U; + DECLARE_CHAR_ARRAY(buffer, len); + + ::memcpy(buffer + 0U, TAG_PEER_REPLICA, 4U); + ::snprintf(buffer + 8U, json.length() + 1U, "%s", json.c_str()); + + PacketBuffer pkt(true, "Peer Replication, Patch Status"); + pkt.encode((uint8_t*)buffer, len); + + uint32_t streamId = createStreamId(); + LogInfoEx(LOG_REPL, "PEER %u Peer Replication, Patch Status, blocks %u, streamId = %u", m_peerId, pkt.fragments.size(), streamId); + if (pkt.fragments.size() > 0U) { + for (auto frag : pkt.fragments) { + writeMaster({ NET_FUNC::REPL, NET_SUBFUNC::REPL_PATCH_STATUS }, + frag.second->data, FRAG_SIZE, RTP_END_OF_CALL_SEQ, streamId, true); + Thread::sleep(60U); // pace block transmission + } + } + + pkt.clear(); + return true; +} + // --------------------------------------------------------------------------- // Protected Class Members // --------------------------------------------------------------------------- @@ -462,6 +498,32 @@ void PeerNetwork::userPacketHandler(uint32_t peerId, FrameQueue::OpcodePair opco } break; + case NET_SUBFUNC::REPL_PATCH_STATUS: // Patch Status + { + uint32_t decompressedLen = 0U; + uint8_t* decompressed = nullptr; + + if (m_patchStatusPkt.decode(data, &decompressed, &decompressedLen)) { + std::string payload(decompressed + 8U, decompressed + decompressedLen); + + json::value v; + std::string err = json::parse(v, payload); + if (!err.empty() || !v.is()) { + LogError(LOG_PEER, "PEER %u error parsing patch status replication, %s", m_peerId, err.c_str()); + m_patchStatusPkt.clear(); + delete[] decompressed; + break; + } + + if (m_patchStatusCallback != nullptr) + m_patchStatusCallback(this, v.get()); + + m_patchStatusPkt.clear(); + delete[] decompressed; + } + } + break; + default: break; } diff --git a/src/fne/network/PeerNetwork.h b/src/fne/network/PeerNetwork.h index 48fbf40be..74f3c8a46 100644 --- a/src/fne/network/PeerNetwork.h +++ b/src/fne/network/PeerNetwork.h @@ -17,6 +17,7 @@ #define __PEER_NETWORK_H__ #include "Defines.h" +#include "common/json/json.h" #include "common/lookups/PeerListLookup.h" #include "common/network/Network.h" #include "common/network/PacketBuffer.h" @@ -138,9 +139,14 @@ namespace network void setNetTreeDiscCallback(std::function&& callback) { m_netTreeDiscCallback = callback; } /** * @brief Helper to set the peer replica notification callback. - * @param callback + * @param callback */ void setNotifyPeerReplicaCallback(std::function&& callback) { m_peerReplicaCallback = callback; } + /** + * @brief Helper to set the peer patch status callback. + * @param callback + */ + void setPatchStatusCallback(std::function&& callback) { m_patchStatusCallback = callback; } /** * @brief Writes a complete update of this CFNE's active peer list to the network. @@ -235,6 +241,26 @@ namespace network * @returns bool True, if list was sent, otherwise false. */ bool writeHAParams(std::vector& haParams); + /** + * @brief Writes a complete console patch status update upstream. + * \code{.unparsed} + * The patch status replication message is a JSON body, and is a packet + * buffer compressed message. + * + * { + * "type": "publish", + * "peerId": , + * "originFnePeerId": , + * "peerName": "", + * "sequence": , + * "ttlSeconds": , + * "patches": [] + * } + * \endcode + * @param obj JSON patch status publish payload. + * @returns bool True, if patch status was sent, otherwise false. + */ + bool writePatchStatus(json::object obj); /** * @brief Returns flag indicating whether or not this peer connection is peer replication enabled. @@ -298,6 +324,10 @@ namespace network * @brief Peer Replica Notification Callback. */ std::function m_peerReplicaCallback; + /** + * @brief Peer patch status callback. + */ + std::function m_patchStatusCallback; /** * @brief User overrideable handler that allows user code to process network packets not handled by this class. @@ -336,6 +366,7 @@ namespace network PacketBuffer m_tgidPkt; PacketBuffer m_ridPkt; PacketBuffer m_pidPkt; + PacketBuffer m_patchStatusPkt; ThreadPool m_threadPool; diff --git a/src/fne/network/TrafficNetwork.cpp b/src/fne/network/TrafficNetwork.cpp index ab06bb227..16b23cf34 100644 --- a/src/fne/network/TrafficNetwork.cpp +++ b/src/fne/network/TrafficNetwork.cpp @@ -532,6 +532,27 @@ void TrafficNetwork::processNetworkTreeDisconnect(uint32_t peerId, uint32_t offe } } +/* Processes a replicated console patch status update. */ + +void TrafficNetwork::processReplicatedPatchStatus(uint32_t peerId, json::object obj) +{ + if (!m_patchStatusEnabled) + return; + + if (!obj["peerId"].is() || !obj["patches"].is()) + return; + + json::object response = json::object(); + std::string errorMessage; + if (!m_patchStatusRegistry.publish(obj, response, errorMessage)) { + LogWarning(LOG_MASTER, "PEER %u invalid replicated patch status payload, %s", peerId, errorMessage.c_str()); + return; + } + + writePatchStatusToConsoles(response); + replicatePatchStatus(obj, peerId); +} + /* Helper to process an downstream peer In-Call Control message. */ void TrafficNetwork::processDownstreamInCallCtrl(network::NET_ICC::ENUM command, network::NET_SUBFUNC::ENUM subFunc, uint32_t dstId, @@ -2324,8 +2345,18 @@ void TrafficNetwork::erasePeer(uint32_t peerId) } // erase any console patch status records for this peer - if (m_patchStatusEnabled && m_patchStatusRegistry.removePeer(peerId)) + if (m_patchStatusEnabled && m_patchStatusRegistry.removePeer(peerId)) { + json::object clearPatchStatus = json::object(); + uint32_t originFnePeerId = m_peerId; + uint32_t ttlSeconds = m_patchStatusRegistry.defaultTtlSeconds(); + clearPatchStatus["type"].set("publish"); + clearPatchStatus["peerId"].set(peerId); + clearPatchStatus["originFnePeerId"].set(originFnePeerId); + clearPatchStatus["ttlSeconds"].set(ttlSeconds); + clearPatchStatus["patches"].set(json::array()); writePatchStatusToConsoles(m_patchStatusRegistry.snapshot()); + replicatePatchStatus(clearPatchStatus); + } // erase any HA parameters for this peer { @@ -2476,6 +2507,38 @@ void TrafficNetwork::writePatchStatusToConsoles(json::object obj, uint32_t excep m_peers.shared_unlock(); } +/* Helper to replicate patch status state to neighboring FNE peers. */ + +void TrafficNetwork::replicatePatchStatus(json::object obj, uint32_t exceptPeerId) +{ + if (!m_patchStatusEnabled) + return; + + obj["type"].set("publish"); + + if (m_host->m_peerNetworks.size() > 0U) { + for (auto peer : m_host->m_peerNetworks) { + if (peer.first == exceptPeerId) + continue; + if (peer.second != nullptr && peer.second->isEnabled() && peer.second->isReplica()) + peer.second->writePatchStatus(obj); + } + } + + m_peers.shared_lock(); + for (auto peer : m_peers) { + if (peer.first == exceptPeerId) + continue; + if (peer.second == nullptr) + continue; + if (!peer.second->connected() || peer.second->peerClass() != PEER_CONN_CLASS_NEIGHBOR || !peer.second->isReplica()) + continue; + + writePatchStatusReplicationPayload(peer.second, obj); + } + m_peers.shared_unlock(); +} + /* Helper to serialize and queue a patch status transfer payload. */ bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json::object obj) @@ -2513,6 +2576,47 @@ bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json { NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS }, RTP_END_OF_CALL_SEQ, addr, addrLen); } +/* Helper to serialize and queue a patch status replication payload. */ + +bool TrafficNetwork::writePatchStatusReplicationPayload(FNEPeerConnection* connection, json::object obj) +{ + if (connection == nullptr) + return false; + if (!m_patchStatusEnabled) + return false; + if (!connection->connected()) + return false; + if (connection->peerClass() != PEER_CONN_CLASS_NEIGHBOR || !connection->isReplica()) + return false; + + obj["type"].set("publish"); + json::value v = json::value(obj); + std::string json = std::string(v.serialize()); + + size_t len = json.length() + 9U; + DECLARE_CHAR_ARRAY(buffer, len); + + ::memcpy(buffer + 0U, TAG_PEER_REPLICA, 4U); + ::snprintf(buffer + 8U, json.length() + 1U, "%s", json.c_str()); + + PacketBuffer pkt(true, "Peer Replication, Patch Status"); + pkt.encode((uint8_t*)buffer, len); + + uint32_t streamId = createStreamId(); + LogInfoEx(LOG_REPL, "PEER %u (%s) Peer Replication, Patch Status, blocks %u, streamId = %u", connection->id(), + connection->identWithQualifier().c_str(), pkt.fragments.size(), streamId); + if (pkt.fragments.size() > 0U) { + for (auto frag : pkt.fragments) { + writePeer(connection->id(), m_peerId, { NET_FUNC::REPL, NET_SUBFUNC::REPL_PATCH_STATUS }, + frag.second->data, FRAG_SIZE, RTP_END_OF_CALL_SEQ, streamId); + Thread::sleep(60U); // pace block transmission + } + } + + pkt.clear(); + return true; +} + /* Helper to reset a peer connection. */ bool TrafficNetwork::resetPeer(uint32_t peerId) diff --git a/src/fne/network/TrafficNetwork.h b/src/fne/network/TrafficNetwork.h index f234df879..1a1a9cccb 100644 --- a/src/fne/network/TrafficNetwork.h +++ b/src/fne/network/TrafficNetwork.h @@ -237,6 +237,12 @@ namespace network * @param offendingPeerId Offending Peer ID. */ void processNetworkTreeDisconnect(uint32_t peerId, uint32_t offendingPeerId); + /** + * @brief Processes a replicated console patch status update. + * @param peerId Peer ID that delivered the replication update. + * @param obj Patch status JSON payload. + */ + void processReplicatedPatchStatus(uint32_t peerId, json::object obj); /** * @brief Helper to process an downstream peer In-Call Control message. @@ -298,6 +304,12 @@ namespace network * @param exceptPeerId Optional peer ID to skip. */ void writePatchStatusToConsoles(json::object obj, uint32_t exceptPeerId = 0U); + /** + * @brief Replicates patch status state to neighboring FNE peers. + * @param obj Patch status JSON payload. + * @param exceptPeerId Optional peer ID to skip. + */ + void replicatePatchStatus(json::object obj, uint32_t exceptPeerId = 0U); /** * @brief Helper to reset a peer connection. @@ -827,6 +839,13 @@ namespace network * @returns bool True, if message was queued, otherwise false. */ bool writePatchStatusPayload(FNEPeerConnection* connection, json::object obj); + /** + * @brief Serializes and queues a patch status replication payload. + * @param connection Destination neighbor connection. + * @param obj Patch status JSON payload. + * @returns bool True, if message was queued, otherwise false. + */ + bool writePatchStatusReplicationPayload(FNEPeerConnection* connection, json::object obj); /* ** Internal KMM Callback. From 9c62b8222257f518a4e608c61e4b3ebec917beb0 Mon Sep 17 00:00:00 2001 From: "Lorenzo L. Romero" Date: Fri, 26 Jun 2026 21:03:02 -0400 Subject: [PATCH 3/3] Route patch status replication over metadata network Move console patch status fanout and downstream FNE replication off the traffic network path and onto the metadata connection. Add registry change detection so duplicate or stale replicated snapshots do not advance revisions or get re-broadcast. --- src/fne/PatchStatusRegistry.cpp | 75 +++++++++++++++++++++++++++-- src/fne/PatchStatusRegistry.h | 5 +- src/fne/network/MetadataNetwork.cpp | 34 +++++++++++-- src/fne/network/MetadataNetwork.h | 3 ++ src/fne/network/TrafficNetwork.cpp | 19 +++++--- 5 files changed, 120 insertions(+), 16 deletions(-) diff --git a/src/fne/PatchStatusRegistry.cpp b/src/fne/PatchStatusRegistry.cpp index eb5870dc0..afa75c9c1 100644 --- a/src/fne/PatchStatusRegistry.cpp +++ b/src/fne/PatchStatusRegistry.cpp @@ -53,8 +53,11 @@ void PatchStatusRegistry::configure(uint32_t defaultTtlSeconds, uint32_t minTtlS /* Publishes a complete patch snapshot for one console peer. */ -bool PatchStatusRegistry::publish(json::object& request, json::object& response, std::string& errorMessage) +bool PatchStatusRegistry::publish(json::object& request, json::object& response, std::string& errorMessage, bool* changed) { + if (changed != nullptr) + *changed = false; + if (!request["peerId"].is()) { errorMessage = "peerId was not a valid integer"; return false; @@ -109,18 +112,35 @@ bool PatchStatusRegistry::publish(json::object& request, json::object& response, { std::lock_guard guard(m_mutex); auto existing = m_peerPatches.find(incoming.peerId); - if (existing != m_peerPatches.end() && incoming.sequence > 0U && existing->second.sequence > incoming.sequence) { + if (existing != m_peerPatches.end() && incoming.sequence > 0U && existing->second.sequence >= incoming.sequence && + existing->second.originFnePeerId == incoming.originFnePeerId) { response = snapshotLocked(); response["acceptedPeerId"].set(incoming.peerId); response["ttlSeconds"].set(ttlSeconds); return true; } - if (incoming.patches.empty()) - m_peerPatches.erase(incoming.peerId); - else + if (existing != m_peerPatches.end() && peerSnapshotsEqual(existing->second, incoming)) { + response = snapshotLocked(); + response["acceptedPeerId"].set(incoming.peerId); + response["ttlSeconds"].set(ttlSeconds); + return true; + } + + if (incoming.patches.empty()) { + if (m_peerPatches.erase(incoming.peerId) == 0U) { + response = snapshotLocked(); + response["acceptedPeerId"].set(incoming.peerId); + response["ttlSeconds"].set(ttlSeconds); + return true; + } + } + else { m_peerPatches[incoming.peerId] = incoming; + } bumpRevisionLocked(); + if (changed != nullptr) + *changed = true; response = snapshotLocked(); response["acceptedPeerId"].set(incoming.peerId); @@ -320,6 +340,51 @@ json::object PatchStatusRegistry::peerSnapshotToJson(const PeerPatchSnapshot& pe return obj; } +/* Compares two patch members for logical equality. */ + +bool PatchStatusRegistry::patchMembersEqual(const PatchMember& lhs, const PatchMember& rhs) +{ + return lhs.system == rhs.system && + lhs.mode == rhs.mode && + lhs.tgid == rhs.tgid && + lhs.slot == rhs.slot; +} + +/* Compares two patch records for logical equality. */ + +bool PatchStatusRegistry::patchRecordsEqual(const PatchRecord& lhs, const PatchRecord& rhs) +{ + if (lhs.patchId != rhs.patchId || lhs.active != rhs.active || lhs.oneWay != rhs.oneWay || lhs.members.size() != rhs.members.size()) + return false; + + for (size_t i = 0U; i < lhs.members.size(); i++) { + if (!patchMembersEqual(lhs.members[i], rhs.members[i])) + return false; + } + + return true; +} + +/* Compares two peer snapshots for logical equality. */ + +bool PatchStatusRegistry::peerSnapshotsEqual(const PeerPatchSnapshot& lhs, const PeerPatchSnapshot& rhs) +{ + if (lhs.peerId != rhs.peerId || + lhs.originFnePeerId != rhs.originFnePeerId || + lhs.peerName != rhs.peerName || + lhs.sequence != rhs.sequence || + lhs.patches.size() != rhs.patches.size()) { + return false; + } + + for (size_t i = 0U; i < lhs.patches.size(); i++) { + if (!patchRecordsEqual(lhs.patches[i], rhs.patches[i])) + return false; + } + + return true; +} + /* Parses one patch record from JSON. */ bool PatchStatusRegistry::parsePatch(json::object& obj, PatchRecord& patch, std::string& errorMessage) const diff --git a/src/fne/PatchStatusRegistry.h b/src/fne/PatchStatusRegistry.h index 3c8ee02ca..4d6472477 100644 --- a/src/fne/PatchStatusRegistry.h +++ b/src/fne/PatchStatusRegistry.h @@ -77,7 +77,7 @@ class HOST_SW_API PatchStatusRegistry { * @param errorMessage Validation error text populated when the request is invalid. * @returns bool True, if the publish request was valid and applied, otherwise false. */ - bool publish(json::object& request, json::object& response, std::string& errorMessage); + bool publish(json::object& request, json::object& response, std::string& errorMessage, bool* changed = nullptr); /** * @brief Removes all patch records associated with a console peer. * @param peerId Console peer ID whose records should be removed. @@ -193,6 +193,9 @@ class HOST_SW_API PatchStatusRegistry { * @returns json::object JSON peer patch snapshot. */ static json::object peerSnapshotToJson(const PeerPatchSnapshot& peer); + static bool patchMembersEqual(const PatchMember& lhs, const PatchMember& rhs); + static bool patchRecordsEqual(const PatchRecord& lhs, const PatchRecord& rhs); + static bool peerSnapshotsEqual(const PeerPatchSnapshot& lhs, const PeerPatchSnapshot& rhs); /** * @brief Parses one patch record from JSON. diff --git a/src/fne/network/MetadataNetwork.cpp b/src/fne/network/MetadataNetwork.cpp index b0356b98a..606d0decc 100644 --- a/src/fne/network/MetadataNetwork.cpp +++ b/src/fne/network/MetadataNetwork.cpp @@ -164,6 +164,31 @@ void MetadataNetwork::close() m_status = NET_STAT_INVALID; } +/* Helper to send a metadata message to a peer's metadata port. */ + +bool MetadataNetwork::writePeerMetadata(FNEPeerConnection* connection, uint32_t ssrc, FrameQueue::OpcodePair opcode, const uint8_t* data, + uint32_t length, uint16_t pktSeq, uint32_t streamId) const +{ + if (connection == nullptr) + return false; + if (m_status != NET_STAT_MST_RUNNING) + return false; + if (m_frameQueue == nullptr) + return false; + + sockaddr_storage addr; + uint32_t addrLen = 0U; + uint16_t port = connection->port() + 1U; + + if (udp::Socket::lookup(connection->address(), port, addr, addrLen) != 0) { + LogWarning(LOG_NET, "PEER %u (%s) failed to resolve metadata endpoint %s:%u", connection->id(), + connection->identWithQualifier().c_str(), connection->address().c_str(), port); + return false; + } + + return m_frameQueue->write(data, length, streamId, connection->id(), ssrc, opcode, pktSeq, addr, addrLen); +} + // --------------------------------------------------------------------------- // Private Class Members // --------------------------------------------------------------------------- @@ -433,14 +458,17 @@ void MetadataNetwork::taskNetworkRx(NetPacketRequest* req) json::object response = json::object(); std::string errorMessage; - if (!network->patchStatusRegistry().publish(reqObj, response, errorMessage)) { + bool changed = false; + if (!network->patchStatusRegistry().publish(reqObj, response, errorMessage, &changed)) { LogWarning(LOG_MASTER, "PEER %u (%s) invalid patch status payload, %s", pktPeerId, connection->identWithQualifier().c_str(), errorMessage.c_str()); network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_ILLEGAL_PACKET); break; } - network->writePatchStatusToConsoles(response); - network->replicatePatchStatus(reqObj); + if (changed) { + network->writePatchStatusToConsoles(response); + network->replicatePatchStatus(reqObj); + } } else { network->writePeerNAK(pktPeerId, network->createStreamId(), TAG_TRANSFER_PATCH_STATUS, NET_CONN_NAK_FNE_UNAUTHORIZED); diff --git a/src/fne/network/MetadataNetwork.h b/src/fne/network/MetadataNetwork.h index 4d5160750..b0864c75e 100644 --- a/src/fne/network/MetadataNetwork.h +++ b/src/fne/network/MetadataNetwork.h @@ -91,6 +91,9 @@ namespace network private: friend class TrafficNetwork; + bool writePeerMetadata(FNEPeerConnection* connection, uint32_t ssrc, FrameQueue::OpcodePair opcode, const uint8_t* data, + uint32_t length, uint16_t pktSeq, uint32_t streamId) const; + TrafficNetwork* m_trafficNetwork; HostFNE* m_host; diff --git a/src/fne/network/TrafficNetwork.cpp b/src/fne/network/TrafficNetwork.cpp index a358770f7..4468d1b83 100644 --- a/src/fne/network/TrafficNetwork.cpp +++ b/src/fne/network/TrafficNetwork.cpp @@ -553,11 +553,15 @@ void TrafficNetwork::processReplicatedPatchStatus(uint32_t peerId, json::object json::object response = json::object(); std::string errorMessage; - if (!m_patchStatusRegistry.publish(obj, response, errorMessage)) { + bool changed = false; + if (!m_patchStatusRegistry.publish(obj, response, errorMessage, &changed)) { LogWarning(LOG_MASTER, "PEER %u invalid replicated patch status payload, %s", peerId, errorMessage.c_str()); return; } + if (!changed) + return; + writePatchStatusToConsoles(response); replicatePatchStatus(obj, peerId); } @@ -2525,6 +2529,8 @@ bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json return false; if (!m_patchStatusEnabled) return false; + if (m_host->m_mdNetwork == nullptr) + return false; if (!connection->connected()) return false; if (connection->peerClass() != PEER_CONN_CLASS_CONSOLE) @@ -2543,15 +2549,12 @@ bool TrafficNetwork::writePatchStatusPayload(FNEPeerConnection* connection, json ::memset(buffer, 0x00U, DATA_PACKET_LENGTH); ::memcpy(buffer + 11U, payload.c_str(), len); - sockaddr_storage addr = connection->socketStorage(); - uint32_t addrLen = connection->sockStorageLen(); - if (m_debug) { LogDebug(LOG_MASTER, "PEER %u (%s) sending patch status registry, len = %u", connection->id(), connection->identWithQualifier().c_str(), len); } - return m_frameQueue->write(buffer, len + 11U, createStreamId(), connection->id(), m_peerId, - { NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS }, RTP_END_OF_CALL_SEQ, addr, addrLen); + return m_host->m_mdNetwork->writePeerMetadata(connection, m_peerId, + { NET_FUNC::TRANSFER, NET_SUBFUNC::TRANSFER_SUBFUNC_PATCH_STATUS }, buffer, len + 11U, RTP_END_OF_CALL_SEQ, createStreamId()); } /* Helper to serialize and queue a patch status replication payload. */ @@ -2562,6 +2565,8 @@ bool TrafficNetwork::writePatchStatusReplicationPayload(FNEPeerConnection* conne return false; if (!m_patchStatusEnabled) return false; + if (m_host->m_mdNetwork == nullptr) + return false; if (!connection->connected()) return false; if (connection->peerClass() != PEER_CONN_CLASS_NEIGHBOR || !connection->isReplica()) @@ -2585,7 +2590,7 @@ bool TrafficNetwork::writePatchStatusReplicationPayload(FNEPeerConnection* conne connection->identWithQualifier().c_str(), pkt.fragments.size(), streamId); if (pkt.fragments.size() > 0U) { for (auto frag : pkt.fragments) { - writePeer(connection->id(), m_peerId, { NET_FUNC::REPL, NET_SUBFUNC::REPL_PATCH_STATUS }, + m_host->m_mdNetwork->writePeerMetadata(connection, m_peerId, { NET_FUNC::REPL, NET_SUBFUNC::REPL_PATCH_STATUS }, frag.second->data, FRAG_SIZE, RTP_END_OF_CALL_SEQ, streamId); Thread::sleep(60U); // pace block transmission }