fix(propdefs): back to quick_cache, new issuing strategy (#25295)
oliverb123 authored Oct 1, 2024
1 parent 761c6ce commit 4e41c0f
Showing 7 changed files with 94 additions and 69 deletions.
15 changes: 14 additions & 1 deletion rust/Cargo.lock

Generated file; diff contents not rendered.

1 change: 1 addition & 0 deletions rust/Cargo.toml
@@ -92,4 +92,5 @@ url = { version = "2.5.0 " }
uuid = { version = "1.6.1", features = ["v7", "serde"] }
neon = "1"
moka = {version = "0.12.8", features = ["sync"] }
quick_cache = "0.6.9"
ahash = "0.8.11"
3 changes: 2 additions & 1 deletion rust/property-defs-rs/Cargo.toml
@@ -18,12 +18,13 @@ axum = { workspace = true }
serve-metrics = { path = "../common/serve_metrics" }
metrics = { workspace = true }
chrono = { workspace = true }
moka = { workspace = true }
quick_cache = { workspace = true }
common-metrics = { path = "../common/metrics" }
common-alloc = { path = "../common/alloc" }
common-kafka = { path = "../common/kafka" }
ahash = { workspace = true }
uuid = { workspace = true }
rand = { workspace = true }

[lints]
workspace = true
4 changes: 2 additions & 2 deletions rust/property-defs-rs/src/app_context.rs
@@ -1,5 +1,5 @@
use health::{HealthHandle, HealthRegistry};
use moka::sync::Cache;
use quick_cache::sync::Cache;
use sqlx::{postgres::PgPoolOptions, PgPool};
use time::Duration;
use tracing::warn;
@@ -34,7 +34,7 @@ impl AppContext {
.register("worker".to_string(), Duration::seconds(60))
.await;

let group_type_cache = Cache::new(config.group_type_cache_size as u64);
let group_type_cache = Cache::new(config.group_type_cache_size);

Ok(Self {
pool,
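For reference, a minimal sketch (not part of this commit) of how quick_cache's sync::Cache serves as a seen-set keyed by update. Cache::new takes an item capacity as a usize, which is why the "as u64" cast above was dropped. The Update stand-in, the capacity, and the surrounding calls are illustrative assumptions, not code from this repository:

```rust
use std::sync::Arc;

use quick_cache::sync::Cache;

// Hypothetical stand-in for the service's Update type.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct Update(String);

fn main() {
    // quick_cache sizes by item count (usize), so no `as u64` cast is needed.
    let cache: Arc<Cache<Update, ()>> = Arc::new(Cache::new(10_000));

    let update = Update("event:$pageview".to_string());

    // Inserting acts as a "seen" marker: later sightings of the same update are filtered out.
    if cache.get(&update).is_none() {
        cache.insert(update.clone(), ());
        // ... forward the update for batching ...
    }

    // If issuing a batch ultimately fails, evict its updates so they are
    // retried the next time they are seen.
    let _ = cache.remove(&update);

    // Utilization is item count over configured capacity.
    let utilization = cache.len() as f64 / 10_000.0;
    assert!(utilization <= 1.0);
}
```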
114 changes: 62 additions & 52 deletions rust/property-defs-rs/src/main.rs
@@ -5,25 +5,22 @@ use axum::{routing::get, Router};
use common_kafka::kafka_consumer::{RecvErr, SingleTopicConsumer};

use futures::future::ready;
use moka::sync::{Cache, CacheBuilder};
use property_defs_rs::{
app_context::AppContext,
config::{Config, TeamFilterMode, TeamList},
metrics_consts::{
BATCH_ACQUIRE_TIME, CACHE_CONSUMED, COMPACTED_UPDATES, EMPTY_EVENTS, EVENTS_RECEIVED,
EVENT_PARSE_ERROR, FORCED_SMALL_BATCH, ISSUE_FAILED, PERMIT_WAIT_TIME, RECV_DEQUEUED,
SKIPPED_DUE_TO_TEAM_FILTER, TRANSACTION_LIMIT_SATURATION, UPDATES_FILTERED_BY_CACHE,
UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME, WORKER_BLOCKED,
BATCH_ACQUIRE_TIME, CACHE_CONSUMED, CHUNK_SIZE, COMPACTED_UPDATES, DUPLICATES_IN_BATCH,
EMPTY_EVENTS, EVENTS_RECEIVED, EVENT_PARSE_ERROR, FORCED_SMALL_BATCH, ISSUE_FAILED,
RECV_DEQUEUED, SKIPPED_DUE_TO_TEAM_FILTER, UPDATES_FILTERED_BY_CACHE, UPDATES_PER_EVENT,
UPDATES_SEEN, UPDATE_ISSUE_TIME, WORKER_BLOCKED,
},
types::{Event, Update},
};

use quick_cache::sync::Cache;
use serve_metrics::{serve, setup_metrics_routes};
use tokio::{
sync::{
mpsc::{self, error::TrySendError},
Semaphore,
},
sync::mpsc::{self, error::TrySendError},
task::JoinHandle,
};
use tracing::{error, info, warn};
@@ -65,7 +62,7 @@ fn start_health_liveness_server(config: &Config, context: Arc<AppContext>) -> Jo
async fn spawn_producer_loop(
consumer: SingleTopicConsumer,
channel: mpsc::Sender<Update>,
shared_cache: Arc<Cache<Update, (), ahash::RandomState>>,
shared_cache: Arc<Cache<Update, ()>>,
skip_threshold: usize,
compaction_batch_size: usize,
team_filter_mode: TeamFilterMode,
@@ -165,11 +162,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
start_health_liveness_server(&config, context.clone());

let (tx, mut rx) = mpsc::channel(config.update_batch_size * config.channel_slots_per_worker);
let transaction_limit = Arc::new(Semaphore::new(config.max_concurrent_transactions));

let cache = CacheBuilder::new(config.cache_capacity as u64)
.time_to_live(Duration::from_secs(config.cache_ttl_seconds))
.build_with_hasher(ahash::RandomState::default());
let cache = Cache::new(config.cache_capacity);

let cache = Arc::new(cache);

@@ -218,48 +212,64 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
batch_time.fin();

metrics::gauge!(TRANSACTION_LIMIT_SATURATION).set(
(config.max_concurrent_transactions - transaction_limit.available_permits()) as f64,
);
// We de-duplicate the batch, in case racing inserts slipped through the shared-cache filter. This
// is important because duplicate updates touch the same row, and we issue in parallel, so we'd end
// up deadlocking ourselves. We can still encounter deadlocks due to other pods, but those should
// be rarer, and we use retries to handle them.
let start_len = batch.len();
batch.sort_unstable();
batch.dedup();

metrics::counter!(DUPLICATES_IN_BATCH).increment((start_len - batch.len()) as u64);

let cache_utilization = cache.entry_count() as f64 / config.cache_capacity as f64;
let cache_utilization = cache.len() as f64 / config.cache_capacity as f64;
metrics::gauge!(CACHE_CONSUMED).set(cache_utilization);

// We unconditionally wait for a transaction permit - this is our backpressure mechanism. If we
// fail to acquire a permit for long enough, we will fail liveness checks (but that implies our ongoing
// transactions are halted, at which point DB health is a concern).
let permit_acquire_time = common_metrics::timing_guard(PERMIT_WAIT_TIME, &[]);
// This semaphore will never be closed.
let permit = transaction_limit.clone().acquire_owned().await.unwrap();
permit_acquire_time.fin();

let m_context = context.clone();
let m_cache = cache.clone();
tokio::spawn(async move {
let _permit = permit;
let mut tries = 0;
let issue_time = common_metrics::timing_guard(UPDATE_ISSUE_TIME, &[]);
// We occasionally encounter deadlocks while issuing updates, so we retry a few times, and
// if we still fail, we drop the batch and clear its contents from the cached update set, because
// we assume everything in it will be seen again.
while let Err(e) = m_context.issue(&mut batch, cache_utilization).await {
error!("Issue failed: {:?}, sleeping for 100ms", e);
tries += 1;
if tries > 3 {
metrics::counter!(ISSUE_FAILED).increment(1);
error!("Too many tries, dropping batch");
// We clear any updates that were in this batch from the cache, so that
// if we see them again we'll try again to issue them.
batch.iter().for_each(|u| {
m_cache.remove(u);
});
issue_time.label("outcome", "failed").fin();
return;
// We split our update batch into chunks, one per transaction. We know each update touches
// exactly one row, so we can issue the chunks in parallel, and smaller batches issue faster,
// which helps us with inter-pod deadlocking and retries.
let chunk_size = batch.len() / config.max_concurrent_transactions;
let mut chunks = vec![Vec::with_capacity(chunk_size); config.max_concurrent_transactions];
for (i, update) in batch.drain(..).enumerate() {
chunks[i % config.max_concurrent_transactions].push(update);
}

metrics::gauge!(CHUNK_SIZE).set(chunk_size as f64);

let mut handles = Vec::new();
let issue_time = common_metrics::timing_guard(UPDATE_ISSUE_TIME, &[]);
for mut chunk in chunks {
let m_context = context.clone();
let m_cache = cache.clone();
let handle = tokio::spawn(async move {
let mut tries = 0;
// We occasionally encounter deadlocks while issuing updates, so we retry a few times, and
// if we still fail, we drop the batch and clear its contents from the cached update set, because
// we assume everything in it will be seen again.
while let Err(e) = m_context.issue(&mut chunk, cache_utilization).await {
tries += 1;
if tries > 3 {
metrics::counter!(ISSUE_FAILED).increment(1);
error!("Too many tries, dropping batch");
// We clear any updates that were in this batch from the cache, so that
// if we see them again we'll try again to issue them.
chunk.iter().for_each(|u| {
m_cache.remove(u);
});
return;
}

let jitter = rand::random::<u64>() % 50;
warn!("Issue failed: {:?}, sleeping for {}ms", e, jitter);
tokio::time::sleep(Duration::from_millis(jitter)).await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
handles.push(handle);
}

issue_time.label("outcome", "success").fin();
});
for handle in handles {
handle.await?;
}
issue_time.fin();
}
}
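For context, a minimal sketch (not part of this diff) of the round-robin chunking used above; the function name and example values are hypothetical, and it assumes max_concurrent is at least 1:

```rust
// Round-robin chunking: each update lands in chunks[i % max_concurrent], so
// chunk sizes differ by at most one even when the batch doesn't divide evenly.
fn chunk_round_robin<T>(batch: Vec<T>, max_concurrent: usize) -> Vec<Vec<T>> {
    let chunk_size = batch.len() / max_concurrent;
    let mut chunks: Vec<Vec<T>> = (0..max_concurrent)
        .map(|_| Vec::with_capacity(chunk_size))
        .collect();
    for (i, item) in batch.into_iter().enumerate() {
        chunks[i % max_concurrent].push(item);
    }
    chunks
}

fn main() {
    let chunks = chunk_round_robin((0..10).collect::<Vec<i32>>(), 4);
    let sizes: Vec<usize> = chunks.iter().map(|c| c.len()).collect();
    assert_eq!(sizes, vec![3, 3, 2, 2]);
}
```

Because every update touches exactly one row and the batch is de-duplicated first, chunks issued in parallel from the same pod never contend with each other; the bounded retries with random jitter are left to cover deadlocks against other pods.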
4 changes: 2 additions & 2 deletions rust/property-defs-rs/src/metrics_consts.rs
@@ -6,11 +6,9 @@ pub const UPDATES_SEEN: &str = "prop_defs_seen_updates";
pub const WORKER_BLOCKED: &str = "prop_defs_worker_blocked";
pub const UPDATES_PER_EVENT: &str = "prop_defs_updates_per_event";
pub const UPDATES_FILTERED_BY_CACHE: &str = "prop_defs_filtered_by_cache";
pub const TRANSACTION_LIMIT_SATURATION: &str = "prop_defs_transaction_limit_saturation";
pub const EMPTY_EVENTS: &str = "prop_defs_empty_events";
pub const EVENT_PARSE_ERROR: &str = "prop_defs_event_parse_error";
pub const BATCH_ACQUIRE_TIME: &str = "prop_defs_batch_acquire_time_ms";
pub const PERMIT_WAIT_TIME: &str = "prop_defs_permit_wait_time_ms";
pub const UPDATE_ISSUE_TIME: &str = "prop_defs_update_issue_time_ms";
pub const CACHE_CONSUMED: &str = "prop_defs_cache_space";
pub const RECV_DEQUEUED: &str = "prop_defs_recv_dequeued";
@@ -22,3 +20,5 @@ pub const UPDATES_SKIPPED: &str = "prop_defs_skipped_updates";
pub const GROUP_TYPE_READS: &str = "prop_defs_group_type_reads";
pub const SKIPPED_DUE_TO_TEAM_FILTER: &str = "prop_defs_skipped_due_to_team_filter";
pub const ISSUE_FAILED: &str = "prop_defs_issue_failed";
pub const CHUNK_SIZE: &str = "prop_defs_chunk_size";
pub const DUPLICATES_IN_BATCH: &str = "prop_defs_duplicates_in_batch";
22 changes: 11 additions & 11 deletions rust/property-defs-rs/src/types.rs
@@ -25,7 +25,7 @@ pub const SKIP_PROPERTIES: [&str; 9] = [
"$groups",
];

#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)]
pub enum PropertyParentType {
Event = 1,
Person = 2,
@@ -44,7 +44,7 @@ impl From<PropertyParentType> for i32 {
}
}

#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq, PartialOrd, Ord)]
pub enum PropertyValueType {
DateTime,
String,
@@ -66,7 +66,7 @@ impl fmt::Display for PropertyValueType {
}

// The grouptypemapping table uses i32's, but we get group types by name, so we have to resolve them before DB writes, sigh
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub enum GroupType {
Unresolved(String),
Resolved(String, i32),
@@ -81,7 +81,7 @@ impl GroupType {
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct PropertyDefinition {
pub team_id: i32,
pub name: String,
@@ -94,23 +94,23 @@ pub struct PropertyDefinition {
pub query_usage_30_day: Option<i64>, // Deprecated
}

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct EventDefinition {
pub name: String,
pub team_id: i32,
pub last_seen_at: DateTime<Utc>,
pub last_seen_at: DateTime<Utc>, // Always floored to our update rate for last_seen, so this Eq derive is safe for deduping
}

// Derived hash since these are keyed on all fields in the DB
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
#[derive(Clone, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
pub struct EventProperty {
team_id: i32,
event: String,
property: String,
pub team_id: i32,
pub event: String,
pub property: String,
}

// Represents a generic update, but comparable, allowing us to dedupe and cache updates
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
#[derive(Clone, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
pub enum Update {
Event(EventDefinition),
Property(PropertyDefinition),
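The PartialOrd/Ord derives above exist so the issuer can sort and dedupe a batch in place, since Vec::dedup only drops adjacent duplicates. A minimal sketch of that pattern, using a stripped-down, hypothetical Update:

```rust
// Vec::dedup only removes *adjacent* duplicates, so the batch is sorted first,
// which requires Ord on Update (and on every type it contains).
#[derive(Clone, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
enum Update {
    Event(String),
    Property(String),
}

fn main() {
    let mut batch = vec![
        Update::Event("a".into()),
        Update::Property("x".into()),
        Update::Event("a".into()), // duplicate, but not adjacent
    ];
    let start_len = batch.len();
    batch.sort_unstable();
    batch.dedup();
    // One duplicate dropped; this is the delta the DUPLICATES_IN_BATCH counter records.
    assert_eq!(start_len - batch.len(), 1);
}
```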
