Skip to content

Instantly share code, notes, and snippets.

@rajibchy
Last active December 26, 2023 05:57
Show Gist options
  • Select an option

  • Save rajibchy/ae711298c6a8754d15c1e9acfb45867e to your computer and use it in GitHub Desktop.

Select an option

Save rajibchy/ae711298c6a8754d15c1e9acfb45867e to your computer and use it in GitHub Desktop.
🚀 Multithreading Asynchronous signal mechanism 💯
/*
* Copyright FSys Tech Limited [FSys]. All rights reserved.
*
* This software owned by FSys Tech Limited [FSys] and is protected by copyright law
* and international copyright treaties.
*
* Access to and use of the software is governed by the terms of the applicable FSys Software
* Services Agreement (the Agreement) and Customer end user license agreements granting
* a non-assignable, non-transferable and non-exclusive license to use the software
* for it's own data processing purposes under the terms defined in the Agreement.
*
* Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
* of this source code or associated reference material to any other location for further reproduction
* or redistribution, and any amendments to this copyright notice, are expressly prohibited.
*
* Any reproduction or redistribution for sale or hiring of the Software not in accordance with
* the terms of the Agreement is a violation of copyright law.
*/
// 7:17 PM 4/1/2022
// Rajib Chy
using System;
using System.Threading;
using System.Threading.Tasks;
namespace FSys.Framework {
/// <summary>
/// Thread Safe Concurrent <see cref="Thread"/> sync signaling mechanism
/// </summary>
public sealed class AsyncSignal : TokenBase, IAsyncSignal {
private long _isWaiting = 0;
private long _isDisposed = 0;
private DateTime _lastSignaled;
public int ThreadId => _threadId;
private readonly int _threadId = -1;
private CancellationTokenSource _wtask;
public DateTime LastSignaled => _lastSignaled;
private TaskCompletionSource<bool> _tcs = null;
private readonly object _synchronization = new object( );
public bool IsWaiting => Interlocked.Read( ref _isWaiting ) > 0;
public bool IsDisposed => Interlocked.Read( ref _isDisposed ) > 0;
public AsyncSignal( ) : this( 0, CancellationToken.None ) { }
public AsyncSignal( CancellationToken token ) : this( 0, token ) { }
public AsyncSignal( int thId ) : this( thId, CancellationToken.None ) { }
public AsyncSignal( int thId, CancellationToken token ) : base( token ) {
_threadId = thId; Init( );
}
private void Init( ) {
_lastSignaled = DateTime.Now;
base.Token.Register( OnTaskCanceled );
}
private void OnTaskCanceled( ) {
try {
if ( IsDisposed ) return;
Signal( );
} catch ( Exception ex ) {
EvntLog.Write( ex, "AsyncSignal.OnTaskCanceled" );
}
}
/// <summary>
/// Signal to another <see cref="Thread"/>, if <see cref="TaskCompletionSource{bool}"/> is wait for signal, otherwise skip
/// </summary>
public void Signal( ) {
if ( IsDisposed ) {
// remove for thrade sefty
//throw new ObjectDisposedException( nameof( AsyncSignal ) );
return;
}
try {
if ( !IsWaiting ) return;
lock ( _synchronization ) {
if ( _wtask != null ) {
_lastSignaled = DateTime.Now;
_ = Interlocked.Exchange( ref _isWaiting, 0 );
CancellationTokenSource tsource = Interlocked.Exchange( ref _wtask, null );
if ( tsource != null ) {
tsource.Cancel( );
tsource.Dispose( );
}
}
}
} catch ( Exception ex ) {
EvntLog.Write( ex, "AsyncSignal.Signal" );
}
}
/// <summary>
/// Signal to another <see cref="Thread"/>, if <see cref="TaskCompletionSource{bool}"/> is wait for signal, otherwise skip
/// </summary>
public void Notify( ) => Signal( );
private void OnSignalled( ) {
_ = Interlocked.Exchange( ref _isWaiting, 0 );
ThreadSafe.Exchange( ref _wtask, ThreadSafe.DisposeToken );
_ = Task.Run( ( ) => {
try {
TaskCompletionSource<bool> task = Interlocked.Exchange( ref _tcs, null );
if ( task != null ) {
task.SetResult( true );
}
} catch ( Exception ex ) {
EvntLog.Write( ex, "AsyncSignal.WaitAsync" );
}
} );
}
private void CancelIfRunning( bool disposing = false ) {
CancellationTokenSource otoken = Interlocked.Exchange( ref _wtask, disposing ? null : CancellationTokenSource.CreateLinkedTokenSource( base.Token ) );
if ( otoken != null ) {
// unreachable code;
ThreadSafe.DisposeToken( otoken );
}
TaskCompletionSource<bool> otcs = Interlocked.Exchange( ref _tcs, disposing ? null : new TaskCompletionSource<bool>( ) );
if ( otcs != null ) {
if ( !otcs.Task.IsCompleted ) {
//if( !disposing ) {
// otcs.SetResult( false );
//} else {
// otcs.TrySetException( new Exception( "AsyncSignal.CancelIfRunning Task should be sync" ) );
//}
otcs.SetResult( false );
}
}
}
/// <summary>
/// Wait for signal from another <see cref="Thread"/>
/// </summary>
/// <returns>A <see cref="TaskCompletionSource{bool}"/> <see cref="Task{bool}"/> </returns>
public Task<bool> WaitAsync( ) {
if ( IsDisposed ) {
return Task.FromResult( false );
// remove for thrade sefty
//throw new ObjectDisposedException( nameof( AsyncSignal ) );
}
Task<bool> result = null;
try {
lock ( _synchronization ) {
_ = Interlocked.Exchange( ref _isWaiting, 1 );
CancelIfRunning( );
_wtask.Token.Register( OnSignalled );
result = _tcs.Task;
}
} catch ( Exception ex ) {
EvntLog.Write( ex, "AsyncSignal.WaitAsync" );
result ??= Task.FromResult( false );
}
return result;
}
public void Dispose( ) {
if ( IsDisposed ) return;
_ = Interlocked.Increment( ref _isDisposed );
DisposeOrCreateToken( );
CancelIfRunning( true );
}
}
}
/**
* Copyright (c) 2022, https://github.com/rajibchy All rights reserved.
* Copyrights licensed under the New BSD License.
* See the accompanying LICENSE file for terms.
*/
// 7:17 PM 4/1/2022 and created on 10:33 PM 5/24/2022
// Rajib Chy
using System;
using System.Threading.Tasks;
namespace Sow.Framework {
public interface IAsyncSignal : IDisposable {
void Notify( );
bool IsWaiting { get; }
bool IsDisposed { get; }
Task<bool> WaitAsync( );
DateTime LastSignaled { get; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment