1"""
Trains a decision tree regressor on the California housing dataset from scikit-learn.
3
Measures the training time when the training dataset is split and processed in parallel using Parfun.
5
6Usage:
7
8 $ git clone https://github.com/Citi/parfun && cd parfun
9 $ python -m examples.california_housing.main
10"""
11
12import psutil
13import timeit
14
15from typing import List
16
17import numpy as np
18import pandas as pd
19
20from sklearn.datasets import fetch_california_housing
21from sklearn.base import RegressorMixin
22from sklearn.tree import DecisionTreeRegressor
23
24import parfun as pf
25
26
class MeanRegressor(RegressorMixin):
    """Regressor that averages the predictions of several already-built regressors.

    Used as the combined result of ``train_regressor``: each wrapped regressor
    contributes one prediction per sample, and the final prediction is their mean.
    """

    def __init__(self, regressors: List[RegressorMixin]) -> None:
        """Store the regressors whose predictions will be averaged.

        Args:
            regressors: Objects exposing a ``predict(X)`` method.
        """
        super().__init__()
        self._regressors = regressors

    def predict(self, X):
        """Return, for each sample in ``X``, the mean of the wrapped regressors' predictions.

        Args:
            X: Samples, forwarded unchanged to every wrapped regressor's ``predict``.

        Returns:
            The element-wise mean of the individual prediction arrays.
        """
        predictions = [regressor.predict(X) for regressor in self._regressors]

        # axis=0 averages across regressors only. Without it, np.mean flattens the
        # stacked predictions and returns one scalar for the whole input.
        return np.mean(predictions, axis=0)
34
35
@pf.parallel(
    split=pf.per_argument(dataframe=pf.dataframe.by_row),
    combine_with=lambda partial_models: MeanRegressor(list(partial_models)),
)
def train_regressor(dataframe: pd.DataFrame, feature_names: List[str], target_name: str) -> RegressorMixin:
    """Fit a decision tree regressor on ``dataframe``.

    The ``pf.parallel`` decorator is configured to split the ``dataframe`` argument
    with ``pf.dataframe.by_row`` and to combine the resulting regressors into a
    single ``MeanRegressor``.

    Args:
        dataframe: Table holding both the feature columns and the target column.
        feature_names: Names of the columns used as model inputs.
        target_name: Name of the column to predict.

    Returns:
        The fitted regressor.
    """
    features = dataframe[feature_names]
    # Double brackets select the target as a single-column DataFrame.
    target = dataframe[[target_name]]

    model = DecisionTreeRegressor()
    model.fit(features, target)

    return model
46
47
if __name__ == "__main__":
    # psutil.cpu_count() may return None when the count cannot be determined, so
    # fall back to the logical count and finally to a single worker.
    N_WORKERS = psutil.cpu_count(logical=False) or psutil.cpu_count(logical=True) or 1

    # Number of timed runs averaged for each backend.
    N_MEASURES = 5

    dataset = fetch_california_housing(download_if_missing=True)

    feature_names = dataset["feature_names"]
    target_name = dataset["target_names"][0]

    # Single table holding the features plus the target column.
    dataframe = pd.DataFrame(dataset["data"], columns=feature_names)
    dataframe[target_name] = dataset["target"]

    # (printed label, Parfun backend name, backend keyword arguments)
    benchmarks = [
        ("Sequential", "local_single_process", {}),
        ("Parallel", "local_multiprocessing", {"max_workers": N_WORKERS}),
    ]

    for label, backend_name, backend_options in benchmarks:
        with pf.set_parallel_backend_context(backend_name, **backend_options):
            # Untimed first call, presumably a warm-up so that one-off start-up
            # costs stay out of the measurement.
            train_regressor(dataframe, feature_names, target_name)

            # Average wall-clock duration of one training call.
            duration = (
                timeit.timeit(lambda: train_regressor(dataframe, feature_names, target_name), number=N_MEASURES)
                / N_MEASURES
            )

            print(f"{label} training duration:", duration)