Add back 24.05 response sending path #378

Open: wants to merge 7 commits into main.
2 changes: 1 addition & 1 deletion src/infer_request.cc
@@ -484,7 +484,7 @@ InferRequest::Exec(const bool is_decoupled)
{
bi::scoped_lock<bi::interprocess_mutex> lock{
*(ipc_message->ResponseMutex())};
stub->SendIPCMessage(ipc_message);
stub->SendIPCUtilsMessage(ipc_message);
ipc_message->ResponseCondition()->wait(lock);
}

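Note on the hunk above: InferRequest::Exec performs a blocking round trip to the parent process, and the response mutex is taken before the message is sent so the parent's notification cannot be missed between the send and the wait. A minimal annotated sketch of that pattern, reusing only names that appear in this diff (it is not self-contained; the Stub and IPCMessage types come from the repository):

// Sketch of the blocking send/wait handshake used above.
{
  // Hold the response mutex before sending so the parent cannot signal the
  // condition before this thread starts waiting on it.
  bi::scoped_lock<bi::interprocess_mutex> lock{
      *(ipc_message->ResponseMutex())};
  // This PR routes the message through the utils queue instead of the main
  // stub message queue.
  stub->SendIPCUtilsMessage(ipc_message);
  // Block until the parent process signals that the response is ready.
  ipc_message->ResponseCondition()->wait(lock);
}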
1 change: 1 addition & 0 deletions src/infer_request.h
@@ -96,6 +96,7 @@ class InferRequest {
InferenceTrace& GetTrace();
uint32_t ReleaseFlags();
void SetReleaseFlags(const uint32_t& flags);
intptr_t GetResponseFactoryAddress() { return response_factory_address_; }

#ifdef TRITON_PB_STUB
std::shared_ptr<InferResponse> Exec(const bool is_decoupled);
23 changes: 23 additions & 0 deletions src/ipc_message.cc
@@ -56,6 +56,21 @@ IPCMessage::Create(
new IPCMessage(ipc_message_shm, response_mutex_shm, response_cond_shm));
}

std::unique_ptr<IPCMessage>
IPCMessage::Create(
IPCMessageShm* ipc_message_shm,
bi::managed_external_buffer::handle_t& message_handle)
{
return std::unique_ptr<IPCMessage>(
new IPCMessage(ipc_message_shm, message_handle));
}

AllocatedSharedMemory<IPCMessageShm>&
IPCMessage::GetAllocatedSharedMemory()
{
return ipc_message_shm_;
}

std::unique_ptr<IPCMessage>
IPCMessage::LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
@@ -133,4 +148,12 @@ IPCMessage::IPCMessage(
ipc_message_handle_ = ipc_message_shm_.handle_;
}

IPCMessage::IPCMessage(
IPCMessageShm* ipc_message_shm,
bi::managed_external_buffer::handle_t& handle)
{
ipc_message_handle_ = handle;
ipc_message_shm_ptr_ = ipc_message_shm;
}

}}}; // namespace triton::backend::python
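The new Create overload and constructor let an IPCMessage wrap an IPCMessageShm header that already sits at the front of a caller-owned shared-memory allocation, rather than allocating a fresh block per message. A short usage sketch under that assumption; the buffer sizing and the PYTHONSTUB_ExecuteResponse command mirror the pb_stub.cc changes later in this PR, and nothing here is new API beyond the diff:

// Sketch: carve an IPCMessageShm header out of one allocation and wrap it
// without a second shared-memory allocation.
AllocatedSharedMemory<char> buffer =
    shm_pool_->Construct<char>(sizeof(IPCMessageShm) + sizeof(ResponseBatch));
std::unique_ptr<IPCMessage> msg = IPCMessage::Create(
    reinterpret_cast<IPCMessageShm*>(buffer.data_.get()), buffer.handle_);
msg->Args() = buffer.handle_ + sizeof(IPCMessageShm);  // payload after header
msg->Command() = PYTHONSTUB_ExecuteResponse;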
9 changes: 9 additions & 0 deletions src/ipc_message.h
@@ -97,6 +97,10 @@ class IPCMessage {
static std::unique_ptr<IPCMessage> Create(
const std::unique_ptr<SharedMemoryManager>& shm_pool,
bool inline_response);

static std::unique_ptr<IPCMessage> Create(
IPCMessageShm* ipc_message_shm,
bi::managed_external_buffer::handle_t& message_handle);
static std::unique_ptr<IPCMessage> LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
bi::managed_external_buffer::handle_t message_handle);
@@ -108,6 +112,7 @@ class IPCMessage {
bi::interprocess_mutex* ResponseMutex();
bi::managed_external_buffer::handle_t& Args();
bi::managed_external_buffer::handle_t ShmHandle();
AllocatedSharedMemory<IPCMessageShm>& GetAllocatedSharedMemory();

private:
AllocatedSharedMemory<IPCMessageShm> ipc_message_shm_;
@@ -129,6 +134,10 @@ class IPCMessage {
AllocatedSharedMemory<IPCMessageShm>& ipc_message_shm,
AllocatedSharedMemory<bi::interprocess_mutex>& response_mutex_shm,
AllocatedSharedMemory<bi::interprocess_condition>& response_cond_shm);

IPCMessage(
IPCMessageShm* ipc_message_shm,
bi::managed_external_buffer::handle_t& handle);
};

}}}; // namespace triton::backend::python
113 changes: 88 additions & 25 deletions src/pb_stub.cc
@@ -653,27 +653,19 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
{
py::list py_request_list =
LoadRequestsFromSharedMemory(request_batch_shm_ptr);
std::unique_ptr<IPCMessage> execute_response =
IPCMessage::Create(shm_pool_, false /* Inline response */);
execute_response->Command() = PYTHONSTUB_ExecuteResponse;
std::unique_ptr<IPCMessage> execute_response;

AllocatedSharedMemory<ResponseBatch> response_batch =
shm_pool_->Construct<ResponseBatch>();
ResponseBatch* response_batch_shm_ptr =
reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
execute_response->Args() = response_batch.handle_;
std::optional<AllocatedSharedMemory<char>> response_batch;
bool has_exception = false;
std::string error_string;
std::unique_ptr<PbString> error_string_shm;
std::string err_message;

ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); });
ScopedDefer _(
[this, &execute_response] { SendIPCMessage(execute_response); });

py::object execute_return;
try {
response_batch_shm_ptr->has_error = false;
response_batch_shm_ptr->is_error_set = false;

if (!py::hasattr(model_instance_, "execute")) {
std::string message = "Python model " + model_context_.PythonModelPath() +
" does not implement `execute` method.";
@@ -683,8 +675,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
{
NVTX_RANGE(nvtx_, "PyExecute " + name_);

py::object execute_return =
model_instance_.attr("execute")(py_request_list);
execute_return = model_instance_.attr("execute")(py_request_list);

bool is_coroutine = py::module::import("asyncio")
.attr("iscoroutine")(execute_return)
@@ -696,10 +687,12 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
} else {
py::object coroutine_return =
RunCoroutine(execute_return, false /* in_background */);
ProcessReturnedResponses(py_request_list, coroutine_return);
ProcessReturnedResponses(
py_request_list, coroutine_return, response_batch);
}
} else {
ProcessReturnedResponses(py_request_list, execute_return);
ProcessReturnedResponses(
py_request_list, execute_return, response_batch);
}
}
}
@@ -713,16 +706,23 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
}

if (has_exception) {
std::string err_message =
std::string(
"Failed to process the request(s) for model '" + name_ +
"', message: ") +
error_string;
err_message = std::string(
"Failed to process the request(s) for model '" + name_ +
"', message: ") +
error_string;
LOG_ERROR << err_message.c_str();
if (!response_batch) {
response_batch = shm_pool_->Construct<char>(
sizeof(ResponseBatch) + sizeof(IPCMessageShm));
}
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
response_batch.value().data_.get() + sizeof(IPCMessageShm));

response_batch_shm_ptr->has_error = true;
error_string_shm = PbString::Create(shm_pool_, err_message);
response_batch_shm_ptr->error = error_string_shm->ShmHandle();
response_batch_shm_ptr->is_error_set = true;
response_batch_shm_ptr->batch_size = 0;
// Once the error is sent to the backend, the backend is supposed to close
// all response factories if not already closed, so closing all response
// senders if not already closed to prevent the model from sending more
@@ -731,12 +731,47 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
InferRequest* request = py_request.cast<InferRequest*>();
request->GetResponseSender()->Close();
}
} else {
if (!response_batch) {
response_batch = shm_pool_->Construct<char>(
sizeof(ResponseBatch) + sizeof(IPCMessageShm));
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
response_batch.value().data_.get() + sizeof(IPCMessageShm));
response_batch_shm_ptr->batch_size = 0;
}
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
response_batch.value().data_.get() + sizeof(IPCMessageShm));
response_batch_shm_ptr->has_error = false;
response_batch_shm_ptr->is_error_set = false;
}

execute_response = IPCMessage::Create(
reinterpret_cast<IPCMessageShm*>(response_batch.value().data_.get()),
response_batch.value().handle_);
execute_response->Args() =
response_batch.value().handle_ + sizeof(IPCMessageShm);
execute_response->InlineResponse() = false;
execute_response->Command() = PYTHONSTUB_ExecuteResponse;
_.Complete();
execute_finalize.Complete();
}

void
Stub::ProcessResponse(InferResponse* response)
{
response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */);

for (auto& output_tensor : response->OutputTensors()) {
if (!output_tensor->IsCPU()) {
gpu_tensors_.push_back(output_tensor);
}
}
}

void
Stub::ProcessReturnedResponses(
py::list py_requests, py::object py_responses_obj)
py::list py_requests, py::object py_responses_obj,
std::optional<AllocatedSharedMemory<char>>& response_batch)
{
// Return if there is nothing to process.
if (py::isinstance<py::none>(py_responses_obj)) {
@@ -784,12 +819,40 @@ Stub::ProcessReturnedResponses(
"return list, found type '" +
std::string(py::str(py_responses[i].get_type())) + "'.");
}
std::shared_ptr<InferResponse> response =
py_responses[i].cast<std::shared_ptr<InferResponse>>();
request->GetResponseSender()->Send(

InferResponse* response = py_responses[i].cast<InferResponse*>();
request->GetResponseSender()->UpdateStateAndCounters(
response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
}
}
// Return all the created responses using response_batch. The reason
// that both of the paths are available is that sending the responses
// using response_batch is faster than using `response_sender`.
response_batch = std::move(shm_pool_->Construct<char>(
sizeof(IPCMessageShm) +
requests_size * sizeof(bi::managed_external_buffer::handle_t) +
sizeof(ResponseBatch)));
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
response_batch.value().data_.get() + sizeof(IPCMessageShm));

bi::managed_external_buffer::handle_t* responses_shm_handle =
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
response_batch.value().data_.get() + sizeof(ResponseBatch) +
sizeof(IPCMessageShm));

for (size_t i = 0; i < responses_size; i++) {
// Check the return type of execute function.
InferRequest* infer_request = py_requests[i].cast<InferRequest*>();
InferResponse* infer_response = py_responses[i].cast<InferResponse*>();
if (!py::isinstance<py::none>(py_responses[i])) {
infer_response->PruneOutputTensors(infer_request->RequestedOutputNames());
ProcessResponse(infer_response);
responses_shm_handle[i] = infer_response->ShmHandle();
} else {
responses_shm_handle[i] = 0;
}
}
response_batch_shm_ptr->batch_size = requests_size;
}

py::object
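For reference, the single allocation built in ProcessReturnedResponses and handed back to the parent in ProcessRequests is laid out as an IPCMessageShm header, then the ResponseBatch struct, then one response handle per request. A sketch of the offset arithmetic, mirroring the casts in the diff (illustrative only):

// Layout: [ IPCMessageShm ][ ResponseBatch ][ handle_t x batch_size ]
char* base = response_batch.value().data_.get();
ResponseBatch* batch =
    reinterpret_cast<ResponseBatch*>(base + sizeof(IPCMessageShm));
bi::managed_external_buffer::handle_t* handles =
    reinterpret_cast<bi::managed_external_buffer::handle_t*>(
        base + sizeof(IPCMessageShm) + sizeof(ResponseBatch));
// A handle of 0 means the model returned None for that request.
batch->batch_size = requests_size;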
5 changes: 4 additions & 1 deletion src/pb_stub.h
@@ -254,7 +254,10 @@ class Stub {
void ProcessRequests(RequestBatch* request_batch_shm_ptr);

void ProcessReturnedResponses(
py::list py_requests, py::object py_responses_obj);
py::list py_requests, py::object py_responses_obj,
std::optional<AllocatedSharedMemory<char>>& response_batch);

void ProcessResponse(InferResponse* response);

py::object GetAsyncEventLoop();
