Skip to content

Commit

Permalink
[Integration][Octopus] Updated integration to ingest data from all sp…
Browse files Browse the repository at this point in the history
…aces (#958)

# Description

What - An update to octopus integration to enable it ingest data from
all spaces

Why - The previous version ingest all spaces but not data from all
spaces, api assumes default space when `SpaceId` is not provided as a
path parameter.

How - 
- Updated client to loop over the spaces to add `spaceId` parameter to
the path of all resources
- Updated webhook handling to include`spaceId` in the path to get single
resources.
 - Cached the spaces to optimize performance during the resync event. 
- Updated the delete event to use specific `Id` to prevent deleting
related resources
- Made the example url in description of `serverUrl` in the `spec.yaml`
more expressive

## Type of change

Please leave one option from the following and delete the rest:

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [x] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

<h4> All tests should be run against the port production
environment(using a testing org). </h4>

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners


### Integration testing checklist

- [x] Integration able to create all default resources from scratch
- [x] Resync able to create entities
- [x] Resync able to update entities
- [x] Resync able to detect and delete entities
- [x] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [x] Docs PR link
[here](port-labs/port-docs#1530)

### Preflight checklist

- [ ] Handled rate limiting
- [x] Handled pagination
- [x] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

![image](https://github.com/user-attachments/assets/68e81673-3f72-4b00-a14e-ac3b785a1f46)

![image](https://github.com/user-attachments/assets/64ce1589-5830-4b3b-b8dd-f5ee0927151e)

## API Documentation

Provide links to the API documentation used for this integration.
[Octopus Rest Api docs](https://octopus.com/docs/octopus-rest-api)

---------

Co-authored-by: PagesCoffy <isaac.p.coffie@gmail.com>
  • Loading branch information
oiadebayo and PeyGis authored Oct 10, 2024
1 parent 87ac4c1 commit 9fe159b
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 63 deletions.
2 changes: 1 addition & 1 deletion integrations/octopus/.port/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ configurations:
- name: serverUrl
required: true
type: url
description: The base URL of your Octopus Deploy instance. It should include the protocol (e.g., https://).
description: The base URL of your Octopus Deploy instance. It should include the protocol (e.g. <a href="https://demo.octopus.com" target="_blank">https://demo.octopus.com</a>).
- name: appHost
required: false
type: url
Expand Down
7 changes: 7 additions & 0 deletions integrations/octopus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

# Port_Ocean 0.1.16-beta (2024-10-10)

### Improvements

- Updated the integration to ingest resources from all spaces instead of the default space


# Port_Ocean 0.1.15-beta (2024-10-09)

### Improvements
Expand Down
71 changes: 46 additions & 25 deletions integrations/octopus/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from enum import StrEnum
from typing import Any, AsyncGenerator, Optional
from loguru import logger
from port_ocean.utils.cache import cache_iterator_result
from port_ocean.utils import http_async_client
from httpx import HTTPStatusError, Timeout

Expand All @@ -10,6 +12,14 @@
MAX_ITEMS_LIMITATION = 100


class ObjectKind(StrEnum):
SPACE = "space"
PROJECT = "project"
DEPLOYMENT = "deployment"
RELEASE = "release"
MACHINE = "machine"


class OctopusClient:
def __init__(self, server_url: str, octopus_api_key: str) -> None:
self.octopus_url = f"{server_url.rstrip('/')}/api/"
Expand Down Expand Up @@ -47,15 +57,17 @@ async def get_paginated_resources(
self,
kind: str,
params: Optional[dict[str, Any]] = None,
path_parameter: Optional[str] = None,
) -> AsyncGenerator[list[dict[str, Any]], None]:
"""Fetch paginated data from the Octopus Deploy API."""
endpoint = f"{path_parameter}/{kind}s" if path_parameter else f"{kind}s"
if params is None:
params = {}
params["skip"] = 0
params["take"] = PAGE_SIZE
page = 0
while True:
response = await self._send_api_request(f"{kind}s", params=params)
response = await self._send_api_request(endpoint, params=params)
items = response.get("Items", [])
last_page = response.get("LastPageNumber", 0)
yield items
Expand All @@ -70,14 +82,20 @@ async def get_paginated_resources(
page += 1

async def get_single_resource(
self, resource_kind: str, resource_id: str
self, resource_kind: str, resource_id: str, space_id: str
) -> dict[str, Any]:
"""Get a single resource by kind and ID."""
return await self._send_api_request(f"{resource_kind}/{resource_id}")
return await self._send_api_request(f"{space_id}/{resource_kind}/{resource_id}")

async def _get_all_spaces(self) -> list[dict[str, Any]]:
async def get_single_space(self, space_id: str) -> dict[str, Any]:
"""Get a single space by ID."""
return await self._send_api_request(f"{ObjectKind.SPACE}s/{space_id}")

@cache_iterator_result()
async def get_all_spaces(self) -> AsyncGenerator[list[dict[str, Any]], None]:
"""Get all spaces in the Octopus instance."""
return await self._send_api_request("spaces/all")
async for spaces in self.get_paginated_resources(ObjectKind.SPACE):
yield spaces

async def _create_subscription(
self, space_id: str, app_host: str
Expand All @@ -90,7 +108,7 @@ async def _create_subscription(
"WebhookTimeout": WEBHOOK_TIMEOUT,
},
"IsDisabled": False,
"Name": f"Port Subscription - {space_id}",
"Name": f"Port Subscription - {app_host}",
"SpaceId": space_id,
}
logger.info(
Expand All @@ -100,25 +118,28 @@ async def _create_subscription(
endpoint, json_data=subscription_data, method="POST"
)

async def create_webhook_subscription(self, app_host: str) -> dict[str, Any]:
async def create_webhook_subscription(self, app_host: str, space_id: str) -> None:
"""Create a new subscription for all spaces."""
for space in await self._get_all_spaces():
try:
response = await self._create_subscription(space["Id"], app_host)
if response.get("Id"):
logger.info(
f"Subscription created for space '{space['Id']}' with ID {response['Id']}"
)
else:
logger.error(
f"Failed to create subscription for space '{space['Id']}'"
)
except Exception as e:
logger.error(f"Unexpected error for space '{space['Id']}': {str(e)}")
return {"ok": True}
try:
response = await self._create_subscription(space_id, app_host)
if response.get("Id"):
logger.info(
f"Subscription created for space '{space_id}' with ID {response['Id']}"
)
else:
logger.error(f"Failed to create subscription for space '{space_id}'")
except Exception as e:
logger.error(f"Unexpected error for space '{space_id}': {str(e)}")

async def get_webhook_subscriptions(self) -> list[dict[str, Any]]:
async def get_webhook_subscriptions(
self,
space_id: str,
) -> AsyncGenerator[list[dict[str, Any]], None]:
"""Get existing subscriptions."""
response = await self._send_api_request("subscriptions/all")
logger.info(f"Retrieved {len(response)} subscriptions.")
return response
async for subscriptions in self.get_paginated_resources(
"subscription", path_parameter=space_id
):
logger.info(
f"Retrieved {len(subscriptions)} subscriptions for space {space_id}."
)
yield subscriptions
103 changes: 67 additions & 36 deletions integrations/octopus/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from enum import StrEnum
from typing import Any, Dict
from loguru import logger
from port_ocean.context.ocean import ocean
from port_ocean.utils.async_iterators import stream_async_iterators_tasks
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from client import OctopusClient
from client import OctopusClient, ObjectKind

TRACKED_EVENTS = [
"spaces",
Expand All @@ -14,14 +14,6 @@
]


class ObjectKind(StrEnum):
SPACE = "space"
PROJECT = "project"
DEPLOYMENT = "deployment"
RELEASE = "release"
MACHINE = "machine"


@ocean.on_start()
async def on_start() -> None:
logger.info("Starting Port Ocean Octopus integration")
Expand All @@ -48,50 +40,89 @@ async def setup_application() -> None:
)
return
octopus_client = await init_client()
existing_subscriptions = await octopus_client.get_webhook_subscriptions()
existing_webhook_uris = {
subscription.get("EventNotificationSubscription", {}).get("WebhookURI")
for subscription in existing_subscriptions
}
webhook_uri = f"{app_host}/integration/webhook"
if webhook_uri in existing_webhook_uris:
logger.info(f"Webhook already exists with URI: {webhook_uri}")
else:
await octopus_client.create_webhook_subscription(app_host)
logger.info(f"Webhook created with URI: {webhook_uri}")
async for spaces in octopus_client.get_all_spaces():
space_tasks = [
(space.get("Id"), octopus_client.get_webhook_subscriptions(space.get("Id")))
for space in spaces
if space.get("Id")
]

for space_id, task in space_tasks:
async for subscriptions in task:
existing_webhook_uris = {
subscription.get("EventNotificationSubscription", {}).get(
"WebhookURI"
)
for subscription in subscriptions
}
webhook_uri = f"{app_host}/integration/webhook"
if webhook_uri in existing_webhook_uris:
logger.info(
f"Webhook already exists with URI: {webhook_uri} for space {space_id}"
)
else:
await octopus_client.create_webhook_subscription(app_host, space_id)
logger.info(
f"Webhook created with URI: {webhook_uri} for space {space_id}"
)


@ocean.on_resync()
async def resync_resources(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
if kind == ObjectKind.SPACE:
return
octopus_client = await init_client()
async for spaces in octopus_client.get_all_spaces():
tasks = [
octopus_client.get_paginated_resources(kind, path_parameter=space["Id"])
for space in spaces
if space["Id"]
]
async for batch in stream_async_iterators_tasks(*tasks):
yield batch


@ocean.on_resync(ObjectKind.SPACE)
async def resync_spaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
octopus_client = await init_client()
async for resource_batch in octopus_client.get_paginated_resources(kind):
logger.info(f"Received length {len(resource_batch)} of {kind} ")
yield resource_batch
async for spaces in octopus_client.get_all_spaces():
logger.info(f"Received batch {len(spaces)} spaces")
yield spaces


@ocean.router.post("/webhook")
async def handle_webhook_request(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle the webhook request from Octopus Deploy.
"""
logger.debug(f"Received webhook event: {data}")
payload = data.get("Payload", {}).get("Event", {})
related_document_ids = payload.get("RelatedDocumentIds", [])
event_category = payload.get("Category", "")
space_id = payload["SpaceId"]
client = await init_client()
for resource_id in related_document_ids:
logger.info(f"Received webhook event with ID: {resource_id}")
resource_prefix = resource_id.split("-")[0].lower()
if resource_prefix in TRACKED_EVENTS:
kind = ObjectKind(resource_prefix.rstrip("s"))
try:
if event_category == "Deleted":
await ocean.unregister_raw(kind, [{"Id": resource_id}])
else:
if event_category == "Deleted":
resource_id = (
payload.get("ChangeDetails", {}).get("DocumentContext", {}).get("Id")
)
if resource_id and resource_id.split("-")[0].lower() in TRACKED_EVENTS:
kind = ObjectKind(resource_id.split("-")[0].lower().rstrip("s"))
await ocean.unregister_raw(kind, [{"Id": resource_id}])
else:
for resource_id in related_document_ids:
logger.info(f"Received webhook event with ID: {resource_id}")
resource_prefix = resource_id.split("-")[0].lower()
if resource_prefix in TRACKED_EVENTS:
if resource_prefix == ObjectKind.SPACE:
await client.get_single_space(space_id)
return {"ok": True}
kind = ObjectKind(resource_prefix.rstrip("s"))
try:
resource_data = await client.get_single_resource(
resource_prefix, resource_id
resource_prefix, resource_id, space_id
)
await ocean.register_raw(kind, [resource_data])
except Exception as e:
logger.error(f"Failed to process resource {resource_id}: {e}")
except Exception as e:
logger.error(f"Failed to process resource {resource_id}: {e}")
logger.info("Webhook event processed")
return {"ok": True}
2 changes: 1 addition & 1 deletion integrations/octopus/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "octopus"
version = "0.1.15-beta"
version = "0.1.16-beta"
description = "This integration ingest data from octopus deploy"
authors = ["Adebayo Iyanuoluwa <ioluwadunsinadebayo@gmail.com>"]

Expand Down

0 comments on commit 9fe159b

Please sign in to comment.