-
-
Save glureau-betclic/ec1e86a3206112459ac659b226d817c1 to your computer and use it in GitHub Desktop.
| // Original code: https://www.techyourchance.com/concurrency-frameworks-overrated-android/ | |
| import io.reactivex.Single | |
| import io.reactivex.disposables.Disposable | |
| import io.reactivex.functions.BiFunction | |
| import io.reactivex.functions.Consumer | |
| import io.reactivex.schedulers.Schedulers | |
| import java.io.File | |
| import java.io.IOException | |
| import java.util.concurrent.CountDownLatch | |
| import kotlin.random.Random | |
| fun main() { | |
| val latch = CountDownLatch(1) | |
| UploadFilesUseCase() | |
| .uploadFiles(Consumer { state -> | |
| println("Resulting state $state received on " + Thread.currentThread().name) | |
| latch.countDown() | |
| }) | |
| latch.await() | |
| } | |
| class UploadFilesUseCase { | |
| enum class OperationState { UPLOADED, FAILED } | |
| private var disposable: Disposable? = null | |
| fun uploadFiles(listenerOnUiThread: Consumer<OperationState>) { | |
| if (disposable?.isRunning == true) { | |
| // Log attempt | |
| return | |
| } | |
| disposable = Single.zip( | |
| ioSingleFrom(::processAndMergeFilesOfTypeA), | |
| ioSingleFrom(::processAndMergeFilesOfTypeB), | |
| BiFunction<File, File, File> { aResult: File, bResult: File -> | |
| compressMergedFiles(aResult, bResult) | |
| }) | |
| .flatMap(::uploadResultSingle) | |
| .doOnEvent { _, _ -> deleteTempDir() } // Success or Error | |
| .retry(2) | |
| .map { OperationState.UPLOADED } | |
| .onErrorReturnItem(OperationState.FAILED) | |
| .observeOn(Schedulers.computation()) // Or .observeOn(AndroidSchedulers.mainThread()) when on Android | |
| .subscribe(listenerOnUiThread) | |
| } | |
| } | |
| private val Disposable.isRunning | |
| get() = isDisposed.not() | |
| // Code mapping a non Rx library to a Rx application. So not required if using Rx wrappers with your libs. | |
| private fun <T> ioSingleFrom(method: () -> T) = Single.fromCallable(method).subscribeOn(Schedulers.io()) | |
| private fun processAndMergeFilesOfTypeA() = workAndReturnFile("A") | |
| private fun processAndMergeFilesOfTypeB() = workAndReturnFile("B") | |
| private fun compressMergedFiles(fileA: File, fileB: File) = workAndReturnFile("C") | |
| private fun uploadResultSingle(compressedFile: File) = ioSingleFrom { uploadResult(compressedFile) } | |
| // ------------------------------------------------------------------------------------- | |
| // Dummy implementations for you to be able to understand the threading | |
| // ------------------------------------------------------------------------------------- | |
| fun workAndReturnFile(path: String): File { | |
| println("Thread: " + Thread.currentThread().name + " start working on $path") | |
| Thread.sleep(1000) | |
| randomizedErrors(path) | |
| println("Thread: " + Thread.currentThread().name + " $path DONE") | |
| return File(path) | |
| } | |
| private fun uploadResult(compressedFile: File) { | |
| println("Uploading on: " + Thread.currentThread().name + " ($compressedFile)") | |
| Thread.sleep(2000) | |
| randomizedErrors(compressedFile.path) | |
| println("Uploaded from: " + Thread.currentThread().name + " ($compressedFile)") | |
| } | |
| private fun randomizedErrors(path: String) { | |
| if (Random.nextInt(0, 3) == 0) { | |
| println("-------- Unlucky crash appears while processing $path") | |
| throw IOException("random error") | |
| } | |
| } | |
| private fun deleteTempDir() { | |
| println("Deleting files...") | |
| } | |
| /** | |
| * Example of output: | |
| * | |
| Thread: RxCachedThreadScheduler-2 start working on B | |
| Thread: RxCachedThreadScheduler-1 start working on A | |
| Thread: RxCachedThreadScheduler-1 A DONE | |
| -------- Unlucky crash appears while processing B | |
| Deleting files... | |
| Thread: RxCachedThreadScheduler-1 start working on A | |
| Thread: RxCachedThreadScheduler-2 start working on B | |
| Thread: RxCachedThreadScheduler-1 A DONE | |
| Thread: RxCachedThreadScheduler-2 B DONE | |
| Thread: RxCachedThreadScheduler-2 start working on C | |
| Thread: RxCachedThreadScheduler-2 C DONE | |
| Uploading on: RxCachedThreadScheduler-1 (C) | |
| Uploaded from: RxCachedThreadScheduler-1 (C) | |
| Deleting files... | |
| Resulting state UPLOADED received on RxComputationThreadPool-1 | |
| * | |
| * And if 3 unlucky errors: | |
| * | |
| Thread: RxCachedThreadScheduler-1 start working on A | |
| Thread: RxCachedThreadScheduler-2 start working on B | |
| -------- Unlucky crash appears while processing A | |
| Thread: RxCachedThreadScheduler-2 B DONE | |
| Deleting files... | |
| Thread: RxCachedThreadScheduler-2 start working on A | |
| Thread: RxCachedThreadScheduler-3 start working on B | |
| -------- Unlucky crash appears while processing B | |
| Thread: RxCachedThreadScheduler-2 A DONE | |
| Deleting files... | |
| Thread: RxCachedThreadScheduler-2 start working on B | |
| Thread: RxCachedThreadScheduler-1 start working on A | |
| Thread: RxCachedThreadScheduler-2 B DONE | |
| Thread: RxCachedThreadScheduler-1 A DONE | |
| Thread: RxCachedThreadScheduler-1 start working on C | |
| Thread: RxCachedThreadScheduler-1 C DONE | |
| Uploading on: RxCachedThreadScheduler-3 (C) | |
| -------- Unlucky crash appears while processing C | |
| Deleting files... | |
| Resulting state FAILED received on RxComputationThreadPool-1 | |
| */ |
if (disposable?.isDisposed == true) {
I don't think this is safe. Once you successfully run this method, it will forever be silently no-op.
I think the no-op should occur only while disposable is available, and otherwise set disposable to null in doFinally {.
Damn, you're right, thanks! Next time I'll write some unit tests before posting a public gist 🤣
I had some issues with setting the nullability on disposable when I provide streams that can be consumed in multiple places. Sometimes a stream is completed (end of stream, RxAndroidLifecycle = terminal event), sometimes finishes with an error (terminal event), and sometimes it's manually disposed (dispose() -> no terminal event), and if the 2 methods doOnTerminate+doOnDispose are not defined to null the disposable, then it depends of the type of cancellation.
To avoid that, I usually prefer an extension function like
private val Disposable.isRunning
get() = isDisposed.not()
And a condition like this:
if (disposable?.isRunning == true) {
// Log attempt
return
}
I find it more explicit on the intent (if it's already running, then stop), and I don't have to care about releasing a Disposable object (low memory footprint anyway).
What do you think?
Gist is now fixed, but for future readers, this is nothing more than a support for a Twitter conversation. 😉
this is equivalent to
You might even be able to replace it with just