LS-DYNA job submission
This example shows how to submit an LS-DYNA job to HPS. Once submitted, minimal job information is serialized to an hps_job.json file. This mimics what an app would need to store to subsequently monitor the job and download results. A sketch of the serialized file follows the task list below.
The job consists of two tasks:
- The first task runs the actual LS-DYNA simulation.
- The second task runs a small LS-PrePost script to postprocess the results of the first task.
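For reference, here is roughly what the serialized hps_job.json file looks like after submission (field order follows the script's sorted-key JSON dump). The IDs and token shown are placeholders; actual values depend on your HPS server:
{
    "auth_token": "<refresh-token>",
    "hps_url": "https://localhost:8443/hps",
    "job_definition_id": "<job-definition-id>",
    "job_id": "<job-id>",
    "project_id": "<project-id>",
    "task_ids": []
}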
Usage
python lsdyna_job.py submit
python lsdyna_job.py monitor
python lsdyna_job.py download
Notes
This example runs only on Windows because the LS-PrePost task must open the LS-PrePost GUI.
The download action requires the tqdm and humanize packages to show a progress bar during the download of the result files. You can install these packages with this command:
python -m pip install tqdm humanize
You can download the ZIP file for the LS-DYNA job submission example and use a tool such as 7-Zip to extract the files.
Here is the lsdyna_job.py script for this example:
"""
Script showing how to submit an LS-DYNA job to HPS.
Once submitted, minimal job information is serialized to a ``hps_job.json`` file.
This mimics what an app would need to store to subsequently monitor the job and download results.
The job consists of two tasks:
- The first task runs the actual LS-DYNA simulation.
- The second task runs a small LS-PrePost script to postprocess
the results of the first task.
Usage:
$ python lsdyna_job.py submit
$ python lsdyna_job.py monitor
$ python lsdyna_job.py download
"""
import argparse
import json
import logging
import os
import time
from ansys.hps.client import Client, HPSError, __ansys_apps_version__
from ansys.hps.client.jms import (
File,
JmsApi,
Job,
JobDefinition,
Project,
ProjectApi,
ResourceRequirements,
Software,
SuccessCriteria,
TaskDefinition,
)
log = logging.getLogger(__name__)
class HPSJob:
"""
Simplistic helper class to store job information similarly to
what a pre/post processing application would do.
"""
def __init__(
self,
hps_url=None,
project_id=None,
job_definition_id=None,
job_id=None,
auth_token=None,
        task_ids=None,
):
self.hps_url = hps_url
self.project_id = project_id
self.job_definition_id = job_definition_id
self.job_id = job_id
self.auth_token = auth_token
        # Avoid sharing a mutable default list across instances
        self.task_ids = task_ids if task_ids is not None else []
def __str__(self):
        dump = json.dumps(self, default=lambda x: x.__dict__, sort_keys=True, indent=4)
        return f"HPS Job:\n{dump}"  # noqa: E231
def save(self):
"""Save job info to JSON file"""
with open("hps_job.json", "w") as f:
f.write(json.dumps(self, default=lambda x: x.__dict__, sort_keys=True, indent=4))
@classmethod
def load(cls):
"""Load job info from JSON file"""
with open("hps_job.json", "r") as f:
job = json.load(f, object_hook=lambda d: cls(**d))
return job
def submit_job(
client,
name,
version=__ansys_apps_version__,
use_exec_script=True,
distributed=False,
) -> HPSJob:
"""Create an HPS project running a simple LS-DYNA
job simulating the impact of a cylinder made of Aluminum
against a plate made of steel.
"""
log.info("=== Connect to the HPS server")
jms_api = JmsApi(client)
log.info("=== Create an empty project")
proj = Project(name=name, priority=1, active=True)
proj = jms_api.create_project(proj)
project_api = ProjectApi(client, proj.id)
log.info("=== Create file resources")
cwd = os.path.dirname(__file__)
files = []
# input files
files.append(
File(
name="inp",
evaluation_path="cylinder_plate.k",
type="text/plain",
src=os.path.join(cwd, "cylinder_plate.k"),
)
)
files.append(
File(
name="post_commands",
evaluation_path="postprocess.cfile",
type="text/plain",
src=os.path.join(cwd, "postprocess.cfile"),
)
)
# output files
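    # collect=True asks the evaluator to send the file back to the server when
    # the task completes; monitor=True additionally exposes the file content
    # for live monitoring while the task runs.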
files.append(
File(name="d3hsp", evaluation_path="d3hsp", type="text/plain", collect=True, monitor=False)
)
if distributed:
files.append(
File(
name="messag",
evaluation_path="mes0000",
type="text/plain",
collect=True,
monitor=True,
)
) # for distributed run
else:
files.append(
File(
name="messag",
evaluation_path="messag",
type="text/plain",
collect=True,
monitor=True,
)
) # for shared memory run
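    # Note: evaluation_path accepts wildcard patterns, so "d3plot*" collects
    # the whole d3plot family and the "**" patterns below pick up images and
    # movies produced during postprocessing.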
files.append(
File(
name="d3plot",
evaluation_path="d3plot*",
type="application/octet-stream",
collect=True,
monitor=False,
)
)
files.append(
File(name="img", evaluation_path="**.png", type="image/png", collect=True, monitor=False)
)
files.append(
File(
name="movie",
evaluation_path="**.wmv",
type="video/x-ms-wmv",
collect=True,
monitor=False,
)
)
files = project_api.create_files(files)
file_ids = {f.name: f.id for f in files}
log.info("=== Simulation workflow")
# Define process steps (task definitions)
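    # Placeholders such as %executable%, %file:inp%, and %resource:num_cores%
    # in the execution command are resolved by the evaluator at runtime.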
ls_dyna_command = "%executable% i=%file:inp% ncpu=%resource:num_cores% memory=300m"
if distributed:
ls_dyna_command = "%executable% -dis -np %resource:num_cores% i=%file:inp% memory=300m"
task_defs = []
task_def1 = TaskDefinition(
name="LS-DYNA Run",
software_requirements=[Software(name="Ansys LS-DYNA", version=version)],
execution_command=ls_dyna_command,
max_execution_time=3600.0,
resource_requirements=ResourceRequirements(
num_cores=6,
memory=6000 * 1024 * 1024,
disk_space=4000 * 1024 * 1024,
distributed=distributed,
),
execution_level=0,
num_trials=1,
input_file_ids=[file_ids["inp"]],
output_file_ids=[file_ids["d3hsp"], file_ids["messag"], file_ids["d3plot"]],
success_criteria=SuccessCriteria(
return_code=0,
required_output_file_ids=[file_ids["d3plot"]],
require_all_output_parameters=False,
require_all_output_files=False,
),
)
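    # Optionally run the task through one of the default execution scripts
    # shipped with HPS, which then takes care of launching the solver instead
    # of the raw execution command.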
if use_exec_script:
exec_script_file = project_api.copy_default_execution_script(
f"lsdyna-v{version[2:4]}{version[6]}-exec_lsdyna.py"
)
task_def1.use_execution_script = True
task_def1.execution_script_id = exec_script_file.id
task_defs.append(task_def1)
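    # execution_level=1 makes the LS-PrePost task start only after the
    # level-0 LS-DYNA task has finished.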
task_def2 = TaskDefinition(
name="LS-PrePost Run",
software_requirements=[Software(name="Ansys LS-PrePost", version=version)],
execution_command="%executable% c=%file:post_commands%",
max_execution_time=600.0,
resource_requirements=ResourceRequirements(
num_cores=2,
            memory=3000 * 1024 * 1024,
            disk_space=4000 * 1024 * 1024,
distributed=False,
platform="windows",
),
execution_level=1,
num_trials=1,
input_file_ids=[file_ids["post_commands"], file_ids["d3plot"]],
output_file_ids=[file_ids["img"], file_ids["movie"]],
success_criteria=SuccessCriteria(
required_output_file_ids=[file_ids["movie"]],
require_all_output_parameters=False,
require_all_output_files=False,
),
)
task_defs.append(task_def2)
task_definitions = project_api.create_task_definitions(task_defs)
# Create job definition in project
job_def = JobDefinition(name="JobDefinition.1", active=True)
job_def.task_definition_ids = [td.id for td in task_definitions]
job_def = project_api.create_job_definitions([job_def])[0]
# Refresh the parameters
params = project_api.get_parameter_definitions(id=job_def.parameter_definition_ids)
log.debug("=== Jobs")
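    # Creating the job with eval_status="pending" submits it for evaluation
    # right away.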
job = Job(
eval_status="pending",
name="Nominal: 56.7km/h (30ms sim time)",
job_definition_id=job_def.id,
)
job = project_api.create_jobs([job])[0]
log.info(f"Created project '{proj.name}', ID='{proj.id}'")
app_job = HPSJob()
app_job.project_id = proj.id
app_job.job_definition_id = job_def.id
app_job.job_id = job.id
app_job.hps_url = client.url
app_job.auth_token = client.refresh_token
return app_job
def monitor_job(app_job: HPSJob):
"""
Monitor the evaluation status of an existing HPS job.
"""
    # Since the auth token is stored in the job info, there's no need
    # to enter the username and password again to connect to the HPS server.
client = Client(url=app_job.hps_url, refresh_token=app_job.auth_token)
project_api = ProjectApi(client, app_job.project_id)
job = project_api.get_jobs(id=app_job.job_id)[0]
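    # Poll the job status every 2 seconds until it reaches a terminal state.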
while job.eval_status not in ["evaluated", "timeout", "failed", "aborted"]:
time.sleep(2)
log.info(
f"Waiting for job {job.name} to complete "
f"[{client.url}/jms/#/projects/{app_job.project_id}/jobs/{job.id}] ... "
)
job = project_api.get_jobs(id=job.id)[0]
tasks = project_api.get_tasks(job_id=job.id)
for task in tasks:
log.info(
f"Task {task.task_definition_snapshot.name}(id={task.id}) is {task.eval_status}"
)
log.info(f"Job {job.name} final status: {job.eval_status}")
return
def download_results(app_job: HPSJob):
"""
Download the job output files (if any).
This method requires the packages ``tqdm`` and ``humanize``.
You can install them with:
python -m pip install tqdm humanize
"""
    try:
        from tqdm import tqdm
    except ImportError:
        log.error("The 'tqdm' package is not installed. Install it with 'python -m pip install tqdm'.")
        return
    try:
        import humanize
    except ImportError:
        log.error("The 'humanize' package is not installed. Install it with 'python -m pip install humanize'.")
        return
    # Since the auth token is stored in the job info, there's no need
    # to enter the username and password again to connect to the HPS server.
    client = Client(url=app_job.hps_url, refresh_token=app_job.auth_token)
project_api = ProjectApi(client, app_job.project_id)
tasks = project_api.get_tasks(job_id=app_job.job_id)
for task in tasks:
if not task.output_file_ids:
log.info(f"No files are available on the server for Task {task.id}")
continue
query_params = {"id": task.output_file_ids, "hash.ne": "null"}
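        # The "hash.ne": "null" filter restricts the query to files whose
        # content has actually been uploaded to the server.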
files = project_api.get_files(content=False, **query_params)
log.info(
f"{len(files)} files are available on the server for Task "
f"{task.id}: {', '.join([f.evaluation_path for f in files])}"
)
for file in files:
target_folder = os.path.join("job_results", task.task_definition_snapshot.name)
download_path = os.path.join(target_folder, file.evaluation_path)
if os.path.exists(download_path):
if file.size == os.path.getsize(download_path):
log.info(
f"Skip download of file "
f"{file.evaluation_path} ({humanize.naturalsize(file.size)})"
)
continue
else:
log.warning(
f"{file.evaluation_path} already exists: "
f"size on server: {humanize.naturalsize(file.size)}, "
f"size on disk {humanize.naturalsize(os.path.getsize(download_path))} MB"
)
log.info(
f"Start download of file {file.evaluation_path} ({humanize.naturalsize(file.size)})"
)
with tqdm(
total=file.size,
unit="B",
unit_scale=True,
desc=file.evaluation_path,
initial=0,
ascii=True,
ncols=100,
) as pbar:
project_api.download_file(
file, target_folder, progress_handler=lambda chunk_size: pbar.update(chunk_size)
)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"action", default="submit", choices=["submit", "monitor", "download"], help="Action to run"
)
parser.add_argument("-n", "--name", type=str, default="LS-DYNA Cylinder Plate")
parser.add_argument("-es", "--use-exec-script", default=True, type=bool)
parser.add_argument("-ds", "--distributed", default=False, type=bool)
parser.add_argument("-U", "--url", default="https://localhost:8443/hps")
parser.add_argument("-u", "--username", default="repuser")
parser.add_argument("-p", "--password", default="repuser")
parser.add_argument("-v", "--ansys-version", default=__ansys_apps_version__)
args = parser.parse_args()
logger = logging.getLogger()
logging.basicConfig(format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.INFO)
log.debug("=== HPS connection")
try:
log.info(f"HPS URL: {args.url}")
if args.action == "submit":
# create client with offline_access scope
# to persist a long-lived refresh token
client = Client(
url=args.url,
username=args.username,
password=args.password,
scope="openid offline_access",
)
job = submit_job(
client=client,
name=args.name,
version=args.ansys_version,
use_exec_script=args.use_exec_script,
distributed=args.distributed,
)
job.save()
elif args.action == "monitor":
job = HPSJob.load()
log.info(job)
monitor_job(job)
elif args.action == "download":
job = HPSJob.load()
log.info(job)
download_results(job)
except HPSError as e:
log.error(str(e))