Skip to content

Commit

Permalink
Merge pull request #446 from GoRethink/develop
Browse files Browse the repository at this point in the history
External custom marshalers
  • Loading branch information
CMogilko authored Aug 29, 2018
2 parents 9b148cc + b0a1d0a commit 64076e7
Show file tree
Hide file tree
Showing 20 changed files with 573 additions and 174 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## v4.1.0 - 2018-08-29

### Fixed

- Rare `Connection` leaks if socket errors occurred
- Updated `ql2.proto` file from rethinkdb repo

### Added

- Support for independent custom type marshalers

## v4.0.0 - 2017-12-14

### Fixed
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

![GoRethink Logo](https://raw.github.com/wiki/gorethink/gorethink/gopher-and-thinker-s.png "Golang Gopher and RethinkDB Thinker")

Current version: v4.0.0 (RethinkDB v2.3)
Current version: v4.1.0 (RethinkDB v2.3)

<!-- This project is no longer maintained, for more information see the [v3.0.0 release](https://github.com/gorethink/gorethink/releases/tag/v3.0.0)-->

Expand Down Expand Up @@ -40,7 +40,7 @@ import (

func Example() {
session, err := r.Connect(r.ConnectOpts{
Address: url,
Address: url, // endpoint without http
})
if err != nil {
log.Fatalln(err)
Expand Down Expand Up @@ -400,6 +400,8 @@ Sometimes the default behaviour for converting Go types to and from ReQL is not

An good example of how to use these interfaces is in the [`types`](https://github.com/gorethink/gorethink/blob/master/types/geometry.go#L84-L106) package, in this package the `Point` type is encoded as the `GEOMETRY` pseudo-type instead of a normal JSON object.

On the other side, you can implement external encode/decode functions with [`SetTypeEncoding`](https://godoc.org/github.com/gorethink/gorethink/encoding#SetTypeEncoding) function.

## Logging

By default the driver logs are disabled however when enabled the driver will log errors when it fails to connect to the database. If you would like more verbose error logging you can call `r.SetVerbose(true)`.
Expand Down
2 changes: 1 addition & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
"github.com/cenkalti/backoff"
"github.com/hailocab/go-hostpool"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

Expand Down
8 changes: 4 additions & 4 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"sync/atomic"
"time"

"golang.org/x/net/context"
p "gopkg.in/gorethink/gorethink.v4/ql2"
"sync"
"bytes"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"bytes"
"golang.org/x/net/context"
p "gopkg.in/gorethink/gorethink.v4/ql2"
"sync"
)

const (
Expand Down
42 changes: 21 additions & 21 deletions connection_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package gorethink

import (
test "gopkg.in/check.v1"
p "gopkg.in/gorethink/gorethink.v4/ql2"
"golang.org/x/net/context"
"encoding/binary"
"encoding/json"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
"golang.org/x/net/context"
test "gopkg.in/check.v1"
p "gopkg.in/gorethink/gorethink.v4/ql2"
"io"
"time"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/opentracing/opentracing-go"
)

type ConnectionSuite struct{}
Expand Down Expand Up @@ -52,7 +52,7 @@ func (s *ConnectionSuite) TestConnection_Query_Ok(c *test.C) {
func (s *ConnectionSuite) TestConnection_Query_DefaultDBOk(c *test.C) {
ctx := context.Background()
token := int64(1)
q := testQuery(Table("table").Get("id"),)
q := testQuery(Table("table").Get("id"))
q2 := q
q2.Opts["db"], _ = DB("db").Build()
writeData := serializeQuery(token, q2)
Expand Down Expand Up @@ -134,7 +134,7 @@ func (s *ConnectionSuite) TestConnection_Query_NoReplyOk(c *test.C) {
connection := newConnection(conn, "addr", &ConnectOpts{})
connection.runConnection()
response, cursor, err := connection.Query(nil, q)
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)
connection.Close()

c.Assert(response, test.IsNil)
Expand Down Expand Up @@ -247,9 +247,9 @@ func (s *ConnectionSuite) TestConnection_processResponses_SocketErr(c *test.C) {
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 1}, promise: promise1}
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 2}, promise: promise2}
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 2}, promise: promise3}
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)
connection.responseChan <- responseAndError{err: io.EOF}
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)

select {
case f := <-promise1:
Expand Down Expand Up @@ -284,9 +284,9 @@ func (s *ConnectionSuite) TestConnection_processResponses_StopOk(c *test.C) {

connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 1}, promise: promise1}
close(connection.responseChan)
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)
close(connection.stopReadChan)
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)

select {
case f := <-promise1:
Expand All @@ -299,7 +299,7 @@ func (s *ConnectionSuite) TestConnection_processResponses_StopOk(c *test.C) {

func (s *ConnectionSuite) TestConnection_processResponses_ResponseFirst(c *test.C) {
promise1 := make(chan responseAndCursor, 1)
response1 := &Response{Token:1, Type: p.Response_RUNTIME_ERROR, ErrorType: p.Response_INTERNAL}
response1 := &Response{Token: 1, Type: p.Response_RUNTIME_ERROR, ErrorType: p.Response_INTERNAL}

conn := &connMock{}
conn.On("Close").Return(nil)
Expand All @@ -309,11 +309,11 @@ func (s *ConnectionSuite) TestConnection_processResponses_ResponseFirst(c *test.
go connection.processResponses()

connection.responseChan <- responseAndError{response: response1}
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 1}, promise: promise1}
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)
connection.Close()
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)

select {
case f := <-promise1:
Expand Down Expand Up @@ -437,8 +437,8 @@ func (s *ConnectionSuite) TestConnection_processResponse_FirstPartialOk(c *test.
ctx := context.Background()
token := int64(3)
q := Query{Token: token}
rawResponse1 := json.RawMessage{1,2,3}
rawResponse2 := json.RawMessage{3,4,5}
rawResponse1 := json.RawMessage{1, 2, 3}
rawResponse2 := json.RawMessage{3, 4, 5}
response := &Response{Token: token, Type: p.Response_SUCCESS_PARTIAL, Responses: []json.RawMessage{rawResponse1, rawResponse2}}

connection := newConnection(nil, "addr", &ConnectOpts{})
Expand All @@ -463,8 +463,8 @@ func (s *ConnectionSuite) TestConnection_processResponse_PartialOk(c *test.C) {
token := int64(3)
term := Table("test")
q := Query{Token: token}
rawResponse1 := json.RawMessage{1,2,3}
rawResponse2 := json.RawMessage{3,4,5}
rawResponse1 := json.RawMessage{1, 2, 3}
rawResponse2 := json.RawMessage{3, 4, 5}
response := &Response{Token: token, Type: p.Response_SUCCESS_PARTIAL, Responses: []json.RawMessage{rawResponse1, rawResponse2}}

connection := newConnection(nil, "addr", &ConnectOpts{})
Expand All @@ -491,8 +491,8 @@ func (s *ConnectionSuite) TestConnection_processResponse_SequenceOk(c *test.C) {

token := int64(3)
q := Query{Token: token}
rawResponse1 := json.RawMessage{1,2,3}
rawResponse2 := json.RawMessage{3,4,5}
rawResponse1 := json.RawMessage{1, 2, 3}
rawResponse2 := json.RawMessage{3, 4, 5}
response := &Response{Token: token, Type: p.Response_SUCCESS_SEQUENCE, Responses: []json.RawMessage{rawResponse1, rawResponse2}}

connection := newConnection(nil, "addr", &ConnectOpts{})
Expand Down
2 changes: 1 addition & 1 deletion cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"reflect"
"sync"

"github.com/opentracing/opentracing-go"
"golang.org/x/net/context"
"gopkg.in/gorethink/gorethink.v4/encoding"
p "gopkg.in/gorethink/gorethink.v4/ql2"
"github.com/opentracing/opentracing-go"
)

var (
Expand Down
18 changes: 8 additions & 10 deletions encoding/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

var byteSliceType = reflect.TypeOf([]byte(nil))

type decoderFunc func(dv reflect.Value, sv reflect.Value)
type decoderFunc func(dv reflect.Value, sv reflect.Value) error

// Decode decodes map[string]interface{} into a struct. The first parameter
// must be a pointer.
Expand Down Expand Up @@ -54,18 +54,16 @@ func decode(dst interface{}, src interface{}, blank bool) (err error) {
}
}

decodeValue(dv, sv, blank)
return nil
return decodeValue(dv, sv, blank)
}

// decodeValue decodes the source value into the destination value
func decodeValue(dv, sv reflect.Value, blank bool) {
valueDecoder(dv, sv, blank)(dv, sv)
func decodeValue(dv, sv reflect.Value, blank bool) error {
return valueDecoder(dv, sv, blank)(dv, sv)
}

type decoderCacheKey struct {
dt, st reflect.Type
blank bool
}

var decoderCache struct {
Expand All @@ -90,7 +88,7 @@ func valueDecoder(dv, sv reflect.Value, blank bool) decoderFunc {

func typeDecoder(dt, st reflect.Type, blank bool) decoderFunc {
decoderCache.RLock()
f := decoderCache.m[decoderCacheKey{dt, st, blank}]
f := decoderCache.m[decoderCacheKey{dt, st}]
decoderCache.RUnlock()
if f != nil {
return f
Expand All @@ -103,9 +101,9 @@ func typeDecoder(dt, st reflect.Type, blank bool) decoderFunc {
decoderCache.Lock()
var wg sync.WaitGroup
wg.Add(1)
decoderCache.m[decoderCacheKey{dt, st, blank}] = func(dv, sv reflect.Value) {
decoderCache.m[decoderCacheKey{dt, st}] = func(dv, sv reflect.Value) error {
wg.Wait()
f(dv, sv)
return f(dv, sv)
}
decoderCache.Unlock()

Expand All @@ -114,7 +112,7 @@ func typeDecoder(dt, st reflect.Type, blank bool) decoderFunc {
f = newTypeDecoder(dt, st, blank)
wg.Done()
decoderCache.Lock()
decoderCache.m[decoderCacheKey{dt, st, blank}] = f
decoderCache.m[decoderCacheKey{dt, st}] = f
decoderCache.Unlock()
return f
}
Expand Down
Loading

0 comments on commit 64076e7

Please sign in to comment.