Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tcp endpoint #82

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/stream/gst/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,15 @@ impl Pipeline {
)))
}
};
Ok(pipeline_payload.to_string())

let mut pipeline_payload = pipeline_payload.to_string();

// We need to add RTP Stream Payloader when using TCP endpoints (https://datatracker.ietf.org/doc/html/rfc4571)
if video_and_stream_information.stream_information.endpoints[0].scheme() == "tcp" {
pipeline_payload = format!("{pipeline_payload} ! rtpstreampay");
}

Ok(pipeline_payload)
}

fn build_pipeline_sink(
Expand Down Expand Up @@ -220,6 +228,11 @@ impl Pipeline {
.join(",");
format!(" ! multiudpsink clients={clients}")
}
"tcp" => {
let host = endpoints[0].host().unwrap();
let port = endpoints[0].port().unwrap();
format!(" ! tcpserversink host={host} port={port}")
}
_ => "".to_string(),
};
Ok(pipeline_sink)
Expand Down
71 changes: 42 additions & 29 deletions src/stream/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::{Arc, Mutex};
struct Stream {
stream_type: StreamType,
video_and_stream_information: VideoAndStreamInformation,
mavlink_camera: MavlinkCameraHandle,
mavlink_camera: Option<MavlinkCameraHandle>,
}

#[derive(Default)]
Expand All @@ -36,6 +36,9 @@ pub fn start() {
StreamType::UDP(stream) => {
stream.start();
}
StreamType::TCP(stream) => {
stream.start();
}
StreamType::RTSP(stream) => {
stream.start();
}
Expand Down Expand Up @@ -75,43 +78,34 @@ pub fn add_stream_and_start(
}
}

let endpoint = video_and_stream_information
.stream_information
.endpoints
.first()
.unwrap() // We have an endpoint since we have passed the point of stream creation
.clone();

let mut stream = stream_backend::new(&video_and_stream_information)?;
let mavtype: mavlink::common::VideoStreamType = match &stream {
StreamType::UDP(_) => mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTPUDP,
StreamType::RTSP(_) => mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTSP,
StreamType::REDIRECT(video_strem_redirect) => match video_strem_redirect.scheme.as_str() {
"rtsp" => mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTSP,
"mpegts" => mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_MPEG_TS_H264,
"tcp" => mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_TCP_MPEG,
"udp" | _ => mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTPUDP,
},
// TODO: update WEBRTC arm with the correct type once mavlink starts to support it.
// Note: For now this is fine because most of the clients doesn't seems to be using mavtype to determine the stream type,
// instead, they're parsing the URI's scheme itself, so as long as we pass a known scheme, it should be enough.
StreamType::WEBRTC(_) => mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTSP,
};

stream.mut_inner().start();
manager.streams.push(Stream {
stream_type: stream,
video_and_stream_information: video_and_stream_information.clone(),
mavlink_camera: MavlinkCameraHandle::new(
let mavlink_camera = get_stream_mavtype(&stream).map(|mavlink_stream_type| {
let endpoint = video_and_stream_information
.stream_information
.endpoints
.first()
.unwrap() // We have an endpoint since we have passed the point of stream creation
.clone();

MavlinkCameraHandle::new(
video_and_stream_information.video_source.clone(),
endpoint,
mavtype,
mavlink_stream_type,
video_and_stream_information
.stream_information
.extended_configuration
.clone()
.unwrap_or_default()
.thermal,
),
)
});

stream.mut_inner().start();
manager.streams.push(Stream {
stream_type: stream,
video_and_stream_information: video_and_stream_information,
mavlink_camera,
});

//TODO: Create function to update settings
Expand Down Expand Up @@ -145,6 +139,25 @@ pub fn remove_stream(stream_name: &str) -> Result<(), SimpleError> {
}
}

fn get_stream_mavtype(stream_type: &StreamType) -> Option<mavlink::common::VideoStreamType> {
match stream_type {
StreamType::UDP(_) => Some(mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTPUDP),
StreamType::TCP(_) => None,
StreamType::RTSP(_) => Some(mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTSP),
StreamType::REDIRECT(video_strem_redirect) => match video_strem_redirect.scheme.as_str() {
"rtsp" => Some(mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTSP),
"mpegts" => Some(mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_MPEG_TS_H264),
"tcpmpeg" => Some(mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_TCP_MPEG),
"udp" => Some(mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTPUDP),
_ => None,
},
// TODO: update WEBRTC arm with the correct type once mavlink starts to support it.
// Note: For now this is fine because most of the clients doesn't seems to be using mavtype to determine the stream type,
// instead, they're parsing the URI's scheme itself, so as long as we pass a known scheme, it should be enough.
StreamType::WEBRTC(_) => None,
}
}

//TODO: rework to use UML definition
// Add a new pipeline string to run
/*
Expand Down
1 change: 1 addition & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod stream_backend;
pub mod types;
pub mod video_stream_redirect;
pub mod video_stream_rtsp;
pub mod video_stream_tcp;
pub mod video_stream_udp;
pub mod video_stream_webrtc;
pub mod webrtc;
162 changes: 79 additions & 83 deletions src/stream/stream_backend.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::types::*;
use super::video_stream_redirect::VideoStreamRedirect;
use super::video_stream_rtsp::VideoStreamRtsp;
use super::video_stream_tcp::VideoStreamTcp;
use super::video_stream_udp::VideoStreamUdp;
use super::video_stream_webrtc::VideoStreamWebRTC;
use super::webrtc::utils::{is_webrtcsink_available, webrtc_usage_hint};
Expand All @@ -21,8 +22,8 @@ pub fn new(
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<StreamType, SimpleError> {
check_endpoints(video_and_stream_information)?;
check_encode(video_and_stream_information)?;
check_scheme(video_and_stream_information)?;
check_encode_support(video_and_stream_information)?;
check_scheme_and_encoding_compatibility(video_and_stream_information)?;
return create_stream(video_and_stream_information);
}

Expand Down Expand Up @@ -53,7 +54,7 @@ fn check_endpoints(
return Ok(());
}

fn check_encode(
fn check_encode_support(
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<(), SimpleError> {
let encode = match &video_and_stream_information
Expand Down Expand Up @@ -81,7 +82,7 @@ fn check_encode(
return Ok(());
}

fn check_scheme(
fn check_scheme_and_encoding_compatibility(
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<(), SimpleError> {
let endpoints = &video_and_stream_information.stream_information.endpoints;
Expand All @@ -96,60 +97,23 @@ fn check_scheme(

if let VideoSourceType::Redirect(_) = video_and_stream_information.video_source {
match scheme {
"udp" | "udp265"| "rtsp" | "mpegts" | "tcp" => scheme.to_string(),
"udp" | "udp265"| "rtsp" | "mpegts" | "tcpmpeg" => scheme.to_string(),
"tcp" => return Err(SimpleError::new(format!("Endpoints with the \"tcp\" scheme are not supported by Mavlink, REDIRECT is meant to advertise an already existing stream using Mavlink protocol, but Mavlink protocol doesn't specify any TCP with RTP. If you meant to use TPC with MPEG, you should use the perhaps you meant \"tcpmpeg\" scheme. Encode: {encode:?}, Endpoints: {endpoints:#?}"))),
_ => return Err(SimpleError::new(format!(
"The URL's scheme for REDIRECT endpoints should be \"udp\", \"udp265\", \"rtsp\", \"mpegts\" or \"tcp\", but was: {:?}",
scheme
"The URL's scheme for REDIRECT endpoints should be \"udp\", \"udp265\", \"rtsp\", \"mpegts\" \"tcpmpeg\", but was: {scheme:?}",
)))
};
} else {
match scheme {
"rtsp" => {
if endpoints.len() > 1 {
return Err(SimpleError::new(format!(
"Multiple RTSP endpoints are not acceptable: {:#?}",
endpoints
)));
}
}
"udp" => {
if VideoEncodeType::H265 == encode {
return Err(SimpleError::new("Endpoint with udp scheme only supports H264, encode type is H265, the scheme should be udp265.".to_string()));
}

//UDP endpoints should contain both host and port
let no_host_or_port = endpoints
.iter()
.any(|endpoint| endpoint.host().is_none() || endpoint.port().is_none());

if no_host_or_port {
return Err(SimpleError::new(format!(
"Endpoint with udp scheme should contain host and port. Endpoints: {:#?}",
endpoints
)));
}
}
"udp" | "tcp" | "rtsp" | "webrtc" | "stun" | "turn" | "ws" => (), // No encoding restrictions for these schemes.
"udp265" => {
if VideoEncodeType::H265 != encode {
return Err(SimpleError::new(format!("Endpoint with udp265 scheme only supports H265 encode. Encode: {:?}, Endpoints: {:#?}", encode, endpoints)));
}
}
"webrtc" | "stun" | "turn" | "ws" => {
let incomplete_endpoint = endpoints.iter().any(|endpoint| {
(endpoint.scheme() != "webrtc")
&& (endpoint.host().is_none() || endpoint.port().is_none())
});

if incomplete_endpoint {
return Err(SimpleError::new(format!(
"Endpoint with 'stun://', 'turn://' and 'ws://' schemes should have a host and port, like \"stun://0.0.0.0:3478\". Endpoints: {endpoints:#?}",
)));
return Err(SimpleError::new(format!("Endpoint with \"udp265\" scheme only supports H265 encode. Encode: {encode:?}, Endpoints: {endpoints:#?}")));
}
}
_ => {
"mpegts" | "tcpmpeg" | _ => {
return Err(SimpleError::new(format!(
"Scheme is not accepted as stream endpoint: {}",
scheme
"Scheme is not accepted as stream endpoint: {scheme}",
)));
}
}
Expand All @@ -158,9 +122,37 @@ fn check_scheme(
return Ok(());
}

fn check_for_multiple_endpoints(endpoints: &Vec<url::Url>) -> Result<(), SimpleError> {
if endpoints.len() > 1 {
let scheme = endpoints[0].scheme().to_uppercase();
return Err(SimpleError::new(format!(
"Multiple {scheme} endpoints are not acceptable: {endpoints:#?}",
)));
}
Ok(())
}

fn check_for_host_and_port(endpoints: &Vec<url::Url>) -> Result<(), SimpleError> {
let no_host_or_port = endpoints
.iter()
.any(|endpoint| endpoint.host().is_none() || endpoint.port().is_none());

if no_host_or_port {
let scheme = endpoints[0].scheme().to_uppercase();
return Err(SimpleError::new(format!(
"Endpoint with {scheme} scheme should contain host and port. Endpoints: {endpoints:#?}",
)));
}
Ok(())
}

fn create_udp_stream(
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<StreamType, SimpleError> {
let endpoints = &video_and_stream_information.stream_information.endpoints;

check_for_host_and_port(endpoints)?;

Ok(StreamType::UDP(VideoStreamUdp::new(
video_and_stream_information,
)?))
Expand All @@ -170,6 +162,9 @@ fn create_rtsp_stream(
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<StreamType, SimpleError> {
let endpoints = &video_and_stream_information.stream_information.endpoints;

check_for_multiple_endpoints(endpoints)?;

let endpoint = &endpoints[0];
if endpoint.scheme() != "rtsp" {
return Err(SimpleError::new(format!(
Expand Down Expand Up @@ -202,6 +197,19 @@ fn create_rtsp_stream(
)?))
}

fn create_tcp_stream(
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<StreamType, SimpleError> {
let endpoints = &video_and_stream_information.stream_information.endpoints;

check_for_host_and_port(endpoints)?;
check_for_multiple_endpoints(endpoints)?;

Ok(StreamType::TCP(VideoStreamTcp::new(
video_and_stream_information,
)?))
}

fn create_redirect_stream(
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<StreamType, SimpleError> {
Expand All @@ -226,46 +234,33 @@ fn create_webrtc_turn_stream(

let endpoints = &video_and_stream_information.stream_information.endpoints;

if endpoints
.iter()
.filter(|endpoint| endpoint.scheme() == "webrtc")
.count()
> 1
{
return Err(SimpleError::new(format!(
"More than one 'webrtc://' scheme was passed. {usage_hint}. The endpoints passed were: {endpoints:#?}",
)));
}
if endpoints
.iter()
.filter(|endpoint| endpoint.scheme() == "stun")
.count()
> 1
{
return Err(SimpleError::new(format!(
"More than one 'stun://' scheme was passed. {usage_hint}. The endpoints passed were: {endpoints:#?}",
)));
}
if endpoints
.iter()
.filter(|endpoint| endpoint.scheme() == "turn")
.count()
> 1
if check_for_host_and_port(
&endpoints
.clone()
.into_iter()
.filter(|endpoint| endpoint.scheme() != "webrtc")
.collect(),
)
.is_err()
{
return Err(SimpleError::new(format!(
"More than one 'turn://' scheme was passed. {usage_hint}. The endpoints passed were: {endpoints:#?}",
"Endpoint with 'stun://', 'turn://' and 'ws://' schemes should have a host and port. {usage_hint}. The endpoints passed were: {endpoints:#?}",
)));
}
if endpoints
.iter()
.filter(|endpoint| endpoint.scheme() == "ws")
.count()
> 1
{
return Err(SimpleError::new(format!(
"More than one 'ws://' scheme was passed. {usage_hint}. The endpoints passed were: {endpoints:#?}",
)));

for scheme in vec!["webrtc", "stun", "turn", "ws"] {
if endpoints
.iter()
.filter(|endpoint| endpoint.scheme() == scheme)
.count()
> 1
{
return Err(SimpleError::new(format!(
"More than one 'webrtc://' {scheme} was passed. {usage_hint}. The endpoints passed were: {endpoints:#?}",
)));
}
}

if endpoints
.iter()
.any(|endpoint| endpoint.scheme() == "webrtc")
Expand Down Expand Up @@ -296,6 +291,7 @@ fn create_stream(
.unwrap();
match endpoint.scheme() {
"udp" => create_udp_stream(video_and_stream_information),
"tcp" => create_tcp_stream(video_and_stream_information),
"rtsp" => create_rtsp_stream(video_and_stream_information),
"webrtc" | "stun" | "turn" | "ws" => {
create_webrtc_turn_stream(video_and_stream_information)
Expand Down
Loading