# Reactive Paradigm
## Initial Set-Up
from typing import Callable, List
import reactivex as rx
import reactivex.operators as op
def store_results(results: List[float]) -> Callable[[float], None]:
    """Build a callback that appends every value it receives to ``results``.

    Args:
        results: List that collects the values; it is mutated in place.

    Returns:
        A one-argument function that appends its argument to ``results``
        and returns ``None``.
    """

    def store_value(value: float) -> None:
        # Closes over the caller's list, so the caller sees every append.
        results.append(value)

    return store_value
## Predecessor and Successor
def predecessor(
    a: rx.Observable[float], callback: Callable[..., None]
) -> None:
    """Feed ``callback`` the predecessor (``x - 1``) of every item of ``a``.

    Args:
        a: Source observable of numbers.
        callback: Handed to ``subscribe``; it receives each decremented value.
    """
    # Named ``decremented`` rather than ``predecessor`` so the local does not
    # shadow this function's own name.
    decremented: rx.Observable[float] = a.pipe(op.map(lambda x: x - 1))
    decremented.subscribe(callback)
# Example run: the inputs 1 and 10 are expected to come out as 0 and 9.
# The check runs right after the call, so it relies on the values already
# having been delivered to the callback by then.
results = []
predecessor(rx.of(1, 10), store_results(results))
assert all(results[i] == want for i, want in enumerate((0, 9)))
def successor(a: rx.Observable[float], callback: Callable[..., None]) -> None:
    """Feed ``callback`` the successor (``x + 1``) of every item of ``a``.

    Args:
        a: Source observable of numbers.
        callback: Handed to ``subscribe``; it receives each incremented value.
    """
    # Named ``incremented`` rather than ``successor`` so the local does not
    # shadow this function's own name.
    incremented: rx.Observable[float] = a.pipe(op.map(lambda x: x + 1))
    incremented.subscribe(callback)
# Example run: the inputs 0 and 10 are expected to come out as 1 and 11.
# The check runs right after the call, so it relies on the values already
# having been delivered to the callback by then.
results = []
successor(rx.of(0, 10), store_results(results))
assert all(results[i] == want for i, want in enumerate((1, 11)))
## Addition
def addition(
    a: rx.Observable[float], b: rx.Observable[float], callback: Callable[..., None]
) -> None:
    """Feed ``callback`` the sum of each pair of items zipped from ``a`` and ``b``.

    Args:
        a: Observable supplying the left operands.
        b: Observable supplying the right operands.
        callback: Handed to ``subscribe``; it receives each sum.
    """
    zipped = rx.zip(a, b)
    # The lambda argument is called ``operands`` so that it does not shadow
    # the zipped stream it is applied to.
    sums: rx.Observable[float] = zipped.pipe(
        op.map(lambda operands: operands[0] + operands[1])
    )
    sums.subscribe(callback)
# Example run: 0+0, 1+0, 0+1 and 10+10 are expected to give 0, 1, 1 and 20.
# The check runs right after the call, so it relies on the values already
# having been delivered to the callback by then.
results = []
addition(rx.of(0, 1, 0, 10), rx.of(0, 0, 1, 10), store_results(results))
assert all(results[i] == want for i, want in enumerate((0, 1, 1, 20)))
## Multiplication
def multiplication(
    a: rx.Observable[float], b: rx.Observable[float], callback: Callable[..., None]
) -> None:
    """Feed ``callback`` the product of each pair of items zipped from ``a`` and ``b``.

    Args:
        a: Observable supplying the left factors.
        b: Observable supplying the right factors.
        callback: Handed to ``subscribe``; it receives each product.
    """
    zipped = rx.zip(a, b)
    # The lambda argument is called ``factors`` so that it does not shadow
    # the zipped stream it is applied to.
    products: rx.Observable[float] = zipped.pipe(
        op.map(lambda factors: factors[0] * factors[1])
    )
    products.subscribe(callback)
# Example run: 0*0, 1*0, 0*1 and 10*10 are expected to give 0, 0, 0 and 100.
# The check runs right after the call, so it relies on the values already
# having been delivered to the callback by then.
results = []
multiplication(rx.of(0, 1, 0, 10), rx.of(0, 0, 1, 10), store_results(results))
assert all(results[i] == want for i, want in enumerate((0, 0, 0, 100)))
## Exponentiation
def exponentiation(
    a: rx.Observable[float], b: rx.Observable[float], callback: Callable[..., None]
) -> None:
    """Feed ``callback`` ``base ** exponent`` for each pair zipped from ``a`` and ``b``.

    Args:
        a: Observable supplying the bases.
        b: Observable supplying the exponents.
        callback: Handed to ``subscribe``; it receives each power.
    """
    zipped = rx.zip(a, b)
    # The lambda argument is called ``operands`` so that it does not shadow
    # the zipped stream it is applied to; index 0 is the base, 1 the exponent.
    powers: rx.Observable[float] = zipped.pipe(
        op.map(lambda operands: operands[0] ** operands[1])
    )
    powers.subscribe(callback)
# Example run: 1**0, 0**1 and 3**3 are expected to give 1, 0 and 27.
# The check runs right after the call, so it relies on the values already
# having been delivered to the callback by then.
results = []
exponentiation(rx.of(1, 0, 3), rx.of(0, 1, 3), store_results(results))
assert all(results[i] == want for i, want in enumerate((1, 0, 27)))