-
Notifications
You must be signed in to change notification settings - Fork 532
UCP/CORE: Implement flush+destroy for UCT EPs on UCP Worker #5608
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f7b0e3c
627a8a5
e9d3b1d
ecd8663
8d4a9f9
d229eae
8558d4c
cdfe88d
c1a8fee
28edf67
022a6f2
46b1272
cb197a4
5d0b3fa
12d1c34
d1ab7d4
d809dc1
7e6a5b4
a0f759b
46afaaf
e2982d5
7ecd09d
9ef1f34
efdb4d3
509dd6c
07b8bb3
43c3e92
9153e50
9c346d6
465be85
cdb8875
2a5888f
d3cf051
ea18b51
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,6 +102,14 @@ ucs_mpool_ops_t ucp_frag_mpool_ops = { | |
| }; | ||
|
|
||
|
|
||
/* Hash function for the worker's hash of discarded UCT EPs: the UCT EP handle
 * is cast to uintptr_t and hashed with kh_int64_hash_func(). */
| #define ucp_worker_discard_uct_ep_hash_key(_uct_ep) \ | ||
| kh_int64_hash_func((uintptr_t)(_uct_ep)) | ||
|
|
||
|
|
||
/* Instantiates the khash implementation named ucp_worker_discard_uct_ep_hash,
 * keyed by uct_ep_h with a char value type, using the hash function above and
 * kh_int64_hash_equal for key comparison.
 * NOTE(review): the 4th argument (0) is presumably khash's "is map" flag, i.e.
 * the table is used as a set and the char value is unused - confirm against
 * the KHASH_IMPL definition. */
| KHASH_IMPL(ucp_worker_discard_uct_ep_hash, uct_ep_h, char, 0, | ||
| ucp_worker_discard_uct_ep_hash_key, kh_int64_hash_equal); | ||
|
|
||
|
|
||
| static ucs_status_t ucp_worker_wakeup_ctl_fd(ucp_worker_h worker, | ||
| ucp_worker_event_fd_op_t op, | ||
| int event_fd) | ||
|
|
@@ -439,14 +447,18 @@ static unsigned ucp_worker_iface_err_handle_progress(void *arg) | |
| /* Purge pending queue */ | ||
| ucs_trace("ep %p: purge pending on uct_ep[%d]=%p", ucp_ep, lane, | ||
| ucp_ep->uct_eps[lane]); | ||
| uct_ep_pending_purge(ucp_ep->uct_eps[lane], ucp_ep_err_pending_purge, | ||
| UCS_STATUS_PTR(status)); | ||
|
|
||
| if (lane != failed_lane) { | ||
| ucs_trace("ep %p: destroy uct_ep[%d]=%p", ucp_ep, lane, | ||
| ucp_ep->uct_eps[lane]); | ||
| uct_ep_destroy(ucp_ep->uct_eps[lane]); | ||
| ucp_worker_discard_uct_ep(ucp_ep->worker, ucp_ep->uct_eps[lane], | ||
| UCT_FLUSH_FLAG_CANCEL, | ||
| ucp_ep_err_pending_purge, | ||
| UCS_STATUS_PTR(status)); | ||
| ucp_ep->uct_eps[lane] = NULL; | ||
| } else { | ||
| uct_ep_pending_purge(ucp_ep->uct_eps[lane], ucp_ep_err_pending_purge, | ||
| UCS_STATUS_PTR(status)); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -616,17 +628,27 @@ ucs_status_t ucp_worker_set_ep_failed(ucp_worker_h worker, ucp_ep_h ucp_ep, | |
| static ucs_status_t | ||
| ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status) | ||
| { | ||
| ucp_worker_h worker = (ucp_worker_h)arg; | ||
| ucp_worker_h worker = (ucp_worker_h)arg; | ||
| ucp_lane_index_t lane; | ||
| ucs_status_t ret_status; | ||
| ucp_ep_ext_gen_t *ep_ext; | ||
| ucp_ep_h ucp_ep; | ||
| khiter_t iter; | ||
|
|
||
| UCS_ASYNC_BLOCK(&worker->async); | ||
|
|
||
| ucs_debug("worker %p: error handler called for uct_ep %p: %s", | ||
| ucs_debug("worker %p: error handler called for UCT EP %p: %s", | ||
| worker, uct_ep, ucs_status_string(status)); | ||
|
|
||
| iter = kh_get(ucp_worker_discard_uct_ep_hash, | ||
| &worker->discard_uct_ep_hash, uct_ep); | ||
| if (iter != kh_end(&worker->discard_uct_ep_hash)) { | ||
| ucs_debug("UCT EP %p is being discarded on UCP Worker %p", | ||
| uct_ep, worker); | ||
| ret_status = UCS_OK; | ||
| goto out; | ||
| } | ||
|
|
||
| /* TODO: need to optimize uct_ep -> ucp_ep lookup */ | ||
| ucs_list_for_each(ep_ext, &worker->all_eps, ep_list) { | ||
| ucp_ep = ucp_ep_from_ext_gen(ep_ext); | ||
|
|
@@ -635,17 +657,19 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status) | |
| ucp_wireup_ep_is_owner(ucp_ep->uct_eps[lane], uct_ep)) { | ||
| ret_status = ucp_worker_set_ep_failed(worker, ucp_ep, uct_ep, | ||
| lane, status); | ||
| UCS_ASYNC_UNBLOCK(&worker->async); | ||
| return ret_status; | ||
| goto out; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| ucs_error("no uct_ep_h %p associated with ucp_ep_h on ucp_worker_h %p", | ||
| ucs_error("UCT EP %p isn't associated with UCP EP and was not scheduled " | ||
| "to be discarded on UCP Worker %p", | ||
| uct_ep, worker); | ||
| UCS_ASYNC_UNBLOCK(&worker->async); | ||
| ret_status = UCS_ERR_NO_ELEM; | ||
|
|
||
| return UCS_ERR_NO_ELEM; | ||
| out: | ||
| UCS_ASYNC_UNBLOCK(&worker->async); | ||
| return ret_status; | ||
| } | ||
|
|
||
| void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags) | ||
|
|
@@ -1834,6 +1858,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context, | |
| ucs_conn_match_init(&worker->conn_match_ctx, sizeof(uint64_t), | ||
| &ucp_ep_match_ops); | ||
| kh_init_inplace(ucp_worker_rkey_config, &worker->rkey_config_hash); | ||
| kh_init_inplace(ucp_worker_discard_uct_ep_hash, &worker->discard_uct_ep_hash); | ||
|
|
||
| UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen_t) <= sizeof(ucp_ep_t)); | ||
| if (context->config.features & (UCP_FEATURE_STREAM | UCP_FEATURE_AM)) { | ||
|
|
@@ -2023,6 +2048,12 @@ void ucp_worker_destroy(ucp_worker_h worker) | |
| ucp_worker_remove_am_handlers(worker); | ||
| ucp_am_cleanup(worker); | ||
| ucp_worker_close_cms(worker); | ||
|
|
||
| if (worker->flush_ops_count != 0) { | ||
| ucs_warn("not all pending operations (%u) were flushed on worker %p " | ||
| "that is being destroyed", | ||
| worker->flush_ops_count, worker); | ||
| } | ||
| UCS_ASYNC_UNBLOCK(&worker->async); | ||
|
|
||
| ucp_worker_destroy_ep_configs(worker); | ||
|
|
@@ -2037,6 +2068,8 @@ void ucp_worker_destroy(ucp_worker_h worker) | |
| uct_worker_destroy(worker->uct); | ||
| ucs_async_context_cleanup(&worker->async); | ||
| ucs_conn_match_cleanup(&worker->conn_match_ctx); | ||
| kh_destroy_inplace(ucp_worker_discard_uct_ep_hash, | ||
| &worker->discard_uct_ep_hash); | ||
| kh_destroy_inplace(ucp_worker_rkey_config, &worker->rkey_config_hash); | ||
| ucs_ptr_map_destroy(&worker->ptr_map); | ||
| ucs_strided_alloc_cleanup(&worker->ep_alloc); | ||
|
|
@@ -2316,7 +2349,6 @@ void ucp_worker_release_address(ucp_worker_h worker, ucp_address_t *address) | |
| ucs_free(address); | ||
| } | ||
|
|
||
|
|
||
| void ucp_worker_print_info(ucp_worker_h worker, FILE *stream) | ||
| { | ||
| ucp_context_h context = worker->context; | ||
|
|
@@ -2360,3 +2392,197 @@ void ucp_worker_print_info(ucp_worker_h worker, FILE *stream) | |
|
|
||
| UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); | ||
| } | ||
|
|
||
/*
 * Progress callback that finishes discarding a UCT EP.
 *
 * arg is the discard request (ucp_request_t*). The callback releases the
 * request, decrements worker->flush_ops_count, removes the UCT EP from the
 * worker's hash of discarded UCT EPs (fatal error if it is not found there)
 * and finally destroys the UCT EP. It always returns 1.
 *
 * It is registered by ucp_worker_discard_uct_ep_flush_comp() with
 * UCS_CALLBACKQ_FLAG_ONESHOT.
 */
| static unsigned ucp_worker_discard_uct_ep_destroy_progress(void *arg) | ||
| { | ||
| ucp_request_t *req = (ucp_request_t*)arg; | ||
| uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep; | ||
| ucp_worker_h worker = req->send.discard_uct_ep.ucp_worker; | ||
| khiter_t iter; | ||
|
|
||
| ucp_trace_req(req, "destroy uct_ep=%p", uct_ep); | ||
/* uct_ep and worker were copied out of the request above, so the request is
 * not accessed after it is released here */
| ucp_request_put(req); | ||
|
|
||
/* the counter and the hash are updated with the worker's async context blocked */
| UCS_ASYNC_BLOCK(&worker->async); | ||
| --worker->flush_ops_count; | ||
| iter = kh_get(ucp_worker_discard_uct_ep_hash, | ||
| &worker->discard_uct_ep_hash, uct_ep); | ||
| if (iter == kh_end(&worker->discard_uct_ep_hash)) { | ||
| ucs_fatal("no %p UCT EP in the %p worker hash of discarded UCT EPs", | ||
| uct_ep, worker); | ||
| } | ||
| kh_del(ucp_worker_discard_uct_ep_hash, | ||
| &worker->discard_uct_ep_hash, iter); | ||
| UCS_ASYNC_UNBLOCK(&worker->async); | ||
dmitrygx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| uct_ep_destroy(uct_ep); | ||
|
|
||
| return 1; | ||
| } | ||
|
|
||
/*
 * Flush completion callback of a UCT EP discard operation.
 *
 * self is the uct_completion_t embedded in the discard request
 * (send.state.uct_comp); the request is recovered from it with
 * ucs_container_of(). status is only used for tracing.
 *
 * The UCT EP is not destroyed here: the function registers
 * ucp_worker_discard_uct_ep_destroy_progress() as a oneshot progress callback
 * on the worker's UCT worker, passing the request as its argument.
 */
| static void | ||
| ucp_worker_discard_uct_ep_flush_comp(uct_completion_t *self, | ||
| ucs_status_t status) | ||
| { | ||
| uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL; | ||
| ucp_request_t *req = ucs_container_of(self, ucp_request_t, | ||
| send.state.uct_comp); | ||
| ucp_worker_h worker = req->send.discard_uct_ep.ucp_worker; | ||
|
|
||
| ucp_trace_req(req, "discard_uct_ep flush completion status %s", | ||
| ucs_status_string(status)); | ||
|
|
||
| /* don't destroy UCT EP from the flush completion callback, schedule | ||
dmitrygx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * a progress callback on the main thread to destroy UCT EP */ | ||
| uct_worker_progress_register_safe(worker->uct, | ||
| ucp_worker_discard_uct_ep_destroy_progress, | ||
| req, UCS_CALLBACKQ_FLAG_ONESHOT, &cb_id); | ||
| } | ||
|
|
||
/*
 * Issues the flush of a UCT EP that is being discarded.
 *
 * self is the uct_pending_req_t embedded in the discard request (send.uct).
 * The function is installed as req->send.uct.func by
 * ucp_worker_discard_tl_uct_ep() and is also called directly from
 * ucp_worker_discard_uct_ep_progress().
 *
 * Returns:
 *  - UCS_OK if uct_ep_flush() returned UCS_INPROGRESS (the request's
 *    completion was handed to uct_ep_flush());
 *  - UCS_ERR_NO_RESOURCE if uct_ep_flush() returned UCS_ERR_NO_RESOURCE;
 *  - UCS_OK for any other flush status, after invoking
 *    ucp_worker_discard_uct_ep_flush_comp() directly with that status.
 */
| static ucs_status_t | ||
| ucp_worker_discard_uct_ep_pending_cb(uct_pending_req_t *self) | ||
| { | ||
| ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); | ||
| uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep; | ||
| ucs_status_t status; | ||
|
|
||
| status = uct_ep_flush(uct_ep, req->send.discard_uct_ep.ep_flush_flags, | ||
| &req->send.state.uct_comp); | ||
| if (status == UCS_INPROGRESS) { | ||
| return UCS_OK; | ||
| } else if (status == UCS_ERR_NO_RESOURCE) { | ||
| return UCS_ERR_NO_RESOURCE; | ||
| } | ||
|
|
||
| /* UCS_OK is handled here as well: the flush did not remain in progress, | ||
| * so the completion callback is invoked by hand with the flush status */ | ||
| ucp_worker_discard_uct_ep_flush_comp(&req->send.state.uct_comp, | ||
| status); | ||
| return UCS_OK; | ||
| } | ||
|
|
||
/*
 * Progress callback that starts the flush of a UCT EP being discarded.
 *
 * arg is the discard request (ucp_request_t*). The flush is attempted through
 * ucp_worker_discard_uct_ep_pending_cb(). If that reports
 * UCS_ERR_NO_RESOURCE, the request is added to the UCT EP's pending queue;
 * if adding fails with UCS_ERR_BUSY, this function re-registers itself as a
 * oneshot progress callback to retry later.
 *
 * Returns 0 when the flush could not be started because of
 * UCS_ERR_NO_RESOURCE, and 1 otherwise.
 */
| static unsigned ucp_worker_discard_uct_ep_progress(void *arg) | ||
| { | ||
| uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL; | ||
| ucp_request_t *req = (ucp_request_t*)arg; | ||
| uct_ep_h uct_ep = req->send.discard_uct_ep.uct_ep; | ||
| ucp_worker_h worker = req->send.discard_uct_ep.ucp_worker; | ||
| ucs_status_t status; | ||
|
|
||
| status = ucp_worker_discard_uct_ep_pending_cb(&req->send.uct); | ||
| if (status == UCS_ERR_NO_RESOURCE) { | ||
| status = uct_ep_pending_add(uct_ep, &req->send.uct, 0); | ||
| ucs_assert((status == UCS_ERR_BUSY) || (status == UCS_OK)); | ||
| if (status == UCS_ERR_BUSY) { | ||
| /* adding to the pending queue failed, schedule the UCT EP discard | ||
| * operation on UCT worker progress again */ | ||
| uct_worker_progress_register_safe(worker->uct, | ||
| ucp_worker_discard_uct_ep_progress, | ||
| req, UCS_CALLBACKQ_FLAG_ONESHOT, | ||
| &cb_id); | ||
| } | ||
|
|
||
| return 0; | ||
| } | ||
|
|
||
| return 1; | ||
| } | ||
|
|
||
/*
 * Schedules flush and destroy of a transport (non-WIREUP) UCT EP.
 *
 * A request is allocated from the worker, worker->flush_ops_count is
 * incremented and the UCT EP is inserted into the worker's hash of discarded
 * UCT EPs (fatal error if the insertion fails or the EP is already present).
 * The request is then filled with the worker, the UCT EP and ep_flush_flags,
 * and ucp_worker_discard_uct_ep_progress() is registered as a oneshot
 * progress callback to start the flush.
 *
 * NOTE(review): if the request allocation fails, the function only logs an
 * error and returns; nothing is done with uct_ep on that path.
 */
| static void | ||
| ucp_worker_discard_tl_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep, | ||
| unsigned ep_flush_flags) | ||
| { | ||
| uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL; | ||
| ucp_request_t *req; | ||
| int ret; | ||
|
|
||
| req = ucp_request_get(worker); | ||
| if (ucs_unlikely(req == NULL)) { | ||
| ucs_error("unable to allocate request for discarding UCT EP %p " | ||
| "on UCP worker %p", uct_ep, worker); | ||
| return; | ||
| } | ||
|
|
||
/* decremented again in ucp_worker_discard_uct_ep_destroy_progress() */
| ++worker->flush_ops_count; | ||
yosefe marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| kh_put(ucp_worker_discard_uct_ep_hash, &worker->discard_uct_ep_hash, | ||
| uct_ep, &ret); | ||
| if (ret == UCS_KH_PUT_FAILED) { | ||
| ucs_fatal("failed to put %p UCT EP into the %p worker hash", | ||
| uct_ep, worker); | ||
| } else if (ret == UCS_KH_PUT_KEY_PRESENT) { | ||
| ucs_fatal("%p UCT EP is already present in the %p worker hash", | ||
| uct_ep, worker); | ||
| } | ||
|
|
||
| ucs_assert(!ucp_wireup_ep_test(uct_ep)); | ||
| req->send.uct.func = ucp_worker_discard_uct_ep_pending_cb; | ||
| req->send.state.uct_comp.func = ucp_worker_discard_uct_ep_flush_comp; | ||
| req->send.state.uct_comp.count = 1; | ||
| req->send.discard_uct_ep.ucp_worker = worker; | ||
| req->send.discard_uct_ep.uct_ep = uct_ep; | ||
| req->send.discard_uct_ep.ep_flush_flags = ep_flush_flags; | ||
| uct_worker_progress_register_safe(worker->uct, | ||
| ucp_worker_discard_uct_ep_progress, | ||
| req, UCS_CALLBACKQ_FLAG_ONESHOT, | ||
| &cb_id); | ||
| } | ||
|
|
||
/*
 * Dismantles a WIREUP EP that is being discarded and hands back the UCT EP it
 * wraps.
 *
 * If the WIREUP EP has an auxiliary EP, that EP's pending queue is purged
 * with an asserting callback, the auxiliary EP is scheduled for flush and
 * destroy via ucp_worker_discard_tl_uct_ep(), and the WIREUP EP disowns it.
 * The next EP is then extracted and the WIREUP EP itself is destroyed.
 *
 * Returns the extracted next UCT EP if the WIREUP EP was its owner, or NULL
 * otherwise.
 *
 * NOTE(review): purge_cb and purge_arg are not referenced in this function
 * body.
 */
| static uct_ep_h | ||
| ucp_worker_discard_wireup_ep(ucp_worker_h worker, | ||
| ucp_wireup_ep_t *wireup_ep, | ||
| unsigned ep_flush_flags, | ||
| uct_pending_purge_callback_t purge_cb, | ||
| void *purge_arg) | ||
| { | ||
| uct_ep_h uct_ep; | ||
| int is_owner; | ||
|
|
||
| ucs_assert(wireup_ep != NULL); | ||
|
|
||
| if (wireup_ep->aux_ep != NULL) { | ||
| /* make sure that there are no WIREUP MSGs anymore that are scheduled | ||
| * on AUX EP, i.e. the purge callback must not be invoked here */ | ||
| uct_ep_pending_purge(wireup_ep->aux_ep, | ||
| (uct_pending_purge_callback_t) | ||
| ucs_empty_function_do_assert, | ||
| NULL); | ||
|
|
||
| /* discard the WIREUP EP's auxiliary EP */ | ||
| ucp_worker_discard_tl_uct_ep(worker, wireup_ep->aux_ep, | ||
| ep_flush_flags); | ||
| ucp_wireup_ep_disown(&wireup_ep->super.super, wireup_ep->aux_ep); | ||
| } | ||
|
|
||
/* the ownership flag is read before the next EP is extracted and the WIREUP
 * EP is destroyed below */
| is_owner = wireup_ep->super.is_owner; | ||
| uct_ep = ucp_wireup_ep_extract_next_ep(&wireup_ep->super.super); | ||
|
|
||
| /* destroy WIREUP EP allocated for this UCT EP, since discard operation | ||
| * most likely won't have an access to UCP EP as it could be destroyed | ||
| * by the caller */ | ||
| uct_ep_destroy(&wireup_ep->super.super); | ||
|
|
||
| /* do nothing, if this wireup EP is not an owner for UCT EP */ | ||
| return is_owner ? uct_ep : NULL; | ||
| } | ||
|
|
||
| /* Discards a UCT EP on the given worker: the EP's pending queue is purged | ||
| * with purge_cb/purge_arg, and the EP is then scheduled for flush (using | ||
| * ep_flush_flags) and destroy via ucp_worker_discard_tl_uct_ep(). | ||
| * If uct_ep is a WIREUP EP, it is first unwrapped by | ||
| * ucp_worker_discard_wireup_ep(); when that returns NULL nothing more is | ||
| * scheduled. Both uct_ep and purge_cb are asserted to be non-NULL. | ||
| * | ||
| * must be called with async lock held */ | ||
| void ucp_worker_discard_uct_ep(ucp_worker_h worker, uct_ep_h uct_ep, | ||
| unsigned ep_flush_flags, | ||
| uct_pending_purge_callback_t purge_cb, | ||
| void *purge_arg) | ||
| { | ||
| ucs_assert(uct_ep != NULL); | ||
| ucs_assert(purge_cb != NULL); | ||
|
|
||
| uct_ep_pending_purge(uct_ep, purge_cb, purge_arg); | ||
|
|
||
| if (ucp_wireup_ep_test(uct_ep)) { | ||
| uct_ep = ucp_worker_discard_wireup_ep(worker, ucp_wireup_ep(uct_ep), | ||
| ep_flush_flags, | ||
| purge_cb, purge_arg); | ||
|
Comment on lines
+2579
to
+2581
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems weird recursion: ucp_worker_discard_uct_ep->ucp_worker_discard_wireup_ep->ucp_worker_discard_uct_ep maybe separate the 2nd part of this function to a helper:
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but what the problem with this recursion?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. imo would be better /wo recursion, since we know next_ep cannot be wireup_ep
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
| if (uct_ep == NULL) { | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| ucp_worker_discard_tl_uct_ep(worker, uct_ep, ep_flush_flags); | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.