Skip to content

Commit

Permalink
Merge pull request #6 from remoteoss/add-bulk-inserts
Browse files Browse the repository at this point in the history
feat: add bulk inserts
  • Loading branch information
eliyarson authored Jul 4, 2024
2 parents cefb161 + e4df22c commit 21433df
Show file tree
Hide file tree
Showing 3 changed files with 877 additions and 3 deletions.
35 changes: 33 additions & 2 deletions snowflake_utils/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from enum import Enum
from functools import partial

from pydantic import BaseModel
from snowflake.connector.cursor import SnowflakeCursor
from typing_extensions import Self

from datetime import datetime, date
from .queries import connect, execute_statement
import logging

Expand Down Expand Up @@ -149,6 +148,27 @@ def get_create_table_statement(
"""

def bulk_insert(
self,
records,
full_refresh: bool = False,
) -> None:
with connect() as connection:
cursor = connection.cursor()
_execute_statement = partial(execute_statement, cursor)
_execute_statement(self.get_create_schema_statement())
_execute_statement(self.get_create_table_statement(full_refresh))
for k in records:
cols = ", ".join([k for k in records[k].keys()])
vals = ", ".join([_type_cast(v) for v in records[k].values()])
_execute_statement(
f"""
INSERT INTO {self.schema_}.{self.name}({cols})
VALUES ({vals})
"""
)
return None

def copy_into(
self,
path: str,
Expand Down Expand Up @@ -356,3 +376,14 @@ def _inserts(columns: list[Column], old_columns: dict[str, str]) -> str:
_possibly_cast(f'tmp."{c.name}"', old_columns.get(c.name), c.data_type)
for c in columns
)


def _type_cast(s: any) -> any:
if isinstance(s, (int, float)):
return str(s)
elif isinstance(s, str):
return f"'{s}'"
elif isinstance(s, (datetime, date)):
return f"'{s.isoformat()}'"
else:
return f"'{s}'"
Loading

0 comments on commit 21433df

Please sign in to comment.