diff --git a/cmd/immudb/command/init.go b/cmd/immudb/command/init.go index 542e01764c..97619118d3 100644 --- a/cmd/immudb/command/init.go +++ b/cmd/immudb/command/init.go @@ -44,6 +44,7 @@ func (cl *Commandline) setupFlags(cmd *cobra.Command, options *server.Options) { cmd.Flags().Bool("replication-allow-tx-discarding", options.ReplicationOptions.AllowTxDiscarding, "allow precommitted transactions to be discarded if the replica diverges from the primary") cmd.Flags().Bool("replication-skip-integrity-check", options.ReplicationOptions.SkipIntegrityCheck, "disable integrity check when reading data during replication") cmd.Flags().Bool("replication-wait-for-indexing", options.ReplicationOptions.WaitForIndexing, "wait for indexing to be up to date during replication") + cmd.Flags().Int("shared-index-cache-size", options.SharedIndexCacheSize, "size (in bytes) of shared index cache") cmd.PersistentFlags().StringVar(&cl.config.CfgFn, "config", "", "config file (default path are configs or $HOME. Default filename is immudb.toml)") cmd.Flags().String("pidfile", options.Pidfile, "pid path with filename e.g. /var/run/immudb.pid") diff --git a/embedded/sql/functions.go b/embedded/sql/functions.go index d7eebddcd9..d922c620a1 100644 --- a/embedded/sql/functions.go +++ b/embedded/sql/functions.go @@ -144,7 +144,7 @@ func (f *SubstringFn) requiresType(t SQLValueType, cols map[string]ColDescriptor func (f *SubstringFn) Apply(tx *SQLTx, params []TypedValue) (TypedValue, error) { if len(params) != 3 { - return nil, fmt.Errorf("%w: '%s' function does expects one argument but %d were provided", ErrIllegalArguments, SubstringFnCall, len(params)) + return nil, fmt.Errorf("%w: '%s' function expects three arguments but %d were provided", ErrIllegalArguments, SubstringFnCall, len(params)) } v1, v2, v3 := params[0], params[1], params[2] @@ -165,6 +165,10 @@ func (f *SubstringFn) Apply(tx *SQLTx, params []TypedValue) (TypedValue, error) return nil, fmt.Errorf("%w: parameter 'length' cannot be negative", ErrIllegalArguments) } + if pos-1 >= int64(len(s)) { + return &Varchar{val: ""}, nil + } + end := pos - 1 + length if end > int64(len(s)) { end = int64(len(s)) diff --git a/embedded/store/immustore.go b/embedded/store/immustore.go index 64970c4f27..648c66cff9 100644 --- a/embedded/store/immustore.go +++ b/embedded/store/immustore.go @@ -43,6 +43,7 @@ import ( "github.com/codenotary/immudb/embedded/multierr" "github.com/codenotary/immudb/embedded/tbtree" "github.com/codenotary/immudb/embedded/watchers" + "github.com/codenotary/immudb/pkg/helpers/semaphore" "github.com/codenotary/immudb/pkg/helpers/slices" ) @@ -65,6 +66,7 @@ var ErrCannotUpdateKeyTransiency = errors.New("cannot change a non-transient key var ErrMaxActiveTransactionsLimitExceeded = errors.New("max active transactions limit exceeded") var ErrMVCCReadSetLimitExceeded = errors.New("MVCC read-set limit exceeded") var ErrMaxConcurrencyLimitExceeded = errors.New("max concurrency limit exceeded") +var ErrMaxIndexersLimitExceeded = errors.New("max indexers limit exceeded") var ErrPathIsNotADirectory = errors.New("path is not a directory") var ErrCorruptedTxData = errors.New("tx data is corrupted") var ErrCorruptedTxDataMaxTxEntriesExceeded = fmt.Errorf("%w: maximum number of TX entries exceeded", ErrCorruptedTxData) @@ -204,8 +206,8 @@ type ImmuStore struct { waiteesMutex sync.Mutex waiteesCount int // current number of go-routines waiting for a tx to be indexed or committed - _txbs []byte // pre-allocated buffer to support tx serialization - _valBs []byte
// pre-allocated buffer to support tx exportation + _txbs []byte // pre-allocated buffer to support tx serialization + _valBs [DefaultMaxValueLen]byte // pre-allocated buffer to support tx exportation _valBsMux sync.Mutex aht *ahtree.AHtree @@ -213,8 +215,12 @@ type ImmuStore struct { durablePrecommitWHub *watchers.WatchersHub commitWHub *watchers.WatchersHub - indexers map[[sha256.Size]byte]*indexer - indexersMux sync.RWMutex + indexers map[[sha256.Size]byte]*indexer + nextIndexerID uint32 + indexCache *cache.Cache + + memSemaphore *semaphore.Semaphore // used by indexers to control amount acquired of memory + indexersMux sync.RWMutex opts *Options @@ -648,10 +654,10 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable writeTxHeaderVersion: opts.WriteTxHeaderVersion, - timeFunc: opts.TimeFunc, - multiIndexing: opts.MultiIndexing, - indexers: make(map[[sha256.Size]byte]*indexer), - + timeFunc: opts.TimeFunc, + multiIndexing: opts.MultiIndexing, + indexers: make(map[[sha256.Size]byte]*indexer), + memSemaphore: semaphore.New(uint64(opts.IndexOpts.MaxGlobalBufferedDataSize)), useExternalCommitAllowance: opts.UseExternalCommitAllowance, commitAllowedUpToTxID: committedTxID, @@ -663,7 +669,6 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable txPool: txPool, _txbs: txbs, - _valBs: make([]byte, maxValueLen), opts: opts, @@ -849,6 +854,18 @@ func (s *ImmuStore) InitIndexing(spec *IndexSpec) error { indexPath = filepath.Join(s.path, fmt.Sprintf("%s_%s", indexDirname, encPrefix)) } + if s.indexCache == nil { + if indexFactoryFunc := s.opts.IndexOpts.CacheFactory; indexFactoryFunc != nil { + s.indexCache = indexFactoryFunc() + } else { + c, err := cache.NewCache(s.opts.IndexOpts.CacheSize) + if err != nil { + return err + } + s.indexCache = c + } + } + indexer, err := newIndexer(indexPath, s, s.opts) if err != nil { return fmt.Errorf("%w: could not open indexer", err) @@ -862,7 +879,6 @@ func (s *ImmuStore) InitIndexing(spec *IndexSpec) error { } s.indexers[indexPrefix] = indexer - indexer.init(spec) return nil @@ -2666,7 +2682,15 @@ func (s *ImmuStore) ExportTx(txID uint64, allowPrecommitted bool, skipIntegrityC // val // TODO: improve value reading implementation, get rid of _valBs s._valBsMux.Lock() - _, err = s.readValueAt(s._valBs[:e.vLen], e.vOff, e.hVal, skipIntegrityCheck) + + var valBuf []byte + if e.vLen > len(s._valBs) { + valBuf = make([]byte, e.vLen) + } else { + valBuf = s._valBs[:e.vLen] + } + + _, err = s.readValueAt(valBuf, e.vOff, e.hVal, skipIntegrityCheck) if err != nil && !errors.Is(err, io.EOF) { s._valBsMux.Unlock() return nil, err @@ -2687,7 +2711,7 @@ func (s *ImmuStore) ExportTx(txID uint64, allowPrecommitted bool, skipIntegrityC } // val - _, err = buf.Write(s._valBs[:e.vLen]) + _, err = buf.Write(valBuf) if err != nil { s._valBsMux.Unlock() return nil, err @@ -3006,7 +3030,6 @@ func (s *ImmuStore) appendableReaderForTx(txID uint64, allowPrecommitted bool) ( } else { txr = &slicedReaderAt{bs: txbs.([]byte), off: txOff} } - return appendable.NewReaderFrom(txr, txOff, txSize), nil } diff --git a/embedded/store/indexer.go b/embedded/store/indexer.go index 98d48a094e..71e4566e21 100644 --- a/embedded/store/indexer.go +++ b/embedded/store/indexer.go @@ -23,8 +23,11 @@ import ( "encoding/binary" "errors" "fmt" + "math" + "math/rand" "path/filepath" "sync" + "sync/atomic" "time" "github.com/codenotary/immudb/embedded/tbtree" @@ -33,6 +36,13 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) +var 
ErrWriteStalling = errors.New("write stalling") + +const ( + writeStallingSleepDurationMin = 10 * time.Millisecond + writeStallingSleepDurationMax = 50 * time.Millisecond +) + type indexer struct { path string @@ -45,8 +55,8 @@ type indexer struct { maxBulkSize int bulkPreparationTimeout time.Duration - _kvs []*tbtree.KVT //pre-allocated for multi-tx bulk indexing - _val []byte //pre-allocated buffer to read entry values while mapping + _kvs []*tbtree.KVT //pre-allocated for multi-tx bulk indexing + _val [DefaultMaxValueLen]byte //pre-allocated buffer to read entry values while mapping index *tbtree.TBtree @@ -96,12 +106,19 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error) return nil, fmt.Errorf("%w: nil store", ErrIllegalArguments) } + id := atomic.AddUint32(&store.nextIndexerID, 1) + if id-1 > math.MaxUint16 { + return nil, ErrMaxIndexersLimitExceeded + } + indexOpts := tbtree.DefaultOptions(). + WithIdentifier(uint16(id - 1)). WithReadOnly(opts.ReadOnly). WithFileMode(opts.FileMode). WithLogger(opts.logger). WithFileSize(opts.FileSize). WithCacheSize(opts.IndexOpts.CacheSize). + WithCache(store.indexCache). WithFlushThld(opts.IndexOpts.FlushThld). WithSyncThld(opts.IndexOpts.SyncThld). WithFlushBufferSize(opts.IndexOpts.FlushBufferSize). @@ -115,7 +132,11 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error) WithCommitLogMaxOpenedFiles(opts.IndexOpts.CommitLogMaxOpenedFiles). WithRenewSnapRootAfter(opts.IndexOpts.RenewSnapRootAfter). WithCompactionThld(opts.IndexOpts.CompactionThld). - WithDelayDuringCompaction(opts.IndexOpts.DelayDuringCompaction) + WithDelayDuringCompaction(opts.IndexOpts.DelayDuringCompaction). + WithMaxBufferedDataSize(opts.IndexOpts.MaxBufferedDataSize). + WithOnFlushFunc(func(releasedDataSize int) { + store.memSemaphore.Release(uint64(releasedDataSize)) + }) if opts.appFactory != nil { indexOpts.WithAppFactory(tbtree.AppFactoryFunc(opts.appFactory)) @@ -150,7 +171,6 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error) maxBulkSize: opts.IndexOpts.MaxBulkSize, bulkPreparationTimeout: opts.IndexOpts.BulkPreparationTimeout, _kvs: kvs, - _val: make([]byte, store.maxValueLen), path: path, index: index, wHub: wHub, @@ -457,13 +477,33 @@ func (idx *indexer) doIndexing() { if errors.Is(err, ErrAlreadyClosed) || errors.Is(err, tbtree.ErrAlreadyClosed) { return } - if err != nil { + + if err != nil && !errors.Is(err, ErrWriteStalling) { + idx.store.logger.Errorf("indexing failed at '%s' due to error: %v", idx.store.path, err) + time.Sleep(60 * time.Second) + } + + if err := idx.handleWriteStalling(err); err != nil { idx.store.logger.Errorf("indexing failed at '%s' due to error: %v", idx.store.path, err) time.Sleep(60 * time.Second) } } } +func (idx *indexer) handleWriteStalling(err error) error { + if !errors.Is(err, ErrWriteStalling) { + return nil + } + + if err := idx.store.FlushIndexes(0, false); err != nil { + return err + } + + sleepTime := writeStallingSleepDurationMin + time.Duration(rand.Intn(int(writeStallingSleepDurationMax-writeStallingSleepDurationMin+1))) + time.Sleep(sleepTime) + return nil +} + func serializeIndexableEntry(b []byte, txmd []byte, e *TxEntry, kvmd []byte) int { n := 0 @@ -500,18 +540,27 @@ func (idx *indexer) mapKey(key []byte, vLen int, vOff int64, hVal [sha256.Size]b return key, nil } - _, err = idx.store.readValueAt(idx._val[:vLen], vOff, hVal, false) + buf := idx.valBuffer(vLen) + _, err = idx.store.readValueAt(buf, vOff, hVal, false) if err != nil { 
return nil, err } - return mapper(key, idx._val[:vLen]) + return mapper(key, buf) +} + +func (idx *indexer) valBuffer(vLen int) []byte { + if vLen > len(idx._val) { + return make([]byte, vLen) + } + return idx._val[:vLen] } func (idx *indexer) indexSince(txID uint64) error { ctx, cancel := context.WithTimeout(context.Background(), idx.bulkPreparationTimeout) defer cancel() + acquiredMem := 0 bulkSize := 0 indexableEntries := 0 @@ -521,6 +570,7 @@ func (idx *indexer) indexSince(txID uint64) error { return err } + txIndexedEntries := 0 txEntries := idx.tx.Entries() var txmd []byte @@ -568,6 +618,7 @@ func (idx *indexer) indexSince(txID uint64) error { idx._kvs[indexableEntries].T = txID + uint64(i) indexableEntries++ + txIndexedEntries++ if idx.spec.InjectiveMapping && txID > 1 { // wait for source indexer to be up to date @@ -591,11 +642,6 @@ func (idx *indexer) indexSince(txID uint64) error { return err } - _, err = idx.store.readValueAt(idx._val[:prevEntry.vLen], prevEntry.vOff, prevEntry.hVal, false) - if err != nil { - return err - } - targetPrevKey, err := idx.mapKey(sourceKey, prevEntry.vLen, prevEntry.vOff, prevEntry.hVal, idx.spec.TargetEntryMapper) if err != nil { return err @@ -637,12 +683,24 @@ func (idx *indexer) indexSince(txID uint64) error { idx._kvs[indexableEntries].T = txID + uint64(i) indexableEntries++ + txIndexedEntries++ } else if !errors.Is(err, ErrKeyNotFound) { return err } } } + if indexableEntries > 0 && txIndexedEntries > 0 { + size := estimateEntriesSize(idx._kvs[indexableEntries-txIndexedEntries : indexableEntries]) + if !idx.store.memSemaphore.Acquire(uint64(size)) { + if acquiredMem == 0 { + return ErrWriteStalling + } + break + } + acquiredMem += size + } + bulkSize++ if bulkSize < idx.maxBulkSize { @@ -675,3 +733,11 @@ func (idx *indexer) indexSince(txID uint64) error { return nil } + +func estimateEntriesSize(kvs []*tbtree.KVT) int { + size := 0 + for _, kv := range kvs { + size += len(kv.K) + len(kv.V) + 8 + } + return size +} diff --git a/embedded/store/indexer_test.go b/embedded/store/indexer_test.go index c0bb302a43..40583cace0 100644 --- a/embedded/store/indexer_test.go +++ b/embedded/store/indexer_test.go @@ -220,3 +220,75 @@ func TestClosedIndexer(t *testing.T) { assert.Error(t, err) assert.ErrorIs(t, err, ErrAlreadyClosed) } + +func TestIndexFlushShouldReleaseMemory(t *testing.T) { + d := t.TempDir() + store, err := Open(d, DefaultOptions()) + require.NoError(t, err) + defer store.Close() + + key := make([]byte, 100) + value := make([]byte, 100) + + n := 100 + for i := 0; i < n; i++ { + tx, err := store.NewWriteOnlyTx(context.Background()) + require.NoError(t, err) + + err = tx.Set(key, nil, value) + require.NoError(t, err) + _, err = tx.Commit(context.Background()) + require.NoError(t, err) + } + + idx, err := store.getIndexerFor([]byte{}) + require.NoError(t, err) + require.NotNil(t, idx) + + require.Greater(t, store.memSemaphore.Value(), uint64(0)) + + _, _, err = idx.index.Flush() + require.NoError(t, err) + require.Zero(t, store.memSemaphore.Value()) +} + +func TestIndexerWriteStalling(t *testing.T) { + d := t.TempDir() + store, err := Open(d, DefaultOptions().WithMultiIndexing(true).WithIndexOptions(DefaultIndexOptions().WithMaxBufferedDataSize(1024).WithMaxGlobalBufferedDataSize(1024))) + require.NoError(t, err) + defer store.Close() + + nIndexes := 30 + + for i := 0; i < nIndexes; i++ { + err = store.InitIndexing(&IndexSpec{ + TargetPrefix: []byte{byte(i)}, + TargetEntryMapper: func(x int) func(key []byte, value []byte) ([]byte, error) { + 
return func(key, value []byte) ([]byte, error) { + return append([]byte{byte(x)}, key...), nil + } + }(i), + }) + require.NoError(t, err) + } + + key := make([]byte, 100) + value := make([]byte, 100) + + n := 100 + for i := 0; i < n; i++ { + tx, err := store.NewWriteOnlyTx(context.Background()) + require.NoError(t, err) + + err = tx.Set(key, nil, value) + require.NoError(t, err) + _, err = tx.Commit(context.Background()) + require.NoError(t, err) + } + + for i := 0; i < nIndexes; i++ { + idx, err := store.getIndexerFor([]byte{byte(i)}) + require.NoError(t, err) + require.Equal(t, idx.Ts(), uint64(n)) + } +} diff --git a/embedded/store/options.go b/embedded/store/options.go index 4e9e7a8107..a21261b902 100644 --- a/embedded/store/options.go +++ b/embedded/store/options.go @@ -24,8 +24,10 @@ import ( "github.com/codenotary/immudb/embedded/ahtree" "github.com/codenotary/immudb/embedded/appendable" "github.com/codenotary/immudb/embedded/appendable/multiapp" + "github.com/codenotary/immudb/embedded/cache" "github.com/codenotary/immudb/embedded/logger" "github.com/codenotary/immudb/embedded/tbtree" + "github.com/codenotary/immudb/pkg/helpers/semaphore" ) const DefaultMaxActiveTransactions = 1000 @@ -51,6 +53,7 @@ const DefaultCommitLogMaxOpenedFiles = 10 const DefaultWriteTxHeaderVersion = MaxTxHeaderVersion const DefaultWriteBufferSize = 1 << 22 //4Mb const DefaultIndexingMaxBulkSize = 1 +const DefaultIndexingGlobalMaxBufferedDataSize = 1 << 30 const DefaultBulkPreparationTimeout = DefaultSyncFrequency const DefaultTruncationFrequency = 24 * time.Hour const MinimumRetentionPeriod = 24 * time.Hour @@ -66,8 +69,14 @@ type AppFactoryFunc func( type AppRemoveFunc func(rootPath, subPath string) error +type IndexCacheFactoryFunc func() *cache.Cache + +type IndexMemSemaphoreFactoryFunc func() *semaphore.Semaphore + type TimeFunc func() time.Time +type FlushFunc func() error + type Options struct { ReadOnly bool @@ -147,7 +156,13 @@ type Options struct { } type IndexOptions struct { - // Size of the Btree node cache + // Global limit for amount of data that can be buffered from all indexes + MaxGlobalBufferedDataSize int + + // MaxBufferedDataSize + MaxBufferedDataSize int + + // Size (in bytes) of the Btree node cache CacheSize int // Number of new index entries between disk flushes @@ -191,6 +206,9 @@ type IndexOptions struct { // Maximum time waiting for more transactions to be committed and included into the same bulk BulkPreparationTimeout time.Duration + + // CacheFactory function + CacheFactory IndexCacheFactoryFunc } type AHTOptions struct { @@ -240,26 +258,29 @@ func DefaultOptions() *Options { CompressionLevel: DefaultCompressionLevel, EmbeddedValues: DefaultEmbeddedValues, PreallocFiles: DefaultPreallocFiles, - IndexOpts: DefaultIndexOptions(), - AHTOpts: DefaultAHTOptions(), + + IndexOpts: DefaultIndexOptions(), + AHTOpts: DefaultAHTOptions(), } } func DefaultIndexOptions() *IndexOptions { return &IndexOptions{ - CacheSize: tbtree.DefaultCacheSize, - FlushThld: tbtree.DefaultFlushThld, - SyncThld: tbtree.DefaultSyncThld, - FlushBufferSize: tbtree.DefaultFlushBufferSize, - CleanupPercentage: tbtree.DefaultCleanUpPercentage, - MaxActiveSnapshots: tbtree.DefaultMaxActiveSnapshots, - MaxNodeSize: tbtree.DefaultMaxNodeSize, - RenewSnapRootAfter: tbtree.DefaultRenewSnapRootAfter, - CompactionThld: tbtree.DefaultCompactionThld, - DelayDuringCompaction: 0, - NodesLogMaxOpenedFiles: tbtree.DefaultNodesLogMaxOpenedFiles, - HistoryLogMaxOpenedFiles: tbtree.DefaultHistoryLogMaxOpenedFiles, - 
CommitLogMaxOpenedFiles: tbtree.DefaultCommitLogMaxOpenedFiles, + MaxGlobalBufferedDataSize: DefaultIndexingGlobalMaxBufferedDataSize, + MaxBufferedDataSize: tbtree.DefaultMaxBufferedDataSize, + CacheSize: tbtree.DefaultCacheSize, + FlushThld: tbtree.DefaultFlushThld, + SyncThld: tbtree.DefaultSyncThld, + FlushBufferSize: tbtree.DefaultFlushBufferSize, + CleanupPercentage: tbtree.DefaultCleanUpPercentage, + MaxActiveSnapshots: tbtree.DefaultMaxActiveSnapshots, + MaxNodeSize: tbtree.DefaultMaxNodeSize, + RenewSnapRootAfter: tbtree.DefaultRenewSnapRootAfter, + CompactionThld: tbtree.DefaultCompactionThld, + DelayDuringCompaction: 0, + NodesLogMaxOpenedFiles: tbtree.DefaultNodesLogMaxOpenedFiles, + HistoryLogMaxOpenedFiles: tbtree.DefaultHistoryLogMaxOpenedFiles, + CommitLogMaxOpenedFiles: tbtree.DefaultCommitLogMaxOpenedFiles, MaxBulkSize: DefaultIndexingMaxBulkSize, BulkPreparationTimeout: DefaultBulkPreparationTimeout, @@ -365,6 +386,15 @@ func (opts *IndexOptions) Validate() error { if opts == nil { return fmt.Errorf("%w: nil index options ", ErrInvalidOptions) } + if opts.MaxBufferedDataSize <= 0 { + return fmt.Errorf("%w: invalid index option MaxBufferedDataSize", ErrInvalidOptions) + } + if opts.MaxGlobalBufferedDataSize <= 0 { + return fmt.Errorf("%w: invalid index option MaxGlobalBufferedDataSize", ErrInvalidOptions) + } + if opts.MaxBufferedDataSize > opts.MaxGlobalBufferedDataSize { + return fmt.Errorf("%w: invalid index option MaxBufferedDataSize > MaxGlobalBufferedDataSize", ErrInvalidOptions) + } if opts.CacheSize <= 0 { return fmt.Errorf("%w: invalid index option CacheSize", ErrInvalidOptions) } @@ -670,6 +700,21 @@ func (opts *IndexOptions) WithCommitLogMaxOpenedFiles(commitLogMaxOpenedFiles in return opts } +func (opts *IndexOptions) WithMaxBufferedDataSize(size int) *IndexOptions { + opts.MaxBufferedDataSize = size + return opts +} + +func (opts *IndexOptions) WithMaxGlobalBufferedDataSize(size int) *IndexOptions { + opts.MaxGlobalBufferedDataSize = size + return opts +} + +func (opts *IndexOptions) WithCacheFactoryFunc(indexCacheFactory IndexCacheFactoryFunc) *IndexOptions { + opts.CacheFactory = indexCacheFactory + return opts +} + // AHTOptions func (opts *AHTOptions) WithWriteBufferSize(writeBufferSize int) *AHTOptions { diff --git a/embedded/store/options_test.go b/embedded/store/options_test.go index 8449f1829e..f0f64fd2ed 100644 --- a/embedded/store/options_test.go +++ b/embedded/store/options_test.go @@ -85,6 +85,9 @@ func TestInvalidIndexOptions(t *testing.T) { {"NodesLogMaxOpenedFiles", DefaultIndexOptions().WithNodesLogMaxOpenedFiles(0)}, {"HistoryLogMaxOpenedFiles", DefaultIndexOptions().WithHistoryLogMaxOpenedFiles(0)}, {"CommitLogMaxOpenedFiles", DefaultIndexOptions().WithCommitLogMaxOpenedFiles(0)}, + {"MaxGlobalBufferedDataSize", DefaultIndexOptions().WithMaxGlobalBufferedDataSize(0)}, + {"MaxGlobalBufferedDataSize", DefaultIndexOptions().WithMaxGlobalBufferedDataSize(DefaultIndexOptions().MaxBufferedDataSize - 1)}, + {"MaxBufferedDataSize", DefaultIndexOptions().WithMaxBufferedDataSize(0)}, } { t.Run(d.n, func(t *testing.T) { require.ErrorIs(t, d.opts.Validate(), ErrInvalidOptions) @@ -195,6 +198,8 @@ func TestValidOptions(t *testing.T) { require.Equal(t, 1*time.Millisecond, indexOpts.WithDelayDuringCompaction(1*time.Millisecond).DelayDuringCompaction) require.Equal(t, 4096*2, indexOpts.WithFlushBufferSize(4096*2).FlushBufferSize) require.Equal(t, float32(10), indexOpts.WithCleanupPercentage(10).CleanupPercentage) + require.Equal(t, int(10), 
indexOpts.WithMaxBufferedDataSize(10).MaxBufferedDataSize) + require.Equal(t, int(10), indexOpts.WithMaxGlobalBufferedDataSize(10).MaxGlobalBufferedDataSize) require.Nil(t, opts.WithAHTOptions(nil).AHTOpts) require.ErrorIs(t, opts.Validate(), ErrInvalidOptions) diff --git a/embedded/tbtree/options.go b/embedded/tbtree/options.go index 4e0f7c7591..8b2554b4bd 100644 --- a/embedded/tbtree/options.go +++ b/embedded/tbtree/options.go @@ -24,6 +24,7 @@ import ( "github.com/codenotary/immudb/embedded/appendable" "github.com/codenotary/immudb/embedded/appendable/multiapp" + "github.com/codenotary/immudb/embedded/cache" "github.com/codenotary/immudb/embedded/logger" ) @@ -32,6 +33,7 @@ const ( DefaultFlushThld = 100_000 DefaultSyncThld = 1_000_000 DefaultFlushBufferSize = 4096 + DefaultMaxBufferedDataSize = 1 << 22 // 4MB DefaultCleanUpPercentage float32 = 0 DefaultMaxActiveSnapshots = 100 DefaultRenewSnapRootAfter = time.Duration(1000) * time.Millisecond @@ -58,18 +60,23 @@ type AppFactoryFunc func( type AppRemoveFunc func(rootPath, subPath string) error +type OnFlushFunc func(releasedDataSize int) + type Options struct { logger logger.Logger - flushThld int - syncThld int - flushBufferSize int - cleanupPercentage float32 - maxActiveSnapshots int - renewSnapRootAfter time.Duration - cacheSize int - readOnly bool - fileMode os.FileMode + ID uint16 + flushThld int + syncThld int + maxBufferedDataSize int // maximum amount of KV data that can be buffered before triggering flushing + flushBufferSize int + cleanupPercentage float32 + maxActiveSnapshots int + renewSnapRootAfter time.Duration + cacheSize int + cache *cache.Cache + readOnly bool + fileMode os.FileMode nodesLogMaxOpenedFiles int historyLogMaxOpenedFiles int @@ -86,11 +93,14 @@ type Options struct { appFactory AppFactoryFunc appRemove AppRemoveFunc + onFlush OnFlushFunc } func DefaultOptions() *Options { return &Options{ logger: logger.NewSimpleLogger("immudb ", os.Stderr), + ID: 0, + maxBufferedDataSize: DefaultMaxBufferedDataSize, flushThld: DefaultFlushThld, syncThld: DefaultSyncThld, flushBufferSize: DefaultFlushBufferSize, @@ -176,7 +186,7 @@ func (opts *Options) Validate() error { return fmt.Errorf("%w: invalid RenewSnapRootAfter", ErrInvalidOptions) } - if opts.cacheSize < MinCacheSize { + if opts.cacheSize < MinCacheSize || (opts.cacheSize == 0 && opts.cache == nil) { return fmt.Errorf("%w: invalid CacheSize", ErrInvalidOptions) } @@ -241,6 +251,11 @@ func (opts *Options) WithCacheSize(cacheSize int) *Options { return opts } +func (opts *Options) WithCache(cache *cache.Cache) *Options { + opts.cache = cache + return opts +} + func (opts *Options) WithReadOnly(readOnly bool) *Options { opts.readOnly = readOnly return opts @@ -295,3 +310,18 @@ func (opts *Options) WithDelayDuringCompaction(delay time.Duration) *Options { opts.delayDuringCompaction = delay return opts } + +func (opts *Options) WithIdentifier(id uint16) *Options { + opts.ID = id + return opts +} + +func (opts *Options) WithMaxBufferedDataSize(size int) *Options { + opts.maxBufferedDataSize = size + return opts +} + +func (opts *Options) WithOnFlushFunc(onFlush OnFlushFunc) *Options { + opts.onFlush = onFlush + return opts +} diff --git a/embedded/tbtree/tbtree.go b/embedded/tbtree/tbtree.go index 4f92473900..db7bb0a450 100644 --- a/embedded/tbtree/tbtree.go +++ b/embedded/tbtree/tbtree.go @@ -167,6 +167,7 @@ func (e *cLogEntry) deserialize(b []byte) { // TBTree implements a timed-btree type TBtree struct { path string + id uint16 logger logger.Logger @@ -185,6 +186,7 @@ 
type TBtree struct { insertionCountSinceSync int insertionCountSinceCleanup int flushThld int + maxBufferedDataSize int syncThld int flushBufferSize int cleanupPercentage float32 @@ -204,10 +206,12 @@ type TBtree struct { appFactory AppFactoryFunc appRemove AppRemoveFunc - snapshots map[uint64]*Snapshot - maxSnapshotID uint64 - lastSnapRoot node - lastSnapRootAt time.Time + bufferedDataSize int + onFlush OnFlushFunc + snapshots map[uint64]*Snapshot + maxSnapshotID uint64 + lastSnapRoot node + lastSnapRootAt time.Time committedLogSize int64 committedNLogSize int64 @@ -556,24 +560,30 @@ func OpenWith(path string, nLog, hLog, cLog appendable.Appendable, opts *Options } } - cache, err := cache.NewCache(opts.cacheSize) - if err != nil { - return nil, err + nodeCache := opts.cache + if nodeCache == nil { + nodeCache, err = cache.NewCache(opts.cacheSize) + if err != nil { + return nil, err + } } t := &TBtree{ path: path, + id: opts.ID, logger: opts.logger, nLog: nLog, hLog: hLog, cLog: cLog, - cache: cache, + cache: nodeCache, maxNodeSize: maxNodeSize, maxKeySize: maxKeySize, maxValueSize: maxValueSize, flushThld: opts.flushThld, + maxBufferedDataSize: opts.maxBufferedDataSize, syncThld: opts.syncThld, flushBufferSize: opts.flushBufferSize, + onFlush: opts.onFlush, cleanupPercentage: opts.cleanupPercentage, renewSnapRootAfter: opts.renewSnapRootAfter, maxActiveSnapshots: opts.maxActiveSnapshots, @@ -739,12 +749,16 @@ func (t *TBtree) cachePut(n node) { defer t.nmutex.Unlock() size, _ := n.size() - r, _, _ := t.cache.PutWeighted(n.offset(), n, size) + r, _, _ := t.cache.PutWeighted(encodeOffset(t.id, n.offset()), n, size) if r != nil { metricsCacheEvict.WithLabelValues(t.path).Inc() } } +func encodeOffset(id uint16, offset int64) int64 { + return int64(id)<<48 | offset +} + func (t *TBtree) nodeAt(offset int64, updateCache bool) (node, error) { t.nmutex.Lock() defer t.nmutex.Unlock() @@ -752,7 +766,9 @@ func (t *TBtree) nodeAt(offset int64, updateCache bool) (node, error) { size := t.cache.EntriesCount() metricsCacheSizeStats.WithLabelValues(t.path).Set(float64(size)) - v, err := t.cache.Get(offset) + encOffset := encodeOffset(t.id, offset) + + v, err := t.cache.Get(encOffset) if err == nil { metricsCacheHit.WithLabelValues(t.path).Inc() return v.(node), nil @@ -768,7 +784,7 @@ func (t *TBtree) nodeAt(offset int64, updateCache bool) (node, error) { if updateCache { size, _ := n.size() - r, _, _ := t.cache.PutWeighted(n.offset(), n, size) + r, _, _ := t.cache.PutWeighted(encOffset, n, size) if r != nil { metricsCacheEvict.WithLabelValues(t.path).Inc() } @@ -1204,6 +1220,11 @@ func (t *TBtree) flushTree(cleanupPercentageHint float32, forceSync bool, forceC } t.insertionCountSinceFlush = 0 + if t.onFlush != nil { + t.onFlush(t.bufferedDataSize) + } + t.bufferedDataSize = 0 + if cleanupPercentage != 0 { t.insertionCountSinceCleanup = 0 } @@ -1629,6 +1650,14 @@ func (t *TBtree) BulkInsert(kvts []*KVT) error { return t.bulkInsert(kvts) } +func estimateSize(kvts []*KVT) int { + size := 0 + for _, kv := range kvts { + size += len(kv.K) + len(kv.V) + 8 + } + return size +} + func (t *TBtree) bulkInsert(kvts []*KVT) error { if t.closed { return ErrAlreadyClosed @@ -1638,6 +1667,15 @@ func (t *TBtree) bulkInsert(kvts []*KVT) error { return ErrIllegalArguments } + entriesSize := estimateSize(kvts) + if t.bufferedDataSize > 0 && t.bufferedDataSize+entriesSize > t.maxBufferedDataSize { + _, _, err := t.flushTree(t.cleanupPercentage, false, false, "bulkInsert") + if err != nil { + return err + } + } + 
t.bufferedDataSize += entriesSize + currTs := t.root.ts() // newTs will hold the greatest time, the minimun value will be currTs + 1 diff --git a/pkg/database/types.go b/pkg/database/types.go index f97d26850e..2c1e41c7f5 100644 --- a/pkg/database/types.go +++ b/pkg/database/types.go @@ -41,7 +41,7 @@ type dbRef struct { deleted bool } -//NewDatabaseList constructs a new database list +// NewDatabaseList constructs a new database list func NewDatabaseList() DatabaseList { return &databaseList{ databases: make([]DB, 0), diff --git a/pkg/helpers/semaphore/semaphore.go b/pkg/helpers/semaphore/semaphore.go new file mode 100644 index 0000000000..67f1a7f094 --- /dev/null +++ b/pkg/helpers/semaphore/semaphore.go @@ -0,0 +1,35 @@ +package semaphore + +import "sync/atomic" + +type Semaphore struct { + maxWeight uint64 + currWeight uint64 +} + +func New(maxWeight uint64) *Semaphore { + return &Semaphore{ + maxWeight: maxWeight, + currWeight: 0, + } +} + +func (m *Semaphore) Acquire(n uint64) bool { + if newVal := atomic.AddUint64(&m.currWeight, n); newVal <= m.maxWeight { + return true + } + m.Release(n) + return false +} + +func (m *Semaphore) Release(n uint64) { + atomic.AddUint64(&m.currWeight, ^uint64(n-1)) +} + +func (m *Semaphore) Value() uint64 { + return atomic.LoadUint64(&m.currWeight) +} + +func (m *Semaphore) MaxWeight() uint64 { + return m.maxWeight +} diff --git a/pkg/server/db_options.go b/pkg/server/db_options.go index 85fbe8f86e..ab25dd0f7c 100644 --- a/pkg/server/db_options.go +++ b/pkg/server/db_options.go @@ -231,7 +231,7 @@ func (s *ImmuServer) defaultAHTOptions() *ahtOptions { func (s *ImmuServer) databaseOptionsFrom(opts *dbOptions) *database.Options { return database.DefaultOption(). WithDBRootPath(s.Options.Dir). - WithStoreOptions(s.storeOptionsForDB(opts.Database, s.remoteStorage, opts.storeOptions())). + WithStoreOptions(s.storeOptionsForDB(opts.Database, s.remoteStorage, opts.storeOptions(s))). AsReplica(opts.Replica). WithSyncReplication(opts.SyncReplication). WithSyncAcks(opts.SyncAcks). @@ -241,7 +241,7 @@ func (s *ImmuServer) databaseOptionsFrom(opts *dbOptions) *database.Options { WithMaxResultSize(s.Options.MaxResultSize) } -func (opts *dbOptions) storeOptions() *store.Options { +func (opts *dbOptions) storeOptions(s *ImmuServer) *store.Options { indexOpts := store.DefaultIndexOptions() if opts.IndexOptions != nil { @@ -263,6 +263,10 @@ func (opts *dbOptions) storeOptions() *store.Options { WithBulkPreparationTimeout(time.Millisecond * time.Duration(opts.IndexOptions.BulkPreparationTimeout)) } + if s != nil { + indexOpts.WithCacheFactoryFunc(s.indexCacheFactoryFunc()) + } + ahtOpts := store.DefaultAHTOptions() if opts.AHTOptions != nil { @@ -386,7 +390,6 @@ func (opts *dbOptions) databaseNullableSettings() *schema.DatabaseNullableSettin // Only those fields that were present up to the 1.2.2 release are supported. // Changing any other fields requires new API calls. 
func dbSettingsToDBNullableSettings(settings *schema.DatabaseSettings) *schema.DatabaseNullableSettings { - nullableUInt32 := func(v uint32) *schema.NullableUint32 { if v > 0 { return &schema.NullableUint32{ @@ -820,7 +823,7 @@ func (opts *dbOptions) Validate() error { ErrIllegalArguments, opts.Database, store.MinimumTruncationFrequency.Hours()) } - return opts.storeOptions().Validate() + return opts.storeOptions(nil).Validate() } func (opts *dbOptions) isReplicatorRequired() bool { diff --git a/pkg/server/options.go b/pkg/server/options.go index f2fa5fc9e2..118d7c324c 100644 --- a/pkg/server/options.go +++ b/pkg/server/options.go @@ -77,6 +77,7 @@ type Options struct { GRPCReflectionServerEnabled bool SwaggerUIEnabled bool LogRequestMetadata bool + SharedIndexCacheSize int } type RemoteStorageOptions struct { @@ -147,6 +148,7 @@ func DefaultOptions() *Options { GRPCReflectionServerEnabled: true, SwaggerUIEnabled: true, LogRequestMetadata: false, + SharedIndexCacheSize: 1 << 27, // 128MB } } diff --git a/pkg/server/remote_storage.go b/pkg/server/remote_storage.go index 36795a10dc..a036d394c4 100644 --- a/pkg/server/remote_storage.go +++ b/pkg/server/remote_storage.go @@ -27,6 +27,7 @@ import ( "github.com/codenotary/immudb/embedded/appendable" "github.com/codenotary/immudb/embedded/appendable/multiapp" "github.com/codenotary/immudb/embedded/appendable/remoteapp" + "github.com/codenotary/immudb/embedded/cache" "github.com/codenotary/immudb/embedded/remotestorage" "github.com/codenotary/immudb/embedded/remotestorage/s3" "github.com/codenotary/immudb/embedded/store" @@ -158,6 +159,16 @@ func (s *ImmuServer) updateRemoteUUID(remoteStorage remotestorage.Storage) error return remoteStorage.Put(ctx, IDENTIFIER_FNAME, filepath.Join(s.Options.Dir, IDENTIFIER_FNAME)) } +func (s *ImmuServer) indexCacheFactoryFunc() store.IndexCacheFactoryFunc { + if s.indexCacheFunc == nil { + c, _ := cache.NewCache(s.Options.SharedIndexCacheSize) + s.indexCacheFunc = func() *cache.Cache { + return c + } + } + return s.indexCacheFunc +} + func (s *ImmuServer) storeOptionsForDB(name string, remoteStorage remotestorage.Storage, stOpts *store.Options) *store.Options { if remoteStorage != nil { stOpts.WithAppFactory(func(rootPath, subPath string, opts *multiapp.Options) (appendable.Appendable, error) { diff --git a/pkg/server/server.go b/pkg/server/server.go index 81ec61e6ff..d239610412 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1029,7 +1029,7 @@ func (s *ImmuServer) LoadDatabase(ctx context.Context, req *schema.LoadDatabaseR return nil, ErrIllegalArguments } - s.Logger.Infof("loadinig database '%s'...", req.Database) + s.Logger.Infof("loading database '%s'...", req.Database) defer func() { if err == nil { diff --git a/pkg/server/types.go b/pkg/server/types.go index ee3d376f73..411a2a7596 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -26,6 +26,7 @@ import ( "github.com/codenotary/immudb/pkg/server/sessions" "github.com/codenotary/immudb/pkg/truncator" + "github.com/codenotary/immudb/embedded/cache" "github.com/codenotary/immudb/embedded/remotestorage" pgsqlsrv "github.com/codenotary/immudb/pkg/pgsql/server" "github.com/codenotary/immudb/pkg/replication" @@ -83,9 +84,9 @@ type ImmuServer struct { StreamServiceFactory stream.ServiceFactory PgsqlSrv pgsqlsrv.PGSQLServer - remoteStorage remotestorage.Storage - - SessManager sessions.Manager + remoteStorage remotestorage.Storage + indexCacheFunc func() *cache.Cache + SessManager sessions.Manager } // DefaultServer ...
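The SubstringFn change above clamps the 1-based position and the length against the input string, returning an empty varchar when the position falls past the end. A minimal standalone sketch of those clamping rules (the earlier checks for a non-positive position and a negative length that the real Apply performs are omitted, and plain Go strings stand in for the TypedValue params):

```go
package main

import "fmt"

// substring mirrors the bounds handling added to SubstringFn.Apply:
// a 1-based position past the end of the string yields an empty result,
// and the end offset is clamped to the string length.
func substring(s string, pos, length int64) string {
	if pos-1 >= int64(len(s)) {
		return ""
	}
	end := pos - 1 + length
	if end > int64(len(s)) {
		end = int64(len(s))
	}
	return s[pos-1 : end]
}

func main() {
	fmt.Println(substring("immudb", 1, 3))  // "imm"
	fmt.Println(substring("immudb", 4, 10)) // "udb" (length clamped)
	fmt.Println(substring("immudb", 9, 2))  // ""    (position past the end)
}
```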
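ExportTx and the indexer's mapKey now fall back to a transient allocation whenever a value is larger than the fixed pre-allocated buffer (_valBs and _val become [DefaultMaxValueLen]byte arrays). A small sketch of that fallback with hypothetical names; the pointer comparison only illustrates when the pre-allocated storage is reused:

```go
package main

import "fmt"

// valBuffer mirrors the fallback added to the indexer and to ExportTx: reuse
// the fixed pre-allocated buffer when the value fits, otherwise allocate a
// transient buffer so oversized values are still readable.
func valBuffer(prealloc []byte, vLen int) []byte {
	if vLen > len(prealloc) {
		return make([]byte, vLen)
	}
	return prealloc[:vLen]
}

func main() {
	prealloc := make([]byte, 4)

	small := valBuffer(prealloc, 3)
	large := valBuffer(prealloc, 8)

	fmt.Println(len(small), &small[0] == &prealloc[0]) // 3 true  (pre-allocated buffer reused)
	fmt.Println(len(large), &large[0] == &prealloc[0]) // 8 false (transient allocation)
}
```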
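pkg/helpers/semaphore is a weighted, non-blocking semaphore: Acquire optimistically adds the requested weight and rolls it back when the budget would be exceeded, Release subtracts it. Indexers charge estimated entry sizes against it and the btree's OnFlushFunc gives the weight back on flush. A usage example against the package as added by this patch; the 1024-byte budget is arbitrary:

```go
package main

import (
	"fmt"

	"github.com/codenotary/immudb/pkg/helpers/semaphore"
)

func main() {
	// Budget of 1 KiB of buffered index data.
	sem := semaphore.New(1024)

	if sem.Acquire(600) {
		fmt.Println("first batch admitted, in use:", sem.Value())
	}

	// This request would push usage past the limit, so it is rejected
	// (the optimistic add is rolled back) and the caller is expected to
	// flush, or stall, before retrying.
	if !sem.Acquire(600) {
		fmt.Println("second batch rejected, in use:", sem.Value())
	}

	// A flush releases the previously acquired weight.
	sem.Release(600)
	fmt.Println("after release, in use:", sem.Value())
}
```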
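estimateEntriesSize in the indexer (and the matching estimateSize in tbtree) charge len(K) + len(V) + 8 bytes per entry against the buffered-data budget; if even the first transaction of a bulk cannot be admitted the indexer returns ErrWriteStalling, otherwise it stops growing the bulk and indexes what it already acquired. A sketch of the arithmetic with a hypothetical kvt type standing in for tbtree.KVT:

```go
package main

import "fmt"

// kvt is a stand-in for tbtree.KVT: key, value and timestamp.
type kvt struct {
	K, V []byte
	T    uint64
}

// estimateEntriesSize mirrors the estimator used by the indexer and the
// btree: key bytes + value bytes + 8 bytes of per-entry overhead.
func estimateEntriesSize(kvs []kvt) int {
	size := 0
	for _, kv := range kvs {
		size += len(kv.K) + len(kv.V) + 8
	}
	return size
}

func main() {
	// Two entries with a 100-byte key and a 100-byte value each, as in the
	// tests: 2 * (100 + 100 + 8) = 416 bytes charged against the budget.
	batch := []kvt{
		{K: make([]byte, 100), V: make([]byte, 100)},
		{K: make([]byte, 100), V: make([]byte, 100)},
	}
	fmt.Println(estimateEntriesSize(batch)) // 416
}
```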
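When indexing hits ErrWriteStalling, doIndexing flushes the indexes to hand buffered memory back to the semaphore and then backs off for a random 10 to 50 ms before retrying, instead of the 60 s sleep reserved for real failures. A standalone sketch of that recovery path, with a caller-supplied flush callback standing in for store.FlushIndexes(0, false):

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

var errWriteStalling = errors.New("write stalling")

const (
	stallSleepMin = 10 * time.Millisecond
	stallSleepMax = 50 * time.Millisecond
)

// handleWriteStalling mirrors the indexer's recovery path: when the shared
// memory budget is exhausted, flush to release buffered data and back off
// for a short randomized interval before the next indexing attempt.
func handleWriteStalling(err error, flush func() error) error {
	if !errors.Is(err, errWriteStalling) {
		return nil
	}
	if err := flush(); err != nil {
		return err
	}
	sleep := stallSleepMin + time.Duration(rand.Intn(int(stallSleepMax-stallSleepMin+1)))
	time.Sleep(sleep)
	return nil
}

func main() {
	err := handleWriteStalling(errWriteStalling, func() error {
		fmt.Println("flushing indexes to free buffered data")
		return nil
	})
	fmt.Println("ready to retry, err:", err)
}
```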
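The new MaxBufferedDataSize (per index, default 4 MB) and MaxGlobalBufferedDataSize (across all indexes, default 1 GB) options are validated so the per-index limit never exceeds the global one. A configuration sketch using the option setters added by this patch; the path and the 1 MiB / 16 MiB limits are arbitrary choices for illustration:

```go
package main

import (
	"context"
	"log"

	"github.com/codenotary/immudb/embedded/store"
)

func main() {
	opts := store.DefaultOptions().
		WithMultiIndexing(true).
		WithIndexOptions(store.DefaultIndexOptions().
			WithMaxBufferedDataSize(1 << 20).        // per-index buffer: 1 MiB
			WithMaxGlobalBufferedDataSize(16 << 20)) // shared budget: 16 MiB

	st, err := store.Open("data", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer st.Close()

	tx, err := st.NewWriteOnlyTx(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	if err := tx.Set([]byte("key"), nil, []byte("value")); err != nil {
		log.Fatal(err)
	}
	if _, err := tx.Commit(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```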
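Because all btrees can now share one node cache, cache keys are derived by packing the 16-bit tree identifier into the upper bits of the node offset, which assumes node offsets stay below 2^48 (the store also caps the number of indexer IDs via ErrMaxIndexersLimitExceeded). A worked example of the encoding:

```go
package main

import "fmt"

// encodeOffset mirrors tbtree.encodeOffset: the 16-bit tree identifier is
// packed into the upper bits of the node offset so that entries from
// different btrees never collide inside the shared node cache.
func encodeOffset(id uint16, offset int64) int64 {
	return int64(id)<<48 | offset
}

func main() {
	// Same on-disk offset, three different trees: three distinct cache keys.
	fmt.Printf("0x%016x\n", encodeOffset(0, 0x1000)) // 0x0000000000001000
	fmt.Printf("0x%016x\n", encodeOffset(1, 0x1000)) // 0x0001000000001000
	fmt.Printf("0x%016x\n", encodeOffset(2, 0x1000)) // 0x0002000000001000
}
```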
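At the server level a single cache of SharedIndexCacheSize bytes (default 1 << 27, i.e. 128 MB) is created once and handed to every database through IndexCacheFactoryFunc, which InitIndexing prefers over creating a per-store cache. A sketch of the same wiring done directly against the embedded store; the path is arbitrary and the size merely mirrors the server default:

```go
package main

import (
	"log"

	"github.com/codenotary/immudb/embedded/cache"
	"github.com/codenotary/immudb/embedded/store"
)

func main() {
	// One node cache shared by every index, instead of a cache per btree.
	shared, err := cache.NewCache(1 << 27) // 128 MB, matching the server default
	if err != nil {
		log.Fatal(err)
	}

	opts := store.DefaultOptions().
		WithIndexOptions(store.DefaultIndexOptions().
			// The factory is invoked by InitIndexing; returning the same
			// cache instance makes all indexes share it.
			WithCacheFactoryFunc(func() *cache.Cache { return shared }))

	st, err := store.Open("data", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer st.Close()
}
```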