From 758c25611d7e6e0ac2ac25626b64b693ff93dad6 Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Thu, 3 Oct 2024 11:58:47 +0300 Subject: [PATCH] feat(err): we no longer read straight from capture (#25357) --- rust/Cargo.lock | 44 +------------------ rust/Cargo.toml | 1 - rust/error-tracking/Cargo.toml | 3 +- rust/error-tracking/src/app_context.rs | 6 +-- rust/error-tracking/src/config.rs | 8 +--- rust/error-tracking/src/lib.rs | 1 - rust/error-tracking/src/main.rs | 32 +++----------- rust/error-tracking/src/team_cache.rs | 58 -------------------------- 8 files changed, 10 insertions(+), 143 deletions(-) delete mode 100644 rust/error-tracking/src/team_cache.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index a3126214c319a..2eec65238bdb1 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1167,12 +1167,11 @@ dependencies = [ "common-alloc", "common-kafka", "common-metrics", - "common-types", "envconfig", "health", "metrics", - "moka", "rdkafka", + "serde_json", "sqlx", "thiserror", "tokio", @@ -2412,26 +2411,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "moka" -version = "0.12.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" -dependencies = [ - "crossbeam-channel", - "crossbeam-epoch", - "crossbeam-utils", - "once_cell", - "parking_lot", - "quanta 0.12.2", - "rustc_version", - "smallvec", - "tagptr", - "thiserror", - "triomphe", - "uuid", -] - [[package]] name = "native-tls" version = "0.2.11" @@ -3393,15 +3372,6 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" -[[package]] -name = "rustc_version" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" -dependencies = [ - "semver", -] - [[package]] name = "rustix" version = "0.37.27" @@ -4112,12 +4082,6 @@ dependencies = [ "libc", ] -[[package]] -name = "tagptr" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" - [[package]] name = "tempfile" version = "3.10.0" @@ -4504,12 +4468,6 @@ dependencies = [ "tracing-log", ] -[[package]] -name = "triomphe" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" - [[package]] name = "try-lock" version = "0.2.5" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 33c16e0221d7c..a060d845a5e89 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -91,6 +91,5 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = { version = "2.5.0 " } uuid = { version = "1.6.1", features = ["v7", "serde"] } neon = "1" -moka = {version = "0.12.8", features = ["sync"] } quick_cache = "0.6.9" ahash = "0.8.11" \ No newline at end of file diff --git a/rust/error-tracking/Cargo.toml b/rust/error-tracking/Cargo.toml index 5541c64ddc61f..fdf81846f6d32 100644 --- a/rust/error-tracking/Cargo.toml +++ b/rust/error-tracking/Cargo.toml @@ -14,11 +14,10 @@ axum = { workspace = true } metrics = { workspace = true } common-metrics = { path = "../common/metrics" } common-alloc = { path = "../common/alloc" } -common-types = { path = "../common/types" } common-kafka = { path = "../common/kafka" } thiserror = { workspace = true } sqlx = { workspace = true } -moka = { workspace = true } +serde_json = { workspace = true } [lints] workspace = true diff --git a/rust/error-tracking/src/app_context.rs b/rust/error-tracking/src/app_context.rs index 6ec9e2eb0b86e..3d21c2bd24b78 100644 --- a/rust/error-tracking/src/app_context.rs +++ b/rust/error-tracking/src/app_context.rs @@ -5,14 +5,13 @@ use health::{HealthHandle, HealthRegistry}; use sqlx::{postgres::PgPoolOptions, PgPool}; use tracing::info; -use crate::{config::Config, error::Error, team_cache::TeamCache}; +use crate::{config::Config, error::Error}; pub struct AppContext { pub health_registry: HealthRegistry, pub worker_liveness: HealthHandle, pub consumer: SingleTopicConsumer, pub pool: PgPool, - pub team_cache: TeamCache, } impl AppContext { @@ -32,14 +31,11 @@ impl AppContext { config.consumer.kafka_consumer_topic ); - let team_cache = TeamCache::new(config.team_cache_capacity, config.team_cache_ttl_secs); - Ok(Self { health_registry, worker_liveness, consumer, pool, - team_cache, }) } } diff --git a/rust/error-tracking/src/config.rs b/rust/error-tracking/src/config.rs index b3e3875efbe14..b29630710127b 100644 --- a/rust/error-tracking/src/config.rs +++ b/rust/error-tracking/src/config.rs @@ -21,17 +21,11 @@ pub struct Config { // Rust service connect directly to postgres, not via pgbouncer, so we keep this low #[envconfig(default = "4")] pub max_pg_connections: u32, - - #[envconfig(default = "300")] - pub team_cache_ttl_secs: u64, - - #[envconfig(default = "10000")] - pub team_cache_capacity: u64, } impl Config { pub fn init_with_defaults() -> Result { - ConsumerConfig::set_defaults("error-tracking-rs", "exceptions_ingestions"); + ConsumerConfig::set_defaults("error-tracking-rs", "exception_symbolification_events"); Self::init_from_env() } } diff --git a/rust/error-tracking/src/lib.rs b/rust/error-tracking/src/lib.rs index a8ef97726b652..806fb49e9cef5 100644 --- a/rust/error-tracking/src/lib.rs +++ b/rust/error-tracking/src/lib.rs @@ -1,4 +1,3 @@ pub mod app_context; pub mod config; pub mod error; -pub mod team_cache; diff --git a/rust/error-tracking/src/main.rs b/rust/error-tracking/src/main.rs index ac7af460753a0..a291694161ea0 100644 --- a/rust/error-tracking/src/main.rs +++ b/rust/error-tracking/src/main.rs @@ -3,11 +3,11 @@ use std::{future::ready, sync::Arc}; use axum::{routing::get, Router}; use common_kafka::kafka_consumer::RecvErr; use common_metrics::{serve, setup_metrics_routes}; -use common_types::{CapturedEvent, Team}; use envconfig::Envconfig; use error_tracking::{app_context::AppContext, config::Config, error::Error}; +use serde_json::Value; use tokio::task::JoinHandle; -use tracing::{error, info, warn}; +use tracing::{error, info}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; common_alloc::used!(); @@ -55,7 +55,9 @@ async fn main() -> Result<(), Error> { loop { context.worker_liveness.report_healthy().await; - let (event, offset): (CapturedEvent, _) = match context.consumer.json_recv().await { + // Just grab the event as a serde_json::Value and immediately drop it, + // we can work out a real type for it later (once we're deployed etc) + let (_, offset): (Value, _) = match context.consumer.json_recv().await { Ok(r) => r, Err(RecvErr::Kafka(e)) => { return Err(e.into()); // Just die if we recieve a Kafka error @@ -63,35 +65,13 @@ async fn main() -> Result<(), Error> { Err(err) => { // If we failed to parse the message, or it was empty, just log and continue, our // consumer has already stored the offset for us. - metrics::counter!("error_tracking_errors").increment(1); + metrics::counter!("error_tracking_errors", "cause" => "recv_err").increment(1); error!("Error receiving message: {:?}", err); continue; } }; metrics::counter!("error_tracking_events_received").increment(1); - // Team verification - // If we encounter a sqlx error, just die. The by_token call explicitly - // returns an Option, so any error here is a PG failure, and us falling - // over sheds load. - let token = &event.token; - let pool = &context.pool; - let team = context - .team_cache - .get_or_insert_with(token, async move { Ok(Team::by_token(pool, token).await?) }) - .await?; - - match team { - Some(t) => t, - None => { - // If we failed to find the team, just store the offset and drop the event, we don't care about it - offset.store().unwrap(); - warn!("Team not found for token: {}", event.token); - metrics::counter!("error_tracking_team_not_found").increment(1); - continue; - } - }; - // This is where the rest of the processing would go offset.store().unwrap(); } diff --git a/rust/error-tracking/src/team_cache.rs b/rust/error-tracking/src/team_cache.rs deleted file mode 100644 index f8a15f9b6b417..0000000000000 --- a/rust/error-tracking/src/team_cache.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::{future::Future, time::Duration}; - -use common_types::Team; -use moka::sync::{Cache, CacheBuilder}; -use tokio::sync::RwLock; - -use crate::error::Error; - -// This /could/ be moved into common/types, but I'd have to take -// a dependency on tokio and moka to do it, and I don't want to -// do that just yet (although it is used across here and feature -// flags, so....) -pub struct TeamCache { - // The lock here isn't necessary (the Cache is concurrent), - // but is used to ensure the DB is only hit once. - // Note that we cache the none-case to prevent - // people hammering us with false tokens and bringing - // down PG. - teams: RwLock>>, -} - -impl TeamCache { - pub fn new(capacity: u64, ttl_seconds: u64) -> Self { - let cache = CacheBuilder::new(capacity) - .time_to_live(Duration::from_secs(ttl_seconds)) - .build(); - - Self { - teams: RwLock::new(cache), - } - } - - pub async fn get_or_insert_with(&self, token: &str, f: F) -> Result, Error> - where - F: Future, Error>>, - { - let teams = self.teams.read().await; - if let Some(team) = teams.get(token) { - return Ok(team.clone()); - } - drop(teams); - let teams = self.teams.write().await; - if let Some(team) = teams.get(token) { - return Ok(team.clone()); - } - let team = f.await?; - teams.insert(token.to_string(), team.clone()); - Ok(team) - } - - pub async fn contains(&self, token: &str) -> bool { - self.teams.read().await.contains_key(token) - } - - pub async fn remove(&self, token: &str) { - self.teams.write().await.remove(token); - } -}