Walkthrough
In this walkthrough, we’ll create our very own graphs using Pargraph.
Defining delayed functions
delayed functions are pure and indivisible: they are the atomic units of work in a graph. They can be composed to implement graph functions, and because they are pure, they can be scheduled and executed concurrently across cores and/or hosts.
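To make the purity argument concrete, here is a small plain-Python comparison. It does not use Pargraph, and the function names are hypothetical. A pure function gives the same results no matter which order its calls are evaluated in, which is what makes it safe to run independent calls in any order or at the same time. A function that reads and writes hidden state does not have this property:

```python
calls = []  # hidden mutable state shared across calls


def impure_scale(x, unit):
    # Side effect plus dependence on hidden state: the output depends on call order
    calls.append(x)
    return x / unit + len(calls)


def pure_scale(x, unit):
    # Output depends only on the arguments
    return x / unit


def run(func, inputs, reverse=False):
    # Evaluate func over inputs in forward or reverse order, returning results in input order
    order = list(reversed(inputs)) if reverse else list(inputs)
    results = {x: func(x, 1_000_000) for x in order}
    return [results[x] for x in inputs]


inputs = [3_000_000, 5_000_000]
print(run(pure_scale, inputs) == run(pure_scale, inputs, reverse=True))  # True

calls.clear()
forward = run(impure_scale, inputs)
calls.clear()
backward = run(impure_scale, inputs, reverse=True)
print(forward, backward)  # [4.0, 7.0] [5.0, 6.0]
```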
In a graph, delayed functions are the nodes, and the edges are the dependencies between them.
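As a rough sketch of why this structure matters, the standard library’s graphlib.TopologicalSorter can take a mapping from each node to the nodes it depends on and report which nodes become ready together. The node names below are hypothetical, modeled on the twelve monthly loads and the final sum used later in this walkthrough. This is only an illustration of the idea, not how Pargraph schedules work internally:

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each node lists the nodes it depends on.
# The twelve monthly loaders have no dependencies; "add" depends on all of them.
months = [f"get_total_amount_sum_{m:02}" for m in range(1, 13)]
dependencies = {"add": set(months), **{m: set() for m in months}}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # nodes whose dependencies are all done
    waves.append(ready)
    sorter.done(*ready)

print(len(waves))     # 2
print(len(waves[0]))  # 12: all monthly loads are independent of each other
print(waves[1])       # ['add']
```

Every node in the first wave could run concurrently; the single edge set into add is what forces the aggregation to wait.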
Example: NYC Yellow Taxi Fare Totals by Year
The NYC Taxi and Limousine Commission (TLC) publishes records of every taxi trip since 2009. Each data file is in Parquet format and contains all trips taken in a given month. Loading and processing the files one after another takes a long time, so to speed things up we would like to load and process the files in parallel and then aggregate the results.
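The load-in-parallel-then-aggregate pattern can be sketched with the standard library alone. In this sketch, fake_monthly_total is a hypothetical stand-in for reading one Parquet file (it returns a made-up number so the example runs offline), and concurrent.futures.ThreadPoolExecutor plays the role of the parallel runtime:

```python
from concurrent.futures import ThreadPoolExecutor


def fake_monthly_total(month):
    # Hypothetical stand-in for loading one month of trips and summing total_amount
    return month * 100.0


months = range(1, 13)
with ThreadPoolExecutor(max_workers=4) as pool:
    # "Map" step: each month is processed independently, possibly at the same time
    monthly_totals = list(pool.map(fake_monthly_total, months))

# "Reduce" step: aggregate once every monthly result is available
yearly_total = sum(monthly_totals)
print(yearly_total)  # 7800.0
```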
Here are some functions that we will need, defined as delayed functions:
[1]:
import pandas as pd
from pargraph import delayed


@delayed
def get_total_amount_sum(file_url, unit):
    """
    Calculate the sum of the 'total_amount' column from a Parquet file and divide it by a given unit.

    :param file_url: The URL of the Parquet file
    :param unit: The unit to divide the total amount sum by
    :return: The sum of the 'total_amount' column divided by the given unit
    """
    return (
        pd.read_parquet(file_url, columns=["total_amount"])["total_amount"].sum() / unit
    )


@delayed
def add(*args):
    """
    Calculate the sum of all the arguments provided.

    :param args: A variable number of arguments to be summed
    :return: The sum of all arguments
    """
    return sum(args)


@delayed
def collect_result(*results):
    """
    Collect results into a pandas Series, using every other element as the index and the remaining elements as the values.

    :param results: A variable number of arguments where even-indexed elements are used as the index and odd-indexed elements as the values
    :return: A pandas Series with the specified index and values
    """
    return pd.Series(results[1::2], index=results[0::2])
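collect_result expects its arguments as interleaved (index, value) pairs. The slicing it relies on can be checked in plain Python; the sample years and totals here are made up:

```python
# Interleaved (year, total) pairs, in the shape collect_result expects
results = (2023, 1.5, 2024, 2.5)

index = results[0::2]   # even positions: (2023, 2024)
values = results[1::2]  # odd positions: (1.5, 2.5)

print(dict(zip(index, values)))  # {2023: 1.5, 2024: 2.5}
```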
A delayed function retains the same call semantics as a normal Python function:
[2]:
add(1, 2)
[2]:
3
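This walkthrough does not show how Pargraph’s @delayed is implemented. As a hypothetical illustration of how a decorator can keep normal call semantics while attaching extra metadata that a graph builder could inspect, consider this my_delayed sketch (the decorator name and the is_delayed attribute are invented for the example):

```python
import functools


def my_delayed(func):
    """Hypothetical decorator: an illustration only, NOT pargraph's implementation."""

    @functools.wraps(func)  # keep the original name, docstring, and signature metadata
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)  # direct calls behave exactly like func

    wrapper.is_delayed = True  # hypothetical marker a graph builder could look for
    return wrapper


@my_delayed
def add(*args):
    return sum(args)


print(add(1, 2))     # 3
print(add.__name__)  # add
```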
Defining graph functions
graph functions are built from calls to delayed functions and other graph functions. graph functions are to delayed functions as molecules are to atoms.
[3]:
from pargraph import graph

URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02}.parquet"


@graph
def nyc_yellow_taxi_fare_total_for_year(year, unit):
    """
    Calculate the total fare amount for a given year by summing the total amounts from each month.

    :param year: The year for which to calculate the total fare amount
    :param unit: The unit to divide the total amount sum by
    :return: Total fare amount for the given year
    """
    return add(
        *(
            get_total_amount_sum(URL.format(year=year, month=month), unit)
            for month in range(1, 13)
        )
    )
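The graph function fans out to one get_total_amount_sum call per month. Formatting URL by hand shows the twelve file URLs it builds; the {month:02} format spec zero-pads the month to two digits:

```python
URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02}.parquet"

# One URL per month, as built by the generator expression in the graph function
urls = [URL.format(year=2024, month=month) for month in range(1, 13)]

print(len(urls))  # 12
print(urls[0])    # https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
print(urls[-1])   # https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-12.parquet
```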
A graph function retains the same call semantics as a normal Python function:
[4]:
nyc_yellow_taxi_fare_total_for_year(year=2024, unit=1_000_000)
[4]:
1145.8691257199998
Converting to graphs
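Using to_graph(), we can generate a graph representation of the graph function we’ve created. Arguments passed to to_graph() are resolved while the graph is built (the twelve URLs derived from year appear as constants), while arguments left out, such as unit here, become graph inputs: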
[5]:
nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024)
[5]:
Graph(consts={ConstKey(key='_2b4a8a0d1c4540bcba120bf22b548b33'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMS5wYXJxdWV0'), ConstKey(key='_dfb19e5cdf664641a82537a3fb00746c'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMi5wYXJxdWV0'), ConstKey(key='_1bab762fa4bf4f5cb2800bf3db7d2b75'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMy5wYXJxdWV0'), ConstKey(key='_ff769f99997b406583c8ddbbc9d9dd0e'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNC5wYXJxdWV0'), ConstKey(key='_75fb6201a22f46fc9876eb1735b63546'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNS5wYXJxdWV0'), ConstKey(key='_2f8cff99612c4bd09c71e967241e7fbd'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNi5wYXJxdWV0'), ConstKey(key='_990a0d33f4c44747818dc5806943a1b1'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNy5wYXJxdWV0'), ConstKey(key='_91614fbdd23841fb9186e45fdadc61c0'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wOC5wYXJxdWV0'), ConstKey(key='_179683cff2b94209a7097645c2587b49'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wOS5wYXJxdWV0'), ConstKey(key='_f2cf83d1721741aabfe3b0dce420379e'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMC5wYXJxdWV0'), ConstKey(key='_fbfe7b630c374bdd93c911d743dcf7eb'): 
Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMS5wYXJxdWV0'), ConstKey(key='_9f13bd84b7f240cdb31d67408fc69185'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMi5wYXJxdWV0')}, inputs={InputKey(key='unit'): None}, nodes={NodeKey(key='get_total_amount_sum_2d8dce044dc94629ae978f471e9a4b27'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_2b4a8a0d1c4540bcba120bf22b548b33'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_5fd6d6d8a5f44649ad9e18325fa1d5fe'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_dfb19e5cdf664641a82537a3fb00746c'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_6513415006b942bdbad75203aed04e59'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_1bab762fa4bf4f5cb2800bf3db7d2b75'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_dc205da7c5cc4059b8b10ba4787de141'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_ff769f99997b406583c8ddbbc9d9dd0e'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_4ce85af8e4c84cdfabe19a05691dda9a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_75fb6201a22f46fc9876eb1735b63546'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_d8553e95e0ca4b1390f240921abd0007'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_2f8cff99612c4bd09c71e967241e7fbd'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_6620f973d25a461580614a2e99685a93'): FunctionCall(function=<function get_total_amount_sum at 
0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_990a0d33f4c44747818dc5806943a1b1'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_8487ed4445f6459b909e501d9e4bd0aa'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_91614fbdd23841fb9186e45fdadc61c0'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_6661917548034435b9693327fdb97623'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_179683cff2b94209a7097645c2587b49'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_a4af758ab3844b63b7de85fff5a3172a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_f2cf83d1721741aabfe3b0dce420379e'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_23ffa717da794003984e34ecb82d802a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_fbfe7b630c374bdd93c911d743dcf7eb'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_1feb5a75eeca467e933239c5d3d4f732'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_9f13bd84b7f240cdb31d67408fc69185'), 'unit': InputKey(key='unit')}), NodeKey(key='add_eff00a44974644dfb8071545ddb5dd37'): FunctionCall(function=<function add at 0x000001AAF37E09A0>, args={'0': NodeOutputKey(key='get_total_amount_sum_2d8dce044dc94629ae978f471e9a4b27', output='result'), '1': NodeOutputKey(key='get_total_amount_sum_5fd6d6d8a5f44649ad9e18325fa1d5fe', output='result'), '2': NodeOutputKey(key='get_total_amount_sum_6513415006b942bdbad75203aed04e59', output='result'), '3': NodeOutputKey(key='get_total_amount_sum_dc205da7c5cc4059b8b10ba4787de141', output='result'), '4': NodeOutputKey(key='get_total_amount_sum_4ce85af8e4c84cdfabe19a05691dda9a', output='result'), '5': 
NodeOutputKey(key='get_total_amount_sum_d8553e95e0ca4b1390f240921abd0007', output='result'), '6': NodeOutputKey(key='get_total_amount_sum_6620f973d25a461580614a2e99685a93', output='result'), '7': NodeOutputKey(key='get_total_amount_sum_8487ed4445f6459b909e501d9e4bd0aa', output='result'), '8': NodeOutputKey(key='get_total_amount_sum_6661917548034435b9693327fdb97623', output='result'), '9': NodeOutputKey(key='get_total_amount_sum_a4af758ab3844b63b7de85fff5a3172a', output='result'), '10': NodeOutputKey(key='get_total_amount_sum_23ffa717da794003984e34ecb82d802a', output='result'), '11': NodeOutputKey(key='get_total_amount_sum_1feb5a75eeca467e933239c5d3d4f732', output='result')})}, outputs={OutputKey(key='result'): NodeOutputKey(key='add_eff00a44974644dfb8071545ddb5dd37', output='result')})
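Each constant above is stored with type='msgpack' and a base64-looking value. Assuming the value is base64-encoded MessagePack (which the decoded bytes below are consistent with), one constant can be decoded with the standard library alone. The parsing handles only MessagePack’s "str 8" format (marker byte 0xd9, then a one-byte length), which is enough for this URL; a full decoder such as the msgpack package handles every type:

```python
import base64

# First constant from the to_graph() output above
encoded = "2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMS5wYXJxdWV0"
raw = base64.b64decode(encoded)

# Assumption: the payload is a single MessagePack "str 8" value:
# marker byte 0xd9, one length byte, then that many UTF-8 bytes
if raw[0] != 0xD9:
    raise ValueError("not a MessagePack str 8 value")
length = raw[1]
text = raw[2:2 + length].decode("utf-8")

print(length)  # 79
print(text)    # https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
```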
Graphs can also be converted to a task graph using to_dict(). Every remaining graph input must be supplied at this point, which is why unit is passed here:
[6]:
nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dict(unit=1_000_000)
[6]:
({'const_Const_22bddfa577ea4c858eb8e0aeac5ca4e0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet',
'const_Const_c37d2ea7a594424285bb160554d365a4': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet',
'const_Const_19b894a119d74cfab8520f32cbae8f75': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-03.parquet',
'const_Const_d2eca83adc30467ba1b6d5cf7d46093e': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-04.parquet',
'const_Const_7ae7fd2841234a5a8f7386b4788384e0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-05.parquet',
'const_Const_aab19f56cf3b4d51b3f0532b7485d536': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-06.parquet',
'const_Const_736a1f5be50f42ab89a32325f9db50c0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-07.parquet',
'const_Const_104e0221d61945adb7f11268f0f9fdee': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-08.parquet',
'const_Const_4f086769cc3245558798fc5b4fdb5d0f': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-09.parquet',
'const_Const_ba852608eb694de0958d676b3b430940': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet',
'const_Const_e9690dfc980e443ca1f321a3f864458e': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-11.parquet',
'const_Const_49f572270b10496fafd002f5f2344c45': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-12.parquet',
'input_unit_bb96debe1c5f480f93589e836f4e156c': 1000000,
'node_output_get_total_amount_sum_6fee4a6518764247b8de2d249272810a': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_22bddfa577ea4c858eb8e0aeac5ca4e0',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_ae374f2e96004f84b95f55d8ac8a08ac': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_c37d2ea7a594424285bb160554d365a4',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_c40297479e064da39ca6ad7a615fa618': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_19b894a119d74cfab8520f32cbae8f75',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_0740847276ba4fa9be487c49d8a0e2da': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_d2eca83adc30467ba1b6d5cf7d46093e',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_b0848d68bd1043bebb788c6245fc7b51': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_7ae7fd2841234a5a8f7386b4788384e0',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_17ea5faef84c4864a801276eed9541a0': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_aab19f56cf3b4d51b3f0532b7485d536',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_bd00359f3d564e99856f322eb504db89': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_736a1f5be50f42ab89a32325f9db50c0',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_b8d1af4dbbad4c74918815f33f9d4909': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_104e0221d61945adb7f11268f0f9fdee',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_0ea7100e98c74dcd8b41ef79999be830': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_4f086769cc3245558798fc5b4fdb5d0f',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_f2c987ca4fd54ca18e3dc459dabd38cb': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_ba852608eb694de0958d676b3b430940',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_5de103642b9b406d9342d3a6b129f2da': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_e9690dfc980e443ca1f321a3f864458e',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_get_total_amount_sum_41bc34d152dd47459219c0639cfd7ca7': (<function __main__.get_total_amount_sum(file_url, unit)>,
'const_Const_49f572270b10496fafd002f5f2344c45',
'input_unit_bb96debe1c5f480f93589e836f4e156c'),
'node_output_add_a5836b630b1c433d898a2aedd7761e55': (<function __main__.add(*args)>,
'node_output_get_total_amount_sum_6fee4a6518764247b8de2d249272810a',
'node_output_get_total_amount_sum_ae374f2e96004f84b95f55d8ac8a08ac',
'node_output_get_total_amount_sum_c40297479e064da39ca6ad7a615fa618',
'node_output_get_total_amount_sum_0740847276ba4fa9be487c49d8a0e2da',
'node_output_get_total_amount_sum_b0848d68bd1043bebb788c6245fc7b51',
'node_output_get_total_amount_sum_17ea5faef84c4864a801276eed9541a0',
'node_output_get_total_amount_sum_bd00359f3d564e99856f322eb504db89',
'node_output_get_total_amount_sum_b8d1af4dbbad4c74918815f33f9d4909',
'node_output_get_total_amount_sum_0ea7100e98c74dcd8b41ef79999be830',
'node_output_get_total_amount_sum_f2c987ca4fd54ca18e3dc459dabd38cb',
'node_output_get_total_amount_sum_5de103642b9b406d9342d3a6b129f2da',
'node_output_get_total_amount_sum_41bc34d152dd47459219c0639cfd7ca7')},
['node_output_add_a5836b630b1c433d898a2aedd7761e55'])
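The returned pair is a task dictionary plus a list of output keys. In the dictionary, each entry is either a plain value (a constant or an input) or a tuple whose first element is a function and whose remaining elements are the keys of its arguments, a layout that resembles Dask’s task graph format. Below is a minimal sequential evaluator for dictionaries of that shape, written only to show how such a graph can be computed. The get function and the small example graph are hypothetical, and unlike Dask this sketch treats every element after the function as a key:

```python
def get(dsk, keys):
    """Hypothetical sequential evaluator for a dict task graph."""
    cache = {}

    def compute(key):
        if key in cache:
            return cache[key]  # each node is computed at most once
        value = dsk[key]
        if isinstance(value, tuple) and value and callable(value[0]):
            func, *arg_keys = value
            # Resolve the argument keys first (depth-first), then call the function
            value = func(*(compute(arg) for arg in arg_keys))
        cache[key] = value
        return value

    return [compute(key) for key in keys]


def add(*args):
    return sum(args)


def scale(x, unit):
    return x / unit


task_graph = {
    "const_a": 3_000_000,
    "const_b": 5_000_000,
    "input_unit": 1_000_000,
    "node_a": (scale, "const_a", "input_unit"),
    "node_b": (scale, "const_b", "input_unit"),
    "node_add": (add, "node_a", "node_b"),
}
print(get(task_graph, ["node_add"]))  # [8.0]
```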
Visualizing graphs
The outputs above are hard for a human to interpret and are mostly useful to a computer. Let’s write a simple helper function to visualize the dot graphs we generate:
[7]:
from IPython.display import SVG


def get_graph_image(dot_graph):
    # Render the dot graph to an SVG file, then display that file inline
    dot_graph.write_svg("temp.svg")
    return SVG("temp.svg")
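DOT is Graphviz’s plain-text graph description language. To give a feel for what a dot graph contains, here is a hand-written sketch of DOT source for the same fan-in shape (twelve monthly nodes feeding one add node). It is not the exact text that to_dot() emits, and fan_in_dot is a hypothetical helper:

```python
def fan_in_dot(sources, sink):
    # Build DOT source text: one edge from each source node into the sink node
    lines = ["digraph G {"]
    for src in sources:
        lines.append(f'    "{src}" -> "{sink}";')
    lines.append("}")
    return "\n".join(lines)


months = [f"get_total_amount_sum_{m:02}" for m in range(1, 13)]
dot_source = fan_in_dot(months, "add")
print(dot_source.count("->"))  # 12
```

Text like this can be rendered by Graphviz’s dot tool.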
Using to_dot(), we can generate a dot graph:
[8]:
get_graph_image(nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dot())
[8]:
If we only want to see how the nodes relate to each other, we can pass the arguments no_input, no_const, and no_output to make the graph less noisy:
[9]:
get_graph_image(
    nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dot(
        no_input=True, no_const=True, no_output=True
    )
)
[9]:
Although it is not very useful, we can also generate and visualize a graph from a single delayed function:
[10]:
get_graph_image(get_total_amount_sum.to_graph().to_dot())
[10]:
Nested graphs
What if we want to compose multiple graphs into one bigger graph? We can do that by nesting graph functions!
[11]:
@graph
def nyc_yellow_taxi_fare_total_by_year(years, unit):
    """
    Calculate the total fare amount for multiple years.

    :param years: An iterable of years for which to calculate the total fare amount
    :param unit: The unit to divide the total amount sum by
    :return: Total fare amounts for the given years
    """
    results = []
    for year in years:
        # Interleave each year with its total, as collect_result expects
        results.append(year)
        results.append(nyc_yellow_taxi_fare_total_for_year(year, unit))
    return collect_result(*results)
When we visualize this graph, notice that the nyc_yellow_taxi_fare_total_for_year function is now a node in the graph:
[12]:
get_graph_image(
    nyc_yellow_taxi_fare_total_by_year.to_graph(years=range(2010, 2025)).to_dot()
)
[12]:
Executing graphs
First, we need to generate the task graph:
[13]:
task_graph, keys = nyc_yellow_taxi_fare_total_by_year.to_graph(
    years=range(2024, 2025)
).to_dict(unit=1_000_000)
Then we execute it using pargraph.GraphEngine:
[14]:
from pargraph import GraphEngine
graph_engine = GraphEngine()
result = graph_engine.get(task_graph, keys)
The results are:
[15]:
result
[15]:
[2024 1145.869126
dtype: float64]
The result is a list because keys is a list; get() returns one value per requested key:
[16]:
keys
[16]:
['node_output_collect_result_559f87ac40c44e1d96c6c4677e9922d1']
To get the Series itself, access the first element:
[17]:
result[0]
[17]:
2024 1145.869126
dtype: float64
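This walkthrough does not go into how GraphEngine runs the task graph. For intuition only, here is a hypothetical executor for the same kind of dictionary that runs every node whose inputs are ready at the same time, using graphlib.TopologicalSorter for ordering and a thread pool for concurrency. run_in_waves and the example graph are invented for this sketch; like GraphEngine.get() in the example above, it returns one result per requested key:

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter


def is_task(value):
    # A task is a tuple whose first element is callable; anything else is a plain value
    return isinstance(value, tuple) and len(value) > 0 and callable(value[0])


def run_in_waves(dsk, keys, max_workers=4):
    """Hypothetical parallel executor; not GraphEngine's implementation."""
    # Each task depends on the keys of its arguments; plain values depend on nothing
    deps = {key: (set(value[1:]) if is_task(value) else set()) for key, value in dsk.items()}
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    results = {}

    def evaluate(key):
        value = dsk[key]
        if is_task(value):
            func, *arg_keys = value
            return func(*(results[arg] for arg in arg_keys))
        return value

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            ready = list(sorter.get_ready())          # every dependency of these keys is done
            values = list(pool.map(evaluate, ready))  # run the whole wave concurrently
            results.update(zip(ready, values))
            sorter.done(*ready)

    # One result per requested key, mirroring the structure of keys
    return [results[key] for key in keys]


def add(*args):
    return sum(args)


def scale(x, unit):
    return x / unit


task_graph = {
    "const_a": 3_000_000,
    "const_b": 5_000_000,
    "input_unit": 1_000_000,
    "node_a": (scale, "const_a", "input_unit"),
    "node_b": (scale, "const_b", "input_unit"),
    "node_add": (add, "node_a", "node_b"),
}
print(run_in_waves(task_graph, ["node_add"]))  # [8.0]
```

Here node_a and node_b land in the same wave because neither depends on the other, mirroring how the twelve monthly loads can run side by side.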