Examples
Distributed sorting
1from typing import List
2
3import numpy as np
4
5from pargraph import GraphEngine, delayed, graph
6
7
@delayed
def filter_array(array: np.ndarray, low: float, high: float, include_high: bool = True) -> np.ndarray:
    """
    Filter an array to include only values within the specified range.

    The range is the closed interval ``[low, high]`` when ``include_high`` is true (the default) and the
    half-open interval ``[low, high)`` otherwise. Half-open ranges let adjacent partitions share an edge
    without a value that lies exactly on that edge being selected by both of them.

    :param array: The input array to be filtered
    :param low: The lower bound of the range (always inclusive)
    :param high: The upper bound of the range
    :param include_high: Whether values equal to ``high`` are kept
    :return: A new array containing only the values within the specified range
    """
    upper_mask = array <= high if include_high else array < high
    return array[(array >= low) & upper_mask]


@delayed
def sort_array(array: np.ndarray) -> np.ndarray:
    """
    Sort array

    :param array: The input array to be sorted
    :return: A new array containing the sorted values (the input is left untouched)
    """
    return np.sort(array)


@delayed
def reduce_arrays(*arrays: np.ndarray) -> np.ndarray:
    """
    Reduce arrays by concatenating them in the order they are given

    :param arrays: A variable number of arrays to be concatenated (at least one is required by ``np.concatenate``)
    :return: A new array containing all the concatenated values
    """
    return np.concatenate(arrays)


@graph
def map_reduce_sort(array: np.ndarray, partition_count: int) -> np.ndarray:
    """
    Map reduce sort

    Splits the value range ``[0, 1]`` into ``partition_count`` equal-width partitions, sorts each partition
    independently and concatenates the sorted partitions in ascending order. Values outside ``[0, 1]`` fall
    into no partition and are therefore not part of the result.

    :param array: The input array to be sorted; values are expected to lie in ``[0, 1]``
    :param partition_count: The number of partitions to divide the array into (at least 1)
    :return: A new array containing the sorted values
    :raises ValueError: If ``partition_count`` is less than 1
    """
    if partition_count < 1:
        raise ValueError(f"partition_count must be at least 1, got {partition_count}")

    # Each interior edge is computed once and shared by both neighbouring partitions. Every partition is
    # half-open [edge_i, edge_i+1) except the last one, which also keeps 1.0. This way a value lying exactly
    # on an interior edge is assigned to exactly one partition instead of being duplicated.
    edges = [i / partition_count for i in range(partition_count + 1)]
    return reduce_arrays(
        *(
            sort_array(filter_array(array, edges[i], edges[i + 1], i == partition_count - 1))
            for i in range(partition_count)
        )
    )


@graph
def map_reduce_sort_recursive(
    array: np.ndarray,
    partition_counts: List[int],
    _low: float = 0,
    _high: float = 1,
    _include_high: bool = True,
) -> np.ndarray:
    """
    Map reduce sort recursively

    Splits ``[_low, _high]`` into ``partition_counts[0]`` equal-width partitions and sorts each one with the
    remaining partition counts, recursing until the list is exhausted. At that point the partition is sorted
    directly. The sorted partitions are concatenated in ascending order. Values outside the initial
    ``[0, 1]`` range fall into no partition and are therefore not part of the result.

    :param array: The input array to be sorted; values are expected to lie in ``[0, 1]``
    :param partition_counts: A list of partition counts for recursive sorting, one per recursion level
        (each at least 1)
    :param _low: The lower bound of the values range (not supposed to be used publicly)
    :param _high: The upper bound of the values range (not supposed to be used publicly)
    :param _include_high: Whether values equal to ``_high`` belong to this range (not supposed to be used
        publicly)
    :return: A new array containing the sorted values
    :raises ValueError: If a partition count is less than 1
    """
    if len(partition_counts) == 0:
        return sort_array(array)

    partition_count, *remaining_counts = partition_counts
    if partition_count < 1:
        raise ValueError(f"partition counts must be at least 1, got {partition_count}")

    # The outer edges are exactly _low and _high, and every interior edge is computed once and shared by both
    # neighbouring partitions. Floating-point rounding therefore cannot open gaps between partitions or make
    # them overlap.
    interior_edges = [_low + (_high - _low) * (i / partition_count) for i in range(1, partition_count)]
    edges = [_low, *interior_edges, _high]

    sorted_partitions = []
    for i in range(partition_count):
        low, high = edges[i], edges[i + 1]
        # Only the last partition may keep values equal to its upper edge, and only if this whole range does.
        include_high = i == partition_count - 1 and _include_high
        sorted_partitions.append(
            map_reduce_sort_recursive(
                filter_array(array, low, high, include_high), remaining_counts, low, high, include_high
            )
        )

    return reduce_arrays(*sorted_partitions)
85
86
if __name__ == "__main__":
    # Build the task graph with three recursion levels of two partitions each, then bind the input array.
    sort_graph = map_reduce_sort_recursive.to_graph(partition_counts=[2, 2, 2])
    random_values = np.random.rand(1_000_000)
    task_graph, keys = sort_graph.to_dict(array=random_values)

    engine = GraphEngine()
    outputs = engine.get(task_graph, keys)
    print(outputs[0])
Aggregating NYC taxi data
1import pandas as pd
2
3from pargraph import GraphEngine, delayed, graph
4
5
@delayed
def get_total_amount_sum(file_url, unit):
    """
    Read the 'total_amount' column of a Parquet file and return its sum divided by ``unit``.

    :param file_url: Location of the Parquet file (a URL in this example)
    :param unit: Divisor applied to the summed amount (e.g. 1_000_000 to express the total in millions)
    :return: The sum of the 'total_amount' column divided by ``unit``
    """
    column = "total_amount"
    # Only the needed column is loaded from the file.
    frame = pd.read_parquet(file_url, columns=[column])
    return frame[column].sum() / unit
16
17
@delayed
def add(*args):
    """
    Return the sum of all positional arguments.

    Uses the built-in ``sum``, so calling it with no arguments yields ``0``.

    :param args: The values to add together
    :return: The total of all arguments
    """
    total = sum(args)
    return total
27
28
@delayed
def collect_result(*results):
    """
    Build a pandas Series from alternating label/value arguments.

    Arguments at even positions (0, 2, 4, ...) become the index; the argument that follows each of them
    becomes the corresponding value.

    :param results: Interleaved arguments ``label0, value0, label1, value1, ...``
    :return: A pandas Series mapping each label to its value
    """
    labels = results[0::2]
    values = results[1::2]
    return pd.Series(values, index=labels)
38
39
# Template for the monthly NYC yellow taxi trip files; fill in ``year`` and a zero-padded two-digit ``month``.
URL = (
    "https://d37ci6vzurychx.cloudfront.net/trip-data/"
    "yellow_tripdata_{year}-{month:02}.parquet"
)
41
42
@graph
def nyc_yellow_taxi_fare_total_for_year(year, unit):
    """
    Total fare amount for one year: one task per month (January through December), summed together.

    :param year: The year whose twelve monthly files are read
    :param unit: Divisor applied to each monthly total
    :return: The sum of the twelve monthly totals, each divided by ``unit``
    """
    monthly_totals = [get_total_amount_sum(URL.format(year=year, month=month), unit) for month in range(1, 13)]
    return add(*monthly_totals)
53
54
@graph
def nyc_yellow_taxi_fare_total_by_year(years, unit):
    """
    Total fare amount for each of several years, collected into a pandas Series indexed by year.

    :param years: An iterable of years to process
    :param unit: Divisor applied to each monthly total
    :return: A Series whose index is the years and whose values are the matching yearly totals
    """
    # collect_result expects interleaved arguments: year, total, year, total, ...
    interleaved = []
    for year in years:
        interleaved.extend((year, nyc_yellow_taxi_fare_total_for_year(year, unit)))

    return collect_result(*interleaved)
71
72
if __name__ == "__main__":
    # Bind the years at graph-construction time and the unit when converting to a task dictionary.
    yearly_graph = nyc_yellow_taxi_fare_total_by_year.to_graph(years=range(2024, 2025))
    task_graph, keys = yearly_graph.to_dict(unit=1_000_000)

    engine = GraphEngine()
    outputs = engine.get(task_graph, keys)
    print(outputs[0])
Pythagorean theorem using Python operators
1from pargraph import GraphEngine, graph
2
3
@graph
def pythagoras(a: float, b: float) -> float:
    """
    Return the hypotenuse length ``(a**2 + b**2) ** 0.5`` of a right triangle.

    The result is computed with plain Python arithmetic operators; no ``@delayed`` helpers are used.

    :param a: Length of one leg of the triangle.
    :param b: Length of the other leg of the triangle.
    :return: Length of the hypotenuse.
    """
    sum_of_squares = a**2 + b**2
    return sum_of_squares ** 0.5
14
15
if __name__ == "__main__":
    # Both inputs are bound when converting the graph into a task dictionary.
    hypotenuse_graph = pythagoras.to_graph()
    task_graph, keys = hypotenuse_graph.to_dict(a=3, b=4)

    engine = GraphEngine()
    outputs = engine.get(task_graph, keys)
    print(outputs[0])