Source code for WorkflowWebTools.clusterworkflows

#pylint: disable=too-many-locals

"""
Tools for clustering workflows based on their errors.

The errors are clustered based on a generated vector.
The vector can be thought as lying on two different hyper-sphere shells.
One sphere is for the error codes that occur and
the other sphere is the sites where those errors occur.
The overall distance between two workflow will be the sum in quadrature
of the distances on these two hyper-spheres.

These distances are configurable in the server ``config.yml``.
Each sphere has a center radius, thickness, and number of errors to
be at the midpoint.
The equation to determine distance from the origin for a vector
of errors is the following.

.. math::

  \\mathrm{distance} = \\frac{d}{\\sqrt{2}} +
      2 w \\left(\\frac{|\\vec{v}|}{|\\vec{v}| + m} - 0.5\\right)

The following parameters are set in ``config.yml`` for the
site name and the error code hyperspheres separately.

- *m* is the 'midpoint' parameter is the number of errors at a given site
  or given error code that will place the workflow at the midpoint of the
  hypersphere shell.
- *d* is the 'distance' parameter is the cartesian distance between the midpoints
  of two different sites or error codes.
- *w* is the 'width' parameter which defines the total shell width that the workflow
  could possibly land on.

The direction is determined by the error code or site name distribution.
Note that this always points in the upper quadrant for a given coordinate.

The equation is chosen so that two workflows that have completely different
errors at the same site, and the error width is 0,
will end up with a distance that is equal to the error distance when clustering.
This way, site and errors can have different distance weights, and there
can be some separation for the number of errors in a workflow
(for non-zero width).

:author: Daniel Abercrombie <dabercro@mit.edu>
"""

import threading

import cherrypy
import numpy
import sklearn.cluster

from . import globalerrors
from . import serverconfig
from . import errorutils


[docs]def get_workflow_vectors(workflows, session=None, allmap=None): """ Gets the errors for workflows in a list of numpy arrays :param str workflows: the workflows that vectors are returned for :param cherrypy.Session session: Stores the information for a session :param dict allmap: a globalerrors.ErrorInfo allmap to override the session's allmap :return: a list of numpy arrays of errors for the workflow :rtype: list of numpy.array """ curs = globalerrors.check_session(session, can_refresh=True) if not allmap: allmap = globalerrors.check_session(session).get_allmap() columns = ['errorcode', 'sitename'] column_output = {} for column in columns: # Initialize with all zeros settings = serverconfig.get_cluster_settings()[column] column_output[column] = [numpy.zeros(len(allmap[column])) for _ in workflows] curs.db_lock.acquire() curs.curs.execute("SELECT SUM(numbererrors), {0}, stepname " "FROM workflows " "GROUP BY stepname, {0} " "ORDER BY {0} ASC, stepname ASC;".format(column)) numerrors, colval, stepname = curs.curs.fetchone() or (0, '', '') wfname = stepname.split('/')[1] if stepname else '' for icol, value in enumerate(allmap[column]): for iwkf, workflow in enumerate(workflows): while colval == value and workflow == wfname: column_output[column][iwkf][icol] += numerrors numerrors, colval, stepname = curs.curs.fetchone() or (0, '', '') if stepname: wfname = stepname.split('/')[1] curs.db_lock.release() # Preprocessing here for output in column_output[column]: length = numpy.linalg.norm(output) or 1.0 norm = (float(settings['distance'])/1.4142 + 2.0 * float(settings['width']) * (length/(length + float(settings['midpoint'])) - 0.5))/length output *= float(norm) return [numpy.concatenate([column_output[col][iwkf] for col in columns]) \ for iwkf, _ in enumerate(workflows)]
[docs]def get_clusterer(history_path, errors_path=''): """Use this function to get the clusterer of workflows :param str history_path: Path to the workflow historical data. This can be a local file path or a URL. :param str errors_path: The errors for a given session to include in the clustering :return: A dict of a clusterer that is fitted to historical data with its allmap. The keys are 'clusterer' and 'allmap'. :rtype: dict """ cherrypy.log('Initializing cluster session') # This will be the location of our training data fake_session = { 'info': globalerrors.ErrorInfo(history_path) } # If the path to additional errors is given, add that to the clustering data. if errors_path: errorutils.add_to_database(globalerrors.check_session(fake_session), errors_path) globalerrors.check_session(fake_session).set_all_lists() # Get the data by getting table for each workflow workflows = globalerrors.check_session(fake_session).return_workflows() # Fill the data cherrypy.log('Getting workflow vectors') data = get_workflow_vectors(workflows, fake_session) cherrypy.log('Number of datapoints to cluster: %i' % len(data)) cherrypy.log('Fitting workflows...') settings = serverconfig.get_cluster_settings() clusterer = sklearn.cluster.KMeans(n_clusters=settings['n_clusters'], n_init=settings['n_init'], n_jobs=-1) clusterer.fit(numpy.array(data)) cherrypy.log('Done') return {'clusterer': clusterer, 'allmap': fake_session['info'].get_allmap()}
[docs]def get_workflow_groups(clusterer, session=None): """Groups workflows together based on a fitted clusterer :param dict clusterer: is a dictionary with the clusterer fit with historic data and the allmap to generate it. This matches the output of :func:`get_clusterer`. :param cherrypy.Session session: Stores the information for a session :returns: A dictionary pointing workflows to a group :rtype: dict """ errorinfo = globalerrors.check_session(session, can_refresh=True) if errorinfo.clusters: return errorinfo.clusters cherrypy.log('Fitting existing workflows.') workflows = globalerrors.check_session(session).return_workflows() vectors = get_workflow_vectors(workflows, session, clusterer['allmap']) predictions = clusterer['clusterer'].predict(numpy.array(vectors)) cherrypy.log(str(predictions)) for index, workflow in enumerate(workflows): errorinfo.clusters[workflow] = predictions[index] return errorinfo.clusters
[docs]def get_clustered_group(workflow, clusterer, session=None): """Get the group for a given workflow in this session :param str workflow: The workflow to get the group for. :param sklearn.cluster.KMeans clusterer: is the clusterer fit with historic data. :param cherrypy.Session session: Stores the information for a session :returns: List of other workflows in the same group :rtype: set """ output = [] predictions = get_workflow_groups(clusterer, session) group = predictions.get(workflow) if group is not None: for wkf, cluster in predictions.iteritems(): if cluster == group and wkf != workflow: output.append(wkf) return output
CLUSTER_LOCK = threading.Lock() """ Lock that should be acquired before running clustering functions in here """