diff --git a/.gitignore b/.gitignore index 7cef2fe677cbb692ccdc3f80e41f172c660f883c..6d5bef66aa441e8f39aa29b84e3a3682fdefba9c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ venv out-logs logs/mapies.log +mapies/logs figures mapies-env outputs diff --git a/CHANGELOG b/CHANGELOG index 07782405075826eaf7ce3fb0b5138670081b12ce..3006f6c730a14b317631a3fc9303ab243f580d35 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,16 @@ CHANGELOG -## 0.1.2 -2025/03/07 - BUG time in netcdf outputs +## 0.1.3 - 2025/03/07 - Logging implemented + +LOGS + +### Features +- Using loguru, we have updated the logging so that the output of mapies is cleaner +- In the terminal, only INFO-level logs are shown +- In logs/mapies.log, all log levels are shown +- Using logger.catch has also allowed us to remove a lot of the try/except blocks + +## 0.1.2 - 2025/03/07 - BUG time in netcdf outputs BUG FIX diff --git a/environment.yml b/environment.yml index 10a8e981b032117949ae732ed1b5c67723292f71..0bd6897f1e950f46c9d3dafe7836c9039d120680 100644 --- a/environment.yml +++ b/environment.yml @@ -1,6 +1,6 @@ --- -name: mapies +name: mapies-v0.1.1 channels: - conda-forge @@ -21,6 +21,7 @@ dependencies: - pytest - pyYAML - h5netcdf + - loguru - pip variables: diff --git a/mapies/config/logging_config.yaml b/mapies/config/logging_config.yaml deleted file mode 100644 index dffcdd1ff4d8eecc3895d0d237b9a2c9faf5d6be..0000000000000000000000000000000000000000 --- a/mapies/config/logging_config.yaml +++ /dev/null @@ -1,41 +0,0 @@ -version: 1 -disable_existing_loggers: False - -formatters: - simple: - format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - datefmt: '%Y-%m-%d %H:%M:%S' - -handlers: - console: - class: logging.StreamHandler - level: DEBUG - formatter: simple - stream: ext://sys.stdout - - file: - class: logging.FileHandler - level: INFO - formatter: simple - filename: mapies.log - mode: a - -loggers: - development: - level: DEBUG - handlers: [console] - propagate: no - - staging: - level: INFO - handlers: [console, file] - propagate: no - - production: - level: WARNING - handlers: [file] - propagate: no - -root: - level: DEBUG - handlers: [console] \ No newline at end of file diff --git a/mapies/grids/base_grid.py b/mapies/grids/base_grid.py index 31ef319fc9a1f245db8541d5db3234ca479e1493..c1ffdeec793cb25a66134353f69fadd0ee3c9b42 100644 --- a/mapies/grids/base_grid.py +++ b/mapies/grids/base_grid.py @@ -10,9 +10,6 @@ from shapely.geometry import Polygon, Point from mapies.util.func_tools import * import logging -#logger = logging.getLogger(__name__) -#logging.basicConfig(filename='./logs/mapies.log', level=logging.INFO) - diff --git a/mapies/mapies.py b/mapies/mapies.py index 222d6095106aa5c6832c03498e29ee3220c7a27b..bc412f73f1fa7205208c0738e5aa0969bfb7c36a 100644 --- a/mapies/mapies.py +++ b/mapies/mapies.py @@ -7,13 +7,13 @@ from mapies.util.func_tools import * from mapies.grids.rotated import RotatedGrid, IrregularRotatedGrid, CAMSGrid, BDRCGrid from mapies.grids.regular import RegularGrid import time -import logging import numpy as np import pandas as pd import xarray as xr import matplotlib.pyplot as plt import cartopy.crs as ccrs import os +import sys import yaml import json import matplotlib.cm as cm @@ -23,8 +23,12 @@ import matplotlib.colors as mcolors from mapies.util.variables_names import VARS_ATTRIBUTES from numpy.typing import DTypeLike, NDArray -#logger = logging.getLogger(__name__) -#logging.basicConfig(filename='../logs/mapies.log', level=logging.DEBUG) +from loguru import logger + +logger.remove()
+logger.add(sys.stderr, level="INFO") +logger.add("../logs/mapies.log", level="DEBUG") + class MAPIES: """ @@ -52,7 +56,7 @@ class MAPIES: "global":RegularGrid } - + @logger.catch def read_config(self, **kwargs): """ Read yaml config file @@ -66,7 +70,7 @@ class MAPIES: try: self.config = yaml.safe_load(open(config_file)) except FileNotFoundError: - #logger.error("Config file passed does not exist: Using default config instead") + logger.warning("Config file passed does not exist: Using default config instead") self.config = yaml.safe_load(open(os.path.join(module_dir, "config/satellite_config.yaml"))) else: self.config = yaml.safe_load(open(os.path.join(module_dir, "config/satellite_config.yaml"))) @@ -99,7 +103,6 @@ class MAPIES: self.grid = self.grid_dict[self.grid_repr](**self.grid_config) else: self.grid = self.grid_dict[self.grid_repr]() - print(self.grid) qa_dict = self.config[self.datatype]["qa"] @@ -108,6 +111,7 @@ class MAPIES: @timeit + @logger.catch def process_avg_data(self, monthly_avg=False, batch_size=100, save=False, apply_qa=True, geospatial_crop: NDArray | None = None): """ @@ -124,65 +128,56 @@ class MAPIES: #Check if the user wants qa flags if apply_qa: + logger.info(f"Applying QA flags: {self.qualityFlags}") self.apply_qa = True else: self.apply_qa = False if geospatial_crop is not None: + logger.info(f"Applying Geospatial cropping: {geospatial_crop}") self.geospatial_crop = np.array(geospatial_crop) if monthly_avg: for month in self.monthly_dict.keys(): # Catch if they haven't selected a grid try: - print(f"Running average on {self.grid}") + logger.info(f"Running average on {self.grid}") except AttributeError: raise Exception("Please specify a grid in the initialisation of MAPIES, using grid_repr=''") - try: - # Process the data for the current month - final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp( - batch_size=batch_size, month=month - ) - - self.monthly_dict[month] = { - "lon": final_lon, - "lat": final_lat, - "obs": final_obs, - "count": cumulative_count, - } - print(f'Updated monthly dictionary for month {month}') - print(f'Lon shape: {final_lon.shape}, Lat shape: {final_lat.shape}, Obs shape: {final_obs.shape}') - if np.all(final_lon == 0) or np.all(np.isnan(final_lon)): - print(f"All longitude values for month {month} are 0 or NaN.") - if np.all(final_lat == 0) or np.all(np.isnan(final_lat)): - print(f"All latitude values for month {month} are 0 or NaN.") - if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): - print(f"All observation values for month {month} are 0 or NaN.") - except Exception as e: - print(f"Error processing data for month {month}: {e}") - continue - else: - try: + # Process the data for the current month final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp( - batch_size=batch_size, month=None + batch_size=batch_size, month=month ) - self.obs, self.lat, self.lon, self.count_obs = filter_observations(final_obs, final_lat, final_lon, cumulative_count) + self.monthly_dict[month] = { + "lon": final_lon, + "lat": final_lat, + "obs": final_obs, + "count": cumulative_count, + } + logger.debug(f'Updated monthly dictionary for month {month}') + logger.debug(f'Lon shape: {final_lon.shape}, Lat shape: {final_lat.shape}, Obs shape: {final_obs.shape}') + if np.all(final_lon == 0) or np.all(np.isnan(final_lon)): + logger.warning(f"All longitude values for month {month} are 0 or NaN.") + if np.all(final_lat == 0) or np.all(np.isnan(final_lat)): + logger.warning(f"All latitude values for month {month} are 0 
or NaN.") + if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): + logger.warning(f"All observation values for month {month} are 0 or NaN.") + + else: + final_lon, final_lat, final_obs, cumulative_count = self.process_data_mp( + batch_size=batch_size, month=None + ) + + self.obs, self.lat, self.lon, self.count_obs = filter_observations(final_obs, final_lat, final_lon, cumulative_count) - if save: - try: - mid_time_array = generate_mid_time_array(self.start_date, self.end_date, self.obs) + if save: + mid_time_array = generate_mid_time_array(self.start_date, self.end_date, self.obs) - self.to_netCDF(self.obs, self.lat, self.lon, mid_time_array, self.start_date, self.end_date) - print(f'Saved processed data to netCDF') - except Exception as e: - print(f"Error saving data into netCDF: {e}") - return + self.to_netCDF(self.obs, self.lat, self.lon, mid_time_array, self.start_date, self.end_date) + logger.debug(f'Saved processed data to netCDF') - except Exception as e: - print(f"Error processing data for the time period from {self.start_date} to {self.end_date}: {e}") - return def process_data_mp(self, batch_size, month=None): @@ -192,10 +187,10 @@ class MAPIES: flag = False if month is None: month_files = self.files - print(f'Processing all files from {self.start_date} to {self.end_date}') + logger.info(f'Processing all files from {self.start_date} to {self.end_date}') else: month_files = self.monthly_dict[month]["files"] - print(f"Processing {len(month_files)} files for month {month}") + logger.info(f"Processing {len(month_files)} files for month {month}") # Split files into batches @@ -209,7 +204,7 @@ class MAPIES: failed_files = [] for batch in batches: - print(f"Processing a batch of {len(batch)} files...") + logger.debug(f"Processing a batch of {len(batch)} files...") # Use multiprocessing to process files within the batch @@ -231,29 +226,30 @@ class MAPIES: failed_files.extend(failed) if not valid_results: - print("All files in this batch failed. Skipping...") + logger.warning("All files in this batch failed. Skipping...") continue # Aggregate results incrementally cumulative_obs, cumulative_lat, cumulative_lon, cumulative_count = combine_arrays(valid_results, cumulative_obs, cumulative_lat, cumulative_lon, cumulative_count) if failed_files: - print(f"The following files failed to process: {failed_files}") + logger.warning(f"The following files failed to process: {failed_files}") # Check if any valid data was processed if cumulative_obs is None or cumulative_count is None: - print(f"No valid data processed for month {month}. Returning empty arrays.") + logger.warning(f"No valid data processed for month {month}. Returning empty arrays.") return None, None, None, None # Compute final averages final_obs, final_lat, final_lon = compute_final_averages(cumulative_obs, cumulative_lat, cumulative_lon, cumulative_count) - print(f"Completed processing for month {month}.") + logger.debug(f"Completed processing for month {month}.") return final_lon, final_lat, final_obs, cumulative_count @timeit + @logger.catch def yearly_average(self): """ Compute the yearly average for the processed data. 
@@ -265,10 +261,10 @@ class MAPIES: for key, month_data in self.monthly_dict.items(): if not all(var in month_data for var in ["lon", "lat", "obs", "count"]): - print(f"Skipping month {key}: Missing data.") + logger.warning(f"Skipping month {key}: Missing data.") continue if any(month_data[var] is None for var in ["lon", "lat", "obs", "count"]): - print(f"Skipping month {key}: No valid data.") + logger.warning(f"Skipping month {key}: No valid data.") continue lon_values = month_data["lon"] @@ -280,7 +276,7 @@ class MAPIES: cumulative_obs, cumulative_lat, cumulative_lon, cumulative_count = combine_arrays(results, cumulative_obs, cumulative_lat, cumulative_lon, cumulative_count) if cumulative_obs is None or cumulative_count is None or np.all(cumulative_count == 0): - print("No valid data for the entire year. Returning empty arrays.") + logger.warning("No valid data for the entire year. Returning empty arrays.") self.yearly_lon = None self.yearly_lat = None self.yearly_obs = None @@ -294,10 +290,11 @@ class MAPIES: self.yearly_obs = yearly_obs self.yearly_count = cumulative_count - print("Yearly average computation completed.") + logger.debug("Yearly average computation completed.") @timeit + @logger.catch def process_lazy_data(self, apply_qa=True, geospatial_crop: NDArray | None = None, save=True): """ Process the data for the specified time range. It only retrieves a daily dictionary using dask lazy loading. @@ -311,108 +308,101 @@ class MAPIES: if geospatial_crop is not None: self.geospatial_crop = np.array(geospatial_crop) for day in self.daily_dict.keys(): - try: - files = self.daily_dict[day]["files"] - - # Use multiprocessing to process files within the batch - - viirs_mp = self.mp_type( - self.grid, - self.obs_var, - self.lat_var, - self.lon_var, - self.time_var, - self.start_date, - self.end_date, - self.apply_qa, - self.qualityFlags, - files, - self.geospatial_crop, - flag, - ) - valid_results, failed_files = viirs_mp.process_batch() + files = self.daily_dict[day]["files"] - final_obs, final_lat, final_lon, final_time = append_and_concatenate(valid_results, day) - - # Skip the day if no valid data was processed - if final_obs is None: - continue + # Use multiprocessing to process files within the batch - # Check if any valid data exists - if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): - print(f"All observation values for day {day} are 0 or NaN. 
Skipping...") - continue + viirs_mp = self.mp_type( + self.grid, + self.obs_var, + self.lat_var, + self.lon_var, + self.time_var, + self.start_date, + self.end_date, + self.apply_qa, + self.qualityFlags, + files, + self.geospatial_crop, + flag, + ) + valid_results, failed_files = viirs_mp.process_batch() - # Update day dict - self.daily_dict[day]["obs"] = final_obs - self.daily_dict[day]["lon"] = final_lon - self.daily_dict[day]["lat"] = final_lat - self.daily_dict[day]["time"] = final_time + final_obs, final_lat, final_lon, final_time = append_and_concatenate(valid_results, day) - # Get day start and day end but only if they are within object start and end date - day_start, day_end = day_boundaries(day, self.start_date, self.end_date) - if save: - self.to_netCDF(final_obs, final_lat, final_lon, final_time, day_start, day_end) + # Skip the day if no valid data was processed + if final_obs is None: + continue - except Exception as e: - print(f"Error processing data for day {day}: {e}") + # Check if any valid data exists + if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): + logger.warning(f"All observation values for day {day} are 0 or NaN. Skipping...") continue + # Update day dict + self.daily_dict[day]["obs"] = final_obs + self.daily_dict[day]["lon"] = final_lon + self.daily_dict[day]["lat"] = final_lat + self.daily_dict[day]["time"] = final_time + + # Get day start and day end but only if they are within object start and end date + day_start, day_end = day_boundaries(day, self.start_date, self.end_date) + if save: + self.to_netCDF(final_obs, final_lat, final_lon, final_time, day_start, day_end) + + @logger.catch def to_netCDF(self, obs, lat, lon, time, start_time, end_time): - try: - # Convert time to seconds since TIME_ORIGIN - final_time_seconds = (time - self.time_orig) / np.timedelta64(1, "s") - final_time_seconds = np.round(final_time_seconds, 0) - - grid_representation_str = json.dumps(self.grid_config, ensure_ascii=False) - qa_flags_str = json.dumps(self.qualityFlags, ensure_ascii=False) - - - # Create xarray Dataset - ds = xr.Dataset( - coords={"time": ("time", final_time_seconds, { - "units": f"seconds since {self.time_orig}", - "calendar": "gregorian" - })}, - - data_vars={ - "lon": (["time"], lon, VARS_ATTRIBUTES['lon'].copy()), - "lat": (["time"], lat, VARS_ATTRIBUTES['lat'].copy()), - VARS_ATTRIBUTES[self.obs_var]["mapies_variable"]: (["time"], obs, VARS_ATTRIBUTES[self.obs_var].copy()), - }, - attrs={ - "title": f"{VARS_ATTRIBUTES['title']['description']} from {start_time} to {end_time}", - "institution": "Barcelona Supercomputing Center", - "grid": f"{self.grid_repr}: {grid_representation_str}", - "Developers": "MAPIES team", - "QA": f"Quality Assurance: {self.apply_qa}, quality assurance applied: {qa_flags_str}", - "history": f"Created {datetime.now()}", - }, - ) - # Define encoding for compact storage - encoding = { - "time": {"dtype": "float32"}, - "lon": {"dtype": "float32"}, - "lat": {"dtype": "float32"}, - VARS_ATTRIBUTES[self.obs_var]["mapies_variable"]: {"dtype": "float32"}, - } + # Convert time to seconds since TIME_ORIGIN + final_time_seconds = (time - self.time_orig) / np.timedelta64(1, "s") + final_time_seconds = np.round(final_time_seconds, 0) + + grid_representation_str = json.dumps(self.grid_config, ensure_ascii=False) + qa_flags_str = json.dumps(self.qualityFlags, ensure_ascii=False) + + # Create xarray Dataset + ds = xr.Dataset( + coords={"time": ("time", final_time_seconds, { + "units": "seconds since 1900-01-01 00:00:00", + "calendar": 
"gregorian" + })}, + + data_vars={ + "lon": (["time"], lon, VARS_ATTRIBUTES['lon'].copy()), + "lat": (["time"], lat, VARS_ATTRIBUTES['lat'].copy()), + VARS_ATTRIBUTES[self.obs_var]["mapies_variable"]: (["time"], obs, VARS_ATTRIBUTES[self.obs_var].copy()), + }, + attrs={ + "title": f"{VARS_ATTRIBUTES['title']['description']} from {start_time} to {end_time}", + "institution": "Barcelona Supercomputing Center", + "grid": f"{self.grid_repr}: {grid_representation_str}", + "Developers": "MAPIES team", + "QA": f"Quality Assurance: {self.apply_qa}, quality assurance applied: {qa_flags_str}", + "history": f"Created {datetime.now()}", + }, + ) - # Save to NetCDF file - filename = f"{self.dest}/Data_{self.datatype}_{start_time}_{end_time}.nc" - ds.to_netcdf(filename, mode='w', encoding=encoding) + # Define encoding for compact storage + encoding = { + "time": {"dtype": "float32"}, + "lon": {"dtype": "float32"}, + "lat": {"dtype": "float32"}, + VARS_ATTRIBUTES[self.obs_var]["mapies_variable"]: {"dtype": "float32"}, + } - print(f"Saved data to {filename}") + # Save to NetCDF file + filename = f"{self.dest}/Data_{self.datatype}_{start_time}_{end_time}.nc" + ds.to_netcdf(filename, mode='w', encoding=encoding) + + logger.info(f"Saved data to {filename}") - except Exception as e: - print(f"Error saving data into netCDF: {e}") - return # ============================================================================= # Plotting # ============================================================================= + @logger.catch def plot_2D_observations(self, months=[0], filename=None, outdir=None): if months == [0]: @@ -421,7 +411,7 @@ class MAPIES: lat_values = self.lat obs_values = self.obs except Exception as e: - print(f"Error plotting all data: {e}") + logger.error(f"Error plotting all data: {e}") return title = f"Observation 2D plot from {self.start_date} to {self.end_date}" filename = filename or f"{self.datatype}_2D_obs_from_{self.start_date}_to_{self.end_date}.png" @@ -442,7 +432,7 @@ class MAPIES: lat_values = month_data["lat"] obs_values = month_data["obs"] except Exception as e: - print(f"Error for month {m}: {e}") + logger.error(f"Error for month {m}: {e}") continue title = f"Observation 2D plot of month {m} from {self.start_date} to {self.end_date}" filename = filename or f"{self.datatype}_2D_obs_month_{m}_year_{self.year}.png" @@ -460,7 +450,7 @@ class MAPIES: lat_values = self.yearly_lat obs_values = self.yearly_obs except Exception as e: - print(f"Error for yearly data: {e}") + logger.error(f"Error for yearly data: {e}") return title = f"Observation 2D plot of yearly average of year {self.year}" filename = filename or f"{self.datatype}_2D_yearly_obs_{self.year}.png" @@ -482,7 +472,7 @@ class MAPIES: lat_values = self.lat cum_count = self.count_obs except Exception as e: - print(f"Error plotting all data: {e}") + logger.error(f"Error plotting all data: {e}") return title = f"Plot of the number of valid observations from {self.start_date} to {self.end_date}" filename = filename or f"{self.datatype}_2D_obs_count_from_{self.start_date}_to_{self.end_date}.png" @@ -503,7 +493,7 @@ class MAPIES: lat_values = month_data["lat"] cum_count = month_data["count"] except Exception as e: - print(f"Error for month {m}: {e}") + logger.error(f"Error for month {m}: {e}") continue title = f"Plot of the number of valid observations for month {m} from {self.start_date} to {self.end_date}" filename = filename or f"{self.datatype}_2D_obs_count_month_{m}_year_{self.year}.png" @@ -521,7 +511,7 @@ class MAPIES: lat_values = 
self.yearly_lat cum_count = self.yearly_count except Exception as e: - print(f"Error for yearly data: {e}") + logger.error(f"Error for yearly data: {e}") return title = f"Plot of the number of valid observations for yearly average of {self.year}" filename = filename or f"{outdir}/{self.datatype}_2D_obs_count_yearly_{self.year}.png" @@ -573,7 +563,6 @@ class MAPIES: ax.gridlines() ax.coastlines(resolution="10m") - print("Plotting observations (new method)") # Scatter plot with discrete color mapping im = ax.scatter( @@ -595,7 +584,7 @@ class MAPIES: plt.savefig(filepath, format="png") if display_fig: plt.show() - print(f"Saved plot: {filepath}") + logger.info(f"Saved plot: {filepath}") # TODO this can be moved to a plotting function python file @@ -654,7 +643,7 @@ class MAPIES: plt.savefig(filepath, format="png") if display_fig: plt.show() - print(f"Saved plot: {filepath}") + logger.info(f"Saved plot: {filepath}") diff --git a/mapies/tropomi.py b/mapies/tropomi.py index fe13be38b7b134c8d1b1d8bfb3e94980923d01d3..0f8016570b1b1582928ba3edb5d6b63483f3ea82 100644 --- a/mapies/tropomi.py +++ b/mapies/tropomi.py @@ -8,16 +8,13 @@ import numpy as np import pandas as pd import xarray as xr import os +import sys from mapies.util.func_tools import * from mapies.util.multiprocessing import TROPOMIMultiprocessing from numpy.typing import DTypeLike, NDArray os.environ["OMP_NUM_THREADS"] = "8" -#logger = logging.getLogger(__name__) -#logging.basicConfig(filename='../logs/mapies.log', level=logging.INFO) - - class TROPOMI(MAPIES): @@ -37,7 +34,6 @@ class TROPOMI(MAPIES): raise Exception("Output directory doesn't exist") self.indir = kwargs.get("indir") self.year = int(start_date[:4]) - #self.quality_flag_limit self.monthly_dict = {} self.daily_dict = {} # Dictionary to produce daily netcdf files self.datatype = "tropomi" @@ -60,12 +56,13 @@ class TROPOMI(MAPIES): @timeit + @logger.catch def gather_nc_files(self): """ Returns list of files needed to be processed by the application """ file_patterns = [] - print(f' Date slice = {self.dates_slice}') + logger.debug(f' Date slice = {self.dates_slice}') for date in self.dates_slice: # Convert to Julian day date = datetime.strptime(date, '%Y%m%d%H%M').strftime('%Y%m%d') @@ -75,7 +72,7 @@ class TROPOMI(MAPIES): file_patterns.append(filepaths) # Add pattern to the set files = sorted(get_file_list(file_patterns)) - print(f"Total number of files: {len(files)}") + logger.debug(f"Total number of files: {len(files)}") # I'm not sure we need this part @@ -93,7 +90,7 @@ class TROPOMI(MAPIES): last_idx = min(len(files) - 1, files.index(files_filtered[-1]) + 2) self.files = files[first_idx : last_idx + 1] - print(f"Total number of filtered files: {len(self.files)}") + logger.debug(f"Total number of filtered files: {len(self.files)}") for file in self.files: parts = file.split('_') @@ -121,12 +118,12 @@ class TROPOMI(MAPIES): self.monthly_dict[date.month]["files"].append(file) self.daily_dict[date.day]["files"].append(file) - print(f"Discovered files for months: {list(self.monthly_dict.keys())}") + logger.debug(f"Discovered files for months: {list(self.monthly_dict.keys())}") for month, data in self.monthly_dict.items(): - print(f" Month {month:02d}: {len(data['files'])} files") + logger.debug(f" Month {month:02d}: {len(data['files'])} files") - print(f"Discovered files for days: {list(self.daily_dict.keys())}") + logger.debug(f"Discovered files for days: {list(self.daily_dict.keys())}") # Print the daily disctionary to see if files are stored in chronological order for day, 
data in self.daily_dict.items(): - print(f"Day {day:02d}: {len(data['files'])} files") + logger.debug(f"Day {day:02d}: {len(data['files'])} files") diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index 60fd8709ff8e41184bb5b14dee26e90a9396f7ed..fb2c05b00a85ff9356c353348ec170f9ea940816 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -3,6 +3,7 @@ import logging import numpy as np import pandas as pd import xarray as xr +import sys from functools import wraps from numpy import cos, sin, arctan, pi, nan @@ -10,6 +11,8 @@ from glob import glob from typing import List from datetime import datetime import multiprocessing +from loguru import logger + TIME_ORIGIN = np.datetime64("1993-01-01T00:00:00") @@ -27,7 +30,7 @@ def timeit(func): result = func(*args, **kwargs) end_time = time.perf_counter() total_time = end_time - start_time - print(f'Function {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds') + logger.debug(f'Function {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds') return result return timeit_wrapper @@ -460,7 +463,7 @@ def append_and_concatenate(valid_results, day): daily_time.append(time_values) if not daily_obs: - print(f"No valid data for day {day}") + logger.warning(f"No valid data for day {day}") return None, None, None, None final_obs = np.concatenate(daily_obs, axis=0) diff --git a/mapies/util/multiprocessing.py b/mapies/util/multiprocessing.py index 65204c94a360614bea2c481cd9915b87e6c67a1e..034fa2be1518ed557ed6ac60e8a997b69d428d10 100644 --- a/mapies/util/multiprocessing.py +++ b/mapies/util/multiprocessing.py @@ -5,6 +5,7 @@ import xarray as xr from typing import List from abc import ABC, abstractmethod import multiprocessing +import sys from dataclasses import dataclass from numpy.typing import DTypeLike, NDArray from mapies.util.func_tools import * @@ -13,7 +14,7 @@ from mapies.grids.rotated import RotatedGrid from mapies.grids.regular import RegularGrid - +@logger.catch def process_single_file_tropomi( grid, file, @@ -31,59 +32,36 @@ def process_single_file_tropomi( """ Process a single file and return aggregated results or flattened arrays. 
""" - try: - ds = xr.open_dataset(file, engine="h5netcdf", group="PRODUCT") - except Exception as e: - print(f"Reading file {file}: {e}") - return None + ds = xr.open_dataset(file, engine="h5netcdf", group="PRODUCT") # Flatten arrays - try: - obs, lat, lon, time_values = flatten_arrays(ds, obs_var, lat_var, lon_var, time_var) - except Exception as e: - print(f"Error processing file {file}: {e}") - return None + obs, lat, lon, time_values = flatten_arrays(ds, obs_var, lat_var, lon_var, time_var) # Apply quality assurance if applicable if apply_qa: - try: - obs, lat, lon, time_values = quality_assurance(ds, obs, lat, lon, time_values, qualityFlags) - except Exception as e: - print(f"Error applying QA Flags for file {file}: {e}") - return None + obs, lat, lon, time_values = quality_assurance(ds, obs, lat, lon, time_values, qualityFlags) # Perform time domain selection beteen start and end date - try: - obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, start_date, end_date) - except Exception as e: - print(f"Error performing time domain selection for file {file}: {e}") - return None + obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, start_date, end_date) + # Perform station domain selection if specified if geospatial_crop is not None: - try: - obs, lat, lon, time_values = spatial_domain_selection(obs, lat, lon, time_values, geospatial_crop) - except Exception as e: - print(f"Error performing spatial domain selection for file {file}: {e}") - return None + obs, lat, lon, time_values = spatial_domain_selection(obs, lat, lon, time_values, geospatial_crop) if not flag: - try: - # Regrid and aggregate - if isinstance(grid, RotatedGrid): - lon_agg, lat_agg, rlon_agg, rlat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs) - elif isinstance(grid, RegularGrid): - lon_agg, lat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs) - ds.close() - return obs_agg, lat_agg, lon_agg, count_obs - except Exception as e: - print(f"Error performing regridding for file {file}: {e}") - return None + # Regrid and aggregate + if isinstance(grid, RotatedGrid): + lon_agg, lat_agg, rlon_agg, rlat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs) + elif isinstance(grid, RegularGrid): + lon_agg, lat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs) + ds.close() + return obs_agg, lat_agg, lon_agg, count_obs else: ds.close() return obs, lat, lon, time_values - +@logger.catch def process_single_file_viirs( grid, file, @@ -101,53 +79,30 @@ def process_single_file_viirs( """ Process a single file and return aggregated results or flattened arrays. 
""" - try: - ds = xr.open_dataset(file, engine="h5netcdf") - except Exception as e: - print(f"Reading file {file}: {e}") - return None + ds = xr.open_dataset(file, engine="h5netcdf") # Flatten arrays - try: - obs, lat, lon, time_values = flatten_arrays(ds, obs_var, lat_var, lon_var, time_var) - except Exception as e: - print(f"Error processing file {file}: {e}") - return None + obs, lat, lon, time_values = flatten_arrays(ds, obs_var, lat_var, lon_var, time_var) + # Apply quality assurance if applicable if apply_qa: - try: - obs, lat, lon, time_values = quality_assurance(ds, obs, lat, lon, time_values, qualityFlags) - except Exception as e: - print(f"Error applying QA Flags for file {file}: {e}") - return None + obs, lat, lon, time_values = quality_assurance(ds, obs, lat, lon, time_values, qualityFlags) # Perform time domain selection beteen start and end date - try: - obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, start_date, end_date) - except Exception as e: - print(f"Error performing time domain selection for file {file}: {e}") - return None + obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, start_date, end_date) # Perform station domain selection if specified if geospatial_crop is not None: - try: - obs, lat, lon, time_values = spatial_domain_selection(obs, lat, lon, time_values, geospatial_crop) - except Exception as e: - print(f"Error performing spatial domain selection for file {file}: {e}") - return None + obs, lat, lon, time_values = spatial_domain_selection(obs, lat, lon, time_values, geospatial_crop) if not flag: - try: - # Regrid and aggregate - if isinstance(grid, RotatedGrid): - lon_agg, lat_agg, rlon_agg, rlat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs) - elif isinstance(grid, RegularGrid): - lon_agg, lat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs) - ds.close() - return obs_agg, lat_agg, lon_agg, count_obs - except Exception as e: - print(f"Error performing regridding for file {file}: {e}") - return None + # Regrid and aggregate + if isinstance(grid, RotatedGrid): + lon_agg, lat_agg, rlon_agg, rlat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs) + elif isinstance(grid, RegularGrid): + lon_agg, lat_agg, obs_agg, count_obs = grid.aggregate(lon, lat, obs) + ds.close() + return obs_agg, lat_agg, lon_agg, count_obs else: ds.close() return obs, lat, lon, time_values @@ -214,7 +169,7 @@ class VIIRSMultiprocessing(GenericMultiProcessing): # Filter valid results and log failed files valid_results = [res for res in results if res is not None] - print(f"Valid results: {len(valid_results)}") + logger.debug(f"Valid results: {len(valid_results)}") failed_files= [file for file, res in zip(self.batch, results) if res is None] return valid_results, failed_files @@ -269,7 +224,7 @@ class TROPOMIMultiprocessing(GenericMultiProcessing): # Filter valid results and log failed files valid_results = [res for res in results if res is not None] - print(f"Valid results: {len(valid_results)}") + logger.debug(f"Valid results: {len(valid_results)}") failed_files= [file for file, res in zip(self.batch, results) if res is None] return valid_results, failed_files diff --git a/mapies/viirs.py b/mapies/viirs.py index 17ab2c160d3d53cb6e5fd8e22c5ce4a1cc05dd29..05e033306675fd5ee3b672cab4a71eec5663aa1b 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -3,6 +3,7 @@ from datetime import datetime, timedelta import logging import os +import sys import time import dask.array as da @@ -16,13 +17,10 @@ from 
mapies.util.variables_names import VARS_ATTRIBUTES from pathlib import Path from mapies.util.multiprocessing import VIIRSMultiprocessing +from loguru import logger os.environ["OMP_NUM_THREADS"] = "8" -#logger = logging.getLogger(__name__) -#logging.basicConfig(filename='../logs/mapies.log', level=logging.INFO) - - class VIIRS(MAPIES): """ @@ -64,6 +62,7 @@ class VIIRS(MAPIES): @timeit + @logger.catch def read_config(self, **kwargs): """ Read yaml config file @@ -83,12 +82,13 @@ class VIIRS(MAPIES): @timeit + @logger.catch def gather_nc_files(self): """ Returns list of files needed to be processed by the application """ file_patterns = [] - print(f' Date slice = {self.dates_slice}') + logger.debug(f' Date slice = {self.dates_slice}') if self.indir: for date in self.dates_slice: date = datetime.strptime(date, '%Y%m%d%H%M').strftime('%Y%j') @@ -97,7 +97,7 @@ class VIIRS(MAPIES): file_patterns.append(filepaths) files = sorted(get_file_list(file_patterns)) - print(f"Total number of files: {len(files)}") + logger.debug(f"Total number of files: {len(files)}") else: raise Exception("Please specify an input directory") @@ -107,7 +107,7 @@ class VIIRS(MAPIES): last_idx = min(len(files) - 1, files.index(files_filtered[-1]) + 2) self.files = files[first_idx : last_idx + 1] - print(f"Total number of filtered files: {len(self.files)}") + logger.debug(f"Total number of filtered files: {len(self.files)}") for file in self.files: parts = file.split('.') @@ -133,20 +133,21 @@ class VIIRS(MAPIES): self.monthly_dict[date.month]["files"].append(file) self.daily_dict[date.day]["files"].append(file) - print(f"Discovered files for months: {list(self.monthly_dict.keys())}") + logger.debug(f"Discovered files for months: {list(self.monthly_dict.keys())}") for month, data in self.monthly_dict.items(): - print(f" Month {month:02d}: {len(data['files'])} files") + logger.debug(f" Month {month:02d}: {len(data['files'])} files") - print(f"Discovered files for days: {list(self.daily_dict.keys())}") + logger.debug(f"Discovered files for days: {list(self.daily_dict.keys())}") for day, data in self.daily_dict.items(): - print(f"Day {day:02d}: {len(data['files'])} files") + logger.debug(f"Day {day:02d}: {len(data['files'])} files") # ============================================================================= # Supporting functions # ============================================================================= @timeit + @logger.catch def to_da(self, frequency="D", save_nc= True, save_figs=False): """ Function that returns all the needed variables for the DA @@ -180,7 +181,7 @@ class VIIRS(MAPIES): day_lat = self.daily_dict[day]["lat"] day_time = self.daily_dict[day]["time"] except KeyError: - print("No data for day selected") + logger.warning("No data for day selected") continue obs = np.append(obs, day_obs) lon = np.append(lon, day_lon) @@ -191,7 +192,7 @@ class VIIRS(MAPIES): try: obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, l_border, r_border) except TypeError: - print("No data, skipping") + logger.warning("No data, skipping") continue # Calculate obserr obserr = error_estimation(self.datatype, obs, self.unc_const) @@ -245,7 +246,7 @@ class VIIRS(MAPIES): ds = self.to_xarray(coords=coords, data_vars=data_vars) if save_nc: #logger.info(f"Outputting da data with {filename}") - print(f"Outputting da data with {filename}") + logger.debug(f"Outputting da data with {filename}") ds.to_netcdf(filename, encoding={}) outfiles.append(filename) diff --git a/requirements.txt b/requirements.txt index 
f46a0d4621ff18dc7cc7eab641f775cfec0830f9..613d33202e07101209fca94da3f6f0fd87d869c2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,6 +25,7 @@ iniconfig==2.0.0 Jinja2==3.1.4 kiwisolver==1.4.7 locket==1.0.0 +loguru==0.7.3 MarkupSafe==2.1.5 matplotlib==3.7.5 memory-profiler==0.61.0 diff --git a/run/run_tropomi.py b/run/run_tropomi.py index c9bdf577569d3f748331f950ab5ca15f06802a95..93f97fd2da1f734d8746055c219eb2dc6c64bf7e 100644 --- a/run/run_tropomi.py +++ b/run/run_tropomi.py @@ -12,6 +12,7 @@ if __name__ == "__main__": start_time = time.time() + c = TROPOMI(start_date, end_date, dest=outdir, indir=indir, grid_repr="global") c.gather_nc_files() c.process_avg_data(monthly_avg=False, batch_size = 14, apply_qa=True, geospatial_crop=[[-40, 60], [0, 70]], save=True) diff --git a/run/run_viirs.py b/run/run_viirs.py index fe1a2fee8b11f07fbefefc215a18db3b3a1ad8e1..fac7593188549444839f5d17653c3c77a488175d 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -19,15 +19,17 @@ if __name__ == "__main__": start_time = time.time() - c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="cams") + c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="bdrc") c.gather_nc_files() + c.process_avg_data(monthly_avg=False, batch_size = 100, apply_qa=True, save=True) #c.yearly_average() c.plot_2D_observations(months=[0], filename="new.png", outdir=outdir) c.plot_2D_num_obs(months=[0], outdir=outdir) c.process_lazy_data(apply_qa=True, save=False) + c.to_da(frequency="3H", save_figs=True) diff --git a/setup.py b/setup.py index 18fbe5e7a395289e0c9ac88198b1bdb8b2f73bd1..68f1e785346fe91df5340844dc5466716640282d 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ from setuptools import find_packages from setuptools import setup # Could update this using versioneer -version="0.1.2" +version="0.1.3" setup( name="mapies",
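
Below is a minimal, self-contained sketch of the loguru pattern this changeset adopts (sink configuration plus @logger.catch in place of try/except), based only on the behaviour described in the CHANGELOG and the hunks above. The log-file path, the process_single_file helper, and the sample file list are illustrative stand-ins, not part of the MAPIES API.

import sys
from loguru import logger

# Drop loguru's default handler so the two sinks below fully control the output.
logger.remove()

# Terminal sink: INFO and above only, keeping the console output clean.
logger.add(sys.stderr, level="INFO")

# File sink: DEBUG and above, so the log file keeps the full detail.
# (Illustrative path; the changeset points its file sink at ../logs/mapies.log.)
logger.add("logs/mapies.log", level="DEBUG")

logger.debug("written to the log file only")
logger.info("written to both the terminal and the log file")


# @logger.catch logs the full traceback at ERROR level instead of letting the
# exception propagate; by default the decorated function then returns None.
@logger.catch
def process_single_file(path):
    # Hypothetical stand-in for the per-file workers in mapies/util/multiprocessing.py.
    if path.endswith(".bad"):
        raise ValueError(f"cannot read {path}")
    return {"file": path, "obs": 42}


files = ["a.nc", "b.bad", "c.nc"]
results = [process_single_file(f) for f in files]

# A caught failure yields None, so callers can still split valid results from
# failed files, as the batch-processing code does.
valid_results = [r for r in results if r is not None]
failed_files = [f for f, r in zip(files, results) if r is None]
logger.info(f"{len(valid_results)} valid results, failed files: {failed_files}")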