forked from krakend/krakend-amqp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_test.go
123 lines (107 loc) · 2.87 KB
/
client_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// +build integration
package amqp
import (
"bytes"
"context"
"flag"
"fmt"
"io/ioutil"
"testing"
"time"
"github.com/luraproject/lura/config"
"github.com/luraproject/lura/encoding"
"github.com/luraproject/lura/logging"
"github.com/luraproject/lura/proxy"
)
// Command-line flags that tune the integration test.
var (
	// rabbitmqHost is the value of the -rabbitmq flag: the host of the
	// rabbitmq server the test connects to. It defaults to "localhost".
	rabbitmqHost = flag.String("rabbitmq", "localhost", "The host of the rabbitmq server")

	// totalIterations is the value of the -iterations flag: how many
	// messages are produced and then consumed. It defaults to 10000.
	totalIterations = flag.Int("iterations", 10000, "The number of produce and consume iterations")
)
// Test is an integration test that pushes messages through the producer
// proxy and reads them back through the consumer proxy, both built by
// NewBackendFactory against the server named by the -rabbitmq flag.
//
// It publishes *totalIterations JSON documents of the form
// {"foo":"bar","some":<i>} and then performs the same number of consumer
// calls, each bounded by a one-second timeout, checking that every decoded
// response is complete and carries "foo": "bar". The first failure aborts
// the test.
func Test(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	batchSize := 10

	// The logger writes into an in-memory buffer that is dumped to stdout
	// when the test finishes, whether it passed or not.
	buf := new(bytes.Buffer)
	l, err := logging.NewLogger("DEBUG", buf, "")
	if err != nil {
		t.Fatalf("building the logger: %s", err.Error())
	}
	defer func() {
		fmt.Println(buf.String())
	}()

	// Both backends below carry an amqp namespace in their extra config, so
	// the fallback factory is not expected to be used. t.Error is kept here
	// (instead of t.Fatal) because FailNow must only be called from the
	// goroutine running the test and the caller of this closure is not
	// visible from this file.
	bf := NewBackendFactory(ctx, l, func(_ *config.Backend) proxy.Proxy {
		t.Error("this backend factory shouldn't be called")
		return proxy.NoopProxy
	})

	amqpHost := fmt.Sprintf("amqp://guest:guest@%s:5672", *rabbitmqHost)

	consumerProxy := bf(&config.Backend{
		Host: []string{amqpHost},
		ExtraConfig: config.ExtraConfig{
			consumerNamespace: map[string]interface{}{
				"name":           "queue-1",
				"exchange":       "some-exchange",
				"durable":        true,
				"delete":         false,
				"exclusive":      false,
				"no_wait":        true,
				"auto_ack":       false,
				"no_local":       false,
				"routing_key":    []string{"#"},
				"prefetch_count": batchSize,
			},
		},
		Decoder: encoding.JSONDecoder,
	})

	producerProxy := bf(&config.Backend{
		Host: []string{amqpHost},
		ExtraConfig: config.ExtraConfig{
			producerNamespace: map[string]interface{}{
				"name":      "queue-1",
				"exchange":  "some-exchange",
				"durable":   true,
				"delete":    false,
				"exclusive": false,
				"no_wait":   true,
				"mandatory": true,
				"immediate": false,
			},
		},
	})

	fmt.Println("proxies created. starting the test")

	// Produce phase: publish one JSON document per iteration.
	for i := 0; i < *totalIterations; i++ {
		resp, err := producerProxy(ctx, &proxy.Request{
			Headers: map[string][]string{
				"Content-Type": {"application/json"},
			},
			Params: map[string]string{"routing_key": "some_value"},
			Body:   ioutil.NopCloser(bytes.NewBufferString(fmt.Sprintf(`{"foo":"bar","some":%d}`, i))),
		})
		if err != nil {
			t.Fatal(err)
		}
		if resp == nil || !resp.IsComplete {
			t.Fatalf("unexpected response %v", resp)
		}
	}

	// Consume phase: read back as many messages as were produced.
	for i := 0; i < *totalIterations; i++ {
		// Each read gets its own deadline, released as soon as the call
		// returns. The release func has its own name so it does not shadow
		// the cancel of the test-wide context.
		localCtx, release := context.WithTimeout(ctx, time.Second)
		resp, err := consumerProxy(localCtx, nil)
		release()

		if err != nil {
			t.Fatalf("#%d: unexpected error %s", i, err.Error())
		}
		if resp == nil || !resp.IsComplete {
			t.Fatalf("#%d: unexpected response %v", i, resp)
		}
		// A missing key yields a nil interface value, which fails the string
		// assertion, so one check covers absent, non-string and wrong values.
		if v, ok := resp.Data["foo"].(string); !ok || v != "bar" {
			t.Fatalf("#%d: unexpected response %v", i, resp)
		}
	}
}