1 type Stream<T> @abstract = 2 let callbacks = StdList<T -> Unit>.new 3 let invoking_callbacks = StdList<T -> Unit>.new 4 let tokens = StdList<Token>.new 5 6 def subscribe (f : T -> Unit) = 7 callbacks.add f 8 9 Token { callbacks.remove f } 10 11 def add (f : T -> Unit) = 12 callbacks.add f 13 14 def is_empty = callbacks.is_empty 15 def is_not_empty = callbacks.is_not_empty 16 17 fun destruct = 18 for i = tokens.size - 1 downto 0 do 19 tokens[i].discard 20 21 type MutStream<T> @[mut_of Stream] = 22 inherit Stream<T> 23 24 let mut index = 0 25 26 def push (x : T) = 27 if callbacks.is_not_empty then 28 for f in callbacks do 29 invoking_callbacks.add f 30 31 index += 1 32 let current_index = index 33 let version = callbacks.version 34 for f in invoking_callbacks do 35 if callbacks.version == version || callbacks.contains f then 36 f x 37 if current_index <> index then return 38 39 invoking_callbacks.clear 40 41 def push_token (token : Token) = 42 tokens.add token 43 44 def as_readonly = self as Stream<T> 45