We are working on implementing Server-Sent Events (SSE) on all platforms supported by Parsec using reqwest, an HTTP Client written in Rust.
Among these platforms, it must support a Web Browser context.
We wanted to understand how reqwest is implemented for the web browser context, to check that it does not cause performance problems or leaks.
Summary
For this case study, we use the following versions:
Versions
Context
On native platforms, the OS provides APIs to open TCP/UDP sockets. The application then uses those sockets as it sees fit, for example by adding SSL/TLS and HTTP layers to implement HTTPS, or by implementing any other protocol such as BitTorrent or Tor.
On the Web, the browser provides much higher-level APIs, such as HTTP or WebSocket. This is really handy and makes it easy to implement web applications in the browser (which plays the role of the OS in a Web context). However, depending on the API implementation details, the result may not be as efficient as a native implementation.
The slow server
Let’s implement a server that takes some time to respond. This should help us mimic what happens when sending SSE responses.
The following example uses Python to implement a server sending a response 1 byte at a time.
import asyncio
import socket


async def handle_client(loop: asyncio.AbstractEventLoop, client: socket.socket, _address):
    peer_name = client.getpeername()
    # Send the status line and headers right away.
    resp = (
        b"HTTP/1.1 418 I'm a teapot\r\n"
        b"Content-Length: 5\r\n"
        b"Access-Control-Allow-Origin: *\r\n"
        b"Access-Control-Expose-Headers: The-Answer-You-Are-Looking-For\r\n"
        b"The-Answer-You-Are-Looking-For: 42\r\n"
        b"\r\n"
    )
    await loop.sock_sendall(client, resp)
    # Simulate a slow body being sent.
    try:
        for c in b"12345":
            await asyncio.sleep(1)
            print(f"Sending byte {c} to {peer_name}")
            # Iterating over bytes yields ints, so wrap each one back into a bytes object.
            await loop.sock_sendall(client, bytes([c]))
    except (ConnectionResetError, BrokenPipeError):
        pass
    client.close()


async def run_server(server: socket.socket):
    loop = asyncio.get_running_loop()
    while True:
        client, address = await loop.sock_accept(server)
        # Awaiting the task means clients are served one at a time.
        await asyncio.create_task(handle_client(loop, client, address))


server = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
server.bind(('localhost', 8418))
server.listen(4)
# The asyncio socket helpers (sock_accept, sock_sendall) expect a non-blocking socket.
server.setblocking(False)
asyncio.run(run_server(server))
The web client: a naive approach
The following JavaScript example uses the Fetch API to request a resource from the slow HTTP server.
async function client() {
  console.time("Fetch request");
  let response = await fetch("http://localhost:8418");
  console.timeLog("Fetch request", "Received response");
  console.assert(response.headers.get('The-Answer-You-Are-Looking-For') === '42');
  console.timeEnd("Fetch request");
}

client().then(() => console.log("Finished"));
The fetch() call took a few seconds (~5s in my case, using Firefox), returning only once the full response had been received from our slow server.
Note: this code behaves differently on Node: the fetch takes ~20ms, but the script takes ~5s to exit.
The native client
Instead of blocking to read the response as a whole, we would prefer reading partial data as soon as it becomes available. In Rust, we would do something like this:
use reqwest::{self, header::HeaderValue};
use std::time::{Duration, Instant};

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
    let timer = Instant::now();
    // Resolves once the status line and headers are received, without waiting for the body.
    let response = reqwest::get("http://localhost:8418").await?;
    // The header name must match the one sent by the slow server (lookup is case-insensitive).
    assert_eq!(
        response.headers().get("The-Answer-You-Are-Looking-For"),
        Some(&HeaderValue::from_static("42"))
    );
    let elapsed = timer.elapsed();
    assert!(elapsed < Duration::from_secs(5));
    println!("Request took {}ms", elapsed.as_millis());
    Ok(())
}
The reqwest::get call takes ~10ms.
Not having to wait for the complete response is useful. For instance, a web server could abort a request early if it is invalid.
This is why we don’t want to rely on the naive Fetch approach shown above when working with SSE on the Web.
The client should be able to read the data as soon as possible without blocking for the complete HTTP response to arrive.
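To make the idea concrete outside of Rust, here is a minimal sketch in Python (the language of the slow server above) using only the standard library. The helper names open_response and iter_body are hypothetical and exist only for this illustration; they are not part of reqwest or Parsec. The sketch parses the header section first, then yields body bytes as they arrive:

```python
import http.client
import socket
from typing import Iterator


def open_response(sock: socket.socket) -> http.client.HTTPResponse:
    """Parse the status line and headers, returning before any body is read."""
    response = http.client.HTTPResponse(sock)
    # begin() returns as soon as the blank line ending the headers is received.
    response.begin()
    return response


def iter_body(response: http.client.HTTPResponse, chunk_size: int = 1) -> Iterator[bytes]:
    """Yield body chunks as they arrive instead of waiting for the whole body."""
    while True:
        chunk = response.read(chunk_size)
        if not chunk:
            # An empty read means the body is complete (or the peer closed).
            return
        yield chunk
```

Against the slow server, one would connect with socket.create_connection(("localhost", 8418)), send a plain GET request, then call open_response() followed by iter_body(): the header is readable almost immediately, while each body byte shows up roughly one second apart.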
SSE in Parsec
The Parsec server needs to send events to the client using the SSE protocol.
The SSE protocol is built on top of HTTP and serializes the events into the HTTP response body. In theory, the HTTP response never ends, since the connection must be kept open to keep sending events.
It is like a very slow HTTP response.
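As an illustration of what "serializing events in the HTTP response" looks like on the wire, here is a small Python sketch of the text/event-stream format. The function names and the example event name are hypothetical; this is not Parsec's actual server code, only the generic SSE framing (fields on separate lines, a blank line ending each event):

```python
from typing import Optional


def sse_response_head() -> bytes:
    """Response head for an event stream; no Content-Length, the body is open-ended."""
    return (
        b"HTTP/1.1 200 OK\r\n"
        b"Content-Type: text/event-stream\r\n"
        b"Cache-Control: no-cache\r\n"
        b"\r\n"
    )


def format_sse_event(
    data: str, event: Optional[str] = None, event_id: Optional[str] = None
) -> bytes:
    """Serialize one event using the text/event-stream wire format."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload gets its own "data:" field.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the event.
    return ("\n".join(lines) + "\n\n").encode("utf-8")
```

A server would send sse_response_head() once, then write format_sse_event(...) each time something happens, possibly minutes apart, which is why the response behaves like a body that never finishes.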
The fact that the HTTP response never ends doesn’t play well with the Fetch usage shown earlier, which would wait for the end of it.
Fortunately, on the Web we can use the SSE API.
In Parsec, SSE is implemented using reqwest-eventsource, which relies on reqwest for actual HTTP.
Here is the method that handles the HTTP response from reqwest, at reqwest-eventsource/src/event_source.rs:145-150:
fn handle_response(&mut self, res: Response) {
    self.last_retry.take();
    let mut stream = res.bytes_stream().eventsource();
    stream.set_last_event_id(self.last_event_id.clone());
    self.cur_stream.replace(Box::pin(stream));
}
The important line here is:
let mut stream = res.bytes_stream().eventsource();
Where res is a reqwest::Response.
The .eventsource() method is provided by the eventsource_stream::Eventsource trait; it consumes the stream returned by .bytes_stream(). In turn, .bytes_stream() consumes the response and returns a stream that yields bytes.
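To give an intuition of what turning a stream of bytes into a stream of events involves, here is a simplified Python sketch of an incremental SSE parser. It is not the eventsource-stream implementation: the names SseEvent and iter_sse_events are hypothetical, the retry field is ignored, and only "\n" and "\r\n" line endings are handled. The key point is that chunks may be cut anywhere, so incomplete lines must be buffered:

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Optional


@dataclass
class SseEvent:
    event: str
    data: str
    id: Optional[str]


def iter_sse_events(chunks: Iterable[bytes]) -> Iterator[SseEvent]:
    """Yield events from byte chunks that may be split at arbitrary positions."""
    buffer = b""
    event_type = ""
    data_lines: List[str] = []
    last_id: Optional[str] = None
    for chunk in chunks:
        buffer += chunk
        # Only complete lines are processed; a partial line stays in the buffer.
        while b"\n" in buffer:
            raw_line, buffer = buffer.split(b"\n", 1)
            line = raw_line.rstrip(b"\r").decode("utf-8")
            if line == "":
                # A blank line dispatches the event accumulated so far, if any.
                if data_lines:
                    yield SseEvent(event_type or "message", "\n".join(data_lines), last_id)
                event_type, data_lines = "", []
                continue
            if line.startswith(":"):
                continue  # Comment line, often used as a keep-alive.
            name, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # A single leading space is not part of the value.
            if name == "data":
                data_lines.append(value)
            elif name == "event":
                event_type = value
            elif name == "id":
                last_id = value  # Persists across events, like the last event ID.
```

For example, feeding it [b"data: hel", b"lo\n\n"] yields a single event of type "message" whose data is "hello", even though the payload arrived in two pieces.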
So, what’s under the hood?
The goal is to understand how reqwest implements Response::bytes_stream for the target wasm32-unknown-unknown.
Reverse engineering
Reqwest
reqwest's Cargo.toml declares a stream feature that enables Response::bytes_stream:
[package]
name = "reqwest"
version = "0.11.18" # remember to update html_root_url
description = "higher level HTTP client library"
# ...
[features]
# ...
stream = ["tokio/fs", "tokio-util", "wasm-streams"]
# ...
[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3.45"
serde_json = "1.0"
wasm-bindgen = "0.2.68"
wasm-bindgen-futures = "0.4.18"
wasm-streams = { version = "0.2", optional = true }
[target.'cfg(target_arch = "wasm32")'.dependencies.web-sys]
version = "0.3.25"
features = [
"AbortController",
"AbortSignal",
"Headers",
"Request",
"RequestInit",
"RequestMode",
"Response",
"Window",
"FormData",
"Blob",
"BlobPropertyBag",
"ServiceWorkerGlobalScope",
"RequestCredentials",
"File",
"ReadableStream"
]
# ...
When the stream feature is enabled, the wasm-streams dependency is added.
This dependency is used only once in the reqwest codebase at reqwest/src/wasm/response.rs:135-157.
Those lines correspond to the implementation of Response::bytes_stream 😄
impl Response {
    /// Convert the response into a `Stream` of `Bytes` from the body.
    #[cfg(feature = "stream")]
    pub fn bytes_stream(self) -> impl futures_core::Stream<Item = crate::Result<Bytes>> {
        let web_response = self.http.into_body();
        let abort = self._abort;
        let body = web_response
            .body()
            .expect("could not create wasm byte stream");
        let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into());
        Box::pin(body.into_stream().map(move |buf_js| {
            // Keep the abort guard alive as long as this stream is.
            let _abort = &abort;
            let buffer = Uint8Array::new(
                &buf_js
                    .map_err(crate::error::wasm)
                    .map_err(crate::error::decode)?,
            );
            let mut bytes = vec![0; buffer.length() as usize];
            buffer.copy_to(&mut bytes);
            Ok(bytes.into())
        }))
    }
}
Let’s try to understand what this code is doing:
- self.http.into_body() returns a web_sys::Response, as per the definition of the Response struct at reqwest/src/wasm/response.rs:19-26:

      /// A Response to a submitted `Request`.
      pub struct Response {
          http: http::Response<web_sys::Response>,
          _abort: AbortGuard,
          // Boxed to save space (11 words to 1 word), and it's not accessed
          // frequently internally.
          url: Box<Url>,
      }

  http::Response is a wrapper around the inner type web_sys::Response, and into_body returns that inner type.
- web_response.body() returns the browser ReadableStream holding the body, and body.unchecked_into() casts it into a wasm_streams::readable::sys::ReadableStream.
- wasm_streams::ReadableStream::from_raw() consumes the sys::ReadableStream.
- Finally, into_stream() converts the ReadableStream into a Stream.
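The steps above amount to a pull loop: ask the browser stream for the next chunk, copy it into memory owned by the caller, and yield it. The Python sketch below mimics that flow with asyncio as an analogy only. FakeReader is a hypothetical stand-in for the browser's stream reader, and into_stream is named after the wasm-streams method but is not its implementation:

```python
import asyncio
from typing import AsyncIterator, List, Optional, Tuple


class FakeReader:
    """Hypothetical stand-in for a browser ReadableStream reader."""

    def __init__(self, chunks: List[bytes]) -> None:
        self._chunks = list(chunks)

    async def read(self) -> Tuple[bool, Optional[memoryview]]:
        """Mimic the JS reader.read() result as a (done, value) pair."""
        await asyncio.sleep(0)  # Yield to the event loop, as awaiting a promise would.
        if not self._chunks:
            return True, None
        return False, memoryview(self._chunks.pop(0))


async def into_stream(reader: FakeReader) -> AsyncIterator[bytes]:
    """Pull chunks one at a time and copy each into an owned bytes object."""
    while True:
        done, value = await reader.read()
        if done or value is None:
            return
        # Copy out of the borrowed buffer, similar in spirit to copying
        # a Uint8Array into a Vec<u8> in the Rust code above.
        yield bytes(value)


async def collect(reader: FakeReader) -> List[bytes]:
    """Drain the stream into a list, for demonstration purposes."""
    return [chunk async for chunk in into_stream(reader)]
```

Nothing is read ahead of the consumer in this model: each chunk is requested only when the previous one has been handed over, which is what keeps memory use bounded for a response that never ends.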
Wasm-streams
wasm-streams is a crate whose main goal is to communicate with the browser’s stream API:
Working with the Web Streams API in Rust.
This crate provides wrappers around ReadableStream, WritableStream and TransformStream. It also supports converting from and into [Stream]s and [Sink]s from the [futures] crate.
We’re interested in 2 elements provided by that crate:
- wasm_streams::readable::sys::ReadableStream: a wrapper around the ReadableStream browser API.
- wasm_streams::readable::ReadableStream: a struct that converts a JS stream into a Rust stream, and vice versa. In the case of reqwest, it is used to convert from a JS stream to a Rust stream.
Conclusion
The reqwest implementation for bytes_stream in WebAssembly relies on the browser’s Streams API.
This should allow implementing SSE in Parsec with an efficient use of resources while avoiding leaks, apart from any that might exist in the browser’s own implementation.
Article written by:
- Florian Bennetot
- Marcos Medrano
- Emmanuel Leblond


