Skip to content

Commit

Permalink
Refactor AMQP logic to better isolate rust AMQP code from uAMQP code. (
Browse files Browse the repository at this point in the history
…#6008)

* refactor uAMQP and Rust AMQP into separate implementations for ease of use

* Add connection support; restructured tests to fail on RUST AMQP rather than attempting to run; removed some uAMQP-only features  (#5986)

* Checkpoint of connection logic

* Started implementing Rust based Connection by pulling out uAMQP artifacts

* Implemented AMQP Connection in Rust; started API surface refactoring for Rust APIs; Refactored tests to remove some uAMQP only elements.

* Don't leak runtime context on calls

* export UUID from AMQP

* Cleaned up some more elements; reduced scope of doxygen significantly

* runtime context needs to be process global not thread global; all tests pass or fail at this point

* Merged main into branch
  • Loading branch information
LarryOsterman committed Oct 8, 2024
1 parent 150d9a3 commit bae72f2
Show file tree
Hide file tree
Showing 383 changed files with 5,665 additions and 3,984 deletions.
4 changes: 2 additions & 2 deletions .vscode/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
"*nlohmann-json*",
"eng/docs/api/assets/**/*",
"eng/CredScanSuppression.json",
"sdk/core/azure-core-amqp/vendor/**/*",
"sdk/core/azure-core-amqp/rust_amqp/azure_core_amqp/**/*",
"sdk/core/azure-core-amqp/**/vendor/**/*",
"sdk/core/azure-core-amqp/**/rust_amqp/azure_core_amqp/**/*",
"*.toml",
"sdk/storage/*/NOTICE.txt",
"sdk/tables/*/NOTICE.txt"
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[workspace]
resolver = "2"
members = [
"sdk/core/azure-core-amqp/rust_amqp/azure_core_amqp",
"sdk/core/azure-core-amqp/rust_amqp/rust_wrapper",
"sdk/core/azure-core-amqp/src/impl/rust_amqp/rust_amqp/azure_core_amqp",
"sdk/core/azure-core-amqp/src/impl/rust_amqp/rust_amqp/rust_wrapper",
]

[workspace.package]
Expand All @@ -14,7 +14,7 @@ rust-version = "1.76"


[workspace.dependencies.azure_core_amqp]
path = "sdk/core/azure-core-amqp/rust_amqp/azure_core_amqp"
path = "sdk/core/azure-core-amqp/src/impl/rust_amqp/rust_amqp/azure_core_amqp"


[workspace.dependencies]
Expand Down
1 change: 1 addition & 0 deletions cmake-modules/AzureDoxygen.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ function(generate_documentation PROJECT_NAME PROJECT_VERSION)
set(DOXYGEN_REPEAT_BRIEF NO)

doxygen_add_docs(${PROJECT_NAME}-docs
./inc ./README.md
ALL
COMMENT "Generate documentation for ${PROJECT_NAME} with Doxygen Version ${DOXYGEN_VERSION}")
endif()
Expand Down
122 changes: 100 additions & 22 deletions sdk/core/azure-core-amqp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ if (USE_UAMQP)
set(build_as_object_library ON CACHE BOOL "Produce object library" FORCE)
set(atomic_refcount ON CACHE BOOL "Use atomic refcount" FORCE)

add_subdirectory(vendor/azure-uamqp-c SYSTEM)
add_subdirectory(src/impl/uamqp/vendor/azure-uamqp-c SYSTEM)

# uAMQP specific compiler settings. Primarily used to disable warnings in the uAMQP codebase.
if (MSVC)
Expand All @@ -67,8 +67,8 @@ endif()

if (USE_RUST_AMQP)

include (${CMAKE_SOURCE_DIR}/cmake-modules/acquire_rust.cmake)
install_rustup()
# include (${CMAKE_SOURCE_DIR}/cmake-modules/acquire_rust.cmake)
# install_rustup()

include(FetchContent)
FetchContent_Declare(
Expand All @@ -79,7 +79,7 @@ if (USE_RUST_AMQP)
FetchContent_MakeAvailable(Corrosion)

# Import targets defined in a package or workspace manifest `Cargo.toml` file
corrosion_import_crate(MANIFEST_PATH ${CMAKE_CURRENT_SOURCE_DIR}/rust_amqp/rust_wrapper/Cargo.toml)
corrosion_import_crate(MANIFEST_PATH ${CMAKE_CURRENT_SOURCE_DIR}/src/impl/rust_amqp/rust_amqp/rust_wrapper/Cargo.toml)
endif()


Expand Down Expand Up @@ -123,21 +123,13 @@ set (AZURE_CORE_AMQP_HEADER
)

set(AZURE_CORE_AMQP_SOURCE
src/amqp/cancellable.cpp
src/amqp/claim_based_security.cpp
src/amqp/connection.cpp
src/amqp/connection_string_credential.cpp
src/amqp/link.cpp
src/amqp/management.cpp
src/amqp/message_receiver.cpp
src/amqp/message_sender.cpp
src/amqp/private/claims_based_security_impl.hpp
src/amqp/private/connection_impl.hpp
src/amqp/private/link_impl.hpp
src/amqp/private/management_impl.hpp
src/amqp/private/message_receiver_impl.hpp
src/amqp/private/message_sender_impl.hpp
src/amqp/private/session_impl.hpp
src/amqp/private/unique_handle.hpp
src/amqp/session.cpp
src/common/global_state.cpp
Expand All @@ -160,25 +152,111 @@ set(AZURE_CORE_AMQP_SOURCE
src/models/private/source_impl.hpp
src/models/private/target_impl.hpp
src/models/private/value_impl.hpp
src/network/amqp_header_transport.cpp
src/network/private/transport_impl.hpp
src/network/private/transport_impl.hpp
src/network/sasl_transport.cpp
src/network/socket_listener.cpp
src/network/socket_transport.cpp
src/network/tls_transport.cpp
src/network/transport.cpp
src/private/package_version.hpp
)

if (USE_UAMQP)
add_library(azure-core-amqp ${AZURE_CORE_AMQP_SOURCE} ${AZURE_CORE_AMQP_HEADER} $<TARGET_OBJECTS:uamqp>)
set(AZURE_UAMQP_SOURCE
src/impl/uamqp/amqp/cancellable.cpp
src/impl/uamqp/amqp/claim_based_security.cpp
src/impl/uamqp/amqp/connection.cpp
src/impl/uamqp/amqp/connection_string_credential.cpp
src/impl/uamqp/amqp/link.cpp
src/impl/uamqp/amqp/management.cpp
src/impl/uamqp/amqp/message_receiver.cpp
src/impl/uamqp/amqp/message_sender.cpp
src/impl/uamqp/amqp/session.cpp
src/impl/uamqp/amqp/private/claims_based_security_impl.hpp
src/impl/uamqp/amqp/private/connection_impl.hpp
src/impl/uamqp/amqp/private/link_impl.hpp
src/impl/uamqp/amqp/private/management_impl.hpp
src/impl/uamqp/amqp/private/message_receiver_impl.hpp
src/impl/uamqp/amqp/private/message_sender_impl.hpp
src/impl/uamqp/amqp/private/session_impl.hpp
# src/impl/uamqp/models/amqp_detach.cpp
# src/impl/uamqp/models/amqp_error.cpp
# src/impl/uamqp/models/amqp_header.cpp
# src/impl/uamqp/models/amqp_message.cpp
# src/impl/uamqp/models/amqp_properties.cpp
# src/impl/uamqp/models/amqp_transfer.cpp
# src/impl/uamqp/models/amqp_value.cpp
# src/impl/uamqp/models/message_source.cpp
# src/impl/uamqp/models/message_target.cpp
# src/impl/uamqp/models/messaging_values.cpp
# src/impl/uamqp/models/private/error_impl.hpp
# src/impl/uamqp/models/private/header_impl.hpp
# src/impl/uamqp/models/private/message_impl.hpp
# src/impl/uamqp/models/private/performatives
# src/impl/uamqp/models/private/properties_impl.hpp
# src/impl/uamqp/models/private/source_impl.hpp
# src/impl/uamqp/models/private/target_impl.hpp
# src/impl/uamqp/models/private/value_impl.hpp
# src/impl/uamqp/models/private/performatives/detach_impl.hpp
# src/impl/uamqp/models/private/performatives/transfer_impl.hpp
src/impl/uamqp/network/amqp_header_transport.cpp
src/impl/uamqp/network/sasl_transport.cpp
src/impl/uamqp/network/socket_listener.cpp
src/impl/uamqp/network/socket_transport.cpp
src/impl/uamqp/network/tls_transport.cpp
src/impl/uamqp/network/transport.cpp
src/impl/uamqp/network/private/transport_impl.hpp
)
endif()

if(USE_RUST_AMQP)
set(AZURE_RUST_AMQP_SOURCE
src/impl/rust_amqp/amqp/claim_based_security.cpp
src/impl/rust_amqp/amqp/connection.cpp
src/impl/rust_amqp/amqp/connection_string_credential.cpp
src/impl/rust_amqp/amqp/link.cpp
src/impl/rust_amqp/amqp/management.cpp
src/impl/rust_amqp/amqp/message_receiver.cpp
src/impl/rust_amqp/amqp/message_sender.cpp
src/impl/rust_amqp/amqp/session.cpp
src/impl/rust_amqp/amqp/private/claims_based_security_impl.hpp
src/impl/rust_amqp/amqp/private/connection_impl.hpp
src/impl/rust_amqp/amqp/private/link_impl.hpp
src/impl/rust_amqp/amqp/private/management_impl.hpp
src/impl/rust_amqp/amqp/private/message_receiver_impl.hpp
src/impl/rust_amqp/amqp/private/message_sender_impl.hpp
src/impl/rust_amqp/amqp/private/session_impl.hpp
# src/impl/rust_amqp/models/amqp_detach.cpp
# src/impl/rust_amqp/models/amqp_error.cpp
# src/impl/rust_amqp/models/amqp_header.cpp
# src/impl/rust_amqp/models/amqp_message.cpp
# src/impl/rust_amqp/models/amqp_properties.cpp
# src/impl/rust_amqp/models/amqp_transfer.cpp
# src/impl/rust_amqp/models/amqp_value.cpp
# src/impl/rust_amqp/models/message_source.cpp
# src/impl/rust_amqp/models/message_target.cpp
# src/impl/rust_amqp/models/messaging_values.cpp
# src/impl/rust_amqp/models/private/error_impl.hpp
# src/impl/rust_amqp/models/private/header_impl.hpp
# src/impl/rust_amqp/models/private/message_impl.hpp
# src/impl/rust_amqp/models/private/performatives
# src/impl/rust_amqp/models/private/properties_impl.hpp
# src/impl/rust_amqp/models/private/source_impl.hpp
# src/impl/rust_amqp/models/private/target_impl.hpp
# src/impl/rust_amqp/models/private/value_impl.hpp
# src/impl/rust_amqp/models/private/performatives/detach_impl.hpp
# src/impl/rust_amqp/models/private/performatives/transfer_impl.hpp
)

endif()

if (USE_UAMQP)
add_library(azure-core-amqp ${AZURE_CORE_AMQP_SOURCE} ${AZURE_CORE_AMQP_HEADER} ${AZURE_UAMQP_SOURCE} $<TARGET_OBJECTS:uamqp>)
elseif(USE_RUST_AMQP)
add_library(azure-core-amqp ${AZURE_CORE_AMQP_SOURCE} ${AZURE_CORE_AMQP_HEADER})
add_library(azure-core-amqp ${AZURE_CORE_AMQP_SOURCE} ${AZURE_CORE_AMQP_HEADER} ${AZURE_RUST_AMQP_SOURCE})
endif()

if (USE_UAMQP)
target_include_directories(azure-core-amqp SYSTEM PRIVATE ${UAMQP_INC_FOLDER})
target_include_directories(azure-core-amqp PRIVATE src/impl/uamqp/amqp/private src/impl/uamqp/amqp/network ${UAMQP_INC_FOLDER})
endif()

if (USE_RUST_AMQP)
target_include_directories(azure-core-amqp PRIVATE src/impl/rust_amqp/amqp/private src/impl/rust_amqp/rust_amqp/rust_wrapper)
endif()

target_include_directories(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@
#include <thread>

#if ENABLE_RUST_AMQP
#include "thread_context.hpp"
#include "runtime_context.hpp"
#endif

namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace _detail {

#if ENABLE_RUST_AMQP
extern thread_local RustThreadContext RustThreadContextInstance;
#endif

#if ENABLE_UAMQP
/**
* uAMQP and azure-c-shared-util require that the platform_init and platform_uninit
Expand Down Expand Up @@ -55,6 +51,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
std::thread m_pollingThread;
std::atomic<bool> m_activelyPolling;
bool m_stopped{false};
#elif ENABLE_RUST_AMQP
RustRuntimeContext m_runtimeContext;
#endif

public:
Expand All @@ -70,6 +68,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
void AddPollable(std::shared_ptr<Pollable> pollable);

void RemovePollable(std::shared_ptr<Pollable> pollable);
#elif ENABLE_RUST_AMQP
Azure::Core::Amqp::_detail::RustRuntimeContext* GetRuntimeContext()
{
return m_runtimeContext.GetRuntimeContext();
}
#endif

void AssertIdle()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
// Licensed under the MIT License.

#pragma once

#if ENABLE_RUST_AMQP
#include "../rust_amqp/rust_wrapper/rust_amqp_wrapper.h"
#include "../src/amqp/private/unique_handle.hpp"
#include "rust_amqp_wrapper.h"

#include <azure/core/azure_assert.hpp>

Expand All @@ -31,16 +30,23 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace

using UniqueRustRuntimeContext = Azure::Core::Amqp::_detail::UniqueHandleHelper<
Azure::Core::Amqp::_detail::RustRuntimeContext>::type;
class RustThreadContext final {

/**
* @brief Represents the an implementation of the rust multithreaded runtime.
*
* Needed to implement blocking Rust API calls.
*/
class RustRuntimeContext final {

UniqueRustRuntimeContext m_runtimeContext;

public:
RustThreadContext()
: m_runtimeContext(Azure::Core::Amqp::_detail::RustInterop::runtime_context_new())
RustRuntimeContext()
: m_runtimeContext{Azure::Core::Amqp::_detail::RustInterop::runtime_context_new()}
{
}
Azure::Core::Amqp::_detail::RustRuntimeContext* GetRuntimeContext() const noexcept

Azure::Core::Amqp::_detail::RustRuntimeContext* GetRuntimeContext()
{
return m_runtimeContext.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
namespace Azure { namespace Core { namespace Amqp { namespace _detail {
class LinkImpl;

#if ENABLE_UAMQP
enum class LinkState
{
Invalid,
Expand All @@ -48,6 +49,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
};

std::ostream& operator<<(std::ostream& stream, LinkState linkState);
#endif

enum class LinkTransferResult
{
Expand All @@ -65,11 +67,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Invalid
};

#if defined(_azure_TESTING_BUILD)

// Note that this entire class is a test hook to enable testing of the Link family of apis. It is
// not exposed to customers because there are no customer scenarios for it.
#if defined(_azure_TESTING_BUILD)
class Link;
#if ENABLE_UAMQP
class LinkImplEvents;
class LinkImplEventsImpl;

Expand All @@ -89,6 +91,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
virtual void OnLinkFlowOn(Link const& link) = 0;
virtual ~LinkEvents() = default;
};
#endif

class Link final {
public:
Expand Down Expand Up @@ -165,7 +168,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
friend class LinkImpl;
friend class LinkImplEventsImpl;
Link(std::shared_ptr<LinkImpl> impl) : m_impl{impl} {}
#if ENABLE_UAMQP
std::shared_ptr<LinkImplEvents> m_implEvents;
#endif
std::shared_ptr<LinkImpl> m_impl;
};
#endif // _azure_TESTING_BUILD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
}}}} // namespace Azure::Core::Amqp::_detail

namespace Azure { namespace Core { namespace Amqp { namespace _internal {
#if ENABLE_UAMQP
enum class MessageSendStatus
{
Invalid,
Expand All @@ -51,8 +52,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Error,
};
std::ostream& operator<<(std::ostream& stream, MessageSenderState state);

#endif
class MessageSender;
#if ENABLE_UAMQP
class MessageSenderEvents {
protected:
~MessageSenderEvents() = default;
Expand All @@ -68,6 +70,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Models::_internal::AmqpError const& error)
= 0;
};
#endif

struct MessageSenderOptions final
{
Expand Down Expand Up @@ -128,9 +131,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {

class MessageSender final {
public:
#if ENABLE_UAMQP
using MessageSendCompleteCallback
= std::function<void(MessageSendStatus sendResult, Models::AmqpValue const& deliveryState)>;

#endif
~MessageSender() noexcept;

MessageSender(MessageSender const&) = default;
Expand Down Expand Up @@ -166,6 +170,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
*/
std::uint64_t GetMaxMessageSize() const;

#if ENABLE_UAMQP
/** @brief Send a message synchronously to the target of the message sender.
*
* @param message The message to send.
Expand All @@ -176,7 +181,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
_azure_NODISCARD std::tuple<MessageSendStatus, Models::_internal::AmqpError> Send(
Models::AmqpMessage const& message,
Context const& context = {});
#elif ENABLE_RUST_AMQP
_azure_NODISCARD Models::_internal::AmqpError Send(
Models::AmqpMessage const& message,
Context const& context = {});

#endif
private:
// Half-open the message sender (does not block waiting on the Open to complete).
_azure_NODISCARD Models::_internal::AmqpError HalfOpen(Context const& context = {});
Expand Down
Loading

0 comments on commit bae72f2

Please sign in to comment.