-
Notifications
You must be signed in to change notification settings - Fork 5
/
writer.go
177 lines (135 loc) · 3.41 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package cdb
import (
"bufio"
"encoding/binary"
"io"
)
// slot is a single hash table bucket: the full 32-bit hash of a key paired
// with the position at which the record for that key was written.
type slot struct {
	// hash is the 32-bit hash of the record's key.
	hash uint32
	// position is the writer position at which the record starts.
	// In a probed table built by Close, zero means the bucket is unused.
	position uint32
}
// hashTable is a list of slots. While records are being added, Put appends
// one slot per record in insertion order; Close then builds, for every
// non-empty list, a table twice as long in which collisions are resolved by
// linear probing.
type hashTable []slot
// writerImpl implements the Writer interface. It streams records through a
// buffered writer and keeps the slots of every record in memory until Close
// turns them into the on-disk hash tables.
type writerImpl struct {
	// tables holds, for each of the tableNum hash tables, the slots of the
	// records added so far. Put selects the table with hash % tableNum.
	tables [tableNum]hashTable
	// writer is the destination of the database; Close seeks within it.
	writer io.WriteSeeker
	// buffer wraps writer and receives the records written by Put.
	buffer *bufio.Writer
	// hasher produces a fresh hash object for every key.
	hasher Hasher
	// begin is the position writer reported when the instance was created;
	// Close writes the table references there.
	begin int64
	// current is the position of the next record; it is advanced as records
	// and tables are accounted for.
	current int64
}
// newWriter prepares writer for receiving records and returns a writerImpl
// bound to it.
//
// The position writer currently reports is remembered as the place where
// Close will later store the table references. The writer is then moved to
// offset tablesRefsSize, measured from the start, which is where the first
// record goes. Any error returned by Seek is passed back unchanged.
//
// NOTE(review): the second Seek is absolute while begin is whatever position
// the writer had on entry; the two only agree when that position is 0.
// Confirm whether non-zero starting positions are meant to be supported.
func newWriter(writer io.WriteSeeker, hasher Hasher) (*writerImpl, error) {
	origin, err := writer.Seek(0, io.SeekCurrent)
	if err != nil {
		return nil, err
	}

	// Leave room for the table references; records start right after them.
	dataStart := int64(tablesRefsSize)
	_, err = writer.Seek(dataStart, io.SeekStart)
	if err != nil {
		return nil, err
	}

	w := &writerImpl{
		writer:  writer,
		buffer:  bufio.NewWriter(writer),
		hasher:  hasher,
		begin:   origin,
		current: dataStart,
	}
	return w, nil
}
// Put appends the pair <key, value> to the database and records where it was
// written so that Close can index it.
//
// A record consists of two little-endian uint32 lengths (key length, value
// length) followed by the key bytes and the value bytes. The record goes to
// the buffered writer; the slot {hash(key), record position} is appended to
// the table selected by hash % tableNum.
//
// Put returns ErrOutOfMemory when the key or the value is longer than
// maxUint bytes, or when the record would move the write position to maxUint
// or beyond. In both cases nothing is written and the writer is left
// unchanged. Errors reported by the buffered writer are returned as is.
func (w *writerImpl) Put(key, value []byte) error {
	lenKey, lenValue := len(key), len(value)
	if uint64(lenKey) > maxUint || uint64(lenValue) > maxUint {
		return ErrOutOfMemory
	}

	// Check the resulting position before touching the buffer or the tables,
	// so a rejected record cannot leave a half-advanced position or an
	// indexed-but-failed entry behind. The bound is the one addPos enforces.
	// The sum is computed in int64 so it cannot wrap where int is 32 bits.
	// The constant 8 is the size of the two uint32 length fields.
	next := w.current + 8 + int64(lenKey) + int64(lenValue)
	if next >= maxUint {
		return ErrOutOfMemory
	}

	if err := writePair(w.buffer, uint32(lenKey), uint32(lenValue)); err != nil {
		return err
	}
	// Raw bytes need no byte-order handling; write them directly.
	if _, err := w.buffer.Write(key); err != nil {
		return err
	}
	if _, err := w.buffer.Write(value); err != nil {
		return err
	}

	hashFunc := w.hasher()
	hashFunc.Write(key)
	h := hashFunc.Sum32()

	// The slot points at the start of the record, i.e. the position before
	// it is advanced past this record.
	idx := h % tableNum
	w.tables[idx] = append(w.tables[idx], slot{h, uint32(w.current)})
	w.current = next

	return nil
}
// Close finalizes the database so that it can be read.
//
// It performs three steps:
//  1. flushes the records still held in the buffer;
//  2. writes, after the records, one hash table per non-empty slot list.
//     Each table has twice as many slots as records and resolves collisions
//     by linear probing, starting at (hash >> 8) % tableSize;
//  3. seeks to the position remembered at construction and writes, for every
//     table, the pair <table position, slot count>. Empty tables get
//     position 0.
//
// The writer is finally moved back to the end of the written data. Close
// does not close the underlying writer.
//
// It returns the first error reported by the buffer, by the underlying
// writer, or by Seek, and ErrOutOfMemory when the tables would extend the
// database to maxUint bytes or beyond.
func (w *writerImpl) Close() error {
	// A failed flush means records are missing from the output; report it
	// instead of producing an index over data that was never written.
	if err := w.buffer.Flush(); err != nil {
		return err
	}

	for _, table := range &w.tables {
		n := uint32(len(table) << 1)
		if n == 0 {
			continue
		}

		slots := make(hashTable, n)
		for _, entry := range table {
			k := (entry.hash >> 8) % n
			// Linear probing: a zero position marks a free slot.
			for slots[k].position != 0 {
				k = (k + 1) % n
			}
			slots[k] = entry
		}

		// Go through the buffer so each slot does not cost a separate write
		// on the underlying writer.
		for _, entry := range slots {
			if err := writePair(w.buffer, entry.hash, entry.position); err != nil {
				return err
			}
		}
	}

	// Everything must reach the writer before its position is inspected
	// or changed.
	if err := w.buffer.Flush(); err != nil {
		return err
	}

	offset, err := w.writer.Seek(0, io.SeekCurrent)
	if err != nil {
		return err
	}

	if _, err := w.writer.Seek(w.begin, io.SeekStart); err != nil {
		return err
	}

	// At this point w.current is the position right after the last record,
	// which is where the first non-empty table was written. It is advanced
	// by the byte size of each table to obtain the position of the next one.
	for _, table := range &w.tables {
		n := len(table) << 1

		var pos uint32
		if n != 0 {
			pos = uint32(w.current)
		}

		if err := writePair(w.writer, pos, uint32(n)); err != nil {
			return err
		}
		if err := w.addPos(slotSize * n); err != nil {
			return err
		}
	}

	// Leave the writer positioned at the end of the database.
	_, err = w.writer.Seek(offset, io.SeekStart)
	return err
}
// addPos advances the current position by offset bytes. The position is left
// unchanged and ErrOutOfMemory is returned when the new position would reach
// maxUint or go beyond it, which keeps every position representable as a
// uint32.
func (w *writerImpl) addPos(offset int) error {
	if next := w.current + int64(offset); next < maxUint {
		w.current = next
		return nil
	}
	return ErrOutOfMemory
}
// writePair encodes a and b as two consecutive little-endian uint32 values
// and sends the resulting 8 bytes to writer in a single Write call. The
// error from that call, if any, is returned.
func writePair(writer io.Writer, a, b uint32) error {
	var encoded [8]byte
	binary.LittleEndian.PutUint32(encoded[0:4], a)
	binary.LittleEndian.PutUint32(encoded[4:8], b)

	_, err := writer.Write(encoded[:])
	return err
}