-
Notifications
You must be signed in to change notification settings - Fork 1
/
runner.go
319 lines (256 loc) · 7.55 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
package tob
import (
"context"
"errors"
"fmt"
"net/url"
"time"
"github.com/telkomdev/tob/config"
"github.com/telkomdev/tob/services/airflow"
"github.com/telkomdev/tob/services/diskstatus"
"github.com/telkomdev/tob/services/dummy"
"github.com/telkomdev/tob/services/elasticsearch"
"github.com/telkomdev/tob/services/kafka"
"github.com/telkomdev/tob/services/mongodb"
"github.com/telkomdev/tob/services/mysqldb"
"github.com/telkomdev/tob/services/oracle"
"github.com/telkomdev/tob/services/postgres"
"github.com/telkomdev/tob/services/redisdb"
"github.com/telkomdev/tob/services/web"
)
// Runner the tob runner
//
// Runner owns the registry of monitored services, starts their health-check
// goroutines in Run and shuts them down again on context cancellation or a
// stop request.
type Runner struct {
	// configs is the parsed configuration; InitServices reads its "service" key.
	configs config.Config
	// services is keyed by service name (Add uses service.Name(),
	// InitServices uses the config entry name).
	services map[string]Service
	// stopChan receives stop requests; exposed send-only through Stop and
	// selected on by Run.
	stopChan chan bool
	// verbose is passed to service constructors and enables extra logging.
	verbose bool
	// initialized is set by a successful InitServices; Run panics while false.
	initialized bool
	// notificators receive the DOWN/UP messages produced by health checks.
	notificators []Notificator
	// waiter is sized by InitServices to the number of enabled services; Run
	// waits on it for the health-check goroutines to finish.
	waiter Waiter
}
// NewRunner Runner's constructor
//
// It builds a Runner that uses the given notificators and configuration. The
// Runner starts with an empty service registry and a stop channel buffered
// to hold one value. InitServices must be called before Run. The returned
// error is currently always nil.
func NewRunner(notificators []Notificator, configs config.Config, verbose bool) (*Runner, error) {
	r := &Runner{
		configs:      configs,
		services:     make(map[string]Service),
		stopChan:     make(chan bool, 1),
		verbose:      verbose,
		notificators: notificators,
	}
	return r, nil
}
// initServiceKind constructs the built-in Service implementation for
// serviceKind, passing verbose and the package Logger to its constructor.
// It returns (nil, false) when serviceKind is not a built-in kind.
//
// Only the requested implementation is constructed. Building every
// implementation on each call just to return one of them wastes allocations
// and any setup work the constructors perform.
func initServiceKind(serviceKind ServiceKind, verbose bool) (Service, bool) {
	switch serviceKind {
	case Airflow:
		return airflow.NewAirflow(verbose, Logger), true
	case AirflowFlower:
		return airflow.NewAirflowFlower(verbose, Logger), true
	case Dummy:
		return dummy.NewDummy(verbose, Logger), true
	case DiskStatus:
		return diskstatus.NewDiskStatus(verbose, Logger), true
	case Kafka:
		return kafka.NewKafka(verbose, Logger), true
	case MongoDB:
		return mongodb.NewMongo(verbose, Logger), true
	case MySQL:
		return mysqldb.NewMySQL(verbose, Logger), true
	case Postgresql:
		return postgres.NewPostgres(verbose, Logger), true
	case Oracle:
		return oracle.NewOracle(verbose, Logger), true
	case Redis:
		return redisdb.NewRedis(verbose, Logger), true
	case Web:
		return web.NewWeb(verbose, Logger), true
	case Elasticsearch:
		return elasticsearch.NewElasticsearch(verbose, Logger), true
	default:
		return nil, false
	}
}
// Add will add new service to Runner
//
// The service is stored under its own Name(), replacing any service already
// registered under that name. A nil service is ignored.
func (r *Runner) Add(service Service) {
	if service == nil {
		return
	}
	key := service.Name()
	r.services[key] = service
}
// InitServices will init initial services
//
// It reads the "service" section of the configuration, a map from service
// name to an entry with the following fields:
//   - "url" (string)
//   - "kind" (string)
//   - "checkInterval" (number; Run treats it as seconds)
//   - "enable" (bool)
//
// For a kind known to initServiceKind, a fresh service is registered under the
// entry's name. Otherwise a service previously registered under that name via
// Add is used. Every enabled service is configured, marked as recovered and
// connected. On success the runner is marked initialized and its waiter is
// sized to the number of enabled services.
//
// It returns an error when the section is missing or malformed, when an
// enabled service's URL fails to parse, or when an enabled service fails to
// connect. In those cases any service connected earlier in this call is
// closed again.
func (r *Runner) InitServices() error {
	serviceConfigInterface, ok := r.configs["service"]
	if !ok {
		return errors.New("field service not found in config file")
	}
	serviceConfigs, ok := serviceConfigInterface.(map[string]interface{})
	if !ok {
		return errors.New("invalid config file")
	}

	// Run multiplies the interval by time.Second, so 5 minutes is 300.
	const defaultCheckInterval = 300

	// connected tracks services opened by this call. If initialization is
	// aborted they are released instead of being leaked.
	var connected []Service
	fail := func(err error) error {
		for _, s := range connected {
			if cerr := s.Close(); cerr != nil {
				Logger.Println(cerr)
			}
		}
		return err
	}

	totalServiceToBeExecuted := 0
	for name, confInterface := range serviceConfigs {
		conf, ok := confInterface.(map[string]interface{})
		if !ok {
			return fail(fmt.Errorf("invalid config file: service %q must be an object", name))
		}
		Logger.Println(name)

		urlStr, ok := conf["url"].(string)
		if !ok {
			return fail(fmt.Errorf("invalid config file: service %q: field \"url\" must be a string", name))
		}
		serviceKind, ok := conf["kind"].(string)
		if !ok {
			return fail(fmt.Errorf("invalid config file: service %q: field \"kind\" must be a string", name))
		}
		// numeric config values arrive as float64
		checkIntervalF, ok := conf["checkInterval"].(float64)
		if !ok {
			return fail(fmt.Errorf("invalid config file: service %q: field \"checkInterval\" must be a number", name))
		}
		checkInterval := int(checkIntervalF)
		if checkInterval <= 0 {
			checkInterval = defaultCheckInterval
		}
		serviceEnabled, ok := conf["enable"].(bool)
		if !ok {
			return fail(fmt.Errorf("invalid config file: service %q: field \"enable\" must be a boolean", name))
		}

		// A built-in kind always gets a fresh instance. Otherwise fall back to
		// a service registered earlier through Add under the same name.
		if s, ok := initServiceKind(ServiceKind(serviceKind), r.verbose); ok {
			r.services[name] = s
		}
		if !serviceEnabled {
			continue
		}
		service, ok := r.services[name]
		if !ok || service == nil {
			Logger.Printf("service %s has unknown kind %q and no service was added with this name, skipping\n", name, serviceKind)
			continue
		}

		// validate urlStr before handing it to the service
		if _, err := url.Parse(urlStr); err != nil {
			return fail(fmt.Errorf("service %q: invalid url: %w", name, err))
		}
		service.SetURL(urlStr)
		service.SetCheckInterval(checkInterval)
		service.Enable(serviceEnabled)
		service.SetConfig(conf)
		// by default service is recovered
		service.SetRecover(true)
		if err := service.Connect(); err != nil {
			return fail(fmt.Errorf("service %q: connect: %w", name, err))
		}
		connected = append(connected, service)
		totalServiceToBeExecuted++
	}

	// set initialized to true
	r.initialized = true
	// set waiter capacity with amount of service to be executed
	r.waiter = newWaiter(uint(totalServiceToBeExecuted))
	if r.verbose {
		Logger.Printf("total service to be executed: %d\n", uint(totalServiceToBeExecuted))
	}
	return nil
}
// healthCheck is the per-service monitoring loop that Run starts in its own
// goroutine for service s, named n.
//
// On every tick of t it does the following:
//   - clears the service message, pings s and logs the result;
//   - on a transition from recovered to NotOk, records the downtime start and
//     sends a DOWN notification;
//   - on a transition back to OK, sends an UP notification that includes the
//     downtime duration. If the service set a message, "<name> <message>" is
//     sent instead.
//
// When s.Stop() delivers a value, the ticker is stopped, waiter is told that
// this loop is done, and the function returns.
func healthCheck(n string, s Service, t *time.Ticker, waiter Waiter, notificators []Notificator) {
	// notify delivers msg to every enabled notificator. A failing notificator
	// is logged and does not prevent delivery to the remaining ones.
	notify := func(msg string) {
		for _, notificator := range notificators {
			if !notificator.IsEnabled() {
				continue
			}
			if err := notificator.Send(msg); err != nil {
				Logger.Printf("notificator %s error: %s\n", notificator.Provider(), err.Error())
			}
		}
	}

	for {
		select {
		case <-s.Stop():
			Logger.Printf("runner service %s received stop channel, cleanup resource now !!\n", n)
			t.Stop()
			// tell waiter this service execution is done
			waiter.Done()
			return
		case <-t.C:
			// reset so GetMessage only reflects what this Ping reported
			s.SetMessage("")
			respStr := string(s.Ping())

			switch {
			case respStr == NotOk && s.IsRecover():
				// first failure after being healthy: remember when it went down
				s.SetLastDownTimeNow()
				s.SetRecover(false)
				msg := fmt.Sprintf("%s is DOWN", n)
				if m := s.GetMessage(); m != "" {
					msg = fmt.Sprintf("%s is DOWN | %s", n, m)
				}
				notify(msg)
			case respStr == OK && !s.IsRecover():
				s.SetRecover(true)
				msg := fmt.Sprintf("%s is UP. It was down for %s", n, s.GetDownTimeDiff())
				if m := s.GetMessage(); m != "" {
					msg = fmt.Sprintf("%s %s", n, m)
				}
				notify(msg)
			}

			Logger.Printf("%s => %s\n", n, respStr)
		}
	}
}
// Run will Run the tob Runner
//
// It starts one healthCheck goroutine per enabled service. Each goroutine
// ticks every GetCheckInterval() seconds. Run then blocks until ctx is
// cancelled or a value arrives on the channel returned by Stop. On either
// signal it cleans up every enabled service and waits for all health-check
// goroutines to finish. The waiter is closed when Run returns.
//
// Run panics if InitServices has not completed successfully or if no
// notificators were supplied to NewRunner.
func (r *Runner) Run(ctx context.Context) {
	if !r.initialized {
		panic("service not initialized yet")
	}
	if len(r.notificators) == 0 {
		panic("notificator cannot be nil")
	}

	// close waiter's channel indicates that no more values will be sent on it
	defer r.waiter.Close()

	for name, service := range r.services {
		if service == nil || !service.IsEnabled() {
			continue
		}
		ticker := time.NewTicker(time.Second * time.Duration(service.GetCheckInterval()))
		// run all services health check on its goroutine
		go healthCheck(name, service, ticker, r.waiter, r.notificators)
	}

	// Both shutdown signals lead to the same cleanup, so one blocking select
	// followed by a single shutdown path is enough.
	select {
	case <-ctx.Done():
		Logger.Println("runner context canceled")
	case <-r.stopChan:
		Logger.Println("runner received stop channel, cleanup resource now !!")
	}

	r.cleanup()
	// wait all service's goroutine to stop
	r.waiter.Wait()
}
// Stop will receive stop channel
//
// It returns the send-only side of the runner's stop channel. Sending a value
// on it makes a running Run clean up its services and return. NewRunner
// buffers the channel for one value, so a first send does not block even if
// Run is not receiving yet.
func (r *Runner) Stop() chan<- bool {
	var sendOnly chan<- bool = r.stopChan
	return sendOnly
}
// cleanup will Cleanup the tob Runner services resource
//
// For every enabled service it first signals the service's health-check
// goroutine through its Stop channel and then closes the service. Close
// errors are logged rather than returned, so the result is always nil.
func (r *Runner) cleanup() error {
	for name, service := range r.services {
		if service == nil || !service.IsEnabled() {
			continue
		}
		// Stop the health check before releasing the service's resources.
		// Closing first let the goroutine Ping an already-closed service on its
		// next tick. If Stop() is unbuffered, the send completes only once the
		// goroutine has taken the stop case, so no Ping can overlap Close.
		service.Stop() <- true
		if err := service.Close(); err != nil {
			Logger.Printf("runner: closing service %s: %s\n", name, err)
		}
	}
	return nil
}