
Commit

WIP
joaoantoniocardoso committed Jul 22, 2024
1 parent eb6e2b0 commit dff78b2
Showing 13 changed files with 130 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/mavlink/mavlink_camera.rs
@@ -82,7 +82,7 @@ impl MavlinkCameraInner {

let mavlink_stream_type = match video_stream_uri.scheme() {
"rtsp" => mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTSP,
"udp" => mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTPUDP,
"udp" | "udp265" => mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTPUDP,
unsupported => {
return Err(anyhow!(
"Scheme {unsupported:#?} is not supported for a Mavlink Camera."
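
The hunk above extends the MAVLink camera's URI-scheme mapping so that a `udp265` stream is advertised with the same `VIDEO_STREAM_TYPE_RTPUDP` type as plain `udp`, since both carry RTP over UDP and only the payload codec differs. A minimal, self-contained sketch of that mapping, using a stand-in enum instead of the `mavlink` crate's `VideoStreamType`:

```rust
// Stand-in enum for illustration only; the real code uses
// mavlink::common::VideoStreamType.
#[derive(Debug, PartialEq)]
enum StreamType {
    Rtsp,
    RtpUdp,
}

// Maps a stream URI scheme to the advertised MAVLink stream type.
// "udp265" is treated exactly like "udp".
fn stream_type_for_scheme(scheme: &str) -> Option<StreamType> {
    match scheme {
        "rtsp" => Some(StreamType::Rtsp),
        "udp" | "udp265" => Some(StreamType::RtpUdp),
        _ => None,
    }
}

fn main() {
    assert_eq!(stream_type_for_scheme("udp265"), Some(StreamType::RtpUdp));
    assert_eq!(stream_type_for_scheme("ftp"), None);
}
```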
5 changes: 2 additions & 3 deletions src/server/manager.rs
@@ -6,7 +6,7 @@ use actix_extensible_rate_limit::{
RateLimiter,
};
use actix_service::Service;
use actix_web::{error::JsonPayloadError, http::header, App, HttpRequest, HttpServer};
use actix_web::{error::JsonPayloadError, App, HttpRequest, HttpServer};
use paperclip::{
actix::{web, OpenApiExt},
v2::models::{Api, Info},
@@ -29,8 +29,7 @@ pub async fn run(server_address: &str) -> Result<(), std::io::Error> {
// Add debug call for API access
.wrap_fn(|req, srv| {
trace!("{:#?}", &req);
let fut = srv.call(req);
async { fut.await }
srv.call(req)
})
.wrap(
Cors::default()
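
The middleware change above drops the intermediate `async` block: `srv.call(req)` already returns a future, so re-wrapping it in `async { fut.await }` only adds an extra layer with identical behavior. A small sketch of that equivalence with plain `std` futures, no actix types involved:

```rust
use std::future::Future;

// Stand-in for `srv.call(req)`: any function returning a future.
fn call() -> impl Future<Output = u32> {
    async { 42 }
}

// Old shape: move the future into a new async block and await it there.
fn wrapped() -> impl Future<Output = u32> {
    let fut = call();
    async { fut.await }
}

// New shape: return the inner future directly; same output, one less wrapper.
fn direct() -> impl Future<Output = u32> {
    call()
}

fn main() {
    // Both resolve to the same value when driven by an executor,
    // e.g. futures::executor::block_on(wrapped()) == block_on(direct()).
    let _ = (wrapped(), direct());
}
```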
4 changes: 2 additions & 2 deletions src/stream/mod.rs
@@ -404,9 +404,9 @@ fn validate_endpoints(video_and_stream_information: &VideoAndStreamInformation)
VideoSourceType::Redirect(_)
) {
match scheme {
"udp" | "rtsp" => return None,
"udp" | "udp265" | "rtsp" => return None,
_ => return Some(anyhow!(
"The URL's scheme for REDIRECT endpoints should be \"udp\" or \"rtsp\", but was: {scheme:?}",
"The URL's scheme for REDIRECT endpoints should be \"udp\" or \"udp265\" or \"rtsp\", but was: {scheme:?}",
))
};
}
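
With the hunk above, REDIRECT endpoints now accept `udp265` alongside `udp` and `rtsp`. A minimal usage sketch of that validation, assuming the `url` crate and a made-up address:

```rust
use url::Url;

// Returns an error message for unsupported REDIRECT schemes, mirroring the
// validation above (the exact wording in the repo may differ).
fn redirect_scheme_error(url: &Url) -> Option<String> {
    match url.scheme() {
        "udp" | "udp265" | "rtsp" => None,
        scheme => Some(format!(
            "The URL's scheme for REDIRECT endpoints should be \"udp\", \"udp265\" or \"rtsp\", but was: {scheme:?}"
        )),
    }
}

fn main() {
    let ok = Url::parse("udp265://192.168.2.1:5600").unwrap();
    let bad = Url::parse("ftp://192.168.2.1:21").unwrap();
    assert!(redirect_scheme_error(&ok).is_none());
    assert!(redirect_scheme_error(&bad).is_some());
}
```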
23 changes: 23 additions & 0 deletions src/stream/pipeline/fake_pipeline.rs
@@ -91,6 +91,29 @@ impl FakePipeline {
rtp_tee_name = rtp_tee_name,
)
}
VideoEncodeType::H265 => {
format!(concat!(
"videotestsrc pattern={pattern} is-live=true do-timestamp=true",
" ! timeoverlay",
" ! video/x-raw,format=I420",
" ! x265enc tune=zerolatency speed-preset=ultrafast bitrate=5000",
" ! h265parse",
" ! capsfilter name={filter_name} caps=video/x-h265,profile={profile},stream-format=byte-stream,alignment=au,width={width},height={height},framerate={interval_denominator}/{interval_numerator}",
" ! tee name={video_tee_name} allow-not-linked=true",
" ! rtph265pay aggregate-mode=zero-latency config-interval=10 pt=96",
" ! tee name={rtp_tee_name} allow-not-linked=true"
),
pattern = pattern,
profile = "main",
width = configuration.width,
height = configuration.height,
interval_denominator = configuration.frame_interval.denominator,
interval_numerator = configuration.frame_interval.numerator,
filter_name = filter_name,
video_tee_name = video_tee_name,
rtp_tee_name = rtp_tee_name,
)
}
VideoEncodeType::Yuyv => {
format!(
concat!(
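
The new `VideoEncodeType::H265` branch mirrors the H.264 fake pipeline, swapping in `x265enc`, `h265parse`, and `rtph265pay`. As a rough standalone check that such a description runs at all, the same `gstreamer` crate already used by this repo (see `gst::parse::launch` in `udp_sink.rs` below) can parse a simplified version of it; the caps values and the two-second runtime here are arbitrary:

```rust
use gstreamer as gst;
use gst::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    gst::init()?;
    // Simplified test string: no named tees or capsfilters, just source -> encode -> pay.
    let description = concat!(
        "videotestsrc pattern=ball is-live=true do-timestamp=true",
        " ! video/x-raw,format=I420,width=640,height=480,framerate=30/1",
        " ! x265enc tune=zerolatency speed-preset=ultrafast bitrate=5000",
        " ! h265parse",
        " ! rtph265pay aggregate-mode=zero-latency config-interval=10 pt=96",
        " ! fakesink"
    );
    let pipeline = gst::parse::launch(description)?;
    pipeline.set_state(gst::State::Playing)?;
    std::thread::sleep(std::time::Duration::from_secs(2));
    pipeline.set_state(gst::State::Null)?;
    Ok(())
}
```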
2 changes: 1 addition & 1 deletion src/stream/pipeline/redirect_pipeline.rs
@@ -71,7 +71,7 @@ impl RedirectPipeline {
sink_tee_name = sink_tee_name,
)
}
"udp" => {
"udp" | "udp265" => {
format!(
concat!(
"udpsrc address={address} port={port} close-socket=false auto-multicast=true",
21 changes: 21 additions & 0 deletions src/stream/pipeline/v4l_pipeline.rs
@@ -59,6 +59,7 @@ impl V4lPipeline {
"v4l2src device={device} do-timestamp=true",
" ! h264parse", // Here we need the parse to help the stream-format and alignment part, which is being fixed here because avc/au seems to reduce the CPU usage in the RTP payloading part.
" ! capsfilter name={filter_name} caps=video/x-h264,stream-format=avc,alignment=au,width={width},height={height},framerate={interval_denominator}/{interval_numerator}",
// " ! capsfilter name={filter_name} caps=video/x-h264,stream-format=byte-stream,alignment=au,width={width},height={height},framerate={interval_denominator}/{interval_numerator}",
" ! tee name={video_tee_name} allow-not-linked=true",
" ! rtph264pay aggregate-mode=zero-latency config-interval=10 pt=96",
" ! tee name={rtp_tee_name} allow-not-linked=true"
@@ -73,6 +74,26 @@
rtp_tee_name = rtp_tee_name,
)
}
VideoEncodeType::H265 => {
format!(
concat!(
"v4l2src device={device} do-timestamp=false",
" ! h265parse", // Here we need the parse to help the stream-format and alignment part, which is being fixed here because avc/au seems to reduce the CPU usage in the RTP payloading part.
" ! capsfilter name={filter_name} caps=video/x-h265,stream-format=byte-stream,alignment=au,width={width},height={height},framerate={interval_denominator}/{interval_numerator}",
" ! tee name={video_tee_name} allow-not-linked=true",
" ! rtph265pay aggregate-mode=zero-latency config-interval=10 pt=96",
" ! tee name={rtp_tee_name} allow-not-linked=true"
),
device = device,
width = width,
height = height,
interval_denominator = interval_denominator,
interval_numerator = interval_numerator,
filter_name = filter_name,
video_tee_name = video_tee_name,
rtp_tee_name = rtp_tee_name,
)
}
VideoEncodeType::Yuyv => {
format!(
concat!(
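
The V4L2 H.265 branch above assumes the camera itself can produce `video/x-h265`. A rough way to confirm that from the same `gstreamer` crate, using the element-builder API already seen in `image_sink.rs` below (the device path is hypothetical):

```rust
use gstreamer as gst;
use gst::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    gst::init()?;
    let src = gst::ElementFactory::make("v4l2src")
        .property("device", "/dev/video0") // hypothetical device path
        .build()?;
    // v4l2src opens the device on the NULL -> READY transition, so the src pad
    // caps reflect the real hardware formats once the element is READY.
    src.set_state(gst::State::Ready)?;
    let caps = src
        .static_pad("src")
        .expect("v4l2src always has a src pad")
        .query_caps(None);
    let has_h265 = caps.iter().any(|s| s.name().starts_with("video/x-h265"));
    println!("Device advertises H.265: {has_h265}");
    src.set_state(gst::State::Null)?;
    Ok(())
}
```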
13 changes: 13 additions & 0 deletions src/stream/rtsp/rtsp_server.rs
@@ -175,6 +175,19 @@ impl RTSPServer {
rtp_caps = rtp_caps,
)
}
"H265" => {
format!(
concat!(
"shmsrc socket-path={socket_path} do-timestamp=true",
" ! queue leaky=downstream flush-on-eos=true max-size-buffers=0",
" ! capsfilter caps={rtp_caps:?}",
" ! rtph265depay",
" ! rtph265pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96",
),
socket_path = socket_path,
rtp_caps = rtp_caps,
)
}
"RAW" => {
format!(
concat!(
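
The "H265" branch above depayloads and immediately re-payloads the RTP stream for the RTSP media factory, reusing the `rtp_caps` computed elsewhere in this file. Purely for illustration, and with field values assumed rather than read from this repo, H.265 RTP caps built programmatically look like:

```rust
use gstreamer as gst;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    gst::init()?;
    // Typical RTP caps for an H.265 payload; the fields the server actually
    // uses are defined elsewhere in the codebase.
    let rtp_caps = gst::Caps::builder("application/x-rtp")
        .field("media", "video")
        .field("clock-rate", 90_000i32)
        .field("encoding-name", "H265")
        .field("payload", 96i32)
        .build();
    println!("{rtp_caps}");
    Ok(())
}
```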
18 changes: 18 additions & 0 deletions src/stream/sink/image_sink.rs
@@ -357,6 +357,24 @@ impl ImageSink {
_transcoding_elements.push(filter);
_transcoding_elements.push(decoder);
}
VideoEncodeType::H265 => {
let depayloader = gst::ElementFactory::make("rtph265depay").build()?;
let parser = gst::ElementFactory::make("h265parse").build()?;
// For H.265, we need to filter out unwanted non-key frames here, before decoding.
let filter = gst::ElementFactory::make("identity")
.property("drop-buffer-flags", gst::BufferFlags::DELTA_UNIT)
.property("sync", false)
.build()?;
let decoder = gst::ElementFactory::make("avdec_h265")
.property("discard-corrupted-frames", true)
.property_from_str("std-compliance", "normal")
.property_from_str("lowres", "2") // (0) is 'full'; (1) is '1/2-size'; (2) is '1/4-size'
.build()?;
_transcoding_elements.push(depayloader);
_transcoding_elements.push(parser);
_transcoding_elements.push(filter);
_transcoding_elements.push(decoder);
}
VideoEncodeType::Mjpg => {
let decoder = gst::ElementFactory::make("jpegdec").build()?;
decoder.has_property("discard-corrupted-frames", None).then(|| decoder.set_property("discard-corrupted-frames", true));
7 changes: 4 additions & 3 deletions src/stream/sink/udp_sink.rs
@@ -322,7 +322,7 @@ impl UdpSink {
let clients = addresses
.iter()
.filter_map(|address| {
if address.scheme() != "udp" {
if !matches!(address.scheme(), "udp" | "udp265") {
return None;
}
if let (Some(host), Some(port)) = (address.host(), address.port()) {
@@ -334,8 +334,9 @@
.collect::<Vec<String>>()
.join(",");
let description = format!("multiudpsink sync=false clients={clients}");
let _udpsink =
gst::parse::launch(&description).context("Failed parsing pipeline description")?;
let _udpsink = gst::parse::launch(&description).context(format!(
"Failed parsing pipeline description: {description:?}"
))?;

let udpsink_sink_pad = _udpsink
.static_pad("sink")
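
The change above attaches the failing pipeline description to the error, which makes `no element ...` style failures much easier to diagnose. A standalone sketch of the same pattern with `anyhow`; `with_context` is a closely related alternative that only builds the message on the error path (the failure string here is made up):

```rust
use anyhow::{anyhow, Context, Result};

fn parse_pipeline(description: &str) -> Result<()> {
    // Stand-in for `gst::parse::launch(&description)` failing.
    Err(anyhow!("no element \"multiudpsink\""))
        .with_context(|| format!("Failed parsing pipeline description: {description:?}"))
}

fn main() {
    let err = parse_pipeline("multiudpsink sync=false clients=192.168.2.1:5600").unwrap_err();
    // The alternate format prints the context followed by the underlying cause.
    println!("{err:#}");
}
```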
43 changes: 37 additions & 6 deletions src/stream/webrtc/frontend/src/session.ts
@@ -1,6 +1,6 @@
import type { Stream } from "@/signalling_protocol";
import type { Stream } from "@/signalling_protocol"

import type { Signaller } from "@/signaller";
import type { Signaller } from "@/signaller"

type on_close_callback = (session_id: string, reason: string) => void;

@@ -117,6 +117,7 @@ export class Session {
}

public onIncomingSDP(description: RTCSessionDescription): void {
console.log("onIncomingSDP: ", description);
this.peer_connection
.setRemoteDescription(description)
.then(() => {
@@ -133,6 +134,7 @@ }
}

private onRemoteDescriptionSet(): void {
console.log("onRemoteDescriptionSet: ");
this.peer_connection
.createAnswer()
.then((description: RTCSessionDescriptionInit) => {
@@ -151,7 +153,7 @@
.setLocalDescription(description)
.then(() => {
console.debug(
`Local description set as${JSON.stringify(description, null, 4)}`
`Local description set as ${JSON.stringify(description, null, 4)}`
);
this.onLocalDescriptionSet();
})
@@ -161,6 +163,10 @@
}

private onLocalDescriptionSet(): void {
console.log(
"onLocalDescriptionSet:",
this.peer_connection.localDescription
);
if (this.peer_connection.localDescription === null) {
return;
}
@@ -208,6 +214,7 @@ }
}

private onIceCandidate(event: RTCPeerConnectionIceEventInit): void {
console.log("onIceCandidate", event);
if (!event.candidate) {
// TODO: Add support for empty candidate, meaning ICE Gathering Completed.
return;
@@ -222,6 +229,8 @@
}

private onTrackAdded(event: RTCTrackEvent): void {
console.log("onTrackAdded", event);

let id: number | undefined = undefined;
id = window.setInterval(() => {
if (this.signaller.ws.readyState !== this.signaller.ws.OPEN) {
@@ -270,15 +279,27 @@

const [remoteStream] = event.streams;
this.media_element.srcObject = remoteStream;
this.media_element.play();

this.updateStatus("Playing");
const playPromise = this.media_element.play();

if (playPromise !== undefined) {
playPromise
.then((_) => {
this.updateStatus("Playing");
})
.catch((error) => {
console.error("Failed while trying to play:", error);
});
}

clearInterval(id);
}, 1000);
}

private onIceConnectionStateChange(): void {
console.log(
"onIceConnectionStateChange",
this.peer_connection.iceConnectionState
);
switch (this.peer_connection.iceConnectionState) {
case "closed":
case "failed":
@@ -289,6 +310,10 @@ }
}

private onConnectionStateChange(): void {
console.log(
"onConnectionStateChange",
this.peer_connection.connectionState
);
switch (this.peer_connection.connectionState) {
case "closed":
case "failed":
@@ -299,6 +324,7 @@ }
}

private onSignalingStateChange(): void {
console.log("onSignalingStateChange", this.peer_connection.signalingState);
switch (this.peer_connection.signalingState) {
case "closed":
this.end();
@@ -307,6 +333,10 @@ }
}

private onIceGatheringStateChange(): void {
console.log(
"onIceGatheringStateChange",
this.peer_connection.iceGatheringState
);
switch (this.peer_connection.iceGatheringState) {
case "complete":
console.debug(`ICE gathering completed for session ${this.id}`);
@@ -315,6 +345,7 @@ }
}

public end() {
console.log("end called. peer_connection: ", this.peer_connection);
this.peer_connection.close();

this.peer_connection.removeEventListener(
6 changes: 3 additions & 3 deletions src/stream/webrtc/signalling_server.rs
@@ -291,9 +291,9 @@ impl SignallingServer {
let (height, width, encode, interval) =
match &stream.video_and_stream.stream_information.configuration {
crate::stream::types::CaptureConfiguration::Video(configuration) => {
// Filter out non-H264 local streams
if configuration.encode != crate::video::types::VideoEncodeType::H264 {
trace!("Stream {:?} will not be listed in available streams because it's encoding isn't H264 (it's {:?} instead)", stream.video_and_stream.name, configuration.encode);
// Filter out non-H264/H265 local streams
if !matches!(configuration.encode, crate::video::types::VideoEncodeType::H264 | crate::video::types::VideoEncodeType::H265) {
trace!("Stream {:?} will not be listed in available streams because its encoding isn't H264 or H265 (it's {:?} instead)", stream.video_and_stream.name, configuration.encode);
return None;
}
(
5 changes: 5 additions & 0 deletions src/video/video_source_gst.rs
@@ -47,6 +47,7 @@ impl VideoSource for VideoSourceGst {
.collect();

let sizes: Vec<Size> = [
(160, 120),
(320, 240),
(640, 480),
(720, 480),
@@ -69,6 +70,10 @@
encode: VideoEncodeType::H264,
sizes: sizes.clone(),
},
Format {
encode: VideoEncodeType::H265,
sizes: sizes.clone(),
},
Format {
encode: VideoEncodeType::Yuyv,
sizes: sizes.clone(),
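
Since `video_source_gst.rs` now advertises an H265 format (and a 160x120 size) for the GStreamer test source, the H.265 encoder, parser, payloaders, and decoder plugins need to be installed at runtime. A rough availability check with the `gstreamer` crate, using the element names that appear in this commit's pipelines:

```rust
use gstreamer as gst;

// Elements used by the new H.265 branches in this commit.
const H265_ELEMENTS: [&str; 5] = [
    "x265enc",
    "h265parse",
    "rtph265pay",
    "rtph265depay",
    "avdec_h265",
];

fn h265_support_available() -> bool {
    H265_ELEMENTS
        .iter()
        .all(|name| gst::ElementFactory::find(name).is_some())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    gst::init()?;
    println!("H.265 GStreamer elements available: {}", h265_support_available());
    Ok(())
}
```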
3 changes: 0 additions & 3 deletions src/video/video_source_local.rs
@@ -16,9 +16,6 @@ use tracing::*;

use lazy_static::lazy_static;

lazy_static! {
static ref H264_PROFILES: Arc<Mutex<HashMap<String, String>>> = Default::default();
}
lazy_static! {
static ref VIDEO_FORMATS: Arc<Mutex<HashMap<String, Vec<Format>>>> = Default::default();
}
