Home

Awesome

<a href="https://www.swimos.org"><img src="https://docs.swimos.org/readme/marlin-blue.svg" align="left"></a> <br><br><br>

Ratchet

Ratchet is a fast, robust, lightweight and fully asynchronous implementation of RFC 6455 (The WebSocket Protocol), complete with an optional implementation of RFC 7692 (Compression Extensions for WebSocket).

Ratchet powers SwimOS on Rust, a framework for real-time streaming data applications.

Crates.io

Documentation

Features

Testing

Ratchet is fully tested and passes every Autobahn test for both client and server modes.

Examples

Client

/// Echo client: connects to `127.0.0.1:9001`, performs the WebSocket handshake and
/// then writes every text or binary message it receives straight back to the peer.
///
/// Returns `Ok(())` once a close message is received; any connection, handshake,
/// read or write error is propagated to the caller.
#[tokio::main]
async fn main() -> Result<(), Error> {
    let stream = TcpStream::connect("127.0.0.1:9001").await?;

    let upgraded = subscribe(
        WebSocketConfig::default(),
        stream,
        "ws://127.0.0.1/hello".try_into_request()?,
    )
    .await?;

    // Bind the connection as `mut websocket`: it is the name used by the loop below,
    // and reading/writing requires a mutable binding.
    let UpgradedClient {
        mut websocket,
        subprotocol,
    } = upgraded;
    // A single buffer is reused for every message and cleared after each echo.
    let mut buf = BytesMut::new();

    loop {
        match websocket.read(&mut buf).await? {
            Message::Text => {
                websocket.write(&mut buf, PayloadType::Text).await?;
                buf.clear();
            }
            Message::Binary => {
                websocket.write(&mut buf, PayloadType::Binary).await?;
                buf.clear();
            }
            Message::Ping | Message::Pong => {
                // Ping messages are transparently handled by Ratchet
            }
            Message::Close(_) => break Ok(()),
        }
    }
}

Server

/// Echo server: listens on `127.0.0.1:9001` and serves one connection at a time,
/// writing every text or binary message it receives back to the peer until a close
/// message arrives.
///
/// Any accept, handshake, read or write error is propagated and ends the server.
#[tokio::main]
async fn main() -> Result<(), Error> {
    let listener = TcpListener::bind("127.0.0.1:9001").await?;
    let mut incoming = TcpListenerStream::new(listener);

    // `transpose` turns the stream's `Option<Result<_>>` into `Result<Option<_>>`,
    // so an accept error is propagated and the loop ends when the stream does.
    while let Some(stream) = incoming.next().await.transpose()? {
        // The upgrader describes what the peer has requested; nothing is sent back
        // until it is either upgraded or rejected.
        let mut upgrader = ratchet::accept_with(
            stream,
            WebSocketConfig::default(),
            NoExtProvider,
            SubprotocolRegistry::default(),
        )
        .await?;

        // To refuse the connection instead of upgrading it:
        // upgrader.reject(WebSocketResponse::new(404)?).await?;
        // continue;

        // To refuse it and attach headers to the response:
        // upgrader.reject(WebSocketResponse::with_headers(404, headers)?).await?;
        // continue;

        let UpgradedServer {
            request,
            mut websocket,
            subprotocol,
        } = upgrader.upgrade().await?;

        // One buffer per connection, cleared after each echoed message.
        let mut buffer = BytesMut::new();

        loop {
            let payload_type = match websocket.read(&mut buffer).await? {
                Message::Text => PayloadType::Text,
                Message::Binary => PayloadType::Binary,
                // Ping messages are transparently handled by Ratchet
                Message::Ping | Message::Pong => continue,
                Message::Close(_) => break,
            };

            websocket.write(&mut buffer, payload_type).await?;
            buffer.clear();
        }
    }

    Ok(())
}

Deflate

  // Pass `DeflateProvider` as the extension provider (in place of `NoExtProvider`)
  // to enable the optional RFC 7692 compression extension. As in the server example,
  // the result is an upgrader that still has to be upgraded or rejected.
  let mut upgrader = ratchet::accept_with(
      socket,
      WebSocketConfig::default(),
      DeflateProvider,
      SubprotocolRegistry::default(),
  )
  .await?;

Split

// Splitting yields independent sender and receiver halves; it only fails if the
// WebSocket is already closed.
let (mut sender, mut receiver) = websocket.split()?;

loop {
    // Choose the payload type to echo back; control frames are skipped and a close
    // message ends the loop.
    let payload_type = match receiver.read(&mut buf).await? {
        Message::Text => PayloadType::Text,
        Message::Binary => PayloadType::Binary,
        Message::Ping | Message::Pong => continue,
        Message::Close(_) => break Ok(()),
    };

    sender.write(&mut buf, payload_type).await?;
    buf.clear();
}

License

Ratchet is licensed under the Apache License 2.0.