Skip to content

Commit

Permalink
feat: websocket callback event
Browse files Browse the repository at this point in the history
  • Loading branch information
chyroc committed Jan 10, 2024
1 parent cbd8cb9 commit 2e94dd8
Show file tree
Hide file tree
Showing 12 changed files with 1,659 additions and 0 deletions.
43 changes: 43 additions & 0 deletions lark_ws/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package lark_ws

import (
"sync"
"time"
)

type cache struct {
values sync.Map
}

func newCache() *cache {
return &cache{
values: sync.Map{},
}
}

func (r *cache) get(key string) (interface{}, bool) {
val, ok := r.values.Load(key)
if !ok {
return nil, false
}
item := val.(*cacheItem)
if item.expired.Before(time.Now()) {
r.values.Delete(key)
return nil, false
}
return item.val, true
}

func (r *cache) set(key string, val interface{}) {
r.values.Store(key, &cacheItem{
val: val,

// https://open.feishu.cn/document/server-docs/event-subscription-guide/overview
expired: time.Now().Add(time.Hour),
})
}

type cacheItem struct {
val interface{}
expired time.Time
}
157 changes: 157 additions & 0 deletions lark_ws/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package lark_ws

import (
"context"
"fmt"
"math/rand"
"net/http"
"net/url"
"runtime/debug"
"strconv"
"sync"
"time"

"github.com/chyroc/lark"
"github.com/gorilla/websocket"
)

type Client struct {
Lark *lark.Lark
wsDialer *websocket.Dialer
conn *websocket.Conn
connURL *url.URL
serviceID string
connID string
autoReconnect bool // 是否自动重连,默认开启
reconnectNonce int // 首次重连抖动,单位秒
reconnectCount int // 重连次数,负数无限次
reconnectInterval time.Duration // 重连间隔
pingInterval time.Duration // Ping间隔
cache *cache
mu sync.Mutex
}

type ClientOption func(cli *Client)

func WithAutoReconnect(b bool) ClientOption {
return func(cli *Client) {
cli.autoReconnect = b
}
}

func New(larkCli *lark.Lark, opts ...ClientOption) *Client {
cli := &Client{
Lark: larkCli,
wsDialer: &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 45 * time.Second,
},
conn: nil,
connURL: nil,
serviceID: "",
connID: "",
autoReconnect: true,
reconnectNonce: 30,
reconnectCount: -1,
reconnectInterval: 2 * time.Minute,
pingInterval: 2 * time.Minute,
cache: newCache(),
mu: sync.Mutex{},
}

for _, opt := range opts {
opt(cli)
}

return cli
}

func (c *Client) Start(ctx context.Context) (err error) {
err = c.connect(ctx)
if err != nil {
c.logError(ctx, "connect failed, err: %s", err)
if !isRetryErr(err) {
return
}
c.disconnect(ctx)
if c.autoReconnect {
if err = c.reconnect(ctx); err != nil {
return err
}
} else {
return err
}
}
go c.pingLoop(ctx)
select {}
}

func (c *Client) reconnect(ctx context.Context) (err error) {
// 首次重连随机抖动
if c.reconnectNonce > 0 {
rand.Seed(time.Now().UnixNano())
num := rand.Intn(c.reconnectNonce * 1000)
time.Sleep(time.Duration(num) * time.Millisecond)
}

if c.reconnectCount >= 0 {
for i := 0; i < c.reconnectCount; i++ {
success, err := c.tryConnect(ctx, i)
if success || err != nil {
return err
}
time.Sleep(c.reconnectInterval)
}
return fmt.Errorf("unable to connect to server after %d retries", c.reconnectCount)
} else {
i := 0
for {
success, err := c.tryConnect(ctx, i)
if success || err != nil {
return err
}
time.Sleep(c.reconnectInterval)
i += 1
}
}
}

func (c *Client) tryConnect(ctx context.Context, cnt int) (bool, error) {
c.logInfo(ctx, "trying to reconnect: %d", cnt+1)
err := c.connect(ctx)
if err == nil {
return true, nil
} else if !isRetryErr(err) {
return false, err
} else {
c.logError(ctx, "connect failed, err: %v", err)
return false, nil
}
}

func (c *Client) pingLoop(ctx context.Context) {
defer func() {
if e := recover(); e != nil {
c.logWarn(ctx, "ping loop panic, panic: %v, stack: %s", e, string(debug.Stack()))
}
// TODO: 短时间内一直 panic, 退出
go c.pingLoop(ctx)
}()

for {
// TODO: 锁
if c.conn != nil {
i, _ := strconv.ParseInt(c.serviceID, 10, 32)
frame := newPingFrame(int32(i))
bs, _ := frame.Marshal()

err := c.writeMessage(websocket.BinaryMessage, bs)
if err != nil {
c.logWarn(ctx, "ping failed, err: %v", err)
} else {
c.logDebug(ctx, "ping success")
}
}
time.Sleep(c.pingInterval)
}
}
88 changes: 88 additions & 0 deletions lark_ws/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package lark_ws

import (
"context"
"errors"
"net/http"
"strconv"

"github.com/chyroc/lark"
)

func (c *Client) connect(ctx context.Context) (err error) {
c.mu.Lock()
defer c.mu.Unlock()

if c.conn != nil {
return
}

endpoint, err := c.getEndpoint(ctx)
if err != nil {
return
}
if err := c.saveURL(endpoint.URL); err != nil {
return err
}
c.saveClientConfig(endpoint.ClientConfig)

conn, resp, err := c.wsDialer.Dial(endpoint.URL, nil)
if err != nil && resp == nil {
return err
}
if resp.StatusCode != http.StatusSwitchingProtocols {
return parseWebsocketErr("Callback", "ConnWebsocket", resp)
}
c.conn = conn

c.logInfo(ctx, "connected to %s", endpoint.URL)

go c.receiveMessageLoop(ctx)

return
}

func (c *Client) disconnect(ctx context.Context) {
c.mu.Lock()
defer c.mu.Unlock()

if c.conn == nil {
return
}

_ = c.conn.Close()
c.conn = nil
c.connURL = nil
c.connID = ""
c.serviceID = ""
c.logInfo(ctx, "disconnected to %s", c.connURL)
}

const (
codeForbidden = 403
codeAuthFailed = 514
codeExceedConnLimit = 1000040350
)

func isRetryErr(err error) bool {
var e *lark.Error
if errors.As(err, &e) {
return e.Code != codeForbidden && e.Code != codeExceedConnLimit
}
return true
}

func parseWebsocketErr(scope, funcName string, resp *http.Response) error {
code, _ := strconv.ParseInt(resp.Header.Get("Handshake-Status"), 10, 64)
msg := resp.Header.Get("Handshake-Msg")
switch code {
case codeAuthFailed:
authCode, _ := strconv.ParseInt(resp.Header.Get("Handshake-Autherrcode"), 10, 64)
if authCode != 0 {
return lark.NewError(scope, funcName, authCode, msg)
}
return lark.NewError(scope, funcName, code, msg)
default:
return lark.NewError(scope, funcName, code, msg)
}
}
78 changes: 78 additions & 0 deletions lark_ws/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package lark_ws

import (
"context"
"fmt"
"net/http"
"net/url"
"time"

"github.com/chyroc/lark"
)

// endpoint ...
type endpoint struct {
URL string `json:"URL,omitempty"`
ClientConfig *endpointClientConfig `json:"ClientConfig,omitempty"`
}

// endpointClientConfig ...
type endpointClientConfig struct {
ReconnectCount int `json:"ReconnectCount,omitempty"`
ReconnectInterval int `json:"ReconnectInterval,omitempty"`
ReconnectNonce int `json:"ReconnectNonce,omitempty"`
PingInterval int `json:"PingInterval,omitempty"`
}

func (c *Client) getEndpoint(ctx context.Context) (*endpoint, error) {
type GenerateCallbackWebsocketEndpointReq struct {
AppID string `json:"AppID,omitempty"`
AppSecret string `json:"AppSecret,omitempty"`
Locale string `query:"locale" json:"-"`
}
type GenerateCallbackWebsocketEndpointResp struct {
Code int64 `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
Data *endpoint `json:"data,omitempty"`
}
req := &lark.RawRequestReq{
Scope: "Callback",
API: "GenerateCallbackWebsocketEndpoint",
Method: http.MethodPost,
URL: c.Lark.OpenBaseURL() + "/callback/ws/endpoint",
Body: GenerateCallbackWebsocketEndpointReq{
AppID: c.Lark.AppID(),
AppSecret: c.Lark.AppSecret(),
Locale: "zh",
},
MethodOption: &lark.MethodOption{},
}
resp := &GenerateCallbackWebsocketEndpointResp{}
_, err := c.Lark.RawRequest(ctx, req, resp)
if err != nil {
return nil, err
}
return resp.Data, nil
}

func (c *Client) saveURL(endpointURL string) error {
u, err := url.Parse(endpointURL)
if err != nil {
return fmt.Errorf("ws: invalid conn url: '%s'", endpointURL)
}
connID := u.Query().Get("device_id")
serviceID := u.Query().Get("service_id")

c.connID = connID
c.serviceID = serviceID
c.connURL = u

return nil
}

func (c *Client) saveClientConfig(conf *endpointClientConfig) {
c.reconnectCount = conf.ReconnectCount
c.reconnectInterval = time.Duration(conf.ReconnectInterval) * time.Second
c.reconnectNonce = conf.ReconnectNonce
c.pingInterval = time.Duration(conf.PingInterval) * time.Second
}
13 changes: 13 additions & 0 deletions lark_ws/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module github.com/chyroc/lark/lark_ws

go 1.20

replace github.com/chyroc/lark => ../

require (
github.com/chyroc/lark v0.0.112
github.com/gogo/protobuf v1.3.2
github.com/gorilla/websocket v1.5.1
)

require golang.org/x/net v0.17.0 // indirect
Loading

0 comments on commit 2e94dd8

Please sign in to comment.