diff --git a/CHANGELOG.md b/CHANGELOG.md index 663be2ab22b5baecb9c68ea7bc130c2a3472d6dd..c79027aa14aae72985ec10f62c7124cbe2ca493f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Changes and new features: * Bugs fixing: * Bug on `cell_methods` serial write ([#53](https://earth.bsc.es/gitlab/es/NES/-/issues/53)) + * Horizontal Interpolation Conservative: Improvement on memory usage when calculating the weight matrix ([#54](https://earth.bsc.es/gitlab/es/NES/-/issues/54)) ### 1.1.0 * Release date: 2023/03/02 diff --git a/nes/methods/horizontal_interpolation.py b/nes/methods/horizontal_interpolation.py index 897327fb04b2f6b288a939e868a5568a6e7c0518..58a2abc4448edd23b7de4b632d65957aa69c0f36 100644 --- a/nes/methods/horizontal_interpolation.py +++ b/nes/methods/horizontal_interpolation.py @@ -4,6 +4,7 @@ import sys import warnings import numpy as np import pandas as pd +from geopandas import GeoSeries import os import nes from mpi4py import MPI @@ -13,6 +14,8 @@ from datetime import datetime from warnings import warn import copy import pyproj +import gc +import psutil # CONSTANTS NEAREST_OPTS = ['NearestNeighbour', 'NearestNeighbours', 'nn', 'NN'] @@ -33,7 +36,7 @@ def interpolate_horizontal(self, dst_grid, weight_matrix_path=None, kind='Neares weight_matrix_path : str, None Path to the weight matrix to read/create. kind : str - Kind of horizontal methods. Accepted values: ['NearestNeighbour', 'Conservative']. + Kind of horizontal interpolation. Accepted values: ['NearestNeighbour', 'Conservative']. n_neighbours : int Used if kind == NearestNeighbour. Number of nearest neighbours to interpolate. Default: 4. info : bool @@ -94,7 +97,7 @@ def interpolate_horizontal(self, dst_grid, weight_matrix_path=None, kind='Neares # Apply weights for var_name, var_info in self.variables.items(): if info and self.master: - print("\t{var} horizontal methods".format(var=var_name)) + print("\t{var} horizontal interpolation".format(var=var_name)) sys.stdout.flush() src_shape = var_info['data'].shape if isinstance(dst_grid, nes.PointsNes): @@ -207,7 +210,7 @@ def get_weights_idx_t_axis(self, dst_grid, weight_matrix_path, kind, n_neighbour weight_matrix_path : str, None Path to the weight matrix to read/create. kind : str - Kind of horizontal methods. Accepted values: ['NearestNeighbour', 'Conservative']. + Kind of horizontal interpolation. Accepted values: ['NearestNeighbour', 'Conservative']. n_neighbours : int Used if kind == NearestNeighbour. Number of nearest neighbours to interpolate. Default: 4. only_create : bool @@ -301,7 +304,7 @@ def get_weights_idx_xy_axis(self, dst_grid, weight_matrix_path, kind, n_neighbou weight_matrix_path : str, None Path to the weight matrix to read/create. kind : str - Kind of horizontal methods. Accepted values: ['NearestNeighbour', 'Conservative']. + Kind of horizontal interpolation. Accepted values: ['NearestNeighbour', 'Conservative']. n_neighbours : int Used if kind == NearestNeighbour. Number of nearest neighbours to interpolate. Default: 4. only_create : bool @@ -540,83 +543,83 @@ def create_area_conservative_weight_matrix(self, dst_nes, wm_path=None, info=Fal if info and self.master: print("\tCreating area conservative Weight Matrix") sys.stdout.flush() + + my_crs = pyproj.CRS.from_proj4("+proj=latlon") # Common projection for both shapefiles + # Get a portion of the destiny grid if dst_nes.shapefile is None: dst_nes.create_shapefile() dst_grid = copy.deepcopy(dst_nes.shapefile) - # Get the complete source grid + # Formatting Destination grid + dst_grid.to_crs(crs=my_crs, inplace=True) + dst_grid['FID_dst'] = dst_grid.index + + # Preparing Source grid if self.shapefile is None: self.create_shapefile() src_grid = copy.deepcopy(self.shapefile) - if self.parallel_method == 'T': - # All process has the same shapefile - pass - else: + # Formatting Source grid + src_grid.to_crs(crs=my_crs, inplace=True) + + # Serialize index intersection function to avoid memory problems + if self.size > 1 and self.parallel_method != 'T': src_grid = self.comm.gather(src_grid, root=0) + dst_grid = self.comm.gather(dst_grid, root=0) if self.master: src_grid = pd.concat(src_grid) - src_grid = self.comm.bcast(src_grid) - - my_crs = pyproj.CRS.from_proj4("+proj=latlon") - # Normalizing projections - dst_grid.to_crs(crs=my_crs, inplace=True) - dst_grid['FID_dst'] = dst_grid.index - dst_grid = dst_grid.reset_index() - - # src_grid.to_crs(crs=pyproj.Proj(proj='geocent', ellps='WGS84', datum='WGS84').crs, inplace=True) - src_grid.to_crs(crs=my_crs, inplace=True) - src_grid['FID_src'] = src_grid.index + dst_grid = pd.concat(dst_grid) + if self.master: + src_grid['FID_src'] = src_grid.index + src_grid = src_grid.reset_index() + dst_grid = dst_grid.reset_index() + fid_src, fid_dst = dst_grid.sindex.query_bulk(src_grid.geometry, predicate='intersects') + + # Calculate intersected areas and fractions + intersection_df = pd.DataFrame(columns=["FID_src", "FID_dst"]) + + intersection_df['FID_src'] = np.array(src_grid.loc[fid_src, 'FID_src'], dtype=np.uint32) + intersection_df['FID_dst'] = np.array(dst_grid.loc[fid_dst, 'FID_dst'], dtype=np.uint32) + + intersection_df['geometry_src'] = src_grid.loc[fid_src, 'geometry'].values + intersection_df['geometry_dst'] = dst_grid.loc[fid_dst, 'geometry'].values + del src_grid, dst_grid, fid_src, fid_dst + # Split the array into smaller arrays in order to scatter the data among the processes + intersection_df = np.array_split(intersection_df, self.size) + else: + intersection_df = None - src_grid = src_grid.reset_index() + intersection_df = self.comm.scatter(intersection_df, root=0) if info and self.master: print("\t\tGrids created and ready to interpolate") sys.stdout.flush() - - # Get intersected areas - inp, res = dst_grid.sindex.query_bulk(src_grid.geometry, predicate='intersects') - - # Calculate intersected areas and fractions - intersection = pd.DataFrame(columns=["FID_src", "FID_dst"]) - intersection['INP'] = np.array(inp, dtype=np.uint32) - intersection['RES'] = np.array(res, dtype=np.uint32) - intersection['FID_src'] = np.array(src_grid.loc[inp, 'FID_src'], dtype=np.uint32) - intersection['FID_dst'] = np.array(dst_grid.loc[res, 'FID_dst'], dtype=np.uint32) - - intersection['geometry_src'] = src_grid.loc[inp, 'geometry'].values - intersection['geometry_dst'] = dst_grid.loc[res, 'geometry'].values - if True: # No Warnings Zone warnings.filterwarnings('ignore') - intersection['intersect_area'] = intersection.apply( - lambda x: x['geometry_src'].intersection(x['geometry_dst']).buffer(0).area, axis=1) - intersection.drop(intersection[intersection['intersect_area'] <= 0].index, inplace=True) - - intersection["src_area"] = src_grid.loc[intersection['FID_src'], 'geometry'].area.values + intersection_df['weight'] = np.array(intersection_df.apply( + lambda x: x['geometry_src'].intersection(x['geometry_dst']).buffer(0).area / x['geometry_src'].area, + axis=1), dtype=np.float64) + intersection_df.drop(columns=["geometry_src", "geometry_dst"], inplace=True) + gc.collect() warnings.filterwarnings('default') - intersection['weight'] = intersection['intersect_area'] / intersection["src_area"] - # Format & Clean - intersection.drop(columns=["geometry_src", "geometry_dst", "src_area", "intersect_area"], inplace=True) - if info and self.master: print("\t\tWeights calculated. Formatting weight matrix.") sys.stdout.flush() # Initialising weight matrix if self.parallel_method != 'T': - intersection = self.comm.gather(intersection, root=0) + intersection_df = self.comm.gather(intersection_df, root=0) if self.master: if self.parallel_method != 'T': - intersection = pd.concat(intersection) - intersection = intersection.set_index(['FID_dst', intersection.groupby('FID_dst').cumcount()]).rename_axis( + intersection_df = pd.concat(intersection_df) + intersection_df = intersection_df.set_index(['FID_dst', intersection_df.groupby('FID_dst').cumcount()]).rename_axis( ('FID', 'level')).sort_index() - intersection.rename(columns={"FID_src": "idx"}, inplace=True) + intersection_df.rename(columns={"FID_src": "idx"}, inplace=True) weight_matrix = dst_nes.copy() weight_matrix.time = [datetime(year=2000, month=1, day=1, hour=0, second=0, microsecond=0)] weight_matrix._time = [datetime(year=2000, month=1, day=1, hour=0, second=0, microsecond=0)] @@ -629,7 +632,7 @@ def create_area_conservative_weight_matrix(self, dst_nes, wm_path=None, info=Fal weight_matrix.set_communicator(MPI.COMM_SELF) - weight_matrix.set_levels({'data': np.arange(intersection.index.get_level_values('level').max() + 1), + weight_matrix.set_levels({'data': np.arange(intersection_df.index.get_level_values('level').max() + 1), 'dimensions': ('lev',), 'units': '', 'positive': 'up'}) @@ -653,7 +656,7 @@ def create_area_conservative_weight_matrix(self, dst_nes, wm_path=None, info=Fal # Filling Weight matrix variables for aux_lev in weight_matrix.lev['data']: - aux_data = intersection.xs(level='level', key=aux_lev) + aux_data = intersection_df.xs(level='level', key=aux_lev) weight_matrix.variables['weight']['data'][0, aux_lev, aux_data.index] = aux_data.loc[:, 'weight'].values weight_matrix.variables['idx']['data'][0, aux_lev, aux_data.index] = aux_data.loc[:, 'idx'].values # Re-shaping diff --git a/tests/1.1-test_read_write_projection.py b/tests/1.1-test_read_write_projection.py index 37fd9cd6d57cc6ebea829a624f809ee2e3fcac26..cd47aaed949f6c238b348d0ab6de3ef5051b4d01 100644 --- a/tests/1.1-test_read_write_projection.py +++ b/tests/1.1-test_read_write_projection.py @@ -214,3 +214,7 @@ comm.Barrier() if rank == 0: print(result.loc[:, test_name]) sys.stdout.flush() + +if rank == 0: + result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/1.2-test_create_projection.py b/tests/1.2-test_create_projection.py index f6e41254bcd23035df7ea9333687f311e569f142..fe6f0ba597ba1746ac721d6ad8c99ff84bb0474f 100644 --- a/tests/1.2-test_create_projection.py +++ b/tests/1.2-test_create_projection.py @@ -187,3 +187,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/1.3-test_selecting.py b/tests/1.3-test_selecting.py index 72ab997edfd221cc944309136e194ba57e04a40f..68106b1f08e32533888ae7b43887999ec98f17d2 100644 --- a/tests/1.3-test_selecting.py +++ b/tests/1.3-test_selecting.py @@ -181,4 +181,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) - print("TEST PASSED SUCCESSFULLY") + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/2.1-test_spatial_join.py b/tests/2.1-test_spatial_join.py index 8aedaba3c58dda1b7daa9da3de868f0f8621ad59..2daab1e66d0803011c917727a3e684de3433cacf 100644 --- a/tests/2.1-test_spatial_join.py +++ b/tests/2.1-test_spatial_join.py @@ -212,3 +212,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/2.2-test_create_shapefile.py b/tests/2.2-test_create_shapefile.py index d41b59304d5158ffb1f9d53f4f45615e57a41574..4c44ff01a4a8fa480411ba1b671e7b01e76c0332 100644 --- a/tests/2.2-test_create_shapefile.py +++ b/tests/2.2-test_create_shapefile.py @@ -198,3 +198,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/2.3-test_bounds.py b/tests/2.3-test_bounds.py index 3bee2ede76cee4e96dd3e9031ce4209505e292a7..3e1c85a3ae0e9eec4e0d40c8c2678f3008dcd037 100644 --- a/tests/2.3-test_bounds.py +++ b/tests/2.3-test_bounds.py @@ -158,3 +158,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/2.4-test_cell_area.py b/tests/2.4-test_cell_area.py index fde0b56a3fcb4992c7a4e3f877caeabe3c864518..49821a0f9dd22df5c0aac5f7bc1aaf48dcb46e02 100644 --- a/tests/2.4-test_cell_area.py +++ b/tests/2.4-test_cell_area.py @@ -192,4 +192,4 @@ del nessy if rank == 0: result.to_csv(result_path) - + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/3.1-test_vertical_interp.py b/tests/3.1-test_vertical_interp.py index d7bcfed201572bdbe10fb50c1c1314f8367398f8..a28567559832a7af81e38f61aa7785f17218dcfe 100644 --- a/tests/3.1-test_vertical_interp.py +++ b/tests/3.1-test_vertical_interp.py @@ -63,3 +63,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/3.2-test_horiz_interp_bilinear.py b/tests/3.2-test_horiz_interp_bilinear.py index 097085a7b85704be793ec60831419c652453dd56..30188814b9d92b7f1c71ca9066ec80be90277631 100644 --- a/tests/3.2-test_horiz_interp_bilinear.py +++ b/tests/3.2-test_horiz_interp_bilinear.py @@ -218,3 +218,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/3.3-test_horiz_interp_conservative.py b/tests/3.3-test_horiz_interp_conservative.py index 51981244a7e9a54a3cbc38defe32f4fe9f000f44..a10f27df87c8ad5282014892d4ef34efa0508c13 100644 --- a/tests/3.3-test_horiz_interp_conservative.py +++ b/tests/3.3-test_horiz_interp_conservative.py @@ -16,9 +16,12 @@ result_path = "Times_test_3.3_horiz_interp_conservative.py_{0}_{1:03d}.csv".form result = pd.DataFrame(index=['read', 'calculate', 'write'], columns=['3.3.1.Only interp', '3.3.2.Create_WM', "3.3.3.Use_WM", "3.3.4.Read_WM"]) -# NAMEE src_path = "/gpfs/projects/bsc32/models/NES_tutorial_data/MONARCH_d01_2022111512.nc" +src_type = 'NAMEE' var_list = ['O3'] +# src_path = "/gpfs/projects/bsc32/models/NES_tutorial_data/nox_no_201505.nc" +# src_type = 'CAMS_glob_antv21' +# var_list = ['nox_no'] # ====================================================================================================================== # ====================================== Only interp ===================================================== @@ -52,13 +55,15 @@ y_0 = -797137.125 dst_nes = create_nes(comm=None, info=False, projection='lcc', lat_1=lat_1, lat_2=lat_2, lon_0=lon_0, lat_0=lat_0, nx=nx, ny=ny, inc_x=inc_x, inc_y=inc_y, x_0=x_0, y_0=y_0, parallel_method=parallel_method, times=src_nes.get_full_times()) +dst_type = "IP" + comm.Barrier() result.loc['read', test_name] = timeit.default_timer() - st_time st_time = timeit.default_timer() # INTERPOLATE -interp_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative') +interp_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative', info=False) # interp_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative', weight_matrix_path='T_WM.nc') comm.Barrier() result.loc['calculate', test_name] = timeit.default_timer() - st_time @@ -104,18 +109,21 @@ y_0 = -797137.125 dst_nes = create_nes(comm=None, info=False, projection='lcc', lat_1=lat_1, lat_2=lat_2, lon_0=lon_0, lat_0=lat_0, nx=nx, ny=ny, inc_x=inc_x, inc_y=inc_y, x_0=x_0, y_0=y_0, parallel_method=parallel_method, times=src_nes.get_full_times()) +dst_type = "IP" + comm.Barrier() result.loc['read', test_name] = timeit.default_timer() - st_time # Cleaning WM -if os.path.exists("CONS_WM_NAMEE_to_IP.nc") and rank == 0: - os.remove("CONS_WM_NAMEE_to_IP.nc") +if os.path.exists("CONS_WM_{0}_to_{1}.nc".format(src_type, dst_type)) and rank == 0: + os.remove("CONS_WM_{0}_to_{1}.nc".format(src_type, dst_type)) comm.Barrier() # INTERPOLATE st_time = timeit.default_timer() wm_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative', info=True, - weight_matrix_path="CONS_WM_NAMEE_to_IP.nc", only_create_wm=True) + weight_matrix_path="CONS_WM_{0}_to_{1}.nc".format(src_type, dst_type), + only_create_wm=True) comm.Barrier() result.loc['calculate', test_name] = timeit.default_timer() - st_time @@ -160,6 +168,8 @@ y_0 = -797137.125 dst_nes = create_nes(comm=None, info=False, projection='lcc', lat_1=lat_1, lat_2=lat_2, lon_0=lon_0, lat_0=lat_0, nx=nx, ny=ny, inc_x=inc_x, inc_y=inc_y, x_0=x_0, y_0=y_0, parallel_method=parallel_method, times=src_nes.get_full_times()) +dst_type = "IP" + comm.Barrier() result.loc['read', test_name] = timeit.default_timer() - st_time @@ -210,12 +220,15 @@ y_0 = -797137.125 dst_nes = create_nes(comm=None, info=False, projection='lcc', lat_1=lat_1, lat_2=lat_2, lon_0=lon_0, lat_0=lat_0, nx=nx, ny=ny, inc_x=inc_x, inc_y=inc_y, x_0=x_0, y_0=y_0, parallel_method=parallel_method, times=src_nes.get_full_times()) +dst_type = "IP" + comm.Barrier() result.loc['read', test_name] = timeit.default_timer() - st_time # INTERPOLATE st_time = timeit.default_timer() -interp_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative', weight_matrix_path="CONS_WM_NAMEE_to_IP.nc") +interp_nes = src_nes.interpolate_horizontal(dst_grid=dst_nes, kind='Conservative', + weight_matrix_path="CONS_WM_{0}_to_{1}.nc".format(src_type, dst_type)) comm.Barrier() result.loc['calculate', test_name] = timeit.default_timer() - st_time @@ -232,3 +245,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!") diff --git a/tests/4.1-test_daily_stats.py b/tests/4.1-test_daily_stats.py index dd00407b3206e3927de1bc3eb0dd5755b7c47ea7..a32ed62482141a1cbdcded7033b267c14d0a0b1f 100644 --- a/tests/4.1-test_daily_stats.py +++ b/tests/4.1-test_daily_stats.py @@ -58,3 +58,4 @@ sys.stdout.flush() if rank == 0: result.to_csv(result_path) + print("TEST PASSED SUCCESSFULLY!!!!!")