diff --git a/conf/hermes.conf b/conf/hermes.conf index c79b872dc0b632bd232b66bf8dce848408cd7f66..bb3de1df378f681ddc18d3a4fb508e53f6211384 100755 --- a/conf/hermes.conf +++ b/conf/hermes.conf @@ -3,14 +3,14 @@ log_level = 3 input_dir = /home/Earth/ctena/Models/hermesv3_bu_data data_path = /esarchive/recon output_dir = /scratch/Earth/HERMESv3_BU_OUT -output_name = HERMESv3__aviation.nc +output_name = HERMESv3__solvents.nc emission_summary = 0 start_date = 2016/11/29 00:00:00 # ----- end_date = start_date [DEFAULT] ----- # end_date = 2010/01/01 00:00:00 output_timestep_num = 24 auxiliary_files_path = /scratch/Earth/HERMESv3_BU_aux/test/_ -erase_auxiliary_files = 0 +erase_auxiliary_files = 1 [DOMAIN] @@ -64,6 +64,14 @@ vertical_description = /profiles/vertical/MONARCH_Global_48layers_ver x_0 = 253151.59375 y_0 = 43862.90625 + # IP + #nx = 397 + #ny = 397 + #inc_x = 4000 + #inc_y = 4000 + #x_0 = -807847.688 + #y_0 = -797137.125 + # EUROPA #nx = 478 #ny = 398 @@ -72,13 +80,6 @@ vertical_description = /profiles/vertical/MONARCH_Global_48layers_ver #x_0 = -2131849.000point_sources #y_0 = -2073137.875 - # IP - # nx = 397 - # ny = 397 - # inc_x = 4000 - # inc_y = 4000 - # x_0 = -807847.688 - # y_0 = -797137.125 # MAD #nx = 146 @@ -130,7 +131,7 @@ writing_processors = 1 # traffic_processors = 256 # traffic_area_processors = 1 -aviation_processors = 1 +aviation_processors = 0 shipping_port_processors = 0 livestock_processors = 0 crop_operations_processors = 0 @@ -141,12 +142,16 @@ recreational_boats_processors = 0 point_sources_processors = 0 traffic_processors = 0 traffic_area_processors = 0 +solvents_processors = 4 [SHAPEFILES] nuts3_shapefile = /shapefiles/nuts3/nuts3.shp nuts2_shapefile = /shapefiles/nuts2/nuts2.shp +land_uses_path = /ecmwf/clc/original_files/g250_clc12_v18_5a/g250_clc12_V18_5.tif +land_use_by_nuts2_path = /agriculture/land_use_ccaa.csv population_density_map = /jrc/ghsl/original_files/GHS_POP_GPW42015_GLOBE_R2015A_54009_1k_v1_0.tif +population_nuts2 = 
/shapefiles/nuts2/pop_by_nut2.csv [SPECIATION DATA] speciation_map = /profiles/speciation/map_base.csv @@ -217,8 +222,6 @@ livestock_hourly_profiles = /profiles/temporal/livestock/hourly_profi livestock_speciation_profiles = /profiles/speciation/livestock/speciation_profiles_base.csv [AGRICULTURAL] -land_uses_path = /ecmwf/clc/original_files/g250_clc12_v18_5a/g250_clc12_V18_5.tif -land_use_by_nut_path = /agriculture/land_use_ccaa.csv crop_by_nut_path = /agriculture/crops_ha_2017.csv crop_from_landuse_path = /agriculture/map_crops_landuse.csv @@ -345,3 +348,14 @@ traffic_area_small_cities_ef_file = /traffic_area/ef/small_cities.csv small_cities_monthly_profile = /profiles/temporal/traffic_area/small_cities_monthly_profiles.csv small_cities_weekly_profile = /profiles/temporal/traffic_area/small_cities_weekly_profiles.csv small_cities_hourly_profile = /profiles/temporal/traffic_area/small_cities_hourly_profiles.csv + +[SOLVENTS] +solvents_pollutants = nmvoc +solvents_proxies_path = /solvents/proxies_profiles.csv +solvents_yearly_emissions_by_nut2_path = /solvents/miteco_solvent_emissions_nuts2_2015.csv +solvents_point_sources_shapefile = /solvents/use_solvents_point_sources.shp +solvents_point_sources_weight_by_nut2_path = /solvents/point_sources_weights_nuts2.csv +solvents_monthly_profile = /profiles/temporal/solvents/monthly_profiles.csv +solvents_weekly_profile = /profiles/temporal/solvents/weekly_profiles.csv +solvents_hourly_profile = /profiles/temporal/solvents/hourly_profiles.csv +solvents_speciation_profiles = /profiles/speciation/solvents/speciation_profiles_base.csv diff --git a/hermesv3_bu/config/config.py b/hermesv3_bu/config/config.py index 26d4ce2ce23141c09c48107b618d1cb5e4a8366d..76a81f1962196d87b1ffdbcf247018ba75c37fb2 100755 --- a/hermesv3_bu/config/config.py +++ b/hermesv3_bu/config/config.py @@ -146,12 +146,14 @@ class Config(ArgParser): p.add_argument('--crop_operations_processors', required=True, type=int) 
p.add_argument('--crop_fertilizers_processors', required=True, type=int) p.add_argument('--agricultural_machinery_processors', required=True, type=int) + p.add_argument('--solvents_processors', required=True, type=int) p.add_argument('--speciation_map', required=False, help='...') # ===== SHAPEFILES ===== p.add_argument('--nuts3_shapefile', required=False, type=str, default='True') p.add_argument('--nuts2_shapefile', required=False, type=str, default='True') + p.add_argument('--population_nuts2', required=False, type=str, default='True') p.add_argument('--clipping', required=False, type=str, default=None, help='To clip the domain into an specific zone. ' + @@ -220,7 +222,7 @@ class Config(ArgParser): # ***** AGRICULTURAL SECTOR***** p.add_argument('--land_uses_path', required=False, help='...') - p.add_argument('--land_use_by_nut_path', required=False, help='...') + p.add_argument('--land_use_by_nuts2_path', required=False, help='...') p.add_argument('--crop_by_nut_path', required=False, help='...') p.add_argument('--crop_from_landuse_path', required=False, help='...') @@ -347,6 +349,17 @@ class Config(ArgParser): p.add_argument('--small_cities_weekly_profile', required=False, help='...') p.add_argument('--small_cities_monthly_profile', required=False, help='...') + # ***** SOLVENTS SECTOR ***** + p.add_argument('--solvents_pollutants', required=False, help='...') + p.add_argument('--solvents_proxies_path', required=False, help='...') + p.add_argument('--solvents_yearly_emissions_by_nut2_path', required=False, help='...') + p.add_argument('--solvents_point_sources_shapefile', required=False, help='...') + p.add_argument('--solvents_point_sources_weight_by_nut2_path', required=False, help='...') + p.add_argument('--solvents_monthly_profile', required=False, help='...') + p.add_argument('--solvents_weekly_profile', required=False, help='...') + p.add_argument('--solvents_hourly_profile', required=False, help='...') + p.add_argument('--solvents_speciation_profiles', 
required=False, help='...') + arguments = p.parse_args() for item in vars(arguments): @@ -382,6 +395,7 @@ class Config(ArgParser): comm.Barrier() self.create_dir(arguments.auxiliary_files_path) + # Booleans arguments.do_traffic = arguments.traffic_processors > 0 arguments.do_traffic_area = arguments.traffic_area_processors > 0 arguments.do_aviation = arguments.aviation_processors > 0 @@ -393,6 +407,7 @@ class Config(ArgParser): arguments.do_crop_operations = arguments.crop_operations_processors > 0 arguments.do_crop_fertilizers = arguments.crop_fertilizers_processors > 0 arguments.do_agricultural_machinery = arguments.agricultural_machinery_processors > 0 + arguments.do_solvents = arguments.solvents_processors > 0 # Aviation lists arguments.airport_list = self._parse_list(arguments.airport_list) @@ -458,6 +473,9 @@ class Config(ArgParser): # Traffic area lists arguments.traffic_area_pollutants = self._parse_list(arguments.traffic_area_pollutants) + # Solvents lists + arguments.solvents_pollutants = self._parse_list(arguments.solvents_pollutants) + return arguments @staticmethod diff --git a/hermesv3_bu/io_server/io_raster.py b/hermesv3_bu/io_server/io_raster.py index 0f6f1a68202d3c2b7f8d55e5621276e356be03ed..b331cdf6ffe43e2ec937a7f3e9baddfdf0c69984 100755 --- a/hermesv3_bu/io_server/io_raster.py +++ b/hermesv3_bu/io_server/io_raster.py @@ -10,6 +10,7 @@ import numpy as np from shapely.geometry import Polygon from hermesv3_bu.io_server.io_server import IoServer +from hermesv3_bu.io_server.io_shapefile import IoShapefile from hermesv3_bu.tools.checker import check_files, error_exit @@ -202,7 +203,6 @@ class IoRaster(IoServer): :param out_path: :param write: :param crs: - :param rank: :param nodata: :return: """ @@ -313,3 +313,76 @@ class IoRaster(IoServer): gdf = gpd.read_file(out_path) gdf.set_index('CELL_ID', inplace=True) return gdf + + def to_shapefile_parallel(self, raster_path, gather=False, bcast=False, crs=None, nodata=0): + spent_time = 
timeit.default_timer() + if self.comm.Get_rank() == 0: + ds = rasterio.open(raster_path) + grid_info = ds.transform + + # TODO remove when new version will be installed + if rasterio.__version__ == '0.36.0': + lons = np.arange(ds.width) * grid_info[1] + grid_info[0] + lats = np.arange(ds.height) * grid_info[5] + grid_info[3] + elif rasterio.__version__ == '1.0.21': + lons = np.arange(ds.width) * grid_info[0] + grid_info[2] + lats = np.arange(ds.height) * grid_info[4] + grid_info[5] + else: + lons = np.arange(ds.width) * grid_info[0] + grid_info[2] + lats = np.arange(ds.height) * grid_info[4] + grid_info[5] + + # 1D to 2D + c_lats = np.array([lats] * len(lons)).T.flatten() + c_lons = np.array([lons] * len(lats)).flatten() + del lons, lats + if rasterio.__version__ == '0.36.0': + b_lons = self.create_bounds(c_lons, grid_info[1], number_vertices=4) + grid_info[1] / 2 + b_lats = self.create_bounds(c_lats, grid_info[1], number_vertices=4, inverse=True) + grid_info[5] / 2 + elif rasterio.__version__ == '1.0.21': + b_lons = self.create_bounds(c_lons, grid_info[0], number_vertices=4) + grid_info[0] / 2 + b_lats = self.create_bounds(c_lats, grid_info[4], number_vertices=4, inverse=True) + grid_info[4] / 2 + else: + b_lons = self.create_bounds(c_lons, grid_info[0], number_vertices=4) + grid_info[0] / 2 + b_lats = self.create_bounds(c_lats, grid_info[4], number_vertices=4, inverse=True) + grid_info[4] / 2 + + b_lats = b_lats.reshape((b_lats.shape[1], b_lats.shape[2])) + b_lons = b_lons.reshape((b_lons.shape[1], b_lons.shape[2])) + + gdf = gpd.GeoDataFrame(ds.read(1).flatten(), columns=['data'], index=range(b_lons.shape[0]), crs=ds.crs) + gdf['geometry'] = None + # nodata_i = gdf['data'] != nodata + # print(nodata_i) + # print(b_lats) + # gdf = gdf[nodata_i] + # b_lons = b_lons[nodata_i, :] + # b_lats = b_lats[nodata_i, :] + # print(gdf) + else: + gdf = None + b_lons = None + b_lats = None + self.comm.Barrier() + gdf = IoShapefile(self.comm).split_shapefile(gdf) + + b_lons = 
IoShapefile(self.comm).split_shapefile(b_lons) + b_lats = IoShapefile(self.comm).split_shapefile(b_lats) + + i = 0 + for j, df_aux in gdf.iterrows(): + gdf.loc[j, 'geometry'] = Polygon([(b_lons[i, 0], b_lats[i, 0]), + (b_lons[i, 1], b_lats[i, 1]), + (b_lons[i, 2], b_lats[i, 2]), + (b_lons[i, 3], b_lats[i, 3]), + (b_lons[i, 0], b_lats[i, 0])]) + i += 1 + + gdf['CELL_ID'] = gdf.index + gdf = gdf[gdf['data'] != nodata] + if crs is not None: + gdf = gdf.to_crs(crs) + + if gather and not bcast: + gdf = IoShapefile(self.comm).gather_shapefile(gdf) + elif gather and bcast: + gdf = IoShapefile(self.comm).gather_bcast_shapefile(gdf) + return gdf diff --git a/hermesv3_bu/io_server/io_server.py b/hermesv3_bu/io_server/io_server.py index 694798fd68072c67125102568fc2bfdf625474b2..46bd918bcaa6dc48c8b6214ea4f9d13595256f34 100755 --- a/hermesv3_bu/io_server/io_server.py +++ b/hermesv3_bu/io_server/io_server.py @@ -1,6 +1,12 @@ #!/usr/bin/env python +from mpi4py import MPI + class IoServer(object): + """ + :param comm: Communicator object + :type comm: MPI.Comm + """ def __init__(self, comm): self.comm = comm diff --git a/hermesv3_bu/io_server/io_shapefile.py b/hermesv3_bu/io_server/io_shapefile.py index d4a5df150e46b3df3f6633e698cd8d5748745e48..59ece649cf221a86df7b49557988fe4114f3affc 100755 --- a/hermesv3_bu/io_server/io_shapefile.py +++ b/hermesv3_bu/io_server/io_shapefile.py @@ -9,6 +9,7 @@ import pandas as pd import geopandas as gpd from mpi4py import MPI +from geopandas import GeoDataFrame from hermesv3_bu.io_server.io_server import IoServer from hermesv3_bu.tools.checker import check_files @@ -88,6 +89,13 @@ class IoShapefile(IoServer): return data def split_shapefile(self, data, rank=0): + """ + + :param data: + :param rank: + :return: Splitted Shapefile + :rtype: GeoDataFrame + """ if self.comm.Get_size() == 1: data = data @@ -100,6 +108,32 @@ class IoShapefile(IoServer): return data + def gather_bcast_shapefile(self, data, rank=0): + + if self.comm.Get_size() == 1: + data 
= data + else: + data = self.comm.gather(data, root=rank) + if self.comm.Get_rank() == rank: + data = pd.concat(data) + else: + data = None + data = self.comm.bcast(data, root=rank) + + return data + + def gather_shapefile(self, data, rank=0): + + if self.comm.Get_size() == 1: + data = data + else: + data = self.comm.gather(data, root=rank) + if self.comm.Get_rank() == rank: + data = pd.concat(data) + else: + data = None + return data + def balance(self, data, rank=0): data = self.comm.gather(data, root=rank) diff --git a/hermesv3_bu/sectors/point_source_sector.py b/hermesv3_bu/sectors/point_source_sector.py index fd71d8bb9df9f389da90ff9f2d6c3c0c846bb1a7..584e79d8578fd13d010bed80e0f7aacdd3937693 100755 --- a/hermesv3_bu/sectors/point_source_sector.py +++ b/hermesv3_bu/sectors/point_source_sector.py @@ -98,7 +98,7 @@ class PointSourceSector(Sector): spec_res = links_spec - spec if len(spec_res) > 0: error_exit("The following speciation profile IDs reported in the point sources shapefile do not appear " + - "in the speciation profiles file. {0}".format(month_res)) + "in the speciation profiles file. 
{0}".format(spec_res)) def read_catalog_csv(self, catalog_path, sector_list): """ diff --git a/hermesv3_bu/sectors/sector.py b/hermesv3_bu/sectors/sector.py index f49e37977693d46bfbae4115de781332a8eb09e3..0427e7b28b742539fc2397818174261672082a67 100755 --- a/hermesv3_bu/sectors/sector.py +++ b/hermesv3_bu/sectors/sector.py @@ -3,12 +3,13 @@ import sys import os import timeit -from hermesv3_bu.logger.log import Log import numpy as np import pandas as pd import geopandas as gpd -from geopandas import GeoDataFrame from mpi4py import MPI + +from geopandas import GeoDataFrame +from hermesv3_bu.logger.log import Log from hermesv3_bu.grids.grid import Grid @@ -445,7 +446,7 @@ class Sector(object): spent_time = timeit.default_timer() nut_shapefile = gpd.read_file(nut_shapefile_path).to_crs(shapefile.crs) shapefile = gpd.sjoin(shapefile, nut_shapefile.loc[:, [nut_value, 'geometry']], how='left', op='intersects') - + del nut_shapefile shapefile = shapefile[~shapefile.index.duplicated(keep='first')] shapefile.drop('index_right', axis=1, inplace=True) @@ -563,3 +564,45 @@ class Sector(object): return_value = [outs for outs, ints in self.speciation_map.items() if ints == input_pollutant] self.logger.write_time_log('Sector', 'get_output_pollutants', timeit.default_timer() - spent_time) return return_value + + def calculate_land_use_by_nut(self, land_use_raster_path, nut_shapefile_path, out_land_use_by_nut_path): + from hermesv3_bu.io_server.io_raster import IoRaster + from hermesv3_bu.io_server.io_shapefile import IoShapefile + + # 1st Clip the raster + lu_raster_path = os.path.join(self.auxiliary_dir, 'clipped_land_use.tif') + + if self.comm.Get_rank() == 0: + if not os.path.exists(lu_raster_path): + lu_raster_path = IoRaster(self.comm).clip_raster_with_shapefile_poly( + land_use_raster_path, self.clip.shapefile, lu_raster_path) + + # 2nd Raster to shapefile + land_use_shp = IoRaster(self.comm).to_shapefile_parallel(lu_raster_path, gather=False, bcast=False) + + # 3rd Add NUT 
code + land_use_shp.drop(columns='CELL_ID', inplace=True) + land_use_shp.rename(columns={'data': 'land_use'}, inplace=True) + land_use_shp = self.add_nut_code(land_use_shp, nut_shapefile_path, nut_value='nuts2_id') + land_use_shp = land_use_shp[land_use_shp['nut_code'] != -999] + land_use_shp = IoShapefile(self.comm).balance(land_use_shp) + + # 4th Calculate land_use percent + land_use_shp['area'] = land_use_shp.geometry.area + + land_use_by_nut = GeoDataFrame(index=pd.MultiIndex.from_product( + [np.unique(land_use_shp['nut_code']), np.unique(land_use_shp['land_use'])], names=['nuts2_id', 'land_use'])) + + for nut_code in np.unique(land_use_shp['nut_code']): + for land_use in np.unique(land_use_shp['land_use']): + land_use_by_nut.loc[(nut_code, land_use), 'area'] = land_use_shp.loc[ + (land_use_shp['land_use'] == land_use) & (land_use_shp['nut_code'] == nut_code), 'area'].sum() + + land_use_by_nut.reset_index(inplace=True) + land_use_by_nut = IoShapefile(self.comm).gather_shapefile(land_use_by_nut, rank=0) + + if self.comm.Get_rank() == 0: + land_use_by_nut = land_use_by_nut.groupby(['nuts2_id', 'land_use']).sum() + land_use_by_nut.to_csv(out_land_use_by_nut_path) + print('DONE -> {0}'.format(out_land_use_by_nut_path)) + self.comm.Barrier() diff --git a/hermesv3_bu/sectors/sector_manager.py b/hermesv3_bu/sectors/sector_manager.py index 9f4f5aee0f288608fe8484e5a8a90d65ecf13253..a48319673d8c950f8320ef69b5250518c28acc26 100755 --- a/hermesv3_bu/sectors/sector_manager.py +++ b/hermesv3_bu/sectors/sector_manager.py @@ -5,7 +5,7 @@ from hermesv3_bu.logger.log import Log from hermesv3_bu.tools.checker import error_exit SECTOR_LIST = ['traffic', 'traffic_area', 'aviation', 'point_sources', 'recreational_boats', 'shipping_port', - 'residential', 'livestock', 'crop_operations', 'crop_fertilizers', 'agricultural_machinery'] + 'residential', 'livestock', 'crop_operations', 'crop_fertilizers', 'agricultural_machinery', 'solvents'] class SectorManager(object): @@ -86,7 +86,7 @@ 
class SectorManager(object): arguments.crop_operations_monthly_profiles, arguments.crop_operations_weekly_profiles, arguments.crop_operations_hourly_profiles, arguments.speciation_map, arguments.crop_operations_speciation_profiles, arguments.molecular_weights, - arguments.land_use_by_nut_path, arguments.crop_by_nut_path, arguments.crop_from_landuse_path) + arguments.land_use_by_nuts2_path, arguments.crop_by_nut_path, arguments.crop_from_landuse_path) elif sector == 'crop_fertilizers' and comm_world.Get_rank() in sector_procs: from hermesv3_bu.sectors.agricultural_crop_fertilizers_sector import AgriculturalCropFertilizersSector @@ -99,7 +99,7 @@ class SectorManager(object): arguments.crop_fertilizers_list, arguments.nuts2_shapefile, arguments.land_uses_path, arguments.crop_fertilizers_hourly_profiles, arguments.speciation_map, arguments.crop_fertilizers_speciation_profiles, arguments.molecular_weights, - arguments.land_use_by_nut_path, arguments.crop_by_nut_path, arguments.crop_from_landuse_path, + arguments.land_use_by_nuts2_path, arguments.crop_by_nut_path, arguments.crop_from_landuse_path, arguments.cultivated_ratio, arguments.fertilizers_rate, arguments.crop_f_parameter, arguments.crop_f_fertilizers, arguments.gridded_ph, arguments.gridded_cec, arguments.fertilizers_denominator_yearly_factor_path, arguments.crop_calendar, @@ -119,7 +119,7 @@ class SectorManager(object): arguments.crop_machinery_monthly_profiles, arguments.crop_machinery_weekly_profiles, arguments.crop_machinery_hourly_profiles, arguments.speciation_map, arguments.crop_machinery_speciation_profiles, arguments.molecular_weights, - arguments.land_use_by_nut_path, arguments.crop_by_nut_path, arguments.crop_from_landuse_path, + arguments.land_use_by_nuts2_path, arguments.crop_by_nut_path, arguments.crop_from_landuse_path, arguments.nuts3_shapefile, arguments.crop_machinery_deterioration_factor_path, arguments.crop_machinery_load_factor_path, arguments.crop_machinery_vehicle_ratio_path, 
arguments.crop_machinery_vehicle_units_path, arguments.crop_machinery_vehicle_workhours_path, @@ -204,6 +204,18 @@ class SectorManager(object): arguments.traffic_area_small_cities_ef_file, arguments.small_cities_monthly_profile, arguments.small_cities_weekly_profile, arguments.small_cities_hourly_profile ) + elif sector == 'solvents' and comm_world.Get_rank() in sector_procs: + from hermesv3_bu.sectors.solvents_sector import SolventsSector + self.sector = SolventsSector( + comm_world.Split(color, sector_procs.index(comm_world.Get_rank())), self.logger, + arguments.auxiliary_files_path, grid, clip, date_array, arguments.solvents_pollutants, + grid.vertical_desctiption, arguments.speciation_map, arguments.molecular_weights, + arguments.solvents_speciation_profiles, arguments.solvents_monthly_profile, + arguments.solvents_weekly_profile, arguments.solvents_hourly_profile, + arguments.solvents_proxies_path, arguments.solvents_yearly_emissions_by_nut2_path, + arguments.solvents_point_sources_shapefile, arguments.solvents_point_sources_weight_by_nut2_path, + arguments.population_density_map, arguments.population_nuts2, arguments.land_uses_path, + arguments.land_use_by_nuts2_path, arguments.nuts2_shapefile) color += 1 diff --git a/hermesv3_bu/sectors/solvents_sector.py b/hermesv3_bu/sectors/solvents_sector.py new file mode 100755 index 0000000000000000000000000000000000000000..a54cf9e0b3d455758f199ad874fb62afff8df702 --- /dev/null +++ b/hermesv3_bu/sectors/solvents_sector.py @@ -0,0 +1,743 @@ +#!/usr/bin/env python + +import sys +import os +import timeit +import geopandas as gpd +import pandas as pd +import numpy as np +from warnings import warn +from hermesv3_bu.sectors.sector import Sector +from hermesv3_bu.io_server.io_shapefile import IoShapefile +from hermesv3_bu.io_server.io_raster import IoRaster +from hermesv3_bu.tools.checker import check_files, error_exit +from pandas import DataFrame +from geopandas import GeoDataFrame +from hermesv3_bu.grids.grid import Grid 
+from hermesv3_bu.logger.log import Log +from hermesv3_bu.clipping.clip import Clip + +PROXY_NAMES = {'boat_building': 'boat', + 'automobile_manufacturing': 'automobile', + 'car_repairing': 'car_repair', + 'dry_cleaning': 'dry_clean', + 'rubber_processing': 'rubber', + 'paints_manufacturing': 'paints', + 'inks_manufacturing': 'ink', + 'glues_manufacturing': 'glues', + 'pharmaceutical_products_manufacturing': 'pharma', + 'leather_taning': 'leather', + 'printing': 'printing', + } + + +class SolventsSector(Sector): + """ + Solvents sector allows to calculate the solvents emissions. + + It first calculates the horizontal distribution for the different sources and stores them in an auxiliary file + during the initialization part. + + Once the initialization is finished it distributes the emissions of the different sub sectors on the grid to start + the temporal disaggregation. + """ + def __init__(self, comm, logger, auxiliary_dir, grid, clip, date_array, source_pollutants, vertical_levels, + speciation_map_path, molecular_weights_path, speciation_profiles_path, monthly_profile_path, + weekly_profile_path, hourly_profile_path, proxies_map_path, yearly_emissions_by_nut2_path, + point_sources_shapefile_path, point_sources_weight_by_nut2_path, population_raster_path, + population_nuts2_path, land_uses_raster_path, land_uses_nuts2_path, nut2_shapefile_path): + """ + :param comm: Communicator for the sector calculation. + :type comm: MPI.COMM + + :param logger: Logger + :type logger: Log + + :param auxiliary_dir: Path to the directory where the necessary auxiliary files will be created if they are not + created yet. + :type auxiliary_dir: str + + :param grid: Grid object. + :type grid: Grid + + :param clip: Clip object + :type clip: Clip + + :param date_array: List of datetimes. + :type date_array: list(datetime.datetime, ...) + + :param source_pollutants: List of input pollutants to take into account. 
+ :type source_pollutants: list + + :param vertical_levels: List of top level of each vertical layer. + :type vertical_levels: list + + :param speciation_map_path: Path to the CSV file that contains the speciation map. The CSV file must contain + the following columns [dst, src, description] + The 'dst' column will be used as output pollutant list and the 'src' column as their own input pollutant + to be used as a fraction in the speciation profiles. + :type speciation_map_path: str + + :param molecular_weights_path: Path to the CSV file that contains all the molecular weights needed. The CSV + file must contain the 'Specie' and 'MW' columns. + :type molecular_weights_path: str + + :param speciation_profiles_path: Path to the file that contains all the speciation profiles. The CSV file + must contain the "Code" column with the value of each 'P_spec' code of the proxies map file. The rest of columns + have to be the same as the column 'dst' of the 'speciation_map_path' file. + :type speciation_profiles_path: str + + :param monthly_profile_path: Path to the CSV file that contains all the monthly profiles. The CSV file must + contain the following columns [P_month, January, February, ..., November, December] + The P_month code has to match with the proxies_map_path file. + :type monthly_profile_path: str + + :param weekly_profile_path: Path to the CSV file that contains all the weekly profiles. The CSV file must + contain the following columns [P_week, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Sunday] + The P_week code has to match with the proxies_map_path file. + :type weekly_profile_path: str + + :param hourly_profile_path: Path to the CSV file that contains all the hourly profiles. The CSV file must + contain the following columns [P_hour, 0, 1, 2, 3, ..., 22, 23] + The P_hour code has to match with the proxies_map_path file. + :type hourly_profile_path: str + + :param proxies_map_path: Path to the CSV file that contains the proxies map. 
+ :type proxies_map_path: str + + :param yearly_emissions_by_nut2_path: Path to the CSV file that contains the yearly emissions by subsector and + nuts2 level. + :type yearly_emissions_by_nut2_path: str + + :param point_sources_shapefile_path: Path to the shapefile that contains the point sources for solvents. + :type point_sources_shapefile_path: str + + :param point_sources_weight_by_nut2_path: Path to the CSV file that contains the weight for each proxy and nut2. + :type point_sources_weight_by_nut2_path: str + + :param population_raster_path: Path to the population raster. + :type population_raster_path: str + + :param population_nuts2_path: Path to the CSV file that contains the amount of population for each nut2. + :type population_nuts2_path: str + + :param land_uses_raster_path: Path to the land use raster. + :type land_uses_raster_path: str + + :param land_uses_nuts2_path: Path to the CSV file that contains the amount of land use for each nut2. + :type land_uses_nuts2_path: str + + :param nut2_shapefile_path: Path to the shapefile that contains the nut2. 
+ :type nut2_shapefile_path: str + """ + spent_time = timeit.default_timer() + logger.write_log('===== SOLVENTS SECTOR =====') + + check_files([speciation_map_path, molecular_weights_path, speciation_profiles_path, monthly_profile_path, + weekly_profile_path, hourly_profile_path, proxies_map_path, yearly_emissions_by_nut2_path, + point_sources_shapefile_path, point_sources_weight_by_nut2_path, population_raster_path, + population_nuts2_path, land_uses_raster_path, land_uses_nuts2_path, nut2_shapefile_path]) + + super(SolventsSector, self).__init__( + comm, logger, auxiliary_dir, grid, clip, date_array, source_pollutants, vertical_levels, + monthly_profile_path, weekly_profile_path, hourly_profile_path, speciation_map_path, + speciation_profiles_path, molecular_weights_path) + + self.proxies_map = self.read_proxies(proxies_map_path) + self.check_profiles() + + self.proxy = self.get_proxy_shapefile( + population_raster_path, population_nuts2_path, land_uses_raster_path, land_uses_nuts2_path, + nut2_shapefile_path, point_sources_shapefile_path, point_sources_weight_by_nut2_path) + + self.yearly_emissions_path = yearly_emissions_by_nut2_path + self.logger.write_time_log('SolventsSector', '__init__', timeit.default_timer() - spent_time) + + def read_proxies(self, path): + """ + Read the proxy map. + + It will filter the CONS == '1' snaps and add the 'spatial_proxy' column that the content will match with some + column of the proxy shapefile. + + :param path: path to the CSV file that have the proxy map. + :type path: str + + :return: Proxy map. 
+ :rtype: DataFrame + """ + spent_time = timeit.default_timer() + proxies_df = pd.read_csv(path, dtype=str) + + proxies_df.set_index('snap', inplace=True) + proxies_df = proxies_df.loc[proxies_df['CONS'] == '1'] + proxies_df.drop(columns=['activity', 'CONS', 'gnfr'], inplace=True) + + proxies_df.loc[proxies_df['spatial_proxy'] == 'population', 'proxy_name'] = 'population' + proxies_df.loc[proxies_df['spatial_proxy'] == 'land_use', 'proxy_name'] = \ + 'lu_' + proxies_df['land_use_code'].replace(' ', '_', regex=True) + proxies_df.loc[proxies_df['spatial_proxy'] == 'shapefile', 'proxy_name'] = \ + proxies_df['industry_code'].map(PROXY_NAMES) + + self.logger.write_time_log('SolventsSector', 'read_proxies', timeit.default_timer() - spent_time) + return proxies_df + + def check_profiles(self): + """ + Check that the profiles appear on the profile files. + + It will check the content of the proxies map. + Check that the 'P_month' content appears on the monthly profiles. + Check that the 'P_week' content appears on the weekly profiles. + Check that the 'P_hour' content appears on the hourly profiles. + Check that the 'P_spec' content appears on the speciation profiles. + + It will stop the execution if the requirements are not satisfied. + + :return: True when everything is OK. + :rtype: bool + """ + spent_time = timeit.default_timer() + # Checking monthly profiles IDs + links_month = set(np.unique(self.proxies_map['P_month'].dropna().values)) + month = set(self.monthly_profiles.index.values) + month_res = links_month - month + if len(month_res) > 0: + error_exit("The following monthly profile IDs reported in the solvent proxies CSV file do not appear " + + "in the monthly profiles file. 
{0}".format(month_res)) + # Checking weekly profiles IDs + links_week = set(np.unique(self.proxies_map['P_week'].dropna().values)) + week = set(self.weekly_profiles.index.values) + week_res = links_week - week + if len(week_res) > 0: + error_exit("The following weekly profile IDs reported in the solvent proxies CSV file do not appear " + + "in the weekly profiles file. {0}".format(week_res)) + # Checking hourly profiles IDs + links_hour = set(np.unique(self.proxies_map['P_hour'].dropna().values)) + hour = set(self.hourly_profiles.index.values) + hour_res = links_hour - hour + if len(hour_res) > 0: + error_exit("The following hourly profile IDs reported in the solvent proxies CSV file do not appear " + + "in the hourly profiles file. {0}".format(hour_res)) + # Checking speciation profiles IDs + links_spec = set(np.unique(self.proxies_map['P_spec'].dropna().values)) + spec = set(self.speciation_profile.index.values) + spec_res = links_spec - spec + if len(spec_res) > 0: + error_exit("The following speciation profile IDs reported in the solvent proxies CSV file do not appear " + + "in the speciation profiles file. {0}".format(spec_res)) + + self.logger.write_time_log('SolventsSector', 'check_profiles', timeit.default_timer() - spent_time) + return True + + def read_yearly_emissions(self, path, nut_list): + """ + Read the yearly emission by snap and nuts2. + + Select only the nuts2 IDs that appear in the selected domain. + + :param path: Path to the CSV file that contains the yearly emissions by snap and nuts2. 
+ :type path: str + + :param nut_list: List of nut codes + :type nut_list: list + + :return: Dataframe with the amount of NMVOC for each snap and nut2 + :rtype: DataFrame + """ + spent_time = timeit.default_timer() + + year_emis = pd.read_csv(path, dtype={'nuts2_id': int, 'snap': str, 'nmvoc': np.float64}) + year_emis = year_emis[year_emis['nuts2_id'].isin(nut_list)] + year_emis.set_index(['nuts2_id', 'snap'], inplace=True) + year_emis.drop(columns=['gnfr_description', 'gnfr', 'snap_description', 'nuts2_na'], inplace=True) + + self.logger.write_time_log('SolventsSector', 'read_yearly_emissions', timeit.default_timer() - spent_time) + return year_emis + + def get_population_by_nut2(self, path): + """ + Read the CSV file that contains the amount of population by nut2. + + :param path: Path to the CSV file that contains the amount of population by nut2. + :type path: str + + :return: Dataframe with the amount of population by nut2. + :rtype: DataFrame + """ + spent_time = timeit.default_timer() + + pop_by_nut2 = pd.read_csv(path) + pop_by_nut2.set_index('nuts2_id', inplace=True) + pop_by_nut2 = pop_by_nut2.to_dict()['pop'] + + self.logger.write_time_log('SolventsSector', 'get_pop_by_nut2', timeit.default_timer() - spent_time) + return pop_by_nut2 + + def get_point_sources_weights_by_nut2(self, path, proxy_name): + """ + Read the CSV file that contains the amount of weight by industry and nut2. + + :param path: Path to the CSV file that contains the amount of weight by industry and nut2. + :type path: str + + :param proxy_name: Proxy to calculate. + :type proxy_name: str + + :return: DataFrame with the amount of weight by industry and nut2. 
+ :rtype: DataFrame + """ + spent_time = timeit.default_timer() + + weights_by_nut2 = pd.read_csv(path) + weights_by_nut2['nuts2_id'] = weights_by_nut2['nuts2_id'].astype(int) + weights_by_nut2 = weights_by_nut2[weights_by_nut2['industry_c'] == proxy_name] + weights_by_nut2.drop(columns=['industry_c'], inplace=True) + weights_by_nut2.set_index("nuts2_id", inplace=True) + weights_by_nut2 = weights_by_nut2.to_dict()['weight'] + + self.logger.write_time_log('SolventsSector', 'get_point_sources_weights_by_nut2', + timeit.default_timer() - spent_time) + return weights_by_nut2 + + def get_land_use_by_nut2(self, path, land_uses, nut_codes): + """ + Read the CSV file that contains the amount of land use by nut2. + + :param path: Path to the CSV file that contains the amount of land use by nut2. + :type path: str + + :param land_uses: List of land uses to take into account. + :type land_uses: list + + :param nut_codes: List of nut2 codes to take into account. + :type nut_codes: list + + :return: DataFrame with the amount of land use by nut2. + :rtype: DataFrame + """ + spent_time = timeit.default_timer() + + land_use_by_nut2 = pd.read_csv(path) + land_use_by_nut2 = land_use_by_nut2[land_use_by_nut2['nuts2_id'].isin(nut_codes)] + land_use_by_nut2 = land_use_by_nut2[land_use_by_nut2['land_use'].isin(land_uses)] + land_use_by_nut2.set_index(['nuts2_id', 'land_use'], inplace=True) + + self.logger.write_time_log('SolventsSector', 'get_land_use_by_nut2', timeit.default_timer() - spent_time) + return land_use_by_nut2 + + def get_population_proxy(self, pop_raster_path, pop_by_nut2_path, nut2_shapefile_path): + """ + Calculate the distribution based on the amount of population. + + :param pop_raster_path: Path to the raster file that contains the population information. + :type pop_raster_path: str + + :param pop_by_nut2_path: Path to the CSV file that contains the amount of population by nut2. 
+ :type pop_by_nut2_path: str + + :param nut2_shapefile_path: Path to the shapefile that contains the nut2. + :type nut2_shapefile_path: str + + :return: GeoDataFrame with the population distribution by destiny cell. + :rtype: GeoDataFrame + """ + spent_time = timeit.default_timer() + + # 1st Clip the raster + self.logger.write_log("\t\tCreating clipped population raster", message_level=3) + if self.comm.Get_rank() == 0: + pop_raster_path = IoRaster(self.comm).clip_raster_with_shapefile_poly( + pop_raster_path, self.clip.shapefile, os.path.join(self.auxiliary_dir, 'solvents', 'pop.tif')) + + # 2nd Raster to shapefile + self.logger.write_log("\t\tRaster to shapefile", message_level=3) + pop_shp = IoRaster(self.comm).to_shapefile_parallel( + pop_raster_path, gather=False, bcast=False, crs={'init': 'epsg:4326'}) + + # 3rd Add NUT code + self.logger.write_log("\t\tAdding nut codes to the shapefile", message_level=3) + # if self.comm.Get_rank() == 0: + pop_shp.drop(columns='CELL_ID', inplace=True) + pop_shp.rename(columns={'data': 'population'}, inplace=True) + pop_shp = self.add_nut_code(pop_shp, nut2_shapefile_path, nut_value='nuts2_id') + pop_shp = pop_shp[pop_shp['nut_code'] != -999] + pop_shp = IoShapefile(self.comm).balance(pop_shp) + # pop_shp = IoShapefile(self.comm).split_shapefile(pop_shp) + + # 4th Calculate population percent + self.logger.write_log("\t\tCalculating population percentage on source resolution", message_level=3) + pop_by_nut2 = self.get_population_by_nut2(pop_by_nut2_path) + pop_shp['tot_pop'] = pop_shp['nut_code'].map(pop_by_nut2) + pop_shp['pop_percent'] = pop_shp['population'] / pop_shp['tot_pop'] + pop_shp.drop(columns=['tot_pop', 'population'], inplace=True) + + # 5th Calculate percent by destiny cell + self.logger.write_log("\t\tCalculating population percentage on destiny resolution", message_level=3) + pop_shp.to_crs(self.grid.shapefile.crs, inplace=True) + pop_shp['src_inter_fraction'] = pop_shp.geometry.area + pop_shp = 
self.spatial_overlays(pop_shp.reset_index(), self.grid.shapefile.reset_index()) + pop_shp.drop(columns=['idx1', 'idx2', 'index'], inplace=True) + pop_shp['src_inter_fraction'] = pop_shp.geometry.area / pop_shp['src_inter_fraction'] + pop_shp['pop_percent'] = pop_shp['pop_percent'] * pop_shp['src_inter_fraction'] + pop_shp.drop(columns=['src_inter_fraction'], inplace=True) + + popu_dist = pop_shp.groupby(['FID', 'nut_code']).sum() + popu_dist.rename(columns={'pop_percent': 'population'}, inplace=True) + + self.logger.write_time_log('SolventsSector', 'get_population_proxie', timeit.default_timer() - spent_time) + return popu_dist + + def get_land_use_proxy(self, land_use_raster, land_use_by_nut2_path, land_uses, nut2_shapefile_path): + """ + Calculate the distribution based on the amount of land use. + + :param land_use_raster: Path to the raster file that contains the land use information. + :type land_use_raster: str + + :param land_use_by_nut2_path: Path to the CSV file that contains the amount of land use by nut2. + :type land_use_by_nut2_path: str + + :param land_uses: List of land uses to take into account on the distribution. + :type land_uses: list + + :param nut2_shapefile_path: Path to the shapefile that contains the nut2. + :type nut2_shapefile_path: str + + :return: GeoDataFrame with the land use distribution for the selected land uses by destiny cell. 
+ :rtype: GeoDataFrame + """ + spent_time = timeit.default_timer() + # 1st Clip the raster + self.logger.write_log("\t\tCreating clipped land use raster", message_level=3) + lu_raster_path = os.path.join(self.auxiliary_dir, 'solvents', 'lu_{0}.tif'.format( + '_'.join([str(x) for x in land_uses]))) + + if self.comm.Get_rank() == 0: + if not os.path.exists(lu_raster_path): + lu_raster_path = IoRaster(self.comm).clip_raster_with_shapefile_poly( + land_use_raster, self.clip.shapefile, lu_raster_path, values=land_uses) + + # 2nd Raster to shapefile + self.logger.write_log("\t\tRaster to shapefile", message_level=3) + land_use_shp = IoRaster(self.comm).to_shapefile_parallel(lu_raster_path, gather=False, bcast=False) + + # 3rd Add NUT code + self.logger.write_log("\t\tAdding nut codes to the shapefile", message_level=3) + # if self.comm.Get_rank() == 0: + land_use_shp.drop(columns='CELL_ID', inplace=True) + land_use_shp.rename(columns={'data': 'land_use'}, inplace=True) + land_use_shp = self.add_nut_code(land_use_shp, nut2_shapefile_path, nut_value='nuts2_id') + land_use_shp = land_use_shp[land_use_shp['nut_code'] != -999] + land_use_shp = IoShapefile(self.comm).balance(land_use_shp) + # land_use_shp = IoShapefile(self.comm).split_shapefile(land_use_shp) + + # 4th Calculate land_use percent + self.logger.write_log("\t\tCalculating land use percentage on source resolution", message_level=3) + + land_use_shp['area'] = land_use_shp.geometry.area + land_use_by_nut2 = self.get_land_use_by_nut2( + land_use_by_nut2_path, land_uses, np.unique(land_use_shp['nut_code'])) + land_use_shp.drop(columns=['land_use'], inplace=True) + + land_use_shp['fraction'] = land_use_shp.apply( + lambda row: row['area'] / land_use_by_nut2.xs(row['nut_code'], level='nuts2_id').sum(), axis=1) + land_use_shp.drop(columns='area', inplace=True) + + # 5th Calculate percent by dest_cell + self.logger.write_log("\t\tCalculating land use percentage on destiny resolution", message_level=3) + + 
land_use_shp.to_crs(self.grid.shapefile.crs, inplace=True)
+ land_use_shp['src_inter_fraction'] = land_use_shp.geometry.area
+ land_use_shp = self.spatial_overlays(land_use_shp.reset_index(), self.grid.shapefile.reset_index())
+ land_use_shp.drop(columns=['idx1', 'idx2', 'index'], inplace=True)
+ land_use_shp['src_inter_fraction'] = land_use_shp.geometry.area / land_use_shp['src_inter_fraction']
+ land_use_shp['fraction'] = land_use_shp['fraction'] * land_use_shp['src_inter_fraction']
+ land_use_shp.drop(columns=['src_inter_fraction'], inplace=True)
+
+ land_use_dist = land_use_shp.groupby(['FID', 'nut_code']).sum()
+ land_use_dist.rename(columns={'fraction': 'lu_{0}'.format('_'.join([str(x) for x in land_uses]))}, inplace=True)
+
+ self.logger.write_time_log('SolventsSector', 'get_land_use_proxy', timeit.default_timer() - spent_time)
+ return land_use_dist
+
+ def get_point_shapefile_proxy(self, proxy_name, point_shapefile_path, point_sources_weight_by_nut2_path,
+ nut2_shapefile_path):
+ """
+ Calculate the distribution for the solvent sub sector in the destiny grid cell.
+
+ :param proxy_name: Name of the proxy to be calculated.
+ :type proxy_name: str
+
+ :param point_shapefile_path: Path to the shapefile that contains all the point sources and their weights.
+ :type point_shapefile_path: str
+
+ :param point_sources_weight_by_nut2_path: Path to the CSV file that contains the amount of weight by industry
+ and nut2.
+ :type point_sources_weight_by_nut2_path: str
+
+ :param nut2_shapefile_path: Path to the shapefile that contains the nut2.
+ :type nut2_shapefile_path: str
+
+ :return: GeoDataFrame with the distribution of the selected proxy on the destiny grid cells.
+ :rtype: GeoDataFrame
+ """
+ spent_time = timeit.default_timer()
+
+ point_shapefile = IoShapefile(self.comm).read_shapefile_parallel(point_shapefile_path)
+ point_shapefile.drop(columns=['Empresa', 'Empleados', 'Ingresos', 'Consumos', 'LON', 'LAT'], inplace=True)
+ point_shapefile = point_shapefile[point_shapefile['industry_c'] ==
+ [key for key, value in PROXY_NAMES.items() if value == proxy_name][0]]
+ point_shapefile = IoShapefile(self.comm).balance(point_shapefile)
+ point_shapefile.drop(columns=['industry_c'], inplace=True)
+ point_shapefile = self.add_nut_code(point_shapefile, nut2_shapefile_path, nut_value='nuts2_id')
+ point_shapefile = point_shapefile[point_shapefile['nut_code'] != -999]
+
+ point_shapefile = IoShapefile(self.comm).gather_shapefile(point_shapefile, rank=0)
+ if self.comm.Get_rank() == 0:
+ weight_by_nut2 = self.get_point_sources_weights_by_nut2(
+ point_sources_weight_by_nut2_path,
+ [key for key, value in PROXY_NAMES.items() if value == proxy_name][0])
+ point_shapefile[proxy_name] = point_shapefile.apply(
+ lambda row: row['weight'] / weight_by_nut2[row['nut_code']], axis=1)
+ point_shapefile.drop(columns=['weight'], inplace=True)
+ # print(point_shapefile.groupby('nut_code')['weight'].sum())
+
+ point_shapefile = IoShapefile(self.comm).split_shapefile(point_shapefile)
+ point_shapefile = gpd.sjoin(point_shapefile.to_crs(self.grid.shapefile.crs), self.grid.shapefile.reset_index())
+ point_shapefile.drop(columns=['geometry', 'index_right'], inplace=True)
+ point_shapefile = point_shapefile.groupby(['FID', 'nut_code']).sum()
+
+ self.logger.write_time_log('SolventsSector', 'get_point_shapefile_proxy', timeit.default_timer() - spent_time)
+ return point_shapefile
+
+ def get_proxy_shapefile(self, population_raster_path, population_nuts2_path, land_uses_raster_path,
+ land_uses_nuts2_path, nut2_shapefile_path, point_sources_shapefile_path,
+ point_sources_weight_by_nut2_path):
+ """
+ Calculate (or read) the proxy shapefile.
+
+ It will split the entire shapefile into as many processors as selected to split the calculation part.
+
+ :param population_raster_path: Path to the raster file that contains the population information.
+ :type population_raster_path: str
+
+ :param population_nuts2_path: Path to the CSV file that contains the amount of population by nut2.
+ :type population_nuts2_path: str
+
+ :param land_uses_raster_path: Path to the raster file that contains the land use information.
+ :type land_uses_raster_path: str
+
+ :param land_uses_nuts2_path: Path to the CSV file that contains the amount of land use by nut2.
+ :type land_uses_nuts2_path: str
+
+ :param nut2_shapefile_path: Path to the shapefile that contains the nut2.
+ :type nut2_shapefile_path: str
+
+ :param point_sources_shapefile_path: Path to the shapefile that contains all the point sources and their
+ weights.
+ :type point_sources_shapefile_path: str
+
+ :param point_sources_weight_by_nut2_path: Path to the CSV file that contains the amount of weight by industry
+ and nut2.
+ :type point_sources_weight_by_nut2_path: str + + :return: GeoDataFrame with all the proxies + :rtype: GeoDataFrame + """ + spent_time = timeit.default_timer() + + self.logger.write_log("Getting proxies shapefile", message_level=1) + proxy_names_list = np.unique(self.proxies_map['proxy_name']) + proxy_path = os.path.join(self.auxiliary_dir, 'solvents', 'proxy_distributions.shp') + if not os.path.exists(proxy_path): + proxy_list = [] + for proxy_name in proxy_names_list: + self.logger.write_log("\tGetting proxy for {0}".format(proxy_name), message_level=2) + if proxy_name == 'population': + proxy = self.get_population_proxy(population_raster_path, population_nuts2_path, + nut2_shapefile_path) + elif proxy_name[:3] == 'lu_': + land_uses = [int(x) for x in proxy_name[3:].split('_')] + + proxy = self.get_land_use_proxy(land_uses_raster_path, land_uses_nuts2_path, land_uses, + nut2_shapefile_path) + else: + proxy = self.get_point_shapefile_proxy(proxy_name, point_sources_shapefile_path, + point_sources_weight_by_nut2_path, nut2_shapefile_path) + proxy = IoShapefile(self.comm).gather_shapefile(proxy.reset_index()) + if self.comm.Get_rank() == 0: + proxy_list.append(proxy) + if self.comm.Get_rank() == 0: + proxies = pd.concat(proxy_list, sort=False) + proxies['FID'] = proxies['FID'].astype(int) + proxies['nut_code'] = proxies['nut_code'].astype(int) + proxies = proxies.groupby(['FID', 'nut_code']).sum() + proxies = GeoDataFrame(proxies) + # print(self.grid.shapefile.loc[proxies.index.get_level_values('FID'), 'geometry'].values) + # exit() + proxies = GeoDataFrame( + proxies, geometry=self.grid.shapefile.loc[proxies.index.get_level_values('FID'), 'geometry'].values, + crs=self.grid.shapefile.crs) + IoShapefile(self.comm).write_shapefile_serial(proxies.reset_index(), proxy_path) + else: + proxies = None + else: + if self.comm.Get_rank() == 0: + proxies = IoShapefile(self.comm).read_shapefile_serial(proxy_path) + proxies.set_index(['FID', 'nut_code'], inplace=True) + else: 
+ proxies = None
+ proxies = IoShapefile(self.comm).split_shapefile(proxies)
+
+ self.logger.write_time_log('SolventsSector', 'get_proxy_shapefile', timeit.default_timer() - spent_time)
+ return proxies
+
+ def calculate_hourly_emissions(self, yearly_emissions):
+ """
+ Disaggregate to hourly level the yearly emissions.
+
+ :param yearly_emissions: GeoDataFrame with the yearly emissions by destiny cell ID and snap code.
+ :type yearly_emissions: GeoDataFrame
+
+ :return: GeoDataFrame with the hourly distribution by FID, snap code and time step.
+ :rtype: GeoDataFrame
+ """
+ def get_mf(df):
+ month_factor = self.monthly_profiles.loc[df.name[1], df.name[0]]
+
+ df['MF'] = month_factor
+ return df.loc[:, ['MF']]
+
+ def get_wf(df):
+ weekly_profile = self.calculate_rebalanced_weekly_profile(self.weekly_profiles.loc[df.name[1], :].to_dict(),
+ df.name[0])
+ df['WF'] = weekly_profile[df.name[0].weekday()]
+ return df.loc[:, ['WF']]
+
+ def get_hf(df):
+ hourly_profile = self.hourly_profiles.loc[df.name[1], :].to_dict()
+ hour_factor = hourly_profile[df.name[0]]
+
+ df['HF'] = hour_factor
+ return df.loc[:, ['HF']]
+
+ spent_time = timeit.default_timer()
+
+ self.logger.write_log('\tHourly disaggregation', message_level=2)
+ emissions = self.add_dates(yearly_emissions, drop_utc=True)
+
+ emissions['month'] = emissions['date'].dt.month
+ emissions['weekday'] = emissions['date'].dt.weekday
+ emissions['hour'] = emissions['date'].dt.hour
+ emissions['date_as_date'] = emissions['date'].dt.date
+
+ emissions['MF'] = emissions.groupby(['month', 'P_month']).apply(get_mf)
+ emissions['WF'] = emissions.groupby(['date_as_date', 'P_week']).apply(get_wf)
+ emissions['HF'] = emissions.groupby(['hour', 'P_hour']).apply(get_hf)
+
+ emissions['temp_factor'] = emissions['MF'] * emissions['WF'] * emissions['HF']
+ emissions.drop(columns=['MF', 'P_month', 'month', 'WF', 'P_week', 'weekday', 'HF', 'P_hour', 'hour', 'date',
+ 'date_as_date'], inplace=True)
+ emissions['nmvoc'] =
emissions['nmvoc'] * emissions['temp_factor']
+ emissions.drop(columns=['temp_factor'], inplace=True)
+ emissions.set_index(['FID', 'snap', 'tstep'], inplace=True)
+
+ self.logger.write_time_log('SolventsSector', 'calculate_hourly_emissions', timeit.default_timer() - spent_time)
+ return emissions
+
+ def distribute_yearly_emissions(self):
+ """
+ Calculate the yearly emission by destiny grid cell and snap code.
+
+ :return: GeoDataFrame with the yearly emissions by snap code.
+ :rtype: GeoDataFrame
+ """
+ spent_time = timeit.default_timer()
+ self.logger.write_log('\t\tYearly distribution', message_level=2)
+
+ yearly_emis = self.read_yearly_emissions(
+ self.yearly_emissions_path, np.unique(self.proxy.index.get_level_values('nut_code')))
+ year_nuts = np.unique(yearly_emis.index.get_level_values('nuts2_id'))
+ proxy_nuts = np.unique(self.proxy.index.get_level_values('nut_code'))
+ unknow_nuts = list(set(proxy_nuts) - set(year_nuts))
+ if len(unknow_nuts) > 0:
+ warn("*WARNING* The {0} nuts2_id have no emissions in the solvents_yearly_emissions_by_nut2_path.".format(
+ str(unknow_nuts)))
+ self.proxy.drop(unknow_nuts, level='nut_code', inplace=True)
+ emis_list = []
+ for snap, snap_df in self.proxies_map.iterrows():
+ emis = self.proxy.reset_index()
+ emis['snap'] = snap
+ emis['P_month'] = snap_df['P_month']
+ emis['P_week'] = snap_df['P_week']
+ emis['P_hour'] = snap_df['P_hour']
+ emis['P_spec'] = snap_df['P_spec']
+
+ emis['nmvoc'] = emis.apply(lambda row: yearly_emis.loc[(row['nut_code'], snap), 'nmvoc'] * row[
+ self.proxies_map.loc[snap, 'proxy_name']], axis=1)
+
+ emis.set_index(['FID', 'snap'], inplace=True)
+ emis_list.append(emis[['P_month', 'P_week', 'P_hour', 'P_spec', 'nmvoc', 'geometry']])
+ emis = pd.concat(emis_list).sort_index()
+ emis = emis[emis['nmvoc'] > 0]
+
+ self.logger.write_time_log('SolventsSector', 'distribute_yearly_emissions', timeit.default_timer() - spent_time)
+ return emis
+
+ def speciate(self, dataframe, code='default'):
+
"""
+ Speciate the NMVOC pollutant into as many pollutants as the speciation map indicates.
+
+ :param dataframe: Emissions to be speciated.
+ :type dataframe: DataFrame
+
+ :param code: Not used.
+
+ :return: Speciated emissions.
+ :rtype: DataFrame
+ """
+
+ def calculate_new_pollutant(x, out_p):
+ sys.stdout.flush()
+ profile = self.speciation_profile.loc[x.name, ['VOCtoTOG', out_p]]
+ x[out_p] = x['nmvoc'] * (profile['VOCtoTOG'] * profile[out_p])
+ return x[[out_p]]
+
+ spent_time = timeit.default_timer()
+ self.logger.write_log('\tSpeciation emissions', message_level=2)
+
+ new_dataframe = gpd.GeoDataFrame(index=dataframe.index, data=None, crs=dataframe.crs,
+ geometry=dataframe.geometry)
+ for out_pollutant in self.output_pollutants:
+ self.logger.write_log('\t\tSpeciating {0}'.format(out_pollutant), message_level=3)
+ new_dataframe[out_pollutant] = dataframe.groupby('P_spec').apply(
+ lambda x: calculate_new_pollutant(x, out_pollutant))
+ new_dataframe.reset_index(inplace=True)
+
+ new_dataframe.drop(columns=['snap', 'geometry'], inplace=True)
+ new_dataframe.set_index(['FID', 'tstep'], inplace=True)
+
+ self.logger.write_time_log('SolventsSector', 'speciate', timeit.default_timer() - spent_time)
+ return new_dataframe
+
+ def calculate_emissions(self):
+ """
+ Main function to calculate the emissions.
+
+ :return: Solvent emissions.
+ :rtype: DataFrame
+ """
+ spent_time = timeit.default_timer()
+ self.logger.write_log('\tCalculating emissions')
+
+ emissions = self.distribute_yearly_emissions()
+ emissions = self.calculate_hourly_emissions(emissions)
+ emissions = self.speciate(emissions)
+
+ emissions.reset_index(inplace=True)
+ emissions['layer'] = 0
+ emissions = emissions.groupby(['FID', 'layer', 'tstep']).sum()
+
+ self.logger.write_time_log('SolventsSector', 'calculate_emissions', timeit.default_timer() - spent_time)
+ return emissions