diff --git a/bigquery/storage_integration_test.go b/bigquery/storage_integration_test.go index 492561c8a488..e8c15879cdab 100644 --- a/bigquery/storage_integration_test.go +++ b/bigquery/storage_integration_test.go @@ -99,7 +99,7 @@ func TestIntegration_StorageReadClientProject(t *testing.T) { table.ProjectID = "bigquery-public-data" it := table.Read(ctx) - _, err := countIteratorRows(it) + total, err := countIteratorRows(it) if err != nil { t.Fatal(err) } @@ -112,6 +112,19 @@ func TestIntegration_StorageReadClientProject(t *testing.T) { if !strings.HasPrefix(session.bqSession.Name, expectedPrefix) { t.Fatalf("expected read session to have prefix %q: but found %s:", expectedPrefix, session.bqSession.Name) } + + // create session with different project + it = table.Read(ctx, WithReadSessionProjectID("bigquery-public-data")) + if it.IsAccelerated() { + t.Fatal("storage api should not be used due to lack of permissions") + } + newTotal, err := countIteratorRows(it) + if err != nil { + t.Fatal(err) + } + if total != newTotal { + t.Fatalf("expected total to be %d, but got %d", total, newTotal) + } } func TestIntegration_StorageReadFromSources(t *testing.T) { diff --git a/bigquery/table.go b/bigquery/table.go index 944a836d8dd6..bd1492247b80 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -967,14 +967,38 @@ func (t *Table) Delete(ctx context.Context) (err error) { }) } +type tableReadOption struct { + readSessionProjectID string +} + +// TableReadOption allows requests to alter the behavior of reading from a table. +type TableReadOption func(*tableReadOption) + +// WithReadSessionProjectID allows to create the read session with the specified project that has the necessary permissions to do so. +func WithReadSessionProjectID(project string) TableReadOption { + return func(tro *tableReadOption) { + tro.readSessionProjectID = project + } +} + // Read fetches the contents of the table. -func (t *Table) Read(ctx context.Context) *RowIterator { - return t.read(ctx, fetchPage) +func (t *Table) Read(ctx context.Context, opts ...TableReadOption) *RowIterator { + return t.read(ctx, fetchPage, opts...) } -func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator { +func (t *Table) read(ctx context.Context, pf pageFetcher, opts ...TableReadOption) *RowIterator { + tro := &tableReadOption{} + for _, o := range opts { + o(tro) + } + + // fallback to the client's project ID. + if tro.readSessionProjectID == "" { + tro.readSessionProjectID = t.c.projectID + } + if t.c.isStorageReadAvailable() { - it, err := newStorageRowIteratorFromTable(ctx, t, t.c.projectID, false) + it, err := newStorageRowIteratorFromTable(ctx, t, tro.readSessionProjectID, false) if err == nil { return it }