Commit

Merge pull request #4 from NilFoundation/directly-include-parallelization-utils

Directly include parallelization utils
AndreyMlashkin authored Jun 10, 2024
2 parents 8a7886c + e273c98 commit d09f687
Showing 7 changed files with 478 additions and 4 deletions.
3 changes: 0 additions & 3 deletions .gitmodules
@@ -1,6 +1,3 @@
[submodule "libs/parallelization-utils"]
path = libs/parallelization-utils
url = git@github.com:NilFoundation/actor-core
[submodule "libs/threaded-zk"]
path = libs/parallel-zk
url = git@github.com:NilFoundation/actor-zk
1 change: 0 additions & 1 deletion libs/parallelization-utils
Submodule parallelization-utils deleted from c2aa7c
81 changes: 81 additions & 0 deletions libs/parallelization-utils/CMakeLists.txt
@@ -0,0 +1,81 @@
cmake_minimum_required(VERSION 2.8.12)

cmake_policy(SET CMP0025 NEW)
cmake_policy(SET CMP0028 NEW)
cmake_policy(SET CMP0042 NEW)
cmake_policy(SET CMP0048 NEW)
cmake_policy(SET CMP0057 NEW)
cmake_policy(SET CMP0076 NEW)

list(APPEND CMAKE_MODULE_PATH
     "${CMAKE_CURRENT_LIST_DIR}/cmake"
     "${CMAKE_CURRENT_LIST_DIR}/cmake/packages"
     "${CMAKE_CURRENT_LIST_DIR}/cmake/modules/share/modules/cmake")

include(CMConfig)
include(CMSetupVersion)
include(CMTest)

if(NOT CMAKE_WORKSPACE_NAME OR NOT ("${CMAKE_WORKSPACE_NAME}" STREQUAL "actor"))
    cm_workspace(actor)
endif()

macro(cm_find_package NAME)
    foreach(ITERATOR ${CMAKE_WORKSPACE_LIST})
        if(NOT "${NAME}" MATCHES "^${ITERATOR}_.*$" AND NOT "${NAME}" STREQUAL CM)
            find_package(${ARGV})
        else()
            set(${ARGV0}_FOUND ON CACHE BOOL "")
        endif()
    endforeach()
endmacro()

cm_project(core WORKSPACE_NAME ${CMAKE_WORKSPACE_NAME} LANGUAGES C CXX)

if(NOT Boost_FOUND AND NOT CMAKE_CROSSCOMPILING)
    cm_find_package(Boost)
endif()

cm_find_package(CM)
include(CMDeploy)
include(FindPkgConfig)

option(BUILD_WITH_CCACHE "Build with ccache usage" TRUE)

if(UNIX AND BUILD_WITH_CCACHE)
    find_program(CCACHE_FOUND ccache)
    if(CCACHE_FOUND)
        set(CMAKE_CXX_COMPILER_LAUNCHER "ccache")
    endif(CCACHE_FOUND)
endif()

list(APPEND ${CURRENT_PROJECT_NAME}_PUBLIC_HEADERS)

list(APPEND ${CURRENT_PROJECT_NAME}_UNGROUPED_SOURCES)

list(APPEND ${CURRENT_PROJECT_NAME}_HEADERS ${${CURRENT_PROJECT_NAME}_PUBLIC_HEADERS})

list(APPEND ${CURRENT_PROJECT_NAME}_SOURCES ${${CURRENT_PROJECT_NAME}_UNGROUPED_SOURCES})

cm_setup_version(VERSION 0.1.0 PREFIX ${CMAKE_WORKSPACE_NAME}_${CURRENT_PROJECT_NAME})

add_library(${CMAKE_WORKSPACE_NAME}_${CURRENT_PROJECT_NAME} INTERFACE)

set_target_properties(${CMAKE_WORKSPACE_NAME}_${CURRENT_PROJECT_NAME} PROPERTIES
                      EXPORT_NAME ${CURRENT_PROJECT_NAME})

target_include_directories(${CMAKE_WORKSPACE_NAME}_${CURRENT_PROJECT_NAME} INTERFACE
                           $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
                           $<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/include>

                           $<$<BOOL:${Boost_FOUND}>:${Boost_INCLUDE_DIRS}>)

target_link_libraries(${CMAKE_WORKSPACE_NAME}_${CURRENT_PROJECT_NAME} INTERFACE
                      ${Boost_LIBRARIES})

cm_deploy(TARGETS ${CMAKE_WORKSPACE_NAME}_${CURRENT_PROJECT_NAME}
          INCLUDE include
          NAMESPACE ${CMAKE_WORKSPACE_NAME}::)


cm_add_test_subdirectory(test)
@@ -0,0 +1,181 @@
//---------------------------------------------------------------------------//
// Copyright (c) 2023 Martun Karapetyan <martun@nil.foundation>
//
// MIT License
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//---------------------------------------------------------------------------//

#ifndef CRYPTO3_PARALLELIZATION_UTILS_HPP
#define CRYPTO3_PARALLELIZATION_UTILS_HPP

#include <future>

#include <nil/actor/core/thread_pool.hpp>

namespace nil {
    namespace crypto3 {

        template<class ReturnType>
        std::vector<ReturnType> wait_for_all(std::vector<std::future<ReturnType>> futures) {
            std::vector<ReturnType> results;
            for (auto& f: futures) {
                results.push_back(f.get());
            }
            return results;
        }

        inline void wait_for_all(std::vector<std::future<void>> futures) {
            for (auto& f: futures) {
                f.get();
            }
        }

        // Divides the work into chunks and calls 'func' on each chunk in parallel.
        template<class ReturnType>
        std::vector<std::future<ReturnType>> parallel_run_in_chunks(
                std::size_t elements_count,
                std::function<ReturnType(std::size_t begin, std::size_t end)> func,
                ThreadPool::PoolLevel pool_id = ThreadPool::PoolLevel::LOW) {

            auto& thread_pool = ThreadPool::get_instance(pool_id);

            std::vector<std::future<ReturnType>> fut;
            std::size_t workers_to_use = std::max((size_t)1, std::min(elements_count, thread_pool.get_pool_size()));

            // For pool #0 we have experimentally found that operations over chunks of fewer than 4096 elements
            // do not keep the cores busy. If the chunks are smaller than that, it is better to use fewer cores.
            static constexpr std::size_t POOL_0_MIN_CHUNK_SIZE = 1 << 12;

            // Pool #0 handles the lowest-level operations, such as polynomial arithmetic.
            // We want each worker to receive at least 'POOL_0_MIN_CHUNK_SIZE' elements, otherwise the cores are underutilized.
            if (pool_id == ThreadPool::PoolLevel::LOW && elements_count / workers_to_use < POOL_0_MIN_CHUNK_SIZE) {
                workers_to_use = elements_count / POOL_0_MIN_CHUNK_SIZE + ((elements_count % POOL_0_MIN_CHUNK_SIZE) ? 1 : 0);
                workers_to_use = std::max((size_t)1, workers_to_use);
            }
            const std::size_t elements_per_worker = elements_count / workers_to_use;

            std::size_t begin = 0;
            for (std::size_t i = 0; i < workers_to_use; i++) {
                auto end = begin + (elements_count - begin) / (workers_to_use - i);
                fut.emplace_back(thread_pool.post<ReturnType>([begin, end, func]() {
                    return func(begin, end);
                }));
                begin = end;
            }
            return fut;
        }

        // Similar to std::transform, but in parallel. We return void here for better usability for our use cases.
        template<class InputIt1, class InputIt2, class OutputIt, class BinaryOperation>
        void parallel_transform(InputIt1 first1, InputIt1 last1, InputIt2 first2,
                                OutputIt d_first, BinaryOperation binary_op,
                                ThreadPool::PoolLevel pool_id = ThreadPool::PoolLevel::LOW) {

            wait_for_all(parallel_run_in_chunks<void>(
                std::distance(first1, last1),
                // We need the lambda to be mutable, to be able to modify iterators captured by value.
                [first1, last1, first2, d_first, binary_op](std::size_t begin, std::size_t end) mutable {
                    std::advance(first1, begin);
                    std::advance(first2, begin);
                    std::advance(d_first, begin);
                    for (std::size_t i = begin; i < end && first1 != last1; i++) {
                        *d_first = binary_op(*first1, *first2);
                        ++first1;
                        ++first2;
                        ++d_first;
                    }
                }, pool_id));
        }

        // Similar to std::transform, but in parallel. We return void here for better usability for our use cases.
        template<class InputIt, class OutputIt, class UnaryOperation>
        void parallel_transform(InputIt first1, InputIt last1,
                                OutputIt d_first, UnaryOperation unary_op,
                                ThreadPool::PoolLevel pool_id = ThreadPool::PoolLevel::LOW) {

            wait_for_all(parallel_run_in_chunks<void>(
                std::distance(first1, last1),
                // We need the lambda to be mutable, to be able to modify iterators captured by value.
                [first1, last1, d_first, unary_op](std::size_t begin, std::size_t end) mutable {
                    std::advance(first1, begin);
                    std::advance(d_first, begin);
                    for (std::size_t i = begin; i < end && first1 != last1; i++) {
                        *d_first = unary_op(*first1);
                        ++first1;
                        ++d_first;
                    }
                }, pool_id));
        }

        // This one is an optimization, since copying field elements is quite slow.
        // BinaryOperation is supposed to modify the object in-place.
        template<class InputIt1, class InputIt2, class BinaryOperation>
        void in_place_parallel_transform(InputIt1 first1, InputIt1 last1, InputIt2 first2,
                                         BinaryOperation binary_op,
                                         ThreadPool::PoolLevel pool_id = ThreadPool::PoolLevel::LOW) {

            wait_for_all(parallel_run_in_chunks<void>(
                std::distance(first1, last1),
                // We need the lambda to be mutable, to be able to modify iterators captured by value.
                [first1, last1, first2, binary_op](std::size_t begin, std::size_t end) mutable {
                    std::advance(first1, begin);
                    std::advance(first2, begin);
                    for (std::size_t i = begin; i < end && first1 != last1; i++) {
                        binary_op(*first1, *first2);
                        ++first1;
                        ++first2;
                    }
                }, pool_id));
        }

        // This one is an optimization, since copying field elements is quite slow.
        // UnaryOperation is supposed to modify the object in-place.
        template<class InputIt, class UnaryOperation>
        void parallel_foreach(InputIt first1, InputIt last1, UnaryOperation unary_op,
                              ThreadPool::PoolLevel pool_id = ThreadPool::PoolLevel::LOW) {

            wait_for_all(parallel_run_in_chunks<void>(
                std::distance(first1, last1),
                // We need the lambda to be mutable, to be able to modify iterators captured by value.
                [first1, last1, unary_op](std::size_t begin, std::size_t end) mutable {
                    std::advance(first1, begin);
                    for (std::size_t i = begin; i < end && first1 != last1; i++) {
                        unary_op(*first1);
                        ++first1;
                    }
                }, pool_id));
        }

        // Calls function func for each value between [start, end).
        inline void parallel_for(std::size_t start, std::size_t end, std::function<void(std::size_t index)> func,
                                 ThreadPool::PoolLevel pool_id = ThreadPool::PoolLevel::LOW) {
            wait_for_all(parallel_run_in_chunks<void>(
                end - start,
                [start, func](std::size_t range_begin, std::size_t range_end) {
                    for (std::size_t i = start + range_begin; i < start + range_end; i++) {
                        func(i);
                    }
                }, pool_id));
        }

    }    // namespace crypto3
}    // namespace nil

#endif // CRYPTO3_PARALLELIZATION_UTILS_HPP
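
For context, a minimal usage sketch of the utilities above (not part of this commit). It assumes the header is installed as <nil/actor/core/parallelization_utils.hpp>, mirroring the thread_pool.hpp path below; the exact include path of this file is not shown in the diff.

// Usage sketch only: demonstrates parallel_for, parallel_run_in_chunks and wait_for_all.
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

#include <nil/actor/core/parallelization_utils.hpp>    // assumed install path, see note above

int main() {
    std::vector<std::uint64_t> values(1 << 20, 3);

    // parallel_for: run func(i) for every index i in [0, values.size()).
    nil::crypto3::parallel_for(0, values.size(), [&values](std::size_t i) {
        values[i] *= 2;
    });

    // parallel_run_in_chunks + wait_for_all: compute a partial sum per chunk on the pool,
    // then reduce the per-chunk results on the calling thread.
    std::vector<std::uint64_t> partial_sums = nil::crypto3::wait_for_all(
        nil::crypto3::parallel_run_in_chunks<std::uint64_t>(
            values.size(),
            [&values](std::size_t begin, std::size_t end) -> std::uint64_t {
                std::uint64_t sum = 0;
                for (std::size_t i = begin; i < end; ++i)
                    sum += values[i];
                return sum;
            }));
    std::uint64_t total = std::accumulate(partial_sums.begin(), partial_sums.end(), std::uint64_t(0));

    // Every element was 3 and got doubled, so the total should be 6 * values.size().
    return total == values.size() * 6 ? 0 : 1;
}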
99 changes: 99 additions & 0 deletions libs/parallelization-utils/include/nil/actor/core/thread_pool.hpp
@@ -0,0 +1,99 @@
//---------------------------------------------------------------------------//
// Copyright (c) 2023 Martun Karapetyan <martun@nil.foundation>
//
// MIT License
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//---------------------------------------------------------------------------//

#ifndef CRYPTO3_THREAD_POOL_HPP
#define CRYPTO3_THREAD_POOL_HPP

#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>

#include <functional>
#include <future>
#include <thread>
#include <limits>
#include <memory>
#include <stdexcept>


namespace nil {
    namespace crypto3 {

        class ThreadPool {
        public:

            enum class PoolLevel {
                LOW,
                HIGH
            };

            /** Returns a thread pool selected by pool_id. The LOW pool is normally used for low-level operations,
             * such as polynomial arithmetic and FFT. Any code that uses those operations and itself needs to run in
             * parallel must submit its tasks to the HIGH pool. Submitting a higher-level task to the low-level pool
             * will immediately result in a deadlock.
             */
            static ThreadPool& get_instance(PoolLevel pool_id, std::size_t pool_size = std::thread::hardware_concurrency()) {
                static ThreadPool instance_for_low_level(pool_size);
                static ThreadPool instance_for_higher_level(pool_size);

                if (pool_id == PoolLevel::LOW)
                    return instance_for_low_level;
                if (pool_id == PoolLevel::HIGH)
                    return instance_for_higher_level;
                throw std::invalid_argument("Invalid instance of thread pool requested.");
            }

            ThreadPool(const ThreadPool& obj) = delete;
            ThreadPool& operator=(const ThreadPool& obj) = delete;

            template<class ReturnType>
            inline std::future<ReturnType> post(std::function<ReturnType()> task) {
                auto packaged_task = std::make_shared<std::packaged_task<ReturnType()>>(std::move(task));
                std::future<ReturnType> fut = packaged_task->get_future();
                boost::asio::post(pool, [packaged_task]() -> void { (*packaged_task)(); });
                return fut;
            }

            // Waits for all the tasks to complete.
            inline void join() {
                pool.join();
            }

            std::size_t get_pool_size() const {
                return pool_size;
            }

        private:
            inline ThreadPool(std::size_t pool_size)
                : pool(pool_size)
                , pool_size(pool_size) {
            }

            boost::asio::thread_pool pool;
            const std::size_t pool_size;

        };

    }    // namespace crypto3
}    // namespace nil

#endif // CRYPTO3_THREAD_POOL_HPP
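
A minimal sketch of using the pool directly (not part of this commit). In practice the parallel_* helpers above are the intended entry point; this only illustrates ThreadPool::get_instance() and post(). The include path matches the file location in this commit.

// Usage sketch only: submit tasks to the LOW pool and collect their futures.
#include <future>
#include <iostream>
#include <vector>

#include <nil/actor/core/thread_pool.hpp>

int main() {
    using nil::crypto3::ThreadPool;

    // The LOW pool is meant for leaf-level work; HIGH is for tasks that may
    // themselves fan out into the LOW pool.
    ThreadPool& pool = ThreadPool::get_instance(ThreadPool::PoolLevel::LOW);

    std::vector<std::future<int>> futures;
    for (int i = 0; i < 8; ++i) {
        futures.emplace_back(pool.post<int>([i]() { return i * i; }));
    }

    int sum = 0;
    for (auto& f : futures)
        sum += f.get();    // blocks until each task has run on the pool

    std::cout << "sum of squares 0..7 = " << sum << std::endl;    // prints 140
    return 0;
}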