From 8ad5169998f9f0609492b5782c0f3f684ae544dd Mon Sep 17 00:00:00 2001 From: Loic Pottier Date: Wed, 4 Jun 2025 20:39:40 -0700 Subject: [PATCH 1/7] Moving from eevery rank using the same queue to communicate with the stager to exchange-based communications. This will allow RMQ server to mutliple cores to process the data Signed-off-by: Loic Pottier --- src/AMSWorkflow/ams/rmq.py | 82 +++++++++- src/AMSlib/AMS.cpp | 21 +-- src/AMSlib/wf/basedb.hpp | 144 ++++++++++++------ tests/AMSlib/ams_interface/CMakeLists.txt | 20 +-- .../ams_interface/json_configs/rmq.json.in | 3 +- tests/AMSlib/ams_interface/verify_ete.py | 8 +- 6 files changed, 199 insertions(+), 79 deletions(-) diff --git a/src/AMSWorkflow/ams/rmq.py b/src/AMSWorkflow/ams/rmq.py index 61f65967..fd228634 100644 --- a/src/AMSWorkflow/ams/rmq.py +++ b/src/AMSWorkflow/ams/rmq.py @@ -225,12 +225,15 @@ class AMSChannel: def __init__( self, connection, - q_name, + exchange, + routing_key, callback: Optional[Callable] = None, logger: Optional[logging.Logger] = None, ): self.connection = connection - self.q_name = q_name + self.exchange = exchange + self.routing_key = routing_key + self.q_name = None self.logger = logger if logger else logging.getLogger(__name__) self.callback = callback if callback else self.default_callback @@ -247,7 +250,18 @@ def default_callback(self, method, properties, body): def open(self): self.channel = self.connection.channel() - self.channel.queue_declare(queue=self.q_name) + q_name = self.routing_key + if self.exchange != '': + self.logger.info(f"Declared exchange {self.exchange}") + self.channel.exchange_declare(exchange = self.exchange, exchange_type = "direct") + q_name = "ams-debug" #TODO CHANGE + + result = self.channel.queue_declare(queue = q_name, exclusive = False, durable = False) + self.q_name = result.method.queue + self.logger.info(f"Declared queue {self.q_name}") + if self.exchange != '': + self.logger.info(f"Binding queue {self.q_name} to exchange {self.exchange}") + self.channel.queue_bind(exchange = self.exchange, queue = self.q_name, routing_key = self.routing_key) def close(self): self.channel.close() @@ -308,7 +322,7 @@ def send(self, text: str, exchange: str = ""): @param text The text to send @param exchange Exchange to use """ - self.channel.basic_publish(exchange=exchange, routing_key=self.q_name, body=text) + self.channel.basic_publish(exchange = exchange, routing_key = self.routing_key, body=text) return def get_messages(self): @@ -374,9 +388,11 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.connection.close() - def connect(self, queue): - """Connect to the queue""" - return AMSChannel(self.connection, queue, self.callback) + def connect(self, exchange, routing_key): + """ + Connect to the exchange and routing key. + """ + return AMSChannel(self.connection, exchange, routing_key, self.callback) class StatusPoller(BlockingClient): @@ -570,6 +586,7 @@ def on_channel_open(self, channel): self.add_on_channel_close_callback() # we do not set up exchange first here, we use the default exchange '' self.setup_queue(self._queue) + # self.setup_exchange(self.EXCHANGE) def add_on_channel_close_callback(self): """This method tells pika to call the on_channel_closed method if @@ -608,6 +625,57 @@ def setup_queue(self, queue_name): # arguments = {"x-consumer-timeout":1800000} # 30 minutes in ms self._channel.queue_declare(queue=queue_name, exclusive=False, callback=cb) + # def setup_exchange(self, exchange_name): + # """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC + # command. When it is complete, the on_exchange_declareok method will + # be invoked by pika. + + # :param str|unicode exchange_name: The name of the exchange to declare + + # """ + # LOGGER.info('Declaring exchange: %s', exchange_name) + # # Note: using functools.partial is not required, it is demonstrating + # # how arbitrary data can be passed to the callback when it is called + # cb = functools.partial( + # self.on_exchange_declareok, userdata=exchange_name) + # self._channel.exchange_declare( + # exchange=exchange_name, + # exchange_type=self.EXCHANGE_TYPE, + # callback=cb) + + # def setup_queue(self, queue_name): + # """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC + # command. When it is complete, the on_queue_declareok method will + # be invoked by pika. + + # :param str|unicode queue_name: The name of the queue to declare. + + # """ + # LOGGER.info('Declaring queue %s', queue_name) + # cb = functools.partial(self.on_queue_declareok, userdata=queue_name) + # self._channel.queue_declare(queue=queue_name, callback=cb) + + # def on_queue_declareok(self, _unused_frame, userdata): + # """Method invoked by pika when the Queue.Declare RPC call made in + # setup_queue has completed. In this method we will bind the queue + # and exchange together with the routing key by issuing the Queue.Bind + # RPC command. When this command is complete, the on_bindok method will + # be invoked by pika. + + # :param pika.frame.Method _unused_frame: The Queue.DeclareOk frame + # :param str|unicode userdata: Extra user data (queue name) + + # """ + # queue_name = userdata + # LOGGER.info('Binding %s to %s with %s', self.EXCHANGE, queue_name, + # self.ROUTING_KEY) + # cb = functools.partial(self.on_bindok, userdata=queue_name) + # self._channel.queue_bind( + # queue_name, + # self.EXCHANGE, + # routing_key=self.ROUTING_KEY, + # callback=cb) + def on_queue_declareok(self, _unused_frame, userdata): """Method invoked by pika when the Queue.Declare RPC call made in setup_queue has completed. In this method we will bind the queue diff --git a/src/AMSlib/AMS.cpp b/src/AMSlib/AMS.cpp index b665b36a..4d892521 100644 --- a/src/AMSlib/AMS.cpp +++ b/src/AMSlib/AMS.cpp @@ -226,11 +226,13 @@ class AMSWrap getEntry(rmq_entry, "rabbitmq-password"); std::string rmq_user = getEntry(rmq_entry, "rabbitmq-user"); std::string rmq_vhost = getEntry(rmq_entry, "rabbitmq-vhost"); - std::string rmq_out_queue = - getEntry(rmq_entry, "rabbitmq-queue-physics"); - std::string exchange = + std::string exchange_physics = + getEntry(rmq_entry, "rabbitmq-exchange-physics"); + std::string routing_key_physics = + getEntry(rmq_entry, "rabbitmq-key-physics"); + std::string exchange_ml = getEntry(rmq_entry, "rabbitmq-exchange-training"); - std::string routing_key = + std::string routing_key_ml = getEntry(rmq_entry, "rabbitmq-key-training"); bool update_surrogate = getEntry(entry, "update_surrogate"); @@ -240,13 +242,13 @@ class AMSWrap rmq_cert = getEntry(rmq_entry, "rabbitmq-cert"); CFATAL(AMS, - (exchange == "" || routing_key == "") && update_surrogate, + (exchange_ml == "" || routing_key_ml == "") && update_surrogate, "Found empty RMQ exchange / routing-key, model update is not " "possible. " "Please provide a RMQ exchange or deactivate surrogate model " "update.") - if (exchange == "" || routing_key == "") { + if (exchange_ml == "" || routing_key_ml == "") { WARNING(AMS, "Found empty RMQ exchange or routing-key, deactivating model " "update") @@ -260,9 +262,10 @@ class AMSWrap rmq_user, rmq_vhost, rmq_cert, - rmq_out_queue, - exchange, - routing_key, + exchange_physics, + routing_key_physics, + exchange_ml, + routing_key_ml, update_surrogate); } diff --git a/src/AMSlib/wf/basedb.hpp b/src/AMSlib/wf/basedb.hpp index 8d375a3a..52c51fed 100644 --- a/src/AMSlib/wf/basedb.hpp +++ b/src/AMSlib/wf/basedb.hpp @@ -934,16 +934,20 @@ class ConnectionManagerAMQP std::atomic _stop; /** @brief True if currently reconnectiong */ std::atomic _reconnecting; - std::string _queue_sender; - /** @brief name of the exchange */ - std::string _exchange; - /** @brief name of the routing binded to exchange */ - std::string _routing_key; + /** @brief Exchange name for outgoing communication */ + std::string _exchange_sender; + /** @brief Routing key for outgoing communication */ + std::string _routing_key_sender; + /** @brief name of the receiving exchange */ + std::string _exchange_receiver; + /** @brief name of the routing binded to receiving exchange */ + std::string _routing_key_receiver; /** @brief True if connection */ std::atomic _isConnected; - /** @brief Number of messages not acked / nacked */ std::atomic _nbProcessingMsg; + /** @brief Store queue name if explicit queue name requested */ + std::string _queue_name; public: ConnectionManagerAMQP(uint64_t id, @@ -953,10 +957,12 @@ class ConnectionManagerAMQP std::string service_host, int service_port, std::string rmq_cert, - std::string outbound_queue, - std::string exchange, - std::string routing_key, - bool connectionDrop = false) + std::string exchange_physics, + std::string routing_key_physics, + std::string exchange_ml, + std::string routing_key_ml, + bool connectionDrop = false, + std::string queue_name = "") : _rId(id), _address(service_host, service_port, @@ -964,12 +970,14 @@ class ConnectionManagerAMQP rmq_vhost, rmq_cert.empty() ? false : true), _stop(false), - _queue_sender(outbound_queue), - _exchange(exchange), - _routing_key(routing_key), + _exchange_sender(exchange_physics), + _exchange_receiver(exchange_ml), + _routing_key_sender(routing_key_physics), + _routing_key_receiver(routing_key_ml), _reconnecting(false), _isConnected(false), - _nbProcessingMsg(0) + _nbProcessingMsg(0), + _queue_name(queue_name) { #ifdef EVTHREAD_USE_PTHREADS_IMPLEMENTED evthread_use_pthreads(); @@ -988,8 +996,8 @@ class ConnectionManagerAMQP _address.hostname().c_str(), _address.port(), _address.vhost().c_str(), - _exchange.c_str(), - _routing_key.c_str()) + _exchange_receiver.c_str(), + _routing_key_receiver.c_str()) _base = event_base_new(); _handler = std::make_shared(_base, rmq_cert); @@ -1167,8 +1175,8 @@ class ConnectionManagerAMQP // Publish using the reliable channel if available. if (_reliableChannel) { _reliableChannel - ->publish("", - _queue_sender, + ->publish(_exchange_sender, + _routing_key_sender, reinterpret_cast(msg.dPtr.get()), msg.size) .onAck([this, msg]() { @@ -1276,24 +1284,54 @@ class ConnectionManagerAMQP _isConnected = false; }); - _channel->declareQueue(_queue_sender) - .onSuccess([](const std::string& name, + _channel->declareExchange(_exchange_sender, AMQP::ExchangeType::direct) + .onSuccess([&]() { + DBG(ConnectionManagerAMQP, "declared exchange: %s", _exchange_sender.c_str()) + + _channel->declareQueue(_queue_name) + .onSuccess([&](const std::string& queue_name, uint32_t messagecount, uint32_t consumercount) { DBG(ConnectionManagerAMQP, "declared queue: %s (messagecount=%d, " "consumercount=%d)", - name.c_str(), + queue_name.c_str(), messagecount, consumercount) - }) + // We bind the anonymous queue to the exchange + _channel->bindQueue(_exchange_sender, queue_name, _routing_key_sender) + .onSuccess([&, queue_name]() { + DBG(ConnectionManagerAMQP, + "Bounded queue %s to exchange %s with " + "routing key = %s", + queue_name.c_str(), + _exchange_sender.c_str(), + _routing_key_sender.c_str()) + }) // bindQueue + .onError([&](const char* message) { + WARNING(ConnectionManagerAMQP, + "Error while binding queue to exchange " + "%s", + message) + _isConnected = false; + }); // bindQueue + }) //declareQueue .onError([&](const char* message) { WARNING(ConnectionManagerAMQP, - "Error while creating broker queue: " + "Error while creating queue: " "%s", message) _isConnected = false; - }); + }); //declareQueue + }) // declareExchange + .onError([&](const char* message) { + WARNING(ConnectionManagerAMQP, + "Error while creating exchange: " + "%s", + message) + _isConnected = false; + }); // declareExchange + _isConnected = true; _reliableChannel = std::make_shared>(*_channel); @@ -1401,18 +1439,8 @@ class ConnectionManagerAMQP class RMQInterface { private: - /** @brief Path of the config file (JSON) */ - std::string _config; /** @brief MPI rank (0 if no MPI support) */ uint64_t _rId; - /** @brief name of the queue to send data */ - std::string _queue_sender; - /** @brief name of the exchange to receive data */ - std::string _exchange; - /** @brief name of the routing key to receive data */ - std::string _routing_key; - /** @brief TLS certificate path */ - std::string _cacert; /** @brief Represent the ID of the last message sent */ int _msg_tag; /** @brief True if we support surrogate update */ @@ -1432,9 +1460,10 @@ class RMQInterface * @param[in] service_port The port number * @param[in] service_host URL of RabbitMQ server * @param[in] rmq_cert Path to TLS certificate - * @param[in] outbound_queue Name of the queue on which AMSlib publishes (send) messages - * @param[in] exchange Exchange for incoming messages - * @param[in] routing_key Routing key for incoming messages (must match what the AMS Python side is using) + * @param[in] exchange_physics Name of the exchange on which AMSlib publishes (send) messages + * @param[in] routing_key_physics Routing key used by AMSlib to send messages + * @param[in] exchange_ml Exchange for incoming messages + * @param[in] routing_key_ml Routing key for incoming messages (must match what the AMS Python side is using) */ void connect(std::string rmq_user, std::string rmq_password, @@ -1442,13 +1471,24 @@ class RMQInterface std::string service_host, int service_port, std::string rmq_cert, - std::string outbound_queue, - std::string exchange, - std::string routing_key, + std::string exchange_physics, + std::string routing_key_physics, + std::string exchange_ml, + std::string routing_key_ml, bool updateSurrogate) { bool amsRMQFailure = checkEnvVariable("AMS_SIMULATE_RMQ_FAILURE"); + bool amsRMQNamedQueue = checkEnvVariable("AMS_USE_NAMED_QUEUE"); CWARNING(RMQInterface, amsRMQFailure, "Simulating connetion drops") + CWARNING(RMQInterface, amsRMQNamedQueue, "Using named queue for RabbitMQ (slower)") + + // If the queue_name is equals to "" RabbitMQ will create a queue for us (anonymous queue) + // For debug and test we can also force RMQ to use a specific queue name which enforce message + // retention even if the other side (consumer) is not listening when we send messages + std::string queue_name = ""; + if (amsRMQNamedQueue) { + queue_name = "ams-debug-queue-" + std::to_string(_rId); + } _publishingManager = std::make_unique(_rId, rmq_user, @@ -1457,10 +1497,12 @@ class RMQInterface service_host, service_port, rmq_cert, - outbound_queue, - exchange, - routing_key, - amsRMQFailure); + exchange_physics, + routing_key_physics, + exchange_ml, + routing_key_ml, + amsRMQFailure, + queue_name); _updateSurrogate = updateSurrogate; } @@ -1869,9 +1911,10 @@ class DBManager std::string& rmq_user, std::string& rmq_vhost, std::string& rmq_cert, - std::string& outbound_queue, - std::string& exchange, - std::string& routing_key, + std::string& exchange_physics, + std::string& routing_key_physics, + std::string& exchange_ml, + std::string& routing_key_ml, bool update_surrogate) { fs::path Path(rmq_cert); @@ -1890,9 +1933,10 @@ class DBManager host, port, rmq_cert, - outbound_queue, - exchange, - routing_key, + exchange_physics, + routing_key_physics, + exchange_ml, + routing_key_ml, update_surrogate); #else FATAL(DBManager, diff --git a/tests/AMSlib/ams_interface/CMakeLists.txt b/tests/AMSlib/ams_interface/CMakeLists.txt index 7d34ec0a..0740e359 100644 --- a/tests/AMSlib/ams_interface/CMakeLists.txt +++ b/tests/AMSlib/ams_interface/CMakeLists.txt @@ -5,27 +5,29 @@ endfunction() function(JSON_TESTS db_type) - configure_file("${CMAKE_CURRENT_SOURCE_DIR}/json_configs/env_2_models_fs_rand_uq.json.in" "${JSON_FP}" @ONLY) - + if(NOT EXISTS "${JSON_FP}") + configure_file("${CMAKE_CURRENT_SOURCE_DIR}/json_configs/env_2_models_fs_rand_uq.json.in" "${JSON_FP}" @ONLY) + endif() + # Tests Random models with different percentages both models store to file - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random10::Random50::Double::DB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_random_50;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_random_50") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random10::Random50::Double::DB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_random_50;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_random_50") # Tests delta-uq models with different aggregation both models store to file - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::DuqMean::DuqMax::Double::DB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_uq_mean app_uq_max;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_uq_mean app_uq_max") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::DuqMean::DuqMax::Double::DB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_uq_mean app_uq_max;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_uq_mean app_uq_max") # Tests detla uq model with a random uq model both models store to files - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random::DuqMax::Double::DB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_uq_max;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_uq_max") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random::DuqMax::Double::DB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_uq_max;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_uq_max") # Tests detla uq model with no model. uq model both store to files - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random::NoModel::Double::DB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_no_model;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_no_model") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random::NoModel::Double::DB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_no_model;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_no_model") # Tests 2 delta uq models with no deb . uq model both store to files - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::DuqMean::DuqMax::Double::NODB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_uq_mean_ndb app_uq_max_ndb;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_uq_mean_ndb app_uq_max_ndb") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::DuqMean::DuqMax::Double::NODB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_uq_mean_ndb app_uq_max_ndb;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_uq_mean_ndb app_uq_max_ndb") # Tests null models null dbs - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::None::None::Double::NODB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_no_model_no_db app_no_model_no_db ;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_no_model_no_db app_no_model_no_db") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::None::None::Double::NODB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_no_model_no_db app_no_model_no_db ;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_no_model_no_db app_no_model_no_db") unset(AMS_DB_TEST_TYPE) unset(JSON_FP) @@ -42,7 +44,7 @@ function(CHECK_RMQ_CONFIG file) string(JSON RMQ_PORT GET ${DB_CONF} "service-port") if(NOT "${RMQ_HOST}" STREQUAL "" AND NOT "${RMQ_PORT}" STREQUAL "0") - message(STATUS "RabbitMQ config ${file}: ${RMQ_HOST}:${RMQ_PORT}") + message(STATUS "RabbitMQ config ${file}: ${RMQ_HOST}:${RMQ_PORT}") else() message(WARNING "RabbitMQ config file ${file} looks empty! Make sure to fill these fields before running the tests") endif() diff --git a/tests/AMSlib/ams_interface/json_configs/rmq.json.in b/tests/AMSlib/ams_interface/json_configs/rmq.json.in index 7aec73d8..c3156602 100644 --- a/tests/AMSlib/ams_interface/json_configs/rmq.json.in +++ b/tests/AMSlib/ams_interface/json_configs/rmq.json.in @@ -9,7 +9,8 @@ "rabbitmq-user": "", "rabbitmq-vhost": "", "rabbitmq-cert": "", - "rabbitmq-queue-physics": "", + "rabbitmq-exchange-physics": "", + "rabbitmq-key-physics": "", "rabbitmq-exchange-training": "", "rabbitmq-key-training": "" }, diff --git a/tests/AMSlib/ams_interface/verify_ete.py b/tests/AMSlib/ams_interface/verify_ete.py index fccc61e5..8776ac60 100644 --- a/tests/AMSlib/ams_interface/verify_ete.py +++ b/tests/AMSlib/ams_interface/verify_ete.py @@ -265,7 +265,7 @@ def from_cli(argv): return error -def get_rmq_data(ams_config, domain_names, num_iterations, timeout=1): +def get_rmq_data(ams_config, domain_names, num_iterations, timeout=10): from ams.rmq import BlockingClient, default_ams_callback rmq_json = ams_config["db"]["rmq_config"] @@ -275,13 +275,15 @@ def get_rmq_data(ams_config, domain_names, num_iterations, timeout=1): port = rmq_json["service-port"] user = rmq_json["rabbitmq-user"] password = rmq_json["rabbitmq-password"] - queue = rmq_json["rabbitmq-queue-physics"] + exchange = rmq_json["rabbitmq-exchange-physics"] + rkey = rmq_json["rabbitmq-key-physics"] cert = None if "rabbitmq-cert" in rmq_json: cert = rmq_json["rabbitmq-cert"] cert = None if cert == "" else cert with BlockingClient(host, port, vhost, user, password, cert, default_ams_callback) as client: - with client.connect(queue) as channel: + queue = "ams-debug-queue-0" # TODO: fix that + with client.connect(exchange, rkey) as channel: msgs = channel.receive(n_msg=num_iterations * len(domain_names), timeout=timeout) dns = set(domain_names) From f3c7a58e41a5422f23a994c2c00677978014db0a Mon Sep 17 00:00:00 2001 From: Loic Pottier Date: Wed, 4 Jun 2025 20:40:42 -0700 Subject: [PATCH 2/7] Unset AMS_OBJECT when running tests to avoid conflict with existing env Signed-off-by: Loic Pottier --- tests/AMSlib/ams_interface/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/AMSlib/ams_interface/CMakeLists.txt b/tests/AMSlib/ams_interface/CMakeLists.txt index 0740e359..de418674 100644 --- a/tests/AMSlib/ams_interface/CMakeLists.txt +++ b/tests/AMSlib/ams_interface/CMakeLists.txt @@ -1,5 +1,5 @@ function(ADD_API_UNIT_TEST gname name cmd) - add_test(NAME ${name} COMMAND bash -c "${cmd}") + add_test(NAME ${name} COMMAND ${CMAKE_COMMAND} -E env --unset=AMS_OBJECTS bash -c "${cmd}") set_tests_properties(${name} PROPERTIES LABELS ${gname}) endfunction() From 2550db7d2dffbf08b3064444d5c9ffe1a9d371a2 Mon Sep 17 00:00:00 2001 From: Loic Pottier Date: Wed, 4 Jun 2025 22:20:32 -0700 Subject: [PATCH 3/7] Updated stagers to consume from exchange + routing key instead of directly from queue Signed-off-by: Loic Pottier --- src/AMSWorkflow/ams/rmq.py | 185 ++++++++++++++++++----------------- src/AMSWorkflow/ams/stage.py | 30 ++++-- 2 files changed, 114 insertions(+), 101 deletions(-) diff --git a/src/AMSWorkflow/ams/rmq.py b/src/AMSWorkflow/ams/rmq.py index fd228634..cdb0ecf1 100644 --- a/src/AMSWorkflow/ams/rmq.py +++ b/src/AMSWorkflow/ams/rmq.py @@ -17,6 +17,7 @@ import numpy as np import json import pika +# from pika.exchange_type import ExchangeType class AMSMessage(object): @@ -417,8 +418,11 @@ def __init__( user: str, password: str, cert: str, - queue: str, + exchange: str, + routing_key: str, + queue: str = "", prefetch_count: int = 1, + exchange_type: str = "direct", on_message_cb: Optional[Callable] = None, on_close_cb: Optional[Callable] = None, logger: Optional[logging.Logger] = None, @@ -441,6 +445,9 @@ def __init__( self._vhost = vhost self._cacert = cert self._queue = queue + self._exchange = exchange + self._exchange_type = exchange_type + self._routing_key = routing_key self.should_reconnect = False # Holds the latest error/reason to reconnect @@ -584,9 +591,7 @@ def on_channel_open(self, channel): self._channel = channel self.logger.debug("Channel opened") self.add_on_channel_close_callback() - # we do not set up exchange first here, we use the default exchange '' - self.setup_queue(self._queue) - # self.setup_exchange(self.EXCHANGE) + self.setup_exchange(self._exchange, self._exchange_type) def add_on_channel_close_callback(self): """This method tells pika to call the on_channel_closed method if @@ -612,6 +617,33 @@ def on_channel_closed(self, channel, reason): self._on_close_cb() # running user callback self.close_connection() + def setup_exchange(self, exchange_name, exchange_type): + """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC + command. When it is complete, the on_exchange_declareok method will + be invoked by pika. + + :param str|unicode exchange_name: The name of the exchange to declare + + """ + self.logger.debug(f"Declaring exchange: '{exchange_name}'") + cb = functools.partial( + self.on_exchange_declareok, userdata = exchange_name) + self._channel.exchange_declare( + exchange = exchange_name, + exchange_type = exchange_type, + callback = cb) + + def on_exchange_declareok(self, _unused_frame, userdata): + """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC + command. + + :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame + :param str|unicode userdata: Extra user data (exchange name) + + """ + self.logger.debug(f"Exchange declared: '{userdata}'") + self.setup_queue(self._queue) + def setup_queue(self, queue_name): """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC command. When it is complete, the on_queue_declareok method will @@ -620,62 +652,11 @@ def setup_queue(self, queue_name): :param str|unicode queue_name: The name of the queue to declare. """ - self.logger.debug(f'Declaring queue "{queue_name}"') + self.logger.debug(f"Declaring queue '{queue_name}'") cb = functools.partial(self.on_queue_declareok, userdata=queue_name) # arguments = {"x-consumer-timeout":1800000} # 30 minutes in ms self._channel.queue_declare(queue=queue_name, exclusive=False, callback=cb) - # def setup_exchange(self, exchange_name): - # """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC - # command. When it is complete, the on_exchange_declareok method will - # be invoked by pika. - - # :param str|unicode exchange_name: The name of the exchange to declare - - # """ - # LOGGER.info('Declaring exchange: %s', exchange_name) - # # Note: using functools.partial is not required, it is demonstrating - # # how arbitrary data can be passed to the callback when it is called - # cb = functools.partial( - # self.on_exchange_declareok, userdata=exchange_name) - # self._channel.exchange_declare( - # exchange=exchange_name, - # exchange_type=self.EXCHANGE_TYPE, - # callback=cb) - - # def setup_queue(self, queue_name): - # """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC - # command. When it is complete, the on_queue_declareok method will - # be invoked by pika. - - # :param str|unicode queue_name: The name of the queue to declare. - - # """ - # LOGGER.info('Declaring queue %s', queue_name) - # cb = functools.partial(self.on_queue_declareok, userdata=queue_name) - # self._channel.queue_declare(queue=queue_name, callback=cb) - - # def on_queue_declareok(self, _unused_frame, userdata): - # """Method invoked by pika when the Queue.Declare RPC call made in - # setup_queue has completed. In this method we will bind the queue - # and exchange together with the routing key by issuing the Queue.Bind - # RPC command. When this command is complete, the on_bindok method will - # be invoked by pika. - - # :param pika.frame.Method _unused_frame: The Queue.DeclareOk frame - # :param str|unicode userdata: Extra user data (queue name) - - # """ - # queue_name = userdata - # LOGGER.info('Binding %s to %s with %s', self.EXCHANGE, queue_name, - # self.ROUTING_KEY) - # cb = functools.partial(self.on_bindok, userdata=queue_name) - # self._channel.queue_bind( - # queue_name, - # self.EXCHANGE, - # routing_key=self.ROUTING_KEY, - # callback=cb) - def on_queue_declareok(self, _unused_frame, userdata): """Method invoked by pika when the Queue.Declare RPC call made in setup_queue has completed. In this method we will bind the queue @@ -688,7 +669,23 @@ def on_queue_declareok(self, _unused_frame, userdata): """ queue_name = userdata - self.logger.debug(f'Queue "{queue_name}" declared') + self.logger.info(f"Binding {self._exchange} to queue '{queue_name}' with key '{self._routing_key}'") + cb = functools.partial(self.on_bindok, userdata=queue_name) + self._channel.queue_bind( + queue_name, + self._exchange, + routing_key=self._routing_key, + callback=cb) + + def on_bindok(self, _unused_frame, userdata): + """Invoked by pika when the Queue.Bind method has completed. At this + point we will set the prefetch count for the channel. + + :param pika.frame.Method _unused_frame: The Queue.BindOk response frame + :param str|unicode userdata: Extra user data (queue name) + + """ + self.logger.debug(f"Queue bound: '{userdata}'") self.set_qos() def set_qos(self): @@ -840,49 +837,52 @@ def __init__( user: str, password: str, cert: str, - queue: str, + routing_key: str, prefetch_count: int = 1, on_message_cb: Optional[Callable] = None, on_close_cb: Optional[Callable] = None, logger: Optional[logging.Logger] = None, ): super().__init__( - host, - port, - vhost, - user, - password, - cert, - queue, - prefetch_count, - on_message_cb, - on_close_cb, - logger, - ) - - # Callback when the channel is open - def on_channel_open(self, channel): - self._channel = channel - self.logger.debug("Channel opened") - self.add_on_channel_close_callback() - self._channel.exchange_declare( + host=host, + port=port, + vhost=vhost, + user=user, + password=password, + cert=cert, exchange="control-panel", + routing_key=routing_key, + queue="", exchange_type="fanout", - callback=self.on_exchange_declared, + prefetch_count=prefetch_count, + on_message_cb=on_message_cb, + on_close_cb=on_close_cb, + logger=logger, ) - # Callback when the exchange is declared - def on_exchange_declared(self, frame): - self._channel.queue_declare(queue="", exclusive=True, callback=self.on_queue_declared) + # # Callback when the channel is open + # def on_channel_open(self, channel): + # self._channel = channel + # self.logger.debug("Channel opened") + # self.add_on_channel_close_callback() + # self._channel.exchange_declare( + # exchange="control-panel", + # exchange_type="fanout", + # callback=self.on_exchange_declared, + # ) - # Callback when the queue is declared - def on_queue_declared(self, queue_result): - self._queue = queue_result.method.queue - self._channel.queue_bind(exchange="control-panel", queue=self._queue, callback=self.on_queue_bound) + # # Callback when the exchange is declared + # def on_exchange_declared(self, frame): + # self._channel.queue_declare(queue="", exclusive=True, callback=self.on_queue_declared) - # Callback when the queue is bound to the exchange - def on_queue_bound(self, frame): - self.set_qos() + # # Callback when the queue is declared + # def on_queue_declared(self, queue_result): + # self._queue = queue_result.method.queue + # self._channel.queue_bind(exchange="control-panel", queue=self._queue, callback=self.on_queue_bound) + + # # Callback when the queue is bound to the exchange + # def on_queue_bound(self, frame): + # self.set_qos() class AMSSyncProducer: @@ -1015,7 +1015,8 @@ class AMSRMQConfiguration: "rabbitmq-user": "", "rabbitmq-vhost": "", "rabbitmq-cert": "", - "rabbitmq-queue-physics": "", + "rabbitmq-exchange-physics": "", + "rabbitmq-key-physics": "", "rabbitmq-exchange-training": "", "rabbitmq-key-training": "" }, @@ -1030,7 +1031,8 @@ class AMSRMQConfiguration: rabbitmq_user: str rabbitmq_vhost: str rabbitmq_cert: str - rabbitmq_queue_physics: str + rabbitmq_exchange_physics: str + rabbitmq_key_physics: str rabbitmq_exchange_training: str = "" rabbitmq_key_training: str = "" rabbitmq_ml_submit_queue: str = "" @@ -1062,7 +1064,8 @@ def to_dict(self, AMSlib=False): "rabbitmq-user": self.rabbitmq_user, "rabbitmq-vhost": self.rabbitmq_vhost, "rabbitmq-cert": self.rabbitmq_cert, - "rabbitmq-queue-physics": self.rabbitmq_queue_physics, + "rabbitmq-exchange-physics": self.rabbitmq_exchange_physics, + "rabbitmq-key-physics": self.rabbitmq_key_physics, "rabbitmq-exchange-training": self.rabbitmq_exchange_training, "rabbitmq-key-training": self.rabbitmq_key_training, "rabbitmq-ml-submit-queue": self.rabbitmq_ml_submit_queue, diff --git a/src/AMSWorkflow/ams/stage.py b/src/AMSWorkflow/ams/stage.py index 3442b7dd..c9921e6b 100644 --- a/src/AMSWorkflow/ams/stage.py +++ b/src/AMSWorkflow/ams/stage.py @@ -262,7 +262,7 @@ class RMQDomainDataLoaderTask(Task): """ A RMQDomainDataLoaderTask consumes 'AMSMessages' from RabbitMQ bundles the data of the files into batches and forwards them to the next task waiting on the - output queuee. + output queue. Attributes: o_queue: The output queue to write the transformed messages @@ -281,14 +281,17 @@ def __init__( user, password, cert, - rmq_queue, + rmq_exchange, + rmq_routing_key, policy, prefetch_count=1, signals=[signal.SIGINT, signal.SIGUSR1], ): self.o_queue = o_queue self.cert = cert - self.rmq_queue = rmq_queue + # self.rmq_queue = rmq_queue + self.rmq_exchange = rmq_exchange + self.rmq_routing_key = rmq_routing_key self.prefetch_count = prefetch_count self.datasize_byte = 0 self.total_time_ns = 0 @@ -311,7 +314,9 @@ def __init__( user=user, password=password, cert=self.cert, - queue=self.rmq_queue, + exchange=self.rmq_exchange, + routing_key=self.rmq_routing_key, + queue="", on_message_cb=self.callback_message, on_close_cb=self.callback_close, prefetch_count=self.prefetch_count, @@ -1061,7 +1066,8 @@ def __init__( user, password, cert, - data_queue, + exchange, + routing_key, model_update_queue=None, ): """ @@ -1075,10 +1081,12 @@ def __init__( self._user = user self._password = password self._cert = Path(cert) - self._data_queue = data_queue + # self._data_queue = data_queue + self._exchange = exchange + self._routing_key = routing_key self._model_update_queue = model_update_queue - print("Received a data queue of", self._data_queue) - print("Received a model_update queue of", self._model_update_queue) + print(f"Received data from exchange {self._exchange} / rkey {self._routing_key}") + print(f"Received a model_update queue of {self._model_update_queue}") self._gracefull_shutdown = None self._o_queue = None @@ -1101,7 +1109,8 @@ def get_load_task(self, o_queue, policy): self._user, self._password, self._cert, - self._data_queue, + self._exchange, + self._routing_key, policy, prefetch_count=1, ) @@ -1176,7 +1185,8 @@ def from_cli(cls, args): config.rabbitmq_user, config.rabbitmq_password, config.rabbitmq_cert, - config.rabbitmq_queue_physics, + config.rabbitmq_exchange_physics, + config.rabbitmq_key_physics, config.rabbitmq_exchange_training if args.update_rmq_models else None, ) From 660d62c6edbc6f1f7b31beacdc25006aae6ba946 Mon Sep 17 00:00:00 2001 From: Loic Pottier Date: Thu, 5 Jun 2025 09:14:58 -0700 Subject: [PATCH 4/7] RMQ Prefetch default value is now 0 Signed-off-by: Loic Pottier --- src/AMSWorkflow/ams/rmq.py | 4 ++-- src/AMSWorkflow/ams/stage.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/AMSWorkflow/ams/rmq.py b/src/AMSWorkflow/ams/rmq.py index cdb0ecf1..6366f445 100644 --- a/src/AMSWorkflow/ams/rmq.py +++ b/src/AMSWorkflow/ams/rmq.py @@ -421,7 +421,7 @@ def __init__( exchange: str, routing_key: str, queue: str = "", - prefetch_count: int = 1, + prefetch_count: int = 0, exchange_type: str = "direct", on_message_cb: Optional[Callable] = None, on_close_cb: Optional[Callable] = None, @@ -838,7 +838,7 @@ def __init__( password: str, cert: str, routing_key: str, - prefetch_count: int = 1, + prefetch_count: int = 0, on_message_cb: Optional[Callable] = None, on_close_cb: Optional[Callable] = None, logger: Optional[logging.Logger] = None, diff --git a/src/AMSWorkflow/ams/stage.py b/src/AMSWorkflow/ams/stage.py index c9921e6b..a91a723f 100644 --- a/src/AMSWorkflow/ams/stage.py +++ b/src/AMSWorkflow/ams/stage.py @@ -284,7 +284,7 @@ def __init__( rmq_exchange, rmq_routing_key, policy, - prefetch_count=1, + prefetch_count = 0, signals=[signal.SIGINT, signal.SIGUSR1], ): self.o_queue = o_queue @@ -433,7 +433,7 @@ def __init__( user: str, password: str, cert: str, - prefetch_count: int = 1, + prefetch_count: int = 0, ): self._consumers = consumers super().__init__( @@ -1112,7 +1112,7 @@ def get_load_task(self, o_queue, policy): self._exchange, self._routing_key, policy, - prefetch_count=1, + prefetch_count = 0, ) self._o_queue = o_queue self._gracefull_shutdown = AMSShutdown( From 6fdc24446045df8cae7672910a35ff4e34bf661e Mon Sep 17 00:00:00 2001 From: Loic Pottier Date: Thu, 5 Jun 2025 09:20:41 -0700 Subject: [PATCH 5/7] Updated RMQ tests Signed-off-by: Loic Pottier --- .github/workflows/ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1a3cca14..bf83331b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -285,7 +285,8 @@ jobs: \"service-port\": ${RABBITMQ_PORT}, \"service-host\": \"${RABBITMQ_HOST}\", \"rabbitmq-vhost\": \"/\", - \"rabbitmq-queue-physics\": \"test-ci\", + \"rabbitmq-exchange-physics\": \"exchange-ci\", + \"rabbitmq-key-physics\": \"queue-ci\", \"rabbitmq-exchange-training\": \"ams-fanout\", \"rabbitmq-key-training\": \"training\" }""" > rmq.json From 5d66b574b1527b617214c27c5f9116462ef6fd94 Mon Sep 17 00:00:00 2001 From: Loic Pottier Date: Thu, 5 Jun 2025 10:02:27 -0700 Subject: [PATCH 6/7] Cleanup python code Signed-off-by: Loic Pottier --- src/AMSWorkflow/ams/rmq.py | 38 ++++-------------------- tests/AMSlib/ams_interface/verify_ete.py | 8 +++-- 2 files changed, 11 insertions(+), 35 deletions(-) diff --git a/src/AMSWorkflow/ams/rmq.py b/src/AMSWorkflow/ams/rmq.py index 6366f445..31ed4655 100644 --- a/src/AMSWorkflow/ams/rmq.py +++ b/src/AMSWorkflow/ams/rmq.py @@ -17,8 +17,6 @@ import numpy as np import json import pika -# from pika.exchange_type import ExchangeType - class AMSMessage(object): """ @@ -228,13 +226,14 @@ def __init__( connection, exchange, routing_key, + queue = "", callback: Optional[Callable] = None, logger: Optional[logging.Logger] = None, ): self.connection = connection self.exchange = exchange self.routing_key = routing_key - self.q_name = None + self.q_name = queue self.logger = logger if logger else logging.getLogger(__name__) self.callback = callback if callback else self.default_callback @@ -251,13 +250,11 @@ def default_callback(self, method, properties, body): def open(self): self.channel = self.connection.channel() - q_name = self.routing_key if self.exchange != '': self.logger.info(f"Declared exchange {self.exchange}") self.channel.exchange_declare(exchange = self.exchange, exchange_type = "direct") - q_name = "ams-debug" #TODO CHANGE - result = self.channel.queue_declare(queue = q_name, exclusive = False, durable = False) + result = self.channel.queue_declare(queue = self.q_name, exclusive = False, durable = False) self.q_name = result.method.queue self.logger.info(f"Declared queue {self.q_name}") if self.exchange != '': @@ -389,11 +386,11 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.connection.close() - def connect(self, exchange, routing_key): + def connect(self, exchange, routing_key, queue = ""): """ Connect to the exchange and routing key. """ - return AMSChannel(self.connection, exchange, routing_key, self.callback) + return AMSChannel(self.connection, exchange, routing_key, queue, self.callback) class StatusPoller(BlockingClient): @@ -860,31 +857,6 @@ def __init__( logger=logger, ) - # # Callback when the channel is open - # def on_channel_open(self, channel): - # self._channel = channel - # self.logger.debug("Channel opened") - # self.add_on_channel_close_callback() - # self._channel.exchange_declare( - # exchange="control-panel", - # exchange_type="fanout", - # callback=self.on_exchange_declared, - # ) - - # # Callback when the exchange is declared - # def on_exchange_declared(self, frame): - # self._channel.queue_declare(queue="", exclusive=True, callback=self.on_queue_declared) - - # # Callback when the queue is declared - # def on_queue_declared(self, queue_result): - # self._queue = queue_result.method.queue - # self._channel.queue_bind(exchange="control-panel", queue=self._queue, callback=self.on_queue_bound) - - # # Callback when the queue is bound to the exchange - # def on_queue_bound(self, frame): - # self.set_qos() - - class AMSSyncProducer: def __init__( self, diff --git a/tests/AMSlib/ams_interface/verify_ete.py b/tests/AMSlib/ams_interface/verify_ete.py index 8776ac60..29257581 100644 --- a/tests/AMSlib/ams_interface/verify_ete.py +++ b/tests/AMSlib/ams_interface/verify_ete.py @@ -282,8 +282,12 @@ def get_rmq_data(ams_config, domain_names, num_iterations, timeout=10): cert = rmq_json["rabbitmq-cert"] cert = None if cert == "" else cert with BlockingClient(host, port, vhost, user, password, cert, default_ams_callback) as client: - queue = "ams-debug-queue-0" # TODO: fix that - with client.connect(exchange, rkey) as channel: + # For testing purpose we expect data from Rank 0 + # using the debug mode enabled by setting AMS_USE_NAMED_QUEUE + # This allows us to use named queue which can retain data. + # without that, running the sender before the receiver would not work. + queue = "ams-debug-queue-0" + with client.connect(exchange, rkey, queue) as channel: msgs = channel.receive(n_msg=num_iterations * len(domain_names), timeout=timeout) dns = set(domain_names) From ac3b7a319a88499bc403df060b542c7f3d98989e Mon Sep 17 00:00:00 2001 From: Loic Pottier Date: Thu, 5 Jun 2025 10:09:53 -0700 Subject: [PATCH 7/7] Clang-format on basedb.hpp Signed-off-by: Loic Pottier --- src/AMSlib/wf/basedb.hpp | 116 +++++++++++++++++++++------------------ 1 file changed, 62 insertions(+), 54 deletions(-) diff --git a/src/AMSlib/wf/basedb.hpp b/src/AMSlib/wf/basedb.hpp index 52c51fed..468c5dc2 100644 --- a/src/AMSlib/wf/basedb.hpp +++ b/src/AMSlib/wf/basedb.hpp @@ -1285,52 +1285,57 @@ class ConnectionManagerAMQP }); _channel->declareExchange(_exchange_sender, AMQP::ExchangeType::direct) - .onSuccess([&]() { - DBG(ConnectionManagerAMQP, "declared exchange: %s", _exchange_sender.c_str()) - - _channel->declareQueue(_queue_name) - .onSuccess([&](const std::string& queue_name, - uint32_t messagecount, - uint32_t consumercount) { + .onSuccess([&]() { DBG(ConnectionManagerAMQP, - "declared queue: %s (messagecount=%d, " - "consumercount=%d)", - queue_name.c_str(), - messagecount, - consumercount) - // We bind the anonymous queue to the exchange - _channel->bindQueue(_exchange_sender, queue_name, _routing_key_sender) - .onSuccess([&, queue_name]() { - DBG(ConnectionManagerAMQP, - "Bounded queue %s to exchange %s with " - "routing key = %s", - queue_name.c_str(), - _exchange_sender.c_str(), - _routing_key_sender.c_str()) - }) // bindQueue - .onError([&](const char* message) { - WARNING(ConnectionManagerAMQP, - "Error while binding queue to exchange " - "%s", - message) - _isConnected = false; - }); // bindQueue - }) //declareQueue + "declared exchange: %s", + _exchange_sender.c_str()) + + _channel->declareQueue(_queue_name) + .onSuccess([&](const std::string& queue_name, + uint32_t messagecount, + uint32_t consumercount) { + DBG(ConnectionManagerAMQP, + "declared queue: %s (messagecount=%d, " + "consumercount=%d)", + queue_name.c_str(), + messagecount, + consumercount) + // We bind the anonymous queue to the exchange + _channel + ->bindQueue(_exchange_sender, + queue_name, + _routing_key_sender) + .onSuccess([&, queue_name]() { + DBG(ConnectionManagerAMQP, + "Bounded queue %s to exchange %s with " + "routing key = %s", + queue_name.c_str(), + _exchange_sender.c_str(), + _routing_key_sender.c_str()) + }) // bindQueue + .onError([&](const char* message) { + WARNING(ConnectionManagerAMQP, + "Error while binding queue to exchange " + "%s", + message) + _isConnected = false; + }); // bindQueue + }) //declareQueue + .onError([&](const char* message) { + WARNING(ConnectionManagerAMQP, + "Error while creating queue: " + "%s", + message) + _isConnected = false; + }); //declareQueue + }) // declareExchange .onError([&](const char* message) { WARNING(ConnectionManagerAMQP, - "Error while creating queue: " + "Error while creating exchange: " "%s", message) _isConnected = false; - }); //declareQueue - }) // declareExchange - .onError([&](const char* message) { - WARNING(ConnectionManagerAMQP, - "Error while creating exchange: " - "%s", - message) - _isConnected = false; - }); // declareExchange + }); // declareExchange _isConnected = true; _reliableChannel = @@ -1480,7 +1485,9 @@ class RMQInterface bool amsRMQFailure = checkEnvVariable("AMS_SIMULATE_RMQ_FAILURE"); bool amsRMQNamedQueue = checkEnvVariable("AMS_USE_NAMED_QUEUE"); CWARNING(RMQInterface, amsRMQFailure, "Simulating connetion drops") - CWARNING(RMQInterface, amsRMQNamedQueue, "Using named queue for RabbitMQ (slower)") + CWARNING(RMQInterface, + amsRMQNamedQueue, + "Using named queue for RabbitMQ (slower)") // If the queue_name is equals to "" RabbitMQ will create a queue for us (anonymous queue) // For debug and test we can also force RMQ to use a specific queue name which enforce message @@ -1490,19 +1497,20 @@ class RMQInterface queue_name = "ams-debug-queue-" + std::to_string(_rId); } - _publishingManager = std::make_unique(_rId, - rmq_user, - rmq_password, - rmq_vhost, - service_host, - service_port, - rmq_cert, - exchange_physics, - routing_key_physics, - exchange_ml, - routing_key_ml, - amsRMQFailure, - queue_name); + _publishingManager = + std::make_unique(_rId, + rmq_user, + rmq_password, + rmq_vhost, + service_host, + service_port, + rmq_cert, + exchange_physics, + routing_key_physics, + exchange_ml, + routing_key_ml, + amsRMQFailure, + queue_name); _updateSurrogate = updateSurrogate; }