import glob
import os
import shutil
import threading

import numpy as np

from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day

from earthdiagnostics.utils import Utils, TempFile


class DataManager(object):
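    """
    Gives access to the CMORized data of an experiment: it prepares the CMOR
    file tree for every startdate and member and resolves the paths used to
    read (get_files, get_file, get_year) and store (send_file) variables.
    """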

    def __init__(self, institution, model, expid, datafolder, frequency, chunk_size, experiment_name, num_chunks,
                 calendar='standard'):
        self.institution = institution
        self.model = model
        self.expid = expid
        self.data_dir = datafolder
        self.frequency = frequency
        self.chunk_size = chunk_size
        self.experiment_name = experiment_name
        self.add_startdate = True
        self.add_name = True
        self.num_chunks = num_chunks
        self.calendar = calendar

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self, startdates, members):
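        """
        Unpacks the CMORized outputs (gzipped tar files) for every startdate
        and member and renames files and folders so that they follow the
        experiment naming convention.
        """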
        # Check if cmorized and convert if not
        if not os.path.exists(os.path.join(self.data_dir, self.expid)):
            raise Exception('The experiment {0} is not CMORized. '
                            'Please, CMORize it and launch again.'.format(self.expid))
        for startdate in startdates:
            for member in members:
                member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc{0}'.format(member), 'outputs')
                Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))

                filepaths = glob.glob(os.path.join(member_path, '*.gz'))

                if len(filepaths) == 0:
                    continue

                threads = list()
                numthreads = Utils.available_cpu_count()
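                # Split the compressed files evenly among the available CPUs
                # and gunzip them in parallel.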
                for numthread in range(0, numthreads):
                    t = threading.Thread(target=DataManager._unzip,
                                         args=(filepaths[numthread::numthreads],))
                    threads.append(t)
                    t.start()

                for t in threads:
                    t.join()

                filepaths = glob.glob(os.path.join(member_path, '*.tar'))
                threads = list()
                for numthread in range(0, numthreads):
                    t = threading.Thread(target=DataManager._untar,
                                         args=(filepaths[numthread::numthreads], member_path))
                    threads.append(t)
                    t.start()

                for t in threads:
                    t.join()

                if self.experiment_name != self.model:
                    bad_path = os.path.join(member_path, 'output', self.institution, self.model, self.model)
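                    # Walk the wrongly named tree bottom-up, moving every file
                    # to the equivalent path under the experiment name and
                    # removing the emptied directories.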
                    for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
                        for filename in filenames:
                            filepath = os.path.join(dirpath, filename)
                            good = filepath.replace('_{0}_output_'.format(self.model),
                                                    '_{0}_{1}_'.format(self.model, self.experiment_name))

                            good = good.replace('/{0}/{0}'.format(self.model),
                                                '/{0}/{1}'.format(self.model,
                                                                  self.experiment_name))

                            Utils.move_file(filepath, good)
                        os.rmdir(dirpath)

                good_dir = os.path.join(member_path, 'output', self.institution, self.model, self.experiment_name)
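                # Second pass: insert the startdate directory name into any
                # file name that is missing it, right before the ensemble
                # member (rXiYpZ) part.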
                for sdate in os.listdir(good_dir):
                    for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), False):
                        for filename in filenames:
                            filepath = os.path.join(dirpath, filename)
                            good = filepath.replace('_{0}_{1}_r'.format(self.model, self.experiment_name),
                                                    '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate))
                            if good != filepath:
                                Log.info('Moving {0} to {1}'.format(filename, good))
                                Utils.move_file(filepath, good)

    @staticmethod
    def _unzip(files):
        for filepath in files:
            Log.debug('Unzipping {0}', filepath)
            Utils.execute_shell_command('gunzip {0}'.format(filepath))

    @staticmethod
    def _untar(files, member_path):
        for filepath in files:
            Log.debug('Unpacking {0}', filepath)
            Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(filepath, member_path))
            os.remove(filepath)

    def get_files(self, startdate, member, chunk, domain, variables, grid=None):
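        """
        Builds the paths of the CMORized files for the given variables,
        startdate, member and chunk. Only the paths are returned; the files
        are not copied.
        """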

        file_names = list()

        if domain == 'seaIce':
            domain_abreviattion = 'OI'
        else:
            domain_abreviattion = domain[0].upper()

        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                                   self.institution, self.model, self.experiment_name, 'S' + startdate, self.frequency,
                                   domain)

        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')

        var_file = list()
        for var in variables:
            if grid:
                var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
            else:
                var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

            var_file.append(os.path.join(var_path,
                                         '{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                                         '{7}-{8}.nc'.format(var, domain_abreviattion,
                                                             self.frequency, self.model, self.experiment_name,
                                                             startdate, member_plus,
                                                             "{0:04}{1:02}".format(chunk_start.year,
                                                                                   chunk_start.month),
                                                             "{0:04}{1:02}".format(chunk_end.year,
                                                                                   chunk_end.month))))
        file_names.append(var_file)

        return file_names

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
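        """
        Copies the CMORized file for the given variable, startdate, member
        and chunk to a temporary file and returns its path, so the original
        data is never modified in place.
        """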
        if domain == 'seaIce':
            domain_abreviattion = 'OI'
        else:
            domain_abreviattion = domain[0].upper()

        if not frequency:
            frequency = self.frequency

        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                                   self.institution, self.model, self.experiment_name, 'S' + startdate, frequency,
                                   domain)

        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')

        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

        filepath = os.path.join(var_path, '{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                                          '{7}-{8}.nc'.format(var, domain_abreviattion, frequency, self.model,
                                                              self.experiment_name, startdate, member_plus,
                                                              "{0:04}{1:02}".format(chunk_start.year,
                                                                                    chunk_start.month),
                                                              "{0:04}{1:02}".format(chunk_end.year,
                                                                                    chunk_end.month)))

        temp_path = TempFile.get()
        shutil.copyfile(filepath, temp_path)
        return temp_path

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
                  rename_var=None, frequency=None, year=None):
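        """
        Stores a diagnostic result in the CMORized file tree: the file is
        converted to netCDF4, optionally has its variable renamed and, when a
        region is given, is merged into the per-region file for that variable.
        """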

        Utils.convert2netcdf4(filetosend)

        if domain == 'seaIce':
            domain_abreviattion = 'OI'
        else:
            domain_abreviattion = domain[0].upper()

        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

        if rename_var:
            Utils.rename_variable(filetosend, rename_var, var)

        if not frequency:
            frequency = self.frequency

        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                                   self.institution, self.model, self.experiment_name, 'S' + startdate, frequency,
                                   domain)

        if chunk is not None:
            chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
            chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
            chunk_end = previous_day(chunk_end, 'standard')

            time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                            chunk_end.month)

        elif year is not None:
            if frequency != 'yr':
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)

        else:
            raise ValueError('Chunk and year can not be None at the same time')

        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

        filepath = os.path.join(var_path, '{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                                          '{7}.nc'.format(var, domain_abreviattion, frequency, self.model,
                                                          self.experiment_name, startdate, member_plus, time_bound))
        if region:
            if not os.path.exists(filepath):
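                # First result for this variable: add a 'region' dimension to
                # the file and store the data in its first slot.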

                handler = Utils.openCdf(filetosend)
                handler.createDimension('region')
                var_region = handler.createVariable('region', str, 'region', fill_value='')
                var_region[0] = region

                original_var = handler.variables[var]
                new_var = handler.createVariable('new_var', original_var.datatype,
                                                 original_var.dimensions + ('region',))
                new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
                value = original_var[:]
                new_var[..., 0] = value
                handler.close()

                Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
                Utils.rename_variable(filetosend, 'new_var', var)
            else:
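                # A per-region file already exists: write the data into the
                # slot for this region, appending a new one if needed.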
                temp = TempFile.get()
                shutil.copyfile(filepath, temp)
                Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
                handler = Utils.openCdf(temp)
                handler_send = Utils.openCdf(filetosend)
                value = handler_send.variables[var][:]
                var_region = handler.variables['region']
                basin_index = np.where(var_region[:] == region)
                if len(basin_index[0]) == 0:
                    var_region[var_region.shape[0]] = region
                    basin_index = var_region.shape[0] - 1

                else:
                    basin_index = basin_index[0][0]

                handler.variables[var][..., basin_index] = value
                handler.close()
                handler_send.close()
                Utils.move_file(temp, filetosend)
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')

        # Place the processed file at its final location in the CMOR tree
        Utils.move_file(filetosend, filepath)

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
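        """
        Concatenates the chunks that overlap the requested year, extracts the
        timesteps that belong to that year and returns the path of a
        temporary file with the result.
        """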
        chunk_files = list()
        for chunk in self.get_year_chunks(startdate, year):
            chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box))

        if len(chunk_files) > 1:
            temp = TempFile.get()
            Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
            for chunk_file in chunk_files:
                os.remove(chunk_file)
        else:
            temp = chunk_files[0]
        temp2 = TempFile.get()
        handler = Utils.openCdf(temp)
        time = Utils.get_datetime_from_netcdf(handler)
        handler.close()
        start = None
        end = None
        for x in range(0, len(time)):
            date = time[x]
            if date.year == year:
                if date.month == 1:
                    start = x
                elif date.month == 12:
                    end = x

        Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
        os.remove(temp)
        return temp2

    def get_year_chunks(self, startdate, year):
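        """
        Returns the chunk numbers that contain data for the given year of the
        given startdate.
        """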
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks+1):
            chunk_start = chunk_start_date(date, chunk, self.chunk_size, 'month', self.calendar)
            if chunk_start.year > year:
                break
            elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
                                                            self.calendar).year == year:
                chunks.append(chunk)

        return chunks

    def get_full_years(self, startdate):
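        """
        Returns the list of calendar years that are fully covered by the
        chunks of the given startdate.
        """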
        chunks_per_year = 12 // self.chunk_size
        date = parse_date(startdate)
        first_january = 0
        first_year = date.year
        if date.month != 1:
            month = date.month
            first_year += 1
            while month + self.chunk_size < 12:
                month += self.chunk_size
                first_january += 1

        years = list()
        for chunk in range(first_january, self.num_chunks - chunks_per_year, chunks_per_year):
            years.append(first_year)
            first_year += 1
        return years