Sometimes there is need to cancel some asynchronous execution while launching a new one with the same id, sometimes vice versa. This library provides an easy API to control every asynchronous task (no matter Coroutines, Rx, or something else). More details of the usage can be found in Medium post
Follow these instructions to implement this library in your project
- add depencencies
- add to project
allprojects {
repositories {
maven { url "" }
- add to module
implementation 'dev.temirlan.common:task:1.0.1'
- implement your own
or find some that matches your preferences (Coroutines or Rx) from samples below - put
in abstract Presenter or ViewModel to have an easy access in every Presenter
abstract class AbstractPresenter<T : AbstractContract.View> : MvpPresenter<T>(), AbstractContract.Presenter {
protected val taskHandler = TaskHandler()
- don't forget to cancel all tasks on presenter or viewmodel destroy
abstract class AbstractPresenter<T : AbstractContract.View> : MvpPresenter<T>(), AbstractContract.Presenter {
override fun onDestroy() {
- define and launch a task with TaskHandler's
method in your presenter(viewmodel) as shown in example
override fun onSetCardAsDefaultClicked(cardModel: CardModel) {
val setCardAsDefaultTask = CoroutineTask(
"setCardAsDefault", // set an id that will correspond to current task
Task.Strategy.KillFirst, // set the necessary strategy (KillFirst will destroy previous task with the same id)
{ billingRepository.setCardAsDefault( }, // provide suspend function (cause we use CoroutineTask)
{ // use the result of the execution
{ // handle exeption if something goes wrong
Task is an interface that contains the following methods. Implement and use it to execute your domain logic
interface Task {
fun getId(): String // id that will be used by taskhandler to identify the same tasks
fun execute(onFinish: () -> Unit) // method that executes the task. onFinish is callback function to inform TaskHandler about completing
fun cancel() // method that cancels the task
fun getStatus(): Status // get the status of the task
fun getStrategy(): Strategy // identifying a Strategy to inform TaskHandler about what to do with the previous task with the same id (KillFirst or KeepFirst). See description below
Status - class that informs about the current task state
sealed class Status {
object InProgress : Status()
object Completed : Status()
object Cancelled : Status()
- KeepFirst - TaskHandler keeps the previous task with the same id and doesn't start current
- KillFirst - TaskHandler cancels the previous task with the same id and starts the current task
sealed class Strategy {
object KeepFirst : Strategy()
object KillFirst : Strategy()
class CoroutineTask<T>(
private val id: String,
private val taskStrategy: Task.Strategy,
private val function: suspend () -> T,
private val onSuccess: (T) -> Unit,
private val onError: (Throwable) -> Unit,
private val scope: CoroutineScope = GlobalScope
) : Task {
private var job: Job? = null
override fun getId(): String {
return id
override fun execute(onFinish: () -> Unit) {
if (job == null) {
job = scope.launch {
try {
val result = function()
withContext(Dispatchers.Main) { onSuccess(result) }
} catch (e: Exception) {
withContext(Dispatchers.Main) { onError(e) }
} finally {
withContext(Dispatchers.Main) { onFinish() }
override fun cancel() {
override fun getStatus(): Task.Status {
return when {
job?.isCompleted == true -> Task.Status.Completed
job?.isActive == true -> Task.Status.InProgress
job?.isCancelled == true -> Task.Status.Cancelled
else -> Task.Status.InProgress
override fun getStrategy(): Task.Strategy {
return taskStrategy
class FlowTask<T>(
private val id: String,
private val taskStrategy: Task.Strategy,
private val function: suspend () -> Flow<T>,
private val onNext: (T) -> Unit,
private val onError: (Throwable) -> Unit,
private val scope: CoroutineScope = GlobalScope
) : Task {
private var job: Job? = null
override fun getId(): String {
return id
override fun execute(onFinish: () -> Unit) {
if (job == null) {
job = scope.launch {
try {
function().collect(object : FlowCollector<T> {
override suspend fun emit(value: T) {
withContext(Dispatchers.Main) { onNext(value) }
} catch (e: Exception) {
withContext(Dispatchers.Main) { onError(e) }
override fun cancel() {
override fun getStatus(): Task.Status {
return when {
job?.isCompleted == true -> Task.Status.Completed
job?.isActive == true -> Task.Status.InProgress
job?.isCancelled == true -> Task.Status.Cancelled
else -> Task.Status.InProgress
override fun getStrategy(): Task.Strategy {
return taskStrategy
class SingleTask<T>(
private val id: String,
private val taskStrategy: Task.Strategy,
private val single: Single<T>,
private val onSuccess: (T) -> Unit,
private val onError: (Throwable) -> Unit,
private var scheduler: Scheduler =
) : Task {
private var disposable: Disposable? = null
override fun getId(): String {
return id
override fun execute(onFinish: () -> Unit) {
if (disposable == null) {
disposable = single
.subscribe(onSuccess, onError)
override fun cancel() {
override fun getStatus(): Task.Status {
if (disposable != null && disposable?.isDisposed == false) {
return Task.Status.InProgress
return Task.Status.Completed
override fun getStrategy(): Task.Strategy {
return taskStrategy
class CompletableTask(
private val id: String,
private val taskStrategy: Task.Strategy,
private val completable: Completable,
private val onComplete: () -> Unit,
private val onError: (Throwable) -> Unit,
private var scheduler: Scheduler =
) : Task {
private var disposable: Disposable? = null
override fun getId(): String {
return id
override fun execute(onFinish: () -> Unit) {
if (disposable == null) {
disposable = completable
.subscribe(onComplete, onError)
override fun cancel() {
override fun getStatus(): Task.Status {
if (disposable != null && disposable?.isDisposed == false) {
return Task.Status.InProgress
return Task.Status.Completed
override fun getStrategy(): Task.Strategy {
return taskStrategy
class FlowableTask<T>(
private val id: String,
private val taskStrategy: Task.Strategy,
private val flowable: Flowable<T>,
private val onNext: (T) -> Unit,
private val onComplete: () -> Unit,
private val onError: (Throwable) -> Unit,
private var scheduler: Scheduler =
) : Task {
private var disposable: Disposable? = null
override fun getId(): String {
return id
override fun execute(onFinish: () -> Unit) {
if (disposable == null) {
disposable = flowable
.subscribe(onNext, onError, onComplete)
override fun cancel() {
override fun getStatus(): Task.Status {
if (disposable != null && disposable?.isDisposed == false) {
return Task.Status.InProgress
return Task.Status.Completed
override fun getStrategy(): Task.Strategy {
return taskStrategy
This repo is open for contributing
- Temirlan Kuntubayev - Initial work - github