Tide Channels
2020-01-29
WebSocket (WS) support for Tide has been a long-anticipated feature. More recently, requests for Server Sent Events (SSE) support have started to pop up as well. In this post we’ll look at the motivation, requirements, and design for WS and SSE support in Tide. But to not keep you waiting, this is the TL;DR of what we’ve come up with:
let mut app = tide::new();
app.at("/sse").get(tide::sse()); // Endpoint to connect new SSE channels on.
app.at("/").get(async |req| {
    req.sse().send(b"hello Chashu").await?; // Send a message over the SSE channel.
    Ok(Response::new(200))
});
app.listen("127.0.0.1:8080").await?;
Note: unlike some of my other posts that document work that’s been completed already, this post documents designs that are currently underway. The upside of this is that I can share my notes as I’m doing research! The downside is that the designs outlined here don’t exist yet, and will likely change during implementation and stabilization.
Why channels?
Before diving into how WS and SSE support could be exposed it’s worth asking why we want these features in the first place. Live chat is a pretty common example, but however fun that is to write, most uses are more boring. Examples include:
- When a new notification is available, update the notification count.
- When a new email is available, fetch it into the inbox.
- When kicking off a CI process, push the terminal output into the browser.
These features could all be implemented by periodically making HTTP requests to the server to check whether new data is available. But using WS or SSE streams instead allows that data to be sent to the client as soon as it is available. This often results in the information being presented sooner in browsers, which often feels good.
It’s hard to definitively say whether WS/SSE performs better than repeatedly polling. With HTTP/1.1 it probably does. But with HTTP/2.0 many of the downsides of making repeated requests are gone, and keeping a separate WS connection open might in fact be slower. But work is being done on enabling WS support over HTTP/2.0 so it’s hard to make any definitive statements about performance. It all depends.
I think the best way to think about WS and SSE is as networked channels. Messages are sent by one side and received by the other. SSE provides unidirectional channels. WS provides bidirectional channels (both sides of the channel can send messages to the other side). As we’ll see later on, features such as broadcasting and multi-producer support can be enabled for either channel kind.
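To make the channel analogy concrete, here is a minimal sketch that uses only the standard library's in-process channels, with no networking involved. The names sse_like and WsLike are made up for this illustration and are not part of Tide: a one-directional channel is a single sender/receiver pair, and a bidirectional one is two such pairs wired in opposite directions.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// One-directional: the server holds the sender, the client the receiver.
/// (Illustrative stand-in for an SSE connection; not a Tide type.)
pub fn sse_like() -> (Sender<String>, Receiver<String>) {
    channel()
}

/// One end of a bidirectional channel: it can both send and receive.
/// (Illustrative stand-in for a WebSocket connection; not a Tide type.)
pub struct WsLike {
    tx: Sender<String>,
    rx: Receiver<String>,
}

impl WsLike {
    /// Build two connected ends out of two one-directional channels.
    pub fn pair() -> (WsLike, WsLike) {
        let (a_tx, b_rx) = channel();
        let (b_tx, a_rx) = channel();
        (WsLike { tx: a_tx, rx: a_rx }, WsLike { tx: b_tx, rx: b_rx })
    }

    /// Send a message to the other end.
    pub fn send(&self, msg: &str) {
        // Ignore the error for brevity; it only occurs if the peer is gone.
        let _ = self.tx.send(msg.to_string());
    }

    /// Block until the other end sends something, or return None if it is gone.
    pub fn recv(&self) -> Option<String> {
        self.rx.recv().ok()
    }
}
```

With this, `let (server, client) = WsLike::pair();` gives two ends where either side can call send and recv, while the pair returned by sse_like only flows one way.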
Which APIs do we need?
Implementations for async SSE and WS streams already exist on crates.io. Examples include async-tungstenite and sse-codec. But while these are great building blocks to build upon, exposing them from Tide without analyzing how people would want to use them would result in a poor end-user experience.
Many frameworks (of any kind) feel like they were designed by walking down a checklist of features. On paper these frameworks are great because they tick all the boxes and are infinitely flexible. But in their flexibility they fail to provide meaningful abstractions, shifting the burden of using APIs correctly onto the user. This rarely leads to pleasant user experiences.
While talking about abstractions: finding the right layer to abstract on is a tedious process. Abstract too much, and unexpected uses that should be valid become impossible. Abstract too little, and APIs become hard to use and prone to error. The task of API design is to find the middle ground, and balance that with other forces such as consistency and performance. And getting things right takes time.
Channel implementations for HTTP servers can roughly be split up into two categories:
- Sending messages from inside HTTP endpoints. For example: a file upload is triggered, and we want to send progress events back to the client while we process it.
- Sending messages from outside HTTP endpoints. For example: our email service creates a RabbitMQ event, which is picked up by our server, and notifies a connected client.
I initially missed the distinction here, but Julian Gruber pointed out on Twitter that we did in fact need something like this.
This calls for the introduction of two-ish APIs: one for sending messages from inside HTTP endpoints, and one for sending messages outside of it. And in turn there should be variants for both SSE and WS.
Another consideration is that there should be a mechanism to easily access channels associated with the same peer. But it should also be possible to access channels associated with other peers.
Channel states
In theory any channel can exist in one of 3 states:
- connected
- not connected, but will connect
- will definitely not connect
However, we can’t make any meaningful distinction at the framework layer between “currently not connected” and “will never connect”, because we can’t know for sure whether we’ve encountered a race condition or whether this is intended behavior.
The way to provide stricter guarantees around Tide’s APIs is for application authors to encode stricter guarantees for their applications by writing Extension Traits. This is part of Tide’s layered design.
If an API is public, clients may or may not provide WebSocket support. This is different from applications where the author controls both the client and server implementations, where that guarantee can be made. Tide needs to support both, so we can’t codify any assumptions about what it means when a client isn’t connected.
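As a rough illustration of the point about extension traits, here is a small sketch. Everything in it is hypothetical (ChannelState, Channel, ChannelExt, and require_open are names invented for this example, not Tide APIs). It shows the framework collapsing the three theoretical states into a single is_open check, and an application layering its own stricter rule on top.

```rust
/// The three theoretical channel states.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ChannelState {
    Connected,
    NotYetConnected,
    WillNeverConnect,
}

/// What the framework can actually observe: the last two states
/// look identical from the server's side.
pub fn is_open(state: ChannelState) -> bool {
    state == ChannelState::Connected
}

/// Hypothetical channel handle, standing in for the proposed
/// `ServerSentEvents` / `WebSocket` types.
pub struct Channel {
    pub state: ChannelState,
}

/// An application-level extension trait encoding a stricter guarantee:
/// "in this app, a missing channel is always an error".
pub trait ChannelExt {
    fn require_open(&self) -> Result<(), String>;
}

impl ChannelExt for Channel {
    fn require_open(&self) -> Result<(), String> {
        if is_open(self.state) {
            Ok(())
        } else {
            Err("client has no open channel".to_string())
        }
    }
}
```

An application that controls both client and server could call require_open at the top of every endpoint, while a public API would stick to the plain is_open check.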
API design
The design we’re going for in Tide is to have a managed pool of client connections inside the framework, and expose convenience methods to access them from endpoints and on registration.
Channels will be subdivided into two flavors: Server Sent Events (SSE), and WebSockets (WS). Their APIs will be mostly similar, with the main difference being that WS channels can receive messages and implement Stream<Item = Message>.
Registering a new channel
Establishing a new channel is similar for both SSE and WS:
let mut app = tide::new();
app.at("/sse").get(tide::sse());
app.at("/ws").get(tide::ws());
In the case of WS an HTTP upgrade is performed, which involves a handshake procedure and some other details. In the SSE case a response is started by sending the right response headers, and then keeping the body stream open.
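For the SSE side, this is roughly what ends up on the wire. The sketch below is not Tide code; sse_response_head and sse_event are helper names invented for this example. It writes out the response head with the text/event-stream content type and shows how a single event is framed in the body. The WS handshake is left out because it needs SHA-1 and base64, which the standard library doesn't provide.

```rust
/// Response head that starts an SSE stream. The body is left open afterwards.
pub fn sse_response_head() -> &'static str {
    "HTTP/1.1 200 OK\r\n\
     Content-Type: text/event-stream\r\n\
     Cache-Control: no-cache\r\n\
     \r\n"
}

/// Encode one event. Each line of the payload gets its own `data:` field,
/// and a blank line terminates the event.
pub fn sse_event(id: u64, data: &str) -> String {
    let mut out = format!("id: {}\n", id);
    for line in data.lines() {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    out.push('\n');
    out
}
```

Calling `sse_event(1, "hello Chashu")` produces the text `id: 1`, then `data: hello Chashu`, then an empty line.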
In addition to shorthand functions, we should also allow configuration of both channel kinds. By default the shorthand function would provide things like timeouts, keepalive messages, and other values. But these should all be configurable through the constructors.
let mut app = tide::new();
let ws = tide::ws().timeout(Duration::from_secs(5));
app.at("/ws").get(ws);
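One way the "defaults plus overrides" idea could look is a plain builder. This is only a sketch: WsConfig, its fields, and the default values of 30 and 15 seconds are assumptions picked for the example, not values Tide has settled on.

```rust
use std::time::Duration;

/// Hypothetical configuration for a WebSocket endpoint.
#[derive(Debug, Clone, PartialEq)]
pub struct WsConfig {
    pub timeout: Duration,
    pub keepalive: Option<Duration>,
}

impl Default for WsConfig {
    fn default() -> Self {
        WsConfig {
            timeout: Duration::from_secs(30),         // assumed default
            keepalive: Some(Duration::from_secs(15)), // assumed default
        }
    }
}

impl WsConfig {
    /// Override the timeout, keeping every other setting.
    pub fn timeout(mut self, dur: Duration) -> Self {
        self.timeout = dur;
        self
    }

    /// Override the keepalive interval; `None` disables keepalive messages.
    pub fn keepalive(mut self, interval: Option<Duration>) -> Self {
        self.keepalive = interval;
        self
    }
}

/// Shorthand constructor: everything set to its default.
pub fn ws() -> WsConfig {
    WsConfig::default()
}
```

The shorthand stays a one-liner, and `ws().timeout(Duration::from_secs(5))` only changes the one setting that was asked for.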
Registering external channel handlers
Similar to async-std’s channels, Tide’s channels would be cloned so each request can get their own copy and we don’t ever have to worry about lifetimes.
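The cloning model can be sketched with the standard library's channels, using threads to stand in for concurrent requests. The function name fan_in is made up for this example. Each "request" owns its own clone of the sender, so nothing borrows from anything else.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Simulate `n` concurrent requests, each holding its own clone of the sender.
/// Returns every message that arrived, sorted for a stable result.
pub fn fan_in(n: usize) -> Vec<String> {
    let (tx, rx) = channel::<String>();
    let handles: Vec<_> = (0..n)
        .map(|i| {
            // A clone is a cheap handle copy that the thread fully owns.
            let tx: Sender<String> = tx.clone();
            thread::spawn(move || {
                tx.send(format!("request {}", i)).unwrap();
            })
        })
        .collect();
    // Drop the original so the receiver ends once all clones are gone.
    drop(tx);
    for handle in handles {
        handle.join().unwrap();
    }
    let mut msgs: Vec<String> = rx.iter().collect();
    msgs.sort();
    msgs
}
```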
On creation the channel structs should provide hooks so that when a new channel is instantiated, a callback is called providing the channel itself. Probably for good measure it’d also be useful to provide a copy of State.
let mut app = tide::new();
let ws = tide::ws_with(async |ws: WebSocket, _state| {
    println!("new websocket created");
});
app.at("/ws").get(ws);
The purpose of this API is to integrate with external resources that might want to send messages to peers outside of the regular scope of an HTTP request.
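Here is a minimal, synchronous sketch of how such a hook could hand channels to code living outside of any request. Endpoint, Registry, accept, and endpoint_with_registry are all invented for this illustration; a real version would be async and would pass the proposed WebSocket or ServerSentEvents type rather than a plain sender.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical endpoint that runs a hook whenever a channel is created.
pub struct Endpoint {
    on_connect: Box<dyn Fn(Sender<String>) + Send + Sync>,
}

impl Endpoint {
    pub fn with(hook: impl Fn(Sender<String>) + Send + Sync + 'static) -> Self {
        Endpoint { on_connect: Box::new(hook) }
    }

    /// Called by the (imaginary) framework when a client connects.
    /// Returns the receiving half, which stands in for the client.
    pub fn accept(&self) -> Receiver<String> {
        let (tx, rx) = channel();
        (self.on_connect)(tx);
        rx
    }
}

/// Shared registry that code outside of any HTTP request can send through.
pub type Registry = Arc<Mutex<Vec<Sender<String>>>>;

/// Wire an endpoint to a registry: every new channel is stored in it.
pub fn endpoint_with_registry() -> (Endpoint, Registry) {
    let registry: Registry = Arc::new(Mutex::new(Vec::new()));
    let hook_registry = Arc::clone(&registry);
    let endpoint = Endpoint::with(move |tx| hook_registry.lock().unwrap().push(tx));
    (endpoint, registry)
}
```

A message queue consumer, for example, could hold on to the Registry and push a notification to every stored sender whenever an event arrives.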
An open question here remains how to expose further metadata. Perhaps the WebSocket struct should hold information such as peer_addr, but that’s still unclear.
Using channels inside endpoints
Using channels inside endpoints should have a convenient API. Both req.sse() and req.ws() would always return a valid channel instance associated with the current client address.
let mut app = tide::new();
app.at("/ws").get(tide::ws()); // endpoint to establish a websocket connection on
app.at("/").get(async |req| {
    let socket = req.ws(); // access the socket
    socket.send(b"hello chashu").await?; // send a message
    let msg = socket.recv().await?; // receive a message
    println!("message received: {}", msg); // print a message
    Ok(Response::new(200))
});
app.listen("127.0.0.1:8080").await?;
Connectivity states can be checked through the is_open method (see Channel states for more on this). Say we don’t want to send a message unless we have an SSE connection. We could express it as follows:
let mut app = tide::new();
app.at("/sse").get(tide::sse());
app.at("/").get(async |req| {
    let sse = req.sse();
    if sse.is_open() {
        sse.send(b"hello Nori").await?;
    }
    Ok(Response::new(200))
});
app.listen("127.0.0.1:8080").await?;
Support for accessing channels associated with other peers would not be possible in the first iteration for these APIs. See group support for future directions we’re exploring.
Matching channels with remote peers
Something to keep in mind is that we can’t match remote peers with channels purely on incoming TCP request addresses since those could be routed by a proxy. Instead we should be aware of x-forwarded-for and forwarded headers.
It seems likely there is a set of APIs here that we need to define that exposes the “actual IP address” of the remote peers. Any bugs around this logic seem like they’d be a typical case of: “worked on my machine, but broke once in production because of proxies” so we should put some effort in to get this right.
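As a starting point for that discussion, here is a small sketch of the header handling. The function client_addr is a hypothetical helper, not an existing Tide API. It takes the left-most entry of x-forwarded-for when it parses as an IP address and falls back to the TCP peer address otherwise. Note that the header is only trustworthy when it is set by a proxy you control, since clients can send arbitrary values.

```rust
use std::net::IpAddr;

/// Pick the client address: the left-most parseable entry of
/// `X-Forwarded-For` if present, else the TCP peer address.
/// Only sensible when the header is set by a proxy you trust.
pub fn client_addr(x_forwarded_for: Option<&str>, tcp_peer: IpAddr) -> IpAddr {
    x_forwarded_for
        .and_then(|header| header.split(',').next())
        .and_then(|first| first.trim().parse::<IpAddr>().ok())
        .unwrap_or(tcp_peer)
}
```

The "worked on my machine" failure mode shows up right here: locally the header is absent and the fallback is correct, while behind a proxy the fallback is the proxy's own address.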
Internally to the framework we should keep a hashmap that matches remote peers with channels, and shares the right channel with the provided Request.
Update (2020-01-30): as was pointed out on Twitter, in order to safely associate peers with channels we need to introduce a session mechanism first. Matching by IP or other params has dire security implications.
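A session-keyed version of that hashmap could look roughly like the sketch below. SessionId and Pool are hypothetical types made up for this example, and how the session identifier is issued and verified (for instance through a signed cookie) is deliberately left out.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical opaque session identifier, e.g. taken from a signed cookie.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SessionId(pub String);

/// Pool of channels keyed by session rather than by IP address.
#[derive(Default)]
pub struct Pool {
    channels: HashMap<SessionId, Sender<String>>,
}

impl Pool {
    /// Register a channel for a session; returns the client-side receiver.
    pub fn connect(&mut self, id: SessionId) -> Receiver<String> {
        let (tx, rx) = channel();
        self.channels.insert(id, tx);
        rx
    }

    /// Look up the channel for the session making the current request.
    pub fn get(&self, id: &SessionId) -> Option<Sender<String>> {
        self.channels.get(id).cloned()
    }
}
```

Two clients behind the same NAT or proxy share an IP address but not a session, so they each get their own entry here.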
Disconnects
Another topic to consider is disconnects and retries. SSE and WS handle disconnects and retries differently, but the gist is that on disconnect we should assume that the client might reconnect for a given duration. Once that duration is over, all pending channels should fail with an io::Error (since the remote is gone).
While the channel is awaiting reconnection, the is_open method should continue to return true. However, the senders will remain waiting and might eventually fail. This is acceptable behavior: if an SSE connection dropped midway through a session, the client itself likely dropped as well, at which point it seems acceptable to return an error from the endpoint.
If this proves to be insufficient, we may consider extending Tide’s channel APIs. There’s a fair chance this might be the case, so it’s something to keep in mind.
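The grace-period behavior can be sketched as a tiny state tracker. Connection and its methods are hypothetical names for this example, and the choice of io::ErrorKind::ConnectionAborted is an assumption on my part. Time is passed in explicitly so the logic is easy to follow and test.

```rust
use std::io;
use std::time::{Duration, Instant};

/// Hypothetical connection tracker; `grace` is an assumed setting.
pub struct Connection {
    grace: Duration,
    disconnected_at: Option<Instant>,
}

impl Connection {
    pub fn new(grace: Duration) -> Self {
        Connection { grace, disconnected_at: None }
    }

    /// Record the moment the underlying connection dropped.
    pub fn on_disconnect(&mut self, now: Instant) {
        self.disconnected_at = Some(now);
    }

    /// The client came back within the window.
    pub fn on_reconnect(&mut self) {
        self.disconnected_at = None;
    }

    /// Stays `true` while connected or while still inside the grace window.
    pub fn is_open(&self, now: Instant) -> bool {
        match self.disconnected_at {
            None => true,
            Some(since) => now.duration_since(since) < self.grace,
        }
    }

    /// What a pending `send` would resolve to at time `now`.
    pub fn check(&self, now: Instant) -> io::Result<()> {
        if self.is_open(now) {
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::ConnectionAborted,
                "peer did not reconnect",
            ))
        }
    }
}
```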
Heartbeat
In order for channels to keep their respective connections open, they need to regularly send data over the TCP connection. Failure to do so usually results in the underlying operating system closing a connection and returning a timeout error. Sending messages at a set interval for this purpose is also known as a “heartbeat”.
For Server Sent Events, heartbeats should be implemented using comments. These are empty messages that are ignored by client implementations.
The WebSocket specification provides ping and pong frames that can be used to maintain a heartbeat with a peer.
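For reference, these are the bytes a heartbeat boils down to in both protocols, in the smallest form I know of. The constant and function names are invented for this example. The WS frames are shown as a server would send them, since frames sent by a client must additionally be masked.

```rust
use std::time::Duration;

/// SSE heartbeat: a comment line (starts with `:`) followed by a blank line.
pub const SSE_HEARTBEAT: &[u8] = b":\n\n";

/// WebSocket ping with an empty payload, as sent by a server (unmasked):
/// 0x89 = FIN bit + opcode 0x9 (ping), 0x00 = no mask, payload length 0.
pub const WS_PING: [u8; 2] = [0x89, 0x00];

/// The matching pong frame: opcode 0xA, again with an empty payload.
pub const WS_PONG: [u8; 2] = [0x8A, 0x00];

/// Decide whether a heartbeat is due, given how long the connection has been idle.
pub fn heartbeat_due(idle: Duration, interval: Duration) -> bool {
    idle >= interval
}
```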
Interface overview
These are the interfaces we’re proposing for the various channel types:
use async_std::sync::{Sender, Receiver};

// The messages sent and received by Tide's channels.
pub enum Message {
    Text(String),
    Binary(Vec<u8>),
    Ping,
}
impl From<String> for Message {};
impl From<Vec<u8>> for Message {};
impl TryFrom<Message> for String {};
impl TryFrom<Message> for Vec<u8> {};

// The websocket interface.
pub struct WebSocket((Sender, Receiver));
impl WebSocket {
    fn is_open(&self) -> bool;
    async fn send(&self, impl Into<Message>) -> io::Result<()>;
    async fn send_json(&self, &impl Serialize) -> io::Result<()>;
    async fn recv(&self) -> io::Result<Message>;
    async fn recv_json<T: DeserializeOwned>(&self) -> io::Result<T>;
}
impl Stream<Item = Message> for WebSocket {};

// The server sent events interface.
pub struct ServerSentEvents(Sender);
impl ServerSentEvents {
    /// The id of the next message.
    fn id(&self) -> u64;
    fn is_open(&self) -> bool;
    async fn send(&self, impl Into<Message>) -> io::Result<()>;
    async fn send_json(&self, &impl Serialize) -> io::Result<()>;
}

// Impls on Tide's Request struct
impl Request {
    fn ws(&self) -> WebSocket;
    fn sse(&self) -> ServerSentEvents;
}
And these are the interfaces for the various endpoints and endpoint constructors:
/// Top-level functions to create an SSE endpoint
pub fn sse() -> ServerSentEventsEndpoint;
pub fn sse_with(f: impl AsyncFn<ServerSentEvents>) -> ServerSentEventsEndpoint;

/// Top-level functions to create a WebSocket endpoint
pub fn ws() -> WebSocketEndpoint;
pub fn ws_with(f: impl AsyncFn<WebSocket>) -> WebSocketEndpoint;

/// The SSE endpoint struct.
pub struct ServerSentEventsEndpoint(f: Option<AsyncFn<_>>);
impl ServerSentEventsEndpoint {
    pub fn new() -> Self;
    pub fn with(f: impl AsyncFn<ServerSentEvents>) -> Self;
    pub fn set_timeout(self) -> Self;
}
impl tide::Endpoint for ServerSentEventsEndpoint {};

/// The WebSocket endpoint struct.
pub struct WebSocketEndpoint(f: Option<AsyncFn<_>>);
impl WebSocketEndpoint {
    pub fn new() -> Self;
    pub fn with(f: impl AsyncFn<WebSocket>) -> Self;
    pub fn set_timeout(self) -> Self;
}
impl tide::Endpoint for WebSocketEndpoint {};
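The Message conversions from the interface overview can be written out in full with just the standard library. This is one possible reading of the proposal rather than the final design. In particular, using Message itself as the TryFrom error type (so a failed conversion hands the message back to the caller) is an assumption on my part.

```rust
use std::convert::TryFrom;

/// The messages sent and received by the channels.
#[derive(Debug, Clone, PartialEq)]
pub enum Message {
    Text(String),
    Binary(Vec<u8>),
    Ping,
}

impl From<String> for Message {
    fn from(s: String) -> Self {
        Message::Text(s)
    }
}

impl From<Vec<u8>> for Message {
    fn from(b: Vec<u8>) -> Self {
        Message::Binary(b)
    }
}

impl TryFrom<Message> for String {
    // Assumption: on failure, return the original message unchanged.
    type Error = Message;

    fn try_from(msg: Message) -> Result<Self, Self::Error> {
        match msg {
            Message::Text(s) => Ok(s),
            other => Err(other),
        }
    }
}

impl TryFrom<Message> for Vec<u8> {
    // Assumption: on failure, return the original message unchanged.
    type Error = Message;

    fn try_from(msg: Message) -> Result<Self, Self::Error> {
        match msg {
            Message::Binary(b) => Ok(b),
            other => Err(other),
        }
    }
}
```

The From impls are what make a signature like `send(&self, impl Into<Message>)` accept both a String and a Vec<u8> directly.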
Future directions
This post currently only describes a design for channels in Tide. Logical next steps would include: implementation, feedback, and stabilization. But looking beyond that, there are some features that haven’t been fleshed out yet.
Group support
The APIs we’ve described so far only operate on individual channels. Inside an HTTP request it’s clear which channel is connected to the client. But what if you want to send messages over channels connected to different clients? This often means manually writing grouping logic to keep track of who belongs to which group.
In Phoenix channels, all channels are grouped under topics, which makes it easy to send messages to multiple channels. This is great for features such as group chat, or real-time collaboration.
For Tide we want it to be possible to create groups, but do that as an extension of singular APIs. Unlike Phoenix, a design goal here is to not require a specialized client library to interact with Tide’s channels. Tide should be flexible enough to replace existing production servers, without requiring a redesign of the client logic. A rough API sketch:
let mut app = tide::new();
app.at("/wss").get(tide::ws());
app.at("/room/:name/join").post(async |req| {
    let group_name = format!("room:{}", req.params("name")?);
    req.ws().join_group(&group_name).await;
    req.ws().broadcast(&group_name, format!("{} has joined the room", req.peer_addr())).await?;
    Ok(Response::new(200))
});
app.listen("127.0.0.1:8080").await?;
In the example above all we provide is a mechanism to more conveniently address peers that are also connected to the same server. In real-world servers this would probably be abstracted into a set of shared logic, implemented as a RequestExt. This would allow abstracting the stringly typed APIs into a reusable interface throughout the application:
use my_app::RequestExt;

let mut app = tide::new();
app.at("/wss").get(tide::ws());
app.at("/room/:name/join").post(async |req| {
    req.room().join().await?;
    req.room().broadcast("test, test").await?;
    Ok(Response::new(200))
});
app.listen("127.0.0.1:8080").await?;
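Underneath an API like this, the grouping logic itself can be quite small. The sketch below is an in-process model using the standard library's channels. Groups, join, and broadcast are names made up for this example, and a real implementation would be async and tied to the connection pool.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical group registry: group name -> senders of its members.
#[derive(Default)]
pub struct Groups {
    members: HashMap<String, Vec<Sender<String>>>,
}

impl Groups {
    /// Join a group; returns the receiver standing in for the client.
    pub fn join(&mut self, group: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.members.entry(group.to_string()).or_default().push(tx);
        rx
    }

    /// Send to every member; members whose receiver is gone are pruned.
    /// Returns the number of successful deliveries.
    pub fn broadcast(&mut self, group: &str, msg: &str) -> usize {
        match self.members.get_mut(group) {
            Some(senders) => {
                senders.retain(|tx| tx.send(msg.to_string()).is_ok());
                senders.len()
            }
            None => 0,
        }
    }
}
```

Pruning on send failure means disconnected members drop out of the group on the next broadcast without a separate cleanup pass.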
HTTP/2 push streams
On the surface HTTP/2 push and SSE appear to have much in common. Both enable sending messages from the server to the client, without the client requesting them. However they’re intended for slightly different purposes.
In practice HTTP/2 push is being designed to best handle static resources. Specs such as Cache Digests for HTTP/2 are making their way in, and packages such as node-h2-auto-push are turning HTTP/2 push into something that works really well for that.
In contrast, SSE support in devtools is improving, and it is becoming easier to inspect and debug small messages sent by the server. The different protocols are being optimized for different purposes, and that’s valid.
For more on HTTP/2 push, I can recommend the article: “Node.js can HTTP/2 push!”
However, if we were to expose an API for it, it’d probably be along these lines:
let mut app = tide::new();
app.at("/index.html").get(async |req| {
    let mut res = Response::new(200);
    res.set_body(Path::new("./assets/index.html"));

    let mut css = Response::new(200);
    css.set_body(Path::new("./assets/bundle.css"));

    res.push("/bundle.css", css);
    Ok(res)
});
app.listen("127.0.0.1:8080").await?;
The main consideration for any HTTP/2 push API is that we stay true to Tide’s Request/Response model. This is analogous to the Node.js HTTP/2 response.createPushResponse() API that’s part of the HTTP/1 compat layer. The step after this would probably be to build cache-aware auto-push middleware, but that comes later.
Summary
In this post we’ve shared the motivation, constraints, and proposed design for Tide’s channels APIs. These APIs cover support for Server Sent Events, and WebSockets.
In addition we’ve looked ahead for ways we can extend these APIs to work with “groups”, and shared a rough draft of an HTTP/2 push interface as well.
As I said at the start of this post, this one is a bit different than usual. Today is January 29th 2020, and the entirety of this post was drafted and published in a single day. This means there’s a fair chance designs laid out here might have shortcomings, and will change over time. But I wanted to get ahead of the curve, and share the research and motivations underlying these designs.
My personal takeaway from researching the various channel APIs for Tide, is that channels present a really nice interface, and lots of network things can probably be expressed as channels! I’ve used both Server Sent Events and WebSockets before, but they never really felt right. I think with this API we might be onto something that could make this easier.
Either way that’s about all I can share for now. It’s back to writing different posts. Happy Wednesday!