-
Notifications
You must be signed in to change notification settings - Fork 1
/
stdin2immudb.go
163 lines (154 loc) · 4.41 KB
/
stdin2immudb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package main
import (
"bufio"
"bytes"
"context"
"flag"
"fmt"
"github.com/codenotary/immudb/pkg/api/schema"
immuclient "github.com/codenotary/immudb/pkg/client"
"log"
"os"
"time"
)
var config struct {
IpAddr string
Port int
Username string
Password string
DBName string
BatchSize int
Offset int
Prefix string
RBack bool
TStamp bool
Verbose bool
}
func cfginit() {
flag.StringVar(&config.IpAddr, "addr", "", "IP address of immudb server")
flag.IntVar(&config.Port, "port", 3322, "Port number of immudb server")
flag.StringVar(&config.Username, "user", "immudb", "Username for authenticating to immudb")
flag.StringVar(&config.Password, "pass", "immudb", "Password for authenticating to immudb")
flag.StringVar(&config.DBName, "db", "defaultdb", "Name of the database to use")
flag.IntVar(&config.BatchSize, "batchsize", 1000, "Batch size")
flag.IntVar(&config.Offset, "offset", 0, "Initial counter value")
flag.StringVar(&config.Prefix, "prefix", "LINE", "Prefix for Key generation")
flag.BoolVar(&config.RBack, "readback", false, "Don't write, read back instead (and check value)")
flag.BoolVar(&config.TStamp, "timestamp", false, "Use current epoch as numeric part of the key (instead a progressive integer)")
flag.BoolVar(&config.Verbose, "verbose", false, "Set verbose output")
flag.Parse()
}
func connect() (client immuclient.ImmuClient, ctx context.Context) {
opts := immuclient.DefaultOptions().WithAddress(config.IpAddr).WithPort(config.Port)
client = immuclient.NewClient().WithOptions(opts)
ctx = context.Background()
err := client.OpenSession(ctx, []byte(config.Username), []byte(config.Password), config.DBName)
if err != nil {
log.Fatalln("Failed to use the database. Reason:", err)
}
return
}
func inserter(ch chan string, out chan bool) {
client, ctx := connect()
kvs := make([]*schema.KeyValue, config.BatchSize)
var idx = 0
var cnt = 0
t0 := time.Now()
for line := range ch {
var keystr string
if config.TStamp {
keystr = fmt.Sprintf("%s%.14d.%.4d", config.Prefix, time.Now().UnixMilli(), cnt)
} else {
keystr = fmt.Sprintf("%s%.9d", config.Prefix, idx+config.Offset)
}
kvs[cnt] = &schema.KeyValue{
Key: []byte(keystr),
Value: []byte(line),
}
idx++
cnt++
if cnt == config.BatchSize {
kvList := &schema.SetRequest{KVs: kvs}
if _, err := client.SetAll(ctx, kvList); err != nil {
log.Fatalln("Failed to submit the batch. Reason:", err)
} else if config.Verbose {
log.Printf("Inserted %d lines", idx)
}
cnt = 0
}
}
if cnt > 0 {
kvs = kvs[:cnt]
kvList := &schema.SetRequest{KVs: kvs}
if _, err := client.SetAll(ctx, kvList); err != nil {
log.Fatalln("Failed to submit the batch. Reason:", err)
} else if config.Verbose {
log.Printf("Inserted %d lines", idx)
}
}
log.Printf("DONE: inserted %d lines in %s", idx, time.Now().Sub(t0))
out <- true
}
func checker(ch chan string, out chan bool) {
client, ctx := connect()
keys := make([][]byte, config.BatchSize)
vals := make([][]byte, config.BatchSize)
var idx = 0
var cnt = 0
t0 := time.Now()
for line := range ch {
keys[cnt] = []byte(fmt.Sprintf("%s%.9d", config.Prefix, idx+config.Offset))
vals[cnt] = []byte(line)
idx++
cnt++
if cnt == config.BatchSize {
readback, err := client.GetAll(ctx, keys)
if err != nil {
log.Fatalln("Failed to read the batch. Reason:", err)
} else if config.Verbose {
log.Printf("Read %d lines", idx)
}
for j := 0; j < cnt; j++ {
if bytes.Compare(vals[j], readback.Entries[j].Value) != 0 {
log.Fatal("Mismatch %s <> %s", string(vals[j]), string(readback.Entries[j].Value))
}
}
cnt = 0
}
}
if cnt > 0 {
keys = keys[:cnt]
readback, err := client.GetAll(ctx, keys)
if err != nil {
log.Fatalln("Failed to read the batch. Reason:", err)
} else if config.Verbose {
log.Printf("Read %d lines", idx)
}
for j := 0; j < cnt; j++ {
if bytes.Compare(vals[j], readback.Entries[j].Value) != 0 {
log.Fatal("Mismatch %s <> %s", string(vals[j]), string(readback.Entries[j].Value))
}
}
}
log.Printf("DONE: read %d lines in %s", idx, time.Now().Sub(t0))
out <- true
}
func main() {
cfginit()
ch := make(chan string)
out := make(chan bool)
if config.RBack {
go checker(ch, out)
} else {
go inserter(ch, out)
}
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
ch <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Println(err)
}
close(ch)
<-out
}