Getting Started with Rust and RabbitMQ using lapin

Last updated: April 13, 2025

1. Introduction: Rust for Messaging

RabbitMQ is a message broker for building decoupled, asynchronous systems. To talk to a RabbitMQ broker from Rust, you need a client library. Several options exist; lapin is a popular, actively maintained, pure-Rust implementation of the AMQP 0.9.1 protocol, designed to work with asynchronous Rust runtimes such as Tokio.

This guide demonstrates the basics of using lapin to connect to RabbitMQ, declare queues, publish messages, and consume messages from a Rust application.

2. The lapin Crate

lapin provides asynchronous methods for interacting with RabbitMQ. It leverages Rust's async/await syntax and integrates smoothly with the Tokio runtime. Key features include:

  • Full AMQP 0.9.1 implementation.
  • Asynchronous, runtime-agnostic API that can run on Tokio.
  • Support for connections, channels, exchanges, queues, publishing, and consuming.
  • TLS support.

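3. Prerequisites

To follow along you need:

  • A recent stable Rust toolchain with Cargo.
  • A RabbitMQ broker reachable at localhost:5672 with the default guest/guest credentials. Section 8 assumes it runs in a Docker container named my-rabbit.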

4. Project Setup

Create a new Rust binary project:

cargo new rust_rabbitmq_example
cd rust_rabbitmq_example

Add lapin and tokio to your Cargo.toml:

[package]
name = "rust_rabbitmq_example"
version = "0.1.0"
edition = "2021"

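[dependencies]
lapin = "2.3" # Check crates.io for the latest version
tokio = { version = "1", features = ["full"] }
tokio-executor-trait = "2.1" # Lets lapin 2.x spawn its tasks on Tokio
tokio-reactor-trait = "1.1" # Lets lapin 2.x use Tokio for network I/O
futures-lite = "1" # Provides StreamExt for iterating over the consumer stream

Or use cargo add:

cargo add lapin@2.3
cargo add tokio --features full
cargo add tokio-executor-trait tokio-reactor-trait
cargo add futures-lite@1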

5. Connecting to RabbitMQ

The first step is establishing a connection and creating a channel.

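use lapin::{Channel, Connection, ConnectionProperties, Result};
use tokio::runtime::Runtime; // Only needed for the commented-out main below

async fn connect() -> Result<Channel> {
    // Default guest user and "/" vhost (encoded as %2f); replace if needed
    let addr = "amqp://guest:guest@localhost:5672/%2f";
    // With lapin 2.x, Tokio is wired in through the tokio-executor-trait and
    // tokio-reactor-trait crates (add both to Cargo.toml).
    let conn = Connection::connect(
        addr,
        ConnectionProperties::default()
            .with_executor(tokio_executor_trait::Tokio::current())
            .with_reactor(tokio_reactor_trait::Tokio),
    )
    .await?; // Use ? for error propagation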

    println!("CONNECTED");

    // Create a channel
    let channel = conn.create_channel().await?;
    println!("Created channel");

    Ok(channel)
}

// We'll use this connect function in publisher/consumer examples
// Example of how to run it (not needed for publisher/consumer sections below)
/*
fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        match connect().await {
            Ok(_) => println!("Connection and channel successful!"),
            Err(e) => eprintln!("Error connecting: {}", e),
        }
    });
}
*/

This async function connects to a RabbitMQ broker on localhost with the default credentials and, if that succeeds, opens a channel on the connection. The ConnectionProperties value passed to Connection::connect tells lapin to run its internal tasks and I/O on the Tokio runtime.
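
The connection string follows the AMQP URI layout amqp://user:password@host:port/vhost, where the vhost segment is percent-encoded; %2f is the encoded form of RabbitMQ's default virtual host "/". As a minimal sketch of how such a string is put together, the helper below builds it with the standard library only. The names amqp_uri and encode_vhost are illustrative and not part of lapin, and the sketch assumes the user name and password contain no characters that need escaping.

```rust
/// Percent-encode every byte of the vhost that is not an unreserved URI character.
fn encode_vhost(vhost: &str) -> String {
    let mut out = String::new();
    for b in vhost.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(b as char)
            }
            // Everything else (including '/') becomes %xx
            _ => out.push_str(&format!("%{:02x}", b)),
        }
    }
    out
}

/// Assemble an AMQP URI (hypothetical helper; user and password are inserted verbatim).
fn amqp_uri(user: &str, password: &str, host: &str, port: u16, vhost: &str) -> String {
    format!(
        "amqp://{}:{}@{}:{}/{}",
        user,
        password,
        host,
        port,
        encode_vhost(vhost)
    )
}

fn main() {
    // The default vhost "/" yields the address used in connect()
    let uri = amqp_uri("guest", "guest", "localhost", 5672, "/");
    println!("{}", uri);
}
```

Building the address from parts like this makes it harder to forget the encoding when the virtual host name itself contains a slash.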

6. Publishing Messages

Let's create a simple publisher that declares a queue and sends a message to it via the default exchange.

6.1 Declaring a Queue

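Before publishing to a queue, it is good practice to make sure it exists by calling queue_declare. The call is idempotent as long as the arguments match: declaring an existing queue with the same options does nothing, while declaring it with different options fails with a channel error.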
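
To make the declare semantics concrete: redeclaring a queue with the same arguments is a no-op, while redeclaring it with different arguments (for example durable instead of non-durable) is rejected by the broker with a PRECONDITION_FAILED channel error. The following is a toy in-memory model of that rule, not lapin code; Broker, QueueArgs, DeclareOutcome, and PreconditionFailed are made-up names for illustration.

```rust
use std::collections::HashMap;

/// The subset of declare options this toy model compares.
#[derive(Debug, Clone, Copy, PartialEq)]
struct QueueArgs {
    durable: bool,
    exclusive: bool,
    auto_delete: bool,
}

#[derive(Debug, PartialEq)]
enum DeclareOutcome {
    Created,
    AlreadyExists,
}

/// Stand-in for the broker's PRECONDITION_FAILED channel error.
#[derive(Debug, PartialEq)]
struct PreconditionFailed;

#[derive(Default)]
struct Broker {
    queues: HashMap<String, QueueArgs>,
}

impl Broker {
    fn queue_declare(
        &mut self,
        name: &str,
        args: QueueArgs,
    ) -> Result<DeclareOutcome, PreconditionFailed> {
        if let Some(existing) = self.queues.get(name) {
            // Same arguments: nothing to do. Different arguments: reject.
            return if *existing == args {
                Ok(DeclareOutcome::AlreadyExists)
            } else {
                Err(PreconditionFailed)
            };
        }
        self.queues.insert(name.to_string(), args);
        Ok(DeclareOutcome::Created)
    }
}

fn main() {
    let mut broker = Broker::default();
    let plain = QueueArgs { durable: false, exclusive: false, auto_delete: false };
    let durable = QueueArgs { durable: true, ..plain };

    println!("{:?}", broker.queue_declare("hello_rust_queue", plain));
    println!("{:?}", broker.queue_declare("hello_rust_queue", plain));
    println!("{:?}", broker.queue_declare("hello_rust_queue", durable));
}
```

In practice this means publisher and consumer should declare the queue with identical options, which is why both programs in this guide use the defaults.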

6.2 Publishing Logic

We use basic_publish to send the message. Publishing to the default exchange (the empty string "") routes the message to the queue whose name equals the routing key.
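
The routing rule of the default exchange can be pictured with a small in-memory model. This is only an illustration of the semantics, not lapin or broker code; DefaultExchange and its methods are invented names. It also shows one consequence worth knowing: a message whose routing key matches no queue is dropped (unless the publisher sets the mandatory flag, which this sketch does not model).

```rust
use std::collections::{HashMap, VecDeque};

/// Toy model: queue name -> messages waiting in that queue.
#[derive(Default)]
struct DefaultExchange {
    queues: HashMap<String, VecDeque<Vec<u8>>>,
}

impl DefaultExchange {
    fn declare_queue(&mut self, name: &str) {
        self.queues.entry(name.to_string()).or_default();
    }

    /// Route by exact name match. Returns true if a queue received the message.
    fn publish(&mut self, routing_key: &str, payload: &[u8]) -> bool {
        match self.queues.get_mut(routing_key) {
            Some(queue) => {
                queue.push_back(payload.to_vec());
                true
            }
            // No queue with that name: the message is silently discarded
            None => false,
        }
    }

    fn queue_len(&self, name: &str) -> usize {
        self.queues.get(name).map_or(0, |q| q.len())
    }
}

fn main() {
    let mut exchange = DefaultExchange::default();
    exchange.declare_queue("hello_rust_queue");

    let routed = exchange.publish("hello_rust_queue", b"Hello from Rust Publisher!");
    let lost = exchange.publish("no_such_queue", b"nobody will see this");

    println!("routed: {}, unroutable delivered: {}", routed, lost);
    println!("queue length: {}", exchange.queue_len("hello_rust_queue"));
}
```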

6.3 Complete Publisher Code

Modify src/main.rs:

use lapin::{
    options::*, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties,
    Queue, Result,
};

async fn connect() -> Result<Channel> {
    // Same logic as the connect function from section 5.
    // Requires the tokio-executor-trait and tokio-reactor-trait crates in Cargo.toml.
    let addr = "amqp://guest:guest@localhost:5672/%2f";
    let conn = Connection::connect(
        addr,
        ConnectionProperties::default()
            .with_executor(tokio_executor_trait::Tokio::current())
            .with_reactor(tokio_reactor_trait::Tokio),
    )
    .await?;
    println!("PUBLISHER: CONNECTED");
    let channel = conn.create_channel().await?;
    println!("PUBLISHER: Created channel");
    Ok(channel)
}

async fn declare_queue(channel: &Channel, queue_name: &str) -> Result<Queue> {
    let queue = channel
        .queue_declare(
            queue_name,
            QueueDeclareOptions::default(), // Use default options
            FieldTable::default(),
        )
        .await?;
    println!("PUBLISHER: Declared queue '{}'", queue_name);
    Ok(queue)
}

#[tokio::main]
async fn main() -> Result<()> {
    let channel = connect().await.expect("Failed to connect to RabbitMQ");
    let queue_name = "hello_rust_queue";

    // Declare the queue (idempotent)
    let _queue = declare_queue(&channel, queue_name).await.expect("Failed to declare queue");

    // Message content
    let payload = b"Hello from Rust Publisher!";
    let routing_key = queue_name; // Route directly to queue via default exchange

    // Publish the message
    channel
        .basic_publish(
            "", // Default exchange
            routing_key,
            BasicPublishOptions::default(),
            payload, // Message body as bytes
            BasicProperties::default(), // Default properties
        )
        .await? // Send the publish frame; this yields a PublisherConfirm
        .await?; // Resolve the confirm (returns at once unless confirm_select was enabled)

    println!("PUBLISHER: Sent message to queue '{}'", queue_name);

    // Note: nothing closes the connection explicitly here; it is torn down when the process exits at the end of `main`.
    Ok(())
}

7. Consuming Messages

Now, let's create a consumer that listens to the queue declared by the publisher.

7.1 Declaring the Queue (Idempotent)

It's good practice for the consumer to declare the queue it expects as well. If the queue does not exist yet, RabbitMQ creates it; if it already exists with the same parameters, the call does nothing; if it exists with conflicting parameters, RabbitMQ raises a channel error.

7.2 Consuming Logic

We use basic_consume to register a consumer. It returns a stream whose items are incoming messages (Delivery objects) wrapped in a Result. We iterate over this stream (using `futures_lite::StreamExt`) and acknowledge each message (basic_ack) after processing it, so that the broker removes it from the queue.
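
Why the acknowledgement matters: with manual acks, the broker keeps each delivered message in an "unacknowledged" state until the consumer acks it, and puts it back on the queue if the consumer's channel closes first. The sketch below models that bookkeeping in memory. It is a simplified illustration with invented names (ToyQueue, deliver, ack, consumer_disconnected), not how lapin or RabbitMQ is implemented.

```rust
use std::collections::{BTreeMap, VecDeque};

#[derive(Default)]
struct ToyQueue {
    ready: VecDeque<String>,        // messages waiting to be delivered
    unacked: BTreeMap<u64, String>, // delivered but not yet acknowledged, by delivery tag
    next_tag: u64,
}

impl ToyQueue {
    fn publish(&mut self, msg: &str) {
        self.ready.push_back(msg.to_string());
    }

    /// Hand the next ready message to the consumer and remember it as unacked.
    fn deliver(&mut self) -> Option<(u64, String)> {
        let msg = self.ready.pop_front()?;
        self.next_tag += 1;
        self.unacked.insert(self.next_tag, msg.clone());
        Some((self.next_tag, msg))
    }

    /// Acknowledge a delivery; only now is the message gone for good.
    fn ack(&mut self, tag: u64) -> bool {
        self.unacked.remove(&tag).is_some()
    }

    /// The consumer went away: every unacked message returns to the head of the queue.
    fn consumer_disconnected(&mut self) {
        let pending = std::mem::take(&mut self.unacked);
        for (_tag, msg) in pending.into_iter().rev() {
            self.ready.push_front(msg);
        }
    }
}

fn main() {
    let mut queue = ToyQueue::default();
    queue.publish("first");
    queue.publish("second");

    let (first_tag, _) = queue.deliver().expect("queue has a message");
    let (_second_tag, _) = queue.deliver().expect("queue has a message");

    queue.ack(first_tag); // "first" is removed permanently
    queue.consumer_disconnected(); // "second" was never acked, so it is requeued

    println!("ready: {:?}, unacked: {}", queue.ready, queue.unacked.len());
}
```

The takeaway for the real consumer is to ack only after processing has succeeded, so that a crash in the middle of handling a message does not lose it.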

7.3 Complete Consumer Code

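Either replace the contents of src/main.rs with the code below, or keep the publisher where it is and put the consumer in its own binary target such as src/bin/consumer.rs (Cargo builds each file in src/bin as a separate executable that you start with cargo run --bin consumer).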

use futures_lite::stream::StreamExt; // For stream iteration
use lapin::{
    options::*, types::FieldTable, Channel, Connection, ConnectionProperties, Queue, Result,
};
use std::str;

async fn connect() -> Result<Channel> {
    // Same logic as the connect function from section 5.
    // Requires the tokio-executor-trait and tokio-reactor-trait crates in Cargo.toml.
    let addr = "amqp://guest:guest@localhost:5672/%2f";
    let conn = Connection::connect(
        addr,
        ConnectionProperties::default()
            .with_executor(tokio_executor_trait::Tokio::current())
            .with_reactor(tokio_reactor_trait::Tokio),
    )
    .await?;
    println!("CONSUMER: CONNECTED");
    let channel = conn.create_channel().await?;
    println!("CONSUMER: Created channel");
    Ok(channel)
}

async fn declare_queue(channel: &Channel, queue_name: &str) -> Result<Queue> {
    let queue = channel
        .queue_declare(
            queue_name,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await?;
    println!("CONSUMER: Declared queue '{}'", queue_name);
    Ok(queue)
}

#[tokio::main]
async fn main() -> Result<()> {
    let channel = connect().await.expect("Failed to connect to RabbitMQ");
    let queue_name = "hello_rust_queue";

    // Declare the queue (idempotent)
    let _queue = declare_queue(&channel, queue_name).await.expect("Failed to declare queue");

    println!("CONSUMER: [*] Waiting for messages. To exit press CTRL+C");

    // Create consumer
    let mut consumer = channel
        .basic_consume(
            queue_name,
            "my_consumer", // Unique consumer tag
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;

    // Iterate over incoming messages
    while let Some(delivery_result) = consumer.next().await {
        match delivery_result {
            Ok(delivery) => {
                // Decode message payload
                let message_body = str::from_utf8(&delivery.data).unwrap_or("Could not decode");
                println!("CONSUMER: [x] Received '{}'", message_body);

                // Acknowledge the message
                if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
                    eprintln!("CONSUMER: Failed to ack message: {}", e);
                } else {
                    println!("CONSUMER: [x] Done processing and acknowledged.");
                }
            }
            Err(e) => {
                eprintln!("CONSUMER: Error in consumer stream: {}", e);
                break; // Exit loop on consumer error
            }
        }
    }

    Ok(())
}
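
The consumer above prints a fixed placeholder when the payload is not valid UTF-8. If you would rather see as much of the text as possible, the standard library's String::from_utf8_lossy replaces only the invalid bytes. A small comparison of the two approaches, using illustrative helper names (decode_strict and decode_lossy are not part of lapin):

```rust
use std::borrow::Cow;

/// Mirrors the consumer code: all-or-nothing decoding with a placeholder on failure.
fn decode_strict(data: &[u8]) -> &str {
    std::str::from_utf8(data).unwrap_or("Could not decode")
}

/// Keeps the valid parts and substitutes U+FFFD for each invalid sequence.
fn decode_lossy(data: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(data)
}

fn main() {
    let valid = b"Hello from Rust Publisher!";
    let broken = [b'o', b'k', 0xff]; // 0xff can never appear in valid UTF-8

    println!("strict, valid:  {}", decode_strict(valid));
    println!("strict, broken: {}", decode_strict(&broken));
    println!("lossy, broken:  {}", decode_lossy(&broken));
}
```

Either helper can be applied to delivery.data, since both only need a byte slice.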

8. Running the Examples

  1. Ensure your RabbitMQ container is running (docker start my-rabbit if stopped).
  2. Open two terminals in your project directory.
  3. In the first terminal, run the consumer (assuming its code is in src/main.rs):
    cargo run
    It connects and waits for messages.
  4. Replace the contents of src/main.rs with the Publisher code. Alternatively, keep each program in its own file under src/bin and start it with cargo run --bin <name>.
  5. In the second terminal, run the publisher:
    cargo run
  6. Observe the output: the publisher sends the message, and the consumer receives, prints, and acknowledges it.

9. Conclusion

The lapin crate provides a robust and idiomatic way to interact with RabbitMQ from asynchronous Rust applications. By leveraging Tokio and async/await, you can build performant producers and consumers. This guide covered the essential steps: connecting, creating channels, declaring queues, publishing messages, and consuming/acknowledging messages. From here, you can explore more advanced RabbitMQ features like different exchange types, message persistence, error handling, and worker pool patterns.