Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions include/livekit/room.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
#pragma once

#include <cstdint>
#include <future>
#include <memory>
#include <mutex>

#include "livekit/data_stream.h"
#include "livekit/e2ee.h"
#include "livekit/ffi_handle.h"
#include "livekit/result.h"
#include "livekit/room_event_types.h"
#include "livekit/stats.h"
#include "livekit/subscription_thread_dispatcher.h"
#include "livekit/visibility.h"

Expand Down Expand Up @@ -187,6 +190,20 @@ class LIVEKIT_API Room {
/// Returns the current connection state of the room.
ConnectionState connectionState() const;

/// Retrieve aggregated WebRTC stats for this room session.
///
/// Behavior:
/// - If the room is not currently connected, returns a failed result immediately.
/// - Otherwise dispatches an async request to the server to get the stats.
///
/// @note Check `result.ok()` before accessing the stats. The error variant
/// is a free-form diagnostic string; treat it as opaque (suitable for logs/
/// metrics, not for programmatic branching). The Rust FFI does not yet
/// surface a typed error code for this operation; see `cb.error()` plumbing
/// in `FfiClient::getSessionStatsAsync` for the source.
/// @return Future result of the room session stats.
std::future<Result<SessionStats, std::string>> getStats() const;

/* Register a handler for incoming text streams on a specific topic.
*
* When a remote participant opens a text stream with the given topic,
Expand Down
8 changes: 8 additions & 0 deletions include/livekit/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,14 @@ struct RtcStats {
RtcStatsVariant stats;
};

/// Aggregated WebRTC stats for a connected room session.
struct SessionStats {
/// Stats from the publisher peer connection (outbound media).
std::vector<RtcStats> publisher_stats;
/// Stats from the subscriber peer connection (inbound media).
std::vector<RtcStats> subscriber_stats;
};

// ----------------------
// fromProto declarations
// ----------------------
Expand Down
63 changes: 63 additions & 0 deletions src/ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,69 @@ std::future<std::vector<RtcStats>> FfiClient::getTrackStatsAsync(uintptr_t track
return fut;
}

namespace {

std::future<Result<SessionStats, std::string>> readySessionStatsFailure(std::string message) {
std::promise<Result<SessionStats, std::string>> pr;
pr.set_value(Result<SessionStats, std::string>::failure(std::move(message)));
return pr.get_future();
}

} // namespace

std::future<Result<SessionStats, std::string>> FfiClient::getSessionStatsAsync(uintptr_t room_handle) {
const AsyncId async_id = generateAsyncId();

auto fut = registerAsync<Result<SessionStats, std::string>>(
async_id,
// match
[async_id](const proto::FfiEvent& event) {
return event.has_get_session_stats() && event.get_session_stats().async_id() == async_id;
},
// handler
[](const proto::FfiEvent& event, std::promise<Result<SessionStats, std::string>>& pr) {
const auto& cb = event.get_session_stats();
if (cb.has_error()) {
pr.set_value(Result<SessionStats, std::string>::failure(cb.error()));
return;
}
if (!cb.has_result()) {
pr.set_value(Result<SessionStats, std::string>::failure("GetSessionStatsCallback missing result and error"));
return;
}

const auto& result = cb.result();
SessionStats stats;
stats.publisher_stats.reserve(result.publisher_stats_size());
for (const auto& ps : result.publisher_stats()) {
stats.publisher_stats.push_back(fromProto(ps));
}
stats.subscriber_stats.reserve(result.subscriber_stats_size());
for (const auto& ps : result.subscriber_stats()) {
stats.subscriber_stats.push_back(fromProto(ps));
}
pr.set_value(Result<SessionStats, std::string>::success(std::move(stats)));
});

proto::FfiRequest req;
auto* get_session_stats_req = req.mutable_get_session_stats();
get_session_stats_req->set_room_handle(room_handle);
get_session_stats_req->set_request_async_id(async_id);

try {
const proto::FfiResponse resp = sendRequest(req);
if (!resp.has_get_session_stats()) {
cancelPendingByAsyncId(async_id);
return readySessionStatsFailure("FfiResponse missing get_session_stats");
}
} catch (const std::exception& e) {
cancelPendingByAsyncId(async_id);
return readySessionStatsFailure(e.what());
}

return fut;
}

// Participant APIs Implementation
std::future<proto::OwnedTrackPublication> FfiClient::publishTrackAsync(std::uint64_t local_participant_handle,
std::uint64_t track_handle,
Expand Down
4 changes: 4 additions & 0 deletions src/ffi_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ class LIVEKIT_INTERNAL_API FfiClient {
// Track APIs
std::future<std::vector<RtcStats>> getTrackStatsAsync(uintptr_t track_handle);

// Room APIs (stats). On failure, the error variant carries an
// implementation-defined diagnostic string (suitable for logs/metrics).
std::future<Result<SessionStats, std::string>> getSessionStatsAsync(uintptr_t room_handle);

// Participant APIs
std::future<proto::OwnedTrackPublication> publishTrackAsync(std::uint64_t local_participant_handle,
std::uint64_t track_handle,
Expand Down
14 changes: 14 additions & 0 deletions src/room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,20 @@ ConnectionState Room::connectionState() const {
return connection_state_;
}

std::future<Result<SessionStats, std::string>> Room::getStats() const {
std::shared_ptr<FfiHandle> handle;
{
const std::scoped_lock<std::mutex> g(lock_);
handle = room_handle_;
}
if (!handle) {
std::promise<Result<SessionStats, std::string>> pr;
pr.set_value(Result<SessionStats, std::string>::failure("Room is not connected"));
return pr.get_future();
}
return FfiClient::instance().getSessionStatsAsync(handle->get());
}

E2EEManager* Room::e2eeManager() const {
const std::scoped_lock<std::mutex> g(lock_);
return e2ee_manager_.get();
Expand Down
207 changes: 207 additions & 0 deletions src/tests/integration/test_session_stats.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Copyright 2026 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "../common/audio_utils.h"
#include "../common/test_common.h"

namespace livekit::test {

using namespace std::chrono_literals;

namespace {

constexpr int kAudioSampleRate = kDefaultAudioSampleRate;
constexpr int kAudioChannels = kDefaultAudioChannels;

/// Time to let media flow before sampling stats; below this the RTP counters
/// are typically empty and the printed output is uninteresting.
constexpr auto kStatsWarmup = 5s;

const char* rtcStatsTypeName(const RtcStats& s) {
return std::visit(
[](const auto& v) -> const char* {
using T = std::decay_t<decltype(v)>;
if constexpr (std::is_same_v<T, RtcCodecStats>) {
return "Codec";
} else if constexpr (std::is_same_v<T, RtcInboundRtpStats>) {
return "InboundRtp";
} else if constexpr (std::is_same_v<T, RtcOutboundRtpStats>) {
return "OutboundRtp";
} else if constexpr (std::is_same_v<T, RtcRemoteInboundRtpStats>) {
return "RemoteInboundRtp";
} else if constexpr (std::is_same_v<T, RtcRemoteOutboundRtpStats>) {
return "RemoteOutboundRtp";
} else if constexpr (std::is_same_v<T, RtcMediaSourceStats>) {
return "MediaSource";
} else if constexpr (std::is_same_v<T, RtcMediaPlayoutStats>) {
return "MediaPlayout";
} else if constexpr (std::is_same_v<T, RtcPeerConnectionStats>) {
return "PeerConnection";
} else if constexpr (std::is_same_v<T, RtcDataChannelStats>) {
return "DataChannel";
} else if constexpr (std::is_same_v<T, RtcTransportStats>) {
return "Transport";
} else if constexpr (std::is_same_v<T, RtcCandidatePairStats>) {
return "CandidatePair";
} else if constexpr (std::is_same_v<T, RtcLocalCandidateStats>) {
return "LocalCandidate";
} else if constexpr (std::is_same_v<T, RtcRemoteCandidateStats>) {
return "RemoteCandidate";
} else if constexpr (std::is_same_v<T, RtcCertificateStats>) {
return "Certificate";
} else if constexpr (std::is_same_v<T, RtcStreamStats>) {
return "Stream";
} else {
return "Unknown";
}
},
s.stats);
}

void dumpInterestingEntries(const std::vector<RtcStats>& stats) {
for (const auto& stat : stats) {
std::visit(
[&](const auto& s) {
using T = std::decay_t<decltype(s)>;
if constexpr (std::is_same_v<T, RtcOutboundRtpStats>) {
std::cout << " [OutboundRtp] id=" << s.rtc.id << " kind=" << s.stream.kind
<< " packets_sent=" << s.sent.packets_sent << " bytes_sent=" << s.sent.bytes_sent
<< " target_bitrate=" << std::fixed << std::setprecision(2) << s.outbound.target_bitrate
<< std::endl;
} else if constexpr (std::is_same_v<T, RtcInboundRtpStats>) {
std::cout << " [InboundRtp] id=" << s.rtc.id << " kind=" << s.stream.kind
<< " packets_received=" << s.received.packets_received
<< " packets_lost=" << s.received.packets_lost << " jitter=" << std::fixed << std::setprecision(6)
<< s.received.jitter << " bytes_received=" << s.inbound.bytes_received << std::endl;
} else if constexpr (std::is_same_v<T, RtcCandidatePairStats>) {
std::cout << " [CandidatePair] id=" << s.rtc.id << " rtt=" << std::fixed << std::setprecision(4)
<< s.candidate_pair.current_round_trip_time << "s"
<< " in_bitrate=" << s.candidate_pair.available_incoming_bitrate
<< " out_bitrate=" << s.candidate_pair.available_outgoing_bitrate
<< " bytes_sent=" << s.candidate_pair.bytes_sent
<< " bytes_received=" << s.candidate_pair.bytes_received << std::endl;
} else if constexpr (std::is_same_v<T, RtcTransportStats>) {
std::cout << " [Transport] id=" << s.rtc.id << " packets_sent=" << s.transport.packets_sent
<< " packets_received=" << s.transport.packets_received
<< " bytes_sent=" << s.transport.bytes_sent << " bytes_received=" << s.transport.bytes_received
<< std::endl;
} else if constexpr (std::is_same_v<T, RtcPeerConnectionStats>) {
std::cout << " [PeerConnection] id=" << s.rtc.id
<< " data_channels_opened=" << s.pc.data_channels_opened
<< " data_channels_closed=" << s.pc.data_channels_closed << std::endl;
}
},
stat.stats);
}
}

void printSide(const std::string& side_label, const std::vector<RtcStats>& stats) {
std::cout << " " << side_label << " entries=" << stats.size();
std::map<std::string, int> type_counts;
for (const auto& s : stats) {
type_counts[rtcStatsTypeName(s)]++;
}
if (!type_counts.empty()) {
std::cout << " types:";
for (const auto& kv : type_counts) {
std::cout << " " << kv.first << "=" << kv.second;
}
}
std::cout << std::endl;
dumpInterestingEntries(stats);
}

void printSessionStats(const std::string& room_label, const SessionStats& stats) {
std::cout << "[SessionStats] " << room_label << ":" << std::endl;
printSide("publisher", stats.publisher_stats);
printSide("subscriber", stats.subscriber_stats);
}

} // namespace

class SessionStatsIntegrationTest : public LiveKitTestBase {};

TEST_F(SessionStatsIntegrationTest, PublishAudioThenFetchSessionStats) {
skipIfNotConfigured();

RoomOptions options;
options.auto_subscribe = true;
options.single_peer_connection = false;

auto receiver_room = std::make_unique<Room>();
ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, options)) << "Receiver failed to connect";

auto sender_room = std::make_unique<Room>();
ASSERT_TRUE(sender_room->connect(config_.url, config_.token_a, options)) << "Sender failed to connect";

auto source = std::make_shared<AudioSource>(kAudioSampleRate, kAudioChannels, 0);
auto track = LocalAudioTrack::createLocalAudioTrack("session-stats-audio", source);
TrackPublishOptions opts;
opts.source = TrackSource::SOURCE_MICROPHONE;
sender_room->localParticipant()->publishTrack(track, opts);
std::cerr << "[SessionStats] published audio track sid=" << track->sid() << std::endl;

std::atomic<bool> running{true};
std::thread audio_thread([&]() { runToneLoop(source, running, /*base_freq_hz=*/440.0, /*siren_mode=*/false); });

std::this_thread::sleep_for(kStatsWarmup);

auto sender_fut = sender_room->getStats();
auto receiver_fut = receiver_room->getStats();

auto sender_result = sender_fut.get();
auto receiver_result = receiver_fut.get();

running.store(false, std::memory_order_relaxed);
if (audio_thread.joinable()) {
audio_thread.join();
}
if (track->publication()) {
sender_room->localParticipant()->unpublishTrack(track->publication()->sid());
}

ASSERT_TRUE(sender_result.ok()) << "Sender getStats failed: " << sender_result.error();
ASSERT_TRUE(receiver_result.ok()) << "Receiver getStats failed: " << receiver_result.error();

printSessionStats("sender", sender_result.value());
printSessionStats("receiver", receiver_result.value());

EXPECT_FALSE(sender_result.value().publisher_stats.empty()) << "Sender should have publisher stats";
EXPECT_FALSE(receiver_result.value().subscriber_stats.empty()) << "Receiver should have subscriber stats";
}

TEST_F(SessionStatsIntegrationTest, NotConnectedReturnsError) {
Room room;
auto fut = room.getStats();
auto result = fut.get();
EXPECT_FALSE(result.ok());
EXPECT_FALSE(result.error().empty());
std::cerr << "[SessionStats] disconnected message: " << result.error() << std::endl;
}

} // namespace livekit::test
9 changes: 9 additions & 0 deletions src/tests/unit/test_ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,15 @@ TEST_F(FfiClientTest, NotInitialized_GetTrackStatsAsyncThrows) {
EXPECT_THROW(FfiClient::instance().getTrackStatsAsync(1), std::runtime_error);
}

TEST_F(FfiClientTest, NotInitialized_GetSessionStatsAsyncFails) {
ASSERT_FALSE(FfiClient::instance().isInitialized());

auto fut_result = FfiClient::instance().getSessionStatsAsync(1);
auto result = fut_result.get();
EXPECT_FALSE(result.ok());
EXPECT_FALSE(result.error().empty());
}

TEST_F(FfiClientTest, NotInitialized_PublishDataTrackAsyncFails) {
ASSERT_FALSE(FfiClient::instance().isInitialized());

Expand Down
Loading