Skip to content

Commit

Permalink
Support range-scan in CLI (#483)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored May 19, 2024
1 parent cb87fae commit 82bdb1c
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 1 deletion.
2 changes: 2 additions & 0 deletions cmd/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/streamnative/oxia/cmd/client/list"
"github.com/streamnative/oxia/cmd/client/notifications"
"github.com/streamnative/oxia/cmd/client/put"
"github.com/streamnative/oxia/cmd/client/rangescan"
oxiacommon "github.com/streamnative/oxia/common"
)

Expand All @@ -50,6 +51,7 @@ func init() {
Cmd.AddCommand(del.Cmd)
Cmd.AddCommand(get.Cmd)
Cmd.AddCommand(list.Cmd)
Cmd.AddCommand(rangescan.Cmd)
Cmd.AddCommand(deleterange.Cmd)
Cmd.AddCommand(notifications.Cmd)
}
2 changes: 1 addition & 1 deletion cmd/client/common/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (m *MockClient) List(_ context.Context, minKeyInclusive string, maxKeyExclu

func (m *MockClient) RangeScan(_ context.Context, minKeyInclusive string, maxKeyExclusive string, options ...oxia.RangeScanOption) <-chan oxia.GetResult {
args := m.MethodCalled("RangeScan", minKeyInclusive, maxKeyExclusive, options)
arg0, ok := args.Get(0).(<-chan oxia.GetResult)
arg0, ok := args.Get(0).(chan oxia.GetResult)
if !ok {
panic("cast failed")
}
Expand Down
118 changes: 118 additions & 0 deletions cmd/client/rangescan/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2023 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rangescan

import (
"context"
"time"

"github.com/streamnative/oxia/oxia"

"github.com/spf13/cobra"

"github.com/streamnative/oxia/cmd/client/common"
)

var (
Config = flags{}
)

type flags struct {
keyMin string
keyMax string
hexDump bool
includeVersion bool
partitionKey string
}

func (flags *flags) Reset() {
flags.keyMin = ""
flags.keyMax = ""
flags.hexDump = false
flags.includeVersion = false
flags.partitionKey = ""
}

func init() {
Cmd.Flags().StringVarP(&Config.keyMin, "key-min", "s", "", "Key range minimum (inclusive)")
Cmd.Flags().StringVarP(&Config.keyMax, "key-max", "e", "", "Key range maximum (exclusive)")
Cmd.Flags().BoolVarP(&Config.includeVersion, "include-version", "v", false, "Include the record version object")
Cmd.Flags().BoolVar(&Config.hexDump, "hex", false, "Print the value in HexDump format")
Cmd.Flags().StringVarP(&Config.partitionKey, "partition-key", "p", "", "Partition Key to be used in override the shard routing")
}

var Cmd = &cobra.Command{
Use: "range-scan",
Short: "Scan records",
Long: `Scan all the records whose keys are in the specified range.`,
Args: cobra.NoArgs,
RunE: exec,
}

const lineSeparator = "-------------------------------------------------------------------------------\n"

func exec(cmd *cobra.Command, _ []string) error {
client, err := common.Config.NewClient()
if err != nil {
return err
}

var options []oxia.RangeScanOption
if Config.partitionKey != "" {
options = append(options, oxia.PartitionKey(Config.partitionKey))
}

if Config.keyMax == "" {
// By default, do not list internal keys
Config.keyMax = "__oxia/"
}

ch := client.RangeScan(context.Background(), Config.keyMin, Config.keyMax, options...)

isFirst := true
for result := range ch {
if result.Err != nil {
return result.Err
}

if !isFirst {
_, _ = cmd.OutOrStdout().Write([]byte(lineSeparator))
}

isFirst = false
if Config.hexDump {
common.WriteHexDump(cmd.OutOrStdout(), result.Value)
} else {
common.WriteOutput(cmd.OutOrStdout(), result.Value)
}

if Config.includeVersion {
_, _ = cmd.OutOrStdout().Write([]byte("---\n"))

version := result.Version
common.WriteOutput(cmd.OutOrStdout(), common.OutputVersion{
Key: result.Key,
VersionId: version.VersionId,
CreatedTimestamp: time.UnixMilli(int64(version.CreatedTimestamp)),
ModifiedTimestamp: time.UnixMilli(int64(version.ModifiedTimestamp)),
ModificationsCount: version.ModificationsCount,
Ephemeral: version.Ephemeral,
ClientIdentity: version.ClientIdentity,
})
}
}

return nil
}
79 changes: 79 additions & 0 deletions cmd/client/rangescan/cmd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2023 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rangescan

import (
"bytes"
"strings"
"testing"

"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"

"github.com/streamnative/oxia/oxia"

"github.com/streamnative/oxia/cmd/client/common"
)

func runCmd(cmd *cobra.Command, args string, stdin string) (string, error) {
actual := new(bytes.Buffer)
cmd.SetIn(bytes.NewBufferString(stdin))
cmd.SetOut(actual)
cmd.SetErr(actual)
cmd.SetArgs(strings.Split(args, " "))
err := cmd.Execute()
Config.Reset()
return strings.TrimSpace(actual.String()), err
}

func TestRangeScan_exec(t *testing.T) {
var emptyOptions []oxia.RangeScanOption
for _, test := range []struct {
name string
args string
expectedParameters []any
results []string
}{
{"range", "--key-min a --key-max c", []any{"a", "c", emptyOptions}, []string{"a", "b"}},
{"short", "-s a -e c", []any{"a", "c", emptyOptions}, []string{"a", "b"}},
{"range-no-min", "--key-max c", []any{"", "c", emptyOptions}, []string{"a", "b"}},
{"range-no-max", "--key-min a", []any{"a", "__oxia/", emptyOptions}, []string{"a", "b", "c"}},
{"partition-key", "-s a -e c -p xyz", []any{"a", "c", []oxia.RangeScanOption{oxia.PartitionKey("xyz")}}, []string{"a", "b"}},
} {
t.Run(test.name, func(t *testing.T) {
common.MockedClient = common.NewMockClient()

ch := make(chan oxia.GetResult, 100)
for _, res := range test.results {
ch <- oxia.GetResult{
Key: res,
Value: []byte(res),
Version: oxia.Version{},
Err: nil,
}
}
close(ch)

common.MockedClient.On("RangeScan", test.expectedParameters...).Return(ch)
out, err := runCmd(Cmd, test.args, "")
assert.NoError(t, err)

expectedOut := strings.Join(test.results, "\n"+lineSeparator)
assert.Equal(t, expectedOut, out)

common.MockedClient.AssertExpectations(t)
})
}
}

0 comments on commit 82bdb1c

Please sign in to comment.