From f4c83f276a66705ce22aee2439388bc50840a73c Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Thu, 12 Sep 2024 03:01:40 +0000 Subject: [PATCH 1/7] Add back 24.05 response sender path --- src/pb_stub.cc | 73 ++++++++++++--- src/pb_stub.h | 4 +- src/python_be.cc | 212 ++++++++++++++++++++++++++++++++++++++++-- src/response_sender.h | 4 +- 4 files changed, 269 insertions(+), 24 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 007e7f29..a4e051e9 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -657,11 +657,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) IPCMessage::Create(shm_pool_, false /* Inline response */); execute_response->Command() = PYTHONSTUB_ExecuteResponse; - AllocatedSharedMemory response_batch = - shm_pool_->Construct(); - ResponseBatch* response_batch_shm_ptr = - reinterpret_cast(response_batch.data_.get()); - execute_response->Args() = response_batch.handle_; + std::optional> response_batch; bool has_exception = false; std::string error_string; std::unique_ptr error_string_shm; @@ -669,11 +665,8 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); }); ScopedDefer _( [this, &execute_response] { SendIPCMessage(execute_response); }); - + py::object execute_return; try { - response_batch_shm_ptr->has_error = false; - response_batch_shm_ptr->is_error_set = false; - if (!py::hasattr(model_instance_, "execute")) { std::string message = "Python model " + model_context_.PythonModelPath() + " does not implement `execute` method."; @@ -683,7 +676,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) { NVTX_RANGE(nvtx_, "PyExecute " + name_); - py::object execute_return = + execute_return = model_instance_.attr("execute")(py_request_list); bool is_coroutine = py::module::import("asyncio") @@ -696,10 +689,10 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } else { py::object coroutine_return = RunCoroutine(execute_return, false /* in_background */); - ProcessReturnedResponses(py_request_list, coroutine_return); + ProcessReturnedResponses(py_request_list, coroutine_return, response_batch); } } else { - ProcessReturnedResponses(py_request_list, execute_return); + ProcessReturnedResponses(py_request_list, execute_return, response_batch); } } } @@ -719,6 +712,12 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) "', message: ") + error_string; LOG_ERROR << err_message.c_str(); + if (!response_batch) { + response_batch = shm_pool_->Construct(sizeof(ResponseBatch)); + } + ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get()); + + response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get()); response_batch_shm_ptr->has_error = true; error_string_shm = PbString::Create(shm_pool_, err_message); response_batch_shm_ptr->error = error_string_shm->ShmHandle(); @@ -732,11 +731,35 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) request->GetResponseSender()->Close(); } } + + if (!response_batch) { + response_batch = shm_pool_->Construct(sizeof(ResponseBatch)); + ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get()); + response_batch_shm_ptr->batch_size = 0; + } + ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get()); + response_batch_shm_ptr->has_error = false; + response_batch_shm_ptr->is_error_set = false; + execute_response->Args() = response_batch.value().handle_; + _.Complete(); + execute_finalize.Complete(); +} 
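+
+// Serialize a single inference response into shared memory and remember any
+// GPU output tensors so their device buffers can be filled during the later
+// GPU-buffer round trip with the parent process.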
+ +void +Stub::ProcessResponse(InferResponse* response) +{ + response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */); + + for (auto& output_tensor : response->OutputTensors()) { + if (!output_tensor->IsCPU()) { + gpu_tensors_.push_back(output_tensor); + } + } } void Stub::ProcessReturnedResponses( - py::list py_requests, py::object py_responses_obj) + py::list py_requests, py::object py_responses_obj, std::optional>& response_batch) { // Return if there is nothing to process. if (py::isinstance(py_responses_obj)) { @@ -784,12 +807,32 @@ Stub::ProcessReturnedResponses( "return list, found type '" + std::string(py::str(py_responses[i].get_type())) + "'."); } + std::shared_ptr response = py_responses[i].cast>(); - request->GetResponseSender()->Send( - response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); + request->GetResponseSender()->UpdateStateAndCounters(response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); } } + response_batch = std::move(shm_pool_->Construct( + requests_size * sizeof(bi::managed_external_buffer::handle_t) + + sizeof(ResponseBatch))); + ResponseBatch* response_batch_shm_ptr = + reinterpret_cast(response_batch.value().data_.get()); + + bi::managed_external_buffer::handle_t* responses_shm_handle = + reinterpret_cast( + response_batch.value().data_.get() + sizeof(ResponseBatch)); + + for (size_t i = 0; i < responses_size; i++) { + // Check the return type of execute function. + InferRequest* infer_request = py_requests[i].cast(); + InferResponse* infer_response = py_responses[i].cast(); + infer_response->PruneOutputTensors( + infer_request->RequestedOutputNames()); + ProcessResponse(infer_response); + responses_shm_handle[i] = infer_response->ShmHandle(); + } + response_batch_shm_ptr->batch_size = requests_size; } py::object diff --git a/src/pb_stub.h b/src/pb_stub.h index 9ed74d9a..85a2783a 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -254,7 +254,9 @@ class Stub { void ProcessRequests(RequestBatch* request_batch_shm_ptr); void ProcessReturnedResponses( - py::list py_requests, py::object py_responses_obj); + py::list py_requests, py::object py_responses_obj, std::optional>& response_batch); + + void ProcessResponse(InferResponse* response); py::object GetAsyncEventLoop(); diff --git a/src/python_be.cc b/src/python_be.cc index 761abdbf..78b306b9 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1229,7 +1229,7 @@ ModelInstanceState::ProcessRequests( ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_ExecuteRequest; ipc_message->Args() = request_batch.handle_; received_message_ = nullptr; - ScopedDefer _([this] { + ScopedDefer execute_finalize([this] { // Push a dummy message to signal the thread to terminate. 
Stub()->StubMessageQueue()->Push(DUMMY_MESSAGE); }); @@ -1240,8 +1240,12 @@ ModelInstanceState::ProcessRequests( cv_.wait(guard, [this] { return received_message_ != nullptr; }); } - AllocatedSharedMemory response_batch = - Stub()->ShmPool()->Load(received_message_->Args()); + + AllocatedSharedMemory response_batch = Stub()->ShmPool()->Load(received_message_->Args()); + + ResponseBatch* response_batch_shm_ptr = + reinterpret_cast(response_batch.data_.get()); + received_message_.reset(); uint64_t compute_end_ns = 0; @@ -1249,10 +1253,10 @@ ModelInstanceState::ProcessRequests( reporter.SetComputeEndNs(compute_end_ns); reporter.SetBatchStatistics(total_batch_size); - if (response_batch.data_->has_error) { - if (response_batch.data_->is_error_set) { + if (response_batch_shm_ptr->has_error) { + if (response_batch_shm_ptr->is_error_set) { auto error = PbString::LoadFromSharedMemory( - Stub()->ShmPool(), response_batch.data_->error); + Stub()->ShmPool(), response_batch_shm_ptr->error); return TRITONSERVER_ErrorNew( TRITONSERVER_ERROR_INTERNAL, error->String().c_str()); } @@ -1261,6 +1265,202 @@ ModelInstanceState::ProcessRequests( TRITONSERVER_ERROR_INTERNAL, "Failed to process the requests."); } + if (response_batch_shm_ptr->batch_size > 0) { + std::shared_ptr> responses( + new std::vector()); + responses->reserve(request_count); + for (size_t i = 0; i < request_count; i++) { + TRITONBACKEND_Response* response; + auto err = TRITONBACKEND_ResponseNew(&response, requests[i]); + if (err == nullptr) { + responses->emplace_back(response); + } else { + responses->emplace_back(nullptr); + LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Fail to create response"); + TRITONSERVER_ErrorDelete(err); + } + } + bi::managed_external_buffer::handle_t* response_shm_handle = + reinterpret_cast( + response_batch.data_.get() + sizeof(ResponseBatch)); + + // If the output provided by the model is in GPU, we will pass the list of + // buffers provided by Triton to the stub process. + // bool has_gpu_output = false; + std::vector requires_deferred_callback; + + std::vector> shm_responses; + std::vector, void*>>> + gpu_output_buffers(request_count); + GPUBuffersHelper gpu_buffer_helper; + + for (uint32_t r = 0; r < request_count; ++r) { + NVTX_RANGE(nvtx_, "LoadingResponse " + Name()); + TRITONBACKEND_Response* response = (*responses)[r]; + TRITONBACKEND_Request* request = requests[r]; + uint32_t requested_output_count = 0; + requires_deferred_callback.push_back(false); + + shm_responses.emplace_back(nullptr); + std::unique_ptr& infer_response = shm_responses.back(); + try { + if (pb_infer_requests[r]->ReleaseFlags() == + TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) { + // For rescheduled requests, we do not need to send a response. + LOG_IF_ERROR( + TRITONBACKEND_ResponseDelete((*responses)[r]), + "failed to delete response"); + (*responses)[r] = nullptr; + continue; + } + infer_response = InferResponse::LoadFromSharedMemory( + Stub()->ShmPool(), response_shm_handle[r], + false /* open_cuda_handle */); + if (infer_response->HasError()) { + TRITONSERVER_Error* err = TRITONSERVER_ErrorNew( + infer_response->Error()->Code(), + infer_response->Error()->Message().c_str()); + + LOG_IF_ERROR( + TRITONBACKEND_ResponseSend( + (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err), + "failed sending response"); + TRITONSERVER_ErrorDelete(err); + (*responses)[r] = nullptr; + + // Reset the release flags for the request. 
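+            // An error response has already been sent, so the request should
+            // be released normally rather than rescheduled.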
+ pb_infer_requests[r]->SetReleaseFlags( + TRITONSERVER_REQUEST_RELEASE_ALL); + + // If has_error is true, we do not look at the response tensors. + continue; + } + } + catch (const PythonBackendException& pb_exception) { + TRITONSERVER_Error* err = TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INTERNAL, pb_exception.what()); + LOG_IF_ERROR( + TRITONBACKEND_ResponseSend( + (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err), + "failed sending response"); + TRITONSERVER_ErrorDelete(err); + (*responses)[r] = nullptr; + + // Reset the release flags for the request. + pb_infer_requests[r]->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); + + continue; + } + + GUARDED_RESPOND_IF_ERROR( + responses, r, + TRITONBACKEND_RequestOutputCount(request, &requested_output_count)); + + std::set requested_output_names; + for (size_t j = 0; j < requested_output_count; ++j) { + const char* output_name; + GUARDED_RESPOND_IF_ERROR( + responses, r, + TRITONBACKEND_RequestOutputName(request, j, &output_name)); + requested_output_names.insert(output_name); + } + + bool require_deferred_callback = false; + +#ifdef TRITON_ENABLE_GPU + for (auto& output_tensor : infer_response->OutputTensors()) { + if (output_tensor->MemoryType() == TRITONSERVER_MEMORY_GPU) { + // Attempt to use the cuda shared memory pool for GPU tensor. + ShareCUDAMemoryPool(output_tensor->MemoryTypeId()); + } + } +#endif // TRITON_ENABLE_GPU + + gpu_output_buffers[r] = + std::vector, void*>>{}; + infer_response->Send( + response, CudaStream(), require_deferred_callback, + TRITONSERVER_RESPONSE_COMPLETE_FINAL, Stub()->ShmPool(), + gpu_buffer_helper, gpu_output_buffers[r], requested_output_names); + + requires_deferred_callback[r] = require_deferred_callback; + + if (requires_deferred_callback[r]) { + // has_gpu_output = true; + } + } + + // Finalize the execute. + execute_finalize.Complete(); + } + + // If the output tensor is in GPU, there will be a second round trip + // required for filling the GPU buffers provided by the main process. +// if (has_gpu_output) { +// ipc_message->Command() = +// PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers; +// gpu_buffer_helper.Complete(Stub()->ShmPool()); +// ipc_message->Args() = gpu_buffer_helper.ShmHandle(); +// SendMessageAndReceiveResponse( +// ipc_message->ShmHandle(), response_message, restart, responses, +// requests, 0); + +// bool cuda_copy = false; + +// uint32_t response_index = 0; +// for (auto& gpu_output_buffer : gpu_output_buffers) { +// for (auto& buffer_memory_pair : gpu_output_buffer) { +// auto& pb_memory = buffer_memory_pair.first; +// void* pointer = buffer_memory_pair.second; +// bool cuda_used = false; + +// if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) { +// GUARDED_RESPOND_IF_ERROR( +// responses, response_index, +// CopyBuffer( +// "Failed to copy the output tensor to buffer.", +// TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0, +// pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, +// CudaStream(), &cuda_used)); +// cuda_copy |= cuda_used; +// } else if ( +// (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) && +// pb_memory->UseCUDASharedPool() && +// (pb_memory->DataPtr() != pointer)) { +// // If the data pointer from pb_memory is not the same as the +// // pointer, it means that the Triton-provided buffer is not used +// // during tensor transfer. Instead, an intermediate buffer that uses +// // CUDA shared memory pool is used. In this case, we need to copy +// // the data from the intermediate buffer back to the Triton-provided +// // buffer. 
+// GUARDED_RESPOND_IF_ERROR( +// responses, response_index, +// CopyBuffer( +// "Failed to copy the output tensor to buffer.", +// TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), +// TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), +// pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, +// CudaStream(), &cuda_used)); +// cuda_copy |= cuda_used; +// } +// } +// response_index++; +// #ifdef TRITON_ENABLE_GPU +// if (cuda_copy) { +// cudaStreamSynchronize(stream_); +// } +// #endif // TRITON_ENABLE_GPU +// } +// } + +// bls_defer.Complete(); +// for (uint32_t r = 0; r < request_count; ++r) { +// if (requires_deferred_callback[r]) { +// shm_responses[r]->DeferredSendCallback(); +// } +// } +// } + return nullptr; // success } diff --git a/src/response_sender.h b/src/response_sender.h index 69f416c2..6b0258fd 100644 --- a/src/response_sender.h +++ b/src/response_sender.h @@ -46,13 +46,13 @@ class ResponseSender { ~ResponseSender(); void Send(std::shared_ptr response, const uint32_t flags); bool IsCancelled(); + void UpdateStateAndCounters( + const std::shared_ptr& response, const uint32_t flags); // Can be useful at stopping the model from sending any more responses. void Close(); private: - void UpdateStateAndCounters( - const std::shared_ptr& response, const uint32_t flags); void DeleteResponseFactory(); intptr_t request_address_; From b36b55d983925685244706fe3b176a3c1eaa02cd Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Mon, 16 Sep 2024 14:25:01 +0000 Subject: [PATCH 2/7] Improve perf --- src/infer_request.cc | 2 +- src/ipc_message.cc | 19 ++++++++++ src/ipc_message.h | 7 ++++ src/pb_stub.cc | 24 ++++++------ src/python_be.cc | 85 ++++++++++++++---------------------------- src/response_sender.cc | 14 +++---- 6 files changed, 76 insertions(+), 75 deletions(-) diff --git a/src/infer_request.cc b/src/infer_request.cc index 8a95b524..e5733662 100644 --- a/src/infer_request.cc +++ b/src/infer_request.cc @@ -484,7 +484,7 @@ InferRequest::Exec(const bool is_decoupled) { bi::scoped_lock lock{ *(ipc_message->ResponseMutex())}; - stub->SendIPCMessage(ipc_message); + stub->SendIPCUtilsMessage(ipc_message); ipc_message->ResponseCondition()->wait(lock); } diff --git a/src/ipc_message.cc b/src/ipc_message.cc index ea1dc5b0..1b813214 100644 --- a/src/ipc_message.cc +++ b/src/ipc_message.cc @@ -56,6 +56,19 @@ IPCMessage::Create( new IPCMessage(ipc_message_shm, response_mutex_shm, response_cond_shm)); } +std::unique_ptr +IPCMessage::Create(IPCMessageShm* ipc_message_shm, + bi::managed_external_buffer::handle_t& message_handle) +{ + return std::unique_ptr(new IPCMessage(ipc_message_shm, message_handle)); +} + + AllocatedSharedMemory& +IPCMessage::GetAllocatedSharedMemory() +{ + return ipc_message_shm_; +} + std::unique_ptr IPCMessage::LoadFromSharedMemory( std::unique_ptr& shm_pool, @@ -133,4 +146,10 @@ IPCMessage::IPCMessage( ipc_message_handle_ = ipc_message_shm_.handle_; } +IPCMessage::IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle) +{ + ipc_message_handle_ = handle; + ipc_message_shm_ptr_ = ipc_message_shm; +} + }}}; // namespace triton::backend::python diff --git a/src/ipc_message.h b/src/ipc_message.h index 8e762b8f..c7d0ae9d 100644 --- a/src/ipc_message.h +++ b/src/ipc_message.h @@ -97,6 +97,10 @@ class IPCMessage { static std::unique_ptr Create( const std::unique_ptr& shm_pool, bool inline_response); + + static std::unique_ptr + Create(IPCMessageShm* ipc_message_shm, + bi::managed_external_buffer::handle_t& message_handle); static 
std::unique_ptr LoadFromSharedMemory( std::unique_ptr& shm_pool, bi::managed_external_buffer::handle_t message_handle); @@ -108,6 +112,7 @@ class IPCMessage { bi::interprocess_mutex* ResponseMutex(); bi::managed_external_buffer::handle_t& Args(); bi::managed_external_buffer::handle_t ShmHandle(); + AllocatedSharedMemory& GetAllocatedSharedMemory(); private: AllocatedSharedMemory ipc_message_shm_; @@ -129,6 +134,8 @@ class IPCMessage { AllocatedSharedMemory& ipc_message_shm, AllocatedSharedMemory& response_mutex_shm, AllocatedSharedMemory& response_cond_shm); + + IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle); }; }}}; // namespace triton::backend::python diff --git a/src/pb_stub.cc b/src/pb_stub.cc index a4e051e9..e6c93214 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -653,9 +653,8 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) { py::list py_request_list = LoadRequestsFromSharedMemory(request_batch_shm_ptr); - std::unique_ptr execute_response = - IPCMessage::Create(shm_pool_, false /* Inline response */); - execute_response->Command() = PYTHONSTUB_ExecuteResponse; + std::unique_ptr execute_response; + // IPCMessage::Create(shm_pool_, false /* Inline response */); std::optional> response_batch; bool has_exception = false; @@ -713,9 +712,9 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) error_string; LOG_ERROR << err_message.c_str(); if (!response_batch) { - response_batch = shm_pool_->Construct(sizeof(ResponseBatch)); + response_batch = shm_pool_->Construct(sizeof(ResponseBatch) + sizeof(IPCMessageShm)); } - ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get()); + ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get()); response_batch_shm_ptr->has_error = true; @@ -733,14 +732,17 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } if (!response_batch) { - response_batch = shm_pool_->Construct(sizeof(ResponseBatch)); - ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get()); + response_batch = shm_pool_->Construct(sizeof(ResponseBatch) + sizeof(IPCMessageShm)); + ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); response_batch_shm_ptr->batch_size = 0; } - ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get()); + ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); response_batch_shm_ptr->has_error = false; response_batch_shm_ptr->is_error_set = false; + execute_response = IPCMessage::Create(reinterpret_cast(response_batch.value().data_.get()), response_batch.value().handle_); execute_response->Args() = response_batch.value().handle_; + execute_response->InlineResponse() = false; + execute_response->Command() = PYTHONSTUB_ExecuteResponse; _.Complete(); execute_finalize.Complete(); } @@ -813,15 +815,15 @@ Stub::ProcessReturnedResponses( request->GetResponseSender()->UpdateStateAndCounters(response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); } } - response_batch = std::move(shm_pool_->Construct( + response_batch = std::move(shm_pool_->Construct(sizeof(IPCMessageShm) + requests_size * sizeof(bi::managed_external_buffer::handle_t) + sizeof(ResponseBatch))); ResponseBatch* response_batch_shm_ptr = - 
reinterpret_cast(response_batch.value().data_.get()); + reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); bi::managed_external_buffer::handle_t* responses_shm_handle = reinterpret_cast( - response_batch.value().data_.get() + sizeof(ResponseBatch)); + response_batch.value().data_.get() + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); for (size_t i = 0; i < responses_size; i++) { // Check the return type of execute function. diff --git a/src/python_be.cc b/src/python_be.cc index 78b306b9..361e1401 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -290,8 +290,8 @@ ModelInstanceState::SaveRequestsToSharedMemory( request, &request_timeout)); std::unique_ptr infer_request; - TRITONBACKEND_ResponseFactory* factory_ptr; - RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); + TRITONBACKEND_ResponseFactory* factory_ptr = nullptr; + // RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); infer_request = std::make_unique( id, correlation_id, pb_input_tensors, requested_output_names, @@ -322,8 +322,6 @@ ModelInstanceState::LaunchStubProcess() thread_pool_ = std::make_unique( model_state->StateForBackend()->thread_pool_size); - queue_monitor_thread_ = true; - queue_monitor_ = std::thread(&ModelInstanceState::MessageQueueMonitor, this); request_executor_ = std::make_unique( Stub()->ShmPool(), model_state->TritonServer()); @@ -685,44 +683,6 @@ ModelInstanceState::ExecuteBLSRequest( } } -void -ModelInstanceState::MessageQueueMonitor() -{ - while (queue_monitor_thread_) { - bi::managed_external_buffer::handle_t handle = - Stub()->ParentMessageQueue()->Pop(); - if (handle == DUMMY_MESSAGE) { - break; - } - std::unique_ptr message = - IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), handle); - - // Need to notify the model instance thread that the execute response has - // been received. 
- if (message->Command() == PYTHONSTUB_ExecuteResponse) { - std::lock_guard guard{mu_}; - received_message_ = std::move(message); - cv_.notify_one(); - } else if (message->Command() == PYTHONSTUB_ResponseSend) { - std::shared_ptr response_send_message = std::move(message); - std::packaged_task task([this, response_send_message] { - ResponseSendDecoupled(response_send_message); - }); - boost::asio::post(*thread_pool_, std::move(task)); - } else if ( - message->Command() == PYTHONSTUB_InferExecRequest || - message->Command() == PYTHONSTUB_InferStreamExecRequest) { - std::shared_ptr bls_execute = std::move(message); - std::packaged_task task([this, bls_execute] { - ExecuteBLSRequest( - bls_execute, - (bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest)); - }); - boost::asio::post(*thread_pool_, std::move(task)); - } - } -} - void ModelInstanceState::StubToParentMQMonitor() { @@ -769,6 +729,25 @@ ModelInstanceState::StubToParentMQMonitor() ProcessModelControlRequest(message); break; } + case PYTHONSTUB_ResponseSend: { + std::shared_ptr response_send_message = std::move(message); + std::packaged_task task([this, response_send_message] { + ResponseSendDecoupled(response_send_message); + }); + boost::asio::post(*thread_pool_, std::move(task)); + break; + } + case PYTHONSTUB_InferExecRequest: + case PYTHONSTUB_InferStreamExecRequest: { + std::shared_ptr bls_execute = std::move(message); + std::packaged_task task([this, bls_execute] { + ExecuteBLSRequest( + bls_execute, + (bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest)); + }); + boost::asio::post(*thread_pool_, std::move(task)); + break; + } default: { LOG_MESSAGE( TRITONSERVER_LOG_ERROR, "Unexpected message type received."); @@ -1228,26 +1207,23 @@ ModelInstanceState::ProcessRequests( IPCMessage::Create(Stub()->ShmPool(), false /*inline_response*/)); ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_ExecuteRequest; ipc_message->Args() = request_batch.handle_; - received_message_ = nullptr; + ScopedDefer execute_finalize([this] { // Push a dummy message to signal the thread to terminate. Stub()->StubMessageQueue()->Push(DUMMY_MESSAGE); }); + std::unique_ptr response; { - std::unique_lock guard{mu_}; Stub()->StubMessageQueue()->Push(ipc_message->ShmHandle()); - cv_.wait(guard, [this] { return received_message_ != nullptr; }); + bi::managed_external_buffer::handle_t response_message; + Stub()->ReceiveMessageFromStub(response_message); + response = IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message); } - - - AllocatedSharedMemory response_batch = Stub()->ShmPool()->Load(received_message_->Args()); - + char* ipc_message_shm = reinterpret_cast(response->GetAllocatedSharedMemory().data_.get());; ResponseBatch* response_batch_shm_ptr = - reinterpret_cast(response_batch.data_.get()); + reinterpret_cast(ipc_message_shm + sizeof(IPCMessageShm)); - received_message_.reset(); - uint64_t compute_end_ns = 0; SET_TIMESTAMP(compute_end_ns); reporter.SetComputeEndNs(compute_end_ns); @@ -1282,7 +1258,7 @@ ModelInstanceState::ProcessRequests( } bi::managed_external_buffer::handle_t* response_shm_handle = reinterpret_cast( - response_batch.data_.get() + sizeof(ResponseBatch)); + ipc_message_shm + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); // If the output provided by the model is in GPU, we will pass the list of // buffers provided by Triton to the stub process. @@ -1390,8 +1366,6 @@ ModelInstanceState::ProcessRequests( } } - // Finalize the execute. 
- execute_finalize.Complete(); } // If the output tensor is in GPU, there will be a second round trip @@ -1610,7 +1584,6 @@ ModelInstanceState::~ModelInstanceState() Stub()->TerminateStub(); TerminateMonitor(); Stub()->ClearQueues(); - received_message_.reset(); Stub().reset(); } diff --git a/src/response_sender.cc b/src/response_sender.cc index 0a88fb6b..cef4e3a7 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -69,7 +69,7 @@ ResponseSender::ResponseSender( ResponseSender::~ResponseSender() { - DeleteResponseFactory(); + // DeleteResponseFactory(); } void @@ -172,7 +172,7 @@ ResponseSender::Send( { bi::scoped_lock guard{send_message_payload->mu}; - stub->SendIPCMessage(ipc_message); + stub->SendIPCUtilsMessage(ipc_message); while (!send_message_payload->is_stub_turn) { send_message_payload->cv.wait(guard); } @@ -248,7 +248,7 @@ ResponseSender::Send( } if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - DeleteResponseFactory(); + // DeleteResponseFactory(); } } @@ -270,10 +270,10 @@ ResponseSender::DeleteResponseFactory() { bool already_deleted = response_factory_deleted_.exchange(true); if (!already_deleted) { - std::unique_ptr& stub = Stub::GetOrCreateInstance(); - stub->EnqueueCleanupId( - reinterpret_cast(response_factory_address_), - PYTHONSTUB_DecoupledResponseFactoryCleanup); + // std::unique_ptr& stub = Stub::GetOrCreateInstance(); + // stub->EnqueueCleanupId( + // reinterpret_cast(response_factory_address_), + // PYTHONSTUB_DecoupledResponseFactoryCleanup); } } From 3e9dcc562b69c8aa7d37672a3c1452a60cb66eb9 Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Mon, 16 Sep 2024 19:56:06 +0000 Subject: [PATCH 3/7] Fix cleanup --- src/python_be.cc | 199 ++++++++++++++++++++++++++--------------- src/python_be.h | 17 +++- src/response_sender.cc | 17 ++-- 3 files changed, 151 insertions(+), 82 deletions(-) diff --git a/src/python_be.cc b/src/python_be.cc index 361e1401..1c6c6505 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -291,7 +291,7 @@ ModelInstanceState::SaveRequestsToSharedMemory( std::unique_ptr infer_request; TRITONBACKEND_ResponseFactory* factory_ptr = nullptr; - // RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); + RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); infer_request = std::make_unique( id, correlation_id, pb_input_tensors, requested_output_names, @@ -1009,6 +1009,62 @@ ModelInstanceState::ProcessModelControlRequest( }); } +void +ModelInstanceState::SendMessageToStub( + bi::managed_external_buffer::handle_t message) +{ + Stub()->StubMessageQueue()->Push(message); +} + +void +ModelInstanceState::SendMessageAndReceiveResponse( + bi::managed_external_buffer::handle_t message, + bi::managed_external_buffer::handle_t& response, + std::shared_ptr>& responses, + TRITONBACKEND_Request** requests, const uint32_t request_count) +{ + SendMessageToStub(message); + + bi::managed_external_buffer::handle_t response_message; + auto error = Stub()->ReceiveMessageFromStub(response_message); + if (error != nullptr) { + RespondErrorToAllRequests( + TRITONSERVER_ErrorMessage(error), responses, requests, request_count); + + return; + } + + response = response_message; +} + +void +ModelInstanceState::RespondErrorToAllRequests( + const char* message, + std::shared_ptr>& responses, + TRITONBACKEND_Request** requests, const uint32_t request_count) +{ + for (uint32_t r = 0; r < request_count; ++r) { + if ((*responses)[r] == nullptr) + continue; + + std::string err_message = + std::string( + "Failed to process the 
request(s) for model instance '" + Name() + + "', message: ") + + message; + + TRITONSERVER_Error* err = + TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, err_message.c_str()); + LOG_IF_ERROR( + TRITONBACKEND_ResponseSend( + (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err), + "failed sending response"); + + (*responses)[r] = nullptr; + TRITONSERVER_ErrorDelete(err); + } +} + void ModelInstanceState::StartMonitor() { @@ -1164,6 +1220,12 @@ ModelInstanceState::ResponseSendDecoupled( SetErrorForResponseSendMessage( send_message_payload, WrapTritonErrorInSharedPtr(error), error_message); } + + if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { + std::unique_ptr< + TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> + lresponse_factory(reinterpret_cast(response_factory)); + } } TRITONSERVER_Error* @@ -1265,6 +1327,7 @@ ModelInstanceState::ProcessRequests( // bool has_gpu_output = false; std::vector requires_deferred_callback; + bool has_gpu_output = false; std::vector> shm_responses; std::vector, void*>>> gpu_output_buffers(request_count); @@ -1362,78 +1425,75 @@ ModelInstanceState::ProcessRequests( requires_deferred_callback[r] = require_deferred_callback; if (requires_deferred_callback[r]) { - // has_gpu_output = true; + has_gpu_output = true; } } - } - // If the output tensor is in GPU, there will be a second round trip // required for filling the GPU buffers provided by the main process. -// if (has_gpu_output) { -// ipc_message->Command() = -// PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers; -// gpu_buffer_helper.Complete(Stub()->ShmPool()); -// ipc_message->Args() = gpu_buffer_helper.ShmHandle(); -// SendMessageAndReceiveResponse( -// ipc_message->ShmHandle(), response_message, restart, responses, -// requests, 0); - -// bool cuda_copy = false; - -// uint32_t response_index = 0; -// for (auto& gpu_output_buffer : gpu_output_buffers) { -// for (auto& buffer_memory_pair : gpu_output_buffer) { -// auto& pb_memory = buffer_memory_pair.first; -// void* pointer = buffer_memory_pair.second; -// bool cuda_used = false; - -// if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) { -// GUARDED_RESPOND_IF_ERROR( -// responses, response_index, -// CopyBuffer( -// "Failed to copy the output tensor to buffer.", -// TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0, -// pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, -// CudaStream(), &cuda_used)); -// cuda_copy |= cuda_used; -// } else if ( -// (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) && -// pb_memory->UseCUDASharedPool() && -// (pb_memory->DataPtr() != pointer)) { -// // If the data pointer from pb_memory is not the same as the -// // pointer, it means that the Triton-provided buffer is not used -// // during tensor transfer. Instead, an intermediate buffer that uses -// // CUDA shared memory pool is used. In this case, we need to copy -// // the data from the intermediate buffer back to the Triton-provided -// // buffer. 
-// GUARDED_RESPOND_IF_ERROR( -// responses, response_index, -// CopyBuffer( -// "Failed to copy the output tensor to buffer.", -// TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), -// TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), -// pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, -// CudaStream(), &cuda_used)); -// cuda_copy |= cuda_used; -// } -// } -// response_index++; -// #ifdef TRITON_ENABLE_GPU -// if (cuda_copy) { -// cudaStreamSynchronize(stream_); -// } -// #endif // TRITON_ENABLE_GPU -// } -// } - -// bls_defer.Complete(); -// for (uint32_t r = 0; r < request_count; ++r) { -// if (requires_deferred_callback[r]) { -// shm_responses[r]->DeferredSendCallback(); -// } -// } -// } + if (has_gpu_output) { + ipc_message->Command() = + PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers; + gpu_buffer_helper.Complete(Stub()->ShmPool()); + ipc_message->Args() = gpu_buffer_helper.ShmHandle(); + bi::managed_external_buffer::handle_t response_message; + SendMessageAndReceiveResponse( + ipc_message->ShmHandle(), response_message, responses, requests, 0); + + bool cuda_copy = false; + + uint32_t response_index = 0; + for (auto& gpu_output_buffer : gpu_output_buffers) { + for (auto& buffer_memory_pair : gpu_output_buffer) { + auto& pb_memory = buffer_memory_pair.first; + void* pointer = buffer_memory_pair.second; + bool cuda_used = false; + + if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) { + GUARDED_RESPOND_IF_ERROR( + responses, response_index, + CopyBuffer( + "Failed to copy the output tensor to buffer.", + TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0, + pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, + CudaStream(), &cuda_used)); + cuda_copy |= cuda_used; + } else if ( + (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) && + pb_memory->UseCUDASharedPool() && + (pb_memory->DataPtr() != pointer)) { + // If the data pointer from pb_memory is not the same as the + // pointer, it means that the Triton-provided buffer is not used + // during tensor transfer. Instead, an intermediate buffer that uses + // CUDA shared memory pool is used. In this case, we need to copy + // the data from the intermediate buffer back to the Triton-provided + // buffer. + GUARDED_RESPOND_IF_ERROR( + responses, response_index, + CopyBuffer( + "Failed to copy the output tensor to buffer.", + TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), + TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), + pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, + CudaStream(), &cuda_used)); + cuda_copy |= cuda_used; + } + } + response_index++; +#ifdef TRITON_ENABLE_GPU + if (cuda_copy) { + cudaStreamSynchronize(stream_); + } +#endif // TRITON_ENABLE_GPU + } + } + + for (uint32_t r = 0; r < request_count; ++r) { + if (requires_deferred_callback[r]) { + shm_responses[r]->DeferredSendCallback(); + } + } + } return nullptr; // success } @@ -1575,9 +1635,6 @@ ModelInstanceState::~ModelInstanceState() if (Stub()->IsHealthy()) { // Wait for all the pending tasks to finish. thread_pool_->wait(); - // Push a dummy message to signal the thread to terminate. 
- Stub()->ParentMessageQueue()->Push(DUMMY_MESSAGE); - queue_monitor_.join(); } // Terminate stub first to allow any last messages to be received by the back // end before deallocating the queue memory diff --git a/src/python_be.h b/src/python_be.h index 59660fc4..4608298e 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -287,9 +287,6 @@ class ModelInstanceState : public BackendModelInstance { std::thread stub_to_parent_queue_monitor_; bool stub_to_parent_thread_; - // Queue monitor thread - std::thread queue_monitor_; - bool queue_monitor_thread_; std::mutex mu_; std::condition_variable cv_; std::unique_ptr received_message_; @@ -361,6 +358,20 @@ class ModelInstanceState : public BackendModelInstance { AllocatedSharedMemory& request_batch, std::shared_ptr>& responses); + void SendMessageAndReceiveResponse( + bi::managed_external_buffer::handle_t message, + bi::managed_external_buffer::handle_t& response, + std::shared_ptr>& responses, + TRITONBACKEND_Request** requests, const uint32_t request_count); + + void RespondErrorToAllRequests( + const char* message, + std::shared_ptr>& responses, + TRITONBACKEND_Request** requests, const uint32_t request_count); + + void SendMessageToStub( + bi::managed_external_buffer::handle_t message); + // Model instance stub std::unique_ptr& Stub() { return model_instance_stub_; } diff --git a/src/response_sender.cc b/src/response_sender.cc index cef4e3a7..043ef41d 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -69,7 +69,7 @@ ResponseSender::ResponseSender( ResponseSender::~ResponseSender() { - // DeleteResponseFactory(); + DeleteResponseFactory(); } void @@ -172,6 +172,10 @@ ResponseSender::Send( { bi::scoped_lock guard{send_message_payload->mu}; + // The server will destruct the response factory if the final flag is set. 
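+      // Record the deletion on this side as well so the destructor does not
+      // try to delete a factory the server has already destructed.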
+ if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { + response_factory_deleted_.exchange(true); + } stub->SendIPCUtilsMessage(ipc_message); while (!send_message_payload->is_stub_turn) { send_message_payload->cv.wait(guard); @@ -247,9 +251,6 @@ ResponseSender::Send( } } - if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - // DeleteResponseFactory(); - } } bool @@ -270,10 +271,10 @@ ResponseSender::DeleteResponseFactory() { bool already_deleted = response_factory_deleted_.exchange(true); if (!already_deleted) { - // std::unique_ptr& stub = Stub::GetOrCreateInstance(); - // stub->EnqueueCleanupId( - // reinterpret_cast(response_factory_address_), - // PYTHONSTUB_DecoupledResponseFactoryCleanup); + std::unique_ptr& stub = Stub::GetOrCreateInstance(); + stub->EnqueueCleanupId( + reinterpret_cast(response_factory_address_), + PYTHONSTUB_DecoupledResponseFactoryCleanup); } } From dfe90749108f636fd47fdfe6fc788af36896cdac Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Tue, 17 Sep 2024 15:34:31 +0000 Subject: [PATCH 4/7] Review comments --- src/ipc_message.cc | 14 +++++--- src/ipc_message.h | 8 +++-- src/pb_stub.cc | 78 +++++++++++++++++++++++++----------------- src/pb_stub.h | 3 +- src/python_be.cc | 23 ++++++++----- src/python_be.h | 9 +++-- src/response_sender.cc | 1 - 7 files changed, 80 insertions(+), 56 deletions(-) diff --git a/src/ipc_message.cc b/src/ipc_message.cc index 1b813214..2fa13ba3 100644 --- a/src/ipc_message.cc +++ b/src/ipc_message.cc @@ -57,13 +57,15 @@ IPCMessage::Create( } std::unique_ptr -IPCMessage::Create(IPCMessageShm* ipc_message_shm, - bi::managed_external_buffer::handle_t& message_handle) +IPCMessage::Create( + IPCMessageShm* ipc_message_shm, + bi::managed_external_buffer::handle_t& message_handle) { - return std::unique_ptr(new IPCMessage(ipc_message_shm, message_handle)); + return std::unique_ptr( + new IPCMessage(ipc_message_shm, message_handle)); } - AllocatedSharedMemory& +AllocatedSharedMemory& IPCMessage::GetAllocatedSharedMemory() { return ipc_message_shm_; @@ -146,7 +148,9 @@ IPCMessage::IPCMessage( ipc_message_handle_ = ipc_message_shm_.handle_; } -IPCMessage::IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle) +IPCMessage::IPCMessage( + IPCMessageShm* ipc_message_shm, + bi::managed_external_buffer::handle_t& handle) { ipc_message_handle_ = handle; ipc_message_shm_ptr_ = ipc_message_shm; diff --git a/src/ipc_message.h b/src/ipc_message.h index c7d0ae9d..c3d1472e 100644 --- a/src/ipc_message.h +++ b/src/ipc_message.h @@ -98,8 +98,8 @@ class IPCMessage { const std::unique_ptr& shm_pool, bool inline_response); - static std::unique_ptr - Create(IPCMessageShm* ipc_message_shm, + static std::unique_ptr Create( + IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& message_handle); static std::unique_ptr LoadFromSharedMemory( std::unique_ptr& shm_pool, @@ -135,7 +135,9 @@ class IPCMessage { AllocatedSharedMemory& response_mutex_shm, AllocatedSharedMemory& response_cond_shm); - IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle); + IPCMessage( + IPCMessageShm* ipc_message_shm, + bi::managed_external_buffer::handle_t& handle); }; }}}; // namespace triton::backend::python diff --git a/src/pb_stub.cc b/src/pb_stub.cc index e6c93214..4b7bffc1 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -654,7 +654,6 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) py::list py_request_list = LoadRequestsFromSharedMemory(request_batch_shm_ptr); std::unique_ptr 
execute_response; - // IPCMessage::Create(shm_pool_, false /* Inline response */); std::optional> response_batch; bool has_exception = false; @@ -675,8 +674,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) { NVTX_RANGE(nvtx_, "PyExecute " + name_); - execute_return = - model_instance_.attr("execute")(py_request_list); + execute_return = model_instance_.attr("execute")(py_request_list); bool is_coroutine = py::module::import("asyncio") .attr("iscoroutine")(execute_return) @@ -688,10 +686,12 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } else { py::object coroutine_return = RunCoroutine(execute_return, false /* in_background */); - ProcessReturnedResponses(py_request_list, coroutine_return, response_batch); + ProcessReturnedResponses( + py_request_list, coroutine_return, response_batch); } } else { - ProcessReturnedResponses(py_request_list, execute_return, response_batch); + ProcessReturnedResponses( + py_request_list, execute_return, response_batch); } } } @@ -712,11 +712,14 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) error_string; LOG_ERROR << err_message.c_str(); if (!response_batch) { - response_batch = shm_pool_->Construct(sizeof(ResponseBatch) + sizeof(IPCMessageShm)); - } - ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); + response_batch = shm_pool_->Construct( + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); + } + ResponseBatch* response_batch_shm_ptr = reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get()); + response_batch_shm_ptr = + reinterpret_cast(response_batch.value().data_.get()); response_batch_shm_ptr->has_error = true; error_string_shm = PbString::Create(shm_pool_, err_message); response_batch_shm_ptr->error = error_string_shm->ShmHandle(); @@ -732,14 +735,19 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } if (!response_batch) { - response_batch = shm_pool_->Construct(sizeof(ResponseBatch) + sizeof(IPCMessageShm)); - ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr->batch_size = 0; - } - ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); + response_batch = shm_pool_->Construct( + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); + ResponseBatch* response_batch_shm_ptr = reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); + response_batch_shm_ptr->batch_size = 0; + } + ResponseBatch* response_batch_shm_ptr = reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); response_batch_shm_ptr->has_error = false; response_batch_shm_ptr->is_error_set = false; - execute_response = IPCMessage::Create(reinterpret_cast(response_batch.value().data_.get()), response_batch.value().handle_); + execute_response = IPCMessage::Create( + reinterpret_cast(response_batch.value().data_.get()), + response_batch.value().handle_); execute_response->Args() = response_batch.value().handle_; execute_response->InlineResponse() = false; execute_response->Command() = PYTHONSTUB_ExecuteResponse; @@ -761,7 +769,8 @@ Stub::ProcessResponse(InferResponse* response) void Stub::ProcessReturnedResponses( - py::list py_requests, py::object py_responses_obj, std::optional>& response_batch) + py::list py_requests, py::object py_responses_obj, + std::optional>& 
response_batch) { // Return if there is nothing to process. if (py::isinstance(py_responses_obj)) { @@ -812,29 +821,34 @@ Stub::ProcessReturnedResponses( std::shared_ptr response = py_responses[i].cast>(); - request->GetResponseSender()->UpdateStateAndCounters(response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); + request->GetResponseSender()->UpdateStateAndCounters( + response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); } } - response_batch = std::move(shm_pool_->Construct(sizeof(IPCMessageShm) + + // Return all the created responses using response_batch. The reason + // that both of the paths are available is that sending the responses + // using response_batch is faster than using `response_sender`. + response_batch = std::move(shm_pool_->Construct( + sizeof(IPCMessageShm) + requests_size * sizeof(bi::managed_external_buffer::handle_t) + sizeof(ResponseBatch))); - ResponseBatch* response_batch_shm_ptr = - reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); + ResponseBatch* response_batch_shm_ptr = reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); bi::managed_external_buffer::handle_t* responses_shm_handle = reinterpret_cast( - response_batch.value().data_.get() + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); - - for (size_t i = 0; i < responses_size; i++) { - // Check the return type of execute function. - InferRequest* infer_request = py_requests[i].cast(); - InferResponse* infer_response = py_responses[i].cast(); - infer_response->PruneOutputTensors( - infer_request->RequestedOutputNames()); - ProcessResponse(infer_response); - responses_shm_handle[i] = infer_response->ShmHandle(); - } - response_batch_shm_ptr->batch_size = requests_size; + response_batch.value().data_.get() + sizeof(ResponseBatch) + + sizeof(IPCMessageShm)); + + for (size_t i = 0; i < responses_size; i++) { + // Check the return type of execute function. 
+ InferRequest* infer_request = py_requests[i].cast(); + InferResponse* infer_response = py_responses[i].cast(); + infer_response->PruneOutputTensors(infer_request->RequestedOutputNames()); + ProcessResponse(infer_response); + responses_shm_handle[i] = infer_response->ShmHandle(); + } + response_batch_shm_ptr->batch_size = requests_size; } py::object diff --git a/src/pb_stub.h b/src/pb_stub.h index 85a2783a..7d76ec9a 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -254,7 +254,8 @@ class Stub { void ProcessRequests(RequestBatch* request_batch_shm_ptr); void ProcessReturnedResponses( - py::list py_requests, py::object py_responses_obj, std::optional>& response_batch); + py::list py_requests, py::object py_responses_obj, + std::optional>& response_batch); void ProcessResponse(InferResponse* response); diff --git a/src/python_be.cc b/src/python_be.cc index 1c6c6505..8e78ecd7 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1023,7 +1023,7 @@ ModelInstanceState::SendMessageAndReceiveResponse( std::shared_ptr>& responses, TRITONBACKEND_Request** requests, const uint32_t request_count) { - SendMessageToStub(message); + SendMessageToStub(message); bi::managed_external_buffer::handle_t response_message; auto error = Stub()->ReceiveMessageFromStub(response_message); @@ -1224,7 +1224,8 @@ ModelInstanceState::ResponseSendDecoupled( if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { std::unique_ptr< TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> - lresponse_factory(reinterpret_cast(response_factory)); + lresponse_factory( + reinterpret_cast(response_factory)); } } @@ -1280,12 +1281,15 @@ ModelInstanceState::ProcessRequests( Stub()->StubMessageQueue()->Push(ipc_message->ShmHandle()); bi::managed_external_buffer::handle_t response_message; Stub()->ReceiveMessageFromStub(response_message); - response = IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message); + response = + IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message); } - char* ipc_message_shm = reinterpret_cast(response->GetAllocatedSharedMemory().data_.get());; + char* ipc_message_shm = + reinterpret_cast(response->GetAllocatedSharedMemory().data_.get()); + ; ResponseBatch* response_batch_shm_ptr = reinterpret_cast(ipc_message_shm + sizeof(IPCMessageShm)); - + uint64_t compute_end_ns = 0; SET_TIMESTAMP(compute_end_ns); reporter.SetComputeEndNs(compute_end_ns); @@ -1304,10 +1308,10 @@ ModelInstanceState::ProcessRequests( } if (response_batch_shm_ptr->batch_size > 0) { - std::shared_ptr> responses( - new std::vector()); + std::shared_ptr> responses( + new std::vector()); responses->reserve(request_count); - for (size_t i = 0; i < request_count; i++) { + for (size_t i = 0; i < request_count; i++) { TRITONBACKEND_Response* response; auto err = TRITONBACKEND_ResponseNew(&response, requests[i]); if (err == nullptr) { @@ -1324,7 +1328,6 @@ ModelInstanceState::ProcessRequests( // If the output provided by the model is in GPU, we will pass the list of // buffers provided by Triton to the stub process. - // bool has_gpu_output = false; std::vector requires_deferred_callback; bool has_gpu_output = false; @@ -1429,6 +1432,8 @@ ModelInstanceState::ProcessRequests( } } + execute_finalize.Complete(); + // If the output tensor is in GPU, there will be a second round trip // required for filling the GPU buffers provided by the main process. 
if (has_gpu_output) { diff --git a/src/python_be.h b/src/python_be.h index 4608298e..34871ea5 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -365,12 +365,11 @@ class ModelInstanceState : public BackendModelInstance { TRITONBACKEND_Request** requests, const uint32_t request_count); void RespondErrorToAllRequests( - const char* message, - std::shared_ptr>& responses, - TRITONBACKEND_Request** requests, const uint32_t request_count); + const char* message, + std::shared_ptr>& responses, + TRITONBACKEND_Request** requests, const uint32_t request_count); - void SendMessageToStub( - bi::managed_external_buffer::handle_t message); + void SendMessageToStub(bi::managed_external_buffer::handle_t message); // Model instance stub std::unique_ptr& Stub() { return model_instance_stub_; } diff --git a/src/response_sender.cc b/src/response_sender.cc index 043ef41d..7df90ec2 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -250,7 +250,6 @@ ResponseSender::Send( "An error occurred while sending a response."); } } - } bool From 7bf6d9f318b0cf84ef4df50c6a4195a972d2760c Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Tue, 17 Sep 2024 19:50:52 +0000 Subject: [PATCH 5/7] Fix up --- src/pb_stub.cc | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 4b7bffc1..d6e50e38 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -659,6 +659,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) bool has_exception = false; std::string error_string; std::unique_ptr error_string_shm; + std::string err_message; ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); }); ScopedDefer _( @@ -705,11 +706,10 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } if (has_exception) { - std::string err_message = - std::string( - "Failed to process the request(s) for model '" + name_ + - "', message: ") + - error_string; + err_message = std::string( + "Failed to process the request(s) for model '" + name_ + + "', message: ") + + error_string; LOG_ERROR << err_message.c_str(); if (!response_batch) { response_batch = shm_pool_->Construct( @@ -718,12 +718,11 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) ResponseBatch* response_batch_shm_ptr = reinterpret_cast( response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr = - reinterpret_cast(response_batch.value().data_.get()); response_batch_shm_ptr->has_error = true; error_string_shm = PbString::Create(shm_pool_, err_message); response_batch_shm_ptr->error = error_string_shm->ShmHandle(); response_batch_shm_ptr->is_error_set = true; + response_batch_shm_ptr->batch_size = 0; // Once the error is sent to the backend, the backend is supposed to close // all response factories if not already closed, so closing all response // senders if not already closed to prevent the model from sending more @@ -732,23 +731,25 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) InferRequest* request = py_request.cast(); request->GetResponseSender()->Close(); } - } - - if (!response_batch) { - response_batch = shm_pool_->Construct( - sizeof(ResponseBatch) + sizeof(IPCMessageShm)); + } else { + if (!response_batch) { + response_batch = shm_pool_->Construct( + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); + ResponseBatch* response_batch_shm_ptr = reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); + response_batch_shm_ptr->batch_size = 0; + } ResponseBatch* response_batch_shm_ptr = 
reinterpret_cast( response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr->batch_size = 0; + response_batch_shm_ptr->has_error = false; + response_batch_shm_ptr->is_error_set = false; } - ResponseBatch* response_batch_shm_ptr = reinterpret_cast( - response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr->has_error = false; - response_batch_shm_ptr->is_error_set = false; + execute_response = IPCMessage::Create( reinterpret_cast(response_batch.value().data_.get()), response_batch.value().handle_); - execute_response->Args() = response_batch.value().handle_; + execute_response->Args() = + response_batch.value().handle_ + sizeof(IPCMessageShm); execute_response->InlineResponse() = false; execute_response->Command() = PYTHONSTUB_ExecuteResponse; _.Complete(); From c42afe19f8a550c30f1657ac764bb780aed8de0d Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Mon, 23 Sep 2024 23:31:32 +0000 Subject: [PATCH 6/7] Fix up --- src/pb_stub.cc | 13 +++++++---- src/python_be.cc | 16 +++++++------ src/response_sender.cc | 5 ++-- src/response_sender.h | 3 +-- src/stub_launcher.cc | 53 +++--------------------------------------- src/stub_launcher.h | 3 +-- 6 files changed, 25 insertions(+), 68 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index d6e50e38..63bedd0c 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -820,8 +820,7 @@ Stub::ProcessReturnedResponses( std::string(py::str(py_responses[i].get_type())) + "'."); } - std::shared_ptr response = - py_responses[i].cast>(); + InferResponse* response = py_responses[i].cast(); request->GetResponseSender()->UpdateStateAndCounters( response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); } @@ -845,9 +844,13 @@ Stub::ProcessReturnedResponses( // Check the return type of execute function. 
InferRequest* infer_request = py_requests[i].cast(); InferResponse* infer_response = py_responses[i].cast(); - infer_response->PruneOutputTensors(infer_request->RequestedOutputNames()); - ProcessResponse(infer_response); - responses_shm_handle[i] = infer_response->ShmHandle(); + if (!py::isinstance(py_responses[i])) { + infer_response->PruneOutputTensors(infer_request->RequestedOutputNames()); + ProcessResponse(infer_response); + responses_shm_handle[i] = infer_response->ShmHandle(); + } else { + responses_shm_handle[i] = 0; + } } response_batch_shm_ptr->batch_size = requests_size; } diff --git a/src/python_be.cc b/src/python_be.cc index 8e78ecd7..b5334aa2 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1026,13 +1026,7 @@ ModelInstanceState::SendMessageAndReceiveResponse( SendMessageToStub(message); bi::managed_external_buffer::handle_t response_message; - auto error = Stub()->ReceiveMessageFromStub(response_message); - if (error != nullptr) { - RespondErrorToAllRequests( - TRITONSERVER_ErrorMessage(error), responses, requests, request_count); - - return; - } + Stub()->ReceiveMessageFromStub(response_message); response = response_message; } @@ -1355,6 +1349,14 @@ ModelInstanceState::ProcessRequests( (*responses)[r] = nullptr; continue; } + + if (response_shm_handle[r] == 0) { + LOG_IF_ERROR( + TRITONBACKEND_ResponseDelete((*responses)[r]), + "failed to delete response"); + (*responses)[r] = nullptr; + continue; + } infer_response = InferResponse::LoadFromSharedMemory( Stub()->ShmPool(), response_shm_handle[r], false /* open_cuda_handle */); diff --git a/src/response_sender.cc b/src/response_sender.cc index 7df90ec2..6639a3e3 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -74,7 +74,7 @@ ResponseSender::~ResponseSender() void ResponseSender::UpdateStateAndCounters( - const std::shared_ptr& response, const uint32_t flags) + InferResponse* response, const uint32_t flags) { if (is_decoupled_ == nullptr) { // TODO: Can a model access the response sender on a BLS infer request? @@ -106,6 +106,7 @@ ResponseSender::UpdateStateAndCounters( } if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { + response_factory_deleted_.exchange(true); closed_ = true; } number_of_response_sent_++; @@ -123,7 +124,7 @@ ResponseSender::Send( py::gil_scoped_release release; CheckResponseSenderArguments(infer_response, flags); - UpdateStateAndCounters(infer_response, flags); + UpdateStateAndCounters(infer_response.get(), flags); if (infer_response) { infer_response->PruneOutputTensors(requested_output_names_); } diff --git a/src/response_sender.h b/src/response_sender.h index 6b0258fd..6ca7e997 100644 --- a/src/response_sender.h +++ b/src/response_sender.h @@ -46,8 +46,7 @@ class ResponseSender { ~ResponseSender(); void Send(std::shared_ptr response, const uint32_t flags); bool IsCancelled(); - void UpdateStateAndCounters( - const std::shared_ptr& response, const uint32_t flags); + void UpdateStateAndCounters(InferResponse* response, const uint32_t flags); // Can be useful at stopping the model from sending any more responses. 
   void Close();
diff --git a/src/stub_launcher.cc b/src/stub_launcher.cc
index 828228e6..90834170 100644
--- a/src/stub_launcher.cc
+++ b/src/stub_launcher.cc
@@ -593,7 +593,7 @@ StubLauncher::ModelInstanceStubProcess()
   stub_message_queue_->Push(initialize_message->ShmHandle());

   bi::managed_external_buffer::handle_t message;
-  RETURN_IF_ERROR(ReceiveMessageFromStub(message));
+  ReceiveMessageFromStub(message);

   std::unique_ptr<IPCMessage> initialize_response_message =
       IPCMessage::LoadFromSharedMemory(shm_pool_, message);
@@ -724,58 +724,11 @@ StubLauncher::KillStubProcess()
 #endif
 }

-TRITONSERVER_Error*
+void
 StubLauncher::ReceiveMessageFromStub(
     bi::managed_external_buffer::handle_t& message)
 {
-  bool success = false;
-  while (!success) {
-    uint64_t timeout_miliseconds = 1000;
-    {
-      boost::posix_time::ptime timeout =
-          boost::get_system_time() +
-          boost::posix_time::milliseconds(timeout_miliseconds);
-
-      bi::scoped_lock<bi::interprocess_mutex> lock(*health_mutex_, timeout);
-
-      // Check if lock has been acquired.
-      if (lock) {
-        ipc_control_->stub_health = false;
-      } else {
-        // If it failed to obtain the lock, it means that the stub has been
-        // stuck or exited while holding the health mutex lock.
-        return TRITONSERVER_ErrorNew(
-            TRITONSERVER_ERROR_INTERNAL, "Failed to obtain the health mutex.");
-      }
-    }
-
-    message = parent_message_queue_->Pop(
-        timeout_miliseconds /* duration ms */, success);
-
-    bool is_stub_alive = false;
-    {
-      boost::posix_time::ptime timeout =
-          boost::get_system_time() + boost::posix_time::seconds(1);
-      bi::scoped_lock<bi::interprocess_mutex> lock(*health_mutex_, timeout);
-      if (lock) {
-        is_stub_alive = ipc_control_->stub_health;
-      } else {
-        // If It failed to obtain the lock, it means that the stub has been
-        // stuck or exited while holding the health mutex lock.
-        is_stub_alive = false;
-      }
-    }
-
-    if (!success && !is_stub_alive) {
-      return TRITONSERVER_ErrorNew(
-          TRITONSERVER_ERROR_INTERNAL,
-          (std::string("Stub process '") + model_instance_name_ +
-           "' is not healthy.")
-              .c_str());
-    }
-  }
-
-  return nullptr;  // success
+  message = parent_message_queue_->Pop();
 }

 void
diff --git a/src/stub_launcher.h b/src/stub_launcher.h
index 6c8dd910..714a8773 100644
--- a/src/stub_launcher.h
+++ b/src/stub_launcher.h
@@ -146,8 +146,7 @@ class StubLauncher {
   void KillStubProcess();

   // Get a message from the stub process
-  TRITONSERVER_Error* ReceiveMessageFromStub(
-      bi::managed_external_buffer::handle_t& message);
+  void ReceiveMessageFromStub(bi::managed_external_buffer::handle_t& message);

   // Wait for stub process
   void WaitForStubProcess();

From 47adab9d83b93f7e801a77da3d5fd1e7e5229490 Mon Sep 17 00:00:00 2001
From: Iman Tabrizian
Date: Tue, 24 Sep 2024 13:37:57 +0000
Subject: [PATCH 7/7] Fix response factory cleanup

---
 src/infer_request.h   |  1 +
 src/python_be.cc      | 37 ++++++++++++++++++++++++++++++-------
 src/response_sender.h |  1 +
 3 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/src/infer_request.h b/src/infer_request.h
index c67e2fb0..f368d692 100644
--- a/src/infer_request.h
+++ b/src/infer_request.h
@@ -96,6 +96,7 @@ class InferRequest {
   InferenceTrace& GetTrace();
   uint32_t ReleaseFlags();
   void SetReleaseFlags(const uint32_t& flags);
+  intptr_t GetResponseFactoryAddress() { return response_factory_address_; }

 #ifdef TRITON_PB_STUB
   std::shared_ptr<InferResponse> Exec(const bool is_decoupled);
diff --git a/src/python_be.cc b/src/python_be.cc
index b5334aa2..7cbb4f4f 100644
--- a/src/python_be.cc
+++ b/src/python_be.cc
@@ -1089,6 +1089,17 @@ ModelInstanceState::ResponseSendDecoupled(
   ResponseSendMessage* send_message_payload =
       reinterpret_cast<ResponseSendMessage*>(send_message.data_.get());
   std::unique_ptr<PbString> error_message;
+  ScopedDefer response_factory_deleter([send_message_payload] {
+    if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
+      TRITONBACKEND_ResponseFactory* response_factory =
+          reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
+              send_message_payload->response_factory_address);
+      std::unique_ptr<
+          TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
+          lresponse_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
+              response_factory));
+    }
+  });
   ScopedDefer _([send_message_payload] {
     {
       bi::scoped_lock<bi::interprocess_mutex> guard{send_message_payload->mu};
@@ -1214,13 +1225,6 @@ ModelInstanceState::ResponseSendDecoupled(
     SetErrorForResponseSendMessage(
         send_message_payload, WrapTritonErrorInSharedPtr(error), error_message);
   }
-
-  if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
-    std::unique_ptr<
-        TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
-        lresponse_factory(
-            reinterpret_cast<TRITONBACKEND_ResponseFactory*>(response_factory));
-  }
 }

 TRITONSERVER_Error*
@@ -1291,6 +1295,15 @@ ModelInstanceState::ProcessRequests(

   if (response_batch_shm_ptr->has_error) {
     if (response_batch_shm_ptr->is_error_set) {
+      for (uint32_t r = 0; r < request_count; r++) {
+        TRITONBACKEND_ResponseFactory* response_factory =
+            reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
+                pb_infer_requests[r]->GetResponseFactoryAddress());
+        std::unique_ptr<
+            TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
+            lresponse_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
+                response_factory));
+      }
       auto error = PbString::LoadFromSharedMemory(
           Stub()->ShmPool(), response_batch_shm_ptr->error);
       return TRITONSERVER_ErrorNew(
@@ -1357,6 +1370,16 @@ ModelInstanceState::ProcessRequests(
         (*responses)[r] = nullptr;
         continue;
       }
+      {
+        TRITONBACKEND_ResponseFactory* response_factory =
+            reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
+                pb_infer_requests[r]->GetResponseFactoryAddress());
+        std::unique_ptr<
+            TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
+            lresponse_factory(
+                reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
+                    response_factory));
+      }
       infer_response = InferResponse::LoadFromSharedMemory(
           Stub()->ShmPool(), response_shm_handle[r],
           false /* open_cuda_handle */);
diff --git a/src/response_sender.h b/src/response_sender.h
index 6ca7e997..7fce9dd2 100644
--- a/src/response_sender.h
+++ b/src/response_sender.h
@@ -43,6 +43,7 @@ class ResponseSender {
       const std::set<std::string>& requested_output_names,
       std::unique_ptr<SharedMemoryManager>& shm_pool,
       const std::shared_ptr<PbCancel>& pb_cancel);
+  intptr_t ResponseFactory() { return response_factory_address_; }
   ~ResponseSender();
   void Send(std::shared_ptr<InferResponse> response, const uint32_t flags);
   bool IsCancelled();
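
Note on the cleanup pattern introduced in PATCH 7/7 above: the backend keeps the response factory only as an integer address on the request, recovers the raw pointer from that address, and hands it to a std::unique_ptr with a custom deleter inside a scope guard, so the factory is released on every exit path once the final response flag is handled. The standalone sketch below illustrates that ownership idiom only; FakeFactory and FakeFactoryDeleter are hypothetical stand-ins, not TRITONBACKEND_ResponseFactory or backend::ResponseFactoryDeleter from the Triton API.

// Illustrative sketch only: FakeFactory stands in for a factory object whose
// lifetime is tracked by an integer address, as in the patch.
#include <cstdint>
#include <iostream>
#include <memory>

struct FakeFactory {
  int id;
};

// Custom deleter; the real backend would release the factory through its own
// deleter type instead of operator delete.
struct FakeFactoryDeleter {
  void operator()(FakeFactory* factory) const
  {
    std::cout << "releasing factory " << factory->id << "\n";
    delete factory;
  }
};

int main()
{
  // The request carries the factory as an integer address (compare
  // GetResponseFactoryAddress() in the patch), so recover the pointer first.
  FakeFactory* raw = new FakeFactory{1};
  const std::intptr_t address = reinterpret_cast<std::intptr_t>(raw);

  {
    // Wrapping the recovered pointer in a unique_ptr ties cleanup to scope
    // exit, mirroring the ScopedDefer + unique_ptr combination in the patch.
    std::unique_ptr<FakeFactory, FakeFactoryDeleter> guard(
        reinterpret_cast<FakeFactory*>(address));
  }  // deleter runs here, even if the enclosing scope returns early

  return 0;
}

The same reasoning is why the patch moves the deletion into a ScopedDefer at the top of ResponseSendDecoupled: scope-exit cleanup covers the error paths as well as the success path, rather than only the code after the send completes.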