Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to retry for stalled read request #2612

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/legacy_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func getConfigForUserAgent(mountConfig *cfg.Config) string {
return fmt.Sprintf("%s:%s:%s", isFileCacheEnabled, isFileCacheForRangeReadEnabled, isParallelDownloadsEnabled)
}
func createStorageHandle(newConfig *cfg.Config, userAgent string) (storageHandle storage.StorageHandle, err error) {
readStallRetryConfig := newConfig.GcsRetries.ReadStall
storageClientConfig := storageutil.StorageClientConfig{
ClientProtocol: newConfig.GcsConnection.ClientProtocol,
MaxConnsPerHost: int(newConfig.GcsConnection.MaxConnsPerHost),
Expand All @@ -124,6 +125,12 @@ func createStorageHandle(newConfig *cfg.Config, userAgent string) (storageHandle
ExperimentalEnableJsonRead: newConfig.GcsConnection.ExperimentalEnableJsonRead,
GrpcConnPoolSize: int(newConfig.GcsConnection.GrpcConnPoolSize),
EnableHNS: newConfig.EnableHns,
EnableReadStallRetry: readStallRetryConfig.Enable,
InitialReqTimeout: readStallRetryConfig.InitialReqTimeout,
MaxReqTimeout: readStallRetryConfig.MaxReqTimeout,
MinReqTimeout: readStallRetryConfig.MinReqTimeout,
ReqIncreaseRate: readStallRetryConfig.ReqIncreaseRate,
ReqTargetPercentile: readStallRetryConfig.ReqTargetPercentile,
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
}
logger.Infof("UserAgent = %s\n", storageClientConfig.UserAgent)
storageHandle, err = storage.NewStorageHandle(context.Background(), storageClientConfig)
Expand Down
26 changes: 26 additions & 0 deletions internal/storage/storage_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"fmt"
"net/http"
"os"
"strconv"

"cloud.google.com/go/storage"
control "cloud.google.com/go/storage/control/apiv2"
"cloud.google.com/go/storage/experimental"
"github.com/googleapis/gax-go/v2"
"github.com/googlecloudplatform/gcsfuse/v2/cfg"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
Expand All @@ -35,6 +37,13 @@ import (
_ "google.golang.org/grpc/xds/googledirectpath"
)

const (
// Used to modify the hidden options in go-sdk for read stall retry.
// Ref: http://shortn/_DIoIm9dDT3
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
dynamicReadReqIncreaseRateEnv = "DYNAMIC_READ_REQ_INCREASE_RATE"
dynamicReadReqInitialTimeoutEnv = "DYNAMIC_READ_REQ_INITIAL_TIMEOUT"
)

type StorageHandle interface {
// In case of non-empty billingProject, this project is set as user-project for
// all subsequent calls on the bucket. Calls with user-project will be billed
Expand Down Expand Up @@ -144,6 +153,23 @@ func createHTTPClientHandle(ctx context.Context, clientConfig *storageutil.Stora
clientOpts = append(clientOpts, option.WithEndpoint(clientConfig.CustomEndpoint))
}

if clientConfig.EnableReadStallRetry {
// Hidden way to modify the increase rate for dynamic delay algorithm in go-sdk.
// Ref: http://shortn/_417eTbNbKK
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
// Temporarily we kept an option to change the increase-rate, will be removed
// once we get a good default.
os.Setenv(dynamicReadReqIncreaseRateEnv, strconv.FormatFloat(clientConfig.ReqIncreaseRate, 'f', -1, 64))
raj-prince marked this conversation as resolved.
Show resolved Hide resolved

// Hidden way to modify the initial-timeout of the dynamic delay algorithm in go-sdk.
// Ref: http://shortn/_xArUKvvGQZ
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
// Temporarily we kept an option to change the initial-timeout, will be removed
// once we get a good default.
os.Setenv(dynamicReadReqInitialTimeoutEnv, clientConfig.InitialReqTimeout.String())
clientOpts = append(clientOpts, experimental.WithReadStallTimeout(&experimental.ReadStallTimeoutConfig{
Min: clientConfig.MinReqTimeout,
TargetPercentile: clientConfig.ReqTargetPercentile,
}))
}
return storage.NewClient(ctx, clientOpts...)
}

Expand Down
181 changes: 181 additions & 0 deletions internal/storage/storage_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net/url"
"testing"
"time"

"github.com/googlecloudplatform/gcsfuse/v2/cfg"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
Expand Down Expand Up @@ -253,6 +254,186 @@ func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_WithGRPCClientPro
assert.Contains(testSuite.T(), err.Error(), fmt.Sprintf("client-protocol requested is not HTTP1 or HTTP2: %s", cfg.GRPC))
}

func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_WithReadStallRetry() {
testCases := []struct {
name string
gargnitingoogle marked this conversation as resolved.
Show resolved Hide resolved
enableReadStallRetry bool
}{
{
name: "ReadStallRetryEnabled",
enableReadStallRetry: true,
},
{
name: "ReadStallRetryDisabled",
enableReadStallRetry: false,
},
}

for _, tc := range testCases {
testSuite.Run(tc.name, func() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableReadStallRetry = tc.enableReadStallRetry

storageClient, err := createHTTPClientHandle(context.Background(), &sc)
gargnitingoogle marked this conversation as resolved.
Show resolved Hide resolved

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
})
}
}

func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_ReadStallInitialReqTimeout() {
testCases := []struct {
name string
initialReqTimeout time.Duration
}{
{
name: "ShortTimeout",
initialReqTimeout: 1 * time.Millisecond,
},
{
name: "LongTimeout",
initialReqTimeout: 10 * time.Second,
},
}

for _, tc := range testCases {
testSuite.Run(tc.name, func() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableReadStallRetry = true
sc.InitialReqTimeout = tc.initialReqTimeout

storageClient, err := createHTTPClientHandle(context.Background(), &sc)
raj-prince marked this conversation as resolved.
Show resolved Hide resolved

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
})
}
}

func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_ReadStallMinReqTimeout() {
testCases := []struct {
name string
minReqTimeout time.Duration
}{
{
name: "ShortTimeout",
minReqTimeout: 1 * time.Millisecond,
},
{
name: "LongTimeout",
minReqTimeout: 10 * time.Second,
},
}

for _, tc := range testCases {
testSuite.Run(tc.name, func() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableReadStallRetry = true
sc.MinReqTimeout = tc.minReqTimeout

storageClient, err := createHTTPClientHandle(context.Background(), &sc)

assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
})
}
}

func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_ReadStallReqIncreaseRate() {
testCases := []struct {
name string
reqIncreaseRate float64
expectErr bool
}{
{
name: "NegativeRate",
reqIncreaseRate: -0.5,
expectErr: true,
},
{
name: "ZeroRate",
reqIncreaseRate: 0.0,
expectErr: true,
},
{
name: "PositiveRate",
reqIncreaseRate: 1.5,
expectErr: false,
},
}

for _, tc := range testCases {
testSuite.Run(tc.name, func() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableReadStallRetry = true
sc.ReqIncreaseRate = tc.reqIncreaseRate

storageClient, err := createHTTPClientHandle(context.Background(), &sc)

if tc.expectErr {
assert.NotNil(testSuite.T(), err)
gargnitingoogle marked this conversation as resolved.
Show resolved Hide resolved
assert.Nil(testSuite.T(), storageClient)
} else {
assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
}
})
}
}

func (testSuite *StorageHandleTest) TestCreateHTTPClientHandle_ReadStallReqTargetPercentile() {
testCases := []struct {
name string
reqTargetPercentile float64
expectErr bool
}{
{
name: "LowPercentile",
reqTargetPercentile: 0.25, // 25th percentile
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
expectErr: false,
},
{
name: "MidPercentile",
reqTargetPercentile: 0.50, // 50th percentile
expectErr: false,
},
{
name: "HighPercentile",
reqTargetPercentile: 0.90, // 90th percentile
expectErr: false,
},
{
name: "InvalidPercentile-Low",
reqTargetPercentile: -0.5, // Invalid percentile
expectErr: true,
},
{
name: "InvalidPercentile-High",
reqTargetPercentile: 1.5, // Invalid percentile
expectErr: true,
},
}

for _, tc := range testCases {
testSuite.Run(tc.name, func() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.EnableReadStallRetry = true
sc.ReqTargetPercentile = tc.reqTargetPercentile

storageClient, err := createHTTPClientHandle(context.Background(), &sc)

if tc.expectErr {
assert.NotNil(testSuite.T(), err)
assert.Nil(testSuite.T(), storageClient)
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
} else {
assert.Nil(testSuite.T(), err)
assert.NotNil(testSuite.T(), storageClient)
}
})
}
}

func (testSuite *StorageHandleTest) TestNewStorageHandleWithGRPCClientWithCustomEndpointNilAndAuthEnabled() {
sc := storageutil.GetDefaultStorageClientConfig()
sc.CustomEndpoint = ""
Expand Down
8 changes: 8 additions & 0 deletions internal/storage/storageutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ type StorageClientConfig struct {

// Enabling new API flow for HNS bucket.
EnableHNS bool

// Parameters to define retry behavior for stalled read.
EnableReadStallRetry bool
InitialReqTimeout time.Duration
MaxReqTimeout time.Duration
MinReqTimeout time.Duration
ReqIncreaseRate float64
ReqTargetPercentile float64
}

func CreateHttpClient(storageClientConfig *StorageClientConfig) (httpClient *http.Client, err error) {
Expand Down
6 changes: 6 additions & 0 deletions internal/storage/storageutil/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,11 @@ func GetDefaultStorageClientConfig() (clientConfig StorageClientConfig) {
ExperimentalEnableJsonRead: false,
AnonymousAccess: true,
EnableHNS: false,
EnableReadStallRetry: false,
InitialReqTimeout: 20 * time.Second,
MaxReqTimeout: 120 * time.Hour,
MinReqTimeout: 500 * time.Millisecond,
ReqIncreaseRate: 15,
ReqTargetPercentile: 0.99,
}
}
Loading