Skip to content

Commit

Permalink
support tracking updated aggregated states (#465)
Browse files Browse the repository at this point in the history
* support tracking bucket changes in two level hash table

* add expanded data in aggregate state to track updated and retracted

* fix unstable smoke test

* fix comments
 * remove compile aggregate functions for streaming query now
 * remove no_more_keys
 * remove overflow_rows
 * move out refactor code for retract impl

* fix comments 2
 * rename UpdatesTrackingData.h to TrackingUpdatesData.h
 * use temp arena instread of shared ptr of arena
  • Loading branch information
yl-lisen authored Feb 3, 2024
1 parent e08e71d commit 5dc02c0
Show file tree
Hide file tree
Showing 29 changed files with 1,587 additions and 2,066 deletions.
26 changes: 23 additions & 3 deletions src/Common/HashMapsTemplate.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
#include <Common/ColumnsHashing.h>
#include <Common/HashTable/FixedHashMap.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/StringHashTable.h>
#include <Common/HashTable/StringHashMap.h>
#include <Common/HashTable/TwoLevelStringHashMap.h>

namespace DB
{
Expand All @@ -24,9 +25,14 @@ void serializeHashMap(const Map & map, MappedSerializer && mapped_serializer, Wr
});
}

template <bool is_string_hash_map, typename Map, typename MappedDeserializer>
template <typename Map, typename MappedDeserializer>
void deserializeHashMap(Map & map, MappedDeserializer && mapped_deserializer, Arena & pool, ReadBuffer & rb)
{
using Mapped = std::decay_t<Map>::mapped_type;

constexpr bool is_string_hash_map
= std::is_same_v<std::decay_t<Map>, StringHashMap<Mapped>> || std::is_same_v<std::decay_t<Map>, TwoLevelStringHashMap<Mapped>>;

/// For StringHashMap or TwoLevelStringHashMap, it requires StringRef key padded 8 keys(left and right).
/// So far, the Arena's MemoryChunk is always padding right 15, so we just pad left 8 here
if constexpr (is_string_hash_map)
Expand Down Expand Up @@ -60,6 +66,20 @@ void deserializeHashMap(Map & map, MappedDeserializer && mapped_deserializer, Ar
pool.setPaddingLeft(0);
}

template <typename Map, typename MappedSerializer>
void serializeTwoLevelHashMap(const Map & map, MappedSerializer && mapped_serializer, WriteBuffer & wb)
{
serializeHashMap<Map, MappedSerializer>(map, std::move(mapped_serializer), wb);
map.writeUpdatedBuckets(wb);
}

template <typename Map, typename MappedDeserializer>
void deserializeTwoLevelHashMap(Map & map, MappedDeserializer && mapped_deserializer, Arena & pool, ReadBuffer & rb)
{
deserializeHashMap<Map, MappedDeserializer>(map, std::move(mapped_deserializer), pool, rb);
map.readUpdatedBuckets(rb); /// recover buckets updated status
}

/// HashMapsTemplate is a taken from HashJoin class and make it standalone
/// and could be shared among different components

Expand Down Expand Up @@ -187,7 +207,7 @@ struct HashMapsTemplate
#define M(NAME) \
case HashType::NAME: { \
assert(NAME); \
deserializeHashMap<false>(*NAME, mapped_deserializer, pool, rb); \
deserializeHashMap(*NAME, mapped_deserializer, pool, rb); \
return; \
}
APPLY_FOR_HASH_KEY_VARIANTS(M)
Expand Down
14 changes: 14 additions & 0 deletions src/Common/HashTable/TimeBucketHashMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ class TimeBucketHashMapTable
p.second.forEachValue(func);
}

template <typename Func>
void ALWAYS_INLINE forEachValueOfUpdatedBuckets(Func && func, bool reset_updated = false)
{
for (auto & p : this->impls)
{
if (this->isBucketUpdated(p.first))
{
p.second.forEachValue(func);
if (reset_updated)
this->resetUpdatedBucket(p.first);
}
}
}

typename Cell::Mapped & ALWAYS_INLINE operator[](const Key & x)
{
LookupResult it;
Expand Down
57 changes: 57 additions & 0 deletions src/Common/HashTable/TimeBucketHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
using ConstLookupResult = typename Impl::ConstLookupResult;

/// FIXME, choose a better perf data structure
/// Usually we don't have too many time buckets
std::map<Int64, Impl> impls;
std::unordered_map<Int64, bool/*updated*/> updated_buckets;
Impl sentinel;

TimeBucketHashTable() { }
Expand Down Expand Up @@ -263,6 +265,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
{
auto window = windowKey(key_holder);
impls[window].emplace(key_holder, it, inserted, hash_value);
updated_buckets[window] = true; /// updated
}

LookupResult ALWAYS_INLINE find(Key x, size_t hash_value)
Expand All @@ -289,6 +292,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
{
DB::writeIntBinary(p.first);
p.second.write(wb);
DB::writeBinary(updated_buckets[p.first], wb);
}
}

Expand All @@ -309,7 +313,12 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
/// Write key and key-value separator
DB::writeIntText(p.first, wb);
DB::writeChar(KEY_VALUE_SEPARATOR, wb);
/// <impl,updated>
DB::writeChar('<', wb);
p.second.writeText(wb);
DB::writeChar(',', wb);
DB::writeBoolText(updated_buckets[p.first], wb);
DB::writeChar('>', wb);
}
DB::writeChar(END_BUCKET_MARKER, wb);
}
Expand All @@ -327,6 +336,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
assert(key != 0);
assert(!impls.contains(key));
impls[key].read(rb);
DB::readBinary(updated_buckets[key], rb);
}
}

Expand All @@ -349,7 +359,12 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty

assert(key != 0);
assert(!impls.contains(key));
/// <impl,updated>
DB::assertChar('<', rb);
impls[key].readText(rb);
DB::assertChar(',', rb);
DB::readBoolText(updated_buckets[key], rb);
DB::assertChar('>', rb);
}
DB::assertChar(END_BUCKET_MARKER, rb);
}
Expand Down Expand Up @@ -402,6 +417,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
last_removed_watermark = it->first;
++removed;

updated_buckets.erase(it->first);
it = impls.erase(it);
}
else
Expand Down Expand Up @@ -438,4 +454,45 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty

return buckets;
}

bool isBucketUpdated(Int64 bucket_) const
{
auto it = updated_buckets.find(bucket_);
if (it != updated_buckets.end())
return it->second;

return false;
}

void resetUpdatedBucket(Int64 bucket_)
{
auto it = updated_buckets.find(bucket_);
if (it != updated_buckets.end())
it->second = false;
}

void writeUpdatedBuckets(DB::WriteBuffer & wb) const
{
DB::writeVarUInt(updated_buckets.size(), wb);
for (const auto & [bucket, updated] : updated_buckets)
{
DB::writeIntBinary(bucket, wb);
DB::writeBinary(updated, wb);
}
}

void readUpdatedBuckets(DB::ReadBuffer & rb)
{
size_t size = 0;
DB::readVarUInt(size, rb);
updated_buckets.clear();
Int64 bucket = 0;
bool updated = false;
for (size_t i = 0; i < size; ++i)
{
DB::readIntBinary(bucket, rb);
DB::readBinary(updated, rb);
updated_buckets.emplace(bucket, updated);
}
}
};
14 changes: 14 additions & 0 deletions src/Common/HashTable/TwoLevelHashMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ class TwoLevelHashMapTable : public TwoLevelHashTable<Key, Cell, Hash, Grower, A
this->impls[i].forEachValue(func);
}

template <typename Func>
void ALWAYS_INLINE forEachValueOfUpdatedBuckets(Func && func, bool reset_updated = false)
{
for (auto i = 0u; i < this->NUM_BUCKETS; ++i)
{
if (this->isBucketUpdated(i))
{
this->impls[i].forEachValue(func);
if (reset_updated)
this->resetUpdatedBucket(i);
}
}
}

template <typename Func>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{
Expand Down
47 changes: 47 additions & 0 deletions src/Common/HashTable/TwoLevelHashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class TwoLevelHashTable :
using ConstLookupResult = typename Impl::ConstLookupResult;

Impl impls[NUM_BUCKETS];
bool updated_buckets[NUM_BUCKETS] = {false};


TwoLevelHashTable() = default;
Expand Down Expand Up @@ -119,6 +120,7 @@ class TwoLevelHashTable :
size_t hash_value = cell->getHash(src);
size_t buck = getBucketFromHash(hash_value);
impls[buck].insertUniqueNonZero(cell, hash_value);
updated_buckets[buck] = true;
}
}

Expand Down Expand Up @@ -271,6 +273,7 @@ class TwoLevelHashTable :
{
size_t buck = getBucketFromHash(hash_value);
impls[buck].emplace(key_holder, it, inserted, hash_value);
updated_buckets[buck] = true;
}

LookupResult ALWAYS_INLINE find(Key x, size_t hash_value)
Expand All @@ -292,7 +295,10 @@ class TwoLevelHashTable :
void write(DB::WriteBuffer & wb) const
{
for (UInt32 i = 0; i < NUM_BUCKETS; ++i)
{
impls[i].write(wb);
DB::writeBinary(updated_buckets[i], wb);
}
}

void writeText(DB::WriteBuffer & wb) const
Expand All @@ -301,14 +307,23 @@ class TwoLevelHashTable :
{
if (i != 0)
DB::writeChar(',', wb);

/// <impl,updated>
DB::writeChar('<', wb);
impls[i].writeText(wb);
DB::writeChar(',', wb);
DB::writeBoolText(updated_buckets[i], wb);
DB::writeChar('>', wb);
}
}

void read(DB::ReadBuffer & rb)
{
for (UInt32 i = 0; i < NUM_BUCKETS; ++i)
{
impls[i].read(rb);
DB::readBinary(updated_buckets[i], rb);
}
}

void readText(DB::ReadBuffer & rb)
Expand All @@ -317,7 +332,13 @@ class TwoLevelHashTable :
{
if (i != 0)
DB::assertChar(',', rb);

/// <impl,updated>
DB::assertChar('<', rb);
impls[i].readText(rb);
DB::assertChar(',', rb);
DB::readBoolText(updated_buckets[i], rb);
DB::assertChar('>', rb);
}
}

Expand Down Expand Up @@ -365,5 +386,31 @@ class TwoLevelHashTable :
std::iota(bucket_ids.begin(), bucket_ids.end(), 0);
return bucket_ids;
}

bool isBucketUpdated(Int64 bucket_) const
{
return updated_buckets[bucket_];
}

void resetUpdatedBucket(Int64 bucket_)
{
updated_buckets[bucket_] = false;
}

void writeUpdatedBuckets(DB::WriteBuffer & wb) const
{
DB::writeVarUInt(NUM_BUCKETS, wb);
for (const auto & elem : updated_buckets)
DB::writeBinary(elem, wb);
}

void readUpdatedBuckets(DB::ReadBuffer & rb)
{
size_t size = 0;
DB::readVarUInt(size, rb);
assert(size == NUM_BUCKETS);
for (auto & elem : updated_buckets)
DB::readBinary(elem, rb);
}
/// proton : ends
};
14 changes: 14 additions & 0 deletions src/Common/HashTable/TwoLevelStringHashMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ class TwoLevelStringHashMap : public TwoLevelStringHashTable<StringHashMapSubMap
this->impls[i].forEachValue(func);
}

template <typename Func>
void ALWAYS_INLINE forEachValueOfUpdatedBuckets(Func && func, bool reset_updated = false)
{
for (auto i = 0u; i < this->NUM_BUCKETS; ++i)
{
if (this->isBucketUpdated(i))
{
this->impls[i].forEachValue(func);
if (reset_updated)
this->resetUpdatedBucket(i);
}
}
}

template <typename Func>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{
Expand Down
Loading

0 comments on commit 5dc02c0

Please sign in to comment.