Concurrency dan Race Condition di Kotlin: Panduan Lengkap dari Thread hingga Coroutines
Kotlin adalah bahasa yang tumbuh di atas ekosistem JVM, mewarisi seluruh model threading Java — platform thread, synchronized, java.util.concurrent, volatile — semuanya bisa dipakai langsung dari Kotlin. Namun Kotlin tidak berhenti di sana. JetBrains merancang Coroutines sebagai solusi concurrency first-class yang lebih ekspresif, lebih aman, dan jauh lebih ringan dibanding thread tradisional. Coroutine bukan fitur bahasa dalam arti keyword baru di kompiler — ia adalah library (kotlinx.coroutines) yang dibangun di atas mekanisme suspension yang disediakan kompiler Kotlin. Hasilnya adalah model concurrency yang terasa seperti kode synchronous biasa, tapi bisa berjalan secara concurrent tanpa memblokir thread OS.
Memahami concurrency di Kotlin berarti memahami dua dunia sekaligus: warisan dari Java yang tetap relevan (terutama untuk interoperabilitas dengan library Java dan kode CPU-bound), dan Coroutines beserta ekosistemnya (Channel, Flow, CoroutineScope, structured concurrency) yang menjadi cara idiomatik Kotlin untuk I/O-bound concurrency. Artikel ini membahas keduanya secara mendalam — termasuk race condition yang bisa terjadi di keduanya, cara mendeteksinya, dan cara mencegahnya dengan mekanisme sinkronisasi yang tepat.
Thread di Kotlin: Warisan dari Java
Karena Kotlin berjalan di atas JVM, semua primitif threading Java tersedia langsung. Kotlin hanya menambahkan sedikit sintaks yang lebih bersih:
// Membuat thread dengan lambda (Kotlin idiom)
val thread = Thread {
println("Berjalan di: ${Thread.currentThread().name}")
}
thread.start()
// Atau dengan extension function bawaan Kotlin
val t = thread(name = "worker-1") { // dari kotlin.concurrent
println("Berjalan di: ${Thread.currentThread().name}")
}
// Thread dengan daemon flag
thread(isDaemon = true, name = "background") {
while (true) {
doBackgroundWork()
Thread.sleep(1000)
}
}
Seluruh java.util.concurrent — ExecutorService, ReentrantLock, AtomicInteger, ConcurrentHashMap — bisa dipakai langsung dari Kotlin karena interop penuh dengan Java. Namun di Kotlin modern, kebutuhan untuk membuat thread secara manual sangat jarang; hampir selalu ada solusi yang lebih baik via Coroutines.
Race Condition di Kotlin: Sama Berbahayanya dengan Java
Race condition di Kotlin bekerja persis seperti di Java — karena keduanya berbagi model memori JVM. Kode yang mengakses shared mutable state dari beberapa thread tanpa sinkronisasi menghasilkan perilaku yang tidak deterministik.
Contoh Klasik: Counter Tidak Aman
// ANTI-PATTERN: race condition pada shared counter
var counter = 0 // shared mutable state
val threads = (1..10).map {
Thread {
repeat(1000) {
counter++ // TIDAK AMAN: baca-tambah-tulis bukan operasi atomik
}
}
}
threads.forEach { it.start() }
threads.forEach { it.join() }
println(counter) // bukan 10000 — hasilnya tidak terprediksi setiap kali dijalankan
Visibility Problem di Kotlin
// ANTI-PATTERN: visibility problem
class TaskRunner {
private var running = true // tidak ada jaminan visibility antar thread
fun stop() {
running = false // thread lain mungkin tidak pernah melihat ini
}
fun run() {
while (running) { // mungkin loop selamanya di CPU cache
doWork()
}
}
}
// BENAR: @Volatile annotation (setara dengan Java volatile)
class SafeTaskRunner {
@Volatile
private var running = true // dijamin visible ke semua thread
fun stop() { running = false }
fun run() {
while (running) {
doWork()
}
}
}
Sinkronisasi dengan synchronized dan Lock
Kotlin mendukung @Synchronized annotation dan synchronized() function:
// Menggunakan synchronized block
class SafeCounter {
private var count = 0
private val lock = Any() // objek sebagai monitor
fun increment() {
synchronized(lock) {
count++
}
}
fun getCount(): Int = synchronized(lock) { count }
}
// Menggunakan @Synchronized annotation (setara synchronized method di Java)
class SynchronizedCounter {
private var count = 0
@Synchronized
fun increment() { count++ }
@Synchronized
fun getCount(): Int = count
}
// Menggunakan ReentrantLock dari Java
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock // extension function Kotlin
class LockCounter {
private var count = 0
private val lock = ReentrantLock()
fun increment() = lock.withLock { count++ } // withLock auto-unlock di finally
fun getCount(): Int = lock.withLock { count }
}
Atomic Classes
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.LongAdder
// AtomicInteger untuk counter thread-safe tanpa lock
val atomicCounter = AtomicInteger(0)
atomicCounter.incrementAndGet()
atomicCounter.addAndGet(5)
atomicCounter.compareAndSet(10, 20) // CAS
// LongAdder untuk high-contention increment-only counter
val adder = LongAdder()
adder.increment()
val total = adder.sum()
Memperkenalkan Coroutines: Concurrency Tanpa Thread Baru
Coroutine adalah unit eksekusi yang bisa di-suspend (dijeda) dan di-resume (dilanjutkan) tanpa memblokir thread yang menjalankannya. Berbeda dengan thread yang ketika melakukan I/O akan “tidur” dan menunggu di sana (memblokir thread OS), coroutine yang melakukan operasi suspend akan melepaskan thread-nya sehingga thread tersebut bisa mengerjakan coroutine lain.
sequenceDiagram
participant T as Thread
participant C1 as Coroutine 1
participant C2 as Coroutine 2
T->>C1: menjalankan C1
C1->>C1: doWork()
C1-->>T: suspend (menunggu I/O)
T->>C2: menjalankan C2 (thread bebas!)
C2->>C2: doOtherWork()
C2-->>T: suspend (menunggu I/O)
T->>C1: resume C1 (I/O selesai)
C1->>C1: lanjut setelah I/OSatu thread bisa menjalankan ribuan coroutine bergantian, membuat coroutine jauh lebih ringan dari thread dan cocok untuk aplikasi dengan banyak concurrent I/O.
Menambahkan Dependency
// build.gradle.kts
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
// Untuk Android:
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.8.0")
// Untuk Spring Boot:
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.0")
}
Suspend Function: Fondasi Coroutines
Sebuah fungsi yang bisa di-suspend ditandai dengan keyword suspend. Fungsi ini hanya bisa dipanggil dari coroutine lain atau fungsi suspend lainnya — tidak bisa dari kode biasa secara langsung.
// Fungsi suspend — bisa dijeda tanpa memblokir thread
suspend fun fetchUser(id: Int): User {
delay(500) // suspend selama 500ms, thread TIDAK diblokir
return User(id, "John Doe")
}
suspend fun fetchOrders(userId: Int): List<Order> {
delay(300) // suspend, thread bebas mengerjakan coroutine lain
return listOf(Order(1), Order(2))
}
// Menggunakan suspend function — harus dari dalam coroutine
suspend fun loadDashboard() {
val user = fetchUser(1) // suspend di sini
val orders = fetchOrders(1) // lalu suspend lagi
println("${user.name}: ${orders.size} orders")
}
delay()adalah suspend function darikotlinx.coroutinesyang menjeda coroutine tanpa memblokir thread — berbeda denganThread.sleep()yang memblokir thread OS. Gunakan selaludelay()di dalam coroutine, bukanThread.sleep().
CoroutineScope dan Structured Concurrency
Setiap coroutine harus dibuat di dalam sebuah CoroutineScope. Scope ini mendefinisikan lifecycle dari coroutine — ketika scope dibatalkan, semua coroutine di dalamnya juga dibatalkan secara otomatis. Inilah inti dari structured concurrency: coroutine tidak bisa “bocor” keluar dari scope yang mendefinisikannya.
import kotlinx.coroutines.*
fun main() = runBlocking { // membuat scope dan block thread hingga selesai
println("Mulai di: ${Thread.currentThread().name}")
launch { // meluncurkan coroutine baru di dalam scope ini
delay(1000)
println("Coroutine 1 selesai")
}
launch {
delay(500)
println("Coroutine 2 selesai")
}
println("Menunggu semua coroutine selesai...")
// runBlocking menunggu sampai semua child coroutine selesai
}
launch vs async
launch digunakan untuk coroutine yang tidak mengembalikan nilai (fire-and-forget). async digunakan ketika kamu butuh hasil dari coroutine (mengembalikan Deferred<T>).
runBlocking {
// launch: tidak ada return value
val job: Job = launch {
delay(1000)
println("Pekerjaan selesai")
}
job.join() // tunggu job selesai
// async: mengembalikan Deferred<T>, bisa di-await
val deferred: Deferred<Int> = async {
delay(1000)
42 // return value
}
val result = deferred.await() // suspend hingga hasil tersedia
println("Hasil: $result")
// Menjalankan dua tugas secara paralel dengan async
val deferredA = async { fetchUser(1) }
val deferredB = async { fetchOrders(1) }
val user = deferredA.await() // tunggu keduanya
val orders = deferredB.await()
// fetchUser dan fetchOrders berjalan BERSAMAAN, bukan berurutan
}
flowchart LR
subgraph Sequential
A[fetchUser] --> B[fetchOrders]
B --> C[Selesai - 800ms]
end
subgraph Parallel dengan async
D[fetchUser - 500ms] --> F[await A + B - 500ms]
E[fetchOrders - 300ms] --> F
F --> G[Selesai - 500ms]
endCoroutine Dispatcher: Menentukan Thread Mana yang Dipakai
CoroutineDispatcher menentukan di thread mana sebuah coroutine berjalan. Ini adalah pengganti idiomatik Kotlin untuk memilih antara thread pool yang berbeda.
// Dispatcher.Default: thread pool untuk CPU-intensive work
// Jumlah thread = jumlah core CPU
launch(Dispatchers.Default) {
val result = heavyComputation() // perhitungan berat
}
// Dispatchers.IO: thread pool untuk I/O operations
// Jumlah thread lebih banyak (default 64, atau jumlah core jika lebih besar)
launch(Dispatchers.IO) {
val data = readFile("data.txt") // file I/O
val response = httpClient.get(url) // network call
val rows = database.query(sql) // database query
}
// Dispatchers.Main: main/UI thread (Android, JavaFX, Swing)
launch(Dispatchers.Main) {
updateUI(result) // hanya di Android/GUI environment
}
// Dispatchers.Unconfined: tidak terikat thread tertentu (jarang dipakai)
launch(Dispatchers.Unconfined) {
println(Thread.currentThread().name) // berjalan di thread pemanggil
}
Berpindah Dispatcher dalam Satu Coroutine
suspend fun loadAndProcess(): String {
// Mulai dengan I/O: ambil data dari network
val rawData = withContext(Dispatchers.IO) {
httpClient.get("https://api.example.com/data")
}
// Pindah ke Default: proses data (CPU-intensive)
val processed = withContext(Dispatchers.Default) {
parseAndTransform(rawData)
}
// Kembali ke Main untuk update UI (di Android)
withContext(Dispatchers.Main) {
showResult(processed)
}
return processed
}
| Dispatcher | Thread Pool | Cocok untuk |
|---|---|---|
Default | CPU cores | Parsing, sorting, komputasi |
IO | 64+ threads | File, network, database |
Main | 1 (UI thread) | Update UI |
Unconfined | Thread pemanggil | Testing, kasus khusus |
Race Condition di Coroutines
Meskipun coroutine terasa sequential dalam penulisannya, race condition tetap bisa terjadi ketika beberapa coroutine mengakses shared mutable state — terutama jika berjalan di dispatcher berbeda atau di coroutine yang diluncurkan secara paralel.
Contoh: Counter Race Condition di Coroutines
// ANTI-PATTERN: race condition di coroutines
var counter = 0 // shared mutable state
runBlocking {
val jobs = (1..1000).map {
launch(Dispatchers.Default) {
counter++ // TIDAK AMAN meski terlihat seperti kode sequential!
}
}
jobs.forEach { it.join() }
}
println(counter) // bukan 1000 — ada lost updates
Ini terjadi karena Dispatchers.Default menggunakan banyak thread, dan counter++ tetap bukan operasi atomik di level CPU.
Solusi 1: Atomic dari Java
import java.util.concurrent.atomic.AtomicInteger
// BENAR: AtomicInteger untuk counter sederhana
val counter = AtomicInteger(0)
runBlocking {
val jobs = (1..1000).map {
launch(Dispatchers.Default) {
counter.incrementAndGet() // atomik
}
}
jobs.forEach { it.join() }
}
println(counter.get()) // selalu 1000
Solusi 2: Mutex dari Coroutines
kotlinx.coroutines.sync.Mutex adalah versi suspend-friendly dari mutex — saat tidak bisa mendapat lock, ia meng-suspend coroutine (bukan memblokir thread):
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
// BENAR: Mutex coroutine-friendly
val mutex = Mutex()
var counter = 0
runBlocking {
val jobs = (1..1000).map {
launch(Dispatchers.Default) {
mutex.withLock { // suspend jika lock tidak tersedia, tidak blokir thread
counter++
}
}
}
jobs.forEach { it.join() }
}
println(counter) // selalu 1000
Solusi 3: Single-Thread Confinement
Pola paling idiomatik di Kotlin: batasi akses ke shared state hanya dari satu coroutine/thread:
// BENAR: batasi akses counter ke single-threaded dispatcher
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
runBlocking {
val jobs = (1..1000).map {
launch(Dispatchers.Default) {
withContext(counterContext) {
counter++ // aman karena hanya satu thread yang mengeksekusi ini
}
}
}
jobs.forEach { it.join() }
}
println(counter) // selalu 1000
counterContext.close()
Solusi 4: Actor Pattern dengan Channel
Mengikuti filosofi “share by communicating” — satu coroutine memiliki state, yang lain berkomunikasi via channel:
import kotlinx.coroutines.channels.*
// Actor: coroutine yang memproses pesan secara sequential
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // state hanya dipegang oleh actor ini
for (msg in channel) {
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
runBlocking {
val counter = counterActor()
val jobs = (1..1000).map {
launch(Dispatchers.Default) {
counter.send(IncCounter) // kirim pesan, tidak ada shared state
}
}
jobs.forEach { it.join() }
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter: ${response.await()}") // selalu 1000
counter.close()
}
Channel: Komunikasi Antar Coroutine
Channel di Kotlin Coroutines adalah setara dengan Go channel — “pipa” bertipe untuk komunikasi antar coroutine dengan sinkronisasi bawaan.
import kotlinx.coroutines.channels.*
runBlocking {
// Unbuffered channel: pengirim suspend sampai ada penerima
val channel = Channel<Int>()
launch {
for (i in 1..5) {
println("Mengirim $i")
channel.send(i) // suspend jika tidak ada penerima
}
channel.close() // beritahu penerima tidak ada data lagi
}
for (value in channel) { // iterate sampai channel ditutup
println("Menerima $value")
}
}
// Buffered channel
val buffered = Channel<Int>(capacity = 10)
// Rendezvous (unbuffered, default)
val rendezvous = Channel<Int>(Channel.RENDEZVOUS)
// Unlimited buffer (hati-hati: bisa OOM)
val unlimited = Channel<Int>(Channel.UNLIMITED)
// Drop oldest jika penuh
val dropping = Channel<Int>(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)
Fan-Out dan Fan-In dengan Channel
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++)
}
fun CoroutineScope.processNumber(id: Int, numbers: ReceiveChannel<Int>) = launch {
for (msg in numbers) {
println("Worker $id memproses $msg")
}
}
// Fan-out: satu producer, banyak consumer
runBlocking {
val producer = produceNumbers()
repeat(3) { id ->
processNumber(id, producer) // 3 worker berbagi channel yang sama
}
delay(1000)
producer.cancel() // hentikan producer
}
// Fan-in: gabungkan beberapa channel menjadi satu
fun CoroutineScope.mergeChannels(vararg channels: ReceiveChannel<Int>): ReceiveChannel<Int> =
produce {
for (channel in channels) {
launch {
for (value in channel) send(value)
}
}
}
Flow: Asynchronous Data Streams
Flow adalah abstraksi untuk cold asynchronous stream — sebuah urutan nilai yang diproduksi secara asynchronous dan hanya mulai mengalir ketika ada yang mengcollect-nya.
import kotlinx.coroutines.flow.*
// Membuat Flow
fun numbersFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(100) // operasi suspend di dalam flow builder
emit(i) // memancarkan nilai
}
}
// Collecting Flow (terminal operator, suspend function)
runBlocking {
numbersFlow()
.filter { it % 2 == 0 } // transformasi: filter
.map { it * it } // transformasi: map
.collect { value -> // terminal operator
println(value) // 4, 16
}
}
Operator Flow yang Umum Dipakai
val flow = flowOf(1, 2, 3, 4, 5)
// Transformasi
flow.map { it * 2 } // transformasi setiap elemen
flow.filter { it > 2 } // filter elemen
flow.flatMapMerge { fetchData(it) } // flatMap concurrent
flow.transform { emit(it); emit(it * 2) } // emit beberapa nilai per elemen
// Kombinasi
flow1.zip(flow2) { a, b -> a + b } // zip dua flow
flow1.combine(flow2) { a, b -> a + b } // combine (re-emit saat salah satu berubah)
merge(flow1, flow2) // merge beberapa flow menjadi satu
// Kontrol
flow.take(3) // hanya ambil 3 elemen pertama
flow.debounce(300) // tunggu 300ms tenang sebelum emit (cocok untuk search input)
flow.distinctUntilChanged() // skip jika nilai sama dengan sebelumnya
flow.retry(3) // retry hingga 3 kali jika error
flow.catch { e -> emit(defaultValue) } // error handling
// Terminal
flow.toList() // kumpulkan semua ke List (suspend)
flow.first() // ambil elemen pertama saja (suspend)
flow.count() // hitung jumlah elemen (suspend)
flow.reduce { acc, value -> acc + value } // reduce (suspend)
Cold vs Hot Flow
Flow biasa adalah cold — setiap collector mendapat aliran data dari awal, dan flow hanya berjalan saat ada collector:
val coldFlow = flow {
println("Flow mulai berjalan")
emit(1)
emit(2)
}
runBlocking {
coldFlow.collect { println("Collector 1: $it") }
// "Flow mulai berjalan" tercetak lagi untuk collector kedua
coldFlow.collect { println("Collector 2: $it") }
}
SharedFlow dan StateFlow adalah hot — berjalan terlepas dari ada collector atau tidak, dan bisa dibagikan ke banyak collector:
SharedFlow dan StateFlow: Hot Streams
SharedFlow: Broadcast ke Banyak Collector
import kotlinx.coroutines.flow.*
val sharedFlow = MutableSharedFlow<String>(
replay = 1, // subscriber baru langsung dapat 1 nilai terakhir
extraBufferCapacity = 10 // buffer tambahan agar emit tidak suspend
)
runBlocking {
// Emitter
launch {
repeat(5) { i ->
sharedFlow.emit("Event $i")
delay(100)
}
}
// Dua collector menerima event yang sama
launch {
sharedFlow.collect { println("Collector A: $it") }
}
launch {
sharedFlow.collect { println("Collector B: $it") }
}
delay(1000)
}
StateFlow: State Holder yang Reaktif
StateFlow adalah SharedFlow khusus yang selalu punya nilai saat ini, mirip LiveData di Android:
import kotlinx.coroutines.flow.*
class ViewModel {
private val _uiState = MutableStateFlow(UiState.Loading)
val uiState: StateFlow<UiState> = _uiState.asStateFlow() // expose sebagai read-only
suspend fun loadData() {
_uiState.value = UiState.Loading
try {
val data = repository.fetchData()
_uiState.value = UiState.Success(data)
} catch (e: Exception) {
_uiState.value = UiState.Error(e.message ?: "Terjadi kesalahan")
}
}
}
// Di UI / collector
viewModel.uiState
.collect { state ->
when (state) {
is UiState.Loading -> showLoading()
is UiState.Success -> showData(state.data)
is UiState.Error -> showError(state.message)
}
}
Flow (Cold) | SharedFlow (Hot) | StateFlow (Hot) | |
|---|---|---|---|
| Mulai berjalan | Saat ada collector | Selalu | Selalu |
| Nilai saat ini | Tidak ada | Tidak ada (kecuali replay) | Ya, selalu |
| Banyak collector | Setiap dapat aliran sendiri | Semua dapat event yang sama | Semua dapat state terkini |
| Cocok untuk | Data pipeline, query | Event/event bus | UI State, config |
Structured Concurrency di Kotlin
Structured concurrency adalah salah satu kontribusi terbesar Kotlin Coroutines. Prinsipnya sederhana: coroutine tidak bisa hidup lebih lama dari scope yang membuatnya. Ini secara otomatis mencegah coroutine leak.
// Ketika scope di-cancel, semua child coroutine ikut di-cancel
val scope = CoroutineScope(Dispatchers.Default)
scope.launch {
launch { delay(Long.MAX_VALUE) } // child 1
launch { delay(Long.MAX_VALUE) } // child 2
delay(Long.MAX_VALUE) // parent
}
scope.cancel() // semua coroutine di atas langsung di-cancel
coroutineScope vs supervisorScope
// coroutineScope: jika satu child gagal, semua child lain di-cancel
suspend fun fetchDashboard() = coroutineScope {
val user = async { fetchUser() } // jika ini gagal...
val orders = async { fetchOrders() } // ...ini juga di-cancel
DashboardData(user.await(), orders.await())
}
// supervisorScope: kegagalan satu child tidak mempengaruhi child lain
suspend fun fetchAll() = supervisorScope {
val user = async { fetchUser() }
val orders = async { fetchOrders() }
val recommendations = async { fetchRecommendations() }
// Rekomendasi mungkin gagal, tapi user dan orders tetap diproses
DashboardData(
user = user.await(),
orders = orders.await(),
recommendations = try { recommendations.await() } catch (e: Exception) { emptyList() }
)
}
flowchart TD
subgraph "coroutineScope — kegagalan menyebar"
CS[coroutineScope] --> CA[async: fetchUser ✓]
CS --> CB[async: fetchOrders ✗ GAGAL]
CB -->|cancel| CA
CB -->|gagal| CS
end
subgraph "supervisorScope — kegagalan terisolasi"
SS[supervisorScope] --> SA[async: fetchUser ✓]
SS --> SB[async: fetchOrders ✗ GAGAL]
SS --> SC[async: fetchRecs ✓]
SB -->|tidak mempengaruhi| SA
SB -->|tidak mempengaruhi| SC
endCancellation dan Exception Handling
Cooperative Cancellation
Coroutine di Kotlin bersifat cooperative — ia hanya bisa di-cancel di titik-titik suspension. Kode CPU-bound yang tidak pernah suspend tidak bisa di-cancel secara otomatis:
// ANTI-PATTERN: tidak cooperative, tidak bisa di-cancel
val job = launch {
var i = 0
while (true) { // loop tanpa suspend point
i++
// tidak ada suspension di sini — cancel() tidak akan berpengaruh
}
}
job.cancel() // tidak akan bekerja!
// BENAR: periksa isActive atau gunakan yield()
val job = launch {
var i = 0
while (isActive) { // cek apakah masih aktif
i++
}
}
// Atau gunakan yield() untuk memberi kesempatan cancel/schedule
val job = launch {
var i = 0
while (true) {
yield() // suspension point: cek cancel dan beri giliran scheduler
i++
}
}
job.cancel()
CancellationException
Ketika coroutine di-cancel, CancellationException dilempar di titik suspension berikutnya. Penting: jangan tangkap CancellationException dan menelannya:
// ANTI-PATTERN: menelan CancellationException
launch {
try {
delay(1000)
} catch (e: Exception) { // menangkap SEMUA exception termasuk CancellationException!
println("Error: ${e.message}") // seharusnya tidak dilakukan ini
// coroutine tidak benar-benar di-cancel
}
}
// BENAR: hanya tangkap yang bukan CancellationException
launch {
try {
delay(1000)
} catch (e: CancellationException) {
throw e // re-throw CancellationException!
} catch (e: Exception) {
println("Error: ${e.message}") // tangkap error lainnya
}
}
// Atau gunakan try/finally untuk cleanup
launch {
try {
delay(1000)
doWork()
} finally {
// cleanup selalu dijalankan, baik selesai normal maupun di-cancel
closeResources()
}
}
CoroutineExceptionHandler
Untuk menangkap uncaught exception dari launch (bukan async):
val handler = CoroutineExceptionHandler { _, exception ->
println("Uncaught exception: $exception")
// log, lapor ke Sentry, dsb
}
val scope = CoroutineScope(Dispatchers.Default + handler)
scope.launch {
throw RuntimeException("Sesuatu yang tidak terduga!")
// CoroutineExceptionHandler akan dipanggil
}
Anti-Pattern Umum di Kotlin Coroutines
Anti-Pattern 1: GlobalScope
// ANTI-PATTERN: GlobalScope — coroutine tidak terikat lifecycle apapun
fun loadData() {
GlobalScope.launch { // bahaya: hidup selamanya, bisa leak!
val data = fetchFromNetwork()
updateUI(data)
}
}
// BENAR: gunakan scope yang terikat lifecycle
class MyViewModel : ViewModel() {
fun loadData() {
viewModelScope.launch { // di-cancel otomatis saat ViewModel destroyed
val data = fetchFromNetwork()
_uiState.value = UiState.Success(data)
}
}
}
// Atau buat scope sendiri yang bisa di-cancel
class DataLoader {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
fun load() {
scope.launch { /* ... */ }
}
fun destroy() {
scope.cancel() // cancel semua coroutine
}
}
Anti-Pattern 2: Blocking di dalam Coroutine
// ANTI-PATTERN: memanggil blocking code di coroutine tanpa dispatcher IO
suspend fun fetchData(): String {
val response = OkHttpClient().newCall(request).execute() // BLOCKING!
// ini memblokir carrier thread, mengurangi efektivitas coroutine
return response.body?.string() ?: ""
}
// BENAR: wrap blocking code dengan withContext(Dispatchers.IO)
suspend fun fetchData(): String {
return withContext(Dispatchers.IO) {
val response = OkHttpClient().newCall(request).execute() // blocking OK di sini
response.body?.string() ?: ""
}
}
// BENAR: lebih baik lagi, gunakan library yang sudah suspend-aware
suspend fun fetchData(): String {
return httpClient.get(url).bodyAsText() // ktor client: sudah suspend, tidak blocking
}
Anti-Pattern 3: Tidak Menangani Exception di async
// ANTI-PATTERN: exception di async baru muncul saat await dipanggil
// dan mudah terlewatkan
val deferred = async {
throw RuntimeException("Gagal!")
}
// exception belum muncul di sini...
delay(100)
deferred.await() // baru meledak di sini — mudah terlupa
// BENAR: tangani exception saat await
val result = try {
deferred.await()
} catch (e: Exception) {
defaultValue
}
// Atau gunakan runCatching
val result = runCatching { deferred.await() }
.getOrElse { defaultValue }
Anti-Pattern 4: Membuat Dispatcher Baru yang Tidak Diclose
// ANTI-PATTERN: dispatcher baru tanpa close — thread leak
suspend fun doWork() {
val dispatcher = newSingleThreadContext("worker")
withContext(dispatcher) {
// ...
}
// dispatcher tidak di-close, thread tetap hidup selamanya!
}
// BENAR: close dispatcher setelah selesai
suspend fun doWork() {
val dispatcher = newSingleThreadContext("worker")
try {
withContext(dispatcher) {
// ...
}
} finally {
dispatcher.close() // pastikan thread dibebaskan
}
}
Anti-Pattern 5: Race Condition pada Shared StateFlow
// ANTI-PATTERN: modifikasi StateFlow dari beberapa coroutine tanpa sinkronisasi
val _count = MutableStateFlow(0)
repeat(1000) {
launch(Dispatchers.Default) {
_count.value = _count.value + 1 // RACE CONDITION: baca-modifikasi-tulis!
}
}
// BENAR: gunakan update() yang atomik
repeat(1000) {
launch(Dispatchers.Default) {
_count.update { it + 1 } // atomik: tidak ada lost update
}
}
Pola Concurrency Produksi di Kotlin
Retry dengan Exponential Backoff
suspend fun <T> retryWithBackoff(
times: Int = 3,
initialDelay: Long = 100,
maxDelay: Long = 1000,
factor: Double = 2.0,
block: suspend () -> T
): T {
var currentDelay = initialDelay
repeat(times - 1) { attempt ->
try {
return block()
} catch (e: Exception) {
if (e is CancellationException) throw e // jangan retry cancel
println("Percobaan ${attempt + 1} gagal: ${e.message}, retry dalam ${currentDelay}ms")
}
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
return block() // percobaan terakhir, biarkan exception menyebar
}
// Penggunaan
val result = retryWithBackoff(times = 3, initialDelay = 100) {
httpClient.get("https://api.example.com/data")
}
Timeout
import kotlinx.coroutines.*
// withTimeout: lempar TimeoutCancellationException jika melebihi batas
val result = withTimeout(5000L) {
fetchDataFromNetwork() // harus selesai dalam 5 detik
}
// withTimeoutOrNull: return null jika timeout, tidak throw exception
val result: String? = withTimeoutOrNull(5000L) {
fetchDataFromNetwork()
} ?: "default value"
Parallel Decomposition
// Menjalankan banyak request secara paralel dan mengumpulkan hasilnya
suspend fun fetchAllUsers(ids: List<Int>): List<User> = coroutineScope {
ids.map { id ->
async { fetchUser(id) } // setiap request berjalan paralel
}.awaitAll() // tunggu semua selesai, throw jika ada yang gagal
}
// Dengan kontrol batas concurrency (semaphore)
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
suspend fun fetchAllUsersLimited(ids: List<Int>, maxConcurrent: Int = 10): List<User> {
val semaphore = Semaphore(maxConcurrent) // maksimal 10 request bersamaan
return coroutineScope {
ids.map { id ->
async {
semaphore.withPermit { // tunggu izin sebelum mulai
fetchUser(id)
}
}
}.awaitAll()
}
}
Worker Pool dengan Channel
fun CoroutineScope.workerPool(
workers: Int,
jobs: ReceiveChannel<Int>,
results: SendChannel<Int>
) {
repeat(workers) { workerId ->
launch(Dispatchers.IO) {
for (job in jobs) {
val result = processJob(job)
results.send(result)
println("Worker $workerId selesai job $job")
}
}
}
}
runBlocking {
val jobs = Channel<Int>(Channel.UNLIMITED)
val results = Channel<Int>(Channel.UNLIMITED)
// Isi jobs
repeat(100) { jobs.send(it) }
jobs.close()
// Jalankan 5 worker paralel
workerPool(workers = 5, jobs = jobs, results = results)
// Kumpulkan hasil
repeat(100) { println("Hasil: ${results.receive()}") }
}
Testing Coroutines
Menguji kode concurrent selalu tricky. kotlinx-coroutines-test menyediakan tool khusus:
import kotlinx.coroutines.test.*
import kotlin.test.*
// TestCoroutineScheduler untuk kontrol waktu virtual
class UserViewModelTest {
@Test
fun `loading state berubah dengan benar`() = runTest {
val viewModel = UserViewModel()
viewModel.loadUser(1)
// advanceUntilIdle: jalankan semua coroutine yang pending
advanceUntilIdle()
assertEquals(UiState.Success::class, viewModel.uiState.value::class)
}
@Test
fun `delay di-virtualisasi tanpa menunggu waktu nyata`() = runTest {
var result = 0
launch {
delay(10_000) // 10 detik — tapi test tidak benar-benar menunggu 10 detik!
result = 42
}
advanceTimeBy(10_001) // majukan waktu virtual
assertEquals(42, result)
}
}
Checklist Review Kode Concurrent Kotlin
THREAD SAFETY DASAR:
□ Apakah semua akses ke var yang dipakai dari beberapa coroutine/thread aman?
□ Apakah @Volatile digunakan untuk flag yang dibaca dari thread berbeda?
□ Apakah tidak ada HashMap/ArrayList biasa yang diakses dari beberapa thread?
COROUTINE SCOPE:
□ Apakah GlobalScope tidak digunakan? (gunakan scope yang terikat lifecycle)
□ Apakah setiap CoroutineScope yang dibuat manual di-cancel saat tidak dipakai?
□ Apakah tidak ada coroutine yang bisa "bocor" keluar dari scope-nya?
DISPATCHER:
□ Apakah blocking I/O selalu di dalam withContext(Dispatchers.IO)?
□ Apakah CPU-intensive work di dalam withContext(Dispatchers.Default)?
□ Apakah tidak ada Thread.sleep() di dalam coroutine (gunakan delay())?
CANCELLATION:
□ Apakah CancellationException tidak ditelan (catch Exception tanpa re-throw)?
□ Apakah long-running CPU task memeriksa isActive atau memanggil yield()?
□ Apakah cleanup di dalam finally block (bukan hanya di catch)?
EXCEPTION HANDLING:
□ Apakah exception dari async {} di-handle saat await()?
□ Apakah ada CoroutineExceptionHandler untuk uncaught exception dari launch?
□ Apakah coroutineScope vs supervisorScope dipilih dengan benar sesuai kebutuhan?
SHARED STATE:
□ Apakah tidak ada shared mutable var yang diakses dari Dispatchers.Default?
□ Apakah MutableStateFlow.update{} digunakan (bukan value = value + 1)?
□ Apakah Mutex coroutine (bukan java.util.concurrent) digunakan untuk suspend context?
FLOW:
□ Apakah Flow tidak di-collect di background thread yang salah?
□ Apakah SharedFlow/StateFlow digunakan untuk hot streams, Flow untuk cold?
□ Apakah exception di dalam flow di-handle dengan catch operator?
RESOURCE:
□ Apakah newSingleThreadContext / newFixedThreadPoolContext selalu di-close?
□ Apakah Channel selalu di-close dari sisi pengirim?
□ Apakah tidak ada resource leak di dalam coroutine yang mungkin di-cancel?
Ringkasan
- Kotlin mewarisi seluruh model threading Java —
synchronized,@Volatile,java.util.concurrent,AtomicInteger— semuanya bisa dipakai langsung dengan sintaks Kotlin yang lebih ringkas.- Race condition tetap bisa terjadi di Kotlin, baik di thread biasa maupun di coroutine yang berjalan di
Dispatchers.Default(multi-threaded).suspendfunction bisa dijeda tanpa memblokir thread — fondasi dari seluruh model concurrency coroutine.launchuntuk fire-and-forget (returnJob);asyncuntuk tugas yang mengembalikan nilai (returnDeferred<T>yang bisa di-await).CoroutineDispatchermenentukan thread pool:Defaultuntuk CPU-bound,IOuntuk I/O-bound,Mainuntuk UI thread.- Structured concurrency memastikan coroutine tidak hidup lebih lama dari scope-nya — cegah coroutine leak secara otomatis.
coroutineScopemenyebarkan kegagalan ke semua sibling;supervisorScopemengisolasi kegagalan agar child lain tetap berjalan.Mutexdarikotlinx.coroutines.syncadalah versi suspend-friendly dari lock — suspend coroutine tanpa memblokir thread saat menunggu lock.- Untuk shared counter sederhana,
AtomicInteger/AtomicLonglebih efisien. Untuk state yang lebih kompleks, Mutex atau single-thread confinement lebih tepat.Channeluntuk komunikasi antar coroutine (push-based);Flowuntuk stream data yang di-pull oleh collector.StateFlowuntuk reaktif UI state (selalu punya nilai saat ini);SharedFlowuntuk event/broadcast ke banyak collector.MutableStateFlow.update { }untuk modifikasi state secara atomik — hindari_state.value = _state.value + 1yang rawan race condition.- Jangan gunakan
GlobalScope— selalu ikat coroutine ke scope yang punya lifecycle (ViewModel scope, lifecycle scope, atau scope manual yang di-cancel).- Jangan telan
CancellationException— tangkap spesifik exception yang diharapkan, dan re-throwCancellationException.- Untuk testing, gunakan
runTestdanadvanceUntilIdle()/advanceTimeBy()agar waktu virtual bisa dikontrol tanpa menunggu durasi sesungguhnya.