CFX static mixer#

This example shows how to submit a CFX static mixer model for solving on Ansys HPS.

You can download the ZIP file for the CFX static mixer example and use a tool such as 7-Zip to extract the files.

Here is the project_setup.py script for this example:

project_setup.py#
"""
Example script to set up a simple CFX project in PyHPS.
"""
import argparse
import logging
import os

from ansys.hps.client import Client, HPSError, __ansys_apps_version__
from ansys.hps.client.jms import (
    File,
    JmsApi,
    Job,
    JobDefinition,
    Licensing,
    Project,
    ProjectApi,
    ResourceRequirements,
    Software,
    SuccessCriteria,
    TaskDefinition,
)

log = logging.getLogger(__name__)


def create_project(client, name, num_jobs=20, version=__ansys_apps_version__):
    """
    Create an HPS project consisting of an ANSYS CFX model.
    """
    jms_api = JmsApi(client)
    log.debug("=== Project")
    proj = Project(name=name, priority=1, active=True)
    proj = jms_api.create_project(proj, replace=True)

    project_api = ProjectApi(client, proj.id)

    log.debug("=== Files")
    cwd = os.path.dirname(__file__)
    files = []
    files.append(
        File(
            name="ccl",
            evaluation_path="runInput.ccl",
            type="text/plain",
            src=os.path.join(cwd, "runInput.ccl"),
        )
    )
    files.append(
        File(
            name="inp",
            evaluation_path="StaticMixer_001.cfx",
            type="application/octet-stream",
            src=os.path.join(cwd, "StaticMixer_001.cfx"),
        )
    )
    files.append(
        File(
            name="def",
            evaluation_path="StaticMixer_001.def",
            type="application/octet-stream",
            src=os.path.join(cwd, "StaticMixer_001.def"),
        )
    )

    files.append(
        File(
            name="exec_cfx",
            evaluation_path="exec_cfx.py",
            type="application/x-python-code",
            src=os.path.join(cwd, "exec_cfx.py"),
        )
    )

    files.append(
        File(
            name="out",
            evaluation_path="StaticMixer_*.out",
            type="text/plain",
            collect=True,
            monitor=True,
        )
    )
    files.append(
        File(
            name="res",
            evaluation_path="StaticMixer_*.res",
            type="text/plain",
            collect=True,
            monitor=False,
        )
    )

    files = project_api.create_files(files)
    file_ids = {f.name: f.id for f in files}

    log.debug("=== JobDefinition with simulation workflow and parameters")
    job_def = JobDefinition(name="JobDefinition.1", active=True)

    # Task definition
    num_input_files = 4
    task_def = TaskDefinition(
        name="CFX_run",
        software_requirements=[
            Software(name="Ansys CFX", version=version),
        ],
        execution_command=None,  # only execution script supported initially
        resource_requirements=ResourceRequirements(
            num_cores=1.0,
            memory=250,
            disk_space=5,
        ),
        execution_level=0,
        execution_context={"cfx_cclFile": "runInput.ccl", "cfx_runName": "StaticMixer"},
        max_execution_time=50.0,
        num_trials=1,
        input_file_ids=[f.id for f in files[:num_input_files]],
        output_file_ids=[f.id for f in files[num_input_files:]],
        success_criteria=SuccessCriteria(
            return_code=0,
            required_output_file_ids=[file_ids["out"]],
            require_all_output_files=False,
        ),
        licensing=Licensing(enable_shared_licensing=False),  # Shared licensing disabled by default
    )

    task_def.use_execution_script = True
    task_def.execution_command = None
    task_def.execution_script_id = file_ids["exec_cfx"]

    task_defs = [task_def]
    task_defs = project_api.create_task_definitions(task_defs)

    job_def.task_definition_ids = [td.id for td in task_defs]

    # Create job_definition in project
    job_def = project_api.create_job_definitions([job_def])[0]

    job_def = project_api.get_job_definitions()[0]

    log.debug(f"=== Create {num_jobs} jobs")
    jobs = []
    for i in range(num_jobs):
        jobs.append(Job(name=f"Job.{i}", eval_status="pending", job_definition_id=job_def.id))
    jobs = project_api.create_jobs(jobs)

    log.info(f"Created project '{proj.name}', ID='{proj.id}'")

    return proj


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--name", type=str, default="cfx_static_mixer")
    parser.add_argument("-j", "--num-jobs", type=int, default=1)
    parser.add_argument("-U", "--url", default="https://127.0.0.1:8443/hps")
    parser.add_argument("-u", "--username", default="repuser")
    parser.add_argument("-p", "--password", default="repuser")
    parser.add_argument("-v", "--ansys-version", default=__ansys_apps_version__)
    args = parser.parse_args()

    logger = logging.getLogger()
    logging.basicConfig(format="%(message)s", level=logging.DEBUG)

    try:
        log.info("Connect to HPC Platform Services")
        client = Client(url=args.url, username=args.username, password=args.password)
        log.info(f"HPS URL: {client.url}")
        proj = create_project(
            client=client, name=args.name, num_jobs=args.num_jobs, version=args.ansys_version
        )

    except HPSError as e:
        log.error(str(e))

This example uses this exec_cfx.py execution script instead of a solver command line:

exec_cfx.py#
"""
Copyright (C) 2021 ANSYS, Inc. and its subsidiaries.  All Rights Reserved.
"""
import _thread
import json
import logging
import os
from os import path
import platform
import re
import shlex
import subprocess
import time
import traceback

from ansys.rep.common.logging import log
from ansys.rep.evaluator.task_manager import ApplicationExecution
from ansys.rep.evaluator.task_manager.context import SubmitContext


class CfxExecution(ApplicationExecution):
    isLinux = platform.platform().startswith("Linux")

    def __init__(self, context):
        self.active_run_name = None
        self.putative_run_name = None
        self.withSoftInterrupt = True
        ApplicationExecution.__init__(self, context)

    def publish_to_default_log(self, msg):
        log.info(msg)

    def publish_to_debug_log(self, msg):
        log.debug(msg)

    def execute(self):
        log.info("Start CFX execution script")

        try:
            log.info("Evaluator Platform: " + platform.platform())

            num_cores = self.context.resource_requirements["num_cores"]
            log.info(f"Requested cores: {num_cores}")

            # create defaults for inputs not provided
            inputs = {
                "cfx_additionalArgs": self.context.execution_context.get("cfx_additionalArgs", "")
            }
            inputs["cfx_solverFile"] = self.context.execution_context.get("cfx_solverFile", None)
            inputs["cfx_definitionFile"] = self.context.execution_context.get(
                "cfx_definitionFile", None
            )
            inputs["cfx_iniFile"] = self.context.execution_context.get("cfx_iniFile", None)
            inputs["cfx_cclFile"] = self.context.execution_context.get("cfx_cclFile", None)
            inputs["cfx_contFile"] = self.context.execution_context.get("cfx_contFile", None)
            inputs["cfx_mcontFile"] = self.context.execution_context.get("cfx_mcontFile", None)
            inputs["cfx_mdefFile"] = self.context.execution_context.get("cfx_mdefFile", None)
            inputs["cfx_parFile"] = self.context.execution_context.get("cfx_parFile", None)
            inputs["cfx_indirectPath"] = self.context.execution_context.get(
                "cfx_indirectPath", None
            )
            inputs["cfx_version"] = self.context.execution_context.get("cfx_version", None)
            inputs["cfx_useAAS"] = self.context.execution_context.get("cfx_useAAS", False)
            inputs["cfx_startMethod"] = self.context.execution_context.get("cfx_startMethod", None)
            inputs["cfx_runName"] = self.context.execution_context.get("cfx_runName", None)

            cclFile = next((f for f in self.context.input_files if f["name"] == "ccl"), None)
            if cclFile != None:
                inputs["cfx_cclFile"] = cclFile["path"]
                log.info("ccl file path: " + cclFile["path"])

            defFile = next((f for f in self.context.input_files if f["name"] == "def"), None)
            if defFile != None:
                inputs["cfx_definitionFile"] = defFile["path"]
                log.info("def file path: " + defFile["path"])

            self.publish_to_default_log(
                "Task inputs after applying default values to missing inputs:"
            )
            for name in inputs.keys():
                if inputs[name] == None:
                    continue
                self.publish_to_default_log("\t-" + name + ":<" + str(inputs[name]) + ">")

            # Check existence of files which must exist if specified
            inputs_existchk = [
                "cclFile",
                "contFile",
                "definitionFile",
                "iniFile",
                "mcontFile",
                "mdefFile",
                "parFile",
                "solverFile",
            ]

            self.publish_to_default_log("Checking if provided files exist in the storage...")
            for i in inputs_existchk:
                k = "cfx_" + i
                if not inputs[k] == None:
                    if not os.path.isfile(inputs[k]):
                        raise Exception("Required file does not exist!\n" + inputs[k])

            if not inputs["cfx_indirectPath"] == None:
                # Special check for indirect startup and set active name for later use
                rundir = inputs["cfx_indirectPath"] + ".dir"
                if not os.path.isdir(rundir):
                    raise Exception("Required directory does not exist!\n" + rundir)
                startup_ccl = rundir + "/startup.ccl"
                if not os.path.isfile(startup_ccl):
                    raise Exception(startup_ccl)
                self.active_run_name = inputs["cfx_indirectPath"]
            else:
                # Set putative run name from input file
                for i in ["definitionFile", "mdefFile", "contFile", "iniFile", "mcontFile"]:
                    k = "cfx_" + i
                    if not inputs[k] == None:
                        probname = re.sub("(_\d{3})?\.[^\.]+$", "", inputs[k])
                        self.set_putative_run_name(probname)
                        break

                if self.putative_run_name == None and inputs["cfx_runName"] != None:
                    self.set_putative_run_name(inputs["cfx_runName"])

                # Set putative run name from -eg or -name value (-name always wins)
                if (
                    not inputs["cfx_additionalArgs"] == ""
                    and not inputs["cfx_additionalArgs"] == None
                ):
                    for opt in ["-eg", "-example", "-name"]:
                        m = re.search(opt + "\s+([^\s-]+)", inputs["cfx_additionalArgs"])
                        if m:
                            self.set_putative_run_name(m.group(1))

            # Identify application
            app_name = "Ansys CFX"
            app = next((a for a in self.context.software if a["name"] == app_name), None)
            assert app, f"{app_name} is required for execution"

            log.info("Using " + app["name"] + " " + app["version"])
            log.info("Current directory: " + os.getcwd())

            files = [f for f in os.listdir(".") if os.path.isfile(f)]
            for f in files:
                log.info("   " + f)

            # Determine CFX root directory, solver command and hosts
            self.publish_to_default_log("CFX Root directory = " + app["install_path"])

            exe = app["executable"]  # should already be platform specific
            self.publish_to_default_log("CFX Solver command: " + exe)

            # Create command line
            # Add parallel options
            cmd = [os.path.basename(exe)]
            cmd.append("-fullname")
            cmd.append(self.active_run_name)
            cmd.append("-batch")
            cmd.append("-serial")

            # Add options requiring an argument
            options_arg = {
                "-ccl": "cclFile",
                "-continue-from-file": "contFile",
                "-def": "definitionFile",
                "-indirect-startup-path": "indirectPath",
                "-initial-file": "iniFile",
                "-mcont": "mcontFile",
                "-mdef": "mdefFile",
                "-parfile-read": "parFile",
                "-solver": "solverFile",
            }
            for opt, i in sorted(options_arg.items()):
                k = "cfx_" + i
                if not inputs[k] == None:
                    cmd.append(opt)
                    cmd.append(inputs[k])

            # Add additional options
            if not inputs["cfx_additionalArgs"] == "" and not inputs["cfx_additionalArgs"] == None:
                cmd.extend(shlex.split(inputs["cfx_additionalArgs"]))

            # Start the solver
            self.publish_to_default_log("CFX solver command line = " + str(cmd))

            rc = None
            self.CFXOutputFile = None
            self.CFXMonFile = None
            cfx_env = os.environ.copy()

            with subprocess.Popen(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=cfx_env, executable=exe
            ) as self.proc:
                if self.proc == None:
                    raise Exception("CFX Solver did not start")
                self.publish_to_default_log("CFX solver started\npid:" + format(self.proc.pid))
                t1 = _thread.start_new_thread(self.process_output, (self.proc,))
                t2 = _thread.start_new_thread(self.process_error, (self.proc,))

                while rc is None:
                    rc = self.proc.poll()
                    time.sleep(1)

            # Post solution actions
            for msg in ["Finished CFX solve"]:
                self.publish_to_default_log(msg)

            if rc != 0:
                self.publish_to_default_log(f"Error: Solver exited with errors ({rc}).")
                raise Exception("Solver exited with errors.")

            return

        except Exception as e:
            self.publish_to_debug_log(traceback.print_exc())
            self.publish_to_default_log(str(e))
            raise e

    # Set putative run name from problem name (to be called BEFORE the run is started)
    def set_putative_run_name(self, probname):
        if self.active_run_name != None:
            return
        imax = 0
        for dI in os.listdir(os.getcwd()):
            m = re.match("^" + probname + "_(\d+)(\.(ansys|dir|out|res|mres|trn|cfx))?$", dI)
            if m:
                i = int(m.group(1))
                if i > imax:
                    imax = i
        prob_ext = str(imax + 1)
        self.putative_run_name = probname + "_" + prob_ext.zfill(3)
        self.active_run_name = self.putative_run_name
        self.publish_to_default_log("Set putative run name = " + self.putative_run_name)

    # Find active run name from putative run name (to be called AFTER the run is started)
    def find_active_run_name(self):
        # Putative run name set: Wait for output or run directory or output file to exist
        if self.active_run_name == None:
            if self.putative_run_name == None:
                raise Exception("Unable to find active run name. Putative run name not set.")
            outdir = path.join(os.getcwd(), self.putative_run_name)
            rundir = outdir + ".dir"
            outfile = outdir + ".out"
            while self.active_run_name == None:
                if path.isdir(outdir) or path.isdir(rundir) or path.isfile(outfile):
                    self.active_run_name = self.putative_run_name
                else:
                    time.sleep(1)
        return self.active_run_name

    # Monitor the stdout of the main process. If present, create log and log data.
    def process_output(self, proc):
        for line in iter(proc.stdout.readline, b""):
            msg = line.decode("utf-8").rstrip()
            self.publish_to_default_log(msg)
        proc.stdout.close()

    # Monitor the stderr of the main process. If present, create log and log data.
    def process_error(self, proc):
        for line in iter(proc.stderr.readline, b""):
            msg = line.decode("utf-8").rstrip()
            self.publish_to_default_log(msg)
        proc.stderr.close()


# EXAMPLE: this function will only be called if this script is run at the command line.
if __name__ == "__main__":
    log = logging.getLogger()
    logging.basicConfig(format="%(message)s", level=logging.DEBUG)

    try:
        log.info("Loading sample CFX context...")

        with open("cfx_context.json", "r") as f:
            context = json.load(f)
            print(context)

        submit_context = SubmitContext(**context)

        log.info("Executing...")
        ex = CfxExecution(submit_context).execute()
        log.info("Execution ended.")

    except Exception as e:
        log.error(str(e))