ktor-native-worker-tutorial

Part 3: AMQP Setup and Calls

In this part, we’ll explore how to set up AMQP (Advanced Message Queuing Protocol) with RabbitMQ for asynchronous message processing. This enables a worker pattern where HTTP requests queue messages that are processed asynchronously.

Architecture Overview

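The messaging system consists of:

  • Constants: the names of the exchange, the queue, and the routing key
  • MessageBroker: an interface that abstracts the broker operations
  • RabbitMqMessageBroker: the RabbitMQ implementation, built on the Kourier AMQP client
  • MessageHandler: an interface for processing consumed messages
  • SendNotificationEvent and SendNotificationHandler: the message payload and the handler that processes it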

Constants Configuration

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Constants.kt

object Constants {

    const val RABBITMQ_EXCHANGE = "notifications_exchange"
    const val RABBITMQ_QUEUE = "notifications_queue"
    const val RABBITMQ_ROUTING_KEY = "notifications.send"

}

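These constants define:

  • RABBITMQ_EXCHANGE: the name of the exchange that messages are published to
  • RABBITMQ_QUEUE: the name of the queue that the worker consumes from
  • RABBITMQ_ROUTING_KEY: the routing key used to bind the queue to the exchange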
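The routing key notifications.send is a dot-separated string, which is the form that AMQP topic exchanges match against (RabbitMqMessageBroker, shown later in this part, declares the exchange with the TOPIC type). In a binding pattern, * stands for exactly one word and # for zero or more words. The project binds the queue with the literal key, so only that exact key matches. As a rough illustration of the rule, here is a sketch in which topicMatches is a hypothetical helper and not a RabbitMQ or Kourier API; the real matching happens inside the broker:

```kotlin
// Hypothetical helper that mimics AMQP topic matching, for illustration only.
// Words are separated by dots; "*" matches one word, "#" matches zero or more.
fun topicMatches(pattern: String, routingKey: String): Boolean {
    val p = pattern.split('.')
    val k = routingKey.split('.')

    // pi and ki are the current positions in the pattern and in the key.
    fun match(pi: Int, ki: Int): Boolean = when {
        pi == p.size -> ki == k.size
        // "#" either matches nothing, or consumes one word and stays in place.
        p[pi] == "#" -> match(pi + 1, ki) || (ki < k.size && match(pi, ki + 1))
        ki == k.size -> false
        p[pi] == "*" || p[pi] == k[ki] -> match(pi + 1, ki + 1)
        else -> false
    }
    return match(0, 0)
}
```

With this rule, a binding such as notifications.* would also receive a key like notifications.read, whereas the literal binding used here receives only notifications.send.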

MessageBroker Interface

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageBroker.kt

interface MessageBroker {
    suspend fun initialize()
    suspend fun publish(exchange: String, routingKey: String, message: String)
    suspend fun startConsuming(queue: String, handler: MessageHandler)
}

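This interface defines three core operations:

  • initialize(): connects to the broker and declares the exchange, queue, and binding
  • publish(): sends a message to an exchange with a routing key
  • startConsuming(): registers a handler for the messages that arrive on a queue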
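Because callers depend only on the interface, the broker can be swapped for a stand-in when no RabbitMQ server is available, for example in unit tests. The following is a minimal sketch under that assumption: InMemoryMessageBroker, runNonSuspending, and demo are hypothetical names that do not exist in the project, and the two interfaces are repeated only so that the snippet compiles on its own.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Repeated from the tutorial so that the sketch is self-contained.
interface MessageHandler {
    suspend operator fun invoke(routingKey: String, body: String)
}

interface MessageBroker {
    suspend fun initialize()
    suspend fun publish(exchange: String, routingKey: String, message: String)
    suspend fun startConsuming(queue: String, handler: MessageHandler)
}

// Hypothetical test double: published messages go straight to every
// registered handler. Exchanges, queues, and bindings are ignored.
class InMemoryMessageBroker : MessageBroker {
    private val handlers = mutableListOf<MessageHandler>()
    var initialized = false
        private set

    override suspend fun initialize() {
        initialized = true
    }

    override suspend fun publish(exchange: String, routingKey: String, message: String) {
        check(initialized) { "initialize() must be called first" }
        handlers.forEach { it(routingKey, message) }
    }

    override suspend fun startConsuming(queue: String, handler: MessageHandler) {
        handlers += handler
    }
}

// Runs a suspend block that never actually suspends, using only the
// standard library. Fails if the block suspends instead of completing.
fun <T> runNonSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

// Publishes one message and returns what the handler received.
fun demo(): List<String> {
    val received = mutableListOf<String>()
    val broker = InMemoryMessageBroker()
    runNonSuspending {
        broker.initialize()
        broker.startConsuming("notifications_queue", object : MessageHandler {
            override suspend fun invoke(routingKey: String, body: String) {
                received += "$routingKey: $body"
            }
        })
        broker.publish("notifications_exchange", "notifications.send", "hello")
    }
    return received
}
```

In a real test suite, runTest from kotlinx-coroutines-test would normally replace the hand-written runNonSuspending helper, which is used here only to avoid extra dependencies.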

RabbitMQ Implementation

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/RabbitMqMessageBroker.kt

class RabbitMqMessageBroker(
    private val coroutineScope: CoroutineScope,
    private val host: String,
    private val port: Int,
    private val user: String,
    private val password: String,
) : MessageBroker {
    private lateinit var amqpConnection: AMQPConnection
    private lateinit var amqpChannel: AMQPChannel

    override suspend fun initialize() {
        amqpConnection = createRobustAMQPConnection(coroutineScope) {
            server {
                host = this@RabbitMqMessageBroker.host
                port = this@RabbitMqMessageBroker.port
                user = this@RabbitMqMessageBroker.user
                password = this@RabbitMqMessageBroker.password
            }
        }

        amqpChannel = amqpConnection.openChannel().also {
            it.exchangeDeclare {
                name = Constants.RABBITMQ_EXCHANGE
                type = BuiltinExchangeType.TOPIC
            }
            it.queueDeclare {
                name = Constants.RABBITMQ_QUEUE
                durable = true
            }
            it.queueBind {
                queue = Constants.RABBITMQ_QUEUE
                exchange = Constants.RABBITMQ_EXCHANGE
                routingKey = Constants.RABBITMQ_ROUTING_KEY
            }
        }
    }

    override suspend fun publish(exchange: String, routingKey: String, message: String) {
        amqpChannel.basicPublish(
            body = message.toByteArray(),
            exchange = exchange,
            routingKey = routingKey,
            properties = properties {
                deliveryMode = 2u // Make message persistent
            }
        )
    }

    override suspend fun startConsuming(queue: String, handler: MessageHandler) {
        amqpChannel.basicConsume(
            queue = queue,
            noAck = false,
            onDelivery = { delivery ->
                handler(delivery.message.routingKey, delivery.message.body.decodeToString())
                amqpChannel.basicAck(delivery.message.deliveryTag, false)
            }
        )
    }
}

Implementation Walkthrough

  1. Connection Creation:
    • Uses createRobustAMQPConnection() for automatic reconnection
    • Configured with RabbitMQ server credentials
    • Scoped to a coroutine scope for lifecycle management
  2. Channel Setup:
    • Opens an AMQP channel on the connection
    • Declares a TOPIC exchange for flexible routing
    • Declares a durable queue (the queue definition survives broker restarts)
    • Binds the queue to the exchange with a routing key
  3. Publishing Messages:
    • Converts message string to bytes
    • Sets delivery mode to 2 (persistent messages)
    • Routes message via exchange and routing key
  4. Consuming Messages:
    • Sets up a consumer on the queue
    • Uses manual acknowledgment (noAck = false)
    • Delegates message handling to the provided handler
    • Acknowledges the message with basicAck once the handler returns; if the handler throws, basicAck is not reached
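Since basicAck in startConsuming runs only after the handler returns, an exception thrown by the handler leaves the delivery unacknowledged. One way to make that outcome explicit is to turn the result of the processing step into a value before deciding what to tell the broker. The sketch below assumes nothing about Kourier: DeliveryOutcome and processDelivery are hypothetical names introduced for illustration.

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical outcome type; the names are illustrative, not a Kourier API.
enum class DeliveryOutcome { ACK, REJECT }

// Runs the processing step and reports whether the delivery should be
// acknowledged. It is inline so that the lambda may call suspend functions
// when processDelivery itself is used inside a coroutine.
inline fun processDelivery(process: () -> Unit): DeliveryOutcome =
    try {
        process()
        DeliveryOutcome.ACK
    } catch (e: CancellationException) {
        throw e // never swallow coroutine cancellation
    } catch (e: Exception) {
        println("Handler failed: ${e.message}")
        DeliveryOutcome.REJECT
    }
```

Inside onDelivery, the handler call would be wrapped as processDelivery { handler(routingKey, body) }. On ACK the consumer calls basicAck as before. On REJECT it would use whichever negative acknowledgment the client exposes (AMQP defines basic.reject, and RabbitMQ adds basic.nack); the corresponding Kourier method names are not shown in this part, so check the library documentation before relying on them.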

Using the Kourier AMQP Client

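The project uses the Kourier AMQP client (dev.kourier:amqp-client-robust), which, as used in the code above, provides:

  • createRobustAMQPConnection(), a connection that reconnects automatically
  • A suspend-based API for opening channels, publishing, and consuming
  • A builder-style DSL for declaring exchanges, queues, and bindings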

Dependency Configuration

In build.gradle.kts:

sourceSets {
    commonMain.dependencies {
        implementation(libs.amqp)
        // ... other dependencies
    }
}

And in gradle/libs.versions.toml:

[versions]
amqp = "0.4.0"

[libraries]
amqp = { module = "dev.kourier:amqp-client-robust", version.ref = "amqp" }

Message Handler Interface

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageHandler.kt

interface MessageHandler {
    suspend operator fun invoke(routingKey: String, body: String)
}

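The handler uses the invoke operator, allowing instances to be called like functions. It receives:

  • routingKey: the routing key that the message was published with
  • body: the message body, decoded to a string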

Notification Event and Handler

Event Data Class (src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationEvent.kt):

@Serializable
data class SendNotificationEvent(
    val title: String,
    val body: String,
    val token: String,
)

Handler Implementation (src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationHandler.kt):

class SendNotificationHandler(
    private val notificationService: NotificationService,
) : MessageHandler {
    override suspend fun invoke(routingKey: String, body: String) {
        val event = Serialization.json.decodeFromString<SendNotificationEvent>(body)
        notificationService.sendNotification(
            token = event.token,
            title = event.title,
            body = event.body,
        )
    }
}

Handler Flow

  1. Receives the message body as JSON string
  2. Deserializes to SendNotificationEvent using Kotlinx Serialization
  3. Calls the NotificationService to send the notification
  4. The service is injected via constructor (dependency injection)

Serialization Setup

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Serialization.kt

object Serialization {
    val json = Json {
        ignoreUnknownKeys = true
        explicitNulls = false
    }
}

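This configured JSON instance is used wherever messages are serialized, for example when SendNotificationHandler decodes a message body.

Configuration:

  • ignoreUnknownKeys = true: JSON keys that have no matching property are skipped instead of causing an error
  • explicitNulls = false: null properties are omitted when encoding, and missing nullable properties are read as null when decoding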
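To see what these settings mean for a message body, here is a small sketch that encodes and decodes a SendNotificationEvent with the same configuration. It assumes the kotlinx.serialization compiler plugin and the kotlinx-serialization-json dependency are on the classpath; encodeEvent, decodeEvent, and the extra "priority" key are hypothetical and used only for illustration.

```kotlin
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json

// Repeated from the tutorial so that the sketch is self-contained.
@Serializable
data class SendNotificationEvent(
    val title: String,
    val body: String,
    val token: String,
)

// Same settings as Serialization.json in the tutorial.
val json = Json {
    ignoreUnknownKeys = true
    explicitNulls = false
}

// Hypothetical helper: the string a publisher could send as the message body.
fun encodeEvent(event: SendNotificationEvent): String = json.encodeToString(event)

// Hypothetical helper: what a handler does with a received body.
fun decodeEvent(body: String): SendNotificationEvent = json.decodeFromString(body)

fun main() {
    val event = SendNotificationEvent(title = "Hello", body = "World", token = "abc")
    println(encodeEvent(event))

    // "priority" is not a property of SendNotificationEvent;
    // ignoreUnknownKeys = true lets decoding succeed anyway.
    val incoming = """{"title":"Hello","body":"World","token":"abc","priority":"high"}"""
    println(decodeEvent(incoming) == event)
}
```

Tolerating unknown keys is useful in a worker setup because publishers and consumers may be deployed at different times, so a consumer can keep reading messages that carry fields it does not know yet.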

Integration with Application

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/config/MessageBroker.kt

fun Application.configureMessageBroker() = runBlocking {
    val messageBroker by inject<MessageBroker>()
    messageBroker.initialize()
    messageBroker.startConsuming(Constants.RABBITMQ_QUEUE, get<SendNotificationHandler>())
}

This configuration function:

  1. Injects the MessageBroker instance from Koin
  2. Initializes the connection and channel
  3. Starts consuming messages with the SendNotificationHandler

Environment Configuration

The RabbitMQ connection is configured via environment variables in the Koin module:

single<MessageBroker> {
    RabbitMqMessageBroker(
        coroutineScope = this@mainModule,
        host = getEnv("RABBITMQ_HOST") ?: "localhost",
        port = getEnv("RABBITMQ_PORT")?.toIntOrNull() ?: 5672,
        user = getEnv("RABBITMQ_USER") ?: "guest",
        password = getEnv("RABBITMQ_PASSWORD") ?: "guest",
    )
}

Default values:

  • RABBITMQ_HOST: localhost
  • RABBITMQ_PORT: 5672
  • RABBITMQ_USER: guest
  • RABBITMQ_PASSWORD: guest
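The fallback logic can be isolated into a small function, which makes the defaults easy to check without setting real environment variables. This is a sketch, not the project's code: RabbitMqSettings and loadRabbitMqSettings are hypothetical names, and the lookup parameter stands in for the project's getEnv helper.

```kotlin
// Hypothetical holder for the four connection settings.
data class RabbitMqSettings(
    val host: String,
    val port: Int,
    val user: String,
    val password: String,
)

// `lookup` stands in for getEnv, so the defaults can be exercised
// with a plain map or lambda instead of the process environment.
fun loadRabbitMqSettings(lookup: (String) -> String?): RabbitMqSettings =
    RabbitMqSettings(
        host = lookup("RABBITMQ_HOST") ?: "localhost",
        // toIntOrNull() returns null for non-numeric input,
        // so a malformed port also falls back to 5672.
        port = lookup("RABBITMQ_PORT")?.toIntOrNull() ?: 5672,
        user = lookup("RABBITMQ_USER") ?: "guest",
        password = lookup("RABBITMQ_PASSWORD") ?: "guest",
    )
```

Note that a malformed RABBITMQ_PORT silently falls back to the default rather than failing at startup. Also, RabbitMQ by default accepts the guest account only for connections from localhost, so a remote broker needs explicit credentials.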

Message Flow

  1. HTTP request arrives at the API endpoint
  2. Route publishes a SendNotificationEvent to RabbitMQ
  3. Message is stored in the queue
  4. Consumer receives the message
  5. SendNotificationHandler deserializes and processes it
  6. NotificationService sends the FCM notification

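This architecture provides:

  • Asynchronous processing: the HTTP route only publishes a message, and the notification is sent later by the consumer
  • Durability: persistent messages in a durable queue survive a broker restart
  • Reliable processing: a message is acknowledged only after its handler has finished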

Summary

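The AMQP setup demonstrates:

  • A MessageBroker abstraction with a RabbitMQ implementation based on Kourier
  • A TOPIC exchange bound to a durable queue, with persistent messages
  • Manual acknowledgment after a message has been handled
  • Connection settings read from environment variables, with defaults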

In the next part, we’ll explore how to define HTTP routes that publish messages to this message broker.