From 4cf31c18808b89eeb08a3b5adbc437e359c1c1c4 Mon Sep 17 00:00:00 2001 From: Alba Vilanova Date: Thu, 28 Jul 2022 10:39:57 +0200 Subject: [PATCH] Fix parallelization for points NES by T --- nes/create_nes.py | 16 ++--- nes/nc_projections/points_nes.py | 90 ++++++++++++++------------ nes/nc_projections/points_nes_ghost.py | 90 +++++++++++++++++++++++--- tests/2-nes_tests_by_projection.py | 2 +- 4 files changed, 138 insertions(+), 60 deletions(-) diff --git a/nes/create_nes.py b/nes/create_nes.py index 4ff494b..88819d2 100644 --- a/nes/create_nes.py +++ b/nes/create_nes.py @@ -49,25 +49,25 @@ def create_nes(comm=None, info=False, projection=None, parallel_method='Y', bala warnings.warn(msg) if projection is None: - if parallel_method != 'X': - raise ValueError("Parallel method must be 'X' to create points NES.") - nessy = PointsNes(comm=comm, dataset=None, xarray=None, info=info, parallel_method=parallel_method, + if parallel_method != 'Y': + raise ValueError("Parallel method must be 'X' or 'T' to create points NES.") + nessy = PointsNes(comm=comm, dataset=None, xarray=False, info=info, parallel_method=parallel_method, avoid_first_hours=0, avoid_last_hours=0, first_level=0, last_level=None, balanced=balanced, - create_nes=True, strlen=75, times=times, **kwargs) + create_nes=True, strlen=strlen, times=times, **kwargs) elif projection == 'regular': - nessy = LatLonNes(comm=comm, dataset=None, xarray=None, info=info, parallel_method=parallel_method, + nessy = LatLonNes(comm=comm, dataset=None, xarray=False, info=info, parallel_method=parallel_method, avoid_first_hours=0, avoid_last_hours=0, first_level=0, last_level=None, balanced=balanced, create_nes=True, times=times, **kwargs) elif projection == 'rotated': - nessy = RotatedNes(comm=comm, dataset=None, xarray=None, info=info, parallel_method=parallel_method, + nessy = RotatedNes(comm=comm, dataset=None, xarray=False, info=info, parallel_method=parallel_method, avoid_first_hours=0, avoid_last_hours=0, first_level=0, last_level=None, balanced=balanced, create_nes=True, times=times, **kwargs) elif projection == 'lcc': - nessy = LCCNes(comm=comm, dataset=None, xarray=None, info=info, parallel_method=parallel_method, + nessy = LCCNes(comm=comm, dataset=None, xarray=False, info=info, parallel_method=parallel_method, avoid_first_hours=0, avoid_last_hours=0, first_level=0, last_level=None, balanced=balanced, create_nes=True, times=times, **kwargs) elif projection == 'mercator': - nessy = MercatorNes(comm=comm, dataset=None, xarray=None, info=info, parallel_method=parallel_method, + nessy = MercatorNes(comm=comm, dataset=None, xarray=False, info=info, parallel_method=parallel_method, avoid_first_hours=0, avoid_last_hours=0, first_level=0, last_level=None, balanced=balanced, create_nes=True, times=times, **kwargs) else: diff --git a/nes/nc_projections/points_nes.py b/nes/nc_projections/points_nes.py index 8f38699..412e3be 100644 --- a/nes/nc_projections/points_nes.py +++ b/nes/nc_projections/points_nes.py @@ -49,7 +49,7 @@ class PointsNes(Nes): (Not working) Indicates if you want to use xarray as default parallel_method : str Indicates the parallelization method that you want. Default over Y axis - accepted values: ['X'] + accepted values: ['X', 'T'] avoid_first_hours : int Number of hours to remove from first time steps. avoid_last_hours : int @@ -108,7 +108,7 @@ class PointsNes(Nes): Number of hours to remove from last time steps. parallel_method : str Indicates the parallelization method that you want. Default over Y axis - accepted values: ['X'] + accepted values: ['X', 'T'] """ new = PointsNes(comm=comm, path=path, info=info, dataset=dataset, xarray=xarray, balanced=balanced, parallel_method=parallel_method, avoid_first_hours=avoid_first_hours, @@ -358,6 +358,7 @@ class PointsNes(Nes): if self.variables is not None: for i, (var_name, var_dict) in enumerate(self.variables.items()): + if var_dict['data'] is not None: # Get dimensions when reading datasets @@ -386,7 +387,7 @@ class PointsNes(Nes): for v in var_dict['data']]).astype('S' + str(self.strlen))) except: pass - + # Ensure data is of type numpy array (to create NES) if not isinstance(var_dict['data'], (np.ndarray, np.generic)): try: @@ -444,14 +445,18 @@ class PointsNes(Nes): for att_name, att_value in var_dict.items(): if att_name == 'data': - if len(var_dict['data'].shape) == 1: + if len(att_value.shape) == 1: try: var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']] = att_value except IndexError: raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format( var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, att_value.shape)) - elif len(var_dict['data'].shape) == 2: + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, + att_value.shape)) + elif len(att_value.shape) == 2: if 'strlen' in var_dict['dimensions']: try: var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], :] = att_value @@ -459,6 +464,10 @@ class PointsNes(Nes): raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format( var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], :].shape, att_value.shape)) + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], :].shape, + att_value.shape)) else: try: var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], @@ -467,7 +476,12 @@ class PointsNes(Nes): raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format( var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, - att_value.shape)) + att_value.shape)) + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], + self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, + att_value.shape)) if self.print_info: print("Rank {0:03d}: Var {1} data ({2}/{3})".format(self.rank, var_name, i + 1, len(self.variables))) @@ -496,7 +510,6 @@ class PointsNes(Nes): data_list: dict Variables dictionary with all the data from all the ranks. """ - data_list = deepcopy(self.variables) for var_name, var_info in data_list.items(): try: @@ -505,20 +518,41 @@ class PointsNes(Nes): if self.rank == 0: shp_len = len(data_list[var_name]['data'].shape) if self.parallel_method == 'X': + # concatenate over station if shp_len == 1: - # concatenate over first axis (station) when dims are (station) + # dimensions = (station) axis = 0 elif shp_len == 2: if 'strlen' in var_info['dimensions']: - # concatenate over first axis (station) when dims are (station, strlen) + # dimensions = (station, strlen) axis = 0 else: - # concatenate over second axis (station) when dims are (time, station) - axis = 1 + # dimensions = (time, station) + axis = 1 + else: + msg = 'The points NetCDF must have ' + msg += 'surface values (without levels).' + raise NotImplementedError(msg) + elif self.parallel_method == 'T': + # concatenate over time + if shp_len == 1: + # dimensions = (station) + axis = None + continue + elif shp_len == 2: + if 'strlen' in var_info['dimensions']: + # dimensions = (station, strlen) + axis = None + continue + else: + # dimensions = (time, station) + axis = 0 + else: + raise NotImplementedError('The points NetCDF must only have surface values (without levels).') else: - raise NotImplementedError( + raise NotImplementedError( "Parallel method '{meth}' is not implemented. Use one of these: {accept}".format( - meth=self.parallel_method, accept=['X'])) + meth=self.parallel_method, accept=['X', 'T'])) data_list[var_name]['data'] = np.concatenate(data_aux, axis=axis) except Exception as e: print("**ERROR** an error hase occur while gathering the '{0}' variable.\n".format(var_name)) @@ -582,33 +616,3 @@ class PointsNes(Nes): lons[:] = self._lon['data'] return None - - def set_read_axis_limits(self): - """ - Calculate the 4D reading axis limits - - Returns - ------- - dict - Dictionary with the 2D limits of the rank data to read. - t_min, t_max, x_min and x_max - """ - - axis_limits = {'x_min': None, 'x_max': None, - 't_min': None, 't_max': None} - - if self.parallel_method == 'X': - x_len = self._lon['data'].shape[-1] - if x_len < self.size: - raise IndexError('More processors (size={0}) selected than X elements (size={1})'.format(self.size, x_len)) - axis_limits['x_min'] = (x_len // self.size) * self.rank - if self.rank + 1 < self.size: - axis_limits['x_max'] = (x_len // self.size) * (self.rank + 1) - # Spin up - axis_limits['t_min'] = self.get_time_id(self.hours_start, first=True) - axis_limits['t_max'] = self.get_time_id(self.hours_end, first=False) - else: - raise NotImplementedError("Parallel method '{meth}' is not implemented. Use one of these: {accept}".format( - meth=self.parallel_method, accept=['X'])) - - return axis_limits diff --git a/nes/nc_projections/points_nes_ghost.py b/nes/nc_projections/points_nes_ghost.py index 386e4f3..ee85935 100644 --- a/nes/nc_projections/points_nes_ghost.py +++ b/nes/nc_projections/points_nes_ghost.py @@ -286,14 +286,20 @@ class PointsNesGHOST(PointsNes): for att_name, att_value in var_dict.items(): if att_name == 'data': - if len(var_dict['data'].shape) == 1: + print(att_value) + print(att_value.shape) + if len(att_value.shape) == 1: try: var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']] = att_value except IndexError: raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format( var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, att_value.shape)) - elif len(var_dict['data'].shape) == 2: + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, + att_value.shape)) + elif len(att_value.shape) == 2: try: var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], self.write_axis_limits['t_min']:self.write_axis_limits['t_max']] = att_value @@ -302,7 +308,12 @@ class PointsNesGHOST(PointsNes): var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], self.write_axis_limits['t_min']:self.write_axis_limits['t_max']].shape, att_value.shape)) - elif len(var_dict['data'].shape) == 3: + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], + self.write_axis_limits['t_min']:self.write_axis_limits['t_max']].shape, + att_value.shape)) + elif len(att_value.shape) == 3: try: var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], @@ -313,7 +324,13 @@ class PointsNesGHOST(PointsNes): self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], :].shape, att_value.shape)) - + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], + self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], + :].shape, + att_value.shape)) + if self.print_info: print("Rank {0:03d}: Var {1} data ({2}/{3})".format(self.rank, var_name, i + 1, len(self.variables))) @@ -333,6 +350,34 @@ class PointsNesGHOST(PointsNes): msg += 'Variable {0} was not loaded. It will not be written.'.format(var_name) warnings.warn(msg) + def to_netcdf(self, path, compression_level=0, serial=False, info=False, chunking=False): + """ + Write the netCDF output file + + Parameters + ---------- + path : str + Path to the output netCDF file + compression_level : int + Level of compression (0 to 9) Default: 0 (no compression) + serial : bool + Indicates if you want to write in serial or not. Default: False + info : bool + Indicates if you want to print the information of each writing step by stdout Default: False + chunking : bool + Indicates if you want a chunked netCDF output. Only available with non serial writes. Default: False + """ + + if not serial: + msg = 'WARNING!!! ' + msg += 'GHOST datasets cannot be written in parallel yet.' + msg += 'Changing to serial mode.' + warnings.warn(msg) + super(PointsNesGHOST, self).to_netcdf(path, compression_level=compression_level, + serial=True, info=info, chunking=chunking) + + return None + def _gather_data(self): """ Gather all the variable data into the MPI rank 0 to perform a serial write. @@ -342,19 +387,48 @@ class PointsNesGHOST(PointsNes): data_list: dict Variables dictionary with all the data from all the ranks. """ - data_list = deepcopy(self.variables) - for var_name in data_list.keys(): + for var_name, var_info in data_list.items(): try: # noinspection PyArgumentList data_aux = self.comm.gather(data_list[var_name]['data'], root=0) if self.rank == 0: + shp_len = len(data_list[var_name]['data'].shape) + # concatenate over station if self.parallel_method == 'X': - axis = 0 + if shp_len == 1: + # dimensions = (station) + axis = 0 + elif shp_len == 2: + # dimensions = (station, strlen) or + # dimensions = (station, time) + axis = 0 + else: + msg = 'The points NetCDF must have ' + msg += 'surface values (without levels).' + raise NotImplementedError(msg) + elif self.parallel_method == 'T': + # concatenate over time + if shp_len == 1: + # dimensions = (station) + axis = None + continue + elif shp_len == 2: + if 'strlen' in var_info['dimensions']: + # dimensions = (station, strlen) + axis = None + continue + else: + # dimensions = (station, time) + axis = 1 + else: + msg = 'The points NetCDF must have ' + msg += 'surface values (without levels).' + raise NotImplementedError(msg) else: raise NotImplementedError( "Parallel method '{meth}' is not implemented. Use one of these: {accept}".format( - meth=self.parallel_method, accept=['X'])) + meth=self.parallel_method, accept=['X', 'T'])) data_list[var_name]['data'] = np.concatenate(data_aux, axis=axis) except Exception as e: print("**ERROR** an error hase occur while gathering the '{0}' variable.\n".format(var_name)) diff --git a/tests/2-nes_tests_by_projection.py b/tests/2-nes_tests_by_projection.py index 569bc52..93b4b47 100644 --- a/tests/2-nes_tests_by_projection.py +++ b/tests/2-nes_tests_by_projection.py @@ -16,7 +16,7 @@ paths = {'regular_file': {'path': '/gpfs/scratch/bsc32/bsc32538/mr_multiplyby/or 'points_file': {'path': '/esarchive/obs/eea/eionet/hourly/pm10/pm10_202107.nc', 'projection': 'points', 'variables': [], # all - 'parallel_methods': ['X']}, + 'parallel_methods': ['X', 'T']}, 'points_ghost_file': {'path': '/gpfs/projects/bsc32/AC_cache/obs/ghost/EANET/1.4/daily/sconcso4/sconcso4_201911.nc', 'projection': 'points_ghost', 'variables': [], # all -- GitLab