import glob
import shutil
import threading
import os

from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day

from earthdiagnostics import Utils


class DataManager(object):
    """Prepare and locate CMORized model output for one experiment.

    Files are looked up under the layout built by :meth:`get_files`::

        <datafolder>/<expid>/<startdate>/fc<member>/outputs/output/<institution>/<model>/
            <experiment_name>/S<startdate>/<frequency>/<domain>/<var>[/<grid>]/r<member+1>i1p1/
    """

    def __init__(self, institution, model, expid, datafolder, frequency, chunk_size, experiment_name):
        """
        :param institution: institution directory name inside the output tree
        :param model: model name, used in directory and file names
        :param expid: experiment id; top-level directory under ``datafolder``
        :param datafolder: root folder holding the experiments
        :param frequency: frequency string; used as a directory name and appended to the domain
                          abbreviation in file names (e.g. 'O' + frequency)
        :param chunk_size: chunk length, interpreted in months by :meth:`get_files`
        :param experiment_name: experiment name, used in directory and file names
        """
        self.institution = institution
        self.model = model
        self.expid = expid
        self.data_dir = datafolder
        self.frequency = frequency
        self.chunk_size = chunk_size
        self.experiment_name = experiment_name
        # NOTE(review): add_startdate / add_name are not read anywhere in this file
        self.add_startdate = True
        self.add_name = True

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self, startdates, members):
        """Unpack the CMOR output of every startdate/member and normalise its file names.

        For each ``<datafolder>/<expid>/<startdate>/fc<member>/outputs`` directory:

        1. gunzip every ``*.gz`` file (in parallel),
        2. extract every ``*.tar`` into the same directory and delete the tarball (in parallel),
        3. if ``experiment_name`` differs from ``model``, move files out of ``.../<model>/<model>``
           into ``.../<model>/<experiment_name>``,
        4. insert the startdate directory name into the file names.

        :param startdates: iterable of startdate strings (directory names)
        :param members: iterable of member numbers (``fc<member>`` directories)
        :raises Exception: if ``<datafolder>/<expid>`` does not exist
        """
        if not os.path.exists(os.path.join(self.data_dir, self.expid)):
            raise Exception('The experiment {0} is not CMORized. '
                            'Please, CMORize it and launch again.'.format(self.expid))

        for startdate in startdates:
            for member in members:
                member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc{0}'.format(member), 'outputs')
                Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))

                numthreads = Utils.available_cpu_count()

                gz_files = glob.glob(os.path.join(member_path, '*.gz'))
                DataManager._run_in_threads(DataManager._unzip, gz_files, numthreads)

                # Globbed only after every gunzip has finished, so archives produced by
                # decompression (e.g. *.tar.gz -> *.tar) are extracted too.
                tar_files = glob.glob(os.path.join(member_path, '*.tar'))
                DataManager._run_in_threads(DataManager._untar, tar_files, numthreads, member_path)

                if self.experiment_name != self.model:
                    self._rename_model_dirs(member_path)
                self._add_startdate_to_names(member_path)

    @staticmethod
    def _run_in_threads(target, filepaths, numthreads, *extra_args):
        """Run ``target(chunk, *extra_args)`` on round-robin chunks of filepaths in threads; wait for all.

        At most ``numthreads`` threads are started, and never more than there are files.
        """
        numthreads = max(1, numthreads)  # guard against a zero count (would skip all files / zero slice step)
        threads = list()
        for index in range(min(numthreads, len(filepaths))):
            thread = threading.Thread(target=target, args=(filepaths[index::numthreads],) + tuple(extra_args))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()

    def _rename_model_dirs(self, member_path):
        """Move files from ``.../<model>/<model>`` to ``.../<model>/<experiment_name>``.

        In each path, ``_<model>_output_`` becomes ``_<model>_<experiment_name>_`` and
        ``/<model>/<model>`` becomes ``/<model>/<experiment_name>``.
        Directories are walked bottom-up and removed with ``os.rmdir`` after their files have been
        moved, so they must be empty by then (assumes Utils.move_file leaves nothing behind).
        """
        bad_path = os.path.join(member_path, 'output', self.institution, self.model, self.model)
        old_name_tag = '_{0}_output_'.format(self.model)
        new_name_tag = '_{0}_{1}_'.format(self.model, self.experiment_name)
        old_dir_tag = '/{0}/{0}'.format(self.model)
        new_dir_tag = '/{0}/{1}'.format(self.model, self.experiment_name)
        for (dirpath, dirnames, filenames) in os.walk(bad_path, topdown=False):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                good = filepath.replace(old_name_tag, new_name_tag)
                good = good.replace(old_dir_tag, new_dir_tag)
                Utils.move_file(filepath, good)
            os.rmdir(dirpath)

    def _add_startdate_to_names(self, member_path):
        """Rename files so ``_<model>_<experiment_name>_r`` becomes ``_<model>_<experiment_name>_<sdate>_r``.

        ``sdate`` is the name of each subdirectory of ``.../<model>/<experiment_name>``.
        Files already carrying the startdate do not match and are left alone.

        :raises OSError: if ``.../<model>/<experiment_name>`` does not exist (from ``os.listdir``)
        """
        good_dir = os.path.join(member_path, 'output', self.institution, self.model, self.experiment_name)
        old_tag = '_{0}_{1}_r'.format(self.model, self.experiment_name)
        for sdate in os.listdir(good_dir):
            new_tag = '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate)
            for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), topdown=False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace(old_tag, new_tag)
                    if good != filepath:
                        Log.info('Moving {0} to {1}'.format(filename, good))
                        Utils.move_file(filepath, good)

    @staticmethod
    def _unzip(files):
        """Run ``gunzip`` on each path in files."""
        for filepath in files:
            Log.debug('Unzipping {0}', filepath)
            # NOTE(review): the path is interpolated unquoted into the command string; paths with
            # spaces may break depending on how Utils.execute_shell_command runs it — confirm.
            Utils.execute_shell_command('gunzip {0}'.format(filepath))

    @staticmethod
    def _untar(files, member_path):
        """Extract each tarball in files into member_path, then delete the tarball."""
        for filepath in files:
            Log.debug('Unpacking {0}', filepath)
            Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(filepath, member_path))
            os.remove(filepath)

    def get_files(self, startdate, member, chunk, domain, variables, grid=None):
        """Build the expected CMOR file paths of the given variables for one chunk.

        The files are not checked for existence.

        :param startdate: startdate string; parsed with parse_date and used verbatim in paths
        :param member: member number; paths use ``fc<member>`` and realization ``r<member + 1>i1p1``
        :param chunk: chunk number, passed to chunk_start_date
        :param domain: domain directory name; 'seaIce' maps to table prefix 'OI', any other domain
                       to its upper-cased first letter (so it must be non-empty)
        :param variables: iterable of variable names
        :param grid: optional grid directory inserted between the variable and realization directories
        :return: a list holding ONE list with one path per variable, in input order
        """
        if domain == 'seaIce':
            domain_abbreviation = 'OI'
        else:
            domain_abbreviation = domain[0].upper()

        start = parse_date(startdate)
        realization = 'r{0}i1p1'.format(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                                   self.institution, self.model, self.experiment_name, 'S' + startdate,
                                   self.frequency, domain)

        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
        # presumably chunk_end_date returns the first day after the chunk; step back to its last day
        chunk_end = previous_day(chunk_end, 'standard')
        period = '{0:04}{1:02}-{2:04}{3:02}'.format(chunk_start.year, chunk_start.month,
                                                    chunk_end.year, chunk_end.month)

        var_files = list()
        for var in variables:
            if grid:
                var_path = os.path.join(member_path, var, grid, realization)
            else:
                var_path = os.path.join(member_path, var, realization)
            filename = '{0}_{1}{2}_{3}_{4}_S{5}_{6}_{7}.nc'.format(var, domain_abbreviation, self.frequency,
                                                                   self.model, self.experiment_name, startdate,
                                                                   realization, period)
            var_files.append(os.path.join(var_path, filename))

        return [var_files]

# ---------------------------------------------------------------------------
# Reference shell procedure, kept for documentation only (not executed).
# For each startdate/member of experiment a034 it:
#   * extracts the MMO*tar archives into outputs/primavera,
#   * gunzips the ${exp}_1m_*grid* and *icemod* files,
#   * extracts selected NEMO/LIM variables under their CMOR names,
#   * renames the time and coordinate dimensions,
#   * adds PRIMAVERA global attributes,
#   * converts Celsius fields to Kelvin,
#   * copies the results to JASMIN with scp.
#
# NOTE(review): bugs to fix if this script is ever revived:
#   * 0 degC is 273.15 K, but the conversion adds 273.25.
#   * The Celsius->Kelvin block sits inside the `for att` loop, so it runs once
#     per attribute instead of once per file.
#   * `units` is never reset per variable. After a Celsius variable, every
#     following variable is converted as well.
#   * experiment_id/experiment use field 7 of $dir, the same field used for
#     $model.
#
# set -vx
#
# exp=a034
# dir=/group_workspaces/jasmin2/primavera1/WP2/CPL/EC-EARTH3.1/HIST_T511ORCA025
# res=$( echo $dir | cut -f3 -d"_" )
# model=$(echo $dir | cut -f7 -d"/" )
#
# cd /esnas/exp/ecearth/$exp/
#
# for sd in [1-2]*
# do
#   cd /esnas/exp/ecearth/$exp/$sd
#   for fc in *
#   do
#     cd /esnas/exp/ecearth/$exp/$sd/$fc/outputs
#     realization=$(( $(echo $fc | cut -c 3-) + 1 ))
#     outdir=/esnas/exp/ecearth/$exp/$sd/$fc/outputs/primavera
#     mkdir -p $outdir
#     for tarfile in MMO*tar
#     do
#       lt=$(echo $tarfile | cut -f 5-6 -d"_" | cut -f1 -d"." )
#       tar kxvf $tarfile -C $outdir
#       cd $outdir
#       for file in ${exp}_1m_*grid*gz ${exp}_*icemod*gz
#       do
#         echo $file
#         gunzip $file
#         for var in $( cdo showvar ${file%???} )
#         do
#           varnew=""
#           case $var in
#             votemper) varnew=thetao; realm=Omon ; modeling_realm="ocean" ; units="C";;
#             vosaline) varnew=so; realm=Omon ; modeling_realm="ocean" ;;
#             sosstsst) varnew=tos; realm=Omon ; modeling_realm="ocean" ; units="C";;
#             sosaline) varnew=sos; realm=Omon ; modeling_realm="ocean" ;;
#             sossheig) varnew=zos; realm=Omon ; modeling_realm="ocean" ;;
#             vozocrtx) varnew=uo; realm=Omon ; modeling_realm="ocean" ;;
#             vomecrty) varnew=vo; realm=Omon ; modeling_realm="ocean" ;;
#             iiceconc) varnew=sic; realm=OImon ; modeling_realm="seaIce" ;;
#             iicethic) varnew=sit; realm=OImon ; modeling_realm="seaIce" ;;
#             isnowthi) varnew=snd; realm=OImon ; modeling_realm="seaIce" ;;
#             iicesurt) varnew=tsice; realm=OImon ; modeling_realm="seaIce" ; units="C";;
#           esac
#           outfile=$(echo ${varnew}_${realm}_${model}_${res}_BSC_${realization}_${lt}.nc )
#           if [[ ! -z $varnew ]] && [[ ! -f ${outdir}/${outfile} ]]; then #check if var belongs to list
#             cdo selvar,$var ${file%???} ${outdir}/$outfile
#             ncrename -v .time_counter,time -d time_counter,time -v nav_lon,longitude -v nav_lat,latitude -v .${var},${varnew} ${outdir}/${outfile}
#             for att in institution institute_id experiment_id model_id contact experiment frequency creation_date project_id modeling_realm realization
#             do
#               case $att in
#                 institution) att_val="Barcelona Supercomputing Center - Centro Nacional de Supercomputacion" ;;
#                 institute_id) att_val="BSC-CNS" ;;
#                 experiment_id) att_val=$(echo $dir | cut -f7 -d"/") ;;
#                 model_id) att_val="$model" ;;
#                 contact) att_val="pierre-antoine.bretonniere@bsc.es" ;;
#                 experiment) att_val=$(echo $dir | cut -f7 -d"/") ;;
#                 frequency) att_val="mon" ;;
#                 creation_date) att_val="$(echo $(date --rfc-3339=seconds | cut -f1 -d"+" | sed -e 's/ /T/')Z)" ;;
#                 project_id) att_val="PRIMAVERA" ;;
#                 modeling_realm) att_val="$modeling_realm" ;;
#                 realization) att_val="$realization" ;;
#               esac
#               ncatted -O -h -a ${att},global,c,c,"${att_val}" ${outdir}/$outfile
#               if [[ "$units" == "C" ]]; then
#                 cdo addc,273.25 ${outdir}/$outfile ${outdir}/${outfile}.tmp
#                 mv ${outdir}/${outfile}.tmp ${outdir}/${outfile}
#                 ncatted -O -h -a units,${varnew},m,c,"K" ${outdir}/$outfile
#               fi #temp from C to K
#             done #attributes
#           fi
#         done #var
#         rm ${file%???}
#       done #file
#       cd /esnas/exp/ecearth/$exp/$sd/$fc/outputs
#     done #tarfile
#     scp -i ~/.ssh/id_rsa_jasmin ${outdir}/*Omon*nc pabretonniere@jasmin-xfer1.ceda.ac.uk:$dir/.
#     scp -i ~/.ssh/id_rsa_jasmin ${outdir}/*OImon*nc pabretonniere@jasmin-xfer1.ceda.ac.uk:$dir/.
#   done #fc
# done #sd