Walkthrough

In this walkthrough, we’ll create some graph computations with Pargraph and analyze NYC taxi data.

Defining delayed functions

delayed functions are pure, inseparable functions used to compose a graph computation. Because they are pure, they can be scheduled and executed concurrently across cores and machines.

In a graph computation, delayed functions are the nodes and the edges represent the dependencies between delayed functions.
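To make the node and edge picture concrete, here is a minimal sketch that uses only the standard library's graphlib module, not Pargraph. The node names are hypothetical. It shows that nodes with no unmet dependencies become ready together, which is what lets a scheduler run them concurrently.

```python
from graphlib import TopologicalSorter

# Each key is a node; its value is the set of nodes it depends on (the edges).
# The node names are hypothetical and only mirror the example used later on.
dependencies = {
    "load_jan": set(),
    "load_feb": set(),
    "add": {"load_jan", "load_feb"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

# Nodes without unmet dependencies are ready at the same time.
first_batch = sorted(sorter.get_ready())
sorter.done(*first_batch)

# "add" only becomes ready once both of its inputs are done.
second_batch = sorted(sorter.get_ready())

print(first_batch)   # ['load_feb', 'load_jan']
print(second_batch)  # ['add']
```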

Example: NYC Yellow Taxi Fare Totals by Year

The NYC Taxi and Limousine Commission publishes data for every taxi trip since 2009. Each data file is in the Parquet format and contains every trip taken in a given month. Loading and processing the files one after another takes a very long time, so to speed things up we would like to load and process the files in parallel and then aggregate the results.
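The "process in parallel, then aggregate" pattern can be sketched with the standard library alone. This is not how Pargraph schedules work; load_month_total is a hypothetical stand-in that returns a fixed number instead of reading a Parquet file.

```python
from concurrent.futures import ThreadPoolExecutor


def load_month_total(month):
    # Hypothetical stand-in: a real loader would read one month's file
    # and sum a column instead of returning a fixed value.
    return float(month) * 10.0


months = range(1, 13)

# Map step: each month is independent, so the calls can run concurrently.
with ThreadPoolExecutor(max_workers=4) as pool:
    monthly_totals = list(pool.map(load_month_total, months))

# Reduce step: aggregate once every partial result is available.
year_total = sum(monthly_totals)
print(year_total)  # 780.0
```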

Here are some functions that we will need, defined as delayed functions:

[1]:
import pandas as pd
from pargraph import delayed


@delayed
def get_total_amount_sum(file_url, unit):
    """
    Calculate the sum of the 'total_amount' column from a Parquet file and divide it by a given unit.

    :param file_url: The URL of the Parquet file
    :param unit: The unit to divide the total amount sum by
    :return: The sum of the 'total_amount' column divided by the given unit
    """
    return (
        pd.read_parquet(file_url, columns=["total_amount"])["total_amount"].sum() / unit
    )


@delayed
def add(*args):
    """
    Calculate the sum of all the arguments provided.

    :param args: A variable number of arguments to be summed
    :return: The sum of all arguments
    """
    return sum(args)


@delayed
def collect_result(*results):
    """
    Collect results into a pandas Series, using every other element as the index and the remaining elements as the values.

    :param results: A variable number of arguments where even-indexed elements are used as the index and odd-indexed elements as the values
    :return: A pandas Series with the specified index and values
    """
    return pd.Series(results[1::2], index=results[0::2])

A delayed function retains the same call semantics as a normal Python function:

[2]:
add(1, 2)
[2]:
3

Defining graph functions

graph functions compose a graph by calling delayed functions and other graph functions. graph functions are to delayed functions as molecules are to atoms.

[3]:
from pargraph import graph

URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02}.parquet"


@graph
def nyc_yellow_taxi_fare_total_for_year(year, unit):
    """
    Calculate the total fare amount for a given year by summing the total amounts from each month.

    :param year: The year for which to calculate the total fare amount
    :param unit: The unit to divide the total amount sum by
    :return: Total fare amount for the given year
    """
    return add(
        *(
            get_total_amount_sum(URL.format(year=year, month=month), unit)
            for month in range(1, 13)
        )
    )

A graph function retains the same call semantics as a normal Python function:

[4]:
nyc_yellow_taxi_fare_total_for_year(year=2024, unit=1_000_000)
[4]:
1145.8691257199998
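The graph function above fans out into twelve get_total_amount_sum calls, one per month. The following plain-Python sketch shows only the URLs that the generator expression produces, using the same URL template and no network access.

```python
URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02}.parquet"

# The {month:02} field pads the month to two digits, so 1 becomes "01".
urls = [URL.format(year=2024, month=month) for month in range(1, 13)]

print(len(urls))  # 12
print(urls[0])    # https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
print(urls[-1])   # https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-12.parquet
```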

Converting to graphs

Using to_graph(), we can generate a graph representation of the function.

[5]:
nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024)
[5]:
Graph(consts={ConstKey(key='_2b4a8a0d1c4540bcba120bf22b548b33'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMS5wYXJxdWV0'), ConstKey(key='_dfb19e5cdf664641a82537a3fb00746c'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMi5wYXJxdWV0'), ConstKey(key='_1bab762fa4bf4f5cb2800bf3db7d2b75'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMy5wYXJxdWV0'), ConstKey(key='_ff769f99997b406583c8ddbbc9d9dd0e'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNC5wYXJxdWV0'), ConstKey(key='_75fb6201a22f46fc9876eb1735b63546'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNS5wYXJxdWV0'), ConstKey(key='_2f8cff99612c4bd09c71e967241e7fbd'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNi5wYXJxdWV0'), ConstKey(key='_990a0d33f4c44747818dc5806943a1b1'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wNy5wYXJxdWV0'), ConstKey(key='_91614fbdd23841fb9186e45fdadc61c0'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wOC5wYXJxdWV0'), ConstKey(key='_179683cff2b94209a7097645c2587b49'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wOS5wYXJxdWV0'), ConstKey(key='_f2cf83d1721741aabfe3b0dce420379e'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMC5wYXJxdWV0'), ConstKey(key='_fbfe7b630c374bdd93c911d743dcf7eb'): 
Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMS5wYXJxdWV0'), ConstKey(key='_9f13bd84b7f240cdb31d67408fc69185'): Const(type='msgpack', value='2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0xMi5wYXJxdWV0')}, inputs={InputKey(key='unit'): None}, nodes={NodeKey(key='get_total_amount_sum_2d8dce044dc94629ae978f471e9a4b27'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_2b4a8a0d1c4540bcba120bf22b548b33'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_5fd6d6d8a5f44649ad9e18325fa1d5fe'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_dfb19e5cdf664641a82537a3fb00746c'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_6513415006b942bdbad75203aed04e59'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_1bab762fa4bf4f5cb2800bf3db7d2b75'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_dc205da7c5cc4059b8b10ba4787de141'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_ff769f99997b406583c8ddbbc9d9dd0e'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_4ce85af8e4c84cdfabe19a05691dda9a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_75fb6201a22f46fc9876eb1735b63546'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_d8553e95e0ca4b1390f240921abd0007'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_2f8cff99612c4bd09c71e967241e7fbd'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_6620f973d25a461580614a2e99685a93'): FunctionCall(function=<function get_total_amount_sum at 
0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_990a0d33f4c44747818dc5806943a1b1'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_8487ed4445f6459b909e501d9e4bd0aa'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_91614fbdd23841fb9186e45fdadc61c0'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_6661917548034435b9693327fdb97623'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_179683cff2b94209a7097645c2587b49'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_a4af758ab3844b63b7de85fff5a3172a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_f2cf83d1721741aabfe3b0dce420379e'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_23ffa717da794003984e34ecb82d802a'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_fbfe7b630c374bdd93c911d743dcf7eb'), 'unit': InputKey(key='unit')}), NodeKey(key='get_total_amount_sum_1feb5a75eeca467e933239c5d3d4f732'): FunctionCall(function=<function get_total_amount_sum at 0x000001AAC2FB3A60>, args={'file_url': ConstKey(key='_9f13bd84b7f240cdb31d67408fc69185'), 'unit': InputKey(key='unit')}), NodeKey(key='add_eff00a44974644dfb8071545ddb5dd37'): FunctionCall(function=<function add at 0x000001AAF37E09A0>, args={'0': NodeOutputKey(key='get_total_amount_sum_2d8dce044dc94629ae978f471e9a4b27', output='result'), '1': NodeOutputKey(key='get_total_amount_sum_5fd6d6d8a5f44649ad9e18325fa1d5fe', output='result'), '2': NodeOutputKey(key='get_total_amount_sum_6513415006b942bdbad75203aed04e59', output='result'), '3': NodeOutputKey(key='get_total_amount_sum_dc205da7c5cc4059b8b10ba4787de141', output='result'), '4': NodeOutputKey(key='get_total_amount_sum_4ce85af8e4c84cdfabe19a05691dda9a', output='result'), '5': 
NodeOutputKey(key='get_total_amount_sum_d8553e95e0ca4b1390f240921abd0007', output='result'), '6': NodeOutputKey(key='get_total_amount_sum_6620f973d25a461580614a2e99685a93', output='result'), '7': NodeOutputKey(key='get_total_amount_sum_8487ed4445f6459b909e501d9e4bd0aa', output='result'), '8': NodeOutputKey(key='get_total_amount_sum_6661917548034435b9693327fdb97623', output='result'), '9': NodeOutputKey(key='get_total_amount_sum_a4af758ab3844b63b7de85fff5a3172a', output='result'), '10': NodeOutputKey(key='get_total_amount_sum_23ffa717da794003984e34ecb82d802a', output='result'), '11': NodeOutputKey(key='get_total_amount_sum_1feb5a75eeca467e933239c5d3d4f732', output='result')})}, outputs={OutputKey(key='result'): NodeOutputKey(key='add_eff00a44974644dfb8071545ddb5dd37', output='result')})
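The Const values in this output look opaque, but the first one can be decoded by hand. The sketch below assumes the value is base64 text wrapping a msgpack "str 8" payload (a 0xd9 marker byte, a one-byte length, then UTF-8 text). It checks that assumption before decoding. It is an illustration for this one value, not a description of how Pargraph serializes constants in general.

```python
import base64

# Value copied from the first Const in the output above.
encoded = (
    "2U9odHRwczovL2QzN2NpNnZ6dXJ5Y2h4LmNsb3VkZnJvbnQubmV0"
    "L3RyaXAtZGF0YS95ZWxsb3dfdHJpcGRhdGFfMjAyNC0wMS5wYXJxdWV0"
)

raw = base64.b64decode(encoded)

# Assumption: msgpack "str 8" layout, verified here rather than taken on trust.
marker, length = raw[0], raw[1]
assert marker == 0xD9 and length == len(raw) - 2

text = raw[2:].decode("utf-8")
print(text)  # https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
```

The constant is the January URL that was baked into the graph when year=2024 was supplied.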

Graphs can also be converted to a task graph using to_dict(). Notice that every remaining input (here, unit) must be provided in order to convert the graph to a task graph:

[6]:
nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dict(unit=1_000_000)
[6]:
({'const_Const_22bddfa577ea4c858eb8e0aeac5ca4e0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet',
  'const_Const_c37d2ea7a594424285bb160554d365a4': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet',
  'const_Const_19b894a119d74cfab8520f32cbae8f75': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-03.parquet',
  'const_Const_d2eca83adc30467ba1b6d5cf7d46093e': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-04.parquet',
  'const_Const_7ae7fd2841234a5a8f7386b4788384e0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-05.parquet',
  'const_Const_aab19f56cf3b4d51b3f0532b7485d536': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-06.parquet',
  'const_Const_736a1f5be50f42ab89a32325f9db50c0': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-07.parquet',
  'const_Const_104e0221d61945adb7f11268f0f9fdee': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-08.parquet',
  'const_Const_4f086769cc3245558798fc5b4fdb5d0f': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-09.parquet',
  'const_Const_ba852608eb694de0958d676b3b430940': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet',
  'const_Const_e9690dfc980e443ca1f321a3f864458e': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-11.parquet',
  'const_Const_49f572270b10496fafd002f5f2344c45': 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-12.parquet',
  'input_unit_bb96debe1c5f480f93589e836f4e156c': 1000000,
  'node_output_get_total_amount_sum_6fee4a6518764247b8de2d249272810a': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_22bddfa577ea4c858eb8e0aeac5ca4e0',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_ae374f2e96004f84b95f55d8ac8a08ac': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_c37d2ea7a594424285bb160554d365a4',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_c40297479e064da39ca6ad7a615fa618': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_19b894a119d74cfab8520f32cbae8f75',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_0740847276ba4fa9be487c49d8a0e2da': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_d2eca83adc30467ba1b6d5cf7d46093e',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_b0848d68bd1043bebb788c6245fc7b51': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_7ae7fd2841234a5a8f7386b4788384e0',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_17ea5faef84c4864a801276eed9541a0': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_aab19f56cf3b4d51b3f0532b7485d536',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_bd00359f3d564e99856f322eb504db89': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_736a1f5be50f42ab89a32325f9db50c0',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_b8d1af4dbbad4c74918815f33f9d4909': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_104e0221d61945adb7f11268f0f9fdee',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_0ea7100e98c74dcd8b41ef79999be830': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_4f086769cc3245558798fc5b4fdb5d0f',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_f2c987ca4fd54ca18e3dc459dabd38cb': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_ba852608eb694de0958d676b3b430940',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_5de103642b9b406d9342d3a6b129f2da': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_e9690dfc980e443ca1f321a3f864458e',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_get_total_amount_sum_41bc34d152dd47459219c0639cfd7ca7': (<function __main__.get_total_amount_sum(file_url, unit)>,
   'const_Const_49f572270b10496fafd002f5f2344c45',
   'input_unit_bb96debe1c5f480f93589e836f4e156c'),
  'node_output_add_a5836b630b1c433d898a2aedd7761e55': (<function __main__.add(*args)>,
   'node_output_get_total_amount_sum_6fee4a6518764247b8de2d249272810a',
   'node_output_get_total_amount_sum_ae374f2e96004f84b95f55d8ac8a08ac',
   'node_output_get_total_amount_sum_c40297479e064da39ca6ad7a615fa618',
   'node_output_get_total_amount_sum_0740847276ba4fa9be487c49d8a0e2da',
   'node_output_get_total_amount_sum_b0848d68bd1043bebb788c6245fc7b51',
   'node_output_get_total_amount_sum_17ea5faef84c4864a801276eed9541a0',
   'node_output_get_total_amount_sum_bd00359f3d564e99856f322eb504db89',
   'node_output_get_total_amount_sum_b8d1af4dbbad4c74918815f33f9d4909',
   'node_output_get_total_amount_sum_0ea7100e98c74dcd8b41ef79999be830',
   'node_output_get_total_amount_sum_f2c987ca4fd54ca18e3dc459dabd38cb',
   'node_output_get_total_amount_sum_5de103642b9b406d9342d3a6b129f2da',
   'node_output_get_total_amount_sum_41bc34d152dd47459219c0639cfd7ca7')},
 ['node_output_add_a5836b630b1c433d898a2aedd7761e55'])
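The task graph above is a dictionary that maps each key either to a literal value or to a tuple of a function followed by the keys of its arguments. To show how such a structure can be evaluated, here is a minimal sequential sketch. It is not Pargraph's engine, it runs nothing in parallel, and it assumes every tuple argument is a key in the dictionary, as in the output above. The toy graph and its key names are hypothetical.

```python
def evaluate(task_graph, keys):
    """Resolve each requested key by recursively evaluating its dependencies."""
    cache = {}

    def resolve(key):
        if key not in cache:
            task = task_graph[key]
            if isinstance(task, tuple) and task and callable(task[0]):
                # A task: call the function on its resolved arguments.
                func, *arg_keys = task
                cache[key] = func(*(resolve(arg) for arg in arg_keys))
            else:
                # A literal such as a URL or a number.
                cache[key] = task
        return cache[key]

    return [resolve(key) for key in keys]


# Hypothetical miniature graph with the same shape as the output above.
toy_graph = {
    "const_a": 4,
    "const_b": 6,
    "input_unit": 2,
    "node_scale_a": (lambda value, unit: value / unit, "const_a", "input_unit"),
    "node_scale_b": (lambda value, unit: value / unit, "const_b", "input_unit"),
    "node_add": (lambda *args: sum(args), "node_scale_a", "node_scale_b"),
}

toy_result = evaluate(toy_graph, ["node_add"])
print(toy_result)  # [5.0]
```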

Visualizing graphs

The outputs above are hard for a person to read and are mainly useful to a machine. Let’s write a simple helper function to visualize the graphs as dot graphs.

[7]:
from IPython.display import SVG


def get_graph_image(dot_graph):
    dot_graph.write_svg("temp.svg")
    return SVG("temp.svg")

Using to_dot(), we can generate a dot graph:

[8]:
get_graph_image(nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dot())
[8]:
_images/walkthrough_17_0.svg

If we only want to see the node relations, we can pass the arguments no_input, no_const, and no_output to make the graph less noisy:

[9]:
get_graph_image(
    nyc_yellow_taxi_fare_total_for_year.to_graph(year=2024).to_dot(
        no_input=True, no_const=True, no_output=True
    )
)
[9]:
_images/walkthrough_19_0.svg

Although not very useful, we can also generate and visualize a graph from a delayed function:

[10]:
get_graph_image(get_total_amount_sum.to_graph().to_dot())
[10]:
_images/walkthrough_21_0.svg

Nested graphs

What if we want to compose multiple graphs into one bigger graph? We can do that by nesting graph functions!

[11]:
URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02}.parquet"


@graph
def nyc_yellow_taxi_fare_total_by_year(years, unit):
    """
    Calculate the total fare amount for multiple years.

    :param years: A list of years for which to calculate the total fare amount
    :param unit: The unit to divide the total amount sum by
    :return: Total fare amounts for the given years
    """
    results = []

    for year in years:
        results.append(year)
        results.append(nyc_yellow_taxi_fare_total_for_year(year, unit))

    return collect_result(*results)
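The loop above builds one flat list that alternates years and totals, and collect_result splits it again with slices. The following plain-Python sketch shows that interleaving without pandas or Pargraph. The totals are hypothetical placeholder numbers.

```python
results = []

# Hypothetical (year, total) pairs standing in for the graph's real values.
for year, total in [(2022, 1.5), (2023, 2.5)]:
    results.append(year)
    results.append(total)

# Even positions hold the index, odd positions hold the values.
index = results[0::2]
values = results[1::2]

print(results)  # [2022, 1.5, 2023, 2.5]
print(index)    # [2022, 2023]
print(values)   # [1.5, 2.5]
```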

Notice that when we visualize the graph, the nyc_yellow_taxi_fare_total_for_year function is now a node in the graph:

[12]:
get_graph_image(
    nyc_yellow_taxi_fare_total_by_year.to_graph(years=range(2010, 2025)).to_dot()
)
[12]:
_images/walkthrough_25_0.svg

Executing graphs

First, we need to generate the task graph:

[13]:
task_graph, keys = nyc_yellow_taxi_fare_total_by_year.to_graph(
    years=range(2024, 2025)
).to_dict(unit=1_000_000)

Then we can execute it using pargraph.GraphEngine:

[14]:
from pargraph import GraphEngine

graph_engine = GraphEngine()
result = graph_engine.get(task_graph, keys)

And the result is:

[15]:
result
[15]:
[2024    1145.869126
 dtype: float64]

The results are returned as a list according to the keys structure:

[16]:
keys
[16]:
['node_output_collect_result_559f87ac40c44e1d96c6c4677e9922d1']

Hence, we can extract our result by indexing the first element:

[17]:
result[0]
[17]:
2024    1145.869126
dtype: float64
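When a task graph has several output keys, pairing them with the returned list by position can be more convenient than indexing. The sketch below uses hypothetical key names and values and assumes a flat list of keys with results in the same order.

```python
# Hypothetical keys and results; a real run would take them from to_dict()
# and from the engine's get() call.
example_keys = ["node_output_first", "node_output_second"]
example_result = [10.0, 20.0]

# Pair each key with the result at the same position.
result_by_key = dict(zip(example_keys, example_result))

print(result_by_key["node_output_second"])  # 20.0
```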