diff --git a/CHANGELOG b/CHANGELOG index 54bbc2c0bb58d9c0074d1a3855c37e7c402696cd..b309af697c23282df56ba55e2bc8ae53dc3c91f7 100755 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,7 @@ 2.0.4 XXXX/XX/XX - Rotated nested grid + - GFAS hourly working 2.0.3 2020/02/07 diff --git a/data/profiles/speciation/MolecularWeights.csv b/data/profiles/speciation/MolecularWeights.csv index 1a43266a6dd480fc13f34f54e3efa018e8dceacb..42c3ab551c3014dc37f1ae23ee58927e48caad85 100755 --- a/data/profiles/speciation/MolecularWeights.csv +++ b/data/profiles/speciation/MolecularWeights.csv @@ -66,4 +66,5 @@ voc22,68.8 voc23,75.3 voc24,59.1 voc25,86.9 -nmvoc,1.0 \ No newline at end of file +nmvoc,1.0 +tpm,1.0 \ No newline at end of file diff --git a/hermesv3_gr/modules/emision_inventories/emission_inventory.py b/hermesv3_gr/modules/emision_inventories/emission_inventory.py index 167042479a4a6f79db5b1c727b89eec59dfd8e35..4f8e440dc6664d9fbd681ad566959fc058a1ec9d 100755 --- a/hermesv3_gr/modules/emision_inventories/emission_inventory.py +++ b/hermesv3_gr/modules/emision_inventories/emission_inventory.py @@ -91,6 +91,7 @@ class EmissionInventory(object): self.inputs_path = inputs_path self.input_frequency = input_frequency self.grid = grid + self.auxiliary_files_path = options.auxiliary_files_path # Profiles p_vertical = self.get_profile(p_vertical) @@ -171,7 +172,7 @@ class EmissionInventory(object): for pollutant_name in pollutants: pollutant_list.append( {'name': pollutant_name, - 'path': self.get_input_path(pollutant_name), + 'path': self.get_input_path(pollutant=pollutant_name), 'Dataset': "{0}_{1}".format(self.inventory_name, self.sector)} ) return pollutant_list @@ -265,7 +266,7 @@ class EmissionInventory(object): settings.write_time('EmissionInventory', 'do_regrid', timeit.default_timer() - st_time, level=2) @staticmethod - def make_emission_list(options, grid, vertical_output_profile, date): + def make_emission_list(options, grid, vertical_levels, date): """ Extract the information of the cross table to read all the needed emissions. @@ -275,8 +276,8 @@ class EmissionInventory(object): :param grid: Grid to use. :type grid: Grid - :param vertical_output_profile: Path to eht file that contains the vertical profile. - :type vertical_output_profile: str + :param vertical_levels: Verical levels + :type vertical_levels: list :param date: Date to simulate. 
:type date: datetime.datetime @@ -289,6 +290,7 @@ class EmissionInventory(object): from .point_gfas_emission_inventory import PointGfasEmissionInventory from .gfas_emission_inventory import GfasEmissionInventory from .point_source_emission_inventory import PointSourceEmissionInventory + from .point_gfas_hourly_emission_inventory import PointGfasHourlyEmissionInventory st_time = timeit.default_timer() settings.write_log('Loading emissions') @@ -348,7 +350,7 @@ class EmissionInventory(object): GfasEmissionInventory(options, grid, date, emission_inventory.ei, emission_inventory.source_type, emission_inventory.sector, pollutants, emission_inventory_path, - emission_inventory.frequency, vertical_output_profile, + emission_inventory.frequency, vertical_levels, reference_year=emission_inventory.ref_year, factors=emission_inventory.factor_mask, regrid_mask=emission_inventory.regrid_mask, @@ -363,7 +365,7 @@ class EmissionInventory(object): EmissionInventory(options, grid, date, emission_inventory.ei, emission_inventory.source_type, emission_inventory.sector, pollutants, emission_inventory_path, - emission_inventory.frequency, vertical_output_profile, + emission_inventory.frequency, vertical_levels, reference_year=emission_inventory.ref_year, factors=emission_inventory.factor_mask, regrid_mask=emission_inventory.regrid_mask, @@ -375,21 +377,33 @@ class EmissionInventory(object): p_speciation=emission_inventory.p_speciation)) elif emission_inventory.source_type == 'point': if emission_inventory.ei == 'GFASv12': - emission_inventory_list.append( - PointGfasEmissionInventory( - options, grid, date, emission_inventory.ei, emission_inventory.source_type, - emission_inventory.sector, pollutants, emission_inventory_path, - emission_inventory.frequency, vertical_output_profile, - reference_year=emission_inventory.ref_year, factors=emission_inventory.factor_mask, - regrid_mask=emission_inventory.regrid_mask, p_vertical=emission_inventory.p_vertical, - p_month=p_month, p_week=p_week, p_day=p_day, p_hour=p_hour, - p_speciation=emission_inventory.p_speciation)) + if emission_inventory.frequency == 'daily': + emission_inventory_list.append( + PointGfasEmissionInventory( + options, grid, date, emission_inventory.ei, emission_inventory.source_type, + emission_inventory.sector, pollutants, emission_inventory_path, + emission_inventory.frequency, vertical_levels, + reference_year=emission_inventory.ref_year, factors=emission_inventory.factor_mask, + regrid_mask=emission_inventory.regrid_mask, p_vertical=emission_inventory.p_vertical, + p_month=p_month, p_week=p_week, p_day=p_day, p_hour=p_hour, + p_speciation=emission_inventory.p_speciation)) + elif emission_inventory.frequency == 'hourly': + emission_inventory_list.append( + PointGfasHourlyEmissionInventory( + options, grid, date, emission_inventory.ei, emission_inventory.source_type, + emission_inventory.sector, pollutants, emission_inventory_path, + emission_inventory.frequency, vertical_levels, + reference_year=emission_inventory.ref_year, factors=emission_inventory.factor_mask, + regrid_mask=emission_inventory.regrid_mask, p_vertical=emission_inventory.p_vertical, + p_speciation=emission_inventory.p_speciation)) + else: + raise NotImplementedError("ERROR: '{0}' frequency is not implemented. Use 'daily' or 'hourly'.".format(emission_inventory.frequency)) else: emission_inventory_list.append( PointSourceEmissionInventory(options, grid, date, emission_inventory.ei, emission_inventory.source_type, emission_inventory.sector, pollutants, emission_inventory_path, - emission_inventory.frequency,
vertical_output_profile, + emission_inventory.frequency, vertical_levels, reference_year=emission_inventory.ref_year, factors=emission_inventory.factor_mask, regrid_mask=emission_inventory.regrid_mask, diff --git a/hermesv3_gr/modules/emision_inventories/point_gfas_emission_inventory.py b/hermesv3_gr/modules/emision_inventories/point_gfas_emission_inventory.py index 2cc879f895192dbe96b663798bca363ea4446fbb..102f7d8923054f866ad35edeb0103ecac62a6327 100755 --- a/hermesv3_gr/modules/emision_inventories/point_gfas_emission_inventory.py +++ b/hermesv3_gr/modules/emision_inventories/point_gfas_emission_inventory.py @@ -203,26 +203,26 @@ class PointGfasEmissionInventory(EmissionInventory): return return_value - def do_vertical_allocation(self, values): - """ - Allocates the fire emissions on their top level. - - :param values: 2D array with the fire emissions - :type values: numpy.array - - :return: Emissions already allocated on the top altitude of each fire. - :rtype: numpy.array - """ - print('do_vertical_allocation') - sys.exit() - st_time = timeit.default_timer() - - return_value = self.vertical.do_vertical_interpolation_allocation(values, self.altitude) - - settings.write_time('PointGfasEmissionInventory', 'do_vertical_allocation', timeit.default_timer() - st_time, - level=3) - - return return_value + # def do_vertical_allocation(self, values): + # """ + # Allocates the fire emissions on their top level. + # + # :param values: 2D array with the fire emissions + # :type values: numpy.array + # + # :return: Emissions already allocated on the top altitude of each fire. + # :rtype: numpy.array + # """ + # print('do_vertical_allocation') + # sys.exit() + # st_time = timeit.default_timer() + # + # return_value = self.vertical.do_vertical_interpolation_allocation(values, self.altitude) + # + # settings.write_time('PointGfasEmissionInventory', 'do_vertical_allocation', timeit.default_timer() - st_time, + # level=3) + # + # return return_value def do_regrid(self): import pandas as pd @@ -294,71 +294,71 @@ class PointGfasEmissionInventory(EmissionInventory): return True - def do_regrid_balanced_NOT_USED(self): - import pandas as pd - import geopandas as gpd - from shapely.geometry import Point - import numpy as np - from netCDF4 import Dataset - - st_time = timeit.default_timer() - settings.write_log("\tAllocating GFAS as point sources on grid:", level=2) - - netcdf = Dataset(self.pollutant_dicts[0]['path'], mode='r') - if settings.rank == 0: - gdf = gpd.GeoDataFrame(crs={'init': 'epsg:4326'}) - - first_var = netcdf.variables[self.input_pollutants[0]][:] - - gdf['src_index'] = np.where(first_var.flatten() > 0)[0] - - gdf['altitude'] = self.vertical.altitude.flatten()[gdf['src_index']] - - lat_1d = netcdf.variables['lat'][:] - lon_1d = netcdf.variables['lon'][:] - lon_1d[lon_1d > 180] -= 360 - # 1D to 2D - lats = np.array([lat_1d] * len(lon_1d)).T - lons = np.array([lon_1d] * len(lat_1d)) - - gdf['geometry'] = [Point(xy) for xy in zip(lons.flatten()[gdf['src_index']], - lats.flatten()[gdf['src_index']])] - gdf['src_area'] = netcdf.variables['cell_area'][:].flatten()[gdf['src_index']] - grid_shp = self.grid.to_shapefile() - - temp_coords = Dataset(os.path.join(self.grid.temporal_path, 'temporal_coords.nc'), mode='r') - cell_area = temp_coords.variables['cell_area'][:].flatten() - grid_shp['dst_area'] = cell_area[grid_shp['FID']] - # grid_shp['dst_area'] = grid_shp.to_crs({'init': 'epsg:3857'}).area - # print grid_shp.crs['units'] - # sys.exit() - - gdf = gpd.sjoin(gdf.to_crs(grid_shp.crs), grid_shp, 
how='inner') - print(gdf) - gdf = np.array_split(gdf, settings.size) - else: - gdf = None - - gdf = settings.comm.scatter(gdf, root=0) - - for num, pollutant in enumerate(self.pollutant_dicts): - settings.write_log('\t\tPollutant {0} ({1}/{2})'.format( - pollutant['name'], num + 1, len(self.pollutant_dicts)), level=3) - print(('\t\tPollutant {0} ({1}/{2})'.format(pollutant['name'], num + 1, len(self.pollutant_dicts)))) - aux = netcdf.variables[pollutant['name']][:].flatten()[gdf['src_index']] - - gdf[pollutant['name']] = (aux / gdf['dst_area'].values) * netcdf.variables['cell_area'][:].flatten()[ - gdf['src_index']] - # print netcdf.variables['bc'][:].sum() - - netcdf.close() - - settings.write_time('PointGfasEmissionInventory', 'do_regrid', timeit.default_timer() - st_time, level=2) - print('regrid done') - del gdf['src_index'], gdf['index_right'] - self.emissions = gdf - - return True + # def do_regrid_balanced_NOT_USED(self): + # import pandas as pd + # import geopandas as gpd + # from shapely.geometry import Point + # import numpy as np + # from netCDF4 import Dataset + # + # st_time = timeit.default_timer() + # settings.write_log("\tAllocating GFAS as point sources on grid:", level=2) + # + # netcdf = Dataset(self.pollutant_dicts[0]['path'], mode='r') + # if settings.rank == 0: + # gdf = gpd.GeoDataFrame(crs={'init': 'epsg:4326'}) + # + # first_var = netcdf.variables[self.input_pollutants[0]][:] + # + # gdf['src_index'] = np.where(first_var.flatten() > 0)[0] + # + # gdf['altitude'] = self.vertical.altitude.flatten()[gdf['src_index']] + # + # lat_1d = netcdf.variables['lat'][:] + # lon_1d = netcdf.variables['lon'][:] + # lon_1d[lon_1d > 180] -= 360 + # # 1D to 2D + # lats = np.array([lat_1d] * len(lon_1d)).T + # lons = np.array([lon_1d] * len(lat_1d)) + # + # gdf['geometry'] = [Point(xy) for xy in zip(lons.flatten()[gdf['src_index']], + # lats.flatten()[gdf['src_index']])] + # gdf['src_area'] = netcdf.variables['cell_area'][:].flatten()[gdf['src_index']] + # grid_shp = self.grid.to_shapefile() + # + # temp_coords = Dataset(os.path.join(self.grid.temporal_path, 'temporal_coords.nc'), mode='r') + # cell_area = temp_coords.variables['cell_area'][:].flatten() + # grid_shp['dst_area'] = cell_area[grid_shp['FID']] + # # grid_shp['dst_area'] = grid_shp.to_crs({'init': 'epsg:3857'}).area + # # print grid_shp.crs['units'] + # # sys.exit() + # + # gdf = gpd.sjoin(gdf.to_crs(grid_shp.crs), grid_shp, how='inner') + # print(gdf) + # gdf = np.array_split(gdf, settings.size) + # else: + # gdf = None + # + # gdf = settings.comm.scatter(gdf, root=0) + # + # for num, pollutant in enumerate(self.pollutant_dicts): + # settings.write_log('\t\tPollutant {0} ({1}/{2})'.format( + # pollutant['name'], num + 1, len(self.pollutant_dicts)), level=3) + # print(('\t\tPollutant {0} ({1}/{2})'.format(pollutant['name'], num + 1, len(self.pollutant_dicts)))) + # aux = netcdf.variables[pollutant['name']][:].flatten()[gdf['src_index']] + # + # gdf[pollutant['name']] = (aux / gdf['dst_area'].values) * netcdf.variables['cell_area'][:].flatten()[ + # gdf['src_index']] + # # print netcdf.variables['bc'][:].sum() + # + # netcdf.close() + # + # settings.write_time('PointGfasEmissionInventory', 'do_regrid', timeit.default_timer() - st_time, level=2) + # print('regrid done') + # del gdf['src_index'], gdf['index_right'] + # self.emissions = gdf + # + # return True def calculate_altitudes(self, vertical_description_path): """ diff --git a/hermesv3_gr/modules/emision_inventories/point_gfas_hourly_emission_inventory.py 
b/hermesv3_gr/modules/emision_inventories/point_gfas_hourly_emission_inventory.py new file mode 100755 index 0000000000000000000000000000000000000000..b9ee204b74f4fefc21d20b1849a040cdbc748347 --- /dev/null +++ b/hermesv3_gr/modules/emision_inventories/point_gfas_hourly_emission_inventory.py @@ -0,0 +1,463 @@ +#!/usr/bin/env python + +# Copyright 2018 Earth Sciences Department, BSC-CNS +# +# This file is part of HERMESv3_GR. +# +# HERMESv3_GR is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# HERMESv3_GR is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with HERMESv3_GR. If not, see . + +import sys +import os +import timeit +import warnings +from datetime import datetime +import pandas as pd +import hermesv3_gr.config.settings as settings +from .emission_inventory import EmissionInventory +from hermesv3_gr.modules.temporal.temporal import TemporalDistribution +from hermesv3_gr.modules.vertical.vertical_gfas_hourly import GfasHourlyVerticalDistribution +from hermesv3_gr.tools.mpi_tools import * +# import mpi4py +# mpi4py.rc.recv_mprobe = False + +BUFFER_SIZE = 2**25 + + +class PointGfasHourlyEmissionInventory(EmissionInventory): + """ + Class that defines the content and the methodology for the GFAS hourly emission inventories + + :param current_date: Date to simulate. + :type current_date: datetime.datetime + + :param inventory_name: Name of the inventory to use. + :type inventory_name: str + + :param sector: Name of the sector of the inventory to use. + :type sector: str + + :param pollutants: List of the pollutant name to take into account. + :type pollutants: list of str + + :param frequency: Frequency of the inputs. [yearly, monthly, daily] + :type frequency: str + + :param reference_year: year of reference of the information of the dataset. + :type reference_year: int + + :param factors: NOT IMPLEMENTED YET + :type factors: NOT IMPLEMENTED YET + + :param regrid_mask: NOT IMPLEMENTED YET + :type regrid_mask: NOT IMPLEMENTED YET + + :param p_vertical: ID of the vertical profile to use. + :type p_vertical: str + + :param p_speciation: ID of the speciation profile to use. 
+ :type p_speciation: str + """ + + def __init__(self, options, grid, current_date, inventory_name, source_type, sector, pollutants, inputs_path, + frequency, vertical_output_profile, + reference_year=2010, factors=None, regrid_mask=None, p_vertical=None, p_speciation=None): + + st_time = timeit.default_timer() + settings.write_log('\t\tCreating GFAS emission inventory as point source.', level=3) + super(PointGfasHourlyEmissionInventory, self).__init__( + options, grid, current_date, inventory_name, source_type, sector, pollutants, inputs_path, frequency, + vertical_output_profile, + reference_year=reference_year, factors=factors, regrid_mask=regrid_mask, p_vertical=None, + p_month=None, p_week=None, p_day=None, p_hour=None, p_speciation=p_speciation) + + self.method = self.get_method(p_vertical) + self.altitude_name = self.get_altitude_name() + + self.vertical = GfasHourlyVerticalDistribution(vertical_output_profile, self.get_approach(p_vertical)) + + self.date_array = TemporalDistribution.caclulate_date_array( + options.start_date, options.output_timestep_type, options.output_timestep_num, options.output_timestep_freq) + + self.grid_shp = self.grid.to_shapefile(full_grid=False) + self.cell_id_by_proc = self.get_cell_ids() + self.coordinates = self.get_coordinates() + self.fid_distribution = self.get_fids() + + self.temporal = None + + settings.write_time('PointGfasHourlyEmissionInventory', 'Init', timeit.default_timer() - st_time, level=3) + + def __str__(self): + string = "PointGfasHourlyEmissionInventory:\n" + string += "\t self.method = {0}\n".format(self.method) + string += "\t self.vertical = {0}\n".format(self.vertical) + + return string + + @ staticmethod + def get_approach(p_vertical): + """ + Extract the given approach value. + + :return: Approach value + :rtype: str + """ + import re + + st_time = timeit.default_timer() + + return_value = None + aux_list = re.split(', |,| , | ,', p_vertical) + for element in aux_list: + aux_value = re.split('=| =|= | = ', element) + if aux_value[0] == 'approach': + return_value = aux_value[1] + + settings.write_time('PointGfasHourlyEmissionInventory', 'get_approach', timeit.default_timer() - st_time, + level=3) + + return return_value + + def get_coordinates(self): + from netCDF4 import Dataset + import geopandas as gpd + import pandas as pd + from shapely import wkt + from hermesv3_gr.tools.netcdf_tools import get_grid_area + + # Getting 2D coordinates + coordinates_path = os.path.join( + self.auxiliary_files_path, 'gfas', 'coordinates_s{0}_n{1}.csv'.format(settings.size, settings.rank)) + + if not os.path.exists(coordinates_path): + if settings.rank == 0: + if not os.path.exists(os.path.dirname(coordinates_path)): + os.makedirs(os.path.dirname(coordinates_path)) + + coordinates_shapefile = os.path.join( + self.auxiliary_files_path, 'gfas', 'coordinates.shp'.format(settings.size, settings.rank)) + if not os.path.exists(coordinates_shapefile): + src_area = get_grid_area(self.get_input_path(date=self.date_array[0])) + netcdf = Dataset(self.get_input_path(date=self.date_array[0]), mode='r') + lats = netcdf.variables['latitude'][:] + lons = netcdf.variables['longitude'][:] + netcdf.close() + + lons[lons > 180] = lons[lons > 180] - 360.0 + + len_lats = len(lats) + len_lons = len(lons) + + lats = np.array([lats] * len_lons).T.flatten() + lons = np.array([lons] * len_lats).flatten() + + coordinates = gpd.GeoDataFrame(lats, columns=['lats'], crs={'init': 'epsg:4326'}) + coordinates['lons'] = lons + coordinates.index.name = 'FID' + + 
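+ # Build WKT 'POINT (lon lat)' strings from the flattened 2D coordinates; they are parsed into shapely geometries a few lines below +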
coordinates['geometry'] = \ + 'POINT (' + coordinates['lons'].astype(str) + ' ' + coordinates['lats'].astype(str) + ')' + coordinates.drop(columns=['lats', 'lons'], inplace=True) + coordinates['src_area'] = src_area.flatten() + coordinates['geometry'] = coordinates['geometry'].apply(wkt.loads) + coordinates.reset_index(inplace=True) + coordinates.to_file(coordinates_shapefile) + else: + coordinates = gpd.read_file(coordinates_shapefile) + coordinates = gpd.sjoin(coordinates, + self.grid.to_shapefile(full_grid=True).to_crs(coordinates.crs), + how='inner', op='within') + + coordinates.rename(columns={'FID_left': 'FID'}, inplace=True) + + coordinates = coordinates[['FID', 'src_area', 'Cell_ID']] + for proc in range(settings.size): + if proc > 0: + settings.comm.send(coordinates[coordinates['Cell_ID'].isin(self.cell_id_by_proc[proc])], + dest=proc) + coordinates = coordinates[coordinates['Cell_ID'].isin(self.cell_id_by_proc[0])] + + else: + coordinates = settings.comm.recv(source=0) + + # coordinates = coordinates[['FID', 'Cell_ID']] + temp_coords = Dataset(os.path.join(self.grid.temporal_path, 'temporal_coords.nc'), mode='r') + + cell_area = pd.DataFrame(temp_coords.variables['cell_area'][:].flatten(), columns=['dst_area']) + temp_coords.close() + cell_area['Cell_ID'] = cell_area.index + + coordinates = pd.merge(coordinates, cell_area, on='Cell_ID') + # print(coordinates) + # exit() + coordinates.to_csv(coordinates_path) + else: + coordinates = pd.read_csv(coordinates_path) + coordinates = coordinates[['FID', 'src_area', 'Cell_ID', 'dst_area']] + coordinates.set_index('FID', inplace=True) + + return coordinates + + def get_input_path(self, date=None, pollutant=None, extension='nc'): + """ + Completes the path of the NetCDF that contains the needed information of the given pollutant. + + :param date: Date of the NetCDF. + :type date: datetime + + :param pollutant: Name of the pollutant of the NetCDF. + :type pollutant: str + + :param extension: Extension of the input file. + :type: str + + :return: Full path of the needed NetCDF. + :rtype: str + """ + st_time = timeit.default_timer() + + if date is None: + date = self.date + + # TODO to change path + file_path = os.path.join(self.inputs_path, 'multivar', 'ftp', + '{0}'.format(date.strftime('%Y%m%d')), + 'ga_{0}.{1}'.format(date.strftime('%Y%m%d_%H'), extension)) + + # Checking input file + if not os.path.exists(file_path): + settings.write_log('ERROR: Check the .err file to get more info.') + if settings.rank == 0: + raise IOError('ERROR: File {0} not found.'.format(file_path)) + sys.exit(1) + + settings.write_time('PointGfasHourlyEmissionInventory', 'get_input_path', timeit.default_timer() - st_time, + level=3) + + return file_path + + def get_altitude_name(self): + st_time = timeit.default_timer() + + if self.method == 'sovief': + alt_var = 'apt' + elif self.method == 'prm': + alt_var = 'mami' + else: + alt_var = None + print("ERROR: Only 'sovief' and 'prm' methods are accepted.") + + settings.write_time('PointGfasHourlyEmissionInventory', 'get_altitude_name', timeit.default_timer() - st_time, + level=3) + return alt_var + + @ staticmethod + def get_method(p_vertical): + """ + Extract the given method value. 
+ + :return: Method value + :rtype: str + """ + import re + + st_time = timeit.default_timer() + + return_value = None + aux_list = re.split(', |,| , | ,', p_vertical) + for element in aux_list: + aux_value = re.split('=| =|= | = ', element) + if aux_value[0] == 'method': + return_value = aux_value[1] + + settings.write_time('PointGfasHourlyEmissionInventory', 'get_method', timeit.default_timer() - st_time, level=3) + + return return_value + + def get_cell_ids(self): + cell_id_by_proc = {} + for proc in range(settings.size): + if proc == settings.rank: + aux_fid = self.grid_shp['Cell_ID'].values + else: + aux_fid = None + cell_id_by_proc[proc] = bcast_array(settings.comm, aux_fid, master=proc) + + return cell_id_by_proc + + def get_fids(self): + fid_by_proc = {} + for proc in range(settings.size): + if proc == settings.rank: + aux_fid = list(np.unique(self.coordinates.index.get_level_values('FID').values)) + else: + aux_fid = None + fid_by_proc[proc] = bcast_array(settings.comm, aux_fid, master=proc) + + return fid_by_proc + + def do_regrid(self): + + import numpy as np + from netCDF4 import Dataset + + st_time = timeit.default_timer() + settings.write_log("\tAllocating GFAS as point sources on grid:", level=2) + + # # Masking + # if self.masking.regrid_mask is not None: + # gdf = gdf.loc[gdf['src_index'].isin(np.where(self.masking.regrid_mask.flatten() > 0)[0]), ] + + distribution = None + # st_time = timeit.default_timer() + shapefile_list = [] + + # One NetCDF for time step (hour) + for tstep, date in enumerate(self.date_array): + # print(date, self.get_input_path(date=date)) + shapefile_list_hour = [] + netcdf = Dataset(self.get_input_path(date=date), mode='r') + for pollutant in self.input_pollutants: + if distribution is None: + distribution = get_balanced_distribution( + settings.size, netcdf.variables[pollutant].shape)[settings.rank] + var = netcdf.variables[pollutant][0, distribution['y_min']: distribution['y_max'], :].flatten() + + var_index = np.where(var > 0)[0] + shapefile_list_hour.append( + pd.DataFrame(var[var_index], columns=[pollutant], + index=pd.MultiIndex.from_product([(var_index + distribution['fid_min']).astype(int), + [tstep]], names=['FID', 'tstep']))) + + shapefile = pd.concat(shapefile_list_hour, axis=1, sort=False) + + # shapefile = shapefile[sorted(sum(fid_distribution.values(), []))] + + altitude_var = netcdf.variables[self.altitude_name][:].flatten() + netcdf.close() + + shapefile['altitude'] = altitude_var[shapefile.index.get_level_values('FID')] + + shapefile_list.append(shapefile) + shapefile = pd.concat(shapefile_list, sort=False) + + # Filtering shapefile with the involved cells + shapefile = shapefile[shapefile.index.get_level_values('FID').isin( + sorted(sum(self.fid_distribution.values(), [])))] + + shapefile = balance_dataframe(settings.comm, shapefile) + shapefile.fillna(0.0, inplace=True) + + # print(timeit.default_timer() - st_time) + # shapefile = shapefile.reset_index().set_index(['FID', 'tstep']) + + self.emissions = shapefile + + return True + + def distribute(self, emissions): + emissions_list = [] + # print('distributing') + for proc in range(settings.size): + partial_emis = emissions[emissions['FID'].isin(sorted(np.unique(self.fid_distribution[proc])))].copy() + if settings.rank < proc: + settings.comm.send(partial_emis, dest=proc) + recv_partial_emis = settings.comm.recv(source=proc) + elif settings.rank > proc: + recv_partial_emis = settings.comm.recv(source=proc) + settings.comm.send(partial_emis, dest=proc) + else: + recv_partial_emis = 
partial_emis + emissions_list.append(recv_partial_emis) + + emissions = pd.concat(emissions_list) + + emissions = pd.merge(emissions, self.coordinates.reset_index(), on='FID') + + for pollutant in self.input_pollutants: + emissions[pollutant] = emissions[pollutant] * (emissions['src_area'] / emissions['dst_area']) + emissions.drop(columns=['FID', 'src_area', 'dst_area'], inplace=True) + + # Replacing Cell_ID by FID to be able to write + emissions = pd.merge(emissions, self.grid_shp[['FID', 'Cell_ID']], on='Cell_ID') + emissions.drop(columns=['Cell_ID'], inplace=True) + + emissions.rename(columns={'Cell_ID': 'FID'}, inplace=True) + emissions = emissions.groupby(['FID', 'tstep', 'layer']).sum() + + return emissions + + def calculate_altitudes(self, vertical_description_path): + """ + Calculate the number layer to allocate the point source. + + :param vertical_description_path: Path to the file that contains the vertical description + :type vertical_description_path: str + + :return: True + :rtype: bool + """ + import pandas as pd + + st_time = timeit.default_timer() + settings.write_log("\t\tCalculating vertical allocation.", level=3) + df = pd.read_csv(vertical_description_path, sep=',') + if self.vertical.approach == 'surface': + self.emissions['altitude'] = 0 + + self.emissions['layer'] = None + for i, line in df.iterrows(): + layer = line['Ilayer'] - 1 + self.emissions.loc[self.emissions['altitude'] <= line['height_magl'], 'layer'] = layer + self.emissions.loc[self.emissions['altitude'] <= line['height_magl'], 'altitude'] = None + + # Fires with higher altitudes than the max layer limit goes to the last layer + if len(self.emissions[~self.emissions['altitude'].isna()]) > 0: + self.emissions.loc[~self.emissions['altitude'].isna(), 'layer'] = layer + warnings.warn('WARNING: One or more fires have an altitude of fire emission injection higher than the top' + + ' layer of the model defined in the {0} file'.format(vertical_description_path)) + + # print(self.emissions.loc[~self.emissions['altitude'].isna()]) + del self.emissions['altitude'] + + self.emissions = self.emissions.groupby(['FID', 'tstep', 'layer']).sum() + self.emissions.reset_index(inplace=True) + + self.emissions = self.vertical.distribute_vertically(self.emissions, self.input_pollutants) + + settings.write_time('PointGfasHourlyEmissionInventory', 'calculate_altitudes', timeit.default_timer() - st_time, + level=2) + return True + + def point_source_by_cell(self): + """ + Sums the different emissions that are allocated in the same cell and layer. 
+ + :return: None + """ + emissions = self.distribute(self.emissions).reset_index() + self.location = emissions.loc[:, ['FID', 'tstep', 'layer']] + + self.emissions = [] + for input_pollutant in self.input_pollutants: + dict_aux = { + 'name': input_pollutant, + 'units': '...', + 'data': emissions.loc[:, input_pollutant].values + } + self.emissions.append(dict_aux) + + +if __name__ == "__main__": + pass diff --git a/hermesv3_gr/modules/grids/grid.py b/hermesv3_gr/modules/grids/grid.py index 2a1c23d5bfd1deb167d1553348d9ab5fdb44c316..a4784dd934b72e56da7e6cf279cea97477b2b059 100755 --- a/hermesv3_gr/modules/grids/grid.py +++ b/hermesv3_gr/modules/grids/grid.py @@ -116,6 +116,7 @@ class Grid(object): self.coords_netcdf_file = os.path.join(temporal_path, 'temporal_coords.nc') self.temporal_path = temporal_path self.shapefile_path = None + self.border_shapefile_path = None # self.esmf_grid = None self.x_lower_bound = None @@ -428,23 +429,45 @@ class Grid(object): list_points = df.values del df['p1'], df['p2'], df['p3'], df['p4'] + # Calculating Cell ID + index = np.array(range(self.full_shape[2] * self.full_shape[3])) + index = index.reshape((self.full_shape[2], self.full_shape[3])) + index = index[self.x_lower_bound:self.x_upper_bound, self.y_lower_bound:self.y_upper_bound] + df['Cell_ID'] = index.flatten() # List of polygons from the list of points geometry = [Polygon(list(points)) for points in list_points] gdf = gpd.GeoDataFrame(df, crs={'init': 'epsg:4326'}, geometry=geometry) gdf = gdf.to_crs(self.crs) - gdf['FID'] = gdf.index + gdf.index.name = 'FID' - gdf.to_file(self.shapefile_path) + gdf.reset_index().to_file(self.shapefile_path) else: settings.write_log('\t\tGrid shapefile already done. Lets try to read it.', level=3) gdf = gpd.read_file(self.shapefile_path) + gdf.set_index('FID') settings.write_time('Grid', 'to_shapefile', timeit.default_timer() - st_time, level=1) return gdf + def get_border(self): + from geopandas import GeoDataFrame + from hermesv3_gr.tools.mpi_tools import bcast_array + + st_time = timeit.default_timer() + # settings.write_log('\t\tGetting grid shapefile', level=3) + + if settings.rank == 0: + grid = self.to_shapefile(full_grid=True) + border = GeoDataFrame(geometry=[grid.unary_union], crs=grid.crs) + else: + border = None + + border = bcast_array(settings.comm, border) + return border + def chech_coords_file(self): """ Checks if the auxiliary coordinates file is created well. 
diff --git a/hermesv3_gr/modules/temporal/temporal.py b/hermesv3_gr/modules/temporal/temporal.py index e1630aadbda8a4ada642ade11eb196687b09a944..256246fb3c411d7a51e159dd065cab12da61a813 100755 --- a/hermesv3_gr/modules/temporal/temporal.py +++ b/hermesv3_gr/modules/temporal/temporal.py @@ -122,7 +122,8 @@ class TemporalDistribution(object): settings.write_time('TemporalDistribution', 'Init', timeit.default_timer() - st_time, level=2) - def caclulate_date_array(self, st_date, time_step_type, time_step_num, time_step_freq): + @staticmethod + def caclulate_date_array(st_date, time_step_type, time_step_num, time_step_freq): from datetime import timedelta from calendar import monthrange, isleap diff --git a/hermesv3_gr/modules/vertical/vertical_gfas_hourly.py b/hermesv3_gr/modules/vertical/vertical_gfas_hourly.py new file mode 100755 index 0000000000000000000000000000000000000000..b650de4a83c935774743e65e5bc452971ddf9bbf --- /dev/null +++ b/hermesv3_gr/modules/vertical/vertical_gfas_hourly.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python + +# Copyright 2018 Earth Sciences Department, BSC-CNS +# +# This file is part of HERMESv3_GR. +# +# HERMESv3_GR is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# HERMESv3_GR is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with HERMESv3_GR. If not, see . + + +import timeit +import hermesv3_gr.config.settings as settings +from .vertical import VerticalDistribution + + +class GfasHourlyVerticalDistribution(VerticalDistribution): + """ + Class that contains all the needed information to vertically distribute the fire emissions. + + :param vertical_output_profile: Path to the file that contains the vertical description of the desired output. + :type vertical_output_profile: str + + :param approach: Approach to take into account. + :type approach: str + """ + def __init__(self, vertical_output_profile, approach): + st_time = timeit.default_timer() + + self.output_heights = vertical_output_profile + self.approach = approach + + settings.write_time('GfasHourlyVerticalDistribution', 'Init', timeit.default_timer() - st_time, level=3) + + def __str__(self): + + string = "GFAS Hourly Vertical distribution:\n" + + string += "\t self.approach = {0}\n".format(self.approach) + + string += "\t self.output_heights = {0}\n".format(self.output_heights) + + return string + + @staticmethod + def calculate_widths(heights_list): + """ + Calculate the width of each vertical level. + + :param heights_list: List of the top altitude in meters of each level. + :type heights_list: list + + :return: List with the width of each vertical level. + :rtype: list + """ + st_time = timeit.default_timer() + + widths = [] + for i in range(len(heights_list)): + if i == 0: + widths.append(heights_list[i]) + else: + widths.append(heights_list[i] - heights_list[i - 1]) + + settings.write_time('GfasHourlyVerticalDistribution', 'calculate_widths', + timeit.default_timer() - st_time, level=3) + return widths + + def get_weights(self, heights_list): + """ + Calculate the proportion (%) of emission to put on each layer. 
+ + :param heights_list: List with the width of each vertical level. + :type heights_list: list + + :return: List of the weight to apply to each layer. + :rtype: list + """ + st_time = timeit.default_timer() + + weights = [] + width_list = self.calculate_widths(heights_list) + if self.approach == 'uniform': + max_percent = 1. + elif self.approach == '50_top': + max_percent = 0.5 + width_list = width_list[0:-1] + else: + max_percent = 1. + + for width in width_list: + weights.append((width * max_percent) / sum(width_list)) + if self.approach == '50_top': + if len(heights_list) == 1: + weights.append(1.) + else: + weights.append(0.5) + + settings.write_time('GfasHourlyVerticalDistribution', 'get_weights', timeit.default_timer() - st_time, level=3) + return weights + + def apply_approach(self, top_fires): + """ + Scatters the emissions vertically. + + :param top_fires: 4D array (time, level, latitude, longitude) with all the emission on each top layer. + :type top_fires: numpy.array + + :return: 4D array (time, level, latitude, longitude) with all the emission distributed on all the involved + layers. + :rtype: numpy.array + """ + import numpy as np + + st_time = timeit.default_timer() + + fires = np.zeros(top_fires.shape) + for i in range(len(self.output_heights)): + if top_fires[i].sum() != 0: + weight_list = self.get_weights(list(self.output_heights[0: i + 1])) + for i_weight in range(len(weight_list)): + fires[i_weight] += top_fires[i] * weight_list[i_weight] + + settings.write_time('GfasHourlyVerticalDistribution', 'apply_approach', + timeit.default_timer() - st_time, level=3) + return fires + + def do_vertical_interpolation_allocation(self, values, altitude): + """ + Allocates the fire emissions on their top level. + + :param values: 2D array with the fire emissions + :type values: numpy.array + + :param altitude: 2D array with the altitude of the fires. + :type altitude: numpy.array + + :return: Emissions already allocated on the top altitude of each fire. + :rtype: numpy.array + """ + import numpy as np + + st_time = timeit.default_timer() + + fire_list = [] + aux_var = values + for height in self.output_heights: + aux_data = np.zeros(aux_var.shape) + ma = np.ma.masked_less_equal(altitude, height) + aux_data[ma.mask] += aux_var[ma.mask] + aux_var -= aux_data + fire_list.append(aux_data) + fire_list = np.array(fire_list).reshape((len(fire_list), values.shape[1], values.shape[2])) + + settings.write_time('GfasHourlyVerticalDistribution', 'do_vertical_interpolation_allocation', + timeit.default_timer() - st_time, level=3) + return fire_list + + def do_vertical_interpolation(self, values): + """ + Manages all the process to do the vertical distribution. + + :param values: Emissions to be vertically distributed. + :type values: numpy.array + + :return: Emissions already vertically distributed. + :rtype: numpy.array + """ + st_time = timeit.default_timer() + + fire_list = self.apply_approach(values) + + settings.write_time('GfasHourlyVerticalDistribution', 'do_vertical_interpolation', + timeit.default_timer() - st_time, level=3) + return fire_list + + def calculate_weight_layer_dict(self, layer): + weight_layer_dict = {x: None for x in range(layer + 1)} + if self.approach == '50_top': + weight_layer_dict[layer] = 0.5 + to_distribute = 0.5 + layer = layer - 1 + elif self.approach == 'uniform': + to_distribute = 1. 
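+ # Spread to_distribute across layers 0..layer in proportion to each layer's thickness (difference between consecutive output heights)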
+ + total_width = self.output_heights[layer] + + previous_height = 0 + for i, height in enumerate(self.output_heights[0:layer + 1]): + partial_width = height - previous_height + weight_layer_dict[i] = to_distribute * partial_width / total_width + + previous_height = height + + return weight_layer_dict + + def distribute_vertically(self, emissions_df, input_pollutant_list): + import pandas as pd + + vert_emissions = [] + for layer, emis in emissions_df.groupby('layer'): + if layer == 0: + vert_emissions.append(emis) + else: + weight_layer_dict = self.calculate_weight_layer_dict(layer) + for layer, weight in weight_layer_dict.items(): + aux_emis = emis.copy() + aux_emis.loc[:, 'layer'] = layer + aux_emis.loc[:, input_pollutant_list] = aux_emis[input_pollutant_list].multiply(weight) + + vert_emissions.append(aux_emis) + if len(vert_emissions) == 0: + vert_emissions = emissions_df + else: + vert_emissions = pd.concat(vert_emissions) + + vert_emissions = vert_emissions.groupby(['FID', 'tstep', 'layer']).sum() + vert_emissions.reset_index(inplace=True) + return vert_emissions + + +if __name__ == '__main__': + pass diff --git a/hermesv3_gr/modules/writing/writer.py b/hermesv3_gr/modules/writing/writer.py index c32517bfaafdf3223ff04c575666c8efd56278e0..fb8ebb959612d9bf1c3853f63e7f21c8b52aeb54 100755 --- a/hermesv3_gr/modules/writing/writer.py +++ b/hermesv3_gr/modules/writing/writer.py @@ -225,9 +225,15 @@ class Writer(object): else: aux_data = emission['data'] elif ei.source_type == 'point': - aux_data = np.zeros((shape[1], shape[2] * shape[3])) - aux_data[ei.location['layer'], ei.location['FID']] = emission['data'] - aux_data = aux_data.reshape((shape[1], shape[2], shape[3])) + if 'tstep' in ei.location.columns: + aux_data = np.zeros((shape[0], shape[1], shape[2] * shape[3])) + aux_data[ei.location['tstep'], ei.location['layer'], + ei.location['FID']] = emission['data'] + aux_data = aux_data.reshape(shape) + else: + aux_data = np.zeros((shape[1], shape[2] * shape[3])) + aux_data[ei.location['layer'], ei.location['FID']] = emission['data'] + aux_data = aux_data.reshape((shape[1], shape[2], shape[3])) else: aux_data = None @@ -241,7 +247,10 @@ class Writer(object): if ei.temporal_factors is not None: data += aux_data[np.newaxis, :, :, :] * ei.temporal_factors[:, np.newaxis, :, :] else: - data += aux_data[np.newaxis, :, :, :] + if len(aux_data.shape) == 4: + data += aux_data + else: + data += aux_data[np.newaxis, :, :, :] settings.write_time('TemporalDistribution', 'calculate_data_by_var', timeit.default_timer() - temporal_time, level=2) # Unit changes diff --git a/hermesv3_gr/tools/mpi_tools.py b/hermesv3_gr/tools/mpi_tools.py new file mode 100644 index 0000000000000000000000000000000000000000..430530266531ddb3f131a5c555f08e151799873b --- /dev/null +++ b/hermesv3_gr/tools/mpi_tools.py @@ -0,0 +1,78 @@ +import numpy as np + + +def scatter_array(comm, data, master=0): + if comm.Get_size() == 1: + data = data + else: + if comm.Get_rank() == master: + data = np.array_split(data, comm.Get_size()) + else: + data = None + data = comm.scatter(data, root=master) + + return data + + +def bcast_array(comm, data, master=0): + if comm.Get_size() == 1: + data = data + else: + data = comm.bcast(data, root=master) + + return data + + +def balance_dataframe(comm, data, master=0): + import pandas as pd + data = comm.gather(data, root=master) + if comm.Get_rank() == master: + data = pd.concat(data) + data = np.array_split(data, comm.Get_size()) + else: + data = None + + data = comm.scatter(data, root=master) + + 
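+ # After the gather/concat/split/scatter round-trip, every rank holds a roughly equal-sized slice of the full DataFrame +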
return data + + +def get_balanced_distribution(processors, shape): + while len(shape) < 4: + shape = (0,) + shape + + fid_dist = {} + total_rows = shape[2] + + procs_rows = total_rows // processors + procs_rows_extended = total_rows-(procs_rows*processors) + + rows_sum = 0 + for proc in range(processors): + if proc < procs_rows_extended: + aux_rows = procs_rows + 1 + else: + aux_rows = procs_rows + + total_rows -= aux_rows + if total_rows < 0: + rows = total_rows + aux_rows + else: + rows = aux_rows + + min_fid = proc * aux_rows * shape[3] + max_fid = (proc + 1) * aux_rows * shape[3] + + fid_dist[proc] = { + 'y_min': rows_sum, + 'y_max': rows_sum + rows, + 'x_min': 0, + 'x_max': shape[3], + 'fid_min': min_fid, + 'fid_max': max_fid, + 'shape': (shape[0], shape[1], rows, shape[3]), + } + + rows_sum += rows + + return fid_dist
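
Note (sketch, not part of the patch): a minimal standalone example of how the new get_balanced_distribution helper in hermesv3_gr/tools/mpi_tools.py splits a GFAS-like (time, level, lat, lon) array by latitude rows across MPI ranks; the 8 x 10 grid and the 4-rank layout below are illustrative values only, not taken from the patch.

from hermesv3_gr.tools.mpi_tools import get_balanced_distribution

# Illustrative only: split a tiny 8 x 10 (lat x lon) grid across 4 ranks.
dist = get_balanced_distribution(4, (1, 1, 8, 10))
for rank, part in dist.items():
    # Each rank reads rows [y_min, y_max) and owns flat cell ids [fid_min, fid_max).
    print(rank, part['y_min'], part['y_max'], part['fid_min'], part['fid_max'])
# Expected output:
# 0 0 2 0 20
# 1 2 4 20 40
# 2 4 6 40 60
# 3 6 8 60 80

In do_regrid each rank then reads only its own row slice of every hourly GFAS NetCDF, which keeps the per-rank memory footprint bounded.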