Concurrency dan Race Condition di Kotlin: Panduan Lengkap dari Thread hingga Coroutines
21 min read

Concurrency dan Race Condition di Kotlin: Panduan Lengkap dari Thread hingga Coroutines

Kotlin adalah bahasa yang tumbuh di atas ekosistem JVM, mewarisi seluruh model threading Java — platform thread, synchronized, java.util.concurrent, volatile — semuanya bisa dipakai langsung dari Kotlin. Namun Kotlin tidak berhenti di sana. JetBrains merancang Coroutines sebagai solusi concurrency first-class yang lebih ekspresif, lebih aman, dan jauh lebih ringan dibanding thread tradisional. Coroutine bukan fitur bahasa dalam arti keyword baru di kompiler — ia adalah library (kotlinx.coroutines) yang dibangun di atas mekanisme suspension yang disediakan kompiler Kotlin. Hasilnya adalah model concurrency yang terasa seperti kode synchronous biasa, tapi bisa berjalan secara concurrent tanpa memblokir thread OS.

Memahami concurrency di Kotlin berarti memahami dua dunia sekaligus: warisan dari Java yang tetap relevan (terutama untuk interoperabilitas dengan library Java dan kode CPU-bound), dan Coroutines beserta ekosistemnya (Channel, Flow, CoroutineScope, structured concurrency) yang menjadi cara idiomatik Kotlin untuk I/O-bound concurrency. Artikel ini membahas keduanya secara mendalam — termasuk race condition yang bisa terjadi di keduanya, cara mendeteksinya, dan cara mencegahnya dengan mekanisme sinkronisasi yang tepat.

Thread di Kotlin: Warisan dari Java

Karena Kotlin berjalan di atas JVM, semua primitif threading Java tersedia langsung. Kotlin hanya menambahkan sedikit sintaks yang lebih bersih:

// Membuat thread dengan lambda (Kotlin idiom)
val thread = Thread {
    println("Berjalan di: ${Thread.currentThread().name}")
}
thread.start()

// Atau dengan extension function bawaan Kotlin
val t = thread(name = "worker-1") { // dari kotlin.concurrent
    println("Berjalan di: ${Thread.currentThread().name}")
}

// Thread dengan daemon flag
thread(isDaemon = true, name = "background") {
    while (true) {
        doBackgroundWork()
        Thread.sleep(1000)
    }
}

Seluruh java.util.concurrentExecutorService, ReentrantLock, AtomicInteger, ConcurrentHashMap — bisa dipakai langsung dari Kotlin karena interop penuh dengan Java. Namun di Kotlin modern, kebutuhan untuk membuat thread secara manual sangat jarang; hampir selalu ada solusi yang lebih baik via Coroutines.


Race Condition di Kotlin: Sama Berbahayanya dengan Java

Race condition di Kotlin bekerja persis seperti di Java — karena keduanya berbagi model memori JVM. Kode yang mengakses shared mutable state dari beberapa thread tanpa sinkronisasi menghasilkan perilaku yang tidak deterministik.

Contoh Klasik: Counter Tidak Aman

// ANTI-PATTERN: race condition pada shared counter
var counter = 0 // shared mutable state

val threads = (1..10).map {
    Thread {
        repeat(1000) {
            counter++ // TIDAK AMAN: baca-tambah-tulis bukan operasi atomik
        }
    }
}

threads.forEach { it.start() }
threads.forEach { it.join() }

println(counter) // bukan 10000 — hasilnya tidak terprediksi setiap kali dijalankan

Visibility Problem di Kotlin

// ANTI-PATTERN: visibility problem
class TaskRunner {
    private var running = true // tidak ada jaminan visibility antar thread

    fun stop() {
        running = false // thread lain mungkin tidak pernah melihat ini
    }

    fun run() {
        while (running) { // mungkin loop selamanya di CPU cache
            doWork()
        }
    }
}

// BENAR: @Volatile annotation (setara dengan Java volatile)
class SafeTaskRunner {
    @Volatile
    private var running = true // dijamin visible ke semua thread

    fun stop() { running = false }

    fun run() {
        while (running) {
            doWork()
        }
    }
}

Sinkronisasi dengan synchronized dan Lock

Kotlin mendukung @Synchronized annotation dan synchronized() function:

// Menggunakan synchronized block
class SafeCounter {
    private var count = 0
    private val lock = Any() // objek sebagai monitor

    fun increment() {
        synchronized(lock) {
            count++
        }
    }

    fun getCount(): Int = synchronized(lock) { count }
}

// Menggunakan @Synchronized annotation (setara synchronized method di Java)
class SynchronizedCounter {
    private var count = 0

    @Synchronized
    fun increment() { count++ }

    @Synchronized
    fun getCount(): Int = count
}

// Menggunakan ReentrantLock dari Java
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock // extension function Kotlin

class LockCounter {
    private var count = 0
    private val lock = ReentrantLock()

    fun increment() = lock.withLock { count++ } // withLock auto-unlock di finally

    fun getCount(): Int = lock.withLock { count }
}

Atomic Classes

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.LongAdder

// AtomicInteger untuk counter thread-safe tanpa lock
val atomicCounter = AtomicInteger(0)
atomicCounter.incrementAndGet()
atomicCounter.addAndGet(5)
atomicCounter.compareAndSet(10, 20) // CAS

// LongAdder untuk high-contention increment-only counter
val adder = LongAdder()
adder.increment()
val total = adder.sum()

Memperkenalkan Coroutines: Concurrency Tanpa Thread Baru

Coroutine adalah unit eksekusi yang bisa di-suspend (dijeda) dan di-resume (dilanjutkan) tanpa memblokir thread yang menjalankannya. Berbeda dengan thread yang ketika melakukan I/O akan “tidur” dan menunggu di sana (memblokir thread OS), coroutine yang melakukan operasi suspend akan melepaskan thread-nya sehingga thread tersebut bisa mengerjakan coroutine lain.

sequenceDiagram
    participant T as Thread
    participant C1 as Coroutine 1
    participant C2 as Coroutine 2
    T->>C1: menjalankan C1
    C1->>C1: doWork()
    C1-->>T: suspend (menunggu I/O)
    T->>C2: menjalankan C2 (thread bebas!)
    C2->>C2: doOtherWork()
    C2-->>T: suspend (menunggu I/O)
    T->>C1: resume C1 (I/O selesai)
    C1->>C1: lanjut setelah I/O

Satu thread bisa menjalankan ribuan coroutine bergantian, membuat coroutine jauh lebih ringan dari thread dan cocok untuk aplikasi dengan banyak concurrent I/O.

Menambahkan Dependency

// build.gradle.kts
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
    // Untuk Android:
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.8.0")
    // Untuk Spring Boot:
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.0")
}

Suspend Function: Fondasi Coroutines

Sebuah fungsi yang bisa di-suspend ditandai dengan keyword suspend. Fungsi ini hanya bisa dipanggil dari coroutine lain atau fungsi suspend lainnya — tidak bisa dari kode biasa secara langsung.

// Fungsi suspend — bisa dijeda tanpa memblokir thread
suspend fun fetchUser(id: Int): User {
    delay(500) // suspend selama 500ms, thread TIDAK diblokir
    return User(id, "John Doe")
}

suspend fun fetchOrders(userId: Int): List<Order> {
    delay(300) // suspend, thread bebas mengerjakan coroutine lain
    return listOf(Order(1), Order(2))
}

// Menggunakan suspend function — harus dari dalam coroutine
suspend fun loadDashboard() {
    val user = fetchUser(1)     // suspend di sini
    val orders = fetchOrders(1) // lalu suspend lagi
    println("${user.name}: ${orders.size} orders")
}
delay() adalah suspend function dari kotlinx.coroutines yang menjeda coroutine tanpa memblokir thread — berbeda dengan Thread.sleep() yang memblokir thread OS. Gunakan selalu delay() di dalam coroutine, bukan Thread.sleep().

CoroutineScope dan Structured Concurrency

Setiap coroutine harus dibuat di dalam sebuah CoroutineScope. Scope ini mendefinisikan lifecycle dari coroutine — ketika scope dibatalkan, semua coroutine di dalamnya juga dibatalkan secara otomatis. Inilah inti dari structured concurrency: coroutine tidak bisa “bocor” keluar dari scope yang mendefinisikannya.

import kotlinx.coroutines.*

fun main() = runBlocking { // membuat scope dan block thread hingga selesai
    println("Mulai di: ${Thread.currentThread().name}")

    launch { // meluncurkan coroutine baru di dalam scope ini
        delay(1000)
        println("Coroutine 1 selesai")
    }

    launch {
        delay(500)
        println("Coroutine 2 selesai")
    }

    println("Menunggu semua coroutine selesai...")
    // runBlocking menunggu sampai semua child coroutine selesai
}

launch vs async

launch digunakan untuk coroutine yang tidak mengembalikan nilai (fire-and-forget). async digunakan ketika kamu butuh hasil dari coroutine (mengembalikan Deferred<T>).

runBlocking {
    // launch: tidak ada return value
    val job: Job = launch {
        delay(1000)
        println("Pekerjaan selesai")
    }
    job.join() // tunggu job selesai

    // async: mengembalikan Deferred<T>, bisa di-await
    val deferred: Deferred<Int> = async {
        delay(1000)
        42 // return value
    }
    val result = deferred.await() // suspend hingga hasil tersedia
    println("Hasil: $result")

    // Menjalankan dua tugas secara paralel dengan async
    val deferredA = async { fetchUser(1) }
    val deferredB = async { fetchOrders(1) }

    val user = deferredA.await()   // tunggu keduanya
    val orders = deferredB.await()
    // fetchUser dan fetchOrders berjalan BERSAMAAN, bukan berurutan
}
flowchart LR
    subgraph Sequential
        A[fetchUser] --> B[fetchOrders]
        B --> C[Selesai - 800ms]
    end
    subgraph Parallel dengan async
        D[fetchUser - 500ms] --> F[await A + B - 500ms]
        E[fetchOrders - 300ms] --> F
        F --> G[Selesai - 500ms]
    end

Coroutine Dispatcher: Menentukan Thread Mana yang Dipakai

CoroutineDispatcher menentukan di thread mana sebuah coroutine berjalan. Ini adalah pengganti idiomatik Kotlin untuk memilih antara thread pool yang berbeda.

// Dispatcher.Default: thread pool untuk CPU-intensive work
// Jumlah thread = jumlah core CPU
launch(Dispatchers.Default) {
    val result = heavyComputation() // perhitungan berat
}

// Dispatchers.IO: thread pool untuk I/O operations
// Jumlah thread lebih banyak (default 64, atau jumlah core jika lebih besar)
launch(Dispatchers.IO) {
    val data = readFile("data.txt")     // file I/O
    val response = httpClient.get(url)   // network call
    val rows = database.query(sql)       // database query
}

// Dispatchers.Main: main/UI thread (Android, JavaFX, Swing)
launch(Dispatchers.Main) {
    updateUI(result) // hanya di Android/GUI environment
}

// Dispatchers.Unconfined: tidak terikat thread tertentu (jarang dipakai)
launch(Dispatchers.Unconfined) {
    println(Thread.currentThread().name) // berjalan di thread pemanggil
}

Berpindah Dispatcher dalam Satu Coroutine

suspend fun loadAndProcess(): String {
    // Mulai dengan I/O: ambil data dari network
    val rawData = withContext(Dispatchers.IO) {
        httpClient.get("https://api.example.com/data")
    }

    // Pindah ke Default: proses data (CPU-intensive)
    val processed = withContext(Dispatchers.Default) {
        parseAndTransform(rawData)
    }

    // Kembali ke Main untuk update UI (di Android)
    withContext(Dispatchers.Main) {
        showResult(processed)
    }

    return processed
}
DispatcherThread PoolCocok untuk
DefaultCPU coresParsing, sorting, komputasi
IO64+ threadsFile, network, database
Main1 (UI thread)Update UI
UnconfinedThread pemanggilTesting, kasus khusus

Race Condition di Coroutines

Meskipun coroutine terasa sequential dalam penulisannya, race condition tetap bisa terjadi ketika beberapa coroutine mengakses shared mutable state — terutama jika berjalan di dispatcher berbeda atau di coroutine yang diluncurkan secara paralel.

Contoh: Counter Race Condition di Coroutines

// ANTI-PATTERN: race condition di coroutines
var counter = 0 // shared mutable state

runBlocking {
    val jobs = (1..1000).map {
        launch(Dispatchers.Default) {
            counter++ // TIDAK AMAN meski terlihat seperti kode sequential!
        }
    }
    jobs.forEach { it.join() }
}

println(counter) // bukan 1000 — ada lost updates

Ini terjadi karena Dispatchers.Default menggunakan banyak thread, dan counter++ tetap bukan operasi atomik di level CPU.

Solusi 1: Atomic dari Java

import java.util.concurrent.atomic.AtomicInteger

// BENAR: AtomicInteger untuk counter sederhana
val counter = AtomicInteger(0)

runBlocking {
    val jobs = (1..1000).map {
        launch(Dispatchers.Default) {
            counter.incrementAndGet() // atomik
        }
    }
    jobs.forEach { it.join() }
}

println(counter.get()) // selalu 1000

Solusi 2: Mutex dari Coroutines

kotlinx.coroutines.sync.Mutex adalah versi suspend-friendly dari mutex — saat tidak bisa mendapat lock, ia meng-suspend coroutine (bukan memblokir thread):

import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// BENAR: Mutex coroutine-friendly
val mutex = Mutex()
var counter = 0

runBlocking {
    val jobs = (1..1000).map {
        launch(Dispatchers.Default) {
            mutex.withLock { // suspend jika lock tidak tersedia, tidak blokir thread
                counter++
            }
        }
    }
    jobs.forEach { it.join() }
}

println(counter) // selalu 1000

Solusi 3: Single-Thread Confinement

Pola paling idiomatik di Kotlin: batasi akses ke shared state hanya dari satu coroutine/thread:

// BENAR: batasi akses counter ke single-threaded dispatcher
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

runBlocking {
    val jobs = (1..1000).map {
        launch(Dispatchers.Default) {
            withContext(counterContext) {
                counter++ // aman karena hanya satu thread yang mengeksekusi ini
            }
        }
    }
    jobs.forEach { it.join() }
}

println(counter) // selalu 1000
counterContext.close()

Solusi 4: Actor Pattern dengan Channel

Mengikuti filosofi “share by communicating” — satu coroutine memiliki state, yang lain berkomunikasi via channel:

import kotlinx.coroutines.channels.*

// Actor: coroutine yang memproses pesan secara sequential
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // state hanya dipegang oleh actor ini
    for (msg in channel) {
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

runBlocking {
    val counter = counterActor()

    val jobs = (1..1000).map {
        launch(Dispatchers.Default) {
            counter.send(IncCounter) // kirim pesan, tidak ada shared state
        }
    }
    jobs.forEach { it.join() }

    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter: ${response.await()}") // selalu 1000

    counter.close()
}

Channel: Komunikasi Antar Coroutine

Channel di Kotlin Coroutines adalah setara dengan Go channel — “pipa” bertipe untuk komunikasi antar coroutine dengan sinkronisasi bawaan.

import kotlinx.coroutines.channels.*

runBlocking {
    // Unbuffered channel: pengirim suspend sampai ada penerima
    val channel = Channel<Int>()

    launch {
        for (i in 1..5) {
            println("Mengirim $i")
            channel.send(i) // suspend jika tidak ada penerima
        }
        channel.close() // beritahu penerima tidak ada data lagi
    }

    for (value in channel) { // iterate sampai channel ditutup
        println("Menerima $value")
    }
}

// Buffered channel
val buffered = Channel<Int>(capacity = 10)

// Rendezvous (unbuffered, default)
val rendezvous = Channel<Int>(Channel.RENDEZVOUS)

// Unlimited buffer (hati-hati: bisa OOM)
val unlimited = Channel<Int>(Channel.UNLIMITED)

// Drop oldest jika penuh
val dropping = Channel<Int>(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)

Fan-Out dan Fan-In dengan Channel

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++)
}

fun CoroutineScope.processNumber(id: Int, numbers: ReceiveChannel<Int>) = launch {
    for (msg in numbers) {
        println("Worker $id memproses $msg")
    }
}

// Fan-out: satu producer, banyak consumer
runBlocking {
    val producer = produceNumbers()
    repeat(3) { id ->
        processNumber(id, producer) // 3 worker berbagi channel yang sama
    }
    delay(1000)
    producer.cancel() // hentikan producer
}

// Fan-in: gabungkan beberapa channel menjadi satu
fun CoroutineScope.mergeChannels(vararg channels: ReceiveChannel<Int>): ReceiveChannel<Int> =
    produce {
        for (channel in channels) {
            launch {
                for (value in channel) send(value)
            }
        }
    }

Flow: Asynchronous Data Streams

Flow adalah abstraksi untuk cold asynchronous stream — sebuah urutan nilai yang diproduksi secara asynchronous dan hanya mulai mengalir ketika ada yang mengcollect-nya.

import kotlinx.coroutines.flow.*

// Membuat Flow
fun numbersFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100) // operasi suspend di dalam flow builder
        emit(i)    // memancarkan nilai
    }
}

// Collecting Flow (terminal operator, suspend function)
runBlocking {
    numbersFlow()
        .filter { it % 2 == 0 }       // transformasi: filter
        .map { it * it }               // transformasi: map
        .collect { value ->            // terminal operator
            println(value)             // 4, 16
        }
}

Operator Flow yang Umum Dipakai

val flow = flowOf(1, 2, 3, 4, 5)

// Transformasi
flow.map { it * 2 }               // transformasi setiap elemen
flow.filter { it > 2 }           // filter elemen
flow.flatMapMerge { fetchData(it) } // flatMap concurrent
flow.transform { emit(it); emit(it * 2) } // emit beberapa nilai per elemen

// Kombinasi
flow1.zip(flow2) { a, b -> a + b }     // zip dua flow
flow1.combine(flow2) { a, b -> a + b } // combine (re-emit saat salah satu berubah)
merge(flow1, flow2)                     // merge beberapa flow menjadi satu

// Kontrol
flow.take(3)           // hanya ambil 3 elemen pertama
flow.debounce(300)     // tunggu 300ms tenang sebelum emit (cocok untuk search input)
flow.distinctUntilChanged() // skip jika nilai sama dengan sebelumnya
flow.retry(3)          // retry hingga 3 kali jika error
flow.catch { e -> emit(defaultValue) } // error handling

// Terminal
flow.toList()          // kumpulkan semua ke List (suspend)
flow.first()           // ambil elemen pertama saja (suspend)
flow.count()           // hitung jumlah elemen (suspend)
flow.reduce { acc, value -> acc + value } // reduce (suspend)

Cold vs Hot Flow

Flow biasa adalah cold — setiap collector mendapat aliran data dari awal, dan flow hanya berjalan saat ada collector:

val coldFlow = flow {
    println("Flow mulai berjalan")
    emit(1)
    emit(2)
}

runBlocking {
    coldFlow.collect { println("Collector 1: $it") }
    // "Flow mulai berjalan" tercetak lagi untuk collector kedua
    coldFlow.collect { println("Collector 2: $it") }
}

SharedFlow dan StateFlow adalah hot — berjalan terlepas dari ada collector atau tidak, dan bisa dibagikan ke banyak collector:


SharedFlow dan StateFlow: Hot Streams

SharedFlow: Broadcast ke Banyak Collector

import kotlinx.coroutines.flow.*

val sharedFlow = MutableSharedFlow<String>(
    replay = 1,           // subscriber baru langsung dapat 1 nilai terakhir
    extraBufferCapacity = 10 // buffer tambahan agar emit tidak suspend
)

runBlocking {
    // Emitter
    launch {
        repeat(5) { i ->
            sharedFlow.emit("Event $i")
            delay(100)
        }
    }

    // Dua collector menerima event yang sama
    launch {
        sharedFlow.collect { println("Collector A: $it") }
    }

    launch {
        sharedFlow.collect { println("Collector B: $it") }
    }

    delay(1000)
}

StateFlow: State Holder yang Reaktif

StateFlow adalah SharedFlow khusus yang selalu punya nilai saat ini, mirip LiveData di Android:

import kotlinx.coroutines.flow.*

class ViewModel {
    private val _uiState = MutableStateFlow(UiState.Loading)
    val uiState: StateFlow<UiState> = _uiState.asStateFlow() // expose sebagai read-only

    suspend fun loadData() {
        _uiState.value = UiState.Loading

        try {
            val data = repository.fetchData()
            _uiState.value = UiState.Success(data)
        } catch (e: Exception) {
            _uiState.value = UiState.Error(e.message ?: "Terjadi kesalahan")
        }
    }
}

// Di UI / collector
viewModel.uiState
    .collect { state ->
        when (state) {
            is UiState.Loading -> showLoading()
            is UiState.Success -> showData(state.data)
            is UiState.Error -> showError(state.message)
        }
    }
Flow (Cold)SharedFlow (Hot)StateFlow (Hot)
Mulai berjalanSaat ada collectorSelaluSelalu
Nilai saat iniTidak adaTidak ada (kecuali replay)Ya, selalu
Banyak collectorSetiap dapat aliran sendiriSemua dapat event yang samaSemua dapat state terkini
Cocok untukData pipeline, queryEvent/event busUI State, config

Structured Concurrency di Kotlin

Structured concurrency adalah salah satu kontribusi terbesar Kotlin Coroutines. Prinsipnya sederhana: coroutine tidak bisa hidup lebih lama dari scope yang membuatnya. Ini secara otomatis mencegah coroutine leak.

// Ketika scope di-cancel, semua child coroutine ikut di-cancel
val scope = CoroutineScope(Dispatchers.Default)

scope.launch {
    launch { delay(Long.MAX_VALUE) } // child 1
    launch { delay(Long.MAX_VALUE) } // child 2
    delay(Long.MAX_VALUE)            // parent
}

scope.cancel() // semua coroutine di atas langsung di-cancel

coroutineScope vs supervisorScope

// coroutineScope: jika satu child gagal, semua child lain di-cancel
suspend fun fetchDashboard() = coroutineScope {
    val user = async { fetchUser() }      // jika ini gagal...
    val orders = async { fetchOrders() }  // ...ini juga di-cancel

    DashboardData(user.await(), orders.await())
}

// supervisorScope: kegagalan satu child tidak mempengaruhi child lain
suspend fun fetchAll() = supervisorScope {
    val user = async { fetchUser() }
    val orders = async { fetchOrders() }
    val recommendations = async { fetchRecommendations() }

    // Rekomendasi mungkin gagal, tapi user dan orders tetap diproses
    DashboardData(
        user = user.await(),
        orders = orders.await(),
        recommendations = try { recommendations.await() } catch (e: Exception) { emptyList() }
    )
}
flowchart TD
    subgraph "coroutineScope — kegagalan menyebar"
        CS[coroutineScope] --> CA[async: fetchUser ✓]
        CS --> CB[async: fetchOrders ✗ GAGAL]
        CB -->|cancel| CA
        CB -->|gagal| CS
    end
    subgraph "supervisorScope — kegagalan terisolasi"
        SS[supervisorScope] --> SA[async: fetchUser ✓]
        SS --> SB[async: fetchOrders ✗ GAGAL]
        SS --> SC[async: fetchRecs ✓]
        SB -->|tidak mempengaruhi| SA
        SB -->|tidak mempengaruhi| SC
    end

Cancellation dan Exception Handling

Cooperative Cancellation

Coroutine di Kotlin bersifat cooperative — ia hanya bisa di-cancel di titik-titik suspension. Kode CPU-bound yang tidak pernah suspend tidak bisa di-cancel secara otomatis:

// ANTI-PATTERN: tidak cooperative, tidak bisa di-cancel
val job = launch {
    var i = 0
    while (true) { // loop tanpa suspend point
        i++
        // tidak ada suspension di sini — cancel() tidak akan berpengaruh
    }
}
job.cancel() // tidak akan bekerja!

// BENAR: periksa isActive atau gunakan yield()
val job = launch {
    var i = 0
    while (isActive) { // cek apakah masih aktif
        i++
    }
}

// Atau gunakan yield() untuk memberi kesempatan cancel/schedule
val job = launch {
    var i = 0
    while (true) {
        yield() // suspension point: cek cancel dan beri giliran scheduler
        i++
    }
}

job.cancel()

CancellationException

Ketika coroutine di-cancel, CancellationException dilempar di titik suspension berikutnya. Penting: jangan tangkap CancellationException dan menelannya:

// ANTI-PATTERN: menelan CancellationException
launch {
    try {
        delay(1000)
    } catch (e: Exception) { // menangkap SEMUA exception termasuk CancellationException!
        println("Error: ${e.message}") // seharusnya tidak dilakukan ini
        // coroutine tidak benar-benar di-cancel
    }
}

// BENAR: hanya tangkap yang bukan CancellationException
launch {
    try {
        delay(1000)
    } catch (e: CancellationException) {
        throw e // re-throw CancellationException!
    } catch (e: Exception) {
        println("Error: ${e.message}") // tangkap error lainnya
    }
}

// Atau gunakan try/finally untuk cleanup
launch {
    try {
        delay(1000)
        doWork()
    } finally {
        // cleanup selalu dijalankan, baik selesai normal maupun di-cancel
        closeResources()
    }
}

CoroutineExceptionHandler

Untuk menangkap uncaught exception dari launch (bukan async):

val handler = CoroutineExceptionHandler { _, exception ->
    println("Uncaught exception: $exception")
    // log, lapor ke Sentry, dsb
}

val scope = CoroutineScope(Dispatchers.Default + handler)

scope.launch {
    throw RuntimeException("Sesuatu yang tidak terduga!")
    // CoroutineExceptionHandler akan dipanggil
}

Anti-Pattern Umum di Kotlin Coroutines

Anti-Pattern 1: GlobalScope

// ANTI-PATTERN: GlobalScope — coroutine tidak terikat lifecycle apapun
fun loadData() {
    GlobalScope.launch { // bahaya: hidup selamanya, bisa leak!
        val data = fetchFromNetwork()
        updateUI(data)
    }
}

// BENAR: gunakan scope yang terikat lifecycle
class MyViewModel : ViewModel() {
    fun loadData() {
        viewModelScope.launch { // di-cancel otomatis saat ViewModel destroyed
            val data = fetchFromNetwork()
            _uiState.value = UiState.Success(data)
        }
    }
}

// Atau buat scope sendiri yang bisa di-cancel
class DataLoader {
    private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

    fun load() {
        scope.launch { /* ... */ }
    }

    fun destroy() {
        scope.cancel() // cancel semua coroutine
    }
}

Anti-Pattern 2: Blocking di dalam Coroutine

// ANTI-PATTERN: memanggil blocking code di coroutine tanpa dispatcher IO
suspend fun fetchData(): String {
    val response = OkHttpClient().newCall(request).execute() // BLOCKING!
    // ini memblokir carrier thread, mengurangi efektivitas coroutine
    return response.body?.string() ?: ""
}

// BENAR: wrap blocking code dengan withContext(Dispatchers.IO)
suspend fun fetchData(): String {
    return withContext(Dispatchers.IO) {
        val response = OkHttpClient().newCall(request).execute() // blocking OK di sini
        response.body?.string() ?: ""
    }
}

// BENAR: lebih baik lagi, gunakan library yang sudah suspend-aware
suspend fun fetchData(): String {
    return httpClient.get(url).bodyAsText() // ktor client: sudah suspend, tidak blocking
}

Anti-Pattern 3: Tidak Menangani Exception di async

// ANTI-PATTERN: exception di async baru muncul saat await dipanggil
// dan mudah terlewatkan
val deferred = async {
    throw RuntimeException("Gagal!")
}
// exception belum muncul di sini...

delay(100)
deferred.await() // baru meledak di sini — mudah terlupa

// BENAR: tangani exception saat await
val result = try {
    deferred.await()
} catch (e: Exception) {
    defaultValue
}

// Atau gunakan runCatching
val result = runCatching { deferred.await() }
    .getOrElse { defaultValue }

Anti-Pattern 4: Membuat Dispatcher Baru yang Tidak Diclose

// ANTI-PATTERN: dispatcher baru tanpa close — thread leak
suspend fun doWork() {
    val dispatcher = newSingleThreadContext("worker")
    withContext(dispatcher) {
        // ...
    }
    // dispatcher tidak di-close, thread tetap hidup selamanya!
}

// BENAR: close dispatcher setelah selesai
suspend fun doWork() {
    val dispatcher = newSingleThreadContext("worker")
    try {
        withContext(dispatcher) {
            // ...
        }
    } finally {
        dispatcher.close() // pastikan thread dibebaskan
    }
}

Anti-Pattern 5: Race Condition pada Shared StateFlow

// ANTI-PATTERN: modifikasi StateFlow dari beberapa coroutine tanpa sinkronisasi
val _count = MutableStateFlow(0)

repeat(1000) {
    launch(Dispatchers.Default) {
        _count.value = _count.value + 1 // RACE CONDITION: baca-modifikasi-tulis!
    }
}

// BENAR: gunakan update() yang atomik
repeat(1000) {
    launch(Dispatchers.Default) {
        _count.update { it + 1 } // atomik: tidak ada lost update
    }
}

Pola Concurrency Produksi di Kotlin

Retry dengan Exponential Backoff

suspend fun <T> retryWithBackoff(
    times: Int = 3,
    initialDelay: Long = 100,
    maxDelay: Long = 1000,
    factor: Double = 2.0,
    block: suspend () -> T
): T {
    var currentDelay = initialDelay
    repeat(times - 1) { attempt ->
        try {
            return block()
        } catch (e: Exception) {
            if (e is CancellationException) throw e // jangan retry cancel
            println("Percobaan ${attempt + 1} gagal: ${e.message}, retry dalam ${currentDelay}ms")
        }
        delay(currentDelay)
        currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
    }
    return block() // percobaan terakhir, biarkan exception menyebar
}

// Penggunaan
val result = retryWithBackoff(times = 3, initialDelay = 100) {
    httpClient.get("https://api.example.com/data")
}

Timeout

import kotlinx.coroutines.*

// withTimeout: lempar TimeoutCancellationException jika melebihi batas
val result = withTimeout(5000L) {
    fetchDataFromNetwork() // harus selesai dalam 5 detik
}

// withTimeoutOrNull: return null jika timeout, tidak throw exception
val result: String? = withTimeoutOrNull(5000L) {
    fetchDataFromNetwork()
} ?: "default value"

Parallel Decomposition

// Menjalankan banyak request secara paralel dan mengumpulkan hasilnya
suspend fun fetchAllUsers(ids: List<Int>): List<User> = coroutineScope {
    ids.map { id ->
        async { fetchUser(id) } // setiap request berjalan paralel
    }.awaitAll() // tunggu semua selesai, throw jika ada yang gagal
}

// Dengan kontrol batas concurrency (semaphore)
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

suspend fun fetchAllUsersLimited(ids: List<Int>, maxConcurrent: Int = 10): List<User> {
    val semaphore = Semaphore(maxConcurrent) // maksimal 10 request bersamaan
    return coroutineScope {
        ids.map { id ->
            async {
                semaphore.withPermit { // tunggu izin sebelum mulai
                    fetchUser(id)
                }
            }
        }.awaitAll()
    }
}

Worker Pool dengan Channel

fun CoroutineScope.workerPool(
    workers: Int,
    jobs: ReceiveChannel<Int>,
    results: SendChannel<Int>
) {
    repeat(workers) { workerId ->
        launch(Dispatchers.IO) {
            for (job in jobs) {
                val result = processJob(job)
                results.send(result)
                println("Worker $workerId selesai job $job")
            }
        }
    }
}

runBlocking {
    val jobs = Channel<Int>(Channel.UNLIMITED)
    val results = Channel<Int>(Channel.UNLIMITED)

    // Isi jobs
    repeat(100) { jobs.send(it) }
    jobs.close()

    // Jalankan 5 worker paralel
    workerPool(workers = 5, jobs = jobs, results = results)

    // Kumpulkan hasil
    repeat(100) { println("Hasil: ${results.receive()}") }
}

Testing Coroutines

Menguji kode concurrent selalu tricky. kotlinx-coroutines-test menyediakan tool khusus:

import kotlinx.coroutines.test.*
import kotlin.test.*

// TestCoroutineScheduler untuk kontrol waktu virtual
class UserViewModelTest {

    @Test
    fun `loading state berubah dengan benar`() = runTest {
        val viewModel = UserViewModel()

        viewModel.loadUser(1)

        // advanceUntilIdle: jalankan semua coroutine yang pending
        advanceUntilIdle()

        assertEquals(UiState.Success::class, viewModel.uiState.value::class)
    }

    @Test
    fun `delay di-virtualisasi tanpa menunggu waktu nyata`() = runTest {
        var result = 0

        launch {
            delay(10_000) // 10 detik — tapi test tidak benar-benar menunggu 10 detik!
            result = 42
        }

        advanceTimeBy(10_001) // majukan waktu virtual
        assertEquals(42, result)
    }
}

Checklist Review Kode Concurrent Kotlin

THREAD SAFETY DASAR:
  □ Apakah semua akses ke var yang dipakai dari beberapa coroutine/thread aman?
  □ Apakah @Volatile digunakan untuk flag yang dibaca dari thread berbeda?
  □ Apakah tidak ada HashMap/ArrayList biasa yang diakses dari beberapa thread?

COROUTINE SCOPE:
  □ Apakah GlobalScope tidak digunakan? (gunakan scope yang terikat lifecycle)
  □ Apakah setiap CoroutineScope yang dibuat manual di-cancel saat tidak dipakai?
  □ Apakah tidak ada coroutine yang bisa "bocor" keluar dari scope-nya?

DISPATCHER:
  □ Apakah blocking I/O selalu di dalam withContext(Dispatchers.IO)?
  □ Apakah CPU-intensive work di dalam withContext(Dispatchers.Default)?
  □ Apakah tidak ada Thread.sleep() di dalam coroutine (gunakan delay())?

CANCELLATION:
  □ Apakah CancellationException tidak ditelan (catch Exception tanpa re-throw)?
  □ Apakah long-running CPU task memeriksa isActive atau memanggil yield()?
  □ Apakah cleanup di dalam finally block (bukan hanya di catch)?

EXCEPTION HANDLING:
  □ Apakah exception dari async {} di-handle saat await()?
  □ Apakah ada CoroutineExceptionHandler untuk uncaught exception dari launch?
  □ Apakah coroutineScope vs supervisorScope dipilih dengan benar sesuai kebutuhan?

SHARED STATE:
  □ Apakah tidak ada shared mutable var yang diakses dari Dispatchers.Default?
  □ Apakah MutableStateFlow.update{} digunakan (bukan value = value + 1)?
  □ Apakah Mutex coroutine (bukan java.util.concurrent) digunakan untuk suspend context?

FLOW:
  □ Apakah Flow tidak di-collect di background thread yang salah?
  □ Apakah SharedFlow/StateFlow digunakan untuk hot streams, Flow untuk cold?
  □ Apakah exception di dalam flow di-handle dengan catch operator?

RESOURCE:
  □ Apakah newSingleThreadContext / newFixedThreadPoolContext selalu di-close?
  □ Apakah Channel selalu di-close dari sisi pengirim?
  □ Apakah tidak ada resource leak di dalam coroutine yang mungkin di-cancel?

Ringkasan

  • Kotlin mewarisi seluruh model threading Java — synchronized, @Volatile, java.util.concurrent, AtomicInteger — semuanya bisa dipakai langsung dengan sintaks Kotlin yang lebih ringkas.
  • Race condition tetap bisa terjadi di Kotlin, baik di thread biasa maupun di coroutine yang berjalan di Dispatchers.Default (multi-threaded).
  • suspend function bisa dijeda tanpa memblokir thread — fondasi dari seluruh model concurrency coroutine.
  • launch untuk fire-and-forget (return Job); async untuk tugas yang mengembalikan nilai (return Deferred<T> yang bisa di-await).
  • CoroutineDispatcher menentukan thread pool: Default untuk CPU-bound, IO untuk I/O-bound, Main untuk UI thread.
  • Structured concurrency memastikan coroutine tidak hidup lebih lama dari scope-nya — cegah coroutine leak secara otomatis.
  • coroutineScope menyebarkan kegagalan ke semua sibling; supervisorScope mengisolasi kegagalan agar child lain tetap berjalan.
  • Mutex dari kotlinx.coroutines.sync adalah versi suspend-friendly dari lock — suspend coroutine tanpa memblokir thread saat menunggu lock.
  • Untuk shared counter sederhana, AtomicInteger/AtomicLong lebih efisien. Untuk state yang lebih kompleks, Mutex atau single-thread confinement lebih tepat.
  • Channel untuk komunikasi antar coroutine (push-based); Flow untuk stream data yang di-pull oleh collector.
  • StateFlow untuk reaktif UI state (selalu punya nilai saat ini); SharedFlow untuk event/broadcast ke banyak collector.
  • MutableStateFlow.update { } untuk modifikasi state secara atomik — hindari _state.value = _state.value + 1 yang rawan race condition.
  • Jangan gunakan GlobalScope — selalu ikat coroutine ke scope yang punya lifecycle (ViewModel scope, lifecycle scope, atau scope manual yang di-cancel).
  • Jangan telan CancellationException — tangkap spesifik exception yang diharapkan, dan re-throw CancellationException.
  • Untuk testing, gunakan runTest dan advanceUntilIdle() / advanceTimeBy() agar waktu virtual bisa dikontrol tanpa menunggu durasi sesungguhnya.

Portofolio