Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# coding=utf-8
"""Compute the potential density anomalies"""
import diagonals.density
import numpy as np
import iris
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import TempFile
from earthdiagnostics.diagnostic import Diagnostic
class Density(Diagnostic):
"""
Compute the total potential density anomaly
:param data_manager: data management object
:type data_manager: DataManager
:param startdate: startdate
:type startdate: str
:param member: member number
:type member: int
:param chunk: chunk's number
:type chunk: int
"""
alias = "density"
"Diagnostic alias for the configuration file"
def __init__(
self,
data_manager,
startdate,
member,
chunk,
):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
self.chunk = chunk
self.sigmas = [0, 2]
def __eq__(self, other):
if self._different_type(other):
return False
return (
self.startdate == other.startdate
and self.member == other.member
and self.chunk == other.chunk
)
def __str__(self):
return (
f"Density Startdate: {self.startdate} Member: {self.member} "
f"Chunk: {self.chunk}"
)
def __hash__(self):
return hash(str(self))
@classmethod
def generate_jobs(cls, diags, options):
"""
Create a job for each chunk to compute the diagnostic
:param diags: Diagnostics manager class
:type diags: Diags
:param options: This diagnostic does not require extra options
:type options: list[str]
:return:
"""
options_available = []
options = cls.process_options(options, options_available)
job_list = list()
for (
startdate,
member,
chunk,
) in diags.config.experiment.get_chunk_list():
job_list.append(
Density(
diags.data_manager,
startdate,
member,
chunk,
)
)
return job_list
def request_data(self):
"""Request data required by the diagnostic"""
self.bigthetao = self.request_chunk(
ModelingRealms.ocean,
"bigthetao",
self.startdate,
self.member,
self.chunk,
)
self.so = self.request_chunk(
ModelingRealms.ocean,
"so",
self.startdate,
self.member,
self.chunk,
)
def declare_data_generated(self):
"""Declare data to be generated by the diagnostic"""
self.sigma = {}
for sigma in self.sigmas:
self.sigma[sigma] = self.declare_chunk(
ModelingRealms.ocean,
f"sigma{sigma}",
self.startdate,
self.member,
self.chunk,
)
def compute(self):
"""Run the diagnostic"""
bigthetao = iris.load_cube(self.bigthetao.local_file)
so = iris.load_cube(self.so.local_file)
# Convert from practical to absolute
so = so / 0.99530670233846
for sigma in self.sigmas:
ref_pressure = sigma * 1000
sigma_values = []
for time in range(so.shape[0]):
sigma_values.append(diagonals.density.compute(
so[time, ...].data.astype(np.float32),
bigthetao.data[time, ...].astype(np.float32),
np.full(bigthetao.shape[1:],
ref_pressure, dtype=np.float32)
))
sigma_values = np.ma.masked_invalid(np.stack(sigma_values))
sigma_cube = bigthetao.copy(sigma_values)
sigma_cube.var_name = f'sigma{sigma}'
sigma_cube.standard_name = 'sea_water_sigma_theta'
sigma_cube.long_name = (
"potential density anomaly (potential density minus 1000 "
f"Kg/m3) with reference pressure of {ref_pressure} dbar"
)
sigma_cube.units = 'kg m-3'
temp = TempFile.get()
iris.save(sigma_cube, temp, zlib=True, fill_value=1e20)
del sigma_cube
del sigma_values
self.sigma[sigma].set_local_file(temp)