diff --git a/examples/analytical_apps/pagerank/pagerank_directed.h b/examples/analytical_apps/pagerank/pagerank_directed.h index 3abc2a78..9815af8b 100644 --- a/examples/analytical_apps/pagerank/pagerank_directed.h +++ b/examples/analytical_apps/pagerank/pagerank_directed.h @@ -112,6 +112,9 @@ class PageRankDirected for (fid_t i = 2; i < frag.fnum(); ++i) { fid_t src_fid = messages.UpdatePartialOuterVertices(); + if (src_fid == kInvalidFid) { + continue; + } ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) { double cur = ctx.next_result[u]; auto es = frag.GetIncomingAdjList(u, src_fid); @@ -123,22 +126,24 @@ class PageRankDirected } fid_t src_fid = messages.UpdatePartialOuterVertices(); - ForEach(inner_vertices, [src_fid, &frag, &ctx, base, &dangling_sums]( - int tid, vertex_t u) { - double cur = ctx.next_result[u]; - auto es = frag.GetIncomingAdjList(u, src_fid); - for (auto& e : es) { - cur += ctx.result[e.get_neighbor()]; - } - int en = frag.GetLocalOutDegree(u); - cur = ctx.delta * cur + base; - if (en == 0) { - dangling_sums[tid] += cur; - ctx.next_result[u] = cur; - } else { - ctx.next_result[u] = cur / en; - } - }); + if (src_fid != kInvalidFid) { + ForEach(inner_vertices, [src_fid, &frag, &ctx, base, &dangling_sums]( + int tid, vertex_t u) { + double cur = ctx.next_result[u]; + auto es = frag.GetIncomingAdjList(u, src_fid); + for (auto& e : es) { + cur += ctx.result[e.get_neighbor()]; + } + int en = frag.GetLocalOutDegree(u); + cur = ctx.delta * cur + base; + if (en == 0) { + dangling_sums[tid] += cur; + ctx.next_result[u] = cur; + } else { + ctx.next_result[u] = cur / en; + } + }); + } } else { // If the fragment is sparse or there is only one fragment, one round of // iterating inner vertices is prefered. diff --git a/examples/analytical_apps/pagerank/pagerank_local.h b/examples/analytical_apps/pagerank/pagerank_local.h index 3838a4a4..29675a59 100644 --- a/examples/analytical_apps/pagerank/pagerank_local.h +++ b/examples/analytical_apps/pagerank/pagerank_local.h @@ -112,14 +112,16 @@ class PageRankLocal ctx.preprocess_time += GetCurrentTime(); ctx.exec_time -= GetCurrentTime(); #endif - ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) { - double cur = ctx.next_result[u]; - auto es = frag.GetOutgoingAdjList(u, src_fid); - for (auto& e : es) { - cur += ctx.result[e.get_neighbor()]; - } - ctx.next_result[u] = cur; - }); + if (src_fid != kInvalidFid) { + ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) { + double cur = ctx.next_result[u]; + auto es = frag.GetOutgoingAdjList(u, src_fid); + for (auto& e : es) { + cur += ctx.result[e.get_neighbor()]; + } + ctx.next_result[u] = cur; + }); + } #ifdef PROFILING ctx.exec_time += GetCurrentTime(); #endif @@ -133,27 +135,31 @@ class PageRankLocal ctx.preprocess_time += GetCurrentTime(); ctx.exec_time -= GetCurrentTime(); #endif - if (last_step) { - ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) { - double cur = ctx.next_result[u]; - auto es = frag.GetOutgoingAdjList(u, src_fid); - for (auto& e : es) { - cur += ctx.result[e.get_neighbor()]; - } - ctx.next_result[u] = 1 - ctx.delta + ctx.delta * cur; - }); - } else { - ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) { - double cur = ctx.next_result[u]; - auto es = frag.GetOutgoingAdjList(u, src_fid); - for (auto& e : es) { - cur += ctx.result[e.get_neighbor()]; - } - ctx.next_result[u] = 1 - ctx.delta + ctx.delta * cur; - int en = frag.GetLocalOutDegree(u); - ctx.next_result[u] = - en > 0 ? ctx.next_result[u] / en : ctx.next_result[u]; - }); + if (src_fid != kInvalidFid) { + if (last_step) { + ForEach(inner_vertices, + [src_fid, &frag, &ctx](int tid, vertex_t u) { + double cur = ctx.next_result[u]; + auto es = frag.GetOutgoingAdjList(u, src_fid); + for (auto& e : es) { + cur += ctx.result[e.get_neighbor()]; + } + ctx.next_result[u] = 1 - ctx.delta + ctx.delta * cur; + }); + } else { + ForEach(inner_vertices, + [src_fid, &frag, &ctx](int tid, vertex_t u) { + double cur = ctx.next_result[u]; + auto es = frag.GetOutgoingAdjList(u, src_fid); + for (auto& e : es) { + cur += ctx.result[e.get_neighbor()]; + } + ctx.next_result[u] = 1 - ctx.delta + ctx.delta * cur; + int en = frag.GetLocalOutDegree(u); + ctx.next_result[u] = + en > 0 ? ctx.next_result[u] / en : ctx.next_result[u]; + }); + } } #ifdef PROFILING ctx.exec_time += GetCurrentTime(); @@ -183,14 +189,16 @@ class PageRankLocal ctx.preprocess_time += GetCurrentTime(); ctx.exec_time -= GetCurrentTime(); #endif - ForEach(frag.OuterVertices(src_fid), - [&frag, &ctx](int tid, vertex_t u) { - double cur = ctx.result[u] * ctx.delta; - auto es = frag.GetIncomingAdjList(u); - for (auto& e : es) { - atomic_add(ctx.next_result[e.get_neighbor()], cur); - } - }); + if (src_fid != kInvalidFid) { + ForEach(frag.OuterVertices(src_fid), + [&frag, &ctx](int tid, vertex_t u) { + double cur = ctx.result[u] * ctx.delta; + auto es = frag.GetIncomingAdjList(u); + for (auto& e : es) { + atomic_add(ctx.next_result[e.get_neighbor()], cur); + } + }); + } #ifdef PROFILING ctx.exec_time += GetCurrentTime(); #endif diff --git a/examples/analytical_apps/pagerank/pagerank_opt.h b/examples/analytical_apps/pagerank/pagerank_opt.h index 2857c21a..9920adeb 100644 --- a/examples/analytical_apps/pagerank/pagerank_opt.h +++ b/examples/analytical_apps/pagerank/pagerank_opt.h @@ -134,14 +134,16 @@ class PageRankOpt : public BatchShuffleAppBase>, ctx.preprocess_time += GetCurrentTime(); ctx.exec_time -= GetCurrentTime(); #endif - ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) { - double cur = ctx.next_result[u]; - auto es = frag.GetOutgoingAdjList(u, src_fid); - for (auto& e : es) { - cur += ctx.result[e.get_neighbor()]; - } - ctx.next_result[u] = cur; - }); + if (src_fid != kInvalidFid) { + ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) { + double cur = ctx.next_result[u]; + auto es = frag.GetOutgoingAdjList(u, src_fid); + for (auto& e : es) { + cur += ctx.result[e.get_neighbor()]; + } + ctx.next_result[u] = cur; + }); + } #ifdef PROFILING ctx.exec_time += GetCurrentTime(); #endif @@ -155,31 +157,33 @@ class PageRankOpt : public BatchShuffleAppBase>, ctx.preprocess_time += GetCurrentTime(); ctx.exec_time -= GetCurrentTime(); #endif - if (ctx.step != ctx.max_round) { - ForEach(inner_vertices, - [src_fid, &frag, &ctx, base](int tid, vertex_t u) { - double cur = ctx.next_result[u]; - auto es = frag.GetOutgoingAdjList(u, src_fid); - for (auto& e : es) { - cur += ctx.result[e.get_neighbor()]; - } - int en = frag.GetLocalOutDegree(u); - ctx.result[u] = en > 0 ? (ctx.delta * cur + base) / en : base; - }); - - messages.SyncInnerVertices(frag, ctx.result, - thread_num()); - } else { - ForEach(inner_vertices, - [src_fid, &frag, &ctx, base](int tid, vertex_t u) { - double cur = ctx.next_result[u]; - auto es = frag.GetOutgoingAdjList(u, src_fid); - for (auto& e : es) { - cur += ctx.result[e.get_neighbor()]; - } - int en = frag.GetLocalOutDegree(u); - ctx.result[u] = en > 0 ? (ctx.delta * cur + base) : base; - }); + if (src_fid != kInvalidFid) { + if (ctx.step != ctx.max_round) { + ForEach(inner_vertices, [src_fid, &frag, &ctx, base](int tid, + vertex_t u) { + double cur = ctx.next_result[u]; + auto es = frag.GetOutgoingAdjList(u, src_fid); + for (auto& e : es) { + cur += ctx.result[e.get_neighbor()]; + } + int en = frag.GetLocalOutDegree(u); + ctx.result[u] = en > 0 ? (ctx.delta * cur + base) / en : base; + }); + + messages.SyncInnerVertices(frag, ctx.result, + thread_num()); + } else { + ForEach(inner_vertices, + [src_fid, &frag, &ctx, base](int tid, vertex_t u) { + double cur = ctx.next_result[u]; + auto es = frag.GetOutgoingAdjList(u, src_fid); + for (auto& e : es) { + cur += ctx.result[e.get_neighbor()]; + } + int en = frag.GetLocalOutDegree(u); + ctx.result[u] = en > 0 ? (ctx.delta * cur + base) : base; + }); + } } #ifdef PROFILING ctx.exec_time += GetCurrentTime(); diff --git a/grape/config.h b/grape/config.h index aa33dbcb..7f60cfe8 100644 --- a/grape/config.h +++ b/grape/config.h @@ -38,6 +38,7 @@ namespace gflags = google; // type alias using fid_t = unsigned; +const fid_t kInvalidFid = std::numeric_limits::max(); #ifdef USE_HUGEPAGES template diff --git a/grape/parallel/batch_shuffle_message_manager.h b/grape/parallel/batch_shuffle_message_manager.h index b4eedbaf..3bfafec6 100644 --- a/grape/parallel/batch_shuffle_message_manager.h +++ b/grape/parallel/batch_shuffle_message_manager.h @@ -262,7 +262,9 @@ class BatchShuffleMessageManager : public MessageManagerBase { * is, messages from all other fragments are received. */ void UpdateOuterVertices() { - MPI_Waitall(recv_reqs_.size(), &recv_reqs_[0], MPI_STATUSES_IGNORE); + if (!recv_reqs_.empty()) { + MPI_Waitall(recv_reqs_.size(), &recv_reqs_[0], MPI_STATUSES_IGNORE); + } } /** @@ -272,6 +274,24 @@ class BatchShuffleMessageManager : public MessageManagerBase { * @return Source fragment id. */ fid_t UpdatePartialOuterVertices() { + if (recv_reqs_.empty()) { + return kInvalidFid; + } + + bool updateDone = true; + for (fid_t i = 0; i < fnum_; ++i) { + if (remaining_reqs_[i] > 0) { + updateDone = false; + break; + } + } + if (updateDone) { + remaining_frags_ = 0; + recv_reqs_.clear(); + recv_from_.clear(); + return kInvalidFid; + } + int index; fid_t ret; while (true) {