Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage/dataflux): run worksteal listing parallel to sequential listing #10966

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3ab3831
test case for fast-list
akansha1812 Sep 25, 2024
559ae40
resolve comments
akansha1812 Sep 27, 2024
1cdc956
add object_lister and worksteal to fast_list
akansha1812 Oct 7, 2024
7eb0471
Merge branch 'googleapis:main' into main
akansha1812 Oct 7, 2024
1f05177
add unit tests with emulator
akansha1812 Oct 8, 2024
05b1515
Merge branch 'googleapis:main' into main
akansha1812 Oct 8, 2024
5bed09c
resolve PR errors
akansha1812 Oct 8, 2024
80054bf
reduce numobjects to resolve timeout error
akansha1812 Oct 8, 2024
e33adf5
reduce objects created for timeout error
akansha1812 Oct 8, 2024
b1b69b9
remove env variables for grpc and http
akansha1812 Oct 9, 2024
c2e56f8
run dataflux emulator tests
akansha1812 Oct 9, 2024
c78aaba
Merge branch 'main' into main
akansha1812 Oct 14, 2024
b714222
Merge branch 'main' into main
akansha1812 Oct 21, 2024
4028cb6
resolve comments
akansha1812 Oct 21, 2024
4851cc8
update ranges to nil when sequential listing is faster
akansha1812 Oct 21, 2024
0d494e0
default page size for seq listing is 5000
akansha1812 Oct 21, 2024
722e961
remove version enabled from TestDoSeqListingEmulated
akansha1812 Oct 21, 2024
f3c1571
increase emulator time
akansha1812 Oct 21, 2024
db1a757
make next_page more readable
akansha1812 Oct 22, 2024
56ab509
Merge branch 'main' into main
akansha1812 Oct 22, 2024
2b72b0e
to resolve race conditions
akansha1812 Oct 22, 2024
692e74e
rename goroutineID to id
akansha1812 Oct 22, 2024
a6cb0f4
Merge branch 'main' into main
akansha1812 Oct 23, 2024
1998873
move counter from beginning of the loop
akansha1812 Oct 25, 2024
7583e16
add mutex to error counter
akansha1812 Oct 25, 2024
4b1dcf0
emulator test for error counter and remove worker to track error from…
akansha1812 Oct 25, 2024
34f0840
Merge branch 'main' into main
akansha1812 Oct 25, 2024
564bbd3
test emulator error
akansha1812 Oct 25, 2024
ab830f6
grpc client returns context eror in desc
akansha1812 Oct 28, 2024
e14c31a
Merge branch 'main' into main
akansha1812 Oct 28, 2024
5af30c7
Merge branch 'main' into main
akansha1812 Oct 29, 2024
328f2b7
Merge branch 'main' into main
akansha1812 Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 61 additions & 12 deletions storage/dataflux/fast_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"runtime"
"strings"
"sync"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -92,6 +93,11 @@ type Lister struct {
skipDirectoryObjects bool
}

// contextErr counts how many listing methods have failed. Both the
// worksteal and sequential listing goroutines may update it concurrently,
// so access to counter is guarded by mu (see increment).
type contextErr struct {
	mu      sync.Mutex // protects counter
	counter int        // number of listing methods that returned an error
}

// NewLister creates a new dataflux Lister to list objects in the give bucket.
func NewLister(c *storage.Client, in *ListerInput) *Lister {
bucket := c.Bucket(in.BucketName)
Expand Down Expand Up @@ -127,35 +133,58 @@ func NewLister(c *storage.Client, in *ListerInput) *Lister {
// worksteal listing is expected to be faster.
func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) {
// countError tracks the number of failed listing methods.
countError := 0
cc := &contextErr{counter: 0}

var results []*storage.ObjectAttrs
ctx, cancel := context.WithCancel(ctx)
defer cancel()

wsCompletedfirst := false
seqCompletedfirst := false
var wsObjects []*storage.ObjectAttrs
var seqObjects []*storage.ObjectAttrs
var nextToken string

// Errgroup takes care of running both methods in parallel. As soon as one of
// the method is complete, the running method also stops.
g, childCtx := errgroup.WithContext(ctx)

// To start listing method is Open and runs both worksteal and sequential listing
// in parallel. The method which completes first is used for all subsequent runs.

// TODO: Run worksteal listing when method is Open or WorkSteal.
// Run worksteal listing when method is Open or WorkSteal.
if c.method != sequential {

g.Go(func() error {
objects, err := c.workstealListing(childCtx)
if err != nil {
cc.increment()
return fmt.Errorf("error in running worksteal_lister: %w", err)
}
// Close context when sequential listing is complete.
cancel()
wsCompletedfirst = true
wsObjects = objects

return nil
})
}

// Run sequential listing when method is Open or Sequential.
if c.method != worksteal {

g.Go(func() error {
objects, nextToken, err := c.sequentialListing(childCtx)
objects, token, err := c.sequentialListing(childCtx)
if err != nil {
countError++
cc.increment()
return fmt.Errorf("error in running sequential listing: %w", err)
}
// If sequential listing completes first, set method to sequential listing
// and ranges to nil. The nextToken will be used to continue sequential listing.
results = objects
c.pageToken = nextToken
c.method = sequential
// Close context when sequential listing is complete.
cancel()
seqCompletedfirst = true
seqObjects = objects
nextToken = token

return nil
})
}
Expand All @@ -169,13 +198,27 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
// As one of the listing method completes, it is expected to cancel context for the
// only then return error. other method. If both sequential and worksteal listing
// fail due to context canceled, return error.
if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) {
return nil, fmt.Errorf("failed waiting for sequntial and work steal lister : %w", err)
if err != nil && (!errors.Is(err, context.Canceled) || cc.counter > 1) {
return nil, fmt.Errorf("failed waiting for sequential and work steal lister : %w", err)
}
if wsCompletedfirst {
// If worksteal listing completes first, set method to worksteal listing and nextToken to "".
// The c.ranges channel will be used to continue worksteal listing.
results = wsObjects
c.pageToken = ""
c.method = worksteal
} else if seqCompletedfirst {
// If sequential listing completes first, set method to sequential listing
// and ranges to nil. The nextToken will be used to continue sequential listing.
results = seqObjects
c.pageToken = nextToken
c.method = sequential
c.ranges = nil
}

akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
// If ranges for worksteal and pageToken for sequential listing is empty, then
// listing is complete.
if c.pageToken == "" {
if c.pageToken == "" && len(c.ranges) == 0 {
return results, iterator.Done
}
return results, nil
Expand All @@ -188,6 +231,12 @@ func (c *Lister) Close() {
}
}

// increment records one more failed listing method. It is safe to call
// from multiple goroutines.
func (cc *contextErr) increment() {
	cc.mu.Lock()
	cc.counter++
	cc.mu.Unlock()
}

// updateStartEndOffset updates start and end offset based on prefix.
// If a prefix is given, adjust start and end value such that it lists
// objects with the given prefix. updateStartEndOffset assumes prefix will
Expand Down
120 changes: 120 additions & 0 deletions storage/dataflux/fast_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
package dataflux

import (
"context"
"fmt"
"log"
"os"
"runtime"
"testing"
"time"

"cloud.google.com/go/storage"
"github.com/google/uuid"
)

func TestUpdateStartEndOffset(t *testing.T) {
Expand Down Expand Up @@ -192,3 +198,117 @@ func TestNewLister(t *testing.T) {
})
}
}

// TestNextBatchEmulated verifies that NextBatch fails and returns no
// objects when it is invoked with an already-canceled context: with the
// context canceled, both listing methods error out, so NextBatch is
// expected to surface a (context.Canceled-derived) error.
func TestNextBatchEmulated(t *testing.T) {
	transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client *storage.Client) {

		bucketHandle := client.Bucket(bucket)
		if err := bucketHandle.Create(ctx, project, &storage.BucketAttrs{
			Name: bucket,
		}); err != nil {
			t.Fatal(err)
		}
		wantObjects := 2
		if err := createObject(ctx, bucketHandle, wantObjects); err != nil {
			t.Fatalf("unable to create objects: %v", err)
		}
		c := NewLister(client, &ListerInput{BucketName: bucket})
		defer c.Close()
		// Cancel before calling NextBatch so both listing methods fail.
		childCtx, cancel := context.WithCancel(ctx)
		cancel()
		result, err := c.NextBatch(childCtx)
		// Bug fix: the original fataled whenever err was non-nil, which
		// contradicts its own message — a canceled context is expected
		// to make NextBatch return an error, not succeed.
		if err == nil {
			t.Fatalf("NextBatch() expected to fail with %v, got nil", context.Canceled)
		}
		if len(result) > 0 {
			t.Errorf("NextBatch() got object %v, want 0 objects", len(result))
		}
	})
}

var emulatorClients map[string]*storage.Client

type skipTransportTestKey string

// initEmulatorClients creates the gRPC and HTTP storage clients used by
// emulator-based tests and stores them in emulatorClients. It returns a
// closer that releases both clients; when the emulator environment
// variables are unset it does nothing and returns a no-op closer.
func initEmulatorClients() func() error {
	closeNothing := func() error { return nil }

	if !isEmulatorEnvironmentSet() {
		return closeNothing
	}
	ctx := context.Background()

	gc, err := storage.NewGRPCClient(ctx)
	if err != nil {
		log.Fatalf("Error setting up gRPC client for emulator tests: %v", err)
		return closeNothing
	}
	hc, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatalf("Error setting up HTTP client for emulator tests: %v", err)
		return closeNothing
	}

	emulatorClients = map[string]*storage.Client{
		"http": hc,
		"grpc": gc,
	}

	// Close both clients; report the gRPC error first if both fail.
	return func() error {
		grpcErr := gc.Close()
		httpErr := hc.Close()
		if grpcErr != nil {
			return grpcErr
		}
		return httpErr
	}
}

// transportClienttest executes the given function with a sub-test, a project name
// based on the transport, a unique bucket name also based on the transport, and
// the transport-specific client to run the test with. It also checks the environment
// to ensure it is suitable for emulator-based tests, or skips.
func transportClientTest(ctx context.Context, t *testing.T, test func(*testing.T, context.Context, string, string, *storage.Client)) {
checkEmulatorEnvironment(t)
for transport, client := range emulatorClients {
if reason := ctx.Value(skipTransportTestKey(transport)); reason != nil {
t.Skip("transport", fmt.Sprintf("%q", transport), "explicitly skipped:", reason)
}
t.Run(transport, func(t *testing.T) {
project := fmt.Sprintf("%s-project", transport)
bucket := fmt.Sprintf("%s-bucket-%d", transport, time.Now().Nanosecond())
test(t, ctx, project, bucket, client)
})
}
}

// checkEmulatorEnvironment skips the test if the emulator environment variables
// are not set.
// checkEmulatorEnvironment skips the test unless the emulator environment
// variables are set.
func checkEmulatorEnvironment(t *testing.T) {
	if isEmulatorEnvironmentSet() {
		return
	}
	t.Skip("Emulator tests skipped without emulator environment variables set")
}

// isEmulatorEnvironmentSet reports whether both emulator environment
// variables (gRPC and HTTP endpoints) are set to non-empty values.
func isEmulatorEnvironmentSet() bool {
	grpcHost := os.Getenv("STORAGE_EMULATOR_HOST_GRPC")
	httpHost := os.Getenv("STORAGE_EMULATOR_HOST")
	return grpcHost != "" && httpHost != ""
}

// createObject creates given number of objects in the given bucket.
func createObject(ctx context.Context, bucket *storage.BucketHandle, numObjects int) error {

for i := 0; i < numObjects; i++ {
// Generate a unique object name using UUIDs
objectName := fmt.Sprintf("object%s", uuid.New().String())
// Create a writer for the object
wc := bucket.Object(objectName).NewWriter(ctx)
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved

// Close the writer to finalize the upload
if err := wc.Close(); err != nil {
return fmt.Errorf("failed to close writer for object %q: %v", objectName, err)
}
}
return nil
}
30 changes: 19 additions & 11 deletions storage/dataflux/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestMain(m *testing.M) {
if err := httpTestBucket.Create(testPrefix); err != nil {
log.Fatalf("test bucket creation failed: %v", err)
}

cleanupEmulatorClients := initEmulatorClients()
m.Run()

if err := httpTestBucket.Cleanup(); err != nil {
Expand All @@ -62,6 +62,10 @@ func TestMain(m *testing.M) {
if err := deleteExpiredBuckets(testPrefix); err != nil {
log.Printf("expired http bucket cleanup failed: %v", err)
}
if err := cleanupEmulatorClients(); err != nil {
// Don't fail the test if cleanup fails.
log.Printf("Post-test cleanup failed for emulator clients: %v", err)
}
}

// Lists the all the objects in the bucket.
Expand Down Expand Up @@ -99,13 +103,14 @@ func TestIntegration_NextBatch(t *testing.T) {
}
const landsatBucket = "gcp-public-data-landsat"
const landsatPrefix = "LC08/01/001/00"
wantObjects := 17225

ctx := context.Background()
c, err := storage.NewClient(ctx)
if err != nil {
t.Fatalf("NewClient: %v", err)
}

numObjectsPrefix := 17225
in := &ListerInput{
BucketName: landsatBucket,
Query: storage.Query{Prefix: landsatPrefix},
Expand All @@ -115,22 +120,25 @@ func TestIntegration_NextBatch(t *testing.T) {
df := NewLister(c, in)
defer df.Close()
totalObjects := 0
counter := 0
for {
objects, err := df.NextBatch(ctx)
if err != nil && err != iterator.Done {
t.Errorf("df.NextBatch : %v", err)
}
totalObjects += len(objects)
if err == iterator.Done {
counter++
totalObjects += len(objects)
break
}
if len(objects) > in.BatchSize {
t.Errorf("expected to receive %d objects in each batch, got %d objects in a batch", in.BatchSize, len(objects))
if err != nil {
t.Errorf("df.NextBatch : %v", err)
}
counter++
totalObjects += len(objects)
}
if totalObjects != wantObjects {
t.Errorf("expected to receive %d objects in results, got %d objects in results", wantObjects, totalObjects)

if totalObjects != numObjectsPrefix {
t.Errorf("expected to receive %d objects in results, got %d objects in results", numObjectsPrefix, totalObjects)
}
if counter <= 1 {
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
t.Errorf("expected df.NextBatch to be called more than once, got %d times", counter)
}
}

Expand Down
Loading
Loading