feat: connector for Neo4j #11526

Open · wants to merge 33 commits into base: master
Commits (33):
4e91997 feat: connector for Neo4j (keith-fullsight, Oct 3, 2024)
d4e3d1a Merge branch 'master' into feat(ingestion/neo4j) (keith-fullsight, Oct 8, 2024)
53c2463 feat: connector for Neo4j (keith-fullsight, Oct 10, 2024)
bbccdca Merge branch 'master' into feat(ingestion/neo4j) (k-bartlett, Oct 10, 2024)
d02b3c1 feat: connector for Neo4j (keith-fullsight, Oct 10, 2024)
cbcfa2f feat: connector for Neo4j (keith-fullsight, Oct 10, 2024)
bc4cfcf feat: connector for Neo4j (keith-fullsight, Oct 11, 2024)
d8ad4bd Merge branch 'master' into feat(ingestion/neo4j) (k-bartlett, Oct 11, 2024)
e95d403 Merge branch 'datahub-project:master' into feat(ingestion/neo4j) (k-bartlett, Oct 11, 2024)
bff830a feat: connector for Neo4j (keith-fullsight, Oct 18, 2024)
94e294e Merge branch 'master' into feat(ingestion/neo4j) (keith-fullsight, Oct 18, 2024)
28614af Merge branch 'master' into feat(ingestion/neo4j) (k-bartlett, Oct 18, 2024)
5ac8fd7 feat: connector for Neo4j (keith-fullsight, Oct 21, 2024)
c31a787 Merge remote-tracking branch 'datahub_fork/feat(ingestion/neo4j)' int… (keith-fullsight, Oct 21, 2024)
9360ce6 feat: connector for Neo4j (keith-fullsight, Oct 21, 2024)
3153e6a feat(ingest/transformer/domain): Add support for on conflict do nothi… (asikowitz, Oct 18, 2024)
20aa223 fix(ingest/looker): Remove bad imports from looker_common (#11663) (feldjay, Oct 18, 2024)
c4a8001 feat(ingest/looker): include project name in model/explore properties… (hsheth2, Oct 18, 2024)
12abda4 feat(ingest/fivetran): protect against high sync volume (#11589) (hsheth2, Oct 18, 2024)
bda79bd feat(sdk):platform-resource - complex queries (#11675) (shirshanka, Oct 19, 2024)
03c9de6 fix(docs): fix businessattributes doc (#11653) (deepgarg-visa, Oct 20, 2024)
9b82a7b feat(ingest/fivetran): add safeguards on table/column lineage (#11674) (hsheth2, Oct 21, 2024)
ed7c368 fix(ui): show DataHub logo for DataHub sources in ingestion souces li… (Masterchen09, Oct 21, 2024)
44fad3a Merge branch 'master' into feat(ingestion/neo4j) (k-bartlett, Oct 21, 2024)
bde184b Merge branch 'master' into feat(ingestion/neo4j) (k-bartlett, Oct 22, 2024)
067ff52 Merge branch 'master' into feat(ingestion/neo4j) (k-bartlett, Oct 22, 2024)
aea2174 Merge branch 'master' into feat(ingestion/neo4j) (k-bartlett, Oct 22, 2024)
89b552d Merge branch 'master' into feat(ingestion/neo4j) (k-bartlett, Oct 23, 2024)
ef7390a Merge remote-tracking branch 'origin/master' (keith-fullsight, Oct 23, 2024)
c0e667d Merge remote-tracking branch 'datahub_fork/feat(ingestion/neo4j)' int… (keith-fullsight, Oct 23, 2024)
b6ed703 Merge branch 'master' into feat(ingestion/neo4j) (k-bartlett, Oct 23, 2024)
888b33c Merge branch 'master' into feat(ingestion/neo4j) (k-bartlett, Oct 30, 2024)
1e274ae Merge branch 'master' into feat(ingestion/neo4j) (k-bartlett, Nov 5, 2024)
Binary file added datahub-web-react/src/images/neo4j.png
155 changes: 155 additions & 0 deletions metadata-ingestion/docs/sources/neo4j/neo4j.md
@@ -0,0 +1,155 @@
# Neo4j

<!-- Set Support Status -->
![Certified](https://img.shields.io/badge/support%20status-certified-brightgreen)
![Incubating](https://img.shields.io/badge/support%20status-incubating-blue)
![Testing](https://img.shields.io/badge/support%20status-testing-lightgrey)

## Integration Details

<!-- Plain-language description of what this integration is meant to do. -->
<!-- Include details about where metadata is extracted from (ie. logs, source API, manifest, etc.) -->
Neo4j metadata is ingested into DataHub by running `CALL apoc.meta.data();` against the database. The returned data is parsed
and displayed as Nodes and Relationships in DataHub. Each object is tagged to describe what kind of DataHub
object it is. The defaults are 'Node' and 'Relationship'; these tag values can be overridden in the recipe.
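For reference, below is a minimal sketch of issuing this metadata query with the official `neo4j` Python driver. It is illustrative only, with placeholder connection details, and is not the connector's implementation:

```python
from neo4j import GraphDatabase

# Placeholder connection details; substitute your own instance.
URI = "neo4j+ssc://host:7687"
AUTH = ("neo4j", "password")

with GraphDatabase.driver(URI, auth=AUTH) as driver:
    with driver.session() as session:
        # apoc.meta.data() yields one row per (label, property) pair,
        # with a "type" column distinguishing nodes from relationships.
        for record in session.run("CALL apoc.meta.data();"):
            row = record.data()
            print(row.get("label"), row.get("property"), row.get("type"))
```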



## Metadata Ingestion Quickstart

### Prerequisites

To ingest metadata from Neo4j, you will need:

* A Neo4j instance with the APOC plugin installed
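You can verify that APOC is available by running `RETURN apoc.version();` against your instance (for example, in Neo4j Browser or `cypher-shell`).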


### Install the Plugin(s)

Run the following commands to install the relevant plugin(s):

`pip install 'acryl-datahub[neo4j]'`
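After installation, running `datahub check plugins` should list `neo4j` among the available sources.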


### Configure the Ingestion Recipe(s)

Use the following recipe(s) to get started with ingestion.

<details>
<summary>View All Recipe Configuration Options</summary>

| Field              | Required | Default         | Description                                                                         |
|--------------------|:--------:|:---------------:|-------------------------------------------------------------------------------------|
| source             |          |                 |                                                                                     |
| `type`             | ✅       | `neo4j`         | Must be `neo4j`                                                                     |
| config             |          |                 |                                                                                     |
| `uri`              | ✅       | None            | The URI for the Neo4j server                                                        |
| `username`         | ✅       | None            | Neo4j username                                                                      |
| `password`         | ✅       | None            | Neo4j password                                                                      |
| `gms_server`       | ✅       | None            | Address of the DataHub GMS server                                                   |
| `node_tag`         | ❌       | `Node`          | The tag applied to show that a Neo4j object is a Node                               |
| `relationship_tag` | ❌       | `Relationship`  | The tag applied to show that a Neo4j object is a Relationship                       |
| `environment`      | ✅       | None            | The environment that assets produced by this connector belong to (e.g. `PROD`)     |
| sink               |          |                 |                                                                                     |
| `type`             | ✅       | None            | The sink type (e.g. `datahub-rest`)                                                 |
| config             |          |                 |                                                                                     |
| `server`           | ✅       | None            | Address of the DataHub GMS server                                                   |

</details>


```yml
source:
  type: 'neo4j'
  config:
    uri: 'neo4j+ssc://host:7687'
    username: 'neo4j'
    password: 'password'
    gms_server: &gms_server 'http://localhost:8080'
    node_tag: 'Node'
    relationship_tag: 'Relationship'
    environment: 'PROD'

sink:
  type: "datahub-rest"
  config:
    server: *gms_server
```
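Note that this recipe uses a YAML anchor (`&gms_server` / `*gms_server`) so the GMS address is defined once and reused by the sink. Once the recipe is saved (for example as `neo4j_recipe.yml`), ingestion can be run with the DataHub CLI: `datahub ingest -c neo4j_recipe.yml`.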



### Sample Data Returned from Neo4j

The following is the raw data that is parsed and used to create Nodes and Relationships.


Example relationship:

```json
{
  "relationship_name": {
    "count": 1,
    "properties": {},
    "type": "relationship"
  }
}
```

Example node:

```json
{
  "key": "Neo4j_Node",
  "value": {
    "count": 10,
    "labels": [],
    "properties": {
      "node_id": {
        "unique": true,
        "indexed": true,
        "type": "STRING",
        "existence": false
      },
      "node_name": {
        "unique": false,
        "indexed": false,
        "type": "STRING",
        "existence": false
      }
    },
    "type": "node",
    "relationships": {
      "RELATIONSHIP_1": {
        "count": 10,
        "direction": "in",
        "labels": ["Node_1", "Node_2", "Node_3"],
        "properties": {
          "relationship_name": {
            "indexed": false,
            "type": "STRING",
            "existence": false,
            "array": false
          },
          "relationship_id": {
            "indexed": false,
            "type": "INTEGER",
            "existence": false,
            "array": false
          }
        }
      },
      "RELATIONSHIP_2": {
        "count": 10,
        "direction": "out",
        "labels": ["Node_4"],
        "properties": {
          "relationship_name": {
            "indexed": false,
            "type": "STRING",
            "existence": false,
            "array": false
          },
          "relationship_id": {
            "indexed": false,
            "type": "INTEGER",
            "existence": false,
            "array": false
          }
        }
      }
    }
  }
}
```
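As a rough illustration of how this output maps to the tagging behavior described above, the hypothetical helper below (a sketch assuming input in the key/value shape of the node example, not the connector's actual code) assigns the configured tag based on each entry's `type` field:

```python
from typing import Any, Dict, List


def classify_entries(
    rows: List[Dict[str, Any]],
    node_tag: str = "Node",
    relationship_tag: str = "Relationship",
) -> List[Dict[str, str]]:
    """Tag each metadata entry as a Node or a Relationship.

    Assumes each row has the {"key": ..., "value": {...}} shape
    shown in the node example above.
    """
    tagged = []
    for row in rows:
        obj_type = row["value"].get("type")
        tag = node_tag if obj_type == "node" else relationship_tag
        tagged.append({"name": row["key"], "tag": tag})
    return tagged
```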
15 changes: 15 additions & 0 deletions metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml
@@ -0,0 +1,15 @@
source:
  type: 'neo4j'
  config:
    uri: 'neo4j+ssc://host:7687'
    username: 'neo4j'
    password: 'password'
    gms_server: 'http://localhost:8080'
    node_tag: 'Node'
    relationship_tag: 'Relationship'
    environment: 'PROD'

sink:
  type: "datahub-rest"
  config:
    server: 'http://localhost:8080'
1 change: 0 additions & 1 deletion metadata-ingestion/examples/cli_usage/gen_schemas.py
@@ -28,7 +28,6 @@ class CorpGroupFile(BaseModel):


with open("user/user.dhub.yaml_schema.json", "w") as fp:

fp.write(json.dumps(CorpUserFile.schema(), indent=4))

with open("group/group.dhub.yaml_schema.json", "w") as fp:
15 changes: 15 additions & 0 deletions metadata-ingestion/examples/recipes/neo4j_to_datahub.dhub.yaml
@@ -0,0 +1,15 @@
source:
  type: 'neo4j'
  config:
    uri: 'neo4j+ssc://host:7687'
    username: 'neo4j'
    password: 'password'
    gms_server: 'http://localhost:8080'
    node_tag: 'Node'
    relationship_tag: 'Relationship'
    environment: 'PROD'

sink:
  type: "datahub-rest"
  config:
    server: 'http://localhost:8080'
5 changes: 5 additions & 0 deletions metadata-ingestion/setup.py
@@ -321,6 +321,8 @@
"Authlib",
}

neo4j = {"neo4j", "pandas"}

# Note: for all of these, framework_common will be added.
plugins: Dict[str, Set[str]] = {
# Sink plugins.
@@ -488,6 +490,7 @@
"qlik-sense": sqlglot_lib | {"requests", "websocket-client"},
"sigma": sqlglot_lib | {"requests"},
"sac": sac,
"neo4j": neo4j
}

# This is mainly used to exclude plugins from the Docker image.
@@ -630,6 +633,7 @@
"qlik-sense",
"sigma",
"sac",
"neo4j"
]
if plugin
for dependency in plugins[plugin]
@@ -747,6 +751,7 @@
"qlik-sense = datahub.ingestion.source.qlik_sense.qlik_sense:QlikSenseSource",
"sigma = datahub.ingestion.source.sigma.sigma:SigmaSource",
"sac = datahub.ingestion.source.sac.sac:SACSource",
"neo4j = datahub.ingestion.source.neo4j.neo4j_source:Neo4jSource",
],
"datahub.ingestion.transformer.plugins": [
"pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership",
@@ -70,7 +70,6 @@ def to_resource_info(self) -> models.PlatformResourceInfoClass:


class OpenAPIGraphClient:

ENTITY_KEY_ASPECT_MAP = {
aspect_type.ASPECT_INFO.get("keyForEntity"): name
for name, aspect_type in models.ASPECT_NAME_MAP.items()
@@ -150,7 +150,6 @@ def create(file: str, graph: Optional[DataHubGraph] = None) -> None:

@classmethod
def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":

structured_property: Optional[
StructuredPropertyDefinitionClass
] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
1 change: 0 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
@@ -32,7 +32,6 @@ def __str__(self):


class S3ListIterator(Iterator):

MAX_KEYS = 1000

def __init__(
@@ -33,7 +33,6 @@

@dataclass
class ClassificationReportMixin:

num_tables_fetch_sample_values_failed: int = 0

num_tables_classification_attempted: int = 0
@@ -112,7 +111,6 @@ def classify_schema_fields(
schema_metadata: SchemaMetadata,
sample_data: Union[Dict[str, list], Callable[[], Dict[str, list]]],
) -> None:

if not isinstance(sample_data, Dict):
try:
# TODO: In future, sample_data fetcher can be lazily called if classification
@@ -339,7 +339,6 @@ class BigQueryV2Config(
StatefulProfilingConfigMixin,
ClassificationSourceConfigMixin,
):

include_schema_metadata: bool = Field(
default=True,
description="Whether to ingest the BigQuery schema, i.e. projects, schemas, tables, and views.",
@@ -304,7 +304,6 @@ def _process_project(
project_id
)
except Exception as e:

if self.config.project_ids and "not enabled BigQuery." in str(e):
action_mesage = (
"The project has not enabled BigQuery API. "
@@ -365,7 +364,6 @@ def _process_project_datasets(
bigquery_project: BigqueryProject,
db_tables: Dict[str, List[BigqueryTable]],
) -> Iterable[MetadataWorkUnit]:

db_views: Dict[str, List[BigqueryView]] = {}
db_snapshots: Dict[str, List[BigqueryTableSnapshot]] = {}
project_id = bigquery_project.id
@@ -1004,7 +1002,6 @@ def get_tables_for_dataset(
) -> Iterable[BigqueryTable]:
# In bigquery there is no way to query all tables in a Project id
with PerfTimer() as timer:

# PARTITIONS INFORMATION_SCHEMA view is not available for BigLake tables
# based on Amazon S3 and Blob Storage data.
# https://cloud.google.com/bigquery/docs/omni-introduction#limitations
@@ -290,7 +290,6 @@ def get_workunits_internal(
def deduplicate_queries(
self, queries: FileBackedList[ObservedQuery]
) -> FileBackedDict[Dict[int, ObservedQuery]]:

# This fingerprint based deduplication is done here to reduce performance hit due to
# repetitive sql parsing while adding observed query to aggregator that would otherwise
# parse same query multiple times. In future, aggregator may absorb this deduplication.
@@ -328,7 +327,6 @@ def deduplicate_queries(
return queries_deduped

def fetch_query_log(self, project: BigqueryProject) -> Iterable[ObservedQuery]:

# Multi-regions from https://cloud.google.com/bigquery/docs/locations#supported_locations
regions = self.config.region_qualifiers

@@ -341,7 +339,6 @@ def fetch_query_log(self, project: BigqueryProject) -> Iterable[ObservedQuery]:
def fetch_region_query_log(
self, project: BigqueryProject, region: str
) -> Iterable[ObservedQuery]:

# Each region needs to be a different query
query_log_query = _build_enriched_query_log_query(
project_id=project.id,
@@ -435,7 +432,6 @@ def _build_enriched_query_log_query(
start_time: datetime,
end_time: datetime,
) -> str:

audit_start_time = start_time.strftime(BQ_DATETIME_FORMAT)
audit_end_time = end_time.strftime(BQ_DATETIME_FORMAT)

@@ -371,7 +371,6 @@ def _get_schema_fields(
def _get_schema_metadata(
self, topic: str, platform_urn: str, is_subject: bool
) -> Optional[SchemaMetadata]:

# Process the value schema
schema, fields = self._get_schema_and_fields(
topic=topic,
@@ -7,7 +7,6 @@


class PathSpecsConfigMixin(ConfigModel):

path_specs: List[PathSpec] = Field(
description="List of PathSpec. See [below](#path-spec) the details about PathSpec"
)
@@ -96,7 +96,6 @@ def _get_database_workunits(
)
mcps = reader.get_aspects(from_createdon, self.report.stop_time)
for i, (mcp, createdon) in enumerate(mcps):

if not self.urn_pattern.allowed(str(mcp.entityUrn)):
continue

@@ -235,7 +235,6 @@ def _process_table(
table_name: str,
dataset_name: str,
) -> Iterable[MetadataWorkUnit]:

logger.debug(f"Processing table: {dataset_name}")
table_info = dynamodb_client.describe_table(TableName=table_name)["Table"]
account_id = table_info["TableArn"].split(":")[4]
@@ -307,7 +307,6 @@ def view_fields_from_dict(
type_cls: ViewFieldType,
populate_sql_logic_in_descriptions: bool,
) -> "ViewField":

is_primary_key = field_dict.get("primary_key", "no") == "yes"

name = field_dict["name"]
@@ -988,13 +987,11 @@ def from_api(  # noqa: C901
field_name_vs_raw_explore_field: Dict = {}

if explore.fields is not None:

if explore.fields.dimensions is not None:
for dim_field in explore.fields.dimensions:
if dim_field.name is None:
continue
else:

field_name_vs_raw_explore_field[dim_field.name] = dim_field

view_fields.append(
@@ -1035,7 +1032,6 @@
if measure_field.name is None:
continue
else:

field_name_vs_raw_explore_field[
measure_field.name
] = measure_field