Sometimes you really need to be able to abort something that doesn't have abort semantics - in my case, attempts to read files from iCloud containers mid-download on iOS, which can hang until process exit if you initiate them at exactly the wrong time.
So, POSIX's pthread_cancel() to the rescue, with a bunch of attendant boilerplate.
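For orientation, here is roughly what that looks like in plain C before any Swift gets involved. This is a sketch under assumptions: `hung_worker`, `on_exit_handler` and `cancel_hung_worker` are made-up names, and sleep() stands in for a call that never returns. sleep() is a cancellation point, so the default deferred cancellation type is enough for this demo, which is not true of the hung I/O calls that motivated all this.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <unistd.h>

static atomic_int cleanup_ran;   /* set by the cleanup handler */
static atomic_int body_finished; /* set only if the body returns normally */

/* Hypothetical cleanup handler: runs on normal exit or on cancellation. */
static void on_exit_handler(void *arg) {
    (void)arg;
    atomic_store(&cleanup_ran, 1);
}

/* Stand-in for a read that never comes back. */
static void *hung_worker(void *arg) {
    (void)arg;
    pthread_cleanup_push(on_exit_handler, NULL);
    sleep(60);                       /* cancellation is acted on here */
    atomic_store(&body_finished, 1); /* not reached when cancelled */
    pthread_cleanup_pop(1);
    return NULL;
}

/* Returns 1 if the worker was cancelled mid-body and its cleanup handler ran,
   0 if it was not, and -1 if a pthread call failed. */
static int cancel_hung_worker(void) {
    pthread_t thread;
    void *result = NULL;
    atomic_store(&cleanup_ran, 0);
    atomic_store(&body_finished, 0);
    if (pthread_create(&thread, NULL, hung_worker, NULL) != 0) return -1;
    if (pthread_cancel(thread) != 0) return -1;
    if (pthread_join(thread, &result) != 0) return -1;
    return result == PTHREAD_CANCELED
        && atomic_load(&cleanup_ran) == 1
        && atomic_load(&body_finished) == 0;
}
```

The thread never gets past sleep(), yet the cleanup handler still runs, which is the behavior the rest of this write-up exposes to Swift.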
Your project's settings will need some adjustment - allow non-modular includes, defines-module set to YES, a project header file with the required boilerplate, and a module map.
In my case, this all lives in a framework project called AppUtils - if it is in the app project proper, you can use an Objective-C bridging header in place of the module map.
Apple has good concurrency utilities - they're kind of flavor-of-the-month in that department. Where they solve the problem, there is no reason to use this. But sometimes you just want to start a thread and have complete control over its lifecycle and the ability to kill it with impunity and have it actually die.
For completeness, including this since it's not exactly obvious what needs to be in it - stuff that would be generated by the build if we didn't need to spell it out to make this work:
#ifndef AppUtils_h
#define AppUtils_h
#import <Foundation/Foundation.h>
//! Project version number for AppUtils.
FOUNDATION_EXPORT double AppUtilsVersionNumber;
//! Project version string for AppUtils.
FOUNDATION_EXPORT const unsigned char AppUtilsVersionString[];
#endif /* AppUtils_h */
More boilerplate required by xcodebuild, to get the C code actually callable from Swift:
framework module AppUtilsFramework {
umbrella header "AppUtils.h"
header "PThreadCleanup.h"
export *
}
Header file for our wrapper for pthread_cleanup_push() and pthread_cleanup_pop():
#ifndef PThreadCleanup_h
#define PThreadCleanup_h
#include <pthread.h>
/**
Wraps a call to the function pointer `inner` in `pthread_cleanup_push()` and `pthread_cleanup_pop()`,
which are C macros that open a block with a dangling opening brace, and close the block in the
pop call, so they simply cannot be used usefully independently.
*/
void swift_pthread_cleanup_pushpop(void (*routine)(void *), void *arg, void (*inner)(void *));
#endif /* PThreadCleanup_h */
pthread_cleanup_push() is a C function-like macro that ends with an unmatched {, and the matching } is supplied by the corresponding pthread_cleanup_pop(). This is bewildering until you figure out that no, you didn't leave out a } in your source. Because the two macros only make sense as a pair in the same lexical scope, there is literally no way to use them directly from Swift.
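To make the pairing concrete, here is a minimal C sketch (C because that is where the macros live). The names `count_call` and `run_with_cleanup` are made up for illustration and are not part of the project. It shows that the push and the pop have to sit in the same function at the same brace depth, and what the argument to pthread_cleanup_pop() changes:

```c
#include <pthread.h>

/* Hypothetical handler for illustration: counts how often it runs. */
static void count_call(void *arg) {
    int *counter = arg;
    *counter += 1;
}

/* push and pop must appear in the same function at the same brace depth,
   because together they expand to a single `{ ... }` block. */
static int run_with_cleanup(int execute) {
    int calls = 0;
    pthread_cleanup_push(count_call, &calls);
    /* ... work that might be cancelled would go here ... */
    pthread_cleanup_pop(execute); /* nonzero: run the handler now; 0: just discard it */
    return calls;
}
```

Calling run_with_cleanup(1) runs the handler once on the way out, while run_with_cleanup(0) discards the handler without running it.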
Instead we need a function that takes a pointer to the code to run, pushes the pointer to the cleanup function, runs the code, and then pops it:
#include "PThreadCleanup.h"
void swift_pthread_cleanup_pushpop(void (*routine)(void *), void *arg, void (*inner)(void *)) {
pthread_cleanup_push(routine, arg);
inner(arg);
pthread_cleanup_pop(1); // If we pass 0, the callback is never called on normal exit
}
Now the fun begins. This class wraps a POSIX thread that can be run once, have its state monitored, and be killed (in POSIX terms, cancelled, so cleanup is possible). A cleanup function can be attached to it; that function runs whether the thread exits normally or is killed, and is told in what manner the thread terminated:
import Foundation
import OSLog
import Synchronization
/// Because sometimes you actually need cancellation semantics.
public final class IOThread : @unchecked Sendable, CustomStringConvertible {
private let body : @Sendable (IOThreadControl) -> Void
public let name : String
private let _state = AtomicThreadState()
private let _startedAt : Atomic<UInt64> = Atomic(0)
private let stackSize : Int32
private let expectJoin : Bool
private let schedulePolicy : IOThreadSchedulePolicy
fileprivate let cancellationType : IOThreadCancellationType
/// The thread that will be allocated in `start()`
private var thread : pthread_t? = nil
/// An optional callback to invoke when the thread exits.
private let _onExit : AnyAtomic<(@Sendable (ExitKind) -> Void)?> = .init(value: nil)
/// Gives a reasonable description for logging purposes.
public var description : String {
"IOThread('\(name)' \(state) \(schedulePolicy) \(cancellationType) stack \(stackSize) \(expectJoin ? "expects-join" : "does-not-expect-join"))"
}
/// Get the current state of this thread
public var state : IOThreadState {
_state.state
}
/// Get the length of time since the thread was launched, or 0 if it has not been.
public var age : TimeInterval {
let launchedAt = _startedAt.load(ordering: .acquiring)
if launchedAt == 0 {
return 0
}
let durationNanos = DispatchTime.now().uptimeNanoseconds - launchedAt
return TimeInterval(durationNanos) / 1e9
}
/// Create an IOThread. Unless `stackSize` is passed, the stack defaults to the minimum stack size on iOS of 16k. Pass `expectJoin` if you intend to
/// join the thread - in that case some thread *must* join the thread for the kernel to deallocate it cleanly. The default for that
/// is false. The default schedule policy is FIFO - run `man pthread_attr_setschedpolicy` for the meaning of these options.
///
/// Note the default `cancellationType` is `IOThreadCancellationType.Asynchronous`, because the goal of this class is aborting
/// I/O calls that iOS will leave hung until process exit. That is dangerous in terms of resource cleanup; if you are using
/// `IOThread` for some less onerous purpose, consider `Deferred`.
///
public init(name : String = "io", stackSize : Int32 = 16384, expectJoin : Bool = false, schedulePolicy : IOThreadSchedulePolicy = .FirstInFirstOut, cancellationType : IOThreadCancellationType = .Asynchronous, body : @escaping @Sendable (IOThreadControl) -> Void) {
self.name = name
self.body = body
self.stackSize = stackSize
self.expectJoin = expectJoin
self.schedulePolicy = schedulePolicy
self.cancellationType = cancellationType
}
@discardableResult public func start() -> IOThreadStartResult {
if _state.exchange(expected: .unused, to: .started) {
var threadAttrs : _opaque_pthread_attr_t = .init()
defer {
// This struct is copied on thread creation and can be discarded
// once the thread is launched.
pthread_attr_destroy(&threadAttrs)
}
// Configure with OS defaults
pthread_attr_init(&threadAttrs)
// iOS min stack size is 16k, default is 512k (actually allocated on demand).
pthread_attr_setstacksize(&threadAttrs, Int(stackSize))
// Unless we're expecting `join()` to be called, we want the state `PTHREAD_CREATE_DETACHED`
// so the OS will deallocate the thread structure for us on exit.
if !expectJoin {
pthread_attr_setdetachstate(&threadAttrs, PTHREAD_CREATE_DETACHED)
}
// Not sure what else we might want here
pthread_attr_setschedpolicy(&threadAttrs, schedulePolicy.posixConstant)
pthread_attr_setinheritsched(&threadAttrs, PTHREAD_EXPLICIT_SCHED)
// We need a pointer to this instance to do the dance through the POSIX API calls
// and have them know how to invoke the closure we were constructed with and
// the cleanup function.
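let selfPointer = UnsafeMutablePointer<IOThread>.allocate(capacity: 1)
// `allocate` returns uninitialized memory, so initialize it rather than assigning through `pointee`
selfPointer.initialize(to: self)
let threadPointer : UnsafeMutablePointer<pthread_t?> = UnsafeMutablePointer<pthread_t?>.allocate(capacity: 1)
threadPointer.initialize(to: self.thread)
// The thread handle is copied into the ivar below, so this scratch allocation can go away on scope exit
defer { threadPointer.deallocate() }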
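// Launch the thread, passing the attributes configured above (passing nil here would
// silently discard the stack size, detach state and scheduling settings)
let startResult = pthread_create(threadPointer, &threadAttrs, onPThread, selfPointer)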
// Store the created thread in the ivar
self.thread = threadPointer.pointee
switch startResult {
case 0 :
// Ensure the pthread doesn't leak if not joined - without this, the thread will remain
// allocated until another thread joins it.
if let thread = threadPointer.pointee, !expectJoin {
// This may not be needed now that we are setting PTHREAD_CREATE_DETACHED above...
pthread_detach(thread)
}
return .Success
case EINVAL:
return .InvalidAttribute
case EAGAIN:
return .InsufficientResources
case EPERM:
return .LackingPermissions
default:
// This would mean violating the POSIX standard and really is catastrophic.
fatalError("pthread_create returned a nonsensical error code \(startResult)")
}
}
return .AlreadyStarted
}
/// Join the thread. Returns false if the thread has not started, has already exited, or was not created with `expectJoin = true`,
/// or true if `pthread_join` returns `0`.
public func join() -> Bool {
if !expectJoin {
#if DEBUG
fatalError("Attempt to join IOThread that was not created with join support will always fail.")
#else
Logger.iothread.fault("Attempt to join IOThread that was not created with join support will always fail.")
return false
#endif
}
// Read this once - it's an atomic, so it could change while the if clause below is
// running if we referenced the `var`.
let st = state
if st.contains(.entered) && !st.contains(.finished) && !st.contains(.cancelled){
if let t = thread {
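// `pthread_t` is already the handle `pthread_join` expects; the address of a copy
// of its pointee would not refer to a real thread.
return pthread_join(t, nil) == 0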
}
}
return false
}
/// Cancel the running thread, executing any handler passed to `onExit`. Returns true if the thread
/// existed and was actually killed.
public func kill() -> Bool {
// Atomically update the state and only attempt to cancel the thread if the previous value
// matches `[.started, .entered]`.
if _state.exchange(expected : [.started, IOThreadState.entered],
to: [IOThreadState.started, IOThreadState.entered, IOThreadState.cancelled ]) {
Logger.iothread.debug("IO: Try to kill for \(self.state, privacy: .public)")
if let t = thread {
Logger.iothread.trace("IO: Have a thread to kill \(String(describing: t), privacy: .public)")
let killResult = pthread_cancel(t)
Logger.iothread.info("IO: Kill result \(killResult)")
return killResult == 0
} else {
Logger.iothread.warning("IO: No thread to kill: \(self.state)")
return false
}
}
Logger.iothread.trace("IO: Kill fails for state \(self.state, privacy: .public)")
return false
}
/// Add a function to call when the thread finishes or otherwise terminates.
/// This function is invoked via `pthread_cleanup_push()` and will *always* be
/// invoked, including if the thread is killed either internally via timeout
/// or explicitly.
///
/// Does nothing if the thread has already terminated, and logs a warning if called
/// after the thread has started, since that is race-prone.
///
/// Returns self for method-chaining, and should be called before the thread has been
/// started if it is needed.
///
/// This method may be called multiple times and each handler will be invoked in the
/// order they were added on exit.
///
@discardableResult public func onExit(_ f : @escaping @Sendable (ExitKind) -> Void) -> Self {
let st = state
if st.contains(.finished) || st.contains(.exited) {
#if DEBUG
fatalError("Adding an onExit handler after the thread has completed")
#else
Logger.iothread.warning("Adding an onExit handler after the thread has completed or been killed ignored")
return self
#endif
}
_onExit.updating { maybeClosure in
// If we have an existing closure, keep it and call both
if let old = maybeClosure {
maybeClosure = { (kind : ExitKind) in
old(kind)
f(kind)
}
} else {
maybeClosure = f
}
}
if st.contains(.started) {
Logger.iothread.warning(
"Adding an onExit handler after the thread has been started is subject to race conditions and is a bug. The handler may never be called.")
}
return self
}
/// Take any and all exit callbacks added with a call to `onExit`, returning it.
public func takeOnExitCallback() -> (@Sendable (ExitKind) -> Void)? {
_onExit.take()
}
/// Run the cleanup callback if one was provided
fileprivate func cleanup() {
if let f = takeOnExitCallback() { // clears the stored callback so it cannot possibly be called twice
Logger.iothread.trace("IO: Invoke IOThread Cleanup handler on \(Thread.current)")
let arg = ExitKind.from(self.state)
f(arg)
}
}
fileprivate func runBody() {
// If `kill()` has been called, but we get here before the thread scheduler has gotten
// around to aborting the thread, don't run the body at all.
if state.contains(.cancelled) {
return
}
defer {
// Here for correctness, and setting the value as soon as possible,
// but this defer block will not run if the thread is cancelled.
_state.insert(.exited)
}
// Be polite and set the thread name.
name.withCString { ptr in
let _ = pthread_setname_np(ptr)
}
// Store the current timestamp
_startedAt.store(DispatchTime.now().uptimeNanoseconds, ordering: .sequentiallyConsistent)
// Set the entered flag (only after setting the start time)
_state.insert(.entered)
body(IOThreadControl(state: _state))
// `.finished` *could* coexist with `.cancelled`, since we live in a concurrent universe
_state.insert(.finished)
// Indicate we can give up the CPU here, before exit handlers are called (they will
// be called on this thread).
pthread_yield_np()
// Note that we DO NOT call the exit handler here, as the thread could be killed while
// it is running. Instead, `swift_pthread_cleanup_pushpop` guarantees that it will be
// run on exit OR kill.
}
}
// Bare functions for cases where posix calls require a function pointer:
@_cdecl("cleanupHandler")
private func cleanupHandler(_ arg: UnsafeMutableRawPointer?) {
print("IOT: Cleanup handler on \(Thread.current)")
if let pt = arg {
let owner = pt.load(as: IOThread.self)
owner.cleanup()
}
}
@_cdecl("runThreadBody")
private func runThreadBody(pointer: UnsafeMutableRawPointer?) {
if let ptr = pointer {
let owner = ptr.load(as: IOThread.self)
owner.runBody()
}
}
@_cdecl("onPThread")
private func onPThread(pointer: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? {
// POSIX enables cancellation by default for new threads; set it explicitly so the intent is clear
let _ = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nil)
let owner = pointer.load(as: IOThread.self)
let _ = pthread_setcanceltype(owner.cancellationType.posixConstant, nil)
// Do a cancel point before entry
let _ = pthread_testcancel()
// Pushes the cleanup handler function to run on exit, and the thread
// body to run in between push and pop
swift_pthread_cleanup_pushpop(cleanupHandler, pointer, runThreadBody);
return nil
}
/// POSIX scheduler policies. Run `man pthread_attr_setschedpolicy` in a terminal for description.
public enum IOThreadSchedulePolicy : Sendable, Equatable, Hashable, CustomStringConvertible {
case FirstInFirstOut
case RoundRobin
case Other
public var description : String {
switch self {
case .FirstInFirstOut: "First In First Out"
case .RoundRobin: "Round Robin"
case .Other: "Posix Other"
}
}
public var posixConstant : Int32 {
switch self {
case .FirstInFirstOut: SCHED_FIFO
case .RoundRobin: SCHED_RR
case .Other: SCHED_OTHER
}
}
}
/// The thread cancellation type - see `man pthread_setcanceltype` for a not great description
/// of it. Basically, `Deferred` holds a cancellation request until the running code reaches a
/// cancellation point (an explicit `pthread_testcancel` call, or one of the blocking calls POSIX
/// designates as such), and `Asynchronous` can act at any moment and is really f'ing
/// dangerous (and, owing to the reason `IOThread` was created, its default :-) ).
public enum IOThreadCancellationType : Sendable, Equatable, Hashable, CustomStringConvertible {
/// Kill the thread when it politely says it's okay to
case Deferred
/// Kill the thread any old time and pray it cleans up locks
case Asynchronous
public var description : String {
switch self {
case .Deferred: "Deferred"
case .Asynchronous: "Asynchronous"
}
}
/// The integer value from the POSIX standard
public var posixConstant : Int32 {
switch self {
case .Deferred:
PTHREAD_CANCEL_DEFERRED
case .Asynchronous:
PTHREAD_CANCEL_ASYNCHRONOUS
}
}
}
/// Value passed to any callback sent to `IOThread.onExit` to indicate whether the
/// thread exited because thread body completed, or because the thread was killed.
public enum ExitKind : Sendable, Equatable, Hashable, CustomStringConvertible {
/// The thread body ran to completion
case Finished
/// The thread was cancelled at some point before it would have exited normally
case Killed
public var wasKilled : Bool {
switch self {
case .Killed: true
default: false
}
}
public var description: String {
switch self {
case .Finished:
return "Finished"
case .Killed:
return "Killed"
}
}
static func from(_ state : IOThreadState) -> Self {
// There can be both, if the thread body completed, and then the thread
// scheduler suspended us after `.finished` was added to the state but
// before the call ended.
//
// If `.finished` is present, then the work the
// thread did ran to completion regardless of whether it was subsequently
// killed - the work running always has a happened-before relationship to
// `.finished` being set, so the fact that the thread was subsequently
// cancelled is a distinction without a difference.
if !state.contains(.finished) && state.contains(.cancelled) {
return .Killed
}
return .Finished
}
}
/// Allows some measure of control over the cancellability of an IOThread.
public struct IOThreadControl : Sendable {
fileprivate let state : AtomicThreadState
/// Tests whether cancel has been called on the owning IOThread, whether or not that
/// state has been propagated to the actual thread.
public var wasCancelled : Bool {
state.state.contains(.cancelled)
}
/// Run a closure with `PTHREAD_CANCEL_DISABLE` set on the cancel state - critical section.
public func uncancellably(_ f : () throws -> Void) rethrows {
var oldState : Int32 = 0
let setStateResult = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState)
defer {
if setStateResult == 0 {
var ignoredUpdatedState : Int32 = 0
pthread_setcancelstate(oldState, &ignoredUpdatedState)
}
}
try f()
}
}
public enum IOThreadStartResult : Int32, CaseIterable, Sendable, CustomStringConvertible {
/// The call to start succeeded
case Success = 0
/// Another call to start() already started the thread
case AlreadyStarted = 2 // Our own
/// The POSIX `EPERM` message was returned by the attempt to start the thread (permissions)
case LackingPermissions = 1 // EPERM
/// The POSIX `EAGAIN` message was returned by the attempt to start the thread (insufficient resources)
case InsufficientResources = 35 // EAGAIN
/// The POSIX `EINVAL` message was returned by the attempt to start the thread (invalid attribute)
case InvalidAttribute = 22 // EINVAL
public var description : String {
switch self {
case .AlreadyStarted: "Already started"
case .InsufficientResources: "Insufficient resources"
case .InvalidAttribute: "Invalid attribute"
case .LackingPermissions: "Lacking permissions"
case .Success: "Success"
}
}
public var isStarted : Bool {
switch self {
case .Success, .AlreadyStarted: true
default: false
}
}
}
/// Holds a thread-safe, atomic representation of an `IOThreadState` and facilitates
/// atomic operations on it.
fileprivate final class AtomicThreadState : Sendable {
private let _state : Atomic<UInt8> = Atomic(0)
var state : IOThreadState {
get {
IOThreadState(rawValue: _state.load(ordering: .sequentiallyConsistent)).assertSane()
} set(val) {
_state.store(val.rawValue, ordering: .sequentiallyConsistent)
}
}
@discardableResult func insert(_ val : IOThreadState) -> Bool {
let bitsToAdd = val.rawValue
let oldValue = _state.bitwiseOr(bitsToAdd, ordering: .sequentiallyConsistent).oldValue
return oldValue & bitsToAdd != bitsToAdd
}
func exchange(expected : IOThreadState, to : IOThreadState) -> Bool {
if expected == to {
return false
}
return _state.compareExchange(expected: expected.rawValue, desired: to.rawValue, ordering: .sequentiallyConsistent).exchanged
}
}
/// Possible states of an `IOThread`.
public struct IOThreadState : OptionSet, Sendable, Equatable, Hashable, CustomStringConvertible {
/// Post construction state, before any call to `start()`
public static let unused : IOThreadState = .init(rawValue: 0)
/// `start()` has been called and at least appears to have started a thread, whether or not
/// the thread scheduler has scheduled it.
public static let started : IOThreadState = .init(rawValue: 1 << 0)
/// The run function supplied for the thread has been entered.
public static let entered : IOThreadState = .init(rawValue: 1 << 1)
/// The run function supplied for the thread has been run to completion.
public static let finished : IOThreadState = .init(rawValue: 1 << 2)
/// The thread was cancelled after it was launched.
public static let cancelled : IOThreadState = .init(rawValue: 1 << 3)
/// The internal run function for the thread reached its terminus - the
/// `defer` block which runs after the thread body and any yields or cancellation
/// points was reached.
public static let exited : IOThreadState = .init(rawValue: 1 << 4)
#if DEBUG
@inline(__always) public func assertSane() -> Self {
if self.rawValue > 1 {
// If the state contains .entered, .finished or .cancelled, it MUST also
// contain .started to have reached that state.
assert(self.rawValue & 1 != 0, "Value of state is not sane: \(description)")
}
return self
}
#else
@inline(__always) public func assertSane() -> Self { self }
#endif
/// Determine if this state is empty and a thread may be started.
public var canStart : Bool {
self == .unused
}
/// Determine if the state is such that a call to kill the thread is
/// likely to encounter it still running.
public var canKill : Bool {
!self.contains(.cancelled) && !self.contains(.exited)
}
public var description : String {
// Collect the name of every flag that is set, including `.exited`
var parts : [String] = []
if self.contains(.started) {
parts.append("started")
}
if self.contains(.entered) {
parts.append("entered")
}
if self.contains(.finished) {
parts.append("finished")
}
if self.contains(.cancelled) {
parts.append("cancelled")
}
if self.contains(.exited) {
parts.append("exited")
}
return parts.isEmpty ? "unused" : parts.joined(separator: " ")
}
public var rawValue : UInt8
public init(rawValue : UInt8) {
self.rawValue = rawValue
}
}
fileprivate extension Logger {
private static let subsystem = Bundle.main.bundleIdentifier!
static let iothread = Logger(subsystem: subsystem, category: "io-thread")
}
AnyAtomic is included because IOThread uses it to hold a reference to the exit callback function (there's probably a way to do it with UnsafeMutablePointer, but it's not clear what assumptions about the caller would be necessary to guarantee the pointer is still live when it is needed).
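The property IOThread needs from that holder is that the exit callback can be taken exactly once, even if two threads race for it. As a rough illustration in C11 (the types `exit_handler` and `handler_slot` and all the functions here are hypothetical, not part of the project), an atomic exchange of a pointer to a boxed handler gives that guarantee:

```c
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-in for a boxed closure: a function plus its context. */
typedef struct {
    void (*fn)(void *context);
    void *context;
} exit_handler;

/* The atomic holds only a pointer to the box, never the box itself. */
typedef struct {
    _Atomic(exit_handler *) slot;
} handler_slot;

/* Swap the slot to NULL and hand back whatever was there, in one atomic step. */
static exit_handler *take_handler(handler_slot *s) {
    return atomic_exchange(&s->slot, NULL);
}

/* Runs the handler only if this caller won the race to take it; returns 1 if it ran. */
static int run_handler_once(handler_slot *s) {
    exit_handler *h = take_handler(s);
    if (h == NULL) return 0;
    h->fn(h->context);
    return 1;
}

static void bump(void *context) { *(int *)context += 1; }

/* Usage: two attempts to run the same handler; only the first finds it. */
static int count_runs_of_two_attempts(void) {
    int count = 0;
    exit_handler h = { bump, &count };
    handler_slot s;
    atomic_init(&s.slot, &h);
    run_handler_once(&s);
    run_handler_once(&s);
    return count;
}
```

Whoever exchanges the pointer out first owns the handler; every later caller sees NULL and does nothing.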
Swift's Atomics package is a bit anemic, and the newer built-in Synchronization is even more anemic, in terms of being able to create atomic references to arbitrary types.
So this is a utility class which wraps its contents in a reference type, and makes the user
of it responsible for the wrapped type implementing Sendable sanely, using the older Atomics package.
import Atomics
/// This is ugly, but lets us store *anything* in an atomic.
public struct AnyAtomic<T> : Sendable {
private let atomic : ManagedAtomic<ValueHolder<T>>
/// Get the wrapped value.
@inline(__always) public var value : T {
get { atomic.load(ordering: .acquiring).value }
nonmutating set(val) { atomic.store(ValueHolder(val), ordering: .releasing)}
}
public init(value : T) {
atomic = ManagedAtomic(ValueHolder(value))
}
/// Where the wrapped type is an `Optional` type, create an initially empty instance
public init<R>(_ rtype : R.Type) where T == R? {
atomic = ManagedAtomic(ValueHolder(Optional.none))
}
/// Set the value using `.sequentiallyConsistent` store ordering rather than the
/// default, `.releasing`.
public func set(_ value : T) {
atomic.store(ValueHolder(value), ordering: .sequentiallyConsistent)
}
/// Returns true if the value was updated
public func replace(with value : T) -> Bool where T : Equatable {
let nue = ValueHolder(value)
return !(atomic.exchange(nue, ordering: .acquiringAndReleasing) == nue)
}
public func exchange(with value : T) -> T {
atomic.exchange(ValueHolder(value), ordering: .acquiringAndReleasing).value
}
/// Note this is NOT actually an atomic update or guaranteed to succeed.
@discardableResult public func updating<R>(_ f : (inout T) throws -> R) rethrows -> R {
var old = atomic.load(ordering: .sequentiallyConsistent).value
let result = try f(&old)
atomic.store(ValueHolder(old), ordering: .sequentiallyConsistent)
return result
}
/// Exchange the value, returning the old one
public func exchange(_ newValue : T) -> T {
atomic.exchange(ValueHolder(newValue), ordering: .acquiringAndReleasing).value
}
/// Uses the busyloop pattern from Java's atomics to ensure that the backing
/// atomic variable has been observed in the target state at least once, potentially
/// calling the passed function multiple times to do it.
public func ensureSet(_ f : (T) -> T) where T: Equatable {
repeat {
let v = self.value
let nue = f(v)
self.value = nue
if self.value == nue {
break
}
} while true
}
/// Where the wrapped type is an `Optional`, replace the value with `Optional.none` and
/// return it in a single atomic operation.
public func take<R>() -> T where T == R? {
// The exchanged-out value is already an optional, so it can be returned as-is
return self.exchange(Optional.none)
}
}
extension AnyAtomic : Hashable where T: Hashable {
public func hash(into hasher: inout Hasher) {
value.hash(into: &hasher)
}
}
extension AnyAtomic : Equatable where T: Equatable {
public static func == (lhs: AnyAtomic<T>, rhs: AnyAtomic<T>) -> Bool {
return lhs.value == rhs.value
}
}
extension AnyAtomic : CustomStringConvertible where T: CustomStringConvertible {
public var description : String { value.description }
}
extension AnyAtomic : CustomDebugStringConvertible where T: CustomDebugStringConvertible {
public var debugDescription: String { value.debugDescription }
}
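The doc comment on updating() warns that it is not an atomic update: it loads, modifies and stores in three separate steps, so a concurrent writer can be overwritten. The usual way to make a read-modify-write atomic is a compare-and-swap retry loop. Here is a minimal C11 sketch of that pattern, with hypothetical names (`update_atomically`, `mark_cancelled`, `demo_update`) and a uint8_t cell standing in for the wrapped value:

```c
#include <stdatomic.h>
#include <stdint.h>

/* Applies `f` to the current value and publishes the result, retrying if
   another thread changed the value in between. */
static uint8_t update_atomically(_Atomic uint8_t *cell, uint8_t (*f)(uint8_t)) {
    uint8_t seen = atomic_load(cell);
    uint8_t next;
    do {
        next = f(seen);
        /* On failure, `seen` is refreshed with the value that beat us. */
    } while (!atomic_compare_exchange_weak(cell, &seen, next));
    return next;
}

/* Hypothetical update, mirroring the bit IOThreadState uses for `.cancelled`. */
static uint8_t mark_cancelled(uint8_t state) {
    return state | (uint8_t)(1u << 3);
}

/* Usage: start from started|entered (binary 0011) and add the cancelled bit. */
static uint8_t demo_update(void) {
    _Atomic uint8_t state = 3;
    return update_atomically(&state, mark_cancelled);
}
```

The function passed in can be called more than once under contention, so it should have no side effects.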