Skip to content

Commit

Permalink
feat: support to retry for stalled request
Browse files Browse the repository at this point in the history
  • Loading branch information
raj-prince committed Oct 21, 2024
1 parent 8464802 commit 204ce41
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 0 deletions.
7 changes: 7 additions & 0 deletions cmd/legacy_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func getConfigForUserAgent(mountConfig *cfg.Config) string {
return fmt.Sprintf("%s:%s:%s", isFileCacheEnabled, isFileCacheForRangeReadEnabled, isParallelDownloadsEnabled)
}
func createStorageHandle(newConfig *cfg.Config, userAgent string) (storageHandle storage.StorageHandle, err error) {
readStallRetryConfig := newConfig.GcsRetries.ReadStall
storageClientConfig := storageutil.StorageClientConfig{
ClientProtocol: newConfig.GcsConnection.ClientProtocol,
MaxConnsPerHost: int(newConfig.GcsConnection.MaxConnsPerHost),
Expand All @@ -124,6 +125,12 @@ func createStorageHandle(newConfig *cfg.Config, userAgent string) (storageHandle
ExperimentalEnableJsonRead: newConfig.GcsConnection.ExperimentalEnableJsonRead,
GrpcConnPoolSize: int(newConfig.GcsConnection.GrpcConnPoolSize),
EnableHNS: newConfig.EnableHns,
EnableReadStallRetry: readStallRetryConfig.Enable,
InitialReqTimeout: readStallRetryConfig.InitialReqTimeout,
MaxReqTimeout: readStallRetryConfig.MaxReqTimeout,
MinReqTimeout: readStallRetryConfig.MinReqTimeout,
ReqIncreaseRate: readStallRetryConfig.ReqIncreaseRate,
ReqTargetPercentile: readStallRetryConfig.ReqTargetPercentile,
}
logger.Infof("UserAgent = %s\n", storageClientConfig.UserAgent)
storageHandle, err = storage.NewStorageHandle(context.Background(), storageClientConfig)
Expand Down
26 changes: 26 additions & 0 deletions internal/storage/storage_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"fmt"
"net/http"
"os"
"strconv"

"cloud.google.com/go/storage"
control "cloud.google.com/go/storage/control/apiv2"
"cloud.google.com/go/storage/experimental"
"github.com/googleapis/gax-go/v2"
"github.com/googlecloudplatform/gcsfuse/v2/cfg"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
Expand All @@ -35,6 +37,12 @@ import (
_ "google.golang.org/grpc/xds/googledirectpath"
)

const (
// Used to modify the hidden options in go-sdk for read stall retry.
dynamicReadReqIncreaseRateEnv = "DYNAMIC_READ_REQ_INCREASE_RATE"
dynamicReadReqInitialTimeoutEnv = "DYNAMIC_READ_REQ_INITIAL_TIMEOUT"
)

type StorageHandle interface {
// In case of non-empty billingProject, this project is set as user-project for
// all subsequent calls on the bucket. Calls with user-project will be billed
Expand Down Expand Up @@ -144,6 +152,24 @@ func createHTTPClientHandle(ctx context.Context, clientConfig *storageutil.Stora
clientOpts = append(clientOpts, option.WithEndpoint(clientConfig.CustomEndpoint))
}

if clientConfig.EnableReadStallRetry {
// Hidden way to modify the increase rate for dynamic delay algorithm in go-sdk.
// Ref: http://shortn/_417eTbNbKK
// Temporarily we kept an option to change the increase-rate, will be removed
// once we get a good default.
os.Setenv(dynamicReadReqIncreaseRateEnv, strconv.FormatFloat(clientConfig.ReqIncreaseRate, 'f', -1, 64))

// Hidden way to modify the initial-timeout of the dynamic delay algorithm in go-sdk.
// Ref: http://shortn/_xArUKvvGQZ
// Temporarily we kept an option to change the initial-timeout, will be removed
// once we get a good default.
os.Setenv(dynamicReadReqInitialTimeoutEnv, clientConfig.InitialReqTimeout.String())
clientOpts = append(clientOpts, experimental.WithReadStallTimeout(&experimental.ReadStallTimeoutConfig{
Min: clientConfig.MinReqTimeout,
TargetPercentile: clientConfig.ReqTargetPercentile,
}))
}

return storage.NewClient(ctx, clientOpts...)
}

Expand Down
191 changes: 191 additions & 0 deletions internal/storage/storage_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net/url"
"testing"
"time"

"github.com/googlecloudplatform/gcsfuse/v2/cfg"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
Expand Down Expand Up @@ -253,6 +254,196 @@ func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_WithGRPCClientPro
assert.Contains(testSuite.T(), err.Error(), fmt.Sprintf("client-protocol requested is not HTTP1 or HTTP2: %s", cfg.GRPC))
}

func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_WithReadStallRetryEnabled() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableReadStallRetry = true

storageClient, err := createHTTPClientHandle(context.Background(), &sc)

assert.NotNil(testSuite.T(), err)
assert.Nil(testSuite.T(), storageClient)
}

func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_WithReadStallRetry() {
testCases := []struct {
name string
enableReadStallRetry bool
}{
{
name: "ReadStallRetryEnabled",
enableReadStallRetry: true,
},
{
name: "ReadStallRetryDisabled",
enableReadStallRetry: false,
},
}

for _, tc := range testCases {
testSuite.Run(tc.name, func() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableReadStallRetry = tc.enableReadStallRetry

storageClient, err := createHTTPClientHandle(context.Background(), &sc)

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
})
}
}

func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_InitialReqTimeout() {
testCases := []struct {
name string
initialReqTimeout time.Duration
}{
{
name: "ShortTimeout",
initialReqTimeout: 1 * time.Millisecond,
},
{
name: "LongTimeout",
initialReqTimeout: 10 * time.Second,
},
}

for _, tc := range testCases {
testSuite.Run(tc.name, func() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableReadStallRetry = true
sc.InitialReqTimeout = tc.initialReqTimeout

storageClient, err := createHTTPClientHandle(context.Background(), &sc)

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
})
}
}

func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_MinReqTimeout() {
testCases := []struct {
name string
initialReqTimeout time.Duration
}{
{
name: "ShortTimeout",
initialReqTimeout: 1 * time.Millisecond,
},
{
name: "LongTimeout",
initialReqTimeout: 10 * time.Second,
},
}

for _, tc := range testCases {
testSuite.Run(tc.name, func() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableReadStallRetry = true
sc.MinReqTimeout = tc.initialReqTimeout

storageClient, err := createHTTPClientHandle(context.Background(), &sc)

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
})
}
}

func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_ReqIncreaseRate() {
testCases := []struct {
name string
reqIncreaseRate float64
expectErr bool
}{
{
name: "NegativeRate",
reqIncreaseRate: -0.5,
expectErr: true,
},
{
name: "ZeroRate",
reqIncreaseRate: 0.0,
expectErr: true,
},
{
name: "PositiveRate",
reqIncreaseRate: 1.5,
expectErr: false,
},
}

for _, tc := range testCases {
testSuite.Run(tc.name, func() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableReadStallRetry = true
sc.ReqIncreaseRate = tc.reqIncreaseRate

storageClient, err := createHTTPClientHandle(context.Background(), &sc)

if tc.expectErr {
assert.NotNil(testSuite.T(), err)
assert.Nil(testSuite.T(), storageClient)
} else {
assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
}
})
}
}

func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_TargetPercentile() {
testCases := []struct {
name string
reqTargetPercentile float64
expectErr bool
}{
{
name: "LowPercentile",
reqTargetPercentile: 0.25, // 25th percentile
expectErr: false,
},
{
name: "MidPercentile",
reqTargetPercentile: 0.50, // 50th percentile
expectErr: false,
},
{
name: "HighPercentile",
reqTargetPercentile: 0.90, // 90th percentile
expectErr: false,
},
{
name: "InvalidPercentile-Low",
reqTargetPercentile: -0.5, // Invalid percentile
expectErr: true,
},
{
name: "InvalidPercentile-High",
reqTargetPercentile: 1.5, // Invalid percentile
expectErr: true,
},
}

for _, tc := range testCases {
testSuite.Run(tc.name, func() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableReadStallRetry = true
sc.ReqTargetPercentile = tc.reqTargetPercentile

storageClient, err := createHTTPClientHandle(context.Background(), &sc)

if tc.expectErr {
assert.NotNil(testSuite.T(), err)
assert.Nil(testSuite.T(), storageClient)
} else {
assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
}
})
}
}

func (testSuite *StorageHandleTest) TestNewStorageHandleWithGRPCClientWithCustomEndpointNilAndAuthEnabled() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.CustomEndpoint = ""
Expand Down
8 changes: 8 additions & 0 deletions internal/storage/storageutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ type StorageClientConfig struct {

// Enabling new API flow for HNS bucket.
EnableHNS bool

// Parameters to define retry behavior for stalled read.
EnableReadStallRetry bool
InitialReqTimeout time.Duration
MaxReqTimeout time.Duration
MinReqTimeout time.Duration
ReqIncreaseRate float64
ReqTargetPercentile float64
}

func CreateHttpClient(storageClientConfig *StorageClientConfig) (httpClient *http.Client, err error) {
Expand Down
6 changes: 6 additions & 0 deletions internal/storage/storageutil/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,11 @@ func GetDefaultStorageClientConfig() (clientConfig StorageClientConfig) {
ExperimentalEnableJsonRead: false,
AnonymousAccess: true,
EnableHNS: false,
EnableReadStallRetry: false,
InitialReqTimeout: 20 * time.Second,
MaxReqTimeout: 120 * time.Hour,
MinReqTimeout: 500 * time.Millisecond,
ReqIncreaseRate: 15,
ReqTargetPercentile: 0.99,
}
}

0 comments on commit 204ce41

Please sign in to comment.