Skip to content

Commit

Permalink
io overlap
Browse files Browse the repository at this point in the history
  • Loading branch information
Zijie-Tian committed Sep 27, 2023
1 parent fb44b4b commit fcb3512
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 30 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ MigrationBackup/
/.vs
/out/build/x64-Debug
cscope*
.vscode/

build_linux/
!.github/actions/build
Expand Down
23 changes: 10 additions & 13 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"files.associations": {
"iomanip": "cpp",
"*.tcc": "cpp",
"chrono": "cpp",
"cctype": "cpp",
"clocale": "cpp",
"cmath": "cpp",
Expand All @@ -15,36 +16,33 @@
"any": "cpp",
"array": "cpp",
"atomic": "cpp",
"hash_map": "cpp",
"hash_set": "cpp",
"strstream": "cpp",
"bit": "cpp",
"*.tcc": "cpp",
"bitset": "cpp",
"chrono": "cpp",
"cinttypes": "cpp",
"codecvt": "cpp",
"compare": "cpp",
"complex": "cpp",
"concepts": "cpp",
"condition_variable": "cpp",
"cstdint": "cpp",
"deque": "cpp",
"list": "cpp",
"map": "cpp",
"set": "cpp",
"string": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"vector": "cpp",
"exception": "cpp",
"algorithm": "cpp",
"functional": "cpp",
"iterator": "cpp",
"map": "cpp",
"memory": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"optional": "cpp",
"random": "cpp",
"ratio": "cpp",
"set": "cpp",
"string": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"tuple": "cpp",
Expand All @@ -53,23 +51,22 @@
"fstream": "cpp",
"future": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"limits": "cpp",
"mutex": "cpp",
"new": "cpp",
"numbers": "cpp",
"ostream": "cpp",
"semaphore": "cpp",
"shared_mutex": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"stop_token": "cpp",
"streambuf": "cpp",
"thread": "cpp",
"cfenv": "cpp",
"typeindex": "cpp",
"typeinfo": "cpp"
"typeinfo": "cpp",
"bit": "cpp"
}
}
97 changes: 84 additions & 13 deletions apps/search_disk_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <ctime>
#include <iomanip>
#include <atomic>
#include <vector>

#include "index.h"
#include "iostats.h"
Expand Down Expand Up @@ -73,7 +74,8 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre
const uint32_t num_threads, const uint32_t recall_at, const uint32_t beamwidth,
const uint32_t num_nodes_to_cache, const uint32_t search_io_limit,
const std::vector<uint32_t> &Lvec, const float fail_if_recall_below,
const std::vector<std::string> &query_filters, std::ofstream& csv_stream, const bool use_reorder_data = false)
const std::vector<std::string> &query_filters, std::ofstream& csv_stream,
std::string& profile_perfix, const bool use_reorder_data = false)
{
diskann::cout << "Search parameters: #threads: " << num_threads << ", ";
if (beamwidth <= 0)
Expand All @@ -85,6 +87,12 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre
else
diskann::cout << ", io_limit: " << search_io_limit << "." << std::endl;

std::ofstream cache_hit_rate_file;
cache_hit_rate_file.open(profile_perfix + "_cache_hit_rate.csv", std::ios::out);

std::ofstream nhop_file;
nhop_file.open(profile_perfix + "_nhop.csv", std::ios::out);

std::string warmup_query_file = index_path_prefix + "_sample_data.bin";

// load query bin
Expand Down Expand Up @@ -198,12 +206,12 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre

std::string recall_string = "Recall@" + std::to_string(recall_at);
diskann::cout << std::setw(6) << "L" << std::setw(12) << "Beamwidth" << std::setw(16) << "QPS / thread" << std::setw(16)
<< "Mean Latency" << std::setw(16) << "99.9 Latency" << std::setw(16) << "Mean IOs" << std::setw(16)
<< "Mean Latency" << std::setw(16) << "99.9 Latency" << std::setw(16) << "Mean IOs" << std::setw(16) << "Mean Cache hit Rate" << std::setw(16)
<< "CPU (us)" << std::setw(16) << "ComputeDist (us)" << std::setw(16) << "PQDist (us)" << std::setw(16) << "SinglePQDist (us) & chunks"
<< std::setw(16) << "IO Time(us)" << std::setw(16) << "Mean IO 4k" << std::setw(16) << "Mean Nnbrs" << std::setw(16) << "Mean Ncompdist" ;

csv_stream << "L,Beamwidth,QPS / thread,Mean Latency,99.9 Latency,Mean IOs,CPU (us),ComputeDist (us),"
<< "PQDist (us),SinglePQDist (us) & chunks,IO Time(us),Mean IO 4k,Mean Nnbrs,Mean Ncompdist";
csv_stream << "L,Beamwidth,QPS / thread,Mean Latency,99.9 Latency,Mean IOs,Mean Cache hit Rate,CPU (us),"
<< "ComputeDist (us),PQDist (us),SinglePQDist (us),IO Time(us),Mean IO 4k,Mean Nnbrs,Mean Ncompdist";

if (calc_recall_flag)
{
Expand All @@ -227,6 +235,10 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre

for (uint32_t test_id = 0; test_id < Lvec.size(); test_id++)
{
std::string ids_file = profile_perfix + "_" + std::to_string(Lvec[test_id]) + "_ids.csv";
std::ofstream ids_stream;
ids_stream.open(ids_file, std::ios::out);

uint32_t L = Lvec[test_id];

if (L < recall_at)
Expand Down Expand Up @@ -255,6 +267,7 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre
#pragma omp parallel for schedule(dynamic, 1)
for (int64_t i = 0; i < (int64_t)query_num; i++)
{
// (stats + i) -> iter_ids = new uint32_t[L * optimized_beamwidth];
if (!filtered_search)
{
_pFlashIndex->cached_beam_search(query + (i * query_aligned_dim), recall_at, L,
Expand Down Expand Up @@ -322,6 +335,24 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre
auto mean_nchunks = diskann::get_mean_stats<float>(
stats, query_num, [](const diskann::QueryStats &stats) { return stats.n_chunks; });

auto cache_hit_rate = diskann::get_mean_stats<float>(
stats, query_num, [](const diskann::QueryStats &stats) { return stats.cache_hit_rate; });

std::vector<float> chr_arr;
diskann::get_stats_arr<float>(
stats, query_num, chr_arr, [](const diskann::QueryStats &stats) { return stats.cache_hit_rate; });

std::vector<unsigned> n_cache_hits_arr;
diskann::get_stats_arr<unsigned>(
stats, query_num, n_cache_hits_arr, [](const diskann::QueryStats &stats) { return stats.n_cache_hits; });

std::vector<unsigned> n_cache_misses_arr;
diskann::get_stats_arr<unsigned>(
stats, query_num, n_cache_misses_arr, [](const diskann::QueryStats &stats) { return stats.n_cache_misses; });

std::vector<unsigned> n_hop_arr;
diskann::get_stats_arr<unsigned>(
stats, query_num, n_hop_arr, [](const diskann::QueryStats &stats) { return stats.n_hops; });

double recall = 0;
if (calc_recall_flag)
Expand All @@ -331,13 +362,45 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre
best_recall = std::max(recall, best_recall);
}

diskann::cout << "Cache hit rate distribution: " << std::endl;

for (int i = 0; i < n_cache_hits_arr.size(); i++){
cache_hit_rate_file << n_cache_hits_arr[i] << ",";
}
cache_hit_rate_file << "\n";

for (int i = 0; i < n_cache_misses_arr.size(); i++){
cache_hit_rate_file << n_cache_misses_arr[i] << ",";
}
cache_hit_rate_file << "\n";

for (int i = 0; i < chr_arr.size(); i++){
// diskann::cout << chr_arr[i] << " ";
cache_hit_rate_file << chr_arr[i] << ",";
}
cache_hit_rate_file << "\n" << "\n";

diskann::cout << "Nhop distribution: " << std::endl;
for (int i = 0; i < n_hop_arr.size(); i++){
nhop_file << n_hop_arr[i] << ",";
}
nhop_file << "\n" ;




for (int64_t i = 0; i < (int64_t)query_num; i++) {
ids_stream << i << "," << stats[i].iter_ids << "\n";
}


diskann::cout << std::setw(6) << L << std::setw(12) << optimized_beamwidth << std::setw(16) << qps / num_threads
<< std::setw(16) << mean_latency << std::setw(16) << latency_999 << std::setw(16) << mean_ios
<< std::setw(16) << mean_latency << std::setw(16) << latency_999 << std::setw(16) << mean_ios << std::setw(16) << cache_hit_rate
<< std::setw(16) << mean_cpuus << std::setw(16) << mean_compute_dist_us << std::setw(16) << mean_pqdist_us << std::setw(16) << mean_single_pqdist_us << "," << mean_nchunks
<< std::setw(16) << mean_io_us << std::setw(12) << mean_n_4k << std::setw(16) << mean_nnbrs << std::setw(16) << mean_ndist;

csv_stream << L << "," << optimized_beamwidth << "," << qps / num_threads << "," << mean_latency << ","
<< latency_999 << "," << mean_ios << "," << mean_cpuus << "," << mean_compute_dist_us << ","
<< latency_999 << "," << mean_ios << "," << cache_hit_rate << "," << mean_cpuus << "," << mean_compute_dist_us << ","
<< mean_pqdist_us << "," << mean_single_pqdist_us << "," << mean_io_us << ","
<< mean_n_4k << "," << mean_nnbrs << "," << mean_ndist;

Expand All @@ -350,6 +413,8 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre
diskann::cout << std::endl;
csv_stream << "\n";
}

ids_stream.close();
delete[] stats;
}

Expand All @@ -370,6 +435,10 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre
diskann::aligned_free(query);
if (warmup != nullptr)
diskann::aligned_free(warmup);

cache_hit_rate_file.close();
nhop_file.close();

return best_recall >= fail_if_recall_below ? 0 : -1;
}

Expand Down Expand Up @@ -409,7 +478,7 @@ void run_iostat(std::ofstream &csv_stream, std::atomic_bool& run_profile, int n_
diskann::replace_dstats(curr_dstats, prev_dstats, n_dev);

for (int i = 0; i < n_dev; ++i) {
if (strcmp(io_stats[i]->devname, "nvme3n1") != 0) {
if (strcmp(io_stats[i]->devname, "sda") != 0) {
continue;
}

Expand Down Expand Up @@ -543,6 +612,8 @@ int main(int argc, char **argv)
std::ofstream csv_stream;
csv_stream.open(csv_file, std::ios::out);

std::string perfix = csv_file.substr(0, csv_file.find_last_of("."));


//! 启动iostats测试线程。
std::string iostats = result_path_prefix + "_iostats.csv";
Expand Down Expand Up @@ -612,15 +683,15 @@ int main(int argc, char **argv)
if (data_type == std::string("float"))
search_disk_index<float, uint16_t>(
metric, index_path_prefix, result_path_prefix, query_file, gt_file, num_threads, K, W,
num_nodes_to_cache, search_io_limit, Lvec, fail_if_recall_below, query_filters, csv_stream, use_reorder_data);
num_nodes_to_cache, search_io_limit, Lvec, fail_if_recall_below, query_filters, csv_stream, perfix, use_reorder_data);
else if (data_type == std::string("int8"))
search_disk_index<int8_t, uint16_t>(
metric, index_path_prefix, result_path_prefix, query_file, gt_file, num_threads, K, W,
num_nodes_to_cache, search_io_limit, Lvec, fail_if_recall_below, query_filters, csv_stream, use_reorder_data);
num_nodes_to_cache, search_io_limit, Lvec, fail_if_recall_below, query_filters, csv_stream, perfix, use_reorder_data);
else if (data_type == std::string("uint8"))
search_disk_index<uint8_t, uint16_t>(
metric, index_path_prefix, result_path_prefix, query_file, gt_file, num_threads, K, W,
num_nodes_to_cache, search_io_limit, Lvec, fail_if_recall_below, query_filters, csv_stream, use_reorder_data);
num_nodes_to_cache, search_io_limit, Lvec, fail_if_recall_below, query_filters, csv_stream, perfix, use_reorder_data);
else
{
std::cerr << "Unsupported data type. Use float or int8 or uint8" << std::endl;
Expand All @@ -632,15 +703,15 @@ int main(int argc, char **argv)
if (data_type == std::string("float"))
search_disk_index<float>(metric, index_path_prefix, result_path_prefix, query_file, gt_file,
num_threads, K, W, num_nodes_to_cache, search_io_limit, Lvec,
fail_if_recall_below, query_filters, csv_stream, use_reorder_data);
fail_if_recall_below, query_filters, csv_stream, perfix, use_reorder_data);
else if (data_type == std::string("int8"))
search_disk_index<int8_t>(metric, index_path_prefix, result_path_prefix, query_file, gt_file,
num_threads, K, W, num_nodes_to_cache, search_io_limit, Lvec,
fail_if_recall_below, query_filters, csv_stream, use_reorder_data);
fail_if_recall_below, query_filters, csv_stream, perfix, use_reorder_data);
else if (data_type == std::string("uint8"))
search_disk_index<uint8_t>(metric, index_path_prefix, result_path_prefix, query_file, gt_file,
num_threads, K, W, num_nodes_to_cache, search_io_limit, Lvec,
fail_if_recall_below, query_filters, csv_stream, use_reorder_data);
fail_if_recall_below, query_filters, csv_stream, perfix, use_reorder_data);
else
{
std::cerr << "Unsupported data type. Use float or int8 or uint8" << std::endl;
Expand Down
77 changes: 77 additions & 0 deletions build/benchmarks/search_disk_io_overlap.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#!/bin/bash
# set -x

#! 检查/app路径是否存在
if [ ! -d "/app" ]; then
echo "May not Docker env."
exit 1
fi

base_path="/app/DiskANN/build/data"

shard_path=${base_path}/bigann_shards
search_path=${base_path}/bigann_search_overlap

base_file="/app/data/bigann-1B/base.1B.u8bin"
query_file="/app/data/bigann-1B/query.public.10K.u8bin"


mkdir -p ${search_path}
# rm -f ${search_path}/

# num_points=100000
npts_arr=("100000" "200000" "300000" "400000" "500000")
# npts_arr=("1000000")

#! 10% cached
cache_size_arr=("10000" "20000" "30000" "40000" "50000")
# cache_size_arr=("100000")

N_threads_arr=("4" "8" "16" "32" "64")

# for num_points in "${npts_arr[@]}"; do
for index in "${!npts_arr[@]}"; do
num_points=${npts_arr[$index]}
cache_size=${cache_size_arr[$index]}

#! 这个参数没用了,我用QD去制定了。,但是后面可能还会用上。
B=$(echo "scale=4; ((64 * ${npts_arr[$index]}) + (${cache_size_arr[$index]} * (4 * 64 + 1 * 128))) / 2^30" | bc | awk '{printf "%.4f\n", $0}')
# B=$(echo "scale=4; ((64 * ${npts_arr[$index]}) + 4 * 2) / 2^30" | bc | awk '{printf "%.4f\n", $0}')

# B=${B_arr[$index]}

echo "num_points: ${num_points} cache_size: ${cache_size} B: ${B} T: ${N_threads}"

shard_file=${shard_path}/bigann_shard_${num_points}.data
# ./apps/utils/crop_data uint8 ${base_file} ${search_path}/bigann_shard_${num_points} ${num_points}

if [[ ! -f "$shard_file" ]]; then
echo "$shard_file not exists."
exit -1
fi

if [[ ! -d "${search_path}/${num_points}" ]]; then
mkdir -p ${search_path}/${num_points}

./apps/utils/compute_groundtruth --data_type uint8 --dist_fn l2 --base_file ${shard_file} --query_file ${query_file} --gt_file ${search_path}/bigann_search_${num_points}_gt100 --K 100

./apps/build_disk_index --data_type uint8 --dist_fn l2 --data_path ${shard_file} --index_path_prefix ${search_path}/disk_index_bigann_${num_points}_R64_L75_A1.2 -R 64 -L 75 -T 144 -B ${B} --QD 64 -M 128

mv ${search_path}/disk_index_bigann_${num_points}_R64_L75_A1.2* ${search_path}/${num_points}/
mv ${search_path}/bigann_search_${num_points}_gt100 ${search_path}/${num_points}/
fi

# if [[ ! -d "${search_path}/${num_points}/search_res" ]]; then
# mkdir -p ${search_path}/${num_points}/search_res
# fi

for T_id in "${!N_threads_arr[@]}"; do
N_threads=${N_threads_arr[$T_id]}
if [[ ! -d "${search_path}/${num_points}/${N_threads}/search_res" ]]; then
mkdir -p ${search_path}/${num_points}/${N_threads}/search_res
fi

./apps/search_disk_index --data_type uint8 --dist_fn l2 --index_path_prefix ${search_path}/${num_points}/disk_index_bigann_${num_points}_R64_L75_A1.2 --query_file ${query_file} --gt_file ${search_path}/${num_points}/bigann_search_${num_points}_gt100 -K 10 -T ${N_threads} -W 2 -L 10 20 30 40 50 100 --result_path ${search_path}/${num_points}/${N_threads}/search_res/res --csv_file ${search_path}/${num_points}/${N_threads}/search_stats.csv --num_nodes_to_cache ${cache_size} 1> ${search_path}/${num_points}/${N_threads}/search_disk_index_bigann_${num_points}_T${N_threads}_R64_L75_A1.2.log 2>&1
# gdb --args ./apps/search_disk_index --data_type uint8 --dist_fn l2 --index_path_prefix ${search_path}/${num_points}/disk_index_bigann_${num_points}_R64_L75_A1.2 --query_file ${query_file} --gt_file ${search_path}/${num_points}/bigann_search_${num_points}_gt100 -K 10 -T ${N_threads} -W 2 -L 10 20 30 40 50 100 --result_path ${search_path}/${num_points}/${N_threads}/search_res/res --csv_file ${search_path}/${num_points}/${N_threads}/search_stats.csv --num_nodes_to_cache ${cache_size}
done
done
Loading

0 comments on commit fcb3512

Please sign in to comment.