diff --git a/nes/nc_projections/points_nes.py b/nes/nc_projections/points_nes.py index 1f1ef991bef644d7a08325a567bfbf2de132fb92..b1421c7ec7aa522ba91dbd9c1567d60e2aa22cd3 100644 --- a/nes/nc_projections/points_nes.py +++ b/nes/nc_projections/points_nes.py @@ -49,7 +49,7 @@ class PointsNes(Nes): (Not working) Indicates if you want to use xarray as default parallel_method : str Indicates the parallelization method that you want. Default over Y axis - accepted values: ['X'] + accepted values: ['X', 'T'] avoid_first_hours : int Number of hours to remove from first time steps. avoid_last_hours : int @@ -107,8 +107,8 @@ class PointsNes(Nes): avoid_last_hours : int Number of hours to remove from last time steps. parallel_method : str - Indicates the parallelization method that you want. Default over X axis - accepted values: ['X'] + Indicates the parallelization method that you want. Default over Y axis + accepted values: ['X', 'T'] """ new = PointsNes(comm=comm, path=path, info=info, dataset=dataset, xarray=xarray, balanced=balanced, parallel_method=parallel_method, avoid_first_hours=avoid_first_hours, @@ -358,6 +358,7 @@ class PointsNes(Nes): if self.variables is not None: for i, (var_name, var_dict) in enumerate(self.variables.items()): + if var_dict['data'] is not None: # Get dimensions when reading datasets @@ -386,7 +387,7 @@ class PointsNes(Nes): for v in var_dict['data']]).astype('S' + str(self.strlen))) except: pass - + # Ensure data is of type numpy array (to create NES) if not isinstance(var_dict['data'], (np.ndarray, np.generic)): try: @@ -444,21 +445,29 @@ class PointsNes(Nes): for att_name, att_value in var_dict.items(): if att_name == 'data': - if len(var_dict['data'].shape) == 1: + if len(att_value.shape) == 1: try: var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']] = att_value except IndexError: raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format( var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, att_value.shape)) - elif len(var_dict['data'].shape) == 2: - if 'strlen' in var_dims: + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, + att_value.shape)) + elif len(att_value.shape) == 2: + if 'strlen' in var_dict['dimensions']: try: var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], :] = att_value except IndexError: raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format( var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], :].shape, att_value.shape)) + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], :].shape, + att_value.shape)) else: try: var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], @@ -467,7 +476,12 @@ class PointsNes(Nes): raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format( var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, - att_value.shape)) + att_value.shape)) + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], + self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, + att_value.shape)) if self.print_info: print("Rank {0:03d}: Var {1} data ({2}/{3})".format(self.rank, var_name, i + 1, len(self.variables))) @@ -496,7 +510,6 @@ class PointsNes(Nes): data_list: dict Variables dictionary with all the data from all the ranks. """ - data_list = deepcopy(self.variables) for var_name, var_info in data_list.items(): try: @@ -505,20 +518,41 @@ class PointsNes(Nes): if self.rank == 0: shp_len = len(data_list[var_name]['data'].shape) if self.parallel_method == 'X': + # concatenate over station if shp_len == 1: - # concatenate over first axis (station) when dims are (station) + # dimensions = (station) axis = 0 elif shp_len == 2: - if 'dimensions' in var_info.keys() and 'strlen' in var_info['dimensions']: - # concatenate over first axis (station) when dims are (station, strlen) + if 'strlen' in var_info['dimensions']: + # dimensions = (station, strlen) axis = 0 else: - # concatenate over second axis (station) when dims are (time, station) - axis = 1 + # dimensions = (time, station) + axis = 1 + else: + msg = 'The points NetCDF must have ' + msg += 'surface values (without levels).' + raise NotImplementedError(msg) + elif self.parallel_method == 'T': + # concatenate over time + if shp_len == 1: + # dimensions = (station) + axis = None + continue + elif shp_len == 2: + if 'strlen' in var_info['dimensions']: + # dimensions = (station, strlen) + axis = None + continue + else: + # dimensions = (time, station) + axis = 0 + else: + raise NotImplementedError('The points NetCDF must only have surface values (without levels).') else: - raise NotImplementedError( + raise NotImplementedError( "Parallel method '{meth}' is not implemented. Use one of these: {accept}".format( - meth=self.parallel_method, accept=['X'])) + meth=self.parallel_method, accept=['X', 'T'])) data_list[var_name]['data'] = np.concatenate(data_aux, axis=axis) except Exception as e: print("**ERROR** an error hase occur while gathering the '{0}' variable.\n".format(var_name)) diff --git a/nes/nc_projections/points_nes_ghost.py b/nes/nc_projections/points_nes_ghost.py index 21ba416981bec269ee8860d92070c28ac94b4784..9b8ab4c75d4af6660b6fcc2eaf2dcf40b95c7f39 100644 --- a/nes/nc_projections/points_nes_ghost.py +++ b/nes/nc_projections/points_nes_ghost.py @@ -324,23 +324,34 @@ class PointsNesGHOST(PointsNes): for att_name, att_value in var_dict.items(): if att_name == 'data': - if len(var_dict['data'].shape) == 1: + print(att_value) + print(att_value.shape) + if len(att_value.shape) == 1: try: var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']] = att_value except IndexError: raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format( var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, att_value.shape)) - elif len(var_dict['data'].shape) == 2: + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape, + att_value.shape)) + elif len(att_value.shape) == 2: try: var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], self.write_axis_limits['t_min']:self.write_axis_limits['t_max']] = att_value except IndexError: raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format( var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], - self.write_axis_limits['t_min']:self.write_axis_limits['t_max']].shape, - att_value.shape)) - elif len(var_dict['data'].shape) == 3: + self.write_axis_limits['t_min']:self.write_axis_limits['t_max']].shape, + att_value.shape)) + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], + self.write_axis_limits['t_min']:self.write_axis_limits['t_max']].shape, + att_value.shape)) + elif len(att_value.shape) == 3: try: var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], @@ -348,10 +359,16 @@ class PointsNesGHOST(PointsNes): except IndexError: raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format( var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], - self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], - :].shape, + self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], + :].shape, + att_value.shape)) + except ValueError: + raise ValueError("Axis limits cannot be accessed. out_shape={0}, data_shp={1}".format( + var[self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], + self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], + :].shape, att_value.shape)) - + if self.print_info: print("Rank {0:03d}: Var {1} data ({2}/{3})".format(self.rank, var_name, i + 1, len(self.variables))) @@ -371,6 +388,34 @@ class PointsNesGHOST(PointsNes): msg += 'Variable {0} was not loaded. It will not be written.'.format(var_name) warnings.warn(msg) + def to_netcdf(self, path, compression_level=0, serial=False, info=False, chunking=False): + """ + Write the netCDF output file + + Parameters + ---------- + path : str + Path to the output netCDF file + compression_level : int + Level of compression (0 to 9) Default: 0 (no compression) + serial : bool + Indicates if you want to write in serial or not. Default: False + info : bool + Indicates if you want to print the information of each writing step by stdout Default: False + chunking : bool + Indicates if you want a chunked netCDF output. Only available with non serial writes. Default: False + """ + + if not serial: + msg = 'WARNING!!! ' + msg += 'GHOST datasets cannot be written in parallel yet.' + msg += 'Changing to serial mode.' + warnings.warn(msg) + super(PointsNesGHOST, self).to_netcdf(path, compression_level=compression_level, + serial=True, info=info, chunking=chunking) + + return None + def _gather_data(self): """ Gather all the variable data into the MPI rank 0 to perform a serial write. @@ -380,19 +425,48 @@ class PointsNesGHOST(PointsNes): data_list: dict Variables dictionary with all the data from all the ranks. """ - data_list = deepcopy(self.variables) - for var_name in data_list.keys(): + for var_name, var_info in data_list.items(): try: # noinspection PyArgumentList data_aux = self.comm.gather(data_list[var_name]['data'], root=0) if self.rank == 0: + shp_len = len(data_list[var_name]['data'].shape) + # concatenate over station if self.parallel_method == 'X': - axis = 0 + if shp_len == 1: + # dimensions = (station) + axis = 0 + elif shp_len == 2: + # dimensions = (station, strlen) or + # dimensions = (station, time) + axis = 0 + else: + msg = 'The points NetCDF must have ' + msg += 'surface values (without levels).' + raise NotImplementedError(msg) + elif self.parallel_method == 'T': + # concatenate over time + if shp_len == 1: + # dimensions = (station) + axis = None + continue + elif shp_len == 2: + if 'strlen' in var_info['dimensions']: + # dimensions = (station, strlen) + axis = None + continue + else: + # dimensions = (station, time) + axis = 1 + else: + msg = 'The points NetCDF must have ' + msg += 'surface values (without levels).' + raise NotImplementedError(msg) else: raise NotImplementedError( "Parallel method '{meth}' is not implemented. Use one of these: {accept}".format( - meth=self.parallel_method, accept=['X'])) + meth=self.parallel_method, accept=['X', 'T'])) data_list[var_name]['data'] = np.concatenate(data_aux, axis=axis) except Exception as e: print("**ERROR** an error hase occur while gathering the '{0}' variable.\n".format(var_name)) @@ -404,4 +478,4 @@ class PointsNesGHOST(PointsNes): self.comm.Abort(1) raise e - return data_list + return data_list \ No newline at end of file diff --git a/tests/2-nes_tests_by_projection.py b/tests/2-nes_tests_by_projection.py index 569bc52606baaba5e60f596db9c271d190a432e8..93b4b4734b92f93f74313317e6f12c12aff0613d 100644 --- a/tests/2-nes_tests_by_projection.py +++ b/tests/2-nes_tests_by_projection.py @@ -16,7 +16,7 @@ paths = {'regular_file': {'path': '/gpfs/scratch/bsc32/bsc32538/mr_multiplyby/or 'points_file': {'path': '/esarchive/obs/eea/eionet/hourly/pm10/pm10_202107.nc', 'projection': 'points', 'variables': [], # all - 'parallel_methods': ['X']}, + 'parallel_methods': ['X', 'T']}, 'points_ghost_file': {'path': '/gpfs/projects/bsc32/AC_cache/obs/ghost/EANET/1.4/daily/sconcso4/sconcso4_201911.nc', 'projection': 'points_ghost', 'variables': [], # all