From ee20de904b951463075d77f2e9848d4a94c1affb Mon Sep 17 00:00:00 2001 From: Tim Dikland <31453088+tdikland@users.noreply.github.com> Date: Fri, 29 Dec 2023 22:11:31 +0100 Subject: [PATCH] Add support for generating Azure Blob Storage signed URLs (#22) * add experimental support for Azure signed URLs * remove dead code, adjust tests * syntax error * Revert "syntax error" This reverts commit 16ac2101b6f651afaba26d8660f21db2695d6a73. * rewrite signer builders * remove stray lifetime annotation --------- Co-authored-by: Tim Dikland --- Cargo.lock | 295 +++++++++++++++--- Cargo.toml | 4 + src/bootstrap.rs | 13 + src/server.rs | 20 +- src/server/routers.rs | 16 +- .../routers/shares/schemas/tables/query.rs | 98 +++--- src/server/services/deltalake.rs | 32 +- src/server/utilities/signed_url.rs | 266 +++++++++------- 8 files changed, 536 insertions(+), 208 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4e9f46..7b47e0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "RustyXML" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" + [[package]] name = "addr2line" version = "0.19.0" @@ -23,7 +29,7 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ - "getrandom", + "getrandom 0.2.8", "once_cell", "version_check", ] @@ -36,7 +42,7 @@ checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ "cfg-if 1.0.0", "const-random", - "getrandom", + "getrandom 0.2.8", "once_cell", "version_check", ] @@ -410,7 +416,7 @@ dependencies = [ "chrono", "hmac 0.11.0", "log", - "rand", + "rand 0.8.5", "serde", "serde_json", "sha2 0.9.9", @@ -566,6 +572,75 @@ dependencies = [ "tower-service", ] +[[package]] +name = "azure_core" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ccd63c07d1fbfb3d4543d7ea800941bf5a30db1911b9b9e4db3b2c4210a434f" +dependencies = [ + "async-trait", + "base64 0.21.0", + "bytes", + "dyn-clone", + "futures", + "getrandom 0.2.8", + "http-types", + "log", + "paste", + "pin-project", + "quick-xml 0.31.0", + "rand 0.8.5", + "reqwest", + "rustc_version", + "serde", + "serde_json", + "time 0.3.30", + "url", + "uuid", +] + +[[package]] +name = "azure_storage" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83ca0a07f89fd72a006da4713e93af3d6c44a693e61a1c3c2e7985de39c182e8" +dependencies = [ + "RustyXML", + "async-trait", + "azure_core", + "bytes", + "futures", + "hmac 0.12.1", + "log", + "serde", + "serde_derive", + "serde_json", + "sha2 0.10.6", + "time 0.3.30", + "url", + "uuid", +] + +[[package]] +name = "azure_storage_blobs" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8096c04d370118323c42b2752aa1883e4880a56ef65239f317b359f263b6e194" +dependencies = [ + "RustyXML", + "azure_core", + "azure_storage", + "bytes", + "futures", + "log", + "serde", + "serde_derive", + "serde_json", + "time 0.3.30", + "url", + "uuid", +] + [[package]] name = "backtrace" version = "0.3.67" @@ -859,7 +934,7 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d7d6ab3c3a2282db210df5f02c4dab6e0a7057af0fb7ebd4070f30fe05c0ddb" dependencies = [ - "getrandom", + "getrandom 0.2.8", "once_cell", "proc-macro-hack", "tiny-keccak", @@ -877,7 +952,7 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7efb37c3e1ccb1ff97164ad95ac1606e8ccd35b3fa0a7d99a304c7f4a428cc24" dependencies = [ - "time 0.3.20", + "time 0.3.30", "version_check", ] @@ -1128,6 +1203,9 @@ dependencies = [ "async-trait", "axum", "axum-extra", + "azure_core", + "azure_storage", + "azure_storage_blobs", "chrono", "clap", "colored", @@ -1143,7 +1221,7 @@ dependencies = [ "jsonwebtoken", "md5", "once_cell", - "rand", + "rand 0.8.5", "rusoto_core 0.48.0", "rusoto_credential 0.48.0", "rusoto_s3", @@ -1158,6 +1236,7 @@ dependencies = [ "tame-gcs", "testcontainers", "testutils", + "time 0.3.30", "tokio", "tower", "tower-http", @@ -1228,6 +1307,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" +dependencies = [ + "powerfmt", + "serde", +] + [[package]] name = "digest" version = "0.9.0" @@ -1314,6 +1403,12 @@ version = "0.15.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03d8c417d7a8cb362e0c37e5d815f5eb7c37f79ff93707329d5a194e42e54ca0" +[[package]] +name = "dyn-clone" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "545b22097d44f8a9581187cdf93de7a71e4722bf51200cfaba810865b49a495d" + [[package]] name = "dynamodb_lock" version = "0.4.3" @@ -1599,6 +1694,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "wasi 0.9.0+wasi-snapshot-preview1", +] + [[package]] name = "getrandom" version = "0.2.8" @@ -1606,8 +1712,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if 1.0.0", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1850,6 +1958,26 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" +[[package]] +name = "http-types" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" +dependencies = [ + "anyhow", + "async-channel", + "base64 0.13.1", + "futures-lite", + "infer", + "pin-project-lite", + "rand 0.7.3", + "serde", + "serde_json", + "serde_qs", + "serde_urlencoded", + "url", +] + [[package]] name = "httparse" version = "1.8.0" @@ -2013,6 +2141,12 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "infer" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" + [[package]] name = "instant" version = "0.1.12" @@ -2466,7 +2600,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "rand", + "rand 0.8.5", "smallvec", "zeroize", ] @@ -2533,6 +2667,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + [[package]] name = "object" version = "0.30.3" @@ -2558,8 +2701,8 @@ dependencies = [ "itertools 0.10.5", "parking_lot", "percent-encoding", - "quick-xml", - "rand", + "quick-xml 0.28.2", + "rand 0.8.5", "reqwest", "ring", "rustls-pemfile", @@ -2730,7 +2873,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "346f04948ba92c43e8469c1ee6736c7563d71012b17d40745260fe106aac2166" dependencies = [ "base64ct", - "rand_core", + "rand_core 0.6.4", "subtle", ] @@ -2889,6 +3032,12 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -2944,6 +3093,16 @@ dependencies = [ "serde", ] +[[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.32" @@ -2953,6 +3112,19 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom 0.1.16", + "libc", + "rand_chacha 0.2.2", + "rand_core 0.5.1", + "rand_hc", +] + [[package]] name = "rand" version = "0.8.5" @@ -2960,8 +3132,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core 0.5.1", ] [[package]] @@ -2971,7 +3153,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom 0.1.16", ] [[package]] @@ -2980,7 +3171,16 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.8", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core 0.5.1", ] [[package]] @@ -2998,7 +3198,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" dependencies = [ - "getrandom", + "getrandom 0.2.8", "redox_syscall", "thiserror", ] @@ -3051,10 +3251,12 @@ dependencies = [ "http-body", "hyper", "hyper-rustls 0.23.2", + "hyper-tls", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -3064,6 +3266,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", + "tokio-native-tls", "tokio-rustls 0.23.4", "tokio-util", "tower-service", @@ -3117,7 +3320,7 @@ dependencies = [ "num-traits", "pkcs1", "pkcs8", - "rand_core", + "rand_core 0.6.4", "signature", "spki", "subtle", @@ -3544,22 +3747,22 @@ checksum = "e6b44e8fc93a14e66336d230954dda83d18b4605ccace8fe09bc7514a71ad0bc" [[package]] name = "serde" -version = "1.0.155" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71f2b4817415c6d4210bfe1c7bfcf4801b2d904cb4d0e1a8fdb651013c9e86b8" +checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.155" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d071a94a3fac4aff69d023a7f411e33f40f3483f8c5190b1953822b6b76d7630" +checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.28", ] [[package]] @@ -3582,6 +3785,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_qs" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6" +dependencies = [ + "percent-encoding", + "serde", + "thiserror", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3738,7 +3952,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e1788eed21689f9cf370582dfc467ef36ed9c707f073528ddafa8d83e3b8500" dependencies = [ "digest 0.10.6", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -3750,7 +3964,7 @@ dependencies = [ "num-bigint", "num-traits", "thiserror", - "time 0.3.20", + "time 0.3.30", ] [[package]] @@ -3969,7 +4183,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", - "rand", + "rand 0.8.5", "rsa", "serde", "sha1", @@ -4010,7 +4224,7 @@ dependencies = [ "md-5 0.10.5", "memchr", "once_cell", - "rand", + "rand 0.8.5", "serde", "serde_json", "sha1", @@ -4141,7 +4355,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "thiserror", - "time 0.3.20", + "time 0.3.30", "url", ] @@ -4178,7 +4392,7 @@ dependencies = [ "hex", "hmac 0.12.1", "log", - "rand", + "rand 0.8.5", "serde", "serde_json", "sha2 0.10.6", @@ -4189,7 +4403,7 @@ name = "testutils" version = "1.0.0" dependencies = [ "chrono", - "rand", + "rand 0.8.5", "regex", "tempfile", "uuid", @@ -4249,11 +4463,16 @@ dependencies = [ [[package]] name = "time" -version = "0.3.20" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd0cbfecb4d19b5ea75bb31ad904eb5b9fa13f21079c3b92017ebdf4999a5890" +checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" dependencies = [ + "deranged", "itoa", + "js-sys", + "libc", + "num_threads", + "powerfmt", "serde", "time-core", "time-macros", @@ -4261,15 +4480,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.0" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.8" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd80a657e71da814b8e5d60d3374fc6d35045062245d80224748ae522dd76f36" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" dependencies = [ "time-core", ] @@ -4664,7 +4883,7 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79" dependencies = [ - "getrandom", + "getrandom 0.2.8", "serde", ] @@ -4764,6 +4983,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + [[package]] name = "wasi" version = "0.10.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index b082e13..72ccee6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,9 @@ async-session = "3.0.0" async-trait = "0.1.64" axum = { version = "0.6.20", features = ["headers"] } axum-extra = { version = "0.8", features = ["json-lines"] } +azure_core = "0.17.0" +azure_storage = "0.17.0" +azure_storage_blobs = "0.17.0" clap = "4.1.4" deltalake = { version = "0.15.0", features = ["s3", "azure", "gcs"] } futures = "0.3.28" @@ -47,6 +50,7 @@ sqlx = { version = "0.7", features = [ strum = { version = "0.25", features = ["derive"] } strum_macros = "0.25" tame-gcs = { version = "0.12.0", features = ["signing"] } +time = { version = "0.3.30", features = ["local-offset"] } tracing = "0.1.37" tracing-log = "0.1.3" tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] } diff --git a/src/bootstrap.rs b/src/bootstrap.rs index aba097b..fa83499 100644 --- a/src/bootstrap.rs +++ b/src/bootstrap.rs @@ -4,6 +4,7 @@ pub(crate) mod gcp; mod postgres; use anyhow::Context; use anyhow::Result; +use azure_storage::StorageCredentials; use rusoto_credential::ProfileProvider; use sqlx::PgPool; use tame_gcs::signing::ServiceAccount; @@ -32,3 +33,15 @@ pub(crate) fn new_aws_profile_provider() -> Result { std::env::var("AWS_PROFILE").context("failed to get `AWS_PROFILE` environment variable")?; aws::new(&aws_profile) } + +pub(crate) fn new_azure_storage_account() -> Result { + let azure_storage_account_name = std::env::var("AZURE_STORAGE_ACCOUNT_NAME") + .context("failed to get `AZURE_STORAGE_ACCOUNT_NAME` environment variable")?; + let azure_storage_account_key = std::env::var("AZURE_STORAGE_ACCOUNT_KEY") + .context("failed to get `AZURE_STORAGE_ACCOUNT_KEY` environment variable")?; + + Ok(StorageCredentials::access_key( + azure_storage_account_name, + azure_storage_account_key, + )) +} diff --git a/src/server.rs b/src/server.rs index d9d8335..d29f829 100644 --- a/src/server.rs +++ b/src/server.rs @@ -8,6 +8,7 @@ pub(crate) mod utilities; use anyhow::Context; use anyhow::Result; +use azure_storage::StorageCredentials; use rusoto_credential::AwsCredentials; use rusoto_credential::ProvideAwsCredentials; use sqlx::PgPool; @@ -35,6 +36,7 @@ pub struct Server { pg_pool: PgPool, gcp_service_account: Option, aws_credentials: Option, + azure_storage_credentials: Option, } impl Server { @@ -60,16 +62,28 @@ impl Server { if aws_credentials.is_none() { tracing::warn!("failed to load AWS credentials"); } + + let azure_storage_credentials = bootstrap::new_azure_storage_account().ok(); + if azure_storage_credentials.is_none() { + tracing::warn!("failed to load Azure Storage credentials"); + } + Ok(Server { pg_pool, gcp_service_account, aws_credentials, + azure_storage_credentials, }) } pub async fn start(self) -> Result<()> { - routers::bind(self.pg_pool, self.gcp_service_account, self.aws_credentials) - .await - .context("failed to start API server") + routers::bind( + self.pg_pool, + self.gcp_service_account, + self.aws_credentials, + self.azure_storage_credentials, + ) + .await + .context("failed to start API server") } } diff --git a/src/server/routers.rs b/src/server/routers.rs index b658666..f5e6d05 100644 --- a/src/server/routers.rs +++ b/src/server/routers.rs @@ -10,6 +10,7 @@ use axum::middleware; use axum::response::Response; use axum::routing::{get, post}; use axum::Router; +use azure_storage::StorageCredentials; use rusoto_credential::AwsCredentials; use sqlx::PgPool; use tame_gcs::signing::ServiceAccount; @@ -26,6 +27,7 @@ pub struct State { pub pg_pool: PgPool, pub gcp_service_account: Option, pub aws_credentials: Option, + pub azure_credentials: Option, } pub type SharedState = Arc; @@ -38,11 +40,13 @@ async fn route( pg_pool: PgPool, gcp_service_account: Option, aws_credentials: Option, + azure_credentials: Option, ) -> Result { let state = Arc::new(State { pg_pool, gcp_service_account, aws_credentials, + azure_credentials, }); let swagger = SwaggerUi::new("/swagger-ui").url("/api-doc/openapi.json", ApiDoc::openapi()); @@ -127,10 +131,16 @@ pub async fn bind( pg_pool: PgPool, gcp_service_account: Option, aws_credentials: Option, + azure_credentials: Option, ) -> Result<()> { - let app = route(pg_pool, gcp_service_account, aws_credentials) - .await - .context("failed to create axum router")?; + let app = route( + pg_pool, + gcp_service_account, + aws_credentials, + azure_credentials, + ) + .await + .context("failed to create axum router")?; let server_bind = config::fetch::("server_bind"); let addr = server_bind.as_str().parse().context(format!( r#"failed to parse "{}" to SocketAddr"#, diff --git a/src/server/routers/shares/schemas/tables/query.rs b/src/server/routers/shares/schemas/tables/query.rs index 38d959d..f59d007 100644 --- a/src/server/routers/shares/schemas/tables/query.rs +++ b/src/server/routers/shares/schemas/tables/query.rs @@ -1,4 +1,5 @@ use anyhow::anyhow; +use anyhow::Context; use axum::extract::Extension; use axum::extract::Json; use axum::extract::Path; @@ -10,6 +11,8 @@ use axum::response::IntoResponse; use axum::response::Response; use axum_extra::json_lines::JsonLines; use std::str::FromStr; +use std::time::Duration; +use tame_gcs::signing::ServiceAccount; use utoipa::IntoParams; use utoipa::ToSchema; @@ -26,6 +29,7 @@ use crate::server::utilities::json::PartitionFilter as JSONPartitionFilter; use crate::server::utilities::json::PredicateJson; use crate::server::utilities::json::Utility as JSONUtility; use crate::server::utilities::signed_url::Platform; +use crate::server::utilities::signed_url::Signer; use crate::server::utilities::signed_url::Utility as SignedUrlUtility; use crate::server::utilities::sql::PartitionFilter as SQLPartitionFilter; use crate::server::utilities::sql::Utility as SQLUtility; @@ -161,46 +165,51 @@ pub async fn post( }; metadata.to_owned() }; - let url_signer = |name: String| match &platform { - Platform::Aws { url, bucket, path } => { - if let Some(aws_credentials) = &state.aws_credentials { - let file: String = format!("{}/{}", path, name); - let Ok(signed) = SignedUrlUtility::sign_aws( - aws_credentials, - bucket, - &file, - &config::fetch::("signed_url_ttl"), - ) else { - tracing::error!("failed to sign up AWS S3 url"); - return url.clone(); - }; - return signed.into(); + let url_signer: Box = match &platform { + Platform::Aws => { + if let Some(creds) = &state.aws_credentials { + Box::new(SignedUrlUtility::aws_signer( + creds.clone(), + Duration::from_secs(config::fetch::("signed_url_ttl")), + )) + } else { + tracing::error!("No credentials found for AWS S3"); + return Err(anyhow!("Error occurred while signing URLs").into()); } - tracing::warn!("AWS credentials were not set"); - url.clone() } - Platform::Gcp { url, bucket, path } => { - if let Some(gcp_service_account) = &state.gcp_service_account { - let file: String = format!("{}/{}", path, name); - let Ok(signed) = SignedUrlUtility::sign_gcp( - gcp_service_account, - bucket, - &file, - &config::fetch::("signed_url_ttl"), - ) else { - tracing::error!("failed to sign up GCP GCS url"); - return url.clone(); - }; - return signed.into(); + Platform::Azure => { + if let Some(creds) = &state.azure_credentials { + Box::new(SignedUrlUtility::azure_signer( + creds.clone(), + Duration::from_secs(config::fetch::("signed_url_ttl")), + )) + } else { + tracing::error!("No credentials found for Azure Blob Storage"); + return Err(anyhow!("Error occurred while signing URLs").into()); } - tracing::warn!("GCP service account was not set"); - url.clone() } - Platform::None { url } => { - tracing::warn!("no supported platforms"); - url.clone() + Platform::Gcp => { + if let Some(_) = &state.gcp_service_account { + let creds = ServiceAccount::load_json_file( + std::env::var("GOOGLE_APPLICATION_CREDENTIALS") + .context("failed to load GCP credentials")?, + ) + .context("failed to load GCP credentials")?; + Box::new(SignedUrlUtility::gcp_signer( + creds, + Duration::from_secs(config::fetch::("signed_url_ttl")), + )) + } else { + tracing::error!("No credentials found for GCP GCS"); + return Err(anyhow!("Error occurred while signing URLs").into()); + } + } + _ => { + tracing::error!("requested cloud platform is not supported"); + return Err(anyhow!("Error occurred while signing URLs").into()); } }; + let mut headers = HeaderMap::new(); headers.insert(HEADER_NAME, table.version().into()); headers.insert( @@ -211,15 +220,18 @@ pub async fn post( Ok(( StatusCode::OK, headers, - JsonLines::new(DeltalakeService::files_from( - table, - metadata, - predicate_hints, - json_predicate_hints, - payload.limit_hint, - is_time_traveled, - &url_signer, - )), + JsonLines::new( + DeltalakeService::files_from( + table, + metadata, + predicate_hints, + json_predicate_hints, + payload.limit_hint, + is_time_traveled, + &url_signer, + ) + .await, + ), ) .into_response()) } diff --git a/src/server/services/deltalake.rs b/src/server/services/deltalake.rs index 89e5565..03723f4 100644 --- a/src/server/services/deltalake.rs +++ b/src/server/services/deltalake.rs @@ -14,6 +14,7 @@ use utoipa::ToSchema; use crate::server::utilities::deltalake::Utility as DeltalakeUtility; use crate::server::utilities::json::PartitionFilter as JSONPartitionFilter; use crate::server::utilities::json::Utility as JSONUtility; +use crate::server::utilities::signed_url::Signer; use crate::server::utilities::sql::PartitionFilter as SQLPartitionFilter; use crate::server::utilities::sql::Utility as SQLUtility; @@ -116,12 +117,7 @@ pub struct File { } impl File { - fn from( - add: Add, - version: Option, - timestamp: Option, - url_signer: &dyn Fn(String) -> String, - ) -> Self { + fn from(add: Add, version: Option, timestamp: Option) -> Self { let mut partition_values: HashMap = HashMap::new(); for (k, v) in add.partition_values.into_iter() { if let Some(v) = v { @@ -131,7 +127,7 @@ impl File { Self { file: FileDetail { id: format!("{:x}", md5::compute(add.path.as_bytes())), - url: url_signer(add.path), + url: add.path, partition_values, size: add.size, stats: add.stats, @@ -140,6 +136,10 @@ impl File { }, } } + + async fn sign(&mut self, url_signer: &S) { + self.file.url = url_signer.sign(&self.file.url).await.unwrap(); + } } pub struct Service; @@ -220,14 +220,14 @@ impl Service { files } - pub fn files_from( + pub async fn files_from( table: DeltaTable, metadata: DeltaTableMetaData, predicate_hints: Option>, json_predicate_hints: Option, limit_hint: Option, is_time_traveled: bool, - url_signer: &dyn Fn(String) -> String, + url_signer: &S, ) -> impl Stream> { let version = if is_time_traveled { Some(table.version()) @@ -247,14 +247,16 @@ impl Service { let files = Self::filter_with_json_hints(files, table.schema().cloned(), json_predicate_hints); let files = Self::filter_with_limit_hint(files, limit_hint); - let mut files = files + let futures = files .into_iter() - .map(|f| { - Ok::(json!(File::from( - f, version, timestamp, url_signer - ))) + .map(|f| async { + let mut file = File::from(f, version, timestamp); + file.sign(url_signer).await; + Ok::(json!(file)) }) - .collect::>>(); + .collect::>(); + let mut files = futures::future::join_all(futures).await; + let mut ret = vec![ Ok(json!(Protocol::new())), Ok(json!(Metadata::from(metadata))), diff --git a/src/server/utilities/signed_url.rs b/src/server/utilities/signed_url.rs index 622783f..f98f723 100644 --- a/src/server/utilities/signed_url.rs +++ b/src/server/utilities/signed_url.rs @@ -3,6 +3,9 @@ use std::time::Duration; use anyhow::Context; use anyhow::Result; +use azure_storage::shared_access_signature::service_sas::BlobSasPermissions; +use azure_storage::StorageCredentials as Azure; +use azure_storage_blobs::prelude::ClientBuilder; use rusoto_core::Region; use rusoto_credential::AwsCredentials as AWS; use rusoto_s3::util::PreSignedRequest; @@ -17,19 +20,10 @@ use url::Url; #[derive(Debug, PartialEq, Eq)] pub enum Platform { - Aws { - url: String, - bucket: String, - path: String, - }, - Gcp { - url: String, - bucket: String, - path: String, - }, - None { - url: String, - }, + Aws, + Gcp, + Azure, + None, } impl FromStr for Platform { @@ -38,146 +32,200 @@ impl FromStr for Platform { fn from_str(input: &str) -> std::result::Result { let url = Url::parse(input).context("failed to parse URL")?; match url.scheme() { - "s3" => Ok(Self::Aws { - url: String::from(url.as_str()), - bucket: String::from(url.domain().unwrap_or("")), - path: String::from(url.path().strip_prefix('/').unwrap_or("")), - }), - "s3a" => Ok(Self::Aws { - url: String::from(url.as_str()), - bucket: String::from(url.domain().unwrap_or("")), - path: String::from(url.path().strip_prefix('/').unwrap_or("")), - }), - "gs" => Ok(Self::Gcp { - url: String::from(url.as_str()), - bucket: String::from(url.domain().unwrap_or("")), - path: String::from(url.path().strip_prefix('/').unwrap_or("")), - }), - _ => Ok(Self::None { - url: String::from(url.as_str()), - }), + "s3" | "s3a" => Ok(Self::Aws), + "abfss" | "abfs" => Ok(Self::Azure), + "gs" => Ok(Self::Gcp), + _ => Ok(Self::None), } } } -pub struct Utility; +#[async_trait::async_trait] +pub trait Signer: Send + Sync { + async fn sign(&self, path: &str) -> Result; +} + +#[async_trait::async_trait] +impl Signer for Box { + async fn sign(&self, path: &str) -> Result { + (**self).sign(path).await + } +} + +pub struct AwsSigner { + pub aws: AWS, + pub expiration: Duration, +} + +#[async_trait::async_trait] +impl Signer for AwsSigner { + async fn sign(&self, path: &str) -> Result { + let url = Url::parse(path).context("failed to parse URL")?; + let bucket = String::from(url.domain().unwrap_or("")); + let path = String::from(url.path().strip_prefix('/').unwrap_or("")); -impl Utility { - pub fn sign_aws(aws: &AWS, bucket: &str, path: &str, duration: &u64) -> Result { let region = Region::default(); let options = PreSignedRequestOption { - expires_in: Duration::from_secs(*duration), + expires_in: self.expiration, }; let request = GetObjectRequest { bucket: bucket.to_string(), key: path.to_string(), ..Default::default() }; - let url = request.get_presigned_url(®ion, aws, &options); + let url = request.get_presigned_url(®ion, &self.aws, &options); let url = Url::parse(&url).context("failed to parse AWS signed URL")?; - Ok(url) + Ok(url.into()) + } +} + +pub struct AzureSigner { + pub azure: Azure, + pub expiration: Duration, +} + +#[async_trait::async_trait] +impl Signer for AzureSigner { + async fn sign(&self, path: &str) -> Result { + let url = Url::parse(path).context("failed to parse URL")?; + + let storage_account = url + .domain() + .and_then(|d| d.split_once(".")) + .map(|m| m.0) + .unwrap_or(""); + let container = url.username(); + let blob = url.path().strip_prefix("/").unwrap_or(""); + + // build azure blob client + let cb = ClientBuilder::new(storage_account.to_string(), self.azure.clone()); + let blob_client = cb.blob_client(container, blob); + + // generate SAS token for signing URLs + let mut permissions = BlobSasPermissions::default(); + permissions.read = true; + let dt = time::OffsetDateTime::now_utc() + self.expiration; + let sas_token = blob_client + .shared_access_signature(permissions, dt) + .await + .unwrap(); + + // sign blobs with SAS token + let url = blob_client.generate_signed_blob_url(&sas_token)?; + Ok(url.into()) } +} + +pub struct GcpSigner { + pub gcp: GCP, + pub expiration: Duration, +} + +#[async_trait::async_trait] +impl Signer for GcpSigner { + async fn sign(&self, path: &str) -> Result { + let url = Url::parse(path).context("failed to parse URL")?; + let bucket = String::from(url.domain().unwrap_or("")); + let path = String::from(url.path().strip_prefix('/').unwrap_or("")); - pub fn sign_gcp(gcp: &GCP, bucket: &str, path: &str, duration: &u64) -> Result { let bucket = BucketName::try_from(bucket).context("failed to parse bucket name")?; let object = ObjectName::try_from(path).context("failed to parse object name")?; let options = SignedUrlOptional { - duration: Duration::from_secs(*duration), + duration: self.expiration, ..Default::default() }; let signer = UrlSigner::with_ring(); let url = signer - .generate(gcp, &(&bucket, &object), options) + .generate(&self.gcp, &(&bucket, &object), options) .context("failed to generate signed url")?; - Ok(url) + Ok(url.into()) + } +} + +pub struct Utility; + +impl Utility { + pub fn aws_signer(aws: AWS, expiration: Duration) -> AwsSigner { + AwsSigner { aws, expiration } + } + + pub fn azure_signer(azure: Azure, expiration: Duration) -> AzureSigner { + AzureSigner { azure, expiration } + } + + pub fn gcp_signer(gcp: GCP, expiration: Duration) -> GcpSigner { + GcpSigner { gcp, expiration } } } #[cfg(test)] mod tests { use super::*; - use crate::bootstrap; - use rusoto_credential::ProvideAwsCredentials; - use std::str::FromStr; - #[test] - fn test_aws_url() { - let bucket = testutils::rand::string(10); - let path = testutils::rand::string(10); - let url = format!("s3://{}/{}", bucket, path); - let provider = Platform::from_str(&url).expect("should parse s3 url properly"); - if let Platform::Aws { - url: parsed_url, - bucket: parsed_bucket, - path: parsed_path, - } = provider - { - assert_eq!(parsed_url, url); - assert_eq!(parsed_bucket, bucket); - assert_eq!(parsed_path, path); - } else { - panic!("should be parsed as S3 url"); - } - } + use rusoto_credential::AwsCredentials; + use serde_json::json; + use std::str::FromStr; + use tame_gcs::signing::ServiceAccount; - #[test] - fn test_gcp_url() { - let bucket = testutils::rand::string(10); - let path = testutils::rand::string(10); - let url = format!("gs://{}/{}", bucket, path); - let provider = Platform::from_str(&url).expect("should parse gcs url properly"); - if let Platform::Gcp { - url: parsed_url, - bucket: parsed_bucket, - path: parsed_path, - } = provider - { - assert_eq!(parsed_url, url); - assert_eq!(parsed_bucket, bucket); - assert_eq!(parsed_path, path); + #[tokio::test] + async fn test_aws_sign_local() { + let creds = AwsCredentials::new("test", "test", None, None); + if let Ok(Platform::Aws) = Platform::from_str("s3://delta-sharing-test/covid") { + let signer = AwsSigner { + aws: creds, + expiration: Duration::from_secs(300), + }; + if let Ok(url) = signer.sign("s3://delta-sharing-test/covid").await { + println!("{:?}", url); + } else { + panic!("failed to sign S3 url") + } } else { - panic!("should be parsed as GS url"); - } + panic!("failed to parse S3 url"); + }; } - //#[tokio::test] - async fn test_aws_sign_local() { - let aws_profile = std::env::var("AWS_PROFILE").expect("AWS profile should be specified"); - let pp = bootstrap::aws::new(&aws_profile) - .expect("AWS profile provider should be created properly"); - let creds = pp - .credentials() - .await - .expect("AWS credentials should be acquired properly"); - if let Ok(Platform::Aws { bucket, path, .. }) = - Platform::from_str("s3://delta-sharing-test/covid") + #[tokio::test] + async fn test_azure_sign_local() { + let creds = Azure::access_key("mystorageaccount", "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="); + if let Ok(Platform::Azure) = + Platform::from_str("abfss://mycontainer@mystorageaccount.dfs.core.windows.net/myblob") { - if let Ok(url) = Utility::sign_aws(&creds, &bucket, &path, &300) { + let signer = AzureSigner { + azure: creds, + expiration: Duration::from_secs(300), + }; + if let Ok(url) = signer + .sign("abfss://mycontainer@mystorageaccount.dfs.core.windows.net/myblob") + .await + { println!("{:?}", url); + } else { + panic!("failed to sign Azure url") } } else { - panic!("failed to parse S3 url"); + panic!("failed to parse Azure url"); }; } - //#[tokio::test] + #[tokio::test] async fn test_gcp_sign_local() { - let path = format!( - "{}", - shellexpand::tilde( - std::env::var("GOOGLE_APPLICATION_CREDENTIALS") - .ok() - .unwrap_or("~/.gcp/service-account-file.json".into()) - .as_str() - ) - ); - let sa = - bootstrap::gcp::new(&path).expect("GCP service account should be created properly"); - if let Ok(Platform::Gcp { bucket, path, .. }) = - Platform::from_str("gs://delta-sharing-test/covid") - { - if let Ok(url) = Utility::sign_gcp(&sa, &bucket, &path, &300) { + let creds = json!({ + "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCestzikbw/l1ga\ntLSJYoiQCtX5EM+Dfh3Xn4lefol5uhX0anmrTWIidZ1CSn512tQc6WUKQOHJhDUW\naxDFEr0bwmN+vvguVHXHIRV4ilVbNykt21g3eWGHBOnY0cCjLbSICEQGcqqSCDHe\nhVkKXH4vkpahtX8P0+DiAMZ1Ou+4AaOK6BTUWDIR63+LRGd3wrx8VhzDPJHK2R/q\nO23bM0E8ktNl0Rs0Q/a2krniFwx7f7e/RZrkJ+kq+f6hZrgYwufij1P87XmNKl1T\nz+7xJXsPc72wQkeqr7AGqJe8KsiX9Ajlms7rWQRvx0sJrJYWnT+t++MqOzSXcwB0\nS1tsCOnjAgMBAAECggEAAcwTE0pvitAlx8ZPGxUvaGOEW880bQ/zD5DiHIdV+uJq\nwuX9Hb2rxFR3pPC0sOn6/Ul+rZiMK1zgwFxoXWBNHZuHG+rWuFOYMoVDOdEQjrli\nW9TAPCnsOvwJF+hRhFI4A/2v+NtjPMcfUB6gzQISIxYdDZTBbRugUCmOax/GkjCx\n+GtlxszPfhqgQCsDTXaoceJZlLGg6aZwW7gyHkXOt1P+DDom0cS6GBxR5aIf13Fr\n+I3/318IVCz7JATLvSMETQD2KMt7KAvC6sdLsHqgPBCNJ3u1WSeXwnwxHdCfTkPn\nWLiRCznz4CeVCiY18z+DopmBYPXUMJKap8SaCNsFNQKBgQDf84Yvu6flUBdhIjVL\nfQd1d/TtXa0D0IpnqqkeX51P5cFfjNCzzwHfIv8NWvoQ2LdVeLE0eiNxgHJYj1M+\nv5/FgmLMZGJVysCNI/g+nkjxr7dVtLtpMI55Vm5/cIlsOk85qGPefmaqrHxO8ryh\niN1Hb6QCsccG+Dh3ZoAgwyi1pwKBgQC1aM3Zh8kXfne54AdNBqU7rVqkt6QKoiuI\niPd9fC+sTh9zbVARa0dQQm55x+V0+PvAZNRLueci6ZqRjAhnm3MCLLAfvTsS594D\nDRpjMXdljMQ064jhvmPQ2Q+h+uPzBtH1I5q08CN4BdKXsN1oz7PSfcNU9KyUu/yn\n2tO/iEKpZQKBgQCijg8ugpXB2zq9JKlum9hYKbQ8vywggrSTvsp244w6PFj6VCoA\n+hcvsiVTul+c7tFUVwC5SJaFgmh9Y7tW5pzALn4sQgkmoL7XM+6y9Q2ZcKQwr7kB\nB1/DLzuRgUwepMxw24tyKmm3JPAuFf9ZeRC1E5IG6qe+pVnHQT1rinz4LQKBgFHb\nTKePgcm8I0IYOLMlAIIBIxmYU8kIjCQ7yZEx7EEPr1liRfLWOYOZtkf1TzCM+OxD\nkxfodsdmKXzrdw9pMWgVyhNIS9OoFKHD09hWhc2oyxAmB8n1Iw0mJMuubhVHSo4W\n1sQ2Z4rM9c3E3ONidX3RicZX8Vfby5HiSBHw5kORAoGAaLRFJmgLq092NNtf5O1u\n+9hs/xVxIlUk4yHq7stpHipiUsy4qPHNr2KNP74Y3Ki3jVqULf0oiq72W/1dssTR\n0HmFe832OfH0gjMJyFtH9t0Wi6WyZpRYYvPA8Hl2tTBuSLsQY3p5A3E5okT8TbId\nx1Ee1vUKIFIVfK+pb/l6/lg=\n-----END PRIVATE KEY-----\n", + "client_email": "real-address@very-good-project-id.iam.gserviceaccount.com", + }); + let sa = ServiceAccount::load_json(creds.to_string().as_bytes()) + .expect("GCP service account should be created properly"); + + if let Ok(Platform::Gcp) = Platform::from_str("gs://delta-sharing-test/covid") { + let signer = GcpSigner { + gcp: sa, + expiration: Duration::from_secs(300), + }; + if let Ok(url) = signer.sign("gs://delta-sharing-test/covid").await { println!("{:?}", url); + } else { + panic!("failed to sign GCS url") } } else { panic!("failed to parse GS url");