testing.components

PyTest suite for testing whether run.py is aligned with the module specification.

Status: this code relates to the recipe and is a proposal.

component_run_get_arg_parser(component_spec_path)

Tests if module run.py has a function get_arg_parser(parser)

Source code in shrike/pipeline/testing/components.py
def component_run_get_arg_parser(component_spec_path):
    """Tests is module run.py has function get_arg_parser(parser)"""
    definition, use_component_sdk = component_spec_yaml_exists_and_is_parsable(
        component_spec_path
    )

    run_py_command, definition_command = find_run_py_in_command(
        definition, use_component_sdk
    )

    component_import_path = os.path.dirname(component_spec_path)
    run_py_absdir = os.path.join(component_import_path, run_py_command)
    assert os.path.isfile(
        run_py_absdir
    ), "Component {} has command {} using a python script {} that cannot be found".format(
        component_spec_path, definition_command, run_py_command
    )

    if component_import_path not in sys.path:
        sys.path.insert(0, component_import_path)

    try:
        assert os.path.isfile(
            run_py_absdir
        ), f"module command {run_py_absdir} should exist"
        get_arg_parser_func = import_and_test_class(run_py_absdir, "get_arg_parser")
    except:
        assert (
            False
        ), "importing {} function get_arg_parser() resulted in an exception: {}".format(
            run_py_absdir, traceback.format_exc()
        )

    try:
        returned_parser = get_arg_parser_func()
    except:
        assert (
            False
        ), "Component script {}.get_arg_parser() should be able to run on argparse.ArgumentParser, but raised an exception: {}".format(
            run_py_absdir, traceback.format_exc()
        )

    assert (
        returned_parser is not None
    ), "component script {}.get_arg_parser() is supposed to return a parser when provided with None, please add 'return parser' at the end of the function.".format(
        run_py_absdir
    )

    try:
        parser = argparse.ArgumentParser()
        returned_parser = get_arg_parser_func(parser)
    except:
        assert (
            False
        ), "Component script {}.get_arg_parser() should be able to run on argparse.ArgumentParser, but raised an exception: {}".format(
            run_py_absdir, traceback.format_exc()
        )

    assert (
        returned_parser is not None
    ), "Component script {}.get_arg_parser() is not supposed to return None, please add 'return parser' at the end of the function.".format(
        run_py_absdir
    )

    # test object equality
    assert (
        returned_parser is parser
    ), "Component script {}.get_arg_parser() is supposed to return the parser it was provided, please do not create a new instance if provided with a parser.".format(
        run_py_absdir
    )

    return parser
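
These helpers are meant to be called from a parametrized pytest test over the component spec files of a repository. A minimal sketch, assuming the import path shrike.pipeline.testing.components (matching the source path shown above) and a hypothetical spec path:

import pytest

from shrike.pipeline.testing.components import component_run_get_arg_parser

COMPONENT_SPECS = [
    "components/my_component/component_spec.yaml",  # hypothetical path
]

@pytest.mark.parametrize("component_spec_path", COMPONENT_SPECS)
def test_component_get_arg_parser(component_spec_path):
    # fails the test if run.py has no importable, working get_arg_parser()
    component_run_get_arg_parser(component_spec_path)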

component_run_py_import(component_spec_path)

Try importing run.py, just to check that the script passes basic syntax/import checks

Source code in shrike/pipeline/testing/components.py
def component_run_py_import(component_spec_path):
    """Try importing run.py, just to check if basic script passes syntax/imports checks"""
    definition, use_component_sdk = component_spec_yaml_exists_and_is_parsable(
        component_spec_path
    )

    run_py_command, definition_command = find_run_py_in_command(
        definition, use_component_sdk
    )

    component_import_path = os.path.dirname(component_spec_path)
    run_py_absdir = os.path.join(component_import_path, run_py_command)
    assert os.path.isfile(
        run_py_absdir
    ), "Component {} has command {} using a python script {} that cannot be found".format(
        component_spec_path, definition_command, run_py_command
    )

    if component_import_path not in sys.path:
        sys.path.insert(0, component_import_path)

    try:
        spec, mod = dynamic_import_module(run_py_absdir)
    except:
        assert False, "importing {} resulted in an exception: {}".format(
            run_py_absdir, traceback.format_exc()
        )
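
The same parametrized pattern works as a pure import smoke test; a sketch with a hypothetical spec path:

import pytest

from shrike.pipeline.testing.components import component_run_py_import

@pytest.mark.parametrize(
    "component_spec_path",
    ["components/my_component/component_spec.yaml"],  # hypothetical path
)
def test_component_run_py_import(component_spec_path):
    # fails the test if run.py cannot be imported (syntax error, missing dependency, ...)
    component_run_py_import(component_spec_path)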

component_spec_yaml_exists_and_is_parsable(component_spec_path)

Checks that the component spec file exists and is parsable

Source code in shrike/pipeline/testing/components.py
def component_spec_yaml_exists_and_is_parsable(component_spec_path):
    """Checks component spec file"""
    assert os.path.isfile(
        component_spec_path
    ), f"Component spec file under path {component_spec_path} could not be found"

    # opens file for testing schema
    with open(component_spec_path, "r") as ifile:
        component_spec_content = ifile.read()
        if (
            "$schema: http://" in component_spec_content
            or "$schema: https://" in component_spec_content
        ):
            use_component_sdk = True
        else:
            use_component_sdk = False

    # Block unit tests from working with module sdk if not enabled
    if not os.environ.get("MODULE_SDK_ENABLE"):
        assert (
            use_component_sdk
        ), "These unit tests are intentionnally blocked from support Module SDK, which is DEPRECATED. To bypass, create env variable MODULE_SDK_ENABLE."

    if use_component_sdk:
        try:
            definition = ComponentDefinition.load(component_spec_path)
        except BaseException as e:
            assert (
                False
            ), "Failed: failed to load (sdk 2.0) component yaml %r, exception=%r" % (
                component_spec_path,
                e,
            )
    else:
        try:
            with open(component_spec_path, "r") as ifile:
                definition = yaml.safe_load(ifile)
        except BaseException as e:
            assert (
                False
            ), "Failed: failed to load (old style) module yaml %r, exception=%r" % (
                component_spec_path,
                e,
            )

    return definition, use_component_sdk
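
The helper returns the loaded definition together with a flag telling whether the spec uses the component SDK ($schema header present) or the deprecated Module SDK. A minimal sketch with a hypothetical path:

from shrike.pipeline.testing.components import (
    component_spec_yaml_exists_and_is_parsable,
)

def test_component_spec_is_parsable():
    definition, use_component_sdk = component_spec_yaml_exists_and_is_parsable(
        "components/my_component/component_spec.yaml"  # hypothetical path
    )
    # component SDK specs load as ComponentDefinition objects,
    # old-style module specs load as plain dicts (and are blocked
    # unless the MODULE_SDK_ENABLE environment variable is set)
    assert use_component_sdk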

component_uses_private_acr(component_spec_path, acr_url)

Tests base image in private ACR

Source code in shrike/pipeline/testing/components.py
def component_uses_private_acr(component_spec_path, acr_url):
    """Tests base image in private ACR"""
    definition, use_component_sdk = component_spec_yaml_exists_and_is_parsable(
        component_spec_path
    )

    if use_component_sdk:
        component_uses_private_acr_componentsdk(
            component_spec_path, definition, acr_url
        )
    else:
        component_uses_private_acr_modulesdk(component_spec_path, definition, acr_url)
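
A sketch of how this check could be wired into a test, with a hypothetical ACR URL (HDInsight, Scope and DataTransfer components are skipped by the helper):

import pytest

from shrike.pipeline.testing.components import component_uses_private_acr

ACR_URL = "myregistry.azurecr.io"  # hypothetical private ACR

@pytest.mark.parametrize(
    "component_spec_path",
    ["components/my_component/component_spec.yaml"],  # hypothetical path
)
def test_component_base_image(component_spec_path):
    # fails if the docker base image does not start with ACR_URL
    component_uses_private_acr(component_spec_path, ACR_URL)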

component_uses_private_acr_componentsdk(component_spec_path, definition, acr_url)

Tests base image in private ACR

Source code in shrike/pipeline/testing/components.py
def component_uses_private_acr_componentsdk(component_spec_path, definition, acr_url):
    """Tests base image in private ACR"""
    definition_type = definition.type
    if definition_type in [
        ComponentType.HDInsightComponent,
        ComponentType.ScopeComponent,
        ComponentType.DataTransferComponent,
    ]:
        return

    try:
        base_image_url = definition.environment.docker.image
    except KeyError:
        base_image_url = None
        pass

    if base_image_url is not None:
        assert base_image_url.startswith(
            acr_url
        ), "Component {} baseImage should be drawn from polymerprod, instead found url {}".format(
            component_spec_path, base_image_url
        )

component_uses_private_acr_modulesdk(component_spec_path, definition, acr_url)

Tests base image in private ACR

Source code in shrike/pipeline/testing/components.py
def component_uses_private_acr_modulesdk(component_spec_path, definition, acr_url):
    """Tests base image in private ACR"""
    try:
        base_image_url = definition["implementation"]["container"]["amlEnvironment"][
            "docker"
        ]["baseImage"]
    except KeyError:
        base_image_url = None
        pass

    if base_image_url is not None:
        assert base_image_url.startswith(
            acr_url
        ), "Component(1.5) {} baseImage should be drawn from polymerprod, instead found url {}".format(
            component_spec_path, base_image_url
        )

component_uses_private_python_feed(component_spec_path, feed_url)

Tests private python feed referenced in conda

Source code in shrike/pipeline/testing/components.py
def component_uses_private_python_feed(component_spec_path, feed_url):
    """Tests private python feed referenced in conda"""
    definition, use_component_sdk = component_spec_yaml_exists_and_is_parsable(
        component_spec_path
    )

    if use_component_sdk:
        if definition.type in [
            ComponentType.HDInsightComponent,
            ComponentType.ScopeComponent,
            ComponentType.DataTransferComponent,
        ]:
            return

        try:
            conda_deps_yaml = definition.environment.conda.conda_dependencies._to_dict()
        except KeyError:
            conda_deps_yaml = None
            pass
    else:
        job_type = str(definition["jobType"]).lower()
        if job_type in ["hdinsight", "scopecomponent", "datatransfercomponent"]:
            # hdi/scope/datatransfer jobs don't have python feed
            return
        if job_type == "parallel":
            try:
                conda_deps_path = definition["implementation"]["parallel"][
                    "amlEnvironment"
                ]["python"]["condaDependenciesFile"]
            except KeyError:
                conda_deps_path = None
                pass
        else:
            try:
                conda_deps_path = definition["implementation"]["container"][
                    "amlEnvironment"
                ]["python"]["condaDependenciesFile"]
            except KeyError:
                conda_deps_path = None
                pass

        if conda_deps_path is None:
            # no conda yaml provided, nothing to do here
            return

        conda_deps_abspath = os.path.join(
            os.path.dirname(component_spec_path), conda_deps_path
        )
        assert os.path.isfile(
            conda_deps_abspath
        ), "Component {} specified a conda_dependencies_file {} that cannot be found (abspath: {})".format(
            component_spec_path, conda_deps_path, conda_deps_abspath
        )

        try:
            with open(conda_deps_abspath, "r") as ifile:
                conda_deps_yaml = yaml.safe_load(ifile)
        except:
            assert (
                False
            ), "Component {} conda_dependencies_file under path {} should be yaml parsable, but loading it raised an exception: {}".format(
                component_spec_path, conda_deps_abspath, traceback.format_exc()
            )

    if conda_deps_yaml is None:
        # no conda yaml provided, nothing to do here
        return

    if "channels" in conda_deps_yaml:
        assert conda_deps_yaml["channels"] == [
            "."
        ], "In conda deps, no channels must be specified, or use . as channel"

    if "dependencies" in conda_deps_yaml:
        for entry in conda_deps_yaml["dependencies"]:
            if "pip" in entry and isinstance(entry, dict):
                assert (
                    f"--index-url {feed_url}" in entry["pip"]
                ), "conda deps must reference private python feed under pip dependencies."

find_run_py_in_command(definition, use_component_sdk)

Finds runnable python script in command

Source code in shrike/pipeline/testing/components.py
def find_run_py_in_command(definition, use_component_sdk):
    """Finds runnable python script in command"""
    run_py_command, definition_command = None, None
    if use_component_sdk:
        definition_type = definition.type
        if definition_type == ComponentType.HDInsightComponent:
            run_py_command = definition.file
            definition_command = definition.args
        elif definition_type == ComponentType.DistributedComponent:
            # run_py_command not provided, we need to find it
            definition_command = definition.launcher.additional_arguments
        elif definition_type == ComponentType.ParallelComponent:
            run_py_command = definition.entry
            definition_command = definition.args
        elif definition_type == ComponentType.CommandComponent:
            # run_py_command not provided, we need to find it
            definition_command = definition.command
        elif definition_type not in [
            ComponentType.ScopeComponent,
            ComponentType.DataTransferComponent,
        ]:
            raise Exception(
                f"Component type {definition_type} is not supported in the helper code unit tests (yet)."
            )

        if (
            run_py_command is None
            and definition.type != ComponentType.ScopeComponent
            and definition.type != ComponentType.DataTransferComponent
        ):
            # search for python script
            for entry in definition_command.split(" "):
                if entry.endswith(".py"):
                    run_py_command = entry
                    break
            else:
                assert (
                    False
                ), "Could not find any script name like *.py in component command {}".format(
                    definition_command
                )
    else:
        job_type = str(definition["jobType"]).lower()
        if job_type == "hdinsight":
            run_py_command = definition["implementation"]["hdinsight"]["file"]
            definition_command = run_py_command
        elif job_type == "parallel":
            run_py_command = definition["implementation"]["parallel"]["entry"]
            definition_command = run_py_command
        elif job_type not in ["scopecomponent", "datatransfercomponent"]:
            definition_command = definition["implementation"]["container"]["command"]
            for entry in definition_command:
                if entry.endswith(".py"):
                    run_py_command = entry
                    break
            else:
                assert (
                    False
                ), "Could not find any script name like *.py  in component command {}".format(
                    definition_command.split(" ")
                )

    return run_py_command, definition_command
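
This helper is mostly used internally by the checks above, but it can also be called directly once the spec is loaded; a small sketch with a hypothetical path:

from shrike.pipeline.testing.components import (
    component_spec_yaml_exists_and_is_parsable,
    find_run_py_in_command,
)

definition, use_component_sdk = component_spec_yaml_exists_and_is_parsable(
    "components/my_component/component_spec.yaml"  # hypothetical path
)
run_py_command, definition_command = find_run_py_in_command(
    definition, use_component_sdk
)
print(run_py_command)  # e.g. "run.py", relative to the component folder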

generate_component_arguments_componentsdk(component_spec, arg, output_script_arguments)

Recursively generate fake arguments to test script argparse.

Parameters:

    component_spec (dict): module specification in yaml. Required.
    arg (list or str or dict): argument specification. Required.
    output_script_arguments (list): output list of generated arguments, appended in place. Required.

Returns:

    list: output_script_arguments

Source code in shrike/pipeline/testing/components.py
def generate_component_arguments_componentsdk(
    component_spec, arg, output_script_arguments
):
    """Recursively generate fake arguments to test script argparse.

    Args:
        component_spec (dict): module specification in yaml
        arg (list or str or dict): argument specification
        output_script_arguments (list): output list of generated arguments, appended in place

    Returns:
        list: output_script_arguments
    """
    log.info(f"generate_component_arguments(spec, {arg}, ...)")
    if isinstance(arg, list):  # optional argument or root list
        for entry in arg:
            generate_component_arguments_componentsdk(
                component_spec, entry, output_script_arguments
            )
    elif isinstance(arg, str) and arg.startswith("{"):
        io_key = arg.lstrip("{").rstrip("}")
        if io_key.startswith("inputs."):
            input_key = io_key[7:]
            log.info("inputs keys: " + " ".join([key for key in component_spec.inputs]))
            log.info(
                "parameter keys: "
                + " ".join([key for key in component_spec.parameters])
            )
            if input_key in component_spec.inputs:
                output_script_arguments.append(
                    str(
                        _generate_fake_input_arg_componentsdk(
                            component_spec.inputs[input_key]
                        )
                    )
                )
            elif input_key in component_spec.parameters:
                output_script_arguments.append(
                    str(
                        _generate_fake_input_arg_componentsdk(
                            component_spec.parameters[input_key]
                        )
                    )
                )
            else:
                raise Exception(
                    f"Input key {input_key} is neither an input or a parameter"
                )
        elif io_key.startswith("outputs."):
            output_key = io_key[8:]
            log.info(
                "outputs keys: " + " ".join([key for key in component_spec.outputs])
            )
            output_script_arguments.append(
                str(
                    _generate_fake_input_arg_componentsdk(
                        component_spec.outputs[output_key]
                    )
                )
            )
        else:
            raise NotImplementedError(
                "In argument spec {}, I/O key arg spec is not supported {}".format(
                    arg, io_key
                )
            )
    elif isinstance(arg, str):
        output_script_arguments.append(arg)
    elif isinstance(arg, dict):  # for old module def
        if "inputValue" in arg:
            # find in inputs
            for i_spec in component_spec.inputs:
                if i_spec["name"] == arg["inputValue"]:
                    output_script_arguments.append(
                        str(_generate_fake_input_arg_componentsdk(i_spec))
                    )
        elif "inputPath" in arg:
            # find in inputs
            for i_spec in component_spec.inputs:
                if i_spec["name"] == arg["inputPath"]:
                    output_script_arguments.append(
                        str(_generate_fake_input_arg_componentsdk(i_spec))
                    )
        elif "outputPath" in arg:
            # find in outputs
            output_script_arguments.append("/mnt/fakeoutputpath")

    return output_script_arguments
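
A sketch of how the fake arguments are built for a component-SDK definition, mirroring what if_arguments_from_component_spec_match_script_argparse does below (paths are hypothetical):

from shrike.pipeline.testing.components import (
    component_spec_yaml_exists_and_is_parsable,
    find_run_py_in_command,
    generate_component_arguments_componentsdk,
)

definition, use_component_sdk = component_spec_yaml_exists_and_is_parsable(
    "components/my_component/component_spec.yaml"  # hypothetical path
)
assert use_component_sdk  # this helper only handles component-SDK specs

_, definition_command = find_run_py_in_command(definition, use_component_sdk)

# strip optional-argument brackets, the interpreter and the script name,
# then let the helper expand {inputs.x} / {outputs.y} placeholders
arguments_spec = [e.lstrip("[").rstrip("]") for e in definition_command.split(" ")]
if arguments_spec[0].startswith("python"):
    arguments_spec.pop(0)
if arguments_spec[0].endswith(".py"):
    arguments_spec.pop(0)

script_arguments = []
generate_component_arguments_componentsdk(definition, arguments_spec, script_arguments)
print(script_arguments)  # fake CLI arguments that can be fed to the parser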

generate_component_arguments_modulesdk(module_spec, arg, output_script_arguments)

Recursively generate fake arguments to test script argparse.

Parameters:

    module_spec (dict): module specification in yaml. Required.
    arg (list or str or dict): argument specification. Required.
    output_script_arguments (list): output list of generated arguments, appended in place. Required.

Returns:

    list: output_script_arguments

Source code in shrike/pipeline/testing/components.py
def generate_component_arguments_modulesdk(module_spec, arg, output_script_arguments):
    """Recursively generate fake arguments to test script argparse.

    Args:
        module_spec (dict): module specification in yaml
        arg (list or str or dict): argument specification
        output_script_arguments (list): output list of generated arguments, appended in place

    Returns:
        list: output_script_arguments
    """
    if isinstance(arg, list):  # optional argument or root list
        for entry in arg:
            generate_component_arguments_modulesdk(
                module_spec, entry, output_script_arguments
            )
    elif isinstance(arg, str):
        output_script_arguments.append(arg)
    elif isinstance(arg, dict):
        if "inputValue" in arg:
            # find in inputs
            for i_spec in module_spec["inputs"]:
                if i_spec["name"] == arg["inputValue"]:
                    output_script_arguments.append(
                        str(_generate_fake_input_arg_modulesdk(i_spec))
                    )
        elif "inputPath" in arg:
            # find in inputs
            for i_spec in module_spec["inputs"]:
                if i_spec["name"] == arg["inputPath"]:
                    output_script_arguments.append(
                        str(_generate_fake_input_arg_modulesdk(i_spec))
                    )
        elif "outputPath" in arg:
            # find in outputs
            output_script_arguments.append("/mnt/fakeoutputpath")

    return output_script_arguments

if_arguments_from_component_spec_match_script_argparse(component_spec_path)

Tests alignment between spec arguments and script parser arguments

Source code in shrike/pipeline/testing/components.py
def if_arguments_from_component_spec_match_script_argparse(component_spec_path):
    """Tests alignment between spec arguments and script parser arguments"""
    # assuming we have a yaml spec file that is loadable
    definition, use_component_sdk = component_spec_yaml_exists_and_is_parsable(
        component_spec_path
    )

    # assuming we can import the get_arg_parser() function
    parser = component_run_get_arg_parser(component_spec_path)

    run_py_command, definition_command = find_run_py_in_command(
        definition, use_component_sdk
    )

    if use_component_sdk:
        arguments_spec = [
            entry.lstrip("[").rstrip("]") for entry in definition_command.split(" ")
        ]

        if arguments_spec[0].startswith("python"):
            arguments_spec.pop(0)
        if arguments_spec[0].endswith(".py"):
            arguments_spec.pop(0)

        script_arguments = []
        generate_component_arguments_componentsdk(
            definition, arguments_spec, script_arguments
        )
    else:
        job_type = str(definition["jobType"]).lower()
        if job_type == "hdinsight":
            arguments_spec = definition["implementation"]["hdinsight"]["args"]
        elif job_type == "parallel":
            arguments_spec = definition["implementation"]["parallel"]["args"]
        elif job_type not in ["scopecomponent", "datatransfercomponent"]:
            arguments_spec = definition["implementation"]["container"]["args"]

        script_arguments = []
        generate_component_arguments_modulesdk(
            definition, arguments_spec, script_arguments
        )

    try:
        _, unknown_args = parser.parse_known_args(script_arguments)
    except:
        assert (
            False
        ), "Component {}, in run.py, parse_known_args() should be able to parse {}, instead raised an exception: {}".format(
            component_spec_path, script_arguments, traceback.format_exc()
        )
    assert (
        len(unknown_args) == 0
    ), "Component {}, while calling run.py with args {}, parsing arguments from module spec should not return unknown args, instead we observed unknown args : {}".format(
        component_spec_path, script_arguments, unknown_args
    )
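
In a test suite this is typically the single call that keeps the spec and run.py in sync; a sketch with a hypothetical path:

import pytest

from shrike.pipeline.testing.components import (
    if_arguments_from_component_spec_match_script_argparse,
)

@pytest.mark.parametrize(
    "component_spec_path",
    ["components/my_component/component_spec.yaml"],  # hypothetical path
)
def test_spec_matches_argparse(component_spec_path):
    # fails if get_arg_parser() cannot parse the arguments declared in the spec
    if_arguments_from_component_spec_match_script_argparse(component_spec_path)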

script_main_with_synthetic_arguments(module, mocker)

Run the script's main() with synthetic arguments generated from the module spec (plus -h), and check that it exits cleanly

Source code in shrike/pipeline/testing/components.py
def script_main_with_synthetic_arguments(module, mocker):
    """Try importing run.py, just to check if basic script passes syntax/imports checks"""
    paths = _get_module_paths(module)

    # assuming we have a yaml spec file that is loadable
    module_spec = module_spec_yaml_exists_and_is_parsable(module)

    # import module to get main() function
    if paths.module_spec_absdir not in sys.path:
        sys.path.insert(0, paths.module_spec_absdir)

    try:
        spec, mod = dynamic_import_module(paths.module_import_path)
    except:
        assert False, "importing {} resulted in an exception: {}".format(
            paths.module_import_path, traceback.format_exc()
        )

    if module_spec["jobType"].lower() == "hdinsight":
        arguments_spec = module_spec["implementation"]["hdinsight"]["args"]
    elif (
        module_spec["jobType"].lower() != "scopecomponent"
        and module_spec["jobType"].lower() != "datatransfercomponent"
    ):
        arguments_spec = module_spec["implementation"]["container"]["args"]
    script_arguments = []
    generate_argument(module_spec, arguments_spec, script_arguments)

    log.info(script_arguments)
    # https://medium.com/python-pandemonium/testing-sys-exit-with-pytest-10c6e5f7726f
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        mod.main(script_arguments + ["-h"])

    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 0
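
A heavily hedged sketch: this helper targets old-style module specs and relies on pytest-mock's mocker fixture; the "my_module" value below is hypothetical and must match whatever _get_module_paths resolves in your repository.

from shrike.pipeline.testing.components import script_main_with_synthetic_arguments

def test_main_with_synthetic_arguments(mocker):
    # mocker is the pytest-mock fixture; "my_module" is a hypothetical module key
    script_main_with_synthetic_arguments("my_module", mocker)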