Skip to content

Commit

Permalink
support for indirect backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
splitice committed Sep 13, 2023
1 parent 64cf56c commit 9cf830a
Showing 1 changed file with 35 additions and 7 deletions.
42 changes: 35 additions & 7 deletions lib/Af.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,22 @@ class Af extends EventEmitter {
let shouldResend = false
let signalTimeout = Q.defer()
try {
let areqCancelable
let areqCancelable = Q.defer()
areqCancelable._cancels = new Set()
areqCancelable.cancel = function(){
for(const c in this._cancels){
c.cancel()
}
this._cancels = new Set()
}
areqCancelable.promise.catch(() => { })

const startAreq = async () => {
let cnf

do {
/* resend without src rtg */
cnf = await areqCancelable
cnf = await areqCancelable.promise
cnf = cnf[0]
if (cnf.status === 0 || cnf.status === 'SUCCESS') { // success
this.emit('ind:dataConfirm', { dstEp, afParams });
Expand Down Expand Up @@ -300,9 +308,6 @@ class Af extends EventEmitter {

let isResend = false
const afSend = async () => {
areqCancelable = this.waitFor(afEventCnf, areqTimeout)
areqCancelable.catch(() => { })

let rsp
if (!isBroadcast && dstEp.getSrcRtg && !isResend) {
const srcRtg = dstEp.getSrcRtg()
Expand All @@ -312,12 +317,35 @@ class Af extends EventEmitter {
options |= ZSC.AF.options.SKIP_ROUTING
}
const newAfParams = Object.assign(Af.buildAfSrcRtg(srcRtg), afParams, { options })
rsp = await controller.request('AF', 'dataRequestSrcRtg', newAfParams)
do {
let wf = this.waitFor(afEventCnf, areqTimeout)
areqCancelable._cancels.add(wf)
rsp = await controller.request('AF', 'dataRequestSrcRtg', newAfParams)
if(rsp.status == 178) { // ZApsTableFull
wf.cancel()
areqCancelable.delete(wf)
await Q.delay(Math.random() + 0.5 * 16000)
} else {
wf.then(areqCancelable.resolve, areqCancelable.reject)
}
} while(rsp.status === 178) // ZApsTableFull
shouldResend = true
}
}
if (!rsp) {
rsp = await controller.request('AF', 'dataRequest', afParams) // status code handled by catch
do {
let wf = this.waitFor(afEventCnf, areqTimeout)
areqCancelable._cancels.add(wf)
rsp = await controller.request('AF', 'dataRequest', afParams) // status code handled by catch
if(rsp.status == 178) { // ZApsTableFull
wf.cancel()
await Q.delay(Math.random() + 0.5 * 16000)
areqCancelable.delete(wf)
} else {
wf.then(areqCancelable.resolve, areqCancelable.reject)
}
} while(rsp.status === 178) // ZApsTableFull

}
isResend = true
}
Expand Down

0 comments on commit 9cf830a

Please sign in to comment.