Quick start

When to use it

Parfun works well with computations that are CPU-intensive and can be easily divided into independent sub-tasks.

[Figure: the scatter/gather parallelization pattern]

Here are a few examples of computations that can be easily parallelized:

  • ✔ Filtering tasks (e.g. pandas.DataFrame.filter())

  • ✔ Operations that are associative or independent (e.g. matrix addition, sum …)

  • ✔ By-row data processing tasks (e.g. cleaning or normalizing input data).

Other tasks cannot easily be parallelized:

  • ✖ Computations on non-partitionable datasets (e.g. median computation, sorting)

  • ✖ I/O intensive tasks (file loading, network communications)

  • ✖ Very short (< 100ms) tasks.

    These tasks are too small for the parallelism gains to outweigh the overhead introduced by the parallelization system. Always keep in mind that parallelization is a trade-off between CPU speed and the time needed to transfer data between processes.

Setup and backend selection

First, add the parfun package to your requirements.txt file, or install it using pip:

pip install parfun

The above command only installs the base package. If you wish to use a more advanced computing backend, such as Scaler or Dask, or to enable Pandas support, use the scaler, dask and/or pandas extras:

pip install "parfun[dask,scaler,pandas]"

The library relies on a registered computing backend to schedule and distribute sub-tasks among multiple worker processes.

Before using the library, the user should select the backend instance. This can either be done process-wide with set_parallel_backend(), or temporarily using a Python context manager with set_parallel_backend_context().

import parfun as pf


if __name__ == "__main__":
    # Set the parallel backend process-wise.
    pf.set_parallel_backend("local_multiprocessing")

    # Set the parallel backend with a Python context.
    with pf.set_parallel_backend_context("scaler_remote", scheduler_address="tcp://scaler.cluster:1243"):
        ...  # Will run the parallel tasks over a remotely setup Scaler cluster.

See set_parallel_backend() for a description of the available backend options.

Your first parallel function

The following example does basic statistical analysis on a stock portfolio. By using Parfun, these metrics are calculated in parallel, splitting the computation by country.

from typing import List

import pandas as pd

import parfun as pf


@pf.parallel(
    split=pf.per_argument(portfolio=pf.dataframe.by_group(by="country")),
    combine_with=pf.dataframe.concat,
)
def relative_metrics(portfolio: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    """
    Computes relative metrics (difference to mean, median ...) of a dataframe, for each of the requested dataframe's
    values, grouped by country.
    """

    output = portfolio.copy()

    for country in output["country"].unique():
        for column in columns:
            values = output.loc[output["country"] == country, column]

            mean = values.mean()
            std = values.std()

            output.loc[output["country"] == country, f"{column}_diff_to_mean"] = values - mean
            output.loc[output["country"] == country, f"{column}_sq_diff_to_mean"] = (values - mean) ** 2
            output.loc[output["country"] == country, f"{column}_relative_to_mean"] = (values - mean) / std

    return output


if __name__ == "__main__":
    portfolio = pd.DataFrame({
        "company": ["Apple", "ASML", "Volkswagen", "Citigroup", "Tencent"],
        "industry": ["technology", "technology", "manufacturing", "banking", "manufacturing"],
        "country": ["US", "NL", "DE", "US", "CN"],
        "market_cap": [2828000000000, 236000000000, 55550000000, 80310000000, 345000000000],
        "revenue": [397000000000, 27180000000, 312000000000, 79840000000, 79000000000],
        "workforce": [161000, 39850, 650951, 240000, 104503]
    })

    with pf.set_parallel_backend_context("local_multiprocessing"):
        metrics = relative_metrics(portfolio, ["market_cap", "revenue"])

    print(metrics)

In this example, the parallel() decorator is configured to parallelize the execution of relative_metrics() by country, and then to concatenate the resulting parallel sub-results:

@pf.parallel(
    split=pf.per_argument(portfolio=pf.dataframe.by_group(by="country")),
    combine_with=pf.dataframe.concat,
)
def relative_metrics(portfolio: pd.DataFrame, columns: List[str]) -> pd.DataFrame:

First, we tell the parallel engine how to partition the data with the split parameter. Note that we can safely split the calculation over countries, as the function itself processes these countries independently.

  • We use per_argument() to tell the parallel engine which arguments to partition on. In the example, we will only partition on the portfolio argument, and we don’t touch the columns argument.

  • We use by_group() to specify that the portfolio dataframe can be partitioned over its country column.

Finally, the parallel engine needs to know how to combine the results of the partitioned calls to relative_metrics(). This is done with the combine_with parameter. In our example, we simply concatenate the resulting dataframes with concat().

When executed, the parallel engine will automatically take care of executing the function in parallel, which schematically looks like this:

[Figure: schematic parallel execution of a Parfun-decorated function]

This parallelization architecture is a well-known pattern named map/reduce or scatter/gather.
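
Schematically, the decorated relative_metrics() behaves like the following sequential scatter/gather helper (a plain-pandas sketch; the scatter_gather name and its signature are illustrative, not part of Parfun's API):

from typing import Callable, List

import pandas as pd


def scatter_gather(portfolio: pd.DataFrame, columns: List[str], func: Callable) -> pd.DataFrame:
    # Scatter: one partition per country, as by_group(by="country") does.
    partitions = (group for _, group in portfolio.groupby("country"))

    # Map: apply the function to each partition; Parfun would run these calls in parallel.
    sub_results = (func(partition, columns) for partition in partitions)

    # Gather: combine the sub-results, as pf.dataframe.concat does.
    return pd.concat(sub_results)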

Modern computers usually have multiple computing units, or cores. These cores excel at computing data-independent tasks, so it's important to specify a partitioning strategy that leverages this.

Partitioning functions

As seen in the example above, the @parallel decorator accepts a partitioning function (split).

Previously, we applied a single partitioning function (by_group()) on a single argument. However, we could also use per_argument() to apply different partitioning functions on various parameters:

from typing import List

import pandas as pd

import parfun as pf


@pf.parallel(
    split=pf.per_argument(
        factors=pf.py_list.by_chunk,
        dataframe=pf.dataframe.by_row,
    ),
    combine_with=pf.dataframe.concat,
)
def multiply_by_row(factors: List[int], dataframe: pd.DataFrame) -> pd.DataFrame:
    assert len(factors) == len(dataframe)
    return dataframe.multiply(factors, axis=0)


if __name__ == "__main__":
    dataframe = pd.DataFrame({
        "A": [1, 2, 3],
        "B": [4, 5, 6]
    })

    factors = [10, 20, 30]

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = multiply_by_row(factors, dataframe)

    print(result)
    #     A    B
    # 0  10   40
    # 1  40  100
    # 2  90  180

We are using two partitioning functions, by_chunk() and by_row(). These split the arguments into equally sized partitions. It's semantically equivalent to iterating over all the partitioned arguments simultaneously:

# PARTITION_SIZE is chosen automatically by Parfun's partition size estimator.
size = min(len(factors), len(dataframe))
for begin in range(0, size, PARTITION_SIZE):
    end = min(begin + PARTITION_SIZE, size)
    multiply_by_row(factors[begin:end], dataframe.iloc[begin:end])

Alternatively, it may sometimes be desirable to run the same partitioning function on all parameters simultaneously with all_arguments():

import pandas as pd

import parfun as pf


@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_group(by=["year", "month"])),
    combine_with=pf.dataframe.concat,
)
def monthly_sum(sales: pd.DataFrame, costs: pd.DataFrame) -> pd.DataFrame:
    merged = pd.merge(sales, costs, on=["year", "month", "day"], how="outer")
    # Group and sum by day
    grouped = merged.groupby(["year", "month", "day"], as_index=False).sum(numeric_only=True)

    return grouped


if __name__ == "__main__":
    sales = pd.DataFrame({
        "year": [2024, 2024, 2024],
        "month": [1, 1, 2],
        "day": [1, 2, 1],
        "sales": [100, 200, 150]
    })

    costs = pd.DataFrame({
        "year": [2024, 2024, 2024],
        "month": [1, 1, 2],
        "day": [1, 2, 1],
        "costs": [50, 70, 80]
    })

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = monthly_sum(sales, costs)

    print(result)
    #     year  month  day  sales  costs
    # 0  2024      1    1    100     50
    # 1  2024      1    2    200     70
    # 2  2024      2    1    150     80

Combining functions

In addition to the partitioning function, the @parallel decorator requires a combining function (combine_with).

The library provides useful partitioning and combining functions to deal with Python lists and Pandas dataframes.
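
For instance, any callable that accepts an iterable of partial results can serve as a combining function. The sketch below (the function name total is illustrative) splits a Python list with by_chunk() and combines the partial sums with the built-in sum():

from typing import List

import parfun as pf


@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=sum,  # any callable taking an iterable of partial results works
)
def total(values: List[int]) -> int:
    return sum(values)


if __name__ == "__main__":
    with pf.set_parallel_backend_context("local_multiprocessing"):
        print(total(list(range(1_000))))  # 499500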

Custom partitioning and combining generators

If you wish to implement more complex partitioning schemes, Parfun allows the use of regular Python generators as partitioning and combining functions:

from typing import Generator, Iterable, Tuple

import pandas as pd

import parfun as pf


def partition_by_day_of_week(dataframe: pd.DataFrame) -> Generator[Tuple[pd.DataFrame], None, None]:
    """Divides the computation on the "datetime" value, by day of the week (Monday, Tuesday ...)."""

    for _, partition in dataframe.groupby(dataframe["datetime"].dt.day_of_week):
        yield partition,  # Should always yield a tuple that matches the input parameters.


def combine_results(dataframes: Iterable[pd.DataFrame]) -> pd.DataFrame:
    """Collects the results by concatenating them, and make sure the values are kept sorted by date."""
    return pd.concat(dataframes).sort_values(by="datetime")


@pf.parallel(
    split=pf.all_arguments(partition_by_day_of_week),
    combine_with=combine_results,
)
def daily_mean(dataframe: pd.DataFrame) -> pd.DataFrame:
    return dataframe.groupby(dataframe["datetime"].dt.date).mean(numeric_only=True)


if __name__ == "__main__":
    dataframe = pd.DataFrame({
        # Probing times
        "datetime": pd.to_datetime([
            "2025-04-01 06:00", "2025-04-01 18:00", "2025-04-02 10:00", "2025-04-03 14:00", "2025-04-03 23:00",
            "2025-04-04 08:00", "2025-04-05 12:00", "2025-04-06 07:00", "2025-04-06 20:00", "2025-04-07 09:00",
            "2025-04-08 15:00", "2025-04-09 11:00", "2025-04-10 13:00", "2025-04-11 06:00", "2025-04-12 16:00",
            "2025-04-13 17:00", "2025-04-14 22:00", "2025-04-15 10:00", "2025-04-16 09:00", "2025-04-17 13:00",
            "2025-04-18 14:00", "2025-04-19 18:00", "2025-04-20 07:00", "2025-04-21 20:00", "2025-04-22 15:00",
        ]),
        # Temperature values (°C)
        "temperature": [
            7.2, 10.1, 9.8, 12.5, 11.7,
            8.9, 13.0, 7.5, 10.8, 9.3,
            12.1, 11.5, 13.3, 6.8, 12.7,
            13.5, 9.2, 10.0, 9.9, 11.8,
            12.4, 10.6, 7.9, 9.5, 11.6,
        ],
        # Humidity values (%)
        "humidity": [
            85, 78, 80, 75, 76,
            88, 73, 89, 77, 84,
            72, 74, 70, 90, 71,
            69, 86, 81, 83, 76,
            74, 79, 87, 82, 73,
        ]
    })

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = daily_mean(dataframe)

    print(result)

To work properly, custom partitioning generators should:

  1. use the yield mechanism, and not return a collection (e.g. a list). Returning a collection instead of using a generator will lead to degraded performance and higher memory usage.

  2. accept the parameters to partition, and yield these partitioned parameters as a tuple, in the same order.

When used with per_argument(), multiple custom generators can be mixed with pre-defined generators, or with other custom generators.
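
For example, a custom generator that partitions several parameters at once must accept all of them and yield each partition as a tuple, in the same order as the function's arguments. A minimal sketch, assuming hypothetical sales and costs dataframes that share a "year" column and carry "sales" and "costs" amount columns:

from typing import Generator, Tuple

import pandas as pd

import parfun as pf


def partition_by_year(sales: pd.DataFrame, costs: pd.DataFrame) -> Generator[Tuple[pd.DataFrame, pd.DataFrame], None, None]:
    """Partitions both dataframes on their common "year" values."""
    for year in sales["year"].unique():
        # Yield the partitioned parameters as a tuple, in the same order as the arguments.
        yield sales[sales["year"] == year], costs[costs["year"] == year]


@pf.parallel(
    split=pf.all_arguments(partition_by_year),
    combine_with=pf.dataframe.concat,
)
def yearly_profit(sales: pd.DataFrame, costs: pd.DataFrame) -> pd.DataFrame:
    """Computes the profit for the single year contained in this partition."""
    return pd.DataFrame({
        "year": [sales["year"].iloc[0]],
        "profit": [sales["sales"].sum() - costs["costs"].sum()],
    })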

Partition size estimate

The library tries to automatically determine the optimal size for the partitions it distributes in parallel.

Read more about how the library computes the optimal partition size.

You can override how the library chooses the partition size by providing either the initial_partition_size: int or the fixed_partition_size: int parameter:

import numpy as np
import pandas as pd

import parfun as pf


# With `fixed_partition_size`, the input dataframe will always be split in chunks of 1000 rows.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    fixed_partition_size=1000,
)
def fixed_partition_size_sum(dataframe: pd.DataFrame) -> float:
    return dataframe.values.sum()


# With `initial_partition_size`, the input dataframe will be split in chunks of 1000 rows until Parfun's
# machine-learning algorithm finds a better estimate.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    initial_partition_size=1000,
)
def initial_partition_size_sum(dataframe: pd.DataFrame) -> float:
    return dataframe.values.sum()


# Both `fixed_partition_size` and `initial_partition_size` can accept a callable instead of an integer value. This
# allows for partition sizes to be computed based on the input parameters.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    initial_partition_size=lambda dataframe: max(10, len(dataframe) // 4),
)
def computed_partition_size_sum(dataframe: pd.DataFrame) -> float:
    return dataframe.values.sum()


if __name__ == "__main__":
    dataframe = pd.DataFrame(
        np.random.randint(0, 100, size=(100, 3)),
        columns=["alpha", "beta", "gamma"],
    )

    with pf.set_parallel_backend_context("local_multiprocessing"):
        print(fixed_partition_size_sum(dataframe))

        print(initial_partition_size_sum(dataframe))

        print(computed_partition_size_sum(dataframe))

Note

The partition size estimation is disabled for custom partition generators.

Nested parallel function calls

Parfun functions can be safely called from other Parfun functions.

Note

Currently, Scaler is the only backend that can run the inner functions in parallel. Other backends will execute the inner functions sequentially, as regular Python functions.

import pprint
import random
from typing import List

import parfun as pf


@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
)
def add_vectors(vec_a: List, vec_b: List) -> List:
    """Add two vectors, element-wise."""
    return [a + b for a, b in zip(vec_a, vec_b)]


@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
)
def add_matrices(mat_a: List[List], mat_b: List[List]) -> List[List]:
    """Add two matrices, row by row."""
    return [add_vectors(vec_a, vec_b) for vec_a, vec_b in zip(mat_a, mat_b)]


if __name__ == "__main__":
    N_ROWS, N_COLS = 10, 10

    mat_a = [[random.randint(0, 99) for _ in range(0, N_COLS)] for _ in range(0, N_ROWS)]
    mat_b = [[random.randint(0, 99) for _ in range(0, N_COLS)] for _ in range(0, N_ROWS)]

    print("A =")
    pprint.pprint(mat_a)

    print("B =")
    pprint.pprint(mat_b)

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = add_matrices(mat_a, mat_b)

    print("A + B =")
    pprint.pprint(result)

Profiling

The easiest way to profile the speedup provided by a parallel function is to use either Python’s timeit module or the IPython/Jupyter %timeit command.
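
For example, a rough wall-clock comparison with the sequential built-in sum() might look like this (a self-contained sketch; the input size and repetition count are arbitrary):

import random
import timeit
from typing import List

import parfun as pf


@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=sum,
)
def parallel_sum(values: List[int]) -> int:
    return sum(values)


if __name__ == "__main__":
    values = [random.randint(0, 99) for _ in range(1_000_000)]

    with pf.set_parallel_backend_context("local_multiprocessing"):
        # Call the function a few times so that the partition size estimator can warm up.
        parallel_duration = timeit.timeit(lambda: parallel_sum(values), number=10)

    sequential_duration = timeit.timeit(lambda: sum(values), number=10)

    print(f"parallel: {parallel_duration:.3f}s, sequential: {sequential_duration:.3f}s")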

In addition, the decorator accepts profile: bool and trace_export: Optional[str] parameters that can be used to get profiling metrics about the execution of a parallel function.

from typing import List
import random

import parfun as pf


@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=sum,
    profile=True,
    trace_export="parallel_sum_trace.csv",
)
def parallel_sum(values: List) -> int:
    return sum(values)


if __name__ == "__main__":
    N_VALUES = 100_000
    values = [random.randint(0, 99) for _ in range(0, N_VALUES)]

    with pf.set_parallel_backend_context("local_multiprocessing"):
        print("Sum =", parallel_sum(values))

Setting profile to True prints a performance summary:

parallel_sum()
     total CPU execution time: 0:00:00.372508.
     compute time: 0:00:00.347168 (93.20%)
             min.: 0:00:00.001521
             max.: 0:00:00.006217
             avg.: 0:00:00.001973
     total parallel overhead: 0:00:00.025340 (6.80%)
             total partitioning: 0:00:00.024997 (6.71%)
             average partitioning: 0:00:00.000142
             total combining: 0:00:00.000343 (0.09%)
     maximum speedup (theoretical): 14.70x
     total partition count: 176
             estimator state: running
             estimated partition size: 3127
  • total CPU execution time is the actual CPU time required to execute the parallel function.

    This duration is larger than the value returned by %timeit. That is because it sums the execution times for all the cores that processed our function. It also includes the additional processing required to run Parfun (e.g. partitioning the input and combining the results).

  • compute time is how much CPU time was spent doing the actual computation.

    This value should roughly match the duration of the original sequential function as measured with %timeit. The min, max and avg values show that the execution times vary across partitions, most probably because the workload is unevenly distributed over the partitioned dataset.

  • total parallel overhead, total partitioning and total combining are the overheads related to the parallel execution of the function.

    These overheads limit the performance gains achievable through parallelization.

  • maximum speedup (theoretical) estimates how much faster the function would run on a parallel machine with an infinite number of cores.

    This value is calculated based on the input data size, function behavior, and the measured parallel overheads.

  • total partition count and estimated partition size describe how Parfun is splitting the input data.

    The library uses heuristics to estimate the optimal partition size. The library tries to find a partition size that provides significant parallel speedup without causing too much parallel overhead. Read more about how the optimal partition size is estimated.

Note

As the library is constantly learning the optimal partition size, the first calls to the parallelized function might not produce the fastest run-times. In such cases, it is recommended to call the function multiple times before analyzing the profiler output.

When the trace_export parameter is set, Parfun dumps the latest parallel function call's metrics to the provided CSV file. All durations in this file are in nanoseconds (10⁻⁹ seconds).
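
The exported file is a regular CSV that can be inspected with any tool, for instance with pandas (a sketch; the exact column layout depends on the Parfun version):

import pandas as pd

# Load the trace written by the `trace_export` parameter above.
trace = pd.read_csv("parallel_sum_trace.csv")
print(trace.head())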