Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions examples/analytical_apps/pagerank/pagerank_directed.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ class PageRankDirected

for (fid_t i = 2; i < frag.fnum(); ++i) {
fid_t src_fid = messages.UpdatePartialOuterVertices();
if (src_fid == kInvalidFid) {
continue;
}
ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetIncomingAdjList(u, src_fid);
Expand All @@ -123,22 +126,24 @@ class PageRankDirected
}

fid_t src_fid = messages.UpdatePartialOuterVertices();
ForEach(inner_vertices, [src_fid, &frag, &ctx, base, &dangling_sums](
int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetIncomingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
int en = frag.GetLocalOutDegree(u);
cur = ctx.delta * cur + base;
if (en == 0) {
dangling_sums[tid] += cur;
ctx.next_result[u] = cur;
} else {
ctx.next_result[u] = cur / en;
}
});
if (src_fid != kInvalidFid) {
ForEach(inner_vertices, [src_fid, &frag, &ctx, base, &dangling_sums](
int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetIncomingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
int en = frag.GetLocalOutDegree(u);
cur = ctx.delta * cur + base;
if (en == 0) {
dangling_sums[tid] += cur;
ctx.next_result[u] = cur;
} else {
ctx.next_result[u] = cur / en;
}
});
}
} else {
// If the fragment is sparse or there is only one fragment, one round of
// iterating inner vertices is prefered.
Expand Down
82 changes: 45 additions & 37 deletions examples/analytical_apps/pagerank/pagerank_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,16 @@ class PageRankLocal
ctx.preprocess_time += GetCurrentTime();
ctx.exec_time -= GetCurrentTime();
#endif
ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
ctx.next_result[u] = cur;
});
if (src_fid != kInvalidFid) {
ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
ctx.next_result[u] = cur;
});
}
#ifdef PROFILING
ctx.exec_time += GetCurrentTime();
#endif
Expand All @@ -133,27 +135,31 @@ class PageRankLocal
ctx.preprocess_time += GetCurrentTime();
ctx.exec_time -= GetCurrentTime();
#endif
if (last_step) {
ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
ctx.next_result[u] = 1 - ctx.delta + ctx.delta * cur;
});
} else {
ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
ctx.next_result[u] = 1 - ctx.delta + ctx.delta * cur;
int en = frag.GetLocalOutDegree(u);
ctx.next_result[u] =
en > 0 ? ctx.next_result[u] / en : ctx.next_result[u];
});
if (src_fid != kInvalidFid) {
if (last_step) {
ForEach(inner_vertices,
[src_fid, &frag, &ctx](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
ctx.next_result[u] = 1 - ctx.delta + ctx.delta * cur;
});
} else {
ForEach(inner_vertices,
[src_fid, &frag, &ctx](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
ctx.next_result[u] = 1 - ctx.delta + ctx.delta * cur;
int en = frag.GetLocalOutDegree(u);
ctx.next_result[u] =
en > 0 ? ctx.next_result[u] / en : ctx.next_result[u];
});
}
}
#ifdef PROFILING
ctx.exec_time += GetCurrentTime();
Expand Down Expand Up @@ -183,14 +189,16 @@ class PageRankLocal
ctx.preprocess_time += GetCurrentTime();
ctx.exec_time -= GetCurrentTime();
#endif
ForEach(frag.OuterVertices(src_fid),
[&frag, &ctx](int tid, vertex_t u) {
double cur = ctx.result[u] * ctx.delta;
auto es = frag.GetIncomingAdjList(u);
for (auto& e : es) {
atomic_add(ctx.next_result[e.get_neighbor()], cur);
}
});
if (src_fid != kInvalidFid) {
ForEach(frag.OuterVertices(src_fid),
[&frag, &ctx](int tid, vertex_t u) {
double cur = ctx.result[u] * ctx.delta;
auto es = frag.GetIncomingAdjList(u);
for (auto& e : es) {
atomic_add(ctx.next_result[e.get_neighbor()], cur);
}
});
}
#ifdef PROFILING
ctx.exec_time += GetCurrentTime();
#endif
Expand Down
70 changes: 37 additions & 33 deletions examples/analytical_apps/pagerank/pagerank_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,16 @@ class PageRankOpt : public BatchShuffleAppBase<FRAG_T, PageRankContext<FRAG_T>>,
ctx.preprocess_time += GetCurrentTime();
ctx.exec_time -= GetCurrentTime();
#endif
ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
ctx.next_result[u] = cur;
});
if (src_fid != kInvalidFid) {
ForEach(inner_vertices, [src_fid, &frag, &ctx](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
ctx.next_result[u] = cur;
});
}
#ifdef PROFILING
ctx.exec_time += GetCurrentTime();
#endif
Expand All @@ -155,31 +157,33 @@ class PageRankOpt : public BatchShuffleAppBase<FRAG_T, PageRankContext<FRAG_T>>,
ctx.preprocess_time += GetCurrentTime();
ctx.exec_time -= GetCurrentTime();
#endif
if (ctx.step != ctx.max_round) {
ForEach(inner_vertices,
[src_fid, &frag, &ctx, base](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
int en = frag.GetLocalOutDegree(u);
ctx.result[u] = en > 0 ? (ctx.delta * cur + base) / en : base;
});

messages.SyncInnerVertices<fragment_t, double>(frag, ctx.result,
thread_num());
} else {
ForEach(inner_vertices,
[src_fid, &frag, &ctx, base](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
int en = frag.GetLocalOutDegree(u);
ctx.result[u] = en > 0 ? (ctx.delta * cur + base) : base;
});
if (src_fid != kInvalidFid) {
if (ctx.step != ctx.max_round) {
ForEach(inner_vertices, [src_fid, &frag, &ctx, base](int tid,
vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
int en = frag.GetLocalOutDegree(u);
ctx.result[u] = en > 0 ? (ctx.delta * cur + base) / en : base;
});

messages.SyncInnerVertices<fragment_t, double>(frag, ctx.result,
thread_num());
} else {
ForEach(inner_vertices,
[src_fid, &frag, &ctx, base](int tid, vertex_t u) {
double cur = ctx.next_result[u];
auto es = frag.GetOutgoingAdjList(u, src_fid);
for (auto& e : es) {
cur += ctx.result[e.get_neighbor()];
}
int en = frag.GetLocalOutDegree(u);
ctx.result[u] = en > 0 ? (ctx.delta * cur + base) : base;
});
}
}
#ifdef PROFILING
ctx.exec_time += GetCurrentTime();
Expand Down
1 change: 1 addition & 0 deletions grape/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace gflags = google;

// type alias
using fid_t = unsigned;
const fid_t kInvalidFid = std::numeric_limits<fid_t>::max();

#ifdef USE_HUGEPAGES
template <typename T>
Expand Down
22 changes: 21 additions & 1 deletion grape/parallel/batch_shuffle_message_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,9 @@ class BatchShuffleMessageManager : public MessageManagerBase {
* is, messages from all other fragments are received.
*/
void UpdateOuterVertices() {
MPI_Waitall(recv_reqs_.size(), &recv_reqs_[0], MPI_STATUSES_IGNORE);
if (!recv_reqs_.empty()) {
MPI_Waitall(recv_reqs_.size(), &recv_reqs_[0], MPI_STATUSES_IGNORE);
}
}

/**
Expand All @@ -272,6 +274,24 @@ class BatchShuffleMessageManager : public MessageManagerBase {
* @return Source fragment id.
*/
fid_t UpdatePartialOuterVertices() {
if (recv_reqs_.empty()) {
return kInvalidFid;
}

bool updateDone = true;
for (fid_t i = 0; i < fnum_; ++i) {
if (remaining_reqs_[i] > 0) {
updateDone = false;
break;
}
}
if (updateDone) {
remaining_frags_ = 0;
recv_reqs_.clear();
recv_from_.clear();
return kInvalidFid;
}

int index;
fid_t ret;
while (true) {
Expand Down
Loading