Hi there! Welcome back. This time we will explore how to implement websocket rooms using tide and tide-websockets.

The goal is to let clients select (subscribe to) a room and receive updates produced by another task, filtered so that each room only receives its own updates.

For our use case we will use a task that tracks tweets about some topics from the Twitter API. We will split these topics into different rooms, so our clients can subscribe to a room over a websocket connection and receive only the tweets related to that room's topics.

So, let's start by creating our app and adding some dependencies.

Base setup

$ cargo init tide-ws-rooms && cd tide-ws-rooms

$ cargo add tide tide-websockets async-std
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding tide v0.16.0 to dependencies
      Adding tide-websockets v0.3.0 to dependencies
      Adding async-std v1.9.0 to dependencies

Great! Now let's add the basic structure of the app (and enable the `attributes` feature of async-std, which `#[async_std::main]` needs).

# Cargo.toml
[dependencies]
tide = "0.16.0"
tide-websockets = "0.3.0"
async-std = { version = "1.9.0", features = ["attributes"] }

// main.rs

use tide_websockets::{Message as WSMessage, WebSocket};

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
    tide::log::start();

    let mut app = tide::new();
    // serve public dir for assets
    app.at("/public").serve_dir("./public/")?;

    // index route
    app.at("/").serve_file("public/index.html")?;

    // ws route
    app.at("/ws")
        .get(WebSocket::new(|_req, wsc| async move {
            println!("new ws connection");

            Ok(())
        }));

    let port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string());
    let addr = format!("0.0.0.0:{}", port);
    app.listen(addr).await?;

    Ok(())
}

As you can see, we are serving an index page and a public directory with CSS assets. The front-end of this app is very similar to the one we used in exploring-sse-with-rust-and-tide-by-streaming-tweets, and you can also check its code in the repo. We are also defining a ws route to handle websocket connections. For the moment, it only prints a line when a connection arrives.

Let's run it to see if we can establish the ws connection.

$ cargo run

Awesome, we created a new ws connection. You can also see the message in the server log:

tide::log::middleware <-- Request received
    method GET
    path /ws
tide::log::middleware --> Response sent
    method GET
    path /ws
    status 101 - Switching Protocols
    duration 181.388µs
new ws connection

Setup tracking

Let's now add the logic to track the topics from the Twitter API. We will use the same logic we used to explore SSE, so let's add those lines to our main file.

// main.rs
use tide_websockets::{Message as WSMessage, WebSocket};
use async_std::task;
use broadcaster::BroadcastChannel;
use serde::de;
use serde::{Deserialize, Serialize};
use twitter_stream::Token;
use futures::prelude::*;


#[derive(Debug, serde::Deserialize)]
pub struct RequestBody {
    topics: Vec<String>,
}

#[derive(Clone, Debug)]
struct State {
    broadcaster: BroadcastChannel<Tweet>
}

#[derive(Deserialize)]
#[serde(untagged)]
enum StreamMessage {
    Tweet(Tweet),
    Other(de::IgnoredAny),
}

#[derive(Deserialize, Serialize, Clone, Debug)]
struct Tweet {
    id: u64,
    id_str: String,
    text: String,
    user: User,
    timestamp_ms: String,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
struct User {
    id: u64,
    screen_name: String,
    profile_image_url_https: String,
}

async fn spawn_tracker(broadcaster: BroadcastChannel<Tweet>, topics: String) {
    println!("topics : {}", topics);
    let token = Token::from_parts(
        std::env::var("TW_CONSUMER_KEY").expect("missing env var TW_CONSUMER_KEY"),
        std::env::var("TW_CONSUMER_SECRET").expect("missing env var TW_CONSUMER_SECRET"),
        std::env::var("TW_TOKEN").expect("missing env var TW_TOKEN"),
        std::env::var("TW_SECRET").expect("missing env var TW_SECRET"),
    );

    task::spawn(async move {
        let mut tracker = twitter_stream::Builder::new(token.as_ref());
        let mut stream = tracker.track(&topics).listen().try_flatten_stream();

        while let Some(json) = stream.next().await {
            if let Ok(StreamMessage::Tweet(tw)) = serde_json::from_str(&json.unwrap()) {
                //println!("receive a  tweet! ... , {}", tw.text);
                match broadcaster.send(&tw).await {
                    Ok(_) => {}
                    Err(_) => {
                        println!("Error sending to broadcaster")
                    }
                };
            }
        }
    });
}

#[async_std::main]
async fn main() -> Result<(), std::io::Error> {
    dotenv::dotenv().ok();

    tide::log::start();

    let broadcaster = BroadcastChannel::new();

    spawn_tracker(broadcaster.clone(), "hello-world".to_string()).await;

    let mut app = tide::with_state(State { broadcaster });
    // serve public dir for assets
    app.at("/public").serve_dir("./public/")?;

    // index route
    app.at("/").serve_file("public/index.html")?;

    // ws route
    app.at("/ws")
        .get(WebSocket::new(|_req, wsc| async move {
            println!("new ws connection");

            Ok(())
        }));

    let port = std::env::var("PORT").unwrap_or_else(|_| "8080".to_string());
    let addr = format!("0.0.0.0:{}", port);
    app.listen(addr).await?;

    Ok(())
}

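We added a few things:

  • the dependencies we need.
  • structs to deserialize tweets.
  • an app State struct to hold the broadcaster.
  • a spawn_tracker function that streams tweets from the API and sends them to the broadcaster.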

Let's run it again to check that it works as expected (remember to set up the .env file with the API credentials).

tide::log Logger started
    level Info
topics : hello-world
tide::server Server listening on http://0.0.0.0:8080
thread 'async-std/runtime' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/pepo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.3.0/src/runtime/blocking/pool.rs:85:33
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Nice, the spawned task just panicked. I forgot to mention that the crate we use to track topics from the Twitter API uses tokio, so we need to enable the `tokio1` feature in async-std:

async-std = { version = "1.9.0", features = ["attributes", "tokio1"] }

And try again…

tide::log Logger started
    level Info
topics : hello-world
tide::server Server listening on http://0.0.0.0:8080

Nice! Now it's working :)

Connecting the dots

Now we have the two parts we want to connect: on one side the ws connections, and on the other a task that produces tweets.

But first, a word about the Sink trait. If you look at the tide-websockets repo, you will find a request to implement this trait. It would let users split the ws connection, move the sending half out of the handler, and use it in other parts of the app (for example, in the producer, to send tweets directly to the clients). However, not implementing Sink has been a deliberate choice in the http-rs ecosystem, as you can see in the PR comment. A different approach, sending messages to the handler through some kind of channel or broadcaster, is also mentioned in the comments (check this comment), and that is the path we will take to connect the dots :)
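The channel approach is a simple fan-out: one producer, and one receiving end per connection, where every receiver gets its own copy of each message. Here is a minimal std-only sketch of that shape. It assumes OS threads and `std::sync::mpsc` in place of async tasks. The `Broadcaster` type and its methods are hypothetical; this is not the broadcaster crate's API.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out channel: every subscriber gets its own mpsc channel
/// and receives a clone of each message sent after it subscribed.
struct Broadcaster<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Broadcaster<T> {
    fn new() -> Self {
        Broadcaster { subscribers: Vec::new() }
    }

    /// One call per websocket handler: returns that handler's receiving end.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Called by the producer (the tweet tracker): clones the message to every
    /// subscriber and drops the ones whose receiver has been dropped.
    fn send(&mut self, msg: &T) {
        self.subscribers.retain(|tx| tx.send(msg.clone()).is_ok());
    }

    fn subscriber_count(&self) -> usize {
        self.subscribers.len()
    }
}
```

Each handler would call `subscribe()` once and read from its `Receiver`, while the tracker calls `send()` for every tweet. The async broadcaster crate used in this post plays a similar role without blocking threads, and the handler never has to hand its sending half to anyone else.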

Now that the approach is clear, let's think about how to implement it. If you check the code, you will notice that the broadcaster is already there, but no one is listening to its messages. We now need a mechanism to represent the rooms and let users subscribe to one or another. The rooms will also hold the logic that decides whether a received message (a tweet in our case) should be forwarded to the connection.

Let’s add the Room struct now…

#[derive(Clone, Debug)]
struct Room {
    topics: Vec<String>,
    regex: Regex,
}

impl Room {
    // true when the tweet text matches any of the room's topics
    pub fn should_send(&self, text: &str) -> bool {
        self.regex.is_match(text)
    }
}

Every room has a vector of topics and a regex used to decide whether to forward a tweet. To use Regex, we also need to add the regex crate to our dependencies (and `use regex::Regex;` in main.rs).
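The regex built for each room wraps every topic in `\b…\b`, so a topic only counts when it appears as a whole word. The hypothetical `contains_word` below is a rough std-only illustration of that rule, not how the regex crate works internally. It treats letters, digits and `_` as word characters. Like the regex, it is case-sensitive. It is only meant for topics that start and end with word characters.

```rust
/// Rough approximation of the regex `\b{topic}\b`: true when `topic` occurs
/// in `text` and is not glued to other word characters on either side.
fn contains_word(text: &str, topic: &str) -> bool {
    if topic.is_empty() {
        return false;
    }
    let is_word = |c: char| c.is_alphanumeric() || c == '_';
    for (start, _) in text.match_indices(topic) {
        let end = start + topic.len();
        // character right before the match (if any) must not be a word char
        let before_ok = text[..start].chars().next_back().map_or(true, |c| !is_word(c));
        // character right after the match (if any) must not be a word char
        let after_ok = text[end..].chars().next().map_or(true, |c| !is_word(c));
        if before_ok && after_ok {
            return true;
        }
    }
    false
}

/// A room matches a tweet when any of its topics appears as a whole word,
/// which is what the `|`-joined alternation of `(\btopic\b)` groups expresses.
fn should_send(topics: &[&str], text: &str) -> bool {
    topics.iter().any(|t| contains_word(text, t))
}
```

For example, the topic "rust" matches "I love rust and tide" but not "rustacean meetup".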

Now, to make it easy to add more rooms, let's add a couple of helper functions that build the topics vector and the regex pattern from a &str, so each room's topics can live in a resource file (one topic per line).

fn get_topics(input_str: &str) -> Vec<String> {
    // one topic per line; trim and skip blank lines (e.g. the file's trailing
    // newline): an empty topic would become `(\b\b)`, which matches almost any tweet
    input_str
        .lines()
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(String::from)
        .collect()
}

fn get_regex(input_str: &str) -> String {
    // "(\btopic1\b)|(\btopic2\b)|...", escaping regex metacharacters in each topic
    get_topics(input_str)
        .iter()
        .map(|s| format!(r"(\b{}\b)", regex::escape(s)))
        .collect::<Vec<_>>()
        .join("|")
}

Now, in main.rs, we can include the topics files and use them to construct the rooms:

    let nba_input = include_str!("../public/nba.txt");
    let rust_input = include_str!("../public/rust.txt");
    let premier_input = include_str!("../public/premier.txt");

    let nba_room = Room {
        topics: get_topics(nba_input),
        regex: Regex::new(&get_regex(nba_input)).unwrap(),
    };
    let nba_topics_str = nba_room.topics.join(",");

    let rust_room = Room {
        topics: get_topics(rust_input),
        regex: Regex::new(&get_regex(rust_input)).unwrap(),
    };
    let rust_topics_str = rust_room.topics.join(",");

    let premier_room = Room {
        topics: get_topics(premier_input),
        regex: Regex::new(&get_regex(premier_input)).unwrap(),
    };
    let premier_topics_str = premier_room.topics.join(",");
    let mut rooms: HashMap<String, Room> = HashMap::new();
    rooms.insert("nba".to_string(), nba_room);
    rooms.insert("rust".to_string(), rust_room);
    rooms.insert("premier".to_string(), premier_room);

    // spawn tracker
    let topics_str = format!(
        "{},{},{}",
        nba_topics_str, rust_topics_str, premier_topics_str
    );
    spawn_tracker(broadcaster.clone(), topics_str).await;

    let mut app = tide::with_state(State { broadcaster, rooms });

We also add a `rooms: HashMap<String, Room>` field (from std::collections::HashMap) to the State, and pass the topics from the files to the tracker task.
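The three room definitions above differ only in their name and file. A loop can therefore build the map and the tracker's comma-separated topic list in one pass. This is a std-only sketch under simplifying assumptions. `build_rooms` is a hypothetical helper, and this `Room` keeps only an id and the topics. The compiled regex is left out so the example runs without the regex crate.

```rust
use std::collections::HashMap;

/// Simplified room for this sketch (the compiled regex is omitted).
#[derive(Clone, Debug)]
struct Room {
    id: String,
    topics: Vec<String>,
}

/// One topic per line, ignoring blank lines such as a trailing newline.
fn get_topics(input: &str) -> Vec<String> {
    input
        .lines()
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(String::from)
        .collect()
}

/// Builds the rooms map and the comma-separated topic string for the tracker
/// from (room id, file contents) pairs, so adding a room is one more pair.
fn build_rooms(sources: &[(&str, &str)]) -> (HashMap<String, Room>, String) {
    let mut rooms = HashMap::new();
    let mut all_topics: Vec<String> = Vec::new();
    for (id, input) in sources {
        let topics = get_topics(input);
        all_topics.extend(topics.iter().cloned());
        rooms.insert(id.to_string(), Room { id: id.to_string(), topics });
    }
    (rooms, all_topics.join(","))
}
```

In main.rs you would pass pairs like `("nba", include_str!("../public/nba.txt"))`. With the real `Room`, you would also compile each regex inside the loop.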

Now we need to finish the ws handler logic so it forwards the appropriate tweets:

    // ws route
    // (Either is e.g. futures_util::future::Either; the import is not shown)
    app.at("/ws")
        .get(WebSocket::new(|req: tide::Request<State>, wsc| async move {
            let state = req.state().clone();
            let rooms = state.rooms;
            let broadcaster = state.broadcaster.clone();
            // merge incoming client messages and tweets into a single stream
            let mut combined_stream = futures_util::stream::select(
                wsc.clone().map(Either::Left),
                broadcaster.clone().map(Either::Right),
            );

            // by default we put new connections in the nba room
            let mut current_room = rooms.get("nba");

            while let Some(item) = combined_stream.next().await {
                match item {
                    // a text message from the client selects the room
                    Either::Left(Ok(WSMessage::Text(message))) => {
                        println!("message : {}", message);
                        current_room = rooms.get(&message);
                    }

                    // a tweet from the tracker: forward it if it matches the current room
                    Either::Right(tweet) => {
                        if let Some(room) = current_room {
                            if room.should_send(&tweet.text) {
                                wsc.send_json(&tweet).await?;
                            }
                        }
                    }
                    // any other frame (close, ping, binary) or a read error ends the handler with an error
                    _o => {
                        return Err(tide::http::format_err!("no idea"));
                    }
                }
            }
            Ok(())
        }));

Notice that we added the futures-util dependency to use in the handler (for `futures_util::stream::select`).
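The handler's control flow is easier to see without the async machinery. Below is a synchronous, std-only replay of the same loop. It assumes a pre-merged list of events in place of `stream::select` over `Either::Left`/`Either::Right`, and a plain substring check in place of `Room::should_send`. `Event` and `forwarded` are hypothetical names.

```rust
use std::collections::HashMap;

/// Stand-in for the `Either<ws message, tweet>` items of the merged stream.
enum Event {
    /// Text frame from the client: the name of the room to switch to.
    ClientText(String),
    /// A tweet produced by the tracker.
    Tweet(String),
}

/// Replays the handler's loop over already-merged events and returns the
/// tweets this client would receive. `rooms` maps a room name to its topics.
fn forwarded(rooms: &HashMap<String, Vec<String>>, events: Vec<Event>) -> Vec<String> {
    // new connections start in the "nba" room
    let mut current_room = rooms.get("nba");
    let mut sent = Vec::new();
    for event in events {
        match event {
            // unknown room names give None, so nothing is forwarded afterwards
            Event::ClientText(name) => current_room = rooms.get(&name),
            Event::Tweet(text) => {
                if let Some(topics) = current_room {
                    // substring check standing in for the room's regex
                    if topics.iter().any(|t| text.contains(t.as_str())) {
                        sent.push(text);
                    }
                }
            }
        }
    }
    sent
}
```

A room name the server doesn't know leaves `current_room` as `None`, so that client stops receiving tweets until it picks a valid room. The real handler behaves the same way because `rooms.get(&message)` returns `None`.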

Let's try one more time; now we should receive tweets in the browser.

Nice! It’s working and we can select the room and see the tweets related to those topics.

But if you look at the code, you will notice that every connection runs its room's regex against every tweet. That isn't great, and we can improve the code to get better performance.

Improving performance

We want to change our code so the regexes run once per tweet and room, that is N times per tweet where N is the number of rooms, no matter how many connections are open.

To achieve this, let's start by moving the filtering into the tracker and having it produce a RoomMessage instead of a bare tweet. The RoomMessage holds the tweet and the identifier of the room.
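To see why moving the filter pays off, here is a rough std-only cost model. It assumes each room has a single topic and uses a substring check in place of the regex. `filter_per_connection` and `filter_per_room` are hypothetical names. Both deliver the same tweets to each client. The first runs the matcher once per tweet per connection. The second tags tweets with room ids once per tweet per room, as the tracker will with RoomMessage, and leaves the handlers a cheap id comparison.

```rust
use std::cell::Cell;

/// Current design: each connection runs its room's matcher on every tweet.
/// `conns[i]` is the room index of client i. Returns (deliveries, matcher runs).
fn filter_per_connection(tweets: &[&str], rooms: &[&str], conns: &[usize]) -> (Vec<Vec<String>>, usize) {
    let runs = Cell::new(0);
    let matches = |text: &str, topic: &str| {
        runs.set(runs.get() + 1);
        text.contains(topic)
    };
    let delivered: Vec<Vec<String>> = conns
        .iter()
        .map(|&room| {
            tweets
                .iter()
                .filter(|t| matches(**t, rooms[room]))
                .map(|t| t.to_string())
                .collect::<Vec<String>>()
        })
        .collect();
    (delivered, runs.get())
}

/// New design: the tracker tags each tweet with every matching room id,
/// and each handler only compares ids.
fn filter_per_room(tweets: &[&str], rooms: &[&str], conns: &[usize]) -> (Vec<Vec<String>>, usize) {
    let mut runs = 0;
    let mut tagged: Vec<(usize, String)> = Vec::new();
    for t in tweets {
        for (room_id, topic) in rooms.iter().enumerate() {
            runs += 1;
            if t.contains(*topic) {
                tagged.push((room_id, t.to_string()));
            }
        }
    }
    let delivered: Vec<Vec<String>> = conns
        .iter()
        .map(|&room| {
            tagged
                .iter()
                .filter(|(id, _)| *id == room)
                .map(|(_, t)| t.clone())
                .collect::<Vec<String>>()
        })
        .collect();
    (delivered, runs)
}
```

With 3 tweets, 2 rooms and 4 connections, that is 3 × 4 = 12 matcher runs versus 3 × 2 = 6. The tracker-side version only wins when there are more connections than rooms. That is the expected case for a small, fixed set of rooms.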

Add the id to the Room (so each room construction now also sets it, e.g. `id: "nba".to_string()`) and add the RoomMessage struct:

#[derive(Clone, Debug)]
struct Room {
    id: String,
    topics: Vec<String>,
    regex: Regex,
}

// like Tweet, it derives Clone so the broadcaster can hand a copy to every subscriber
#[derive(Clone, Debug)]
struct RoomMessage {
    room_id: String,
    tweet: Tweet,
}

Now we need to make a few changes:

  • Update the State broadcaster to carry RoomMessage instead of Tweet.
  • Do the filtering in the tracker.
  • Remove the regex from the handler and compare room ids to decide whether to send.

Let's change the tracker (it now also needs the rooms map, for example passed to spawn_tracker as an extra argument)…

    task::spawn(async move {
        let mut tracker = twitter_stream::Builder::new(token.as_ref());
        let mut stream = tracker.track(&topics).listen().try_flatten_stream();

        while let Some(json) = stream.next().await {
            if let Ok(StreamMessage::Tweet(tw)) = serde_json::from_str(&json.unwrap()) {
                //println!("receive a  tweet! ... , {}", tw.text);
                // run each room's regex once per tweet and tag every match with the room id
                for (key, room) in &rooms {
                    if room.should_send(&tw.text) {
                        let msg = RoomMessage {
                            room_id: key.to_string(),
                            tweet: tw.clone()
                        };
                        match broadcaster.send(&msg).await {
                            Ok(_) => {}
                            Err(_) => {
                                println!("Error sending to broadcaster")
                            }
                        };
                    }
                }
            }
        }
    });

And the handler:

    // (Either is e.g. futures_util::future::Either; the import is not shown)
    app.at("/ws")
        .get(WebSocket::new(|req: tide::Request<State>, wsc| async move {
            let state = req.state().clone();
            let rooms = state.rooms;
            let broadcaster = state.broadcaster.clone();
            // merge incoming client messages and room messages into a single stream
            let mut combined_stream = futures_util::stream::select(
                wsc.clone().map(Either::Left),
                broadcaster.clone().map(Either::Right),
            );

            // by default we put new connections in the nba room
            let mut current_room = rooms.get("nba");

            while let Some(item) = combined_stream.next().await {
                match item {
                    // a text message from the client selects the room
                    Either::Left(Ok(WSMessage::Text(message))) => {
                        println!("message : {}", message);
                        current_room = rooms.get(&message);
                    }

                    // the tracker already filtered the tweet: only compare room ids
                    Either::Right(room_message) => {
                        if let Some(room) = current_room {
                            if room.id == room_message.room_id {
                                wsc.send_json(&room_message.tweet).await?;
                            }
                        }
                    }
                    // any other frame (close, ping, binary) or a read error ends the handler with an error
                    _o => {
                        return Err(tide::http::format_err!("no idea"));
                    }
                }
            }
            Ok(())
        }));

Nice! We improved the performance: the regexes now run once per tweet per room instead of once per tweet per connection.

We can run it again and see it in action…

That's all for today! We explored how to implement rooms with websockets in Tide and improved the performance of our first implementation. This version has a fixed, hardcoded set of rooms and topics; in the next post we will make it dynamic so users can create their own rooms.

You can check the complete code in the repo and see it in action in the labs link.

As always, I write this as a learning journal; there could be more elegant and correct ways to do this, and any feedback is welcome.

Thanks!