1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

use slab::Slab;

use crate::task::LocalWaker;

/// Condition allows to notify multiple receivers at the same time
pub struct Condition(Rc<RefCell<Inner>>);

struct Inner {
    data: Slab<Option<LocalWaker>>,
}

impl Default for Condition {
    fn default() -> Self {
        Self::new()
    }
}

impl Condition {
    pub fn new() -> Condition {
        Condition(Rc::new(RefCell::new(Inner { data: Slab::new() })))
    }

    /// Get condition waiter
    pub fn wait(&mut self) -> Waiter {
        let token = self.0.borrow_mut().data.insert(None);
        Waiter {
            token,
            inner: self.0.clone(),
        }
    }

    /// Notify all waiters
    pub fn notify(&self) {
        let inner = self.0.borrow();
        for item in inner.data.iter() {
            if let Some(waker) = item.1 {
                waker.wake();
            }
        }
    }
}

impl Drop for Condition {
    fn drop(&mut self) {
        self.notify()
    }
}

#[must_use = "Waiter do nothing unless polled"]
pub struct Waiter {
    token: usize,
    inner: Rc<RefCell<Inner>>,
}

impl Clone for Waiter {
    fn clone(&self) -> Self {
        let token = self.inner.borrow_mut().data.insert(None);
        Waiter {
            token,
            inner: self.inner.clone(),
        }
    }
}

impl Future for Waiter {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        let mut inner = this.inner.borrow_mut();
        let inner = unsafe { inner.data.get_unchecked_mut(this.token) };
        if inner.is_none() {
            let waker = LocalWaker::default();
            waker.register(cx.waker());
            *inner = Some(waker);
            Poll::Pending
        } else if inner.as_mut().unwrap().register(cx.waker()) {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

impl Drop for Waiter {
    fn drop(&mut self) {
        self.inner.borrow_mut().data.remove(self.token);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::future::lazy;

    #[actix_rt::test]
    async fn test_condition() {
        let mut cond = Condition::new();
        let mut waiter = cond.wait();
        assert_eq!(
            lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
            Poll::Pending
        );
        cond.notify();
        waiter.await;

        let mut waiter = cond.wait();
        assert_eq!(
            lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
            Poll::Pending
        );
        let mut waiter2 = waiter.clone();
        assert_eq!(
            lazy(|cx| Pin::new(&mut waiter2).poll(cx)).await,
            Poll::Pending
        );

        drop(cond);
        waiter.await;
        waiter2.await;
    }
}