Skip to content

Commit

Permalink
Merge pull request #2 from mbrignone/quiz_model
Browse files Browse the repository at this point in the history
Quiz model
  • Loading branch information
matinone authored Nov 22, 2023
2 parents 02b1961 + a8a9fd3 commit e265aa6
Show file tree
Hide file tree
Showing 27 changed files with 1,609 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,6 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# SQLite database file
*.db
Empty file added app/api/__init__.py
Empty file.
7 changes: 7 additions & 0 deletions app/api/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from fastapi import APIRouter

from app.api.endpoints import question, quiz

api_router = APIRouter()
api_router.include_router(quiz.router)
api_router.include_router(question.router)
Empty file added app/api/endpoints/__init__.py
Empty file.
66 changes: 66 additions & 0 deletions app/api/endpoints/question.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from typing import Annotated, Any

from fastapi import APIRouter, Depends, HTTPException, status

import app.models as models
import app.schemas as schemas
from app.models.database import AsyncSessionDep

router = APIRouter(prefix="/questions", tags=["question"])


async def get_question_from_id(
question_id: int, db: AsyncSessionDep
) -> models.Question:
question = await models.Question.get(db=db, id=question_id)
if not question:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Question not found"
)

return question


@router.get(
"/{question_id}",
response_model=schemas.QuestionReturn,
status_code=status.HTTP_200_OK,
summary="Get question by id",
response_description="The requested question (if it exists)",
)
async def get_question(
question: Annotated[models.Question, Depends(get_question_from_id)]
) -> Any:
return question


@router.put(
"/{question_id}",
response_model=schemas.QuestionReturn,
status_code=status.HTTP_200_OK,
summary="Update question by id",
response_description="The updated question (if it exists)",
)
async def update_question(
update_data: schemas.QuestionUpdate,
question: Annotated[models.Question, Depends(get_question_from_id)],
db: AsyncSessionDep,
) -> Any:
updated_question = await models.Question.update(
db=db, current=question, new=update_data
)

return updated_question


@router.delete(
"/{question_id}",
status_code=status.HTTP_204_NO_CONTENT,
summary="Delete quiz by id",
)
async def delete_question(
question: Annotated[models.Question, Depends(get_question_from_id)],
db: AsyncSessionDep,
) -> None:
await models.Question.delete(db=db, db_obj=question)
# body will be empty when using status code 204
125 changes: 125 additions & 0 deletions app/api/endpoints/quiz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from typing import Annotated, Any

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.exc import IntegrityError

import app.models as models
import app.schemas as schemas
from app.models.database import AsyncSessionDep

router = APIRouter(prefix="/quiz", tags=["quiz"])


async def get_quiz_from_id(quiz_id: int, db: AsyncSessionDep) -> models.Quiz:
quiz = await models.Quiz.get(db=db, id=quiz_id)
if not quiz:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Quiz not found"
)

return quiz


@router.post(
"",
response_model=schemas.QuizReturn,
status_code=status.HTTP_201_CREATED,
summary="Create a new quiz",
response_description="The new created quiz",
)
async def create_quiz(db: AsyncSessionDep, quiz: schemas.QuizCreate) -> Any:
new_quiz = await models.Quiz.create(db=db, quiz=quiz)
return new_quiz


@router.get(
"",
response_model=list[schemas.QuizReturn],
status_code=status.HTTP_200_OK,
summary="Get available quizzes sorted by creation date",
response_description="The list of quizzes",
)
async def get_quizzes(
db: AsyncSessionDep,
offset: Annotated[int, Query(ge=0)] = 0,
limit: Annotated[int, Query(ge=0)] = 25,
) -> Any:
quizzes = await models.Quiz.get_multiple(offset=offset, limit=limit, db=db)
return quizzes


@router.get(
"/{quiz_id}",
response_model=schemas.QuizReturn,
status_code=status.HTTP_200_OK,
summary="Get quiz by id",
response_description="The requested quiz (if it exists)",
)
async def get_quiz(quiz: Annotated[models.Quiz, Depends(get_quiz_from_id)]) -> Any:
return quiz


@router.put(
"/{quiz_id}",
response_model=schemas.QuizReturn,
status_code=status.HTTP_200_OK,
summary="Update quiz by id",
response_description="The updated quiz (if it exists)",
)
async def update_quiz(
update_data: schemas.QuizUpdate,
quiz: Annotated[models.Quiz, Depends(get_quiz_from_id)],
db: AsyncSessionDep,
) -> Any:
updated_quiz = await models.Quiz.update(db=db, current=quiz, new=update_data)

return updated_quiz


@router.delete(
"/{quiz_id}",
status_code=status.HTTP_204_NO_CONTENT,
summary="Delete quiz by id",
)
async def delete_quiz(
quiz: Annotated[models.Quiz, Depends(get_quiz_from_id)],
db: AsyncSessionDep,
) -> None:
await models.Quiz.delete(db=db, db_obj=quiz)
# body will be empty when using status code 204


@router.post(
"/{quiz_id}/questions",
response_model=schemas.QuestionReturn,
status_code=status.HTTP_201_CREATED,
summary="Create a question associated to the quiz",
response_description="The created question",
)
async def create_question_for_quiz(
quiz_id: int, question: schemas.QuestionCreate, db: AsyncSessionDep
) -> Any:
question.quiz_id = quiz_id
try:
new_question = await models.Question.create(db=db, question=question)
except IntegrityError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Quiz not found"
) from exc

return new_question


@router.get(
"/{quiz_id}/questions",
response_model=list[schemas.QuestionReturn],
status_code=status.HTTP_200_OK,
summary="Get all questions associated to the quiz",
response_description="The list of questions associated to the quiz",
)
async def get_all_questions_from_quiz(
quiz_id: int,
quiz: Annotated[models.Quiz, Depends(get_quiz_from_id)],
db: AsyncSessionDep,
) -> Any:
return await quiz.awaitable_attrs.questions
Empty file added app/core/__init__.py
Empty file.
18 changes: 18 additions & 0 deletions app/core/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
APP_NAME: str = "Quiz App"
ENVIRONMENT: str = "dev"
DB_URL: str = "sqlite+aiosqlite:///./sqlite_dev.db"
ECHO_SQL: bool = False
USE_ALEMBIC: bool = False

model_config = SettingsConfigDict(env_file=".env")


@lru_cache
def get_settings() -> Settings:
return Settings()
18 changes: 14 additions & 4 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
from fastapi import FastAPI

app = FastAPI()
from app.api.api import api_router
from app.core.settings import get_settings
from app.models.database import init_db

app = FastAPI(title="Quiz App")
app.include_router(api_router, prefix="/api")

@app.get("/")
async def root():
return {"message": "Hello World"}

if __name__ == "__main__":
import asyncio # noqa: I001
import uvicorn

if get_settings().ENVIRONMENT != "prod":
asyncio.run(init_db())

uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)
4 changes: 4 additions & 0 deletions app/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ruff: noqa: F401 (unused imports)

from .question import Question
from .quiz import Quiz
98 changes: 98 additions & 0 deletions app/models/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from typing import Annotated, Any, AsyncGenerator, Self

from fastapi import Depends
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import (
AsyncAttrs,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql import func

from app.core.settings import get_settings


class Base(AsyncAttrs, DeclarativeBase):
"""
This Base class also defines common methods for CRUD operations.
AsyncAttrs is used because of:
https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#asyncio-orm-avoid-lazyloads
"""

id: Mapped[int] = mapped_column(primary_key=True)

@classmethod
async def get(cls, db: AsyncSession, id: int) -> Self | None:
result = await db.execute(select(cls).where(cls.id == id))
return result.scalar()

@classmethod
async def update(
cls, db: AsyncSession, current: Self, new: BaseModel | dict[str, Any]
) -> Self:
if isinstance(new, dict):
update_data = new
else:
# exclude_unset=True to avoid updating to default values
update_data = new.model_dump(exclude_unset=True)

current_data = jsonable_encoder(current)
for field in current_data:
if field in update_data:
setattr(current, field, update_data[field])

if hasattr(current, "updated_at"):
current.updated_at = func.now()

db.add(current)
await db.commit()
await db.refresh(current)

return current

@classmethod
async def delete(cls, db: AsyncSession, db_obj) -> Self:
await db.delete(db_obj)
await db.commit()

return db_obj

@classmethod
async def delete_by_id(cls, db: AsyncSession, id: int) -> Self | None:
db_obj = await cls.get(db, id)
if db_obj:
await db.delete(db_obj)
await db.commit()

return db_obj


settings = get_settings()
async_engine = create_async_engine(
settings.DB_URL, pool_pre_ping=True, echo=settings.ECHO_SQL
)
AsyncSessionLocal = async_sessionmaker(bind=async_engine, autoflush=False, future=True)


async def init_db():
if not settings.USE_ALEMBIC:
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)


async def get_session() -> AsyncGenerator[AsyncSession, None]:
"""
Dependency to create/close a new session per request, making sure that the session
is always closed even if there is an error (thanks to the async context manager).
"""
async with AsyncSessionLocal() as session:
yield session


# type alias for the database session
AsyncSessionDep = Annotated[AsyncSession, Depends(get_session)]
44 changes: 44 additions & 0 deletions app/models/question.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from datetime import datetime
from typing import TYPE_CHECKING, Self

from sqlalchemy import DateTime, ForeignKey, Integer, String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func

from app.models.database import Base
from app.schemas import QuestionCreate

if TYPE_CHECKING:
from app.models.quiz import Quiz


class Question(Base):
__tablename__ = "questions"

quiz_id: Mapped[int] = mapped_column(ForeignKey("quizzes.id"))
content: Mapped[str] = mapped_column(String(256), nullable=False)
type: Mapped[str] = mapped_column(String(64), nullable=False)
points: Mapped[int] = mapped_column(Integer)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True))

# have to use "Quiz" to avoid circular dependencies
quiz: Mapped["Quiz"] = relationship("Quiz", back_populates="questions")

@classmethod
async def create(cls, db: AsyncSession, question: QuestionCreate) -> Self:
new_question = cls(
quiz_id=question.quiz_id,
content=question.content,
type=question.type,
points=question.points,
)
new_question.updated_at = func.now()
db.add(new_question)
await db.commit()
await db.refresh(new_question)

return new_question
Loading

0 comments on commit e265aa6

Please sign in to comment.