API

parfun.parallel(split: ~typing.Callable[[~parfun.kernel.function_signature.NamedArguments], ~typing.Tuple[~parfun.kernel.function_signature.NamedArguments, ~typing.Generator[~parfun.kernel.function_signature.NamedArguments, None, None] | ~typing.Generator[~typing.Tuple[int, ~parfun.kernel.function_signature.NamedArguments] | None, int, None]]], combine_with: ~typing.Callable[[~typing.Iterable[~typing.Any]], ~typing.Any], initial_partition_size: int | ~typing.Callable[[~typing.Any], int] | None = None, fixed_partition_size: int | ~typing.Callable[[~typing.Any], int] | None = None, profile: bool = False, trace_export: str | None = None, partition_size_estimator_factory: ~typing.Callable[[], ~parfun.partition_size_estimator.mixins.PartitionSizeEstimator] = <class 'parfun.partition_size_estimator.linear_regression_estimator.LinearRegessionEstimator'>) Callable

Returns a function decorator that automatically parallelizes a function.

@pf.parallel(
    split=pf.per_argument(
        values=pf.py_list.by_chunk,
    ),
    combine_with=pf.py_list.concat
)
def multiply_by_constant(values: Iterable[int], constant: int):
    return [v * constant for v in values]

# This would be functionally equivalent to running the function inside a single for loop:

results = []
for partition in pf.py_list.by_chunk(values):
    results.append(multiply_by_constant(partition, constant))

return combine_with(results)
Parameters:
  • split

    Partition the data based on the provided partitioning function.

    See api for the list of predefined partitioning functions.

  • combine_with (Callable) – aggregates the results by running the function.

  • initial_partition_size (int | Callable[[PartitionType], int] | None) –

    Overrides the first estimate from the partition size estimator.

    If the value is a callable, the function will be provided with the input to be partitioned and shall return the initial partition size to use.

  • fixed_partition_size (int | Callable[[PartitionType], int] | None) –

    Uses a constant partition size and do not run the partition size estimator.

    If the value is a callable, the function will be provided with the input to be partitioned and shall return the partition size to use.

  • profile (bool) – if true, prints additional debugging information about the parallelization overhead.

  • trace_export (str) – if defined, will export the execution time to the provided CSV file’s path.

  • partition_size_estimator_factory (Callable[[], PartitionSizeEstimator]) – the partition size estimator class to use

Returns:

a decorated function

Return type:

Callable

Backend setup

parfun.set_parallel_backend(backend: str | BackendEngine, *args, **kwargs) None

Initializes and sets the current parfun backend.

set_parallel_backend("local_multiprocessing", max_workers=4, is_process=False)
Parameters:
  • backend (Union[str, BackendEngine]) –

    Supported backend options:

    • "none": disable the current parallel backend.

      Functions decorated with parfun() will run sequentially as if not decorated.

      Partitioning and combining functions will be ignored.

    • "local_single_process": runs the tasks inside the calling Python process.

      Functions decorated with parfun() will partition the input data, and run the combining function on the output data, but will also execute the function inside the calling Python process.

      Mostly intended for debugging purposes.

      See LocalSingleProcessBackend.

    • "local_multiprocessing": runs the tasks in parallel using Python multiprocessing processes.

      See LocalMultiprocessingBackend.

    • "scaler_local": runs the tasks in parallel using an internally managed Scaler cluster.

      See ScalerLocalBackend.

    • "scaler_remote": runs the tasks in parallel using an externally managed Dask cluster.

      See ScalerRemoteBackend.

    • "dask_local": runs the tasks in parallel using an internally managed Dask cluster.

      See DaskLocalClusterBackend.

    • "dask_remote": runs the tasks in parallel using an externally managed Dask cluster.

      See DaskRemoteClusterBackend.

    • "dask_current": runs the tasks in parallel using the currently running Dask client (get_client()).

      See DaskCurrentBackend.

  • args – Additional positional parameters for the backend constructor

  • kwargs – Additional keyword parameters for the backend constructor.

Return type:

None

parfun.set_parallel_backend_context(backend: str | BackendEngine, *args, **kwargs)

Sets a new parallel backend instance in a contextlib’s context.

with set_parallel_backend_context("local_single_processing"):
    some_parallel_computation()
Parameters:

backend (Union[str, BackendEngine]) – See set_parallel_backend().

parfun.get_parallel_backend() BackendEngine | None
Returns:

the current backend instance, or None if no backend is currently set.

Return type:

Optional[BackendEngine]

Backend instances

class parfun.backend.mixins.BackendEngine

Asynchronous task manager interface.

abstract allows_nested_tasks() bool

Indicates if Parfun can submit new tasks from other tasks.

abstract session() BackendSession

Returns a new managed session for submitting tasks.

with backend.session() as session:
    arg_ref = session.preload_value(arg)

    future = session.submit(fn, arg_ref)

    print(future.result())
abstract shutdown()

Shutdowns all resources required by the backend engine.

class parfun.backend.mixins.BackendSession

A task submitting session to a backend engine that manages the lifecycle of the task objects (preloaded values, argument values and future objects).

preload_value(value: Any) Any

Preloads a value to the backend engine.

The returned value will be used when calling submit() instead of the original value.

abstract submit(fn: Callable, *args, **kwargs) ProfiledFuture

Executes an asynchronous computation.

Blocking if no computing resource is available.

class parfun.backend.local_single_process.LocalSingleProcessBackend
allows_nested_tasks() bool

Indicates if Parfun can submit new tasks from other tasks.

session() BackendSession

Returns a new managed session for submitting tasks.

with backend.session() as session:
    arg_ref = session.preload_value(arg)

    future = session.submit(fn, arg_ref)

    print(future.result())
shutdown()

Shutdowns all resources required by the backend engine.

class parfun.backend.local_single_process.LocalSingleProcessSession
submit(fn: Callable, *args, **kwargs) ProfiledFuture

Executes an asynchronous computation.

Blocking if no computing resource is available.

class parfun.backend.local_multiprocessing.LocalMultiprocessingBackend(max_workers: int = 1, is_process: bool = True, **kwargs)

Concurrent engine that uses Python builtin multiprocessing module.

allows_nested_tasks() bool

Indicates if Parfun can submit new tasks from other tasks.

session() LocalMultiprocessingSession

Returns a new managed session for submitting tasks.

with backend.session() as session:
    arg_ref = session.preload_value(arg)

    future = session.submit(fn, arg_ref)

    print(future.result())
shutdown(wait=True)

Shutdowns all resources required by the backend engine.

class parfun.backend.local_multiprocessing.LocalMultiprocessingSession(underlying_executor: Executor)
submit(fn, *args, **kwargs) ProfiledFuture

Executes an asynchronous computation.

Blocking if no computing resource is available.

class parfun.backend.dask.DaskBaseBackend(n_workers: int)
allows_nested_tasks() bool

Indicates if Parfun can submit new tasks from other tasks.

session() DaskSession

Returns a new managed session for submitting tasks.

with backend.session() as session:
    arg_ref = session.preload_value(arg)

    future = session.submit(fn, arg_ref)

    print(future.result())
class parfun.backend.dask.DaskCurrentBackend(n_workers: int)

Uses the current Dask worker context to deduce the backend instance.

This backend should be used by Dask’s worker tasks that desire to access the underlying backend instance.

shutdown()

Shutdowns all resources required by the backend engine.

class parfun.backend.dask.DaskLocalClusterBackend(n_workers: int = 1, dashboard_address=':33333', memory_limit='100GB')

Creates a Dask cluster on the local machine and uses it as a backend engine.

shutdown()

Shutdowns all resources required by the backend engine.

class parfun.backend.dask.DaskRemoteClusterBackend(scheduler_address: str)

Connects to a previously instantiated Dask instance as a backend engine.

shutdown()

Shutdowns all resources required by the backend engine.

class parfun.backend.dask.DaskSession(engine: DaskBaseBackend, n_workers: int)
submit(fn, *args, **kwargs) ProfiledFuture | None

Executes an asynchronous computation.

Blocking if no computing resource is available.

class parfun.backend.scaler.ScalerLocalBackend(scheduler_address: str | None = None, n_workers: int = 1, per_worker_queue_size: int = 1000, allows_nested_tasks: bool = True, logging_paths: Tuple[str, ...] = ('/dev/stdout',), logging_level: str = 'INFO', logging_config_file: str | None = None, **kwargs)

Creates a Scaler cluster on the local machine and uses it as a backend engine.

shutdown()

Shutdowns all resources required by the backend engine.

class parfun.backend.scaler.ScalerRemoteBackend(scheduler_address: str, n_workers: int = 1, allows_nested_tasks: bool = True, **client_kwargs)

Connects to a previously instantiated Scaler instance as a backend engine.

allows_nested_tasks() bool

Indicates if Parfun can submit new tasks from other tasks.

session() ScalerSession

Returns a new managed session for submitting tasks.

with backend.session() as session:
    arg_ref = session.preload_value(arg)

    future = session.submit(fn, arg_ref)

    print(future.result())
shutdown()

Shutdowns all resources required by the backend engine.

class parfun.backend.scaler.ScalerSession(scheduler_address: str, n_workers: int, client_kwargs: Dict)
preload_value(value: Any) ObjectReference

Preloads a value to the backend engine.

The returned value will be used when calling submit() instead of the original value.

submit(fn, *args, **kwargs) ProfiledFuture | None

Executes an asynchronous computation.

Blocking if no computing resource is available.

Partitioning

parfun.partition.object.PartitionGenerator

All partitioning functions must return a Python generator of this type.

There are two ways of writing a partitioning functions:

  • Use regular Python generators (prefered) or iterators, returning partitioned values:

def partition_list_by_chunks(values: List): PartitionGenerator[List]:
    PARTITION_SIZE = len(values) / 100

    for begin in range(0, len(values), PARTITION_SIZE)):
        yield values[begin:begin + PARTITION_SIZE]
  • Use partition size aware Python generators, or smart generators. These are more complex but more efficient. Partition size aware generators must get a suggested partition size through the return value of the yield statement, and yield partition sizes with its partitioned values:

def partition_list_by_chunks(values: List, constant: int) -> PartitionGenerator[Tuple[List, int]]:
    # A first empty call to `yield` is required to obtain the first requested partition size
    requested_partition_size = yield None

    begin = 0
    while begin < len(values):
        end = min(len(values), begin + requested_partition_size)

        partition_size = end - begin
        partition = (values[begin:end], a)

        # Yield the actual partition along its size, and obtain the requested size for the next partition.
        requested_partition_size = yield partition_size, partition

        begin = end

alias of Union[Generator[PartitionType, None, None], Generator[Optional[Tuple[int, PartitionType]], int, None]]

parfun.all_arguments(partition_with: Callable[[...], Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None]]) Callable[[NamedArguments], Tuple[NamedArguments, Generator[NamedArguments, None, None] | Generator[Tuple[int, NamedArguments] | None, int, None]]]

Applies a single partitioning function to all arguments.

@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_group(by=["year", "month"]),
    ...
)
def func(df_1: pd.DataFrame, df_2: pd.DataFrame):
    ...
parfun.multiple_arguments(partition_on: Tuple[str, ...] | str, partition_with: Callable[[...], Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None]]) Callable[[NamedArguments], Tuple[NamedArguments, Generator[NamedArguments, None, None] | Generator[Tuple[int, NamedArguments] | None, int, None]]]

Applies a single partitioning function to multiple, but not all, arguments.

@pf.parallel(
    split=pf.multiple_arguments(
        ("df_1", "df_2"),
        pf.dataframe.by_group(by=["year", "month"]),
    ),
    ...
)
def func(df_1: pd.DataFrame, df_2: pd.DataFrame, constant: int):
    ...
parfun.per_argument(**partition_arg_with: Callable[[...], Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None]]) Callable[[NamedArguments], Tuple[NamedArguments, Generator[NamedArguments, None, None] | Generator[Tuple[int, NamedArguments] | None, int, None]]]

Applies multiple partitioning functions simultaneously on different function arguments.

@pf.parallel(
    split=pf.per_argument(
        df=pf.dataframe.by_row,
        xs=pf.py_list.by_chunk,
    )
)
def func(df: pd.DataFrame, xs: List, constant: int):
    ...
parfun.partition.utility.with_partition_size(generator: Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None], partition_size: int | Callable[[], int] = 1) Generator[PartitionType, None, None]

Runs a partitioning generator without requiring the partition size estimator.

This function uses the provided partition size input to feed the partitioning generator through Python’s generator.send() method, simulating the parallel function’s behavior.

# Runs the `by_row` partitioning function with a random partition size generator.
with_partition_size(
    pf.dataframe.by_row(df_1, df_2),
    partition_size=lambda: random.randint(1, 10)
)
Parameters:
  • partitions_with – the partitioning generator to execute

  • partition_size – a constant partition size, or a function generating partition sizes

Python lists

A collection of pre-define APIs to help users partition and combine collection, such as lists, arrays or tuples.

parfun.py_list.by_chunk(*iterables: Iterable[PartitionType]) Generator[Tuple[Iterable[PartitionType], ...], None, None] | Generator[Tuple[int, Tuple[Iterable[PartitionType], ...]] | None, int, None]

Partition one or multiple iterables by chunks of identical sizes.

ls_1 = [1, 2, 3, 4]
ls_2 = [1, 4, 9, 16]

with_partition_size(by_chunk, ls_1, ls_2, partition_size=2))
# [((1, 2), (1, 4)), ((3, 4), (9, 16))]
parfun.py_list.concat(values: Iterable[List[ListValue]]) List[ListValue]

Chains a collection of lists in a single list.

concat([[1,2], [3], [4, 5]])  # [1, 2, 3, 4, 5]

Pandas dataframes

A collection of pre-define APIs to partition and combine dataframes.

parfun.dataframe.by_group(*args, **kwargs) Callable[[...], Generator[DataFrame, None, None] | Generator[Tuple[int, DataFrame] | None, int, None]]

Partitions one or multiple Pandas dataframes by groups of identical numbers of rows, similar to pandas.DataFrame.groupby().

See pandas.DataFrame.groupby() for function parameters.

df_1 = pd.DataFrame({"country": ["USA", "China", "Belgium"], "capital": ["Washington", "Beijing", "Brussels"]})
df_2 = pd.DataFrame({"country": ["USA", "China", "Belgium"], "iso_code": ["US", "CN", "BE"]})

with_partition_size(df_by_group(by="country")(df_1, df_2), partition_size=1)

# [(   country   capital
#   2  Belgium  Brussels,
#      country iso_code
#   2  Belgium       BE),
#  (  country  capital
#   1   China  Beijing,
#     country iso_code
#   1   China       CN),
#  (  country     capital
#   0     USA  Washington,
#     country iso_code
#   0     USA       US)]
parfun.dataframe.by_row(*dfs: DataFrame) Generator[Tuple[DataFrame, ...], None, None] | Generator[Tuple[int, Tuple[DataFrame, ...]] | None, int, None]

Partitions one or multiple Pandas dataframes by rows.

If multiple dataframes are given, these returned partitions will be of identical number of rows.

df_1 = pd.DataFrame(range(0, 5))
df_2 = df_1 ** 2

with_partition_size(by_row(df_1, df_2), partition_size=2)

#  (   0
#   1  0
#   2  1,
#      0
#   1  0
#   2  1),
#  (   0
#   3  2
#   4  3,
#      0
#   3  4
#   4  9),
#  (   0
#   5  4,
#       0
#   5  16)]
parfun.dataframe.concat(dfs: Iterable[DataFrame]) DataFrame

Similar to pandas.concat().

df_1 = pd.DataFrame([1,2,3])
df_2 = pd.DataFrame([4,5,6])

print(concat([df_1, df_2]))
#    0
# 0  1
# 1  2
# 2  3
# 3  4
# 4  5
# 5  6