Skip to content

Commit

Permalink
Improve timeout logic #2515
Browse files Browse the repository at this point in the history
  • Loading branch information
이병준 committed Aug 20, 2024
1 parent 50140a2 commit abe1600
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,55 @@ import com.skt.nugu.sdk.core.interfaces.message.MessageSender
import com.skt.nugu.sdk.core.interfaces.transport.Transport
import com.skt.nugu.sdk.core.interfaces.message.Status
import com.skt.nugu.sdk.core.interfaces.message.Status.Companion.withDescription
import com.skt.nugu.sdk.core.interfaces.transport.CallOptions
import com.skt.nugu.sdk.core.utils.Logger
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import com.skt.nugu.sdk.core.interfaces.message.Call as MessageCall

internal class Grpc2Call(val transport: Transport?, val request: MessageRequest, val headers: Map<String, String>?, listener: MessageSender.OnSendMessageListener) :
MessageCall {
internal class Grpc2Call(
val transport: Transport?,
val request: MessageRequest,
private val headers: Map<String, String>?,
private val callOptions: CallOptions?,
listener: MessageSender.OnSendMessageListener
) : MessageCall {
private var executed = false
private var canceled = false
private var completed = false
private var callback: MessageSender.Callback? = null
private var eventListener: MessageSender.EventListener? = null
private var sendMessageListener: MessageSender.OnSendMessageListener? = listener
private var noAck = false
private var callTimeoutMillis = 1000 * 10L
private var callTimeoutMillis = callOptions?.callTimeoutMillis ?: (1000 * 10L)
private var invokeStartEvent = true
private var lastRequestTimeMillis = 0L
private var lastResponseTimeMillis = 0L

companion object{
companion object {
private const val TAG = "GrpcCall"
}

override fun request() = request
override fun headers() = headers
override fun enqueue(callback: MessageSender.Callback?, eventListener: MessageSender.EventListener?): Boolean {
override fun enqueue(
callback: MessageSender.Callback?, eventListener: MessageSender.EventListener?
): Boolean {
synchronized(this) {
if (executed) {
callback?.onFailure(request(),Status(
Status.Code.FAILED_PRECONDITION
).withDescription("Already Executed"))
callback?.onFailure(
request(), Status(
Status.Code.FAILED_PRECONDITION
).withDescription("Already Executed")
)
return false
}
if (canceled) {
callback?.onFailure(request(),Status(
Status.Code.CANCELLED
).withDescription("Already canceled"))
callback?.onFailure(
request(), Status(
Status.Code.CANCELLED
).withDescription("Already canceled")
)
return false
}
executed = true
Expand All @@ -69,7 +83,7 @@ internal class Grpc2Call(val transport: Transport?, val request: MessageRequest,
return false
}

if(noAck) {
if (noAck) {
onComplete(Status.OK)
}
return true
Expand Down Expand Up @@ -126,7 +140,7 @@ internal class Grpc2Call(val transport: Transport?, val request: MessageRequest,
onComplete(Status.FAILED_PRECONDITION.withDescription("send() called while not connected"))
}

if(noAck) {
if (noAck) {
onComplete(Status.OK)
}
try {
Expand All @@ -138,7 +152,9 @@ internal class Grpc2Call(val transport: Transport?, val request: MessageRequest,
}

override fun onStart() {
if(invokeStartEvent) {
lastResponseTimeMillis = System.currentTimeMillis()

if (invokeStartEvent) {
invokeStartEvent = false
callback?.onResponseStart(request())
}
Expand Down Expand Up @@ -179,9 +195,20 @@ internal class Grpc2Call(val transport: Transport?, val request: MessageRequest,
callTimeoutMillis = millis
return this
}

override fun callTimeout() = callTimeoutMillis

override fun reschedule() {
// no op
override fun reschedule() = Unit

override fun waitForReady(): Boolean {
return callOptions?.waitForReady ?: true
}

override fun getLastResponseTimeMillis() = lastResponseTimeMillis

override fun getLastRequestTimeMillis() = lastRequestTimeMillis

override fun updateLastRequestTimeMillis() {
lastRequestTimeMillis = System.currentTimeMillis()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ internal class GrpcTransport internal constructor(
messageConsumer = messageConsumer,
transportObserver = deviceGatewayObserver,
authDelegate = authDelegate,
callOptions = callOptions,
channelOptions = channelOptions,
isStartReceiveServerInitiatedDirective = isStartReceiveServerInitiatedDirective
).let {
Expand Down Expand Up @@ -410,7 +409,7 @@ internal class GrpcTransport internal constructor(
request: MessageRequest,
headers: Map<String, String>?,
listener: MessageSender.OnSendMessageListener
) = Grpc2Call(activeTransport, request, headers, listener)
) = Grpc2Call(activeTransport, request, headers, callOptions, listener)

override fun startDirectivesService() {
executor.submit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import com.skt.nugu.sdk.core.interfaces.message.MessageConsumer
import com.skt.nugu.sdk.core.interfaces.message.Call
import com.skt.nugu.sdk.core.interfaces.message.request.AttachmentMessageRequest
import com.skt.nugu.sdk.core.interfaces.message.request.EventMessageRequest
import com.skt.nugu.sdk.core.interfaces.transport.CallOptions
import com.skt.nugu.sdk.core.interfaces.transport.ChannelOptions
import com.skt.nugu.sdk.core.utils.Logger
import devicegateway.grpc.AttachmentMessage
Expand All @@ -54,7 +53,6 @@ internal class DeviceGatewayClient(policy: Policy,
private var messageConsumer: MessageConsumer?,
private var transportObserver: DeviceGatewayTransport.TransportObserver?,
private val authDelegate: AuthDelegate,
private val callOptions: CallOptions?,
private val channelOptions: ChannelOptions?,
private val isStartReceiveServerInitiatedDirective: () -> Boolean)
:
Expand Down Expand Up @@ -374,8 +372,7 @@ internal class DeviceGatewayClient(policy: Policy,
EventsService(
it,
this@DeviceGatewayClient,
scheduler,
callOptions
scheduler
)
}
}
Expand Down
Loading

0 comments on commit abe1600

Please sign in to comment.