UCP/CORE: Implement flush+destroy for UCT EPs on UCP Worker #5608
yosefe merged 34 commits into openucx:master from
/azp run
Azure Pipelines successfully started running 1 pipeline(s).
1f909f2 to 6c0082f
@yosefe could you review pls?
yosefe left a comment
is it possible to add gtest?
added the tests
7f94236 to f7b0e3c
bot:pipe:retest
@evgeny-leksikov could you review pls?
bot:pipe:retest
src/ucp/rma/flush.c
Outdated
| /* Error returned from uct iface flush */ | ||
| ucp_worker_flush_complete_one(req, status, 1); | ||
| } else if (worker->context->config.ext.flush_worker_eps) { | ||
| if (ucp_worker_flush_ops_count_check(worker)) { |
why not leave this check inside ucp_worker_flush_check? moving it out seems unnecessary
also, checking !worker->flush_ops_count looks clearer than the ucp_worker_flush_ops_count_check name
src/ucp/wireup/wireup_ep.c
Outdated
| ucp_wireup_ep_t *wireup_ep; /* WIREUP EP that owns the UCT | ||
| * pending request for the WIREUP | ||
| * MSG */ | ||
| uct_pending_purge_callback_t wireup_msg_cb; /* UCT pending purge callback that |
src/ucp/wireup/wireup_ep.c
Outdated
| ucp_wireup_ep_pending_purge_arg_t *purge_arg = | ||
| (ucp_wireup_ep_pending_purge_arg_t*)arg; | ||
| ucp_wireup_ep_t *wireup_ep = purge_arg->wireup_ep; | ||
| uct_ep_h uct_ep = |
minor: imo, a single line without alignment by = would be more readable in this case
src/ucp/wireup/wireup_ep.c
Outdated
| /* do purging on AUX EP or on UCT EP if WIREUP is an owner of it */ | ||
| if ((uct_ep == wireup_ep->aux_ep) || wireup_ep->super.is_owner) { | ||
| /* need to NULL the WIREUP EP in the WIREUP MSG proxy pending request | ||
| * to avoid dereferencing it when progressing pending requests, since |
test/gtest/ucp/test_ucp_worker.cc
Outdated
| pending_reqs[base + i] = req; | ||
|
|
||
| if (func == ucp_wireup_msg_progress) { | ||
|
|
|
bot:pipe:retest
@evgeny-leksikov @brminich could you review pls?
test/gtest/ucp/test_ucp_worker.cc
Outdated
| m_destroyed_ep_count++; | ||
| } | ||
|
|
||
| static ep_test_info_t* ep_test_info_get(uct_ep_h ep) { |
the return type can be a reference: ep_test_info_t &
test/gtest/ucp/test_ucp_worker.cc
Outdated
| ep_test_info_map_t::iterator it = m_ep_test_info_map.find(ep); | ||
|
|
||
| if (it == m_ep_test_info_map.end()) { | ||
| ep_test_info_t test_info = {}; |
since it's a C++ struct, better to implement a constructor
test/gtest/ucp/test_ucp_worker.cc
Outdated
|
|
||
| std::vector<uct_pending_req_t*> *req_vec = &it->second; | ||
| for (unsigned i = 0; i < m_pending_purge_reqs_count; i++) { | ||
| std::vector<uct_pending_req_t*> *req_vec = &test_info->pending_reqs; |
std::vector<uct_pending_req_t*> &req_vec = ...
src/ucp/core/ucp_worker.c
Outdated
| /* need to remove from the pending queue */ | ||
| status = UCS_OK; | ||
| } else { | ||
| ucs_assert(status != UCS_ERR_BUSY); |
this assert looks wrong since status value is returned by flush here
this is not wrong, it just makes sure that we don't enter an endless loop if uct_ep_flush() returns UCS_ERR_BUSY
added the comment
but uct_ep_flush doc does not say that it cannot return UCS_ERR_BUSY
> but uct_ep_flush doc does not say that it cannot return UCS_ERR_BUSY
yes, will handle it
@yosefe what do you think about using progress_register() instead of this loop?
the loop seems error-prone or overcomplicated if it has several return/break/continue statements
src/ucp/core/ucp_worker.c
Outdated
| } else if (status == UCS_INPROGRESS) { | ||
| /* need to remove from the pending queue */ | ||
| status = UCS_OK; | ||
| } else { |
UCS_OK is not handled?
it is handled in the else section
src/ucp/core/ucp_worker.c
Outdated
| } | ||
|
|
||
| ++worker->flush_ops_count; | ||
| iter = kh_put(ucp_worker_discard_uct_ep_hash, |
minor: maybe just kh_put(ucp_worker_discard_uct_ep_hash,...? Then there is no need for the unused iter var
| ucp_worker_iface_t *wiface; | ||
| ucs_status_t status; | ||
|
|
||
| if (worker->flush_ops_count) { |
why is it removed here and moved outside?
we want to check for flush_ops count separately from uct_iface_flush() that we check in this function
why, seems can just leave it returning UCS_INPROGRESS?
> why, seems can just leave it returning UCS_INPROGRESS?
added a comment to make it clear
if it returns UCS_INPROGRESS but the iteration over all EPs is already done, we would complete with UCS_OK, which we shouldn't
src/ucp/wireup/wireup_ep.c
Outdated
| { | ||
| ucp_request_t *proxy_req = ucs_container_of(self, ucp_request_t, send.uct); | ||
| uct_pending_req_t *req = proxy_req->send.proxy.req; | ||
| ucp_request_t *proxy_req = ucs_container_of(self, ucp_request_t, send.uct); |
single unrelated change in the file
src/ucp/core/ucp_worker.c
Outdated
| /* need to remove from the pending queue */ | ||
| status = UCS_OK; | ||
| } else { | ||
| /* make sure that uct_ep_flush() doesn't return UCS_ERR_BUSY to not |
bot:pipe:retest
test/gtest/ucp/test_ucp_worker.cc
Outdated
| unsigned pending_add_count; | ||
|
|
||
| ep_test_info_t() { | ||
| pending_reqs.clear(); |
not needed, std::vector has its own constructor.
test/gtest/ucp/test_ucp_worker.cc
Outdated
|
|
||
| ep_test_info_t() { | ||
| pending_reqs.clear(); | ||
| flush_count = 0; |
minor: better to use initializer list
ep_test_info_t() : flush_count(0), pending_add_count(0) {
}
test/gtest/ucp/test_ucp_worker.cc
Outdated
| ep_test_info_t &test_info = ep_test_info_get(ep); | ||
| test_info.flush_count++; | ||
| return test_info.flush_count; |
minor: can be shorter
return ++ep_test_info_get(ep).flush_count;
| ucp_request_t *proxy_req = ucs_container_of(self, ucp_request_t, send.uct); | ||
| uct_pending_req_t *req = proxy_req->send.proxy.req; |
it is not my code, I returned it back in the same state as it was
@evgeny-leksikov is it ok now?
@yosefe could you review pls?
@yosefe could you review pls?
src/ucp/core/ucp_worker.c
Outdated
| ucp_worker_discard_uct_ep_flush_comp(&req->send.state.uct_comp, | ||
| status); | ||
| } | ||
| return status; |
if status != NO_RESOURCE should also return UCS_OK
if flush returned ok, who will call the completion?
> if status != NO_RESOURCE should also return UCS_OK
why? we should add it to the pending queue and wait for the callback invocation
> if flush returned ok, who will call the completion?
it will be done in ucp_worker_discard_uct_ep_flush_comp()
- if the status is error which is not NO_RESOURCE, the flush failed, we cannot retry
- ok
> if the status is error which is not NO_RESOURCE, the flush failed, we cannot retry
ah, I see. fixed
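A minimal sketch of the return-value rule this thread converges on, assuming a pending callback whose "OK" return removes the request from the pending queue and any other return keeps it queued. All `pend_*` names are hypothetical stand-ins and not UCX code; `pend_flush_comp()` only plays the role that `ucp_worker_discard_uct_ep_flush_comp()` has in the diff.

```c
#include <assert.h>

/* Hypothetical stand-ins for ucs_status_t values; not the real UCX enum. */
typedef enum {
    PEND_OK,
    PEND_INPROGRESS,
    PEND_ERR_NO_RESOURCE,
    PEND_ERR_IO
} pend_status_t;

/* Hypothetical request: records whether the flush completion has run. */
typedef struct {
    int           completed;
    pend_status_t comp_status;
} pend_req_t;

/* Stand-in for the flush completion, which would destroy the UCT EP. */
static void pend_flush_comp(pend_req_t *req, pend_status_t status)
{
    req->completed   = 1;
    req->comp_status = status;
}

/*
 * Map the flush status to the pending callback's return value:
 * PEND_OK removes the request from the pending queue, anything else
 * keeps it queued for another attempt.
 */
static pend_status_t pend_progress(pend_req_t *req, pend_status_t flush_status)
{
    if (flush_status == PEND_ERR_NO_RESOURCE) {
        return PEND_ERR_NO_RESOURCE; /* stay queued, retry later */
    }

    if (flush_status != PEND_INPROGRESS) {
        /* PEND_OK or a hard error: no callback will follow, complete now */
        pend_flush_comp(req, flush_status);
    }

    /* PEND_INPROGRESS: the transport invokes the completion later */
    return PEND_OK;
}
```

Only the "no resource" case leaves the request in the queue; a hard error completes the request with that error instead of retrying, which is the fix agreed on above.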
| /* Error returned from uct iface flush */ | ||
| ucp_worker_flush_complete_one(req, status, 1); | ||
| } else if (worker->context->config.ext.flush_worker_eps) { | ||
| if (worker->flush_ops_count == 0) { |
BTW, do we require iface_flush completion today before destroying UCP worker (AFAIK not)?
since it can leak some endpoints which are still being flushed
> BTW, do we require iface_flush completion today before destroying UCP worker (AFAIK not)?
we don't require it, but now we require all flush ops to be done and either iface_flush to be completed or the iteration over all EPs to be done
need to check for leaks in worker destroy
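The completion condition described in this thread can be sketched as a single predicate. This is an illustration under assumptions: only `flush_ops_count` comes from the diff, while the struct, the other two fields and the function name are hypothetical.

```c
#include <assert.h>

/* Hypothetical snapshot of the worker flush state. */
typedef struct {
    unsigned flush_ops_count;  /* discard operations still in flight */
    int      ifaces_flushed;   /* 1 once every interface flush returned OK */
    int      all_eps_iterated; /* 1 once the walk over all EPs finished */
} wflush_state_t;

/*
 * The worker flush may complete only when no discard is outstanding AND
 * either the interface flush completed or all EPs were visited.
 */
static int wflush_is_complete(const wflush_state_t *s)
{
    if (s->flush_ops_count != 0) {
        return 0; /* discarded EPs are still being flushed */
    }

    return s->ifaces_flushed || s->all_eps_iterated;
}
```

Keeping the counter check separate from the interface flush result is what prevents the "all EPs visited, so complete with OK" path from firing while discards are still pending.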
| uct_ep = ucp_worker_discard_wireup_ep(worker, ucp_wireup_ep(uct_ep), | ||
| ep_flush_flags, | ||
| purge_cb, purge_arg); |
seems weird recursion: ucp_worker_discard_uct_ep->ucp_worker_discard_wireup_ep->ucp_worker_discard_uct_ep
maybe separate the 2nd part of this function to a helper:
- discard_uct_ep()
    if wireup:
        discard_wireup_ep()
        if next_ep_owner:
            discard_uct_ep_helper(next_ep) [works only on regular uct_ep, not wireup]
    else:
        discard_uct_ep_helper(uct_ep)
but what is the problem with this recursion?
ucp_worker_discard_uct_ep->ucp_worker_discard_wireup_ep for WIREUP EP
and ->ucp_worker_discard_uct_ep for AUX EP
imo would be better w/o recursion, since we know next_ep cannot be wireup_ep
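A rough sketch of the non-recursive layout proposed above, assuming a wireup EP wraps an optional auxiliary EP and a next EP that is never itself a wireup EP. The `wire_*` types and functions are hypothetical mocks; the real UCX structures and discard logic differ.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical mock endpoint; not the real uct_ep / ucp_wireup_ep_t. */
typedef struct wire_ep {
    int             is_wireup; /* 1 if this is a wireup proxy EP */
    int             is_owner;  /* wireup EP owns next_ep */
    struct wire_ep *aux_ep;    /* auxiliary EP, may be NULL */
    struct wire_ep *next_ep;   /* real transport EP, never a wireup EP */
    int             discarded;
} wire_ep_t;

/* Helper that works only on regular EPs, so it never needs to recurse. */
static void wire_discard_helper(wire_ep_t *ep)
{
    assert(!ep->is_wireup);
    ep->discarded = 1; /* stands in for flush + destroy */
}

/* Entry point: unwraps a wireup EP once, then calls the flat helper. */
static void wire_discard(wire_ep_t *ep)
{
    if (!ep->is_wireup) {
        wire_discard_helper(ep);
        return;
    }

    if (ep->aux_ep != NULL) {
        wire_discard_helper(ep->aux_ep);
    }

    if (ep->is_owner && (ep->next_ep != NULL)) {
        wire_discard_helper(ep->next_ep);
    }

    ep->discarded = 1; /* the wireup wrapper itself is released */
}
```

Because the helper asserts that it never sees a wireup EP, the call depth is fixed at two and the invariant from the review ("next_ep cannot be wireup_ep") is checked rather than assumed.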
@yosefe could you review pls?
Ported to yosefe#223
What
Implement flush+destroy for UCT EPs on UCP Worker.
Why ?
To be able to do flush(CANCEL/LOCAL) and then destroy the UCT EP when the UCP EP could be destroyed. So, the UCT EP will be destroyed on the UCP Worker.
It fixes possibly unfinished outstanding operations on some transports (e.g. there was an issue on RC).
How ?
Implement ucp_worker_discard_uct_ep() that's called instead of uct_ep_destroy()
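
A minimal sketch of the flush-then-destroy idea, assuming a mock endpoint whose flush needs a number of progress calls to finish. Only the `flush_ops_count` name is taken from the diff; every `flow_*` name is hypothetical and the real `ucp_worker_discard_uct_ep()` is more involved.

```c
#include <assert.h>

/* Hypothetical mock of a UCT EP. */
typedef struct {
    unsigned ops_left;  /* progress calls needed before the flush completes */
    int      destroyed;
} flow_ep_t;

/* Hypothetical mock of the UCP worker. */
typedef struct {
    unsigned flush_ops_count; /* discards that have not destroyed their EP */
} flow_worker_t;

/* Flush completion: only here is the endpoint actually destroyed. */
static void flow_flush_comp(flow_worker_t *worker, flow_ep_t *ep)
{
    ep->destroyed = 1;
    --worker->flush_ops_count;
}

/* Called instead of destroying the endpoint directly. */
static void flow_discard_ep(flow_worker_t *worker, flow_ep_t *ep)
{
    ++worker->flush_ops_count;
    if (ep->ops_left == 0) {
        flow_flush_comp(worker, ep); /* flush completed immediately */
    }
}

/*
 * One progress step for a previously discarded EP.
 * Returns 1 once the EP has been destroyed, 0 while the flush is pending.
 */
static int flow_progress(flow_worker_t *worker, flow_ep_t *ep)
{
    if (ep->destroyed) {
        return 1;
    }

    if ((ep->ops_left > 0) && (--ep->ops_left > 0)) {
        return 0;
    }

    flow_flush_comp(worker, ep);
    return 1;
}
```

The point of the counter is that the worker always knows how many endpoints are still waiting for their flush, so outstanding operations are not silently dropped by an early destroy.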