diff --git a/hermesv3_gr/config/config.py b/hermesv3_gr/config/config.py index c821d0c967e452976b6d10ea9378e5f933a14d52..b47de4269dbbd36dddb70898b07658d376b5143b 100755 --- a/hermesv3_gr/config/config.py +++ b/hermesv3_gr/config/config.py @@ -76,7 +76,7 @@ class Config(ArgParser): help='Indicates if you want to start from scratch removing the auxiliary files already created.') p.add_argument('--output_model', required=True, help='Name of the output model.', - choices=['MONARCH', 'CMAQ', 'WRF_CHEM']) + choices=['MONARCH', 'CMAQ', 'WRF_CHEM', 'MOCAGE']) p.add_argument('--compression_level', required=False, type=int, choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], help='Compression level of the NetCDF output (0 for no compressed output).', default=4) diff --git a/hermesv3_gr/modules/writing/writer.py b/hermesv3_gr/modules/writing/writer.py index 69f3f314712562d8a717e626e6b1f1f0beca2889..bda6cf4cb8ea88254a7019a2b262fe6846b45dc3 100755 --- a/hermesv3_gr/modules/writing/writer.py +++ b/hermesv3_gr/modules/writing/writer.py @@ -363,6 +363,7 @@ class Writer(object): from hermesv3_gr.modules.writing.writer_cmaq import WriterCmaq from hermesv3_gr.modules.writing.writer_monarch import WriterMonarch from hermesv3_gr.modules.writing.writer_wrf_chem import WriterWrfChem + from hermesv3_gr.modules.writing.writer_mocage import WriterMocage settings.write_log('Selecting writing output type for {0}.'.format(output_model)) if output_model.lower() == 'monarch': @@ -371,11 +372,15 @@ class Writer(object): return WriterCmaq(path, grid, levels, date, hours, global_attributes_path, compression_level, parallel) elif output_model.lower() == 'wrf_chem': return WriterWrfChem(path, grid, levels, date, hours, global_attributes_path, compression_level, parallel) + elif output_model.lower() == 'mocage': + if grid.grid_type != 'regular': + raise ValueError("MOCAGE writer only accepts regular lat-lon grids") + return WriterMocage(path, grid, levels, date, hours, global_attributes_path, compression_level, 
# NOTE(review): the original mangled span also carried the tail of the
# hermesv3_gr/modules/writing/writer.py hunk (the close of the new
# WriterMocage dispatch branch and the error message extended with
# 'MOCAGE'); that fragment belongs to writer.py, not to this module.
#!/usr/bin/env python

# Copyright 2018 Earth Sciences Department, BSC-CNS
#
# This file is part of HERMESv3_GR.
#
# HERMESv3_GR is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# HERMESv3_GR is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with HERMESv3_GR. If not, see <http://www.gnu.org/licenses/>.


import sys
import timeit
import numpy as np
from netCDF4 import Dataset
from mpi4py import MPI
from hermesv3_gr.modules.writing.writer import Writer
from hermesv3_gr.config import settings


class WriterMocage(Writer):
    """
    Class to Write the output file following the MOCAGE conventions.

    :param path: Path to the destination file.
    :type path: str

    :param grid: Grid of the destination file. MOCAGE only accepts regular lat-lon grids
        (enforced by Writer.get_writer before this class is instantiated).
    :type grid: Grid

    :param levels: List with the levels of the grid.
    :type levels: list

    :param date: Date of the output file
    :type date: datetime.datetime

    :param hours: List with the timestamp hours.
    :type hours: list

    :param global_attributes_path: Path to the file that contains the static global attributes.
    :type global_attributes_path: str

    :param compression_level: Compression level of the netCDF output (0 for no compression).
    :type compression_level: int

    :param parallel: Indicates if you want to write in parallel mode.
    :type parallel: bool
    """

    def __init__(self, path, grid, levels, date, hours, global_attributes_path, compression_level=True,
                 parallel=False):
        super(WriterMocage, self).__init__(path, grid, levels, date, hours, global_attributes_path,
                                           compression_level, parallel)

    def unit_change(self, variable, data):
        """
        Do the unit conversions of the data for the MOCAGE expected units.

        :param variable: Name of the variable to convert.
        :type variable: str

        :param data: Data to change.
        :type data: numpy.array

        :return: Data with the new units.
        :rtype: numpy.array

        :raise TypeError: (rank 0) when the variable's units are neither 'mol/m2/s' nor 'kg/m2/s'.
        """
        st_time = timeit.default_timer()

        if data is not None:
            if isinstance(data, np.ndarray):
                data = np.array(data, dtype=np.float64)

            # variables_attributes is a dict keyed by variable name at this point
            # (see change_variable_attributes); look the units up directly.
            attributes = self.variables_attributes.get(variable)
            units = attributes['units'] if attributes is not None else None

            if units == 'mol/m2/s':
                # NOTE(review): factor 1000 presumably rescales mol to the unit MOCAGE
                # expects — confirm against the MOCAGE input specification.
                data = data * 1000
            elif units == 'kg/m2/s':
                pass
            else:
                settings.write_log('ERROR: Check the .err file to get more info.')
                if settings.rank == 0:
                    raise TypeError("The unit '{0}' of specie {1} is not defined correctly. ".format(units, variable) +
                                    "Should be 'mol/m2/s' or 'kg/m2/s'")
                sys.exit(1)
        settings.write_time('WriterMocage', 'unit_change', timeit.default_timer() - st_time, level=3)
        return data

    def change_variable_attributes(self):
        """
        Modify the emission list to be consistent to use the output as input for the MOCAGE model.

        Replaces the incoming list of {'name': ..., 'units': ...} dictionaries with a single
        dictionary keyed by variable name.

        :return: None (self.variables_attributes is updated in place).
        """
        self.variables_attributes = {
            variable['name']: {'units': variable['units']}
            for variable in self.variables_attributes}

    def create_parallel_netcdf(self):
        """
        Create an empty netCDF4 file with dimensions, coordinate variables and the (empty)
        emission variables, to be filled later by write_parallel_netcdf.

        :return: True at end.
        :rtype: bool
        """
        st_time = timeit.default_timer()

        settings.write_log("\tCreating parallel NetCDF file.", level=2)
        netcdf = Dataset(self.path, mode='w', format="NETCDF4")

        settings.write_log("\t\tCreating NetCDF dimensions.", level=2)
        # ===== Dimensions =====
        # Regular lat-lon grid only.
        var_dim = ('latitudes', 'longitudes',)

        # Longitude
        netcdf.createDimension('longitudes', self.grid.center_longitudes.shape[0])
        settings.write_log("\t\t\t'longitudes' dimension: {0}".format(self.grid.center_longitudes.shape[0]), level=3)
        lon_dim = ('longitudes',)

        # Latitude
        netcdf.createDimension('latitudes', self.grid.center_latitudes.shape[0])
        settings.write_log("\t\t\t'latitudes' dimension: {0}".format(self.grid.center_latitudes.shape[0]), level=3)
        lat_dim = ('latitudes',)

        # Time
        netcdf.createDimension('time', len(self.hours))
        settings.write_log("\t\t\t'time' dimension: {0}".format(len(self.hours)), level=3)

        # ===== Variables =====
        settings.write_log("\t\tCreating NetCDF variables.", level=2)
        # Time (when no date is set, a single 0.0 timestamp is written by every rank,
        # matching the original behaviour).
        time = netcdf.createVariable('time', 'd', ('time',))
        time.units = "UTC_hour"
        if self.date is None:
            time[:] = [0.]
        elif settings.rank == 0:
            time[:] = np.array(self.hours, dtype=np.float64)
        settings.write_log("\t\t\t'time' variable created with size: {0}".format(time[:].shape), level=3)

        # Latitude
        lats = netcdf.createVariable('latitudes', 'd', lat_dim, zlib=self.compress)
        lats.units = "degree_north"
        if settings.rank == 0:
            lats[:] = np.array(self.grid.center_latitudes, dtype=np.float64)
        settings.write_log("\t\t\t'latitudes' variable created with size: {0}".format(lats[:].shape), level=3)

        # Longitude
        lons = netcdf.createVariable('longitudes', 'd', lon_dim, zlib=self.compress)
        lons.units = "degree_east"
        if settings.rank == 0:
            lons[:] = np.array(self.grid.center_longitudes, dtype=np.float64)
        settings.write_log("\t\t\t'longitudes' variable created with size: {0}".format(lons[:].shape), level=3)

        # FIX: the original compared with 'is 0' (identity), which is fragile; use equality.
        if len(self.variables_attributes) == 0:
            # Placeholder so the file is never created without any data variable.
            var = netcdf.createVariable('aux_var', 'd', ('time',) + var_dim, zlib=self.compress)
            if settings.rank == 0:
                var[:] = 0

        for var_name, variable in self.variables_attributes.items():
            var = netcdf.createVariable(var_name, 'd', ('time',) + var_dim, zlib=self.compress)
            var.units = variable['units']
            settings.write_log("\t\t\t'{0}' variable created with size: {1}".format(var_name, var[:].shape) +
                               "\n\t\t\t\t'{0}' variable will be filled later.".format(var_name), level=3)

        netcdf.close()

        settings.write_time('WriterMocage', 'create_parallel_netcdf', timeit.default_timer() - st_time, level=3)
        return True

    def write_parallel_netcdf(self, emission_list):
        """
        Append the data to the netCDF4 file already created in parallel mode.

        :param emission_list: Data to append.
        :type emission_list: list

        :return: True at end.
        :rtype: bool
        """
        st_time = timeit.default_timer()

        settings.write_log("\tAppending data to parallel NetCDF file.", level=2)
        if settings.size > 1:
            netcdf = Dataset(self.path, mode='a', format="NETCDF4", parallel=True, comm=settings.comm,
                             info=MPI.Info())
        else:
            netcdf = Dataset(self.path, mode='a', format="NETCDF4")
        settings.write_log("\t\tParallel NetCDF file ready to write.", level=2)

        for var_name in self.variables_attributes.keys():
            data = self.calculate_data_by_var(var_name, emission_list, self.grid.shape)
            # FIX: the original reset st_time here, so the final write_time logged only
            # the last variable's duration instead of the whole write.

            var = netcdf.variables[var_name]
            if settings.size > 1:
                var.set_collective(True)
            # Correcting NAN
            if data is None:
                data = 0
            elif isinstance(data, np.ndarray):
                # Drop the vertical axis: (t, z, x, y) -> (t, x, y); MOCAGE output is 2-D + time.
                data_shape = data.shape
                data = data.reshape((data_shape[0], data_shape[2], data_shape[3]))
            var[:, self.grid.x_lower_bound:self.grid.x_upper_bound,
                self.grid.y_lower_bound:self.grid.y_upper_bound] = data

            settings.write_log("\t\t\t'{0}' variable filled".format(var_name))

        netcdf.close()
        # FIX: timing was attributed to 'Writer'; attribute it to this writer.
        settings.write_time('WriterMocage', 'write_parallel_netcdf', timeit.default_timer() - st_time, level=3)
        return True

    def write_serial_netcdf(self, emission_list):
        """
        Write the netCDF4 file in serial mode: rank 0 creates the file and the per-rank
        chunks are gathered with a vector Gatherv before being written.

        :param emission_list: Data to write.
        :type emission_list: list

        :return: True at end.
        :rtype: bool
        """
        st_time = timeit.default_timer()

        # Gathering the index
        rank_position = np.array(
            [self.grid.x_lower_bound, self.grid.x_upper_bound, self.grid.y_lower_bound, self.grid.y_upper_bound],
            dtype='i')
        full_position = None
        if settings.rank == 0:
            full_position = np.empty([settings.size, 4], dtype='i')
        settings.comm.Gather(rank_position, full_position, root=0)

        if settings.rank == 0:
            settings.write_log("\tCreating NetCDF file.", level=2)
            netcdf = Dataset(self.path, mode='w', format="NETCDF4")

            # ===== Dimensions =====
            settings.write_log("\t\tCreating NetCDF dimensions.", level=2)
            netcdf.createDimension('longitudes', self.grid.center_longitudes.shape[0])
            netcdf.createDimension('latitudes', self.grid.center_latitudes.shape[0])
            netcdf.createDimension('time', len(self.hours))

            # ===== Variables =====
            settings.write_log("\t\tCreating NetCDF variables.", level=2)
            # Time
            time = netcdf.createVariable('time', 'd', ('time',))
            time.units = "UTC_hour"
            if self.date is None:
                time[:] = [0.]
            else:
                time[:] = np.array(self.hours, dtype=np.float64)
            settings.write_log("\t\t\t'time' variable created with size: {0}".format(time[:].shape), level=3)

            # Latitude
            lats = netcdf.createVariable('latitudes', 'd', ('latitudes',), zlib=self.compress)
            lats.units = "degree_north"
            lats[:] = np.array(self.grid.center_latitudes, dtype=np.float64)
            settings.write_log("\t\t\t'lat' variable created with size: {0}".format(lats[:].shape), level=3)

            # Longitude
            lons = netcdf.createVariable('longitudes', 'd', ('longitudes',), zlib=self.compress)
            lons.units = "degree_east"
            lons[:] = np.array(self.grid.center_longitudes, dtype=np.float64)
            settings.write_log("\t\t\t'lon' variable created with size: {0}".format(lons[:].shape),
                               level=3)

        full_shape = None

        for var_name in self.variables_attributes.keys():
            if settings.size != 1:
                settings.write_log("\t\t\tGathering {0} data.".format(var_name), level=3)
            rank_data = self.calculate_data_by_var(var_name, emission_list, self.grid.shape)
            if rank_data is not None:
                if full_shape is None:
                    full_shape = settings.comm.allgather(rank_data.shape)

                counts_i = self.tuple_to_index(full_shape)
                rank_buff = [rank_data, counts_i[settings.rank]]
                if settings.rank == 0:
                    displacements = self.calculate_displacements(counts_i)
                    recvdata = np.empty(sum(counts_i), dtype=settings.precision)
                else:
                    displacements = None
                    recvdata = None
                if settings.precision == np.float32:
                    recvbuf = [recvdata, counts_i, displacements, MPI.FLOAT]
                elif settings.precision == np.float64:
                    recvbuf = [recvdata, counts_i, displacements, MPI.DOUBLE]
                else:
                    settings.write_log('ERROR: Check the .err file to get more info.')
                    if settings.rank == 0:
                        raise TypeError('ERROR: precision {0} unknown'.format(settings.precision))
                    sys.exit(1)

                settings.comm.Gatherv(rank_buff, recvbuf, root=0)

            if settings.rank == 0:
                if settings.size != 1:
                    # Legacy guard kept from the original: 'data' is either unbound or
                    # 3-D here, so the concatenate always fails and 'data' resets to 0.
                    try:
                        data = np.concatenate(data, axis=3)
                    except (UnboundLocalError, TypeError, IndexError):
                        data = 0

                # FIX: the original passed only zlib=self.compress, so the user-selected
                # --compression_level was silently ignored (write_serial_netcdf_old does
                # pass it); forward complevel when compressing.
                if self.compress:
                    var = netcdf.createVariable(var_name, 'f', ('time', 'latitudes', 'longitudes',),
                                                zlib=True, complevel=self.compression_level)
                else:
                    var = netcdf.createVariable(var_name, 'f', ('time', 'latitudes', 'longitudes',))
                var.setncatts(self.variables_attributes[var_name])

                if rank_data is not None:
                    data = np.empty(var[:].shape, dtype=settings.precision)
                    for i in range(settings.size):
                        # Re-assemble each rank's chunk into its slot of the full grid,
                        # dropping the vertical axis (full_shape index 1).
                        if not i == settings.size - 1:
                            data[:, full_position[i][0]:full_position[i][1],
                                 full_position[i][2]:full_position[i][3]] = \
                                np.array(recvbuf[0][displacements[i]: displacements[i + 1]]).reshape(
                                    (full_shape[i][0], full_shape[i][2], full_shape[i][3], ))
                        else:
                            data[:, full_position[i][0]:full_position[i][1],
                                 full_position[i][2]:full_position[i][3]] = \
                                np.array(recvbuf[0][displacements[i]:]).reshape(
                                    (full_shape[i][0], full_shape[i][2], full_shape[i][3], ))
                else:
                    data = 0
                var[:] = data
                settings.write_log("\t\t\t'{0}' variable created with size: {1}".format(var_name, var[:].shape),
                                   level=3)
        settings.write_log("\t\tCreating NetCDF metadata.", level=2)
        if settings.rank == 0:
            netcdf.close()
        # FIX: timing was attributed to 'WriterWrfChem' (copy-paste); also, the original
        # reset st_time inside the loop so only the last variable was timed.
        settings.write_time('WriterMocage', 'write_serial_netcdf', timeit.default_timer() - st_time, level=3)

        return True

    def write_serial_netcdf_old(self, emission_list):
        """
        Write the netCDF4 file in serial mode (legacy implementation, superseded by
        write_serial_netcdf but kept for reference).

        :param emission_list: Data to append.
        :type emission_list: list

        :return: None.
        """
        st_time = timeit.default_timer()

        mpi_numpy = True
        mpi_vector = False

        # Gathering the index
        if mpi_numpy or mpi_vector:
            rank_position = np.array([self.grid.x_lower_bound, self.grid.x_upper_bound, self.grid.y_lower_bound,
                                      self.grid.y_upper_bound], dtype='i')
            full_position = None
            if settings.rank == 0:
                full_position = np.empty([settings.size, 4], dtype='i')
            settings.comm.Gather(rank_position, full_position, root=0)

        if settings.rank == 0:
            settings.write_log("\tCreating NetCDF file.", level=2)
            netcdf = Dataset(self.path, mode='w', format="NETCDF4")

            # ===== Dimensions =====
            settings.write_log("\t\tCreating NetCDF dimensions.", level=2)
            # regular_latlon:
            var_dim = ('latitudes', 'longitudes',)

            # Longitude
            settings.write_log("\t\t\t'longitudes' dimension: {0}".format(self.grid.center_longitudes.shape[0]),
                               level=3)
            netcdf.createDimension('longitudes', self.grid.center_longitudes.shape[0])
            lon_dim = ('longitudes',)

            # Latitude
            settings.write_log("\t\t\t'latitudes' dimension: {0}".format(self.grid.center_latitudes.shape[0]),
                               level=3)
            netcdf.createDimension('latitudes', self.grid.center_latitudes.shape[0])
            lat_dim = ('latitudes',)

            # Time
            settings.write_log("\t\t\t'time' dimension: {0}".format(len(self.hours)), level=3)
            netcdf.createDimension('time', len(self.hours))

            # ===== Variables =====
            settings.write_log("\t\tCreating NetCDF variables.", level=2)
            # Time
            time = netcdf.createVariable('time', 'd', ('time',))
            time.units = "UTC_hour"
            if self.date is None:
                time[:] = [0.]
            else:
                time[:] = np.array(self.hours, dtype=np.float64)
            settings.write_log("\t\t\t'time' variable created with size: {0}".format(time[:].shape), level=3)

            # Latitude
            lats = netcdf.createVariable('latitudes', 'd', lat_dim, zlib=self.compress)
            lats.units = "degree_north"
            lats[:] = np.array(self.grid.center_latitudes, dtype=np.float64)
            settings.write_log("\t\t\t'lat' variable created with size: {0}".format(lats[:].shape), level=3)

            # Longitude
            lons = netcdf.createVariable('longitudes', 'd', lon_dim, zlib=self.compress)
            lons.units = "degree_east"
            lons[:] = np.array(self.grid.center_longitudes, dtype=np.float64)
            settings.write_log("\t\t\t'lon' variable created with size: {0}".format(lons[:].shape),
                               level=3)

        full_shape = None
        index = 0
        for var_name in self.variables_attributes.keys():
            if settings.size != 1:
                settings.write_log("\t\t\tGathering {0} data.".format(var_name), level=3)
            rank_data = self.calculate_data_by_var(var_name, emission_list, self.grid.shape)
            if mpi_numpy or mpi_vector:
                if rank_data is not None:
                    root_shape = settings.comm.bcast(rank_data.shape, root=0)
                    if full_shape is None:
                        full_shape = settings.comm.allgather(rank_data.shape)

            if mpi_numpy:
                # NOTE(review): when rank_data is None, root_shape/rank_data.shape below
                # would raise — legacy behaviour preserved as-is.
                if settings.size != 1:
                    if settings.rank == 0:
                        recvbuf = np.empty((settings.size,) + rank_data.shape)
                    else:
                        recvbuf = None
                    if root_shape != rank_data.shape:
                        rank_data_aux = np.empty(root_shape)
                        rank_data_aux[:, :, :-1] = rank_data
                        rank_data = rank_data_aux
                    settings.comm.Gather(rank_data, recvbuf, root=0)
                else:
                    recvbuf = rank_data
            elif mpi_vector:
                if rank_data is not None:
                    counts_i = self.tuple_to_index(full_shape)
                    rank_buff = [rank_data, counts_i[settings.rank]]
                    if settings.rank == 0:
                        displacements = self.calculate_displacements(counts_i)
                        recvdata = np.empty(sum(counts_i), dtype=settings.precision)
                    else:
                        displacements = None
                        recvdata = None
                    if settings.precision == np.float32:
                        recvbuf = [recvdata, counts_i, displacements, MPI.FLOAT]
                    elif settings.precision == np.float64:
                        recvbuf = [recvdata, counts_i, displacements, MPI.DOUBLE]
                    else:
                        settings.write_log('ERROR: Check the .err file to get more info.')
                        if settings.rank == 0:
                            raise TypeError('ERROR: precision {0} unknown'.format(settings.precision))
                        sys.exit(1)

                    settings.comm.Gatherv(rank_buff, recvbuf, root=0)
            else:
                if settings.size != 1:
                    data = settings.comm.gather(rank_data, root=0)
                else:
                    data = rank_data

            if settings.rank == 0:
                if not (mpi_numpy or mpi_vector):
                    if settings.size != 1:
                        try:
                            data = np.concatenate(data, axis=3)
                        except (UnboundLocalError, TypeError, IndexError):
                            data = 0
                index += 1
                if self.compress:
                    var = netcdf.createVariable(var_name, 'd', ('time',) + var_dim, zlib=self.compress,
                                                complevel=self.compression_level)
                else:
                    var = netcdf.createVariable(var_name, 'd', ('time',) + var_dim)

                var.units = self.variables_attributes[var_name]['units']

                if mpi_numpy:
                    data = np.ones(var[:].shape, dtype=settings.precision) * 100
                    for i in range(settings.size):
                        try:
                            if i == 0:
                                var[:, :, :full_position[i][3]] = recvbuf[i]
                            elif i == settings.size - 1:
                                var[:, :, full_position[i][2]:] = recvbuf[i, :, :, :-1]
                            else:
                                var[:, :, full_position[i][2]:full_position[i][3]] = \
                                    recvbuf[i, :, :, : full_shape[i][-1]]
                        except ValueError:
                            settings.write_log('ERROR: Check the .err file to get more info.')
                            if settings.rank == 0:
                                raise TypeError("ERROR on i {0} ".format(i) +
                                                "data shape: {0} ".format(data[:, :, full_position[i][2]:].shape) +
                                                "recvbuf shape {0}".format(recvbuf[i].shape))
                            sys.exit(1)

                elif mpi_vector:
                    if rank_data is not None:
                        data = np.empty(var[:].shape, dtype=settings.precision)
                        for i in range(settings.size):
                            if not i == settings.size - 1:
                                data[:, full_position[i][0]:full_position[i][1],
                                     full_position[i][2]:full_position[i][3]] = \
                                    np.array(recvbuf[0][displacements[i]: displacements[i + 1]]).reshape(
                                        full_shape[i])
                            else:
                                data[:, full_position[i][0]:full_position[i][1],
                                     full_position[i][2]:full_position[i][3]] = \
                                    np.array(recvbuf[0][displacements[i]:]).reshape(full_shape[i])
                    else:
                        data = 0
                    if isinstance(data, np.ndarray):
                        var[:] = np.array(data, dtype=np.float64)
                    else:
                        var[:] = data
                else:
                    if isinstance(data, np.ndarray):
                        var[:] = np.array(data, dtype=np.float64)
                    else:
                        var[:] = data

                settings.write_log("\t\t\t'{0}' variable created with size: {1}".format(var_name, var[:].shape),
                                   level=3)

        if settings.rank == 0:
            netcdf.close()
        settings.write_time('WriterMocage', 'write_serial_netcdf', timeit.default_timer() - st_time, level=3)