diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
index afaaaf51964f8..40273c43f39f6 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
@@ -276,17 +276,22 @@ def get_workunits_internal(
         logger.info(f"Found {self.report.num_unique_queries} unique queries")
 
         with self.report.audit_log_load_timer, queries_deduped:
-            i = 0
-            for _, query_instances in queries_deduped.items():
+            last_log_time = datetime.now()
+            last_report_time = datetime.now()
+            for i, (_, query_instances) in enumerate(queries_deduped.items()):
                 for query in query_instances.values():
-                    if i > 0 and i % 10000 == 0:
+                    now = datetime.now()
+                    if (now - last_log_time).total_seconds() >= 60:
                         logger.info(
-                            f"Added {i} query log equeries_dedupedntries to SQL aggregator"
+                            f"Added {i} deduplicated query log entries to SQL aggregator"
                         )
+                        last_log_time = now
+
+                    if (now - last_report_time).total_seconds() >= 300:
                         if self.report.sql_aggregator:
                             logger.info(self.report.sql_aggregator.as_string())
+                        last_report_time = now
 
                     self.aggregator.add(query)
-                    i += 1
 
         yield from auto_workunit(self.aggregator.gen_metadata())