Skip to content

Commit

Permalink
Allow batch write retries for aerospike (#2476)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker authored Apr 4, 2024
1 parent 3642fdc commit 6829bd6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 16 additions & 3 deletions dozer-sink-aerospike/src/aerospike.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,19 +255,23 @@ impl Client {
})
}

pub(crate) fn config(&self) -> &as_config {
unsafe { &(*self.inner.as_ptr()).config }
}

pub(crate) unsafe fn write_batch(
&self,
batch: *mut as_batch_records,
policy: Option<*const as_policy_batch>,
) -> Result<(), AerospikeError> {
debug!(target: "aerospike_sink", "Writing batch of size {}", batch.as_ref().unwrap().list.size);

let started = Instant::now();
let policy = self.inner.as_ref().config.policies.batch;
as_try(|err| {
aerospike_batch_write(
self.inner.as_ptr(),
err,
&policy as *const as_policy_batch,
policy.unwrap_or(std::ptr::null()),
batch,
)
})?;
Expand Down Expand Up @@ -1210,7 +1214,16 @@ impl<'a> WriteBatch<'a> {
}

pub(crate) fn execute(mut self) -> Result<(), AerospikeError> {
unsafe { self.client.write_batch(self.inner.take().unwrap().as_ptr()) }
let config = self.client.config();
let mut policy = config.policies.batch;
policy.base.max_retries = 2;
policy.base.sleep_between_retries = 1000;
unsafe {
self.client.write_batch(
self.inner.take().unwrap().as_ptr(),
Some((&policy) as *const as_policy_batch),
)
}
}
}

Expand Down

0 comments on commit 6829bd6

Please sign in to comment.