# Source code for astromatic_wrapper.utils.pipeline

# Copyright 2015 Fred Moolekamp
# BSD 3-clause license
"""
Class and functions to define an astronomy pipeline
"""
import os
import subprocess
import copy
import logging
import warnings

logger = logging.getLogger('astromatic.pipeline')

class PipelineError(Exception):
    """
    Errors generated by running Pipeline
    """
    pass
def str_2_bool(bool_str):
    """
    Case independent function to convert a string representation of a
    boolean (``'true'``/``'false'``, ``'yes'``/``'no'``) into a ``bool``.
    This is case insensitive, and will also accept part of a boolean string
    (``'t'``/``'f'``, ``'y'``/``'n'``).

    Raises a :py:class:`astromatic.utils.pipeline.PipelineError`
    if an invalid expression is entered.

    Parameters
    ----------
    bool_str: str
        String to convert to a boolean
    """
    text = bool_str.lower()
    # Accept any prefix of the full words, e.g. 't', 'tr', 'y', 'ye'
    if any(word.startswith(text) for word in ('true', 'yes')):
        return True
    if any(word.startswith(text) for word in ('false', 'no')):
        return False
    raise PipelineError(
        "'{0}' did not match a boolean expression "
        " (true/false, yes/no, t/f, y/n)".format(bool_str))
def get_bool(prompt):
    """
    Prompt a user for a boolean expression and repeat until a valid boolean
    has been entered.

    Parameters
    ----------
    prompt: str
        The text to prompt the user with.

    Returns
    -------
    bool
        The boolean value the user entered.
    """
    # ``raw_input`` only exists in python 2; fall back to ``input`` in python 3
    try:
        read_input = raw_input
    except NameError:
        read_input = input
    response = read_input(prompt)
    try:
        return str_2_bool(response)
    except PipelineError:
        # BUGFIX: the original printed ``bool_str``, which is unbound when
        # ``str_2_bool`` raises; print the raw user response instead
        print(
            "'{0}' did not match a boolean expression "
            "(true/false, yes/no, t/f, y/n)".format(response))
        return get_bool(prompt)
def create_paths(paths):
    """
    Search for paths on the server. If a path does not exist, create the
    necessary directories. For example, if
    ``paths=['~/Documents/images/2014-6-5_data/']`` and only the path
    *'~/Documents'* exists, both *'~/Documents/images/'* and
    *'~/Documents/images/2014-6-5_data/'* are created.

    Parameters
    ----------
    paths: str or list of strings
        If paths is a string, this is the path to search for and create.
        If paths is a list, each one is a path to search for and create

    Raises
    ------
    PipelineError
        If a directory cannot be created (e.g. due to permissions).
    """
    # BUGFIX: ``astropy.extern.six`` was removed from astropy; use a
    # stdlib-only python 2/3 compatible string-type check instead
    try:
        string_types = basestring
    except NameError:
        string_types = str
    if isinstance(paths, string_types):
        paths = [paths]
    for path in paths:
        try:
            os.makedirs(path)
        except OSError:
            # makedirs raises OSError both when the path already exists and
            # when it cannot be created; only the latter is an error
            if not os.path.isdir(path):
                raise PipelineError(
                    "Problem creating new directory, check user permissions")
def check_path(path, auto_create=False):
    """
    Check if a path exists and if it doesn't, give the user the option to
    create it.

    Parameters
    ----------
    path: str
        Name of the path to check
    auto_create: bool (optional)
        If the path does not exist and ``auto_create==True``, the path will
        automatically be created, otherwise the user will be prompted to
        create the path
    """
    # Nothing to do when the path is already present
    if os.path.exists(path):
        return
    prompt = "'{0}' does not exist, create (y/n)?".format(path)
    if auto_create or get_bool(prompt):
        create_paths(path)
    else:
        raise PipelineError("{0} does not exist".format(path))
class Pipeline(object):
    def __init__(self, paths=None, pipeline_name=None, next_id=0,
                 create_paths=False, **kwargs):
        """
        Parameters
        ----------
        paths: dict (optional)
            Paths used for files generated by the pipeline. Each key will be
            added to the ``Pipeline`` class as the name of a path, and its
            corresponding value is the path to be used. If
            ``create_paths==True``, the path will automatically be created on
            the disk if it does not exist, otherwise the user will be asked
            whether or not to create the path. At a minimum it is recommended
            to define a ``temp_path``, used to store temporary files generated
            by the pipeline and a ``log_path``, used to save any log files
            created by the pipeline and the pipeline itself after each step.
        pipeline_name: str (optional)
            Name of the pipeline (used when saving the pipeline). The default
            value is ``None``, which results in the current date being used
            for the pipeline name in the form
            'year-month-day_hours-minutes-seconds'.
        next_id: int (optional)
            Next number to use for a pipeline step id. The default is ``0``
        create_paths: bool (optional)
            If ``create_paths==True``, any path in ``paths`` that does not
            exist is created. Otherwise the user will be prompted if a path
            does not exist. The default is to prompt the user
            (``create_paths==False``).
        kwargs: dict
            Additional keyword arguments that might be used in a custom
            pipeline.
        """
        self.create_paths = create_paths
        self.name = pipeline_name
        self.steps = []
        self.next_id = next_id
        self.run_steps = None
        self.run_warnings = None
        self.run_step_idx = 0
        # BUGFIX: avoid a shared mutable default argument for ``paths``
        self.paths = {} if paths is None else paths
        # Set additional keyword arguments
        for key, value in kwargs.items():
            setattr(self, key, value)
        # If the pipeline doesn't have a name, use the current time
        if self.name is None:
            import datetime
            self.name = datetime.datetime.now().strftime(
                '%Y-%m-%d_%H-%M-%S_pipeline')
        # Create (or prompt user to create) any specified paths that
        # do not exist
        for path_name, path in self.paths.items():
            check_path(path, self.create_paths)
        # Warn the user if any of the recommended paths do not exist
        if 'temp' not in self.paths:
            warnings.warn(
                "'temp' path has not been set for the pipeline. "
                "If this pipeline generates temporary files an error may occur")
        if 'log' not in self.paths:
            warnings.warn(
                "'log' path has not been set for the pipeline. "
                "Log files will not be saved.")

    def add_step(self, func, tags=None, ignore_errors=False,
                 ignore_exceptions=False, **kwargs):
        """
        Add a new `PipelineStep` to the pipeline

        Parameters
        ----------
        func: function
            A function to be run in the pipeline. All functions must return a
            dictionary with (at a minimum) a ``status`` key whose value is
            either ``success`` or ``error``. It is also common to return a
            ``warnings`` key whose value is an astropy table that contains a
            list of warnings that may have occured during the step. The entire
            result dictionary returned from the function is saved in the
            pipeline's log file.
        tags: list (optional)
            A list of tags used to identify the step. When running the
            pipeline the user can specify a set of conditions that will filter
            which steps are run (or not run) based on a set of specified tags
        ignore_errors: bool (optional)
            If ``ignore_errors==False`` the pipeline will raise an exception
            if an error occurred during this step in the pipeline (meaning it
            returned a result with ``result['status']=='error'``). The default
            is ``False``.
        ignore_exceptions: bool (optional)
            If ``ignore_exceptions==True`` the pipeline will set
            ``result['status']=='error'`` for the step that threw an exception
            and continue running. The default is ``ignore_exceptions==False``,
            which will stop the pipeline and raise an exception.
        kwargs: dict
            Keyword arguments passed to the ``func`` when the pipeline is run
        """
        step_id = self.next_id
        self.next_id += 1
        # BUGFIX: avoid a shared mutable default argument for ``tags``
        self.steps.append(PipelineStep(
            func,
            step_id,
            [] if tags is None else tags,
            ignore_errors,
            ignore_exceptions,
            kwargs
        ))

    def run(self, run_tags=None, ignore_tags=None, run_steps=None,
            run_name=None, resume=False, ignore_errors=None,
            ignore_exceptions=None, start_idx=None, current_step_idx=None):
        """
        Run the pipeline given a list of PipelineSteps

        Parameters
        ----------
        run_tags: list (optional)
            Run all steps that have a tag listed in ``run_tags`` and not in
            ``ignore_tags``. If ``len(run_tags)==0`` then all of the steps are
            run that are not listed in ignore tags.
        ignore_tags: list (optional)
            Ignore all steps that contain one of the tags in ``ignore_tags``.
        run_steps: list of `PipelineStep` (optional)
            Instead of running the steps associated with a pipeline, the user
            can specify a set of steps to run. This can be useful if (for
            example) multiple criteria are used to select steps to run and the
            user wants to perform these cuts in some other function to
            generate the necessary steps to run.
        run_name: str (optional)
            Name of the current run. When a pipeline is run, if a ``log``
            path has been specified then a copy of the pipeline with a record
            of all warnings and steps run is saved in the ``log`` directory.
            A ``run_name`` can be specified to distinguish between different
            runs of the same pipeline with the same ``log`` path.
        resume: bool (optional)
            If ``resume==True`` and ``start_idx is None``, the pipeline will
            continue where it left off. If ``resume==False`` and
            ``start_idx is None`` then the pipeline will start at the first
            step (Pipeline.run_step_idx=0). The default is ``resume=False``.
        ignore_errors: bool (optional)
            If ``ignore_errors==False`` the pipeline will raise an exception
            if an error occurred during any step in the pipeline which
            returned a result with ``result['status']=='error'``. The default
            is ``None``, which will use the ``ignore_errors`` parameter for
            each individual step to decide whether or not to throw an
            exception.
        ignore_exceptions: bool (optional)
            If ``ignore_exceptions==True`` the pipeline will set
            ``result['status']=='error'`` for the step that threw an exception
            and continue running. The default is ``None``, which will use the
            ``ignore_exceptions`` parameter for each individual step to decide
            whether or not to throw an exception.
        start_idx: int (optional)
            Index of ``Pipeline.run_steps`` to begin running the pipeline.
            All steps in ``Pipeline.run_steps`` after ``start_idx`` will be
            run in order. The default value is ``None``, which will not change
            the current ``Pipeline.run_step_idx``.
        current_step_idx: int (optional)
            Unused; kept for backward compatibility with existing callers.
        """
        import inspect
        # BUGFIX: avoid shared mutable default arguments
        run_tags = [] if run_tags is None else run_tags
        ignore_tags = [] if ignore_tags is None else ignore_tags
        # If no steps are specified and the user is not resuming a previous
        # run, run all of the steps associated with the pipeline
        if run_steps is not None:
            self.run_steps = run_steps
        elif self.run_steps is None or not resume:
            self.run_steps = [step for step in self.steps]
        # Filter the steps based on run_tags and ignore_tags, with ignore
        # tags taking precedent
        self.run_steps = [
            step for step in self.run_steps
            if (len(run_tags) == 0 or
                any([tag in run_tags for tag in step.tags])) and
            not any([tag in ignore_tags for tag in step.tags])]
        # Set the path of the log file for the current run
        dill_dump = False
        logfile = None
        if 'log' in self.paths:
            if run_name is None:
                logfile = os.path.join(self.paths['log'], 'pipeline.p')
            else:
                logfile = os.path.join(
                    self.paths['log'], 'pipeline-{0}.p'.format(run_name))
            logger.info('Pipeline state will be saved to {0}'.format(logfile))
            # Save the pipeline in the log directory
            try:
                import dill
                dill_dump = True
                with open(logfile, 'wb') as f:
                    dill.dump(self, f)
            except ImportError:
                warnings.warn(
                    'Pipeline requires "dill" package to save log file. '
                    'Attempting to use pickle')
                import pickle
                try:
                    with open(logfile, 'wb') as f:
                        pickle.dump(self, f)
                except Exception:
                    warnings.warn(
                        'Unable to dump using pickle, no log file will be saved')
        # If the user specifies a starting index use it, otherwise start at
        # the first step unless the user specified to resume where it left off
        if start_idx is not None:
            self.run_step_idx = start_idx
        elif not resume:
            self.run_step_idx = 0
        # Run each step in order
        steps = self.run_steps[self.run_step_idx:]
        for step in steps:
            logger.info('running step {0}: {1}'.format(
                step.step_id, step.tags))
            logger.debug('function kwargs: {0}'.format(step.func_kwargs))
            func_kwargs = step.func_kwargs.copy()
            # BUGFIX: ``inspect.getargspec`` was removed in python 3.11;
            # prefer ``getfullargspec`` when it is available
            try:
                argspec = inspect.getfullargspec(step.func)
            except AttributeError:
                argspec = inspect.getargspec(step.func)
            # Some functions use step_id to keep track of log files, so the
            # id of the current step is added to the function call
            if 'step_id' in argspec.args:
                func_kwargs['step_id'] = step.step_id
            # Some functions require the Pipeline as a parameter, so pass
            # the pipeline to the function
            if 'pipeline' in argspec.args:
                func_kwargs['pipeline'] = self
            # Attempt to run the step. If an exception occurs, use the
            # ignore_exceptions parameter to determine whether to stop the
            # Pipeline's execution or warn the user and continue
            if (ignore_exceptions is not None and ignore_exceptions) or (
                    ignore_exceptions is None and step.ignore_exceptions):
                try:
                    result = step.func(**func_kwargs)
                except Exception:
                    import traceback
                    warning_str = (
                        "Exception occurred during step {0} "
                        "(run_step_idx {1})".format(
                            step.step_id, self.run_step_idx))
                    warnings.warn(warning_str)
                    result = {
                        'status': 'error',
                        'error': traceback.format_exc()
                    }
            else:
                result = step.func(**func_kwargs)
            step.results = result
            # Check that the result is a dictionary with a 'status' key
            if (result is None or not isinstance(result, dict) or
                    'status' not in result):
                warning_str = (
                    "Step {0} (run_step_idx {1}) did not return a valid "
                    "result".format(step.step_id, self.run_step_idx))
                warnings.warn(warning_str)
                result = {
                    'status': 'unknown',
                    'result': result
                }
            # If there was an error in the step, use ignore_errors to
            # determine whether or not to raise an exception
            if result['status'].lower() == 'error':
                # BUGFIX: the original condition raised even when
                # ``ignore_errors is None`` and ``step.ignore_errors`` was
                # True (``not None`` is truthy). The run-level setting now
                # correctly overrides the per-step setting.
                if ignore_errors is None:
                    ignore = step.ignore_errors
                else:
                    ignore = ignore_errors
                if not ignore:
                    raise PipelineError(
                        'Error returned in step {0} (run_step_idx {1})'.format(
                            step.step_id, self.run_step_idx))
                else:
                    warning_str = "Error in step {0} (run_step_idx{1})".format(
                        step.step_id, self.run_step_idx)
                    warning_str += ", see results for more"
                    warnings.warn(warning_str)
            # Increase the run_step_idx and save the pipeline
            self.run_step_idx += 1
            # BUGFIX: only attempt to save when a log file was configured;
            # the original referenced an unbound ``logfile`` here, masked by
            # a bare ``except``
            if logfile is not None:
                if dill_dump:
                    with open(logfile, 'wb') as f:
                        dill.dump(self, f)
                else:
                    try:
                        import pickle
                        with open(logfile, 'wb') as f:
                            pickle.dump(self, f)
                    except Exception:
                        warnings.warn(
                            'Unable to dump using pickle, '
                            'no log file will be saved')
        result = {
            'status': 'success',
            'warnings': self.get_result_table('warnings', ['filename'])
        }
        return result

    def get_result_table(self, key, meta_fields=None):
        """
        Get a specific key from the results of each step in a pipeline that
        has already been run. This function expects the result for the given
        key to be an astropy Table that can be vstacked with the results from
        previous steps.

        Parameters
        ----------
        key: str
            Name of the key in the result dictionary for each step to
            extract. For example, ``key='warnings'`` will return a table with
            the warnings generated by each step in the pipeline.
        meta_fields: list (optional)
            List of metadata fields to include in each record. For example,
            astromatic_wrapper warning tables have a 'filename' metadata
            field, which gives the name of the XML file generated by the
            AstrOmatic code.

        Returns
        -------
        all_results: `astropy.table.Table`
            A table with the stacked results for ``key`` from every step,
            or ``None`` if no step produced a result for ``key``.
        """
        from astropy.table import vstack
        # BUGFIX: avoid a shared mutable default argument
        meta_fields = [] if meta_fields is None else meta_fields
        all_results = None
        for step in self.steps:
            result_tbl = None
            if step.results is not None and key in step.results:
                result_tbl = step.results[key]
                # Add the step number and name of the function called in the
                # step to the record for each item in the results table.
                result_tbl['step'] = step.step_id
                result_tbl['func'] = step.func.__name__
            if result_tbl is not None:
                for f in meta_fields:
                    if f in result_tbl.meta:
                        result_tbl[f] = result_tbl.meta[f]
                    else:
                        warnings.warn(
                            "'{0}' not found in table metadata".format(f))
                if all_results is None:
                    all_results = result_tbl
                else:
                    all_results = vstack([all_results, result_tbl])
        return all_results
class PipelineStep:
    """
    A single step in the pipeline. This takes a function and a set of tags
    and kwargs associated with it and stores them in the pipeline.
    """
    def __init__(self, func, step_id, tags=None, ignore_errors=False,
                 ignore_exceptions=False, func_kwargs=None):
        """
        Initialize a PipelineStep object

        Parameters
        ----------
        func: function
            The function to be run. All functions must return a dictionary
            with at a minimum a ``status`` key whose value is either
            ``success`` or ``error``.
        step_id: str
            Unique identifier for the step
        tags: list (optional)
            A list of tags used to identify the step. When running the
            pipeline the user can specify a set of conditions that will
            filter which steps are run (or not run) based on a set of
            specified tags
        ignore_errors: bool (optional)
            If ``ignore_errors==False`` the pipeline will raise an exception
            if an error occurred during this step in the pipeline, which
            returned a result with ``result['status']=='error'``. The default
            is ``False``.
        ignore_exceptions: bool (optional)
            If ``ignore_exceptions==True`` the pipeline will set
            ``result['status']=='error'`` for the step that threw an
            exception and continue running. The default is
            ``ignore_exceptions==False``, which will stop the pipeline and
            raise an exception.
        func_kwargs: dict
            Keyword arguments passed to the ``func`` when the pipeline is run
        """
        self.func = func
        # BUGFIX: ``tags=[]`` and ``func_kwargs={}`` were shared mutable
        # defaults; use None sentinels so each step gets its own containers
        self.tags = [] if tags is None else tags
        self.step_id = step_id
        self.ignore_errors = ignore_errors
        self.ignore_exceptions = ignore_exceptions
        self.func_kwargs = {} if func_kwargs is None else func_kwargs
        # Populated by Pipeline.run with the step's result dictionary
        self.results = None