diff --git a/docs/project-management/gantt.py b/docs/project-management/gantt.py index 8ecb57677d2efff7e40b98127a227ca6259d5e0b..eb0faa43349deaa8c9bfbf47e4c2eacf3df08c0d 100644 --- a/docs/project-management/gantt.py +++ b/docs/project-management/gantt.py @@ -4,17 +4,22 @@ import plotly.figure_factory as ff df = [ dict(Task="Viirs base processor", Start='2024-08-01', Finish='2024-09-30', Resource='Complete'), - dict(Task="Add pytests for already created functions (3-9)", Start='2024-08-01', Finish='2024-12-31', Resource='Incomplete'), - dict(Task="Tropomi base processor", Start='2024-08-01', Finish='2024-11-28', Resource='Incomplete'), - dict(Task="Tropomi to_plumes functionality (11)", Start='2024-12-01', Finish='2024-12-31', Resource='Not Started'), - dict(Task="Time evaluation statistics (18)", Start='2024-11-01', Finish='2024-11-28', Resource='Incomplete'), - dict(Task="Test DA implementation for VIIRS", Start='2024-11-11', Finish='2024-11-28', Resource='Not Started'), - dict(Task="Cloudsat processor", Start='2024-11-01', Finish='2024-11-28', Resource='Incomplete'), - dict(Task="Creating Grid Config for rotated and regular global grids (14)", Start='2024-11-18', Finish='2024-11-28', Resource='Not Started'), - dict(Task="Add original indices of the data to the outputted netcdf (13)", Start='2025-01-01', Finish='2025-01-14', Resource='Not Started'), - dict(Task="Add history to the (10)", Start='2025-01-01', Finish='2025-01-14', Resource='Not Started'), - dict(Task="Change variable names to IODA variables (17)", Start='2024-12-01', Finish='2025-01-14', Resource='Not Started'), - dict(Task="Update grid representations to include Vertical profiling (20)", Start='2024-12-01', Finish='2025-01-14', Resource='Not Started') + dict(Task="Add pytests for already created functions (3-9)", Start='2024-08-01', Finish='2024-12-31', Resource='Complete'), + dict(Task="Tropomi base processor", Start='2024-08-01', Finish='2025-02-03', Resource='Incomplete'), + dict(Task="Tropomi 
to_plumes functionality (11)", Start='2025-02-03', Finish='2025-03-31', Resource='Not Started'), + dict(Task="Time evaluation statistics (18)", Start='2024-11-01', Finish='2024-11-28', Resource='Complete'), + dict(Task="Test DA implementation for VIIRS", Start='2025-02-03', Finish='2025-02-28', Resource='Not Started'), + dict(Task="Convert Grid representation to numpy from geopandas", Start='2024-12-01', Finish='2024-12-31', Resource="Complete"), + dict(Task="Creating Grid Config for rotated and regular global grids (14)", Start='2024-11-18', Finish='2025-02-03', Resource='Incomplete'), + dict(Task="Add original indices of the data to the outputted netcdf (13)", Start='2025-02-03', Finish='2025-01-14', Resource='Not Started'), + dict(Task="Add history of processes to the netcdf (10)", Start='2025-01-01', Finish='2025-02-03', Resource='Not Started'), + dict(Task="Update grid representations to include Vertical profiling (20)", Start='2025-03-01', Finish='2025-03-31', Resource='Not Started'), + dict(Task="Test Viirs data with Providentia Interpolation", Start="2025-03-01", Finish="2025-03-31", Resource="Not Started"), + dict(Task="Produce Number of counts plots (29)", Start="2025-01-13", Finish="2025-01-20", Resource="Complete"), + dict(Task="QA flags (23)", Start="2025-01-13", Finish="2025-02-03", Resource="Incomplete"), + dict(Task="Finalise netcdf storage output (30)", Start="2025-01-20", Finish="2025-02-03", Resource="Not Started"), + dict(Task="Create conda environment", Start="2025-01-01", Finish="2025-01-14", Resource="Complete"), + dict(Task="Create Logging file output", Start="2025-01-01", Finish="2025-02-03", Resource="Incomplete"), ] diff --git a/environment.yml b/environment.yml index de9580fdff3a404f7c71db0a1bc81709ae69ecb7..10a8e981b032117949ae732ed1b5c67723292f71 100644 --- a/environment.yml +++ b/environment.yml @@ -20,6 +20,7 @@ dependencies: - xarray - pytest - pyYAML + - h5netcdf - pip variables: diff --git a/mapies/config/satellite_config.yaml 
b/mapies/config/satellite_config.yaml index fed6c4fc0fd0c8a8b69c9300a276c3576605e4ad..44c2b8a00eef468ea25fd0b201ba4f6230ab77d3 100644 --- a/mapies/config/satellite_config.yaml +++ b/mapies/config/satellite_config.yaml @@ -11,13 +11,34 @@ viirs: obsid: 34 uncertainty_constants: [0.2, 0.05, 0.02] grid_repr: "rotated" + qa: + qa_variables: + qa_land: + qa_name: "Aerosol_Optical_Thickness_QA_Flag_Land" #name of flag + qa_values: [2,3] # Values you want to keep + qa_combined: False #This makes sure that & operator is used in the python script, else default is or + qa_ocean: + qa_name: "Aerosol_Optical_Thickness_QA_Flag_Ocean" + qa_values: [3] + qa_combined: False + qa_type: + qa_name: "Aerosol_Type_Land_Ocean" + qa_values: [0] + qa_combined: True + tropomi: variables: time_variable: "delta_time" lon_variable: "longitude" lat_variable: "latitude" - obs_variable: "nitrogendioxide_tropospheric_column" + obs_variable: "nitrogendioxide_tropospheric_column" + qa: + qa_variables: + qa_value: + qa_name: "qa_value" + qa_values: [0.5] + qa_combined: True diff --git a/mapies/grids/monarch.py b/mapies/grids/monarch.py index 61f0cac1c9faf55d20bfe25423b63615063deda7..6929811a93ca06676f1e744f04d58f2752d52e7c 100644 --- a/mapies/grids/monarch.py +++ b/mapies/grids/monarch.py @@ -195,16 +195,6 @@ class RegularGrid(Grid): if aggregated_obs_err is not None: aggregated_obs_err[non_zero_counts] /= count_obs[non_zero_counts] - - # # Mask for zero counts - # aggregated_obs = aggregated_obs[non_zero_counts] - # aggregated_lon = aggregated_lon[non_zero_counts] - # aggregated_lat = aggregated_lat[non_zero_counts] - # count_obs = count_obs[non_zero_counts] - - # if aggregated_obs_err is not None: - # aggregated_obs_err = aggregated_obs_err[non_zero_counts] - # TODO: Add a function that takes the center or aggreagated value # Return the results @@ -308,19 +298,6 @@ class RotatedGrid(Grid): if aggregated_obs_err is not None: aggregated_obs_err[non_zero_counts] /= count_obs[non_zero_counts] - - # # 
Mask for zero counts - # aggregated_obs = aggregated_obs[non_zero_counts] - # aggregated_lon = aggregated_lon[non_zero_counts] - # aggregated_lat = aggregated_lat[non_zero_counts] - # aggregated_rlon = aggregated_rlon[non_zero_counts] - # aggregated_rlat = aggregated_rlat[non_zero_counts] - # count_obs = count_obs[non_zero_counts] - - # if aggregated_obs_err is not None: - # aggregated_obs_err = aggregated_obs_err[non_zero_counts] - - # Return the results lon_out = aggregated_lon.flatten() lat_out = aggregated_lat.flatten() diff --git a/mapies/mapies.py b/mapies/mapies.py index 9bde90ce5ef25d82eb8dac994a0c64e8c01b4a3f..17d9bac5a5e80acd002c62a406090c32c17b73aa 100644 --- a/mapies/mapies.py +++ b/mapies/mapies.py @@ -235,8 +235,33 @@ class MAPIES: # return reindexed values return independent_var_values - - + def quality_assurance(self): + """ + Applying quality assurance flags to mapies data + """ + mask = np.zeros_like(self.obs, dtype=bool) + for i, (flag, info) in enumerate(self.qualityFlags.items()): + + flag_mask = np.zeros_like(self.obs, dtype=bool) + arr = self.ds[info["qa_name"]].values.flatten() + arr = self.reindex(self.time_values_index, arr) + + + for value in info["qa_values"]: + flag_mask |= (arr == value) + if i == 0: + mask = flag_mask + else: + if info["qa_combined"]: + mask &= flag_mask + else: + mask |= flag_mask + + self.obs = self.obs[mask] + self.lon_values = self.lon_values[mask] + self.lat_values = self.lat_values[mask] + + @staticmethod def to_xarray(coords:dict, data_vars:dict, **kwargs): """ @@ -308,7 +333,14 @@ class MAPIES: print(f"Saved plot: {filepath}") @timeit - def plot_2D_num_obs_custom(self, lon_values=None, lat_values=None, num_obs=None, outdir="./", title=None, filename=None,): + def plot_2D_num_obs_custom( + self, lon_values=None, + lat_values=None, + num_obs=None, + outdir="./", + title=None, + filename=None, + ): """ General method for plotting the number of observations contributing to the final averages. 
""" @@ -352,7 +384,6 @@ class MAPIES: plt.close(fig) print(f"Saved plot: {filepath}") - @timeit def plot_2D_num_obs_custom_log_option( self, lon_values=None, lat_values=None, num_obs=None, outdir="./", diff --git a/mapies/tests/test_func_tools.py b/mapies/tests/test_func_tools.py index 23f8731604439eedee7bcbee5ecce1fedeb25802..52afee646630899855d3b23b4797b2eb870492a7 100644 --- a/mapies/tests/test_func_tools.py +++ b/mapies/tests/test_func_tools.py @@ -2,8 +2,9 @@ import pytest import pandas as pd +import xarray as xr import numpy as np -from mapies.util.func_tools import time_converter, time_domain_selection, geo_to_rot +from mapies.util.func_tools import time_converter, time_domain_selection, geo_to_rot, quality_assurance @@ -35,6 +36,28 @@ def test_time_converter_error(): def test_geo_to_rot(test_input, expected): assert (round(geo_to_rot(*test_input)[0], 8), round(geo_to_rot(*test_input)[1], 8)) == expected +ds = xr.Dataset( + data_vars={"flag1":np.array([0, 0, 0, 1]), "flag2":np.array([1, 0, 1, 1])} +) +obs = np.array([1, 2, 3, 4]) +lon = np.array([1, 2, 3, 4]) +lat = np.array([1, 2, 3, 4]) +qualityFlags1 = {"qa_flag1":{"qa_name":"flag1", "qa_values":[1], "qa_combined":False}, "qa_flag2":{"qa_name":"flag2", "qa_values":[1], "qa_combined":False}} +qualityFlags2 = {"qa_flag1":{"qa_name":"flag1", "qa_values":[1], "qa_combined":True}, "qa_flag2":{"qa_name":"flag2", "qa_values":[1], "qa_combined":True}} +qualityFlags3 = {"qa_flag1":{"qa_name":"flag1", "qa_values":[1], "qa_combined":True}, "qa_flag2":{"qa_name":"flag2", "qa_values":[1], "qa_combined":False}} + +@pytest.mark.parametrize( + "test_input,expected", + [ + ((ds, obs, lat, lon, qualityFlags1), (np.array([1, 3, 4]), np.array([1, 3, 4]), np.array([1, 3, 4]))), + ((ds, obs, lat, lon, qualityFlags2), (np.array([4]), np.array([4]), np.array([4]))), + ((ds, obs, lat, lon, qualityFlags3), (np.array([1, 3, 4]), np.array([1, 3, 4]), np.array([1, 3, 4]))), + ] +) +def test_quality_assurance(test_input, expected): + 
assert np.array_equal(quality_assurance(*test_input)[0], expected[0]) +    assert np.array_equal(quality_assurance(*test_input)[1], expected[1]) +    assert np.array_equal(quality_assurance(*test_input)[2], expected[2]) """ @pytest.mark.parametrize( "test_input,expected", diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index e7e905ded4bdd074b1b67da4405454de1e8b3f67..0722ec2e5d6fe67df05ff0a5108e4e06a431d12b 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -30,7 +30,7 @@ def get_filepaths(pattern): """ Get file paths from a pattern """ - return glob(pattern) + return glob(pattern, recursive=True) # Function from Guillaume's script could be useful # Only supported in python3.10 @@ -81,7 +81,33 @@ def time_converter(date:str) -> pd.Timestamp: return date - +def quality_assurance(ds, obs, lat, lon, qualityFlags): + """ + Applying quality assurance flags to mapies data + """ + mask = np.zeros_like(obs, dtype=bool) + + for i, (flag, info) in enumerate(qualityFlags.items()): + + flag_mask = np.zeros_like(obs, dtype=bool) + arr = ds[info["qa_name"]].values.flatten() + for value in info["qa_values"]: + flag_mask |= (arr == value) + + if i == 0: + mask = flag_mask + else: + if info["qa_combined"]: + mask &= flag_mask + else: + mask |= flag_mask + + obs = obs[mask] + lat = lat[mask] + lon = lon[mask] + + + return obs, lat, lon # Time Domain Selection function based off of xarray Will this work with dask too, a question for later def time_domain_selection( diff --git a/mapies/viirs.py b/mapies/viirs.py index 16c2f2bdbafa1312846bbfe915e8be5ba02d8a17..88cc64ec038be9fb4bec03b80ddaaf13a78bea9d 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -53,29 +53,28 @@ def rotate_function(r, lon, lat, obs): else: raise ValueError("Invalid grid representation") -def process_single_file(r, file, obs_var, lat_var, lon_var): + +def process_single_file(r, file, obs_var, lat_var, lon_var, apply_qa, qualityFlags): """ Process a single file and return aggregated 
results. """ - try: - # Attempt to open the dataset - ds = xr.open_dataset(file, engine="h5netcdf") # Open the file - obs = ds[obs_var].values.flatten() - lat = ds[lat_var].values.flatten() - lon = ds[lon_var].values.flatten() + ds = xr.open_dataset(file, engine="h5netcdf") # Open the file + obs = ds[obs_var].values.flatten() + lat = ds[lat_var].values.flatten() + lon = ds[lon_var].values.flatten() + + if apply_qa: + obs, lat, lon = quality_assurance(ds, obs, lat, lon, qualityFlags) + + # Rotate and aggregate + lon_agg, lat_agg, obs_agg, count_obs = rotate_function(r, lon, lat, obs) - # Rotate and aggregate - lon_agg, lat_agg, obs_agg, count_obs = rotate_function(r, lon, lat, obs) - ds.close() # Close the file to free memory - return obs_agg, lat_agg, lon_agg, count_obs + ds.close() # Close the file to free memory + return obs_agg, lat_agg, lon_agg, count_obs - except Exception as e: - # Log the error and skip the file - print(f"Error processing file {file}: {e}") - return None # Indicate that this file was not processed -def process_batch(r, batch_files, obs_var, lat_var, lon_var, chunk_size): +def process_batch(r, batch_files, obs_var, lat_var, lon_var, chunk_size, qualityFlags): """ Process a batch of files to calculate cumulative sums and counts. 
@@ -139,20 +138,10 @@ class VIIRS(MAPIES): self.dest = kwargs.get("dest") self.indir = kwargs.get("indir") - # ============================================================================= - # CARLOTTA ADDED THIS - # ============================================================================= - # self.year = kwargs.get("year", 2024) # Default to 2024 - # self.months = kwargs.get("months", list(range(1, 13))) # Default to all months (1-12) - # self.monthly_files = {} # Dictionary to store files by month - self.year = int(start_date[:4]) self.start_day = int(start_date[6:8]) self.end_day = int(end_date[6:8]) self.monthly_dict = {} - # ============================================================================= - # - # ============================================================================= self.dates_slice = pd.date_range( self.start_date, @@ -219,6 +208,16 @@ class VIIRS(MAPIES): else: self.grid_repr = da_dict["grid_repr"] #str + qa_dict = config[self.datatype]["qa"] + #Check if the user wants qa flags + apply_qa = kwargs.get("apply_qa") + + if apply_qa is None: + self.apply_qa = True + else: + self.apply_qa = apply_qa + self.qualityFlags = qa_dict["qa_variables"] + @timeit @@ -229,6 +228,7 @@ class VIIRS(MAPIES): print(f'Inside preprocess_vars') super().preprocess_vars() + # Lon and lat values lon_dims = self.ds[self.lon_var].dims lon_shape = self.ds[self.lon_var].shape @@ -266,6 +266,9 @@ class VIIRS(MAPIES): self.obs, ) + # Apply QA + self.quality_assurance() + @timeit def rotate(self): @@ -273,11 +276,11 @@ class VIIRS(MAPIES): Perform Rotation of Grid representation """ # Calculate the grid representation - r = RotatedGrid(dlon=.1, dlat=.1, west=-51, south=-35, centre_lon=20, centre_lat=35) - #r = RegularGrid(dlon=.1, dlat=.1, west=-180, south=-90) + #r = RotatedGrid(dlon=.1, dlat=.1, west=-51, south=-35, centre_lon=20, centre_lat=35) + r = RegularGrid(dlon=.1, dlat=.1, west=-180, south=-90) # Aggregate the the observations to the grid representation - 
lon, lat, rlon, rlat, obs, count_obs = r.aggregate(self.lon_values, self.lat_values, self.obs) - #lon, lat, obs, count_obs = r.aggregate(self.lon_values, self.lat_values, self.obs) + #lon, lat, rlon, rlat, obs, count_obs = r.aggregate(self.lon_values, self.lat_values, self.obs) + lon, lat, obs, count_obs = r.aggregate(self.lon_values, self.lat_values, self.obs) self.lon_values = lon self.lat_values = lat self.obs = obs @@ -293,8 +296,7 @@ class VIIRS(MAPIES): for date in self.dates_slice: # Convert to Julian day date = datetime.strptime(date, '%Y%m%d%H%M').strftime('%Y%j') - # filepaths = f'{self.indir}/{date[0:4]}/{date[4:]}/AERDB_L2_VIIRS_NOAA20.A{date}*' - filepaths = f'{self.indir}/2024/**/AERDB_L2_VIIRS_NOAA20.A{date}*' + filepaths = f'{self.indir}/**/AERDB_L2_VIIRS_NOAA20.A{date}*' file_patterns.append(filepaths) # Add pattern to the set files = sorted(get_file_list(file_patterns)) @@ -425,15 +427,9 @@ class VIIRS(MAPIES): super().plot_2D_obs_custom() -# ============================================================================= -# MY NEW METHODS -# ============================================================================= @timeit def plot_2D_observations(self, months=None, outdir=None): - # if isinstance(months, int): - # months = [months] - # if 0 in months: if months == [0]: try: @@ -529,6 +525,7 @@ class VIIRS(MAPIES): filename=filename, ) + def process_data(self, monthly_avg=True, batch_size=100): """ @@ -684,7 +681,9 @@ class VIIRS(MAPIES): Returns: - lon_final, lat_final, obs_final: Aggregated longitude, latitude, and observation arrays. 
""" - r = RotatedGrid(centre_lon=20, centre_lat=35, dlon=.1, dlat=.1, west=-51, south=-35) + #r = RotatedGrid(centre_lon=20, centre_lat=35, dlon=.1, dlat=.1, west=-51, south=-35) + r = RegularGrid(dlon=.1, dlat=.1, west=-180, south=-90) + month_files = self.monthly_dict[month]["files"] print(f"Processing {len(month_files)} files for month {month}") @@ -704,7 +703,7 @@ class VIIRS(MAPIES): print(f"Processing a batch of {len(batch)} files...") # Use multiprocessing to process files within the batch - args = [(r, file, self.obs_var, self.lat_var, self.lon_var) for file in batch] + args = [(r, file, self.obs_var, self.lat_var, self.lon_var, self.apply_qa, self.qualityFlags) for file in batch] with multiprocessing.Pool(processes=8) as pool: # Adjust number of processes as needed results = pool.starmap(process_single_file, args) @@ -750,5 +749,4 @@ class VIIRS(MAPIES): print(f"Completed processing for month {month}.") - return final_lon, final_lat, final_obs, cumulative_count - + return final_lon, final_lat, final_obs, cumulative_count \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 940b48d0705b6be41fe68d404d23914e9f34d403..aadd1aa32fba3f5bc38e4b0d5aff0105840edd48 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ - attrs==24.2.0 Cartopy==0.22.0 certifi==2024.8.30 diff --git a/run/run_viirs.py b/run/run_viirs.py index 09050fc90270963aef9f1b49614be26408620362..3b3d13ba566997df624fe667f6ad9085ce877224 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -5,39 +5,15 @@ import time if __name__ == "__main__": # Change these variables - # 00:30 to 02:36 from 2024-01-01 to 2024-01-03 - # start_date = "202401010030" - # end_date = "202401030236" + start_date = "202401010000" + end_date = "202401020000" + outdir="/esarchive/scratch/cmeikle/" + indir="/esarchive/obs/nasa/viirs_noaa20_aerdb_l2/original_files/VIIRS" + c = VIIRS(start_date, end_date, frequency="D", dest=outdir, indir=indir, apply_qa=True) - # 00:30 to 03:36 of 
2024-01-01 - # start_date = "202401010030" - # end_date = "202401010336" - - # ONE YEAR - start_date = "202401300000" - # end_date = "202412312359" - end_date = "202402022359" - - # ONE MONTH - JANUARY 2024 - # start_date = "202407010000" - # end_date = "202407312359" - - # outdir="/home/cmeikle/Projects/data/" - # indir="/home/cmeikle/Projects/data/VIIRS/original_files/AERDB_L2_VIIRS_NOAA20" - outdir = "/home/cgile/Documents/mapies/mapies/figures" - # indir = "/home/cgile/bscearth000/esarchive/obs/nasa/viirs_noaa20_aerdb_l2/original_files/VIIRS" - indir = "/home/cgile/Documents/mapies/VIIRS" - - start_time = time.time() - - c = VIIRS(start_date, end_date, frequency="D", dest=outdir, indir=indir) + # Monthly average plot (in 'month' variables specify the month) c.read_nc() - c.process_data(monthly_avg=True, batch_size = 100) - c.yearly_average() - c.plot_2D_observations(months=None, outdir=outdir) - # c.plot_2D_num_obs(months=None, outdir=outdir) - end_time = time.time() - elapsed_time = end_time - start_time - # print(f"Script executed in {elapsed_time:.2f} seconds.") - - + c.process_data(monthly_avg=False, batch_size = 20) + #c.plot_2D_obs() + c.plot_2D_observations(months=[0], outdir=outdir) + #c.plot_2D_num_obs(months=[0], outdir=outdir) \ No newline at end of file