diff --git a/cpp/include/cugraph/edge_partition_device_view.cuh b/cpp/include/cugraph/edge_partition_device_view.cuh index 21ed83a87f..628c3cc10c 100644 --- a/cpp/include/cugraph/edge_partition_device_view.cuh +++ b/cpp/include/cugraph/edge_partition_device_view.cuh @@ -219,6 +219,7 @@ class edge_partition_device_view_t count /* size = 1 */, rmm::cuda_stream_view stream) const { + if (thrust::distance(major_first, major_last) == 0) { + RAFT_CUDA_TRY(cudaMemsetAsync(count.data(), 0, sizeof(size_t), stream)); + } + rmm::device_uvector d_tmp_storage(0, stream); size_t tmp_storage_bytes{0}; @@ -368,6 +373,7 @@ class edge_partition_device_view_t()); } + template + __host__ void compute_number_of_edges_async(MajorIterator major_first, + MajorIterator major_last, + raft::device_span count /* size = 1 */, + rmm::cuda_stream_view stream) const + { + if (thrust::distance(major_first, major_last) == 0) { + RAFT_CUDA_TRY(cudaMemsetAsync(count.data(), 0, sizeof(size_t), stream)); + } + + rmm::device_uvector d_tmp_storage(0, stream); + size_t tmp_storage_bytes{0}; + + auto local_degree_first = thrust::make_transform_iterator( + major_first, + detail::local_degree_op_t{this->offsets_, + std::byte{0} /* dummy */, + std::byte{0} /* dummy */, + std::byte{0} /* dummy */}); + cub::DeviceReduce::Sum(static_cast(nullptr), + tmp_storage_bytes, + local_degree_first, + count.data(), + thrust::distance(major_first, major_last), + stream); + d_tmp_storage.resize(tmp_storage_bytes, stream); + cub::DeviceReduce::Sum(d_tmp_storage.data(), + tmp_storage_bytes, + local_degree_first, + count.data(), + thrust::distance(major_first, major_last), + stream); + } + __host__ rmm::device_uvector compute_local_degrees(rmm::cuda_stream_view stream) const { rmm::device_uvector local_degrees(this->major_range_size(), stream); @@ -682,6 +727,7 @@ class edge_partition_device_view_t((*segment_offsets).data(), (*segment_offsets).size()), - graph_view.local_vertex_partition_range_first(), - handle.get_stream()); - (*key_segment_offsets).back() = *((*key_segment_offsets).rbegin() + 1); - frontier_key_last = frontier_key_first + (*key_segment_offsets).back(); + if (thrust::distance(frontier_key_first, frontier_key_last) > 0) { + key_segment_offsets = compute_key_segment_offsets( + frontier_key_first, + frontier_key_last, + raft::host_span((*segment_offsets).data(), (*segment_offsets).size()), + graph_view.local_vertex_partition_range_first(), + handle.get_stream()); + (*key_segment_offsets).back() = *((*key_segment_offsets).rbegin() + 1); + frontier_key_last = frontier_key_first + (*key_segment_offsets).back(); + } else { + key_segment_offsets = std::vector((*segment_offsets).size(), 0); + } } } @@ -931,10 +935,16 @@ extract_transform_v_frontier_e(raft::handle_t const& handle, (range_size > 0) ? 
(num_keys / static_cast(range_size)) : double{0.0}; } avg_fill_ratio /= static_cast(minor_comm_size); - constexpr double threshold_ratio = 8.0 /* tuning parameter */ / static_cast(sizeof(vertex_t) * 8); - if (avg_fill_ratio > threshold_ratio) { + auto avg_frontier_size = + std::reduce(local_frontier_sizes.begin(), local_frontier_sizes.end()) / + static_cast(minor_comm_size); + + if ((avg_fill_ratio > threshold_ratio) && + (static_cast(avg_frontier_size) > + packed_bools_per_word() * + 32 /* tuning parameter, to consider additional kernel launch overhead */)) { frontier_bitmap = compute_vertex_list_bitmap_info(frontier_key_first, frontier_key_last, @@ -972,18 +982,21 @@ extract_transform_v_frontier_e(raft::handle_t const& handle, auto max_tmp_buffer_size = static_cast( static_cast(handle.get_device_properties().totalGlobalMem) * 0.2); - auto aggregate_major_range_size = host_scalar_allreduce( - comm, - static_cast(thrust::distance(frontier_key_first, frontier_key_last)), - raft::comms::op_t::SUM, - handle.get_stream()); - auto aggregate_max_pushes = host_scalar_allreduce( - comm, - local_max_pushes, - raft::comms::op_t::SUM, - handle.get_stream()); // this is approximate as we only consider local edges for - // [frontier_key_first, frontier_key_last), note that neighbor lists - // are partitioned if minor_comm_size > 1 + size_t aggregate_major_range_size{}; + size_t aggregate_max_pushes{}; // this is approximate as we only consider local edges for + // [frontier_key_first, frontier_key_last), note that neighbor + // lists are partitioned if minor_comm_size > 1 + { + auto tmp = host_scalar_allreduce( + comm, + thrust::make_tuple( + static_cast(thrust::distance(frontier_key_first, frontier_key_last)), + local_max_pushes), + raft::comms::op_t::SUM, + handle.get_stream()); + aggregate_major_range_size = thrust::get<0>(tmp); + aggregate_max_pushes = thrust::get<1>(tmp); + } size_t key_size{0}; if constexpr (std::is_arithmetic_v) { @@ -1290,6 +1303,10 @@ extract_transform_v_frontier_e(raft::handle_t const& handle, } } } +#if EXTRACT_PERFORMANCE_MEASUREMENT + if (loop_stream_pool_indices) { handle.sync_stream_pool(*loop_stream_pool_indices); } + auto subtime2 = std::chrono::steady_clock::now(); +#endif if (key_segment_offset_vectors) { for (size_t j = 0; j < loop_count; ++j) { @@ -1377,7 +1394,7 @@ extract_transform_v_frontier_e(raft::handle_t const& handle, } if (loop_stream_pool_indices) { handle.sync_stream_pool(*loop_stream_pool_indices); } #if EXTRACT_PERFORMANCE_MEASUREMENT - auto subtime2 = std::chrono::steady_clock::now(); + auto subtime3 = std::chrono::steady_clock::now(); #endif thrust::fill( @@ -1501,14 +1518,14 @@ extract_transform_v_frontier_e(raft::handle_t const& handle, if (stream_pool_indices) { handle.sync_stream_pool(*stream_pool_indices); } #if EXTRACT_PERFORMANCE_MEASUREMENT - auto subtime3 = std::chrono::steady_clock::now(); + auto subtime4 = std::chrono::steady_clock::now(); #endif std::vector h_counts(loop_count); raft::update_host(h_counts.data(), counters.data(), loop_count, handle.get_stream()); handle.sync_stream(); #if EXTRACT_PERFORMANCE_MEASUREMENT - auto subtime4 = std::chrono::steady_clock::now(); + auto subtime5 = std::chrono::steady_clock::now(); #endif for (size_t j = 0; j < loop_count; ++j) { @@ -1535,15 +1552,16 @@ extract_transform_v_frontier_e(raft::handle_t const& handle, } if (loop_stream_pool_indices) { handle.sync_stream_pool(*loop_stream_pool_indices); } #if EXTRACT_PERFORMANCE_MEASUREMENT - auto subtime5 = std::chrono::steady_clock::now(); + auto 
subtime6 = std::chrono::steady_clock::now(); std::chrono::duration subdur0 = subtime1 - subtime0; std::chrono::duration subdur1 = subtime2 - subtime1; std::chrono::duration subdur2 = subtime3 - subtime2; std::chrono::duration subdur3 = subtime4 - subtime3; std::chrono::duration subdur4 = subtime5 - subtime4; + std::chrono::duration subdur5 = subtime6 - subtime5; std::cerr << "sub (extract) took (" << subdur0.count() << "," << subdur1.count() << "," - << subdur2.count() << "," << subdur3.count() << "," << subdur4.count() - << ") loop_count=" << loop_count << std::endl; + << subdur2.count() << "," << subdur3.count() << "," << subdur4.count() << "," + << subdur5.count() << ") loop_count=" << loop_count << std::endl; #endif } #if EXTRACT_PERFORMANCE_MEASUREMENT diff --git a/cpp/src/prims/detail/per_v_transform_reduce_e.cuh b/cpp/src/prims/detail/per_v_transform_reduce_e.cuh index b7d4f88865..3ab4ba39e3 100644 --- a/cpp/src/prims/detail/per_v_transform_reduce_e.cuh +++ b/cpp/src/prims/detail/per_v_transform_reduce_e.cuh @@ -1944,11 +1944,17 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, (range_size > 0) ? (num_keys / static_cast(range_size)) : double{0.0}; } avg_fill_ratio /= static_cast(minor_comm_size); - double threshold_ratio = 2.0 /* tuning parameter (consider that we need to reprodce vertex list from bitmap)*/ / static_cast((v_compressible ? sizeof(uint32_t) : sizeof(vertex_t)) * 8); - if (avg_fill_ratio > threshold_ratio) { + auto avg_key_list_size = + std::reduce(local_key_list_sizes.begin(), local_key_list_sizes.end()) / + static_cast(minor_comm_size); + + if ((avg_fill_ratio > threshold_ratio) && + (static_cast(avg_key_list_size) > + packed_bools_per_word() * + 32 /* tuning parameter, to considerr additional kernel launch overhead */)) { v_list_bitmap = compute_vertex_list_bitmap_info(sorted_unique_key_first, sorted_unique_nzd_key_last, local_v_list_range_firsts[minor_comm_rank], diff --git a/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh b/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh index 5c8083837b..ff70ae2195 100644 --- a/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh +++ b/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh @@ -157,8 +157,8 @@ sort_and_reduce_buffer_elements( dataframe_buffer_type_t&& key_buffer, optional_dataframe_buffer_type_t&& payload_buffer, ReduceOp reduce_op, - std::conditional_t, std::tuple, std::byte /* dummy */> - vertex_range, + std::conditional_t, std::vector, std::byte /* dummy */> + vertex_range_offsets, std::optional invalid_key /* drop (key, (payload)) pairs with invalid key */) { constexpr bool compressed = @@ -173,7 +173,7 @@ sort_and_reduce_buffer_elements( reduce_op::any>)) { // try to use // bitmap for // filtering - key_t range_size = std::get<1>(vertex_range) - std::get<0>(vertex_range); + key_t range_size = vertex_range_offsets.back() - vertex_range_offsets.front(); if (static_cast(size_dataframe_buffer(key_buffer)) >= static_cast(range_size) * 0.125 /* tuning parameter */) { // use bitmap for filtering @@ -190,7 +190,7 @@ sort_and_reduce_buffer_elements( update_keep_flag_t{ raft::device_span(bitmap.data(), bitmap.size()), raft::device_span(keep_flags.data(), keep_flags.size()), - std::get<0>(vertex_range), + vertex_range_offsets.front(), get_dataframe_buffer_begin(key_buffer), to_thrust_optional(invalid_key)}); auto stencil_first = thrust::make_transform_iterator( @@ -245,7 +245,7 @@ sort_and_reduce_buffer_elements( key_buffer.end(), 
output_key_buffer.begin(), cuda::proclaim_return_type( - [v_first = std::get<0>(vertex_range)] __device__(uint32_t v_offset) { + [v_first = vertex_range_offsets.front()] __device__(uint32_t v_offset) { return static_cast(v_first + v_offset); })); return std::make_tuple(std::move(output_key_buffer), std::move(payload_buffer)); @@ -274,7 +274,7 @@ sort_and_reduce_buffer_elements( auto input_key_first = thrust::make_transform_iterator( get_dataframe_buffer_begin(key_buffer), cuda::proclaim_return_type( - [v_first = std::get<0>(vertex_range)] __device__(auto v_offset) { + [v_first = vertex_range_offsets.front()] __device__(auto v_offset) { return static_cast(v_first + v_offset); })); resize_dataframe_buffer( @@ -331,7 +331,7 @@ sort_and_reduce_buffer_elements( auto input_key_first = thrust::make_transform_iterator( get_dataframe_buffer_begin(key_buffer), cuda::proclaim_return_type( - [v_first = std::get<0>(vertex_range)] __device__(auto v_offset) { + [v_first = vertex_range_offsets.front()] __device__(auto v_offset) { return static_cast(v_first + v_offset); })); auto tmp_payload_buffer = allocate_dataframe_buffer( @@ -430,7 +430,7 @@ sort_and_reduce_buffer_elements( auto input_key_first = thrust::make_transform_iterator( get_dataframe_buffer_begin(key_buffer), cuda::proclaim_return_type( - [v_first = std::get<0>(vertex_range)] __device__(auto v_offset) { + [v_first = vertex_range_offsets.front()] __device__(auto v_offset) { return static_cast(v_first + v_offset); })); thrust::reduce_by_key(handle.get_thrust_policy(), @@ -531,20 +531,36 @@ transform_reduce_v_frontier_outgoing_e_by_dst(raft::handle_t const& handle, // 2. reduce the buffer - std:: - conditional_t, std::tuple, std::byte /* dummy */> - vertex_range{}; - if constexpr (std::is_integral_v) { - vertex_range = std::make_tuple(graph_view.local_edge_partition_dst_range_first(), - graph_view.local_edge_partition_dst_range_last()); + std::vector vertex_range_offsets{}; + if constexpr (GraphViewType::is_multi_gpu) { + auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); + auto const major_comm_rank = major_comm.get_rank(); + auto const major_comm_size = major_comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_rank = minor_comm.get_rank(); + auto const minor_comm_size = minor_comm.get_size(); + vertex_range_offsets = std::vector(major_comm_size + 1); + for (int i = 0; i < major_comm_size; ++i) { + auto vertex_partition_id = + detail::compute_local_edge_partition_minor_range_vertex_partition_id_t{ + major_comm_size, minor_comm_size, major_comm_rank, minor_comm_rank}(i); + vertex_range_offsets[i] = graph_view.vertex_partition_range_first(vertex_partition_id); + } + vertex_range_offsets.back() = graph_view.local_edge_partition_dst_range_last(); + } else { + vertex_range_offsets = std::vector{graph_view.local_edge_partition_dst_range_first(), + graph_view.local_edge_partition_dst_range_last()}; } + std::conditional_t, std::vector, std::byte /* dummy */> + aux_range_offsets{}; + if constexpr (std::is_integral_v) { aux_range_offsets = vertex_range_offsets; } std::tie(key_buffer, payload_buffer) = detail::sort_and_reduce_buffer_elements( handle, std::move(key_buffer), std::move(payload_buffer), reduce_op, - vertex_range, + aux_range_offsets, std::nullopt); #if TRANSFORM_REDUCE_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); @@ -557,285 +573,304 @@ transform_reduce_v_frontier_outgoing_e_by_dst(raft::handle_t const& handle, 
bool aligned_path = false; // FIXME: delete double fill_ratio = 0.0; // FIXME: delete if constexpr (GraphViewType::is_multi_gpu) { - // FIXME: this step is unnecessary if major_comm_size== 1 auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); - auto const major_comm_rank = major_comm.get_rank(); auto const major_comm_size = major_comm.get_size(); - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); - auto const minor_comm_rank = minor_comm.get_rank(); - auto const minor_comm_size = minor_comm.get_size(); - - constexpr bool try_compression = (sizeof(vertex_t) == 8) && std::is_same_v; + if (major_comm_size > 1) { + constexpr bool try_compression = (sizeof(vertex_t) == 8) && std::is_same_v; - std::conditional_t max_vertex_partition_size{ - 0}; - std::conditional_t, std::byte /* dummy */> - h_vertex_firsts{}; - if constexpr (try_compression) { h_vertex_firsts = std::vector(major_comm_size); } - std::vector h_vertex_lasts(major_comm_size); - for (size_t i = 0; i < h_vertex_lasts.size(); ++i) { - auto vertex_partition_id = - detail::compute_local_edge_partition_minor_range_vertex_partition_id_t{ - major_comm_size, minor_comm_size, major_comm_rank, minor_comm_rank}(i); + std::conditional_t + max_vertex_partition_size{0}; + std::conditional_t, std::byte /* dummy */> + h_vertex_firsts{}; if constexpr (try_compression) { - max_vertex_partition_size = std::max( - graph_view.vertex_partition_range_size(vertex_partition_id), max_vertex_partition_size); - h_vertex_firsts[i] = graph_view.vertex_partition_range_first(vertex_partition_id); + h_vertex_firsts = std::vector(vertex_range_offsets.begin(), + vertex_range_offsets.begin() + major_comm_size); + } + std::vector h_vertex_lasts(vertex_range_offsets.begin() + 1, + vertex_range_offsets.end()); + for (size_t i = 0; i < h_vertex_lasts.size(); ++i) { + if constexpr (try_compression) { + max_vertex_partition_size = std::max( + vertex_range_offsets[i + 1] - vertex_range_offsets[i], max_vertex_partition_size); + } } - h_vertex_lasts[i] = graph_view.vertex_partition_range_last(vertex_partition_id); - } - std::conditional_t>, - std::byte /* dummy */> - d_vertex_firsts{}; - rmm::device_uvector d_vertex_lasts(h_vertex_lasts.size(), handle.get_stream()); - if constexpr (try_compression) { - if (max_vertex_partition_size <= std::numeric_limits::max()) { - d_vertex_firsts = - rmm::device_uvector(h_vertex_firsts.size(), handle.get_stream()); - raft::update_device((*d_vertex_firsts).data(), - h_vertex_firsts.data(), - h_vertex_firsts.size(), - handle.get_stream()); + std::conditional_t>, + std::byte /* dummy */> + d_vertex_firsts{}; + rmm::device_uvector d_vertex_lasts(h_vertex_lasts.size(), handle.get_stream()); + if constexpr (try_compression) { + if (max_vertex_partition_size <= std::numeric_limits::max()) { + d_vertex_firsts = + rmm::device_uvector(h_vertex_firsts.size(), handle.get_stream()); + raft::update_device((*d_vertex_firsts).data(), + h_vertex_firsts.data(), + h_vertex_firsts.size(), + handle.get_stream()); + } } - } - raft::update_device( - d_vertex_lasts.data(), h_vertex_lasts.data(), h_vertex_lasts.size(), handle.get_stream()); - rmm::device_uvector d_tx_buffer_last_boundaries(d_vertex_lasts.size(), - handle.get_stream()); - auto reduce_by_first = - thrust_tuple_get_or_identity( - get_dataframe_buffer_begin(key_buffer)); - thrust::lower_bound(handle.get_thrust_policy(), - reduce_by_first, - reduce_by_first + size_dataframe_buffer(key_buffer), - d_vertex_lasts.begin(), - 
d_vertex_lasts.end(), - d_tx_buffer_last_boundaries.begin()); - std::conditional_t>, - std::byte /* dummy */> - compressed_v_buffer{}; - if constexpr (try_compression) { - if (d_vertex_firsts) { - compressed_v_buffer = - rmm::device_uvector(size_dataframe_buffer(key_buffer), handle.get_stream()); - thrust::transform(handle.get_thrust_policy(), - get_dataframe_buffer_begin(key_buffer), - get_dataframe_buffer_end(key_buffer), - (*compressed_v_buffer).begin(), - cuda::proclaim_return_type( - [firsts = raft::device_span((*d_vertex_firsts).data(), - (*d_vertex_firsts).size()), - lasts = raft::device_span( - d_vertex_lasts.data(), d_vertex_lasts.size())] __device__(auto v) { - auto major_comm_rank = thrust::distance( - lasts.begin(), - thrust::upper_bound(thrust::seq, lasts.begin(), lasts.end(), v)); - return static_cast(v - firsts[major_comm_rank]); - })); - resize_dataframe_buffer(key_buffer, 0, handle.get_stream()); - shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); + raft::update_device( + d_vertex_lasts.data(), h_vertex_lasts.data(), h_vertex_lasts.size(), handle.get_stream()); + rmm::device_uvector d_tx_buffer_last_boundaries(d_vertex_lasts.size(), + handle.get_stream()); + auto key_v_first = + thrust_tuple_get_or_identity( + get_dataframe_buffer_begin(key_buffer)); + thrust::lower_bound(handle.get_thrust_policy(), + key_v_first, + key_v_first + size_dataframe_buffer(key_buffer), + d_vertex_lasts.begin(), + d_vertex_lasts.end(), + d_tx_buffer_last_boundaries.begin()); + std::conditional_t>, + std::byte /* dummy */> + compressed_v_buffer{}; + if constexpr (try_compression) { + if (d_vertex_firsts) { + compressed_v_buffer = + rmm::device_uvector(size_dataframe_buffer(key_buffer), handle.get_stream()); + thrust::transform(handle.get_thrust_policy(), + get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_end(key_buffer), + (*compressed_v_buffer).begin(), + cuda::proclaim_return_type( + [firsts = raft::device_span( + (*d_vertex_firsts).data(), (*d_vertex_firsts).size()), + lasts = raft::device_span( + d_vertex_lasts.data(), d_vertex_lasts.size())] __device__(auto v) { + auto major_comm_rank = thrust::distance( + lasts.begin(), + thrust::upper_bound(thrust::seq, lasts.begin(), lasts.end(), v)); + return static_cast(v - firsts[major_comm_rank]); + })); + resize_dataframe_buffer(key_buffer, 0, handle.get_stream()); + shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); + } } - } - std::vector h_tx_buffer_last_boundaries(d_tx_buffer_last_boundaries.size()); - raft::update_host(h_tx_buffer_last_boundaries.data(), - d_tx_buffer_last_boundaries.data(), - d_tx_buffer_last_boundaries.size(), - handle.get_stream()); - handle.sync_stream(); - std::vector tx_counts(h_tx_buffer_last_boundaries.size()); - std::adjacent_difference( - h_tx_buffer_last_boundaries.begin(), h_tx_buffer_last_boundaries.end(), tx_counts.begin()); + std::vector h_tx_buffer_last_boundaries(d_tx_buffer_last_boundaries.size()); + raft::update_host(h_tx_buffer_last_boundaries.data(), + d_tx_buffer_last_boundaries.data(), + d_tx_buffer_last_boundaries.size(), + handle.get_stream()); + handle.sync_stream(); + std::vector tx_counts(h_tx_buffer_last_boundaries.size()); + std::adjacent_difference( + h_tx_buffer_last_boundaries.begin(), h_tx_buffer_last_boundaries.end(), tx_counts.begin()); #if TRANSFORM_REDUCE_PERFORMANCE_MEASUREMENT - RAFT_CUDA_TRY(cudaDeviceSynchronize()); - time3 = std::chrono::steady_clock::now(); + RAFT_CUDA_TRY(cudaDeviceSynchronize()); + time3 = std::chrono::steady_clock::now(); 
#endif - size_t min_element_size{cache_line_size}; - if constexpr (std::is_same_v) { - if constexpr (try_compression) { - if (compressed_v_buffer) { - min_element_size = std::min(sizeof(uint32_t), min_element_size); + size_t min_element_size{cache_line_size}; + if constexpr (std::is_same_v) { + if constexpr (try_compression) { + if (compressed_v_buffer) { + min_element_size = std::min(sizeof(uint32_t), min_element_size); + } else { + min_element_size = std::min(sizeof(key_t), min_element_size); + } } else { min_element_size = std::min(sizeof(key_t), min_element_size); } } else { - min_element_size = std::min(sizeof(key_t), min_element_size); + static_assert(is_thrust_tuple_of_arithmetic::value); + min_element_size = + std::min(cugraph::min_thrust_tuple_element_sizes(), min_element_size); } - } else { - static_assert(is_thrust_tuple_of_arithmetic::value); - min_element_size = - std::min(cugraph::min_thrust_tuple_element_sizes(), min_element_size); - } - if constexpr (!std::is_same_v) { - if constexpr (std::is_arithmetic_v) { - min_element_size = std::min(sizeof(payload_t), min_element_size); - } else { - static_assert(is_thrust_tuple_of_arithmetic::value); - min_element_size = std::min(min_thrust_tuple_element_sizes(), min_element_size); + if constexpr (!std::is_same_v) { + if constexpr (std::is_arithmetic_v) { + min_element_size = std::min(sizeof(payload_t), min_element_size); + } else { + static_assert(is_thrust_tuple_of_arithmetic::value); + min_element_size = + std::min(min_thrust_tuple_element_sizes(), min_element_size); + } } - } - assert((cache_line_size % min_element_size) == 0); - auto alignment = cache_line_size / min_element_size; - std::optional, key_t>> - invalid_key{std::nullopt}; + assert((cache_line_size % min_element_size) == 0); + auto alignment = cache_line_size / min_element_size; + std::optional, key_t>> + invalid_key{std::nullopt}; - size_t local_key_buffer_size{}; - if constexpr (try_compression) { - if (compressed_v_buffer) { - local_key_buffer_size = size_dataframe_buffer(*compressed_v_buffer); + size_t local_key_buffer_size{}; + if constexpr (try_compression) { + if (compressed_v_buffer) { + local_key_buffer_size = size_dataframe_buffer(*compressed_v_buffer); + } else { + local_key_buffer_size = size_dataframe_buffer(key_buffer); + } } else { local_key_buffer_size = size_dataframe_buffer(key_buffer); } - } else { - local_key_buffer_size = size_dataframe_buffer(key_buffer); - } - auto avg_key_buffer_size = - host_scalar_allreduce( - major_comm, local_key_buffer_size, raft::comms::op_t::SUM, handle.get_stream()) / - major_comm_size; - if (avg_key_buffer_size >= alignment * size_t{128} /* 128 tuning parameter */) { - aligned_path = true; // FIXME: delete - if constexpr (std::is_same_v) { - if constexpr (try_compression) { - if (compressed_v_buffer) { - invalid_key = std::numeric_limits::max(); + auto avg_key_buffer_size = + host_scalar_allreduce( + major_comm, local_key_buffer_size, raft::comms::op_t::SUM, handle.get_stream()) / + major_comm_size; + if (avg_key_buffer_size >= alignment * size_t{128} /* 128 tuning parameter */) { + aligned_path = true; // FIXME: delete + if constexpr (std::is_same_v) { + if constexpr (try_compression) { + if (compressed_v_buffer) { + invalid_key = std::numeric_limits::max(); + } else { + invalid_key = invalid_vertex_id_v; + } } else { invalid_key = invalid_vertex_id_v; } } else { - invalid_key = invalid_vertex_id_v; + invalid_key = key_t{}; + thrust::get<0>(*invalid_key) = invalid_vertex_id_v; } - } else { - invalid_key = key_t{}; - 
thrust::get<0>(*invalid_key) = invalid_vertex_id_v; - } - if constexpr (try_compression) { - if (compressed_v_buffer) { - auto rx_compressed_v_buffer = - allocate_dataframe_buffer(size_t{0}, handle.get_stream()); - std::tie(rx_compressed_v_buffer, + if constexpr (try_compression) { + if (compressed_v_buffer) { + auto rx_compressed_v_buffer = + allocate_dataframe_buffer(size_t{0}, handle.get_stream()); + std::tie(rx_compressed_v_buffer, + std::ignore, + std::ignore, + std::ignore, + std::ignore, + std::ignore, + std::ignore) = shuffle_values(major_comm, + get_dataframe_buffer_begin(*compressed_v_buffer), + tx_counts, + alignment, + std::make_optional(std::get<1>(*invalid_key)), + handle.get_stream()); + compressed_v_buffer = std::move(rx_compressed_v_buffer); + } else { + auto rx_key_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); + std::tie(rx_key_buffer, + std::ignore, + std::ignore, + std::ignore, + std::ignore, + std::ignore, + std::ignore) = shuffle_values(major_comm, + get_dataframe_buffer_begin(key_buffer), + tx_counts, + alignment, + std::make_optional(std::get<0>(*invalid_key)), + handle.get_stream()); + key_buffer = std::move(rx_key_buffer); + } + } else { + auto rx_key_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); + std::tie(rx_key_buffer, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore) = shuffle_values(major_comm, - get_dataframe_buffer_begin(*compressed_v_buffer), + get_dataframe_buffer_begin(key_buffer), tx_counts, alignment, - std::make_optional(std::get<1>(*invalid_key)), + invalid_key, handle.get_stream()); - compressed_v_buffer = std::move(rx_compressed_v_buffer); - } else { - auto rx_key_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); - std::tie(rx_key_buffer, + key_buffer = std::move(rx_key_buffer); + } + if constexpr (!std::is_same_v) { + auto rx_payload_buffer = + allocate_dataframe_buffer(size_t{0}, handle.get_stream()); + std::tie(rx_payload_buffer, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore, std::ignore) = shuffle_values(major_comm, - get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(payload_buffer), tx_counts, alignment, - std::make_optional(std::get<0>(*invalid_key)), + std::nullopt, handle.get_stream()); - key_buffer = std::move(rx_key_buffer); + payload_buffer = std::move(rx_payload_buffer); } } else { - auto rx_key_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); - std::tie(rx_key_buffer, - std::ignore, - std::ignore, - std::ignore, - std::ignore, - std::ignore, - std::ignore) = shuffle_values(major_comm, - get_dataframe_buffer_begin(key_buffer), - tx_counts, - alignment, - invalid_key, - handle.get_stream()); - key_buffer = std::move(rx_key_buffer); - } - if constexpr (!std::is_same_v) { - auto rx_payload_buffer = - allocate_dataframe_buffer(size_t{0}, handle.get_stream()); - std::tie(rx_payload_buffer, - std::ignore, - std::ignore, - std::ignore, - std::ignore, - std::ignore, - std::ignore) = shuffle_values(major_comm, - get_dataframe_buffer_begin(payload_buffer), - tx_counts, - alignment, - std::nullopt, - handle.get_stream()); - payload_buffer = std::move(rx_payload_buffer); - } - } else { - if constexpr (try_compression) { - if (compressed_v_buffer) { - auto rx_compressed_v_buffer = - allocate_dataframe_buffer(size_t{0}, handle.get_stream()); - std::tie(rx_compressed_v_buffer, std::ignore) = - shuffle_values(major_comm, - get_dataframe_buffer_begin(*compressed_v_buffer), - tx_counts, - handle.get_stream()); 
- compressed_v_buffer = std::move(rx_compressed_v_buffer); + if constexpr (try_compression) { + if (compressed_v_buffer) { + auto rx_compressed_v_buffer = + allocate_dataframe_buffer(size_t{0}, handle.get_stream()); + std::tie(rx_compressed_v_buffer, std::ignore) = + shuffle_values(major_comm, + get_dataframe_buffer_begin(*compressed_v_buffer), + tx_counts, + handle.get_stream()); + compressed_v_buffer = std::move(rx_compressed_v_buffer); + } else { + auto rx_key_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); + std::tie(rx_key_buffer, std::ignore) = shuffle_values( + major_comm, get_dataframe_buffer_begin(key_buffer), tx_counts, handle.get_stream()); + key_buffer = std::move(rx_key_buffer); + } } else { auto rx_key_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); std::tie(rx_key_buffer, std::ignore) = shuffle_values( major_comm, get_dataframe_buffer_begin(key_buffer), tx_counts, handle.get_stream()); key_buffer = std::move(rx_key_buffer); } - } else { - auto rx_key_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); - std::tie(rx_key_buffer, std::ignore) = shuffle_values( - major_comm, get_dataframe_buffer_begin(key_buffer), tx_counts, handle.get_stream()); - key_buffer = std::move(rx_key_buffer); - } - if constexpr (!std::is_same_v) { - auto rx_payload_buffer = - allocate_dataframe_buffer(size_t{0}, handle.get_stream()); - std::tie(rx_payload_buffer, std::ignore) = shuffle_values( - major_comm, get_dataframe_buffer_begin(payload_buffer), tx_counts, handle.get_stream()); - payload_buffer = std::move(rx_payload_buffer); + if constexpr (!std::is_same_v) { + auto rx_payload_buffer = + allocate_dataframe_buffer(size_t{0}, handle.get_stream()); + std::tie(rx_payload_buffer, std::ignore) = shuffle_values( + major_comm, get_dataframe_buffer_begin(payload_buffer), tx_counts, handle.get_stream()); + payload_buffer = std::move(rx_payload_buffer); + } } - } #if TRANSFORM_REDUCE_PERFORMANCE_MEASUREMENT - RAFT_CUDA_TRY(cudaDeviceSynchronize()); - time4 = std::chrono::steady_clock::now(); + RAFT_CUDA_TRY(cudaDeviceSynchronize()); + time4 = std::chrono::steady_clock::now(); #endif - if constexpr (std::is_integral_v) { - vertex_range = std::make_tuple(graph_view.local_vertex_partition_range_first(), - graph_view.local_vertex_partition_range_last()); - fill_ratio = static_cast(size_dataframe_buffer(key_buffer)) / - static_cast(std::get<1>(vertex_range) - - std::get<0>(vertex_range)); // FIXME: delete - } - if constexpr (try_compression) { - if (compressed_v_buffer) { + if constexpr (std::is_integral_v) { + aux_range_offsets = std::vector{graph_view.local_vertex_partition_range_first(), + graph_view.local_vertex_partition_range_last()}; +#if 1 // FIXME: delete + size_t key_buffer_size{}; + if constexpr (try_compression) { + if (compressed_v_buffer) { + key_buffer_size = (*compressed_v_buffer).size(); + } else { + key_buffer_size = size_dataframe_buffer(key_buffer); + } + } else { + key_buffer_size = size_dataframe_buffer(key_buffer); + } + fill_ratio = static_cast(key_buffer_size) / + static_cast(aux_range_offsets.back() - aux_range_offsets.front()); +#endif + } + if constexpr (try_compression) { + if (compressed_v_buffer) { #if TRANSFORM_REDUCE_PERFORMANCE_MEASUREMENT - size_before_greduce = size_dataframe_buffer(*compressed_v_buffer); // FIXME: delete + size_before_greduce = size_dataframe_buffer(*compressed_v_buffer); // FIXME: delete #endif - std::tie(key_buffer, payload_buffer) = - detail::sort_and_reduce_buffer_elements( - handle, - 
std::move(*compressed_v_buffer), - std::move(payload_buffer), - reduce_op, - vertex_range, - invalid_key ? std::make_optional(std::get<1>(*invalid_key)) : std::nullopt); + std::tie(key_buffer, payload_buffer) = + detail::sort_and_reduce_buffer_elements( + handle, + std::move(*compressed_v_buffer), + std::move(payload_buffer), + reduce_op, + aux_range_offsets, + invalid_key ? std::make_optional(std::get<1>(*invalid_key)) : std::nullopt); + } else { +#if TRANSFORM_REDUCE_PERFORMANCE_MEASUREMENT + size_before_greduce = size_dataframe_buffer(key_buffer); // FIXME: delete +#endif + std::tie(key_buffer, payload_buffer) = + detail::sort_and_reduce_buffer_elements( + handle, + std::move(key_buffer), + std::move(payload_buffer), + reduce_op, + aux_range_offsets, + invalid_key ? std::make_optional(std::get<0>(*invalid_key)) : std::nullopt); + } } else { #if TRANSFORM_REDUCE_PERFORMANCE_MEASUREMENT size_before_greduce = size_dataframe_buffer(key_buffer); // FIXME: delete @@ -846,21 +881,9 @@ transform_reduce_v_frontier_outgoing_e_by_dst(raft::handle_t const& handle, std::move(key_buffer), std::move(payload_buffer), reduce_op, - vertex_range, - invalid_key ? std::make_optional(std::get<0>(*invalid_key)) : std::nullopt); + aux_range_offsets, + invalid_key); } - } else { -#if TRANSFORM_REDUCE_PERFORMANCE_MEASUREMENT - size_before_greduce = size_dataframe_buffer(key_buffer); // FIXME: delete -#endif - std::tie(key_buffer, payload_buffer) = - detail::sort_and_reduce_buffer_elements( - handle, - std::move(key_buffer), - std::move(payload_buffer), - reduce_op, - vertex_range, - invalid_key); } } #if TRANSFORM_REDUCE_PERFORMANCE_MEASUREMENT diff --git a/cpp/src/traversal/bfs_impl.cuh b/cpp/src/traversal/bfs_impl.cuh index 11ef079570..cdfa3422e2 100644 --- a/cpp/src/traversal/bfs_impl.cuh +++ b/cpp/src/traversal/bfs_impl.cuh @@ -119,7 +119,7 @@ void bfs(raft::handle_t const& handle, static_assert(!GraphViewType::is_storage_transposed, "GraphViewType should support the push model."); -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto prep0 = std::chrono::steady_clock::now(); #endif @@ -217,7 +217,7 @@ void bfs(raft::handle_t const& handle, thrust::fill(handle.get_thrust_policy(), output_first, output_first + n_sources, vertex_t{0}); // 3. update meta data for direction optimizing BFS -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto prep1 = std::chrono::steady_clock::now(); #endif @@ -330,7 +330,7 @@ void bfs(raft::handle_t const& handle, } // 4. 
initialize BFS frontier -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto prep2 = std::chrono::steady_clock::now(); #endif @@ -351,7 +351,7 @@ void bfs(raft::handle_t const& handle, handle, graph_view); // this may mark some vertices visited in previous iterations as unvisited // (but this is OK as we check prev_dst_visited_flags first) fill_edge_dst_property(handle, graph_view, dst_visited_flags.mutable_view(), false); -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto prep3 = std::chrono::steady_clock::now(); #endif @@ -362,7 +362,7 @@ void bfs(raft::handle_t const& handle, vertex_frontier.bucket(bucket_idx_cur).end(), prev_dst_visited_flags.mutable_view(), true); -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto prep4 = std::chrono::steady_clock::now(); std::chrono::duration dur0 = prep1 - prep0; @@ -382,7 +382,7 @@ void bfs(raft::handle_t const& handle, while (true) { vertex_t next_aggregate_frontier_size{}; if (topdown) { -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto topdown0 = std::chrono::steady_clock::now(); #endif @@ -405,7 +405,7 @@ void bfs(raft::handle_t const& handle, edge_dummy_property_t{}.view(), e_op, reduce_op::any()); -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto topdown1 = std::chrono::steady_clock::now(); #endif @@ -423,19 +423,19 @@ void bfs(raft::handle_t const& handle, vertex_frontier.bucket(bucket_idx_next) = key_bucket_t( handle, std::move(new_frontier_vertex_buffer)); -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto topdown2 = std::chrono::steady_clock::now(); #endif next_aggregate_frontier_size = static_cast(vertex_frontier.bucket(bucket_idx_next).aggregate_size()); -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto topdown3 = std::chrono::steady_clock::now(); #endif if (next_aggregate_frontier_size == 0) { -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT std::chrono::duration dur0 = topdown1 - topdown0; std::chrono::duration dur1 = topdown2 - topdown1; std::chrono::duration dur2 = topdown3 - topdown2; @@ -453,13 +453,14 @@ void bfs(raft::handle_t const& handle, vertex_frontier.bucket(bucket_idx_next).end(), prev_dst_visited_flags.mutable_view(), true); -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto topdown4 = std::chrono::steady_clock::now(); + auto topdown5 = std::chrono::steady_clock::now(); #endif if (direction_optimizing) { - { + if (vertex_frontier.bucket(bucket_idx_next).size() > 0) { rmm::device_uvector tmp_vertices((*nzd_unvisited_vertices).size(), handle.get_stream()); tmp_vertices.resize( @@ -473,6 +474,10 @@ void bfs(raft::handle_t const& handle, handle.get_stream()); nzd_unvisited_vertices = std::move(tmp_vertices); } +#if BFS_PERFORMANCE_MEASUREMENT + RAFT_CUDA_TRY(cudaDeviceSynchronize()); + topdown5 = std::chrono::steady_clock::now(); +#endif double m_f{0.0}; double m_u{0.0}; @@ -560,17 +565,17 @@ void bfs(raft::handle_t const& handle, thrust::plus{})); } - auto aggregate_m_f = - 
GraphViewType::is_multi_gpu - ? host_scalar_allreduce( - handle.get_comms(), m_f, raft::comms::op_t::SUM, handle.get_stream()) - : m_f; - auto aggregate_m_u = - GraphViewType::is_multi_gpu - ? host_scalar_allreduce( - handle.get_comms(), m_u, raft::comms::op_t::SUM, handle.get_stream()) - : m_u; -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete + auto aggregate_m_f = m_f; + auto aggregate_m_u = m_u; + if constexpr (GraphViewType::is_multi_gpu) { + auto tmp = host_scalar_allreduce(handle.get_comms(), + thrust::make_tuple(m_f, m_u), + raft::comms::op_t::SUM, + handle.get_stream()); + aggregate_m_f = thrust::get<0>(tmp); + aggregate_m_u = thrust::get<1>(tmp); + } +#if BFS_PERFORMANCE_MEASUREMENT std::cerr << "m_f=" << m_f << " m_u=" << m_u << " direction_optimizing_alpha=" << direction_optimizing_alpha << " aggregate_m_f * direction_optimzing_alpha=" @@ -584,9 +589,9 @@ void bfs(raft::handle_t const& handle, topdown = false; } } -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); - auto topdown5 = std::chrono::steady_clock::now(); + auto topdown6 = std::chrono::steady_clock::now(); #endif if (topdown) { // staying in top-down @@ -602,25 +607,26 @@ void bfs(raft::handle_t const& handle, vertex_frontier.bucket(bucket_idx_next) = key_bucket_t(handle); } -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); - auto topdown6 = std::chrono::steady_clock::now(); + auto topdown7 = std::chrono::steady_clock::now(); std::chrono::duration dur0 = topdown1 - topdown0; std::chrono::duration dur1 = topdown2 - topdown1; std::chrono::duration dur2 = topdown3 - topdown2; std::chrono::duration dur3 = topdown4 - topdown3; std::chrono::duration dur4 = topdown5 - topdown4; std::chrono::duration dur5 = topdown6 - topdown5; - std::chrono::duration dur = topdown6 - topdown0; + std::chrono::duration dur6 = topdown7 - topdown6; + std::chrono::duration dur = topdown7 - topdown0; std::cerr << "depth=" << depth << " topdown next_aggregate_frontier_size=" << next_aggregate_frontier_size - << " next topdown=" << topdown << " (prim,vf,host,fill,dir,vf) took " << dur.count() - << " (" << dur0.count() << "," << dur1.count() << "," << dur2.count() << "," - << dur3.count() << "," << dur4.count() << "," << dur5.count() << ") s." - << std::endl; + << " next topdown=" << topdown << " (prim,vf,host,fill,unvisited,dir,vf) took " + << dur.count() << " (" << dur0.count() << "," << dur1.count() << "," << dur2.count() + << "," << dur3.count() << "," << dur4.count() << "," << dur5.count() << "," + << dur6.count() << ") s." << std::endl; #endif - } else { // bottom up -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete + } else { // bottom up +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto bottomup0 = std::chrono::steady_clock::now(); #endif @@ -688,24 +694,31 @@ void bfs(raft::handle_t const& handle, handle.get_stream()); nzd_unvisited_vertices = std::move(tmp_vertices); } -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto bottomup1 = std::chrono::steady_clock::now(); #endif - next_aggregate_frontier_size = - GraphViewType::is_multi_gpu - ? 
host_scalar_allreduce(handle.get_comms(), - static_cast(new_frontier_vertex_buffer.size()), - raft::comms::op_t::SUM, - handle.get_stream()) - : static_cast(new_frontier_vertex_buffer.size()); -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete + next_aggregate_frontier_size = static_cast(new_frontier_vertex_buffer.size()); + auto aggregate_nzd_unvisited_vertices = + static_cast((*nzd_unvisited_vertices).size()); + if constexpr (GraphViewType::is_multi_gpu) { + auto tmp = host_scalar_allreduce( + handle.get_comms(), + thrust::make_tuple(static_cast(new_frontier_vertex_buffer.size()), + static_cast((*nzd_unvisited_vertices).size())), + raft::comms::op_t::SUM, + handle.get_stream()); + next_aggregate_frontier_size = thrust::get<0>(tmp); + aggregate_nzd_unvisited_vertices = thrust::get<1>(tmp); + } + +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto bottomup2 = std::chrono::steady_clock::now(); #endif if (next_aggregate_frontier_size == 0) { -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT std::chrono::duration dur0 = bottomup1 - bottomup0; std::chrono::duration dur1 = bottomup2 - bottomup1; std::chrono::duration dur = bottomup2 - bottomup0; @@ -721,29 +734,16 @@ void bfs(raft::handle_t const& handle, new_frontier_vertex_buffer.end(), prev_dst_visited_flags.mutable_view(), true); -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); auto bottomup3 = std::chrono::steady_clock::now(); #endif - // FIXME: better move this right after host_scalar_allreduce??? - auto aggregate_nzd_unvisited_vertices = - GraphViewType::is_multi_gpu - ? host_scalar_allreduce(handle.get_comms(), - static_cast((*nzd_unvisited_vertices).size()), - raft::comms::op_t::SUM, - handle.get_stream()) - : static_cast((*nzd_unvisited_vertices).size()); - if ((next_aggregate_frontier_size * direction_optimizing_beta < aggregate_nzd_unvisited_vertices) && (next_aggregate_frontier_size < cur_aggregate_frontier_size)) { topdown = true; } -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete - RAFT_CUDA_TRY(cudaDeviceSynchronize()); - auto bottomup4 = std::chrono::steady_clock::now(); -#endif if (topdown) { // swithcing to top-down vertex_frontier.bucket(bucket_idx_cur) = @@ -756,21 +756,20 @@ void bfs(raft::handle_t const& handle, raft::device_span((*nzd_unvisited_vertices).data(), (*nzd_unvisited_vertices).size())); } -#if BFS_PERFORMANCE_MEASUREMENT // FIXME: delete +#if BFS_PERFORMANCE_MEASUREMENT RAFT_CUDA_TRY(cudaDeviceSynchronize()); - auto bottomup5 = std::chrono::steady_clock::now(); + auto bottomup4 = std::chrono::steady_clock::now(); std::chrono::duration dur0 = bottomup1 - bottomup0; std::chrono::duration dur1 = bottomup2 - bottomup1; std::chrono::duration dur2 = bottomup3 - bottomup2; std::chrono::duration dur3 = bottomup4 - bottomup3; - std::chrono::duration dur4 = bottomup5 - bottomup4; - std::chrono::duration dur = bottomup5 - bottomup0; + std::chrono::duration dur = bottomup4 - bottomup0; std::cerr << "depth=" << depth << " bottomup next_aggregate_frontier_size=" << next_aggregate_frontier_size << " aggregatee_nzd_unvisited_vertices=" << aggregate_nzd_unvisited_vertices - << " (prim+,host,fill,dir,vf) took " << dur.count() << " (" << dur0.count() << "," - << dur1.count() << "," << dur2.count() << "," << dur3.count() << "," << dur4.count() - << ") s." 
<< std::endl; + << " (prim+,host,fill,vf) took " << dur.count() << " (" << dur0.count() << "," + << dur1.count() << "," << dur2.count() << "," << dur3.count() << ") s." + << std::endl; #endif } cur_aggregate_frontier_size = next_aggregate_frontier_size;
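A recurring change across the hunks above (extract_transform_v_frontier_e.cuh, per_v_transform_reduce_e.cuh, bfs_impl.cuh) fuses two consecutive host_scalar_allreduce() calls into a single allreduce over a thrust::tuple, so each iteration pays for one collective and one host-side synchronization instead of two. Below is a minimal sketch of that pattern, assuming only the cuGraph/RAFT APIs already exercised in the diff; the helper name fused_sum_allreduce and the exact include paths are illustrative, not taken from the patch.

    #include <cugraph/utilities/host_scalar_comm.hpp>  // cugraph::host_scalar_allreduce
    #include <raft/core/comms.hpp>
    #include <raft/core/handle.hpp>
    #include <thrust/tuple.h>
    #include <cstddef>
    #include <utility>

    // Reduce two per-rank counters with a single collective instead of two.
    std::pair<size_t, size_t> fused_sum_allreduce(raft::handle_t const& handle,
                                                  size_t local_count0,
                                                  size_t local_count1)
    {
      auto& comm = handle.get_comms();
      // One tuple-valued allreduce; each separate call would imply its own host sync.
      auto tmp = cugraph::host_scalar_allreduce(comm,
                                                thrust::make_tuple(local_count0, local_count1),
                                                raft::comms::op_t::SUM,
                                                handle.get_stream());
      return {thrust::get<0>(tmp), thrust::get<1>(tmp)};
    }

The same idea motivates combining next_aggregate_frontier_size and aggregate_nzd_unvisited_vertices in the bottom-up BFS hunk: both values are needed every iteration, so computing them in one collective keeps the second reduction off the critical path.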