Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Refactor block streaming method (processing is done in grpc thread) (#2181)
Browse files Browse the repository at this point in the history

Signed-off-by: Igor Egorov <igor@soramitsu.co.jp>
  • Loading branch information
Igor Egorov authored Mar 21, 2019
1 parent 5d1db98 commit f8efa83
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 74 deletions.
30 changes: 2 additions & 28 deletions irohad/torii/impl/command_service_transport_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/range/adaptor/transformed.hpp>
#include "backend/protobuf/transaction_responses/proto_tx_response.hpp"
#include "common/combine_latest_until_first_completed.hpp"
#include "common/run_loop_handler.hpp"
#include "interfaces/iroha_internal/transaction_batch.hpp"
#include "interfaces/iroha_internal/transaction_batch_factory.hpp"
#include "interfaces/iroha_internal/transaction_batch_parser.hpp"
Expand Down Expand Up @@ -165,33 +166,6 @@ namespace iroha {
return grpc::Status::OK;
}

namespace {
// Drives the given rxcpp run loop on the calling thread: dispatches due
// events and sleeps between them, until the subscription terminates and
// the event queue is drained.
void handleEvents(rxcpp::composite_subscription &subscription,
rxcpp::schedulers::run_loop &run_loop) {
std::condition_variable wait_cv;

// Wake the sleeping loop whenever an event earlier than the current
// wakeup deadline gets scheduled on the run loop.
run_loop.set_notify_earlier_wakeup(
[&wait_cv](const auto &) { wait_cv.notify_one(); });

// NOTE(review): the mutex guards only this thread's waiting; the run
// loop itself is thread-safe for scheduling from other threads.
std::mutex wait_mutex;
std::unique_lock<std::mutex> lock(wait_mutex);
while (subscription.is_subscribed() or not run_loop.empty()) {
// Dispatch every event whose scheduled time has already arrived.
while (not run_loop.empty()
and run_loop.peek().when <= run_loop.now()) {
run_loop.dispatch();
}

if (run_loop.empty()) {
// Nothing scheduled - block until a new event arrives or the
// subscription ends (re-checked via the wait predicate).
wait_cv.wait(lock, [&run_loop, &subscription]() {
return not subscription.is_subscribed() or not run_loop.empty();
});
} else {
// Next event lies in the future - sleep until it is due, or
// until an earlier event triggers the wakeup notification.
wait_cv.wait_until(lock, run_loop.peek().when);
}
}
}
} // namespace

grpc::Status CommandServiceTransportGrpc::StatusStream(
grpc::ServerContext *context,
const iroha::protocol::TxStatusRequest *request,
Expand Down Expand Up @@ -271,7 +245,7 @@ namespace iroha {

// run loop while subscription is active or there are pending events in
// the queue
handleEvents(subscription, rl);
iroha::schedulers::handleEvents(subscription, rl);

log_->debug("status stream done, {}", client_id);
return grpc::Status::OK;
Expand Down
116 changes: 70 additions & 46 deletions irohad/torii/impl/query_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "backend/protobuf/query_responses/proto_block_query_response.hpp"
#include "backend/protobuf/query_responses/proto_query_response.hpp"
#include "common/run_loop_handler.hpp"
#include "cryptography/default_hash_provider.hpp"
#include "interfaces/iroha_internal/abstract_transport_factory.hpp"
#include "logger/logger.hpp"
Expand All @@ -18,7 +19,7 @@ namespace iroha {
QueryService::QueryService(
std::shared_ptr<iroha::torii::QueryProcessor> query_processor,
std::shared_ptr<QueryFactoryType> query_factory,
logger::LoggerPtr log)
logger::LoggerPtr log)
: query_processor_{std::move(query_processor)},
query_factory_{std::move(query_factory)},
log_{std::move(log)} {}
Expand Down Expand Up @@ -70,59 +71,82 @@ namespace iroha {
const iroha::protocol::BlocksQuery *request,
grpc::ServerWriter<iroha::protocol::BlockQueryResponse> *writer) {
log_->debug("Fetching commits");

rxcpp::schedulers::run_loop run_loop;
auto current_thread = rxcpp::synchronize_in_one_worker(
rxcpp::schedulers::make_run_loop(run_loop));

shared_model::proto::TransportBuilder<
shared_model::proto::BlocksQuery,
shared_model::validation::DefaultSignedBlocksQueryValidator>()
.build(*request)
.match(
[this, context, request, writer](
[this, context, request, writer, &current_thread, &run_loop](
const iroha::expected::Value<shared_model::proto::BlocksQuery>
&query) {
rxcpp::composite_subscription sub;
rxcpp::composite_subscription subscription;
std::string client_id =
(boost::format("Peer: '%s'") % context->peer()).str();
query_processor_->blocksQueryHandle(query.value)
.as_blocking()
.subscribe(
sub,
[this, context, &sub, request, writer](
const std::shared_ptr<
shared_model::interface::BlockQueryResponse>
response) {
if (context->IsCancelled()) {
log_->debug("Unsubscribed");
sub.unsubscribe();
} else {
iroha::visit_in_place(
response->get(),
[this, writer, request](
const shared_model::interface::BlockResponse
&block_response) {
log_->debug(
"{} receives committed block",
request->meta().creator_account_id());
auto proto_block_response = static_cast<
const shared_model::proto::BlockResponse
&>(block_response);
writer->Write(
proto_block_response.getTransport());
},
[this, writer, request](
const shared_model::interface::
BlockErrorResponse
&block_error_response) {
log_->debug(
"{} received error with message: {}",
request->meta().creator_account_id(),
block_error_response.message());
auto proto_block_error_response =
static_cast<const shared_model::proto::
BlockErrorResponse &>(
block_error_response);
writer->WriteLast(
proto_block_error_response.getTransport(),
grpc::WriteOptions());
});
}
});
.observe_on(current_thread)
.take_while([this, context, request, writer](
const std::shared_ptr<
shared_model::interface::
BlockQueryResponse> response) {
if (context->IsCancelled()) {
log_->debug("Unsubscribed from block stream");
return false;
} else {
auto result = iroha::visit_in_place(
response->get(),
[this, writer, request](
const shared_model::interface::BlockResponse
&block_response) {
log_->debug("{} receives committed block",
request->meta().creator_account_id());
auto proto_block_response = static_cast<
const shared_model::proto::BlockResponse &>(
block_response);
bool written = writer->Write(
proto_block_response.getTransport());
if (not written) {
log_->debug(
"Block stream appears to be closed");
return false;
}
return true;
},
[this, writer, request](
const shared_model::interface::
BlockErrorResponse &block_error_response) {
log_->debug("{} received error with message: {}",
request->meta().creator_account_id(),
block_error_response.message());
auto proto_block_error_response = static_cast<
const shared_model::proto::BlockErrorResponse
&>(block_error_response);
writer->WriteLast(
proto_block_error_response.getTransport(),
grpc::WriteOptions());
return false;
});
return result;
}
})
.subscribe(subscription,
[](const auto &) {},
[&](std::exception_ptr ep) {
log_->error(
"something bad happened during block "
"streaming, client_id {}",
client_id);
},
[&] {
log_->debug("block stream done, {}",
client_id);
});

iroha::schedulers::handleEvents(subscription, run_loop);
},
[this, writer](const auto &error) {
log_->debug("Stateless invalid: {}", error.error);
Expand Down
42 changes: 42 additions & 0 deletions libs/common/run_loop_handler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
 * Copyright Soramitsu Co., Ltd. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */
#ifndef IROHA_RUN_LOOP_HANDLER_HPP
#define IROHA_RUN_LOOP_HANDLER_HPP

#include <condition_variable>
#include <mutex>  // std::mutex, std::unique_lock - used below; do not rely on transitive includes

#include <rxcpp/rx.hpp>

namespace iroha {
  namespace schedulers {

    /**
     * Drives the given rxcpp run loop on the calling thread: dispatches
     * events that are due and sleeps in between, until the subscription
     * terminates and the event queue is drained.
     * @param subscription - the subscription whose lifetime bounds the loop;
     *        the function returns once it is unsubscribed and no events
     *        remain pending
     * @param run_loop - the run loop to dispatch; events may be scheduled
     *        onto it from other threads while this function runs
     */
    inline void handleEvents(rxcpp::composite_subscription &subscription,
                             rxcpp::schedulers::run_loop &run_loop) {
      std::condition_variable wait_cv;

      // Wake the sleeping loop whenever an event earlier than the current
      // wakeup deadline gets scheduled on the run loop.
      run_loop.set_notify_earlier_wakeup(
          [&wait_cv](const auto &) { wait_cv.notify_one(); });

      // The mutex guards only this thread's condvar waiting; the run loop
      // itself is safe for cross-thread scheduling.
      std::mutex wait_mutex;
      std::unique_lock<std::mutex> lock(wait_mutex);
      while (subscription.is_subscribed() or not run_loop.empty()) {
        // Dispatch every event whose scheduled time has already arrived.
        while (not run_loop.empty()
               and run_loop.peek().when <= run_loop.now()) {
          run_loop.dispatch();
        }

        if (run_loop.empty()) {
          // Nothing scheduled - block until a new event arrives or the
          // subscription ends (re-checked via the wait predicate).
          wait_cv.wait(lock, [&run_loop, &subscription]() {
            return not subscription.is_subscribed() or not run_loop.empty();
          });
        } else {
          // Next event lies in the future - sleep until it is due, or
          // until an earlier event triggers the wakeup notification.
          wait_cv.wait_until(lock, run_loop.peek().when);
        }
      }

      // Clear the wakeup callback: it captures the function-local wait_cv
      // by reference, and the caller-owned run loop must not keep a
      // dangling reference to it after this function returns.
      run_loop.set_notify_earlier_wakeup([](const auto &) {});
    }

  }  // namespace schedulers
}  // namespace iroha

#endif  // IROHA_RUN_LOOP_HANDLER_HPP

0 comments on commit f8efa83

Please sign in to comment.