-
Notifications
You must be signed in to change notification settings - Fork 110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize Parcel Resumption and Reduce Blocking in ChainPorter #1068
Conversation
This commit moves the WaitGroup.Done() call closer to the corresponding WaitGroup.Add(1) call. The purpose of this change is to group the goroutine management code together, making it easier to read and reducing the risk of forgetting to decrement the WaitGroup counter.
This commit introduces a buffer to the ChainPorter.outboundParcels channel. By adding this buffer, the system can handle new parcels without being blocked by resumed pending parcels, improving overall efficiency and reducing potential delays.
Resume any pending parcels in a new goroutine so that we don't delay returning from the `ChainPorter.Start` method.
@@ -132,7 +132,7 @@ func NewChainPorter(cfg *ChainPorterConfig) *ChainPorter { | |||
) | |||
return &ChainPorter{ | |||
cfg: cfg, | |||
outboundParcels: make(chan Parcel), | |||
outboundParcels: make(chan Parcel, 10), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want to leave this as unbuffered? Otherwise we would end up trying to start the state machine with multiple packets at once:
taproot-assets/tapfreighter/chain_porter.go
Line 318 in 1b5a4ef
case outboundParcel := <-p.outboundParcels: |
p.Wg.Add(1) | ||
go func() { | ||
defer p.Wg.Done() | ||
startErr = p.resumePendingParcels() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think resumePendingParcels()
needs to be modified to receive Quit signals?
Like this case:
taproot-assets/tapfreighter/chain_porter.go
Line 330 in 1b5a4ef
case <-p.Quit: |
But here:
taproot-assets/tapfreighter/chain_porter.go
Line 194 in 1b5a4ef
p.outboundParcels <- pendingParcel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it even a problem if we delay startup by waiting for pending parcels? Anything that takes a while (e.g. proof transfer) will be done in a goroutine anyway. So I don't see a pressing reason to do things async here.
Also, if we do this in another goroutine, we don't need the buffered channel as already mentioned by @jharveyb. I think it just makes the behavior less deterministic (e.g. with 9 parcels the goroutine finishes almost immediately but with 11 it blocks until complete)...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC the start of each 'subsystem' is sequential and blocking on any one Start()
call would block the rest?
Line 185 in a8d8b5a
if err := s.cfg.ChainPorter.Start(); err != nil { |
Even with proof transfer in a goroutine, if we're resuming more than one parcel I think the transfer process for the first one would block resumption of the second? And these wouldn't happen in parallel.
The concrete case I was thinking of was: "If I have a wide transfer (with many recipients), what happens on restart?"
I think they would attempt to be transferred, sequentially, and any issues with proof upload at that point would block the caretaker startup. Maybe I'm wrong about which errors would cause which functions to block though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already spin up a goroutine for each individual parcel:
taproot-assets/tapfreighter/chain_porter.go
Line 337 in 9ac1baa
go p.advanceState(sendPkg, outboundParcel.kit()) |
So at startup, because the main event loop already is a goroutine and just starts another one for each parcel, we can feed in the parcels to resume synchronously and block on that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, so we can handle parcels in parallel then?
In that case, IIUC, the only startup delay would be calling advanceState()
for each resumed parcel, which should be fast.
And then we actually don't need to modify the current behavior, and the problem I was thinking of doesn't exist.
I'm not sure if we have an existing test that handles multiple parcels at once, that would be good to have to validate this.
@@ -311,8 +314,6 @@ func (p *ChainPorter) QueryParcels(ctx context.Context, | |||
// requests, and attempt to complete a transfer. A response is sent back to the | |||
// caller if a transfer can be completed. Otherwise, an error is returned. | |||
func (p *ChainPorter) mainEventLoop() { | |||
defer p.Wg.Done() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add a NOTE
comment to this method that it MUST be run as a goroutine.
Normally, the defer p.Wg.Done()
at the start of a method indicates this to someone reading the code.
p.Wg.Add(1) | ||
go func() { | ||
defer p.Wg.Done() | ||
startErr = p.resumePendingParcels() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it even a problem if we delay startup by waiting for pending parcels? Anything that takes a while (e.g. proof transfer) will be done in a goroutine anyway. So I don't see a pressing reason to do things async here.
Also, if we do this in another goroutine, we don't need the buffered channel as already mentioned by @jharveyb. I think it just makes the behavior less deterministic (e.g. with 9 parcels the goroutine finishes almost immediately but with 11 it blocks until complete)...
Closing, see #1068 (comment) |
It would still be good to have an itest that exercised having multiple transfers running concurrently. A simple way to get that would be just delaying mining a block; so submit a transfer, then a second transfer - both should end up waiting for the confirmation, but proceed as normal after a block is mined. |
@jharveyb good idea! I'll spin that into an issue. |
issue: #1081 |
This PR addresses this concern by @jharveyb : #1055 (comment)
Changes
Improved Goroutine Management: The
WaitGroup.Done()
call has been repositioned to align closely with the correspondingWaitGroup.Add(1)
call, making the code easier to read and reducing the risk of errors.Buffered Channel for Outbound Parcels: A buffer has been added to the
outboundParcels
channel, allowing the system to handle new parcels without being blocked by pending ones, which enhances overall performance.Concurrent Resumption of Pending Parcels: The
ChainPorter.resumePendingParcels
method is now executed in a separate goroutine, ensuring that resuming pending parcels does not delay the startup process.