Examples

Count bigrams in a text in parallel

 1"""
 2Counts the most common two-letter sequences (bigrams) in the content of a URL.
 3
 4Usage:
 5
 6    $ git clone https://github.com/Citi/parfun && cd parfun
 7    $ python -m examples.count_bigrams.main
 8"""
 9
10import collections
11import psutil
12import ssl
13
14from typing import Counter, Iterable, List
15from urllib.request import urlopen
16
17import parfun as pf
18
19
def sum_counters(counters: Iterable[Counter[str]]) -> Counter[str]:
    """Merge per-chunk bigram counts into a single Counter.

    Accumulates in place instead of ``sum(counters, start=Counter())``,
    which allocates a fresh intermediate Counter for every element.
    Returns an empty Counter for an empty iterable, like the original.

    :param counters: the partial bigram counts produced by each parallel chunk.
    :return: one Counter holding the combined counts.
    """
    total: Counter[str] = collections.Counter()
    for counter in counters:
        total.update(counter)
    return total
22
23
@pf.parallel(
    split=pf.per_argument(
        lines=pf.py_list.by_chunk
    ),
    combine_with=sum_counters,
)
def count_bigrams(lines: List[str]) -> Counter:
    """Count every adjacent-letter pair (bigram) in the words of ``lines``.

    Parallelized by Parfun: the input lines are split into chunks, counted
    independently, and the partial Counters are merged by ``sum_counters``.

    :param lines: the text to analyse, one string per line.
    :return: a Counter mapping each bigram to its number of occurrences.
    """
    bigram_counts: Counter[str] = collections.Counter()

    for text_line in lines:
        for token in text_line.split():
            # zip(token, token[1:]) yields each overlapping character pair.
            bigram_counts.update(
                first + second for first, second in zip(token, token[1:])
            )

    return bigram_counts
40
41
if __name__ == "__main__":
    # cpu_count(logical=False) may return None on platforms where psutil
    # cannot determine the physical core count — fall back to one worker.
    N_WORKERS = psutil.cpu_count(logical=False) or 1
    URL = "https://www.gutenberg.org/ebooks/100.txt.utf-8"
    TOP_K = 10

    # NOTE(review): certificate verification is disabled through a private
    # ssl API; acceptable for a demo, never copy into production code.
    with urlopen(URL, context=ssl._create_unverified_context()) as response:
        content = response.read().decode("utf-8").splitlines()

    with pf.set_parallel_backend_context("local_multiprocessing", max_workers=N_WORKERS):
        counts = count_bigrams(content)

    # The counter holds bigrams, not words — label the output accordingly.
    print(f"Top {TOP_K} bigrams:")
    for bigram, count in counts.most_common(TOP_K):
        print(f"\t{bigram:<10}:\t{count}")

Train a random tree regressor in parallel

 1"""
 2Trains a random tree regressor on the California housing dataset from scikit-learn.
 3
 4Measures the training time when splitting the learning dataset process using Parfun.
 5
 6Usage:
 7
 8    $ git clone https://github.com/Citi/parfun && cd parfun
 9    $ python -m examples.california_housing.main
10"""
11
12import psutil
13import timeit
14
15from typing import List
16
17import numpy as np
18import pandas as pd
19
20from sklearn.datasets import fetch_california_housing
21from sklearn.base import RegressorMixin
22from sklearn.tree import DecisionTreeRegressor
23
24import parfun as pf
25
26
class MeanRegressor(RegressorMixin):
    """Ensemble regressor returning the element-wise mean of its sub-regressors.

    Combines the per-chunk regressors produced by the parallel training
    function into a single predictor.
    """

    def __init__(self, regressors: List[RegressorMixin]) -> None:
        super().__init__()
        self._regressors = regressors

    def predict(self, X):
        """Return the per-sample average of every sub-regressor's prediction.

        Bug fix: without ``axis=0``, ``np.mean`` collapses the stacked
        (n_regressors, n_samples) prediction matrix into a single scalar
        instead of returning one averaged prediction per input sample.
        """
        return np.mean([regressor.predict(X) for regressor in self._regressors], axis=0)
34
35
@pf.parallel(
    split=pf.per_argument(dataframe=pf.dataframe.by_row),
    combine_with=lambda regressors: MeanRegressor(list(regressors))
)
def train_regressor(dataframe: pd.DataFrame, feature_names: List[str], target_name: str) -> RegressorMixin:
    """Fit a decision-tree regressor on one chunk of the training dataframe.

    Parallelized by Parfun: the dataframe is split by rows, one tree is
    trained per chunk, and the trees are combined into a ``MeanRegressor``.

    :param dataframe: the chunk of training data to fit on.
    :param feature_names: the columns used as model inputs.
    :param target_name: the column used as the regression target.
    :return: the fitted decision tree for this chunk.
    """
    features = dataframe[feature_names]
    target = dataframe[[target_name]]

    tree = DecisionTreeRegressor()
    tree.fit(features, target)

    return tree
46
47
if __name__ == "__main__":
    # cpu_count(logical=False) may return None on platforms where psutil
    # cannot determine the physical core count — fall back to one worker.
    N_WORKERS = psutil.cpu_count(logical=False) or 1

    dataset = fetch_california_housing(download_if_missing=True)

    feature_names = dataset["feature_names"]
    target_name = dataset["target_names"][0]

    dataframe = pd.DataFrame(dataset["data"], columns=feature_names)
    dataframe[target_name] = dataset["target"]

    N_MEASURES = 5

    def _mean_training_duration() -> float:
        """Average wall-clock seconds of one training run over N_MEASURES runs."""
        return (
            timeit.timeit(lambda: train_regressor(dataframe, feature_names, target_name), number=N_MEASURES)
            / N_MEASURES
        )

    with pf.set_parallel_backend_context("local_single_process"):
        # Warm-up call, then measure (same protocol for both backends).
        regressor = train_regressor(dataframe, feature_names, target_name)
        print("Sequential training duration:", _mean_training_duration())

    with pf.set_parallel_backend_context("local_multiprocessing", max_workers=N_WORKERS):
        regressor = train_regressor(dataframe, feature_names, target_name)
        print("Parallel training duration:", _mean_training_duration())