Walkthrough

In this walkthrough, we’ll create our very own graphs using Pargraph.

Defining delayed functions

Delayed functions are pure and indivisible: they are the smallest units of work in a graph. They can be composed to implement graph functions. Because they are pure, they can be scheduled and executed concurrently across cores and/or hosts.

In a graph, calls to delayed functions are the nodes, and the edges represent the dependencies between them.
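To make the node and edge picture concrete, here is a minimal sketch in plain Python with no Pargraph involved. Each node is a hypothetical name mapped to a function and the names of the nodes it depends on. Python's standard `graphlib.TopologicalSorter` orders the nodes so that every dependency runs first. The node names and toy functions are illustrative assumptions, not Pargraph's internal representation.

```python
from graphlib import TopologicalSorter

# Hypothetical toy graph: node name -> (function, names of the nodes it depends on).
# The two "month" nodes have no dependencies, so a scheduler could run them concurrently.
nodes = {
    "month_1": (lambda: 10.0, []),
    "month_2": (lambda: 12.5, []),
    "total": (lambda *parts: sum(parts), ["month_1", "month_2"]),
}


def run_sequentially(nodes):
    """Evaluate every node after its dependencies, one node at a time."""
    # TopologicalSorter takes a mapping of node -> iterable of predecessors (the edges).
    order = TopologicalSorter({name: deps for name, (_, deps) in nodes.items()})
    results = {}
    for name in order.static_order():
        func, deps = nodes[name]
        results[name] = func(*(results[dep] for dep in deps))
    return results


print(run_sequentially(nodes)["total"])  # 22.5
```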

Example: NYC Yellow Taxi Fare Totals by Year

The NYC Taxi and Limousine Commission (TLC) publishes records of every taxi trip since 2009. Each data file is a Parquet file containing every trip taken in a given month. Loading and processing the files sequentially takes a very long time, so to speed things up we want to load and process the files in parallel and then aggregate the results.
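The plan is a map-then-reduce: compute one partial result per monthly file concurrently, then aggregate the partials. A minimal standard-library sketch of that shape follows. It assumes a hypothetical stand-in function `monthly_total` that fakes the per-file work; the real pipeline reads Parquet files instead.

```python
from concurrent.futures import ThreadPoolExecutor


def monthly_total(month):
    """Hypothetical stand-in for loading one monthly file and summing a column."""
    return float(month)  # placeholder value; the real work would read a Parquet file


def yearly_total(months):
    # Map: each month is independent, so the calls can run concurrently.
    with ThreadPoolExecutor() as pool:
        partials = list(pool.map(monthly_total, months))
    # Reduce: aggregate only after every partial result is available.
    return sum(partials)


print(yearly_total(range(1, 13)))  # 78.0
```

Threads are a reasonable fit when the per-file work is dominated by downloads (I/O). Pargraph's role in the rest of this walkthrough is to express the same shape as a graph so that an engine can schedule it.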

Here are some functions that we will need, defined as delayed functions:

[1]:
import pandas as pd
from pargraph import delayed


@delayed
def get_total_amount_sum(file_url, unit):
    """
    Calculate the sum of the 'total_amount' column from a Parquet file and divide it by a given unit.

    :param file_url: The URL of the Parquet file
    :param unit: The unit to divide the total amount sum by
    :return: The sum of the 'total_amount' column divided by the given unit
    """
    return (
        pd.read_parquet(file_url, columns=["total_amount"])["total_amount"].sum() / unit
    )


@delayed
def add(*args):
    """
    Calculate the sum of all the arguments provided.

    :param args: A variable number of arguments to be summed
    :return: The sum of all arguments
    """
    return sum(args)


@delayed
def collect_result(*results):
    """
    Collect results into a pandas Series, using every other element as the index and the remaining elements as the values.

    :param results: A variable number of arguments where even-indexed elements are used as the index and odd-indexed elements as the values
    :return: A pandas Series with the specified index and values
    """
    return pd.Series(results[1::2], index=results[0::2])
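`collect_result` expects its arguments interleaved as label, value, label, value, and so on. The slices `results[0::2]` and `results[1::2]` split them back apart. Here is the same split in plain Python, with made-up values for illustration:

```python
# Hypothetical interleaved arguments: year, value, year, value, ...
results = (2023, 1.5, 2024, 2.0)

labels = results[0::2]  # even positions -> index labels
values = results[1::2]  # odd positions -> values

print(dict(zip(labels, values)))  # {2023: 1.5, 2024: 2.0}
```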

A delayed function retains the same call semantics as a normal Python function:

[2]:
add(1, 2)
[2]:
3
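A decorator can keep normal call semantics while attaching extra behaviour by forwarding every call unchanged and hanging additional helpers off the returned function. The `toy_delayed` decorator and its `describe` helper below are hypothetical illustrations of that idea only. This is not how Pargraph's `delayed` is implemented.

```python
import functools


def toy_delayed(func):
    """Hypothetical decorator: calls behave normally, plus an extra inspection helper."""

    @functools.wraps(func)  # keep the original name and docstring
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)  # plain call semantics are unchanged

    # Extra, hypothetical helper attached as an attribute of the wrapper.
    wrapper.describe = lambda: f"node({func.__name__})"
    return wrapper


@toy_delayed
def toy_add(*args):
    return sum(args)


print(toy_add(1, 2))  # 3
print(toy_add.describe())  # node(toy_add)
```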

Defining graph functions

Graph functions combine calls to delayed functions and other graph functions. Graph functions are to delayed functions as molecules are to atoms.

[3]:
from pargraph import graph

URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02}.parquet"


@graph
def nyc_yellow_taxi_fare_total_for_year(year, unit):
    """
    Calculate the total fare amount for a given year by summing the total amounts from each month.

    :param year: The year for which to calculate the total fare amount
    :param unit: The unit to divide the total amount sum by
    :return: Total fare amount for the given year
    """
    return add(
        *(
            get_total_amount_sum(URL.format(year=year, month=month), unit)
            for month in range(1, 13)
        )
    )
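The URL template relies on Python's format specification. `{month:02}` pads the month to two digits with leading zeros, so the generator inside the graph function produces one URL per month. The same expansion on its own follows. The `URL` constant from above is repeated so that the snippet runs standalone.

```python
URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02}.parquet"

# One URL per month; month=1 is formatted as "01", month=12 as "12".
urls = [URL.format(year=2024, month=month) for month in range(1, 13)]

print(len(urls))  # 12
print(urls[0])  # https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
```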

A graph function retains the same call semantics as a normal Python function:

[4]:
nyc_yellow_taxi_fare_total_for_year(year=2024, unit=1_000_000)
[4]:
1145.8691257199998
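Each monthly sum is divided by `unit` before the sums are added. Because division distributes over addition, the result equals the yearly sum scaled by 1/u, up to floating-point rounding. With $S_m$ the sum of `total_amount` for month $m$ and $u$ = `unit`:

```latex
\text{result} = \sum_{m=1}^{12} \frac{S_m}{u} = \frac{1}{u} \sum_{m=1}^{12} S_m,
\qquad
u = 10^{6} \;\Rightarrow\; \sum_{m=1}^{12} S_m \approx 1145.87 \times 10^{6} \approx 1.15 \times 10^{9}
```

So the 2024 output above corresponds to roughly 1.15 billion dollars of `total_amount`.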

Converting to graphs

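Using to_graph(), we can generate a graph representation of the graph function we’ve created. Arguments passed to to_graph() (here, year) are fixed when the graph is built, so the URLs derived from them appear as constants. Arguments left out (here, unit) remain graph inputs: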

[5]:
nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024)
[5]:
Graph(
    consts={
        ConstKey(key='_2b4a8a0d1c4540bcba120bf22b548b33'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMS5wYXJxdWV0'),
        ConstKey(key='_dfb19e5cdf664641a82537a3fb00746c'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMi5wYXJxdWV0'),
        ConstKey(key='_1bab762fa4bf4f5cb2800bf3db7d2b75'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMy5wYXJxdWV0'),
        ConstKey(key='_ff769f99997b406583c8ddbbc9d9dd0e'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNC5wYXJxdWV0'),
        ConstKey(key='_75fb6201a22f46fc9876eb1735b63546'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNS5wYXJxdWV0'),
        ConstKey(key='_2f8cff99612c4bd09c71e967241e7fbd'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNi5wYXJxdWV0'),
        ConstKey(key='_990a0d33f4c44747818dc5806943a1b1'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNy5wYXJxdWV0'),
        ConstKey(key='_91614fbdd23841fb9186e45fdadc61c0'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wOC5wYXJxdWV0'),
        ConstKey(key='_179683cff2b94209a7097645c2587b49'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wOS5wYXJxdWV0'),
        ConstKey(key='_f2cf83d1721741aabfe3b0dce420379e'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMC5wYXJxdWV0'),
        ConstKey(key='_fbfe7b630c374bdd93c911d743dcf7eb'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMS5wYXJxdWV0'),
        ConstKey(key='_9f13bd84b7f240cdb31d67408fc69185'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMi5wYXJxdWV0')
    },
    inputs={InputKey(key='unit'): None},
    nodes={
        NodeKey(key='get_total_amount_sum_2d8dce044dc94629ae978f471e9a4b27'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_2b4a8a0d1c4540bcba120bf22b548b33'), 'unit': InputKey(key='unit')}),
        NodeKey(key='get_total_amount_sum_5fd6d6d8a5f44649ad9e18325fa1d5fe'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_dfb19e5cdf664641a82537a3fb00746c'), 'unit': InputKey(key='unit')}),
        NodeKey(key='get_total_amount_sum_6513415006b942bdbad75203aed04e59'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_1bab762fa4bf4f5cb2800bf3db7d2b75'), 'unit': InputKey(key='unit')}),
        NodeKey(key='get_total_amount_sum_dc205da7c5cc4059b8b10ba4787de141'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_ff769f99997b406583c8ddbbc9d9dd0e'), 'unit': InputKey(key='unit')}),
        NodeKey(key='get_total_amount_sum_4ce85af8e4c84cdfabe19a05691dda9a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_75fb6201a22f46fc9876eb1735b63546'), 'unit': InputKey(key='unit')}),
        NodeKey(key='get_total_amount_sum_d8553e95e0ca4b1390f240921abd0007'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_2f8cff99612c4bd09c71e967241e7fbd'), 'unit': InputKey(key='unit')}),
        NodeKey(key='get_total_amount_sum_6620f973d25a461580614a2e99685a93'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_990a0d33f4c44747818dc5806943a1b1'), 'unit': InputKey(key='unit')}),
        NodeKey(key='get_total_amount_sum_8487ed4445f6459b909e501d9e4bd0aa'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_91614fbdd23841fb9186e45fdadc61c0'), 'unit': InputKey(key='unit')}),
        NodeKey(key='get_total_amount_sum_6661917548034435b9693327fdb97623'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_179683cff2b94209a7097645c2587b49'), 'unit': InputKey(key='unit')}),
        NodeKey(key='get_total_amount_sum_a4af758ab3844b63b7de85fff5a3172a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_f2cf83d1721741aabfe3b0dce420379e'), 'unit': InputKey(key='unit')}),
        NodeKey(key='get_total_amount_sum_23ffa717da794003984e34ecb82d802a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_fbfe7b630c374bdd93c911d743dcf7eb'), 'unit': InputKey(key='unit')}),
        NodeKey(key='get_total_amount_sum_1feb5a75eeca467e933239c5d3d4f732'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_9f13bd84b7f240cdb31d67408fc69185'), 'unit': InputKey(key='unit')}),
        NodeKey(key='add_eff00a44974644dfb8071545ddb5dd37'): FunctionCall(function=<function add at 0x000001AAF37E09A0>, args={
            '0': NodeOutputKey(key='get_total_amount_sum_2d8dce044dc94629ae978f471e9a4b27', output='result'),
            '1': NodeOutputKey(key='get_total_amount_sum_5fd6d6d8a5f44649ad9e18325fa1d5fe', output='result'),
            '2': NodeOutputKey(key='get_total_amount_sum_6513415006b942bdbad75203aed04e59', output='result'),
            '3': NodeOutputKey(key='get_total_amount_sum_dc205da7c5cc4059b8b10ba4787de141', output='result'),
            '4': NodeOutputKey(key='get_total_amount_sum_4ce85af8e4c84cdfabe19a05691dda9a', output='result'),
            '5': NodeOutputKey(key='get_total_amount_sum_d8553e95e0ca4b1390f240921abd0007', output='result'),
            '6': NodeOutputKey(key='get_total_amount_sum_6620f973d25a461580614a2e99685a93', output='result'),
            '7': NodeOutputKey(key='get_total_amount_sum_8487ed4445f6459b909e501d9e4bd0aa', output='result'),
            '8': NodeOutputKey(key='get_total_amount_sum_6661917548034435b9693327fdb97623', output='result'),
            '9': NodeOutputKey(key='get_total_amount_sum_a4af758ab3844b63b7de85fff5a3172a', output='result'),
            '10': NodeOutputKey(key='get_total_amount_sum_23ffa717da794003984e34ecb82d802a', output='result'),
            '11': NodeOutputKey(key='get_total_amount_sum_1feb5a75eeca467e933239c5d3d4f732', output='result')
        })
    },
    outputs={OutputKey(key='result'): NodeOutputKey(key='add_eff00a44974644dfb8071545ddb5dd37', output='result')}
)
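The constant values in this repr appear to be msgpack-encoded and then base64-encoded (note `type='msgpack'`). Assuming that encoding, the first constant can be decoded with the standard library alone. Its first byte is `0xD9`, the msgpack marker for a "str 8" value, followed by a one-byte length and the UTF-8 bytes of the string. The helper `decode_msgpack_str` is a hypothetical illustration that handles only the msgpack string types. It is not part of Pargraph.

```python
import base64

# First Const value from the repr above, split across two literals for readability.
CONST_VALUE = (
    "2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAt"
    "ZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMS5wYXJxdWV0"
)


def decode_msgpack_str(data: bytes) -> str:
    """Decode a single msgpack string value (fixstr, str 8, str 16 or str 32)."""
    marker = data[0]
    if 0xA0 <= marker <= 0xBF:  # fixstr: length stored in the low 5 bits
        length, offset = marker & 0x1F, 1
    elif marker == 0xD9:  # str 8: 1-byte length
        length, offset = data[1], 2
    elif marker == 0xDA:  # str 16: 2-byte big-endian length
        length, offset = int.from_bytes(data[1:3], "big"), 3
    elif marker == 0xDB:  # str 32: 4-byte big-endian length
        length, offset = int.from_bytes(data[1:5], "big"), 5
    else:
        raise ValueError(f"not a msgpack string (marker 0x{marker:02X})")
    return data[offset : offset + length].decode("utf-8")


raw = base64.b64decode(CONST_VALUE)
print(hex(raw[0]), raw[1])  # 0xd9 79
print(decode_msgpack_str(raw))
# https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
```

The decoded string is the January 2024 URL, which is what the graph function passes to `get_total_amount_sum` for month 1.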

A graph can also be converted to a task graph using to_dict(), which returns the task graph dictionary together with the list of output keys. Note that every remaining input (here, unit) must be provided to convert the graph to a task graph:

[6]:
nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dict(unit=1_000_000)
[6]:
({'const_Const_22bddfa577ea4c858eb8e0aeac5ca4e0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet',
  'const_Const_c37d2ea7a594424285bb160554d365a4': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet',
  'const_Const_19b894a119d74cfab8520f32cbae8f75': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-03.parquet',
  'const_Const_d2eca83adc30467ba1b6d5cf7d46093e': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-04.parquet',
  'const_Const_7ae7fd2841234a5a8f7386b4788384e0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-05.parquet',
  'const_Const_aab19f56cf3b4d51b3f0532b7485d536': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-06.parquet',
  'const_Const_736a1f5be50f42ab89a32325f9db50c0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-07.parquet',
  'const_Const_104e0221d61945adb7f11268f0f9fdee': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-08.parquet',
  'const_Const_4f086769cc3245558798fc5b4fdb5d0f': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-09.parquet',
  'const_Const_ba852608eb694de0958d676b3b430940': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet',
  'const_Const_e9690dfc980e443ca1f321a3f864458e': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-11.parquet',
  'const_Const_49f572270b10496fafd002f5f2344c45': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-12.parquet',
  'input_unit_bb96debe1c5f480f93589e836f4e156c': 1000000,
  'node_output_get_total_amount_sum_6fee4a6518764247b8de2d249272810a': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_22bddfa577ea4c858eb8e0aeac5ca4e0',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_ae374f2e96004f84b95f55d8ac8a08ac': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_c37d2ea7a594424285bb160554d365a4',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_c40297479e064da39ca6ad7a615fa618': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_19b894a119d74cfab8520f32cbae8f75',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_0740847276ba4fa9be487c49d8a0e2da': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_d2eca83adc30467ba1b6d5cf7d46093e',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_b0848d68bd1043bebb788c6245fc7b51': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_7ae7fd2841234a5a8f7386b4788384e0',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_17ea5faef84c4864a801276eed9541a0': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_aab19f56cf3b4d51b3f0532b7485d536',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_bd00359f3d564e99856f322eb504db89': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_736a1f5be50f42ab89a32325f9db50c0',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_b8d1af4dbbad4c74918815f33f9d4909': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_104e0221d61945adb7f11268f0f9fdee',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_0ea7100e98c74dcd8b41ef79999be830': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_4f086769cc3245558798fc5b4fdb5d0f',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_f2c987ca4fd54ca18e3dc459dabd38cb': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_ba852608eb694de0958d676b3b430940',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_5de103642b9b406d9342d3a6b129f2da': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_e9690dfc980e443ca1f321a3f864458e',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_41bc34d152dd47459219c0639cfd7ca7': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_49f572270b10496fafd002f5f2344c45',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_add_a5836b630b1c433d898a2aedd7761e55': (<function __main__.add(*args)>,
   'node_output_get_total_amount_sum_6fee4a6518764247b8de2d249272810a',
   'node_output_get_total_amount_sum_ae374f2e96004f84b95f55d8ac8a08ac',
   'node_output_get_total_amount_sum_c40297479e064da39ca6ad7a615fa618',
   'node_output_get_total_amount_sum_0740847276ba4fa9be487c49d8a0e2da',
   'node_output_get_total_amount_sum_b0848d68bd1043bebb788c6245fc7b51',
   'node_output_get_total_amount_sum_17ea5faef84c4864a801276eed9541a0',
   'node_output_get_total_amount_sum_bd00359f3d564e99856f322eb504db89',
   'node_output_get_total_amount_sum_b8d1af4dbbad4c74918815f33f9d4909',
   'node_output_get_total_amount_sum_0ea7100e98c74dcd8b41ef79999be830',
   'node_output_get_total_amount_sum_f2c987ca4fd54ca18e3dc459dabd38cb',
   'node_output_get_total_amount_sum_5de103642b9b406d9342d3a6b129f2da',
   'node_output_get_total_amount_sum_41bc34d152dd47459219c0639cfd7ca7')},
 ['node_output_add_a5836b630b1c433d898a2aedd7761e55'])
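The task graph above follows a simple convention, similar to the one Dask uses. A value is either a literal (such as a URL string or the unit) or a tuple whose first element is a callable and whose remaining elements are keys of other entries. Below is a toy evaluator for that convention. It is a hypothetical sketch of what an engine has to do with the dictionary, not GraphEngine itself.

```python
from operator import add


def evaluate(graph, keys):
    """Evaluate the requested keys of a dict-based task graph, computing each key once."""
    cache = {}

    def compute(key):
        if key not in cache:
            value = graph[key]
            # A task is a tuple whose first element is callable; anything else is a literal.
            if isinstance(value, tuple) and value and callable(value[0]):
                func, *args = value
                # String arguments that name other entries are dependencies (the edges),
                # so they are computed first; other arguments are passed through as-is.
                resolved = [
                    compute(arg) if isinstance(arg, str) and arg in graph else arg
                    for arg in args
                ]
                value = func(*resolved)
            cache[key] = value
        return cache[key]

    # One result per requested key, in the same order as `keys`.
    return [compute(key) for key in keys]


toy_graph = {
    "x": 1,
    "y": 2,
    "x_plus_y": (add, "x", "y"),
    "doubled": (add, "x_plus_y", "x_plus_y"),
}
print(evaluate(toy_graph, ["doubled"]))  # [6]
```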

Visualizing graphs

The outputs above are hard to interpret and are mainly useful to a computer. Let’s write a simple helper function to render the DOT graphs we generate:

[7]:
from IPython.display import SVG


def get_graph_image(dot_graph):
    dot_graph.write_svg("temp.svg")
    return SVG("temp.svg")

Using to_dot(), we can generate a DOT graph and render it with the helper:

[8]:
get_graph_image(nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dot())
[8]:
[Figure: full DOT graph of nyc_yellow_taxi_fare_total_for_year for year=2024 (_images/walkthrough_17_0.svg)]
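DOT is Graphviz's plain-text graph language: a graph is declared as a list of nodes and directed edges such as `"a" -> "b";`. To illustrate roughly what a DOT source looks like before rendering, here is a hypothetical standard-library helper, `deps_to_dot`, that turns a dependency mapping into DOT text. The node names, labels, and styling that Pargraph's to_dot() emits will differ.

```python
def deps_to_dot(deps, name="G"):
    """Build DOT source for a directed graph from a node -> dependencies mapping."""
    lines = [f"digraph {name} {{"]
    for node, parents in deps.items():
        lines.append(f'    "{node}";')
        for parent in parents:
            # Edges point from a dependency to the node that consumes it.
            lines.append(f'    "{parent}" -> "{node}";')
    lines.append("}")
    return "\n".join(lines)


dot = deps_to_dot({"month_1": [], "month_2": [], "add": ["month_1", "month_2"]})
print(dot)
```

If Graphviz is installed, text like this can be rendered with its command-line tool, for example `dot -Tsvg`.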

To see only the relationships between nodes, pass no_input=True, no_const=True, and no_output=True to to_dot(). This hides the inputs, constants, and outputs and makes the graph less noisy:

[9]:
get_graph_image(
    nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dot(
        no_input=True, no_const=True, no_output=True
    )
)
[9]:
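[Figure: node-only DOT graph of nyc_yellow_taxi_fare_total_for_year for year=2024, without inputs, constants, or outputs (_images/walkthrough_19_0.svg)]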

Although it is not very useful on its own, we can also generate and visualize a graph from a single delayed function:

[10]:
get_graph_image(get_total_amount_sum.to_graph().to_dot())
[10]:
[Figure: DOT graph of the delayed function get_total_amount_sum (_images/walkthrough_21_0.svg)]

Nested graphs

What if we want to compose multiple graphs into one bigger graph? We can do that by nesting graph functions!

[11]:
URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02}.parquet"


@graph
def nyc_yellow_taxi_fare_total_by_year(years, unit):
    """
    Calculate the total fare amount for each of multiple years.

    :param years: A list of years for which to calculate the total fare amount
    :param unit: The unit to divide the total amount sum by
    :return: A pandas Series of total fare amounts, indexed by year
    """
    results = []

    for year in years:
        results.append(year)
        results.append(nyc_yellow_taxi_fare_total_for_year(year, unit))

    return collect_result(*results)

When we visualize this graph, notice that each call to nyc_yellow_taxi_fare_total_for_year now appears as a node in the graph:

[12]:
get_graph_image(
    nyc_yellow_taxi_fare_total_by_year.to_graph(years=range(2010, 2025)).to_dot()
)
[12]:
[Figure: DOT graph of nyc_yellow_taxi_fare_total_by_year for the years 2010 to 2024 (_images/walkthrough_25_0.svg)]

Executing graphs

First, we generate the task graph, this time for 2024 only:

[13]:
task_graph, keys = nyc_yellow_taxi_fare_total_by_year.to_graph(
    years=range(2024, 2025)
).to_dict(unit=1_000_000)

Then we execute it using pargraph.GraphEngine:

[14]:
from pargraph import GraphEngine

graph_engine = GraphEngine()
result = graph_engine.get(task_graph, keys)

The results are:

[15]:
result
[15]:
[2024    1145.869126
 dtype: float64]

The result is returned as a list because keys is a list (here, containing a single output key):

[16]:
keys
[16]:
['node_output_collect_result_559f87ac40c44e1d96c6c4677e9922d1']

To get the actual result, access the first element:

[17]:
result[0]
[17]:
2024    1145.869126
dtype: float64
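Conceptually, an engine like this can run every task whose dependencies have already finished at the same time. Below is a minimal sketch of that scheduling loop. It assumes a hypothetical toy layout of nodes as name -> (function, dependency names) and uses `graphlib.TopologicalSorter`'s `get_ready()`/`done()` protocol together with a thread pool. It illustrates the idea and is not GraphEngine's actual implementation.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_parallel(nodes, max_workers=4):
    """Run each node as soon as all of its dependencies have finished."""
    sorter = TopologicalSorter({name: deps for name, (_, deps) in nodes.items()})
    sorter.prepare()  # also raises CycleError if the graph is not a DAG
    results, running = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Submit every node whose dependencies are all complete.
            for name in sorter.get_ready():
                func, deps = nodes[name]
                running[pool.submit(func, *(results[d] for d in deps))] = name
            # Wait for at least one task, record its result, and unlock its dependents.
            finished, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in finished:
                name = running.pop(future)
                results[name] = future.result()
                sorter.done(name)
    return results


# Hypothetical toy graph: the two month nodes can run concurrently, "total" runs last.
nodes = {
    "month_1": (lambda: 10.0, []),
    "month_2": (lambda: 12.5, []),
    "total": (lambda *parts: sum(parts), ["month_1", "month_2"]),
}
print(run_parallel(nodes)["total"])  # 22.5
```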