ExamplesBy LevelBy TopicLearning Paths
754 Fundamental

754-testing-async-code — Testing Async Code

Functional Programming

Tutorial Video

Text description (accessibility)

This video demonstrates the "754-testing-async-code — Testing Async Code" functional Rust example. Difficulty level: Fundamental. Key concepts covered: Functional Programming. Async code introduces new testing challenges: futures must be driven to completion, timeouts must be tested without real waits, and concurrent operations must be coordinated. Key difference from OCaml: 1. **Runtime**: Rust uses `#[tokio::test]` or `#[async_std::test]` to run async tests; OCaml uses `Lwt_main.run (test_fn ())` or `Eio_main.run`.

Tutorial

The Problem

Async code introduces new testing challenges: futures must be driven to completion, timeouts must be tested without real waits, and concurrent operations must be coordinated. In production Rust, #[tokio::test] provides a single-threaded or multi-threaded test runtime. This example uses threads and channels as a sync-compatible substitute to demonstrate the core patterns: driving async work to completion, testing timeouts, and injecting controllable fake implementations.

🎯 Learning Outcomes

  • • Test thread-based concurrent code using channels as synchronization primitives
  • • Simulate timeout behavior with recv_timeout and verify timeout errors
  • • Use dependency injection to replace real async clients with controllable fakes
  • • Understand why #[tokio::test] is needed for real async code and how it relates to sync patterns
  • • Structure tests that verify both success and error paths through concurrent code
  • Code Example

    #[tokio::test]
    async fn test_fetch() {
        let client = HttpClient::new();
        let result = client.fetch("https://example.com").await;
        assert!(result.is_ok());
    }

    Key Differences

  • Runtime: Rust uses #[tokio::test] or #[async_std::test] to run async tests; OCaml uses Lwt_main.run (test_fn ()) or Eio_main.run.
  • Timeout testing: Rust's tokio::time::pause/advance enables time manipulation without real waits; OCaml's Core.Time_ns with mocked clock serves the same purpose.
  • Mock ease: Both languages use trait/module injection for mock HTTP clients; Rust's trait objects give runtime flexibility while OCaml's functor approach is compile-time.
  • Cancellation: Tokio provides structured cancellation via CancellationToken; OCaml/Lwt uses promise cancellation; this example uses thread-join timeouts.
  • OCaml Approach

    OCaml's Lwt and Eio (effect-based) runtimes require their own test runners. Lwt_main.run drives a promise to completion in tests. Alcotest_lwt provides Alcotest_lwt.test_case for async test cases. OCaml's Mock_clock from Core_kernel allows time manipulation in tests without real sleeps. The eio library's Eio_mock provides controllable IO for testing.

    Full Source

    #![allow(clippy::all)]
    //! # Testing Async Code
    //!
    //! Patterns for testing async code (using threads as std-only analog).
    
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    /// A response from our "async" service
    #[derive(Debug, PartialEq)]
    pub struct Response {
        pub status: u16,
        pub body: String,
    }
    
    /// Simulated async HTTP client (using threads)
    pub struct HttpClient {
        pub timeout: Duration,
    }
    
    impl HttpClient {
        pub fn new(timeout: Duration) -> Self {
            HttpClient { timeout }
        }
    
        /// Fetch a URL (simulated with thread + channel)
        pub fn fetch(&self, url: &str) -> Result<Response, String> {
            let (tx, rx) = mpsc::channel();
            let url = url.to_string();
    
            thread::spawn(move || {
                // Simulate network delay
                thread::sleep(Duration::from_millis(10));
    
                let response = if url.contains("404") {
                    Response {
                        status: 404,
                        body: "Not Found".to_string(),
                    }
                } else if url.contains("error") {
                    return; // Simulate timeout by not sending
                } else {
                    Response {
                        status: 200,
                        body: format!("Response from {}", url),
                    }
                };
    
                let _ = tx.send(response);
            });
    
            rx.recv_timeout(self.timeout)
                .map_err(|_| "Request timed out".to_string())
        }
    }
    
    /// A service that depends on the HTTP client
    pub struct ApiService {
        client: HttpClient,
    }
    
    impl ApiService {
        pub fn new(client: HttpClient) -> Self {
            ApiService { client }
        }
    
        pub fn get_user(&self, id: u64) -> Result<String, String> {
            let url = format!("https://api.example.com/users/{}", id);
            let response = self.client.fetch(&url)?;
    
            if response.status == 200 {
                Ok(response.body)
            } else {
                Err(format!("HTTP {}", response.status))
            }
        }
    }
    
    /// Retry with backoff
    pub fn retry_with_backoff<F, T, E>(
        max_attempts: usize,
        initial_delay: Duration,
        mut f: F,
    ) -> Result<T, E>
    where
        F: FnMut() -> Result<T, E>,
    {
        let mut delay = initial_delay;
        for attempt in 1..=max_attempts {
            match f() {
                Ok(result) => return Ok(result),
                Err(e) if attempt == max_attempts => return Err(e),
                Err(_) => {
                    thread::sleep(delay);
                    delay *= 2;
                }
            }
        }
        unreachable!()
    }
    
    /// Parallel fetch multiple URLs
    pub fn fetch_all(client: &HttpClient, urls: &[&str]) -> Vec<Result<Response, String>> {
        let handles: Vec<_> = urls
            .iter()
            .map(|url| {
                let url = url.to_string();
                let timeout = client.timeout;
                thread::spawn(move || {
                    let c = HttpClient::new(timeout);
                    c.fetch(&url)
                })
            })
            .collect();
    
        handles
            .into_iter()
            .map(|h| h.join().unwrap_or(Err("Thread panicked".to_string())))
            .collect()
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_fetch_success() {
            let client = HttpClient::new(Duration::from_secs(1));
            let result = client.fetch("https://example.com");
            assert!(result.is_ok());
            assert_eq!(result.unwrap().status, 200);
        }
    
        #[test]
        fn test_fetch_404() {
            let client = HttpClient::new(Duration::from_secs(1));
            let result = client.fetch("https://example.com/404");
            assert!(result.is_ok());
            assert_eq!(result.unwrap().status, 404);
        }
    
        #[test]
        fn test_fetch_timeout() {
            let client = HttpClient::new(Duration::from_millis(1));
            let result = client.fetch("https://example.com/error");
            assert!(result.is_err());
            assert!(result.unwrap_err().contains("timed out"));
        }
    
        #[test]
        fn test_api_service_get_user() {
            let client = HttpClient::new(Duration::from_secs(1));
            let service = ApiService::new(client);
            let result = service.get_user(123);
            assert!(result.is_ok());
        }
    
        #[test]
        fn test_retry_succeeds_first_try() {
            let mut attempts = 0;
            let result = retry_with_backoff(3, Duration::from_millis(1), || {
                attempts += 1;
                Ok::<_, &str>(42)
            });
            assert_eq!(result, Ok(42));
            assert_eq!(attempts, 1);
        }
    
        #[test]
        fn test_retry_succeeds_after_failures() {
            let mut attempts = 0;
            let result = retry_with_backoff(3, Duration::from_millis(1), || {
                attempts += 1;
                if attempts < 3 {
                    Err("transient error")
                } else {
                    Ok(42)
                }
            });
            assert_eq!(result, Ok(42));
            assert_eq!(attempts, 3);
        }
    
        #[test]
        fn test_fetch_all_parallel() {
            let client = HttpClient::new(Duration::from_secs(1));
            let urls = ["https://a.com", "https://b.com", "https://c.com"];
            let results = fetch_all(&client, &urls);
            assert_eq!(results.len(), 3);
            assert!(results.iter().all(|r| r.is_ok()));
        }
    }
    ✓ Tests Rust test suite
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn test_fetch_success() {
            let client = HttpClient::new(Duration::from_secs(1));
            let result = client.fetch("https://example.com");
            assert!(result.is_ok());
            assert_eq!(result.unwrap().status, 200);
        }
    
        #[test]
        fn test_fetch_404() {
            let client = HttpClient::new(Duration::from_secs(1));
            let result = client.fetch("https://example.com/404");
            assert!(result.is_ok());
            assert_eq!(result.unwrap().status, 404);
        }
    
        #[test]
        fn test_fetch_timeout() {
            let client = HttpClient::new(Duration::from_millis(1));
            let result = client.fetch("https://example.com/error");
            assert!(result.is_err());
            assert!(result.unwrap_err().contains("timed out"));
        }
    
        #[test]
        fn test_api_service_get_user() {
            let client = HttpClient::new(Duration::from_secs(1));
            let service = ApiService::new(client);
            let result = service.get_user(123);
            assert!(result.is_ok());
        }
    
        #[test]
        fn test_retry_succeeds_first_try() {
            let mut attempts = 0;
            let result = retry_with_backoff(3, Duration::from_millis(1), || {
                attempts += 1;
                Ok::<_, &str>(42)
            });
            assert_eq!(result, Ok(42));
            assert_eq!(attempts, 1);
        }
    
        #[test]
        fn test_retry_succeeds_after_failures() {
            let mut attempts = 0;
            let result = retry_with_backoff(3, Duration::from_millis(1), || {
                attempts += 1;
                if attempts < 3 {
                    Err("transient error")
                } else {
                    Ok(42)
                }
            });
            assert_eq!(result, Ok(42));
            assert_eq!(attempts, 3);
        }
    
        #[test]
        fn test_fetch_all_parallel() {
            let client = HttpClient::new(Duration::from_secs(1));
            let urls = ["https://a.com", "https://b.com", "https://c.com"];
            let results = fetch_all(&client, &urls);
            assert_eq!(results.len(), 3);
            assert!(results.iter().all(|r| r.is_ok()));
        }
    }

    Deep Comparison

    OCaml vs Rust: Testing Async Code

    Async Test Setup

    Rust (tokio)

    #[tokio::test]
    async fn test_fetch() {
        let client = HttpClient::new();
        let result = client.fetch("https://example.com").await;
        assert!(result.is_ok());
    }
    

    Rust (std-only with threads)

    #[test]
    fn test_fetch() {
        let client = HttpClient::new(Duration::from_secs(1));
        let result = client.fetch("https://example.com");
        assert!(result.is_ok());
    }
    

    OCaml (Lwt)

    let%lwt () =
      let%lwt response = Http_client.fetch "https://example.com" in
      assert (response.status = 200);
      Lwt.return ()
    

    Timeout Testing

    Rust

    #[test]
    fn test_timeout() {
        let client = HttpClient::new(Duration::from_millis(1));
        let result = client.fetch("https://slow.example.com");
        assert!(result.is_err());
    }
    

    OCaml

    let test_timeout () =
      let timeout = Lwt_unix.timeout 0.001 in
      match Lwt_main.run (Lwt.pick [timeout; fetch url]) with
      | exception Lwt_unix.Timeout -> ()
      | _ -> failwith "expected timeout"
    

    Retry with Backoff

    Rust

    pub fn retry_with_backoff<F, T, E>(
        max_attempts: usize,
        initial_delay: Duration,
        mut f: F,
    ) -> Result<T, E> {
        let mut delay = initial_delay;
        for attempt in 1..=max_attempts {
            match f() {
                Ok(result) => return Ok(result),
                Err(e) if attempt == max_attempts => return Err(e),
                Err(_) => {
                    thread::sleep(delay);
                    delay *= 2;
                }
            }
        }
    }
    

    Key Differences

    AspectOCamlRust
    Async runtimeLwt, Asynctokio, async-std
    Test attributeppx_lwt#[tokio::test]
    ChannelsLwt_mvarmpsc, crossbeam
    TimeoutLwt_unix.timeoutrecv_timeout
    ParallelismLwt.jointokio::join!

    Exercises

  • Add a retry wrapper that retries a failed HTTP request up to N times with exponential backoff, and write tests that verify retry counts using a CountingMockClient.
  • Implement a Circuit Breaker that opens after 3 consecutive failures and write tests that verify the open/half-open/closed state transitions.
  • Write a test for concurrent requests using thread::scope to spawn 10 parallel requests and verify all responses are received correctly.
  • Open Source Repos