From 95e4a8f607c5b5b4738be721022fe82d87ebf4b5 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Fri, 3 May 2024 23:35:09 +0200 Subject: [PATCH] feat: add missing discovery routes --- delta-sharing/server/src/server.rs | 223 ++++++++++++++++++++++++++--- 1 file changed, 206 insertions(+), 17 deletions(-) diff --git a/delta-sharing/server/src/server.rs b/delta-sharing/server/src/server.rs index 5e5672c..d7027f7 100644 --- a/delta-sharing/server/src/server.rs +++ b/delta-sharing/server/src/server.rs @@ -4,8 +4,9 @@ use axum::extract::{Path, Query, State}; use axum::{routing::get, Json, Router}; use axum_extra::{headers, headers::authorization::Bearer, TypedHeader}; use delta_sharing_core::{ - DiscoveryHandler, GetShareRequest, GetShareResponse, ListSharesRequest, ListSharesResponse, - RecipientHandler, + DiscoveryHandler, GetShareRequest, GetShareResponse, ListSchemaTablesRequest, + ListSchemaTablesResponse, ListSchemasRequest, ListSchemasResponse, ListShareTablesRequest, + ListShareTablesResponse, ListSharesRequest, ListSharesResponse, RecipientHandler, }; use serde::Deserialize; @@ -63,10 +64,89 @@ async fn get_share( Ok(Json(response)) } +async fn list_schemas( + State(state): State>, + autorization: Option>>, + pagination: Query, + Path(share): Path, +) -> Result> { + let request = ListSchemasRequest { + max_results: pagination.0.max_results, + page_token: pagination.0.page_token, + share: share.to_ascii_lowercase(), + }; + + let recipient = state + .auth + .get_recipient(autorization.map(|a| a.0.token().to_string())) + .await?; + + let response = state.discovery.list_schemas(request, recipient).await?; + + Ok(Json(response)) +} + +async fn list_share_tables( + State(state): State>, + autorization: Option>>, + pagination: Query, + Path(share): Path, +) -> Result> { + let request = ListShareTablesRequest { + max_results: pagination.0.max_results, + page_token: pagination.0.page_token, + share: share.to_ascii_lowercase(), + }; + + let recipient = state + .auth + .get_recipient(autorization.map(|a| a.0.token().to_string())) + .await?; + + let response = state + .discovery + .list_share_tables(request, recipient) + .await?; + + Ok(Json(response)) +} + +async fn list_schema_tables( + State(state): State>, + autorization: Option>>, + pagination: Query, + Path((share, schema)): Path<(String, String)>, +) -> Result> { + let request = ListSchemaTablesRequest { + max_results: pagination.0.max_results, + page_token: pagination.0.page_token, + share: share.to_ascii_lowercase(), + schema: schema.to_ascii_lowercase(), + }; + + let recipient = state + .auth + .get_recipient(autorization.map(|a| a.0.token().to_string())) + .await?; + + let response = state + .discovery + .list_schema_tables(request, recipient) + .await?; + + Ok(Json(response)) +} + pub fn get_router(state: DeltaSharingState) -> Router { Router::new() .route("/shares", get(list_shares)) .route("/shares/:share", get(get_share)) + .route("/shares/:share/schemas", get(list_schemas)) + .route("/shares/:share/all-tables", get(list_share_tables)) + .route( + "/shares/:share/schemas/:schema/tables", + get(list_schema_tables), + ) .with_state(state) } @@ -82,13 +162,16 @@ mod tests { use super::*; use crate::tests::test_handler; - #[tokio::test] - async fn test_list_shares() { - let state = DeltaSharingState { + fn get_state() -> DeltaSharingState<()> { + DeltaSharingState { discovery: Arc::new(test_handler()), auth: Arc::new(VoidRecipientHandler {}), - }; - let app = get_router(state); + } + } + + #[tokio::test] + async fn test_list_shares() { + let app = get_router(get_state()); let request = Request::builder() .uri("/shares") @@ -109,11 +192,7 @@ mod tests { #[tokio::test] async fn test_get_share() { - let state = DeltaSharingState { - discovery: Arc::new(test_handler()), - auth: Arc::new(VoidRecipientHandler {}), - }; - let app = get_router(state); + let app = get_router(get_state()); let request = Request::builder() .uri("/shares/share1") @@ -134,11 +213,7 @@ mod tests { #[tokio::test] async fn test_get_share_not_found() { - let state = DeltaSharingState { - discovery: Arc::new(test_handler()), - auth: Arc::new(VoidRecipientHandler {}), - }; - let app = get_router(state); + let app = get_router(get_state()); let request: Request = Request::builder() .uri("/shares/nonexistent") @@ -152,4 +227,118 @@ mod tests { let response = app.oneshot(request).await.unwrap(); assert_eq!(response.status(), StatusCode::NOT_FOUND); } + + #[tokio::test] + async fn test_list_schemas() { + let app = get_router(get_state()); + + let request = Request::builder() + .uri("/shares/share1/schemas") + .header( + header::AUTHORIZATION, + HeaderValue::from_str("Bearer token").unwrap(), + ) + .body(Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + assert!(response.status().is_success()); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + let result = serde_json::from_slice::(&body).unwrap(); + assert_eq!(result.items.len(), 1); + } + + #[tokio::test] + async fn test_list_schemas_not_found() { + let app = get_router(get_state()); + + let request: Request = Request::builder() + .uri("/shares/nonexistent/schemas") + .header( + header::AUTHORIZATION, + HeaderValue::from_str("Bearer token").unwrap(), + ) + .body(Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn test_list_share_tables() { + let app = get_router(get_state()); + + let request = Request::builder() + .uri("/shares/share1/all-tables") + .header( + header::AUTHORIZATION, + HeaderValue::from_str("Bearer token").unwrap(), + ) + .body(Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + assert!(response.status().is_success()); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + let result = serde_json::from_slice::(&body).unwrap(); + assert_eq!(result.items.len(), 1); + } + + #[tokio::test] + async fn test_list_share_tables_not_found() { + let app = get_router(get_state()); + + let request: Request = Request::builder() + .uri("/shares/nonexistent/all-tables") + .header( + header::AUTHORIZATION, + HeaderValue::from_str("Bearer token").unwrap(), + ) + .body(Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn test_list_schema_tables() { + let app = get_router(get_state()); + + let request = Request::builder() + .uri("/shares/share1/schemas/schema1/tables") + .header( + header::AUTHORIZATION, + HeaderValue::from_str("Bearer token").unwrap(), + ) + .body(Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + assert!(response.status().is_success()); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + let result = serde_json::from_slice::(&body).unwrap(); + assert_eq!(result.items.len(), 1); + } + + #[tokio::test] + async fn test_list_schema_tables_not_found() { + let app = get_router(get_state()); + + let request: Request = Request::builder() + .uri("/shares/share1/schemas/nonexistent/tables") + .header( + header::AUTHORIZATION, + HeaderValue::from_str("Bearer token").unwrap(), + ) + .body(Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } }