Skip to content

Commit

Permalink
Fix KeepAlive Interval and Expose Error Callback
Browse files Browse the repository at this point in the history
  • Loading branch information
dvonthenen committed Nov 22, 2023
1 parent e6f4803 commit 7a765ac
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 40 deletions.
7 changes: 7 additions & 0 deletions examples/streaming/microphone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func (c MyCallback) Metadata(md *api.MetadataResponse) error {
log.Printf("Metadata.Created: %s\n\n", strings.TrimSpace(md.Created))
return nil
}
func (c MyCallback) Error(er *api.ErrorResponse) error {
// handle the error
log.Printf("\nError.Type: %s\n", er.Type)
log.Printf("Error.Message: %s\n", er.Message)
log.Printf("Error.Description: %s\n\n", er.Description)
return nil
}

func main() {
// init library
Expand Down
41 changes: 39 additions & 2 deletions pkg/api/live/v1/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@ import (
interfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/live/v1/interfaces"
)

// DefaultCallbackHandler is a default callback handler for live transcription
// Simply prints the transcript to stdout
type DefaultCallbackHandler struct {
sb strings.Builder
}

// NewDefaultCallbackHandler creates a new DefaultCallbackHandler
func NewDefaultCallbackHandler() DefaultCallbackHandler {
return DefaultCallbackHandler{}
}

// Message is the callback for a message
func (dch DefaultCallbackHandler) Message(mr *interfaces.MessageResponse) error {
var debugStr string
if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" {
Expand Down Expand Up @@ -56,6 +64,7 @@ func (dch DefaultCallbackHandler) Message(mr *interfaces.MessageResponse) error
return nil
}

// Metadata is the callback for a metadata
func (dch DefaultCallbackHandler) Metadata(md *interfaces.MetadataResponse) error {
var debugStr string
if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" {
Expand Down Expand Up @@ -88,6 +97,34 @@ func (dch DefaultCallbackHandler) Metadata(md *interfaces.MetadataResponse) erro
return nil
}

func NewDefaultCallbackHandler() DefaultCallbackHandler {
return DefaultCallbackHandler{}
func (dch DefaultCallbackHandler) Error(er *interfaces.ErrorResponse) error {
var debugStr string
if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" {
klog.V(4).Infof("DEEPGRAM_DEBUG found")
debugStr = v
}

if strings.Compare(strings.ToLower(debugStr), "true") == 0 {
data, err := json.Marshal(er)
if err != nil {
klog.V(1).Infof("Error json.Marshal failed. Err: %v\n", err)
return err
}

prettyJson, err := prettyjson.Format(data)
if err != nil {
klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err)
return err
}
klog.V(2).Infof("\n\nError Object:\n%s\n\n", prettyJson)

return nil
}

// handle the message
fmt.Printf("\nError.Type: %s\n", er.Type)
fmt.Printf("Error.Message: %s\n", er.Message)
fmt.Printf("Error.Description: %s\n\n", er.Description)

return nil
}
1 change: 1 addition & 0 deletions pkg/api/live/v1/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ package interfaces
type LiveMessageCallback interface {
Message(mr *MessageResponse) error
Metadata(md *MetadataResponse) error
Error(er *ErrorResponse) error
// TODO: implement other conversation insights
}
19 changes: 15 additions & 4 deletions pkg/api/live/v1/interfaces/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,35 @@ package interfaces
/*
Shared defintions for the Deepgram API
*/
type Words struct {
// Word is a single word in a transcript
type Word struct {
Confidence float64 `json:"confidence,omitempty"`
End float64 `json:"end,omitempty"`
PunctuatedWord string `json:"punctuated_word,omitempty"`
Start float64 `json:"start,omitempty"`
Word string `json:"word,omitempty"`
}
type Alternatives struct {

// Alternative is a single alternative in a transcript
type Alternative struct {
Confidence float64 `json:"confidence,omitempty"`
Transcript string `json:"transcript,omitempty"`
Words []Words `json:"words,omitempty"`
Words []Word `json:"words,omitempty"`
}

// Channel is a single channel in a transcript
type Channel struct {
Alternatives []Alternatives `json:"alternatives,omitempty"`
Alternatives []Alternative `json:"alternatives,omitempty"`
}

// ModelInfo is the model information for a transcript
type ModelInfo struct {
Arch string `json:"arch,omitempty"`
Name string `json:"name,omitempty"`
Version string `json:"version,omitempty"`
}

// Metadata is the metadata for a transcript
type Metadata struct {
ModelInfo ModelInfo `json:"model_info,omitempty"`
ModelUUID string `json:"model_uuid,omitempty"`
Expand All @@ -37,6 +45,7 @@ type Metadata struct {
/*
Results from Live Transcription
*/
// MessageResponse is the response from a live transcription
type MessageResponse struct {
Channel Channel `json:"channel,omitempty"`
ChannelIndex []int `json:"channel_index,omitempty"`
Expand All @@ -48,6 +57,7 @@ type MessageResponse struct {
Type string `json:"type,omitempty"`
}

// MetadataResponse is the response from a live transcription
type MetadataResponse struct {
Channels int `json:"channels,omitempty"`
Created string `json:"created,omitempty"`
Expand All @@ -60,6 +70,7 @@ type MetadataResponse struct {
Type string `json:"type,omitempty"`
}

// ErrorResponse is the response from a live transcription
type ErrorResponse struct {
Description string `json:"description"`
Message string `json:"message"`
Expand Down
33 changes: 18 additions & 15 deletions pkg/api/live/v1/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package live

import (
"encoding/json"
"errors"

prettyjson "github.com/hokaccha/go-prettyjson"
klog "k8s.io/klog/v2"
Expand Down Expand Up @@ -48,7 +47,7 @@ func (r *MessageRouter) Message(byMsg []byte) error {

switch mt.Type {
case interfaces.TypeErrorResponse:
return r.HandleError(byMsg)
return r.ErrorResponse(byMsg)
case interfaces.TypeMessageResponse:
return r.MessageResponse(byMsg)
case interfaces.TypeMetadataResponse:
Expand Down Expand Up @@ -121,31 +120,35 @@ func (r *MessageRouter) MetadataResponse(byMsg []byte) error {
return nil
}

// HandleError handles error messages
func (r *MessageRouter) HandleError(byMsg []byte) error {
klog.V(6).Infof("router.HandleError ENTER\n")
func (r *MessageRouter) ErrorResponse(byMsg []byte) error {
klog.V(6).Infof("router.ErrorResponse ENTER\n")

// trace debugging
r.printDebugMessages(1, "HandleError", byMsg)
r.printDebugMessages(5, "ErrorResponse", byMsg)

var er interfaces.ErrorResponse
err := json.Unmarshal(byMsg, &er)
if err != nil {
klog.V(1).Infof("HandleError json.Unmarshal failed. Err: %v\n", err)
klog.V(6).Infof("router.HandleError LEAVE\n")
klog.V(1).Infof("ErrorResponse json.Unmarshal failed. Err: %v\n", err)
klog.V(6).Infof("router.ErrorResponse LEAVE\n")
return err
}

b, err := json.MarshalIndent(er, "", " ")
if err != nil {
klog.V(1).Infof("HandleError MarshalIndent failed. Err: %v\n", err)
klog.V(6).Infof("router.HandleError LEAVE\n")
if r.callback != nil {
err := r.callback.Error(&er)
if err != nil {
klog.V(1).Infof("callback.ErrorResponse failed. Err: %v\n", err)
} else {
klog.V(5).Infof("callback.ErrorResponse succeeded\n")
}
klog.V(6).Infof("router.ErrorResponse LEAVE\n")
return err
}

klog.V(1).Infof("\n\nError: %s\n\n", string(b))
klog.V(6).Infof("router.HandleError LEAVE\n")
return errors.New(string(b))
klog.V(1).Infof("User callback is undefined\n")
klog.V(6).Infof("router.ErrorResponse ENTER\n")

return nil
}

// UnhandledMessage handles the UnhandledMessage message
Expand Down
12 changes: 6 additions & 6 deletions pkg/api/manage/v1/interfaces/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ type Request struct {
Callback interface{} `json:"callback,omitempty"`
}

// Models provides a list of models
type Models struct {
// Model provides a list of models
type Model struct {
Name string `json:"name,omitempty"`
Language string `json:"language,omitempty"`
Version string `json:"version,omitempty"`
Expand All @@ -143,8 +143,8 @@ type Resolution struct {
Amount int `json:"amount,omitempty"`
}

// Results provides a list of results
type Results struct {
// Result provides a list of results
type Result struct {
Start string `json:"start,omitempty"`
End string `json:"end,omitempty"`
Hours float64 `json:"hours,omitempty"`
Expand All @@ -162,7 +162,7 @@ type RequestList struct {
// UsageField provides a usage field
type UsageField struct {
Tags []any `json:"tags,omitempty"`
Models []Models `json:"models,omitempty"`
Models []Model `json:"models,omitempty"`
ProcessingMethods []string `json:"processing_methods,omitempty"`
Features []string `json:"features,omitempty"`
}
Expand All @@ -172,7 +172,7 @@ type Usage struct {
Start string `json:"start,omitempty"`
End string `json:"end,omitempty"`
Resolution Resolution `json:"resolution,omitempty"`
Results []Results `json:"results,omitempty"`
Results []Result `json:"results,omitempty"`
}

/***********************************/
Expand Down
37 changes: 25 additions & 12 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,9 @@ func (c *Client) closeWs() {
func (c *Client) ping() {
klog.V(6).Infof("live.ping() ENTER\n")

var counter uint64
counter = 0

ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
for {
Expand All @@ -424,7 +427,8 @@ func (c *Client) ping() {
klog.V(6).Infof("live.ping() LEAVE\n")
return
case <-ticker.C:
klog.V(4).Infof("Starting ping...")
klog.V(5).Infof("Starting ping...")
counter++

ws := c.Connect()
if ws == nil {
Expand All @@ -434,26 +438,35 @@ func (c *Client) ping() {

// doing a write, need to lock
c.mu.Lock()
klog.V(4).Infof("Sending ping... need reply in %d\n", (pingPeriod / 2))

// deepgram keepalive message
errDg := ws.WriteMessage(websocket.BinaryMessage, []byte("{ \"type\": \"KeepAlive\" }"))
if errDg != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg)
klog.V(5).Infof("Sending ping... need reply in %d\n", (pingPeriod / 2))

var errDg error
if c.cOptions.EnableKeepAlive {
klog.V(5).Infof("Sending Deepgram KeepAlive message...\n")
// deepgram keepalive message
errDg = ws.WriteMessage(websocket.BinaryMessage, []byte("{ \"type\": \"KeepAlive\" }"))
if errDg != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg)
}
}

// websocket protocol ping/pong
errProto := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(pingPeriod/2))
if errProto != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto)
// websocket protocol ping/pong... this loop is every 5 seconds, so ping every 20 seconds
var errProto error
errProto = nil
if counter%4 == 0 {
klog.V(5).Infof("Sending Protocol KeepAlive message...\n")
errProto = ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(pingPeriod/2))
if errProto != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto)
}
}
c.mu.Unlock()

if errDg != nil || errProto != nil {
klog.V(1).Infof("WebSocketClient::ping failed\n")
c.closeWs()
} else {
klog.V(2).Infof("Ping sent!")
klog.V(5).Infof("Ping sent!")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/live/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

const (
pingPeriod = 30 * time.Second
pingPeriod = 5 * time.Second

connectionRetryInfinite int64 = 0
defaultConnectRetry int64 = 3
Expand Down
1 change: 1 addition & 0 deletions pkg/client/live/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ClientOptions struct {
Path string // override for the endpoint path usually <version/listen>
RedirectService bool // allows HTTP redirects to be followed
SkipServerAuth bool // keeps the client from authenticating with the server
EnableKeepAlive bool // enables the keep alive feature
}

// Client is a struct representing the websocket client connection
Expand Down

0 comments on commit 7a765ac

Please sign in to comment.