forked from tolsen/mongonet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wire.go
171 lines (135 loc) · 2.87 KB
/
wire.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package mongonet
import (
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"fmt"
)
// MongoDB wire protocol operation codes.
//
// The constants are untyped, so they can be compared with or assigned to the
// int32 MessageHeader.OpCode field without a conversion.
const (
	OP_REPLY         = 1
	OP_MSG_LEGACY    = 1000
	OP_UPDATE        = 2001
	OP_INSERT        = 2002
	RESERVED         = 2003 // reserved opcode value
	OP_QUERY         = 2004
	OP_GET_MORE      = 2005
	OP_DELETE        = 2006
	OP_KILL_CURSORS  = 2007
	OP_COMMAND       = 2010
	OP_COMMAND_REPLY = 2011
	OP_MSG           = 2013
)
// MessageHeader is the fixed header of a wire protocol message. It consists of
// four int32 fields, which WriteInto emits in declaration order.
type MessageHeader struct {
	// Size is the total message size.
	Size int32
	// RequestID is the identifier carried by this message.
	RequestID int32
	// ResponseTo is, per the MongoDB wire protocol, the RequestID of the
	// message this one answers.
	ResponseTo int32
	// OpCode is the message type; see the OP_* constants.
	OpCode int32
}
// WriteInto stores the four header fields into buf through writeInt32, at
// offsets 0, 4, 8 and 12 respectively.
//
// NOTE(review): buf presumably has to be at least 16 bytes long; what happens
// with a shorter buffer depends on writeInt32, which is defined elsewhere.
func (h *MessageHeader) WriteInto(buf []byte) {
	// Offsets of each field within the serialized header.
	const (
		sizeOffset       = 0
		requestIDOffset  = 4
		responseToOffset = 8
		opCodeOffset     = 12
	)

	writeInt32(h.Size, buf, sizeOffset)
	writeInt32(h.RequestID, buf, requestIDOffset)
	writeInt32(h.ResponseTo, buf, responseToOffset)
	writeInt32(h.OpCode, buf, opCodeOffset)
}
// ------------

// Message is the common interface of the wire protocol message types.
type Message interface {
	// Header returns the message's MessageHeader.
	Header() MessageHeader

	// Serialize returns the message as a byte slice (presumably its wire
	// encoding; confirm against the implementations).
	Serialize() []byte

	// HasResponse presumably reports whether a reply is expected for this
	// message; confirm against the implementations.
	HasResponse() bool

	// IsExhaust presumably reports whether the message uses exhaust
	// semantics; confirm against the implementations.
	IsExhaust() bool
}
// ReplyMessage holds the fields of an OP_REPLY message.
type ReplyMessage struct {
	header MessageHeader

	Flags          int32
	CursorId       int64
	StartingFrom   int32
	NumberReturned int32
	// Docs are the documents carried by the reply; CommandDoc exposes the
	// first one.
	Docs []SimpleBSON
}
// CommandDoc returns the first document of the reply as a bsoncore.Document.
// It returns nil when the reply has no documents.
func (rm *ReplyMessage) CommandDoc() bsoncore.Document {
	var first bsoncore.Document
	if len(rm.Docs) > 0 {
		first = bsoncore.Document(rm.Docs[0].BSON)
	}
	return first
}
// UpdateMessage holds the fields of an OP_UPDATE message.
//
// NOTE(review): the field order appears to mirror the OP_UPDATE wire layout
// (reserved int32, namespace, flags, selector, update); confirm against the
// parser and serializer.
type UpdateMessage struct {
	header MessageHeader

	Reserved  int32
	Namespace string
	Flags     int32

	Filter SimpleBSON
	Update SimpleBSON
}
// QueryMessage holds the fields of an OP_QUERY message.
type QueryMessage struct {
	header MessageHeader

	Flags     int32
	Namespace string
	Skip      int32
	// NReturn is presumably the number of documents to return (numberToReturn
	// in the wire protocol); confirm against the parser.
	NReturn int32

	Query SimpleBSON
	// Project is presumably the optional returned-fields selector; confirm
	// against the parser.
	Project SimpleBSON
}
// GetMoreMessage holds the fields of an OP_GET_MORE message.
type GetMoreMessage struct {
	header MessageHeader

	Reserved  int32
	Namespace string
	// NReturn is presumably the number of documents requested
	// (numberToReturn in the wire protocol); confirm against the parser.
	NReturn  int32
	CursorId int64
}
// InsertMessage holds the fields of an OP_INSERT message.
type InsertMessage struct {
	header MessageHeader

	Flags     int32
	Namespace string
	// Docs are the documents carried by the message.
	Docs []SimpleBSON
}
// DeleteMessage holds the fields of an OP_DELETE message.
type DeleteMessage struct {
	header MessageHeader

	Reserved  int32
	Namespace string
	Flags     int32
	// Filter is presumably the selector for the documents to delete; confirm
	// against the parser.
	Filter SimpleBSON
}
// KillCursorsMessage holds the fields of an OP_KILL_CURSORS message.
type KillCursorsMessage struct {
	header MessageHeader

	Reserved int32
	// NumCursors is stored separately from CursorIds; presumably it equals
	// len(CursorIds), but nothing in this declaration enforces that.
	NumCursors int32
	CursorIds  []int64
}
// CommandMessage holds the fields of an OP_COMMAND message.
//
// NOTE(review): the declaration order here is not necessarily the order of
// the fields on the wire; check the parser and serializer.
type CommandMessage struct {
	header MessageHeader

	DB      string
	CmdName string

	CommandArgs SimpleBSON
	Metadata    SimpleBSON
	InputDocs   []SimpleBSON
}
// CommandReplyMessage holds the fields of an OP_COMMAND_REPLY message.
//
// NOTE(review): the declaration order here is not necessarily the order of
// the fields on the wire; check the parser and serializer.
type CommandReplyMessage struct {
	header MessageHeader

	CommandReply SimpleBSON
	Metadata     SimpleBSON
	OutputDocs   []SimpleBSON
}
// MessageMessage holds the fields of an OP_MSG message.
//
// Note that the checksum is not implemented.
type MessageMessage struct {
	// MsgHeader is exported, unlike the header field of the other message
	// structs in this file.
	MsgHeader MessageHeader
	FlagBits  int32
	// Sections holds the message's sections; BodyDoc looks for a
	// *BodySection among them.
	Sections []MessageMessageSection
}
// BodyDoc returns the body of the first non-nil *BodySection found in
// mm.Sections, converted to a bsoncore.Document. It returns an error when no
// such section exists.
func (mm *MessageMessage) BodyDoc() (bsoncore.Document, error) {
	// Parsing (parseMessageMessage) is expected to have verified that there is
	// exactly one BodySection, so the first match can be returned right away.
	for i := range mm.Sections {
		body, isBody := mm.Sections[i].(*BodySection)
		if !isBody || body == nil {
			continue
		}
		return bsoncore.Document(body.Body.BSON), nil
	}
	return nil, fmt.Errorf("no body section found for OP_MSG")
}