Skip to content

Commit

Permalink
add fix for batch size anomalies
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHastings committed Oct 31, 2024
1 parent 969ec7d commit 1e5b5ce
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 165 deletions.
339 changes: 174 additions & 165 deletions cpp/src/link_prediction/similarity_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -368,187 +368,196 @@ all_pairs_similarity(raft::handle_t const& handle,
sum_two_hop_degrees,
MAX_PAIRS_PER_BATCH);

for (size_t batch_number = 0; batch_number < (batch_offsets.size() - 1); ++batch_number) {
if (batch_offsets[batch_number + 1] > batch_offsets[batch_number]) {
auto [offsets, v2] =
k_hop_nbrs(handle,
graph_view,
raft::device_span<vertex_t const>{
tmp_vertices.data() + batch_offsets[batch_number],
batch_offsets[batch_number + 1] - batch_offsets[batch_number]},
2,
do_expensive_check);

auto v1 = cugraph::detail::expand_sparse_offsets(
raft::device_span<size_t const>{offsets.data(), offsets.size()},
vertex_t{0},
handle.get_stream());
// FIXME: compute_offset_aligned_element_chunks can return duplicates. Should it? Should
// explore
// whether this functionality should be pushed into that function
batch_offsets.resize(std::distance(batch_offsets.begin(),
std::unique(batch_offsets.begin(), batch_offsets.end())));

cugraph::unrenumber_local_int_vertices(
handle,
v1.data(),
v1.size(),
size_t num_batches = batch_offsets.size() - 1;
if constexpr (multi_gpu) {
num_batches = cugraph::host_scalar_allreduce(
handle.get_comms(), num_batches, raft::comms::op_t::MAX, handle.get_stream());
}

for (size_t batch_number = 0; batch_number < num_batches; ++batch_number) {
raft::device_span<vertex_t const> batch_seeds{tmp_vertices.data(), size_t{0}};

if (((batch_number + 1) < batch_offsets.size()) &&
(batch_offsets[batch_number + 1] > batch_offsets[batch_number])) {
batch_seeds = raft::device_span<vertex_t const>{
tmp_vertices.data() + batch_offsets[batch_number],
vertex_t{0},
static_cast<vertex_t>(batch_offsets[batch_number + 1] - batch_offsets[batch_number]),
do_expensive_check);
batch_offsets[batch_number + 1] - batch_offsets[batch_number]};
}

auto [offsets, v2] = k_hop_nbrs(handle, graph_view, batch_seeds, 2, do_expensive_check);

auto new_size = thrust::distance(
auto v1 = cugraph::detail::expand_sparse_offsets(
raft::device_span<size_t const>{offsets.data(), offsets.size()},
vertex_t{0},
handle.get_stream());

cugraph::unrenumber_local_int_vertices(
handle,
v1.data(),
v1.size(),
tmp_vertices.data() + batch_offsets[batch_number],
vertex_t{0},
static_cast<vertex_t>(batch_offsets[batch_number + 1] - batch_offsets[batch_number]),
do_expensive_check);

auto new_size = thrust::distance(
thrust::make_zip_iterator(v1.begin(), v2.begin()),
thrust::remove_if(
handle.get_thrust_policy(),
thrust::make_zip_iterator(v1.begin(), v2.begin()),
thrust::remove_if(
handle.get_thrust_policy(),
thrust::make_zip_iterator(v1.begin(), v2.begin()),
thrust::make_zip_iterator(v1.end(), v2.end()),
[] __device__(auto tuple) { return thrust::get<0>(tuple) == thrust::get<1>(tuple); }));
thrust::make_zip_iterator(v1.end(), v2.end()),
[] __device__(auto tuple) { return thrust::get<0>(tuple) == thrust::get<1>(tuple); }));

v1.resize(new_size, handle.get_stream());
v2.resize(new_size, handle.get_stream());
v1.resize(new_size, handle.get_stream());
v2.resize(new_size, handle.get_stream());

if constexpr (multi_gpu) {
// shuffle vertex pairs
auto vertex_partition_range_lasts = graph_view.vertex_partition_range_lasts();

std::tie(v1, v2, std::ignore, std::ignore, std::ignore, std::ignore) =
detail::shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning<vertex_t,
edge_t,
weight_t,
int>(
handle,
std::move(v1),
std::move(v2),
std::nullopt,
std::nullopt,
std::nullopt,
vertex_partition_range_lasts);
}
if constexpr (multi_gpu) {
// shuffle vertex pairs
auto vertex_partition_range_lasts = graph_view.vertex_partition_range_lasts();

std::tie(v1, v2, std::ignore, std::ignore, std::ignore, std::ignore) =
detail::shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning<vertex_t,
edge_t,
weight_t,
int>(
handle,
std::move(v1),
std::move(v2),
std::nullopt,
std::nullopt,
std::nullopt,
vertex_partition_range_lasts);
}

auto score =
similarity(handle,
graph_view,
edge_weight_view,
std::make_tuple(raft::device_span<vertex_t const>{v1.data(), v1.size()},
raft::device_span<vertex_t const>{v2.data(), v2.size()}),
functor,
coeff,
do_expensive_check);

// Add a remove_if to remove items that are less than the last topk element
new_size = thrust::distance(
thrust::make_zip_iterator(score.begin(), v1.begin(), v2.begin()),
thrust::remove_if(handle.get_thrust_policy(),
thrust::make_zip_iterator(score.begin(), v1.begin(), v2.begin()),
thrust::make_zip_iterator(score.end(), v1.end(), v2.end()),
[similarity_threshold] __device__(auto tuple) {
return thrust::get<0>(tuple) < similarity_threshold;
}));

score.resize(new_size, handle.get_stream());
v1.resize(new_size, handle.get_stream());
v2.resize(new_size, handle.get_stream());

thrust::sort_by_key(handle.get_thrust_policy(),
score.begin(),
score.end(),
thrust::make_zip_iterator(v1.begin(), v2.begin()),
thrust::greater<weight_t>{});

size_t v1_keep = std::min(*topk, v1.size());

if (score.size() < (top_v1.size() + v1_keep)) {
score.resize(top_v1.size() + v1_keep, handle.get_stream());
v1.resize(score.size(), handle.get_stream());
v2.resize(score.size(), handle.get_stream());
}
auto score =
similarity(handle,
graph_view,
edge_weight_view,
std::make_tuple(raft::device_span<vertex_t const>{v1.data(), v1.size()},
raft::device_span<vertex_t const>{v2.data(), v2.size()}),
functor,
coeff,
do_expensive_check);

// Add a remove_if to remove items that are less than the last topk element
new_size = thrust::distance(
thrust::make_zip_iterator(score.begin(), v1.begin(), v2.begin()),
thrust::remove_if(handle.get_thrust_policy(),
thrust::make_zip_iterator(score.begin(), v1.begin(), v2.begin()),
thrust::make_zip_iterator(score.end(), v1.end(), v2.end()),
[similarity_threshold] __device__(auto tuple) {
return thrust::get<0>(tuple) < similarity_threshold;
}));

score.resize(new_size, handle.get_stream());
v1.resize(new_size, handle.get_stream());
v2.resize(new_size, handle.get_stream());

thrust::sort_by_key(handle.get_thrust_policy(),
score.begin(),
score.end(),
thrust::make_zip_iterator(v1.begin(), v2.begin()),
thrust::greater<weight_t>{});

size_t v1_keep = std::min(*topk, v1.size());

if (score.size() < (top_v1.size() + v1_keep)) {
score.resize(top_v1.size() + v1_keep, handle.get_stream());
v1.resize(score.size(), handle.get_stream());
v2.resize(score.size(), handle.get_stream());
}

thrust::copy(handle.get_thrust_policy(), top_v1.begin(), top_v1.end(), v1.begin() + v1_keep);
thrust::copy(handle.get_thrust_policy(), top_v2.begin(), top_v2.end(), v2.begin() + v1_keep);
thrust::copy(
handle.get_thrust_policy(), top_score.begin(), top_score.end(), score.begin() + v1_keep);

thrust::sort_by_key(handle.get_thrust_policy(),
score.begin(),
score.end(),
thrust::make_zip_iterator(v1.begin(), v2.begin()),
thrust::greater<weight_t>{});

if (top_v1.size() < std::min(*topk, v1.size())) {
top_v1.resize(std::min(*topk, v1.size()), handle.get_stream());
top_v2.resize(top_v1.size(), handle.get_stream());
top_score.resize(top_v1.size(), handle.get_stream());
}

thrust::copy(
handle.get_thrust_policy(), v1.begin(), v1.begin() + top_v1.size(), top_v1.begin());
thrust::copy(
handle.get_thrust_policy(), v2.begin(), v2.begin() + top_v1.size(), top_v2.begin());
thrust::copy(handle.get_thrust_policy(),
score.begin(),
score.begin() + top_v1.size(),
top_score.begin());

thrust::copy(
handle.get_thrust_policy(), top_v1.begin(), top_v1.end(), v1.begin() + v1_keep);
thrust::copy(
handle.get_thrust_policy(), top_v2.begin(), top_v2.end(), v2.begin() + v1_keep);
thrust::copy(
handle.get_thrust_policy(), top_score.begin(), top_score.end(), score.begin() + v1_keep);

thrust::sort_by_key(handle.get_thrust_policy(),
score.begin(),
score.end(),
thrust::make_zip_iterator(v1.begin(), v2.begin()),
thrust::greater<weight_t>{});

if (top_v1.size() < std::min(*topk, v1.size())) {
top_v1.resize(std::min(*topk, v1.size()), handle.get_stream());
top_v2.resize(top_v1.size(), handle.get_stream());
top_score.resize(top_v1.size(), handle.get_stream());
if constexpr (multi_gpu) {
bool is_root = handle.get_comms().get_rank() == int{0};
auto rx_sizes = cugraph::host_scalar_gather(
handle.get_comms(), top_v1.size(), int{0}, handle.get_stream());
std::vector<size_t> rx_displs;
size_t gathered_size{0};

if (is_root) {
rx_displs.resize(handle.get_comms().get_size());
rx_displs[0] = 0;
std::partial_sum(rx_sizes.begin(), rx_sizes.end() - 1, rx_displs.begin() + 1);
gathered_size = std::reduce(rx_sizes.begin(), rx_sizes.end());
}

thrust::copy(
handle.get_thrust_policy(), v1.begin(), v1.begin() + top_v1.size(), top_v1.begin());
thrust::copy(
handle.get_thrust_policy(), v2.begin(), v2.begin() + top_v1.size(), top_v2.begin());
thrust::copy(handle.get_thrust_policy(),
score.begin(),
score.begin() + top_v1.size(),
top_score.begin());
rmm::device_uvector<vertex_t> gathered_v1(gathered_size, handle.get_stream());
rmm::device_uvector<vertex_t> gathered_v2(gathered_size, handle.get_stream());
rmm::device_uvector<weight_t> gathered_score(gathered_size, handle.get_stream());

cugraph::device_gatherv(
handle.get_comms(),
thrust::make_zip_iterator(top_v1.begin(), top_v2.begin(), top_score.begin()),
thrust::make_zip_iterator(
gathered_v1.begin(), gathered_v2.begin(), gathered_score.begin()),
top_v1.size(),
rx_sizes,
rx_displs,
int{0},
handle.get_stream());

if constexpr (multi_gpu) {
bool is_root = handle.get_comms().get_rank() == int{0};
auto rx_sizes = cugraph::host_scalar_gather(
handle.get_comms(), top_v1.size(), int{0}, handle.get_stream());
std::vector<size_t> rx_displs;
size_t gathered_size{0};

if (is_root) {
rx_displs.resize(handle.get_comms().get_size());
rx_displs[0] = 0;
std::partial_sum(rx_sizes.begin(), rx_sizes.end() - 1, rx_displs.begin() + 1);
gathered_size = std::reduce(rx_sizes.begin(), rx_sizes.end());
if (is_root) {
thrust::sort_by_key(handle.get_thrust_policy(),
gathered_score.begin(),
gathered_score.end(),
thrust::make_zip_iterator(gathered_v1.begin(), gathered_v2.begin()),
thrust::greater<weight_t>{});

if (gathered_v1.size() > *topk) {
gathered_v1.resize(*topk, handle.get_stream());
gathered_v2.resize(*topk, handle.get_stream());
gathered_score.resize(*topk, handle.get_stream());
}

rmm::device_uvector<vertex_t> gathered_v1(gathered_size, handle.get_stream());
rmm::device_uvector<vertex_t> gathered_v2(gathered_size, handle.get_stream());
rmm::device_uvector<weight_t> gathered_score(gathered_size, handle.get_stream());

cugraph::device_gatherv(
handle.get_comms(),
thrust::make_zip_iterator(top_v1.begin(), top_v2.begin(), top_score.begin()),
thrust::make_zip_iterator(
gathered_v1.begin(), gathered_v2.begin(), gathered_score.begin()),

top_v1.size(),
rx_sizes,
rx_displs,
int{0},
handle.get_stream());

if (is_root) {
thrust::sort_by_key(handle.get_thrust_policy(),
gathered_score.begin(),
gathered_score.end(),
thrust::make_zip_iterator(gathered_v1.begin(), gathered_v2.begin()),
thrust::greater<weight_t>{});

if (gathered_v1.size() > *topk) {
gathered_v1.resize(*topk, handle.get_stream());
gathered_v2.resize(*topk, handle.get_stream());
gathered_score.resize(*topk, handle.get_stream());
}

top_v1 = std::move(gathered_v1);
top_v2 = std::move(gathered_v2);
top_score = std::move(gathered_score);
} else {
top_v1.resize(0, handle.get_stream());
top_v2.resize(0, handle.get_stream());
top_score.resize(0, handle.get_stream());
}
top_v1 = std::move(gathered_v1);
top_v2 = std::move(gathered_v2);
top_score = std::move(gathered_score);
} else {
top_v1.resize(0, handle.get_stream());
top_v2.resize(0, handle.get_stream());
top_score.resize(0, handle.get_stream());
}
}

if (top_score.size() == *topk) {
raft::update_host(
&similarity_threshold, top_score.data() + *topk - 1, 1, handle.get_stream());
if (top_score.size() == *topk) {
raft::update_host(
&similarity_threshold, top_score.data() + *topk - 1, 1, handle.get_stream());

if constexpr (multi_gpu) {
similarity_threshold = host_scalar_bcast(
handle.get_comms(), similarity_threshold, int{0}, handle.get_stream());
}
if constexpr (multi_gpu) {
similarity_threshold = host_scalar_bcast(
handle.get_comms(), similarity_threshold, int{0}, handle.get_stream());
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/tests/link_prediction/mg_similarity_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ INSTANTIATE_TEST_SUITE_P(
Similarity_Usecase{false, true, false, 20, 100, 10},
Similarity_Usecase{false, true, true, 20, 100},
Similarity_Usecase{false, true, true, 20, 100},
Similarity_Usecase{false, true, true, std::nullopt, 100, 10},
Similarity_Usecase{false, true, true, 20, 100, 10}),
::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"),
cugraph::test::File_Usecase("test/datasets/netscience.mtx"))));
Expand All @@ -329,6 +330,7 @@ INSTANTIATE_TEST_SUITE_P(
Similarity_Usecase{false, true, false, 20, 100, 10},
Similarity_Usecase{false, true, true, 20, 100},
Similarity_Usecase{false, true, true, 20, 100},
Similarity_Usecase{false, true, true, std::nullopt, 100, 10},
Similarity_Usecase{false, true, true, 20, 100, 10}),
::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, true, false))));

Expand Down

0 comments on commit 1e5b5ce

Please sign in to comment.