Additional Features
Scaler comes with a number of additional features for monitoring and profiling tasks and for customizing behavior.
Scaler Top (Monitoring)
Top is a monitoring tool that allows you to see the status of the Scaler. The scheduler prints an address to the logs on startup that can be used to connect to it with the scaler_top CLI command:
scaler_top ipc:///tmp/0.0.0.0_8516_monitor
This will show an interface similar to the standard Linux top command:
scheduler | task_manager | scheduler_sent | scheduler_received
cpu 0.0% | unassigned 0 | HeartbeatEcho 283,701 | Heartbeat 283,701
rss 130.1m | running 0 | ObjectResponse 233 | ObjectRequest 215
| success 53,704 | TaskEcho 53,780 | Task 53,764
| failed 14 | Task 54,660 | TaskResult 53,794
| canceled 48 | TaskResult 53,766 | DisconnectRequest 21
| not_found 14 | ObjectRequest 366 | TaskCancel 60
| DisconnectResponse 21 | BalanceResponse 15
| TaskCancel 62 | GraphTask 6
| BalanceRequest 15 |
| GraphTaskResult 6 |
-------------------------------------------------------------------------------------------------
Shortcuts: worker[n] agt_cpu[C] agt_rss[M] cpu[c] rss[m] free[f] sent[w] queued[d] lag[l]
Total 7 worker(s)
worker agt_cpu agt_rss [cpu] rss free sent queued lag ITL | client_manager
2732890|sd-1e7d-dfba|d26+ 0.5% 111.8m 0.5% 113.3m 1000 0 0 0.7ms 100 |
2732885|sd-1e7d-dfba|56b+ 0.0% 111.0m 0.5% 111.2m 1000 0 0 0.7ms 100 | func_to_num_tasks
2732888|sd-1e7d-dfba|108+ 0.0% 111.7m 0.5% 111.0m 1000 0 0 0.6ms 100 |
2732891|sd-1e7d-dfba|149+ 0.0% 113.0m 0.0% 112.2m 1000 0 0 0.9ms 100 |
2732889|sd-1e7d-dfba|211+ 0.5% 111.7m 0.0% 111.2m 1000 0 0 1ms 100 |
2732887|sd-1e7d-dfba|e48+ 0.5% 112.6m 0.0% 111.0m 1000 0 0 0.9ms 100 |
2732886|sd-1e7d-dfba|345+ 0.0% 111.5m 0.0% 112.8m 1000 0 0 0.8ms 100 |
- scheduler section shows the scheduler's resource usage
- task_manager section shows the status of tasks
- scheduler_sent section counts the number of each type of message sent by the scheduler
- scheduler_received section counts the number of each type of message received by the scheduler
- worker section shows worker details. You can use the shortcuts to sort by column; the * in the column header shows which column is currently used for sorting
  - agt_cpu/agt_rss means the CPU/memory usage of the worker agent
  - cpu/rss means the CPU/memory usage of the worker
  - free means the number of free task slots for the worker
  - sent means how many tasks the scheduler has sent to the worker
  - queued means how many tasks the worker has received and queued
  - lag means the latency between the scheduler and the worker
  - ITL is debug information:
    - I means the processor is initialized
    - T means whether the worker currently has a task
    - L means the task lock
Task Profiling
To get the execution time of a task, submit it with profiling turned on. We need to call fut.result() to ensure that the task has completed.
from scaler import Client

def calculate(sec: int):
    return sec * 1

client = Client(address="tcp://127.0.0.1:2345")

fut = client.submit(calculate, 1, profiling=True)

# this will execute the task
fut.result()

# contains the task's run duration in microseconds
fut.profiling_info().duration_us

# contains the peak memory usage in bytes for the task; this memory peak is sampled every second
fut.profiling_info().peak_memory
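The profiling fields are raw integers, so a short follow-up sketch (using only the profiling_info() fields shown above) can convert them into more readable units:

info = fut.profiling_info()
print(f"duration: {info.duration_us / 1_000_000:.3f} s")
print(f"peak memory: {info.peak_memory / (1024 * 1024):.2f} MiB")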
Send Object
Scaler can send objects to the workers. This is useful for large objects that are needed by tasks and reused frequently, as it avoids the overhead of sending the same object multiple times.
- The object is sent to the workers only once
- The Scaler API will return a special reference to the object
- Workers can use this reference to access the object, but the reference must be provided as a positional argument
- The reference cannot be nested in another reference, inside a list, etc.
"""
This example demonstrates how to use the Client.send_object() method.
This method is used to submit large objects to the cluster.
Users can then reuse this object without needing to retransmit it multiple times.
"""
from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo
large_object = [1, 2, 3, 4, 5]
def query(object_reference, idx):
return object_reference[idx]
def main():
# For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
cluster = SchedulerClusterCombo(n_workers=1)
with Client(address=cluster.get_address()) as client:
# Send the "large" to the cluster for reuse. Providing a name for the object is optional.
# This method returns a reference to the object that we can use in place of the original object.
large_object_ref = client.send_object(large_object, name="large_object")
# Reuse through object reference
# Note that this example is not very interesting, since query is essentially a cheap operation that should be
# done in local end. We chose this operation since it demonstrates that operation and operator defined on the
# original type (list) can be applied to the reference.
fut1 = client.submit(query, large_object_ref, 0)
fut2 = client.submit(query, large_object_ref, 1)
# Get the result from the future.
print(fut1.result())
print(fut2.result())
cluster.shutdown()
if __name__ == "__main__":
main()
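To make the positional-argument constraint above concrete, here is a minimal sketch (reusing query and large_object_ref from the example; the commented-out call illustrates the unsupported pattern):

# Supported: the reference is passed directly as a positional argument.
client.submit(query, large_object_ref, 0)

# Not supported: the reference may not be nested inside another container such as a list.
# client.submit(query, [large_object_ref], 0)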
Graph Submission
Some tasks are complex and depend on the output of other tasks. Scaler supports submitting tasks as a directed acyclic graph (DAG) and will handle executing the dependencies in the correct order and passing results between tasks.
"""This example shows how to utilize graph task functionality provided by scaler."""
from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo
def inc(i):
return i + 1
def add(a, b):
return a + b
def minus(a, b):
return a - b
# A graph task is defined as a dict with str as the key type and val_t as the value type, where val_t is defined as
# follows:
# Union[Any, Tuple[Union[Callable, str], ...]
# Each value can be one of the following:
# - a basic data type (int, List, etc.),
# - a callable,
# - a tuple of the form (Callable, key_t val1, key_t val2, ...)
# that represents a function call.
graph = {
"a": 2,
"b": 2,
"c": (inc, "a"), # c = a + 1 = 2 + 1 = 3
"d": (add, "a", "b"), # d = a + b = 2 + 2 = 4
"e": (minus, "d", "c"), # e = d - c = 4 - 3 = 1
"f": add,
}
def main():
# For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
cluster = SchedulerClusterCombo(n_workers=1)
with Client(address=cluster.get_address()) as client:
# See graph's definition for more detail.
# The result is a dictionary that contains the requested keys.
# Each value provided in the graph will be evaluated and passed back.
result = client.get(graph, keys=["a", "b", "c", "d", "e", "f"])
print(result.get("e"))
print(result) # {'a': 2, 'b': 2, 'c': 3, 'd': 4, 'e': 1, 'f': <function add at 0x70af1e29b4c0>}
cluster.shutdown()
if __name__ == "__main__":
main()
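Because the result only contains the requested keys, you can also ask for just a subset of the graph. A small sketch, intended to run inside the with Client(...) block above:

# Request only the key we care about; its dependencies ("a", "b", "c", "d") are still evaluated to produce it.
partial = client.get(graph, keys=["e"])
print(partial)  # {'e': 1}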
Nested Tasks
Tasks can depend on the results of other tasks without using a graph.
"""This example shows how to created nested tasks. Please see graphtask_nested_client.py for more information."""
from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo
# Calculate fibonacci sequence with nested client.
# Each intermediate call in the recursive process is submitted to the client.
def fibonacci(client: Client, n: int):
if n == 0:
return 0
elif n == 1:
return 1
else:
a = client.submit(fibonacci, client, n - 1)
b = client.submit(fibonacci, client, n - 2)
return a.result() + b.result()
def main():
# For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
cluster = SchedulerClusterCombo(n_workers=1)
with Client(address=cluster.get_address()) as client:
result = client.submit(fibonacci, client, 8).result()
print(result) # 21
cluster.shutdown()
if __name__ == "__main__":
main()
Dynamically Building Graph Within a Task
When the execution graph is not known until runtime, you can build the graph dynamically on the remote end.
"""This example shows how to build graph dynamically in the remote side"""
from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo
def minus(a, b):
return a - b
def fibonacci(clnt: Client, n: int):
if n == 0:
return 0
elif n == 1:
return 1
else:
# Dynamically building graph in the worker side is okay.
# BE WARNED! You are not suppose to use it like that. This is to demonstrate the ability instead of intention
# of what graph can do. This should rarely be done. Redesign if you find yourself in this position. With the
# ability to dynamically build a graph, one can even concatenate the source graph to its child (as long as they
# evaluate to a value).
fib_graph = {"n": n, "one": 1, "two": 2, "n_minus_one": (minus, "n", "one"), "n_minus_two": (minus, "n", "two")}
res = clnt.get(fib_graph, keys=["n_minus_one", "n_minus_two"])
n_minus_one = res.get("n_minus_one")
n_minus_two = res.get("n_minus_two")
a = clnt.submit(fibonacci, clnt, n_minus_one)
b = clnt.submit(fibonacci, clnt, n_minus_two)
return a.result() + b.result()
def main():
# For an explanation on how SchedulerClusterCombo and Client work, please see simple_client.py
cluster = SchedulerClusterCombo(n_workers=10)
with Client(address=cluster.get_address()) as client:
result = client.submit(fibonacci, client, 8).result()
print(result) # 21
cluster.shutdown()
if __name__ == "__main__":
main()
Client Disconnect and Shutdown
By default, the Scheduler runs in protected mode. For more information, see the protected section.
If the Scheduler is not in protected mode, the Client can shut down the Cluster by calling shutdown().
"""This example shows how to clear resources owned by a client and how to disconnect a client from the scheduler."""
from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo
def main():
cluster = SchedulerClusterCombo(n_workers=10)
client = Client(address=cluster.get_address())
# Client.clear() will clear all computation resources owned by the client. All unfinished tasks will be cancelled,
# and all object reference will be invalidated. The client can submit tasks as it wishes.
client.clear()
# Once disconnect is called, this client is invalidated, and no tasks can be installed on this client.
# Should the user wish to initiate more tasks, they should instantiate another Client. The scheduler's running
# state will not be affected by this method.
client.disconnect()
# The user may also choose to shutdown the scheduler while disconnecting client from the scheduler. Such a request
# is not guaranteed to succeed, as the scheduler can only be closed when it is not running under "protected" mode.
# client.shutdown()
cluster.shutdown()
if __name__ == "__main__":
main()
Custom Serialization
Scaler uses cloudpickle by default for serialization. You can use a custom serializer by passing it to the Client.
The serializer API has only two methods, serialize and deserialize, and these are responsible for serializing and deserializing functions, function arguments, and function results.
Note
All libraries used for serialization must be installed on workers.
- serialize(obj: Any) -> bytes
  - Parameters:
    - obj – the object to be serialized; can be a function object, an argument object, or a function result object
  - Returns:
    - the serialized bytes of the object

Serialize the object to bytes. This serialization method is called for the function object, each argument, and the function's result. For example:
def add(a, b):
    return a + b

client.submit(add, 1, 2)
serialize will be called four times:
- Once for the add function
- Once for the argument 1
- Once for the argument 2
- Once for the result of the task

The client will then deserialize the result.
- deserialize(payload: bytes) -> Any
  - Parameters:
    - payload – the serialized bytes of the object; can be a function object, an argument object, or a function result object
  - Returns:
    - the deserialized object

Deserialize the bytes into the original object. This method is used to deserialize the function and each argument as received by the workers, and the result of the task as received by the client.
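As a minimal sketch of this two-method API (assuming only the Serializer base class imported from scaler and cloudpickle, both of which appear in the example that follows), a pass-through serializer could look like this:

from typing import Any

import cloudpickle

from scaler import Serializer

class CloudpickleSerializer(Serializer):
    @staticmethod
    def serialize(obj: Any) -> bytes:
        # Called for the function, each argument, and the task result.
        return cloudpickle.dumps(obj)

    @staticmethod
    def deserialize(payload: bytes) -> Any:
        # Called on the workers for the function/arguments and on the client for the result.
        return cloudpickle.loads(payload)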
Below is an example implementation of a custom serializer that uses a different serialization/deserialization method for different types of objects. It uses a simple tagging system to indicate the type of object being serialized/deserialized.
- DataFrames are serialized into the Parquet format
- Integers are serialized as 4-byte integers
- All other objects are serialized using cloudpickle
import enum
import pickle
import struct
from io import BytesIO
from typing import Any

import pandas as pd
from cloudpickle import cloudpickle

from scaler import Serializer

class ObjType(enum.Enum):
    General = b"G"
    Integer = b"I"
    DataFrame = b"D"

class CustomSerializer(Serializer):
    @staticmethod
    def serialize(obj: Any) -> bytes:
        if isinstance(obj, pd.DataFrame):
            buf = BytesIO()
            obj.to_parquet(buf)
            return ObjType.DataFrame.value + buf.getvalue()

        if isinstance(obj, int):
            return ObjType.Integer.value + struct.pack("I", obj)

        return ObjType.General.value + cloudpickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def deserialize(payload: bytes) -> Any:
        # Slice with [0:1] to keep the tag as bytes; indexing with [0] would return an int and not match the enum values.
        obj_type = ObjType(payload[0:1])
        payload = payload[1:]

        if obj_type == ObjType.DataFrame:
            buf = BytesIO(payload)
            return pd.read_parquet(buf)

        if obj_type == ObjType.Integer:
            return struct.unpack("I", payload)[0]

        return cloudpickle.loads(payload)
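The custom serializer is then passed to the Client when it is constructed. A brief sketch follows; the serializer keyword argument name is an assumption, so check the Client signature in your Scaler version:

# Assumption: the Client accepts the custom serializer via a `serializer` keyword argument.
client = Client(address="tcp://127.0.0.1:2345", serializer=CustomSerializer())

# Functions, arguments, and results submitted through this client now pass through CustomSerializer,
# so DataFrame arguments and results are transferred as Parquet rather than pickled objects.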