diff --git a/README.md b/README.md index 3dbc3df5..9f0fec13 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,11 @@ For documentation relating to Speech-to-Text (and Intelligence) from PreRecorded For documentation relating to Text-to-Speech: +- WebSocket: + - Speak REST Client - [https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/client/speak/v1/websocket](https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/client/speak/v1/websocket) + - Speak REST API - [https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/websocket](https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/websocket) + - Speak API Interfaces - [https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/websocket/interfaces](https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/websocket/interfaces) + - REST: - Speak REST Client - [https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/client/speak/v1/rest](https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/client/speak/v1/rest) - Speak REST API - [https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/rest](https://pkg.go.dev/github.com/deepgram/deepgram-go-sdk@main/pkg/api/speak/v1/rest) @@ -207,6 +212,11 @@ Speech-to-Text - Live Audio: - From a Microphone - [examples/speech-to-text/websocket/microphone](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/microphone/main.go) - From an HTTP Endpoint - [examples/speech-to-text/websocket/http](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/http/main.go) +Text-to-Speech - WebSocket + +- Websocket Simple Example - [examples/text-to-speech/websocket/simple](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/text-to-speech/websocket/simple/main.go) +- Interactive Websocket - [examples/text-to-speech/websocket/interactive](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/text-to-speech/websocket/interactive/main.go) + Text-to-Speech - REST - Save audio to a Path - [examples/text-to-speech/rest/file](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/text-to-speech/rest/file/main.go) diff --git a/docs.go b/docs.go index d4262d78..e4c68664 100644 --- a/docs.go +++ b/docs.go @@ -30,5 +30,5 @@ import ( _ "github.com/deepgram/deepgram-go-sdk/pkg/api/listen/v1/websocket" _ "github.com/deepgram/deepgram-go-sdk/pkg/api/manage/v1" _ "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/rest" - // _ "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket" + _ "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket" ) diff --git a/examples/speech-to-text/websocket/http/main.go b/examples/speech-to-text/websocket/http/main.go index 12fc44bc..45ec2ec4 100644 --- a/examples/speech-to-text/websocket/http/main.go +++ b/examples/speech-to-text/websocket/http/main.go @@ -37,7 +37,7 @@ func main() { } // create a Deepgram client - dgClient, err := client.NewWebSocketForDemo(ctx, transcriptOptions) + dgClient, err := client.NewWSForDemo(ctx, transcriptOptions) if err != nil { fmt.Println("ERROR creating LiveTranscription connection:", err) return diff --git a/examples/speech-to-text/websocket/microphone/main.go b/examples/speech-to-text/websocket/microphone/main.go index b36a8a80..6980a820 100644 --- a/examples/speech-to-text/websocket/microphone/main.go +++ b/examples/speech-to-text/websocket/microphone/main.go @@ -150,7 +150,7 @@ func main() { } // create a Deepgram client - dgClient, err := client.NewWebSocket(ctx, "", cOptions, tOptions, callback) + dgClient, err := client.NewWS(ctx, "", cOptions, tOptions, callback) if err != nil { fmt.Println("ERROR creating LiveTranscription connection:", err) return diff --git a/examples/speech-to-text/websocket/replay/main.go b/examples/speech-to-text/websocket/replay/main.go index 85a59f6f..c56c461f 100644 --- a/examples/speech-to-text/websocket/replay/main.go +++ b/examples/speech-to-text/websocket/replay/main.go @@ -37,7 +37,7 @@ func main() { } // create a Deepgram client - dgClient, err := client.NewWebSocketForDemo(ctx, options) + dgClient, err := client.NewWSForDemo(ctx, options) if err != nil { log.Println("ERROR creating LiveTranscription connection:", err) return diff --git a/examples/speech-to-text/websocket/test/main.go b/examples/speech-to-text/websocket/test/main.go index d222c8a3..7fe07c81 100644 --- a/examples/speech-to-text/websocket/test/main.go +++ b/examples/speech-to-text/websocket/test/main.go @@ -55,7 +55,7 @@ func main() { } // create a Deepgram client - dgClient, err := client.NewWebSocket(ctx, "", cOptions, tOptions, nil) + dgClient, err := client.NewWS(ctx, "", cOptions, tOptions, nil) if err != nil { fmt.Println("ERROR creating LiveTranscription connection:", err) return diff --git a/examples/text-to-speech/websocket/interactive/main.go b/examples/text-to-speech/websocket/interactive/main.go new file mode 100644 index 00000000..05e4f52d --- /dev/null +++ b/examples/text-to-speech/websocket/interactive/main.go @@ -0,0 +1,159 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package main + +import ( + "bufio" + "context" + "fmt" + "os" + "strings" + "time" + + msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" + speak "github.com/deepgram/deepgram-go-sdk/pkg/client/speak" +) + +const ( + API_KEY = "" + TTS_TEXT = "Hello, this is a text to speech example using Deepgram." + AUDIO_FILE = "output.mp3" +) + +// Implement your own callback +type MyCallback struct{} + +func (c MyCallback) Metadata(md *msginterfaces.MetadataResponse) error { + fmt.Printf("\n[Metadata] Received\n") + fmt.Printf("Metadata.RequestID: %s\n", strings.TrimSpace(md.RequestID)) + return nil +} + +func (c MyCallback) Binary(byMsg []byte) error { + fmt.Printf("\n[Binary] Received\n") + + file, err := os.OpenFile(AUDIO_FILE, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o666) + if err != nil { + fmt.Printf("Error creating file %s: %v\n", AUDIO_FILE, err) + return err + } + defer file.Close() + + _, err = file.Write(byMsg) + if err != nil { + fmt.Printf("Error writing audio data to file: %v\n", err) + return err + } + + fmt.Printf("Audio data saved to %s\n", AUDIO_FILE) + return nil +} + +func (c MyCallback) Flush(fl *msginterfaces.FlushedResponse) error { + fmt.Printf("\n[Flushed] Received\n") + fmt.Printf("\n\nPress 'r' and ENTER to reset the buffer, 'f' and ENTER to flush, enter new text to send it, or just ENTER to exit...\n\n> ") + return nil +} + +func (c MyCallback) Warning(wr *msginterfaces.WarningResponse) error { + fmt.Printf("\n[Warning] Received\n") + fmt.Printf("Warning.Code: %s\n", wr.WarnCode) + fmt.Printf("Warning.Description: %s\n\n", wr.WarnMsg) + return nil +} + +func (c MyCallback) Error(er *msginterfaces.ErrorResponse) error { + fmt.Printf("\n[Error] Received\n") + fmt.Printf("Error.Code: %s\n", er.ErrCode) + fmt.Printf("Error.Description: %s\n\n", er.Description) + return nil +} + +func (c MyCallback) Close(cr *msginterfaces.CloseResponse) error { + fmt.Printf("\n[Close] Received\n") + return nil +} + +func (c MyCallback) Open(or *msginterfaces.OpenResponse) error { + fmt.Printf("\n[Open] Received\n") + return nil +} + +func main() { + // init library + speak.InitWithDefault() + + // Go context + ctx := context.Background() + + // print instructions + fmt.Print("\n\nPress ENTER to exit!\n\n") + + // set the TTS options + ttsOptions := &interfaces.SpeakOptions{ + Model: "aura-asteria-en", + } + + // set the Client options + cOptions := &interfaces.ClientOptions{} + + // create the callback + callback := MyCallback{} + + // create a new stream using the NewStream function + dgClient, err := speak.NewWS(ctx, "", cOptions, ttsOptions, callback) + if err != nil { + fmt.Println("ERROR creating TTS connection:", err) + return + } + + // connect the websocket to Deepgram + bConnected := dgClient.Connect() + if !bConnected { + fmt.Println("Client.Connect failed") + os.Exit(1) + } + + // Simulate user input to reset the buffer, flush, send new text, or just exit + time.Sleep(2 * time.Second) + fmt.Printf("\n\nPress 'r' and ENTER to reset the buffer, 'f' and ENTER to flush, enter new text to send it, or just ENTER to exit...\n\n> ") + input := bufio.NewScanner(os.Stdin) + for input.Scan() { + switch input.Text() { + case "r": + err = dgClient.Reset() + if err != nil { + fmt.Printf("Error resetting buffer: %v\n", err) + } else { + fmt.Println("Buffer reset successfully.") + } + case "f": + err = dgClient.Flush() + if err != nil { + fmt.Printf("Error flushing buffer: %v\n", err) + } else { + fmt.Println("Buffer flushed successfully.") + } + case "": + goto EXIT + default: + err = dgClient.SpeakWithText(input.Text()) + if err != nil { + fmt.Printf("Error sending text input: %v\n", err) + } else { + fmt.Println("Text sent successfully.") + } + fmt.Printf("\n\nPress 'r' and ENTER to reset the buffer, 'f' and ENTER to flush, enter new text to send it, or just ENTER to exit...\n\n> ") + } + } + +EXIT: + + // close the connection + dgClient.Stop() + + fmt.Printf("Program exiting...\n") +} diff --git a/examples/text-to-speech/websocket/simple/main.go b/examples/text-to-speech/websocket/simple/main.go new file mode 100644 index 00000000..d9cedada --- /dev/null +++ b/examples/text-to-speech/websocket/simple/main.go @@ -0,0 +1,139 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package main + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces/v1" + speak "github.com/deepgram/deepgram-go-sdk/pkg/client/speak" +) + +const ( + TTS_TEXT = "Hello, this is a text to speech example using Deepgram." + AUDIO_FILE = "output.mp3" +) + +// Implement your own callback +type MyCallback struct{} + +func (c MyCallback) Metadata(md *msginterfaces.MetadataResponse) error { + fmt.Printf("\n[Metadata] Received\n") + fmt.Printf("Metadata.RequestID: %s\n", strings.TrimSpace(md.RequestID)) + return nil +} + +func (c MyCallback) Binary(byMsg []byte) error { + fmt.Printf("\n[Binary] Received\n") + + file, err := os.OpenFile(AUDIO_FILE, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o666) + if err != nil { + fmt.Printf("Error creating file %s: %v\n", AUDIO_FILE, err) + return err + } + defer file.Close() + + _, err = file.Write(byMsg) + if err != nil { + fmt.Printf("Error writing audio data to file: %v\n", err) + return err + } + + fmt.Printf("Audio data saved to %s\n", AUDIO_FILE) + return nil +} + +func (c MyCallback) Flush(fl *msginterfaces.FlushedResponse) error { + fmt.Printf("\n[Flushed] Received\n") + return nil +} + +func (c MyCallback) Warning(wr *msginterfaces.WarningResponse) error { + fmt.Printf("\n[Warning] Received\n") + fmt.Printf("Warning.Code: %s\n", wr.WarnCode) + fmt.Printf("Warning.Description: %s\n\n", wr.WarnMsg) + return nil +} + +func (c MyCallback) Error(er *msginterfaces.ErrorResponse) error { + fmt.Printf("\n[Error] Received\n") + fmt.Printf("Error.Code: %s\n", er.ErrCode) + fmt.Printf("Error.Description: %s\n\n", er.ErrMsg) + return nil +} + +func (c MyCallback) Close(cr *msginterfaces.CloseResponse) error { + fmt.Printf("\n[Close] Received\n") + return nil +} + +func (c MyCallback) Open(or *msginterfaces.OpenResponse) error { + fmt.Printf("\n[Open] Received\n") + return nil +} + +func main() { + // init library + speak.Init(speak.InitLib{ + LogLevel: speak.LogLevelDefault, // LogLevelDefault, LogLevelFull, LogLevelDebug, LogLevelTrace + }) + + // Go context + ctx := context.Background() + + // set the Client options + cOptions := &interfaces.ClientOptions{ + // AutoFlushSpeakDelta: 1000, + } + + // set the TTS options + ttsOptions := &interfaces.SpeakOptions{ + Model: "aura-asteria-en", + } + + // create the callback + callback := MyCallback{} + + // create a new stream using the NewStream function + dgClient, err := speak.NewWS(ctx, "", cOptions, ttsOptions, callback) + if err != nil { + fmt.Println("ERROR creating TTS connection:", err) + return + } + + // connect the websocket to Deepgram + bConnected := dgClient.Connect() + if !bConnected { + fmt.Println("Client.Connect failed") + os.Exit(1) + } + + // Send the text input + err = dgClient.SpeakWithText(TTS_TEXT) + if err != nil { + fmt.Printf("Error sending text input: %v\n", err) + return + } + + // If AutoFlushSpeakDelta is not set, you Flush the text input manually + err = dgClient.Flush() + if err != nil { + fmt.Printf("Error sending text input: %v\n", err) + return + } + + // wait for user input to exit + time.Sleep(5 * time.Second) + + // close the connection + dgClient.Stop() + + fmt.Printf("Program exiting...\n") +} diff --git a/pkg/api/speak/v1/websocket/constants.go b/pkg/api/speak/v1/websocket/constants.go new file mode 100644 index 00000000..3b175d19 --- /dev/null +++ b/pkg/api/speak/v1/websocket/constants.go @@ -0,0 +1,25 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "errors" +) + +const ( + PackageVersion string = "v1.0" +) + +// errors +var ( + // ErrInvalidInput required input was not found + ErrInvalidInput = errors.New("required input was not found") + + // ErrInvalidMessageType invalid message type + ErrInvalidMessageType = errors.New("invalid message type") + + // ErrUserCallbackNotDefined user callback object not defined + ErrUserCallbackNotDefined = errors.New("user callback object not defined") +) diff --git a/pkg/api/speak/v1/websocket/default.go b/pkg/api/speak/v1/websocket/default.go new file mode 100644 index 00000000..b2fc33e3 --- /dev/null +++ b/pkg/api/speak/v1/websocket/default.go @@ -0,0 +1,249 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "encoding/json" + "fmt" + "os" + "strings" + + prettyjson "github.com/hokaccha/go-prettyjson" + klog "k8s.io/klog/v2" + + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" +) + +// DefaultCallbackHandler is a default callback handler for text-to-speech connections +type DefaultCallbackHandler struct{} + +// NewDefaultCallbackHandler creates a new DefaultCallbackHandler +func NewDefaultCallbackHandler() DefaultCallbackHandler { + return DefaultCallbackHandler{} +} + +// Open is the callback for when the connection opens +func (dch DefaultCallbackHandler) Open(or *interfaces.OpenResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(or) + if err != nil { + klog.V(1).Infof("Open json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nOpen Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n\n[OpenResponse]\n\n") + + return nil +} + +// Metadata is the callback for information about the connection +func (dch DefaultCallbackHandler) Metadata(md *interfaces.MetadataResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(md) + if err != nil { + klog.V(1).Infof("Metadata json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nMetadata Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n\nMetadata.RequestID: %s\n", strings.TrimSpace(md.RequestID)) + + return nil +} + +// Flushed is the callback for when the connection flushes +func (dch DefaultCallbackHandler) Flush(fr *interfaces.FlushedResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(fr) + if err != nil { + klog.V(1).Infof("Flush json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nFlush Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n\nFlushed.SequenceID: %d\n", fr.SequenceID) + + return nil +} + +// Binary is the callback for when the connection receives binary data +func (dch DefaultCallbackHandler) Binary(br []byte) error { + klog.V(3).Infof("Received binary data: %d bytes", len(br)) + return nil +} + +// Close is the callback for when the connection closes +func (dch DefaultCallbackHandler) Close(or *interfaces.CloseResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(or) + if err != nil { + klog.V(1).Infof("Close json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nClose Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n\n[CloseResponse]\n\n") + + return nil +} + +// Warning is the callback for error messages +func (dch DefaultCallbackHandler) Warning(wr *interfaces.WarningResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(wr) + if err != nil { + klog.V(1).Infof("Error json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nWarning Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n[WarningResponse]\n") + fmt.Printf("\nError.Code: %s\n", wr.WarnCode) + fmt.Printf("Error.Message: %s\n", wr.WarnMsg) + + return nil +} + +// Error is the callback for error messages +func (dch DefaultCallbackHandler) Error(er *interfaces.ErrorResponse) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + data, err := json.Marshal(er) + if err != nil { + klog.V(1).Infof("Error json.Marshal failed. Err: %v\n", err) + return err + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + return err + } + klog.V(2).Infof("\n\nError Object:\n%s\n\n", prettyJSON) + + return nil + } + + // handle the message + fmt.Printf("\n[ErrorResponse]\n") + fmt.Printf("\nError.Type: %s\n", er.ErrCode) + fmt.Printf("Error.Message: %s\n", er.ErrMsg) + fmt.Printf("Error.Description: %s\n\n", er.Description) + fmt.Printf("Error.Variant: %s\n\n", er.Variant) + + return nil +} + +// UnhandledEvent is the callback for unknown messages +func (dch DefaultCallbackHandler) UnhandledEvent(byData []byte) error { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + if strings.EqualFold(debugStr, "true") { + prettyJSON, err := prettyjson.Format(byData) + if err != nil { + klog.V(2).Infof("\n\nRaw Data:\n%s\n\n", string(byData)) + } else { + klog.V(2).Infof("\n\nError Object:\n%s\n\n", prettyJSON) + } + + return nil + } + + // handle the message + fmt.Printf("\n[UnhandledEvent]") + fmt.Printf("Dump:\n%s\n\n", string(byData)) + + return nil +} diff --git a/pkg/api/speak/v1/websocket/interfaces/constants.go b/pkg/api/speak/v1/websocket/interfaces/constants.go new file mode 100644 index 00000000..63ddcff1 --- /dev/null +++ b/pkg/api/speak/v1/websocket/interfaces/constants.go @@ -0,0 +1,18 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package interfacesv1 + +// These are the message types that can be received from the text-to-speech streaming API +const ( + // message types + TypeOpenResponse string = "Open" + TypeMetadataResponse string = "Metadata" + TypeFlushedResponse string = "Flushed" + TypeCloseResponse string = "Close" + + // "Error" type + TypeWarningResponse string = "Warning" + TypeErrorResponse string = "Error" +) diff --git a/pkg/api/speak/v1/websocket/interfaces/interfaces.go b/pkg/api/speak/v1/websocket/interfaces/interfaces.go new file mode 100644 index 00000000..b6fb1a6c --- /dev/null +++ b/pkg/api/speak/v1/websocket/interfaces/interfaces.go @@ -0,0 +1,21 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +// This package defines interfaces for the live API +package interfacesv1 + +// SpeakMessageCallback is a callback used to receive notifications for platforms messages +type SpeakMessageCallback interface { + // These are WS TextMessage that are used for flow control. + Open(or *OpenResponse) error + Metadata(md *MetadataResponse) error + Flush(fl *FlushedResponse) error + Close(cr *CloseResponse) error + + Warning(er *WarningResponse) error + Error(er *ErrorResponse) error + + // These are WS BinaryMessage that are used to send audio data to the client + Binary(byMsg []byte) error +} diff --git a/pkg/api/speak/v1/websocket/interfaces/types.go b/pkg/api/speak/v1/websocket/interfaces/types.go new file mode 100644 index 00000000..f4c7fb93 --- /dev/null +++ b/pkg/api/speak/v1/websocket/interfaces/types.go @@ -0,0 +1,58 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package interfacesv1 + +import ( + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" +) + +/***********************************/ +// Request/Input structs +/***********************************/ +type SpeakOptions interfaces.SpeakOptions + +/***********************************/ +// MessageType is the header to bootstrap you way unmarshalling other messages +/***********************************/ +/* + Example: + { + "type": "message", + "message": { + ... + } + } +*/ +type MessageType struct { + Type string `json:"type"` +} + +// MetadataResponse is the response from the text-to-speech request which contains metadata about the request +type MetadataResponse struct { + Type string `json:"type,omitempty"` + RequestID string `json:"request_id,omitempty"` +} + +// FlushedResponse is the response which indicates that the server has flushed the buffer and is ready to return audio +type FlushedResponse struct { + Type string `json:"type,omitempty"` + SequenceID int `json:"sequence_id,omitempty"` +} + +// OpenResponse is the response from the connection opening +type OpenResponse struct { + Type string `json:"type,omitempty"` +} + +// CloseResponse is the response from the connection closing +type CloseResponse struct { + Type string `json:"type,omitempty"` +} + +// WarningResponse is the Deepgram specific response warning +type WarningResponse interfaces.DeepgramWarning + +// ErrorResponse is the Deepgram specific response error +type ErrorResponse interfaces.DeepgramError diff --git a/pkg/api/speak/v1/websocket/router.go b/pkg/api/speak/v1/websocket/router.go new file mode 100644 index 00000000..a0e15116 --- /dev/null +++ b/pkg/api/speak/v1/websocket/router.go @@ -0,0 +1,195 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "encoding/json" + "os" + "strings" + + prettyjson "github.com/hokaccha/go-prettyjson" + klog "k8s.io/klog/v2" + + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" +) + +// MessageRouter routes events +type MessageRouter struct { + callback interfaces.SpeakMessageCallback + debugWebsocket bool +} + +// NewWithDefault creates a MessageRouter with the default callback handler +func NewWithDefault() *MessageRouter { + return NewStream(NewDefaultCallbackHandler()) +} + +// New creates a MessageRouter with a user-defined callback +func NewStream(callback interfaces.SpeakMessageCallback) *MessageRouter { + debugStr := os.Getenv("DEEPGRAM_DEBUG") + return &MessageRouter{ + callback: callback, + debugWebsocket: strings.EqualFold(strings.ToLower(debugStr), "true"), + } +} + +// OpenHelper handles the OpenResponse message +func (r *MessageRouter) OpenHelper(or *interfaces.OpenResponse) error { + return r.callback.Open(or) +} + +// CloseHelper handles the OpenResponse message +func (r *MessageRouter) CloseHelper(or *interfaces.CloseResponse) error { + return r.callback.Close(or) +} + +// ErrorHelper handles the ErrorResponse message +func (r *MessageRouter) ErrorHelper(er *interfaces.ErrorResponse) error { + return r.callback.Error(er) +} + +// processMessage generalizes the handling of all message types +func (r *MessageRouter) processGeneric(msgType string, byMsg []byte, action func(data *interface{}) error, data interface{}) error { + klog.V(6).Infof("router.%s ENTER\n", msgType) + + r.printDebugMessages(5, msgType, byMsg) + + var err error + if err = action(&data); err != nil { + klog.V(1).Infof("callback.%s failed. Err: %v\n", msgType, err) + } else { + klog.V(5).Infof("callback.%s succeeded\n", msgType) + } + klog.V(6).Infof("router.%s LEAVE\n", msgType) + + return err +} + +func (r *MessageRouter) processFlushed(byMsg []byte) error { + var msg interfaces.FlushedResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + return err + } + + action := func(data *interface{}) error { + return r.callback.Flush(&msg) + } + + return r.processGeneric("MessageResponse", byMsg, action, msg) +} + +func (r *MessageRouter) processMetadata(byMsg []byte) error { + var msg interfaces.MetadataResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + return err + } + + action := func(data *interface{}) error { + return r.callback.Metadata(&msg) + } + + return r.processGeneric("MetadataResponse", byMsg, action, msg) +} + +func (r *MessageRouter) processWarningResponse(byMsg []byte) error { + var msg interfaces.WarningResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + return err + } + + action := func(data *interface{}) error { + return r.callback.Warning(&msg) + } + + return r.processGeneric("WarningResponse", byMsg, action, msg) +} + +func (r *MessageRouter) processErrorResponse(byMsg []byte) error { + var msg interfaces.ErrorResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + return err + } + + action := func(data *interface{}) error { + return r.callback.Error(&msg) + } + + return r.processGeneric("ErrorResponse", byMsg, action, msg) +} + +// Message handles platform messages and routes them appropriately based on the MessageType +func (r *MessageRouter) Message(byMsg []byte) error { + klog.V(6).Infof("router.Message ENTER\n") + + if r.debugWebsocket { + klog.V(5).Infof("Raw Message:\n%s\n", string(byMsg)) + } + + var mt interfaces.MessageType + if err := json.Unmarshal(byMsg, &mt); err != nil { + klog.V(1).Infof("json.Unmarshal(MessageType) failed. Err: %v\n", err) + klog.V(6).Infof("router.Message LEAVE\n") + return err + } + + var err error + switch mt.Type { + case interfaces.TypeFlushedResponse: + err = r.processFlushed(byMsg) + case interfaces.TypeMetadataResponse: + err = r.processMetadata(byMsg) + case interfaces.TypeWarningResponse: + err = r.processWarningResponse(byMsg) + case interfaces.TypeErrorResponse: + err = r.processErrorResponse(byMsg) + default: + err = r.UnhandledMessage(byMsg) + klog.V(1).Infof("Message type %s is unhandled\n", mt.Type) + } + + if err == nil { + klog.V(6).Infof("MessageType(%s) after - Result: succeeded\n", mt.Type) + } else { + klog.V(5).Infof("MessageType(%s) after - Result: %v\n", mt.Type, err) + } + klog.V(6).Infof("router.Message LEAVE\n") + return err +} + +// Binary handles binary messages +func (r *MessageRouter) Binary(byMsg []byte) error { + klog.V(6).Infof("router.Binary ENTER\n") + + err := r.callback.Binary(byMsg) + if err != nil { + klog.V(1).Infof("callback.Binary failed. Err: %v\n", err) + } else { + klog.V(5).Infof("callback.Binary succeeded\n") + } + + klog.V(6).Infof("router.Binary LEAVE\n") + return err +} + +// UnhandledMessage logs and handles any unexpected message types +func (r *MessageRouter) UnhandledMessage(byMsg []byte) error { + klog.V(6).Infof("router.UnhandledMessage ENTER\n") + r.printDebugMessages(3, "UnhandledMessage", byMsg) + klog.V(1).Infof("Unknown Event was received\n") + klog.V(6).Infof("router.UnhandledMessage LEAVE\n") + return ErrInvalidMessageType +} + +// printDebugMessages formats and logs debugging messages +func (r *MessageRouter) printDebugMessages(level klog.Level, function string, byMsg []byte) { + prettyJSON, err := prettyjson.Format(byMsg) + if err != nil { + klog.V(1).Infof("prettyjson.Format failed. Err: %v\n", err) + return + } + klog.V(level).Infof("\n\n-----------------------------------------------\n") + klog.V(level).Infof("%s RAW:\n%s\n", function, prettyJSON) + klog.V(level).Infof("-----------------------------------------------\n\n\n") +} diff --git a/pkg/client/interfaces/v1/options.go b/pkg/client/interfaces/v1/options.go index efaee632..a727128d 100644 --- a/pkg/client/interfaces/v1/options.go +++ b/pkg/client/interfaces/v1/options.go @@ -52,7 +52,7 @@ func (o *ClientOptions) Parse() error { // prerecorded // currently nothing - // websocket + // speech-to-text websocket if v := os.Getenv("DEEPGRAM_WEBSOCKET_REDIRECT"); v != "" { klog.V(3).Infof("DEEPGRAM_WEBSOCKET_REDIRECT found") o.RedirectService = strings.EqualFold(strings.ToLower(v), "true") @@ -63,22 +63,39 @@ func (o *ClientOptions) Parse() error { } // these require inspecting messages, therefore you must update the InspectMessage() method - if v := os.Getenv("DEEPGRAM_WEBSOCKET_AUTO_FLUSH"); v != "" { - klog.V(3).Infof("DEEPGRAM_WEBSOCKET_AUTO_FLUSH found") + if v := os.Getenv("DEEPGRAM_WEBSOCKET_REPLY_AUTO_FLUSH"); v != "" { + klog.V(3).Infof("DEEPGRAM_WEBSOCKET_REPLY_AUTO_FLUSH found") i, err := strconv.ParseInt(v, 10, 64) if err == nil { - klog.V(3).Infof("DEEPGRAM_WEBSOCKET_AUTO_FLUSH set to %d", i) + klog.V(3).Infof("DEEPGRAM_WEBSOCKET_REPLY_AUTO_FLUSH set to %d", i) o.AutoFlushReplyDelta = i } } + // text-to-speech websocket + // these require inspecting messages, therefore you must update the InspectMessage() method + if v := os.Getenv("DEEPGRAM_WEBSOCKET_SPEAK_AUTO_FLUSH"); v != "" { + klog.V(3).Infof("DEEPGRAM_WEBSOCKET_SPEAK_AUTO_FLUSH found") + i, err := strconv.ParseInt(v, 10, 64) + if err == nil { + klog.V(3).Infof("DEEPGRAM_WEBSOCKET_SPEAK_AUTO_FLUSH set to %d", i) + o.AutoFlushSpeakDelta = i + } + } + return nil } -func (o *ClientOptions) InspectMessage() bool { +// InspectListenMessage returns true if the Listen message should be inspected +func (o *ClientOptions) InspectListenMessage() bool { return o.AutoFlushReplyDelta != 0 } +// InspectSpeakMessage returns true if the Speak message should be inspected +func (o *ClientOptions) InspectSpeakMessage() bool { + return o.AutoFlushSpeakDelta != 0 +} + func (o *PreRecordedTranscriptionOptions) Check() error { // checks // currently no op diff --git a/pkg/client/interfaces/v1/types-client.go b/pkg/client/interfaces/v1/types-client.go index 678ea79a..33029fe5 100644 --- a/pkg/client/interfaces/v1/types-client.go +++ b/pkg/client/interfaces/v1/types-client.go @@ -17,10 +17,11 @@ type ClientOptions struct { // prerecorded client options - // live client options - RedirectService bool // allows HTTP redirects to be followed - EnableKeepAlive bool // enables the keep alive feature + // speech-to-text client options + RedirectService bool // allows HTTP redirects to be followed + EnableKeepAlive bool // enables the keep alive feature + AutoFlushReplyDelta int64 // enables the auto flush feature based on the delta in milliseconds - // these require inspecting messages, therefore you must update the InspectMessage() method - AutoFlushReplyDelta int64 // enables the auto flush feature + // text-to-speech client options + AutoFlushSpeakDelta int64 // enables the auto flush feature based on the delta in milliseconds } diff --git a/pkg/client/listen/client.go b/pkg/client/listen/client.go index ccbc7c1d..679dbf23 100644 --- a/pkg/client/listen/client.go +++ b/pkg/client/listen/client.go @@ -83,9 +83,12 @@ NewWebSocketForDemo creates a new websocket connection with all default options Notes: - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY */ -func NewWebSocketForDemo(ctx context.Context, options *interfaces.LiveTranscriptionOptions) (*listenv1ws.Client, error) { +func NewWSForDemo(ctx context.Context, options *interfaces.LiveTranscriptionOptions) (*listenv1ws.Client, error) { return listenv1ws.New(ctx, "", &interfaces.ClientOptions{}, options, nil) } +func NewWebSocketForDemo(ctx context.Context, options *interfaces.LiveTranscriptionOptions) (*listenv1ws.Client, error) { + return NewWSForDemo(ctx, options) +} // NewLiveForDemo is an alias for NewWebSocketForDemo // @@ -104,9 +107,12 @@ Notes: - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY - The callback handler is set to the default handler which just prints all messages to the console */ -func NewWebSocketWithDefaults(ctx context.Context, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { +func NewWSWithDefaults(ctx context.Context, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { return listenv1ws.New(ctx, "", &interfaces.ClientOptions{}, tOptions, callback) } +func NewWebSocketWithDefaults(ctx context.Context, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { + return NewWSWithDefaults(ctx, tOptions, callback) +} // NewLiveWithDefaults is an alias for NewWebSocketWithDefaults // @@ -127,10 +133,13 @@ Notes: - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY - The callback handler is set to the default handler which just prints all messages to the console */ -func NewWebSocket(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { +func NewWS(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { ctx, ctxCancel := context.WithCancel(ctx) return listenv1ws.NewWithCancel(ctx, ctxCancel, apiKey, cOptions, tOptions, callback) } +func NewWebSocket(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { + return NewWS(ctx, apiKey, cOptions, tOptions, callback) +} // NewLive is an alias for NewWebSocket // @@ -152,9 +161,12 @@ Notes: - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY - The callback handler is set to the default handler which just prints all messages to the console */ -func NewWebSocketWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { +func NewWSWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { return listenv1ws.NewWithCancel(ctx, ctxCancel, apiKey, cOptions, tOptions, callback) } +func NewWebSocketWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { + return NewWSWithCancel(ctx, ctxCancel, apiKey, cOptions, tOptions, callback) +} // NewLiveWithCancel is an alias for NewWebSocketWithCancel // diff --git a/pkg/client/listen/v1/websocket/client.go b/pkg/client/listen/v1/websocket/client.go index aeed52b7..01001944 100644 --- a/pkg/client/listen/v1/websocket/client.go +++ b/pkg/client/listen/v1/websocket/client.go @@ -420,7 +420,7 @@ func (c *Client) listen() { } // inspect the message - if c.cOptions.InspectMessage() { + if c.cOptions.InspectListenMessage() { err := c.inspect(byMsg) if err != nil { klog.V(1).Infof("listen: inspect failed. Err: %v\n", err) diff --git a/pkg/client/speak/client.go b/pkg/client/speak/client.go index b87d5308..2020f726 100644 --- a/pkg/client/speak/client.go +++ b/pkg/client/speak/client.go @@ -8,10 +8,12 @@ This package provides the speak client implementation for the Deepgram API package speak import ( - // msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" + "context" + + msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces/v1" speakv1rest "github.com/deepgram/deepgram-go-sdk/pkg/client/speak/v1/rest" - // speakv1ws "github.com/deepgram/deepgram-go-sdk/pkg/client/speak/v1/websocket" + speakv1ws "github.com/deepgram/deepgram-go-sdk/pkg/client/speak/v1/websocket" ) /***********************************/ @@ -77,68 +79,68 @@ func NewREST(apiKey string, options *interfaces.ClientOptions) *speakv1rest.Clie return speakv1rest.New(apiKey, options) } -// /***********************************/ -// // WebSocket Client -// /***********************************/ -// const ( -// WebSocketPackageVersion = speakv1ws.PackageVersion -// ) - -// type WebSocketClient = speakv1ws.Client - -// /* -// NewWebSocketForDemo creates a new websocket connection with all default options - -// Notes: -// - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY -// */ -// func NewWebSocketForDemo(ctx context.Context, options *interfaces.SpeakOptions) (*speakv1ws.Client, error) { -// return speakv1ws.NewWebSocketForDemo(ctx, options) -// } - -// /* -// NewStreamWithDefaults creates a new websocket connection with all default options - -// Notes: -// - The callback handler is set to the default handler -// */ -// func NewWebSocketWithDefaults(ctx context.Context, options *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { -// return speakv1ws.NewWebSocketWithDefaults(ctx, options, callback) -// } - -// /* -// NewStream creates a new websocket connection with the specified options - -// Input parameters: -// - ctx: context.Context object -// - apiKey: string containing the Deepgram API key -// - cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. -// - sOptions: SpeakOptions which allows overriding things like model, etc. -// - callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages - -// Notes: -// - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY -// - The callback handler is set to the default handler -// */ -// func NewWebSocket(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { -// return speakv1ws.NewWebSocket(ctx, apiKey, cOptions, sOptions, callback) -// } - -// /* -// NewWebSocketWithCancel creates a new websocket connection but has facilities to BYOC (Bring Your Own Cancel) - -// Input parameters: -// - ctx: context.Context object -// - ctxCancel: allow passing in own cancel -// - apiKey: string containing the Deepgram API key -// - cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. -// - sOptions: SpeakOptions which allows overriding things like model, etc. -// - callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages - -// Notes: -// - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY -// - The callback handler is set to the default handler -// */ -// func NewWebSocketWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { -// return speakv1ws.NewWebSocketWithCancel(ctx, ctxCancel, apiKey, cOptions, sOptions, callback) -// } +/***********************************/ +// WebSocket Client +/***********************************/ +const ( + WebSocketPackageVersion = speakv1ws.PackageVersion +) + +type WebSocketClient = speakv1ws.Client + +/* +NewWSForDemo creates a new websocket connection with all default options + +Notes: + - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY +*/ +func NewWSForDemo(ctx context.Context, options *interfaces.SpeakOptions) (*speakv1ws.Client, error) { + return speakv1ws.NewForDemo(ctx, options) +} + +/* +NewWSWithDefaults creates a new websocket connection with all default options + +Notes: + - The callback handler is set to the default handler +*/ +func NewWSWithDefaults(ctx context.Context, options *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { + return speakv1ws.NewWithDefaults(ctx, options, callback) +} + +/* +NewWS creates a new websocket connection with the specified options + +Input parameters: +- ctx: context.Context object +- apiKey: string containing the Deepgram API key +- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. +- sOptions: SpeakOptions which allows overriding things like model, etc. +- callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages + +Notes: + - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY + - The callback handler is set to the default handler +*/ +func NewWS(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { + return speakv1ws.New(ctx, apiKey, cOptions, sOptions, callback) +} + +/* +NewWSWithCancel creates a new websocket connection but has facilities to BYOC (Bring Your Own Cancel) + +Input parameters: +- ctx: context.Context object +- ctxCancel: allow passing in own cancel +- apiKey: string containing the Deepgram API key +- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. +- sOptions: SpeakOptions which allows overriding things like model, etc. +- callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages + +Notes: + - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY + - The callback handler is set to the default handler +*/ +func NewWSWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*speakv1ws.Client, error) { + return speakv1ws.NewWithCancel(ctx, ctxCancel, apiKey, cOptions, sOptions, callback) +} diff --git a/pkg/client/speak/v1/websocket/client.go b/pkg/client/speak/v1/websocket/client.go new file mode 100644 index 00000000..96debdad --- /dev/null +++ b/pkg/client/speak/v1/websocket/client.go @@ -0,0 +1,899 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +// This package provides the speak/streaming client implementation for the Deepgram API +package websocketv1 + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net/http" + "regexp" + "strings" + "time" + + "github.com/dvonthenen/websocket" + klog "k8s.io/klog/v2" + + speak "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket" + msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" + version "github.com/deepgram/deepgram-go-sdk/pkg/api/version" + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces/v1" +) + +type controlMessage struct { + Type string `json:"type"` +} +type TextSource struct { + Type string `json:"type"` + Text string `json:"text"` +} + +/* +NewForDemo creates a new websocket connection with all default options + +Notes: + - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY +*/ +func NewForDemo(ctx context.Context, options *interfaces.SpeakOptions) (*Client, error) { + return New(ctx, "", &interfaces.ClientOptions{}, options, nil) +} + +/* +NewWithDefaults creates a new websocket connection with all default options + +Notes: + - The callback handler is set to the default handler +*/ +func NewWithDefaults(ctx context.Context, options *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*Client, error) { + return New(ctx, "", &interfaces.ClientOptions{}, options, callback) +} + +/* +New creates a new websocket connection with the specified options + +Input parameters: +- ctx: context.Context object +- apiKey: string containing the Deepgram API key +- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. +- sOptions: SpeakOptions which allows overriding things like model, etc. +- callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages + +Notes: + - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY + - The callback handler is set to the default handler +*/ +func New(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*Client, error) { + ctx, ctxCancel := context.WithCancel(ctx) + return NewWithCancel(ctx, ctxCancel, apiKey, cOptions, sOptions, callback) +} + +/* +NewWithCancel creates a new websocket connection with the specified options + +Input parameters: +- ctx: context.Context object +- ctxCancel: allow passing in own cancel +- apiKey: string containing the Deepgram API key +- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. +- sOptions: SpeakOptions which allows overriding things like model, etc. +- callback: SpeakMessageCallback is a callback which lets you perform actions based on platform messages + +Notes: + - If apiKey is an empty string, the Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY + - The callback handler is set to the default handler +*/ +func NewWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, sOptions *interfaces.SpeakOptions, callback msginterfaces.SpeakMessageCallback) (*Client, error) { + klog.V(6).Infof("speak.New() ENTER\n") + + if apiKey != "" { + cOptions.APIKey = apiKey + } + err := cOptions.Parse() + if err != nil { + klog.V(1).Infof("ClientOptions.Parse() failed. Err: %v\n", err) + return nil, err + } + err = sOptions.Check() + if err != nil { + klog.V(1).Infof("SpeakOptions.Check() failed. Err: %v\n", err) + return nil, err + } + + if callback == nil { + klog.V(2).Infof("Using DefaultCallbackHandler.\n") + callback = speak.NewDefaultCallbackHandler() + } + + // init + conn := Client{ + cOptions: cOptions, + sOptions: sOptions, + sendBuf: make(chan []byte, 1), + callback: callback, + router: speak.NewStream(callback), + ctx: ctx, + ctxCancel: ctxCancel, + retry: true, + } + + klog.V(3).Infof("NewDeepGramWSClient Succeeded\n") + klog.V(6).Infof("speak.New() LEAVE\n") + + return &conn, nil +} + +// Connect performs a websocket connection with "DefaultConnectRetry" number of retries. +func (c *Client) Connect() bool { + // set the retry count + if c.retryCnt == 0 { + c.retryCnt = DefaultConnectRetry + } + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), true) != nil +} + +// ConnectWithCancel performs a websocket connection with specified number of retries and providing a +// cancel function to stop the connection +func (c *Client) ConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) bool { + return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt, true) != nil +} + +// AttemptReconnect performs a reconnect after failing retries +func (c *Client) AttemptReconnect(ctx context.Context, retries int64) bool { + c.retry = true + c.ctx, c.ctxCancel = context.WithCancel(ctx) + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries), true) != nil +} + +// AttemptReconnect performs a reconnect after failing retries and providing a cancel function +func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) bool { + c.retry = true + return c.internalConnectWithCancel(ctx, ctxCancel, int(retries), true) != nil +} + +func (c *Client) internalConnect() *websocket.Conn { + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), false) +} + +//nolint:funlen // this is a complex function. keep as is +func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int, lock bool) *websocket.Conn { + klog.V(7).Infof("speak.internalConnectWithCancel() ENTER\n") + + // set the context + c.ctx = ctx + c.ctxCancel = ctxCancel + c.retryCnt = int64(retryCnt) + + // we explicitly stopped and should not attempt to reconnect + if !c.retry { + klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.") + klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n") + return nil + } + + // lock conn access + if lock { + klog.V(3).Infof("Locking connection mutex\n") + c.muConn.Lock() + defer c.muConn.Unlock() + } + + // if the connection is good, return it otherwise, attempt reconnect + if c.wsconn != nil { + select { + case <-c.ctx.Done(): + klog.V(1).Infof("Connection is not valid\n") + klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n") + return nil + default: + klog.V(7).Infof("Connection is good. Return object.") + klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n") + return c.wsconn + } + } else { + select { + case <-c.ctx.Done(): + klog.V(1).Infof("Context is not valid. Has been canceled.\n") + klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n") + return nil + default: + klog.V(3).Infof("Context is still valid. Retry...\n") + } + } + + dialer := websocket.Dialer{ + HandshakeTimeout: 45 * time.Second, + /* #nosec G402 */ + TLSClientConfig: &tls.Config{InsecureSkipVerify: c.cOptions.SkipServerAuth}, + RedirectService: c.cOptions.RedirectService, + SkipServerAuth: c.cOptions.SkipServerAuth, + } + + // set websocket headers + myHeader := http.Header{} + + // restore application options to HTTP header + if headers, ok := c.ctx.Value(interfaces.HeadersContext{}).(http.Header); ok { + for k, v := range headers { + for _, v := range v { + klog.V(3).Infof("internalConnectWithCancel RESTORE Header: %s = %s\n", k, v) + myHeader.Add(k, v) + } + } + } + + // sets the API key + myHeader.Set("Host", c.cOptions.Host) + myHeader.Set("Authorization", "token "+c.cOptions.APIKey) + myHeader.Set("User-Agent", interfaces.DgAgent) + + // attempt to establish connection + i := int64(0) + for { + if i >= c.retryCnt { + klog.V(3).Infof("Connect timeout... exiting!\n") + c.retry = false + break + } + + // delay on subsequent calls + if i > 0 { + klog.V(2).Infof("Sleep for retry #%d...\n", i) + time.Sleep(time.Second * time.Duration(defaultDelayBetweenRetry)) + } + + i++ + + // create new connection + url, err := version.GetSpeakStreamAPI(c.ctx, c.cOptions.Host, c.cOptions.APIVersion, c.cOptions.Path, c.sOptions) + if err != nil { + klog.V(1).Infof("version.GetSpeakAPI failed. Err: %v\n", err) + klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n") + return nil // no point in retrying because this is going to fail on every retry + } + klog.V(5).Infof("Connecting to %s\n", url) + + // perform the websocket connection + ws, res, err := dialer.DialContext(c.ctx, url, myHeader) + if res != nil { + klog.V(3).Infof("HTTP Response: %s\n", res.Status) + res.Body.Close() + } + if err != nil { + klog.V(1).Infof("Cannot connect to websocket: %s\n", c.cOptions.Host) + klog.V(1).Infof("Dialer failed. Err: %v\n", err) + continue + } + + // set the object to allow threads to function + c.wsconn = ws + c.retry = true + + // kick off threads to listen for messages + go c.listen() + + if c.cOptions.AutoFlushSpeakDelta != 0 { + go c.flush() + } + + // fire off open connection + err = c.router.OpenHelper(&msginterfaces.OpenResponse{ + Type: msginterfaces.TypeOpenResponse, + }) + if err != nil { + klog.V(1).Infof("router.OpenHelper failed. Err: %v\n", err) + } + + klog.V(3).Infof("WebSocket Connection Successful!") + klog.V(7).Infof("speak.internalConnectWithCancel() LEAVE\n") + + return c.wsconn + } + + // if we get here, we failed to connect + klog.V(1).Infof("Failed to connect to websocket: %s\n", c.cOptions.Host) + klog.V(7).Infof("speak.ConnectWithRetry() LEAVE\n") + + return nil +} + +//nolint:funlen,gocyclo // this is a complex function. keep as is +func (c *Client) listen() { + klog.V(6).Infof("speak.listen() ENTER\n") + + defer func() { + if r := recover(); r != nil { + klog.V(1).Infof("Panic triggered\n") + + // send error on callback + err := ErrFatalPanicRecovered + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr) + } + + // fatal close + c.closeWs(true) + + klog.V(6).Infof("speak.flush() LEAVE\n") + return + } + }() + + for { + // doing a read, need to lock + c.muConn.Lock() + + // get the connection + ws := c.internalConnect() + if ws == nil { + // release + c.muConn.Unlock() + + klog.V(3).Infof("listen: Connection is not valid\n") + klog.V(6).Infof("speak.listen() LEAVE\n") + return + } + + // release the lock + c.muConn.Unlock() + + // msgType can be binary or text + msgType, byMsg, err := ws.ReadMessage() + + if err != nil { + errStr := err.Error() + switch { + case strings.Contains(errStr, SuccessfulSocketErr): + klog.V(3).Infof("Graceful websocket close\n") + + // graceful close + c.closeWs(false) + + klog.V(6).Infof("speak.listen() LEAVE\n") + return + case strings.Contains(errStr, UseOfClosedSocket): + klog.V(3).Infof("Probable graceful websocket close: %v\n", err) + + // fatal close + c.closeWs(false) + + klog.V(6).Infof("speak.listen() LEAVE\n") + return + case strings.Contains(errStr, FatalReadSocketErr): + klog.V(1).Infof("Fatal socket error: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("speak.listen(): Fatal socket error. Err: %v\n", sendErr) + } + + // fatal close + c.closeWs(true) + + klog.V(6).Infof("speak.listen() LEAVE\n") + return + case strings.Contains(errStr, "Deepgram"): + klog.V(1).Infof("speak.listen(): Deepgram error. Err: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("speak.listen(): Deepgram ErrorMsg. Err: %v\n", sendErr) + } + + // close the connection + c.closeWs(false) + + klog.V(6).Infof("speak.listen() LEAVE\n") + return + case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry: + klog.V(3).Infof("Client object EOF\n") + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("speak.listen(): EOF error. Err: %v\n", sendErr) + } + + // close the connection + c.closeWs(true) + + klog.V(6).Infof("speak.listen() LEAVE\n") + return + default: + klog.V(1).Infof("speak.listen(): Cannot read websocket message. Err: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("speak.listen(): EOF error. Err: %v\n", sendErr) + } + + // close the connection + c.closeWs(true) + + klog.V(6).Infof("speak.listen() LEAVE\n") + return + } + } + + if len(byMsg) == 0 { + klog.V(7).Infof("listen(): message empty") + continue + } + + switch msgType { + case websocket.TextMessage: + // inspect the message + if c.cOptions.InspectSpeakMessage() { + err := c.inspect(byMsg) + if err != nil { + klog.V(1).Infof("speak: inspect failed. Err: %v\n", err) + } + } + + // route the message + err := c.router.Message(byMsg) + if err != nil { + klog.V(1).Infof("speak.listen(): router.Message failed. Err: %v\n", err) + } + case websocket.BinaryMessage: + // audio data! + err := c.router.Binary(byMsg) + if err != nil { + klog.V(1).Infof("speak.listen(): router.Message failed. Err: %v\n", err) + } + default: + klog.V(7).Infof("speak.listen(): msg recv: type %d, len: %d\n", msgType, len(byMsg)) + } + } +} + +// SpeakWithText writes text to the websocket server to obtain corresponding audio +// +// This function will automatically wrap the text in the appropriate JSON structure +// and send it to the server +// +// Args: +// +// text: string containing the text to be spoken +// +// Return: +// +// error: if successful, returns nil otherwise an error object +func (c *Client) SpeakWithText(text string) error { + klog.V(6).Infof("speak.SpeakText() ENTER\n") + klog.V(4).Infof("text: %s\n", text) + + err := c.WriteJSON(TextSource{ + Type: MessageTypeSpeak, + Text: text, + }) + if err == nil { + klog.V(4).Infof("SpeakText Succeeded\n") + } else { + klog.V(1).Infof("SpeakText failed. Err: %v\n", err) + } + + klog.V(6).Infof("speak.SpeakText() LEAVE\n") + + return err +} + +// Speak is an alias function for SpeakWithText +func (c *Client) Speak(text string) error { + return c.SpeakWithText(text) +} + +// // SpeakWithStream writes binary data to the websocket server +// // NOTE: This is unimplemented on the server side +// func (c *Client) SpeakWithStream(byData []byte) error { +// klog.V(6).Infof("speak.SpeakText() ENTER\n") + +// err := c.WriteBinary(byData) +// if err == nil { +// klog.V(4).Infof("SpeakText Succeeded\n") +// } else { +// klog.V(1).Infof("SpeakText failed. Err: %v\n", err) +// } + +// klog.V(6).Infof("speak.SpeakText() LEAVE\n") + +// return err +// } + +// // WriteBinary writes binary data to the websocket server +// // NOTE: This is unimplemented on the server side +// func (c *Client) WriteBinary(byData []byte) error { +// klog.V(6).Infof("speak.WriteBinary() ENTER\n") + +// // doing a write, need to lock +// c.muConn.Lock() +// defer c.muConn.Unlock() + +// // get the connection +// ws := c.internalConnect() +// if ws == nil { +// err := ErrInvalidConnection +// klog.V(1).Infof("c.Connect() is nil. Err: %v\n", err) +// klog.V(6).Infof("speak.WriteBinary() LEAVE\n") + +// return err +// } + +// if err := ws.WriteMessage( +// websocket.BinaryMessage, +// byData, +// ); err != nil { +// klog.V(1).Infof("WriteBinary WriteMessage failed. Err: %v\n", err) +// klog.V(6).Infof("speak.WriteBinary() LEAVE\n") +// return err +// } + +// klog.V(6).Infof("WriteBinary Successful\n") +// klog.V(7).Infof("payload: %x\n", byData) +// klog.V(6).Infof("speak.WriteBinary() LEAVE\n") + +// return nil +// } + +// WriteJSON writes a JSON control payload to the websocket server. In using this function, +// use must provide the payload in JSON-format. These are control messages are for +// managing the text-to-speech session on the Deepgram server. +// +// Args: +// +// payload: interface{} containing the JSON payload +// +// Return: +// +// error: if successful, returns nil otherwise an error object +func (c *Client) WriteJSON(payload interface{}) error { + klog.V(6).Infof("speak.WriteJSON() ENTER\n") + + byData, err := json.Marshal(payload) + if err != nil { + klog.V(1).Infof("WriteJSON: Error marshaling JSON. Data: %v, Err: %v\n", payload, err) + klog.V(6).Infof("speak.WriteJSON() LEAVE\n") + return err + } + + if c.cOptions.AutoFlushSpeakDelta > 0 { + var mt MessageType + if err := json.Unmarshal(byData, &mt); err == nil { + switch mt.Type { + case MessageTypeSpeak: + // last datagram received + c.muFinal.Lock() + now := time.Now() + klog.V(6).Infof("Speak Text Sent at: %s\n", now.String()) + c.lastDatagram = &now + c.muFinal.Unlock() + case MessageTypeFlush: + // increment the flush count + c.muFinal.Lock() + c.flushCount++ + c.lastDatagram = nil + klog.V(5).Infof("Increment Flush Count: %d\n", c.flushCount) + c.muFinal.Unlock() + } + } + } + + // doing a write, need to lock + c.muConn.Lock() + defer c.muConn.Unlock() + + // doing a write, need to lock + ws := c.internalConnect() + if ws == nil { + err := ErrInvalidConnection + klog.V(1).Infof("c.internalConnect() is nil. Err: %v\n", err) + klog.V(6).Infof("speak.WriteJSON() LEAVE\n") + + return err + } + if err := ws.WriteMessage( + websocket.TextMessage, + byData, + ); err != nil { + klog.V(1).Infof("WriteJSON WriteMessage failed. Err: %v\n", err) + klog.V(6).Infof("speak.WriteJSON() LEAVE\n") + return err + } + + klog.V(4).Infof("WriteJSON succeeded.\n") + klog.V(7).Infof("payload: %s\n", string(byData)) + klog.V(6).Infof("speak.WriteJSON() LEAVE\n") + + return nil +} + +// Flush will instruct the server to flush the current text buffer +func (c *Client) Flush() error { + klog.V(6).Infof("speak.Flush() ENTER\n") + + err := c.WriteJSON(controlMessage{Type: MessageTypeFlush}) + if err != nil { + klog.V(1).Infof("Flush failed. Err: %v\n", err) + klog.V(6).Infof("speak.Flush() LEAVE\n") + + return err + } + + klog.V(4).Infof("Flush Succeeded\n") + klog.V(6).Infof("speak.Flush() LEAVE\n") + + return err +} + +// Reset will instruct the server to reset the current buffer +func (c *Client) Reset() error { + klog.V(6).Infof("speak.Reset() ENTER\n") + + err := c.WriteJSON(controlMessage{Type: MessageTypeReset}) + if err != nil { + klog.V(1).Infof("Reset failed. Err: %v\n", err) + klog.V(6).Infof("speak.Reset() LEAVE\n") + + return err + } + + klog.V(4).Infof("Reset Succeeded\n") + klog.V(6).Infof("speak.Reset() LEAVE\n") + return nil +} + +// closeStream sends an application level message to Deepgram +func (c *Client) closeStream(lock bool) error { + klog.V(6).Infof("speak.closeStream() ENTER\n") + + // doing a write, need to lock + if lock { + c.muConn.Lock() + defer c.muConn.Unlock() + } + + err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Close\" }")) + if err != nil { + klog.V(1).Infof("WriteMessage failed. Err: %v\n", err) + klog.V(6).Infof("speak.closeStream() LEAVE\n") + + return err + } + + klog.V(4).Infof("closeStream Succeeded\n") + klog.V(6).Infof("speak.closeStream() LEAVE\n") + + return err +} + +// normalClosure sends a normal closure message to the server +func (c *Client) normalClosure(lock bool) error { + klog.V(6).Infof("speak.normalClosure() ENTER\n") + + // doing a write, need to lock + if lock { + c.muConn.Lock() + defer c.muConn.Unlock() + } + + ws := c.internalConnect() + if ws == nil { + err := ErrInvalidConnection + klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err) + klog.V(6).Infof("speak.normalClosure() LEAVE\n") + + return err + } + + err := c.wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + switch err { + case websocket.ErrCloseSent: + klog.V(3).Infof("ErrCloseSent was sent. Err: %v\n", err) + case nil: + klog.V(4).Infof("normalClosure Succeeded\n") + default: + klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", err) + } + + klog.V(6).Infof("speak.normalClosure() LEAVE\n") + + return err +} + +// Stop will send close message and shutdown websocket connection +func (c *Client) Stop() { + klog.V(3).Infof("Stopping...\n") + c.retry = false + + // exit gracefully + c.ctxCancel() + c.closeWs(false) +} + +// closeWs closes the websocket connection +func (c *Client) closeWs(fatal bool) { + klog.V(6).Infof("speak.closeWs() closing channels...\n") + + // doing a write, need to lock + c.muConn.Lock() + defer c.muConn.Unlock() + + if c.wsconn != nil && !fatal { + // calling this even though it's apart of the TTS WS protocol, causes a websocket: close 1005 (no status) + // // deepgram requires a close message to be sent + // _ = c.closeStream(false) + // time.Sleep(TerminationSleep) // allow time for server to register closure + + // websocket protocol message + _ = c.normalClosure(false) + time.Sleep(TerminationSleep) // allow time for server to register closure + } + + if fatal || c.wsconn != nil { + // fire off close connection + err := c.router.CloseHelper(&msginterfaces.CloseResponse{ + Type: msginterfaces.TypeCloseResponse, + }) + if err != nil { + klog.V(1).Infof("router.CloseHelper failed. Err: %v\n", err) + } + } + + // close the connection + if c.wsconn != nil { + c.wsconn.Close() + c.wsconn = nil + } + + klog.V(4).Infof("speak.closeWs() Succeeded\n") + klog.V(6).Infof("speak.closeWs() LEAVE\n") +} + +// flush thread +func (c *Client) flush() { + klog.V(6).Infof("speak.flush() ENTER\n") + + defer func() { + if r := recover(); r != nil { + klog.V(1).Infof("Panic triggered\n") + + // send error on callback + err := ErrFatalPanicRecovered + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("speak: Fatal socket error. Err: %v\n", sendErr) + } + + // fatal close + c.closeWs(true) + + klog.V(6).Infof("speak.flush() LEAVE\n") + return + } + }() + + ticker := time.NewTicker(flushPeriod) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + klog.V(3).Infof("speak.flush() Exiting\n") + + // exit gracefully + c.closeWs(false) + + klog.V(6).Infof("speak.flush() LEAVE\n") + return + case <-ticker.C: + // doing a read, need to lock. + c.muFinal.Lock() + + // have we received anything? no, then skip + if c.lastDatagram == nil { + klog.V(7).Infof("No datagram received. Skipping...\n") + c.muFinal.Unlock() + continue + } + + // we have received something, but is it recent? + trigger := c.lastDatagram.Add(time.Millisecond * time.Duration(c.cOptions.AutoFlushSpeakDelta)) + now := time.Now() + klog.V(6).Infof("Time (Last): %s\n", trigger.String()) + klog.V(6).Infof("Time (Now ): %s\n", now.String()) + bNeedFlush := trigger.Before(now) + if bNeedFlush { + c.lastDatagram = nil + } + + // release + c.muFinal.Unlock() + + if bNeedFlush { + klog.V(5).Infof("Sending Flush message...\n") + err := c.Flush() + if err == nil { + klog.V(5).Infof("Flush sent!") + } else { + klog.V(1).Infof("Failed to send Flush. Err: %v\n", err) + } + } + } + } +} + +// sendError sends an error message to the callback handler +func (c *Client) sendError(err error) error { + response := c.errorToResponse(err) + sendErr := c.router.ErrorHelper(response) + if err != nil { + klog.V(1).Infof("speak.listen(): router.Error failed. Err: %v\n", sendErr) + } + + return err +} + +// errorToResponse converts an error into a Deepgram error response +func (c *Client) errorToResponse(err error) *msginterfaces.ErrorResponse { + r := regexp.MustCompile(`websocket: ([a-z]+) (\d+) .+: (.+)`) + + var errorCode string + var errorNum string + var errorDesc string + + matches := r.FindStringSubmatch(err.Error()) + if len(matches) > 3 { + errorCode = matches[1] + errorNum = matches[2] + errorDesc = matches[3] + } else { + errorCode = UnknownDeepgramErr + errorNum = UnknownDeepgramErr + errorDesc = err.Error() + } + + response := &msginterfaces.ErrorResponse{ + Type: msginterfaces.TypeErrorResponse, + ErrMsg: strings.TrimSpace(fmt.Sprintf("%s %s", errorCode, errorNum)), + Description: strings.TrimSpace(errorDesc), + Variant: errorNum, + } + return response +} + +// inspect will check the message and determine the type to +// see if we should do actionable based on those types of messages +func (c *Client) inspect(byMsg []byte) error { + klog.V(7).Infof("speak.inspect() ENTER\n") + + var mt MessageType + if err := json.Unmarshal(byMsg, &mt); err != nil { + klog.V(1).Infof("json.Unmarshal(MessageType) failed. Err: %v\n", err) + klog.V(7).Infof("speak.inspect() LEAVE\n") + return err + } + + switch mt.Type { + case msginterfaces.TypeFlushedResponse: + klog.V(7).Infof("TypeFlushedResponse\n") + + // decrement the flush count + c.muFinal.Lock() + c.flushCount-- + klog.V(5).Infof("Flush Count: %d\n", c.flushCount) + c.muFinal.Unlock() + default: + klog.V(5).Infof("MessageType: %s\n", mt.Type) + } + + klog.V(7).Info("inspect() succeeded\n") + klog.V(7).Infof("speak.inspect() LEAVE\n") + return nil +} diff --git a/pkg/client/speak/v1/websocket/constants.go b/pkg/client/speak/v1/websocket/constants.go new file mode 100644 index 00000000..99945fce --- /dev/null +++ b/pkg/client/speak/v1/websocket/constants.go @@ -0,0 +1,64 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "errors" + "time" +) + +const ( + PackageVersion string = "v1.0" +) + +// external constants +const ( + DefaultConnectRetry int64 = 3 + + ChunkSize = 1024 * 2 + TerminationSleep = 100 * time.Millisecond + + // socket errors + FatalReadSocketErr string = "read: can't assign requested address" + FatalWriteSocketErr string = "write: broken pipe" + UseOfClosedSocket string = "use of closed network connection" + UnknownDeepgramErr string = "unknown deepgram error" + + // socket successful close error + SuccessfulSocketErr string = "close 1000" +) + +const ( + // MessageTypeFlush flushes the audio from the server + MessageTypeSpeak string = "Speak" + + // MessageTypeFlush flushes the audio from the server + MessageTypeFlush string = "Flush" + + // MessageTypeReset resets the text buffer + MessageTypeReset string = "Reset" + + // MessageTypeClose closes the stream + MessageTypeClose string = "Close" +) + +// errors +var ( + // ErrInvalidInput required input was not found + ErrInvalidInput = errors.New("required input was not found") + + // ErrInvalidConnection connection is not valid + ErrInvalidConnection = errors.New("connection is not valid") + + // ErrFatalPanicRecovered fatal panic recovered + ErrFatalPanicRecovered = errors.New("fatal panic - attempt to recover") +) + +// internal constants for retry, waits, back-off, etc. +const ( + flushPeriod = 500 * time.Millisecond + + defaultDelayBetweenRetry int64 = 2 +) diff --git a/pkg/client/speak/v1/websocket/types.go b/pkg/client/speak/v1/websocket/types.go new file mode 100644 index 00000000..60ab1921 --- /dev/null +++ b/pkg/client/speak/v1/websocket/types.go @@ -0,0 +1,45 @@ +// Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "context" + "sync" + "time" + + "github.com/dvonthenen/websocket" + + speak "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket" + msginterface "github.com/deepgram/deepgram-go-sdk/pkg/api/speak/v1/websocket/interfaces" + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces/v1" +) + +// MessageType helper struct to determine the message type +type MessageType struct { + Type string `json:"type"` +} + +// Client is a struct representing the websocket client connection +type Client struct { + cOptions *interfaces.ClientOptions + sOptions *interfaces.SpeakOptions + + sendBuf chan []byte + ctx context.Context + ctxCancel context.CancelFunc + + muConn sync.RWMutex + wsconn *websocket.Conn + retry bool + retryCnt int64 + + callback msginterface.SpeakMessageCallback + router *speak.MessageRouter + + // internal constants for retry, waits, back-off, etc. + lastDatagram *time.Time + muFinal sync.RWMutex + flushCount int64 +}