Skip to content

Commit

Permalink
Fix panic on error after stream eof
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Apr 6, 2023
1 parent 40373b3 commit 0397aa5
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 71 deletions.
4 changes: 4 additions & 0 deletions ntex-grpc/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.3.5] - 2023-04-06

* Fix panic on error after stream eof

## [0.3.4] - 2023-02-27

* Add google wrapper types
Expand Down
2 changes: 1 addition & 1 deletion ntex-grpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-grpc"
version = "0.3.4"
version = "0.3.5"
license = "MIT"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "GRPC Client/Server framework"
Expand Down
141 changes: 71 additions & 70 deletions ntex-grpc/src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,89 +224,90 @@ where
}
}
h2::MessageKind::Eof(data) => {
let mut inflight = streams.remove(&id).unwrap();
if let Some(mut inflight) = streams.remove(&id) {
match data {
h2::StreamEof::Data(chunk) => inflight.data.push(chunk),
h2::StreamEof::Trailers(hdrs) => {
for (name, val) in hdrs.iter() {
inflight.headers.insert(name.clone(), val.clone());
}
}
h2::StreamEof::Error(err) => return Either::Right(Ready::Err(err)),
}

match data {
h2::StreamEof::Data(chunk) => inflight.data.push(chunk),
h2::StreamEof::Trailers(hdrs) => {
for (name, val) in hdrs.iter() {
inflight.headers.insert(name.clone(), val.clone());
let mut data = inflight.data.get();
let _compressed = data.get_u8();
let len = data.get_u32();
if (len as usize) > data.len() {
if msg
.stream()
.send_response(StatusCode::OK, HeaderMap::default(), false)
.is_ok()
{
let mut trailers = HeaderMap::default();
trailers
.insert(consts::GRPC_STATUS, GrpcStatus::InvalidArgument.into());
trailers.insert(
consts::GRPC_MESSAGE,
HeaderValue::from_static(
"Cannot decode request message: not enough data provided",
),
);
msg.stream().send_trailers(trailers);
}
return Either::Right(Ready::Ok(()));
}
h2::StreamEof::Error(err) => return Either::Right(Ready::Err(err)),
}
let data = data.split_to(len as usize);

let mut data = inflight.data.get();
let _compressed = data.get_u8();
let len = data.get_u32();
if (len as usize) > data.len() {
log::debug!("Call service {} method {}", inflight.service, inflight.name);
let req = ServerRequest {
payload: data,
name: inflight.name,
headers: inflight.headers,
};
if msg
.stream()
.send_response(StatusCode::OK, HeaderMap::default(), false)
.is_ok()
.is_err()
{
let mut trailers = HeaderMap::default();
trailers.insert(consts::GRPC_STATUS, GrpcStatus::InvalidArgument.into());
trailers.insert(
consts::GRPC_MESSAGE,
HeaderValue::from_static(
"Cannot decode request message: not enough data provided",
),
);
msg.stream().send_trailers(trailers);
return Either::Right(Ready::Ok(()));
}
return Either::Right(Ready::Ok(()));
}
let data = data.split_to(len as usize);

log::debug!("Call service {} method {}", inflight.service, inflight.name);
let req = ServerRequest {
payload: data,
name: inflight.name,
headers: inflight.headers,
};
if msg
.stream()
.send_response(StatusCode::OK, HeaderMap::default(), false)
.is_err()
{
return Either::Right(Ready::Ok(()));
}

let fut = self.service.call(req);
return Either::Left(Box::pin(async move {
match fut.await {
Ok(res) => {
log::debug!("Response is received {:?}", res);
let mut buf = BytesMut::with_capacity(res.payload.len() + 5);
buf.put_u8(0); // compression
buf.put_u32(res.payload.len() as u32); // length
buf.extend_from_slice(&res.payload);

let _ = msg.stream().send_payload(buf.freeze(), false).await;

let mut trailers = HeaderMap::default();
trailers.insert(consts::GRPC_STATUS, GrpcStatus::Ok.into());
for (name, val) in res.headers {
trailers.append(name, val);
let fut = self.service.call(req);
return Either::Left(Box::pin(async move {
match fut.await {
Ok(res) => {
log::debug!("Response is received {:?}", res);
let mut buf = BytesMut::with_capacity(res.payload.len() + 5);
buf.put_u8(0); // compression
buf.put_u32(res.payload.len() as u32); // length
buf.extend_from_slice(&res.payload);

let _ = msg.stream().send_payload(buf.freeze(), false).await;

let mut trailers = HeaderMap::default();
trailers.insert(consts::GRPC_STATUS, GrpcStatus::Ok.into());
for (name, val) in res.headers {
trailers.append(name, val);
}

msg.stream().send_trailers(trailers);
}

msg.stream().send_trailers(trailers);
}
Err(err) => {
let error = format!("Failure during service call: {}", err);
log::debug!("{}", error);
let mut trailers = HeaderMap::default();
trailers.insert(consts::GRPC_STATUS, GrpcStatus::Aborted.into());
if let Ok(val) = HeaderValue::from_str(&error) {
trailers.insert(consts::GRPC_MESSAGE, val);
Err(err) => {
let error = format!("Failure during service call: {}", err);
log::debug!("{}", error);
let mut trailers = HeaderMap::default();
trailers.insert(consts::GRPC_STATUS, GrpcStatus::Aborted.into());
if let Ok(val) = HeaderValue::from_str(&error) {
trailers.insert(consts::GRPC_MESSAGE, val);
}
msg.stream().send_trailers(trailers);
}
msg.stream().send_trailers(trailers);
}
};
};

Ok(())
}));
Ok(())
}));
}
}
h2::MessageKind::Disconnect(_) | h2::MessageKind::Empty => {
streams.remove(&id);
Expand Down

0 comments on commit 0397aa5

Please sign in to comment.