diff --git a/mapies/tropomi.py b/mapies/tropomi.py
index a8ee92076182e1b7d3b631a3d129a31da25e1143..59f2d93d63a5c436b15f7c1ba155c664261f1cd7 100644
--- a/mapies/tropomi.py
+++ b/mapies/tropomi.py
@@ -40,12 +40,11 @@ def process_single_file(
     try:
         ds = xr.open_dataset(file, engine="h5netcdf", group="PRODUCT")
 
-        obs, lat, lon, time_values, time_values_index = new_preprocess_vars(ds, obs_var, lat_var, lon_var, time_var, start_date, end_date)
-        # obs = ds[obs_var].values.flatten()
-        # lat = ds[lat_var].values.flatten()
+        obs, lat, lon, time_values = new_preprocess_vars(ds, obs_var, lat_var, lon_var, time_var, start_date, end_date)
 
         if apply_qa:
             obs, lat, lon, time_values = quality_assurance(ds, obs, lat, lon, time_values, qualityFlags)
+        obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, start_date, end_date)
 
         if geospatial_crop is not None:
             obs, lat, lon, time_values = spatial_domain_selection(obs, lat, lon, time_values, geospatial_crop)
@@ -212,7 +211,6 @@ class TROPOMI(MAPIES):
                     final_lon, final_lat, final_obs, cumulative_count = self.process_monthly_data(
                         batch_size=batch_size, month=month
                     )
-                    # final_lon, final_lat, final_obs, cumulative_count = self.process_files_in_blocks(month, block_size=300, batch_size=batch_size)
 
                     # Update the monthly dictionary
                     valid_obs = final_obs > 0
diff --git a/mapies/util/func_tools.py b/mapies/util/func_tools.py
index 15356703284b96ba1a81fd08115eb8870f8e4341..c9d5098109cda6cda6b415d98e4a870c04c92368 100644
--- a/mapies/util/func_tools.py
+++ b/mapies/util/func_tools.py
@@ -179,6 +179,9 @@ def spatial_domain_selection(obs, lat, lon, time, spatial_bounds):
 
 # Time Domain Selection function based off of xarray Will this work with dask too, a question for later
 def time_domain_selection(
+    obs,
+    lat,
+    lon,
     time_values,
     start_date,
     end_date,
@@ -213,11 +216,14 @@ def time_domain_selection(
         else:
             return (array >= start_date) & (array <= end_date)
 
-    time_values_index = np.where(is_between_times(time_values))[0] # np.where returns tuple
-
-    time_values = time_values[time_values_index]
+    time_mask = np.where(is_between_times(time_values))[0]
+
+    time_values = time_values[time_mask]
+    lon = lon[time_mask]
+    lat = lat[time_mask]
+    obs = obs[time_mask]
 
-    return time_values, time_values_index
+    return obs, lon, lat, time_values
 
 
 def inflate_array(var_array, shape1, shape2):
@@ -309,56 +315,31 @@ def new_preprocess_vars(ds, obs_var, lat_var, lon_var, time_var, start_date, end
 
     time_values = pd.to_datetime(time_values)
 
-    # Time domain selection
-    # print(f'Time values before selection: {time_values}')
-    time_values, time_values_index = time_domain_selection(time_values, start_date, end_date)
-    # print(f'Time values after selection: {time_values}')
 
     obs_dims = ds[obs_var].dims
-
     obs_shape = ds[obs_var].shape
     obs_attrs = ds[obs_var].attrs
 
     # Duplicate values in the time array to get the same shape as the flattened variable array
     if time_shape != obs_shape:
         time_values = inflate_array(time_values, time_shape, obs_shape)
-        time_values_index = inflate_array(time_values_index, time_shape, obs_shape)
-        time_values, time_values_index = time_domain_selection(time_values, start_date, end_date)
-
-
+
     # Lon and lat values
     lon_dims = ds[lon_var].dims
     lon_shape = ds[lon_var].shape
     lon_attrs = ds[lon_var].attrs
     lon_values = ds[lon_var].values.flatten()
 
-    # Reindex lon column values
-    lon_values = reindex(
-        time_values_index,
-        lon_values,
-    )
 
     lat_dims = ds[lat_var].dims
     lat_shape = ds[lat_var].shape
     lat_attrs = ds[lat_var].attrs
     lat_values = ds[lat_var].values.flatten()
 
-    # Reindex aod column values
-    lat_values = reindex(
-        time_values_index,
-        lat_values,
-    )
-
    # AOD column values, default "Aerosol_Optical_Thickness_550_Land_Ocean_Best_Estimate"
     obs = ds[obs_var].values.flatten()
 
-    # Reindex aod column values
-    obs = reindex(
-        time_values_index,
-        obs,
-    )
-
-    return obs, lat_values, lon_values, time_values, time_values_index
+    return obs, lat_values, lon_values, time_values
 
 
 def reindex(
diff --git a/mapies/viirs.py b/mapies/viirs.py
index b686bdaa952860817488e2323a14f67e9ce20419..5c8154611f10141e4e0ccd552d163759addc3e0a 100644
--- a/mapies/viirs.py
+++ b/mapies/viirs.py
@@ -61,11 +61,14 @@ def process_single_file(
     try:
         ds = xr.open_dataset(file, engine="h5netcdf")
 
-        obs, lat, lon, time_values, time_values_index = new_preprocess_vars(ds, obs_var, lat_var, lon_var, time_var, start_date, end_date)
-        # obs = ds[obs_var].values.flatten()
-        # lat = ds[lat_var].values.flatten()
+        obs, lat, lon, time_values = new_preprocess_vars(ds, obs_var, lat_var, lon_var, time_var, start_date, end_date)
+
         if apply_qa:
             obs, lat, lon, time_values = quality_assurance(ds, obs, lat, lon, time_values, qualityFlags)
+
+        # Run a time domain selection
+        obs, lon, lat, time_values = time_domain_selection(obs, lat, lon, time_values, start_date, end_date)
+
         if geospatial_crop is not None:
             obs, lat, lon, time_values = spatial_domain_selection(obs, lat, lon, time_values, geospatial_crop)
 
@@ -80,26 +83,13 @@ def process_single_file(
         else:
             ds.close()
             return obs, lat, lon, time_values
-    
+
     except Exception as e:
-        print(f"Error processing file in process single file {file}: {e}")
-        return None
-
-
-def filter_file(args):
-    root, filename, start_dt, end_dt = args
-    try:
-        parts = filename.split('.')
-        date = parts[1][1:]
-        hhmm = parts[2]
-        file_dt = datetime.strptime(date + hhmm, "%Y%j%H%M")
+        print(f"Error processing file {file}: {e}")
 
-        if start_dt <= file_dt <= end_dt:
-            return os.path.join(root, filename), file_dt.month
-    except Exception as e:
-        print(f"Error processing file {filename}: {e}")
         return None
 
+
 class VIIRS(MAPIES):
     """
     Class for VIIRS specific data
@@ -226,8 +216,6 @@ class VIIRS(MAPIES):
 
         for day, data in self.daily_dict.items():
             print(f"Day {day:02d}: {len(data['files'])} files")
-            # for filepath in data['files']:
-            #     print(f"  {filepath}")
 
 
 
@@ -265,7 +253,7 @@ class VIIRS(MAPIES):
             try:
                 # Process the data for the current month
                 final_lon, final_lat, final_obs, cumulative_count = self.process_monthly_data(
-                    batch_size=batch_size, month=month, apply_qa=apply_qa
+                    batch_size=batch_size, month=month
                 )
 
                 # Update the monthly dictionary
@@ -289,7 +277,7 @@ class VIIRS(MAPIES):
                 print(f"Error processing data for month {month}: {e}")
                 continue
         else:
-            try: 
+            try:
                 final_lon, final_lat, final_obs, cumulative_count = self.process_monthly_data(
                     batch_size=batch_size, month=None
                 )
@@ -299,11 +287,6 @@ class VIIRS(MAPIES):
                 self.lat = final_lat[valid_obs]
                 self.obs = final_obs[valid_obs]
                 self.count_obs = cumulative_count[valid_obs]
-
-                # self.lon_values = final_lon
-                # self.lat_values = final_lat
-                # self.obs = final_obs
-                # self.count_obs = cumulative_count
 
                 if save:
                     try:
@@ -477,7 +460,7 @@ class VIIRS(MAPIES):
         print("Yearly average computation completed.")
 
     @timeit
-    def process_lazy_data(self, apply_qa=False, geospatial_crop: NDArray | None = None):
+    def process_lazy_data(self, apply_qa=True, geospatial_crop: NDArray | None = None):
        """
        Process the data for the specified time range. It only retrieves a daily dictionary using dask lazy loading.
        """
@@ -494,20 +477,20 @@ class VIIRS(MAPIES):
 
             files = self.daily_dict[day]["files"]
             args = [
                 (
-                    self.grid, 
-                    file, 
-                    self.obs_var, 
-                    self.lat_var, 
-                    self.lon_var, 
-                    self.time_var, 
-                    self.start_date, 
-                    self.end_date, 
-                    self.apply_qa, 
+                    self.grid,
+                    file,
+                    self.obs_var,
+                    self.lat_var,
+                    self.lon_var,
+                    self.time_var,
+                    self.start_date,
+                    self.end_date,
+                    self.apply_qa,
                     self.qualityFlags,
-                    flag, 
+                    flag,
                     self.geospatial_crop
-                )
+                )
                 for file in files
             ]
 
diff --git a/run/run_viirs.py b/run/run_viirs.py
index b81ead6e9c60deac9430e21335f14cf1589a450c..24ef6269416e1a3649bfc34c8e3371b13c2d4548 100644
--- a/run/run_viirs.py
+++ b/run/run_viirs.py
@@ -11,12 +11,8 @@
 
 if __name__ == "__main__":
 
-    # outdir="/home/cmeikle/Projects/data/"
-    # indir="/home/cmeikle/Projects/data/VIIRS"
-
-    outdir="/home/cgile/Documents/mapies/figures"
-    indir="/home/cgile/Documents/mapies/VIIRS"
-
+    outdir="/home/cmeikle/Projects/data/"
+    indir="/home/cmeikle/Projects/data/VIIRS"
 
 
     start_time = time.time()
@@ -24,11 +20,13 @@ if __name__ == "__main__":
     c = VIIRS(start_date, end_date, dest=outdir, indir=indir, grid_repr="rotated")
 
     c.read_nc()
-    # c.process_data(monthly_avg=False, batch_size = 50, save=True, apply_qa=False)
+
+    c.process_data(monthly_avg=False, batch_size = 50, apply_qa=True, save=True)
     # c.yearly_average()
-    # c.plot_2D_observations(months=[0], filename="Viirs_qa_not_applied.png", outdir=outdir)
-    # c.plot_2D_num_obs(months=[0], filename="Viirs_num_obsqa_not_applied.png", outdir=outdir)
-    c.process_lazy_data(apply_qa=False, geospatial_crop=None)
+    c.plot_2D_observations(months=[0], outdir=outdir)
+    c.plot_2D_num_obs(months=[0], outdir=outdir)
+    c.process_lazy_data(apply_qa=True)
+
 
     end_time = time.time()
    elapsed_time = end_time - start_time