From 01a9af6df1f57d8cacf22e8a3e5db0418efe62da Mon Sep 17 00:00:00 2001 From: Carles Tena Date: Fri, 28 Feb 2020 11:35:01 +0100 Subject: [PATCH 1/3] Working on new MPI patron --- CHANGELOG | 7 +++--- conf/hermes.conf | 9 ++++---- hermesv3_bu/writer/writer.py | 42 ++++++++++++++++++++---------------- 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 9c99603..48e9a8d 100755 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,5 @@ -0.1.1 - 2019/10/29 +0.1.2 + 2020/02/28 First beta version: @@ -29,7 +29,8 @@ 12. Solvents sector - Writing options: - 0. Return emissions on memory + A. Return emissions on memory + 1. Default writer 2. CMAQ writer 3. MONARCH writer diff --git a/conf/hermes.conf b/conf/hermes.conf index 364be0d..342e118 100755 --- a/conf/hermes.conf +++ b/conf/hermes.conf @@ -2,8 +2,8 @@ log_level = 3 input_dir = /home/Earth/ctena/Models/hermesv3_bu_data data_path = /esarchive/recon -output_dir = /scratch/Earth/HERMESv3_BU_OUT/ -output_name = HERMESv3_.nc +output_dir = /scratch/Earth/ctena/HERMESv3_BU_OUT/ +output_name = HERMESv3__p4_w2.nc emission_summary = 0 start_date = 2016/11/29 00:00:00 # ----- end_date = start_date [DEFAULT] ----- @@ -20,6 +20,7 @@ erase_auxiliary_files = 0 domain_type = rotated_nested # output_type=[MONARCH, CMAQ, WRF_CHEM, DEFAULT] output_model = DEFAULT +com output_attributes = /writing/global_attributes_WRF-Chem.csv vertical_description = /profiles/vertical/MONARCH_Global_48layers_vertical_description.csv #vertical_description = /profiles/vertical/CMAQ_15layers_vertical_description.csv @@ -128,7 +129,7 @@ vertical_description = /profiles/vertical/MONARCH_Global_48layers_ver # SECTORS #################################################################### [SECTOR MANAGEMENT] -writing_processors = 1 +writing_processors = 2 # # aviation_processors = 1 # shipping_port_processors = 1 @@ -140,7 +141,7 @@ writing_processors = 1 # recreational_boats_processors = 4 # point_sources_processors = 16 # traffic_processors = 256 -traffic_area_processors = 1 +traffic_area_processors = 4 [SHAPEFILES] nuts3_shapefile = /shapefiles/nuts3/nuts3.shp diff --git a/hermesv3_bu/writer/writer.py b/hermesv3_bu/writer/writer.py index f7ca833..8a286c5 100755 --- a/hermesv3_bu/writer/writer.py +++ b/hermesv3_bu/writer/writer.py @@ -309,38 +309,42 @@ class Writer(object): spent_time = timeit.default_timer() # Sending self.logger.write_log('Sending emissions to the writing processors.', message_level=2) - requests = [] + requests = {} + data_list = [] for w_rank, info in self.rank_distribution.items(): partial_emis = emissions.loc[(emissions.index.get_level_values(0) >= info['fid_min']) & (emissions.index.get_level_values(0) < info['fid_max'])] + partial_emis.reset_index(inplace=True) + if w_rank == self.comm_world.Get_rank(): + self.logger.write_log("I'm the writing processor {0}. Saving {1} emissions".format( + w_rank, sys.getsizeof(partial_emis)), message_level=3) + data_list.append(partial_emis) + else: + self.logger.write_log('\tFrom {0} sending {1} to {2}'.format( + self.comm_world.Get_rank(), sys.getsizeof(partial_emis), w_rank), message_level=3) - self.logger.write_log('\tFrom {0} sending {1} to {2}'.format( - self.comm_world.Get_rank(), sys.getsizeof(partial_emis), w_rank), message_level=3) - # requests.append(self.comm_world.isend(sys.getsizeof(partial_emis), dest=w_rank, - # tag=self.comm_world.Get_rank() + MPI_TAG_CONSTANT)) - requests.append(self.comm_world.isend(partial_emis, dest=w_rank, tag=self.comm_world.Get_rank())) + requests[w_rank] = self.comm_world.isend(partial_emis, dest=w_rank, tag=self.comm_world.Get_rank()) # Receiving self.logger.write_log('Receiving emissions in the writing processors.', message_level=2) - if self.comm_world.Get_rank() in self.rank_distribution.keys(): - self.logger.write_log("I'm a writing processor.", message_level=3) - data_list = [] - - self.logger.write_log("Prepared to receive", message_level=3) - for i_rank in range(self.comm_world.Get_size()): - self.logger.write_log( - '\tFrom {0} to {1}'.format(i_rank, self.comm_world.Get_rank()), message_level=3) - req = self.comm_world.irecv(BUFFER_SIZE, source=i_rank, tag=i_rank) - dataframe = req.wait() - data_list.append(dataframe.reset_index()) + if self.comm_world.Get_rank() not in self.rank_distribution.keys(): + for req in requests.values(): + req.wait() + new_emissions = None + else: + for w_rank in self.rank_distribution.keys(): + if w_rank == self.comm_world.Get_rank(): + for i_rank in range(self.comm_world.Get_size()): + if i_rank != w_rank: + data_list.append(self.comm_world.recv(BUFFER_SIZE, source=i_rank, tag=i_rank)) + else: + requests[w_rank].wait() new_emissions = pd.concat(data_list) new_emissions[['FID', 'layer', 'tstep']] = new_emissions[['FID', 'layer', 'tstep']].astype(np.int32) new_emissions = new_emissions.groupby(['FID', 'layer', 'tstep']).sum() - else: - new_emissions = None self.comm_world.Barrier() self.logger.write_log('All emissions received.', message_level=2) -- GitLab From f926ceae210e67fa162ab85b92e49d8456482bcc Mon Sep 17 00:00:00 2001 From: Carles Tena Date: Fri, 28 Feb 2020 12:22:45 +0100 Subject: [PATCH 2/3] Added netCDF compression --- CHANGELOG | 2 +- conf/hermes.conf | 2 +- hermesv3_bu/config/config.py | 2 ++ hermesv3_bu/writer/cmaq_writer.py | 17 +++++++++++------ hermesv3_bu/writer/default_writer.py | 20 +++++++++----------- hermesv3_bu/writer/monarch_writer.py | 16 +++++++++------- hermesv3_bu/writer/wrfchem_writer.py | 19 +++++++++++-------- hermesv3_bu/writer/writer.py | 10 +++++++++- 8 files changed, 53 insertions(+), 35 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 48e9a8d..8695d0e 100755 --- a/CHANGELOG +++ b/CHANGELOG @@ -30,7 +30,7 @@ - Writing options: A. Return emissions on memory - + 1. Default writer 2. CMAQ writer 3. MONARCH writer diff --git a/conf/hermes.conf b/conf/hermes.conf index 342e118..70e57f6 100755 --- a/conf/hermes.conf +++ b/conf/hermes.conf @@ -20,7 +20,7 @@ erase_auxiliary_files = 0 domain_type = rotated_nested # output_type=[MONARCH, CMAQ, WRF_CHEM, DEFAULT] output_model = DEFAULT -com +compression_level = 0 output_attributes = /writing/global_attributes_WRF-Chem.csv vertical_description = /profiles/vertical/MONARCH_Global_48layers_vertical_description.csv #vertical_description = /profiles/vertical/CMAQ_15layers_vertical_description.csv diff --git a/hermesv3_bu/config/config.py b/hermesv3_bu/config/config.py index 4def471..f9141b8 100755 --- a/hermesv3_bu/config/config.py +++ b/hermesv3_bu/config/config.py @@ -79,6 +79,8 @@ class Config(ArgParser): p.add_argument('--domain_type', required=True, help='Type of domain to simulate.', choices=['lcc', 'rotated', 'mercator', 'regular', 'rotated_nested']) + p.add_argument('--compression_level', required=False, type=int, choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + help='Compression level of the NetCDF output (0 for no compressed output).', default=4) # Rotated options p.add_argument('--centre_lat', required=False, type=float, diff --git a/hermesv3_bu/writer/cmaq_writer.py b/hermesv3_bu/writer/cmaq_writer.py index d609162..8d2f0a6 100755 --- a/hermesv3_bu/writer/cmaq_writer.py +++ b/hermesv3_bu/writer/cmaq_writer.py @@ -14,7 +14,7 @@ from hermesv3_bu.tools.checker import error_exit class CmaqWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, global_attributes_path, emission_summary=False): + rank_distribution, global_attributes_path, compression_level, emission_summary=False): """ Initialise the CMAQ writer that will write a NetCDF in the CMAQ input format (IOAPIv3.2). @@ -65,8 +65,10 @@ class CmaqWriter(Writer): spent_time = timeit.default_timer() logger.write_log('CMAQ writer selected.') - super(CmaqWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary) + super(CmaqWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) + if self.grid.grid_type not in ['Lambert Conformal Conic']: error_exit("Only Lambert Conformal Conic grid is implemented for CMAQ. " + "The current grid type is '{0}'".format(self.grid.grid_type)) @@ -296,11 +298,14 @@ class CmaqWriter(Writer): for var_name in self.pollutant_info.index: self.logger.write_log('\t\tCreating {0} variable'.format(var_name), message_level=3) - if self.comm_write.Get_size() > 1: + if self.compression: + var = netcdf.createVariable(var_name, np.float64, ('TSTEP', 'LAY', 'ROW', 'COL',), + zlib=True, complevel=self.compression_level) + else: var = netcdf.createVariable(var_name, np.float64, ('TSTEP', 'LAY', 'ROW', 'COL',)) + if self.comm_write.Get_size() > 1: var.set_collective(True) - else: - var = netcdf.createVariable(var_name, np.float64, ('TSTEP', 'LAY', 'ROW', 'COL',), zlib=True) + try: var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) diff --git a/hermesv3_bu/writer/default_writer.py b/hermesv3_bu/writer/default_writer.py index 1f2546f..2a4df19 100755 --- a/hermesv3_bu/writer/default_writer.py +++ b/hermesv3_bu/writer/default_writer.py @@ -12,7 +12,7 @@ CHUNK = True class DefaultWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary=False): + rank_distribution, compression_level=4, emission_summary=False): """ Initialise the Default writer that will write a NetCDF CF-1.6 complient. @@ -59,8 +59,9 @@ class DefaultWriter(Writer): """ spent_time = timeit.default_timer() logger.write_log('Default writer selected.') - super(DefaultWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, - pollutant_info, rank_distribution, emission_summary) + super(DefaultWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) self.logger.write_time_log('DefaultWriter', '__init__', timeit.default_timer() - spent_time) @@ -198,16 +199,13 @@ class DefaultWriter(Writer): # emissions.drop(columns=['Unnamed: 0'], inplace=True) for var_name in emissions.columns.values: self.logger.write_log('\t\tCreating {0} variable'.format(var_name), message_level=3) + if self.compression: + var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, + zlib=True, complevel=self.compression_level) + else: + var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim) if self.comm_write.Get_size() > 1: - if CHUNK: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, - chunksizes=self.rank_distribution[0]['shape']) - else: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim) - var.set_collective(True) - else: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, zlib=True) var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) var[:, :, diff --git a/hermesv3_bu/writer/monarch_writer.py b/hermesv3_bu/writer/monarch_writer.py index 435811b..9502352 100755 --- a/hermesv3_bu/writer/monarch_writer.py +++ b/hermesv3_bu/writer/monarch_writer.py @@ -11,7 +11,7 @@ from hermesv3_bu.tools.checker import error_exit class MonarchWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary=False): + rank_distribution, compression_level, emission_summary=False): """ Initialise the MONARCH writer that will write a NetCDF CF-1.6 complient. @@ -59,8 +59,9 @@ class MonarchWriter(Writer): spent_time = timeit.default_timer() logger.write_log('MONARCH writer selected.') - super(MonarchWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, - pollutant_info, rank_distribution, emission_summary) + super(MonarchWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) if self.grid.grid_type not in ['Rotated', 'Rotated_nested']: error_exit("ERROR: Only Rotated or Rotated-nested grid is implemented for MONARCH. " + @@ -196,12 +197,13 @@ class MonarchWriter(Writer): # var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, # chunksizes=self.rank_distribution[0]['shape']) - - if self.comm_write.Get_size() > 1: + if self.compression: + var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, + zlib=True, complevel=self.compression_level) + else: var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim) + if self.comm_write.Get_size() > 1: var.set_collective(True) - else: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, zlib=True) try: var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) diff --git a/hermesv3_bu/writer/wrfchem_writer.py b/hermesv3_bu/writer/wrfchem_writer.py index a1fd289..4108fbf 100755 --- a/hermesv3_bu/writer/wrfchem_writer.py +++ b/hermesv3_bu/writer/wrfchem_writer.py @@ -14,7 +14,7 @@ from hermesv3_bu.tools.checker import error_exit class WrfChemWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, global_attributes_path, emission_summary=False): + rank_distribution, global_attributes_path, compression_level, emission_summary=False): """ Initialise the WRF-Chem writer that will write a NetCDF in the CMAQ input format (IOAPIv3.2). @@ -65,8 +65,10 @@ class WrfChemWriter(Writer): spent_time = timeit.default_timer() logger.write_log('WRF-Chem writer selected.') - super(WrfChemWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, - pollutant_info, rank_distribution, emission_summary) + super(WrfChemWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) + if self.grid.grid_type not in ['Lambert Conformal Conic', 'Mercator']: error_exit("ERROR: Only Lambert Conformal Conic or Mercator grid is implemented for WRF-Chem. " + "The current grid type is '{0}'".format(self.grid.grid_type)) @@ -348,14 +350,15 @@ class WrfChemWriter(Writer): # ========== POLLUTANTS ========== for var_name in emissions.columns.values: self.logger.write_log('\t\tCreating {0} variable'.format(var_name), message_level=3) - - if self.comm_write.Get_size() > 1: + if self.compression: var = netcdf.createVariable(var_name, np.float64, - ('Time', 'emissions_zdim', 'south_north', 'west_east',)) - var.set_collective(True) + ('Time', 'emissions_zdim', 'south_north', 'west_east',), + zlib=True, complevel=self.compression_level) else: var = netcdf.createVariable(var_name, np.float64, - ('Time', 'emissions_zdim', 'south_north', 'west_east',), zlib=True) + ('Time', 'emissions_zdim', 'south_north', 'west_east',)) + if self.comm_write.Get_size() > 1: + var.set_collective(True) var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) diff --git a/hermesv3_bu/writer/writer.py b/hermesv3_bu/writer/writer.py index 8a286c5..4c05b2a 100755 --- a/hermesv3_bu/writer/writer.py +++ b/hermesv3_bu/writer/writer.py @@ -228,7 +228,7 @@ def get_balanced_distribution(logger, processors, shape): class Writer(object): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary=False): + rank_distribution, compression_level, emission_summary=False): """ Initialise the Writer class. @@ -281,6 +281,14 @@ class Writer(object): self.pollutant_info = pollutant_info self.rank_distribution = rank_distribution self.emission_summary = emission_summary + if self.comm_write.Get_size() > 1: + self.compression = False + if compression_level > 0: + self.logger.write_log("WARNING: No compression available for parallel write.", message_level=1) + else: + if compression_level > 0: + self.compression = True + self.compression_level = compression_level if self.emission_summary and self.comm_write.Get_rank() == 0: self.emission_summary_paths = { -- GitLab From 569f74f184583470d233d35c01a68894d334ffc1 Mon Sep 17 00:00:00 2001 From: Carles Tena Date: Fri, 28 Feb 2020 12:37:31 +0100 Subject: [PATCH 3/3] Added netCDF compression (bug corrected) --- hermesv3_bu/writer/writer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hermesv3_bu/writer/writer.py b/hermesv3_bu/writer/writer.py index 4c05b2a..55d4f98 100755 --- a/hermesv3_bu/writer/writer.py +++ b/hermesv3_bu/writer/writer.py @@ -289,6 +289,8 @@ class Writer(object): if compression_level > 0: self.compression = True self.compression_level = compression_level + else: + self.compression = False if self.emission_summary and self.comm_write.Get_rank() == 0: self.emission_summary_paths = { -- GitLab