From 8483c7c0c76b01c895953494436cb4a161e7d4e9 Mon Sep 17 00:00:00 2001 From: Carlotta Date: Thu, 13 Mar 2025 17:42:46 +0100 Subject: [PATCH 1/7] new methods --- mapies/util/func_tools.py | 12 +++++++++ mapies/viirs.py | 52 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index fb2c05b0..a19023c2 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -488,3 +488,15 @@ def filter_observations(obs, lat, lon, count): return final_obs, final_lat, final_lon, final_count + +def time_slicing(start_date, end_date, frequency): + """ + Time slicing function + """ + + date_slices = pd.date_range(start_date, end_date, freq=frequency.strftime('%Y%m%d%H%M')) + + return date_slices + + + \ No newline at end of file diff --git a/mapies/viirs.py b/mapies/viirs.py index 05e03330..85eac5e7 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -46,6 +46,7 @@ class VIIRS(MAPIES): self.end_day = int(end_date[6:8]) self.monthly_dict = {} self.daily_dict = {} # Dictionary to produce daily netcdf files + self.avg_results = [] # List to store averaged results (when averaging by frequency) self.dates_slice = pd.date_range( self.start_date, @@ -250,4 +251,53 @@ class VIIRS(MAPIES): ds.to_netcdf(filename, encoding={}) outfiles.append(filename) - return outfiles \ No newline at end of file + return outfiles + + + def average_by_frequency(self, frequency="D", save_nc=True): + """ + Function to average the observations by frequency + """ + avg_results = [] # List to store final results + + dates_slice = time_slicing(self.start_date, self.end_date, frequency) + + for date in dates_slice: + l_border = (datetime.strptime(date, "%Y%m%d%H%M") - timedelta(hours=self.frequency/2)).strftime('%Y%m%d%H%M') + r_border = (datetime.strptime(date, "%Y%m%d%H%M") + timedelta(hours=self.frequency/2)).strftime('%Y%m%d%H%M') + + # Collect data + obs, lon, lat, time_values = np.array([]), np.array([]), np.array([]), np.array([], dtype='datetime64') + days = pd.date_range(l_border, r_border, freq="D") + + for pandas_day in days: + day = pandas_day.day + try: + day_obs = self.daily_dict[day]["obs"] + day_lon = self.daily_dict[day]["lon"] + day_lat = self.daily_dict[day]["lat"] + day_time = self.daily_dict[day]["time"] + except KeyError: + continue + obs = np.append(obs, day_obs) + lon = np.append(lon, day_lon) + lat = np.append(lat, day_lat) + time_values = np.append(time_values, day_time) + + # Filter by time range + obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, l_border, r_border) + + # Perform averaging on the grid + if isinstance(self.grid, RotatedGrid): + lon, lat, rlon, rlat, obs, obserr, count_obs = self.grid.aggregate(lon, lat, obs, None) + elif isinstance(self.grid, RegularGrid): + lon, lat, obs, obserr, count_obs = self.grid.aggregate(lon, lat, obs, None) + + # Store the averaged results + avg_results.append((date, lon, lat, obs)) + + if save_nc: + day_start, day_end = day_boundaries(day, self.start_date, self.end_date) + self.to_netCDF(obs, lat, lon, time_values, day_start, day_end) + + self.avg_results = avg_results \ No newline at end of file -- GitLab From dd0b93c279a14484e81aea99e645e14679afd025 Mon Sep 17 00:00:00 2001 From: Carlotta Date: Thu, 13 Mar 2025 17:44:41 +0100 Subject: [PATCH 2/7] updates --- run/run_viirs.py | 1 - 1 file changed, 1 deletion(-) diff --git a/run/run_viirs.py b/run/run_viirs.py index fac75931..a73571d2 100644 --- a/run/run_viirs.py +++ 
b/run/run_viirs.py @@ -23,7 +23,6 @@ if __name__ == "__main__": c.gather_nc_files() - c.process_avg_data(monthly_avg=False, batch_size = 100, apply_qa=True, save=True) #c.yearly_average() c.plot_2D_observations(months=[0], filename="new.png", outdir=outdir) -- GitLab From 60d71a1834a7449b5843d0ec241576913e2147f5 Mon Sep 17 00:00:00 2001 From: Carlotta Date: Wed, 26 Mar 2025 18:08:40 +0100 Subject: [PATCH 3/7] First attempt to produce averages by frequency (it still need to be tested) --- mapies/mapies.py | 128 +++++++++++++++++++++++++++++--------- mapies/util/func_tools.py | 10 +-- mapies/viirs.py | 59 +++--------------- run/run_viirs.py | 21 ++++--- 4 files changed, 123 insertions(+), 95 deletions(-) diff --git a/mapies/mapies.py b/mapies/mapies.py index bc412f73..e784200c 100644 --- a/mapies/mapies.py +++ b/mapies/mapies.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # MAPIES base -from datetime import datetime +from datetime import datetime, timedelta from mapies.util.func_tools import * from mapies.grids.rotated import RotatedGrid, IrregularRotatedGrid, CAMSGrid, BDRCGrid @@ -112,7 +112,7 @@ class MAPIES: @timeit @logger.catch - def process_avg_data(self, monthly_avg=False, batch_size=100, save=False, apply_qa=True, geospatial_crop: NDArray | None = None): + def process_avg_data(self, frequency_avg: str | None = None, batch_size=100, save=False, apply_qa=True, geospatial_crop: NDArray | None = None): """ Process the data. @@ -135,34 +135,38 @@ class MAPIES: if geospatial_crop is not None: logger.info(f"Applying Geospatial cropping: {geospatial_crop}") self.geospatial_crop = np.array(geospatial_crop) - if monthly_avg: - for month in self.monthly_dict.keys(): - - # Catch if they haven't selected a grid - try: - logger.info(f"Running average on {self.grid}") - except AttributeError: - raise Exception("Please specify a grid in the initialisation of MAPIES, using grid_repr=''") - - # Process the data for the current month - final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp( - batch_size=batch_size, month=month - ) - - self.monthly_dict[month] = { - "lon": final_lon, - "lat": final_lat, - "obs": final_obs, - "count": cumulative_count, - } - logger.debug(f'Updated monthly dictionary for month {month}') - logger.debug(f'Lon shape: {final_lon.shape}, Lat shape: {final_lat.shape}, Obs shape: {final_obs.shape}') - if np.all(final_lon == 0) or np.all(np.isnan(final_lon)): - logger.warning(f"All longitude values for month {month} are 0 or NaN.") - if np.all(final_lat == 0) or np.all(np.isnan(final_lat)): - logger.warning(f"All latitude values for month {month} are 0 or NaN.") - if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): - logger.warning(f"All observation values for month {month} are 0 or NaN.") + if frequency_avg is not None: + logger.info(f"Running frequency-based averaging with frequency='{frequency_avg}'") + self.process_lazy_data(apply_qa=apply_qa, geospatial_crop=geospatial_crop, save=False) + self.average_by_frequency(frequency=frequency_avg, save_nc=save) + # if monthly_avg: + # for month in self.monthly_dict.keys(): + + # # Catch if they haven't selected a grid + # try: + # logger.info(f"Running average on {self.grid}") + # except AttributeError: + # raise Exception("Please specify a grid in the initialisation of MAPIES, using grid_repr=''") + + # # Process the data for the current month + # final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp( + # batch_size=batch_size, month=month + # ) + + # self.monthly_dict[month] = { + # "lon": 
final_lon, + # "lat": final_lat, + # "obs": final_obs, + # "count": cumulative_count, + # } + # logger.debug(f'Updated monthly dictionary for month {month}') + # logger.debug(f'Lon shape: {final_lon.shape}, Lat shape: {final_lat.shape}, Obs shape: {final_obs.shape}') + # if np.all(final_lon == 0) or np.all(np.isnan(final_lon)): + # logger.warning(f"All longitude values for month {month} are 0 or NaN.") + # if np.all(final_lat == 0) or np.all(np.isnan(final_lat)): + # logger.warning(f"All latitude values for month {month} are 0 or NaN.") + # if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): + # logger.warning(f"All observation values for month {month} are 0 or NaN.") else: final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp( @@ -179,6 +183,70 @@ class MAPIES: logger.debug(f'Saved processed data to netCDF') + def average_by_frequency(self, frequency="D", save_nc=True): + """ + Function to average the observations by frequency + """ + + freq_hours = frequency_int(frequency) + avg_results = [] + + dates_slice = time_slicing(self.start_date, self.end_date, frequency) + + for date in dates_slice: + center_time = datetime.strptime(date, "%Y%m%d%H%M") + l_border = (datetime.strptime(date, "%Y%m%d%H%M") - timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M') + r_border = (datetime.strptime(date, "%Y%m%d%H%M") + timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M') + + # Collect data + obs, lon, lat, time_values = np.array([]), np.array([]), np.array([]), np.array([], dtype='datetime64') + days = pd.date_range(l_border, r_border, freq="D") + + valid_results = [] + for pandas_day in days: + day = pandas_day.date() + try: + day_obs = self.daily_dict[day]["obs"] + day_lon = self.daily_dict[day]["lon"] + day_lat = self.daily_dict[day]["lat"] + day_time = self.daily_dict[day]["time"] + valid_results.append((day_obs, day_lat, day_lon, day_time)) + except KeyError: + continue + + # Concatenate all arrays together + if not valid_results: + continue + obs, lat, lon, time_values = append_and_concatenate(valid_results, day) + if obs is None: + continue # No valid data for this time slice + + # Filter by exact time range + obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, l_border, r_border) + + # Aggregate on the grid + if isinstance(self.grid, RotatedGrid): + lon, lat, rlon, rlat, obs, count_obs = self.grid.aggregate(lon, lat, obs, None) + elif isinstance(self.grid, RegularGrid): + lon, lat, obs, obserr, count_obs = self.grid.aggregate(lon, lat, obs, None) + else: + raise ValueError("Unsupported grid type") + + # Save result + avg_results.append((date, lon, lat, obs)) + + # Optionally save NetCDF + if save_nc: + # Use the center_time date for boundaries + day_start, day_end = day_boundaries(center_time.date(), self.start_date, self.end_date) + self.to_netCDF(obs, lat, lon, time_values, day_start, day_end) + + # Optional: free memory + del obs, lon, lat, time_values, valid_results + + # Store the results in memory if needed + self.avg_results = avg_results + def process_data_mp(self, batch_size, month=None): """ diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index a19023c2..fb1f20d7 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -344,9 +344,11 @@ def day_boundaries(day, start_date, end_date): If not return the start and end date as the boundaries """ from datetime import datetime, timedelta, time - date = start_date + timedelta(days=day) - timedelta(days=1) - day_start = date.floor(freq="d") - day_end = 
(date + timedelta(days=1)).floor(freq="d") + # date = start_date + timedelta(days=day) - timedelta(days=1) + # day_start = date.floor(freq="d") + # day_end = (date + timedelta(days=1)).floor(freq="d") + day_start = datetime.combine(day, datetime.min.time()) + day_end = day_start + timedelta(days=1) # If day_start or day_end are outside the regions of start_date and end_date # Return those instead @@ -494,7 +496,7 @@ def time_slicing(start_date, end_date, frequency): Time slicing function """ - date_slices = pd.date_range(start_date, end_date, freq=frequency.strftime('%Y%m%d%H%M')) + date_slices = pd.date_range(start_date, end_date, freq=frequency).strftime('%Y%m%d%H%M') return date_slices diff --git a/mapies/viirs.py b/mapies/viirs.py index 85eac5e7..8aed8f6f 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -114,6 +114,7 @@ class VIIRS(MAPIES): parts = file.split('.') julian_day = parts[1][1:] date = datetime.strptime(julian_day, "%Y%j") + full_date = date.date() # Get the full date for the daily_dict keys if date.month not in self.monthly_dict: self.monthly_dict[date.month] = { "files": [], # List to store file paths @@ -122,8 +123,8 @@ class VIIRS(MAPIES): "obs": None, # Observations array "count": None, # Count of observations used to compute the average } - if date.day not in self.daily_dict: - self.daily_dict[date.day] = { + if full_date not in self.daily_dict: + self.daily_dict[full_date] = { "files": [], # List to store file paths "lon": None, # Longitudes array "lat": None, # Latitudes array @@ -132,7 +133,7 @@ class VIIRS(MAPIES): } self.monthly_dict[date.month]["files"].append(file) - self.daily_dict[date.day]["files"].append(file) + self.daily_dict[full_date]["files"].append(file) logger.debug(f"Discovered files for months: {list(self.monthly_dict.keys())}") for month, data in self.monthly_dict.items(): @@ -141,7 +142,7 @@ class VIIRS(MAPIES): logger.debug(f"Discovered files for days: {list(self.daily_dict.keys())}") for day, data in self.daily_dict.items(): - logger.debug(f"Day {day:02d}: {len(data['files'])} files") + logger.debug(f"Day {day.strftime('%Y-%m-%d')}: {len(data['files'])} files") # ============================================================================= @@ -175,7 +176,7 @@ class VIIRS(MAPIES): time_values = np.array([], dtype='datetime64') days = pd.date_range(l_border, r_border, freq="D") for pandas_day in days: - day = pandas_day.day + day = pandas_day.date() try: day_obs = self.daily_dict[day]["obs"] day_lon = self.daily_dict[day]["lon"] @@ -254,50 +255,4 @@ class VIIRS(MAPIES): return outfiles - def average_by_frequency(self, frequency="D", save_nc=True): - """ - Function to average the observations by frequency - """ - avg_results = [] # List to store final results - - dates_slice = time_slicing(self.start_date, self.end_date, frequency) - - for date in dates_slice: - l_border = (datetime.strptime(date, "%Y%m%d%H%M") - timedelta(hours=self.frequency/2)).strftime('%Y%m%d%H%M') - r_border = (datetime.strptime(date, "%Y%m%d%H%M") + timedelta(hours=self.frequency/2)).strftime('%Y%m%d%H%M') - - # Collect data - obs, lon, lat, time_values = np.array([]), np.array([]), np.array([]), np.array([], dtype='datetime64') - days = pd.date_range(l_border, r_border, freq="D") - - for pandas_day in days: - day = pandas_day.day - try: - day_obs = self.daily_dict[day]["obs"] - day_lon = self.daily_dict[day]["lon"] - day_lat = self.daily_dict[day]["lat"] - day_time = self.daily_dict[day]["time"] - except KeyError: - continue - obs = np.append(obs, day_obs) - lon = 
np.append(lon, day_lon) - lat = np.append(lat, day_lat) - time_values = np.append(time_values, day_time) - - # Filter by time range - obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, l_border, r_border) - - # Perform averaging on the grid - if isinstance(self.grid, RotatedGrid): - lon, lat, rlon, rlat, obs, obserr, count_obs = self.grid.aggregate(lon, lat, obs, None) - elif isinstance(self.grid, RegularGrid): - lon, lat, obs, obserr, count_obs = self.grid.aggregate(lon, lat, obs, None) - - # Store the averaged results - avg_results.append((date, lon, lat, obs)) - - if save_nc: - day_start, day_end = day_boundaries(day, self.start_date, self.end_date) - self.to_netCDF(obs, lat, lon, time_values, day_start, day_end) - - self.avg_results = avg_results \ No newline at end of file + \ No newline at end of file diff --git a/run/run_viirs.py b/run/run_viirs.py index a73571d2..e685df45 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -6,14 +6,17 @@ import time if __name__ == "__main__": # Change these variables # 00:30 to 02:36 from 2024-01-01 to 2024-01-03 - start_date = "202401010000" - end_date = "202401012359" + start_date = "202401080000" + end_date = "202401082359" - outdir="/home/cmeikle/Projects/data/" - indir="/home/cmeikle/Projects/data/VIIRS" + # outdir="/home/cmeikle/Projects/data/" + # indir="/home/cmeikle/Projects/data/VIIRS" + outdir="/home/cgile/Documents/mapies/figures" + indir="/home/cgile/Documents/mapies/VIIRS" + # indir="/home/cgile/bscearth000/esarchive/obs/nasa/viirs_noaa20_aerdb_l2/original_files/VIIRS" @@ -23,13 +26,13 @@ if __name__ == "__main__": c.gather_nc_files() - c.process_avg_data(monthly_avg=False, batch_size = 100, apply_qa=True, save=True) + c.process_avg_data(frequency_avg="3H", batch_size = 100, apply_qa=True, save=False) #c.yearly_average() - c.plot_2D_observations(months=[0], filename="new.png", outdir=outdir) - c.plot_2D_num_obs(months=[0], outdir=outdir) - c.process_lazy_data(apply_qa=True, save=False) + # c.plot_2D_observations(months=[0], filename="new.png", outdir=outdir) + # c.plot_2D_num_obs(months=[0], outdir=outdir) + # c.process_lazy_data(apply_qa=True, save=False) - c.to_da(frequency="3H", save_figs=True) + # c.to_da(frequency="3H", save_figs=True) -- GitLab From da928a62769ecd9801a2b9ee9283ea61f8ac7734 Mon Sep 17 00:00:00 2001 From: Carlotta Date: Thu, 27 Mar 2025 18:03:37 +0100 Subject: [PATCH 4/7] Final version of average_by_frequency --- mapies/mapies.py | 287 ++++++++++++++++---------------- mapies/tests/test_func_tools.py | 37 +++- mapies/util/func_tools.py | 6 + mapies/viirs.py | 2 +- run/run_viirs.py | 18 +- 5 files changed, 191 insertions(+), 159 deletions(-) diff --git a/mapies/mapies.py b/mapies/mapies.py index e784200c..ba058bee 100644 --- a/mapies/mapies.py +++ b/mapies/mapies.py @@ -12,6 +12,7 @@ import pandas as pd import xarray as xr import matplotlib.pyplot as plt import cartopy.crs as ccrs +from collections import defaultdict import os import sys import yaml @@ -110,6 +111,55 @@ class MAPIES: self.qualityFlags = qa_dict["qa_variables"] + @timeit + @logger.catch + def build_frequency_dict(self, frequency: str): + """ + Build self.frequency_dict by grouping files according to the selected frequency. 
+ + - If frequency < 24h --> daily dict + - If 1D <= frequency < 7D --> weekly dict + - If >= 7D (weekly or coarser) --> monthly dict + """ + self.frequency_dict = defaultdict(lambda: { + "files": [], + "obs": None, + "lat": None, + "lon": None, + "time": None + }) + + freq_hours = frequency_int(frequency) + + for file in self.files: + parts = file.split('.') + julian_day = parts[1][1:] + date = datetime.strptime(julian_day, "%Y%j") + + if freq_hours < 24: + dict_key = date.date() + elif 24 <= freq_hours < 7*24: + iso_year, iso_week, _ = date.isocalendar() + dict_key = f"{iso_year}-{iso_week:02d}" + else: + dict_key = date.month + + if dict_key not in self.frequency_dict: + self.monthly_dict[dict_key] = { + "files": [], # List to store file paths + "lon": None, # Longitudes array + "lat": None, # Latitudes array + "obs": None, # Observations array + "count": None, # Count of observations used to compute the average + } + self.frequency_dict[dict_key]["files"].append(file) + + + logger.debug(f"Built frequency_dict with {len(self.frequency_dict)} keys.") + for key, data in self.frequency_dict.items(): + logger.debug(f" Key {key}: {len(data['files'])} files") + + @timeit @logger.catch def process_avg_data(self, frequency_avg: str | None = None, batch_size=100, save=False, apply_qa=True, geospatial_crop: NDArray | None = None): @@ -137,37 +187,11 @@ class MAPIES: self.geospatial_crop = np.array(geospatial_crop) if frequency_avg is not None: logger.info(f"Running frequency-based averaging with frequency='{frequency_avg}'") + # self.build_frequency_dict(frequency=frequency_avg) + # self.process_lazy_data2(apply_qa=apply_qa, geospatial_crop=geospatial_crop, save=False) + # self.average_by_frequency2(frequency=frequency_avg, save_nc=save) self.process_lazy_data(apply_qa=apply_qa, geospatial_crop=geospatial_crop, save=False) self.average_by_frequency(frequency=frequency_avg, save_nc=save) - # if monthly_avg: - # for month in self.monthly_dict.keys(): - - # # Catch if they haven't selected a grid - # try: - # logger.info(f"Running average on {self.grid}") - # except AttributeError: - # raise Exception("Please specify a grid in the initialisation of MAPIES, using grid_repr=''") - - # # Process the data for the current month - # final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp( - # batch_size=batch_size, month=month - # ) - - # self.monthly_dict[month] = { - # "lon": final_lon, - # "lat": final_lat, - # "obs": final_obs, - # "count": cumulative_count, - # } - # logger.debug(f'Updated monthly dictionary for month {month}') - # logger.debug(f'Lon shape: {final_lon.shape}, Lat shape: {final_lat.shape}, Obs shape: {final_obs.shape}') - # if np.all(final_lon == 0) or np.all(np.isnan(final_lon)): - # logger.warning(f"All longitude values for month {month} are 0 or NaN.") - # if np.all(final_lat == 0) or np.all(np.isnan(final_lat)): - # logger.warning(f"All latitude values for month {month} are 0 or NaN.") - # if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): - # logger.warning(f"All observation values for month {month} are 0 or NaN.") - else: final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp( batch_size=batch_size, month=None @@ -189,9 +213,11 @@ class MAPIES: """ freq_hours = frequency_int(frequency) + logger.debug(f"Frequency in hours: {freq_hours}") avg_results = [] dates_slice = time_slicing(self.start_date, self.end_date, frequency) + logger.debug(f"Dates slice: {dates_slice}") for date in dates_slice: center_time = datetime.strptime(date, 
"%Y%m%d%H%M") @@ -233,7 +259,7 @@ class MAPIES: raise ValueError("Unsupported grid type") # Save result - avg_results.append((date, lon, lat, obs)) + avg_results.append((date, lon, lat, obs, count_obs)) # Optionally save NetCDF if save_nc: @@ -316,51 +342,6 @@ class MAPIES: return final_lon, final_lat, final_obs, cumulative_count - @timeit - @logger.catch - def yearly_average(self): - """ - Compute the yearly average for the processed data. - """ - cumulative_obs = None - cumulative_lat = None - cumulative_lon = None - cumulative_count = None - - for key, month_data in self.monthly_dict.items(): - if not all(var in month_data for var in ["lon", "lat", "obs", "count"]): - logger.warning(f"Skipping month {key}: Missing data.") - continue - if any(month_data[var] is None for var in ["lon", "lat", "obs", "count"]): - logger.warning(f"Skipping month {key}: No valid data.") - continue - - lon_values = month_data["lon"] - lat_values = month_data["lat"] - obs_values = month_data["obs"] - count_values = month_data["count"] - - results = [(obs_values, lat_values, lon_values, count_values)] - cumulative_obs, cumulative_lat, cumulative_lon, cumulative_count = combine_arrays(results, cumulative_obs, cumulative_lat, cumulative_lon, cumulative_count) - - if cumulative_obs is None or cumulative_count is None or np.all(cumulative_count == 0): - logger.warning("No valid data for the entire year. Returning empty arrays.") - self.yearly_lon = None - self.yearly_lat = None - self.yearly_obs = None - self.yearly_count = None - return - - yearly_obs, yearly_lat, yearly_lon = compute_final_averages(cumulative_obs, cumulative_lat, cumulative_lon, cumulative_count) - - self.yearly_lon = yearly_lon - self.yearly_lat = yearly_lat - self.yearly_obs = yearly_obs - self.yearly_count = cumulative_count - - logger.debug("Yearly average computation completed.") - - @timeit @logger.catch def process_lazy_data(self, apply_qa=True, geospatial_crop: NDArray | None = None, save=True): @@ -418,6 +399,57 @@ class MAPIES: if save: self.to_netCDF(final_obs, final_lat, final_lon, final_time, day_start, day_end) + @timeit + @logger.catch + def process_lazy_data2(self, apply_qa=True, geospatial_crop: NDArray | None = None, save=True): + """ + Process the data for the specified time range. It only retrieves a daily dictionary using dask lazy loading. + """ + flag = True + #Check if the user wants qa flags + if apply_qa: + self.apply_qa = True + else: + self.apply_qa = False + if geospatial_crop is not None: + self.geospatial_crop = np.array(geospatial_crop) + for dict_key in self.frequency_dict.keys(): + files = self.frequency_dict[dict_key]["files"] + + # Use multiprocessing to process files within the batch + + viirs_mp = self.mp_type( + self.grid, + self.obs_var, + self.lat_var, + self.lon_var, + self.time_var, + self.start_date, + self.end_date, + self.apply_qa, + self.qualityFlags, + files, + self.geospatial_crop, + flag, + ) + valid_results, failed_files = viirs_mp.process_batch() + + final_obs, final_lat, final_lon, final_time = append_and_concatenate(valid_results, dict_key) + + # Skip the day if no valid data was processed + if final_obs is None: + continue + + # Check if any valid data exists + if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): + logger.warning(f"All observation values for day {dict_key} are 0 or NaN. 
Skipping...") + continue + + # Update day dict + self.frequency_dict[dict_key]["obs"] = final_obs + self.frequency_dict[dict_key]["lon"] = final_lon + self.frequency_dict[dict_key]["lat"] = final_lat + self.frequency_dict[dict_key]["time"] = final_time @logger.catch def to_netCDF(self, obs, lat, lon, time, start_time, end_time): @@ -492,44 +524,27 @@ class MAPIES: filename=filename, ) return - elif months is not None: - for m in months: - try: - month_data = self.monthly_dict[m] - lon_values = month_data["lon"] - lat_values = month_data["lat"] - obs_values = month_data["obs"] - except Exception as e: - logger.error(f"Error for month {m}: {e}") - continue - title = f"Observation 2D plot of month {m} from {self.start_date} to {self.end_date}" - filename = filename or f"{self.datatype}_2D_obs_month_{m}_year_{self.year}.png" - super().plot_2D_obs_custom( - lon_values=lon_values, - lat_values=lat_values, - obs_values=obs_values, - outdir=outdir, - title=title, - filename=filename, - ) - else: + elif months == [1]: try: - lon_values = self.yearly_lon - lat_values = self.yearly_lat - obs_values = self.yearly_obs + for i, (date, lon_values, lat_values, obs_values, count_obs) in enumerate(self.avg_results): + date_obj = datetime.strptime(date, "%Y%m%d%H%M") + title = f"Observation 2D plot - {date_obj}, average {i}" + filename = f"{self.datatype}_2D_obs_from_{self.start_date}_to_{self.end_date}_average_{i}.png" + + self.plot_2D_obs_custom( + lon_values=lon_values, + lat_values=lat_values, + obs_values=obs_values, + outdir=outdir, + title=title, + filename=filename, + ) except Exception as e: - logger.error(f"Error for yearly data: {e}") + logger.error(f"Error plotting all data: {e}") return - title = f"Observation 2D plot of yearly average of year {self.year}" - filename = filename or f"{self.datatype}_2D_yearly_obs_{self.year}.png" - self.plot_2D_obs_custom( - lon_values=lon_values, - lat_values=lat_values, - obs_values=obs_values, - outdir=outdir, - title=title, - filename=filename, - ) + else: + logger.error("Error: months can be either [0] for all data, [1] for averages by frequency.") + return def plot_2D_num_obs(self, months=[0], filename=None, outdir=None): @@ -553,44 +568,26 @@ class MAPIES: filename=filename, ) return - elif months is not None: - for m in months: - try: - month_data = self.monthly_dict[m] - lon_values = month_data["lon"] - lat_values = month_data["lat"] - cum_count = month_data["count"] - except Exception as e: - logger.error(f"Error for month {m}: {e}") - continue - title = f"Plot of the number of valid observations for month {m} from {self.start_date} to {self.end_date}" - filename = filename or f"{self.datatype}_2D_obs_count_month_{m}_year_{self.year}.png" - self.plot_2D_num_obs_custom( - lon_values=lon_values, - lat_values=lat_values, - num_obs=cum_count, - outdir=outdir, - title=title, - filename=filename, - ) - else: + elif months == [1]: try: - lon_values = self.yearly_lon - lat_values = self.yearly_lat - cum_count = self.yearly_count + for i, (date, lon_values, lat_values, obs_values, count_obs) in enumerate(self.avg_results): + date_obj = datetime.strptime(date, "%Y%m%d%H%M") + title = f"Plot of the number of valid observations - {date_obj}, average {i}" + filename = f"{self.datatype}_2D_obs_count_from_{self.start_date}_to_{self.end_date}_average_{i}.png" + self.plot_2D_num_obs_custom( + lon_values=lon_values, + lat_values=lat_values, + num_obs=count_obs, + outdir=outdir, + title=title, + filename=filename, + ) except Exception as e: - logger.error(f"Error for 
yearly data: {e}") + logger.error(f"Error plotting all data: {e}") return - title = f"Plot of the number of valid observations for yearly average of {self.year}" - filename = filename or f"{outdir}/{self.datatype}_2D_obs_count_yearly_{self.year}.png" - self.plot_2D_num_obs_custom( - lon_values=lon_values, - lat_values=lat_values, - num_obs=cum_count, - outdir=outdir, - title=title, - filename=filename, - ) + else: + logger.error("Error: months can be either [0] for all data, [1] for averages by frequency.") + return # TODO this can be moved to a plotting function python file diff --git a/mapies/tests/test_func_tools.py b/mapies/tests/test_func_tools.py index caab469a..c69ce980 100644 --- a/mapies/tests/test_func_tools.py +++ b/mapies/tests/test_func_tools.py @@ -4,6 +4,7 @@ import pytest import pandas as pd import xarray as xr import numpy as np +from datetime import datetime from mapies.util.func_tools import time_converter, day_boundaries, time_domain_selection, quality_assurance @@ -49,16 +50,42 @@ def test_quality_assurance(test_input, expected): assert quality_assurance(*test_input)[2].all() == expected[2].all() +# @pytest.mark.parametrize( +# "test_input,expected", +# [ +# ((1, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301010030"), pd.to_datetime("20230101235959").ceil(freq="d"))), +# ((2, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301020000"), pd.to_datetime("20230102235959").ceil(freq="d"))), +# ((3, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301030000"), pd.to_datetime("202301030030"))), + +# ] + +# ) @pytest.mark.parametrize( "test_input,expected", [ - ((1, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301010030"), pd.to_datetime("20230101235959").ceil(freq="d"))), - ((2, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301020000"), pd.to_datetime("20230102235959").ceil(freq="d"))), - ((3, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301030000"), pd.to_datetime("202301030030"))), - + ( + (datetime(2023, 1, 1).date(), pd.to_datetime("2023-01-01 00:30"), pd.to_datetime("2023-01-03 00:30")), + (pd.to_datetime("2023-01-01 00:30"), pd.to_datetime("2023-01-02 00:00")) + ), + ( + (datetime(2023, 1, 2).date(), pd.to_datetime("2023-01-01 00:30"), pd.to_datetime("2023-01-03 00:30")), + (pd.to_datetime("2023-01-02 00:00"), pd.to_datetime("2023-01-03 00:00")) + ), + ( + (datetime(2023, 1, 3).date(), pd.to_datetime("2023-01-01 00:30"), pd.to_datetime("2023-01-03 00:30")), + (pd.to_datetime("2023-01-03 00:00"), pd.to_datetime("2023-01-03 00:30")) + ), + ( + (datetime(2022, 12, 31).date(), pd.to_datetime("2023-01-01 00:00"), pd.to_datetime("2023-01-02 00:00")), + (pd.to_datetime("2023-01-01 00:00"), pd.to_datetime("2023-01-01 00:00")) # both clipped + ), + ( + (datetime(2023, 1, 4).date(), pd.to_datetime("2023-01-01 00:00"), pd.to_datetime("2023-01-03 23:59")), + (pd.to_datetime("2023-01-04 00:00"), pd.to_datetime("2023-01-03 23:59")) # end clipped + ) ] - ) + def test_day_boundaries(test_input, expected): assert day_boundaries(*test_input)[0] == expected[0] assert day_boundaries(*test_input)[1] == expected[1] diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index fb1f20d7..e3d8ed1b 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -253,6 +253,12 @@ def frequency_int(time_interval): frequency = 1 elif time_interval 
== "D": frequency = 24 + elif time_interval == "W": + frequency = 24*7 + elif time_interval == "M": + frequency = 24*30 + elif time_interval == "Y": + frequency = 24*365 else: frequency = int(time_interval[0]) return frequency diff --git a/mapies/viirs.py b/mapies/viirs.py index 8aed8f6f..f8110ae6 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -144,7 +144,7 @@ class VIIRS(MAPIES): for day, data in self.daily_dict.items(): logger.debug(f"Day {day.strftime('%Y-%m-%d')}: {len(data['files'])} files") - + # ============================================================================= # Supporting functions # ============================================================================= diff --git a/run/run_viirs.py b/run/run_viirs.py index e685df45..8269775e 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -6,15 +6,15 @@ import time if __name__ == "__main__": # Change these variables # 00:30 to 02:36 from 2024-01-01 to 2024-01-03 - start_date = "202401080000" - end_date = "202401082359" + start_date = "202401010000" + end_date = "202401312359" # outdir="/home/cmeikle/Projects/data/" # indir="/home/cmeikle/Projects/data/VIIRS" - outdir="/home/cgile/Documents/mapies/figures" + outdir="/home/cgile/Documents/mapies/figures/averages" indir="/home/cgile/Documents/mapies/VIIRS" # indir="/home/cgile/bscearth000/esarchive/obs/nasa/viirs_noaa20_aerdb_l2/original_files/VIIRS" @@ -26,15 +26,17 @@ if __name__ == "__main__": c.gather_nc_files() - c.process_avg_data(frequency_avg="3H", batch_size = 100, apply_qa=True, save=False) - #c.yearly_average() - # c.plot_2D_observations(months=[0], filename="new.png", outdir=outdir) + c.process_avg_data(frequency_avg="M", batch_size = 100, apply_qa=True, save=False) + c.plot_2D_observations(months=[1], outdir=outdir) # c.plot_2D_num_obs(months=[0], outdir=outdir) # c.process_lazy_data(apply_qa=True, save=False) # c.to_da(frequency="3H", save_figs=True) - - + # print(f"Number of averages: {len(c.avg_results)}") + # for obs, lat, lon, t in c.avg_results: + # # print first 10 elements of each tuple + # print(obs[:10], lat[:10], lon[:10], t[:10]) + # c.avg_results end_time = time.time() elapsed_time = end_time - start_time -- GitLab From 28cd4221596abdda834f893629a8725a2557cec8 Mon Sep 17 00:00:00 2001 From: Carlotta Date: Thu, 3 Apr 2025 17:00:48 +0200 Subject: [PATCH 5/7] updates on average_by_frequency --- run/run_viirs.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/run/run_viirs.py b/run/run_viirs.py index 8269775e..b5a5c8fa 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -14,7 +14,7 @@ if __name__ == "__main__": # outdir="/home/cmeikle/Projects/data/" # indir="/home/cmeikle/Projects/data/VIIRS" - outdir="/home/cgile/Documents/mapies/figures/averages" + outdir="/home/cgile/Documents/mapies/figures/averages/1month_1hour" indir="/home/cgile/Documents/mapies/VIIRS" # indir="/home/cgile/bscearth000/esarchive/obs/nasa/viirs_noaa20_aerdb_l2/original_files/VIIRS" @@ -26,8 +26,8 @@ if __name__ == "__main__": c.gather_nc_files() - c.process_avg_data(frequency_avg="M", batch_size = 100, apply_qa=True, save=False) - c.plot_2D_observations(months=[1], outdir=outdir) + c.process_avg_data(frequency_avg="H", batch_size = 100, apply_qa=True, save=False) + # c.plot_2D_observations(months=[1], outdir=outdir) # c.plot_2D_num_obs(months=[0], outdir=outdir) # c.process_lazy_data(apply_qa=True, save=False) -- GitLab From 16bdaf0ad5e3b46c9b18d8052d3c2c1ecba02192 Mon Sep 17 00:00:00 2001 From: Carlotta Date: Mon, 5 May 2025 17:19:54 +0200 
Subject: [PATCH 6/7] FInal average by frequency --- mapies/mapies.py | 121 ++++++++++++++++++++------------- mapies/util/multiprocessing.py | 71 +++++++++++++++++++ run/run_viirs.py | 9 ++- 3 files changed, 147 insertions(+), 54 deletions(-) diff --git a/mapies/mapies.py b/mapies/mapies.py index ba058bee..3142354f 100644 --- a/mapies/mapies.py +++ b/mapies/mapies.py @@ -30,6 +30,52 @@ logger.remove() logger.add(sys.stderr, level="INFO") logger.add("../logs/mapies.log", level="DEBUG") +@logger.catch +def process_single_date_slice(date, daily_dict, grid, start_date, end_date, freq_hours): + logger.debug(f"Processing date slice: {date}") + center_time = datetime.strptime(date, "%Y%m%d%H%M") + l_border = (datetime.strptime(date, "%Y%m%d%H%M") - timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M') + r_border = (datetime.strptime(date, "%Y%m%d%H%M") + timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M') + + # Collect data + obs, lon, lat, time_values = np.array([]), np.array([]), np.array([]), np.array([], dtype='datetime64') + days = pd.date_range(l_border, r_border, freq="D") + + valid_results = [] + for pandas_day in days: + day = pandas_day.date() + logger.debug(f"Processing day: {day}") + try: + day_obs = daily_dict[day]["obs"] + day_lon = daily_dict[day]["lon"] + day_lat = daily_dict[day]["lat"] + day_time = daily_dict[day]["time"] + valid_results.append((day_obs, day_lat, day_lon, day_time)) + except KeyError: + continue + + # Concatenate all arrays together + if not valid_results: + return None + obs, lat, lon, time_values = append_and_concatenate(valid_results, day) + if obs is None: + return None # No valid data for this time slice + + # Filter by exact time range + obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, l_border, r_border) + + # Aggregate on the grid + if isinstance(grid, RotatedGrid): + lon, lat, rlon, rlat, obs, count_obs = grid.aggregate(lon, lat, obs, None) + elif isinstance(grid, RegularGrid): + lon, lat, obs, obserr, count_obs = grid.aggregate(lon, lat, obs, None) + else: + raise ValueError("Unsupported grid type") + + final_obs, final_lat, final_lon, final_count = filter_observations(obs, lat, lon, count_obs) + + return (date, final_lon, final_lat, final_obs, final_count, time_values, center_time.date()) + class MAPIES: """ @@ -205,13 +251,10 @@ class MAPIES: self.to_netCDF(self.obs, self.lat, self.lon, mid_time_array, self.start_date, self.end_date) logger.debug(f'Saved processed data to netCDF') + - - def average_by_frequency(self, frequency="D", save_nc=True): - """ - Function to average the observations by frequency - """ - + @logger.catch + def average_by_frequency(self, frequency="D", save_nc=False): freq_hours = frequency_int(frequency) logger.debug(f"Frequency in hours: {freq_hours}") avg_results = [] @@ -219,61 +262,36 @@ class MAPIES: dates_slice = time_slicing(self.start_date, self.end_date, frequency) logger.debug(f"Dates slice: {dates_slice}") - for date in dates_slice: - center_time = datetime.strptime(date, "%Y%m%d%H%M") - l_border = (datetime.strptime(date, "%Y%m%d%H%M") - timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M') - r_border = (datetime.strptime(date, "%Y%m%d%H%M") + timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M') - - # Collect data - obs, lon, lat, time_values = np.array([]), np.array([]), np.array([]), np.array([], dtype='datetime64') - days = pd.date_range(l_border, r_border, freq="D") - - valid_results = [] - for pandas_day in days: - day = pandas_day.date() - try: - day_obs = 
self.daily_dict[day]["obs"] - day_lon = self.daily_dict[day]["lon"] - day_lat = self.daily_dict[day]["lat"] - day_time = self.daily_dict[day]["time"] - valid_results.append((day_obs, day_lat, day_lon, day_time)) - except KeyError: - continue - - # Concatenate all arrays together - if not valid_results: + args = [ + ( + date, + self.daily_dict, + self.grid, + self.start_date, + self.end_date, + freq_hours) + for date in dates_slice + ] + + with multiprocessing.Pool(processes=8) as pool: + results = pool.starmap(process_single_date_slice, args) + + for result in results: + if result is None: continue - obs, lat, lon, time_values = append_and_concatenate(valid_results, day) - if obs is None: - continue # No valid data for this time slice - - # Filter by exact time range - obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, l_border, r_border) - - # Aggregate on the grid - if isinstance(self.grid, RotatedGrid): - lon, lat, rlon, rlat, obs, count_obs = self.grid.aggregate(lon, lat, obs, None) - elif isinstance(self.grid, RegularGrid): - lon, lat, obs, obserr, count_obs = self.grid.aggregate(lon, lat, obs, None) - else: - raise ValueError("Unsupported grid type") + date, lon, lat, obs, count_obs, time_values, center_time = result # Save result avg_results.append((date, lon, lat, obs, count_obs)) - + # Optionally save NetCDF if save_nc: # Use the center_time date for boundaries day_start, day_end = day_boundaries(center_time.date(), self.start_date, self.end_date) self.to_netCDF(obs, lat, lon, time_values, day_start, day_end) - # Optional: free memory - del obs, lon, lat, time_values, valid_results - - # Store the results in memory if needed self.avg_results = avg_results - def process_data_mp(self, batch_size, month=None): """ Process the data with multiprocessing. 
@@ -379,6 +397,11 @@ class MAPIES: final_obs, final_lat, final_lon, final_time = append_and_concatenate(valid_results, day) + logger.info(f"Size of obs in daily dict: {sys.getsizeof(final_obs)} bytes") + logger.info(f"Size of lon in daily dict: {sys.getsizeof(final_lon)} bytes") + logger.info(f"Size of lat in daily dict: {sys.getsizeof(final_lat)} bytes") + logger.info(f"Size of time in daily dict: {sys.getsizeof(final_time)} bytes") + # Skip the day if no valid data was processed if final_obs is None: continue diff --git a/mapies/util/multiprocessing.py b/mapies/util/multiprocessing.py index 034fa2be..0268cb8a 100644 --- a/mapies/util/multiprocessing.py +++ b/mapies/util/multiprocessing.py @@ -2,6 +2,7 @@ import numpy as np import pandas as pd import xarray as xr +from datetime import datetime, timedelta from typing import List from abc import ABC, abstractmethod import multiprocessing @@ -108,6 +109,46 @@ def process_single_file_viirs( return obs, lat, lon, time_values +# def process_single_date_slice(date, daily_dict, grid, start_date, end_date, freq_hours): +# center_time = datetime.strptime(date, "%Y%m%d%H%M") +# l_border = (datetime.strptime(date, "%Y%m%d%H%M") - timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M') +# r_border = (datetime.strptime(date, "%Y%m%d%H%M") + timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M') + +# # Collect data +# obs, lon, lat, time_values = np.array([]), np.array([]), np.array([]), np.array([], dtype='datetime64') +# days = pd.date_range(l_border, r_border, freq="D") + +# valid_results = [] +# for pandas_day in days: +# day = pandas_day.date() +# try: +# day_obs = daily_dict[day]["obs"] +# day_lon = daily_dict[day]["lon"] +# day_lat = daily_dict[day]["lat"] +# day_time = daily_dict[day]["time"] +# valid_results.append((day_obs, day_lat, day_lon, day_time)) +# except KeyError: +# continue + +# # Concatenate all arrays together +# if not valid_results: +# return None +# obs, lat, lon, time_values = append_and_concatenate(valid_results, day) +# if obs is None: +# return None # No valid data for this time slice + +# # Filter by exact time range +# obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, l_border, r_border) + +# # Aggregate on the grid +# if isinstance(grid, RotatedGrid): +# lon, lat, rlon, rlat, obs, count_obs = grid.aggregate(lon, lat, obs, None) +# elif isinstance(grid, RegularGrid): +# lon, lat, obs, obserr, count_obs = grid.aggregate(lon, lat, obs, None) +# else: +# raise ValueError("Unsupported grid type") + +# return (date, lon, lat, obs, count_obs, time_values, center_time.date()) class GenericMultiProcessing(ABC): """ @@ -117,6 +158,7 @@ class GenericMultiProcessing(ABC): @abstractmethod def process_batch(self): pass + @dataclass @@ -173,6 +215,33 @@ class VIIRSMultiprocessing(GenericMultiProcessing): failed_files= [file for file, res in zip(self.batch, results) if res is None] return valid_results, failed_files + + + # def average_by_frequency_mp(self, frequency="D", save_nc=False): + + # freq_hours = frequency_int(frequency) + # logger.debug(f"Frequency in hours: {freq_hours}") + # avg_results = [] + + # dates_slice = time_slicing(self.start_date, self.end_date, frequency) + # logger.debug(f"Dates slice: {dates_slice}") + + # args = [ + # ( + # date, + # self.daily_dict, + # self.r, + # self.start_date, + # self.end_date, + # freq_hours) + # for date in dates_slice + # ] + + # with multiprocessing.Pool(processes=8) as pool: + # results = pool.starmap(process_single_date_slice, args) + + # return results + 
@dataclass @@ -228,3 +297,5 @@ class TROPOMIMultiprocessing(GenericMultiProcessing): failed_files= [file for file, res in zip(self.batch, results) if res is None] return valid_results, failed_files + + diff --git a/run/run_viirs.py b/run/run_viirs.py index b5a5c8fa..68c8b886 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -19,24 +19,23 @@ if __name__ == "__main__": # indir="/home/cgile/bscearth000/esarchive/obs/nasa/viirs_noaa20_aerdb_l2/original_files/VIIRS" - start_time = time.time() c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="bdrc") c.gather_nc_files() - c.process_avg_data(frequency_avg="H", batch_size = 100, apply_qa=True, save=False) - # c.plot_2D_observations(months=[1], outdir=outdir) + c.process_avg_data(frequency_avg="1H", batch_size = 100, apply_qa=True, save=False) + c.plot_2D_observations(months=[1], outdir=outdir) # c.plot_2D_num_obs(months=[0], outdir=outdir) # c.process_lazy_data(apply_qa=True, save=False) # c.to_da(frequency="3H", save_figs=True) - # print(f"Number of averages: {len(c.avg_results)}") + print(f"Number of averages: {len(c.avg_results)}") # for obs, lat, lon, t in c.avg_results: # # print first 10 elements of each tuple # print(obs[:10], lat[:10], lon[:10], t[:10]) - # c.avg_results + c.avg_results end_time = time.time() elapsed_time = end_time - start_time -- GitLab From fd96631573e0f59343cf7ca9e7e48b6caad57471 Mon Sep 17 00:00:00 2001 From: Carlotta Date: Tue, 6 May 2025 16:29:27 +0200 Subject: [PATCH 7/7] Updated tests --- mapies/mapies.py | 1 - mapies/tests/test_viirs.py | 70 +++++++++++++++++++------------------- run/run_viirs.py | 43 ++++++++++++++++++----- 3 files changed, 70 insertions(+), 44 deletions(-) diff --git a/mapies/mapies.py b/mapies/mapies.py index 3142354f..0a44144f 100644 --- a/mapies/mapies.py +++ b/mapies/mapies.py @@ -281,7 +281,6 @@ class MAPIES: continue date, lon, lat, obs, count_obs, time_values, center_time = result - # Save result avg_results.append((date, lon, lat, obs, count_obs)) # Optionally save NetCDF diff --git a/mapies/tests/test_viirs.py b/mapies/tests/test_viirs.py index cea73943..51737798 100644 --- a/mapies/tests/test_viirs.py +++ b/mapies/tests/test_viirs.py @@ -21,13 +21,14 @@ def test_to_da(): assert outfiles == [Path("obs202401310000.nc")] @pytest.mark.parametrize( - "monthly_avg, apply_qa", - [(True, True), - (True, False), - (False, True), - (False, False)], + "frequency_avg, apply_qa", + [("3H", True), + ("3H", False), + (None, True), + (None, False), + ], ) -def test_process_avg_data(monthly_avg, apply_qa): +def test_process_avg_data(frequency_avg, apply_qa): start_date = "202401310000" end_date = "202401312359" outdir = "./" @@ -37,35 +38,29 @@ def test_process_avg_data(monthly_avg, apply_qa): c.gather_nc_files() - c.process_avg_data(monthly_avg=monthly_avg, batch_size=100, apply_qa=apply_qa, save=False) - - month_key = 1 - final_lon = c.monthly_dict[month_key]["lon"] - final_lat = c.monthly_dict[month_key]["lat"] - final_obs = c.monthly_dict[month_key]["obs"] - cumulative_count = c.monthly_dict[month_key]["count"] + c.process_avg_data(frequency_avg=frequency_avg, batch_size=100, apply_qa=apply_qa, save=False) expected_values = { - (True, True): { - "obs": np.array([0.15000001, 0.16919138, 0.15477157]), - "lat": np.array([-9.30731487, -9.29670143, -9.13449192]), - "lon": np.array([-18.8669529, -18.80527687, -18.71993446]), - "count_obs": np.array([1, 1, 1]), + ("3H", True): { + "obs": np.array([1.32284749, 0.05722538, 0.06578986]), + "lat": np.array([28.73104858, 30.67773438, 
30.59019089]), + "lon": np.array([80.22098541, 80.8727417, 80.99289703]), + "count_obs": np.array([1, 1, 2]), }, - (True, False): { - "obs": np.array([0.13365674, 0.13539954, 0.11358637]), - "lat": np.array([-9.26055002, -9.24970277, -9.14501572]), - "lon": np.array([-18.91040993, -18.81873067, -18.78282547]), - "count_obs": np.array([2, 3, 1]), + ("3H", False): { + "obs": np.array([1.14879799, 1.17684265, 1.02754712]), + "lat": np.array([25.38115501, 25.45448812, 25.67737961]), + "lon": np.array([78.80477142, 78.77971903, 78.68408966]), + "count_obs": np.array([1, 3, 1]), }, - (False, True): { + (None, True): { "obs": np.array([0.15000001, 0.16919138, 0.15477157]), "lat": np.array([-9.30731487, -9.29670143, -9.13449192]), "lon": np.array([-18.8669529, -18.80527687, -18.71993446]), "count_obs": np.array([1, 1, 1]), }, - (False, False): { + (None, False): { "obs": np.array([0.13365674, 0.13539954, 0.11358637]), "lat": np.array([-9.26055002, -9.24970277, -9.14501572]), "lon": np.array([-18.91040993, -18.81873067, -18.78282547]), @@ -73,22 +68,27 @@ def test_process_avg_data(monthly_avg, apply_qa): } } - if monthly_avg: - non_zero_indeces = np.nonzero(final_obs)[0] - obs_values = final_obs[non_zero_indeces[:3]] - lat_values = final_lat[non_zero_indeces[:3]] - lon_values = final_lon[non_zero_indeces[:3]] - count_obs_values = cumulative_count[non_zero_indeces[:3]] + if frequency_avg is not None: + obs_values = c.avg_results[1][3][:3] + lat_values = c.avg_results[1][2][:3] + lon_values = c.avg_results[1][1][:3] + count_obs_values = c.avg_results[1][4][:3] else: obs_values = c.obs[:3] lat_values = c.lat[:3] lon_values = c.lon[:3] count_obs_values = c.count_obs[:3] - expected_obs = expected_values[(monthly_avg, apply_qa)]["obs"] - expected_lat = expected_values[(monthly_avg, apply_qa)]["lat"] - expected_lon = expected_values[(monthly_avg, apply_qa)]["lon"] - expected_count_obs = expected_values[(monthly_avg, apply_qa)]["count_obs"] + expected_obs = expected_values[(frequency_avg, apply_qa)]["obs"] + expected_lat = expected_values[(frequency_avg, apply_qa)]["lat"] + expected_lon = expected_values[(frequency_avg, apply_qa)]["lon"] + expected_count_obs = expected_values[(frequency_avg, apply_qa)]["count_obs"] + + print(f"\nACTUAL [{frequency_avg}, {apply_qa}]") + print("OBS:", obs_values) + print("LAT:", lat_values) + print("LON:", lon_values) + print("COUNT:", count_obs_values) np.testing.assert_array_almost_equal(obs_values, expected_obs) np.testing.assert_array_almost_equal(lat_values, expected_lat) diff --git a/run/run_viirs.py b/run/run_viirs.py index 68c8b886..051689eb 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -6,7 +6,7 @@ import time if __name__ == "__main__": # Change these variables # 00:30 to 02:36 from 2024-01-01 to 2024-01-03 - start_date = "202401010000" + start_date = "202401310000" end_date = "202401312359" @@ -21,22 +21,49 @@ if __name__ == "__main__": start_time = time.time() - c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="bdrc") + c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="rotated") c.gather_nc_files() + frequency_avg = "3H" - c.process_avg_data(frequency_avg="1H", batch_size = 100, apply_qa=True, save=False) - c.plot_2D_observations(months=[1], outdir=outdir) + + c.process_avg_data(frequency_avg=frequency_avg, batch_size = 100, apply_qa=False, save=False) + # c.plot_2D_observations(months=[1], outdir=outdir) # c.plot_2D_num_obs(months=[0], outdir=outdir) # c.process_lazy_data(apply_qa=True, save=False) # 
c.to_da(frequency="3H", save_figs=True) - print(f"Number of averages: {len(c.avg_results)}") - # for obs, lat, lon, t in c.avg_results: + # print(f"Number of averages: {len(c.avg_results)}") + # for date, lon, lat, obs, count_obs in c.avg_results: # # print first 10 elements of each tuple - # print(obs[:10], lat[:10], lon[:10], t[:10]) - c.avg_results + # print(f"Date: {date}") + # print(f"Lon: {lon[:10]}") + # print(f"Lat: {lat[:10]}") + # print(f"Obs: {obs[:10]}") + # print(f"Count: {count_obs[:10]}") + # c.avg_results + if frequency_avg == "3H": + print("3H averages") + obs_values = c.avg_results[1][3][:3] + lat_values = c.avg_results[1][2][:3] + lon_values = c.avg_results[1][1][:3] + count_obs_values = c.avg_results[1][4][:3] + print(f"Obs: {obs_values}") + print(f"Lat: {lat_values}") + print(f"Lon: {lon_values}") + print(f"Count: {count_obs_values}") + else: + print("Normal average") + obs_values = c.obs[:3] + lat_values = c.lat[:3] + lon_values = c.lon[:3] + count_obs_values = c.count_obs[:3] + print(f"Obs: {obs_values}") + print(f"Lat: {lat_values}") + print(f"Lon: {lon_values}") + print(f"Count: {count_obs_values}") + end_time = time.time() elapsed_time = end_time - start_time print(f"Script executed in {elapsed_time:.2f} seconds.") -- GitLab
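
Notes on the series follow; the sketches are illustrative re-implementations, not the mapies code itself.

The mechanism introduced in PATCH 3 and finalised in PATCH 6 is frequency-based averaging: time_slicing builds the slice centres with pd.date_range, each centre is widened into a window reaching half a frequency step to either side, the window's observations are collected from daily_dict, filtered with time_domain_selection, and aggregated on the grid. PATCH 1's draft of time_slicing called .strftime on the frequency argument itself, which fails for a plain string such as "3H"; PATCH 3 moves the call onto the generated range. A minimal standalone sketch of the slicing and windowing logic, with the helpers re-implemented here for illustration (the real versions live in mapies/util/func_tools.py and mapies/mapies.py and also handle the data collection and gridding):

    from datetime import datetime, timedelta
    import pandas as pd

    def frequency_int(time_interval):
        # Hours per frequency alias, following the mapping extended in PATCH 4
        # ("M" and "Y" are approximated there as 30 and 365 days); any other
        # alias such as "3H" falls back to its leading digit.
        lookup = {"H": 1, "D": 24, "W": 24 * 7, "M": 24 * 30, "Y": 24 * 365}
        return lookup.get(time_interval, int(time_interval[0]))

    def time_slicing(start_date, end_date, frequency):
        # PATCH 3 form: build the range first, then format every timestamp.
        return pd.date_range(start_date, end_date, freq=frequency).strftime("%Y%m%d%H%M")

    def window_borders(date_str, freq_hours):
        # Each slice date is the window centre; the borders sit half a
        # frequency step to either side, as in average_by_frequency.
        center = datetime.strptime(date_str, "%Y%m%d%H%M")
        half = timedelta(hours=freq_hours / 2)
        return (center - half).strftime("%Y%m%d%H%M"), (center + half).strftime("%Y%m%d%H%M")

    start = datetime.strptime("202401010000", "%Y%m%d%H%M")
    end = datetime.strptime("202401010900", "%Y%m%d%H%M")
    for s in time_slicing(start, end, "3H"):
        print(s, window_borders(s, frequency_int("3H")))
    # 202401010000 ('202312312230', '202401010130')
    # 202401010300 ('202401010130', '202401010430')
    # ...

A window centred on a slice edge deliberately reaches into the neighbouring day, which is why average_by_frequency walks every calendar day touched by [l_border, r_border] through daily_dict before filtering down to the exact range.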
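
PATCH 3 also rewrites day_boundaries to take a datetime.date key instead of a day offset, yielding midnight-to-midnight bounds clipped to the requested period ("If day_start or day_end are outside the regions of start_date and end_date / Return those instead", per the retained docstring comment). The clipping step itself sits outside the excerpted hunk, so the max/min form below is an assumption, checked against the PATCH 7 test expectations:

    from datetime import datetime, timedelta, date
    import pandas as pd

    def day_boundaries(day, start_date, end_date):
        # Midnight-to-midnight bounds for a datetime.date key, clipped to
        # [start_date, end_date]; the clip is assumed to be a plain max/min.
        day_start = pd.Timestamp(datetime.combine(day, datetime.min.time()))
        day_end = day_start + timedelta(days=1)
        return max(day_start, start_date), min(day_end, end_date)

    # The clipped cases from the updated test_day_boundaries:
    assert day_boundaries(date(2022, 12, 31),
                          pd.to_datetime("2023-01-01 00:00"),
                          pd.to_datetime("2023-01-02 00:00")) == (
        pd.to_datetime("2023-01-01 00:00"), pd.to_datetime("2023-01-01 00:00"))
    assert day_boundaries(date(2023, 1, 3),
                          pd.to_datetime("2023-01-01 00:30"),
                          pd.to_datetime("2023-01-03 00:30")) == (
        pd.to_datetime("2023-01-03 00:00"), pd.to_datetime("2023-01-03 00:30"))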
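
PATCH 6 then lifts the per-slice loop body into a module-level process_single_date_slice so it can be fanned out with multiprocessing.Pool.starmap, one argument tuple per slice date; slices with no data return None and are skipped when the results are collected. A reduced sketch of that fan-out pattern, with a hypothetical have_data flag standing in for the daily_dict lookup:

    import multiprocessing
    from datetime import datetime, timedelta

    def process_slice(date_str, freq_hours, have_data):
        # Stand-in for process_single_date_slice: the real worker gathers the
        # window's data from daily_dict, time-filters, grid-aggregates and
        # returns None for an empty window so the caller can skip it.
        if not have_data:
            return None
        center = datetime.strptime(date_str, "%Y%m%d%H%M")
        half = timedelta(hours=freq_hours / 2)
        return date_str, center - half, center + half

    if __name__ == "__main__":
        # One argument tuple per slice date, as built in average_by_frequency;
        # the middle tuple stands in for an empty window.
        args = [("202401010130", 3, True),
                ("202401010430", 3, False),
                ("202401010730", 3, True)]
        with multiprocessing.Pool(processes=2) as pool:   # the patch uses 8 workers
            results = pool.starmap(process_slice, args)   # results keep the order of args
        for result in results:
            if result is None:                            # empty windows are skipped
                continue
            print(result)

The worker has to live at module level (not nested inside a method) so it can be pickled for the pool, which is presumably why PATCH 6 places process_single_date_slice above the MAPIES class rather than keeping the loop inline.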