How the reqwest HTTP client streams responses in a Web context

by | Nov 30, 2023 | Technology

We are working on implementing Server-Sent Events (SSE) on all platforms supported by Parsec using reqwest, an HTTP client written in Rust. One of these platforms is the web browser.

We wanted to know how reqwest is implemented for the web browser context, to check that it does not cause performance problems or leaks.

Summary

For this case study, we will use the following versions:

Versions

Context

On native platforms, the OS provides APIs to open TCP/UDP sockets. The application then uses those sockets as it sees fit: for example, it can add SSL/TLS and HTTP layers to implement HTTPS, or implement any other protocol such as BitTorrent or Tor.
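To illustrate what "adding an HTTP layer" on top of a raw socket involves, here is a minimal sketch of the two pieces a native client has to write itself: serializing a request and parsing the head of a response. The function names build_get_request and parse_response_head are hypothetical and only cover the simplest HTTP/1.1 case.

```python
from typing import Dict, Tuple


def build_get_request(host: str, path: str = "/") -> bytes:
    """Serialize a minimal HTTP/1.1 GET request, ready to be written to a TCP socket."""
    return (
        f"GET {path} HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode("ascii")


def parse_response_head(raw: bytes) -> Tuple[int, str, Dict[str, str], bytes]:
    """Split raw bytes read from the socket into status, reason, headers and body bytes."""
    head, _, body = raw.partition(b"\r\n\r\n")
    lines = head.split(b"\r\n")
    # Status line looks like: HTTP/1.1 418 I'm a teapot
    _version, status, reason = lines[0].decode("ascii").split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        name, _, value = line.decode("latin-1").partition(":")
        headers[name.strip().lower()] = value.strip()  # header names are case-insensitive
    return int(status), reason, headers, body
```

In a browser, none of this is accessible to the application: the request and the parsing are both handled behind the HTTP API.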

On the Web, the browser provides much higher-level APIs, such as HTTP or WebSocket. This is really handy and makes it easy to implement web applications in the browser (which plays the role of an OS in a Web context). However, depending on the API implementation details, the result may not be as efficient as a native implementation.

The slow server

Let's implement a server that takes some time to respond. This should help us to mimic what would happen when sending SSE responses.

The following example uses Python to implement a server sending a response 1 byte at a time.

import socket
import asyncio

async def handle_client(loop: asyncio.AbstractEventLoop, client: socket.socket, _address):
  peer_name = client.getpeername()
  resp = (
    b"HTTP/1.1 418 I'm a teapot\r\n"
    b"Content-Length: 5\r\n"
    b"Access-Control-Allow-Origin: *\r\n"
    b"Access-Control-Expose-Headers: The-Answer-You-Are-Looking-For\r\n"
    b"The-Answer-You-Are-Looking-For: 42\r\n"
    b"\r\n"
  )
  await loop.sock_sendall(client, resp)
  # Simulate a slow body being sent.
  try:
    for c in b"12345":
      await asyncio.sleep(1)
      print(f"Sending byte {c} to {peer_name}")
      # bytes([c]) also works before Python 3.11, where int.to_bytes() requires arguments.
      await loop.sock_sendall(client, bytes([c]))
  except (ConnectionResetError, BrokenPipeError):
    pass
  client.close()

async def run_server(server: socket.socket):
  loop = asyncio.get_running_loop()
  while True:
    client, address = await loop.sock_accept(server)
    await asyncio.create_task(handle_client(loop, client, address))

server = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
server.bind(('localhost', 8418))
server.listen(4)
# loop.sock_accept() requires a non-blocking socket.
server.setblocking(False)
asyncio.run(run_server(server))

The web client: a naive approach

The following JavaScript example uses the Fetch API to request a resource from the slow HTTP server.

async function client() {
  console.time("Fetch request");
  let response = await fetch("http://localhost:8418");
  console.timeLog("Fetch request", "Received response")
  console.assert(response.headers.get('The-Answer-You-Are-Looking-For') === '42');
  console.timeEnd("Fetch request");
}

client().then(() => console.log("Finished"))

The fetch() call took a few seconds (~5s in my case, using Firefox), that is, until the full response had been received from our slow server.

Note: this code behaves differently on Node: the fetch takes ~20ms, but the script takes ~5s to exit.

The native client

Instead of blocking to read the response as a whole, we would prefer reading partial data as soon as it becomes available. In Rust, we would do something like this:

use reqwest::{self, header::HeaderValue};
use std::time::{Duration, Instant};

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
  let timer = Instant::now();
  let response = reqwest::get("http://localhost:8418").await?;
  assert_eq!(
    // The header name must match the one sent by the slow server.
    response.headers().get("The-Answer-You-Are-Looking-For"),
    Some(&HeaderValue::from_static("42"))
  );
  let elapsed = timer.elapsed();

  assert!(elapsed < Duration::from_secs(5));
  println!("Request took {}ms", elapsed.as_millis());
  Ok(())
}

The reqwest::get call takes ~10ms: it returns as soon as the response headers are available, without waiting for the body.

Not having to wait for the complete response is useful. For instance, a web server could abort a request early if it was invalid.

This is why we don't want to rely on the Fetch API when working with SSE on the Web.

The client should be able to read the data as soon as possible without blocking for the complete HTTP response to arrive.
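As a minimal sketch of that behaviour outside of Rust, the following Python example reads the headers first and then consumes the body one byte at a time, recording when each piece arrives. Both start_slow_server and fetch_streaming are hypothetical helpers written for this illustration; the embedded server is a threaded stand-in for the slow server shown earlier, bound to an ephemeral port so the example is self-contained.

```python
import http.client
import socket
import threading
import time


def start_slow_server(body: bytes = b"12345", delay: float = 0.2) -> int:
    """Start a one-shot slow server on an ephemeral port and return that port."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(1)
    port = listener.getsockname()[1]

    def serve() -> None:
        conn, _ = listener.accept()
        with conn, listener:
            conn.recv(4096)  # read (and ignore) the request
            conn.sendall(
                b"HTTP/1.1 418 I'm a teapot\r\n"
                + b"Content-Length: %d\r\n" % len(body)
                + b"The-Answer-You-Are-Looking-For: 42\r\n\r\n"
            )
            for byte in body:  # slow body, one byte at a time
                time.sleep(delay)
                conn.sendall(bytes([byte]))

    threading.Thread(target=serve, daemon=True).start()
    return port


def fetch_streaming(port: int):
    """Return (seconds until headers, header value, [(seconds, chunk), ...])."""
    start = time.monotonic()
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=10)
    conn.request("GET", "/")
    response = conn.getresponse()  # returns once the status line and headers are parsed
    headers_at = time.monotonic() - start
    answer = response.getheader("The-Answer-You-Are-Looking-For")
    chunks = []
    while True:
        chunk = response.read(1)  # waits only for the next byte, not the whole body
        if not chunk:
            break
        chunks.append((time.monotonic() - start, chunk))
    conn.close()
    return headers_at, answer, chunks
```

Calling fetch_streaming(start_slow_server()) should show the headers arriving well before the last body byte, which is the property we want from the Web implementation as well.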

SSE in Parsec

The Parsec server needs to send events to the client using the SSE protocol.

The SSE protocol is built on top of HTTP and serializes the events in the HTTP response body. In theory, the HTTP response never ends, since the connection must be kept open to keep sending events.

It is like a very slow HTTP response.

The fact that the HTTP response never ends does not play well with the Fetch API, which would wait for the end of it.
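To make the serialization concrete, here is a minimal sketch of how a server could encode one event in the text/event-stream format: "field: value" lines, one "data:" line per line of payload, and a blank line to terminate the event. The helper name format_sse_event is hypothetical and is not taken from the Parsec code base.

```python
from typing import Optional


def format_sse_event(
    data: str, event: Optional[str] = None, event_id: Optional[str] = None
) -> bytes:
    """Encode a single Server-Sent Event as UTF-8 bytes."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    # A multi-line payload is sent as several consecutive "data:" lines.
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    # The empty line marks the end of the event.
    return ("\n".join(lines) + "\n\n").encode("utf-8")
```

A server keeps writing such blocks to the same response body for as long as the connection stays open, which is why the response has no natural end.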

Fortunately, on the Web we can use the SSE API.

In Parsec, SSE is implemented using reqwest-eventsource, which relies on reqwest for the actual HTTP requests. Here is the method that handles the HTTP response from reqwest, at reqwest-eventsource/src/event_source.rs:145-150:

fn handle_response(&mut self, res: Response) {
    self.last_retry.take();
    let mut stream = res.bytes_stream().eventsource();
    stream.set_last_event_id(self.last_event_id.clone());
    self.cur_stream.replace(Box::pin(stream));
}

The important line here is:

let mut stream = res.bytes_stream().eventsource();

Here, res is a reqwest::Response. .bytes_stream() consumes the response and returns a stream that yields bytes. .eventsource() is provided by the trait eventsource_stream::Eventsource; it consumes the stream returned by .bytes_stream().
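The composition of a byte stream with an event parser can be sketched as follows. This is not the eventsource-stream implementation, only a simplified Python model of the idea: chunks may be split anywhere (even in the middle of a UTF-8 character), so the parser has to buffer across chunk boundaries. The name iter_sse_events is hypothetical, and the sketch assumes LF or CRLF line endings and ignores the retry field.

```python
import codecs
from typing import Dict, Iterable, Iterator


def iter_sse_events(chunks: Iterable[bytes]) -> Iterator[Dict[str, str]]:
    """Turn an iterable of arbitrary byte chunks into parsed SSE events."""
    decoder = codecs.getincrementaldecoder("utf-8")()  # tolerates split characters
    buffer = ""
    event_type, data_lines, last_id = "message", [], ""
    for chunk in chunks:
        buffer += decoder.decode(chunk)
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if line.endswith("\r"):
                line = line[:-1]
            if line == "":
                # Blank line: dispatch the pending event, if it carries data.
                if data_lines:
                    yield {"event": event_type, "data": "\n".join(data_lines), "id": last_id}
                event_type, data_lines = "message", []
                continue
            if line.startswith(":"):
                continue  # comment line, often used as a keep-alive
            name, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]
            if name == "data":
                data_lines.append(value)
            elif name == "event":
                event_type = value
            elif name == "id":
                last_id = value  # the last event id persists across events
```

Because the parser is a generator over chunks, an event is yielded as soon as its terminating blank line arrives, without waiting for the end of the response.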

So, what's under the hood?

The goal is to understand how reqwest implements Response::bytes_stream for the target wasm32-unknown-unknown.

Reverse engineering

Reqwest

reqwest has a stream feature that enables Response::bytes_stream. It is declared in Cargo.toml:

[package]
name = "reqwest"
version = "0.11.18" # remember to update html_root_url
description = "higher level HTTP client library"

# ...

[features]
# ...

stream = ["tokio/fs", "tokio-util", "wasm-streams"]

# ...

[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3.45"
serde_json = "1.0"
wasm-bindgen = "0.2.68"
wasm-bindgen-futures = "0.4.18"
wasm-streams = { version = "0.2", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies.web-sys]
version = "0.3.25"
features = [
    "AbortController",
    "AbortSignal",
    "Headers",
    "Request",
    "RequestInit",
    "RequestMode",
    "Response",
    "Window",
    "FormData",
    "Blob",
    "BlobPropertyBag",
    "ServiceWorkerGlobalScope",
    "RequestCredentials",
    "File",
    "ReadableStream"
]

# ...

When the stream feature is enabled, the wasm-streams dependency is added. This dependency is used only once in the reqwest codebase at reqwest/src/wasm/response.rs:135-157. Those lines correspond to the implementation of Response::bytes_stream 😄

impl Response {
    /// Convert the response into a `Stream` of `Bytes` from the body.
    #[cfg(feature = "stream")]
    pub fn bytes_stream(self) -> impl futures_core::Stream<Item = crate::Result<Bytes>> {
        let web_response = self.http.into_body();
        let abort = self._abort;
        let body = web_response
            .body()
            .expect("could not create wasm byte stream");
        let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into());
        Box::pin(body.into_stream().map(move |buf_js| {
            // Keep the abort guard alive as long as this stream is.
            let _abort = &abort;
            let buffer = Uint8Array::new(
                &buf_js
                    .map_err(crate::error::wasm)
                    .map_err(crate::error::decode)?,
            );
            let mut bytes = vec![0; buffer.length() as usize];
            buffer.copy_to(&mut bytes);
            Ok(bytes.into())
        }))
    }
}

Let's try to understand what this code is doing:

  1. self.http.into_body() returns a web_sys::Response, as per the definition of Response at reqwest/src/wasm/response.rs:19-26:
    /// A Response to a submitted `Request`.
    pub struct Response {
      http: http::Response<web_sys::Response>,
      _abort: AbortGuard,
      // Boxed to save space (11 words to 1 word), and it's not accessed
      // frequently internally.
      url: Box<Url>,
    }

    http::Response is a generic wrapper whose body type is here web_sys::Response. into_body returns that inner web_sys::Response.

  2. web_response.body() returns the browser's ReadableStream for the response body, and body.unchecked_into() casts it into a wasm_streams::readable::sys::ReadableStream.
  3. wasm_streams::ReadableStream::from_raw() consumes the sys::ReadableStream.
  4. Finally, into_stream() converts the ReadableStream into a Rust Stream, and each chunk it yields is copied from a Uint8Array into a Rust byte buffer.
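The steps above amount to adapting a pull-based JS reader into a stream and copying every chunk out of JS memory. The following Python sketch models that idea only; it is not how wasm-streams is written. FakeReader is a hypothetical stand-in for a JS stream reader whose read() resolves to an object with done and value fields, and into_stream and collect are illustrative names.

```python
import asyncio
from typing import AsyncIterator, List


class FakeReader:
    """Hypothetical stand-in for a JS ReadableStream reader."""

    def __init__(self, chunks: List[bytearray]) -> None:
        self._chunks = list(chunks)

    async def read(self) -> dict:
        await asyncio.sleep(0)  # yield to the event loop, like awaiting a JS promise
        if self._chunks:
            return {"done": False, "value": self._chunks.pop(0)}
        return {"done": True, "value": None}


async def into_stream(reader: FakeReader) -> AsyncIterator[bytes]:
    """Pull chunks from the reader until it reports done, yielding a copy of each."""
    while True:
        result = await reader.read()
        if result["done"]:
            return
        # bytes(...) copies the buffer, similar in spirit to Uint8Array copy_to.
        yield bytes(result["value"])


async def collect(reader: FakeReader) -> List[bytes]:
    """Drain the stream into a list (only sensible for finite streams)."""
    return [chunk async for chunk in into_stream(reader)]
```

The consumer pulls one chunk at a time, so nothing is buffered beyond the chunk currently being copied; with a never-ending SSE response, the loop simply keeps waiting on the next read.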

Wasm-streams

wasm-streams is a crate whose main goal is to communicate with the browser's stream API:

Working with the Web Streams API in Rust.

This crate provides wrappers around ReadableStream, WritableStream and TransformStream. It also supports converting from and into [Stream]s and [Sink]s from the [futures] crate.

wasm-streams/src/lib.rs:1-6

We're interested in two items provided by that crate:

  1. wasm_streams::readable::sys::ReadableStream, a wrapper around the browser's ReadableStream API.
  2. wasm_streams::readable::ReadableStream, a struct that converts a JS stream into a Rust stream and vice versa. In the case of reqwest, it is used to convert from a JS stream to a Rust stream.

Conclusion

The reqwest implementation of bytes_stream for WebAssembly relies on the browser's Streams API. This should allow us to implement SSE in Parsec with an efficient use of resources, and without leaks other than those that may exist in the browser's internal implementation.

Article written by:

  • Florian Bennetot
  • Marcos Medrano
  • Emmanuel Leblond

By Florian
