Scheduling State¶
Overview¶
The life of a computation with Dask can be described in the following stages:
- The user authors a graph using some library, perhaps dask.delayed or dask.dataframe or the submit/map functions on the client. They submit these tasks to the scheduler.
- The scheduler assimilates these tasks into its graph of all tasks to track, and as their dependencies become available it asks workers to run each of these tasks.
- The worker receives information about how to run the task, communicates with its peer workers to collect dependencies, and then runs the relevant function on the appropriate data. It reports back to the scheduler that it has finished.
- The scheduler reports back to the user that the task has completed. If the user desires, it then fetches the data from the worker through the scheduler.
Most relevant logic is in tracking tasks as they evolve from newly submitted, to waiting for dependencies, to actively running on some worker, to finished in memory, to garbage collected. Tracking this process, and tracking all effects that this task has on other tasks that might depend on it, is the majority of the complexity of the dynamic task scheduler. This section describes the system used to perform this tracking.
For more abstract information about the policies used by the scheduler, see Scheduling Policies.
State Variables¶
We start with a description of the state that the scheduler keeps on each task. Each of the following is a dictionary keyed by task name (described below).
Task description variables¶
These containers keep task information across a task’s whole lifetime.
tasks:
{key: task}
:Dictionary mapping key to a serialized task.
A key is the name of a task, generally formed from the name of the function, followed by a hash of the function and arguments, like 'inc-ab31c010444977004d656610d2d421ec'.
The value of this dictionary is the task, which is an unevaluated function and arguments. This is stored in one of two forms:
- {'function': inc, 'args': (1,), 'kwargs': {}}: a dictionary with the function, arguments, and keyword arguments (kwargs). However in the scheduler these are stored serialized, as they were sent from the client, so it looks more like {'function': b'\x80\x04\x95\xcb\...', 'args': b'...', }
- (inc, 1): a tuple satisfying the dask graph protocol. This again is stored serialized.
These are the values that will eventually be sent to a worker when the task is ready to run.
dependencies and dependents:
{key: {keys}}
:These are dictionaries which show which tasks depend on which others. They contain redundant information. If dependencies[a] == {b, c} then the task with the name of a depends on the results of the two tasks with the names of b and c. There will be complementary entries in dependents such that a in dependents[b] and a in dependents[c], for example dependents[b] == {a, d}. Keeping the information around twice allows for constant-time access for either direction of query, so we can look up both a task’s out-edges and its in-edges efficiently.
priority:
{key: tuple}
:The priority dictionary provides each key with a relative ranking which is used to break ties when many keys are being considered for execution.
This ranking is generally a tuple of two parts. The first (and dominant) part corresponds to when it was submitted. Generally earlier tasks take precedence. The second part is determined by the client, and is a way to prioritize tasks within a large graph that may be important, such as if they are on the critical path, or good to run in order to release many dependencies. This is explained further in Scheduling Policies.
host_restrictions:
{key: {hostnames}}
:A set of hostnames per key of where that key can be run. Usually this is empty unless a key has been specifically restricted to only run on certain hosts. A hostname may correspond to one or several connected workers.
worker_restrictions:
{key: {worker addresses}}
:A set of complete worker addresses per key of where that key can be run. Usually this is empty unless a key has been specifically restricted to only run on certain workers.
resource_restrictions:
{key: {resource: quantity}}
:Resources required by a task, such as {'gpu': 1} or {'memory': 1e9}. These are user-defined names and are matched against the contents of the worker_resources dictionary.
loose_restrictions:
{key}
:Set of keys for which we are allowed to violate restrictions (see above) if no valid workers are present and the task would otherwise go into the unrunnable set. In other words, if a key is in loose_restrictions, then its restrictions become mere preferences; otherwise they are mandatory.
who_wants:
{key: {client}}
:When a client submits a graph to the scheduler it also specifies which output keys it desires. Those keys are tracked here where each desired key knows which clients want it. These keys will not be released from memory and, when they complete, messages will be sent to all of these clients that the task is ready.
wants_what:
{client: {key}}
:The transpose of who_wants.
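The interplay between the three restriction dictionaries and loose_restrictions can be sketched with plain dictionaries and sets. This is only an illustration: candidate_workers is a hypothetical helper, worker addresses are assumed to look like 'host:port', and the sketch assumes that every restriction present for a key must be satisfied, which may differ from how the scheduler actually combines them.

```python
def candidate_workers(key, workers, host_restrictions, worker_restrictions,
                      resource_restrictions, worker_resources,
                      loose_restrictions):
    """Hypothetical sketch: the worker addresses on which `key` may run."""
    allowed = set(workers)
    # Restrict to explicitly named worker addresses, if any
    if key in worker_restrictions:
        allowed &= worker_restrictions[key]
    # Restrict to workers living on one of the named hosts, if any
    if key in host_restrictions:
        allowed = {w for w in allowed
                   if w.split(':')[0] in host_restrictions[key]}
    # Keep only workers that advertise enough of every required resource
    for resource, quantity in resource_restrictions.get(key, {}).items():
        allowed = {w for w in allowed
                   if worker_resources.get(w, {}).get(resource, 0) >= quantity}
    # Loose restrictions are mere preferences: fall back to all workers
    if not allowed and key in loose_restrictions:
        return set(workers)
    return allowed


workers = {'alice:8000', 'bob:8000'}
host_restrictions = {'x': {'alice'}}
worker_restrictions = {}
resource_restrictions = {'y': {'gpu': 1}, 'z': {'gpu': 4}, 'w': {'gpu': 4}}
worker_resources = {'bob:8000': {'gpu': 1}}
loose_restrictions = {'w'}


def where(key):
    return candidate_workers(key, workers, host_restrictions,
                             worker_restrictions, resource_restrictions,
                             worker_resources, loose_restrictions)
```

In this toy setup, where('z') returns an empty set, so 'z' would be unrunnable, while 'w' has the same unmet requirement but is in loose_restrictions and may therefore run anywhere.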
Task state flow¶
These state variables reflect the current status of a task and may get updated at each state transition.
task_state:
{key: string}
:The task_state dictionary holds the current state of every key. Current valid states include released, waiting, no-worker, processing, memory, and erred. These states are explained further below.
waiting and waiting_data:
{key: {keys}}
:These dictionaries are a subset of dependencies and dependents respectively, as they only track keys that are still in play.
For example waiting looks like dependencies, tracking all of the tasks that a certain task requires before it can run. However, as tasks are completed and arrive in memory they are removed from their dependents’ sets in waiting, so that when a set becomes empty we know that a key is ready to run and ready to be allocated to a worker.
Similarly, the waiting_data dictionary holds all of the dependents of a key that have yet to run and still require that this task stay in memory in service of tasks that may depend on it (its dependents). When a value set in this dictionary becomes empty its task may be garbage-collected (unless some client actively desires that this task stay in memory, as tracked in who_wants).
processing:
{worker: {key: cost}}
:Keys that are currently allocated to a worker. This is keyed by worker address and contains the expected cost in seconds of running each task, summing both the task’s expected computation time and the expected communication time of its result.
Multiple tasks may be submitted to a worker in advance and the worker will run them eventually, depending on its execution resources (but see Work Stealing).
rprocessing:
{key: worker}
:The reverse of the processing dictionary. This tracks the worker processing each task that is currently running. This is redundant with processing and just here for faster indexed querying.
who_has:
{key: {worker}}
:For keys that are in memory this shows on which workers they currently reside.
has_what:
{worker: {key}}
:This is the transpose of who_has, showing all keys that currently reside on each worker.
released:
{keys}
:The set of keys that are neither waiting to be processed, nor in memory. These typically are just-initialized tasks, or tasks that have already been computed but which it is not necessary to keep in memory.
unrunnable:
{key}
:The set of keys that are not currently able to run, either because they have a user-defined restriction (described in host_restrictions, worker_restrictions and resource_restrictions) that is not met by any connected worker, or because no worker is connected at all.
These keys already have all their dependencies satisfied (their waiting set is empty), and are waiting for an appropriate worker to join the network before computing.
exceptions and tracebacks:
{key: Exception/Traceback}
:Dictionaries mapping keys to remote exceptions and tracebacks. When tasks fail we store their exceptions and tracebacks (serialized from the worker) here so that users may gather the exceptions to see the error.
exceptions_blame:
{key: key}
:If a task fails then we mark all of its dependent tasks as failed as well. This dictionary lets any failed task see which task was the origin of its failure.
suspicious_tasks:
{key: int}
:Number of times a task has been involved in a worker failure. Some tasks may cause workers to fail (such as sys.exit(0)). When a worker fails all of the tasks on that worker are reassigned to others. This combination of behaviors can cause a bad task to catastrophically destroy all workers on the cluster, one after another. Whenever a worker fails we mark each task currently running on that worker as suspicious. If a task is involved in three failures (or some other fixed constant) then we mark the task as erred.
retries:
{key: int}
:Number of times a task is automatically retried in case of failure. If present, this is decremented each time a task’s execution fails, until it reaches 0.
nbytes:
{key: int}
:The number of bytes, as determined by sizeof, of the result of each finished task. This number is used for diagnostics and to help prioritize work.
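The suspicious_tasks counter described above can be illustrated with a small sketch. The worker_failed helper and the ALLOWED_FAILURES constant are hypothetical names introduced here; the threshold of three follows the text, and sending surviving tasks back to 'released' is an assumption made to keep the example short.

```python
ALLOWED_FAILURES = 3  # "three failures (or some other fixed constant)"


def worker_failed(worker, processing, suspicious_tasks, task_state):
    """Hypothetical sketch: account for every task that was on a dead worker."""
    erred = set()
    for key in processing.pop(worker, {}):
        # Every task on the failed worker becomes a little more suspicious
        suspicious_tasks[key] = suspicious_tasks.get(key, 0) + 1
        if suspicious_tasks[key] >= ALLOWED_FAILURES:
            task_state[key] = 'erred'      # stop it from killing more workers
            erred.add(key)
        else:
            task_state[key] = 'released'   # assumed: eligible to run elsewhere
    return erred


# A task that kills its worker is tried on three workers in a row
processing = {'w1': {'bad': 1.0, 'ok': 0.5}}
suspicious_tasks = {}
task_state = {'bad': 'processing', 'ok': 'processing'}

first = worker_failed('w1', processing, suspicious_tasks, task_state)
processing['w2'] = {'bad': 1.0}
second = worker_failed('w2', processing, suspicious_tasks, task_state)
processing['w3'] = {'bad': 1.0}
third = worker_failed('w3', processing, suspicious_tasks, task_state)
```

The innocent task 'ok' is marked suspicious once because it shared a worker with 'bad', but only 'bad' reaches the threshold.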
Worker state variables¶
These state variables track the current state of each worker, and are involved in deciding which worker to run a task on.
ncores:
{worker: int}
The number of CPU cores made available on each worker.
worker_resources:
{worker: {str: Number}}
:The available resources on each worker like {'gpu': 2, 'mem': 1e9}. These are abstract quantities that constrain certain tasks from running at the same time on a given worker.
used_resources:
{worker: {str: Number}}
:The sum of each resource used by all tasks allocated to a particular worker. The numbers in this dictionary can only be less than or equal to those in worker_resources.
worker_bytes:
{worker: int}
:The total memory size, in bytes, used by the keys currently held in memory on each given worker.
occupancy:
{worker: duration}
:The total expected runtime, in seconds, of all tasks currently processing on a worker.
idle and saturated:
{worker}
:Two sets of workers indicating their ability to start computing a new task in a relatively short timespan. “Idle” workers will be preferred when deciding a suitable worker to run a new task on. Conversely, “saturated” workers may see their workload lightened through Work Stealing.
These two sets are computed based on each worker’s number of cores (ncores), task queue (processing) and occupancy numbers.
These two sets are disjoint. Also, some workers may be neither “idle” nor “saturated”.
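As a rough illustration of how occupancy, idle and saturated relate to ncores and processing, consider the sketch below. The classify function and its max_seconds_per_core threshold are invented for this example; the text does not give the scheduler's actual criteria, so treat the rules here as placeholders.

```python
def classify(ncores, processing, max_seconds_per_core=2.0):
    """Hypothetical sketch: derive occupancy and the idle/saturated sets."""
    # Occupancy is the total expected cost of everything queued on a worker
    occupancy = {w: sum(costs.values()) for w, costs in processing.items()}
    idle, saturated = set(), set()
    for w, costs in processing.items():
        if len(costs) < ncores[w]:
            # Fewer queued tasks than cores: a new task could start right away
            idle.add(w)
        elif (len(costs) > ncores[w]
              and occupancy[w] / ncores[w] > max_seconds_per_core):
            # More tasks than cores and a long backlog (assumed threshold)
            saturated.add(w)
    return occupancy, idle, saturated


ncores = {'alice:8000': 2, 'bob:8000': 2, 'carol:8000': 2}
processing = {
    'alice:8000': {'x': 0.5},
    'bob:8000': {'p': 1.0, 'q': 1.0},
    'carol:8000': {k: 1.0 for k in 'rstuv'},
}
occupancy, idle, saturated = classify(ncores, processing)
```

Because the second test is an elif, a worker can never land in both sets, and a worker such as 'bob:8000' that is exactly busy ends up in neither.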
Example Event and Response¶
Whenever an event happens, like when a client sends up more tasks, or when a worker finishes a task, the scheduler changes the state above. For example when a worker reports that a task has finished we perform actions like the following:
Task `key` finished by `worker`, with a result of `size` bytes:

    # Update task state
    task_state[key] = 'memory'
    del processing[worker][key]
    del rprocessing[key]
    who_has[key].add(worker)
    has_what[worker].add(key)

    # Update memory / resource / occupancy counters
    nbytes[key] = size
    worker_bytes[worker] += size
    for resource, quantity in resource_restrictions.get(key, {}).items():
        used_resources[worker][resource] -= quantity
    update_worker_occupancy_and_idleness(worker)

    # Notify clients interested in this task's result
    if key in who_wants:
        send_done_message_to_clients(who_wants[key])

    # Transitively update dependent tasks
    for dep in dependencies[key]:
        waiting_data[dep].remove(key)

    for dep in dependents[key]:
        waiting[dep].remove(key)

    for task in ready_tasks():
        worker = best_worker(task)
        send_task_to_worker(task, worker)
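The response above refers to helpers that are not defined in this section. A self-contained toy version, operating on a plain dictionary of state variables, might look like the following. The task_finished function, the layout of the state dictionary s, and the decision to return the ready and releasable keys instead of messaging workers and clients are all assumptions made for illustration.

```python
def task_finished(s, key, worker, size):
    """Toy response to "task `key` finished on `worker` with `size` bytes"."""
    # Update task state and location
    s['task_state'][key] = 'memory'
    del s['processing'][worker][key]
    del s['rprocessing'][key]
    s['who_has'].setdefault(key, set()).add(worker)
    s['has_what'].setdefault(worker, set()).add(key)

    # Update memory and resource counters
    s['nbytes'][key] = size
    s['worker_bytes'][worker] = s['worker_bytes'].get(worker, 0) + size
    for resource, quantity in s['resource_restrictions'].get(key, {}).items():
        s['used_resources'][worker][resource] -= quantity

    # Dependencies no longer need to stay in memory on behalf of this key
    releasable = set()
    for dep in s['dependencies'][key]:
        s['waiting_data'][dep].discard(key)
        if not s['waiting_data'][dep] and dep not in s['who_wants']:
            releasable.add(dep)

    # Dependents whose waiting set is now empty are ready to run
    ready = set()
    for dep in s['dependents'][key]:
        if dep in s['waiting']:
            s['waiting'][dep].discard(key)
            if not s['waiting'][dep]:
                ready.add(dep)
    return ready, releasable


# Chain a -> b -> c: 'a' is in memory, 'b' is processing, 'c' waits on 'b'
s = {
    'dependencies': {'a': set(), 'b': {'a'}, 'c': {'b'}},
    'dependents': {'a': {'b'}, 'b': {'c'}, 'c': set()},
    'task_state': {'a': 'memory', 'b': 'processing', 'c': 'waiting'},
    'waiting': {'c': {'b'}},
    'waiting_data': {'a': {'b'}, 'b': {'c'}, 'c': set()},
    'processing': {'alice:8000': {'b': 0.5}},
    'rprocessing': {'b': 'alice:8000'},
    'who_has': {'a': {'alice:8000'}},
    'has_what': {'alice:8000': {'a'}},
    'nbytes': {'a': 28},
    'worker_bytes': {'alice:8000': 28},
    'resource_restrictions': {'b': {'gpu': 1}},
    'used_resources': {'alice:8000': {'gpu': 1}},
    'who_wants': {'c': {'client-1'}},
}
ready, releasable = task_finished(s, 'b', 'alice:8000', 100)
```

After the call, 'c' is ready to be assigned to a worker and 'a' may be garbage-collected, since no remaining task and no client needs it.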
State Transitions¶
The code presented in the section above is just for demonstration. In practice writing this code for every possible event is highly error prone, resulting in hard-to-track-down bugs. Instead the scheduler moves tasks between a fixed set of states, notably released, waiting, no-worker, processing, memory, erred.
Tasks fall into the following states with the following allowed transitions:
- Released: Known but not actively computing or in memory
- Waiting: On track to be computed, waiting on dependencies to arrive in memory
- No-worker (ready, rare): Ready to be computed, but no appropriate worker exists
- Processing: Actively being computed by one or more workers
- Memory: In memory on one or more workers
- Erred: Task computation, or one of its dependencies, has encountered an error
- Forgotten (not actually a state): Task is no longer needed by any client and so is removed from state
Tasks and task states¶
The table below shows which state variable a task is in, depending on the task’s state. Cells with a check mark (✓) indicate the task key must be present in the given state variable; cells with a question mark (?) indicate the task key may be present in the given state variable.
State variable | Released | Waiting | No-worker | Processing | Memory | Erred |
---|---|---|---|---|---|---|
tasks | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
priority | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
dependencies | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
dependents | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
host_restrictions | ? | ? | ? | ? | ? | ? |
worker_restrictions | ? | ? | ? | ? | ? | ? |
resource_restrictions | ? | ? | ? | ? | ? | ? |
loose_restrictions | ? | ? | ? | ? | ? | ? |
released | ✓ | | | | | |
waiting | | ✓ | | | | |
waiting_data | | ✓ | | | | |
unrunnable | | | ✓ | | | |
processing | | | | ✓ | | |
rprocessing | | | | ✓ | | |
who_has | | | | | ✓ | |
has_what | | | | | ✓ | |
nbytes (1) | ? | ? | ? | ? | ✓ | ? |
exceptions | | | | | | ✓ |
tracebacks | | | | | | ✓ |
exceptions_blame | | | | | | ✓ |
retries | ? | ? | ? | ? | ? | ? |
suspicious_tasks | ? | ? | ? | ? | ? | ? |
Notes:
- nbytes: a task can be in this collection as long as it was already computed, even if not currently held in a worker’s memory.
Transitions and worker state¶
The table below shows which worker state variables are updated on each task state transition.
Transition | Affected worker state |
---|---|
released → waiting | occupancy, idle, saturated |
waiting → processing | occupancy, idle, saturated, used_resources |
waiting → memory | idle, saturated, worker_bytes |
processing → memory | occupancy, idle, saturated, used_resources, worker_bytes |
processing → erred | occupancy, idle, saturated, used_resources |
processing → released | occupancy, idle, saturated, used_resources |
memory → released | worker_bytes |
memory → forgotten | worker_bytes |
Note
Another way of understanding this table is to observe that entering or exiting a specific task state updates a well-defined set of worker state variables. For example, entering and exiting the Memory state updates worker_bytes.
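The worker_bytes example in the note can be made concrete with a small sketch. The enter_memory, exit_memory and consistent helpers are hypothetical; they show worker_bytes being adjusted on both sides of the Memory state so that it always equals the summed nbytes of the keys a worker holds.

```python
def enter_memory(key, worker, size, nbytes, has_what, worker_bytes):
    """Hypothetical sketch: a result of `size` bytes lands on `worker`."""
    nbytes[key] = size
    has_what.setdefault(worker, set()).add(key)
    worker_bytes[worker] = worker_bytes.get(worker, 0) + size


def exit_memory(key, worker, nbytes, has_what, worker_bytes):
    """Hypothetical sketch: `key` is released from `worker`'s memory."""
    has_what[worker].remove(key)
    worker_bytes[worker] -= nbytes[key]
    # nbytes[key] is kept: the size of a computed task stays known


def consistent(nbytes, has_what, worker_bytes):
    """worker_bytes must equal the total size of the keys held in memory."""
    return all(worker_bytes[w] == sum(nbytes[k] for k in keys)
               for w, keys in has_what.items())


nbytes, has_what, worker_bytes = {}, {}, {}
enter_memory('x', 'alice:8000', 100, nbytes, has_what, worker_bytes)
enter_memory('y', 'alice:8000', 50, nbytes, has_what, worker_bytes)
after_enter = dict(worker_bytes)
exit_memory('x', 'alice:8000', nbytes, has_what, worker_bytes)
```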
Implementation¶
Every transition between states is a separate method in the scheduler. These task transition functions are prefixed with transition and then have the name of the start and finish task state like the following.
    def transition_released_waiting(self, key):
    def transition_processing_memory(self, key):
    def transition_processing_erred(self, key):
These functions each have three effects.
- They perform the necessary transformations on the scheduler state (the 20 dicts/lists/sets) to move one key between states.
- They return a dictionary of recommended {key: state} transitions to enact directly afterwards on other keys. For example after we transition a key into memory we may find that many waiting keys are now ready to transition from waiting to a ready state.
- Optionally they include a set of validation checks that can be turned on for testing.
Rather than call these functions directly we call the central function transition:
    def transition(self, key, final_state):
        """ Transition key to the suggested state """
This transition function finds the appropriate path from the current to the final state. It also serves as a central point for logging and diagnostics.
Often we want to enact several transitions at once or want to continually respond to new transitions recommended by initial transitions until we reach a steady state. For that we use the transitions function (note the plural s).
    def transitions(self, recommendations):
        recommendations = recommendations.copy()
        while recommendations:
            key, finish = recommendations.popitem()
            new = self.transition(key, finish)
            recommendations.update(new)
This function runs transition, takes the recommendations and runs them as well, repeating until no further task-transitions are recommended.
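To see the recommendation loop reach a steady state, here is a toy scheduler that reuses the transitions function shown above. Everything else in it is a simplification invented for this example: ToyScheduler is not the real class, its transition method only knows one rule (a task whose dependencies are all in memory is recommended for processing), and it skips the waiting state entirely.

```python
class ToyScheduler:
    """Hypothetical stand-in used only to exercise the transitions loop."""

    def __init__(self, dependencies):
        self.dependencies = dependencies
        self.dependents = {key: set() for key in dependencies}
        for key, deps in dependencies.items():
            for dep in deps:
                self.dependents[dep].add(key)
        self.task_state = {key: 'released' for key in dependencies}
        self.log = []  # central point for logging, as described above

    def transition(self, key, finish):
        start = self.task_state[key]
        self.task_state[key] = finish
        self.log.append((key, start, finish))
        recommendations = {}
        if finish == 'memory':
            # Dependents with every dependency in memory can start running
            for dep in self.dependents[key]:
                if all(self.task_state[d] == 'memory'
                       for d in self.dependencies[dep]):
                    recommendations[dep] = 'processing'
        return recommendations

    def transitions(self, recommendations):
        recommendations = recommendations.copy()
        while recommendations:
            key, finish = recommendations.popitem()
            new = self.transition(key, finish)
            recommendations.update(new)


s = ToyScheduler({'a': set(), 'b': {'a'}, 'c': {'a'}, 'd': {'b', 'c'}})
s.transitions({'a': 'memory'})
after_a = dict(s.task_state)
s.transitions({'b': 'memory', 'c': 'memory'})
```

A single recommendation for 'a' cascades to 'b' and 'c'; 'd' is only recommended once both of its dependencies have reached memory.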
Stimuli¶
Transitions occur from stimuli, which are state-changing messages to the scheduler from workers or clients. The scheduler responds to the following stimuli:
- Workers
    - Task finished: A task has completed on a worker and is now in memory
    - Task erred: A task ran and erred on a worker
    - Task missing data: A task tried to run but was unable to find necessary data on other workers
    - Worker added: A new worker was added to the network
    - Worker removed: An existing worker left the network
- Clients
    - Update graph: The client sends more tasks to the scheduler
    - Release keys: The client no longer desires the result of certain keys
Stimuli functions are prefixed with the text stimulus, and take a variety of keyword arguments from the message as in the following examples:
    def stimulus_task_finished(self, key=None, worker=None, nbytes=None,
                               type=None, compute_start=None, compute_stop=None,
                               transfer_start=None, transfer_stop=None):
    def stimulus_task_erred(self, key=None, worker=None,
                            exception=None, traceback=None):
These functions change some non-essential administrative state and then call transition functions.
Note that there are several other non-state-changing messages that we receive from the workers and clients, such as messages requesting information about the current state of the scheduler. These are not considered stimuli.
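One way to picture the path from an incoming message to a stimulus function is a small dispatch table. The sketch below is an assumption-laden illustration: the message layout (a dictionary with an 'op' field), the handlers table, handle_message, and the simplified transitions stand-in are all hypothetical and are not taken from the scheduler's source.

```python
class ToyScheduler:
    """Hypothetical sketch: route state-changing messages to stimulus_* methods."""

    def __init__(self):
        self.task_state = {}
        self.exceptions = {}
        # Assumed message names; only state-changing messages are listed
        self.handlers = {
            'task-finished': self.stimulus_task_finished,
            'task-erred': self.stimulus_task_erred,
        }

    def transitions(self, recommendations):
        # Stand-in for the real transition machinery
        self.task_state.update(recommendations)

    def stimulus_task_finished(self, key=None, worker=None, nbytes=None,
                               **kwargs):
        self.transitions({key: 'memory'})

    def stimulus_task_erred(self, key=None, worker=None, exception=None,
                            traceback=None, **kwargs):
        # Administrative state first, then the transition
        self.exceptions[key] = exception
        self.transitions({key: 'erred'})

    def handle_message(self, msg):
        msg = dict(msg)
        handler = self.handlers.get(msg.pop('op'))
        if handler is None:
            return False  # not a stimulus, e.g. a request for information
        handler(**msg)
        return True


s = ToyScheduler()
finished = s.handle_message({'op': 'task-finished', 'key': 'x',
                             'worker': 'alice:8000', 'nbytes': 28})
erred = s.handle_message({'op': 'task-erred', 'key': 'y',
                          'worker': 'bob:8000',
                          'exception': 'ZeroDivisionError'})
ignored = s.handle_message({'op': 'identity'})
```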
API¶
- class distributed.scheduler.Scheduler(center=None, loop=None, delete_interval=500, synchronize_worker_interval=60000, services=None, allowed_failures=3, extensions=None, validate=False, scheduler_file=None, security=None, **kwargs)
Dynamic distributed task scheduler
The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever growing dask graph.
All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.
The scheduler communicates with the outside world through Comm objects. It maintains a consistent and valid view of the world even when listening to several clients at once.
A Scheduler is typically started either with the dask-scheduler executable:
    $ dask-scheduler
    Scheduler started at 127.0.0.1:8786
Or within a LocalCluster that a Client starts up when given no connection information:
    >>> c = Client()
    >>> c.cluster.scheduler
    Scheduler(...)
Users typically do not interact with the scheduler directly but rather with the client object Client.
State
The scheduler contains the following state variables. Each variable is listed along with what it stores and a brief description.
- tasks: {key: task}
  Dictionary mapping key to a serialized task like the following: {'function': b'...', 'args': b'...'} or {'task': b'...'}
- dependencies: {key: {keys}}
  Dictionary showing which keys depend on which others
- dependents: {key: {keys}}
  Dictionary showing which keys are dependent on which others
- task_state: {key: string}
  Dictionary listing the current state of every task among the following: released, waiting, queue, no-worker, processing, memory, erred
- priority: {key: tuple}
  A score per key that determines its priority
- waiting: {key: {key}}
  Dictionary like dependencies but excludes keys already computed
- waiting_data: {key: {key}}
  Dictionary like dependents but excludes keys already computed
- ready: deque(key)
  Keys that are ready to run, but not yet assigned to a worker
- processing: {worker: {key: cost}}
  Set of keys currently in execution on each worker and their expected duration
- rprocessing: {key: worker}
  The worker currently executing a particular task
- who_has: {key: {worker}}
  Where each key lives. The current state of distributed memory.
- has_what: {worker: {key}}
  What worker has what keys. The transpose of who_has.
- released: {keys}
  Set of keys that are known, but released from memory
- unrunnable: {key}
  Keys that we are unable to run
- host_restrictions: {key: {hostnames}}
  A set of hostnames per key of where that key can be run. Usually this is empty unless a key has been specifically restricted to only run on certain hosts.
- worker_restrictions: {key: {workers}}
  Like host_restrictions except that these include specific host:port worker names
- loose_restrictions: {key}
  Set of keys for which we are allowed to violate restrictions (see above) if no valid workers are present.
- resource_restrictions: {key: {str: Number}}
  Resources required by a task, such as {'GPU': 1} or {'memory': 1e9}. These names must match resources specified when creating workers.
- worker_resources: {worker: {str: Number}}
  The available resources on each worker like {'gpu': 2, 'mem': 1e9}. These are abstract quantities that constrain certain tasks from running at the same time.
- used_resources: {worker: {str: Number}}
  The sum of each resource used by all tasks allocated to a particular worker.
- exceptions: {key: Exception}
  A dict mapping keys to remote exceptions
- tracebacks: {key: list}
  A dict mapping keys to remote tracebacks stored as a list of strings
- exceptions_blame: {key: key}
  A dict mapping a key to another key on which it depends that has failed
- suspicious_tasks: {key: int}
  Number of times a task has been involved in a worker failure
- retries: {key: int}
  Number of times a task may be automatically retried after failing
- deleted_keys: {key: {workers}}
  Locations of workers that have keys that should be deleted
- wants_what: {client: {key}}
  What keys are wanted by each client. The transpose of who_wants.
- who_wants: {key: {client}}
  Which clients want each key. The active targets of computation.
- nbytes: {key: int}
  Number of bytes for a key as reported by workers holding that key.
- ncores: {worker: int}
  Number of cores owned by each worker
- idle: {worker}
  Set of workers that are not fully utilized
- worker_info: {worker: {str: data}}
  Information about each worker
- host_info: {hostname: dict}
  Information about each worker host
- worker_bytes: {worker: int}
  Number of bytes in memory on each worker
- occupancy: {worker: time}
  Expected runtime for all tasks currently processing on a worker
- services: {str: port}
  Other services running on this scheduler, like Bokeh
- loop: IOLoop
  The running Tornado IOLoop
- comms: [Comm]
  A list of Comms from which we both accept stimuli and report results
- task_duration: {key-prefix: time}
  Time we expect certain functions to take, e.g. {'sum': 0.25}
- coroutines: [Futures]
  A list of active futures that control operation
- add_client(comm, client=None)
  Add client to network
  We listen to all future messages from this Comm.
- add_keys(comm=None, worker=None, keys=())
  Learn that a worker has certain keys
  This should not be used in practice and is mostly here for legacy reasons.
- add_plugin(plugin)
  Add external plugin to scheduler
  See https://distributed.readthedocs.io/en/latest/plugins.html
- add_worker(comm=None, address=None, keys=(), ncores=None, name=None, resolve_address=True, nbytes=None, now=None, resources=None, host_info=None, **info)
  Add a new worker to the cluster
- broadcast(comm=None, msg=None, workers=None, hosts=None, nanny=False)
  Broadcast message to workers, return all results
- close(comm=None, fast=False)
  Send cleanup signal to all coroutines then wait until finished
- close_worker(stream=None, worker=None, safe=None)
  Remove a worker from the cluster
  This both removes the worker from our local state and also sends a signal to the worker to shut down. This works regardless of whether or not the worker has a nanny process restarting it.
- coerce_address(addr, resolve=True)
  Coerce possible input addresses to canonical form. resolve can be disabled for testing with fake hostnames.
  Handles strings, tuples, or aliases.
- feed(comm, function=None, setup=None, teardown=None, interval=1, **kwargs)
  Provides a data Comm to external requester
  Caution: this runs arbitrary Python code on the scheduler. This should eventually be phased out. It is mostly used by diagnostics.
- get_comm_cost(key, worker)
  Get the estimated communication cost (in s.) to compute key on the given worker.
- get_task_duration(key, default=0.5)
  Get the estimated computation cost of the given key (not including any communication cost).
- get_worker_service_addr(worker, service_name)
  Get the (host, port) address of the named service on the worker. Returns None if the service doesn’t exist.
- handle_client(comm, client=None)
  Listen and respond to messages from clients
  This runs once per Client Comm or Queue.
  See also: Scheduler.worker_stream, the equivalent function for workers
- handle_long_running(key=None, worker=None, compute_duration=None)
  A task has seceded from the thread pool
  We stop the task from being stolen in the future, and change task duration accounting as if the task has stopped.
- handle_worker(worker)
  Listen to responses from a single worker
  This is the main loop for scheduler-worker interaction
  See also: Scheduler.handle_client, the equivalent coroutine for clients
- rebalance(comm=None, keys=None, workers=None)
  Rebalance keys so that each worker stores roughly equal bytes
  Policy
  This orders the workers by what fraction of bytes of the existing keys they have. It walks down this list from most-to-least. At each worker it finds the largest results it can and sends them to the least occupied worker until either the sender or the recipient is at the average expected load.
- reevaluate_occupancy()
  Periodically reassess task duration time
  The expected duration of a task can change over time. Unfortunately we don’t have a good constant-time way to propagate the effects of these changes out to the summaries that they affect, like the total expected runtime of each of the workers, or what tasks are stealable.
  In this coroutine we walk through all of the workers and re-align their estimates with the current state of tasks. We do this periodically rather than at every transition, and we only do it if the scheduler process isn’t under load (using psutil.Process.cpu_percent()). This lets us avoid this fringe optimization when we have better things to think about.
- remove_worker(comm=None, address=None, safe=False, close=True)
  Remove worker from cluster
  We do this when a worker reports that it plans to leave or when it appears to be unresponsive. This may send its tasks back to a released state.
- replicate(comm=None, keys=None, n=None, workers=None, branching_factor=2, delete=True)
  Replicate data throughout cluster
  This performs a tree copy of the data throughout the network individually on each piece of data.
  Parameters:
    - keys: Iterable. List of keys to replicate
    - n: int. Number of replications we expect to see within the cluster
    - branching_factor: int, optional. The number of workers that can copy data in each generation
- report(msg, client=None)
  Publish updates to all listening Queues and Comms
  If the message contains a key then we only send the message to those comms that care about the key.
- reschedule(key=None, worker=None)
  Reschedule a task
  Things may have shifted and this task may now be better suited to run elsewhere
- run_function(stream, function, args=(), kwargs={})
  Run a function within this process
  See also: Client.run_on_scheduler
- scatter(comm=None, data=None, workers=None, client=None, broadcast=False, timeout=2)
  Send data out to workers
- start(addr_or_port=8786, start_queues=True)
  Clear out old state and restart all running coroutines
- start_ipython(comm=None)
  Start an IPython kernel
  Returns Jupyter connection info dictionary.
- stimulus_cancel(comm, keys=None, client=None, force=False)
  Stop execution on a list of keys
- stimulus_missing_data(cause=None, key=None, worker=None, ensure=True, **kwargs)
  Mark that certain keys have gone missing. Recover.
- stimulus_task_erred(key=None, worker=None, exception=None, traceback=None, **kwargs)
  Mark that a task has erred on a particular worker
- stimulus_task_finished(key=None, worker=None, **kwargs)
  Mark that a task has finished execution on a particular worker
- transition(key, finish, *args, **kwargs)
  Transition a key from its current state to the finish state
  Returns: Dictionary of recommendations for future transitions
  See also: Scheduler.transitions, the transitive version of this function
  Examples
      >>> self.transition('x', 'waiting')
      {'x': 'processing'}
- transition_story(*keys)
  Get all transitions that touch one of the input keys
- transitions(recommendations)
  Process transitions until none are left
  This includes feedback from previous transitions and continues until we reach a steady state
- update_data(comm=None, who_has=None, nbytes=None, client=None)
  Learn that new data has entered the network from an external source
  See also: Scheduler.mark_key_in_memory
- update_graph(client=None, tasks=None, keys=None, dependencies=None, restrictions=None, priority=None, loose_restrictions=None, resources=None, submitting_task=None, retries=None)
  Add new computations to the internal dask graph
  This happens whenever the Client calls submit, map, get, or compute.
- valid_workers(key)
  Return set of currently valid worker addresses for key
  If all workers are valid then this returns True. This checks the following state:
    - worker_restrictions
    - host_restrictions
    - resource_restrictions
- worker_objective(key, worker)
  Objective function to determine which worker should get the key
  Minimize expected start time. If there is a tie then break it with data storage.
- worker_send(worker, msg)
  Send message to worker
  This also handles connection failures by adding a callback to remove the worker on the next cycle.
- workers_list(workers)
  List of qualifying workers
  Takes a list of worker addresses or hostnames. Returns a list of all worker addresses that match
- workers_to_close(memory_ratio=2)
  Find workers that we can close with low cost
  This returns a list of workers that are good candidates to retire. These workers are not running anything and are storing relatively little data relative to their peers. If all workers are idle then we still maintain enough workers to have enough RAM to store our data, with a comfortable buffer.
  This is for use with systems like distributed.deploy.adaptive.
  Parameters:
    - memory_ratio: Number. Amount of extra space we want to have for our stored data. Defaults to 2, meaning that we want to have twice as much memory as we currently have data.
  Returns: to_close, a list of workers that are OK to close
- distributed.scheduler.decide_worker(dependencies, occupancy, who_has, valid_workers, loose_restrictions, objective, key)
  Decide which worker should take task
      >>> dependencies = {'c': {'b'}, 'b': {'a'}}
      >>> occupancy = {'alice:8000': 0, 'bob:8000': 0}
      >>> who_has = {'a': {'alice:8000'}}
      >>> nbytes = {'a': 100}
      >>> ncores = {'alice:8000': 1, 'bob:8000': 1}
      >>> valid_workers = True
      >>> loose_restrictions = set()
  We choose the worker that has the data on which ‘b’ depends (alice has ‘a’)
      >>> decide_worker(dependencies, occupancy, who_has, has_what,
      ...               valid_workers, loose_restrictions, nbytes, ncores, 'b')
      'alice:8000'
  If both Alice and Bob have dependencies then we choose the less-busy worker
      >>> who_has = {'a': {'alice:8000', 'bob:8000'}}
      >>> has_what = {'alice:8000': {'a'}, 'bob:8000': {'a'}}
      >>> decide_worker(dependencies, who_has, has_what,
      ...               valid_workers, loose_restrictions, nbytes, ncores, 'b')
      'bob:8000'
  Optionally provide valid workers of where jobs are allowed to occur
      >>> valid_workers = {'alice:8000', 'charlie:8000'}
      >>> decide_worker(dependencies, who_has, has_what,
      ...               valid_workers, loose_restrictions, nbytes, ncores, 'b')
      'alice:8000'
  If the task requires data communication, then we choose to minimize the number of bytes sent between workers. This takes precedence over worker occupancy.
      >>> dependencies = {'c': {'a', 'b'}}
      >>> who_has = {'a': {'alice:8000'}, 'b': {'bob:8000'}}
      >>> has_what = {'alice:8000': {'a'}, 'bob:8000': {'b'}}
      >>> nbytes = {'a': 1, 'b': 1000}
      >>> decide_worker(dependencies, who_has, has_what,
      ...               {}, set(), nbytes, ncores, 'c')
      'bob:8000'
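The policy these examples describe (prefer the worker that needs the fewest bytes transferred, then the less busy one) can be sketched in a few lines. The pick_worker function below is a hypothetical simplification with its own argument list; it is not decide_worker, ignores loose_restrictions and the objective argument, and breaks remaining ties by worker address purely to stay deterministic.

```python
def pick_worker(key, dependencies, who_has, nbytes, occupancy,
                valid_workers=True):
    """Hypothetical sketch: choose a worker for `key`, or None if none qualify."""
    # valid_workers is True when every known worker is acceptable
    if valid_workers is True:
        candidates = set(occupancy)
    else:
        candidates = set(valid_workers) & set(occupancy)
    if not candidates:
        return None

    def cost(worker):
        # Bytes that would have to be fetched from peers to run key here
        missing = sum(nbytes.get(dep, 0)
                      for dep in dependencies.get(key, ())
                      if worker not in who_has.get(dep, ()))
        # Bytes first, then occupancy, then address as a stable tie-break
        return (missing, occupancy[worker], worker)

    return min(candidates, key=cost)


dependencies = {'c': {'b'}, 'b': {'a'}}
nbytes = {'a': 100}

# Only alice holds 'a', so 'b' runs where its data lives
local = pick_worker('b', dependencies, {'a': {'alice:8000'}}, nbytes,
                    {'alice:8000': 0, 'bob:8000': 0})

# Both hold 'a', so the less busy worker wins
less_busy = pick_worker('b', dependencies,
                        {'a': {'alice:8000', 'bob:8000'}}, nbytes,
                        {'alice:8000': 1, 'bob:8000': 0})

# Restricting valid workers excludes bob; charlie is not connected
restricted = pick_worker('b', dependencies,
                         {'a': {'alice:8000', 'bob:8000'}}, nbytes,
                         {'alice:8000': 1, 'bob:8000': 0},
                         valid_workers={'alice:8000', 'charlie:8000'})

# Moving 1 byte to bob is cheaper than moving 1000 bytes to alice,
# even though bob is busier
fewest_bytes = pick_worker('c', {'c': {'a', 'b'}},
                           {'a': {'alice:8000'}, 'b': {'bob:8000'}},
                           {'a': 1, 'b': 1000},
                           {'alice:8000': 0, 'bob:8000': 5})
```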