diff --git a/Cargo.lock b/Cargo.lock index 80289443..54204f04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,7 +180,7 @@ dependencies = [ "memchr", "num_cpus", "once_cell", - "pin-project-lite 0.2.0", + "pin-project-lite 0.2.3", "pin-utils", "slab", "wasm-bindgen-futures", @@ -266,9 +266,9 @@ checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" [[package]] name = "byteorder" -version = "1.3.4" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" +checksum = "ae44d1a3d5a19df61dd0c8beb138458ac2a53a7ac09eba97d55592540004306b" [[package]] name = "bytes" @@ -276,6 +276,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" +[[package]] +name = "bytes" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad1f8e949d755f9d79112b5bb46938e0ef9d3804a0b16dfab13aafcaa5f0fa72" + [[package]] name = "cache-padded" version = "1.1.1" @@ -427,9 +433,9 @@ dependencies = [ [[package]] name = "fluvio" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cb9b07c583f2daab342f9f7932d982b15bdfe0fe31df96824efeaa089efdb1b" +checksum = "968a112691b52f72b994250b1fab39571075886add09fe6228c5a7c34119d35c" dependencies = [ "async-channel", "async-mutex", @@ -447,7 +453,7 @@ dependencies = [ "fluvio-types", "futures-util", "once_cell", - "semver 0.11.0", + "semver", "serde", "serde_json", "thiserror", @@ -506,7 +512,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64eb7c1fe99a8d2c7720a8a55cadda7e4971852b1de89ed0b163b664edb7f9e8" dependencies = [ - "bytes", + "bytes 0.5.6", "content_inspector", "crc32c", "fluvio-future", @@ -514,7 +520,7 @@ dependencies = [ "flv-util", "futures-util", "log", - "semver 0.11.0", + "semver", ] [[package]] @@ -534,7 +540,7 @@ dependencies = [ "native-tls", "nix", "openssl", - "pin-project 1.0.2", + "pin-project 1.0.4", "rustls", "thiserror", "tracing", @@ -548,7 +554,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5378886be367e1bbaa5b9400e2ed09aaa768a854787d2e2abd5644dec4119d70" dependencies = [ - "bytes", + "bytes 0.5.6", "fluvio-future", "fluvio-protocol-api", "fluvio-protocol-codec", @@ -575,7 +581,7 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b24aefa4cd4fa185411fd093df2b0cb4114f07eb05fea1f5df15659ec62383a" dependencies = [ - "bytes", + "bytes 0.5.6", "fluvio-protocol-core", "log", "tokio-util", @@ -587,7 +593,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b87eb252268040aa1e9696dbb61fc5b7f61c013db73e8c96e69a5632ad2f6e2" dependencies = [ - "bytes", + "bytes 0.5.6", "log", ] @@ -630,7 +636,7 @@ dependencies = [ "async-mutex", "async-net", "async-trait", - "bytes", + "bytes 0.5.6", "cfg-if 1.0.0", "chashmap", "event-listener", @@ -639,7 +645,7 @@ dependencies = [ "futures-util", "log", "once_cell", - "pin-project 1.0.2", + "pin-project 1.0.4", "thiserror", "tokio", "tokio-util", @@ -652,7 +658,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "725114c2aebc4d4323ff53e6f4ac6378f2d99a85d9de23b82ae3b39d56453274" dependencies = [ - "bytes", + "bytes 0.5.6", "fluvio-dataplane-protocol", "fluvio-protocol", "log", @@ -747,24 +753,24 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" [[package]] name = "futures-channel" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64" +checksum = "f01c61843314e95f96cc9245702248733a3a3d744e43e2e755e3c7af8348a0a9" dependencies = [ "futures-core", ] [[package]] name = "futures-core" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748" +checksum = "db8d3b0917ff63a2a96173133c02818fac4a746b0a57569d3baca9ec0e945e08" [[package]] name = "futures-io" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb" +checksum = "e37c1a51b037b80922864b8eed90692c5cd8abd4c71ce49b77146caa47f3253b" [[package]] name = "futures-lite" @@ -777,15 +783,15 @@ dependencies = [ "futures-io", "memchr", "parking", - "pin-project-lite 0.2.0", + "pin-project-lite 0.2.3", "waker-fn", ] [[package]] name = "futures-macro" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77408a692f1f97bcc61dc001d752e00643408fbc922e4d634c655df50d595556" +checksum = "0f8719ca0e1f3c5e34f3efe4570ef2c0610ca6da85ae7990d472e9cbfba13664" dependencies = [ "proc-macro-hack", "proc-macro2", @@ -795,24 +801,24 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d" +checksum = "f6adabac1290109cfa089f79192fb6244ad2c3f1cc2281f3e1dd987592b71feb" [[package]] name = "futures-task" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d" +checksum = "a92a0843a2ff66823a8f7c77bffe9a09be2b64e533562c412d63075643ec0038" dependencies = [ "once_cell", ] [[package]] name = "futures-util" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2" +checksum = "036a2107cdeb57f6d7322f1b6c363dad67cd63ca3b7d1b925bdf75bd5d96cda9" dependencies = [ "futures-core", "futures-io", @@ -820,26 +826,13 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project 1.0.2", + "pin-project-lite 0.2.3", "pin-utils", "proc-macro-hack", "proc-macro-nested", "slab", ] -[[package]] -name = "generator" -version = "0.6.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cdc09201b2e8ca1b19290cf7e65de2246b8e91fb6874279722189c4de7b94dc" -dependencies = [ - "cc", - "libc", - "log", - "rustc_version", - "winapi 0.3.9", -] - [[package]] name = "getrandom" version = "0.1.16" @@ -881,7 +874,7 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e4728fd124914ad25e99e3d15a9361a879f6620f63cb56bbb08f95abb97a535" dependencies = [ - "bytes", + "bytes 0.5.6", "fnv", "futures-core", "futures-sink", @@ -912,11 +905,11 @@ dependencies = [ [[package]] name = "http" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84129d298a6d57d246960ff8eb831ca4af3f96d29e2e28848dae275408658e26" +checksum = "7245cd7449cc792608c3c8a9eaf69bd4eabbabf802713748fd739c98b82f0747" dependencies = [ - "bytes", + "bytes 1.0.0", "fnv", "itoa", ] @@ -927,7 +920,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" dependencies = [ - "bytes", + "bytes 0.5.6", "http", ] @@ -949,7 +942,7 @@ version = "0.13.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ad767baac13b44d4529fcf58ba2cd0995e36e7b435bc5b039de6f47e880dbf" dependencies = [ - "bytes", + "bytes 0.5.6", "futures-channel", "futures-core", "futures-util", @@ -959,7 +952,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 1.0.2", + "pin-project 1.0.4", "socket2", "tokio", "tower-service", @@ -973,7 +966,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d979acc56dcb5b8dddba3917601745e877576475aa046df3226eabdecef78eed" dependencies = [ - "bytes", + "bytes 0.5.6", "hyper", "native-tls", "tokio", @@ -1089,9 +1082,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.81" +version = "0.2.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb" +checksum = "89203f3fba0a3795506acaad8ebce3c80c0af93f994d5a1d7a0b1eeb23271929" [[package]] name = "log" @@ -1102,19 +1095,6 @@ dependencies = [ "cfg-if 0.1.10", ] -[[package]] -name = "loom" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed" -dependencies = [ - "cfg-if 0.1.10", - "generator", - "scoped-tls", - "serde", - "serde_json", -] - [[package]] name = "matchers" version = "0.0.1" @@ -1411,7 +1391,7 @@ checksum = "4db1a8ccf734a7bce794cc19b3df06ed87ab2f3907036b693c68f56b4d4537fa" dependencies = [ "libc", "rand 0.4.6", - "smallvec 0.6.13", + "smallvec 0.6.14", "winapi 0.3.9", ] @@ -1441,11 +1421,11 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.0.2" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7" +checksum = "95b70b68509f17aa2857863b6fa00bf21fc93674c7a8893de2f469f6aa7ca2f2" dependencies = [ - "pin-project-internal 1.0.2", + "pin-project-internal 1.0.4", ] [[package]] @@ -1461,9 +1441,9 @@ dependencies = [ [[package]] name = "pin-project-internal" -version = "1.0.2" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f" +checksum = "caa25a6393f22ce819b0f50e0be89287292fda8d425be38ee0ca14c4931d9e71" dependencies = [ "proc-macro2", "quote", @@ -1478,9 +1458,9 @@ checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b" [[package]] name = "pin-project-lite" -version = "0.2.0" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c" +checksum = "ba36e0a6cc5a4c645073f4984f1ed55d09f5857d4de7c14550baa81a39ef5a17" [[package]] name = "pin-utils" @@ -1640,9 +1620,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.4.2" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c" +checksum = "d9251239e129e16308e70d853559389de218ac275b515068abc96829d05b948a" dependencies = [ "aho-corasick", "memchr", @@ -1662,9 +1642,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.21" +version = "0.6.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" +checksum = "b5eb417147ba9860a96cfe72a0b93bf88fee1744b5636ec99ab20c1aa9376581" [[package]] name = "remove_dir_all" @@ -1682,7 +1662,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0718f81a8e14c4dbb3b34cf23dc6aaf9ab8a0dfec160c534b3dbca1aaa21f47c" dependencies = [ "base64 0.13.0", - "bytes", + "bytes 0.5.6", "encoding_rs", "futures-core", "futures-util", @@ -1698,7 +1678,7 @@ dependencies = [ "mime_guess", "native-tls", "percent-encoding", - "pin-project-lite 0.2.0", + "pin-project-lite 0.2.3", "serde", "serde_urlencoded", "tokio", @@ -1737,15 +1717,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "rustc_version" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" -dependencies = [ - "semver 0.9.0", -] - [[package]] name = "rustls" version = "0.18.1" @@ -1775,12 +1746,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "scoped-tls" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" - [[package]] name = "sct" version = "0.6.0" @@ -1814,35 +1779,20 @@ dependencies = [ "libc", ] -[[package]] -name = "semver" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" -dependencies = [ - "semver-parser 0.7.0", -] - [[package]] name = "semver" version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" dependencies = [ - "semver-parser 0.10.1", + "semver-parser", ] [[package]] name = "semver-parser" -version = "0.7.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" - -[[package]] -name = "semver-parser" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42ef146c2ad5e5f4b037cd6ce2ebb775401729b19a82040c1beac9d36c7d1428" +checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7" dependencies = [ "pest", ] @@ -1892,12 +1842,11 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4921be914e16899a80adefb821f8ddb7974e3f1250223575a44ed994882127" +checksum = "79c719719ee05df97490f80a45acfc99e5a30ce98a1e4fb67aee422745ae14e3" dependencies = [ "lazy_static", - "loom", ] [[package]] @@ -1927,18 +1876,18 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" [[package]] name = "smallvec" -version = "0.6.13" +version = "0.6.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6" +checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0" dependencies = [ "maybe-uninit", ] [[package]] name = "smallvec" -version = "1.5.1" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae524f056d7d770e174287294f562e95044c68e88dec909a00d2094805db9d75" +checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" [[package]] name = "socket2" @@ -1971,9 +1920,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "syn" -version = "1.0.56" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9802ddde94170d186eeee5005b798d9c159fa970403f1be19976d0cfb939b72" +checksum = "cc60a3d73ea6594cd712d830cc1f0390fd71542d8c8cd24e70cc54cdfd5e05d5" dependencies = [ "proc-macro2", "quote", @@ -2016,9 +1965,9 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +checksum = "bb9bc092d0d51e76b2b19d9d85534ffc9ec2db959a2523cdae0697e2972cd447" dependencies = [ "lazy_static", ] @@ -2055,7 +2004,7 @@ version = "0.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "099837d3464c16a808060bb3f02263b412f6fafcb5d01c533d309985fbeebe48" dependencies = [ - "bytes", + "bytes 0.5.6", "fnv", "futures-core", "iovec", @@ -2095,7 +2044,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ - "bytes", + "bytes 0.5.6", "futures-core", "futures-io", "futures-sink", @@ -2127,7 +2076,7 @@ checksum = "9f47026cdc4080c07e49b37087de021820269d996f581aac150ef9e5583eefe3" dependencies = [ "cfg-if 1.0.0", "log", - "pin-project-lite 0.2.0", + "pin-project-lite 0.2.3", "tracing-attributes", "tracing-core", ] @@ -2197,7 +2146,7 @@ dependencies = [ "serde", "serde_json", "sharded-slab", - "smallvec 1.5.1", + "smallvec 1.6.1", "thread_local", "tracing", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index 66182dc5..7137b181 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,11 @@ crate-type = ["cdylib"] test = false [dependencies] -log = "0.4.8" +log = "0.4.11" serde = { version = "1.0.110", features = ["derive"] } serde_json = "1.0.53" node-bindgen = "4.1.0" -fluvio = { version = "0.3.1", features = ["admin"] } +fluvio = { version = "0.3.2", features = ["admin"] } fluvio-future = { version = "0.1.13", features = ["tls", "task", "io"] } [build-dependencies] diff --git a/src/admin.rs b/src/admin.rs index 45c7b883..30a820c2 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -43,20 +43,20 @@ const RESOLUTION_KEY: &str = "resolution"; const LSR_KEY: &str = "lsr"; const REPLICAS_KEY: &str = "replicas"; -pub struct FluvioAdminWrapper(FluvioAdmin); - -impl FluvioAdminWrapper { - pub fn new(client: FluvioAdmin) -> Self { - Self(client) +impl From for FluvioAdminJS { + fn from(inner: FluvioAdmin) -> Self { + Self { inner: Some(inner) } } } -impl TryIntoJs for FluvioAdminWrapper { +impl TryIntoJs for FluvioAdminJS { fn try_to_js(self, js_env: &JsEnv) -> Result { debug!("converting FluvioWrapper to js"); let new_instance = FluvioAdminJS::new_instance(js_env, vec![])?; debug!("instance created"); - FluvioAdminJS::unwrap_mut(js_env, new_instance)?.set_client(self.0); + if let Some(inner) = self.inner { + FluvioAdminJS::unwrap_mut(js_env, new_instance)?.set_client(inner); + } Ok(new_instance) } } diff --git a/src/connect.rs b/src/connect.rs index cf7c13aa..2276a952 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -1,19 +1,19 @@ use node_bindgen::derive::node_bindgen; use fluvio::{Fluvio, FluvioConfig, FluvioError}; -use crate::fluvio::FluvioWrapper; +use crate::fluvio::FluvioJS; #[node_bindgen()] -async fn connect(host_addr: Option) -> Result { +async fn connect(host_addr: Option) -> Result { match host_addr { Some(host) => { let config = FluvioConfig::new(host); let socket = Fluvio::connect_with_config(&config).await?; - Ok(FluvioWrapper::from(socket)) + Ok(FluvioJS::from(socket)) } None => { let socket = Fluvio::connect().await?; - Ok(FluvioWrapper::from(socket)) + Ok(FluvioJS::from(socket)) } } } diff --git a/src/consumer.rs b/src/consumer.rs index d1ea3e9f..34f12783 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -50,22 +50,14 @@ const VALUE_KEY: &str = "value"; const BATCHES_KEY: &str = "batches"; -pub struct PartitionConsumerWrapper { - client: PartitionConsumer, -} - -impl PartitionConsumerWrapper { - pub fn new(client: PartitionConsumer) -> Self { - Self { client } - } -} - -impl TryIntoJs for PartitionConsumerWrapper { +impl TryIntoJs for PartitionConsumerJS { fn try_to_js(self, js_env: &JsEnv) -> Result { - debug!("converting FluvioWrapper to js"); + debug!("converting PartitionConsumerJS to js"); let new_instance = PartitionConsumerJS::new_instance(js_env, vec![])?; debug!("instance created"); - PartitionConsumerJS::unwrap_mut(js_env, new_instance)?.set_client(self.client); + if let Some(inner) = self.inner { + PartitionConsumerJS::unwrap_mut(js_env, new_instance)?.set_client(inner); + } Ok(new_instance) } } @@ -74,6 +66,12 @@ pub struct PartitionConsumerJS { inner: Option, } +impl From for PartitionConsumerJS { + fn from(inner: PartitionConsumer) -> Self { + Self { inner: Some(inner) } + } +} + #[node_bindgen] impl PartitionConsumerJS { #[node_bindgen(constructor)] @@ -132,6 +130,7 @@ impl PartitionConsumerJS { Err(e) => cb("error".to_owned(), format!("{}", e)), } } + debug!("Stream ended!"); Ok(()) } diff --git a/src/fluvio.rs b/src/fluvio.rs index 9ad443d9..d2591cfd 100644 --- a/src/fluvio.rs +++ b/src/fluvio.rs @@ -1,7 +1,7 @@ use crate::CLIENT_NOT_FOUND_ERROR_MSG; -use crate::admin::FluvioAdminWrapper; -use crate::consumer::PartitionConsumerWrapper; -use crate::producer::TopicProducerWrapper; +use crate::admin::FluvioAdminJS; +use crate::consumer::PartitionConsumerJS; +use crate::producer::TopicProducerJS; use log::debug; @@ -14,21 +14,20 @@ use node_bindgen::core::val::JsEnv; use node_bindgen::sys::napi_value; use node_bindgen::core::JSClass; -// simple wrapper to facilitate conversion to JS Class -pub struct FluvioWrapper(Fluvio); - -impl From for FluvioWrapper { - fn from(client: Fluvio) -> Self { - Self(client) +impl From for FluvioJS { + fn from(inner: Fluvio) -> Self { + Self { inner: Some(inner) } } } -impl TryIntoJs for FluvioWrapper { +impl TryIntoJs for FluvioJS { fn try_to_js(self, js_env: &JsEnv) -> Result { - debug!("converting FluvioWrapper to js"); + debug!("converting FluvioJS to js"); let new_instance = FluvioJS::new_instance(js_env, vec![])?; debug!("instance created"); - FluvioJS::unwrap_mut(js_env, new_instance)?.set_client(self.0); + if let Some(inner) = self.inner { + FluvioJS::unwrap_mut(js_env, new_instance)?.set_client(inner); + } Ok(new_instance) } } @@ -49,10 +48,10 @@ impl FluvioJS { } #[node_bindgen] - async fn admin(&mut self) -> Result { + async fn admin(&mut self) -> Result { if let Some(client) = &mut self.inner { let admin_client = client.admin().await; - Ok(FluvioAdminWrapper::new(admin_client)) + Ok(FluvioAdminJS::from(admin_client)) } else { Err(FluvioError::Other(CLIENT_NOT_FOUND_ERROR_MSG.to_owned())) } @@ -63,9 +62,9 @@ impl FluvioJS { &mut self, topic: String, partition: i32, - ) -> Result { + ) -> Result { if let Some(client) = &mut self.inner { - Ok(PartitionConsumerWrapper::new( + Ok(PartitionConsumerJS::from( client.partition_consumer(topic, partition).await?, )) } else { @@ -74,11 +73,9 @@ impl FluvioJS { } #[node_bindgen] - async fn topic_producer(&mut self, topic: String) -> Result { + async fn topic_producer(&mut self, topic: String) -> Result { if let Some(client) = &mut self.inner { - Ok(TopicProducerWrapper::new( - client.topic_producer(topic).await?, - )) + Ok(TopicProducerJS::from(client.topic_producer(topic).await?)) } else { Err(FluvioError::Other(CLIENT_NOT_FOUND_ERROR_MSG.to_owned())) } diff --git a/src/producer.rs b/src/producer.rs index a8496050..77a9418c 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -11,22 +11,14 @@ use node_bindgen::core::TryIntoJs; use node_bindgen::sys::napi_value; use node_bindgen::core::JSClass; -pub struct TopicProducerWrapper { - client: TopicProducer, -} - -impl TopicProducerWrapper { - pub fn new(client: TopicProducer) -> Self { - Self { client } - } -} - -impl TryIntoJs for TopicProducerWrapper { +impl TryIntoJs for TopicProducerJS { fn try_to_js(self, js_env: &JsEnv) -> Result { debug!("converting FluvioWrapper to js"); let new_instance = TopicProducerJS::new_instance(js_env, vec![])?; debug!("instance created"); - TopicProducerJS::unwrap_mut(js_env, new_instance)?.set_client(self.client); + if let Some(inner) = self.inner { + TopicProducerJS::unwrap_mut(js_env, new_instance)?.set_client(inner); + } Ok(new_instance) } } @@ -35,6 +27,12 @@ pub struct TopicProducerJS { inner: Option, } +impl From for TopicProducerJS { + fn from(inner: TopicProducer) -> Self { + Self { inner: Some(inner) } + } +} + #[node_bindgen] impl TopicProducerJS { #[node_bindgen(constructor)]