From 08b9a94e7771cd2734b8d8f3934b1f7858c176ea Mon Sep 17 00:00:00 2001
From: Carlotta
Date: Wed, 7 May 2025 16:12:47 +0200
Subject: [PATCH 1/2] new method to save netCDF

---
 mapies/mapies.py | 86 ++++++++++++++++++++++++++++++++++++++++--------
 run/run_viirs.py | 18 +++++-----
 2 files changed, 80 insertions(+), 24 deletions(-)

diff --git a/mapies/mapies.py b/mapies/mapies.py
index 7508c267..e38a90ab 100644
--- a/mapies/mapies.py
+++ b/mapies/mapies.py
@@ -27,6 +27,7 @@ from numpy.typing import DTypeLike, NDArray
 from loguru import logger
 from tqdm import tqdm
+from scipy.interpolate import griddata
 
 logger.remove()
 logger.add(sys.stderr, level="INFO")
@@ -206,7 +207,7 @@ class MAPIES:
             self.average_by_frequency(frequency=frequency_avg, save_nc=save)
         else:
             final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp(
-                batch_size=batch_size, month=None
+                batch_size=batch_size
             )
 
             self.obs, self.lat, self.lon, self.count_obs = filter_observations(final_obs, final_lat, final_lon, cumulative_count)
@@ -215,7 +216,7 @@ class MAPIES:
 
         if save:
             mid_time_array = generate_mid_time_array(self.start_date, self.end_date, self.obs)
-            self.to_netCDF(self.obs, self.lat, self.lon, mid_time_array, self.start_date, self.end_date)
+            self.to_netCDF_for_ncview(self.obs, self.lat, self.lon, mid_time_array, self.start_date, self.end_date)
             logger.debug(f'Saved processed data to netCDF')
 
@@ -260,20 +261,15 @@ class MAPIES:
     def process_data_mp(
         self,
         batch_size: int,
-        month: int | None =None
     ):
         """
         Process the data with multiprocessing.
         """
         flag = False
-        if month is None:
-            month_files = self.files
-            logger.info(f'Processing all files from {self.start_date} to {self.end_date}')
-        else:
-            month_files = self.monthly_dict[month]["files"]
-            logger.info(f"Processing {len(month_files)} files for month {month}")
-
-
+
+        month_files = self.files
+        logger.info(f'Processing all files from {self.start_date} to {self.end_date}')
+
         # Split files into batches
         batches = create_batches(month_files, batch_size)
 
@@ -318,7 +314,7 @@ class MAPIES:
 
         # Check if any valid data was processed
         if cumulative_obs is None or cumulative_count is None:
-            logger.warning(f"No valid data processed for month {month}. Returning empty arrays.")
+            logger.warning(f"No valid data. Returning empty arrays.")
             return None, None, None, None
 
         # Compute final averages
@@ -331,7 +327,7 @@ class MAPIES:
 
             cumulative_count = cumulative_count[min_num_counts]
 
-        logger.debug(f"Completed processing for month {month}.")
+        logger.debug(f"Completed processing.")
 
         return final_lon, final_lat, final_obs, cumulative_count
 
@@ -401,7 +397,7 @@ class MAPIES:
             # Get day start and day end but only if they are within object start and end date
             day_start, day_end = day_boundaries(day, self.start_date, self.end_date)
             if save:
-                self.to_netCDF(final_obs, final_lat, final_lon, final_time, day_start, day_end)
+                self.to_netCDF_for_ncview(final_obs, final_lat, final_lon, final_time, day_start, day_end)
 
     @timeit
     @logger.catch
@@ -513,7 +509,69 @@ class MAPIES:
 
         logger.info(f"Saved data to {filename}")
 
+    @logger.catch
+    def to_netCDF_for_ncview(
+        self,
+        obs: NDArray | None = None,
+        lat: NDArray | None = None,
+        lon: NDArray | None = None,
+        time: NDArray | None = None,
+        start_time: pd.Timestamp | None = None,
+        end_time: pd.Timestamp | None = None,
+    ):
+        """
+        Write the MAPIES output to an ncview-compatible netCDF file
+        """
+
+        # Convert time to seconds since TIME_ORIGIN
+        final_time_seconds = (time - self.time_orig) / np.timedelta64(1, "s")
+        final_time_seconds = np.round(final_time_seconds, 0)
+
+        grid_representation_str = json.dumps(self.grid_config, ensure_ascii=False)
+        qa_flags_str = json.dumps(self.qualityFlags, ensure_ascii=False)
+
+        dlon = dlat = 0.1
+
+        lon_min, lon_max = np.floor(lon.min()), np.ceil(lon.max())
+        lat_min, lat_max = np.floor(lat.min()), np.ceil(lat.max())
+        grid_lon = np.arange(lon_min, lon_max + dlon, dlon)
+        grid_lat = np.arange(lat_min, lat_max + dlat, dlat)
+
+        grid_lon2d, grid_lat2d = np.meshgrid(grid_lon, grid_lat)
+
+        obs_grid = griddata(
+            points=(lat, lon),
+            values=obs,
+            xi=(grid_lat2d, grid_lon2d),
+            method='nearest'
+        )
+
+        # Create xarray Dataset
+        ds = xr.Dataset(
+            coords={
+                "lat": ("lat", grid_lat2d[:,0].astype("float32")),
+                "lon": ("lon", grid_lon2d[0,:].astype("float32")),
+            },
+
+            data_vars={
+                VARS_ATTRIBUTES[self.obs_var]["mapies_variable"]: (["lat", "lon"], obs_grid, VARS_ATTRIBUTES[self.obs_var].copy()),
+            },
+            attrs={
+                "title": f"{VARS_ATTRIBUTES['title']['description']} from {start_time} to {end_time}",
+                "institution": "Barcelona Supercomputing Center",
+                "grid": f"{self.grid_repr}: {grid_representation_str}",
+                "Developers": "MAPIES team",
+                "QA": f"Quality Assurance: {self.apply_qa}, quality assurance applied: {qa_flags_str}",
+                "history": f"Created {datetime.now()}",
+                "Extra Filters Applied": f"{json.dumps(self.filters, ensure_ascii=False)}"
+            },
+        )
+
+        # Save to NetCDF file
+        filename = f"{self.dest}/Data_{self.datatype}_{start_time}_{end_time}_ncview.nc"
+        ds.to_netcdf(filename, mode='w', format="NETCDF4_CLASSIC")
+
+        logger.info(f"Saved ncview-compatible NetCDF to {filename}")
 
 # =============================================================================
 # Plotting
diff --git a/run/run_viirs.py b/run/run_viirs.py
index 26b08e0a..717a9393 100644
--- a/run/run_viirs.py
+++ b/run/run_viirs.py
@@ -7,28 +7,26 @@ if __name__ == "__main__":
     # Change these variables
     # 00:30 to 02:36 from 2024-01-01 to 2024-01-03
     start_date = "202401010000"
-    end_date = "202401312359"
+    end_date = "202401012359"
 
-    outdir="/home/cmeikle/Projects/data/"
-    indir="/home/cmeikle/Projects/data/VIIRS"
+    outdir="/home/cgile/Documents/mapies/figures/averages/ncview"
+    indir="/home/cgile/Documents/mapies/VIIRS"
 
     start_time = time.time()
 
-    c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="rotated")
+    c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="regular")
     c.gather_nc_files()
 
-    c.process_avg_data(monthly_avg=False, min_num_obs = 20, save=True)
-    c.plot_2D_observations(months=[0], filename="month_without_min_obs_and_qa_applied.png", outdir=outdir)
-    c.plot_2D_num_obs(months=[0], outdir=outdir)
-    #c.process_lazy_data(apply_qa=True, save=False)
+    c.process_avg_data(frequency_avg=None, min_num_obs = 20, batch_size = 100, save=True, apply_qa = True, geospatial_crop = None)
+    # c.plot_2D_observations(months=[0], filename="ncview_trial.png", outdir=outdir)
+    # c.plot_2D_num_obs(months=[0], outdir=outdir)
+    # c.process_lazy_data(apply_qa=True, save=True)
     #c.to_da(frequency="D", min_num_obs=2, save_figs=True)
-
-
     end_time = time.time()
     elapsed_time = end_time - start_time
     print(f"Script executed in {elapsed_time:.2f} seconds.")
-- 
GitLab


From 7614d25ab6fdb69a2b726fcf87b9fe8a662ee534 Mon Sep 17 00:00:00 2001
From: Carlotta
Date: Tue, 10 Jun 2025 17:07:04 +0200
Subject: [PATCH 2/2] Final changes in netcdf files

---
 mapies/config/satellite_config.yaml |   4 +-
 mapies/grids/base_grid.py           |   2 +
 mapies/grids/regular.py             |  32 +++--
 mapies/grids/rotated.py             |  64 +++++++--
 mapies/mapies.py                    | 208 +++++++++-------------------
 mapies/util/func_tools.py           | 161 +++++++++++++++++++++
 mapies/util/multiprocessing.py      |  73 +---------
 mapies/util/plotting.py             |   2 +-
 run/run_viirs.py                    |  19 ++-
 9 files changed, 312 insertions(+), 253 deletions(-)

diff --git a/mapies/config/satellite_config.yaml b/mapies/config/satellite_config.yaml
index f2d04bb3..b1e31c61 100644
--- a/mapies/config/satellite_config.yaml
+++ b/mapies/config/satellite_config.yaml
@@ -52,8 +52,8 @@ grids:
   regular:
     west: -180
     south: -90
-    dlon: 0.1
-    dlat: 0.1
+    dlon: 0.2
+    dlat: 0.2
     #nlon: 3600
     #nlat: 1800
   cams:
diff --git a/mapies/grids/base_grid.py b/mapies/grids/base_grid.py
index 2e78183d..b0c0cbc0 100644
--- a/mapies/grids/base_grid.py
+++ b/mapies/grids/base_grid.py
@@ -49,6 +49,8 @@ class Grid:
         self.center_latitudes = np.linspace(self.cenlat, self.cenlat + self.dlat * (self.nlat) - self.dlat, self.nlat, dtype=float)
         self.center_longitudes = np.linspace(self.cenlon, self.cenlon + self.dlon * (self.nlon) - self.dlat, self.nlon, dtype=float)
 
+        self.grid_lon2d, self.grid_lat2d = np.meshgrid(self.center_longitudes, self.center_latitudes)
+
         counter = 0
         self.coords = np.empty((self.center_latitudes.shape[0] * self.center_longitudes.shape[0], 2))
         for i in range(self.center_longitudes.shape[0]):
diff --git a/mapies/grids/regular.py b/mapies/grids/regular.py
index a03bc045..085c0a77 100644
--- a/mapies/grids/regular.py
+++ b/mapies/grids/regular.py
@@ -27,7 +27,8 @@ class RegularGrid(Grid):
         lon: NDArray,
         lat: NDArray,
         obs: NDArray,
-        obserr: NDArray | None = None
+        obserr: NDArray | None = None,
+        flattening: bool = True
     ):
         """
         Aggregate the observations to the grid cells
@@ -86,16 +87,21 @@ class RegularGrid(Grid):
 
         if aggregated_obs_err is not None:
             aggregated_obs_err[non_zero_counts] /= count_obs[non_zero_counts]
-
-        # TODO: Add a function that takes the center or aggregated value
-        # Return the results
-        lon_out = aggregated_lon.flatten() #self.center_longitudes
-        lat_out = aggregated_lat.flatten() #self.center_latitudes
-        obs_out = aggregated_obs.flatten()
-        count_obs = count_obs.flatten()
-
-        if aggregated_obs_err is not None:
-            obs_err_out = aggregated_obs_err.flatten()
-            return lon_out, lat_out, obs_out, obs_err_out, count_obs
+        if flattening:
+            # TODO: Add a function that takes the center or aggregated value
+            # Return the results
+            lon_out = aggregated_lon.flatten() #self.center_longitudes
+            lat_out = aggregated_lat.flatten() #self.center_latitudes
+            obs_out = aggregated_obs.flatten()
+            count_obs = count_obs.flatten()
+
+            if aggregated_obs_err is not None:
+                obs_err_out = aggregated_obs_err.flatten()
+                return lon_out, lat_out, obs_out, obs_err_out, count_obs
+            else:
+                return lon_out, lat_out, obs_out, count_obs
         else:
-            return lon_out, lat_out, obs_out, count_obs
+            if aggregated_obs_err is not None:
+                return self.grid_lon2d, self.grid_lat2d, aggregated_obs, aggregated_obs_err, count_obs
+            else:
+                return self.grid_lon2d, self.grid_lat2d, aggregated_obs, count_obs
diff --git a/mapies/grids/rotated.py b/mapies/grids/rotated.py
index c6847103..6c0801ce 100644
--- a/mapies/grids/rotated.py
+++ b/mapies/grids/rotated.py
@@ -11,6 +11,8 @@ from mapies.util.func_tools import *
 from mapies.grids.base_grid import Grid
 import logging
 
+from cartopy.crs import RotatedPole, PlateCarree
+
 
 def geo_to_rot(
     lons: NDArray,
@@ -39,6 +41,26 @@ def geo_to_rot(
     rlat = np.degrees(rlat)
     return rlon, rlat
 
+def rot_to_geo_cartopy(rlon, rlat, centre_lon, centre_lat):
+    """
+    Convert rotated pole coordinates to geographic lat/lon using cartopy's built-in transform.
+    """
+
+    # Define rotated pole projection
+    rotated_crs = RotatedPole(pole_latitude=90-centre_lat, pole_longitude=-180+centre_lon)
+    geographic_crs = PlateCarree()
+
+    # Ensure inputs are 2D
+    rlon2d, rlat2d = np.meshgrid(rlon, rlat)
+
+    # Transform to geographic coordinates
+    transformed = geographic_crs.transform_points(rotated_crs, rlon2d, rlat2d)
+
+    lon = transformed[..., 0]
+    lat = transformed[..., 1]
+
+    return lat, lon
+
 
 @dataclass
 class RotatedGrid(Grid):
@@ -58,7 +80,8 @@ class RotatedGrid(Grid):
         lon: NDArray,
         lat: NDArray,
         obs: NDArray,
-        obserr: NDArray | None = None
+        obserr: NDArray | None = None,
+        flattening: bool = True
     ):
         """
         Aggregate the observations to the grid cells
@@ -72,6 +95,13 @@ class RotatedGrid(Grid):
         # Perform the rotation
         rlon, rlat = geo_to_rot(lon, lat, centre_lon=self.centre_lon, centre_lat=self.centre_lat)
 
+        # Print max and min values for debugging
+        # print(f"Rotated lon range: {np.nanmin(rlon)} to {np.nanmax(rlon)}")
+        # print(f"Rotated lat range: {np.nanmin(rlat)} to {np.nanmax(rlat)}")
+        # print(f"Original lon range: {np.nanmin(lon)} to {np.nanmax(lon)}")
+        # print(f"Original lat range: {np.nanmin(lat)} to {np.nanmax(lat)}")
+
+
         # Make sure rlon and rlat are within the borders of the grid
         # Filter out values that are outside the grid bounds
         mask = (rlon >= self.west) & (rlon <= self.west + (self.dlon * (self.nlon - 1))) & \
@@ -88,7 +118,6 @@ class RotatedGrid(Grid):
         if obserr is not None:
             obserr = obserr[mask]
 
-
         # Vectorized calculation of grid indices
         rlon_idx = np.searchsorted(self.center_longitudes, rlon) - 1  # Index of longitude grid
         rlat_idx = np.searchsorted(self.center_latitudes, rlat) - 1  # Index of latitude grid
@@ -135,19 +164,26 @@ class RotatedGrid(Grid):
             aggregated_obs_err[non_zero_counts] /= count_obs[non_zero_counts]
 
         # Return the results
-        lon_out = aggregated_lon.flatten()
-        lat_out = aggregated_lat.flatten()
-        rlon_out = aggregated_lon.flatten() #self.center_longitudes
-        rlat_out = aggregated_lat.flatten() #self.center_latitudes
-        obs_out = aggregated_obs.flatten()
-        count_obs = count_obs.flatten()
-
-
-        if aggregated_obs_err is not None:
-            obs_err_out = aggregated_obs_err.flatten()
-            return lon_out, lat_out, rlon_out, rlat_out, obs_out, obs_err_out, count_obs
+        if flattening:
+            lon_out = aggregated_lon.flatten()
+            lat_out = aggregated_lat.flatten()
+            rlon_out = aggregated_lon.flatten() #self.center_longitudes
+            rlat_out = aggregated_lat.flatten() #self.center_latitudes
+            obs_out = aggregated_obs.flatten()
+            count_obs = count_obs.flatten()
+            if aggregated_obs_err is not None:
+                obs_err_out = aggregated_obs_err.flatten()
+                return lon_out, lat_out, rlon_out, rlat_out, obs_out, obs_err_out, count_obs
+            else:
+                return lon_out, lat_out, rlon_out, rlat_out, obs_out, count_obs
         else:
-            return lon_out, lat_out, rlon_out, rlat_out, obs_out, count_obs
+            if aggregated_obs_err is not None:
+                return self.grid_lon2d, self.grid_lat2d, aggregated_obs, aggregated_obs_err, count_obs
+            else:
+                LAT2D, LON2D = rot_to_geo_cartopy(self.center_longitudes, self.center_latitudes, self.centre_lon, self.centre_lat)
+                return aggregated_obs, LAT2D, LON2D, count_obs, self.center_latitudes, self.center_longitudes, self.centre_lat, self.centre_lon
+
+
 
 @dataclass
diff --git a/mapies/mapies.py b/mapies/mapies.py
index e38a90ab..833a10b4 100644
--- a/mapies/mapies.py
+++ b/mapies/mapies.py
@@ -34,7 +34,7 @@ logger.add(sys.stderr, level="INFO")
 logger.add("../logs/mapies.log", level="DEBUG")
 
 @logger.catch
-def process_single_date_slice(date, daily_dict, grid, start_date, end_date, freq_hours):
+def process_single_date_slice(date, daily_dict, grid, start_date, end_date, freq_hours, save=False, outdir="./"):
     logger.debug(f"Processing date slice: {date}")
     center_time = datetime.strptime(date, "%Y%m%d%H%M")
     l_border = (datetime.strptime(date, "%Y%m%d%H%M") - timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M')
@@ -69,16 +69,35 @@ def process_single_date_slice(date, daily_dict, grid, start_date, end_date, freq
 
     # Aggregate on the grid
     if isinstance(grid, RotatedGrid):
-        lon, lat, rlon, rlat, obs, count_obs = grid.aggregate(lon, lat, obs, None)
+        if save:
+            obs2D, lat2D, lon2D, count2D, rlat, rlon, centre_lat, centre_lon = grid.aggregate(lon, lat, obs, None, flattening=False)
+
+            if obs2D is not None and not np.all(np.isnan(obs2D)) and not np.all(obs2D == 0):
+                logger.debug(f"Saving data to NetCDF for date: {date}")
+
+                day_start, day_end = day_boundaries(center_time, start_date, end_date)
+                to_netCDF_gridded_rotated(obs2D, lat2D, lon2D, count2D, rlat, rlon, date, centre_lat, centre_lon, outdir)
+        else:
+            lon, lat, rlon_out, rlat_out, obs, count_obs = grid.aggregate(lon, lat, obs, None, flattening=True)
+            final_obs, final_lat, final_lon, final_count = filter_observations(obs, lat, lon, count_obs)
+            return (date, final_lon, final_lat, final_obs, final_count, time_values, center_time.date())
+
    elif isinstance(grid, RegularGrid):
-        lon, lat, obs, obserr, count_obs = grid.aggregate(lon, lat, obs, None)
+        if save:
+            lon2D, lat2D, obs2D, count2D = grid.aggregate(lon, lat, obs, None, flattening=False)
+
+            if obs2D is not None and not np.all(np.isnan(obs2D)) and not np.all(obs2D == 0):
+                logger.debug(f"Saving data to NetCDF for date: {date}")
+
+                day_start, day_end = day_boundaries(center_time, start_date, end_date)
+                to_netCDF_gridded_regular(obs2D, lat2D, lon2D, count2D, time_values, day_start, day_end, date, outdir)
+        else:
+            lon, lat, obs, count_obs = grid.aggregate(lon, lat, obs, None, flattening=True)
+            final_obs, final_lat, final_lon, final_count = filter_observations(obs, lat, lon, count_obs)
+            return (date, final_lon, final_lat, final_obs, final_count, time_values, center_time.date())
    else:
        raise ValueError("Unsupported grid type")
 
-    final_obs, final_lat, final_lon, final_count = filter_observations(obs, lat, lon, count_obs)
-
-    return (date, final_lon, final_lat, final_obs, final_count, time_values, center_time.date())
-
 
 class MAPIES:
     """
@@ -200,14 +219,11 @@ class MAPIES:
             self.geospatial_crop = np.array(geospatial_crop)
         if frequency_avg is not None:
             logger.info(f"Running frequency-based averaging with frequency='{frequency_avg}'")
-            # self.build_frequency_dict(frequency=frequency_avg)
-            # self.process_lazy_data2(apply_qa=apply_qa, geospatial_crop=geospatial_crop, save=False)
-            # self.average_by_frequency2(frequency=frequency_avg, save_nc=save)
             self.process_lazy_data(apply_qa=apply_qa, geospatial_crop=geospatial_crop, save=False)
             self.average_by_frequency(frequency=frequency_avg, save_nc=save)
         else:
             final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp(
-                batch_size=batch_size
+                batch_size=batch_size, month=None
             )
 
             self.obs, self.lat, self.lon, self.count_obs = filter_observations(final_obs, final_lat, final_lon, cumulative_count)
@@ -216,7 +232,7 @@ class MAPIES:
 
         if save:
             mid_time_array = generate_mid_time_array(self.start_date, self.end_date, self.obs)
-            self.to_netCDF_for_ncview(self.obs, self.lat, self.lon, mid_time_array, self.start_date, self.end_date)
+            self.to_netCDF(self.obs, self.lat, self.lon, mid_time_array, self.start_date, self.end_date)
             logger.debug(f'Saved processed data to netCDF')
 
@@ -228,7 +244,8 @@ class MAPIES:
 
         dates_slice = time_slicing(self.start_date, self.end_date, frequency)
         logger.debug(f"Dates slice: {dates_slice}")
-
+        save = save_nc
+        outdir = self.dest
         args = [
             (
                 date,
@@ -236,13 +253,17 @@ class MAPIES:
                 self.grid,
                 self.start_date,
                 self.end_date,
-                freq_hours)
+                freq_hours,
+                save,
+                outdir)
             for date in dates_slice
         ]
 
         with multiprocessing.Pool(processes=8) as pool:
             results = pool.starmap(process_single_date_slice, args)
 
+        # return (date, final_lon, final_lat, final_obs, final_count, final_time_values, center_time.date())
+
         for result in results:
             if result is None:
                 continue
@@ -250,25 +271,36 @@ class MAPIES:
 
             avg_results.append((date, lon, lat, obs, count_obs))
 
-            # Optionally save NetCDF
-            if save_nc:
-                # Use the center_time date for boundaries
-                day_start, day_end = day_boundaries(center_time.date(), self.start_date, self.end_date)
-                self.to_netCDF(obs, lat, lon, time_values, day_start, day_end)
+            # # Optionally save NetCDF
+            # if save_nc:
+            #     if obs is not None and len(obs) > 0:
+            #         # Save the data to NetCDF
+            #         logger.debug(f"Saving data to NetCDF for date: {date}")
+            #         # Use the center_time date for boundaries
+            #         day_start, day_end = day_boundaries(center_time, self.start_date, self.end_date)
+            #         self.to_netCDF_for_ncview(obs, lat, lon, time_values, day_start, day_end, n)
+            #         n += 1
 
         self.avg_results = avg_results
 
     def process_data_mp(
         self,
         batch_size: int,
+        month: int | None = None,
     ):
         """
         Process the data with multiprocessing.
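 
         The gathered file list is split into batches of ``batch_size`` with
         create_batches(); each batch is processed by a worker pool and the
         per-batch observation sums and counts are accumulated before the
         final average is computed.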
""" flag = False + + logger.info(f"Processing data from {self.start_date} to {self.end_date}, grid: {self.grid_repr}, qa: {self.apply_qa}, geospatial crop: {self.geospatial_crop}") - month_files = self.files - logger.info(f'Processing all files from {self.start_date} to {self.end_date}') + if month is not None: + month_files = self.files + logger.info(f'Processing all files from {self.start_date} to {self.end_date}') + else: + month_files = self.files + logger.info(f'Processing all files from {self.start_date} to {self.end_date}') # Split files into batches batches = create_batches(month_files, batch_size) @@ -374,11 +406,6 @@ class MAPIES: final_obs, final_lat, final_lon, final_time = append_and_concatenate(valid_results, day) - logger.info(f"Size of obs in daily dict: {sys.getsizeof(final_obs)} bytes") - logger.info(f"Size of lon in daily dict: {sys.getsizeof(final_lon)} bytes") - logger.info(f"Size of lat in daily dict: {sys.getsizeof(final_lat)} bytes") - logger.info(f"Size of time in daily dict: {sys.getsizeof(final_time)} bytes") - # Skip the day if no valid data was processed if final_obs is None: continue @@ -397,59 +424,8 @@ class MAPIES: # Get day start and day end but only if they are within object start and end date day_start, day_end = day_boundaries(day, self.start_date, self.end_date) if save: - self.to_netCDF_for_ncview(final_obs, final_lat, final_lon, final_time, day_start, day_end) - - @timeit - @logger.catch - def process_lazy_data2(self, apply_qa=True, geospatial_crop: NDArray | None = None, save=True): - """ - Process the data for the specified time range. It only retrieves a daily dictionary using dask lazy loading. - """ - flag = True - #Check if the user wants qa flags - if apply_qa: - self.apply_qa = True - else: - self.apply_qa = False - if geospatial_crop is not None: - self.geospatial_crop = np.array(geospatial_crop) - for dict_key in self.frequency_dict.keys(): - files = self.frequency_dict[dict_key]["files"] + self.to_netCDF(final_obs, final_lat, final_lon, final_time, day_start, day_end) - # Use multiprocessing to process files within the batch - - viirs_mp = self.mp_type( - self.grid, - self.obs_var, - self.lat_var, - self.lon_var, - self.time_var, - self.start_date, - self.end_date, - self.apply_qa, - self.qualityFlags, - files, - self.geospatial_crop, - flag, - ) - valid_results, failed_files = viirs_mp.process_batch() - - final_obs, final_lat, final_lon, final_time = append_and_concatenate(valid_results, dict_key) - - # Skip the day if no valid data was processed - if final_obs is None: - continue - - # Check if any valid data exists - if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): - logger.warning(f"All observation values for day {dict_key} are 0 or NaN. 
Skipping...") - continue - - # Update day dict - self.frequency_dict[dict_key]["obs"] = final_obs - self.frequency_dict[dict_key]["lon"] = final_lon - self.frequency_dict[dict_key]["lat"] = final_lat - self.frequency_dict[dict_key]["time"] = final_time @logger.catch def to_netCDF( @@ -509,69 +485,6 @@ class MAPIES: logger.info(f"Saved data to {filename}") - @logger.catch - def to_netCDF_for_ncview( - self, - obs: NDArray | None = None, - lat: NDArray | None = None, - lon: NDArray | None = None, - time: NDArray | None = None, - start_time: pd.Timestamp | None = None, - end_time: pd.Timestamp | None = None, - ): - """ - Function to return the start MAPIES output as a netcdf - """ - - # Convert time to seconds since TIME_ORIGIN - final_time_seconds = (time - self.time_orig) / np.timedelta64(1, "s") - final_time_seconds = np.round(final_time_seconds, 0) - - grid_representation_str = json.dumps(self.grid_config, ensure_ascii=False) - qa_flags_str = json.dumps(self.qualityFlags, ensure_ascii=False) - - dlot = dlat = 0.1 - - lon_min, lon_max = np.floor(lon.min()), np.ceil(lon.max()) - lat_min, lat_max = np.floor(lat.min()), np.ceil(lat.max()) - grid_lon = np.arange(lon_min, lon_max + dlot, dlot) - grid_lat = np.arange(lat_min, lat_max + dlat, dlat) - - grid_lon2d, grid_lat2d = np.meshgrid(grid_lon, grid_lat) - - obs_grid = griddata( - points=(lat, lon), - values=obs, - xi=(grid_lat2d, grid_lon2d), - method='nearest' - ) - - # Create xarray Dataset - ds = xr.Dataset( - coords={ - "lat": ("lat", grid_lat2d[:,0].astype("float32")), - "lon": ("lon", grid_lon2d[0,:].astype("float32")), - }, - - data_vars={ - VARS_ATTRIBUTES[self.obs_var]["mapies_variable"]: (["lat", "lon"], obs_grid, VARS_ATTRIBUTES[self.obs_var].copy()), - }, - attrs={ - "title": f"{VARS_ATTRIBUTES['title']['description']} from {start_time} to {end_time}", - "institution": "Barcelona Supercomputing Center", - "grid": f"{self.grid_repr}: {grid_representation_str}", - "Developers": "MAPIES team", - "QA": f"Quality Assurance: {self.apply_qa}, quality assurance applied: {qa_flags_str}", - "history": f"Created {datetime.now()}", - "Extra Filters Applied": f"{json.dumps(self.filters, ensure_ascii=False)}" - }, - ) - - # Save to NetCDF file - filename = f"{self.dest}/Data_{self.datatype}_{start_time}_{end_time}_ncview.nc" - ds.to_netcdf(filename, mode='w', format="NETCDF4_CLASSIC") - - logger.info(f"Saved data ncview-compatible NetCDF to {filename}") # ============================================================================= # Plotting @@ -609,11 +522,14 @@ class MAPIES: elif months == [1]: try: for i, (date, lon_values, lat_values, obs_values, count_obs) in enumerate(self.avg_results): + # if obs_values is None or len(obs_values) == 0: + # continue + print(f"Plotting date: {date}, average {i}") date_obj = datetime.strptime(date, "%Y%m%d%H%M") title = f"Observation 2D plot - {date_obj}, average {i}" - filename = f"{self.datatype}_2D_obs_from_{self.start_date}_to_{self.end_date}_average_{i}.png" + filename = f"{self.datatype}_2D_obs_from_{self.start_date}_to_{self.end_date}_{date}_average_{i}.png" - self.plot_2D_obs_custom( + self.plotter.plot_2D_obs_custom( lon_values=lon_values, lat_values=lat_values, obs_values=obs_values, @@ -649,7 +565,7 @@ class MAPIES: logger.error(f"Error plotting all data: {e}") return title = f"Plot of the number of valid observations from {self.start_date} to {self.end_date}" - filename = filename or f"{self.datatype}_2D_obs_count_from_{self.start_date}_to_{self.end_date}.png" + filename = 
f"{self.datatype}_2D_obs_count_from_{self.start_date}_to_{self.end_date}.png" self.plotter.plot_2D_num_obs_custom( lon_values=lon_values, lat_values=lat_values, @@ -665,8 +581,8 @@ class MAPIES: for i, (date, lon_values, lat_values, obs_values, count_obs) in enumerate(self.avg_results): date_obj = datetime.strptime(date, "%Y%m%d%H%M") title = f"Plot of the number of valid observations - {date_obj}, average {i}" - filename = f"{self.datatype}_2D_obs_count_from_{self.start_date}_to_{self.end_date}_average_{i}.png" - self.plot_2D_num_obs_custom( + filename = filename or f"{self.datatype}_2D_obs_count_from_{self.start_date}_to_{self.end_date}_{date}_average_{i}.png" + self.plotter.plot_2D_num_obs_custom( lon_values=lon_values, lat_values=lat_values, num_obs=count_obs, diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index 3b7ff0ab..73caae3e 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -13,6 +13,10 @@ from numpy.typing import DTypeLike, NDArray from datetime import datetime import multiprocessing from loguru import logger +import json +import os + +from mapies.util.variables_names import VARS_ATTRIBUTES @@ -569,4 +573,161 @@ def time_slicing(start_date, end_date, frequency): return date_slices +def to_netCDF_gridded_regular( + obs: NDArray | None = None, + lat: NDArray | None = None, + lon: NDArray | None = None, + count: NDArray | None = None, + time: NDArray | None = None, + start_time: pd.Timestamp | None = None, + end_time: pd.Timestamp | None = None, + date: NDArray | None = None, + outdir: str = "./", + ): + """ + Function to return the start MAPIES output as a netcdf compatible with ncview, Panoply + """ + + obs_grid = obs.astype("float32") + obs_grid[count == 0] = np.nan + obs_grid = np.ma.masked_invalid(obs_grid) + + fill_value = -9999.0 + obs_filled = np.ma.filled(obs_grid, fill_value) + + obs_filled = obs_filled[np.newaxis,:,:] + + grid_lat2d = lat.astype("float32") + grid_lon2d = lon.astype("float32") + + grid_lat = grid_lat2d[:, 0] # assume regular lat along rows + grid_lon = grid_lon2d[0, :] + + if not np.allclose(grid_lat2d, grid_lat2d[:,0][:, np.newaxis]): + raise ValueError("Latitude grid is not regular") + if not np.allclose(grid_lon2d, grid_lon2d[0,:][np.newaxis, :]): + raise ValueError("Longitude grid is not regular") + + print(f"Type of start and end time: {type(start_time)}, {type(end_time)}") + + # Convert date to a datetime object + if isinstance(date, str): + t = pd.to_datetime(date, format="%Y%m%d%H%M") + + ds = xr.Dataset( + coords={ + "time": ("time", [t]), + "lat": ("lat", grid_lat), + "lon": ("lon", grid_lon), + }, + + data_vars={ + "aod550": (["time", "lat", "lon"], obs_filled, { + "units": "1", + "long_name": "Aerosol_Optical_Thickness_550_Land_Ocean_Best_Estimate", + "standard_name": "Deep Blue/SOAR aerosol optical thickness at 550 nm over land and ocean", + "coordinates": "Time Latitude Longitude", + "_FillValue": fill_value, + "missing_value": fill_value, + }), + }, + attrs={ + "title": f"aod550 averaged around {date}", + "institution": "Barcelona Supercomputing Center", + # "grid": f"{grid_repr}: {grid_representation_str}", + "Developers": "MAPIES team", + # "QA": f"Quality Assurance: {apply_qa}, quality assurance applied: {qa_flags_str}", + "history": f"Created {datetime.now()}", + # "Extra Filters Applied": f"{json.dumps(filters, ensure_ascii=False)}" + }, + ) + + # Save to NetCDF file + filename = f"regular_{date}.nc" + filepath = os.path.join(outdir, filename) + ds.to_netcdf(filepath, mode='w', 
format="NETCDF4_CLASSIC") + + logger.info(f"Saved data ncview-compatible NetCDF to {filename}") + + +def to_netCDF_gridded_rotated( + obs: NDArray | None = None, + lat: NDArray | None = None, + lon: NDArray | None = None, + count: NDArray | None = None, + rlat: NDArray | None = None, + rlon: NDArray | None = None, + date: NDArray | None = None, + centre_lat: float = None, + centre_lon: float = None, + outdir: str = "./", +): + + obs_grid = obs.astype("float32") + obs_grid[count==0] = np.nan + obs_grid = np.ma.masked_invalid(obs_grid) + fill_values = -9999.0 + obs_filled = np.ma.filled(obs_grid, fill_values) + + t = pd.to_datetime(date, format="%Y%m%d%H%M") + + obs_filled = obs_filled[np.newaxis,:,:] + + ds = xr.Dataset( + coords={ + "time": ("time", [t]), + "rlat": ("rlat", rlat, { + "units": "0.0174532925199433 rad", + "standard_name": "grid_latitude", + "long_name": "latitude in rotated pole grid", + }), + "rlon": ("rlon", rlon, { + "units": "0.0174532925199433 rad", + "standard_name": "grid_longitude", + "long_name": "longitude in rotated pole grid", + }), + }, + data_vars={ + "lat": (["rlat", "rlon"], lat, { + "units": "degrees_north", + "axis": "Y", + "long_name": "Latitude coordinate", + "standard_name": "latitude", + }), + "lon": (["rlat", "rlon"], lon, { + "units": "degrees_east", + "axis": "X", + "long_name": "Longitude coordinate", + "standard_name": "longitude", + }), + "aod550": (["time", "rlat", "rlon"], obs_filled, { + "grid_mapping": "rotated_pole", + "units": "1", + "long_name": "Aerosol_Optical_Thickness_550_Land_Ocean_Best_Estimate", + "standard_name": "Deep Blue/SOAR aerosol optical thickness at 550 nm over land and ocean", + "coordinates": "Time lat lon", + "_FillValue": fill_values, + "missing_value": fill_values, + }), + "rotated_pole": ((), 0, { + "grid_mapping_name": "rotated_latitude_longitude", + "grid_north_pole_latitude": centre_lat, + "grid_north_pole_longitude": centre_lon, + }) + }, + attrs={"title": f"aod550 averaged around {date}", + "institution": "Barcelona Supercomputing Center", + # "grid": f"{grid_repr}: {grid_representation_str}", + "Developers": "MAPIES team", + # "QA": f"Quality Assurance: {apply_qa}, quality assurance applied: {qa_flags_str}", + "history": f"Created {datetime.now()}", + # "Extra Filters Applied": f"{json.dumps(filters, ensure_ascii=False)}"} + } + ) + filename = f"rotated_{date}.nc" + filepath = os.path.join(outdir, filename) + ds.to_netcdf(filepath, mode='w', format='NETCDF4_CLASSIC') + + print(f"Saved CF-compliant rotated grid NetCDF to {filename}") + \ No newline at end of file diff --git a/mapies/util/multiprocessing.py b/mapies/util/multiprocessing.py index b65d3554..4413446a 100644 --- a/mapies/util/multiprocessing.py +++ b/mapies/util/multiprocessing.py @@ -84,7 +84,6 @@ def process_single_file_viirs( # Flatten arrays obs, lat, lon, time_values = flatten_arrays(ds, obs_var, lat_var, lon_var, time_var) - # Apply quality assurance if applicable if apply_qa: @@ -99,9 +98,9 @@ def process_single_file_viirs( if not flag: # Regrid and aggregate if isinstance(grid, RotatedGrid): - lon_agg, lat_agg, rlon_agg, rlat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs) + lon_agg, lat_agg, rlon_agg, rlat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs, None, flattening=True) elif isinstance(grid, RegularGrid): - lon_agg, lat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs) + lon_agg, lat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs, None, flattening=True) ds.close() return obs_agg, lat_agg, lon_agg, count_obs 
         else:
@@ -109,47 +108,6 @@
             ds.close()
             return obs, lat, lon, time_values
 
-# def process_single_date_slice(date, daily_dict, grid, start_date, end_date, freq_hours):
-#     center_time = datetime.strptime(date, "%Y%m%d%H%M")
-#     l_border = (datetime.strptime(date, "%Y%m%d%H%M") - timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M')
-#     r_border = (datetime.strptime(date, "%Y%m%d%H%M") + timedelta(hours=freq_hours/2)).strftime('%Y%m%d%H%M')
-
-#     # Collect data
-#     obs, lon, lat, time_values = np.array([]), np.array([]), np.array([]), np.array([], dtype='datetime64')
-#     days = pd.date_range(l_border, r_border, freq="D")
-
-#     valid_results = []
-#     for pandas_day in days:
-#         day = pandas_day.date()
-#         try:
-#             day_obs = daily_dict[day]["obs"]
-#             day_lon = daily_dict[day]["lon"]
-#             day_lat = daily_dict[day]["lat"]
-#             day_time = daily_dict[day]["time"]
-#             valid_results.append((day_obs, day_lat, day_lon, day_time))
-#         except KeyError:
-#             continue
-
-#     # Concatenate all arrays together
-#     if not valid_results:
-#         return None
-#     obs, lat, lon, time_values = append_and_concatenate(valid_results, day)
-#     if obs is None:
-#         return None  # No valid data for this time slice
-
-#     # Filter by exact time range
-#     obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, l_border, r_border)
-
-#     # Aggregate on the grid
-#     if isinstance(grid, RotatedGrid):
-#         lon, lat, rlon, rlat, obs, count_obs = grid.aggregate(lon, lat, obs, None)
-#     elif isinstance(grid, RegularGrid):
-#         lon, lat, obs, obserr, count_obs = grid.aggregate(lon, lat, obs, None)
-#     else:
-#         raise ValueError("Unsupported grid type")
-
-#     return (date, lon, lat, obs, count_obs, time_values, center_time.date())
-
 
 class GenericMultiProcessing(ABC):
     """
     Base method for multiprocessing
     """
@@ -215,33 +173,6 @@ class VIIRSMultiprocessing(GenericMultiProcessing):
 
         failed_files= [file for file, res in zip(self.batch, results) if res is None]
         return valid_results, failed_files
-
-
-    # def average_by_frequency_mp(self, frequency="D", save_nc=False):
-
-    #     freq_hours = frequency_int(frequency)
-    #     logger.debug(f"Frequency in hours: {freq_hours}")
-    #     avg_results = []
-
-    #     dates_slice = time_slicing(self.start_date, self.end_date, frequency)
-    #     logger.debug(f"Dates slice: {dates_slice}")
-
-    #     args = [
-    #         (
-    #             date,
-    #             self.daily_dict,
-    #             self.r,
-    #             self.start_date,
-    #             self.end_date,
-    #             freq_hours)
-    #         for date in dates_slice
-    #     ]
-
-    #     with multiprocessing.Pool(processes=8) as pool:
-    #         results = pool.starmap(process_single_date_slice, args)
-
-    #     return results
-
 
 @dataclass
diff --git a/mapies/util/plotting.py b/mapies/util/plotting.py
index 38a0417d..39cd42fa 100644
--- a/mapies/util/plotting.py
+++ b/mapies/util/plotting.py
@@ -58,7 +58,7 @@ class MapiesPlotter:
 
         # Create the plot and add features
         fig, ax = plt.subplots(subplot_kw={"projection": proj}, figsize=figsize)
-        ax.gridlines()
+        ax.gridlines(draw_labels=True)
         ax.coastlines(resolution="10m")
 
         logger.debug("Plotting observations (new method)")
diff --git a/run/run_viirs.py b/run/run_viirs.py
index 717a9393..aefceebb 100644
--- a/run/run_viirs.py
+++ b/run/run_viirs.py
@@ -6,21 +6,28 @@ import time
 
 if __name__ == "__main__":
     # Change these variables
     # 00:30 to 02:36 from 2024-01-01 to 2024-01-03
-    start_date = "202401010000"
-    end_date = "202401012359"
+    start_date = "202401020000"
+    end_date = "202401042359"
 
-    outdir="/home/cgile/Documents/mapies/figures/averages/ncview"
+    outdir="/home/cgile/Documents/mapies/figures/averages/10062025"
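+    # outdir/dest receives both the averaged NetCDF files written by the new
+    # helpers in func_tools.py (rotated_<date>.nc / regular_<date>.nc) and
+    # the figures saved by the plotting calls below.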
     indir="/home/cgile/Documents/mapies/VIIRS"
 
     start_time = time.time()
 
 
-    c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="regular")
+    c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="rotated")
     c.gather_nc_files()
 
-    c.process_avg_data(frequency_avg=None, min_num_obs = 20, batch_size = 100, save=True, apply_qa = True, geospatial_crop = None)
-    # c.plot_2D_observations(months=[0], filename="ncview_trial.png", outdir=outdir)
+    # geospatial_crop = [[-51,75], [-35,45]]
+    geospatial_crop = None
+    frequency_avg = None   # e.g. "D" for daily frequency averaging
+    min_num_obs = 0
+
+    c.process_avg_data(frequency_avg=frequency_avg, min_num_obs = min_num_obs, batch_size = 100, save=True, apply_qa = False, geospatial_crop = geospatial_crop)
+    c.plot_2D_observations(months=[0], filename="plot_10062025.png", outdir=outdir)
+    # c.plot_2D_observations(months=[1], outdir=outdir)
+
     # c.plot_2D_num_obs(months=[0], outdir=outdir)
     # c.process_lazy_data(apply_qa=True, save=True)
-- 
GitLab
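
A minimal sketch (not part of either patch) of how the per-date files written
by the new to_netCDF_gridded_rotated() helper can be inspected; the filename
below is hypothetical, only the f"rotated_{date}.nc" naming pattern comes from
func_tools.py. xarray decodes the -9999.0 _FillValue to NaN on load.

    import xarray as xr
    import matplotlib.pyplot as plt

    # Hypothetical output file for the 2024-01-02 00:00 date slice
    ds = xr.open_dataset("rotated_202401020000.nc")

    # aod550 is stored on (time, rlat, rlon); the 2-D lat/lon variables and
    # the CF rotated_pole grid mapping georeference it for tools like ncview.
    aod = ds["aod550"].isel(time=0)
    plt.pcolormesh(ds["lon"], ds["lat"], aod, shading="auto")
    plt.colorbar(label="AOD at 550 nm")
    plt.show()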