diff --git a/README.md b/README.md index 51ce1c865..175d98ca9 100644 --- a/README.md +++ b/README.md @@ -24,8 +24,8 @@ ## Usage -Starting ntex v0.5 async runtime must be selected as a feature. Available options are `glommio`, -`tokio` or `async-std`. +ntex supports multiple async runtimes, runtime must be selected as a feature. Available options are `compio`, `tokio`, +`glommio` or `async-std`. ```toml [dependencies] diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index a1cef06b4..02b2b479d 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.7.0] - 2024-10-16 + +* Better handling for h2 remote payload + ## [2.6.0] - 2024-09-25 * Disable default features for rustls diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 8f71e68c8..c8addc44f 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "2.6.0" +version = "2.7.0" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -18,7 +18,7 @@ edition = "2021" rust-version = "1.75" [package.metadata.docs.rs] -features = ["compio", "tokio", "openssl", "rustls", "compress", "cookie", "ws", "brotli", "ntex-tls/rustls-ring"] +features = ["compio", "openssl", "rustls", "compress", "cookie", "ws", "brotli", "ntex-tls/rustls-ring"] [lib] name = "ntex" @@ -69,7 +69,7 @@ ntex-macros = "0.1.3" ntex-util = "2" ntex-bytes = "0.1.27" ntex-server = "2.4" -ntex-h2 = "1.1" +ntex-h2 = "1.2" ntex-rt = "0.4.19" ntex-io = "2.7" ntex-net = "2.4" diff --git a/ntex/src/http/client/h2proto.rs b/ntex/src/http/client/h2proto.rs index 3ef764854..e04d4763a 100644 --- a/ntex/src/http/client/h2proto.rs +++ b/ntex/src/http/client/h2proto.rs @@ -1,14 +1,13 @@ use std::{future::poll_fn, io}; -use ntex_h2::client::{RecvStream, SimpleClient}; -use ntex_h2::{self as h2, frame}; +use ntex_h2::{self as h2, client::RecvStream, client::SimpleClient, frame}; use crate::http::body::{BodySize, MessageBody}; use crate::http::header::{self, HeaderMap, HeaderValue}; use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::{h2::payload, payload::Payload, Method, Version}; use crate::time::{timeout_checked, Millis}; -use crate::util::{ByteString, Bytes}; +use crate::util::{select, ByteString, Bytes, Either}; use super::error::{ConnectError, SendRequestError}; @@ -21,7 +20,12 @@ pub(super) async fn send_request( where B: MessageBody, { - log::trace!("Sending client request: {:?} {:?}", head, body.size()); + log::trace!( + "{}: Sending client request: {:?} {:?}", + client.client.tag(), + head, + body.size() + ); let length = body.size(); let eof = if head.as_ref().method == Method::HEAD { true @@ -82,7 +86,7 @@ where // at the same time let _ = crate::rt::spawn(async move { if let Err(e) = send_body(body, &snd_stream).await { - log::error!("Cannot send body: {:?}", e); + log::error!("{}: Cannot send body: {:?}", snd_stream.tag(), e); snd_stream.reset(frame::Reason::INTERNAL_ERROR); } }); @@ -108,7 +112,8 @@ async fn get_response( eof, } => { log::trace!( - "{:?} got response (eof: {}): {:#?}\nheaders: {:#?}", + "{}: {:?} got response (eof: {}): {:#?}\nheaders: {:#?}", + stream.tag(), stream.id(), eof, pseudo, @@ -122,31 +127,44 @@ async fn get_response( head.version = Version::HTTP_2; let payload = if !eof { - log::debug!("Creating local payload stream for {:?}", stream.id()); + log::debug!( + "{}: Creating local payload stream for {:?}", + stream.tag(), + stream.id() + ); let (mut pl, payload) = payload::Payload::create(stream.empty_capacity()); + let _ = crate::rt::spawn(async move { loop { - let h2::Message { stream, kind } = - match rcv_stream.recv().await { - Some(msg) => msg, - None => { - pl.feed_eof(Bytes::new()); - break; - } - }; + let h2::Message { stream, kind } = match select( + rcv_stream.recv(), + poll_fn(|cx| pl.on_cancel(cx.waker())), + ) + .await + { + Either::Left(Some(msg)) => msg, + Either::Left(None) => { + pl.feed_eof(Bytes::new()); + break; + } + Either::Right(_) => break, + }; + match kind { h2::MessageKind::Data(data, cap) => { - log::debug!( - "Got data chunk for {:?}: {:?}", + log::trace!( + "{}: Got data chunk for {:?}: {:?}", + stream.tag(), stream.id(), data.len() ); pl.feed_data(data, cap); } h2::MessageKind::Eof(item) => { - log::debug!( - "Got payload eof for {:?}: {:?}", + log::trace!( + "{}: Got payload eof for {:?}: {:?}", + stream.tag(), stream.id(), item ); @@ -163,7 +181,11 @@ async fn get_response( } } h2::MessageKind::Disconnect(err) => { - log::debug!("Connection is disconnected {:?}", err); + log::trace!( + "{}: Connection is disconnected {:?}", + stream.tag(), + err + ); pl.set_error( io::Error::new(io::ErrorKind::Other, err) .into(), @@ -207,12 +229,17 @@ async fn send_body( loop { match poll_fn(|cx| body.poll_next_chunk(cx)).await { Some(Ok(b)) => { - log::debug!("{:?} sending chunk, {} bytes", stream.id(), b.len()); + log::trace!( + "{}: {:?} sending chunk, {} bytes", + stream.tag(), + stream.id(), + b.len() + ); stream.send_payload(b, false).await? } Some(Err(e)) => return Err(e.into()), None => { - log::debug!("{:?} eof of send stream ", stream.id()); + log::trace!("{}: {:?} eof of send stream ", stream.tag(), stream.id()); stream.send_payload(Bytes::new(), true).await?; return Ok(()); } diff --git a/ntex/src/http/client/mod.rs b/ntex/src/http/client/mod.rs index cef41c7bf..11437dd1d 100644 --- a/ntex/src/http/client/mod.rs +++ b/ntex/src/http/client/mod.rs @@ -41,8 +41,7 @@ pub use self::response::{ClientResponse, JsonBody, MessageBody}; pub use self::sender::SendClientRequest; pub use self::test::TestResponse; -use crate::http::error::HttpError; -use crate::http::{HeaderMap, Method, RequestHead, Uri}; +use crate::http::{error::HttpError, HeaderMap, Method, RequestHead, Uri}; use crate::time::Millis; use self::connect::{Connect as HttpConnect, ConnectorWrapper}; diff --git a/ntex/src/http/h2/payload.rs b/ntex/src/http/h2/payload.rs index be2a17344..4a4042948 100644 --- a/ntex/src/http/h2/payload.rs +++ b/ntex/src/http/h2/payload.rs @@ -1,6 +1,6 @@ //! Payload stream use std::collections::VecDeque; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, Waker}; use std::{cell::RefCell, future::poll_fn, pin::Pin, rc::Rc, rc::Weak}; use ntex_h2::{self as h2}; @@ -8,6 +8,14 @@ use ntex_h2::{self as h2}; use crate::util::{Bytes, Stream}; use crate::{http::error::PayloadError, task::LocalWaker}; +bitflags::bitflags! { + #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] + struct Flags: u8 { + const EOF = 0b0000_0001; + const DROPPED = 0b0000_0010; + } +} + /// Buffered stream of byte chunks /// /// Payload stores chunks in a vector. First chunk can be received with @@ -54,6 +62,14 @@ impl Payload { } } +impl Drop for Payload { + fn drop(&mut self) { + let mut inner = self.inner.borrow_mut(); + inner.io_task.wake(); + inner.flags.insert(Flags::DROPPED); + } +} + impl Stream for Payload { type Item = Result; @@ -103,11 +119,24 @@ impl PayloadSender { shared.borrow_mut().stream = stream; } } + + pub(crate) fn on_cancel(&self, w: &Waker) -> Poll<()> { + if let Some(shared) = self.inner.upgrade() { + if shared.borrow_mut().flags.contains(Flags::DROPPED) { + Poll::Ready(()) + } else { + shared.borrow_mut().io_task.register(w); + Poll::Pending + } + } else { + Poll::Ready(()) + } + } } #[derive(Debug)] struct Inner { - eof: bool, + flags: Flags, cap: h2::Capacity, err: Option, items: VecDeque, @@ -120,7 +149,7 @@ impl Inner { fn new(cap: h2::Capacity) -> Self { Inner { cap, - eof: false, + flags: Flags::empty(), err: None, stream: None, items: VecDeque::new(), @@ -135,7 +164,7 @@ impl Inner { } fn feed_eof(&mut self, data: Bytes) { - self.eof = true; + self.flags.insert(Flags::EOF); if !data.is_empty() { self.items.push_back(data); } @@ -153,7 +182,7 @@ impl Inner { cx: &mut Context<'_>, ) -> Poll>> { if let Some(data) = self.items.pop_front() { - if !self.eof { + if !self.flags.contains(Flags::EOF) { self.cap.consume(data.len() as u32); if self.cap.size() == 0 { @@ -163,7 +192,7 @@ impl Inner { Poll::Ready(Some(Ok(data))) } else if let Some(err) = self.err.take() { Poll::Ready(Some(Err(err))) - } else if self.eof { + } else if self.flags.contains(Flags::EOF) { Poll::Ready(None) } else { self.task.register(cx.waker());