feat: save pipeline config to s3
niross committed Nov 23, 2023
1 parent 95926e5 commit 0e9b6bf
Showing 2 changed files with 25 additions and 1 deletion.
25 changes: 24 additions & 1 deletion dataworkspace/dataworkspace/apps/datasets/models.py
@@ -75,7 +75,7 @@
     UserAccessType,
 )
 from dataworkspace.apps.datasets.model_utils import external_model_class
-
+from dataworkspace.apps.datasets.pipelines.utils import split_schema_table
 from dataworkspace.apps.eventlog.models import EventLog
 from dataworkspace.apps.core.charts.models import ChartBuilderChart
 from dataworkspace.apps.eventlog.utils import log_event
@@ -2973,6 +2973,29 @@ def save(self, force_insert=False, force_update=False, using=None, update_fields
         self._original_table_name = self.table_name
         self._original_config = self.config
         super().save(force_insert, force_update, using, update_fields)
+        self.save_dataflow_pipeline_config()
+
+    def get_config_file_path(self):
+        return f"{settings.DATAFLOW_PIPELINES_PREFIX}/pipeline-{self.id}.json"
+
+    def save_dataflow_pipeline_config(self):
+        schema_name, table_name = split_schema_table(self.table_name)
+        client = get_s3_client()
+        client.put_object(
+            Bucket=settings.AWS_UPLOADS_BUCKET,
+            Key=self.get_config_file_path(),
+            Body=json.dumps(
+                {
+                    "id": self.id,
+                    "type": self.type,
+                    "schedule": self.schedule,
+                    "schema": schema_name,
+                    "table": table_name,
+                    "config": self.config,
+                    "enabled": True,
+                }
+            ),
+        )
 
 
 class PipelineVersion(TimeStampedModel):
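For illustration, this is roughly the document save_dataflow_pipeline_config writes to S3. The id, type, schedule, table name and config values below are hypothetical, not taken from the commit; only the field names and the split of table_name into schema/table follow the code above.

import json

# Hypothetical Pipeline field values, for illustration only.
schema_name, table_name = "public", "sales"  # split_schema_table("public.sales")
body = json.dumps(
    {
        "id": 42,  # made-up pipeline id
        "type": "sql",  # assumed example for self.type
        "schedule": "@daily",  # assumed example for self.schedule
        "schema": schema_name,
        "table": table_name,
        "config": {"sql": "SELECT 1"},  # assumed shape for self.config
        "enabled": True,
    }
)
print(body)
# {"id": 42, "type": "sql", "schedule": "@daily", "schema": "public", "table": "sales", "config": {"sql": "SELECT 1"}, "enabled": true}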
1 change: 1 addition & 0 deletions dataworkspace/dataworkspace/settings/base.py
@@ -777,6 +777,7 @@ def sort_database_config(database_list):
 LONG_RUNNING_QUERY_ALERT_THRESHOLD = env.get("LONG_RUNNING_QUERY_ALERT_THRESHOLD", "15 minutes")
 
 DATAFLOW_IMPORTS_BUCKET_ROOT = "data-flow-imports"
+DATAFLOW_PIPELINES_PREFIX = f"{DATAFLOW_IMPORTS_BUCKET_ROOT}/dw-pipelines"
 DATAFLOW_API_CONFIG = {
     "DATAFLOW_BASE_URL": env.get("DATAFLOW_BASE_URL"),
     "DATAFLOW_HAWK_ID": env.get("DATAFLOW_HAWK_ID"),
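A quick sanity check of how the new setting composes into an object key (the pipeline id 42 is made up; the bucket itself comes from AWS_UPLOADS_BUCKET, not this prefix):

DATAFLOW_IMPORTS_BUCKET_ROOT = "data-flow-imports"
DATAFLOW_PIPELINES_PREFIX = f"{DATAFLOW_IMPORTS_BUCKET_ROOT}/dw-pipelines"

# Key produced by get_config_file_path() for a hypothetical pipeline with id 42:
key = f"{DATAFLOW_PIPELINES_PREFIX}/pipeline-42.json"
assert key == "data-flow-imports/dw-pipelines/pipeline-42.json"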
