Examples

Distributed sorting

 1from typing import List
 2
 3import numpy as np
 4
 5from pargraph import GraphEngine, delayed, graph
 6
 7
 8@delayed
 9def filter_array(array: np.ndarray, low: float, high: float) -> np.ndarray:
10    """
11    Filter an array to include only values within the specified range.
12
13    :param array: The input array to be filtered
14    :param low: The lower bound of the range
15    :param high: The upper bound of the range
16    :return: A new array containing only the values within the specified range
17    """
18    return array[(array >= low) & (array <= high)]
19
20
@delayed
def sort_array(array: np.ndarray) -> np.ndarray:
    """
    Return the values of an array in sorted order.

    :param array: The array whose values are to be sorted
    :return: The result of applying ``np.sort`` to ``array``
    """
    sorted_values = np.sort(array)
    return sorted_values
30
31
@delayed
def reduce_arrays(*arrays: np.ndarray) -> np.ndarray:
    """
    Join several arrays into a single array.

    :param arrays: The arrays to join, in the order they should appear in the result
    :return: The result of applying ``np.concatenate`` to the given arrays
    """
    combined = np.concatenate(arrays)
    return combined
41
42
@graph
def map_reduce_sort(array: np.ndarray, partition_count: int) -> np.ndarray:
    """
    Sort an array by splitting it into value ranges, sorting each range and joining the results.

    Partition ``i`` covers the values from ``i / partition_count`` to ``(i + 1) / partition_count``,
    so together the partitions span the values from 0 to 1; values outside that span are not
    selected by any partition.

    NOTE: ``filter_array`` keeps both of its bounds, so a value exactly equal to a boundary shared
    by two neighbouring partitions is selected by both of them.

    :param array: The array to sort
    :param partition_count: How many value ranges to split the array into
    :return: The sorted partitions joined in ascending order of their value range
    """
    sorted_partitions = []
    for index in range(partition_count):
        lower = index / partition_count
        upper = (index + 1) / partition_count
        sorted_partitions.append(sort_array(filter_array(array, lower, upper)))

    return reduce_arrays(*sorted_partitions)
58
59
@graph
def map_reduce_sort_recursive(
    array: np.ndarray, partition_counts: List[int], _low: float = 0, _high: float = 1
) -> np.ndarray:
    """
    Sort an array by recursively splitting it into ever narrower value ranges.

    The first entry of ``partition_counts`` says how many equal-width ranges the span from
    ``_low`` to ``_high`` is divided into; the remaining entries are applied in the same way
    inside each of those ranges. Once no entries are left, the array is sorted directly.

    NOTE: ``filter_array`` keeps both of its bounds, so a value exactly equal to a boundary shared
    by two neighbouring ranges is selected by both of them.

    :param array: The array to sort
    :param partition_counts: The number of ranges to split into at each level of recursion
    :param _low: Lower bound of the value range handled by this call (internal, not for public use)
    :param _high: Upper bound of the value range handled by this call (internal, not for public use)
    :return: The sorted ranges joined in ascending order of their bounds
    """
    # Nothing left to split on: sort what remains in one go.
    if len(partition_counts) == 0:
        return sort_array(array)

    current_count, *remaining_counts = partition_counts

    sorted_partitions = []
    for index in range(current_count):
        lower = _low + (_high - _low) * (index / current_count)
        upper = _low + (_high - _low) * ((index + 1) / current_count)
        partition = filter_array(array, lower, upper)
        sorted_partitions.append(map_reduce_sort_recursive(partition, remaining_counts, lower, upper))

    return reduce_arrays(*sorted_partitions)
85
86
if __name__ == "__main__":
    # The partition layout is given when the graph is built; the array is given when
    # the graph is converted into a task graph.
    sort_graph = map_reduce_sort_recursive.to_graph(partition_counts=[2, 2, 2])
    task_graph, keys = sort_graph.to_dict(array=np.random.rand(1_000_000))

    engine = GraphEngine()
    results = engine.get(task_graph, keys)
    print(results[0])

Aggregating NYC taxi data

 1import pandas as pd
 2
 3from pargraph import GraphEngine, delayed, graph
 4
 5
 6@delayed
 7def get_total_amount_sum(file_url, unit):
 8    """
 9    Calculate the sum of the 'total_amount' column from a Parquet file and divide it by a given unit.
10
11    :param file_url: The URL of the Parquet file
12    :param unit: The unit to divide the total amount sum by
13    :return: The sum of the 'total_amount' column divided by the given unit
14    """
15    return pd.read_parquet(file_url, columns=["total_amount"])["total_amount"].sum() / unit
16
17
@delayed
def add(*args):
    """
    Add up every argument that was passed in.

    :param args: The values to add together
    :return: The result of the built-in ``sum`` over the arguments
    """
    total = sum(args)
    return total
27
28
@delayed
def collect_result(*results):
    """
    Build a pandas Series from arguments that alternate between index label and value.

    :param results: Alternating arguments: positions 0, 2, 4, ... hold the index labels and
        positions 1, 3, 5, ... hold the corresponding values
    :return: A pandas Series of the values, indexed by the labels
    """
    labels = results[0::2]
    values = results[1::2]
    return pd.Series(values, index=labels)
38
39
# URL template for one month of yellow-taxi trip data in Parquet format. It is filled in
# with ``year`` and ``month``; the month is zero-padded to a width of two.
URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02}.parquet"
41
42
@graph
def nyc_yellow_taxi_fare_total_for_year(year, unit):
    """
    Add up the scaled 'total_amount' sums of the twelve monthly files of one year.

    :param year: The year whose monthly files are read
    :param unit: The divisor applied to each monthly sum
    :return: The sum of the twelve monthly totals for the given year
    """
    monthly_totals = []
    for month in range(1, 13):
        file_url = URL.format(year=year, month=month)
        monthly_totals.append(get_total_amount_sum(file_url, unit))

    return add(*monthly_totals)
53
54
@graph
def nyc_yellow_taxi_fare_total_by_year(years, unit):
    """
    Compute the yearly fare total for each of several years.

    :param years: The years to compute a total for
    :param unit: The divisor applied to each monthly sum
    :return: The yearly totals collected by ``collect_result``, labelled by year
    """
    # collect_result expects alternating (label, value) arguments, so each year is
    # immediately followed by its total.
    labels_and_totals = []
    for year in years:
        yearly_total = nyc_yellow_taxi_fare_total_for_year(year, unit)
        labels_and_totals.extend((year, yearly_total))

    return collect_result(*labels_and_totals)
71
72
if __name__ == "__main__":
    # The years are given when the graph is built; the unit is given when the graph
    # is converted into a task graph.
    yearly_graph = nyc_yellow_taxi_fare_total_by_year.to_graph(years=range(2024, 2025))
    task_graph, keys = yearly_graph.to_dict(unit=1_000_000)

    engine = GraphEngine()
    results = engine.get(task_graph, keys)
    print(results[0])

Pythagorean theorem using Python operations

 1from pargraph import GraphEngine, graph
 2
 3
 4@graph
 5def pythagoras(a: float, b: float) -> float:
 6    """
 7    Calculate the length of the hypotenuse of a right triangle given the lengths of the other two sides.
 8
 9    :param a: Length of one side of the triangle.
10    :param b: Length of the other side of the triangle.
11    :return: Length of the hypotenuse.
12    """
13    return (a**2 + b**2) ** 0.5
14
15
if __name__ == "__main__":
    # No arguments are fixed when the graph is built; both sides are given when the
    # graph is converted into a task graph.
    pythagoras_graph = pythagoras.to_graph()
    task_graph, keys = pythagoras_graph.to_dict(a=3, b=4)

    engine = GraphEngine()
    results = engine.get(task_graph, keys)
    print(results[0])