updated s3-postgres-kafka demo
martypitt committed Sep 10, 2024
1 parent 4a40803 commit 581add0
Showing 5 changed files with 142 additions and 2 deletions.
78 changes: 76 additions & 2 deletions s3-postgres-kafka/README.md
@@ -1,6 +1,23 @@
# S3 / Postgres / Kafka

This project shows integration between S3, Postgres and Kafka.

In this demo, we'll cover:

* Reading from S3
* Enriching S3 data against an API
* Writing enriched data into a database
* Writing enriched data out onto Kafka
* Reading enriched data from Kafka

We use some fictitious cinema ticket sale information for our demo.

## Stub services
This demo project has a number of stub services deployed.

You can see the details by viewing [stack.nebula.kts](/projects/com.petflix:s3-postgres-kafka:0.1.0/source?selectedFile=orbital%2Fnebula%2Fstack.nebula.kts).

These services should start automatically. However, if not, check the docs on [enabling stubs](https://orbitalhq.com/docs/testing/stubbing-services#enabling-stubs).

## Reading ticket sales from S3
Ticket sales are returned from an S3 bucket.

@@ -26,6 +43,8 @@ service TicketsS3Bucket {
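
The rest of the declaration is collapsed in this diff. A minimal sketch of what an S3-backed Taxi service typically looks like (the `@S3Service`/`@S3Operation` annotations, operation name, and bucket name are assumptions based on Orbital's AWS connector, not this project's source):

```taxi
// Sketch only: the real TicketsS3Bucket declaration is collapsed above.
// Annotation, operation and bucket names are assumptions, not project source.
@S3Service(connectionName = "my-aws-account")
service TicketsS3Bucket {
  @S3Operation(bucket = "ticket-sales")
  operation fetchTicketSales(): VenueTicketSales[]
}
```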
## Enrich ticket sales with venue data
The [TicketPricesApi](/services/TicketPricesApi) is a REST API that returns ticket prices for each cinema.

This diagram shows the relationships between the two services:

```components
{
  "members" : {
    …
  }
}
```

@@ -105,9 +124,64 @@ so automatically transforms the data.
As we need to load some data from a REST API (the [TicketPricesApi](/services/TicketPricesApi)), Orbital automatically invokes
the REST API as before, enriching the data prior to persisting.

### How this works

* The `connections.conf` ([source](/projects/com.petflix:s3-postgres-kafka:0.1.0/source?selectedFile=orbital%2Fconfig%2Fconnections.conf)) defines how to connect to Postgres (`my-postgres-db`).
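
The persistence query itself is collapsed in this diff, but it follows the same `find ... call ...` shape as the Kafka example below. A minimal sketch, assuming a writable table service named `TicketSalesDatabase` exposing an `upsertVenueTicketSales` write operation (both names are illustrative, not taken from the project):

```taxiql
// Sketch: service and operation names are assumptions.
import TicketSalesDatabase
find { VenueTicketSales[] }
call TicketSalesDatabase::upsertVenueTicketSales
```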


## Writing to Kafka
We can also write the messages from our S3 files to Kafka.

Our schema declares a Kafka service named [MyKafkaConnectionService](/services/MyKafkaConnectionService) ([source](/projects/com.petflix:s3-postgres-kafka:0.1.0/source?selectedFile=ticketSales.kafka.taxi))
which publishes a stream of `VenueTicketSalesMessage`:

```taxi
@KafkaService(connectionName = "my-kafka-connection")
service MyKafkaConnectionService {
  // A stream of messages we can read from
  @KafkaOperation(topic = "ticket-sales", offset = "earliest")
  stream venueTicketSales : Stream<VenueTicketSalesMessage>

  // A publishing operation, allowing us to write messages to the topic
  @KafkaOperation(topic = "ticket-sales", offset = "earliest")
  write operation publishTicketSalesMessage(VenueTicketSalesMessage) : VenueTicketSalesMessage
}
```

We can issue a query that reads data from our S3 bucket and writes it to Kafka, again enriching the data with our REST API
to add cinema ticket prices.

```taxiql
import MyKafkaConnectionService
find { VenueTicketSales[] }
call MyKafkaConnectionService::publishTicketSalesMessage
```

### How this works
* The `connections.conf` file ([source](/projects/com.petflix:s3-postgres-kafka:0.1.0/source?selectedFile=orbital%2Fconfig%2Fconnections.conf)) defines how to connect to Kafka.
* Orbital connects to S3 (as discussed above), and reads the data from the CSV file
* Orbital detects that the S3 bucket's type of `VenueTicketSales` doesn't match the required data of `VenueTicketSalesMessage`, so automatically transforms the data, enriching the message by calling our REST API.
* Orbital writes each message onto Kafka.
* Note: `VenueTicketSalesMessage` doesn't declare a message format (unlike `VenueTicketSales`, which is annotated with `@Csv`). Therefore, it's assumed to be JSON.
* Orbital supports writing to Kafka in multiple different formats, like Avro, Protobuf, and more.
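
Since `VenueTicketSalesMessage` is written as JSON, each record lands on the topic as a plain JSON object. An illustrative message, mapping the fields of the `VenueTicketSalesMessage` model (declared at the bottom of this commit) onto the sample CSV row in `stack.nebula.kts`; the `ticketPrice` value is made up, since it's fetched from the REST API at query time:

```json
{
  "title" : "Hamilton",
  "filmId" : "BONTN",
  "screeningDate" : "2024-08-30",
  "cinemaName" : "Johns Livning Room",
  "cinemaId" : "E81Q",
  "tickets" : 295,
  "ticketPrice" : 9.58,
  "revenue" : 2827.72
}
```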

## Reading back from Kafka
To read the messages back from Kafka, we can issue another query:

```taxiql
import VenueTicketSalesMessage
stream { VenueTicketSalesMessage }
```

This streams data directly from Kafka.
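
Streaming queries can also filter as they consume. A minimal sketch, assuming Orbital's streaming `filterEach` function (treat the exact name and signature as an assumption):

```taxiql
// Sketch: filterEach is assumed from Orbital's streaming docs.
stream { VenueTicketSalesMessage.filterEach( (TicketsSold) -> TicketsSold > 100 ) }
```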

## What's next
That's all for this tutorial.

Other things we didn't explore are:
* [Publishing our data as an API](https://orbitalhq.com/docs/querying/queries-as-endpoints)
* [Creating a streaming pipeline of data](https://orbitalhq.com/docs/querying/streaming-data)
* [Adding security policies](https://orbitalhq.com/docs/data-policies/data-policies)

To find out more, reach out to the Orbital team on [Slack](https://join.slack.com/t/orbitalapi/shared_invite/zt-697laanr-DHGXXak5slqsY9DqwrkzHg) or on [GitHub](https://github.com/orbitalapi/orbital).
35 changes: 35 additions & 0 deletions s3-postgres-kafka/orbital/config/connections.conf
@@ -0,0 +1,35 @@
aws {
  my-aws-account {
    # Optional parameter. When not provided, Orbital uses the [AWS default credentials provider](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default) by default.
    accessKey=${NEBULA_S3_ACCESS_KEY}
    connectionName=my-aws-account
    # Optional parameter for development and testing purposes, to point at a different endpoint such as a LocalStack installation.
    endPointOverride=${NEBULA_S3_ENDPOINT_OVERRIDE}
    # Mandatory
    region=eu-west-1
    # Optional parameter. When not provided, Orbital uses the [AWS default credentials provider](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default) by default.
    secretKey=${NEBULA_S3_SECRET_KEY}
  }
}
jdbc {
  my-postgres-db {
    connectionName=my-postgres-db
    connectionParameters {
      database=${NEBULA_POSTGRES_DATABASE_NAME}
      host=localhost
      password=${NEBULA_POSTGRES_PASSWORD}
      port=${NEBULA_POSTGRES_PORT}
      username=${NEBULA_POSTGRES_USERNAME}
    }
    jdbcDriver=POSTGRES
  }
}
kafka {
  my-kafka-connection {
    connectionName=my-kafka-connection
    connectionParameters {
      brokerAddress=${NEBULA_KAFKA_BOOTSTRAP_SERVERS}
      groupId=vyne
    }
  }
}
5 changes: 5 additions & 0 deletions s3-postgres-kafka/orbital/config/services.conf
@@ -0,0 +1,5 @@
services {
  ticketsApi {
    url="http://localhost:"${NEBULA_HTTP_PORT}""
  }
}
4 changes: 4 additions & 0 deletions s3-postgres-kafka/orbital/nebula/stack.nebula.kts
@@ -269,4 +269,8 @@ Hamilton,BONTN,2024-08-30,Johns Livning Room,E81Q,295,2827.72
      call.respondText("""{ "price" : $price }""", ContentType.parse("application/json"))
    }
  }

  kafka {
  }
}
22 changes: 22 additions & 0 deletions s3-postgres-kafka/src/ticketSales.kafka.taxi
@@ -0,0 +1,22 @@
import com.orbitalhq.kafka.KafkaService
import com.orbitalhq.kafka.KafkaOperation

@KafkaService(connectionName = "my-kafka-connection")
service MyKafkaConnectionService {
  // A stream of messages we can read from
  @KafkaOperation(topic = "ticket-sales", offset = "earliest")
  stream venueTicketSales : Stream<VenueTicketSalesMessage>

  // A publishing operation, allowing us to write messages to the topic
  @KafkaOperation(topic = "ticket-sales", offset = "earliest")
  write operation publishTicketSalesMessage(VenueTicketSalesMessage) : VenueTicketSalesMessage
}

closed parameter model VenueTicketSalesMessage {
  title : Title
  filmId : FilmId
  screeningDate : ScreeningDate
  cinemaName : CinemaName
  cinemaId : CinemaId
  tickets : TicketsSold
  ticketPrice : Price
  revenue : TicketRevenue
}
