Canary helper
Helper code for canary pipelines: inspecting the repository, step metrics, and step outputs of a pipeline run.
get_repo_info()
    [EXPERIMENTAL] Obtains info on the current repo the code is in.
Returns:
| Type | Description | 
|---|---|
| dict | git metadata (remote URL, branch name, commit hash, last known author) | 
Source code in shrike/pipeline/canary_helper.py
```python
def get_repo_info():
    """[EXPERIMENTAL] Obtains info on the current repo the code is in.

    Returns:
        dict: git metadata
    """
    try:
        import git

        repo = git.Repo(search_parent_directories=True)
        branch = repo.active_branch
        head = repo.head
        return {
            "git": repo.remotes.origin.url,
            "branch": branch.name,
            "commit": head.commit.hexsha,
            "last_known_author": head.commit.author.name,
        }
    except Exception:
        # gitpython unavailable, no git repo found, detached HEAD, etc.
        return {"git": "n/a"}
```
test_pipeline_step_metrics(pipeline_run, expected_metrics)
    Tests a pipeline run against a set of expected metrics.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pipeline_run | PipelineRun | the AzureML pipeline run | required | 
| expected_metrics | dict | defines the tests to execute | required | 
Returns:
| Type | Description | 
|---|---|
| List | errors collected during tests | 
!!! note "Example entries in expected_metrics"
    `"SelectJsonField": [{"row": {"name": "output", "key": "size", "value": 369559}}]`
    tests module "SelectJsonField" for a metric row named "output" and checks that its key "size" has the value 369559.

    `"tokenizerparallel": [{"metric": {"key": "Failed Items", "value": 0}}]`
    tests module "tokenizerparallel" for a metric named "Failed Items" and checks that its value is 0.
Source code in shrike/pipeline/canary_helper.py
```python
def test_pipeline_step_metrics(pipeline_run, expected_metrics):
    """Tests a pipeline run against a set of expected metrics.

    Args:
        pipeline_run (PipelineRun): the AzureML pipeline run
        expected_metrics (dict): defines the tests to execute

    Returns:
        List: errors collected during tests

    Notes:
        Example entries in expected_metrics:

        "SelectJsonField": [{"row": {"name": "output", "key": "size", "value": 369559}}]
        tests module "SelectJsonField" for a metric row named "output" and checks that its key "size" has the value 369559.

        "tokenizerparallel": [{"metric": {"key": "Failed Items", "value": 0}}]
        tests module "tokenizerparallel" for a metric named "Failed Items" and checks that its value is 0.
    """
    errors = []
    log.info("Looping through PipelineRun steps to test metrics...")
    for step in pipeline_run.get_steps():
        log.info(f"Checking status of step {step.name}...")
        observed_metrics = step.get_metrics()
        log.info(f"Step Metrics: {observed_metrics}")
        status = step.get_status()
        if status != "Finished":
            errors.append(f"Pipeline step {step.name} status is {status} != Finished")
        if step.name in expected_metrics:
            for expected_metric_test in expected_metrics[step.name]:
                if "row" in expected_metric_test:
                    log.info(f"Checking metrics, looking for {expected_metric_test}")
                    row_key = expected_metric_test["row"]["name"]
                    metric_key = expected_metric_test["row"]["key"]
                    expected_value = expected_metric_test["row"]["value"]
                    if row_key not in observed_metrics:
                        errors.append(
                            f"Step {step.name} metric row '{row_key}' not available in observed metrics {observed_metrics}"
                        )
                    elif metric_key not in observed_metrics[row_key]:
                        errors.append(
                            f"Step {step.name} metric row '{row_key}' does not have a metric '{metric_key}' in observed metrics {observed_metrics[row_key]}"
                        )
                    elif observed_metrics[row_key][metric_key] != expected_value:
                        errors.append(
                            f"Step {step.name} metric row '{row_key}' - metric '{metric_key}' - does not have expected value {expected_value} in observed metrics {observed_metrics[row_key]}"
                        )
                if "metric" in expected_metric_test:
                    log.info(f"Checking metrics, looking for {expected_metric_test}")
                    metric_key = expected_metric_test["metric"]["key"]
                    expected_value = expected_metric_test["metric"]["value"]
                    if metric_key not in observed_metrics:
                        errors.append(
                            f"Step {step.name} metric '{metric_key}' not available in observed metrics {observed_metrics}"
                        )
                    elif observed_metrics[metric_key] != expected_value:
                        errors.append(
                            f"Step {step.name} metric '{metric_key}' does not have expected value {expected_value} in observed metrics {observed_metrics[metric_key]}"
                        )
    return errors
```
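
A hedged usage sketch of `test_pipeline_step_metrics`, reusing the example entries from the note above; the experiment name and run id are placeholders, and the workspace is assumed to be configured via a local `config.json`.

```python
from azureml.core import Experiment, Workspace
from azureml.pipeline.core import PipelineRun

from shrike.pipeline.canary_helper import test_pipeline_step_metrics

# placeholders: point these at the canary experiment and a finished run
ws = Workspace.from_config()
pipeline_run = PipelineRun(Experiment(ws, "canary-experiment"), run_id="<canary-run-id>")

expected_metrics = {
    # metric row: step "SelectJsonField" should log a row "output" with size 369559
    "SelectJsonField": [{"row": {"name": "output", "key": "size", "value": 369559}}],
    # plain metric: step "tokenizerparallel" should report 0 failed items
    "tokenizerparallel": [{"metric": {"key": "Failed Items", "value": 0}}],
}

errors = test_pipeline_step_metrics(pipeline_run, expected_metrics)
assert not errors, "\n".join(errors)
```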
test_pipeline_step_output(pipeline_run, step_name, output_name, **kwargs)
    Verifies a given pipeline output with some basic checks.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pipeline_run | PipelineRun | the pipeline run | required | 
| step_name | str | name of the step to check | required | 
| output_name | str | name of the output to check | required | 
| **kwargs |  | Arbitrary keyword arguments defining the test | {} | 
!!! note "Supported kwargs"
    `length` (int): expected number of files in the output; a negative value only checks that the output contains at least one file.
Returns:
| Type | Description | 
|---|---|
| dict | results of the checks, including an `errors` list and, when the step or output cannot be found, an `exception` message | 
Source code in shrike/pipeline/canary_helper.py
```python
def test_pipeline_step_output(pipeline_run, step_name, output_name, **kwargs):
    """Verifies a given pipeline output with some basic checks.

    Args:
        pipeline_run (PipelineRun): the pipeline run
        step_name (str): name of the step to check
        output_name (str): name of the output to check
        **kwargs: Arbitrary keyword arguments defining the test

    Kwargs:
        length (int): expected number of files in the output; a negative
            value only checks that the output contains at least one file

    Returns:
        dict: results
    """
    pipeline_step = pipeline_run.find_step_run(step_name)
    results = {"errors": []}
    if not pipeline_step:
        results["exception"] = (
            f"Could not find step {step_name} in pipeline {pipeline_run._run_id}."
        )
        return results
    output_port = pipeline_step[0].get_output_data(output_name)
    if not output_port:
        results["exception"] = (
            f"Could not find output {output_name} in step {step_name} in pipeline {pipeline_run._run_id}."
        )
        return results
    data_reference = output_port._data_reference
    data_path = DataPath(
        datastore=data_reference.datastore,
        path_on_datastore=data_reference.path_on_datastore,
        name=data_reference.data_reference_name,
    )
    if kwargs.get("length"):
        expected_length = kwargs.get("length")
        log.info(
            f"Checking count={expected_length} of files for step {step_name} output {output_name}..."
        )
        data_set = Dataset.File.from_files(data_path)
        files_list = data_set.to_path()
        if expected_length < 0:
            # a negative expected length means "any length > 0"
            results["length"] = {"expected": ">0", "observed": len(files_list)}
            if results["length"]["observed"] == 0:
                message = """Length mismatch in output {output_name} in step {step_name} in pipeline {run_id}. Expected len {a} found {b}.""".format(
                    output_name=output_name,
                    step_name=step_name,
                    run_id=pipeline_run._run_id,
                    b=results["length"]["observed"],
                    a=results["length"]["expected"],
                )
                # logging.error(message)
                results["errors"].append(message)
        else:
            results["length"] = {
                "expected": expected_length,
                "observed": len(files_list),
            }
            if results["length"]["observed"] != results["length"]["expected"]:
                message = """Length mismatch in output {output_name} in step {step_name} in pipeline {run_id}. Expected len {a} found {b}.""".format(
                    output_name=output_name,
                    step_name=step_name,
                    run_id=pipeline_run._run_id,
                    b=results["length"]["observed"],
                    a=results["length"]["expected"],
                )
                # logging.error(message)
                results["errors"].append(message)
    return results
```
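
Continuing the sketch above (same assumed `pipeline_run`), a hypothetical check on an output port using the `length` kwarg; the step and output names are placeholders, and `length=-1` would only require a non-empty output.

```python
from shrike.pipeline.canary_helper import test_pipeline_step_output

# expect exactly 10 files on the "output_data" port of step "tokenizerparallel"
results = test_pipeline_step_output(
    pipeline_run, step_name="tokenizerparallel", output_name="output_data", length=10
)
assert "exception" not in results, results["exception"]
assert not results["errors"], "\n".join(results["errors"])
```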