Examples

This page shows some real-life examples. Before reading this page, be sure that you have read the examples in the Quickstart section. Those examples are a better starting point than the examples on this page.

Basic Usage

Simple Client

Shows how to send a basic task to the scheduler

"""
This example demonstrates the most basic way to work with scaler.
Scaler applications have three parts - scheduler, cluster, and client.
The scheduler schedules work sent from the client to the cluster.
The cluster, composed of one or more workers, executes the work.
The client sends tasks to the scheduler.

This example shows a client that sends 100 tasks, where each task represents the
execution of the math.sqrt function, and gets back the results.
"""

import math

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def main():
    """Start a local scheduler and cluster, submit 100 ``math.sqrt`` tasks, and print the sum of their results."""
    # SchedulerClusterCombo bundles a scheduler together with a cluster of n_workers workers (10 here). It accepts
    # further options that control its behavior; other examples demonstrate them.
    combo = SchedulerClusterCombo(n_workers=10)

    # The client is the part that submits tasks. Tasks are sent to the scheduler, so the client needs the scheduler's
    # address, which the combo exposes through get_address().
    scheduler_address = combo.get_address()
    with Client(address=scheduler_address) as client:
        # Submit 100 tasks, one per loop iteration. Each task represents the execution of a function you define.
        # Note: you are responsible for passing the correct argument(s) for the function you want to call; failing
        # to do so results in an exception.
        # This loop exists to demonstrate client.submit(). A better way to implement this particular case is
        # client.map(). See `map_client.py` for more detail.
        pending = []
        for value in range(100):
            pending.append(client.submit(math.sqrt, value))

        # Every client.submit() call hands back a future, and that future is the only way to obtain the result of
        # the corresponding task, so keep it until the task has finished or been cancelled. Here future.result()
        # yields a float, but a task may return any type.
        total = sum(fut.result() for fut in pending)

        print(total)  # 661.46

    combo.shutdown()


if __name__ == "__main__":
    main()

Client Mapping Tasks

Shows how to use client.map()

"""
This example shows how to use the Client.map() method.
Client.map() allows the user to invoke a callable many times with different values.
For more information on the map operation, refer to
https://en.wikipedia.org/wiki/Map_(higher-order_function)
"""

import math

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def main():
    """Compute the square roots of the integers in [0, 100) with ``Client.map()`` and print their sum."""
    # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
    combo = SchedulerClusterCombo(n_workers=10)

    with Client(address=combo.get_address()) as client:
        # client.map() takes the function to call followed by a list of argument tuples, one tuple per call.
        # (value,) is a tuple of length one, so every call receives a single integer from [0, 100).
        argument_tuples = [(value,) for value in range(100)]
        roots = client.map(math.sqrt, argument_tuples)

        # Add up the collected results and show the total
        print(sum(roots))

    combo.shutdown()


if __name__ == "__main__":
    main()

Graph Task

Shows how to send a graph-based task to the scheduler

"""This example shows how to utilize graph task functionality provided by scaler."""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def inc(i):
    """Return ``i`` plus one."""
    successor = i + 1
    return successor


def add(a, b):
    """Return the result of ``a + b``."""
    total = a + b
    return total


def minus(a, b):
    """Return the result of ``a - b``."""
    difference = a - b
    return difference


# A graph task is defined as a dict with str as the key type and val_t as the value type, where val_t is defined as
# follows:
#     Union[Any, Tuple[Union[Callable, str], ...]]
# Each value can be one of the following:
# - a basic data type (int, List, etc.),
# - a callable,
# - a tuple of the form (Callable, key_t val1, key_t val2, ...)
#   that represents a function call, where each key_t names another entry of the graph whose value is passed as the
#   corresponding argument.
graph = {
    "a": 2,
    "b": 2,
    "c": (inc, "a"),  # c = a + 1 = 2 + 1 = 3
    "d": (add, "a", "b"),  # d = a + b = 2 + 2 = 4
    "e": (minus, "d", "c"),  # e = d - c = 4 - 3 = 1
    "f": add,  # a bare callable is a plain value: it is handed back as-is, not called
}


def main():
    """Evaluate the module-level ``graph`` on a one-worker cluster and print the computed values."""
    # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
    combo = SchedulerClusterCombo(n_workers=1)

    with Client(address=combo.get_address()) as client:
        # See graph's definition for more detail.
        # client.get() returns a dictionary holding the requested keys; every value provided in the graph is
        # evaluated and passed back.
        requested_keys = ["a", "b", "c", "d", "e", "f"]
        values = client.get(graph, keys=requested_keys)
        print(values.get("e"))
        print(values)  # {'a': 2, 'b': 2, 'c': 3, 'd': 4, 'e': 1, 'f': <function add at 0x70af1e29b4c0>}

    combo.shutdown()


if __name__ == "__main__":
    main()

Nested Task (Submit A Parallel Task Within A Parallel Task)

Shows how to send a nested task to the scheduler

"""This example shows how to create nested tasks. Please see graphtask_nested_client.py for more information."""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


# Calculate fibonacci sequence with nested client.
# Each intermediate call in the recursive process is submitted to the client.
def fibonacci(client: Client, n: int):
    """Return the ``n``-th Fibonacci number, submitting each recursive step as a nested task.

    :param client: client used to submit the two recursive sub-tasks
    :param n: index into the Fibonacci sequence; 0 and 1 are the base cases
    """
    # Base cases need no further tasks.
    if n == 0:
        return 0
    if n == 1:
        return 1

    # Both predecessors are submitted before either result is awaited.
    previous = client.submit(fibonacci, client, n - 1)
    before_previous = client.submit(fibonacci, client, n - 2)
    return previous.result() + before_previous.result()


def main():
    """Compute the 8th Fibonacci number through nested tasks on a one-worker cluster and print it."""
    # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
    combo = SchedulerClusterCombo(n_workers=1)

    with Client(address=combo.get_address()) as client:
        # The client itself is passed as an argument so the task can submit its own sub-tasks.
        root_future = client.submit(fibonacci, client, 8)
        print(root_future.result())  # 21

    combo.shutdown()


if __name__ == "__main__":
    main()

Nested Graph Task (Submit Graph Tasks Recursively)

Shows how to dynamically build a graph on the remote end

Warning

This is a toy example. It’s not recommended to build recursion that deep, as it will be slow.

"""This example shows how to build a graph dynamically on the remote side"""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def minus(a, b):
    """Return the result of ``a - b``."""
    difference = a - b
    return difference


def fibonacci(clnt: Client, n: int):
    """Return the ``n``-th Fibonacci number, computing ``n - 1`` and ``n - 2`` through a graph built on the worker.

    :param clnt: client used both to evaluate the small graph and to submit the recursive sub-tasks
    :param n: index into the Fibonacci sequence; 0 and 1 are the base cases
    """
    if n == 0:
        return 0
    if n == 1:
        return 1

    # Building a graph dynamically on the worker side is okay.
    # BE WARNED! You are not supposed to use it like this. It demonstrates what a graph is able to do, not what it is
    # intended for, and should rarely be done. Redesign if you find yourself in this position. Since graphs can be
    # built dynamically, one can even concatenate the source graph to its child (as long as they evaluate to a
    # value).
    predecessor_graph = {
        "n": n,
        "one": 1,
        "two": 2,
        "n_minus_one": (minus, "n", "one"),
        "n_minus_two": (minus, "n", "two"),
    }
    predecessors = clnt.get(predecessor_graph, keys=["n_minus_one", "n_minus_two"])
    first_index = predecessors.get("n_minus_one")
    second_index = predecessors.get("n_minus_two")

    first = clnt.submit(fibonacci, clnt, first_index)
    second = clnt.submit(fibonacci, clnt, second_index)
    return first.result() + second.result()


def main():
    """Compute the 8th Fibonacci number with graphs built recursively on the workers and print it."""
    # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
    combo = SchedulerClusterCombo(n_workers=10)

    with Client(address=combo.get_address()) as client:
        # The client itself is passed as an argument so the task can build graphs and submit sub-tasks.
        root_future = client.submit(fibonacci, client, 8)
        print(root_future.result())  # 21

    combo.shutdown()


if __name__ == "__main__":
    main()

Disconnect Client

Shows how to disconnect a client from the scheduler

"""This example shows how to clear resources owned by a client and how to disconnect a client from the scheduler."""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def main():
    """Create a client, clear the resources it owns, disconnect it from the scheduler, then stop the cluster."""
    combo = SchedulerClusterCombo(n_workers=10)
    scheduler_address = combo.get_address()
    client = Client(address=scheduler_address)

    # Client.clear() releases every computation resource the client owns: unfinished tasks are cancelled and all
    # object references are invalidated. The client itself stays usable and may keep submitting tasks.
    client.clear()

    # After disconnect() the client is invalidated and cannot take any more tasks. To run more tasks, instantiate
    # another Client. The scheduler's running state is not affected by this call.
    client.disconnect()

    # Alternatively, the scheduler can be shut down together with the disconnect by calling client.shutdown()
    # instead. Such a request is not guaranteed to succeed, as the scheduler can only be closed when it is not
    # running under "protected" mode.

    combo.shutdown()


if __name__ == "__main__":
    main()

Applications

Calculate Implied Volatility

This example calculates the implied volatility for many given market prices. We use client.map to achieve this. Notice that we provide a chunk of data as input to find_volatilities rather than a single sample. This is because client.map makes sense only in two cases: A. The body of the function you are submitting is quite large; B. The function is very computation heavy or may block.

"""
This program computes the implied Black-Scholes volatility given market price and model price.

This program is revised based on
https://stackoverflow.com/questions/61289020/fast-implied-volatility-calculation-in-python

Usage:
    $ git clone https://github.com/Citi/scaler && cd scaler
    $ pip install -r examples/applications/requirements_applications.txt
    $ python -m examples.applications.implied_volatility
"""

from timeit import default_timer
from typing import List

import numpy as np
import psutil
from scipy.stats import norm

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def bs_call(S, K, T, r, vol):
    """Return the Black-Scholes call price.

    Works element-wise when the arguments are NumPy arrays.

    :param S: stock price
    :param K: strike price
    :param T: time to maturity, in years
    :param r: risk-free rate
    :param vol: volatility
    """
    root_T = np.sqrt(T)
    drift = (r + 0.5 * vol**2) * T
    d1 = (np.log(S / K) + drift) / (vol * root_T)
    d2 = d1 - vol * root_T
    discounted_strike = np.exp(-r * T) * K
    return S * norm.cdf(d1) - discounted_strike * norm.cdf(d2)


def bs_vega(S, K, T, r, sigma):
    """Return the Black-Scholes vega, the derivative of the option price with respect to volatility.

    :param S: stock price
    :param K: strike price
    :param T: time to maturity, in years
    :param r: risk-free rate
    :param sigma: volatility
    """
    root_T = np.sqrt(T)
    drift = (r + 0.5 * sigma**2) * T
    d1 = (np.log(S / K) + drift) / (sigma * root_T)
    return S * norm.pdf(d1) * root_T


def find_volatility(target_value, S, K, T, r) -> float:
    """Find the implied volatility that reproduces ``target_value`` using Newton's method.

    Starts from a volatility of 0.5 and iterates until the Black-Scholes call price is within ``PRECISION`` of
    ``target_value`` or ``MAX_ITERATIONS`` is reached.

    :param target_value: option price to match
    :param S: stock price
    :param K: strike price
    :param T: time to maturity, in years
    :param r: risk-free rate
    :return: the matching volatility, or the best guess so far when no root was found
    """
    MAX_ITERATIONS = 200
    PRECISION = 1.0e-5
    sigma = 0.5
    for _ in range(MAX_ITERATIONS):
        price = bs_call(S, K, T, r, sigma)
        diff = target_value - price  # our root
        if abs(diff) < PRECISION:
            return sigma

        vega = bs_vega(S, K, T, r, sigma)
        # A zero or non-finite vega would turn sigma into inf/NaN and poison every later iteration, so stop here
        # and keep the last usable estimate instead.
        if vega == 0 or not np.isfinite(vega):
            break
        sigma = sigma + diff / vega  # f(x) / f'(x)
    return sigma  # value wasn't found, return best guess so far


def find_volatilities(dataset: np.ndarray) -> List[float]:
    """Return the implied volatility of every sample in ``dataset``.

    Each sample is unpacked into the positional arguments of ``find_volatility``.
    """
    volatilities = []
    for sample in dataset:
        volatilities.append(find_volatility(*sample))
    return volatilities


def generate_synthetic_data(n_samples: int) -> np.ndarray:
    """Generate ``n_samples`` random option samples.

    :return: an array with one row per sample and the columns (price, S, K, T, r), where the price is the
        Black-Scholes call price for a randomly drawn volatility
    """
    stock_prices = np.random.randint(100, 200, n_samples)
    strike_prices = stock_prices * 1.25
    maturities = np.ones(n_samples)  # time to maturity (year)
    risk_free_rates = np.random.randint(0, 3, n_samples) / 100
    volatilities = np.random.randint(15, 50, n_samples) / 100

    call_prices = bs_call(stock_prices, strike_prices, maturities, risk_free_rates, volatilities)

    columns = (call_prices, stock_prices, strike_prices, maturities, risk_free_rates)
    return np.column_stack(columns)


def main():
    """Compute implied volatilities for a synthetic dataset in parallel and report how long it took."""
    N_SAMPLES = 20_000
    N_SAMPLES_PER_TASK = 1_000

    # Use one worker per CPU, falling back to 4 when the CPU count cannot be determined.
    detected_cpus = psutil.cpu_count()
    n_workers = 4 if detected_cpus is None else detected_cpus

    combo = SchedulerClusterCombo(n_workers=n_workers)

    dataset = generate_synthetic_data(N_SAMPLES)

    # Cut the dataset into chunks of at most `N_SAMPLES_PER_TASK` rows; each chunk is wrapped in a one-element
    # tuple because client.map() expects one argument tuple per call.
    per_task_dataset = []
    for chunk_start in range(0, N_SAMPLES, N_SAMPLES_PER_TASK):
        chunk_end = chunk_start + N_SAMPLES_PER_TASK
        per_task_dataset.append((dataset[chunk_start:chunk_end],))

    # Run the chunks in parallel and stitch the per-chunk results back together
    with Client(address=combo.get_address()) as client:
        start = default_timer()
        per_chunk_results = client.map(find_volatilities, per_task_dataset)
        results = np.concatenate(per_chunk_results)
        duration = default_timer() - start

    combo.shutdown()

    print(f"Computed {len(results)} stock volatilities in {duration:.2f}s")


if __name__ == "__main__":
    main()

Get Option Close Price Parallelly

This example gets option closing prices for a specified ticker and start date. We use client.map to reduce the overhead introduced by slow IO.

"""
This program gets the closing prices for a given ticker and start date. This program is revised based on
https://stackoverflow.com/a/77342764

Usage:
    $ git clone https://github.com/Citi/scaler && cd scaler
    $ pip install -r examples/applications/requirements_applications.txt
    $ python -m examples.applications.yfinance_historical_price
"""

import datetime
from typing import Optional

import psutil
import yfinance as yf

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def get_option_data(stock_symbol: str, expiration_date: Optional[str], option_type: str, strike: float):
    """Return the rows of the option chain of ``stock_symbol`` whose strike equals ``strike``.

    :param stock_symbol: ticker symbol of the underlying stock
    :param expiration_date: expiration date passed to ``option_chain``; may be ``None``
    :param option_type: selects the calls table when it starts with "call", otherwise the puts table
    :param strike: strike price to filter on
    """
    chain = yf.Ticker(stock_symbol).option_chain(expiration_date)

    if option_type.startswith("call"):
        contracts = chain.calls
    else:
        contracts = chain.puts

    matches_strike = contracts["strike"] == strike
    return contracts[matches_strike]


def get_option_history_data(contract_symbol, days_before_expiration: int = 30):
    """Return the price history of an option contract, starting ``days_before_expiration`` days before it expires.

    The expiration date is read from the contract's ``expireDate`` info field, which is interpreted as a timestamp.
    """
    contract = yf.Ticker(contract_symbol)
    expires_at = datetime.datetime.fromtimestamp(contract.info["expireDate"])
    lookback = datetime.timedelta(days=days_before_expiration)
    return contract.history(start=expires_at - lookback)


def get_option_close_prices_with_strike(strike):
    """Return the first available closing price of every AAPL call contract with the given strike.

    :param strike: strike price of the contracts to look up
    :return: a list of ``(contract_symbol, close_price, date)`` tuples, one per contract that has history data
    """
    stock_symbol = "AAPL"
    expiration_date = None  # User may wish to specify expiration_date
    days_before_expiration = 30
    option_type = "call"

    res = []
    option_data = get_option_data(stock_symbol, expiration_date, option_type, strike)
    for _, od in option_data.iterrows():
        contract_symbol = od["contractSymbol"]
        option_history = get_option_history_data(contract_symbol, days_before_expiration)
        # A contract without any history rows has no first row to read; skip it instead of letting
        # `.iloc[0]` raise IndexError and fail the whole task.
        if option_history.empty:
            continue
        first_option_history = option_history.iloc[0]
        first_option_history_date = option_history.index[0]
        first_option_history_close = first_option_history["Close"]
        res.append((contract_symbol, first_option_history_close, first_option_history_date))
    return res


def main():
    """Fetch option closing prices for strikes in [170, 250) in parallel and print one line per contract."""
    # Use one worker per CPU, falling back to 4 when the CPU count cannot be determined.
    detected_cpus = psutil.cpu_count()
    n_workers = 4 if detected_cpus is None else detected_cpus

    strike_start = 170
    strike_end = 250

    combo = SchedulerClusterCombo(n_workers=n_workers)

    with Client(address=combo.get_address()) as client:
        # One task per strike; each argument tuple holds a single strike.
        strike_arguments = [(strike,) for strike in range(strike_start, strike_end)]
        results = client.map(get_option_close_prices_with_strike, strike_arguments)

    combo.shutdown()

    # Every task returned a list of (symbol, close, date) tuples; report them all.
    for per_strike_rows in results:
        for contract_symbol, first_option_history_close, first_option_history_date in per_strike_rows:
            print(
                f"For {contract_symbol}, the closing price was ${first_option_history_close:.2f} on "
                f"{first_option_history_date}."
            )


if __name__ == "__main__":
    main()

Distributed Image Processing

This example uses the Pillow library with Scaler to resize images in parallel.

"""
This example uses the Pillow library with Scaler to resize images in parallel.
"""

import os
import sys
import tempfile
from multiprocessing import cpu_count
from PIL import Image, UnidentifiedImageError
from scaler import SchedulerClusterCombo, Client


def process_image(source: str, dest: str):
    """Copy the image at ``source`` to ``dest``, shrinking it to fit within 1024x1024 when it is larger.

    Files that Pillow cannot identify as images are silently skipped.

    :param source: path of the file to read
    :param dest: path the processed image is written to
    """
    try:
        im = Image.open(source)
    except UnidentifiedImageError:
        return  # ignore non-image files

    # The context manager closes the image even when resizing or saving raises.
    with im:
        # resize the image if it's too big
        if im.width > 1024 or im.height > 1024:
            im.thumbnail((1024, 1024))

        # this works because the workers are being run on the same machine as the client
        im.save(dest)

    print(f"Saved processed image into {dest}")


def main():
    """Resize every image of a directory in parallel, writing the results into a temporary directory."""
    script_path = os.path.dirname(os.path.abspath(__file__))

    # The source directory is the single command-line argument; without it, fall back to the bundled images.
    if len(sys.argv) == 2:
        source_dir = sys.argv[1]
    else:
        source_dir = os.path.join(script_path, "..", "images")
        print(f"Directory not provided as argument, using default: {source_dir}")

    combo = SchedulerClusterCombo(n_workers=cpu_count())

    with Client(address=combo.get_address()) as client:
        with tempfile.TemporaryDirectory() as dest_dir:
            # One (source path, destination path) pair per directory entry
            path_pairs = []
            for filename in os.listdir(source_dir):
                source_path = os.path.join(source_dir, filename)
                dest_path = os.path.join(dest_dir, filename)
                path_pairs.append((source_path, dest_path))

            client.map(process_image, path_pairs)

    combo.shutdown()


if __name__ == "__main__":
    main()

Parallel Timeseries Cross-Validation

This example uses the Prophet library with Scaler to perform parallelized cross-validation.

"""
This example uses the Prophet library with Scaler to perform parallelized cross-validation.
This reveals a ~4x speedup over the non-parallelized version.
"""

import pandas as pd
from prophet import Prophet
import prophet.diagnostics
from timeit import default_timer as timer
from multiprocessing import cpu_count
from scaler import SchedulerClusterCombo, Client


def main():
    """Time Prophet cross-validation without and with Scaler, then print both durations and the speedup."""
    combo = SchedulerClusterCombo(n_workers=cpu_count())

    with Client(address=combo.get_address()) as client:

        class Adapter:
            """Adapts the Scaler client to the Prophet diagnostics API."""

            def __init__(self, client: Client):
                self.client = client

            def map(self, func, *iterables):
                # Scaler's map takes a list of argument tuples, so zip the parallel iterables together.
                return self.client.map(func, list(zip(*iterables)))

        # Load the data
        data_url = "https://raw.githubusercontent.com/facebook/prophet/master/examples/example_wp_log_peyton_manning.csv"
        df = pd.read_csv(data_url, parse_dates=["ds"])

        # Scaler is not involved in the fitting process
        model = Prophet(daily_seasonality=False)
        model.fit(df)

        # Both runs share the same cross-validation windows; only the `parallel` argument differs.
        cv_settings = {"initial": "730 days", "period": "180 days", "horizon": "365 days"}

        # non-parallelized cross-validation
        started_at = timer()
        prophet.diagnostics.cross_validation(model, parallel=None, **cv_settings)
        non_parallel_time = timer() - started_at

        # Parallelized cross-validation via Scaler
        started_at = timer()
        prophet.diagnostics.cross_validation(model, parallel=Adapter(client), **cv_settings)
        parallel_time = timer() - started_at

    combo.shutdown()

    print("-" * 30)
    print(f"Non-parallel time: {non_parallel_time:.2f}s")
    print(f"Parallel time: {parallel_time:.2f}s")
    print(f"Speedup: {non_parallel_time / parallel_time:.1f}x")


if __name__ == "__main__":
    main()