From ffa98fdd1bc521998f79a12510e5b53f45b74330 Mon Sep 17 00:00:00 2001 From: Max Date: Tue, 29 Apr 2025 06:45:48 +0200 Subject: [PATCH 01/12] initial setup of header filers --- CMakeLists.txt | 5 ++- .../usubscription/v3/RpcClientUSubscription.h | 41 +++++++++++++++++++ .../client/usubscription/v3/USubscription.h | 31 ++++++++++++++ .../v3/RpcClientUSubscription.cpp | 18 ++++++++ 4 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h create mode 100644 include/up-cpp/client/usubscription/v3/USubscription.h create mode 100644 src/client/usubscription/v3/RpcClientUSubscription.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 256391aae..89528e261 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -48,7 +48,10 @@ endif() file(GLOB_RECURSE SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp") -add_library(${PROJECT_NAME} ${SRC_FILES}) +add_library(${PROJECT_NAME} ${SRC_FILES} + src/client/usubscription/v3/RpcClientUSubscription.cpp + include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h + include/up-cpp/client/usubscription/v3/USubscription.h) add_library(up-cpp::${PROJECT_NAME} ALIAS ${PROJECT_NAME}) target_include_directories(${PROJECT_NAME} diff --git a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h new file mode 100644 index 000000000..7d2bf43aa --- /dev/null +++ b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h @@ -0,0 +1,41 @@ +// +// Created by max on 28.04.25. +// + +#ifndef RPCCLIENTUSUBSCRIPTION_H +#define RPCCLIENTUSUBSCRIPTION_H + +#include + +#include "USubscription.h" +#include "up-cpp/communication/RpcClient.h" + +namespace uprotocol::core::usubscription::v3 { + +struct RpcClientUSubscription : USubscription { + + explicit RpcClientUSubscription(std::unique_ptr client) + : client_(std::move(client)){}; + + void default_call_option(); + + SubscriptionResponse subscribe(const SubscriptionRequest& subscription_request) override; + + UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request) override; + + FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request) override; + + NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request) override; + + NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request) override; + + FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request) override; + +private: + std::unique_ptr client_; + +}; + +} // namespace uprotocol::core::usubscription::v3 + +#endif //RPCCLIENTUSUBSCRIPTION_H diff --git a/include/up-cpp/client/usubscription/v3/USubscription.h b/include/up-cpp/client/usubscription/v3/USubscription.h new file mode 100644 index 000000000..53aafb349 --- /dev/null +++ b/include/up-cpp/client/usubscription/v3/USubscription.h @@ -0,0 +1,31 @@ +// +// Created by max on 28.04.25. +// + +#ifndef USUBSCRIPTION_H +#define USUBSCRIPTION_H +#include "RpcClientUSubscription.h" + +namespace uprotocol::core::usubscription::v3 { + + struct USubscription { + + virtual ~USubscription() = default; + + virtual SubscriptionResponse subscribe(const SubscriptionRequest& subscription_request) = 0; + + virtual UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request) = 0; + + virtual FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request) = 0; + + virtual NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request) =0 ; + + virtual NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request) = 0; + + virtual FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request) = 0; + + }; + +} // namespace uprotocol::core::usubscription::v3 + +#endif //USUBSCRIPTION_H diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp new file mode 100644 index 000000000..999e584f4 --- /dev/null +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -0,0 +1,18 @@ +// +// Created by max on 28.04.25. +// + +#include "up-cpp/client/usubscription/v3/RpcClientUSubscription.h" + +namespace uprotocol::core::usubscription::v3 { + using Payload = datamodel::builder::Payload; + + SubscriptionResponse RpcClientUSubscription::subscribe( + const SubscriptionRequest& subscription_request) { + Payload test_test(subscription_request); + auto invoke_handle = client_->invokeMethod(test_test, //TODO(max)); + return SubscriptionResponse(); + } + + } // namespace uprotocol::core::usubscription::v3 + From a980b37be50704f8afadf86a7b43f9754c249464 Mon Sep 17 00:00:00 2001 From: Lennart Becker Date: Tue, 29 Apr 2025 15:42:06 +0200 Subject: [PATCH 02/12] Update RpcClientUSubscription to protobuf singature. Add subscription method, which is currently in dev --- .../usubscription/v3/RpcClientUSubscription.h | 84 +++++++++++++++---- .../client/usubscription/v3/USubscription.h | 6 +- .../v3/RpcClientUSubscription.cpp | 71 +++++++++++++--- 3 files changed, 127 insertions(+), 34 deletions(-) diff --git a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h index 7d2bf43aa..d351dd3fa 100644 --- a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h +++ b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h @@ -1,39 +1,87 @@ -// -// Created by max on 28.04.25. -// - #ifndef RPCCLIENTUSUBSCRIPTION_H #define RPCCLIENTUSUBSCRIPTION_H #include +#include "Consumer.h" -#include "USubscription.h" +// #include "USubscription.h" #include "up-cpp/communication/RpcClient.h" namespace uprotocol::core::usubscription::v3 { -struct RpcClientUSubscription : USubscription { - - explicit RpcClientUSubscription(std::unique_ptr client) - : client_(std::move(client)){}; - +struct RpcClientUSubscription : public uSubscription { +public: + RpcClientUSubscription() = default; + ~RpcClientUSubscription() override = default; + + explicit RpcClientUSubscription(std::unique_ptr rpc_client) + : rpc_client_(std::move(rpc_client)){}; + + + void Subscribe(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::SubscriptionRequest* request, + ::uprotocol::core::usubscription::v3::SubscriptionResponse* response, + ::google::protobuf::Closure* done) override; + + void Unsubscribe(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request, + ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response, + ::google::protobuf::Closure* done) override; + + void FetchSubscriptions(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::FetchSubscriptionsRequest* request, + ::uprotocol::core::usubscription::v3::FetchSubscriptionsResponse* response, + ::google::protobuf::Closure* done) override; + + void RegisterForNotifications(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, + ::uprotocol::core::usubscription::v3::NotificationsResponse* response, + ::google::protobuf::Closure* done) override; + + void UnregisterForNotifications(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, + ::uprotocol::core::usubscription::v3::NotificationsResponse* response, + ::google::protobuf::Closure* done) override; + + void FetchSubscribers(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::FetchSubscribersRequest* request, + ::uprotocol::core::usubscription::v3::FetchSubscribersResponse* response, + ::google::protobuf::Closure* done) override; + + void Reset(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::ResetRequest* request, + ::uprotocol::core::usubscription::v3::ResetResponse* response, + ::google::protobuf::Closure* done) override; + +// Backup void default_call_option(); - SubscriptionResponse subscribe(const SubscriptionRequest& subscription_request) override; + // SubscriptionResponse subscribe(const SubscriptionRequest& subscription_request); - UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request) override; + // UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request); - FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request) override; + // FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request); - NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request) override; + // NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request); - NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request) override; + // NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request); - FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request) override; + // FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request); private: - std::unique_ptr client_; - + // RPC request + std::unique_ptr rpc_client_; + communication::RpcClient::InvokeHandle rpc_handle_; + SubscriptionResponse subscription_response_; + + // Transport + std::shared_ptr transport_; + + // URI info about the uSubscription service + client::usubscription::v3::USubscriptionUUriBuilder uSubscriptionUUriBuilder_; + + // Topic to subscribe to + const v1::UUri subscription_topic_; }; } // namespace uprotocol::core::usubscription::v3 diff --git a/include/up-cpp/client/usubscription/v3/USubscription.h b/include/up-cpp/client/usubscription/v3/USubscription.h index 53aafb349..0de39ca93 100644 --- a/include/up-cpp/client/usubscription/v3/USubscription.h +++ b/include/up-cpp/client/usubscription/v3/USubscription.h @@ -1,10 +1,6 @@ -// -// Created by max on 28.04.25. -// - #ifndef USUBSCRIPTION_H #define USUBSCRIPTION_H -#include "RpcClientUSubscription.h" +#include namespace uprotocol::core::usubscription::v3 { diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp index 999e584f4..f7d04e399 100644 --- a/src/client/usubscription/v3/RpcClientUSubscription.cpp +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -1,18 +1,67 @@ -// -// Created by max on 28.04.25. -// - #include "up-cpp/client/usubscription/v3/RpcClientUSubscription.h" +#include +#include namespace uprotocol::core::usubscription::v3 { using Payload = datamodel::builder::Payload; - SubscriptionResponse RpcClientUSubscription::subscribe( - const SubscriptionRequest& subscription_request) { - Payload test_test(subscription_request); - auto invoke_handle = client_->invokeMethod(test_test, //TODO(max)); - return SubscriptionResponse(); - } +void RpcClientUSubscription::Subscribe( + google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::SubscriptionRequest* request, + ::uprotocol::core::usubscription::v3::SubscriptionResponse* response, + ::google::protobuf::Closure* done) { + + constexpr uint16_t RESOURCE_ID_SUBSCRIBE = 0x0001; + // TODO(lennart): needs to be set + v1::UPriority priority; + std::chrono::milliseconds subscription_request_ttl; + + if ((request == nullptr) || (response == nullptr) || (done == nullptr)) { + controller->SetFailed("Invalid input parameters"); + done->Run(); + return; + } + + rpc_client_ = std::make_unique( + transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_SUBSCRIBE), + priority, subscription_request_ttl); + + auto on_response = [this, response](const auto& maybe_response) { + if (maybe_response.has_value() && + maybe_response.value().has_payload()) { + if (response->ParseFromString(maybe_response.value().payload())) { + if (response->topic().SerializeAsString() == + subscription_topic_.SerializeAsString()) { + subscription_response_ = *response; + } + } + } + }; + + try { + auto payload = datamodel::builder::Payload(request); + + rpc_handle_ = rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + + if (static_cast(rpc_handle_.isConnected()) == v1::OK) { // TODO(lennart): check if this is correct + response->CopyFrom(rpc_handle_.value); // Copy data to the response + } else { + controller->SetFailed("RPC call failed"); + } + + } catch (const std::exception& e) { + controller->SetFailed(std::string("Exception during Subscribe: ") + e.what()); + } + + done->Run(); +} - } // namespace uprotocol::core::usubscription::v3 +// Backup + // SubscriptionResponse RpcClientUSubscription::subscribe( + // const SubscriptionRequest& subscription_request) { + // Payload test_test(subscription_request); + // auto invoke_handle = client_->invokeMethod(test_test, //TODO(max)); + // return SubscriptionResponse(); + // } +} // namespace uprotocol::core::usubscription::v3 From 24a0ce43eba607ae3ce318659e882a65e546b362 Mon Sep 17 00:00:00 2001 From: Lennart Becker Date: Tue, 29 Apr 2025 16:31:01 +0200 Subject: [PATCH 03/12] Update RpcLientUSubscription --- src/client/usubscription/v3/RpcClientUSubscription.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp index f7d04e399..01486ff09 100644 --- a/src/client/usubscription/v3/RpcClientUSubscription.cpp +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -43,6 +43,9 @@ void RpcClientUSubscription::Subscribe( rpc_handle_ = rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + auto result = communication::Subscriber::subscribe( + transport_, subscription_topic_, std::move(callback)); // TODO(lennart) callback? + if (static_cast(rpc_handle_.isConnected()) == v1::OK) { // TODO(lennart): check if this is correct response->CopyFrom(rpc_handle_.value); // Copy data to the response } else { From d86189cce06543360712010340adac22a2e6d377 Mon Sep 17 00:00:00 2001 From: Lennart Becker Date: Wed, 30 Apr 2025 12:15:49 +0200 Subject: [PATCH 04/12] before using Consumer --- src/client/usubscription/v3/RpcClientUSubscription.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp index 01486ff09..dca072b15 100644 --- a/src/client/usubscription/v3/RpcClientUSubscription.cpp +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -39,7 +39,7 @@ void RpcClientUSubscription::Subscribe( }; try { - auto payload = datamodel::builder::Payload(request); + auto payload = datamodel::builder::Payload(*request); rpc_handle_ = rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); From 26e716130cdff66fa21fc1cef3d707c49a7a1928 Mon Sep 17 00:00:00 2001 From: Lennart Becker Date: Wed, 30 Apr 2025 16:27:24 +0200 Subject: [PATCH 05/12] Building without error. Test and check if functionality is correct. --- .../usubscription/v3/RpcClientUSubscription.h | 253 +++++++++++++----- .../v3/RpcClientUSubscription.cpp | 198 +++++++++++--- 2 files changed, 352 insertions(+), 99 deletions(-) diff --git a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h index d351dd3fa..399dc674c 100644 --- a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h +++ b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h @@ -1,89 +1,218 @@ +// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + #ifndef RPCCLIENTUSUBSCRIPTION_H #define RPCCLIENTUSUBSCRIPTION_H +#include +#include +#include +#include +#include #include -#include "Consumer.h" +#include -// #include "USubscription.h" -#include "up-cpp/communication/RpcClient.h" +#include namespace uprotocol::core::usubscription::v3 { +using uprotocol::core::usubscription::v3::SubscriptionRequest; +using uprotocol::core::usubscription::v3::UnsubscribeRequest; +using uprotocol::core::usubscription::v3::Update; +using uprotocol::core::usubscription::v3::uSubscription; + +/** + * @struct RpcClientUSubscriptionOptions + * @brief Additional details for uSubscription service. + * + * Each member represents an optional parameter for the uSubscription service. + */ +struct RpcClientUSubscriptionOptions { + /// Permission level of the subscription request + std::optional permission_level; + /// TAP token for access. + std::optional token; + /// Expiration time of the subscription. + std::optional when_expire; + /// Sample period for the subscription messages in milliseconds. + std::optional sample_period_ms; + /// Details of the subscriber. + std::optional subscriber_details; + /// Details of the subscription. + std::optional subscription_details; +}; -struct RpcClientUSubscription : public uSubscription { -public: - RpcClientUSubscription() = default; - ~RpcClientUSubscription() override = default; - - explicit RpcClientUSubscription(std::unique_ptr rpc_client) - : rpc_client_(std::move(rpc_client)){}; +/// @struct uSubscriptionUUriBuilder +/// @brief Structure to build uSubscription request URIs. +/// +/// This structure is used to build URIs for uSubscription service. It uses the +/// service options from uSubscription proto to set the authority name, ue_id, +/// ue_version_major, and the notification topic resource ID in the URI. +struct USubscriptionUUriBuilder { +private: + /// URI for the uSubscription service + v1::UUri uri_; + /// Resource ID of the notification topic + uint32_t sink_resource_id_; - - void Subscribe(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::SubscriptionRequest* request, - ::uprotocol::core::usubscription::v3::SubscriptionResponse* response, - ::google::protobuf::Closure* done) override; +public: + /// @brief Constructor for USubscriptionUUriBuilder. + USubscriptionUUriBuilder() { + // Get the service descriptor + const google::protobuf::ServiceDescriptor* service = + uSubscription::descriptor(); + const auto& service_options = service->options(); + + // Get the service options + const auto& service_name = + service_options.GetExtension(uprotocol::service_name); + const auto& service_version_major = + service_options.GetExtension(uprotocol::service_version_major); + const auto& service_id = + service_options.GetExtension(uprotocol::service_id); + const auto& notification_topic = + service_options.GetExtension(uprotocol::notification_topic, 0); + + // Set the values in the URI + uri_.set_authority_name(service_name); + uri_.set_ue_id(service_id); + uri_.set_ue_version_major(service_version_major); + sink_resource_id_ = notification_topic.id(); + } + + /// @brief Get the URI with a specific resource ID. + /// + /// @param resource_id The resource ID to set in the URI. + /// + /// @return The URI with the specified resource ID. + v1::UUri getServiceUriWithResourceId(uint32_t resource_id) const { + v1::UUri uri = uri_; // Copy the base URI + uri.set_resource_id(resource_id); + return uri; + } + + /// @brief Get the notification URI. + /// + /// @return The notification URI. + v1::UUri getNotificationUri() const { + v1::UUri uri = uri_; // Copy the base URI + uri.set_resource_id(sink_resource_id_); + return uri; + } +}; +/// @brief Interface for uEntities to create subscriptions. +/// +/// Like all L3 client APIs, the RpcClientUSubscription is a wrapper on top of the +/// L2 Communication APIs and USubscription service. +struct RpcClientUSubscription : public uSubscription{ + using RpcClientUSubscriptionOrStatus = + utils::Expected, v1::UStatus>; + using ListenCallback = transport::UTransport::ListenCallback; + using ListenHandle = transport::UTransport::ListenHandle; + using SubscriptionResponse = core::usubscription::v3::SubscriptionResponse; + + /// @brief Create a subscription + /// + /// @param transport Transport to register with. + /// @param subscription_topic Topic to subscribe to. + /// @param callback Function that is called when publish message is + /// received. + /// @param priority Priority of the subscription request. + /// @param subscribe_request_ttl Time to live for the subscription request. + /// @param rpc_client_usubscription_options Additional details for uSubscription service. + // [[nodiscard]] static RpcClientUSubscriptionOrStatus create( + // std::shared_ptr transport, + // const v1::UUri& subscription_topic, ListenCallback&& callback, + // v1::UPriority priority, + // std::chrono::milliseconds subscription_request_ttl, + // RpcClientUSubscriptionOptions rpc_client_usubscription_options); + + /// @brief Unsubscribe from the topic and call uSubscription service to + /// close the subscription. void Unsubscribe(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request, - ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response, - ::google::protobuf::Closure* done) override; - - void FetchSubscriptions(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::FetchSubscriptionsRequest* request, - ::uprotocol::core::usubscription::v3::FetchSubscriptionsResponse* response, - ::google::protobuf::Closure* done) override; + const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request, + ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response, + ::google::protobuf::Closure* done) override; - void RegisterForNotifications(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, - ::uprotocol::core::usubscription::v3::NotificationsResponse* response, - ::google::protobuf::Closure* done) override; + /// @brief getter for subscription update + /// + /// @return subscription update + Update getSubscriptionUpdate() const { return subscription_update_; } - void UnregisterForNotifications(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, - ::uprotocol::core::usubscription::v3::NotificationsResponse* response, - ::google::protobuf::Closure* done) override; + /// @brief Destructor + ~RpcClientUSubscription() = default; - void FetchSubscribers(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::FetchSubscribersRequest* request, - ::uprotocol::core::usubscription::v3::FetchSubscribersResponse* response, - ::google::protobuf::Closure* done) override; + /// This section for test code only delete later - void Reset(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::ResetRequest* request, - ::uprotocol::core::usubscription::v3::ResetResponse* response, - ::google::protobuf::Closure* done) override; +protected: + /// @brief Constructor + /// + /// @param transport Transport to register with. + /// @param subscriber_details Additional details about the subscriber. + RpcClientUSubscription(std::shared_ptr transport, + v1::UUri subscription_topic, + RpcClientUSubscriptionOptions rpc_client_usubscription_options = {}); -// Backup - void default_call_option(); - - // SubscriptionResponse subscribe(const SubscriptionRequest& subscription_request); - - // UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request); - - // FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request); +private: + // Transport + std::shared_ptr transport_; - // NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request); + // Topic to subscribe to + const v1::UUri subscription_topic_; + // Additional details about uSubscription service + RpcClientUSubscriptionOptions rpc_client_usubscription_options_; - // NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request); + // URI info about the uSubscription service + USubscriptionUUriBuilder uSubscriptionUUriBuilder_; - // FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request); + // Subscription updates + std::unique_ptr noficationSinkHandle_; + Update subscription_update_; -private: // RPC request std::unique_ptr rpc_client_; communication::RpcClient::InvokeHandle rpc_handle_; SubscriptionResponse subscription_response_; - - // Transport - std::shared_ptr transport_; - - // URI info about the uSubscription service - client::usubscription::v3::USubscriptionUUriBuilder uSubscriptionUUriBuilder_; - - // Topic to subscribe to - const v1::UUri subscription_topic_; + UnsubscribeResponse unsubscribe_response_; + + // L2 Subscriber details + std::unique_ptr subscriber_; + + // Allow the protected constructor for this class to be used in make_unique + // inside of create() + friend std::unique_ptr + std::make_unique, + const uprotocol::v1::UUri, + uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions>( + std::shared_ptr&&, + const uprotocol::v1::UUri&&, + uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions&&); + + /// @brief Build SubscriptionRequest for subscription request + SubscriptionRequest buildSubscriptionRequest(); + + /// @brief Build UnsubscriptionRequest for unsubscription request + UnsubscribeRequest buildUnsubscriptionRequest(); + + /// @brief Create a notification sink to receive subscription updates + v1::UStatus createNotificationSink(); + + /// @brief Subscribe to the topic + /// + void Subscribe(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::SubscriptionRequest* request, + ::uprotocol::core::usubscription::v3::SubscriptionResponse* response, + ::google::protobuf::Closure* done) override; }; -} // namespace uprotocol::core::usubscription::v3 +} // namespace uprotocol::core::usubscription::v3 -#endif //RPCCLIENTUSUBSCRIPTION_H +#endif // RPCCLIENTUSUBSCRIPTION_H \ No newline at end of file diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp index dca072b15..24ed9a6ad 100644 --- a/src/client/usubscription/v3/RpcClientUSubscription.cpp +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -1,9 +1,97 @@ -#include "up-cpp/client/usubscription/v3/RpcClientUSubscription.h" -#include -#include +// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include +#include + +#include namespace uprotocol::core::usubscription::v3 { - using Payload = datamodel::builder::Payload; + +void someCallBack(const uprotocol::v1::UMessage& message) { + // Print the message + std::cout << message.DebugString() << std::endl; +} + +RpcClientUSubscription::RpcClientUSubscription(std::shared_ptr transport, + v1::UUri subscription_topic, + RpcClientUSubscriptionOptions rpc_client_usubscription_options) + : transport_(std::move(transport)), + subscription_topic_(std::move(subscription_topic)), + rpc_client_usubscription_options_(std::move(rpc_client_usubscription_options)), + rpc_client_(nullptr) { + // Initialize uSubscriptionUUriBuilder_ + uSubscriptionUUriBuilder_ = USubscriptionUUriBuilder(); +} + +// [[nodiscard]] RpcClientUSubscription::RpcClientUSubscriptionOrStatus RpcClientUSubscription::create( +// std::shared_ptr transport, +// const v1::UUri& subscription_topic, ListenCallback&& callback, +// v1::UPriority priority, std::chrono::milliseconds subscription_request_ttl, +// RpcClientUSubscriptionOptions rpc_client_usubscription_options) { +// auto rpc_client_usubscription = std::make_unique( +// std::forward>(transport), +// std::forward(subscription_topic), +// std::forward(rpc_client_usubscription_options)); + +// // Attempt to connect create notification sink for updates. +// auto status = rpc_client_usubscription->createNotificationSink(); +// if (status.code() == v1::UCode::OK) { +// status = rpc_client_usubscription->subscribe(priority, subscription_request_ttl, +// std::move(callback)); +// if (status.code() == v1::UCode::OK) { +// return RpcClientUSubscriptionOrStatus(std::move(rpc_client_usubscription)); +// } +// return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); +// } +// // If connection fails, return the error status. +// return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); +// } + +v1::UStatus RpcClientUSubscription::createNotificationSink() { + auto notification_sink_callback = [this](const v1::UMessage& update) { + if (update.has_payload()) { + Update data; + if (data.ParseFromString(update.payload())) { + if (data.topic().SerializeAsString() == + subscription_topic_.SerializeAsString()) { + subscription_update_ = std::move(data); + } + } + } + }; + + auto notification_topic = uSubscriptionUUriBuilder_.getNotificationUri(); + + auto result = communication::NotificationSink::create( + transport_, std::move(notification_sink_callback), notification_topic); + + if (result.has_value()) { + noficationSinkHandle_ = std::move(result).value(); + v1::UStatus status; + status.set_code(v1::UCode::OK); + return status; + } + return result.error(); +} + +SubscriptionRequest RpcClientUSubscription::buildSubscriptionRequest() { + auto attributes = utils::ProtoConverter::BuildSubscribeAttributes( + rpc_client_usubscription_options_.when_expire, rpc_client_usubscription_options_.subscription_details, + rpc_client_usubscription_options_.sample_period_ms); + + auto subscription_request = utils::ProtoConverter::BuildSubscriptionRequest( + subscription_topic_, attributes); + return subscription_request; +} void RpcClientUSubscription::Subscribe( google::protobuf::RpcController* controller, @@ -12,59 +100,95 @@ void RpcClientUSubscription::Subscribe( ::google::protobuf::Closure* done) { constexpr uint16_t RESOURCE_ID_SUBSCRIBE = 0x0001; - // TODO(lennart): needs to be set - v1::UPriority priority; - std::chrono::milliseconds subscription_request_ttl; + constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? + auto subscription_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); + auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; // TODO(lennart) priority + + auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); - if ((request == nullptr) || (response == nullptr) || (done == nullptr)) { - controller->SetFailed("Invalid input parameters"); - done->Run(); - return; - } rpc_client_ = std::make_unique( transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_SUBSCRIBE), priority, subscription_request_ttl); - + auto on_response = [this, response](const auto& maybe_response) { if (maybe_response.has_value() && - maybe_response.value().has_payload()) { + maybe_response.value().has_payload()) { if (response->ParseFromString(maybe_response.value().payload())) { if (response->topic().SerializeAsString() == - subscription_topic_.SerializeAsString()) { + subscription_topic_.SerializeAsString()) { subscription_response_ = *response; } } } }; - try { - auto payload = datamodel::builder::Payload(*request); + // SubscriptionRequest const subscription_request = buildSubscriptionRequest(); + auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct, has been subscription_request before - rpc_handle_ = rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + rpc_handle_ = + rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); - auto result = communication::Subscriber::subscribe( - transport_, subscription_topic_, std::move(callback)); // TODO(lennart) callback? + // response->CopyFrom(rpc_handle_.value); - if (static_cast(rpc_handle_.isConnected()) == v1::OK) { // TODO(lennart): check if this is correct - response->CopyFrom(rpc_handle_.value); // Copy data to the response - } else { - controller->SetFailed("RPC call failed"); - } - - } catch (const std::exception& e) { - controller->SetFailed(std::string("Exception during Subscribe: ") + e.what()); - } + auto subscription_callback = someCallBack; + // Create a L2 subscription + auto result = communication::Subscriber::subscribe( + transport_, subscription_topic_, std::move(subscription_callback)); + if (result.has_value()) { + subscriber_ = std::move(result).value(); + v1::UStatus status; + status.set_code(v1::UCode::OK); + // return status; + done->Run(); + } + controller->SetFailed("result.error()"); done->Run(); } -// Backup - // SubscriptionResponse RpcClientUSubscription::subscribe( - // const SubscriptionRequest& subscription_request) { - // Payload test_test(subscription_request); - // auto invoke_handle = client_->invokeMethod(test_test, //TODO(max)); - // return SubscriptionResponse(); - // } +UnsubscribeRequest RpcClientUSubscription::buildUnsubscriptionRequest() { + auto unsubscribe_request = + utils::ProtoConverter::BuildUnSubscribeRequest(subscription_topic_); + return unsubscribe_request; +} + +void RpcClientUSubscription::Unsubscribe( + google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request, + ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response, + ::google::protobuf::Closure* done) { + + constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? + constexpr uint16_t RESOURCE_ID_UNSUBSCRIBE = 0x0002; + auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); + auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; // TODO(lennart) priority + + rpc_client_ = std::make_unique( + transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_UNSUBSCRIBE), + priority, request_ttl); + + auto on_response = [this, response](const auto& maybe_response) { + if (maybe_response.has_value() && + maybe_response.value().has_payload()) { + if (response->ParseFromString(maybe_response.value().payload())) { + // if (response->topic().SerializeAsString() == // TODO(lennart) see if this check is somehow possible + // subscription_topic_.SerializeAsString()) { + unsubscribe_response_ = *response; + // } + } + } + }; + + // UnsubscribeRequest const unsubscribe_request = buildUnsubscriptionRequest(); + auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct, has been subscription_request before + + rpc_handle_ = + rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + + subscriber_.reset(); + + done->Run(); +} -} // namespace uprotocol::core::usubscription::v3 +} // namespace uprotocol::core::usubscription::v3 From 9eeeb021047c3d92f5e960768c591907baa4971d Mon Sep 17 00:00:00 2001 From: Lennart Becker Date: Fri, 2 May 2025 13:15:09 +0200 Subject: [PATCH 06/12] Added comment --- src/client/usubscription/v3/RpcClientUSubscription.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp index 24ed9a6ad..b3492ad11 100644 --- a/src/client/usubscription/v3/RpcClientUSubscription.cpp +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -131,7 +131,7 @@ void RpcClientUSubscription::Subscribe( // response->CopyFrom(rpc_handle_.value); - auto subscription_callback = someCallBack; + auto subscription_callback = someCallBack; // TODO(lennart) update with correct callback // Create a L2 subscription auto result = communication::Subscriber::subscribe( transport_, subscription_topic_, std::move(subscription_callback)); @@ -143,7 +143,9 @@ void RpcClientUSubscription::Subscribe( // return status; done->Run(); } + controller->SetFailed("result.error()"); + done->Run(); } From b09d890c030061cda15238a8a3577e194047e9d1 Mon Sep 17 00:00:00 2001 From: Lennart Becker Date: Fri, 2 May 2025 16:51:25 +0200 Subject: [PATCH 07/12] =?UTF-8?q?Done=20for=20today.=20No=20major=20change?= =?UTF-8?q?s.=20Waiting=20for=20Pete=C2=B4s=20answer.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/client/usubscription/v3/RpcClientUSubscription.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp index b3492ad11..a38cc6e0d 100644 --- a/src/client/usubscription/v3/RpcClientUSubscription.cpp +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -10,6 +10,7 @@ // SPDX-License-Identifier: Apache-2.0 #include +#include #include #include @@ -100,11 +101,12 @@ void RpcClientUSubscription::Subscribe( ::google::protobuf::Closure* done) { constexpr uint16_t RESOURCE_ID_SUBSCRIBE = 0x0001; + // TODO(lennart) see default_call_options() for the request in Rust constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? auto subscription_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; // TODO(lennart) priority - auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); + auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); // Might serve as default_call_options() in Rust rpc_client_ = std::make_unique( From 994924beef937bca4ddfb49b62afdbeeacfaf7bc Mon Sep 17 00:00:00 2001 From: Lennart Becker Date: Mon, 5 May 2025 11:32:07 +0200 Subject: [PATCH 08/12] Added missing methods for RpcClientUSubscription --- .../usubscription/v3/RpcClientUSubscription.h | 31 +++- .../v3/RpcClientUSubscription.cpp | 169 ++++++++++++++++-- 2 files changed, 189 insertions(+), 11 deletions(-) diff --git a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h index 399dc674c..6e3fedef7 100644 --- a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h +++ b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h @@ -117,7 +117,6 @@ struct RpcClientUSubscription : public uSubscription{ utils::Expected, v1::UStatus>; using ListenCallback = transport::UTransport::ListenCallback; using ListenHandle = transport::UTransport::ListenHandle; - using SubscriptionResponse = core::usubscription::v3::SubscriptionResponse; /// @brief Create a subscription /// @@ -141,6 +140,32 @@ struct RpcClientUSubscription : public uSubscription{ const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request, ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response, ::google::protobuf::Closure* done) override; + + /// @brief Fetch all subscriptions for a given topic or subscriber contained inside a [`FetchSubscriptionsRequest`] + void FetchSubscriptions(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::FetchSubscriptionsRequest* request, + ::uprotocol::core::usubscription::v3::FetchSubscriptionsResponse* response, + ::google::protobuf::Closure* done) override; + + /// @brief Register for notifications relevant to a given topic inside a [`NotificationsRequest`] + /// changing in subscription status. + void RegisterForNotifications(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, + ::uprotocol::core::usubscription::v3::NotificationsResponse* response, + ::google::protobuf::Closure* done) override; + + /// @brief Unregister for notifications relevant to a given topic inside a [`NotificationsRequest`] + /// changing in subscription status. + void UnregisterForNotifications(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, + ::uprotocol::core::usubscription::v3::NotificationsResponse* response, + ::google::protobuf::Closure* done) override; + + /// @brief Fetch a list of subscribers that are currently subscribed to a given topic in a [`FetchSubscribersRequest`] + void FetchSubscribers(google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::FetchSubscribersRequest* request, + ::uprotocol::core::usubscription::v3::FetchSubscribersResponse* response, + ::google::protobuf::Closure* done) override; /// @brief getter for subscription update /// @@ -180,8 +205,12 @@ struct RpcClientUSubscription : public uSubscription{ // RPC request std::unique_ptr rpc_client_; communication::RpcClient::InvokeHandle rpc_handle_; + SubscriptionResponse subscription_response_; UnsubscribeResponse unsubscribe_response_; + FetchSubscriptionsResponse fetch_subscription_response_; + NotificationsResponse notification_response_; + FetchSubscribersResponse fetch_subscribers_response_; // L2 Subscriber details std::unique_ptr subscriber_; diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp index a38cc6e0d..2aa45f073 100644 --- a/src/client/usubscription/v3/RpcClientUSubscription.cpp +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -104,7 +104,7 @@ void RpcClientUSubscription::Subscribe( // TODO(lennart) see default_call_options() for the request in Rust constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? auto subscription_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); - auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; // TODO(lennart) priority + auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); // Might serve as default_call_options() in Rust @@ -129,9 +129,7 @@ void RpcClientUSubscription::Subscribe( auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct, has been subscription_request before rpc_handle_ = - rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); - - // response->CopyFrom(rpc_handle_.value); + rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); auto subscription_callback = someCallBack; // TODO(lennart) update with correct callback // Create a L2 subscription @@ -166,7 +164,7 @@ void RpcClientUSubscription::Unsubscribe( constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? constexpr uint16_t RESOURCE_ID_UNSUBSCRIBE = 0x0002; auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); - auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; // TODO(lennart) priority + auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; rpc_client_ = std::make_unique( transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_UNSUBSCRIBE), @@ -176,17 +174,16 @@ void RpcClientUSubscription::Unsubscribe( if (maybe_response.has_value() && maybe_response.value().has_payload()) { if (response->ParseFromString(maybe_response.value().payload())) { - // if (response->topic().SerializeAsString() == // TODO(lennart) see if this check is somehow possible - // subscription_topic_.SerializeAsString()) { + if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe + subscription_topic_.SerializeAsString()) { unsubscribe_response_ = *response; - // } + } } } }; // UnsubscribeRequest const unsubscribe_request = buildUnsubscriptionRequest(); - auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct, has been subscription_request before - + auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct rpc_handle_ = rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); @@ -195,4 +192,156 @@ void RpcClientUSubscription::Unsubscribe( done->Run(); } +void RpcClientUSubscription::FetchSubscriptions( + google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::FetchSubscriptionsRequest* request, + ::uprotocol::core::usubscription::v3::FetchSubscriptionsResponse* response, + ::google::protobuf::Closure* done) { + + constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? + constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIPTIONS = 0x0003; + auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); + auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; + + rpc_client_ = std::make_unique( + transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_FETCH_SUBSCRIPTIONS), + priority, request_ttl); + + auto on_response = [this, response](const auto& maybe_response) { + if (maybe_response.has_value() && + maybe_response.value().has_payload()) { + if (response->ParseFromString(maybe_response.value().payload())) { + if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe + subscription_topic_.SerializeAsString()) { + fetch_subscription_response_ = *response; + } + } + } + }; + + // FetchSubscriptionsRequest const fetch_subscriptions_request = buildFetchSubscriptionsRequest(); + auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + + rpc_handle_ = + rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + + // TODO(lennart) any handle for the response? + + done->Run(); +} + +void RpcClientUSubscription::RegisterForNotifications( + google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, + ::uprotocol::core::usubscription::v3::NotificationsResponse* response, + ::google::protobuf::Closure* done) { + + constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? + constexpr uint16_t RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS = 0x0006; + auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); + auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; + + rpc_client_ = std::make_unique( + transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS), + priority, request_ttl); + + auto on_response = [this, response](const auto& maybe_response) { + if (maybe_response.has_value() && + maybe_response.value().has_payload()) { + if (response->ParseFromString(maybe_response.value().payload())) { + if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe + subscription_topic_.SerializeAsString()) { + notification_response_ = *response; + } + } + } + }; + + // NotificationsRequest const register_notifications_request = buildRegisterNotificationsRequest(); + auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + + rpc_handle_ = + rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + + // TODO(lennart) any handle for the response? + + done->Run(); +} + +void RpcClientUSubscription::UnregisterForNotifications( + google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, + ::uprotocol::core::usubscription::v3::NotificationsResponse* response, + ::google::protobuf::Closure* done) { + + constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? + constexpr uint16_t RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS = 0x0007; + auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); + auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; + + rpc_client_ = std::make_unique( + transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS), + priority, request_ttl); + + auto on_response = [this, response](const auto& maybe_response) { + if (maybe_response.has_value() && + maybe_response.value().has_payload()) { + if (response->ParseFromString(maybe_response.value().payload())) { + if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe + subscription_topic_.SerializeAsString()) { + notification_response_ = *response; + } + } + } + }; + + // NotificationsRequest const unregister_notifications_request = buildUnregisterNotificationsRequest(); + auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + + rpc_handle_ = + rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + + // TODO(lennart) any handle for the response? + + done->Run(); +} + +void RpcClientUSubscription::FetchSubscribers( + google::protobuf::RpcController* controller, + const ::uprotocol::core::usubscription::v3::FetchSubscribersRequest* request, + ::uprotocol::core::usubscription::v3::FetchSubscribersResponse* response, + ::google::protobuf::Closure* done) { + + constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? + constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIBERS = 0x0008; + auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); + auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; + + rpc_client_ = std::make_unique( + transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_FETCH_SUBSCRIBERS), + priority, request_ttl); + + auto on_response = [this, response](const auto& maybe_response) { + if (maybe_response.has_value() && + maybe_response.value().has_payload()) { + if (response->ParseFromString(maybe_response.value().payload())) { + if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe + subscription_topic_.SerializeAsString()) { + fetch_subscribers_response_ = *response; + } + } + } + }; + + // FetchSubscribersRequest const fetch_subscribers_request = buildFetchSubscribersRequest(); + auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + + rpc_handle_ = + rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + + // TODO(lennart) any handle for the response? + + done->Run(); +} + } // namespace uprotocol::core::usubscription::v3 From bf20377c1b9922cfcc7cccd332dac5be73f93b39 Mon Sep 17 00:00:00 2001 From: Lennart Becker Date: Mon, 5 May 2025 12:52:45 +0200 Subject: [PATCH 09/12] Readded create-Method in RpcClientUSubscription. Created first MockTest --- .../usubscription/v3/RpcClientUSubscription.h | 10 ++-- .../v3/RpcClientUSubscription.cpp | 51 ++++++++++--------- test/CMakeLists.txt | 3 ++ 3 files changed, 34 insertions(+), 30 deletions(-) diff --git a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h index 6e3fedef7..af3052c4b 100644 --- a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h +++ b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h @@ -127,12 +127,10 @@ struct RpcClientUSubscription : public uSubscription{ /// @param priority Priority of the subscription request. /// @param subscribe_request_ttl Time to live for the subscription request. /// @param rpc_client_usubscription_options Additional details for uSubscription service. - // [[nodiscard]] static RpcClientUSubscriptionOrStatus create( - // std::shared_ptr transport, - // const v1::UUri& subscription_topic, ListenCallback&& callback, - // v1::UPriority priority, - // std::chrono::milliseconds subscription_request_ttl, - // RpcClientUSubscriptionOptions rpc_client_usubscription_options); + [[nodiscard]] static RpcClientUSubscriptionOrStatus create( + std::shared_ptr transport, + const v1::UUri& subscription_topic, ListenCallback&& callback, + RpcClientUSubscriptionOptions rpc_client_usubscription_options); /// @brief Unsubscribe from the topic and call uSubscription service to /// close the subscription. diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp index 2aa45f073..abc325bf5 100644 --- a/src/client/usubscription/v3/RpcClientUSubscription.cpp +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -33,29 +33,32 @@ RpcClientUSubscription::RpcClientUSubscription(std::shared_ptr transport, -// const v1::UUri& subscription_topic, ListenCallback&& callback, -// v1::UPriority priority, std::chrono::milliseconds subscription_request_ttl, -// RpcClientUSubscriptionOptions rpc_client_usubscription_options) { -// auto rpc_client_usubscription = std::make_unique( -// std::forward>(transport), -// std::forward(subscription_topic), -// std::forward(rpc_client_usubscription_options)); - -// // Attempt to connect create notification sink for updates. -// auto status = rpc_client_usubscription->createNotificationSink(); -// if (status.code() == v1::UCode::OK) { -// status = rpc_client_usubscription->subscribe(priority, subscription_request_ttl, -// std::move(callback)); -// if (status.code() == v1::UCode::OK) { -// return RpcClientUSubscriptionOrStatus(std::move(rpc_client_usubscription)); -// } -// return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); -// } -// // If connection fails, return the error status. -// return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); -// } +[[nodiscard]] RpcClientUSubscription::RpcClientUSubscriptionOrStatus RpcClientUSubscription::create( + std::shared_ptr transport, + const v1::UUri& subscription_topic, ListenCallback&& callback, + RpcClientUSubscriptionOptions rpc_client_usubscription_options) { + auto rpc_client_usubscription = std::make_unique( + std::forward>(transport), + std::forward(subscription_topic), + std::forward(rpc_client_usubscription_options)); + + google::protobuf::RpcController *controller = nullptr; + ::uprotocol::core::usubscription::v3::SubscriptionRequest const *subscription_request = nullptr; + SubscriptionResponse *subscription_response = nullptr; + + // Attempt to connect create notification sink for updates. + auto status = rpc_client_usubscription->createNotificationSink(); + if (status.code() == v1::UCode::OK) { + rpc_client_usubscription->Subscribe(controller, subscription_request, + subscription_response, nullptr); + if (status.code() == v1::UCode::OK) { + return RpcClientUSubscriptionOrStatus(std::move(rpc_client_usubscription)); + } + return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); + } + // If connection fails, return the error status. + return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); +} v1::UStatus RpcClientUSubscription::createNotificationSink() { auto notification_sink_callback = [this](const v1::UMessage& update) { @@ -332,7 +335,7 @@ void RpcClientUSubscription::FetchSubscribers( } } }; - + // FetchSubscribersRequest const fetch_subscribers_request = buildFetchSubscribersRequest(); auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index bd7e6d67f..56c122710 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -84,6 +84,9 @@ add_coverage_test("NotificationSourceTest" coverage/communication/NotificationSo # client add_coverage_test("ConsumerTest" coverage/client/usubscription/v3/ConsumerTest.cpp) +# core +add_coverage_test("RpcClientUSubscriptionTest" coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp) + ########################## EXTRAS ############################################# add_extra_test("PublisherSubscriberTest" extra/PublisherSubscriberTest.cpp) add_extra_test("NotificationTest" extra/NotificationTest.cpp) From 638aa0bc3e85b72b2799ae00765d199f62ae1c2b Mon Sep 17 00:00:00 2001 From: Lennart Becker Date: Mon, 5 May 2025 12:53:27 +0200 Subject: [PATCH 10/12] Readded create-Method in RpcClientUSubscription. Created first MockTest --- .../v3/RpcClientUSubscriptionTest.cpp | 223 ++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp diff --git a/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp b/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp new file mode 100644 index 000000000..8ed25476f --- /dev/null +++ b/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp @@ -0,0 +1,223 @@ +// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include +#include +#include +#include + +#include + +#include "UTransportMock.h" + +namespace { +using MsgDiff = google::protobuf::util::MessageDifferencer; + +void someCallBack(const uprotocol::v1::UMessage& message) { + // Print the message + std::cout << message.DebugString() << std::endl; +} + +class RpcClientUSubscriptionTest : public testing::Test { +private: + std::shared_ptr mockTransportClient_; + std::shared_ptr mockTransportServer_; + uprotocol::v1::UUri client_uuri; + uprotocol::v1::UUri server_uuri; + uprotocol::v1::UUri subscription_uuri; + +protected: + // Run once per TEST_F. + // Used to set up clean environments per test. + + std::shared_ptr getMockTransportClient() + const { + return mockTransportClient_; + } + std::shared_ptr getMockTransportServer() + const { + return mockTransportServer_; + } + uprotocol::v1::UUri& getClientUUri() { return client_uuri; } + const uprotocol::v1::UUri& getServerUUri() const { return server_uuri; } + const uprotocol::v1::UUri& getSubscriptionUUri() const { + return subscription_uuri; + } + + void SetUp() override { + constexpr uint32_t TEST_UE_ID = 0x18000; + constexpr uint32_t DEFAULT_RESOURCE_ID = 0x8000; + // Create a generic transport uri + client_uuri.set_authority_name("random_string"); + client_uuri.set_ue_id(TEST_UE_ID); + client_uuri.set_ue_version_major(3); + client_uuri.set_resource_id(0); + + // Set up a transport + mockTransportClient_ = + std::make_shared(client_uuri); + + // Craete server default uri and set up a transport + server_uuri.set_authority_name("core.usubscription"); + server_uuri.set_ue_id(0); + server_uuri.set_ue_version_major(3); + server_uuri.set_resource_id(0); + + mockTransportServer_ = + std::make_shared(server_uuri); + + // Create a generic subscription uri + subscription_uuri.set_authority_name("10.0.0.2"); + subscription_uuri.set_ue_id(TEST_UE_ID); + subscription_uuri.set_ue_version_major(3); + subscription_uuri.set_resource_id(DEFAULT_RESOURCE_ID); + }; + void TearDown() override {} + + // Run once per execution of the test application. + // Used for setup of all tests. Has access to this instance. + RpcClientUSubscriptionTest() = default; + + void buildDefaultSourceURI(); + void buildValidNotificationURI(); + void buildInValidNotificationURI(); + + // Run once per execution of the test application. + // Used only for global setup outside of tests. + static void SetUpTestSuite() {} + static void TearDownTestSuite() {} + +public: + ~RpcClientUSubscriptionTest() override = default; +}; + +// Negative test case with no source filter +TEST_F(RpcClientUSubscriptionTest, ConstructorTestSuccess) { // NOLINT + // constexpr int REQUEST_TTL_TIME = 0x8000; + auto subscription_callback = someCallBack; + // auto subscribe_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); + // auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; + + auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); + + auto rpc_client_usubscription_or_status = + uprotocol::core::usubscription::v3::RpcClientUSubscription::create( + getMockTransportClient(), getSubscriptionUUri(), + subscription_callback, options); + + // Ensure that the rpc_client_usubscription creation was successful + ASSERT_TRUE(rpc_client_usubscription_or_status.has_value()); + + // Obtain a pointer to the created rpc_client_usubscription instance + const auto& rpc_client_usubscription_ptr = rpc_client_usubscription_or_status.value(); + + // Verify that the rpc_client_usubscription pointer is not null, indicating successful + // creation + ASSERT_NE(rpc_client_usubscription_ptr, nullptr); +} + +TEST_F(RpcClientUSubscriptionTest, SubscribeTestSuccess) { // NOLINT + constexpr uint32_t DEFAULT_RESOURCE_ID = 0x8000; + // constexpr int REQUEST_TTL_TIME = 0x8000; + auto subscription_callback = someCallBack; + // auto subscribe_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); + // auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; + + auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); + + auto rpc_client_usubscription_or_status = + uprotocol::core::usubscription::v3::RpcClientUSubscription::create( + getMockTransportClient(), getSubscriptionUUri(), + subscription_callback, options); + + // Ensure that the RpcClientUSubscription creation was successful + ASSERT_TRUE(rpc_client_usubscription_or_status.has_value()); + + // Obtain a pointer to the created rpc_client_usubscription instance + const auto& rpc_client_usubscription_ptr = rpc_client_usubscription_or_status.value(); + + // Verify that the rpc_client_usubscription pointer is not null, indicating successful + // creation + ASSERT_NE(rpc_client_usubscription_ptr, nullptr); + + // Create notification source sink uri to match resource id of sink + auto notification_uuri = getServerUUri(); + notification_uuri.set_resource_id(DEFAULT_RESOURCE_ID); + + // set format UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY + auto format = + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY; + + auto notification_source = uprotocol::communication::NotificationSource( + getMockTransportServer(), std::move(notification_uuri), + std::move(getClientUUri()), format); + // Build payload + const std::string data = "test"; + auto payload = uprotocol::datamodel::builder::Payload(data, format); + + notification_source.notify(std::move(payload)); + + // Check send count + EXPECT_TRUE(getMockTransportServer()->getSendCount() == 1); + EXPECT_TRUE(getMockTransportClient()->getSendCount() == 1); +} + +TEST_F(RpcClientUSubscriptionTest, UnsubscribeTestSuccess) { // NOLINT + constexpr uint32_t DEFAULT_RESOURCE_ID = 0x8000; + // constexpr int REQUEST_TTL_TIME = 0x8000; + auto subscription_callback = someCallBack; + // auto subscribe_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); + // auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; + + auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); + + auto rpc_client_usubscription_or_status = + uprotocol::core::usubscription::v3::RpcClientUSubscription::create( + getMockTransportClient(), getSubscriptionUUri(), + subscription_callback, options); + + // Ensure that the rpc_client_usubscription creation was successful + ASSERT_TRUE(rpc_client_usubscription_or_status.has_value()); + + // Obtain a pointer to the created rpc_client_usubscription instance + const auto& rpc_client_usubscription_ptr = rpc_client_usubscription_or_status.value(); + + // Verify that the rpc_client_usubscription pointer is not null, indicating successful + // creation + ASSERT_NE(rpc_client_usubscription_ptr, nullptr); + + // Create notification source sink uri to match resource id of sink + auto notification_uuri = getServerUUri(); + notification_uuri.set_resource_id(DEFAULT_RESOURCE_ID); + + // set format UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY + auto format = + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY; + + auto notification_source = uprotocol::communication::NotificationSource( + getMockTransportServer(), std::move(notification_uuri), + std::move(getClientUUri()), format); + // Build payload + const std::string data = "test"; + auto payload = uprotocol::datamodel::builder::Payload(data, format); + + notification_source.notify(std::move(payload)); + + // Check send count + EXPECT_TRUE(getMockTransportServer()->getSendCount() == 1); + EXPECT_TRUE(getMockTransportClient()->getSendCount() == 1); + + rpc_client_usubscription_ptr->Unsubscribe(nullptr, nullptr, nullptr, nullptr); + + EXPECT_TRUE(getMockTransportClient()->getSendCount() == 2); +} + +} // namespace From ad4c9c34a78d00e949f2918c94cf1c043385e47d Mon Sep 17 00:00:00 2001 From: Lennart Becker Date: Mon, 5 May 2025 17:05:10 +0200 Subject: [PATCH 11/12] Updated minor things in RpcClientUSubscription. --- .../v3/RpcClientUSubscription.cpp | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp index abc325bf5..91b38a544 100644 --- a/src/client/usubscription/v3/RpcClientUSubscription.cpp +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -17,11 +17,6 @@ namespace uprotocol::core::usubscription::v3 { -void someCallBack(const uprotocol::v1::UMessage& message) { - // Print the message - std::cout << message.DebugString() << std::endl; -} - RpcClientUSubscription::RpcClientUSubscription(std::shared_ptr transport, v1::UUri subscription_topic, RpcClientUSubscriptionOptions rpc_client_usubscription_options) @@ -51,7 +46,7 @@ RpcClientUSubscription::RpcClientUSubscription(std::shared_ptrSubscribe(controller, subscription_request, subscription_response, nullptr); - if (status.code() == v1::UCode::OK) { + if (controller == nullptr) { return RpcClientUSubscriptionOrStatus(std::move(rpc_client_usubscription)); } return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); @@ -109,8 +104,6 @@ void RpcClientUSubscription::Subscribe( auto subscription_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; - auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); // Might serve as default_call_options() in Rust - rpc_client_ = std::make_unique( transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_SUBSCRIBE), @@ -134,7 +127,10 @@ void RpcClientUSubscription::Subscribe( rpc_handle_ = rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); - auto subscription_callback = someCallBack; // TODO(lennart) update with correct callback + // TODO(lennart) any handle for the response? + + // Question TODO(max): communication::Subscriber::subscribe(...) necessary? + auto subscription_callback = nullptr; // TODO(lennart) update with correct callback // Create a L2 subscription auto result = communication::Subscriber::subscribe( transport_, subscription_topic_, std::move(subscription_callback)); @@ -147,7 +143,7 @@ void RpcClientUSubscription::Subscribe( done->Run(); } - controller->SetFailed("result.error()"); + controller->SetFailed("result.error()"); // TODO(lennart) method needs to be implemented done->Run(); } @@ -190,6 +186,8 @@ void RpcClientUSubscription::Unsubscribe( rpc_handle_ = rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + // TODO(lennart) any handle for the response? + subscriber_.reset(); done->Run(); From 6d693da6d3dc703774d69d6ce6f9a58450d7b172 Mon Sep 17 00:00:00 2001 From: Lennart Becker Date: Thu, 8 May 2025 15:33:40 +0200 Subject: [PATCH 12/12] building first try of sending method --- .../usubscription/v3/RpcClientUSubscription.h | 109 ++-- .../client/usubscription/v3/USubscription.h | 18 +- .../v3/RpcClientUSubscription.cpp | 561 +++++++++--------- .../v3/RpcClientUSubscriptionTest.cpp | 140 +---- 4 files changed, 352 insertions(+), 476 deletions(-) diff --git a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h index af3052c4b..ef25e327e 100644 --- a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h +++ b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h @@ -21,6 +21,7 @@ #include #include +#include "up-cpp/client/usubscription/v3/USubscription.h" namespace uprotocol::core::usubscription::v3 { using uprotocol::core::usubscription::v3::SubscriptionRequest; @@ -112,7 +113,7 @@ struct USubscriptionUUriBuilder { /// /// Like all L3 client APIs, the RpcClientUSubscription is a wrapper on top of the /// L2 Communication APIs and USubscription service. -struct RpcClientUSubscription : public uSubscription{ +struct RpcClientUSubscription : public USubscription{ using RpcClientUSubscriptionOrStatus = utils::Expected, v1::UStatus>; using ListenCallback = transport::UTransport::ListenCallback; @@ -127,61 +128,63 @@ struct RpcClientUSubscription : public uSubscription{ /// @param priority Priority of the subscription request. /// @param subscribe_request_ttl Time to live for the subscription request. /// @param rpc_client_usubscription_options Additional details for uSubscription service. - [[nodiscard]] static RpcClientUSubscriptionOrStatus create( - std::shared_ptr transport, - const v1::UUri& subscription_topic, ListenCallback&& callback, - RpcClientUSubscriptionOptions rpc_client_usubscription_options); + // [[nodiscard]] static RpcClientUSubscriptionOrStatus create( + // std::shared_ptr transport, + // const v1::UUri& subscription_topic, ListenCallback&& callback, + // RpcClientUSubscriptionOptions rpc_client_usubscription_options); + /// @brief Subscribe to the topic + /// + utils::Expected subscribe(const SubscriptionRequest& subscription_request) override; + // void subscribe(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::SubscriptionRequest* request, + // ::uprotocol::core::usubscription::v3::SubscriptionResponse* response, + // ::google::protobuf::Closure* done) override; + /// @brief Unsubscribe from the topic and call uSubscription service to /// close the subscription. - void Unsubscribe(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request, - ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response, - ::google::protobuf::Closure* done) override; + // void Unsubscribe(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request, + // ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response, + // ::google::protobuf::Closure* done) override; - /// @brief Fetch all subscriptions for a given topic or subscriber contained inside a [`FetchSubscriptionsRequest`] - void FetchSubscriptions(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::FetchSubscriptionsRequest* request, - ::uprotocol::core::usubscription::v3::FetchSubscriptionsResponse* response, - ::google::protobuf::Closure* done) override; + // /// @brief Fetch all subscriptions for a given topic or subscriber contained inside a [`FetchSubscriptionsRequest`] + // void FetchSubscriptions(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::FetchSubscriptionsRequest* request, + // ::uprotocol::core::usubscription::v3::FetchSubscriptionsResponse* response, + // ::google::protobuf::Closure* done) override; - /// @brief Register for notifications relevant to a given topic inside a [`NotificationsRequest`] - /// changing in subscription status. - void RegisterForNotifications(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, - ::uprotocol::core::usubscription::v3::NotificationsResponse* response, - ::google::protobuf::Closure* done) override; + // /// @brief Register for notifications relevant to a given topic inside a [`NotificationsRequest`] + // /// changing in subscription status. + // void RegisterForNotifications(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, + // ::uprotocol::core::usubscription::v3::NotificationsResponse* response, + // ::google::protobuf::Closure* done) override; - /// @brief Unregister for notifications relevant to a given topic inside a [`NotificationsRequest`] - /// changing in subscription status. - void UnregisterForNotifications(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, - ::uprotocol::core::usubscription::v3::NotificationsResponse* response, - ::google::protobuf::Closure* done) override; - - /// @brief Fetch a list of subscribers that are currently subscribed to a given topic in a [`FetchSubscribersRequest`] - void FetchSubscribers(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::FetchSubscribersRequest* request, - ::uprotocol::core::usubscription::v3::FetchSubscribersResponse* response, - ::google::protobuf::Closure* done) override; - - /// @brief getter for subscription update - /// - /// @return subscription update - Update getSubscriptionUpdate() const { return subscription_update_; } + // /// @brief Unregister for notifications relevant to a given topic inside a [`NotificationsRequest`] + // /// changing in subscription status. + // void UnregisterForNotifications(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, + // ::uprotocol::core::usubscription::v3::NotificationsResponse* response, + // ::google::protobuf::Closure* done) override; + + // /// @brief Fetch a list of subscribers that are currently subscribed to a given topic in a [`FetchSubscribersRequest`] + // void FetchSubscribers(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::FetchSubscribersRequest* request, + // ::uprotocol::core::usubscription::v3::FetchSubscribersResponse* response, + // ::google::protobuf::Closure* done) override; /// @brief Destructor - ~RpcClientUSubscription() = default; + ~RpcClientUSubscription() override = default; /// This section for test code only delete later -protected: +// protected: /// @brief Constructor /// /// @param transport Transport to register with. /// @param subscriber_details Additional details about the subscriber. - RpcClientUSubscription(std::shared_ptr transport, - v1::UUri subscription_topic, + explicit RpcClientUSubscription(std::shared_ptr transport, RpcClientUSubscriptionOptions rpc_client_usubscription_options = {}); private: @@ -190,29 +193,13 @@ struct RpcClientUSubscription : public uSubscription{ // Topic to subscribe to const v1::UUri subscription_topic_; + // Additional details about uSubscription service RpcClientUSubscriptionOptions rpc_client_usubscription_options_; // URI info about the uSubscription service USubscriptionUUriBuilder uSubscriptionUUriBuilder_; - // Subscription updates - std::unique_ptr noficationSinkHandle_; - Update subscription_update_; - - // RPC request - std::unique_ptr rpc_client_; - communication::RpcClient::InvokeHandle rpc_handle_; - - SubscriptionResponse subscription_response_; - UnsubscribeResponse unsubscribe_response_; - FetchSubscriptionsResponse fetch_subscription_response_; - NotificationsResponse notification_response_; - FetchSubscribersResponse fetch_subscribers_response_; - - // L2 Subscriber details - std::unique_ptr subscriber_; - // Allow the protected constructor for this class to be used in make_unique // inside of create() friend std::unique_ptr @@ -232,12 +219,6 @@ struct RpcClientUSubscription : public uSubscription{ /// @brief Create a notification sink to receive subscription updates v1::UStatus createNotificationSink(); - /// @brief Subscribe to the topic - /// - void Subscribe(google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::SubscriptionRequest* request, - ::uprotocol::core::usubscription::v3::SubscriptionResponse* response, - ::google::protobuf::Closure* done) override; }; } // namespace uprotocol::core::usubscription::v3 diff --git a/include/up-cpp/client/usubscription/v3/USubscription.h b/include/up-cpp/client/usubscription/v3/USubscription.h index 0de39ca93..9c387039d 100644 --- a/include/up-cpp/client/usubscription/v3/USubscription.h +++ b/include/up-cpp/client/usubscription/v3/USubscription.h @@ -1,24 +1,30 @@ #ifndef USUBSCRIPTION_H #define USUBSCRIPTION_H #include +#include +#include +#include "up-cpp/utils/Expected.h" namespace uprotocol::core::usubscription::v3 { struct USubscription { + template + using ResponseOrStatus = utils::Expected; + virtual ~USubscription() = default; - virtual SubscriptionResponse subscribe(const SubscriptionRequest& subscription_request) = 0; + virtual ResponseOrStatus subscribe(const SubscriptionRequest& subscription_request) = 0; - virtual UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request) = 0; + // virtual UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request) = 0; - virtual FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request) = 0; + // virtual FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request) = 0; - virtual NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request) =0 ; + // virtual NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request) =0 ; - virtual NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request) = 0; + // virtual NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request) = 0; - virtual FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request) = 0; + // virtual FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request) = 0; }; diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp index 91b38a544..a160f1dce 100644 --- a/src/client/usubscription/v3/RpcClientUSubscription.cpp +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -12,75 +12,78 @@ #include #include #include +#include #include +#include "up-cpp/communication/RpcClient.h" + +constexpr uint16_t RESOURCE_ID_SUBSCRIBE = 0x0001; +// TODO(lennart) see default_call_options() for the request in Rust +constexpr auto SUBSCRIPTION_REQUEST_TTL = std::chrono::milliseconds(0x0800); // TODO(lennart) change time +auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; // MUST be >= 4 namespace uprotocol::core::usubscription::v3 { RpcClientUSubscription::RpcClientUSubscription(std::shared_ptr transport, - v1::UUri subscription_topic, RpcClientUSubscriptionOptions rpc_client_usubscription_options) : transport_(std::move(transport)), - subscription_topic_(std::move(subscription_topic)), - rpc_client_usubscription_options_(std::move(rpc_client_usubscription_options)), - rpc_client_(nullptr) { + rpc_client_usubscription_options_(std::move(rpc_client_usubscription_options)) { // Initialize uSubscriptionUUriBuilder_ uSubscriptionUUriBuilder_ = USubscriptionUUriBuilder(); } -[[nodiscard]] RpcClientUSubscription::RpcClientUSubscriptionOrStatus RpcClientUSubscription::create( - std::shared_ptr transport, - const v1::UUri& subscription_topic, ListenCallback&& callback, - RpcClientUSubscriptionOptions rpc_client_usubscription_options) { - auto rpc_client_usubscription = std::make_unique( - std::forward>(transport), - std::forward(subscription_topic), - std::forward(rpc_client_usubscription_options)); +// [[nodiscard]] RpcClientUSubscription::RpcClientUSubscriptionOrStatus RpcClientUSubscription::create( +// std::shared_ptr transport, +// const v1::UUri& subscription_topic, ListenCallback&& callback, +// RpcClientUSubscriptionOptions rpc_client_usubscription_options) { +// auto rpc_client_usubscription = std::make_unique( +// std::forward>(transport), +// std::forward(rpc_client_usubscription_options)); - google::protobuf::RpcController *controller = nullptr; - ::uprotocol::core::usubscription::v3::SubscriptionRequest const *subscription_request = nullptr; - SubscriptionResponse *subscription_response = nullptr; - - // Attempt to connect create notification sink for updates. - auto status = rpc_client_usubscription->createNotificationSink(); - if (status.code() == v1::UCode::OK) { - rpc_client_usubscription->Subscribe(controller, subscription_request, - subscription_response, nullptr); - if (controller == nullptr) { - return RpcClientUSubscriptionOrStatus(std::move(rpc_client_usubscription)); - } - return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); - } - // If connection fails, return the error status. - return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); -} - -v1::UStatus RpcClientUSubscription::createNotificationSink() { - auto notification_sink_callback = [this](const v1::UMessage& update) { - if (update.has_payload()) { - Update data; - if (data.ParseFromString(update.payload())) { - if (data.topic().SerializeAsString() == - subscription_topic_.SerializeAsString()) { - subscription_update_ = std::move(data); - } - } - } - }; - - auto notification_topic = uSubscriptionUUriBuilder_.getNotificationUri(); - - auto result = communication::NotificationSink::create( - transport_, std::move(notification_sink_callback), notification_topic); - - if (result.has_value()) { - noficationSinkHandle_ = std::move(result).value(); - v1::UStatus status; - status.set_code(v1::UCode::OK); - return status; - } - return result.error(); -} +// google::protobuf::RpcController *controller = nullptr; +// ::uprotocol::core::usubscription::v3::SubscriptionRequest const *subscription_request = nullptr; +// SubscriptionResponse *subscription_response = nullptr; + +// // Attempt to connect create notification sink for updates. +// auto status = rpc_client_usubscription->createNotificationSink(); +// if (status.code() == v1::UCode::OK) { +// rpc_client_usubscription->Subscribe(controller, subscription_request, +// subscription_response, nullptr); +// if (controller == nullptr) { +// return RpcClientUSubscriptionOrStatus(std::move(rpc_client_usubscription)); +// } +// return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); +// } +// // If connection fails, return the error status. +// return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); +// } + +// v1::UStatus RpcClientUSubscription::createNotificationSink() { +// auto notification_sink_callback = [this](const v1::UMessage& update) { +// if (update.has_payload()) { +// Update data; +// if (data.ParseFromString(update.payload())) { +// if (data.topic().SerializeAsString() == +// subscription_topic_.SerializeAsString()) { +// subscription_update_ = std::move(data); +// } +// } +// } +// }; + +// auto notification_topic = uSubscriptionUUriBuilder_.getNotificationUri(); + +// auto result = communication::NotificationSink::create( +// transport_, std::move(notification_sink_callback), notification_topic); + +// if (result.has_value()) { +// noficationSinkHandle_ = std::move(result).value(); +// v1::UStatus status; +// status.set_code(v1::UCode::OK); +// return status; +// } +// return result.error(); +// } SubscriptionRequest RpcClientUSubscription::buildSubscriptionRequest() { auto attributes = utils::ProtoConverter::BuildSubscribeAttributes( @@ -92,257 +95,235 @@ SubscriptionRequest RpcClientUSubscription::buildSubscriptionRequest() { return subscription_request; } -void RpcClientUSubscription::Subscribe( - google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::SubscriptionRequest* request, - ::uprotocol::core::usubscription::v3::SubscriptionResponse* response, - ::google::protobuf::Closure* done) { +RpcClientUSubscription::ResponseOrStatus RpcClientUSubscription::subscribe(const SubscriptionRequest& subscription_request) { + + communication::RpcClient rpc_client( + transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_SUBSCRIBE), + priority, SUBSCRIPTION_REQUEST_TTL); + + datamodel::builder::Payload payload(subscription_request); + + auto invoke_future = + rpc_client.invokeMethod(std::move(payload)); + + auto message_or_status = invoke_future.get(); - constexpr uint16_t RESOURCE_ID_SUBSCRIBE = 0x0001; - // TODO(lennart) see default_call_options() for the request in Rust - constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? - auto subscription_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); - auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; + if (!message_or_status.has_value()) { + return ResponseOrStatus( + utils::Unexpected(message_or_status.error())); + } + SubscriptionResponse subscription_response; + subscription_response.ParseFromString(message_or_status.value().payload()); - rpc_client_ = std::make_unique( - transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_SUBSCRIBE), - priority, subscription_request_ttl); - - auto on_response = [this, response](const auto& maybe_response) { - if (maybe_response.has_value() && - maybe_response.value().has_payload()) { - if (response->ParseFromString(maybe_response.value().payload())) { - if (response->topic().SerializeAsString() == - subscription_topic_.SerializeAsString()) { - subscription_response_ = *response; - } - } - } - }; - - // SubscriptionRequest const subscription_request = buildSubscriptionRequest(); - auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct, has been subscription_request before - - rpc_handle_ = - rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); - - // TODO(lennart) any handle for the response? - - // Question TODO(max): communication::Subscriber::subscribe(...) necessary? - auto subscription_callback = nullptr; // TODO(lennart) update with correct callback - // Create a L2 subscription - auto result = communication::Subscriber::subscribe( - transport_, subscription_topic_, std::move(subscription_callback)); - - if (result.has_value()) { - subscriber_ = std::move(result).value(); - v1::UStatus status; - status.set_code(v1::UCode::OK); - // return status; - done->Run(); + if (subscription_response.topic().SerializeAsString() == + subscription_topic_.SerializeAsString()) { + return ResponseOrStatus(subscription_response); } - controller->SetFailed("result.error()"); // TODO(lennart) method needs to be implemented - - done->Run(); + return ResponseOrStatus( + utils::Unexpected(message_or_status.error())); + } -UnsubscribeRequest RpcClientUSubscription::buildUnsubscriptionRequest() { - auto unsubscribe_request = - utils::ProtoConverter::BuildUnSubscribeRequest(subscription_topic_); - return unsubscribe_request; -} -void RpcClientUSubscription::Unsubscribe( - google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request, - ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response, - ::google::protobuf::Closure* done) { - - constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? - constexpr uint16_t RESOURCE_ID_UNSUBSCRIBE = 0x0002; - auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); - auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; - - rpc_client_ = std::make_unique( - transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_UNSUBSCRIBE), - priority, request_ttl); - - auto on_response = [this, response](const auto& maybe_response) { - if (maybe_response.has_value() && - maybe_response.value().has_payload()) { - if (response->ParseFromString(maybe_response.value().payload())) { - if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe - subscription_topic_.SerializeAsString()) { - unsubscribe_response_ = *response; - } - } - } - }; - - // UnsubscribeRequest const unsubscribe_request = buildUnsubscriptionRequest(); - auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct - rpc_handle_ = - rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); - - // TODO(lennart) any handle for the response? - - subscriber_.reset(); - done->Run(); -} -void RpcClientUSubscription::FetchSubscriptions( - google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::FetchSubscriptionsRequest* request, - ::uprotocol::core::usubscription::v3::FetchSubscriptionsResponse* response, - ::google::protobuf::Closure* done) { +// UnsubscribeRequest RpcClientUSubscription::buildUnsubscriptionRequest() { +// auto unsubscribe_request = +// utils::ProtoConverter::BuildUnSubscribeRequest(subscription_topic_); +// return unsubscribe_request; +// } + +// void RpcClientUSubscription::Unsubscribe( +// google::protobuf::RpcController* controller, +// const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request, +// ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response, +// ::google::protobuf::Closure* done) { - constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? - constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIPTIONS = 0x0003; - auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); - auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; +// constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? +// constexpr uint16_t RESOURCE_ID_UNSUBSCRIBE = 0x0002; +// auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); +// auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; - rpc_client_ = std::make_unique( - transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_FETCH_SUBSCRIPTIONS), - priority, request_ttl); - - auto on_response = [this, response](const auto& maybe_response) { - if (maybe_response.has_value() && - maybe_response.value().has_payload()) { - if (response->ParseFromString(maybe_response.value().payload())) { - if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe - subscription_topic_.SerializeAsString()) { - fetch_subscription_response_ = *response; - } - } - } - }; - - // FetchSubscriptionsRequest const fetch_subscriptions_request = buildFetchSubscriptionsRequest(); - auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct - - rpc_handle_ = - rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); - - // TODO(lennart) any handle for the response? - - done->Run(); -} - -void RpcClientUSubscription::RegisterForNotifications( - google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, - ::uprotocol::core::usubscription::v3::NotificationsResponse* response, - ::google::protobuf::Closure* done) { +// rpc_client_ = std::make_unique( +// transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_UNSUBSCRIBE), +// priority, request_ttl); + +// auto on_response = [this, response](const auto& maybe_response) { +// if (maybe_response.has_value() && +// maybe_response.value().has_payload()) { +// if (response->ParseFromString(maybe_response.value().payload())) { +// if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe +// subscription_topic_.SerializeAsString()) { +// unsubscribe_response_ = *response; +// } +// } +// } +// }; + +// // UnsubscribeRequest const unsubscribe_request = buildUnsubscriptionRequest(); +// auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct +// rpc_handle_ = +// rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + +// // TODO(lennart) any handle for the response? + +// subscriber_.reset(); + +// done->Run(); +// } + +// void RpcClientUSubscription::FetchSubscriptions( +// google::protobuf::RpcController* controller, +// const ::uprotocol::core::usubscription::v3::FetchSubscriptionsRequest* request, +// ::uprotocol::core::usubscription::v3::FetchSubscriptionsResponse* response, +// ::google::protobuf::Closure* done) { - constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? - constexpr uint16_t RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS = 0x0006; - auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); - auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; +// constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? +// constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIPTIONS = 0x0003; +// auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); +// auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; - rpc_client_ = std::make_unique( - transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS), - priority, request_ttl); - - auto on_response = [this, response](const auto& maybe_response) { - if (maybe_response.has_value() && - maybe_response.value().has_payload()) { - if (response->ParseFromString(maybe_response.value().payload())) { - if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe - subscription_topic_.SerializeAsString()) { - notification_response_ = *response; - } - } - } - }; - - // NotificationsRequest const register_notifications_request = buildRegisterNotificationsRequest(); - auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct - - rpc_handle_ = - rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); - - // TODO(lennart) any handle for the response? - - done->Run(); -} - -void RpcClientUSubscription::UnregisterForNotifications( - google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, - ::uprotocol::core::usubscription::v3::NotificationsResponse* response, - ::google::protobuf::Closure* done) { +// rpc_client_ = std::make_unique( +// transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_FETCH_SUBSCRIPTIONS), +// priority, request_ttl); + +// auto on_response = [this, response](const auto& maybe_response) { +// if (maybe_response.has_value() && +// maybe_response.value().has_payload()) { +// if (response->ParseFromString(maybe_response.value().payload())) { +// if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe +// subscription_topic_.SerializeAsString()) { +// fetch_subscription_response_ = *response; +// } +// } +// } +// }; + +// // FetchSubscriptionsRequest const fetch_subscriptions_request = buildFetchSubscriptionsRequest(); +// auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + +// rpc_handle_ = +// rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + +// // TODO(lennart) any handle for the response? + +// done->Run(); +// } + +// void RpcClientUSubscription::RegisterForNotifications( +// google::protobuf::RpcController* controller, +// const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, +// ::uprotocol::core::usubscription::v3::NotificationsResponse* response, +// ::google::protobuf::Closure* done) { - constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? - constexpr uint16_t RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS = 0x0007; - auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); - auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; +// constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? +// constexpr uint16_t RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS = 0x0006; +// auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); +// auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; - rpc_client_ = std::make_unique( - transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS), - priority, request_ttl); - - auto on_response = [this, response](const auto& maybe_response) { - if (maybe_response.has_value() && - maybe_response.value().has_payload()) { - if (response->ParseFromString(maybe_response.value().payload())) { - if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe - subscription_topic_.SerializeAsString()) { - notification_response_ = *response; - } - } - } - }; - - // NotificationsRequest const unregister_notifications_request = buildUnregisterNotificationsRequest(); - auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct - - rpc_handle_ = - rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); - - // TODO(lennart) any handle for the response? - - done->Run(); -} - -void RpcClientUSubscription::FetchSubscribers( - google::protobuf::RpcController* controller, - const ::uprotocol::core::usubscription::v3::FetchSubscribersRequest* request, - ::uprotocol::core::usubscription::v3::FetchSubscribersResponse* response, - ::google::protobuf::Closure* done) { +// rpc_client_ = std::make_unique( +// transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS), +// priority, request_ttl); + +// auto on_response = [this, response](const auto& maybe_response) { +// if (maybe_response.has_value() && +// maybe_response.value().has_payload()) { +// if (response->ParseFromString(maybe_response.value().payload())) { +// if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe +// subscription_topic_.SerializeAsString()) { +// notification_response_ = *response; +// } +// } +// } +// }; + +// // NotificationsRequest const register_notifications_request = buildRegisterNotificationsRequest(); +// auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + +// rpc_handle_ = +// rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + +// // TODO(lennart) any handle for the response? + +// done->Run(); +// } + +// void RpcClientUSubscription::UnregisterForNotifications( +// google::protobuf::RpcController* controller, +// const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, +// ::uprotocol::core::usubscription::v3::NotificationsResponse* response, +// ::google::protobuf::Closure* done) { - constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? - constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIBERS = 0x0008; - auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); - auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; +// constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? +// constexpr uint16_t RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS = 0x0007; +// auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); +// auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; - rpc_client_ = std::make_unique( - transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_FETCH_SUBSCRIBERS), - priority, request_ttl); - - auto on_response = [this, response](const auto& maybe_response) { - if (maybe_response.has_value() && - maybe_response.value().has_payload()) { - if (response->ParseFromString(maybe_response.value().payload())) { - if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe - subscription_topic_.SerializeAsString()) { - fetch_subscribers_response_ = *response; - } - } - } - }; - - // FetchSubscribersRequest const fetch_subscribers_request = buildFetchSubscribersRequest(); - auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct - - rpc_handle_ = - rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); - - // TODO(lennart) any handle for the response? - - done->Run(); -} +// rpc_client_ = std::make_unique( +// transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS), +// priority, request_ttl); + +// auto on_response = [this, response](const auto& maybe_response) { +// if (maybe_response.has_value() && +// maybe_response.value().has_payload()) { +// if (response->ParseFromString(maybe_response.value().payload())) { +// if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe +// subscription_topic_.SerializeAsString()) { +// notification_response_ = *response; +// } +// } +// } +// }; + +// // NotificationsRequest const unregister_notifications_request = buildUnregisterNotificationsRequest(); +// auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + +// rpc_handle_ = +// rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + +// // TODO(lennart) any handle for the response? + +// done->Run(); +// } + +// void RpcClientUSubscription::FetchSubscribers( +// google::protobuf::RpcController* controller, +// const ::uprotocol::core::usubscription::v3::FetchSubscribersRequest* request, +// ::uprotocol::core::usubscription::v3::FetchSubscribersResponse* response, +// ::google::protobuf::Closure* done) { + +// constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? +// constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIBERS = 0x0008; +// auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); +// auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; + +// rpc_client_ = std::make_unique( +// transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_FETCH_SUBSCRIBERS), +// priority, request_ttl); + +// auto on_response = [this, response](const auto& maybe_response) { +// if (maybe_response.has_value() && +// maybe_response.value().has_payload()) { +// if (response->ParseFromString(maybe_response.value().payload())) { +// if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe +// subscription_topic_.SerializeAsString()) { +// fetch_subscribers_response_ = *response; +// } +// } +// } +// }; + +// // FetchSubscribersRequest const fetch_subscribers_request = buildFetchSubscribersRequest(); +// auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + +// rpc_handle_ = +// rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + +// // TODO(lennart) any handle for the response? + +// done->Run(); +// } } // namespace uprotocol::core::usubscription::v3 diff --git a/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp b/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp index 8ed25476f..fe6a3ef33 100644 --- a/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp +++ b/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp @@ -21,11 +21,6 @@ namespace { using MsgDiff = google::protobuf::util::MessageDifferencer; -void someCallBack(const uprotocol::v1::UMessage& message) { - // Print the message - std::cout << message.DebugString() << std::endl; -} - class RpcClientUSubscriptionTest : public testing::Test { private: std::shared_ptr mockTransportClient_; @@ -101,123 +96,36 @@ class RpcClientUSubscriptionTest : public testing::Test { // Negative test case with no source filter TEST_F(RpcClientUSubscriptionTest, ConstructorTestSuccess) { // NOLINT - // constexpr int REQUEST_TTL_TIME = 0x8000; - auto subscription_callback = someCallBack; - // auto subscribe_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); - // auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); - - auto rpc_client_usubscription_or_status = - uprotocol::core::usubscription::v3::RpcClientUSubscription::create( - getMockTransportClient(), getSubscriptionUUri(), - subscription_callback, options); - - // Ensure that the rpc_client_usubscription creation was successful - ASSERT_TRUE(rpc_client_usubscription_or_status.has_value()); - - // Obtain a pointer to the created rpc_client_usubscription instance - const auto& rpc_client_usubscription_ptr = rpc_client_usubscription_or_status.value(); - - // Verify that the rpc_client_usubscription pointer is not null, indicating successful - // creation - ASSERT_NE(rpc_client_usubscription_ptr, nullptr); + + auto rpc_client_usubscription = + std::make_unique(getMockTransportClient(), + options); + + // Verify that the RpcClientUSubscription pointer is not null, indicating successful + ASSERT_NE(rpc_client_usubscription, nullptr); } TEST_F(RpcClientUSubscriptionTest, SubscribeTestSuccess) { // NOLINT - constexpr uint32_t DEFAULT_RESOURCE_ID = 0x8000; - // constexpr int REQUEST_TTL_TIME = 0x8000; - auto subscription_callback = someCallBack; - // auto subscribe_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); - // auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; - + auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); - - auto rpc_client_usubscription_or_status = - uprotocol::core::usubscription::v3::RpcClientUSubscription::create( - getMockTransportClient(), getSubscriptionUUri(), - subscription_callback, options); - - // Ensure that the RpcClientUSubscription creation was successful - ASSERT_TRUE(rpc_client_usubscription_or_status.has_value()); - - // Obtain a pointer to the created rpc_client_usubscription instance - const auto& rpc_client_usubscription_ptr = rpc_client_usubscription_or_status.value(); - - // Verify that the rpc_client_usubscription pointer is not null, indicating successful - // creation - ASSERT_NE(rpc_client_usubscription_ptr, nullptr); - - // Create notification source sink uri to match resource id of sink - auto notification_uuri = getServerUUri(); - notification_uuri.set_resource_id(DEFAULT_RESOURCE_ID); - - // set format UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY - auto format = - uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY; - - auto notification_source = uprotocol::communication::NotificationSource( - getMockTransportServer(), std::move(notification_uuri), - std::move(getClientUUri()), format); - // Build payload - const std::string data = "test"; - auto payload = uprotocol::datamodel::builder::Payload(data, format); - - notification_source.notify(std::move(payload)); - - // Check send count - EXPECT_TRUE(getMockTransportServer()->getSendCount() == 1); - EXPECT_TRUE(getMockTransportClient()->getSendCount() == 1); -} - -TEST_F(RpcClientUSubscriptionTest, UnsubscribeTestSuccess) { // NOLINT - constexpr uint32_t DEFAULT_RESOURCE_ID = 0x8000; - // constexpr int REQUEST_TTL_TIME = 0x8000; - auto subscription_callback = someCallBack; - // auto subscribe_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); - // auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; - - auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); - - auto rpc_client_usubscription_or_status = - uprotocol::core::usubscription::v3::RpcClientUSubscription::create( - getMockTransportClient(), getSubscriptionUUri(), - subscription_callback, options); - - // Ensure that the rpc_client_usubscription creation was successful - ASSERT_TRUE(rpc_client_usubscription_or_status.has_value()); - - // Obtain a pointer to the created rpc_client_usubscription instance - const auto& rpc_client_usubscription_ptr = rpc_client_usubscription_or_status.value(); - - // Verify that the rpc_client_usubscription pointer is not null, indicating successful - // creation - ASSERT_NE(rpc_client_usubscription_ptr, nullptr); - - // Create notification source sink uri to match resource id of sink - auto notification_uuri = getServerUUri(); - notification_uuri.set_resource_id(DEFAULT_RESOURCE_ID); - - // set format UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY - auto format = - uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY; - - auto notification_source = uprotocol::communication::NotificationSource( - getMockTransportServer(), std::move(notification_uuri), - std::move(getClientUUri()), format); - // Build payload - const std::string data = "test"; - auto payload = uprotocol::datamodel::builder::Payload(data, format); - - notification_source.notify(std::move(payload)); - - // Check send count - EXPECT_TRUE(getMockTransportServer()->getSendCount() == 1); - EXPECT_TRUE(getMockTransportClient()->getSendCount() == 1); - - rpc_client_usubscription_ptr->Unsubscribe(nullptr, nullptr, nullptr, nullptr); - - EXPECT_TRUE(getMockTransportClient()->getSendCount() == 2); + + uprotocol::core::usubscription::v3::SubscriptionRequest subscription_request = uprotocol::utils::ProtoConverter::BuildSubscriptionRequest( + getSubscriptionUUri(), uprotocol::core::usubscription::v3::SubscribeAttributes()); + + auto rpc_client_usubscription = + std::make_unique(getMockTransportClient(), + options); + + // Verify that the RpcClientUSubscription pointer is not null, indicating successful + ASSERT_NE(rpc_client_usubscription, nullptr); + + auto result = rpc_client_usubscription->subscribe(subscription_request); + + ASSERT_NE(&result, nullptr); } } // namespace