From 934c2acc1da9bea8ad611cafaba695e054aef486 Mon Sep 17 00:00:00 2001 From: ctena Date: Tue, 14 Mar 2023 16:09:49 +0100 Subject: [PATCH 1/2] Avoiding Numpy MPI gather if KeyError is raised --- nes/nc_projections/default_nes.py | 149 +++++++++++++++--------------- 1 file changed, 74 insertions(+), 75 deletions(-) diff --git a/nes/nc_projections/default_nes.py b/nes/nc_projections/default_nes.py index 9585c79..afcc874 100644 --- a/nes/nc_projections/default_nes.py +++ b/nes/nc_projections/default_nes.py @@ -2223,6 +2223,7 @@ class Nes(object): for var_name in self.variables.keys(): self.variables[var_name]['cell_measures'] = 'area: cell_area' + return None def _create_variables(self, netcdf, chunking=False): """ @@ -2515,8 +2516,14 @@ class Nes(object): else: # if serial: if serial and self.size > 1: - data = self._gather_data(self.variables) - c_measures = self._gather_data(self.cell_measures) + try: + data = self._gather_data(self.variables) + except KeyError: + data = self.__gather_data_py_object(self.variables) + try: + c_measures = self._gather_data(self.cell_measures) + except KeyError: + c_measures = self.__gather_data_py_object(self.cell_measures) if self.master: new_nc = self.copy(copy_vars=False) new_nc.set_communicator(MPI.COMM_SELF) @@ -2659,8 +2666,14 @@ class Nes(object): # if serial: if self.parallel_method in ['X', 'Y'] and self.size > 1: - data = self._gather_data(self.variables) - c_measures = self._gather_data(self.cell_measures) + try: + data = self._gather_data(self.variables) + except KeyError: + data = self.__gather_data_py_object(self.variables) + try: + c_measures = self._gather_data(self.cell_measures) + except KeyError: + c_measures = self.__gather_data_py_object(self.cell_measures) if self.master: new_nc = self.copy(copy_vars=False) new_nc.set_communicator(MPI.COMM_SELF) @@ -3074,79 +3087,65 @@ class Nes(object): for var_name in data_list.keys(): if self.info and self.master: print("Gathering {0}".format(var_name)) - try: - shp_len = len(data_list[var_name]['data'].shape) - # Collect local array sizes using the gather communication pattern - rank_shapes = np.array(self.comm.gather(data_list[var_name]['data'].shape, root=0)) - sendbuf = data_list[var_name]['data'].flatten() - sendcounts = np.array(self.comm.gather(len(sendbuf), root=0)) - if self.master: - # recvbuf = np.empty(sum(sendcounts), dtype=type(sendbuf[0])) - recvbuf = np.empty(sum(sendcounts), dtype=type(sendbuf.max())) - else: - recvbuf = None - self.comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendcounts), root=0) - if self.master: - recvbuf = np.split(recvbuf, np.cumsum(sendcounts)) - # TODO ask - # I don't understand why it is giving one more split - if len(recvbuf) > len(sendcounts): - recvbuf = recvbuf[:-1] - for i, shape in enumerate(rank_shapes): - recvbuf[i] = recvbuf[i].reshape(shape) - add_dimension = False # to Add a dimension - if self.parallel_method == 'Y': - if shp_len == 2: - # if is a 2D concatenate over first axis - axis = 0 - elif shp_len == 3: - # if is a 3D concatenate over second axis - axis = 1 - else: - # if is a 4D concatenate over third axis - axis = 2 - elif self.parallel_method == 'X': - if shp_len == 2: - # if is a 2D concatenate over second axis - axis = 1 - elif shp_len == 3: - # if is a 3D concatenate over third axis - axis = 2 - else: - # if is a 4D concatenate over forth axis - axis = 3 - elif self.parallel_method == 'T': - if shp_len == 2: - # if is a 2D add dimension - add_dimension = True - axis = None # Not used - elif shp_len == 3: - # if is a 3D concatenate over first axis - axis = 0 - else: - # if is a 4D concatenate over second axis - axis = 0 + shp_len = len(data_list[var_name]['data'].shape) + # Collect local array sizes using the gather communication pattern + rank_shapes = np.array(self.comm.gather(data_list[var_name]['data'].shape, root=0)) + sendbuf = data_list[var_name]['data'].flatten() + sendcounts = np.array(self.comm.gather(len(sendbuf), root=0)) + if self.master: + # recvbuf = np.empty(sum(sendcounts), dtype=type(sendbuf[0])) + recvbuf = np.empty(sum(sendcounts), dtype=type(sendbuf.max())) + else: + recvbuf = None + self.comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendcounts), root=0) + if self.master: + recvbuf = np.split(recvbuf, np.cumsum(sendcounts)) + # TODO ask + # I don't understand why it is giving one more split + if len(recvbuf) > len(sendcounts): + recvbuf = recvbuf[:-1] + for i, shape in enumerate(rank_shapes): + recvbuf[i] = recvbuf[i].reshape(shape) + add_dimension = False # to Add a dimension + if self.parallel_method == 'Y': + if shp_len == 2: + # if is a 2D concatenate over first axis + axis = 0 + elif shp_len == 3: + # if is a 3D concatenate over second axis + axis = 1 else: - raise NotImplementedError( - "Parallel method '{meth}' is not implemented. Use one of these: {accept}".format( - meth=self.parallel_method, accept=['X', 'Y', 'T'])) - if add_dimension: - data_list[var_name]['data'] = np.stack(recvbuf) + # if is a 4D concatenate over third axis + axis = 2 + elif self.parallel_method == 'X': + if shp_len == 2: + # if is a 2D concatenate over second axis + axis = 1 + elif shp_len == 3: + # if is a 3D concatenate over third axis + axis = 2 else: - data_list[var_name]['data'] = np.concatenate(recvbuf, axis=axis) - - except AttributeError: - data_list[var_name]['data'] = 0 - except Exception as e: - print("**ERROR** an error has occurred while gathering the '{0}' variable.\n".format(var_name)) - sys.stderr.write("**ERROR** an error has occurred while gathering the '{0}' variable.\n".format( - var_name)) - print(e) - sys.stderr.write(str(e)) - # print(e, file=sys.stderr) - sys.stderr.flush() - self.comm.Abort(1) - raise e + # if is a 4D concatenate over forth axis + axis = 3 + elif self.parallel_method == 'T': + if shp_len == 2: + # if is a 2D add dimension + add_dimension = True + axis = None # Not used + elif shp_len == 3: + # if is a 3D concatenate over first axis + axis = 0 + else: + # if is a 4D concatenate over second axis + axis = 0 + else: + raise NotImplementedError( + "Parallel method '{meth}' is not implemented. Use one of these: {accept}".format( + meth=self.parallel_method, accept=['X', 'Y', 'T'])) + if add_dimension: + data_list[var_name]['data'] = np.stack(recvbuf) + else: + data_list[var_name]['data'] = np.concatenate(recvbuf, axis=axis) return data_list -- GitLab From 27718292fec4988e300d5bba81baa3e74b93264e Mon Sep 17 00:00:00 2001 From: ctena Date: Tue, 14 Mar 2023 16:25:41 +0100 Subject: [PATCH 2/2] Created set_strlen function to customize the maximum size of the string data --- nes/nc_projections/default_nes.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/nes/nc_projections/default_nes.py b/nes/nc_projections/default_nes.py index afcc874..010c006 100644 --- a/nes/nc_projections/default_nes.py +++ b/nes/nc_projections/default_nes.py @@ -297,6 +297,18 @@ class Nes(object): return new + def set_strlen(self, strlen=75): + """ + Set the strlen + + Parameters + ---------- + strlen : int + Max length of the string + """ + self.strlen = strlen + return None + def __del__(self): """ To delete the Nes object and close all the open datasets. -- GitLab