from __future__ import print_function, division, absolute_import
import logging
from tornado import gen
from ..utils import log_errors, PeriodicCallback
logger = logging.getLogger(__name__)
class Adaptive(object):
    '''
    Adaptively allocate workers based on scheduler load.  A superclass.

    Contains logic to dynamically resize a Dask cluster based on current use.

    Parameters
    ----------
    scheduler : distributed.Scheduler
    cluster : object
        Must have scale_up and scale_down methods/coroutines
    interval : int, default 1000
        Milliseconds between checks of the scheduler's load
    startup_cost : int, default 1
        Factor representing how costly it is to start an additional worker.
        Affects how quickly the cluster adapts to high task-per-worker loads.
    scale_factor : int, default 2
        Factor to scale by when it's determined additional workers are needed

Examples
--------
    >>> class MyCluster(object):
    ...     def scale_up(self, n):
    ...         """ Bring worker count up to n """
    ...     def scale_down(self, workers):
    ...         """ Remove worker addresses from cluster """

    Notes
    -----
    Subclasses can override :meth:`Adaptive.should_scale_up` and
    :meth:`Adaptive.should_scale_down` to control when the cluster should be
    resized.  The default implementation checks if there are too many tasks
    per worker or too little memory available (see :meth:`Adaptive.needs_cpu`
    and :meth:`Adaptive.needs_memory`).

    The :meth:`Adaptive.get_scale_up_kwargs` method controls the arguments
    passed to the cluster's ``scale_up`` method.
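
    A hypothetical subclass might, for instance, cap the total number of
    workers when deciding whether to scale up (a sketch only; ``max_workers``
    is an illustrative attribute, not part of this class):

    >>> class BoundedAdaptive(Adaptive):
    ...     max_workers = 10
    ...     def should_scale_up(self):
    ...         if len(self.scheduler.ncores) >= self.max_workers:
    ...             return False
    ...         return Adaptive.should_scale_up(self)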
'''
def __init__(self, scheduler, cluster, interval=1000, startup_cost=1,
scale_factor=2):
self.scheduler = scheduler
self.cluster = cluster
self.startup_cost = startup_cost
self.scale_factor = scale_factor
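        # Re-check scheduler load every ``interval`` milliseconds on the
        # scheduler's event loop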
self._adapt_callback = PeriodicCallback(self._adapt, interval)
self.scheduler.loop.add_callback(self._adapt_callback.start)
self._adapting = False
    def needs_cpu(self):
"""
        Check if the cluster is CPU constrained (too many tasks per core)

        Notes
        -----
        Returns ``True`` if the total occupancy per core is more than twice
        the ``startup_cost``.
        """
total_occupancy = self.scheduler.total_occupancy
total_cores = sum(self.scheduler.ncores.values())
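        # total_occupancy is the scheduler's estimate, in seconds, of the
        # runtime of all tasks currently queued or running on workers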
if total_occupancy / (total_cores + 1e-9) > self.startup_cost * 2:
logger.info("CPU limit exceeded [%d occupancy / %d cores]",
total_occupancy, total_cores)
return True
else:
return False
    def needs_memory(self):
"""
        Check if the cluster is RAM constrained

        Notes
        -----
        Returns ``True`` if the bytes currently stored in distributed memory
        exceed 60% of the memory limit summed over all workers.
        """
limit_bytes = {w: self.scheduler.worker_info[w]['memory_limit']
for w in self.scheduler.worker_info}
worker_bytes = self.scheduler.worker_bytes
limit = sum(limit_bytes.values())
total = sum(worker_bytes.values())
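        # Treat the cluster as memory-constrained once more than 60% of the
        # aggregate memory limit is in use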
if total > 0.6 * limit:
            logger.info("RAM limit exceeded [%d bytes used / %d bytes limit]",
                        total, limit)
return True
else:
return False
    def should_scale_up(self):
"""
        Determine whether additional workers should be added to the cluster

        Returns
        -------
        scale_up : bool

        Notes
        -----
        Additional workers are added whenever any of the following hold:

        1. There are unrunnable tasks and no workers
        2. The cluster is CPU constrained
        3. The cluster is RAM constrained

        See Also
        --------
        needs_cpu
        needs_memory
"""
with log_errors():
if self.scheduler.unrunnable and not self.scheduler.ncores:
return True
needs_cpu = self.needs_cpu()
needs_memory = self.needs_memory()
if needs_cpu or needs_memory:
return True
return False
    def should_scale_down(self):
"""
        Determine whether any workers should potentially be removed from
        the cluster.

        Returns
        -------
        scale_down : bool

        Notes
        -----
        Returns ``True`` whenever the scheduler identifies workers that can
        be retired, as determined by ``Scheduler.workers_to_close``.

        See Also
        --------
        Scheduler.workers_to_close
        """
return len(self.scheduler.workers_to_close()) > 0
@gen.coroutine
def _retire_workers(self):
with log_errors():
workers = yield self.scheduler.retire_workers(remove=True,
close_workers=True)
if workers:
logger.info("Retiring workers %s", workers)
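                # The scheduler has already moved data off and closed these
                # workers; ask the cluster backend to release them as well
                # (scale_down may be a plain method or a coroutine)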
f = self.cluster.scale_down(workers)
if gen.is_future(f):
yield f
    def get_scale_up_kwargs(self):
"""
Get the arguments to be passed to ``self.cluster.scale_up``.
Notes
-----
        By default the desired total number of workers is returned (``n``).
        Subclasses should ensure that the returned dictionary includes a
        key-value pair for ``n``, either by implementing it or by calling
        the parent's ``get_scale_up_kwargs``.
See Also
--------
LocalCluster.scale_up
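
        Examples
        --------
        A hypothetical subclass could pass extra keyword arguments while
        keeping ``n`` from the default implementation (the ``memory`` keyword
        below is only illustrative and must be understood by the cluster's
        ``scale_up``):

        >>> class MemoryAwareAdaptive(Adaptive):
        ...     def get_scale_up_kwargs(self):
        ...         kwargs = Adaptive.get_scale_up_kwargs(self)
        ...         kwargs['memory'] = '8 GB'  # hypothetical keyword
        ...         return kwargs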
"""
instances = max(1, len(self.scheduler.ncores) * self.scale_factor)
logger.info("Scaling up to %d workers", instances)
return {'n': instances}
@gen.coroutine
def _adapt(self):
if self._adapting: # Semaphore to avoid overlapping adapt calls
return
self._adapting = True
try:
should_scale_up = self.should_scale_up()
should_scale_down = self.should_scale_down()
if should_scale_up and should_scale_down:
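                # Conflicting signals: log and do nothing this cycle rather
                # than add and remove workers at the same time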
                logger.info("Scale up and scale down both triggered; "
                            "taking no action")
else:
if should_scale_up:
kwargs = self.get_scale_up_kwargs()
f = self.cluster.scale_up(**kwargs)
if gen.is_future(f):
yield f
if should_scale_down:
yield self._retire_workers()
finally:
self._adapting = False
def adapt(self):
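        """ Run a single adaptation cycle on the scheduler's event loop """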
self.scheduler.loop.add_callback(self._adapt)