Skip to content

Commit

Permalink
Update ntex-h2
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Oct 9, 2023
1 parent cc31e27 commit 34d18ba
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 30 deletions.
4 changes: 2 additions & 2 deletions examples/custom/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ name = "client"
path = "src/client.rs"

[dependencies]
ntex = "0.7.0"
ntex-h2 = "0.4.0"
ntex = "0.7.5"
ntex-h2 = "0.4.1"
ntex-grpc = "0.5.0"

clap = "3.2"
Expand Down
4 changes: 2 additions & 2 deletions examples/helloworld/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ name = "server"
path = "src/server.rs"

[dependencies]
ntex = "0.7.0"
ntex = "0.7.5"
ntex-grpc = "0.5.0"
ntex-h2 = "0.4.0"
ntex-h2 = "0.4.1"

clap = "2"
bitflags = "1.3"
Expand Down
4 changes: 2 additions & 2 deletions ntex-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ name = "ntex_grpc"
path = "src/lib.rs"

[dependencies]
ntex-h2 = "0.4.0"
ntex-http = "0.1.9"
ntex-h2 = "0.4.1"
ntex-http = "0.1.10"
ntex-connect = "0.3.2"
ntex-io = "0.3.4"
ntex-util = "0.3.2"
Expand Down
5 changes: 2 additions & 3 deletions ntex-grpc/src/client/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ impl<T: MethodDef> Transport<T> for Client {
let mut payload = Data::Empty;

loop {
let mut msg = if let Some(msg) = rcv_stream.recv().await {
let msg = if let Some(msg) = rcv_stream.recv().await {
msg
} else {
return Err(ClientError::UnexpectedEof(status, hdrs));
};

match msg.kind().take() {
match msg.kind {
h2::MessageKind::Headers {
headers,
pseudo,
Expand Down Expand Up @@ -112,7 +112,6 @@ impl<T: MethodDef> Transport<T> for Client {
};
}
h2::MessageKind::Disconnect(err) => return Err(ClientError::Operation(err)),
h2::MessageKind::Empty => {}
}

let mut data = payload.get();
Expand Down
37 changes: 16 additions & 21 deletions ntex-grpc/src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,12 @@ where
Ready<Self::Response, Self::Error>,
>;

fn call<'a>(&'a self, mut msg: h2::Message, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
fn call<'a>(&'a self, msg: h2::Message, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
let id = msg.id();
let mut streams = self.streams.borrow_mut();
let h2::Message { stream, kind } = msg;

match msg.kind().take() {
match kind {
h2::MessageKind::Headers {
headers,
pseudo,
Expand All @@ -179,18 +180,14 @@ where
path.split_to(n)
} else {
// not found
let _ = msg.stream().send_response(
StatusCode::NOT_FOUND,
HeaderMap::default(),
true,
);
let _ =
stream.send_response(StatusCode::NOT_FOUND, HeaderMap::default(), true);
return Either::Right(Ready::Ok(()));
};

// stream eof, cannot do anything
if eof {
if msg
.stream()
if stream
.send_response(StatusCode::OK, HeaderMap::default(), false)
.is_ok()
{
Expand All @@ -200,7 +197,7 @@ where
consts::GRPC_MESSAGE,
HeaderValue::from_static("Cannot decode request message"),
);
msg.stream().send_trailers(trailers);
stream.send_trailers(trailers);
}
return Either::Right(Ready::Ok(()));
}
Expand All @@ -213,7 +210,7 @@ where
};

let _ = streams.insert(
msg.id(),
stream.id(),
Inflight {
headers,
data: Data::Empty,
Expand All @@ -223,7 +220,7 @@ where
);
}
h2::MessageKind::Data(data, _cap) => {
if let Some(inflight) = streams.get_mut(&msg.id()) {
if let Some(inflight) = streams.get_mut(&stream.id()) {
inflight.data.push(data);
}
}
Expand All @@ -243,8 +240,7 @@ where
let _compressed = data.get_u8();
let len = data.get_u32();
if (len as usize) > data.len() {
if msg
.stream()
if stream
.send_response(StatusCode::OK, HeaderMap::default(), false)
.is_ok()
{
Expand All @@ -257,7 +253,7 @@ where
"Cannot decode request message: not enough data provided",
),
);
msg.stream().send_trailers(trailers);
stream.send_trailers(trailers);
}
return Either::Right(Ready::Ok(()));
}
Expand All @@ -269,8 +265,7 @@ where
name: inflight.name,
headers: inflight.headers,
};
if msg
.stream()
if stream
.send_response(StatusCode::OK, HeaderMap::default(), false)
.is_err()
{
Expand All @@ -286,15 +281,15 @@ where
buf.put_u32(res.payload.len() as u32); // length
buf.extend_from_slice(&res.payload);

let _ = msg.stream().send_payload(buf.freeze(), false).await;
let _ = stream.send_payload(buf.freeze(), false).await;

let mut trailers = HeaderMap::default();
trailers.insert(consts::GRPC_STATUS, GrpcStatus::Ok.into());
for (name, val) in res.headers {
trailers.append(name, val);
}

msg.stream().send_trailers(trailers);
stream.send_trailers(trailers);
}
Err(err) => {
let error = format!("Failure during service call: {}", err);
Expand All @@ -304,15 +299,15 @@ where
if let Ok(val) = HeaderValue::from_str(&error) {
trailers.insert(consts::GRPC_MESSAGE, val);
}
msg.stream().send_trailers(trailers);
stream.send_trailers(trailers);
}
};

Ok(())
}));
}
}
h2::MessageKind::Disconnect(_) | h2::MessageKind::Empty => {
h2::MessageKind::Disconnect(_) => {
streams.remove(&id);
}
}
Expand Down

0 comments on commit 34d18ba

Please sign in to comment.