diff --git a/.env b/.env
new file mode 100644
index 0000000..57908a4
--- /dev/null
+++ b/.env
@@ -0,0 +1,5 @@
+# This file is used by the python ad api module to select the API endpoint type
+# Do not modify unless you want to test the cli tool in sandbox environment
+# environment variables defined inside a .env file
+# AWS_ENV=SANDBOX
+AWS_ENV=PRODUCTION
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4e0c20c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,11 @@
+# build system
+/.venv
+/cdk.out
+amz_stream_infra/__pycache__/
+amz_stream_cli/__pycache__/
+tests/__pycache__/
+tests/unit/__pycache__/
+
+# jetbrains
+/.idea
+/*.iml
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
index 09951d9..1bb4f21 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,5 +1,3 @@
-MIT No Attribution
-
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
diff --git a/README.md b/README.md
index 7f92204..02ee730 100644
--- a/README.md
+++ b/README.md
@@ -1,17 +1,195 @@
-## My Project
+# Amazon Marketing Stream reference implementation using AWS CDK
-TODO: Fill this README out!
+This project contains an example implementation and infrastructure code to:
-Be sure to:
+1. Provisions necessary AWS infrastructure to receive and store Amazon Marketing Stream data, as well as confirm Stream dataset subscriptions.
+2. Subscribe to datasets and manage subscriptions using a CLI.
-* Change the title in this README
-* Edit your repository description on GitHub
+## Disclaimer
+This is a reference implementation, and not the only definitive way to consume Amazon Marketing Stream data. Note that this implementation is subject to change and future releases may not be backwards compatible.
-## Security
+## Solution architecture
+![Architecture diagram](architecture.png)
-See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information.
+This application is developed using Python and AWS Cloud Development Kit (CDK).
-## License
+The application provisions the following AWS infrastructure components for each dataset and region combination:
-This library is licensed under the MIT-0 License. See the LICENSE file.
+- An [SQS queue](https://docs.aws.amazon.com/sqs/index.html) (StreamIngressQueue) that receives initial messages from Stream.
+- A [lambda](https://docs.aws.amazon.com/lambda/index.html) (StreamFanoutLambda) that identifies whether a message contains subscription details or data.
+- A second SQS queue (SubscriptionConfirmationQueue) that forwards subscription confirmation messages to a second lambda (SubscriptionConfirmationLambda) that confirms the subscription.
+- An [SNS topic](https://docs.aws.amazon.com/sns/index.html) (StreamFanoutDataTopic that) forwards data through a [KinesisDateFirehouse](https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html) (StreamStorageFirehose) to an [S3 bucket](https://docs.aws.amazon.com/s3/index.html) (StreamStorageBucket) where the data is stored.
+Note: The provisioning of each SQS queue also includes an associated [dead-letter queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html).
+
+## Supported datasets
+
+This package includes following templates for all available datasets for advertising regions NA, EU, and FE. All NA stacks will be deployed in AWS region `us-east-1`, EU stacks will be deployed in AWS region `eu-west-1`, and FE stacks will be deployed in AWS region `us-west-2` to minimize latency of message delivery. For more information on datasets, see the [Stream data guide](https://advertising.amazon.com/API/docs/en-us/amazon-marketing-stream/data-guide).
+
+* AmzStream-NA-sp-traffic
+* AmzStream-NA-sp-conversion
+* AmzStream-NA-budget-usage
+* AmzStream-NA-sd-traffic
+* AmzStream-NA-sd-conversion
+* AmzStream-EU-sp-traffic
+* AmzStream-EU-sp-conversion
+* AmzStream-EU-budget-usage
+* AmzStream-FE-sp-traffic
+* AmzStream-FE-sp-conversion
+* AmzStream-FE-budget-usage
+
+## Development prerequisites
+
+- [AWS account](https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-creating.html)
+- [AWS Cloud Development Kit (CDK)](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html)
+- Python 3.7 or later including pip and virtualenv
+
+We recommend exploring the contents of this project and familiarizing yourself with the AWS infrastructure before deploying.
+
+## Deployment steps
+
+1. Initialize your project and activate a virtualenv. The `cdk.json` file tells the CDK Toolkit how to execute your app. This project is set up like a standard Python project. The initialization process creates a virtualenv within this project, stored under the .venv directory. To create the virtualenv, it assumes that there is a `python3` executable in your path with access to the `venv` package. If the automatic creation of the virtualenv fails, you can always create the virtualenv manually once the init process completes.
+
+ **Manually create a virtualenv on MacOS and Linux**
+
+ ```
+ $ python3 -m venv .venv
+ ```
+
+ After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.
+
+ ```
+ $ source .venv/bin/activate
+ ```
+
+ **Manually create a virtualenv on Windows**
+
+ ```
+ % .venv\Scripts\activate.bat
+ ```
+
+2. Install the required dependencies.
+
+ ```
+ $ pip install -r requirements.txt
+ ```
+
+3. Synthesize the CloudFormation templates for this code.
+
+ ```
+ $ cdk synth
+ ```
+
+ To view the CloudFormation templates created by the synthesize step.
+
+ ```
+ $ cdk ls
+ ```
+
+4. Deploy CloudFormation templates.
+
+ Depending on your requirements, you can choose to deploy all CloudFormation templates or individual templates.
+
+ ```
+ $ cdk deploy --all
+ ```
+
+ or
+
+ ```
+ $ cdk deploy AmzStream-NA-sp-traffic
+ ```
+
+ At the end of deployment, your output should resemble:
+
+ ```
+ Outputs:
+ AmzStream-NA-sp-traffic.IngressIngressQueue91B67342 = arn:aws:sqs:us-east-1:2xxxxxxxxxxx:AmzStream-NA-sp-traffic-IngressQueue26236266-Jvxxxxxxxxxx
+ AmzStream-NA-sp-traffic.StorageLandingZoneBucketFE2101CB = arn:aws:s3:::amzstream-na-sp-traffic-storagelz10f6c360-1hxxxxxxxxxxx
+ Stack ARN:
+ arn:aws:cloudformation:us-east-1:2xxxxxxxxxxx:stack/AmzStream-NA-sp-traffic/57151cc0-b625-11ed-a641-12730e200e31
+ ```
+
+ Note:
+ * This example uses `AmzStream-NA-sp-traffic` as an example.
+ * `AmzStream-NA-sp-traffic.IngressIngressQueue91B67342` is the name of the example queue that will receive messages for dataset `sp-traffic` from NA region.
+ * `arn:aws:sqs:us-east-1:2xxxxxxxxxxx:AmzStream-NA-sp-traffic-IngressQueue26236266-Jvxxxxxxxxxx` is the ARN of the example queue and should be used for field `destinationArn` while calling the subscription API as listed in the [subscription step](https://advertising.amazon.com/API/docs/en-us/amazon-marketing-stream/onboarding#step-3-subscribe-to-amazon-marketing-stream-datasets) of the onboarding guide.
+ * `AmzStream-NA-sp-traffic.StorageLandingZoneBucketFE2101CB` is the name of the example S3 bucket that will store all the received messages for this dataset.
+
+## Useful CDK commands
+
+* `cdk ls` Lists all stacks in the app
+* `cdk synth` Emits the synthesized CloudFormation template
+* `cdk bootstrap` Deploys the CDK toolkit stack into an AWS environment
+* `cdk deploy` Deploys this stack to your default AWS account/region
+* `cdk diff` Compares deployed stack with current state
+* `cdk docs` Opens CDK documentation
+
+## Using the Stream subscription management command line tool
+
+We provide a Stream subscription management command line tool that supports following commands:
+
+* Create - Creates an Amazon Marketing Stream subscription.
+* Get - Gets information on a Amazon Marketing Stream subscription by ID.
+* List - Lists all Amazon Marketing Stream subscriptions associated with your Amazon Advertising API profile.
+* Update - Updates an Amazon Marketing Stream subscription by ID.
+
+In order to use the CLI, you must create a credentials.yml file with your Amazon Ads API credentials. If you don't have credentials for the Ads API, review the [Onboarding process](https://advertising.amazon.com/API/docs/en-us/onboarding/overview).
+
+### Search path for credentials.yml
+
+* macOS and Other Unix: `~/.config/python-ad-api`
+* Windows: `%APPDATA%\python-ad-api` where the APPDATA environment variable falls back to `%HOME%\AppData\Roaming` if undefined
+
+For more information, see [Python Confuse module help](https://confuse.readthedocs.io/en/latest/usage.html#search-paths).
+
+Example: `~/.config/python-ad-api/credentials.yml`
+
+```javascript
+version: '1.0'
+
+default:
+ refresh_token: 'your-refresh-token'
+ client_id: 'your-client-id'
+ client_secret: 'your-client-secret'
+ profile_id: 'your-profile-id'
+```
+
+### Querying and aggregating Stream data
+Once you start receiving Stream data in AWS, you can learn more about aggregating and querying Stream data in our documentation.
+
+- [Querying Stream data](https://advertising.amazon.com/API/docs/en-us/amazon-marketing-stream/querying-data)
+- [Aggregating and joining traffic and conversion Stream data](https://advertising.amazon.com/API/docs/en-us/amazon-marketing-stream/aggregating-data)
+
+### Using the CLI
+
+You can view instructions for using the CLI using `python -m amz_stream_cli --help`.
+
+Example:
+
+```
+% python -m amz_stream_cli --help
+
+
+ Usage: amz_stream_cli [OPTIONS] COMMAND [ARGS]...
+
+╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────────────────╮
+│ --version -v Show the application's version and exit. │
+│ --install-completion Install completion for the current shell. │
+│ --show-completion Show completion for the current shell, to copy it or customize the installation. │
+│ --help Show this message and exit. │
+╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
+╭─ Commands ─────────────────────────────────────────────────────────────────────────────────────────────────────────╮
+│ create Creates Amazon Marketing Stream subscription. │
+│ get Gets information on specific Amazon Marketing Stream subscription by ID. │
+│ list Lists all Amazon Marketing Stream subscriptions associated with your Amazon Advertising API account. │
+│ update Updates specific Amazon Marketing Stream subscription by ID. │
+╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
+```
+
+For help on individual commands, use the following:
+
+* `python -m amz_stream_cli create --help`
+* `python -m amz_stream_cli get --help`
+* `python -m amz_stream_cli list --help`
+* `python -m amz_stream_cli update --help`
diff --git a/amz_stream_cli/__init__.py b/amz_stream_cli/__init__.py
new file mode 100644
index 0000000..d3cb51d
--- /dev/null
+++ b/amz_stream_cli/__init__.py
@@ -0,0 +1,17 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+__app_name__ = "amz_stream_cli"
+__version__ = "0.0.1"
diff --git a/amz_stream_cli/__main__.py b/amz_stream_cli/__main__.py
new file mode 100644
index 0000000..d69f6b2
--- /dev/null
+++ b/amz_stream_cli/__main__.py
@@ -0,0 +1,24 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+from amz_stream_cli import cli, __app_name__
+
+
+def main():
+ cli.app(prog_name=__app_name__)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/amz_stream_cli/cli.py b/amz_stream_cli/cli.py
new file mode 100644
index 0000000..9110a4b
--- /dev/null
+++ b/amz_stream_cli/cli.py
@@ -0,0 +1,160 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+from amz_stream_cli import __app_name__, __version__
+from amz_stream_cli.stream_api import AdvertisingApiRegion, Stream, DataSet, SubscriptionUpdateEntityStatus
+from rich.console import Console
+from rich.table import Table
+from typing import Optional
+import json
+import typer
+
+
+app = typer.Typer()
+console = Console()
+
+
+def _version_callback(value: bool) -> None:
+ if value:
+ console.print(f"{__app_name__} v{__version__}")
+ raise typer.Exit()
+
+
+def _subscription_to_table(subscription: dict) -> Optional[Table]:
+ table = Table("Field", "Value")
+ for field, value in subscription.items():
+ table.add_row(field, value)
+ return table
+
+
+def _check_for_error_message_from_api(payload: dict) -> None:
+ if 'message' in payload:
+ console.print("Error message received from API:")
+ console.print(payload['message'])
+ raise typer.Exit(-1)
+
+
+@app.command(name="create",
+ short_help="Creates Amazon Marketing Stream subscription.",
+ help="""
+ Example usage:\n
+ python -m amz_stream_cli create
+ --destination-arn arn:aws:sqs:us-east-1:xxxxxxxxxxxx:AmzStream-NA-sp-traffic-IngressQueuexxxxx
+ -client-request-token my-unique-idempotency-string-token
+ --data-set-id sp-traffic
+ --notes "This is a marketing stream subscription"
+ """)
+def create_subscription(
+ api_region: AdvertisingApiRegion = typer.Option(AdvertisingApiRegion.NA, "--api-region", "-a", help="Advertising API region to use. Default is NA."),
+ destination_arn: str = typer.Option(..., "--destination-arn", "-e", help="AWS ARN of the destination endpoint associated with the subscription. Supported destination types: SQS"),
+ client_request_token: str = typer.Option(..., "--client-request-token", "-c", help="Unique value supplied by the caller used to track identical API requests. Should request be re-tried, "
+ "the caller should supply the same value."),
+ notes: str = typer.Option(None, "--notes", "-n", help="Additional details associated with the subscription."),
+ data_set_id: DataSet = typer.Option(..., "--data-set-id", "-d", help="DataSet ID to use for the subscription.")
+) -> None:
+ create_subscription_dict = {
+ "destinationArn": destination_arn,
+ "clientRequestToken": client_request_token,
+ "dataSetId": data_set_id.value
+ }
+ if notes is not None:
+ create_subscription_dict["notes"] = notes
+
+ response = Stream(marketplace=AdvertisingApiRegion.get_marketplace(api_region)) \
+ .create_subscription(body=json.dumps(create_subscription_dict))
+ _check_for_error_message_from_api(response.payload)
+ table = _subscription_to_table(response.payload)
+ console.print("Subscription has been created!")
+ console.print(table)
+
+
+@app.command(name="update",
+ short_help="Updates specific Amazon Marketing Stream subscription by ID.",
+ help="""
+ Example usage:\n
+ python -m amz_stream_cli update
+ --subscription-id amzn1.fead.xxxx.xxxxxxxxxxxx
+ --status ARCHIVED
+ --notes "Subscription archived from CLI tool"
+ """)
+def update_subscription(
+ api_region: AdvertisingApiRegion = typer.Option(AdvertisingApiRegion.NA, "--api-region", "-a", help="Advertising API region to use. Default is NA."),
+ subscription_id: str = typer.Option(..., "--subscription-id", "-s", help="Subscription ID of the subscription that will be updated."),
+ status: SubscriptionUpdateEntityStatus = typer.Option(..., "--status", "-t", help="Status to use for the subscription."),
+ notes: str = typer.Option(None, "--notes", "-n", help="Notes for the subscription update."),
+) -> None:
+ update_subscription_dict = {
+ "status": status.value
+ }
+ if notes is not None:
+ update_subscription_dict["notes"] = notes
+
+ response = Stream(marketplace=AdvertisingApiRegion.get_marketplace(api_region)) \
+ .update_subscription(subscription_id=subscription_id, body=json.dumps(update_subscription_dict))
+
+ _check_for_error_message_from_api(response.payload)
+ console.print("Subscription ID {} has been {}!".format(subscription_id, status.value))
+
+
+@app.command(name="get",
+ short_help="Gets information on specific Amazon Marketing Stream subscription by ID.",
+ help="""
+ Example usage:\n
+ python -m amz_stream_cli get
+ --subscription-id amzn1.fead.xxxx.xxxxxxxxxxxx
+ """)
+def get_subscription(
+ api_region: AdvertisingApiRegion = typer.Option(AdvertisingApiRegion.NA, "--api-region", "-a", help="Advertising API region to use. Default is NA."),
+ subscription_id: str = typer.Option(..., "--subscription-id", "-s", help="Subscription ID of the subscription to be fetched."),
+) -> None:
+ response = Stream(marketplace=AdvertisingApiRegion.get_marketplace(api_region)) \
+ .get_subscription(subscription_id=subscription_id)
+ _check_for_error_message_from_api(response.payload)
+ subscription = response.payload['subscription']
+ table = _subscription_to_table(subscription)
+ console.print(table)
+
+
+@app.command(name="list",
+ short_help="Lists all Amazon Marketing Stream subscriptions associated with your Amazon Advertising API account.",
+ help="""
+ Example usage:\n
+ python -m amz_stream_cli list \n
+ """)
+def list_subscriptions(
+ api_region: AdvertisingApiRegion = typer.Option(AdvertisingApiRegion.NA, "--api-region", "-a", help="Advertising API region to use. Default is NA.")
+) -> None:
+ response = Stream(marketplace=AdvertisingApiRegion.get_marketplace(api_region)).list_subscriptions()
+ _check_for_error_message_from_api(response.payload)
+ if 'subscriptions' in response.payload:
+ subscriptions = sorted(response.payload['subscriptions'], key=lambda d: d['status'])
+ for subscription in subscriptions:
+ console.print(_subscription_to_table(subscription))
+ else:
+ console.print("No subscriptions found!")
+ raise typer.Exit()
+
+
+@app.callback()
+def main(version: Optional[bool] = typer.Option(
+ None,
+ "--version",
+ "-v",
+ help="Show the application's version and exit.",
+ callback=_version_callback,
+ is_eager=True,
+)
+) -> None:
+ return
diff --git a/amz_stream_cli/stream_api.py b/amz_stream_cli/stream_api.py
new file mode 100644
index 0000000..37ffed5
--- /dev/null
+++ b/amz_stream_cli/stream_api.py
@@ -0,0 +1,74 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+from ad_api.base import ApiResponse, Client, Marketplaces, sp_endpoint, fill_query_params
+from amz_stream_cli import __version__
+from enum import Enum
+
+
+class AdvertisingApiRegion(str, Enum):
+ NA = "NA"
+ EU = "EU"
+ FE = "FE"
+
+ @staticmethod
+ def get_marketplace(api_region):
+ if api_region == AdvertisingApiRegion.NA:
+ return Marketplaces.NA
+ elif api_region == AdvertisingApiRegion.EU:
+ return Marketplaces.EU
+ elif api_region == AdvertisingApiRegion.FE:
+ return Marketplaces.JP
+ else:
+ raise Exception(f"Unsupported region: {api_region}")
+
+
+class DataSet(str, Enum):
+ sp_traffic = "sp-traffic"
+ sp_conversion = "sp-conversion"
+ budget_usage = "budget-usage"
+ sd_traffic = "sd-traffic"
+ sd_conversion = "sd-conversion"
+
+
+class SubscriptionUpdateEntityStatus(str, Enum):
+ archived = "ARCHIVED"
+
+
+class Stream(Client):
+ @sp_endpoint('/streams/subscriptions', method='POST')
+ def create_subscription(self, **kwargs) -> ApiResponse:
+ return self._request(kwargs.pop('path'), data=kwargs.pop('body'), params=kwargs,
+ headers=self._add_additional_cli_headers())
+
+ @sp_endpoint('/streams/subscriptions/{}', method='PUT')
+ def update_subscription(self, subscription_id, **kwargs) -> ApiResponse:
+ return self._request(fill_query_params(kwargs.pop('path'), subscription_id),
+ data=kwargs.pop('body'), params=kwargs, headers=self._add_additional_cli_headers())
+
+ @sp_endpoint('/streams/subscriptions/{}', method='GET')
+ def get_subscription(self, subscription_id, **kwargs) -> ApiResponse:
+ return self._request(fill_query_params(kwargs.pop('path'), subscription_id), params=kwargs,
+ headers=self._add_additional_cli_headers())
+
+ @sp_endpoint('/streams/subscriptions', method='GET')
+ def list_subscriptions(self, **kwargs) -> ApiResponse:
+ return self._request(kwargs.pop('path'), params=kwargs, headers=self._add_additional_cli_headers())
+
+ @staticmethod
+ def _add_additional_cli_headers():
+ additional_headers = {'x-amzn-stream-cli-version': __version__}
+ return additional_headers
+
diff --git a/amz_stream_infra/__init__.py b/amz_stream_infra/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/amz_stream_infra/infra_rollout.py b/amz_stream_infra/infra_rollout.py
new file mode 100644
index 0000000..be76072
--- /dev/null
+++ b/amz_stream_infra/infra_rollout.py
@@ -0,0 +1,28 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import aws_cdk as cdk
+
+from .stack_definitions import AmzStreamConsumerStack
+
+
+def rollout_stacks(app: cdk.App, config: dict):
+ ambassadors_config = config['ambassadors']
+ datasets_config = config['datasets']
+ installation_region_config = config['consumerStackInstallationAwsRegion']
+ for advertising_region in datasets_config:
+ for dataset_config in datasets_config[advertising_region]:
+ AmzStreamConsumerStack(app, advertising_region, installation_region_config[advertising_region],
+ dataset_config, ambassadors_config)
diff --git a/amz_stream_infra/stack_definitions.py b/amz_stream_infra/stack_definitions.py
new file mode 100644
index 0000000..a9a6233
--- /dev/null
+++ b/amz_stream_infra/stack_definitions.py
@@ -0,0 +1,268 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+from constructs import Construct
+from aws_cdk import (
+ Environment,
+ Duration,
+ Stack,
+ Tags,
+ CfnOutput,
+ aws_iam as iam,
+ aws_sqs as sqs,
+ aws_sns as sns,
+ aws_lambda as _lambda,
+ aws_lambda_event_sources as lambda_events,
+ aws_s3 as s3,
+ aws_kinesisfirehose_alpha as firehose,
+ aws_kinesisfirehose_destinations_alpha as destinations,
+ aws_lambda_event_sources as lambda_event_source
+)
+
+
+class DataSetScopedConstruct(Construct):
+ """
+ Base construct which has scoped to dataset
+ """
+ def __init__(self, scope: Construct, construct_id: str, ambassadors_config, dataset_config):
+ super().__init__(scope, construct_id)
+ self.ambassadors_config = ambassadors_config
+ self.dataset_config = dataset_config
+
+
+class AmzStreamAmbassadorsInfra(DataSetScopedConstruct):
+
+ def __init__(self, scope: Construct, construct_id: str, ambassadors_config, dataset_config) -> None:
+ super().__init__(scope, construct_id, ambassadors_config, dataset_config)
+
+ self.reviewer_principal = iam.Role.from_role_arn(self,
+ "Reviewer",
+ self.ambassadors_config['reviewerArn'],
+ mutable=False)
+
+ def grant_review(self, queue: sqs.Queue):
+ queue.grant(self.reviewer_principal, "sqs:GetQueueAttributes")
+
+
+class AmzStreamStreamDeliveryInfra(DataSetScopedConstruct):
+
+ def __init__(self, scope, construct_id, ambassadors_config, dataset_config) -> None:
+ super().__init__(scope, construct_id, ambassadors_config, dataset_config)
+
+ def grant_stream_delivery(self, queue: sqs.Queue):
+ stream_delivery_principal = iam.PrincipalWithConditions(
+ iam.ServicePrincipal("sns.amazonaws.com"),
+ {"ArnLike": {"aws:SourceArn": self.dataset_config["snsSourceArn"]}}
+ )
+ queue.grant_send_messages(stream_delivery_principal)
+
+
+class StreamIngress(DataSetScopedConstruct):
+
+ def __init__(self,
+ scope: Construct,
+ construct_id: str,
+ ambassadors_config,
+ dataset_config,
+ visibility_timeout_s: int = 60,
+ receive_message_wait_time_s: int = 20,
+ retention_period_s: int = 1209600,
+ max_receive_count: int = 10) -> None:
+ super().__init__(scope, construct_id, ambassadors_config, dataset_config)
+
+ self.ambassadors = AmzStreamAmbassadorsInfra(self, "AmzStreamAmbassadorsInfra", ambassadors_config, dataset_config)
+ self.stream_delivery_infra = AmzStreamStreamDeliveryInfra(self, "AmzStreamStreamDeliveryInfra",
+ ambassadors_config, dataset_config)
+
+ self.ingress_dlq = sqs.Queue(
+ self,
+ "Dlq",
+ visibility_timeout=Duration.seconds(visibility_timeout_s),
+ retention_period=Duration.seconds(retention_period_s)
+ )
+
+ self.ingress_queue = sqs.Queue(
+ self,
+ "Queue",
+ visibility_timeout=Duration.seconds(visibility_timeout_s),
+ receive_message_wait_time=Duration.seconds(receive_message_wait_time_s),
+ dead_letter_queue=sqs.DeadLetterQueue(
+ max_receive_count=max_receive_count,
+ queue=self.ingress_dlq
+ )
+ )
+
+ self.ingress_queue_stack_output = CfnOutput(
+ self,
+ "IngressQueue",
+ value=self.ingress_queue.queue_arn
+ )
+
+ self.ambassadors.grant_review(self.ingress_queue)
+ self.stream_delivery_infra.grant_stream_delivery(self.ingress_queue)
+
+
+class StreamFanout(DataSetScopedConstruct):
+
+ def __init__(
+ self,
+ scope: Construct,
+ construct_id: str,
+ ambassadors_config,
+ dataset_config,
+ visibility_timeout_s: int = 60,
+ max_receive_count: int = 10
+ ) -> None:
+ super().__init__(scope, construct_id, ambassadors_config, dataset_config)
+
+ self.data_fanout_topic = sns.Topic(
+ self, "DataTopic"
+ )
+
+ self.subscription_confirmation_dlq = sqs.Queue(
+ self,
+ "SubsConfirmationDlq",
+ visibility_timeout=Duration.seconds(visibility_timeout_s)
+ )
+
+ self.subscription_confirmation_queue = sqs.Queue(
+ self, "SubsConfirmationQueue",
+ visibility_timeout=Duration.seconds(visibility_timeout_s),
+ dead_letter_queue=sqs.DeadLetterQueue(
+ max_receive_count=max_receive_count,
+ queue=self.subscription_confirmation_dlq
+ )
+ )
+
+ self.fanout_lambda = _lambda.Function(
+ self, "Lambda",
+ runtime=_lambda.Runtime.PYTHON_3_9,
+ handler="stream_fanout_lambda.handler",
+ code=_lambda.Code.from_asset(path="lambda"),
+ environment={
+ "DATA_FANOUT_TOPIC_ARN": self.data_fanout_topic.topic_arn,
+ "SUBSCRIPTION_CONFIRMATION_QUEUE_URL": self.subscription_confirmation_queue.queue_url
+ }
+ )
+ self.data_fanout_topic.grant_publish(self.fanout_lambda)
+ self.subscription_confirmation_queue.grant_send_messages(self.fanout_lambda)
+
+ def subscribe_to_stream(self, stream_ingress: StreamIngress):
+ invoke_event_source = lambda_events.SqsEventSource(stream_ingress.ingress_queue)
+ self.fanout_lambda.add_event_source(invoke_event_source)
+
+
+class StreamLanding(DataSetScopedConstruct):
+ def __init__(self, scope: Construct, construct_id: str, ambassadors_config, dataset_config) -> None:
+ super().__init__(scope, construct_id, ambassadors_config, dataset_config)
+
+ self.lz_bucket = s3.Bucket(self, "LZ")
+ self.lz_bucket_output = CfnOutput(
+ self,
+ "LandingZoneBucket",
+ value=self.lz_bucket.bucket_arn
+ )
+
+ prefix = "year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
+ self.firehose = firehose.DeliveryStream(
+ self,
+ "Firehose",
+ destinations=[
+ destinations.S3Bucket(
+ self.lz_bucket,
+ data_output_prefix=f"{dataset_config['dataSetId']}/{prefix}",
+ error_output_prefix=f"errors/{dataset_config['dataSetId']}/"
+ )
+ ]
+ )
+
+ self.sns_subscriptions_role = iam.Role(
+ self,
+ "SnsSubsRole",
+ assumed_by=iam.ServicePrincipal("sns.amazonaws.com")
+ )
+ self.firehose.grant_put_records(self.sns_subscriptions_role)
+
+ def subscribe_to_fanout(self, stream_fanout: StreamFanout):
+
+ self.firehose_subscription = sns.Subscription(
+ self,
+ "FirehoseSub",
+ topic=stream_fanout.data_fanout_topic,
+ endpoint=self.firehose.delivery_stream_arn,
+ protocol=sns.SubscriptionProtocol.FIREHOSE,
+ subscription_role_arn=self.sns_subscriptions_role.role_arn,
+ raw_message_delivery=True
+ )
+
+
+class SubscriptionConfirmation(DataSetScopedConstruct):
+ def __init__(
+ self, scope:
+ Construct,
+ construct_id: str,
+ ambassadors_config,
+ dataset_config) -> None:
+ super().__init__(scope, construct_id, ambassadors_config, dataset_config)
+
+ self.confirmation_lambda = _lambda.Function(
+ self, "Lambda",
+ runtime=_lambda.Runtime.PYTHON_3_9,
+ handler="subscription_confirmation_lambda.handler",
+ code=_lambda.Code.from_asset(path="lambda")
+ )
+
+ self.confirmation_lambda.add_to_role_policy(
+ iam.PolicyStatement(
+ actions=["sns:ConfirmSubscription"],
+ resources=["*"]
+ )
+ )
+
+ def subscribe_to_fanout(self, stream_fanout: StreamFanout):
+ sqs_event_source = lambda_event_source.SqsEventSource(
+ stream_fanout.subscription_confirmation_queue
+ )
+ self.confirmation_lambda.add_event_source(sqs_event_source)
+
+
+class AmzStreamConsumerStack(Stack):
+
+ def __init__(self, scope: Construct, advertising_region, installation_region, dataset_config, ambassadors_config, **kwargs) -> None:
+ super().__init__(
+ scope,
+ f"AmzStream-{advertising_region}-{dataset_config['dataSetId']}",
+ description=f"Amazon Marketing Stream Consumer "
+ f"for Advertising region: {advertising_region} Dataset: {dataset_config['dataSetId']}",
+ env=Environment(region=installation_region),
+ **kwargs
+ )
+
+ self.stream_ingress = StreamIngress(self, "Ingress", ambassadors_config, dataset_config)
+
+ self.stream_fanout = StreamFanout(self, "Fanout", ambassadors_config, dataset_config)
+ self.stream_fanout.subscribe_to_stream(self.stream_ingress)
+
+ self.stream_storage = StreamLanding(self, "Storage", ambassadors_config, dataset_config)
+ self.stream_storage.subscribe_to_fanout(self.stream_fanout)
+
+ self.subscription_confirmation = SubscriptionConfirmation(self, "SubsConfirmation",
+ ambassadors_config, dataset_config)
+ self.subscription_confirmation.subscribe_to_fanout(self.stream_fanout)
+
+ Tags.of(self).add("data_set_id", dataset_config['dataSetId'])
+
+
+
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..2a85905
--- /dev/null
+++ b/app.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python3
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import yaml
+import aws_cdk as cdk
+from amz_stream_infra import infra_rollout
+
+try:
+ with open("stream_infrastructure_config.yml", "r") as f:
+ config = yaml.safe_load(f)
+except yaml.YAMLError as e:
+ print("Error in configuration file:", e)
+ raise e
+except FileNotFoundError as e:
+ print("Configuration file not found:", e)
+ raise e
+except Exception as e:
+ print("Unknown exception while loading config:", e)
+ raise e
+
+app = cdk.App()
+
+infra_rollout.rollout_stacks(app, config)
+
+app.synth()
diff --git a/architecture.png b/architecture.png
new file mode 100644
index 0000000..724127c
Binary files /dev/null and b/architecture.png differ
diff --git a/cdk.json b/cdk.json
new file mode 100644
index 0000000..0706ae3
--- /dev/null
+++ b/cdk.json
@@ -0,0 +1,37 @@
+{
+ "app": "python3 app.py",
+ "watch": {
+ "include": [
+ "**"
+ ],
+ "exclude": [
+ "README.md",
+ "cdk*.json",
+ "requirements*.txt",
+ "source.bat",
+ "**/__init__.py",
+ "python/__pycache__",
+ "tests"
+ ]
+ },
+ "context": {
+ "@aws-cdk/aws-apigateway:usagePlanKeyOrderInsensitiveId": true,
+ "@aws-cdk/core:stackRelativeExports": true,
+ "@aws-cdk/aws-rds:lowercaseDbIdentifier": true,
+ "@aws-cdk/aws-lambda:recognizeVersionProps": true,
+ "@aws-cdk/aws-lambda:recognizeLayerVersion": true,
+ "@aws-cdk/aws-cloudfront:defaultSecurityPolicyTLSv1.2_2021": true,
+ "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true,
+ "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true,
+ "@aws-cdk/core:checkSecretUsage": true,
+ "@aws-cdk/aws-iam:minimizePolicies": true,
+ "@aws-cdk/core:validateSnapshotRemovalPolicy": true,
+ "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true,
+ "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true,
+ "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true,
+ "@aws-cdk/core:target-partitions": [
+ "aws",
+ "aws-cn"
+ ]
+ }
+}
diff --git a/lambda/__init__.py b/lambda/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/lambda/aws_clients.py b/lambda/aws_clients.py
new file mode 100644
index 0000000..dd19ef8
--- /dev/null
+++ b/lambda/aws_clients.py
@@ -0,0 +1,19 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import boto3
+
+sns_client = boto3.client("sns")
+sqs_client = boto3.client("sqs")
diff --git a/lambda/batch.py b/lambda/batch.py
new file mode 100644
index 0000000..3f233c5
--- /dev/null
+++ b/lambda/batch.py
@@ -0,0 +1,19 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+def batch_of(data, max_batch_size):
+ for i in range(0, len(data), max_batch_size):
+ yield data[i:i + max_batch_size]
+
diff --git a/lambda/sqs_consuming_lambda.py b/lambda/sqs_consuming_lambda.py
new file mode 100644
index 0000000..3adc37b
--- /dev/null
+++ b/lambda/sqs_consuming_lambda.py
@@ -0,0 +1,70 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import json
+import logging as log
+import batch
+
+
+def is_subscription_confirmation(message):
+ body = json.loads(message["body"])
+ return body.get("Type", "") == "SubscriptionConfirmation"
+
+
+def get_messages_list(event):
+ return event.get("Records", [])
+
+
+def get_message_body(message):
+ return json.loads(message["body"])
+
+
+def as_error_id(message):
+ return {"itemIdentifier": message.get("messageId")}
+
+
+def default_batch_error_handler(error, context=None):
+ log.error("%s, additional info: %s", error, context)
+
+
+def process_messages_in_batches(
+ all_messages,
+ messages_filter,
+ batch_callback,
+ batch_failures,
+ max_batch_size,
+ error_handler=default_batch_error_handler,
+):
+ filtered_messages = list(filter(messages_filter, all_messages))
+
+ for next_batch in batch.batch_of(filtered_messages, max_batch_size):
+ try:
+ batch_callback(next_batch, batch_failures, error_handler)
+ except Exception as error:
+ # failure in callback, fail entire micro-batch
+ batch_failures.extend(next_batch)
+ error_handler(error, json.dumps(next_batch))
+
+
+def batch_handler(event, entire_batch_callback):
+ all_messages = get_messages_list(event)
+ batch_failures = []
+
+ entire_batch_callback(all_messages, batch_failures)
+
+ return {'batchItemFailures': [as_error_id(i) for i in batch_failures]}
+
+
+
diff --git a/lambda/stream_fanout_lambda.py b/lambda/stream_fanout_lambda.py
new file mode 100644
index 0000000..d3b165a
--- /dev/null
+++ b/lambda/stream_fanout_lambda.py
@@ -0,0 +1,80 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import json
+import os
+import aws_clients
+import sqs_consuming_lambda as sqs_lambda
+
+
+def on_route_to_sns(messages_batch, batch_failures, error_handler, destination_topic_arn):
+ batch_to_publish = [
+ {
+ "Id": str(i),
+ "Message": message["body"] + '\n',
+ }
+ for i, message in enumerate(messages_batch)
+ ]
+
+ response = aws_clients.sns_client.publish_batch(
+ TopicArn=destination_topic_arn,
+ PublishBatchRequestEntries=batch_to_publish
+ )
+ failures = response.get("Failed", [])
+ if failures:
+ error_handler(
+ f"Partial batch failure from SNS, {len(failures)} failed out of {len(messages_batch)}",
+ json.dumps(failures)
+ )
+
+ batch_failures.extend([messages_batch[int(failure["Id"])] for failure in failures])
+
+
+def on_route_to_sqs(messages_batch, batch_failures, error_handler, destination_queue_url):
+ for message in messages_batch:
+ try:
+ aws_clients.sqs_client.send_message(
+ QueueUrl=destination_queue_url,
+ MessageBody=message["body"]
+ )
+ except Exception as error:
+ batch_failures.append(message)
+ error_handler(error, json.dumps(message))
+
+
+def on_entire_batch(all_messages, batch_failures):
+ fanout_config = [
+ (
+ lambda x: not sqs_lambda.is_subscription_confirmation(x),
+ lambda x, y, z: on_route_to_sns(x, y, z, os.environ["DATA_FANOUT_TOPIC_ARN"])
+ ),
+ (
+ sqs_lambda.is_subscription_confirmation,
+ lambda x, y, z: on_route_to_sqs(x, y, z, os.environ["SUBSCRIPTION_CONFIRMATION_QUEUE_URL"])
+ )
+ ]
+
+ for messages_filter, batch_callback in fanout_config:
+ sqs_lambda.process_messages_in_batches(
+ all_messages,
+ messages_filter,
+ batch_callback,
+ batch_failures,
+ max_batch_size=10
+ )
+
+
+def handler(event, context):
+ return sqs_lambda.batch_handler(event, on_entire_batch)
diff --git a/lambda/subscription_confirmation_lambda.py b/lambda/subscription_confirmation_lambda.py
new file mode 100644
index 0000000..b9e899e
--- /dev/null
+++ b/lambda/subscription_confirmation_lambda.py
@@ -0,0 +1,52 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import aws_clients
+import json
+import sqs_consuming_lambda as sqs_lambda
+
+
+def on_confirm_subscription(messages_batch, batch_failures, error_handler):
+ for message in messages_batch:
+ try:
+ body = message["body"]
+ print(f"Confirmation request: {body}")
+
+ confirmation_request = json.loads(body)
+ topic_arn = confirmation_request["TopicArn"]
+ subs_token = confirmation_request["Token"]
+
+ aws_clients.sns_client.confirm_subscription(
+ TopicArn=topic_arn,
+ Token=subs_token
+ )
+ print(f"Confirmed: {body}")
+ except Exception as error:
+ batch_failures.append(message)
+ error_handler(error, json.dumps(message))
+
+
+def on_entire_batch(all_messages, batch_failures):
+ sqs_lambda.process_messages_in_batches(
+ all_messages,
+ lambda x: True,
+ on_confirm_subscription,
+ batch_failures,
+ max_batch_size=10
+ )
+
+
+def handler(event, context):
+ return sqs_lambda.batch_handler(event, on_entire_batch)
diff --git a/requirements-dev.txt b/requirements-dev.txt
new file mode 100644
index 0000000..9270945
--- /dev/null
+++ b/requirements-dev.txt
@@ -0,0 +1 @@
+pytest==6.2.5
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..507ba8b
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,9 @@
+aws-cdk-lib>=2.45
+aws-cdk.aws-kinesisfirehose-alpha
+aws-cdk.aws-kinesisfirehose-destinations-alpha
+boto3>=1.24
+constructs>=10.0.0,<11.0.0
+pyyaml
+python-amazon-ad-api>=0.3.7
+requests
+typer[all]
\ No newline at end of file
diff --git a/source.bat b/source.bat
new file mode 100644
index 0000000..9e1a834
--- /dev/null
+++ b/source.bat
@@ -0,0 +1,13 @@
+@echo off
+
+rem The sole purpose of this script is to make the command
+rem
+rem source .venv/bin/activate
+rem
+rem (which activates a Python virtualenv on Linux or Mac OS X) work on Windows.
+rem On Windows, this command just runs this batch file (the argument is ignored).
+rem
+rem Now we don't need to document a Windows command for activating a virtualenv.
+
+echo Executing .venv\Scripts\activate.bat for you
+.venv\Scripts\activate.bat
diff --git a/stream_infrastructure_config.yml b/stream_infrastructure_config.yml
new file mode 100644
index 0000000..6b8d861
--- /dev/null
+++ b/stream_infrastructure_config.yml
@@ -0,0 +1,48 @@
+# Amazon Marketing Stream Configuration File
+
+ambassadors:
+ reviewerArn: arn:aws:iam::926844853897:role/ReviewerRole
+
+datasets:
+ NA:
+ - dataSetId: sp-traffic
+ snsSourceArn: arn:aws:sns:us-east-1:906013806264:*
+
+ - dataSetId: sp-conversion
+ snsSourceArn: arn:aws:sns:us-east-1:802324068763:*
+
+ - dataSetId: budget-usage
+ snsSourceArn: arn:aws:sns:us-east-1:055588217351:*
+
+ - dataSetId: sd-traffic
+ snsSourceArn: arn:aws:sns:us-east-1:370941301809:*
+
+ - dataSetId: sd-conversion
+ snsSourceArn: arn:aws:sns:us-east-1:877712924581:*
+
+ EU:
+ - dataSetId: sp-traffic
+ snsSourceArn: arn:aws:sns:eu-west-1:668473351658:*
+
+ - dataSetId: sp-conversion
+ snsSourceArn: arn:aws:sns:eu-west-1:562877083794:*
+
+ - dataSetId: budget-usage
+ snsSourceArn: arn:aws:sns:eu-west-1:675750596317:*
+
+ FE:
+ - dataSetId: sp-traffic
+ snsSourceArn: arn:aws:sns:us-west-2:074266271188:*
+
+ - dataSetId: sp-conversion
+ snsSourceArn: arn:aws:sns:us-west-2:622939981599:*
+
+ - dataSetId: budget-usage
+ snsSourceArn: arn:aws:sns:us-west-2:100899330244:*
+
+# This config defines the AWS region where Stream consumer stack will be installed in your AWS account.
+# The selected regions ensure minimal latency in message consumption.
+consumerStackInstallationAwsRegion:
+ NA: us-east-1
+ EU: eu-west-1
+ FE: us-west-2
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/test_amz_stream_wings_stack.py b/tests/unit/test_amz_stream_wings_stack.py
new file mode 100644
index 0000000..6e8ab53
--- /dev/null
+++ b/tests/unit/test_amz_stream_wings_stack.py
@@ -0,0 +1,49 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import aws_cdk as core
+import aws_cdk.assertions as assertions
+from amz_stream_infra.stack_definitions import AmzStreamConsumerStack
+
+AMBASSADOR_CONFIG = {
+ "reviewerArn": "arn:aws:iam::926844853897:role/ReviewerRole"
+}
+
+DATASET_CONFIG = {
+ "NA": [
+ {
+ "dataSetId": "sp-traffic",
+ "snsSourceArn": "arn:aws:sns:us-east-1:906013806264:*"
+ }
+ ]
+}
+
+
+def test_sqs_queue_created():
+ app = core.App()
+ stack = AmzStreamConsumerStack(app, "NA", "us-east-1", DATASET_CONFIG["NA"][0], AMBASSADOR_CONFIG)
+ template = assertions.Template.from_stack(stack)
+
+ template.has_resource_properties("AWS::SQS::Queue", {
+ "VisibilityTimeout": 60
+ })
+
+
+def test_sns_topic_created():
+ app = core.App()
+ stack = AmzStreamConsumerStack(app, "NA", "us-east-1", DATASET_CONFIG["NA"][0], AMBASSADOR_CONFIG)
+ template = assertions.Template.from_stack(stack)
+
+ template.resource_count_is("AWS::SNS::Topic", 1)