Dask-Yarn¶
Dask-Yarn deploys Dask on YARN clusters, such as those found in traditional Hadoop installations. Dask-Yarn provides an easy interface to quickly start, scale, and stop Dask clusters natively from Python.
from dask_yarn import YarnCluster
from dask.distributed import Client
# Create a cluster where each worker has two cores and eight GiB of memory
cluster = YarnCluster(environment='environment.tar.gz',
                      worker_vcores=2,
                      worker_memory="8GiB")
# Scale out to ten such workers
cluster.scale(10)
# Connect to the cluster
client = Client(cluster)
Dask-Yarn uses Skein, a Pythonic library to create and deploy YARN applications.
Install¶
Dask-Yarn is designed to only require installation on an edge node. To install, use one of the following methods:
Install with Conda:
conda install -c conda-forge dask-yarn
Install with Pip:
pip install dask-yarn
Install from Source:
Dask-Yarn is available on GitHub and can always be installed from source.
pip install git+https://github.com/dask/dask-yarn.git
Quickstart¶
Dask-Yarn is designed to be used like any other Python library - install it locally and use it in your code (either interactively, or as part of an application). As long as the computer you’re deploying on has access to the YARN cluster (usually an edge node), everything should work fine.
Install Dask-Yarn on an Edge Node¶
Dask-Yarn is designed to be used from an edge node. To install, use either conda or pip to create a new environment and install dask-yarn on the edge node.
Conda Environments:
Create a new conda environment with dask-yarn installed. You may also want to add any other packages you rely on for your work.
$ conda create -n my_env dask-yarn # Create an environment
$ conda activate my_env # Activate the environment
Virtual Environments:
Create a new virtual environment with dask-yarn installed. You may also want to add any other packages you rely on for your work.
$ python -m venv my_env # Create an environment using venv
$ source my_env/bin/activate # Activate the environment
$ pip install dask-yarn # Install some packages
Package your environment for Distribution¶
We need to ensure that the libraries used on the Yarn cluster are the same as what you are using locally. By default, dask-yarn handles this by distributing a packaged Python environment to the Yarn cluster as part of the application. This is typically handled using
- conda-pack for Conda environments
- venv-pack for virtual environments
See Managing Python Environments for more information.
Conda Environments:
If you haven’t already installed conda-pack, you’ll need to do so now. You can either install it in the environment to be packaged, or your root environment (where it will be available to use in all environments).
$ conda install -c conda-forge conda-pack # Install conda-pack
$ conda-pack # Package environment
Collecting packages...
Packing environment at '/home/username/miniconda/envs/my_env' to 'my_env.tar.gz'
[########################################] | 100% Completed | 12.2s
Virtual Environments:
If you haven’t already installed venv-pack, you’ll need to do so now.
$ pip install venv-pack # Install venv-pack
$ venv-pack # Package environment
Collecting packages...
Packing environment at '/home/username/my-env' to 'my-env.tar.gz'
[########################################] | 100% Completed | 8.3s
Kinit (Optional)¶
If your cluster is configured to use Kerberos for authentication, you need to make sure you have an active ticket-granting-ticket before continuing:
$ kinit
Usage¶
To start a YARN cluster, create an instance of YarnCluster. This constructor takes several parameters; leave them empty to use the defaults defined in the Configuration section.
from dask_yarn import YarnCluster
from dask.distributed import Client
# Create a cluster where each worker has two cores and eight GiB of memory
cluster = YarnCluster(environment='environment.tar.gz',
                      worker_vcores=2,
                      worker_memory="8GiB")
# Connect to the cluster
client = Client(cluster)
# Do some work here
# Shutdown client and cluster (alternatively use context-manager as shown below):
client.shutdown()
cluster.shutdown()
By default no workers are started on cluster creation. To change the number of workers, use the YarnCluster.scale() method. When scaling up, new workers will be requested from YARN. When scaling down, workers will be intelligently selected and scaled down gracefully, freeing up resources.
# Scale up to 10 workers
cluster.scale(10)
# ...
# Scale back down to 2 workers
cluster.scale(2)
Alternatively, you can enable adaptive scaling using the YarnCluster.adapt() method. When enabled, the cluster will scale up and down automatically depending on usage. Here we turn on adaptive scaling, bounded at a minimum of 2 workers and a maximum of 10 workers.
# Adaptively scale between 2 and 10 workers
cluster.adapt(minimum=2, maximum=10)
If you’re working interactively in a Jupyter Notebook or JupyterLab, you can also use the provided graphical interface to change the cluster size, instead of calling YarnCluster.scale() or YarnCluster.adapt() manually.

Normally the cluster will persist until the YarnCluster object is deleted. To be more explicit about when the cluster is shut down, you can either use the cluster as a context manager, or manually call YarnCluster.shutdown().
# Use ``YarnCluster`` as a context manager
with YarnCluster(...) as cluster:
    # The cluster will remain active inside this block,
    # and will be shutdown when the context exits.
# Or manually call `shutdown`
cluster = YarnCluster(...)
# ...
cluster.shutdown()
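To round out the example, here is a hedged sketch of the kind of work you might run against the connected client; the environment path matches the placeholder used above, and the computation (a small dask.array reduction, similar to the Dataproc example later in this document) is purely illustrative.
from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.array as da

# Start a small cluster (the environment path is an example placeholder)
with YarnCluster(environment='environment.tar.gz',
                 worker_vcores=2,
                 worker_memory="8GiB") as cluster:
    cluster.scale(2)
    with Client(cluster) as client:
        # Run a simple distributed computation on the cluster
        x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
        print(x.mean().compute())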
Managing Python Environments¶
We need to ensure that the libraries used on the Yarn cluster are the same as what you are using locally. There are a few ways to specify this:
- The path to an archived environment (either conda or virtual environments)
- The path to a Conda environment (as conda:///...)
- The path to a virtual environment (as venv:///...)
- The path to a python executable (as python:///...)
Note that when not using an archive, the provided path must be valid on all nodes in the cluster.
Using Archived Python Environments¶
The most common way to use dask-yarn is to distribute an archived Python environment throughout the YARN cluster as part of the application. Packaging the environment for distribution is typically handled using
- conda-pack for Conda environments
- venv-pack for virtual environments
These environments can contain any Python packages you might need, but require dask-yarn (and its dependencies) at a minimum.
Archiving Conda Environments Using Conda-Pack¶
You can package a conda environment using conda-pack.
$ conda create -n my-env dask-yarn scikit-learn # Create an environment
$ conda activate my-env # Activate the environment
$ conda-pack # Package environment
Collecting packages...
Packing environment at '/home/username/miniconda/envs/my-env' to 'my-env.tar.gz'
[########################################] | 100% Completed | 12.2s
Archiving Virtual Environments Using Venv-Pack¶
You can package a virtual environment using venv-pack. The virtual environment can be created using either venv or virtualenv. Note that the python linked to in the virtual environment must exist and be accessible on every node in the YARN cluster. If the environment was created with a different Python, you can change the link path using the --python-prefix flag. For more information see the venv-pack documentation.
$ python -m venv my_env # Create an environment using venv
$ python -m virtualenv my_env # Or create an environment using virtualenv
$ source my_env/bin/activate # Activate the environment
$ pip install dask-yarn scikit-learn # Install some packages
$ venv-pack # Package environment
Collecting packages...
Packing environment at '/home/username/my-env' to 'my-env.tar.gz'
[########################################] | 100% Completed | 8.3s
Specifying the Archived Environment¶
You can now start a cluster with the packaged environment by passing the path to the constructor, e.g. YarnCluster(environment='my-env.tar.gz', ...).
Note that if the environment is a local file, the archive will be automatically uploaded to a temporary directory on HDFS before starting the application. If you find yourself reusing the same environment multiple times, you may want to upload the environment to HDFS once beforehand to avoid repeating this process for each cluster (the environment is then specified as hdfs:///path/to/my-env.tar.gz).
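For example, assuming the archive has already been copied to that HDFS location (the path below is the same placeholder used above), the cluster can reuse it directly without a fresh upload:
from dask_yarn import YarnCluster

# Reuse an environment already stored on HDFS (path shown is illustrative),
# avoiding a fresh upload for every new cluster
cluster = YarnCluster(environment='hdfs:///path/to/my-env.tar.gz',
                      worker_vcores=2,
                      worker_memory="8GiB")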
After startup you may want to verify that your versions match with the following:
from dask_yarn import YarnCluster
from dask.distributed import Client
cluster = YarnCluster(environment='my-env.tar.gz')
client = Client(cluster)
client.get_versions(check=True) # check that versions match between all nodes
Using Python Environments Local to Each Node¶
Alternatively, you can specify the path to a conda environment, virtual environment, or Python executable that is already found on each node:
from dask_yarn import YarnCluster
# Use a conda environment at /path/to/my/conda/env
cluster = YarnCluster(environment='conda:///path/to/my/conda/env')
# Use a virtual environment at /path/to/my/virtual/env
cluster = YarnCluster(environment='venv:///path/to/my/virtual/env')
# Use a Python executable at /path/to/my/python
cluster = YarnCluster(environment='python:///path/to/my/python')
As before, these environments can have any Python packages, but must include dask-yarn (and its dependencies) at a minimum. It’s also very important that these environments are uniform across all nodes; mismatched environments can lead to hard-to-diagnose issues. To check this, you can use the Client.get_versions method:
from dask.distributed import Client
client = Client(cluster)
client.get_versions(check=True) # check that versions match between all nodes
Configuration¶
Specifying all parameters to the YarnCluster constructor every time may be error prone, especially when sharing this workflow with new users. Alternatively, you can provide defaults in a configuration file, traditionally held in ~/.config/dask/yarn.yaml or /etc/dask/yarn.yaml. Note that this configuration is optional, and only changes the defaults when not specified in the constructor. You only need to set the fields you care about; unset fields will fall back to the default configuration.
Example:
# ~/.config/dask/yarn.yaml
yarn:
  name: dask                 # Application name
  queue: default             # Yarn queue to deploy to
  environment: /path/to/my-env.tar.gz

  scheduler:                 # Specifications of scheduler container
    vcores: 1
    memory: 4GiB

  worker:                    # Specifications of worker containers
    vcores: 2
    memory: 8GiB
Users can now create YarnClusters without specifying any additional information.
from dask_yarn import YarnCluster
cluster = YarnCluster()
cluster.scale(20)
For more information on Dask configuration see the Dask configuration documentation.
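If you prefer to set these defaults from Python rather than a file, Dask’s general configuration API can also be used. A minimal sketch, assuming the standard dask.config.set interface; the keys mirror the YAML fields above:
import dask
from dask_yarn import YarnCluster

# Override a few yarn defaults for this session only
dask.config.set({"yarn.environment": "/path/to/my-env.tar.gz",
                 "yarn.worker.vcores": 2,
                 "yarn.worker.memory": "8GiB"})

cluster = YarnCluster()  # picks up the configured defaults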
Providing a Custom Skein Specification¶
Sometimes you’ll need more control over the deployment than is provided by the above configuration fields. In this case you can provide the path to a custom Skein specification in the yarn.specification field. If this field is present in the configuration, it will be used as long as no parameters are passed to the YarnCluster constructor. Note that this is equivalent to calling YarnCluster.from_specification() programmatically.
# /home/username/.config/dask/yarn.yaml
yarn:
  specification: /path/to/spec.yaml
The specification requires at least one Service named dask.worker which describes how to start a single worker. If an additional service dask.scheduler is provided, this will be assumed to start the scheduler. If dask.scheduler isn’t present, a scheduler will be started locally instead.
In the script section for each service, the appropriate dask-yarn CLI command (see CLI Docs below) should be used:
- dask-yarn services worker to start the worker
- dask-yarn services scheduler to start the scheduler
Beyond that, you have full flexibility for how to define a specification. See the Skein documentation for more information. A few examples are provided below:
Example: deploy-mode local with node_label restrictions¶
This specification is similar to that created automatically when deploy_mode='local' is specified (scheduler runs locally, only the worker service is specified), except it adds node_label restrictions for the workers. Here we restrict workers to run only on nodes labeled as GPU.
# /path/to/spec.yaml
name: dask
queue: myqueue

services:
  dask.worker:
    # Restrict workers to GPU nodes only
    node_label: GPU
    # Don't start any workers initially
    instances: 0
    # Workers can be restarted an infinite number of times
    max_restarts: -1
    # Restrict workers to 4 GiB and 2 cores each
    resources:
      memory: 4 GiB
      vcores: 2
    # Distribute this python environment to every worker node
    files:
      environment: /path/to/my/environment.tar.gz
    # The bash script to start the worker
    # Here we activate the environment, then start the worker
    script: |
      source environment/bin/activate
      dask-yarn services worker
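To launch a cluster from such a file programmatically rather than through the configuration, the same path can be passed to YarnCluster.from_specification() (documented in the API Docs below). A minimal sketch:
from dask_yarn import YarnCluster
from dask.distributed import Client

# Start a cluster directly from the custom Skein specification above
cluster = YarnCluster.from_specification('/path/to/spec.yaml')
client = Client(cluster)
cluster.scale(4)  # request four workers from YARN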
Example: deploy-mode remote with custom setup¶
This specification is similar to that created automatically when deploy_mode='remote' is specified (both scheduler and worker run inside YARN containers), except it runs an initialization script before starting each service.
# /path/to/spec.yaml
name: dask
queue: myqueue

services:
  dask.scheduler:
    # Restrict scheduler to 2 GiB and 1 core
    resources:
      memory: 2 GiB
      vcores: 1
    # The bash script to start the scheduler.
    # Here we have dask-yarn already installed on the node,
    # and also run a custom script before starting the service
    script: |
      some-custom-initialization-script
      dask-yarn services scheduler

  dask.worker:
    # Don't start any workers initially
    instances: 0
    # Workers can be restarted an infinite number of times
    max_restarts: -1
    # Workers should only be started after the scheduler starts
    depends:
      - dask.scheduler
    # Restrict workers to 4 GiB and 2 cores each
    resources:
      memory: 4 GiB
      vcores: 2
    # The bash script to start the worker.
    # Here we have dask-yarn already installed on the node,
    # and also run a custom script before starting the service
    script: |
      some-custom-initialization-script
      dask-yarn services worker
Default Configuration¶
The default configuration file is as follows:
yarn:
  specification: null       # A path to a skein specification yaml file.
                            # Overrides the following configuration if given.

  name: dask                # Application name
  queue: default            # Yarn queue to deploy to
  deploy-mode: remote       # The deploy mode to use (either remote or local)
  environment: null         # The Python environment to use
  tags: []                  # List of strings to tag applications
  user: ''                  # The user to submit the application on behalf of,
                            # leave as empty string for current user.

  host: "0.0.0.0"           # The scheduler host, when in deploy-mode=local
  port: 0                   # The scheduler port, when in deploy-mode=local
  dashboard-address: ":0"   # The dashboard address, when in deploy-mode=local

  scheduler:                # Specifications of scheduler container
    vcores: 1
    memory: 2GiB
    gpus: 0                 # Number of GPUs requested

  worker:                   # Specifications of worker containers
    vcores: 1
    memory: 2GiB
    count: 0                # Number of workers to start on initialization
    restarts: -1            # Allowed number of restarts, -1 for unlimited
    env: {}                 # A map of environment variables to set on the worker
    gpus: 0                 # Number of GPUs requested
    worker_class: "dask.distributed.Nanny"   # The kind of worker to launch
    worker_options: {}      # A map of options to pass to the worker

  client:                   # Specification of client container
    vcores: 1
    memory: 2GiB
    gpus: 0                 # Number of GPUs requested
    env: {}                 # A map of environment variables to set on the client
Submitting Applications¶
Warning
The submission API is experimental and may change between versions
Sometimes you have a Dask application you want to deploy completely on YARN, without having a corresponding process running on an edge node. This may come up with production applications deployed automatically, or long-running jobs where you don’t want to consume edge node resources.
To handle these cases, dask-yarn provides a CLI (see CLI Docs below) that can be used to submit applications to be run on the YARN cluster asynchronously. There are three commands that may be useful here:
- dask-yarn submit: submit an application to the YARN cluster
- dask-yarn status: check on the status of an application
- dask-yarn kill: kill a running application
Submitting an Application¶
To prepare an application to be submitted using dask-yarn submit, you need to change the creation of your YarnCluster from using the constructor to using YarnCluster.from_current().
# Replace this
cluster = YarnCluster(...)
# with this
cluster = YarnCluster.from_current()
This is because the script won’t be run until the cluster is already created - at that point configuration passed to the YarnCluster constructor won’t be useful. Cluster configuration is instead passed via the dask-yarn submit CLI (note that as before, the cluster can be scaled dynamically after creation).
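For reference, here is a hedged sketch of what such a myscript.py might look like; the computation itself is purely illustrative:
# myscript.py - runs entirely inside the YARN application
from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.array as da

if __name__ == '__main__':
    # Connect to the cluster created for this submitted application
    cluster = YarnCluster.from_current()
    client = Client(cluster)

    # Do some (illustrative) work
    x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
    print(x.sum().compute())

    client.shutdown()
    cluster.shutdown()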
# Submit `myscript.py` to run on a dask cluster with 8 workers,
# each with 2 cores and 4 GiB
$ dask-yarn submit \
    --environment my_env.tar.gz \
    --worker-count 8 \
    --worker-vcores 2 \
    --worker-memory 4GiB \
    myscript.py

application_1538148161343_0051
This outputs a YARN Application ID, which can be used with other YARN tools.
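If you later need to interact with the submitted application from an edge node (for example, to inspect or scale it), you can attach to it by application id using YarnCluster.from_application_id() (documented in the API Docs below). A minimal sketch:
from dask_yarn import YarnCluster
from dask.distributed import Client

# Attach to the already-running application by its id
cluster = YarnCluster.from_application_id('application_1538148161343_0051')
client = Client(cluster)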
Checking Application Status¶
Submitted application status can be checked using the YARN Web UI, or programmatically using dask-yarn status. This command takes one parameter - the application id.
$ dask-yarn status application_1538148161343_0051
APPLICATION_ID NAME STATE STATUS CONTAINERS VCORES MEMORY RUNTIME
application_1538148161343_0051 dask RUNNING UNDEFINED 9 17 33792 6m
Killing a Running Application¶
Submitted applications normally run until completion. If you need to terminate one before then, you can use the dask-yarn kill command. This command takes one parameter - the application id.
$ dask-yarn kill application_1538148161343_0051
Accessing the Application Logs¶
Application logs can be retrieved a few ways:
- The logs of running applications can be viewed using the Skein Web UI (dask-yarn is built using Skein).
- The logs of completed applications can be viewed using the yarn logs command:
$ yarn logs -applicationId application_1538148161343_0051
Deploying on Amazon EMR¶
Amazon Elastic MapReduce (EMR) is a web service for creating a cloud-hosted Hadoop cluster.
Dask-Yarn works out-of-the-box on Amazon EMR; following the Quickstart as written should get you up and running fine. We recommend doing the installation step as part of a bootstrap action.
For a curated installation, we also provide an example bootstrap action for installing Dask and Jupyter on cluster startup. This script is heavily commented, and can be used as an example if you need a more customized installation.
Here we provide a brief walkthrough of a workflow we’ve found useful when deploying Dask on Amazon EMR:
Configure the EMR Cluster¶
The EMR documentation contains an example showing how to configure and start an EMR cluster. We recommend referencing their documentation. A few notes on recommended settings:
- If you plan to use a bootstrap action, you’ll need to follow the Go to advanced options link on the Create Cluster page - this feature is not available under Quick Options.
- When choosing which applications to install, dask-yarn only requires a Hadoop installation; all other applications are optional depending on your workflow.
While this configuration can take some time and thought, the next time you want to start a Dask cluster on EMR you can clone this cluster to reuse the configuration.
Add a Bootstrap Action¶
To make installation on a clean cluster easier, we recommend scripting the installation as part of a bootstrap action. For a curated install, we provide an example bootstrap action that you may use. This script does the following:
- Installs miniconda.
- Installs dask, distributed, dask-yarn, pyarrow, and s3fs. This list of packages can be extended using the --conda-packages flag.
- Packages the environment with conda-pack for distribution to the workers.
- Optionally installs and starts a Jupyter Notebook server running on port 8888. This can be disabled with the --no-jupyter flag. The password for the notebook server can be set with the --password option; the default is dask-user.
If you require a more customized install than this, you may wish to provide your own script. The example script is heavily commented, and should hopefully provide enough of a reference for your use.
To use the script, follow AWS’s documentation on using bootstrap actions. You’ll need to upload the script to an S3 bucket accessible to your project first.
Start the EMR Cluster¶
Once you’ve finished configuring your cluster, you can start it with the Create Cluster button. This may take a while (~10 mins), depending on your settings.
Connect to the EMR Cluster¶
Once the cluster is running, you’ll want to connect to it. Depending on your EC2 security groups settings, you may have direct access to the cluster, or you may need to start an ssh tunnel (default). For either of these you’ll need to know the public DNS name of your master node. This address looks like ec2-###-##-##-###.compute-1.amazonaws.com, and can be found by following the AWS documentation.
Direct Access
If you have direct access to the cluster, you should be able to access the resource-manager WebUI at <public-dns-name>:8088. If you used our provided bootstrap action, the Jupyter Notebook should be available at <public-dns-name>:8888.
Using an SSH Tunnel
If you don’t have direct access, you’ll need to start an SSH tunnel to access the Web UIs or the Jupyter Notebook. For more information, see the AWS documentation.
If you used our provided bootstrap action, the Jupyter Notebook should be available at <public-dns-name>:8888, and can be accessed by starting an SSH tunnel via:
$ ssh -i ~/mykeypair.pem -L 8888:<public-dns-name>:8888 hadoop@<public-dns-name>
where ~/mykeypair.pem is your .pem file, and <public-dns-name> is the public DNS name of your master node.
Create a Dask Cluster¶
At this point you should have access to a running EMR cluster, with Dask and its dependencies installed. To create a new Dask cluster running inside the EMR cluster, create an instance of YarnCluster. If you didn’t use our bootstrap action, you’ll also need to provide a path to your packaged environment (see Managing Python Environments for more information).
from dask_yarn import YarnCluster
from dask.distributed import Client
# Create a cluster
cluster = YarnCluster()
# Connect to the cluster
client = Client(cluster)
By default no workers are started on cluster creation. To change the number of workers, use the YarnCluster.scale() method. When scaling up, new workers will be requested from YARN. When scaling down, workers will be intelligently selected and scaled down gracefully, freeing up resources.
# Scale up to 10 workers
cluster.scale(10)
# ...
# Scale back down to 2 workers
cluster.scale(2)
If you’re working interactively in a Jupyter Notebook you can also use the provided graphical interface to change the cluster size.

If you used our bootstrap action, the dask dashboard will also be available, with the link included in the cluster widget above.
Shutdown the EMR Cluster¶
You can start, scale, and stop many Dask clusters within a single EMR cluster. When you’re finally done doing your work, you’ll want to shut down the whole EMR cluster to conserve resources. See the AWS documentation for more information.
Deploying on Google Cloud Dataproc¶
Dataproc is Google Cloud’s hosted service for creating Apache Hadoop and Apache Spark clusters. Dataproc supports a series of open-source initialization actions that allow installation of a wide range of open source tools when creating a cluster. In particular, the following instructions will guide you through creating a Dataproc cluster with Dask and Dask-Yarn installed and configured for you. This tutorial is loosely adapted from the README for the Dask initialization action.
What the Initialization Action is Doing¶
The initialization action installation script does several things:
- It accepts a metadata parameter for configuring your cluster to use Dask with either its standalone scheduler or with Dask-Yarn to utilize Yarn.
- For the yarn configuration, this script installs dask and dask-yarn on all machines and adds a baseline Skein config file. This file tells each machine where to locate the Dask-Yarn environment, as well as how many workers to use by default: 2. This way, you can get started with dask-yarn by simply creating a YarnCluster object without providing any parameters. Dask relies on Yarn to schedule its tasks.
- For the standalone configuration, this script installs dask and configures the cluster to use the Dask scheduler for managing Dask workloads.
- The Dataproc service itself provides support for web UIs such as Jupyter and the Dask web UIs. This will be explained in more detail below.
Configuring your Dataproc Cluster¶
There are several ways to create a Dataproc cluster. This tutorial will focus on using the gcloud SDK to do so.
First, you’ll need to create a GCP Project. Please follow the instructions here to do so.
Decide on a name for your Dataproc cluster. Then, pick a geographic region to place your cluster in, ideally one close to you.
The following command will create a cluster for the dask-yarn configuration.
CLUSTER_NAME=<cluster-name>
REGION=<region>
gcloud dataproc clusters create ${CLUSTER_NAME} \
    --region ${REGION} \
    --master-machine-type n1-standard-16 \
    --worker-machine-type n1-standard-16 \
    --image-version preview \
    --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/dask/dask.sh \
    --metadata dask-runtime=yarn \
    --optional-components JUPYTER \
    --enable-component-gateway
To break down this command:
- gcloud dataproc clusters create ${CLUSTER_NAME} uses the gcloud sdk to create a Dataproc cluster.
- --region ${REGION} specifies the cluster region.
- --master-machine-type and --worker-machine-type allow configuration of CPUs and RAM via different types of machines.
- --image-version preview specifies the Dataproc image version. You’ll use the latest preview image of Dataproc for the most up-to-date features.
- --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/dask/dask.sh specifies the initialization actions to install on the cluster. You can add as many as you’d like via a comma-separated list.
- --metadata dask-runtime=yarn configures your cluster with Dask set up for use with yarn.
- --optional-components JUPYTER configures the cluster with the Jupyter optional component to access Jupyter notebooks running on the cluster. Like initialization actions, you can add as many optional components as you’d like. These differ from initialization actions in that they come with first-class support from the Dataproc service, but there are fewer options available.
- --enable-component-gateway allows you to bypass needing an SSH tunnel for a certain predetermined list of web UIs on your cluster, such as Jupyter and the Yarn ApplicationMaster, by connecting directly through the Dataproc web console.
Connecting to your cluster¶
You can access your cluster several different ways. If you configured your cluster with a notebook service such as Jupyter or Zeppelin and enabled component gateway (explained above), you can access these by navigating to your clusters page, clicking on the name of your cluster, and clicking on the Web Interfaces tab to access your web UIs.
You can also ssh into your cluster. You can do this via the Dataproc web console: from the clusters page, click on your cluster name, then VM Instances and click SSH next to the master node.
Additionally, you can also use the gcloud sdk to SSH onto your cluster. First, locate the zone that your cluster is in. This will be the region you specified earlier but with a letter attached to it, such as us-central1-b. You can find your cluster’s zone on the clusters page next to your cluster. The zone was determined via Dataproc’s Auto Zone feature, but you can choose any zone to place your cluster in by adding the --zone flag when creating a new cluster.
gcloud compute ssh ${CLUSTER_NAME}-m --zone ${ZONE}
Once connected, either via a Jupyter notebook or via ssh, try running some code. If your cluster is configured with Dask-Yarn:
from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.array as da
import numpy as np
cluster = YarnCluster()
client = Client(cluster)
cluster.adapt() # Dynamically scale Dask resources
x = da.sum(np.ones(5))
x.compute()
If your cluster is configured with the standalone scheduler:
from dask.distributed import Client
import dask.array as da
import numpy as np
client = Client("localhost:8786")
x = da.sum(np.ones(5))
x.compute()
Monitoring Dask Jobs¶
You can monitor your Dask applications using Web UIs, depending on the runtime you are using.
For yarn mode, you can access the Skein Web UI via the YARN ResourceManager. To access the YARN ResourceManager, create your cluster with component gateway enabled or create an SSH tunnel. You can then access the Skein Web UI by following these instructions.
For standalone mode, you can access the native Dask UI. Create an SSH tunnel to access the Dask UI on port 8787.
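When you have a YarnCluster object in a Python session, the dashboard address is also available programmatically through its dashboard_link attribute (see the API Docs below). A minimal sketch:
from dask_yarn import YarnCluster

cluster = YarnCluster()
print(cluster.dashboard_link)  # None if the dashboard isn't running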
Deleting your Dataproc Cluster¶
You can delete your cluster when you are done with it by running the following command:
gcloud dataproc clusters delete ${CLUSTER_NAME} --region ${REGION}
Further Information¶
Please refer to the Dataproc documentation for more information on using Dataproc.
API Docs¶
class dask_yarn.YarnCluster(environment=None, n_workers=None, worker_vcores=None, worker_memory=None, worker_restarts=None, worker_env=None, worker_class=None, worker_options=None, worker_gpus=None, scheduler_vcores=None, scheduler_gpus=None, scheduler_memory=None, deploy_mode=None, name=None, queue=None, tags=None, user=None, host=None, port=None, dashboard_address=None, skein_client=None, asynchronous=False, loop=None)¶

Start a Dask cluster on YARN.

You can define default values for this in Dask’s yarn.yaml configuration file. See http://docs.dask.org/en/latest/configuration.html for more information.

Parameters:

- environment : str, optional
The Python environment to use. Can be one of the following:
- A path to an archived Python environment
- A path to a conda environment, specified as conda:///…
- A path to a virtual environment, specified as venv:///…
- A path to a python executable, specified as python:///…
Note that if not an archive, the paths specified must be valid on all nodes in the cluster.
- n_workers : int, optional
The number of workers to initially start.
- worker_vcores : int, optional
The number of virtual cores to allocate per worker.
- worker_memory : str, optional
The amount of memory to allocate per worker. Accepts a unit suffix (e.g. ‘2 GiB’ or ‘4096 MiB’). Will be rounded up to the nearest MiB.
- worker_restarts : int, optional
The maximum number of worker restarts to allow before failing the application. Default is unlimited.
- worker_env : dict, optional
A mapping of environment variables to their values. These will be set in the worker containers before starting the dask workers.
- worker_gpus : int, optional
The number of gpus to allocate per worker.
- scheduler_vcores : int, optional
The number of virtual cores to allocate per scheduler.
- scheduler_gpus : int, optional
The number of gpus to allocate per scheduler.
- scheduler_memory : str, optional
The amount of memory to allocate to the scheduler. Accepts a unit suffix (e.g. ‘2 GiB’ or ‘4096 MiB’). Will be rounded up to the nearest MiB.
- deploy_mode : {‘remote’, ‘local’}, optional
The deploy mode to use. If 'remote', the scheduler will be deployed in a YARN container. If 'local', the scheduler will run locally, which can be nice for debugging. Default is 'remote'.
- name : str, optional
The application name.
- queue : str, optional
The queue to deploy to.
- tags : sequence, optional
A set of strings to use as tags for this application.
- user : str, optional
The user to submit the application on behalf of. Default is the current user - submitting as a different user requires user permissions, see the YARN documentation for more information.
- host : str, optional
Host address on which the scheduler will listen. Only used if deploy_mode='local'. Defaults to '0.0.0.0'.
- port : int, optional
The port on which the scheduler will listen. Only used if deploy_mode='local'. Defaults to 0 for a random port.
- dashboard_address : str
Address on which the dashboard server will listen. Only used if deploy_mode='local'. Defaults to ':0' for a random port.
- skein_client : skein.Client, optional
The skein.Client to use. If not provided, one will be started.
If true, starts the cluster in asynchronous mode, where it can be used in other async code.
- loop : IOLoop, optional
The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.
Examples
>>> cluster = YarnCluster(environment='my-env.tar.gz', ...)
>>> cluster.scale(10)
adapt(minimum=0, maximum=inf, interval='1s', wait_count=3, target_duration='5s', **kwargs)¶

Turn on adaptivity
This scales Dask clusters automatically based on scheduler activity.
Parameters:

- minimum : int, optional
Minimum number of workers. Defaults to 0.
- maximum : int, optional
Maximum number of workers. Defaults to inf.
.- interval : timedelta or str, optional
Time between worker add/remove recommendations.
- wait_count : int, optional
Number of consecutive times that a worker should be suggested for removal before we remove it.
- target_duration : timedelta or str, optional
Amount of time we want a computation to take. This affects how aggressively we scale up.
- **kwargs :
Additional parameters to pass to distributed.Scheduler.workers_to_close.
Examples
>>> cluster.adapt(minimum=0, maximum=10)
dashboard_link¶

Link to the dask dashboard. None if dashboard isn’t running

classmethod from_application_id(app_id, skein_client=None, asynchronous=False, loop=None)¶

Connect to an existing YarnCluster with a given application id.

Parameters:

- app_id : str
The existing cluster’s application id.
- skein_client : skein.Client
The skein.Client to use. If not provided, one will be started.
- asynchronous : bool, optional
If true, starts the cluster in asynchronous mode, where it can be used in other async code.
- loop : IOLoop, optional
The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.
Returns:

- YarnCluster
classmethod from_current(asynchronous=False, loop=None)¶

Connect to an existing YarnCluster from inside the cluster.

Parameters:

- asynchronous : bool, optional
If true, starts the cluster in asynchronous mode, where it can be used in other async code.
- loop : IOLoop, optional
The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.
Returns:

- YarnCluster
classmethod from_specification(spec, skein_client=None, asynchronous=False, loop=None)¶

Start a dask cluster from a skein specification.

Parameters:

- spec : skein.ApplicationSpec, dict, or filename
The application specification to use. Must define at least one service: 'dask.worker'. If no 'dask.scheduler' service is defined, a scheduler will be started locally.
- skein_client : skein.Client, optional
The skein.Client to use. If not provided, one will be started.
- asynchronous : bool, optional
If true, starts the cluster in asynchronous mode, where it can be used in other async code.
- loop : IOLoop, optional
The IOLoop instance to use. Defaults to the current loop in asynchronous mode, otherwise a background loop is started.
logs(scheduler=True, workers=True)¶

Return logs for the scheduler and/or workers

Parameters:

- scheduler : boolean, optional
Whether or not to collect logs for the scheduler
- workers : boolean or iterable, optional
A list of worker addresses to select. Defaults to all workers if True or no workers if False

Returns:

- logs : dict
A dictionary of name -> logs.
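A minimal usage sketch; the dictionary keys are the container names reported by the cluster:
>>> for name, log in cluster.logs().items():
...     print(name)
...     print(log)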
scale(n)¶

Scale cluster to n workers.

Parameters:

- n : int
Target number of workers
Examples
>>> cluster.scale(10) # scale cluster to ten workers
shutdown(status='SUCCEEDED', diagnostics=None)¶

Shutdown the application.

Parameters:

- status : {‘SUCCEEDED’, ‘FAILED’, ‘KILLED’}, optional
The yarn application exit status.
- diagnostics : str, optional
The application exit message, usually used for diagnosing failures. Can be seen in the YARN Web UI for completed applications under “diagnostics”. If not provided, a default will be used.
workers()¶

A list of all currently running worker containers.
CLI Docs¶
Warning
The CLI is experimental and may change between versions
dask-yarn¶
Deploy Dask on Apache YARN
usage: dask-yarn [--help] [--version] command ...
--help, -h¶
Show this help message then exit

--version¶
Show version then exit
dask-yarn kill¶
Kill a Dask application
usage: dask-yarn kill [--help] APP_ID
app_id¶
The application id

--help, -h¶
Show this help message then exit
dask-yarn services¶
Manage Dask services
usage: dask-yarn services [--help] command ...
--help, -h¶
Show this help message then exit
dask-yarn services client¶
Start a Dask client process
usage: dask-yarn services client [--help] script [args...]
script¶
Path to a Python script to run.

args¶
Any additional arguments to forward to script

--help, -h¶
Show this help message then exit
dask-yarn services scheduler¶
Start a Dask scheduler process
usage: dask-yarn services scheduler [--help]
--help, -h¶
Show this help message then exit
dask-yarn services worker¶
Start a Dask worker process
usage: dask-yarn services worker [--nthreads NTHREADS]
                                 [--memory_limit MEMORY_LIMIT] [--help]
--nthreads <nthreads>¶
Number of threads. Defaults to number of vcores in container

--memory_limit <memory_limit>¶
Maximum memory available to the worker. This can be an integer (in bytes), a string (like ‘5 GiB’ or ‘500 MiB’), or 0 (no memory management). Defaults to the container memory limit.

--help, -h¶
Show this help message then exit
dask-yarn status¶
Check the status of a submitted Dask application
usage: dask-yarn status [--help] APP_ID
app_id¶
The application id

--help, -h¶
Show this help message then exit
dask-yarn submit¶
Submit a Dask application to a YARN cluster
usage: dask-yarn submit [--name NAME] [--queue QUEUE] [--user USER]
                        [--tags TAGS] [--environment ENVIRONMENT]
                        [--deploy-mode DEPLOY_MODE]
                        [--worker-count WORKER_COUNT]
                        [--worker-vcores WORKER_VCORES]
                        [--worker-memory WORKER_MEMORY]
                        [--worker-restarts WORKER_RESTARTS]
                        [--worker-env WORKER_ENV]
                        [--client-vcores CLIENT_VCORES]
                        [--client-memory CLIENT_MEMORY]
                        [--client-env CLIENT_ENV]
                        [--scheduler-vcores SCHEDULER_VCORES]
                        [--scheduler-memory SCHEDULER_MEMORY]
                        [--temporary-security-credentials] [--help]
                        script [args...]
script¶
Path to a python script to run on the client

args¶
Any additional arguments to forward to script

--name <name>¶
The application name

--queue <queue>¶
The queue to deploy to
--user <user>¶
The user to submit the application on behalf of. Default is the current user - submitting as a different user requires proxy-user permissions.

--tags <tags>¶
A comma-separated list of strings to use as tags for this application.
--environment <environment>¶
Path to the Python environment to use. See the docs for more information

--deploy-mode <deploy_mode>¶
Either ‘remote’ (default) or ‘local’. If ‘remote’, the scheduler and client will be deployed in a YARN container. If ‘local’, they will be run locally.

--worker-count <worker_count>¶
The number of workers to initially start.

--worker-vcores <worker_vcores>¶
The number of virtual cores to allocate per worker.

--worker-memory <worker_memory>¶
The amount of memory to allocate per worker. Accepts a unit suffix (e.g. ‘2 GiB’ or ‘4096 MiB’). Will be rounded up to the nearest MiB.

--worker-restarts <worker_restarts>¶
The maximum number of worker restarts to allow before failing the application. Default is unlimited.

--worker-env <worker_env>¶
Environment variables to set on the workers. Pass a key-value pair like --worker-env key=val. May be used more than once.

--client-vcores <client_vcores>¶
The number of virtual cores to allocate for the client.

--client-memory <client_memory>¶
The amount of memory to allocate for the client. Accepts a unit suffix (e.g. ‘2 GiB’ or ‘4096 MiB’). Will be rounded up to the nearest MiB.

--client-env <client_env>¶
Environment variables to set on the client. Pass a key-value pair like --client-env key=val. May be used more than once.

--scheduler-vcores <scheduler_vcores>¶
The number of virtual cores to allocate for the scheduler.

--scheduler-memory <scheduler_memory>¶
The amount of memory to allocate for the scheduler. Accepts a unit suffix (e.g. ‘2 GiB’ or ‘4096 MiB’). Will be rounded up to the nearest MiB.

--temporary-security-credentials¶
Instead of using a consistent set of TLS credentials for all clusters, create a fresh set just for this application.

--help, -h¶
Show this help message then exit