From cb45556f8378070fdfb71b822ae512cda4a4cf2b Mon Sep 17 00:00:00 2001
From: Calum Meikle
Date: Tue, 18 Feb 2025 16:20:52 +0100
Subject: [PATCH 1/2] Update where and when time domain selection is done so
 as not to impact the qa function

---
 mapies/tropomi.py         |  6 ++--
 mapies/util/func_tools.py | 43 ++++++++------------------
 mapies/viirs.py           | 63 ++++++++++++++-------------------
 run/run_viirs.py          | 11 +++----
 4 files changed, 40 insertions(+), 83 deletions(-)

diff --git a/mapies/tropomi.py b/mapies/tropomi.py
index a8ee9207..59f2d93d 100644
--- a/mapies/tropomi.py
+++ b/mapies/tropomi.py
@@ -40,12 +40,11 @@ def process_single_file(
     try:
         ds = xr.open_dataset(file, engine="h5netcdf", group="PRODUCT")
 
-        obs, lat, lon, time_values, time_values_index = new_preprocess_vars(ds, obs_var, lat_var, lon_var, time_var, start_date, end_date)
-        # obs = ds[obs_var].values.flatten()
-        # lat = ds[lat_var].values.flatten()
+        obs, lat, lon, time_values = new_preprocess_vars(ds, obs_var, lat_var, lon_var, time_var, start_date, end_date)
 
         if apply_qa:
             obs, lat, lon, time_values = quality_assurance(ds, obs, lat, lon, time_values, qualityFlags)
+        obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, start_date, end_date)
 
         if geospatial_crop is not None:
             obs, lat, lon, time_values = spatial_domain_selection(obs, lat, lon, time_values, geospatial_crop)
@@ -212,7 +211,6 @@ class TROPOMI(MAPIES):
                 final_lon, final_lat, final_obs, cumulative_count = self.process_monthly_data(
                     batch_size=batch_size, month=month
                 )
-                # final_lon, final_lat, final_obs, cumulative_count = self.process_files_in_blocks(month, block_size=300, batch_size=batch_size)
 
                 # Update the monthly dictionary
                 valid_obs = final_obs > 0
diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py
index 15356703..c9d50981 100644
--- a/mapies/util/func_tools.py
+++ b/mapies/util/func_tools.py
@@ -179,6 +179,9 @@ def spatial_domain_selection(obs, lat, lon, time, spatial_bounds):
 
 # Time Domain Selection function based off of xarray Will this work with dask too, a question for later
 def time_domain_selection(
+    obs,
+    lat,
+    lon,
     time_values,
     start_date,
     end_date,
@@ -213,11 +216,14 @@ def time_domain_selection(
     else:
         return (array >= start_date) & (array <= end_date)
 
-    time_values_index = np.where(is_between_times(time_values))[0] # np.where returns tuple
-
-    time_values = time_values[time_values_index]
+    time_mask = np.where(is_between_times(time_values))[0]
+
+    time_values = time_values[time_mask]
+    lon = lon[time_mask]
+    lat = lat[time_mask]
+    obs = obs[time_mask]
 
-    return time_values, time_values_index
+    return obs, lon, lat, time_values
 
 
 def inflate_array(var_array, shape1, shape2):
@@ -309,56 +315,31 @@ def new_preprocess_vars(ds, obs_var, lat_var, lon_var, time_var, start_date, end_date):
 
     time_values = pd.to_datetime(time_values)
 
-    # Time domain selection
-    # print(f'Time values before selection: {time_values}')
-    time_values, time_values_index = time_domain_selection(time_values, start_date, end_date)
-    # print(f'Time values after selection: {time_values}')
-
     obs_dims = ds[obs_var].dims
     obs_shape = ds[obs_var].shape
     obs_attrs = ds[obs_var].attrs
 
     # Duplicate values in the time array to get the same shape as the flattened variable array
     if time_shape != obs_shape:
         time_values = inflate_array(time_values, time_shape, obs_shape)
-        time_values_index = inflate_array(time_values_index, time_shape, obs_shape)
-        time_values, time_values_index = time_domain_selection(time_values, start_date, end_date)
-
-
+
     # Lon and lat values
     lon_dims = ds[lon_var].dims
     lon_shape = ds[lon_var].shape
     lon_attrs = ds[lon_var].attrs
     lon_values = ds[lon_var].values.flatten()
 
-    # Reindex lon column values
-    lon_values = reindex(
-        time_values_index,
-        lon_values,
-    )
 
     lat_dims = ds[lat_var].dims
     lat_shape = ds[lat_var].shape
     lat_attrs = ds[lat_var].attrs
     lat_values = ds[lat_var].values.flatten()
 
-    # Reindex aod column values
-    lat_values = reindex(
-        time_values_index,
-        lat_values,
-    )
-
     # AOD column values, default "Aerosol_Optical_Thickness_550_Land_Ocean_Best_Estimate"
     obs = ds[obs_var].values.flatten()
 
-    # Reindex aod column values
-    obs = reindex(
-        time_values_index,
-        obs,
-    )
-
-
-    return obs, lat_values, lon_values, time_values, time_values_index
+    return obs, lat_values, lon_values, time_values
 
 
 def reindex(
diff --git a/mapies/viirs.py b/mapies/viirs.py
index 65d95b6b..d32580a9 100644
--- a/mapies/viirs.py
+++ b/mapies/viirs.py
@@ -61,11 +61,14 @@ def process_single_file(
     try:
         ds = xr.open_dataset(file, engine="h5netcdf")
 
-        obs, lat, lon, time_values, time_values_index = new_preprocess_vars(ds, obs_var, lat_var, lon_var, time_var, start_date, end_date)
-        # obs = ds[obs_var].values.flatten()
-        # lat = ds[lat_var].values.flatten()
+        obs, lat, lon, time_values = new_preprocess_vars(ds, obs_var, lat_var, lon_var, time_var, start_date, end_date)
+
         if apply_qa:
             obs, lat, lon, time_values = quality_assurance(ds, obs, lat, lon, time_values, qualityFlags)
+
+        # Run a time domain selection after any quality assurance filtering
+        obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, start_date, end_date)
+
         if geospatial_crop is not None:
             obs, lat, lon, time_values = spatial_domain_selection(obs, lat, lon, time_values, geospatial_crop)
@@ -80,29 +83,12 @@ def process_single_file(
         else:
             ds.close()
             return obs, lat, lon, time_values
-
-
-
     except Exception as e:
         print(f"Error processing file {file}: {e}")
-        return None
-
-
-def filter_file(args):
-    root, filename, start_dt, end_dt = args
-    try:
-        parts = filename.split('.')
-        date = parts[1][1:]
-        hhmm = parts[2]
-        file_dt = datetime.strptime(date + hhmm, "%Y%j%H%M")
-
-        if start_dt <= file_dt <= end_dt:
-            return os.path.join(root, filename), file_dt.month
-    except Exception as e:
-        print(f"Error processing file {filename}: {e}")
         return None
 
+
 class VIIRS(MAPIES):
     """
     Class for VIIRS specific data
@@ -229,8 +215,6 @@ class VIIRS(MAPIES):
 
         for day, data in self.daily_dict.items():
             print(f"Day {day:02d}: {len(data['files'])} files")
-            # for filepath in data['files']:
-            #     print(f"  {filepath}")
 
 
@@ -268,7 +252,7 @@ class VIIRS(MAPIES):
             try:
                 # Process the data for the current month
                 final_lon, final_lat, final_obs, cumulative_count = self.process_monthly_data(
-                    batch_size=batch_size, month=month, apply_qa=apply_qa
+                    batch_size=batch_size, month=month
                 )
 
                 # Update the monthly dictionary
@@ -292,7 +276,7 @@ class VIIRS(MAPIES):
                 print(f"Error processing data for month {month}: {e}")
                 continue
         else:
-            try: 
+            try:
                 final_lon, final_lat, final_obs, cumulative_count = self.process_monthly_data(
                     batch_size=batch_size, month=None
                 )
@@ -302,11 +286,6 @@ class VIIRS(MAPIES):
                 self.lat = final_lat[valid_obs]
                 self.obs = final_obs[valid_obs]
                 self.count_obs = cumulative_count[valid_obs]
-
-                # self.lon_values = final_lon
-                # self.lat_values = final_lat
-                # self.obs = final_obs
-                # self.count_obs = cumulative_count
 
                 if save:
                     self.to_netCDF(self.obs, self.lat, self.lon, self.start_date, self.end_date)
@@ -477,6 +456,8 @@ class VIIRS(MAPIES):
         # Check if the user wants qa flags
         if apply_qa:
             self.apply_qa = True
+        else:
+            self.apply_qa = False
         if geospatial_crop is not None:
             self.geospatial_crop = np.array(geospatial_crop)
         for day in self.daily_dict.keys():
@@ -484,20 +465,20 @@
             files = self.daily_dict[day]["files"]
             args = [
                 (
-                    self.grid,
-                    file,
-                    self.obs_var,
-                    self.lat_var,
-                    self.lon_var,
-                    self.time_var,
-                    self.start_date,
-                    self.end_date,
-                    self.apply_qa,
+                    self.grid,
+                    file,
+                    self.obs_var,
+                    self.lat_var,
+                    self.lon_var,
+                    self.time_var,
+                    self.start_date,
+                    self.end_date,
+                    self.apply_qa,
                     self.qualityFlags,
-                    flag,
+                    flag,
                     self.geospatial_crop
-                )
+                )
                 for file in files
             ]
diff --git a/run/run_viirs.py b/run/run_viirs.py
index 04967a55..6f8a9403 100644
--- a/run/run_viirs.py
+++ b/run/run_viirs.py
@@ -15,19 +15,16 @@ if __name__ == "__main__":
 
     indir="/home/cmeikle/Projects/data/VIIRS"
 
-
-
-
     start_time = time.time()
     c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="rotated")
 
     c.read_nc()
-    c.process_data(monthly_avg=False, batch_size = 50, apply_qa=False, geospatial_crop=[[0, 20], [10, 20]])
+    c.process_data(monthly_avg=False, batch_size = 50, apply_qa=True, save=True)
     # c.yearly_average()
-    c.plot_2D_observations(months=[0], filename="Viirs_qa_not_applied.png", outdir=outdir)
-    c.plot_2D_num_obs(months=[0], filename="Viirs_num_obsqa_not_applied.png", outdir=outdir)
-    # c.process_lazy_data()
+    c.plot_2D_observations(months=[0], outdir=outdir)
+    c.plot_2D_num_obs(months=[0], outdir=outdir)
+    c.process_lazy_data(apply_qa=False)
 
     end_time = time.time()
     elapsed_time = end_time - start_time
-- 
GitLab


From a1fd9a2a5a5a4bc24a6c27d10732903b08447eeb Mon Sep 17 00:00:00 2001
From: Calum Meikle
Date: Tue, 18 Feb 2025 16:34:12 +0100
Subject: [PATCH 2/2] Default apply_qa to True in process_lazy_data

---
 mapies/viirs.py  | 2 +-
 run/run_viirs.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/mapies/viirs.py b/mapies/viirs.py
index 2f42d3b2..5c815461 100644
--- a/mapies/viirs.py
+++ b/mapies/viirs.py
@@ -460,7 +460,7 @@ class VIIRS(MAPIES):
         print("Yearly average computation completed.")
 
     @timeit
-    def process_lazy_data(self, apply_qa=False, geospatial_crop: NDArray | None = None):
+    def process_lazy_data(self, apply_qa=True, geospatial_crop: NDArray | None = None):
         """
         Process the data for the specified time range. It only retrieves a daily dictionary using dask lazy loading.
         """
diff --git a/run/run_viirs.py b/run/run_viirs.py
index c38204c3..24ef6269 100644
--- a/run/run_viirs.py
+++ b/run/run_viirs.py
@@ -25,7 +25,7 @@ if __name__ == "__main__":
     # c.yearly_average()
     c.plot_2D_observations(months=[0], outdir=outdir)
    c.plot_2D_num_obs(months=[0], outdir=outdir)
-    c.process_lazy_data(apply_qa=False)
+    c.process_lazy_data(apply_qa=True)
 
     end_time = time.time()
-- 
GitLab
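
For reviewers, a minimal, self-contained sketch of the ordering these two patches
establish in process_single_file: quality assurance now runs on the full arrays
first, and the time domain selection afterwards masks obs, lat, lon and the time
values together. The helper body below mirrors the patched time_domain_selection
in mapies/util/func_tools.py, including its (obs, lat, lon, ...) argument order
versus (obs, lon, lat, ...) return order; the sample arrays, dates, and printed
count are invented for illustration only.

    import numpy as np
    import pandas as pd

    def time_domain_selection(obs, lat, lon, time_values, start_date, end_date):
        # Build one set of in-window indices and apply it to every array, as the
        # patched helper does. Note the lon/lat swap between the argument order
        # and the return order: callers unpack as
        # obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, ...)
        time_mask = np.where((time_values >= start_date) & (time_values <= end_date))[0]
        return obs[time_mask], lon[time_mask], lat[time_mask], time_values[time_mask]

    # Invented sample data: ten hourly observations.
    n = 10
    time_values = pd.to_datetime("2025-02-18") + pd.to_timedelta(np.arange(n), unit="h")
    obs = np.random.rand(n)
    lat = np.linspace(-5.0, 5.0, n)
    lon = np.linspace(0.0, 20.0, n)

    # quality_assurance(...) would run here first, on the unmasked arrays, so the
    # qa flags always line up with the full granule; only then is the time cut made.
    obs, lon, lat, time_values = time_domain_selection(
        obs, lat, lon, time_values,
        pd.Timestamp("2025-02-18 02:00"), pd.Timestamp("2025-02-18 06:00"),
    )
    print(len(obs))  # 5 observations fall inside the window

Because the selection now happens after quality_assurance rather than inside
new_preprocess_vars, the qa function always sees arrays of the full granule shape,
which is the behaviour the first commit message describes.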