diff --git a/rust/capture/src/lib.rs b/rust/capture/src/lib.rs index 17079f4b78c74..ebb51deb89048 100644 --- a/rust/capture/src/lib.rs +++ b/rust/capture/src/lib.rs @@ -1,6 +1,7 @@ pub mod api; pub mod config; pub mod limiters; +pub mod passes; pub mod prometheus; pub mod redis; pub mod router; diff --git a/rust/capture/src/passes/invalid_surrogates.rs b/rust/capture/src/passes/invalid_surrogates.rs new file mode 100644 index 0000000000000..4697a07bd7977 --- /dev/null +++ b/rust/capture/src/passes/invalid_surrogates.rs @@ -0,0 +1,312 @@ +use std::str::Chars; + +#[derive(Debug, PartialEq, Clone, Copy)] +enum LastSeen { + Char, // Any regular character + Escape, // We've seen a backslash, not preceded by another backslash +} + +// Unicode unknown character replacement - �, but as a hex escape sequence +const REPLACEMENT: &str = "uFFFD"; +const HIGH_SURROGATE_RANGE: std::ops::Range = 0xD800..0xDBFF; +const LOW_SURROGATE_RANGE: std::ops::Range = 0xDC00..0xDFFF; +const HEX_ESCAPE_LENGTH: usize = 4; + +pub struct InvalidSurrogatesPass<'a> { + input: Chars<'a>, + last_seen: LastSeen, + pending_output: Vec, + pending_ptr: usize, + escape_seq_buf: String, +} + +impl<'a> Iterator for InvalidSurrogatesPass<'a> { + type Item = char; + + fn next(&mut self) -> Option { + self.step() + } +} + +impl<'a> InvalidSurrogatesPass<'a> { + pub fn new(input: Chars<'a>) -> Self { + Self { + input, + last_seen: LastSeen::Char, + pending_output: Vec::with_capacity(32), + pending_ptr: 0, + escape_seq_buf: String::with_capacity(32), + } + } + + fn queue(&mut self, c: char) { + if self.last_seen == LastSeen::Escape { + // When we enter an escape sequence, we swallow the backslash, + // to avoid having to backtrack if we drop an invalid escape sequence. + // So we have to emit it here. + self.pending_output.push('\\'); + self.pending_output.push(c); + self.last_seen = LastSeen::Char; + } else if c == '\\' { + // If we're not already in an escape sequence, enter one, dropping the char to + // avoid needing to backtrack + self.last_seen = LastSeen::Escape; + } else { + // If we're not in an escape sequence, and not entering one, just push + self.last_seen = LastSeen::Char; + self.pending_output.push(c); + } + } + + fn queue_str(&mut self, s: &str) { + for c in s.chars() { + self.queue(c); + } + } + + fn pop(&mut self) -> Option { + // We push chars into the buffer reading left-to-right, and need to emit them + // in the same order, so we have to track our stack index, and reset it when we + // run out of chars to pop. + if self.pending_ptr < self.pending_output.len() { + let c = self.pending_output[self.pending_ptr]; + self.pending_ptr += 1; + Some(c) + } else { + self.pending_output.clear(); + self.pending_ptr = 0; + None + } + } + + fn step(&mut self) -> Option { + if let Some(c) = self.pop() { + return Some(c); + } + + // We're out of input, and we've go no pending output, so we're done + let Some(c) = self.input.next() else { + // If we're all out of input and the last thing we saw was an escape, + // we have to emit that escape character. We just do that directly here, + // knowing the next call around we'll return None. + // Note that since we're parsing strings to get turned into json values, + // we technically know this will be immediately discarded, but there's + // no harm making it "correct" first. + if self.last_seen == LastSeen::Escape { + self.last_seen = LastSeen::Char; + return Some('\\'); + }; + return None; + }; + + match (self.last_seen, c) { + (LastSeen::Escape, 'u') => { + let first_code_point = + match collect_escape_sequence(&mut self.escape_seq_buf, &mut self.input) { + Ok(code_point) => code_point, + Err(None) => { + // We ran out of chars. Push a replacement, and return. + // We drop the collected chars here because, if we'd encountered a syntactically + // important one, it would have been caught as non-hex earlier and returned in + // the branch below. + self.queue_str(REPLACEMENT); + return self.pop(); + } + Err(Some(c)) => { + // We encountered an invalid char. Push the replacement, push the invalid char, and return + self.queue_str(REPLACEMENT); + self.queue(c); + return self.pop(); + } + }; + + // Now, we try to get the second member of the surrogate pair, since we require surrogates to be paired + match self.input.next() { + Some('\\') => { + // We don't push a backslash here because we're already in an escape sequence, + // and it would cause us to exit it - but the specific characters we're going + // to emit isn't known yet, so we can't push those and then a backslash either + } + Some(c) => { + self.queue_str(REPLACEMENT); + self.queue(c); + return self.pop(); + } + None => { + // We didn't get a second escape sequence, so we just drop the first one + self.queue_str(REPLACEMENT); + return self.pop(); + } + } + match self.input.next() { + Some('u') => {} + Some(c) => { + self.queue_str(REPLACEMENT); + self.queue('\\'); // We have to handle that we've already consumed a backslash + self.queue(c); + return self.pop(); + } + None => { + self.queue_str(REPLACEMENT); + self.queue('\\'); // As above + return self.pop(); + } + } + + let second_code_point = + match collect_escape_sequence(&mut self.escape_seq_buf, &mut self.input) { + Ok(code_point) => code_point, + Err(None) => { + self.queue_str(REPLACEMENT); + self.queue('\\'); + self.queue_str(REPLACEMENT); + return self.pop(); + } + Err(Some(c)) => { + self.queue_str(REPLACEMENT); + self.queue('\\'); + self.queue_str(REPLACEMENT); + self.queue(c); + return self.pop(); + } + }; + if HIGH_SURROGATE_RANGE.contains(&first_code_point) + && LOW_SURROGATE_RANGE.contains(&second_code_point) + { + // We have a valid pair of hex escapes, so we should push them. + // TODO - there's way to do this that doesn't require the + // allocation format! implies, but I'm not gonna work it out + // right now - we expect this to be /extremely/ rare + self.queue_str(&format!( + "u{:04X}\\u{:04X}", // First backslash is already in the buffer due to last_seen + first_code_point, second_code_point + )); + } else { + // We didn't get a valid pair, so we just drop the pair entirely + self.queue_str(REPLACEMENT); + self.queue('\\'); + self.queue_str(REPLACEMENT); + } + } + (LastSeen::Char | LastSeen::Escape, c) => { + // emit handles the transition between escape and char for us, + // so we just unconditionally emit here if the last thing we saw + // was a char, or the last thing we saw was an escape, AND the + // current char is not a 'u' (the case above) + self.queue(c); + } + } + + // Because we swallow escape chars to avoid backtracking, we have to recurse + // here to handle the case where we just entered an escape squeuence + self.next() + } +} + +// Collects 4 chars into a hex escape sequence, returning the first char that couldn't be part of +// one, if one was found. If we run out of input, we return Result::Err(None) +fn collect_escape_sequence( + buf: &mut String, + iter: &mut dyn Iterator, +) -> Result> { + buf.clear(); + for _ in 0..HEX_ESCAPE_LENGTH { + let Some(c) = iter.next() else { + return Err(None); + }; + // If this character couldn't be part of a hex escape sequence, we return it + if !c.is_ascii_hexdigit() { + return Err(Some(c)); + } + buf.push(c); + } + // Unwrap safe due to the checking above + Ok(u16::from_str_radix(buf, 16).unwrap()) +} + +#[cfg(test)] +mod test { + use crate::v0_request::RawEvent; + + const RAW_DATA: &str = include_str!("../../tests/invalid_surrogate.json"); + + #[test] + fn test() { + let pass = super::InvalidSurrogatesPass::new(RAW_DATA.chars()); + let data = pass.collect::(); + let res = serde_json::from_str::(&data); + assert!(res.is_ok()) + } + + #[test] + fn test_unpaired_high_surrogate() { + let raw_data = r#"{"event":"\uD800"}"#; + let pass = super::InvalidSurrogatesPass::new(raw_data.chars()); + let data = pass.collect::(); + assert_eq!(data, r#"{"event":"\uFFFD"}"#); + } + + #[test] + fn test_unpaired_low_surrogate() { + let raw_data = r#"{"event":"\uDC00"}"#; + let pass = super::InvalidSurrogatesPass::new(raw_data.chars()); + let data = pass.collect::(); + assert_eq!(data, r#"{"event":"\uFFFD"}"#); + } + + #[test] + fn test_wrong_order_surrogate_pair() { + let raw_data = r#"{"event":"\uDC00\uD800"}"#; + let pass = super::InvalidSurrogatesPass::new(raw_data.chars()); + let data = pass.collect::(); + assert_eq!(data, r#"{"event":"\uFFFD\uFFFD"}"#); + } + + #[test] + fn test_trailing_escape() { + let raw_data = r#"{"event":"\u"}"#; + let pass = super::InvalidSurrogatesPass::new(raw_data.chars()); + let data = pass.collect::(); + assert_eq!(data, r#"{"event":"\uFFFD"}"#); + } + + #[test] + fn test_trailing_escape_pair() { + let raw_data = r#"{"event":"\u\u"}"#; + let pass = super::InvalidSurrogatesPass::new(raw_data.chars()); + let data = pass.collect::(); + assert_eq!(data, r#"{"event":"\uFFFD\uFFFD"}"#); + } + + #[test] + fn test_trailing_escape_pair_high_surrogate() { + let raw_data = r#"{"event":"\uD800\u"}"#; + let pass = super::InvalidSurrogatesPass::new(raw_data.chars()); + let data = pass.collect::(); + assert_eq!(data, r#"{"event":"\uFFFD\uFFFD"}"#); + } + + #[test] + fn test_trailing_escape_pair_low_surrogate() { + let raw_data = r#"{"event":"\uDC00\u"}"#; + let pass = super::InvalidSurrogatesPass::new(raw_data.chars()); + let data = pass.collect::(); + assert_eq!(data, r#"{"event":"\uFFFD\uFFFD"}"#); + } + + #[test] + fn test_trailing_escape_char() { + let raw_data = r#"{"event":"\uD800\"#; + let pass = super::InvalidSurrogatesPass::new(raw_data.chars()); + let data = pass.collect::(); + assert_eq!(data, r#"{"event":"\uFFFD\"#); + } + + #[test] + fn test_valid_pair_trailing_slash() { + let raw_data = r#"{"event":"\uD800\uDC00\"#; + let pass = super::InvalidSurrogatesPass::new(raw_data.chars()); + let data = pass.collect::(); + assert_eq!(data, r#"{"event":"\uD800\uDC00\"#); + } +} diff --git a/rust/capture/src/passes/mod.rs b/rust/capture/src/passes/mod.rs new file mode 100644 index 0000000000000..c4aba6595389e --- /dev/null +++ b/rust/capture/src/passes/mod.rs @@ -0,0 +1 @@ +pub mod invalid_surrogates; diff --git a/rust/capture/src/v0_request.rs b/rust/capture/src/v0_request.rs index e3ea92c7ae53e..328e9872eeea3 100644 --- a/rust/capture/src/v0_request.rs +++ b/rust/capture/src/v0_request.rs @@ -12,6 +12,7 @@ use tracing::instrument; use uuid::Uuid; use crate::api::CaptureError; +use crate::passes::invalid_surrogates::InvalidSurrogatesPass; use crate::prometheus::report_dropped_events; use crate::token::validate_token; @@ -176,6 +177,8 @@ impl RawRequest { s }; + let payload: String = InvalidSurrogatesPass::new(payload.chars()).collect(); + tracing::debug!(json = payload, "decoded event data"); Ok(serde_json::from_str::(&payload)?) } diff --git a/rust/capture/tests/invalid_surrogate.json b/rust/capture/tests/invalid_surrogate.json new file mode 100644 index 0000000000000..7f33bf42717f4 --- /dev/null +++ b/rust/capture/tests/invalid_surrogate.json @@ -0,0 +1,28 @@ +{ + "event": "$snapshot", + "properties": { + "$snapshot_data": [ + { + "windowId": "01924ccf-34f9-764e-b7f9-73c74eb7ed55", + "data": { + "payload": { + "level": "warn", + "payload": ["\\\\\\\",\\\\\\\"emoji_flag\\\\\\\":\\\\\\\"\ud83c...[truncated]", "\"test\""], + "trace": [ + "q/< (https://internal-t.posthog.com/static/recorder.js?v=1.166.0:1:19808)", + "q (https://internal-t.posthog.com/static/recorder.js?v=1.166.0:1:20042)", + "z (https://internal-t.posthog.com/static/recorder.js?v=1.166.0:1:21009)", + "a (https://internal-t.posthog.com/static/recorder.js?v=1.166.0:1:34064)", + "e/this.emit (https://internal-t.posthog.com/static/recorder.js?v=1.166.0:1:35299)", + "e/this.processMutations (https://internal-t.posthog.com/static/recorder.js?v=1.166.0:1:33691)" + ] + }, + "plugin": "rrweb/console@1" + }, + "timestamp": 1727865503680, + "type": 6, + "seen": 1185537021728171 + } + ] + } +}