Skip to content

Commit

Permalink
feat: stream mode for chain-server
Browse files Browse the repository at this point in the history
Implements the stream mode for the chain-server, which outputs
compressed proofs about the entire stream history while using previous
recursive proofs as checkpoints.

Extra: simplify collection of emmited data for `assert-emitted` to avoid
concurrency issues
  • Loading branch information
arthurpaulino committed Mar 22, 2024
1 parent 8bce9f7 commit 1e0df96
Show file tree
Hide file tree
Showing 9 changed files with 477 additions and 123 deletions.
3 changes: 1 addition & 2 deletions chain-server/proto/chain-server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ service ChainProver {
message ConfigRequest {}

message ConfigResponse {
uint64 rc = 1;
bytes callable = 2;
bytes config_response_data = 1;
}

message ChainRequest {
Expand Down
34 changes: 22 additions & 12 deletions chain-server/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
use tonic::Request;

use lurk::{
cli::field_data::{de, ser, LurkData},
cli::field_data::{de, ser},
lang::{Coproc, Lang},
lem::store::Store,
proof::{
Expand All @@ -34,7 +34,7 @@ use chain_prover::{
ConfigResponse,
};

use chain_server::{ChainRequestData, ChainResponseData};
use chain_server::{ChainRequestData, ChainResponseData, ConfigResponseData};

fn verify(
proof: &CompressedSNARK<E1<Fr>, SS1<Fr>, SS2<Fr>>,
Expand All @@ -54,18 +54,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let port = env::args().collect::<Vec<_>>()[1].parse::<u16>()?;
let mut client = ChainProverClient::connect(format!("http://127.0.0.1:{port}")).await?;

let ConfigResponse { rc, callable } = client
let ConfigResponse {
config_response_data,
} = client
.config(Request::new(ConfigRequest {}))
.await?
.into_inner();
let rc = usize::try_from(rc)?;
let config_response_data = de::<ConfigResponseData<Fr>>(&config_response_data)?;
let rc = config_response_data.get_rc();

let store = Store::<Fr>::default();
let mut callable = de::<LurkData<Fr>>(&callable).and_then(|ld| ld.interned(&store))?;

let (mut callable, stream_init_callable) = config_response_data.interned(&store)?;

let empty_env = store.intern_empty_env();
let cont_outermost = store.cont_outermost();
let cont_terminal = store.cont_terminal();

let (cont_in, cont_out) = if stream_init_callable.is_some() {
(store.cont_stream_start(), store.cont_stream_pause())
} else {
(store.cont_outermost(), store.cont_terminal())
};

let instance = Instance::new(
rc,
Expand All @@ -89,9 +97,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} = client.chain(request).await?.into_inner();
let chain_response_data = de::<ChainResponseData<Fr>>(&chain_response_data)?;
let (result, next_callable) = chain_response_data.interned(&store)?;
let proof = chain_response_data.extract_proof();
let proof = chain_response_data.get_proof();

let expr_in =
stream_init_callable.unwrap_or_else(|| store.list([callable, argument]));

let expr_in = store.list([callable, argument]);
let expr_out = store.cons(result, next_callable);

print!(
Expand All @@ -101,9 +111,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
stdout.flush()?;

let public_inputs = store.to_scalar_vector(&[expr_in, empty_env, cont_outermost]);
let public_outputs = store.to_scalar_vector(&[expr_out, empty_env, cont_terminal]);
if verify(&proof, &pp, &public_inputs, &public_outputs)? {
let public_inputs = store.to_scalar_vector(&[expr_in, empty_env, cont_in]);
let public_outputs = store.to_scalar_vector(&[expr_out, empty_env, cont_out]);
if verify(proof, &pp, &public_inputs, &public_outputs)? {
println!(" ✓");
} else {
println!(" ✗\nServer's proof didn't verify!");
Expand Down
56 changes: 54 additions & 2 deletions chain-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,57 @@ use lurk::{
},
};

#[derive(Serialize, Deserialize)]
pub struct ConfigResponseData<F: LurkField> {
rc: usize,
callable: ZPtr<F>,
stream_init_callable: Option<ZPtr<F>>,
z_dag: ZDag<F>,
}

impl<F: LurkField> ConfigResponseData<F> {
pub fn new(
rc: usize,
callable: &Ptr,
stream_init_callable: Option<&Ptr>,
store: &Store<F>,
) -> Self {
let mut z_dag = ZDag::default();
let cache = &mut HashMap::default();
let callable = z_dag.populate_with(callable, store, cache);
let stream_init_callable =
stream_init_callable.map(|x| z_dag.populate_with(x, store, cache));
Self {
rc,
callable,
stream_init_callable,
z_dag,
}
}

pub fn interned(&self, store: &Store<F>) -> Result<(Ptr, Option<Ptr>)> {
let cache = &mut HashMap::default();
let callable = self.z_dag.populate_store(&self.callable, store, cache)?;
let stream_init_callable = if let Some(z_ptr) = &self.stream_init_callable {
Some(self.z_dag.populate_store(z_ptr, store, cache)?)
} else {
None
};
Ok((callable, stream_init_callable))
}

#[inline]
pub fn get_rc(&self) -> usize {
self.rc
}
}

impl<F: LurkField> HasFieldModulus for ConfigResponseData<F> {
fn field_modulus() -> String {
F::MODULUS.to_string()
}
}

#[derive(Serialize, Deserialize)]
pub struct ChainRequestData<F: LurkField> {
callable: ZPtr<F>,
Expand Down Expand Up @@ -86,8 +137,9 @@ impl<F: CurveCycleEquipped> ChainResponseData<F> {
Ok((result, next_callable))
}

pub fn extract_proof(self) -> CompressedSNARK<E1<F>, SS1<F>, SS2<F>> {
self.proof
#[inline]
pub fn get_proof(&self) -> &CompressedSNARK<E1<F>, SS1<F>, SS2<F>> {
&self.proof
}
}

Expand Down
Loading

0 comments on commit 1e0df96

Please sign in to comment.