Merge pull request #16 from coecms/reconnect
Allow multiple running instances, reconnect to existing instance
Scott Wales authored Apr 23, 2020
2 parents 8e8ea2d + 202fdbd commit 978e705
Showing 1 changed file with 41 additions and 25 deletions: gadi_jupyter
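In outline, the reconnect support added here stores the PBS job id and the notebook's connection details under a fixed per-user directory on /scratch, and checks for a live job before submitting a new one. A minimal sketch of that check (not part of the diff itself, using the file names that appear in the script below):

    # Sketch only: reuse an existing notebook job if one is still alive
    WORKDIR=/scratch/$PROJECT/$USER/tmp/runjp
    if [ -f "$WORKDIR/jobid" ] && qstat $(cat "$WORKDIR/jobid") &> /dev/null; then
        # A job is already queued or running: wait for its connection details
        # and tag the message so the client side knows it is reconnecting
        while [ ! -f "$WORKDIR/message" ]; do sleep 5; done
        sed 's/$/ RECONNECT/' "$WORKDIR/message"
    else
        # No live job: the real script builds and qsubs $WORKDIR/runjp.sh here,
        # and the resulting message is tagged NEW instead
        :
    fi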
@@ -12,6 +12,7 @@ General Options:
-l: NCI username
-L: NCI login node (default 'gadi.nci.org.au')
-e: Conda environment
-d: Debug mode
Queue Options:
-q QUEUE: Queue name
@@ -35,10 +36,11 @@ NCPUS='1'
MEM=''
WALLTIME=1:00:00
JOBFS=100gb
CONDA_ENV=analysis3-20.01
CONDA_ENV=analysis3-20.04
DEBUG=""

# Handle arguments
optspec="hl:L:q:n:m:t:J:P:e:"
optspec="hl:L:q:n:m:t:J:P:e:d"
while getopts "$optspec" optchar; do
case "${optchar}" in
h)
@@ -72,6 +74,9 @@ while getopts "$optspec" optchar; do
e)
CONDA_ENV="${OPTARG}"
;;
d)
DEBUG=true
;;
*)
print_help
exit 2
@@ -80,7 +85,7 @@ while getopts "$optspec" optchar; do
done

# This gets evaluated on Gadi in the SSH script
WORKDIR=\$TMPDIR/runjp
WORKDIR=/scratch/${PROJECT:-\$PROJECT}/\$USER/tmp/runjp

SSH='ssh -oBatchMode=yes'
if [ -n "$USER" ]; then
@@ -90,6 +95,10 @@ if [ -z "$MEM" ]; then
MEM="$(( NCPUS * 4 ))gb"
fi

if [ $NCPUS -gt 48 ]; then
echo "WARNING: Using more than one node with Dask needs extra setup and is not supported by this script"
fi

SUBMITOPTS="-N jupyter-notebook ${PROJECT:+-P '$PROJECT'} -q '$QUEUE' -l 'ncpus=${NCPUS},mem=${MEM},walltime=${WALLTIME},jobfs=${JOBFS}'"

echo "Starting notebook on ${LOGINNODE}..."
@@ -100,7 +109,7 @@ $SSH "$LOGINNODE" true
echo "qsub ${SUBMITOPTS}"

# Kill the job if this top-level script is cancelled while the job is still in the queue
trap "{ echo 'Stopping queued job... (Ctrl-C will leave job in the queue)' ; $SSH \"$LOGINNODE\" <<< \"qdel \\\$(cat \\$WORKDIR/jobid)\" ; }" EXIT
trap "{ echo 'Stopping queued job... (Ctrl-C will leave job in the queue)' ; $SSH \"$LOGINNODE\" > /dev/null <<< 'qdel \$(cat $WORKDIR/jobid)' ; }" EXIT

message=$(
$SSH -q "$LOGINNODE" <<EOF | tail -n 1
@@ -109,6 +118,16 @@ set -eu
WORKDIR="$WORKDIR"
mkdir -p "\$WORKDIR"
# Check if already running
if [ -f "\$WORKDIR/jobid" ] && qstat \$(cat "\$WORKDIR/jobid") &> /dev/null; then
while [ ! -f "\$WORKDIR/message" ]; do
sleep 5
done
cat "\$WORKDIR/message" | sed 's/$/ RECONNECT/'
exit
fi
rm -f "\$WORKDIR/message"
cat > "\$WORKDIR/runjp.sh" <<EOQ
@@ -157,14 +176,21 @@ qsub $SUBMITOPTS -l "storage=\$storage" -j oe -o "\$WORKDIR/pbs.log" "\$WORKDIR/
while [ ! -f "\$WORKDIR/message" ]; do
sleep 5
done
cat "\$WORKDIR/message"
cat "\$WORKDIR/message" | sed 's/$/ NEW/'
EOF
)

echo "Remote Message: '$message'"
if [ -n "$DEBUG" ]; then
echo "DEBUG: Remote Message: '$message'"
fi

# Grab info from the PBS job
read jobhost token jobid remote_port <<< "$message"
read jobhost token jobid remote_port type <<< "$message"

if [ "$type" = "RECONNECT" ]; then
echo
echo "Existing jupyterlab found, reconnecting to that instead"
fi

# Find a local port
for local_port in {8888..9000}; do
@@ -173,9 +199,13 @@ for local_port in {8888..9000}; do
fi 2> /dev/null
done

echo
echo "Notebook running as PBS job ${jobid}"
echo
echo "Starting tunnel..."
if [ -n "$DEBUG" ]; then
echo "DEBUG:" $SSH -N -L "${local_port}:$jobhost:${remote_port}" "$LOGINNODE"
fi
$SSH -N -L "${local_port}:$jobhost:${remote_port}" "$LOGINNODE" &
tunnelid=$!

@@ -189,26 +219,12 @@ URL="http://localhost:${local_port}/lab?token=${token}"
cat << EOF
Start a Dask cluster in your notebook using the Dask panel of Jupyterlab, or by
running
running (needs kernel analysis3-20.01 or later):
---------------------------------------------------------------
import os
import dask.distributed
# Edit as desired
threads_per_worker = 1
try:
c # Already running
except NameError:
c = dask.distributed.Client(
n_workers=int(os.environ['PBS_NCPUS'])//threads_per_worker,
threads_per_worker=threads_per_worker,
memory_limit=f'{3.9*threads_per_worker}gb',
local_directory=os.path.join(os.environ['PBS_JOBFS'],
'dask-worker-space')
)
c
import climtas.nci
climtas.nci.GadiClient()
---------------------------------------------------------------
Opening ${URL}
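For reference, a typical invocation of the updated script might look like this (the username, queue, and core count are illustrative only, not taken from the commit):

    # Hypothetical example: 4 cores in the express queue, 20.04 conda
    # environment, with the new -d debug output enabled
    ./gadi_jupyter -l abc123 -q express -n 4 -e analysis3-20.04 -d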