diff --git a/Cargo.lock b/Cargo.lock index a604e9e..7f84ec3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -979,6 +979,8 @@ name = "k8-client" version = "11.0.0" dependencies = [ "anyhow", + "async-channel", + "async-lock", "async-trait", "base64 0.13.1", "bytes", @@ -998,6 +1000,7 @@ dependencies = [ "serde", "serde_json", "serde_qs", + "serde_yaml", "tokio", "tracing", ] diff --git a/src/k8-client/Cargo.toml b/src/k8-client/Cargo.toml index 2ddb776..bfb4f7a 100644 --- a/src/k8-client/Cargo.toml +++ b/src/k8-client/Cargo.toml @@ -13,12 +13,15 @@ readme = "README.md" [features] default = ["openssl_tls"] k8 = [] +memory_client = ["async-channel", "async-lock", "serde_yaml"] openssl_tls = ["fluvio-future/openssl_tls"] native_tls = ["fluvio-future/native2_tls"] rust_tls = ["rustls", "fluvio-future/tls"] [dependencies] anyhow = { workspace = true } +async-channel = { version = "1.9.0", optional = true } +async-lock = { version = "2.8.0", optional = true } cfg-if = "1.0" tracing = "0.1.19" bytes = "1.0.1" @@ -33,6 +36,7 @@ pin-utils = "0.1.0" serde = { version="1.0.136", features=['derive'] } serde_json = "1.0.40" serde_qs = "0.12.0" +serde_yaml = { workspace = true, optional = true } async-trait = "0.1.52" fluvio-future = { workspace = true, features=["net", "task"] } k8-metadata-client = { version="6.0.0", path="../k8-metadata-client" } diff --git a/src/k8-client/src/client/memory.rs b/src/k8-client/src/client/memory.rs new file mode 100644 index 0000000..277b0be --- /dev/null +++ b/src/k8-client/src/client/memory.rs @@ -0,0 +1,505 @@ +use std::collections::HashMap; +use std::sync::Arc; +use core::fmt::Display; + +use anyhow::{anyhow, Result}; +use async_channel::{Sender, Receiver, bounded}; +use async_lock::Mutex; +use async_lock::RwLock; +use tracing::debug; +use futures_util::FutureExt; +use futures_util::StreamExt; +use futures_util::stream::BoxStream; +use serde::Serialize; +use serde::de::DeserializeOwned; +use serde_yaml::Value; + +use k8_types::Spec; +use k8_types::ListMetadata; +use k8_types::MetaStatus; +use k8_types::K8Watch; +use k8_metadata_client::TokenStreamResult; +use k8_metadata_client::PatchMergeType; +use k8_metadata_client::NameSpace; +use k8_metadata_client::ListArg; +use k8_metadata_client::MetadataClient; +use k8_types::ObjectMeta; + +use k8_types::options::DeleteOptions; +use k8_types::UpdateK8ObjStatus; +use k8_types::DeleteStatus; +use k8_types::K8List; +use k8_types::InputK8Obj; +use k8_types::K8Meta; +use k8_types::K8Obj; + +/// Store for specific type +/// It is used to store data in memory +/// It also provides the structs used to watch for changes +#[derive(Debug)] +struct SpecStore { + data: Arc>>, + sender: Arc>, + receiver: Receiver, +} + +impl Default for SpecStore { + fn default() -> Self { + let (sender, receiver) = bounded(100); + + Self { + data: Arc::new(RwLock::new(HashMap::new())), + sender: Arc::new(sender), + receiver, + } + } +} + +impl SpecStore { + async fn get(&self, key: &str) -> anyhow::Result> + where + S: DeserializeOwned, + { + let lock = self.data.read().await; + let Some(value) = lock.get(key) else { + return Ok(None); + }; + + let output = value.clone(); + drop(lock); + + Ok(serde_yaml::from_value(output)?) + } + + async fn insert(&self, key: String, mut k8_obj: K8Obj) -> anyhow::Result<()> + where + S: Serialize + Spec + Clone + std::fmt::Debug, + { + let mut lock = self.data.write().await; + + let maybe_old = lock.get(&key); + + let watch: K8Watch = if let Some(old_version) = maybe_old { + let old_k8_obj: K8Obj = serde_yaml::from_value(old_version.clone())?; + let old_resource_version = old_k8_obj + .metadata + .resource_version + .parse::() + .unwrap_or_default(); + k8_obj.metadata.resource_version = (old_resource_version + 1).to_string(); + + K8Watch::MODIFIED(k8_obj.clone()) + } else { + K8Watch::ADDED(k8_obj.clone()) + }; + + let value = serde_yaml::to_value(k8_obj)?; + + lock.insert(key, value); + + drop(lock); + + let watch_value = serde_yaml::to_value(watch)?; + + let _ = self.sender.send(watch_value).await; + + Ok(()) + } + + async fn items(&self) -> anyhow::Result>> + where + S: Spec, + { + let lock = self.data.read().await; + let items: Result>, _> = lock + .values() + .map(|value| serde_yaml::from_value(value.clone())) + .collect(); + + Ok(items?) + } + + async fn remove(&self, key: &str) -> Result> + where + S: Spec, + { + let mut lock = self.data.write().await; + let Some(value) = lock.remove(key) else { + return Ok(None); + }; + + drop(lock); + + let k8_obj: K8Obj = serde_yaml::from_value(value.clone())?; + + let watch: K8Watch = K8Watch::DELETED(k8_obj); + + let watch_value = serde_yaml::to_value(watch)?; + + let _ = self.sender.send(watch_value).await; + + Ok(Some(value)) + } + + fn watch_stream(&self) -> BoxStream<'static, TokenStreamResult> + where + S: Spec + 'static, + S::Status: 'static, + S::Header: 'static, + { + self.receiver + .clone() + .map(|f| { + Ok(vec![ + serde_yaml::from_value::>(f.clone()).map_err(|err| err.into()) + ]) + }) + .boxed() + } +} + +/// In-memory implementation of MetadataClient +/// This is used for testing and in also in contexts where we don't have Kubernetes and +/// there is not need to persist data +#[derive(Debug, Default)] +pub struct MemoryClient { + data: Mutex>>, +} + +impl MemoryClient { + pub fn new_shared() -> Arc { + Arc::new(Self::default()) + } + + async fn get_store(&self) -> Arc { + let kind: String = S::kind(); + let mut stores = self.data.lock().await; + if let Some(store) = stores.get(&kind) { + store.clone() + } else { + let store = Arc::new(SpecStore::default()); + stores.insert(kind, store.clone()); + store + } + } + + pub async fn retrieve_items_inner(&self) -> Result> { + let store = self.get_store::().await; + let items: Vec> = store.items().await?; + Ok(K8List { + api_version: S::api_version(), + kind: S::kind(), + metadata: ListMetadata { + _continue: None, + resource_version: "0".to_owned(), + }, + items, + }) + } +} + +#[async_trait::async_trait] +impl MetadataClient for MemoryClient { + /// retrieval a single item + async fn retrieve_item(&self, metadata: &M) -> Result> + where + S: Spec, + M: K8Meta + Send + Sync, + { + let store = self.get_store::().await; + + let name: String = metadata.name().to_owned(); + let Ok(Some(data)) = store.get::>(&name).await else { + return Err(anyhow!("not found")); + }; + + Ok(data) + } + + async fn retrieve_items_with_option( + &self, + _namespace: N, + _option: Option, + ) -> Result> + where + S: Spec, + N: Into + Send + Sync, + { + self.retrieve_items_inner().await + } + + fn retrieve_items_in_chunks<'a, S, N>( + self: Arc, + _namespace: N, + _limit: u32, + _option: Option, + ) -> BoxStream<'a, K8List> + where + S: Spec + 'static, + N: Into + Send + Sync + 'static, + { + futures_util::stream::pending().boxed() + } + + async fn delete_item_with_option( + &self, + metadata: &M, + _option: Option, + ) -> Result> + where + S: Spec, + M: K8Meta + Send + Sync, + { + let store = self.get_store::().await; + let key = metadata.name(); + + store.remove::(key).await?; + + Ok(DeleteStatus::Deleted(MetaStatus { + api_version: S::api_version(), + code: None, + details: None, + kind: S::kind(), + reason: None, + status: k8_types::StatusEnum::SUCCESS, + message: None, + })) + } + + /// create new object + async fn create_item(&self, value: InputK8Obj) -> Result> + where + S: Spec, + { + let store = self.get_store::().await; + let key = value.metadata.name.clone(); + + let mut k8_obj: K8Obj = K8Obj::new(key.clone(), value.spec); + + let metadata = value.metadata; + + k8_obj.metadata = ObjectMeta { + name: metadata.name, + owner_references: metadata.owner_references, + labels: metadata.labels, + namespace: metadata.namespace, + annotations: metadata.annotations, + finalizers: metadata.finalizers, + ..Default::default() + }; + + store.insert(key, k8_obj.clone()).await?; + + Ok(k8_obj) + } + + /// update status + async fn update_status(&self, value: &UpdateK8ObjStatus) -> Result> + where + S: Spec, + { + let store = self.get_store::().await; + + let key = value.metadata.name.clone(); + debug!(key,?value.status,"start updating status"); + + let k8_value: Option> = store.get(&key).await?; + let k8_value = k8_value.ok_or(anyhow!("not found"))?; + + let k8_obj = k8_value.set_status(value.status.clone()); + debug!(key,?value.status,"overwrite set"); + + store.insert(key, k8_obj.clone()).await?; + + debug!("done"); + + Ok(k8_obj) + } + + /// patch existing with spec + async fn patch( + &self, + _metadata: &M, + _patch: &serde_json::Value, + _merge_type: PatchMergeType, + ) -> Result> + where + S: Spec, + M: K8Meta + Display + Send + Sync, + { + // TODO: implement or move to another trait + unimplemented!() + } + + /// patch status + async fn patch_status( + &self, + _metadata: &M, + _patch: &serde_json::Value, + _merge_type: PatchMergeType, + ) -> Result> + where + S: Spec, + M: K8Meta + Display + Send + Sync, + { + // TODO: implement or move to another trait + unimplemented!() + } + + /// stream items since resource versions + fn watch_stream_since( + &self, + _namespace: N, + _resource_version: Option, + ) -> BoxStream<'_, TokenStreamResult> + where + S: Spec + 'static, + S::Status: 'static, + S::Header: 'static, + N: Into, + { + let ft_stream = async move { + let kind: String = S::kind(); + + let mut lock = self.data.lock().await; + let store = lock.entry(kind).or_default(); + let st2 = store.clone(); + drop(lock); + st2.watch_stream() + }; + + ft_stream.flatten_stream().boxed() + } +} + +#[cfg(test)] +mod test { + use k8_metadata_client::MetadataClient; + use k8_types::{Spec, DefaultHeader, Crd, CrdNames, Status, K8Obj, K8Watch}; + use serde::{Serialize, Deserialize}; + + use super::MemoryClient; + + #[derive(Serialize, Deserialize, Default, Debug, Eq, PartialEq, Clone)] + pub struct MySpec { + value: i32, + } + + #[derive(Serialize, Deserialize, Default, Debug, Eq, PartialEq, Clone)] + pub struct MySpecStatus { + value: i32, + } + + impl Status for MySpecStatus {} + + impl Spec for MySpec { + type Status = MySpecStatus; + type Header = DefaultHeader; + + fn metadata() -> &'static Crd { + &Crd { + group: "test.fluvio", + version: "v1", + names: CrdNames { + kind: "myspec", + plural: "myspecs", + singular: "myspec", + }, + } + } + } + #[fluvio_future::test] + async fn test_metadata_client_impl() { + use futures_util::StreamExt; + let client = MemoryClient::new_shared(); + + let mut stream = client.watch_stream_since::("".into(), None); + + // test create + let my_spec = MySpec { value: 10 }; + let value = K8Obj::new("test".to_owned(), my_spec).as_input(); + client.create_item(value).await.expect("failed to create"); + + let next_value = stream.next().await; + + let values_diffs = next_value.expect("value added"); + let mut values_diffs = values_diffs.expect("unexpected error"); + assert_eq!(values_diffs.len(), 1, "there must be only one value added"); + + let value = values_diffs + .pop() + .expect("expected value") + .expect("expected success"); + + let K8Watch::ADDED(k8_obj) = value else { + panic!("expected added"); + }; + assert_eq!(k8_obj.spec.value, 10); + assert_eq!(k8_obj.status.value, 0); + + // test update + let my_spec = MySpec { value: 15 }; + let value = K8Obj::new("test".to_owned(), my_spec).as_input(); + client.create_item(value).await.expect("failed to create"); + + let next_value = stream.next().await; + let values_diffs = next_value.expect("value added"); + let mut values_diffs = values_diffs.expect("unexpected error"); + assert_eq!(values_diffs.len(), 1, "there must be only one value diff"); + + let value = values_diffs + .pop() + .expect("expected value") + .expect("expected success"); + + let K8Watch::MODIFIED(k8_obj) = value else { + panic!("expected modified"); + }; + assert_eq!(k8_obj.spec.value, 15); + assert_eq!(k8_obj.status.value, 0); + + // test update status + let my_status = MySpecStatus { value: 20 }; + let my_spec = MySpec { value: 15 }; + let value = K8Obj::new("test".to_owned(), my_spec).as_status_update(my_status); + client + .update_status(&value) + .await + .expect("failed to update status"); + let next_value = stream.next().await; + let values_diffs = next_value.expect("value added"); + let mut values_diffs = values_diffs.expect("unexpected error"); + assert_eq!(values_diffs.len(), 1, "there must be only one value diff"); + + let value = values_diffs + .pop() + .expect("expected value") + .expect("expected success"); + + let K8Watch::MODIFIED(k8_obj) = value else { + panic!("expected modified"); + }; + assert_eq!(k8_obj.spec.value, 15); + assert_eq!(k8_obj.status.value, 20); + + // test delete + let meta = k8_obj.metadata.clone(); + client + .delete_item_with_option::(&meta, None) + .await + .expect("failed to delete"); + let next_value = stream.next().await; + let values_diffs = next_value.expect("value added"); + let mut values_diffs = values_diffs.expect("unexpected error"); + assert_eq!(values_diffs.len(), 1, "there must be only one value added"); + + let value = values_diffs + .pop() + .expect("expected value") + .expect("expected success"); + + let K8Watch::DELETED(_) = value else { + panic!("expected deleted"); + }; + } +} diff --git a/src/k8-client/src/client/mod.rs b/src/k8-client/src/client/mod.rs index 92575b2..8816dab 100644 --- a/src/k8-client/src/client/mod.rs +++ b/src/k8-client/src/client/mod.rs @@ -1,5 +1,7 @@ mod client_impl; mod log_stream; +#[cfg(feature = "memory_client")] +pub mod memory; mod list_stream; mod wstream; diff --git a/src/k8-metadata-client/src/client.rs b/src/k8-metadata-client/src/client.rs index 171866c..15a740d 100644 --- a/src/k8-metadata-client/src/client.rs +++ b/src/k8-metadata-client/src/client.rs @@ -181,6 +181,11 @@ pub trait MetadataClient: Send + Sync { code: Some(404), .. }) = err.downcast_ref() { + debug!( + "{}: item '{}' not found, creating ...", + S::label(), + value.metadata.name + ); let created_item = self.create_item(value).await?; Ok(ApplyResult::Created(created_item)) } else { diff --git a/src/k8-metadata-client/src/nothing.rs b/src/k8-metadata-client/src/nothing.rs index 3835d2f..82df682 100644 --- a/src/k8-metadata-client/src/nothing.rs +++ b/src/k8-metadata-client/src/nothing.rs @@ -4,8 +4,7 @@ use std::fmt::Debug; use std::fmt::Display; use std::sync::Arc; -use anyhow::Result; -use anyhow::anyhow; +use anyhow::{anyhow, Result}; use async_trait::async_trait; use futures_util::stream::BoxStream; use futures_util::stream::StreamExt;