1    type Stream<T> @abstract =
2        let callbacks = StdList<T -> Unit>.new
3        let invoking_callbacks = StdList<T -> Unit>.new
4        let tokens = StdList<Token>.new
5    
6        def subscribe (f : T -> Unit) =
7            callbacks.add f
8    
9            Token { callbacks.remove f }
10   
11       def add (f : T -> Unit) =
12           callbacks.add f
13   
14       def is_empty = callbacks.is_empty
15       def is_not_empty = callbacks.is_not_empty
16   
17       fun destruct =
18           for i = tokens.size - 1 downto 0 do
19               tokens[i].discard
20   
21   type MutStream<T> @[mut_of Stream] =
22       inherit Stream<T>
23   
24       let mut index = 0
25   
26       def push (x : T) =
27           if callbacks.is_not_empty then
28               for f in callbacks do
29                   invoking_callbacks.add f
30   
31               index += 1
32               let current_index = index
33               let version = callbacks.version
34               for f in invoking_callbacks do
35                   if callbacks.version == version || callbacks.contains f then
36                       f x
37                       if current_index <> index then return
38   
39               invoking_callbacks.clear
40   
41       def push_token (token : Token) =
42           tokens.add token
43   
44       def as_readonly = self as Stream<T>
45