
Elasticsearch "Too Many Requests" #73

Closed

fabiomssilva opened this issue Apr 10, 2018 · 16 comments

@fabiomssilva

Hi,

This issue is related to issue #70.

While replaying a large number of events to Elasticsearch, I find that at a certain point Elasticsearch replies with:
HTTP/1.1 429 Too Many Requests

Investigating the issue, I found that all the POST requests to Elasticsearch are issued at essentially the same point in time. I counted over 100 requests in every 10 ms period.

Checking the replay code, I see that all the requests are issued in:

cqrs-eventdenormalizer/lib/definitions/collection.js, lines 475-490

If we change this block to:

const concurrentOps = 5;
async.series([
  function (callback) {
    async.eachLimit(replVmsToDelete, concurrentOps, commit, callback); // changes
  },
  function (callback) {
    async.eachLimit(replVms, concurrentOps, commit, callback); // changes
  }
], function (err) {
  if (err) {
    debug(err);
  }
  self.replayingVms = {};
  self.replayingVmsToDelete = {};
  self.isReplaying = false;
  callback(err);
});

this implements rate control on the requests.
But it would be good to be able to control the rate while instantiating the repository (or in any other suitable place).
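To illustrate what I mean, roughly something like this (the replayConcurrency option is purely hypothetical, nothing like it exists in the lib today; the other repository options are just placeholders):

var denormalizer = require('cqrs-eventdenormalizer')({
  denormalizerPath: __dirname + '/viewBuilders',
  repository: {
    type: 'elasticsearch6',
    host: 'localhost',
    port: 9200,
    replayConcurrency: 5 // hypothetical: max concurrent commits during replay
  }
});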

Is there any possibility to implement this in the lib?

Please let me know.
Thanks
--Fabio

@adrai
Contributor

adrai commented Apr 10, 2018

hmmm.... not so easy

@adrai
Contributor

adrai commented Apr 10, 2018

You can try to override saveReplayingVms in userland first?
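Roughly something like this (an untested sketch, only to show the idea: it assumes saveReplayingVms takes only a callback and that replayingVms / replayingVmsToDelete are id -> vm maps, as the snippet above suggests):

var async = require('async');
var Collection = require('cqrs-eventdenormalizer/lib/definitions/collection');

var concurrentOps = 5;

Collection.prototype.saveReplayingVms = function (callback) {
  var self = this;

  // assumption: these are id -> vm maps, since the original code resets them to {}
  var replVmsToDelete = Object.keys(self.replayingVmsToDelete).map(function (k) {
    return self.replayingVmsToDelete[k];
  });
  var replVms = Object.keys(self.replayingVms).map(function (k) {
    return self.replayingVms[k];
  });

  function commit (vm, clb) {
    self.repository.commit(vm, clb); // node-viewmodel commit respects vm.actionOnCommit
  }

  async.series([
    function (clb) {
      async.eachLimit(replVmsToDelete, concurrentOps, commit, clb);
    },
    function (clb) {
      async.eachLimit(replVms, concurrentOps, commit, clb);
    }
  ], function (err) {
    self.replayingVms = {};
    self.replayingVmsToDelete = {};
    self.isReplaying = false;
    callback(err);
  });
};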

@nanov
Contributor

nanov commented Apr 10, 2018

This definitely sounds like a problem; it could also be solved on the driver side (some kind of batch ops mechanism is better anyway).

I will look into it more deeply in the coming days, as I don't have the time right now; a written test to run against would be a great help.

@fabiomssilva
Author

fabiomssilva commented Apr 10, 2018

I was thinking a bit about the problem.

Question: Does it make sense to change it to something like:

...
async.eachLimit(replVmsToDelete, 1, commit, callback);
...
async.eachLimit(replVms, 1, commit, callback);
...

This will make the replay take a bit longer (3-4 seconds more in my case).

Some benchmarking (one event at a time, 5 at a time, and uncontrolled):
1 at a time - Processed 21144 total events in 49.041s. Bye bye.
5 at a time - Processed 21144 total events in 45.281s. Bye bye.
all at the same time (with errors) - Processed 21144 total events in 41.759s. Bye bye.

The reasoning behind this is:
Because we send all the network requests at the same time, network latency might make the events arrive out of order at the database servers. If we send them one by one, we make sure they arrive in order. Does this make sense?

--Fabio

@nanov
Contributor

nanov commented Apr 10, 2018

I don't think the order is relevant, as those are different VMs. This also means that the error occurs when there are a lot of VMs, not a lot of events (even though VMs result from events, it is important to point that out).

Btw, you could try playing with the "refresh" option in the elasticsearch6 driver (i.e. repository) options. It is true by default; maybe you can see what happens if it is set to refresh: 'wait_for', or the other way around if it is already true.
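Something along these lines (the option names besides refresh are placeholders from memory, check the driver docs):

var repositoryOptions = {
  type: 'elasticsearch6',
  host: 'localhost',
  port: 9200,
  refresh: 'wait_for' // default is true; compare both and see whether the 429s change
};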

Again, the best way to solve this would be with batch operations on the driver side (not only for ES, btw, but for all DBs that support such operations); that way it will be much more efficient and safe.

@adrai
Contributor

adrai commented Apr 10, 2018

@adrai
Contributor

adrai commented Apr 10, 2018

fyi:
I've just extended the viewmodel module to be able to make a bulkCommit: https://github.com/adrai/node-viewmodel#some-implementations-support-bulkcommit
The inmemory and mongodb implementations are already done.
Feel free to add the implementation for elasticsearch => for reference, you can take a look here: https://github.com/adrai/node-viewmodel/blob/master/lib/databases/inmemory.js

With this change, cqrs-eventdenormalizer is now able to call the more performant bulk function: d853967
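For elasticsearch, a bulkCommit could look roughly like this (only a sketch to get started, not the actual driver code: the vm property names follow node-viewmodel conventions, and the constructor name, self.index, self.collectionName and self.options.refresh are assumptions to be checked against the existing elasticsearch6 implementation):

// sketch only, untested
Elasticsearch.prototype.bulkCommit = function (vms, callback) {
  var self = this;

  // build the alternating action/document body expected by the ES bulk API
  var body = [];
  vms.forEach(function (vm) {
    if (vm.actionOnCommit === 'delete') {
      body.push({ delete: { _index: self.index, _type: self.collectionName, _id: vm.id } });
    } else {
      body.push({ index: { _index: self.index, _type: self.collectionName, _id: vm.id } });
      body.push(vm.attributes);
    }
  });

  if (body.length === 0) return callback(null);

  self.client.bulk({ body: body, refresh: self.options.refresh }, function (err) {
    callback(err);
  });
};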

@nanov
Contributor

nanov commented Apr 11, 2018

This is great, exactly what I had in mind!

I am on the ES implementation!

@fabiomssilva
Author

Hi,

Do you need any help?
I have a consistent way to reproduce it, so I can help with the tests.
Or if you need help on the implementation side, I can try to take a look.

-fS

@adrai
Contributor

adrai commented Apr 11, 2018

The elasticsearch6 implementation is included in v1.14.3, thanks to @nanov

@nanov
Contributor

nanov commented Apr 11, 2018

There is still some work to be done in order to ensure writes with a huge number of VMs and to provide more accurate errors.

The first step would be to write a test for a bulkCommit of 1000+ VMs and take it from there.
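Something along these lines (a rough sketch of the shape of such a test, mocha + expect.js style; the names and setup are assumptions, not code from the repo, and repo is the repository instance created by the surrounding suite):

var async = require('async');
var expect = require('expect.js');

it('bulkCommit should handle 1000+ vms', function (done) {
  this.timeout(60000);
  // build 1500 viewmodels via the repository
  async.times(1500, function (i, clb) {
    repo.get('vm' + i, function (err, vm) {
      if (err) return clb(err);
      vm.set('value', i);
      clb(null, vm);
    });
  }, function (err, vms) {
    if (err) return done(err);
    repo.bulkCommit(vms, function (err) {
      if (err) return done(err);
      // verify one of them came back from the store
      repo.get('vm1499', function (err, vm) {
        if (err) return done(err);
        expect(vm.get('value')).to.eql(1499);
        done();
      });
    });
  });
});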

@fabiomssilva
Author

fabiomssilva commented Apr 11, 2018

Hi Gents, :)

I tried the change and ran into trouble.

I debugged a bit, and it looks like when a bulk operation is made with an empty array of VMs, it fails both in Mongo and in Elasticsearch.
I added a check for empty arrays (see --1-- below) and it looks like it is working now. Does my code make sense?

After doing this everything runs, but I started getting "ConcurrencyError" on some events. I could use some advice on this one :)

There was also a typo in this function: vm -> vms (check --2--)

--1--
async.series([
  function (callback) {
    if (self.repository.bulkCommit) {
      // skip the bulk call entirely for an empty array
      if (replVmsToDelete.length === 0) {
        callback();
        return;
      }
      bulkCommit(replVmsToDelete, callback);
      return;
    }
    async.each(replVmsToDelete, commit, callback);
  },
  function (callback) {
    if (self.repository.bulkCommit) {
      // same guard for the vms to commit
      if (replVms.length === 0) {
        callback();
        return;
      }
      bulkCommit(replVms, callback);
      return;
    }
    async.each(replVms, commit, callback);
  }
], function (err) {
  if (err) {
    debug(err);
  }
  self.replayingVms = {};
  self.replayingVmsToDelete = {};
  self.isReplaying = false;
  callback(err);
});

--2--

function bulkCommit (vms, callback) {
  self.repository.bulkCommit(prepareVmsForBulkCommit(vms), function (err) {
    if (err) {
      debug(err);
      debug(vms); // Typo was here. 
    }
    callback(err);
  });
}

@adrai
Contributor

adrai commented Apr 11, 2018

try v1.14.4

@fabiomssilva
Author

Hi,

Thanks. We are checking :)

So far so good :)

Thanks
--fS

@nanov
Contributor

nanov commented Apr 16, 2018

@fabiomssilva, you mentioned you got some ConcurrencyErrors; is that an issue with the implementation, or were the errors legitimate?

I did some testing over the weekend, and there are two things that I want to implement this week in order to make the driver more stable in high-concurrency scenarios.

One would be to set a maximum bulk operation size and, when it is exceeded, split the operation into chunks.

The second would be to implement some sort of buffering mechanism for normal event handling, so that the driver can handle a huge number of concurrent events. The way I thought this could be implemented is by setting a max capacity and a timeout; the operations are then executed (in a bulk manner) when the capacity is reached OR the timeout is exceeded.
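Roughly what I have in mind (sketch only, names and defaults are illustrative, not driver code):

function BulkBuffer (repo, maxSize, maxWaitMs) {
  this.repo = repo;             // a repository that supports bulkCommit
  this.maxSize = maxSize || 500;
  this.maxWaitMs = maxWaitMs || 100;
  this.pending = [];
  this.callbacks = [];
  this.timer = null;
}

// queue a vm; flush when the capacity is reached or the timeout fires
BulkBuffer.prototype.add = function (vm, callback) {
  this.pending.push(vm);
  this.callbacks.push(callback);
  if (this.pending.length >= this.maxSize) return this.flush();
  if (!this.timer) {
    this.timer = setTimeout(this.flush.bind(this), this.maxWaitMs);
  }
};

BulkBuffer.prototype.flush = function () {
  var vms = this.pending;
  var callbacks = this.callbacks;
  this.pending = [];
  this.callbacks = [];
  if (this.timer) {
    clearTimeout(this.timer);
    this.timer = null;
  }
  if (vms.length === 0) return;
  // commit everything collected so far in one bulk operation
  this.repo.bulkCommit(vms, function (err) {
    callbacks.forEach(function (clb) { clb(err); });
  });
};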

@fabiomssilva
Author

Hi,

From what I can tell, it is fixed with the latest patch.
I don't see any problems anymore.

Thanks a lot.
--fS
