diff --git a/mapies/tests/test_viirs.py b/mapies/tests/test_viirs.py index ce2e87fec0c911bd8c3996b6aede9df0264fcab8..0383225cad88ad80efa3f7eb87411d491b2204c0 100644 --- a/mapies/tests/test_viirs.py +++ b/mapies/tests/test_viirs.py @@ -17,6 +17,6 @@ def test_to_da(): c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="rotated") print(c.dest) c.read_nc() - c.process_data(monthly_avg=False) - outfiles = c.to_da(frequency="3H", save_nc= True, save_figs=False) + c.process_lazy_data(apply_qa=False, save=False) + outfiles = c.to_da(frequency="3H", save_nc=False, save_figs=False) assert outfiles == [Path("obs202401010000.nc")] \ No newline at end of file diff --git a/mapies/tropomi.py b/mapies/tropomi.py index 36091b0ad6050c55e196aa35096b05d70b7f3fed..2df0c03af7d1a21f7ac1f1d9461c73e5541f348d 100644 --- a/mapies/tropomi.py +++ b/mapies/tropomi.py @@ -396,7 +396,7 @@ class TROPOMI(MAPIES): print("Yearly average computation completed.") @timeit - def process_lazy_data(self, apply_qa=True, geospatial_crop: NDArray | None = None): + def process_lazy_data(self, apply_qa=True, geospatial_crop: NDArray | None = None, save=True): """ Process the data for the specified time range. It only retrieves a daily dictionary using dask lazy loading. """ @@ -464,8 +464,8 @@ class TROPOMI(MAPIES): continue # Get day start and day end but only if they are within object start and end date day_start, day_end = day_boundaries(day, self.start_date, self.end_date) - self.to_netCDF(final_obs, final_lat, final_lon, final_time, day_start, day_end) - + if save: + self.to_netCDF(final_obs, final_lat, final_lon, final_time, day_start, day_end) except Exception as e: print(f"Error processing data for day {day}: {e}") continue diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index e27f35d2d6bad79b50b3c23240d9e20a179875ac..a276de88a00ccfe1cafef63d0a980b766a39f5bc 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -215,9 +215,10 @@ def time_domain_selection( return (array >= start_date) & (array < end_date) else: return (array >= start_date) & (array <= end_date) - + time_mask = np.where(is_between_times(time_values))[0] + time_values = time_values[time_mask] lon = lon[time_mask] lat = lat[time_mask] diff --git a/mapies/viirs.py b/mapies/viirs.py index 55ea229a1161f626079b0569a6df4ec0c51e8c58..b21d1406059bd8c68a595d1e0b963634815851d8 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -26,6 +26,7 @@ from mapies.util.func_tools import * #timeit, get_file_list, time_converter, frequency_int, error_estimation, time_domain_selection, geo_to_rot from mapies.grids.monarch import RotatedGrid, IrregularRotatedGrid, RegularGrid, regridding_function from mapies.util.variables_names import VARS_ATTRIBUTES +from pathlib import Path import json from numpy.typing import DTypeLike, NDArray @@ -458,7 +459,7 @@ class VIIRS(MAPIES): print("Yearly average computation completed.") @timeit - def process_lazy_data(self, apply_qa=True, geospatial_crop: NDArray | None = None): + def process_lazy_data(self, apply_qa=True, geospatial_crop: NDArray | None = None, save=True): """ Process the data for the specified time range. It only retrieves a daily dictionary using dask lazy loading. """ @@ -525,10 +526,16 @@ class VIIRS(MAPIES): print(f"All observation values for day {day} are 0 or NaN. Skipping...") continue + # Update day dict + self.daily_dict[day]["obs"] = final_obs + self.daily_dict[day]["lon"] = final_lon + self.daily_dict[day]["lat"] = final_lat + self.daily_dict[day]["time"] = final_time # Get day start and day end but only if they are within object start and end date day_start, day_end = day_boundaries(day, self.start_date, self.end_date) - self.to_netCDF(final_obs, final_lat, final_lon, final_time, day_start, day_end) + if save: + self.to_netCDF(final_obs, final_lat, final_lon, final_time, day_start, day_end) except Exception as e: print(f"Error processing data for day {day}: {e}") @@ -594,7 +601,7 @@ class VIIRS(MAPIES): """ # Calculate error - self.obserr = error_estimation(self.datatype, self.obs, self.unc_const) + outfiles = [] @@ -608,21 +615,40 @@ class VIIRS(MAPIES): for date in da_dates_slice: l_border = (datetime.strptime(date, "%Y%m%d%H%M") - timedelta(hours=self.frequency/2)).strftime('%Y%m%d%H%M') r_border = (datetime.strptime(date, "%Y%m%d%H%M") + timedelta(hours=self.frequency/2)).strftime('%Y%m%d%H%M') - # Included by Guillaume in his DA function so I believe he needs this to be true + filename = Path(self.dest).joinpath(f'obs{date}.nc') - # Cut down the time interval again then reindex obs, obserr, lat and lon - # Returning also arrays for type, lev, id and n with the same index as the time var - time_values, frequency_index = time_domain_selection( - self.time_values, - l_border, - r_border, - ) + + # concat all days that we need + obs = np.array([]) + lon = np.array([]) + lat = np.array([]) + time_values = np.array([], dtype='datetime64') + days = pd.date_range(l_border, r_border, freq="D") + for pandas_day in days: + day = pandas_day.day + try: + day_obs = self.daily_dict[day]["obs"] + day_lon = self.daily_dict[day]["lon"] + day_lat = self.daily_dict[day]["lat"] + day_time = self.daily_dict[day]["time"] + except KeyError: + print("No data for day selected") + continue + obs = np.append(obs, day_obs) + lon = np.append(lon, day_lon) + lat = np.append(lat, day_lat) + time_values = np.append(time_values, day_time) + + # Perform a further time cut + try: + obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, l_border, r_border) + except TypeError: + print("No data, skipping") + continue + # Calculate obserr + obserr = error_estimation(self.datatype, obs, self.unc_const) + - # Reindex - obs = self.reindex(frequency_index, self.obs) - lon = self.reindex(frequency_index, self.lon_values) - lat = self.reindex(frequency_index, self.lat_values) - obserr = self.reindex(frequency_index, self.obserr) # Run aggregation with grid representation try: if self.grid_repr == "rotated": @@ -672,6 +698,7 @@ class VIIRS(MAPIES): ds = self.to_xarray(coords=coords, data_vars=data_vars) if save_nc: #logger.info(f"Outputting da data with {filename}") + print(f"Outputting da data with {filename}") ds.to_netcdf(filename, encoding={}) outfiles.append(filename) diff --git a/run/run_viirs.py b/run/run_viirs.py index f8360fec5de38b6fe5c773fe220fd65bfe15c659..bb7055fa0a93a750f5896e0d7bed136524c2c807 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -21,11 +21,13 @@ if __name__ == "__main__": c.read_nc() - #c.process_data(monthly_avg=False, batch_size = 50, apply_qa=True, save=True) + c.process_data(monthly_avg=False, batch_size = 50, apply_qa=True, save=True) # c.yearly_average() - #c.plot_2D_observations(months=[0], outdir=outdir) - #c.plot_2D_num_obs(months=[0], outdir=outdir) - c.process_lazy_data(apply_qa=True) + c.plot_2D_observations(months=[0], outdir=outdir) + c.plot_2D_num_obs(months=[0], outdir=outdir) + c.process_lazy_data(apply_qa=True, save=False) + c.to_da(frequency="3H", save_figs=True) + end_time = time.time() diff --git a/setup.py b/setup.py index 054f077e4601482e49ce6c44afd8c0fdddfdcf32..53852fb6a5e8e0675489de886877933720783c49 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ from setuptools import find_packages from setuptools import setup # Could update this using versioneer -version="0.0.10" +version="0.0.11" setup( name="mapies",