From 0c3b360d18a8110b10bc1814ae542332f6c98d0f Mon Sep 17 00:00:00 2001 From: deanlee Date: Thu, 8 Aug 2024 03:03:34 +0800 Subject: [PATCH] refactor handle_encoder_msg --- system/loggerd/loggerd.cc | 129 ++++++++++++++++++++------------------ 1 file changed, 68 insertions(+), 61 deletions(-) diff --git a/system/loggerd/loggerd.cc b/system/loggerd/loggerd.cc index c9632c0dd60c7ce..7926ecf58e53061 100644 --- a/system/loggerd/loggerd.cc +++ b/system/loggerd/loggerd.cc @@ -57,28 +57,85 @@ struct RemoteEncoder { std::unique_ptr writer; int encoderd_segment_offset; int current_segment = -1; - std::vector q; + std::vector> q; int dropped_frames = 0; bool recording = false; bool marked_ready_to_rotate = false; bool seen_first_packet = false; }; -int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) { - int bytes_count = 0; +size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, struct RemoteEncoder &re, const EncoderInfo &encoder_info) { + auto edata = (event.*(encoder_info.get_encode_data_func))(); + auto idx = edata.getIdx(); + auto flags = idx.getFlags(); + // if we aren't recording yet, try to start, since we are in the correct segment + if (!re.recording) { + if (flags & V4L2_BUF_FLAG_KEYFRAME) { + // only create on iframe + if (re.dropped_frames) { + // this should only happen for the first segment, maybe + LOGW("%s: dropped %d non iframe packets before init", encoder_info.publish_name, re.dropped_frames); + re.dropped_frames = 0; + } + // if we aren't actually recording, don't create the writer + if (encoder_info.record) { + assert(encoder_info.filename != NULL); + re.writer.reset(new VideoWriter(s->logger.segmentPath().c_str(), + encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C, + edata.getWidth(), edata.getHeight(), encoder_info.fps, idx.getType())); + // write the header + auto header = edata.getHeader(); + re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof() / 1000, true, false); + } + re.recording = true; + } else { + // this is a sad case when we aren't recording, but don't have an iframe + // nothing we can do but drop the frame + ++re.dropped_frames; + return 0; + } + } + + // we have to be recording if we are here + assert(re.recording); + + // if we are actually writing the video file, do so + if (re.writer) { + auto data = edata.getData(); + re.writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof() / 1000, false, flags & V4L2_BUF_FLAG_KEYFRAME); + } + + // put it in log stream as the idx packet + MessageBuilder bmsg; + auto evt = bmsg.initEvent(event.getValid()); + evt.setLogMonoTime(event.getLogMonoTime()); + (evt.*(encoder_info.set_encode_idx_func))(idx); + auto new_msg = bmsg.toBytes(); + s->logger.write((uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog? + return new_msg.size(); +} + +size_t write_encode_data(LoggerdState *s, Message *msg, struct RemoteEncoder &re, const EncoderInfo &encoder_info) { + capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word))); + auto event = cmsg.getRoot(); + return write_encode_data(s, event, re, encoder_info); +} + +int handle_encoder_msg(LoggerdState *s, Message *raw_msg, struct RemoteEncoder &re, const EncoderInfo &encoder_info) { + int bytes_count = 0; + std::unique_ptr msg(raw_msg); // extract the message capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word))); auto event = cmsg.getRoot(); auto edata = (event.*(encoder_info.get_encode_data_func))(); auto idx = edata.getIdx(); - auto flags = idx.getFlags(); // encoderd can have started long before loggerd if (!re.seen_first_packet) { re.seen_first_packet = true; re.encoderd_segment_offset = idx.getSegmentNum(); - LOGD("%s: has encoderd offset %d", name.c_str(), re.encoderd_segment_offset); + LOGD("%s: has encoderd offset %d", encoder_info.publish_name, re.encoderd_segment_offset); } int offset_segment_num = idx.getSegmentNum() - re.encoderd_segment_offset; @@ -96,61 +153,12 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct // we are in this segment now, process any queued messages before this one if (!re.q.empty()) { for (auto &qmsg : re.q) { - bytes_count += handle_encoder_msg(s, qmsg, name, re, encoder_info); + bytes_count += write_encode_data(s, qmsg.get(), re, encoder_info); } re.q.clear(); } } - - // if we aren't recording yet, try to start, since we are in the correct segment - if (!re.recording) { - if (flags & V4L2_BUF_FLAG_KEYFRAME) { - // only create on iframe - if (re.dropped_frames) { - // this should only happen for the first segment, maybe - LOGW("%s: dropped %d non iframe packets before init", name.c_str(), re.dropped_frames); - re.dropped_frames = 0; - } - // if we aren't actually recording, don't create the writer - if (encoder_info.record) { - assert(encoder_info.filename != NULL); - re.writer.reset(new VideoWriter(s->logger.segmentPath().c_str(), - encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C, - edata.getWidth(), edata.getHeight(), encoder_info.fps, idx.getType())); - // write the header - auto header = edata.getHeader(); - re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof()/1000, true, false); - } - re.recording = true; - } else { - // this is a sad case when we aren't recording, but don't have an iframe - // nothing we can do but drop the frame - delete msg; - ++re.dropped_frames; - return bytes_count; - } - } - - // we have to be recording if we are here - assert(re.recording); - - // if we are actually writing the video file, do so - if (re.writer) { - auto data = edata.getData(); - re.writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof()/1000, false, flags & V4L2_BUF_FLAG_KEYFRAME); - } - - // put it in log stream as the idx packet - MessageBuilder bmsg; - auto evt = bmsg.initEvent(event.getValid()); - evt.setLogMonoTime(event.getLogMonoTime()); - (evt.*(encoder_info.set_encode_idx_func))(idx); - auto new_msg = bmsg.toBytes(); - s->logger.write((uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog? - bytes_count += new_msg.size(); - - // free the message, we used it - delete msg; + bytes_count += write_encode_data(s, event, re, encoder_info); } else if (offset_segment_num > s->logger.segment()) { // encoderd packet has a newer segment, this means encoderd has rolled over if (!re.marked_ready_to_rotate) { @@ -158,17 +166,16 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct ++s->ready_to_rotate; LOGD("rotate %d -> %d ready %d/%d for %s", s->logger.segment(), offset_segment_num, - s->ready_to_rotate.load(), s->max_waiting, name.c_str()); + s->ready_to_rotate.load(), s->max_waiting, encoder_info.publish_name); } // queue up all the new segment messages, they go in after the rotate - re.q.push_back(msg); + re.q.emplace_back(std::move(msg)); } else { LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d", - name.c_str(), idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset); + encoder_info.publish_name, idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset); // free the message, it's useless. this should never happen // actually, this can happen if you restart encoderd re.encoderd_segment_offset = -s->logger.segment(); - delete msg; } return bytes_count; @@ -261,7 +268,7 @@ void loggerd_thread() { const bool in_qlog = service.freq != -1 && (service.counter++ % service.freq == 0); if (service.encoder) { s.last_camera_seen_tms = millis_since_boot(); - bytes_count += handle_encoder_msg(&s, msg, service.name, remote_encoders[sock], encoder_infos_dict[service.name]); + bytes_count += handle_encoder_msg(&s, msg, remote_encoders[sock], encoder_infos_dict[service.name]); } else { s.logger.write((uint8_t *)msg->getData(), msg->getSize(), in_qlog); bytes_count += msg->getSize();