diff --git a/CHANGELOG b/CHANGELOG index 9c99603d2e1389eeccd8614868098f9d29fdee90..8695d0e0d17e0d671e55a9e5f18ed30fb0cbef56 100755 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,5 @@ -0.1.1 - 2019/10/29 +0.1.2 + 2020/02/28 First beta version: @@ -29,7 +29,8 @@ 12. Solvents sector - Writing options: - 0. Return emissions on memory + A. Return emissions on memory + 1. Default writer 2. CMAQ writer 3. MONARCH writer diff --git a/conf/hermes.conf b/conf/hermes.conf index 364be0d4d728c61fec214a8d511abe11a71fd1ec..70e57f6fbe2f85f2072a0898428d9f08e35790c9 100755 --- a/conf/hermes.conf +++ b/conf/hermes.conf @@ -2,8 +2,8 @@ log_level = 3 input_dir = /home/Earth/ctena/Models/hermesv3_bu_data data_path = /esarchive/recon -output_dir = /scratch/Earth/HERMESv3_BU_OUT/ -output_name = HERMESv3_.nc +output_dir = /scratch/Earth/ctena/HERMESv3_BU_OUT/ +output_name = HERMESv3__p4_w2.nc emission_summary = 0 start_date = 2016/11/29 00:00:00 # ----- end_date = start_date [DEFAULT] ----- @@ -20,6 +20,7 @@ erase_auxiliary_files = 0 domain_type = rotated_nested # output_type=[MONARCH, CMAQ, WRF_CHEM, DEFAULT] output_model = DEFAULT +compression_level = 0 output_attributes = /writing/global_attributes_WRF-Chem.csv vertical_description = /profiles/vertical/MONARCH_Global_48layers_vertical_description.csv #vertical_description = /profiles/vertical/CMAQ_15layers_vertical_description.csv @@ -128,7 +129,7 @@ vertical_description = /profiles/vertical/MONARCH_Global_48layers_ver # SECTORS #################################################################### [SECTOR MANAGEMENT] -writing_processors = 1 +writing_processors = 2 # # aviation_processors = 1 # shipping_port_processors = 1 @@ -140,7 +141,7 @@ writing_processors = 1 # recreational_boats_processors = 4 # point_sources_processors = 16 # traffic_processors = 256 -traffic_area_processors = 1 +traffic_area_processors = 4 [SHAPEFILES] nuts3_shapefile = /shapefiles/nuts3/nuts3.shp diff --git a/hermesv3_bu/config/config.py b/hermesv3_bu/config/config.py index 4def4711b29de6b6c46f0adcb4eebcac09e72da0..f9141b84c8773cedba4d9373abdf59d830009cd3 100755 --- a/hermesv3_bu/config/config.py +++ b/hermesv3_bu/config/config.py @@ -79,6 +79,8 @@ class Config(ArgParser): p.add_argument('--domain_type', required=True, help='Type of domain to simulate.', choices=['lcc', 'rotated', 'mercator', 'regular', 'rotated_nested']) + p.add_argument('--compression_level', required=False, type=int, choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + help='Compression level of the NetCDF output (0 for no compressed output).', default=4) # Rotated options p.add_argument('--centre_lat', required=False, type=float, diff --git a/hermesv3_bu/writer/cmaq_writer.py b/hermesv3_bu/writer/cmaq_writer.py index d609162360e15dbc02e36b29eadef7bb545f609f..8d2f0a63550a174d45ba541dc3b7ab67c8588b73 100755 --- a/hermesv3_bu/writer/cmaq_writer.py +++ b/hermesv3_bu/writer/cmaq_writer.py @@ -14,7 +14,7 @@ from hermesv3_bu.tools.checker import error_exit class CmaqWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, global_attributes_path, emission_summary=False): + rank_distribution, global_attributes_path, compression_level, emission_summary=False): """ Initialise the CMAQ writer that will write a NetCDF in the CMAQ input format (IOAPIv3.2). @@ -65,8 +65,10 @@ class CmaqWriter(Writer): spent_time = timeit.default_timer() logger.write_log('CMAQ writer selected.') - super(CmaqWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary) + super(CmaqWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) + if self.grid.grid_type not in ['Lambert Conformal Conic']: error_exit("Only Lambert Conformal Conic grid is implemented for CMAQ. " + "The current grid type is '{0}'".format(self.grid.grid_type)) @@ -296,11 +298,14 @@ class CmaqWriter(Writer): for var_name in self.pollutant_info.index: self.logger.write_log('\t\tCreating {0} variable'.format(var_name), message_level=3) - if self.comm_write.Get_size() > 1: + if self.compression: + var = netcdf.createVariable(var_name, np.float64, ('TSTEP', 'LAY', 'ROW', 'COL',), + zlib=True, complevel=self.compression_level) + else: var = netcdf.createVariable(var_name, np.float64, ('TSTEP', 'LAY', 'ROW', 'COL',)) + if self.comm_write.Get_size() > 1: var.set_collective(True) - else: - var = netcdf.createVariable(var_name, np.float64, ('TSTEP', 'LAY', 'ROW', 'COL',), zlib=True) + try: var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) diff --git a/hermesv3_bu/writer/default_writer.py b/hermesv3_bu/writer/default_writer.py index 1f2546fac076c3b5c94f298c55120cccc5e2e0a6..2a4df19f937172c598c3632dd4798a9d41a8bbc9 100755 --- a/hermesv3_bu/writer/default_writer.py +++ b/hermesv3_bu/writer/default_writer.py @@ -12,7 +12,7 @@ CHUNK = True class DefaultWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary=False): + rank_distribution, compression_level=4, emission_summary=False): """ Initialise the Default writer that will write a NetCDF CF-1.6 complient. @@ -59,8 +59,9 @@ class DefaultWriter(Writer): """ spent_time = timeit.default_timer() logger.write_log('Default writer selected.') - super(DefaultWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, - pollutant_info, rank_distribution, emission_summary) + super(DefaultWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) self.logger.write_time_log('DefaultWriter', '__init__', timeit.default_timer() - spent_time) @@ -198,16 +199,13 @@ class DefaultWriter(Writer): # emissions.drop(columns=['Unnamed: 0'], inplace=True) for var_name in emissions.columns.values: self.logger.write_log('\t\tCreating {0} variable'.format(var_name), message_level=3) + if self.compression: + var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, + zlib=True, complevel=self.compression_level) + else: + var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim) if self.comm_write.Get_size() > 1: - if CHUNK: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, - chunksizes=self.rank_distribution[0]['shape']) - else: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim) - var.set_collective(True) - else: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, zlib=True) var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) var[:, :, diff --git a/hermesv3_bu/writer/monarch_writer.py b/hermesv3_bu/writer/monarch_writer.py index 435811b2f1ae05a2864df7b7824f9e9b04b27a90..9502352b93891ec081a8fed4fc9fb7463cb9b4be 100755 --- a/hermesv3_bu/writer/monarch_writer.py +++ b/hermesv3_bu/writer/monarch_writer.py @@ -11,7 +11,7 @@ from hermesv3_bu.tools.checker import error_exit class MonarchWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary=False): + rank_distribution, compression_level, emission_summary=False): """ Initialise the MONARCH writer that will write a NetCDF CF-1.6 complient. @@ -59,8 +59,9 @@ class MonarchWriter(Writer): spent_time = timeit.default_timer() logger.write_log('MONARCH writer selected.') - super(MonarchWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, - pollutant_info, rank_distribution, emission_summary) + super(MonarchWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) if self.grid.grid_type not in ['Rotated', 'Rotated_nested']: error_exit("ERROR: Only Rotated or Rotated-nested grid is implemented for MONARCH. " + @@ -196,12 +197,13 @@ class MonarchWriter(Writer): # var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, # chunksizes=self.rank_distribution[0]['shape']) - - if self.comm_write.Get_size() > 1: + if self.compression: + var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, + zlib=True, complevel=self.compression_level) + else: var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim) + if self.comm_write.Get_size() > 1: var.set_collective(True) - else: - var = netcdf.createVariable(var_name, np.float64, ('time', 'lev',) + var_dim, zlib=True) try: var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) diff --git a/hermesv3_bu/writer/wrfchem_writer.py b/hermesv3_bu/writer/wrfchem_writer.py index a1fd289d2ec553b6a6dccaa81b1d5f77e1033fb4..4108fbfdd896185f1612737b89a69fb634bac3ee 100755 --- a/hermesv3_bu/writer/wrfchem_writer.py +++ b/hermesv3_bu/writer/wrfchem_writer.py @@ -14,7 +14,7 @@ from hermesv3_bu.tools.checker import error_exit class WrfChemWriter(Writer): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, global_attributes_path, emission_summary=False): + rank_distribution, global_attributes_path, compression_level, emission_summary=False): """ Initialise the WRF-Chem writer that will write a NetCDF in the CMAQ input format (IOAPIv3.2). @@ -65,8 +65,10 @@ class WrfChemWriter(Writer): spent_time = timeit.default_timer() logger.write_log('WRF-Chem writer selected.') - super(WrfChemWriter, self).__init__(comm_world, comm_write, logger, netcdf_path, grid, date_array, - pollutant_info, rank_distribution, emission_summary) + super(WrfChemWriter, self).__init__( + comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, rank_distribution, + compression_level, emission_summary) + if self.grid.grid_type not in ['Lambert Conformal Conic', 'Mercator']: error_exit("ERROR: Only Lambert Conformal Conic or Mercator grid is implemented for WRF-Chem. " + "The current grid type is '{0}'".format(self.grid.grid_type)) @@ -348,14 +350,15 @@ class WrfChemWriter(Writer): # ========== POLLUTANTS ========== for var_name in emissions.columns.values: self.logger.write_log('\t\tCreating {0} variable'.format(var_name), message_level=3) - - if self.comm_write.Get_size() > 1: + if self.compression: var = netcdf.createVariable(var_name, np.float64, - ('Time', 'emissions_zdim', 'south_north', 'west_east',)) - var.set_collective(True) + ('Time', 'emissions_zdim', 'south_north', 'west_east',), + zlib=True, complevel=self.compression_level) else: var = netcdf.createVariable(var_name, np.float64, - ('Time', 'emissions_zdim', 'south_north', 'west_east',), zlib=True) + ('Time', 'emissions_zdim', 'south_north', 'west_east',)) + if self.comm_write.Get_size() > 1: + var.set_collective(True) var_data = self.dataframe_to_array(emissions.loc[:, [var_name]]) diff --git a/hermesv3_bu/writer/writer.py b/hermesv3_bu/writer/writer.py index f7ca8337f5c5b4cf912731f6d8db9dcf8443b050..55d4f987918d2d8e912582bb77423d7e050a88f5 100755 --- a/hermesv3_bu/writer/writer.py +++ b/hermesv3_bu/writer/writer.py @@ -228,7 +228,7 @@ def get_balanced_distribution(logger, processors, shape): class Writer(object): def __init__(self, comm_world, comm_write, logger, netcdf_path, grid, date_array, pollutant_info, - rank_distribution, emission_summary=False): + rank_distribution, compression_level, emission_summary=False): """ Initialise the Writer class. @@ -281,6 +281,16 @@ class Writer(object): self.pollutant_info = pollutant_info self.rank_distribution = rank_distribution self.emission_summary = emission_summary + if self.comm_write.Get_size() > 1: + self.compression = False + if compression_level > 0: + self.logger.write_log("WARNING: No compression available for parallel write.", message_level=1) + else: + if compression_level > 0: + self.compression = True + self.compression_level = compression_level + else: + self.compression = False if self.emission_summary and self.comm_write.Get_rank() == 0: self.emission_summary_paths = { @@ -309,38 +319,42 @@ class Writer(object): spent_time = timeit.default_timer() # Sending self.logger.write_log('Sending emissions to the writing processors.', message_level=2) - requests = [] + requests = {} + data_list = [] for w_rank, info in self.rank_distribution.items(): partial_emis = emissions.loc[(emissions.index.get_level_values(0) >= info['fid_min']) & (emissions.index.get_level_values(0) < info['fid_max'])] + partial_emis.reset_index(inplace=True) + if w_rank == self.comm_world.Get_rank(): + self.logger.write_log("I'm the writing processor {0}. Saving {1} emissions".format( + w_rank, sys.getsizeof(partial_emis)), message_level=3) + data_list.append(partial_emis) + else: + self.logger.write_log('\tFrom {0} sending {1} to {2}'.format( + self.comm_world.Get_rank(), sys.getsizeof(partial_emis), w_rank), message_level=3) - self.logger.write_log('\tFrom {0} sending {1} to {2}'.format( - self.comm_world.Get_rank(), sys.getsizeof(partial_emis), w_rank), message_level=3) - # requests.append(self.comm_world.isend(sys.getsizeof(partial_emis), dest=w_rank, - # tag=self.comm_world.Get_rank() + MPI_TAG_CONSTANT)) - requests.append(self.comm_world.isend(partial_emis, dest=w_rank, tag=self.comm_world.Get_rank())) + requests[w_rank] = self.comm_world.isend(partial_emis, dest=w_rank, tag=self.comm_world.Get_rank()) # Receiving self.logger.write_log('Receiving emissions in the writing processors.', message_level=2) - if self.comm_world.Get_rank() in self.rank_distribution.keys(): - self.logger.write_log("I'm a writing processor.", message_level=3) - data_list = [] - - self.logger.write_log("Prepared to receive", message_level=3) - for i_rank in range(self.comm_world.Get_size()): - self.logger.write_log( - '\tFrom {0} to {1}'.format(i_rank, self.comm_world.Get_rank()), message_level=3) - req = self.comm_world.irecv(BUFFER_SIZE, source=i_rank, tag=i_rank) - dataframe = req.wait() - data_list.append(dataframe.reset_index()) + if self.comm_world.Get_rank() not in self.rank_distribution.keys(): + for req in requests.values(): + req.wait() + new_emissions = None + else: + for w_rank in self.rank_distribution.keys(): + if w_rank == self.comm_world.Get_rank(): + for i_rank in range(self.comm_world.Get_size()): + if i_rank != w_rank: + data_list.append(self.comm_world.recv(BUFFER_SIZE, source=i_rank, tag=i_rank)) + else: + requests[w_rank].wait() new_emissions = pd.concat(data_list) new_emissions[['FID', 'layer', 'tstep']] = new_emissions[['FID', 'layer', 'tstep']].astype(np.int32) new_emissions = new_emissions.groupby(['FID', 'layer', 'tstep']).sum() - else: - new_emissions = None self.comm_world.Barrier() self.logger.write_log('All emissions received.', message_level=2)