In this part, we’ll explore how to set up AMQP (Advanced Message Queuing Protocol) with RabbitMQ for asynchronous message processing. This enables a worker pattern where HTTP requests queue messages that are processed asynchronously.
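The messaging system consists of the following pieces, each covered in turn below: shared constants, a MessageBroker interface with a RabbitMQ implementation, message handlers, a shared JSON configuration, and the application wiring.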
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Constants.kt
object Constants {
    const val RABBITMQ_EXCHANGE = "notifications_exchange"
    const val RABBITMQ_QUEUE = "notifications_queue"
    const val RABBITMQ_ROUTING_KEY = "notifications.send"
}
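These constants define:
- RABBITMQ_EXCHANGE: the name of the exchange that messages are published to
- RABBITMQ_QUEUE: the name of the queue that the worker consumes from
- RABBITMQ_ROUTING_KEY: the routing key that binds the queue to the exchange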
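To make the relationship between the exchange, the queue, and the routing key more concrete: the exchange is declared later in this part as a topic exchange, where a queue receives a message when its binding pattern matches the message's routing key. The sketch below illustrates the standard AMQP topic-matching rules in plain Kotlin. Note that topicMatches is a hypothetical helper written for illustration only; it is not part of RabbitMQ or the Kourier client, and the broker performs this matching itself.

```kotlin
// Hypothetical illustration of topic-exchange matching (the broker does this for you).
// Words are separated by '.', '*' matches exactly one word, '#' matches zero or more words.
fun topicMatches(pattern: String, routingKey: String): Boolean {
    val p = pattern.split('.')
    val k = routingKey.split('.')

    // pi and ki are the current positions in the pattern and in the routing key.
    fun match(pi: Int, ki: Int): Boolean = when {
        pi == p.size -> ki == k.size                      // pattern exhausted: key must be too
        p[pi] == "#" -> match(pi + 1, ki) ||              // '#' consumes zero words...
            (ki < k.size && match(pi, ki + 1))            // ...or one more word
        ki == k.size -> false                             // key exhausted but pattern is not
        p[pi] == "*" || p[pi] == k[ki] -> match(pi + 1, ki + 1)
        else -> false
    }

    return match(0, 0)
}
```

With the binding used in this tutorial, topicMatches("notifications.send", "notifications.send") is true. A broader binding such as "notifications.*" would also match that key, but not a three-word key like "notifications.send.now".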
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageBroker.kt
interface MessageBroker {
    suspend fun initialize()
    suspend fun publish(exchange: String, routingKey: String, message: String)
    suspend fun startConsuming(queue: String, handler: MessageHandler)
}
This interface defines three core operations:
- initialize(): Set up the connection and declare exchanges/queues
- publish(): Send a message to an exchange with a routing key
- startConsuming(): Start consuming messages from a queue with a handler
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/RabbitMqMessageBroker.kt
class RabbitMqMessageBroker(
    private val coroutineScope: CoroutineScope,
    private val host: String,
    private val port: Int,
    private val user: String,
    private val password: String,
) : MessageBroker {

    private lateinit var amqpConnection: AMQPConnection
    private lateinit var amqpChannel: AMQPChannel

    override suspend fun initialize() {
        amqpConnection = createRobustAMQPConnection(coroutineScope) {
            server {
                host = this@RabbitMqMessageBroker.host
                port = this@RabbitMqMessageBroker.port
                user = this@RabbitMqMessageBroker.user
                password = this@RabbitMqMessageBroker.password
            }
        }
        amqpChannel = amqpConnection.openChannel().also {
            it.exchangeDeclare {
                name = Constants.RABBITMQ_EXCHANGE
                type = BuiltinExchangeType.TOPIC
            }
            it.queueDeclare {
                name = Constants.RABBITMQ_QUEUE
                durable = true
            }
            it.queueBind {
                queue = Constants.RABBITMQ_QUEUE
                exchange = Constants.RABBITMQ_EXCHANGE
                routingKey = Constants.RABBITMQ_ROUTING_KEY
            }
        }
    }

    override suspend fun publish(exchange: String, routingKey: String, message: String) {
        amqpChannel.basicPublish(
            body = message.toByteArray(),
            exchange = exchange,
            routingKey = routingKey,
            properties = properties {
                deliveryMode = 2u // Make message persistent
            }
        )
    }

    override suspend fun startConsuming(queue: String, handler: MessageHandler) {
        amqpChannel.basicConsume(
            queue = queue,
            noAck = false,
            onDelivery = { delivery ->
                handler(delivery.message.routingKey, delivery.message.body.decodeToString())
                amqpChannel.basicAck(delivery.message.deliveryTag, false)
            }
        )
    }
}
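In startConsuming() above, the delivery is acknowledged only after the handler returns, so a handler that throws leaves the message unacknowledged. In AMQP, unacknowledged deliveries are redelivered once the channel or connection closes. How the Kourier client itself reacts to an exception inside onDelivery is not covered here. One common approach is to decide explicitly between acknowledging and rejecting, which can be sketched as follows. DeliveryChannel and processDelivery are hypothetical names introduced for this illustration, and the handler is a plain (non-suspending) function to keep the sketch dependency-free.

```kotlin
// Hypothetical, minimal view of a channel: just the two operations this sketch needs.
interface DeliveryChannel {
    fun ack(deliveryTag: ULong)
    fun reject(deliveryTag: ULong, requeue: Boolean)
}

// Runs the handler, then acknowledges on success or rejects on failure.
// Returns true when the message was processed and acknowledged.
fun processDelivery(
    channel: DeliveryChannel,
    deliveryTag: ULong,
    routingKey: String,
    body: String,
    handler: (routingKey: String, body: String) -> Unit,
): Boolean {
    val succeeded = try {
        handler(routingKey, body)
        true
    } catch (e: Exception) {
        false
    }
    if (succeeded) {
        channel.ack(deliveryTag)
    } else {
        // requeue = false avoids an endless redelivery loop for a message that always fails;
        // whether to requeue or dead-letter is a design choice, assumed here for illustration.
        channel.reject(deliveryTag, requeue = false)
    }
    return succeeded
}
```

Whether a failed message should be requeued, dropped, or routed to a dead-letter queue depends on the application; the sketch only shows where that decision would sit.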
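Notable details of this implementation:
- createRobustAMQPConnection() for automatic reconnection
- Manual acknowledgment (noAck = false)
- Messages are acknowledged once the handler has returned (basicAck)
The project uses the Kourier AMQP client (dev.kourier:amqp-client-robust), which provides the createRobustAMQPConnection() function and the channel API used above.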
In build.gradle.kts:
sourceSets {
    commonMain.dependencies {
        implementation(libs.amqp)
        // ... other dependencies
    }
}
And in gradle/libs.versions.toml:
[versions]
amqp = "0.4.0"
[libraries]
amqp = { module = "dev.kourier:amqp-client-robust", version.ref = "amqp" }
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageHandler.kt
interface MessageHandler {
    suspend operator fun invoke(routingKey: String, body: String)
}
The handler uses the invoke operator, allowing instances to be called like functions. It receives:
- routingKey: The routing key of the message
- body: The message body as a string
Event Data Class (src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationEvent.kt):
@Serializable
data class SendNotificationEvent(
    val title: String,
    val body: String,
    val token: String,
)
Handler Implementation (src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationHandler.kt):
class SendNotificationHandler(
    private val notificationService: NotificationService,
) : MessageHandler {
    override suspend fun invoke(routingKey: String, body: String) {
        val event = Serialization.json.decodeFromString<SendNotificationEvent>(body)
        notificationService.sendNotification(
            token = event.token,
            title = event.title,
            body = event.body,
        )
    }
}
The handler:
- Deserializes the JSON body into a SendNotificationEvent using Kotlinx Serialization
- Calls NotificationService to send the notification
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Serialization.kt
object Serialization {
    val json = Json {
        ignoreUnknownKeys = true
        explicitNulls = false
    }
}
This configured JSON instance is used for decoding message bodies in handlers such as SendNotificationHandler.
Configuration:
- ignoreUnknownKeys = true: Allows adding fields without breaking old consumers
- explicitNulls = false: Omits null values from JSON output
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/config/MessageBroker.kt
fun Application.configureMessageBroker() = runBlocking {
    val messageBroker by inject<MessageBroker>()
    messageBroker.initialize()
    messageBroker.startConsuming(Constants.RABBITMQ_QUEUE, get<SendNotificationHandler>())
}
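This configuration function:
- Injects the MessageBroker instance from Koin
- Initializes it, which opens the connection and declares the exchange, queue, and binding
- Starts consuming the queue with SendNotificationHandler
The RabbitMQ connection is configured via environment variables in the Koin module: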
single<MessageBroker> {
    RabbitMqMessageBroker(
        coroutineScope = this@mainModule,
        host = getEnv("RABBITMQ_HOST") ?: "localhost",
        port = getEnv("RABBITMQ_PORT")?.toIntOrNull() ?: 5672,
        user = getEnv("RABBITMQ_USER") ?: "guest",
        password = getEnv("RABBITMQ_PASSWORD") ?: "guest",
    )
}
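The fallback expressions in the module above can be isolated into a small pure function, which makes the behavior easy to check without a running broker. This is only a sketch: BrokerConfig and brokerConfigFrom are hypothetical names that do not appear in the project, and the env parameter stands in for the project's getEnv helper.

```kotlin
// Hypothetical holder for the four connection settings used by RabbitMqMessageBroker.
data class BrokerConfig(
    val host: String,
    val port: Int,
    val user: String,
    val password: String,
)

// Builds the configuration from an environment lookup, using the same
// variable names and fallbacks as the Koin module above.
fun brokerConfigFrom(env: (String) -> String?): BrokerConfig = BrokerConfig(
    host = env("RABBITMQ_HOST") ?: "localhost",
    // toIntOrNull() returns null for non-numeric input, so a malformed port
    // silently falls back to 5672 instead of failing at startup.
    port = env("RABBITMQ_PORT")?.toIntOrNull() ?: 5672,
    user = env("RABBITMQ_USER") ?: "guest",
    password = env("RABBITMQ_PASSWORD") ?: "guest",
)
```

Calling brokerConfigFrom { null } yields the defaults, while passing a lookup backed by a map or by the real environment overrides individual values. The silent fallback for a malformed RABBITMQ_PORT is convenient for local development, though a stricter setup might prefer to fail fast.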
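Default values:
- Host: localhost
- Port: 5672
- User: guest
- Password: guest
The message flow is:
1. An HTTP route publishes a SendNotificationEvent to RabbitMQ
2. SendNotificationHandler deserializes and processes it
3. NotificationService sends the FCM notification
This architecture decouples HTTP request handling from notification delivery: a request only enqueues a message, and the worker processes it asynchronously.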
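The publish, consume, and handle flow described in this part can be exercised without RabbitMQ by swapping in an in-memory stand-in for the broker, for example in unit tests. The sketch below redeclares the two interfaces so that it is self-contained. InMemoryMessageBroker and runSync are hypothetical names introduced here, routing is simplified to exact routing-key matches, and runSync drives the suspend functions with the standard library only (in the real project, runBlocking or the application's coroutine scope would play that role).

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Same shape as the tutorial's interfaces, redeclared to keep the sketch self-contained.
interface MessageHandler {
    suspend operator fun invoke(routingKey: String, body: String)
}

interface MessageBroker {
    suspend fun initialize()
    suspend fun publish(exchange: String, routingKey: String, message: String)
    suspend fun startConsuming(queue: String, handler: MessageHandler)
}

// Hypothetical in-memory stand-in: a single exchange with exact-match bindings.
class InMemoryMessageBroker(
    private val bindings: Map<String, String>, // routing key -> queue name
) : MessageBroker {
    private val queues = mutableMapOf<String, ArrayDeque<Pair<String, String>>>()
    private val consumers = mutableMapOf<String, MessageHandler>()

    override suspend fun initialize() {
        bindings.values.forEach { queue -> queues.getOrPut(queue) { ArrayDeque() } }
    }

    // The exchange name is ignored because this stand-in models only one exchange.
    override suspend fun publish(exchange: String, routingKey: String, message: String) {
        val queue = bindings[routingKey] ?: return // no matching binding: message is dropped
        val consumer = consumers[queue]
        if (consumer != null) {
            consumer(routingKey, message) // deliver immediately to the registered handler
        } else {
            queues.getOrPut(queue) { ArrayDeque() }.addLast(routingKey to message)
        }
    }

    override suspend fun startConsuming(queue: String, handler: MessageHandler) {
        consumers[queue] = handler
        val pending = queues.getOrPut(queue) { ArrayDeque() }
        while (pending.isNotEmpty()) { // drain messages queued before the consumer started
            val (routingKey, body) = pending.removeFirst()
            handler(routingKey, body)
        }
    }
}

// Runs a suspend block that never actually suspends, using only the standard library.
fun runSync(block: suspend () -> Unit) {
    var completed = false
    var failure: Throwable? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        completed = true
        failure = result.exceptionOrNull()
    })
    check(completed) { "block suspended; runSync only supports non-suspending code" }
    failure?.let { throw it }
}
```

A test could create the broker with mapOf("notifications.send" to "notifications_queue"), register a recording handler through startConsuming(), and assert that a published message reaches it. Unlike RabbitMQ, this stand-in delivers on the publisher's call stack and has no acknowledgments, so it checks wiring rather than delivery guarantees.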
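The AMQP setup demonstrates a robust connection with automatic reconnection, a durable queue with persistent messages, and manual acknowledgment once a message has been handled.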
In the next part, we’ll explore how to define HTTP routes that publish messages to this message broker.