# datamanager.py
import glob
import os
import threading

from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day

from earthdiagnostics import Utils


class DataManager(object):
    """
    Prepare and locate the CMORized output files of an experiment.

    The files of every start date and member are looked for under
    ``<datafolder>/<expid>/<startdate>/fc<member>/outputs``.
    """

    def __init__(self, institution, model, expid, datafolder, frequency, chunk_size, experiment_name):
        """
        Store the experiment description used to build paths and file names.

        :param institution: institution name, used as a directory level below ``output``
        :param model: model name, used as a directory level and inside the file names
        :param expid: experiment identifier, used as the directory below ``datafolder``
        :param datafolder: root folder that holds the experiments
        :param frequency: frequency label, used as a directory level and inside the file names
        :param chunk_size: chunk length, passed to the chunk date functions with 'month' as unit
        :param experiment_name: experiment name, used as a directory level and inside the file names
        """
        self.institution = institution
        self.model = model
        self.expid = expid
        self.data_dir = datafolder
        self.frequency = frequency
        self.chunk_size = chunk_size
        self.experiment_name = experiment_name
        self.add_startdate = True
        self.add_name = True

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self, startdates, members):
        """
        Unpack and rename the CMOR files of every requested start date and member.

        For each start date / member pair the ``*.gz`` files of the member folder are
        gunzipped, the ``*.tar`` files are extracted (and removed), and the extracted
        files are renamed so that their paths contain the experiment name and the
        start date.

        :param startdates: start dates to prepare, as used in the directory names
        :param members: members to prepare; each one is looked for in folder ``fc<member>``
        :raises Exception: if the experiment folder does not exist inside the data folder
        """
        # Check if cmorized and convert if not
        if not os.path.exists(os.path.join(self.data_dir, self.expid)):
            raise Exception('The experiment {0} is not CMORized. '
                            'Please, CMORize it and launch again.'.format(self.expid))

        for startdate in startdates:
            for member in members:
                member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc{0}'.format(member), 'outputs')
                Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))

                numthreads = Utils.available_cpu_count()
                # The tar files are only listed once every gunzip thread has finished
                DataManager._run_in_threads(DataManager._unzip,
                                            glob.glob(os.path.join(member_path, '*.gz')),
                                            numthreads)
                DataManager._run_in_threads(DataManager._untar,
                                            glob.glob(os.path.join(member_path, '*.tar')),
                                            numthreads, member_path)

                if self.experiment_name != self.model:
                    self._fix_experiment_name(member_path)
                self._add_startdate_to_names(member_path)

    @staticmethod
    def _run_in_threads(target, filepaths, numthreads, *extra_args):
        """
        Split ``filepaths`` among up to ``numthreads`` threads running ``target`` and wait for them.

        :param target: callable invoked as ``target(files, *extra_args)``
        :param filepaths: list of files to distribute
        :param numthreads: maximum number of threads to start
        :param extra_args: additional positional arguments passed to ``target``
        """
        threads = list()
        for numthread in range(0, numthreads):
            # Thread number i takes the files at positions i, i + numthreads, i + 2 * numthreads...
            share = filepaths[numthread::numthreads]
            if not share:
                # Do not start a thread that would have nothing to do
                continue
            t = threading.Thread(target=target, args=(share,) + extra_args)
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

    def _fix_experiment_name(self, member_path):
        """
        Move the files found under ``<model>/<model>`` to ``<model>/<experiment_name>``.

        The ``_<model>_output_`` part of each path is replaced by
        ``_<model>_<experiment_name>_`` and the emptied folders are removed.

        :param member_path: ``outputs`` folder of the member being prepared
        """
        bad_path = os.path.join(member_path, 'output', self.institution, self.model, self.model)
        bad_name = '_{0}_output_'.format(self.model)
        good_name = '_{0}_{1}_'.format(self.model, self.experiment_name)
        bad_folder = '/{0}/{0}'.format(self.model)
        good_folder = '/{0}/{1}'.format(self.model, self.experiment_name)

        # Bottom-up walk: every folder is visited after its subfolders, so it is
        # already empty of subfolders when rmdir is called on it
        for dirpath, _, filenames in os.walk(bad_path, False):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                good = filepath.replace(bad_name, good_name).replace(bad_folder, good_folder)
                Utils.move_file(filepath, good)
            os.rmdir(dirpath)

    def _add_startdate_to_names(self, member_path):
        """
        Insert the start date folder name in the files that do not have it yet.

        For every folder ``sdate`` below ``<model>/<experiment_name>``, the
        ``_<model>_<experiment_name>_r`` part of each path is replaced by
        ``_<model>_<experiment_name>_<sdate>_r``. Files whose path does not change are left alone.

        :param member_path: ``outputs`` folder of the member being prepared
        """
        good_dir = os.path.join(member_path, 'output', self.institution, self.model, self.experiment_name)
        without_sdate = '_{0}_{1}_r'.format(self.model, self.experiment_name)
        for sdate in os.listdir(good_dir):
            with_sdate = '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate)
            for dirpath, _, filenames in os.walk(os.path.join(good_dir, sdate), False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace(without_sdate, with_sdate)
                    if good != filepath:
                        Log.info('Moving {0} to {1}'.format(filename, good))
                        Utils.move_file(filepath, good)

    @staticmethod
    def _unzip(files):
        """
        Run ``gunzip`` on every given file.

        :param files: paths of the files to uncompress
        """
        for filepath in files:
            Log.debug('Unzipping {0}', filepath)
            Utils.execute_shell_command('gunzip {0}'.format(filepath))

    @staticmethod
    def _untar(files, member_path):
        """
        Extract every given tar file into ``member_path`` and remove the tar file afterwards.

        :param files: paths of the tar files to extract
        :param member_path: folder where the contents are extracted
        """
        for filepath in files:
            Log.debug('Unpacking {0}', filepath)
            Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(filepath, member_path))
            os.remove(filepath)

    def get_files(self, startdate, member, chunk, domain, variables, grid=None):
        """
        Build the paths of the files that hold the given variables for one chunk.

        No check is made on whether the files exist.

        :param startdate: start date, as used in the directory and file names
        :param member: member number; the realization used in the names is ``member + 1``
        :param chunk: chunk number, passed to ``chunk_start_date``
        :param domain: domain folder name; 'seaIce' is abbreviated as 'OI' in the file name,
                       any other domain as its first letter in upper case
        :param variables: names of the variables to locate
        :param grid: optional extra folder level between the variable and the realization folders
        :return: a list with a single element: the list of file paths, one per variable
        """
        if domain == 'seaIce':
            domain_abbreviation = 'OI'
        else:
            domain_abbreviation = domain[0].upper()

        start = parse_date(startdate)
        member_plus = str(member + 1)
        realization = 'r{0}i1p1'.format(member_plus)
        member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                                   self.institution, self.model, self.experiment_name, 'S' + startdate, self.frequency,
                                   domain)

        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
        # The file name uses the month of the day before the date returned by chunk_end_date
        chunk_end = previous_day(chunk_end, 'standard')

        first_month = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month)
        last_month = "{0:04}{1:02}".format(chunk_end.year, chunk_end.month)

        var_file = list()
        for var in variables:
            if grid:
                var_path = os.path.join(member_path, var, grid, realization)
            else:
                var_path = os.path.join(member_path, var, realization)

            filename = '{0}_{1}{2}_{3}_{4}_S{5}_{6}_{7}-{8}.nc'.format(var, domain_abbreviation,
                                                                       self.frequency, self.model,
                                                                       self.experiment_name, startdate,
                                                                       realization, first_month, last_month)
            var_file.append(os.path.join(var_path, filename))

        # The paths are wrapped in an outer list, so callers receive a list of lists
        return [var_file]


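# Legacy shell script kept for reference only (commented out, never executed).
# It unpacks the MMO*tar archives of each start date and member, extracts selected
# variables with cdo, renames them to their CMOR names, sets the global attributes
# and copies the resulting files to JASMIN with scp.
#
# set -vx
#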
# exp=a034
# dir=/group_workspaces/jasmin2/primavera1/WP2/CPL/EC-EARTH3.1/HIST_T511ORCA025
# res=$( echo $dir | cut -f3 -d"_" )
# model=$(echo $dir | cut -f7 -d"/" )
#
# cd /esnas/exp/ecearth/$exp/
#
# for sd in [1-2]*
# do
#   cd /esnas/exp/ecearth/$exp/$sd
#   for fc in *
#   do
#     cd /esnas/exp/ecearth/$exp/$sd/$fc/outputs
#     realization=$(( $(echo $fc | cut -c 3-) + 1 ))
#     outdir=/esnas/exp/ecearth/$exp/$sd/$fc/outputs/primavera
#     mkdir -p $outdir
#     for tarfile in MMO*tar
#     do
#       lt=$(echo $tarfile | cut -f 5-6 -d"_" | cut -f1 -d"." )
#       tar kxvf $tarfile -C $outdir
#       cd $outdir
#       for file in ${exp}_1m_*grid*gz ${exp}_*icemod*gz
#       do
#         echo $file
#         gunzip $file
#         for var in $( cdo showvar ${file%???} )
#         do
#           varnew=""
#           case $var in
#             votemper) varnew=thetao; realm=Omon ; modeling_realm="ocean" ; units="C";;
#             vosaline) varnew=so; realm=Omon ; modeling_realm="ocean" ;;
#             sosstsst) varnew=tos; realm=Omon ; modeling_realm="ocean" ; units="C";;
#             sosaline) varnew=sos; realm=Omon ; modeling_realm="ocean" ;;
#             sossheig) varnew=zos; realm=Omon ; modeling_realm="ocean" ;;
#             vozocrtx) varnew=uo; realm=Omon ; modeling_realm="ocean" ;;
#             vomecrty) varnew=vo; realm=Omon ; modeling_realm="ocean" ;;
#             iiceconc) varnew=sic; realm=OImon ; modeling_realm="seaIce" ;;
#             iicethic) varnew=sit; realm=OImon ; modeling_realm="seaIce" ;;
#             isnowthi) varnew=snd; realm=OImon ; modeling_realm="seaIce" ;;
#             iicesurt) varnew=tsice; realm=OImon ; modeling_realm="seaIce" ; units="C";;
#           esac
#           outfile=$(echo ${varnew}_${realm}_${model}_${res}_BSC_${realization}_${lt}.nc )
#           if [[ ! -z $varnew ]] && [[ ! -f ${outdir}/${outfile} ]]; then #check if var belongs to list
#             cdo selvar,$var ${file%???} ${outdir}/$outfile
#             ncrename -v .time_counter,time -d time_counter,time -v nav_lon,longitude -v nav_lat,latitude -v .${var},${varnew} ${outdir}/${outfile}
#             for att in institution institute_id experiment_id model_id contact experiment frequency creation_date project_id modeling_realm realization
#             do
#               case $att in
#                 institution) att_val="Barcelona Supercomputing Center - Centro Nacional de Supercomputacion" ;;
#                 institute_id) att_val="BSC-CNS" ;;
#                 experiment_id) att_val=$(echo $dir | cut -f7 -d"/") ;;
#                 model_id) att_val="$model" ;;
#                 contact) att_val="pierre-antoine.bretonniere@bsc.es" ;;
#                 experiment) att_val=$(echo $dir | cut -f7 -d"/") ;;
#                 frequency) att_val="mon" ;;
#                 creation_date) att_val="$(echo $(date --rfc-3339=seconds | cut -f1 -d"+" | sed -e 's/ /T/')Z)"  ;;
#                 project_id) att_val="PRIMAVERA" ;;
#                 modeling_realm) att_val="$modeling_realm" ;;
#                 realization) att_val="$realization" ;;
#               esac
#               ncatted -O -h -a ${att},global,c,c,"${att_val}" ${outdir}/$outfile
#               if [[ "$units" == "C" ]]; then
#                 cdo addc,273.15 ${outdir}/$outfile ${outdir}/${outfile}.tmp
#                 mv ${outdir}/${outfile}.tmp ${outdir}/${outfile}
#                 ncatted -O -h -a units,${varnew},m,c,"K" ${outdir}/$outfile
#               fi #temp from C to K
#             done #attributes
#           fi
#         done #var
#         rm ${file%???}
#       done #file
#       cd /esnas/exp/ecearth/$exp/$sd/$fc/outputs
#     done #tarfile
#     scp -i ~/.ssh/id_rsa_jasmin ${outdir}/*Omon*nc pabretonniere@jasmin-xfer1.ceda.ac.uk:$dir/.
#     scp -i ~/.ssh/id_rsa_jasmin ${outdir}/*OImon*nc pabretonniere@jasmin-xfer1.ceda.ac.uk:$dir/.
#   done #fc
# done #sd