diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index 8beb09fbafe..71b32e4eb83 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -211,6 +211,15 @@ struct ucp_request { uct_worker_cb_id_t prog_id;/* Slow-path callback */ } disconnect; + struct { + ucp_worker_h ucp_worker; /* UCP worker where a discard UCT EP + * operation submitted on */ + uct_ep_h uct_ep; /* UCT EP that should be flushed and + destroyed */ + unsigned ep_flush_flags; /* Flags that should be passed into + @ref uct_ep_flush */ + } discard_uct_ep; + struct { uint64_t remote_addr; /* Remote address */ ucp_rkey_h rkey; /* Remote memory key */ diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 49d1f8a5941..523640438e2 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -102,6 +102,14 @@ ucs_mpool_ops_t ucp_frag_mpool_ops = { }; +#define ucp_worker_discard_uct_ep_hash_key(_uct_ep) \ + kh_int64_hash_func((uintptr_t)(_uct_ep)) + + +KHASH_IMPL(ucp_worker_discard_uct_ep_hash, uct_ep_h, char, 0, + ucp_worker_discard_uct_ep_hash_key, kh_int64_hash_equal); + + static ucs_status_t ucp_worker_wakeup_ctl_fd(ucp_worker_h worker, ucp_worker_event_fd_op_t op, int event_fd) @@ -439,14 +447,18 @@ static unsigned ucp_worker_iface_err_handle_progress(void *arg) /* Purge pending queue */ ucs_trace("ep %p: purge pending on uct_ep[%d]=%p", ucp_ep, lane, ucp_ep->uct_eps[lane]); - uct_ep_pending_purge(ucp_ep->uct_eps[lane], ucp_ep_err_pending_purge, - UCS_STATUS_PTR(status)); if (lane != failed_lane) { ucs_trace("ep %p: destroy uct_ep[%d]=%p", ucp_ep, lane, ucp_ep->uct_eps[lane]); - uct_ep_destroy(ucp_ep->uct_eps[lane]); + ucp_worker_discard_uct_ep(ucp_ep->worker, ucp_ep->uct_eps[lane], + UCT_FLUSH_FLAG_CANCEL, + ucp_ep_err_pending_purge, + UCS_STATUS_PTR(status)); ucp_ep->uct_eps[lane] = NULL; + } else { + uct_ep_pending_purge(ucp_ep->uct_eps[lane], ucp_ep_err_pending_purge, + UCS_STATUS_PTR(status)); } } @@ -616,17 +628,27 @@ ucs_status_t ucp_worker_set_ep_failed(ucp_worker_h worker, ucp_ep_h ucp_ep, static ucs_status_t ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status) { - ucp_worker_h worker = (ucp_worker_h)arg; + ucp_worker_h worker = (ucp_worker_h)arg; ucp_lane_index_t lane; ucs_status_t ret_status; ucp_ep_ext_gen_t *ep_ext; ucp_ep_h ucp_ep; + khiter_t iter; UCS_ASYNC_BLOCK(&worker->async); - ucs_debug("worker %p: error handler called for uct_ep %p: %s", + ucs_debug("worker %p: error handler called for UCT EP %p: %s", worker, uct_ep, ucs_status_string(status)); + iter = kh_get(ucp_worker_discard_uct_ep_hash, + &worker->discard_uct_ep_hash, uct_ep); + if (iter != kh_end(&worker->discard_uct_ep_hash)) { + ucs_debug("UCT EP %p is being discarded on UCP Worker %p", + uct_ep, worker); + ret_status = UCS_OK; + goto out; + } + /* TODO: need to optimize uct_ep -> ucp_ep lookup */ ucs_list_for_each(ep_ext, &worker->all_eps, ep_list) { ucp_ep = ucp_ep_from_ext_gen(ep_ext); @@ -635,17 +657,19 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status) ucp_wireup_ep_is_owner(ucp_ep->uct_eps[lane], uct_ep)) { ret_status = ucp_worker_set_ep_failed(worker, ucp_ep, uct_ep, lane, status); - UCS_ASYNC_UNBLOCK(&worker->async); - return ret_status; + goto out; } } } - ucs_error("no uct_ep_h %p associated with ucp_ep_h on ucp_worker_h %p", + ucs_error("UCT EP %p isn't associated with UCP EP and was not scheduled " + "to be discarded on UCP Worker %p", uct_ep, worker); - UCS_ASYNC_UNBLOCK(&worker->async); + ret_status = UCS_ERR_NO_ELEM; - return UCS_ERR_NO_ELEM; +out: + UCS_ASYNC_UNBLOCK(&worker->async); + return ret_status; } void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags) @@ -1834,6 +1858,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context, ucs_conn_match_init(&worker->conn_match_ctx, sizeof(uint64_t), &ucp_ep_match_ops); kh_init_inplace(ucp_worker_rkey_config, &worker->rkey_config_hash); + kh_init_inplace(ucp_worker_discard_uct_ep_hash, &worker->discard_uct_ep_hash); UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen_t) <= sizeof(ucp_ep_t)); if (context->config.features & (UCP_FEATURE_STREAM | UCP_FEATURE_AM)) { @@ -2023,6 +2048,12 @@ void ucp_worker_destroy(ucp_worker_h worker) ucp_worker_remove_am_handlers(worker); ucp_am_cleanup(worker); ucp_worker_close_cms(worker); + + if (worker->flush_ops_count != 0) { + ucs_warn("not all pending operations (%u) were flushed on worker %p " + "that is being destroyed", + worker->flush_ops_count, worker); + } UCS_ASYNC_UNBLOCK(&worker->async); ucp_worker_destroy_ep_configs(worker); @@ -2037,6 +2068,8 @@ void ucp_worker_destroy(ucp_worker_h worker) uct_worker_destroy(worker->uct); ucs_async_context_cleanup(&worker->async); ucs_conn_match_cleanup(&worker->conn_match_ctx); + kh_destroy_inplace(ucp_worker_discard_uct_ep_hash, + &worker->discard_uct_ep_hash); kh_destroy_inplace(ucp_worker_rkey_config, &worker->rkey_config_hash); ucs_ptr_map_destroy(&worker->ptr_map); ucs_strided_alloc_cleanup(&worker->ep_alloc); @@ -2316,7 +2349,6 @@ void ucp_worker_release_address(ucp_worker_h worker, ucp_address_t *address) ucs_free(address); } - void ucp_worker_print_info(ucp_worker_h worker, FILE *stream) { ucp_context_h context = worker->context; @@ -2360,3 +2392,197 @@ void ucp_worker_print_info(ucp_worker_h worker, FILE *stream) UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); } + +static unsigned ucp_worker_discard_uct_ep_destroy_progress(void *arg) +{ + ucp_request_t *req = (ucp_request_t*)arg; + uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep; + ucp_worker_h worker = req->send.discard_uct_ep.ucp_worker; + khiter_t iter; + + ucp_trace_req(req, "destroy uct_ep=%p", uct_ep); + ucp_request_put(req); + + UCS_ASYNC_BLOCK(&worker->async); + --worker->flush_ops_count; + iter = kh_get(ucp_worker_discard_uct_ep_hash, + &worker->discard_uct_ep_hash, uct_ep); + if (iter == kh_end(&worker->discard_uct_ep_hash)) { + ucs_fatal("no %p UCT EP in the %p worker hash of discarded UCT EPs", + uct_ep, worker); + } + kh_del(ucp_worker_discard_uct_ep_hash, + &worker->discard_uct_ep_hash, iter); + UCS_ASYNC_UNBLOCK(&worker->async); + + uct_ep_destroy(uct_ep); + + return 1; +} + +static void +ucp_worker_discard_uct_ep_flush_comp(uct_completion_t *self, + ucs_status_t status) +{ + uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL; + ucp_request_t *req = ucs_container_of(self, ucp_request_t, + send.state.uct_comp); + ucp_worker_h worker = req->send.discard_uct_ep.ucp_worker; + + ucp_trace_req(req, "discard_uct_ep flush completion status %s", + ucs_status_string(status)); + + /* don't destroy UCT EP from the flush completion callback, schedule + * a progress callback on the main thread to destroy UCT EP */ + uct_worker_progress_register_safe(worker->uct, + ucp_worker_discard_uct_ep_destroy_progress, + req, UCS_CALLBACKQ_FLAG_ONESHOT, &cb_id); +} + +static ucs_status_t +ucp_worker_discard_uct_ep_pending_cb(uct_pending_req_t *self) +{ + ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); + uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep; + ucs_status_t status; + + status = uct_ep_flush(uct_ep, req->send.discard_uct_ep.ep_flush_flags, + &req->send.state.uct_comp); + if (status == UCS_INPROGRESS) { + return UCS_OK; + } else if (status == UCS_ERR_NO_RESOURCE) { + return UCS_ERR_NO_RESOURCE; + } + + /* UCS_OK is handled here as well */ + ucp_worker_discard_uct_ep_flush_comp(&req->send.state.uct_comp, + status); + return UCS_OK; +} + +static unsigned ucp_worker_discard_uct_ep_progress(void *arg) +{ + uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL; + ucp_request_t *req = (ucp_request_t*)arg; + uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep; + ucp_worker_h worker = req->send.discard_uct_ep.ucp_worker; + ucs_status_t status; + + status = ucp_worker_discard_uct_ep_pending_cb(&req->send.uct); + if (status == UCS_ERR_NO_RESOURCE) { + status = uct_ep_pending_add(uct_ep, &req->send.uct, 0); + ucs_assert((status == UCS_ERR_BUSY) || (status == UCS_OK)); + if (status == UCS_ERR_BUSY) { + /* adding to the pending queue failed, schedule the UCT EP discard + * operation on UCT worker progress again */ + uct_worker_progress_register_safe(worker->uct, + ucp_worker_discard_uct_ep_progress, + req, UCS_CALLBACKQ_FLAG_ONESHOT, + &cb_id); + } + + return 0; + } + + return 1; +} + +static void +ucp_worker_discard_tl_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep, + unsigned ep_flush_flags) +{ + uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL; + ucp_request_t *req; + int ret; + + req = ucp_request_get(worker); + if (ucs_unlikely(req == NULL)) { + ucs_error("unable to allocate request for discarding UCT EP %p " + "on UCP worker %p", uct_ep, worker); + return; + } + + ++worker->flush_ops_count; + kh_put(ucp_worker_discard_uct_ep_hash, &worker->discard_uct_ep_hash, + uct_ep, &ret); + if (ret == UCS_KH_PUT_FAILED) { + ucs_fatal("failed to put %p UCT EP into the %p worker hash", + uct_ep, worker); + } else if (ret == UCS_KH_PUT_KEY_PRESENT) { + ucs_fatal("%p UCT EP is already present in the %p worker hash", + uct_ep, worker); + } + + ucs_assert(!ucp_wireup_ep_test(uct_ep)); + req->send.uct.func = ucp_worker_discard_uct_ep_pending_cb; + req->send.state.uct_comp.func = ucp_worker_discard_uct_ep_flush_comp; + req->send.state.uct_comp.count = 1; + req->send.discard_uct_ep.ucp_worker = worker; + req->send.discard_uct_ep.uct_ep = uct_ep; + req->send.discard_uct_ep.ep_flush_flags = ep_flush_flags; + uct_worker_progress_register_safe(worker->uct, + ucp_worker_discard_uct_ep_progress, + req, UCS_CALLBACKQ_FLAG_ONESHOT, + &cb_id); +} + +static uct_ep_h +ucp_worker_discard_wireup_ep(ucp_worker_h worker, + ucp_wireup_ep_t *wireup_ep, + unsigned ep_flush_flags, + uct_pending_purge_callback_t purge_cb, + void *purge_arg) +{ + uct_ep_h uct_ep; + int is_owner; + + ucs_assert(wireup_ep != NULL); + + if (wireup_ep->aux_ep != NULL) { + /* make sure that there are no WIREUP MSGs anymore that are scheduled + * on AUX EP, i.e. the purge callback hasn't be invoked here */ + uct_ep_pending_purge(wireup_ep->aux_ep, + (uct_pending_purge_callback_t) + ucs_empty_function_do_assert, + NULL); + + /* discard the WIREUP EP's auxiliary EP */ + ucp_worker_discard_tl_uct_ep(worker, wireup_ep->aux_ep, + ep_flush_flags); + ucp_wireup_ep_disown(&wireup_ep->super.super, wireup_ep->aux_ep); + } + + is_owner = wireup_ep->super.is_owner; + uct_ep = ucp_wireup_ep_extract_next_ep(&wireup_ep->super.super); + + /* destroy WIREUP EP allocated for this UCT EP, since discard operation + * most likely won't have an access to UCP EP as it could be destroyed + * by the caller */ + uct_ep_destroy(&wireup_ep->super.super); + + /* do nothing, if this wireup EP is not an owner for UCT EP */ + return is_owner ? uct_ep : NULL; +} + +/* must be called with async lock held */ +void ucp_worker_discard_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep, + unsigned ep_flush_flags, + uct_pending_purge_callback_t purge_cb, + void *purge_arg) +{ + ucs_assert(uct_ep != NULL); + ucs_assert(purge_cb != NULL); + + uct_ep_pending_purge(uct_ep, purge_cb, purge_arg); + + if (ucp_wireup_ep_test(uct_ep)) { + uct_ep = ucp_worker_discard_wireup_ep(worker, ucp_wireup_ep(uct_ep), + ep_flush_flags, + purge_cb, purge_arg); + if (uct_ep == NULL) { + return; + } + } + + ucp_worker_discard_tl_uct_ep(worker, uct_ep, ep_flush_flags); +} diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index f11b7380a6a..142d240db47 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -172,6 +172,11 @@ KHASH_TYPE(ucp_worker_rkey_config, ucp_rkey_config_key_t, ucp_worker_cfg_index_t typedef khash_t(ucp_worker_rkey_config) ucp_worker_rkey_config_hash_t; +/* Hash set to UCT EPs that are being discarded on UCP Worker */ +KHASH_TYPE(ucp_worker_discard_uct_ep_hash, uct_ep_h, char); +typedef khash_t(ucp_worker_discard_uct_ep_hash) ucp_worker_discard_uct_ep_hash_t; + + /** * UCP worker iface, which encapsulates UCT iface, its attributes and * some auxiliary info needed for tag matching offloads. @@ -208,61 +213,63 @@ struct ucp_worker_cm { * UCP worker (thread context). */ typedef struct ucp_worker { - unsigned flags; /* Worker flags */ - ucs_async_context_t async; /* Async context for this worker */ - ucp_context_h context; /* Back-reference to UCP context */ - uint64_t uuid; /* Unique ID for wireup */ - uct_worker_h uct; /* UCT worker handle */ - ucs_mpool_t req_mp; /* Memory pool for requests */ - ucs_mpool_t rkey_mp; /* Pool for small memory keys */ - uint64_t atomic_tls; /* Which resources can be used for atomics */ - - int inprogress; - char name[UCP_WORKER_NAME_MAX]; /* Worker name */ - - unsigned flush_ops_count;/* Number of pending operations */ - - int event_fd; /* Allocated (on-demand) event fd for wakeup */ - ucs_sys_event_set_t *event_set; /* Allocated UCS event set for wakeup */ - int eventfd; /* Event fd to support signal() calls */ - unsigned uct_events; /* UCT arm events */ - ucs_list_link_t arm_ifaces; /* List of interfaces to arm */ - - void *user_data; /* User-defined data */ - ucs_strided_alloc_t ep_alloc; /* Endpoint allocator */ - ucs_list_link_t stream_ready_eps; /* List of EPs with received stream data */ - ucs_list_link_t all_eps; /* List of all endpoints */ - ucs_conn_match_ctx_t conn_match_ctx; /* Endpoint-to-endpoint matching context */ - ucp_worker_iface_t **ifaces; /* Array of pointers to interfaces, - one for each resource */ - unsigned num_ifaces; /* Number of elements in ifaces array */ - unsigned num_active_ifaces; /* Number of activated ifaces */ - uint64_t scalable_tl_bitmap; /* Map of scalable tl resources */ - ucp_worker_cm_t *cms; /* Array of CMs, one for each component */ - ucs_mpool_t am_mp; /* Memory pool for AM receives */ - ucs_mpool_t reg_mp; /* Registered memory pool */ - ucs_mpool_t rndv_frag_mp; /* Memory pool for RNDV fragments */ - ucs_queue_head_t rkey_ptr_reqs; /* Queue of submitted RKEY PTR requests that - * are in-progress */ - uct_worker_cb_id_t rkey_ptr_cb_id;/* RKEY PTR worker callback queue ID */ - ucp_tag_match_t tm; /* Tag-matching queues and offload info */ - ucs_array_t(ucp_am_cbs) am; /* Array of AM callbacks and their data */ - uint64_t am_message_id; /* For matching long AMs */ - ucp_ep_h mem_type_ep[UCS_MEMORY_TYPE_LAST];/* memory type eps */ + unsigned flags; /* Worker flags */ + ucs_async_context_t async; /* Async context for this worker */ + ucp_context_h context; /* Back-reference to UCP context */ + uint64_t uuid; /* Unique ID for wireup */ + uct_worker_h uct; /* UCT worker handle */ + ucs_mpool_t req_mp; /* Memory pool for requests */ + ucs_mpool_t rkey_mp; /* Pool for small memory keys */ + uint64_t atomic_tls; /* Which resources can be used for atomics */ + + int inprogress; + char name[UCP_WORKER_NAME_MAX]; /* Worker name */ + + unsigned flush_ops_count; /* Number of pending operations */ + + int event_fd; /* Allocated (on-demand) event fd for wakeup */ + ucs_sys_event_set_t *event_set; /* Allocated UCS event set for wakeup */ + int eventfd; /* Event fd to support signal() calls */ + unsigned uct_events; /* UCT arm events */ + ucs_list_link_t arm_ifaces; /* List of interfaces to arm */ + + void *user_data; /* User-defined data */ + ucs_strided_alloc_t ep_alloc; /* Endpoint allocator */ + ucs_list_link_t stream_ready_eps; /* List of EPs with received stream data */ + ucs_list_link_t all_eps; /* List of all endpoints */ + ucs_conn_match_ctx_t conn_match_ctx; /* Endpoint-to-endpoint matching context */ + ucp_worker_iface_t **ifaces; /* Array of pointers to interfaces, + one for each resource */ + unsigned num_ifaces; /* Number of elements in ifaces array */ + unsigned num_active_ifaces; /* Number of activated ifaces */ + uint64_t scalable_tl_bitmap; /* Map of scalable tl resources */ + ucp_worker_cm_t *cms; /* Array of CMs, one for each component */ + ucs_mpool_t am_mp; /* Memory pool for AM receives */ + ucs_mpool_t reg_mp; /* Registered memory pool */ + ucs_mpool_t rndv_frag_mp; /* Memory pool for RNDV fragments */ + ucs_queue_head_t rkey_ptr_reqs; /* Queue of submitted RKEY PTR requests that + * are in-progress */ + uct_worker_cb_id_t rkey_ptr_cb_id; /* RKEY PTR worker callback queue ID */ + ucp_tag_match_t tm; /* Tag-matching queues and offload info */ + ucs_array_t(ucp_am_cbs) am; /* Array of AM callbacks and their data */ + uint64_t am_message_id; /* For matching long AMs */ + ucp_ep_h mem_type_ep[UCS_MEMORY_TYPE_LAST]; /* Memory type EPs */ UCS_STATS_NODE_DECLARE(stats) UCS_STATS_NODE_DECLARE(tm_offload_stats) - ucs_cpu_set_t cpu_mask; /* Save CPU mask for subsequent calls to ucp_worker_listen */ + ucs_cpu_set_t cpu_mask; /* Save CPU mask for subsequent calls to + ucp_worker_listen */ - ucp_worker_rkey_config_hash_t rkey_config_hash; /* rkey config key -> index */ - ucs_ptr_map_t ptr_map; /* UCP objects key to ptr mapping */ + ucp_worker_rkey_config_hash_t rkey_config_hash; /* RKEY config key -> index */ + ucp_worker_discard_uct_ep_hash_t discard_uct_ep_hash; /* Hash of discarded UCT EPs */ + ucs_ptr_map_t ptr_map; /* UCP objects key to ptr mapping */ - unsigned ep_config_count; /* Current number of ep configurations */ - ucp_ep_config_t ep_config[UCP_WORKER_MAX_EP_CONFIG]; + unsigned ep_config_count; /* Current number of ep configurations */ + ucp_ep_config_t ep_config[UCP_WORKER_MAX_EP_CONFIG]; - unsigned rkey_config_count; /* Current number of rkey configurations */ - ucp_rkey_config_t rkey_config[UCP_WORKER_MAX_RKEY_CONFIG]; + unsigned rkey_config_count; /* Current number of rkey configurations */ + ucp_rkey_config_t rkey_config[UCP_WORKER_MAX_RKEY_CONFIG]; } ucp_worker_t; @@ -305,8 +312,15 @@ void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags); int ucp_worker_err_handle_remove_filter(const ucs_callbackq_elem_t *elem, void *arg); + ucs_status_t ucp_worker_set_ep_failed(ucp_worker_h worker, ucp_ep_h ucp_ep, uct_ep_h uct_ep, ucp_lane_index_t lane, ucs_status_t status); +/* must be called with async lock held */ +void ucp_worker_discard_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep, + unsigned ep_flush_flags, + uct_pending_purge_callback_t purge_cb, + void *purge_arg); + #endif diff --git a/src/ucp/rma/flush.c b/src/ucp/rma/flush.c index c92ce8b65cd..02ebd510fa7 100644 --- a/src/ucp/rma/flush.c +++ b/src/ucp/rma/flush.c @@ -384,10 +384,6 @@ static ucs_status_t ucp_worker_flush_check(ucp_worker_h worker) ucp_worker_iface_t *wiface; ucs_status_t status; - if (worker->flush_ops_count) { - return UCS_INPROGRESS; - } - for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) { wiface = worker->ifaces[iface_id]; if (wiface->iface == NULL) { @@ -444,17 +440,26 @@ static unsigned ucp_worker_flush_progress(void *arg) ucs_status_t status; ucp_ep_h ep; - status = ucp_worker_flush_check(worker); - if ((status == UCS_OK) || (&next_ep->ep_list == &worker->all_eps)) { - /* If all ifaces are flushed, or we finished going over all endpoints, - * no need to progress this request actively any more. Just wait until - * all associated endpoint flush requests are completed. - */ - ucp_worker_flush_complete_one(req, UCS_OK, 1); - } else if (status != UCS_INPROGRESS) { - /* Error returned from uct iface flush */ - ucp_worker_flush_complete_one(req, status, 1); - } else if (worker->context->config.ext.flush_worker_eps) { + if (worker->flush_ops_count == 0) { + /* all scheduled progress operations on worker were completed */ + status = ucp_worker_flush_check(worker); + if ((status == UCS_OK) || (&next_ep->ep_list == &worker->all_eps)) { + /* If all ifaces are flushed, or we finished going over all + * endpoints, no need to progress this request actively anymore + * and we complete the flush operation with UCS_OK status. */ + ucp_worker_flush_complete_one(req, UCS_OK, 1); + goto out; + } else if (status != UCS_INPROGRESS) { + /* Error returned from uct iface flush, no need to progress + * this request actively anymore and we complete the flush + * operation with an error status. */ + ucp_worker_flush_complete_one(req, status, 1); + goto out; + } + } + + if ((worker->context->config.ext.flush_worker_eps) && + (&next_ep->ep_list != &worker->all_eps)) { /* Some endpoints are not flushed yet. Take next endpoint from the list * and start flush operation on it. */ @@ -477,6 +482,7 @@ static unsigned ucp_worker_flush_progress(void *arg) } } +out: return 0; } @@ -487,9 +493,12 @@ ucp_worker_flush_nbx_internal(ucp_worker_h worker, ucs_status_t status; ucp_request_t *req; - status = ucp_worker_flush_check(worker); - if ((status != UCS_INPROGRESS) && (status != UCS_ERR_NO_RESOURCE)) { - return UCS_STATUS_PTR(status); + if (!worker->flush_ops_count) { + status = ucp_worker_flush_check(worker); + if ((status != UCS_INPROGRESS) && (status != UCS_ERR_NO_RESOURCE)) { + /* UCS_OK is returned here as well */ + return UCS_STATUS_PTR(status); + } } req = ucp_request_get_param(worker, param, diff --git a/src/uct/ib/ud/base/ud_ep.c b/src/uct/ib/ud/base/ud_ep.c index c34ad4070a7..828e91b31c8 100644 --- a/src/uct/ib/ud/base/ud_ep.c +++ b/src/uct/ib/ud/base/ud_ep.c @@ -109,6 +109,10 @@ static UCS_F_ALWAYS_INLINE void uct_ud_ep_ca_ack(uct_ud_ep_t *ep) ep->tx.max_psn = ep->tx.acked_psn + ep->ca.cwnd; } +static void uct_ud_ep_reset_max_psn(uct_ud_ep_t *ep) +{ + ep->tx.max_psn = ep->tx.psn + ep->ca.cwnd; +} static void uct_ud_ep_reset(uct_ud_ep_t *ep) { @@ -116,9 +120,9 @@ static void uct_ud_ep_reset(uct_ud_ep_t *ep) ep->ca.cwnd = UCT_UD_CA_MIN_WINDOW; ep->ca.wmax = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t)->config.max_window; - ep->tx.max_psn = ep->tx.psn + ep->ca.cwnd; ep->tx.acked_psn = UCT_UD_INITIAL_PSN - 1; ep->tx.pending.ops = UCT_UD_EP_OP_NONE; + uct_ud_ep_reset_max_psn(ep); ucs_queue_head_init(&ep->tx.window); ep->resend.pos = ucs_queue_iter_begin(&ep->tx.window); @@ -221,7 +225,10 @@ static void uct_ud_ep_purge_outstanding(uct_ud_ep_t *ep) static void uct_ud_ep_purge(uct_ud_ep_t *ep, ucs_status_t status) { - uct_ud_ep_tx_stop(ep); + /* reset the maximal TX psn value to the default, since we should be able + * to do TX operation after purging of the EP and uct_ep_flush(LOCAL) + * operation has to return UCS_OK */ + uct_ud_ep_reset_max_psn(ep); uct_ud_ep_purge_outstanding(ep); ep->tx.acked_psn = (uct_ud_psn_t)(ep->tx.psn - 1); uct_ud_ep_window_release(ep, status, 0); diff --git a/test/gtest/Makefile.am b/test/gtest/Makefile.am index 15de8a489ed..ed78eab162c 100644 --- a/test/gtest/Makefile.am +++ b/test/gtest/Makefile.am @@ -135,6 +135,7 @@ gtest_SOURCES = \ ucp/test_ucp_tag_mem_type.cc \ ucp/test_ucp_tag.cc \ ucp/test_ucp_context.cc \ + ucp/test_ucp_worker.cc \ ucp/test_ucp_wireup.cc \ ucp/test_ucp_wakeup.cc \ ucp/test_ucp_fence.cc \ diff --git a/test/gtest/ucp/test_ucp_worker.cc b/test/gtest/ucp/test_ucp_worker.cc new file mode 100644 index 00000000000..7c41104c247 --- /dev/null +++ b/test/gtest/ucp/test_ucp_worker.cc @@ -0,0 +1,421 @@ +/** + * Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED. + * + * See file LICENSE for terms. + */ + +#include "ucp_test.h" +#include +#include + +extern "C" { +#include +#include +#include +#include +} + + +class test_ucp_worker_discard : public ucp_test { +public: + static ucp_params_t get_ctx_params() { + ucp_params_t params = ucp_test::get_ctx_params(); + params.features |= UCP_FEATURE_TAG; + return params; + } + +protected: + struct ep_test_info_t { + std::vector pending_reqs; + unsigned flush_count; + unsigned pending_add_count; + + ep_test_info_t() : flush_count(0), pending_add_count(0) { + } + }; + typedef std::map ep_test_info_map_t; + + void init() { + ucp_test::init(); + m_created_ep_count = 0; + m_destroyed_ep_count = 0; + m_fake_ep.flags = UCP_EP_FLAG_REMOTE_CONNECTED; + + m_flush_comps.clear(); + m_pending_reqs.clear(); + m_ep_test_info_map.clear(); + } + + void add_pending_reqs(uct_ep_h uct_ep, + uct_pending_callback_t func, + std::vector &pending_reqs, + unsigned base = 0) { + for (unsigned i = 0; i < m_pending_purge_reqs_count; i++) { + /* use `ucs_calloc()` here, since the memory could be released + * in the `ucp_wireup_msg_progress()` function by `ucs_free()` */ + ucp_request_t *req = static_cast( + ucs_calloc(1, sizeof(*req), + "ucp_request")); + ASSERT_TRUE(req != NULL); + + pending_reqs.push_back(req); + + if (func == ucp_wireup_msg_progress) { + req->send.ep = &m_fake_ep; + } + + req->send.uct.func = func; + uct_ep_pending_add(uct_ep, &req->send.uct, 0); + } + } + + void test_worker_discard(void *ep_flush_func, + void *ep_pending_add_func, + void *ep_pending_purge_func, + unsigned ep_count = 8, + unsigned wireup_ep_count = 0, + unsigned wireup_aux_ep_count = 0) { + uct_iface_ops_t ops = {0}; + unsigned created_wireup_aux_ep_count = 0; + unsigned total_ep_count = ep_count + wireup_aux_ep_count; + uct_iface_t iface; + std::vector eps(total_ep_count); + std::vector wireup_eps(wireup_ep_count); + ucp_ep_t ucp_ep; + ucs_status_t status; + + ASSERT_LE(wireup_ep_count, ep_count); + ASSERT_LE(wireup_aux_ep_count, wireup_ep_count); + + ucp_ep.worker = sender().worker(); + + ops.ep_flush = (uct_ep_flush_func_t)ep_flush_func; + ops.ep_pending_add = (uct_ep_pending_add_func_t)ep_pending_add_func; + ops.ep_pending_purge = (uct_ep_pending_purge_func_t)ep_pending_purge_func; + ops.ep_destroy = ep_destroy_func; + iface.ops = ops; + + for (unsigned i = 0; i < ep_count; i++) { + uct_ep_h discard_ep; + + eps[i].iface = &iface; + m_created_ep_count++; + + std::vector pending_reqs; + + if (i < wireup_ep_count) { + status = ucp_wireup_ep_create(&ucp_ep, &discard_ep); + ASSERT_UCS_OK(status); + + wireup_eps.push_back(discard_ep); + ucp_wireup_ep_set_next_ep(discard_ep, &eps[i]); + + ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(discard_ep); + + if (i < wireup_aux_ep_count) { + eps[ep_count + created_wireup_aux_ep_count].iface = &iface; + + /* coverity[escape] */ + wireup_ep->aux_ep = &eps[ep_count + + created_wireup_aux_ep_count]; + + created_wireup_aux_ep_count++; + m_created_ep_count++; + } + + if (ep_pending_purge_func == ep_pending_purge_func_iter_reqs) { + /* add WIREUP MSGs to the WIREUP EP (it will be added to + * UCT EP or WIREUP AUX EP) */ + add_pending_reqs(discard_ep, + (uct_pending_callback_t) + ucp_wireup_msg_progress, + pending_reqs); + } + } else { + discard_ep = &eps[i]; + } + + EXPECT_LE(m_created_ep_count, total_ep_count); + + + if (ep_pending_purge_func == ep_pending_purge_func_iter_reqs) { + /* add user's pending requests */ + add_pending_reqs(discard_ep, + (uct_pending_callback_t) + ucs_empty_function, + pending_reqs); + } + + unsigned purged_reqs_count = 0; + ucp_worker_discard_uct_ep(sender().worker(), discard_ep, + UCT_FLUSH_FLAG_LOCAL, + ep_pending_purge_count_reqs_cb, + &purged_reqs_count); + + if (ep_pending_purge_func == ep_pending_purge_func_iter_reqs) { + EXPECT_EQ(m_pending_purge_reqs_count, purged_reqs_count); + } else { + EXPECT_EQ(0u, purged_reqs_count); + } + } + + void *flush_req = sender().flush_worker_nb(0); + + ASSERT_FALSE(flush_req == NULL); + ASSERT_TRUE(UCS_PTR_IS_PTR(flush_req)); + + do { + progress(); + + if (!m_flush_comps.empty()) { + uct_completion_t *comp = m_flush_comps.back(); + + m_flush_comps.pop_back(); + uct_invoke_completion(comp, UCS_OK); + } + + if (!m_pending_reqs.empty()) { + uct_pending_req_t *req = m_pending_reqs.back(); + + status = req->func(req); + if (status == UCS_OK) { + m_pending_reqs.pop_back(); + } else { + EXPECT_EQ(UCS_ERR_NO_RESOURCE, status); + } + } + } while (ucp_request_check_status(flush_req) == UCS_INPROGRESS); + + EXPECT_UCS_OK(ucp_request_check_status(flush_req)); + EXPECT_EQ(m_created_ep_count, m_destroyed_ep_count); + EXPECT_EQ(m_created_ep_count, total_ep_count); + + for (unsigned i = 0; i < m_created_ep_count; i++) { + ep_test_info_t &test_info = ep_test_info_get(&eps[i]); + + /* check EP flush counters */ + if (ep_flush_func == ep_flush_func_return_3_no_resource_then_ok) { + EXPECT_EQ(4, test_info.flush_count); + } else if (ep_flush_func == ep_flush_func_return_in_progress) { + EXPECT_EQ(1, test_info.flush_count); + } + + /* check EP pending add counters */ + if (ep_pending_add_func == ep_pending_add_func_return_ok_then_busy) { + /* pending_add has to be called only once per EP */ + EXPECT_EQ(1, test_info.pending_add_count); + } + } + + EXPECT_TRUE(m_flush_comps.empty()); + EXPECT_TRUE(m_pending_reqs.empty()); + + ucp_request_release(flush_req); + + /* check that uct_ep_destroy() was called for the all EPs that + * were created in the test */ + for (unsigned i = 0; i < created_wireup_aux_ep_count; i++) { + EXPECT_EQ(NULL, eps[i].iface); + } + } + + static void ep_destroy_func(uct_ep_h ep) { + ep->iface = NULL; + m_destroyed_ep_count++; + } + + static ep_test_info_t& ep_test_info_get(uct_ep_h ep) { + ep_test_info_map_t::iterator it = m_ep_test_info_map.find(ep); + + if (it == m_ep_test_info_map.end()) { + ep_test_info_t test_info; + + m_ep_test_info_map.insert(std::make_pair(ep, test_info)); + it = m_ep_test_info_map.find(ep); + } + + return it->second; + } + + static unsigned + ep_test_info_flush_inc(uct_ep_h ep) { + ep_test_info_t &test_info = ep_test_info_get(ep); + return ++test_info.flush_count; + } + + static unsigned + ep_test_info_pending_add_inc(uct_ep_h ep) { + ep_test_info_t &test_info = ep_test_info_get(ep); + return ++test_info.pending_add_count; + } + + static ucs_status_t + ep_flush_func_return_3_no_resource_then_ok(uct_ep_h ep, unsigned flags, + uct_completion_t *comp) { + unsigned flush_ep_count = ep_test_info_flush_inc(ep); + EXPECT_LE(flush_ep_count, 4); + return (flush_ep_count < 4) ? + UCS_ERR_NO_RESOURCE : UCS_OK; + } + + static ucs_status_t + ep_flush_func_return_in_progress(uct_ep_h ep, unsigned flags, + uct_completion_t *comp) { + unsigned flush_ep_count = ep_test_info_flush_inc(ep); + EXPECT_LE(flush_ep_count, m_created_ep_count); + m_flush_comps.push_back(comp); + return UCS_INPROGRESS; + } + + static ucs_status_t + ep_pending_add_func_return_ok_then_busy(uct_ep_h ep, uct_pending_req_t *req, + unsigned flags) { + unsigned pending_add_ep_count = ep_test_info_pending_add_inc(ep); + EXPECT_LE(pending_add_ep_count, m_created_ep_count); + + if (pending_add_ep_count < m_created_ep_count) { + m_pending_reqs.push_back(req); + return UCS_OK; + } + + return UCS_ERR_BUSY; + } + + static void + ep_pending_purge_count_reqs_cb(uct_pending_req_t *self, + void *arg) { + unsigned *count = (unsigned*)arg; + (*count)++; + + ucp_request_t *req = ucs_container_of(self, + ucp_request_t, + send.uct); + + ASSERT_TRUE(self->func != ucp_wireup_ep_progress_pending); + ucs_free(req); + } + + static ucs_status_t + ep_pending_add_save_req(uct_ep_h ep, uct_pending_req_t *req, + unsigned flags) { + ep_test_info_t &test_info = ep_test_info_get(ep); + test_info.pending_reqs.push_back(req); + return UCS_OK; + } + + static void + ep_pending_purge_func_iter_reqs(uct_ep_h ep, + uct_pending_purge_callback_t cb, + void *arg) { + ep_test_info_t &test_info = ep_test_info_get(ep); + uct_pending_req_t *req; + + for (unsigned i = 0; i < m_pending_purge_reqs_count; i++) { + std::vector &req_vec = test_info.pending_reqs; + if (req_vec.size() == 0) { + break; + } + + req = req_vec.back(); + req_vec.pop_back(); + cb(req, arg); + } + } + +protected: + static unsigned m_created_ep_count; + static unsigned m_destroyed_ep_count; + static ucp_ep_t m_fake_ep; + static const unsigned m_pending_purge_reqs_count; + + static std::vector m_flush_comps; + static std::vector m_pending_reqs; + static ep_test_info_map_t m_ep_test_info_map; +}; + +unsigned test_ucp_worker_discard::m_created_ep_count = 0; +unsigned test_ucp_worker_discard::m_destroyed_ep_count = 0; +ucp_ep_t test_ucp_worker_discard::m_fake_ep = {}; +const unsigned test_ucp_worker_discard::m_pending_purge_reqs_count = 10; + +std::vector test_ucp_worker_discard::m_flush_comps; +std::vector test_ucp_worker_discard::m_pending_reqs; +test_ucp_worker_discard::ep_test_info_map_t test_ucp_worker_discard::m_ep_test_info_map; + + +UCS_TEST_P(test_ucp_worker_discard, flush_ok) { + test_worker_discard((void*)ucs_empty_function_return_success /* ep_flush */, + (void*)ucs_empty_function_do_assert /* ep_pending_add */, + (void*)ucs_empty_function /* ep_pending_purge */); +} + +UCS_TEST_P(test_ucp_worker_discard, wireup_ep_flush_ok) { + test_worker_discard((void*)ucs_empty_function_return_success /* ep_flush */, + (void*)ucs_empty_function_do_assert /* ep_pending_add */, + (void*)ucs_empty_function /* ep_pending_purge */, + 8 /* UCT EP count */, + 6 /* WIREUP EP count */, + 3 /* WIREUP AUX EP count */); +} + +UCS_TEST_P(test_ucp_worker_discard, flush_ok_pending_purge) { + test_worker_discard((void*)ucs_empty_function_return_success /* ep_flush */, + (void*)ep_pending_add_save_req /* ep_pending_add */, + (void*)ep_pending_purge_func_iter_reqs /* ep_pending_purge */); +} + +UCS_TEST_P(test_ucp_worker_discard, wireup_ep_flush_ok_pending_purge) { + test_worker_discard((void*)ucs_empty_function_return_success /* ep_flush */, + (void*)ep_pending_add_save_req /* ep_pending_add */, + (void*)ep_pending_purge_func_iter_reqs /* ep_pending_purge */, + 8 /* UCT EP count */, + 6 /* WIREUP EP count */, + 3 /* WIREUP AUX EP count */); +} + +UCS_TEST_P(test_ucp_worker_discard, flush_in_progress) { + test_worker_discard((void*)ep_flush_func_return_in_progress /* ep_flush */, + (void*)ucs_empty_function_do_assert /* ep_pending_add */, + (void*)ucs_empty_function /* ep_pending_purge */); +} + +UCS_TEST_P(test_ucp_worker_discard, wireup_ep_flush_in_progress) { + test_worker_discard((void*)ep_flush_func_return_in_progress /* ep_flush */, + (void*)ucs_empty_function_do_assert /* ep_pending_add */, + (void*)ucs_empty_function /* ep_pending_purge */, + 8 /* UCT EP count */, + 6 /* WIREUP EP count */, + 3 /* WIREUP AUX EP count */); +} + +UCS_TEST_P(test_ucp_worker_discard, flush_no_resource_pending_add_busy) { + test_worker_discard((void*)ep_flush_func_return_3_no_resource_then_ok /* ep_flush */, + (void*)ucs_empty_function_return_busy /* ep_pending_add */, + (void*)ucs_empty_function /* ep_pending_purge */); +} + +UCS_TEST_P(test_ucp_worker_discard, wireup_ep_flush_no_resource_pending_add_busy) { + test_worker_discard((void*)ep_flush_func_return_3_no_resource_then_ok /* ep_flush */, + (void*)ucs_empty_function_return_busy /* ep_pending_add */, + (void*)ucs_empty_function /* ep_pending_purge */, + 8 /* UCT EP count */, + 6 /* WIREUP EP count */, + 3 /* WIREUP AUX EP count */); +} + +UCS_TEST_P(test_ucp_worker_discard, flush_no_resource_pending_add_ok_then_busy) { + test_worker_discard((void*)ep_flush_func_return_3_no_resource_then_ok /* ep_flush */, + (void*)ep_pending_add_func_return_ok_then_busy /* ep_pending_add */, + (void*)ucs_empty_function /* ep_pending_purge */); +} + +UCS_TEST_P(test_ucp_worker_discard, wireup_ep_flush_no_resource_pending_add_ok_then_busy) { + test_worker_discard((void*)ep_flush_func_return_3_no_resource_then_ok /* ep_flush */, + (void*)ep_pending_add_func_return_ok_then_busy /* ep_pending_add */, + (void*)ucs_empty_function /* ep_pending_purge */, + 8 /* UCT EP count */, + 6 /* WIREUP EP count */, + 3 /* WIREUP AUX EP count */); +} + +UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_worker_discard, all, "all")