From 75ab4ad2c7fae864483e9eefe33f9bccf1e1f5fd Mon Sep 17 00:00:00 2001 From: Justin Date: Thu, 26 Sep 2024 11:53:21 -0400 Subject: [PATCH 1/7] bigquery: Create table read session using the client's project --- bigquery/storage_client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bigquery/storage_client.go b/bigquery/storage_client.go index f63f649ceba0..e9a1209483bd 100644 --- a/bigquery/storage_client.go +++ b/bigquery/storage_client.go @@ -109,6 +109,7 @@ func (c *readClient) sessionForTable(ctx context.Context, table *Table, rsProjec rs := &readSession{ ctx: ctx, + projectID: c.projectID, table: table, tableID: tableID, projectID: rsProjectID, @@ -142,6 +143,8 @@ func (rs *readSession) start() error { if maxStreamCount == 0 { preferredMinStreamCount = int32(rs.settings.maxWorkerCount) } + + // Create a read session on the table in the underlying readClient project. createReadSessionRequest := &storagepb.CreateReadSessionRequest{ Parent: fmt.Sprintf("projects/%s", rs.projectID), ReadSession: &storagepb.ReadSession{ From f6a127bd682bbb02180f18530855f133600651fd Mon Sep 17 00:00:00 2001 From: Justin Date: Thu, 26 Sep 2024 14:04:51 -0400 Subject: [PATCH 2/7] add TableReadOption to have table projectID as fallback --- bigquery/storage_client.go | 9 +++++++-- bigquery/table.go | 26 +++++++++++++++++++++++--- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/bigquery/storage_client.go b/bigquery/storage_client.go index e9a1209483bd..a1fae7674b7e 100644 --- a/bigquery/storage_client.go +++ b/bigquery/storage_client.go @@ -107,9 +107,15 @@ func (c *readClient) sessionForTable(ctx context.Context, table *Table, rsProjec settings.maxStreamCount = 1 } + // configure where the read session is created + readSessionProjectID := table.ProjectID + if useClientProject { + readSessionProjectID = c.projectID + } + rs := &readSession{ ctx: ctx, - projectID: c.projectID, + readSessionProjectID: readSessionProjectID, table: table, tableID: tableID, projectID: rsProjectID, @@ -144,7 +150,6 @@ func (rs *readSession) start() error { preferredMinStreamCount = int32(rs.settings.maxWorkerCount) } - // Create a read session on the table in the underlying readClient project. createReadSessionRequest := &storagepb.CreateReadSessionRequest{ Parent: fmt.Sprintf("projects/%s", rs.projectID), ReadSession: &storagepb.ReadSession{ diff --git a/bigquery/table.go b/bigquery/table.go index 944a836d8dd6..4327f078d5d8 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -967,14 +967,34 @@ func (t *Table) Delete(ctx context.Context) (err error) { }) } +type tableReadOption struct { + useClientProject bool +} + +// TableReadOption allows requests to alter the behavior of reading from a table. +type TableReadOption func(*tableReadOption) + +// WithClientProject allows the read session to be created from the client project +// when reading from the table, instead of the table's project. +func WithClientProject(b bool) TableReadOption { + return func(tro *tableReadOption) { + tro.useClientProject = true + } +} + // Read fetches the contents of the table. -func (t *Table) Read(ctx context.Context) *RowIterator { +func (t *Table) Read(ctx context.Context, opts ...TableReadOption) *RowIterator { return t.read(ctx, fetchPage) } -func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator { +func (t *Table) read(ctx context.Context, pf pageFetcher, opts ...TableReadOption) *RowIterator { + var tro *tableReadOption + for _, o := range opts { + o(tro) + } + if t.c.isStorageReadAvailable() { - it, err := newStorageRowIteratorFromTable(ctx, t, t.c.projectID, false) + it, err := newStorageRowIteratorFromTable(ctx, t, false, tro.useClientProject) if err == nil { return it } From 057d673d8977e2111568e6ca3b6a1a4eaf2444f7 Mon Sep 17 00:00:00 2001 From: Justin Date: Mon, 30 Sep 2024 09:51:56 -0400 Subject: [PATCH 3/7] small fix --- bigquery/table.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigquery/table.go b/bigquery/table.go index 4327f078d5d8..1f6e45e8c0f2 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -976,7 +976,7 @@ type TableReadOption func(*tableReadOption) // WithClientProject allows the read session to be created from the client project // when reading from the table, instead of the table's project. -func WithClientProject(b bool) TableReadOption { +func WithClientProject() TableReadOption { return func(tro *tableReadOption) { tro.useClientProject = true } @@ -988,7 +988,7 @@ func (t *Table) Read(ctx context.Context, opts ...TableReadOption) *RowIterator } func (t *Table) read(ctx context.Context, pf pageFetcher, opts ...TableReadOption) *RowIterator { - var tro *tableReadOption + tro := &tableReadOption{useClientProject: false} for _, o := range opts { o(tro) } From 3aa58b050a8e52af104f6c13b25049da7ddb66e3 Mon Sep 17 00:00:00 2001 From: Justin Date: Mon, 30 Sep 2024 10:22:48 -0400 Subject: [PATCH 4/7] pass the option --- bigquery/table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/table.go b/bigquery/table.go index 1f6e45e8c0f2..012d58aba790 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -984,7 +984,7 @@ func WithClientProject() TableReadOption { // Read fetches the contents of the table. func (t *Table) Read(ctx context.Context, opts ...TableReadOption) *RowIterator { - return t.read(ctx, fetchPage) + return t.read(ctx, fetchPage, opts...) } func (t *Table) read(ctx context.Context, pf pageFetcher, opts ...TableReadOption) *RowIterator { From 9d5da3ca06ea307c786b234fab40e98b57c20aa6 Mon Sep 17 00:00:00 2001 From: Justin Date: Mon, 30 Sep 2024 13:57:59 -0400 Subject: [PATCH 5/7] change to WithReadSessionProject option in anticipation of https://github.com/googleapis/google-cloud-go/pull/10932 --- bigquery/storage_client.go | 7 ------- bigquery/storage_integration_test.go | 9 +++++++++ bigquery/table.go | 17 ++++++++++------- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/bigquery/storage_client.go b/bigquery/storage_client.go index a1fae7674b7e..8c48ed308ca1 100644 --- a/bigquery/storage_client.go +++ b/bigquery/storage_client.go @@ -107,15 +107,8 @@ func (c *readClient) sessionForTable(ctx context.Context, table *Table, rsProjec settings.maxStreamCount = 1 } - // configure where the read session is created - readSessionProjectID := table.ProjectID - if useClientProject { - readSessionProjectID = c.projectID - } - rs := &readSession{ ctx: ctx, - readSessionProjectID: readSessionProjectID, table: table, tableID: tableID, projectID: rsProjectID, diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index 492561c8a488..d1fac66add5b 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -112,6 +112,15 @@ func TestIntegration_StorageReadClientProject(t *testing.T) { if !strings.HasPrefix(session.bqSession.Name, expectedPrefix) { t.Fatalf("expected read session to have prefix %q: but found %s:", expectedPrefix, session.bqSession.Name) } + + it = table.Read(ctx, WithReadSessionProject("bigquery-public-data")) + _, err = countIteratorRows(it) + if err != nil { + t.Fatal(err) + } + if it.IsAccelerated() { + t.Fatal("expected storage api to not be used") + } } func TestIntegration_StorageReadFromSources(t *testing.T) { diff --git a/bigquery/table.go b/bigquery/table.go index 012d58aba790..56058f46daf5 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -968,17 +968,16 @@ func (t *Table) Delete(ctx context.Context) (err error) { } type tableReadOption struct { - useClientProject bool + readSessionProject string } // TableReadOption allows requests to alter the behavior of reading from a table. type TableReadOption func(*tableReadOption) -// WithClientProject allows the read session to be created from the client project -// when reading from the table, instead of the table's project. -func WithClientProject() TableReadOption { +// WithReadSessionProject allows to create the read session with the specified project that has the necessary permissions to do so. +func WithReadSessionProject(project string) TableReadOption { return func(tro *tableReadOption) { - tro.useClientProject = true + tro.readSessionProject = project } } @@ -988,13 +987,17 @@ func (t *Table) Read(ctx context.Context, opts ...TableReadOption) *RowIterator } func (t *Table) read(ctx context.Context, pf pageFetcher, opts ...TableReadOption) *RowIterator { - tro := &tableReadOption{useClientProject: false} + tro := &tableReadOption{} for _, o := range opts { o(tro) } + if tro.readSessionProject == "" { + tro.readSessionProject = t.c.projectID + } + if t.c.isStorageReadAvailable() { - it, err := newStorageRowIteratorFromTable(ctx, t, false, tro.useClientProject) + it, err := newStorageRowIteratorFromTable(ctx, t, tro.readSessionProject, false) if err == nil { return it } From dab0dfae6c5ed2fffd4080f1ce02337b4b50159c Mon Sep 17 00:00:00 2001 From: Justin Date: Tue, 1 Oct 2024 18:21:59 -0400 Subject: [PATCH 6/7] nitpicks --- bigquery/storage_client.go | 1 - bigquery/storage_integration_test.go | 2 +- bigquery/table.go | 15 ++++++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/bigquery/storage_client.go b/bigquery/storage_client.go index 8c48ed308ca1..f63f649ceba0 100644 --- a/bigquery/storage_client.go +++ b/bigquery/storage_client.go @@ -142,7 +142,6 @@ func (rs *readSession) start() error { if maxStreamCount == 0 { preferredMinStreamCount = int32(rs.settings.maxWorkerCount) } - createReadSessionRequest := &storagepb.CreateReadSessionRequest{ Parent: fmt.Sprintf("projects/%s", rs.projectID), ReadSession: &storagepb.ReadSession{ diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index d1fac66add5b..b137e0176349 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -113,7 +113,7 @@ func TestIntegration_StorageReadClientProject(t *testing.T) { t.Fatalf("expected read session to have prefix %q: but found %s:", expectedPrefix, session.bqSession.Name) } - it = table.Read(ctx, WithReadSessionProject("bigquery-public-data")) + it = table.Read(ctx, WithReadSessionProjectID("bigquery-public-data")) _, err = countIteratorRows(it) if err != nil { t.Fatal(err) diff --git a/bigquery/table.go b/bigquery/table.go index 56058f46daf5..bd1492247b80 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -968,16 +968,16 @@ func (t *Table) Delete(ctx context.Context) (err error) { } type tableReadOption struct { - readSessionProject string + readSessionProjectID string } // TableReadOption allows requests to alter the behavior of reading from a table. type TableReadOption func(*tableReadOption) -// WithReadSessionProject allows to create the read session with the specified project that has the necessary permissions to do so. -func WithReadSessionProject(project string) TableReadOption { +// WithReadSessionProjectID allows to create the read session with the specified project that has the necessary permissions to do so. +func WithReadSessionProjectID(project string) TableReadOption { return func(tro *tableReadOption) { - tro.readSessionProject = project + tro.readSessionProjectID = project } } @@ -992,12 +992,13 @@ func (t *Table) read(ctx context.Context, pf pageFetcher, opts ...TableReadOptio o(tro) } - if tro.readSessionProject == "" { - tro.readSessionProject = t.c.projectID + // fallback to the client's project ID. + if tro.readSessionProjectID == "" { + tro.readSessionProjectID = t.c.projectID } if t.c.isStorageReadAvailable() { - it, err := newStorageRowIteratorFromTable(ctx, t, tro.readSessionProject, false) + it, err := newStorageRowIteratorFromTable(ctx, t, tro.readSessionProjectID, false) if err == nil { return it } From 0bf5f7f11ef8e22bd7af9adc829a911b6e88014e Mon Sep 17 00:00:00 2001 From: Justin Date: Wed, 2 Oct 2024 09:43:10 -0400 Subject: [PATCH 7/7] improve test --- bigquery/storage_integration_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index b137e0176349..e8c15879cdab 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -99,7 +99,7 @@ func TestIntegration_StorageReadClientProject(t *testing.T) { table.ProjectID = "bigquery-public-data" it := table.Read(ctx) - _, err := countIteratorRows(it) + total, err := countIteratorRows(it) if err != nil { t.Fatal(err) } @@ -113,13 +113,17 @@ func TestIntegration_StorageReadClientProject(t *testing.T) { t.Fatalf("expected read session to have prefix %q: but found %s:", expectedPrefix, session.bqSession.Name) } + // create session with different project it = table.Read(ctx, WithReadSessionProjectID("bigquery-public-data")) - _, err = countIteratorRows(it) + if it.IsAccelerated() { + t.Fatal("storage api should not be used due to lack of permissions") + } + newTotal, err := countIteratorRows(it) if err != nil { t.Fatal(err) } - if it.IsAccelerated() { - t.Fatal("expected storage api to not be used") + if total != newTotal { + t.Fatalf("expected total to be %d, but got %d", total, newTotal) } }