Walkthrough
In this walkthrough, we’ll create some graph computations with Pargraph and analyze NYC taxi data.
Defining delayed functions
delayed functions are pure, indivisible functions used to compose a graph computation. Because they are pure, these functions can be scheduled and executed concurrently across cores and machines.
In a graph computation, delayed functions are the nodes, and the edges represent the dependencies between them.
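The sketch below makes the node/edge picture concrete. It uses Python's standard graphlib module rather than Pargraph, and the node names are hypothetical. They mirror the taxi example in miniature: three independent monthly loads feed a single add. Nodes released in the same "wave" do not depend on one another, so a scheduler can run them concurrently.

```python
from graphlib import TopologicalSorter

# Hypothetical miniature of the taxi example: three monthly loads feed one sum.
# Each key maps to the set of nodes it depends on (its incoming edges).
dependencies = {
    "load_jan": set(),
    "load_feb": set(),
    "load_mar": set(),
    "add": {"load_jan", "load_feb", "load_mar"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

waves = []
while sorter.is_active():
    # Every node returned here has all of its dependencies finished,
    # so the nodes in one wave could run concurrently.
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)  # mark them finished so their dependents become ready

print(waves)  # [['load_feb', 'load_jan', 'load_mar'], ['add']]
```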
Example: NYC Yellow Taxi Fare Totals by Year
The NYC Taxi and Limousine Commission publishes data for every taxi trip since 2009. Each data file is in the Parquet format and contains every trip taken in a given month. Loading and processing the files one after another takes a long time, so to speed things up we would like to load and process the files in parallel and then aggregate the results.
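The load-in-parallel-then-aggregate idea can be sketched with the standard library alone. This is not Pargraph. fake_monthly_total is a hypothetical stand-in for reading one month's file, and it returns made-up numbers so the example runs offline.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_monthly_total(month):
    # Hypothetical stand-in for reading one month's Parquet file.
    # Returns a made-up number so the example needs no network access.
    return float(month * 10)


# "Map" step: process the 12 months concurrently (map keeps input order).
with ThreadPoolExecutor(max_workers=4) as pool:
    monthly = list(pool.map(fake_monthly_total, range(1, 13)))

# "Reduce" step: aggregate the independent results afterward.
yearly_total = sum(monthly)
print(yearly_total)  # 780.0
```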
Here are some functions that we will need, defined as delayed functions:
[1]:
import pandas as pd
from pargraph import delayed
@delayed
def get_total_amount_sum(file_url, unit):
    """
    Calculate the sum of the 'total_amount' column from a Parquet file and divide it by a given unit.
    :param file_url: The URL of the Parquet file
    :param unit: The unit to divide the total amount sum by
    :return: The sum of the 'total_amount' column divided by the given unit
    """
    return (
        pd.read_parquet(file_url, columns=["total_amount"])["total_amount"].sum() / unit
    )
@delayed
def add(*args):
    """
    Calculate the sum of all the arguments provided.
    :param args: A variable number of arguments to be summed
    :return: The sum of all arguments
    """
    return sum(args)
@delayed
def collect_result(*results):
    """
    Collect results into a pandas Series, using every other element as the index and the remaining elements as the values.
    :param results: A variable number of arguments where even-indexed elements are used as the index and odd-indexed elements as the values
    :return: A pandas Series with the specified index and values
    """
    return pd.Series(results[1::2], index=results[0::2])
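collect_result uses slices with a step of 2 to split an interleaved sequence into an index and values. The sketch below shows that pairing in plain Python with placeholder numbers, not real fare totals. pd.Series(values, index=index) would hold the same pairs.

```python
# Interleaved (index, value, index, value, ...) layout, as collect_result expects.
# The numbers are placeholders, not real fare totals.
interleaved = (2023, 1.5, 2024, 2.5)

index = interleaved[0::2]   # even positions -> (2023, 2024)
values = interleaved[1::2]  # odd positions  -> (1.5, 2.5)

pairs = dict(zip(index, values))
print(pairs)  # {2023: 1.5, 2024: 2.5}
```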
A delayed function retains the same call semantics as a normal Python function:
[2]:
add(1, 2)
[2]:
3
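Decorators built on functools.wraps commonly keep a function's normal call semantics. The toy decorator below is hypothetical and is not Pargraph's implementation. It only shows how a wrapped function can still be called as usual while carrying extra attributes (here, an is_toy_delayed flag). The names are prefixed with toy_ so they do not shadow the delayed add defined above.

```python
import functools


def toy_delayed(func):
    """Hypothetical decorator for illustration; NOT Pargraph's implementation."""

    @functools.wraps(func)  # copy __name__, __doc__, etc. from func
    def wrapper(*args, **kwargs):
        # Calling the wrapper simply forwards to the original function.
        return func(*args, **kwargs)

    wrapper.is_toy_delayed = True  # extra metadata; call behavior is unchanged
    return wrapper


@toy_delayed
def toy_add(*args):
    """Sum all arguments."""
    return sum(args)


print(toy_add(1, 2))           # 3
print(toy_add.__name__)        # toy_add
print(toy_add.is_toy_delayed)  # True
```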
Defining graph functions
graph functions compose a graph by calling delayed functions and other graph functions. graph functions are to delayed functions as molecules are to atoms.
[3]:
from pargraph import graph
URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02}.parquet"
@graph
def nyc_yellow_taxi_fare_total_for_year(year, unit):
    """
    Calculate the total fare amount for a given year by summing the total amounts from each month.
    :param year: The year for which to calculate the total fare amount
    :param unit: The unit to divide the total amount sum by
    :return: Total fare amount for the given year
    """
    # One get_total_amount_sum call per month, all feeding a single add
    return add(
        *(
            get_total_amount_sum(URL.format(year=year, month=month), unit)
            for month in range(1, 13)
        )
    )
A graph function retains the same call semantics as a normal Python function:
[4]:
nyc_yellow_taxi_fare_total_for_year(year=2024, unit=1_000_000)
[4]:
1145.8691257199998
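The URL template uses the format specification {month:02}, which zero-pads the month to two digits so that it matches the published file names. The standard-library check below builds the twelve URLs that the graph function formats for 2024. The variable name url_template is illustrative and avoids redefining URL.

```python
url_template = (
    "https://d37ci6vzurychx.cloudfront.net/trip-data/"
    "yellow_tripdata_{year}-{month:02}.parquet"
)

# {month:02} pads 1 -> "01", ..., 12 -> "12"
urls = [url_template.format(year=2024, month=month) for month in range(1, 13)]

print(len(urls))  # 12
print(urls[0])    # https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
```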
Converting to graphs
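Using to_graph(), we can generate a graph representation of the function. Arguments passed to to_graph() (here, year) are applied while the graph is built, which is why the twelve file URLs appear as constants in the output, while arguments that are left out (here, unit) become graph inputs.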
[5]:
nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024)
[5]:
Graph(consts={ConstKey(key='_2b4a8a0d1c4540bcba120bf22b548b33'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMS5wYXJxdWV0'), ConstKey(key='_dfb19e5cdf664641a82537a3fb00746c'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMi5wYXJxdWV0'), ConstKey(key='_1bab762fa4bf4f5cb2800bf3db7d2b75'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMy5wYXJxdWV0'), ConstKey(key='_ff769f99997b406583c8ddbbc9d9dd0e'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNC5wYXJxdWV0'), ConstKey(key='_75fb6201a22f46fc9876eb1735b63546'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNS5wYXJxdWV0'), ConstKey(key='_2f8cff99612c4bd09c71e967241e7fbd'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNi5wYXJxdWV0'), ConstKey(key='_990a0d33f4c44747818dc5806943a1b1'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNy5wYXJxdWV0'), ConstKey(key='_91614fbdd23841fb9186e45fdadc61c0'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wOC5wYXJxdWV0'), ConstKey(key='_179683cff2b94209a7097645c2587b49'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wOS5wYXJxdWV0'), ConstKey(key='_f2cf83d1721741aabfe3b0dce420379e'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMC5wYXJxdWV0'), ConstKey(key='_fbfe7b630c374bdd93c911d743dcf7eb'): 
Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMS5wYXJxdWV0'), ConstKey(key='_9f13bd84b7f240cdb31d67408fc69185'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMi5wYXJxdWV0')}, inputs={InputKey(key='unit'): None}, nodes={NodeKey(key='get_total_amount_sum_2d8dce044dc94629ae978f471e9a4b27'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_2b4a8a0d1c4540bcba120bf22b548b33'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_5fd6d6d8a5f44649ad9e18325fa1d5fe'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_dfb19e5cdf664641a82537a3fb00746c'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_6513415006b942bdbad75203aed04e59'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_1bab762fa4bf4f5cb2800bf3db7d2b75'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_dc205da7c5cc4059b8b10ba4787de141'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_ff769f99997b406583c8ddbbc9d9dd0e'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_4ce85af8e4c84cdfabe19a05691dda9a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_75fb6201a22f46fc9876eb1735b63546'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_d8553e95e0ca4b1390f240921abd0007'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_2f8cff99612c4bd09c71e967241e7fbd'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_6620f973d25a461580614a2e99685a93'): FunctionCall(function=<function get_total_amount_sum at 
0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_990a0d33f4c44747818dc5806943a1b1'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_8487ed4445f6459b909e501d9e4bd0aa'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_91614fbdd23841fb9186e45fdadc61c0'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_6661917548034435b9693327fdb97623'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_179683cff2b94209a7097645c2587b49'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_a4af758ab3844b63b7de85fff5a3172a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_f2cf83d1721741aabfe3b0dce420379e'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_23ffa717da794003984e34ecb82d802a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_fbfe7b630c374bdd93c911d743dcf7eb'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_1feb5a75eeca467e933239c5d3d4f732'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_9f13bd84b7f240cdb31d67408fc69185'), 'unit': InputKey(key='unit')}), NodeKey(key='add_eff00a44974644dfb8071545ddb5dd37'): FunctionCall(function=<function add at 0x000001AAF37E09A0>, args={'0': NodeOutputKey(key='get_total_amount_sum_2d8dce044dc94629ae978f471e9a4b27', output='result'), '1': NodeOutputKey(key='get_total_amount_sum_5fd6d6d8a5f44649ad9e18325fa1d5fe', output='result'), '2': NodeOutputKey(key='get_total_amount_sum_6513415006b942bdbad75203aed04e59', output='result'), '3': NodeOutputKey(key='get_total_amount_sum_dc205da7c5cc4059b8b10ba4787de141', output='result'), '4': NodeOutputKey(key='get_total_amount_sum_4ce85af8e4c84cdfabe19a05691dda9a', output='result'), '5': 
NodeOutputKey(key='get_total_amount_sum_d8553e95e0ca4b1390f240921abd0007', output='result'), '6': NodeOutputKey(key='get_total_amount_sum_6620f973d25a461580614a2e99685a93', output='result'), '7': NodeOutputKey(key='get_total_amount_sum_8487ed4445f6459b909e501d9e4bd0aa', output='result'), '8': NodeOutputKey(key='get_total_amount_sum_6661917548034435b9693327fdb97623', output='result'), '9': NodeOutputKey(key='get_total_amount_sum_a4af758ab3844b63b7de85fff5a3172a', output='result'), '10': NodeOutputKey(key='get_total_amount_sum_23ffa717da794003984e34ecb82d802a', output='result'), '11': NodeOutputKey(key='get_total_amount_sum_1feb5a75eeca467e933239c5d3d4f732', output='result')})}, outputs={OutputKey(key='result'): NodeOutputKey(key='add_eff00a44974644dfb8071545ddb5dd37', output='result')})
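The Const values in this output appear to be base64-encoded msgpack. This is an observation from the printout, not documented Pargraph behavior. The standard-library sketch below decodes the first constant by hand, assuming the msgpack "str 8" layout: a 0xd9 marker byte, one length byte, then UTF-8 text. The result is the January 2024 file URL.

```python
import base64

# Copied from the first Const(type='msgpack', value=...) entry above.
encoded = (
    "2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95"
    "ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMS5wYXJxdWV0"
)

raw = base64.b64decode(encoded)

# Assumption: msgpack "str 8" encoding -> 0xd9 marker, 1-byte length, payload.
assert raw[0] == 0xD9, "not a msgpack str 8 value"
length = raw[1]
text = raw[2:2 + length].decode("utf-8")

print(text)  # https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
```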
Graphs can also be converted to a task graph using to_dict(). Notice that every remaining graph input (here, unit) must be provided in order to convert the graph to a task graph:
[6]:
nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dict(unit=1_000_000)
[6]:
({'const_Const_22bddfa577ea4c858eb8e0aeac5ca4e0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet',
'const_Const_c37d2ea7a594424285bb160554d365a4': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet',
'const_Const_19b894a119d74cfab8520f32cbae8f75': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-03.parquet',
'const_Const_d2eca83adc30467ba1b6d5cf7d46093e': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-04.parquet',
'const_Const_7ae7fd2841234a5a8f7386b4788384e0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-05.parquet',
'const_Const_aab19f56cf3b4d51b3f0532b7485d536': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-06.parquet',
'const_Const_736a1f5be50f42ab89a32325f9db50c0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-07.parquet',
'const_Const_104e0221d61945adb7f11268f0f9fdee': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-08.parquet',
'const_Const_4f086769cc3245558798fc5b4fdb5d0f': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-09.parquet',
'const_Const_ba852608eb694de0958d676b3b430940': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet',
'const_Const_e9690dfc980e443ca1f321a3f864458e': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-11.parquet',
'const_Const_49f572270b10496fafd002f5f2344c45': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-12.parquet',
'input_unit_bb96debe1c5f480f93589e836f4e156c': 1000000,
'node_output_get_total_amount_sum_6fee4a6518764247b8de2d249272810a': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_22bddfa577ea4c858eb8e0aeac5ca4e0',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_ae374f2e96004f84b95f55d8ac8a08ac': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_c37d2ea7a594424285bb160554d365a4',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_c40297479e064da39ca6ad7a615fa618': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_19b894a119d74cfab8520f32cbae8f75',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_0740847276ba4fa9be487c49d8a0e2da': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_d2eca83adc30467ba1b6d5cf7d46093e',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_b0848d68bd1043bebb788c6245fc7b51': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_7ae7fd2841234a5a8f7386b4788384e0',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_17ea5faef84c4864a801276eed9541a0': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_aab19f56cf3b4d51b3f0532b7485d536',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_bd00359f3d564e99856f322eb504db89': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_736a1f5be50f42ab89a32325f9db50c0',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_b8d1af4dbbad4c74918815f33f9d4909': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_104e0221d61945adb7f11268f0f9fdee',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_0ea7100e98c74dcd8b41ef79999be830': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_4f086769cc3245558798fc5b4fdb5d0f',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_f2c987ca4fd54ca18e3dc459dabd38cb': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_ba852608eb694de0958d676b3b430940',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_5de103642b9b406d9342d3a6b129f2da': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_e9690dfc980e443ca1f321a3f864458e',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_41bc34d152dd47459219c0639cfd7ca7': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_49f572270b10496fafd002f5f2344c45',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_add_a5836b630b1c433d898a2aedd7761e55': (<function __main__.add(*args)>,
'node_output_get_total_amount_sum_6fee4a6518764247b8de2d249272810a',
'node_output_get_total_amount_sum_ae374f2e96004f84b95f55d8ac8a08ac',
'node_output_get_total_amount_sum_c40297479e064da39ca6ad7a615fa618',
'node_output_get_total_amount_sum_0740847276ba4fa9be487c49d8a0e2da',
'node_output_get_total_amount_sum_b0848d68bd1043bebb788c6245fc7b51',
'node_output_get_total_amount_sum_17ea5faef84c4864a801276eed9541a0',
'node_output_get_total_amount_sum_bd00359f3d564e99856f322eb504db89',
'node_output_get_total_amount_sum_b8d1af4dbbad4c74918815f33f9d4909',
'node_output_get_total_amount_sum_0ea7100e98c74dcd8b41ef79999be830',
'node_output_get_total_amount_sum_f2c987ca4fd54ca18e3dc459dabd38cb',
'node_output_get_total_amount_sum_5de103642b9b406d9342d3a6b129f2da',
'node_output_get_total_amount_sum_41bc34d152dd47459219c0639cfd7ca7')},
['node_output_add_a5836b630b1c433d898a2aedd7761e55'])
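The dictionary appears to follow the Dask-style task-graph convention. Each key maps either to a literal value (a constant or an input) or to a tuple whose first element is a function and whose remaining elements are the keys of its arguments. The sketch below is a minimal sequential evaluator written under that assumption. It is not how GraphEngine works internally, and the task names and helpers (scale, add_all) are hypothetical.

```python
def evaluate(tasks, key, cache):
    """Compute the value for `key`, evaluating its dependencies first."""
    if key in cache:
        return cache[key]  # each task runs at most once
    task = tasks[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *arg_keys = task
        # Assumption: every remaining tuple element is the key of another task.
        value = func(*(evaluate(tasks, k, cache) for k in arg_keys))
    else:
        value = task  # literal value, e.g. a constant or an input
    cache[key] = value
    return value


def scale(amount, unit):
    return amount / unit


def add_all(*args):
    return sum(args)


# Hypothetical task dict shaped like the to_dict() output above.
toy_tasks = {
    "const_a": 3_000_000,
    "const_b": 5_000_000,
    "input_unit": 1_000_000,
    "node_scale_a": (scale, "const_a", "input_unit"),
    "node_scale_b": (scale, "const_b", "input_unit"),
    "node_add": (add_all, "node_scale_a", "node_scale_b"),
}
toy_keys = ["node_add"]

cache = {}
print([evaluate(toy_tasks, k, cache) for k in toy_keys])  # [8.0]
```

Returning one value per entry in the key list is similar to how the results of GraphEngine.get line up with keys later in this walkthrough.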
Visualizing graphs
The outputs above are hard to read and are mostly useful to machines. Let’s write a small helper function that renders the graphs as dot graphs.
[7]:
from IPython.display import SVG
def get_graph_image(dot_graph):
    # Write the dot graph to an SVG file, then display that file inline
    dot_graph.write_svg("temp.svg")
    return SVG(filename="temp.svg")
Using to_dot(), we can generate a dot graph:
[8]:
get_graph_image(nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dot())
If we only want to see the node relations, we can pass no_input=True, no_const=True, and no_output=True to leave out the input, constant, and output entries and make the graph less noisy:
[9]:
get_graph_image(
    nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dot(
        no_input=True, no_const=True, no_output=True
    )
)
Although it is of limited use on its own, we can also generate and visualize a graph from a single delayed function:
[10]:
get_graph_image(get_total_amount_sum.to_graph().to_dot())
Nested graphs
What if we want to compose multiple graphs into one bigger graph? We can do that by nesting graph functions!
[11]:
URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02}.parquet"
@graph
def nyc_yellow_taxi_fare_total_by_year(years, unit):
    """
    Calculate the total fare amount for multiple years.
    :param years: A list of years for which to calculate the total fare amount
    :param unit: The unit to divide the total amount sum by
    :return: Total fare amounts for the given years
    """
    # Interleave each year with its total ([year, total, year, total, ...]),
    # which is the layout collect_result expects
    results = []
    for year in years:
        results.append(year)
        results.append(nyc_yellow_taxi_fare_total_for_year(year, unit))
    return collect_result(*results)
Notice that when we visualize the graph, the nyc_yellow_taxi_fare_total_for_year function now appears as a node:
[12]:
get_graph_image(
    nyc_yellow_taxi_fare_total_by_year.to_graph(years=range(2010, 2025)).to_dot()
)
Executing graphs
First, we need to generate the task graph:
[13]:
task_graph, keys = nyc_yellow_taxi_fare_total_by_year.to_graph(
    years=range(2024, 2025)
).to_dict(unit=1_000_000)
Then we can execute it using pargraph.GraphEngine:
[14]:
from pargraph import GraphEngine
graph_engine = GraphEngine()
result = graph_engine.get(task_graph, keys)
And the result is:
[15]:
result
[15]:
[2024 1145.869126
dtype: float64]
The results are returned as a list that follows the structure of keys:
[16]:
keys
[16]:
['node_output_collect_result_559f87ac40c44e1d96c6c4677e9922d1']
Since keys holds a single entry, we can extract our result by taking the first element:
[17]:
result[0]
[17]:
2024 1145.869126
dtype: float64