Skip to content

Commit

Permalink
feat: use anyhow (#270)
Browse files Browse the repository at this point in the history
Move to anyhow
  • Loading branch information
morenol committed Sep 19, 2023
1 parent 24b31a7 commit 410e3e6
Show file tree
Hide file tree
Showing 23 changed files with 153 additions and 435 deletions.
22 changes: 15 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ members = [
resolver = "2"

[workspace.dependencies]
anyhow = "1.0.38"
fluvio-future = "0.6"
serde_yaml = { version = "0.9.0", default-features = false }
6 changes: 3 additions & 3 deletions src/k8-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
edition = "2021"
name = "k8-client"
version = "10.1.0"
version = "11.0.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "Core Kubernetes metadata traits"
repository = "https://github.com/infinyon/k8-api"
Expand All @@ -18,6 +18,7 @@ native_tls = ["fluvio-future/native2_tls"]
rust_tls = ["rustls", "fluvio-future/tls"]

[dependencies]
anyhow = { workspace = true }
cfg-if = "1.0"
tracing = "0.1.19"
bytes = "1.0.1"
Expand All @@ -34,12 +35,11 @@ serde_json = "1.0.40"
serde_qs = "0.12.0"
async-trait = "0.1.52"
fluvio-future = { workspace = true, features=["net", "task"] }
k8-metadata-client = { version="5.1.0", path="../k8-metadata-client" }
k8-metadata-client = { version="6.0.0", path="../k8-metadata-client" }
k8-diff = { version="0.1.0", path="../k8-diff" }
k8-config = { version="2.0.0", path="../k8-config" }
k8-types = { version="0.8.0", path="../k8-types", features=["core", "batch"] }


[dev-dependencies]
rand = "0.8.3"
once_cell = "1.10.0"
Expand Down
4 changes: 1 addition & 3 deletions src/k8-client/k8-fixtures/src/test_fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use std::path::{Path, PathBuf};
use k8_metadata_core::metadata::K8Watch;
use k8_metadata::client::TokenStreamResult;
use k8_metadata::topic::{TopicSpec, TopicStatus};
use k8_client::ClientError;


//
// Topic Watch Fixtures
Expand Down Expand Up @@ -62,7 +60,7 @@ pub fn create_topic_watch(ttw: &TestTopicWatch) -> K8Watch<TopicSpec, TopicStatu

pub fn create_topic_stream_result(
ttw_list: &TestTopicWatchList,
) -> TokenStreamResult<TopicSpec, TopicStatus,ClientError> {
) -> TokenStreamResult<TopicSpec, TopicStatus> {
let mut topic_watch_list = vec![];
for ttw in ttw_list {
topic_watch_list.push(Ok(create_topic_watch(&ttw)));
Expand Down
9 changes: 4 additions & 5 deletions src/k8-client/src/cert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ use std::io::Error as IoError;
use std::io::ErrorKind;
use std::path::Path;

use anyhow::Result;
use tracing::debug;

use k8_config::K8Config;
use k8_config::KubeConfig;
use k8_config::PodConfig;
use k8_config::AuthProviderDetail;

use crate::ClientError;

pub trait ConfigBuilder: Sized {
type Client;

fn new() -> Self;

fn build(self) -> Result<Self::Client, ClientError>;
fn build(self) -> Result<Self::Client>;

fn load_ca_certificate(self, ca_path: impl AsRef<Path>) -> Result<Self, IoError>;

Expand Down Expand Up @@ -76,7 +75,7 @@ where
&self.config
}

pub fn token(&self) -> Result<Option<String>, ClientError> {
pub fn token(&self) -> Result<Option<String>> {
if let Some(token) = &self.external_token {
Ok(Some(token.clone()))
} else if let K8Config::KubeConfig(context) = &self.k8_config() {
Expand Down Expand Up @@ -119,7 +118,7 @@ where
self.k8_config().api_path().to_owned()
}

pub fn build(self) -> Result<B::Client, ClientError> {
pub fn build(self) -> Result<B::Client> {
self.builder.build()
}

Expand Down
55 changes: 26 additions & 29 deletions src/k8-client/src/client/client_impl.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use serde::Deserialize;
use serde::Serialize;
use std::fmt::Debug;
use std::fmt::Display;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use bytes::Buf;
use futures_util::future::FutureExt;
Expand All @@ -12,6 +11,7 @@ use futures_util::stream::BoxStream;
use futures_util::stream::Stream;
use futures_util::stream::StreamExt;
use futures_util::stream::TryStreamExt;
use http::header::InvalidHeaderValue;
use hyper::body::aggregate;
use hyper::body::Bytes;
use hyper::header::HeaderValue;
Expand All @@ -21,20 +21,20 @@ use hyper::header::CONTENT_TYPE;
use hyper::Body;
use hyper::Request;
use hyper::Uri;
use serde::{Serialize, Deserialize};
use serde::de::DeserializeOwned;
use serde_json::Value;
use tracing::debug;
use tracing::error;
use tracing::trace;

use k8_types::UpdatedK8Obj;
use k8_types::{UpdatedK8Obj, MetaStatus};
use k8_config::K8Config;
use crate::meta_client::{ListArg, MetadataClient, NameSpace, PatchMergeType, TokenStreamResult};
use k8_types::{InputK8Obj, K8List, K8Meta, K8Obj, DeleteStatus, K8Watch, Spec, UpdateK8ObjStatus};
use k8_types::options::{ListOptions, DeleteOptions};

use crate::uri::{item_uri, items_uri};
use crate::ClientError;
use crate::meta_client::{ListArg, MetadataClient, NameSpace, PatchMergeType, TokenStreamResult};

use super::wstream::WatchStream;
use super::{HyperClient, HyperConfigBuilder, ListStream, LogStream};
Expand Down Expand Up @@ -63,12 +63,12 @@ pub struct VersionInfo {

impl K8Client {
// load using default k8 config
pub fn try_default() -> Result<Self, ClientError> {
pub fn try_default() -> Result<Self> {
let config = K8Config::load()?;
Self::new(config)
}

pub fn new(config: K8Config) -> Result<Self, ClientError> {
pub fn new(config: K8Config) -> Result<Self> {
let helper = HyperConfigBuilder::new(config)?;
let host = helper.host();
let token = helper.token()?;
Expand All @@ -81,7 +81,7 @@ impl K8Client {
})
}

pub async fn server_version(&self) -> Result<VersionInfo, ClientError> {
pub async fn server_version(&self) -> Result<VersionInfo> {
let uri = format!("{}/version", self.host);
let info = self
.handle_request(Request::get(uri).body(Body::empty())?)
Expand All @@ -94,7 +94,7 @@ impl K8Client {
&self.host
}

fn finish_request<B>(&self, request: &mut Request<B>) -> Result<(), ClientError>
fn finish_request<B>(&self, request: &mut Request<B>) -> Result<(), InvalidHeaderValue>
where
B: Into<Body>,
{
Expand All @@ -108,7 +108,7 @@ impl K8Client {
}

/// handle request. this is async function
async fn handle_request<T>(&self, mut request: Request<Body>) -> Result<T, ClientError>
async fn handle_request<T>(&self, mut request: Request<Body>) -> Result<T>
where
T: DeserializeOwned,
{
Expand Down Expand Up @@ -139,14 +139,14 @@ impl K8Client {
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).map_err(|err| {
error!("unable to read error response: {}", err);
ClientError::HttpResponse(status)
err
})?;
trace!("error response: {}", String::from_utf8_lossy(&buffer));
let api_status = serde_json::from_slice(&buffer).map_err(|err| {
let api_status: MetaStatus = serde_json::from_slice(&buffer).map_err(|err| {
error!("json error: {}", err);
err
})?;
Err(ClientError::ApiResponse(api_status))
Err(api_status.into())
}
}

Expand All @@ -157,7 +157,6 @@ impl K8Client {

let request = http::Request::get(uri)
.body(Body::empty())
.map_err(ClientError::from)
.and_then(|mut req| {
self.finish_request(&mut req)?;
Ok(req)
Expand Down Expand Up @@ -191,7 +190,7 @@ impl K8Client {
}

/// return get stream of uri
fn stream<S>(&self, uri: Uri) -> impl Stream<Item = TokenStreamResult<S, ClientError>> + '_
fn stream<S>(&self, uri: Uri) -> impl Stream<Item = TokenStreamResult<S>> + '_
where
K8Watch<S>: DeserializeOwned,
S: Spec + 'static,
Expand Down Expand Up @@ -227,7 +226,7 @@ impl K8Client {
&self,
namespace: N,
options: Option<ListOptions>,
) -> Result<K8List<S>, ClientError>
) -> Result<K8List<S>>
where
S: Spec,
N: Into<NameSpace> + Send + Sync,
Expand All @@ -243,7 +242,7 @@ impl K8Client {

/// replace existing object.
/// object must exist
pub async fn replace_item<S>(&self, value: UpdatedK8Obj<S>) -> Result<K8Obj<S>, ClientError>
pub async fn replace_item<S>(&self, value: UpdatedK8Obj<S>) -> Result<K8Obj<S>>
where
S: Spec,
{
Expand Down Expand Up @@ -272,7 +271,7 @@ impl K8Client {
namespace: &str,
pod_name: &str,
container_name: &str,
) -> Result<LogStream, ClientError> {
) -> Result<LogStream> {
let sub_resource = format!("/log?container={}&follow={}", container_name, false);
let uri = item_uri::<k8_types::core::pod::PodSpec>(
self.hostname(),
Expand All @@ -287,10 +286,8 @@ impl K8Client {

#[async_trait]
impl MetadataClient for K8Client {
type MetadataClientError = ClientError;

/// retrieval a single item
async fn retrieve_item<S, M>(&self, metadata: &M) -> Result<K8Obj<S>, ClientError>
async fn retrieve_item<S, M>(&self, metadata: &M) -> Result<K8Obj<S>>
where
S: Spec,
M: K8Meta + Send + Sync,
Expand All @@ -306,7 +303,7 @@ impl MetadataClient for K8Client {
&self,
namespace: N,
option: Option<ListArg>,
) -> Result<K8List<S>, ClientError>
) -> Result<K8List<S>>
where
S: Spec,
N: Into<NameSpace> + Send + Sync,
Expand Down Expand Up @@ -336,7 +333,7 @@ impl MetadataClient for K8Client {
&self,
metadata: &M,
option: Option<DeleteOptions>,
) -> Result<DeleteStatus<S>, ClientError>
) -> Result<DeleteStatus<S>>
where
S: Spec,
M: K8Meta + Send + Sync,
Expand Down Expand Up @@ -370,12 +367,12 @@ impl MetadataClient for K8Client {
Ok(DeleteStatus::ForegroundDelete(status))
}
} else {
Err(ClientError::Other(format!("missing kind: {:#?}", values)))
Err(anyhow::anyhow!("missing kind: {:#?}", values))
}
}

/// create new object
async fn create_item<S>(&self, value: InputK8Obj<S>) -> Result<K8Obj<S>, ClientError>
async fn create_item<S>(&self, value: InputK8Obj<S>) -> Result<K8Obj<S>>
where
S: Spec,
{
Expand All @@ -400,7 +397,7 @@ impl MetadataClient for K8Client {
}

/// update status
async fn update_status<S>(&self, value: &UpdateK8ObjStatus<S>) -> Result<K8Obj<S>, ClientError>
async fn update_status<S>(&self, value: &UpdateK8ObjStatus<S>) -> Result<K8Obj<S>>
where
S: Spec,
{
Expand Down Expand Up @@ -432,7 +429,7 @@ impl MetadataClient for K8Client {
metadata: &M,
patch: &Value,
merge_type: PatchMergeType,
) -> Result<K8Obj<S>, ClientError>
) -> Result<K8Obj<S>>
where
S: Spec,
M: K8Meta + Display + Send + Sync,
Expand Down Expand Up @@ -463,7 +460,7 @@ impl MetadataClient for K8Client {
metadata: &M,
patch: &Value,
merge_type: PatchMergeType,
) -> Result<K8Obj<S>, ClientError>
) -> Result<K8Obj<S>>
where
S: Spec,
M: K8Meta + Display + Send + Sync,
Expand Down Expand Up @@ -498,7 +495,7 @@ impl MetadataClient for K8Client {
&self,
namespace: N,
resource_version: Option<String>,
) -> BoxStream<'_, TokenStreamResult<S, Self::MetadataClientError>>
) -> BoxStream<'_, TokenStreamResult<S>>
where
S: Spec + 'static,
S::Status: 'static,
Expand Down
Loading

0 comments on commit 410e3e6

Please sign in to comment.