Skip to content

Commit

Permalink
feat: implement basic json python function
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed Mar 12, 2024
1 parent c25024f commit d214b48
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 49 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ gst-plugin-version-helper = { version = "0.8" }
gst-video = { package = "gstreamer-video", version = "0.22" }
num-traits = { version = "0.2" }
once-cell = { package = "once_cell", version = "1.19" }
schemars = { version = "0.8" }
serde-json = { package = "serde_json", version = "1.0" }
tokio = { version = "1" }

Expand Down
1 change: 1 addition & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ dash-pipe-provider = { workspace = true }
gst = { workspace = true }
gst-video = { workspace = true }
once-cell = { workspace = true }
schemars = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ pub mod args;
pub mod element;
pub mod net;
pub mod plugin;
pub mod sync;
pub mod value;
35 changes: 16 additions & 19 deletions common/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ use gst::{
info, Buffer, BufferRef, CoreError, ErrorMessage, FlowError, FlowSuccess,
};
use gst_video::gst_base::subclass::base_src::CreateSuccess;
use schemars::JsonSchema;
use tokio::{
join,
sync::{MappedMutexGuard, Mutex, MutexGuard, RwLock, RwLockReadGuard},
sync::{MappedMutexGuard, Mutex, RwLock, RwLockReadGuard},
};

use crate::plugin::{base::ArkSubclass, PluginImpl};
use crate::{
plugin::{base::ArkSubclass, PluginImpl},
sync,
};

pub trait ChannelArgs
where
Expand Down Expand Up @@ -146,6 +150,7 @@ where
}

async fn send_buffer(&self, key: String, buffer: &Buffer) -> Result<FlowSuccess, FlowError> {
// TODO: handle other media types (audio, JSON, plain, ...)
// TODO: support non-image(video) data using sink Caps and cache it
// build a payload
let key_ref = format!("@data:image,{key}");
Expand All @@ -155,6 +160,7 @@ where
);

// build a message
// TODO: handle other media types (audio, JSON, plain, ...)
// TODO: to be implemented
let value = Image::default();
let message = PipeMessage::with_payloads(vec![payload], value);
Expand Down Expand Up @@ -202,15 +208,9 @@ impl Channel {
&self,
imp: &(impl ?Sized + ChannelSubclassExt + PluginImpl),
) -> Result<Option<MappedMutexGuard<'_, self::recv::Queue>>> {
fn unwrap_lock(
lock: MutexGuard<'_, Option<self::recv::Queue>>,
) -> MappedMutexGuard<'_, self::recv::Queue> {
MutexGuard::map(lock, |lock| lock.as_mut().unwrap())
}

let mut lock = self.recv.lock().await;
match lock.as_mut() {
Some(_) => Ok(Some(unwrap_lock(lock))),
Some(_) => Ok(Some(sync::mutex::unwrap_lock(lock))),
None => {
let builder_lock = self.builder.read().await;
match builder_lock.as_ref() {
Expand All @@ -222,7 +222,7 @@ impl Channel {
drop(client_lock);
drop(builder_lock);

Ok(Some(unwrap_lock(lock)))
Ok(Some(sync::mutex::unwrap_lock(lock)))
}
None => Ok(None),
}
Expand All @@ -234,15 +234,9 @@ impl Channel {
&self,
imp: &(impl ?Sized + ChannelSubclassExt + PluginImpl),
) -> Result<Option<RwLockReadGuard<'_, self::send::Queue>>> {
fn unwrap_lock(
lock: RwLockReadGuard<'_, Option<self::send::Queue>>,
) -> RwLockReadGuard<'_, self::send::Queue> {
RwLockReadGuard::map(lock, |lock| lock.as_ref().unwrap())
}

let lock = self.send.read().await;
match lock.as_ref() {
Some(_) => Ok(Some(unwrap_lock(lock))),
Some(_) => Ok(Some(sync::rwlock::unwrap_lock(lock))),
None => {
drop(lock);

Expand All @@ -259,7 +253,7 @@ impl Channel {
drop(lock);

let lock = self.send.read().await;
Ok(Some(unwrap_lock(lock)))
Ok(Some(sync::rwlock::unwrap_lock(lock)))
}
None => Ok(None),
}
Expand Down Expand Up @@ -436,7 +430,10 @@ where
}
}

async fn try_init_client() -> Result<PipeClient<Image>, ErrorMessage> {
pub async fn try_init_client<T>() -> Result<PipeClient<T>, ErrorMessage>
where
T: JsonSchema,
{
// Do not parse arguments from command line,
// only use the environment variables.
let args = PipeClientArgs::try_parse_from::<_, &str>([]).map_err(|error| {
Expand Down
2 changes: 2 additions & 0 deletions common/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod mutex;
pub mod rwlock;
5 changes: 5 additions & 0 deletions common/src/sync/mutex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use tokio::sync::{MappedMutexGuard, MutexGuard};

pub fn unwrap_lock<T>(lock: MutexGuard<'_, Option<T>>) -> MappedMutexGuard<'_, T> {
MutexGuard::map(lock, |lock| lock.as_mut().unwrap())
}
5 changes: 5 additions & 0 deletions common/src/sync/rwlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use tokio::sync::RwLockReadGuard;

pub fn unwrap_lock<T>(lock: RwLockReadGuard<'_, Option<T>>) -> RwLockReadGuard<'_, T> {
RwLockReadGuard::map(lock, |lock| lock.as_ref().unwrap())
}
23 changes: 21 additions & 2 deletions func/arkpy/examples/identity.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,21 @@
def tick(message):
return message
def metadata():
return {
'sink': {
'name': 'application/json',
},
'src': {
'name': 'video/x-raw',
},
}


def tick(messages):
return [
type(messages[0])(
payloads=[],
# payloads=messages[0].payloads,
value={
'value': 'hello world',
},
),
]
11 changes: 9 additions & 2 deletions func/arkpy/src/args.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::PathBuf;

use dash_pipe_function_python_provider::FunctionArgs;
use gsark_common::{
args::Params,
net::ChannelArgs,
Expand Down Expand Up @@ -96,13 +97,12 @@ impl ObjectImpl for crate::plugin::Plugin {
}

impl Args {
pub fn as_params(&self) -> Vec<ParamSpec> {
fn as_params(&self) -> Vec<ParamSpec> {
let mut params = self.common.as_params();
params.push(
ParamSpecString::builder("file")
.nick("Pythonfile")
.blurb("Python script file path")
.default_value(None)
.build(),
);
params.push(
Expand All @@ -114,4 +114,11 @@ impl Args {
);
params
}

pub fn build(&self) -> Option<FunctionArgs> {
Some(FunctionArgs {
python_script: self.file.clone()?,
python_tick_method: self.method.clone(),
})
}
}
Loading

0 comments on commit d214b48

Please sign in to comment.