diff --git a/src/tests/integration/test_media_multistream.cpp b/src/tests/integration/test_media_multistream.cpp new file mode 100644 index 00000000..dbcfc725 --- /dev/null +++ b/src/tests/integration/test_media_multistream.cpp @@ -0,0 +1,373 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../common/test_common.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace livekit { +namespace test { + +using namespace std::chrono_literals; + +namespace { + +constexpr int kVideoWidth = 640; +constexpr int kVideoHeight = 360; +constexpr int kAudioSampleRate = 48000; +constexpr int kAudioChannels = 1; +constexpr int kAudioFrameMs = 10; +constexpr int kSamplesPerChannel = kAudioSampleRate * kAudioFrameMs / 1000; + +struct MediaSubscriptionState { + std::mutex mutex; + std::condition_variable cv; + std::set subscribed_track_names; + int audio_tracks = 0; + int video_tracks = 0; +}; + +class MediaTrackCollectorDelegate : public RoomDelegate { +public: + explicit MediaTrackCollectorDelegate(MediaSubscriptionState &state) + : state_(state) {} + + void onTrackSubscribed(Room &, const TrackSubscribedEvent &event) override { + std::lock_guard lock(state_.mutex); + std::string name = ""; + std::string sid = ""; + if (event.track) { + if (event.track->kind() == TrackKind::KIND_AUDIO) { + state_.audio_tracks++; + } else if (event.track->kind() == TrackKind::KIND_VIDEO) { + state_.video_tracks++; + } + sid = event.track->sid(); + } + if (event.publication) { + name = event.publication->name(); + state_.subscribed_track_names.insert(name); + } + std::cerr << "[MediaMultiStream] onTrackSubscribed name=" << name + << " sid=" << sid << " audio_count=" << state_.audio_tracks + << " video_count=" << state_.video_tracks << std::endl; + state_.cv.notify_all(); + } + +private: + MediaSubscriptionState &state_; +}; + +void fillWebcamLikeFrame(VideoFrame &frame, std::uint64_t frame_index) { + // ARGB layout: [A, R, G, B] + std::uint8_t *data = frame.data(); + const std::size_t size = frame.dataSize(); + const std::uint8_t blue = static_cast((frame_index * 3) % 255); + for (std::size_t i = 0; i < size; i += 4) { + data[i + 0] = 255; // A + data[i + 1] = 0; // R + data[i + 2] = 170; // G + data[i + 3] = blue; + } +} + +void fillRedFrameWithMetadata(VideoFrame &frame, std::uint64_t frame_index, + std::uint64_t timestamp_us) { + // ARGB layout: [A, R, G, B] + std::uint8_t *data = frame.data(); + const std::size_t size = frame.dataSize(); + for (std::size_t i = 0; i < size; i += 4) { + data[i + 0] = 255; // A + data[i + 1] = 255; // R + data[i + 2] = 0; // G + data[i + 3] = 0; // B + } + + // Encode frame counter + timestamp into first 16 pixels for easy debugging. + std::uint8_t meta[16]; + for (int i = 0; i < 8; ++i) { + meta[i] = static_cast((frame_index >> (i * 8)) & 0xFF); + meta[8 + i] = static_cast((timestamp_us >> (i * 8)) & 0xFF); + } + for (int i = 0; i < 16; ++i) { + const std::size_t px = static_cast(i) * 4; + if (px + 3 < size) { + data[px + 0] = 255; + data[px + 1] = 255; + data[px + 2] = meta[i]; + data[px + 3] = meta[(15 - i)]; + } + } +} + +void runVideoLoop(const std::shared_ptr &source, + std::atomic &running, + void (*fill_fn)(VideoFrame &, std::uint64_t, std::uint64_t)) { + VideoFrame frame = + VideoFrame::create(kVideoWidth, kVideoHeight, VideoBufferType::ARGB); + std::uint64_t frame_index = 0; + while (running.load(std::memory_order_relaxed)) { + const auto now = std::chrono::steady_clock::now().time_since_epoch(); + const auto ts_us = static_cast( + std::chrono::duration_cast(now).count()); + fill_fn(frame, frame_index, ts_us); + try { + source->captureFrame(frame, static_cast(ts_us), + VideoRotation::VIDEO_ROTATION_0); + } catch (...) { + break; + } + frame_index++; + std::this_thread::sleep_for(33ms); + } +} + +void fillWebcamWrapper(VideoFrame &frame, std::uint64_t frame_index, + std::uint64_t) { + fillWebcamLikeFrame(frame, frame_index); +} + +void fillRedWrapper(VideoFrame &frame, std::uint64_t frame_index, + std::uint64_t timestamp_us) { + fillRedFrameWithMetadata(frame, frame_index, timestamp_us); +} + +void runToneLoop(const std::shared_ptr &source, + std::atomic &running, double base_freq_hz, + bool siren_mode) { + double phase = 0.0; + constexpr double kTwoPi = 6.283185307179586; + while (running.load(std::memory_order_relaxed)) { + AudioFrame frame = AudioFrame::create(kAudioSampleRate, kAudioChannels, + kSamplesPerChannel); + auto &samples = frame.data(); + + const double time_sec = + static_cast( + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count()) / + 1000.0; + const double freq = + siren_mode ? (700.0 + 250.0 * std::sin(time_sec * 2.0)) : base_freq_hz; + + const double phase_inc = + kTwoPi * freq / static_cast(kAudioSampleRate); + for (int i = 0; i < kSamplesPerChannel; ++i) { + samples[static_cast(i)] = + static_cast(std::sin(phase) * 12000.0); + phase += phase_inc; + if (phase > kTwoPi) { + phase -= kTwoPi; + } + } + + try { + source->captureFrame(frame); + } catch (...) { + break; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(kAudioFrameMs)); + } +} + +} // namespace + +class MediaMultiStreamIntegrationTest : public LiveKitTestBase { +protected: + void runPublishTwoVideoAndTwoAudioTracks(bool single_peer_connection); +}; + +void MediaMultiStreamIntegrationTest::runPublishTwoVideoAndTwoAudioTracks( + bool single_peer_connection) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and " + "LIVEKIT_RECEIVER_TOKEN not set"; + } + + RoomOptions options; + options.auto_subscribe = true; + options.single_peer_connection = single_peer_connection; + + MediaSubscriptionState receiver_state; + MediaTrackCollectorDelegate receiver_delegate(receiver_state); + + auto receiver_room = std::make_unique(); + receiver_room->setDelegate(&receiver_delegate); + ASSERT_TRUE( + receiver_room->Connect(config_.url, config_.receiver_token, options)) + << "Receiver failed to connect"; + + auto sender_room = std::make_unique(); + ASSERT_TRUE(sender_room->Connect(config_.url, config_.caller_token, options)) + << "Sender failed to connect"; + + const std::string receiver_identity = + receiver_room->localParticipant()->identity(); + const std::string sender_identity = + sender_room->localParticipant()->identity(); + + constexpr int kVideoTrackCount = 10; + constexpr int kAudioTrackCount = 10; + + std::vector> video_sources; + std::vector> video_tracks; + std::vector> video_pubs; + std::vector> audio_sources; + std::vector> audio_tracks; + std::vector> audio_pubs; + std::vector threads; + std::set expected_names; + + video_sources.reserve(kVideoTrackCount); + video_tracks.reserve(kVideoTrackCount); + video_pubs.reserve(kVideoTrackCount); + audio_sources.reserve(kAudioTrackCount); + audio_tracks.reserve(kAudioTrackCount); + audio_pubs.reserve(kAudioTrackCount); + threads.reserve(kVideoTrackCount + kAudioTrackCount); + + for (int i = 0; i < kVideoTrackCount; ++i) { + auto source = std::make_shared(kVideoWidth, kVideoHeight); + const std::string name = "video-track-" + std::to_string(i); + auto track = LocalVideoTrack::createLocalVideoTrack(name, source); + TrackPublishOptions opts; + opts.source = (i % 2 == 0) ? TrackSource::SOURCE_CAMERA + : TrackSource::SOURCE_SCREENSHARE; + auto pub = sender_room->localParticipant()->publishTrack(track, opts); + std::cerr << "[MediaMultiStream] published video " << name + << " sid=" << pub->sid() << std::endl; + video_sources.push_back(source); + video_tracks.push_back(track); + video_pubs.push_back(pub); + expected_names.insert(name); + } + + for (int i = 0; i < kAudioTrackCount; ++i) { + auto source = + std::make_shared(kAudioSampleRate, kAudioChannels, 0); + const std::string name = "audio-track-" + std::to_string(i); + auto track = LocalAudioTrack::createLocalAudioTrack(name, source); + TrackPublishOptions opts; + opts.source = (i % 2 == 0) ? TrackSource::SOURCE_MICROPHONE + : TrackSource::SOURCE_SCREENSHARE_AUDIO; + auto pub = sender_room->localParticipant()->publishTrack(track, opts); + std::cerr << "[MediaMultiStream] published audio " << name + << " sid=" << pub->sid() << std::endl; + audio_sources.push_back(source); + audio_tracks.push_back(track); + audio_pubs.push_back(pub); + expected_names.insert(name); + } + + std::atomic running{true}; + for (int i = 0; i < kVideoTrackCount; ++i) { + auto source = video_sources[static_cast(i)]; + const bool red_mode = (i % 2 == 1); + threads.emplace_back([source, &running, red_mode]() { + runVideoLoop(source, running, + red_mode ? fillRedWrapper : fillWebcamWrapper); + }); + } + for (int i = 0; i < kAudioTrackCount; ++i) { + auto source = audio_sources[static_cast(i)]; + const bool siren_mode = (i % 2 == 1); + const double base_freq = 320.0 + static_cast(i) * 40.0; + threads.emplace_back([source, &running, base_freq, siren_mode]() { + runToneLoop(source, running, base_freq, siren_mode); + }); + } + + { + std::unique_lock lock(receiver_state.mutex); + const bool all_received = receiver_state.cv.wait_for(lock, 20s, [&]() { + return receiver_state.subscribed_track_names.size() >= + expected_names.size(); + }); + EXPECT_TRUE(all_received) << "Timed out waiting for all subscribed tracks"; + if (!all_received) { + std::cerr << "[MediaMultiStream] timeout waiting subscriptions; received " + "names:"; + for (const auto &n : receiver_state.subscribed_track_names) { + std::cerr << " " << n; + } + std::cerr << " (audio=" << receiver_state.audio_tracks + << " video=" << receiver_state.video_tracks << ")" << std::endl; + } + } + + { + std::lock_guard lock(receiver_state.mutex); + for (const auto &expected_name : expected_names) { + EXPECT_TRUE(receiver_state.subscribed_track_names.count(expected_name) > + 0) + << "Missing subscribed track: " << expected_name; + } + EXPECT_GE(receiver_state.video_tracks, kVideoTrackCount); + EXPECT_GE(receiver_state.audio_tracks, kAudioTrackCount); + } + + auto *sender_on_receiver = receiver_room->remoteParticipant(sender_identity); + ASSERT_NE(sender_on_receiver, nullptr); + std::cerr << "[MediaMultiStream] receiver sees sender publications=" + << sender_on_receiver->trackPublications().size() << std::endl; + for (const auto &kv : sender_on_receiver->trackPublications()) { + const auto &pub = kv.second; + std::cerr << "[MediaMultiStream] remote publication sid=" << kv.first + << " name=" << (pub ? pub->name() : "") << " kind=" + << (pub && pub->kind() == TrackKind::KIND_AUDIO ? "audio" + : "video") + << " source=" << (pub ? static_cast(pub->source()) : -1) + << std::endl; + } + EXPECT_GE(sender_on_receiver->trackPublications().size(), + static_cast(kVideoTrackCount + kAudioTrackCount)); + + running.store(false, std::memory_order_relaxed); + for (auto &t : threads) { + if (t.joinable()) { + t.join(); + } + } + + for (const auto &pub : video_pubs) { + sender_room->localParticipant()->unpublishTrack(pub->sid()); + } + for (const auto &pub : audio_pubs) { + sender_room->localParticipant()->unpublishTrack(pub->sid()); + } +} + +TEST_F(MediaMultiStreamIntegrationTest, + PublishTwoVideoAndTwoAudioTracks_DualPeerConnection) { + runPublishTwoVideoAndTwoAudioTracks(false); +} + +TEST_F(MediaMultiStreamIntegrationTest, + PublishTwoVideoAndTwoAudioTracks_SinglePeerConnection) { + runPublishTwoVideoAndTwoAudioTracks(true); +} + +} // namespace test +} // namespace livekit diff --git a/src/tests/stress/test_latency_measurement.cpp b/src/tests/stress/test_latency_measurement.cpp index 2c95859f..9663abcf 100644 --- a/src/tests/stress/test_latency_measurement.cpp +++ b/src/tests/stress/test_latency_measurement.cpp @@ -17,6 +17,8 @@ #include "../common/test_common.h" #include #include +#include +#include namespace livekit { namespace test { @@ -72,6 +74,196 @@ static std::vector generateSilentFrame(int samples_per_channel) { return std::vector(samples_per_channel * kAudioChannels, 0); } +static const char *rtcStatsTypeName(const RtcStats &stats) { + return std::visit( + [](const auto &s) -> const char * { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return "Codec"; + } else if constexpr (std::is_same_v) { + return "InboundRtp"; + } else if constexpr (std::is_same_v) { + return "OutboundRtp"; + } else if constexpr (std::is_same_v) { + return "RemoteInboundRtp"; + } else if constexpr (std::is_same_v) { + return "RemoteOutboundRtp"; + } else if constexpr (std::is_same_v) { + return "MediaSource"; + } else if constexpr (std::is_same_v) { + return "MediaPlayout"; + } else if constexpr (std::is_same_v) { + return "PeerConnection"; + } else if constexpr (std::is_same_v) { + return "DataChannel"; + } else if constexpr (std::is_same_v) { + return "Transport"; + } else if constexpr (std::is_same_v) { + return "CandidatePair"; + } else if constexpr (std::is_same_v) { + return "LocalCandidate"; + } else if constexpr (std::is_same_v) { + return "RemoteCandidate"; + } else if constexpr (std::is_same_v) { + return "Certificate"; + } else if constexpr (std::is_same_v) { + return "Stream"; + } else { + return "Unknown"; + } + }, + stats.stats); +} + +static void printSessionStats(const std::vector &stats, + const std::string &label) { + std::cout << " " << label << " stats: " << stats.size() << std::endl; + for (const auto &stat : stats) { + std::visit( + [&](const auto &s) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + std::cout << " [CandidatePair] id=" << s.rtc.id + << " rtt=" << std::fixed << std::setprecision(4) + << s.candidate_pair.current_round_trip_time << "s" + << " total_rtt=" << s.candidate_pair.total_round_trip_time + << "s" + << " in_bitrate=" + << s.candidate_pair.available_incoming_bitrate + << " out_bitrate=" + << s.candidate_pair.available_outgoing_bitrate + << " bytes_sent=" << s.candidate_pair.bytes_sent + << " bytes_received=" << s.candidate_pair.bytes_received + << std::endl; + } else if constexpr (std::is_same_v) { + std::cout << " [Transport] id=" << s.rtc.id + << " selected_pair=" + << s.transport.selected_candidate_pair_id + << " packets_sent=" << s.transport.packets_sent + << " packets_received=" << s.transport.packets_received + << " bytes_sent=" << s.transport.bytes_sent + << " bytes_received=" << s.transport.bytes_received + << std::endl; + } else if constexpr (std::is_same_v) { + std::cout << " [PeerConnection] id=" << s.rtc.id + << " data_channels_opened=" << s.pc.data_channels_opened + << " data_channels_closed=" << s.pc.data_channels_closed + << std::endl; + } else if constexpr (std::is_same_v) { + std::cout << " [InboundRtp] id=" << s.rtc.id + << " kind=" << s.stream.kind + << " packets_lost=" << s.received.packets_lost + << " jitter=" << std::fixed << std::setprecision(6) + << s.received.jitter + << " bytes_received=" << s.inbound.bytes_received + << std::endl; + } else if constexpr (std::is_same_v) { + std::cout << " [OutboundRtp] id=" << s.rtc.id + << " kind=" << s.stream.kind + << " packets_sent=" << s.sent.packets_sent + << " bytes_sent=" << s.sent.bytes_sent + << " target_bitrate=" << std::fixed + << std::setprecision(2) << s.outbound.target_bitrate + << std::endl; + } + }, + stat.stats); + } + + std::map type_counts; + for (const auto &stat : stats) { + type_counts[rtcStatsTypeName(stat)]++; + } + if (!type_counts.empty()) { + std::cout << " " << label << " type counts:"; + for (const auto &kv : type_counts) { + std::cout << " " << kv.first << "=" << kv.second; + } + std::cout << std::endl; + } +} + +static void +printAudioLatencyAndNetworkSummary(const std::vector &stats, + const std::string &label) { + std::cout << " " << label << " audio/network summary:" << std::endl; + bool printed = false; + + for (const auto &stat : stats) { + std::visit( + [&](const auto &s) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + if (s.stream.kind == "audio") { + printed = true; + double emitted = + static_cast(s.inbound.jitter_buffer_emitted_count); + double avg_jb_delay_s = + emitted > 0.0 ? (s.inbound.jitter_buffer_delay / emitted) + : 0.0; + double avg_jb_target_s = + emitted > 0.0 + ? (s.inbound.jitter_buffer_target_delay / emitted) + : 0.0; + double avg_processing_s = + emitted > 0.0 ? (s.inbound.total_processing_delay / emitted) + : 0.0; + + std::cout << " [InboundAudio] id=" << s.rtc.id + << " packets_received=" << s.received.packets_received + << " packets_lost=" << s.received.packets_lost + << " jitter=" << std::fixed << std::setprecision(6) + << s.received.jitter + << " jb_delay_total_s=" << s.inbound.jitter_buffer_delay + << " jb_target_total_s=" + << s.inbound.jitter_buffer_target_delay + << " jb_emitted=" + << s.inbound.jitter_buffer_emitted_count + << " jb_delay_avg_ms=" << std::setprecision(2) + << (avg_jb_delay_s * 1000.0) + << " jb_target_avg_ms=" << (avg_jb_target_s * 1000.0) + << " processing_avg_ms=" << (avg_processing_s * 1000.0) + << " concealed_samples=" << s.inbound.concealed_samples + << " inserted_for_decel=" + << s.inbound.inserted_samples_for_deceleration + << " removed_for_accel=" + << s.inbound.removed_samples_for_acceleration + << std::endl; + } + } else if constexpr (std::is_same_v) { + printed = true; + std::cout << " [CandidatePair] id=" << s.rtc.id + << " rtt_ms=" << std::fixed << std::setprecision(2) + << (s.candidate_pair.current_round_trip_time * 1000.0) + << " total_rtt_s=" << std::setprecision(4) + << s.candidate_pair.total_round_trip_time + << " bytes_sent=" << s.candidate_pair.bytes_sent + << " bytes_received=" << s.candidate_pair.bytes_received + << " in_bitrate=" + << s.candidate_pair.available_incoming_bitrate + << " out_bitrate=" + << s.candidate_pair.available_outgoing_bitrate + << std::endl; + } else if constexpr (std::is_same_v) { + printed = true; + std::cout << " [Transport] id=" << s.rtc.id + << " selected_pair=" + << s.transport.selected_candidate_pair_id + << " packets_sent=" << s.transport.packets_sent + << " packets_received=" << s.transport.packets_received + << " bytes_sent=" << s.transport.bytes_sent + << " bytes_received=" << s.transport.bytes_received + << std::endl; + } + }, + stat.stats); + } + + if (!printed) { + std::cout << " (no audio/network stats available)" << std::endl; + } +} + // ============================================================================= // Test Fixture // ============================================================================= @@ -104,6 +296,7 @@ TEST_F(LatencyMeasurementTest, ConnectionTime) { stats.addMeasurement(latency_ms); std::cout << " Iteration " << (i + 1) << ": " << std::fixed << std::setprecision(2) << latency_ms << " ms" << std::endl; + } else { std::cout << " Iteration " << (i + 1) << ": FAILED to connect" << std::endl; @@ -125,8 +318,11 @@ class AudioLatencyDelegate : public RoomDelegate { public: void onTrackSubscribed(Room &, const TrackSubscribedEvent &event) override { std::lock_guard lock(mutex_); - if (event.track && event.track->kind() == TrackKind::KIND_AUDIO) { + if (event.track && event.track->kind() == TrackKind::KIND_AUDIO && + event.participant) { subscribed_audio_track_ = event.track; + subscribed_audio_tracks_by_participant_[event.participant->identity()] = + event.track; track_cv_.notify_all(); } } @@ -141,10 +337,24 @@ class AudioLatencyDelegate : public RoomDelegate { return nullptr; } + std::shared_ptr + waitForAudioTrackFromParticipant(const std::string &identity, + std::chrono::milliseconds timeout) { + std::unique_lock lock(mutex_); + if (track_cv_.wait_for(lock, timeout, [this, &identity] { + return subscribed_audio_tracks_by_participant_.count(identity) > 0; + })) { + return subscribed_audio_tracks_by_participant_[identity]; + } + return nullptr; + } + private: std::mutex mutex_; std::condition_variable track_cv_; std::shared_ptr subscribed_audio_track_; + std::map> + subscribed_audio_tracks_by_participant_; }; TEST_F(LatencyMeasurementTest, AudioLatency) { @@ -349,5 +559,298 @@ TEST_F(LatencyMeasurementTest, AudioLatency) { << "At least one audio latency measurement should be recorded"; } +TEST_F(LatencyMeasurementTest, FullDeplexAudioLatency) { + skipIfNotConfigured(); + + std::cout << "\n=== FullDeplexAudioLatency Test ===" << std::endl; + std::cout << "Measuring A->B, B->A, and A->B->A using audio ping-pong" + << std::endl; + + auto room_a = std::make_unique(); // caller + auto room_b = std::make_unique(); // receiver + AudioLatencyDelegate delegate_a; + AudioLatencyDelegate delegate_b; + room_a->setDelegate(&delegate_a); + room_b->setDelegate(&delegate_b); + + RoomOptions options; + options.auto_subscribe = true; + + ASSERT_TRUE(room_a->Connect(config_.url, config_.caller_token, options)) + << "Participant A failed to connect"; + ASSERT_TRUE(room_b->Connect(config_.url, config_.receiver_token, options)) + << "Participant B failed to connect"; + + std::string id_a = room_a->localParticipant()->identity(); + std::string id_b = room_b->localParticipant()->identity(); + std::cout << "Participant A: " << id_a << std::endl; + std::cout << "Participant B: " << id_b << std::endl; + + ASSERT_TRUE(waitForParticipant(room_a.get(), id_b, 10s)) << "A cannot see B"; + ASSERT_TRUE(waitForParticipant(room_b.get(), id_a, 10s)) << "B cannot see A"; + + auto source_a = + std::make_shared(kAudioSampleRate, kAudioChannels, 0); + auto source_b = + std::make_shared(kAudioSampleRate, kAudioChannels, 1000); + auto track_a = + LocalAudioTrack::createLocalAudioTrack("full-duplex-a", source_a); + auto track_b = + LocalAudioTrack::createLocalAudioTrack("full-duplex-b", source_b); + ASSERT_NE(track_a, nullptr); + ASSERT_NE(track_b, nullptr); + + TrackPublishOptions publish_options; + auto pub_a = + room_a->localParticipant()->publishTrack(track_a, publish_options); + auto pub_b = + room_b->localParticipant()->publishTrack(track_b, publish_options); + ASSERT_NE(pub_a, nullptr); + ASSERT_NE(pub_b, nullptr); + + auto track_from_a_on_b = + delegate_b.waitForAudioTrackFromParticipant(id_a, 10s); + auto track_from_b_on_a = + delegate_a.waitForAudioTrackFromParticipant(id_b, 10s); + ASSERT_NE(track_from_a_on_b, nullptr) << "B did not subscribe to A audio"; + ASSERT_NE(track_from_b_on_a, nullptr) << "A did not subscribe to B audio"; + + AudioStream::Options stream_options; + stream_options.capacity = 100; + auto stream_b_recv_a = + AudioStream::fromTrack(track_from_a_on_b, stream_options); + auto stream_a_recv_b = + AudioStream::fromTrack(track_from_b_on_a, stream_options); + ASSERT_NE(stream_b_recv_a, nullptr); + ASSERT_NE(stream_a_recv_b, nullptr); + + LatencyStats a_to_b_stats; + LatencyStats b_to_a_stats; + LatencyStats round_trip_stats; + std::atomic running{true}; + std::atomic active_pulse_id{0}; + std::atomic a_send_us{0}; + std::atomic b_detect_us{0}; + std::atomic b_send_us{0}; + std::atomic b_responded_pulse_id{0}; + std::atomic a_received_pulse_id{0}; + std::atomic waiting_for_response{false}; + std::atomic pre_pulse_silence_frames_remaining{200}; + std::atomic timeouts{0}; + + constexpr int kTotalPulses = 100; + constexpr int kPrePulseSilenceFrames = 50; // 500ms at 10ms/frame + constexpr uint64_t kPulseTimeoutUs = 8000000; // 8 seconds + constexpr int kBMaxResponseFrames = 50; // 500ms at 10ms/frame + constexpr double kMinValidOneWayMs = 100.0; // filter stale/impossible matches + constexpr double kMinValidBToAMs = 100.0; // filter stale/impossible matches + + // B receives A pulses and sends response frames only when responding. + std::thread b_receiver_thread([&]() { + AudioFrameEvent event; + while (running.load() && stream_b_recv_a->read(event)) { + if (calculateEnergy(event.frame.data()) <= kHighEnergyThreshold) { + continue; + } + + int pulse_id = active_pulse_id.load(); + if (pulse_id <= 0 || b_responded_pulse_id.load() == pulse_id) { + continue; + } + + uint64_t detect_us = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t send_from_a_us = a_send_us.load(); + if (send_from_a_us == 0 || detect_us <= send_from_a_us) { + continue; + } + double a_to_b_ms = (detect_us - send_from_a_us) / 1000.0; + if (a_to_b_ms < kMinValidOneWayMs || a_to_b_ms > 5000) { + continue; + } + + b_detect_us.store(detect_us); + b_responded_pulse_id.store(pulse_id); + a_to_b_stats.addMeasurement(a_to_b_ms); + std::cout << " A->B latency: " << std::fixed << std::setprecision(2) + << a_to_b_ms << " ms" << std::endl; + for (int i = 0; i < kBMaxResponseFrames; ++i) { + std::vector pulse = generateHighEnergyFrame(kSamplesPerFrame); + AudioFrame response_frame(std::move(pulse), kAudioSampleRate, + kAudioChannels, kSamplesPerFrame); + try { + if (i == 0) { + b_send_us.store( + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count()); + } + source_b->captureFrame(response_frame); + } catch (const std::exception &e) { + std::cerr << "Error sending B response frame: " << e.what() + << std::endl; + break; + } + // Prevent stale overlap into subsequent pulses. + if (!waiting_for_response.load()) { + break; + } + std::this_thread::sleep_for(10ms); + } + } + }); + + // A receives B responses and computes B->A and A->B->A. + std::thread a_receiver_thread([&]() { + AudioFrameEvent event; + while (running.load() && stream_a_recv_b->read(event)) { + if (!waiting_for_response.load() || + calculateEnergy(event.frame.data()) <= kHighEnergyThreshold) { + continue; + } + + int pulse_id = active_pulse_id.load(); + if (pulse_id <= 0 || a_received_pulse_id.load() == pulse_id) { + continue; + } + + uint64_t receive_us = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t send_from_a_us = a_send_us.load(); + uint64_t send_from_b_us = b_send_us.load(); + int responded_pulse_id = b_responded_pulse_id.load(); + + // Only accept a return pulse after B has actually started responding to + // this same pulse id. This filters stale overlap from previous responses. + if (responded_pulse_id != pulse_id || send_from_b_us == 0 || + receive_us < send_from_b_us) { + continue; + } + a_received_pulse_id.store(pulse_id); + + double b_to_a_ms = (receive_us - send_from_b_us) / 1000.0; + if (b_to_a_ms >= kMinValidBToAMs && b_to_a_ms < 5000) { + b_to_a_stats.addMeasurement(b_to_a_ms); + std::cout << " B->A latency: " << std::fixed << std::setprecision(2) + << b_to_a_ms << " ms" << std::endl; + } + + if (send_from_a_us > 0) { + double rtt_ms = (receive_us - send_from_a_us) / 1000.0; + if (rtt_ms > 0 && rtt_ms < 10000) { + round_trip_stats.addMeasurement(rtt_ms); + std::cout << " A->B->A latency: " << std::fixed + << std::setprecision(2) << rtt_ms << " ms" << std::endl; + } + } + + waiting_for_response.store(false); + active_pulse_id.store(0); + pre_pulse_silence_frames_remaining.store(kPrePulseSilenceFrames); + } + }); + + // A sends ping pulses and waits for B's response. + std::thread a_sender_thread([&]() { + auto next_frame_time = std::chrono::steady_clock::now(); + const auto frame_duration = + std::chrono::milliseconds(kAudioFrameDurationMs); + int pulses_sent = 0; + uint64_t pulse_start_us = 0; + + while (running.load() && + (pulses_sent < kTotalPulses || waiting_for_response.load())) { + std::this_thread::sleep_until(next_frame_time); + next_frame_time += frame_duration; + + if (waiting_for_response.load()) { + uint64_t now_us = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if (pulse_start_us > 0 && now_us - pulse_start_us > kPulseTimeoutUs) { + std::cout << " Timeout waiting for B response to pulse " + << active_pulse_id.load() << std::endl; + waiting_for_response.store(false); + active_pulse_id.store(0); + pre_pulse_silence_frames_remaining.store(kPrePulseSilenceFrames); + timeouts++; + } + } + + std::vector frame_data; + if (waiting_for_response.load()) { + // Keep transmitting pulse frames until B responds or timeout. + frame_data = generateHighEnergyFrame(kSamplesPerFrame); + } else if (!waiting_for_response.load() && pulses_sent < kTotalPulses) { + int remaining_silence = pre_pulse_silence_frames_remaining.load(); + if (remaining_silence > 0) { + pre_pulse_silence_frames_remaining.store(remaining_silence - 1); + frame_data = generateSilentFrame(kSamplesPerFrame); + } else { + pulses_sent++; + int pulse_id = pulses_sent; + active_pulse_id.store(pulse_id); + b_responded_pulse_id.store(0); + a_received_pulse_id.store(0); + b_send_us.store(0); + + pulse_start_us = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + a_send_us.store(pulse_start_us); + waiting_for_response.store(true); + + frame_data = generateHighEnergyFrame(kSamplesPerFrame); + + std::cout << "Sent ping pulse " << pulse_id << "/" << kTotalPulses + << std::endl; + } + } else { + frame_data = generateSilentFrame(kSamplesPerFrame); + } + + AudioFrame frame(std::move(frame_data), kAudioSampleRate, kAudioChannels, + kSamplesPerFrame); + try { + source_a->captureFrame(frame); + } catch (const std::exception &e) { + std::cerr << "Error sending A frame: " << e.what() << std::endl; + } + } + + std::this_thread::sleep_for(500ms); + running.store(false); + }); + + a_sender_thread.join(); + stream_a_recv_b->close(); + stream_b_recv_a->close(); + a_receiver_thread.join(); + b_receiver_thread.join(); + + round_trip_stats.printStats("Full Duplex Latency (A->B->A) Statistics"); + a_to_b_stats.printStats("One-way Latency (A->B) Statistics"); + b_to_a_stats.printStats("One-way Latency (B->A) Statistics"); + if (timeouts > 0) { + std::cout << "Response timeouts: " << timeouts << std::endl; + } + + room_a->localParticipant()->unpublishTrack(pub_a->sid()); + room_b->localParticipant()->unpublishTrack(pub_b->sid()); + + EXPECT_GT(round_trip_stats.count(), 0) + << "At least one round-trip latency measurement should be recorded"; + EXPECT_GT(a_to_b_stats.count(), 0) + << "At least one A->B latency measurement should be recorded"; + EXPECT_GT(b_to_a_stats.count(), 0) + << "At least one B->A latency measurement should be recorded"; +} + } // namespace test } // namespace livekit diff --git a/src/tests/stress/test_rpc_stress.cpp b/src/tests/stress/test_rpc_stress.cpp index 38f42262..ef4d12ec 100644 --- a/src/tests/stress/test_rpc_stress.cpp +++ b/src/tests/stress/test_rpc_stress.cpp @@ -516,5 +516,610 @@ TEST_F(RpcStressTest, SmallPayloadStress) { receiver_room.reset(); } +TEST_F(RpcStressTest, VaryingPayloadStress) { + skipIfNotConfigured(); + + std::cout << "\n=== RPC Varying Payload Stress Test ===" << std::endl; + std::cout << "Duration: " << config_.stress_duration_seconds << " seconds" + << std::endl; + std::cout << "Caller threads: " << config_.num_caller_threads << std::endl; + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + std::atomic total_received{0}; + std::map> received_by_size; + std::mutex size_map_mutex; + + // Response sizes to use (varying) + // Note: Leave room for metadata prefix "request_size:response_size:checksum:" + // which is about 25 bytes max + constexpr size_t kMetadataOverhead = 30; + std::vector response_sizes = { + 100, // Small (no compression) + 1024, // 1KB (compression threshold) + 5 * 1024, // 5KB + 10 * 1024, // 10KB + kMaxRpcPayloadSize - kMetadataOverhead // Max minus metadata overhead + }; + + receiver_room->localParticipant()->registerRpcMethod( + "varying-payload-stress", + [&, response_sizes]( + const RpcInvocationData &data) -> std::optional { + total_received++; + size_t request_size = data.payload.size(); + + { + std::lock_guard lock(size_map_mutex); + received_by_size[request_size]++; + } + + // Generate a random response payload of varying size + static thread_local std::random_device rd; + static thread_local std::mt19937 gen(rd()); + std::uniform_int_distribution dis(0, response_sizes.size() - 1); + size_t response_size = response_sizes[dis(gen)]; + + std::string response_payload = generateRandomPayload(response_size); + + // Calculate checksum for verification + size_t checksum = 0; + for (char c : response_payload) { + checksum += static_cast(c); + } + + // Return format: "request_size:response_size:checksum:payload" + // This allows sender to verify both request was received and response + // is correct + return std::to_string(request_size) + ":" + + std::to_string(response_size) + ":" + std::to_string(checksum) + + ":" + response_payload; + }); + + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + StressTestStats stats; + std::atomic running{true}; + + // Payload sizes to test + std::vector payload_sizes = { + 100, // Small + 1024, // 1KB + 5 * 1024, // 5KB + 10 * 1024, // 10KB + kMaxRpcPayloadSize - 1, // Just under max + kMaxRpcPayloadSize // Max (15KB) + }; + + auto start_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::seconds(config_.stress_duration_seconds); + + std::vector caller_threads; + for (int t = 0; t < config_.num_caller_threads; ++t) { + caller_threads.emplace_back([&, thread_id = t]() { + int call_count = 0; + while (running.load()) { + size_t payload_size = payload_sizes[call_count % payload_sizes.size()]; + std::string payload = generateRandomPayload(payload_size); + + auto call_start = std::chrono::high_resolution_clock::now(); + + try { + std::string response = caller_room->localParticipant()->performRpc( + receiver_identity, "varying-payload-stress", payload, 60.0); + + auto call_end = std::chrono::high_resolution_clock::now(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + + // Parse response format: + // "request_size:response_size:checksum:payload" + bool valid = false; + size_t first_colon = response.find(':'); + size_t second_colon = response.find(':', first_colon + 1); + size_t third_colon = response.find(':', second_colon + 1); + + if (first_colon != std::string::npos && + second_colon != std::string::npos && + third_colon != std::string::npos) { + size_t recv_request_size = + std::stoull(response.substr(0, first_colon)); + size_t recv_response_size = std::stoull(response.substr( + first_colon + 1, second_colon - first_colon - 1)); + size_t recv_checksum = std::stoull(response.substr( + second_colon + 1, third_colon - second_colon - 1)); + std::string recv_payload = response.substr(third_colon + 1); + + // Calculate actual checksum of received payload + size_t actual_checksum = 0; + for (char c : recv_payload) { + actual_checksum += static_cast(c); + } + + // Verify all fields + if (recv_request_size == payload_size && + recv_response_size == recv_payload.size() && + recv_checksum == actual_checksum) { + valid = true; + } else { + std::cerr << "[VARYING MISMATCH] sent_size=" << payload_size + << " recv_request_size=" << recv_request_size + << " recv_response_size=" << recv_response_size + << " actual_payload_size=" << recv_payload.size() + << " recv_checksum=" << recv_checksum + << " actual_checksum=" << actual_checksum << std::endl; + } + } else { + std::cerr << "[VARYING PARSE ERROR] response format invalid" + << std::endl; + } + + if (valid) { + stats.recordCall(true, latency_ms, payload_size); + } else { + stats.recordCall(false, latency_ms, payload_size); + stats.recordError("verification_failed"); + } + } catch (const RpcError &e) { + stats.recordCall(false, 0, payload_size); + auto code = static_cast(e.code()); + std::cerr << "[RPC ERROR] code=" << e.code() << " message=\"" + << e.message() << "\"" + << " data=\"" << e.data() << "\"" << std::endl; + if (code == RpcError::ErrorCode::RESPONSE_TIMEOUT) { + stats.recordError("timeout"); + } else { + stats.recordError("rpc_error"); + } + } catch (const std::exception &ex) { + stats.recordCall(false, 0, payload_size); + stats.recordError("exception"); + std::cerr << "[EXCEPTION] " << ex.what() << std::endl; + } + + call_count++; + std::this_thread::sleep_for(5ms); + } + }); + } + + // Progress reporting + std::thread progress_thread([&]() { + while (running.load()) { + std::this_thread::sleep_for(30s); + if (!running.load()) + break; + + auto elapsed = std::chrono::steady_clock::now() - start_time; + auto elapsed_seconds = + std::chrono::duration_cast(elapsed).count(); + + std::cout << "[" << elapsed_seconds << "s] Total: " << stats.totalCalls() + << " | Success: " << stats.successfulCalls() + << " | Failed: " << stats.failedCalls() << std::endl; + } + }); + + while (std::chrono::steady_clock::now() - start_time < duration) { + std::this_thread::sleep_for(1s); + } + + running.store(false); + + for (auto &t : caller_threads) { + t.join(); + } + progress_thread.join(); + + stats.printStats(); + + // Print breakdown by size + std::cout << "Received by payload size:" << std::endl; + { + std::lock_guard lock(size_map_mutex); + for (const auto &pair : received_by_size) { + std::cout << " " << pair.first << " bytes: " << pair.second.load() + << std::endl; + } + } + + EXPECT_GT(stats.successfulCalls(), 0); + double success_rate = + (stats.totalCalls() > 0) + ? (100.0 * stats.successfulCalls() / stats.totalCalls()) + : 0.0; + EXPECT_GT(success_rate, 95.0) << "Success rate below 95%"; + + receiver_room->localParticipant()->unregisterRpcMethod( + "varying-payload-stress"); + caller_room.reset(); + receiver_room.reset(); +} + +// Stress test for bidirectional RPC (both sides can call each other) +TEST_F(RpcStressTest, BidirectionalRpcStress) { + skipIfNotConfigured(); + + std::cout << "\n=== Bidirectional RPC Stress Test ===" << std::endl; + std::cout << "Duration: " << config_.stress_duration_seconds << " seconds" + << std::endl; + + auto room_a = std::make_unique(); + auto room_b = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool a_connected = + room_a->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(a_connected) << "Room A failed to connect"; + + bool b_connected = + room_b->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(b_connected) << "Room B failed to connect"; + + std::string identity_a = room_a->localParticipant()->identity(); + std::string identity_b = room_b->localParticipant()->identity(); + + ASSERT_TRUE(waitForParticipant(room_a.get(), identity_b, 10s)) + << "Room B not visible to Room A"; + ASSERT_TRUE(waitForParticipant(room_b.get(), identity_a, 10s)) + << "Room A not visible to Room B"; + + std::atomic a_received{0}; + std::atomic b_received{0}; + + // Register handlers on both sides - echo payload back for verification + room_a->localParticipant()->registerRpcMethod( + "ping", + [&a_received]( + const RpcInvocationData &data) -> std::optional { + a_received++; + // Echo the payload back for round-trip verification + return data.payload; + }); + + room_b->localParticipant()->registerRpcMethod( + "ping", + [&b_received]( + const RpcInvocationData &data) -> std::optional { + b_received++; + // Echo the payload back for round-trip verification + return data.payload; + }); + + StressTestStats stats_a_to_b; + StressTestStats stats_b_to_a; + std::atomic running{true}; + + auto start_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::seconds(config_.stress_duration_seconds); + + // A calling B + std::thread thread_a_to_b([&]() { + int counter = 0; + while (running.load()) { + std::string payload = generateRandomPayload(kMaxRpcPayloadSize); + + // Calculate expected checksum for verification + size_t expected_checksum = 0; + for (char c : payload) { + expected_checksum += static_cast(c); + } + + auto call_start = std::chrono::high_resolution_clock::now(); + + try { + std::string response = room_a->localParticipant()->performRpc( + identity_b, "ping", payload, 60.0); + + auto call_end = std::chrono::high_resolution_clock::now(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + + // Verify response by comparing checksum + size_t response_checksum = 0; + for (char c : response) { + response_checksum += static_cast(c); + } + + if (response.size() == payload.size() && + response_checksum == expected_checksum) { + stats_a_to_b.recordCall(true, latency_ms, kMaxRpcPayloadSize); + } else { + stats_a_to_b.recordCall(false, latency_ms, kMaxRpcPayloadSize); + std::cerr << "[A->B MISMATCH] sent size=" << payload.size() + << " checksum=" << expected_checksum + << " | received size=" << response.size() + << " checksum=" << response_checksum << std::endl; + } + } catch (const RpcError &e) { + stats_a_to_b.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[A->B RPC ERROR] code=" << e.code() << " message=\"" + << e.message() << "\"" + << " data=\"" << e.data() << "\"" << std::endl; + } catch (const std::exception &ex) { + stats_a_to_b.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[A->B EXCEPTION] " << ex.what() << std::endl; + } + + counter++; + std::this_thread::sleep_for(20ms); + } + }); + + // B calling A + std::thread thread_b_to_a([&]() { + int counter = 0; + while (running.load()) { + std::string payload = generateRandomPayload(kMaxRpcPayloadSize); + + // Calculate expected checksum for verification + size_t expected_checksum = 0; + for (char c : payload) { + expected_checksum += static_cast(c); + } + + auto call_start = std::chrono::high_resolution_clock::now(); + + try { + std::string response = room_b->localParticipant()->performRpc( + identity_a, "ping", payload, 60.0); + + auto call_end = std::chrono::high_resolution_clock::now(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + + // Verify response by comparing checksum + size_t response_checksum = 0; + for (char c : response) { + response_checksum += static_cast(c); + } + + if (response.size() == payload.size() && + response_checksum == expected_checksum) { + stats_b_to_a.recordCall(true, latency_ms, kMaxRpcPayloadSize); + } else { + stats_b_to_a.recordCall(false, latency_ms, kMaxRpcPayloadSize); + std::cerr << "[B->A MISMATCH] sent size=" << payload.size() + << " checksum=" << expected_checksum + << " | received size=" << response.size() + << " checksum=" << response_checksum << std::endl; + } + } catch (const RpcError &e) { + stats_b_to_a.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[B->A RPC ERROR] code=" << e.code() << " message=\"" + << e.message() << "\"" + << " data=\"" << e.data() << "\"" << std::endl; + } catch (const std::exception &ex) { + stats_b_to_a.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[B->A EXCEPTION] " << ex.what() << std::endl; + } + + counter++; + std::this_thread::sleep_for(20ms); + } + }); + + // Progress + std::thread progress_thread([&]() { + while (running.load()) { + std::this_thread::sleep_for(30s); + if (!running.load()) + break; + + auto elapsed = std::chrono::steady_clock::now() - start_time; + auto elapsed_seconds = + std::chrono::duration_cast(elapsed).count(); + + std::cout << "[" << elapsed_seconds << "s] " + << "A->B: " << stats_a_to_b.successfulCalls() << "/" + << stats_a_to_b.totalCalls() << " | " + << "B->A: " << stats_b_to_a.successfulCalls() << "/" + << stats_b_to_a.totalCalls() << " | " + << "A rcvd: " << a_received.load() + << " | B rcvd: " << b_received.load() << std::endl; + } + }); + + while (std::chrono::steady_clock::now() - start_time < duration) { + std::this_thread::sleep_for(1s); + } + + running.store(false); + + thread_a_to_b.join(); + thread_b_to_a.join(); + progress_thread.join(); + + std::cout << "\n=== A -> B Statistics ===" << std::endl; + stats_a_to_b.printStats(); + + std::cout << "\n=== B -> A Statistics ===" << std::endl; + stats_b_to_a.printStats(); + + EXPECT_GT(stats_a_to_b.successfulCalls(), 0); + EXPECT_GT(stats_b_to_a.successfulCalls(), 0); + + room_a->localParticipant()->unregisterRpcMethod("ping"); + room_b->localParticipant()->unregisterRpcMethod("ping"); + room_a.reset(); + room_b.reset(); +} + +// High throughput stress test (short bursts) +TEST_F(RpcStressTest, HighThroughputBurst) { + skipIfNotConfigured(); + + std::cout << "\n=== High Throughput Burst Test ===" << std::endl; + std::cout << "Duration: " << config_.stress_duration_seconds << " seconds" + << std::endl; + std::cout << "Testing rapid-fire RPC with max payload (15KB)..." << std::endl; + + auto receiver_room = std::make_unique(); + RoomOptions options; + options.auto_subscribe = true; + + bool receiver_connected = + receiver_room->Connect(config_.url, config_.receiver_token, options); + ASSERT_TRUE(receiver_connected) << "Receiver failed to connect"; + + std::string receiver_identity = receiver_room->localParticipant()->identity(); + + std::atomic total_received{0}; + + receiver_room->localParticipant()->registerRpcMethod( + "burst-test", + [&total_received]( + const RpcInvocationData &data) -> std::optional { + total_received++; + // Echo the payload back for round-trip verification + return data.payload; + }); + + auto caller_room = std::make_unique(); + bool caller_connected = + caller_room->Connect(config_.url, config_.caller_token, options); + ASSERT_TRUE(caller_connected) << "Caller failed to connect"; + + bool receiver_visible = + waitForParticipant(caller_room.get(), receiver_identity, 10s); + ASSERT_TRUE(receiver_visible) << "Receiver not visible to caller"; + + StressTestStats stats; + std::atomic running{true}; + + auto start_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::seconds(config_.stress_duration_seconds); + + // Multiple threads sending as fast as possible + std::vector burst_threads; + for (int t = 0; t < config_.num_caller_threads * 2; ++t) { + burst_threads.emplace_back([&]() { + while (running.load()) { + std::string payload = generateRandomPayload(kMaxRpcPayloadSize); + + // Calculate expected checksum for verification + size_t expected_checksum = 0; + for (char c : payload) { + expected_checksum += static_cast(c); + } + + auto call_start = std::chrono::high_resolution_clock::now(); + + try { + std::string response = caller_room->localParticipant()->performRpc( + receiver_identity, "burst-test", payload, 60.0); + + auto call_end = std::chrono::high_resolution_clock::now(); + double latency_ms = + std::chrono::duration(call_end - call_start) + .count(); + + // Verify response by comparing checksum + size_t response_checksum = 0; + for (char c : response) { + response_checksum += static_cast(c); + } + + if (response.size() == payload.size() && + response_checksum == expected_checksum) { + stats.recordCall(true, latency_ms, kMaxRpcPayloadSize); + } else { + stats.recordCall(false, latency_ms, kMaxRpcPayloadSize); + std::cerr << "[BURST MISMATCH] sent size=" << payload.size() + << " checksum=" << expected_checksum + << " | received size=" << response.size() + << " checksum=" << response_checksum << std::endl; + } + } catch (const RpcError &e) { + stats.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[BURST RPC ERROR] code=" << e.code() << " message=\"" + << e.message() << "\"" + << " data=\"" << e.data() << "\"" << std::endl; + } catch (const std::exception &ex) { + stats.recordCall(false, 0, kMaxRpcPayloadSize); + std::cerr << "[BURST EXCEPTION] " << ex.what() << std::endl; + } + + // No delay - burst mode + } + }); + } + + // Progress + std::thread progress_thread([&]() { + int last_total = 0; + while (running.load()) { + std::this_thread::sleep_for(10s); + if (!running.load()) + break; + + int current = stats.totalCalls(); + double rate = (current - last_total) / 10.0; + last_total = current; + + auto elapsed = std::chrono::steady_clock::now() - start_time; + auto elapsed_seconds = + std::chrono::duration_cast(elapsed).count(); + + std::cout << "[" << elapsed_seconds << "s] " + << "Total: " << current + << " | Success: " << stats.successfulCalls() + << " | Rate: " << rate << " calls/sec" + << " | Throughput: " << (rate * kMaxRpcPayloadSize / 1024.0) + << " KB/sec" << std::endl; + } + }); + + while (std::chrono::steady_clock::now() - start_time < duration) { + std::this_thread::sleep_for(1s); + } + + running.store(false); + + for (auto &t : burst_threads) { + t.join(); + } + progress_thread.join(); + + stats.printStats(); + + auto total_time = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_time) + .count(); + double avg_rate = static_cast(stats.totalCalls()) / total_time; + double throughput_kbps = + (static_cast(stats.successfulCalls()) * kMaxRpcPayloadSize) / + (total_time * 1024.0); + + std::cout << "Average rate: " << avg_rate << " calls/sec" << std::endl; + std::cout << "Average throughput: " << throughput_kbps << " KB/sec" + << std::endl; + + EXPECT_GT(stats.successfulCalls(), 0); + + receiver_room->localParticipant()->unregisterRpcMethod("burst-test"); + caller_room.reset(); + receiver_room.reset(); +} + } // namespace test } // namespace livekit