Examples
API Usage Examples
Backend Setup
1"""
2Shows the two ways of initializing a Parfun backend.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.backend_setup
8"""
9
10import parfun as pf
11
12
13if __name__ == "__main__":
14 # Set the parallel backend process-wise.
15 pf.set_parallel_backend("local_multiprocessing")
16
17 # Set the parallel backend with a Python context.
18 with pf.set_parallel_backend_context("scaler_remote", scheduler_address="tcp://scaler.cluster:1243"):
19 ... # Will run the parallel tasks over a remotely setup Scaler cluster.
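The snippet below is not part of the repository's examples; it is a minimal sketch showing how a process-wide backend combines with a parallel function, using only primitives that appear in the other examples on this page (`@pf.parallel`, `pf.all_arguments`, `pf.py_list.by_chunk` and `pf.py_list.concat`).

from typing import List

import parfun as pf


@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
)
def double_values(values: List[int]) -> List[int]:
    # Each worker receives a chunk of the input list and doubles it.
    return [2 * value for value in values]


if __name__ == "__main__":
    # Use the process-wide backend set with `set_parallel_backend`.
    pf.set_parallel_backend("local_multiprocessing")

    print(double_values(list(range(10))))  # [0, 2, 4, ..., 18]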
Partitioning API
all_arguments
1"""
2Uses `all_arguments` to partition all the input data of a parallel function.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.all_arguments
8"""
9
10import pandas as pd
11
12import parfun as pf
13
14
15@pf.parallel(
16 split=pf.all_arguments(pf.dataframe.by_group(by=["year", "month"])),
17 combine_with=pf.dataframe.concat,
18)
19def monthly_sum(sales: pd.DataFrame, costs: pd.DataFrame) -> pd.DataFrame:
20 merged = pd.merge(sales, costs, on=["year", "month", "day"], how="outer")
21 # Group and sum by day
22 grouped = merged.groupby(["year", "month", "day"], as_index=False).sum(numeric_only=True)
23
24 return grouped
25
26
27if __name__ == "__main__":
28 sales = pd.DataFrame({
29 "year": [2024, 2024, 2024],
30 "month": [1, 1, 2],
31 "day": [1, 2, 1],
32 "sales": [100, 200, 150]
33 })
34
35 costs = pd.DataFrame({
36 "year": [2024, 2024, 2024],
37 "month": [1, 1, 2],
38 "day": [1, 2, 1],
39 "costs": [50, 70, 80]
40 })
41
42 with pf.set_parallel_backend_context("local_multiprocessing"):
43 result = monthly_sum(sales, costs)
44
45 print(result)
46 # year month day sales costs
47 # 0 2024 1 1 100 50
48 # 1 2024 1 2 200 70
49 # 2 2024 2 1 150 80
per_argument
1"""
2Uses `per_argument` to partition the input data from multiple arguments.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.per_argument
8"""
9
10from typing import List
11
12import pandas as pd
13
14import parfun as pf
15
16
17@pf.parallel(
18 split=pf.per_argument(
19 factors=pf.py_list.by_chunk,
20 dataframe=pf.dataframe.by_row,
21 ),
22 combine_with=pf.dataframe.concat,
23)
24def multiply_by_row(factors: List[int], dataframe: pd.DataFrame) -> pd.DataFrame:
25 assert len(factors) == len(dataframe)
26 return dataframe.multiply(factors, axis=0)
27
28
29if __name__ == "__main__":
30 dataframe = pd.DataFrame({
31 "A": [1, 2, 3],
32 "B": [4, 5, 6]
33 })
34
35 factors = [10, 20, 30]
36
37 with pf.set_parallel_backend_context("local_multiprocessing"):
38 result = multiply_by_row(factors, dataframe)
39
40 print(result)
41 # A B
42 # 0 10 40
43 # 1 40 100
44 # 2 90 180
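Only the arguments named in `per_argument` are partitioned; the remaining arguments are passed unchanged to every partition, as the California housing example further down also shows. A minimal sketch of that pattern (the function and argument names below are illustrative, not part of the repository):

from typing import List

import parfun as pf


@pf.parallel(
    # Only `values` is partitioned; `offset` is passed as-is to every partition.
    split=pf.per_argument(values=pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
)
def add_offset(values: List[int], offset: int) -> List[int]:
    return [value + offset for value in values]


if __name__ == "__main__":
    with pf.set_parallel_backend_context("local_multiprocessing"):
        print(add_offset(list(range(6)), offset=100))  # [100, 101, 102, 103, 104, 105]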
Custom Partition Function
1"""
2Shows how to use custom Python generators and functions as partitioning and combining functions.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.custom_generators
8"""
9
10from typing import Generator, Iterable, Tuple
11
12import pandas as pd
13
14import parfun as pf
15
16
17def partition_by_day_of_week(dataframe: pd.DataFrame) -> Generator[Tuple[pd.DataFrame], None, None]:
18 """Divides the computation on the "datetime" value, by day of the week (Monday, Tuesday ...)."""
19
20 for _, partition in dataframe.groupby(dataframe["datetime"].dt.day_of_week):
21 yield partition, # Should always yield a tuple that matches the input parameters.
22
23
24def combine_results(dataframes: Iterable[pd.DataFrame]) -> pd.DataFrame:
25 """Collects the results by concatenating them, and make sure the values are kept sorted by date."""
26 return pd.concat(dataframes).sort_values(by="datetime")
27
28
29@pf.parallel(
30 split=pf.all_arguments(partition_by_day_of_week),
31 combine_with=combine_results,
32)
33def daily_mean(dataframe: pd.DataFrame) -> pd.DataFrame:
34 return dataframe.groupby(dataframe["datetime"].dt.date).mean(numeric_only=True)
35
36
37if __name__ == "__main__":
38 dataframe = pd.DataFrame({
39 # Probing times
40 "datetime": pd.to_datetime([
41 "2025-04-01 06:00", "2025-04-01 18:00", "2025-04-02 10:00", "2025-04-03 14:00", "2025-04-03 23:00",
42 "2025-04-04 08:00", "2025-04-05 12:00", "2025-04-06 07:00", "2025-04-06 20:00", "2025-04-07 09:00",
43 "2025-04-08 15:00", "2025-04-09 11:00", "2025-04-10 13:00", "2025-04-11 06:00", "2025-04-12 16:00",
44 "2025-04-13 17:00", "2025-04-14 22:00", "2025-04-15 10:00", "2025-04-16 09:00", "2025-04-17 13:00",
45 "2025-04-18 14:00", "2025-04-19 18:00", "2025-04-20 07:00", "2025-04-21 20:00", "2025-04-22 15:00",
46 ]),
47 # Temperature values (°C)
48 "temperature": [
49 7.2, 10.1, 9.8, 12.5, 11.7,
50 8.9, 13.0, 7.5, 10.8, 9.3,
51 12.1, 11.5, 13.3, 6.8, 12.7,
52 13.5, 9.2, 10.0, 9.9, 11.8,
53 12.4, 10.6, 7.9, 9.5, 11.6,
54 ],
55 # Humidity values (%)
56 "humidity": [
57 85, 78, 80, 75, 76,
58 88, 73, 89, 77, 84,
59 72, 74, 70, 90, 71,
60 69, 86, 81, 83, 76,
61 74, 79, 87, 82, 73,
62 ]
63 })
64
65 with pf.set_parallel_backend_context("local_multiprocessing"):
66 result = daily_mean(dataframe)
67
68 print(result)
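As the comment in the example notes, a custom partition generator must yield one tuple per partition, with one element per input parameter. The sketch below (an illustrative helper, not part of the repository) shows what this looks like when the parallel function takes two arguments that must be partitioned together:

from typing import Generator, List, Tuple

import parfun as pf


def zipped_chunks(
    xs: List[int], ys: List[int], chunk_size: int = 2
) -> Generator[Tuple[List[int], List[int]], None, None]:
    """Partitions both lists together, yielding one (xs_chunk, ys_chunk) tuple per partition."""
    assert len(xs) == len(ys)
    for start in range(0, len(xs), chunk_size):
        yield xs[start:start + chunk_size], ys[start:start + chunk_size]


@pf.parallel(
    split=pf.all_arguments(zipped_chunks),
    combine_with=pf.py_list.concat,
)
def pairwise_products(xs: List[int], ys: List[int]) -> List[int]:
    return [x * y for x, y in zip(xs, ys)]


if __name__ == "__main__":
    with pf.set_parallel_backend_context("local_multiprocessing"):
        print(pairwise_products([1, 2, 3, 4], [10, 20, 30, 40]))  # [10, 40, 90, 160]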
Enforced Partition Size
1"""
2Uses `all_arguments` to partition all the input data of a parallel function.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.partition_size
8"""
9
10import numpy as np
11import pandas as pd
12
13import parfun as pf
14
15
16# With `fixed_partition_size`, the input dataframe will always be split in chunks of 1000 rows.
17@pf.parallel(
18 split=pf.all_arguments(pf.dataframe.by_row),
19 combine_with=sum,
20 fixed_partition_size=1000,
21)
22def fixed_partition_size_sum(dataframe: pd.DataFrame) -> float:
23 return dataframe.values.sum()
24
25
26# With `initial_partition_size`, the input dataframe will be split in chunks of 1000 rows until Parfun's
27# machine-learning algorithm find a better estimate.
28@pf.parallel(
29 split=pf.all_arguments(pf.dataframe.by_row),
30 combine_with=sum,
31 initial_partition_size=1000,
32)
33def initial_partition_size_sum(dataframe: pd.DataFrame) -> float:
34 return dataframe.values.sum()
35
36
37# Both `fixed_partition_size` and `initial_partition_size` can accept a callable instead of an integer value. This
38# allows for partition sizes to be computed based on the input parameters.
39@pf.parallel(
40 split=pf.all_arguments(pf.dataframe.by_row),
41 combine_with=sum,
42 initial_partition_size=lambda dataframe: max(10, len(dataframe) // 4),
43)
44def computed_partition_size_sum(dataframe: pd.DataFrame) -> float:
45 return dataframe.values.sum()
46
47
48if __name__ == "__main__":
49 dataframe = pd.DataFrame(
50 np.random.randint(0, 100, size=(100, 3)),
51 columns=["alpha", "beta", "gamma"],
52 )
53
54 with pf.set_parallel_backend_context("local_multiprocessing"):
55 print(fixed_partition_size_sum(dataframe))
56 print(initial_partition_size_sum(dataframe))
57 print(computed_partition_size_sum(dataframe))
Profiling
1"""
2Demonstrates the use of the `profile` and `trace_export` parameters for profiling Parfun's performances.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.profiling
8"""
9
10from typing import List
11import random
12
13import parfun as pf
14
15
16@pf.parallel(
17 split=pf.all_arguments(pf.py_list.by_chunk),
18 combine_with=sum,
19 profile=True,
20 trace_export="parallel_sum_trace.csv",
21)
22def parallel_sum(values: List) -> List:
23 return sum(values)
24
25
26if __name__ == "__main__":
27 N_VALUES = 100_000
28 values = [random.randint(0, 99) for _ in range(0, N_VALUES)]
29
30 with pf.set_parallel_backend_context("local_multiprocessing"):
31 print("Sum =", parallel_sum(values))
Nested Parfun Calls
1"""
2Shows how a Parfun function can be called from within another Parfun function.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.nested_functions
8"""
9
10import pprint
11import random
12from typing import List
13
14import parfun as pf
15
16
17@pf.parallel(
18 split=pf.all_arguments(pf.py_list.by_chunk),
19 combine_with=pf.py_list.concat,
20)
21def add_vectors(vec_a: List, vec_b: List) -> List:
22 """Add two vectors, element-wise."""
23 return [a + b for a, b in zip(vec_a, vec_b)]
24
25
26@pf.parallel(
27 split=pf.all_arguments(pf.py_list.by_chunk),
28 combine_with=pf.py_list.concat,
29)
30def add_matrices(mat_a: List[List], mat_b: List[List]) -> List[List]:
31 """Add two matrices, row by row."""
32 return [add_vectors(vec_a, vec_b) for vec_a, vec_b in zip(mat_a, mat_b)]
33
34
35if __name__ == "__main__":
36 N_ROWS, N_COLS = 10, 10
37
38 mat_a = [[random.randint(0, 99) for _ in range(0, N_COLS)] for _ in range(0, N_ROWS)]
39 mat_b = [[random.randint(0, 99) for _ in range(0, N_COLS)] for _ in range(0, N_ROWS)]
40
41 print("A =")
42 pprint.pprint(mat_a)
43
44 print("B =")
45 pprint.pprint(mat_b)
46
47 with pf.set_parallel_backend_context("local_multiprocessing"):
48 result = add_matrices(mat_a, mat_b)
49
50 print("A + B =")
51 pprint.pprint(result)
Application Examples
Count Bigrams in a Text in Parallel
1"""
2Counts the most common two-letters sequences (bigrams) in the content of an URL.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.count_bigrams.main
8"""
9
10import collections
11import psutil
12import ssl
13
14from typing import Counter, Iterable, List
15from urllib.request import urlopen
16
17import parfun as pf
18
19
20def sum_counters(counters: Iterable[Counter[str]]) -> Counter[str]:
21 return sum(counters, start=collections.Counter())
22
23
24@pf.parallel(
25 split=pf.per_argument(
26 lines=pf.py_list.by_chunk
27 ),
28 combine_with=sum_counters,
29)
30def count_bigrams(lines: List[str]) -> Counter:
31 counter: Counter[str] = collections.Counter()
32
33 for line in lines:
34 for word in line.split():
35 for first, second in zip(word, word[1:]):
36 bigram = f"{first}{second}"
37 counter[bigram] += 1
38
39 return counter
40
41
42if __name__ == "__main__":
43 N_WORKERS = psutil.cpu_count(logical=False)
44 URL = "https://www.gutenberg.org/ebooks/100.txt.utf-8"
45 TOP_K = 10
46
47 with urlopen(URL, context=ssl._create_unverified_context()) as response:
48 content = response.read().decode("utf-8").splitlines()
49
50 with pf.set_parallel_backend_context("local_multiprocessing", max_workers=N_WORKERS):
51 counts = count_bigrams(content)
52
53 print(f"Top {TOP_K} words:")
54 for word, count in counts.most_common(TOP_K):
55 print(f"\t{word:<10}:\t{count}")
Parallel Training of a Random Forest on California Housing Data
1"""
2Trains a random tree regressor on the California housing dataset from scikit-learn.
3
4Measures the training time when splitting the learning dataset process using Parfun.
5
6Usage:
7
8 $ git clone https://github.com/Citi/parfun && cd parfun
9 $ python -m examples.california_housing.main
10"""
11
12import psutil
13import timeit
14
15from typing import List
16
17import numpy as np
18import pandas as pd
19
20from sklearn.datasets import fetch_california_housing
21from sklearn.base import RegressorMixin
22from sklearn.tree import DecisionTreeRegressor
23
24import parfun as pf
25
26
27class MeanRegressor(RegressorMixin):
28 def __init__(self, regressors: List[RegressorMixin]) -> None:
29 super().__init__()
30 self._regressors = regressors
31
32 def predict(self, X):
33 return np.mean([regressor.predict(X) for regressor in self._regressors])
34
35
36@pf.parallel(
37 split=pf.per_argument(dataframe=pf.dataframe.by_row),
38 combine_with=lambda regressors: MeanRegressor(list(regressors))
39)
40def train_regressor(dataframe: pd.DataFrame, feature_names: List[str], target_name: str) -> RegressorMixin:
41
42 regressor = DecisionTreeRegressor()
43 regressor.fit(dataframe[feature_names], dataframe[[target_name]])
44
45 return regressor
46
47
48if __name__ == "__main__":
49 N_WORKERS = psutil.cpu_count(logical=False)
50
51 dataset = fetch_california_housing(download_if_missing=True)
52
53 feature_names = dataset["feature_names"]
54 target_name = dataset["target_names"][0]
55
56 dataframe = pd.DataFrame(dataset["data"], columns=feature_names)
57 dataframe[target_name] = dataset["target"]
58
59 N_MEASURES = 5
60
61 with pf.set_parallel_backend_context("local_single_process"):
62 regressor = train_regressor(dataframe, feature_names, target_name)
63
64 duration = (
65 timeit.timeit(lambda: train_regressor(dataframe, feature_names, target_name), number=N_MEASURES)
66 / N_MEASURES
67 )
68
69 print("Sequential training duration:", duration)
70
71 with pf.set_parallel_backend_context("local_multiprocessing", max_workers=N_WORKERS):
72 regressor = train_regressor(dataframe, feature_names, target_name)
73
74 duration = (
75 timeit.timeit(lambda: train_regressor(dataframe, feature_names, target_name), number=N_MEASURES)
76 / N_MEASURES
77 )
78
79 print("Parallel training duration:", duration)
Compute Electricity Production Statistics in Parallel
1"""
2Based on the monthly electricity production data from ENTSO-E, plots the percentage of renewable energy production for
3the European electricity grid.
4
5Usage:
6
7 $ git clone https://github.com/Citi/parfun && cd parfun
8 $ pip install -r examples/requirements.txt
9 $ python -m examples.europe_electricity.main [--plot]
10
11"""
12
13import sys
14from typing import List
15
16import pandas as pd
17
18import parfun as pf
19
20
21def fetch_production_data(year: int) -> pd.DataFrame:
22 """
23 Downloads the monthly production data for the given year.
24
25 Sourced from https://www.entsoe.eu/data/power-stats/.
26 """
27
28 url = f"https://www.entsoe.eu/publications/data/power-stats/{year}/monthly_domestic_values_{year}.csv"
29
30 result = pd.read_csv(url, sep=r"\t|,|;", engine="python")
31
32 # Some newer datasets use "Area" instead of "Country"
33 if "Area" in result.columns:
34 result["Country"] = result["Area"]
35
36 return result[["Year", "Month", "Category", "Country", "ProvidedValue"]]
37
38
39def make_consumption_negative(production_data: pd.DataFrame) -> pd.DataFrame:
40 """
41 Make consumption values negative production values.
42
43 Some production categories have positive consumption values (e.g. "Consumption of Hydro Water Reservoir"). This
44 function transforms these values in their production counter parts, but with a negative value. This simplifies
45 subsequent processing.
46 """
47
48 PREFIX = "Consumption of "
49
50 result = production_data.copy()
51
52 is_consumption = result["Category"].str.startswith(PREFIX)
53
54 result.loc[is_consumption, "Category"] = result.loc[is_consumption, "Category"].str.replace(PREFIX, "", regex=False)
55 result.loc[is_consumption, "ProvidedValue"] *= -1
56
57 return result
58
59
60def group_production_by_type(production_data: pd.DataFrame) -> pd.DataFrame:
61 """Groups and sums all production data by type ("Fossil", "Nuclear", "Renewable" and "Other")."""
62
63 fossil_sources = {
64 "Fossil Gas", "Fossil Hard coal", "Fossil Oil",
65 "Fossil Brown coal/Lignite", "Fossil Coal-derived gas",
66 "Fossil Oil shale", "Fossil Peat",
67 }
68 nuclear_sources = {"Nuclear"}
69 renewable_sources = {
70 "Biomass", "Solar", "Wind Onshore", "Wind Offshore", "Geothermal",
71 "Hydro Pumped Storage", "Hydro Run-of-river and poundage",
72 "Hydro Water Reservoir", "Marine", "Other renewable",
73 }
74
75 def map_category(category: str) -> str:
76 if category in fossil_sources:
77 return "Fossil"
78 elif category in nuclear_sources:
79 return "Nuclear"
80 elif category in renewable_sources:
81 return "Renewable"
82 else:
83 return "Other"
84
85 result = production_data.copy()
86
87 result["EnergyType"] = result["Category"].map(map_category)
88 del result["Category"]
89
90 return result.groupby(["Year", "Month", "EnergyType"])["ProvidedValue"].sum().reset_index()
91
92
93def monthly_percentage_production(production_data: pd.DataFrame) -> pd.DataFrame:
94 """Returns the monthly production percentage for every month and every energy source type."""
95
96 result = production_data.pivot_table(index=["Year", "Month"], columns="EnergyType", values="ProvidedValue")
97
98 result = result.div(result.sum(axis=1), axis=0) * 100 # make it percentages
99
100 # Uses datetime for year-month
101 result.index = pd.to_datetime({
102 "year": result.index.get_level_values(0),
103 "month": result.index.get_level_values(1),
104 "day": 1,
105 })
106
107 result.sort_index(ascending=True) # sort by date
108
109 return result
110
111
112@pf.parallel(
113 split=pf.all_arguments(pf.py_list.by_chunk),
114 combine_with=pf.dataframe.concat,
115 initial_partition_size=2,
116)
117def get_monthly_percentage_production(years: List[int]) -> pd.DataFrame:
118 processed_yearly_data = []
119 for year in years:
120 yearly_production_data = fetch_production_data(year)
121
122 yearly_production_data = make_consumption_negative(yearly_production_data)
123 yearly_production_data = group_production_by_type(yearly_production_data)
124 yearly_production_data = monthly_percentage_production(yearly_production_data)
125
126 processed_yearly_data.append(yearly_production_data)
127
128 return pd.concat(processed_yearly_data)
129
130
131def plot_electricity_production(production_percentages: pd.DataFrame) -> None:
132 import matplotlib.pyplot as plt
133
134 colors = {
135 "Fossil": "lightcoral",
136 "Nuclear": "violet",
137 "Renewable": "lightgreen",
138 "Other": "lightsteelblue",
139 }
140
141 production_percentages.index = production_percentages.index.strftime("%b %Y")
142 production_percentages.plot(kind="bar", stacked=True, figsize=(10, 6), width=1, color=colors)
143
144 plt.title("Europe's monthly electricity production by source")
145 plt.ylabel("Percentage (%)")
146 plt.xlabel('Month')
147 plt.legend(title="Energy source", loc='upper left')
148 plt.grid(axis="y", linestyle="--")
149 plt.ylim(0, 100)
150
151 plt.tight_layout()
152 plt.show()
153
154
155def main():
156 YEARS = list(range(2019, 2025))
157
158 with pf.set_parallel_backend_context("local_multiprocessing"):
159 processed_data = get_monthly_percentage_production(YEARS)
160
161 if "--plot" in sys.argv[1:]:
162 plot_electricity_production(processed_data)
163 else:
164 print(processed_data)
165
166
167if __name__ == "__main__":
168 main()
Compute Portfolio Metrics in Parallel
1"""
2Based on a portfolio of stocks, computes basic statistics.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.portfolio_metrics.main
8"""
9
10from typing import List
11
12import pandas as pd
13
14import parfun as pf
15
16
17@pf.parallel(
18 split=pf.per_argument(portfolio=pf.dataframe.by_group(by="country")),
19 combine_with=pf.dataframe.concat,
20)
21def relative_metrics(portfolio: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
22 """
23 Computes relative metrics (difference to mean, median ...) of a dataframe, for each of the requested dataframe's
24 values, grouped by country.
25 """
26
27 output = portfolio.copy() # do not modify the input dataframe.
28
29 for country in output["country"].unique():
30 for column in columns:
31 values = output.loc[output["country"] == country, column]
32
33 mean = values.mean()
34 std = values.std()
35
36 output.loc[output["country"] == country, f"{column}_diff_to_mean"] = values - mean
37 output.loc[output["country"] == country, f"{column}_sq_diff_to_mean"] = (values - mean) ** 2
38 output.loc[output["country"] == country, f"{column}_relative_to_mean"] = (values - mean) / std
39
40 return output
41
42
43if __name__ == "__main__":
44 portfolio = pd.DataFrame({
45 "company": ["Apple", "Citigroup", "ASML", "Volkswagen", "Tencent"],
46 "industry": ["technology", "banking", "technology", "manufacturing", "manufacturing"],
47 "country": ["US", "US", "NL", "DE", "CN"],
48 "market_cap": [2828000000000, 80310000000, 236000000000, 55550000000, 345000000000],
49 "revenue": [397000000000, 79840000000, 27180000000, 312000000000, 79000000000],
50 "workforce": [161000, 240000, 39850, 650951, 104503]
51 })
52
53 with pf.set_parallel_backend_context("local_multiprocessing"):
54 metrics = relative_metrics(portfolio, ["market_cap", "revenue"])
55
56 print(metrics)