From e56b35248efc8d64c828f0768214a6356618bbdc Mon Sep 17 00:00:00 2001
From: Carles Tena
Date: Fri, 12 Jun 2020 13:51:38 +0200
Subject: [PATCH] Fixed bug for GFAS hourly in parallel

---
 .../point_gfas_hourly_emission_inventory.py | 101 ++++++++++--------
 hermesv3_gr/modules/grids/grid.py           |  99 +++++++++++++++++
 2 files changed, 153 insertions(+), 47 deletions(-)

diff --git a/hermesv3_gr/modules/emision_inventories/point_gfas_hourly_emission_inventory.py b/hermesv3_gr/modules/emision_inventories/point_gfas_hourly_emission_inventory.py
index a6b53cf..124d1c7 100755
--- a/hermesv3_gr/modules/emision_inventories/point_gfas_hourly_emission_inventory.py
+++ b/hermesv3_gr/modules/emision_inventories/point_gfas_hourly_emission_inventory.py
@@ -145,11 +145,13 @@ class PointGfasHourlyEmissionInventory(EmissionInventory):
 
             if not os.path.exists(os.path.dirname(coordinates_path)):
                 os.makedirs(os.path.dirname(coordinates_path))
-            coordinates_shapefile = os.path.join(
-                self.auxiliary_files_path, 'gfas', 'coordinates.shp'.format(settings.size, settings.rank))
-            if not os.path.exists(coordinates_shapefile):
+            coordinates_full = os.path.join(
+                self.auxiliary_files_path, 'gfas', 'coordinates_full.csv'.format(settings.size, settings.rank))
+            if not os.path.exists(coordinates_full):
+                settings.write_log('\t\tCreating GFAS coordinates shapefile. It may take a few minutes.', level=3)
                 src_area = get_grid_area(self.get_input_path(date=self.date_array[0]))
                 netcdf = Dataset(self.get_input_path(date=self.date_array[0]), mode='r')
+
                 lats = netcdf.variables['latitude'][:]
                 lons = netcdf.variables['longitude'][:]
                 netcdf.close()
@@ -172,35 +174,54 @@ class PointGfasHourlyEmissionInventory(EmissionInventory):
                 coordinates['src_area'] = src_area.flatten()
                 coordinates['geometry'] = coordinates['geometry'].apply(wkt.loads)
                 coordinates.reset_index(inplace=True)
-                coordinates.to_file(coordinates_shapefile)
-            else:
-                coordinates = gpd.read_file(coordinates_shapefile)
-            coordinates = gpd.sjoin(coordinates,
-                                    self.grid.to_shapefile(full_grid=True).to_crs(coordinates.crs),
-                                    how='inner', op='within')
+                # Too time consuming
+                # coordinates.to_file(coordinates_shapefile)
+                settings.write_log('\t\t\tDone!', level=3)
 
-            coordinates.rename(columns={'FID_left': 'FID'}, inplace=True)
-            coordinates = coordinates[['FID', 'src_area', 'Cell_ID']]
-            for proc in range(settings.size):
-                if proc > 0:
-                    settings.comm.send(coordinates[coordinates['Cell_ID'].isin(self.cell_id_by_proc[proc])],
-                                       dest=proc)
-            coordinates = coordinates[coordinates['Cell_ID'].isin(self.cell_id_by_proc[0])]
+                grid = self.grid.to_shapefile(full_grid=True).to_crs(coordinates.crs)
 
-            else:
-                coordinates = settings.comm.recv(source=0)
+                temp_coords = Dataset(os.path.join(self.grid.temporal_path, 'temporal_coords.nc'), mode='r')
+
+                grid['dst_area'] = temp_coords.variables['cell_area'][:].flatten()
+                temp_coords.close()
 
-            # coordinates = coordinates[['FID', 'Cell_ID']]
-            temp_coords = Dataset(os.path.join(self.grid.temporal_path, 'temporal_coords.nc'), mode='r')
+                settings.write_log('\t\tAllocating GFAS coordinates into the Grid cells. It may take a few minutes.',
+                                   level=3)
+
+                coordinates = gpd.sjoin(coordinates, grid, how='inner', op='within')
+                settings.write_log('\t\t\tDone!', level=3)
+                coordinates.rename(columns={'FID_left': 'FID'}, inplace=True)
+
+                coordinates = coordinates[['FID', 'src_area', 'Cell_ID', 'dst_area']]
+                coordinates.to_csv(coordinates_full)
+            else:
+                coordinates = pd.read_csv(coordinates_full)
 
-            cell_area = pd.DataFrame(temp_coords.variables['cell_area'][:].flatten(), columns=['dst_area'])
-            temp_coords.close()
-            cell_area['Cell_ID'] = cell_area.index
+                # for proc in range(settings.size):
+                #     if proc > 0:
+                #         settings.comm.send(coordinates[coordinates['Cell_ID'].isin(self.cell_id_by_proc[proc])],
+                #                            dest=proc)
+                # coordinates = coordinates[coordinates['Cell_ID'].isin(self.cell_id_by_proc[0])]
 
-            coordinates = pd.merge(coordinates, cell_area, on='Cell_ID')
-            # print(coordinates)
-            # exit()
+            else:
+                settings.write_log('\t\tWaiting for the GFAS coordinates shapefile (rank 0). It may take a few minutes.',
+                                   level=3)
+                # coordinates = settings.comm.recv(source=0)
+                coordinates = None
+            coordinates = settings.comm.bcast(coordinates, root=0)
+            settings.write_log('\t\tGFAS coordinates received.', level=3)
+
+            coordinates = coordinates[coordinates['Cell_ID'].isin(self.cell_id_by_proc[settings.rank])]
+            # # coordinates = coordinates[['FID', 'Cell_ID']]
+            # temp_coords = Dataset(os.path.join(self.grid.temporal_path, 'temporal_coords.nc'), mode='r')
+            #
+            # cell_area = pd.DataFrame(temp_coords.variables['cell_area'][:].flatten(), columns=['dst_area'])
+            # temp_coords.close()
+            # cell_area['Cell_ID'] = cell_area.index
+            #
+            # coordinates = pd.merge(coordinates, cell_area, on='Cell_ID')
+            sys.stdout.flush()
 
             coordinates.to_csv(coordinates_path)
         else:
            coordinates = pd.read_csv(coordinates_path)
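The hunk above replaces the old rank-0 send/recv distribution of the GFAS coordinates with a single collective broadcast: rank 0 builds (or re-reads) the full coordinates table, every rank receives it through `settings.comm.bcast`, and each rank then keeps only the cells listed in `cell_id_by_proc` for its own rank. A minimal, self-contained sketch of that pattern, assuming mpi4py and pandas and using made-up data rather than the real HERMES objects:

```python
"""Rank 0 builds an expensive table once; everyone else receives it by broadcast
and keeps only its own cells. Illustrative only -- the data and the ownership
rule are invented (run e.g. with `mpiexec -n 4 python sketch.py`)."""
import pandas as pd
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    # Stand-in for the shapefile creation / spatial join done only on rank 0.
    coordinates = pd.DataFrame({'FID': range(8), 'Cell_ID': range(8)})
else:
    coordinates = None

# One collective call replaces the per-rank send/recv loop.
coordinates = comm.bcast(coordinates, root=0)

# Each rank keeps only the cells assigned to it (round-robin here for brevity).
own_cells = [c for c in coordinates['Cell_ID'] if c % size == rank]
coordinates = coordinates[coordinates['Cell_ID'].isin(own_cells)]
print(rank, coordinates['Cell_ID'].tolist())
```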
@@ -325,7 +346,6 @@ class PointGfasHourlyEmissionInventory(EmissionInventory):
 
         # One NetCDF for time step (hour)
         for tstep, date in enumerate(self.date_array):
-            # print(date, self.get_input_path(date=date))
             shapefile_list_hour = []
             netcdf = Dataset(self.get_input_path(date=date), mode='r')
             for pollutant in self.input_pollutants:
@@ -359,29 +379,17 @@
         shapefile = balance_dataframe(settings.comm, shapefile)
         shapefile.fillna(0.0, inplace=True)
-        # print(timeit.default_timer() - st_time)
-        # shapefile = shapefile.reset_index().set_index(['FID', 'tstep'])
-
         self.emissions = shapefile
 
         return True
 
     def distribute(self, emissions):
-        emissions_list = []
-        # print('distributing')
-        for proc in range(settings.size):
-            partial_emis = emissions[emissions['FID'].isin(sorted(np.unique(self.fid_distribution[proc])))].copy()
-            if settings.rank < proc:
-                settings.comm.send(partial_emis, dest=proc)
-                recv_partial_emis = settings.comm.recv(source=proc)
-            elif settings.rank > proc:
-                recv_partial_emis = settings.comm.recv(source=proc)
-                settings.comm.send(partial_emis, dest=proc)
-            else:
-                recv_partial_emis = partial_emis
-            emissions_list.append(recv_partial_emis)
-
-        emissions = pd.concat(emissions_list)
+        settings.write_log("\t\tCalculating distribution.", level=3)
+        total_emis = settings.comm.gather(emissions, root=0)
+        if settings.rank == 0:
+            total_emis = pd.concat(total_emis)
+        total_emis = settings.comm.bcast(total_emis, root=0)
+        emissions = total_emis[total_emis['FID'].isin(sorted(np.unique(self.fid_distribution[settings.rank])))]
 
         emissions = pd.merge(emissions, self.coordinates.reset_index(), on='FID')
@@ -428,14 +436,13 @@ class PointGfasHourlyEmissionInventory(EmissionInventory):
             warnings.warn('WARNING: One or more fires have an altitude of fire emission injection higher than the top' +
                           ' layer of the model defined in the {0} file'.format(vertical_description_path))
-            # print(self.emissions.loc[~self.emissions['altitude'].isna()])
         del self.emissions['altitude']
 
         self.emissions = self.emissions.groupby(['FID', 'tstep', 'layer']).sum()
         self.emissions.reset_index(inplace=True)
 
         self.emissions = self.vertical.distribute_vertically(self.emissions, self.input_pollutants)
-
+        settings.write_log("\t\t\tVertical allocation done!", level=3)
         settings.write_time('PointGfasHourlyEmissionInventory', 'calculate_altitudes',
                             timeit.default_timer() - st_time, level=2)
 
         return True
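The rewritten `distribute()` above follows the same collective pattern: every rank's partial emissions are gathered on rank 0, concatenated into one table, broadcast back, and each rank then keeps the FIDs assigned to it by `fid_distribution`. A small sketch of that gather/concat/bcast/filter sequence, again assuming mpi4py and pandas; the FID ownership rule below (modulo the rank) is invented for the example:

```python
"""Gather partial DataFrames on rank 0, merge them, broadcast the result and
let each rank keep the rows it owns. Illustrative only; not the HERMES code."""
import pandas as pd
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Each rank starts with its own partial emissions table.
emissions = pd.DataFrame({'FID': [rank, rank + size], 'emis': [1.0, 2.0]})

# Collect every partial table on the master rank and concatenate them.
total_emis = comm.gather(emissions, root=0)
if rank == 0:
    total_emis = pd.concat(total_emis)

# Broadcast the merged table; every rank filters it down to its own FIDs.
total_emis = comm.bcast(total_emis, root=0)
owned_fids = [f for f in total_emis['FID'].unique() if f % size == rank]
emissions = total_emis[total_emis['FID'].isin(owned_fids)]
print(rank, emissions['FID'].tolist())
```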
diff --git a/hermesv3_gr/modules/grids/grid.py b/hermesv3_gr/modules/grids/grid.py
index a4784dd..c43d290 100755
--- a/hermesv3_gr/modules/grids/grid.py
+++ b/hermesv3_gr/modules/grids/grid.py
@@ -21,6 +21,7 @@
 import os
 import sys
 import timeit
+import time
 import numpy as np
 import ESMF
 import hermesv3_gr.config.settings as settings
@@ -352,6 +353,104 @@ class Grid(object):
         import pandas as pd
         from shapely.geometry import Polygon
 
+        st_time = timeit.default_timer()
+        settings.write_log('\t\tGetting grid shapefile', level=3)
+
+        if full_grid:
+            self.shapefile_path = os.path.join(self.temporal_path, 'shapefile')
+        else:
+            self.shapefile_path = os.path.join(self.temporal_path, 'shapefiles_n{0}'.format(settings.size))
+
+        if not os.path.exists(self.shapefile_path):
+            if settings.rank == 0:
+                os.makedirs(self.shapefile_path)
+            else:
+                time.sleep(15)
+        if full_grid:
+            self.shapefile_path = os.path.join(self.shapefile_path, 'grid_shapefile.shp')
+        else:
+            self.shapefile_path = os.path.join(self.shapefile_path, 'grid_shapefile_{0}.shp'.format(settings.rank))
+
+        done = self.is_shapefile()
+        # print('Rank {0}: {1} {2}'.format(self.comm.Get_rank(), done, self.shapefile_path))
+        sys.stdout.flush()
+        if not done:
+            settings.write_log('\t\t\tGrid shapefile not done. Let\'s try to create it.', level=3)
+            # Create Shapefile
+
+            y = self.boundary_latitudes
+            x = self.boundary_longitudes
+
+            if self.grid_type in ['global', 'regular']:
+                x = x.reshape((x.shape[1], x.shape[2]))
+                y = y.reshape((y.shape[1], y.shape[2]))
+
+                aux_shape = (y.shape[0], x.shape[0], 4)
+                x_aux = np.empty(aux_shape)
+                x_aux[:, :, 0] = x[np.newaxis, :, 0]
+                x_aux[:, :, 1] = x[np.newaxis, :, 1]
+                x_aux[:, :, 2] = x[np.newaxis, :, 1]
+                x_aux[:, :, 3] = x[np.newaxis, :, 0]
+
+                x = x_aux
+                del x_aux
+
+                y_aux = np.empty(aux_shape)
+                y_aux[:, :, 0] = y[:, np.newaxis, 0]
+                y_aux[:, :, 1] = y[:, np.newaxis, 0]
+                y_aux[:, :, 2] = y[:, np.newaxis, 1]
+                y_aux[:, :, 3] = y[:, np.newaxis, 1]
+
+                y = y_aux
+                del y_aux
+
+            if not full_grid:
+                y = y[self.x_lower_bound:self.x_upper_bound, self.y_lower_bound:self.y_upper_bound, :]
+                x = x[self.x_lower_bound:self.x_upper_bound, self.y_lower_bound:self.y_upper_bound, :]
+
+            aux_b_lats = y.reshape((y.shape[0] * y.shape[1], y.shape[2]))
+            aux_b_lons = x.reshape((x.shape[0] * x.shape[1], x.shape[2]))
+
+            gdf = gpd.GeoDataFrame(index=range(aux_b_lons.shape[0]), crs={'init': 'epsg:4326'})
+
+            gdf['geometry'] = None
+            # Create one dataframe with 8 columns, 4 points with two coordinates each one
+            for i in range(aux_b_lons.shape[0]):
+
+                gdf.loc[i, 'geometry'] = Polygon([(aux_b_lons[i, 0], aux_b_lats[i, 0]),
+                                                  (aux_b_lons[i, 1], aux_b_lats[i, 1]),
+                                                  (aux_b_lons[i, 2], aux_b_lats[i, 2]),
+                                                  (aux_b_lons[i, 3], aux_b_lats[i, 3]),
+                                                  (aux_b_lons[i, 0], aux_b_lats[i, 0])])
+                if i % 1000 == 0:
+                    settings.write_log('\t\t\t\t{0:.2f}%'.format((i+1)*100/aux_b_lons.shape[0]), level=3)
+
+            # Calculating Cell ID
+            index = np.array(range(self.full_shape[2] * self.full_shape[3]))
+            index = index.reshape((self.full_shape[2], self.full_shape[3]))
+            if not full_grid:
+                index = index[self.x_lower_bound:self.x_upper_bound, self.y_lower_bound:self.y_upper_bound]
+            gdf['Cell_ID'] = index.flatten()
+
+            gdf = gdf.to_crs(self.crs)
+
+            gdf['FID'] = gdf.index
+            settings.write_log('\t\t\t\t100.00%', level=3)
+            gdf.to_file(self.shapefile_path)
+        else:
+            settings.write_log('\t\t\tGrid shapefile already done. Let\'s try to read it.', level=3)
+            gdf = gpd.read_file(self.shapefile_path)
+            # gdf.set_index('FID', inplace=True)
+
+        settings.write_time('Grid', 'to_shapefile', timeit.default_timer() - st_time, level=1)
+
+        return gdf
+
+    def to_shapefile_old(self, full_grid=True):
+        import geopandas as gpd
+        import pandas as pd
+        from shapely.geometry import Polygon
+
         st_time = timeit.default_timer()
         # settings.write_log('\t\tGetting grid shapefile', level=3)
 
--
GitLab
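The new `to_shapefile` above builds one `shapely` Polygon per grid cell from four corner longitudes/latitudes and collects them, together with a flattened `Cell_ID`, in a GeoDataFrame. A self-contained sketch of that construction on a hand-made 2x2 grid, assuming geopandas and shapely are installed (the real code derives the corners from `boundary_longitudes`/`boundary_latitudes` and reprojects to the grid CRS):

```python
"""Build cell polygons from per-cell corner coordinates, as the patched
to_shapefile does, but on a tiny invented grid. Illustrative only."""
import numpy as np
import geopandas as gpd
from shapely.geometry import Polygon

# One row per cell, four corners per cell (counter-clockwise), mirroring the
# aux_b_lons / aux_b_lats arrays of the patch.
lons = np.array([[0., 1., 1., 0.], [1., 2., 2., 1.], [0., 1., 1., 0.], [1., 2., 2., 1.]])
lats = np.array([[0., 0., 1., 1.], [0., 0., 1., 1.], [1., 1., 2., 2.], [1., 1., 2., 2.]])

polygons = [Polygon([(lon[0], lat[0]), (lon[1], lat[1]), (lon[2], lat[2]),
                     (lon[3], lat[3]), (lon[0], lat[0])])
            for lon, lat in zip(lons, lats)]

# One polygon per cell plus the flattened Cell_ID later used to split work by rank.
gdf = gpd.GeoDataFrame({'Cell_ID': range(len(polygons))}, geometry=polygons, crs='EPSG:4326')
print(gdf)
```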
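One detail of the new `to_shapefile` is how the shapefile directory is created in parallel: rank 0 calls `os.makedirs` while every other rank sleeps for a fixed 15 seconds. The patch keeps that approach; purely as an illustration of the underlying coordination (not part of the patch), the sketch below expresses the same idea with an MPI barrier, assuming mpi4py, so that no rank proceeds until the directory actually exists:

```python
"""Alternative sketch, not from the patch: synchronise directory creation with a
barrier instead of a fixed sleep. The path below is a made-up example."""
import os
from mpi4py import MPI

comm = MPI.COMM_WORLD


def make_shared_dir(path):
    # Only rank 0 creates the directory ...
    if comm.Get_rank() == 0 and not os.path.exists(path):
        os.makedirs(path)
    # ... and every rank waits here until it is guaranteed to exist.
    comm.Barrier()
    return path


print(make_shared_dir('/tmp/hermes_grid_shapefile_example'))
```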