diff --git a/cmd/legacy_main.go b/cmd/legacy_main.go index 7a228c5df8..35d1b35890 100644 --- a/cmd/legacy_main.go +++ b/cmd/legacy_main.go @@ -107,6 +107,7 @@ func getConfigForUserAgent(mountConfig *cfg.Config) string { return fmt.Sprintf("%s:%s:%s", isFileCacheEnabled, isFileCacheForRangeReadEnabled, isParallelDownloadsEnabled) } func createStorageHandle(newConfig *cfg.Config, userAgent string) (storageHandle storage.StorageHandle, err error) { + readStallRetryConfig := newConfig.GcsRetries.ReadStall storageClientConfig := storageutil.StorageClientConfig{ ClientProtocol: newConfig.GcsConnection.ClientProtocol, MaxConnsPerHost: int(newConfig.GcsConnection.MaxConnsPerHost), @@ -124,6 +125,12 @@ func createStorageHandle(newConfig *cfg.Config, userAgent string) (storageHandle ExperimentalEnableJsonRead: newConfig.GcsConnection.ExperimentalEnableJsonRead, GrpcConnPoolSize: int(newConfig.GcsConnection.GrpcConnPoolSize), EnableHNS: newConfig.EnableHns, + EnableReadStallRetry: readStallRetryConfig.Enable, + InitialReqTimeout: readStallRetryConfig.InitialReqTimeout, + MaxReqTimeout: readStallRetryConfig.MaxReqTimeout, + MinReqTimeout: readStallRetryConfig.MinReqTimeout, + ReqIncreaseRate: readStallRetryConfig.ReqIncreaseRate, + ReqTargetPercentile: readStallRetryConfig.ReqTargetPercentile, } logger.Infof("UserAgent = %s\n", storageClientConfig.UserAgent) storageHandle, err = storage.NewStorageHandle(context.Background(), storageClientConfig) diff --git a/internal/storage/storage_handle.go b/internal/storage/storage_handle.go index de49eec2fe..dd8a2f5b18 100644 --- a/internal/storage/storage_handle.go +++ b/internal/storage/storage_handle.go @@ -18,9 +18,11 @@ import ( "fmt" "net/http" "os" + "strconv" "cloud.google.com/go/storage" control "cloud.google.com/go/storage/control/apiv2" + "cloud.google.com/go/storage/experimental" "github.com/googleapis/gax-go/v2" "github.com/googlecloudplatform/gcsfuse/v2/cfg" "github.com/googlecloudplatform/gcsfuse/v2/internal/logger" @@ -35,6 +37,12 @@ import ( _ "google.golang.org/grpc/xds/googledirectpath" ) +const ( + // Used to modify the hidden options in go-sdk for read stall retry. + dynamicReadReqIncreaseRateEnv = "DYNAMIC_READ_REQ_INCREASE_RATE" + dynamicReadReqInitialTimeoutEnv = "DYNAMIC_READ_REQ_INITIAL_TIMEOUT" +) + type StorageHandle interface { // In case of non-empty billingProject, this project is set as user-project for // all subsequent calls on the bucket. Calls with user-project will be billed @@ -144,6 +152,24 @@ func createHTTPClientHandle(ctx context.Context, clientConfig *storageutil.Stora clientOpts = append(clientOpts, option.WithEndpoint(clientConfig.CustomEndpoint)) } + if clientConfig.EnableReadStallRetry { + // Hidden way to modify the increase rate for dynamic delay algorithm in go-sdk. + // Ref: http://shortn/_417eTbNbKK + // Temporarily we kept an option to change the increase-rate, will be removed + // once we get a good default. + os.Setenv(dynamicReadReqIncreaseRateEnv, strconv.FormatFloat(clientConfig.ReqIncreaseRate, 'f', -1, 64)) + + // Hidden way to modify the initial-timeout of the dynamic delay algorithm in go-sdk. + // Ref: http://shortn/_xArUKvvGQZ + // Temporarily we kept an option to change the initial-timeout, will be removed + // once we get a good default. + os.Setenv(dynamicReadReqInitialTimeoutEnv, clientConfig.InitialReqTimeout.String()) + clientOpts = append(clientOpts, experimental.WithReadStallTimeout(&experimental.ReadStallTimeoutConfig{ + Min: clientConfig.MinReqTimeout, + TargetPercentile: clientConfig.ReqTargetPercentile, + })) + } + return storage.NewClient(ctx, clientOpts...) } diff --git a/internal/storage/storage_handle_test.go b/internal/storage/storage_handle_test.go index 2e31face6e..7e9bfde6e5 100644 --- a/internal/storage/storage_handle_test.go +++ b/internal/storage/storage_handle_test.go @@ -19,6 +19,7 @@ import ( "fmt" "net/url" "testing" + "time" "github.com/googlecloudplatform/gcsfuse/v2/cfg" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" @@ -253,6 +254,196 @@ func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_WithGRPCClientPro assert.Contains(testSuite.T(), err.Error(), fmt.Sprintf("client-protocol requested is not HTTP1 or HTTP2: %s", cfg.GRPC)) } +func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_WithReadStallRetryEnabled() { + sc := storageutil.GetDefaultStorageClientConfig() + sc.EnableReadStallRetry = true + + storageClient, err := createHTTPClientHandle(context.Background(), &sc) + + assert.NotNil(testSuite.T(), err) + assert.Nil(testSuite.T(), storageClient) +} + +func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_WithReadStallRetry() { + testCases := []struct { + name string + enableReadStallRetry bool + }{ + { + name: "ReadStallRetryEnabled", + enableReadStallRetry: true, + }, + { + name: "ReadStallRetryDisabled", + enableReadStallRetry: false, + }, + } + + for _, tc := range testCases { + testSuite.Run(tc.name, func() { + sc := storageutil.GetDefaultStorageClientConfig() + sc.EnableReadStallRetry = tc.enableReadStallRetry + + storageClient, err := createHTTPClientHandle(context.Background(), &sc) + + assert.Nil(testSuite.T(), err) + assert.NotNil(testSuite.T(), storageClient) + }) + } +} + +func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_InitialReqTimeout() { + testCases := []struct { + name string + initialReqTimeout time.Duration + }{ + { + name: "ShortTimeout", + initialReqTimeout: 1 * time.Millisecond, + }, + { + name: "LongTimeout", + initialReqTimeout: 10 * time.Second, + }, + } + + for _, tc := range testCases { + testSuite.Run(tc.name, func() { + sc := storageutil.GetDefaultStorageClientConfig() + sc.EnableReadStallRetry = true + sc.InitialReqTimeout = tc.initialReqTimeout + + storageClient, err := createHTTPClientHandle(context.Background(), &sc) + + assert.Nil(testSuite.T(), err) + assert.NotNil(testSuite.T(), storageClient) + }) + } +} + +func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_MinReqTimeout() { + testCases := []struct { + name string + initialReqTimeout time.Duration + }{ + { + name: "ShortTimeout", + initialReqTimeout: 1 * time.Millisecond, + }, + { + name: "LongTimeout", + initialReqTimeout: 10 * time.Second, + }, + } + + for _, tc := range testCases { + testSuite.Run(tc.name, func() { + sc := storageutil.GetDefaultStorageClientConfig() + sc.EnableReadStallRetry = true + sc.MinReqTimeout = tc.initialReqTimeout + + storageClient, err := createHTTPClientHandle(context.Background(), &sc) + + assert.Nil(testSuite.T(), err) + assert.NotNil(testSuite.T(), storageClient) + }) + } +} + +func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_ReqIncreaseRate() { + testCases := []struct { + name string + reqIncreaseRate float64 + expectErr bool + }{ + { + name: "NegativeRate", + reqIncreaseRate: -0.5, + expectErr: true, + }, + { + name: "ZeroRate", + reqIncreaseRate: 0.0, + expectErr: true, + }, + { + name: "PositiveRate", + reqIncreaseRate: 1.5, + expectErr: false, + }, + } + + for _, tc := range testCases { + testSuite.Run(tc.name, func() { + sc := storageutil.GetDefaultStorageClientConfig() + sc.EnableReadStallRetry = true + sc.ReqIncreaseRate = tc.reqIncreaseRate + + storageClient, err := createHTTPClientHandle(context.Background(), &sc) + + if tc.expectErr { + assert.NotNil(testSuite.T(), err) + assert.Nil(testSuite.T(), storageClient) + } else { + assert.Nil(testSuite.T(), err) + assert.NotNil(testSuite.T(), storageClient) + } + }) + } +} + +func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_TargetPercentile() { + testCases := []struct { + name string + reqTargetPercentile float64 + expectErr bool + }{ + { + name: "LowPercentile", + reqTargetPercentile: 0.25, // 25th percentile + expectErr: false, + }, + { + name: "MidPercentile", + reqTargetPercentile: 0.50, // 50th percentile + expectErr: false, + }, + { + name: "HighPercentile", + reqTargetPercentile: 0.90, // 90th percentile + expectErr: false, + }, + { + name: "InvalidPercentile-Low", + reqTargetPercentile: -0.5, // Invalid percentile + expectErr: true, + }, + { + name: "InvalidPercentile-High", + reqTargetPercentile: 1.5, // Invalid percentile + expectErr: true, + }, + } + + for _, tc := range testCases { + testSuite.Run(tc.name, func() { + sc := storageutil.GetDefaultStorageClientConfig() + sc.EnableReadStallRetry = true + sc.ReqTargetPercentile = tc.reqTargetPercentile + + storageClient, err := createHTTPClientHandle(context.Background(), &sc) + + if tc.expectErr { + assert.NotNil(testSuite.T(), err) + assert.Nil(testSuite.T(), storageClient) + } else { + assert.Nil(testSuite.T(), err) + assert.NotNil(testSuite.T(), storageClient) + } + }) + } +} + func (testSuite *StorageHandleTest) TestNewStorageHandleWithGRPCClientWithCustomEndpointNilAndAuthEnabled() { sc := storageutil.GetDefaultStorageClientConfig() sc.CustomEndpoint = "" diff --git a/internal/storage/storageutil/client.go b/internal/storage/storageutil/client.go index 3428e620af..5331d59f57 100644 --- a/internal/storage/storageutil/client.go +++ b/internal/storage/storageutil/client.go @@ -55,6 +55,14 @@ type StorageClientConfig struct { // Enabling new API flow for HNS bucket. EnableHNS bool + + // Parameters to define retry behavior for stalled read. + EnableReadStallRetry bool + InitialReqTimeout time.Duration + MaxReqTimeout time.Duration + MinReqTimeout time.Duration + ReqIncreaseRate float64 + ReqTargetPercentile float64 } func CreateHttpClient(storageClientConfig *StorageClientConfig) (httpClient *http.Client, err error) { diff --git a/internal/storage/storageutil/test_util.go b/internal/storage/storageutil/test_util.go index 4abb0a89b5..c89afc1d28 100644 --- a/internal/storage/storageutil/test_util.go +++ b/internal/storage/storageutil/test_util.go @@ -42,5 +42,11 @@ func GetDefaultStorageClientConfig() (clientConfig StorageClientConfig) { ExperimentalEnableJsonRead: false, AnonymousAccess: true, EnableHNS: false, + EnableReadStallRetry: false, + InitialReqTimeout: 20 * time.Second, + MaxReqTimeout: 120 * time.Hour, + MinReqTimeout: 500 * time.Millisecond, + ReqIncreaseRate: 15, + ReqTargetPercentile: 0.99, } }