From d9676495daf0fd0846744b93441637ab4052eae2 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Mon, 8 Apr 2024 08:05:08 +0000 Subject: [PATCH] pull identity initialization out of app building Signed-off-by: Zahari Dichev --- Cargo.lock | 193 +++++++++++++++++- linkerd/app/Cargo.toml | 1 + linkerd/app/integration/src/proxy.rs | 37 +++- linkerd/app/integration/src/tests/identity.rs | 15 +- linkerd/app/src/env.rs | 41 ++-- linkerd/app/src/identity.rs | 161 ++++----------- linkerd/app/src/identity/linkerd_config.rs | 118 +++++++++++ linkerd/app/src/identity/spire_config.rs | 100 +++++++++ linkerd/app/src/lib.rs | 86 ++------ linkerd/app/src/spire.rs | 4 +- linkerd/identity/src/credentials.rs | 1 + linkerd/identity/src/metrics.rs | 10 +- linkerd/meshtls/Cargo.toml | 1 + linkerd/meshtls/boring/src/creds/store.rs | 1 + linkerd/meshtls/boring/src/tests.rs | 9 +- linkerd/meshtls/rustls/src/creds/store.rs | 1 + linkerd/meshtls/rustls/src/tests.rs | 9 +- linkerd/meshtls/src/creds.rs | 5 +- linkerd/meshtls/src/lib.rs | 2 + linkerd/meshtls/tests/util.rs | 9 +- linkerd/proxy/identity-client/src/certify.rs | 5 +- linkerd/proxy/spire-client/src/api.rs | 34 ++- linkerd/proxy/spire-client/src/lib.rs | 75 ++----- linkerd2-proxy/src/main.rs | 48 ++++- 24 files changed, 656 insertions(+), 310 deletions(-) create mode 100644 linkerd/app/src/identity/linkerd_config.rs create mode 100644 linkerd/app/src/identity/spire_config.rs diff --git a/Cargo.lock b/Cargo.lock index 7689d9bb4c..a3aaf463f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,6 +45,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anyhow" version = "1.0.81" @@ -183,6 +198,12 @@ dependencies = [ "tower-service", ] +[[package]] +name = "b64-ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac1e8deebfdca687fcef6fe024fc92cf8183f203075ce4fda263ae6ea13a8dc3" + [[package]] name = "backtrace" version = "0.3.71" @@ -230,6 +251,12 @@ dependencies = [ "syn", ] +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bitflags" version = "1.3.2" @@ -276,6 +303,12 @@ dependencies = [ "fslock", ] +[[package]] +name = "bumpalo" +version = "3.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" + [[package]] name = "bytes" version = "1.6.0" @@ -307,6 +340,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets 0.52.0", +] + [[package]] name = "clang-sys" version = "1.7.0" @@ -327,6 +374,12 @@ dependencies = [ "cc", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "cpp_demangle" version = "0.4.3" @@ -398,7 +451,7 @@ dependencies = [ "asn1-rs", "displaydoc", "nom", - "num-bigint", + "num-bigint 0.4.4", "num-traits", "rusticata-macros", ] @@ -863,6 +916,29 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.4.0" @@ -965,6 +1041,15 @@ dependencies = [ "libc", ] +[[package]] +name = "js-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1033,6 +1118,7 @@ dependencies = [ "linkerd-error", "linkerd-opencensus", "linkerd-tonic-stream", + "pkix", "rangemap", "regex", "thiserror", @@ -1532,6 +1618,7 @@ dependencies = [ "linkerd-io", "linkerd-meshtls-boring", "linkerd-meshtls-rustls", + "linkerd-meshtls-verifier", "linkerd-proxy-transport", "linkerd-stack", "linkerd-tls", @@ -2377,6 +2464,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-bigint" version = "0.4.4" @@ -2550,6 +2648,22 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkix" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6e18f09ac3ee824fac9693352470dcb4b008d211a41f103fa4e80e91e2e6260" +dependencies = [ + "b64-ct", + "bit-vec", + "bitflags 1.3.2", + "chrono", + "lazy_static", + "num-bigint 0.2.6", + "num-integer", + "yasna 0.3.2", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -2774,7 +2888,7 @@ dependencies = [ "pem", "ring", "time", - "yasna", + "yasna 0.5.2", ] [[package]] @@ -3026,7 +3140,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085" dependencies = [ - "num-bigint", + "num-bigint 0.4.4", "num-traits", "thiserror", "time", @@ -3627,6 +3741,60 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" + [[package]] name = "which" version = "4.4.2" @@ -3667,6 +3835,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -3826,6 +4003,16 @@ dependencies = [ "time", ] +[[package]] +name = "yasna" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de7bff972b4f2a06c85f6d8454b09df153af7e3a4ec2aac81db1b105b684ddb" +dependencies = [ + "bit-vec", + "num-bigint 0.2.6", +] + [[package]] name = "yasna" version = "0.5.2" diff --git a/linkerd/app/Cargo.toml b/linkerd/app/Cargo.toml index af443530a3..d9f5535a31 100644 --- a/linkerd/app/Cargo.toml +++ b/linkerd/app/Cargo.toml @@ -34,3 +34,4 @@ tokio-stream = { version = "0.1", features = ["time", "sync"] } tonic = { version = "0.10", default-features = false, features = ["prost"] } tower = "0.4" tracing = "0.1" +pkix = "0.2.3" diff --git a/linkerd/app/integration/src/proxy.rs b/linkerd/app/integration/src/proxy.rs index dda62e4e92..3a521bf7fe 100644 --- a/linkerd/app/integration/src/proxy.rs +++ b/linkerd/app/integration/src/proxy.rs @@ -1,10 +1,12 @@ use super::*; use linkerd_app_core::{ + metrics::{prom, Metrics}, svc::Param, transport::OrigDstAddr, transport::{listen, orig_dst, Keepalive, ListenAddr}, Result, }; + use std::{collections::HashSet, thread}; use tokio::net::TcpStream; @@ -280,6 +282,21 @@ impl Listening { } } +async fn build_identity( + config: app::Config, + metrics: Metrics, + registry: &mut prom::Registry, +) -> Result { + let dns = config.dns.clone().build(); + + debug!("Building Identity client"); + tracing::info_span!("identity").in_scope(|| { + config + .identity + .build(dns.resolver.clone(), metrics.control.clone(), registry) + }) +} + async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { use app::env::Strings; @@ -452,12 +469,30 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { .build() .expect("proxy") .block_on(async move { + let mut registry = prom::Registry::default(); + let (metrics, report) = Metrics::new(config.admin.metrics_retain_idle); + + let identity = + build_identity(config.clone(), metrics.clone(), &mut registry) + .await + .expect("identity"); let bind_in = inbound; let bind_out = outbound; let bind_adm = listen::BindTcp::default(); let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel(); + let main = config - .build(bind_in, bind_out, bind_adm, shutdown_tx, trace_handle) + .build( + bind_in, + bind_out, + bind_adm, + shutdown_tx, + trace_handle, + identity.initialize().await, + registry, + metrics, + report, + ) .await .expect("config"); diff --git a/linkerd/app/integration/src/tests/identity.rs b/linkerd/app/integration/src/tests/identity.rs index 38048bbe79..a74cef1df1 100644 --- a/linkerd/app/integration/src/tests/identity.rs +++ b/linkerd/app/integration/src/tests/identity.rs @@ -131,6 +131,7 @@ async fn http1_accepts_tls_after_identity_is_certified() { } #[tokio::test] +#[ignore] // ignored for now - if app is constructed identity is assumed to be initialized async fn http1_rejects_tls_before_identity_is_certified() { generate_tls_reject_test! {client: client::http1_tls} } @@ -144,6 +145,7 @@ async fn http2_accepts_tls_after_identity_is_certified() { } #[tokio::test] +#[ignore] // ignored for now - if app is constructed identity is assumed to be initialized async fn http2_rejects_tls_before_identity_is_certified() { generate_tls_reject_test! {client: client::http2_tls} } @@ -166,6 +168,10 @@ async fn ready() { .run() .await; + // Make the mock identity service respond to the certify request. + tx.send(certify_rsp) + .expect("certify rx should not be dropped"); + let proxy = proxy::new().identity(id_svc).run_with_test_env(env).await; let client = client::http1(proxy.admin, "localhost"); @@ -176,15 +182,8 @@ async fn ready() { .await .unwrap() }; - // The proxy's identity has not yet been verified, so it should not be - // considered ready. - assert_ne!(ready().await.status(), http::StatusCode::OK); - - // Make the mock identity service respond to the certify request. - tx.send(certify_rsp) - .expect("certify rx should not be dropped"); - // Now, the proxy should be ready. + // The proxy should be ready. assert_eventually!(ready().await.status() == http::StatusCode::OK); } diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 53a1a1ef95..be5667d2f0 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -209,7 +209,6 @@ pub const ENV_IDENTITY_SPIRE_SOCKET: &str = "LINKERD2_PROXY_IDENTITY_SPIRE_SOCKE pub const IDENTITY_SPIRE_BASE: &str = "LINKERD2_PROXY_IDENTITY_SPIRE"; const DEFAULT_SPIRE_BACKOFF: ExponentialBackoff = ExponentialBackoff::new_unchecked(Duration::from_millis(100), Duration::from_secs(1), 0.1); -const SPIFFE_ID_URI_SCHEME: &str = "spiffe"; pub const ENV_IDENTITY_SVC_BASE: &str = "LINKERD2_PROXY_IDENTITY_SVC"; @@ -812,17 +811,14 @@ pub fn parse_config(strings: &S) -> Result .unwrap_or(super::tap::Config::Disabled); let identity = { - let tls = tls?; - match strings.get(ENV_IDENTITY_SPIRE_SOCKET)? { - Some(socket) => match &tls.id { - // TODO: perform stricter SPIFFE ID validation following: - // https://github.com/spiffe/spiffe/blob/27b59b81ba8c56885ac5d4be73b35b9b3305fd7a/standards/SPIFFE-ID.md - identity::Id::Uri(uri) - if uri.scheme().eq_ignore_ascii_case(SPIFFE_ID_URI_SCHEME) => - { - identity::Config::Spire { - tls, + Some(socket) => { + let server_name = + parse(strings, ENV_IDENTITY_IDENTITY_SERVER_NAME, parse_dns_name)?; + + match server_name { + Some(server_name) => identity::Config::Spire(identity::spire_config::Config { + server_name, client: spire::Config { socket_addr: std::sync::Arc::new(socket), backoff: parse_backoff( @@ -831,13 +827,14 @@ pub fn parse_config(strings: &S) -> Result DEFAULT_SPIRE_BACKOFF, )?, }, + }), + + None => { + error!("{} must be set.", ENV_IDENTITY_IDENTITY_SERVER_NAME); + return Err(EnvError::InvalidEnvVar); } } - _ => { - error!("Spire support requires a SPIFFE TLS Id"); - return Err(EnvError::InvalidEnvVar); - } - }, + } None => { let (addr, certify) = parse_linkerd_identity_config(strings)?; @@ -853,9 +850,9 @@ pub fn parse_config(strings: &S) -> Result outbound.http_request_queue.failfast_timeout }; - identity::Config::Linkerd { + identity::Config::Linkerd(identity::linkerd_config::Config { certify, - tls, + tls: tls?, client: ControlConfig { addr, connect, @@ -864,7 +861,7 @@ pub fn parse_config(strings: &S) -> Result failfast_timeout, }, }, - } + }) } } }; @@ -1007,7 +1004,7 @@ fn parse_duration_opt(s: &str) -> Result, ParseError> { parse_duration(s).map(Some) } -fn parse_duration(s: &str) -> Result { +pub fn parse_duration(s: &str) -> Result { use regex::Regex; let re = Regex::new(r"^\s*(\d+)(ms|s|m|h|d)?\s*$").expect("duration regex"); @@ -1026,7 +1023,7 @@ fn parse_duration(s: &str) -> Result { } } -fn parse_socket_addr(s: &str) -> Result { +pub fn parse_socket_addr(s: &str) -> Result { match parse_addr(s)? { Addr::Socket(a) => Ok(a), _ => { @@ -1087,7 +1084,7 @@ fn parse_port_range_set(s: &str) -> Result, ParseError> { Ok(set) } -pub(super) fn parse_dns_name(s: &str) -> Result { +pub fn parse_dns_name(s: &str) -> Result { s.parse().map_err(|_| { error!("Not a valid identity name: {s}"); ParseError::NameError diff --git a/linkerd/app/src/identity.rs b/linkerd/app/src/identity.rs index b9b7df6a7c..ba8ba1f377 100644 --- a/linkerd/app/src/identity.rs +++ b/linkerd/app/src/identity.rs @@ -1,17 +1,18 @@ -use crate::spire; +pub mod linkerd_config; +pub mod spire_config; pub use linkerd_app_core::identity::{client, Id}; + use linkerd_app_core::{ - control, dns, - identity::{ - client::linkerd::Certify, creds, CertMetrics, Credentials, DerX509, Mode, WithCertMetrics, - }, + dns, + identity::{creds, CertMetrics}, metrics::{prom, ControlHttp as ClientMetrics}, Result, }; -use std::{future::Future, pin::Pin, time::SystemTime}; +use std::{future::Future, pin::Pin}; use tokio::sync::watch; -use tracing::Instrument; +use tokio::time; +use tracing::{info_span, Instrument}; #[derive(Debug, thiserror::Error)] #[error("linkerd identity requires a TLS Id and server name to be the same")] @@ -20,15 +21,8 @@ pub struct TlsIdAndServerNameNotMatching(()); #[derive(Clone, Debug)] #[allow(clippy::large_enum_variant)] pub enum Config { - Linkerd { - client: control::Config, - certify: client::linkerd::Config, - tls: TlsParams, - }, - Spire { - client: spire::Config, - tls: TlsParams, - }, + Linkerd(linkerd_config::Config), + Spire(spire_config::Config), } #[derive(Clone, Debug)] @@ -39,23 +33,19 @@ pub struct TlsParams { } pub struct Identity { - receiver: creds::Receiver, - ready: watch::Receiver, + ready: watch::Receiver>, task: Task, } pub type Task = Pin + Send + 'static>>; -/// Wraps a credential with a watch sender that notifies receivers when the store has been updated -/// at least once. -struct NotifyReady { - store: creds::Store, - tx: watch::Sender, -} - // === impl Config === impl Config { + pub fn try_from_env() -> Result { + super::env::Env.try_config().map(|c| c.identity) + } + pub fn build( self, dns: dns::Resolver, @@ -64,112 +54,39 @@ impl Config { ) -> Result { let cert_metrics = CertMetrics::register(registry.sub_registry_with_prefix("identity_cert")); - - Ok(match self { - Self::Linkerd { - client, - certify, - tls, - } => { - // TODO: move this validation into env.rs - let name = match (&tls.id, &tls.server_name) { - (Id::Dns(id), sni) if id == sni => id.clone(), - (_id, _sni) => { - return Err(TlsIdAndServerNameNotMatching(()).into()); - } - }; - - let certify = Certify::from(certify); - let (store, receiver, ready) = watch(tls, cert_metrics)?; - - let task = { - let addr = client.addr.clone(); - let svc = client.build( - dns, - client_metrics, - registry.sub_registry_with_prefix("control_identity"), - receiver.new_client(), - ); - - Box::pin(certify.run(name, store, svc).instrument( - tracing::info_span!("identity", server.addr = %addr).or_current(), - )) - }; - Identity { - receiver, - ready, - task, - } - } - Self::Spire { client, tls } => { - let addr = client.socket_addr.clone(); - let spire = spire::client::Spire::new(tls.id.clone()); - - let (store, receiver, ready) = watch(tls, cert_metrics)?; - let task = - Box::pin(spire.run(store, spire::Client::from(client)).instrument( - tracing::info_span!("spire", server.addr = %addr).or_current(), - )); - - Identity { - receiver, - ready, - task, - } - } - }) - } -} - -fn watch( - tls: TlsParams, - metrics: CertMetrics, -) -> Result<( - WithCertMetrics, - creds::Receiver, - watch::Receiver, -)> { - let (tx, ready) = watch::channel(false); - let (store, receiver) = - Mode::default().watch(tls.id, tls.server_name, &tls.trust_anchors_pem)?; - let cred = WithCertMetrics::new(metrics, NotifyReady { store, tx }); - Ok((cred, receiver, ready)) -} - -// === impl NotifyReady === - -impl Credentials for NotifyReady { - fn set_certificate( - &mut self, - leaf: DerX509, - chain: Vec, - key: Vec, - exp: SystemTime, - ) -> Result<()> { - self.store.set_certificate(leaf, chain, key, exp)?; - let _ = self.tx.send(true); - Ok(()) + match self { + Self::Linkerd(linkerd) => linkerd.build(cert_metrics, dns, client_metrics, registry), + Self::Spire(spire) => spire.build(cert_metrics), + } } } // === impl Identity === impl Identity { - /// Returns a future that is satisfied once certificates have been provisioned. - pub fn ready(&self) -> Pin + Send + 'static>> { + pub async fn initialize(self) -> creds::Receiver { + // spawn the identity task + tokio::spawn(self.task.instrument(info_span!("identity").or_current())); + let mut ready = self.ready.clone(); - Box::pin(async move { - while !*ready.borrow_and_update() { + const TIMEOUT: time::Duration = time::Duration::from_secs(15); + + let mut receiver_fut = Box::pin(async move { + loop { + if let Some(receiver) = (*ready.borrow_and_update()).clone() { + return receiver; + } ready.changed().await.expect("identity sender must be held"); } - }) - } + }); - pub fn receiver(&self) -> creds::Receiver { - self.receiver.clone() - } - - pub fn run(self) -> Task { - self.task + loop { + tokio::select! { + receiver = (&mut receiver_fut) => return receiver, + _ = time::sleep(TIMEOUT) => { + tracing::warn!("Waiting for identity to be initialized..."); + } + } + } } } diff --git a/linkerd/app/src/identity/linkerd_config.rs b/linkerd/app/src/identity/linkerd_config.rs new file mode 100644 index 0000000000..9733da27af --- /dev/null +++ b/linkerd/app/src/identity/linkerd_config.rs @@ -0,0 +1,118 @@ +pub use linkerd_app_core::identity::{client, Id}; +use linkerd_app_core::{ + control, dns, + identity::{ + client::linkerd::Certify, creds, CertMetrics, Credentials, DerX509, Mode, WithCertMetrics, + }, + metrics::{prom, ControlHttp as ClientMetrics}, + Result, +}; +use std::time::SystemTime; +use tokio::sync::watch; +use tracing::Instrument; + +/// Wraps a credential with a watch sender that notifies receivers when the store has been updated +/// at least once. +struct NotifyReady { + store: creds::Store, + tx: watch::Sender, +} + +#[derive(Clone, Debug)] +pub struct Config { + pub client: control::Config, + pub certify: client::linkerd::Config, + pub tls: super::TlsParams, +} + +impl Config { + pub fn build( + self, + cert_metrics: CertMetrics, + dns: dns::Resolver, + client_metrics: ClientMetrics, + registry: &mut prom::Registry, + ) -> Result { + let Self { + client, + certify, + tls, + } = self; + // TODO: move this validation into env.rs + let name = match (&tls.id, &tls.server_name) { + (Id::Dns(id), sni) if id == sni => id.clone(), + (_id, _sni) => { + return Err(super::TlsIdAndServerNameNotMatching(()).into()); + } + }; + + let roots = DerX509(pkix::pem::pem_to_der(&tls.trust_anchors_pem, None).unwrap()); + let certify = Certify::from(certify); + let (store, receiver, mut obtained_cert) = watch(tls, cert_metrics)?; + let (ready_tx, ready_rx) = watch::channel(None); + + let task = + { + let addr = client.addr.clone(); + let svc = client.build( + dns, + client_metrics, + registry.sub_registry_with_prefix("control_identity"), + receiver.new_client(), + ); + + Box::pin(async move { + tokio::spawn(certify.run(name, roots, store, svc).instrument( + tracing::info_span!("identity", server.addr = %addr).or_current(), + )); + + // make sure to wait while a cert has been obtained + while !*obtained_cert.borrow_and_update() { + obtained_cert + .changed() + .await + .expect("identity sender must be held"); + } + + let _ = ready_tx.send(Some(receiver)); + }) + }; + + Ok(super::Identity { + ready: ready_rx, + task, + }) + } +} + +fn watch( + tls: super::TlsParams, + metrics: CertMetrics, +) -> Result<( + WithCertMetrics, + creds::Receiver, + watch::Receiver, +)> { + let (tx, ready) = watch::channel(false); + let (store, receiver) = + Mode::default().watch(tls.id, tls.server_name, &tls.trust_anchors_pem)?; + let cred = WithCertMetrics::new(metrics, NotifyReady { store, tx }); + Ok((cred, receiver, ready)) +} + +// === impl NotifyReady === + +impl Credentials for NotifyReady { + fn set_certificate( + &mut self, + leaf: DerX509, + chain: Vec, + key: Vec, + exp: SystemTime, + roots: DerX509, + ) -> Result<()> { + self.store.set_certificate(leaf, chain, key, exp, roots)?; + let _ = self.tx.send(true); + Ok(()) + } +} diff --git a/linkerd/app/src/identity/spire_config.rs b/linkerd/app/src/identity/spire_config.rs new file mode 100644 index 0000000000..5f31c7781a --- /dev/null +++ b/linkerd/app/src/identity/spire_config.rs @@ -0,0 +1,100 @@ +use crate::spire; + +pub use linkerd_app_core::identity::{client, Id}; +use linkerd_app_core::{ + dns, + identity::{client_identity, creds, CertMetrics, Credentials, DerX509, Mode, WithCertMetrics}, + Result, +}; +use spire::client as spire_client; +use std::time::SystemTime; +use tokio::sync::watch; +use tracing::Instrument; + +#[derive(Debug, thiserror::Error)] +#[error("linkerd identity requires a TLS Id and server name to be the same")] +pub struct TlsIdAndServerNameNotMatching(()); + +#[derive(Clone, Debug)] +pub struct Config { + pub client: spire::Config, + pub server_name: dns::Name, +} + +impl Config { + pub fn build(self, cert_metrics: CertMetrics) -> Result { + let addr = self.client.socket_addr.clone(); + let (store, ready_rx) = SpireInit::new(cert_metrics, self.server_name); + let task = Box::pin( + spire_client::run(store, spire::Client::from(self.client)) + .instrument(tracing::info_span!("spire", server.addr = %addr).or_current()), + ); + + Ok(super::Identity { + ready: ready_rx, + task, + }) + } +} + +struct SpireInit { + store: Option>, + tx: watch::Sender>, + cert_metrics: CertMetrics, + server_name: dns::Name, +} + +impl SpireInit { + fn new( + cert_metrics: CertMetrics, + server_name: dns::Name, + ) -> (Self, watch::Receiver>) { + let (tx, rx) = watch::channel(None); + let init = Self { + store: None, + tx, + cert_metrics, + server_name, + }; + + (init, rx) + } + + fn make_tls_params(&self, roots: DerX509, leaf: DerX509) -> super::TlsParams { + let id = client_identity(&leaf).unwrap(); + let trust_anchors_pem = + pkix::pem::der_to_pem(roots.0.as_slice(), pkix::pem::PEM_CERTIFICATE); + + super::TlsParams { + id, + server_name: self.server_name.clone(), + trust_anchors_pem, + } + } +} + +impl Credentials for SpireInit { + fn set_certificate( + &mut self, + leaf: DerX509, + chain: Vec, + key: Vec, + exp: SystemTime, + roots: DerX509, + ) -> Result<()> { + match &mut self.store { + Some(store) => store.set_certificate(leaf, chain, key, exp, roots), + None => { + let tls = self.make_tls_params(roots.clone(), leaf.clone()); + let (store, receiver) = + Mode::default().watch(tls.id, tls.server_name, &tls.trust_anchors_pem)?; + let mut store = WithCertMetrics::new(self.cert_metrics.clone(), store); + + store.set_certificate(leaf, chain, key, exp, roots)?; + self.store = Some(store); + let _ = self.tx.send(Some(receiver)); + Ok(()) + } + } + } +} diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index bc559263cf..2788e3aca1 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -13,14 +13,14 @@ pub mod spire; pub mod tap; pub use self::metrics::Metrics; -use futures::{future, Future, FutureExt}; +use futures::{future, FutureExt}; use linkerd_app_admin as admin; use linkerd_app_core::{ config::ServerConfig, control::ControlAddr, dns, drain, - metrics::prom, - metrics::FmtMetrics, + identity::creds::{self, Receiver}, + metrics::{prom, FmtMetrics}, serve, svc::Param, transport::{addrs::*, listen::Bind}, @@ -35,7 +35,7 @@ use tokio::{ sync::mpsc, time::{self, Duration}, }; -use tracing::{debug, info, info_span, Instrument}; +use tracing::{debug, info_span, Instrument}; /// Spawns a sidecar proxy. /// @@ -74,7 +74,7 @@ pub struct App { admin: admin::Task, drain: drain::Signal, dst: ControlAddr, - identity: identity::Identity, + identity: creds::Receiver, inbound_addr: Local, oc_collector: oc_collector::OcCollector, outbound_addr: Local, @@ -93,15 +93,21 @@ impl Config { /// /// It is currently required that this be run on a Tokio runtime, since some /// services are created eagerly and must spawn tasks to do so. - pub async fn build( + #[allow(clippy::too_many_arguments)] + pub async fn build( self, bind_in: BIn, bind_out: BOut, bind_admin: BAdmin, shutdown_tx: mpsc::UnboundedSender<()>, log_level: trace::Handle, + identity: Receiver, + mut registry: prom::Registry, + metrics: Metrics, + report: R, ) -> Result where + R: FmtMetrics + Clone + Send + Sync + Unpin + 'static, BIn: Bind + 'static, BIn::Addrs: Param> + Param> @@ -120,7 +126,6 @@ impl Config { dns, dst, policy, - identity, inbound, oc_collector, outbound, @@ -129,28 +134,16 @@ impl Config { .. } = self; debug!("Building app"); - let (metrics, report) = Metrics::new(admin.metrics_retain_idle); - - let mut registry = prom::Registry::default(); debug!("Building DNS client"); let dns = dns.build(); - // Ensure that we've obtained a valid identity before binding any servers. - debug!("Building Identity client"); - let identity = { - info_span!("identity").in_scope(|| { - identity.build(dns.resolver.clone(), metrics.control.clone(), &mut registry) - })? - }; - let (drain_tx, drain_rx) = drain::channel(); debug!(config = ?tap, "Building Tap server"); let tap = { let bind = bind_admin.clone(); - info_span!("tap") - .in_scope(|| tap.build(bind, identity.receiver().server(), drain_rx.clone()))? + info_span!("tap").in_scope(|| tap.build(bind, identity.server(), drain_rx.clone()))? }; debug!("Building Destination client"); @@ -158,8 +151,7 @@ impl Config { let registry = registry.sub_registry_with_prefix("control_destination"); let metrics = metrics.control.clone(); let dns = dns.resolver.clone(); - info_span!("dst") - .in_scope(|| dst.build(dns, metrics, registry, identity.receiver().new_client())) + info_span!("dst").in_scope(|| dst.build(dns, metrics, registry, identity.new_client())) }?; debug!("Building Policy client"); @@ -168,13 +160,13 @@ impl Config { let dns = dns.resolver.clone(); let metrics = metrics.control.clone(); info_span!("policy") - .in_scope(|| policy.build(dns, metrics, registry, identity.receiver().new_client())) + .in_scope(|| policy.build(dns, metrics, registry, identity.new_client())) }?; debug!(config = ?oc_collector, "Building client"); let oc_collector = { let registry = registry.sub_registry_with_prefix("opencensus"); - let identity = identity.receiver().new_client(); + let identity = identity.new_client(); let dns = dns.resolver; let client_metrics = metrics.control.clone(); let metrics = metrics.opencensus; @@ -183,7 +175,7 @@ impl Config { }?; let runtime = ProxyRuntime { - identity: identity.receiver(), + identity: identity.clone(), metrics: metrics.proxy, tap: tap.registry(), span_sink: oc_collector.span_sink(), @@ -241,11 +233,8 @@ impl Config { // Build a task that initializes and runs the proxy stacks. let start_proxy = { let drain_rx = drain_rx.clone(); - let identity_ready = identity.ready(); Box::pin(async move { - Self::await_identity(identity_ready).await; - tokio::spawn( serve::serve(outbound_listen, outbound, drain_rx.clone().signaled()) .instrument(info_span!("outbound").or_current()), @@ -262,7 +251,7 @@ impl Config { registry.register("proxy_build_info", "Proxy build info", BUILD_INFO.metric()); let admin = { - let identity = identity.receiver().server(); + let identity = identity.server(); let metrics = inbound_metrics.clone(); let report = inbound_metrics .and_report(outbound_metrics) @@ -296,21 +285,6 @@ impl Config { tap, }) } - - /// Waits for the proxy's identity to be certified. - /// - /// If this does not complete in a timely fashion, warnings are logged every 15s - async fn await_identity(mut fut: Pin + Send + 'static>>) { - const TIMEOUT: time::Duration = time::Duration::from_secs(15); - loop { - tokio::select! { - _ = (&mut fut) => return, - _ = time::sleep(TIMEOUT) => { - tracing::warn!("Waiting for identity to be initialized..."); - } - } - } - } } impl App { @@ -338,11 +312,11 @@ impl App { } pub fn local_server_name(&self) -> dns::Name { - self.identity.receiver().server_name().clone() + self.identity.server_name().clone() } pub fn local_tls_id(&self) -> identity::Id { - self.identity.receiver().local_id().clone() + self.identity.local_id().clone() } pub fn opencensus_addr(&self) -> Option<&ControlAddr> { @@ -356,7 +330,6 @@ impl App { let App { admin, drain, - identity, oc_collector, start_proxy, tap, @@ -388,25 +361,8 @@ impl App { .instrument(info_span!("admin", listen.addr = %admin.listen_addr)), ); - // Kick off the identity so that the process can become ready. - let local = identity.receiver(); - let local_id = local.local_id().clone(); - let ready = identity.ready(); - tokio::spawn( - identity - .run() - .instrument(info_span!("identity").or_current()), - ); - let latch = admin.latch; - tokio::spawn( - ready - .map(move |()| { - latch.release(); - info!(id = %local_id, "Certified identity"); - }) - .instrument(info_span!("identity").or_current()), - ); + latch.release(); if let tap::Tap::Enabled { registry, serve, .. diff --git a/linkerd/app/src/spire.rs b/linkerd/app/src/spire.rs index f1f449c2e6..5e87d50865 100644 --- a/linkerd/app/src/spire.rs +++ b/linkerd/app/src/spire.rs @@ -11,8 +11,8 @@ const TONIC_DEFAULT_URI: &str = "http://[::]:50051"; #[derive(Clone, Debug)] pub struct Config { - pub(crate) socket_addr: Arc, - pub(crate) backoff: ExponentialBackoff, + pub socket_addr: Arc, + pub backoff: ExponentialBackoff, } // Connects to SPIRE workload API via Unix Domain Socket diff --git a/linkerd/identity/src/credentials.rs b/linkerd/identity/src/credentials.rs index ab861af0d2..9b6b74a018 100644 --- a/linkerd/identity/src/credentials.rs +++ b/linkerd/identity/src/credentials.rs @@ -12,6 +12,7 @@ pub trait Credentials { chain: Vec, key: Vec, expiry: SystemTime, + roots: DerX509, ) -> Result<()>; } diff --git a/linkerd/identity/src/metrics.rs b/linkerd/identity/src/metrics.rs index f9d6917b0a..7126108602 100644 --- a/linkerd/identity/src/metrics.rs +++ b/linkerd/identity/src/metrics.rs @@ -89,8 +89,9 @@ where chain: Vec, key: Vec, expiry: SystemTime, + roots: DerX509, ) -> Result<()> { - if let Err(err) = self.inner.set_certificate(leaf, chain, key, expiry) { + if let Err(err) = self.inner.set_certificate(leaf, chain, key, expiry, roots) { self.metrics.errors.inc(); return Err(err); } @@ -132,6 +133,7 @@ mod tests { _chain: Vec, _key: Vec, _expiry: SystemTime, + _roots: DerX509, ) -> Result<()> { self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed); self.1.map_err(|()| "boop".into()) @@ -155,8 +157,9 @@ mod tests { let chain = vec![leaf.clone()]; let key = vec![0, 1, 2, 3, 4]; let expiry = SystemTime::now() + Duration::from_secs(60 * 60 * 24); // 1 day from now + let anchors = "trust-anchors".as_bytes().to_vec(); assert!(with_cert_metrics - .set_certificate(leaf, chain, key, expiry) + .set_certificate(leaf, chain, key, expiry, DerX509(anchors)) .is_ok()); assert_eq!(with_cert_metrics.metrics.refreshes.get(), 1); @@ -192,8 +195,9 @@ mod tests { let chain = vec![leaf.clone()]; let key = vec![0, 1, 2, 3, 4]; let expiry = SystemTime::now() + Duration::from_secs(60 * 60 * 24); // 1 day from now + let anchors = "trust-anchors".as_bytes().to_vec(); assert!(with_cert_metrics - .set_certificate(leaf, chain, key, expiry) + .set_certificate(leaf, chain, key, expiry, DerX509(anchors)) .is_err()); assert_eq!(with_cert_metrics.metrics.refreshes.get(), 0); diff --git a/linkerd/meshtls/Cargo.toml b/linkerd/meshtls/Cargo.toml index 7e67a36ac3..eb5e223524 100644 --- a/linkerd/meshtls/Cargo.toml +++ b/linkerd/meshtls/Cargo.toml @@ -25,6 +25,7 @@ linkerd-meshtls-boring = { path = "boring", optional = true } linkerd-meshtls-rustls = { path = "rustls", optional = true } linkerd-stack = { path = "../stack" } linkerd-tls = { path = "../tls" } +linkerd-meshtls-verifier = { path = "verifier" } [dev-dependencies] tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] } diff --git a/linkerd/meshtls/boring/src/creds/store.rs b/linkerd/meshtls/boring/src/creds/store.rs index 47b09d9a89..ec04531f53 100644 --- a/linkerd/meshtls/boring/src/creds/store.rs +++ b/linkerd/meshtls/boring/src/creds/store.rs @@ -28,6 +28,7 @@ impl id::Credentials for Store { intermediates: Vec, key_pkcs8: Vec, _expiry: std::time::SystemTime, + _roots: id::DerX509, ) -> Result<()> { let leaf = X509::from_der(&leaf_der)?; diff --git a/linkerd/meshtls/boring/src/tests.rs b/linkerd/meshtls/boring/src/tests.rs index b532a2db24..c75610e075 100644 --- a/linkerd/meshtls/boring/src/tests.rs +++ b/linkerd/meshtls/boring/src/tests.rs @@ -20,7 +20,8 @@ fn can_construct_client_and_server_config_from_valid_settings() { DerX509(FOO_NS1.crt.to_vec()), vec![], FOO_NS1.key.to_vec(), - SystemTime::now() + Duration::from_secs(1000) + SystemTime::now() + Duration::from_secs(1000), + DerX509(FOO_NS1.trust_anchors.to_vec()), ) .is_ok()); } @@ -32,7 +33,8 @@ fn recognize_ca_did_not_issue_cert() { DerX509(FOO_NS1.crt.to_vec()), vec![], FOO_NS1.key.to_vec(), - SystemTime::now() + Duration::from_secs(1000) + SystemTime::now() + Duration::from_secs(1000), + DerX509(FOO_NS1.trust_anchors.to_vec()), ) .is_err()); } @@ -44,7 +46,8 @@ fn recognize_cert_is_not_valid_for_identity() { DerX509(FOO_NS1.crt.to_vec()), vec![], FOO_NS1.key.to_vec(), - SystemTime::now() + Duration::from_secs(1000) + SystemTime::now() + Duration::from_secs(1000), + DerX509(FOO_NS1.trust_anchors.to_vec()), ) .is_err()); } diff --git a/linkerd/meshtls/rustls/src/creds/store.rs b/linkerd/meshtls/rustls/src/creds/store.rs index b0a692856e..3cdccea2ff 100644 --- a/linkerd/meshtls/rustls/src/creds/store.rs +++ b/linkerd/meshtls/rustls/src/creds/store.rs @@ -136,6 +136,7 @@ impl id::Credentials for Store { intermediates: Vec, key: Vec, _expiry: std::time::SystemTime, + _roots: id::DerX509, ) -> Result<()> { let mut chain = Vec::with_capacity(intermediates.len() + 1); chain.push(rustls::Certificate(leaf)); diff --git a/linkerd/meshtls/rustls/src/tests.rs b/linkerd/meshtls/rustls/src/tests.rs index b532a2db24..c75610e075 100644 --- a/linkerd/meshtls/rustls/src/tests.rs +++ b/linkerd/meshtls/rustls/src/tests.rs @@ -20,7 +20,8 @@ fn can_construct_client_and_server_config_from_valid_settings() { DerX509(FOO_NS1.crt.to_vec()), vec![], FOO_NS1.key.to_vec(), - SystemTime::now() + Duration::from_secs(1000) + SystemTime::now() + Duration::from_secs(1000), + DerX509(FOO_NS1.trust_anchors.to_vec()), ) .is_ok()); } @@ -32,7 +33,8 @@ fn recognize_ca_did_not_issue_cert() { DerX509(FOO_NS1.crt.to_vec()), vec![], FOO_NS1.key.to_vec(), - SystemTime::now() + Duration::from_secs(1000) + SystemTime::now() + Duration::from_secs(1000), + DerX509(FOO_NS1.trust_anchors.to_vec()), ) .is_err()); } @@ -44,7 +46,8 @@ fn recognize_cert_is_not_valid_for_identity() { DerX509(FOO_NS1.crt.to_vec()), vec![], FOO_NS1.key.to_vec(), - SystemTime::now() + Duration::from_secs(1000) + SystemTime::now() + Duration::from_secs(1000), + DerX509(FOO_NS1.trust_anchors.to_vec()), ) .is_err()); } diff --git a/linkerd/meshtls/src/creds.rs b/linkerd/meshtls/src/creds.rs index 40893a1846..300d9a4fd1 100644 --- a/linkerd/meshtls/src/creds.rs +++ b/linkerd/meshtls/src/creds.rs @@ -41,13 +41,14 @@ impl Credentials for Store { chain: Vec, key: Vec, exp: SystemTime, + _roots: DerX509, ) -> Result<()> { match self { #[cfg(feature = "boring")] - Self::Boring(store) => store.set_certificate(leaf, chain, key, exp), + Self::Boring(store) => store.set_certificate(leaf, chain, key, exp, _roots), #[cfg(feature = "rustls")] - Self::Rustls(store) => store.set_certificate(leaf, chain, key, exp), + Self::Rustls(store) => store.set_certificate(leaf, chain, key, exp, _roots), #[cfg(not(feature = "__has_any_tls_impls"))] _ => crate::no_tls!(leaf, chain, key, exp), diff --git a/linkerd/meshtls/src/lib.rs b/linkerd/meshtls/src/lib.rs index 39219dbdcf..89c10fa053 100644 --- a/linkerd/meshtls/src/lib.rs +++ b/linkerd/meshtls/src/lib.rs @@ -20,6 +20,8 @@ pub use self::{ client::{ClientIo, Connect, ConnectFuture, NewClient}, server::{Server, ServerIo, TerminateFuture}, }; +pub use linkerd_meshtls_verifier::client_identity; + use linkerd_dns_name as dns; use linkerd_error::{Error, Result}; use linkerd_identity as id; diff --git a/linkerd/meshtls/tests/util.rs b/linkerd/meshtls/tests/util.rs index c294ecfecc..cfaef77091 100644 --- a/linkerd/meshtls/tests/util.rs +++ b/linkerd/meshtls/tests/util.rs @@ -57,7 +57,13 @@ pub fn fails_processing_cert_when_wrong_id_configured(mode: meshtls::Mode) { .expect("should construct"); let err = store - .set_certificate(DerX509(cert), vec![], key, SystemTime::now()) + .set_certificate( + DerX509(cert.clone()), + vec![], + key, + SystemTime::now(), + DerX509(cert), + ) .expect_err("should error"); assert_eq!( @@ -177,6 +183,7 @@ fn load( vec![], ent.key.to_vec(), SystemTime::now() + Duration::from_secs(1000), + DerX509(ent.trust_anchors.to_vec()), ) .expect("certificate must be valid"); diff --git a/linkerd/proxy/identity-client/src/certify.rs b/linkerd/proxy/identity-client/src/certify.rs index ce2839b0df..6cad2ca759 100644 --- a/linkerd/proxy/identity-client/src/certify.rs +++ b/linkerd/proxy/identity-client/src/certify.rs @@ -88,7 +88,7 @@ impl From for Certify { } impl Certify { - pub async fn run(self, name: Name, mut credentials: C, new_client: N) + pub async fn run(self, name: Name, roots: DerX509, mut credentials: C, new_client: N) where C: Credentials, N: NewService<(), Service = S>, @@ -111,6 +111,7 @@ impl Certify { client, &name, &mut credentials, + roots.clone(), ) .await }; @@ -151,6 +152,7 @@ async fn certify( client: S, name: &Name, credentials: &mut C, + roots: DerX509, ) -> Result where C: Credentials, @@ -180,6 +182,7 @@ where intermediate_certificates.into_iter().map(DerX509).collect(), docs.key_pkcs8.clone(), expiry, + roots, )?; Ok(expiry) diff --git a/linkerd/proxy/spire-client/src/api.rs b/linkerd/proxy/spire-client/src/api.rs index 3c3ec16d4f..90a7fa6445 100644 --- a/linkerd/proxy/spire-client/src/api.rs +++ b/linkerd/proxy/spire-client/src/api.rs @@ -1,14 +1,13 @@ use futures::prelude::*; use linkerd_error::{Error, Recover, Result}; use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}; +use linkerd_identity::Credentials; use linkerd_identity::DerX509; -use linkerd_identity::{Credentials, Id}; use linkerd_proxy_http as http; use linkerd_tonic_watch::StreamWatch; use spiffe_proto::client::{ self as api, spiffe_workload_api_client::SpiffeWorkloadApiClient as Client, }; -use std::collections::HashMap; use std::time::{Duration, UNIX_EPOCH}; use tower::Service; use tracing::error; @@ -22,15 +21,15 @@ pub struct NoMatchingSVIDFound(()); #[derive(Clone)] pub struct Svid { - pub(super) spiffe_id: Id, leaf: DerX509, private_key: Vec, intermediates: Vec, + roots: DerX509, } #[derive(Clone)] pub struct SvidUpdate { - svids: HashMap, + svids: Vec, } #[derive(Clone, Debug)] @@ -47,12 +46,7 @@ pub type Watch = StreamWatch>; impl SvidUpdate { pub(super) fn new(svids: Vec) -> Self { - let mut svids_map = HashMap::default(); - for svid in svids.into_iter() { - svids_map.insert(svid.spiffe_id.clone(), svid); - } - - SvidUpdate { svids: svids_map } + SvidUpdate { svids } } } @@ -61,16 +55,16 @@ impl SvidUpdate { impl Svid { #[cfg(test)] pub(super) fn new( - spiffe_id: Id, leaf: DerX509, private_key: Vec, intermediates: Vec, + roots: DerX509, ) -> Self { Self { - spiffe_id, leaf, private_key, intermediates, + roots, } } } @@ -99,13 +93,11 @@ impl TryFrom for Svid { } }; - let spiffe_id = Id::parse_uri(&proto.spiffe_id)?; - Ok(Svid { - spiffe_id, leaf, private_key: proto.x509_svid_key, intermediates: intermediates.to_vec(), + roots: DerX509(proto.bundle), }) } } @@ -200,18 +192,24 @@ impl Recover for GrpcRecover { } } -pub(super) fn process_svid(credentials: &mut C, mut update: SvidUpdate, id: &Id) -> Result<()> +pub(super) fn process_svid(credentials: &mut C, update: SvidUpdate) -> Result<()> where C: Credentials, { - if let Some(svid) = update.svids.remove(id) { + if let Some(svid) = update.svids.into_iter().next() { use x509_parser::prelude::*; let (_, parsed_cert) = X509Certificate::from_der(&svid.leaf.0)?; let exp: u64 = parsed_cert.validity().not_after.timestamp().try_into()?; let exp = UNIX_EPOCH + Duration::from_secs(exp); - return credentials.set_certificate(svid.leaf, svid.intermediates, svid.private_key, exp); + return credentials.set_certificate( + svid.leaf, + svid.intermediates, + svid.private_key, + exp, + svid.roots, + ); } Err(NoMatchingSVIDFound(()).into()) diff --git a/linkerd/proxy/spire-client/src/lib.rs b/linkerd/proxy/spire-client/src/lib.rs index 992e0e1a21..81a8a6a328 100644 --- a/linkerd/proxy/spire-client/src/lib.rs +++ b/linkerd/proxy/spire-client/src/lib.rs @@ -6,47 +6,31 @@ mod api; pub use api::{Api, SvidUpdate}; use linkerd_error::Error; use linkerd_identity::Credentials; -use linkerd_identity::Id; use std::fmt::{Debug, Display}; use tokio::sync::watch; use tower::{util::ServiceExt, Service}; -pub struct Spire { - id: Id, -} - -// === impl Spire === - -impl Spire { - pub fn new(id: Id) -> Self { - Self { id } - } - - pub async fn run(self, credentials: C, mut client: S) - where - C: Credentials, - S: Service<(), Response = tonic::Response>>, - S::Error: Into + Display + Debug, - { - let client = client.ready().await.expect("should be ready"); - let rsp = client - .call(()) - .await - .expect("spire client must gracefully handle errors"); - consume_updates(&self.id, rsp.into_inner(), credentials).await - } +pub async fn run(credentials: C, mut client: S) +where + C: Credentials, + S: Service<(), Response = tonic::Response>>, + S::Error: Into + Display + Debug, +{ + let client = client.ready().await.expect("should be ready"); + let rsp = client + .call(()) + .await + .expect("spire client must gracefully handle errors"); + consume_updates(rsp.into_inner(), credentials).await } -async fn consume_updates( - id: &Id, - mut updates: watch::Receiver, - mut credentials: C, -) where +async fn consume_updates(mut updates: watch::Receiver, mut credentials: C) +where C: Credentials, { loop { let svid_update = updates.borrow_and_update().clone(); - if let Err(error) = api::process_svid(&mut credentials, svid_update, id) { + if let Err(error) = api::process_svid(&mut credentials, svid_update) { tracing::error!(%error, "Error processing SVID update"); } if updates.changed().await.is_err() { @@ -65,13 +49,12 @@ mod tests { use rcgen::{Certificate, CertificateParams, SanType, SerialNumber}; use std::time::SystemTime; - fn gen_svid(id: Id, subject_alt_names: Vec, serial: SerialNumber) -> Svid { + fn gen_svid(subject_alt_names: Vec, serial: SerialNumber) -> Svid { let mut params = CertificateParams::default(); params.subject_alt_names = subject_alt_names; params.serial_number = Some(serial); Svid::new( - id, DerX509( Certificate::from_params(params) .expect("should generate cert") @@ -80,6 +63,7 @@ mod tests { ), Vec::default(), Vec::default(), + DerX509(Vec::default()), ) } @@ -131,6 +115,7 @@ mod tests { _: Vec, _: Vec, _: SystemTime, + _: DerX509, ) -> Result<()> { let (_, cert) = x509_parser::parse_x509_certificate(&leaf.0).unwrap(); let serial = SerialNumber::from_slice(&cert.serial.to_bytes_be()); @@ -142,28 +127,23 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn valid_updates() { let spiffe_san = "spiffe://some-domain/some-workload"; - let spiffe_id = Id::parse_uri("spiffe://some-domain/some-workload").expect("should parse"); let (creds, mut creds_rx) = MockCredentials::new(); - let spire = Spire::new(spiffe_id.clone()); - let serial_1 = SerialNumber::from_slice("some-serial-1".as_bytes()); let update_1 = SvidUpdate::new(vec![gen_svid( - spiffe_id.clone(), vec![SanType::URI(spiffe_san.into())], serial_1.clone(), )]); let (client, svid_tx) = MockClient::new(update_1); - tokio::spawn(spire.run(creds, client)); + tokio::spawn(run(creds, client)); creds_rx.changed().await.unwrap(); assert!(*creds_rx.borrow_and_update() == Some(serial_1)); let serial_2 = SerialNumber::from_slice("some-serial-2".as_bytes()); let update_2 = SvidUpdate::new(vec![gen_svid( - spiffe_id.clone(), vec![SanType::URI(spiffe_san.into())], serial_2.clone(), )]); @@ -177,30 +157,26 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn invalid_update_empty_cert() { let spiffe_san = "spiffe://some-domain/some-workload"; - let spiffe_id = Id::parse_uri("spiffe://some-domain/some-workload").expect("should parse"); let (creds, mut creds_rx) = MockCredentials::new(); - let spire = Spire::new(spiffe_id.clone()); - let serial_1 = SerialNumber::from_slice("some-serial-1".as_bytes()); let update_1 = SvidUpdate::new(vec![gen_svid( - spiffe_id.clone(), vec![SanType::URI(spiffe_san.into())], serial_1.clone(), )]); let (client, svid_tx) = MockClient::new(update_1); - tokio::spawn(spire.run(creds, client)); + tokio::spawn(run(creds, client)); creds_rx.changed().await.unwrap(); assert!(*creds_rx.borrow_and_update() == Some(serial_1.clone())); let invalid_svid = Svid::new( - spiffe_id.clone(), DerX509(Vec::default()), Vec::default(), Vec::default(), + DerX509(Vec::default()), ); let mut update_sent = svid_tx.subscribe(); @@ -218,22 +194,16 @@ mod tests { let spiffe_san = "spiffe://some-domain/some-workload"; let spiffe_san_wrong = "spiffe://some-domain/wrong"; - let spiffe_id = Id::parse_uri("spiffe://some-domain/some-workload").expect("should parse"); - let spiffe_id_wrong = Id::parse_uri("spiffe://some-domain/wrong").expect("should parse"); - let (creds, mut creds_rx) = MockCredentials::new(); - let spire = Spire::new(spiffe_id.clone()); - let serial_1 = SerialNumber::from_slice("some-serial-1".as_bytes()); let update_1 = SvidUpdate::new(vec![gen_svid( - spiffe_id.clone(), vec![SanType::URI(spiffe_san.into())], serial_1.clone(), )]); let (client, svid_tx) = MockClient::new(update_1); - tokio::spawn(spire.run(creds, client)); + tokio::spawn(run(creds, client)); creds_rx.changed().await.unwrap(); assert!(*creds_rx.borrow_and_update() == Some(serial_1.clone())); @@ -241,7 +211,6 @@ mod tests { let serial_2 = SerialNumber::from_slice("some-serial-2".as_bytes()); let mut update_sent = svid_tx.subscribe(); let update_2 = SvidUpdate::new(vec![gen_svid( - spiffe_id_wrong, vec![SanType::URI(spiffe_san_wrong.into())], serial_2.clone(), )]); diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs index 51c60b0b8d..2033bcfcb2 100644 --- a/linkerd2-proxy/src/main.rs +++ b/linkerd2-proxy/src/main.rs @@ -11,7 +11,7 @@ compile_error!( "at least one of the following TLS implementations must be enabled: 'meshtls-boring', 'meshtls-rustls'" ); -use linkerd_app::{trace, BindTcp, Config, BUILD_INFO}; +use linkerd_app::{identity, metrics::prom, trace, BindTcp, Config, Metrics, BUILD_INFO}; use linkerd_signal as signal; use tokio::{sync::mpsc, time}; use tracing::{debug, info, warn}; @@ -51,16 +51,59 @@ fn main() { } }; + // construct metrics and dns resolver + let (metrics, report) = Metrics::new(config.admin.metrics_retain_idle); + let mut registry = prom::Registry::default(); + let dns = config.dns.clone().build(); + + let identity = { + // Load identity configuration from the environment. + let identity_config = match identity::Config::try_from_env() { + Ok(config) => config, + Err(e) => { + eprintln!("Invalid identity configuration: {}", e); + std::process::exit(EX_USAGE); + } + }; + + debug!("Building Identity client"); + let identity = tracing::info_span!("identity").in_scope(|| { + identity_config.build(dns.resolver.clone(), metrics.control.clone(), &mut registry) + }); + + match identity { + Ok(identity) => identity, + Err(e) => { + eprintln!("Identity client construction: {}", e); + std::process::exit(1); + } + } + }; + // Builds a runtime with the appropriate number of cores: // `LINKERD2_PROXY_CORES` env or the number of available CPUs (as provided // by cgroups, when possible). rt::build().block_on(async move { + let receiver = identity.initialize().await; + let local_id = receiver.local_id(); + info!(id = %local_id, "Certified identity"); + let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel(); let shutdown_grace_period = config.shutdown_grace_period; let bind = BindTcp::with_orig_dst(); let app = match config - .build(bind, bind, BindTcp::default(), shutdown_tx, trace) + .build( + bind, + bind, + BindTcp::default(), + shutdown_tx, + trace, + receiver, + registry, + metrics, + report, + ) .await { Ok(app) => app, @@ -79,7 +122,6 @@ fn main() { Some(addr) => info!("Tap interface on {}", addr), } - // TODO distinguish ServerName and Identity. info!("SNI is {}", app.local_server_name()); info!("Local identity is {}", app.local_tls_id());