Skip to content

Commit

Permalink
feat: in-memory metadata client and move to anyhow
Browse files Browse the repository at this point in the history
  • Loading branch information
morenol committed Sep 18, 2023
1 parent 577ad7e commit f43728e
Show file tree
Hide file tree
Showing 18 changed files with 650 additions and 375 deletions.
24 changes: 18 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ members = [
resolver = "2"

[workspace.dependencies]
anyhow = "1.0.38"
fluvio-future = "0.6"
serde_yaml = { version = "0.9.0", default-features = false }
8 changes: 6 additions & 2 deletions src/k8-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
edition = "2021"
name = "k8-client"
version = "10.1.0"
version = "11.0.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "Core Kubernetes metadata traits"
repository = "https://github.com/infinyon/k8-api"
Expand All @@ -18,6 +18,7 @@ native_tls = ["fluvio-future/native2_tls"]
rust_tls = ["rustls", "fluvio-future/tls"]

[dependencies]
anyhow = { workspace = true }
cfg-if = "1.0"
tracing = "0.1.19"
bytes = "1.0.1"
Expand All @@ -32,12 +33,15 @@ pin-utils = "0.1.0"
serde = { version="1.0.136", features=['derive'] }
serde_json = "1.0.40"
serde_qs = "0.12.0"
serde_yaml = { workspace = true }
async-trait = "0.1.52"
fluvio-future = { workspace = true, features=["net", "task"] }
k8-metadata-client = { version="5.1.0", path="../k8-metadata-client" }
k8-metadata-client = { version="6.0.0", path="../k8-metadata-client" }
k8-diff = { version="0.1.0", path="../k8-diff" }
k8-config = { version="2.0.0", path="../k8-config" }
k8-types = { version="0.8.0", path="../k8-types", features=["core", "batch"] }
async-lock = "2.8.0"
async-channel = "1.9.0"


[dev-dependencies]
Expand Down
9 changes: 4 additions & 5 deletions src/k8-client/src/cert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ use std::io::Error as IoError;
use std::io::ErrorKind;
use std::path::Path;

use anyhow::Result;
use tracing::debug;

use k8_config::K8Config;
use k8_config::KubeConfig;
use k8_config::PodConfig;
use k8_config::AuthProviderDetail;

use crate::ClientError;

pub trait ConfigBuilder: Sized {
type Client;

fn new() -> Self;

fn build(self) -> Result<Self::Client, ClientError>;
fn build(self) -> Result<Self::Client>;

fn load_ca_certificate(self, ca_path: impl AsRef<Path>) -> Result<Self, IoError>;

Expand Down Expand Up @@ -76,7 +75,7 @@ where
&self.config
}

pub fn token(&self) -> Result<Option<String>, ClientError> {
pub fn token(&self) -> Result<Option<String>> {
if let Some(token) = &self.external_token {
Ok(Some(token.clone()))
} else if let K8Config::KubeConfig(context) = &self.k8_config() {
Expand Down Expand Up @@ -119,7 +118,7 @@ where
self.k8_config().api_path().to_owned()
}

pub fn build(self) -> Result<B::Client, ClientError> {
pub fn build(self) -> Result<B::Client> {
self.builder.build()
}

Expand Down
49 changes: 24 additions & 25 deletions src/k8-client/src/client/client_impl.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use http::header::InvalidHeaderValue;
use k8_types::MetaStatus;
use serde::Deserialize;
use serde::Serialize;
use std::fmt::Debug;
use std::fmt::Display;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use bytes::Buf;
use futures_util::future::FutureExt;
Expand Down Expand Up @@ -34,7 +37,6 @@ use k8_types::{InputK8Obj, K8List, K8Meta, K8Obj, DeleteStatus, K8Watch, Spec, U
use k8_types::options::{ListOptions, DeleteOptions};

use crate::uri::{item_uri, items_uri};
use crate::ClientError;

use super::wstream::WatchStream;
use super::{HyperClient, HyperConfigBuilder, ListStream, LogStream};
Expand Down Expand Up @@ -63,12 +65,12 @@ pub struct VersionInfo {

impl K8Client {
// load using default k8 config
pub fn try_default() -> Result<Self, ClientError> {
pub fn try_default() -> Result<Self> {
let config = K8Config::load()?;
Self::new(config)
}

pub fn new(config: K8Config) -> Result<Self, ClientError> {
pub fn new(config: K8Config) -> Result<Self> {
let helper = HyperConfigBuilder::new(config)?;
let host = helper.host();
let token = helper.token()?;
Expand All @@ -81,7 +83,7 @@ impl K8Client {
})
}

pub async fn server_version(&self) -> Result<VersionInfo, ClientError> {
pub async fn server_version(&self) -> Result<VersionInfo> {
let uri = format!("{}/version", self.host);
let info = self
.handle_request(Request::get(uri).body(Body::empty())?)
Expand All @@ -94,7 +96,7 @@ impl K8Client {
&self.host
}

fn finish_request<B>(&self, request: &mut Request<B>) -> Result<(), ClientError>
fn finish_request<B>(&self, request: &mut Request<B>) -> Result<(), InvalidHeaderValue>
where
B: Into<Body>,
{
Expand All @@ -108,7 +110,7 @@ impl K8Client {
}

/// handle request. this is async function
async fn handle_request<T>(&self, mut request: Request<Body>) -> Result<T, ClientError>
async fn handle_request<T>(&self, mut request: Request<Body>) -> Result<T>
where
T: DeserializeOwned,
{
Expand Down Expand Up @@ -139,14 +141,14 @@ impl K8Client {
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).map_err(|err| {
error!("unable to read error response: {}", err);
ClientError::HttpResponse(status)
err
})?;
trace!("error response: {}", String::from_utf8_lossy(&buffer));
let api_status = serde_json::from_slice(&buffer).map_err(|err| {
let api_status: MetaStatus = serde_json::from_slice(&buffer).map_err(|err| {
error!("json error: {}", err);
err
})?;
Err(ClientError::ApiResponse(api_status))
Err(api_status.into())
}
}

Expand All @@ -157,7 +159,6 @@ impl K8Client {

let request = http::Request::get(uri)
.body(Body::empty())
.map_err(ClientError::from)
.and_then(|mut req| {
self.finish_request(&mut req)?;
Ok(req)
Expand Down Expand Up @@ -191,7 +192,7 @@ impl K8Client {
}

/// return get stream of uri
fn stream<S>(&self, uri: Uri) -> impl Stream<Item = TokenStreamResult<S, ClientError>> + '_
fn stream<S>(&self, uri: Uri) -> impl Stream<Item = TokenStreamResult<S>> + '_
where
K8Watch<S>: DeserializeOwned,
S: Spec + 'static,
Expand Down Expand Up @@ -227,7 +228,7 @@ impl K8Client {
&self,
namespace: N,
options: Option<ListOptions>,
) -> Result<K8List<S>, ClientError>
) -> Result<K8List<S>>
where
S: Spec,
N: Into<NameSpace> + Send + Sync,
Expand All @@ -243,7 +244,7 @@ impl K8Client {

/// replace existing object.
/// object must exist
pub async fn replace_item<S>(&self, value: UpdatedK8Obj<S>) -> Result<K8Obj<S>, ClientError>
pub async fn replace_item<S>(&self, value: UpdatedK8Obj<S>) -> Result<K8Obj<S>>
where
S: Spec,
{
Expand Down Expand Up @@ -272,7 +273,7 @@ impl K8Client {
namespace: &str,
pod_name: &str,
container_name: &str,
) -> Result<LogStream, ClientError> {
) -> Result<LogStream> {
let sub_resource = format!("/log?container={}&follow={}", container_name, false);
let uri = item_uri::<k8_types::core::pod::PodSpec>(
self.hostname(),
Expand All @@ -287,10 +288,8 @@ impl K8Client {

#[async_trait]
impl MetadataClient for K8Client {
type MetadataClientError = ClientError;

/// retrieval a single item
async fn retrieve_item<S, M>(&self, metadata: &M) -> Result<K8Obj<S>, ClientError>
async fn retrieve_item<S, M>(&self, metadata: &M) -> Result<K8Obj<S>>
where
S: Spec,
M: K8Meta + Send + Sync,
Expand All @@ -306,7 +305,7 @@ impl MetadataClient for K8Client {
&self,
namespace: N,
option: Option<ListArg>,
) -> Result<K8List<S>, ClientError>
) -> Result<K8List<S>>
where
S: Spec,
N: Into<NameSpace> + Send + Sync,
Expand Down Expand Up @@ -336,7 +335,7 @@ impl MetadataClient for K8Client {
&self,
metadata: &M,
option: Option<DeleteOptions>,
) -> Result<DeleteStatus<S>, ClientError>
) -> Result<DeleteStatus<S>>
where
S: Spec,
M: K8Meta + Send + Sync,
Expand Down Expand Up @@ -370,12 +369,12 @@ impl MetadataClient for K8Client {
Ok(DeleteStatus::ForegroundDelete(status))
}
} else {
Err(ClientError::Other(format!("missing kind: {:#?}", values)))
Err(anyhow::anyhow!("missing kind: {:#?}", values))
}
}

/// create new object
async fn create_item<S>(&self, value: InputK8Obj<S>) -> Result<K8Obj<S>, ClientError>
async fn create_item<S>(&self, value: InputK8Obj<S>) -> Result<K8Obj<S>>
where
S: Spec,
{
Expand All @@ -400,7 +399,7 @@ impl MetadataClient for K8Client {
}

/// update status
async fn update_status<S>(&self, value: &UpdateK8ObjStatus<S>) -> Result<K8Obj<S>, ClientError>
async fn update_status<S>(&self, value: &UpdateK8ObjStatus<S>) -> Result<K8Obj<S>>
where
S: Spec,
{
Expand Down Expand Up @@ -432,7 +431,7 @@ impl MetadataClient for K8Client {
metadata: &M,
patch: &Value,
merge_type: PatchMergeType,
) -> Result<K8Obj<S>, ClientError>
) -> Result<K8Obj<S>>
where
S: Spec,
M: K8Meta + Display + Send + Sync,
Expand Down Expand Up @@ -463,7 +462,7 @@ impl MetadataClient for K8Client {
metadata: &M,
patch: &Value,
merge_type: PatchMergeType,
) -> Result<K8Obj<S>, ClientError>
) -> Result<K8Obj<S>>
where
S: Spec,
M: K8Meta + Display + Send + Sync,
Expand Down Expand Up @@ -498,7 +497,7 @@ impl MetadataClient for K8Client {
&self,
namespace: N,
resource_version: Option<String>,
) -> BoxStream<'_, TokenStreamResult<S, Self::MetadataClientError>>
) -> BoxStream<'_, TokenStreamResult<S>>
where
S: Spec + 'static,
S::Status: 'static,
Expand Down
Loading

0 comments on commit f43728e

Please sign in to comment.