diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 4bf61b9cad9..7b8af93392a 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1014,16 +1014,24 @@ void ucp_ep_destroy(ucp_ep_h ep) return; } -int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, - const ucp_ep_config_key_t *key2, - ucp_lane_index_t lane, int compare_types) +int ucp_ep_config_lane_is_same_peer(const ucp_ep_config_key_t *key1, + ucp_lane_index_t lane1, + const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane2) { - return (key1->lanes[lane].rsc_index == key2->lanes[lane].rsc_index) && + return (key1->lanes[lane1].rsc_index == key2->lanes[lane2].rsc_index) && + (key1->lanes[lane1].dst_dev_index == key2->lanes[lane2].dst_dev_index) && + (key1->lanes[lane1].path_index == key2->lanes[lane2].path_index); +} + +static int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, + const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane) +{ + return ucp_ep_config_lane_is_same_peer(key1, lane, key2, lane) && (key1->lanes[lane].proxy_lane == key2->lanes[lane].proxy_lane) && (key1->lanes[lane].dst_md_index == key2->lanes[lane].dst_md_index) && - (key1->lanes[lane].path_index == key2->lanes[lane].path_index) && - ((key1->lanes[lane].lane_types == key2->lanes[lane].lane_types) || - !compare_types); + (key1->lanes[lane].lane_types == key2->lanes[lane].lane_types); } int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, @@ -1052,7 +1060,7 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, } for (lane = 0; lane < key1->num_lanes; ++lane) { - if (!ucp_ep_config_lane_is_equal(key1, key2, lane, 1)) + if (!ucp_ep_config_lane_is_equal(key1, key2, lane)) { return 0; } diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index b5938063eba..410537fb7ac 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -113,23 +113,24 @@ enum { */ struct ucp_ep_config_key { - ucp_lane_index_t num_lanes; /* Number of active lanes */ + ucp_lane_index_t 
num_lanes; /* Number of active lanes */ struct { - ucp_rsc_index_t rsc_index; /* Resource index */ - ucp_lane_index_t proxy_lane; /* UCP_NULL_LANE - no proxy - otherwise - in which lane the real - transport endpoint is stored */ - ucp_md_index_t dst_md_index; /* Destination memory domain index */ - uint8_t path_index; /* Device path index */ - ucp_lane_type_mask_t lane_types; /* Which types of operations this lane - was selected for */ + ucp_rsc_index_t rsc_index; /* Resource index */ + ucp_lane_index_t proxy_lane; /* UCP_NULL_LANE - no proxy + otherwise - in which lane the real + transport endpoint is stored */ + ucp_md_index_t dst_md_index; /* Destination memory domain index */ + ucp_rsc_index_t dst_dev_index; /* Destination device index */ + uint8_t path_index; /* Device path index */ + ucp_lane_type_mask_t lane_types; /* Which types of operations this lane + was selected for */ } lanes[UCP_MAX_LANES]; - ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */ - ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */ - ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */ - ucp_lane_index_t cm_lane; /* Lane for holding a CM connection (can be NULL) */ + ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */ + ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */ + ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */ + ucp_lane_index_t cm_lane; /* Lane for holding a CM connection (can be NULL) */ /* Lanes for remote memory access, sorted by priority, highest first */ ucp_lane_index_t rma_lanes[UCP_MAX_LANES]; @@ -537,9 +538,10 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config, void ucp_ep_config_cleanup(ucp_worker_h worker, ucp_ep_config_t *config); -int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1, - const ucp_ep_config_key_t *key2, - ucp_lane_index_t lane, int compare_types); +int ucp_ep_config_lane_is_same_peer(const 
ucp_ep_config_key_t *key1, + ucp_lane_index_t lane1, + const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane2); int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1, const ucp_ep_config_key_t *key2); diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index ce193d1a058..c61ca86ac51 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -47,6 +47,7 @@ typedef struct { unsigned path_index; ucp_lane_index_t proxy_lane; ucp_md_index_t dst_md_index; + ucp_rsc_index_t dst_dev_index; ucp_lane_type_mask_t lane_types; double score[UCP_LANE_TYPE_LAST]; } ucp_wireup_lane_desc_t; @@ -471,6 +472,7 @@ static inline double ucp_wireup_tl_iface_latency(ucp_context_h context, static UCS_F_NOINLINE ucs_status_t ucp_wireup_add_lane_desc(const ucp_wireup_select_info_t *select_info, ucp_md_index_t dst_md_index, + ucp_rsc_index_t dst_dev_index, ucp_lane_type_t lane_type, int is_proxy, ucp_wireup_select_context_t *select_ctx) { @@ -493,6 +495,9 @@ ucp_wireup_add_lane_desc(const ucp_wireup_select_info_t *select_info, ucs_assertv_always(dst_md_index == lane_desc->dst_md_index, "lane[%d].dst_md_index=%d, dst_md_index=%d", lane, lane_desc->dst_md_index, dst_md_index); + ucs_assertv_always(dst_dev_index == lane_desc->dst_dev_index, + "lane[%d].dst_dev_index=%d, dst_dev_index=%d", + lane, lane_desc->dst_dev_index, dst_dev_index); ucs_assertv_always(!(lane_desc->lane_types & UCS_BIT(lane_type)), "lane[%d]=0x%x |= 0x%x", lane, lane_desc->lane_types, lane_type); @@ -532,12 +537,13 @@ ucp_wireup_add_lane_desc(const ucp_wireup_select_info_t *select_info, lane_desc = &select_ctx->lane_descs[select_ctx->num_lanes]; ++select_ctx->num_lanes; - lane_desc->rsc_index = select_info->rsc_index; - lane_desc->addr_index = select_info->addr_index; - lane_desc->path_index = select_info->path_index; - lane_desc->proxy_lane = proxy_lane; - lane_desc->dst_md_index = dst_md_index; - lane_desc->lane_types = UCS_BIT(lane_type); + lane_desc->rsc_index = select_info->rsc_index; + 
lane_desc->addr_index = select_info->addr_index; + lane_desc->path_index = select_info->path_index; + lane_desc->proxy_lane = proxy_lane; + lane_desc->dst_md_index = dst_md_index; + lane_desc->dst_dev_index = dst_dev_index; + lane_desc->lane_types = UCS_BIT(lane_type); for (lane_type_iter = 0; lane_type_iter < UCP_LANE_TYPE_LAST; ++lane_type_iter) { lane_desc->score[lane_type_iter] = 0.0; @@ -563,9 +569,11 @@ ucp_wireup_add_lane(const ucp_wireup_select_params_t *select_params, ucp_lane_type_t lane_type, ucp_wireup_select_context_t *select_ctx) { - int is_proxy = 0; + int is_proxy = 0; + ucp_address_entry_t *addr_entry = &select_params->address->address_list + [select_info->addr_index]; ucp_md_index_t dst_md_index; - uint64_t remote_event_flags; + ucp_rsc_index_t dst_dev_index; if ((lane_type == UCP_LANE_TYPE_AM) || (lane_type == UCP_LANE_TYPE_AM_BW) || (lane_type == UCP_LANE_TYPE_TAG)) { @@ -573,17 +581,15 @@ ucp_wireup_add_lane(const ucp_wireup_select_params_t *select_params, * deactivate its interface and wait for signaled active message to wake up. * Use a proxy lane which would send the first active message as signaled to * make sure the remote interface will indeed wake up. 
*/ - remote_event_flags = select_params->address->address_list - [select_info->addr_index].iface_attr.event_flags; - is_proxy = ucp_wireup_is_lane_proxy(select_params->ep->worker, - select_info->rsc_index, - remote_event_flags); + is_proxy = ucp_wireup_is_lane_proxy(select_params->ep->worker, + select_info->rsc_index, + addr_entry->iface_attr.event_flags); } - dst_md_index = select_params->address->address_list - [select_info->addr_index].md_index; - return ucp_wireup_add_lane_desc(select_info, dst_md_index, lane_type, - is_proxy, select_ctx); + dst_md_index = addr_entry->md_index; + dst_dev_index = addr_entry->dev_index; + return ucp_wireup_add_lane_desc(select_info, dst_md_index, dst_dev_index, + lane_type, is_proxy, select_ctx); } static int ucp_wireup_compare_score(const void *elem1, const void *elem2, @@ -833,7 +839,8 @@ ucp_wireup_add_cm_lane(const ucp_wireup_select_params_t *select_params, /* server is not a proxy because it can create all lanes connected */ return ucp_wireup_add_lane_desc(&select_info, UCP_NULL_RESOURCE, - UCP_LANE_TYPE_CM, 0, select_ctx); + UCP_NULL_RESOURCE, UCP_LANE_TYPE_CM, + 0, select_ctx); } static ucs_status_t diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index db27bc74ffa..c0f961b5aed 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -271,9 +271,9 @@ ucp_wireup_match_p2p_lanes(ucp_ep_h ep, static ucs_status_t ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane, - const ucp_unpacked_address_t *remote_address, - const uct_ep_addr_t **ep_addr_p, - const uct_device_addr_t **dev_addr_p) + const ucp_unpacked_address_t *remote_address, + const uct_ep_addr_t **ep_addr_p, + const uct_device_addr_t **dev_addr_p) { const ucp_address_entry_t *address; unsigned ep_addr_index; @@ -292,6 +292,22 @@ ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane, return UCS_ERR_UNREACHABLE; } +ucp_lane_index_t +ucp_wireup_ep_configs_can_reuse_lane(const ucp_ep_config_key_t *key1, + 
const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane) +{ + ucp_lane_index_t lane_idx; + + for (lane_idx = 0; lane_idx < key2->num_lanes; ++lane_idx) { + if (ucp_ep_config_lane_is_same_peer(key1, lane, key2, lane_idx)) { + return lane_idx; + } + } + + return UCP_NULL_LANE; +} + ucs_status_t ucp_wireup_connect_local(ucp_ep_h ep, const ucp_unpacked_address_t *remote_address, diff --git a/src/ucp/wireup/wireup.h b/src/ucp/wireup/wireup.h index 5333aa77d1a..0eeaa4971e3 100644 --- a/src/ucp/wireup/wireup.h +++ b/src/ucp/wireup/wireup.h @@ -145,6 +145,11 @@ void ucp_wireup_remote_connected(ucp_ep_h ep); unsigned ucp_ep_init_flags(const ucp_worker_h worker, const ucp_ep_params_t *params); +ucp_lane_index_t +ucp_wireup_ep_configs_can_reuse_lane(const ucp_ep_config_key_t *key1, + const ucp_ep_config_key_t *key2, + ucp_lane_index_t lane); + ucs_status_t ucp_wireup_connect_local(ucp_ep_h ep, const ucp_unpacked_address_t *remote_address, diff --git a/src/ucp/wireup/wireup_cm.c b/src/ucp/wireup/wireup_cm.c index fdda747d5a3..2e17f776cd3 100644 --- a/src/ucp/wireup/wireup_cm.c +++ b/src/ucp/wireup/wireup_cm.c @@ -198,6 +198,11 @@ static void uct_wireup_cm_tmp_ep_cleanup(ucp_ep_h tmp_ep, ucs_queue_head_t *queu continue; } + /* to prevent flush+destroy UCT EPs that are used by the main EP, + * they have to be removed from the TMP EP lanes and their WIREUP + * EPs have to be destroyed */ + + /* transfer the pending queues content from the previous tmp_ep to * a temporary queue */ uct_ep_pending_purge(tmp_ep->uct_eps[lane_idx], diff --git a/src/ucp/wireup/wireup_ep.c b/src/ucp/wireup/wireup_ep.c index 53c91469e72..d674b749623 100644 --- a/src/ucp/wireup/wireup_ep.c +++ b/src/ucp/wireup/wireup_ep.c @@ -44,6 +44,119 @@ ucp_wireup_ep_connect_to_ep(uct_ep_h uct_ep, const uct_device_addr_t *dev_addr, return uct_ep_connect_to_ep(wireup_ep->super.uct_ep, dev_addr, ep_addr); } +static void +ucp_wireup_tmp_ep_destroy_complete_cb(void *request, ucs_status_t status, + void *user_data) +{ + 
ucp_wireup_ep_t 
*wireup_ep = (ucp_wireup_ep_t*)user_data; + + /* check for NULL pointer to workaround Coverity warning (it wrongly + * assumes that this callback could be called upon GET/PUT operation) */ + ucs_assertv_always(wireup_ep != NULL, + "req=%p: user_data passed to the TMP EP destroy cb " + "mustn't be NULL", (ucp_request_t*)request - 1); + + wireup_ep->flags &= ~UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP; + ucs_assert(wireup_ep->tmp_ep == NULL); + ucp_request_release(request); +} + +static unsigned ucp_wireup_tmp_ep_disconnect_progress(void *arg) +{ + ucp_request_t *req = (ucp_request_t*)arg; + ucp_ep_h tmp_ep = req->send.ep; + ucp_worker_h worker = tmp_ep->worker; + ucs_async_context_t *async = &worker->async; + + UCS_ASYNC_BLOCK(async); + ucp_ep_disconnected(tmp_ep, 1); + --worker->flush_ops_count; + ucp_request_complete_send(req, req->status); + UCS_ASYNC_UNBLOCK(async); + + return 1; +} + +static void ucp_wireup_tmp_ep_flushed_cb(ucp_request_t *req) +{ + uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL; + ucp_ep_h tmp_ep = req->send.ep; + + /* schedule the destroying of TMP EP lanes to the main thread to + * not destroy UCT EP from the UCT EP flush callback, since + * UCT EP may be touched during the progress after calling UCT + * flush completion callback */ + uct_worker_progress_register_safe(tmp_ep->worker->uct, + ucp_wireup_tmp_ep_disconnect_progress, + req, UCS_CALLBACKQ_FLAG_ONESHOT, &cb_id); +} + +/* the following values could be returned from the function: + * - true: destroying of the TMP EP was completed inplace, if the complete_cb + * was specified, it wouldn't be called + * - false: destroying of the TMP EP is in progress now, if the complete_cb + * was specified, it would be called upon completion of the destroying + * of the TMP EP */ +int ucp_wireup_tmp_ep_destroy(ucp_ep_h ep, ucp_wireup_ep_t *wireup_ep, + unsigned ep_flush_flags, + ucp_send_nbx_callback_t complete_cb) +{ + ucp_ep_h tmp_ep = wireup_ep->tmp_ep; + ucp_worker_h worker = tmp_ep->worker; + 
ucp_request_param_t param = ucp_request_null_param; + ucp_lane_index_t lane, found_lane; + uct_ep_h uct_ep; + void *req; + + ucs_assert(tmp_ep != ep); + + /* to prevent flush+destroy UCT EPs that are used by the main EP, + * they have to be removed from the TMP EP lanes and their WIREUP + * EPs have to be destroyed */ + for (lane = 0; lane < ucp_ep_num_lanes(tmp_ep); ++lane) { + if (tmp_ep->uct_eps[lane] != NULL) { + found_lane = + ucp_wireup_ep_configs_can_reuse_lane(&ucp_ep_config(tmp_ep)->key, + &ucp_ep_config(ep)->key, + lane); + if (found_lane != UCP_NULL_LANE) { + uct_ep = tmp_ep->uct_eps[lane]; + ucs_assert(ucp_wireup_ep_test(uct_ep) && + !ucp_wireup_ep(uct_ep)->super.is_owner); + + ucs_debug("ep %p: destroy uct_ep[%d]=%p", ep, lane, uct_ep); + uct_ep_destroy(uct_ep); + tmp_ep->uct_eps[lane] = NULL; + } + } + } + + if (complete_cb != NULL) { + param.op_attr_mask |= UCP_OP_ATTR_FIELD_CALLBACK | + UCP_OP_ATTR_FIELD_USER_DATA; + param.cb.send = complete_cb; + param.user_data = wireup_ep; + } + + wireup_ep->tmp_ep = NULL; + + req = ucp_ep_flush_internal(tmp_ep, ep_flush_flags, 0, &param, NULL, + ucp_wireup_tmp_ep_flushed_cb, + "tmp_ep_flushed_cb"); + if (req != NULL) { + if (!UCS_PTR_IS_ERR(req)) { + ++worker->flush_ops_count; + return 0; + } + + ucs_error("ucp_ep_flush_internal() completed with error: %s", + ucs_status_string(UCS_PTR_STATUS(req))); + } + + ucp_ep_disconnected(tmp_ep, 1); + return 1; +} + void ucp_wireup_ep_replay_pending_requests(ucp_ep_h ucp_ep, ucs_queue_head_t *tmp_pending_queue) { @@ -66,9 +179,10 @@ void ucp_wireup_ep_replay_pending_requests(ucp_ep_h ucp_ep, static unsigned ucp_wireup_ep_progress(void *arg) { ucp_wireup_ep_t *wireup_ep = arg; - ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; + ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; ucs_queue_head_t tmp_pending_queue; uct_pending_req_t *uct_req; + int ret; UCS_ASYNC_BLOCK(&ucp_ep->worker->async); @@ -90,6 +204,18 @@ static unsigned ucp_wireup_ep_progress(void *arg) goto out_unblock; } + if 
(wireup_ep->tmp_ep != NULL) { + ret = ucp_wireup_tmp_ep_destroy(ucp_ep, wireup_ep, UCT_FLUSH_FLAG_LOCAL, + ucp_wireup_tmp_ep_destroy_complete_cb); + if (!ret) { + wireup_ep->flags |= UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP; + goto out_unblock; + } + } else if (wireup_ep->flags & UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP) { + /* destroying of TMP EP is in progress, return from the function */ + goto out_unblock; + } + ucs_trace("ep %p: switching wireup_ep %p to ready state", ucp_ep, wireup_ep); /* Move wireup pending queue to temporary queue and remove references to @@ -381,6 +507,7 @@ static UCS_CLASS_CLEANUP_FUNC(ucp_wireup_ep_t) { ucp_ep_h ucp_ep = self->super.ucp_ep; ucp_worker_h worker = ucp_ep->worker; + int UCS_V_UNUSED ret; ucs_assert(ucs_queue_is_empty(&self->pending_q)); ucs_assert(self->pending_count == 0); @@ -397,12 +524,16 @@ static UCS_CLASS_CLEANUP_FUNC(ucp_wireup_ep_t) uct_ep_destroy(self->sockaddr_ep); } + UCS_ASYNC_BLOCK(&worker->async); if (self->tmp_ep != NULL) { ucs_assert(!(self->tmp_ep->flags & UCP_EP_FLAG_USED)); - ucp_ep_disconnected(self->tmp_ep, 1); + /* TODO: replace by ucp_worker_discard_uct_ep() with FLUSH_CANCEL flag, + * since the TMP EP will never exist after completing the destroying + * of the TMP EP */ + ret = ucp_wireup_tmp_ep_destroy(ucp_ep, self, UCT_FLUSH_FLAG_CANCEL, NULL); + ucs_assert(ret == 1); } - UCS_ASYNC_BLOCK(&worker->async); --worker->flush_ops_count; UCS_ASYNC_UNBLOCK(&worker->async); } diff --git a/src/ucp/wireup/wireup_ep.h b/src/ucp/wireup/wireup_ep.h index b2b309b4c19..e6a7c0208c6 100644 --- a/src/ucp/wireup/wireup_ep.h +++ b/src/ucp/wireup/wireup_ep.h @@ -21,6 +21,7 @@ enum { UCP_WIREUP_EP_FLAG_READY = UCS_BIT(0), /**< next_ep is fully connected */ UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED = UCS_BIT(1), /**< Debug: next_ep connected to remote */ + UCP_WIREUP_EP_FLAG_DESTROY_TMP_EP = UCS_BIT(2) /**< wireup ep is in progress to destroy TMP EP */ };