Skip to content
38 changes: 38 additions & 0 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,44 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
std::vector<rclcpp::NetworkFlowEndpoint>
get_network_flow_endpoints() const;

/// Check if content filtered topic feature of the subscription instance is enabled.
/**
* \return boolean flag indicating if the content filtered topic of this subscription is enabled.
*/
RCLCPP_PUBLIC
bool
is_cft_enabled() const;

/// Set the filter expression and expression parameters for the subscription.
/**
* \param[in] filter_expression An filter expression to set.
* \sa SubscriptionOptionsBase::ContentFilterOptions::filter_expression
* \param[in] expression_parameters Array of expression parameters to set.
* \sa SubscriptionOptionsBase::ContentFilterOptions::expression_parameters
* \throws RCLBadAlloc if memory cannot be allocated
* \throws RCLError if an unexpect error occurs
*/
RCLCPP_PUBLIC
virtual
void
set_content_filter(
const std::string & filter_expression,
const std::vector<std::string> & expression_parameters = {});

/// Get the filter expression and expression parameters for the subscription.
/**
* \param[out] filter_expression An filter expression to get.
* \param[out] expression_parameters Array of expression parameters to get.
* \throws RCLBadAlloc if memory cannot be allocated
* \throws RCLError if an unexpect error occurs
*/
RCLCPP_PUBLIC
virtual
void
get_content_filter(
std::string & filter_expression,
std::vector<std::string> & expression_parameters) const;

protected:
template<typename EventCallbackT>
void
Expand Down
29 changes: 29 additions & 0 deletions rclcpp/include/rclcpp/subscription_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ struct SubscriptionOptionsBase
TopicStatisticsOptions topic_stats_options;

QosOverridingOptions qos_overriding_options;

/// Options to configure content filtered topic in the subscription.
struct ContentFilterOptions
{
/// Filter expression is similar to the WHERE part of an SQL clause
std::string filter_expression;
/**
* Expression parameters is the tokens placeholder ‘parameters’ (i.e., "%n" tokens begin from 0)
* in the filter_expression
*/
std::vector<std::string> expression_parameters;
};

ContentFilterOptions content_filter_options;
};

/// Structure containing optional configuration for Subscriptions.
Expand Down Expand Up @@ -123,6 +137,21 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase
rmw_implementation_payload->modify_rmw_subscription_options(result.rmw_subscription_options);
}

// Copy content_filter_options into rcl_subscription_options.
if (!content_filter_options.filter_expression.empty()) {
std::vector<const char *> cstrings =
get_c_vector_string(content_filter_options.expression_parameters);
rcl_ret_t ret = rcl_subscription_options_set_content_filter_options(
get_c_string(content_filter_options.filter_expression),
cstrings.size(),
cstrings.data(),
&result);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to set content_filter_options");
}
}

return result;
}

Expand Down
9 changes: 9 additions & 0 deletions rclcpp/include/rclcpp/utilities.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,15 @@ RCLCPP_PUBLIC
const char *
get_c_string(const std::string & string_in);

/// Return the std::vector of C string from the given std::vector<std::string>.
/**
* \param[in] string_in is a std::string
* \return the std::vector of C string from the std::vector<std::string>
*/
RCLCPP_PUBLIC
std::vector<const char *>
get_c_vector_string(const std::vector<std::string> & strings);

} // namespace rclcpp

#endif // RCLCPP__UTILITIES_HPP_
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/wait_for_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ bool wait_for_message(
wait_set.add_subscription(subscription);
wait_set.add_guard_condition(gc);
auto ret = wait_set.wait(time_to_wait);
// if it is fixed in other PR, remove them and rebase.
wait_set.remove_subscription(subscription);
wait_set.remove_guard_condition(gc);
if (ret.kind() != rclcpp::WaitResultKind::Ready) {
return false;
}
Expand Down
91 changes: 91 additions & 0 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <string>
#include <vector>

#include "rcpputils/scope_exit.hpp"

#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
Expand Down Expand Up @@ -326,3 +328,92 @@ std::vector<rclcpp::NetworkFlowEndpoint> SubscriptionBase::get_network_flow_endp

return network_flow_endpoint_vector;
}

bool
SubscriptionBase::is_cft_enabled() const
{
return rcl_subscription_is_cft_enabled(subscription_handle_.get());
}

void
SubscriptionBase::set_content_filter(
const std::string & filter_expression,
const std::vector<std::string> & expression_parameters)
{
rcl_subscription_content_filter_options_t options =
rcl_subscription_get_default_content_filter_options();

std::vector<const char *> cstrings =
get_c_vector_string(expression_parameters);
rcl_ret_t ret = rcl_subscription_content_filter_options_init(
get_c_string(filter_expression),
cstrings.size(),
cstrings.data(),
&options);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to init subscription content_filtered_topic option");
}
RCPPUTILS_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_subscription_content_filter_options_fini(&options);
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"Failed to fini subscription content_filtered_topic option: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
});

ret = rcl_subscription_set_content_filter(
subscription_handle_.get(),
&options);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to set cft expression parameters");
}
}

void
SubscriptionBase::get_content_filter(
std::string & filter_expression,
std::vector<std::string> & expression_parameters) const
{
rcl_subscription_content_filter_options_t options =
rcl_subscription_get_default_content_filter_options();

// use rcl_content_filter_options_t instead of char * and
rcl_ret_t ret = rcl_subscription_get_content_filter(
subscription_handle_.get(),
&options);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to get cft expression parameters");
}

RCPPUTILS_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_subscription_content_filter_options_fini(&options);
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"Failed to fini subscription content_filtered_topic option: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
});

rmw_subscription_content_filter_options_t * content_filter_options =
options.rmw_subscription_content_filter_options;
if (content_filter_options->filter_expression) {
filter_expression = content_filter_options->filter_expression;
}

if (content_filter_options->expression_parameters) {
for (size_t i = 0; i < content_filter_options->expression_parameters->size; ++i) {
expression_parameters.push_back(
content_filter_options->expression_parameters->data[i]);
}
}
}
16 changes: 15 additions & 1 deletion rclcpp/src/rclcpp/time_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,19 @@ void TimeSource::attachNode(
return result;
});

const rclcpp::QoS qos = (
rclcpp::QoS(rclcpp::QoSInitialization::from_rmw(rmw_qos_profile_parameter_events))
);
rclcpp::SubscriptionOptions options;
options.content_filter_options.filter_expression =
std::string("node = '") + node_base_->get_fully_qualified_name() + "'";

// TODO(tfoote) use parameters interface not subscribe to events via topic ticketed #609
parameter_subscription_ = rclcpp::AsyncParametersClient::on_parameter_event(
node_topics_,
std::bind(&TimeSource::on_parameter_event, this, std::placeholders::_1));
std::bind(&TimeSource::on_parameter_event, this, std::placeholders::_1),
qos,
options);
}

void TimeSource::detachNode()
Expand Down Expand Up @@ -299,6 +308,11 @@ void TimeSource::on_parameter_event(
{
// Filter out events on 'use_sim_time' parameter instances in other nodes.
if (event->node != node_base_->get_fully_qualified_name()) {
if (parameter_subscription_->is_cft_enabled()) {
RCLCPP_ERROR(
logger_, "this node '%s' should not get parameter event of another node '%s'",
node_base_->get_fully_qualified_name(), event->node.c_str());
}
return;
}
// Filter for only 'use_sim_time' being added or changed.
Expand Down
13 changes: 13 additions & 0 deletions rclcpp/src/rclcpp/utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,17 @@ get_c_string(const std::string & string_in)
return string_in.c_str();
}

std::vector<const char *>
get_c_vector_string(const std::vector<std::string> & strings)
{
std::vector<const char *> cstrings;
cstrings.reserve(strings.size());

for (size_t i = 0; i < strings.size(); ++i) {
cstrings.push_back(strings[i].c_str());
}

return cstrings;
}

} // namespace rclcpp
17 changes: 17 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -739,3 +739,20 @@ ament_add_gtest(test_graph_listener test_graph_listener.cpp)
if(TARGET test_graph_listener)
target_link_libraries(test_graph_listener ${PROJECT_NAME} mimick)
endif()

function(test_subscription_content_filter_for_rmw_implementation)
set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation})
ament_add_gmock(test_subscription_content_filter${target_suffix}
test_subscription_content_filter.cpp
ENV ${rmw_implementation_env_var}
)
if(TARGET test_subscription_content_filter${target_suffix})
target_link_libraries(test_subscription_content_filter${target_suffix} ${PROJECT_NAME} mimick)
ament_target_dependencies(test_subscription_content_filter${target_suffix}
"rcpputils"
"rosidl_typesupport_cpp"
"test_msgs"
)
endif()
endfunction()
call_for_each_rmw_implementation(test_subscription_content_filter_for_rmw_implementation)
Loading