Offer two database options - SQLite and Postgres #309

Open · wants to merge 31 commits into base: main

Commits (31)
2ce0014
generated sqlite code
mkorpela Apr 11, 2024
34c40ba
go install -tags sqlite3 github.com/golang-migrate/migrate/v4/cmd/mig…
mkorpela Apr 11, 2024
f3e4cd3
make everything awaitable
mkorpela Apr 11, 2024
2c6fcde
sqlite
mkorpela Apr 12, 2024
c584c75
listing assistants
mkorpela Apr 12, 2024
6ad76e9
more working
mkorpela Apr 15, 2024
775e1cc
Add Chroma vectorstore.
bakar-io Apr 17, 2024
1df1076
Update checkpointer.
bakar-io Apr 17, 2024
d251e84
Add aiosqlite dependency.
bakar-io Apr 21, 2024
0a29bfb
Separete storage methods for pg and sqlite.
bakar-io Apr 21, 2024
827b96f
Separate postgres and sqlite migrations.
bakar-io Apr 21, 2024
ce0f462
Fix circular import.
bakar-io Apr 22, 2024
326e46c
Prevent concurrent requests from raising UniqueViolationError for pg'…
bakar-io Apr 22, 2024
e078dfa
Merge branch 'langchain-main' into pg-sqlite
bakar-io Apr 22, 2024
755dd15
Format.
bakar-io Apr 22, 2024
5fa0483
poetry lock --no-update.
bakar-io Apr 22, 2024
74b9c2c
Fix SqliteCheckpointer's init.
bakar-io Apr 22, 2024
36dbed8
Include STORAGE_TYPE in the make test command.
bakar-io Apr 22, 2024
5a49941
Use storage settings during tests.
bakar-io Apr 22, 2024
735ce54
Update readme.
bakar-io Apr 22, 2024
5b98719
Temporarily use PGVector for both sqlite and postgres.
bakar-io Apr 30, 2024
c5cc23e
Cleanup.
bakar-io Apr 30, 2024
84543cd
Merge branch 'langchain-main' into pg-sqlite
bakar-io Apr 30, 2024
3fd7d6a
Update public assistants storage method.
bakar-io Apr 30, 2024
edf3d0b
Add a TODO comment for the future.
bakar-io Apr 30, 2024
d5c3327
Minor fixes and cleanup.
bakar-io Apr 30, 2024
b0ff0ca
Create storage setup and teardown methods.
bakar-io May 3, 2024
c1d78c7
Merge branch 'main' into pg-sqlite
bakar-io May 3, 2024
37b2413
poetry lock --no-update
bakar-io May 3, 2024
bf3bd82
Clean up PostgresStorage.
bakar-io May 3, 2024
5e2a9dd
WIP/POC sharing a single sqlite connection across the application.
bakar-io May 3, 2024
4 changes: 4 additions & 0 deletions .gitignore
@@ -59,3 +59,7 @@ npm-debug.log*
yarn-debug.log*
yarn-error.log*
pnpm-debug.log*


# Local db files
opengpts.db
37 changes: 27 additions & 10 deletions README.md
@@ -36,7 +36,7 @@ Because this is open source, if you do not like those architectures or want to m
## Quickstart with Docker

This project supports a Docker-based setup, streamlining installation and execution. It automatically builds images for
the frontend and backend and sets up Postgres using docker-compose.
the frontend and backend and sets up either SQLite or Postgres using docker-compose.


1. **Prerequisites:**
@@ -71,14 +71,18 @@ the frontend and backend and sets up Postgres using docker-compose.


4. **Run with Docker Compose:**
In the root directory of the project, execute:
In the root directory of the project, execute one of the following commands to start the services:

```
```shell
# For SQLite based setup
docker compose up

# For Postgres based setup
docker compose -f docker-compose.pg.yml up
```

This command builds the Docker images for the frontend and backend from their respective Dockerfiles and starts all
necessary services, including Postgres.
necessary services, including SQLite/Postgres.

5. **Access the Application:**
With the services running, access the frontend at [http://localhost:5173](http://localhost:5173), substituting `5173` with the
@@ -87,8 +91,12 @@ the frontend and backend and sets up Postgres using docker-compose.

6. **Rebuilding After Changes:**
If you make changes to either the frontend or backend, rebuild the Docker images to reflect these changes. Run:
```
```shell
# For SQLite based setup
docker compose up --build

# For Postgres based setup
docker compose -f docker-compose.pg.yml up --build
```
This command rebuilds the images with your latest changes and restarts the services.

@@ -115,6 +123,16 @@ pip install poetry
pip install langchain-community
```

### Persistence Layer

The backend supports SQLite and Postgres for saving agent configurations and chat message history. Set the `STORAGE_TYPE` environment variable to `sqlite` or `postgres`:

```shell
export STORAGE_TYPE=postgres
```

SQLite requires no configuration (apart from [running migrations](#migrations)); the database file will be created in the `backend` directory. To use Postgres instead, configure it by following the instructions below:
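The `STORAGE_TYPE` switch can be modeled as a small settings module. The sketch below is hedged: `StorageType` and a module-level `settings` object mirror the `app/storage/settings.py` names visible elsewhere in this PR, while the default value and the validation behavior are assumptions.

```python
import os
from enum import Enum


class StorageType(str, Enum):
    SQLITE = "sqlite"
    POSTGRES = "postgres"


class StorageSettings:
    """Resolve STORAGE_TYPE once at startup; unknown values fail fast."""

    def __init__(self) -> None:
        # Assumption: sqlite is the default when STORAGE_TYPE is unset.
        raw = os.environ.get("STORAGE_TYPE", "sqlite").lower()
        try:
            self.storage_type = StorageType(raw)
        except ValueError:
            raise ValueError(
                f"Unsupported STORAGE_TYPE {raw!r}: expected 'sqlite' or 'postgres'"
            ) from None


settings = StorageSettings()
```

With `export STORAGE_TYPE=postgres`, `settings.storage_type` becomes `StorageType.POSTGRES`; any other value raises at startup rather than failing later at query time.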

**Install Postgres and the Postgres Vector Extension**
```
brew install postgresql pgvector
@@ -123,8 +141,7 @@ brew services start postgresql

**Configure persistence layer**

The backend uses Postgres for saving agent configurations and chat message history.
In order to use this, you need to set the following environment variables:
Set the following environment variables:

```shell
export POSTGRES_HOST=localhost
@@ -148,9 +165,9 @@ psql -d opengpts
CREATE ROLE postgres WITH LOGIN SUPERUSER CREATEDB CREATEROLE;
```

**Install Golang Migrate**
#### Migrations

Database migrations are managed with [golang-migrate](https://github.com/golang-migrate/migrate).
Database migrations for both SQLite and Postgres are managed with [golang-migrate](https://github.com/golang-migrate/migrate).

On macOS, you can install it with `brew install golang-migrate`. Instructions for other operating systems, or for installing via the Go toolchain, can be found [here](https://github.com/golang-migrate/migrate/blob/master/cmd/migrate/README.md#installation).
@@ -160,7 +177,7 @@ Once `golang-migrate` is installed, you can run all the migrations with:
make migrate
```

This will enable the backend to use Postgres as a vector database and create the initial tables.
This will create the initial tables.


**Install backend dependencies**
10 changes: 8 additions & 2 deletions backend/Makefile
@@ -17,11 +17,17 @@ start:
poetry run uvicorn app.server:app --reload --port 8100

migrate:
migrate -database postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@$(POSTGRES_HOST):$(POSTGRES_PORT)/$(POSTGRES_DB)?sslmode=disable -path ./migrations up
ifeq ($(STORAGE_TYPE),postgres)
@echo "Running Postgres migrations..."
migrate -database postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@$(POSTGRES_HOST):$(POSTGRES_PORT)/$(POSTGRES_DB)?sslmode=disable -path ./migrations/postgres up
else
@echo "Running SQLite migrations..."
migrate -database sqlite3://$(PWD)/opengpts.db -path ./migrations/sqlite up
endif
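The conditional above just selects a database URL and a migrations directory per backend. The same choice can be sketched in Python — the URL shapes and paths come from the Makefile; the helper name is ours:

```python
import os


def migrate_command(storage_type: str) -> list[str]:
    """Build the golang-migrate invocation the Makefile runs for each backend."""
    if storage_type == "postgres":
        url = "postgres://{}:{}@{}:{}/{}?sslmode=disable".format(
            os.environ["POSTGRES_USER"],
            os.environ["POSTGRES_PASSWORD"],
            os.environ["POSTGRES_HOST"],
            os.environ["POSTGRES_PORT"],
            os.environ["POSTGRES_DB"],
        )
        path = "./migrations/postgres"
    else:
        # SQLite: a plain file in the backend directory, no server to configure.
        url = f"sqlite3://{os.getcwd()}/opengpts.db"
        path = "./migrations/sqlite"
    return ["migrate", "-database", url, "-path", path, "up"]
```

Keeping the two migration trees in separate directories (`migrations/postgres`, `migrations/sqlite`) lets each dialect evolve independently while sharing one `make migrate` entry point.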

test:
# We need to update handling of env variables for tests
YDC_API_KEY=placeholder OPENAI_API_KEY=placeholder poetry run pytest $(TEST_FILE)
STORAGE_TYPE=postgres YDC_API_KEY=placeholder OPENAI_API_KEY=placeholder poetry run pytest $(TEST_FILE)


test_watch:
4 changes: 2 additions & 2 deletions backend/app/agent.py
@@ -14,7 +14,7 @@
from app.agent_types.tools_agent import get_tools_agent_executor
from app.agent_types.xml_agent import get_xml_agent_executor
from app.chatbot import get_chatbot_executor
from app.checkpoint import PostgresCheckpoint
from app.checkpoint import Checkpointer
from app.llms import (
get_anthropic_llm,
get_google_llm,
@@ -73,7 +73,7 @@ class AgentType(str, Enum):

DEFAULT_SYSTEM_MESSAGE = "You are a helpful assistant."

CHECKPOINTER = PostgresCheckpoint(serde=pickle, at=CheckpointAt.END_OF_STEP)
CHECKPOINTER = Checkpointer(serde=pickle, at=CheckpointAt.END_OF_STEP)


def get_agent_executor(
2 changes: 1 addition & 1 deletion backend/app/api/assistants.py
@@ -4,9 +4,9 @@
from fastapi import APIRouter, HTTPException, Path
from pydantic import BaseModel, Field

import app.storage as storage
from app.auth.handlers import AuthedUser
from app.schema import Assistant
from app.storage.storage import storage

router = APIRouter()

6 changes: 3 additions & 3 deletions backend/app/api/runs.py
@@ -13,7 +13,7 @@

from app.agent import agent
from app.auth.handlers import AuthedUser
from app.storage import get_assistant, get_thread
from app.storage.storage import storage
from app.stream import astream_state, to_sse

router = APIRouter()
@@ -30,11 +30,11 @@ class CreateRunPayload(BaseModel):


async def _run_input_and_config(payload: CreateRunPayload, user_id: str):
thread = await get_thread(user_id, payload.thread_id)
thread = await storage.get_thread(user_id, payload.thread_id)
if not thread:
raise HTTPException(status_code=404, detail="Thread not found")

assistant = await get_assistant(user_id, str(thread["assistant_id"]))
assistant = await storage.get_assistant(user_id, str(thread["assistant_id"]))
if not assistant:
raise HTTPException(status_code=404, detail="Assistant not found")

2 changes: 1 addition & 1 deletion backend/app/api/threads.py
@@ -5,9 +5,9 @@
from langchain.schema.messages import AnyMessage
from pydantic import BaseModel, Field

import app.storage as storage
from app.auth.handlers import AuthedUser
from app.schema import Thread
from app.storage.storage import storage

router = APIRouter()

2 changes: 1 addition & 1 deletion backend/app/auth/handlers.py
@@ -7,9 +7,9 @@
from fastapi import Depends, HTTPException, Request
from fastapi.security.http import HTTPBearer

import app.storage as storage
from app.auth.settings import AuthType, settings
from app.schema import User
from app.storage.storage import storage


class AuthHandler(ABC):
54 changes: 49 additions & 5 deletions backend/app/checkpoint.py
@@ -2,9 +2,11 @@
from datetime import datetime
from typing import AsyncIterator, Optional

import aiosqlite
from langchain_core.messages import BaseMessage
from langchain_core.runnables import ConfigurableFieldSpec, RunnableConfig
from langgraph.checkpoint import BaseCheckpointSaver
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver
from langgraph.checkpoint.base import (
Checkpoint,
CheckpointAt,
@@ -13,7 +15,9 @@
SerializerProtocol,
)

from app.lifespan import get_pg_pool
from app.storage.settings import StorageType
from app.storage.settings import settings as storage_settings
from app.storage.storage import storage


def loads(value: bytes) -> Checkpoint:
@@ -24,7 +28,7 @@ def loads(value: bytes) -> Checkpoint:
return loaded


class PostgresCheckpoint(BaseCheckpointSaver):
class PostgresCheckpointer(BaseCheckpointSaver):
def __init__(
self,
*,
@@ -54,7 +58,7 @@ def put(self, config: RunnableConfig, checkpoint: Checkpoint) -> RunnableConfig:
raise NotImplementedError

async def alist(self, config: RunnableConfig) -> AsyncIterator[CheckpointTuple]:
async with get_pg_pool().acquire() as db, db.transaction():
async with storage.get_pool().acquire() as db, db.transaction():
thread_id = config["configurable"]["thread_id"]
async for value in db.cursor(
"SELECT checkpoint, thread_ts, parent_ts FROM checkpoints WHERE thread_id = $1 ORDER BY thread_ts DESC",
@@ -81,7 +85,7 @@ async def alist(self, config: RunnableConfig) -> AsyncIterator[CheckpointTuple]:
async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
thread_id = config["configurable"]["thread_id"]
thread_ts = config["configurable"].get("thread_ts")
async with get_pg_pool().acquire() as conn:
async with storage.get_pool().acquire() as conn:
if thread_ts:
if value := await conn.fetchrow(
"SELECT checkpoint, parent_ts FROM checkpoints WHERE thread_id = $1 AND thread_ts = $2",
@@ -125,7 +129,7 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:

async def aput(self, config: RunnableConfig, checkpoint: Checkpoint) -> None:
thread_id = config["configurable"]["thread_id"]
async with get_pg_pool().acquire() as conn:
async with storage.get_pool().acquire() as conn:
await conn.execute(
"""
INSERT INTO checkpoints (thread_id, thread_ts, parent_ts, checkpoint)
@@ -145,3 +149,43 @@ async def aput(self, config: RunnableConfig, checkpoint: Checkpoint) -> None:
"thread_ts": checkpoint["ts"],
}
}


class SqliteCheckpointer(AsyncSqliteSaver):
conn: aiosqlite.Connection = None

def __init__(
self,
*,
serde: Optional[SerializerProtocol] = None,
at: Optional[CheckpointAt] = None,
) -> None:
super().__init__(conn=None, serde=serde, at=at)

@property
def config_specs(self) -> list[ConfigurableFieldSpec]:
return [
ConfigurableFieldSpec(
id="thread_id",
annotation=Optional[str],
name="Thread ID",
description=None,
default=None,
is_shared=True,
),
CheckpointThreadTs,
]

async def setup(self) -> None:
if self.is_setup:
return
self.conn = storage.get_conn()
self.is_setup = True


if storage_settings.storage_type == StorageType.POSTGRES:
Checkpointer = PostgresCheckpointer
elif storage_settings.storage_type == StorageType.SQLITE:
Checkpointer = SqliteCheckpointer
else:
raise NotImplementedError()
35 changes: 3 additions & 32 deletions backend/app/lifespan.py
@@ -1,41 +1,12 @@
import os
from contextlib import asynccontextmanager

import asyncpg
import orjson
from fastapi import FastAPI

_pg_pool = None


def get_pg_pool() -> asyncpg.pool.Pool:
return _pg_pool


async def _init_connection(conn) -> None:
await conn.set_type_codec(
"json",
encoder=lambda v: orjson.dumps(v).decode(),
decoder=orjson.loads,
schema="pg_catalog",
)
await conn.set_type_codec(
"uuid", encoder=lambda v: str(v), decoder=lambda v: v, schema="pg_catalog"
)
from app.storage.storage import storage


@asynccontextmanager
async def lifespan(app: FastAPI):
global _pg_pool

_pg_pool = await asyncpg.create_pool(
database=os.environ["POSTGRES_DB"],
user=os.environ["POSTGRES_USER"],
password=os.environ["POSTGRES_PASSWORD"],
host=os.environ["POSTGRES_HOST"],
port=os.environ["POSTGRES_PORT"],
init=_init_connection,
)
await storage.setup()
yield
await _pg_pool.close()
_pg_pool = None
await storage.teardown()
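The rewritten lifespan hands connection management to a single storage object instead of a module-global asyncpg pool. A minimal sketch of that contract, assuming only the `setup`/`teardown` names visible in the diff (the `ready` flag is illustrative bookkeeping, not part of the PR):

```python
import asyncio
from contextlib import asynccontextmanager


class Storage:
    """Owns the database handle (pg pool or sqlite connection) for the app's lifetime."""

    def __init__(self) -> None:
        self.ready = False

    async def setup(self) -> None:
        # Real implementations create an asyncpg pool or open aiosqlite here.
        self.ready = True

    async def teardown(self) -> None:
        # ...and close the pool/connection here.
        self.ready = False


storage = Storage()


@asynccontextmanager
async def lifespan(app):
    # FastAPI enters this context on startup and exits it on shutdown.
    await storage.setup()
    yield
    await storage.teardown()


async def main() -> bool:
    async with lifespan(app=None):
        was_ready = storage.ready  # True while the app is serving
    return was_ready
```

This keeps `lifespan` identical for both backends; the storage-type decision lives entirely behind the `storage` object.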
2 changes: 1 addition & 1 deletion backend/app/server.py
@@ -7,10 +7,10 @@
from fastapi.exceptions import HTTPException
from fastapi.staticfiles import StaticFiles

import app.storage as storage
from app.api import router as api_router
from app.auth.handlers import AuthedUser
from app.lifespan import lifespan
from app.storage.storage import storage
from app.upload import convert_ingestion_input_to_blob, ingest_runnable

logger = logging.getLogger(__name__)