diff --git a/datahub-web-react/src/images/neo4j.png b/datahub-web-react/src/images/neo4j.png
new file mode 100644
index 0000000000000..b03b2a4532b3b
Binary files /dev/null and b/datahub-web-react/src/images/neo4j.png differ
diff --git a/metadata-ingestion/docs/sources/neo4j/neo4j.md b/metadata-ingestion/docs/sources/neo4j/neo4j.md
new file mode 100644
index 0000000000000..7bd3cf470e330
--- /dev/null
+++ b/metadata-ingestion/docs/sources/neo4j/neo4j.md
@@ -0,0 +1,146 @@
+## Integration Details
+
+Neo4j metadata is ingested into DataHub using
+`CALL apoc.meta.schema() YIELD value UNWIND keys(value) AS key RETURN key, value[key] AS value;`
+The returned data is parsed and displayed as Nodes and Relationships in DataHub. Each object is tagged
+to indicate whether it is a Node or a Relationship; the default tag values are 'Node' and 'Relationship', and they can be overridden in the recipe.
+
+## Metadata Ingestion Quickstart
+
+### Prerequisites
+
+In order to ingest metadata from Neo4j, you will need:
+
+* A Neo4j instance with APOC installed
+
+### Install the Plugin(s)
+
+Run the following command to install the relevant plugin: `pip install 'acryl-datahub[neo4j]'`
+
+Use the following recipe(s) to get started with ingestion.
+
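+Before running a recipe, you can optionally confirm that APOC is available and preview the metadata this connector will read by issuing the same call with the Neo4j Python driver (pulled in as part of the plugin). This is a minimal sketch only; the URI and credentials below are the placeholder values from the sample recipe and should be replaced with your own.
+
+```python
+from neo4j import GraphDatabase
+
+# Placeholder connection details -- replace with the values from your recipe.
+URI = "neo4j+ssc://host:7687"
+AUTH = ("neo4j", "password")
+
+QUERY = (
+    "CALL apoc.meta.schema() YIELD value "
+    "UNWIND keys(value) AS key "
+    "RETURN key, value[key] AS value;"
+)
+
+driver = GraphDatabase.driver(URI, auth=AUTH)
+with driver.session() as session:
+    for record in session.run(QUERY):
+        # "key" is a node label or relationship type; "value" holds its counts,
+        # properties and, for nodes, the attached relationships.
+        print(record["key"], record["value"]["type"])
+driver.close()
+```
+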
+### View All Recipe Configuration Options
+
+| Field              | Required | Default        | Description                                                               |
+|--------------------|:--------:|:--------------:|---------------------------------------------------------------------------|
+| source             |          |                |                                                                           |
+| `type`             | ✅       | `neo4j`        | A required field with a default value                                     |
+| config             |          |                |                                                                           |
+| `uri`              | ✅       | None           | The URI for the Neo4j server                                              |
+| `username`         | ✅       | None           | Neo4j Username                                                            |
+| `password`         | ✅       | None           | Neo4j Password                                                            |
+| `gms_server`       | ✅       | None           | Address for the GMS server                                                |
+| `node_tag`         | ❌       | `Node`         | The tag that will be used to show that the Neo4j object is a Node         |
+| `relationship_tag` | ❌       | `Relationship` | The tag that will be used to show that the Neo4j object is a Relationship |
+| `environment`      | ✅       | None           | Environment used when constructing URNs, e.g. `PROD`                      |
+| sink               |          |                |                                                                           |
+| `type`             | ✅       | None           | Sink type, e.g. `datahub-rest`                                            |
+| config             |          |                |                                                                           |
+| `server`           | ✅       | None           | Address of the DataHub GMS server, e.g. `http://localhost:8080`           |
+
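+Once you have saved a recipe like the one below (for example as `neo4j_recipe.yml`; the filename is only illustrative), you can run it with the DataHub CLI: `datahub ingest -c neo4j_recipe.yml`.
+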
+```yml
+source:
+  type: 'neo4j'
+  config:
+    uri: 'neo4j+ssc://host:7687'
+    username: 'neo4j'
+    password: 'password'
+    gms_server: &gms_server 'http://localhost:8080'
+    node_tag: 'Node'
+    relationship_tag: 'Relationship'
+    environment: 'PROD'
+
+sink:
+  type: "datahub-rest"
+  config:
+    server: *gms_server
+```
+
+### Sample data returned from Neo4j
+
+This is the data that is parsed and used to create Nodes and Relationships in DataHub.
+
+    Example relationship:
+    {
+        relationship_name: {
+            count: 1,
+            properties: {},
+            type: "relationship"
+        }
+    }
+
+    Example node:
+    {
+        key: Neo4j_Node,
+        value: {
+            count: 10,
+            labels: [],
+            properties: {
+                node_id: {
+                    unique: true,
+                    indexed: true,
+                    type: "STRING",
+                    existence: false
+                },
+                node_name: {
+                    unique: false,
+                    indexed: false,
+                    type: "STRING",
+                    existence: false
+                }
+            },
+            type: "node",
+            relationships: {
+                RELATIONSHIP_1: {
+                    count: 10,
+                    direction: "in",
+                    labels: ["Node_1", "Node_2", "Node_3"],
+                    properties: {
+                        relationship_name: {
+                            indexed: false,
+                            type: "STRING",
+                            existence: false,
+                            array: false
+                        },
+                        relationship_id: {
+                            indexed: false,
+                            type: "INTEGER",
+                            existence: false,
+                            array: false
+                        }
+                    }
+                },
+                RELATIONSHIP_2: {
+                    count: 10,
+                    direction: "out",
+                    labels: ["Node_4"],
+                    properties: {
+                        relationship_name: {
+                            indexed: false,
+                            type: "STRING",
+                            existence: false,
+                            array: false
+                        },
+                        relationship_id: {
+                            indexed: false,
+                            type: "INTEGER",
+                            existence: false,
+                            array: false
+                        }
+                    }
+                }
+            }
+        }
+    }
diff --git a/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml b/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml
new file mode 100644
index 0000000000000..61778ef3decef
--- /dev/null
+++ b/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml
@@ -0,0 +1,14 @@
+source:
+  type: 'neo4j'
+  config:
+    uri: 'neo4j+ssc://host:7687'
+    username: 'neo4j'
+    password: 'password'
+    node_tag: 'Node'
+    relationship_tag: 'Relationship'
+    environment: 'PROD'
+
+sink:
+  type: "datahub-rest"
+  config:
+    server: 'http://localhost:8080'
\ No newline at end of file
diff --git a/metadata-ingestion/examples/cli_usage/gen_schemas.py b/metadata-ingestion/examples/cli_usage/gen_schemas.py
index 2fd4683347a3b..80b2c6712977a 100644
--- a/metadata-ingestion/examples/cli_usage/gen_schemas.py
+++ b/metadata-ingestion/examples/cli_usage/gen_schemas.py
@@ -28,7 +28,6 @@ class CorpGroupFile(BaseModel):
 
 
 with open("user/user.dhub.yaml_schema.json", "w") as fp:
-
     fp.write(json.dumps(CorpUserFile.schema(), indent=4))
 
 with open("group/group.dhub.yaml_schema.json", "w") as fp:
diff --git a/metadata-ingestion/examples/recipes/neo4j_to_datahub.dhub.yaml b/metadata-ingestion/examples/recipes/neo4j_to_datahub.dhub.yaml
new file mode 100644
index 0000000000000..af5985b1575e2
--- /dev/null
+++ b/metadata-ingestion/examples/recipes/neo4j_to_datahub.dhub.yaml
@@ -0,0 +1,15 @@
+source:
+  type: 'neo4j'
+  config:
+    uri: 'neo4j+ssc://host:7687'
+    username: 'neo4j'
+    password: 'password'
+    gms_server: 'http://localhost:8080'
+    node_tag: 'Node'
+    relationship_tag: 'Relationship'
+    environment: 'PROD'
+
+sink:
+  type: "datahub-rest"
+  config:
+    server: 'http://localhost:8080'
\ No newline at end of file
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index b10143565efb4..a16281aa07a32 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -516,6 +516,7 @@
     "qlik-sense": sqlglot_lib | {"requests", "websocket-client"},
     "sigma": sqlglot_lib | {"requests"},
     "sac": sac,
+    "neo4j": {"pandas", "neo4j"},
 }
 
 # This is mainly used to
exclude plugins from the Docker image. @@ -657,6 +658,7 @@ "qlik-sense", "sigma", "sac", + "neo4j" ] if plugin for dependency in plugins[plugin] @@ -774,6 +776,7 @@ "qlik-sense = datahub.ingestion.source.qlik_sense.qlik_sense:QlikSenseSource", "sigma = datahub.ingestion.source.sigma.sigma:SigmaSource", "sac = datahub.ingestion.source.sac.sac:SACSource", + "neo4j = datahub.ingestion.source.neo4j.neo4j_source:Neo4jSource", ], "datahub.ingestion.transformer.plugins": [ "pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership", diff --git a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py index 060d6b00b1a12..44fd33beadec9 100644 --- a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py +++ b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py @@ -151,7 +151,6 @@ def create(file: str, graph: Optional[DataHubGraph] = None) -> None: @classmethod def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties": - structured_property: Optional[ StructuredPropertyDefinitionClass ] = graph.get_aspect(urn, StructuredPropertyDefinitionClass) diff --git a/metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py b/metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py index a135b7b6ce837..9c34c4f83b0a9 100644 --- a/metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py +++ b/metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py @@ -32,7 +32,6 @@ def __str__(self): class S3ListIterator(Iterator): - MAX_KEYS = 1000 def __init__( diff --git a/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py b/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py index 1d381acbf3dbe..98c43079a3bc1 100644 --- a/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py +++ b/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py @@ -33,7 +33,6 @@ @dataclass class ClassificationReportMixin: - num_tables_fetch_sample_values_failed: int = 0 num_tables_classification_attempted: int = 0 @@ -112,7 +111,6 @@ def classify_schema_fields( schema_metadata: SchemaMetadata, sample_data: Union[Dict[str, list], Callable[[], Dict[str, list]]], ) -> None: - if not isinstance(sample_data, Dict): try: # TODO: In future, sample_data fetcher can be lazily called if classification diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index ad293c702a520..4af41921c9fa3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -374,7 +374,6 @@ class BigQueryV2Config( StatefulProfilingConfigMixin, ClassificationSourceConfigMixin, ): - include_schema_metadata: bool = Field( default=True, description="Whether to ingest the BigQuery schema, i.e. 
projects, schemas, tables, and views.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index 907e5c12e99a1..cdd58de8b10e3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -356,7 +356,6 @@ def _process_project( project_id ) except Exception as e: - if self.config.project_ids and "not enabled BigQuery." in str(e): action_mesage = ( "The project has not enabled BigQuery API. " @@ -417,7 +416,6 @@ def _process_project_datasets( bigquery_project: BigqueryProject, db_tables: Dict[str, List[BigqueryTable]], ) -> Iterable[MetadataWorkUnit]: - db_views: Dict[str, List[BigqueryView]] = {} db_snapshots: Dict[str, List[BigqueryTableSnapshot]] = {} project_id = bigquery_project.id @@ -1184,7 +1182,6 @@ def get_tables_for_dataset( ) -> Iterable[BigqueryTable]: # In bigquery there is no way to query all tables in a Project id with PerfTimer() as timer: - # PARTITIONS INFORMATION_SCHEMA view is not available for BigLake tables # based on Amazon S3 and Blob Storage data. # https://cloud.google.com/bigquery/docs/omni-introduction#limitations diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py index afaaaf51964f8..db3f76bb64727 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py @@ -299,7 +299,6 @@ def get_workunits_internal( def deduplicate_queries( self, queries: FileBackedList[ObservedQuery] ) -> FileBackedDict[Dict[int, ObservedQuery]]: - # This fingerprint based deduplication is done here to reduce performance hit due to # repetitive sql parsing while adding observed query to aggregator that would otherwise # parse same query multiple times. In future, aggregator may absorb this deduplication. 
@@ -337,7 +336,6 @@ def deduplicate_queries( return queries_deduped def fetch_query_log(self, project: BigqueryProject) -> Iterable[ObservedQuery]: - # Multi-regions from https://cloud.google.com/bigquery/docs/locations#supported_locations regions = self.config.region_qualifiers @@ -350,7 +348,6 @@ def fetch_query_log(self, project: BigqueryProject) -> Iterable[ObservedQuery]: def fetch_region_query_log( self, project: BigqueryProject, region: str ) -> Iterable[ObservedQuery]: - # Each region needs to be a different query query_log_query = _build_enriched_query_log_query( project_id=project.id, @@ -447,7 +444,6 @@ def _build_enriched_query_log_query( start_time: datetime, end_time: datetime, ) -> str: - audit_start_time = start_time.strftime(BQ_DATETIME_FORMAT) audit_end_time = end_time.strftime(BQ_DATETIME_FORMAT) diff --git a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py index 09ce8b5b05203..ed51487ea6dab 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py @@ -371,7 +371,6 @@ def _get_schema_fields( def _get_schema_metadata( self, topic: str, platform_urn: str, is_subject: bool ) -> Optional[SchemaMetadata]: - # Process the value schema schema, fields = self._get_schema_and_fields( topic=topic, diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/config.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/config.py index 5f88cf0234947..ede7d3c3c5695 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/config.py @@ -7,7 +7,6 @@ class PathSpecsConfigMixin(ConfigModel): - path_specs: List[PathSpec] = Field( description="List of PathSpec. 
See [below](#path-spec) the details about PathSpec" ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py index de212ca9a6771..63cea45f75864 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py @@ -107,7 +107,6 @@ def _get_database_workunits( logger.info(f"Fetching database aspects starting from {from_createdon}") mcps = reader.get_aspects(from_createdon, self.report.stop_time) for i, (mcp, createdon) in enumerate(mcps): - if not self.urn_pattern.allowed(str(mcp.entityUrn)): continue diff --git a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py index acda656526ef5..9faa12d5d9bb6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py @@ -235,7 +235,6 @@ def _process_table( table_name: str, dataset_name: str, ) -> Iterable[MetadataWorkUnit]: - logger.debug(f"Processing table: {dataset_name}") table_info = dynamodb_client.describe_table(TableName=table_name)["Table"] account_id = table_info["TableArn"].split(":")[4] diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py index 3d1683100474e..1cd3c88a527cb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py @@ -307,7 +307,6 @@ def view_fields_from_dict( type_cls: ViewFieldType, populate_sql_logic_in_descriptions: bool, ) -> "ViewField": - is_primary_key = field_dict.get("primary_key", "no") == "yes" name = field_dict["name"] @@ -987,13 +986,11 @@ def from_api( # noqa: C901 field_name_vs_raw_explore_field: Dict = {} if explore.fields is not None: - if explore.fields.dimensions is not None: for dim_field in explore.fields.dimensions: if dim_field.name is None: continue else: - field_name_vs_raw_explore_field[dim_field.name] = dim_field view_fields.append( @@ -1034,7 +1031,6 @@ def from_api( # noqa: C901 if measure_field.name is None: continue else: - field_name_vs_raw_explore_field[ measure_field.name ] = measure_field diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py index e42ac7b61c177..cd8ccb8217257 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py @@ -604,7 +604,6 @@ def _get_folder_browse_path_v2_entries( def _create_platform_instance_aspect( self, ) -> DataPlatformInstance: - assert ( self.source_config.platform_name ), "Platform name is not set in the configuration." @@ -999,7 +998,6 @@ def _gen_folder_key(self, folder_id: str) -> LookerFolderKey: def _make_dashboard_and_chart_mces( self, looker_dashboard: LookerDashboard ) -> Iterable[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]: - # Step 1: Emit metadata for each Chart inside the Dashboard. 
chart_events = [] for element in looker_dashboard.dashboard_elements: diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py index 1e60c08fe00c2..6d49d57e07743 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py @@ -55,7 +55,6 @@ def _create_new_liquid_variables_with_default( current_dict: dict = new_dict for key in keys[:-1]: - if key not in current_dict: current_dict[key] = {} @@ -392,7 +391,6 @@ def process_lookml_template_language( source_config: LookMLSourceConfig, view_lkml_file_dict: dict, ) -> None: - if "views" not in view_lkml_file_dict: return @@ -425,7 +423,6 @@ def load_and_preprocess_file( path: Union[str, pathlib.Path], source_config: LookMLSourceConfig, ) -> dict: - parsed = load_lkml(path) process_lookml_template_language( diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py index ce4a242027e11..80be566cdcd46 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py @@ -320,7 +320,6 @@ def get_including_extends( self, field: str, ) -> Optional[Any]: - # According to Looker's inheritance rules, we need to merge the fields(i.e. dimensions, measures and # dimension_groups) from both the child and parent. if field in [DIMENSIONS, DIMENSION_GROUPS, MEASURES]: @@ -345,7 +344,6 @@ def _get_sql_table_name_field(self) -> Optional[str]: return self.get_including_extends(field="sql_table_name") def _is_dot_sql_table_name_present(self) -> bool: - sql_table_name: Optional[str] = self._get_sql_table_name_field() if sql_table_name is None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index e4d8dd19fb791..1096fd3fd3ccc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -143,7 +143,6 @@ def from_looker_dict( extract_col_level_lineage: bool = False, populate_sql_logic_in_descriptions: bool = False, ) -> Optional["LookerView"]: - view_name = view_context.name() logger.debug(f"Handling view {view_name} in model {model_name}") @@ -419,7 +418,6 @@ def _get_custom_properties(self, looker_view: LookerView) -> DatasetPropertiesCl def _build_dataset_mcps( self, looker_view: LookerView ) -> List[MetadataChangeProposalWrapper]: - view_urn = looker_view.id.get_urn(self.source_config) subTypeEvent = MetadataChangeProposalWrapper( @@ -503,7 +501,6 @@ def get_project_name(self, model_name: str) -> str: def get_manifest_if_present(self, folder: pathlib.Path) -> Optional[LookerManifest]: manifest_file = folder / "manifest.lkml" if manifest_file.exists(): - manifest_dict = load_and_preprocess_file( path=manifest_file, source_config=self.source_config ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 057dbca428184..7dd2f9cb20333 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ 
b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -72,7 +72,6 @@ def resolve_derived_view_urn_of_col_ref( base_folder_path: str, config: LookMLSourceConfig, ) -> List[ColumnRef]: - new_column_refs: List[ColumnRef] = [] for col_ref in column_refs: if is_derived_view(col_ref.table.lower()): @@ -641,7 +640,6 @@ def create_view_upstream( ctx: PipelineContext, reporter: LookMLSourceReport, ) -> AbstractViewUpstream: - if view_context.is_regular_case(): return RegularViewUpstream( view_context=view_context, @@ -666,7 +664,6 @@ def create_view_upstream( view_context.is_sql_based_derived_view_without_fields_case(), ] ): - return DerivedQueryUpstreamSource( view_context=view_context, config=config, diff --git a/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py index 08ed7677c7ab4..9f96f837eb9b3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metadata/lineage.py @@ -210,7 +210,6 @@ def _get_lineage_mcp( # extract the old lineage and save it for the new mcp if preserve_upstream: - client = get_default_graph() old_upstream_lineage = get_aspects_for_entity( diff --git a/metadata-ingestion/src/datahub/ingestion/source/neo4j/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/neo4j/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/src/datahub/ingestion/source/neo4j/neo4j_source.py b/metadata-ingestion/src/datahub/ingestion/source/neo4j/neo4j_source.py new file mode 100644 index 0000000000000..2060007b2c194 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/neo4j/neo4j_source.py @@ -0,0 +1,350 @@ +import logging +import time +from dataclasses import dataclass +from typing import Dict, Iterable, Optional, Type, Union + +import pandas as pd +from neo4j import GraphDatabase +from pydantic.fields import Field + +from datahub.configuration.source_common import EnvConfigMixin +from datahub.emitter.mce_builder import ( + make_data_platform_urn, + make_dataset_urn, + make_tag_urn, +) +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SupportStatus, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph +from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaFieldDataType +from datahub.metadata.schema_classes import ( + AuditStampClass, + BooleanTypeClass, + DatasetPropertiesClass, + DateTypeClass, + GlobalTagsClass, + NumberTypeClass, + OtherSchemaClass, + SchemaFieldClass, + SchemaMetadataClass, + StringTypeClass, + TagAssociationClass, + UnionTypeClass, +) + +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +_type_mapping: Dict[Union[Type, str], Type] = { + "list": UnionTypeClass, + "boolean": BooleanTypeClass, + "integer": NumberTypeClass, + "local_date_time": DateTypeClass, + "float": NumberTypeClass, + "string": StringTypeClass, + "date": DateTypeClass, + "node": StringTypeClass, + "relationship": StringTypeClass, +} + + +class Neo4jConfig(EnvConfigMixin): + username: str = Field(default=None, description="Neo4j Username") + password: str = Field(default=None, description="Neo4j 
Password") + uri: str = Field(default=None, description="The URI for the Neo4j server") + environment: str = Field(default=None, description="Neo4j env") + node_tag: str = Field( + default="Node", + description="The tag that will be used to show that the Neo4j object is a Node", + ) + relationship_tag: str = Field( + default="Relationship", + description="The tag that will be used to show that the Neo4j object is a Relationship", + ) + platform: str = Field(default="neo4j", description="Neo4j platform") + + +@dataclass +class Neo4jSourceReport(SourceReport): + obj_failures: int = 0 + obj_created: int = 0 + + +@platform_name("Neo4j", id="neo4j") +@config_class(Neo4jConfig) +@support_status(SupportStatus.CERTIFIED) +class Neo4jSource(Source): + def __init__(self, ctx: PipelineContext, config: Neo4jConfig): + self.ctx = ctx + self.config = config + self.report = Neo4jSourceReport() + + @classmethod + def create(cls, config_dict, ctx): + config = Neo4jConfig.parse_obj(config_dict) + return cls(ctx, config) + + def get_field_type(self, attribute_type: Union[type, str]) -> SchemaFieldDataType: + type_class: Optional[type] = _type_mapping.get(attribute_type) + return SchemaFieldDataType(type=type_class()) + + def get_schema_field_class( + self, col_name: str, col_type: str, **kwargs + ) -> SchemaFieldClass: + if kwargs["obj_type"] == "node" and col_type == "relationship": + col_type = "node" + else: + col_type = col_type + return SchemaFieldClass( + fieldPath=col_name, + type=self.get_field_type(col_type), + nativeDataType=col_type, + description=col_type.upper() + if col_type in ("node", "relationship") + else col_type, + lastModified=AuditStampClass( + time=round(time.time() * 1000), actor="urn:li:corpuser:ingestion" + ), + ) + + def add_properties( + self, dataset: str, description=None, custom_properties=None + ) -> MetadataChangeProposalWrapper: + dataset_properties = DatasetPropertiesClass( + description=description, + customProperties=custom_properties, + ) + return MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn( + platform=self.config.platform, name=dataset, env=self.config.environment + ), + aspect=dataset_properties, + ) + + def generate_neo4j_object( + self, platform: str, dataset: str, columns: list, obj_type=None + ) -> MetadataChangeProposalWrapper: + try: + fields = [ + self.get_schema_field_class(key, value.lower(), obj_type=obj_type) + for d in columns + for key, value in d.items() + ] + mcp = MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn( + platform=platform, name=dataset, env=self.config.environment + ), + aspect=SchemaMetadataClass( + schemaName=dataset, + platform=make_data_platform_urn(platform), + version=0, + hash="", + platformSchema=OtherSchemaClass(rawSchema=""), + lastModified=AuditStampClass( + time=round(time.time() * 1000), + actor="urn:li:corpuser:ingestion", + ), + fields=fields, + ), + systemMetadata=DatasetPropertiesClass( + customProperties={"properties": "property on object"} + ), + ) + self.report.obj_created += 1 + return mcp + except Exception as e: + log.error(e) + self.report.obj_failures += 1 + + def add_tag_to_dataset( + self, table_name: str, tag_name: str + ) -> MetadataChangeProposalWrapper: + graph = DataHubGraph( + DatahubClientConfig(server=self.ctx.pipeline_config.sink.config["server"]) + ) + dataset_urn = make_dataset_urn( + platform=self.config.platform, name=table_name, env=self.config.environment + ) + current_tags: Optional[GlobalTagsClass] = graph.get_aspect( + entity_urn=dataset_urn, + 
aspect_type=GlobalTagsClass, + ) + tag_to_add = make_tag_urn(tag_name) + tag_association_to_add = TagAssociationClass(tag=tag_to_add) + + if current_tags: + if tag_to_add not in [x.tag for x in current_tags.tags]: + current_tags.tags.append(TagAssociationClass(tag_to_add)) + else: + current_tags = GlobalTagsClass(tags=[tag_association_to_add]) + return MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=current_tags, + ) + + def get_neo4j_metadata(self, query: str) -> pd.DataFrame: + driver = GraphDatabase.driver( + self.config.uri, auth=(self.config.username, self.config.password) + ) + """ + This process retrieves the metadata for Neo4j objects using an APOC query, which returns a dictionary + with two columns: key and value. The key represents the Neo4j object, while the value contains the + corresponding metadata. + + When data is returned from Neo4j, much of the relationship metadata is stored with the relevant node's + metadata. Consequently, the objects are organized into two separate dataframes: one for nodes and one for + relationships. + + In the node dataframe, several fields are extracted and added as new columns. Similarly, in the relationship + dataframe, certain fields are parsed out, while others require metadata from the nodes dataframe. + + Once the data is parsed and these two dataframes are created, we combine a subset of their columns into a + single dataframe, which will be used to create the DataHub objects. + + See the docs for examples of metadata: metadata-ingestion/docs/sources/neo4j/neo4j.md + """ + log.info(f"{query}") + with driver.session() as session: + result = session.run(query) + data = [record for record in result] + log.info("Closing Neo4j driver") + driver.close() + + node_df = self.process_nodes(data) + rel_df = self.process_relationships(data, node_df) + + union_cols = ["key", "obj_type", "property_data_types", "description"] + df = pd.concat([node_df[union_cols], rel_df[union_cols]]) + + return df + + def process_nodes(self, data): + nodes = [record for record in data if record["value"]["type"] == "node"] + node_df = pd.DataFrame( + nodes, + columns=["key", "value"], + ) + node_df["obj_type"] = node_df["value"].apply( + lambda record: self.get_obj_type(record) + ) + node_df["relationships"] = node_df["value"].apply( + lambda record: self.get_relationships(record) + ) + node_df["properties"] = node_df["value"].apply( + lambda record: self.get_properties(record) + ) + node_df["property_data_types"] = node_df["properties"].apply( + lambda record: self.get_property_data_types(record) + ) + node_df["description"] = node_df.apply( + lambda record: self.get_node_description(record, node_df), axis=1 + ) + return node_df + + def process_relationships(self, data, node_df): + rels = [record for record in data if record["value"]["type"] == "relationship"] + rel_df = pd.DataFrame(rels, columns=["key", "value"]) + rel_df["obj_type"] = rel_df["value"].apply( + lambda record: self.get_obj_type(record) + ) + rel_df["properties"] = rel_df["value"].apply( + lambda record: self.get_properties(record) + ) + rel_df["property_data_types"] = rel_df["properties"].apply( + lambda record: self.get_property_data_types(record) + ) + rel_df["description"] = rel_df.apply( + lambda record: self.get_rel_descriptions(record, node_df), axis=1 + ) + return rel_df + + def get_obj_type(self, record: dict) -> str: + return record["type"] + + def get_rel_descriptions(self, record: dict, df: pd.DataFrame) -> str: + descriptions = [] + for _, row in df.iterrows(): + 
relationships = row.get("relationships", {}) + for relationship, props in relationships.items(): + if record["key"] == relationship: + if props["direction"] == "in": + for prop in props["labels"]: + descriptions.append( + f"({row['key']})-[{record['key']}]->({prop})" + ) + return "\n".join(descriptions) + + def get_node_description(self, record: dict, df: pd.DataFrame) -> str: + descriptions = [] + for _, row in df.iterrows(): + if record["key"] == row["key"]: + for relationship, props in row["relationships"].items(): + direction = props["direction"] + for node in set(props["labels"]): + if direction == "in": + descriptions.append( + f"({row['key']})<-[{relationship}]-({node})" + ) + elif direction == "out": + descriptions.append( + f"({row['key']})-[{relationship}]->({node})" + ) + + return "\n".join(descriptions) + + def get_property_data_types(self, record: dict) -> list[dict]: + return [{k: v["type"]} for k, v in record.items()] + + def get_properties(self, record: dict) -> str: + return record["properties"] + + def get_relationships(self, record: dict) -> dict: + return record.get("relationships", None) + + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + df = self.get_neo4j_metadata( + "CALL apoc.meta.schema() YIELD value UNWIND keys(value) AS key RETURN key, value[key] AS value;" + ) + for index, row in df.iterrows(): + try: + yield MetadataWorkUnit( + id=row["key"], + mcp_raw=self.generate_neo4j_object( + columns=row["property_data_types"], + dataset=row["key"], + platform=self.config.platform, + ), + ) + + yield MetadataWorkUnit( + id=row["key"], + mcp=self.add_tag_to_dataset( + table_name=row["key"], + tag_name=self.config.node_tag + if row["obj_type"] == "node" + else self.config.relationship_tag, + ), + ) + + yield MetadataWorkUnit( + id=row["key"], + mcp=self.add_properties( + dataset=row["key"], + custom_properties=None, + description=row["description"], + ), + ) + + except Exception as e: + raise e + + def get_report(self): + return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py index 7072ebf6473df..f55d7a883edef 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py @@ -464,7 +464,6 @@ def report_dropped(self, ent_name: str) -> None: @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.LINEAGE_COARSE, "Supported. 
See docs for limitations") class NifiSource(Source): - config: NifiSourceConfig report: NifiSourceReport diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py index 27efad6dc21ca..43c6c9aacd9ce 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py @@ -62,7 +62,6 @@ def parse_custom_sql( env: str, platform_instance: Optional[str], ) -> Optional["SqlParsingResult"]: - logger.debug("Using sqlglot_lineage to parse custom sql") sql_query = remove_special_characters(query) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py index a61ad8d289b9d..6b16d3d4f3c1f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py @@ -62,7 +62,6 @@ def get_upstream_tables( config: PowerBiDashboardSourceConfig, parameters: Dict[str, str] = {}, ) -> List[resolver.Lineage]: - if table.expression is None: logger.debug(f"There is no M-Query expression in table {table.full_name}") return [] diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py index cc51fcee14104..2b33d3e4f9b2c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py @@ -64,7 +64,6 @@ def urn_creator( server: str, qualified_table_name: str, ) -> str: - platform_detail: PlatformDetail = platform_instance_resolver.get_platform_instance( PowerBIPlatformDetail( data_platform_pair=data_platform_pair, @@ -162,7 +161,6 @@ def create_reference_table( arg_list: Tree, table_detail: Dict[str, str], ) -> Optional[ReferencedTable]: - arguments: List[str] = tree_function.strip_char_from_list( values=tree_function.remove_whitespaces_from_list( tree_function.token_values(arg_list) @@ -202,7 +200,6 @@ def create_reference_table( def parse_custom_sql( self, query: str, server: str, database: Optional[str], schema: Optional[str] ) -> Lineage: - dataplatform_tables: List[DataPlatformTable] = [] platform_detail: PlatformDetail = ( @@ -707,7 +704,6 @@ def create_urn_using_old_parser( def create_lineage( self, data_access_func_detail: DataAccessFunctionDetail ) -> Lineage: - arguments: List[str] = tree_function.strip_char_from_list( values=tree_function.remove_whitespaces_from_list( tree_function.token_values(data_access_func_detail.arg_list) @@ -813,7 +809,6 @@ def form_qualified_table_name( table_reference: ReferencedTable, data_platform_pair: DataPlatformPair, ) -> str: - platform_detail: PlatformDetail = ( self.platform_instance_resolver.get_platform_instance( PowerBIPlatformDetail( diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py index e137f175c15ad..b38302c6aa225 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py @@ -586,7 +586,6 @@ def _fill_metadata_from_scan_result( return workspaces 
def _fill_independent_datasets(self, workspace: Workspace) -> None: - reachable_datasets: List[str] = [] # Find out reachable datasets for dashboard in workspace.dashboards: diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py index 8854f9ff48348..2a247d0c63957 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py @@ -126,7 +126,6 @@ def log_http_error(e: BaseException, message: str) -> Any: def get_response_dict(response: requests.Response, error_message: str) -> dict: - result_dict: dict = {} try: response.raise_for_status() diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 76030cea98494..4bc4c1451c262 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -436,7 +436,6 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit def _extract_metadata( self, connection: redshift_connector.Connection, database: str ) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: - yield from self.gen_database_container( database=database, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index e8c70260ebc7c..1863663f98bb2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -804,7 +804,6 @@ def get_dir_to_process( protocol: str, min: bool = False, ) -> List[str]: - # if len(path_spec.include.split("/")) == len(f"{protocol}{bucket_name}/{folder}".split("/")): # return [f"{protocol}{bucket_name}/{folder}"] diff --git a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py index de0904107b9bb..cc5756397aaac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sac/sac.py @@ -404,7 +404,6 @@ def get_model_workunits( columns = self.get_import_data_model_columns(model_id=model.model_id) for column in columns: - schema_field = SchemaFieldClass( fieldPath=column.name, type=self.get_schema_field_data_type(column), diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py index 6f9c9259b2784..4a03717754ec2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py @@ -234,7 +234,6 @@ def populate_known_query_lineage( def get_known_query_lineage( self, query: Query, dataset_name: str, db_row: UpstreamLineageEdge ) -> Optional[KnownQueryLineageInfo]: - if not db_row.UPSTREAM_TABLES: return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/cockroachdb.py b/metadata-ingestion/src/datahub/ingestion/source/sql/cockroachdb.py index 5356cee7f6ea3..76b72d8e37f74 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/cockroachdb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/cockroachdb.py @@ -28,7 +28,6 @@ class 
CockroachDBConfig(PostgresConfig): @capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") class CockroachDBSource(PostgresSource): - config: CockroachDBConfig def __init__(self, config: CockroachDBConfig, ctx: PipelineContext): diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py b/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py index 3823c123d06cf..39d7c4ceddf33 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py @@ -178,7 +178,6 @@ def get_table_names(self, schema: Optional[str] = None) -> List[str]: ] def get_view_names(self, schema: Optional[str] = None) -> List[str]: - schema = self._inspector_instance.dialect.denormalize_name( schema or self.default_schema_name ) @@ -200,7 +199,6 @@ def get_view_names(self, schema: Optional[str] = None) -> List[str]: def get_columns( self, table_name: str, schema: Optional[str] = None, dblink: str = "" ) -> List[dict]: - denormalized_table_name = self._inspector_instance.dialect.denormalize_name( table_name ) @@ -344,7 +342,6 @@ def get_columns( return columns def get_table_comment(self, table_name: str, schema: Optional[str] = None) -> Dict: - denormalized_table_name = self._inspector_instance.dialect.denormalize_name( table_name ) @@ -416,7 +413,6 @@ def _get_constraint_data( def get_pk_constraint( self, table_name: str, schema: Optional[str] = None, dblink: str = "" ) -> Dict: - denormalized_table_name = self._inspector_instance.dialect.denormalize_name( table_name ) @@ -458,7 +454,6 @@ def get_pk_constraint( def get_foreign_keys( self, table_name: str, schema: Optional[str] = None, dblink: str = "" ) -> List: - denormalized_table_name = self._inspector_instance.dialect.denormalize_name( table_name ) @@ -540,7 +535,6 @@ def fkey_rec(): def get_view_definition( self, view_name: str, schema: Optional[str] = None ) -> Union[str, None]: - denormalized_view_name = self._inspector_instance.dialect.denormalize_name( view_name ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py b/metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py index 318395d4e66b2..2b10ca1fa57ed 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py @@ -146,7 +146,11 @@ def urn_count(self) -> int: def compute_percent_entities_changed( new_entities: List[str], old_entities: List[str] ) -> float: - (overlap_count, old_count, _,) = _get_entity_overlap_and_cardinalities( + ( + overlap_count, + old_count, + _, + ) = _get_entity_overlap_and_cardinalities( new_entities=new_entities, old_entities=old_entities ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index c1d899f11f2e1..0eafdb4ad23ba 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -2117,7 +2117,6 @@ def parse_custom_sql( def _enrich_database_tables_with_parsed_schemas( self, parsing_result: SqlParsingResult ) -> None: - in_tables_schemas: Dict[ str, Set[str] ] = transform_parsing_result_to_in_tables_schemas(parsing_result) diff --git 
a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py index ce224bde003fd..bb1c297513de1 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py @@ -105,7 +105,6 @@ class SimpleAddDatasetDataProduct(AddDatasetDataProduct): """Transformer that adds a specified dataproduct entity for provided dataset as its asset.""" def __init__(self, config: SimpleDatasetDataProductConfig, ctx: PipelineContext): - generic_config = AddDatasetDataProductConfig( get_data_product_to_add=lambda dataset_urn: config.dataset_to_data_product_urns.get( dataset_urn diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py index ef6ef43fa2d7f..c60f4dca28882 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py @@ -67,7 +67,6 @@ def transform_aspect( def handle_end_of_stream( self, ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: - mcps: List[ Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass] ] = [] diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py index 245a3aa3d9db1..212e018dd64fb 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py @@ -105,7 +105,6 @@ def convert_tag_as_per_mapping(self, tag: str) -> str: def handle_end_of_stream( self, ) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: - return self.owner_mcps def transform_aspect( diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/replace_external_url.py b/metadata-ingestion/src/datahub/ingestion/transformer/replace_external_url.py index 57af10d1040c8..f6847f234aefe 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/replace_external_url.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/replace_external_url.py @@ -103,7 +103,6 @@ def create( def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: - in_container_properties_aspect: ContainerPropertiesClass = cast( ContainerPropertiesClass, aspect ) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/tags_to_terms.py b/metadata-ingestion/src/datahub/ingestion/transformer/tags_to_terms.py index 338f191c0829d..7e6125079f16e 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/tags_to_terms.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/tags_to_terms.py @@ -84,7 +84,6 @@ def get_tags_from_schema_metadata( def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: - in_glossary_terms: Optional[GlossaryTermsClass] = cast( Optional[GlossaryTermsClass], aspect ) diff --git a/metadata-ingestion/src/datahub/integrations/assertion/snowflake/metric_sql_generator.py b/metadata-ingestion/src/datahub/integrations/assertion/snowflake/metric_sql_generator.py index 5b079129e0a9c..facc7d107d1ba 100644 --- 
a/metadata-ingestion/src/datahub/integrations/assertion/snowflake/metric_sql_generator.py +++ b/metadata-ingestion/src/datahub/integrations/assertion/snowflake/metric_sql_generator.py @@ -72,7 +72,6 @@ def _(self, assertion: FixedIntervalFreshnessAssertion) -> str: @metric_sql.register def _(self, assertion: RowCountTotalVolumeAssertion) -> str: - # Can not use information schema here due to error - # Data metric function body cannot refer to the non-deterministic function 'CURRENT_DATABASE_MAIN_METASTORE_ID'. diff --git a/metadata-ingestion/src/datahub/specific/dashboard.py b/metadata-ingestion/src/datahub/specific/dashboard.py index 8228dbc011db2..f57df15914369 100644 --- a/metadata-ingestion/src/datahub/specific/dashboard.py +++ b/metadata-ingestion/src/datahub/specific/dashboard.py @@ -433,7 +433,6 @@ def set_description(self, description: str) -> "DashboardPatchBuilder": def add_custom_properties( self, custom_properties: Optional[Dict[str, str]] = None ) -> "DashboardPatchBuilder": - if custom_properties: for key, value in custom_properties.items(): self.custom_properties_patch_helper.add_property(key, value) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index f0496379c45b8..036d57c94ee96 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -831,7 +831,6 @@ def add_preparsed_query( session_has_temp_tables: bool = True, _is_internal: bool = False, ) -> None: - # Adding tool specific metadata extraction here allows it # to work for both ObservedQuery and PreparsedQuery as # add_preparsed_query it used within add_observed_query. diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index b635f8cb47b6d..265d3f027f028 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -880,7 +880,6 @@ def _sqlglot_lineage_inner( default_schema: Optional[str] = None, default_dialect: Optional[str] = None, ) -> SqlParsingResult: - if not default_dialect: dialect = get_dialect(schema_resolver.platform) else: diff --git a/metadata-ingestion/src/datahub/sql_parsing/tool_meta_extractor.py b/metadata-ingestion/src/datahub/sql_parsing/tool_meta_extractor.py index cdd35c23e3088..0d85002776e5e 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/tool_meta_extractor.py +++ b/metadata-ingestion/src/datahub/sql_parsing/tool_meta_extractor.py @@ -79,7 +79,6 @@ def _extract_mode_query(self, entry: QueryLog) -> bool: return True def extract_bi_metadata(self, entry: QueryLog) -> bool: - for tool, meta_extractor in self.known_tool_extractors: try: if meta_extractor(entry): diff --git a/metadata-ingestion/src/datahub/testing/mcp_diff.py b/metadata-ingestion/src/datahub/testing/mcp_diff.py index 95b8e83c7a64a..5e669a718e9ad 100644 --- a/metadata-ingestion/src/datahub/testing/mcp_diff.py +++ b/metadata-ingestion/src/datahub/testing/mcp_diff.py @@ -206,7 +206,7 @@ def apply_delta(self, golden: List[Dict[str, Any]]) -> None: """ aspect_diffs = [v for d in self.aspect_changes.values() for v in d.values()] for aspect_diff in aspect_diffs: - for (_, old, new) in aspect_diff.aspects_changed.keys(): + for _, old, new in aspect_diff.aspects_changed.keys(): golden[old.delta_info.idx] = new.delta_info.original indices_to_remove = set() diff --git 
a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index 4ea42d568da63..17023c7b388e7 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -43,7 +43,6 @@ def _make_owner_category_list( owner_category_urn: Optional[str], owner_ids: List[str], ) -> List[Dict]: - return [ { "urn": mce_builder.make_owner_urn(owner_id, owner_type), @@ -285,7 +284,6 @@ def convert_to_aspects(self, operation_map: Dict[str, list]) -> Dict[str, Any]: aspect_map[Constants.ADD_TAG_OPERATION] = tag_aspect if Constants.ADD_OWNER_OPERATION in operation_map: - owner_aspect = OwnershipClass( owners=[ OwnerClass( diff --git a/metadata-ingestion/src/datahub/utilities/threaded_iterator_executor.py b/metadata-ingestion/src/datahub/utilities/threaded_iterator_executor.py index 216fa155035d3..04dee0df42237 100644 --- a/metadata-ingestion/src/datahub/utilities/threaded_iterator_executor.py +++ b/metadata-ingestion/src/datahub/utilities/threaded_iterator_executor.py @@ -19,7 +19,6 @@ def process( args_list: Iterable[Tuple[Any, ...]], max_workers: int, ) -> Generator[T, None, None]: - out_q: queue.Queue[T] = queue.Queue() def _worker_wrapper( diff --git a/metadata-ingestion/tests/integration/azure_ad/test_azure_ad.py b/metadata-ingestion/tests/integration/azure_ad/test_azure_ad.py index 7005bc2e4411b..024bb62bbe9ce 100644 --- a/metadata-ingestion/tests/integration/azure_ad/test_azure_ad.py +++ b/metadata-ingestion/tests/integration/azure_ad/test_azure_ad.py @@ -68,7 +68,6 @@ def run_ingest( "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", mock_datahub_graph, ) as mock_checkpoint: - mock_checkpoint.return_value = mock_datahub_graph mocked_functions_reference( diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py index ef846f698f156..806779475dea9 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery_queries.py @@ -47,7 +47,6 @@ def _generate_queries_cached_file(tmp_path: Path, queries_json_path: Path) -> No @patch("google.cloud.bigquery.Client") @patch("google.cloud.resourcemanager_v3.ProjectsClient") def test_queries_ingestion(project_client, client, pytestconfig, monkeypatch, tmp_path): - test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2" mcp_golden_path = f"{test_resources_dir}/bigquery_queries_mcps_golden.json" mcp_output_path = tmp_path / "bigquery_queries_mcps.json" diff --git a/metadata-ingestion/tests/integration/looker/test_looker.py b/metadata-ingestion/tests/integration/looker/test_looker.py index 7238a49cb37d2..8bbf14709ff9f 100644 --- a/metadata-ingestion/tests/integration/looker/test_looker.py +++ b/metadata-ingestion/tests/integration/looker/test_looker.py @@ -1047,7 +1047,6 @@ def test_independent_soft_deleted_looks( mocked_client = mock.MagicMock() with mock.patch("looker_sdk.init40") as mock_sdk: - mock_sdk.return_value = mocked_client setup_mock_look(mocked_client) setup_mock_soft_deleted_look(mocked_client) diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index 94b3b103d0548..3185d3cac53a5 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ 
b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -833,7 +833,6 @@ def test_manifest_parser(pytestconfig: pytest.Config) -> None: @freeze_time(FROZEN_TIME) def test_duplicate_field_ingest(pytestconfig, tmp_path, mock_time): - test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out_file = "duplicate_ingest_mces_output.json" diff --git a/metadata-ingestion/tests/integration/okta/test_okta.py b/metadata-ingestion/tests/integration/okta/test_okta.py index 63ef8793caddd..10148273c9366 100644 --- a/metadata-ingestion/tests/integration/okta/test_okta.py +++ b/metadata-ingestion/tests/integration/okta/test_okta.py @@ -58,14 +58,12 @@ def run_ingest( mocked_functions_reference, recipe, ): - with patch( "datahub.ingestion.source.identity.okta.OktaClient" ) as MockClient, patch( "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", mock_datahub_graph, ) as mock_checkpoint: - mock_checkpoint.return_value = mock_datahub_graph mocked_functions_reference(MockClient=MockClient) @@ -277,7 +275,6 @@ def overwrite_group_in_mocked_data(test_resources_dir, MockClient): def _init_mock_okta_client( test_resources_dir, MockClient, mock_users_json=None, mock_groups_json=None ): - okta_users_json_file = ( test_resources_dir / "okta_users.json" if mock_users_json is None diff --git a/metadata-ingestion/tests/integration/oracle/common.py b/metadata-ingestion/tests/integration/oracle/common.py index 79dbda8c30f89..9e2cc42ef1025 100644 --- a/metadata-ingestion/tests/integration/oracle/common.py +++ b/metadata-ingestion/tests/integration/oracle/common.py @@ -33,7 +33,6 @@ def scalar(self): @dataclass class MockConstraints: - constraint_name: str = "mock constraint name" constraint_type: str = "P" local_column: str = "mock column name" diff --git a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py index 0f360d44c38cb..4d69ebeaf588e 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py +++ b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py @@ -467,7 +467,6 @@ def test_scan_all_workspaces( mock_time: datetime.datetime, requests_mock: Any, ) -> None: - test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) @@ -517,7 +516,6 @@ def test_extract_reports( mock_time: datetime.datetime, requests_mock: Any, ) -> None: - enable_logging() test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" @@ -1219,7 +1217,6 @@ def test_independent_datasets_extraction( mock_time: datetime.datetime, requests_mock: Any, ) -> None: - test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api( @@ -1323,7 +1320,6 @@ def test_cll_extraction( mock_time: datetime.datetime, requests_mock: Any, ) -> None: - test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api( @@ -1380,7 +1376,6 @@ def test_cll_extraction_flags( mock_time: datetime.datetime, requests_mock: Any, ) -> None: - register_mock_api( pytestconfig=pytestconfig, request_mock=requests_mock, @@ -1392,7 +1387,6 @@ def test_cll_extraction_flags( ) with pytest.raises(Exception, match=pattern): - Pipeline.create( { "run_id": "powerbi-test", diff --git a/metadata-ingestion/tests/integration/qlik_sense/test_qlik_sense.py b/metadata-ingestion/tests/integration/qlik_sense/test_qlik_sense.py index ee1aafb6cf32d..95f096cc3def3 100644 --- 
+++ b/metadata-ingestion/tests/integration/qlik_sense/test_qlik_sense.py
@@ -1011,7 +1011,6 @@ def default_config():
 def test_qlik_sense_ingest(
     pytestconfig, tmp_path, requests_mock, mock_websocket_send_request
 ):
-
     test_resources_dir = pytestconfig.rootpath / "tests/integration/qlik_sense"
 
     register_mock_api(request_mock=requests_mock)
@@ -1051,7 +1050,6 @@ def test_qlik_sense_ingest(
 def test_platform_instance_ingest(
     pytestconfig, tmp_path, requests_mock, mock_websocket_send_request
 ):
-
     test_resources_dir = pytestconfig.rootpath / "tests/integration/qlik_sense"
 
     register_mock_api(request_mock=requests_mock)
diff --git a/metadata-ingestion/tests/integration/sigma/test_sigma.py b/metadata-ingestion/tests/integration/sigma/test_sigma.py
index 6c01bf6dc80fe..19fa1448fee59 100644
--- a/metadata-ingestion/tests/integration/sigma/test_sigma.py
+++ b/metadata-ingestion/tests/integration/sigma/test_sigma.py
@@ -420,7 +420,6 @@ def register_mock_api(request_mock: Any, override_data: dict = {}) -> None:
 
 @pytest.mark.integration
 def test_sigma_ingest(pytestconfig, tmp_path, requests_mock):
-
     test_resources_dir = pytestconfig.rootpath / "tests/integration/sigma"
 
     register_mock_api(request_mock=requests_mock)
@@ -464,7 +463,6 @@ def test_sigma_ingest(pytestconfig, tmp_path, requests_mock):
 
 @pytest.mark.integration
 def test_platform_instance_ingest(pytestconfig, tmp_path, requests_mock):
-
     test_resources_dir = pytestconfig.rootpath / "tests/integration/sigma"
 
     register_mock_api(request_mock=requests_mock)
@@ -510,7 +508,6 @@ def test_platform_instance_ingest(pytestconfig, tmp_path, requests_mock):
 
 @pytest.mark.integration
 def test_sigma_ingest_shared_entities(pytestconfig, tmp_path, requests_mock):
-
     test_resources_dir = pytestconfig.rootpath / "tests/integration/sigma"
 
     override_data = {
diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py
index 8f45be96625a4..9e4bb2f0eb634 100644
--- a/metadata-ingestion/tests/integration/snowflake/common.py
+++ b/metadata-ingestion/tests/integration/snowflake/common.py
@@ -441,7 +441,6 @@ def default_query_results(  # noqa: C901
             include_column_lineage=True,
         ),
     ):
-
         return [
             {
                 "DOWNSTREAM_TABLE_NAME": f"TEST_DB.TEST_SCHEMA.TABLE_{op_idx}",
diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
index edfc41616e44b..c3ea81905455a 100644
--- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
+++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
@@ -277,7 +277,6 @@ def mock_sdk_client(
     datasources_side_effect: List[dict],
     sign_out_side_effect: List[dict],
 ) -> mock.MagicMock:
-
     mock_client = mock.Mock()
     mocked_metadata = mock.Mock()
     mocked_metadata.query.side_effect = side_effect_query_metadata_response
@@ -1229,7 +1228,6 @@ def test_permission_ingestion(pytestconfig, tmp_path, mock_datahub_graph):
 @freeze_time(FROZEN_TIME)
 @pytest.mark.integration
 def test_permission_mode_switched_error(pytestconfig, tmp_path, mock_datahub_graph):
-
     with mock.patch(
         "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
         mock_datahub_graph,
diff --git a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py
index c078f1b77fd1b..b8b0563a1d24e 100644
--- a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py
+++ b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py
@@ -282,7 +282,6 @@ def register_mock_data(workspace_client):
 
 
 def mock_hive_sql(query):
-
     if query == "DESCRIBE EXTENDED `bronze_kambi`.`bet` betStatusId":
         return [
             ("col_name", "betStatusId"),
diff --git a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py
index 73b7790b62e9e..5042c78c2e7b9 100644
--- a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py
+++ b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py
@@ -16,7 +16,6 @@
 
 
 def run_test():
-
     with mock.patch("snowflake.connector.connect") as mock_connect:
         sf_connection = mock.MagicMock()
         sf_cursor = mock.MagicMock()
diff --git a/metadata-ingestion/tests/unit/api/entities/common/test_serialized_value.py b/metadata-ingestion/tests/unit/api/entities/common/test_serialized_value.py
index c9f16bbcef6fc..a72087376b78a 100644
--- a/metadata-ingestion/tests/unit/api/entities/common/test_serialized_value.py
+++ b/metadata-ingestion/tests/unit/api/entities/common/test_serialized_value.py
@@ -10,7 +10,6 @@ class MyTestModel(BaseModel):
 
 
 def test_base_model():
-
     test_base_model = MyTestModel(
         test_string_field="test_string_field",
         test_int_field=42,
@@ -31,7 +30,6 @@ def test_base_model():
 
 
 def test_dictwrapper():
-
     from datahub.metadata.schema_classes import DatasetPropertiesClass
 
     dataset_properties = DatasetPropertiesClass(
@@ -58,7 +56,6 @@ def test_dictwrapper():
 
 
 def test_raw_dictionary():
-
     test_object = {
         "test_string_field": "test_string_field",
         "test_int_field": 42,
diff --git a/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py b/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py
index cafca521ae014..c5c4a378894c3 100644
--- a/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py
+++ b/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py
@@ -104,7 +104,6 @@ def test_incremental_table_lineage(tmp_path, pytestconfig):
 
 
 def test_incremental_table_lineage_empty_upstreams(tmp_path, pytestconfig):
-
     urn = make_dataset_urn(platform, "dataset1")
     aspect = make_lineage_aspect(
         "dataset1",
diff --git a/metadata-ingestion/tests/unit/bigquery/test_bigqueryv2_usage_source.py b/metadata-ingestion/tests/unit/bigquery/test_bigqueryv2_usage_source.py
index 63de742b201a9..3247a64631da7 100644
--- a/metadata-ingestion/tests/unit/bigquery/test_bigqueryv2_usage_source.py
+++ b/metadata-ingestion/tests/unit/bigquery/test_bigqueryv2_usage_source.py
@@ -184,7 +184,6 @@ def test_bigquery_table_sanitasitation():
 
 
 def test_unquote_and_decode_unicode_escape_seq():
-
     # Test with a string that starts and ends with quotes and has Unicode escape sequences
     input_string = '"Hello \\u003cWorld\\u003e"'
     expected_output = "Hello <World>"
diff --git a/metadata-ingestion/tests/unit/redshift/test_redshift_lineage.py b/metadata-ingestion/tests/unit/redshift/test_redshift_lineage.py
index 2e3eb8fde1292..941d13be0a613 100644
--- a/metadata-ingestion/tests/unit/redshift/test_redshift_lineage.py
+++ b/metadata-ingestion/tests/unit/redshift/test_redshift_lineage.py
@@ -221,7 +221,6 @@ def mock_redshift_connection() -> MagicMock:
 
 
 def mock_graph() -> DataHubGraph:
-
     graph = MagicMock()
 
     graph._make_schema_resolver.return_value = SchemaResolver(
diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
index b1ad9eb5c15d7..2a771a9847abd 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
+++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -958,7 +958,6 @@ def test_table_lineage_via_temp_table_disordered_add(
 
 @freeze_time(FROZEN_TIME)
 def test_basic_usage(pytestconfig: pytest.Config) -> None:
-
     frozen_timestamp = parse_user_datetime(FROZEN_TIME)
     aggregator = SqlParsingAggregator(
         platform="redshift",
diff --git a/metadata-ingestion/tests/unit/test_neo4j_source.py b/metadata-ingestion/tests/unit/test_neo4j_source.py
new file mode 100644
index 0000000000000..07f41a37aa36d
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_neo4j_source.py
@@ -0,0 +1,156 @@
+import unittest
+
+import pandas as pd
+
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.source.neo4j.neo4j_source import Neo4jConfig, Neo4jSource
+
+
+class TestNeo4j(unittest.TestCase):
+    def setUp(self):
+        self.neo = Neo4jSource(Neo4jConfig(), PipelineContext(run_id="test"))
+        self.record_1 = {
+            "count": 1,
+            "labels": [],
+            "properties": {
+                "id": {
+                    "unique": True,
+                    "indexed": True,
+                    "type": "STRING",
+                    "existence": False,
+                },
+            },
+            "type": "node",
+            "relationships": {
+                "RELATIONSHIP_1": {
+                    "count": 0,
+                    "direction": "out",
+                    "labels": ["Label_1"],
+                    "properties": {},
+                }
+            },
+        }
+        self.record_2 = {
+            "count": 2,
+            "labels": [],
+            "properties": {
+                "id": {
+                    "unique": True,
+                    "indexed": True,
+                    "type": "STRING",
+                    "existence": False,
+                },
+                "amount": {
+                    "unique": True,
+                    "indexed": True,
+                    "type": "INTEGER",
+                    "existence": False,
+                },
+            },
+            "type": "node",
+            "relationships": {
+                "RELATIONSHIP_1": {
+                    "count": 0,
+                    "direction": "out",
+                    "labels": ["Label_1"],
+                    "properties": {},
+                },
+                "RELATIONSHIP_2": {
+                    "count": 1,
+                    "direction": "in",
+                    "labels": ["Label_1", "Label_2"],
+                    "properties": {},
+                },
+            },
+        }
+        self.record_3 = {"count": 3, "properties": {}, "type": "relationship"}
+        self.record_4 = {
+            "RELATIONSHIP_2": {
+                "count": 4,
+                "properties": {},
+                "type": "relationship",
+                "relationships": {
+                    "RELATIONSHIP_1": {
+                        "count": 0,
+                        "direction": "out",
+                        "labels": ["Label_1"],
+                        "properties": {},
+                    },
+                    "RELATIONSHIP_2": {
+                        "count": 1,
+                        "direction": "in",
+                        "labels": ["Label_1", "Label_2"],
+                        "properties": {},
+                    },
+                },
+            }
+        }
+
+    def create_df(self):
+        data = {
+            "key": ["item1", "item2", "item3", "RELATIONSHIP_2"],
+            "value": [
+                self.record_1,
+                self.record_2,
+                self.record_3,
+                self.record_4,
+            ],
+        }
+        df = pd.DataFrame(data)
+        return df
+
+    def test_get_obj_type(self):
+        assert self.neo.get_obj_type(self.record_1) == "node"
+        assert self.neo.get_obj_type(self.record_2) == "node"
+        assert self.neo.get_obj_type(self.record_3) == "relationship"
+
+    def test_get_relationships(self):
+        assert self.neo.get_relationships(self.record_1, self.create_df()) == {
+            "RELATIONSHIP_1": {
+                "count": 0,
+                "direction": "out",
+                "labels": ["Label_1"],
+                "properties": {},
+            }
+        }
+        assert self.neo.get_relationships(self.record_2, self.create_df()) == {
+            "RELATIONSHIP_1": {
+                "count": 0,
+                "direction": "out",
+                "labels": ["Label_1"],
+                "properties": {},
+            },
+            "RELATIONSHIP_2": {
+                "count": 1,
+                "direction": "in",
+                "labels": ["Label_1", "Label_2"],
+                "properties": {},
+            },
+        }
+        assert self.neo.get_relationships(self.record_3, self.create_df()) is None
+
+    def test_get_property_data_types(self):
+        record_1 = self.record_1.get("properties", None)
+        record_2 = self.record_2.get("properties", None)
+        assert self.neo.get_property_data_types(record_1) == [{"id": "STRING"}]
+        assert self.neo.get_property_data_types(record_2) == [
+            {"id": "STRING"},
+            {"amount": "INTEGER"},
+        ]
+
+    def test_get_properties(self):
+        assert self.neo.get_properties(self.record_1) == {
+            "id": {
+                "unique": True,
+                "indexed": True,
+                "type": "STRING",
+                "existence": False,
+            },
+        }
+        assert self.neo.get_properties(self.record_2) == self.record_2.get(
+            "properties", None
+        )
+
+
+if __name__ == "__main__":
+    unittest.main()
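
For context, the sketch below is not part of the diff above; it is a minimal, hedged example of how the `Neo4jSource` helpers exercised by these unit tests could be called by hand on an `apoc.meta.schema()`-style record. The record shape, the `key`/`value` DataFrame layout, and the bare `Neo4jConfig()` call all mirror the test fixtures; every name and value here is an illustrative placeholder rather than output from a real Neo4j instance.

```python
# Minimal sketch, assuming the datahub build from this PR is installed.
# Mirrors the fixtures in tests/unit/test_neo4j_source.py; values are placeholders.
import pandas as pd

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.neo4j.neo4j_source import Neo4jConfig, Neo4jSource

# A single node record in the shape returned by CALL apoc.meta.schema().
node_record = {
    "count": 1,
    "labels": [],
    "properties": {
        "id": {"unique": True, "indexed": True, "type": "STRING", "existence": False},
    },
    "type": "node",
    "relationships": {
        "RELATIONSHIP_1": {
            "count": 0,
            "direction": "out",
            "labels": ["Label_1"],
            "properties": {},
        }
    },
}

# The source works over a DataFrame of key/value pairs, as in create_df() above.
df = pd.DataFrame({"key": ["Node_1"], "value": [node_record]})

source = Neo4jSource(Neo4jConfig(), PipelineContext(run_id="example"))
print(source.get_obj_type(node_record))                            # "node"
print(source.get_properties(node_record))                          # the "properties" dict
print(source.get_property_data_types(node_record["properties"]))   # [{"id": "STRING"}]
print(source.get_relationships(node_record, df))                   # the "relationships" dict
```

Running `pytest metadata-ingestion/tests/unit/test_neo4j_source.py` exercises the same calls through the unittest-style test case added by this PR.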