feat(capture-rs): implement replay capture in capture-rs (#24461)
frankh authored Aug 21, 2024
1 parent 112aa68 commit 8a6020d
Showing 10 changed files with 413 additions and 75 deletions.
29 changes: 14 additions & 15 deletions docker-compose.base.yml
@@ -34,7 +34,7 @@ services:
}
handle @replay-capture {
reverse_proxy replay-capture:8000
reverse_proxy replay-capture:3000
}
handle {
@@ -155,32 +155,31 @@ services:

capture:
image: ghcr.io/posthog/posthog/capture:master
build:
context: rust/
args:
BIN: capture
restart: on-failure
environment:
ADDRESS: '0.0.0.0:3000'
KAFKA_TOPIC: 'events_plugin_ingestion'
KAFKA_HOSTS: 'kafka:9092'
REDIS_URL: 'redis://redis:6379/'
CAPTURE_MODE: events

replay-capture:
image: ghcr.io/posthog/posthog/replay-capture:master
image: ghcr.io/posthog/posthog/capture:master
build:
context: vector/replay-capture
context: rust/
args:
BIN: capture
restart: on-failure
entrypoint: ['sh', '-c']
command:
- |
set -x
# seed empty required data files
mkdir -p /etc/vector/data
echo "token" > /etc/vector/data/quota_limited_teams.csv
echo "session_id" > /etc/vector/data/overflow_sessions.csv
exec vector -v --watch-config
environment:
KAFKA_EVENTS_TOPIC: session_recording_snapshot_item_events
KAFKA_OVERFLOW_TOPIC: session_recording_snapshot_item_overflow
KAFKA_BOOSTRAP_SERVERS: 'kafka:9092'
ADDRESS: '0.0.0.0:3000'
KAFKA_TOPIC: 'session_recording_snapshot_item_events'
KAFKA_HOSTS: 'kafka:9092'
REDIS_URL: 'redis://redis:6379/'
CAPTURE_MODE: recordings

plugins:
command: ./bin/plugin-server --no-restart-loop
20 changes: 19 additions & 1 deletion rust/capture/src/api.rs
@@ -15,6 +15,9 @@ pub enum CaptureResponseCode {
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct CaptureResponse {
pub status: CaptureResponseCode,

#[serde(skip_serializing_if = "Option::is_none")]
pub quota_limited: Option<Vec<String>>,
}

#[derive(Error, Debug)]
@@ -32,6 +35,14 @@ pub enum CaptureError {
EmptyDistinctId,
#[error("event submitted without a distinct_id")]
MissingDistinctId,
#[error("replay event submitted without snapshot data")]
MissingSnapshotData,
#[error("replay event submitted without session id")]
MissingSessionId,
#[error("replay event submitted without window id")]
MissingWindowId,
#[error("replay event has invalid session id")]
InvalidSessionId,

#[error("event submitted without an api_key")]
NoTokenError,
@@ -64,7 +75,11 @@ impl IntoResponse for CaptureError {
| CaptureError::EmptyDistinctId
| CaptureError::MissingDistinctId
| CaptureError::EventTooBig
| CaptureError::NonRetryableSinkError => (StatusCode::BAD_REQUEST, self.to_string()),
| CaptureError::NonRetryableSinkError
| CaptureError::MissingSessionId
| CaptureError::MissingWindowId
| CaptureError::InvalidSessionId
| CaptureError::MissingSnapshotData => (StatusCode::BAD_REQUEST, self.to_string()),

CaptureError::NoTokenError
| CaptureError::MultipleTokensError
@@ -87,6 +102,7 @@ pub enum DataType {
ClientIngestionWarning,
HeatmapMain,
ExceptionMain,
SnapshotMain,
}
#[derive(Clone, Debug, Serialize, Eq, PartialEq)]
pub struct ProcessedEvent {
@@ -103,6 +119,8 @@ pub struct ProcessedEvent {
)]
pub sent_at: Option<OffsetDateTime>,
pub token: String,
#[serde(skip_serializing)]
pub session_id: Option<String>,
}

impl ProcessedEvent {
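The new quota_limited field is marked skip_serializing_if, so event-capture responses keep their previous JSON shape and the field only appears when a token is being quota limited. Below is a minimal standalone sketch of that serialization behavior; ResponseCode is a simplified stand-in for the crate's CaptureResponseCode, and the "recordings" value is purely illustrative.

// Minimal sketch (not from this commit): effect of skip_serializing_if on the
// response JSON. Assumes serde (derive) and serde_json as dependencies.
use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
enum ResponseCode {
    Ok,
}

#[derive(Serialize)]
struct CaptureResponse {
    status: ResponseCode,
    #[serde(skip_serializing_if = "Option::is_none")]
    quota_limited: Option<Vec<String>>,
}

fn main() {
    // No quota limiting: the field is omitted -> {"status":"ok"}
    let ok = CaptureResponse { status: ResponseCode::Ok, quota_limited: None };
    println!("{}", serde_json::to_string(&ok).unwrap());

    // Quota limited: the field is present -> {"status":"ok","quota_limited":["recordings"]}
    let limited = CaptureResponse {
        status: ResponseCode::Ok,
        quota_limited: Some(vec!["recordings".to_string()]),
    };
    println!("{}", serde_json::to_string(&limited).unwrap());
}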
21 changes: 21 additions & 0 deletions rust/capture/src/config.rs
@@ -2,6 +2,24 @@ use std::{net::SocketAddr, num::NonZeroU32};

use envconfig::Envconfig;

#[derive(Debug, PartialEq, Clone)]
pub enum CaptureMode {
Events,
Recordings,
}

impl std::str::FromStr for CaptureMode {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_lowercase().as_ref() {
"events" => Ok(CaptureMode::Events),
"recordings" => Ok(CaptureMode::Recordings),
_ => Err(format!("Unknown Capture Type: {s}")),
}
}
}

#[derive(Envconfig, Clone)]
pub struct Config {
#[envconfig(default = "false")]
@@ -37,6 +55,9 @@ pub struct Config {
#[envconfig(default = "true")]
pub export_prometheus: bool,
pub redis_key_prefix: Option<String>,

#[envconfig(default = "events")]
pub capture_mode: CaptureMode,
}

#[derive(Envconfig, Clone)]
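CAPTURE_MODE is parsed into CaptureMode via the FromStr impl above; envconfig is assumed to call str::parse on the raw environment value. A small sketch exercising that parsing, reusing the enum exactly as added in config.rs (the main harness is illustrative only).

// Sketch: the FromStr impl from this diff, exercised directly.
#[derive(Debug, PartialEq, Clone)]
enum CaptureMode {
    Events,
    Recordings,
}

impl std::str::FromStr for CaptureMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.trim().to_lowercase().as_ref() {
            "events" => Ok(CaptureMode::Events),
            "recordings" => Ok(CaptureMode::Recordings),
            _ => Err(format!("Unknown Capture Type: {s}")),
        }
    }
}

fn main() {
    // Parsing is case-insensitive and whitespace-tolerant.
    assert_eq!("events".parse::<CaptureMode>(), Ok(CaptureMode::Events));
    assert_eq!(" Recordings ".parse::<CaptureMode>(), Ok(CaptureMode::Recordings));
    assert!("snapshots".parse::<CaptureMode>().is_err());
}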
35 changes: 27 additions & 8 deletions rust/capture/src/router.rs
@@ -15,10 +15,12 @@ use crate::{
limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource, v0_endpoint,
};

use crate::config::CaptureMode;
use crate::prometheus::{setup_metrics_recorder, track_metrics};

const EVENT_BODY_SIZE: usize = 2 * 1024 * 1024; // 2MB
const BATCH_BODY_SIZE: usize = 20 * 1024 * 1024; // 20MB, up from the default 2MB used for normal event payloads
const RECORDING_BODY_SIZE: usize = 20 * 1024 * 1024; // 20MB, up from the default 2MB used for normal event payloads

#[derive(Clone)]
pub struct State {
@@ -43,6 +45,7 @@ pub fn router<
redis: Arc<R>,
billing: BillingLimiter,
metrics: bool,
capture_mode: CaptureMode,
) -> Router {
let state = State {
sink: Arc::new(sink),
@@ -106,14 +109,30 @@
.route("/_readiness", get(index))
.route("/_liveness", get(move || ready(liveness.get_status())));

let router = Router::new()
.merge(batch_router)
.merge(event_router)
.merge(status_router)
.layer(TraceLayer::new_for_http())
.layer(cors)
.layer(axum::middleware::from_fn(track_metrics))
.with_state(state);
let recordings_router = Router::new()
.route(
"/s",
post(v0_endpoint::recording)
.get(v0_endpoint::recording)
.options(v0_endpoint::options),
)
.route(
"/s/",
post(v0_endpoint::recording)
.get(v0_endpoint::recording)
.options(v0_endpoint::options),
)
.layer(DefaultBodyLimit::max(RECORDING_BODY_SIZE));

let router = match capture_mode {
CaptureMode::Events => Router::new().merge(batch_router).merge(event_router),
CaptureMode::Recordings => Router::new().merge(recordings_router),
}
.merge(status_router)
.layer(TraceLayer::new_for_http())
.layer(cors)
.layer(axum::middleware::from_fn(track_metrics))
.with_state(state);

// Don't install metrics unless asked to
// Installing a global recorder when capture is used as a library (during tests etc)
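Depending on capture_mode, a process now serves either the event/batch routes or the /s recording routes, with the shared status routes merged in both cases. A rough standalone sketch of that pattern, assuming axum and using placeholder handlers rather than the real v0_endpoint functions.

// Rough sketch of the mode split (assumes axum; handlers are placeholders,
// not the actual capture endpoints).
use axum::{routing::get, Router};

enum CaptureMode {
    Events,
    Recordings,
}

fn build_router(mode: CaptureMode) -> Router {
    let event_router = Router::new().route("/e", get(|| async { "events" }));
    let recordings_router = Router::new().route("/s", get(|| async { "recordings" }));
    let status_router = Router::new().route("/_liveness", get(|| async { "ok" }));

    // Only one family of capture routes is exposed per process; the status
    // routes are always mounted.
    match mode {
        CaptureMode::Events => Router::new().merge(event_router),
        CaptureMode::Recordings => Router::new().merge(recordings_router),
    }
    .merge(status_router)
}

fn main() {
    let _router = build_router(CaptureMode::Recordings);
}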
2 changes: 2 additions & 0 deletions rust/capture/src/server.rs
@@ -46,6 +46,7 @@ where
redis_client,
billing,
config.export_prometheus,
config.capture_mode,
)
} else {
let sink_liveness = liveness
@@ -86,6 +87,7 @@ where
redis_client,
billing,
config.export_prometheus,
config.capture_mode,
)
};

14 changes: 13 additions & 1 deletion rust/capture/src/sinks/kafka.rs
@@ -4,6 +4,7 @@ use async_trait::async_trait;
use health::HealthHandle;
use metrics::{counter, gauge, histogram};
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
@@ -179,6 +180,8 @@ impl KafkaSink {
})?;

let event_key = event.key();
let session_id = event.session_id.as_deref();

let (topic, partition_key): (&str, Option<&str>) = match &event.data_type {
DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
DataType::AnalyticsMain => {
@@ -199,6 +202,10 @@
),
DataType::HeatmapMain => (&self.heatmaps_topic, Some(event_key.as_str())),
DataType::ExceptionMain => (&self.exceptions_topic, Some(event_key.as_str())),
DataType::SnapshotMain => (
&self.main_topic,
Some(session_id.ok_or(CaptureError::MissingSessionId)?),
),
};

match self.producer.send_result(FutureRecord {
Expand All @@ -207,7 +214,10 @@ impl KafkaSink {
partition: None,
key: partition_key,
timestamp: None,
headers: None,
headers: Some(OwnedHeaders::new().insert(Header {
key: "token",
value: Some(&event.token),
})),
}) {
Ok(ack) => Ok(ack),
Err((e, _)) => match e.rdkafka_error_code() {
@@ -361,6 +371,7 @@ mod tests {
now: "".to_string(),
sent_at: None,
token: "token1".to_string(),
session_id: None,
};

// Wait for producer to be healthy, to keep kafka_message_timeout_ms short and tests faster
@@ -393,6 +404,7 @@
now: "".to_string(),
sent_at: None,
token: "token1".to_string(),
session_id: None,
};
match sink.send(big_event).await {
Err(CaptureError::EventTooBig) => {} // Expected
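Snapshot messages are produced with the session id as the Kafka partition key, and every message now carries its token in a Kafka message header. Below is a hedged sketch of how a downstream consumer might read that header back; this is hypothetical consumer-side code, not part of this commit, and it assumes an rdkafka version (>= 0.29) where Headers::iter() is available.

// Hypothetical consumer-side sketch (not in this commit): reading the "token"
// header that KafkaSink now attaches to each produced message.
use rdkafka::message::{BorrowedMessage, Headers, Message};

fn token_from_headers(msg: &BorrowedMessage<'_>) -> Option<String> {
    let headers = msg.headers()?;
    headers
        .iter()
        .find(|h| h.key == "token")
        .and_then(|h| h.value)
        .map(|bytes| String::from_utf8_lossy(bytes).into_owned())
}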
