[DRAFT PR] POC code to batch async shards fetch per node #7269
My understanding of the flow:
We're making two calls:
- first, shards_started: to know which nodes a particular shard is present on
- second, shard metadata: to get the metadata of the shard files

Here we're calling both APIs at the shard level. Since we need to do this asynchronously, we have fetchers stored in a concurrent map, one per shard. One fetcher is responsible for fetching the data for one shard, which is why the current maps are keyed by shardId.
Now we want to move this fetching to the node level to support batching. For that, you are adding a node-level fetcher (AsyncShardsFetchPerNode).
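The keying change described above can be sketched with a simplified, hypothetical example (the real classes are `AsyncShardFetch` and `AsyncShardsFetchPerNode`; everything here is a stand-in to illustrate only the map structure):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FetcherKeying {
    // Current design: one fetcher per shard, so the map is keyed by shardId.
    static final Map<String, Object> fetchersByShardId = new ConcurrentHashMap<>();

    // Proposed design: one batched fetcher per node, keyed by nodeId, where
    // each fetcher covers all shards that need data from that node.
    static final Map<String, List<String>> shardBatchByNodeId = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        fetchersByShardId.put("[index][0]", new Object());
        fetchersByShardId.put("[index][1]", new Object());
        shardBatchByNodeId.put("node-1", List.of("[index][0]", "[index][1]"));
        // Per-shard: as many fetchers as shards; per-node: one batch per node.
        System.out.println(fetchersByShardId.size() + " shard fetchers vs "
            + shardBatchByNodeId.size() + " node batch(es)");
    }
}
```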
Comments/questions on strategy:
- We should change the strategy for both of the calls, not just one.
- Without changing ReplicaShardAllocator, how are we ensuring that the metadata fetch is done at the node level?
- AllocationService calls allocateUnassigned in a for loop for every shard, which calls GatewayAllocator, and ultimately PSA's makeAllocationDecision. I want to understand how this flow becomes per-node without changing AllocationService at all.
Comments on code:
- Please don't copy-paste a whole class, even for a draft PR; reuse existing code and move it to a common place so it's easy to see exactly what changed.
- Make holistic changes so the Gradle check doesn't fail; if the change breaks any existing behavior, please fix that first.
- Please try to convey the impact of the change: how do we benefit from a reduced number of transport calls while moving almost the same amount of data?
- Let's try not to change the order of the flow: whatever happens after allocation should keep happening in that order, and the same for before allocation.
```java
     * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from
     * it nodes that are no longer part of the state.
     */
    private void fillShardCacheWithDataNodes(Map<String, AsyncShardsFetchPerNode.NodeEntry<T>> shardCache, DiscoveryNodes nodes) {
```
Is the cache still at the node level?
I'm assuming the cache object exists per NodeLevelFetcher, so it should contain data for every shard; in that case, shardId should be the new key.
How are we keeping the map structure the same when the whole class is new and does node-level fetching?
Thanks @Gaurav614 for taking up this change. Comments on the strategy and overall code changes (not reviewing the code, just the strategy):
```java
     * An action that lists the relevant shard data that needs to be fetched.
     */
    public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
        void list(DiscoveryNode[] nodes, Map<ShardId, String> shardsIdMap, ActionListener<NodesResponse> listener);
```
Shouldn't this take a single node and a list/map of shards?
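The reviewer's suggested shape could look roughly like the following hedged sketch. The types are simplified stand-ins (String for DiscoveryNode/ShardId, Consumer for ActionListener), not the actual OpenSearch API:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class NodeLevelLister {
    // Node-level variant: one node plus the batch of shards to fetch from it,
    // instead of an array of nodes for a single shard.
    interface Lister<R> {
        void list(String node, Map<String, String> shardIdToCustomDataPath, Consumer<R> listener);
    }

    public static void main(String[] args) {
        Lister<List<String>> lister = (node, shards, listener) ->
            // one transport call to `node`, covering every shard in the batch
            listener.accept(List.copyOf(shards.keySet()));

        lister.list("node-1", Map.of("shard-0", "", "shard-1", ""),
            fetched -> System.out.println("fetched " + fetched.size() + " shard entries from one node"));
    }
}
```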
```java
     * A node entry, holding the state of the fetched data for a specific shard
     * for a given node.
     */
    static class NodeEntry<T> {
```
Shouldn't this be a ShardEntry class now that the top-level map is keyed by nodeId?
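The suggested rename amounts to something like this hedged sketch: with the outer cache keyed by nodeId, each inner entry tracks per-shard fetch state, so `ShardEntry` reads more naturally. All field and type names here are illustrative only:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ShardEntrySketch {
    // Per-shard fetch state held under a node's cache entry.
    static class ShardEntry<T> {
        private T value;
        private Throwable failure;
        private boolean fetching = true;

        void doneFetching(T value) { this.value = value; this.fetching = false; }
        void setFailure(Throwable t) { this.failure = t; this.fetching = false; }
        boolean fetching() { return fetching; }
        T value() { return value; }
        Throwable failure() { return failure; }
    }

    // nodeId -> (shardId -> per-shard fetch state)
    static final Map<String, Map<String, ShardEntry<String>>> cache = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        ShardEntry<String> entry = new ShardEntry<>();
        entry.doneFetching("shard metadata");
        cache.computeIfAbsent("node-1", n -> new ConcurrentHashMap<>()).put("shard-0", entry);
        System.out.println(cache.get("node-1").get("shard-0").value());
    }
}
```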
```java
    }

    @Override
    public void beforeAllocation(final RoutingAllocation allocation) {
        assert primaryShardAllocator != null;
        assert replicaShardAllocator != null;
        ensureAsyncFetchStorePrimaryRecency(allocation);

        // build the view of shards per node here by doing transport calls on nodes and populate shardsPerNode
        collectShardsPerNode(allocation);
```
beforeAllocation should be lightweight. This should happen during allocation logic execution.
@shwetathareja @amkhar Thanks a ton for taking the time to review and provide valuable insights.
@amkhar We can't do this. One call is used for primary shard allocation and the other in RSA. Currently the algorithm first assigns primaries and then replicas.
@amkhar The code changes are not for RSA right now; the focus is on PSA. We will add a new transport action, like the one we added for the data PSA needs.
So the idea is: AllocationService calls beforeAllocation() once, and I am using that hook to create the view (the data PSA needs). This way, when AllocationService calls GatewayAllocator to assign the shards, we already have the data collected (if not, we just bail out of that allocation), and when PSA calls fetchData to make the allocation decision, we return the data from our view, adapted to what PSA currently understands. And as @shwetathareja noted, visual diagrams need to be added for better understanding; I am working on them and will add them soon.
Do you have any idea how to convey that? The general observation has been that in large clusters with a high number of shards, the transport layer gets choked by the async fetches. With this change there will be less stress on transport.
Nothing is changing after allocation; we use that phase to clear our caches and data structures, just as the existing code does. In beforeAllocation we add code to populate the view in the GatewayAllocator. Since AllocationService calls GatewayAllocator for all unassigned shards in a loop, we don't want to populate the view (collectShardsPerNode) on every call; that's why it's better to build the view before allocation starts (@shwetathareja, that answers your inline comment).
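The populate-once, read-many flow described above can be sketched as follows. The method names mirror the PR's wording (`collectShardsPerNode`, `shardsPerNode`), but the bodies are illustrative stand-ins, not the actual implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BatchedViewFlow {
    // nodeId -> fetched shard metadata blob (stand-in type)
    static final Map<String, String> shardsPerNode = new ConcurrentHashMap<>();

    // Called once per allocation round (in beforeAllocation), not once per shard.
    static void collectShardsPerNode() {
        shardsPerNode.put("node-1", "metadata for all shards on node-1");
    }

    // Called per unassigned shard by the allocator; a cheap map lookup only,
    // bailing out when the batched data has not arrived yet.
    static String makeAllocationDecision(String nodeId) {
        String data = shardsPerNode.get(nodeId);
        return data == null ? "BAIL_OUT" : "DECIDE_WITH:" + data;
    }

    public static void main(String[] args) {
        System.out.println(makeAllocationDecision("node-1")); // view not built yet
        collectShardsPerNode();
        System.out.println(makeAllocationDecision("node-1")); // uses the batched view
    }
}
```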
@shwetathareja Tried to capture the details in the flow diagram; updated the description.
This PR is stalled because it has been open for 30 days with no activity. Remove stalled label or comment or this will be closed in 7 days.
This PR was closed because it has been stalled for 7 days with no activity.
Description

This is a draft PR to discuss the idea of batching the `AsyncShardFetch` used in `GatewayAllocator`.

In the current flow, `PrimaryShardAllocator`/`ReplicaShardAllocator` (PSA/RSA) call `GatewayAllocator`, which makes transport calls per shard to the N data nodes to asynchronously fetch the metadata of shards from all nodes. This information is then used by PSA/RSA to make the allocation decision. Currently `GatewayAllocator` maintains two such maps, `asyncFetchStarted` and `asyncFetchStore`, to store the async data. So we make M (number of unassigned shards) * N (number of data nodes) transport calls to assign the unassigned shards.
This POC batches the async shard transport calls per node and maintains that view of shard metadata from all nodes in `GatewayAllocator`. The changes try to replace one of the current maps, `asyncFetchStarted`, whose data (`TransportNodesListGatewayStartedShards.NodeGatewayStartedShards`) is used primarily in PSA, with `shardsPerNode` (the naming can be improved) to maintain the same data as a list (`TransportNodesCollectGatewayStartedShard.ListOfNodeGatewayStartedShards`) per node. Please note the changes are done crudely (just to convey the idea), reusing code from the current classes; these are draft changes.
Flow diagrams:
Simpler view:
What's changing:
Current transport calls (diagram)
Optimized transport calls (diagram)
List of changes:
- `GatewayAllocator`: added a new map `shardsPerNode` to replace `asyncFetchStarted`. The map contains the current view of the fetched data of unassigned shards from all nodes; this data, after adaptation via `AsyncShardFetch`, is then sent to PSA.
- `beforeAllocation()`: added `collectShardsPerNode(allocation)` as a responsibility. `beforeAllocation()` is called by `AllocationService` before starting the allocation, which helps populate `shardsPerNode` so that when the allocation decision is taken in `allocateUnassigned`, the data is readily available to PSA.
- `collectShardsPerNode()`: task to build the view of shard metadata from all nodes.
- `TestAsyncShardFetch`: inner class analogous to `InternalAsyncFetch`.
- `TestInternalPrimaryShardAllocator`: inner class analogous to `InternalPrimaryShardAllocator`. The `fetchData()` here does not actually do the heavy lifting of fetching; rather, it adapts the current data in `shardsPerNode` to the `AsyncShardFetch.FetchResult` that PSA understands. This minimizes the scope of changes in the POC; we can later revamp PSA/RSA to make the allocation decision based on the batched data.
- Added a new transport action, `TransportNodesCollectGatewayStartedShard`, analogous to `TransportNodesListGatewayStartedShards`. This is added just for simplicity of the code; we can think of merging it into a single transport action.
- Added a new `AsyncShardsFetchPerNode` class to track the async fetch of shards (`shardsToCustomDataPathMap`) from all nodes. It is analogous to the `AsyncShardFetch` class, which does the same thing per shard. We can later remove this class or merge it into `AsyncShardFetch`, depending on approval of the POC.

Testing
Did a happy-path test manually by spinning up two data nodes and a single master with 16 shards. Restarted the data nodes, and the shards went from Unassigned to Assigned.
Also tested the number of transport calls made by adding a log line in `nodeOperation`, and verified that a single transport call per node is made (when the set of unassigned shards is not updated). So that's a reduction of transport calls from M*N to N.
Issues Resolved
#5098
This might be an extension of the above issue: it may not completely solve the memory issue, but it addresses the transport-call optimization part.
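As a footnote on the `fetchData()` adaptation mentioned in the list of changes, a rough sketch of the idea: instead of performing transport calls, the adapter slices the already-built per-node view into the per-shard shape PSA expects. All types here are simplified stand-ins for the real OpenSearch classes:

```java
import java.util.HashMap;
import java.util.Map;

public class FetchDataAdapter {
    // nodeId -> (shardId -> started-shard metadata), built once in beforeAllocation()
    static final Map<String, Map<String, String>> shardsPerNode = new HashMap<>();

    // Per-shard view PSA understands: nodeId -> metadata for one shard.
    // No transport calls happen here, only a reshaping of cached data.
    static Map<String, String> fetchData(String shardId) {
        Map<String, String> perShard = new HashMap<>();
        shardsPerNode.forEach((nodeId, shards) -> {
            String metadata = shards.get(shardId);
            if (metadata != null) perShard.put(nodeId, metadata);
        });
        return perShard;
    }

    public static void main(String[] args) {
        shardsPerNode.put("node-1", Map.of("shard-0", "primary-md", "shard-1", "replica-md"));
        shardsPerNode.put("node-2", Map.of("shard-0", "replica-md"));
        System.out.println(fetchData("shard-0")); // metadata from both nodes
    }
}
```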
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.