datamanager.py 18.5 KB
Newer Older
# import threading
#
# import netCDF4
import os
import shutil

import numpy as np
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day

from earthdiagnostics import Utils, TempFile


class DataManager(object):
    """
    Locate, read and store the CMORized files of an experiment.

    Files are addressed with the layout built by ``_get_file_path``::

        <datafolder>/<expid>/<startdate>/fc<member>/outputs/output/<institution>/<model>/<experiment_name>/
            S<startdate>/<frequency>/<domain>/<var>[/<grid>]/r<member + 1>i1p1/
            <var>_<domain abbreviation><frequency>_<model>_<experiment_name>_S<startdate>_r<member + 1>i1p1_<time>.nc

    :param institution: institution name used in the output path
    :param model: model name used in paths and file names
    :param expid: experiment identifier (first folder under ``datafolder``)
    :param datafolder: root folder holding the experiments
    :param frequency: default output frequency used in paths and file names
    :param chunk_size: chunk length in months
    :param experiment_name: experiment name used in paths and file names
    :param num_chunks: number of chunks of the experiment
    :param calendar: calendar passed to the autosubmit date helpers
    """

    def __init__(self, institution, model, expid, datafolder, frequency, chunk_size, experiment_name, num_chunks,
                 calendar='standard'):
        self.institution = institution
        self.model = model
        self.expid = expid
        self.data_dir = datafolder
        self.frequency = frequency
        self.chunk_size = chunk_size
        self.experiment_name = experiment_name
        self.add_startdate = True
        self.add_name = True
        self.num_chunks = num_chunks
        self.calendar = calendar

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self, startdates, members):
        """
        Check that the experiment has already been CMORized.

        ``startdates`` and ``members`` are currently unused: the per-member unpacking/renaming step
        below is disabled (commented out).

        :raises Exception: if ``<data_dir>/<expid>`` does not exist
        """
        # Check if cmorized and convert if not
        if not os.path.exists(os.path.join(self.data_dir, self.expid)):
            raise Exception('The experiment {0} is not CMORized. '
                            'Please, CMORize it and launch again.'.format(self.expid))
        # for startdate in startdates:
        #     for member in members:
        #         member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc{0}'.format(member), 'outputs')
        #         Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))
        #
        #         threads = list()
        #         numthreads = Utils.available_cpu_count()
        #         filepaths = glob.glob(os.path.join(member_path, '*.gz'))
        #         for numthread in range(0, numthreads):
        #             t = threading.Thread(target=DataManager._unzip,
        #                                  args=([filepaths[numthread::numthreads]]))
        #             threads.append(t)
        #             t.start()
        #
        #         for t in threads:
        #             t.join()
        #
        #         filepaths = glob.glob(os.path.join(member_path, '*.tar'))
        #         for numthread in range(0, numthreads):
        #             t = threading.Thread(target=DataManager._untar,
        #                                  args=(filepaths[numthread::numthreads], member_path))
        #             threads.append(t)
        #             t.start()
        #
        #         for t in threads:
        #             t.join()
        #
        #         if self.experiment_name != self.model:
        #             bad_path = os.path.join(member_path, 'output', self.institution, self.model, self.model)
        #             for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
        #                 for filename in filenames:
        #                     filepath = os.path.join(dirpath, filename)
        #                     good = filepath.replace('_{0}_output_'.format(self.model),
        #                                             '_{0}_{1}_'.format(self.model, self.experiment_name))
        #
        #                     good = good.replace('/{0}/{0}'.format(self.model),
        #                                         '/{0}/{1}'.format(self.model,
        #                                                           self.experiment_name))
        #
        #                     Utils.move_file(filepath, good)
        #                 os.rmdir(dirpath)
        #
        #         good_dir = os.path.join(member_path, 'output', self.institution, self.model, self.experiment_name)
        #         for sdate in os.listdir(good_dir):
        #             for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), False):
        #                 for filename in filenames:
        #                     filepath = os.path.join(dirpath, filename)
        #                     good = filepath.replace('_{0}_{1}_r'.format(self.model, self.experiment_name,sdate),
        #                                             '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate))
        #                     if good != filepath:
        #                         Log.info('Moving {0} to {1}'.format(filename, good))
        #                         Utils.move_file(filepath, good)

    @staticmethod
    def _unzip(files):
        """Run ``gunzip`` on every path in ``files``."""
        for filepath in files:
            Log.debug('Unzipping {0}', filepath)
            Utils.execute_shell_command('gunzip {0}'.format(filepath))

    @staticmethod
    def _untar(files, member_path):
        """Extract every tar file in ``files`` into ``member_path`` and delete the tar file afterwards."""
        for filepath in files:
            Log.debug('Unpacking {0}', filepath)
            Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(filepath, member_path))
            os.remove(filepath)

    @staticmethod
    def _get_domain_abbreviation(domain):
        """Return the table prefix of a domain: ``'OI'`` for ``'seaIce'``, otherwise its upper-cased initial."""
        if domain == 'seaIce':
            return 'OI'
        return domain[0].upper()

    def _get_chunk_time_bound(self, startdate, chunk):
        """
        Return the ``YYYYMM-YYYYMM`` time bound used in the file name of a chunk.

        The end month is taken from the day before ``chunk_end_date``. Dates are computed with the
        manager's calendar, as in ``get_year_chunks``.
        """
        start = parse_date(startdate)
        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', self.calendar)
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', self.calendar)
        chunk_end = previous_day(chunk_end, self.calendar)
        return "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month,
                                                  chunk_end.year, chunk_end.month)

    def _get_file_path(self, startdate, member, domain, var, frequency, time_bound, grid=None):
        """
        Build the full path of a CMORized file.

        :param member: member index; the folder uses ``fc<member>`` and the realization ``r<member + 1>``
        :param time_bound: time part of the file name (e.g. a chunk ``YYYYMM-YYYYMM`` or a year)
        :param grid: optional grid folder inserted between the variable and realization folders
        """
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                                   self.institution, self.model, self.experiment_name, 'S' + startdate, frequency,
                                   domain)
        realization = 'r{0}i1p1'.format(member_plus)
        if grid:
            var_path = os.path.join(member_path, var, grid, realization)
        else:
            var_path = os.path.join(member_path, var, realization)

        filename = '{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_{7}.nc'.format(var, self._get_domain_abbreviation(domain),
                                                                   frequency, self.model, self.experiment_name,
                                                                   startdate, member_plus, time_bound)
        return os.path.join(var_path, filename)

    def get_files(self, startdate, member, chunk, domain, variables, grid=None):
        """
        Build the paths of the files of several variables for one chunk, using the default frequency.

        Paths are only computed; files are neither checked nor copied.

        :return: a list with a single element: the list of paths, one per variable, in order
        """
        time_bound = self._get_chunk_time_bound(startdate, chunk)
        var_file = [self._get_file_path(startdate, member, domain, var, self.frequency, time_bound, grid)
                    for var in variables]
        return [var_file]

    def get_file(self, domain, var, startdate, member, chunk,  grid=None, box=None, frequency=None):
        """
        Copy the file of a variable for one chunk to a temporary file.

        :param box: optional box whose lon/lat/depth strings are appended to the variable name
        :param frequency: output frequency; defaults to the manager's frequency
        :return: path of the temporary copy
        """
        if not frequency:
            frequency = self.frequency

        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

        time_bound = self._get_chunk_time_bound(startdate, chunk)
        filepath = self._get_file_path(startdate, member, domain, var, frequency, time_bound, grid)

        temp_path = TempFile.get()
        shutil.copyfile(filepath, temp_path)
        return temp_path

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
                  rename_var=None, frequency=None, year=None):
        """
        Store ``filetosend`` as the CMORized file of a variable for one chunk (or one year).

        The file is converted to netCDF4 and its variable optionally renamed. When ``region`` is given,
        the data is stored along a trailing ``region`` dimension, merged with the existing destination
        file if there is one. Finally the file is moved to its path in the data folder.

        :param filetosend: path of the file to store; it is modified in place and then moved
        :param member: member index; the folder uses ``fc<member>`` and the realization ``r<member + 1>``
        :param chunk: chunk number; required unless ``year`` is given
        :param grid: optional grid folder
        :param region: optional region name
        :param box: optional box whose lon/lat/depth strings are appended to the variable name
        :param rename_var: if given, the variable ``rename_var`` in ``filetosend`` is renamed to ``var``
        :param frequency: output frequency; defaults to the manager's frequency
        :param year: year used as time bound instead of ``chunk`` (only allowed with frequency ``'yr'``)
        :raises ValueError: if neither ``chunk`` nor ``year`` is given, or ``year`` is given with another frequency
        """
        Utils.convert2netcdf4(filetosend)

        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

        if rename_var:
            Utils.rename_variable(filetosend, rename_var, var)

        if not frequency:
            frequency = self.frequency

        if chunk is not None:
            time_bound = self._get_chunk_time_bound(startdate, chunk)
        elif year is not None:
            # Value comparison: 'is not' on strings depends on interning and is unreliable
            if frequency != 'yr':
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        else:
            raise ValueError('Chunk and year can not be None at the same time')

        filepath = self._get_file_path(startdate, member, domain, var, frequency, time_bound, grid)

        if region:
            if os.path.exists(filepath):
                self._merge_region(filepath, filetosend, var, region)
            else:
                self._add_region_dimension(filetosend, var, region)

        self._move_to_destination(filetosend, filepath)

    @staticmethod
    def _add_region_dimension(filetosend, var, region):
        """Give ``var`` in ``filetosend`` a trailing ``region`` dimension holding its data at index 0."""
        handler = Utils.openCdf(filetosend)
        try:
            handler.createDimension('region')
            var_region = handler.createVariable('region', str, ('region',))
            var_region[0] = region

            original_var = handler.variables[var]
            new_var = handler.createVariable('new_var', original_var.datatype, original_var.dimensions + ('region',))
            new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
            new_var[..., 0] = original_var[:]
        finally:
            handler.close()

        # Drop the original variable and give the regional one its name
        Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
        Utils.rename_variable(filetosend, 'new_var', var)

    @staticmethod
    def _merge_region(filepath, filetosend, var, region):
        """
        Write the data of ``var`` from ``filetosend`` into a copy of the existing regional file ``filepath``.

        The region is appended to the ``region`` variable if missing. The merged copy then replaces
        ``filetosend``. A failure to write the values is logged and the copy is kept without them.
        """
        temp = TempFile.get()
        shutil.copyfile(filepath, temp)
        handler = Utils.openCdf(temp)
        try:
            handler_send = Utils.openCdf(filetosend)
            try:
                value = handler_send.variables[var][:]
                var_region = handler.variables['region']
                basin_index = np.where(var_region[:] == region)
                if len(basin_index[0]) == 0:
                    # New region: grow the region axis by one
                    var_region[var_region.shape[0]] = region
                    basin_index = var_region.shape[0] - 1
                else:
                    basin_index = basin_index[0][0]
                try:
                    handler.variables[var][..., basin_index] = value
                except Exception as ex:
                    Log.error('Could not store region {0} of variable {1} in {2}: {3}', region, var, filepath, ex)
            finally:
                handler_send.close()
        finally:
            handler.close()
        Utils.move_file(temp, filetosend)

    @staticmethod
    def _move_to_destination(source, destination):
        """Move ``source`` to ``destination``, creating the destination folder if it does not exist."""
        destination_dir = os.path.dirname(destination)
        if not os.path.isdir(destination_dir):
            try:
                os.makedirs(destination_dir)
            except OSError:
                # Another process may have created the folder in the meantime
                if not os.path.isdir(destination_dir):
                    raise
        Utils.move_file(source, destination)

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Get a temporary file with the time steps of ``year`` for a variable.

        The files of every chunk overlapping ``year`` are copied. When there is more than one, they are
        concatenated along time with ncrcat. The result is then cut to the first..last time step dated
        in ``year``.

        :return: path of the temporary file
        :raises ValueError: if no chunk or no time step falls in ``year``
        """
        chunk_files = [self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box)
                       for chunk in self.get_year_chunks(startdate, year)]
        if not chunk_files:
            raise ValueError('No chunk contains data for year {0}'.format(year))

        if len(chunk_files) > 1:
            temp = TempFile.get()
            Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
            for chunk_file in chunk_files:
                os.remove(chunk_file)
        else:
            temp = chunk_files[0]

        handler = Utils.openCdf(temp)
        try:
            times = Utils.get_datetime_from_netcdf(handler)
        finally:
            handler.close()

        # Assumes time steps are sorted, so the year forms one contiguous slice
        year_indexes = [index for index, date in enumerate(times) if date.year == year]
        if not year_indexes:
            os.remove(temp)
            raise ValueError('No time step found for year {0}'.format(year))

        temp2 = TempFile.get()
        Utils.nco.ncks(input=temp, output=temp2,
                       options='-O -d time,{0},{1}'.format(year_indexes[0], year_indexes[-1]))
        os.remove(temp)
        return temp2

    def get_year_chunks(self, startdate, year):
        """
        Return the (1-based) chunk numbers whose dates overlap ``year``.

        A chunk's last day is the day before ``chunk_end_date``. This is the same convention used for
        the time bound in file names.
        """
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = chunk_start_date(date, chunk, self.chunk_size, 'month', self.calendar)
            if chunk_start.year > year:
                break
            chunk_end = previous_day(chunk_end_date(chunk_start, self.chunk_size, 'month', self.calendar),
                                     self.calendar)
            # chunk_start.year <= year holds here; also covers chunks spanning several years
            if chunk_end.year >= year:
                chunks.append(chunk)

        return chunks

    def get_full_years(self, startdate):
        """Return the years fully covered by the experiment's chunks for the given start date."""
        date = parse_date(startdate)
        return self._full_years(date.year, date.month, self.chunk_size, self.num_chunks)

    @staticmethod
    def _full_years(first_year, first_month, chunk_size, num_chunks):
        """
        Return the calendar years fully covered by ``num_chunks`` chunks of ``chunk_size`` months.

        Chunks start on ``first_month`` of ``first_year``. Assumes ``chunk_size`` divides 12 and the
        chunks line up with January.
        """
        chunks_per_year = 12 // chunk_size
        first_january = 0
        month = first_month
        if month != 1:
            first_year += 1
            # Count (0-based) the chunks before the first one starting in January
            while month <= 12:
                month += chunk_size
                first_january += 1

        # A year starting at chunk c is full when chunks c .. c + chunks_per_year - 1 all exist
        full_years = len(range(first_january, num_chunks - chunks_per_year + 1, chunks_per_year))
        return list(range(first_year, first_year + full_years))


#
# set -vx
#
# exp=a034
# dir=/group_workspaces/jasmin2/primavera1/WP2/CPL/EC-EARTH3.1/HIST_T511ORCA025
# res=$( echo $dir | cut -f3 -d"_" )
# model=$(echo $dir | cut -f7 -d"/" )
#
# cd /esnas/exp/ecearth/$exp/
#
# for sd in [1-2]*
# do
#   cd /esnas/exp/ecearth/$exp/$sd
#   for fc in *
#   do
#     cd /esnas/exp/ecearth/$exp/$sd/$fc/outputs
#     realization=$(( $(echo $fc | cut -c 3-) + 1 ))
#     outdir=/esnas/exp/ecearth/$exp/$sd/$fc/outputs/primavera
#     mkdir -p $outdir
#     for tarfile in MMO*tar
#     do
#       lt=$(echo $tarfile | cut -f 5-6 -d"_" | cut -f1 -d"." )
#       tar kxvf $tarfile -C $outdir
#       cd $outdir
#       for file in ${exp}_1m_*grid*gz ${exp}_*icemod*gz
#       do
#         echo $file
#         gunzip $file
#         for var in $( cdo showvar ${file%???} )
#         do
#           varnew=""
#           case $var in
#             votemper) varnew=thetao; realm=Omon ; modeling_realm="ocean" ; units="C";;
#             vosaline) varnew=so; realm=Omon ; modeling_realm="ocean" ;;
#             sosstsst) varnew=tos; realm=Omon ; modeling_realm="ocean" ; units="C";;
#             sosaline) varnew=sos; realm=Omon ; modeling_realm="ocean" ;;
#             sossheig) varnew=zos; realm=Omon ; modeling_realm="ocean" ;;
#             vozocrtx) varnew=uo; realm=Omon ; modeling_realm="ocean" ;;
#             vomecrty) varnew=vo; realm=Omon ; modeling_realm="ocean" ;;
#             iiceconc) varnew=sic; realm=OImon ; modeling_realm="seaIce" ;;
#             iicethic) varnew=sit; realm=OImon ; modeling_realm="seaIce" ;;
#             isnowthi) varnew=snd; realm=OImon ; modeling_realm="seaIce" ;;
#             iicesurt) varnew=tsice; realm=OImon ; modeling_realm="seaIce" ; units="C";;
#           esac
#           outfile=$(echo ${varnew}_${realm}_${model}_${res}_BSC_${realization}_${lt}.nc )
#           if [[ ! -z $varnew ]] && [[ ! -f ${outdir}/${outfile} ]]; then #check if var belongs to list
#             cdo selvar,$var ${file%???} ${outdir}/$outfile
#             ncrename -v .time_counter,time -d time_counter,time -v nav_lon,longitude -v nav_lat,latitude -v .${var},${varnew} ${outdir}/${outfile}
#             for att in institution institute_id experiment_id model_id contact experiment frequency creation_date project_id modeling_realm realization
#             do
#               case $att in
#                 institution) att_val="Barcelona Supercomputing Center - Centro Nacional de Supercomputacion" ;;
#                 institute_id) att_val="BSC-CNS" ;;
#                 experiment_id) att_val=$(echo $dir | cut -f7 -d"/") ;;
#                 model_id) att_val="$model" ;;
#                 contact) att_val="pierre-antoine.bretonniere@bsc.es" ;;
#                 experiment) att_val=$(echo $dir | cut -f7 -d"/") ;;
#                 frequency) att_val="mon" ;;
#                 creation_date) att_val="$(echo $(date --rfc-3339=seconds | cut -f1 -d"+" | sed -e 's/ /T/')Z)"  ;;
#                 project_id) att_val="PRIMAVERA" ;;
#                 modeling_realm) att_val="$modeling_realm" ;;
#                 realization) att_val="$realization" ;;
#               esac
#               ncatted -O -h -a ${att},global,c,c,"${att_val}" ${outdir}/$outfile
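#               # NOTE(review): 0 degC = 273.15 K. This conversion sits inside the attribute loop, so it runs once per
#               # attribute (and $units is never reset between variables); it likely belongs after "done #attributes".
#               if [[ "$units" == "C" ]]; then
#                 cdo addc,273.15 ${outdir}/$outfile ${outdir}/${outfile}.tmp
#                 mv ${outdir}/${outfile}.tmp ${outdir}/${outfile}
#                 ncatted -O -h -a units,${varnew},m,c,"K" ${outdir}/$outfile
#               fi #temp from C to K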
#             done #attributes
#           fi
#         done #var
#         rm ${file%???}
#       done #file
#       cd /esnas/exp/ecearth/$exp/$sd/$fc/outputs
#     done #tarfile
#     scp -i ~/.ssh/id_rsa_jasmin ${outdir}/*Omon*nc pabretonniere@jasmin-xfer1.ceda.ac.uk:$dir/.
#     scp -i ~/.ssh/id_rsa_jasmin ${outdir}/*OImon*nc pabretonniere@jasmin-xfer1.ceda.ac.uk:$dir/.
#   done #fc
# done #sd