Skip to content

Commit

Permalink
feat: add missing discovery routes
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed May 3, 2024
1 parent fa99766 commit 95e4a8f
Showing 1 changed file with 206 additions and 17 deletions.
223 changes: 206 additions & 17 deletions delta-sharing/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use axum::extract::{Path, Query, State};
use axum::{routing::get, Json, Router};
use axum_extra::{headers, headers::authorization::Bearer, TypedHeader};
use delta_sharing_core::{
DiscoveryHandler, GetShareRequest, GetShareResponse, ListSharesRequest, ListSharesResponse,
RecipientHandler,
DiscoveryHandler, GetShareRequest, GetShareResponse, ListSchemaTablesRequest,
ListSchemaTablesResponse, ListSchemasRequest, ListSchemasResponse, ListShareTablesRequest,
ListShareTablesResponse, ListSharesRequest, ListSharesResponse, RecipientHandler,
};
use serde::Deserialize;

Expand Down Expand Up @@ -63,10 +64,89 @@ async fn get_share<T: Send>(
Ok(Json(response))
}

async fn list_schemas<T: Send>(
State(state): State<DeltaSharingState<T>>,
autorization: Option<TypedHeader<headers::Authorization<Bearer>>>,
pagination: Query<Pagination>,
Path(share): Path<String>,
) -> Result<Json<ListSchemasResponse>> {
let request = ListSchemasRequest {
max_results: pagination.0.max_results,
page_token: pagination.0.page_token,
share: share.to_ascii_lowercase(),
};

let recipient = state
.auth
.get_recipient(autorization.map(|a| a.0.token().to_string()))
.await?;

let response = state.discovery.list_schemas(request, recipient).await?;

Ok(Json(response))
}

async fn list_share_tables<T: Send>(
State(state): State<DeltaSharingState<T>>,
autorization: Option<TypedHeader<headers::Authorization<Bearer>>>,
pagination: Query<Pagination>,
Path(share): Path<String>,
) -> Result<Json<ListShareTablesResponse>> {
let request = ListShareTablesRequest {
max_results: pagination.0.max_results,
page_token: pagination.0.page_token,
share: share.to_ascii_lowercase(),
};

let recipient = state
.auth
.get_recipient(autorization.map(|a| a.0.token().to_string()))
.await?;

let response = state
.discovery
.list_share_tables(request, recipient)
.await?;

Ok(Json(response))
}

async fn list_schema_tables<T: Send>(
State(state): State<DeltaSharingState<T>>,
autorization: Option<TypedHeader<headers::Authorization<Bearer>>>,
pagination: Query<Pagination>,
Path((share, schema)): Path<(String, String)>,
) -> Result<Json<ListSchemaTablesResponse>> {
let request = ListSchemaTablesRequest {
max_results: pagination.0.max_results,
page_token: pagination.0.page_token,
share: share.to_ascii_lowercase(),
schema: schema.to_ascii_lowercase(),
};

let recipient = state
.auth
.get_recipient(autorization.map(|a| a.0.token().to_string()))
.await?;

let response = state
.discovery
.list_schema_tables(request, recipient)
.await?;

Ok(Json(response))
}

pub fn get_router<T: Send + Clone + 'static>(state: DeltaSharingState<T>) -> Router {
Router::new()
.route("/shares", get(list_shares))
.route("/shares/:share", get(get_share))
.route("/shares/:share/schemas", get(list_schemas))
.route("/shares/:share/all-tables", get(list_share_tables))
.route(
"/shares/:share/schemas/:schema/tables",
get(list_schema_tables),
)
.with_state(state)
}

Expand All @@ -82,13 +162,16 @@ mod tests {
use super::*;
use crate::tests::test_handler;

#[tokio::test]
async fn test_list_shares() {
let state = DeltaSharingState {
fn get_state() -> DeltaSharingState<()> {
DeltaSharingState {
discovery: Arc::new(test_handler()),
auth: Arc::new(VoidRecipientHandler {}),
};
let app = get_router(state);
}
}

#[tokio::test]
async fn test_list_shares() {
let app = get_router(get_state());

let request = Request::builder()
.uri("/shares")
Expand All @@ -109,11 +192,7 @@ mod tests {

#[tokio::test]
async fn test_get_share() {
let state = DeltaSharingState {
discovery: Arc::new(test_handler()),
auth: Arc::new(VoidRecipientHandler {}),
};
let app = get_router(state);
let app = get_router(get_state());

let request = Request::builder()
.uri("/shares/share1")
Expand All @@ -134,11 +213,7 @@ mod tests {

#[tokio::test]
async fn test_get_share_not_found() {
let state = DeltaSharingState {
discovery: Arc::new(test_handler()),
auth: Arc::new(VoidRecipientHandler {}),
};
let app = get_router(state);
let app = get_router(get_state());

let request: Request<Body> = Request::builder()
.uri("/shares/nonexistent")
Expand All @@ -152,4 +227,118 @@ mod tests {
let response = app.oneshot(request).await.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}

#[tokio::test]
async fn test_list_schemas() {
let app = get_router(get_state());

let request = Request::builder()
.uri("/shares/share1/schemas")
.header(
header::AUTHORIZATION,
HeaderValue::from_str("Bearer token").unwrap(),
)
.body(Body::empty())
.unwrap();

let response = app.oneshot(request).await.unwrap();
assert!(response.status().is_success());

let body = response.into_body().collect().await.unwrap().to_bytes();
let result = serde_json::from_slice::<ListSchemasResponse>(&body).unwrap();
assert_eq!(result.items.len(), 1);
}

#[tokio::test]
async fn test_list_schemas_not_found() {
let app = get_router(get_state());

let request: Request<Body> = Request::builder()
.uri("/shares/nonexistent/schemas")
.header(
header::AUTHORIZATION,
HeaderValue::from_str("Bearer token").unwrap(),
)
.body(Body::empty())
.unwrap();

let response = app.oneshot(request).await.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}

#[tokio::test]
async fn test_list_share_tables() {
let app = get_router(get_state());

let request = Request::builder()
.uri("/shares/share1/all-tables")
.header(
header::AUTHORIZATION,
HeaderValue::from_str("Bearer token").unwrap(),
)
.body(Body::empty())
.unwrap();

let response = app.oneshot(request).await.unwrap();
assert!(response.status().is_success());

let body = response.into_body().collect().await.unwrap().to_bytes();
let result = serde_json::from_slice::<ListShareTablesResponse>(&body).unwrap();
assert_eq!(result.items.len(), 1);
}

#[tokio::test]
async fn test_list_share_tables_not_found() {
let app = get_router(get_state());

let request: Request<Body> = Request::builder()
.uri("/shares/nonexistent/all-tables")
.header(
header::AUTHORIZATION,
HeaderValue::from_str("Bearer token").unwrap(),
)
.body(Body::empty())
.unwrap();

let response = app.oneshot(request).await.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}

#[tokio::test]
async fn test_list_schema_tables() {
let app = get_router(get_state());

let request = Request::builder()
.uri("/shares/share1/schemas/schema1/tables")
.header(
header::AUTHORIZATION,
HeaderValue::from_str("Bearer token").unwrap(),
)
.body(Body::empty())
.unwrap();

let response = app.oneshot(request).await.unwrap();
assert!(response.status().is_success());

let body = response.into_body().collect().await.unwrap().to_bytes();
let result = serde_json::from_slice::<ListSchemaTablesResponse>(&body).unwrap();
assert_eq!(result.items.len(), 1);
}

#[tokio::test]
async fn test_list_schema_tables_not_found() {
let app = get_router(get_state());

let request: Request<Body> = Request::builder()
.uri("/shares/share1/schemas/nonexistent/tables")
.header(
header::AUTHORIZATION,
HeaderValue::from_str("Bearer token").unwrap(),
)
.body(Body::empty())
.unwrap();

let response = app.oneshot(request).await.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
}

0 comments on commit 95e4a8f

Please sign in to comment.