Pipeline helper

Pipeline helper class to create pipelines loading modules from a flexible manifest.

AMLPipelineHelper

Helper class for building pipelines
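
In practice, a concrete pipeline is written as a subclass that overrides the methods documented below. The following is a minimal, hedged sketch (class name and method bodies are illustrative placeholders, not part of shrike itself):

from shrike.pipeline.pipeline_helper import AMLPipelineHelper

class MyPipeline(AMLPipelineHelper):
    @classmethod
    def required_modules(cls):
        # declare the components/modules this graph needs (see required_modules() below)
        return {}

    def build(self, config):
        # return a pipeline function wiring components together (see build() below)
        raise NotImplementedError

    def pipeline_instance(self, pipeline_function, config):
        # feed datasets and parameters into the pipeline function (see pipeline_instance() below)
        raise NotImplementedError

if __name__ == "__main__":
    MyPipeline.main()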

__init__(self, config, module_loader=None) special

Constructs the pipeline helper.

Parameters:

Name           Type             Description                     Default
config         DictConfig       config for this object          required
module_loader  AMLModuleLoader  which module loader to (re)use  None
Source code in shrike/pipeline/pipeline_helper.py
def __init__(self, config, module_loader=None):
    """Constructs the pipeline helper.

    Args:
        config (DictConfig): config for this object
        module_loader (AMLModuleLoader): which module loader to (re)use
    """
    self.config = config

    if module_loader is None:
        log.info(
            f"Creating instance of AMLModuleLoader for {self.__class__.__name__}"
        )
        self.module_loader = AMLModuleLoader(self.config)
    else:
        self.module_loader = module_loader

apply_recommended_runsettings(self, module_name, module_instance, gpu=False, hdi='auto', windows='auto', parallel='auto', mpi='auto', scope='auto', datatransfer='auto', sweep='auto', **custom_runtime_arguments)

Applies regular settings for a given module.

Parameters:

Name                      Type    Description                                                                Default
module_name               str     name of the module from the module manifest (required_modules() method)   required
module_instance           Module  the AML module we need to add settings to                                 required
gpu                       bool    is the module using GPU?                                                   False
hdi                       bool    is the module using HDI/Spark?                                             'auto'
windows                   bool    is the module using Windows compute?                                       'auto'
parallel                  bool    is the module using ParallelRunStep?                                       'auto'
mpi                       bool    is the module using Mpi?                                                   'auto'
scope                     bool    is the module using scope?                                                 'auto'
datatransfer              bool    is the module using datatransfer?                                          'auto'
sweep                     bool    is the module using sweep?                                                 'auto'
custom_runtime_arguments  dict    any additional custom args                                                 {}
Source code in shrike/pipeline/pipeline_helper.py
def apply_recommended_runsettings(
    self,
    module_name,
    module_instance,
    gpu=False,  # can't autodetect that
    hdi="auto",
    windows="auto",
    parallel="auto",
    mpi="auto",
    scope="auto",
    datatransfer="auto",
    sweep="auto",
    **custom_runtime_arguments,
):
    """Applies regular settings for a given module.

    Args:
        module_name (str): name of the module from the module manifest (required_modules() method)
        module_instance (Module): the AML module we need to add settings to
        gpu (bool): is the module using GPU?
        hdi (bool): is the module using HDI/Spark?
        windows (bool): is the module using Windows compute?
        parallel (bool): is the module using ParallelRunStep?
        mpi (bool): is the module using Mpi?
        scope (bool): is the module using scope?
        datatransfer (bool): is the module using datatransfer?
        sweep (bool): is the module using sweep?
        custom_runtime_arguments (dict): any additional custom args
    """
    # verifies if module_name corresponds to module_instance
    self._check_module_runsettings_consistency(module_name, module_instance)

    # Auto detect runsettings
    if hdi == "auto":
        hdi = str(module_instance.type) == "HDInsightComponent"
        if hdi:
            log.info(f"Module {module_name} detected as HDI: {hdi}")

    if parallel == "auto":
        parallel = str(module_instance.type) == "ParallelComponent"
        if parallel:
            log.info(f"Module {module_name} detected as PARALLEL: {parallel}")

    if mpi == "auto":
        mpi = str(module_instance.type) == "DistributedComponent"
        if mpi:
            log.info(f"Module {module_name} detected as MPI: {mpi}")

    if scope == "auto":
        scope = str(module_instance.type) == "ScopeComponent"
        if scope:
            log.info(f"Module {module_name} detected as SCOPE: {scope}")

    if sweep == "auto":
        sweep = str(module_instance.type) == "SweepComponent"
        if sweep:
            log.info(f"Module {module_name} detected as SweepComponent: {sweep}")

    if windows == "auto":
        if (
            str(module_instance.type) == "HDInsightComponent"
            or str(module_instance.type) == "ScopeComponent"
            or str(module_instance.type) == "DataTransferComponent"
            or str(module_instance.type) == "SweepComponent"
        ):
            # HDI/scope/datatransfer/sweep modules might not have that environment object
            windows = False
        else:
            windows = (
                module_instance._definition.environment.os.lower() == "windows"
            )
            if windows:
                log.info(f"Module {module_name} detected as WINDOWS: {windows}")

    if datatransfer == "auto":
        datatransfer = str(module_instance.type) == "DataTransferComponent"
        if datatransfer:
            log.info(
                f"Module {module_name} detected as DATATRANSFER: {datatransfer}"
            )

    if parallel:
        self._apply_parallel_runsettings(
            module_name,
            module_instance,
            windows=windows,
            gpu=gpu,
            **custom_runtime_arguments,
        )
        return

    if sweep:
        self._apply_sweep_runsettings(
            module_name,
            module_instance,
            windows=windows,
            gpu=gpu,
            **custom_runtime_arguments,
        )
        return

    if windows:
        # no detonation chamber, we can't use "use_local" here
        self._apply_windows_runsettings(
            module_name, module_instance, mpi=mpi, **custom_runtime_arguments
        )
        return

    if hdi:
        # no detonation chamber, we can't use "use_local" here
        self._apply_hdi_runsettings(
            module_name, module_instance, **custom_runtime_arguments
        )
        return

    if scope:
        self._apply_scope_runsettings(
            module_name, module_instance, **custom_runtime_arguments
        )
        return

    if datatransfer:
        self._apply_datatransfer_runsettings(
            module_name, module_instance, **custom_runtime_arguments
        )
        return

    self._apply_linux_runsettings(
        module_name, module_instance, mpi=mpi, gpu=gpu, **custom_runtime_arguments
    )

apply_smart_runsettings(self, component_instance, gpu=False, hdi='auto', windows='auto', parallel='auto', mpi='auto', scope='auto', datatransfer='auto', sweep='auto', **custom_runtime_arguments)

Applies regular settings for a given component.

Parameters:

Name                      Type       Description                                    Default
component_instance        Component  the AML component we need to add settings to  required
gpu                       bool       is the component using GPU?                    False
hdi                       bool       is the component using HDI/Spark?              'auto'
windows                   bool       is the component using Windows compute?        'auto'
parallel                  bool       is the component using ParallelRunStep?        'auto'
mpi                       bool       is the component using Mpi?                    'auto'
scope                     bool       is the component using scope?                  'auto'
datatransfer              bool       is the component using datatransfer?           'auto'
sweep                     bool       is the component using sweep?                  'auto'
custom_runtime_arguments  dict       any additional custom args                     {}
Source code in shrike/pipeline/pipeline_helper.py
def apply_smart_runsettings(
    self,
    component_instance,
    gpu=False,  # can't autodetect that
    hdi="auto",
    windows="auto",
    parallel="auto",
    mpi="auto",
    scope="auto",
    datatransfer="auto",
    sweep="auto",
    **custom_runtime_arguments,
):
    """Applies regular settings for a given component.

    Args:
        component_instance (Component): the AML component we need to add settings to
        gpu (bool): is the component using GPU?
        hdi (bool): is the component using HDI/Spark?
        windows (bool): is the component using Windows compute?
        parallel (bool): is the component using ParallelRunStep?
        mpi (bool): is the component using Mpi?
        scope (bool): is the component using scope?
        datatransfer (bool): is the component using datatransfer?
        sweep (bool): is the component using sweep?
        custom_runtime_arguments (dict): any additional custom args
    """
    # infer component_name
    component_name = self._get_component_name_from_instance(component_instance)
    self.apply_recommended_runsettings(
        component_name,
        component_instance,
        gpu,
        hdi,
        windows,
        parallel,
        mpi,
        scope,
        datatransfer,
        sweep,
        **custom_runtime_arguments,
    )
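
Because the component name is inferred from the instance, the smart variant drops the explicit name. A hedged sketch, with the same illustrative assumptions as above:

# inside the pipeline function returned by build()
train_step = train(train_data=train_data)
self.apply_smart_runsettings(train_step, gpu=True)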

build(self, config)

Builds a pipeline function for this pipeline.

Parameters:

Name    Type        Description                                     Default
config  DictConfig  configuration object (see get_config_class())  required

Returns:

Type               Description
pipeline_function  the function to create your pipeline

Source code in shrike/pipeline/pipeline_helper.py
def build(self, config):
    """Builds a pipeline function for this pipeline.

    Args:
        config (DictConfig): configuration object (see get_config_class())

    Returns:
        pipeline_function: the function to create your pipeline
    """
    raise NotImplementedError("You need to implement your build() method.")
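
A minimal sketch of a build() override, assuming the azure.ml.component dsl decorator that shrike pipelines are typically written with; the component key and input/output names are illustrative assumptions:

def build(self, config):
    from azure.ml.component import dsl  # assumption: the azure-ml-component SDK is used

    @dsl.pipeline(name="demo-pipeline", description="illustrative pipeline")
    def demo_pipeline(train_data):
        # "TrainComponent" must be declared in required_modules()
        train = self.component_load("TrainComponent")
        train_step = train(train_data=train_data)
        self.apply_smart_runsettings(train_step)
        return {"model": train_step.outputs.model}

    return demo_pipeline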

canary(self, args, experiment, pipeline_run)

Tests the output of the pipeline

Source code in shrike/pipeline/pipeline_helper.py
def canary(self, args, experiment, pipeline_run):
    """Tests the output of the pipeline"""
    pass

component_load(self, component_key)

Loads one component from the manifest

Source code in shrike/pipeline/pipeline_helper.py
def component_load(self, component_key) -> Callable[..., "Component"]:
    """Loads one component from the manifest"""
    return self.module_loader.load_module(component_key, self.required_modules())

connect(self)

Connect to the AML workspace using internal config

Source code in shrike/pipeline/pipeline_helper.py
def connect(self):
    """Connect to the AML workspace using internal config"""
    return azureml_connect(
        aml_subscription_id=self.config.aml.subscription_id,
        aml_resource_group=self.config.aml.resource_group,
        aml_workspace_name=self.config.aml.workspace_name,
        aml_auth=self.config.aml.auth,
        aml_tenant=self.config.aml.tenant,
        aml_force=self.config.aml.force,
    )  # NOTE: this also stores aml workspace in internal global variable

dataset_load(self, name, version='latest')

Loads a dataset by either id or name.

Parameters:

Name     Type  Description                                                      Default
name     str   name or uuid of dataset to load                                  required
version  str   if loading by name, used to specify version (default "latest")  'latest'

NOTE: in the AzureML SDK there are two different methods for loading a dataset, one by id and one by name. This method just wraps them up in one.

Source code in shrike/pipeline/pipeline_helper.py
def dataset_load(self, name, version="latest"):
    """Loads a dataset by either id or name.

    Args:
        name (str): name or uuid of dataset to load
        version (str): if loading by name, used to specify version (default "latest")

    NOTE: in the AzureML SDK there are two different methods for loading a dataset,
    one by id and one by name. This method just wraps them up in one."""
    # test if given name is a uuid
    try:
        parsed_uuid = uuid.UUID(name)
        log.info(f"Getting a dataset handle [id={name}]...")
        return Dataset.get_by_id(self.workspace(), id=name)
    except ValueError:
        log.info(f"Getting a dataset handle [name={name} version={version}]...")
        return Dataset.get_by_name(self.workspace(), name=name, version=version)
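
Both paths go through this single call. A hedged sketch, typically placed in pipeline_instance(); the dataset names and uuid are illustrative:

# load by registered name, pinning a specific version
train_data = self.dataset_load(name="my_training_dataset", version="2")

# load by registered name, defaulting to the latest version
eval_data = self.dataset_load(name="my_eval_dataset")

# load by id (any string that parses as a uuid is treated as a dataset id)
other_data = self.dataset_load(name="00000000-0000-0000-0000-000000000000")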

get_config_class() classmethod

Returns a dataclass containing config for this pipeline

Source code in shrike/pipeline/pipeline_helper.py
@classmethod
def get_config_class(cls):
    """Returns a dataclass containing config for this pipeline"""
    pass
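
A hedged sketch of a typical override: return a dataclass whose fields become part of the Hydra configuration for the pipeline (the field names below are illustrative, and the exact config group naming follows shrike's conventions):

from dataclasses import dataclass

from shrike.pipeline.pipeline_helper import AMLPipelineHelper

@dataclass
class demopipeline:  # hypothetical config group for this pipeline
    learning_rate: float = 0.001
    num_epochs: int = 5

class MyPipeline(AMLPipelineHelper):
    @classmethod
    def get_config_class(cls):
        return demopipeline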

is_eyesoff(self)

" Check whether the workspace is eyes-off. If it lives in a non-Torus tenant, then eyes-off; If in Torus tenant, check whether it is in the allow-list of eyes-on Torus subscriptions.

Source code in shrike/pipeline/pipeline_helper.py
def is_eyesoff(self) -> bool:
    """ "
    Check whether the workspace is eyes-off.
    If it lives in a non-Torus tenant, then eyes-off;
    If in Torus tenant, check whether it is in the allow-list of eyes-on Torus subscriptions.
    """
    workspace = self.workspace()
    tenant_id = workspace.get_details()["identity"]["tenant_id"]
    subscription_id = workspace.subscription_id
    return is_eyesoff_helper(tenant_id, subscription_id)

main() classmethod

Pipeline helper main function; parses arguments and runs the pipeline.

Source code in shrike/pipeline/pipeline_helper.py
@classmethod
def main(cls):
    """Pipeline helper main function, parses arguments and run pipeline."""
    config_dict = cls._default_config()

    @hydra.main(config_name="default")
    def hydra_run(cfg: DictConfig):
        # merge cli config with default config
        cfg = OmegaConf.merge(config_dict, cfg)

        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument("--config-dir")
        args, _ = arg_parser.parse_known_args()
        cfg.run.config_dir = os.path.join(
            HydraConfig.get().runtime.cwd, args.config_dir
        )

        log.info("*** CONFIGURATION ***")
        log.info(OmegaConf.to_yaml(cfg))

        # create class instance
        main_instance = cls(cfg)

        # run
        main_instance.run()

    hydra_run()

    return cls.BUILT_PIPELINE  # return so we can have some unit tests done
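
A hedged usage sketch: main() is the usual entry point of a pipeline script, driven by Hydra-style configuration (the file name and config layout below are illustrative):

# my_pipeline.py (hypothetical script)
if __name__ == "__main__":
    MyPipeline.main()

Such a script would then typically be launched with a Hydra-style command line, for example python my_pipeline.py --config-dir ./conf --config-name experiments/demo, where the config directory and experiment config are your own.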

module_load(self, module_key)

Loads one module from the manifest

Source code in shrike/pipeline/pipeline_helper.py
def module_load(self, module_key):
    """Loads one module from the manifest"""
    return self.module_loader.load_module(module_key, self.required_modules())

pipeline_instance(self, pipeline_function, config)

Creates an instance of the pipeline using arguments.

Parameters:

Name               Type        Description                                        Default
pipeline_function  function    the pipeline function obtained from self.build()  required
config             DictConfig  configuration object (see get_config_class())     required

Returns:

Type      Description
pipeline  the instance constructed using build() function

Source code in shrike/pipeline/pipeline_helper.py
def pipeline_instance(self, pipeline_function, config):
    """Creates an instance of the pipeline using arguments.

    Args:
        pipeline_function (function): the pipeline function obtained from self.build()
        config (DictConfig): configuration object (see get_config_class())

    Returns:
        pipeline: the instance constructed using build() function
    """
    raise NotImplementedError(
        "You need to implement your pipeline_instance() method."
    )
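
A minimal sketch of a pipeline_instance() override, feeding a dataset loaded by name into the pipeline function produced by build(); the dataset and parameter names are illustrative assumptions:

def pipeline_instance(self, pipeline_function, config):
    # dataset name and version are illustrative; see dataset_load() above
    train_data = self.dataset_load(name="my_training_dataset", version="latest")
    return pipeline_function(train_data=train_data)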

required_modules() classmethod

Dependencies on modules/components

Returns:

Type             Description
dict[str, dict]  manifest

Source code in shrike/pipeline/pipeline_helper.py
@classmethod
def required_modules(cls):
    """Dependencies on modules/components

    Returns:
        dict[str, dict]: manifest
    """
    return {}
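
A hedged sketch of a manifest entry; the exact keys expected by AMLModuleLoader depend on how components are loaded (locally from a spec yaml, or remotely from the registered workspace), so the fields below are illustrative assumptions rather than a definitive schema:

@classmethod
def required_modules(cls):
    return {
        # the key is whatever string you want to pass to component_load()/module_load()
        "TrainComponent": {
            "remote_module_name": "train",              # assumed: name of the registered component
            "namespace": "contoso",                     # assumed: registration namespace
            "version": None,                            # assumed: None for latest, or a pinned version
            "yaml_spec": "components/train/spec.yaml",  # assumed: local spec used when loading locally
        }
    }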

required_subgraphs() classmethod

Dependencies on other subgraphs

Returns:

Type                          Description
dict[str, AMLPipelineHelper]  dictionary of subgraphs used for building this one; keys are whatever string you want for building your graph, values should be classes inheriting from AMLPipelineHelper

Source code in shrike/pipeline/pipeline_helper.py
@classmethod
def required_subgraphs(cls):
    """Dependencies on other subgraphs
    Returns:
        dict[str, AMLPipelineHelper]: dictionary of subgraphs used for building this one.
            keys are whatever string you want for building your graph,
            values should be classes inheriting from AMLPipelineHelper.
    """
    return {}
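
A hedged sketch; PreprocessingGraph stands for another AMLPipelineHelper subclass defined elsewhere in your code:

@classmethod
def required_subgraphs(cls):
    # "Preprocessing" is the key you will later pass to subgraph_load()
    return {"Preprocessing": PreprocessingGraph}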

run(self)

Run pipeline using arguments

Source code in shrike/pipeline/pipeline_helper.py
    def run(self):
        """Run pipeline using arguments"""
        # set logging level
        if self.config.run.verbose:
            logging.getLogger().setLevel(logging.DEBUG)

        # Log the telemetry information in the Azure Application Insights
        try:
            telemetry_logger = TelemetryLogger(
                enable_telemetry=not self.config.run.disable_telemetry
            )
            telemetry_logger.log_trace(
                message=f"shrike.pipeline=={__version__}",
                properties={"custom_dimensions": {"configuration": str(self.config)}},
            )
        except Exception as ex:
            log.debug(
                f"Sending trace log messages to application insight is not successful. The exception message: {ex}"
            )

        # Check whether the experiment name is valid
        self.validate_experiment_name(self.config.run.experiment_name)

        self.repository_info = get_repo_info()
        log.info(f"Running from repository: {self.repository_info}")

        log.info(f"azureml.core.VERSION = {azureml.core.VERSION}")
        self.connect()

        pipeline_run = None
        if self.config.run.resume:
            if not self.config.run.pipeline_run_id:
                raise Exception(
                    "To be able to use --resume you need to provide both --experiment-name and --run-id."
                )

            log.info(f"Resuming Experiment {self.config.run.experiment_name}...")
            experiment = Experiment(
                current_workspace(), self.config.run.experiment_name
            )
            log.info(f"Resuming PipelineRun {self.config.run.pipeline_run_id}...")
            # pipeline_run is of the class "azureml.pipeline.core.PipelineRun"
            pipeline_run = PipelineRun(experiment, self.config.run.pipeline_run_id)
        else:
            keep_modified_files, override = False, False
            yaml_to_be_recovered = []

            if self.config.tenant_overrides.allow_override:
                log.info("Check if tenant is consistent with spec yaml")
                override, mapping = self._check_if_spec_yaml_override_is_needed()
                if override:
                    try:
                        tenant = self.config.aml.tenant
                        log.info(
                            f"Performing spec yaml override to adapt to tenant: {tenant}."
                        )
                        yaml_to_be_recovered = self._override_spec_yaml(mapping)
                        keep_modified_files = (
                            self.config.tenant_overrides.keep_modified_files
                        )
                    except BaseException as e:
                        log.error(f"An error occurred, override is not successful: {e}")
            try:
                pipeline_run = self.build_and_submit_new_pipeline()
            except BaseException as e:
                log.error(f"An error {e} occurred during pipeline submission.")
                if override:
                    log.info("Now trying to recover overrides.")
                self._recover_tenant_overrides(override, yaml_to_be_recovered, False)
                raise
            else:
                self._recover_tenant_overrides(
                    override, yaml_to_be_recovered, keep_modified_files
                )

        if not pipeline_run:
            # not submitting code, exit now
            return
        # launch the pipeline execution
        log.info(f"Pipeline Run Id: {pipeline_run.id}")
        log.info(
            f"""
#################################
#################################
#################################

Follow link below to access your pipeline run directly:
-------------------------------------------------------

{pipeline_run.get_portal_url()}

#################################
#################################
#################################
        """
        )

        if self.config.run.canary:
            log.info(
                "*** CANARY MODE ***\n----------------------------------------------------------"
            )
            pipeline_run.wait_for_completion(show_output=True)

            # azureml.pipeline.core.PipelineRun.get_status(): ["Running", "Finished", "Failed"]
            # azureml.core.run.get_status(): ["Running", "Completed", "Failed"]
            if pipeline_run.get_status() in ["Finished", "Completed"]:
                log.info("*** PIPELINE FINISHED, TESTING WITH canary() METHOD ***")
                self.canary(self.config, pipeline_run.experiment, pipeline_run)
                log.info("OK")
            elif pipeline_run.get_status() == "Failed":
                log.info("*** PIPELINE FAILED ***")
                raise Exception("Pipeline failed.")
            else:
                log.info("*** PIPELINE STATUS {} UNKNOWN ***")
                raise Exception("Pipeline status is unknown.")

        else:
            if not self.config.run.silent:
                webbrowser.open(url=pipeline_run.get_portal_url())

            # This will wait for the completion of the pipeline execution
            # and show the full logs in the meantime
            if self.config.run.resume or self.config.run.wait:
                log.info(
                    "Below are the raw debug logs from your pipeline execution:\n----------------------------------------------------------"
                )
                pipeline_run.wait_for_completion(show_output=True)

subgraph_load(self, subgraph_key, custom_config={})

Loads one subgraph from the manifest

Parameters:

Name           Type        Description                                                                            Default
subgraph_key   str         subgraph identifier that is used in the required_subgraphs() method                   required
custom_config  DictConfig  custom configuration object; this custom object will be added to the pipeline config  {}
Source code in shrike/pipeline/pipeline_helper.py
def subgraph_load(self, subgraph_key, custom_config=OmegaConf.create()) -> Callable:
    """Loads one subgraph from the manifest
    Args:
        subgraph_key (str): subgraph identifier that is used in the required_subgraphs() method
        custom_config (DictConfig): custom configuration object, this custom object will be
        added to the pipeline config

    """
    subgraph_class = self.required_subgraphs()[subgraph_key]

    subgraph_config = self.config.copy()
    if custom_config:
        with open_dict(subgraph_config):
            for key in custom_config:
                subgraph_config[key] = custom_config[key]

    log.info(f"Building subgraph [{subgraph_key} as {subgraph_class.__name__}]...")
    # NOTE: below creates subgraph with updated pipeline_config
    subgraph_instance = subgraph_class(
        config=subgraph_config, module_loader=self.module_loader
    )
    # subgraph_instance.setup(self.pipeline_config)
    return subgraph_instance.build(subgraph_config)
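
A hedged sketch of loading and wiring a subgraph inside the pipeline function returned by build(); the key must match required_subgraphs(), and the input/output names and the config override are illustrative assumptions:

from omegaconf import OmegaConf

# inside the pipeline function returned by build()
preprocessing = self.subgraph_load(
    "Preprocessing",
    # optionally override parts of the config seen by the subgraph
    custom_config=OmegaConf.create({"demopipeline": {"num_epochs": 1}}),
)
prep_step = preprocessing(raw_data=input_data)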

validate_experiment_name(name) staticmethod

Check whether the experiment name is valid. Experiment names must be between 1 and 250 characters long and start with a letter or a number. Valid characters are letters, numbers, "_", and the "-" character.

Source code in shrike/pipeline/pipeline_helper.py
@staticmethod
def validate_experiment_name(name):
    """
    Check whether the experiment name is valid. Experiment names must be
    between 1 and 250 characters long and start with letters or numbers.
    Valid characters are letters, numbers, "_", and the "-" character.
    """
    if len(name) < 1 or len(name) > 250:
        raise ValueError("Experiment names must be between 1 to 250 characters!")
    if not re.match("^[a-zA-Z0-9]$", name[0]):
        raise ValueError("Experiment names must start with letters or numbers!")
    if not re.match("^[a-zA-Z0-9_-]*$", name):
        raise ValueError(
            "Valid experiment names must only contain letters, numbers, underscore and dash!"
        )
    return True
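
For illustration:

AMLPipelineHelper.validate_experiment_name("demo-experiment_01")  # returns True
AMLPipelineHelper.validate_experiment_name("!invalid name!")      # raises ValueError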

workspace(self)

Gets the current workspace

Source code in shrike/pipeline/pipeline_helper.py
def workspace(self):
    """Gets the current workspace"""
    return current_workspace()