Single Machine: dask.distributed
The dask.distributed scheduler works well on a single machine. It is sometimes preferred over the default scheduler for the following reasons:
- It provides access to an asynchronous API, notably Futures
- It offers a diagnostic dashboard that gives valuable insight into performance and progress
- It handles data locality with more sophistication, and so can be more efficient than the multiprocessing scheduler on workloads that require multiple processes
You can create a dask.distributed scheduler by importing and creating a Client with no arguments. This overrides whatever default was previously set.
from dask.distributed import Client
client = Client()
You can navigate to http://localhost:8787/status to see the diagnostic dashboard if you have Bokeh installed.
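As a minimal sketch of the Futures interface mentioned above: the helper names `square` and `run_futures_demo` are made up for illustration, and `processes=False` is chosen here only so the snippet can run as a plain script without an `if __name__ == "__main__":` guard.

```python
from dask.distributed import Client


def square(x):
    """Toy task used only for this illustration."""
    return x * x


def run_futures_demo():
    # Assumption: in-process workers keep the example script-safe.
    client = Client(processes=False)
    try:
        # submit() returns a Future immediately; the work runs in the background.
        future = client.submit(square, 7)
        # map() returns one Future per input element.
        futures = client.map(square, range(5))
        # result() and gather() block until the values are available.
        return future.result(), client.gather(futures)
    finally:
        client.close()


single, many = run_futures_demo()
print(single, many)
```

While the client is alive, submitted tasks should also show up on the dashboard page described above.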
Client
You can set up a local cluster on your machine by instantiating a Dask Client with no arguments:
from dask.distributed import Client
client = Client()
This sets up a scheduler in your local process and several processes running single-threaded Workers.
If you want to run workers in the same process as the scheduler, you can pass the processes=False keyword argument.
client = Client(processes=False)
This is sometimes preferable if you want to avoid inter-worker communication and your computations release the GIL. This is common when primarily using NumPy or Dask Array.
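One way to confirm that `processes=False` keeps the workers inside the calling process is to ask every worker for its process ID. This is a rough sketch; the helper name `worker_pids` is hypothetical, and it relies on `Client.run`, which executes a function on each worker and returns a mapping from worker address to result.

```python
import os

from dask.distributed import Client


def worker_pids():
    """Return the set of process IDs reported by the workers."""
    client = Client(processes=False)
    try:
        # run() calls os.getpid on every worker: {worker_address: pid}
        return set(client.run(os.getpid).values())
    finally:
        client.close()


pids = worker_pids()
# With in-process workers, every worker reports this process's own ID.
print(pids == {os.getpid()})
```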
LocalCluster
The Client() call described above is shorthand for creating a LocalCluster and then passing that to your client.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
This is equivalent, but somewhat more explicit. The keyword arguments of LocalCluster let you control the mixture of threads and processes, specify explicit ports, and so on.
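The following sketch shows what passing explicit keyword arguments might look like. The worker and thread counts are arbitrary illustrative values, `add` and `run_on_explicit_cluster` are hypothetical helpers, and the `with` blocks are used so that both the client and the cluster are closed on exit.

```python
from dask.distributed import Client, LocalCluster


def add(a, b):
    """Toy task used only for this illustration."""
    return a + b


def run_on_explicit_cluster():
    # Assumption: two in-process workers with two threads each.
    with LocalCluster(n_workers=2, threads_per_worker=2, processes=False) as cluster:
        with Client(cluster) as client:
            n_workers = len(cluster.workers)
            total = client.submit(add, 1, 2).result()
    return n_workers, total


n_workers, total = run_on_explicit_cluster()
print(n_workers, total)
```

Switching to `processes=True` trades shared memory for isolation between workers; on platforms that spawn new interpreters, the call should then sit under an `if __name__ == "__main__":` guard.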
- class distributed.deploy.local.LocalCluster(n_workers=None, threads_per_worker=None, processes=True, loop=None, start=None, ip=None, scheduler_port=0, silence_logs=30, diagnostics_port=8787, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, **worker_kwargs)
Create local Scheduler and Workers
This creates a “cluster” of a scheduler and workers running on the local machine.
- Parameters
- n_workers: int
Number of workers to start
- processes: bool
Whether to use processes (True) or threads (False). Defaults to True
- threads_per_worker: int
Number of threads per each worker
- scheduler_port: int
Port of the scheduler. 8786 by default, use 0 to choose a random port
- silence_logs: logging level
Level of logs to print out to stdout. logging.WARN by default. Use a falsey value like False or None for no change.
- ip: string
IP address on which the scheduler will listen, defaults to only localhost
- diagnostics_port: int
Port on which the web interface will be provided. 8787 by default, use 0 to choose a random port, None to disable it, or an (ip:port) tuple to listen on a different IP address than the scheduler.
- asynchronous: bool (False by default)
Set to True if using this cluster within async/await functions or within Tornado gen.coroutines. This should remain False for normal use.
- kwargs: dict
Extra worker arguments, will be passed to the Worker constructor.
- service_kwargs: Dict[str, Dict]
Extra keywords to hand to the running services
- security: Security
Examples
>>> c = LocalCluster()  # Create a local cluster with as many workers as cores
>>> c
LocalCluster("127.0.0.1:8786", workers=8, ncores=8)
>>> client = Client(c)  # connect to local cluster
Add a new worker to the cluster
>>> w = c.start_worker(ncores=2)
Shut down the extra worker
>>> c.stop_worker(w)
Pass extra keyword arguments to Bokeh
>>> LocalCluster(service_kwargs={'bokeh': {'prefix': '/foo'}})
- close(timeout=20)
Close the cluster
- scale_down(workers)
Remove workers from the cluster
Given a list of worker addresses this function should remove those workers from the cluster. This may require tracking which jobs are associated to which worker address.
This can be implemented either as a function or as a Tornado coroutine.
- scale_up(n, **kwargs)
Bring the total count of workers up to n
This function/coroutine should bring the total number of workers up to the number n.
This can be implemented either as a function or as a Tornado coroutine.
- start_worker(**kwargs)
Add a new worker to the running cluster
- Parameters
- port: int (optional)
Port on which to serve the worker, defaults to 0 or random
- ncores: int (optional)
Number of threads to use. Defaults to number of logical cores
- Returns
- The created Worker or Nanny object. Can be discarded.
Examples
>>> c = LocalCluster()
>>> c.start_worker(ncores=2)
- stop_worker(w)
Stop a running worker
Examples
>>> c = LocalCluster()
>>> w = c.start_worker(ncores=2)
>>> c.stop_worker(w)