From f2c24b454d4b372b3e3400a57498b0323252da93 Mon Sep 17 00:00:00 2001 From: Calum Meikle Date: Tue, 14 Jan 2025 17:33:53 +0100 Subject: [PATCH 1/8] Added quality assurance function --- mapies/config/satellite_config.yaml | 9 +++++++++ mapies/mapies.py | 21 ++++++++++++++++++++- mapies/viirs.py | 15 ++++++++++++++- run/run_viirs.py | 4 ++-- 4 files changed, 45 insertions(+), 4 deletions(-) diff --git a/mapies/config/satellite_config.yaml b/mapies/config/satellite_config.yaml index fed6c4fc0..029f568f0 100644 --- a/mapies/config/satellite_config.yaml +++ b/mapies/config/satellite_config.yaml @@ -11,6 +11,15 @@ viirs: obsid: 34 uncertainty_constants: [0.2, 0.05, 0.02] grid_repr: "rotated" + qa: + qa_variables: + qa_land: + qa_name: "Aerosol_Optical_Thickness_QA_Flag_Land" + qa_values: [2, 3] + qa_ocean: + qa_name: "Aerosol_Optical_Thickness_QA_Flag_Ocean" + qa_values: [2, 3] + tropomi: variables: diff --git a/mapies/mapies.py b/mapies/mapies.py index 05517be84..6ee107b7c 100644 --- a/mapies/mapies.py +++ b/mapies/mapies.py @@ -121,8 +121,27 @@ class MAPIES: # return reindexed values return independent_var_values + def quality_assurance(self): + """ + Applying quality assurance flags to mapies data + """ + mask = np.zeros_like(self.obs, dtype=bool) + + for flag, info in self.qualityFlags.items(): + + arr = self.ds[info["qa_name"]].values.flatten() + arr = self.reindex(self.time_values_index, arr) + + for value in info["qa_values"]: + mask |= (arr == value) + mask &= mask + + self.obs = self.obs[mask] + self.lon_values = self.lon_values[mask] + self.lat_values = self.lat_values[mask] - + + @staticmethod def to_xarray(coords:dict, data_vars:dict, **kwargs): """ diff --git a/mapies/viirs.py b/mapies/viirs.py index 00d211101..b862b5b76 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -203,6 +203,16 @@ class VIIRS(MAPIES): else: self.grid_repr = da_dict["grid_repr"] #str + qa_dict = config[self.datatype]["qa"] + #Check if the user wants qa flags + apply_qa = kwargs.get("apply_qa") + + if apply_qa is None: + self.apply_qa = True + else: + self.apply_qa = apply_qa + self.qualityFlags = qa_dict["qa_variables"] + @timeit @@ -212,6 +222,7 @@ class VIIRS(MAPIES): """ super().preprocess_vars() + # Lon and lat values lon_dims = self.ds[self.lon_var].dims lon_shape = self.ds[self.lon_var].shape @@ -248,6 +259,9 @@ class VIIRS(MAPIES): self.obs, ) + # Apply QA + self.quality_assurance() + @timeit def rotate(self): @@ -277,7 +291,6 @@ class VIIRS(MAPIES): for date in self.dates_slice: date = datetime.strptime(date, '%Y%m%d%H%M').strftime('%Y%j') filepaths = f'{self.indir}/{date[0:4]}/{date[4:]}/AERDB_L2_VIIRS_NOAA20.A{date}*' - print(filepaths) file_patterns.append(filepaths) # TODO: maybe change this to NETCDF4 diff --git a/run/run_viirs.py b/run/run_viirs.py index ec6858d9e..4bc456923 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -6,8 +6,8 @@ if __name__ == "__main__": # Change these variables start_date = "202404230730" end_date = "202404231830" - outdir="/home/cmeikle/Projects/data/" - indir="/home/cmeikle/Projects/data/VIIRS/original_files/AERDB_L2_VIIRS_NOAA20" + outdir="/esarchive/scratch/cmeikle/" + indir="/esarchive/obs/nasa/viirs_noaa20_aerdb_l2_nrt//original_files/VIIRS/" c = VIIRS(start_date, end_date, frequency="3H", dest=outdir, indir=indir) c.read_nc() c.preprocess_vars() -- GitLab From 33b4a562b5229042f33d33e47fd9d906d4b019eb Mon Sep 17 00:00:00 2001 From: Calum Meikle Date: Sat, 18 Jan 2025 20:12:06 +0100 Subject: [PATCH 2/8] Updated with some of processes --- mapies/grids/monarch.py | 12 +-- mapies/mapies.py | 6 +- mapies/util/func_tools.py | 19 ++++ mapies/viirs.py | 195 ++++++++++++++++++++++++++++++++++---- run/run_viirs.py | 18 ++-- 5 files changed, 212 insertions(+), 38 deletions(-) diff --git a/mapies/grids/monarch.py b/mapies/grids/monarch.py index 1a4e2b034..593673180 100644 --- a/mapies/grids/monarch.py +++ b/mapies/grids/monarch.py @@ -196,13 +196,13 @@ class RegularGrid(Grid): aggregated_obs_err[non_zero_counts] /= count_obs[non_zero_counts] # Mask for zero counts - aggregated_obs = aggregated_obs[non_zero_counts] - aggregated_lon = aggregated_lon[non_zero_counts] - aggregated_lat = aggregated_lat[non_zero_counts] - count_obs = count_obs[non_zero_counts] + #aggregated_obs = aggregated_obs[non_zero_counts] + #aggregated_lon = aggregated_lon[non_zero_counts] + #aggregated_lat = aggregated_lat[non_zero_counts] + #count_obs = count_obs[non_zero_counts] - if aggregated_obs_err is not None: - aggregated_obs_err = aggregated_obs_err[non_zero_counts] + #if aggregated_obs_err is not None: + #aggregated_obs_err = aggregated_obs_err[non_zero_counts] # TODO: Add a function that takes the center or aggreagated value diff --git a/mapies/mapies.py b/mapies/mapies.py index 6ee107b7c..044368321 100644 --- a/mapies/mapies.py +++ b/mapies/mapies.py @@ -160,11 +160,11 @@ class MAPIES: """ General method for plotting observations with CB-RdYlBu colormap and arrow-like ends for out-of-bound values. """ - if lon_values == None: + if lon_values is None: lon_values = self.lon_values - if lat_values == None: + if lat_values is None: lat_values = self.lat_values - if obs_values == None: + if obs_values is None: obs_values = self.obs # Set Basemap projection diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index 7bcc9cef4..017317d78 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -74,7 +74,26 @@ def time_converter(date:str) -> pd.Timestamp: return date +def quality_assurance(ds, obs, lon , lat, qualityFlags): + """ + Applying quality assurance flags to mapies data + """ + mask = np.zeros_like(obs, dtype=bool) + for flag, info in qualityFlags.items(): + + arr = ds[info["qa_name"]].values.flatten() + #arr = self.reindex(self.time_values_index, arr) + + for value in info["qa_values"]: + mask |= (arr == value) + mask &= mask + + obs = obs[mask] + lon = lon[mask] + lat = lat[mask] + + return obs, lon, lat # Time Domain Selection function based off of xarray Will this work with dask too, a question for later def time_domain_selection( diff --git a/mapies/viirs.py b/mapies/viirs.py index b862b5b76..7c134d8ec 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -32,20 +32,22 @@ os.environ["OMP_NUM_THREADS"] = "8" def rotate_function(r, lon, lat, obs): if isinstance(r, RotatedGrid): - # Aggregate the observations to the grid representation - lon_agg, lat_agg, rlon_agg, rlat_agg, obs_agg = r.aggregate(lon, lat, obs) - return lon_agg, lat_agg, obs_agg + lon_agg, lat_agg, rlon_agg, rlat_agg, obs_agg, count_obs = r.aggregate(lon, lat, obs) + + return lon_agg, lat_agg, obs_agg, count_obs elif isinstance(r, RegularGrid): - lon_agg, lat_agg, obs_agg = r.aggregate(lon, lat, obs) - return lon_agg, lat_agg, obs_agg + lon_agg, lat_agg, obs_agg, count_obs = r.aggregate(lon, lat, obs) + + return lon_agg, lat_agg, obs_agg, count_obs else: raise ValueError("Invalid grid representation") -def process_batch(r, batch_files, obs_var, lat_var, lon_var, chunk_size): + +def process_batch(r, batch_files, obs_var, lat_var, lon_var, chunk_size, qualityFlags): """ Process a batch of files to calculate cumulative sums and counts. @@ -78,9 +80,11 @@ def process_batch(r, batch_files, obs_var, lat_var, lon_var, chunk_size): lat = ds[lat_var].values.flatten() lon = ds[lon_var].values.flatten() + quality_assurance(ds, obs, lat, lon, qualityFlags) + # Rotate and aggregate t1 = time.perf_counter() - lon_agg, lat_agg, obs_agg = rotate_function(r,lon, lat, obs) + lon_agg, lat_agg, obs_agg, count_obs = rotate_function(r,lon, lat, obs) t2 = time.perf_counter() print(f'Finished rotation in time {t2-t1}', flush=True) @@ -134,9 +138,10 @@ class VIIRS(MAPIES): self.months = list(range(self.start_month, self.end_month + 1)) # Dictionary to store files by month self.start_day = int(start_date[6:8]) self.end_day = int(end_date[6:8]) - self.monthly_files = {} # Dictionary to store files by month + self.monthly_dict = {} + # ============================================================================= + # # ============================================================================= - self.dates_slice = pd.date_range( self.start_date, @@ -286,23 +291,98 @@ class VIIRS(MAPIES): """ Read netcdf files with xarray """ - # Take in start and end date and convert to julian dates + # # Take in start and end date and convert to julian dates + # file_patterns = [] + # for date in self.dates_slice: + # date = datetime.strptime(date, '%Y%m%d%H%M').strftime('%Y%j') + # filepaths = f'{self.indir}/{date[0:4]}/{date[4:]}/AERDB_L2_VIIRS_NOAA20.A{date}*' + # print(filepaths) + # file_patterns.append(filepaths) + + # # TODO: maybe change this to NETCDF4 + # files = get_file_list(file_patterns) + # print(f'Total number of files: {len(files)}') + # for file in files: + # print(file) + + # file_patterns = set() # Use a set to avoid duplicate file patterns file_patterns = [] + print(f' Date slice = {self.dates_slice}') for date in self.dates_slice: + # Convert to Julian day date = datetime.strptime(date, '%Y%m%d%H%M').strftime('%Y%j') - filepaths = f'{self.indir}/{date[0:4]}/{date[4:]}/AERDB_L2_VIIRS_NOAA20.A{date}*' - file_patterns.append(filepaths) + # filepaths = f'{self.indir}/{date[0:4]}/{date[4:]}/AERDB_L2_VIIRS_NOAA20.A{date}*' + filepaths = f'{self.indir}/**/**/AERDB_L2_VIIRS_NOAA20.A{date}*' + file_patterns.append(filepaths) # Add pattern to the set + # print(filepaths) + + # Convert the set back to a list + # files = sorted(get_file_list(file_patterns)) + files = sorted(set(get_file_list(file_patterns))) + print(f"Total number of files: {len(files)}") + # for file in files: + # print(file) + + start_dt = datetime.strftime(self.start_date, "%Y%m%d%H%M") + end_dt = datetime.strftime(self.end_date, "%Y%m%d%H%M") + + start_dt = datetime.strptime(start_dt, "%Y%m%d%H%M") + end_dt = datetime.strptime(end_dt, "%Y%m%d%H%M") + # print(f'Start date: {start_dt}, End date: {end_dt}') + + files_filtered = [] + for file in files: + parts = file.split('.') + date = parts[1][1:] + hhmm = parts[2] + # print(f'Date: {date}, HHMM: {hhmm}') + + file_dt = datetime.strptime(date + hhmm, "%Y%j%H%M") + if start_dt <= file_dt <= end_dt: + files_filtered.append(file) + + first_idx = max(0, files.index(files_filtered[0]) - 2) + last_idx = min(len(files) - 1, files.index(files_filtered[-1]) + 2) + self.files = files[first_idx : last_idx + 1] + + print(f"Total number of filtered files: {len(self.files)}") + # print("Valid files:") + # for f in self.files: + # print(f) + + for file in self.files: + parts = file.split('.') + julian_day = parts[1][1:] + # print(f'Julian day: {julian_day}') + date = datetime.strptime(julian_day, "%Y%j") + if date.month not in self.monthly_dict: + self.monthly_dict[date.month] = { + "files": [], # List to store file paths + "lon": None, # Longitudes array + "lat": None, # Latitudes array + "obs": None, # Observations array + "count": None, # Count of observations used to compute the average + } + + # Append the file to the list of files for the corresponding month + self.monthly_dict[date.month]["files"].append(file) + + print(f"Discovered files for months: {list(self.monthly_dict.keys())}") + for month, data in self.monthly_dict.items(): + print(f" Month {month:02d}: {len(data['files'])} files") + + # print("Dictionary") + # print(self.monthly_dict) + - # TODO: maybe change this to NETCDF4 - files = get_file_list(file_patterns) + # # A bit of preprocessing to start before reading in the files + # def preprocess(ds): + # ds = ds.expand_dims(dim={"index": [ds.attrs["product_name"]]}) + # return ds - # A bit of preprocessing to start before reading in the files - def preprocess(ds): - ds = ds.expand_dims(dim={"index": [ds.attrs["product_name"]]}) - return ds + # # Open dataset with xarray and dask + # self.ds = xr.open_mfdataset(files, preprocess=preprocess) - # Open dataset with xarray and dask - self.ds = xr.open_mfdataset(files, preprocess=preprocess) @timeit def to_da(self): @@ -422,6 +502,79 @@ class VIIRS(MAPIES): filename=filename, ) + def process_data(self, monthly_avg=True, batch_size=100): + """ + Process the data for the specified year and months. + + Parameters: + - monthly_avg: bool, whether to compute single monthly averages. + - batch_size: int, the number of files to process in each batch. + - chunk_size: int, the size of chunks for processing within each batch. + + Returns: + - if monthly_avg --> lon_final, lat_final, obs_final: Aggregated longitude, latitude, and observation arrays. + - if not monthly_avg --> preprocess_vars() is called. + """ + if monthly_avg: + for month in self.monthly_dict.keys(): + # print(f'Month: {self.monthly_dict[month]}') + try: + # Process the data for the current month + final_lon, final_lat, final_obs, cumulative_count = self.process_monthly_data( + month, batch_size=batch_size + ) + # final_lon, final_lat, final_obs, cumulative_count = self.process_files_in_blocks(month, block_size=300, batch_size=batch_size) + + # Update the monthly dictionary + self.monthly_dict[month] = { + "lon": final_lon, + "lat": final_lat, + "obs": final_obs, + "count": cumulative_count, + } + print(f'Updated monthly dictionary for month {month}') + print(f'Lon shape: {final_lon.shape}, Lat shape: {final_lat.shape}, Obs shape: {final_obs.shape}') + if np.all(final_lon == 0) or np.all(np.isnan(final_lon)): + print(f"All longitude values for month {month} are 0 or NaN.") + if np.all(final_lat == 0) or np.all(np.isnan(final_lat)): + print(f"All latitude values for month {month} are 0 or NaN.") + if np.all(final_obs == 0) or np.all(np.isnan(final_obs)): + print(f"All observation values for month {month} are 0 or NaN.") + + except Exception as e: + print(f"Error processing data for month {month}: {e}") + continue + else: + # Check if the number of files is too big + if isinstance(self.start_date, str): + start_dt = datetime.strptime(self.start_date, "%Y%m%d%H%M") + else: + start_dt = self.start_date + + if isinstance(self.end_date, str): + end_dt = datetime.strptime(self.end_date, "%Y%m%d%H%M") + else: + end_dt = self.end_date + + time_difference = (end_dt - start_dt).days + + if time_difference > 3: + raise ValueError(f"The time difference between start_date ({self.start_date}) " + f"and end_date ({self.end_date}) exceeds 3 days. Under development.") + + def preprocess(ds): + ds = ds.expand_dims(dim={"index": [ds.attrs["product_name"]]}) + return ds + try: + print(f"Processing {len(self.files)} files") + self.ds = xr.open_mfdataset(self.files, preprocess=preprocess) + if self.ds is None: + raise ValueError("Failed to initialize dataset `self.ds`.") + self.preprocess_vars() + except Exception as e: + print(f"Error processing data: {e}") + return + def julian_to_date(self, julian_date): """ @@ -491,7 +644,7 @@ class VIIRS(MAPIES): batches = [ month_files[i:i + batch_size] for i in range(0, len(month_files), batch_size) ] - args = [(r, batch, self.obs_var, self.lat_var, self.lon_var, chunk_size) for batch in batches] + args = [(r, batch, self.obs_var, self.lat_var, self.lon_var, chunk_size, self.qualityFlags) for batch in batches] # Use multiprocessing to process batches with multiprocessing.Pool(processes=8) as pool: diff --git a/run/run_viirs.py b/run/run_viirs.py index 4bc456923..ff4252afc 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -4,17 +4,19 @@ import time if __name__ == "__main__": # Change these variables - start_date = "202404230730" - end_date = "202404231830" + start_date = "202401010000" + end_date = "202402010000" outdir="/esarchive/scratch/cmeikle/" - indir="/esarchive/obs/nasa/viirs_noaa20_aerdb_l2_nrt//original_files/VIIRS/" + indir="/esarchive/obs/nasa/viirs_noaa20_aerdb_l2/original_files/VIIRS/" c = VIIRS(start_date, end_date, frequency="3H", dest=outdir, indir=indir) - c.read_nc() - c.preprocess_vars() + #c.read_nc() + #c.preprocess_vars() #c.plot_2D_obs() - c.plot_2D_obs_custom() + #c.plot_2D_obs_custom() #c.to_da() # Monthly average plot (in 'month' variables specify the month) - #c.discover_files() - #c.plot_2D_average(month=1, outdir=outdir, batch_size = 10, chunk_size = 1) + c.read_nc() + c.process_data(monthly_avg=True, batch_size = 100) + c.plot_2D_observations(months=None, outdir=outdir) + c.plot_2D_num_obs(months=None, outdir=outdir) -- GitLab From beffac52d27ec97dc71b25c38b4e8b99288cb5be Mon Sep 17 00:00:00 2001 From: Calum Meikle Date: Mon, 20 Jan 2025 08:58:33 +0100 Subject: [PATCH 3/8] Running month --- mapies/mapies.py | 50 ++++++++++++ mapies/viirs.py | 199 ++++++++++++++++++++++++++++++++++++++--------- run/run_viirs.py | 10 +-- 3 files changed, 217 insertions(+), 42 deletions(-) diff --git a/mapies/mapies.py b/mapies/mapies.py index 044368321..69b9d68c3 100644 --- a/mapies/mapies.py +++ b/mapies/mapies.py @@ -213,3 +213,53 @@ class MAPIES: plt.savefig(filepath, format="png") plt.close(fig) print(f"Saved plot: {filepath}") + + @timeit + def plot_2D_num_obs_custom(self, lon_values=None, lat_values=None, num_obs=None, outdir="./", title=None, filename=None): + """ + General method for plotting the number of observations contributing to the final averages. + """ + + # Set Basemap projection + figsize = (15, 10) + markersize = 2.5 + proj = ccrs.PlateCarree() + + # Define discrete levels for the color bar + unique_values = np.unique(num_obs) # Unique values in num_obs + levels = np.linspace(unique_values.min(), unique_values.max(), unique_values.max()-unique_values.min()+1) + print(unique_values) + print(unique_values.min()) + print(unique_values.max()+0.5) + print(np.arange(unique_values.min()-0.5, unique_values.max()+0.5, 1)) + print(levels) + + # Use a discrete colormap + cmap = plt.cm.rainbow + norm = mcolors.BoundaryNorm(np.arange(unique_values.min()-0.5, unique_values.max()+1, 1), cmap.N) + + # Create the plot and add features + fig, ax = plt.subplots(subplot_kw={"projection": proj}, figsize=figsize) + ax.gridlines() + ax.coastlines(resolution="10m") + + # Scatter plot for the number of observations + im = ax.scatter( + lon_values, lat_values, markersize, c=num_obs, cmap=cmap, norm=norm, transform=proj + ) + + # Add the discrete colorbar without arrow-like ends + cbar = fig.colorbar(im, ax=ax, orientation="vertical", ticks=levels) + cbar.set_label("Number of Observations") + # cbar.ax.set_yticklabels([f"{level}" for level in range(unique_values)]) + + # Add title and save the plot + ax.set_title(title or "Number of Observations Plot") + + # Ensure the output directory exists + os.makedirs(outdir, exist_ok=True) + + filepath = os.path.join(self.dest, filename or "num_observation_2D_plot.png") + plt.savefig(filepath, format="png") + plt.close(fig) + print(f"Saved plot: {filepath}") diff --git a/mapies/viirs.py b/mapies/viirs.py index 7c134d8ec..ec340d73f 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -46,6 +46,26 @@ def rotate_function(r, lon, lat, obs): else: raise ValueError("Invalid grid representation") +def process_single_file(r, file, obs_var, lat_var, lon_var, apply_qa, qualityFlags): + """ + Process a single file and return aggregated results. + """ + ds = xr.open_dataset(file, engine="h5netcdf") # Open the file + obs = ds[obs_var].values.flatten() + lat = ds[lat_var].values.flatten() + lon = ds[lon_var].values.flatten() + + if apply_qa: + print(f"Applying Quality assurance flags on file {file}") + quality_assurance(ds, obs, lat, lon, qualityFlags) + + # Rotate and aggregate + lon_agg, lat_agg, obs_agg, count_obs = rotate_function(r, lon, lat, obs) + + + ds.close() # Close the file to free memory + return obs_agg, lat_agg, lon_agg, count_obs + def process_batch(r, batch_files, obs_var, lat_var, lon_var, chunk_size, qualityFlags): """ @@ -312,7 +332,7 @@ class VIIRS(MAPIES): # Convert to Julian day date = datetime.strptime(date, '%Y%m%d%H%M').strftime('%Y%j') # filepaths = f'{self.indir}/{date[0:4]}/{date[4:]}/AERDB_L2_VIIRS_NOAA20.A{date}*' - filepaths = f'{self.indir}/**/**/AERDB_L2_VIIRS_NOAA20.A{date}*' + filepaths = f'{self.indir}/**/AERDB_L2_VIIRS_NOAA20.A{date}*' file_patterns.append(filepaths) # Add pattern to the set # print(filepaths) @@ -383,6 +403,106 @@ class VIIRS(MAPIES): # # Open dataset with xarray and dask # self.ds = xr.open_mfdataset(files, preprocess=preprocess) + @timeit + def plot_2D_observations(self, months=None, outdir=None): + + # if isinstance(months, int): + # months = [months] + + # if 0 in months: + if months == [0]: + try: + self.plot_2D_obs() + except Exception as e: + print(f"Error plotting all data: {e}") + return + elif months is not None: + # elif m in months: + for m in months: + try: + month_data = self.monthly_dict[m] # Access the dictionary for this month + # files = month_data["files"] # List of files for the month (not used for plotting) + lon_values = month_data["lon"] + lat_values = month_data["lat"] + obs_values = month_data["obs"] + except Exception as e: + print(f"Error for month {m}: {e}") + continue + # title = f"Observation 2D plot of {self.datatype.upper()} data from {self.start_date} to {self.end_date}" + title = f"Observation 2D plot of month {m} from {self.start_date} to {self.end_date}" + filename = f"{outdir}/{self.datatype}_2D_obs_{m}_{self.year}_JANUARY.png" + super().plot_2D_obs_custom( + lon_values=lon_values, + lat_values=lat_values, + obs_values=obs_values, + outdir=outdir, + title=title, + filename=filename, + ) + else: + try: + lon_values = self.yearly_lon + lat_values = self.yearly_lat + obs_values = self.yearly_obs + except Exception as e: + print(f"Error for yearly data: {e}") + return + title = f"Observation 2D plot of yearly average of year {self.year} from {self.start_date} to {self.end_date}" + filename = f"{outdir}/{self.datatype}_2D_yearly_obs_{self.year}.png" + super().plot_2D_obs_custom( + lon_values=lon_values, + lat_values=lat_values, + obs_values=obs_values, + outdir=outdir, + title=title, + filename=filename, + ) + + def plot_2D_num_obs(self, months=None, outdir=None): + + # if isinstance(months, int): + # months = [months] + if months is not None: + for m in months: + try: + month_data = self.monthly_dict[m] # Access the dictionary for this month + # files = month_data["files"] # List of files for the month (not used for plotting) + lon_values = month_data["lon"] + lat_values = month_data["lat"] + cum_count = month_data["count"] + except Exception as e: + print(f"Error for month {m}: {e}") + continue + # title = f"Observation 2D plot of {self.datatype.upper()} data from {self.start_date} to {self.end_date}" + title = f"Plot of the number of valid observations for month {m} from {self.start_date} to {self.end_date}" + filename = f"{outdir}/{self.datatype}_2D_obs_count_{m}_{self.year}_JANUARY_trial2.png" + super().plot_2D_num_obs_custom( + lon_values=lon_values, + lat_values=lat_values, + num_obs=cum_count, + outdir=outdir, + title=title, + filename=filename, + ) + else: + try: + lon_values = self.yearly_lon + lat_values = self.yearly_lat + cum_count = self.yearly_count + except Exception as e: + print(f"Error for yearly data: {e}") + return + title = f"Plot of the number of valid observations for yearly average of {self.year} from {self.start_date} to {self.end_date}" + filename = f"{outdir}/{self.datatype}_2D_obs_count_yearly_{self.year}.png" + super().plot_2D_num_obs_custom( + lon_values=lon_values, + lat_values=lat_values, + num_obs=cum_count, + outdir=outdir, + title=title, + filename=filename, + ) + @timeit def to_da(self): @@ -622,35 +742,26 @@ class VIIRS(MAPIES): @timeit - def process_monthly_data2(self, month, batch_size, chunk_size): + def process_monthly_data(self, month, batch_size): """ Process and compute the 2D monthly average for a given month/list of files using multiprocessing. Parameters: - month: int, the month to process (e.g., 1 for January). - batch_size: int, the number of files to process in each batch. - - chunk_size: int, the size of chunks for processing within each batch. Returns: - lon_final, lat_final, obs_final: Aggregated longitude, latitude, and observation arrays. """ - # r = RotatedGrid(centre_lon=20, centre_lat=35, dlon=.1, dlat=.1, west=-51, south=-35) - # r = RotatedGrid(centre_lon=0, centre_lat=0, dlat=0.1, dlon=0.1, west=-180, south=-90) - r = RegularGrid(centre_lon=0, centre_lat=0, dlat=0.5, dlon=0.5, west=-180, south=-90) - month_files = self.monthly_files[month] + #r = RotatedGrid(centre_lon=20, centre_lat=35, dlon=.1, dlat=.1, west=-51, south=-35) + r = RegularGrid(dlon=.1, dlat=.1, west=-180, south=-90) + month_files = self.monthly_dict[month]["files"] print(f"Processing {len(month_files)} files for month {month}") # Split files into batches batches = [ month_files[i:i + batch_size] for i in range(0, len(month_files), batch_size) ] - args = [(r, batch, self.obs_var, self.lat_var, self.lon_var, chunk_size, self.qualityFlags) for batch in batches] - - # Use multiprocessing to process batches - with multiprocessing.Pool(processes=8) as pool: - results = pool.starmap(process_batch, args) - - # print(f"Results from pool.starmap: {results}") # Initialize cumulative arrays cumulative_obs = None @@ -658,27 +769,42 @@ class VIIRS(MAPIES): cumulative_lon = None cumulative_count = None - t3 = time.perf_counter() - for obs, lat, lon, count in results: - if cumulative_obs is None: - # Initialize with the shape of the first result - cumulative_obs = obs - cumulative_lat = lat - cumulative_lon = lon - cumulative_count = count - else: - # Incrementally sum arrays - cumulative_obs += obs - cumulative_lat += lat - cumulative_lon += lon - cumulative_count += count - t4 = time.perf_counter() - print(f"Time to sum arrays: {t4 - t3}") - - t5 = time.perf_counter() + for batch in batches: + print(f"Processing a batch of {len(batch)} files...") + + # Use multiprocessing to process files within the batch + args = [(r, file, self.obs_var, self.lat_var, self.lon_var, self.apply_qa, self.qualityFlags) for file in batch] + with multiprocessing.Pool(processes=8) as pool: # Adjust number of processes as needed + # results = pool.starmap(process_single_file, args) + + # # Aggregate batch results + # for obs_agg, lat_agg, lon_agg, count_obs in results: + # if cumulative_obs is None: + # # Initialize cumulative arrays on the first iteration + # cumulative_obs = obs_agg * count_obs + # cumulative_lat = lat_agg * count_obs + # cumulative_lon = lon_agg * count_obs + # cumulative_count = count_obs + # else: + # cumulative_obs += obs_agg * count_obs + # cumulative_lat += lat_agg * count_obs + # cumulative_lon += lon_agg * count_obs + # cumulative_count += count_obs + for obs_agg, lat_agg, lon_agg, count_obs in pool.starmap(process_single_file, args): + # Aggregate results incrementally + if cumulative_obs is None: + cumulative_obs = obs_agg * count_obs + cumulative_lat = lat_agg * count_obs + cumulative_lon = lon_agg * count_obs + cumulative_count = count_obs + else: + cumulative_obs += obs_agg * count_obs + cumulative_lat += lat_agg * count_obs + cumulative_lon += lon_agg * count_obs + cumulative_count += count_obs + # Compute final averages valid_mask = cumulative_count > 0 - # print(f"Valid mask count: {np.sum(valid_mask)}") final_obs = np.zeros_like(cumulative_obs) final_lat = np.zeros_like(cumulative_lat) final_lon = np.zeros_like(cumulative_lon) @@ -686,8 +812,7 @@ class VIIRS(MAPIES): final_obs[valid_mask] = cumulative_obs[valid_mask] / cumulative_count[valid_mask] final_lat[valid_mask] = cumulative_lat[valid_mask] / cumulative_count[valid_mask] final_lon[valid_mask] = cumulative_lon[valid_mask] / cumulative_count[valid_mask] - - t6 = time.perf_counter() - print(f"Time to compute final averages: {t6 - t5}") + print(f"Completed processing for month {month}.") - return final_lon, final_lat, final_obs + + return final_lon, final_lat, final_obs, cumulative_count diff --git a/run/run_viirs.py b/run/run_viirs.py index ff4252afc..ada49dbd3 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -6,9 +6,9 @@ if __name__ == "__main__": # Change these variables start_date = "202401010000" end_date = "202402010000" - outdir="/esarchive/scratch/cmeikle/" - indir="/esarchive/obs/nasa/viirs_noaa20_aerdb_l2/original_files/VIIRS/" - c = VIIRS(start_date, end_date, frequency="3H", dest=outdir, indir=indir) + outdir="/Users/calummeikle/BSC/" + indir="/Users/calummeikle/BSC/data/VIIRS" + c = VIIRS(start_date, end_date, frequency="3H", dest=outdir, indir=indir, apply_qa=True) #c.read_nc() #c.preprocess_vars() #c.plot_2D_obs() @@ -18,5 +18,5 @@ if __name__ == "__main__": # Monthly average plot (in 'month' variables specify the month) c.read_nc() c.process_data(monthly_avg=True, batch_size = 100) - c.plot_2D_observations(months=None, outdir=outdir) - c.plot_2D_num_obs(months=None, outdir=outdir) + c.plot_2D_observations(months=[1], outdir=outdir) + c.plot_2D_num_obs(months=[1], outdir=outdir) -- GitLab From bffa700f4fe27a475c98e62df6e6a1de37fac629 Mon Sep 17 00:00:00 2001 From: Calum Meikle Date: Mon, 20 Jan 2025 15:15:41 +0100 Subject: [PATCH 4/8] Updating gantt making sure quality flags work --- docs/project-management/gantt.py | 24 +++++++++++++----------- mapies/config/satellite_config.yaml | 4 ++-- mapies/util/func_tools.py | 10 ++++++---- mapies/viirs.py | 6 ++++-- run/run_viirs.py | 8 ++++---- 5 files changed, 29 insertions(+), 23 deletions(-) diff --git a/docs/project-management/gantt.py b/docs/project-management/gantt.py index 8ecb57677..0edeb7340 100644 --- a/docs/project-management/gantt.py +++ b/docs/project-management/gantt.py @@ -4,17 +4,19 @@ import plotly.figure_factory as ff df = [ dict(Task="Viirs base processor", Start='2024-08-01', Finish='2024-09-30', Resource='Complete'), - dict(Task="Add pytests for already created functions (3-9)", Start='2024-08-01', Finish='2024-12-31', Resource='Incomplete'), - dict(Task="Tropomi base processor", Start='2024-08-01', Finish='2024-11-28', Resource='Incomplete'), - dict(Task="Tropomi to_plumes functionality (11)", Start='2024-12-01', Finish='2024-12-31', Resource='Not Started'), - dict(Task="Time evaluation statistics (18)", Start='2024-11-01', Finish='2024-11-28', Resource='Incomplete'), - dict(Task="Test DA implementation for VIIRS", Start='2024-11-11', Finish='2024-11-28', Resource='Not Started'), - dict(Task="Cloudsat processor", Start='2024-11-01', Finish='2024-11-28', Resource='Incomplete'), - dict(Task="Creating Grid Config for rotated and regular global grids (14)", Start='2024-11-18', Finish='2024-11-28', Resource='Not Started'), - dict(Task="Add original indices of the data to the outputted netcdf (13)", Start='2025-01-01', Finish='2025-01-14', Resource='Not Started'), - dict(Task="Add history to the (10)", Start='2025-01-01', Finish='2025-01-14', Resource='Not Started'), - dict(Task="Change variable names to IODA variables (17)", Start='2024-12-01', Finish='2025-01-14', Resource='Not Started'), - dict(Task="Update grid representations to include Vertical profiling (20)", Start='2024-12-01', Finish='2025-01-14', Resource='Not Started') + dict(Task="Add pytests for already created functions (3-9)", Start='2024-08-01', Finish='2024-12-31', Resource='Complete'), + dict(Task="Tropomi base processor", Start='2024-08-01', Finish='2025-02-03', Resource='Incomplete'), + dict(Task="Tropomi to_plumes functionality (11)", Start='2025-02-03', Finish='2025-03-31', Resource='Not Started'), + dict(Task="Time evaluation statistics (18)", Start='2024-11-01', Finish='2024-11-28', Resource='Complete'), + dict(Task="Test DA implementation for VIIRS", Start='2025-02-03', Finish='2025-02-28', Resource='Not Started'), + dict(Task="Convert Grid representation to numpy from geopandas", Start='2024-12-01', Finish='2025-12-31', Resource="Complete") + dict(Task="Creating Grid Config for rotated and regular global grids (14)", Start='2024-11-18', Finish='2025-02-03', Resource='Incomplete'), + dict(Task="Add original indices of the data to the outputted netcdf (13)", Start='2025-02-03', Finish='2025-01-14', Resource='Not Started'), + dict(Task="Add history of processes to the netcdf (10)", Start='2025-01-01', Finish='2025-02-03', Resource='Not Started'), + dict(Task="Update grid representations to include Vertical profiling (20)", Start='2024-03-01', Finish='2025-03-31', Resource='Incomplete') + dict(Task="Produce Number of counts plots (29)", Start="2025-01-13", Finish="2025-01-20", Resource="Complete") + dict(Task="QA flags (23)", Start="2025-01-13", Finish="2025-02-03", Resource="Incomplete") + dict(Task="Finalise netcdf storage output (30)", Start="2025-01-20", Finish="2025-02-03", Resource="Not Started") ] diff --git a/mapies/config/satellite_config.yaml b/mapies/config/satellite_config.yaml index 029f568f0..aa247cd7d 100644 --- a/mapies/config/satellite_config.yaml +++ b/mapies/config/satellite_config.yaml @@ -15,10 +15,10 @@ viirs: qa_variables: qa_land: qa_name: "Aerosol_Optical_Thickness_QA_Flag_Land" - qa_values: [2, 3] + qa_values: [3] qa_ocean: qa_name: "Aerosol_Optical_Thickness_QA_Flag_Ocean" - qa_values: [2, 3] + qa_values: [3] tropomi: diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index 017317d78..6359e1531 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -74,7 +74,7 @@ def time_converter(date:str) -> pd.Timestamp: return date -def quality_assurance(ds, obs, lon , lat, qualityFlags): +def quality_assurance(ds, obs, lat, lon, qualityFlags): """ Applying quality assurance flags to mapies data """ @@ -83,17 +83,19 @@ def quality_assurance(ds, obs, lon , lat, qualityFlags): for flag, info in qualityFlags.items(): arr = ds[info["qa_name"]].values.flatten() - #arr = self.reindex(self.time_values_index, arr) for value in info["qa_values"]: mask |= (arr == value) mask &= mask + print(arr[mask]) + obs = obs[mask] - lon = lon[mask] lat = lat[mask] + lon = lon[mask] + - return obs, lon, lat + return obs, lat, lon # Time Domain Selection function based off of xarray Will this work with dask too, a question for later def time_domain_selection( diff --git a/mapies/viirs.py b/mapies/viirs.py index ec340d73f..dca5ea472 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -54,11 +54,13 @@ def process_single_file(r, file, obs_var, lat_var, lon_var, apply_qa, qualityFla obs = ds[obs_var].values.flatten() lat = ds[lat_var].values.flatten() lon = ds[lon_var].values.flatten() + print(len(obs)) + print(len(obs[~np.isnan(obs)])) if apply_qa: print(f"Applying Quality assurance flags on file {file}") - quality_assurance(ds, obs, lat, lon, qualityFlags) - + obs, lat, lon = quality_assurance(ds, obs, lat, lon, qualityFlags) + print(len(obs)) # Rotate and aggregate lon_agg, lat_agg, obs_agg, count_obs = rotate_function(r, lon, lat, obs) diff --git a/run/run_viirs.py b/run/run_viirs.py index ada49dbd3..9786927f9 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -5,9 +5,9 @@ import time if __name__ == "__main__": # Change these variables start_date = "202401010000" - end_date = "202402010000" - outdir="/Users/calummeikle/BSC/" - indir="/Users/calummeikle/BSC/data/VIIRS" + end_date = "202401030000" + outdir="/home/cmeikle/Projects/data/VIIRS" + indir="/home/cmeikle/Projects/data/VIIRS/original_files/AERDB_L2_VIIRS_NOAA20/2024/" c = VIIRS(start_date, end_date, frequency="3H", dest=outdir, indir=indir, apply_qa=True) #c.read_nc() #c.preprocess_vars() @@ -17,6 +17,6 @@ if __name__ == "__main__": # Monthly average plot (in 'month' variables specify the month) c.read_nc() - c.process_data(monthly_avg=True, batch_size = 100) + c.process_data(monthly_avg=True, batch_size = 10) c.plot_2D_observations(months=[1], outdir=outdir) c.plot_2D_num_obs(months=[1], outdir=outdir) -- GitLab From c88a17b8ddbb439d077138612a374be6d3bb3a21 Mon Sep 17 00:00:00 2001 From: Calum Meikle Date: Tue, 21 Jan 2025 18:04:26 +0100 Subject: [PATCH 5/8] Updated all tests and gantt chart plus dust filter --- docs/project-management/gantt.py | 13 ++++++++----- mapies/config/satellite_config.yaml | 12 +++++++++--- mapies/tests/test_func_tools.py | 25 ++++++++++++++++++++++++- mapies/util/func_tools.py | 27 ++++++++++++++++----------- mapies/viirs.py | 5 +---- run/run_viirs.py | 6 +++--- 6 files changed, 61 insertions(+), 27 deletions(-) diff --git a/docs/project-management/gantt.py b/docs/project-management/gantt.py index 0edeb7340..eb0faa433 100644 --- a/docs/project-management/gantt.py +++ b/docs/project-management/gantt.py @@ -9,14 +9,17 @@ df = [ dict(Task="Tropomi to_plumes functionality (11)", Start='2025-02-03', Finish='2025-03-31', Resource='Not Started'), dict(Task="Time evaluation statistics (18)", Start='2024-11-01', Finish='2024-11-28', Resource='Complete'), dict(Task="Test DA implementation for VIIRS", Start='2025-02-03', Finish='2025-02-28', Resource='Not Started'), - dict(Task="Convert Grid representation to numpy from geopandas", Start='2024-12-01', Finish='2025-12-31', Resource="Complete") + dict(Task="Convert Grid representation to numpy from geopandas", Start='2024-12-01', Finish='2024-12-31', Resource="Complete"), dict(Task="Creating Grid Config for rotated and regular global grids (14)", Start='2024-11-18', Finish='2025-02-03', Resource='Incomplete'), dict(Task="Add original indices of the data to the outputted netcdf (13)", Start='2025-02-03', Finish='2025-01-14', Resource='Not Started'), dict(Task="Add history of processes to the netcdf (10)", Start='2025-01-01', Finish='2025-02-03', Resource='Not Started'), - dict(Task="Update grid representations to include Vertical profiling (20)", Start='2024-03-01', Finish='2025-03-31', Resource='Incomplete') - dict(Task="Produce Number of counts plots (29)", Start="2025-01-13", Finish="2025-01-20", Resource="Complete") - dict(Task="QA flags (23)", Start="2025-01-13", Finish="2025-02-03", Resource="Incomplete") - dict(Task="Finalise netcdf storage output (30)", Start="2025-01-20", Finish="2025-02-03", Resource="Not Started") + dict(Task="Update grid representations to include Vertical profiling (20)", Start='2025-03-01', Finish='2025-03-31', Resource='Not Started'), + dict(Task="Test Viirs data with Providentia Interpolation", Start="2025-03-01", Finish="2025-03-31", Resource="Not Started"), + dict(Task="Produce Number of counts plots (29)", Start="2025-01-13", Finish="2025-01-20", Resource="Complete"), + dict(Task="QA flags (23)", Start="2025-01-13", Finish="2025-02-03", Resource="Incomplete"), + dict(Task="Finalise netcdf storage output (30)", Start="2025-01-20", Finish="2025-02-03", Resource="Not Started"), + dict(Task="Create conda environment", Start="2025-01-01", Finish="2025-01-14", Resource="Complete"), + dict(Task="Create Logging file output", Start="2025-01-01", Finish="2025-02-03", Resource="Incomplete"), ] diff --git a/mapies/config/satellite_config.yaml b/mapies/config/satellite_config.yaml index aa247cd7d..f33d2ef58 100644 --- a/mapies/config/satellite_config.yaml +++ b/mapies/config/satellite_config.yaml @@ -14,11 +14,17 @@ viirs: qa: qa_variables: qa_land: - qa_name: "Aerosol_Optical_Thickness_QA_Flag_Land" - qa_values: [3] + qa_name: "Aerosol_Optical_Thickness_QA_Flag_Land" #name of flag + qa_values: [2,3] # Values you want to keep + qa_combined: False #This makes sure that & operator is used in the python script, else default is or qa_ocean: qa_name: "Aerosol_Optical_Thickness_QA_Flag_Ocean" qa_values: [3] + qa_combined: False + qa_type: + qa_name: "Aerosol_Type_Land_Ocean" + qa_values: [0] + qa_combined: True tropomi: @@ -26,7 +32,7 @@ tropomi: time_variable: "delta_time" lon_variable: "longitude" lat_variable: "latitude" - obs_variable: "nitrogendioxide_tropospheric_column" + obs_variable: "nitrogendioxide_tropospheric_column" diff --git a/mapies/tests/test_func_tools.py b/mapies/tests/test_func_tools.py index 23f873160..52afee646 100644 --- a/mapies/tests/test_func_tools.py +++ b/mapies/tests/test_func_tools.py @@ -2,8 +2,9 @@ import pytest import pandas as pd +import xarray as xr import numpy as np -from mapies.util.func_tools import time_converter, time_domain_selection, geo_to_rot +from mapies.util.func_tools import time_converter, time_domain_selection, geo_to_rot, quality_assurance @@ -35,6 +36,28 @@ def test_time_converter_error(): def test_geo_to_rot(test_input, expected): assert (round(geo_to_rot(*test_input)[0], 8), round(geo_to_rot(*test_input)[1], 8)) == expected +ds = xr.Dataset( + data_vars={"flag1":np.array([0, 0, 0, 1]), "flag2":np.array([1, 0, 1, 1])} +) +obs = np.array([1, 2, 3, 4]) +lon = np.array([1, 2, 3, 4]) +lat = np.array([1, 2, 3, 4]) +qualityFlags1 = {"qa_flag1":{"qa_name":"flag1", "qa_values":[1], "qa_combined":False}, "qa_flag2":{"qa_name":"flag2", "qa_values":[1], "qa_combined":False}} +qualityFlags2 = {"qa_flag1":{"qa_name":"flag1", "qa_values":[1], "qa_combined":True}, "qa_flag2":{"qa_name":"flag2", "qa_values":[1], "qa_combined":True}} +qualityFlags3 = {"qa_flag1":{"qa_name":"flag1", "qa_values":[1], "qa_combined":True}, "qa_flag2":{"qa_name":"flag2", "qa_values":[1], "qa_combined":False}} + +@pytest.mark.parametrize( + "test_input,expected", + [ + ((ds, obs, lat, lon, qualityFlags1), (np.array([1, 3, 4]), np.array([1, 3, 4]), np.array([1, 3, 4]))), + ((ds, obs, lat, lon, qualityFlags2), (np.array([4]), np.array([4]), np.array([4]))), + ((ds, obs, lat, lon, qualityFlags3), (np.array([1, 3, 4]), np.array([1, 3, 4]), np.array([1, 3, 4]))), + ] +) +def test_quality_assurance(test_input, expected): + assert quality_assurance(*test_input)[0].all() == expected[0].all() + assert quality_assurance(*test_input)[1].all() == expected[1].all() + assert quality_assurance(*test_input)[2].all() == expected[2].all() """ @pytest.mark.parametrize( "test_input,expected", diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index 6359e1531..99b3140e4 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -80,19 +80,24 @@ def quality_assurance(ds, obs, lat, lon, qualityFlags): """ mask = np.zeros_like(obs, dtype=bool) - for flag, info in qualityFlags.items(): - + for i, (flag, info) in enumerate(qualityFlags.items()): + + flag_mask = np.zeros_like(obs, dtype=bool) arr = ds[info["qa_name"]].values.flatten() - for value in info["qa_values"]: - mask |= (arr == value) - mask &= mask - print(arr[mask]) - - - obs = obs[mask] - lat = lat[mask] - lon = lon[mask] + flag_mask |= (arr == value) + + if i == 0: + mask = flag_mask + else: + if info["qa_combined"]: + mask &= flag_mask + else: + mask |= flag_mask + + obs = obs[~mask] + lat = lat[~mask] + lon = lon[~mask] return obs, lat, lon diff --git a/mapies/viirs.py b/mapies/viirs.py index dca5ea472..677789404 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -54,13 +54,10 @@ def process_single_file(r, file, obs_var, lat_var, lon_var, apply_qa, qualityFla obs = ds[obs_var].values.flatten() lat = ds[lat_var].values.flatten() lon = ds[lon_var].values.flatten() - print(len(obs)) - print(len(obs[~np.isnan(obs)])) if apply_qa: - print(f"Applying Quality assurance flags on file {file}") obs, lat, lon = quality_assurance(ds, obs, lat, lon, qualityFlags) - print(len(obs)) + # Rotate and aggregate lon_agg, lat_agg, obs_agg, count_obs = rotate_function(r, lon, lat, obs) diff --git a/run/run_viirs.py b/run/run_viirs.py index 9786927f9..717b8c1be 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -5,8 +5,8 @@ import time if __name__ == "__main__": # Change these variables start_date = "202401010000" - end_date = "202401030000" - outdir="/home/cmeikle/Projects/data/VIIRS" + end_date = "202402010000" + outdir="/home/cmeikle/Projects/data/" indir="/home/cmeikle/Projects/data/VIIRS/original_files/AERDB_L2_VIIRS_NOAA20/2024/" c = VIIRS(start_date, end_date, frequency="3H", dest=outdir, indir=indir, apply_qa=True) #c.read_nc() @@ -17,6 +17,6 @@ if __name__ == "__main__": # Monthly average plot (in 'month' variables specify the month) c.read_nc() - c.process_data(monthly_avg=True, batch_size = 10) + c.process_data(monthly_avg=True, batch_size = 20) c.plot_2D_observations(months=[1], outdir=outdir) c.plot_2D_num_obs(months=[1], outdir=outdir) -- GitLab From e49f81ecb698208977c7f74eddaaefa3a62ae6b9 Mon Sep 17 00:00:00 2001 From: Calum Meikle Date: Wed, 22 Jan 2025 13:17:36 +0100 Subject: [PATCH 6/8] Tropomi QA filters --- mapies/config/satellite_config.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/mapies/config/satellite_config.yaml b/mapies/config/satellite_config.yaml index f33d2ef58..4a8644b28 100644 --- a/mapies/config/satellite_config.yaml +++ b/mapies/config/satellite_config.yaml @@ -33,6 +33,13 @@ tropomi: lon_variable: "longitude" lat_variable: "latitude" obs_variable: "nitrogendioxide_tropospheric_column" + qa: + qa_variables: + qa_value: + qa_name: "qa_value" + qa_values: [0.5] + qa_ + qa_combined: True -- GitLab From bf1023a54efff3fcdc34c6f4ccbdf7d3bac28ccf Mon Sep 17 00:00:00 2001 From: Calum Meikle Date: Wed, 22 Jan 2025 18:36:39 +0100 Subject: [PATCH 7/8] Updating to run also from preprocess_vars --- mapies/config/satellite_config.yaml | 1 - mapies/mapies.py | 14 ++++++++++---- mapies/util/func_tools.py | 8 ++++---- mapies/viirs.py | 20 +++++--------------- run/run_viirs.py | 20 ++++++++------------ 5 files changed, 27 insertions(+), 36 deletions(-) diff --git a/mapies/config/satellite_config.yaml b/mapies/config/satellite_config.yaml index 4a8644b28..44c2b8a00 100644 --- a/mapies/config/satellite_config.yaml +++ b/mapies/config/satellite_config.yaml @@ -38,7 +38,6 @@ tropomi: qa_value: qa_name: "qa_value" qa_values: [0.5] - qa_ qa_combined: True diff --git a/mapies/mapies.py b/mapies/mapies.py index 2ff7f6186..4bc9a7205 100644 --- a/mapies/mapies.py +++ b/mapies/mapies.py @@ -240,15 +240,21 @@ class MAPIES: Applying quality assurance flags to mapies data """ mask = np.zeros_like(self.obs, dtype=bool) - - for flag, info in self.qualityFlags.items(): - + for i, (flag, info) in enumerate(self.qualityFlags.items()): + + flag_mask = np.zeros_like(self.obs, dtype=bool) arr = self.ds[info["qa_name"]].values.flatten() arr = self.reindex(self.time_values_index, arr) for value in info["qa_values"]: mask |= (arr == value) - mask &= mask + if i == 0: + mask = flag_mask + else: + if info["qa_combined"]: + mask &= flag_mask + else: + mask |= flag_mask self.obs = self.obs[mask] self.lon_values = self.lon_values[mask] diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py index f034af0ac..0722ec2e5 100644 --- a/mapies/util/func_tools.py +++ b/mapies/util/func_tools.py @@ -30,7 +30,7 @@ def get_filepaths(pattern): """ Get file paths from a pattern """ - return glob(pattern) + return glob(pattern, recursive=True) # Function from Guillaume's script could be useful # Only supported in python3.10 @@ -102,9 +102,9 @@ def quality_assurance(ds, obs, lat, lon, qualityFlags): else: mask |= flag_mask - obs = obs[~mask] - lat = lat[~mask] - lon = lon[~mask] + obs = obs[mask] + lat = lat[mask] + lon = lon[mask] return obs, lat, lon diff --git a/mapies/viirs.py b/mapies/viirs.py index 61470785c..88cc64ec0 100644 --- a/mapies/viirs.py +++ b/mapies/viirs.py @@ -138,20 +138,10 @@ class VIIRS(MAPIES): self.dest = kwargs.get("dest") self.indir = kwargs.get("indir") - # ============================================================================= - # CARLOTTA ADDED THIS - # ============================================================================= - # self.year = kwargs.get("year", 2024) # Default to 2024 - # self.months = kwargs.get("months", list(range(1, 13))) # Default to all months (1-12) - # self.monthly_files = {} # Dictionary to store files by month - self.year = int(start_date[:4]) self.start_day = int(start_date[6:8]) self.end_day = int(end_date[6:8]) self.monthly_dict = {} - # ============================================================================= - # - # ============================================================================= self.dates_slice = pd.date_range( self.start_date, @@ -286,11 +276,11 @@ class VIIRS(MAPIES): Perform Rotation of Grid representation """ # Calculate the grid representation - r = RotatedGrid(dlon=.1, dlat=.1, west=-51, south=-35, centre_lon=20, centre_lat=35) - #r = RegularGrid(dlon=.1, dlat=.1, west=-180, south=-90) + #r = RotatedGrid(dlon=.1, dlat=.1, west=-51, south=-35, centre_lon=20, centre_lat=35) + r = RegularGrid(dlon=.1, dlat=.1, west=-180, south=-90) # Aggregate the the observations to the grid representation - lon, lat, rlon, rlat, obs, count_obs = r.aggregate(self.lon_values, self.lat_values, self.obs) - #lon, lat, obs, count_obs = r.aggregate(self.lon_values, self.lat_values, self.obs) + #lon, lat, rlon, rlat, obs, count_obs = r.aggregate(self.lon_values, self.lat_values, self.obs) + lon, lat, obs, count_obs = r.aggregate(self.lon_values, self.lat_values, self.obs) self.lon_values = lon self.lat_values = lat self.obs = obs @@ -713,7 +703,7 @@ class VIIRS(MAPIES): print(f"Processing a batch of {len(batch)} files...") # Use multiprocessing to process files within the batch - args = [(r, file, self.obs_var, self.lat_var, self.lon_var) for file in batch] + args = [(r, file, self.obs_var, self.lat_var, self.lon_var, self.apply_qa, self.qualityFlags) for file in batch] with multiprocessing.Pool(processes=8) as pool: # Adjust number of processes as needed results = pool.starmap(process_single_file, args) diff --git a/run/run_viirs.py b/run/run_viirs.py index 186d4ef71..8075df967 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -6,18 +6,14 @@ import time if __name__ == "__main__": # Change these variables start_date = "202401010000" - end_date = "202402010000" - outdir="/home/cmeikle/Projects/data/" - indir="/home/cmeikle/Projects/data/VIIRS/original_files/AERDB_L2_VIIRS_NOAA20/2024/" - c = VIIRS(start_date, end_date, frequency="3H", dest=outdir, indir=indir, apply_qa=True) - #c.read_nc() - #c.preprocess_vars() - #c.plot_2D_obs() - #c.plot_2D_obs_custom() - #c.to_da() + end_date = "202401020000" + outdir="/esarchive/scratch/cmeikle/" + indir="/esarchive/obs/nasa/viirs_noaa20_aerdb_l2/original_files/VIIRS" + c = VIIRS(start_date, end_date, frequency="D", dest=outdir, indir=indir, apply_qa=True) # Monthly average plot (in 'month' variables specify the month) c.read_nc() - c.process_data(monthly_avg=True, batch_size = 20) - c.plot_2D_observations(months=[1], outdir=outdir) - c.plot_2D_num_obs(months=[1], outdir=outdir) \ No newline at end of file + c.process_data(monthly_avg=False, batch_size = 20) + c.plot_2D_obs() + #c.plot_2D_observations(months=[0], outdir=outdir) + #c.plot_2D_num_obs(months=[0], outdir=outdir) \ No newline at end of file -- GitLab From 81ea2d47bb56a47d4bbdab182d5c2cac6d1b8d35 Mon Sep 17 00:00:00 2001 From: Calum Meikle Date: Thu, 23 Jan 2025 12:01:37 +0100 Subject: [PATCH 8/8] Got the inhereted qa function running --- mapies/mapies.py | 4 ++-- run/run_viirs.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mapies/mapies.py b/mapies/mapies.py index 4bc9a7205..17d9bac5a 100644 --- a/mapies/mapies.py +++ b/mapies/mapies.py @@ -245,9 +245,10 @@ class MAPIES: flag_mask = np.zeros_like(self.obs, dtype=bool) arr = self.ds[info["qa_name"]].values.flatten() arr = self.reindex(self.time_values_index, arr) + for value in info["qa_values"]: - mask |= (arr == value) + flag_mask |= (arr == value) if i == 0: mask = flag_mask else: @@ -259,7 +260,6 @@ class MAPIES: self.obs = self.obs[mask] self.lon_values = self.lon_values[mask] self.lat_values = self.lat_values[mask] - @staticmethod diff --git a/run/run_viirs.py b/run/run_viirs.py index 8075df967..3b3d13ba5 100644 --- a/run/run_viirs.py +++ b/run/run_viirs.py @@ -14,6 +14,6 @@ if __name__ == "__main__": # Monthly average plot (in 'month' variables specify the month) c.read_nc() c.process_data(monthly_avg=False, batch_size = 20) - c.plot_2D_obs() - #c.plot_2D_observations(months=[0], outdir=outdir) + #c.plot_2D_obs() + c.plot_2D_observations(months=[0], outdir=outdir) #c.plot_2D_num_obs(months=[0], outdir=outdir) \ No newline at end of file -- GitLab