From 7de5445fcda1beb0dbc79e27534c12d6b8c89338 Mon Sep 17 00:00:00 2001 From: dmitrygx Date: Mon, 24 Aug 2020 17:56:15 +0000 Subject: [PATCH 1/8] UCP/WIREUP: Add flush+destroy of UCT EPs used by TMP EP lanes --- src/ucp/wireup/wireup.c | 35 ++++++++++- src/ucp/wireup/wireup.h | 4 ++ src/ucp/wireup/wireup_ep.c | 126 ++++++++++++++++++++++++++++++++++++- src/ucp/wireup/wireup_ep.h | 1 + 4 files changed, 160 insertions(+), 6 deletions(-) diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index a1fb9138837..5c818743dbd 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -271,9 +271,9 @@ ucp_wireup_match_p2p_lanes(ucp_ep_h ep, static ucs_status_t ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane, - const ucp_unpacked_address_t *remote_address, - const uct_ep_addr_t **ep_addr_p, - const uct_device_addr_t **dev_addr_p) + const ucp_unpacked_address_t *remote_address, + const uct_ep_addr_t **ep_addr_p, + const uct_device_addr_t **dev_addr_p) { const ucp_address_entry_t *address; unsigned ep_addr_index; @@ -292,6 +292,35 @@ ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane, return UCS_ERR_UNREACHABLE; } +static ucp_lane_index_t +ucp_wireup_ep_lane_used_by_another_ep_config(ucp_ep_config_key_t *ep_config_key, + ucp_ep_config_key_t *another_ep_config_key, + ucp_lane_index_t lane) +{ + ucp_lane_index_t another_lane; + + for (another_lane = 0; another_lane < another_ep_config_key->num_lanes; + ++another_lane) { + if (ucp_ep_config_lane_is_equal(ep_config_key, + another_ep_config_key, + lane, 0)) { + return another_lane; + } + } + + return UCP_NULL_LANE; +} + +ucp_lane_index_t ucp_wireup_ep_lane_used_by_another_ep(ucp_ep_h ep, + ucp_ep_h another_ep, + ucp_lane_index_t lane) +{ + return ucp_wireup_ep_lane_used_by_another_ep_config( + &ucp_ep_config(ep)->key, + &ucp_ep_config(another_ep)->key, + lane); +} + ucs_status_t ucp_wireup_connect_local(ucp_ep_h ep, const ucp_unpacked_address_t *remote_address, 
diff --git a/src/ucp/wireup/wireup.h b/src/ucp/wireup/wireup.h index 6b57608064d..3bb52f0373e 100644 --- a/src/ucp/wireup/wireup.h +++ b/src/ucp/wireup/wireup.h @@ -145,6 +145,10 @@ void ucp_wireup_remote_connected(ucp_ep_h ep); unsigned ucp_ep_init_flags(const ucp_worker_h worker, const ucp_ep_params_t *params); +ucp_lane_index_t ucp_wireup_ep_lane_used_by_another_ep(ucp_ep_h ep, + ucp_ep_h another_ep, + ucp_lane_index_t lane); + ucs_status_t ucp_wireup_connect_local(ucp_ep_h ep, const ucp_unpacked_address_t *remote_address, diff --git a/src/ucp/wireup/wireup_ep.c b/src/ucp/wireup/wireup_ep.c index a462c05e01e..0f7f94aa074 100644 --- a/src/ucp/wireup/wireup_ep.c +++ b/src/ucp/wireup/wireup_ep.c @@ -43,6 +43,113 @@ ucp_wireup_ep_connect_to_ep(uct_ep_h uct_ep, const uct_device_addr_t *dev_addr, return uct_ep_connect_to_ep(wireup_ep->super.uct_ep, dev_addr, ep_addr); } +static void +ucp_wireup_tmp_ep_destroy_complete_cb(void *request, ucs_status_t status, + void *user_data) +{ + ucp_wireup_ep_t *wireup_ep = (ucp_wireup_ep_t*)user_data; + + if (wireup_ep == NULL) { + /* check for NULL pointer to workaround Coverity warning (it wrongly + * assumes that this callback could be called upon GET/PUT operation) */ + ucs_fatal("req=%p: user_data passed to the TMP EP destroy cb is NULL", + (ucp_request_t*)request - 1); + } + + wireup_ep->flags &= ~UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP; + ucs_assert(wireup_ep->tmp_ep == NULL); + ucp_request_release(request); +} + +static unsigned ucp_wireup_tmp_ep_disconnect_progress(void *arg) +{ + ucp_request_t *req = (ucp_request_t*)arg; + ucp_ep_h tmp_ep = req->send.ep; + ucp_worker_h worker = tmp_ep->worker; + ucs_async_context_t *async = &worker->async; + + UCS_ASYNC_BLOCK(async); + ucp_ep_disconnected(tmp_ep, 1); + --worker->flush_ops_count; + ucp_request_complete_send(req, req->status); + UCS_ASYNC_UNBLOCK(async); + + return 1; +} + +static void ucp_wireup_tmp_ep_flushed_cb(ucp_request_t *req) +{ + uct_worker_cb_id_t cb_id = 
UCS_CALLBACKQ_ID_NULL; + ucp_ep_h tmp_ep = req->send.ep; + + uct_worker_progress_register_safe(tmp_ep->worker->uct, + ucp_wireup_tmp_ep_disconnect_progress, + req, UCS_CALLBACKQ_FLAG_ONESHOT, &cb_id); +} + +/* the following values could be returned from the function: + * - true: destroying of the TMP EP was completed inplace, if the complete_cb + * was specified, it wouldn't be called + * - false: destroying of the TMP EP is in progress now, if the complete_cb + * was specified, it would be called upon completion of the destroying + * of the TMP EP */ +int ucp_wireup_tmp_ep_destroy(ucp_ep_h ep, ucp_wireup_ep_t *wireup_ep, + unsigned ep_flush_flags, + ucp_send_nbx_callback_t complete_cb) +{ + ucp_ep_h tmp_ep = wireup_ep->tmp_ep; + ucp_worker_h worker = tmp_ep->worker; + ucp_request_param_t param = ucp_request_null_param; + ucp_lane_index_t lane, found_lane; + uct_ep_h uct_ep; + void *req; + + ucs_assert(tmp_ep != ep); + + /* to prevent flush+destroy UCT EPs that are used by the main EP, + * they have to be remove from the TMP EP lanes and their WIREUP + * EPs have to be destroyed */ + for (lane = 0; lane < ucp_ep_num_lanes(tmp_ep); ++lane) { + if (tmp_ep->uct_eps[lane] != NULL) { + found_lane = ucp_wireup_ep_lane_used_by_another_ep(tmp_ep, ep, lane); + if (found_lane != UCP_NULL_LANE) { + uct_ep = tmp_ep->uct_eps[lane]; + ucs_assert(ucp_wireup_ep_test(uct_ep) && + !ucp_wireup_ep(uct_ep)->super.is_owner); + + ucs_debug("ep %p: destroy uct_ep[%d]=%p", ep, lane, uct_ep); + uct_ep_destroy(uct_ep); + tmp_ep->uct_eps[lane] = NULL; + } + } + } + + if (complete_cb != NULL) { + param.op_attr_mask |= UCP_OP_ATTR_FIELD_CALLBACK | + UCP_OP_ATTR_FIELD_USER_DATA; + param.cb.send = complete_cb; + param.user_data = wireup_ep; + } + + wireup_ep->tmp_ep = NULL; + + req = ucp_ep_flush_internal(tmp_ep, ep_flush_flags, 0, &param, NULL, + ucp_wireup_tmp_ep_flushed_cb, + "tmp_ep_flushed_cb"); + if (req != NULL) { + if (!UCS_PTR_IS_ERR(req)) { + ++worker->flush_ops_count; + return 0; + } +
ucs_error("ucp_ep_flush_internal() completed with error: %s", + ucs_status_string(UCS_PTR_STATUS(req))); + } + + ucp_ep_disconnected(tmp_ep, 1); + return 1; +} + /* * We switch the endpoint in this function (instead in wireup code) since * this is guaranteed to run from the main thread. @@ -50,10 +157,11 @@ ucp_wireup_ep_connect_to_ep(uct_ep_h uct_ep, const uct_device_addr_t *dev_addr, static unsigned ucp_wireup_ep_progress(void *arg) { ucp_wireup_ep_t *wireup_ep = arg; - ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; + ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; ucs_queue_head_t tmp_pending_queue; uct_pending_req_t *uct_req; ucp_request_t *req; + int ret; UCS_ASYNC_BLOCK(&ucp_ep->worker->async); @@ -75,6 +183,18 @@ static unsigned ucp_wireup_ep_progress(void *arg) goto out_unblock; } + if (wireup_ep->tmp_ep != NULL) { + ret = ucp_wireup_tmp_ep_destroy(ucp_ep, wireup_ep, UCT_FLUSH_FLAG_LOCAL, + ucp_wireup_tmp_ep_destroy_complete_cb); + if (!ret) { + wireup_ep->flags |= UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP; + goto out_unblock; + } + } else if (wireup_ep->flags & UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP) { + /* destroying of TMP EP is in progress, return from the function */ + goto out_unblock; + } + ucs_trace("ep %p: switching wireup_ep %p to ready state", ucp_ep, wireup_ep); /* Move wireup pending queue to temporary queue and remove references to @@ -386,12 +506,12 @@ static UCS_CLASS_CLEANUP_FUNC(ucp_wireup_ep_t) uct_ep_destroy(self->sockaddr_ep); } + UCS_ASYNC_BLOCK(&worker->async); if (self->tmp_ep != NULL) { ucs_assert(!(self->tmp_ep->flags & UCP_EP_FLAG_USED)); - ucp_ep_disconnected(self->tmp_ep, 1); + ucp_wireup_tmp_ep_destroy(ucp_ep, self, UCT_FLUSH_FLAG_CANCEL, NULL); } - UCS_ASYNC_BLOCK(&worker->async); --worker->flush_ops_count; UCS_ASYNC_UNBLOCK(&worker->async); } diff --git a/src/ucp/wireup/wireup_ep.h b/src/ucp/wireup/wireup_ep.h index efa3eb60f17..86c76cb0978 100644 --- a/src/ucp/wireup/wireup_ep.h +++ b/src/ucp/wireup/wireup_ep.h @@ -21,6 +21,7 @@ enum { 
UCP_WIREUP_EP_FLAG_READY = UCS_BIT(0), /**< next_ep is fully connected */ UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED = UCS_BIT(1), /**< Debug: next_ep connected to remote */ + UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP = UCS_BIT(2) /**< wireup ep is in progress to destroy TMP EP */ }; From 631b32c6ea627fa70fab84809359b7c7b3aa9f42 Mon Sep 17 00:00:00 2001 From: dmitrygx Date: Thu, 27 Aug 2020 07:13:54 +0000 Subject: [PATCH 2/8] UCP/WIREUP: Fix review comments --- src/ucp/wireup/wireup_ep.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/ucp/wireup/wireup_ep.c b/src/ucp/wireup/wireup_ep.c index 0f7f94aa074..8ba81948095 100644 --- a/src/ucp/wireup/wireup_ep.c +++ b/src/ucp/wireup/wireup_ep.c @@ -49,12 +49,11 @@ ucp_wireup_tmp_ep_destroy_complete_cb(void *request, ucs_status_t status, { ucp_wireup_ep_t *wireup_ep = (ucp_wireup_ep_t*)user_data; - if (wireup_ep == NULL) { - /* check for NULL pointer to workaround Coverity warning (it wrongly - * assumes that this callback could be called upon GET/PUT operation) */ - ucs_fatal("req=%p: user_data passed to the TMP EP destroy cb is NULL", - (ucp_request_t*)request - 1); - } + /* check for NULL pointer to workaround Coverity warning (it wrongly + * assumes that this callback could be called upon GET/PUT operation) */ + ucs_assertv_always(wireup_ep == NULL, + "req=%p: user_data passed to the TMP EP destroy cb " + "mustn't be NULL", (ucp_request_t*)request - 1); wireup_ep->flags &= ~UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP; ucs_assert(wireup_ep->tmp_ep == NULL); @@ -107,7 +106,7 @@ int ucp_wireup_tmp_ep_destroy(ucp_ep_h ep, ucp_wireup_ep_t *wireup_ep, ucs_assert(tmp_ep != ep); /* to prevent flush+destroy UCT EPs that are used by the main EP, - * they have to be remove from the TMP EP lanes and their WIREUP + * they have to be removed from the TMP EP lanes and their WIREUP * EPs have to be destroyed */ for (lane = 0; lane < ucp_ep_num_lanes(tmp_ep); ++lane) { if (tmp_ep->uct_eps[lane] != NULL) { From 
72190be12dc5c798eb415862774ea15dcd53e8b2 Mon Sep 17 00:00:00 2001 From: dmitrygx Date: Fri, 28 Aug 2020 12:06:20 +0000 Subject: [PATCH 3/8] UCP/WIREUP: Fix comments --- src/ucp/core/ucp_ep.c | 16 +++++++++------- src/ucp/core/ucp_ep.h | 4 +++- src/ucp/wireup/wireup.c | 20 +++++--------------- src/ucp/wireup/wireup.h | 7 ++++--- src/ucp/wireup/wireup_ep.c | 5 ++++- 5 files changed, 25 insertions(+), 27 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index cbb7595fee2..760d04efca9 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1016,13 +1016,15 @@ void ucp_ep_destroy(ucp_ep_h ep) int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2, - ucp_lane_index_t lane, int compare_types) + ucp_lane_index_t lane1, + ucp_lane_index_t lane2, + int compare_types) { - return (key1->lanes[lane].rsc_index == key2->lanes[lane].rsc_index) && - (key1->lanes[lane].proxy_lane == key2->lanes[lane].proxy_lane) && - (key1->lanes[lane].dst_md_index == key2->lanes[lane].dst_md_index) && - (key1->lanes[lane].path_index == key2->lanes[lane].path_index) && - ((key1->lanes[lane].lane_types == key2->lanes[lane].lane_types) || + return (key1->lanes[lane1].rsc_index == key2->lanes[lane2].rsc_index) && + (key1->lanes[lane1].proxy_lane == key2->lanes[lane2].proxy_lane) && + (key1->lanes[lane1].dst_md_index == key2->lanes[lane2].dst_md_index) && + (key1->lanes[lane1].path_index == key2->lanes[lane2].path_index) && + ((key1->lanes[lane1].lane_types == key2->lanes[lane2].lane_types) || !compare_types); } @@ -1052,7 +1054,7 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, } for (lane = 0; lane < key1->num_lanes; ++lane) { - if (!ucp_ep_config_lane_is_equal(key1, key2, lane, 1)) + if (!ucp_ep_config_lane_is_equal(key1, key2, lane, lane, 1)) { return 0; } diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index e04c33c5667..c39bc488b61 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ 
-538,7 +538,9 @@ void ucp_ep_config_cleanup(ucp_worker_h worker, ucp_ep_config_t *config); int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2, - ucp_lane_index_t lane, int compare_types); + ucp_lane_index_t lane1, + ucp_lane_index_t lane2, + int compare_types); int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2); diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index 5c818743dbd..040b9ca2b95 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -292,10 +292,10 @@ ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane, return UCS_ERR_UNREACHABLE; } -static ucp_lane_index_t -ucp_wireup_ep_lane_used_by_another_ep_config(ucp_ep_config_key_t *ep_config_key, - ucp_ep_config_key_t *another_ep_config_key, - ucp_lane_index_t lane) +ucp_lane_index_t +ucp_wireup_ep_configs_use_same_lane(ucp_ep_config_key_t *ep_config_key, + ucp_ep_config_key_t *another_ep_config_key, + ucp_lane_index_t lane) { ucp_lane_index_t another_lane; @@ -303,7 +303,7 @@ ucp_wireup_ep_lane_used_by_another_ep_config(ucp_ep_config_key_t *ep_config_key, ++another_lane) { if (ucp_ep_config_lane_is_equal(ep_config_key, another_ep_config_key, - lane, 0)) { + lane, another_lane, 0)) { return another_lane; } } @@ -311,16 +311,6 @@ ucp_wireup_ep_lane_used_by_another_ep_config(ucp_ep_config_key_t *ep_config_key, return UCP_NULL_LANE; } -ucp_lane_index_t ucp_wireup_ep_lane_used_by_another_ep(ucp_ep_h ep, - ucp_ep_h another_ep, - ucp_lane_index_t lane) -{ - return ucp_wireup_ep_lane_used_by_another_ep_config( - &ucp_ep_config(ep)->key, - &ucp_ep_config(another_ep)->key, - lane); -} - ucs_status_t ucp_wireup_connect_local(ucp_ep_h ep, const ucp_unpacked_address_t *remote_address, diff --git a/src/ucp/wireup/wireup.h b/src/ucp/wireup/wireup.h index 3bb52f0373e..e8a94a9d3d3 100644 --- a/src/ucp/wireup/wireup.h +++ b/src/ucp/wireup/wireup.h @@ -145,9 +145,10 @@ void 
ucp_wireup_remote_connected(ucp_ep_h ep); unsigned ucp_ep_init_flags(const ucp_worker_h worker, const ucp_ep_params_t *params); -ucp_lane_index_t ucp_wireup_ep_lane_used_by_another_ep(ucp_ep_h ep, - ucp_ep_h another_ep, - ucp_lane_index_t lane); +ucp_lane_index_t +ucp_wireup_ep_configs_use_same_lane(ucp_ep_config_key_t *ep_config_key, + ucp_ep_config_key_t *another_ep_config_key, + ucp_lane_index_t lane); ucs_status_t ucp_wireup_connect_local(ucp_ep_h ep, diff --git a/src/ucp/wireup/wireup_ep.c b/src/ucp/wireup/wireup_ep.c index 8ba81948095..ed83af37800 100644 --- a/src/ucp/wireup/wireup_ep.c +++ b/src/ucp/wireup/wireup_ep.c @@ -110,7 +110,10 @@ int ucp_wireup_tmp_ep_destroy(ucp_ep_h ep, ucp_wireup_ep_t *wireup_ep, * EPs have to be destroyed */ for (lane = 0; lane < ucp_ep_num_lanes(tmp_ep); ++lane) { if (tmp_ep->uct_eps[lane] != NULL) { - found_lane = ucp_wireup_ep_lane_used_by_another_ep(tmp_ep, ep, lane); + found_lane = + ucp_wireup_ep_configs_use_same_lane(&ucp_ep_config(tmp_ep)->key, + &ucp_ep_config(ep)->key, + lane); if (found_lane != UCP_NULL_LANE) { uct_ep = tmp_ep->uct_eps[lane]; ucs_assert(ucp_wireup_ep_test(uct_ep) && From cdd175c99ea40253fbb8c80ed95ced894f8196ab Mon Sep 17 00:00:00 2001 From: dmitrygx Date: Fri, 28 Aug 2020 15:37:33 +0000 Subject: [PATCH 4/8] UCP/WIREUP/CORE: Fix review comments --- src/ucp/core/ucp_ep.c | 20 ++++++++++++-------- src/ucp/core/ucp_ep.h | 2 +- src/ucp/wireup/wireup.c | 2 +- src/ucp/wireup/wireup_ep.c | 6 +++++- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 760d04efca9..12d641d9d7c 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1018,14 +1018,18 @@ int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2, ucp_lane_index_t lane1, ucp_lane_index_t lane2, - int compare_types) + int compare_tl_only) { - return (key1->lanes[lane1].rsc_index == key2->lanes[lane2].rsc_index) && - 
(key1->lanes[lane1].proxy_lane == key2->lanes[lane2].proxy_lane) && - (key1->lanes[lane1].dst_md_index == key2->lanes[lane2].dst_md_index) && - (key1->lanes[lane1].path_index == key2->lanes[lane2].path_index) && - ((key1->lanes[lane1].lane_types == key2->lanes[lane2].lane_types) || - !compare_types); + return compare_tl_only ? + /* compare only TL configuration indexes */ + ((key1->lanes[lane1].rsc_index == key2->lanes[lane2].rsc_index) && + (key1->lanes[lane1].dst_md_index == key2->lanes[lane2].dst_md_index) && + (key1->lanes[lane1].path_index == key2->lanes[lane2].path_index)) : + /* compare the whole TL lane configurations, i.e. it should be + * the same TLs and they should point to the same proxy lane and + * have the same lane types */ + !memcmp(&key1->lanes[lane1], &key2->lanes[lane2], + sizeof(*key1->lanes)); } int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, @@ -1054,7 +1058,7 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, } for (lane = 0; lane < key1->num_lanes; ++lane) { - if (!ucp_ep_config_lane_is_equal(key1, key2, lane, lane, 1)) + if (!ucp_ep_config_lane_is_equal(key1, key2, lane, lane, 0)) { return 0; } diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index c39bc488b61..b324e159f52 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -540,7 +540,7 @@ int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2, ucp_lane_index_t lane1, ucp_lane_index_t lane2, - int compare_types); + int compare_tl_only); int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2); diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index 040b9ca2b95..54c3fe4a74a 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -303,7 +303,7 @@ ucp_wireup_ep_configs_use_same_lane(ucp_ep_config_key_t *ep_config_key, ++another_lane) { if (ucp_ep_config_lane_is_equal(ep_config_key, another_ep_config_key, - lane, another_lane, 0)) { + lane, 
another_lane, 1)) { return another_lane; } } diff --git a/src/ucp/wireup/wireup_ep.c b/src/ucp/wireup/wireup_ep.c index ed83af37800..695a78e335e 100644 --- a/src/ucp/wireup/wireup_ep.c +++ b/src/ucp/wireup/wireup_ep.c @@ -51,7 +51,7 @@ ucp_wireup_tmp_ep_destroy_complete_cb(void *request, ucs_status_t status, /* check for NULL pointer to workaround Coverity warning (it wrongly * assumes that this callback could be called upon GET/PUT operation) */ - ucs_assertv_always(wireup_ep == NULL, + ucs_assertv_always(wireup_ep != NULL, "req=%p: user_data passed to the TMP EP destroy cb " "mustn't be NULL", (ucp_request_t*)request - 1); @@ -81,6 +81,10 @@ static void ucp_wireup_tmp_ep_flushed_cb(ucp_request_t *req) uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL; ucp_ep_h tmp_ep = req->send.ep; + /* schedule the destroying of TMP EP lanes to the main thread to + * not destroy UCT EP from the UCT EP flush callback, since + * UCT EP may be touched during the progress after calling UCT + * flush completion callback */ uct_worker_progress_register_safe(tmp_ep->worker->uct, ucp_wireup_tmp_ep_disconnect_progress, req, UCS_CALLBACKQ_FLAG_ONESHOT, &cb_id); From 4e1c6264e7bf92a845af4b5aef5ae676bf90cf80 Mon Sep 17 00:00:00 2001 From: dmitrygx Date: Mon, 31 Aug 2020 21:05:30 +0300 Subject: [PATCH 5/8] UCP/WIREUP: Use dst_dev_index instead of dst_md_index --- src/ucp/core/ucp_ep.c | 31 ++++++++++++++++--------------- src/ucp/core/ucp_ep.h | 38 +++++++++++++++++++------------------- src/ucp/wireup/select.c | 34 ++++++++++++++++++++++------------ src/ucp/wireup/wireup.c | 6 +++--- 4 files changed, 60 insertions(+), 49 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 12d641d9d7c..5f19bef1492 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1014,21 +1014,22 @@ void ucp_ep_destroy(ucp_ep_h ep) return; } -int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, - const ucp_ep_config_key_t *key2, - ucp_lane_index_t lane1, - ucp_lane_index_t
lane2, - int compare_tl_only) +int ucp_ep_config_lane_tl_is_equal(const ucp_ep_config_key_t *key1, + const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane1, + ucp_lane_index_t lane2) { - return compare_tl_only ? - /* compare only TL configuration indexes */ - ((key1->lanes[lane1].rsc_index == key2->lanes[lane2].rsc_index) && - (key1->lanes[lane1].dst_md_index == key2->lanes[lane2].dst_md_index) && - (key1->lanes[lane1].path_index == key2->lanes[lane2].path_index)) : - /* compare the whole TL lane configurations, i.e. it should be - * the same TLs and they should point to the same proxy lane and - * have the same lane types */ - !memcmp(&key1->lanes[lane1], &key2->lanes[lane2], + return (key1->lanes[lane1].rsc_index == key2->lanes[lane2].rsc_index) && + (key1->lanes[lane1].dst_dev_index == key2->lanes[lane2].dst_dev_index) && + (key1->lanes[lane1].path_index == key2->lanes[lane2].path_index); +} + +static int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, + const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane1, + ucp_lane_index_t lane2) +{ + return !memcmp(&key1->lanes[lane1], &key2->lanes[lane2], sizeof(*key1->lanes)); } @@ -1058,7 +1059,7 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, } for (lane = 0; lane < key1->num_lanes; ++lane) { - if (!ucp_ep_config_lane_is_equal(key1, key2, lane, lane, 0)) + if (!ucp_ep_config_lane_is_equal(key1, key2, lane, lane)) { return 0; } diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index b324e159f52..d8879e389ad 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -113,23 +113,24 @@ enum { */ struct ucp_ep_config_key { - ucp_lane_index_t num_lanes; /* Number of active lanes */ + ucp_lane_index_t num_lanes; /* Number of active lanes */ struct { - ucp_rsc_index_t rsc_index; /* Resource index */ - ucp_lane_index_t proxy_lane; /* UCP_NULL_LANE - no proxy - otherwise - in which lane the real - transport endpoint is stored */ - ucp_md_index_t dst_md_index; /* Destination 
memory domain index */ - uint8_t path_index; /* Device path index */ - ucp_lane_type_mask_t lane_types; /* Which types of operations this lane - was selected for */ + ucp_rsc_index_t rsc_index; /* Resource index */ + ucp_lane_index_t proxy_lane; /* UCP_NULL_LANE - no proxy + otherwise - in which lane the real + transport endpoint is stored */ + ucp_md_index_t dst_md_index; /* Destination memory domain index */ + ucp_rsc_index_t dst_dev_index; /* Destination device index */ + uint8_t path_index; /* Device path index */ + ucp_lane_type_mask_t lane_types; /* Which types of operations this lane + was selected for */ } lanes[UCP_MAX_LANES]; - ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */ - ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */ - ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */ - ucp_lane_index_t cm_lane; /* Lane for holding a CM connection (can be NULL) */ + ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */ + ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */ + ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */ + ucp_lane_index_t cm_lane; /* Lane for holding a CM connection (can be NULL) */ /* Lanes for remote memory access, sorted by priority, highest first */ ucp_lane_index_t rma_lanes[UCP_MAX_LANES]; @@ -450,7 +451,7 @@ struct ucp_wireup_sockaddr_data { uint8_t addr_mode; /**< The attached address format defined by UCP_WIREUP_SA_DATA_xx */ - uint8_t dev_index; /**< Device address index used to + ucp_rsc_index_t dev_index; /**< Device address index used to build remote address in UCP_WIREUP_SA_DATA_CM_ADDR mode */ @@ -536,11 +537,10 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config, void ucp_ep_config_cleanup(ucp_worker_h worker, ucp_ep_config_t *config); -int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, - const ucp_ep_config_key_t *key2, - ucp_lane_index_t lane1, - ucp_lane_index_t lane2, 
- int compare_tl_only); +int ucp_ep_config_lane_tl_is_equal(const ucp_ep_config_key_t *key1, + const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane1, + ucp_lane_index_t lane2); int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2); diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 66d6fdb31b3..9ad1ed90bcd 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -47,6 +47,7 @@ typedef struct { unsigned path_index; ucp_lane_index_t proxy_lane; ucp_md_index_t dst_md_index; + ucp_rsc_index_t dst_dev_index; ucp_lane_type_mask_t lane_types; double score[UCP_LANE_TYPE_LAST]; } ucp_wireup_lane_desc_t; @@ -471,6 +472,7 @@ static inline double ucp_wireup_tl_iface_latency(ucp_context_h context, static UCS_F_NOINLINE ucs_status_t ucp_wireup_add_lane_desc(const ucp_wireup_select_info_t *select_info, ucp_md_index_t dst_md_index, + ucp_rsc_index_t dst_dev_index, ucp_lane_type_t lane_type, int is_proxy, ucp_wireup_select_context_t *select_ctx) { @@ -490,9 +492,12 @@ ucp_wireup_add_lane_desc(const ucp_wireup_select_info_t *select_info, (lane_desc->path_index == select_info->path_index)) { lane = lane_desc - select_ctx->lane_descs; - ucs_assertv_always(dst_md_index == lane_desc->dst_md_index, + ucs_assertv_always(dst_dev_index == lane_desc->dst_dev_index, "lane[%d].dst_md_index=%d, dst_md_index=%d", lane, lane_desc->dst_md_index, dst_md_index); + ucs_assertv_always(dst_md_index == lane_desc->dst_md_index, + "lane[%d].dst_dev_index=%d, dst_dev_index=%d", + lane, lane_desc->dst_dev_index, dst_dev_index); ucs_assertv_always(!(lane_desc->lane_types & UCS_BIT(lane_type)), "lane[%d]=0x%x |= 0x%x", lane, lane_desc->lane_types, lane_type); @@ -532,12 +537,13 @@ ucp_wireup_add_lane_desc(const ucp_wireup_select_info_t *select_info, lane_desc = &select_ctx->lane_descs[select_ctx->num_lanes]; ++select_ctx->num_lanes; - lane_desc->rsc_index = select_info->rsc_index; - lane_desc->addr_index = select_info->addr_index; - 
lane_desc->path_index = select_info->path_index; - lane_desc->proxy_lane = proxy_lane; - lane_desc->dst_md_index = dst_md_index; - lane_desc->lane_types = UCS_BIT(lane_type); + lane_desc->rsc_index = select_info->rsc_index; + lane_desc->addr_index = select_info->addr_index; + lane_desc->path_index = select_info->path_index; + lane_desc->proxy_lane = proxy_lane; + lane_desc->dst_md_index = dst_md_index; + lane_desc->dst_dev_index = dst_dev_index; + lane_desc->lane_types = UCS_BIT(lane_type); for (lane_type_iter = 0; lane_type_iter < UCP_LANE_TYPE_LAST; ++lane_type_iter) { lane_desc->score[lane_type_iter] = 0.0; @@ -566,6 +572,7 @@ ucp_wireup_add_lane(const ucp_wireup_select_params_t *select_params, int is_proxy = 0; ucp_md_index_t dst_md_index; uint64_t remote_event_flags; + ucp_rsc_index_t dst_dev_index; if ((lane_type == UCP_LANE_TYPE_AM) || (lane_type == UCP_LANE_TYPE_AM_BW) || (lane_type == UCP_LANE_TYPE_TAG)) { @@ -580,10 +587,12 @@ ucp_wireup_add_lane(const ucp_wireup_select_params_t *select_params, remote_event_flags); } - dst_md_index = select_params->address->address_list - [select_info->addr_index].md_index; - return ucp_wireup_add_lane_desc(select_info, dst_md_index, lane_type, - is_proxy, select_ctx); + dst_md_index = select_params->address->address_list + [select_info->addr_index].md_index; + dst_dev_index = select_params->address->address_list + [select_info->addr_index].dev_index; + return ucp_wireup_add_lane_desc(select_info, dst_md_index, dst_dev_index, + lane_type, is_proxy, select_ctx); } static int ucp_wireup_compare_score(const void *elem1, const void *elem2, @@ -834,7 +843,8 @@ ucp_wireup_add_cm_lane(const ucp_wireup_select_params_t *select_params, /* server is not a proxy because it can create all lanes connected */ return ucp_wireup_add_lane_desc(&select_info, UCP_NULL_RESOURCE, - UCP_LANE_TYPE_CM, 0, select_ctx); + UCP_NULL_RESOURCE, UCP_LANE_TYPE_CM, + 0, select_ctx); } static ucs_status_t diff --git a/src/ucp/wireup/wireup.c 
b/src/ucp/wireup/wireup.c index 54c3fe4a74a..21d809e2482 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -301,9 +301,9 @@ ucp_wireup_ep_configs_use_same_lane(ucp_ep_config_key_t *ep_config_key, for (another_lane = 0; another_lane < another_ep_config_key->num_lanes; ++another_lane) { - if (ucp_ep_config_lane_is_equal(ep_config_key, - another_ep_config_key, - lane, another_lane, 1)) { + if (ucp_ep_config_lane_tl_is_equal(ep_config_key, + another_ep_config_key, + lane, another_lane)) { return another_lane; } } From 88b41c8efcaa29d3fc03b855d6adcca79ebf30cf Mon Sep 17 00:00:00 2001 From: dmitrygx Date: Tue, 1 Sep 2020 09:20:52 +0000 Subject: [PATCH 6/8] UCP/CORE/WIREUP: Fix review comments --- src/ucp/core/ucp_ep.c | 6 ++++-- src/ucp/wireup/select.c | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 5f19bef1492..b0f2c147d93 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1029,8 +1029,10 @@ static int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, ucp_lane_index_t lane1, ucp_lane_index_t lane2) { - return !memcmp(&key1->lanes[lane1], &key2->lanes[lane2], - sizeof(*key1->lanes)); + return ucp_ep_config_lane_tl_is_equal(key1, key2, lane1, lane2) && + (key1->lanes[lane1].proxy_lane == key2->lanes[lane2].proxy_lane) && + (key1->lanes[lane1].dst_md_index == key2->lanes[lane2].dst_md_index) && + (key1->lanes[lane1].lane_types == key2->lanes[lane2].lane_types); } int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 9ad1ed90bcd..6641a43efaf 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -492,10 +492,10 @@ ucp_wireup_add_lane_desc(const ucp_wireup_select_info_t *select_info, (lane_desc->path_index == select_info->path_index)) { lane = lane_desc - select_ctx->lane_descs; - ucs_assertv_always(dst_dev_index == lane_desc->dst_dev_index, + 
ucs_assertv_always(dst_md_index == lane_desc->dst_md_index, "lane[%d].dst_md_index=%d, dst_md_index=%d", lane, lane_desc->dst_md_index, dst_md_index); - ucs_assertv_always(dst_md_index == lane_desc->dst_md_index, + ucs_assertv_always(dst_dev_index == lane_desc->dst_dev_index, "lane[%d].dst_dev_index=%d, dst_dev_index=%d", lane, lane_desc->dst_dev_index, dst_dev_index); ucs_assertv_always(!(lane_desc->lane_types & UCS_BIT(lane_type)), From 9df23461eedb548deaf6b23b265530fda8fae2c6 Mon Sep 17 00:00:00 2001 From: dmitrygx Date: Tue, 1 Sep 2020 21:19:06 +0000 Subject: [PATCH 7/8] UCP/WIREUP/CORE: Fix review comments --- src/ucp/core/ucp_ep.c | 4 ++-- src/ucp/core/ucp_ep.h | 4 ++-- src/ucp/wireup/select.c | 19 ++++++++----------- src/ucp/wireup/wireup.c | 19 ++++++++----------- src/ucp/wireup/wireup.h | 6 +++--- src/ucp/wireup/wireup_ep.c | 13 +++++++++---- 6 files changed, 32 insertions(+), 33 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index b0f2c147d93..5abca4d9eb1 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1015,8 +1015,8 @@ void ucp_ep_destroy(ucp_ep_h ep) } int ucp_ep_config_lane_tl_is_equal(const ucp_ep_config_key_t *key1, - const ucp_ep_config_key_t *key2, ucp_lane_index_t lane1, + const ucp_ep_config_key_t *key2, ucp_lane_index_t lane2) { return (key1->lanes[lane1].rsc_index == key2->lanes[lane2].rsc_index) && @@ -1029,7 +1029,7 @@ static int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, ucp_lane_index_t lane1, ucp_lane_index_t lane2) { - return ucp_ep_config_lane_tl_is_equal(key1, key2, lane1, lane2) && + return ucp_ep_config_lane_tl_is_equal(key1, lane1, key2, lane2) && (key1->lanes[lane1].proxy_lane == key2->lanes[lane2].proxy_lane) && (key1->lanes[lane1].dst_md_index == key2->lanes[lane2].dst_md_index) && (key1->lanes[lane1].lane_types == key2->lanes[lane2].lane_types); diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index d8879e389ad..eb63bac12da 100644 --- a/src/ucp/core/ucp_ep.h 
+++ b/src/ucp/core/ucp_ep.h @@ -451,7 +451,7 @@ struct ucp_wireup_sockaddr_data { uint8_t addr_mode; /**< The attached address format defined by UCP_WIREUP_SA_DATA_xx */ - ucp_rsc_index_t dev_index; /**< Device address index used to + uint8_t dev_index; /**< Device address index used to build remote address in UCP_WIREUP_SA_DATA_CM_ADDR mode */ @@ -538,8 +538,8 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config, void ucp_ep_config_cleanup(ucp_worker_h worker, ucp_ep_config_t *config); int ucp_ep_config_lane_tl_is_equal(const ucp_ep_config_key_t *key1, - const ucp_ep_config_key_t *key2, ucp_lane_index_t lane1, + const ucp_ep_config_key_t *key2, ucp_lane_index_t lane2); int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 6641a43efaf..e88ca81f5d3 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -569,9 +569,10 @@ ucp_wireup_add_lane(const ucp_wireup_select_params_t *select_params, ucp_lane_type_t lane_type, ucp_wireup_select_context_t *select_ctx) { - int is_proxy = 0; + int is_proxy = 0; + ucp_address_entry_t *addr_entry = &select_params->address->address_list + [select_info->addr_index]; ucp_md_index_t dst_md_index; - uint64_t remote_event_flags; ucp_rsc_index_t dst_dev_index; if ((lane_type == UCP_LANE_TYPE_AM) || (lane_type == UCP_LANE_TYPE_AM_BW) || @@ -580,17 +581,13 @@ ucp_wireup_add_lane(const ucp_wireup_select_params_t *select_params, * deactivate its interface and wait for signaled active message to wake up. * Use a proxy lane which would send the first active message as signaled to * make sure the remote interface will indeed wake up. 
*/ - remote_event_flags = select_params->address->address_list - [select_info->addr_index].iface_attr.event_flags; - is_proxy = ucp_wireup_is_lane_proxy(select_params->ep->worker, - select_info->rsc_index, - remote_event_flags); + is_proxy = ucp_wireup_is_lane_proxy(select_params->ep->worker, + select_info->rsc_index, + addr_entry->iface_attr.event_flags); } - dst_md_index = select_params->address->address_list - [select_info->addr_index].md_index; - dst_dev_index = select_params->address->address_list - [select_info->addr_index].dev_index; + dst_md_index = addr_entry->md_index; + dst_dev_index = addr_entry->dev_index; return ucp_wireup_add_lane_desc(select_info, dst_md_index, dst_dev_index, lane_type, is_proxy, select_ctx); } diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index 21d809e2482..e1c35d7b939 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -293,18 +293,15 @@ ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane, } ucp_lane_index_t -ucp_wireup_ep_configs_use_same_lane(ucp_ep_config_key_t *ep_config_key, - ucp_ep_config_key_t *another_ep_config_key, - ucp_lane_index_t lane) +ucp_wireup_ep_configs_use_same_tl_lane(ucp_ep_config_key_t *key1, + ucp_ep_config_key_t *key2, + ucp_lane_index_t lane) { - ucp_lane_index_t another_lane; - - for (another_lane = 0; another_lane < another_ep_config_key->num_lanes; - ++another_lane) { - if (ucp_ep_config_lane_tl_is_equal(ep_config_key, - another_ep_config_key, - lane, another_lane)) { - return another_lane; + ucp_lane_index_t lane_idx; + + for (lane_idx = 0; lane_idx < key2->num_lanes; ++lane_idx) { + if (ucp_ep_config_lane_tl_is_equal(key1, lane, key2, lane_idx)) { + return lane_idx; } } diff --git a/src/ucp/wireup/wireup.h b/src/ucp/wireup/wireup.h index e8a94a9d3d3..79361ed8ca7 100644 --- a/src/ucp/wireup/wireup.h +++ b/src/ucp/wireup/wireup.h @@ -146,9 +146,9 @@ unsigned ucp_ep_init_flags(const ucp_worker_h worker, const ucp_ep_params_t *params); 
ucp_lane_index_t -ucp_wireup_ep_configs_use_same_lane(ucp_ep_config_key_t *ep_config_key, - ucp_ep_config_key_t *another_ep_config_key, - ucp_lane_index_t lane); +ucp_wireup_ep_configs_use_same_tl_lane(ucp_ep_config_key_t *key1, + ucp_ep_config_key_t *key2, + ucp_lane_index_t lane); ucs_status_t ucp_wireup_connect_local(ucp_ep_h ep, diff --git a/src/ucp/wireup/wireup_ep.c b/src/ucp/wireup/wireup_ep.c index 695a78e335e..db47e683f14 100644 --- a/src/ucp/wireup/wireup_ep.c +++ b/src/ucp/wireup/wireup_ep.c @@ -115,9 +115,9 @@ int ucp_wireup_tmp_ep_destroy(ucp_ep_h ep, ucp_wireup_ep_t *wireup_ep, for (lane = 0; lane < ucp_ep_num_lanes(tmp_ep); ++lane) { if (tmp_ep->uct_eps[lane] != NULL) { found_lane = - ucp_wireup_ep_configs_use_same_lane(&ucp_ep_config(tmp_ep)->key, - &ucp_ep_config(ep)->key, - lane); + ucp_wireup_ep_configs_use_same_tl_lane(&ucp_ep_config(tmp_ep)->key, + &ucp_ep_config(ep)->key, + lane); if (found_lane != UCP_NULL_LANE) { uct_ep = tmp_ep->uct_eps[lane]; ucs_assert(ucp_wireup_ep_test(uct_ep) && @@ -496,6 +496,7 @@ static UCS_CLASS_CLEANUP_FUNC(ucp_wireup_ep_t) { ucp_ep_h ucp_ep = self->super.ucp_ep; ucp_worker_h worker = ucp_ep->worker; + int UCS_V_UNUSED ret; ucs_assert(ucs_queue_is_empty(&self->pending_q)); ucs_assert(self->pending_count == 0); @@ -515,7 +516,11 @@ static UCS_CLASS_CLEANUP_FUNC(ucp_wireup_ep_t) UCS_ASYNC_BLOCK(&worker->async); if (self->tmp_ep != NULL) { ucs_assert(!(self->tmp_ep->flags & UCP_EP_FLAG_USED)); - ucp_wireup_tmp_ep_destroy(ucp_ep, self, UCT_FLUSH_FLAG_CANCEL, NULL); + /* TODO: replace by ucp_worker_discard_uct_ep() with FLUSH_CANCEL flag, + * since the TMP EP will no longer exist once its destruction + * completes */ + ret = ucp_wireup_tmp_ep_destroy(ucp_ep, self, UCT_FLUSH_FLAG_CANCEL, NULL); + ucs_assert(ret == 1); } --worker->flush_ops_count; From b94d40610df46749ec964e6a673bafdbdd3feed4 Mon Sep 17 00:00:00 2001 From: dmitrygx Date: Wed, 2 Sep 2020 11:28:56 +0000 Subject: [PATCH 8/8] PREPARE ---
src/ucp/core/ucp_ep.c | 21 ++++++++++----------- src/ucp/core/ucp_ep.h | 8 ++++---- src/ucp/wireup/wireup.c | 8 ++++---- src/ucp/wireup/wireup.h | 6 +++--- src/ucp/wireup/wireup_cm.c | 5 +++++ src/ucp/wireup/wireup_ep.c | 6 +++--- 6 files changed, 29 insertions(+), 25 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 79e327ef34e..7b8af93392a 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1014,10 +1014,10 @@ void ucp_ep_destroy(ucp_ep_h ep) return; } -int ucp_ep_config_lane_tl_is_equal(const ucp_ep_config_key_t *key1, - ucp_lane_index_t lane1, - const ucp_ep_config_key_t *key2, - ucp_lane_index_t lane2) +int ucp_ep_config_lane_is_same_peer(const ucp_ep_config_key_t *key1, + ucp_lane_index_t lane1, + const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane2) { return (key1->lanes[lane1].rsc_index == key2->lanes[lane2].rsc_index) && (key1->lanes[lane1].dst_dev_index == key2->lanes[lane2].dst_dev_index) && @@ -1026,13 +1026,12 @@ int ucp_ep_config_lane_tl_is_equal(const ucp_ep_config_key_t *key1, static int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2, - ucp_lane_index_t lane1, - ucp_lane_index_t lane2) + ucp_lane_index_t lane) { - return ucp_ep_config_lane_tl_is_equal(key1, lane1, key2, lane2) && - (key1->lanes[lane1].proxy_lane == key2->lanes[lane2].proxy_lane) && - (key1->lanes[lane1].dst_md_index == key2->lanes[lane2].dst_md_index) && - (key1->lanes[lane1].lane_types == key2->lanes[lane2].lane_types); + return ucp_ep_config_lane_is_same_peer(key1, lane, key2, lane) && + (key1->lanes[lane].proxy_lane == key2->lanes[lane].proxy_lane) && + (key1->lanes[lane].dst_md_index == key2->lanes[lane].dst_md_index) && + (key1->lanes[lane].lane_types == key2->lanes[lane].lane_types); } int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, @@ -1061,7 +1060,7 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, } for (lane = 0; lane < key1->num_lanes; ++lane) { - if 
(!ucp_ep_config_lane_is_equal(key1, key2, lane, lane)) + if (!ucp_ep_config_lane_is_equal(key1, key2, lane)) { return 0; } diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index 67f02d1e689..410537fb7ac 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -538,10 +538,10 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config, void ucp_ep_config_cleanup(ucp_worker_h worker, ucp_ep_config_t *config); -int ucp_ep_config_lane_tl_is_equal(const ucp_ep_config_key_t *key1, - ucp_lane_index_t lane1, - const ucp_ep_config_key_t *key2, - ucp_lane_index_t lane2); +int ucp_ep_config_lane_is_same_peer(const ucp_ep_config_key_t *key1, + ucp_lane_index_t lane1, + const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane2); int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2); diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index fc26b36799b..c0f961b5aed 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -293,14 +293,14 @@ ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane, } ucp_lane_index_t -ucp_wireup_ep_configs_use_same_tl_lane(ucp_ep_config_key_t *key1, - ucp_ep_config_key_t *key2, - ucp_lane_index_t lane) +ucp_wireup_ep_configs_can_reuse_lane(ucp_ep_config_key_t *key1, + ucp_ep_config_key_t *key2, + ucp_lane_index_t lane) { ucp_lane_index_t lane_idx; for (lane_idx = 0; lane_idx < key2->num_lanes; ++lane_idx) { - if (ucp_ep_config_lane_tl_is_equal(key1, lane, key2, lane_idx)) { + if (ucp_ep_config_lane_is_same_peer(key1, lane, key2, lane_idx)) { return lane_idx; } } diff --git a/src/ucp/wireup/wireup.h b/src/ucp/wireup/wireup.h index c4c02daae17..0eeaa4971e3 100644 --- a/src/ucp/wireup/wireup.h +++ b/src/ucp/wireup/wireup.h @@ -146,9 +146,9 @@ unsigned ucp_ep_init_flags(const ucp_worker_h worker, const ucp_ep_params_t *params); ucp_lane_index_t -ucp_wireup_ep_configs_use_same_tl_lane(ucp_ep_config_key_t *key1, - ucp_ep_config_key_t 
*key2, - ucp_lane_index_t lane); +ucp_wireup_ep_configs_can_reuse_lane(ucp_ep_config_key_t *key1, + ucp_ep_config_key_t *key2, + ucp_lane_index_t lane); ucs_status_t ucp_wireup_connect_local(ucp_ep_h ep, diff --git a/src/ucp/wireup/wireup_cm.c b/src/ucp/wireup/wireup_cm.c index fdda747d5a3..2e17f776cd3 100644 --- a/src/ucp/wireup/wireup_cm.c +++ b/src/ucp/wireup/wireup_cm.c @@ -198,6 +198,11 @@ static void uct_wireup_cm_tmp_ep_cleanup(ucp_ep_h tmp_ep, ucs_queue_head_t *queu continue; } + /* to prevent flush+destroy of UCT EPs that are used by the main EP, + * they have to be removed from the TMP EP lanes and their WIREUP + * EPs have to be destroyed */ + + /* transfer the pending queues content from the previous tmp_ep to * a temporary queue */ uct_ep_pending_purge(tmp_ep->uct_eps[lane_idx], diff --git a/src/ucp/wireup/wireup_ep.c b/src/ucp/wireup/wireup_ep.c index 3cd8913e166..d674b749623 100644 --- a/src/ucp/wireup/wireup_ep.c +++ b/src/ucp/wireup/wireup_ep.c @@ -116,9 +116,9 @@ int ucp_wireup_tmp_ep_destroy(ucp_ep_h ep, ucp_wireup_ep_t *wireup_ep, for (lane = 0; lane < ucp_ep_num_lanes(tmp_ep); ++lane) { if (tmp_ep->uct_eps[lane] != NULL) { found_lane = - ucp_wireup_ep_configs_use_same_tl_lane(&ucp_ep_config(tmp_ep)->key, - &ucp_ep_config(ep)->key, - lane); + ucp_wireup_ep_configs_can_reuse_lane(&ucp_ep_config(tmp_ep)->key, + &ucp_ep_config(ep)->key, + lane); if (found_lane != UCP_NULL_LANE) { uct_ep = tmp_ep->uct_eps[lane]; ucs_assert(ucp_wireup_ep_test(uct_ep) &&