Refactoring
swoehrl-mw committed Jan 25, 2024
1 parent dd34c6a commit 579063f
Showing 16 changed files with 301 additions and 122 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
@@ -31,7 +31,7 @@ jobs:
GITHUB_REF: ${{ github.ref }}
run: |
VERSION=${GITHUB_REF//refs\/tags\/v}
-sed -i 's/0.0.1/'"${VERSION}"'/' helm/mqtt-kafka-forwarding-service/Chart.yaml
+sed -i 's/0.1.0/'"${VERSION}"'/' helm/mqtt-kafka-forwarding-service/Chart.yaml
sed -i 's/dev/'"${VERSION}"'/' helm/mqtt-kafka-forwarding-service/values.yaml
- name: Publish helm chart
130 changes: 129 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -22,11 +22,12 @@ serde_yaml = "0.9"
yaml-rust = "0.4.5"
serde_json = "1.0"
log = "0.4.20"
-simple_logger = "4.3.3"
ctrlc = "3.4.2"
base64 = "0.21.7"
axum = {version="0.7.4"}
prometheus-client = "0.22.0"
lazy_static = "1.4.0"
+env_logger = "0.11.0"


[workspace]
12 changes: 6 additions & 6 deletions README.md
@@ -6,12 +6,13 @@ The service is written in Rust and has the following features:

* Guarantees At-Least-Once operations due to using manual acknowledgement in MQTT
* Optionally wraps MQTT payloads in a json object which preserves the original topic (`{"topic": "foo/bar", "payload": "somebase64edpayload"}`). Can be useful if later processing steps need the original MQTT topic (e.g. if some device-id is encoded in the topic but not repeated in the payload)
* Uses the MQTT topic as the kafka message key. This gives access to the topic even if the wrap option is not used and it makes sure messages from the same MQTT topic end up in the same kafka partition, preserving message ordering.
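The wrap option described above can be sketched in dependency-free Rust. This is an illustration only, not the service's actual code (the service uses the serde_json and base64 crates); `wrap_payload` and `base64_encode` are hypothetical helpers:

```rust
// Illustrative sketch of the wrap option: the original MQTT topic is kept
// alongside the base64-encoded payload in a small JSON object.

fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::new();
    for chunk in data.chunks(3) {
        // Pack up to three bytes into a 24-bit group, then emit four sextets.
        let n = ((chunk[0] as u32) << 16)
            | ((*chunk.get(1).unwrap_or(&0) as u32) << 8)
            | *chunk.get(2).unwrap_or(&0) as u32;
        out.push(ALPHABET[(n >> 18) as usize & 63] as char);
        out.push(ALPHABET[(n >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 { ALPHABET[(n >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { ALPHABET[n as usize & 63] as char } else { '=' });
    }
    out
}

// Build the wrapped message; `topic` is assumed to contain no characters that
// need JSON escaping (device topics like "foo/bar" do not).
fn wrap_payload(topic: &str, payload: &[u8]) -> String {
    format!("{{\"topic\": \"{}\", \"payload\": \"{}\"}}", topic, base64_encode(payload))
}

fn main() {
    // prints {"topic": "foo/bar", "payload": "aGk="}
    println!("{}", wrap_payload("foo/bar", b"hi"));
}
```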

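A minimal sketch of why a stable message key preserves ordering: Kafka chooses a partition as a hash of the key modulo the partition count, so equal MQTT topics always land in the same partition. (Kafka's real default partitioner uses murmur2; std's `DefaultHasher` stands in here for illustration.)

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy partitioner: equal keys always map to the same partition index.
fn partition_for(key: &str, partition_count: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % partition_count
}

fn main() {
    let p1 = partition_for("foo/bar", 12);
    let p2 = partition_for("foo/bar", 12);
    assert_eq!(p1, p2); // same MQTT topic -> same partition -> ordering preserved
    println!("foo/bar -> partition {}", p1);
}
```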
## Quickstart

The forwarding-service can be deployed in kubernetes using our helm chart:

-1. `helm repo add maibornwolff https://maibornwolff.github.io/mqtt-kafka-forwarding-service/`
+1. `helm repo add forwarding https://maibornwolff.github.io/mqtt-kafka-forwarding-service/`
2. Create a values.yaml file with your configuration:

```yaml
@@ -33,7 +34,7 @@ The forwarding-service can be deployed in kubernetes using our helm chart:
wrap_as_json: false
```
-3. `helm install mqtt-kafka-forwarding-service maibornwolff/mqtt-kafka-forwarding-service -f values.yaml`
+3. `helm install mqtt-kafka-forwarding-service forwarding/mqtt-kafka-forwarding-service -f values.yaml`

Or if you are running outside of Kubernetes the forwarding-service can be deployed as a docker image:

@@ -50,8 +51,7 @@ You can build your own custom binary by following these steps:

To build your own docker image follow these steps:

-1. `docker run --rm -it -v cargo-cache:/root/.cargo/registry -v "$(pwd)":/volume clux/muslrust:1.59.0 cargo build --release`
-2. `docker build . -t <my-image-name>`
+1. `docker build . -t <my-image-name>`

## Configuration

@@ -118,7 +118,7 @@ kafka:

The forwarding-service supports two ways for the service to authenticate itself to MQTT and Kafka:

-Client certificate authentication: See [TLS](#TLS) above.
+Client certificate authentication: See [TLS](#tls) above.

Username/Password authentication:

@@ -170,7 +170,7 @@ Performance was measured using the benchmark tool by repeatedly sending 1 millio

Enabling the payload wrapping does not have any measurable impact on performance. Also HiveMQ is not the bottleneck, using QoS 1 the benchmark tool is able to publish about 9000 msg/s to the broker, so way more than the forwarding-service can process. Measuring the performance for QoS 0 was hard as at higher message rates messages get dropped (this is allowed in the specification and can happen in the broker or in the libraries on either the publisher or subscriber side due to overload). As such the provided number is one where no message drops happened. In some measurements about 10000 msg/s were possible without suffering losses, going higher increases the risk of dropped messages exponentially.

-We also created a Python implementation of the forwarding service (sourcecode not included in this repository) to compare performance and because python is more common in the company than Rust. We implemented two variants, one using the [confluent-kafka-python library](https://github.com/confluentinc/confluent-kafka-python) and one using the [kafka-python library](https://github.com/dpkp/kafka-python). At first glance the confluent library showed good peak performance of about 1200 msg/s for QoS 1. But when running the benchmark with more messages or severl times then quite quickly the message rate drops significantly to about 300 msg/s. A flamegraph analysis showed that most of the time is spent in kafka library code (communicating with and waiting for the broker), so we assume the performance drop is due to some behaviour of the library. Switching out the confluent library with the third-pary kafka-python library gives a stable performance but slower than the confluent library peak performance.
+We also created a Python implementation of the forwarding service (sourcecode not included in this repository) to compare performance and because python is more common in the company than Rust. We implemented two variants, one using the [confluent-kafka-python library](https://github.com/confluentinc/confluent-kafka-python) and one using the [kafka-python library](https://github.com/dpkp/kafka-python). At first glance the confluent library showed good peak performance of about 1200 msg/s for QoS 1. But when running the benchmark with more messages or several times then quite quickly the message rate drops significantly to about 300 msg/s. A flamegraph analysis showed that most of the time is spent in kafka library code (communicating with and waiting for the broker), so we assume the performance drop is due to some behaviour of the library. Switching out the confluent library with the third-pary kafka-python library gives a stable performance but slower than the confluent library peak performance.

| QoS | confluent peak rate | confluent avg rate | kafka-python avg rate |
|-----|---------------------|--------------------|-----------------------|
12 changes: 6 additions & 6 deletions benchmark/src/consume.rs
@@ -1,13 +1,13 @@
use std::time::Duration;

use crate::message::BenchmarkMessage;
+use base64::prelude::*;
use rdkafka::{
consumer::{Consumer, StreamConsumer},
ClientConfig, Message, Offset, TopicPartitionList,
};
-use serde::{Serialize, Deserialize};
+use serde::{Deserialize, Serialize};
use tokio::time::Instant;
-use base64::prelude::*;

static TOPIC_NAME: &str = "stresstest";

@@ -55,7 +55,8 @@ impl BenchmarkConsumer {
if count == 1 {
start_instant = Instant::now();
}
-let message: BenchmarkMessage = unwrap_message(msg.payload().unwrap(), wrapped_payload);
+let message: BenchmarkMessage =
+unwrap_message(msg.payload().unwrap(), wrapped_payload);
let last = message.last;
if count == 1 && message.id != 0 {
println!("WARNING: First received message does have ID {} instead of 0. Message might be missing.\n", message.id);
@@ -73,13 +74,12 @@ impl BenchmarkConsumer {
}
}


fn unwrap_message(payload: &[u8], wrapped_payload: bool) -> BenchmarkMessage {
if wrapped_payload {
-let msg : WrappedPayload = serde_json::from_slice(payload).unwrap();
+let msg: WrappedPayload = serde_json::from_slice(payload).unwrap();
let payload = BASE64_STANDARD.decode(msg.payload).unwrap();
serde_json::from_slice(payload.as_ref()).unwrap()
} else {
serde_json::from_slice(payload).unwrap()
}
}
}
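`unwrap_message` above leans on the serde_json and base64 crates; as a rough, dependency-free sketch, the decode step it applies to the `payload` field looks like this (`base64_decode` is a hypothetical helper, and the base64 string is assumed to have already been extracted from the JSON):

```rust
// Sketch of the base64 decode that unwrap_message performs on the "payload"
// field of a wrapped message (standard alphabet, '=' padding stripped).
fn base64_decode(s: &str) -> Vec<u8> {
    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let idx = |c: u8| ALPHABET.iter().position(|&a| a == c).expect("invalid base64 byte") as u32;
    let data: Vec<u8> = s.bytes().filter(|&c| c != b'=').collect();
    let mut out = Vec::new();
    for chunk in data.chunks(4) {
        // Reassemble up to four sextets into a 24-bit group, then emit bytes.
        let mut n = 0u32;
        for (i, &c) in chunk.iter().enumerate() {
            n |= idx(c) << (18 - 6 * i as u32);
        }
        out.push((n >> 16) as u8);
        if chunk.len() > 2 { out.push((n >> 8) as u8); }
        if chunk.len() > 3 { out.push(n as u8); }
    }
    out
}

fn main() {
    let payload = base64_decode("aGk=");
    println!("{}", String::from_utf8(payload).unwrap()); // prints "hi"
}
```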
8 changes: 2 additions & 6 deletions benchmark/src/produce.rs
@@ -1,4 +1,5 @@
use crate::message::BenchmarkMessage;
+use crate::MQTT_TOPIC;
use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::{
sync::{
@@ -8,7 +9,6 @@ use std::{
time::Duration,
};
use tokio::time::Instant;
-use crate::MQTT_TOPIC;

pub struct BenchmarkProducer {
messages: Vec<BenchmarkMessage>,
@@ -61,11 +61,7 @@ impl BenchmarkProducer {
};
let payload_string = serde_json::to_string(&message).unwrap();
let payload: &[u8] = payload_string.as_ref();
-match self
-.client
-.publish(MQTT_TOPIC, qos, true, payload)
-.await
-{
+match self.client.publish(MQTT_TOPIC, qos, true, payload).await {
Ok(_) => {
self.messages.push(message);
}
9 changes: 2 additions & 7 deletions benchmark/src/setup.rs
@@ -1,10 +1,9 @@
+use crate::KAFKA_TOPIC;
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
client::DefaultClientContext,
ClientConfig,
};
-use crate::KAFKA_TOPIC;


pub async fn create_stresstest_topic() {
let client: AdminClient<DefaultClientContext> = ClientConfig::new()
@@ -19,11 +18,7 @@ pub async fn create_stresstest_topic() {
.await
.expect("Error deleting topic");

-let new_topics = [NewTopic::new(
-KAFKA_TOPIC,
-1,
-TopicReplication::Fixed(1),
-)];
+let new_topics = [NewTopic::new(KAFKA_TOPIC, 1, TopicReplication::Fixed(1))];
client
.create_topics(&new_topics, &opts)
.await
2 changes: 1 addition & 1 deletion helm/mqtt-kafka-forwarding-service/Chart.yaml
@@ -2,5 +2,5 @@ apiVersion: v2
name: mqtt-kafka-forwarding-service
description: Helm chart to deploy the mqtt-kafka-forwarding-service
type: application
-version: 0.0.1
+version: 0.1.0
appVersion: "0.1.0"
17 changes: 17 additions & 0 deletions helm/mqtt-kafka-forwarding-service/templates/servicemonitor.yaml
@@ -0,0 +1,17 @@
{{- if .Values.prometheus.enabled }}
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ include "fowarding.fullname" . }}
labels:
{{- include "fowarding.labels" . | nindent 4 }}
spec:
selector:
matchLabels:
{{- include "fowarding.selectorLabels" . | nindent 6 }}
endpoints:
- port: {{ .Values.service.name }}
interval: {{ .Values.prometheus.interval | default "30s" }}
path: "/metrics"
scheme: HTTP
{{- end }}
5 changes: 5 additions & 0 deletions helm/mqtt-kafka-forwarding-service/values.yaml
@@ -77,6 +77,11 @@ service:
name: http
targetPort: http

+prometheus:
+# Set to true to deploy a ServiceMonitor to have the forwarding service scraped for metrics
+enabled: false
+# Scrape interval
+interval: 30s

nodeSelector: {}
