From 7e261bbbf1cef38b6cc228f35509314915281f5e Mon Sep 17 00:00:00 2001 From: ctena Date: Tue, 7 Mar 2023 18:18:16 +0100 Subject: [PATCH 1/3] Horizontal Interpolation Conservative improvement usage of memory --- nes/methods/horizontal_interpolation.py | 103 ++++++++++---------- tests/3.3-test_horiz_interp_conservative.py | 26 +++-- 2 files changed, 73 insertions(+), 56 deletions(-) diff --git a/nes/methods/horizontal_interpolation.py b/nes/methods/horizontal_interpolation.py index 897327f..58a2abc 100644 --- a/nes/methods/horizontal_interpolation.py +++ b/nes/methods/horizontal_interpolation.py @@ -4,6 +4,7 @@ import sys import warnings import numpy as np import pandas as pd +from geopandas import GeoSeries import os import nes from mpi4py import MPI @@ -13,6 +14,8 @@ from datetime import datetime from warnings import warn import copy import pyproj +import gc +import psutil # CONSTANTS NEAREST_OPTS = ['NearestNeighbour', 'NearestNeighbours', 'nn', 'NN'] @@ -33,7 +36,7 @@ def interpolate_horizontal(self, dst_grid, weight_matrix_path=None, kind='Neares weight_matrix_path : str, None Path to the weight matrix to read/create. kind : str - Kind of horizontal methods. Accepted values: ['NearestNeighbour', 'Conservative']. + Kind of horizontal interpolation. Accepted values: ['NearestNeighbour', 'Conservative']. n_neighbours : int Used if kind == NearestNeighbour. Number of nearest neighbours to interpolate. Default: 4. info : bool @@ -94,7 +97,7 @@ def interpolate_horizontal(self, dst_grid, weight_matrix_path=None, kind='Neares # Apply weights for var_name, var_info in self.variables.items(): if info and self.master: - print("\t{var} horizontal methods".format(var=var_name)) + print("\t{var} horizontal interpolation".format(var=var_name)) sys.stdout.flush() src_shape = var_info['data'].shape if isinstance(dst_grid, nes.PointsNes): @@ -207,7 +210,7 @@ def get_weights_idx_t_axis(self, dst_grid, weight_matrix_path, kind, n_neighbour weight_matrix_path : str, None Path to the weight matrix to read/create. kind : str - Kind of horizontal methods. Accepted values: ['NearestNeighbour', 'Conservative']. + Kind of horizontal interpolation. Accepted values: ['NearestNeighbour', 'Conservative']. n_neighbours : int Used if kind == NearestNeighbour. Number of nearest neighbours to interpolate. Default: 4. only_create : bool @@ -301,7 +304,7 @@ def get_weights_idx_xy_axis(self, dst_grid, weight_matrix_path, kind, n_neighbou weight_matrix_path : str, None Path to the weight matrix to read/create. kind : str - Kind of horizontal methods. Accepted values: ['NearestNeighbour', 'Conservative']. + Kind of horizontal interpolation. Accepted values: ['NearestNeighbour', 'Conservative']. n_neighbours : int Used if kind == NearestNeighbour. Number of nearest neighbours to interpolate. Default: 4. 
only_create : bool @@ -540,83 +543,83 @@ def create_area_conservative_weight_matrix(self, dst_nes, wm_path=None, info=Fal if info and self.master: print("\tCreating area conservative Weight Matrix") sys.stdout.flush() + + my_crs = pyproj.CRS.from_proj4("+proj=latlon") # Common projection for both shapefiles + # Get a portion of the destiny grid if dst_nes.shapefile is None: dst_nes.create_shapefile() dst_grid = copy.deepcopy(dst_nes.shapefile) - # Get the complete source grid + # Formatting Destination grid + dst_grid.to_crs(crs=my_crs, inplace=True) + dst_grid['FID_dst'] = dst_grid.index + + # Preparing Source grid if self.shapefile is None: self.create_shapefile() src_grid = copy.deepcopy(self.shapefile) - if self.parallel_method == 'T': - # All process has the same shapefile - pass - else: + # Formatting Source grid + src_grid.to_crs(crs=my_crs, inplace=True) + + # Serialize index intersection function to avoid memory problems + if self.size > 1 and self.parallel_method != 'T': src_grid = self.comm.gather(src_grid, root=0) + dst_grid = self.comm.gather(dst_grid, root=0) if self.master: src_grid = pd.concat(src_grid) - src_grid = self.comm.bcast(src_grid) - - my_crs = pyproj.CRS.from_proj4("+proj=latlon") - # Normalizing projections - dst_grid.to_crs(crs=my_crs, inplace=True) - dst_grid['FID_dst'] = dst_grid.index - dst_grid = dst_grid.reset_index() - - # src_grid.to_crs(crs=pyproj.Proj(proj='geocent', ellps='WGS84', datum='WGS84').crs, inplace=True) - src_grid.to_crs(crs=my_crs, inplace=True) - src_grid['FID_src'] = src_grid.index + dst_grid = pd.concat(dst_grid) + if self.master: + src_grid['FID_src'] = src_grid.index + src_grid = src_grid.reset_index() + dst_grid = dst_grid.reset_index() + fid_src, fid_dst = dst_grid.sindex.query_bulk(src_grid.geometry, predicate='intersects') + + # Calculate intersected areas and fractions + intersection_df = pd.DataFrame(columns=["FID_src", "FID_dst"]) + + intersection_df['FID_src'] = np.array(src_grid.loc[fid_src, 'FID_src'], dtype=np.uint32) + intersection_df['FID_dst'] = np.array(dst_grid.loc[fid_dst, 'FID_dst'], dtype=np.uint32) + + intersection_df['geometry_src'] = src_grid.loc[fid_src, 'geometry'].values + intersection_df['geometry_dst'] = dst_grid.loc[fid_dst, 'geometry'].values + del src_grid, dst_grid, fid_src, fid_dst + # Split the array into smaller arrays in order to scatter the data among the processes + intersection_df = np.array_split(intersection_df, self.size) + else: + intersection_df = None - src_grid = src_grid.reset_index() + intersection_df = self.comm.scatter(intersection_df, root=0) if info and self.master: print("\t\tGrids created and ready to interpolate") sys.stdout.flush() - - # Get intersected areas - inp, res = dst_grid.sindex.query_bulk(src_grid.geometry, predicate='intersects') - - # Calculate intersected areas and fractions - intersection = pd.DataFrame(columns=["FID_src", "FID_dst"]) - intersection['INP'] = np.array(inp, dtype=np.uint32) - intersection['RES'] = np.array(res, dtype=np.uint32) - intersection['FID_src'] = np.array(src_grid.loc[inp, 'FID_src'], dtype=np.uint32) - intersection['FID_dst'] = np.array(dst_grid.loc[res, 'FID_dst'], dtype=np.uint32) - - intersection['geometry_src'] = src_grid.loc[inp, 'geometry'].values - intersection['geometry_dst'] = dst_grid.loc[res, 'geometry'].values - if True: # No Warnings Zone warnings.filterwarnings('ignore') - intersection['intersect_area'] = intersection.apply( - lambda x: x['geometry_src'].intersection(x['geometry_dst']).buffer(0).area, axis=1) - 
intersection.drop(intersection[intersection['intersect_area'] <= 0].index, inplace=True) - - intersection["src_area"] = src_grid.loc[intersection['FID_src'], 'geometry'].area.values + intersection_df['weight'] = np.array(intersection_df.apply( + lambda x: x['geometry_src'].intersection(x['geometry_dst']).buffer(0).area / x['geometry_src'].area, + axis=1), dtype=np.float64) + intersection_df.drop(columns=["geometry_src", "geometry_dst"], inplace=True) + gc.collect() warnings.filterwarnings('default') - intersection['weight'] = intersection['intersect_area'] / intersection["src_area"] - # Format & Clean - intersection.drop(columns=["geometry_src", "geometry_dst", "src_area", "intersect_area"], inplace=True) - if info and self.master: print("\t\tWeights calculated. Formatting weight matrix.") sys.stdout.flush() # Initialising weight matrix if self.parallel_method != 'T': - intersection = self.comm.gather(intersection, root=0) + intersection_df = self.comm.gather(intersection_df, root=0) if self.master: if self.parallel_method != 'T': - intersection = pd.concat(intersection) - intersection = intersection.set_index(['FID_dst', intersection.groupby('FID_dst').cumcount()]).rename_axis( + intersection_df = pd.concat(intersection_df) + intersection_df = intersection_df.set_index(['FID_dst', intersection_df.groupby('FID_dst').cumcount()]).rename_axis( ('FID', 'level')).sort_index() - intersection.rename(columns={"FID_src": "idx"}, inplace=True) + intersection_df.rename(columns={"FID_src": "idx"}, inplace=True) weight_matrix = dst_nes.copy() weight_matrix.time = [datetime(year=2000, month=1, day=1, hour=0, second=0, microsecond=0)] weight_matrix._time = [datetime(year=2000, month=1, day=1, hour=0, second=0, microsecond=0)] @@ -629,7 +632,7 @@ def create_area_conservative_weight_matrix(self, dst_nes, wm_path=None, info=Fal weight_matrix.set_communicator(MPI.COMM_SELF) - weight_matrix.set_levels({'data': np.arange(intersection.index.get_level_values('level').max() + 1), + weight_matrix.set_levels({'data': np.arange(intersection_df.index.get_level_values('level').max() + 1), 'dimensions': ('lev',), 'units': '', 'positive': 'up'}) @@ -653,7 +656,7 @@ def create_area_conservative_weight_matrix(self, dst_nes, wm_path=None, info=Fal # Filling Weight matrix variables for aux_lev in weight_matrix.lev['data']: - aux_data = intersection.xs(level='level', key=aux_lev) + aux_data = intersection_df.xs(level='level', key=aux_lev) weight_matrix.variables['weight']['data'][0, aux_lev, aux_data.index] = aux_data.loc[:, 'weight'].values weight_matrix.variables['idx']['data'][0, aux_lev, aux_data.index] = aux_data.loc[:, 'idx'].values # Re-shaping diff --git a/tests/3.3-test_horiz_interp_conservative.py b/tests/3.3-test_horiz_interp_conservative.py index 5198124..a10f27d 100644 --- a/tests/3.3-test_horiz_interp_conservative.py +++ b/tests/3.3-test_horiz_interp_conservative.py @@ -16,9 +16,12 @@ result_path = "Times_test_3.3_horiz_interp_conservative.py_{0}_{1:03d}.csv".form result = pd.DataFrame(index=['read', 'calculate', 'write'], columns=['3.3.1.Only interp', '3.3.2.Create_WM', "3.3.3.Use_WM", "3.3.4.Read_WM"]) -# NAMEE src_path = "/gpfs/projects/bsc32/models/NES_tutorial_data/MONARCH_d01_2022111512.nc" +src_type = 'NAMEE' var_list = ['O3'] +# src_path = "/gpfs/projects/bsc32/models/NES_tutorial_data/nox_no_201505.nc" +# src_type = 'CAMS_glob_antv21' +# var_list = ['nox_no'] # ====================================================================================================================== # 
====================================== Only interp ===================================================== @@ -52,13 +55,15 @@ y_0 = -797137.125 dst_nes = create_nes(comm=None, info=False, projection='lcc', lat_1=lat_1, lat_2=lat_2, lon_0=lon_0, lat_0=lat_0, nx=nx, ny=ny, inc_x=inc_x, inc_y=inc_y, x_0=x_0, y_0=y_0, parallel_method=parallel_method, times=src_nes.get_full_times()) +dst_type = "IP" + comm.Barrier() result.loc['read', test_name] = timeit.default_timer() - st_time st_time = timeit.default_timer() # INTERPOLATE -interp_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative') +interp_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative', info=False) # interp_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative', weight_matrix_path='T_WM.nc') comm.Barrier() result.loc['calculate', test_name] = timeit.default_timer() - st_time @@ -104,18 +109,21 @@ y_0 = -797137.125 dst_nes = create_nes(comm=None, info=False, projection='lcc', lat_1=lat_1, lat_2=lat_2, lon_0=lon_0, lat_0=lat_0, nx=nx, ny=ny, inc_x=inc_x, inc_y=inc_y, x_0=x_0, y_0=y_0, parallel_method=parallel_method, times=src_nes.get_full_times()) +dst_type = "IP" + comm.Barrier() result.loc['read', test_name] = timeit.default_timer() - st_time # Cleaning WM -if os.path.exists("CONS_WM_NAMEE_to_IP.nc") and rank == 0: - os.remove("CONS_WM_NAMEE_to_IP.nc") +if os.path.exists("CONS_WM_{0}_to_{1}.nc".format(src_type, dst_type)) and rank == 0: + os.remove("CONS_WM_{0}_to_{1}.nc".format(src_type, dst_type)) comm.Barrier() # INTERPOLATE st_time = timeit.default_timer() wm_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative', info=True, - weight_matrix_path="CONS_WM_NAMEE_to_IP.nc", only_create_wm=True) + weight_matrix_path="CONS_WM_{0}_to_{1}.nc".format(src_type, dst_type), + only_create_wm=True) comm.Barrier() result.loc['calculate', test_name] = timeit.default_timer() - st_time @@ -160,6 +168,8 @@ y_0 = -797137.125 dst_nes = create_nes(comm=None, info=False, projection='lcc', lat_1=lat_1, lat_2=lat_2, lon_0=lon_0, lat_0=lat_0, nx=nx, ny=ny, inc_x=inc_x, inc_y=inc_y, x_0=x_0, y_0=y_0, parallel_method=parallel_method, times=src_nes.get_full_times()) +dst_type = "IP" + comm.Barrier() result.loc['read', test_name] = timeit.default_timer() - st_time @@ -210,12 +220,15 @@ y_0 = -797137.125 dst_nes = create_nes(comm=None, info=False, projection='lcc', lat_1=lat_1, lat_2=lat_2, lon_0=lon_0, lat_0=lat_0, nx=nx, ny=ny, inc_x=inc_x, inc_y=inc_y, x_0=x_0, y_0=y_0, parallel_method=parallel_method, times=src_nes.get_full_times()) +dst_type = "IP" + comm.Barrier() result.loc['read', test_name] = timeit.default_timer() - st_time # INTERPOLATE st_time = timeit.default_timer() -interp_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative', weight_matrix_path="CONS_WM_NAMEE_to_IP.nc") +interp_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative', + weight_matrix_path="CONS_WM_{0}_to_{1}.nc".format(src_type, dst_type)) comm.Barrier() result.loc['calculate', test_name] = timeit.default_timer() - st_time @@ -232,3 +245,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") -- GitLab From 70331ec01f8897891bf6b31716a606ef7d0a7e10 Mon Sep 17 00:00:00 2001 From: ctena Date: Tue, 7 Mar 2023 18:20:52 +0100 Subject: [PATCH 2/3] Horizontal Interpolation Conservative improvement usage of memory (CHANGELOG) --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md 
b/CHANGELOG.md index 663be2a..c79027a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Changes and new features: * Bugs fixing: * Bug on `cell_methods` serial write ([#53](https://earth.bsc.es/gitlab/es/NES/-/issues/53)) + * Horizontal Interpolation Conservative: Improvement on memory usage when calculating the weight matrix ([#54](https://earth.bsc.es/gitlab/es/NES/-/issues/54)) ### 1.1.0 * Release date: 2023/03/02 -- GitLab From e03c30b9d3bd0a5c41e2efd294e02f77e22fda7f Mon Sep 17 00:00:00 2001 From: Alba Vilanova Date: Wed, 8 Mar 2023 13:16:20 +0100 Subject: [PATCH 3/3] Add final prints to tests --- tests/1.1-test_read_write_projection.py | 4 ++++ tests/1.2-test_create_projection.py | 1 + tests/1.3-test_selecting.py | 2 +- tests/2.1-test_spatial_join.py | 1 + tests/2.2-test_create_shapefile.py | 1 + tests/2.3-test_bounds.py | 1 + tests/2.4-test_cell_area.py | 2 +- tests/3.1-test_vertical_interp.py | 1 + tests/3.2-test_horiz_interp_bilinear.py | 1 + tests/4.1-test_daily_stats.py | 1 + 10 files changed, 13 insertions(+), 2 deletions(-) diff --git a/tests/1.1-test_read_write_projection.py b/tests/1.1-test_read_write_projection.py index 37fd9cd..cd47aae 100644 --- a/tests/1.1-test_read_write_projection.py +++ b/tests/1.1-test_read_write_projection.py @@ -214,3 +214,7 @@ comm.Barrier() if rank == 0: print(result.loc[:, test_name]) sys.stdout.flush() + +if rank == 0: + result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/1.2-test_create_projection.py b/tests/1.2-test_create_projection.py index f6e4125..fe6f0ba 100644 --- a/tests/1.2-test_create_projection.py +++ b/tests/1.2-test_create_projection.py @@ -187,3 +187,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/1.3-test_selecting.py b/tests/1.3-test_selecting.py index 72ab997..68106b1 100644 --- a/tests/1.3-test_selecting.py +++ b/tests/1.3-test_selecting.py @@ -181,4 +181,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) - print("TEST PASSED SUCCESSFULLY") + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/2.1-test_spatial_join.py b/tests/2.1-test_spatial_join.py index 8aedaba..2daab1e 100644 --- a/tests/2.1-test_spatial_join.py +++ b/tests/2.1-test_spatial_join.py @@ -212,3 +212,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/2.2-test_create_shapefile.py b/tests/2.2-test_create_shapefile.py index d41b593..4c44ff0 100644 --- a/tests/2.2-test_create_shapefile.py +++ b/tests/2.2-test_create_shapefile.py @@ -198,3 +198,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/2.3-test_bounds.py b/tests/2.3-test_bounds.py index 3bee2ed..3e1c85a 100644 --- a/tests/2.3-test_bounds.py +++ b/tests/2.3-test_bounds.py @@ -158,3 +158,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/2.4-test_cell_area.py b/tests/2.4-test_cell_area.py index fde0b56..49821a0 100644 --- a/tests/2.4-test_cell_area.py +++ b/tests/2.4-test_cell_area.py @@ -192,4 +192,4 @@ del nessy if rank == 0: result.to_csv(result_path) - + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/3.1-test_vertical_interp.py b/tests/3.1-test_vertical_interp.py index d7bcfed..a285675 100644 --- a/tests/3.1-test_vertical_interp.py +++ b/tests/3.1-test_vertical_interp.py @@ -63,3 +63,4 @@ sys.stdout.flush() if rank == 0: 
result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/3.2-test_horiz_interp_bilinear.py b/tests/3.2-test_horiz_interp_bilinear.py index 097085a..3018881 100644 --- a/tests/3.2-test_horiz_interp_bilinear.py +++ b/tests/3.2-test_horiz_interp_bilinear.py @@ -218,3 +218,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/4.1-test_daily_stats.py b/tests/4.1-test_daily_stats.py index dd00407..a32ed62 100644 --- a/tests/4.1-test_daily_stats.py +++ b/tests/4.1-test_daily_stats.py @@ -58,3 +58,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") -- GitLab
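
The core of patch 1 is the reworked weight-matrix construction in create_area_conservative_weight_matrix(): both shapefiles are reprojected to a common lat/lon CRS, the overlapping cell pairs are found with GeoPandas' spatial index (sindex.query_bulk with the 'intersects' predicate), each pair's weight is computed as the overlap area divided by the source-cell area, and the geometry columns are dropped as soon as the weights exist, which is where the memory saving comes from. What follows is a minimal stand-alone sketch of that step, outside NES and without MPI; the toy grids, the field, and the helper/variable names are illustrative assumptions, not part of the patch.

# Sketch of the intersection/weight step used by the conservative interpolation.
# Assumes only geopandas, shapely, numpy and pandas; grid sizes are made up.
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import box


def square_grid(x0, y0, nx, ny, step):
    # Build a GeoDataFrame of square cells (a stand-in for a NES shapefile).
    cells = [box(x0 + i * step, y0 + j * step,
                 x0 + (i + 1) * step, y0 + (j + 1) * step)
             for j in range(ny) for i in range(nx)]
    return gpd.GeoDataFrame({'geometry': cells}, crs='EPSG:4326')


# Hypothetical source (coarse) and destination (fine) grids, both in lat/lon.
src_grid = square_grid(0.0, 0.0, nx=4, ny=4, step=1.0)
dst_grid = square_grid(0.25, 0.25, nx=6, ny=6, step=0.5)
src_grid['FID_src'] = src_grid.index
dst_grid['FID_dst'] = dst_grid.index

# Spatial-index query, as in the patch: the first array indexes the input
# geometries (source cells), the second indexes the tree (destination cells).
fid_src, fid_dst = dst_grid.sindex.query_bulk(src_grid.geometry, predicate='intersects')

# Keep only the cell-pair table plus the weights, mirroring the patch, so the
# geometry columns can be released as soon as the areas are computed.
pairs = pd.DataFrame({
    'FID_src': src_grid.loc[fid_src, 'FID_src'].to_numpy(dtype=np.uint32),
    'FID_dst': dst_grid.loc[fid_dst, 'FID_dst'].to_numpy(dtype=np.uint32),
})
src_geom = src_grid.geometry.values[fid_src]
dst_geom = dst_grid.geometry.values[fid_dst]
# Weight definition from the patch: overlap area / source-cell area
# (buffer(0) cleans up possibly invalid intersection results).
pairs['weight'] = [s.intersection(d).buffer(0).area / s.area
                   for s, d in zip(src_geom, dst_geom)]
del src_geom, dst_geom

# Apply the weights to a toy field: each destination cell accumulates the
# contributions of the source cells it overlaps. This is a simplified stand-in
# for the idx/weight arrays that NES writes into the weight-matrix file.
src_field = np.arange(len(src_grid), dtype=np.float64)  # hypothetical variable
dst_field = np.zeros(len(dst_grid), dtype=np.float64)
np.add.at(dst_field,
          pairs['FID_dst'].to_numpy(),
          pairs['weight'].to_numpy() * src_field[pairs['FID_src'].to_numpy()])
print(dst_field.reshape(6, 6))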
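
The other half of the change is the parallel layout. When parallel_method != 'T', every rank gathers its portion of the source and destination shapefiles to the root process, the root computes the full intersection table once, splits it with numpy.array_split, and scatters one chunk per rank; after the per-rank weight calculation the chunks are gathered back and concatenated. Below is a minimal sketch of that gather/compute/scatter pattern with mpi4py, using an illustrative DataFrame in place of the real intersection table; run with something like `mpirun -n 4 python sketch.py`.

# Sketch of the "compute on root, scatter chunks" pattern used by the patch
# to keep per-rank memory low. The DataFrame content is an assumption.
import numpy as np
import pandas as pd
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    # Root builds the full (heavy) table once ...
    intersection_df = pd.DataFrame({
        'FID_src': np.arange(20, dtype=np.uint32),
        'FID_dst': np.arange(20, dtype=np.uint32)[::-1],
        'weight': np.random.random(20),
    })
    # ... and splits it into `size` pieces so each rank only holds its share.
    chunks = np.array_split(intersection_df, size)
else:
    chunks = None

# Each rank receives exactly one chunk to work on.
my_chunk = comm.scatter(chunks, root=0)
print("rank {0} got {1} rows".format(rank, len(my_chunk)))

# After the per-rank work, the pieces are gathered back and reassembled on
# root, which then formats the weight matrix, as in the patch.
gathered = comm.gather(my_chunk, root=0)
if rank == 0:
    full = pd.concat(gathered)
    print("root reassembled {0} rows".format(len(full)))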