Skip to content

Commit

Permalink
Rebase ring buffer branch to develop
Browse files Browse the repository at this point in the history
  • Loading branch information
ForeverASilver authored and gavv committed Jul 12, 2024
1 parent 195e207 commit 9601cee
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/internal_modules/roc_pipeline/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,21 @@ void ReceiverCommonConfig::deduce_defaults() {

ReceiverSessionConfig::ReceiverSessionConfig()
: payload_type(0)
, prebuf_len(0)
, enable_beeping(false) {
}

void ReceiverSessionConfig::deduce_defaults() {
if (prebuf_len == 0) {
prebuf_len = latency.target_latency;
}
latency.deduce_defaults(DefaultLatency, true);
watchdog.deduce_defaults(latency.target_latency);
resampler.deduce_defaults(latency.tuner_backend, latency.tuner_profile);
}

ReceiverSourceConfig::ReceiverSourceConfig() {
ReceiverSourceConfig::ReceiverSourceConfig()
: max_session_packets(0) {
}

void ReceiverSourceConfig::deduce_defaults() {
Expand Down
6 changes: 6 additions & 0 deletions src/internal_modules/roc_pipeline/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ struct ReceiverSessionConfig {
//! Packet payload type.
unsigned int payload_type;

//! Packet prebuffer length, nanoseconds.
core::nanoseconds_t prebuf_len;

//! FEC reader parameters.
fec::BlockReaderConfig fec_reader;

Expand Down Expand Up @@ -188,6 +191,9 @@ struct ReceiverSourceConfig {
//! Default parameters for a session.
ReceiverSessionConfig session_defaults;

//! Maximum number of packets per session.
size_t max_session_packets;

//! Initialize config.
ReceiverSourceConfig();

Expand Down
45 changes: 45 additions & 0 deletions src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
}

if (sess) {
enqueue_prebuf_packet_(packet);
// Session found, route packet to it.
return sess->route_packet(packet);
}
Expand All @@ -340,6 +341,48 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
return status::StatusNoRoute;
}

void ReceiverSessionGroup::enqueue_prebuf_packet_(const packet::PacketPtr& packet_ptr) {
prebuf_packets_.push_back(*packet_ptr.get());

core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);

while (prebuf_packets_.size() > 0) {
core::nanoseconds_t received = prebuf_packets_.front()->udp()->receive_timestamp;
if (now - received > source_config_.session_defaults.prebuf_len) {
prebuf_packets_.remove(*prebuf_packets_.front());
} else {
break;
}
}
}

void ReceiverSessionGroup::dequeue_prebuf_packets_(ReceiverSession& sess) {
packet::PacketPtr curr, next;

if (prebuf_packets_.size() == 0) {
return;
}

core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);

for (curr = prebuf_packets_.front(); curr; curr = next) {
next = prebuf_packets_.nextof(*curr);

// if packet is too old, remove it from the queue
core::nanoseconds_t received = curr->udp()->receive_timestamp;
if (now - received > source_config_.session_defaults.prebuf_len) {
prebuf_packets_.remove(*curr);
continue;
}

// if session handles the packet, remove it from the queue
const status::StatusCode code = sess.route_packet(curr);
if (code == status::StatusOK) {
prebuf_packets_.remove(*curr);
}
}
}

status::StatusCode
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
Expand Down Expand Up @@ -431,6 +474,8 @@ ReceiverSessionGroup::create_session_(const packet::PacketPtr& packet) {
sessions_.push_back(*sess);
state_tracker_.register_session();

dequeue_prebuf_packets_(*sess);

return status::StatusOK;
}

Expand Down
4 changes: 4 additions & 0 deletions src/internal_modules/roc_pipeline/receiver_session_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
status::StatusCode route_transport_packet_(const packet::PacketPtr& packet);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
void enqueue_prebuf_packet_(const packet::PacketPtr& packet);
void dequeue_prebuf_packets_(ReceiverSession& sess);

bool can_create_session_(const packet::PacketPtr& packet);

Expand Down Expand Up @@ -159,6 +161,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
sessions_;
ReceiverSessionRouter session_router_;

core::List<packet::Packet> prebuf_packets_;

status::StatusCode init_status_;
};

Expand Down
8 changes: 8 additions & 0 deletions src/public_api/include/roc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,14 @@ typedef struct roc_receiver_config {
* If zero, default value is used. If negative, the check is disabled.
*/
long long choppy_playback_timeout;

/** Packet prebuffer length, in nanoseconds.
* Packets received for sessions that have not yet been created
* will be buffered. Any packets older than the prebuf_len
* will be discarded.
* If zero, default value is used.
*/
unsigned long long prebuf_len;
} roc_receiver_config;

/** Interface configuration.
Expand Down
55 changes: 55 additions & 0 deletions src/tests/roc_pipeline/test_receiver_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2181,6 +2181,61 @@ TEST(receiver_source, timestamp_mapping_remixing) {
CHECK(first_ts);
}

TEST(receiver_source, packet_buffer) {
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxPackets = 10 };

init(Rate, Chans, Rate, Chans);

ReceiverSourceConfig config = make_default_config();
config.session_defaults.prebuf_len = 0;
ReceiverSource receiver(config, encoding_map, packet_pool, packet_buffer_pool,
frame_pool, frame_buffer_pool, arena);
LONGS_EQUAL(status::StatusOK, receiver.init_status());

ReceiverSlot* slot = create_slot(receiver);
CHECK(slot);

packet::FifoQueue queue;
packet::FifoQueue source_queue;
packet::FifoQueue repair_queue;

packet::IWriter* source_endpoint_writer = create_transport_endpoint(
slot, address::Iface_AudioSource, address::Proto_RTP_RS8M_Source, dst_addr1);

packet::IWriter* repair_endpoint_writer = create_transport_endpoint(
slot, address::Iface_AudioRepair, address::Proto_RS8M_Repair, dst_addr2);

fec::BlockWriterConfig fec_config;

test::PacketWriter packet_writer(
arena, *source_endpoint_writer, *repair_endpoint_writer, encoding_map,
packet_factory, src_id1, src_addr1, dst_addr1, dst_addr2, PayloadType_Ch2,
packet::FEC_ReedSolomon_M8, fec_config);

// setup reader
test::FrameReader frame_reader(receiver, frame_factory);

packet_writer.write_packets(fec_config.n_source_packets, SamplesPerPacket,
output_sample_spec);

for (int i = 0; i < ManyPackets; ++i) {
packet::PacketPtr pp;
LONGS_EQUAL(status::StatusOK, queue.read(pp, packet::ModeFetch));
CHECK(pp);

if (pp->flags() & packet::Packet::FlagAudio) {
UNSIGNED_LONGS_EQUAL(status::StatusOK, source_queue.write(pp));
}
if (pp->flags() & packet::Packet::FlagRepair) {
UNSIGNED_LONGS_EQUAL(status::StatusOK, repair_queue.write(pp));
}
}

LONGS_EQUAL(status::StatusOK, receiver.refresh(frame_reader.refresh_ts(), NULL));
frame_reader.read_nonzero_samples(SamplesPerFrame, output_sample_spec);
UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions());
}

// Check receiver metrics for multiple remote participants (senders).
TEST(receiver_source, metrics_participants) {
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxParties = 10 };
Expand Down
3 changes: 3 additions & 0 deletions src/tools/roc_recv/cmdline.ggo
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ section "Options"
option "no-play-timeout" - "No playback timeout, TIME units"
string optional

option "prebuf-len" - "Length of packet prebuffer, TIME units"
string optional

option "choppy-play-timeout" - "Choppy playback timeout, TIME units"
string optional

Expand Down
13 changes: 13 additions & 0 deletions src/tools/roc_recv/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,19 @@ int main(int argc, char** argv) {
}
}

if (args.prebuf_len_given) {
core::nanoseconds_t prebuf_len = 0;
if (!core::parse_duration(args.prebuf_len_arg, prebuf_len)) {
roc_log(LogError, "invalid --prebuf-len");
return 1;
}
receiver_config.session_defaults.prebuf_len =
(core::nanoseconds_t)args.prebuf_len_arg;
} else {
receiver_config.session_defaults.prebuf_len =
receiver_config.session_defaults.latency.target_latency;
}

if (args.choppy_play_timeout_given) {
if (!core::parse_duration(
args.choppy_play_timeout_arg,
Expand Down

0 comments on commit 9601cee

Please sign in to comment.