Examples

API Usage Examples

Backend Setup

"""
Shows the two ways of initializing a Parfun backend.

Usage:

    $ git clone https://github.com/Citi/parfun && cd parfun
    $ python -m examples.api_usage.backend_setup
"""

import parfun as pf


if __name__ == "__main__":
    # Set the parallel backend process-wide.
    pf.set_parallel_backend("local_multiprocessing")

    # Set the parallel backend within a Python context manager.
    with pf.set_parallel_backend_context("scaler_remote", scheduler_address="tcp://scaler.cluster:1243"):
        ...  # Will run the parallel tasks on a remotely set up Scaler cluster.
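
Once a backend is set, any function decorated with `pf.parallel` is called like a regular Python function. Below is a minimal, illustrative sketch: the `double_values` function is not part of the Parfun API, and it only reuses the partitioning helpers shown elsewhere on this page.

from typing import List

import parfun as pf


@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
)
def double_values(values: List[int]) -> List[int]:
    # Illustrative only: doubles every element of a list chunk.
    return [2 * value for value in values]


if __name__ == "__main__":
    with pf.set_parallel_backend_context("local_multiprocessing"):
        print(double_values(list(range(10))))  # [0, 2, 4, ..., 18]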

Partitioning API

all_arguments

"""
Uses `all_arguments` to partition all the input data of a parallel function.

Usage:

    $ git clone https://github.com/Citi/parfun && cd parfun
    $ python -m examples.api_usage.all_arguments
"""

import pandas as pd

import parfun as pf


@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_group(by=["year", "month"])),
    combine_with=pf.dataframe.concat,
)
def monthly_sum(sales: pd.DataFrame, costs: pd.DataFrame) -> pd.DataFrame:
    merged = pd.merge(sales, costs, on=["year", "month", "day"], how="outer")
    # Group and sum by day
    grouped = merged.groupby(["year", "month", "day"], as_index=False).sum(numeric_only=True)

    return grouped


if __name__ == "__main__":
    sales = pd.DataFrame({
        "year": [2024, 2024, 2024],
        "month": [1, 1, 2],
        "day": [1, 2, 1],
        "sales": [100, 200, 150]
    })

    costs = pd.DataFrame({
        "year": [2024, 2024, 2024],
        "month": [1, 1, 2],
        "day": [1, 2, 1],
        "costs": [50, 70, 80]
    })

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = monthly_sum(sales, costs)

    print(result)
    #     year  month  day  sales  costs
    # 0  2024      1    1    100     50
    # 1  2024      1    2    200     70
    # 2  2024      2    1    150     80

per_argument

"""
Uses `per_argument` to partition the input data from multiple arguments.

Usage:

    $ git clone https://github.com/Citi/parfun && cd parfun
    $ python -m examples.api_usage.per_argument
"""

from typing import List

import pandas as pd

import parfun as pf


@pf.parallel(
    split=pf.per_argument(
        factors=pf.py_list.by_chunk,
        dataframe=pf.dataframe.by_row,
    ),
    combine_with=pf.dataframe.concat,
)
def multiply_by_row(factors: List[int], dataframe: pd.DataFrame) -> pd.DataFrame:
    assert len(factors) == len(dataframe)
    return dataframe.multiply(factors, axis=0)


if __name__ == "__main__":
    dataframe = pd.DataFrame({
        "A": [1, 2, 3],
        "B": [4, 5, 6]
    })

    factors = [10, 20, 30]

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = multiply_by_row(factors, dataframe)

    print(result)
    #     A    B
    # 0  10   40
    # 1  40  100
    # 2  90  180

Custom Partition Function

"""
Shows how to use custom Python generators and functions as partitioning and combining functions.

Usage:

    $ git clone https://github.com/Citi/parfun && cd parfun
    $ python -m examples.api_usage.custom_generators
"""

from typing import Generator, Iterable, Tuple

import pandas as pd

import parfun as pf


def partition_by_day_of_week(dataframe: pd.DataFrame) -> Generator[Tuple[pd.DataFrame], None, None]:
    """Divides the computation on the "datetime" value by day of the week (Monday, Tuesday, ...)."""

    for _, partition in dataframe.groupby(dataframe["datetime"].dt.day_of_week):
        yield partition,  # Should always yield a tuple that matches the input parameters.


def combine_results(dataframes: Iterable[pd.DataFrame]) -> pd.DataFrame:
    """Collects the results by concatenating them, making sure the values stay sorted by date."""
    return pd.concat(dataframes).sort_values(by="datetime")


@pf.parallel(
    split=pf.all_arguments(partition_by_day_of_week),
    combine_with=combine_results,
)
def daily_mean(dataframe: pd.DataFrame) -> pd.DataFrame:
    return dataframe.groupby(dataframe["datetime"].dt.date).mean(numeric_only=True)


if __name__ == "__main__":
    dataframe = pd.DataFrame({
        # Probing times
        "datetime": pd.to_datetime([
            "2025-04-01 06:00", "2025-04-01 18:00", "2025-04-02 10:00", "2025-04-03 14:00", "2025-04-03 23:00",
            "2025-04-04 08:00", "2025-04-05 12:00", "2025-04-06 07:00", "2025-04-06 20:00", "2025-04-07 09:00",
            "2025-04-08 15:00", "2025-04-09 11:00", "2025-04-10 13:00", "2025-04-11 06:00", "2025-04-12 16:00",
            "2025-04-13 17:00", "2025-04-14 22:00", "2025-04-15 10:00", "2025-04-16 09:00", "2025-04-17 13:00",
            "2025-04-18 14:00", "2025-04-19 18:00", "2025-04-20 07:00", "2025-04-21 20:00", "2025-04-22 15:00",
        ]),
        # Temperature values (°C)
        "temperature": [
            7.2, 10.1, 9.8, 12.5, 11.7,
            8.9, 13.0, 7.5, 10.8, 9.3,
            12.1, 11.5, 13.3, 6.8, 12.7,
            13.5, 9.2, 10.0, 9.9, 11.8,
            12.4, 10.6, 7.9, 9.5, 11.6,
        ],
        # Humidity values (%)
        "humidity": [
            85, 78, 80, 75, 76,
            88, 73, 89, 77, 84,
            72, 74, 70, 90, 71,
            69, 86, 81, 83, 76,
            74, 79, 87, 82, 73,
        ]
    })

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = daily_mean(dataframe)

    print(result)
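
The same contract works for non-dataframe arguments: the partitioning generator yields one tuple of arguments per partition, and the combining function receives the per-partition results. Here is a minimal, illustrative sketch; `partition_by_parity` and `total` are only examples, not part of the Parfun API.

from typing import Generator, Iterable, List, Tuple

import parfun as pf


def partition_by_parity(values: List[int]) -> Generator[Tuple[List[int]], None, None]:
    # Yields one tuple per partition, matching the parallel function's parameters.
    yield [value for value in values if value % 2 == 0],
    yield [value for value in values if value % 2 != 0],


def combine_sums(sums: Iterable[int]) -> int:
    return sum(sums)


@pf.parallel(
    split=pf.all_arguments(partition_by_parity),
    combine_with=combine_sums,
)
def total(values: List[int]) -> int:
    return sum(values)


if __name__ == "__main__":
    with pf.set_parallel_backend_context("local_multiprocessing"):
        print(total(list(range(10))))  # 45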

Enforced Partition Size

"""
Shows how to control the partition size of a parallel function with `fixed_partition_size` and
`initial_partition_size`.

Usage:

    $ git clone https://github.com/Citi/parfun && cd parfun
    $ python -m examples.api_usage.partition_size
"""

import numpy as np
import pandas as pd

import parfun as pf


# With `fixed_partition_size`, the input dataframe is always split into chunks of 1000 rows.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    fixed_partition_size=1000,
)
def fixed_partition_size_sum(dataframe: pd.DataFrame) -> float:
    return dataframe.values.sum()


# With `initial_partition_size`, the input dataframe is initially split into chunks of 1000 rows, until Parfun's
# machine-learning algorithm finds a better estimate.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    initial_partition_size=1000,
)
def initial_partition_size_sum(dataframe: pd.DataFrame) -> float:
    return dataframe.values.sum()


# Both `fixed_partition_size` and `initial_partition_size` accept a callable instead of an integer value. This
# allows partition sizes to be computed from the input parameters.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    initial_partition_size=lambda dataframe: max(10, len(dataframe) // 4),
)
def computed_partition_size_sum(dataframe: pd.DataFrame) -> float:
    return dataframe.values.sum()


if __name__ == "__main__":
    dataframe = pd.DataFrame(
        np.random.randint(0, 100, size=(100, 3)),
        columns=["alpha", "beta", "gamma"],
    )

    with pf.set_parallel_backend_context("local_multiprocessing"):
        print(fixed_partition_size_sum(dataframe))
        print(initial_partition_size_sum(dataframe))
        print(computed_partition_size_sum(dataframe))

Profiling

"""
Demonstrates the use of the `profile` and `trace_export` parameters for profiling Parfun's performance.

Usage:

    $ git clone https://github.com/Citi/parfun && cd parfun
    $ python -m examples.api_usage.profiling
"""

import random
from typing import List

import parfun as pf


@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=sum,
    profile=True,
    trace_export="parallel_sum_trace.csv",
)
def parallel_sum(values: List) -> int:
    return sum(values)


if __name__ == "__main__":
    N_VALUES = 100_000
    values = [random.randint(0, 99) for _ in range(0, N_VALUES)]

    with pf.set_parallel_backend_context("local_multiprocessing"):
        print("Sum =", parallel_sum(values))
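
The exported trace is a plain CSV file that can be inspected with any tabular tool. Below is a minimal sketch, assuming the example above has been run from the current directory; the exact columns depend on the Parfun version, so it only prints whatever is there.

import pandas as pd

# Loads the trace exported by the example above and shows its first rows.
trace = pd.read_csv("parallel_sum_trace.csv")
print(trace.head())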

Nested Parfun Calls

"""
Shows how a Parfun function can be called from within another Parfun function.

Usage:

    $ git clone https://github.com/Citi/parfun && cd parfun
    $ python -m examples.api_usage.nested_functions
"""

import pprint
import random
from typing import List

import parfun as pf


@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
)
def add_vectors(vec_a: List, vec_b: List) -> List:
    """Add two vectors, element-wise."""
    return [a + b for a, b in zip(vec_a, vec_b)]


@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
)
def add_matrices(mat_a: List[List], mat_b: List[List]) -> List[List]:
    """Add two matrices, row by row."""
    return [add_vectors(vec_a, vec_b) for vec_a, vec_b in zip(mat_a, mat_b)]


if __name__ == "__main__":
    N_ROWS, N_COLS = 10, 10

    mat_a = [[random.randint(0, 99) for _ in range(0, N_COLS)] for _ in range(0, N_ROWS)]
    mat_b = [[random.randint(0, 99) for _ in range(0, N_COLS)] for _ in range(0, N_ROWS)]

    print("A =")
    pprint.pprint(mat_a)

    print("B =")
    pprint.pprint(mat_b)

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = add_matrices(mat_a, mat_b)

    print("A + B =")
    pprint.pprint(result)

Application Examples

Count Bigrams in a Text in Parallel

"""
Counts the most common two-letter sequences (bigrams) in the content of a URL.

Usage:

    $ git clone https://github.com/Citi/parfun && cd parfun
    $ python -m examples.count_bigrams.main
"""

import collections
import ssl

from typing import Counter, Iterable, List
from urllib.request import urlopen

import psutil

import parfun as pf


def sum_counters(counters: Iterable[Counter[str]]) -> Counter[str]:
    return sum(counters, start=collections.Counter())


@pf.parallel(
    split=pf.per_argument(
        lines=pf.py_list.by_chunk
    ),
    combine_with=sum_counters,
)
def count_bigrams(lines: List[str]) -> Counter:
    counter: Counter[str] = collections.Counter()

    for line in lines:
        for word in line.split():
            for first, second in zip(word, word[1:]):
                bigram = f"{first}{second}"
                counter[bigram] += 1

    return counter


if __name__ == "__main__":
    N_WORKERS = psutil.cpu_count(logical=False)
    URL = "https://www.gutenberg.org/ebooks/100.txt.utf-8"
    TOP_K = 10

    with urlopen(URL, context=ssl._create_unverified_context()) as response:
        content = response.read().decode("utf-8").splitlines()

    with pf.set_parallel_backend_context("local_multiprocessing", max_workers=N_WORKERS):
        counts = count_bigrams(content)

    print(f"Top {TOP_K} bigrams:")
    for bigram, count in counts.most_common(TOP_K):
        print(f"\t{bigram:<10}:\t{count}")
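
For reference, the same counts can be computed sequentially with a plain `collections.Counter`. The `count_bigrams_sequential` function below is only an illustrative sanity check for the parallel result, not part of the example module.

import collections
from typing import Counter, List


def count_bigrams_sequential(lines: List[str]) -> Counter[str]:
    # Same bigram logic as the parallel version above, without any partitioning.
    counter: Counter[str] = collections.Counter()
    for line in lines:
        for word in line.split():
            counter.update(word[i:i + 2] for i in range(len(word) - 1))
    return counter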

Parallel Training of a Random Forest on California Housing Data

"""
Trains an ensemble of decision tree regressors on the California housing dataset from scikit-learn.

Compares the training time with and without splitting the training dataset using Parfun.

Usage:

    $ git clone https://github.com/Citi/parfun && cd parfun
    $ python -m examples.california_housing.main
"""

import timeit

from typing import List

import numpy as np
import pandas as pd
import psutil

from sklearn.datasets import fetch_california_housing
from sklearn.base import RegressorMixin
from sklearn.tree import DecisionTreeRegressor

import parfun as pf


class MeanRegressor(RegressorMixin):
    def __init__(self, regressors: List[RegressorMixin]) -> None:
        super().__init__()
        self._regressors = regressors

    def predict(self, X):
        # Averages the predictions of the underlying regressors, sample by sample.
        return np.mean([regressor.predict(X) for regressor in self._regressors], axis=0)


@pf.parallel(
    split=pf.per_argument(dataframe=pf.dataframe.by_row),
    combine_with=lambda regressors: MeanRegressor(list(regressors))
)
def train_regressor(dataframe: pd.DataFrame, feature_names: List[str], target_name: str) -> RegressorMixin:
    regressor = DecisionTreeRegressor()
    regressor.fit(dataframe[feature_names], dataframe[[target_name]])

    return regressor


if __name__ == "__main__":
    N_WORKERS = psutil.cpu_count(logical=False)

    dataset = fetch_california_housing(download_if_missing=True)

    feature_names = dataset["feature_names"]
    target_name = dataset["target_names"][0]

    dataframe = pd.DataFrame(dataset["data"], columns=feature_names)
    dataframe[target_name] = dataset["target"]

    N_MEASURES = 5

    with pf.set_parallel_backend_context("local_single_process"):
        regressor = train_regressor(dataframe, feature_names, target_name)

        duration = (
            timeit.timeit(lambda: train_regressor(dataframe, feature_names, target_name), number=N_MEASURES)
            / N_MEASURES
        )

        print("Sequential training duration:", duration)

    with pf.set_parallel_backend_context("local_multiprocessing", max_workers=N_WORKERS):
        regressor = train_regressor(dataframe, feature_names, target_name)

        duration = (
            timeit.timeit(lambda: train_regressor(dataframe, feature_names, target_name), number=N_MEASURES)
            / N_MEASURES
        )

        print("Parallel training duration:", duration)

Compute Electricity Production Statistics in Parallel

"""
Based on the monthly electricity production data from ENTSO-E, plots the monthly share of renewable and other energy
sources in European electricity production.

Usage:

    $ git clone https://github.com/Citi/parfun && cd parfun
    $ pip install -r examples/requirements.txt
    $ python -m examples.europe_electricity.main [--plot]
"""

import sys
from typing import List

import pandas as pd

import parfun as pf


def fetch_production_data(year: int) -> pd.DataFrame:
    """
    Downloads the monthly production data for the given year.

    Sourced from https://www.entsoe.eu/data/power-stats/.
    """

    url = f"https://www.entsoe.eu/publications/data/power-stats/{year}/monthly_domestic_values_{year}.csv"

    result = pd.read_csv(url, sep=r"\t|,|;", engine="python")

    # Some newer datasets use "Area" instead of "Country".
    if "Area" in result.columns:
        result["Country"] = result["Area"]

    return result[["Year", "Month", "Category", "Country", "ProvidedValue"]]


def make_consumption_negative(production_data: pd.DataFrame) -> pd.DataFrame:
    """
    Turn consumption values into negative production values.

    Some production categories have positive consumption values (e.g. "Consumption of Hydro Water Reservoir"). This
    function transforms these values into their production counterparts, but with a negative value. This simplifies
    subsequent processing.
    """

    PREFIX = "Consumption of "

    result = production_data.copy()

    is_consumption = result["Category"].str.startswith(PREFIX)

    result.loc[is_consumption, "Category"] = result.loc[is_consumption, "Category"].str.replace(PREFIX, "", regex=False)
    result.loc[is_consumption, "ProvidedValue"] *= -1

    return result


def group_production_by_type(production_data: pd.DataFrame) -> pd.DataFrame:
    """Groups and sums all production data by type ("Fossil", "Nuclear", "Renewable" and "Other")."""

    fossil_sources = {
        "Fossil Gas", "Fossil Hard coal", "Fossil Oil",
        "Fossil Brown coal/Lignite", "Fossil Coal-derived gas",
        "Fossil Oil shale", "Fossil Peat",
    }
    nuclear_sources = {"Nuclear"}
    renewable_sources = {
        "Biomass", "Solar", "Wind Onshore", "Wind Offshore", "Geothermal",
        "Hydro Pumped Storage", "Hydro Run-of-river and poundage",
        "Hydro Water Reservoir", "Marine", "Other renewable",
    }

    def map_category(category: str) -> str:
        if category in fossil_sources:
            return "Fossil"
        elif category in nuclear_sources:
            return "Nuclear"
        elif category in renewable_sources:
            return "Renewable"
        else:
            return "Other"

    result = production_data.copy()

    result["EnergyType"] = result["Category"].map(map_category)
    del result["Category"]

    return result.groupby(["Year", "Month", "EnergyType"])["ProvidedValue"].sum().reset_index()


def monthly_percentage_production(production_data: pd.DataFrame) -> pd.DataFrame:
    """Returns the monthly production percentage for every month and every energy source type."""

    result = production_data.pivot_table(index=["Year", "Month"], columns="EnergyType", values="ProvidedValue")

    result = result.div(result.sum(axis=1), axis=0) * 100  # make it percentages

    # Uses a datetime index for year-month.
    result.index = pd.to_datetime({
        "year": result.index.get_level_values(0),
        "month": result.index.get_level_values(1),
        "day": 1,
    })

    result = result.sort_index(ascending=True)  # sort by date

    return result


@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.dataframe.concat,
    initial_partition_size=2,
)
def get_monthly_percentage_production(years: List[int]) -> pd.DataFrame:
    processed_yearly_data = []
    for year in years:
        yearly_production_data = fetch_production_data(year)

        yearly_production_data = make_consumption_negative(yearly_production_data)
        yearly_production_data = group_production_by_type(yearly_production_data)
        yearly_production_data = monthly_percentage_production(yearly_production_data)

        processed_yearly_data.append(yearly_production_data)

    return pd.concat(processed_yearly_data)


def plot_electricity_production(production_percentages: pd.DataFrame) -> None:
    import matplotlib.pyplot as plt

    colors = {
        "Fossil": "lightcoral",
        "Nuclear": "violet",
        "Renewable": "lightgreen",
        "Other": "lightsteelblue",
    }

    production_percentages.index = production_percentages.index.strftime("%b %Y")
    production_percentages.plot(kind="bar", stacked=True, figsize=(10, 6), width=1, color=colors)

    plt.title("Europe's monthly electricity production by source")
    plt.ylabel("Percentage (%)")
    plt.xlabel("Month")
    plt.legend(title="Energy source", loc="upper left")
    plt.grid(axis="y", linestyle="--")
    plt.ylim(0, 100)

    plt.tight_layout()
    plt.show()


def main():
    YEARS = list(range(2019, 2025))

    with pf.set_parallel_backend_context("local_multiprocessing"):
        processed_data = get_monthly_percentage_production(YEARS)

    if "--plot" in sys.argv[1:]:
        plot_electricity_production(processed_data)
    else:
        print(processed_data)


if __name__ == "__main__":
    main()

Compute Portfolio Metrics in Parallel

"""
Based on a portfolio of stocks, computes basic statistics.

Usage:

    $ git clone https://github.com/Citi/parfun && cd parfun
    $ python -m examples.portfolio_metrics.main
"""

from typing import List

import pandas as pd

import parfun as pf


@pf.parallel(
    split=pf.per_argument(portfolio=pf.dataframe.by_group(by="country")),
    combine_with=pf.dataframe.concat,
)
def relative_metrics(portfolio: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    """
    Computes relative metrics (difference to the mean, squared difference to the mean, standardized difference) for
    each of the requested columns, grouped by country.
    """

    output = portfolio.copy()  # do not modify the input dataframe.

    for country in output["country"].unique():
        for column in columns:
            values = output.loc[output["country"] == country, column]

            mean = values.mean()
            std = values.std()

            output.loc[output["country"] == country, f"{column}_diff_to_mean"] = values - mean
            output.loc[output["country"] == country, f"{column}_sq_diff_to_mean"] = (values - mean) ** 2
            output.loc[output["country"] == country, f"{column}_relative_to_mean"] = (values - mean) / std

    return output


if __name__ == "__main__":
    portfolio = pd.DataFrame({
        "company": ["Apple", "Citigroup", "ASML", "Volkswagen", "Tencent"],
        "industry": ["technology", "banking", "technology", "manufacturing", "manufacturing"],
        "country": ["US", "US", "NL", "DE", "CN"],
        "market_cap": [2828000000000, 80310000000, 236000000000, 55550000000, 345000000000],
        "revenue": [397000000000, 79840000000, 27180000000, 312000000000, 79000000000],
        "workforce": [161000, 240000, 39850, 650951, 104503]
    })

    with pf.set_parallel_backend_context("local_multiprocessing"):
        metrics = relative_metrics(portfolio, ["market_cap", "revenue"])

    print(metrics)
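
For comparison, an equivalent sequential computation can be written with a pandas group-by transform. The `relative_metrics_sequential` function below is only an illustrative sketch, not part of the example module.

from typing import List

import pandas as pd


def relative_metrics_sequential(portfolio: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    # Same metrics as the parallel version above, computed with vectorized group-by transforms.
    output = portfolio.copy()
    grouped = output.groupby("country")

    for column in columns:
        mean = grouped[column].transform("mean")
        std = grouped[column].transform("std")

        output[f"{column}_diff_to_mean"] = output[column] - mean
        output[f"{column}_sq_diff_to_mean"] = (output[column] - mean) ** 2
        output[f"{column}_relative_to_mean"] = (output[column] - mean) / std

    return output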