Partitioning functions
Interface
- parfun.partition.object.PartitionGenerator
All partitioning functions must return a Python generator of this type.
There are two ways of writing a partitioning function:
Use regular Python generators (preferred) or iterators, returning partitioned values:
```python
def partition_list_by_chunks(values: List) -> PartitionGenerator[List]:
    PARTITION_SIZE = max(1, len(values) // 100)

    for begin in range(0, len(values), PARTITION_SIZE):
        yield values[begin:begin + PARTITION_SIZE]
```
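For illustration, the plain-generator approach can be exercised outside of Parfun; a minimal self-contained sketch, where the `PartitionGenerator` return annotation is replaced with a plain `Generator` and the chunk size is passed explicitly (both are assumptions made to keep the example runnable):

```python
from typing import Generator, List


def chunked(values: List, partition_size: int) -> Generator[List, None, None]:
    # A plain (non size-aware) partitioning generator: yields fixed-size chunks.
    for begin in range(0, len(values), partition_size):
        yield values[begin:begin + partition_size]


# Consuming it works like any Python generator.
print(list(chunked([1, 2, 3, 4, 5], partition_size=2)))  # [[1, 2], [3, 4], [5]]
```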
Use partition size aware Python generators, or smart generators. These are more complex, but more efficient. A partition size aware generator receives a suggested partition size through the return value of its `yield` statements, and yields partition sizes along with its partitioned values:
```python
def partition_list_by_chunks(values: List, constant: int) -> PartitionGenerator[Tuple[List, int]]:
    # A first empty call to `yield` is required to obtain the first requested
    # partition size.
    requested_partition_size = yield None

    begin = 0
    while begin < len(values):
        end = min(len(values), begin + requested_partition_size)

        partition_size = end - begin
        partition = (values[begin:end], constant)

        # Yield the actual partition along with its size, and obtain the
        # requested size for the next partition.
        requested_partition_size = yield partition_size, partition

        begin = end
```
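The size-aware protocol can be driven manually with `generator.send()`, which is roughly what the scheduler does internally; a runnable sketch using a plain `typing` alias instead of `PartitionGenerator` (an assumption, to keep the example self-contained):

```python
from typing import Generator, List, Optional, Tuple

# Yields either None (priming) or (size, partition); receives requested sizes.
SmartGen = Generator[Optional[Tuple[int, Tuple[List, int]]], int, None]


def partition_list_by_chunks(values: List, constant: int) -> SmartGen:
    requested_size = yield None  # priming yield: receive the first requested size
    begin = 0
    while begin < len(values):
        end = min(len(values), begin + requested_size)
        # Yield the actual size with the partition; receive the next requested size.
        requested_size = yield end - begin, (values[begin:end], constant)
        begin = end


gen = partition_list_by_chunks([1, 2, 3, 4, 5], constant=42)
assert next(gen) is None  # advance the generator to its first `yield None`
results = []
try:
    while True:
        results.append(gen.send(2))  # always request partitions of size 2
except StopIteration:
    pass
print(results)  # [(2, ([1, 2], 42)), (2, ([3, 4], 42)), (1, ([5], 42))]
```

Note that the final partition reports a size of 1: the generator yields the actual partition size, which may be smaller than the requested one.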
alias of `Union[Generator[PartitionType, None, None], Generator[Optional[Tuple[int, PartitionType]], int, None]]`
- parfun.partition.api.all_arguments(partition_with: Callable[[...], Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None]]) → Callable[[NamedArguments], Tuple[NamedArguments, Generator[NamedArguments, None, None] | Generator[Tuple[int, NamedArguments] | None, int, None]]]
Applies a single partitioning function to all arguments.
```python
@parfun(
    split=all_arguments(df_by_group(by=["year", "month"]))
)
def func(df_1: pd.DataFrame, df_2: pd.DataFrame):
    ...
```
- parfun.partition.api.multiple_arguments(partition_on: Tuple[str, ...] | str, partition_with: Callable[[...], Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None]]) → Callable[[NamedArguments], Tuple[NamedArguments, Generator[NamedArguments, None, None] | Generator[Tuple[int, NamedArguments] | None, int, None]]]
Applies a single partitioning function to multiple arguments.
```python
@parfun(
    split=multiple_arguments(
        ("df_1", "df_2"),
        df_by_group(by=["year", "month"]),
    )
)
def func(df_1: pd.DataFrame, df_2: pd.DataFrame, constant: int):
    ...
```
- parfun.partition.api.per_argument(**partition_arg_with: Callable[[...], Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None]]) → Callable[[NamedArguments], Tuple[NamedArguments, Generator[NamedArguments, None, None] | Generator[Tuple[int, NamedArguments] | None, int, None]]]
Applies multiple partitioning functions simultaneously on different function arguments, similarly to Python's `zip()`.

```python
@parfun(
    split=per_argument(
        df=df_by_row,
        xs=list_by_chunk,
    )
)
def func(df: pd.DataFrame, xs: List, constant: int):
    ...
```
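The `zip()` analogy can be made concrete without Parfun: each argument is partitioned independently, then the per-argument partitions are combined positionally into one call per position, with unpartitioned arguments passed through whole. A plain-Python sketch (the partition lists below are hypothetical stand-ins for `df_by_row` and `list_by_chunk` outputs):

```python
# Hypothetical per-argument partitions, as the two partitioning functions
# would produce them.
df_partitions = [["row0"], ["row1"], ["row2"]]
xs_partitions = [[0, 1], [2, 3], [4, 5]]

# per_argument-like combination: positional, like zip(); the unpartitioned
# `constant` argument is repeated in every call.
calls = [
    {"df": df_part, "xs": xs_part, "constant": 7}
    for df_part, xs_part in zip(df_partitions, xs_partitions)
]
print(calls[0])  # {'df': ['row0'], 'xs': [0, 1], 'constant': 7}
```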
Collections
A collection of predefined APIs to help users partition collection data, such as lists, arrays, and tuples.
- parfun.partition.collection.list_by_chunk(*iterables: Iterable[PartitionType]) → Generator[Tuple[Iterable[PartitionType], ...], None, None] | Generator[Tuple[int, Tuple[Iterable[PartitionType], ...]] | None, int, None]
Partition one or multiple iterables by chunks of identical sizes.
```python
ls_1 = [1, 2, 3, 4]
ls_2 = [1, 4, 9, 16]

with_partition_size(list_by_chunk(ls_1, ls_2), partition_size=2)
# [((1, 2), (1, 4)), ((3, 4), (9, 16))]
```
Dataframes
A collection of predefined APIs to help users partition dataframe data.
- parfun.partition.dataframe.df_by_group(*args, **kwargs) → Callable[[...], Generator[DataFrame, None, None] | Generator[Tuple[int, DataFrame] | None, int, None]]
Partitions one or multiple Pandas dataframes into groups of rows sharing identical values in the grouped columns, similarly to `pandas.DataFrame.groupby()`. See `pandas.DataFrame.groupby()` for the accepted parameters.

```python
df_1 = pd.DataFrame({
    "country": ["USA", "China", "Belgium"],
    "capital": ["Washington", "Beijing", "Brussels"],
})
df_2 = pd.DataFrame({
    "country": ["USA", "China", "Belgium"],
    "iso_code": ["US", "CN", "BE"],
})

with_partition_size(df_by_group(by="country")(df_1, df_2), partition_size=1)
# [(   country   capital
#   2  Belgium  Brussels,
#      country iso_code
#   2  Belgium       BE),
#  (   country  capital
#   1    China  Beijing,
#      country iso_code
#   1    China       CN),
#  (  country     capital
#   0     USA  Washington,
#      country iso_code
#   0     USA       US)]
```
- parfun.partition.dataframe.df_by_row(*dfs: DataFrame) → Generator[Tuple[DataFrame, ...], None, None] | Generator[Tuple[int, Tuple[DataFrame, ...]] | None, int, None]
Partitions one or multiple Pandas dataframes by rows.
If multiple dataframes are given, the returned partitions will have identical numbers of rows.
```python
df_1 = pd.DataFrame(range(0, 5))
df_2 = df_1 ** 2

with_partition_size(df_by_row(df_1, df_2), partition_size=2)
# [(   0
#   0  0
#   1  1,
#      0
#   0  0
#   1  1),
#  (   0
#   2  2
#   3  3,
#      0
#   2  4
#   3  9),
#  (   0
#   4  4,
#       0
#   4  16)]
```
Utilities
- parfun.partition.utility.with_partition_size(generator: Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None], partition_size: int | Callable[[], int] = 1) → Generator[PartitionType, None, None]
Runs a partitioning generator without requiring the partition size estimator.
This function uses the provided partition size input to feed the partitioning generator through Python's `generator.send()` method, simulating the parallel function's behaviour.

```python
# Runs the `df_by_row` partitioning function with a random partition size
# generator.
with_partition_size(
    df_by_row(df_1, df_2),
    partition_size=lambda: random.randint(1, 10)
)
```
- Parameters:
  - generator – the partitioning generator to execute
  - partition_size – a constant partition size, or a function generating partition sizes
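To show what this utility does mechanically, here is a simplified stand-in (not the Parfun implementation, and handling only size-aware generators; both simplifications are assumptions) that primes the generator, feeds it sizes via `send()`, and drops the reported sizes:

```python
from typing import Callable, Generator, Optional, Tuple, TypeVar, Union

PartitionType = TypeVar("PartitionType")


def with_partition_size_sketch(
    generator: Generator[Optional[Tuple[int, PartitionType]], int, None],
    partition_size: Union[int, Callable[[], int]] = 1,
) -> Generator[PartitionType, None, None]:
    # Accept either a constant size or a size-producing function.
    next_size = partition_size if callable(partition_size) else (lambda: partition_size)
    if generator.send(None) is not None:  # priming call, up to the first `yield None`
        raise ValueError("size-aware generators must first yield None")
    try:
        while True:
            _size, partition = generator.send(next_size())
            yield partition  # drop the reported size, keep the partition
    except StopIteration:
        return


def chunks(values):
    # A minimal size-aware partitioning generator, used for demonstration.
    size = yield None
    begin = 0
    while begin < len(values):
        end = min(len(values), begin + size)
        size = yield end - begin, values[begin:end]
        begin = end


print(list(with_partition_size_sketch(chunks([1, 2, 3, 4]), partition_size=3)))
# [[1, 2, 3], [4]]
```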