Skip to content

Commit

Permalink
Better handling for h2 remote payload (#439)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Oct 16, 2024
1 parent 4f7d951 commit dedb7de
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 35 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

## Usage

Starting ntex v0.5 async runtime must be selected as a feature. Available options are `glommio`,
`tokio` or `async-std`.
ntex supports multiple async runtimes, runtime must be selected as a feature. Available options are `compio`, `tokio`,
`glommio` or `async-std`.

```toml
[dependencies]
Expand Down
4 changes: 4 additions & 0 deletions ntex/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.7.0] - 2024-10-16

* Better handling for h2 remote payload

## [2.6.0] - 2024-09-25

* Disable default features for rustls
Expand Down
6 changes: 3 additions & 3 deletions ntex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "2.6.0"
version = "2.7.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
Expand All @@ -18,7 +18,7 @@ edition = "2021"
rust-version = "1.75"

[package.metadata.docs.rs]
features = ["compio", "tokio", "openssl", "rustls", "compress", "cookie", "ws", "brotli", "ntex-tls/rustls-ring"]
features = ["tokio", "openssl", "rustls", "compress", "cookie", "ws", "brotli", "ntex-tls/rustls-ring"]

[lib]
name = "ntex"
Expand Down Expand Up @@ -69,7 +69,7 @@ ntex-macros = "0.1.3"
ntex-util = "2"
ntex-bytes = "0.1.27"
ntex-server = "2.4"
ntex-h2 = "1.1"
ntex-h2 = "1.2"
ntex-rt = "0.4.19"
ntex-io = "2.7"
ntex-net = "2.4"
Expand Down
71 changes: 49 additions & 22 deletions ntex/src/http/client/h2proto.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::{future::poll_fn, io};

use ntex_h2::client::{RecvStream, SimpleClient};
use ntex_h2::{self as h2, frame};
use ntex_h2::{self as h2, client::RecvStream, client::SimpleClient, frame};

use crate::http::body::{BodySize, MessageBody};
use crate::http::header::{self, HeaderMap, HeaderValue};
use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::{h2::payload, payload::Payload, Method, Version};
use crate::time::{timeout_checked, Millis};
use crate::util::{ByteString, Bytes};
use crate::util::{select, ByteString, Bytes, Either};

use super::error::{ConnectError, SendRequestError};

Expand All @@ -21,7 +20,12 @@ pub(super) async fn send_request<B>(
where
B: MessageBody,
{
log::trace!("Sending client request: {:?} {:?}", head, body.size());
log::trace!(
"{}: Sending client request: {:?} {:?}",
client.client.tag(),
head,
body.size()
);
let length = body.size();
let eof = if head.as_ref().method == Method::HEAD {
true
Expand Down Expand Up @@ -82,7 +86,7 @@ where
// at the same time
let _ = crate::rt::spawn(async move {
if let Err(e) = send_body(body, &snd_stream).await {
log::error!("Cannot send body: {:?}", e);
log::error!("{}: Cannot send body: {:?}", snd_stream.tag(), e);
snd_stream.reset(frame::Reason::INTERNAL_ERROR);
}
});
Expand All @@ -108,7 +112,8 @@ async fn get_response(
eof,
} => {
log::trace!(
"{:?} got response (eof: {}): {:#?}\nheaders: {:#?}",
"{}: {:?} got response (eof: {}): {:#?}\nheaders: {:#?}",
stream.tag(),
stream.id(),
eof,
pseudo,
Expand All @@ -122,31 +127,44 @@ async fn get_response(
head.version = Version::HTTP_2;

let payload = if !eof {
log::debug!("Creating local payload stream for {:?}", stream.id());
log::debug!(
"{}: Creating local payload stream for {:?}",
stream.tag(),
stream.id()
);
let (mut pl, payload) =
payload::Payload::create(stream.empty_capacity());

let _ = crate::rt::spawn(async move {
loop {
let h2::Message { stream, kind } =
match rcv_stream.recv().await {
Some(msg) => msg,
None => {
pl.feed_eof(Bytes::new());
break;
}
};
let h2::Message { stream, kind } = match select(
rcv_stream.recv(),
poll_fn(|cx| pl.on_cancel(cx.waker())),
)
.await
{
Either::Left(Some(msg)) => msg,
Either::Left(None) => {
pl.feed_eof(Bytes::new());
break;
}
Either::Right(_) => break,
};

match kind {
h2::MessageKind::Data(data, cap) => {
log::debug!(
"Got data chunk for {:?}: {:?}",
log::trace!(
"{}: Got data chunk for {:?}: {:?}",
stream.tag(),
stream.id(),
data.len()
);
pl.feed_data(data, cap);
}
h2::MessageKind::Eof(item) => {
log::debug!(
"Got payload eof for {:?}: {:?}",
log::trace!(
"{}: Got payload eof for {:?}: {:?}",
stream.tag(),
stream.id(),
item
);
Expand All @@ -163,7 +181,11 @@ async fn get_response(
}
}
h2::MessageKind::Disconnect(err) => {
log::debug!("Connection is disconnected {:?}", err);
log::trace!(
"{}: Connection is disconnected {:?}",
stream.tag(),
err
);
pl.set_error(
io::Error::new(io::ErrorKind::Other, err)
.into(),
Expand Down Expand Up @@ -207,12 +229,17 @@ async fn send_body<B: MessageBody>(
loop {
match poll_fn(|cx| body.poll_next_chunk(cx)).await {
Some(Ok(b)) => {
log::debug!("{:?} sending chunk, {} bytes", stream.id(), b.len());
log::trace!(
"{}: {:?} sending chunk, {} bytes",
stream.tag(),
stream.id(),
b.len()
);
stream.send_payload(b, false).await?
}
Some(Err(e)) => return Err(e.into()),
None => {
log::debug!("{:?} eof of send stream ", stream.id());
log::trace!("{}: {:?} eof of send stream ", stream.tag(), stream.id());
stream.send_payload(Bytes::new(), true).await?;
return Ok(());
}
Expand Down
3 changes: 1 addition & 2 deletions ntex/src/http/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ pub use self::response::{ClientResponse, JsonBody, MessageBody};
pub use self::sender::SendClientRequest;
pub use self::test::TestResponse;

use crate::http::error::HttpError;
use crate::http::{HeaderMap, Method, RequestHead, Uri};
use crate::http::{error::HttpError, HeaderMap, Method, RequestHead, Uri};
use crate::time::Millis;

use self::connect::{Connect as HttpConnect, ConnectorWrapper};
Expand Down
41 changes: 35 additions & 6 deletions ntex/src/http/h2/payload.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
//! Payload stream
use std::collections::VecDeque;
use std::task::{Context, Poll};
use std::task::{Context, Poll, Waker};
use std::{cell::RefCell, future::poll_fn, pin::Pin, rc::Rc, rc::Weak};

use ntex_h2::{self as h2};

use crate::util::{Bytes, Stream};
use crate::{http::error::PayloadError, task::LocalWaker};

bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 {
const EOF = 0b0000_0001;
const DROPPED = 0b0000_0010;
}
}

/// Buffered stream of byte chunks
///
/// Payload stores chunks in a vector. First chunk can be received with
Expand Down Expand Up @@ -54,6 +62,14 @@ impl Payload {
}
}

impl Drop for Payload {
fn drop(&mut self) {
let mut inner = self.inner.borrow_mut();
inner.io_task.wake();
inner.flags.insert(Flags::DROPPED);
}
}

impl Stream for Payload {
type Item = Result<Bytes, PayloadError>;

Expand Down Expand Up @@ -103,11 +119,24 @@ impl PayloadSender {
shared.borrow_mut().stream = stream;
}
}

pub(crate) fn on_cancel(&self, w: &Waker) -> Poll<()> {
if let Some(shared) = self.inner.upgrade() {
if shared.borrow_mut().flags.contains(Flags::DROPPED) {
Poll::Ready(())
} else {
shared.borrow_mut().io_task.register(w);
Poll::Pending
}
} else {
Poll::Ready(())
}
}
}

#[derive(Debug)]
struct Inner {
eof: bool,
flags: Flags,
cap: h2::Capacity,
err: Option<PayloadError>,
items: VecDeque<Bytes>,
Expand All @@ -120,7 +149,7 @@ impl Inner {
fn new(cap: h2::Capacity) -> Self {
Inner {
cap,
eof: false,
flags: Flags::empty(),
err: None,
stream: None,
items: VecDeque::new(),
Expand All @@ -135,7 +164,7 @@ impl Inner {
}

fn feed_eof(&mut self, data: Bytes) {
self.eof = true;
self.flags.insert(Flags::EOF);
if !data.is_empty() {
self.items.push_back(data);
}
Expand All @@ -153,7 +182,7 @@ impl Inner {
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, PayloadError>>> {
if let Some(data) = self.items.pop_front() {
if !self.eof {
if !self.flags.contains(Flags::EOF) {
self.cap.consume(data.len() as u32);

if self.cap.size() == 0 {
Expand All @@ -163,7 +192,7 @@ impl Inner {
Poll::Ready(Some(Ok(data)))
} else if let Some(err) = self.err.take() {
Poll::Ready(Some(Err(err)))
} else if self.eof {
} else if self.flags.contains(Flags::EOF) {
Poll::Ready(None)
} else {
self.task.register(cx.waker());
Expand Down

0 comments on commit dedb7de

Please sign in to comment.