Additional Features

Scaler comes with a number of additional features for monitoring and profiling tasks and for customizing behavior.

Scaler Top (Monitoring)

Top is a monitoring tool that lets you see the live status of the Scaler scheduler and its workers. On startup, the scheduler prints a monitor address to its logs; connect to it with the scaler_top CLI command:

scaler_top ipc:///tmp/0.0.0.0_8516_monitor

This shows an interface similar to the standard Linux top command:

scheduler        | task_manager        |     scheduler_sent         | scheduler_received
      cpu   0.0% |   unassigned      0 |      HeartbeatEcho 283,701 |          Heartbeat 283,701
      rss 130.1m |      running      0 |     ObjectResponse     233 |      ObjectRequest     215
                 |      success 53,704 |           TaskEcho  53,780 |               Task  53,764
                 |       failed     14 |               Task  54,660 |         TaskResult  53,794
                 |     canceled     48 |         TaskResult  53,766 |  DisconnectRequest      21
                 |    not_found     14 |      ObjectRequest     366 |         TaskCancel      60
                                       | DisconnectResponse      21 |    BalanceResponse      15
                                       |         TaskCancel      62 |          GraphTask       6
                                       |     BalanceRequest      15 |
                                       |    GraphTaskResult       6 |
-------------------------------------------------------------------------------------------------
Shortcuts: worker[n] agt_cpu[C] agt_rss[M] cpu[c] rss[m] free[f] sent[w] queued[d] lag[l]

Total 7 worker(s)
                   worker agt_cpu agt_rss [cpu]    rss free sent queued   lag ITL |    client_manager
2732890|sd-1e7d-dfba|d26+    0.5%  111.8m  0.5% 113.3m 1000    0      0 0.7ms 100 |
2732885|sd-1e7d-dfba|56b+    0.0%  111.0m  0.5% 111.2m 1000    0      0 0.7ms 100 | func_to_num_tasks
2732888|sd-1e7d-dfba|108+    0.0%  111.7m  0.5% 111.0m 1000    0      0 0.6ms 100 |
2732891|sd-1e7d-dfba|149+    0.0%  113.0m  0.0% 112.2m 1000    0      0 0.9ms 100 |
2732889|sd-1e7d-dfba|211+    0.5%  111.7m  0.0% 111.2m 1000    0      0   1ms 100 |
2732887|sd-1e7d-dfba|e48+    0.5%  112.6m  0.0% 111.0m 1000    0      0 0.9ms 100 |
2732886|sd-1e7d-dfba|345+    0.0%  111.5m  0.0% 112.8m 1000    0      0 0.8ms 100 |
  • scheduler section shows the scheduler’s resource usage

  • task_manager section shows the status of tasks

  • scheduler_sent section counts the number of each type of message sent by the scheduler

  • scheduler_received section counts the number of each type of message received by the scheduler

  • worker section shows worker details. You can use the shortcuts to sort by a column; the highlighted column header ([cpu] in the example above) shows which column is being used for sorting

    • agt_cpu/agt_rss show the CPU/memory usage of the worker agent

    • cpu/rss show the CPU/memory usage of the worker

    • free means the number of free task slots on the worker

    • sent means how many tasks the scheduler has sent to the worker

    • queued means how many tasks the worker has received and queued

    • lag means the latency between the scheduler and the worker

    • ITL is debug information:

      • I means the processor is initialized

      • T means whether the worker currently has a task

      • L means the task lock

Task Profiling

To get the execution time of a task, submit it with profiling enabled.

Call fut.result() to ensure that the task has completed before reading the profiling information.

from scaler import Client

def calculate(sec: int):
    return sec * 1

client = Client(address="tcp://127.0.0.1:2345")
fut = client.submit(calculate, 1, profiling=True)

# this will execute the task
fut.result()

# contains the task's run duration in microseconds
fut.profiling_info().duration_us

# contains the task's peak memory usage in bytes; memory usage is sampled once per second
fut.profiling_info().peak_memory

Send Object

Scaler can send objects to the workers. This is useful for large objects that tasks need and reuse frequently, as it avoids the overhead of transmitting the same object multiple times.

  • The object is sent to the workers only once

  • The Scaler API will return a special reference to the object

  • Workers can use this reference to access the object, but the reference must be provided as a positional argument

    • The reference cannot be nested inside another reference, a list, or any other container

"""
This example demonstrates how to use the Client.send_object() method.
This method is used to submit large objects to the cluster.
Users can then reuse this object without needing to retransmit it multiple times.
"""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo

large_object = [1, 2, 3, 4, 5]


def query(object_reference, idx):
    return object_reference[idx]


def main():
    # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
    cluster = SchedulerClusterCombo(n_workers=1)

    with Client(address=cluster.get_address()) as client:
        # Send the "large" object to the cluster for reuse. Providing a name for the object is optional.
        # This method returns a reference to the object that we can use in place of the original object.
        large_object_ref = client.send_object(large_object, name="large_object")

        # Reuse the object through its reference.
        # Note that this example is not very interesting, since query is a cheap operation that could just as well be
        # done locally. We chose it because it demonstrates that operations and operators defined on the original
        # type (list) can be applied to the reference.
        fut1 = client.submit(query, large_object_ref, 0)
        fut2 = client.submit(query, large_object_ref, 1)

        # Get the result from the future.
        print(fut1.result())
        print(fut2.result())

    cluster.shutdown()


if __name__ == "__main__":
    main()

Graph Submission

Some tasks are complex and depend on the output of other tasks. Scaler supports submitting tasks as a directed acyclic graph (DAG) and will execute the dependencies in the correct order, passing intermediate results between tasks.

"""This example shows how to utilize graph task functionality provided by scaler."""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def inc(i):
    return i + 1


def add(a, b):
    return a + b


def minus(a, b):
    return a - b


# A graph task is defined as a dict with str as the key type and val_t as the value type, where val_t is defined as
# follows:
# Union[Any, Tuple[Union[Callable, str], ...]]
# Each value can be one of the following:
# - a basic data type (int, List, etc.),
# - a callable,
# - a tuple of the form (callable, key1, key2, ...) that represents a function call whose arguments are the values
#   associated with the referenced keys.
graph = {
    "a": 2,
    "b": 2,
    "c": (inc, "a"),  # c = a + 1 = 2 + 1 = 3
    "d": (add, "a", "b"),  # d = a + b = 2 + 2 = 4
    "e": (minus, "d", "c"),  # e = d - c = 4 - 3 = 1
    "f": add,
}


def main():
    # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
    cluster = SchedulerClusterCombo(n_workers=1)

    with Client(address=cluster.get_address()) as client:
        # See graph's definition for more detail.
        # The result is a dictionary that contains the requested keys.
        # Each value provided in the graph will be evaluated and passed back.
        result = client.get(graph, keys=["a", "b", "c", "d", "e", "f"])
        print(result.get("e"))
        print(result)  # {'a': 2, 'b': 2, 'c': 3, 'd': 4, 'e': 1, 'f': <function add at 0x70af1e29b4c0>}

    cluster.shutdown()


if __name__ == "__main__":
    main()

Nested Tasks

Tasks can depend on other tasks’ results without using a graph.

"""This example shows how to created nested tasks. Please see graphtask_nested_client.py for more information."""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


# Calculate the Fibonacci sequence with nested task submissions.
# Each intermediate call in the recursion is submitted to the client.
def fibonacci(client: Client, n: int):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        a = client.submit(fibonacci, client, n - 1)
        b = client.submit(fibonacci, client, n - 2)
        return a.result() + b.result()


def main():
    # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
    cluster = SchedulerClusterCombo(n_workers=1)

    with Client(address=cluster.get_address()) as client:
        result = client.submit(fibonacci, client, 8).result()
        print(result)  # 21

    cluster.shutdown()


if __name__ == "__main__":
    main()

Dynamically Building Graph Within a Task

When the execution graph cannot be determined until runtime, you can build the graph dynamically on the remote end.

"""This example shows how to build a graph dynamically on the remote side."""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def minus(a, b):
    return a - b


def fibonacci(clnt: Client, n: int):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        # Dynamically building a graph on the worker side is okay.
        # BE WARNED! You are not supposed to use it like this; this demonstrates what graphs are capable of rather
        # than how they are intended to be used. It should rarely be done, so redesign if you find yourself in this
        # position. With the ability to dynamically build a graph, one can even concatenate the source graph to its
        # child (as long as they evaluate to a value).
        fib_graph = {"n": n, "one": 1, "two": 2, "n_minus_one": (minus, "n", "one"), "n_minus_two": (minus, "n", "two")}
        res = clnt.get(fib_graph, keys=["n_minus_one", "n_minus_two"])
        n_minus_one = res.get("n_minus_one")
        n_minus_two = res.get("n_minus_two")
        a = clnt.submit(fibonacci, clnt, n_minus_one)
        b = clnt.submit(fibonacci, clnt, n_minus_two)
        return a.result() + b.result()


def main():
    # For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
    cluster = SchedulerClusterCombo(n_workers=10)

    with Client(address=cluster.get_address()) as client:
        result = client.submit(fibonacci, client, 8).result()
        print(result)  # 21

    cluster.shutdown()


if __name__ == "__main__":
    main()

Client Disconnect and Shutdown

By default, the Scheduler runs in protected mode. For more information, see the protected section.

If the Scheduler is not in protected mode, the Client can shut down the Cluster by calling shutdown().

"""This example shows how to clear resources owned by a client and how to disconnect a client from the scheduler."""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def main():
    cluster = SchedulerClusterCombo(n_workers=10)
    client = Client(address=cluster.get_address())
    # Client.clear() will clear all computation resources owned by the client. All unfinished tasks will be cancelled,
    # and all object references will be invalidated. The client can still submit new tasks afterwards.
    client.clear()

    # Once disconnect() is called, this client is invalidated, and no further tasks can be submitted through it.
    # Should the user wish to submit more tasks, they should instantiate another Client. The scheduler's running
    # state is not affected by this method.
    client.disconnect()

    # The user may also choose to shutdown the scheduler while disconnecting client from the scheduler. Such a request
    # is not guaranteed to succeed, as the scheduler can only be closed when it is not running under "protected" mode.
    # client.shutdown()

    cluster.shutdown()


if __name__ == "__main__":
    main()
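
Below is a minimal sketch of the non-protected case, where the client itself shuts the whole cluster down. The protected keyword argument on SchedulerClusterCombo is an assumption made for this sketch; check the parameter name against your Scaler version.

"""Sketch: shutting down an unprotected scheduler from the client."""

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def main():
    # Start a cluster whose scheduler is NOT in protected mode.
    # NOTE: the `protected` keyword is an assumption for this sketch; verify it against your Scaler version.
    cluster = SchedulerClusterCombo(n_workers=2, protected=False)

    client = Client(address=cluster.get_address())
    print(client.submit(sum, [1, 2, 3]).result())  # 6

    # Because the scheduler is not protected, this shutdown request is honored and the scheduler and workers exit.
    client.shutdown()

    # cluster.shutdown() is omitted here because the client already requested a full shutdown.


if __name__ == "__main__":
    main()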

Custom Serialization

Scaler uses cloudpickle by default for serialization. You can use a custom serializer by passing it to the Client.

The serializer API has only two methods: serialize and deserialize, and these are responsible for serializing and deserializing functions, function arguments, and function results.

Note

All libraries used for serialization must be installed on workers.

serialize(obj: Any) → bytes
Parameters:

obj – the object to be serialized, can be function object, argument object, or function result object

Returns:

serialized bytes of the object

Serialize the object to bytes. This serialization method is called for the function object, each argument, and the function’s result. For example:

def add(a, b):
    return a + b

client.submit(add, 1, 2)

serialize will be called four times:

  • Once for the add function

  • Once for the argument 1

  • Once for the argument 2

  • Once for the result of the task

The client will then deserialize the result.

deserialize(payload: bytes) → Any
Parameters:

payload – the serialized bytes of the object, can be function object, argument object, or function result object

Returns:

any deserialized object

Deserialize the bytes into the original object. This deserialize method is used to deserialize the function and each argument as received by the workers, and the result of the task as received by the client.

Below is an example implementation of a custom serializer that uses a different serialization/deserialization method for different types of objects. It uses a simple tagging system to indicate the type of object being serialized/deserialized.

  • Dataframes are serialized into the parquet format

  • Integers are serialized as 4-byte integers

  • All other objects are serialized using cloudpickle

import enum
import pickle
import struct
from io import BytesIO
from typing import Any

import pandas as pd
from cloudpickle import cloudpickle

from scaler import Serializer


class ObjType(enum.Enum):
    General = b"G"
    Integer = b"I"
    DataFrame = b"D"


class CustomSerializer(Serializer):
    @staticmethod
    def serialize(obj: Any) -> bytes:
        if isinstance(obj, pd.DataFrame):
            buf = BytesIO()
            obj.to_parquet(buf)
            return ObjType.DataFrame.value + buf.getvalue()

        if isinstance(obj, int):
            return ObjType.Integer.value + struct.pack("I", obj)

        return ObjType.General.value + cloudpickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def deserialize(payload: bytes) -> Any:
        # index with a slice so the tag stays a bytes object (payload[0] would be an int and fail the enum lookup)
        obj_type = ObjType(payload[:1])
        payload = payload[1:]

        if obj_type == ObjType.DataFrame:
            buf = BytesIO(payload)
            return pd.read_parquet(buf)

        if obj_type == ObjType.Integer:
            return struct.unpack("I", payload)[0]

        return cloudpickle.loads(payload)
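
To use the custom serializer, pass an instance of it to the Client. The snippet below is a minimal sketch reusing the CustomSerializer defined above; the serializer keyword argument is an assumption made for this sketch, so check the Client constructor in your Scaler version.

import pandas as pd

from scaler import Client


def first_n_rows(df: pd.DataFrame, n: int) -> pd.DataFrame:
    return df.head(n)


# NOTE: the `serializer` keyword is an assumption for this sketch; verify it against your Scaler version.
client = Client(address="tcp://127.0.0.1:2345", serializer=CustomSerializer())

# The DataFrame argument and the DataFrame result travel as parquet, the integer argument as a 4-byte value,
# and the function object falls back to cloudpickle.
fut = client.submit(first_n_rows, pd.DataFrame({"a": [1, 2, 3]}), 2)
print(fut.result())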