This notebook demonstrates how to use Globus within CyberGIS-Compute to retrieve a large amount of outputs generated by a model executed on HPC, which is often needed for postprocessing work performed on CJW. A new “data transfer” job type is provided for moving data from HPC back to the CJW Jupyter environment. Under the hood, this new job type utilizes the Globus service (https://www.globus.org/) to perform a point-to-point data transfer between HPC and CJW.
In this demo, we will first prepare a 60-member ensemble SUMMA model and submit it to the XSEDE Expanse HPC for execution using CyberGIS-Compute. When the model run is finished, we won't use the regular "download" function in the Compute SDK to retrieve the results. Instead, we submit another Globus job to Compute, which will hand it off to the Globus scheduler and monitor the process (just like talking to the Slurm scheduler on HPC in the case of a regular model submission). Please refer to the example notebook below for more details.
FYI, a deeper integration of Globus into Compute is under active development, which will build automatic bi-directional data transfer between HPC and CJW Jupyter (or any accessible Globus endpoint) into regular model submission, eliminating the need to submit a separate “data transfer” job.
For more info on this resource, see https://www.hydroshare.org/resource/13d6b84a9553410297a67fa366a56cb2/
# HydroShare resource containing the SUMMA ensemble model used in this demo
resource_id = '13d6b84a9553410297a67fa366a56cb2'
import json
import os
from hs_restclient import HydroShare, HydroShareAuthBasic
# NOTE(security): hard-coded public demo credentials; never reuse this pattern
# with a real account — read credentials from the environment instead.
auth = HydroShareAuthBasic("cybergis", "demo")
hs = HydroShare(auth=auth)
base_dir = "/home/jovyan/work"
download_dir = os.path.join(base_dir, 'Downloads')
# os.makedirs is portable (works outside IPython) and idempotent,
# unlike the `!mkdir -p` shell magic it replaces.
os.makedirs(download_dir, exist_ok=True)
# Download the whole resource bundle and unzip it under download_dir
hs.getResource(resource_id, destination=download_dir, unzip=True)
#Unzip model file
# Name of the model folder packed inside the downloaded HydroShare resource
model_folder_name = "SummaModel_ReynoldsAspenStand_StomatalResistance"
# HydroShare lays content out as <resource_id>/<resource_id>/data/contents
content_folder = os.path.join(download_dir ,"{}/{}/data/contents".format(resource_id, resource_id))
# File manager path relative to the model folder; reused later when submitting the job
file_manager_rel_path = "settings/summa_fileManager_riparianAspenSimpleResistance.txt"
import tempfile
workspace_dir = os.path.join(base_dir, 'workspace')
!mkdir -p {workspace_dir}
# Unique scratch dir per run so repeated executions of the notebook don't collide
unzip_dir = tempfile.mkdtemp(dir=workspace_dir)
# unzip -o: overwrite existing files without prompting
!cd {content_folder} && unzip -o {model_folder_name}.zip -d {unzip_dir}
print("Unzipping Done")
model_source_folder_path = os.path.join(unzip_dir, model_folder_name)
# Make the installer executable, then run it; the script rewrites the
# absolute paths inside the model's configuration files for this machine
!cd {model_source_folder_path} && chmod +x ./installTestCases_local.sh
!cd {model_source_folder_path} && ./installTestCases_local.sh
import pysumma as ps
import os
# Local SUMMA binary, used here only to sanity-check the toolchain;
# the actual ensemble run happens on HPC, not in this notebook
executable = "/usr/bin/summa.exe"
! {executable} --version
# path to the SUMMA filemanager file on Jupyter
file_manager = os.path.join(model_source_folder_path, file_manager_rel_path)
print(file_manager)
# Create pySUMMA Simulation Object
S = ps.Simulation(executable, file_manager)
# Configure the model: simulate 2006-07-01 through 2007-09-30
S.manager['simStartTime'].value = "2006-07-01 00:00"
S.manager['simEndTime'].value = "2007-09-30 00:00"
# Save configuration to disk
S.manager.write()
# Print the model decisions as a quick sanity check
print(S.decisions)
import numpy as np
import json
# Build the ensemble configuration:
# 20 rootDistExp values x 3 stomatal-resistance decisions = 60 runs.
# rootDistExp sweep: 0.0, 0.05, 0.10, ..., 0.95 (20 values)
root_dist_values = [round(step * 0.01, 2) for step in range(0, 100, 5)]
param_options = {
    'rootDistExp': np.array(root_dist_values)
}
# three alternative stomatal-resistance parameterizations
decision_options = {
    "stomResist": ["BallBerry", "Jarvis", "simpleResistance"]
}
config = ps.ensemble.total_product(dec_conf=decision_options, param_trial_conf=param_options)
# Persist the ensemble spec alongside the model so it is uploaded with it
with open(os.path.join(model_source_folder_path, 'summa_options.json'), 'w') as outfile:
    json.dump(config, outfile)
# check ensemble parameters
print("Number of ensemble runs: {}".format(len(config)))
print(json.dumps(config, indent=4, sort_keys=True)[:800])
print("...")
# Submit the prepared ensemble model to HPC via the CyberGIS-Compute
# (job supervisor) SDK.
from job_supervisor_client import *
# Open a session against the community "summa" service;
# isJupyter enables notebook-friendly live output
communitySummaSession = Session('summa', isJupyter=True)
communitySummaJob = communitySummaSession.job() # create new job
# Upload the model folder (including summa_options.json) to the supervisor
communitySummaJob.upload(model_source_folder_path)
# NOTE(review): "node": 60 presumably requests one node per ensemble member
# on Expanse — confirm against the summa service documentation
communitySummaJob.submit(payload={
"node": 60,
"machine": "expanse",
"file_manager_rel_path": file_manager_rel_path
})
# Stream scheduler/job events until the run completes
communitySummaJob.events(liveOutput=True)
# The regular SDK download path below is intentionally NOT used here;
# the Globus transfer job in the next section retrieves the outputs instead.
# job_dir = os.path.join(workspace_dir, "{}".format(communitySummaJob.id))
# !mkdir -p {job_dir}/output
# communitySummaJob.download(job_dir)
# Display the job id (bare expression -> notebook cell output)
communitySummaJob.id
summa_job_dir = os.path.join(workspace_dir, "{}".format(communitySummaJob.id))
globus_job_dir = os.path.join(summa_job_dir, "globus_{}".format(str(int(time.time()))))
globus_output_dir = os.path.join(globus_job_dir, "output")
!mkdir -p {globus_output_dir}
job_folder_name_hpc="SUMMA_{}".format(communitySummaJob.id)
jupyter_output_rel_path = globus_output_dir.replace("/home/jovyan/work/", "")
globus_job_json= dict(jupyter_user=os.environ["JUPYTERHUB_USER"],
hpc="expanse",
job_folder_name=job_folder_name_hpc,
jupyter_output_path=jupyter_output_rel_path)
globus_job_json_path = os.path.join(globus_job_dir, "globus.json")
with open(globus_job_json_path, 'w') as outfile:
json.dump(globus_job_json, outfile)
! cat {globus_job_json_path}
globus_session = Session("globus", isJupyter=True)
globus_job = globus_session.job()
globus_job.upload(globus_job_dir)
globus_job.submit()
# monitor job execution
globus_job.events(liveOutput=True)
!du {globus_output_dir} -h
!ls {globus_output_dir}/1*/output -al