Websockets with Rust and Actix Web

The Actix framework for rust is an actor based framework strictly following the actor pattern. REST APIs can be built simply and intuitively. One of the main reasons I chose it over rocket was, at the time, it runs on stable rust!

The syntax is very easy to work with and will be familiar to Java / Spring developers if that is your background. Let’s take a look at a very simple API

//main.rs
use actix_cors::Cors;
use actix_web::{Responder, HttpResponse, HttpServer, App, get};

#[get("/")]
async fn get() -> impl Responder {
    println!("GET /");
    HttpResponse::Ok().body("test")
}

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    println!("Started");
    HttpServer::new(move || {
        App::new()
            .wrap(Cors::new().send_wildcard().finish())
            .service(get)
    })
        .bind("0.0.0.0:8120")?
        .run()
        .await
}

The few dependencies you will need are

actix-cors="0.2.0"
actix-rt = "1.1.1"
actix-web = "2.0.0"

Checkout the docs for more detailed examples! When looking at using websockets, personally I hit a few stumbling blocks on how best to put the pieces together. There are some code examples but they seem long winded and lack any real guidance on what they are doing. Let’s have a look at a simpler example than the ones in the repo but with a bit more meat on the bones than you get in the docs.

Using the example from the docs we start with

use actix_cors::Cors;
use actix_web::{web, Responder, HttpResponse, HttpServer, App, get, Error, HttpRequest};
use actix::{Actor, StreamHandler};
use actix_web_actors::ws;


#[get("/")]
async fn get() -> impl Responder {
    println!("GET /");
    HttpResponse::Ok().body("test")
}

/// Define http actor
struct MyWs;

impl Actor for MyWs {
    type Context = ws::WebsocketContext<Self>;
}

/// Handler for ws::Message message
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
    fn handle(
        &mut self,
        msg: Result<ws::Message, ws::ProtocolError>,
        ctx: &mut Self::Context,
    ) {
        match msg {
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
            Ok(ws::Message::Text(text)) => ctx.text(text),
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            _ => (),
        }
    }
}

async fn index(req: HttpRequest, stream: web::Payload) -> HttpResponse {
    let resp = ws::start(MyWs {}, &req, stream).unwrap();
    println!("{:?}", resp);
    resp
}

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    println!("Started");
    HttpServer::new(move || {
        App::new()
            .wrap(Cors::new().send_wildcard().finish())
            .service(get)
            .route("/ws/", web::get().to(index))
    })
        .bind("0.0.0.0:8120")?
        .run()
        .await
}

So we have added a new index function and a MyWs struct. The index function is the entry point to the websocket connection, API consumers will be accessing this method to switch protocols and upgrade to a websocket connection. This will call the actix-web-actors method ws::start passing a new instance of our MyWs struct and the original request and stream.

MyWs

Looking at the call made to ws::start, the actor struct that is passed to the method is constrained so that it must implement Actor with a type definition for WebsocketContext and StreamHandler<..>. These have both been provided in the example code.

The type definition is quite interesting and is used as an argument to the StreamHandler implementation and ensures that the instance passed to ws::start has its methods called when websocket actions are performed. This can be seen with the ctx methods: pong, text and binary as called in the handle function in the StreamHandler implementation.

This is all great and clients can push data to the server, we can receive it and do stuff. There is however a different case that is almost always the reason I end up using websockets, updating a UI in realtime as things happen.

Publishing data to clients

With some small additions to the example given, we can enable publishing data on the websocket to connected clients. This is best achieved through leveraging the actor pattern inherent in the actix framework. We will extend our actor to handle the types of messages we will publish and we will alter the way we start the web socket in order to get a handle on the actor for use by our publishing mechanism

The additions to the actor

#[derive(Message)]
#[rtype(result = "()")]
pub struct Payload<T> {
    pub payload: T,
}

impl<T> Handler<Payload<T>> for MyWs where T: Serialize + Debug {
    type Result = ();

    fn handle(&mut self, msg: Payload<T>, ctx: &mut Self::Context) {
        println!("handle {:?}", msg.payload);
        ctx.text(serde_json::to_string(&msg.payload).expect("Cannot serialize"));
    }
}

The people from actix have been very kind here and given us some macro rules for implementing the required code for our payload struct. Simply add the 2 macros as above to whatever struct you want to send to your actor.

The new implementation is for handling the messages. Notice the type definition matching the rtype of the Payload macro? This is for tying together what should be returned from the handle function.

Now we need a way of sending messages to this method. To do this we alter our index method above to start the websocket and return a handle we can use to send messages to the actor

async fn index(req: HttpRequest, stream: web::Payload) -> HttpResponse {
    let (addr, resp) = ws::start_with_addr(MyWs {}, &req, stream).unwrap();
    let recipient = addr.recipient();
    task::spawn(async move {
        loop {
            println!("Send ping");
            let result = recipient.send(Payload { payload: "ping".to_string() });
            let result = result.await;
            result.unwrap();
            sleep(Duration::from_secs(1));
        }
    });
    println!("{:?}", resp);
    resp
}

It should be noted that tokio::task is being used here to spawn the thread. This allows the async send method to be called on the recipient. Notice how the send and await calls are on separate lines. This is because the recipient does not implement Send and cannot be used across an await in a single line. If this is combined to a single line, you will get compile errors saying future is not Send. That took me ages to debug! Hopefully this will save your bacon

The full source code for this walk through is available here

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: