Skip to content

Commit

Permalink
[BugFix] Setting max_compaction_concurrency to -1/0 via dynamic param…
Browse files Browse the repository at this point in the history
…eter is not as expected. (#50875)

Signed-off-by: edwinhzhang <edwinhzhang@tencent.com>
Signed-off-by: zhanghe <edwinhzhang@tencent.com>
Signed-off-by: zhanghe <18700939@qq.com>
Co-authored-by: wyb <wybb86@gmail.com>
  • Loading branch information
zhangheihei and wyb authored Oct 22, 2024
1 parent 6080065 commit 6e55970
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 24 deletions.
45 changes: 43 additions & 2 deletions be/src/storage/compaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void CompactionManager::schedule() {

st = ThreadPoolBuilder("compact_pool")
.set_min_threads(1)
.set_max_threads(std::max(1, max_task_num()))
.set_max_threads(std::max(1, _max_task_num))
.set_max_queue_size(1000)
.build(&_compaction_pool);
DCHECK(st.ok());
Expand Down Expand Up @@ -575,9 +575,50 @@ std::unordered_set<CompactionTask*> CompactionManager::get_running_task(const Ta
return res;
}

int32_t CompactionManager::compute_max_compaction_task_num() const {
int32_t max_task_num = 0;
// new compaction framework
if (config::base_compaction_num_threads_per_disk >= 0 && config::cumulative_compaction_num_threads_per_disk >= 0) {
max_task_num = static_cast<int32_t>(
StorageEngine::instance()->get_store_num() *
(config::cumulative_compaction_num_threads_per_disk + config::base_compaction_num_threads_per_disk));
} else {
// When cumulative_compaction_num_threads_per_disk or config::base_compaction_num_threads_per_disk is less than 0,
// there is no limit to _max_task_num if max_compaction_concurrency is also less than 0, and here we set maximum value to be 20.
max_task_num = std::min(20, static_cast<int32_t>(StorageEngine::instance()->get_store_num() * 5));
}

{
std::lock_guard lg(_compact_threads_mutex);
if (_max_compaction_concurrency > 0 && _max_compaction_concurrency < max_task_num) {
max_task_num = _max_compaction_concurrency;
}
}

return max_task_num;
}

void CompactionManager::set_max_compaction_concurrency(int threads_num) {
std::lock_guard lg(_compact_threads_mutex);
_max_compaction_concurrency = threads_num;
}

Status CompactionManager::update_max_threads(int max_threads) {
if (_compaction_pool != nullptr) {
return _compaction_pool->update_max_threads(max_threads);
int32 max_thread_num = 0;
set_max_compaction_concurrency(max_threads);
{
std::lock_guard lg(_tasks_mutex);
if (max_threads == 0) {
_max_task_num = 0;
return Status::OK();
}

_max_task_num = compute_max_compaction_task_num();
max_thread_num = _max_task_num;
}

return _compaction_pool->update_max_threads(std::max(1, max_thread_num));
} else {
return Status::InternalError("Thread pool not exist");
}
Expand Down
16 changes: 14 additions & 2 deletions be/src/storage/compaction_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ class CompactionManager {
return exceed;
}

int32_t max_task_num() { return _max_task_num; }
int32_t max_task_num() const {
std::lock_guard lg(_tasks_mutex);
return _max_task_num;
}

uint16_t running_cumulative_tasks_num_for_dir(DataDir* data_dir) {
std::lock_guard lg(_tasks_mutex);
Expand All @@ -112,6 +115,10 @@ class CompactionManager {

Status update_max_threads(int max_threads);

int32_t compute_max_compaction_task_num() const;

void set_max_compaction_concurrency(int threads_num);

double max_score();

double last_score();
Expand All @@ -128,6 +135,8 @@ class CompactionManager {

int get_waiting_task_num();

ThreadPool* TEST_get_compaction_thread_pool() { return _compaction_pool.get(); }

void disable_table_compaction(int64_t table_id, int64_t deadline);

private:
Expand All @@ -149,7 +158,7 @@ class CompactionManager {
// protect by _mutex
std::set<CompactionCandidate, CompactionCandidateComparator> _compaction_candidates;

std::mutex _tasks_mutex;
mutable std::mutex _tasks_mutex;
std::atomic<uint64_t> _next_task_id;
std::map<int64_t, std::unordered_set<CompactionTask*>> _running_tasks;
std::unordered_map<DataDir*, uint16_t> _data_dir_to_cumulative_task_num_map;
Expand Down Expand Up @@ -177,6 +186,9 @@ class CompactionManager {

std::unique_ptr<ThreadPool> _compaction_pool = nullptr;
std::thread _scheduler_thread;

mutable std::mutex _compact_threads_mutex;
int32_t _max_compaction_concurrency = 0;
};

} // namespace starrocks
17 changes: 2 additions & 15 deletions be/src/storage/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,21 +180,8 @@ Status StorageEngine::start_bg_threads() {
}
}
} else {
int32_t max_task_num = 0;
// new compaction framework
if (config::base_compaction_num_threads_per_disk >= 0 &&
config::cumulative_compaction_num_threads_per_disk >= 0) {
max_task_num = static_cast<int32_t>(StorageEngine::instance()->get_store_num() *
(config::cumulative_compaction_num_threads_per_disk +
config::base_compaction_num_threads_per_disk));
} else {
// When cumulative_compaction_num_threads_per_disk or config::base_compaction_num_threads_per_disk is less than 0,
// there is no limit to _max_task_num if max_compaction_concurrency is also less than 0, and here we set maximum value to be 20.
max_task_num = std::min(20, static_cast<int32_t>(StorageEngine::instance()->get_store_num() * 5));
}
if (config::max_compaction_concurrency > 0 && config::max_compaction_concurrency < max_task_num) {
max_task_num = config::max_compaction_concurrency;
}
_compaction_manager->set_max_compaction_concurrency(config::max_compaction_concurrency);
int32_t max_task_num = _compaction_manager->compute_max_compaction_task_num();

(void)Compaction::init(max_task_num);

Expand Down
42 changes: 42 additions & 0 deletions be/test/storage/compaction_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ class CompactionManagerTest : public testing::Test {
ASSERT_TRUE(fs::remove_all(config::storage_root_path).ok());
}
config::storage_root_path = _default_storage_root_path;
config::max_compaction_concurrency = -1;
config::enable_event_based_compaction_framework = true;
config::max_compaction_candidate_num = 40960;
config::cumulative_compaction_num_threads_per_disk = 1;
config::base_compaction_num_threads_per_disk = 1;
}

protected:
Expand Down Expand Up @@ -360,4 +365,41 @@ TEST_F(CompactionManagerTest, test_compaction_parallel) {
ASSERT_EQ(0, _engine->compaction_manager()->running_tasks_num());
}

TEST_F(CompactionManagerTest, test_compaction_update_thread_pool_num) {
config::max_compaction_concurrency = 10;
config::cumulative_compaction_num_threads_per_disk = 2;
config::base_compaction_num_threads_per_disk = 2;
_engine->compaction_manager()->set_max_compaction_concurrency(config::max_compaction_concurrency);
int32_t compaction_concurrency = _engine->compaction_manager()->compute_max_compaction_task_num();
EXPECT_EQ(4, compaction_concurrency);

config::cumulative_compaction_num_threads_per_disk = 0;
config::base_compaction_num_threads_per_disk = 0;
_engine->compaction_manager()->set_max_compaction_concurrency(config::max_compaction_concurrency);
compaction_concurrency = _engine->compaction_manager()->compute_max_compaction_task_num();
EXPECT_EQ(0, compaction_concurrency);

config::cumulative_compaction_num_threads_per_disk = -1;
config::base_compaction_num_threads_per_disk = -1;
_engine->compaction_manager()->set_max_compaction_concurrency(config::max_compaction_concurrency);
compaction_concurrency = _engine->compaction_manager()->compute_max_compaction_task_num();
EXPECT_EQ(5, compaction_concurrency);

_engine->compaction_manager()->init_max_task_num(compaction_concurrency);
_engine->compaction_manager()->schedule();
EXPECT_EQ(5, _engine->compaction_manager()->TEST_get_compaction_thread_pool()->max_threads());

_engine->compaction_manager()->update_max_threads(3);
EXPECT_EQ(3, _engine->compaction_manager()->TEST_get_compaction_thread_pool()->max_threads());
EXPECT_EQ(3, _engine->compaction_manager()->max_task_num());

_engine->compaction_manager()->update_max_threads(0);
EXPECT_EQ(3, _engine->compaction_manager()->TEST_get_compaction_thread_pool()->max_threads());
EXPECT_EQ(0, _engine->compaction_manager()->max_task_num());

_engine->compaction_manager()->update_max_threads(-1);
EXPECT_EQ(5, _engine->compaction_manager()->TEST_get_compaction_thread_pool()->max_threads());
EXPECT_EQ(5, _engine->compaction_manager()->max_task_num());
}

} // namespace starrocks
2 changes: 1 addition & 1 deletion docs/en/administration/management/BE_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ curl http://<BE_IP>:<BE_HTTP_PORT>/varz
- Type: Int
- Unit: -
- Is mutable: Yes
- Description: The maximum concurrency of compactions (including both Base Compaction and Cumulative Compaction). The value `-1` indicates that no limit is imposed on the concurrency. `0` indicates disabling compaction.
- Description: The maximum concurrency of compactions (including both Base Compaction and Cumulative Compaction). The value `-1` indicates that no limit is imposed on the concurrency. `0` indicates disabling compaction. This parameter is mutable when the Event-based Compaction Framework is enabled.
- Introduced in: -

##### compaction_trace_threshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ Explanation:

* **BE Configuration**

| Name | Default| Description|
| Name | Default| Description|
| --- | --- | --- |
| vector_chunk_size | 4096 | Number of chunk rows |
| mem_limit | 90% | BE process memory upper limit. You can set it as a percentage ("80%") or a physical limit ("100G"). The default hard limit is 90% of the server's memory size, and the soft limit is 80%. You need to configure this parameter if you want to deploy StarRocks with other memory-intensive services on a same server. |
| disable_storage_page_cache | false | The boolean value to control whether to disable PageCache. When PageCache is enabled, StarRocks caches the recently scanned data. PageCache can significantly improve the query performance when similar queries are repeated frequently. `true` indicates to disable PageCache. Use this item together with `storage_page_cache_limit`, you can accelerate query performance in scenarios with sufficient memory resources and much data scan. The default value of this item has been changed from `true` to `false` since StarRocks v2.4. |
| write_buffer_size | 104857600 | The capacity limit of a single MemTable, exceeding which a disk swipe will be performed. |
| load_process_max_memory_limit_bytes | 107374182400 | The upper limit of memory resources that can be taken up by all load processes on a BE node. Its value is the smaller one between `mem_limit * load_process_max_memory_limit_percent / 100` and `load_process_max_memory_limit_bytes`. If this threshold is exceeded, a flush and backpressure will be triggered. |
| load_process_max_memory_limit_percent | 30 | The maximum percentage of memory resources that can be taken up by all load processes on a BE node. Its value is the smaller one between `mem_limit * load_process_max_memory_limit_percent / 100` and `load_process_max_memory_limit_bytes`. If this threshold is exceeded, a flush and backpressure will be triggered. |
| default_load_mem_limit | 2147483648 | If the memory limit on the receiving side is reached for a single import instance, a disk swipe will be triggered. This needs to be modified with the Session variable `load_mem_limit` to take effect. |
| default_load_mem_limit | 2147483648 | If the memory limit on the receiving side is reached for a single import instance, a disk swipe will be triggered. This needs to be modified with the Session variable `load_mem_limit` to take effect. This parameter is mutable when the Event-based Compaction Framework is enabled.|
| max_compaction_concurrency | -1 | The maximum concurrency of compactions (both Base Compaction and Cumulative Compaction). The value -1 indicates that no limit is imposed on the concurrency. |
| cumulative_compaction_check_interval_seconds | 1 | Interval of compaction check|

Expand Down
2 changes: 1 addition & 1 deletion docs/zh/administration/management/BE_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ curl http://<BE_IP>:<BE_HTTP_PORT>/varz
- 类型:Int
- 单位:-
- 是否动态:是
- 描述:Compaction 线程数上限(即 BaseCompaction + CumulativeCompaction 的最大并发)。该参数防止 Compaction 占用过多内存。 `-1` 代表没有限制。`0` 表示禁用 Compaction。
- 描述:Compaction 线程数上限(即 BaseCompaction + CumulativeCompaction 的最大并发)。该参数防止 Compaction 占用过多内存。 `-1` 代表没有限制。`0` 表示禁用 Compaction。开启 Event-based Compaction Framework 时,该参数才支持动态设置。
- 引入版本:-

##### compaction_trace_threshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ StarRocks BE 中的内存分为以下几类。
| consistency_max_memory_limit_percent | 20 | 一致性校验任务使用的内存上限,取 mem_limit * consistency_max_memory_limit_percent / 100 和 consistency_max_memory_limit 中较小的值。内存使用超限,会导致一致性校验任务失败。 |
| consistency_max_memory_limit | 10G | 一致性校验任务使用的内存上限,取 mem_limit * consistency_max_memory_limit_percent / 100 和 consistency_max_memory_limit 中较小的值。内存使用超限,会导致一致性校验任务失败。 |
| memory_limitation_per_thread_for_schema_change | 2 | 单个 Schema Change 任务的内存使用上限,内存使用超限,会导致 Schema Change 任务失败。 |
| max_compaction_concurrency | -1 | Compaction 线程数上限(即 BaseCompaction + CumulativeCompaction 的最大并发)。该参数防止 Compaction 占用过多内存。 -1 代表没有限制。0 表示不允许 compaction。 |
| max_compaction_concurrency | -1 | Compaction 线程数上限(即 BaseCompaction + CumulativeCompaction 的最大并发)。该参数防止 Compaction 占用过多内存。 -1 代表没有限制。0 表示不允许 compaction。开启 Event-based Compaction Framework 时,该参数才支持动态设置。 |

### Session 变量

Expand Down

0 comments on commit 6e55970

Please sign in to comment.