743 Fundamental

743-session-types — Session Types

Functional Programming

Tutorial

The Problem

Communication protocols have strict ordering requirements: you must connect before sending, send before receiving, and receive before closing. Violating the order causes protocol errors that surface only at runtime, often in production. Session types encode the whole protocol as a sequence of type-level state transitions, so an out-of-order call fails to compile. The idea originated in pi-calculus research and is implemented for typed channel communication in Rust crates such as session_types and sesh.
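
For contrast, here is a minimal sketch of the runtime-checked alternative that the paragraph above warns about. The names `RuntimeSession`, `State`, and `ProtocolError` are hypothetical and exist only for this illustration. The wrong call order compiles without complaint and is reported only when the offending line executes.

```rust
// Hypothetical runtime-checked client, shown only for contrast with typestate.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Connected,
    RequestSent,
    ResponseReceived,
    Closed,
}

#[derive(Debug, PartialEq)]
struct ProtocolError {
    expected: State,
    actual: State,
}

struct RuntimeSession {
    state: State,
}

impl RuntimeSession {
    fn open() -> Self {
        RuntimeSession { state: State::Connected }
    }

    // Every operation must re-check the current state at runtime.
    fn step(&mut self, expected: State, next: State) -> Result<(), ProtocolError> {
        if self.state != expected {
            return Err(ProtocolError { expected, actual: self.state });
        }
        self.state = next;
        Ok(())
    }

    fn send_request(&mut self) -> Result<(), ProtocolError> {
        self.step(State::Connected, State::RequestSent)
    }

    fn receive_response(&mut self) -> Result<(), ProtocolError> {
        self.step(State::RequestSent, State::ResponseReceived)
    }

    fn close(&mut self) -> Result<(), ProtocolError> {
        self.step(State::ResponseReceived, State::Closed)
    }
}

fn main() {
    let mut s = RuntimeSession::open();
    // Receiving before sending is accepted by the compiler; it fails only here, at runtime.
    let err = s.receive_response().unwrap_err();
    println!("expected {:?}, but session was {:?}", err.expected, err.actual);
    // The correct order succeeds.
    assert!(s.send_request().is_ok());
    assert!(s.receive_response().is_ok());
    assert!(s.close().is_ok());
}
```

The typestate version developed in this tutorial removes the `Result` from every method, because the invalid calls can no longer be written.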

🎯 Learning Outcomes

  • Model a request-response protocol as a four-state typestate: Connected, RequestSent, ResponseReceived, Closed
  • Enforce protocol ordering: send_request only on Connected, receive_response only on RequestSent
  • Collect a log of protocol events while maintaining typestate discipline
  • Understand how session types generalize typestate to multi-party protocols
  • See how consuming self in each step prevents protocol reuse or skipping

Code Example

    #![allow(clippy::all)]
    //! 743: Session Types (protocol safety via typestate)
    //! The protocol: Connect → SendRequest → RecvResponse → Close
    //! Violating the order is a COMPILE ERROR.
    use std::collections::VecDeque;
    use std::marker::PhantomData;
    
    // ── Protocol state markers ─────────────────────────────────────────────────────
    
    pub struct Connected;
    pub struct RequestSent;
    pub struct ResponseReceived;
    pub struct Closed;
    
    // ── Channel (simulated in-memory) ─────────────────────────────────────────────
    
    struct Channel {
        outbox: VecDeque<Vec<u8>>,
        inbox: VecDeque<Vec<u8>>,
    }
    
    impl Channel {
        fn new() -> Self {
            Channel {
                outbox: VecDeque::new(),
                inbox: VecDeque::new(),
            }
        }
    
        fn send(&mut self, data: Vec<u8>) {
            // Simulate: echo response into inbox
            let response = format!("RESP:{}", String::from_utf8_lossy(&data)).into_bytes();
            self.outbox.push_back(data);
            self.inbox.push_back(response);
        }
    
        fn recv(&mut self) -> Option<Vec<u8>> {
            self.inbox.pop_front()
        }
    }
    
    // ── Session ────────────────────────────────────────────────────────────────────
    
    pub struct Session<State> {
        channel: Channel,
        log: Vec<String>,
        _state: PhantomData<State>,
    }
    
    /// Create a new session — starts in `Connected` state.
    pub fn open_session() -> Session<Connected> {
        println!("[Session] Connected");
        Session {
            channel: Channel::new(),
            log: Vec::new(),
            _state: PhantomData,
        }
    }
    
    impl Session<Connected> {
        /// Connected → RequestSent (must happen before recv)
        pub fn send_request(mut self, method: &str, path: &str) -> Session<RequestSent> {
            let msg = format!("{} {}", method, path);
            println!("[Session] → Sending: {}", msg);
            self.channel.send(msg.into_bytes());
            self.log.push(format!("SENT: {} {}", method, path));
            Session {
                channel: self.channel,
                log: self.log,
                _state: PhantomData,
            }
        }
    }
    
    impl Session<RequestSent> {
        /// RequestSent → ResponseReceived
        pub fn receive_response(mut self) -> (String, Session<ResponseReceived>) {
            let data = self.channel.recv().expect("no response in channel");
            let response = String::from_utf8_lossy(&data).into_owned();
            println!("[Session] ← Received: {}", response);
            self.log.push(format!("RECV: {}", response));
            let sess = Session {
                channel: self.channel,
                log: self.log,
                _state: PhantomData,
            };
            (response, sess)
        }
    }
    
    impl Session<ResponseReceived> {
        /// ResponseReceived → Closed (terminal state; only the log remains readable)
        pub fn close(self) -> Session<Closed> {
            println!("[Session] Closed. {} log entries.", self.log.len());
            Session {
                channel: self.channel,
                log: self.log,
                _state: PhantomData,
            }
        }
    
        /// Alternatively: ResponseReceived → RequestSent, sending another request on the same session
        pub fn send_next_request(mut self, method: &str, path: &str) -> Session<RequestSent> {
            let msg = format!("{} {}", method, path);
            println!("[Session] → Pipeline: {}", msg);
            self.channel.send(msg.into_bytes());
            self.log.push(format!("SENT: {} {}", method, path));
            Session {
                channel: self.channel,
                log: self.log,
                _state: PhantomData,
            }
        }
    }
    
    impl Session<Closed> {
        pub fn log_entries(&self) -> &[String] {
            &self.log
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn happy_path_compiles_and_runs() {
            let s = open_session();
            let s = s.send_request("GET", "/test");
            let (resp, s) = s.receive_response();
            let closed = s.close();
            assert!(resp.contains("GET /test"));
            assert_eq!(closed.log_entries().len(), 2);
        }
    
        #[test]
        fn response_echoes_request() {
            let s = open_session().send_request("POST", "/data");
            let (resp, s) = s.receive_response();
            assert!(resp.contains("POST /data"), "got: {}", resp);
            s.close();
        }
    
        #[test]
        fn pipeline_two_requests() {
            let s = open_session()
                .send_request("GET", "/a")
                .receive_response()
                .1
                .send_next_request("GET", "/b")
                .receive_response()
                .1
                .close();
            assert_eq!(s.log_entries().len(), 4);
        }
    }
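
To see what "violating the order is a compile error" looks like in practice, the sketch below uses pared-down stand-ins for the tutorial's types (no channel or log; the `sent` counter is an assumption made for brevity). The two commented-out lines are the ones the compiler should reject; the error codes in the comments are what rustc typically reports for these mistakes.

```rust
use std::marker::PhantomData;

// Pared-down stand-ins for the tutorial's state markers and Session type.
struct Connected;
struct RequestSent;

struct Session<State> {
    sent: usize,
    _state: PhantomData<State>,
}

fn open_session() -> Session<Connected> {
    Session { sent: 0, _state: PhantomData }
}

impl Session<Connected> {
    // Takes `self` by value: the Session<Connected> is moved and cannot be used again.
    fn send_request(self) -> Session<RequestSent> {
        Session { sent: self.sent + 1, _state: PhantomData }
    }
}

impl Session<RequestSent> {
    // Only available once a request has been sent.
    fn receive_response(self) -> usize {
        self.sent
    }
}

fn main() {
    let s = open_session();

    // Skipping a step: `receive_response` is not defined on Session<Connected>.
    // s.receive_response();   // error[E0599]: no method named `receive_response` found

    let sent = s.send_request();

    // Reusing a consumed state: `s` was moved by `send_request`.
    // s.send_request();       // error[E0382]: use of moved value: `s`

    assert_eq!(sent.receive_response(), 1);
}
```

Uncommenting either line is a quick way to confirm that the protection comes from the type system rather than from any runtime check.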

    Key Differences

  • Expressiveness: Full session types (as in the session_types crate) encode the entire protocol as a single type; the typestate approach used here spreads it across one marker type and one impl block per state of Session<S>.
  • Multiparty: OCaml's mpst-ocaml supports multiparty session types (more than two protocol participants); this typestate encoding comfortably handles only binary (two-party) protocols.
  • Duality: A session type has a dual type describing the other end of the channel; Rust typestate typically encodes only one side of the protocol.
  • Runtime: Both approaches track state with zero runtime overhead, because the state lives only in the type and protocol violations are rejected at compile time.
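
The expressiveness and duality points above can be sketched in a few lines. This is a simplified illustration, not the API of the session_types crate: `SendThen`, `RecvThen`, `End`, and `HasDual` are hypothetical names chosen for this example. The whole client protocol is a single type, and the server's type is derived from it by swapping sends and receives.

```rust
use std::marker::PhantomData;

// Protocol building blocks (hypothetical names for this sketch).
struct SendThen<T, Next>(PhantomData<(T, Next)>); // send a T, then continue as Next
struct RecvThen<T, Next>(PhantomData<(T, Next)>); // receive a T, then continue as Next
struct End; // protocol finished

// Duality: every send on one end is a receive on the other end.
trait HasDual {
    type Dual;
}

impl HasDual for End {
    type Dual = End;
}

impl<T, N: HasDual> HasDual for SendThen<T, N> {
    type Dual = RecvThen<T, N::Dual>;
}

impl<T, N: HasDual> HasDual for RecvThen<T, N> {
    type Dual = SendThen<T, N::Dual>;
}

// The client protocol as one type: send a request, receive a response, stop.
type Client = SendThen<String, RecvThen<String, End>>;
// The server protocol is computed by the compiler, not written by hand.
type Server = <Client as HasDual>::Dual;

// Compiles only if both arguments carry the same type parameter.
fn assert_same_type<T>(_: PhantomData<T>, _: PhantomData<T>) {}

fn main() {
    // Compile-time check: the dual of the client is "receive, then send, then stop".
    assert_same_type(
        PhantomData::<Server>,
        PhantomData::<RecvThen<String, SendThen<String, End>>>,
    );
    println!("{}", std::any::type_name::<Server>());
}
```

A real library would attach `send` and `recv` methods to a channel parameterized by these types; the sketch stops at the type-level part because that is where it differs from the per-state `impl` blocks used in this tutorial.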

    OCaml Approach

    OCaml libraries implement session types using continuation-passing style or GADTs. A channel type ('send, 'recv) channel carries phantom type parameters that describe what may be sent and received at each step. Libraries like mpst-ocaml implement multiparty session types for distributed systems. The continuation-passing approach threads the protocol through function types, for example connect : unit -> (send_t -> recv_t -> close_t -> 'a) -> 'a.
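
To keep the examples in one language, here is a rough Rust analogue of the continuation-passing idea. It is not OCaml code and not the mpst-ocaml API; `SendCap`, `RecvCap`, `CloseCap`, and the echoing "RESP:" behavior are assumptions borrowed from the simulated channel in this tutorial. Each step hands the capability for the next step to a continuation, so the protocol order is fixed by the nesting of the closures.

```rust
// Hypothetical capability tokens: holding one is the permission to perform that step.
struct SendCap;
struct RecvCap(String); // carries the simulated response
struct CloseCap;

// Entry point: the continuation receives only the capability to send.
fn connect<A>(k: impl FnOnce(SendCap) -> A) -> A {
    k(SendCap)
}

impl SendCap {
    // Sending consumes the capability and passes a receive capability onward.
    fn send<A>(self, msg: &str, k: impl FnOnce(RecvCap) -> A) -> A {
        // Simulated peer: echo the request back with a prefix.
        k(RecvCap(format!("RESP:{}", msg)))
    }
}

impl RecvCap {
    // Receiving yields the response plus the capability to close.
    fn recv<A>(self, k: impl FnOnce(String, CloseCap) -> A) -> A {
        k(self.0, CloseCap)
    }
}

impl CloseCap {
    fn close(self) {}
}

fn main() {
    let reply = connect(|s| {
        s.send("GET /", |r| {
            r.recv(|resp, c| {
                c.close();
                resp
            })
        })
    });
    assert_eq!(reply, "RESP:GET /");
}
```

Compared with the typestate version, the state is never stored in a struct parameter; it exists only as the argument each closure is given.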


    Exercises

  • Extend the protocol to include an Authenticated state between Connected and RequestSent, requiring authenticate(token: &str) before any requests can be sent.
  • Implement a server-side dual session type ServerSession<State> that mirrors the client transitions with the directions swapped: the server receives where the client sends and sends where the client receives.
  • Write a typed RPC client that enforces the protocol Connect → Auth → (Request → Response)*N → Disconnect where N requests can be made before disconnecting.