# Copyright 2015 Fred Moolekamp
# BSD 3-clause license
"""
Class and functions to define an astronomy pipeline
"""
import os
import subprocess
import copy
import logging
import warnings
logger = logging.getLogger('astromatic.pipeline')
class PipelineError(Exception):
    """Exception raised for errors generated while running a Pipeline."""
def str_2_bool(bool_str):
    """
    Case independent function to convert a string representation of a
    boolean (``'true'``/``'false'``, ``'yes'``/``'no'``) into a ``bool``. This is case
    insensitive, and will also accept part of a boolean string
    (``'t'``/``'f'``, ``'y'``/``'n'``).

    Raises a :py:class:`astromatic.utils.pipeline.PipelineError`
    if an invalid expression is entered.

    Parameters
    ----------
    bool_str: str
        String to convert to a boolean
    """
    lower_str = bool_str.lower()
    # Guard against the empty string: ''.startswith() matches every prefix
    # test, so without this check '' would silently convert to True
    if lower_str and ('true'.startswith(lower_str) or 'yes'.startswith(lower_str)):
        return True
    elif lower_str and ('false'.startswith(lower_str) or 'no'.startswith(lower_str)):
        return False
    else:
        raise PipelineError(
            "'{0}' did not match a boolean expression "
            " (true/false, yes/no, t/f, y/n)".format(bool_str))
def get_bool(prompt):
    """
    Prompt a user for a boolean expression and repeat until a valid boolean
    has been entered.

    Parameters
    ----------
    prompt: str
        The text to prompt the user with.

    Returns
    -------
    bool
        The boolean value entered by the user.
    """
    # Loop instead of recursing so repeated invalid input cannot hit the
    # recursion limit
    while True:
        # Read the response first so it is still available for the error
        # message when conversion fails (the original code referenced the
        # unbound result inside the except clause, raising NameError on
        # any invalid input)
        response = raw_input(prompt)
        try:
            return str_2_bool(response)
        except PipelineError:
            print(
                "'{0}' did not match a boolean expression "
                "(true/false, yes/no, t/f, y/n)".format(response))
def create_paths(paths):
    """
    Search for paths on the server. If a path does not exist, create the necessary directories.

    For example, if ``paths=['~/Documents/images/2014-6-5_data/']`` and only the path
    *'~/Documents'* exists, both *'~/Documents/images/'* and
    *'~/Documents/images/2014-6-5_data/'* are created.

    Parameters
    ----------
    paths: str or list of strings
        If paths is a string, this is the path to search for and create.
        If paths is a list, each one is a path to search for and create

    Raises
    ------
    PipelineError
        If a directory cannot be created (for example due to permissions).
    """
    # Local Py2/Py3 compatibility shim instead of astropy.extern.six,
    # which has been removed from recent versions of astropy
    try:
        string_types = basestring  # Python 2: covers str and unicode
    except NameError:
        string_types = str  # Python 3
    if isinstance(paths, string_types):
        paths = [paths]
    for path in paths:
        try:
            os.makedirs(path)
        except OSError:
            # makedirs raises OSError both when the directory already
            # exists and when it cannot be created; only the latter is
            # an error
            if not os.path.isdir(path):
                raise PipelineError("Problem creating new directory, check user permissions")
def check_path(path, auto_create=False):
    """
    Check if a path exists and if it doesn't, give the user the option to create it.

    Parameters
    ----------
    path: str
        Name of the path to check
    auto_create: bool (optional)
        If the path does not exist and ``auto_create==True``, the path will
        automatically be created, otherwise the user will be prompted to create the path
    """
    # Nothing to do when the path is already present
    if os.path.exists(path):
        return
    # Either create the path automatically or ask the user for permission
    create = auto_create
    if not create:
        create = get_bool("'{0}' does not exist, create (y/n)?".format(path))
    if create:
        create_paths(path)
    else:
        raise PipelineError("{0} does not exist".format(path))
class Pipeline(object):
    """
    An ordered collection of :class:`PipelineStep` objects that can be run
    in sequence, with the pipeline state optionally saved to disk (via
    ``dill`` or ``pickle``) after each step.
    """
    def __init__(self, paths=None, pipeline_name=None,
                 next_id=0, create_paths=False, **kwargs):
        """
        Parameters
        ----------
        paths: dict (optional)
            Paths used for files generated by the pipeline. Each key will be
            added to the ``Pipeline`` class as the name of a path, and its
            corresponding value is the path to be used. If ``create_paths==True``,
            the path will automatically be created on the disk if it does not
            exist, otherwise the user will be asked whether or not to create the path.
            At a minimum it is recommended to define a ``temp_path``, used to
            store temporary files generated by the pipeline and a ``log_path``,
            used to save any log files created by the pipeline and the pipeline itself
            after each step.
        pipeline_name: str (optional)
            Name of the pipeline (used when saving the pipeline). The default
            value is ``None``, which results in the current date being used
            for the pipeline name in the form
            'year-month-day_hours-minutes-seconds'.
        next_id: int (optional)
            Next number to use for a pipeline step id. The default is ``0``
        create_paths: bool (optional)
            If ``create_paths==True``, any path in ``paths`` that does not exist
            is created. Otherwise the user will be prompted if a path does not
            exist. The default is to prompt the user (``create_paths==False``).
        kwargs: dict
            Additional keyword arguments that might be used in a custom pipeline.
        """
        self.create_paths = create_paths
        self.name = pipeline_name
        self.steps = []
        self.next_id = next_id
        self.run_steps = None
        self.run_warnings = None
        self.run_step_idx = 0
        # ``None`` default avoids sharing one mutable dict between all
        # Pipeline instances created without an explicit ``paths``
        self.paths = {} if paths is None else paths
        # Set additional keyword arguments
        for key, value in kwargs.items():
            setattr(self, key, value)
        # If the pipeline doesn't have a name, use the current time
        if self.name is None:
            import datetime
            self.name = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S_pipeline')
        # Create (or prompt user to create) any specified paths that do not exist
        for path_name, path in self.paths.items():
            check_path(path, self.create_paths)
        # Warn the user if any of the recommended paths do not exist
        if 'temp' not in self.paths:
            warnings.warn(
                "'temp' path has not been set for the pipeline. "
                "If this pipeline generates temporary files an error may occur")
        if 'log' not in self.paths:
            warnings.warn(
                "'log' path has not been set for the pipeline. Log files will not be saved.")

    def add_step(self, func, tags=None, ignore_errors=False, ignore_exceptions=False,
                 **kwargs):
        """
        Add a new `PipelineStep` to the pipeline

        Parameters
        ----------
        func: function
            A function to be run in the pipeline. All functions must return
            a dictionary with (at a minimum) a ``status`` key whose value is either
            ``success`` or ``error``. It is also common to return a ``warnings`` key whose
            value is an astropy table that contains a list of warnings that may have
            occured during the step. The entire result dictionary returned from the function
            is saved in the pipeline's log file.
        tags: list (optional)
            A list of tags used to identify the step. When running the pipeline the user
            can specify a set of conditions that will filter which steps are run (or not run)
            based on a set of specified tags
        ignore_errors: bool (optional)
            If ``ignore_errors==False`` the pipeline will raise an exception if an error
            occurred during this step in the pipeline (meaning it returned a result with
            ``result['status']=='error'``). The default is ``False``.
        ignore_exceptions: bool (optional)
            If ``ignore_exceptions==True`` the pipeline will set ``result['status']=='error'``
            for the step that threw an exception and continue running. The default is
            ``ignore_exceptions==False``, which will stop the pipeline and raise an
            exception.
        kwargs: dict
            Keyword arguments passed to the ``func`` when the pipeline is run
        """
        step_id = self.next_id
        self.next_id += 1
        # ``None`` default avoids a mutable default list shared by all calls
        self.steps.append(PipelineStep(
            func,
            step_id,
            [] if tags is None else tags,
            ignore_errors,
            ignore_exceptions,
            kwargs
        ))

    def run(self, run_tags=None, ignore_tags=None, run_steps=None, run_name=None,
            resume=False, ignore_errors=None, ignore_exceptions=None,
            start_idx=None, current_step_idx=None):
        """
        Run the pipeline given a list of PipelineSteps

        Parameters
        ----------
        run_tags: list (optional)
            Run all steps that have a tag listed in ``run_tags`` and not in ``ignore_tags``.
            If ``len(run_tags)==0`` then all of the steps are run that are not listed
            in ignore tags.
        ignore_tags: list (optional)
            Ignore all steps that contain one of the tags in ``ignore_tags``.
        run_steps: list of `PipelineStep` (optional)
            Instead of running the steps associated with a pipeline, the user can specify
            a set of steps to run. This can be useful if (for example) mulitple criteria
            are used to select steps to run and the user wants to perform these cuts in
            some other function to generate the necessary steps to run.
        run_name: str (optional)
            Name of the current run. When a pipeline is run, if a ``logpath`` has been
            specified then a copy of the pipline with a record of all warnings and
            steps run is saved in the ``logpath`` directory. A ``run_name`` can be specified
            to distinguish between different runs of the same pipeline with the same
            ``logpath``.
        resume: bool (optional)
            If ``resume==True`` and ``start_idx is None``, the pipeline will continue
            where it left off. If ``resume==False`` and ``start_idx is None`` then the
            pipeline will start at the first step (Pipeline.run_step_idx=0). The
            default is ``resume=False``.
        ignore_errors: bool (optional)
            If ``ignore_errors==False`` the pipeline will raise an exception if an error
            occurred during any step in the pipeline which returned a result with
            ``result['status']=='error'``. The default is ``None``, which will use
            the ``ignore_errors`` parameter for each individual step to decide whether
            or not to throw an exception.
        ignore_exceptions: bool (optional)
            If ``ignore_exceptions==True`` the pipeline will set ``result['status']=='error'``
            for the step that threw an exception and continue running. The default is ``None``,
            which will use the ``ignore_exception`` parameter for each individual step to
            decide whether or not to throw an exception.
        start_idx: int (optional)
            Index of ``Pipeline.run_steps`` to begin running the pipeline. All steps in
            ``Pipeline.run_steps`` after ``start_idx`` will be run in order. The default
            value is ``None``, which will not change the current ``Pipeline.run_step_idx``.
        current_step_idx: int (optional)
            Unused; kept for backwards compatibility with existing callers.
        """
        import inspect
        # ``None`` defaults avoid mutable default arguments
        if run_tags is None:
            run_tags = []
        if ignore_tags is None:
            ignore_tags = []
        # inspect.getargspec was removed in Python 3.11; prefer
        # getfullargspec when it exists, fall back on Python 2
        try:
            get_argspec = inspect.getfullargspec
        except AttributeError:
            get_argspec = inspect.getargspec
        # If no steps are specified and the user is not resuming a previous run,
        # run all of the steps associated with the pipeline
        if run_steps is not None:
            self.run_steps = run_steps
        elif self.run_steps is None or not resume:
            self.run_steps = [step for step in self.steps]
        # Filter the steps based on run_tags and ignore_tags, with ignore tags
        # taking precedent
        self.run_steps = [
            step for step in self.run_steps if
            (len(run_tags) == 0 or any(tag in run_tags for tag in step.tags)) and
            not any(tag in ignore_tags for tag in step.tags)]
        # Set the path of the log file for the current run. ``logfile`` stays
        # None when no 'log' path is configured, so the per-step save below
        # can be skipped (the original code attempted a pickle dump with an
        # unbound ``logfile`` in that case, masked by a bare ``except``)
        dill_dump = False
        logfile = None
        if 'log' in self.paths:
            if run_name is None:
                logfile = os.path.join(self.paths['log'], 'pipeline.p')
            else:
                logfile = os.path.join(self.paths['log'], 'pipeline-{0}.p'.format(run_name))
            logger.info('Pipeline state will be saved to {0}'.format(logfile))
            # Save the pipeline in the log directory
            try:
                import dill
                dill_dump = True
                with open(logfile, 'wb') as f:
                    dill.dump(self, f)
            except ImportError:
                warnings.warn(
                    'Pipeline requires "dill" package to save log file. '
                    'Attempting to use pickle')
                import pickle
                try:
                    with open(logfile, 'wb') as f:
                        pickle.dump(self, f)
                except Exception:
                    warnings.warn('Unable to dump using pickle, no log file will be saved')
        # If the user specifies a starting index use it, otherwise start at the
        # first step unless the user specified to resume where it left off
        if start_idx is not None:
            self.run_step_idx = start_idx
        elif not resume:
            self.run_step_idx = 0
        # Run each step in order
        steps = self.run_steps[self.run_step_idx:]
        for step in steps:
            logger.info('running step {0}: {1}'.format(step.step_id, step.tags))
            logger.debug('function kwargs: {0}'.format(step.func_kwargs))
            func_kwargs = step.func_kwargs.copy()
            func_args = get_argspec(step.func).args
            # Some functions use step_id to keep track of log files, so the id of
            # the current step is added to the function call
            if 'step_id' in func_args:
                func_kwargs['step_id'] = step.step_id
            # Some functions require the Pipeline as a parameter,
            # so pass the pipeline to the function
            if 'pipeline' in func_args:
                func_kwargs['pipeline'] = self
            # An explicit ``ignore_exceptions`` argument overrides the
            # per-step setting
            if ignore_exceptions is None:
                skip_exceptions = step.ignore_exceptions
            else:
                skip_exceptions = ignore_exceptions
            # Attempt to run the step. If an exception occurs, use
            # ``skip_exceptions`` to determine whether to stop the Pipeline's
            # execution or warn the user and continue
            if skip_exceptions:
                try:
                    result = step.func(**func_kwargs)
                except Exception:
                    import traceback
                    warning_str = "Exception occurred during step {0} (run_step_idx {1})".format(
                        step.step_id, self.run_step_idx)
                    warnings.warn(warning_str)
                    result = {
                        'status': 'error',
                        'error': traceback.format_exc()
                    }
            else:
                result = step.func(**func_kwargs)
            step.results = result
            # Check that the result is a dictionary with a 'status' key
            if result is None or not isinstance(result, dict) or 'status' not in result:
                warning_str = "Step {0} (run_step_idx {1}) did not return a valid result".format(
                    step.step_id, self.run_step_idx)
                warnings.warn(warning_str)
                result = {
                    'status': 'unknown',
                    'result': result
                }
            # If there was an error in the step, use ignore_errors to determine
            # whether or not to raise an exception
            if result['status'].lower() == 'error':
                # An explicit ``ignore_errors`` argument overrides the
                # per-step setting. (The original condition raised even when
                # a step had ``ignore_errors=True`` and no override was given,
                # because ``not None`` evaluates to True.)
                if ignore_errors is None:
                    skip_errors = step.ignore_errors
                else:
                    skip_errors = ignore_errors
                if not skip_errors:
                    raise PipelineError(
                        'Error returned in step {0} (run_step_idx {1})'.format(
                            step.step_id, self.run_step_idx
                        ))
                else:
                    warning_str = "Error in step {0} (run_step_idx{1})".format(
                        step.step_id, self.run_step_idx)
                    warning_str += ", see results for more"
                    warnings.warn(warning_str)
            # Increase the run_step_idx and save the pipeline (only when a
            # log file was configured above)
            self.run_step_idx += 1
            if logfile is not None:
                if dill_dump:
                    with open(logfile, 'wb') as f:
                        dill.dump(self, f)
                else:
                    try:
                        import pickle
                        with open(logfile, 'wb') as f:
                            pickle.dump(self, f)
                    except Exception:
                        warnings.warn('Unable to dump using pickle, no log file will be saved')
        result = {
            'status': 'success',
            'warnings': self.get_result_table('warnings', ['filename'])
        }
        return result

    def get_result_table(self, key, meta_fields=None):
        """
        Get a specific key from the results of each step in a pipeline that has already been
        run. This function expects the result for the given key to be an astropy Table that
        can be vstacked with the results from previous steps.

        Parameters
        ----------
        key: str
            Name of the key in the result dictionary for each step to extract. For example,
            ``key='warnings'`` will return a table with the warnings generated by each step
            in the pipeline.
        meta_fields: list (optional)
            List of metadata fields to include in each record. For example,
            astromatic_wrapper warning tables have a 'filename' metadata field, which
            gives the name of the XML file generated by the AstrOmatic code.

        Returns
        -------
        all_results: `astropy.table.Table`
            A table with the stacked results for ``key`` from every step that
            produced one, or ``None`` if no step did.
        """
        from astropy.table import vstack
        # ``None`` default avoids a mutable default list
        if meta_fields is None:
            meta_fields = []
        all_results = None
        for step in self.steps:
            result_tbl = None
            if step.results is not None:
                if key in step.results:
                    result_tbl = step.results[key]
                    # Add the step number and name of the function called in the step
                    # to the record for each item in the results table.
                    result_tbl['step'] = step.step_id
                    result_tbl['func'] = step.func.__name__
            if result_tbl is not None:
                for f in meta_fields:
                    if f in result_tbl.meta:
                        result_tbl[f] = result_tbl.meta[f]
                    else:
                        warnings.warn("'{0}' not found in table metadata".format(f))
                if all_results is None:
                    all_results = result_tbl
                else:
                    all_results = vstack([all_results, result_tbl])
        return all_results
class PipelineStep:
    """
    A single step in the pipeline. This takes a function and a set of tags and kwargs
    associated with it and stores them in the pipeline.
    """
    def __init__(self, func, step_id, tags=None, ignore_errors=False, ignore_exceptions=False,
                 func_kwargs=None):
        """
        Initialize a PipelineStep object

        Parameters
        ----------
        func: function
            The function to be run. All functions must return a dictionary with at a
            minimum a ``status`` key whose value is either ``success`` or ``error``.
        step_id: str
            Unique identifier for the step
        tags: list (optional)
            A list of tags used to identify the step. When running the pipeline the user
            can specify a set of conditions that will filter which steps are run (or not run)
            based on a set of specified tags
        ignore_errors: bool (optional)
            If ``ignore_errors==False`` the pipeline will raise an exception if an error
            occurred during this step in the pipeline, which returned a result with
            ``result['status']=='error'``. The default is ``False``.
        ignore_exceptions: bool (optional)
            If ``ignore_exceptions==True`` the pipeline will set ``result['status']=='error'``
            for the step that threw an exception and continue running. The default is
            ``ignore_exceptions==False``, which will stop the pipeline and raise an
            exception.
        func_kwargs: dict
            Keyword arguments passed to the ``func`` when the pipeline is run
        """
        self.func = func
        # ``None`` defaults avoid sharing one mutable list/dict across all
        # PipelineStep instances (the classic mutable-default pitfall)
        self.tags = [] if tags is None else tags
        self.step_id = step_id
        self.ignore_errors = ignore_errors
        self.ignore_exceptions = ignore_exceptions
        self.func_kwargs = {} if func_kwargs is None else func_kwargs
        # Result dictionary is filled in by Pipeline.run()
        self.results = None