diff --git a/mapies/mapies.py b/mapies/mapies.py
index 5ba6a8f50bc0eebf7bcb5a01b7945255a3726775..7508c2671296c14dbb81d580fd40125b09c4284f 100644
--- a/mapies/mapies.py
+++ b/mapies/mapies.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 # MAPIES base
 
-from datetime import datetime
+from datetime import datetime, timedelta
 from mapies.util.func_tools import *
 from mapies.util.plotting import MapiesPlotter
 
@@ -13,6 +13,7 @@ import pandas as pd
 import xarray as xr
 import matplotlib.pyplot as plt
 import cartopy.crs as ccrs
+from collections import defaultdict
 import os
 import sys
 import yaml
@@ -31,6 +32,50 @@
 logger.remove()
 logger.add(sys.stderr, level="INFO")
 logger.add("../logs/mapies.log", level="DEBUG")
 
+@logger.catch
+def process_single_date_slice(date, daily_dict, grid, start_date, end_date, freq_hours):
+    logger.debug(f"Processing date slice: {date}")
+    center_time = datetime.strptime(date, "%Y%m%d%H%M")
+    l_border = (center_time - timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M')
+    r_border = (center_time + timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M')
+
+    # Collect the pre-processed daily data that falls inside this slice
+    days = pd.date_range(l_border, r_border, freq="D")
+    valid_results = []
+    for pandas_day in days:
+        day = pandas_day.date()
+        logger.debug(f"Processing day: {day}")
+        try:
+            day_obs = daily_dict[day]["obs"]
+            day_lon = daily_dict[day]["lon"]
+            day_lat = daily_dict[day]["lat"]
+            day_time = daily_dict[day]["time"]
+            valid_results.append((day_obs, day_lat, day_lon, day_time))
+        except KeyError:
+            continue
+
+    # Concatenate all arrays together
+    if not valid_results:
+        return None
+    obs, lat, lon, time_values = append_and_concatenate(valid_results, day)
+    if obs is None:
+        return None  # No valid data for this time slice
+
+    # Filter by the exact time range of the slice
+    obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, l_border, r_border)
+
+    # Aggregate on the grid
+    if isinstance(grid, RotatedGrid):
+        lon, lat, rlon, rlat, obs, count_obs = grid.aggregate(lon, lat, obs, None)
+    elif isinstance(grid, RegularGrid):
+        lon, lat, obs, obserr, count_obs = grid.aggregate(lon, lat, obs, None)
+    else:
+        raise ValueError("Unsupported grid type")
+
+    final_obs, final_lat, final_lon, final_count = filter_observations(obs, lat, lon, count_obs)
+
+    return (date, final_lon, final_lat, final_obs, final_count, time_values, center_time.date())
+
 class MAPIES:
     """
@@ -117,12 +164,11 @@ class MAPIES:
 
         self.qualityFlags = qa_dict["qa_variables"]
 
-    @timeit
     @logger.catch
     def process_avg_data(
         self,
-        monthly_avg: bool = 0,
+        frequency_avg: str | None = None,
         min_num_obs: int = 0,
         batch_size: int = 100,
         save: bool = False,
@@ -151,35 +197,13 @@ class MAPIES:
         if geospatial_crop is not None:
             logger.info(f"Applying Geospatial cropping: {geospatial_crop}")
             self.geospatial_crop = np.array(geospatial_crop)
-        if monthly_avg:
-            for month in self.monthly_dict.keys():
-
-                # Catch if they haven't selected a grid
-                try:
-                    logger.info(f"Running average on {self.grid}")
-                except AttributeError:
-                    raise Exception("Please specify a grid in the initialisation of MAPIES, using grid_repr=''")
-
-                # Process the data for the current month
-                final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp(
-                    batch_size=batch_size, month=month
-                )
-
-                self.monthly_dict[month] = {
-                    "lon": final_lon,
-                    "lat": final_lat,
-                    "obs": final_obs,
-                    "count": cumulative_count,
-                }
-
-                logger.debug(f'Updated monthly dictionary for month {month}')
-                logger.debug(f'Lon shape: {final_lon.shape}, Lat shape: {final_lat.shape}, Obs shape: {final_obs.shape}')
-                if np.all(final_lon == 0) or np.all(np.isnan(final_lon)):
-                    logger.warning(f"All longitude values for month {month} are 0 or NaN.")
-                if np.all(final_lat == 0) or np.all(np.isnan(final_lat)):
-                    logger.warning(f"All latitude values for month {month} are 0 or NaN.")
-                if np.all(final_obs == 0) or np.all(np.isnan(final_obs)):
-                    logger.warning(f"All observation values for month {month} are 0 or NaN.")
-
+        if frequency_avg is not None:
+            logger.info(f"Running frequency-based averaging with frequency='{frequency_avg}'")
+            # self.build_frequency_dict(frequency=frequency_avg)
+            # self.process_lazy_data2(apply_qa=apply_qa, geospatial_crop=geospatial_crop, save=False)
+            # self.average_by_frequency2(frequency=frequency_avg, save_nc=save)
+            self.process_lazy_data(apply_qa=apply_qa, geospatial_crop=geospatial_crop, save=False)
+            self.average_by_frequency(frequency=frequency_avg, save_nc=save)
         else:
             final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp(
                 batch_size=batch_size, month=None
@@ -193,8 +217,45 @@ class MAPIES:
                 self.to_netCDF(self.obs, self.lat, self.lon, mid_time_array, self.start_date, self.end_date)
                 logger.debug(f'Saved processed data to netCDF')
 
+
+    @logger.catch
+    def average_by_frequency(self, frequency="D", save_nc=False):
+        freq_hours = frequency_int(frequency)
+        logger.debug(f"Frequency in hours: {freq_hours}")
+        avg_results = []
+
+        dates_slice = time_slicing(self.start_date, self.end_date, frequency)
+        logger.debug(f"Dates slice: {dates_slice}")
+
+        args = [
+            (
+                date,
+                self.daily_dict,
+                self.grid,
+                self.start_date,
+                self.end_date,
+                freq_hours)
+            for date in dates_slice
+        ]
+
+        with multiprocessing.Pool(processes=8) as pool:
+            results = pool.starmap(process_single_date_slice, args)
+
+        for result in results:
+            if result is None:
+                continue
+            # center_date is already a datetime.date (see process_single_date_slice)
+            date, lon, lat, obs, count_obs, time_values, center_date = result
+            avg_results.append((date, lon, lat, obs, count_obs))
+
+            # Optionally save NetCDF, using the slice's centre date for the day boundaries
+            if save_nc:
+                day_start, day_end = day_boundaries(center_date, self.start_date, self.end_date)
+                self.to_netCDF(obs, lat, lon, time_values, day_start, day_end)
+
+        self.avg_results = avg_results
 
     def process_data_mp(
         self,
@@ -275,51 +336,6 @@ class MAPIES:
 
         return final_lon, final_lat, final_obs, cumulative_count
 
-    @timeit
-    @logger.catch
-    def yearly_average(self):
-        """
-        Compute the yearly average for the processed data.
- """ - cumulative_obs = None - cumulative_lat = None - cumulative_lon = None - cumulative_count = None - - for key, month_data in self.monthly_dict.items(): - if not all(var in month_data for var in ["lon", "lat", "obs", "count"]): - logger.warning(f"Skipping month {key}: Missing data.") - continue - if any(month_data[var] is None for var in ["lon", "lat", "obs", "count"]): - logger.warning(f"Skipping month {key}: No valid data.") - continue - - lon_values = month_data["lon"] - lat_values = month_data["lat"] - obs_values = month_data["obs"] - count_values = month_data["count"] - - results = [(obs_values, lat_values, lon_values, count_values)] - cumulative_obs, cumulative_lat, cumulative_lon, cumulative_count = combine_arrays(results, cumulative_obs, cumulative_lat, cumulative_lon, cumulative_count) - - if cumulative_obs is None or cumulative_count is None or np.all(cumulative_count == 0): - logger.warning("No valid data for the entire year. Returning empty arrays.") - self.yearly_lon = None - self.yearly_lat = None - self.yearly_obs = None - self.yearly_count = None - return - - yearly_obs, yearly_lat, yearly_lon = compute_final_averages(cumulative_obs, cumulative_lat, cumulative_lon, cumulative_count) - - self.yearly_lon = yearly_lon - self.yearly_lat = yearly_lat - self.yearly_obs = yearly_obs - self.yearly_count = cumulative_count - - logger.debug("Yearly average computation completed.") - - @timeit @logger.catch def process_lazy_data( @@ -362,6 +378,11 @@ class MAPIES: final_obs, final_lat, final_lon, final_time = append_and_concatenate(valid_results, day) + logger.info(f"Size of obs in daily dict: {sys.getsizeof(final_obs)} bytes") + logger.info(f"Size of lon in daily dict: {sys.getsizeof(final_lon)} bytes") + logger.info(f"Size of lat in daily dict: {sys.getsizeof(final_lat)} bytes") + logger.info(f"Size of time in daily dict: {sys.getsizeof(final_time)} bytes") + # Skip the day if no valid data was processed if final_obs is None: continue @@ -382,6 +403,57 @@ class MAPIES: if save: self.to_netCDF(final_obs, final_lat, final_lon, final_time, day_start, day_end) + @timeit + @logger.catch + def process_lazy_data2(self, apply_qa=True, geospatial_crop: NDArray | None = None, save=True): + """ + Process the data for the specified time range. It only retrieves a daily dictionary using dask lazy loading. + """ + flag = True + #Check if the user wants qa flags + if apply_qa: + self.apply_qa = True + else: + self.apply_qa = False + if geospatial_crop is not None: + self.geospatial_crop = np.array(geospatial_crop) + for dict_key in self.frequency_dict.keys(): + files = self.frequency_dict[dict_key]["files"] + + # Use multiprocessing to process files within the batch + + viirs_mp = self.mp_type( + self.grid, + self.obs_var, + self.lat_var, + self.lon_var, + self.time_var, + self.start_date, + self.end_date, + self.apply_qa, + self.qualityFlags, + files, + self.geospatial_crop, + flag, + ) + valid_results, failed_files = viirs_mp.process_batch() + + final_obs, final_lat, final_lon, final_time = append_and_concatenate(valid_results, dict_key) + + # Skip the day if no valid data was processed + if final_obs is None: + continue + + # Check if any valid data exists + if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): + logger.warning(f"All observation values for day {dict_key} are 0 or NaN. 
Skipping...") + continue + + # Update day dict + self.frequency_dict[dict_key]["obs"] = final_obs + self.frequency_dict[dict_key]["lon"] = final_lon + self.frequency_dict[dict_key]["lat"] = final_lat + self.frequency_dict[dict_key]["time"] = final_time @logger.catch def to_netCDF( @@ -476,44 +548,28 @@ class MAPIES: filename=filename, ) return - elif months is not None: - for m in months: - try: - month_data = self.monthly_dict[m] - lon_values = month_data["lon"] - lat_values = month_data["lat"] - obs_values = month_data["obs"] - except Exception as e: - logger.error(f"Error for month {m}: {e}") - continue - title = f"Observation 2D plot of month {m} from {self.start_date} to {self.end_date}" - filename = filename or f"{self.datatype}_2D_obs_month_{m}_year_{self.year}.png" - self.plotter.plot_2D_obs_custom( - lon_values=lon_values, - lat_values=lat_values, - obs_values=obs_values, - outdir=outdir, - title=title, - filename=filename, - ) - else: + elif months == [1]: try: - lon_values = self.yearly_lon - lat_values = self.yearly_lat - obs_values = self.yearly_obs + for i, (date, lon_values, lat_values, obs_values, count_obs) in enumerate(self.avg_results): + date_obj = datetime.strptime(date, "%Y%m%d%H%M") + title = f"Observation 2D plot - {date_obj}, average {i}" + filename = f"{self.datatype}_2D_obs_from_{self.start_date}_to_{self.end_date}_average_{i}.png" + + self.plot_2D_obs_custom( + lon_values=lon_values, + lat_values=lat_values, + obs_values=obs_values, + outdir=outdir, + title=title, + filename=filename, + ) except Exception as e: - logger.error(f"Error for yearly data: {e}") + logger.error(f"Error plotting all data: {e}") return - title = f"Observation 2D plot of yearly average of year {self.year}" - filename = filename or f"{self.datatype}_2D_yearly_obs_{self.year}.png" - self.plotter.plot_2D_obs_custom( - lon_values=lon_values, - lat_values=lat_values, - obs_values=obs_values, - outdir=outdir, - title=title, - filename=filename, - ) + else: + logger.error("Error: months can be either [0] for all data, [1] for averages by frequency.") + return + def plot_2D_num_obs( @@ -545,45 +601,28 @@ class MAPIES: filename=filename, ) return - elif months is not None: - for m in months: - try: - month_data = self.monthly_dict[m] - lon_values = month_data["lon"] - lat_values = month_data["lat"] - cum_count = month_data["count"] - except Exception as e: - logger.error(f"Error for month {m}: {e}") - continue - title = f"Plot of the number of valid observations for month {m} from {self.start_date} to {self.end_date}" - filename = filename or f"{self.datatype}_2D_obs_count_month_{m}_year_{self.year}.png" - self.plotter.plot_2D_num_obs_custom( - lon_values=lon_values, - lat_values=lat_values, - num_obs=cum_count, - outdir=outdir, - title=title, - filename=filename, - ) - else: + + elif months == [1]: try: - lon_values = self.yearly_lon - lat_values = self.yearly_lat - cum_count = self.yearly_count + for i, (date, lon_values, lat_values, obs_values, count_obs) in enumerate(self.avg_results): + date_obj = datetime.strptime(date, "%Y%m%d%H%M") + title = f"Plot of the number of valid observations - {date_obj}, average {i}" + filename = f"{self.datatype}_2D_obs_count_from_{self.start_date}_to_{self.end_date}_average_{i}.png" + self.plot_2D_num_obs_custom( + lon_values=lon_values, + lat_values=lat_values, + num_obs=count_obs, + outdir=outdir, + title=title, + filename=filename, + ) except Exception as e: - logger.error(f"Error for yearly data: {e}") + logger.error(f"Error plotting all data: {e}") return 
- title = f"Plot of the number of valid observations for yearly average of {self.year}" - filename = filename or f"{outdir}/{self.datatype}_2D_obs_count_yearly_{self.year}.png" - self.plotter.plot_2D_num_obs_custom( - lon_values=lon_values, - lat_values=lat_values, - num_obs=cum_count, - outdir=outdir, - title=title, - filename=filename, - ) + else: + logger.error("Error: months can be either [0] for all data, [1] for averages by frequency.") + return @staticmethod def to_xarray( diff --git a/mapies/tests/test_func_tools.py b/mapies/tests/test_func_tools.py index caab469a87ed5a1d3c2d0e9a3da9c1a9cf621822..c69ce980c095a6d02ca2b88c6160a56817a85d14 100644 --- a/mapies/tests/test_func_tools.py +++ b/mapies/tests/test_func_tools.py @@ -4,6 +4,7 @@ import pytest import pandas as pd import xarray as xr import numpy as np +from datetime import datetime from mapies.util.func_tools import time_converter, day_boundaries, time_domain_selection, quality_assurance @@ -49,16 +50,42 @@ def test_quality_assurance(test_input, expected): assert quality_assurance(*test_input)[2].all() == expected[2].all() +# @pytest.mark.parametrize( +# "test_input,expected", +# [ +# ((1, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301010030"), pd.to_datetime("20230101235959").ceil(freq="d"))), +# ((2, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301020000"), pd.to_datetime("20230102235959").ceil(freq="d"))), +# ((3, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301030000"), pd.to_datetime("202301030030"))), + +# ] + +# ) @pytest.mark.parametrize( "test_input,expected", [ - ((1, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301010030"), pd.to_datetime("20230101235959").ceil(freq="d"))), - ((2, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301020000"), pd.to_datetime("20230102235959").ceil(freq="d"))), - ((3, pd.to_datetime("202301010030"), pd.to_datetime("202301030030")), (pd.to_datetime("202301030000"), pd.to_datetime("202301030030"))), - + ( + (datetime(2023, 1, 1).date(), pd.to_datetime("2023-01-01 00:30"), pd.to_datetime("2023-01-03 00:30")), + (pd.to_datetime("2023-01-01 00:30"), pd.to_datetime("2023-01-02 00:00")) + ), + ( + (datetime(2023, 1, 2).date(), pd.to_datetime("2023-01-01 00:30"), pd.to_datetime("2023-01-03 00:30")), + (pd.to_datetime("2023-01-02 00:00"), pd.to_datetime("2023-01-03 00:00")) + ), + ( + (datetime(2023, 1, 3).date(), pd.to_datetime("2023-01-01 00:30"), pd.to_datetime("2023-01-03 00:30")), + (pd.to_datetime("2023-01-03 00:00"), pd.to_datetime("2023-01-03 00:30")) + ), + ( + (datetime(2022, 12, 31).date(), pd.to_datetime("2023-01-01 00:00"), pd.to_datetime("2023-01-02 00:00")), + (pd.to_datetime("2023-01-01 00:00"), pd.to_datetime("2023-01-01 00:00")) # both clipped + ), + ( + (datetime(2023, 1, 4).date(), pd.to_datetime("2023-01-01 00:00"), pd.to_datetime("2023-01-03 23:59")), + (pd.to_datetime("2023-01-04 00:00"), pd.to_datetime("2023-01-03 23:59")) # end clipped + ) ] - ) + def test_day_boundaries(test_input, expected): assert day_boundaries(*test_input)[0] == expected[0] assert day_boundaries(*test_input)[1] == expected[1] diff --git a/mapies/tests/test_viirs.py b/mapies/tests/test_viirs.py index cea739430ac499c1247180e55ff01e88141b4b92..51737798202d25f78228e1075c447fb68edc7574 100644 --- a/mapies/tests/test_viirs.py +++ b/mapies/tests/test_viirs.py @@ -21,13 +21,14 @@ def test_to_da(): assert outfiles 
     assert outfiles == [Path("obs202401310000.nc")]
 
 
 @pytest.mark.parametrize(
-    "monthly_avg, apply_qa",
-    [(True, True),
-    (True, False),
-    (False, True),
-    (False, False)],
+    "frequency_avg, apply_qa",
+    [("3H", True),
+    ("3H", False),
+    (None, True),
+    (None, False),
+    ],
 )
-def test_process_avg_data(monthly_avg, apply_qa):
+def test_process_avg_data(frequency_avg, apply_qa):
     start_date = "202401310000"
     end_date = "202401312359"
     outdir = "./"
@@ -37,35 +38,29 @@ def test_process_avg_data(monthly_avg, apply_qa):
 
     c.gather_nc_files()
 
-    c.process_avg_data(monthly_avg=monthly_avg, batch_size=100, apply_qa=apply_qa, save=False)
-
-    month_key = 1
-    final_lon = c.monthly_dict[month_key]["lon"]
-    final_lat = c.monthly_dict[month_key]["lat"]
-    final_obs = c.monthly_dict[month_key]["obs"]
-    cumulative_count = c.monthly_dict[month_key]["count"]
+    c.process_avg_data(frequency_avg=frequency_avg, batch_size=100, apply_qa=apply_qa, save=False)
 
     expected_values = {
-        (True, True): {
-            "obs": np.array([0.15000001, 0.16919138, 0.15477157]),
-            "lat": np.array([-9.30731487, -9.29670143, -9.13449192]),
-            "lon": np.array([-18.8669529, -18.80527687, -18.71993446]),
-            "count_obs": np.array([1, 1, 1]),
+        ("3H", True): {
+            "obs": np.array([1.32284749, 0.05722538, 0.06578986]),
+            "lat": np.array([28.73104858, 30.67773438, 30.59019089]),
+            "lon": np.array([80.22098541, 80.8727417, 80.99289703]),
+            "count_obs": np.array([1, 1, 2]),
         },
-        (True, False): {
-            "obs": np.array([0.13365674, 0.13539954, 0.11358637]),
-            "lat": np.array([-9.26055002, -9.24970277, -9.14501572]),
-            "lon": np.array([-18.91040993, -18.81873067, -18.78282547]),
-            "count_obs": np.array([2, 3, 1]),
+        ("3H", False): {
+            "obs": np.array([1.14879799, 1.17684265, 1.02754712]),
+            "lat": np.array([25.38115501, 25.45448812, 25.67737961]),
+            "lon": np.array([78.80477142, 78.77971903, 78.68408966]),
+            "count_obs": np.array([1, 3, 1]),
         },
-        (False, True): {
+        (None, True): {
             "obs": np.array([0.15000001, 0.16919138, 0.15477157]),
             "lat": np.array([-9.30731487, -9.29670143, -9.13449192]),
             "lon": np.array([-18.8669529, -18.80527687, -18.71993446]),
             "count_obs": np.array([1, 1, 1]),
         },
-        (False, False): {
+        (None, False): {
             "obs": np.array([0.13365674, 0.13539954, 0.11358637]),
             "lat": np.array([-9.26055002, -9.24970277, -9.14501572]),
             "lon": np.array([-18.91040993, -18.81873067, -18.78282547]),
@@ -73,22 +68,23 @@ def test_process_avg_data(monthly_avg, apply_qa):
             "count_obs": np.array([2, 3, 1]),
         }
     }
 
-    if monthly_avg:
-        non_zero_indeces = np.nonzero(final_obs)[0]
-        obs_values = final_obs[non_zero_indeces[:3]]
-        lat_values = final_lat[non_zero_indeces[:3]]
-        lon_values = final_lon[non_zero_indeces[:3]]
-        count_obs_values = cumulative_count[non_zero_indeces[:3]]
+    if frequency_avg is not None:
+        # avg_results entries are (date, lon, lat, obs, count); index 1 is the second time slice
+        obs_values = c.avg_results[1][3][:3]
+        lat_values = c.avg_results[1][2][:3]
+        lon_values = c.avg_results[1][1][:3]
+        count_obs_values = c.avg_results[1][4][:3]
     else:
        obs_values = c.obs[:3]
        lat_values = c.lat[:3]
        lon_values = c.lon[:3]
        count_obs_values = c.count_obs[:3]
 
-    expected_obs = expected_values[(monthly_avg, apply_qa)]["obs"]
-    expected_lat = expected_values[(monthly_avg, apply_qa)]["lat"]
-    expected_lon = expected_values[(monthly_avg, apply_qa)]["lon"]
-    expected_count_obs = expected_values[(monthly_avg, apply_qa)]["count_obs"]
+    expected_obs = expected_values[(frequency_avg, apply_qa)]["obs"]
+    expected_lat = expected_values[(frequency_avg, apply_qa)]["lat"]
+    expected_lon = expected_values[(frequency_avg, apply_qa)]["lon"]
+    expected_count_obs = expected_values[(frequency_avg, apply_qa)]["count_obs"]
print(f"\nACTUAL [{frequency_avg}, {apply_qa}]") + print("OBS:", obs_values) + print("LAT:", lat_values) + print("LON:", lon_values) + print("COUNT:", count_obs_values) np.testing.assert_array_almost_equal(obs_values, expected_obs) np.testing.assert_array_almost_equal(lat_values, expected_lat) diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index 553b295131ca4446bd2273872ed3a68e1d46e147..3b7ff0ab560a528de7b4218f2a666cd9aec33732 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -271,6 +271,12 @@ def frequency_int(time_interval:str) -> int: frequency = 1 elif time_interval == "D": frequency = 24 + elif time_interval == "W": + frequency = 24*7 + elif time_interval == "M": + frequency = 24*30 + elif time_interval == "Y": + frequency = 24*365 else: frequency = int(time_interval[0]) return frequency @@ -376,9 +382,11 @@ def day_boundaries( If not return the start and end date as the boundaries """ from datetime import datetime, timedelta, time - date = start_date + timedelta(days=day) - timedelta(days=1) - day_start = date.floor(freq="d") - day_end = (date + timedelta(days=1)).floor(freq="d") + # date = start_date + timedelta(days=day) - timedelta(days=1) + # day_start = date.floor(freq="d") + # day_end = (date + timedelta(days=1)).floor(freq="d") + day_start = datetime.combine(day, datetime.min.time()) + day_end = day_start + timedelta(days=1) # If day_start or day_end are outside the regions of start_date and end_date # Return those instead @@ -550,3 +558,15 @@ def filter_observations( return final_obs, final_lat, final_lon, final_count + +def time_slicing(start_date, end_date, frequency): + """ + Time slicing function + """ + + date_slices = pd.date_range(start_date, end_date, freq=frequency).strftime('%Y%m%d%H%M') + + return date_slices + + + \ No newline at end of file diff --git a/mapies/util/multiprocessing.py b/mapies/util/multiprocessing.py index ad0d3a49d431fa87ea74814e2de9e136af04cc3e..b65d3554faf715158eb4a16c714870d4273853b3 100644 --- a/mapies/util/multiprocessing.py +++ b/mapies/util/multiprocessing.py @@ -2,6 +2,7 @@ import numpy as np import pandas as pd import xarray as xr +from datetime import datetime, timedelta from typing import List from abc import ABC, abstractmethod import multiprocessing @@ -108,6 +109,46 @@ def process_single_file_viirs( return obs, lat, lon, time_values +# def process_single_date_slice(date, daily_dict, grid, start_date, end_date, freq_hours): +# center_time = datetime.strptime(date, "%Y%m%d%H%M") +# l_border = (datetime.strptime(date, "%Y%m%d%H%M") - timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M') +# r_border = (datetime.strptime(date, "%Y%m%d%H%M") + timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M') + +# # Collect data +# obs, lon, lat, time_values = np.array([]), np.array([]), np.array([]), np.array([], dtype='datetime64') +# days = pd.date_range(l_border, r_border, freq="D") + +# valid_results = [] +# for pandas_day in days: +# day = pandas_day.date() +# try: +# day_obs = daily_dict[day]["obs"] +# day_lon = daily_dict[day]["lon"] +# day_lat = daily_dict[day]["lat"] +# day_time = daily_dict[day]["time"] +# valid_results.append((day_obs, day_lat, day_lon, day_time)) +# except KeyError: +# continue + +# # Concatenate all arrays together +# if not valid_results: +# return None +# obs, lat, lon, time_values = append_and_concatenate(valid_results, day) +# if obs is None: +# return None # No valid data for this time slice + +# # Filter by exact time range +# obs, lon, lat, time_values = 
diff --git a/mapies/viirs.py b/mapies/viirs.py
index 5887af70d2a06b73751f5ddfc198a9ec47ba0106..462fdbfc05ef7b43e66412cbb6ef9e13a32b27e3 100644
--- a/mapies/viirs.py
+++ b/mapies/viirs.py
@@ -47,6 +47,7 @@ class VIIRS(MAPIES):
         self.end_day = int(end_date[6:8])
         self.monthly_dict = {}
         self.daily_dict = {}  # Dictionary to produce daily netcdf files
+        self.avg_results = []  # List to store averaged results (when averaging by frequency)
 
         self.dates_slice = pd.date_range(
             self.start_date,
@@ -116,25 +117,26 @@ class VIIRS(MAPIES):
             parts = file.split('.')
             julian_day = parts[1][1:]
             date = datetime.strptime(julian_day, "%Y%j")
+            full_date = date.date()  # Use the full date as the daily_dict key
             if date.month not in self.monthly_dict:
                 self.monthly_dict[date.month] = {
                     "files": [],   # List to store file paths
                     "lon": None,   # Longitudes array
                     "lat": None,   # Latitudes array
                     "obs": None,   # Observations array
                     "count": None, # Count of observations used to compute the average
                 }
-            if date.day not in self.daily_dict:
-                self.daily_dict[date.day] = {
+            if full_date not in self.daily_dict:
+                self.daily_dict[full_date] = {
                     "files": [],  # List to store file paths
                     "lon": None,  # Longitudes array
                     "lat": None,  # Latitudes array
                     "obs": None,  # Observations array
                     "time": None, # Time array
                 }
 
             self.monthly_dict[date.month]["files"].append(file)
-            self.daily_dict[date.day]["files"].append(file)
+            self.daily_dict[full_date]["files"].append(file)
 
         logger.debug(f"Discovered files for months: {list(self.monthly_dict.keys())}")
         for month, data in self.monthly_dict.items():
@@ -143,9 +145,9 @@ class VIIRS(MAPIES):
 
         logger.debug(f"Discovered files for days: {list(self.daily_dict.keys())}")
         for day, data in self.daily_dict.items():
-            logger.debug(f"Day {day:02d}: {len(data['files'])} files")
-
+            logger.debug(f"Day {day.strftime('%Y-%m-%d')}: {len(data['files'])} files")
+
 # =============================================================================
 # Supporting functions
 # =============================================================================
@@ -183,7 +185,7 @@ class VIIRS(MAPIES):
         time_values = np.array([], dtype='datetime64')
         days = pd.date_range(l_border, r_border, freq="D")
         for pandas_day in days:
-            day = pandas_day.day
+            day = pandas_day.date()
             try:
                 day_obs = self.daily_dict[day]["obs"]
                 day_lon = self.daily_dict[day]["lon"]
@@ -268,4 +270,4 @@ class VIIRS(MAPIES):
 
         ds.to_netcdf(filename, encoding={})
         outfiles.append(filename)
-        return outfiles
\ No newline at end of file
+        return outfiles
diff --git a/run/run_viirs.py b/run/run_viirs.py
index af39004ec0d631c5c10c0503a49c00a3c116d1b4..26b08e0ab5fa13a6e7d003ee13de536d839b662a 100644
--- a/run/run_viirs.py
+++ b/run/run_viirs.py
@@ -16,7 +16,7 @@ if __name__ == "__main__":
 
     start_time = time.time()
 
-    c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="bdrc")
+    c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="rotated")
     c.gather_nc_files()
-    c.process_avg_data(monthly_avg=False, min_num_obs = 20, save=True)
+    c.process_avg_data(frequency_avg=None, min_num_obs=20, save=True)
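
Below is a minimal usage sketch (not part of the patch) of the new frequency-based averaging path introduced above. The class and method names (VIIRS, gather_nc_files, process_avg_data, avg_results, plot_2D_obs) all come from the changes in this diff; the input/output directories and the "3H" window are illustrative assumptions.

    from mapies.viirs import VIIRS

    # Hypothetical paths; substitute real VIIRS input/output directories
    start_date, end_date = "202401310000", "202401312359"
    c = VIIRS(start_date, end_date, dest="./out", indir="./data", grid_repr="rotated")
    c.gather_nc_files()

    # Build the daily dictionary lazily, then average it over 3-hourly windows
    c.process_avg_data(frequency_avg="3H", apply_qa=True, save=False)

    # Each avg_results entry is (date, lon, lat, obs, count)
    for date, lon, lat, obs, count in c.avg_results:
        print(date, obs.shape, int(count.sum()))

    # months=[1] plots the frequency-based averages computed above
    c.plot_2D_obs(months=[1])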