from __future__ import annotations
import re
import sys
from copy import deepcopy
from typing import Any, Dict, Type
import joblib.parallel
from joblib.parallel import (
ParallelBackendBase,
parallel_backend,
register_parallel_backend,
)
if sys.version_info >= (3, 11):
from typing import Self # nocover
else:
from typing_extensions import Self # nocover
class NestedBackendMixin:
"""Mixin class for nested parallel backend.
Overrides `ParallelBackendBase.get_nested_backend` method
to return Self class.
"""
def get_nested_backend(self, *args: Any, **kwargs: Any) -> tuple[Self, None]:
"""Get nested parallel backend.
Returns
-------
tuple[Self, None]
"""
return self.__class__(), None
def _create_nested_backend(
backend_class: type[ParallelBackendBase],
) -> type[ParallelBackendBase]:
"""Dynamically create nested parallel backend class.
Parameters
----------
backend_class : type[ParallelBackendBase]
Base class of parallel backend.
Returns
-------
type[ParallelBackendBase]
Nested parallel backend class.
"""
if issubclass(backend_class, NestedBackendMixin):
return backend_class
return type(
f"Nested{backend_class.__name__}", (NestedBackendMixin, backend_class), {}
)
class _NestedBackendDict(Dict[str, Type[ParallelBackendBase]]):
"""Dict class that dynamically creates and registers
nested parallel backend class with key name "nested-{key}"
if key name is not "nested-{key}".
"""
def __setitem__(self, __key: str, __value: type[ParallelBackendBase]) -> None:
"""Set self[__key] to __value.
If __key is not "nested-{key}",
dynamically create nested parallel backend class
and register it with key name "nested-{key}".
"""
if isinstance(__key, str) and re.match("nested-", __key) is None:
super().__setitem__(f"nested-{__key}", _create_nested_backend(__value))
return super().__setitem__(__key, __value)
[docs]def apply(*, set_default: bool = True, auto_register: bool = True) -> None:
'''Apply `nest_joblib`.
The following joblib specification of not doing nested-parallelism may
be inefficient in an environment with sufficient memory.
`joblib/_parallel_backends.py`:
>>> def get_nested_backend(self):
>>> """Backend instance to be used by nested Parallel calls.
>>>
>>> By default a thread-based backend is used for the first level of
>>> nesting. Beyond, switch to sequential backend to avoid spawning too
>>> many threads on the host.
>>> """
>>> nesting_level = getattr(self, 'nesting_level', 0) + 1
>>> if nesting_level > 1:
>>> return SequentialBackend(nesting_level=nesting_level), None
>>> else:
>>> return ThreadingBackend(nesting_level=nesting_level), None
After calling this function,
when joblib.parallel.register_parallel_backend(name, backend) is called,
a subclass of backend with modified get_nested_backend is
dynamically generated and registered with the name f"nested-{name}".
Parameters
----------
set_default : bool, optional
Whether to set DEFAULT_BACKEND to nested-loky, by default True
auto_register : bool, optional
Whether to automatically register nested parallel backend.
Useful when registering external parallel backend like `ray`
AFTER calling this function.
If False, nested version of already registered parallel backends
will still be registered.
If error occurs, setting this to False may help, by default True
Examples
--------
Using `nested-loky` (nested version of `loky`) as default parallel backend:
>>> import nest_joblib
>>> nest_joblib.apply()
Using `nested-dask` (nested version of another parallel backend `dask`)
as default parallel backend:
>>> from joblib import parallel_backend
>>> from nest_joblib import apply
>>>
>>> apply()
>>> parallel_backend("nested-dask")
Using `nested-ray` (nested version of external parallel backend `ray`)
as default parallel backend:
>>> from joblib import parallel_backend
>>> from nest_joblib import apply
>>> from ray.util.joblib import register_ray
>>>
>>> apply()
>>> register_ray()
>>> parallel_backend("nested-ray")
'''
# BACKENDS
for name, backend in deepcopy(joblib.parallel.BACKENDS).items():
if re.match("nested-", name) is not None:
continue
register_parallel_backend(f"nested-{name}", _create_nested_backend(backend))
# change the type of BACKENDS from dict to _NestedBackendDict
if auto_register:
joblib.parallel.BACKENDS = _NestedBackendDict(joblib.parallel.BACKENDS)
# EXTERNAL_BACKENDS
for name, register_backend in deepcopy(joblib.parallel.EXTERNAL_BACKENDS).items():
if re.match("nested-", name) is not None:
continue
joblib.parallel.EXTERNAL_BACKENDS[f"nested-{name}"] = register_backend
# set DEFAULT_BACKEND to nested-loky
if set_default:
parallel_backend("nested-loky")