# coding=utf-8 """Compute the potential density anomalies""" import diagonals.density import numpy as np import iris from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.utils import TempFile from earthdiagnostics.diagnostic import Diagnostic class Density(Diagnostic): """ Compute the total potential density anomaly :param data_manager: data management object :type data_manager: DataManager :param startdate: startdate :type startdate: str :param member: member number :type member: int :param chunk: chunk's number :type chunk: int """ alias = "density" "Diagnostic alias for the configuration file" def __init__( self, data_manager, startdate, member, chunk, ): Diagnostic.__init__(self, data_manager) self.startdate = startdate self.member = member self.chunk = chunk self.sigmas = [0, 2] def __eq__(self, other): if self._different_type(other): return False return ( self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk ) def __str__(self): return ( f"Density Startdate: {self.startdate} Member: {self.member} " f"Chunk: {self.chunk}" ) def __hash__(self): return hash(str(self)) @classmethod def generate_jobs(cls, diags, options): """ Create a job for each chunk to compute the diagnostic :param diags: Diagnostics manager class :type diags: Diags :param options: This diagnostic does not require extra options :type options: list[str] :return: """ options_available = [] options = cls.process_options(options, options_available) job_list = list() for ( startdate, member, chunk, ) in diags.config.experiment.get_chunk_list(): job_list.append( Density( diags.data_manager, startdate, member, chunk, ) ) return job_list def request_data(self): """Request data required by the diagnostic""" self.bigthetao = self.request_chunk( ModelingRealms.ocean, "bigthetao", self.startdate, self.member, self.chunk, ) self.so = self.request_chunk( ModelingRealms.ocean, "so", self.startdate, self.member, self.chunk, ) def declare_data_generated(self): """Declare data to be generated by the diagnostic""" self.sigma = {} for sigma in self.sigmas: self.sigma[sigma] = self.declare_chunk( ModelingRealms.ocean, f"sigma{sigma}", self.startdate, self.member, self.chunk, ) def compute(self): """Run the diagnostic""" bigthetao = iris.load_cube(self.bigthetao.local_file) so = iris.load_cube(self.so.local_file) # Convert from practical to absolute so = so / 0.99530670233846 for sigma in self.sigmas: ref_pressure = sigma * 1000 sigma_values = [] for time in range(so.shape[0]): sigma_values.append(diagonals.density.compute( so[time, ...].data.astype(np.float32), bigthetao.data[time, ...].astype(np.float32), np.full(bigthetao.shape[1:], ref_pressure, dtype=np.float32) )) sigma_values = np.stack(sigma_values) sigma_cube = bigthetao.copy(sigma_values) sigma_cube.var_name = f'sigma{sigma}' sigma_cube.standard_name = 'sea_water_sigma_theta' sigma_cube.long_name = ( "potential density anomaly (potential density minus 1000 " f"Kg/m3) with reference pressure of {ref_pressure} dbar" ) sigma_cube.units = 'kg m-3' temp = TempFile.get() iris.save(sigma_cube, temp, zlib=True) del sigma_cube del sigma_values self.sigma[sigma].set_local_file(temp)