diff --git a/CHANGELOG b/CHANGELOG index 9c99603d2e1389eeccd8614868098f9d29fdee90..8695d0e0d17e0d671e55a9e5f18ed30fb0cbef56 100755 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,5 @@ -0.1.1 - 2019/10/29 +0.1.2 + 2020/02/28 First beta version: @@ -29,7 +29,8 @@ 12. Solvents sector - Writing options: - 0. Return emissions on memory + A. Return emissions on memory + 1. Default writer 2. CMAQ writer 3. MONARCH writer diff --git a/conf/hermes.conf b/conf/hermes.conf index ad9ea14736848678cbfc107aa8a6a3e677db1d05..70e57f6fbe2f85f2072a0898428d9f08e35790c9 100755 --- a/conf/hermes.conf +++ b/conf/hermes.conf @@ -2,24 +2,25 @@ log_level = 3 input_dir = /home/Earth/ctena/Models/hermesv3_bu_data data_path = /esarchive/recon -output_dir = /scratch/Earth/HERMESv3_BU_OUT/ -output_name = HERMESv3_.nc +output_dir = /scratch/Earth/ctena/HERMESv3_BU_OUT/ +output_name = HERMESv3__p4_w2.nc emission_summary = 0 start_date = 2016/11/29 00:00:00 # ----- end_date = start_date [DEFAULT] ----- # end_date = 2010/01/01 00:00:00 output_timestep_num = 24 -auxiliary_files_path = /scratch/Earth/HERMESv3_BU_aux/__test +auxiliary_files_path = /scratch/Earth/ctena/HERMESv3_BU_aux/__test first_time = 0 erase_auxiliary_files = 0 [DOMAIN] -# domain_type=[lcc, rotated, mercator, regular] -domain_type = lcc +# domain_type=[lcc, rotated, mercator, regular, rotated_nested] +domain_type = rotated_nested # output_type=[MONARCH, CMAQ, WRF_CHEM, DEFAULT] output_model = DEFAULT +compression_level = 0 output_attributes = /writing/global_attributes_WRF-Chem.csv vertical_description = /profiles/vertical/MONARCH_Global_48layers_vertical_description.csv #vertical_description = /profiles/vertical/CMAQ_15layers_vertical_description.csv @@ -39,6 +40,16 @@ vertical_description = /profiles/vertical/MONARCH_Global_48layers_ver # inc_rlat = 0.4 # inc_rlon = 0.4 +# if domain_type == rotated_nested: + parent_grid_path = /scratch/Earth/ctena/temporal_coords.nc + parent_ratio = 4 + # i -> lon + i_parent_start = 200 + # j -> lat + j_parent_start = 100 + n_rlat = 400 + n_rlon = 400 + # if domain_type == lcc: # CALIOPE @@ -118,7 +129,7 @@ vertical_description = /profiles/vertical/MONARCH_Global_48layers_ver # SECTORS #################################################################### [SECTOR MANAGEMENT] -writing_processors = 1 +writing_processors = 2 # # aviation_processors = 1 # shipping_port_processors = 1 @@ -130,7 +141,7 @@ writing_processors = 1 # recreational_boats_processors = 4 # point_sources_processors = 16 # traffic_processors = 256 -traffic_area_processors = 1 +traffic_area_processors = 4 [SHAPEFILES] nuts3_shapefile = /shapefiles/nuts3/nuts3.shp diff --git a/hermesv3_bu/clipping/clip.py b/hermesv3_bu/clipping/clip.py index 5b2ebb6598dd1e964c49bcf340817d948e4bbd5f..9addce193c8ea0701d6e77b66d03ec68b7c5dbd1 100755 --- a/hermesv3_bu/clipping/clip.py +++ b/hermesv3_bu/clipping/clip.py @@ -34,10 +34,10 @@ def select_clip(comm, logger, auxiliary_path, clipping, grid): clip = DefaultClip(logger, auxiliary_path, grid) elif clipping[0] == os.path.sep: from hermesv3_bu.clipping.shapefile_clip import ShapefileClip - clip = ShapefileClip(logger, auxiliary_path, clipping) + clip = ShapefileClip(logger, auxiliary_path, clipping, grid) else: from hermesv3_bu.clipping.custom_clip import CustomClip - clip = CustomClip(logger, auxiliary_path, clipping) + clip = CustomClip(logger, auxiliary_path, clipping, grid) else: clip = None @@ -49,9 +49,10 @@ def select_clip(comm, logger, auxiliary_path, clipping, grid): class Clip(object): - def __init__(self, logger, auxiliary_path): + def __init__(self, logger, auxiliary_path, grid): spent_time = timeit.default_timer() self.logger = logger + self.grid = grid self.shapefile = None self.shapefile_path = os.path.join(auxiliary_path, 'clip', 'clip.shp') diff --git a/hermesv3_bu/clipping/custom_clip.py b/hermesv3_bu/clipping/custom_clip.py index f6f79b27ccb93f669bddfce8e689c19eb01a38a0..e780dd2c88e2a2fcefec24521ebc264d06a2a7ea 100755 --- a/hermesv3_bu/clipping/custom_clip.py +++ b/hermesv3_bu/clipping/custom_clip.py @@ -9,7 +9,7 @@ from hermesv3_bu.logger.log import Log class CustomClip(Clip): - def __init__(self, logger, auxiliary_path, points_str): + def __init__(self, logger, auxiliary_path, points_str, grid): """ Initialise the Custom Clip class @@ -24,7 +24,7 @@ class CustomClip(Clip): """ spent_time = timeit.default_timer() logger.write_log('Custom clip selected') - super(CustomClip, self).__init__(logger, auxiliary_path) + super(CustomClip, self).__init__(logger, auxiliary_path, grid) self.clip_type = 'Custom clip' self.shapefile = self.create_clip(points_str) self.logger.write_time_log('CustomClip', '__init__', timeit.default_timer() - spent_time) @@ -56,9 +56,15 @@ class CustomClip(Clip): if not ((lon_list[0] == lon_list[-1]) and (lat_list[0] == lat_list[-1])): lon_list.append(lon_list[0]) lat_list.append(lat_list[0]) + geom = Polygon([[p.x, p.y] for p in [Point(xy) for xy in zip(lon_list, lat_list)]]) + clip = gpd.GeoDataFrame( + geometry=[geom], + crs={'init': 'epsg:4326'}) + border = gpd.GeoDataFrame(geometry=[self.grid.shapefile.unary_union], crs=self.grid.shapefile.crs) + geom = gpd.overlay(clip, border.to_crs(clip.crs), how='intersection').unary_union clip = gpd.GeoDataFrame( - geometry=[Polygon([[p.x, p.y] for p in [Point(xy) for xy in zip(lon_list, lat_list)]])], + geometry=[geom], crs={'init': 'epsg:4326'}) clip.to_file(self.shapefile_path) diff --git a/hermesv3_bu/clipping/default_clip.py b/hermesv3_bu/clipping/default_clip.py index f05bda8cc6ca35758b495f5b0f19a2d84770b013..5cc1f99123e1a37cb81b0dddd728ae920964eb61 100755 --- a/hermesv3_bu/clipping/default_clip.py +++ b/hermesv3_bu/clipping/default_clip.py @@ -24,7 +24,7 @@ class DefaultClip(Clip): """ spent_time = timeit.default_timer() logger.write_log('Default clip selected') - super(DefaultClip, self).__init__(logger, auxiliary_path) + super(DefaultClip, self).__init__(logger, auxiliary_path, grid) self.clip_type = 'Default clip' self.shapefile = self.create_clip(grid) self.logger.write_time_log('DefaultClip', '__init__', timeit.default_timer() - spent_time) diff --git a/hermesv3_bu/clipping/shapefile_clip.py b/hermesv3_bu/clipping/shapefile_clip.py index 88792ef532d2403cd46d5e97780b0c4d5027e746..7d4eb4642ad31bf7ca5cb633fbc7e81e39d528d2 100755 --- a/hermesv3_bu/clipping/shapefile_clip.py +++ b/hermesv3_bu/clipping/shapefile_clip.py @@ -10,7 +10,7 @@ from hermesv3_bu.tools.checker import error_exit class ShapefileClip(Clip): - def __init__(self, logger, auxiliary_path, clip_input_path): + def __init__(self, logger, auxiliary_path, clip_input_path, grid): """ Initialise the Shapefile Clip class @@ -25,7 +25,7 @@ class ShapefileClip(Clip): """ spent_time = timeit.default_timer() logger.write_log('Shapefile clip selected') - super(ShapefileClip, self).__init__(logger, auxiliary_path) + super(ShapefileClip, self).__init__(logger, auxiliary_path, grid) self.clip_type = 'Shapefile clip' self.shapefile = self.create_clip(clip_input_path) self.logger.write_time_log('ShapefileClip', '__init__', timeit.default_timer() - spent_time) @@ -46,7 +46,9 @@ class ShapefileClip(Clip): if not os.path.exists(os.path.dirname(self.shapefile_path)): os.makedirs(os.path.dirname(self.shapefile_path)) clip = gpd.read_file(clip_path) - clip = gpd.GeoDataFrame(geometry=[clip.unary_union], crs=clip.crs) + border = gpd.GeoDataFrame(geometry=[self.grid.shapefile.unary_union], crs=self.grid.shapefile.crs) + geom = gpd.overlay(clip, border.to_crs(clip.crs), how='intersection').unary_union + clip = gpd.GeoDataFrame(geometry=[geom], crs=clip.crs) clip.to_file(self.shapefile_path) else: error_exit(" Clip shapefile {0} not found.") diff --git a/hermesv3_bu/config/config.py b/hermesv3_bu/config/config.py index f99e3e4072e4067c68e908aff90b853f4f46fcc5..f9141b84c8773cedba4d9373abdf59d830009cd3 100755 --- a/hermesv3_bu/config/config.py +++ b/hermesv3_bu/config/config.py @@ -74,12 +74,14 @@ class Config(ArgParser): p.add_argument('--output_attributes', required=False, help='Path to the file that contains the global attributes.') - p.add_argument('--domain_type', required=True, help='Type of domain to simulate.', - choices=['lcc', 'rotated', 'mercator', 'regular']) - p.add_argument('--vertical_description', required=True, help='Path to the file that contains the vertical description of the desired output.') + p.add_argument('--domain_type', required=True, help='Type of domain to simulate.', + choices=['lcc', 'rotated', 'mercator', 'regular', 'rotated_nested']) + p.add_argument('--compression_level', required=False, type=int, choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + help='Compression level of the NetCDF output (0 for no compressed output).', default=4) + # Rotated options p.add_argument('--centre_lat', required=False, type=float, help='Central geographic latitude of grid (non-rotated degrees). Corresponds to the TPH0D ' + @@ -100,6 +102,20 @@ class Config(ArgParser): help='Longitudinal grid resolution (rotated degrees). Corresponds to the DLMD parameter ' + 'in NMMB-MONARCH.') + # Rotated_nested options + p.add_argument('--parent_grid_path', required=False, type=str, + help='Path to the netCDF that contains the grid definition.') + p.add_argument('--parent_ratio', required=False, type=int, + help='Ratio between the parent and the nested domain.') + p.add_argument('--i_parent_start', required=False, type=int, + help='Location of the I to start the nested.') + p.add_argument('--j_parent_start', required=False, type=int, + help='Location of the J to start the nested.') + p.add_argument('--n_rlat', required=False, type=int, + help='Number of rotated latitude points.') + p.add_argument('--n_rlon', required=False, type=int, + help='Number of rotated longitude points.') + # Lambert conformal conic options p.add_argument('--lat_1', required=False, type=float, help='Standard parallel 1 (in deg). Corresponds to the P_ALP parameter of the GRIDDESC file.') @@ -686,6 +702,9 @@ class Config(ArgParser): elif arguments.domain_type == 'rotated': arguments.__dict__[item] = arguments.__dict__[item].replace('', '{1}_{2}'.format( item, arguments.inc_rlat, arguments.inc_rlon)) + elif arguments.domain_type == 'rotated_nested': + arguments.__dict__[item] = arguments.__dict__[item].replace('', '{1}_{2}'.format( + item, arguments.n_rlat, arguments.n_rlon)) elif arguments.domain_type == 'lcc' or arguments.domain_type == 'mercator': arguments.__dict__[item] = arguments.__dict__[item].replace('', '{1}_{2}'.format( item, arguments.inc_x, arguments.inc_y)) diff --git a/hermesv3_bu/grids/grid.py b/hermesv3_bu/grids/grid.py index 1f1e49b5e134bc8fba691c9e91093b92b844c6da..ff91a4acb648a3164fb9386401725d8a20152226 100755 --- a/hermesv3_bu/grids/grid.py +++ b/hermesv3_bu/grids/grid.py @@ -45,6 +45,12 @@ def select_grid(comm, logger, arguments): logger, arguments.auxiliary_files_path, arguments.output_timestep_num, arguments.vertical_description, arguments.centre_lat, arguments.centre_lon, arguments.west_boundary, arguments.south_boundary, arguments.inc_rlat, arguments.inc_rlon) + elif arguments.domain_type == 'rotated_nested': + from hermesv3_bu.grids.grid_rotated_nested import RotatedNestedGrid + grid = RotatedNestedGrid( + logger, arguments.auxiliary_files_path, arguments.output_timestep_num, + arguments.vertical_description, arguments.parent_grid_path, arguments.parent_ratio, + arguments.i_parent_start, arguments.j_parent_start, arguments.n_rlat, arguments.n_rlon) elif arguments.domain_type == 'mercator': from hermesv3_bu.grids.grid_mercator import MercatorGrid diff --git a/hermesv3_bu/grids/grid_rotated.py b/hermesv3_bu/grids/grid_rotated.py index 219570769a3dbd3dc6c1dedaa5bbe5c939bc8dc8..5d1fbec643c8ba30bc114ba7d24fac7eea592c12 100755 --- a/hermesv3_bu/grids/grid_rotated.py +++ b/hermesv3_bu/grids/grid_rotated.py @@ -172,7 +172,7 @@ class RotatedGrid(Grid): boundary_latitudes=self.boundary_latitudes, boundary_longitudes=self.boundary_longitudes, rotated=True, rotated_lats=self.rlat, rotated_lons=self.rlon, - north_pole_lat=90 - self.attributes['new_pole_latitude_degrees'], + north_pole_lat=self.attributes['new_pole_latitude_degrees'], north_pole_lon=self.attributes['new_pole_longitude_degrees']) self.logger.write_log("\tGrid created at '{0}'".format(self.netcdf_path), 3) self.logger.write_time_log('RotatedGrid', 'write_netcdf', timeit.default_timer() - spent_time, 3) diff --git a/hermesv3_bu/grids/grid_rotated_nested.py b/hermesv3_bu/grids/grid_rotated_nested.py new file mode 100755 index 0000000000000000000000000000000000000000..9b5ed392397f893d5608ffa91195fe1de37c18a0 --- /dev/null +++ b/hermesv3_bu/grids/grid_rotated_nested.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python + +import os +import timeit +from hermesv3_bu.grids.grid import Grid +import numpy as np +import math + +from hermesv3_bu.logger.log import Log + + +class RotatedNestedGrid(Grid): + def __init__(self, logger, auxiliary_path, tstep_num, vertical_description_path, parent_grid_path, parent_ratio, + i_parent_start, j_parent_start, n_rlat, n_rlon): + """ + + :param logger: Logger. + :type logger: Log + + :param auxiliary_path: + :param tstep_num: + :param vertical_description_path: + + """ + spent_time = timeit.default_timer() + + self.rlat = None + self.rlon = None + + logger.write_log('Rotated Nested grid selected.') + self.grid_type = 'Rotated_nested' + + attributes = {'parent_grid_path': parent_grid_path, 'parent_ratio': parent_ratio, + 'i_parent_start': i_parent_start, 'j_parent_start': j_parent_start, + 'n_rlat': n_rlat, 'n_rlon': n_rlon, 'crs': {'init': 'epsg:4326'}} + attributes = self.get_parent_attributes(attributes) + + # Initialises with parent class + super(RotatedNestedGrid, self).__init__(logger, attributes, auxiliary_path, vertical_description_path) + + self.shape = (tstep_num, len(self.vertical_desctiption), attributes['n_rlat'], attributes['n_rlon']) + self.logger.write_time_log('RotatedNestedGrid', '__init__', timeit.default_timer() - spent_time, 3) + + @staticmethod + def get_parent_attributes(attributes): + from netCDF4 import Dataset + + netcdf = Dataset(attributes['parent_grid_path'], mode='r') + + rlat = netcdf.variables['rlat'][:] + attributes['inc_rlat'] = (rlat[1] - rlat[0]) / attributes['parent_ratio'] + # j_parent_start start the index at 1 so we must - 1 + attributes['1st_rlat'] = rlat[int(attributes['j_parent_start']) - 1] + + rlon = netcdf.variables['rlon'][:] + attributes['inc_rlon'] = (rlon[1] - rlon[0]) / attributes['parent_ratio'] + # i_parent_start start the index at 1 so we must - 1 + attributes['1st_rlon'] = rlon[attributes['i_parent_start'] - 1] + + rotated_pole = netcdf.variables['rotated_pole'] + attributes['new_pole_longitude_degrees'] = rotated_pole.grid_north_pole_longitude + attributes['new_pole_latitude_degrees'] = 90 - rotated_pole.grid_north_pole_latitude + + netcdf.close() + + return attributes + + def create_regular_rotated(self): + """ + Create a regular grid on the rotated domain. + + :return: center_latitudes, center_longitudes, corner_latitudes, corner_longitudes + :rtype: tuple + """ + spent_time = timeit.default_timer() + + center_latitudes = np.linspace(self.attributes['1st_rlat'], self.attributes['1st_rlat'] + + (self.attributes['inc_rlat'] * (self.attributes['n_rlat'] - 1)), + self.attributes['n_rlat'], dtype=np.float) + center_longitudes = np.linspace(self.attributes['1st_rlon'], self.attributes['1st_rlon'] + + (self.attributes['inc_rlon'] * (self.attributes['n_rlon'] - 1)), + self.attributes['n_rlon'], dtype=np.float) + + corner_latitudes = self.create_bounds(center_latitudes, self.attributes['inc_rlat'], number_vertices=4, + inverse=True) + corner_longitudes = self.create_bounds(center_longitudes, self.attributes['inc_rlon'], number_vertices=4) + + self.logger.write_time_log('RotatedNestedGrid', 'create_regular_rotated', + timeit.default_timer() - spent_time, 3) + return center_latitudes, center_longitudes, corner_latitudes, corner_longitudes + + def create_coords(self): + """ + Create the coordinates for a rotated domain. + """ + spent_time = timeit.default_timer() + # Create rotated coordinates + (self.rlat, self.rlon, br_lats_single, br_lons_single) = self.create_regular_rotated() + + # 1D to 2D + c_lats = np.array([self.rlat] * len(self.rlon)).T + c_lons = np.array([self.rlon] * len(self.rlat)) + + # Create rotated boundary coordinates + b_lats = super(RotatedNestedGrid, self).create_bounds(c_lats, self.attributes['inc_rlat'], number_vertices=4, + inverse=True) + b_lons = super(RotatedNestedGrid, self).create_bounds(c_lons, self.attributes['inc_rlon'], number_vertices=4) + + # Rotated to Lat-Lon + self.boundary_longitudes, self.boundary_latitudes = self.rotated2latlon(b_lons, b_lats) + self.center_longitudes, self.center_latitudes = self.rotated2latlon(c_lons, c_lats) + + self.logger.write_time_log('RotatedNestedGrid', 'create_coords', timeit.default_timer() - spent_time, 3) + return True + + def rotated2latlon(self, lon_deg, lat_deg, lon_min=-180): + """ + Calculate the unrotated coordinates using the rotated ones. + + :param lon_deg: Rotated longitude coordinate. + :type lon_deg: numpy.array + + :param lat_deg: Rotated latitude coordinate. + :type lat_deg: numpy.array + + :param lon_min: Minimum value for the longitudes: -180 (-180 to 180) or 0 (0 to 360) + :type lon_min: float + + :return: Unrotated coordinates. Longitudes, Latitudes + :rtype: tuple(numpy.array, numpy.array) + """ + spent_time = timeit.default_timer() + degrees_to_radians = math.pi / 180. + # radians_to_degrees = 180. / math.pi + + # Positive east to negative east + # self.new_pole_longitude_degrees -= 180 + + tph0 = self.attributes['new_pole_latitude_degrees'] * degrees_to_radians + tlm = lon_deg * degrees_to_radians + tph = lat_deg * degrees_to_radians + tlm0d = self.attributes['new_pole_longitude_degrees'] + ctph0 = np.cos(tph0) + stph0 = np.sin(tph0) + + stlm = np.sin(tlm) + ctlm = np.cos(tlm) + stph = np.sin(tph) + ctph = np.cos(tph) + + # Latitude + sph = (ctph0 * stph) + (stph0 * ctph * ctlm) + # if sph > 1.: + # sph = 1. + # if sph < -1.: + # sph = -1. + sph[sph > 1.] = 1. + sph[sph < -1.] = -1. + + aph = np.arcsin(sph) + aphd = aph / degrees_to_radians + + # Longitude + anum = ctph * stlm + denom = (ctlm * ctph - stph0 * sph) / ctph0 + relm = np.arctan2(anum, denom) - math.pi + almd = relm / degrees_to_radians + tlm0d + + # if almd < min_lon: + # almd += 360 + # elif almd > max_lon: + # almd -= 360 + almd[almd > (lon_min + 360)] -= 360 + almd[almd < lon_min] += 360 + + self.logger.write_time_log('RotatedNestedGrid', 'rotated2latlon', timeit.default_timer() - spent_time, 3) + + return almd, aphd + + def write_netcdf(self): + """ + Write a rotated grid NetCDF with empty data + """ + from hermesv3_bu.io_server.io_netcdf import write_coords_netcdf + spent_time = timeit.default_timer() + if not os.path.exists(self.netcdf_path): + if not os.path.exists(os.path.dirname(self.netcdf_path)): + os.makedirs(os.path.dirname(self.netcdf_path)) + # Writes an auxiliary empty NetCDF only with the coordinates and an empty variable. + write_coords_netcdf(self.netcdf_path, self.center_latitudes, self.center_longitudes, + [{'name': 'var_aux', 'units': '', 'data': 0}], + boundary_latitudes=self.boundary_latitudes, + boundary_longitudes=self.boundary_longitudes, + rotated=True, rotated_lats=self.rlat, rotated_lons=self.rlon, + north_pole_lat=90 - self.attributes['new_pole_latitude_degrees'], + north_pole_lon=self.attributes['new_pole_longitude_degrees']) + self.logger.write_log("\tGrid created at '{0}'".format(self.netcdf_path), 3) + self.logger.write_time_log('RotatedNestedGrid', 'write_netcdf', timeit.default_timer() - spent_time, 3) + return True diff --git a/hermesv3_bu/writer/cmaq_writer.py b/hermesv3_bu/writer/cmaq_writer.py index d609162360e15dbc02e36b29eadef7bb545f609f..8d2f0a63550a174d45ba541dc3b7ab67c8588b73 100755 --- a/hermesv3_bu/writer/cmaq_writer.py +++ b/hermesv3_bu/writer/cmaq_writer.py @@ -14,7 +14,7 @@ from hermesv3_bu.tools.checker import error_exit class CmaqWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, global_attributes_path, emission_summary=False): + rank_distribution, global_attributes_path, compression_level, emission_summary=False): """ Initialise the CMAQ writer that will write a NetCDF in the CMAQ input format (IOAPIv3.2). @@ -65,8 +65,10 @@ class CmaqWriter(Writer): spent_time = timeit.default_timer() logger.write_log('CMAQ writer selected.') - super(CmaqWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary) + super(CmaqWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) + if self.grid.grid_type not in ['Lambert Conformal Conic']: error_exit("Only Lambert Conformal Conic grid is implemented for CMAQ. " + "The current grid type is '{0}'".format(self.grid.grid_type)) @@ -296,11 +298,14 @@ class CmaqWriter(Writer): for var_name in self.pollutant_info.index: self.logger.write_log('\t\tCreating {0} variable'.format(var_name), message_level=3) - if self.comm_write.Get_size() > 1: + if self.compression: + var = netcdf.createVariable(var_name, np.float64, ('TSTEP', 'LAY', 'ROW', 'COL',), + zlib=True, complevel=self.compression_level) + else: var = netcdf.createVariable(var_name, np.float64, ('TSTEP', 'LAY', 'ROW', 'COL',)) + if self.comm_write.Get_size() > 1: var.set_collective(True) - else: - var = netcdf.createVariable(var_name, np.float64, ('TSTEP', 'LAY', 'ROW', 'COL',), zlib=True) + try: var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) diff --git a/hermesv3_bu/writer/default_writer.py b/hermesv3_bu/writer/default_writer.py index 5bde50bb8bcab1f262f3445ec05b92eee0f18cf8..2a4df19f937172c598c3632dd4798a9d41a8bbc9 100755 --- a/hermesv3_bu/writer/default_writer.py +++ b/hermesv3_bu/writer/default_writer.py @@ -12,7 +12,7 @@ CHUNK = True class DefaultWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary=False): + rank_distribution, compression_level=4, emission_summary=False): """ Initialise the Default writer that will write a NetCDF CF-1.6 complient. @@ -59,8 +59,9 @@ class DefaultWriter(Writer): """ spent_time = timeit.default_timer() logger.write_log('Default writer selected.') - super(DefaultWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, - pollutant_info, rank_distribution, emission_summary) + super(DefaultWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) self.logger.write_time_log('DefaultWriter', '__init__', timeit.default_timer() - spent_time) @@ -110,7 +111,7 @@ class DefaultWriter(Writer): var_dim = ('y', 'x',) lat_dim = lon_dim = var_dim - elif self.grid.grid_type == 'Rotated': + elif self.grid.grid_type in ['Rotated', 'Rotated_nested']: netcdf.createDimension('rlat', len(self.grid.rlat)) netcdf.createDimension('rlon', len(self.grid.rlon)) var_dim = ('rlat', 'rlon') @@ -177,7 +178,7 @@ class DefaultWriter(Writer): y_var.standard_name = "projection_y_coordinate" y_var[:] = self.grid.y - elif self.grid.grid_type == 'Rotated': + elif self.grid.grid_type in ['Rotated', 'Rotated_nested']: self.logger.write_log('\t\tCreating rlat variable', message_level=3) rlat = netcdf.createVariable('rlat', np.float64, ('rlat',)) rlat.long_name = "latitude in rotated pole grid" @@ -198,16 +199,13 @@ class DefaultWriter(Writer): # emissions.drop(columns=['Unnamed: 0'], inplace=True) for var_name in emissions.columns.values: self.logger.write_log('\t\tCreating {0} variable'.format(var_name), message_level=3) + if self.compression: + var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, + zlib=True, complevel=self.compression_level) + else: + var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim) if self.comm_write.Get_size() > 1: - if CHUNK: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, - chunksizes=self.rank_distribution[0]['shape']) - else: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim) - var.set_collective(True) - else: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, zlib=True) var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) var[:, :, @@ -224,7 +222,7 @@ class DefaultWriter(Writer): var.grid_mapping = 'Latitude_Longitude' elif self.grid.grid_type == 'Lambert Conformal Conic': var.grid_mapping = 'Lambert_Conformal' - elif self.grid.grid_type == 'Rotated': + elif self.grid.grid_type in ['Rotated', 'Rotated_nested']: var.grid_mapping = 'rotated_pole' elif self.grid.grid_type == 'Mercator': var.grid_mapping = 'mercator' @@ -247,7 +245,7 @@ class DefaultWriter(Writer): mapping.longitude_of_central_meridian = self.grid.attributes['lon_0'] mapping.latitude_of_projection_origin = self.grid.attributes['lat_0'] - elif self.grid.grid_type == 'Rotated': + elif self.grid.grid_type in ['Rotated', 'Rotated_nested']: mapping = netcdf.createVariable('rotated_pole', 'c') mapping.grid_mapping_name = 'rotated_latitude_longitude' mapping.grid_north_pole_latitude = 90 - self.grid.attributes['new_pole_latitude_degrees'] diff --git a/hermesv3_bu/writer/monarch_writer.py b/hermesv3_bu/writer/monarch_writer.py index deb5d4cd24864f448bc6514bcd562877fffb1913..9502352b93891ec081a8fed4fc9fb7463cb9b4be 100755 --- a/hermesv3_bu/writer/monarch_writer.py +++ b/hermesv3_bu/writer/monarch_writer.py @@ -11,7 +11,7 @@ from hermesv3_bu.tools.checker import error_exit class MonarchWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary=False): + rank_distribution, compression_level, emission_summary=False): """ Initialise the MONARCH writer that will write a NetCDF CF-1.6 complient. @@ -59,11 +59,12 @@ class MonarchWriter(Writer): spent_time = timeit.default_timer() logger.write_log('MONARCH writer selected.') - super(MonarchWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, - pollutant_info, rank_distribution, emission_summary) + super(MonarchWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) - if self.grid.grid_type not in ['Rotated']: - error_exit("ERROR: Only Rotated grid is implemented for MONARCH. " + + if self.grid.grid_type not in ['Rotated', 'Rotated_nested']: + error_exit("ERROR: Only Rotated or Rotated-nested grid is implemented for MONARCH. " + "The current grid type is '{0}'".format(self.grid.grid_type)) for i, (pollutant, variable) in enumerate(self.pollutant_info.iterrows()): @@ -196,12 +197,13 @@ class MonarchWriter(Writer): # var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, # chunksizes=self.rank_distribution[0]['shape']) - - if self.comm_write.Get_size() > 1: + if self.compression: + var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, + zlib=True, complevel=self.compression_level) + else: var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim) + if self.comm_write.Get_size() > 1: var.set_collective(True) - else: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, zlib=True) try: var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) diff --git a/hermesv3_bu/writer/wrfchem_writer.py b/hermesv3_bu/writer/wrfchem_writer.py index a1fd289d2ec553b6a6dccaa81b1d5f77e1033fb4..4108fbfdd896185f1612737b89a69fb634bac3ee 100755 --- a/hermesv3_bu/writer/wrfchem_writer.py +++ b/hermesv3_bu/writer/wrfchem_writer.py @@ -14,7 +14,7 @@ from hermesv3_bu.tools.checker import error_exit class WrfChemWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, global_attributes_path, emission_summary=False): + rank_distribution, global_attributes_path, compression_level, emission_summary=False): """ Initialise the WRF-Chem writer that will write a NetCDF in the CMAQ input format (IOAPIv3.2). @@ -65,8 +65,10 @@ class WrfChemWriter(Writer): spent_time = timeit.default_timer() logger.write_log('WRF-Chem writer selected.') - super(WrfChemWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, - pollutant_info, rank_distribution, emission_summary) + super(WrfChemWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) + if self.grid.grid_type not in ['Lambert Conformal Conic', 'Mercator']: error_exit("ERROR: Only Lambert Conformal Conic or Mercator grid is implemented for WRF-Chem. " + "The current grid type is '{0}'".format(self.grid.grid_type)) @@ -348,14 +350,15 @@ class WrfChemWriter(Writer): # ========== POLLUTANTS ========== for var_name in emissions.columns.values: self.logger.write_log('\t\tCreating {0} variable'.format(var_name), message_level=3) - - if self.comm_write.Get_size() > 1: + if self.compression: var = netcdf.createVariable(var_name, np.float64, - ('Time', 'emissions_zdim', 'south_north', 'west_east',)) - var.set_collective(True) + ('Time', 'emissions_zdim', 'south_north', 'west_east',), + zlib=True, complevel=self.compression_level) else: var = netcdf.createVariable(var_name, np.float64, - ('Time', 'emissions_zdim', 'south_north', 'west_east',), zlib=True) + ('Time', 'emissions_zdim', 'south_north', 'west_east',)) + if self.comm_write.Get_size() > 1: + var.set_collective(True) var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) diff --git a/hermesv3_bu/writer/writer.py b/hermesv3_bu/writer/writer.py index 0a92322b392c2c7906313218d4951cd46fc77242..55d4f987918d2d8e912582bb77423d7e050a88f5 100755 --- a/hermesv3_bu/writer/writer.py +++ b/hermesv3_bu/writer/writer.py @@ -14,6 +14,8 @@ CHUNKING = True BALANCED = False MPI_TAG_CONSTANT = 10**6 +BUFFER_SIZE = 2**28 + def select_writer(logger, comm_world, arguments, grid, date_array): """ @@ -226,7 +228,7 @@ def get_balanced_distribution(logger, processors, shape): class Writer(object): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary=False): + rank_distribution, compression_level, emission_summary=False): """ Initialise the Writer class. @@ -279,6 +281,16 @@ class Writer(object): self.pollutant_info = pollutant_info self.rank_distribution = rank_distribution self.emission_summary = emission_summary + if self.comm_write.Get_size() > 1: + self.compression = False + if compression_level > 0: + self.logger.write_log("WARNING: No compression available for parallel write.", message_level=1) + else: + if compression_level > 0: + self.compression = True + self.compression_level = compression_level + else: + self.compression = False if self.emission_summary and self.comm_write.Get_rank() == 0: self.emission_summary_paths = { @@ -307,38 +319,42 @@ class Writer(object): spent_time = timeit.default_timer() # Sending self.logger.write_log('Sending emissions to the writing processors.', message_level=2) - requests = [] + requests = {} + data_list = [] for w_rank, info in self.rank_distribution.items(): partial_emis = emissions.loc[(emissions.index.get_level_values(0) >= info['fid_min']) & (emissions.index.get_level_values(0) < info['fid_max'])] + partial_emis.reset_index(inplace=True) + if w_rank == self.comm_world.Get_rank(): + self.logger.write_log("I'm the writing processor {0}. Saving {1} emissions".format( + w_rank, sys.getsizeof(partial_emis)), message_level=3) + data_list.append(partial_emis) + else: + self.logger.write_log('\tFrom {0} sending {1} to {2}'.format( + self.comm_world.Get_rank(), sys.getsizeof(partial_emis), w_rank), message_level=3) - self.logger.write_log('\tFrom {0} sending {1} to {2}'.format( - self.comm_world.Get_rank(), sys.getsizeof(partial_emis), w_rank), message_level=3) - # requests.append(self.comm_world.isend(sys.getsizeof(partial_emis), dest=w_rank, - # tag=self.comm_world.Get_rank() + MPI_TAG_CONSTANT)) - requests.append(self.comm_world.isend(partial_emis, dest=w_rank, tag=self.comm_world.Get_rank())) + requests[w_rank] = self.comm_world.isend(partial_emis, dest=w_rank, tag=self.comm_world.Get_rank()) # Receiving self.logger.write_log('Receiving emissions in the writing processors.', message_level=2) - if self.comm_world.Get_rank() in self.rank_distribution.keys(): - self.logger.write_log("I'm a writing processor.", message_level=3) - data_list = [] - - self.logger.write_log("Prepared to receive", message_level=3) - for i_rank in range(self.comm_world.Get_size()): - self.logger.write_log( - '\tFrom {0} to {1}'.format(i_rank, self.comm_world.Get_rank()), message_level=3) - req = self.comm_world.irecv(2**27, source=i_rank, tag=i_rank) - dataframe = req.wait() - data_list.append(dataframe.reset_index()) + if self.comm_world.Get_rank() not in self.rank_distribution.keys(): + for req in requests.values(): + req.wait() + new_emissions = None + else: + for w_rank in self.rank_distribution.keys(): + if w_rank == self.comm_world.Get_rank(): + for i_rank in range(self.comm_world.Get_size()): + if i_rank != w_rank: + data_list.append(self.comm_world.recv(BUFFER_SIZE, source=i_rank, tag=i_rank)) + else: + requests[w_rank].wait() new_emissions = pd.concat(data_list) new_emissions[['FID', 'layer', 'tstep']] = new_emissions[['FID', 'layer', 'tstep']].astype(np.int32) new_emissions = new_emissions.groupby(['FID', 'layer', 'tstep']).sum() - else: - new_emissions = None self.comm_world.Barrier() self.logger.write_log('All emissions received.', message_level=2)