API
- parfun.parallel(split: Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]], combine_with: Callable[[Iterable[Any]], Any], initial_partition_size: int | Callable[[Any], int] | None = None, fixed_partition_size: int | Callable[[Any], int] | None = None, profile: bool = False, trace_export: str | None = None, partition_size_estimator_factory: Callable[[], PartitionSizeEstimator] = LinearRegessionEstimator) Callable
Returns a function decorator that automatically parallelizes a function.
    @pf.parallel(
        split=pf.per_argument(
            values=pf.py_list.by_chunk,
        ),
        combine_with=pf.py_list.concat,
    )
    def multiply_by_constant(values: Iterable[int], constant: int):
        return [v * constant for v in values]

    # This would be functionally equivalent to running the function inside a single for loop:
    results = []
    for partition in pf.py_list.by_chunk(values):
        results.append(multiply_by_constant(partition, constant))
    return combine_with(results)
- Parameters:
split – partitions the data based on the provided partitioning function. See the Partitioning section below for the list of predefined partitioning functions.
combine_with (Callable) – combines the results returned by the parallel executions of the function.
initial_partition_size (int | Callable[[PartitionType], int] | None) –
Overrides the first estimate from the partition size estimator.
If the value is a callable, the function will be provided with the input to be partitioned and shall return the initial partition size to use.
fixed_partition_size (int | Callable[[PartitionType], int] | None) –
Uses a constant partition size and does not run the partition size estimator (see the sketch after this entry).
If the value is a callable, it will be provided with the input to be partitioned and shall return the partition size to use.
profile (bool) – if True, prints additional debugging information about the parallelization overhead.
trace_export (str) – if set, exports the measured execution times to the CSV file at the provided path.
partition_size_estimator_factory (Callable[[], PartitionSizeEstimator]) – the partition size estimator class to use.
- Returns:
a decorated function
- Return type:
Callable
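A rough sketch of how the partition size parameters compose with the decorator; the chunking workload is illustrative, and the import alias `import parfun as pf` follows the convention used in the example above:

    from typing import Iterable, List

    import parfun as pf

    @pf.parallel(
        split=pf.per_argument(values=pf.py_list.by_chunk),
        combine_with=pf.py_list.concat,
        # Skips the partition size estimator and always uses chunks of 1,000 elements.
        fixed_partition_size=1_000,
    )
    def squares(values: Iterable[int]) -> List[int]:
        return [v * v for v in values]

    # Alternatively, a callable can derive the size from the input to be partitioned,
    # e.g. roughly 100 partitions regardless of the input length (the exact arguments
    # passed to the callable follow the parameter description above):
    # initial_partition_size=lambda values: max(1, len(values) // 100)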
Backend setup
- parfun.set_parallel_backend(backend: str | BackendEngine, *args, **kwargs) None
Initializes and sets the current parfun backend.
set_parallel_backend("local_multiprocessing", max_workers=4, is_process=False)
- Parameters:
backend (Union[str, BackendEngine]) –
Supported backend options:
"none": disables the current parallel backend. Functions decorated with parallel() will run sequentially as if not decorated; partitioning and combining functions will be ignored.
"local_single_process": runs the tasks inside the calling Python process. Functions decorated with parallel() will partition the input data and run the combining function on the output data, but will execute the function itself inside the calling Python process. Mostly intended for debugging purposes.
"local_multiprocessing": runs the tasks in parallel using Python multiprocessing processes.
"scaler_local": runs the tasks in parallel using an internally managed Scaler cluster. See ScalerLocalBackend.
"scaler_remote": runs the tasks in parallel using an externally managed Scaler cluster. See ScalerRemoteBackend (and the example after this entry).
"dask_local": runs the tasks in parallel using an internally managed Dask cluster. See DaskLocalClusterBackend.
"dask_remote": runs the tasks in parallel using an externally managed Dask cluster. See DaskRemoteClusterBackend.
"dask_current": runs the tasks in parallel using the currently running Dask client (get_client()). See DaskCurrentBackend.
args – Additional positional parameters for the backend constructor.
kwargs – Additional keyword parameters for the backend constructor.
- Return type:
None
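For instance, connecting to an externally managed Scaler scheduler could look as follows; the scheduler address is a placeholder and the keyword arguments are forwarded to the ScalerRemoteBackend constructor documented below:

    import parfun as pf

    # Placeholder address: replace with the actual Scaler scheduler endpoint.
    pf.set_parallel_backend("scaler_remote", scheduler_address="tcp://scheduler:8516", n_workers=8)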
- parfun.set_parallel_backend_context(backend: str | BackendEngine, *args, **kwargs)
Sets a new parallel backend instance within a context manager's scope.

    with set_parallel_backend_context("local_single_process"):
        some_parallel_computation()
- Parameters:
backend (Union[str, BackendEngine]) – See set_parallel_backend().
- parfun.get_parallel_backend() BackendEngine | None
- Returns:
the current backend instance, or None if no backend is currently set.
- Return type:
Optional[BackendEngine]
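A small usage sketch (the fallback backend choice is arbitrary):

    import parfun as pf

    # Falls back to a local multiprocessing backend if no backend has been set yet.
    if pf.get_parallel_backend() is None:
        pf.set_parallel_backend("local_multiprocessing", max_workers=4)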
Backend instances
- class parfun.backend.mixins.BackendEngine
Asynchronous task manager interface.
- abstract allows_nested_tasks() bool
Indicates if Parfun can submit new tasks from other tasks.
- abstract session() BackendSession
Returns a new managed session for submitting tasks.
    with backend.session() as session:
        arg_ref = session.preload_value(arg)
        future = session.submit(fn, arg_ref)
        print(future.result())
- abstract shutdown()
Shuts down all resources required by the backend engine.
- class parfun.backend.mixins.BackendSession
A task-submission session to a backend engine that manages the lifecycle of the task objects (preloaded values, argument values, and future objects).
- preload_value(value: Any) Any
Preloads a value to the backend engine.
The returned value will be used when calling submit() instead of the original value.
- abstract submit(fn: Callable, *args, **kwargs) ProfiledFuture
Executes an asynchronous computation.
Blocks if no computing resource is available.
- class parfun.backend.local_single_process.LocalSingleProcessBackend
- allows_nested_tasks() bool
Indicates if Parfun can submit new tasks from other tasks.
- session() BackendSession
Returns a new managed session for submitting tasks.
    with backend.session() as session:
        arg_ref = session.preload_value(arg)
        future = session.submit(fn, arg_ref)
        print(future.result())
- shutdown()
Shuts down all resources required by the backend engine.
- class parfun.backend.local_single_process.LocalSingleProcessSession
- submit(fn: Callable, *args, **kwargs) ProfiledFuture
Executes an asynchronous computation.
Blocks if no computing resource is available.
- class parfun.backend.local_multiprocessing.LocalMultiprocessingBackend(max_workers: int = 1, is_process: bool = True, **kwargs)
Concurrent engine that uses Python's built-in multiprocessing module.
- allows_nested_tasks() bool
Indicates if Parfun can submit new tasks from other tasks.
- session() LocalMultiprocessingSession
Returns a new managed session for submitting tasks.
    with backend.session() as session:
        arg_ref = session.preload_value(arg)
        future = session.submit(fn, arg_ref)
        print(future.result())
- shutdown(wait=True)
Shuts down all resources required by the backend engine.
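Since set_parallel_backend() also accepts BackendEngine instances, the backend can be constructed explicitly; a minimal sketch:

    import parfun as pf
    from parfun.backend.local_multiprocessing import LocalMultiprocessingBackend

    backend = LocalMultiprocessingBackend(max_workers=4, is_process=True)
    try:
        pf.set_parallel_backend(backend)
        ...  # call @pf.parallel functions here
    finally:
        backend.shutdown(wait=True)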
- class parfun.backend.local_multiprocessing.LocalMultiprocessingSession(underlying_executor: Executor)
- submit(fn, *args, **kwargs) ProfiledFuture
Executes an asynchronous computation.
Blocks if no computing resource is available.
- class parfun.backend.dask.DaskBaseBackend(n_workers: int)
- allows_nested_tasks() bool
Indicates if Parfun can submit new tasks from other tasks.
- session() DaskSession
Returns a new managed session for submitting tasks.
    with backend.session() as session:
        arg_ref = session.preload_value(arg)
        future = session.submit(fn, arg_ref)
        print(future.result())
- class parfun.backend.dask.DaskCurrentBackend(n_workers: int)
Uses the current Dask worker context to deduce the backend instance.
This backend should be used by Dask worker tasks that need to access the underlying backend instance.
- shutdown()
Shuts down all resources required by the backend engine.
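A hedged sketch of the intended usage; the task body is illustrative and `some_parallel_function` is a hypothetical @pf.parallel function, with constructor arguments forwarded as described for set_parallel_backend():

    import parfun as pf

    def worker_task(data):
        # Runs on a Dask worker: reuses the surrounding Dask client for nested
        # Parfun tasks instead of spawning a new cluster.
        pf.set_parallel_backend("dask_current", n_workers=4)
        return some_parallel_function(data)  # hypothetical @pf.parallel function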
- class parfun.backend.dask.DaskLocalClusterBackend(n_workers: int = 1, dashboard_address=':33333', memory_limit='100GB')
Creates a Dask cluster on the local machine and uses it as a backend engine.
- shutdown()
Shuts down all resources required by the backend engine.
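For example, a local Dask cluster can be spun up for the duration of a computation; the keyword arguments map onto the constructor parameters above and `some_parallel_computation` is a placeholder:

    import parfun as pf

    # Starts a local Dask cluster with 4 worker processes, and tears it down on exit.
    with pf.set_parallel_backend_context("dask_local", n_workers=4):
        some_parallel_computation()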
- class parfun.backend.dask.DaskRemoteClusterBackend(scheduler_address: str)
Connects to a previously instantiated Dask instance as a backend engine.
- shutdown()
Shuts down all resources required by the backend engine.
- class parfun.backend.dask.DaskSession(engine: DaskBaseBackend, n_workers: int)
- submit(fn, *args, **kwargs) ProfiledFuture | None
Executes an asynchronous computation.
Blocks if no computing resource is available.
- class parfun.backend.scaler.ScalerLocalBackend(scheduler_address: str | None = None, n_workers: int = 1, per_worker_queue_size: int = 1000, allows_nested_tasks: bool = True, logging_paths: Tuple[str, ...] = ('/dev/stdout',), logging_level: str = 'INFO', logging_config_file: str | None = None, **kwargs)
Creates a Scaler cluster on the local machine and uses it as a backend engine.
- shutdown()
Shuts down all resources required by the backend engine.
- class parfun.backend.scaler.ScalerRemoteBackend(scheduler_address: str, n_workers: int = 1, allows_nested_tasks: bool = True, **client_kwargs)
Connects to a previously instantiated Scaler instance as a backend engine.
- allows_nested_tasks() bool
Indicates if Parfun can submit new tasks from other tasks.
- session() ScalerSession
Returns a new managed session for submitting tasks.
    with backend.session() as session:
        arg_ref = session.preload_value(arg)
        future = session.submit(fn, arg_ref)
        print(future.result())
- shutdown()
Shuts down all resources required by the backend engine.
- class parfun.backend.scaler.ScalerSession(scheduler_address: str, n_workers: int, client_kwargs: Dict)
- preload_value(value: Any) ObjectReference
Preloads a value to the backend engine.
The returned value will be used when calling submit() instead of the original value.
- submit(fn, *args, **kwargs) ProfiledFuture | None
Executes an asynchronous computation.
Blocks if no computing resource is available.
Partitioning
- parfun.partition.object.PartitionGenerator
All partitioning functions must return a Python generator of this type.
There are two ways of writing a partitioning function:
Use regular Python generators (preferred) or iterators, returning partitioned values:
    def partition_list_by_chunks(values: List) -> PartitionGenerator[List]:
        # Integer chunk size of roughly 1% of the input, at least one element.
        PARTITION_SIZE = max(1, len(values) // 100)
        for begin in range(0, len(values), PARTITION_SIZE):
            yield values[begin:begin + PARTITION_SIZE]
Use partition-size-aware Python generators, or smart generators. These are more complex but more efficient. A partition-size-aware generator must obtain a suggested partition size through the return value of the yield statement, and must yield partition sizes along with its partitioned values:
    def partition_list_by_chunks(values: List, constant: int) -> PartitionGenerator[Tuple[List, int]]:
        # A first empty `yield` is required to obtain the first requested partition size.
        requested_partition_size = yield None

        begin = 0
        while begin < len(values):
            end = min(len(values), begin + requested_partition_size)
            partition_size = end - begin

            partition = (values[begin:end], constant)

            # Yield the actual partition along with its size, and obtain the requested size
            # for the next partition.
            requested_partition_size = yield partition_size, partition

            begin = end
alias of Union[Generator[PartitionType, None, None], Generator[Optional[Tuple[int, PartitionType]], int, None]]
- parfun.all_arguments(partition_with: Callable[[...], Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None]]) Callable[[NamedArguments], Tuple[NamedArguments, Generator[NamedArguments, None, None] | Generator[Tuple[int, NamedArguments] | None, int, None]]]
Applies a single partitioning function to all arguments.
    @pf.parallel(
        split=pf.all_arguments(pf.dataframe.by_group(by=["year", "month"])),
        ...
    )
    def func(df_1: pd.DataFrame, df_2: pd.DataFrame):
        ...
- parfun.multiple_arguments(partition_on: Tuple[str, ...] | str, partition_with: Callable[[...], Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None]]) Callable[[NamedArguments], Tuple[NamedArguments, Generator[NamedArguments, None, None] | Generator[Tuple[int, NamedArguments] | None, int, None]]]
Applies a single partitioning function to multiple, but not all, arguments.
    @pf.parallel(
        split=pf.multiple_arguments(
            ("df_1", "df_2"),
            pf.dataframe.by_group(by=["year", "month"]),
        ),
        ...
    )
    def func(df_1: pd.DataFrame, df_2: pd.DataFrame, constant: int):
        ...
- parfun.per_argument(**partition_arg_with: Callable[[...], Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None]]) Callable[[NamedArguments], Tuple[NamedArguments, Generator[NamedArguments, None, None] | Generator[Tuple[int, NamedArguments] | None, int, None]]]
Applies multiple partitioning functions simultaneously on different function arguments.
    @pf.parallel(
        split=pf.per_argument(
            df=pf.dataframe.by_row,
            xs=pf.py_list.by_chunk,
        )
    )
    def func(df: pd.DataFrame, xs: List, constant: int):
        ...
- parfun.partition.utility.with_partition_size(generator: Generator[PartitionType, None, None] | Generator[Tuple[int, PartitionType] | None, int, None], partition_size: int | Callable[[], int] = 1) Generator[PartitionType, None, None]
Runs a partitioning generator without requiring the partition size estimator.
This function uses the provided partition size input to feed the partitioning generator through Python's generator.send() method, simulating the parallel function's behavior.

    # Runs the `by_row` partitioning function with a random partition size generator.
    with_partition_size(
        pf.dataframe.by_row(df_1, df_2),
        partition_size=lambda: random.randint(1, 10),
    )
- Parameters:
generator – the partitioning generator to execute
partition_size – a constant partition size, or a function generating partition sizes
Python lists
A collection of pre-defined APIs to help users partition and combine collections, such as lists, arrays, or tuples.
- parfun.py_list.by_chunk(*iterables: Iterable[PartitionType]) Generator[Tuple[Iterable[PartitionType], ...], None, None] | Generator[Tuple[int, Tuple[Iterable[PartitionType], ...]] | None, int, None]
Partition one or multiple iterables by chunks of identical sizes.
    ls_1 = [1, 2, 3, 4]
    ls_2 = [1, 4, 9, 16]

    with_partition_size(by_chunk(ls_1, ls_2), partition_size=2)
    # [((1, 2), (1, 4)), ((3, 4), (9, 16))]
- parfun.py_list.concat(values: Iterable[List[ListValue]]) List[ListValue]
Chains a collection of lists in a single list.
    concat([[1, 2], [3], [4, 5]])
    # [1, 2, 3, 4, 5]
Pandas dataframes
A collection of pre-defined APIs to partition and combine dataframes.
- parfun.dataframe.by_group(*args, **kwargs) Callable[[...], Generator[DataFrame, None, None] | Generator[Tuple[int, DataFrame] | None, int, None]]
Partitions one or multiple Pandas dataframes into groups, similar to pandas.DataFrame.groupby().
See pandas.DataFrame.groupby() for the function parameters.

    df_1 = pd.DataFrame({"country": ["USA", "China", "Belgium"], "capital": ["Washington", "Beijing", "Brussels"]})
    df_2 = pd.DataFrame({"country": ["USA", "China", "Belgium"], "iso_code": ["US", "CN", "BE"]})

    with_partition_size(by_group(by="country")(df_1, df_2), partition_size=1)
    # [(   country   capital
    #  2   Belgium  Brussels,
    #      country iso_code
    #  2   Belgium       BE),
    #  (   country  capital
    #  1     China  Beijing,
    #      country iso_code
    #  1     China       CN),
    #  (   country     capital
    #  0       USA  Washington,
    #      country iso_code
    #  0       USA        US)]
- parfun.dataframe.by_row(*dfs: DataFrame) Generator[Tuple[DataFrame, ...], None, None] | Generator[Tuple[int, Tuple[DataFrame, ...]] | None, int, None]
Partitions one or multiple Pandas dataframes by rows.
If multiple dataframes are given, the returned partitions will have identical numbers of rows.
    df_1 = pd.DataFrame(range(0, 5))
    df_2 = df_1 ** 2

    with_partition_size(by_row(df_1, df_2), partition_size=2)
    # [(   0
    #  0  0
    #  1  1,
    #     0
    #  0  0
    #  1  1),
    #  (   0
    #  2  2
    #  3  3,
    #     0
    #  2  4
    #  3  9),
    #  (   0
    #  4  4,
    #     0
    #  4  16)]
- parfun.dataframe.concat(dfs: Iterable[DataFrame]) DataFrame
Similar to pandas.concat().

    df_1 = pd.DataFrame([1, 2, 3])
    df_2 = pd.DataFrame([4, 5, 6])

    print(concat([df_1, df_2]))
    #    0
    # 0  1
    # 1  2
    # 2  3
    # 3  4
    # 4  5
    # 5  6
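A rough end-to-end sketch combining these dataframe helpers with the parallel() decorator; the column names and the per-group mean are illustrative, not part of the API:

    import pandas as pd

    import parfun as pf

    @pf.parallel(
        split=pf.all_arguments(pf.dataframe.by_group(by="country")),
        combine_with=pf.dataframe.concat,
    )
    def mean_by_country(df: pd.DataFrame) -> pd.DataFrame:
        # Each partition holds the rows of a single country group.
        return df.groupby("country", as_index=False)["value"].mean()

    df = pd.DataFrame({"country": ["USA", "USA", "China"], "value": [1.0, 2.0, 3.0]})
    result = mean_by_country(df)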