# Reactive Paradigm

## Initial Set-Up

from typing import Callable, List
import reactivex as rx
import reactivex.operators as op
def store_results(results: List[float]) -> Callable[[float], None]:
    """Build a callback that records each value it is given in ``results``.

    Args:
        results: List that the returned callback appends to in place.

    Returns:
        A one-argument function that appends its argument to ``results``
        and returns ``None``.
    """
    record = results.append

    def _collect(value: float) -> None:
        record(value)

    return _collect

## Predecessor and Successor

def predecessor(
    a: rx.Observable[float], callback: Callable[..., None]
) -> None:
    """Subscribe ``callback`` to the values of ``a`` with 1 subtracted from each.

    Args:
        a: Source observable.
        callback: Subscribed to the mapped observable; it is handed ``x - 1``
            for each ``x`` coming from ``a``.
    """
    decrement = op.map(lambda value: value - 1)
    a.pipe(decrement).subscribe(callback)


# Run predecessor() over the values 1 and 10, collecting what the callback receives.
results = []
predecessor(rx.of(1, 10), store_results(results))

# Compare the whole list so a missing or extra emission also fails the check.
assert results == [0, 9]
def successor(a: rx.Observable[float], callback: Callable[..., None]) -> None:
    """Subscribe ``callback`` to the values of ``a`` with 1 added to each.

    Args:
        a: Source observable.
        callback: Subscribed to the mapped observable; it is handed ``x + 1``
            for each ``x`` coming from ``a``.
    """
    increment = op.map(lambda value: value + 1)
    a.pipe(increment).subscribe(callback)


# Run successor() over the values 0 and 10, collecting what the callback receives.
results = []
successor(rx.of(0, 10), store_results(results))

# Compare the whole list so a missing or extra emission also fails the check.
assert results == [1, 11]

## Addition

def addition(
    a: rx.Observable[float], b: rx.Observable[float], callback: Callable[..., None]
) -> None:
    """Subscribe ``callback`` to the sums of the pairs that ``rx.zip(a, b)`` yields.

    Args:
        a: Observable supplying the first element of each pair.
        b: Observable supplying the second element of each pair.
        callback: Subscribed to the observable of sums.
    """

    def _add(operands):
        # Each zipped item is indexed as (value from a, value from b).
        return operands[0] + operands[1]

    sums: rx.Observable[float] = rx.zip(a, b).pipe(op.map(_add))
    sums.subscribe(callback)


# Run addition() over four operand pairs: (0, 0), (1, 0), (0, 1) and (10, 10).
results = []
addition(rx.of(0, 1, 0, 10), rx.of(0, 0, 1, 10), store_results(results))

# Compare the whole list so a missing or extra emission also fails the check.
assert results == [0, 1, 1, 20]

## Multiplication

def multiplication(
    a: rx.Observable[float], b: rx.Observable[float], callback: Callable[..., None]
) -> None:
    """Subscribe ``callback`` to the products of the pairs that ``rx.zip(a, b)`` yields.

    Args:
        a: Observable supplying the first factor of each pair.
        b: Observable supplying the second factor of each pair.
        callback: Subscribed to the observable of products.
    """

    def _multiply(operands):
        # Each zipped item is indexed as (value from a, value from b).
        return operands[0] * operands[1]

    products: rx.Observable[float] = rx.zip(a, b).pipe(op.map(_multiply))
    products.subscribe(callback)


# Run multiplication() over four operand pairs: (0, 0), (1, 0), (0, 1) and (10, 10).
results = []
multiplication(rx.of(0, 1, 0, 10), rx.of(0, 0, 1, 10), store_results(results))

# Compare the whole list so a missing or extra emission also fails the check.
assert results == [0, 0, 0, 100]

## Exponentiation

def exponentiation(
    a: rx.Observable[float], b: rx.Observable[float], callback: Callable[..., None]
) -> None:
    """Subscribe ``callback`` to ``base ** exponent`` for each pair from ``rx.zip(a, b)``.

    Args:
        a: Observable supplying the base of each pair.
        b: Observable supplying the exponent of each pair.
        callback: Subscribed to the observable of powers.
    """

    def _power(operands):
        # Each zipped item is indexed as (base from a, exponent from b).
        return operands[0] ** operands[1]

    powers: rx.Observable[float] = rx.zip(a, b).pipe(op.map(_power))
    powers.subscribe(callback)


# Run exponentiation() over three (base, exponent) pairs: (1, 0), (0, 1) and (3, 3).
results = []
exponentiation(rx.of(1, 0, 3), rx.of(0, 1, 3), store_results(results))

# Compare the whole list so a missing or extra emission also fails the check.
assert results == [1, 0, 27]