Pipeline helper
Pipeline helper class for creating pipelines that load their modules from a flexible manifest.
AMLPipelineHelper
Helper class for building pipelines
__init__(self, config, module_loader=None)
special
Constructs the pipeline helper.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | DictConfig | config for this object | required |
module_loader | AMLModuleLoader | which module loader to (re)use | None |
Source code in shrike/pipeline/pipeline_helper.py
def __init__(self, config, module_loader=None):
"""Constructs the pipeline helper.
Args:
config (DictConfig): config for this object
module_loader (AMLModuleLoader): which module loader to (re)use
"""
self.config = config
if module_loader is None:
log.info(
f"Creating instance of AMLModuleLoader for {self.__class__.__name__}"
)
self.module_loader = AMLModuleLoader(self.config)
else:
self.module_loader = module_loader
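Helpers are normally constructed for you by main(), but a direct instantiation only needs a DictConfig. A minimal sketch, assuming MyPipeline and MySubgraph are your own AMLPipelineHelper subclasses (both names are illustrative); passing module_loader lets two helpers share one loader cache.

```python
from omegaconf import OmegaConf

config = OmegaConf.create({"run": {"verbose": False}})  # minimal illustrative config

helper = MyPipeline(config)  # creates its own AMLModuleLoader from the config
subgraph_helper = MySubgraph(config, module_loader=helper.module_loader)  # reuses the same loader
```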
apply_recommended_runsettings(self, module_name, module_instance, gpu=False, hdi='auto', windows='auto', parallel='auto', mpi='auto', scope='auto', datatransfer='auto', sweep='auto', **custom_runtime_arguments)
Applies recommended runsettings to a given module.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
module_name | str | name of the module from the module manifest (required_modules() method) | required |
module_instance | Module | the AML module we need to add settings to | required |
gpu | bool | is the module using GPU? | False |
hdi | bool | is the module using HDI/Spark? | 'auto' |
windows | bool | is the module using Windows compute? | 'auto' |
parallel | bool | is the module using ParallelRunStep? | 'auto' |
mpi | bool | is the module using MPI? | 'auto' |
scope | bool | is the module using Scope? | 'auto' |
datatransfer | bool | is the module using DataTransfer? | 'auto' |
sweep | bool | is the module using Sweep? | 'auto' |
custom_runtime_arguments | dict | any additional custom args | {} |
Source code in shrike/pipeline/pipeline_helper.py
def apply_recommended_runsettings(
self,
module_name,
module_instance,
gpu=False, # can't autodetect that
hdi="auto",
windows="auto",
parallel="auto",
mpi="auto",
scope="auto",
datatransfer="auto",
sweep="auto",
**custom_runtime_arguments,
):
"""Applies regular settings for a given module.
Args:
module_name (str): name of the module from the module manifest (required_modules() method)
module_instance (Module): the AML module we need to add settings to
gpu (bool): is the module using GPU?
hdi (bool): is the module using HDI/Spark?
windows (bool): is the module using Windows compute?
parallel (bool): is the module using ParallelRunStep?
mpi (bool): is the module using Mpi?
custom_runtime_arguments (dict): any additional custom args
"""
# verifies if module_name corresponds to module_instance
self._check_module_runsettings_consistency(module_name, module_instance)
# Auto detect runsettings
if hdi == "auto":
hdi = str(module_instance.type) == "HDInsightComponent"
if hdi:
log.info(f"Module {module_name} detected as HDI: {hdi}")
if parallel == "auto":
parallel = str(module_instance.type) == "ParallelComponent"
if parallel:
log.info(f"Module {module_name} detected as PARALLEL: {parallel}")
if mpi == "auto":
mpi = str(module_instance.type) == "DistributedComponent"
if mpi:
log.info(f"Module {module_name} detected as MPI: {mpi}")
if scope == "auto":
scope = str(module_instance.type) == "ScopeComponent"
if scope:
log.info(f"Module {module_name} detected as SCOPE: {scope}")
if sweep == "auto":
sweep = str(module_instance.type) == "SweepComponent"
if sweep:
log.info(f"Module {module_name} detected as SweepComponent: {sweep}")
if windows == "auto":
if (
str(module_instance.type) == "HDInsightComponent"
or str(module_instance.type) == "ScopeComponent"
or str(module_instance.type) == "DataTransferComponent"
or str(module_instance.type) == "SweepComponent"
):
# HDI/scope/datatransfer/sweep modules might not have that environment object
windows = False
else:
windows = (
module_instance._definition.environment.os.lower() == "windows"
)
if windows:
log.info(f"Module {module_name} detected as WINDOWS: {windows}")
if datatransfer == "auto":
datatransfer = str(module_instance.type) == "DataTransferComponent"
if datatransfer:
log.info(
f"Module {module_name} detected as DATATRANSFER: {datatransfer}"
)
if parallel:
self._apply_parallel_runsettings(
module_name,
module_instance,
windows=windows,
gpu=gpu,
**custom_runtime_arguments,
)
return
if sweep:
self._apply_sweep_runsettings(
module_name,
module_instance,
windows=windows,
gpu=gpu,
**custom_runtime_arguments,
)
return
if windows:
# no detonation chamber, we can't use "use_local" here
self._apply_windows_runsettings(
module_name, module_instance, mpi=mpi, **custom_runtime_arguments
)
return
if hdi:
# no detonation chamber, we can't use "use_local" here
self._apply_hdi_runsettings(
module_name, module_instance, **custom_runtime_arguments
)
return
if scope:
self._apply_scope_runsettings(
module_name, module_instance, **custom_runtime_arguments
)
return
if datatransfer:
self._apply_datatransfer_runsettings(
module_name, module_instance, **custom_runtime_arguments
)
return
self._apply_linux_runsettings(
module_name, module_instance, mpi=mpi, gpu=gpu, **custom_runtime_arguments
)
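A minimal usage sketch, typically written inside the pipeline function returned by build(). The manifest key "TrainModel" and its input port name are assumptions; gpu is passed explicitly because it cannot be auto-detected from the component type.

```python
# Inside the pipeline function returned by build() (illustrative only):
train_step = self.module_load("TrainModel")(training_data=training_data)  # assumed key and port
self.apply_recommended_runsettings(
    "TrainModel",   # must match the key used in required_modules()
    train_step,
    gpu=True,       # GPU usage is never auto-detected, so state it explicitly
)
```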
apply_smart_runsettings(self, component_instance, gpu=False, hdi='auto', windows='auto', parallel='auto', mpi='auto', scope='auto', datatransfer='auto', sweep='auto', **custom_runtime_arguments)
Applies recommended runsettings to a given component, inferring the component name from the instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_instance | Component | the AML component we need to add settings to | required |
gpu | bool | is the component using GPU? | False |
hdi | bool | is the component using HDI/Spark? | 'auto' |
windows | bool | is the component using Windows compute? | 'auto' |
parallel | bool | is the component using ParallelRunStep? | 'auto' |
mpi | bool | is the component using MPI? | 'auto' |
scope | bool | is the component using Scope? | 'auto' |
datatransfer | bool | is the component using DataTransfer? | 'auto' |
sweep | bool | is the component using Sweep? | 'auto' |
custom_runtime_arguments | dict | any additional custom args | {} |
Source code in shrike/pipeline/pipeline_helper.py
def apply_smart_runsettings(
self,
component_instance,
gpu=False, # can't autodetect that
hdi="auto",
windows="auto",
parallel="auto",
mpi="auto",
scope="auto",
datatransfer="auto",
sweep="auto",
**custom_runtime_arguments,
):
"""Applies regular settings for a given component.
Args:
component_instance (Component): the AML component we need to add settings to
gpu (bool): is the component using GPU?
hdi (bool): is the component using HDI/Spark?
windows (bool): is the component using Windows compute?
parallel (bool): is the component using ParallelRunStep?
mpi (bool): is the component using Mpi?
scope (bool): is the component using scope?
datatransfer (bool): is the component using datatransfer?
sweep (bool): is the component using sweep?
custom_runtime_arguments (dict): any additional custom args
"""
# infer component_name
component_name = self._get_component_name_from_instance(component_instance)
self.apply_recommended_runsettings(
component_name,
component_instance,
gpu,
hdi,
windows,
parallel,
mpi,
scope,
datatransfer,
sweep,
**custom_runtime_arguments,
)
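A minimal sketch of the "smart" variant: the component name is inferred from the instance, so only the instance and any explicit overrides are passed. The key "PrepData" and its input port are assumptions.

```python
# Inside the pipeline function returned by build() (illustrative only):
prep_step = self.component_load("PrepData")(raw_data=raw_dataset)  # assumed key and port
self.apply_smart_runsettings(prep_step)  # hdi/parallel/mpi/scope/sweep/windows are auto-detected
```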
build(self, config)
Builds a pipeline function for this pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | DictConfig | configuration object (see get_config_class()) | required |

Returns:

Type | Description |
---|---|
pipeline_function | the function to create your pipeline |
Source code in shrike/pipeline/pipeline_helper.py
def build(self, config):
"""Builds a pipeline function for this pipeline.
Args:
config (DictConfig): configuration object (see get_config_class())
Returns:
pipeline_function: the function to create your pipeline
"""
raise NotImplementedError("You need to implement your build() method.")
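A hedged sketch of a build() override. It assumes the azure.ml.component dsl decorator commonly used with shrike, plus an illustrative manifest key ("PrepData") and output port name; adapt all of these to your own components.

```python
from azure.ml.component import dsl  # assumed SDK import; adjust to your environment

def build(self, config):
    @dsl.pipeline(name="demo-pipeline", description="illustrative pipeline")
    def demo_pipeline(input_dataset):
        prep_step = self.component_load("PrepData")(raw_data=input_dataset)  # assumed key/port
        self.apply_smart_runsettings(prep_step)
        return {"prepped_data": prep_step.outputs.output_data}  # assumed output port name
    return demo_pipeline
```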
canary(self, args, experiment, pipeline_run)
Tests the output of the pipeline
Source code in shrike/pipeline/pipeline_helper.py
def canary(self, args, experiment, pipeline_run):
"""Tests the output of the pipeline"""
pass
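A hedged sketch of a canary() override; run() calls it after the pipeline completes when canary mode is enabled. The check below is only a placeholder for whatever output validation your pipeline needs.

```python
def canary(self, args, experiment, pipeline_run):
    # Placeholder validation: assert the run reached a successful terminal state.
    assert pipeline_run.get_status() in ("Finished", "Completed")
```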
component_load(self, component_key)
Loads one component from the manifest
Source code in shrike/pipeline/pipeline_helper.py
def component_load(self, component_key) -> Callable[..., "Component"]:
"""Loads one component from the manifest"""
return self.module_loader.load_module(component_key, self.required_modules())
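A short sketch, assuming "EvalModel" is a key declared in required_modules() and that the component exposes the input ports shown (both are assumptions).

```python
eval_component = self.component_load("EvalModel")            # returns a callable component
eval_step = eval_component(predictions=preds, labels=gold)   # port names depend on the spec
```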
connect(self)
Connect to the AML workspace using internal config
Source code in shrike/pipeline/pipeline_helper.py
def connect(self):
"""Connect to the AML workspace using internal config"""
return azureml_connect(
aml_subscription_id=self.config.aml.subscription_id,
aml_resource_group=self.config.aml.resource_group,
aml_workspace_name=self.config.aml.workspace_name,
aml_auth=self.config.aml.auth,
aml_tenant=self.config.aml.tenant,
aml_force=self.config.aml.force,
) # NOTE: this also stores aml workspace in internal global variable
dataset_load(self, name, version='latest')
Loads a dataset by either id or name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name or uuid of dataset to load |
required |
version |
str |
if loading by name, used to specify version (default "latest") |
'latest' |
NOTE: the AzureML SDK has two different methods for loading a dataset, one by id and one by name. This method wraps them into one.
Source code in shrike/pipeline/pipeline_helper.py
def dataset_load(self, name, version="latest"):
"""Loads a dataset by either id or name.
Args:
name (str): name or uuid of dataset to load
version (str): if loading by name, used to specify version (default "latest")
NOTE: in AzureML SDK there are 2 different methods for loading dataset
one for id, one for name. This method just wraps them up in one."""
# test if given name is a uuid
try:
parsed_uuid = uuid.UUID(name)
log.info(f"Getting a dataset handle [id={name}]...")
return Dataset.get_by_id(self.workspace(), id=name)
except ValueError:
log.info(f"Getting a dataset handle [name={name} version={version}]...")
return Dataset.get_by_name(self.workspace(), name=name, version=version)
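A short sketch showing both lookup paths; the dataset name is illustrative and the UUID is a placeholder. The helper parses the string and dispatches to get_by_id() or get_by_name() accordingly.

```python
# By name (with optional version); the name is an assumption about your workspace:
train_data = self.dataset_load("my_training_dataset", version="latest")

# By id; any string that parses as a UUID is treated as a dataset id:
scored_data = self.dataset_load("12345678-1234-1234-1234-123456789abc")
```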
get_config_class()
classmethod
Returns a dataclass containing config for this pipeline
Source code in shrike/pipeline/pipeline_helper.py
@classmethod
def get_config_class(cls):
"""Returns a dataclass containing config for this pipeline"""
pass
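A hedged sketch of a get_config_class() override using a plain dataclass; the class and field names are illustrative, and how the fields surface as a config group depends on shrike's config merging for your version.

```python
from dataclasses import dataclass

@dataclass
class demopipeline:  # illustrative config group for this pipeline
    input_dataset_name: str = "my_training_dataset"
    learning_rate: float = 0.001

@classmethod
def get_config_class(cls):
    return demopipeline
```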
is_eyesoff(self)
Check whether the workspace is eyes-off. If it lives in a non-Torus tenant, it is eyes-off; if it lives in the Torus tenant, check whether it is in the allow-list of eyes-on Torus subscriptions.
Source code in shrike/pipeline/pipeline_helper.py
def is_eyesoff(self) -> bool:
"""
Check whether the workspace is eyes-off.
If it lives in a non-Torus tenant, then eyes-off;
If in Torus tenant, check whether it is in the allow-list of eyes-on Torus subscriptions.
"""
workspace = self.workspace()
tenant_id = workspace.get_details()["identity"]["tenant_id"]
subscription_id = workspace.subscription_id
return is_eyesoff_helper(tenant_id, subscription_id)
main()
classmethod
Pipeline helper main function, parses arguments and run pipeline.
Source code in shrike/pipeline/pipeline_helper.py
@classmethod
def main(cls):
"""Pipeline helper main function, parses arguments and run pipeline."""
config_dict = cls._default_config()
@hydra.main(config_name="default")
def hydra_run(cfg: DictConfig):
# merge cli config with default config
cfg = OmegaConf.merge(config_dict, cfg)
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument("--config-dir")
args, _ = arg_parser.parse_known_args()
cfg.run.config_dir = os.path.join(
HydraConfig.get().runtime.cwd, args.config_dir
)
log.info("*** CONFIGURATION ***")
log.info(OmegaConf.to_yaml(cfg))
# create class instance
main_instance = cls(cfg)
# run
main_instance.run()
hydra_run()
return cls.BUILT_PIPELINE # return so we can have some unit tests done
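A typical entry-point sketch, assuming MyPipeline is your AMLPipelineHelper subclass implementing build() and pipeline_instance(); Hydra/CLI arguments are parsed inside main().

```python
# pipelines/demo.py (illustrative file name)
if __name__ == "__main__":
    MyPipeline.main()
```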
module_load(self, module_key)
Loads one module from the manifest
Source code in shrike/pipeline/pipeline_helper.py
def module_load(self, module_key):
"""Loads one module from the manifest"""
return self.module_loader.load_module(module_key, self.required_modules())
pipeline_instance(self, pipeline_function, config)
Creates an instance of the pipeline using arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_function | function | the pipeline function obtained from self.build() | required |
config | DictConfig | configuration object (see get_config_class()) | required |

Returns:

Type | Description |
---|---|
pipeline | the instance constructed using build() function |
Source code in shrike/pipeline/pipeline_helper.py
def pipeline_instance(self, pipeline_function, config):
"""Creates an instance of the pipeline using arguments.
Args:
pipeline_function (function): the pipeline function obtained from self.build()
config (DictConfig): configuration object (see get_config_class())
Returns:
pipeline: the instance constructed using build() function
"""
raise NotImplementedError(
"You need to implement your pipeline_instance() method."
)
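A hedged sketch of a pipeline_instance() override: load inputs from config and call the pipeline function returned by build(). The config key and parameter name are assumptions tied to the illustrative sketches above.

```python
def pipeline_instance(self, pipeline_function, config):
    # The config path below assumes the illustrative demopipeline config group.
    input_dataset = self.dataset_load(config.demopipeline.input_dataset_name)
    return pipeline_function(input_dataset=input_dataset)
```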
required_modules()
classmethod
Dependencies on modules/components
Returns:
Type | Description |
---|---|
dict[str, dict] | manifest |
Source code in shrike/pipeline/pipeline_helper.py
@classmethod
def required_modules(cls):
"""Dependencies on modules/components
Returns:
dict[str, dict]: manifest
"""
return {}
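A hedged sketch of a manifest entry; the key "PrepData" is arbitrary, and the nested field names reflect typical shrike manifests but should be checked against the schema your shrike version expects.

```python
@classmethod
def required_modules(cls):
    return {
        "PrepData": {  # key referenced by module_load()/component_load()
            "remote_module_name": "prep_data",               # assumed registered component name
            "namespace": "my_team",                          # assumed namespace
            "version": None,                                 # assumed version pin (None for default)
            "yaml_spec": "components/prep_data/spec.yaml",   # assumed local spec path
        }
    }
```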
required_subgraphs()
classmethod
Dependencies on other subgraphs
Returns:
Type | Description |
---|---|
dict[str, AMLPipelineHelper] | dictionary of subgraphs used for building this one; keys are whatever strings you want for building your graph, values should be classes inheriting from AMLPipelineHelper |
Source code in shrike/pipeline/pipeline_helper.py
@classmethod
def required_subgraphs(cls):
"""Dependencies on other subgraphs
Returns:
dict[str, AMLPipelineHelper]: dictionary of subgraphs used for building this one.
keys are whatever string you want for building your graph
values should be classes inheriting from AMLPipelineHelper.
"""
return {}
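A short sketch; "DataPrep" is an arbitrary key and DataPrepGraph is an assumed AMLPipelineHelper subclass defined elsewhere in your project.

```python
@classmethod
def required_subgraphs(cls):
    return {"DataPrep": DataPrepGraph}
```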
run(self)
Run pipeline using arguments
Source code in shrike/pipeline/pipeline_helper.py
def run(self):
"""Run pipeline using arguments"""
# set logging level
if self.config.run.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Log the telemetry information in the Azure Application Insights
try:
telemetry_logger = TelemetryLogger(
enable_telemetry=not self.config.run.disable_telemetry
)
telemetry_logger.log_trace(
message=f"shrike.pipeline=={__version__}",
properties={"custom_dimensions": {"configuration": str(self.config)}},
)
except Exception as ex:
log.debug(
f"Sending trace log messages to application insight is not successful. The exception message: {ex}"
)
# Check whether the experiment name is valid
self.validate_experiment_name(self.config.run.experiment_name)
self.repository_info = get_repo_info()
log.info(f"Running from repository: {self.repository_info}")
log.info(f"azureml.core.VERSION = {azureml.core.VERSION}")
self.connect()
pipeline_run = None
if self.config.run.resume:
if not self.config.run.pipeline_run_id:
raise Exception(
"To be able to use --resume you need to provide both --experiment-name and --run-id."
)
log.info(f"Resuming Experiment {self.config.run.experiment_name}...")
experiment = Experiment(
current_workspace(), self.config.run.experiment_name
)
log.info(f"Resuming PipelineRun {self.config.run.pipeline_run_id}...")
# pipeline_run is of the class "azureml.pipeline.core.PipelineRun"
pipeline_run = PipelineRun(experiment, self.config.run.pipeline_run_id)
else:
keep_modified_files, override = False, False
yaml_to_be_recovered = []
if self.config.tenant_overrides.allow_override:
log.info("Check if tenant is consistent with spec yaml")
override, mapping = self._check_if_spec_yaml_override_is_needed()
if override:
try:
tenant = self.config.aml.tenant
log.info(
f"Performing spec yaml override to adapt to tenant: {tenant}."
)
yaml_to_be_recovered = self._override_spec_yaml(mapping)
keep_modified_files = (
self.config.tenant_overrides.keep_modified_files
)
except BaseException as e:
log.error(f"An error occurred, override is not successful: {e}")
try:
pipeline_run = self.build_and_submit_new_pipeline()
except BaseException as e:
log.error(f"An error {e} occurred during pipeline submission.")
if override:
log.info("Now trying to recover overrides.")
self._recover_tenant_overrides(override, yaml_to_be_recovered, False)
raise
else:
self._recover_tenant_overrides(
override, yaml_to_be_recovered, keep_modified_files
)
if not pipeline_run:
# not submitting code, exit now
return
# launch the pipeline execution
log.info(f"Pipeline Run Id: {pipeline_run.id}")
log.info(
f"""
#################################
#################################
#################################
Follow link below to access your pipeline run directly:
-------------------------------------------------------
{pipeline_run.get_portal_url()}
#################################
#################################
#################################
"""
)
if self.config.run.canary:
log.info(
"*** CANARY MODE ***\n----------------------------------------------------------"
)
pipeline_run.wait_for_completion(show_output=True)
# azureml.pipeline.core.PipelineRun.get_status(): ["Running", "Finished", "Failed"]
# azureml.core.run.get_status(): ["Running", "Completed", "Failed"]
if pipeline_run.get_status() in ["Finished", "Completed"]:
log.info("*** PIPELINE FINISHED, TESTING WITH canary() METHOD ***")
self.canary(self.config, pipeline_run.experiment, pipeline_run)
log.info("OK")
elif pipeline_run.get_status() == "Failed":
log.info("*** PIPELINE FAILED ***")
raise Exception("Pipeline failed.")
else:
log.info(f"*** PIPELINE STATUS {pipeline_run.get_status()} UNKNOWN ***")
raise Exception("Pipeline status is unknown.")
else:
if not self.config.run.silent:
webbrowser.open(url=pipeline_run.get_portal_url())
# This will wait for the completion of the pipeline execution
# and show the full logs in the meantime
if self.config.run.resume or self.config.run.wait:
log.info(
"Below are the raw debug logs from your pipeline execution:\n----------------------------------------------------------"
)
pipeline_run.wait_for_completion(show_output=True)
subgraph_load(self, subgraph_key, custom_config={})
Loads one subgraph from the manifest
Parameters:
Name | Type | Description | Default |
---|---|---|---|
subgraph_key | str | subgraph identifier that is used in the required_subgraphs() method | required |
custom_config | DictConfig | custom configuration object; this custom object will be added to the pipeline config | {} |
Source code in shrike/pipeline/pipeline_helper.py
def subgraph_load(self, subgraph_key, custom_config=OmegaConf.create()) -> Callable:
"""Loads one subgraph from the manifest
Args:
subgraph_key (str): subgraph identifier that is used in the required_subgraphs() method
custom_config (DictConfig): custom configuration object, this custom object will be
added to the pipeline config
"""
subgraph_class = self.required_subgraphs()[subgraph_key]
subgraph_config = self.config.copy()
if custom_config:
with open_dict(subgraph_config):
for key in custom_config:
subgraph_config[key] = custom_config[key]
log.info(f"Building subgraph [{subgraph_key} as {subgraph_class.__name__}]...")
# NOTE: below creates subgraph with updated pipeline_config
subgraph_instance = subgraph_class(
config=subgraph_config, module_loader=self.module_loader
)
# subgraph_instance.setup(self.pipeline_config)
return subgraph_instance.build(subgraph_config)
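A short sketch, typically inside the pipeline function returned by build(); the key "DataPrep" must match required_subgraphs(), and the override group/parameter names are illustrative.

```python
from omegaconf import OmegaConf

data_prep = self.subgraph_load(
    "DataPrep",
    custom_config=OmegaConf.create({"dataprep": {"sample_rate": 0.1}}),  # illustrative override
)
prep_outputs = data_prep(input_dataset)  # wiring depends on the subgraph's own build()
```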
validate_experiment_name(name)
staticmethod
Check whether the experiment name is valid. Experiment names must be between 1 and 250 characters long and start with a letter or a number. Valid characters are letters, numbers, "_", and "-".
Source code in shrike/pipeline/pipeline_helper.py
@staticmethod
def validate_experiment_name(name):
"""
Check whether the experiment name is valid. It's required that
experiment names must be between 1 to 250 characters, start with
letters or numbers. Valid characters are letters, numbers, "_",
and the "-" character.
"""
if len(name) < 1 or len(name) > 250:
raise ValueError("Experiment names must be between 1 to 250 characters!")
if not re.match("^[a-zA-Z0-9]$", name[0]):
raise ValueError("Experiment names must start with letters or numbers!")
if not re.match("^[a-zA-Z0-9_-]*$", name):
raise ValueError(
"Valid experiment names must only contain letters, numbers, underscore and dash!"
)
return True
workspace(self)
Gets the current workspace
Source code in shrike/pipeline/pipeline_helper.py
def workspace(self):
"""Gets the current workspace"""
return current_workspace()