From da7b3373f219b2593fd49d65ff44a785d020cfc9 Mon Sep 17 00:00:00 2001
From: Calum Meikle
Date: Wed, 19 Feb 2025 16:57:23 +0100
Subject: [PATCH 1/2] Adding process lazy data to tropomi

---
 mapies/tropomi.py              | 190 +++++++++++++++++++++++++++++++++
 mapies/util/variables_names.py |   3 +-
 mapies/viirs.py                |  10 +-
 run/run_tropomi.py             |   1 +
 run/run_viirs.py               |   2 +-
 5 files changed, 198 insertions(+), 8 deletions(-)

diff --git a/mapies/tropomi.py b/mapies/tropomi.py
index 59f2d93d..5cf081eb 100644
--- a/mapies/tropomi.py
+++ b/mapies/tropomi.py
@@ -17,8 +17,17 @@ import os
 from mapies.util.func_tools import * #timeit, get_file_list, time_converter, frequency_int, error_estimation, time_domain_selection, geo_to_rot
 from mapies.grids.monarch import RotatedGrid, IrregularRotatedGrid, RegularGrid, regridding_function
+from mapies.util.variables_names import VARS_ATTRIBUTES
+import json
 from numpy.typing import DTypeLike, NDArray
 
+os.environ["OMP_NUM_THREADS"] = "8"
+
+#logger = logging.getLogger(__name__)
+#logging.basicConfig(filename='../logs/mapies.log', level=logging.INFO)
+
+TIME_ORIGIN = np.datetime64("1900-01-01T00:00:00")
+
 
 def process_single_file(
     r,
@@ -328,6 +337,187 @@ class TROPOMI(MAPIES):
 
         return final_lon, final_lat, final_obs, cumulative_count
 
+    @timeit
+    def yearly_average(self):
+        """
+        Compute the yearly average for the processed data.
+        """
+        cumulative_obs = None
+        cumulative_lat = None
+        cumulative_lon = None
+        cumulative_count = None
+
+        for key, month_data in self.monthly_dict.items():
+            if not all(var in month_data for var in ["lon", "lat", "obs", "count"]):
+                print(f"Skipping month {key}: Missing data.")
+                continue
+            if any(month_data[var] is None for var in ["lon", "lat", "obs", "count"]):
+                print(f"Skipping month {key}: No valid data.")
+                continue
+
+            lon_values = month_data["lon"]
+            lat_values = month_data["lat"]
+            obs_values = month_data["obs"]
+            count_values = month_data["count"]
+
+            if cumulative_obs is None:
+                cumulative_obs = obs_values * count_values
+                cumulative_lat = lat_values * count_values
+                cumulative_lon = lon_values * count_values
+                cumulative_count = count_values
+            else:
+                cumulative_obs += obs_values * count_values
+                cumulative_lat += lat_values * count_values
+                cumulative_lon += lon_values * count_values
+                cumulative_count += count_values
+
+        if cumulative_obs is None or cumulative_count is None or np.all(cumulative_count == 0):
+            print("No valid data for the entire year. Returning empty arrays.")
+            self.yearly_lon = None
+            self.yearly_lat = None
+            self.yearly_obs = None
+            self.yearly_count = None
+            return
+
+        valid_mask = cumulative_count > 0
+        yearly_obs = np.zeros_like(cumulative_obs)
+        yearly_lat = np.zeros_like(cumulative_lat)
+        yearly_lon = np.zeros_like(cumulative_lon)
+
+        yearly_obs[valid_mask] = cumulative_obs[valid_mask] / cumulative_count[valid_mask]
+        yearly_lat[valid_mask] = cumulative_lat[valid_mask] / cumulative_count[valid_mask]
+        yearly_lon[valid_mask] = cumulative_lon[valid_mask] / cumulative_count[valid_mask]
+
+        self.yearly_lon = yearly_lon
+        self.yearly_lat = yearly_lat
+        self.yearly_obs = yearly_obs
+        self.yearly_count = cumulative_count
+
+        print("Yearly average computation completed.")
+
+    @timeit
+    def process_lazy_data(self, apply_qa=True, geospatial_crop: NDArray | None = None):
+        """
+        Process the data for the specified time range. It only retrieves a daily dictionary using dask lazy loading.
+ """ + flag = True + #Check if the user wants qa flags + if apply_qa: + self.apply_qa = True + else: + self.apply_qa = False + if geospatial_crop is not None: + self.geospatial_crop = np.array(geospatial_crop) + for day in self.daily_dict.keys(): + try: + files = self.daily_dict[day]["files"] + args = [ + ( + self.grid, + file, + self.obs_var, + self.lat_var, + self.lon_var, + self.time_var, + self.start_date, + self.end_date, + self.apply_qa, + self.qualityFlags, + flag, + self.geospatial_crop + + ) + for file in files + ] + + daily_obs = [] + daily_lat = [] + daily_lon = [] + daily_time = [] + + with multiprocessing.Pool(processes=8) as pool: # Adjust number of processes as needed + results = pool.starmap(process_single_file, args) + + valid_results = [res for res in results if res is not None] + # Check shapes of obs lat and lon to see if they are all the same + + for obs, lat, lon, time_values in valid_results: + #print(f'Shape of obs: {obs.shape}, Shape of lat: {lat.shape}, Shape of lon: {lon.shape}, Shape of time: {time_values.shape}') + if len(obs) > 0: + daily_lon.append(lon) + daily_lat.append(lat) + daily_obs.append(obs) + daily_time.append(time_values) + + if not daily_obs: + print(f"No valid data for day {day}") + continue + + final_obs = np.concatenate(daily_obs, axis=0) + final_lon = np.concatenate(daily_lon, axis=0) + final_lat = np.concatenate(daily_lat, axis=0) + final_time = np.concatenate(daily_time, axis=0) + + # Check if any valid data exists + if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): + print(f"All observation values for day {day} are 0 or NaN. Skipping...") + continue + self.to_netCDF(final_obs, final_lat, final_lon, final_time, self.start_date, self.end_date) + + except Exception as e: + print(f"Error processing data for day {day}: {e}") + continue + + + def to_netCDF(self, obs, lat, lon, time, start_time, end_time): + try: + # Convert time to seconds since TIME_ORIGIN + final_time_seconds = (time - TIME_ORIGIN) / np.timedelta64(1, "s") + final_time_seconds = np.round(final_time_seconds, 0) + + grid_representation_str = json.dumps(self.grid_config, ensure_ascii=False) + qa_flags_str = json.dumps(self.qualityFlags, ensure_ascii=False) + + # Create xarray Dataset + ds = xr.Dataset( + coords={"time": ("time", final_time_seconds, { + "units": "seconds since 1900-01-01 00:00:00", + "calendar": "gregorian" + })}, + + data_vars={ + "lon": (["time"], lon, VARS_ATTRIBUTES['lon'].copy()), + "lat": (["time"], lat, VARS_ATTRIBUTES['lat'].copy()), + VARS_ATTRIBUTES[self.obs_var]["mapies_variable"]: (["time"], obs, VARS_ATTRIBUTES[self.obs_var].copy()), + }, + attrs={ + "title": f"{VARS_ATTRIBUTES['title']['description']} from {start_time} to {end_time}", + "institution": "Barcelona Supercomputing Center", + "grid": f"{self.grid_repr}: {grid_representation_str}", + "Developers": "MAPIES team", + "QA": f"Quality Assurance: {self.apply_qa}, quality assurance applied: {qa_flags_str}", + "history": f"Created {datetime.now()}", + }, + ) + + # Define encoding for compact storage + encoding = { + "time": {"dtype": "float32"}, + "lon": {"dtype": "float32"}, + "lat": {"dtype": "float32"}, + VARS_ATTRIBUTES[self.obs_var]["mapies_variable"]: {"dtype": "float32"}, + } + + # Save to NetCDF file + filename = f"{self.dest}/Data_{self.datatype}_{start_time}_{end_time}.nc" + ds.to_netcdf(filename, mode='w', encoding=encoding) + + print(f"Saved data to {filename}") + + except Exception as e: + print(f"Error saving data into netCDF: {e}") + return + def plot_2D_observations(self, 
                              months=[0], filename=None, outdir=None):

diff --git a/mapies/util/variables_names.py b/mapies/util/variables_names.py
index 12be66d7..a17b08cb 100644
--- a/mapies/util/variables_names.py
+++ b/mapies/util/variables_names.py
@@ -1,6 +1,7 @@
 VARS_ATTRIBUTES={
     "lon": {"description": "Longitude of observations", "units": "degrees_east"},
     "lat": {"description": "Latitude of observations", "units": "degrees_north"},
-    "Aerosol_Optical_Thickness_550_Land_Ocean_Best_Estimate": {"description": "Aerosol_Optical_Thickness_550_Land_Ocean_Best_Estimate values", "units": "unitless"},
+    "Aerosol_Optical_Thickness_550_Land_Ocean_Best_Estimate": {"description": "Aerosol_Optical_Thickness_550_Land_Ocean_Best_Estimate values", "units": "unitless", "mapies_variable":"aod550"},
+    "nitrogendioxide_tropospheric_column": {"description": "nitrogendioxide_tropospheric_column", "units": "unitless", "mapies_variable":"no2"},
     "title": {"description": "VIIRS data"},
 }
\ No newline at end of file
diff --git a/mapies/viirs.py b/mapies/viirs.py
index 5c815461..fd59d96d 100644
--- a/mapies/viirs.py
+++ b/mapies/viirs.py
@@ -26,8 +26,6 @@
 from mapies.util.func_tools import * #timeit, get_file_list, time_converter, frequency_int, error_estimation, time_domain_selection, geo_to_rot
 from mapies.grids.monarch import RotatedGrid, IrregularRotatedGrid, RegularGrid, regridding_function
 from mapies.util.variables_names import VARS_ATTRIBUTES
-from pathlib import Path
-from glob import glob as glob_module
 import json
 from numpy.typing import DTypeLike, NDArray
@@ -506,7 +504,7 @@ class VIIRS(MAPIES):
 
                 # Check shapes of obs lat and lon to see if they are all the same
                 for obs, lat, lon, time_values in valid_results:
-                    print(f'Shape of obs: {obs.shape}, Shape of lat: {lat.shape}, Shape of lon: {lon.shape}, Shape of time: {time_values.shape}')
+                    #print(f'Shape of obs: {obs.shape}, Shape of lat: {lat.shape}, Shape of lon: {lon.shape}, Shape of time: {time_values.shape}')
                     if len(obs) > 0:
                         daily_lon.append(lon)
                         daily_lat.append(lat)
@@ -526,12 +524,12 @@ class VIIRS(MAPIES):
                 if np.all(final_obs == 0) or np.all(np.isnan(final_obs)):
                     print(f"All observation values for day {day} are 0 or NaN. Skipping...")
                     continue
+                self.to_netCDF(final_obs, final_lat, final_lon, final_time, self.start_date, self.end_date)
 
             except Exception as e:
                 print(f"Error processing data for day {day}: {e}")
                 continue
 
-        self.to_netCDF(final_obs, final_lat, final_lon, final_time, self.start_date, self.end_date)
-
+
     def to_netCDF(self, obs, lat, lon, time, start_time, end_time):
         try:
             # Convert time to seconds since TIME_ORIGIN
@@ -572,7 +570,7 @@ class VIIRS(MAPIES):
             }
 
             # Save to NetCDF file
-            filename = f"{self.dest}/Data_{start_time}_{end_time}.nc"
+            filename = f"{self.dest}/Data_{self.datatype}_{start_time}_{end_time}.nc"
             ds.to_netcdf(filename, mode='w', encoding=encoding)
 
             print(f"Saved data to {filename}")
diff --git a/run/run_tropomi.py b/run/run_tropomi.py
index c42eb7d6..ad5ff778 100644
--- a/run/run_tropomi.py
+++ b/run/run_tropomi.py
@@ -20,6 +20,7 @@ if __name__ == "__main__":
     # c.yearly_average()
     c.plot_2D_observations(months=[0], outdir=outdir)
     c.plot_2D_num_obs(months=[0], outdir=outdir)
+    c.process_lazy_data(apply_qa=True, geospatial_crop=[[-40, 40], [0, 70]])
     end_time = time.time()
     elapsed_time = end_time - start_time
     print(f"Script executed in {elapsed_time:.2f} seconds.")
diff --git a/run/run_viirs.py b/run/run_viirs.py
index 24ef6269..56a86b18 100644
--- a/run/run_viirs.py
+++ b/run/run_viirs.py
@@ -7,7 +7,7 @@ if __name__ == "__main__":
     # Change these variables
     # 00:30 to 02:36 from 2024-01-01 to 2024-01-03
     start_date = "202401010030"
-    end_date = "202401012359"
+    end_date = "202401022359"
 
--
GitLab

From c0d581e124d140abb1e9c4c4ed622635d4cef121 Mon Sep 17 00:00:00 2001
From: Calum Meikle
Date: Thu, 20 Feb 2025 12:37:17 +0100
Subject: [PATCH 2/2] Update process lazy data to output correct day and add tests

---
 mapies/tests/test_func_tools.py | 19 +++++++++++++-
 mapies/tropomi.py               |  2 ++
 mapies/util/func_tools.py       | 45 ++++++++++++++++++++++++---------
 mapies/viirs.py                 |  6 ++++-
 run/run_viirs.py                |  6 ++---
 5 files changed, 61 insertions(+), 17 deletions(-)

diff --git a/mapies/tests/test_func_tools.py b/mapies/tests/test_func_tools.py
index da0df3e7..97e7ba8c 100644
--- a/mapies/tests/test_func_tools.py
+++ b/mapies/tests/test_func_tools.py
@@ -4,7 +4,7 @@ import pytest
 import pandas as pd
 import xarray as xr
 import numpy as np
-from mapies.util.func_tools import time_converter, time_domain_selection, geo_to_rot, quality_assurance
+from mapies.util.func_tools import time_converter, day_boundaries, time_domain_selection, geo_to_rot, quality_assurance
 
 
 
@@ -59,6 +59,23 @@ def test_quality_assurance(test_input, expected):
     assert quality_assurance(*test_input)[0].all() == expected[0].all()
     assert quality_assurance(*test_input)[1].all() == expected[1].all()
     assert quality_assurance(*test_input)[2].all() == expected[2].all()
+
+
+@pytest.mark.parametrize(
+    "test_input,expected",
+    [
+        ((1, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301010030"), pd.to_datetime("20230101235959").ceil(freq="d"))),
+        ((2, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301020000"), pd.to_datetime("20230102235959").ceil(freq="d"))),
+        ((3, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301030000"), pd.to_datetime("202301030030"))),
+
+    ]
+
+)
+def test_day_boundaries(test_input, expected):
+    assert day_boundaries(*test_input)[0] == expected[0]
+    assert day_boundaries(*test_input)[1] == expected[1]
+
+
 """
 @pytest.mark.parametrize(
     "test_input,expected",
diff --git a/mapies/tropomi.py b/mapies/tropomi.py
index 5cf081eb..9c07b7cf 100644
--- a/mapies/tropomi.py
+++ b/mapies/tropomi.py
@@ -462,6 +462,8 @@ class TROPOMI(MAPIES):
                 if np.all(final_obs == 0) or np.all(np.isnan(final_obs)):
                     print(f"All observation values for day {day} are 0 or NaN. Skipping...")
                     continue
+                # Get day start and day end but only if they are within object start and end date
+                day_start, day_end = day_boundaries(day, self.start_date, self.end_date)
                 self.to_netCDF(final_obs, final_lat, final_lon, final_time, self.start_date, self.end_date)
 
             except Exception as e:
diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py
index c9d50981..295084dd 100644
--- a/mapies/util/func_tools.py
+++ b/mapies/util/func_tools.py
@@ -343,16 +343,37 @@ def new_preprocess_vars(ds, obs_var, lat_var, lon_var, time_var, start_date, end
 
 
 def reindex(
-    dependent_var_index,
-    independent_var_values,
-    ):
-    """"
-    Recutting the data whenever a selction of the data has been made along one of the dimenisons
-
-    Based off of how it has been done in Providentia
-    """
-    # Maybe add a checker try/except or assert
-    independent_var_values = independent_var_values[dependent_var_index]
+    dependent_var_index,
+    independent_var_values,
+    ):
+    """"
+    Recutting the data whenever a selction of the data has been made along one of the dimenisons
+
+    Based off of how it has been done in Providentia
+    """
+    # Maybe add a checker try/except or assert
+    independent_var_values = independent_var_values[dependent_var_index]
+
+    # return reindexed values
+    return independent_var_values
+
+
+def day_boundaries(day, start_date, end_date):
+    """
+    Get daily boundaries only if they are within start and end date
+    If not return the start and end date as the boundaries
+    """
+    from datetime import datetime, timedelta, time
+    date = start_date + timedelta(days=day) - timedelta(days=1)
+    day_start = date.floor(freq="d")
+    day_end = date.ceil(freq="d")
+
+    # If day_start or day_end are outside the regions of start_date and end_date
+    # Return those instead
+    if day_start < start_date:
+        day_start = start_date
+
+    if day_end > end_date:
+        day_end = end_date
 
-    # return reindexed values
-    return independent_var_values
\ No newline at end of file
+    return day_start, day_end
\ No newline at end of file
diff --git a/mapies/viirs.py b/mapies/viirs.py
index fd59d96d..9e5905a2 100644
--- a/mapies/viirs.py
+++ b/mapies/viirs.py
@@ -524,7 +524,11 @@ class VIIRS(MAPIES):
                 if np.all(final_obs == 0) or np.all(np.isnan(final_obs)):
                     print(f"All observation values for day {day} are 0 or NaN. Skipping...")
                     continue
-                self.to_netCDF(final_obs, final_lat, final_lon, final_time, self.start_date, self.end_date)
+
+
+                # Get day start and day end but only if they are within object start and end date
+                day_start, day_end = day_boundaries(day, self.start_date, self.end_date)
+                self.to_netCDF(final_obs, final_lat, final_lon, final_time, day_start, day_end)
 
             except Exception as e:
                 print(f"Error processing data for day {day}: {e}")
                 continue
diff --git a/run/run_viirs.py b/run/run_viirs.py
index 56a86b18..f8360fec 100644
--- a/run/run_viirs.py
+++ b/run/run_viirs.py
@@ -21,10 +21,10 @@ if __name__ == "__main__":
 
     c.read_nc()
-    c.process_data(monthly_avg=False, batch_size = 50, apply_qa=True, save=True)
+    #c.process_data(monthly_avg=False, batch_size = 50, apply_qa=True, save=True)
     # c.yearly_average()
-    c.plot_2D_observations(months=[0], outdir=outdir)
-    c.plot_2D_num_obs(months=[0], outdir=outdir)
+    #c.plot_2D_observations(months=[0], outdir=outdir)
+    #c.plot_2D_num_obs(months=[0], outdir=outdir)
 
     c.process_lazy_data(apply_qa=True)
--
GitLab
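
Note on the yearly average added in PATCH 1/2: yearly_average() builds a count-weighted mean, multiplying each monthly mean by its observation count, accumulating the products, and dividing by the total count only where that count is non-zero. Below is a minimal NumPy sketch of the same arithmetic with made-up numbers; it assumes nothing about the MAPIES grids and is not part of the patch.

import numpy as np

# Two months of gridded monthly means and the number of raw observations behind each cell.
obs    = np.array([[0.2, 0.4], [0.1, 0.3]])
counts = np.array([[10,  0], [30,  5]])

cumulative_obs   = np.sum(obs * counts, axis=0)   # sum of count-weighted monthly means
cumulative_count = np.sum(counts, axis=0)

yearly = np.zeros_like(cumulative_obs)
valid = cumulative_count > 0                      # avoid dividing by zero in empty cells
yearly[valid] = cumulative_obs[valid] / cumulative_count[valid]
print(yearly)  # cell 0: (0.2*10 + 0.1*30)/40 = 0.125; cell 1: (0.4*0 + 0.3*5)/5 = 0.3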
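
Note on day_boundaries() from PATCH 2/2: it maps a 1-based day index to that day's midnight-to-midnight window and clips the window to the object's start and end dates. The sketch below mirrors the values in test_day_boundaries and assumes the mapies package from this series is importable; it is illustrative only.

import pandas as pd
from mapies.util.func_tools import day_boundaries  # helper added in PATCH 2/2

start = pd.to_datetime("202301010030")  # same dates as test_day_boundaries
end = pd.to_datetime("202301030030")

# Day 1: the window would start at 2023-01-01 00:00, but it is clipped forward to the object's start date.
print(day_boundaries(1, start, end))  # (2023-01-01 00:30:00, 2023-01-02 00:00:00)

# Day 3: the window's end is clipped back to the object's end date.
print(day_boundaries(3, start, end))  # (2023-01-03 00:00:00, 2023-01-03 00:30:00)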
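
Note on the per-day output: to_netCDF() writes files named Data_{datatype}_{start}_{end}.nc with time stored as seconds since 1900-01-01, so they can be opened directly with xarray. A rough read-back sketch follows; the output directory and the exact filename formatting are assumptions, not taken from the patches.

import glob
import xarray as xr

# Match the per-day files loosely, since the datatype and date formatting come from the object.
for path in sorted(glob.glob("outdir/Data_*.nc")):
    ds = xr.open_dataset(path)           # "time" is decoded from seconds since 1900-01-01
    print(path, ds.attrs.get("QA"), list(ds.data_vars))
    ds.close()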