From 68b32d56cf31d2a07ea2a43e290f45202bbb9321 Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Fri, 26 Apr 2024 10:55:11 +0200 Subject: [PATCH] Allow order by on non-indexed fields Signed-off-by: Stefano Scafiti --- Dockerfile | 1 + embedded/sql/engine.go | 32 ++- embedded/sql/engine_test.go | 383 ++++++++++++++++++++---------- embedded/sql/file_sort.go | 385 +++++++++++++++++++++++++++++++ embedded/sql/options.go | 21 +- embedded/sql/options_test.go | 6 + embedded/sql/row_reader.go | 1 + embedded/sql/sort_reader.go | 208 +++++++++++++++++ embedded/sql/sort_reader_test.go | 73 ++++++ embedded/sql/sql_tx.go | 31 ++- embedded/sql/stmt.go | 125 +++++----- 11 files changed, 1086 insertions(+), 180 deletions(-) create mode 100644 embedded/sql/file_sort.go create mode 100644 embedded/sql/sort_reader.go create mode 100644 embedded/sql/sort_reader_test.go diff --git a/Dockerfile b/Dockerfile index 0bddcff4a7..8c92907b15 100644 --- a/Dockerfile +++ b/Dockerfile @@ -53,6 +53,7 @@ COPY --from=build /src/immudb /usr/sbin/immudb COPY --from=build /src/immuadmin /usr/local/bin/immuadmin COPY --from=build --chown="$IMMU_UID:$IMMU_GID" /empty "$IMMUDB_HOME" COPY --from=build --chown="$IMMU_UID:$IMMU_GID" /empty "$IMMUDB_DIR" +COPY --from=build --chown="$IMMU_UID:$IMMU_GID" /empty /tmp COPY --from=build "/etc/ssl/certs/ca-certificates.crt" "/etc/ssl/certs/ca-certificates.crt" EXPOSE 3322 diff --git a/embedded/sql/engine.go b/embedded/sql/engine.go index 62de221f08..f33e24f6f2 100644 --- a/embedded/sql/engine.go +++ b/embedded/sql/engine.go @@ -54,13 +54,12 @@ var ErrNotNullableColumnCannotBeNull = errors.New("not nullable column can not b var ErrNewColumnMustBeNullable = errors.New("new column must be nullable") var ErrIndexAlreadyExists = errors.New("index already exists") var ErrMaxNumberOfColumnsInIndexExceeded = errors.New("number of columns in multi-column index exceeded") -var ErrNoAvailableIndex = errors.New("no available index") var ErrIndexNotFound = errors.New("index not found") var ErrInvalidNumberOfValues = errors.New("invalid number of values provided") var ErrInvalidValue = errors.New("invalid value provided") var ErrInferredMultipleTypes = errors.New("inferred multiple types") var ErrExpectingDQLStmt = errors.New("illegal statement. 
DQL statement expected") -var ErrLimitedOrderBy = errors.New("order is limit to one indexed column") +var ErrLimitedOrderBy = errors.New("order by is limited to one column") var ErrLimitedGroupBy = errors.New("group by requires ordering by the grouping column") var ErrIllegalMappedKey = errors.New("error illegal mapped key") var ErrCorruptedData = store.ErrCorruptedData @@ -104,6 +103,7 @@ type Engine struct { prefix []byte distinctLimit int + sortBufferSize int autocommit bool lazyIndexConstraintValidation bool @@ -144,6 +144,7 @@ func NewEngine(st *store.ImmuStore, opts *Options) (*Engine, error) { store: st, prefix: make([]byte, len(opts.prefix)), distinctLimit: opts.distinctLimit, + sortBufferSize: opts.sortBufferSize, autocommit: opts.autocommit, lazyIndexConstraintValidation: opts.lazyIndexConstraintValidation, multidbHandler: opts.multidbHandler, @@ -506,6 +507,33 @@ func (e *Engine) execPreparedStmts(ctx context.Context, tx *SQLTx, stmts []SQLSt return currTx, committedTxs, stmts[execStmts:], nil } +func (e *Engine) queryAll(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) ([]*Row, error) { + reader, err := e.Query(ctx, tx, sql, params) + if err != nil { + return nil, err + } + defer reader.Close() + + return readAllRows(ctx, reader) +} + +func readAllRows(ctx context.Context, reader RowReader) ([]*Row, error) { + rows := make([]*Row, 0, 100) + for { + row, err := reader.Read(ctx) + if err == ErrNoMoreRows { + break + } + + if err != nil { + return nil, err + } + + rows = append(rows, row) + } + return rows, nil +} + func (e *Engine) Query(ctx context.Context, tx *SQLTx, sql string, params map[string]interface{}) (RowReader, error) { stmts, err := Parse(strings.NewReader(sql)) if err != nil { diff --git a/embedded/sql/engine_test.go b/embedded/sql/engine_test.go index 6e52aa9203..1594c9e305 100644 --- a/embedded/sql/engine_test.go +++ b/embedded/sql/engine_test.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "math" + "math/rand" "os" "sort" "strconv" @@ -30,6 +31,7 @@ import ( "testing" "time" + "github.com/codenotary/immudb/embedded/logger" "github.com/codenotary/immudb/embedded/store" "github.com/codenotary/immudb/embedded/tbtree" "github.com/google/uuid" @@ -2461,7 +2463,7 @@ func TestQuery(t *testing.T) { } t.Run("should resolve every row", func(t *testing.T) { - r, err = engine.Query(context.Background(), nil, "SELECT * FROM table1", nil) + r, err = engine.Query(context.Background(), nil, "SELECT * FROM table1 ORDER BY title", nil) require.NoError(t, err) colsBySel, err := r.colsBySelector(context.Background()) @@ -2583,8 +2585,29 @@ func TestQuery(t *testing.T) { }) r, err = engine.Query(context.Background(), nil, "SELECT id, title, active, payload FROM table1 ORDER BY title", nil) - require.ErrorIs(t, err, ErrLimitedOrderBy) - require.Nil(t, r) + require.NoError(t, err) + require.NotNil(t, r) + + allRows := make([]*Row, rowCount) + for i := 0; i < rowCount; i++ { + row, err := r.Read(context.Background()) + require.NoError(t, err) + + allRows[i] = row + } + _, err = r.Read(context.Background()) + require.ErrorIs(t, ErrNoMoreRows, err) + + err = r.Close() + require.NoError(t, err) + + isSorted := sort.SliceIsSorted(allRows, func(i, j int) bool { + r1 := allRows[i].ValuesByPosition[1] + r2 := allRows[j].ValuesByPosition[1] + + return r1.RawValue().(string) < r2.RawValue().(string) + }) + require.True(t, isSorted) r, err = engine.Query(context.Background(), nil, "SELECT Id, Title, Active, payload FROM Table1 ORDER BY Id DESC", nil) require.NoError(t, err) 
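With ErrLimitedOrderBy no longer raised for a single non-indexed column, callers can rely on the engine to sort the result set itself. A minimal sketch of the calling pattern the test above exercises, written as if it lived next to readAllRows in the sql package (the helper name readSortedTitles is illustrative and not part of the patch; error handling abbreviated):

// readSortedTitles runs an ORDER BY over the non-indexed title column and
// collects the titles in the order the engine returns them.
func readSortedTitles(ctx context.Context, engine *Engine) ([]string, error) {
	reader, err := engine.Query(ctx, nil, "SELECT id, title FROM table1 ORDER BY title", nil)
	if err != nil {
		return nil, err
	}
	defer reader.Close()

	titles := make([]string, 0)
	for {
		row, err := reader.Read(ctx)
		if err == ErrNoMoreRows {
			// rows arrive already sorted by title, even though title has no index
			break
		}
		if err != nil {
			return nil, err
		}
		titles = append(titles, row.ValuesByPosition[1].RawValue().(string))
	}
	return titles, nil
}
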
@@ -3070,11 +3093,6 @@ func TestIndexing(t *testing.T) { require.ErrorIs(t, err, store.ErrKeyAlreadyExists) }) - t.Run("should fail due non-available index", func(t *testing.T) { - _, err = engine.Query(context.Background(), nil, "SELECT * FROM table1 ORDER BY amount DESC", nil) - require.ErrorIs(t, err, ErrNoAvailableIndex) - }) - t.Run("should use primary index by default", func(t *testing.T) { r, err := engine.Query(context.Background(), nil, "SELECT * FROM table1", nil) require.NoError(t, err) @@ -3317,9 +3335,18 @@ func TestIndexing(t *testing.T) { require.NoError(t, err) }) - t.Run("should fail using index on `ts` when ordering by `title`", func(t *testing.T) { - _, err := engine.Query(context.Background(), nil, "SELECT * FROM table1 USE INDEX ON (ts) ORDER BY title", nil) - require.ErrorIs(t, err, ErrNoAvailableIndex) + t.Run("should use specified index on `ts` when ordering by `title`", func(t *testing.T) { + r, err := engine.Query(context.Background(), nil, "SELECT * FROM table1 USE INDEX ON (ts) ORDER BY title", nil) + require.NoError(t, err) + + scanSpecs := r.ScanSpecs() + require.NotNil(t, scanSpecs) + require.Len(t, scanSpecs.Index.cols, 1) + require.Equal(t, scanSpecs.Index.cols[0].colName, "ts") + require.True(t, scanSpecs.SortRequired) + + err = r.Close() + require.NoError(t, err) }) t.Run("should use index on `title` with max value in desc order", func(t *testing.T) { @@ -3620,7 +3647,7 @@ func TestOrderBy(t *testing.T) { require.NoError(t, err) defer closeStore(t, st) - engine, err := NewEngine(st, DefaultOptions().WithPrefix(sqlPrefix)) + engine, err := NewEngine(st, DefaultOptions().WithPrefix(sqlPrefix).WithSortBufferSize(1024)) require.NoError(t, err) _, _, err = engine.Exec(context.Background(), nil, "CREATE TABLE table1 (id INTEGER, title VARCHAR[100], age INTEGER, PRIMARY KEY id)", nil) @@ -3629,37 +3656,13 @@ func TestOrderBy(t *testing.T) { _, err = engine.Query(context.Background(), nil, "SELECT id, title, age FROM table1 ORDER BY id, title DESC", nil) require.ErrorIs(t, err, ErrLimitedOrderBy) - _, err = engine.Query(context.Background(), nil, "SELECT id, title, age FROM (SELECT id, title, age FROM table1) ORDER BY id", nil) - require.ErrorIs(t, err, ErrLimitedOrderBy) - - _, err = engine.Query(context.Background(), nil, "SELECT id, title, age FROM (SELECT id, title, age FROM table1 AS t1) ORDER BY age DESC", nil) - require.ErrorIs(t, err, ErrLimitedOrderBy) - _, err = engine.Query(context.Background(), nil, "SELECT id, title, age FROM table2 ORDER BY title", nil) require.ErrorIs(t, err, ErrTableDoesNotExist) _, err = engine.Query(context.Background(), nil, "SELECT id, title, age FROM table1 ORDER BY amount", nil) require.ErrorIs(t, err, ErrColumnDoesNotExist) - _, _, err = engine.Exec(context.Background(), nil, "CREATE INDEX ON table1(title)", nil) - require.NoError(t, err) - - _, err = engine.Query(context.Background(), nil, "SELECT id, title, age FROM table1 ORDER BY age", nil) - require.ErrorIs(t, err, ErrLimitedOrderBy) - - _, _, err = engine.Exec(context.Background(), nil, "CREATE INDEX ON table1(age)", nil) - require.NoError(t, err) - - params := make(map[string]interface{}, 1) - params["age"] = nil - _, _, err = engine.Exec(context.Background(), nil, "INSERT INTO table1 (id, title, age) VALUES (1, 'title', @age)", params) - require.NoError(t, err) - - _, _, err = engine.Exec(context.Background(), nil, "INSERT INTO table1 (id, title) VALUES (2, 'title')", nil) - require.NoError(t, err) - - rowCount := 1 - + rowCount := 100 + 
rand.Intn(engine.sortBufferSize-100) // [100, sortBufferSize] for i := 0; i < rowCount; i++ { params := make(map[string]interface{}, 3) params["id"] = i + 3 @@ -3670,110 +3673,207 @@ func TestOrderBy(t *testing.T) { require.NoError(t, err) } - r, err := engine.Query(context.Background(), nil, "SELECT id, title, age FROM table1 ORDER BY title", nil) - require.NoError(t, err) + t.Run("ascending order by should be executed using in-memory sorting", func(t *testing.T) { + reader, err := engine.Query(context.Background(), nil, "SELECT id, title, age FROM table1 ORDER BY age", nil) + require.NoError(t, err) + defer reader.Close() - orderBy := r.OrderBy() - require.NotNil(t, orderBy) - require.Len(t, orderBy, 1) - require.Equal(t, "title", orderBy[0].Column) - require.Equal(t, "table1", orderBy[0].Table) + rows, err := readAllRows(context.Background(), reader) + require.NoError(t, err) + require.Len(t, rows, rowCount) - row, err := r.Read(context.Background()) - require.NoError(t, err) - require.Len(t, row.ValuesBySelector, 3) + tx := reader.Tx() + require.Len(t, tx.tempFiles, 0) - require.Equal(t, int64(1), row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue()) - require.Equal(t, "title", row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue()) - require.Nil(t, row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue()) + isSorted := sort.SliceIsSorted(rows, func(i, j int) bool { + v1 := rows[i].ValuesByPosition[2].RawValue().(int64) + v2 := rows[j].ValuesByPosition[2].RawValue().(int64) + return v1 < v2 + }) + require.True(t, isSorted) + }) - row, err = r.Read(context.Background()) - require.NoError(t, err) - require.Len(t, row.ValuesBySelector, 3) + t.Run("descending order by should be executed using in-memory sorting", func(t *testing.T) { + reader, err := engine.Query(context.Background(), nil, "SELECT title, age FROM table1 ORDER BY title DESC", nil) + require.NoError(t, err) + defer reader.Close() - require.Equal(t, int64(2), row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue()) - require.Equal(t, "title", row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue()) - require.Nil(t, row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue()) + rows, err := readAllRows(context.Background(), reader) + require.NoError(t, err) + require.Len(t, rows, rowCount) - for i := 0; i < rowCount; i++ { - row, err := r.Read(context.Background()) + tx := reader.Tx() + require.Len(t, tx.tempFiles, 0) + + isSorted := sort.SliceIsSorted(rows, func(i, j int) bool { + v1 := rows[i].ValuesByPosition[0].RawValue().(string) + v2 := rows[j].ValuesByPosition[0].RawValue().(string) + return v1 >= v2 + }) + require.True(t, isSorted) + }) + + t.Run("ascending order by should be executed using file sorting", func(t *testing.T) { + engine.sortBufferSize = 4 + rand.Intn(13) // [4, 16] + + reader, err := engine.Query(context.Background(), nil, "SELECT id, title, age FROM table1 ORDER BY age", nil) require.NoError(t, err) - require.NotNil(t, row) - require.Len(t, row.ValuesBySelector, 3) + defer reader.Close() - require.Equal(t, int64(i+3), row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue()) - require.Equal(t, fmt.Sprintf("title%d", i), row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue()) - require.Equal(t, int64(40+i), row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue()) - } + rows, err := readAllRows(context.Background(), reader) + require.NoError(t, err) + require.Len(t, rows, rowCount) - err = 
r.Close() - require.NoError(t, err) + tx := reader.Tx() + require.Len(t, tx.tempFiles, 2) + + isSorted := sort.SliceIsSorted(rows, func(i, j int) bool { + v1 := rows[i].ValuesByPosition[2].RawValue().(int64) + v2 := rows[j].ValuesByPosition[2].RawValue().(int64) + return v1 < v2 + }) + require.True(t, isSorted) + }) + + t.Run("descending order by should be executed using file sorting", func(t *testing.T) { + engine.sortBufferSize = 4 + rand.Intn(13) // [4, 16] + + reader, err := engine.Query(context.Background(), nil, "SELECT id, title, age FROM table1 ORDER BY title DESC", nil) + require.NoError(t, err) + defer reader.Close() + + rows, err := readAllRows(context.Background(), reader) + require.NoError(t, err) + require.Len(t, rows, rowCount) + + tx := reader.Tx() + require.Len(t, tx.tempFiles, 2) + + isSorted := sort.SliceIsSorted(rows, func(i, j int) bool { + v1 := rows[i].ValuesByPosition[1].RawValue().(string) + v2 := rows[j].ValuesByPosition[1].RawValue().(string) + return v1 >= v2 + }) + require.True(t, isSorted) + }) - r, err = engine.Query(context.Background(), nil, "SELECT id, title, age FROM table1 ORDER BY age", nil) + t.Run("order by on top of subquery", func(t *testing.T) { + rows, err := engine.queryAll(context.Background(), nil, "SELECT id, title, age FROM (SELECT id, title, age FROM table1 AS t1) ORDER BY age DESC", nil) + require.NoError(t, err) + require.Len(t, rows, rowCount) + + isSorted := sort.SliceIsSorted(rows, func(i, j int) bool { + v1 := rows[i].ValuesByPosition[2].RawValue().(int64) + v2 := rows[j].ValuesByPosition[2].RawValue().(int64) + return v1 >= v2 + }) + require.True(t, isSorted) + }) + + _, _, err = engine.Exec(context.Background(), nil, "CREATE INDEX ON table1(title)", nil) require.NoError(t, err) - row, err = r.Read(context.Background()) + _, _, err = engine.Exec(context.Background(), nil, "CREATE INDEX ON table1(age)", nil) require.NoError(t, err) - require.Len(t, row.ValuesBySelector, 3) - require.Equal(t, int64(1), row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue()) - require.Equal(t, "title", row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue()) - require.Nil(t, row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue()) + params := make(map[string]interface{}, 1) + params["age"] = nil + _, _, err = engine.Exec(context.Background(), nil, "INSERT INTO table1 (id, title, age) VALUES (1, 'title', @age)", params) + require.NoError(t, err) - row, err = r.Read(context.Background()) + _, _, err = engine.Exec(context.Background(), nil, "INSERT INTO table1 (id, title) VALUES (2, 'title')", nil) require.NoError(t, err) - require.Len(t, row.ValuesBySelector, 3) - require.Equal(t, int64(2), row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue()) - require.Equal(t, "title", row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue()) - require.Nil(t, row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue()) + t.Run("ascending order by should be executed using index", func(t *testing.T) { + r, err := engine.Query(context.Background(), nil, "SELECT id, title, age FROM table1 ORDER BY title", nil) + require.NoError(t, err) + + orderBy := r.OrderBy() + require.NotNil(t, orderBy) + require.Len(t, orderBy, 1) + require.Equal(t, "title", orderBy[0].Column) + require.Equal(t, "table1", orderBy[0].Table) + scanSpecs := r.ScanSpecs() + require.False(t, scanSpecs.SortRequired) - for i := 0; i < rowCount; i++ { row, err := r.Read(context.Background()) require.NoError(t, err) - 
require.NotNil(t, row) require.Len(t, row.ValuesBySelector, 3) - require.Equal(t, int64(i+3), row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue()) - require.Equal(t, fmt.Sprintf("title%d", i), row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue()) - require.Equal(t, int64(40+i), row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue()) - } + require.Equal(t, int64(1), row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue()) + require.Equal(t, "title", row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue()) + require.Nil(t, row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue()) - err = r.Close() - require.NoError(t, err) + row, err = r.Read(context.Background()) + require.NoError(t, err) + require.Len(t, row.ValuesBySelector, 3) - r, err = engine.Query(context.Background(), nil, "SELECT id, title, age FROM table1 ORDER BY age DESC", nil) - require.NoError(t, err) + require.Equal(t, int64(2), row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue()) + require.Equal(t, "title", row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue()) + require.Nil(t, row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue()) - for i := 0; i < rowCount; i++ { - row, err := r.Read(context.Background()) + titles := make([]string, 0, rowCount) + for i := 0; i < rowCount; i++ { + row, err := r.Read(context.Background()) + require.NoError(t, err) + require.NotNil(t, row) + require.Len(t, row.ValuesBySelector, 3) + + id := row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue().(int64) + title := row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue().(string) + + titles = append(titles, title) + + j, err := strconv.ParseInt(strings.TrimPrefix(title, "title"), 10, 64) + require.NoError(t, err) + require.Equal(t, j+3, id) + } + + isSorted := sort.SliceIsSorted(titles, func(i, j int) bool { + return titles[i] < titles[j] + }) + require.True(t, isSorted) + err = r.Close() require.NoError(t, err) - require.NotNil(t, row) - require.Len(t, row.ValuesBySelector, 3) + }) - require.Equal(t, int64(rowCount-1-i+3), row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue()) - require.Equal(t, fmt.Sprintf("title%d", rowCount-1-i), row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue()) - require.Equal(t, int64(40-(rowCount-1-i)), row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue()) - } + t.Run("descending order by should be executed using index", func(t *testing.T) { + r, err := engine.Query(context.Background(), nil, "SELECT id, title, age FROM table1 ORDER BY age DESC", nil) + require.NoError(t, err) - row, err = r.Read(context.Background()) - require.NoError(t, err) - require.Len(t, row.ValuesBySelector, 3) + orderBy := r.OrderBy() + require.NotNil(t, orderBy) + require.Len(t, orderBy, 1) + require.Equal(t, "age", orderBy[0].Column) + require.Equal(t, "table1", orderBy[0].Table) + scanSpecs := r.ScanSpecs() + require.False(t, scanSpecs.SortRequired) - require.Equal(t, int64(2), row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue()) - require.Equal(t, "title", row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue()) - require.Nil(t, row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue()) + ages := make([]int64, 0, rowCount+2) + for i := 0; i < rowCount; i++ { + row, err := r.Read(context.Background()) + require.NoError(t, err) + require.NotNil(t, row) + require.Len(t, row.ValuesBySelector, 3) 
- row, err = r.Read(context.Background()) - require.NoError(t, err) - require.Len(t, row.ValuesBySelector, 3) + id := row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue().(int64) + title := row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue().(string) + age := row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue().(int64) - require.Equal(t, int64(1), row.ValuesBySelector[EncodeSelector("", "table1", "id")].RawValue()) - require.Equal(t, "title", row.ValuesBySelector[EncodeSelector("", "table1", "title")].RawValue()) - require.Nil(t, row.ValuesBySelector[EncodeSelector("", "table1", "age")].RawValue()) + ages = append(ages, age) - err = r.Close() - require.NoError(t, err) + j, err := strconv.ParseInt(strings.TrimPrefix(title, "title"), 10, 64) + require.NoError(t, err) + require.Equal(t, j+3, id) + } + + isSorted := sort.SliceIsSorted(ages, func(i, j int) bool { + return ages[i] >= ages[j] + }) + require.True(t, isSorted) + err = r.Close() + require.NoError(t, err) + }) } func TestQueryWithRowFiltering(t *testing.T) { @@ -3899,9 +3999,6 @@ func TestQueryWithInClause(t *testing.T) { _, _, err := engine.Exec(context.Background(), nil, "CREATE TABLE table1 (id INTEGER, title VARCHAR[50], active BOOLEAN, PRIMARY KEY id)", nil) require.NoError(t, err) - _, _, err = engine.Exec(context.Background(), nil, "CREATE INDEX ON table1(title)", nil) - require.NoError(t, err) - rowCount := 10 for i := 0; i < rowCount; i++ { @@ -3915,7 +4012,7 @@ func TestQueryWithInClause(t *testing.T) { require.False(t, inListExp.isConstant()) t.Run("infer parameters without parameters should return an empty list", func(t *testing.T) { - params, err := engine.InferParameters(context.Background(), nil, "SELECT id, title, active FROM table1 WHERE title IN ('title0', 'title1')") + params, err := engine.InferParameters(context.Background(), nil, "SELECT id, title, active FROM table1 WHERE title IN ('title0', 'title1') ORDER BY title") require.NoError(t, err) require.Empty(t, params) }) @@ -7144,10 +7241,6 @@ func TestCopyCatalogToTx(t *testing.T) { _, _, err := engine.Exec(context.Background(), nil, "INSERT INTO table1 (name, amount) VALUES ('name1', 10), ('name1', 10)", nil) require.ErrorIs(t, err, store.ErrKeyAlreadyExists) - // should fail due non-available index - _, err = engine.Query(context.Background(), nil, "SELECT * FROM table1 ORDER BY amount DESC", nil) - require.ErrorIs(t, err, ErrNoAvailableIndex) - // should use primary index by default r, err := engine.Query(context.Background(), nil, "SELECT * FROM table1", nil) require.NoError(t, err) @@ -7246,6 +7339,62 @@ func BenchmarkInsertInto(b *testing.B) { } } +func BenchmarkNotIndexedOrderBy(b *testing.B) { + st, err := store.Open(b.TempDir(), store.DefaultOptions().WithMultiIndexing(true).WithLogger(logger.NewMemoryLoggerWithLevel(logger.LogError))) + if err != nil { + b.Fail() + } + + defer st.Close() + + engine, err := NewEngine(st, DefaultOptions().WithPrefix(sqlPrefix).WithSortBufferSize(1024)) + if err != nil { + b.Fail() + } + + _, _, err = engine.Exec(context.Background(), nil, `CREATE TABLE mytable(id INTEGER AUTO_INCREMENT, title VARCHAR[50], PRIMARY KEY id);`, nil) + if err != nil { + b.Fail() + } + + for nBatch := 0; nBatch < 100; nBatch++ { + tx, err := engine.NewTx(context.Background(), DefaultTxOptions().WithExplicitClose(true)) + if err != nil { + b.Fail() + } + + nRows := 1000 + for i := 0; i < nRows; i++ { + _, _, err := engine.Exec(context.Background(), tx, "INSERT INTO mytable(title) VALUES 
(@title)", map[string]interface{}{ + "title": fmt.Sprintf("title%d", rand.Int()), + }) + if err != nil { + b.Fail() + } + } + + err = tx.Commit(context.Background()) + if err != nil { + b.Fail() + } + } + + b.ResetTimer() + + start := time.Now() + reader, err := engine.Query(context.Background(), nil, "SELECT * FROM mytable ORDER BY title ASC LIMIT 1", nil) + if err != nil { + b.Fail() + } + defer reader.Close() + + _, err = reader.Read(context.Background()) + if err != nil { + b.Fail() + } + fmt.Println("Elapsed:", time.Since(start)) +} + func TestLikeWithNullableColumns(t *testing.T) { engine := setupCommonTest(t) diff --git a/embedded/sql/file_sort.go b/embedded/sql/file_sort.go new file mode 100644 index 0000000000..f459a8ad3f --- /dev/null +++ b/embedded/sql/file_sort.go @@ -0,0 +1,385 @@ +/* +Copyright 2024 Codenotary Inc. All rights reserved. + +SPDX-License-Identifier: BUSL-1.1 +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://mariadb.com/bsl11/ + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sql + +import ( + "bufio" + "bytes" + "encoding/binary" + "io" + "os" + "sort" +) + +type sortedChunk struct { + offset uint64 + size uint64 +} + +type fileSorter struct { + colPosBySelector map[string]int + colTypes []string + cmp func(r1, r2 Tuple) bool + + tx *SQLTx + sortBufSize int + sortBuf []*Row + nextIdx int + + tempFile *os.File + writer *bufio.Writer + tempFileSize uint64 + + chunksToMerge []sortedChunk +} + +func (s *fileSorter) update(r *Row) error { + if s.nextIdx == s.sortBufSize { + err := s.sortAndFlushBuffer() + if err != nil { + return err + } + s.nextIdx = 0 + } + + s.sortBuf[s.nextIdx] = r + s.nextIdx++ + + return nil +} + +func (s *fileSorter) finalize() (resultReader, error) { + if s.nextIdx > 0 { + s.sortBuffer() + } + + // result rows are all in memory + if len(s.chunksToMerge) == 0 { + return &bufferResultReader{ + sortBuf: s.sortBuf[:s.nextIdx], + }, nil + } + + err := s.flushBuffer() + if err != nil { + return nil, err + } + + err = s.writer.Flush() + if err != nil { + return nil, err + } + return s.mergeAllChunks() +} + +func (s *fileSorter) mergeAllChunks() (resultReader, error) { + currFile := s.tempFile + + outFile, err := s.tx.createTempFile() + if err != nil { + return nil, err + } + + lbuf := &bufio.Reader{} + rbuf := &bufio.Reader{} + + lr := &fileRowReader{ + colTypes: s.colTypes, + reader: lbuf, + } + rr := &fileRowReader{ + colTypes: s.colTypes, + reader: rbuf, + } + + chunks := s.chunksToMerge + for len(chunks) > 1 { + s.writer.Reset(outFile) + + var offset uint64 + + newChunks := make([]sortedChunk, (len(chunks)+1)/2) + for i := 0; i < len(chunks)/2; i++ { + c1 := chunks[i*2] + c2 := chunks[i*2+1] + + lbuf.Reset(io.NewSectionReader(currFile, int64(c1.offset), int64(c1.size))) + rbuf.Reset(io.NewSectionReader(currFile, int64(c2.offset), int64(c2.size))) + + err := s.mergeChunks(lr, rr, s.writer) + if err != nil { + return nil, err + } + + newChunks[i] = sortedChunk{ + offset: offset, + size: c1.size + c2.size, + } + offset += c1.size + c2.size + } + + err := s.writer.Flush() + if err != nil { + return nil, err + } + + if len(chunks)%2 != 0 { // copy last sorted chunk + lastChunk := chunks[len(chunks)-1] + + _, 
err := io.Copy(outFile, io.NewSectionReader(currFile, int64(lastChunk.offset), int64(lastChunk.size))) + if err != nil { + return nil, err + } + newChunks[len(chunks)/2] = lastChunk + } + + temp := currFile + currFile = outFile + outFile = temp + + _, err = outFile.Seek(0, io.SeekStart) + if err != nil { + return nil, err + } + + chunks = newChunks + } + + return &fileRowReader{ + colTypes: s.colTypes, + colPosBySelector: s.colPosBySelector, + reader: bufio.NewReader(io.NewSectionReader(currFile, 0, int64(s.tempFileSize))), + }, nil +} + +func (s *fileSorter) mergeChunks(lr, rr *fileRowReader, writer io.Writer) error { + var err error + var lrAtEOF bool + var t1, t2 Tuple + + for { + if t1 == nil { + t1, err = lr.ReadValues() + if err == io.EOF { + lrAtEOF = true + break + } + + if err != nil { + return err + } + } + + if t2 == nil { + t2, err = rr.ReadValues() + if err == io.EOF { + break + } + + if err != nil { + return err + } + } + + var rawData []byte + if s.cmp(t1, t2) { + rawData = lr.rowBuf.Bytes() + t1 = nil + } else { + rawData = rr.rowBuf.Bytes() + t2 = nil + } + + _, err := writer.Write(rawData) + if err != nil { + return err + } + } + + readerToCopy := lr + if lrAtEOF { + readerToCopy = rr + } + + _, err = writer.Write(readerToCopy.rowBuf.Bytes()) + if err != nil { + return err + } + + _, err = io.Copy(writer, readerToCopy.reader) + return err +} + +type resultReader interface { + Read() (*Row, error) +} + +type bufferResultReader struct { + sortBuf []*Row + nextIdx int +} + +func (r *bufferResultReader) Read() (*Row, error) { + if r.nextIdx == len(r.sortBuf) { + return nil, ErrNoMoreRows + } + + row := r.sortBuf[r.nextIdx] + r.nextIdx++ + return row, nil +} + +type fileRowReader struct { + colPosBySelector map[string]int + colTypes []SQLValueType + reader io.Reader + rowBuf bytes.Buffer +} + +func (r *fileRowReader) ReadValues() ([]TypedValue, error) { + var size uint16 + err := binary.Read(r.reader, binary.BigEndian, &size) + if err != nil { + return nil, err + } + + r.rowBuf.Reset() + + binary.Write(&r.rowBuf, binary.BigEndian, &size) + + _, err = io.CopyN(&r.rowBuf, r.reader, int64(size)) + if err != nil { + return nil, err + } + + data := r.rowBuf.Bytes() + return decodeValues(data[2:], r.colTypes) +} + +func (r *fileRowReader) Read() (*Row, error) { + values, err := r.ReadValues() + if err == io.EOF { + return nil, ErrNoMoreRows + } + if err != nil { + return nil, err + } + + valuesBySelector := make(map[string]TypedValue) + for sel, pos := range r.colPosBySelector { + valuesBySelector[sel] = values[pos] + } + + row := &Row{ + ValuesByPosition: values, + ValuesBySelector: valuesBySelector, + } + return row, nil +} + +func decodeValues(data []byte, cols []string) ([]TypedValue, error) { + values := make([]TypedValue, len(cols)) + + var voff int + for i, col := range cols { + v, n, err := DecodeValue(data[voff:], col) + if err != nil { + return nil, err + } + voff += n + + values[i] = v + } + return values, nil +} + +func (s *fileSorter) sortAndFlushBuffer() error { + s.sortBuffer() + return s.flushBuffer() +} + +func (s *fileSorter) sortBuffer() { + buf := s.sortBuf[:s.nextIdx] + + sort.Slice(buf, func(i, j int) bool { + r1 := buf[i] + r2 := buf[j] + + return s.cmp(r1.ValuesByPosition, r2.ValuesByPosition) + }) +} + +func (s *fileSorter) flushBuffer() error { + writer, err := s.tempFileWriter() + if err != nil { + return err + } + + var chunkSize uint64 + for _, row := range s.sortBuf[:s.nextIdx] { + data, err := encodeRow(row) + if err != nil { + return err + } + + _, 
err = writer.Write(data) + if err != nil { + return err + } + + chunkSize += uint64(len(data)) + } + + s.chunksToMerge = append(s.chunksToMerge, sortedChunk{ + offset: s.tempFileSize, + size: chunkSize, + }) + s.tempFileSize += chunkSize + return nil +} + +func (s *fileSorter) tempFileWriter() (*bufio.Writer, error) { + if s.writer != nil { + return s.writer, nil + } + file, err := s.tx.createTempFile() + if err != nil { + return nil, err + } + s.tempFile = file + s.writer = bufio.NewWriter(file) + return s.writer, nil +} + +func encodeRow(r *Row) ([]byte, error) { + var buf bytes.Buffer + buf.Write([]byte{0, 0}) // make room for size field + + for _, v := range r.ValuesByPosition { + rawValue, err := EncodeValue(v, v.Type(), -1) + if err != nil { + return nil, err + } + buf.Write(rawValue) + } + + data := buf.Bytes() + size := uint16(len(data) - 2) + binary.BigEndian.PutUint16(data, size) + + return data, nil +} diff --git a/embedded/sql/options.go b/embedded/sql/options.go index b00cbb05d2..ed0edbf36f 100644 --- a/embedded/sql/options.go +++ b/embedded/sql/options.go @@ -22,10 +22,14 @@ import ( "github.com/codenotary/immudb/embedded/store" ) -var defaultDistinctLimit = 1 << 20 // ~ 1mi rows +const ( + defaultDistinctLimit = 1 << 20 // ~ 1mi rows + defaultSortBufferSize = 1024 +) type Options struct { prefix []byte + sortBufferSize int distinctLimit int autocommit bool lazyIndexConstraintValidation bool @@ -35,7 +39,8 @@ type Options struct { func DefaultOptions() *Options { return &Options{ - distinctLimit: defaultDistinctLimit, + sortBufferSize: defaultSortBufferSize, + distinctLimit: defaultDistinctLimit, } } @@ -48,6 +53,10 @@ func (opts *Options) Validate() error { return fmt.Errorf("%w: invalid DistinctLimit value", store.ErrInvalidOptions) } + if opts.sortBufferSize <= 0 { + return fmt.Errorf("%w: invalid SortBufferSize value", store.ErrInvalidOptions) + } + return nil } @@ -75,3 +84,11 @@ func (opts *Options) WithMultiDBHandler(multidbHandler MultiDBHandler) *Options opts.multidbHandler = multidbHandler return opts } + +// WithSortBufferSize specifies the size of the buffer used to sort rows in-memory +// when executing queries containing an ORDER BY clause. The default value is 1024. +// Increasing this value improves sorting speed at the expense of higher memory usage. +func (opts *Options) WithSortBufferSize(size int) *Options { + opts.sortBufferSize = size + return opts +} diff --git a/embedded/sql/options_test.go b/embedded/sql/options_test.go index 78e08aa4b1..d751523601 100644 --- a/embedded/sql/options_test.go +++ b/embedded/sql/options_test.go @@ -42,5 +42,11 @@ func TestOptions(t *testing.T) { opts.WithAutocommit(true) require.True(t, opts.autocommit) + opts.WithSortBufferSize(0) + require.Error(t, opts.Validate()) + + opts.WithSortBufferSize(defaultSortBufferSize) + require.Equal(t, opts.sortBufferSize, defaultSortBufferSize) + require.NoError(t, opts.Validate()) } diff --git a/embedded/sql/row_reader.go b/embedded/sql/row_reader.go index c59e1a6f65..9817423803 100644 --- a/embedded/sql/row_reader.go +++ b/embedded/sql/row_reader.go @@ -45,6 +45,7 @@ type ScanSpecs struct { rangesByColID map[uint32]*typedValueRange IncludeHistory bool DescOrder bool + SortRequired bool } type Row struct { diff --git a/embedded/sql/sort_reader.go b/embedded/sql/sort_reader.go new file mode 100644 index 0000000000..62c8a97666 --- /dev/null +++ b/embedded/sql/sort_reader.go @@ -0,0 +1,208 @@ +/* +Copyright 2024 Codenotary Inc. All rights reserved. 
+ +SPDX-License-Identifier: BUSL-1.1 +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://mariadb.com/bsl11/ + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sql + +import ( + "context" +) + +type sortRowReader struct { + rowReader RowReader + selectors []Selector + orderByDescriptors []ColDescriptor + sortKeysPositions []int + desc bool + sorter fileSorter + + resultReader resultReader +} + +func newSortRowReader(rowReader RowReader, selectors []Selector, desc bool) (*sortRowReader, error) { + if rowReader == nil || len(selectors) == 0 { + return nil, ErrIllegalArguments + } + + descriptors, err := rowReader.Columns(context.Background()) + if err != nil { + return nil, err + } + + colPosBySelector, err := getColPositionsBySelector(descriptors) + if err != nil { + return nil, err + } + + colTypes, err := getColTypes(rowReader) + if err != nil { + return nil, err + } + + sortKeysPositions := getSortKeysPositions(colPosBySelector, selectors, rowReader.TableAlias()) + + tx := rowReader.Tx() + sr := &sortRowReader{ + rowReader: rowReader, + orderByDescriptors: getOrderByDescriptors(descriptors, sortKeysPositions), + selectors: selectors, + desc: desc, + sortKeysPositions: sortKeysPositions, + sorter: fileSorter{ + colPosBySelector: colPosBySelector, + colTypes: colTypes, + tx: tx, + sortBufSize: tx.engine.sortBufferSize, + sortBuf: make([]*Row, tx.engine.sortBufferSize), + }, + } + + sr.sorter.cmp = func(t1, t2 Tuple) bool { + k1 := sr.extractSortKey(t1) + k2 := sr.extractSortKey(t2) + + res, _ := k1.Compare(k2) + if desc { + return res > 0 + } + return res <= 0 + } + return sr, nil +} + +func getColTypes(r RowReader) ([]string, error) { + descriptors, err := r.Columns(context.Background()) + if err != nil { + return nil, err + } + + cols := make([]string, len(descriptors)) + for i, desc := range descriptors { + cols[i] = desc.Type + } + return cols, err +} + +func getSortKeysPositions(colPosBySelector map[string]int, selectors []Selector, tableAlias string) []int { + sortKeysPositions := make([]int, len(selectors)) + for i, sel := range selectors { + aggFn, table, col := sel.resolve(tableAlias) + encSel := EncodeSelector(aggFn, table, col) + pos := colPosBySelector[encSel] + sortKeysPositions[i] = pos + } + return sortKeysPositions +} + +func getColPositionsBySelector(desc []ColDescriptor) (map[string]int, error) { + colPositionsBySelector := make(map[string]int) + for i, desc := range desc { + colPositionsBySelector[desc.Selector()] = i + } + return colPositionsBySelector, nil +} + +func (sr *sortRowReader) extractSortKey(t Tuple) Tuple { + sortKey := make([]TypedValue, len(sr.sortKeysPositions)) + for i, pos := range sr.sortKeysPositions { + sortKey[i] = t[pos] + } + return sortKey +} + +func getOrderByDescriptors(descriptors []ColDescriptor, sortKeysPositions []int) []ColDescriptor { + orderByDescriptors := make([]ColDescriptor, len(sortKeysPositions)) + for i, pos := range sortKeysPositions { + orderByDescriptors[i] = descriptors[pos] + } + return orderByDescriptors +} + +func (sr *sortRowReader) onClose(callback func()) { + sr.rowReader.onClose(callback) +} + +func (sr *sortRowReader) Tx() *SQLTx { + return sr.rowReader.Tx() +} + +func (sr 
*sortRowReader) TableAlias() string { + return sr.rowReader.TableAlias() +} + +func (sr *sortRowReader) Parameters() map[string]interface{} { + return sr.rowReader.Parameters() +} + +func (sr *sortRowReader) OrderBy() []ColDescriptor { + return sr.orderByDescriptors +} + +func (sr *sortRowReader) ScanSpecs() *ScanSpecs { + return sr.rowReader.ScanSpecs() +} + +func (sr *sortRowReader) Columns(ctx context.Context) ([]ColDescriptor, error) { + return sr.rowReader.Columns(ctx) +} + +func (sr *sortRowReader) colsBySelector(ctx context.Context) (map[string]ColDescriptor, error) { + return sr.rowReader.colsBySelector(ctx) +} + +func (sr *sortRowReader) InferParameters(ctx context.Context, params map[string]SQLValueType) error { + return sr.rowReader.InferParameters(ctx, params) +} + +func (sr *sortRowReader) Read(ctx context.Context) (*Row, error) { + if sr.resultReader == nil { + reader, err := sr.readAndSort(ctx) + if err != nil { + return nil, err + } + sr.resultReader = reader + } + return sr.resultReader.Read() +} + +func (sr *sortRowReader) readAndSort(ctx context.Context) (resultReader, error) { + err := sr.readAll(ctx) + if err != nil { + return nil, err + } + return sr.sorter.finalize() +} + +func (sr *sortRowReader) readAll(ctx context.Context) error { + for { + row, err := sr.rowReader.Read(ctx) + if err == ErrNoMoreRows { + return nil + } + + if err != nil { + return err + } + + err = sr.sorter.update(row) + if err != nil { + return err + } + } +} + +func (sr *sortRowReader) Close() error { + return sr.rowReader.Close() +} diff --git a/embedded/sql/sort_reader_test.go b/embedded/sql/sort_reader_test.go new file mode 100644 index 0000000000..c7aa0599b3 --- /dev/null +++ b/embedded/sql/sort_reader_test.go @@ -0,0 +1,73 @@ +/* +Copyright 2024 Codenotary Inc. All rights reserved. + +SPDX-License-Identifier: BUSL-1.1 +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://mariadb.com/bsl11/ + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package sql + +import ( + "context" + "testing" + + "github.com/codenotary/immudb/embedded/store" + "github.com/stretchr/testify/require" +) + +func TestSortRowReader(t *testing.T) { + st, err := store.Open(t.TempDir(), store.DefaultOptions().WithMultiIndexing(true)) + require.NoError(t, err) + + engine, err := NewEngine(st, DefaultOptions().WithPrefix(sqlPrefix)) + require.NoError(t, err) + + _, err = newSortRowReader(nil, nil, false) + require.ErrorIs(t, err, ErrIllegalArguments) + + tx, err := engine.NewTx(context.Background(), DefaultTxOptions()) + require.NoError(t, err) + + _, _, err = engine.Exec(context.Background(), tx, "CREATE TABLE table1(id INTEGER, number INTEGER, PRIMARY KEY id)", nil) + require.NoError(t, err) + + tx, err = engine.NewTx(context.Background(), DefaultTxOptions()) + require.NoError(t, err) + + defer tx.Cancel() + + table := tx.catalog.tables[0] + + r, err := newRawRowReader(tx, nil, table, period{}, "", &ScanSpecs{Index: table.primaryIndex, SortRequired: true}) + require.NoError(t, err) + + sr, err := newSortRowReader(r, []Selector{&ColSelector{col: "number"}}, false) + require.NoError(t, err) + + orderBy := sr.OrderBy() + require.NotNil(t, orderBy) + require.Len(t, orderBy, 1) + require.Equal(t, "number", orderBy[0].Column) + require.Equal(t, "table1", orderBy[0].Table) + + cols, err := sr.Columns(context.Background()) + require.NoError(t, err) + require.Len(t, cols, 2) + + require.Empty(t, sr.Parameters()) + + scanSpecs := sr.ScanSpecs() + require.NotNil(t, scanSpecs) + require.NotNil(t, scanSpecs.Index) + require.True(t, scanSpecs.SortRequired) + require.True(t, scanSpecs.Index.IsPrimary()) +} diff --git a/embedded/sql/sql_tx.go b/embedded/sql/sql_tx.go index 683818aa86..cef090dc32 100644 --- a/embedded/sql/sql_tx.go +++ b/embedded/sql/sql_tx.go @@ -19,6 +19,7 @@ package sql import ( "context" "errors" + "os" "time" "github.com/codenotary/immudb/embedded/multierr" @@ -31,7 +32,8 @@ type SQLTx struct { opts *TxOptions - tx *store.OngoingTx + tx *store.OngoingTx + tempFiles []*os.File catalog *Catalog // in-mem catalog @@ -115,10 +117,14 @@ func (sqlTx *SQLTx) getWithPrefix(ctx context.Context, prefix, neq []byte) (key } func (sqlTx *SQLTx) Cancel() error { + defer sqlTx.removeTempFiles() + return sqlTx.tx.Cancel() } func (sqlTx *SQLTx) Commit(ctx context.Context) error { + defer sqlTx.removeTempFiles() + err := sqlTx.tx.RequireMVCCOnFollowingTxs(sqlTx.mutatedCatalog) if err != nil { return err @@ -157,3 +163,26 @@ func (sqlTx *SQLTx) addOnCommittedCallback(callback onCommittedCallback) error { return nil } + +func (sqlTx *SQLTx) createTempFile() (*os.File, error) { + tempFile, err := os.CreateTemp("", "immudb") + if err == nil { + sqlTx.tempFiles = append(sqlTx.tempFiles, tempFile) + } + return tempFile, err +} + +func (sqlTx *SQLTx) removeTempFiles() error { + for _, file := range sqlTx.tempFiles { + err := file.Close() + if err != nil { + return err + } + + err = os.Remove(file.Name()) + if err != nil { + return err + } + } + return nil +} diff --git a/embedded/sql/stmt.go b/embedded/sql/stmt.go index a939e4f5cd..fb90b0f9c9 100644 --- a/embedded/sql/stmt.go +++ b/embedded/sql/stmt.go @@ -1553,6 +1553,20 @@ type TypedValue interface { IsNull() bool } +type Tuple []TypedValue + +func (t Tuple) Compare(other Tuple) (int, error) { + i := 0 + for i < len(t) && i < len(other) { + res, err := t[i].Compare(other[i]) + if err != nil || res != 0 { + return res, err + } + i++ + } + return len(t) - len(other), nil +} + func NewNull(t SQLValueType) *NullValue { 
return &NullValue{t: t} } @@ -2438,30 +2452,6 @@ func (stmt *SelectStmt) execAt(ctx context.Context, tx *SQLTx, params map[string if len(stmt.orderBy) > 1 { return nil, ErrLimitedOrderBy } - - if len(stmt.orderBy) > 0 { - tableRef, ok := stmt.ds.(*tableRef) - if !ok { - return nil, ErrLimitedOrderBy - } - - table, err := tableRef.referencedTable(tx) - if err != nil { - return nil, err - } - - colName := stmt.orderBy[0].sel.col - - indexed, err := table.IsIndexed(colName) - if err != nil { - return nil, err - } - - if !indexed { - return nil, ErrLimitedOrderBy - } - } - return tx, nil } @@ -2493,6 +2483,13 @@ func (stmt *SelectStmt) Resolve(ctx context.Context, tx *SQLTx, params map[strin rowReader = newConditionalRowReader(rowReader, stmt.where) } + if scanSpecs.SortRequired { + rowReader, err = newSortRowReader(rowReader, stmt.orderBySelectors(), scanSpecs.DescOrder) + if err != nil { + return nil, err + } + } + containsAggregations := false for _, sel := range stmt.selectors { _, containsAggregations = sel.(*AggColSelector) @@ -2596,9 +2593,15 @@ func (stmt *SelectStmt) Alias() string { } func (stmt *SelectStmt) genScanSpecs(tx *SQLTx, params map[string]interface{}) (*ScanSpecs, error) { + sortingRequired := len(stmt.orderBy) > 0 + descOrder := sortingRequired && stmt.orderBy[0].descOrder + tableRef, isTableRef := stmt.ds.(*tableRef) if !isTableRef { - return nil, nil + return &ScanSpecs{ + SortRequired: sortingRequired, + DescOrder: descOrder, + }, nil } table, err := tableRef.referencedTable(tx) @@ -2614,39 +2617,12 @@ func (stmt *SelectStmt) genScanSpecs(tx *SQLTx, params map[string]interface{}) ( } } - var preferredIndex *Index - - if len(stmt.indexOn) > 0 { - cols := make([]*Column, len(stmt.indexOn)) - - for i, colName := range stmt.indexOn { - col, err := table.GetColumnByName(colName) - if err != nil { - return nil, err - } - - cols[i] = col - } - - index, err := table.GetIndexByName(indexName(table.name, cols)) - if err != nil { - return nil, err - } - - preferredIndex = index + preferredIndex, err := stmt.selectIndex(table) + if err != nil { + return nil, err } var sortingIndex *Index - var descOrder bool - - if stmt.orderBy == nil { - if preferredIndex == nil { - sortingIndex = table.primaryIndex - } else { - sortingIndex = preferredIndex - } - } - if len(stmt.orderBy) > 0 { col, err := table.GetColumnByName(stmt.orderBy[0].sel.col) if err != nil { @@ -2657,16 +2633,19 @@ func (stmt *SelectStmt) genScanSpecs(tx *SQLTx, params map[string]interface{}) ( if idx.sortableUsing(col.id, rangesByColID) { if preferredIndex == nil || idx.id == preferredIndex.id { sortingIndex = idx + sortingRequired = false break } } } - - descOrder = stmt.orderBy[0].descOrder } if sortingIndex == nil { - return nil, ErrNoAvailableIndex + if preferredIndex == nil { + sortingIndex = table.primaryIndex + } else { + sortingIndex = preferredIndex + } } if tableRef.history && !sortingIndex.IsPrimary() { @@ -2678,9 +2657,39 @@ func (stmt *SelectStmt) genScanSpecs(tx *SQLTx, params map[string]interface{}) ( rangesByColID: rangesByColID, IncludeHistory: tableRef.history, DescOrder: descOrder, + SortRequired: sortingRequired, }, nil } +func (stmt *SelectStmt) selectIndex(table *Table) (*Index, error) { + if len(stmt.indexOn) == 0 { + return nil, nil + } + + cols := make([]*Column, len(stmt.indexOn)) + for i, colName := range stmt.indexOn { + col, err := table.GetColumnByName(colName) + if err != nil { + return nil, err + } + + cols[i] = col + } + return table.GetIndexByName(indexName(table.name, cols)) +} + 
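Taken together with the fileSorter above, genScanSpecs now only marks the scan with SortRequired (falling back to the primary or preferred index) instead of rejecting the query, and Resolve wraps the row reader in a sortRowReader. The sorting itself is a classic external merge sort: up to sortBufferSize rows are sorted in memory, each full buffer is flushed as a sorted chunk, and chunks are merged pairwise. A minimal, self-contained sketch of that pattern on plain ints (illustrative only, not code from this patch; assumes bufSize > 0):

package main

import (
	"fmt"
	"sort"
)

// mergeTwo merges two already sorted runs, mirroring what fileSorter.mergeChunks
// does with rows stored in temp-file chunks.
func mergeTwo(a, b []int) []int {
	out := make([]int, 0, len(a)+len(b))
	for len(a) > 0 && len(b) > 0 {
		if a[0] <= b[0] {
			out = append(out, a[0])
			a = a[1:]
		} else {
			out = append(out, b[0])
			b = b[1:]
		}
	}
	return append(append(out, a...), b...)
}

// externalSort buffers at most bufSize values, sorts and "flushes" each full
// buffer as a run (fileSorter.sortAndFlushBuffer), then merges runs pairwise
// until a single sorted run remains (fileSorter.mergeAllChunks).
func externalSort(values []int, bufSize int) []int {
	var runs [][]int
	for start := 0; start < len(values); start += bufSize {
		end := start + bufSize
		if end > len(values) {
			end = len(values)
		}
		run := append([]int(nil), values[start:end]...)
		sort.Ints(run)
		runs = append(runs, run)
	}
	for len(runs) > 1 {
		next := make([][]int, 0, (len(runs)+1)/2)
		for i := 0; i+1 < len(runs); i += 2 {
			next = append(next, mergeTwo(runs[i], runs[i+1]))
		}
		if len(runs)%2 != 0 {
			// odd run is carried over to the next pass, as in mergeAllChunks
			next = append(next, runs[len(runs)-1])
		}
		runs = next
	}
	if len(runs) == 0 {
		return nil
	}
	return runs[0]
}

func main() {
	fmt.Println(externalSort([]int{5, 3, 8, 1, 9, 2, 7}, 3)) // [1 2 3 5 7 8 9]
}

The real implementation streams encoded rows through a bufio.Writer and reads chunks back via io.SectionReader over the transaction's temp files, but the run/chunk bookkeeping mirrors the sketch above.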
+func (stmt *SelectStmt) orderBySelectors() []Selector { + var selectors []Selector + for _, col := range stmt.orderBy { + sel := &ColSelector{ + table: col.sel.table, + col: col.sel.col, + } + selectors = append(selectors, sel) + } + return selectors +} + type UnionStmt struct { distinct bool left, right DataSource