From 7a765ac79cfaead96f34b4a4b42e4eb260e9ae41 Mon Sep 17 00:00:00 2001 From: dvonthenen Date: Tue, 21 Nov 2023 19:15:57 -0800 Subject: [PATCH] Fix KeepAlive Interval and Expose Error Callback --- examples/streaming/microphone/main.go | 7 ++++ pkg/api/live/v1/default.go | 41 ++++++++++++++++++++++-- pkg/api/live/v1/interfaces/interfaces.go | 1 + pkg/api/live/v1/interfaces/types.go | 19 ++++++++--- pkg/api/live/v1/router.go | 33 ++++++++++--------- pkg/api/manage/v1/interfaces/types.go | 12 +++---- pkg/client/live/client.go | 37 ++++++++++++++------- pkg/client/live/constants.go | 2 +- pkg/client/live/types.go | 1 + 9 files changed, 113 insertions(+), 40 deletions(-) diff --git a/examples/streaming/microphone/main.go b/examples/streaming/microphone/main.go index ac89fe56..a481c00b 100644 --- a/examples/streaming/microphone/main.go +++ b/examples/streaming/microphone/main.go @@ -37,6 +37,13 @@ func (c MyCallback) Metadata(md *api.MetadataResponse) error { log.Printf("Metadata.Created: %s\n\n", strings.TrimSpace(md.Created)) return nil } +func (c MyCallback) Error(er *api.ErrorResponse) error { + // handle the error + log.Printf("\nError.Type: %s\n", er.Type) + log.Printf("Error.Message: %s\n", er.Message) + log.Printf("Error.Description: %s\n\n", er.Description) + return nil +} func main() { // init library diff --git a/pkg/api/live/v1/default.go b/pkg/api/live/v1/default.go index bc27f0d0..f2745243 100644 --- a/pkg/api/live/v1/default.go +++ b/pkg/api/live/v1/default.go @@ -16,10 +16,18 @@ import ( interfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/live/v1/interfaces" ) +// DefaultCallbackHandler is a default callback handler for live transcription +// Simply prints the transcript to stdout type DefaultCallbackHandler struct { sb strings.Builder } +// NewDefaultCallbackHandler creates a new DefaultCallbackHandler +func NewDefaultCallbackHandler() DefaultCallbackHandler { + return DefaultCallbackHandler{} +} + +// Message is the callback for a message func (dch DefaultCallbackHandler) Message(mr *interfaces.MessageResponse) error { var debugStr string if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { @@ -56,6 +64,7 @@ func (dch DefaultCallbackHandler) Message(mr *interfaces.MessageResponse) error return nil } +// Metadata is the callback for a metadata func (dch DefaultCallbackHandler) Metadata(md *interfaces.MetadataResponse) error { var debugStr string if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { @@ -88,6 +97,34 @@ func (dch DefaultCallbackHandler) Metadata(md *interfaces.MetadataResponse) erro return nil } -func NewDefaultCallbackHandler() DefaultCallbackHandler { - return DefaultCallbackHandler{} +func (dch DefaultCallbackHandler) Error(er *interfaces.ErrorResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.Compare(strings.ToLower(debugStr), "true") == 0 { + data, err := json.Marshal(er) + if err != nil { + klog.V(1).Infof("Error json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJson, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nError Object:\n%s\n\n", prettyJson) + + return nil + } + + // handle the message + fmt.Printf("\nError.Type: %s\n", er.Type) + fmt.Printf("Error.Message: %s\n", er.Message) + fmt.Printf("Error.Description: %s\n\n", er.Description) + + return nil } diff --git a/pkg/api/live/v1/interfaces/interfaces.go b/pkg/api/live/v1/interfaces/interfaces.go index c33d235f..f1b39202 100644 --- a/pkg/api/live/v1/interfaces/interfaces.go +++ b/pkg/api/live/v1/interfaces/interfaces.go @@ -8,5 +8,6 @@ package interfaces type LiveMessageCallback interface { Message(mr *MessageResponse) error Metadata(md *MetadataResponse) error + Error(er *ErrorResponse) error // TODO: implement other conversation insights } diff --git a/pkg/api/live/v1/interfaces/types.go b/pkg/api/live/v1/interfaces/types.go index 37227ca8..3b287bcd 100644 --- a/pkg/api/live/v1/interfaces/types.go +++ b/pkg/api/live/v1/interfaces/types.go @@ -7,27 +7,35 @@ package interfaces /* Shared defintions for the Deepgram API */ -type Words struct { +// Word is a single word in a transcript +type Word struct { Confidence float64 `json:"confidence,omitempty"` End float64 `json:"end,omitempty"` PunctuatedWord string `json:"punctuated_word,omitempty"` Start float64 `json:"start,omitempty"` Word string `json:"word,omitempty"` } -type Alternatives struct { + +// Alternative is a single alternative in a transcript +type Alternative struct { Confidence float64 `json:"confidence,omitempty"` Transcript string `json:"transcript,omitempty"` - Words []Words `json:"words,omitempty"` + Words []Word `json:"words,omitempty"` } + +// Channel is a single channel in a transcript type Channel struct { - Alternatives []Alternatives `json:"alternatives,omitempty"` + Alternatives []Alternative `json:"alternatives,omitempty"` } +// ModelInfo is the model information for a transcript type ModelInfo struct { Arch string `json:"arch,omitempty"` Name string `json:"name,omitempty"` Version string `json:"version,omitempty"` } + +// Metadata is the metadata for a transcript type Metadata struct { ModelInfo ModelInfo `json:"model_info,omitempty"` ModelUUID string `json:"model_uuid,omitempty"` @@ -37,6 +45,7 @@ type Metadata struct { /* Results from Live Transcription */ +// MessageResponse is the response from a live transcription type MessageResponse struct { Channel Channel `json:"channel,omitempty"` ChannelIndex []int `json:"channel_index,omitempty"` @@ -48,6 +57,7 @@ type MessageResponse struct { Type string `json:"type,omitempty"` } +// MetadataResponse is the response from a live transcription type MetadataResponse struct { Channels int `json:"channels,omitempty"` Created string `json:"created,omitempty"` @@ -60,6 +70,7 @@ type MetadataResponse struct { Type string `json:"type,omitempty"` } +// ErrorResponse is the response from a live transcription type ErrorResponse struct { Description string `json:"description"` Message string `json:"message"` diff --git a/pkg/api/live/v1/router.go b/pkg/api/live/v1/router.go index d59b1f10..162bf0b9 100644 --- a/pkg/api/live/v1/router.go +++ b/pkg/api/live/v1/router.go @@ -6,7 +6,6 @@ package live import ( "encoding/json" - "errors" prettyjson "github.com/hokaccha/go-prettyjson" klog "k8s.io/klog/v2" @@ -48,7 +47,7 @@ func (r *MessageRouter) Message(byMsg []byte) error { switch mt.Type { case interfaces.TypeErrorResponse: - return r.HandleError(byMsg) + return r.ErrorResponse(byMsg) case interfaces.TypeMessageResponse: return r.MessageResponse(byMsg) case interfaces.TypeMetadataResponse: @@ -121,31 +120,35 @@ func (r *MessageRouter) MetadataResponse(byMsg []byte) error { return nil } -// HandleError handles error messages -func (r *MessageRouter) HandleError(byMsg []byte) error { - klog.V(6).Infof("router.HandleError ENTER\n") +func (r *MessageRouter) ErrorResponse(byMsg []byte) error { + klog.V(6).Infof("router.ErrorResponse ENTER\n") // trace debugging - r.printDebugMessages(1, "HandleError", byMsg) + r.printDebugMessages(5, "ErrorResponse", byMsg) var er interfaces.ErrorResponse err := json.Unmarshal(byMsg, &er) if err != nil { - klog.V(1).Infof("HandleError json.Unmarshal failed. Err: %v\n", err) - klog.V(6).Infof("router.HandleError LEAVE\n") + klog.V(1).Infof("ErrorResponse json.Unmarshal failed. Err: %v\n", err) + klog.V(6).Infof("router.ErrorResponse LEAVE\n") return err } - b, err := json.MarshalIndent(er, "", " ") - if err != nil { - klog.V(1).Infof("HandleError MarshalIndent failed. Err: %v\n", err) - klog.V(6).Infof("router.HandleError LEAVE\n") + if r.callback != nil { + err := r.callback.Error(&er) + if err != nil { + klog.V(1).Infof("callback.ErrorResponse failed. Err: %v\n", err) + } else { + klog.V(5).Infof("callback.ErrorResponse succeeded\n") + } + klog.V(6).Infof("router.ErrorResponse LEAVE\n") return err } - klog.V(1).Infof("\n\nError: %s\n\n", string(b)) - klog.V(6).Infof("router.HandleError LEAVE\n") - return errors.New(string(b)) + klog.V(1).Infof("User callback is undefined\n") + klog.V(6).Infof("router.ErrorResponse ENTER\n") + + return nil } // UnhandledMessage handles the UnhandledMessage message diff --git a/pkg/api/manage/v1/interfaces/types.go b/pkg/api/manage/v1/interfaces/types.go index 95d79309..ed82195a 100644 --- a/pkg/api/manage/v1/interfaces/types.go +++ b/pkg/api/manage/v1/interfaces/types.go @@ -129,8 +129,8 @@ type Request struct { Callback interface{} `json:"callback,omitempty"` } -// Models provides a list of models -type Models struct { +// Model provides a list of models +type Model struct { Name string `json:"name,omitempty"` Language string `json:"language,omitempty"` Version string `json:"version,omitempty"` @@ -143,8 +143,8 @@ type Resolution struct { Amount int `json:"amount,omitempty"` } -// Results provides a list of results -type Results struct { +// Result provides a list of results +type Result struct { Start string `json:"start,omitempty"` End string `json:"end,omitempty"` Hours float64 `json:"hours,omitempty"` @@ -162,7 +162,7 @@ type RequestList struct { // UsageField provides a usage field type UsageField struct { Tags []any `json:"tags,omitempty"` - Models []Models `json:"models,omitempty"` + Models []Model `json:"models,omitempty"` ProcessingMethods []string `json:"processing_methods,omitempty"` Features []string `json:"features,omitempty"` } @@ -172,7 +172,7 @@ type Usage struct { Start string `json:"start,omitempty"` End string `json:"end,omitempty"` Resolution Resolution `json:"resolution,omitempty"` - Results []Results `json:"results,omitempty"` + Results []Result `json:"results,omitempty"` } /***********************************/ diff --git a/pkg/client/live/client.go b/pkg/client/live/client.go index 1d95531e..45921653 100644 --- a/pkg/client/live/client.go +++ b/pkg/client/live/client.go @@ -415,6 +415,9 @@ func (c *Client) closeWs() { func (c *Client) ping() { klog.V(6).Infof("live.ping() ENTER\n") + var counter uint64 + counter = 0 + ticker := time.NewTicker(pingPeriod) defer ticker.Stop() for { @@ -424,7 +427,8 @@ func (c *Client) ping() { klog.V(6).Infof("live.ping() LEAVE\n") return case <-ticker.C: - klog.V(4).Infof("Starting ping...") + klog.V(5).Infof("Starting ping...") + counter++ ws := c.Connect() if ws == nil { @@ -434,18 +438,27 @@ func (c *Client) ping() { // doing a write, need to lock c.mu.Lock() - klog.V(4).Infof("Sending ping... need reply in %d\n", (pingPeriod / 2)) - - // deepgram keepalive message - errDg := ws.WriteMessage(websocket.BinaryMessage, []byte("{ \"type\": \"KeepAlive\" }")) - if errDg != nil { - klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg) + klog.V(5).Infof("Sending ping... need reply in %d\n", (pingPeriod / 2)) + + var errDg error + if c.cOptions.EnableKeepAlive { + klog.V(5).Infof("Sending Deepgram KeepAlive message...\n") + // deepgram keepalive message + errDg = ws.WriteMessage(websocket.BinaryMessage, []byte("{ \"type\": \"KeepAlive\" }")) + if errDg != nil { + klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg) + } } - // websocket protocol ping/pong - errProto := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(pingPeriod/2)) - if errProto != nil { - klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto) + // websocket protocol ping/pong... this loop is every 5 seconds, so ping every 20 seconds + var errProto error + errProto = nil + if counter%4 == 0 { + klog.V(5).Infof("Sending Protocol KeepAlive message...\n") + errProto = ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(pingPeriod/2)) + if errProto != nil { + klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto) + } } c.mu.Unlock() @@ -453,7 +466,7 @@ func (c *Client) ping() { klog.V(1).Infof("WebSocketClient::ping failed\n") c.closeWs() } else { - klog.V(2).Infof("Ping sent!") + klog.V(5).Infof("Ping sent!") } } } diff --git a/pkg/client/live/constants.go b/pkg/client/live/constants.go index 1a7320de..d8fbe2dc 100644 --- a/pkg/client/live/constants.go +++ b/pkg/client/live/constants.go @@ -11,7 +11,7 @@ import ( ) const ( - pingPeriod = 30 * time.Second + pingPeriod = 5 * time.Second connectionRetryInfinite int64 = 0 defaultConnectRetry int64 = 3 diff --git a/pkg/client/live/types.go b/pkg/client/live/types.go index 1d296efe..15cb0f30 100644 --- a/pkg/client/live/types.go +++ b/pkg/client/live/types.go @@ -22,6 +22,7 @@ type ClientOptions struct { Path string // override for the endpoint path usually RedirectService bool // allows HTTP redirects to be followed SkipServerAuth bool // keeps the client from authenticating with the server + EnableKeepAlive bool // enables the keep alive feature } // Client is a struct representing the websocket client connection