chore(embedded/store): improve index flush logic
- implement memory-aware flushing;
- share node cache across multiple indexes;
- implement write stalling;
- avoid allocating large buffers when store.maxValueSize is too large.

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>
ostafen committed Aug 27, 2024
1 parent bf27080 commit 2a3d382
Showing 16 changed files with 408 additions and 72 deletions.
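The memory-aware flushing and write stalling listed above revolve around a single byte budget shared by all indexers: each indexer reserves memory before buffering entries and returns it once a flush persists them; when the reservation fails, indexing stalls instead of growing unbounded. The following is a minimal, self-contained sketch of that idea only — memoryGate and its methods are illustrative names, not the immudb semaphore API:

package main

import (
	"fmt"
	"sync"
)

// memoryGate is a toy stand-in for the byte-counting semaphore shared by all indexers.
type memoryGate struct {
	mu       sync.Mutex
	capacity uint64 // global budget for buffered index data
	used     uint64
}

// Acquire reserves n bytes; it fails instead of blocking when the budget is exhausted,
// which is what lets the caller signal write stalling.
func (g *memoryGate) Acquire(n uint64) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.used+n > g.capacity {
		return false
	}
	g.used += n
	return true
}

// Release returns n bytes to the budget, e.g. after a flush persisted the buffered entries.
func (g *memoryGate) Release(n uint64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if n > g.used {
		n = g.used
	}
	g.used -= n
}

func main() {
	gate := &memoryGate{capacity: 1 << 20} // 1 MiB shared across all indexers

	if !gate.Acquire(512 << 10) { // reserve ~512 KiB before buffering entries
		fmt.Println("write stalling: flush and retry later")
		return
	}
	// ... buffer index entries ...
	gate.Release(512 << 10) // flush completed, memory returned to the shared budget
	fmt.Println("buffered data flushed, budget released")
}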
1 change: 1 addition & 0 deletions cmd/immudb/command/init.go
@@ -44,6 +44,7 @@ func (cl *Commandline) setupFlags(cmd *cobra.Command, options *server.Options) {
cmd.Flags().Bool("replication-allow-tx-discarding", options.ReplicationOptions.AllowTxDiscarding, "allow precommitted transactions to be discarded if the replica diverges from the primary")
cmd.Flags().Bool("replication-skip-integrity-check", options.ReplicationOptions.SkipIntegrityCheck, "disable integrity check when reading data during replication")
cmd.Flags().Bool("replication-wait-for-indexing", options.ReplicationOptions.WaitForIndexing, "wait for indexing to be up to date during replication")
cmd.Flags().Int("shared-index-cache-size", options.SharedIndexCacheSize, "size (in bytes) of shared index cache")

cmd.PersistentFlags().StringVar(&cl.config.CfgFn, "config", "", "config file (default path are configs or $HOME. Default filename is immudb.toml)")
cmd.Flags().String("pidfile", options.Pidfile, "pid path with filename e.g. /var/run/immudb.pid")
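The new option can also be set on the command line; for example, to give the shared index cache roughly 64 MiB (the flag value is in bytes, per the help text above; the default comes from options.SharedIndexCacheSize):

immudb --shared-index-cache-size=67108864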
6 changes: 5 additions & 1 deletion embedded/sql/functions.go
@@ -144,7 +144,7 @@ func (f *SubstringFn) requiresType(t SQLValueType, cols map[string]ColDescriptor

func (f *SubstringFn) Apply(tx *SQLTx, params []TypedValue) (TypedValue, error) {
if len(params) != 3 {
return nil, fmt.Errorf("%w: '%s' function does expects one argument but %d were provided", ErrIllegalArguments, SubstringFnCall, len(params))
return nil, fmt.Errorf("%w: '%s' function does expects three argument but %d were provided", ErrIllegalArguments, SubstringFnCall, len(params))
}

v1, v2, v3 := params[0], params[1], params[2]
@@ -165,6 +165,10 @@ func (f *SubstringFn) Apply(tx *SQLTx, params []TypedValue) (TypedValue, error)
return nil, fmt.Errorf("%w: parameter 'length' cannot be negative", ErrIllegalArguments)
}

if pos-1 >= int64(len(s)) {
return &Varchar{val: ""}, nil
}

end := pos - 1 + length
if end > int64(len(s)) {
end = int64(len(s))
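The added guard makes SUBSTRING return an empty string whenever the 1-based start position lies beyond the end of the input, instead of slicing with an out-of-range offset. A rough, standalone illustration of the clamping logic (assuming pos >= 1 and length >= 0, which the surrounding function already validates):

package main

import "fmt"

// substr mimics the clamping performed by SubstringFn.Apply above.
func substr(s string, pos, length int64) string {
	if pos-1 >= int64(len(s)) {
		return "" // start position is past the end of the string
	}
	end := pos - 1 + length
	if end > int64(len(s)) {
		end = int64(len(s)) // never read past the end
	}
	return s[pos-1 : end]
}

func main() {
	fmt.Println(substr("immudb", 3, 10)) // "mudb": length is clamped to the string end
	fmt.Println(substr("immudb", 10, 2)) // "": start beyond the end yields an empty string
}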
49 changes: 36 additions & 13 deletions embedded/store/immustore.go
@@ -43,6 +43,7 @@ import (
"github.com/codenotary/immudb/embedded/multierr"
"github.com/codenotary/immudb/embedded/tbtree"
"github.com/codenotary/immudb/embedded/watchers"
"github.com/codenotary/immudb/pkg/helpers/semaphore"
"github.com/codenotary/immudb/pkg/helpers/slices"
)

@@ -65,6 +66,7 @@ var ErrCannotUpdateKeyTransiency = errors.New("cannot change a non-transient key
var ErrMaxActiveTransactionsLimitExceeded = errors.New("max active transactions limit exceeded")
var ErrMVCCReadSetLimitExceeded = errors.New("MVCC read-set limit exceeded")
var ErrMaxConcurrencyLimitExceeded = errors.New("max concurrency limit exceeded")
var ErrMaxIndexersLimitExceeded = errors.New("max indexers limit exceeded")
var ErrPathIsNotADirectory = errors.New("path is not a directory")
var ErrCorruptedTxData = errors.New("tx data is corrupted")
var ErrCorruptedTxDataMaxTxEntriesExceeded = fmt.Errorf("%w: maximum number of TX entries exceeded", ErrCorruptedTxData)
@@ -204,17 +206,21 @@ type ImmuStore struct {
waiteesMutex sync.Mutex
waiteesCount int // current number of go-routines waiting for a tx to be indexed or committed

_txbs []byte // pre-allocated buffer to support tx serialization
_valBs []byte // pre-allocated buffer to support tx exportation
_txbs []byte // pre-allocated buffer to support tx serialization
_valBs [DefaultMaxValueLen]byte // pre-allocated buffer to support tx exportation
_valBsMux sync.Mutex

aht *ahtree.AHtree
inmemPrecommitWHub *watchers.WatchersHub
durablePrecommitWHub *watchers.WatchersHub
commitWHub *watchers.WatchersHub

indexers map[[sha256.Size]byte]*indexer
indexersMux sync.RWMutex
indexers map[[sha256.Size]byte]*indexer
nextIndexerID uint32
indexCache *cache.Cache

memSemaphore *semaphore.Semaphore // used by indexers to control the amount of acquired memory
indexersMux sync.RWMutex

opts *Options

@@ -648,10 +654,10 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable

writeTxHeaderVersion: opts.WriteTxHeaderVersion,

timeFunc: opts.TimeFunc,
multiIndexing: opts.MultiIndexing,
indexers: make(map[[sha256.Size]byte]*indexer),

timeFunc: opts.TimeFunc,
multiIndexing: opts.MultiIndexing,
indexers: make(map[[sha256.Size]byte]*indexer),
memSemaphore: semaphore.New(uint64(opts.IndexOpts.MaxGlobalBufferedDataSize)),
useExternalCommitAllowance: opts.UseExternalCommitAllowance,
commitAllowedUpToTxID: committedTxID,

@@ -663,7 +669,6 @@

txPool: txPool,
_txbs: txbs,
_valBs: make([]byte, maxValueLen),

opts: opts,

@@ -849,6 +854,18 @@ func (s *ImmuStore) InitIndexing(spec *IndexSpec) error {
indexPath = filepath.Join(s.path, fmt.Sprintf("%s_%s", indexDirname, encPrefix))
}

if s.indexCache == nil {
if indexFactoryFunc := s.opts.IndexOpts.CacheFactory; indexFactoryFunc != nil {
s.indexCache = indexFactoryFunc()
} else {
c, err := cache.NewCache(s.opts.IndexOpts.CacheSize)
if err != nil {
return err
}
s.indexCache = c
}
}

indexer, err := newIndexer(indexPath, s, s.opts)
if err != nil {
return fmt.Errorf("%w: could not open indexer", err)
@@ -862,7 +879,6 @@ }
}

s.indexers[indexPrefix] = indexer

indexer.init(spec)

return nil
@@ -2666,7 +2682,15 @@ func (s *ImmuStore) ExportTx(txID uint64, allowPrecommitted bool, skipIntegrityC
// val
// TODO: improve value reading implementation, get rid of _valBs
s._valBsMux.Lock()
_, err = s.readValueAt(s._valBs[:e.vLen], e.vOff, e.hVal, skipIntegrityCheck)

var valBuf []byte
if e.vLen > len(s._valBs) {
valBuf = make([]byte, e.vLen)
} else {
valBuf = s._valBs[:e.vLen]
}

_, err = s.readValueAt(valBuf, e.vOff, e.hVal, skipIntegrityCheck)
if err != nil && !errors.Is(err, io.EOF) {
s._valBsMux.Unlock()
return nil, err
@@ -2687,7 +2711,7 @@ }
}

// val
_, err = buf.Write(s._valBs[:e.vLen])
_, err = buf.Write(valBuf)
if err != nil {
s._valBsMux.Unlock()
return nil, err
@@ -3006,7 +3030,6 @@ func (s *ImmuStore) appendableReaderForTx(txID uint64, allowPrecommitted bool) (
} else {
txr = &slicedReaderAt{bs: txbs.([]byte), off: txOff}
}

return appendable.NewReaderFrom(txr, txOff, txSize), nil
}

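ExportTx above (and the indexer's valBuffer further down) replaces buffers sized to store.maxValueSize with a fixed pre-allocated array, allocating only when a value is larger than that array. A minimal sketch of the pattern — the constant and type names here are illustrative, not the immudb ones:

package main

import "fmt"

const defaultMaxValueLen = 4096 // illustrative; immudb uses its own DefaultMaxValueLen

type valueReader struct {
	buf [defaultMaxValueLen]byte // pre-allocated once and reused for most values
}

// bufferFor serves reads from the fixed array when the value fits,
// falling back to a one-off allocation only for oversized values.
func (r *valueReader) bufferFor(vLen int) []byte {
	if vLen > len(r.buf) {
		return make([]byte, vLen)
	}
	return r.buf[:vLen]
}

func main() {
	r := &valueReader{}
	fmt.Println(len(r.bufferFor(100)))   // 100: served from the pre-allocated array
	fmt.Println(len(r.bufferFor(10000))) // 10000: allocated on demand
}

Note that callers must hold whatever lock protects the shared array (as _valBsMux does above) for as long as they use the returned slice.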
90 changes: 78 additions & 12 deletions embedded/store/indexer.go
@@ -23,8 +23,11 @@ import (
"encoding/binary"
"errors"
"fmt"
"math"
"math/rand"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/codenotary/immudb/embedded/tbtree"
@@ -33,6 +36,13 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

var ErrWriteStalling = errors.New("write stalling")

const (
writeStallingSleepDurationMin = 10 * time.Millisecond
writeStallingSleepDurationMax = 50 * time.Millisecond
)

type indexer struct {
path string

@@ -45,8 +55,8 @@ type indexer struct {
maxBulkSize int
bulkPreparationTimeout time.Duration

_kvs []*tbtree.KVT //pre-allocated for multi-tx bulk indexing
_val []byte //pre-allocated buffer to read entry values while mapping
_kvs []*tbtree.KVT //pre-allocated for multi-tx bulk indexing
_val [DefaultMaxValueLen]byte //pre-allocated buffer to read entry values while mapping

index *tbtree.TBtree

@@ -96,12 +106,19 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error)
return nil, fmt.Errorf("%w: nil store", ErrIllegalArguments)
}

id := atomic.AddUint32(&store.nextIndexerID, 1)
if id-1 > math.MaxUint16 {
return nil, ErrMaxIndexersLimitExceeded
}

indexOpts := tbtree.DefaultOptions().
WithIdentifier(uint16(id - 1)).
WithReadOnly(opts.ReadOnly).
WithFileMode(opts.FileMode).
WithLogger(opts.logger).
WithFileSize(opts.FileSize).
WithCacheSize(opts.IndexOpts.CacheSize).
WithCache(store.indexCache).
WithFlushThld(opts.IndexOpts.FlushThld).
WithSyncThld(opts.IndexOpts.SyncThld).
WithFlushBufferSize(opts.IndexOpts.FlushBufferSize).
@@ -115,7 +132,11 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error)
WithCommitLogMaxOpenedFiles(opts.IndexOpts.CommitLogMaxOpenedFiles).
WithRenewSnapRootAfter(opts.IndexOpts.RenewSnapRootAfter).
WithCompactionThld(opts.IndexOpts.CompactionThld).
WithDelayDuringCompaction(opts.IndexOpts.DelayDuringCompaction)
WithDelayDuringCompaction(opts.IndexOpts.DelayDuringCompaction).
WithMaxBufferedDataSize(opts.IndexOpts.MaxBufferedDataSize).
WithOnFlushFunc(func(releasedDataSize int) {
store.memSemaphore.Release(uint64(releasedDataSize))
})

if opts.appFactory != nil {
indexOpts.WithAppFactory(tbtree.AppFactoryFunc(opts.appFactory))
Expand Down Expand Up @@ -150,7 +171,6 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error)
maxBulkSize: opts.IndexOpts.MaxBulkSize,
bulkPreparationTimeout: opts.IndexOpts.BulkPreparationTimeout,
_kvs: kvs,
_val: make([]byte, store.maxValueLen),
path: path,
index: index,
wHub: wHub,
@@ -457,13 +477,33 @@ func (idx *indexer) doIndexing() {
if errors.Is(err, ErrAlreadyClosed) || errors.Is(err, tbtree.ErrAlreadyClosed) {
return
}
if err != nil {

if err != nil && !errors.Is(err, ErrWriteStalling) {
idx.store.logger.Errorf("indexing failed at '%s' due to error: %v", idx.store.path, err)
time.Sleep(60 * time.Second)
}

if err := idx.handleWriteStalling(err); err != nil {
idx.store.logger.Errorf("indexing failed at '%s' due to error: %v", idx.store.path, err)
time.Sleep(60 * time.Second)
}
}
}

func (idx *indexer) handleWriteStalling(err error) error {
if !errors.Is(err, ErrWriteStalling) {
return nil
}

if err := idx.store.FlushIndexes(0, false); err != nil {
return err
}

sleepTime := writeStallingSleepDurationMin + time.Duration(rand.Intn(int(writeStallingSleepDurationMax-writeStallingSleepDurationMin+1)))
time.Sleep(sleepTime)
return nil
}

func serializeIndexableEntry(b []byte, txmd []byte, e *TxEntry, kvmd []byte) int {
n := 0

@@ -500,18 +540,27 @@ func (idx *indexer) mapKey(key []byte, vLen int, vOff int64, hVal [sha256.Size]b
return key, nil
}

_, err = idx.store.readValueAt(idx._val[:vLen], vOff, hVal, false)
buf := idx.valBuffer(vLen)
_, err = idx.store.readValueAt(buf, vOff, hVal, false)
if err != nil {
return nil, err
}

return mapper(key, idx._val[:vLen])
return mapper(key, buf)
}

func (idx *indexer) valBuffer(vLen int) []byte {
if vLen > len(idx._val) {
return make([]byte, vLen)
}
return idx._val[:vLen]
}

func (idx *indexer) indexSince(txID uint64) error {
ctx, cancel := context.WithTimeout(context.Background(), idx.bulkPreparationTimeout)
defer cancel()

acquiredMem := 0
bulkSize := 0
indexableEntries := 0

@@ -521,6 +570,7 @@ func (idx *indexer) indexSince(txID uint64) error {
return err
}

txIndexedEntries := 0
txEntries := idx.tx.Entries()

var txmd []byte
@@ -568,6 +618,7 @@ func (idx *indexer) indexSince(txID uint64) error {
idx._kvs[indexableEntries].T = txID + uint64(i)

indexableEntries++
txIndexedEntries++

if idx.spec.InjectiveMapping && txID > 1 {
// wait for source indexer to be up to date
@@ -591,11 +642,6 @@ func (idx *indexer) indexSince(txID uint64) error {
return err
}

_, err = idx.store.readValueAt(idx._val[:prevEntry.vLen], prevEntry.vOff, prevEntry.hVal, false)
if err != nil {
return err
}

targetPrevKey, err := idx.mapKey(sourceKey, prevEntry.vLen, prevEntry.vOff, prevEntry.hVal, idx.spec.TargetEntryMapper)
if err != nil {
return err
@@ -637,12 +683,24 @@ func (idx *indexer) indexSince(txID uint64) error {
idx._kvs[indexableEntries].T = txID + uint64(i)

indexableEntries++
txIndexedEntries++
} else if !errors.Is(err, ErrKeyNotFound) {
return err
}
}
}

if indexableEntries > 0 && txIndexedEntries > 0 {
size := estimateEntriesSize(idx._kvs[indexableEntries-txIndexedEntries : indexableEntries])
if !idx.store.memSemaphore.Acquire(uint64(size)) {
if acquiredMem == 0 {
return ErrWriteStalling
}
break
}
acquiredMem += size
}

bulkSize++

if bulkSize < idx.maxBulkSize {
@@ -675,3 +733,11 @@

return nil
}

func estimateEntriesSize(kvs []*tbtree.KVT) int {
size := 0
for _, kv := range kvs {
size += len(kv.K) + len(kv.V) + 8
}
return size
}
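When indexSince cannot reserve memory for even the first transaction of a bulk, it reports ErrWriteStalling; the indexing loop then flushes the indexes to release buffered memory and sleeps for a random interval between the two stalling bounds before retrying. A condensed sketch of that retry step, with flushAll standing in for the store's FlushIndexes call:

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

var errWriteStalling = errors.New("write stalling")

const (
	stallSleepMin = 10 * time.Millisecond
	stallSleepMax = 50 * time.Millisecond
)

// handleWriteStalling flushes buffered index data and backs off for a jittered
// interval in [stallSleepMin, stallSleepMax], mirroring the logic added above.
// The jitter keeps several stalled indexers from all retrying at the same instant.
func handleWriteStalling(err error, flushAll func() error) error {
	if !errors.Is(err, errWriteStalling) {
		return nil
	}
	if err := flushAll(); err != nil {
		return err
	}
	jitter := time.Duration(rand.Intn(int(stallSleepMax-stallSleepMin) + 1))
	time.Sleep(stallSleepMin + jitter)
	return nil
}

func main() {
	err := handleWriteStalling(errWriteStalling, func() error {
		fmt.Println("flushing indexes to release buffered memory")
		return nil
	})
	fmt.Println("ready to retry indexing, err:", err)
}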
