We are working on implementing Server-Sent Events (SSE) on all platforms supported by Parsec using reqwest, an HTTP client written in Rust.
Among these platforms, it must support a web browser context.
We wanted to know how reqwest is implemented for the web browser context, to check that it does not cause any performance problems or leaks.
For this case study, we use the following versions:
Versions
Context
On native platforms, the OS provides APIs to open TCP/UDP sockets. The application then uses those sockets as it sees fit: for example, it can add TLS and HTTP layers on top of a TCP socket to implement HTTPS, or implement any other protocol such as BitTorrent or Tor.
On the Web, the browser provides much higher-level APIs, such as HTTP (through fetch()) or WebSocket. This is handy and makes it easy to build web applications in the browser (which plays the role of an OS in a Web context). However, depending on how these APIs are implemented, the results may not be as efficient as a native implementation.
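To make the contrast concrete, here is a minimal sketch of what "the application uses the socket as it sees fit" means natively: with only Rust's standard library, the program opens a TCP connection and writes the HTTP layer itself. fetch_status_line is a hypothetical helper, and the sketch skips TLS, so it only speaks plain HTTP. Code running in a browser page has no equivalent and must go through fetch() or WebSocket.

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;

/// Hypothetical helper: speaks HTTP/1.1 by hand over a raw TCP socket and
/// returns the status line. Native code can do this because the OS hands it
/// the socket; browser code only gets higher-level APIs such as fetch().
pub fn fetch_status_line(addr: &str, host: &str) -> std::io::Result<String> {
    let mut stream = TcpStream::connect(addr)?;
    // The HTTP layer is ours to write: build the request head and send it at once.
    let request = format!("GET / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n");
    stream.write_all(request.as_bytes())?;
    // Read back only the first line of the response, e.g. "HTTP/1.1 200 OK".
    let mut reader = BufReader::new(stream);
    let mut status_line = String::new();
    reader.read_line(&mut status_line)?;
    Ok(status_line.trim_end().to_string())
}
```

Pointed at the slow Python server below (fetch_status_line("localhost:8418", "localhost")), it should return the teapot status line as soon as the headers are sent, since it never waits for the body.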
The slow server
Let’s implement a server that takes some time to respond. This helps us mimic what happens when SSE responses are sent.
The following example uses Python to implement a server that sends its response body one byte at a time.
```python
import asyncio
import socket


async def handle_client(loop: asyncio.AbstractEventLoop, client: socket.socket, _address):
    peer_name = client.getpeername()
    # The status line and headers are sent right away...
    resp = (
        b"HTTP/1.1 418 I'm a teapot\r\n"
        b"Content-Length: 5\r\n"
        b"Access-Control-Allow-Origin: *\r\n"
        b"Access-Control-Expose-Headers: The-Answer-You-Are-Looking-For\r\n"
        b"The-Answer-You-Are-Looking-For: 42\r\n"
        b"\r\n"
    )
    await loop.sock_sendall(client, resp)
    # ...then the 5-byte body is sent one byte per second to simulate a slow body.
    try:
        for c in b"12345":
            await asyncio.sleep(1)
            print(f"Sending byte {c} to {peer_name}")
            await loop.sock_sendall(client, bytes([c]))
    except (ConnectionResetError, BrokenPipeError):
        # The client went away before the body was complete.
        pass
    client.close()


async def run_server(server: socket.socket):
    loop = asyncio.get_running_loop()
    while True:
        client, address = await loop.sock_accept(server)
        # Awaiting the task means clients are served one at a time.
        await asyncio.create_task(handle_client(loop, client, address))


server = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
server.bind(("localhost", 8418))
server.listen(4)
server.setblocking(False)  # loop.sock_accept() expects a non-blocking socket
asyncio.run(run_server(server))
```
The web client: a naive approach
The following JavaScript example uses the Fetch API to request a resource from the slow HTTP server.
```javascript
async function client() {
  console.time("Fetch request");
  const response = await fetch("http://localhost:8418");
  console.timeLog("Fetch request", "Received response");
  console.assert(response.headers.get("The-Answer-You-Are-Looking-For") === "42");
  console.timeEnd("Fetch request");
}

client().then(() => console.log("Finished"));
```
The fetch() call takes a few seconds (~5s in my case, using Firefox) until the full response is received from our slow server.
Note: this code behaves differently on Node.js: the fetch takes ~20ms, but the script takes ~5s to exit.
The native client
Instead of blocking to read the response as a whole, we would prefer reading partial data as soon as it becomes available. In Rust, we would do something like this:
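```rust
// Dependencies: reqwest, tokio (with the "macros" and "rt" features), anyhow.
use reqwest::header::HeaderValue;
use std::time::{Duration, Instant};

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
    let timer = Instant::now();
    // Resolves once the status line and headers are received;
    // the body has not been read yet.
    let response = reqwest::get("http://localhost:8418").await?;
    assert_eq!(
        response.headers().get("The-Answer-You-Are-Looking-For"),
        Some(&HeaderValue::from_static("42"))
    );
    let elapsed = timer.elapsed();
    // Well under the ~5s the server needs to send the whole body.
    assert!(elapsed < Duration::from_secs(5));
    println!("Request took {}ms", elapsed.as_millis());
    Ok(())
}
```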
The reqwest::get call takes ~10ms.
Being able to work with a response before it is complete is useful. For instance, a web server could abort a request early if it was invalid.
This is why we don’t want to rely on the Fetch API when working with SSE on the Web.
The client should be able to read data as soon as it is available, without waiting for the complete HTTP response to arrive.
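The same idea can be shown without reqwest. This minimal sketch, using only Rust's standard library, parses the response head, lets the caller inspect the headers (and give up early, as in the invalid-request case above), then hands over body chunks as they arrive. read_incrementally is a hypothetical helper; for brevity it reads the body until the connection closes and ignores Content-Length and chunked transfer encoding.

```rust
use std::io::{BufRead, BufReader, Read};

/// Hypothetical helper: reads an HTTP/1.1 response head, lets `on_head` inspect
/// the status line and headers, then forwards each body chunk to `on_chunk` as
/// soon as `read` returns it. If `on_head` returns false, the body is never read.
pub fn read_incrementally<R: Read>(
    source: R,
    on_head: impl FnOnce(&str, &[String]) -> bool,
    mut on_chunk: impl FnMut(&[u8]),
) -> std::io::Result<()> {
    let mut reader = BufReader::new(source);
    let mut status = String::new();
    reader.read_line(&mut status)?;
    let mut headers = Vec::new();
    loop {
        let mut line = String::new();
        // 0 bytes read means EOF before the end of the head.
        if reader.read_line(&mut line)? == 0 {
            break;
        }
        let line = line.trim_end();
        if line.is_empty() {
            break; // blank line: end of the head
        }
        headers.push(line.to_string());
    }
    // The headers are known here, before any body byte has been read.
    if !on_head(status.trim_end(), &headers) {
        return Ok(()); // early abort: the body is left unread
    }
    let mut buf = [0u8; 1024];
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            break; // the server closed the connection
        }
        on_chunk(&buf[..n]);
    }
    Ok(())
}
```

Fed with a reader that returns one byte per read() call, like our slow server, the headers are available immediately and on_chunk is called once per body byte.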
SSE in Parsec
The Parsec server needs to send events to the client using the SSE protocol.
The SSE protocol is built over HTTP: events are serialized into the HTTP response body. In theory, the HTTP response never ends, since the connection has to be kept open so that further events can be sent.
It is like a very slow HTTP response.
The fact that the HTTP response never ends doesn’t play nicely with the Fetch API, which would wait for the end of it.
Fortunately, on the Web we can use the SSE API.
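To see what serializing events into the HTTP response looks like, here is a simplified parser for a complete text/event-stream body. It follows the main rules of the SSE format (data lines joined with newlines, a blank line dispatching the event, lines starting with ':' being comments), but Event and parse_events are illustrative names, not the API of reqwest-eventsource or eventsource-stream, and a few rules of the specification are deliberately skipped.

```rust
/// One dispatched SSE event. An empty `event` means the default "message" type.
#[derive(Debug, Default, PartialEq)]
pub struct Event {
    pub event: String,
    pub data: String,
    pub id: Option<String>,
}

/// Simplified parser for a complete `text/event-stream` body.
/// Simplifications: `retry` is ignored, and `id` is not carried over to the
/// following events as the SSE specification requires.
pub fn parse_events(text: &str) -> Vec<Event> {
    let mut events = Vec::new();
    let mut current = Event::default();
    let mut data_lines: Vec<&str> = Vec::new();
    for line in text.lines() {
        if line.is_empty() {
            // Blank line: dispatch if any data was collected, otherwise reset.
            if data_lines.is_empty() {
                current = Event::default();
            } else {
                current.data = data_lines.join("\n");
                events.push(std::mem::take(&mut current));
            }
            data_lines.clear();
            continue;
        }
        if line.starts_with(':') {
            continue; // comment, often used as a keep-alive
        }
        // "field: value" (a single leading space in the value is dropped).
        let (field, value) = match line.split_once(':') {
            Some((field, value)) => (field, value.strip_prefix(' ').unwrap_or(value)),
            None => (line, ""),
        };
        match field {
            "data" => data_lines.push(value),
            "event" => current.event = value.to_string(),
            "id" => current.id = Some(value.to_string()),
            _ => {} // "retry" and unknown fields are ignored here
        }
    }
    // A trailing event without its blank line is incomplete and is dropped.
    events
}
```

Because the response never ends, a real client cannot wait for a "complete body" to call something like parse_events; it has to run the same rules incrementally on each chunk as it arrives.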
In Parsec, SSE is implemented using reqwest-eventsource, which relies on reqwest for the actual HTTP requests.
Here is the method that handles the HTTP response from reqwest, at reqwest-eventsource/src/event_source.rs:145-150:
```rust
fn handle_response(&mut self, res: Response) {
    self.last_retry.take();
    let mut stream = res.bytes_stream().eventsource();
    stream.set_last_event_id(self.last_event_id.clone());
    self.cur_stream.replace(Box::pin(stream));
}
```
The important line here is:
```rust
let mut stream = res.bytes_stream().eventsource();
```
where res is a reqwest::Response.
The .eventsource() method is provided by the Eventsource trait of the eventsource-stream crate; it consumes the stream returned by .bytes_stream(). In turn, .bytes_stream() consumes the response and returns a stream that yields chunks of bytes.
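Since .bytes_stream() yields chunks of arbitrary size, .eventsource() has to cope with events (and even single lines) split across several chunks, exactly what our one-byte-at-a-time server produces. The sketch below shows one way to do that buffering; SseDecoder is a hypothetical type, not eventsource-stream's actual implementation, and it only handles LF line endings and the data field.

```rust
/// Hypothetical incremental decoder: accumulates raw bytes and returns the
/// `data` payload of every event whose terminating blank line has arrived.
#[derive(Default)]
pub struct SseDecoder {
    buffer: Vec<u8>,
}

impl SseDecoder {
    /// Feeds one chunk (of any size, possibly cutting a line or an event in
    /// half) and returns the payloads of the events it completed.
    pub fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.buffer.extend_from_slice(chunk);
        let mut payloads = Vec::new();
        // An event ends with a blank line, i.e. "\n\n" (LF line endings only here).
        while let Some(pos) = self
            .buffer
            .windows(2)
            .position(|w| w[0] == b'\n' && w[1] == b'\n')
        {
            // Remove the complete event (and its terminator) from the buffer.
            let block: Vec<u8> = self.buffer.drain(..pos + 2).collect();
            // Decoding only complete events avoids splitting a multi-byte UTF-8 character.
            let text = String::from_utf8_lossy(&block);
            let data: Vec<&str> = text
                .lines()
                .filter_map(|line| line.strip_prefix("data:"))
                .map(|value| value.strip_prefix(' ').unwrap_or(value))
                .collect();
            if !data.is_empty() {
                payloads.push(data.join("\n"));
            }
        }
        // Bytes after the last "\n\n" stay buffered until more data arrives.
        payloads
    }
}
```

The key design point is that nothing is emitted until an event is complete, so the chunk boundaries chosen by the network (or by the browser) never leak into the events seen by the application.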
So, what’s under the hood?
The goal is to understand how reqwest implements Response::bytes_stream for the wasm32-unknown-unknown target.
Reverse engineering
Reqwest
reqwest has a stream feature, declared in its Cargo.toml, that enables Response::bytes_stream:
```toml
[package]
name = "reqwest"
version = "0.11.18" # remember to update html_root_url
description = "higher level HTTP client library"
# ...

[features]
# ...
stream = ["tokio/fs", "tokio-util", "wasm-streams"]
# ...

[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3.45"
serde_json = "1.0"
wasm-bindgen = "0.2.68"
wasm-bindgen-futures = "0.4.18"
wasm-streams = { version = "0.2", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies.web-sys]
version = "0.3.25"
features = [
    "AbortController",
    "AbortSignal",
    "Headers",
    "Request",
    "RequestInit",
    "RequestMode",
    "Response",
    "Window",
    "FormData",
    "Blob",
    "BlobPropertyBag",
    "ServiceWorkerGlobalScope",
    "RequestCredentials",
    "File",
    "ReadableStream"
]
# ...
```
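The target-specific tables above only pull in js-sys, wasm-bindgen, web-sys and wasm-streams when compiling for wasm32. Rust code uses the same cfg predicate to pick an implementation at compile time, and the #[cfg(feature = "stream")] attribute on bytes_stream works the same way with Cargo features. A minimal sketch, where http_backend is a hypothetical function and its strings are only descriptive labels:

```rust
/// Hypothetical illustration: the same `target_arch` predicate used by the
/// `[target.'cfg(target_arch = "wasm32")'.dependencies]` tables selects code
/// at compile time. Only one of these two functions exists in any given build.
#[cfg(target_arch = "wasm32")]
pub fn http_backend() -> &'static str {
    "browser: fetch() with the body read through a ReadableStream"
}

#[cfg(not(target_arch = "wasm32"))]
pub fn http_backend() -> &'static str {
    "native: hyper over OS sockets"
}
```

Building for wasm32-unknown-unknown therefore compiles a different code path from a native build, which is why the wasm behavior of bytes_stream has to be studied separately.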
When the stream feature is enabled, the optional wasm-streams dependency is added.
This dependency is used only once in the reqwest codebase, at reqwest/src/wasm/response.rs:135-157.
Those lines correspond to the implementation of Response::bytes_stream 😄
```rust
impl Response {
    /// Convert the response into a `Stream` of `Bytes` from the body.
    #[cfg(feature = "stream")]
    pub fn bytes_stream(self) -> impl futures_core::Stream<Item = crate::Result<Bytes>> {
        let web_response = self.http.into_body();
        let abort = self._abort;
        let body = web_response
            .body()
            .expect("could not create wasm byte stream");
        let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into());
        Box::pin(body.into_stream().map(move |buf_js| {
            // Keep the abort guard alive as long as this stream is.
            let _abort = &abort;
            let buffer = Uint8Array::new(
                &buf_js
                    .map_err(crate::error::wasm)
                    .map_err(crate::error::decode)?,
            );
            let mut bytes = vec![0; buffer.length() as usize];
            buffer.copy_to(&mut bytes);
            Ok(bytes.into())
        }))
    }
}
```
Let’s try to understand what this code is doing:
- self.http.into_body() returns a web_sys::Response, as per the definition of the struct at reqwest/src/wasm/response.rs:19-26:
```rust
/// A Response to a submitted `Request`.
pub struct Response {
    http: http::Response<web_sys::Response>,
    _abort: AbortGuard,
    // Boxed to save space (11 words to 1 word), and it's not accessed
    // frequently internally.
    url: Box<Url>,
}
```
  http::Response is a wrapper around the inner type web_sys::Response, and .into_body() returns that inner web_sys::Response.
- web_response.body() returns the body of the response as a web_sys::ReadableStream (wrapped in an Option, hence the .expect()), and body.unchecked_into() casts it into a wasm_streams::readable::sys::ReadableStream.
- wasm_streams::ReadableStream::from_raw() consumes the sys::ReadableStream.
- Finally, .into_stream() converts the ReadableStream into a Stream, and .map() copies each Uint8Array chunk into a Rust Vec<u8> before turning it into Bytes.
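Two details of bytes_stream are worth isolating: the _abort guard moved into the map closure lives exactly as long as the stream, and every chunk is copied from JS memory into a Rust-owned buffer. The following sketch replays that pattern with a plain std Iterator instead of an async Stream, with byte slices standing in for Uint8Array. AbortGuard here is a hypothetical stand-in that only flips a flag on drop; reqwest's real guard presumably aborts the fetch through the AbortController enabled in the web-sys features above, which is worth confirming in reqwest/src/wasm.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Stand-in for reqwest's wasm AbortGuard. Here, dropping it only sets a flag;
/// the real guard is assumed to call abort() on the browser's AbortController.
pub struct AbortGuard {
    pub aborted: Rc<Cell<bool>>,
}

impl Drop for AbortGuard {
    fn drop(&mut self) {
        self.aborted.set(true);
    }
}

/// Iterator version of the `bytes_stream` body: `chunks` plays the role of the
/// JS stream of Uint8Array chunks, and each one is copied into a Rust-owned Vec.
pub fn bytes_stream<'a, I>(chunks: I, abort: AbortGuard) -> impl Iterator<Item = Vec<u8>> + 'a
where
    I: Iterator<Item = &'a [u8]> + 'a,
{
    chunks.map(move |chunk| {
        // Keep the abort guard alive as long as this iterator is.
        let _abort = &abort;
        // Same as `vec![0; buffer.length()]` followed by `buffer.copy_to(&mut bytes)`.
        let mut bytes = vec![0u8; chunk.len()];
        bytes.copy_from_slice(chunk);
        bytes
    })
}
```

Dropping the stream before the body is complete therefore drops the guard too, which is what should let the browser cancel a never-ending SSE response instead of keeping it alive.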
Wasm-streams
wasm-streams is a crate whose main goal is to interface with the browser’s Streams API. Its description reads:
“Working with the Web Streams API in Rust. This crate provides wrappers around ReadableStream, WritableStream and TransformStream. It also supports converting from and into Streams and Sinks from the futures crate.”
We’re interested in 2 elements provided by that crate:
- wasm_streams::readable::sys::ReadableStream: a wrapper around the ReadableStream browser API.
- wasm_streams::readable::ReadableStream: a struct that converts a JS stream into a Rust stream and vice versa. In the case of reqwest, it is used to convert a JS stream into a Rust stream.
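Conceptually, turning a JS stream into a Rust stream means calling the JS reader's read() repeatedly: it resolves to { done, value }, and the Rust side yields each value until done is true. The sketch below shows that adapter synchronously, with a std Iterator; ReadResult, JsReader and into_chunks are hypothetical stand-ins, whereas wasm-streams works with JS promises and an async Stream.

```rust
/// Mirrors what `ReadableStreamDefaultReader.read()` resolves to in JavaScript:
/// `{ done: true }` once the stream is finished, `{ done: false, value }` otherwise.
pub struct ReadResult {
    pub done: bool,
    pub value: Option<Vec<u8>>,
}

/// Hypothetical, synchronous stand-in for a JS stream reader.
pub trait JsReader {
    fn read(&mut self) -> ReadResult;
}

/// Pull-to-iterator adapter: the same idea wasm-streams applies, asynchronously,
/// when it turns a JS ReadableStream into a Rust Stream.
pub struct IntoChunks<R: JsReader> {
    reader: R,
    finished: bool,
}

impl<R: JsReader> Iterator for IntoChunks<R> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        if self.finished {
            return None; // never poll the reader again after `done`
        }
        let result = self.reader.read();
        if result.done {
            self.finished = true;
            return None;
        }
        Some(result.value.unwrap_or_default())
    }
}

pub fn into_chunks<R: JsReader>(reader: R) -> IntoChunks<R> {
    IntoChunks { reader, finished: false }
}
```

Because the Rust side only asks for the next chunk when it is ready for it, the browser keeps control of buffering, which is the property the article relies on to avoid accumulating a never-ending response.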
Conclusion
The reqwest implementation of bytes_stream in WebAssembly relies on the browser’s Streams API.
This should allow us to implement SSE in Parsec with efficient resource usage and without introducing leaks of our own: any remaining risk lies in the browser’s internal implementation.
Article written by:
- Florian Bennetot
- Marcos Medrano
- Emmanuel Leblond