diff --git a/conf/hermes.conf b/conf/hermes.conf index f199d0100de76d3f847c74316389888489bd7098..d68ac22feae721646b285677a7fb4022c2310a51 100755 --- a/conf/hermes.conf +++ b/conf/hermes.conf @@ -2,14 +2,14 @@ log_level = 3 input_dir = /home/Earth/ctena/Models/hermesv3_bu_data data_path = /esarchive/recon -output_dir = /scratch/Earth/HERMESv3_BU_OUT +output_dir = /scratch/Earth/HERMESv3_BU_OUT/ output_name = HERMESv3_.nc emission_summary = 0 start_date = 2016/11/29 00:00:00 # ----- end_date = start_date [DEFAULT] ----- # end_date = 2010/01/01 00:00:00 output_timestep_num = 24 -auxiliary_files_path = /scratch/Earth/HERMESv3_BU_aux/test/_ +auxiliary_files_path = /scratch/Earth/HERMESv3_BU_aux/aux_parallel/_ erase_auxiliary_files = 0 @@ -55,10 +55,10 @@ vertical_description = /profiles/vertical/MONARCH_Global_48layers_ver #y_0 = 43862.90625 # CATALUNYA test - #nx = 28 - #ny = 30 - nx = 4 - ny = 4 + nx = 28 + ny = 30 + #nx = 4 + #ny = 4 inc_x = 10000 inc_y = 10000 x_0 = 253151.59375 @@ -131,19 +131,7 @@ writing_processors = 1 # traffic_processors = 256 # traffic_area_processors = 1 -aviation_processors = 0 -shipping_port_processors = 0 -livestock_processors = 0 -crop_operations_processors = 0 -crop_fertilizers_processors = 0 -agricultural_machinery_processors = 0 -residential_processors = 0 -recreational_boats_processors = 0 -point_sources_processors = 0 -traffic_processors = 0 -traffic_area_processors = 0 -solvents_processors = 1 - +traffic_processors = 1 [SHAPEFILES] nuts3_shapefile = /shapefiles/nuts3/nuts3.shp @@ -338,7 +326,7 @@ traffic_speciation_profile_resuspension = /profiles/speciation/traffi traffic_area_pollutants = nox_no2,nmvoc,so2,co,nh3,pm10,pm25 do_evaporative = 1 traffic_area_gas_path = /traffic_area/gasoline_vehicles_provinces_2015.csv -population_nuts3 = /traffic_area/population_by_mun.csv +population_nuts3 = /traffic_area/population_nuts3.csv traffic_area_speciation_profiles_evaporative = /profiles/speciation/traffic_area/evaporative_base.csv traffic_area_evaporative_ef_file = /traffic_area/ef/evaporative_nmvoc.csv do_small_cities = 1 diff --git a/hermesv3_bu/hermes.py b/hermesv3_bu/hermes.py index b8841bd3d4f181a3eb6b4112e941f6a4ed21ac10..9101d5f201983e882af2438a39a395e78b35cd13 100755 --- a/hermesv3_bu/hermes.py +++ b/hermesv3_bu/hermes.py @@ -57,7 +57,7 @@ class Hermes(object): self.writer.write(emis) self.comm.Barrier() - self.logger.write_log('***** HERMES simulation finished succesful *****') + self.logger.write_log('***** HERMESv3_BU simulation finished successfully *****') self.logger.write_time_log('Hermes', 'TOTAL', timeit.default_timer() - self.initial_time) self.logger.finish_logs() diff --git a/hermesv3_bu/io_server/io_raster.py b/hermesv3_bu/io_server/io_raster.py index b331cdf6ffe43e2ec937a7f3e9baddfdf0c69984..de052025f2c5d77c38df32da44083c165f5900ae 100755 --- a/hermesv3_bu/io_server/io_raster.py +++ b/hermesv3_bu/io_server/io_raster.py @@ -349,14 +349,10 @@ class IoRaster(IoServer): b_lons = b_lons.reshape((b_lons.shape[1], b_lons.shape[2])) gdf = gpd.GeoDataFrame(ds.read(1).flatten(), columns=['data'], index=range(b_lons.shape[0]), crs=ds.crs) + # Error on to_crs function of geopandas that flip lat with lon in the non dict form + if gdf.crs == 'EPSG:4326': + gdf.crs = {'init': 'epsg:4326'} gdf['geometry'] = None - # nodata_i = gdf['data'] != nodata - # print(nodata_i) - # print(b_lats) - # gdf = gdf[nodata_i] - # b_lons = b_lons[nodata_i, :] - # b_lats = b_lats[nodata_i, :] - # print(gdf) else: gdf = None b_lons = None @@ -378,6 +374,7 @@ class IoRaster(IoServer): gdf['CELL_ID'] = gdf.index gdf = gdf[gdf['data'] != nodata] + if crs is not None: gdf = gdf.to_crs(crs) diff --git a/hermesv3_bu/sectors/agricultural_crop_fertilizers_sector.py b/hermesv3_bu/sectors/agricultural_crop_fertilizers_sector.py index 27bb100c6aeea020de8c5fd4e817077c2695dd2a..db9d4ec1a947a3c057cbba5a7a42e2489493959e 100755 --- a/hermesv3_bu/sectors/agricultural_crop_fertilizers_sector.py +++ b/hermesv3_bu/sectors/agricultural_crop_fertilizers_sector.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +import sys import os import timeit import pandas as pd @@ -11,6 +12,7 @@ from hermesv3_bu.io_server.io_shapefile import IoShapefile from hermesv3_bu.io_server.io_netcdf import IoNetcdf from hermesv3_bu.logger.log import Log from hermesv3_bu.tools.checker import check_files +from geopandas import GeoDataFrame FORMULA = True @@ -42,27 +44,8 @@ class AgriculturalCropFertilizersSector(AgriculturalSector): self.crop_f_parameter = self.read_profiles(crop_f_parameter) self.crop_f_parameter.rename(columns={'nuts2_id': 'code'}, inplace=True) self.crop_f_fertilizers = self.read_profiles(crop_f_fertilizers) - - if self.comm.Get_rank() == 0: - self.logger.write_log('Getting gridded constants', message_level=2) - self.gridded_constants = self.get_gridded_constants( - os.path.join(auxiliary_dir, 'fertilizers', 'gridded_constants.shp'), - gridded_ph, - os.path.join(auxiliary_dir, 'fertilizers', 'gridded_ph.tif'), - gridded_cec, - os.path.join(auxiliary_dir, 'fertilizers', 'gridded_cec.tif')) - self.ef_by_crop = self.get_ef_by_crop() - else: - self.logger.write_log('Waiting for master to get the gridded constants', message_level=2) - self.gridded_constants = None - self.ef_by_crop = None - - self.gridded_constants = self.comm.bcast(self.gridded_constants, root=0) - # self.gridded_constants = IoShapefile(self.comm).split_shapefile(self.gridded_constants) - self.gridded_constants = self.gridded_constants.loc[self.crop_distribution.index, :] - self.ef_by_crop = self.comm.bcast(self.ef_by_crop, root=0) - # self.ef_by_crop = IoShapefile(self.comm).split_shapefile(self.ef_by_crop) - self.ef_by_crop = self.ef_by_crop.loc[self.crop_distribution.index, :] + self.gridded_constants = self.get_gridded_constants(gridded_ph, gridded_cec) + self.ef_by_crop = self.get_ef_by_crop() self.fertilizer_denominator_yearly_factor_path = fertilizer_denominator_yearly_factor_path self.crop_calendar = self.read_profiles(crop_calendar) @@ -140,9 +123,14 @@ class AgriculturalCropFertilizersSector(AgriculturalSector): intersection = self.spatial_overlays(src_shapefile.to_crs(self.grid.shapefile.crs).reset_index(), self.grid.shapefile.reset_index()) + + # intersection = IoShapefile(self.comm).balance(intersection) + intersection['area'] = intersection.geometry.area dst_shapefile = self.grid.shapefile.reset_index().copy() + # dst_shapefile = self.grid.shapefile.loc[np.unique(intersection['FID'])].copy() dst_shapefile['involved_area'] = intersection.groupby('FID')['area'].sum() + # dst_shapefile.reset_index(inplace=True) intersection_with_dst_areas = pd.merge(intersection, dst_shapefile.loc[:, ['FID', 'involved_area']], how='left', on='FID') intersection_with_dst_areas['involved_area'] = \ @@ -150,51 +138,47 @@ class AgriculturalCropFertilizersSector(AgriculturalSector): intersection_with_dst_areas[value] = \ intersection_with_dst_areas[value] * intersection_with_dst_areas['involved_area'] + dst_shapefile.set_index('FID', inplace=True) dst_shapefile[value] = intersection_with_dst_areas.groupby('FID')[value].sum() - dst_shapefile.drop('involved_area', axis=1, inplace=True) + # dst_shapefile.drop('involved_area', axis=1, inplace=True) + dst_shapefile.dropna(inplace=True) + + dst_shapefile = IoShapefile(self.comm).gather_shapefile(dst_shapefile.reset_index()) + if self.comm.Get_rank() == 0: + # dst_shapefile['FID_involved_area'] = dst_shapefile.groupby('FID')['involved_area'].sum() + # dst_shapefile['involved_area'] = dst_shapefile['involved_area'] / dst_shapefile['FID_involved_area'] + # dst_shapefile[value] = dst_shapefile[value] * dst_shapefile['involved_area'] + # dst_shapefile[value] = dst_shapefile[value].astype(np.float64) + # dst_shapefile.drop(columns=['involved_area', 'FID_involved_area'], inplace=True) + # dst_shapefile = dst_shapefile.groupby(['FID'])[value].sum() + dst_shapefile = dst_shapefile.groupby(['FID'])[value].mean() + else: + dst_shapefile = None + dst_shapefile = IoShapefile(self.comm).split_shapefile(dst_shapefile) + # print('Rank {0} -Z {1}: \n{2}\n'.format(self.comm.Get_rank(), value, dst_shapefile)) + # sys.stdout.flush() self.logger.write_time_log('AgriculturalCropFertilizersSector', 'to_dst_resolution', timeit.default_timer() - spent_time) - dst_shapefile.set_index('FID', inplace=True) - return dst_shapefile - def to_dst_resolution_parallel(self, src_shapefile, index, value): + def get_gridded_constants(self, ph_path, cec_path): spent_time = timeit.default_timer() - grid_shp = self.grid.shapefile.loc[index, :].copy() - src_shapefile = self.comm.bcast(src_shapefile, root=0) - src_shapefile = src_shapefile.to_crs(grid_shp.crs) - src_shapefile = src_shapefile[src_shapefile.within(grid_shp.unary_union)] - - intersection = self.spatial_overlays(src_shapefile, grid_shp) - intersection['area'] = intersection.geometry.area - dst_shapefile = grid_shp.copy() - dst_shapefile['involved_area'] = intersection.groupby('FID')['area'].sum() - intersection_with_dst_areas = pd.merge(intersection, dst_shapefile.loc[:, ['FID', 'involved_area']], - how='left', on='FID') - intersection_with_dst_areas['involved_area'] = \ - intersection_with_dst_areas['area'] / intersection_with_dst_areas['involved_area'] - - intersection_with_dst_areas[value] = \ - intersection_with_dst_areas[value] * intersection_with_dst_areas['involved_area'] - dst_shapefile[value] = intersection_with_dst_areas.groupby('FID')[value].sum() - dst_shapefile.drop('involved_area', axis=1, inplace=True) - self.logger.write_time_log('AgriculturalCropFertilizersSector', 'to_dst_resolution_parallel', - timeit.default_timer() - spent_time) - dst_shapefile.set_index('FID', inplace=True) - - return dst_shapefile + self.logger.write_log('Getting gridded constants', message_level=2) - def get_gridded_constants(self, gridded_ph_cec_path, ph_path, clipped_ph_path, cec_path, clipped_cec_path): - spent_time = timeit.default_timer() + gridded_ph_cec_path = os.path.join(self.auxiliary_dir, 'agriculture', 'fertilizers', 'gridded_constants') if not os.path.exists(gridded_ph_cec_path): self.logger.write_log('Getting PH from {0}'.format(ph_path), message_level=2) - IoRaster(self.comm).clip_raster_with_shapefile_poly(ph_path, self.clip.shapefile, clipped_ph_path, - nodata=255) + clipped_ph_path = os.path.join(self.auxiliary_dir, 'agriculture', 'fertilizers', 'gridded_PH.tiff') + if self.comm.Get_rank() == 0: + IoRaster(self.comm).clip_raster_with_shapefile_poly(ph_path, self.clip.shapefile, clipped_ph_path, + nodata=255) self.logger.write_log('PH clipped done!', message_level=3) - ph_gridded = IoRaster(self.comm).to_shapefile_serie(clipped_ph_path, nodata=255) + ph_gridded = IoRaster(self.comm).to_shapefile_parallel(clipped_ph_path, nodata=255) self.logger.write_log('PH to shapefile done!', message_level=3) + ph_gridded.set_index('CELL_ID', inplace=True) ph_gridded.rename(columns={'data': 'ph'}, inplace=True) + ph_gridded = IoShapefile(self.comm).balance(ph_gridded) # To correct input data ph_gridded['ph'] = ph_gridded['ph'] / 10 self.logger.write_log('PH to destiny resolution ...', message_level=3) @@ -202,85 +186,58 @@ class AgriculturalCropFertilizersSector(AgriculturalSector): self.logger.write_log('PH to destiny resolution done!', message_level=3) self.logger.write_log('Getting CEC from {0}'.format(cec_path), message_level=2) - IoRaster(self.comm).clip_raster_with_shapefile_poly(cec_path, self.clip.shapefile, clipped_cec_path, - nodata=-32768) + clipped_cec_path = os.path.join(self.auxiliary_dir, 'agriculture', 'fertilizers', 'gridded_CEC.tiff') + if self.comm.Get_rank() == 0: + IoRaster(self.comm).clip_raster_with_shapefile_poly(cec_path, self.clip.shapefile, clipped_cec_path, + nodata=-32768) self.logger.write_log('CEC clipped done!', message_level=3) - cec_gridded = IoRaster(self.comm).to_shapefile_serie(clipped_cec_path, nodata=-32768) + cec_gridded = IoRaster(self.comm).to_shapefile_parallel(clipped_cec_path, nodata=-32768) self.logger.write_log('CEC to shapefile done!', message_level=3) cec_gridded.rename(columns={'data': 'cec'}, inplace=True) + cec_gridded.set_index('CELL_ID', inplace=True) + cec_gridded = IoShapefile(self.comm).balance(cec_gridded) self.logger.write_log('CEC to destiny resolution ...', message_level=3) - cec_gridded = self.to_dst_resolution(cec_gridded, value='cec') + cec_gridded = self.to_dst_resolution(cec_gridded.reset_index(), value='cec') self.logger.write_log('CEC to destiny resolution done!', message_level=3) - gridded_ph_cec = ph_gridded - gridded_ph_cec['cec'] = cec_gridded['cec'] - gridded_ph_cec.dropna(inplace=True) + ph_gridded = IoShapefile(self.comm).gather_shapefile(ph_gridded.reset_index()) + cec_gridded = IoShapefile(self.comm).gather_shapefile(cec_gridded.reset_index()) + if self.comm.Get_rank() == 0: + gridded_ph_cec = ph_gridded + # gridded_ph_cec = ph_gridded.groupby('FID').mean() + # cec_gridded = cec_gridded.groupby('FID').mean() + # gridded_ph_cec = ph_gridded + gridded_ph_cec['cec'] = cec_gridded['cec'] + gridded_ph_cec.set_index('FID', inplace=True) + # gridded_ph_cec = gridded_ph_cec.loc[(gridded_ph_cec['ph'] > 0) & (gridded_ph_cec['cec'] > 0)] + gridded_ph_cec = GeoDataFrame( + gridded_ph_cec, + geometry=self.grid.shapefile.loc[gridded_ph_cec.index.get_level_values('FID'), 'geometry'].values, + crs=self.grid.shapefile.crs) + else: + gridded_ph_cec = None + gridded_ph_cec = IoShapefile(self.comm).split_shapefile(gridded_ph_cec) + # print('Rank {0} -Z PH: \n{1}\n'.format(self.comm.Get_rank(), np.unique(gridded_ph_cec['ph']))) + # print('Rank {0} -Z CEC: \n{1}\n'.format(self.comm.Get_rank(), np.unique(gridded_ph_cec['cec']))) + # print('Rank {0} -Z FID: \n{1}\n'.format(self.comm.Get_rank(), np.unique(gridded_ph_cec.index))) + # sys.stdout.flush() + # exit() + gridded_ph_cec = self.add_nut_code(gridded_ph_cec.reset_index(), self.nut_shapefile) + gridded_ph_cec = gridded_ph_cec[gridded_ph_cec['nut_code'] != -999] + gridded_ph_cec.set_index('FID', inplace=True) - gridded_ph_cec = self.add_nut_code(gridded_ph_cec, self.nut_shapefile) - gridded_ph_cec.index.name = 'FID' - # gridded_ph_cec.set_index('FID', inplace=True) + IoShapefile(self.comm).write_shapefile_parallel(gridded_ph_cec.reset_index(), gridded_ph_cec_path) - # # Selecting only PH and CEC cells that have also some crop. - # gridded_ph_cec = gridded_ph_cec.loc[self.crop_distribution.index, :] - IoShapefile(self.comm).write_shapefile_serial(gridded_ph_cec.reset_index(), gridded_ph_cec_path) + gridded_ph_cec = IoShapefile(self.comm).gather_bcast_shapefile(gridded_ph_cec) else: gridded_ph_cec = IoShapefile(self.comm).read_shapefile_serial(gridded_ph_cec_path) gridded_ph_cec.set_index('FID', inplace=True) - self.logger.write_time_log('AgriculturalCropFertilizersSector', 'get_gridded_constants', - timeit.default_timer() - spent_time) - return gridded_ph_cec - - def get_gridded_constants_parallel(self, gridded_ph_cec_path, ph_path, clipped_ph_path, cec_path, clipped_cec_path, - index): - spent_time = timeit.default_timer() - if not os.path.exists(gridded_ph_cec_path): - if self.comm.Get_rank() == 0: - self.logger.write_log('Getting PH from {0}'.format(ph_path), message_level=2) - IoRaster(self.comm).clip_raster_with_shapefile_poly(ph_path, self.clip.shapefile, clipped_ph_path, - nodata=255) - self.logger.write_log('PH clipped done!', message_level=3) - ph_gridded = IoRaster(self.comm).to_shapefile_serie(clipped_ph_path, nodata=255) - self.logger.write_log('PH to shapefile done!', message_level=3) - ph_gridded.rename(columns={'data': 'ph'}, inplace=True) - # To correct input data - ph_gridded['ph'] = ph_gridded['ph'] / 10 - else: - ph_gridded = None - self.logger.write_log('PH to destiny resolution ...', message_level=3) - ph_gridded = self.to_dst_resolution_parallel(ph_gridded, index, value='ph') - self.logger.write_log('PH to destiny resolution done!', message_level=3) - if self.comm.Get_rank() == 0: - self.logger.write_log('Getting CEC from {0}'.format(cec_path), message_level=2) - IoRaster(self.comm).clip_raster_with_shapefile_poly(cec_path, self.clip.shapefile, clipped_cec_path, - nodata=-32768) - self.logger.write_log('CEC clipped done!', message_level=3) - cec_gridded = IoRaster(self.comm).to_shapefile_serie(clipped_cec_path, nodata=-32768) - self.logger.write_log('CEC to shapefile done!', message_level=3) - cec_gridded.rename(columns={'data': 'cec'}, inplace=True) - else: - cec_gridded = None - - self.logger.write_log('CEC to destiny resolution ...', message_level=3) - cec_gridded = self.to_dst_resolution_parallel(cec_gridded, index, value='cec') - self.logger.write_log('CEC to destiny resolution done!', message_level=3) - - gridded_ph_cec = ph_gridded - gridded_ph_cec['cec'] = cec_gridded['cec'] - - gridded_ph_cec.dropna(inplace=True) - - gridded_ph_cec = self.add_nut_code(gridded_ph_cec, self.nut_shapefile) - gridded_ph_cec.index.name = 'FID' - # gridded_ph_cec.set_index('FID', inplace=True) + # Selecting only PH and CEC cells that have also some crop. + gridded_ph_cec = gridded_ph_cec.loc[self.crop_distribution.index, :] + # gridded_ph_cec = gridded_ph_cec.loc[(gridded_ph_cec['ph'] > 0) & (gridded_ph_cec['cec'] > 0)] - # # Selecting only PH and CEC cells that have also some crop. - # gridded_ph_cec = gridded_ph_cec.loc[self.crop_distribution.index, :] - IoShapefile(self.comm).write_shapefile_parallel(gridded_ph_cec.reset_index(), gridded_ph_cec_path) - else: - gridded_ph_cec = IoShapefile(self.comm).read_shapefile_parallel(gridded_ph_cec_path) - gridded_ph_cec.set_index('FID', inplace=True) - self.logger.write_time_log('AgriculturalCropFertilizersSector', 'get_gridded_constants_parallel', + self.logger.write_time_log('AgriculturalCropFertilizersSector', 'get_gridded_constants', timeit.default_timer() - spent_time) return gridded_ph_cec diff --git a/hermesv3_bu/sectors/agricultural_machinery_sector.py b/hermesv3_bu/sectors/agricultural_machinery_sector.py index c82b9ed48ebfbd06bb7f1e626c3ca5e3933f57e6..6d6245824bf27ab0b91307f1821dcf57d88560df 100755 --- a/hermesv3_bu/sectors/agricultural_machinery_sector.py +++ b/hermesv3_bu/sectors/agricultural_machinery_sector.py @@ -67,7 +67,7 @@ class AgriculturalMachinerySector(AgriculturalSector): crop_distribution.reset_index(inplace=True) - crop_distribution_nut_path = os.path.join(self.auxiliary_dir, 'crops', 'crops_nut.shp') + crop_distribution_nut_path = os.path.join(self.auxiliary_dir, 'agriculture', 'crops', 'crops_nuts3') if not os.path.exists(crop_distribution_nut_path): nut_shapefile = gpd.read_file(nut_shapefile) if nut_code is not None: diff --git a/hermesv3_bu/sectors/agricultural_sector.py b/hermesv3_bu/sectors/agricultural_sector.py index 31118ee158c8b3f386f6ef671e0e7ae81131a6b9..07a10e91fc8ed8076354c0d075f915edbb92f938 100755 --- a/hermesv3_bu/sectors/agricultural_sector.py +++ b/hermesv3_bu/sectors/agricultural_sector.py @@ -12,10 +12,17 @@ from mpi4py import MPI from hermesv3_bu.sectors.sector import Sector from hermesv3_bu.io_server.io_shapefile import IoShapefile from hermesv3_bu.io_server.io_raster import IoRaster +from hermesv3_bu.tools.checker import error_exit from hermesv3_bu.logger.log import Log +from hermesv3_bu.grids.grid import Grid from geopandas import GeoDataFrame from pandas import DataFrame +from ctypes import cdll, CDLL +cdll.LoadLibrary("libc.so.6") +libc = CDLL("libc.so.6") +libc.malloc_trim(0) + class AgriculturalSector(Sector): def __init__(self, comm_agr, comm, logger, auxiliary_dir, grid, clip, date_array, nut_shapefile, @@ -38,8 +45,8 @@ class AgriculturalSector(Sector): created yet. :type auxiliary_dir: str - :param grid_shp: Shapefile with the grid horizontal distribution. - :type grid_shp: GeoDataFrame + :param grid: Grid object + :type grid: Grid :param date_array: List of datetimes. :type date_array: list(datetime.datetime, ...) @@ -122,8 +129,10 @@ class AgriculturalSector(Sector): self.land_use_by_nut = land_use_by_nut self.crop_by_nut = crop_by_nut self.crop_from_landuse = self.get_crop_from_land_uses(crop_from_landuse_path) + self.crop_distribution = self.get_crops_by_dst_cell( os.path.join(auxiliary_dir, 'agriculture', 'crops', 'crops.shp')) + self.logger.write_time_log('AgriculturalSector', '__init__', timeit.default_timer() - spent_time) def involved_grid_cells(self, src_shp): @@ -168,6 +177,16 @@ class AgriculturalSector(Sector): """ Get the involved land uses and their weight for each crop. + Result: + { : [(int(, float()), (int(, float()), ...] + 'alfalfa': [(12, 1.0), (13, 0.3)], + 'almond': [(16, 1.0), (20, 0.3)], + 'apple': [(16, 1.0), (20, 0.3)], + 'apricot': [(16, 1.0), (20, 0.3)], + 'barley': [(12, 1.0)], + ... + } + :param crop_from_land_use_path: Path to the file that contains the crops and their involved land uses with the weights. :type crop_from_land_use_path: str @@ -210,40 +229,50 @@ class AgriculturalSector(Sector): return land_uses_list - def get_land_use_src_by_nut(self, land_uses): + def get_land_use_src_by_nut(self, land_uses, write=False): """ Create a shapefile with the involved source cells from the input raster and only for the given land uses. :param land_uses: List of land uses to use. :type land_uses: list - :return: Shapefile with the land use and NUT code of each source cell. Index: CELL_ID + :param write: Boolean that indicates if you want to write the land use shapefile in the source resolution. + :type write: bool + + :return: Shapefile with the land use and nut_code of each source cell. Index: CELL_ID :rtype: GeoDataFrame """ spent_time = timeit.default_timer() - land_use_src_by_nut_path = os.path.join(self.auxiliary_dir, 'agriculture', 'land_uses', 'land_uses_src.shp') + + land_use_src_by_nut_path = os.path.join(self.auxiliary_dir, 'agriculture', 'land_uses', 'land_use_src_nut') if not os.path.exists(land_use_src_by_nut_path): - land_uses_clipped = IoRaster(self.comm_agr).clip_raster_with_shapefile_poly( - self.land_uses_path, self.clip.shapefile, - os.path.join(self.auxiliary_dir, 'agriculture', 'land_uses', 'land_uses_clip.tif'), values=land_uses) - - land_uses_shp = IoRaster(self.comm_agr).to_shapefile_serie(land_uses_clipped) - - ccaa_shp = IoShapefile(self.comm_agr).read_shapefile_serial(self.nut_shapefile).to_crs(land_uses_shp.crs) - ccaa_shp.drop(columns=['nuts2_na'], inplace=True) - ccaa_shp.rename(columns={'nuts2_id': 'NUT'}, inplace=True) - # ccaa_shp.set_index('NUT', inplace=True) - land_use_src_by_nut = self.spatial_overlays(land_uses_shp.reset_index(), ccaa_shp, how='intersection') - land_use_src_by_nut.drop(columns=['idx1', 'idx2', 'CELL_ID'], inplace=True) + land_uses_clipped = os.path.join(self.auxiliary_dir, 'agriculture', 'land_uses', 'land_uses_clip.tif') + if self.comm_agr.Get_rank() == 0: + land_uses_clipped = IoRaster(self.comm_agr).clip_raster_with_shapefile_poly( + self.land_uses_path, self.clip.shapefile, land_uses_clipped, values=land_uses) + self.comm_agr.Barrier() + self.logger.write_log('\t\tRaster {0} to_shapefile.'.format(land_uses_clipped), message_level=3) + land_use_src_by_nut = IoRaster(self.comm_agr).to_shapefile_parallel(land_uses_clipped) + self.logger.write_log('\t\tFiltering shapefile.'.format(land_uses_clipped), message_level=3) land_use_src_by_nut.rename(columns={'data': 'land_use'}, inplace=True) land_use_src_by_nut['land_use'] = land_use_src_by_nut['land_use'].astype(np.int16) - land_use_src_by_nut.reset_index(inplace=True, drop=True) - IoShapefile(self.comm_agr).write_shapefile_serial(land_use_src_by_nut, land_use_src_by_nut_path) + land_use_src_by_nut = self.add_nut_code(land_use_src_by_nut, self.nut_shapefile, nut_value='nuts2_id') + land_use_src_by_nut = land_use_src_by_nut[land_use_src_by_nut['nut_code'] != -999] + libc.malloc_trim(0) + land_use_src_by_nut = IoShapefile(self.comm_agr).balance(land_use_src_by_nut) + + land_use_src_by_nut.set_index('CELL_ID', inplace=True) + if write: + self.logger.write_log('\t\tWriting {0} file.'.format(land_use_src_by_nut_path), message_level=3) + IoShapefile(self.comm_agr).write_shapefile_parallel(land_use_src_by_nut.reset_index(), + land_use_src_by_nut_path) else: - land_use_src_by_nut = IoShapefile(self.comm_agr).read_shapefile_serial(land_use_src_by_nut_path) - # land_use_src_by_nut.index.name = 'CELL_ID' + land_use_src_by_nut = IoShapefile(self.comm_agr).read_shapefile_parallel(land_use_src_by_nut_path) + land_use_src_by_nut.set_index('CELL_ID', inplace=True) + self.logger.write_time_log('AgriculturalSector', 'get_land_use_src_by_nut', timeit.default_timer() - spent_time) + return land_use_src_by_nut def get_tot_land_use_by_nut(self, land_uses): @@ -259,9 +288,10 @@ class AgriculturalSector(Sector): spent_time = timeit.default_timer() df = pd.read_csv(self.land_use_by_nut, dtype={'nuts2_id': str}) - df.rename(columns={'nuts2_id': 'NUT'}, inplace=True) + df.rename(columns={'nuts2_id': 'nut_code'}, inplace=True) df = df.loc[df['land_use'].isin(land_uses), :] - df.set_index(['NUT', 'land_use'], inplace=True) + df['nut_code'] = df['nut_code'].astype(np.int32) + df.set_index(['nut_code', 'land_use'], inplace=True) self.logger.write_time_log('AgriculturalSector', 'get_tot_land_use_by_nut', timeit.default_timer() - spent_time) return df @@ -282,10 +312,11 @@ class AgriculturalSector(Sector): spent_time = timeit.default_timer() land_use_by_nut = pd.DataFrame(index=pd.MultiIndex.from_product( - [np.unique(land_use_distribution_src_nut['NUT']), land_uses], names=['NUT', 'land_use'])) - + [np.unique(land_use_distribution_src_nut['nut_code'].astype(np.int64)), + np.unique(land_uses).astype(np.int16)], names=['nut_code', 'land_use'])) + land_use_by_nut['area'] = 0.0 land_use_distribution_src_nut['area'] = land_use_distribution_src_nut.area - land_use_by_nut['area'] = land_use_distribution_src_nut.groupby(['NUT', 'land_use']).sum() + land_use_by_nut['area'] += land_use_distribution_src_nut.groupby(['nut_code', 'land_use'])['area'].sum() land_use_by_nut.fillna(0.0, inplace=True) self.logger.write_time_log('AgriculturalSector', 'get_land_use_by_nut_csv', timeit.default_timer() - spent_time) @@ -305,17 +336,21 @@ class AgriculturalSector(Sector): :rtype: DataFrame """ spent_time = timeit.default_timer() + if nuts is not None: - land_use_by_nut = land_use_by_nut.iloc[land_use_by_nut.index.get_level_values('NUT').isin(nuts)] + land_use_by_nut = land_use_by_nut.iloc[land_use_by_nut.index.get_level_values('nut_code').isin(nuts)] - new_df = pd.DataFrame(index=np.unique(land_use_by_nut.index.get_level_values('NUT')), + new_df = pd.DataFrame(index=np.unique(land_use_by_nut.index.get_level_values('nut_code')), columns=self.crop_from_landuse.keys()) new_df.fillna(0, inplace=True) for crop, land_use_weight_list in self.crop_from_landuse.items(): for land_use, weight in land_use_weight_list: - new_df[crop] += land_use_by_nut.xs(land_use, level='land_use')['area'] * weight - + aux_df = land_use_by_nut.reset_index() + aux_df = aux_df.loc[aux_df['land_use'] == land_use] + aux_df.drop(columns=['land_use'], inplace=True) + aux_df.set_index('nut_code', inplace=True) + new_df[crop] += aux_df['area'] * weight self.logger.write_time_log('AgriculturalSector', 'land_use_to_crop_by_nut', timeit.default_timer() - spent_time) return new_df @@ -358,10 +393,10 @@ class AgriculturalSector(Sector): crop_by_nut = pd.read_csv(self.crop_by_nut, dtype={'nuts2_id': str}) crop_by_nut.drop(columns='nuts2_na', inplace=True) - crop_by_nut.rename(columns={'nuts2_id': 'code'}, inplace=True) + crop_by_nut.rename(columns={'nuts2_id': 'nut_code'}, inplace=True) - # crop_by_nut['code'] = crop_by_nut['code'].astype(np.int16) - crop_by_nut.set_index('code', inplace=True) + crop_by_nut['nut_code'] = crop_by_nut['nut_code'].astype(np.int64) + crop_by_nut.set_index('nut_code', inplace=True) crop_by_nut = crop_by_nut.loc[crop_share_by_nut.index, :] crop_area_by_nut = crop_share_by_nut * crop_by_nut @@ -384,25 +419,26 @@ class AgriculturalSector(Sector): """ spent_time = timeit.default_timer() - crop_distribution_src = land_use_distribution_src_nut.loc[:, ['NUT', 'geometry']] + crop_distribution_src = land_use_distribution_src_nut.loc[:, ['nut_code', 'geometry']] + for crop, landuse_weight_list in self.crop_from_landuse.items(): crop_distribution_src[crop] = 0 for landuse, weight in landuse_weight_list: crop_distribution_src.loc[land_use_distribution_src_nut['land_use'] == int(landuse), crop] += \ land_use_distribution_src_nut.loc[land_use_distribution_src_nut['land_use'] == int(landuse), 'area'] * float(weight) - for nut in np.unique(crop_distribution_src['NUT']): + for nut in np.unique(crop_distribution_src['nut_code']): for crop in crop_area_by_nut.columns.values: - crop_distribution_src.loc[crop_distribution_src['NUT'] == nut, crop] /= crop_distribution_src.loc[ - crop_distribution_src['NUT'] == nut, crop].sum() - for nut in np.unique(crop_distribution_src['NUT']): + crop_distribution_src.loc[crop_distribution_src['nut_code'] == nut, crop] /= crop_distribution_src.loc[ + crop_distribution_src['nut_code'] == nut, crop].sum() + for nut in np.unique(crop_distribution_src['nut_code']): for crop in crop_area_by_nut.columns.values: - crop_distribution_src.loc[crop_distribution_src['NUT'] == nut, crop] *= \ + crop_distribution_src.loc[crop_distribution_src['nut_code'] == nut, crop] *= \ crop_area_by_nut.loc[nut, crop] self.logger.write_time_log('AgriculturalSector', 'calculate_crop_distribution_src', timeit.default_timer() - spent_time) - + crop_distribution_src = IoShapefile(self.comm_agr).balance(crop_distribution_src) return crop_distribution_src def get_crop_distribution_in_dst_cells(self, crop_distribution): @@ -420,9 +456,10 @@ class AgriculturalSector(Sector): crop_distribution = crop_distribution.to_crs(self.grid.shapefile.crs) crop_distribution['src_inter_fraction'] = crop_distribution.geometry.area - crop_distribution = self.spatial_overlays(crop_distribution.reset_index(), self.grid.shapefile.reset_index(), how='intersection') + + crop_distribution = IoShapefile(self.comm_agr).balance(crop_distribution) crop_distribution['src_inter_fraction'] = \ crop_distribution.geometry.area / crop_distribution['src_inter_fraction'] @@ -455,44 +492,48 @@ class AgriculturalSector(Sector): """ spent_time = timeit.default_timer() if not os.path.exists(file_path): + self.logger.write_log('Creating the crop distribution shapefile.', message_level=2) + + self.logger.write_log('\tCreating land use distribution on the source resolution.', message_level=3) + involved_land_uses = self.get_involved_land_uses() + land_use_distribution_src_nut = self.get_land_use_src_by_nut(involved_land_uses, write=False) + + land_use_by_nut = self.get_land_use_by_nut_csv(land_use_distribution_src_nut, involved_land_uses) + + self.logger.write_log('\tCreating the crop distribution on the source resolution.', message_level=3) + crop_by_nut = self.land_use_to_crop_by_nut(land_use_by_nut) + tot_land_use_by_nut = self.get_tot_land_use_by_nut(involved_land_uses) + tot_crop_by_nut = self.land_use_to_crop_by_nut( + tot_land_use_by_nut, nuts=list(np.unique(land_use_by_nut.index.get_level_values('nut_code')))) + crop_shape_by_nut = self.get_crop_shape_by_nut(crop_by_nut, tot_crop_by_nut) + crop_area_by_nut = self.get_crop_area_by_nut(crop_shape_by_nut) + crop_distribution_src = self.calculate_crop_distribution_src( + crop_area_by_nut, land_use_distribution_src_nut) + + self.logger.write_log('\tCreating the crop distribution on the grid resolution.', message_level=3) + crop_distribution_dst = self.get_crop_distribution_in_dst_cells(crop_distribution_src) + self.logger.write_log('\tCreating the crop distribution shapefile.', message_level=3) + crop_distribution_dst = IoShapefile(self.comm_agr).gather_shapefile(crop_distribution_dst.reset_index()) if self.comm_agr.Get_rank() == 0: - self.logger.write_log('Creating the crop distribution shapefile on the grid resolution.', - message_level=2) - - involved_land_uses = self.get_involved_land_uses() - land_use_distribution_src_nut = self.get_land_use_src_by_nut(involved_land_uses) - land_use_by_nut = self.get_land_use_by_nut_csv(land_use_distribution_src_nut, involved_land_uses) - tot_land_use_by_nut = self.get_tot_land_use_by_nut(involved_land_uses) - - crop_by_nut = self.land_use_to_crop_by_nut(land_use_by_nut) - tot_crop_by_nut = self.land_use_to_crop_by_nut( - tot_land_use_by_nut, nuts=list(np.unique(land_use_by_nut.index.get_level_values('NUT')))) - - crop_shape_by_nut = self.get_crop_shape_by_nut(crop_by_nut, tot_crop_by_nut) - crop_area_by_nut = self.get_crop_area_by_nut(crop_shape_by_nut) - - crop_distribution_src = self.calculate_crop_distribution_src( - crop_area_by_nut, land_use_distribution_src_nut) - - crop_distribution_dst = self.get_crop_distribution_in_dst_cells(crop_distribution_src) - crop_distribution_dst = self.add_timezone(crop_distribution_dst) - - IoShapefile(self.comm).write_shapefile_serial(crop_distribution_dst, file_path) + crop_distribution_dst = crop_distribution_dst.groupby('FID').sum() + crop_distribution_dst = GeoDataFrame( + crop_distribution_dst, + geometry=self.grid.shapefile.loc[crop_distribution_dst.index.get_level_values('FID'), + 'geometry'].values, + crs=self.grid.shapefile.crs) else: - self.logger.write_log('Waiting for the master process that creates the crop distribution shapefile.', - message_level=2) crop_distribution_dst = None - self.comm_agr.Barrier() - if self.comm.Get_rank() == 0 and self.comm_agr.Get_rank() != 0: - # Every master rank read the created crop distribution shapefile. - crop_distribution_dst = IoShapefile(self.comm).read_shapefile_serial(file_path) - self.comm.Barrier() - crop_distribution_dst = IoShapefile(self.comm).split_shapefile(crop_distribution_dst) - else: - crop_distribution_dst = IoShapefile(self.comm).read_shapefile_parallel(file_path) + self.logger.write_log('\tAdding timezone to the shapefile.', message_level=3) + crop_distribution_dst = IoShapefile(self.comm_agr).split_shapefile(crop_distribution_dst) + crop_distribution_dst = self.add_timezone(crop_distribution_dst) + + self.logger.write_log('\tWriting the crop distribution shapefile.', message_level=3) + IoShapefile(self.comm_agr).write_shapefile_parallel(crop_distribution_dst, file_path) + + crop_distribution_dst = IoShapefile(self.comm).read_shapefile_parallel(file_path) crop_distribution_dst.set_index('FID', inplace=True, drop=True) - # Filtering crops by used on the subsector (operations, fertilizers, machinery) + # Filtering crops by used on the sub-sector (operations, fertilizers, machinery) crop_distribution_dst = crop_distribution_dst.loc[:, self.crop_list + ['timezone', 'geometry']] self.logger.write_time_log('AgriculturalSector', 'get_crops_by_dst_cell', timeit.default_timer() - spent_time) diff --git a/hermesv3_bu/sectors/livestock_sector.py b/hermesv3_bu/sectors/livestock_sector.py index 2cf1cc921d2a760747e063139ec5d166d3f4fa2d..f97748c2acd31c1ca2118299afbdc366fba7efa1 100755 --- a/hermesv3_bu/sectors/livestock_sector.py +++ b/hermesv3_bu/sectors/livestock_sector.py @@ -15,6 +15,8 @@ from hermesv3_bu.io_server.io_netcdf import IoNetcdf from hermesv3_bu.grids.grid import Grid from hermesv3_bu.tools.checker import check_files, error_exit +from geopandas import GeoDataFrame + # Constants for grassing daily factor estimation SIGMA = 60 TAU = 170 @@ -155,6 +157,7 @@ class LivestockSector(Sector): """ spent_time = timeit.default_timer() logger.write_log('===== LIVESTOCK SECTOR =====') + check_files( [gridded_livestock_path.replace('', animal) for animal in animal_list] + [correction_split_factors_path.replace('', animal) for animal in animal_list] + @@ -212,26 +215,21 @@ class LivestockSector(Sector): "nut_code" column must contain the NUT ID. :type correction_split_factors_path: str - :return: GeoDataframe with the amount of each animal subtype by destiny cell (FID) + :return: GeoDataFrame with the amount of each animal subtype by destiny cell (FID) Columns: 'FID', 'cattle_01', 'cattle_02', 'cattle_03' 'cattle_04', 'cattle_05', 'cattle_06', 'cattle_07', 'cattle_08', 'cattle_09', 'cattle_10', 'cattle_11', 'chicken_01', 'chicken_02', 'goats_01', 'goats_02', 'goats_03', goats_04', 'goats_05', 'goats_06', 'pigs_01', 'pigs_02', 'pigs_03', 'pigs_04', 'pigs_05', 'pigs_06', 'pigs_07', 'pigs_08', 'pigs_09', 'pigs_10', 'timezone', 'geometry' - :rtype: geopandas.GeoDataframe + :rtype: GeoDataFrame """ spent_time = timeit.default_timer() self.logger.write_log('\tCreating animal distribution', message_level=2) - # Work for master MPI process - if self.comm.Get_rank() == 0: - animals_df = self.create_animals_shapefile(gridded_livestock_path) - animals_df = self.animal_distribution_by_category(animals_df, nut_shapefile_path, - correction_split_factors_path) - else: - animals_df = None - # Split distribution, in a balanced way, between MPI process - animals_df = IoShapefile(self.comm).split_shapefile(animals_df) + animals_df = self.create_animals_shapefile(gridded_livestock_path) + animals_df = self.animal_distribution_by_category(animals_df, nut_shapefile_path, + correction_split_factors_path) + self.logger.write_log('Animal distribution done', message_level=2) self.logger.write_time_log('LivestockSector', 'create_animals_distribution', timeit.default_timer() - spent_time) @@ -274,7 +272,7 @@ class LivestockSector(Sector): :type gridded_livestock_path: str :return: Shapefile with the amount of each animal of the animal list in the source resolution. - :rtype: geopandas.GeoDataframe + :rtype: GeoDataFrame """ spent_time = timeit.default_timer() self.logger.write_log('\t\tCreating animal shapefile into source resolution', message_level=3) @@ -283,22 +281,22 @@ class LivestockSector(Sector): for animal in self.animal_list: self.logger.write_log('\t\t\t {0}'.format(animal), message_level=3) # Each one of the animal distributions will be stored separately - animal_distribution_path = os.path.join(self.auxiliary_dir, 'livestock', 'animal_distribution', animal, - '{0}.shp'.format(animal)) + animal_distribution_path = os.path.join(self.auxiliary_dir, 'livestock', animal, '{0}.shp'.format(animal)) if not os.path.exists(animal_distribution_path): # Create clipped raster file - clipped_raster_path = IoRaster(self.comm).clip_raster_with_shapefile_poly( - gridded_livestock_path.replace('', animal), self.clip.shapefile, - os.path.join(self.auxiliary_dir, 'livestock', 'animal_distribution', animal, - '{0}_clip.tif'.format(animal))) - - animal_df = IoRaster(self.comm).to_shapefile_serie(clipped_raster_path, animal_distribution_path, - write=True) + clipped_raster_path = os.path.join( + self.auxiliary_dir, 'livestock', animal, '{0}_clip.tif'.format(animal)) + if self.comm.Get_rank() == 0: + clipped_raster_path = IoRaster(self.comm).clip_raster_with_shapefile_poly( + gridded_livestock_path.replace('', animal), self.clip.shapefile, clipped_raster_path) + + animal_df = IoRaster(self.comm).to_shapefile_parallel(clipped_raster_path) + animal_df.rename(columns={'data': animal}, inplace=True) + animal_df.set_index('CELL_ID', inplace=True) + IoShapefile(self.comm).write_shapefile_parallel(animal_df.reset_index(), animal_distribution_path) else: - animal_df = IoShapefile(self.comm).read_shapefile_serial(animal_distribution_path) - - animal_df.rename(columns={'data': animal}, inplace=True) - # animal_df.set_index('CELL_ID', inplace=True) + animal_df = IoShapefile(self.comm).read_shapefile_parallel(animal_distribution_path) + animal_df.set_index('CELL_ID', inplace=True) # Creating full animal shapefile if animal_distribution is None: @@ -312,9 +310,9 @@ class LivestockSector(Sector): # Removing empty data animal_distribution = animal_distribution.loc[(animal_distribution[self.animal_list] != 0).any(axis=1), :] + self.logger.write_time_log('LivestockSector', 'create_animals_shapefile_src_resolution', timeit.default_timer() - spent_time) - return animal_distribution def animals_shapefile_to_dst_resolution(self, animal_distribution): @@ -322,14 +320,16 @@ class LivestockSector(Sector): Interpolates the source distribution into the destiny grid. :param animal_distribution: Animal distribution shapefile in the source resolution. - :type animal_distribution: geopandas.GeoDataframe + :type animal_distribution: GeoDataFrame :return: Animal distribution shapefile in the destiny resolution. - :rtype: geopandas.GeoDataframe + :rtype: GeoDataFrame """ spent_time = timeit.default_timer() self.logger.write_log('\t\tCreating animal shapefile into destiny resolution', message_level=3) self.grid.shapefile.reset_index(inplace=True) + + animal_distribution = IoShapefile(self.comm).balance(animal_distribution) # Changing coordinates system to the grid one animal_distribution.to_crs(self.grid.shapefile.crs, inplace=True) # Getting src area @@ -347,11 +347,17 @@ class LivestockSector(Sector): # Sum by destiny cell animal_distribution = animal_distribution.loc[:, self.animal_list + ['FID']].groupby('FID').sum() - self.grid.shapefile.set_index('FID', drop=False, inplace=True) - # Adding geometry and coordinates system from the destiny grid shapefile - animal_distribution = gpd.GeoDataFrame(animal_distribution, crs=self.grid.shapefile.crs, - geometry=self.grid.shapefile.loc[animal_distribution.index, 'geometry']) - animal_distribution.reset_index(inplace=True) + animal_distribution = IoShapefile(self.comm).gather_shapefile(animal_distribution.reset_index()) + if self.comm.Get_rank() == 0: + animal_distribution = animal_distribution.groupby('FID').sum() + # Adding geometry and coordinates system from the destiny grid shapefile + animal_distribution = gpd.GeoDataFrame( + animal_distribution, crs=self.grid.shapefile.crs, + geometry=self.grid.shapefile.loc[animal_distribution.index, 'geometry']) + else: + animal_distribution = None + + animal_distribution = IoShapefile(self.comm).split_shapefile(animal_distribution) self.logger.write_time_log('LivestockSector', 'animals_shapefile_to_dst_resolution', timeit.default_timer() - spent_time) @@ -373,15 +379,15 @@ class LivestockSector(Sector): :return: """ spent_time = timeit.default_timer() - animal_distribution_path = os.path.join(self.auxiliary_dir, 'livestock', 'animal_distribution', - 'animal_distribution.shp') + animal_distribution_path = os.path.join(self.auxiliary_dir, 'livestock', 'animal_distribution') if not os.path.exists(animal_distribution_path): dataframe = self.create_animals_shapefile_src_resolution(gridded_livestock_path) dataframe = self.animals_shapefile_to_dst_resolution(dataframe) - IoShapefile(self.comm).write_shapefile_serial(dataframe, animal_distribution_path) + IoShapefile(self.comm).write_shapefile_parallel(dataframe.reset_index(), animal_distribution_path) else: - dataframe = IoShapefile(self.comm).read_shapefile_serial(animal_distribution_path) + dataframe = IoShapefile(self.comm).read_shapefile_parallel(animal_distribution_path) + dataframe.set_index('FID', inplace=True) self.logger.write_time_log('LivestockSector', 'create_animals_shapefile', timeit.default_timer() - spent_time) return dataframe @@ -426,12 +432,12 @@ class LivestockSector(Sector): return splitting_factors - def animal_distribution_by_category(self, dataframe, nut_shapefile_path, correction_split_factors_path): + def animal_distribution_by_category(self, animal_distribution, nut_shapefile_path, correction_split_factors_path): """ Split the animal categories into as many categories as each animal type has. - :param dataframe: GeoDataframe with the animal distribution by animal type. - :type dataframe: geopandas.GeoDataframe + :param animal_distribution: GeoDataFrame with the animal distribution by animal type. + :type animal_distribution: GeoDataFrame :param nut_shapefile_path: Path to the shapefile that contain the NUT polygons. The shapefile must contain the 'nuts3_id' information with the NUT_code. @@ -451,37 +457,43 @@ class LivestockSector(Sector): 'cattle_08', 'cattle_09', 'cattle_10', 'cattle_11', 'chicken_01', 'chicken_02', 'goats_01', 'goats_02', 'goats_03', goats_04', 'goats_05', 'goats_06', 'pigs_01', 'pigs_02', 'pigs_03', 'pigs_04', 'pigs_05', 'pigs_06', 'pigs_07', 'pigs_08', 'pigs_09', 'pigs_10', 'timezone', 'geometry' - :rtype: geopandas.GeoDataframe + :rtype: GeoDataFrame """ spent_time = timeit.default_timer() - animal_distribution_path = os.path.join(self.auxiliary_dir, 'livestock', 'animal_distribution', - 'animal_distribution_by_cat.shp') + animal_distribution_path = os.path.join(self.auxiliary_dir, 'livestock', 'animal_distribution_by_cat') if not os.path.exists(animal_distribution_path): - dataframe = self.add_nut_code(dataframe, nut_shapefile_path, nut_value='nuts3_id') - dataframe.rename(columns={'nut_code': 'nuts3_id'}, inplace=True) + animal_distribution = self.add_nut_code(animal_distribution.reset_index(), nut_shapefile_path, + nut_value='nuts3_id') + animal_distribution.rename(columns={'nut_code': 'nuts3_id'}, inplace=True) + animal_distribution = animal_distribution[animal_distribution['nuts3_id'] != -999] + animal_distribution.set_index('FID', inplace=True) splitting_factors = self.get_splitting_factors(correction_split_factors_path) # Adding the splitting factors by NUT code - dataframe = pd.merge(dataframe, splitting_factors, how='left', on='nuts3_id') - - dataframe.drop(columns=['nuts3_id'], inplace=True) + animal_distribution = pd.merge(animal_distribution.reset_index(), splitting_factors, how='left', + on='nuts3_id') + animal_distribution.set_index('FID', inplace=True) + animal_distribution.drop(columns=['nuts3_id'], inplace=True) for animal in self.animal_list: - animal_types = [i for i in list(dataframe.columns.values) if i.startswith(animal)] - dataframe.loc[:, animal_types] = dataframe.loc[:, animal_types].multiply(dataframe[animal], - axis='index') - dataframe.drop(columns=[animal], inplace=True) + animal_types = [i for i in list(animal_distribution.columns.values) if i.startswith(animal)] + animal_distribution.loc[:, animal_types] = animal_distribution.loc[:, animal_types].multiply( + animal_distribution[animal], axis='index') + animal_distribution.drop(columns=[animal], inplace=True) - dataframe = self.add_timezone(dataframe) - IoShapefile(self.comm).write_shapefile_serial(dataframe, animal_distribution_path) + animal_distribution = self.add_timezone(animal_distribution) + animal_distribution.set_index('FID', inplace=True) + + IoShapefile(self.comm).write_shapefile_parallel(animal_distribution.reset_index(), animal_distribution_path) else: - dataframe = IoShapefile(self.comm).read_shapefile_serial(animal_distribution_path) + animal_distribution = IoShapefile(self.comm).read_shapefile_parallel(animal_distribution_path) + animal_distribution.set_index('FID', inplace=True) self.logger.write_time_log('LivestockSector', 'animal_distribution_by_category', timeit.default_timer() - spent_time) - return dataframe + return animal_distribution def get_daily_factors(self, animal_shp, day): """ @@ -501,13 +513,13 @@ class LivestockSector(Sector): 'cattle_08', 'cattle_09', 'cattle_10', 'cattle_11', 'chicken_01', 'chicken_02', 'goats_01', 'goats_02', 'goats_03', goats_04', 'goats_05', 'goats_06', 'pigs_01', 'pigs_02', 'pigs_03', 'pigs_04', 'pigs_05', 'pigs_06', 'pigs_07', 'pigs_08', 'pigs_09', 'pigs_10', 'timezone', 'geometry' - :type animal_shp: geopandas.GeoDataframe + :type animal_shp: GeoDataFrame :param day: Date of the day to generate. :type day: datetime.date :return: Shapefile with the daily factors. - :rtype: geopandas.GeoDataframe + :rtype: GeoDataFrame """ import math spent_time = timeit.default_timer() @@ -662,15 +674,15 @@ class LivestockSector(Sector): 'cattle_08', 'cattle_09', 'cattle_10', 'cattle_11', 'chicken_01', 'chicken_02', 'goats_01', 'goats_02', 'goats_03', goats_04', 'goats_05', 'goats_06', 'pigs_01', 'pigs_02', 'pigs_03', 'pigs_04', 'pigs_05', 'pigs_06', 'pigs_07', 'pigs_08', 'pigs_09', 'pigs_10', 'timezone', 'geometry' - :type animals_df: geopandas.GeoDataframe + :type animals_df: GeoDataFrame :param daily_factors: GeoDataframe with the daily factors. Columns: 'REC', 'geometry', 'FD_housing_open', 'FD_housing_closed, 'FD_storage', 'FD_grassing' - :type daily_factors: geopandas.GeoDataframe + :type daily_factors: GeoDataFrame :return: Animal distribution with the daily factors. - :rtype: geopandas.GeoDataframe + :rtype: GeoDataFrame """ spent_time = timeit.default_timer() animals_df = animals_df.to_crs({'init': 'epsg:4326'}) @@ -692,21 +704,23 @@ class LivestockSector(Sector): """ Calculate the emissions, already speciated, corresponding to the given day. - :param animals_df: GeoDataframe with the amount of each animal subtype by destiny cell (FID) + :param animals_df: GeoDataFrame with the amount of each animal subtype by destiny cell (FID) Columns: 'FID', 'cattle_01', 'cattle_02', 'cattle_03' 'cattle_04', 'cattle_05', 'cattle_06', 'cattle_07', 'cattle_08', 'cattle_09', 'cattle_10', 'cattle_11', 'chicken_01', 'chicken_02', 'goats_01', 'goats_02', 'goats_03', goats_04', 'goats_05', 'goats_06', 'pigs_01', 'pigs_02', 'pigs_03', 'pigs_04', 'pigs_05', 'pigs_06', 'pigs_07', 'pigs_08', 'pigs_09', 'pigs_10', 'timezone', 'geometry' - :type animals_df: geopandas.GeoDataframe + :type animals_df: GeoDataFrame :param day: Date of the day to generate. :type day: datetime.date - :return: GeoDataframe with the daily emissions by destiny cell. - :rtype: geopandas.GeoDataframe + :return: GeoDataFrame with the daily emissions by destiny cell. + :rtype: GeoDataFrame """ spent_time = timeit.default_timer() + + animals_df.reset_index(inplace=True) daily_factors = self.get_daily_factors(animals_df.loc[:, ['FID', 'geometry']], day) animals_df = self.add_daily_factors_to_animal_distribution(animals_df, daily_factors) @@ -938,7 +952,7 @@ class LivestockSector(Sector): 'cattle_08', 'cattle_09', 'cattle_10', 'cattle_11', 'chicken_01', 'chicken_02', 'goats_01', 'goats_02', 'goats_03', goats_04', 'goats_05', 'goats_06', 'pigs_01', 'pigs_02', 'pigs_03', 'pigs_04', 'pigs_05', 'pigs_06', 'pigs_07', 'pigs_08', 'pigs_09', 'pigs_10', 'timezone', 'geometry' - :type animals_df: geopandas.GeoDataframe + :type animals_df: GeoDataFrame :return: Dictionary with the day as key (same key as self.day_dict) and the daily emissions as value. :rtype: dict @@ -960,7 +974,7 @@ class LivestockSector(Sector): :type df_by_day: dict :return: GeoDataframe with all the time steps (each time step have the daily emission) - :rtype: geopandas.GeoDataframe + :rtype: GeoDataFrame """ spent_time = timeit.default_timer() df_list = [] @@ -987,7 +1001,7 @@ class LivestockSector(Sector): :type dict_by_day: dict :return: GeoDataframe with the hourly distribution. - :rtype: geopandas.GeoDataframe + :rtype: GeoDataFrame """ spent_time = timeit.default_timer() @@ -1041,7 +1055,7 @@ class LivestockSector(Sector): Calculate the livestock emissions hourly distributed. :return: GeoDataframe with all the emissions. - :rtype: geopandas.GeoDataframe + :rtype: GeoDataFrame """ spent_time = timeit.default_timer() self.logger.write_log('\tCalculating emissions') diff --git a/hermesv3_bu/sectors/residential_sector.py b/hermesv3_bu/sectors/residential_sector.py index ae9c9b3e73ad991b2b39284a9612fc86c8028c78..4f34981bdbbec04c4cb752671111287a899ffc39 100755 --- a/hermesv3_bu/sectors/residential_sector.py +++ b/hermesv3_bu/sectors/residential_sector.py @@ -51,13 +51,8 @@ class ResidentialSector(Sector): self.residential_spatial_proxies = self.read_residential_spatial_proxies(residential_spatial_proxies) self.ef_profiles = self.read_ef_file(residential_ef_files_path) - if self.comm.Get_rank() == 0: - self.fuel_distribution = self.get_fuel_distribution(prov_shapefile, ccaa_shapefile, population_density_map, - population_type_map, create_pop_csv=False) - else: - self.fuel_distribution = None - self.fuel_distribution = IoShapefile(self.comm).split_shapefile(self.fuel_distribution) - + self.fuel_distribution = self.get_fuel_distribution( + prov_shapefile, ccaa_shapefile, population_density_map, population_type_map, create_pop_csv=False) self.heating_degree_day_path = heating_degree_day_path self.temperature_path = temperature_path self.logger.write_time_log('ResidentialSector', '__init__', timeit.default_timer() - spent_time) @@ -159,26 +154,25 @@ class ResidentialSector(Sector): fuel_distribution_path = os.path.join(self.auxiliary_dir, 'residential', 'fuel_distribution.shp') if not os.path.exists(fuel_distribution_path): - - population_density = IoRaster(self.comm).clip_raster_with_shapefile_poly( - population_density_map, self.clip.shapefile, - os.path.join(self.auxiliary_dir, 'residential', 'population_density.tif')) - population_density = IoRaster(self.comm).to_shapefile_serie_by_cell(population_density) + population_density = os.path.join(self.auxiliary_dir, 'residential', 'population_density.tif') + if self.comm.Get_rank() == 0: + population_density = IoRaster(self.comm).clip_raster_with_shapefile_poly( + population_density_map, self.clip.shapefile, population_density) + population_density = IoRaster(self.comm).to_shapefile_parallel(population_density) population_density.rename(columns={'data': 'pop'}, inplace=True) - population_type = IoRaster(self.comm).clip_raster_with_shapefile_poly( - population_type_map, self.clip.shapefile, - os.path.join(self.auxiliary_dir, 'residential', 'population_type.tif')) - population_type = IoRaster(self.comm).to_shapefile_serie_by_cell(population_type) + population_type = os.path.join(self.auxiliary_dir, 'residential', 'population_type.tif') + if self.comm.Get_rank() == 0: + population_type = IoRaster(self.comm).clip_raster_with_shapefile_poly( + population_type_map, self.clip.shapefile, population_type) + population_type = IoRaster(self.comm).to_shapefile_parallel(population_type) population_type.rename(columns={'data': 'type'}, inplace=True) population_type['type'] = population_type['type'].astype(np.int16) population_type.loc[population_type['type'] == 2, 'type'] = 3 population_density['type'] = population_type['type'] - # population_density = gpd.sjoin(population_density, population_type, how='left', op='intersects') - # population_density.drop(columns=['index_right'], inplace=True) population_density = self.add_nut_code(population_density, prov_shapefile, nut_value='nuts3_id') population_density.rename(columns={'nut_code': 'prov'}, inplace=True) @@ -186,6 +180,7 @@ class ResidentialSector(Sector): population_density = self.add_nut_code(population_density, ccaa_shapefile, nut_value='nuts2_id') population_density.rename(columns={'nut_code': 'ccaa'}, inplace=True) population_density = population_density.loc[population_density['ccaa'] != -999, :] + population_density = IoShapefile(self.comm).balance(population_density) if create_pop_csv: population_density.loc[:, ['prov', 'pop', 'type']].groupby(['prov', 'type']).sum().reset_index().to_csv( @@ -268,10 +263,15 @@ class ResidentialSector(Sector): fuel] = population_density['pop'].multiply( energy_consumption / total_pop) fuel_distribution = self.to_dst_resolution(fuel_distribution) - fuel_distribution.set_index('FID', inplace=True) - IoShapefile(self.comm).write_shapefile_serial(fuel_distribution.reset_index(), fuel_distribution_path) + fuel_distribution = IoShapefile(self.comm).gather_shapefile(fuel_distribution, rank=0) + if self.comm.Get_rank() == 0: + fuel_distribution.groupby('FID').sum() + IoShapefile(self.comm).write_shapefile_serial(fuel_distribution, fuel_distribution_path) + else: + fuel_distribution = None + fuel_distribution = IoShapefile(self.comm).split_shapefile(fuel_distribution) else: - fuel_distribution = IoShapefile(self.comm).read_shapefile_serial(fuel_distribution_path) + fuel_distribution = IoShapefile(self.comm).read_shapefile_parallel(fuel_distribution_path) fuel_distribution.set_index('FID', inplace=True) self.logger.write_time_log('ResidentialSector', 'get_fuel_distribution', timeit.default_timer() - spent_time) diff --git a/hermesv3_bu/sectors/sector.py b/hermesv3_bu/sectors/sector.py index 0427e7b28b742539fc2397818174261672082a67..bac090adb382ad76e84f6b7f94ebf6eb29375cfb 100755 --- a/hermesv3_bu/sectors/sector.py +++ b/hermesv3_bu/sectors/sector.py @@ -8,9 +8,12 @@ import pandas as pd import geopandas as gpd from mpi4py import MPI +from hermesv3_bu.io_server.io_raster import IoRaster +from hermesv3_bu.io_server.io_shapefile import IoShapefile from geopandas import GeoDataFrame from hermesv3_bu.logger.log import Log from hermesv3_bu.grids.grid import Grid +from geopandas import GeoDataFrame class Sector(object): @@ -441,7 +444,7 @@ class Sector(object): :type nut_value: str :return: Shapefile with the 'nut_code' column set. - :rtype: geopandas.GeoDataframe + :rtype: GeoDataFrame """ spent_time = timeit.default_timer() nut_shapefile = gpd.read_file(nut_shapefile_path).to_crs(shapefile.crs) @@ -452,7 +455,7 @@ class Sector(object): shapefile.rename(columns={nut_value: 'nut_code'}, inplace=True) shapefile.loc[shapefile['nut_code'].isna(), 'nut_code'] = -999 - shapefile['nut_code'] = shapefile['nut_code'].astype(np.int16) + shapefile['nut_code'] = shapefile['nut_code'].astype(np.int64) self.logger.write_time_log('Sector', 'add_nut_code', timeit.default_timer() - spent_time) return shapefile @@ -566,9 +569,6 @@ class Sector(object): return return_value def calculate_land_use_by_nut(self, land_use_raster_path, nut_shapefile_path, out_land_use_by_nut_path): - from hermesv3_bu.io_server.io_raster import IoRaster - from hermesv3_bu.io_server.io_shapefile import IoShapefile - # 1st Clip the raster lu_raster_path = os.path.join(self.auxiliary_dir, 'clipped_land_use.tif') @@ -606,3 +606,35 @@ class Sector(object): land_use_by_nut.to_csv(out_land_use_by_nut_path) print('DONE -> {0}'.format(out_land_use_by_nut_path)) self.comm.Barrier() + + def create_population_by_nut(self, population_raster_path, nut_shapefile_path, output_path, nut_column='nuts3_id'): + # 1st Clip the raster + self.logger.write_log("\t\tCreating clipped population raster", message_level=3) + if self.comm.Get_rank() == 0: + clipped_population_path = IoRaster(self.comm).clip_raster_with_shapefile_poly( + population_raster_path, self.clip.shapefile, + os.path.join(self.auxiliary_dir, 'traffic_area', 'pop.tif')) + else: + clipped_population_path = None + + # 2nd Raster to shapefile + self.logger.write_log("\t\tRaster to shapefile", message_level=3) + pop_shp = IoRaster(self.comm).to_shapefile_parallel( + clipped_population_path, gather=False, bcast=False, crs={'init': 'epsg:4326'}) + + # 3rd Add NUT code + self.logger.write_log("\t\tAdding nut codes to the shapefile", message_level=3) + # if self.comm.Get_rank() == 0: + pop_shp.drop(columns='CELL_ID', inplace=True) + pop_shp.rename(columns={'data': 'population'}, inplace=True) + pop_shp = self.add_nut_code(pop_shp, nut_shapefile_path, nut_value=nut_column) + pop_shp = pop_shp[pop_shp['nut_code'] != -999] + pop_shp.rename(columns={'nut_code': nut_column}, inplace=True) + + pop_shp = IoShapefile(self.comm).gather_shapefile(pop_shp) + if self.comm.Get_rank() == 0: + popu_dist = pop_shp.groupby(nut_column).sum() + popu_dist.to_csv(output_path) + self.comm.Barrier() + + return True diff --git a/hermesv3_bu/sectors/traffic_area_sector.py b/hermesv3_bu/sectors/traffic_area_sector.py index 21045250b39845fbd10f5d1cf9bb6d67f2560333..00dcacabd8db25f0236a80c3bc10d8372ed49931 100755 --- a/hermesv3_bu/sectors/traffic_area_sector.py +++ b/hermesv3_bu/sectors/traffic_area_sector.py @@ -7,10 +7,14 @@ import geopandas as gpd import pandas as pd import numpy as np from hermesv3_bu.sectors.sector import Sector +from hermesv3_bu.io_server.io_raster import IoRaster from hermesv3_bu.io_server.io_shapefile import IoShapefile from hermesv3_bu.io_server.io_netcdf import IoNetcdf from hermesv3_bu.tools.checker import check_files, error_exit +from pandas import DataFrame +from geopandas import GeoDataFrame + pmc_list = ['pmc', 'PMC'] @@ -18,15 +22,17 @@ pmc_list = ['pmc', 'PMC'] class TrafficAreaSector(Sector): def __init__(self, comm, logger, auxiliary_dir, grid, clip, date_array, source_pollutants, vertical_levels, population_tif_path, speciation_map_path, molecular_weights_path, - do_evaporative, gasoline_path, total_pop_by_prov, nuts_shapefile, speciation_profiles_evaporative, + do_evaporative, gasoline_path, population_nuts3, nuts_shapefile, speciation_profiles_evaporative, evaporative_ef_file, temperature_dir, do_small_cities, small_cities_shp, speciation_profiles_small_cities, small_cities_ef_file, small_cities_monthly_profile, small_cities_weekly_profile, small_cities_hourly_profile): spent_time = timeit.default_timer() logger.write_log('===== TRAFFIC AREA SECTOR =====') + print(do_evaporative, do_small_cities) + sys.stdout.flush() if do_evaporative: check_files([population_tif_path, speciation_map_path, molecular_weights_path, - gasoline_path, total_pop_by_prov, nuts_shapefile, speciation_profiles_evaporative, + gasoline_path, population_nuts3, nuts_shapefile, speciation_profiles_evaporative, evaporative_ef_file, temperature_dir]) if do_small_cities: check_files([population_tif_path, speciation_map_path, molecular_weights_path, @@ -40,10 +46,13 @@ class TrafficAreaSector(Sector): self.temperature_dir = temperature_dir self.speciation_profiles_evaporative = self.read_speciation_profiles(speciation_profiles_evaporative) self.evaporative_ef_file = evaporative_ef_file + + # self.create_population_by_nut(population_tif_path, nuts_shapefile, population_nuts3, nut_column='nuts3_id') + if do_evaporative: logger.write_log('\tInitialising evaporative emissions.', message_level=2) - self.evaporative = self.init_evaporative(population_tif_path, nuts_shapefile, gasoline_path, - total_pop_by_prov) + self.population_percent = self.get_population_percent(population_tif_path, population_nuts3, nuts_shapefile) + self.evaporative = self.init_evaporative(gasoline_path) else: self.evaporative = None @@ -61,161 +70,233 @@ class TrafficAreaSector(Sector): self.logger.write_time_log('TrafficAreaSector', '__init__', timeit.default_timer() - spent_time) - def init_evaporative(self, global_path, provinces_shapefile, gasoline_path, total_pop_by_prov): - spent_time = timeit.default_timer() + def get_population_by_nut2(self, path): + """ + Read the CSV file that contains the amount of population by nut3. - if self.comm.Get_rank() == 0: - if not os.path.exists(os.path.join(self.auxiliary_dir, 'traffic_area', 'vehicle_by_cell.shp')): - self.logger.write_log('\t\tCreating population shapefile.', message_level=3) - pop = self.get_clipped_population( - global_path, os.path.join(self.auxiliary_dir, 'traffic_area', 'population.shp'), write_file=False) - self.logger.write_log('\t\tCreating population shapefile by NUT.', message_level=3) - pop = self.make_population_by_nuts( - pop, provinces_shapefile, os.path.join(self.auxiliary_dir, 'traffic_area', 'pop_NUT.shp'), - write_file=False) - self.logger.write_log('\t\tCreating population shapefile by NUT and cell.', message_level=3) - pop = self.make_population_by_nuts_cell( - pop, os.path.join(self.auxiliary_dir, 'traffic_area', 'pop_NUT_cell.shp'), write_file=False) - self.logger.write_log('\t\tCreating vehicle shapefile by cell.', message_level=3) - veh_cell = self.make_vehicles_by_cell( - pop, gasoline_path, pd.read_csv(total_pop_by_prov), - os.path.join(self.auxiliary_dir, 'traffic_area', 'vehicle_by_cell.shp')) - else: - self.logger.write_log('\t\tReading vehicle shapefile by cell.', message_level=3) - veh_cell = IoShapefile(self.comm).read_shapefile_serial( - os.path.join(self.auxiliary_dir, 'traffic_area', 'vehicle_by_cell.shp')) - else: - veh_cell = None - - veh_cell = IoShapefile(self.comm).split_shapefile(veh_cell) - - self.logger.write_time_log('TrafficAreaSector', 'init_evaporative', timeit.default_timer() - spent_time) - return veh_cell + :param path: Path to the CSV file that contains the amount of population by nut3. + :type path: str - def init_small_cities(self, global_path, small_cities_shapefile): + :return: DataFrame with the amount of population by nut3. + :rtype: DataFrame + """ spent_time = timeit.default_timer() - if self.comm.Get_rank() == 0: - if not os.path.exists(os.path.join(self.auxiliary_dir, 'traffic_area', 'pop_SMALL_cell.shp')): - self.logger.write_log('\t\tCreating population shapefile.', message_level=3) - pop = self.get_clipped_population( - global_path, os.path.join(self.auxiliary_dir, 'traffic_area', 'population.shp'), write_file=False) - self.logger.write_log('\t\tCreating population small cities shapefile.', message_level=3) - pop = self.make_population_by_nuts( - pop, small_cities_shapefile, os.path.join(self.auxiliary_dir, 'traffic_area', 'pop_SMALL.shp'), - write_file=False) - self.logger.write_log('\t\tCreating population small cities shapefile by cell.', message_level=3) - pop = self.make_population_by_nuts_cell( - pop, os.path.join(self.auxiliary_dir, 'traffic_area', 'pop_SMALL_cell.shp'), write_file=True) - else: - self.logger.write_log('\t\tReading population small cities shapefile by cell.', message_level=3) - pop = IoShapefile(self.comm).read_shapefile_serial( - os.path.join(self.auxiliary_dir, 'traffic_area', 'pop_SMALL_cell.shp')) - else: - pop = None - pop = IoShapefile(self.comm).split_shapefile(pop) - self.logger.write_time_log('TrafficAreaSector', 'init_small_cities', timeit.default_timer() - spent_time) - return pop + pop_by_nut3 = pd.read_csv(path) + pop_by_nut3.set_index('nuts3_id', inplace=True) + pop_by_nut3 = pop_by_nut3.to_dict()['population'] - def get_clipped_population(self, global_path, population_shapefile_path, write_file=True): - from hermesv3_bu.io_server.io_raster import IoRaster - spent_time = timeit.default_timer() + self.logger.write_time_log('TrafficAreaSector', 'get_pop_by_nut3', timeit.default_timer() - spent_time) + return pop_by_nut3 - if not os.path.exists(population_shapefile_path): - population_density = IoRaster(self.comm).clip_raster_with_shapefile_poly( - global_path, self.clip.shapefile, - os.path.join(self.auxiliary_dir, 'traffic_area', 'population.tif')) - population_density = IoRaster(self.comm).to_shapefile_serie_by_cell( - population_density, out_path=population_shapefile_path, write=write_file) - else: - population_density = IoShapefile(self.comm).read_shapefile_serial(population_shapefile_path) + def get_population_percent(self, pop_raster_path, pop_by_nut_path, nut_shapefile_path): + """ + Calculate the percentage based on the amount of population. + + :param pop_raster_path: Path to the raster file that contains the population information. + :type pop_raster_path: str - self.logger.write_time_log('TrafficAreaSector', 'get_clipped_population', timeit.default_timer() - spent_time) + :param pop_by_nut_path: Path to the CSV file that contains the amount of population by nut3. + :type pop_by_nut_path: str - return population_density + :param nut_shapefile_path: Path to the shapefile that contains the nut3. + :type nut_shapefile_path: str - def make_population_by_nuts(self, population_shape, nut_shp, pop_by_nut_path, write_file=True, csv_path=None, - column_id='nuts3_id'): + :return: DataFrame with the population distribution by destiny cell. + :rtype: DataFrame + """ spent_time = timeit.default_timer() - if not os.path.exists(pop_by_nut_path): - nut_df = IoShapefile(self.comm).read_shapefile_serial(nut_shp) - population_shape['area_in'] = population_shape.geometry.area - df = gpd.overlay(population_shape, nut_df.to_crs(population_shape.crs), how='intersection') - df.crs = population_shape.crs - df.loc[:, 'data'] = df['data'] * (df.geometry.area / df['area_in']) - del df['area_in'] - if write_file: - IoShapefile(self.comm).write_shapefile_serial(df, pop_by_nut_path) - if csv_path is not None: - df = df.loc[:, ['data', column_id]].groupby(column_id).sum() - df.to_csv(csv_path) + pop_percent_path = os.path.join(self.auxiliary_dir, 'traffic_area', 'population_percent') + if not os.path.exists(pop_percent_path): + # 1st Clip the raster + self.logger.write_log("\t\tCreating clipped population raster", message_level=3) + if self.comm.Get_rank() == 0: + pop_raster_path = IoRaster(self.comm).clip_raster_with_shapefile_poly( + pop_raster_path, self.clip.shapefile, os.path.join(self.auxiliary_dir, 'traffic_area', 'pop.tif')) + + # 2nd Raster to shapefile + self.logger.write_log("\t\tRaster to shapefile", message_level=3) + pop_shp = IoRaster(self.comm).to_shapefile_parallel( + pop_raster_path, gather=False, bcast=False, crs={'init': 'epsg:4326'}) + + # 3rd Add NUT code + self.logger.write_log("\t\tAdding nut codes to the shapefile", message_level=3) + # if self.comm.Get_rank() == 0: + pop_shp.drop(columns='CELL_ID', inplace=True) + pop_shp.rename(columns={'data': 'population'}, inplace=True) + pop_shp = self.add_nut_code(pop_shp, nut_shapefile_path, nut_value='nuts3_id') + pop_shp = pop_shp[pop_shp['nut_code'] != -999] + pop_shp = IoShapefile(self.comm).balance(pop_shp) + + # 4th Calculate population percent + self.logger.write_log("\t\tCalculating population percentage on source resolution", message_level=3) + pop_by_nut2 = self.get_population_by_nut2(pop_by_nut_path) + pop_shp['tot_pop'] = pop_shp['nut_code'].map(pop_by_nut2) + pop_shp['pop_percent'] = pop_shp['population'] / pop_shp['tot_pop'] + pop_shp.drop(columns=['tot_pop', 'population'], inplace=True) + + # 5th Calculate percent by destiny cell + self.logger.write_log("\t\tCalculating population percentage on destiny resolution", message_level=3) + pop_shp.to_crs(self.grid.shapefile.crs, inplace=True) + pop_shp['src_inter_fraction'] = pop_shp.geometry.area + pop_shp = self.spatial_overlays(pop_shp.reset_index(), self.grid.shapefile.reset_index()) + pop_shp.drop(columns=['idx1', 'idx2', 'index'], inplace=True) + pop_shp['src_inter_fraction'] = pop_shp.geometry.area / pop_shp['src_inter_fraction'] + pop_shp['pop_percent'] = pop_shp['pop_percent'] * pop_shp['src_inter_fraction'] + pop_shp.drop(columns=['src_inter_fraction'], inplace=True) + + pop_shp = IoShapefile(self.comm).gather_shapefile(pop_shp) + if self.comm.Get_rank() == 0: + popu_dist = pop_shp.groupby(['FID', 'nut_code']).sum() + popu_dist = GeoDataFrame( + popu_dist, + geometry=self.grid.shapefile.loc[popu_dist.index.get_level_values('FID'), 'geometry'].values, + crs=self.grid.shapefile.crs) + IoShapefile(self.comm).write_shapefile_serial(popu_dist.reset_index(), pop_percent_path) + else: + popu_dist = None + popu_dist = IoShapefile(self.comm).split_shapefile(popu_dist) else: - df = IoShapefile(self.comm).read_shapefile_serial(pop_by_nut_path) + popu_dist = IoShapefile(self.comm).read_shapefile_parallel(pop_percent_path) + popu_dist.set_index(['FID', 'nut_code'], inplace=True) - self.logger.write_time_log('TrafficAreaSector', 'make_population_by_nuts', timeit.default_timer() - spent_time) - return df + self.logger.write_time_log('TrafficAreaSector', 'get_population_percent', timeit.default_timer() - spent_time) + return popu_dist - def make_population_by_nuts_cell(self, pop_by_nut, pop_nut_cell_path, write_file=True): - spent_time = timeit.default_timer() + def get_population(self, pop_raster_path, nut_shapefile_path): + """ + Calculate the amount of population. - if not os.path.exists(pop_nut_cell_path): + :param pop_raster_path: Path to the raster file that contains the population information. + :type pop_raster_path: str - pop_by_nut = pop_by_nut.to_crs(self.grid.shapefile.crs) - # del pop_by_nut[''] - pop_by_nut['area_in'] = pop_by_nut.geometry.area + :param nut_shapefile_path: Path to the shapefile that contains the small cities. + :type nut_shapefile_path: str - # df = gpd.overlay(pop_by_nut, grid_shp, how='intersection') - df = self.spatial_overlays(pop_by_nut, self.grid.shapefile.reset_index(), how='intersection') + :return: DataFrame with the amount of population distribution by small city. + :rtype: DataFrame + """ + spent_time = timeit.default_timer() - df.crs = self.grid.shapefile.crs - df.loc[:, 'data'] = df['data'] * (df.geometry.area / df['area_in']) - del pop_by_nut['area_in'] - if write_file: - IoShapefile(self.comm).write_shapefile_serial(df, pop_nut_cell_path) + pop_path = os.path.join(self.auxiliary_dir, 'traffic_area', 'population_small') + if not os.path.exists(pop_path): + # 1st Clip the raster + self.logger.write_log("\t\tCreating clipped population raster", message_level=3) + if self.comm.Get_rank() == 0: + pop_raster_path = IoRaster(self.comm).clip_raster_with_shapefile_poly( + pop_raster_path, self.clip.shapefile, os.path.join(self.auxiliary_dir, 'traffic_area', 'pop.tif')) + + # 2nd Raster to shapefile + self.logger.write_log("\t\tRaster to shapefile", message_level=3) + pop_shp = IoRaster(self.comm).to_shapefile_parallel( + pop_raster_path, gather=False, bcast=False, crs={'init': 'epsg:4326'}) + + # 3rd Add NUT code + self.logger.write_log("\t\tAdding nut codes to the shapefile", message_level=3) + # if self.comm.Get_rank() == 0: + pop_shp.drop(columns='CELL_ID', inplace=True) + pop_shp.rename(columns={'data': 'population'}, inplace=True) + + pop_shp = self.add_nut_code(pop_shp, nut_shapefile_path, nut_value='ORDER08') + pop_shp = pop_shp[pop_shp['nut_code'] != -999] + pop_shp = IoShapefile(self.comm).balance(pop_shp) + + # 4th Calculate percent by destiny cell + self.logger.write_log("\t\tCalculating population percentage on destiny resolution", message_level=3) + pop_shp.to_crs(self.grid.shapefile.crs, inplace=True) + pop_shp['src_inter_fraction'] = pop_shp.geometry.area + pop_shp = self.spatial_overlays(pop_shp.reset_index(), self.grid.shapefile.reset_index()) + pop_shp.drop(columns=['idx1', 'idx2', 'index'], inplace=True) + pop_shp['src_inter_fraction'] = pop_shp.geometry.area / pop_shp['src_inter_fraction'] + pop_shp['population'] = pop_shp['population'] * pop_shp['src_inter_fraction'] + pop_shp.drop(columns=['src_inter_fraction', 'nut_code'], inplace=True) + + pop_shp = IoShapefile(self.comm).gather_shapefile(pop_shp) + if self.comm.Get_rank() == 0: + popu_dist = pop_shp.groupby(['FID']).sum() + popu_dist = GeoDataFrame( + popu_dist, + geometry=self.grid.shapefile.loc[popu_dist.index.get_level_values('FID'), 'geometry'].values, + crs=self.grid.shapefile.crs) + IoShapefile(self.comm).write_shapefile_serial(popu_dist.reset_index(), pop_path) + else: + popu_dist = None + popu_dist = IoShapefile(self.comm).split_shapefile(popu_dist) else: - df = IoShapefile(self.comm).read_shapefile_serial(pop_nut_cell_path) + popu_dist = IoShapefile(self.comm).read_shapefile_parallel(pop_path) + popu_dist.set_index(['FID'], inplace=True) - self.logger.write_time_log('TrafficAreaSector', 'make_population_by_nuts_cell', - timeit.default_timer() - spent_time) - return df + self.logger.write_time_log('TrafficAreaSector', 'get_population_percent', timeit.default_timer() - spent_time) + return popu_dist - def make_vehicles_by_cell(self, pop_nut_cell, gasoline_path, total_pop_by_nut, veh_by_cell_path, - column_id='nuts3_id'): - spent_time = timeit.default_timer() + def init_evaporative(self, gasoline_path): + """ + Create the gasoline vehicle by destiny cell. - if not os.path.exists(veh_by_cell_path): + :param gasoline_path: Path to the CSV file that contains the amount of vehicles by NUTS3. + :type gasoline_path: str - total_pop_by_nut.loc[:, column_id] = total_pop_by_nut[column_id].astype(np.int16) - pop_nut_cell.loc[:, column_id] = pop_nut_cell[column_id].astype(np.int16) - - df = pd.merge(pop_nut_cell, total_pop_by_nut, left_on=column_id, right_on=column_id, how='left') + :return: Shapefile with the vehicle distribution. + :rtype: GeoDataFrame + """ + spent_time = timeit.default_timer() + veh_cell_path = os.path.join(self.auxiliary_dir, 'traffic_area', 'vehicle_by_cell') + if not os.path.exists(veh_cell_path): + veh_cell = self.make_vehicles_by_cell(gasoline_path) + IoShapefile(self.comm).write_shapefile_parallel(veh_cell.reset_index(), veh_cell_path) + else: + self.logger.write_log('\t\tReading vehicle shapefile by cell.', message_level=3) + veh_cell = IoShapefile(self.comm).read_shapefile_parallel(veh_cell_path) + veh_cell.set_index('FID', inplace=True) - df['pop_percent'] = df['data'] / df['population'] - df.drop(columns=['data', 'population'], inplace=True) + self.logger.write_time_log('TrafficAreaSector', 'init_evaporative', timeit.default_timer() - spent_time) + return veh_cell - gas_df = pd.read_csv(gasoline_path, index_col='COPERT_V_name').transpose() - vehicle_type_list = list(gas_df.columns.values) - gas_df.loc[:, column_id] = gas_df.index.astype(np.int16) + def init_small_cities(self, global_path, small_cities_shapefile): + spent_time = timeit.default_timer() + pop = self.get_population(global_path, small_cities_shapefile) - df = df.merge(gas_df, left_on=column_id, right_on=column_id, how='left') - for vehicle_type in vehicle_type_list: - df.loc[:, vehicle_type] = df[vehicle_type] * df['pop_percent'] + self.logger.write_time_log('TrafficAreaSector', 'init_small_cities', timeit.default_timer() - spent_time) + return pop - del df['pop_percent'], df[column_id] + def read_vehicles_by_nut(self, path): + spent_time = timeit.default_timer() - aux_df = df.loc[:, ['FID'] + vehicle_type_list].groupby('FID').sum() - aux_df.loc[:, 'FID'] = aux_df.index + vehicles_by_nut = pd.read_csv(path, index_col='COPERT_V_name') + vehicle_list = vehicles_by_nut.index.values + nut_list = vehicles_by_nut.columns.values.astype(np.int32) + vehicles_by_nut = pd.DataFrame(vehicles_by_nut.values.T, index=nut_list, columns=vehicle_list) + vehicles_by_nut.index.name = 'nuts3_id' - geom = self.grid.shapefile.loc[aux_df.index, 'geometry'] + self.logger.write_time_log('TrafficAreaSector', 'read_vehicles_by_nut', timeit.default_timer() - spent_time) + return vehicles_by_nut - df = gpd.GeoDataFrame(aux_df, geometry=geom, crs=pop_nut_cell.crs) - IoShapefile(self.comm).write_shapefile_serial(df, veh_by_cell_path) + def make_vehicles_by_cell(self, gasoline_path): + spent_time = timeit.default_timer() + vehicles_by_nut = self.read_vehicles_by_nut(gasoline_path) + + vehicle_list = vehicles_by_nut.columns.values + vehicle_by_cell = pd.merge(self.population_percent.reset_index(), vehicles_by_nut.reset_index(), + left_on='nut_code', right_on='nuts3_id', how='left') + vehicle_by_cell.drop(columns=['nut_code', 'nuts3_id'], inplace=True) + vehicle_by_cell[vehicle_list] = vehicle_by_cell[vehicle_list].multiply( + vehicle_by_cell['pop_percent'], axis='index') + vehicle_by_cell.drop(columns=['pop_percent'], inplace=True) + vehicle_by_cell = IoShapefile(self.comm).gather_shapefile(vehicle_by_cell, rank=0) + if self.comm.Get_rank() == 0: + vehicle_by_cell = vehicle_by_cell.groupby('FID').sum() else: - df = IoShapefile(self.comm).read_shapefile_serial(veh_by_cell_path) + vehicle_by_cell = None + vehicle_by_cell = IoShapefile(self.comm).split_shapefile(vehicle_by_cell) + + vehicle_by_cell = GeoDataFrame( + vehicle_by_cell, + geometry=self.grid.shapefile.loc[vehicle_by_cell.index.get_level_values('FID'), 'geometry'].values, + crs=self.grid.shapefile.crs) self.logger.write_time_log('TrafficAreaSector', 'make_vehicles_by_cell', timeit.default_timer() - spent_time) - return df + return vehicle_by_cell def get_profiles_from_temperature(self, temperature, default=False): spent_time = timeit.default_timer() @@ -251,6 +332,7 @@ class TrafficAreaSector(Sector): def calculate_evaporative_emissions(self): spent_time = timeit.default_timer() + self.evaporative.reset_index(inplace=True) veh_list = list(self.evaporative.columns.values) veh_list.remove('FID') veh_list.remove('geometry') @@ -285,7 +367,7 @@ class TrafficAreaSector(Sector): geom1_col='centroid', src_column='REC', axis=1) del self.evaporative['c_lat'], self.evaporative['c_lon'], self.evaporative['centroid'] IoShapefile(self.comm).write_shapefile_parallel( - self.evaporative, os.path.join(self.auxiliary_dir, 'traffic_area', 'vehicle_by_cell.shp')) + self.evaporative, os.path.join(self.auxiliary_dir, 'traffic_area', 'vehicle_by_cell')) else: del self.evaporative['c_lat'], self.evaporative['c_lon'], self.evaporative['centroid'] @@ -352,19 +434,18 @@ class TrafficAreaSector(Sector): self.logger.write_time_log('TrafficAreaSector', 'speciate_evaporative', timeit.default_timer() - spent_time) return speciated_df - def small_cities_emissions_by_population(self, df): + def small_cities_emissions_by_population(self, pop_by_cell): spent_time = timeit.default_timer() - df = df.loc[:, ['data', 'FID']].groupby('FID').sum() ef_df = pd.read_csv(self.small_cities_ef_file, sep=',') ef_df.drop(['Code', 'Copert_V_name'], axis=1, inplace=True) for pollutant in ef_df.columns.values: - df[pollutant] = df['data'] * ef_df[pollutant].iloc[0] - df.drop('data', axis=1, inplace=True) + pop_by_cell[pollutant] = pop_by_cell['population'] * ef_df[pollutant].iloc[0] + pop_by_cell.drop(columns=['population'], inplace=True) self.logger.write_time_log('TrafficAreaSector', 'small_cities_emissions_by_population', timeit.default_timer() - spent_time) - return df + return pop_by_cell def add_timezones(self, grid, default=False): from timezonefinder import TimezoneFinder @@ -475,6 +556,7 @@ class TrafficAreaSector(Sector): def calculate_emissions(self): spent_time = timeit.default_timer() + self.logger.write_log('\tCalculating traffic area.', message_level=2) if self.do_evaporative: self.logger.write_log('\tCalculating evaporative emissions.', message_level=2) diff --git a/hermesv3_bu/sectors/traffic_sector.py b/hermesv3_bu/sectors/traffic_sector.py index ee02526e08905674125ebcf3de3336300b1b62e1..6ff9e65f3cffa61b178a09600fa5ba868a0990ba 100755 --- a/hermesv3_bu/sectors/traffic_sector.py +++ b/hermesv3_bu/sectors/traffic_sector.py @@ -532,7 +532,7 @@ class TrafficSector(Sector): # It is assumed that after 48 h without rain the potential emission is equal to one dst[dst >= (1 - np.exp(- RECOVERY_RATIO * 48))] = 1. # Creates the GeoDataFrame - df = gpd.GeoDataFrame(dst.T, geometry=precipitation.geometry) + df = gpd.GeoDataFrame(dst.T, geometry=precipitation['geometry'].values) df.columns = ['PR_{0}'.format(x) for x in df.columns.values[:-1]] + ['geometry'] df.loc[:, 'REC'] = df.index @@ -1099,8 +1099,8 @@ class TrafficSector(Sector): p_factor = self.calculate_precipitation_factor(link_lons.min(), link_lons.max(), link_lats.min(), link_lats.max(), self.precipitation_path) - unary_union = p_factor.unary_union + road_link_aux['REC'] = road_link_aux.apply(self.nearest, geom_union=unary_union, df1=road_link_aux, df2=p_factor, geom1_col='centroid', src_column='REC', axis=1) road_link_aux.drop(columns=['centroid'], inplace=True) diff --git a/hermesv3_bu/tools/checker.py b/hermesv3_bu/tools/checker.py index bb43017dd1e905916eea6ef5593fb9f786dc9f3c..bdf2cdffc78c61961ae2e42393c0b06e03a9d140 100644 --- a/hermesv3_bu/tools/checker.py +++ b/hermesv3_bu/tools/checker.py @@ -34,5 +34,5 @@ def error_exit(error_message): print(error_message) print(error_message, file=sys.stderr) sys.stderr.flush() - time.sleep(2) + time.sleep(5) MPI.COMM_WORLD.Abort()