diff --git a/chain-server/proto/chain-server.proto b/chain-server/proto/chain-server.proto index 1e8643b8c..3d5ac6624 100644 --- a/chain-server/proto/chain-server.proto +++ b/chain-server/proto/chain-server.proto @@ -10,8 +10,7 @@ service ChainProver { message ConfigRequest {} message ConfigResponse { - uint64 rc = 1; - bytes callable = 2; + bytes config_response_data = 1; } message ChainRequest { diff --git a/chain-server/src/client.rs b/chain-server/src/client.rs index f8720484f..fa4530f49 100644 --- a/chain-server/src/client.rs +++ b/chain-server/src/client.rs @@ -11,7 +11,7 @@ use std::{ use tonic::Request; use lurk::{ - cli::field_data::{de, ser, LurkData}, + cli::field_data::{de, ser}, lang::{Coproc, Lang}, lem::store::Store, proof::{ @@ -34,7 +34,7 @@ use chain_prover::{ ConfigResponse, }; -use chain_server::{ChainRequestData, ChainResponseData}; +use chain_server::{ChainRequestData, ChainResponseData, ConfigResponseData}; fn verify( proof: &CompressedSNARK, SS1, SS2>, @@ -54,18 +54,26 @@ async fn main() -> Result<(), Box> { let port = env::args().collect::>()[1].parse::()?; let mut client = ChainProverClient::connect(format!("http://127.0.0.1:{port}")).await?; - let ConfigResponse { rc, callable } = client + let ConfigResponse { + config_response_data, + } = client .config(Request::new(ConfigRequest {})) .await? .into_inner(); - let rc = usize::try_from(rc)?; + let config_response_data = de::>(&config_response_data)?; + let rc = config_response_data.get_rc(); let store = Store::::default(); - let mut callable = de::>(&callable).and_then(|ld| ld.interned(&store))?; + + let (mut callable, stream_init_callable) = config_response_data.interned(&store)?; let empty_env = store.intern_empty_env(); - let cont_outermost = store.cont_outermost(); - let cont_terminal = store.cont_terminal(); + + let (cont_in, cont_out) = if stream_init_callable.is_some() { + (store.cont_stream_start(), store.cont_stream_pause()) + } else { + (store.cont_outermost(), store.cont_terminal()) + }; let instance = Instance::new( rc, @@ -89,9 +97,11 @@ async fn main() -> Result<(), Box> { } = client.chain(request).await?.into_inner(); let chain_response_data = de::>(&chain_response_data)?; let (result, next_callable) = chain_response_data.interned(&store)?; - let proof = chain_response_data.extract_proof(); + let proof = chain_response_data.get_proof(); + + let expr_in = + stream_init_callable.unwrap_or_else(|| store.list([callable, argument])); - let expr_in = store.list([callable, argument]); let expr_out = store.cons(result, next_callable); print!( @@ -101,9 +111,9 @@ async fn main() -> Result<(), Box> { ); stdout.flush()?; - let public_inputs = store.to_scalar_vector(&[expr_in, empty_env, cont_outermost]); - let public_outputs = store.to_scalar_vector(&[expr_out, empty_env, cont_terminal]); - if verify(&proof, &pp, &public_inputs, &public_outputs)? { + let public_inputs = store.to_scalar_vector(&[expr_in, empty_env, cont_in]); + let public_outputs = store.to_scalar_vector(&[expr_out, empty_env, cont_out]); + if verify(proof, &pp, &public_inputs, &public_outputs)? { println!(" ✓"); } else { println!(" ✗\nServer's proof didn't verify!"); diff --git a/chain-server/src/lib.rs b/chain-server/src/lib.rs index cbdbb4009..4a54851e9 100644 --- a/chain-server/src/lib.rs +++ b/chain-server/src/lib.rs @@ -16,6 +16,57 @@ use lurk::{ }, }; +#[derive(Serialize, Deserialize)] +pub struct ConfigResponseData { + rc: usize, + callable: ZPtr, + stream_init_callable: Option>, + z_dag: ZDag, +} + +impl ConfigResponseData { + pub fn new( + rc: usize, + callable: &Ptr, + stream_init_callable: Option<&Ptr>, + store: &Store, + ) -> Self { + let mut z_dag = ZDag::default(); + let cache = &mut HashMap::default(); + let callable = z_dag.populate_with(callable, store, cache); + let stream_init_callable = + stream_init_callable.map(|x| z_dag.populate_with(x, store, cache)); + Self { + rc, + callable, + stream_init_callable, + z_dag, + } + } + + pub fn interned(&self, store: &Store) -> Result<(Ptr, Option)> { + let cache = &mut HashMap::default(); + let callable = self.z_dag.populate_store(&self.callable, store, cache)?; + let stream_init_callable = if let Some(z_ptr) = &self.stream_init_callable { + Some(self.z_dag.populate_store(z_ptr, store, cache)?) + } else { + None + }; + Ok((callable, stream_init_callable)) + } + + #[inline] + pub fn get_rc(&self) -> usize { + self.rc + } +} + +impl HasFieldModulus for ConfigResponseData { + fn field_modulus() -> String { + F::MODULUS.to_string() + } +} + #[derive(Serialize, Deserialize)] pub struct ChainRequestData { callable: ZPtr, @@ -86,8 +137,9 @@ impl ChainResponseData { Ok((result, next_callable)) } - pub fn extract_proof(self) -> CompressedSNARK, SS1, SS2> { - self.proof + #[inline] + pub fn get_proof(&self) -> &CompressedSNARK, SS1, SS2> { + &self.proof } } diff --git a/chain-server/src/server.rs b/chain-server/src/server.rs index 3aca22065..c0def2dde 100644 --- a/chain-server/src/server.rs +++ b/chain-server/src/server.rs @@ -3,6 +3,7 @@ use anyhow::Result; use camino::Utf8PathBuf; use clap::{Args, Parser, Subcommand}; use halo2curves::bn256::Fr; +use nova::supernova::RecursiveSNARK; use once_cell::sync::OnceCell; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ @@ -18,18 +19,20 @@ use lurk::{ zstore::ZStore, }, coprocessor::Coprocessor, - dual_channel::dummy_terminal, - field::LurkField, + dual_channel::{dummy_terminal, pair_terminals}, lang::{Coproc, Lang}, lem::{ - eval::{evaluate, make_cprocs_funcs_from_lang, make_eval_step_from_config, EvalConfig}, + eval::{ + evaluate, make_cprocs_funcs_from_lang, make_eval_step_from_config, resume_stream, + start_stream, EvalConfig, + }, pointers::{Ptr, ZPtr}, store::Store, tag::Tag, Func, }, proof::{ - nova::{CurveCycleEquipped, Dual}, + nova::{CurveCycleEquipped, Dual, E1}, supernova::{PublicParams, SuperNovaProver}, Prover, RecursiveSNARKTrait, }, @@ -47,9 +50,9 @@ use chain_prover::{ ChainRequest, ChainResponse, ConfigRequest, ConfigResponse, }; -use chain_server::{ChainRequestData, ChainResponseData}; +use chain_server::{ChainRequestData, ChainResponseData, ConfigResponseData}; -struct ChainProverService> { +struct StandaloneService> { callable: Arc>, store: Arc>, // TODO: add the store to the state to allow memory cleansing limit: usize, @@ -60,7 +63,7 @@ struct ChainProverService> { session: Option, } -impl> ChainProverService { +impl> StandaloneService { fn new( callable: Ptr, store: Store, @@ -90,7 +93,7 @@ impl> ChainProverService { impl< F: CurveCycleEquipped + DeserializeOwned + Serialize, C: Coprocessor + Serialize + DeserializeOwned + 'static, - > ChainProver for ChainProverService + > ChainProver for StandaloneService where ::Repr: Abomonation, as ff::PrimeField>::Repr: Abomonation, @@ -116,7 +119,7 @@ where } // assemble call expression - let call_expr = self.store.list([*callable_state, argument]); + let call_expr = self.store.list([callable, argument]); // evaluate to produce the frames let frames = evaluate( @@ -158,12 +161,12 @@ where .prover .prove_from_frames(pp, &frames, &self.store, None) .map_err(|e| Status::internal(e.to_string()))?; - let proof = proof + let compressed_proof = proof .compress(pp) .map_err(|e| Status::internal(e.to_string()))?; // the above compression operated on a recursive proof, so the following `into_owned()` should // not involve cloning - let proof = proof + let compressed_proof = compressed_proof .into_owned() .get_compressed() .ok_or(Status::internal("Failed to retrieve the compressed SNARK"))?; @@ -173,13 +176,13 @@ where &result, &next_callable, &self.store, - proof, + compressed_proof, )) .map_err(|e| Status::internal(e.to_string()))?; // save the session if let Some(session) = &self.session { - let session_data = SessionData::pack(self, &next_callable); + let session_data = SessionData::pack_standalone(self, &next_callable); dump(session_data, session).map_err(|e| Status::internal(e.to_string()))?; } @@ -198,42 +201,264 @@ where } async fn config(&self, _: Request) -> Result, Status> { - let rc = usize::try_into(self.prover.reduction_count()) - .map_err(|_e| Status::failed_precondition("Failed to convert rc to u32"))?; let callable = self .callable .lock() .map_err(|e| Status::aborted(e.to_string()))?; - let callable = ser(LurkData::new(&callable, &self.store)) - .map_err(|e| Status::internal(e.to_string()))?; - Ok(Response::new(ConfigResponse { rc, callable })) + let config_response_data = ser(ConfigResponseData::new( + self.prover.reduction_count(), + &callable, + None, + &self.store, + )) + .map_err(|e| Status::internal(e.to_string()))?; + Ok(Response::new(ConfigResponse { + config_response_data, + })) + } +} + +struct StreamState { + callable: Ptr, + result_and_proof: Option<(Ptr, RecursiveSNARK>)>, +} + +struct StreamService> { + state: Arc>>, + first_callable: Ptr, + store: Arc>, // TODO: add the store to the state to allow memory cleansing + limit: usize, + lurk_step: Func, + cprocs: Vec, + prover: SuperNovaProver, + public_params: OnceCell>, + session: Option, +} + +impl> StreamService { + fn new( + callable: Ptr, + first_callable: Ptr, + result_and_proof: Option<(Ptr, RecursiveSNARK>)>, + store: Store, + limit: usize, + lang: Lang, + rc: usize, + session: Option, + ) -> Self { + let eval_config = EvalConfig::new_nivc(&lang); + let lurk_step = make_eval_step_from_config(&eval_config); + let cprocs = make_cprocs_funcs_from_lang(&lang); + let prover = SuperNovaProver::<_, C>::new(rc, Arc::new(lang)); + Self { + state: Arc::new(Mutex::new(StreamState { + callable, + result_and_proof, + })), + first_callable, + store: Arc::new(store), + limit, + lurk_step, + cprocs, + prover, + public_params: OnceCell::new(), + session, + } + } +} + +#[tonic::async_trait] +impl< + F: CurveCycleEquipped + DeserializeOwned + Serialize, + C: Coprocessor + Serialize + DeserializeOwned + 'static, + > ChainProver for StreamService +where + ::Repr: Abomonation, + as ff::PrimeField>::Repr: Abomonation, +{ + async fn chain( + &self, + request: Request, + ) -> Result, Status> { + // deserialize and intern the provided callable state and argument + let ChainRequest { chain_request_data } = request.into_inner(); + let (callable, argument) = de::>(&chain_request_data) + .and_then(|d| d.interned(&self.store)) + .map_err(|e| Status::invalid_argument(e.to_string()))?; + + // retrieve callable state + let mut state = self + .state + .lock() + .map_err(|e| Status::aborted(e.to_string()))?; + + if !self.store.ptr_eq(&callable, &state.callable) { + return Err(Status::invalid_argument("Invalid callable state provided")); + } + + let (t1, t2) = pair_terminals(); + let lang_setup: Option<(_, &[_], &Lang<_, _>)> = + Some((&self.lurk_step, &self.cprocs, self.prover.lang())); + + let frames = if let Some((result, _)) = &state.result_and_proof { + // got a previous result we can use to resume the stream + t2.send(self.store.intern_nil()) + .map_err(|e| Status::internal(e.to_string()))?; + t2.send(argument) + .map_err(|e| Status::internal(e.to_string()))?; + let input = vec![ + self.store.cons(*result, callable), + self.store.intern_empty_env(), + self.store.cont_stream_pause(), + ]; + resume_stream(lang_setup, input, &self.store, self.limit, &t1) + } else { + // no previous result so we start the stream + t2.send(argument) + .map_err(|e| Status::internal(e.to_string()))?; + start_stream(lang_setup, callable, &self.store, self.limit, &t1) + } + .map_err(|e| Status::data_loss(e.to_string()))?; + + // retrieve the result of the call + let Some((Some(expr_out), Some(cont_out))) = frames + .last() + .map(|frame| (frame.output.first(), frame.output.last())) + else { + return Err(Status::internal("Failed to get the evaluation result")); + }; + + // make sure that the evaluation terminated appropriatelly + match cont_out.tag() { + Tag::Cont(ContTag::StreamPause) => { + // get the car/cdr of the result to retrieve the chain result and + // the next callable + let (result, next_callable) = self.store.fetch_cons(expr_out).ok_or_else(|| { + Status::failed_precondition("Call didn't result in a cons expression") + })?; + + // retrieve (or compute if needed) the public params for proving + let pp = self + .public_params + .get_or_try_init(|| { + supernova_public_params(&Instance::new_supernova(&self.prover, true)) + }) + .map_err(|e| Status::internal(e.to_string()))?; + + let previous_proof = state + .result_and_proof + .as_ref() + .map(|(_, proof)| proof.clone()); + + // prove then compress the proof + let (proof, ..) = self + .prover + .prove_from_frames(pp, &frames, &self.store, previous_proof) + .map_err(|e| Status::internal(e.to_string()))?; + let compressed_proof = proof + .compress(pp) + .map_err(|e| Status::internal(e.to_string()))?; + // the above compression operated on a recursive proof, so the following `into_owned()` should + // not involve cloning + let compressed_proof = compressed_proof + .into_owned() + .get_compressed() + .ok_or(Status::internal("Failed to retrieve the compressed SNARK"))?; + + let Some(recursive_proof) = proof.get_recursive() else { + return Err(Status::internal("Not a recursive proof")); + }; + + // produce the data for the response + let chain_response_data = ser(ChainResponseData::new( + &result, + &next_callable, + &self.store, + compressed_proof, + )) + .map_err(|e| Status::internal(e.to_string()))?; + + // save the session + if let Some(session) = &self.session { + let session_data = SessionData::pack_stream( + self, + &next_callable, + Some((&result, recursive_proof.clone())), + ); + dump(session_data, session).map_err(|e| Status::internal(e.to_string()))?; + } + + // now it's safe to set the new state since no error has occurred so far + *state = StreamState { + callable: next_callable, + result_and_proof: Some((result, recursive_proof)), + }; + + Ok(Response::new(ChainResponse { + chain_response_data, + })) + } + Tag::Cont(ContTag::Error) => Err(Status::invalid_argument("Evaluation error")), + Tag::Cont(_) => Err(Status::resource_exhausted("Unfinished evaluation")), + _ => Err(Status::internal("Invalid continuation tag")), + } + } + + async fn config(&self, _: Request) -> Result, Status> { + let state = self + .state + .lock() + .map_err(|e| Status::aborted(e.to_string()))?; + let config_response_data = ser(ConfigResponseData::new( + self.prover.reduction_count(), + &state.callable, + Some(&self.first_callable), + &self.store, + )) + .map_err(|e| Status::internal(e.to_string()))?; + Ok(Response::new(ConfigResponse { + config_response_data, + })) } } +#[derive(Serialize, Deserialize)] +struct StreamSessionData { + first_callable: ZPtr, + result_and_proof: Option<(ZPtr, RecursiveSNARK>)>, +} + /// Holds data from a cached session #[derive(Serialize, Deserialize)] -struct SessionData { +struct SessionData { callable: ZPtr, + stream_session_data: Option>, z_store: ZStore, limit: usize, lang: Lang, rc: usize, } +enum ServiceWrapper> { + Standalone(StandaloneService), + Stream(StreamService), +} + impl> SessionData { - fn pack(data: &ChainProverService, callable: &Ptr) -> Self { - let ChainProverService { + fn pack_standalone(svc: &StandaloneService, callable: &Ptr) -> Self { + let StandaloneService { store, limit, prover, .. - } = data; - let (z_store, callable) = ZStore::from_store_and_ptr(store, callable); + } = svc; + let (z_store, callable, _) = ZStore::from_store_and_ptr(store, callable); let limit = *limit; let lang = (*prover.lang().clone()).clone(); let rc = prover.reduction_count(); Self { callable, + stream_session_data: None, z_store, limit, lang, @@ -241,27 +466,84 @@ impl> SessionData { } } - fn unpack(self, session: Utf8PathBuf) -> Result> { - let Self { + fn pack_stream( + svc: &StreamService, + callable: &Ptr, + result_and_proof: Option<(&Ptr, RecursiveSNARK>)>, + ) -> Self { + let StreamService { + first_callable, + store, + limit, + prover, + .. + } = svc; + let (mut z_store, callable, mut cache) = ZStore::from_store_and_ptr(store, callable); + let first_callable = z_store.populate_with(first_callable, store, &mut cache); + let result_and_proof = result_and_proof + .map(|(result, proof)| (z_store.populate_with(result, store, &mut cache), proof)); + let stream_session_data = Some(StreamSessionData { + first_callable, + result_and_proof, + }); + let limit = *limit; + let lang = (*prover.lang().clone()).clone(); + let rc = prover.reduction_count(); + Self { callable, + stream_session_data, z_store, limit, lang, rc, - } = self; - let (store, callable) = z_store.to_store_and_ptr(&callable)?; - Ok(ChainProverService::new( + } + } + + fn unpack(self, session: Utf8PathBuf) -> Result> { + let Self { callable, - store, + stream_session_data, + z_store, limit, lang, rc, - Some(session), - )) + } = self; + let (store, callable, mut cache) = z_store.to_store_and_ptr(&callable)?; + if let Some(StreamSessionData { + first_callable, + result_and_proof, + }) = stream_session_data + { + let first_callable = z_store.populate_store(&first_callable, &store, &mut cache)?; + let result_and_proof = if let Some((result, proof)) = result_and_proof { + Some((z_store.populate_store(&result, &store, &mut cache)?, proof)) + } else { + None + }; + Ok(ServiceWrapper::Stream(StreamService::new( + callable, + first_callable, + result_and_proof, + store, + limit, + lang, + rc, + Some(session), + ))) + } else { + Ok(ServiceWrapper::Standalone(StandaloneService::new( + callable, + store, + limit, + lang, + rc, + Some(session), + ))) + } } } -impl HasFieldModulus for SessionData { +impl HasFieldModulus for SessionData { fn field_modulus() -> String { F::MODULUS.to_string() } @@ -299,6 +581,10 @@ struct InitArgs { #[clap(value_parser)] callable: String, + /// Flag to start the server in stream mode + #[arg(long)] + stream: bool, + /// Flag to use a persisted commitment as the callable state #[arg(long)] comm: bool, @@ -346,7 +632,7 @@ struct ResumeArgs { fn get_service_and_address< F: CurveCycleEquipped + DeserializeOwned, C: Coprocessor + DeserializeOwned, ->() -> Result<(ChainProverService, SocketAddr), Box> { +>() -> Result<(ServiceWrapper, SocketAddr), Box> { let Cli { command } = Cli::parse(); let local_ip = |port| SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port)); match command { @@ -365,20 +651,32 @@ fn get_service_and_address< let LurkData:: { z_ptr, z_dag } = load(&Utf8PathBuf::from(&init_args.callable))?; z_dag.populate_store_simple(&z_ptr, &store)? }; - let svc = ChainProverService::new( - callable, - store, - init_args.get_limit(), - lang, - init_args.get_rc(), - init_args.session, - ); + let svc = if init_args.stream { + ServiceWrapper::Stream(StreamService::new( + callable, + callable, + None, + store, + init_args.get_limit(), + lang, + init_args.get_rc(), + init_args.session, + )) + } else { + ServiceWrapper::Standalone(StandaloneService::new( + callable, + store, + init_args.get_limit(), + lang, + init_args.get_rc(), + init_args.session, + )) + }; Ok((svc, local_ip(init_args.port))) } Command::Resume(resume_args) => { let session = resume_args.session; - let session_data: SessionData = load(&session)?; - let svc = session_data.unpack(session)?; + let svc = load::>(&session).and_then(|sd| sd.unpack(session))?; Ok((svc, local_ip(resume_args.port))) } } @@ -387,11 +685,18 @@ fn get_service_and_address< #[tokio::main] async fn main() -> Result<(), Box> { let (svc, addr) = get_service_and_address::>()?; - - Server::builder() - .add_service(ChainProverServer::new(svc)) - .serve(addr) - .await?; + macro_rules! serve { + ($svc:expr) => { + Server::builder() + .add_service(ChainProverServer::new($svc)) + .serve(addr) + .await? + }; + } + match svc { + ServiceWrapper::Standalone(svc) => serve!(svc), + ServiceWrapper::Stream(svc) => serve!(svc), + } Ok(()) } diff --git a/src/cli/repl/mod.rs b/src/cli/repl/mod.rs index 4a33efacf..fc6424eef 100644 --- a/src/cli/repl/mod.rs +++ b/src/cli/repl/mod.rs @@ -13,12 +13,7 @@ use rustyline::{ use rustyline_derive::{Completer, Helper, Highlighter, Hinter}; use serde::{de::DeserializeOwned, Serialize}; use std::{ - cell::OnceCell, - collections::HashMap, - fs::read_to_string, - io::Write, - sync::{Arc, Mutex}, - thread, + cell::OnceCell, collections::HashMap, fs::read_to_string, io::Write, sync::Arc, thread, time::Duration, }; use tracing::info; @@ -506,15 +501,6 @@ where /// if the final continuation is terminal or error fn eval_expr_collecting_emitted(&self, expr: Ptr) -> Result> { let (t1, t2) = pair_terminals::(); - let store_clone = self.store.clone(); - let emitted = Arc::new(Mutex::new(vec![])); - let emitted_clone = emitted.clone(); - thread::spawn(move || { - for ptr in t2.iter() { - println!("{}", ptr.fmt_to_string_simple(&store_clone)); - emitted_clone.lock().unwrap().push(ptr); - } - }); let (ptrs, iterations) = evaluate_simple_with_env::( Some(self.lang_setup()), expr, @@ -523,9 +509,12 @@ where self.limit, &t1, )?; - thread::sleep(Duration::from_millis(10)); // wait for last t2 iteration + let emitted_vec = t2.collect(); + for emitted in emitted_vec.iter() { + println!("{}", emitted.fmt_to_string_simple(&self.store)); + } if matches!(ptrs[2].tag(), Tag::Cont(ContTag::Terminal | ContTag::Error)) { - Ok(emitted.lock().unwrap().to_owned()) + Ok(emitted_vec) } else { bail!( "Limit reached after {}", diff --git a/src/cli/zstore.rs b/src/cli/zstore.rs index f876a0c86..dbaa3112e 100644 --- a/src/cli/zstore.rs +++ b/src/cli/zstore.rs @@ -369,30 +369,36 @@ impl ZStore { Ok(store) } - pub fn to_store_and_ptr(&self, z_ptr: &ZPtr) -> Result<(Store, Ptr)> { + pub fn to_store_and_ptr( + &self, + z_ptr: &ZPtr, + ) -> Result<(Store, Ptr, HashMap, Ptr>)> { let store = Store::default(); - let cache = &mut HashMap::default(); + let mut cache = HashMap::default(); for (FWrap(hash), (secret, z_payload)) in &self.comms { - let payload = self.populate_store(z_payload, &store, cache)?; + let payload = self.populate_store(z_payload, &store, &mut cache)?; store.add_comm(*hash, *secret, payload); } - let ptr = self.populate_store(z_ptr, &store, cache)?; - Ok((store, ptr)) + let ptr = self.populate_store(z_ptr, &store, &mut cache)?; + Ok((store, ptr, cache)) } - pub fn from_store_and_ptr(store: &Store, ptr: &Ptr) -> (Self, ZPtr) { + pub fn from_store_and_ptr( + store: &Store, + ptr: &Ptr, + ) -> (Self, ZPtr, HashMap>) { let mut z_store = ZStore::default(); - let cache = &mut HashMap::default(); + let mut cache = HashMap::default(); for (FWrap(hash), img) in store.comms.clone().into_tuple_vec() { - let payload = z_store.populate_with(&img.1, store, cache); + let payload = z_store.populate_with(&img.1, store, &mut cache); z_store.add_comm(hash, img.0, payload) } - let z_ptr = z_store.populate_with(ptr, store, cache); - (z_store, z_ptr) + let z_ptr = z_store.populate_with(ptr, store, &mut cache); + (z_store, z_ptr, cache) } #[inline] - pub(crate) fn populate_with( + pub fn populate_with( &mut self, ptr: &Ptr, store: &Store, @@ -408,7 +414,7 @@ impl ZStore { } #[inline] - pub(crate) fn populate_store( + pub fn populate_store( &self, z_ptr: &ZPtr, store: &Store, diff --git a/src/lem/eval.rs b/src/lem/eval.rs index f13fd4bb7..94948f07f 100644 --- a/src/lem/eval.rs +++ b/src/lem/eval.rs @@ -300,7 +300,7 @@ pub fn start_stream>( start_stream_with_env( lang_setup, callable, - store.intern_nil(), + store.intern_empty_env(), store, limit, ch_terminal, @@ -328,10 +328,9 @@ pub fn resume_stream>( } #[inline] -pub fn start_stream_simple_with_env>( +pub fn start_stream_simple>( lang_setup: Option<(&Func, &[Func], &Lang)>, callable: Ptr, - env: Ptr, store: &Store, limit: usize, ch_terminal: &ChannelTerminal, @@ -339,7 +338,7 @@ pub fn start_stream_simple_with_env>( evaluate_simple_with_env_and_cont( lang_setup, callable, - env, + store.intern_empty_env(), store.cont_stream_start(), store, limit, @@ -347,24 +346,6 @@ pub fn start_stream_simple_with_env>( ) } -#[inline] -pub fn start_stream_simple>( - lang_setup: Option<(&Func, &[Func], &Lang)>, - callable: Ptr, - store: &Store, - limit: usize, - ch_terminal: &ChannelTerminal, -) -> Result<(Vec, usize)> { - start_stream_simple_with_env( - lang_setup, - callable, - store.intern_nil(), - store, - limit, - ch_terminal, - ) -} - #[inline] pub fn resume_stream_simple>( lang_setup: Option<(&Func, &[Func], &Lang)>, @@ -1441,7 +1422,7 @@ fn apply_cont(cprocs: &[(&Symbol, usize)], ivc: bool) -> Func { match result.tag { Expr::Cons => { let cont: Cont::StreamPause = HASH_8_ZEROS; - return (result, env, cont, ret); + return (result, empty_env, cont, ret); } }; return (result, env, err, errctrl); @@ -1936,16 +1917,16 @@ fn make_thunk() -> Func { aux_func!(make_thunk(expr, env, cont, ctrl): 3 => { match ctrl.value { Symbol("make-thunk") => { + // We erase the environment as to not leak any information about internal variables. + let empty_env: Expr::Env; match cont.tag { Cont::Outermost => { let cont: Cont::Terminal = HASH_8_ZEROS; - // We erase the environment as to not leak any information about internal variables. - let empty_env: Expr::Env; return (expr, empty_env, cont) } Cont::StreamDispatch => { let cont: Cont::StreamPause = HASH_8_ZEROS; - return (expr, env, cont); + return (expr, empty_env, cont); } }; let thunk: Expr::Thunk = cons2(expr, cont); diff --git a/src/lem/store.rs b/src/lem/store.rs index 1c57f0c7b..dd338911b 100644 --- a/src/lem/store.rs +++ b/src/lem/store.rs @@ -741,24 +741,34 @@ impl Store { intern_ptrs!(self, Tag::Expr(Fun), arg, body, env, self.dummy()) } + #[inline] + fn cont_atom(&self, cont_tag: ContTag) -> Ptr { + Ptr::new(Tag::Cont(cont_tag), RawPtr::Atom(self.hash8zeros_idx)) + } + #[inline] pub fn cont_outermost(&self) -> Ptr { - Ptr::new(Tag::Cont(Outermost), RawPtr::Atom(self.hash8zeros_idx)) + self.cont_atom(Outermost) } #[inline] pub fn cont_error(&self) -> Ptr { - Ptr::new(Tag::Cont(ContTag::Error), RawPtr::Atom(self.hash8zeros_idx)) + self.cont_atom(ContTag::Error) } #[inline] pub fn cont_terminal(&self) -> Ptr { - Ptr::new(Tag::Cont(Terminal), RawPtr::Atom(self.hash8zeros_idx)) + self.cont_atom(Terminal) } #[inline] pub fn cont_stream_start(&self) -> Ptr { - Ptr::new(Tag::Cont(StreamStart), RawPtr::Atom(self.hash8zeros_idx)) + self.cont_atom(StreamStart) + } + + #[inline] + pub fn cont_stream_pause(&self) -> Ptr { + self.cont_atom(StreamPause) } /// Function specialized on deconstructing `Cons` pointers into their car/cdr diff --git a/src/lem/tests/stream.rs b/src/lem/tests/stream.rs index d6ff7ea21..2ab913772 100644 --- a/src/lem/tests/stream.rs +++ b/src/lem/tests/stream.rs @@ -38,6 +38,7 @@ fn assert_start_stream( start_stream_simple::>(None, callable, store, LIMIT, &t1).unwrap(); let (result, _) = store.fetch_cons(&output[0]).unwrap(); assert_eq!(result, expected_result); + assert_eq!(output[1], store.intern_empty_env()); expect_eq(iterations, expected_iterations); output } @@ -56,6 +57,7 @@ fn assert_resume_stream( resume_stream_simple::>(None, input, store, LIMIT, &t1).unwrap(); let (result, _) = store.fetch_cons(&output[0]).unwrap(); assert_eq!(result, expected_result); + assert_eq!(output[1], store.intern_empty_env()); expect_eq(iterations, expected_iterations); output }