From 1a4581217567dd236e9e1d65980dad8a0e7a4905 Mon Sep 17 00:00:00 2001
From: ctena
Date: Thu, 16 Mar 2023 18:03:20 +0100
Subject: [PATCH 1/2] Write by time step done

---
 nes/nc_projections/default_nes.py            | 443 ++++++++++---------
 tests/4.3-test_write_time_step.py            | 149 +++++++
 tutorials/6.Others/6.4.WriteByTimeStep.ipynb | 158 +++++++
 3 files changed, 550 insertions(+), 200 deletions(-)
 create mode 100644 tests/4.3-test_write_time_step.py
 create mode 100644 tutorials/6.Others/6.4.WriteByTimeStep.ipynb

diff --git a/nes/nc_projections/default_nes.py b/nes/nc_projections/default_nes.py
index 060bb6a..0e4de4a 100644
--- a/nes/nc_projections/default_nes.py
+++ b/nes/nc_projections/default_nes.py
@@ -153,6 +153,7 @@ class Nes(object):
 
         # Define parallel method
         self.parallel_method = parallel_method
+        self.serial_nc = None  # Place to store temporarily the serial Nes instance
 
         # Get minor and major axes of Earth
         self.earth_radius = self.get_earth_radius('WGS84')
@@ -503,6 +504,22 @@ class Nes(object):
 
         return None
 
+    def set_time(self, time_list):
+        """
+        Modify the original time values with new ones.
+
+        Parameters
+        ----------
+        time_list : List[datetime]
+            List of time steps.
+        """
+        if self.parallel_method == 'T':
+            raise TypeError("Cannot set time on a 'T' parallel method")
+        self._time = deepcopy(time_list)
+        self.time = deepcopy(time_list)
+
+        return None
+
     def set_time_bnds(self, time_bnds):
         """
         Modify the original time bounds values with new ones.
@@ -1401,7 +1418,10 @@ class Nes(object):
         """
         Close the NetCDF with netcdf4-python.
         """
-
+        if (hasattr(self, 'serial_nc')) and (self.serial_nc is not None):
+            if self.master:
+                self.serial_nc.close()
+            self.serial_nc = None
         if (hasattr(self, 'netcdf')) and (self.netcdf is not None):
             self.netcdf.close()
             self.netcdf = None
@@ -2250,161 +2270,176 @@ class Nes(object):
         """
 
         for i, (var_name, var_dict) in enumerate(self.variables.items()):
-            if var_dict['data'] is not None:
-                if isinstance(var_dict['data'], int) and var_dict['data'] == 0:
+            if isinstance(var_dict['data'], int) and var_dict['data'] == 0:
+                var_dims = ('time', 'lev',) + self._var_dim
+                var_dtype = np.float32
+            else:
+                # Get dimensions
+                if var_dict['data'] is None or len(var_dict['data'].shape) == 4:
                     var_dims = ('time', 'lev',) + self._var_dim
-                    var_dtype = np.float32
                 else:
-                    # Get dimensions
-                    if len(var_dict['data'].shape) == 4:
-                        var_dims = ('time', 'lev',) + self._var_dim
-                    else:
-                        var_dims = self._var_dim
-
-                    # Get data type
-                    if 'dtype' in var_dict.keys():
-                        var_dtype = var_dict['dtype']
-                        if var_dtype != var_dict['data'].dtype:
-                            msg = "WARNING!!! "
-                            msg += "Different data types for variable {0}. ".format(var_name)
-                            msg += "Input dtype={0}. Data dtype={1}.".format(var_dtype, var_dict['data'].dtype)
-                            warnings.warn(msg)
-                            try:
-                                var_dict['data'] = var_dict['data'].astype(var_dtype)
-                            except Exception as e:  # TODO: Detect exception
-                                print(e)
-                                raise TypeError("It was not possible to cast the data to the input dtype.")
-                    else:
-                        var_dtype = var_dict['data'].dtype
-                        # Transform objects into strings
-                        if var_dtype == np.dtype(object):
-                            var_dict['data'] = var_dict['data'].astype(str)
-                            var_dtype = var_dict['data'].dtype
-
-                    # Convert list of strings to chars for parallelization
-                    if not np.issubdtype(var_dict['data'].dtype, np.number):
+                    var_dims = self._var_dim
+
+                # Get data type
+                if 'dtype' in var_dict.keys():
+                    var_dtype = var_dict['dtype']
+                    if var_dict['data'] is not None and var_dtype != var_dict['data'].dtype:
+                        msg = "WARNING!!! "
" + msg += "Different data types for variable {0}. ".format(var_name) + msg += "Input dtype={0}. Data dtype={1}.".format(var_dtype, var_dict['data'].dtype) + warnings.warn(msg) try: - # Get unicode - unicode_type = len(max(var_dict['data'].flatten(), key=len)) - - if ((var_dict['data'].dtype == np.dtype(' 0, complevel=self.zip_lvl) + else: + if self.balanced: + raise NotImplementedError("A balanced data cannot be chunked.") + if self.master: + chunk_size = var_dict['data'].shape + else: + chunk_size = None + chunk_size = self.comm.bcast(chunk_size, root=0) + var = netcdf.createVariable(var_name, var_dtype, var_dims, + zlib=self.zip_lvl > 0, complevel=self.zip_lvl, + chunksizes=chunk_size) + if self.info: + print("Rank {0:03d}: Var {1} created ({2}/{3})".format( + self.rank, var_name, i + 1, len(self.variables))) + if self.size > 1: + var.set_collective(True) if self.info: - print("Rank {0:03d}: Writing {1} var ({2}/{3})".format( + print("Rank {0:03d}: Var {1} collective ({2}/{3})".format( self.rank, var_name, i + 1, len(self.variables))) - try: - if not chunking: - var = netcdf.createVariable(var_name, var_dtype, var_dims, - zlib=self.zip_lvl > 0, complevel=self.zip_lvl) - else: - if self.balanced: - raise NotImplementedError("A balanced data cannot be chunked.") - if self.master: - chunk_size = var_dict['data'].shape - else: - chunk_size = None - chunk_size = self.comm.bcast(chunk_size, root=0) - var = netcdf.createVariable(var_name, var_dtype, var_dims, - zlib=self.zip_lvl > 0, complevel=self.zip_lvl, - chunksizes=chunk_size) - if self.info: - print("Rank {0:03d}: Var {1} created ({2}/{3})".format( - self.rank, var_name, i + 1, len(self.variables))) - if self.size > 1: - var.set_collective(True) + + for att_name, att_value in var_dict.items(): + if att_name == 'data': + if att_value is not None: + if self.info: + print("Rank {0:03d}: Filling {1})".format(self.rank, var_name)) + if isinstance(att_value, int) and att_value == 0: + var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], + self.write_axis_limits['z_min']:self.write_axis_limits['z_max'], + self.write_axis_limits['y_min']:self.write_axis_limits['y_max'], + self.write_axis_limits['x_min']:self.write_axis_limits['x_max']] = 0 + elif len(att_value.shape) == 4: + var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'], + self.write_axis_limits['z_min']:self.write_axis_limits['z_max'], + self.write_axis_limits['y_min']:self.write_axis_limits['y_max'], + self.write_axis_limits['x_min']:self.write_axis_limits['x_max']] = att_value + + elif len(att_value.shape) == 3: + if 'strlen' in var_dims: + var[self.write_axis_limits['y_min']:self.write_axis_limits['y_max'], + self.write_axis_limits['x_min']:self.write_axis_limits['x_max'], + :] = att_value + else: + raise NotImplementedError('It is not possible to write 3D variables.') if self.info: - print("Rank {0:03d}: Var {1} collective ({2}/{3})".format( + print("Rank {0:03d}: Var {1} data ({2}/{3})".format( self.rank, var_name, i + 1, len(self.variables))) + elif att_name not in ['chunk_size', 'var_dims', 'dimensions', 'dtype']: + var.setncattr(att_name, att_value) + + self._set_var_crs(var) + if self.info: + print("Rank {0:03d}: Var {1} completed ({2}/{3})".format( + self.rank, var_name, i + 1, len(self.variables))) + return None - for att_name, att_value in var_dict.items(): - if att_name == 'data': + def append_time_step_data(self, i_time): + """ + Fill the netCDF data for the indicated index time. 
+
+        Parameters
+        ----------
+        i_time : int
+            Index of the time step to write.
+        """
+        if self.serial_nc is not None:
+            try:
+                data = self._gather_data(self.variables)
+            except KeyError:
+                data = self.__gather_data_py_object(self.variables)
+            if self.master:
+                self.serial_nc.variables = data
+                self.serial_nc.append_time_step_data(i_time)
+            self.comm.Barrier()
+        else:
+            for i, (var_name, var_dict) in enumerate(self.variables.items()):
+                for att_name, att_value in var_dict.items():
+                    if att_name == 'data':
+                        if att_value is not None:
                             if self.info:
                                 print("Rank {0:03d}: Filling {1})".format(self.rank, var_name))
+                            var = self.netcdf.variables[var_name]
                             if isinstance(att_value, int) and att_value == 0:
-                                var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'],
+                                var[i_time,
                                     self.write_axis_limits['z_min']:self.write_axis_limits['z_max'],
                                     self.write_axis_limits['y_min']:self.write_axis_limits['y_max'],
                                     self.write_axis_limits['x_min']:self.write_axis_limits['x_max']] = 0
                             elif len(att_value.shape) == 4:
-                                try:
-                                    var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'],
-                                        self.write_axis_limits['z_min']:self.write_axis_limits['z_max'],
-                                        self.write_axis_limits['y_min']:self.write_axis_limits['y_max'],
-                                        self.write_axis_limits['x_min']:self.write_axis_limits['x_max']] = att_value
-                                except ValueError as e:
-                                    print(var)
-                                    print(att_value)
-                                    raise e
-                                # var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'],
-                                #     0,
-                                #     self.write_axis_limits['y_min']:self.write_axis_limits['y_max'],
-                                #     self.write_axis_limits['x_min']:self.write_axis_limits['x_max']] = att_value
-
-                                except IndexError:
-                                    raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format(
-                                        var[self.write_axis_limits['t_min']:self.write_axis_limits['t_max'],
-                                            self.write_axis_limits['z_min']:self.write_axis_limits['z_max'],
-                                            self.write_axis_limits['y_min']:self.write_axis_limits['y_max'],
-                                            self.write_axis_limits['x_min']:self.write_axis_limits['x_max']].shape,
-                                        att_value.shape))
+                                var[i_time,
+                                    self.write_axis_limits['z_min']:self.write_axis_limits['z_max'],
+                                    self.write_axis_limits['y_min']:self.write_axis_limits['y_max'],
+                                    self.write_axis_limits['x_min']:self.write_axis_limits['x_max']] = att_value
 
                             elif len(att_value.shape) == 3:
-                                if 'strlen' in var_dims:
-                                    try:
-                                        var[self.write_axis_limits['y_min']:self.write_axis_limits['y_max'],
-                                            self.write_axis_limits['x_min']:self.write_axis_limits['x_max'],
-                                            :] = att_value
-                                    except IndexError:
-                                        raise IndexError("Different shapes. out_shape={0}, data_shp={1}".format(
-                                            var[self.write_axis_limits['y_min']:self.write_axis_limits['y_max'],
-                                                self.write_axis_limits['x_min']:self.write_axis_limits['x_max'],
-                                                :].shape,
-                                            att_value.shape))
-                                else:
-                                    raise NotImplementedError('It is not possible to write 3D variables.')
+                                raise NotImplementedError('It is not possible to write 3D variables.')
                             if self.info:
                                 print("Rank {0:03d}: Var {1} data ({2}/{3})".format(
                                     self.rank, var_name, i + 1, len(self.variables)))
-                elif att_name not in ['chunk_size', 'var_dims', 'dimensions']:
-                    var.setncattr(att_name, att_value)
-
-                self._set_var_crs(var)
-                if self.info:
-                    print("Rank {0:03d}: Var {1} completed ({2}/{3})".format(
-                        self.rank, var_name, i + 1, len(self.variables)))
-            except Exception as e:
-                print("**ERROR** an error has occurred while writing the '{0}' variable".format(var_name))
-                # print("**ERROR** an error has occurredred while writing the '{0}' variable".format(var_name),
-                #       file=sys.stderr)
-                raise e
-            else:
-                msg = 'WARNING!!! '
-                msg += 'Variable {0} was not loaded. It will not be written.'.format(var_name)
-                warnings.warn(msg)
+                    else:
+                        # Metadata already written
+                        pass
 
         return None
 
@@ -2444,7 +2479,7 @@ class Nes(object):
 
         return None
 
-    def __to_netcdf_py(self, path, chunking=False):
+    def __to_netcdf_py(self, path, chunking=False, keep_open=False):
         """
         Create the NetCDF using netcdf4-python methods.
 
@@ -2491,15 +2526,18 @@ class Nes(object):
             netcdf.setncattr(att_name, att_value)
         netcdf.setncattr('Conventions', 'CF-1.7')
 
-        netcdf.close()
+        if keep_open:
+            self.netcdf = netcdf
+        else:
+            netcdf.close()
 
         return None
 
     def __to_netcdf_cams_ra(self, path):
         return to_netcdf_cams_ra(self, path)
 
-    def to_netcdf(self, path, compression_level=0, serial=False, info=False,
-                  chunking=False, type='NES'):
+    def to_netcdf(self, path, compression_level=0, serial=False, info=False, chunking=False, type='NES',
+                  keep_open=False):
         """
         Write the netCDF output file.
 
@@ -2521,7 +2559,7 @@ class Nes(object):
         nc_type = type
         old_info = self.info
         self.info = info
-
+        self.serial_nc = None
         self.zip_lvl = compression_level
         if self.is_xarray:
             raise NotImplementedError("Writing with xarray not implemented")
@@ -2542,15 +2580,18 @@ class Nes(object):
                 new_nc.variables = data
                 new_nc.cell_measures = c_measures
                 if type == 'NES':
-                    new_nc.__to_netcdf_py(path)
+                    new_nc.__to_netcdf_py(path, keep_open=keep_open)
                 elif type == 'CAMS_RA':
                     new_nc.__to_netcdf_cams_ra(path)
                 else:
                     raise ValueError(
                         "Unknown NetCDF type '{0}'. Use 'CAMS_RA' or 'NES'; default='NES'".format(nc_type))
+                self.serial_nc = new_nc
+            else:
+                self.serial_nc = True
         else:
             if nc_type == 'NES':
-                self.__to_netcdf_py(path, chunking=chunking)
+                self.__to_netcdf_py(path, chunking=chunking, keep_open=keep_open)
             elif nc_type == 'CAMS_RA':
                 self.__to_netcdf_cams_ra(path)
             else:
@@ -3099,66 +3140,68 @@ class Nes(object):
         for var_name in data_list.keys():
             if self.info and self.master:
                 print("Gathering {0}".format(var_name))
-            shp_len = len(data_list[var_name]['data'].shape)
-            # Collect local array sizes using the gather communication pattern
-            rank_shapes = np.array(self.comm.gather(data_list[var_name]['data'].shape, root=0))
-            sendbuf = data_list[var_name]['data'].flatten()
-            sendcounts = np.array(self.comm.gather(len(sendbuf), root=0))
-            if self.master:
-                # recvbuf = np.empty(sum(sendcounts), dtype=type(sendbuf[0]))
-                recvbuf = np.empty(sum(sendcounts), dtype=type(sendbuf.max()))
-            else:
-                recvbuf = None
-            self.comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendcounts), root=0)
-            if self.master:
-                recvbuf = np.split(recvbuf, np.cumsum(sendcounts))
-                # TODO ask
-                # I don't understand why it is giving one more split
-                if len(recvbuf) > len(sendcounts):
-                    recvbuf = recvbuf[:-1]
-                for i, shape in enumerate(rank_shapes):
-                    recvbuf[i] = recvbuf[i].reshape(shape)
-                add_dimension = False  # to Add a dimension
-                if self.parallel_method == 'Y':
-                    if shp_len == 2:
-                        # if is a 2D concatenate over first axis
-                        axis = 0
-                    elif shp_len == 3:
-                        # if is a 3D concatenate over second axis
-                        axis = 1
-                    else:
-                        # if is a 4D concatenate over third axis
-                        axis = 2
-                elif self.parallel_method == 'X':
-                    if shp_len == 2:
-                        # if is a 2D concatenate over second axis
-                        axis = 1
-                    elif shp_len == 3:
-                        # if is a 3D concatenate over third axis
-                        axis = 2
-                    else:
-                        # if is a 4D concatenate over forth axis
-                        axis = 3
-                elif self.parallel_method == 'T':
-                    if shp_len == 2:
-                        # if is a 2D add dimension
-                        add_dimension = True
-                        axis = None  # Not used
-                    elif shp_len == 3:
-                        # if is a 3D concatenate over first axis
-                        axis = 0
-                    else:
-                        # if is a 4D concatenate over second axis
-                        axis = 0
+            if data_list[var_name]['data'] is not None:
+                shp_len = len(data_list[var_name]['data'].shape)
+                # Collect local array sizes using the gather communication pattern
+                rank_shapes = np.array(self.comm.gather(data_list[var_name]['data'].shape, root=0))
+                sendbuf = data_list[var_name]['data'].flatten()
+                sendcounts = np.array(self.comm.gather(len(sendbuf), root=0))
+                if self.master:
+                    # recvbuf = np.empty(sum(sendcounts), dtype=type(sendbuf[0]))
+                    recvbuf = np.empty(sum(sendcounts), dtype=type(sendbuf.max()))
+                else:
+                    recvbuf = None
+                self.comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendcounts), root=0)
+                if self.master:
+                    recvbuf = np.split(recvbuf, np.cumsum(sendcounts))
+                    # TODO ask
+                    # I don't understand why it is giving one more split
+                    if len(recvbuf) > len(sendcounts):
+                        recvbuf = recvbuf[:-1]
+                    for i, shape in enumerate(rank_shapes):
+                        recvbuf[i] = recvbuf[i].reshape(shape)
+                    add_dimension = False  # to Add a dimension
+                    if self.parallel_method == 'Y':
+                        if shp_len == 2:
+                            # if is a 2D concatenate over first axis
+                            axis = 0
+                        elif shp_len == 3:
+                            # if is a 3D concatenate over second axis
+                            axis = 1
+                        else:
+                            # if is a 4D concatenate over third axis
+                            axis = 2
+                    elif self.parallel_method == 'X':
+                        if shp_len == 2:
+                            # if is a 2D concatenate over second axis
+                            axis = 1
+                        elif shp_len == 3:
+                            # if is a 3D concatenate over third axis
+                            axis = 2
+                        else:
+                            # if is a 4D concatenate over forth axis
+                            axis = 3
+                    elif self.parallel_method == 'T':
+                        if shp_len == 2:
+                            # if is a 2D add dimension
+                            add_dimension = True
+                            axis = None  # Not used
+                        elif shp_len == 3:
+                            # if is a 3D concatenate over first axis
+                            axis = 0
+                        else:
+                            # if is a 4D concatenate over second axis
+                            axis = 0
-                else:
-                    raise NotImplementedError(
-                        "Parallel method '{meth}' is not implemented. Use one of these: {accept}".format(
-                            meth=self.parallel_method, accept=['X', 'Y', 'T']))
-                if add_dimension:
-                    data_list[var_name]['data'] = np.stack(recvbuf)
-                else:
-                    data_list[var_name]['data'] = np.concatenate(recvbuf, axis=axis)
-
+                    else:
+                        raise NotImplementedError(
+                            "Parallel method '{meth}' is not implemented. Use one of these: {accept}".format(
+                                meth=self.parallel_method, accept=['X', 'Y', 'T']))
+                    if add_dimension:
+                        data_list[var_name]['data'] = np.stack(recvbuf)
+                    else:
+                        data_list[var_name]['data'] = np.concatenate(recvbuf, axis=axis)
+            else:
+                data_list[var_name]['data'] = None
         return data_list
 
     # ==================================================================================================================

diff --git a/tests/4.3-test_write_time_step.py b/tests/4.3-test_write_time_step.py
new file mode 100644
index 0000000..45dd2b4
--- /dev/null
+++ b/tests/4.3-test_write_time_step.py
@@ -0,0 +1,149 @@
+#!/usr/bin/env python
+
+import sys
+from mpi4py import MPI
+import pandas as pd
+import timeit
+from datetime import datetime, timedelta
+import numpy as np
+from nes import *
+
+comm = MPI.COMM_WORLD
+rank = comm.Get_rank()
+size = comm.Get_size()
+
+parallel_method = 'Y'
+
+result_path = "Times_test_4.3_write_time_step_{0}_{1:03d}.csv".format(parallel_method, size)
+result = pd.DataFrame(index=['read', 'calculate', 'write'],
+                      columns=['4.3.1.ParallelWrite', '4.3.2.SerialWrite'])
+
+# ======================================================================================================================
+# ===================================            PARALLEL WRITE             ===========================================
+# ======================================================================================================================
+
+test_name = '4.3.1.ParallelWrite'
+
+if rank == 0:
+    print(test_name)
+
+st_time = timeit.default_timer()
+# CREATE GRID
+centre_lat = 51
+centre_lon = 10
+west_boundary = -35
+south_boundary = -27
+inc_rlat = 0.2
+inc_rlon = 0.2
+nessy = create_nes(comm=None, info=False, projection='rotated',
+                   centre_lat=centre_lat, centre_lon=centre_lon,
+                   west_boundary=west_boundary, south_boundary=south_boundary,
+                   inc_rlat=inc_rlat, inc_rlon=inc_rlon)
+
+# ADD VARIABLES
+nessy.variables = {'var1': {'data': None, 'units': 'kg.s-1', 'dtype': np.float32},
+                   'var2': {'data': None, 'units': 'kg.s-1', 'dtype': np.float32}}
+time_list = [datetime(year=2023, month=1, day=1) + timedelta(hours=x) for x in range(24)]
+nessy.set_time(time_list)
+
+comm.Barrier()
+result.loc['read', test_name] = timeit.default_timer() - st_time
+
+# CREATE
+st_time = timeit.default_timer()
+nessy.to_netcdf(test_name + '.nc', keep_open=True, info=False)
+
+comm.Barrier()
+result.loc['write', test_name] = timeit.default_timer() - st_time
+
+# CALCUL & APPEND
+result.loc['calcul', test_name] = 0
+
+for i_time, time_aux in enumerate(time_list):
+    # CALCUL
+    st_time = timeit.default_timer()
+
+    nessy.variables['var1']['data'] = np.ones((1, 1, nessy.lat['data'].shape[0], nessy.lon['data'].shape[-1])) * i_time
+    nessy.variables['var2']['data'] = np.ones((1, 1, nessy.lat['data'].shape[0], nessy.lon['data'].shape[-1])) * i_time
+
+    comm.Barrier()
+    result.loc['calcul', test_name] += timeit.default_timer() - st_time
+    # APPEND
+    st_time = timeit.default_timer()
+    nessy.append_time_step_data(i_time)
+    comm.Barrier()
+    if i_time == len(time_list) - 1:
+        nessy.close()
+    result.loc['write', test_name] = timeit.default_timer() - st_time
+
+comm.Barrier()
+if rank == 0:
+    print(result.loc[:, test_name])
+sys.stdout.flush()
+
+# ======================================================================================================================
+# ===================================             SERIAL WRITE              ===========================================
+# ======================================================================================================================
+
+test_name = '4.3.2.SerialWrite'
+
+if rank == 0:
+    print(test_name)
+
+st_time = timeit.default_timer()
+# CREATE GRID
+centre_lat = 51
+centre_lon = 10
+west_boundary = -35
+south_boundary = -27
+inc_rlat = 0.2
+inc_rlon = 0.2
+nessy = create_nes(comm=None, info=False, projection='rotated',
+                   centre_lat=centre_lat, centre_lon=centre_lon,
+                   west_boundary=west_boundary, south_boundary=south_boundary,
+                   inc_rlat=inc_rlat, inc_rlon=inc_rlon)
+
+# ADD VARIABLES
+nessy.variables = {'var1': {'data': None, 'units': 'kg.s-1', 'dtype': np.float32},
+                   'var2': {'data': None, 'units': 'kg.s-1', 'dtype': np.float32}}
+time_list = [datetime(year=2023, month=1, day=1) + timedelta(hours=x) for x in range(24)]
+nessy.set_time(time_list)
+
+comm.Barrier()
+result.loc['read', test_name] = timeit.default_timer() - st_time
+
+# CREATE
+st_time = timeit.default_timer()
+nessy.to_netcdf(test_name + '.nc', keep_open=True, info=False, serial=True)
+
+comm.Barrier()
+result.loc['write', test_name] = timeit.default_timer() - st_time
+
+# CALCUL & APPEND
+result.loc['calcul', test_name] = 0
+
+for i_time, time_aux in enumerate(time_list):
+    # CALCUL
+    st_time = timeit.default_timer()
+
+    nessy.variables['var1']['data'] = np.ones((1, 1, nessy.lat['data'].shape[0], nessy.lon['data'].shape[-1])) * i_time
+    nessy.variables['var2']['data'] = np.ones((1, 1, nessy.lat['data'].shape[0], nessy.lon['data'].shape[-1])) * i_time
+
+    comm.Barrier()
+    result.loc['calcul', test_name] += timeit.default_timer() - st_time
+    # APPEND
+    st_time = timeit.default_timer()
+    nessy.append_time_step_data(i_time)
+    comm.Barrier()
+    if i_time == len(time_list) - 1:
+        nessy.close()
+    result.loc['write', test_name] = timeit.default_timer() - st_time
+
+comm.Barrier()
+if rank == 0:
+    print(result.loc[:, test_name])
+sys.stdout.flush()
+
+if rank == 0:
+    result.to_csv(result_path)
+    print("TEST PASSED SUCCESSFULLY!!!!!")
diff --git a/tutorials/6.Others/6.4.WriteByTimeStep.ipynb b/tutorials/6.Others/6.4.WriteByTimeStep.ipynb
new file mode 100644
index 0000000..7f3ea27
--- /dev/null
+++ b/tutorials/6.Others/6.4.WriteByTimeStep.ipynb
@@ -0,0 +1,158 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from nes import *\n",
+    "import numpy as np\n",
+    "from datetime import datetime, timedelta"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "centre_lat = 51\n",
+    "centre_lon = 10\n",
+    "west_boundary = -35\n",
+    "south_boundary = -27\n",
+    "inc_rlat = 0.2\n",
+    "inc_rlon = 0.2\n",
+    "rotated_grid = create_nes(comm=None, info=False, projection='rotated',\n",
+    "                          centre_lat=centre_lat, centre_lon=centre_lon,\n",
+    "                          west_boundary=west_boundary, south_boundary=south_boundary,\n",
+    "                          inc_rlat=inc_rlat, inc_rlon=inc_rlon)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "rotated_grid.variables = {'var1': {'data': None, 'units': 'kg.s-1', 'dtype': np.float32},\n",
+    "                          'var2': {'data': None, 'units': 'kg.s-1', 'dtype': np.float32}}"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "[datetime.datetime(1996, 12, 31, 0, 0)]"
+      ]
+     },
+     "execution_count": 4,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "rotated_grid.time"
+   ]
+  },
+  {
"cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "time_list = [datetime(year=2023, month=1, day=1) + timedelta(hours=x) for x in range (2)]" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "rotated_grid.set_time(time_list)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "rotated_grid.to_netcdf('rotated.nc', keep_open=True, info=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "rotated_grid.variables['var1']['data'] = np.ones((1, 1, rotated_grid.lat['data'].shape[0], rotated_grid.lon['data'].shape[-1]))\n", + "rotated_grid.variables['var2']['data'] = np.ones((1, 1, rotated_grid.lat['data'].shape[0], rotated_grid.lon['data'].shape[-1]))" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "rotated_grid.append_time_step_data(0)" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "rotated_grid.variables['var1']['data'] = np.ones((1, 1, rotated_grid.lat['data'].shape[0], rotated_grid.lon['data'].shape[-1])) *2\n", + "rotated_grid.variables['var2']['data'] = np.ones((1, 1, rotated_grid.lat['data'].shape[0], rotated_grid.lon['data'].shape[-1])) *2\n", + "\n", + "rotated_grid.append_time_step_data(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "rotated_grid.close()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.4" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} -- GitLab From a3d752c2a68c4f75a6b11bcd445f308c144bd55e Mon Sep 17 00:00:00 2001 From: Alba Vilanova Date: Fri, 17 Mar 2023 10:14:45 +0100 Subject: [PATCH 2/2] Update CHANGELOG, test and tutorial --- CHANGELOG.md | 1 + ...ime_step.py => 4.3-test_write_timestep.py} | 24 +++--- tests/run_scalability_tests_nord3v2.sh | 2 +- tests/test_bash_mn4.cmd | 1 + tests/test_bash_nord3v2.cmd | 1 + tutorials/6.Others/6.2.Selecting.ipynb | 18 ++-- ...Step.ipynb => 6.4.Write_By_Timestep.ipynb} | 84 ++++++++++++++++--- 7 files changed, 99 insertions(+), 32 deletions(-) rename tests/{4.3-test_write_time_step.py => 4.3-test_write_timestep.py} (90%) rename tutorials/6.Others/{6.4.WriteByTimeStep.ipynb => 6.4.Write_By_Timestep.ipynb} (77%) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc07441..7ef3c82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * Improved time on **concatenate_netcdfs** function ([#55](https://earth.bsc.es/gitlab/es/NES/-/issues/55)) * Sum of Nes objects ([#48](https://earth.bsc.es/gitlab/es/NES/-/issues/48)) * Write 2D string data to save variables from shapefiles after doing a spatial join ([#49](https://earth.bsc.es/gitlab/es/NES/-/issues/49)) + * Write by time step to avoid memory issues ([#57](https://earth.bsc.es/gitlab/es/NES/-/issues/57)) * Bugs fixing: * Bug on `cell_methods` serial write ([#53](https://earth.bsc.es/gitlab/es/NES/-/issues/53)) * Horizontal 
diff --git a/tests/4.3-test_write_time_step.py b/tests/4.3-test_write_timestep.py
similarity index 90%
rename from tests/4.3-test_write_time_step.py
rename to tests/4.3-test_write_timestep.py
index 45dd2b4..77539e7 100644
--- a/tests/4.3-test_write_time_step.py
+++ b/tests/4.3-test_write_timestep.py
@@ -16,13 +16,13 @@ parallel_method = 'Y'
 
 result_path = "Times_test_4.3_write_time_step_{0}_{1:03d}.csv".format(parallel_method, size)
 result = pd.DataFrame(index=['read', 'calculate', 'write'],
-                      columns=['4.3.1.ParallelWrite', '4.3.2.SerialWrite'])
+                      columns=['4.3.1.Parallel_Write', '4.3.2.Serial_Write'])
 
 # ======================================================================================================================
 # ===================================            PARALLEL WRITE             ===========================================
 # ======================================================================================================================
 
-test_name = '4.3.1.ParallelWrite'
+test_name = '4.3.1.Parallel_Write'
 
 if rank == 0:
     print(test_name)
@@ -56,18 +56,19 @@ nessy.to_netcdf(test_name + '.nc', keep_open=True, info=False)
 comm.Barrier()
 result.loc['write', test_name] = timeit.default_timer() - st_time
 
-# CALCUL & APPEND
-result.loc['calcul', test_name] = 0
+# CALCULATE & APPEND
+result.loc['calculate', test_name] = 0
 
 for i_time, time_aux in enumerate(time_list):
-    # CALCUL
+    # CALCULATE
     st_time = timeit.default_timer()
 
     nessy.variables['var1']['data'] = np.ones((1, 1, nessy.lat['data'].shape[0], nessy.lon['data'].shape[-1])) * i_time
     nessy.variables['var2']['data'] = np.ones((1, 1, nessy.lat['data'].shape[0], nessy.lon['data'].shape[-1])) * i_time
 
     comm.Barrier()
-    result.loc['calcul', test_name] += timeit.default_timer() - st_time
+    result.loc['calculate', test_name] += timeit.default_timer() - st_time
+
     # APPEND
     st_time = timeit.default_timer()
     nessy.append_time_step_data(i_time)
@@ -85,7 +86,7 @@ sys.stdout.flush()
 # ===================================             SERIAL WRITE              ===========================================
 # ======================================================================================================================
 
-test_name = '4.3.2.SerialWrite'
+test_name = '4.3.2.Serial_Write'
 
 if rank == 0:
     print(test_name)
@@ -119,18 +120,19 @@ nessy.to_netcdf(test_name + '.nc', keep_open=True, info=False, serial=True)
 comm.Barrier()
 result.loc['write', test_name] = timeit.default_timer() - st_time
 
-# CALCUL & APPEND
-result.loc['calcul', test_name] = 0
+# CALCULATE & APPEND
+result.loc['calculate', test_name] = 0
 
 for i_time, time_aux in enumerate(time_list):
-    # CALCUL
+    # CALCULATE
     st_time = timeit.default_timer()
 
     nessy.variables['var1']['data'] = np.ones((1, 1, nessy.lat['data'].shape[0], nessy.lon['data'].shape[-1])) * i_time
     nessy.variables['var2']['data'] = np.ones((1, 1, nessy.lat['data'].shape[0], nessy.lon['data'].shape[-1])) * i_time
 
     comm.Barrier()
-    result.loc['calcul', test_name] += timeit.default_timer() - st_time
+    result.loc['calculate', test_name] += timeit.default_timer() - st_time
+
     # APPEND
     st_time = timeit.default_timer()
     nessy.append_time_step_data(i_time)
diff --git a/tests/run_scalability_tests_nord3v2.sh b/tests/run_scalability_tests_nord3v2.sh
index c75e601..214fecd 100644
--- a/tests/run_scalability_tests_nord3v2.sh
+++ b/tests/run_scalability_tests_nord3v2.sh
@@ -8,7 +8,7 @@
 
 module load Python/3.7.4-GCCcore-8.3.0
 module load NES/1.0.0-nord3-v2-foss-2019b-Python-3.7.4
 
-for EXE in "1.1-test_read_write_projection.py" "1.2-test_create_projection.py" "1.3-test_selecting.py" "2.1-test_spatial_join.py" "2.2-test_create_shapefile.py" "2.3-test_bounds.py" "2.4-test_cell_area.py" "3.1-test_vertical_interp.py" "3.2-test_horiz_interp_bilinear.py" "3.3-test_horiz_interp_conservative.py" "4.1-test_daily_stats.py" "4.2-test_sum.py"
+for EXE in "1.1-test_read_write_projection.py" "1.2-test_create_projection.py" "1.3-test_selecting.py" "2.1-test_spatial_join.py" "2.2-test_create_shapefile.py" "2.3-test_bounds.py" "2.4-test_cell_area.py" "3.1-test_vertical_interp.py" "3.2-test_horiz_interp_bilinear.py" "3.3-test_horiz_interp_conservative.py" "4.1-test_daily_stats.py" "4.2-test_sum.py" "4.3-test_write_timestep.py"
 do
     for nprocs in 1 2 4 8 16
     do
diff --git a/tests/test_bash_mn4.cmd b/tests/test_bash_mn4.cmd
index 491076a..f7450c7 100644
--- a/tests/test_bash_mn4.cmd
+++ b/tests/test_bash_mn4.cmd
@@ -35,3 +35,4 @@ mpirun --mca mpi_warn_on_fork 0 -np 4 python 3.3-test_horiz_interp_conservative.
 
 mpirun --mca mpi_warn_on_fork 0 -np 4 python 4.1-test_daily_stats.py
 mpirun --mca mpi_warn_on_fork 0 -np 4 python 4.2-test_sum.py
+mpirun --mca mpi_warn_on_fork 0 -np 4 python 4.3-test_write_timestep.py
diff --git a/tests/test_bash_nord3v2.cmd b/tests/test_bash_nord3v2.cmd
index 45a2e3d..1ef30c0 100644
--- a/tests/test_bash_nord3v2.cmd
+++ b/tests/test_bash_nord3v2.cmd
@@ -33,3 +33,4 @@ mpirun --mca mpi_warn_on_fork 0 -np 4 python 3.3-test_horiz_interp_conservative.
 
 mpirun --mca mpi_warn_on_fork 0 -np 4 python 4.1-test_daily_stats.py
 mpirun --mca mpi_warn_on_fork 0 -np 4 python 4.2-test_sum.py
+mpirun --mca mpi_warn_on_fork 0 -np 4 python 4.3-test_write_timestep.py
\ No newline at end of file
diff --git a/tutorials/6.Others/6.2.Selecting.ipynb b/tutorials/6.Others/6.2.Selecting.ipynb
index f4c28b4..7118c4d 100644
--- a/tutorials/6.Others/6.2.Selecting.ipynb
+++ b/tutorials/6.Others/6.2.Selecting.ipynb
@@ -31,7 +31,7 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "## Time"
+    "## 1. Select data by time"
    ]
   },
@@ -170,7 +170,7 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "## Level"
+    "## 2. Select data by level"
    ]
   },
@@ -216,7 +216,7 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "## Coordinates"
+    "## 3. Select data by coordinates"
    ]
   },
@@ -256,20 +256,20 @@
    ]
   },
   {
-   "cell_type": "code",
-   "execution_count": 9,
+   "cell_type": "markdown",
    "metadata": {},
-   "outputs": [],
    "source": [
-    "nessy.to_netcdf(\"test_sel.nc\")"
+    "## 4. Write dataset"
    ]
   },
   {
-   "cell_type": "code",
-   "execution_count": null,
+   "cell_type": "code",
+   "execution_count": 9,
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "nessy.to_netcdf(\"test_sel.nc\")"
+   ]
   }
  ],
  "metadata": {
diff --git a/tutorials/6.Others/6.4.WriteByTimeStep.ipynb b/tutorials/6.Others/6.4.Write_By_Timestep.ipynb
similarity index 77%
rename from tutorials/6.Others/6.4.WriteByTimeStep.ipynb
rename to tutorials/6.Others/6.4.Write_By_Timestep.ipynb
index 7f3ea27..9080e0c 100644
--- a/tutorials/6.Others/6.4.WriteByTimeStep.ipynb
+++ b/tutorials/6.Others/6.4.Write_By_Timestep.ipynb
@@ -1,5 +1,12 @@
 {
  "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Write by timestep (to avoid memory issues)"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": 1,
@@ -11,6 +18,13 @@
    "from datetime import datetime, timedelta"
   ]
  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## 1. Create grid"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": 2,
@@ -29,6 +43,13 @@
    "                          inc_rlat=inc_rlat, inc_rlon=inc_rlon)"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## 2. Create variables"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": 3,
@@ -39,6 +60,20 @@
    "                          'var2': {'data': None, 'units': 'kg.s-1', 'dtype': np.float32}}"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## 3. Change time"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Old time"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": 4,
@@ -59,6 +94,13 @@
    "rotated_grid.time"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### New time"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": 5,
@@ -81,6 +123,33 @@
   {
    "cell_type": "code",
    "execution_count": 7,
    "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "[datetime.datetime(2023, 1, 1, 0, 0), datetime.datetime(2023, 1, 1, 1, 0)]"
+      ]
+     },
+     "execution_count": 7,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "rotated_grid.time"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## 4. Write dataset"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 8,
+   "metadata": {},
    "outputs": [],
    "source": [
     "rotated_grid.to_netcdf('rotated.nc', keep_open=True, info=False)"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": 8,
+   "execution_count": 9,
    "metadata": {},
    "outputs": [],
    "source": [
     "rotated_grid.variables['var1']['data'] = np.ones((1, 1, rotated_grid.lat['data'].shape[0], rotated_grid.lon['data'].shape[-1]))\n",
     "rotated_grid.variables['var2']['data'] = np.ones((1, 1, rotated_grid.lat['data'].shape[0], rotated_grid.lon['data'].shape[-1]))"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": 9,
+   "execution_count": 10,
    "metadata": {},
    "outputs": [],
    "source": [
     "rotated_grid.append_time_step_data(0)"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": 10,
+   "execution_count": 11,
    "metadata": {},
    "outputs": [],
    "source": [
     "rotated_grid.variables['var1']['data'] = np.ones((1, 1, rotated_grid.lat['data'].shape[0], rotated_grid.lon['data'].shape[-1])) *2\n",
     "rotated_grid.variables['var2']['data'] = np.ones((1, 1, rotated_grid.lat['data'].shape[0], rotated_grid.lon['data'].shape[-1])) *2\n",
     "\n",
     "rotated_grid.append_time_step_data(1)"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": 11,
+   "execution_count": 12,
    "metadata": {},
    "outputs": [],
    "source": [
     "rotated_grid.close()"
    ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": []
   }
  ],
  "metadata": {
--
GitLab
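
Note: for readers who only want the resulting API, the following is a minimal usage sketch of the write-by-time-step workflow these two patches introduce, distilled from tests/4.3-test_write_timestep.py above. The grid parameters and file name are illustrative only; any NES grid works the same way, and `serial=True` can be passed to to_netcdf() for the gathered serial-write variant.

    #!/usr/bin/env python
    from datetime import datetime, timedelta
    import numpy as np
    from nes import create_nes

    # Create a grid and declare the variables with empty data ('data': None),
    # so that to_netcdf() only writes the metadata skeleton.
    grid = create_nes(comm=None, info=False, projection='rotated',
                      centre_lat=51, centre_lon=10,
                      west_boundary=-35, south_boundary=-27,
                      inc_rlat=0.2, inc_rlon=0.2)
    grid.variables = {'var1': {'data': None, 'units': 'kg.s-1', 'dtype': np.float32}}

    # Define the time axis before writing (set_time() is added by PATCH 1/2).
    time_list = [datetime(2023, 1, 1) + timedelta(hours=h) for h in range(24)]
    grid.set_time(time_list)

    # Write the structure once and keep the netCDF handle open.
    grid.to_netcdf('output.nc', keep_open=True, info=False)

    # Fill the file one time step at a time, so only a single step
    # has to be held in memory.
    for i_time in range(len(time_list)):
        grid.variables['var1']['data'] = np.ones(
            (1, 1, grid.lat['data'].shape[0], grid.lon['data'].shape[-1])) * i_time
        grid.append_time_step_data(i_time)

    # Close the file (and the serial writer, if one was created).
    grid.close()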