Skip to content

Commit

Permalink
Add async gracefulShutdown (#158)
Browse files Browse the repository at this point in the history
  • Loading branch information
gjcairo authored Sep 13, 2023
1 parent b69c630 commit 45a31b6
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 19 deletions.
18 changes: 5 additions & 13 deletions Sources/ServiceLifecycle/AsyncGracefulShutdownSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
/// An async sequence that emits an element once graceful shutdown has been triggered.
///
/// This sequence is a broadcast async sequence and will only produce one value and then finish.
///
/// - Note: This sequence respects cancellation and thus is `throwing`.
@usableFromInline
struct AsyncGracefulShutdownSequence: AsyncSequence, Sendable {
@usableFromInline
Expand All @@ -34,19 +36,9 @@ struct AsyncGracefulShutdownSequence: AsyncSequence, Sendable {
init() {}

@inlinable
func next() async -> Element? {
var cont: AsyncStream<Void>.Continuation!
let stream = AsyncStream<Void> { cont = $0 }
let continuation = cont!

return await withTaskGroup(of: Void.self) { _ in
await withGracefulShutdownHandler {
await stream.first { _ in true }
} onGracefulShutdown: {
continuation.yield(())
continuation.finish()
}
}
func next() async throws -> Element? {
try await CancellationWaiter().wait()
return ()
}
}
}
50 changes: 50 additions & 0 deletions Sources/ServiceLifecycle/CancellationWaiter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftServiceLifecycle open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftServiceLifecycle project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftServiceLifecycle project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// An actor that provides a function to wait on cancellation/graceful shutdown.
@usableFromInline
actor CancellationWaiter {
private var taskContinuation: CheckedContinuation<Void, Error>?

@usableFromInline
init() {}

@usableFromInline
func wait() async throws {
try await withTaskCancellationHandler {
try await withGracefulShutdownHandler {
try await withCheckedThrowingContinuation { continuation in
self.taskContinuation = continuation
}
} onGracefulShutdown: {
Task {
await self.finish()
}
}
} onCancel: {
Task {
await self.finish(throwing: CancellationError())
}
}
}

private func finish(throwing error: Error? = nil) {
if let error {
self.taskContinuation?.resume(throwing: error)
} else {
self.taskContinuation?.resume()
}
self.taskContinuation = nil
}
}
20 changes: 19 additions & 1 deletion Sources/ServiceLifecycle/GracefulShutdown.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ public func withGracefulShutdownHandler<T>(
return try await operation()
}

/// Waits until graceful shutdown is triggered.
///
/// This method suspends the caller until graceful shutdown is triggered. If the calling task is cancelled before
/// graceful shutdown is triggered then this method will throw a `CancellationError`.
///
/// - Throws: `CancellationError` if the task is cancelled.
public func gracefulShutdown() async throws {
try await AsyncGracefulShutdownSequence().first { _ in true }
}

/// This is just a helper type for the result of our task group.
enum ValueOrGracefulShutdown<T: Sendable>: Sendable {
case value(T)
Expand All @@ -72,7 +82,7 @@ public func cancelOnGracefulShutdown<T: Sendable>(_ operation: @Sendable @escapi
}

group.addTask {
for await _ in AsyncGracefulShutdownSequence() {
for try await _ in AsyncGracefulShutdownSequence() {
return .gracefulShutdown
}

Expand Down Expand Up @@ -138,6 +148,8 @@ public final class GracefulShutdownManager: @unchecked Sendable {
fileprivate var handlerCounter: UInt64 = 0
/// A boolean indicating if we have been shutdown already.
fileprivate var isShuttingDown = false
/// Continuations to resume after all of the handlers have been executed.
fileprivate var gracefulShutdownFinishedContinuations = [CheckedContinuation<Void, Never>]()
}

private let state = LockedValueBox(State())
Expand Down Expand Up @@ -191,6 +203,12 @@ public final class GracefulShutdownManager: @unchecked Sendable {
}

state.handlers.removeAll()

for continuation in state.gracefulShutdownFinishedContinuations {
continuation.resume()
}

state.gracefulShutdownFinishedContinuations.removeAll()
}
}
}
10 changes: 5 additions & 5 deletions Sources/ServiceLifecycle/ServiceGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public actor ServiceGroup: Sendable {

// Using a result here since we want a task group that has non-throwing child tasks
// but the body itself is throwing
let result = await withTaskGroup(of: ChildTaskResult.self, returning: Result<Void, Error>.self) { group in
let result = try await withThrowingTaskGroup(of: ChildTaskResult.self, returning: Result<Void, Error>.self) { group in
// First we have to register our signals.
let gracefulShutdownSignals = await UnixSignalsSequence(trapping: self.gracefulShutdownSignals)
let cancellationSignals = await UnixSignalsSequence(trapping: self.cancellationSignals)
Expand Down Expand Up @@ -228,7 +228,7 @@ public actor ServiceGroup: Sendable {
// This is an optional task that listens to graceful shutdowns from the parent task
if let _ = TaskLocals.gracefulShutdownManager {
group.addTask {
for await _ in AsyncGracefulShutdownSequence() {
for try await _ in AsyncGracefulShutdownSequence() {
return .gracefulShutdownCaught
}

Expand Down Expand Up @@ -276,7 +276,7 @@ public actor ServiceGroup: Sendable {
// We are going to wait for any of the services to finish or
// the signal sequence to throw an error.
while !group.isEmpty {
let result: ChildTaskResult? = await group.next()
let result: ChildTaskResult? = try await group.next()

switch result {
case .serviceFinished(let service, let index):
Expand Down Expand Up @@ -452,7 +452,7 @@ public actor ServiceGroup: Sendable {

private func shutdownGracefully(
services: [ServiceGroupConfiguration.ServiceConfiguration?],
group: inout TaskGroup<ChildTaskResult>,
group: inout ThrowingTaskGroup<ChildTaskResult, Error>,
gracefulShutdownManagers: [GracefulShutdownManager]
) async throws {
guard case .running = self.state else {
Expand Down Expand Up @@ -481,7 +481,7 @@ public actor ServiceGroup: Sendable {

gracefulShutdownManager.shutdownGracefully()

let result = await group.next()
let result = try await group.next()

switch result {
case .serviceFinished(let service, let index):
Expand Down
50 changes: 50 additions & 0 deletions Tests/ServiceLifecycleTests/GracefulShutdownTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,54 @@ final class GracefulShutdownTests: XCTestCase {
XCTAssertTrue(Task.isShuttingDownGracefully)
}
}

func testWaitForGracefulShutdown() async throws {
try await testGracefulShutdown { gracefulShutdownTestTrigger in
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await Task.sleep(for: .milliseconds(10))

This comment has been minimized.

Copy link
@vlm

vlm Sep 28, 2023

Would it work the same if it was Task.yield() instead?

This comment has been minimized.

Copy link
@FranzBusch

FranzBusch Sep 28, 2023

Contributor

Yeah it will work the same. The test here is just trying to trigger the graceful shutdown after the await gracefulShutdown line has been hit

gracefulShutdownTestTrigger.triggerGracefulShutdown()
}

try await withGracefulShutdownHandler {
try await gracefulShutdown()
} onGracefulShutdown: {
// No-op
}

try await group.waitForAll()
}
}
}

func testWaitForGracefulShutdown_WhenAlreadyShutdown() async throws {
try await testGracefulShutdown { gracefulShutdownTestTrigger in
gracefulShutdownTestTrigger.triggerGracefulShutdown()

try await withGracefulShutdownHandler {
try await Task.sleep(for: .milliseconds(10))
try await gracefulShutdown()
} onGracefulShutdown: {
// No-op
}
}
}

func testWaitForGracefulShutdown_Cancellation() async throws {
do {
try await testGracefulShutdown { _ in
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await gracefulShutdown()
}

group.cancelAll()
try await group.waitForAll()
}
}
XCTFail("Expected CancellationError to be thrown")
} catch {
XCTAssertTrue(error is CancellationError)
}
}
}

0 comments on commit 45a31b6

Please sign in to comment.