From cec3cd44a78f2c81a07af31a863927e7f22837e3 Mon Sep 17 00:00:00 2001 From: Ole Nielsen Date: Tue, 22 Feb 2022 14:33:59 +1100 Subject: [PATCH] Refactored test_parallel_distribute_domain.py --- .../tests/run_parallel_distribute_domain.py | 170 ++++++++++ .../tests/test_parallel_distribute_domain.py | 293 ++++-------------- 2 files changed, 236 insertions(+), 227 deletions(-) create mode 100644 anuga/parallel/tests/run_parallel_distribute_domain.py diff --git a/anuga/parallel/tests/run_parallel_distribute_domain.py b/anuga/parallel/tests/run_parallel_distribute_domain.py new file mode 100644 index 000000000..3756ec1c3 --- /dev/null +++ b/anuga/parallel/tests/run_parallel_distribute_domain.py @@ -0,0 +1,170 @@ +#------------------------------------------------------------------------------ +# Import necessary modules +#------------------------------------------------------------------------------ + +from builtins import range +from builtins import object +import unittest +import os +import sys + +from anuga.utilities.system_tools import get_pathname_from_package + +import numpy as num + +from anuga.utilities import parallel_abstraction as pypar + +#------------------------------------------ +# anuga imports +#------------------------------------------ +import anuga + +from anuga.utilities.numerical_tools import ensure_numeric +from anuga.utilities.util_ext import double_precision +from anuga.utilities.norms import l1_norm, l2_norm, linf_norm + +from anuga import Domain +from anuga import Reflective_boundary +from anuga import Dirichlet_boundary +from anuga import Time_boundary +from anuga import Transmissive_boundary + +from anuga import rectangular_cross +from anuga import create_domain_from_file + + +from anuga.parallel import distribute, myid, numprocs, finalize + + +#-------------------------------------------------------------------------- +# Setup parameters +#-------------------------------------------------------------------------- + +mod_path = get_pathname_from_package('anuga.parallel') + +mesh_filename = os.path.join(mod_path,'data','merimbula_10785_1.tsh') +#mesh_filename = os.path.join(mod_path,'data','test-100.tsh') +yieldstep = 1 +finaltime = 20 +quantity = 'stage' +nprocs = 4 +verbose = False + +#-------------------------------------------------------------------------- +# Setup procedures +#-------------------------------------------------------------------------- +class Set_Stage(object): + """Set an initial condition with constant water height, for xself.x0)&(x 1: + if myid == 0 and verbose: print('DISTRIBUTING PARALLEL DOMAIN') + domain = distribute(domain) + +#------------------------------------------------------------------------------ +# Setup boundary conditions +# This must currently happen *after* domain has been distributed +#------------------------------------------------------------------------------ +domain.store = False +Br = Reflective_boundary(domain) # Solid reflective wall + +domain.set_boundary({'outflow' :Br, 'inflow' :Br, 'inner' :Br, 'exterior' :Br, 'open' :Br}) + +#------------------------------------------------------------------------------ +# Setup diagnostic arrays +#------------------------------------------------------------------------------ +l1list = [] +l2list = [] +linflist = [] +l1norm = num.zeros(3, float) +l2norm = num.zeros(3, float) +linfnorm = num.zeros(3, float) +recv_norm = num.zeros(3, float) + +#------------------------------------------------------------------------------ +# Evolution +#------------------------------------------------------------------------------ +if numprocs > 1: + if myid == 0 and verbose: print('PARALLEL EVOLVE') +else: + if verbose: print('SEQUENTIAL EVOLVE') + +for t in domain.evolve(yieldstep=yieldstep, finaltime=finaltime): + edges = domain.quantities[quantity].edge_values.take(num.flatnonzero(domain.tri_full_flag),axis=0) + l1norm[0] = l1_norm(edges[:,0]) + l1norm[1] = l1_norm(edges[:,1]) + l1norm[2] = l1_norm(edges[:,2]) + l2norm[0] = l2_norm(edges[:,0]) + l2norm[1] = l2_norm(edges[:,1]) + l2norm[2] = l2_norm(edges[:,2]) + linfnorm[0] = linf_norm(edges[:,0]) + linfnorm[1] = linf_norm(edges[:,1]) + linfnorm[2] = linf_norm(edges[:,2]) + if numprocs > 1: + l2norm[0] = pow(l2norm[0], 2) + l2norm[1] = pow(l2norm[1], 2) + l2norm[2] = pow(l2norm[2], 2) + if myid == 0: + #domain.write_time() + + #print edges[:,1] + for p in range(1, numprocs): + recv_norm = pypar.receive(p) + l1norm += recv_norm + recv_norm = pypar.receive(p) + l2norm += recv_norm + recv_norm = pypar.receive(p) + linfnorm[0] = max(linfnorm[0], recv_norm[0]) + linfnorm[1] = max(linfnorm[1], recv_norm[1]) + linfnorm[2] = max(linfnorm[2], recv_norm[2]) + + l2norm[0] = pow(l2norm[0], 0.5) + l2norm[1] = pow(l2norm[1], 0.5) + l2norm[2] = pow(l2norm[2], 0.5) + + l1list.append(l1norm) + l2list.append(l2norm) + linflist.append(linfnorm) + else: + pypar.send(l1norm, 0) + pypar.send(l2norm, 0) + pypar.send(linfnorm, 0) + else: + #domain.write_time() + l1list.append(l1norm) + l2list.append(l2norm) + linflist.append(linfnorm) + +# Store results in the appropriate file +if numprocs > 1: + fid = open('distribute_domain_parallel.txt', 'w') +else: + fid = open('distribute_domain_sequential.txt', 'w') + +#fid.write( +print(l1list, l2list, linflist) +print(len(l1list), len(l2list), len(linflist)) + +for i in range(len(l1list)): + fid.write('%f %f %f %f %f %f %f %f %f\n' % (l1list[i][0], l1list[i][1], l1list[i][2], + l2list[i][0], l2list[i][1], l2list[i][2], + linflist[i][0], linflist[i][1], linflist[i][2])) + +fid.close() diff --git a/anuga/parallel/tests/test_parallel_distribute_domain.py b/anuga/parallel/tests/test_parallel_distribute_domain.py index cac3ece0a..0bcfa8b17 100644 --- a/anuga/parallel/tests/test_parallel_distribute_domain.py +++ b/anuga/parallel/tests/test_parallel_distribute_domain.py @@ -1,245 +1,84 @@ - """Test a run of the sequential shallow water domain against a run of the parallel shallow water domain. WARNING: This assumes that the command to run jobs is mpiexec. Tested with MPICH and LAM (Ole) """ -from __future__ import print_function -from __future__ import division -#------------------------------------------------------------------------------ +# ------------------------ # Import necessary modules -#------------------------------------------------------------------------------ - -from builtins import range -from past.utils import old_div -from builtins import object -from future.utils import raise_ +# ------------------------ +import platform import unittest -import os -import sys - -from anuga.utilities.system_tools import get_pathname_from_package - import numpy as num +import os +import subprocess -from anuga.utilities import parallel_abstraction as pypar - -#------------------------------------------ -# anuga imports -#------------------------------------------ -import anuga - -from anuga.utilities.numerical_tools import ensure_numeric -from anuga.utilities.util_ext import double_precision -from anuga.utilities.norms import l1_norm, l2_norm, linf_norm - -from anuga import Domain -from anuga import Reflective_boundary -from anuga import Dirichlet_boundary -from anuga import Time_boundary -from anuga import Transmissive_boundary - -from anuga import rectangular_cross -from anuga import create_domain_from_file - - -from anuga.parallel import distribute, myid, numprocs, finalize - - -#-------------------------------------------------------------------------- -# Setup parameters -#-------------------------------------------------------------------------- - -mod_path = get_pathname_from_package('anuga.parallel') - -mesh_filename = os.path.join(mod_path,'data','merimbula_10785_1.tsh') -#mesh_filename = os.path.join(mod_path,'data','test-100.tsh') -yieldstep = 1 -finaltime = 20 -quantity = 'stage' -nprocs = 4 verbose = False -#-------------------------------------------------------------------------- -# Setup procedures -#-------------------------------------------------------------------------- -class Set_Stage(object): - """Set an initial condition with constant water height, for xself.x0)&(x 0: - # Verify that the quantity is being conserved across iterations. - assert_(abs(l1norm_seq[x][y] - l1norm_seq[x-1][y]) < tol) - assert_(abs(l2norm_seq[x][y] - l2norm_seq[x-1][y]) < tol) - assert_(abs(linfnorm_seq[x][y] - linfnorm_seq[x-1][y]) < tol) - assert_(abs(l1norm_par[x][y] - l1norm_par[x-1][y]) < tol) - assert_(abs(l2norm_par[x][y] - l2norm_par[x-1][y]) < tol) - assert_(abs(linfnorm_par[x][y] - linfnorm_par[x-1][y]) < tol) - - if verbose: print('Parallel test OK') - - - - finalize() + N = len(seq_values) + assert len(par_values) == N + + for i in range(N): + seq_nums = [float(x) for x in seq_values[i].split()] + par_nums = [float(x) for x in par_values[i].split()] + assert num.allclose(seq_nums, par_nums) + +if __name__ == "__main__": + runner = unittest.TextTestRunner() + suite = unittest.makeSuite(Test_parallel_distribute_domain, 'test') + runner.run(suite)