Skip to content

Commit

Permalink
Various fixes for CLI output (#2262)
Browse files Browse the repository at this point in the history
* Log loaded config files

* Remove printing logo

* Rename Home directory to Data directory in user-facing messages

* Initiating -> Initializing

* Simplify

* Remove check_update
  • Loading branch information
Jesse-Bakker authored Dec 22, 2023
1 parent 356ccd4 commit 0dc64b4
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 162 deletions.
35 changes: 23 additions & 12 deletions dozer-cli/src/cli/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use dozer_tracing::LabelsAndProgress;
use dozer_types::models::config::default_cache_max_map_size;
use dozer_types::prettytable::{row, Table};
use dozer_types::serde_json;
use dozer_types::tracing::info;
use dozer_types::{models::config::Config, serde_yaml};
use handlebars::Handlebars;
use std::collections::BTreeMap;
use std::io::{self, Read};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::runtime::Runtime;

Expand All @@ -24,8 +24,8 @@ pub async fn init_config(
config_token: Option<String>,
config_overrides: Vec<(String, serde_json::Value)>,
ignore_pipe: bool,
) -> Result<Config, CliError> {
let mut config = load_config(config_paths, config_token, ignore_pipe).await?;
) -> Result<(Config, Vec<String>), CliError> {
let (mut config, loaded_files) = load_config(config_paths, config_token, ignore_pipe).await?;

config = apply_overrides(&config, config_overrides)?;

Expand All @@ -35,7 +35,7 @@ pub async fn init_config(
let page_size = page_size::get() as u64;
config.cache_max_map_size = Some(cache_max_map_size / page_size * page_size);

Ok(config)
Ok((config, loaded_files))
}

pub fn get_base_dir() -> Result<Utf8PathBuf, CliError> {
Expand All @@ -61,8 +61,10 @@ pub async fn list_sources(
ignore_pipe: bool,
filter: Option<String>,
) -> Result<(), OrchestrationError> {
let config = init_config(config_paths, config_token, config_overrides, ignore_pipe).await?;
let (config, loaded_files) =
init_config(config_paths, config_token, config_overrides, ignore_pipe).await?;
let dozer = init_dozer(runtime, config, Default::default())?;
info!("Loaded config from: {}", loaded_files.join(", "));
let connection_map = dozer.list_connectors().await?;
let mut table_parent = Table::new();
for (connection_name, (tables, schemas)) in connection_map {
Expand Down Expand Up @@ -99,14 +101,17 @@ async fn load_config(
config_url_or_paths: Vec<String>,
config_token: Option<String>,
ignore_pipe: bool,
) -> Result<Config, CliError> {
) -> Result<(Config, Vec<String>), CliError> {
let read_stdin = atty::isnt(Stream::Stdin) && !ignore_pipe;
let first_config_path = config_url_or_paths.get(0);
match first_config_path {
None => Err(ConfigurationFilePathNotProvided),
Some(path) => {
if path.starts_with("https://") || path.starts_with("http://") {
load_config_from_http_url(path, config_token).await
Ok((
load_config_from_http_url(path, config_token).await?,
vec![path.to_owned()],
))
} else {
load_config_from_file(config_url_or_paths, read_stdin)
}
Expand All @@ -131,21 +136,27 @@ async fn load_config_from_http_url(
pub fn load_config_from_file(
config_path: Vec<String>,
read_stdin: bool,
) -> Result<Config, CliError> {
let stdin_path = PathBuf::from("<stdin>");
) -> Result<(Config, Vec<String>), CliError> {
let stdin_path = "<stdin>";
let input = if read_stdin {
let mut input = String::new();
io::stdin()
.read_to_string(&mut input)
.map_err(|e| CannotReadConfig(stdin_path, e))?;
.map_err(|e| CannotReadConfig(stdin_path.into(), e))?;
Some(input)
} else {
None
};

let config_template = combine_config(config_path.clone(), input)?;
let mut loaded_files = Vec::new();
if input.is_some() {
loaded_files.push(stdin_path.to_owned());
}

let (config_template, files) = combine_config(config_path.clone(), input)?;
loaded_files.extend_from_slice(&files);
match config_template {
Some(template) => parse_config(&template),
Some(template) => Ok((parse_config(&template)?, loaded_files)),
None => Err(FailedToFindConfigurationFiles(config_path.join(", "))),
}
}
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/cli/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ pub fn generate_config_repl() -> Result<(), OrchestrationError> {
}),
),
(
format!("question: Home directory ({:}): ", default_home_dir()),
format!("question: Data directory ({:}): ", default_home_dir()),
Box::new(move |(home_dir, config)| {
if home_dir.is_empty() {
config.home_dir = Some(default_home_dir());
Expand Down
12 changes: 6 additions & 6 deletions dozer-cli/src/config_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use glob::glob;
pub fn combine_config(
config_paths: Vec<String>,
stdin_yaml: Option<String>,
) -> Result<Option<String>, ConfigCombineError> {
) -> Result<(Option<String>, Vec<String>), ConfigCombineError> {
let mut combined_yaml = serde_yaml::Value::Mapping(Mapping::new());

let mut loaded_files = Vec::new();
let mut config_found = false;
for pattern in config_paths {
let files_glob = glob(&pattern).map_err(WrongPatternOfConfigFilesGlob)?;
Expand All @@ -32,8 +33,8 @@ pub fn combine_config(
if name.contains(".yml") || name.contains(".yaml") {
config_found = true;
}

add_file_content_to_config(&mut combined_yaml, name, content)?;
loaded_files.push(name.to_owned());
}
}
}
Expand All @@ -48,11 +49,10 @@ pub fn combine_config(

if config_found {
// `serde_yaml::from_value` will return deserialization error, not sure why.
serde_yaml::to_string(&combined_yaml)
.map_err(CannotSerializeToString)
.map(Some)
let string = serde_yaml::to_string(&combined_yaml).map_err(CannotSerializeToString)?;
Ok((Some(string), loaded_files))
} else {
Ok(None)
Ok((None, vec![]))
}
}

Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/live/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl LiveState {

let cli = Cli::parse();

let config = init_config(
let (config, _) = init_config(
cli.config_paths.clone(),
cli.config_token.clone(),
cli.config_overrides.clone(),
Expand Down
164 changes: 25 additions & 139 deletions dozer-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,20 @@ use dozer_api::shutdown;
use dozer_cli::cli::cloud::CloudCommands;
use dozer_cli::cli::types::{Cli, Commands, ConnectorCommand, RunCommands, SecurityCommands};
use dozer_cli::cli::{generate_config_repl, init_config};
use dozer_cli::cli::{init_dozer, list_sources, LOGO};
use dozer_cli::cli::{init_dozer, list_sources};
use dozer_cli::cloud::{cloud_app_context::CloudAppContext, CloudClient, DozerGrpcCloudClient};
use dozer_cli::errors::{CliError, CloudError, OrchestrationError};
use dozer_cli::{live, set_ctrl_handler, set_panic_hook};
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::config::Config;
use dozer_types::models::telemetry::{TelemetryConfig, TelemetryMetricsConfig};
use dozer_types::serde::Deserialize;
use dozer_types::tracing::{error, error_span, info};
use futures::stream::{AbortHandle, Abortable};
use std::cmp::Ordering;
use std::convert::identity;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::time;

use dozer_types::log::{debug, warn};
use std::time::Duration;
use std::{env, process};
use std::process;

fn main() {
if let Err(e) = run() {
Expand All @@ -30,94 +25,6 @@ fn main() {
}
}

fn render_logo() {
const VERSION: &str = env!("CARGO_PKG_VERSION");

println!("{LOGO}");
println!("\nDozer Version: {VERSION}\n");
}

#[derive(Deserialize, Debug)]
#[serde(crate = "dozer_types::serde")]
struct DozerPackage {
#[serde(rename(deserialize = "latestVersion"))]
pub latest_version: String,
#[serde(rename(deserialize = "availableAssets"))]
pub _available_assets: Vec<String>,
pub link: String,
}

fn version_to_vector(version: &str) -> Vec<i32> {
version.split('.').map(|s| s.parse().unwrap()).collect()
}

fn compare_versions(v1: Vec<i32>, v2: Vec<i32>) -> bool {
for i in 0..v1.len() {
match v1.get(i).cmp(&v2.get(i)) {
Ordering::Greater => return true,
Ordering::Less => return false,
Ordering::Equal => continue,
}
}
false
}

async fn check_update() {
const VERSION: &str = env!("CARGO_PKG_VERSION");
let dozer_env = std::env::var("DOZER_ENV").unwrap_or("local".to_string());
let dozer_dev = std::env::var("DOZER_DEV").unwrap_or("ext".to_string());
let query = vec![
("version", VERSION),
("build", std::env::consts::ARCH),
("os", std::env::consts::OS),
("env", &dozer_env),
("dev", &dozer_dev),
];

let request_url = "https://metadata.dev.getdozer.io/";

let client = reqwest::Client::new();

let mut printed = false;

loop {
let response = client
.get(&request_url.to_string())
.query(&query)
.send()
.await;

match response {
Ok(r) => {
if !printed {
let package: DozerPackage = r.json().await.unwrap();
let current = version_to_vector(VERSION);
let remote = version_to_vector(&package.latest_version);

if compare_versions(remote, current) {
info!("A new version of Dozer is available.");
info!(
"You can download v{}, from {}.",
package.latest_version, package.link
);
printed = true;
}
}
}
Err(e) => {
// We dont show error if error is connection error, because mostly it happens
// when main thread is shutting down before request completes.
if !e.is_connect() {
warn!("Unable to fetch the latest metadata");
}

debug!("Updates check error: {}", e);
}
}
time::sleep(Duration::from_secs(2 * 60 * 60)).await;
}
}

fn run() -> Result<(), OrchestrationError> {
// Reloading trace layer seems impossible, so we are running Cli::parse in a closure
// and then initializing it after reading the configuration. This is a hacky workaround, but it works.
Expand All @@ -136,7 +43,7 @@ fn run() -> Result<(), OrchestrationError> {
// Now we have access to telemetry configuration. Telemetry must be initialized in tokio runtime.
let app_id = config_res
.as_ref()
.map(|c| c.cloud.app_id.as_deref().unwrap_or(&c.app_name))
.map(|(c, _)| c.cloud.app_id.as_deref().unwrap_or(&c.app_name))
.ok();

// We always enable telemetry when running live.
Expand All @@ -148,7 +55,7 @@ fn run() -> Result<(), OrchestrationError> {
} else {
config_res
.as_ref()
.map(|c| c.telemetry.clone())
.map(|(c, _)| c.telemetry.clone())
.unwrap_or_default()
};

Expand All @@ -158,7 +65,8 @@ fn run() -> Result<(), OrchestrationError> {
if let Commands::Cloud(cloud) = &cli.cmd {
return run_cloud(cloud, runtime, &cli);
}
let config = config_res?;
let (config, config_files) = config_res?;
info!("Loaded config from: {}", config_files.join(", "));

let dozer = init_dozer(
runtime.clone(),
Expand All @@ -170,26 +78,16 @@ fn run() -> Result<(), OrchestrationError> {
// run individual servers
(match cli.cmd {
Commands::Run(run) => match run.command {
Some(RunCommands::Api) => {
render_logo();
dozer.runtime.block_on(dozer.run_api(shutdown_receiver))
}
Some(RunCommands::App) => {
render_logo();
dozer
.runtime
.block_on(dozer.run_apps(shutdown_receiver, None))
}
Some(RunCommands::Api) => dozer.runtime.block_on(dozer.run_api(shutdown_receiver)),
Some(RunCommands::App) => dozer
.runtime
.block_on(dozer.run_apps(shutdown_receiver, None)),
Some(RunCommands::Lambda) => {
render_logo();
dozer.runtime.block_on(dozer.run_lambda(shutdown_receiver))
}
None => {
render_logo();
dozer
.runtime
.block_on(dozer.run_all(shutdown_receiver, run.locked))
}
None => dozer
.runtime
.block_on(dozer.run_all(shutdown_receiver, run.locked)),
},
Commands::Security(security) => match security.command {
SecurityCommands::GenerateToken => {
Expand Down Expand Up @@ -235,7 +133,6 @@ fn run() -> Result<(), OrchestrationError> {
panic!("This should not happen as it is handled in parse_and_generate");
}
Commands::Live(live_flags) => {
render_logo();
dozer.runtime.block_on(live::start_live_server(
&dozer.runtime,
shutdown_receiver,
Expand All @@ -256,10 +153,11 @@ fn run_cloud(
runtime: Arc<Runtime>,
cli: &Cli,
) -> Result<(), OrchestrationError> {
render_logo();
let cloud = cloud.clone();

let config = init_configuration(cli, runtime.clone()).ok();
let config = init_configuration(cli, runtime.clone())
.ok()
.map(|(config, _)| config);
let mut cloud_client = CloudClient::new(cloud.clone(), config.clone(), runtime.clone());
match cloud.command.clone() {
CloudCommands::Deploy(deploy) => cloud_client.deploy(deploy, cli.config_paths.clone()),
Expand Down Expand Up @@ -312,27 +210,15 @@ fn parse_and_generate() -> Result<Cli, OrchestrationError> {
)
}

fn init_configuration(cli: &Cli, runtime: Arc<Runtime>) -> Result<Config, CliError> {
dozer_tracing::init_telemetry_closure(
None,
&Default::default(),
|| -> Result<Config, CliError> {
let res = runtime.block_on(init_config(
cli.config_paths.clone(),
cli.config_token.clone(),
cli.config_overrides.clone(),
cli.ignore_pipe,
));

match res {
Ok(config) => {
runtime.spawn(check_update());
Ok(config)
}
Err(e) => Err(e),
}
},
)
fn init_configuration(cli: &Cli, runtime: Arc<Runtime>) -> Result<(Config, Vec<String>), CliError> {
dozer_tracing::init_telemetry_closure(None, &Default::default(), || -> Result<_, CliError> {
runtime.block_on(init_config(
cli.config_paths.clone(),
cli.config_token.clone(),
cli.config_overrides.clone(),
cli.ignore_pipe,
))
})
}

fn display_error(e: &OrchestrationError) {
Expand Down
Loading

0 comments on commit 0dc64b4

Please sign in to comment.