Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor pipeline call impl #219

Merged
merged 2 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
fail-fast: false
matrix:
version:
- 1.66.0 # MSRV
- 1.67.0 # MSRV
- stable
- nightly

Expand Down
4 changes: 2 additions & 2 deletions ntex-async-std/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl Future for WriteTask {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().get_mut();
let this = self.as_mut().get_mut();

match this.st {
IoWriteState::Processing(ref mut delay) => {
Expand Down Expand Up @@ -432,7 +432,7 @@ mod unixstream {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().get_mut();
let this = self.as_mut().get_mut();

match this.st {
IoWriteState::Processing(ref mut delay) => {
Expand Down
4 changes: 4 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.3.2] - 2023-08-10

* Replace `PipelineCall` with `ServiceCall<'static, S, R>`

## [0.3.1] - 2023-06-23

* `PipelineCall` is static
Expand Down
4 changes: 2 additions & 2 deletions ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.3.1"
version = "0.3.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -19,7 +19,7 @@ path = "src/lib.rs"
ntex-codec = "0.6.2"
ntex-bytes = "0.1.19"
ntex-util = "0.3.0"
ntex-service = "1.2.1"
ntex-service = "1.2.3"

bitflags = "1.3"
log = "0.4"
Expand Down
6 changes: 2 additions & 4 deletions ntex-io/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,8 @@ where
// call service
let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1);
let fut = shared.service.call(item);
spawn(async move {
let result = fut.await;
let result = shared.service.call(item).await;
shared.handle_result(result, &shared.io);
});
}
Expand All @@ -276,9 +275,8 @@ where
// call service
let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1);
let fut = shared.service.call(item);
spawn(async move {
let result = fut.await;
let result = shared.service.call(item).await;
shared.handle_result(result, &shared.io);
});
}
Expand Down
2 changes: 1 addition & 1 deletion ntex-io/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ impl Future for WriteTask {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().get_mut();
let this = self.as_mut().get_mut();

match this.st {
IoWriteState::Processing(ref mut delay) => {
Expand Down
4 changes: 4 additions & 0 deletions ntex-service/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.2.3] - 2023-08-10

* Check readiness for pipeline calls

## [1.2.2] - 2023-06-24

* Added `ServiceCall::advance_to_call`
Expand Down
2 changes: 1 addition & 1 deletion ntex-service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "1.2.2"
version = "1.2.3"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]
Expand Down
3 changes: 1 addition & 2 deletions ntex-service/src/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<S> ApplyService<S> {
where
S: Service<R>,
{
self.service.service_call(req)
self.service.call(req)
}
}

Expand Down Expand Up @@ -85,7 +85,6 @@ where
type Error = Err;
type Future<'f> = R where Self: 'f, In: 'f, R: 'f;

crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);

#[inline]
Expand Down
104 changes: 65 additions & 39 deletions ntex-service/src/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{cell::UnsafeCell, future::Future, marker, pin::Pin, rc::Rc, task};

use crate::Service;
use crate::{Pipeline, Service};

pub struct ServiceCtx<'a, S: ?Sized> {
idx: usize,
Expand Down Expand Up @@ -82,7 +82,7 @@ impl Drop for Waiters {
}
}

impl<'a, S: ?Sized> ServiceCtx<'a, S> {
impl<'a, S> ServiceCtx<'a, S> {
pub(crate) fn new(waiters: &'a Waiters) -> Self {
Self {
idx: waiters.index,
Expand All @@ -107,7 +107,7 @@ impl<'a, S: ?Sized> ServiceCtx<'a, S> {
/// Wait for service readiness and then call service
pub fn call<T, R>(&self, svc: &'a T, req: R) -> ServiceCall<'a, T, R>
where
T: Service<R> + ?Sized,
T: Service<R>,
R: 'a,
{
ServiceCall {
Expand All @@ -125,7 +125,7 @@ impl<'a, S: ?Sized> ServiceCtx<'a, S> {
/// Call service, do not check service readiness
pub fn call_nowait<T, R>(&self, svc: &'a T, req: R) -> T::Future<'a>
where
T: Service<R> + ?Sized,
T: Service<R>,
R: 'a,
{
svc.call(
Expand All @@ -139,9 +139,9 @@ impl<'a, S: ?Sized> ServiceCtx<'a, S> {
}
}

impl<'a, S: ?Sized> Copy for ServiceCtx<'a, S> {}
impl<'a, S> Copy for ServiceCtx<'a, S> {}

impl<'a, S: ?Sized> Clone for ServiceCtx<'a, S> {
impl<'a, S> Clone for ServiceCtx<'a, S> {
#[inline]
fn clone(&self) -> Self {
Self {
Expand All @@ -157,57 +157,65 @@ pin_project_lite::pin_project! {
pub struct ServiceCall<'a, S, Req>
where
S: Service<Req>,
S: 'a,
S: ?Sized,
Req: 'a,
{
#[pin]
state: ServiceCallState<'a, S, Req>,
}
}

impl<'a, S, Req> ServiceCall<'a, S, Req>
where
S: Service<Req>,
S: 'a,
S: ?Sized,
Req: 'a,
{
pub fn advance_to_call(self) -> ServiceCallToCall<'a, S, Req> {
match self.state {
ServiceCallState::Ready { .. } => {}
ServiceCallState::Call { .. } | ServiceCallState::Empty => {
panic!(
"`ServiceCall::advance_to_call` must be called before `ServiceCall::poll`"
)
}
}
ServiceCallToCall { state: self.state }
}
}

pin_project_lite::pin_project! {
#[project = ServiceCallStateProject]
enum ServiceCallState<'a, S, Req>
where
S: Service<Req>,
S: 'a,
S: ?Sized,
Req: 'a,
{
Ready { req: Option<Req>,
svc: &'a S,
idx: usize,
waiters: &'a WaitersRef,
},
ReadyPl { req: Option<Req>,
svc: &'a Pipeline<S>,
pl: Pipeline<S>,
},
Call { #[pin] fut: S::Future<'a> },
Empty,
}
}

impl<'a, S, Req> ServiceCall<'a, S, Req>
where
S: Service<Req>,
Req: 'a,
{
pub(crate) fn call_pipeline(req: Req, svc: &'a Pipeline<S>) -> Self {
ServiceCall {
state: ServiceCallState::ReadyPl {
req: Some(req),
pl: svc.clone(),
svc,
},
}
}

pub fn advance_to_call(self) -> ServiceCallToCall<'a, S, Req> {
match self.state {
ServiceCallState::Ready { .. } | ServiceCallState::ReadyPl { .. } => {}
ServiceCallState::Call { .. } | ServiceCallState::Empty => {
panic!(
"`ServiceCall::advance_to_call` must be called before `ServiceCall::poll`"
)
}
}
ServiceCallToCall { state: self.state }
}
}

impl<'a, S, Req> Future for ServiceCall<'a, S, Req>
where
S: Service<Req> + ?Sized,
S: Service<Req>,
{
type Output = Result<S::Response, S::Error>;

Expand Down Expand Up @@ -243,7 +251,21 @@ where
task::Poll::Pending
}
},
ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| {
ServiceCallStateProject::ReadyPl { req, svc, pl } => {
task::ready!(pl.poll_ready(cx))?;

let ctx = ServiceCtx::new(&svc.waiters);
let svc_call = svc.get_ref().call(req.take().unwrap(), ctx);

// SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc`
// Pipeline::svc is heap allocated(Rc<S>), we keep it alive until
// `svc_call` get resolved to result
let fut = unsafe { std::mem::transmute(svc_call) };

this.state.set(ServiceCallState::Call { fut });
self.poll(cx)
}
ServiceCallStateProject::Call { fut, .. } => fut.poll(cx).map(|r| {
this.state.set(ServiceCallState::Empty);
r
}),
Expand All @@ -259,8 +281,6 @@ pin_project_lite::pin_project! {
pub struct ServiceCallToCall<'a, S, Req>
where
S: Service<Req>,
S: 'a,
S: ?Sized,
Req: 'a,
{
#[pin]
Expand All @@ -270,7 +290,7 @@ pin_project_lite::pin_project! {

impl<'a, S, Req> Future for ServiceCallToCall<'a, S, Req>
where
S: Service<Req> + ?Sized,
S: Service<Req>,
{
type Output = Result<S::Future<'a>, S::Error>;

Expand Down Expand Up @@ -306,6 +326,12 @@ where
task::Poll::Pending
}
},
ServiceCallStateProject::ReadyPl { req, svc, pl } => {
task::ready!(pl.poll_ready(cx))?;

let ctx = ServiceCtx::new(&svc.waiters);
task::Poll::Ready(Ok(svc.get_ref().call(req.take().unwrap(), ctx)))
}
ServiceCallStateProject::Call { .. } => {
unreachable!("`ServiceCallToCall` can only be constructed in `Ready` state")
}
Expand Down Expand Up @@ -387,13 +413,13 @@ mod tests {
let data1 = data.clone();
ntex::rt::spawn(async move {
let _ = poll_fn(|cx| srv1.poll_ready(cx)).await;
let i = srv1.call("srv1").await.unwrap();
let i = srv1.call_nowait("srv1").await.unwrap();
data1.borrow_mut().push(i);
});

let data2 = data.clone();
ntex::rt::spawn(async move {
let i = srv2.service_call("srv2").await.unwrap();
let i = srv2.call_static("srv2").await.unwrap();
data2.borrow_mut().push(i);
});
time::sleep(time::Millis(50)).await;
Expand All @@ -417,7 +443,7 @@ mod tests {
let con = condition::Condition::new();
let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));

let mut fut = srv.service_call("test").advance_to_call();
let mut fut = srv.call("test").advance_to_call();
let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
con.notify();

Expand All @@ -432,7 +458,7 @@ mod tests {
let con = condition::Condition::new();
let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));

let mut fut = srv.service_call("test");
let mut fut = srv.call("test");
let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
con.notify();

Expand Down
2 changes: 1 addition & 1 deletion ntex-service/src/fn_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ mod tests {

let pipe = Pipeline::new(chain(srv).and_then(on_shutdown).clone());

let res = pipe.service_call(()).await;
let res = pipe.call(()).await;
assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());
assert_eq!(res.unwrap(), "pipe");
Expand Down
4 changes: 2 additions & 2 deletions ntex-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ pub trait ServiceFactory<Req, Cfg = ()> {

impl<'a, S, Req> Service<Req> for &'a S
where
S: Service<Req> + ?Sized,
S: Service<Req>,
{
type Response = S::Response;
type Error = S::Error;
Expand All @@ -285,7 +285,7 @@ where

impl<S, Req> Service<Req> for Box<S>
where
S: Service<Req> + ?Sized,
S: Service<Req>,
{
type Response = S::Response;
type Error = S::Error;
Expand Down
Loading
Loading