Skip to content

Commit

Permalink
Fix ServerInitiatedDirective to reuse the old connection
Browse files Browse the repository at this point in the history
  • Loading branch information
이병준 committed Oct 17, 2024
1 parent e057044 commit ec799b8
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,21 @@ internal class GrpcTransport internal constructor(
listener: MessageSender.OnSendMessageListener
) = Grpc2Call(activeTransport, request, headers, callOptions, listener)

override fun startDirectivesService() {
executor.submit {
deviceGatewayClient?.let {
override fun startDirectivesService(): Boolean {
checkAuthorizationIfEmpty {
Logger.w(TAG, "[startDirectivesService] authorization is empty")
} ?: return false

deviceGatewayClient?.let {
executor.submit {
setState(DetailedState.RECONNECTING, ChangedReason.SERVER_ENDPOINT_CHANGED)
it.startDirectivesService()
} ?: Logger.w(TAG, "[startDirectivesService] deviceGatewayClient is not initialized")
}
} ?: kotlin.run {
Logger.w(TAG, "[startDirectivesService] deviceGatewayClient is not initialized")
return false
}
return true
}

override fun stopDirectivesService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ internal class DeviceGatewayClient(policy: Policy,
private val scheduler = Executors.newSingleThreadScheduledExecutor()

private var pendingHeaders: Map<String, String>? = null
private val pendingStopDirectivesService = AtomicBoolean(false)
private val pendingStop = AtomicBoolean(false)
/**
* Set a policy.
* @return the ServerPolicy
Expand Down Expand Up @@ -109,12 +109,13 @@ internal class DeviceGatewayClient(policy: Policy,
return false
}
}
if(!isStartReceiveServerInitiatedDirective()) {
handleOnConnected()
return true
}
buildDirectivesService()
}

if(!isStartReceiveServerInitiatedDirective()) {
handleOnConnected()
return true
}
buildDirectivesService()
return true
}
/**
Expand Down Expand Up @@ -382,40 +383,42 @@ internal class DeviceGatewayClient(policy: Policy,

private fun buildDirectivesService() {
Logger.d(TAG, "[buildDirectivesService] currentChannel=$currentChannel")

pingService?.shutdown()
directivesService?.shutdown()
currentChannel?.apply {
directivesService =
DirectivesService(
streamingCalls,
this,
this@DeviceGatewayClient
)
pingService =
PingService(
this,
healthCheckPolicy,
this@DeviceGatewayClient
)
synchronized(this) {
pingService?.shutdown()
directivesService?.shutdown()
currentChannel?.apply {
directivesService =
DirectivesService(
streamingCalls,
this,
this@DeviceGatewayClient
)
pingService =
PingService(
this,
healthCheckPolicy,
this@DeviceGatewayClient
)
}
}
}

override fun stopDirectivesService() {
val requests = eventsService?.requests() ?: 0
Logger.d(TAG, "[stopDirectivesService] requests=$requests")
if(requests != 0) {
pendingStopDirectivesService.set(true)
Logger.i(TAG, "[stopDirectivesService] pending stop, requests=$requests")
pendingStop.set(true)
} else {
Logger.d(TAG, "[stopDirectivesService]")
processDisconnect()
processConnection()
}
}

override fun onRequestCompleted() {
if( eventsService?.requests() == 0 /* no more requests */ ) {
if (pendingStopDirectivesService.compareAndSet(true, false)) {
Logger.w(TAG, "[onRequestCompleted] A pending StopDirectivesService is performed.")
if (pendingStop.compareAndSet(true, false)) {
Logger.i(TAG, "[onRequestCompleted] A pending stop is performed.")
processDisconnect()
processConnection()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,12 @@ internal class HTTP2Transport(
listener: MessageSender.OnSendMessageListener
) = HTTP2Call(scheduler, activeTransport, request, headers, listener)

override fun startDirectivesService() {
override fun startDirectivesService() : Boolean {
deviceGatewayClient?.let {
setState(DetailedState.RECONNECTING, ChangedReason.SERVER_ENDPOINT_CHANGED)
it.startDirectivesService()
} ?: Logger.w(TAG, "[startDirectivesService] deviceGatewayClient is not initialized")
return true
}

override fun stopDirectivesService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,14 @@ class MessageRouter(
sidController.release()
}
sidController.setOnCompletionListener(onCompletion)
sidController.start()

createActiveTransport()
if(!sidController.start(activeTransport)) {
setConnectionStatus(
ConnectionStatusListener.Status.CONNECTING,
ConnectionStatusListener.ChangedReason.CLIENT_REQUEST
)
createActiveTransport()
}
Logger.i(TAG, "[start] ServerInitiatedDirective start.")
return true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean

class ServerInitiatedDirectiveController(val TAG: String) {
private var isStart = AtomicBoolean(false)
private var initialized = false
private var completionListenerCalled = AtomicBoolean(false)
private var listener: (() -> Unit)? = null

Expand All @@ -37,16 +38,30 @@ class ServerInitiatedDirectiveController(val TAG: String) {

/**
* Start the DirectivesService
* @return An initialized true or false.
*/
fun start() {
fun start(transport: Transport?): Boolean {
var initialized = initialized
if (isStart.get()) {
Logger.w(TAG, "[start] ServerInitiatedDirective is already started.")
return
return initialized
}
isStart.set(true)
completionListenerCalled.set(false)

Logger.i(TAG, "[start] ServerInitiatedDirective started")

if(initialized) {
if(transport?.startDirectivesService() != true) {
initialized = false
Logger.w(
TAG,
"[start] activeTransport is not possible."
)
}
}
return initialized.also {
this.initialized = true
}
}

fun stop(transport: Transport?) {
Expand All @@ -69,6 +84,7 @@ class ServerInitiatedDirectiveController(val TAG: String) {

fun release() {
listener = null
initialized = false
isStart.set(false)
completionListenerCalled.set(false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ interface Transport {
/**
The server can send some directives at certain times.
*/
fun startDirectivesService() = Unit
fun startDirectivesService() : Boolean

/**
Stop receiving server-initiated-directive.
Expand Down

0 comments on commit ec799b8

Please sign in to comment.