feat: postgres support not ready obvi
postgres changes to almost sync successfully
dav1do committed Apr 3, 2024
1 parent b1d119c commit b0961bc
Showing 46 changed files with 1,380 additions and 1,114 deletions.
983 changes: 493 additions & 490 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -143,7 +143,7 @@ serde_qs = "0.10.1"
serde_with = "2.1"
sha2 = { version = "0.10", default-features = false }
smallvec = "1.10"
-sqlx = { version = "0.7", features = ["sqlite", "runtime-tokio"] }
+sqlx = { version = "0.7", features = ["sqlite", "postgres", "runtime-tokio"] }
ssh-key = { version = "0.5.1", default-features = false }
ssi = { version = "0.7", features = ["ed25519"] }
swagger = { version = "6.1", features = [
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
-FROM public.ecr.aws/r5b3e0r5/3box/rust-builder:latest as builder
+FROM rust-builder:latest as builder

RUN mkdir -p /home/builder/rust-ceramic
WORKDIR /home/builder/rust-ceramic
4 changes: 3 additions & 1 deletion Makefile
@@ -4,7 +4,7 @@
# Therefore may be useful in ensuring a change
# is ready to pass CI checks.

-RUSTFLAGS = -D warnings --cfg tokio_unstable
+RUSTFLAGS = --cfg tokio_unstable
CARGO = RUSTFLAGS='${RUSTFLAGS}' cargo

RELEASE_LEVEL ?= minor
@@ -57,10 +57,12 @@ check-kubo-rpc-server:
.PHONY: check-queries
check-queries:
./ci-scripts/check_queries.sh "sqlite"
+./ci-scripts/check_queries.sh "postgres"

.PHONY: check-queries-ci
check-queries-ci:
CI_RUN=1 ./ci-scripts/check_queries.sh "sqlite"
+CI_RUN=1 ./ci-scripts/check_queries.sh "postgres"

.PHONY: release
release:
4 changes: 2 additions & 2 deletions api/src/server.rs
@@ -115,7 +115,7 @@ impl TryFrom<models::Interest> for ValidatedInterest {
}

#[async_trait]
-pub trait AccessInterestStore: Clone + Send + Sync {
+pub trait AccessInterestStore: Send + Sync {
type Key: Key;
type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>;

@@ -131,7 +131,7 @@ pub trait AccessInterestStore: Clone + Send + Sync {
}

#[async_trait]
-pub trait AccessModelStore: Clone + Send + Sync {
+pub trait AccessModelStore: Send + Sync {
type Key: Key;
type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>;

11 changes: 9 additions & 2 deletions ci-scripts/check_queries.sh
@@ -38,8 +38,15 @@ cargo install sqlx-cli
# case statement to check if the database is sqlite or postgres
case $DATABASE in
postgres*)
echo "Using postgres is not yet supported"
exit 1
echo "Using postgres"
docker run --name ceramic-pg -e POSTGRES_DB=ceramic -e POSTGRES_PASSWORD=c3ram1c -p 5432:5432 -d postgres:16
absolute_db_path="postgresql://postgres:c3ram1c@localhost:5432/ceramic"
absolute_migrations="$(pwd)/migrations/postgres"
prepare_database $absolute_db_path $absolute_migrations
if [ "$CI_RUN" ]; then
docker stop ceramic-pg
docker rm ceramic-pg
fi
;;
*)
echo "Using sqlite"
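For context, the Postgres container this script starts can also be exercised directly from Rust. A minimal connection smoke test — a hedged sketch, not part of the commit — assuming the `PostgresPool` API this commit introduces in `ceramic-store` (see `one/src/lib.rs` below) and the same throwaway credentials the script uses:

```rust
// Hedged sketch: connect to the dockerized Postgres and apply migrations,
// using the ceramic_store API as it appears elsewhere in this diff.
use ceramic_store::{Migrations, PostgresPool};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Same credentials the script passes to `docker run`.
    let url = "postgresql://postgres:c3ram1c@localhost:5432/ceramic";
    let _pool = PostgresPool::connect(url, Migrations::Apply).await?;
    println!("connected and migrated");
    Ok(())
}
```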
2 changes: 2 additions & 0 deletions migrations/postgres/20240321211121_block.down.sql
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS "block";
9 changes: 9 additions & 0 deletions migrations/postgres/20240321211121_block.up.sql
@@ -0,0 +1,9 @@
-- Add up migration script here

CREATE TABLE IF NOT EXISTS "block" (
multihash BYTEA NOT NULL,
bytes BYTEA NOT NULL,
PRIMARY KEY(multihash)
);

SELECT multihash, bytes FROM "block" where false;
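The trailing `SELECT ... where false` returns no rows; it appears to be a guard so that applying the migration fails immediately if the expected columns are missing. Those are the columns the store reads back — a hedged sketch of such a lookup (hypothetical helper; the real store code may differ):

```rust
// Hedged sketch: fetch a block's bytes by multihash with plain sqlx.
// Assumes a sqlx::PgPool connected to a database with this migration applied.
use sqlx::{PgPool, Row};

async fn get_block(pool: &PgPool, multihash: &[u8]) -> Result<Option<Vec<u8>>, sqlx::Error> {
    let row = sqlx::query(r#"SELECT bytes FROM "block" WHERE multihash = $1"#)
        .bind(multihash)
        .fetch_optional(pool)
        .await?;
    Ok(row.map(|r| r.get::<Vec<u8>, _>("bytes")))
}
```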
3 changes: 3 additions & 0 deletions migrations/postgres/20240321211303_interest.down.sql
@@ -0,0 +1,3 @@
-- Add down migration script here

DROP TABLE IF EXISTS interest;
16 changes: 16 additions & 0 deletions migrations/postgres/20240321211303_interest.up.sql
@@ -0,0 +1,16 @@
-- Add up migration script here

CREATE TABLE IF NOT EXISTS interest (
order_key BYTEA NOT NULL, -- network_id sort_value controller StreamID event_cid
ahash_0 BIGINT NOT NULL CHECK (ahash_0 >= 0 AND ahash_0 < '4294967296'::BIGINT), -- the ahash is decomposed as [u32; 8]
ahash_1 BIGINT NOT NULL CHECK (ahash_1 >= 0 AND ahash_1 < '4294967296'::BIGINT),
ahash_2 BIGINT NOT NULL CHECK (ahash_2 >= 0 AND ahash_2 < '4294967296'::BIGINT),
ahash_3 BIGINT NOT NULL CHECK (ahash_3 >= 0 AND ahash_3 < '4294967296'::BIGINT),
ahash_4 BIGINT NOT NULL CHECK (ahash_4 >= 0 AND ahash_4 < '4294967296'::BIGINT),
ahash_5 BIGINT NOT NULL CHECK (ahash_5 >= 0 AND ahash_5 < '4294967296'::BIGINT),
ahash_6 BIGINT NOT NULL CHECK (ahash_6 >= 0 AND ahash_6 < '4294967296'::BIGINT),
ahash_7 BIGINT NOT NULL CHECK (ahash_7 >= 0 AND ahash_7 < '4294967296'::BIGINT),
PRIMARY KEY(order_key)
);

SELECT order_key, ahash_0, ahash_1, ahash_2, ahash_3, ahash_4, ahash_5, ahash_6, ahash_7 FROM interest WHERE false;
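As the first column comment says, the associative hash is decomposed as `[u32; 8]`. Postgres has no unsigned 32-bit integer type, so each limb lands in a `BIGINT` guarded by the range `CHECK`s above. A hedged sketch of the round trip (limb order is an assumption):

```rust
// Hedged sketch: convert the eight u32 limbs of an associative hash to and
// from the i64 values stored in the ahash_0..ahash_7 BIGINT columns.
fn limbs_to_columns(limbs: [u32; 8]) -> [i64; 8] {
    // Always in [0, 2^32), so the CHECK constraints above hold.
    limbs.map(i64::from)
}

fn limbs_from_columns(cols: [i64; 8]) -> [u32; 8] {
    // Out-of-range values cannot come back from a table with these CHECKs,
    // so a failed conversion means the data was corrupted out-of-band.
    cols.map(|c| u32::try_from(c).expect("ahash limb out of u32 range"))
}
```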
3 changes: 3 additions & 0 deletions migrations/postgres/20240321211311_event.down.sql
@@ -0,0 +1,3 @@
-- Add down migration script here

DROP TABLE IF EXISTS "event";
19 changes: 19 additions & 0 deletions migrations/postgres/20240321211311_event.up.sql
@@ -0,0 +1,19 @@
-- Add up migration script here

CREATE TABLE IF NOT EXISTS "event" (
order_key BYTEA NOT NULL UNIQUE, -- network_id sep_key sep_value controller stream_id event_cid
ahash_0 BIGINT NOT NULL CHECK (ahash_0 >= 0 AND ahash_0 < '4294967296'::BIGINT), -- the ahash is decomposed as [u32; 8]
ahash_1 BIGINT NOT NULL CHECK (ahash_1 >= 0 AND ahash_1 < '4294967296'::BIGINT),
ahash_2 BIGINT NOT NULL CHECK (ahash_2 >= 0 AND ahash_2 < '4294967296'::BIGINT),
ahash_3 BIGINT NOT NULL CHECK (ahash_3 >= 0 AND ahash_3 < '4294967296'::BIGINT),
ahash_4 BIGINT NOT NULL CHECK (ahash_4 >= 0 AND ahash_4 < '4294967296'::BIGINT),
ahash_5 BIGINT NOT NULL CHECK (ahash_5 >= 0 AND ahash_5 < '4294967296'::BIGINT),
ahash_6 BIGINT NOT NULL CHECK (ahash_6 >= 0 AND ahash_6 < '4294967296'::BIGINT),
ahash_7 BIGINT NOT NULL CHECK (ahash_7 >= 0 AND ahash_7 < '4294967296'::BIGINT),
cid BYTEA NOT NULL, -- the cid of the event as bytes, no 0x00 prefix
discovered TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
delivered BIGINT UNIQUE, -- monotonically increasing counter indicating this can be delivered to clients
PRIMARY KEY(cid)
);

SELECT order_key, ahash_0, ahash_1, ahash_2, ahash_3, ahash_4, ahash_5, ahash_6, ahash_7, cid, discovered, delivered FROM "event" WHERE false;
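`delivered` gives events a total order, so a client (or the API layer) can poll for newly deliverable events with a highwater mark. A hedged sketch of that access pattern (hypothetical query, not necessarily the one the store uses):

```rust
// Hedged sketch: page through events delivered after a highwater mark.
use sqlx::PgPool;

async fn events_since(
    pool: &PgPool,
    highwater: i64,
    limit: i64,
) -> Result<Vec<(i64, Vec<u8>)>, sqlx::Error> {
    // Returns (delivered, cid) pairs in delivery order; rows with a NULL
    // `delivered` are not yet deliverable and are filtered out by the WHERE.
    sqlx::query_as(
        r#"SELECT delivered, cid FROM "event" WHERE delivered > $1 ORDER BY delivered LIMIT $2"#,
    )
    .bind(highwater)
    .bind(limit)
    .fetch_all(pool)
    .await
}
```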
2 changes: 2 additions & 0 deletions migrations/postgres/20240321211325_event_block.down.sql
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS event_block;
15 changes: 15 additions & 0 deletions migrations/postgres/20240321211325_event_block.up.sql
@@ -0,0 +1,15 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS event_block (
event_cid BYTEA NOT NULL,
block_multihash BYTEA NOT NULL,
codec BIGINT NOT NULL, -- the codec of the block
idx INTEGER NOT NULL, -- the index of the block in the CAR file
"root" BOOL NOT NULL, -- when true the block is a root in the CAR file
PRIMARY KEY(event_cid, block_multihash),
FOREIGN KEY(event_cid) REFERENCES event(cid),
FOREIGN KEY(block_multihash) REFERENCES block(multihash)
);

CREATE INDEX IF NOT EXISTS idx_event_block_multihash ON event_block (block_multihash);

SELECT event_cid, block_multihash, codec, idx, "root" FROM event_block WHERE false;
2 changes: 2 additions & 0 deletions migrations/postgres/20240329005642_root.down.sql
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS "root";
10 changes: 10 additions & 0 deletions migrations/postgres/20240329005642_root.up.sql
@@ -0,0 +1,10 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS "root" (
tx_hash BYTEA NOT NULL,
"root" BYTEA NOT NULL,
block_hash TEXT NOT NULL,
"timestamp" BIGINT NOT NULL,
PRIMARY KEY(tx_hash)
);

SELECT tx_hash, "root", block_hash, "timestamp" FROM "root" WHERE false;
2 changes: 1 addition & 1 deletion one/Cargo.toml
@@ -51,10 +51,10 @@ tokio.workspace = true
tracing.workspace = true
enum-as-inner = "0.6.0"


[features]
default = []
tokio-console = ["ceramic-metrics/tokio-console"]

[dev-dependencies]
expect-test.workspace = true
tracing-test.workspace = true
30 changes: 30 additions & 0 deletions one/src/# pip install multiformats.py
@@ -0,0 +1,30 @@
# pip install multiformats
# pip install cbor2
import multiformats
import cbor2
import sys

# # usage
# > cat something.car | python3 car_view.py

# CAR file format
# | length | Header |
# | length | CID | block |
# ...
# | length | CID | block |

car_bytes = sys.stdin.buffer.read() # read the car file from stdin
length, _, rest = multiformats.varint.decode_raw(car_bytes) # file starts with the header length varint
head = cbor2.loads(rest[:length]) # read the header CBOR object {'roots': [CID], 'version': 1}
rest = rest[length:] # move up the pointer
print(head)

while rest:
# | length | CID | block |
length, _, rest = multiformats.varint.decode_raw(rest) # read the block length varint
cid_and_block = rest[:length]
cid = multiformats.CID.decode(cid_and_block[:36]) # CID first 36 bytes
block = cid_and_block[36:] # block rest bytes
obj = cbor2.loads(block) if cid.codec.name == "dag-cbor" else block
rest = rest[length:] # move up the pointer
print(cid.encode(base="base16"), obj)
26 changes: 17 additions & 9 deletions one/src/events.rs
@@ -74,9 +74,13 @@ async fn slurp(opts: SlurpOpts) -> Result<()> {
output_ceramic_path.display()
);

-    let pool = SqlitePool::connect(output_ceramic_path, Migrations::Apply)
+    let _pool = SqlitePool::connect(output_ceramic_path, Migrations::Apply)
.await
.context("Failed to connect to database")?;

+    let pool =
+        ceramic_store::PostgresPool::connect(&crate::pg_url(), ceramic_store::Migrations::Apply)
+            .await?;
let block_store = EventStore::new(pool).await.unwrap();

if let Some(input_ceramic_db) = opts.input_ceramic_db {
@@ -89,9 +93,14 @@ }
}

async fn validate(opts: ValidateOpts) -> Result<()> {
-    let pool = SqlitePool::from_store_dir(opts.store_dir, Migrations::Apply)
+    let _pool = SqlitePool::from_store_dir(opts.store_dir, Migrations::Apply)
.await
.context("Failed to connect to database")?;

+    let pool =
+        ceramic_store::PostgresPool::connect(&crate::pg_url(), ceramic_store::Migrations::Apply)
+            .await?;

let block_store = EventStore::new(pool.clone())
.await
.with_context(|| "Failed to create block store")?;
@@ -404,6 +413,7 @@ async fn migrate_from_database(
mod tests {
use super::*;
use crate::ethereum_rpc::EthRpc;
+    use ceramic_store::PostgresPool;
use multihash::{Code, MultihashDigest};

struct HardCodedEthRpc {}
@@ -428,10 +438,11 @@ mod tests {
}

#[tokio::test]
+    #[tracing_test::traced_test]
async fn test_validate_time_event() {
// todo: add a negative test.
// Create an in-memory SQLite pool
-        let pool = SqlitePool::connect_in_memory().await.unwrap();
+        let pool = PostgresPool::connect_in_memory().await.unwrap();

// Create a new SQLiteBlockStore and SQLiteRootStore
let block_store = EventStore::new(pool.clone()).await.unwrap();
@@ -531,18 +542,15 @@ mod tests {
37675557714b4355593761653675574e7871596764775068554a624a6846
394546586d39",
];
-        let mut tx = block_store.begin_tx().await.unwrap();
+        // let mut tx = block_store.begin_tx().await.unwrap();
for block in blocks {
// Strip whitespace and decode the block from hex
let block = hex::decode(block.replace(['\n', ' '], "")).unwrap();
// Create the CID and store the block.
let hash = Code::Sha2_256.digest(block.as_slice());
-            block_store
-                .put_block_tx(&hash, &block.into(), &mut tx)
-                .await
-                .unwrap();
+            block_store.put_block(&hash, &block.into()).await.unwrap();
}
-        block_store.commit_tx(tx).await.unwrap();
+        // block_store.commit_tx(tx).await.unwrap();

assert_eq!(
validate_time_event(
Expand Down
28 changes: 27 additions & 1 deletion one/src/lib.rs
@@ -281,6 +281,28 @@ pub async fn run() -> Result<()> {
}
}

+fn pg_url() -> String {
+    let conn = std::env::var("DATABASE_URL")
+        .ok()
+        .or_else(|| {
+            let db = std::env::var("POSTGRES_DB").ok();
+            let host = std::env::var("POSTGRES_HOST").ok();
+            let user = std::env::var("POSTGRES_USER").ok();
+            let password = std::env::var("POSTGRES_PASSWORD").ok();
+            match (db, host, user, password) {
+                (Some(db), Some(host), Some(user), Some(password)) => {
+                    Some(format!("postgres://{}:{}@{}/{}", user, password, host, db))
+                }
+                _ => None,
+            }
+        })
+        .unwrap_or_else(|| "postgres://postgres:c3ram1c@localhost:5432/ceramic".to_string());
+
+    // TODO: don't log the password
+    info!("Connecting to postgres: {}", conn);
+    conn
+}

type InterestStore = ceramic_store::InterestStore<Sha256a>;
type InterestInterest = FullInterests<Interest>;
type ReconInterest = Server<Interest, Sha256a, MetricsStore<InterestStore>, InterestInterest>;
@@ -424,10 +446,14 @@ impl Daemon {

// Connect to sqlite
let sql_db_path: PathBuf = dir.join("db.sqlite3");
-        let sql_pool =
+        let _sql_pool =
ceramic_store::SqlitePool::connect(&sql_db_path, ceramic_store::Migrations::Apply)
.await?;

+        let sql_pool =
+            ceramic_store::PostgresPool::connect(&pg_url(), ceramic_store::Migrations::Apply)
+                .await?;

// Create recon metrics
let recon_metrics = ceramic_metrics::MetricsHandle::register(recon::Metrics::register);
let store_metrics =
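On the `TODO: don't log the password` in `pg_url` above: one low-tech fix is to mask the userinfo before logging. A hedged sketch (hypothetical helper, not in this commit):

```rust
// Hedged sketch: mask the password portion of a connection URL before it
// reaches the logs. Plain string slicing, not a real URL parser.
fn redact_pg_url(conn: &str) -> String {
    match (conn.find("://"), conn.rfind('@')) {
        (Some(scheme_end), Some(at)) if at > scheme_end => {
            let userinfo = &conn[scheme_end + 3..at];
            match userinfo.find(':') {
                // Keep scheme and user; replace everything after the first ':'.
                Some(colon) => format!("{}:****{}", &conn[..scheme_end + 3 + colon], &conn[at..]),
                None => conn.to_string(), // no password present
            }
        }
        _ => conn.to_string(),
    }
}
```

The log line then becomes `info!("Connecting to postgres: {}", redact_pg_url(&conn));`, which prints `postgres://postgres:****@localhost:5432/ceramic` for the default URL.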
4 changes: 2 additions & 2 deletions p2p/src/node.rs
@@ -1164,7 +1164,7 @@ mod tests {

use async_trait::async_trait;
use ceramic_core::RangeOpen;
-    use ceramic_store::SqlitePool;
+    use ceramic_store::{PostgresPool, SqlitePool};
use futures::TryStreamExt;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
@@ -1382,7 +1382,7 @@
let peer_id = PeerId::from(libp2p_keypair.public());

// Using an in-memory DB for the tests; a disk DB is needed for realistic benchmarks.
-    let sql_pool = SqlitePool::connect_in_memory().await.unwrap();
+    let sql_pool = PostgresPool::connect_in_memory().await.unwrap();

let metrics = Metrics::register(&mut prometheus_client::registry::Registry::default());
let mut p2p = Node::new(
6 changes: 3 additions & 3 deletions recon/src/protocol.rs
@@ -214,7 +214,7 @@
message = stream.try_next() => {
if let Some(message) = message? {
self.metrics.record(&MessageRecv(&message));
-                    self.handle_incoming(message).await.context("handle incoming")?;
+                    self.handle_incoming(message).await.context("handle incoming message")?;
}
}
// Send want value requests
@@ -479,7 +479,7 @@
}

async fn handle_incoming(&mut self, message: Self::In) -> Result<RemoteStatus> {
-        trace!(?message, "handle_incoming");
+        trace!(?message, "handle_incoming initiator");
match message {
ResponderMessage::InterestResponse(interests) => {
let mut ranges = Vec::with_capacity(interests.len());
@@ -651,7 +651,7 @@
}

async fn handle_incoming(&mut self, message: Self::In) -> Result<RemoteStatus> {
-        trace!(?message, "handle_incoming");
+        trace!(?message, "handle_incoming responder");
match message {
InitiatorMessage::InterestRequest(interests) => {
let ranges = self.common.recon.process_interests(interests).await?;

