diff --git a/hermesv3_bu/sectors/sector_manager.py b/hermesv3_bu/sectors/sector_manager.py index 7bb7d0ec036239170532c18719bccc8931915dab..b726d3229b1de9baf3e42333a15446b68c3298d2 100755 --- a/hermesv3_bu/sectors/sector_manager.py +++ b/hermesv3_bu/sectors/sector_manager.py @@ -4,8 +4,8 @@ import timeit from hermesv3_bu.logger.log import Log from hermesv3_bu.tools.checker import error_exit -SECTOR_LIST = ['traffic', 'traffic_area', 'aviation', 'point_sources', 'recreational_boats', 'shipping_port', - 'residential', 'livestock', 'crop_operations', 'crop_fertilizers', 'agricultural_machinery', 'solvents'] +SECTOR_LIST = ['aviation', 'point_sources', 'recreational_boats', 'shipping_port', 'residential', 'livestock', + 'crop_operations', 'crop_fertilizers', 'agricultural_machinery', 'solvents', 'traffic_area', 'traffic'] class SectorManager(object): diff --git a/hermesv3_bu/sectors/traffic_sector.py b/hermesv3_bu/sectors/traffic_sector.py index 6ff9e65f3cffa61b178a09600fa5ba868a0990ba..1f2959c58f73a84d818689f7a82c4b9328f67091 100755 --- a/hermesv3_bu/sectors/traffic_sector.py +++ b/hermesv3_bu/sectors/traffic_sector.py @@ -13,13 +13,18 @@ import warnings from hermesv3_bu.logger.log import Log from hermesv3_bu.sectors.sector import Sector from hermesv3_bu.io_server.io_netcdf import IoNetcdf +from hermesv3_bu.io_server.io_shapefile import IoShapefile from hermesv3_bu.tools.checker import check_files, error_exit +import gc +from memory_profiler import profile from ctypes import cdll, CDLL cdll.LoadLibrary("libc.so.6") libc = CDLL("libc.so.6") libc.malloc_trim(0) +downcasting = True + MIN_RAIN = 0.254 # After USEPA (2011) RECOVERY_RATIO = 0.0872 # After Amato et al. (2012) FINAL_PROJ = {'init': 'epsg:3035'} # https://epsg.io/3035 ETRS89 / LAEA Europe @@ -44,7 +49,6 @@ class TrafficSector(Sector): geolocalization of each road link, the temporal proxies, the emission factors files and also the information relative to the timesteps. """ - def __init__(self, comm, logger, auxiliary_dir, grid, clip, date_array, source_pollutants, vertical_levels, road_link_path, fleet_compo_path, speed_hourly_path, monthly_profiles_path, weekly_profiles_path, hourly_mean_profiles_path, hourly_weekday_profiles_path, hourly_saturday_profiles_path, @@ -134,6 +138,8 @@ class TrafficSector(Sector): self.expanded = self.expand_road_links() del self.fleet_compo, self.speed_hourly, self.monthly_profiles, self.weekly_profiles, self.hourly_profiles + libc.malloc_trim(0) + gc.collect() self.do_hot = do_hot self.do_cold = do_cold @@ -144,6 +150,23 @@ class TrafficSector(Sector): self.logger.write_time_log('TrafficSector', '__init__', timeit.default_timer() - spent_time) + @staticmethod + def downcast(df): + """ + + :param df: + :type df: DataFrame + :return: + """ + for col_name in df.columns: + if str(df[col_name].dtype) == 'float64': + df[col_name] = pd.to_numeric(df[col_name], downcast='float') + elif str(df[col_name].dtype) == 'int64': + df[[col_name]] = df[[col_name]].astype(np.int32) + sys.stdout.flush() + libc.malloc_trim(0) + gc.collect() + def check_profiles(self): spent_time = timeit.default_timer() # Checking speed profiles IDs @@ -202,6 +225,10 @@ class TrafficSector(Sector): self.read_hourly_profiles(hourly_saturday_profiles_path), self.read_hourly_profiles(hourly_sunday_profiles_path)]) hourly_profiles.index = hourly_profiles.index.astype(str) + + if downcasting: + self.downcast(hourly_profiles) + return hourly_profiles def read_speciation_map(self, path): @@ -231,7 +258,11 @@ class TrafficSector(Sector): """ spent_time = timeit.default_timer() speciation_map = pd.read_csv(path) + if downcasting: + self.downcast(speciation_map) dataframe = pd.read_csv(path) + if downcasting: + self.downcast(dataframe) # input_pollutants = list(self.source_pollutants) input_pollutants = ['nmvoc' if x == 'voc' else x for x in list(self.source_pollutants)] if 'PMC' in dataframe['dst'].values and all(element in input_pollutants for element in ['pm']): @@ -249,6 +280,7 @@ class TrafficSector(Sector): # dataframe['PM10'] = 'pm10' # if 'pm' in self.source_pollutants and 'PM25' in speciation_map[['dst']].values: # dataframe['PM25'] = 'pm25' + self.logger.write_time_log('TrafficSector', 'read_speciation_map', timeit.default_timer() - spent_time) return dataframe @@ -300,9 +332,12 @@ class TrafficSector(Sector): spent_time = timeit.default_timer() df = pd.read_csv(path, sep=',', dtype=np.float32) - df['P_speed'] = df['P_speed'].astype(int) + df['P_speed'] = df['P_speed'].astype(np.int32) df.set_index('P_speed', inplace=True) + if downcasting: + self.downcast(df) + self.logger.write_time_log('TrafficSector', 'read_speed_hourly', timeit.default_timer() - spent_time) return df @@ -311,6 +346,10 @@ class TrafficSector(Sector): df = pd.read_csv(path, sep=',') if vehicle_list is not None: df = df.loc[df['Code'].isin(vehicle_list), :] + + if downcasting: + self.downcast(df) + self.logger.write_time_log('TrafficSector', 'read_fleet_compo', timeit.default_timer() - spent_time) return df @@ -344,20 +383,30 @@ class TrafficSector(Sector): if self.comm.Get_rank() == 0: df = gpd.read_file(path) + df['sp_wd'] = df['sp_wd'].astype(np.int16) + df['sp_we'] = df['sp_we'].astype(np.int16) + df['sp_hour_su'] = df['sp_hour_su'].astype(np.int16) + df['sp_hour_mo'] = df['sp_hour_mo'].astype(np.int16) + df['sp_hour_tu'] = df['sp_hour_tu'].astype(np.int16) + df['sp_hour_we'] = df['sp_hour_we'].astype(np.int16) + df['sp_hour_th'] = df['sp_hour_th'].astype(np.int16) + df['sp_hour_fr'] = df['sp_hour_fr'].astype(np.int16) + df['sp_hour_sa'] = df['sp_hour_sa'].astype(np.int16) + + if downcasting: + self.downcast(df) try: df.drop(columns=['Adminis', 'CCAA', 'NETWORK_ID', 'Province', 'Road_name', 'aadt_m_sat', 'aadt_m_sun', 'aadt_m_wd', 'Source'], inplace=True) except KeyError as e: error_exit(str(e).replace('axis', 'the road links shapefile')) libc.malloc_trim(0) - # df.to_file('~/temp/road_links.shp') df = gpd.sjoin(df, self.clip.shapefile.to_crs(df.crs), how="inner", op='intersects') - # df.to_file('~/temp/road_links_selected.shp') df.drop(columns=['index_right'], inplace=True) libc.malloc_trim(0) # Filtering road links to CONSiderate. - df['CONS'] = df['CONS'].astype(np.int16) + df['CONS'] = df['CONS'].astype(np.int8) df = df[df['CONS'] != 0] df = df[df['aadt'] > 0] @@ -434,6 +483,9 @@ class TrafficSector(Sector): ef_path = os.path.join(self.ef_common_path, '{0}_{1}.csv'.format(emission_type, pollutant_name)) df = self.read_profiles(ef_path) + if downcasting: + self.downcast(df) + # Pollutants different to NH3 if pollutant_name != 'nh3': try: @@ -476,6 +528,10 @@ class TrafficSector(Sector): # Specific case for cold NH3 emission factors that needs the hot emission factors and the cold ones. if emission_type == 'cold': df_hot = self.read_ef('hot', pollutant_name) + + if downcasting: + self.downcast(df_hot) + df_hot.columns = [x + '_hot' for x in df_hot.columns.values] df = df.merge(df_hot, left_on=['Code', 'Mode'], right_on=['Code_hot', 'Mode_hot'], @@ -503,6 +559,10 @@ class TrafficSector(Sector): warnings.warn('No mileage correction applied to {0}'.format(pollutant_name)) df = None + if downcasting: + if df is not None: + self.downcast(df) + self.logger.write_time_log('TrafficSector', 'read_ef', timeit.default_timer() - spent_time) return df @@ -575,6 +635,9 @@ class TrafficSector(Sector): error_exit(str(e).replace('axis', 'the road links shapefile')) libc.malloc_trim(0) + if downcasting: + self.downcast(df) + self.logger.write_time_log('TrafficSector', 'update_fleet_value', timeit.default_timer() - spent_time) return df @@ -608,7 +671,7 @@ class TrafficSector(Sector): # Flat profile x['P_speed'].replace([0, np.nan], 1, inplace=True) - x['P_speed'] = x['P_speed'].astype(int) + x['P_speed'] = x['P_speed'].astype(np.int32) return x[['speed_mean', 'P_speed']] @@ -665,6 +728,8 @@ class TrafficSector(Sector): df['f_{0}'.format(i_t)] = get_temporal_factor( df[['month', 'weekday', 'hour', 'aadt_m_mn', 'aadt_week', 'aadt_h_mn', 'aadt_h_wd', 'aadt_h_sat', 'aadt_h_sun']]) + if downcasting: + self.downcast(df) try: df.drop(columns=['month', 'weekday', 'hour', 'P_speed', 'speed_mean', 'sp_wd', 'sp_we', 'sp_hour_mo', @@ -685,6 +750,8 @@ class TrafficSector(Sector): # Expands each road link by any vehicle type that the selected road link has. df_list = [] road_link_aux = self.road_links.copy().reset_index() + if downcasting: + self.downcast(road_link_aux) road_link_aux.drop(columns='geometry', inplace=True) libc.malloc_trim(0) @@ -697,6 +764,8 @@ class TrafficSector(Sector): df = pd.concat(df_list, ignore_index=True) + # df = IoShapefile(self.comm).balance(df) + df.set_index(['Link_ID', 'Fleet_Code'], inplace=True) libc.malloc_trim(0) @@ -728,6 +797,8 @@ class TrafficSector(Sector): spent_time = timeit.default_timer() expanded_aux = self.expanded.copy().reset_index() + if downcasting: + self.downcast(expanded_aux) for pollutant in self.source_pollutants: if pollutant != 'nh3': @@ -744,16 +815,22 @@ class TrafficSector(Sector): df_code = expanded_aux.merge(ef_code, left_on=['Fleet_Code'], right_on=['Code'], how='inner') del ef_code_slope_road, ef_code_slope, ef_code_road, ef_code + libc.malloc_trim(0) + gc.collect() expanded_aux = pd.concat([df_code_slope_road, df_code_slope, df_code_road, df_code]) expanded_aux.drop(columns=['Code', 'Road.Slope', 'Mode'], inplace=True) + libc.malloc_trim(0) + gc.collect() else: ef_code_road = self.read_ef('hot', pollutant) expanded_aux = expanded_aux.merge(ef_code_road, left_on=['Fleet_Code', 'Road_type'], right_on=['Code', 'Mode'], how='inner') expanded_aux.drop(columns=['Code', 'Mode'], inplace=True) + libc.malloc_trim(0) + gc.collect() # Warnings and Errors original_ef_profile = np.unique(self.expanded.index.get_level_values('Fleet_Code')) @@ -772,6 +849,8 @@ class TrafficSector(Sector): if m_corr is not None: expanded_aux = expanded_aux.merge(m_corr, left_on='Fleet_Code', right_on='Code', how='left') expanded_aux.drop(columns=['Code'], inplace=True) + libc.malloc_trim(0) + gc.collect() for tstep in range(len(self.date_array)): ef_name = 'ef_{0}_{1}'.format(pollutant, tstep) @@ -818,19 +897,31 @@ class TrafficSector(Sector): expanded_aux['Fleet_value'] * expanded_aux[ef_name] * expanded_aux['Mcorr'] * \ expanded_aux['f_{0}'.format(tstep)] expanded_aux.drop(columns=[ef_name, 'Mcorr'], inplace=True) + libc.malloc_trim(0) + gc.collect() if pollutant != 'nh3': expanded_aux.drop(columns=['v_aux', 'Min.Speed', 'Max.Speed', 'Alpha', 'Beta', 'Gamma', 'Delta', 'Epsilon', 'Zita', 'Hta', 'RF', 'Q', 'PF', 'T'], inplace=True) + libc.malloc_trim(0) + gc.collect() else: expanded_aux.drop(columns=['a', 'Cmileage', 'b', 'EFbase', 'TF'], inplace=True) + libc.malloc_trim(0) + gc.collect() if m_corr is not None: expanded_aux.drop(columns=['A_urban', 'B_urban', 'A_road', 'B_road', 'M'], inplace=True) + libc.malloc_trim(0) + gc.collect() expanded_aux.drop(columns=['road_grad'], inplace=True) expanded_aux.drop(columns=['f_{0}'.format(x) for x in range(len(self.date_array))], inplace=True) libc.malloc_trim(0) + gc.collect() + + if downcasting: + self.downcast(expanded_aux) self.logger.write_time_log('TrafficSector', 'calculate_hot', timeit.default_timer() - spent_time) @@ -839,11 +930,11 @@ class TrafficSector(Sector): def calculate_cold(self, hot_expanded): spent_time = timeit.default_timer() - cold_links = self.road_links.copy().reset_index() + cold_links = self.road_links.reset_index() cold_links.drop(columns=['aadt', 'PcHeavy', 'PcMoto', 'PcMoped', 'sp_wd', 'sp_we', 'sp_hour_su', 'sp_hour_mo', 'sp_hour_tu', 'sp_hour_we', 'sp_hour_th', 'sp_hour_fr', 'sp_hour_sa', 'Road_type', 'aadt_m_mn', 'aadt_h_mn', 'aadt_h_wd', 'aadt_h_sat', 'aadt_h_sun', 'aadt_week', - 'fleet_comp', 'road_grad', 'PcLight', 'start_date'], inplace=True) + 'fleet_comp', 'road_grad', 'PcLight', 'start_date', 'FID'], inplace=True) libc.malloc_trim(0) cold_links['centroid'] = cold_links['geometry'].centroid @@ -853,6 +944,8 @@ class TrafficSector(Sector): temperature = IoNetcdf(self.comm).get_hourly_data_from_netcdf( link_lons.min(), link_lons.max(), link_lats.min(), link_lats.max(), self.temp_common_path, 'tas', self.date_array) + if downcasting: + self.downcast(temperature) temperature.rename(columns={x: 't_{0}'.format(x) for x in range(len(self.date_array))}, inplace=True) # From Kelvin to Celsius degrees temperature[['t_{0}'.format(x) for x in range(len(self.date_array))]] = \ @@ -861,7 +954,7 @@ class TrafficSector(Sector): unary_union = temperature.unary_union cold_links['REC'] = cold_links.apply(self.nearest, geom_union=unary_union, df1=cold_links, df2=temperature, geom1_col='centroid', src_column='REC', axis=1) - + temperature.drop(columns=['geometry'], inplace=True) cold_links.drop(columns=['geometry', 'centroid', 'geometry'], inplace=True) libc.malloc_trim(0) @@ -869,12 +962,14 @@ class TrafficSector(Sector): cold_links.drop(columns=['REC'], inplace=True) libc.malloc_trim(0) - c_expanded = hot_expanded.merge(cold_links, left_on='Link_ID', right_on='Link_ID', how='left') + if downcasting: + self.downcast(cold_links) df_list = [] for pollutant in self.source_pollutants: ef_cold = self.read_ef('cold', pollutant) + ef_cold.rename(columns={'Code': 'Fleet_Code', 'Mode': 'Road_type'}, inplace=True) if pollutant != 'nh3': ef_cold.loc[ef_cold['Tmin'].isnull(), 'Tmin'] = -999 @@ -882,48 +977,50 @@ class TrafficSector(Sector): ef_cold.loc[ef_cold['Min.Speed'].isnull(), 'Min.Speed'] = -999 ef_cold.loc[ef_cold['Max.Speed'].isnull(), 'Max.Speed'] = 999 - c_expanded_p = c_expanded.merge(ef_cold, left_on=['Fleet_Code', 'Road_type'], - right_on=['Code', 'Mode'], how='inner') - cold_exp_p_aux = c_expanded_p.copy() - - cold_exp_p_aux.drop(columns=['Road_type', 'Fleet_value', 'Code'], inplace=True) - libc.malloc_trim(0) - for tstep in range(len(self.date_array)): v_column = 'v_{0}'.format(tstep) p_column = '{0}_{1}'.format(pollutant, tstep) t_column = 't_{0}'.format(tstep) + + c_expanded_p = hot_expanded[['Link_ID', 'Fleet_Code', 'Road_type', v_column, p_column]].merge( + cold_links[['Link_ID', t_column]], on='Link_ID', how='left') + c_expanded_p = c_expanded_p.merge(ef_cold, on=['Fleet_Code', 'Road_type'], how='inner') + c_expanded_p.drop(columns=['Road_type'], inplace=True) + libc.malloc_trim(0) + if pollutant != 'nh3': - cold_exp_p_aux = cold_exp_p_aux.loc[cold_exp_p_aux[t_column] >= cold_exp_p_aux['Tmin'], :] - cold_exp_p_aux = cold_exp_p_aux.loc[cold_exp_p_aux[t_column] < cold_exp_p_aux['Tmax'], :] - cold_exp_p_aux = cold_exp_p_aux.loc[cold_exp_p_aux[v_column] >= cold_exp_p_aux['Min.Speed'], :] - cold_exp_p_aux = cold_exp_p_aux.loc[cold_exp_p_aux[v_column] < cold_exp_p_aux['Max.Speed'], :] + c_expanded_p = c_expanded_p.loc[c_expanded_p[t_column] >= c_expanded_p['Tmin'], :] + c_expanded_p = c_expanded_p.loc[c_expanded_p[t_column] < c_expanded_p['Tmax'], :] + c_expanded_p = c_expanded_p.loc[c_expanded_p[v_column] >= c_expanded_p['Min.Speed'], :] + c_expanded_p = c_expanded_p.loc[c_expanded_p[v_column] < c_expanded_p['Max.Speed'], :] # Beta - cold_exp_p_aux['Beta'] = \ - (0.6474 - (0.02545 * cold_exp_p_aux['ltrip']) - (0.00974 - (0.000385 * cold_exp_p_aux['ltrip'])) * - cold_exp_p_aux[t_column]) * cold_exp_p_aux['bc'] + c_expanded_p['Beta'] = \ + (0.6474 - (0.02545 * c_expanded_p['ltrip']) - (0.00974 - (0.000385 * c_expanded_p['ltrip'])) * + c_expanded_p[t_column]) * c_expanded_p['bc'] if pollutant != 'nh3': - cold_exp_p_aux['cold_hot'] = \ - cold_exp_p_aux['A'] * cold_exp_p_aux[v_column] + cold_exp_p_aux['B'] * \ - cold_exp_p_aux[t_column] + cold_exp_p_aux['C'] + c_expanded_p['cold_hot'] = \ + c_expanded_p['A'] * c_expanded_p[v_column] + c_expanded_p['B'] * \ + c_expanded_p[t_column] + c_expanded_p['C'] else: - cold_exp_p_aux['cold_hot'] = \ - ((cold_exp_p_aux['a'] * cold_exp_p_aux['Cmileage'] + cold_exp_p_aux['b']) * - cold_exp_p_aux['EFbase'] * cold_exp_p_aux['TF']) / \ - ((cold_exp_p_aux['a_hot'] * cold_exp_p_aux['Cmileage'] + cold_exp_p_aux['b_hot']) * - cold_exp_p_aux['EFbase_hot'] * cold_exp_p_aux['TF_hot']) - cold_exp_p_aux.loc[cold_exp_p_aux['cold_hot'] < 1, 'cold_hot'] = 1 + c_expanded_p['cold_hot'] = \ + ((c_expanded_p['a'] * c_expanded_p['Cmileage'] + c_expanded_p['b']) * + c_expanded_p['EFbase'] * c_expanded_p['TF']) / \ + ((c_expanded_p['a_hot'] * c_expanded_p['Cmileage'] + c_expanded_p['b_hot']) * + c_expanded_p['EFbase_hot'] * c_expanded_p['TF_hot']) + c_expanded_p.loc[c_expanded_p['cold_hot'] < 1, 'cold_hot'] = 1 # Formula Cold emissions - cold_exp_p_aux[p_column] = \ - cold_exp_p_aux[p_column] * cold_exp_p_aux['Beta'] * (cold_exp_p_aux['cold_hot'] - 1) - df_list.append((cold_exp_p_aux[['Link_ID', 'Fleet_Code', p_column]]).set_index( - ['Link_ID', 'Fleet_Code'])) + c_expanded_p[p_column] = \ + c_expanded_p[p_column] * c_expanded_p['Beta'] * (c_expanded_p['cold_hot'] - 1) + c_expanded_p = (c_expanded_p[['Link_ID', 'Fleet_Code', p_column]]).set_index( + ['Link_ID', 'Fleet_Code']) + self.downcast(c_expanded_p) + df_list.append(c_expanded_p) try: - cold_df = pd.concat(df_list, axis=1, ).reset_index() + cold_df = pd.concat(df_list, axis=1).reset_index() except Exception: error_fleet_code = [] for df in df_list: @@ -956,6 +1053,9 @@ class TrafficSector(Sector): cold_df = self.speciate_traffic(cold_df, self.hot_cold_speciation) libc.malloc_trim(0) + + if downcasting: + self.downcast(cold_df) self.logger.write_time_log('TrafficSector', 'calculate_cold', timeit.default_timer() - spent_time) return cold_df @@ -1157,65 +1257,89 @@ class TrafficSector(Sector): df.rename(columns={p_name: p_name_new}, inplace=True) pollutants_renamed.append(p_name_new) - df_aux = df[['Link_ID', 'Fleet_Code'] + pollutants_renamed].copy() + df_aux = df[['Link_ID', 'Fleet_Code'] + pollutants_renamed] df_aux['tstep'] = tstep df_list.append(df_aux) df.drop(columns=pollutants_renamed, inplace=True) + del df_aux + libc.malloc_trim(0) + gc.collect() df = pd.concat(df_list, ignore_index=True) self.logger.write_time_log('TrafficSector', 'transform_df', timeit.default_timer() - spent_time) return df + @profile def speciate_traffic(self, df, speciation): spent_time = timeit.default_timer() # Reads speciation profile speciation = self.read_profiles(speciation) + speciation.rename(columns={'Code': 'Fleet_Code'}, inplace=True) speciation.drop(columns=['Copert_V_name'], inplace=True) + if downcasting: + self.downcast(speciation) + self.downcast(df) # Transform dataset into timestep rows instead of timestep columns df = self.transform_df(df) + if self.comm.Get_rank() == 0: + print('AKIIIIIIIII') + print(df.columns) + print(df.head()) + print(speciation.columns) + print(speciation.head()) in_list = list(df.columns.values) in_columns = ['Link_ID', 'Fleet_Code', 'tstep'] for in_col in in_columns: in_list.remove(in_col) + libc.malloc_trim(0) + gc.collect() df_out_list = [] # PMC if not set(speciation.columns.values).isdisjoint(pmc_list): out_p = set(speciation.columns.values).intersection(pmc_list).pop() - speciation_by_in_p = speciation[[out_p] + ['Code']].copy() + # speciation_by_in_p = speciation[[out_p, 'Code']] - speciation_by_in_p.rename(columns={out_p: 'f_{0}'.format(out_p)}, inplace=True) - df_aux = df[['pm10', 'pm25', 'Fleet_Code', 'tstep', 'Link_ID']] - df_aux = df_aux.merge(speciation_by_in_p, left_on='Fleet_Code', right_on='Code', how='left') - df_aux.drop(columns=['Code'], inplace=True) + # speciation_by_in_p.rename(columns={out_p: 'f_{0}'.format(out_p)}, inplace=True) + # df_aux = df[['pm10', 'pm25', 'Fleet_Code', 'tstep', 'Link_ID']] + df_aux = df[['pm10', 'pm25', 'Fleet_Code', 'tstep', 'Link_ID']].rename( + columns={out_p: 'f_{0}'.format(out_p)}).merge( + speciation[[out_p, 'Fleet_Code']], on='Fleet_Code', how='left') + libc.malloc_trim(0) + gc.collect() df_aux[out_p] = df_aux['pm10'] - df_aux['pm25'] - df_out_list.append(df_aux[[out_p] + ['tstep', 'Link_ID']].groupby(['tstep', 'Link_ID']).sum()) - + df_out_list.append(df_aux[[out_p, 'tstep', 'Link_ID']].groupby(['tstep', 'Link_ID']).sum()) + del df_aux + libc.malloc_trim(0) + gc.collect() + print('2 ->', df.columns) for in_p in in_list: + print('\t{0}'.format(in_p)) involved_out_pollutants = [key for key, value in self.speciation_map.items() if value == in_p] # Selecting only necessary speciation profiles - speciation_by_in_p = speciation[involved_out_pollutants + ['Code']].copy() + speciation_by_in_p = speciation[involved_out_pollutants + ['Fleet_Code']] # Adding "f_" in the formula column names for p in involved_out_pollutants: speciation_by_in_p.rename(columns={p: 'f_{0}'.format(p)}, inplace=True) # Getting a slice of the full dataset to be merged - df_aux = df[[in_p] + ['Fleet_Code', 'tstep', 'Link_ID']] - df_aux = df_aux.merge(speciation_by_in_p, left_on='Fleet_Code', right_on='Code', how='left') - df_aux.drop(columns=['Code'], inplace=True) + df_aux = df[[in_p, 'Fleet_Code', 'tstep', 'Link_ID']].merge(speciation_by_in_p, on='Fleet_Code', how='left') + libc.malloc_trim(0) + gc.collect() # Renaming pollutant columns by adding "old_" to the beginning. df_aux.rename(columns={in_p: 'old_{0}'.format(in_p)}, inplace=True) for p in involved_out_pollutants: + print('\t\t{0}'.format(p)) if in_p is not np.nan: if in_p != 0: df_aux[p] = df_aux['old_{0}'.format(in_p)].multiply(df_aux['f_{0}'.format(p)]) @@ -1231,13 +1355,21 @@ class TrafficSector(Sector): else: df_aux.loc[:, p] = 0 - - df_out_list.append(df_aux[[p] + ['tstep', 'Link_ID']].groupby(['tstep', 'Link_ID']).sum()) + if downcasting: + self.downcast(df_aux) + df_out_list.append(df_aux[[p, 'tstep', 'Link_ID']].groupby(['tstep', 'Link_ID']).sum()) + libc.malloc_trim(0) + gc.collect() del df_aux df.drop(columns=[in_p], inplace=True) + libc.malloc_trim(0) + gc.collect() df_out = pd.concat(df_out_list, axis=1) - + libc.malloc_trim(0) + gc.collect() + if downcasting: + self.downcast(df_out) self.logger.write_time_log('TrafficSector', 'speciate_traffic', timeit.default_timer() - spent_time) return df_out @@ -1261,36 +1393,43 @@ class TrafficSector(Sector): self.logger.write_log('\t\tCalculating Hot emissions.', message_level=2) hot_emis = self.calculate_hot() - if self.do_hot: - self.logger.write_log('\t\tCompacting Hot emissions.', message_level=2) - df_accum = pd.concat([df_accum, self.compact_hot_expanded(hot_emis.copy())]).groupby( - ['tstep', 'Link_ID']).sum() - libc.malloc_trim(0) - if self.do_cold: - self.logger.write_log('\t\tCalculating Cold emissions.', message_level=2) - df_accum = pd.concat([df_accum, self.calculate_cold(hot_emis)]).groupby( - ['tstep', 'Link_ID']).sum() - libc.malloc_trim(0) - if self.do_hot or self.do_cold: + if self.do_hot: + self.logger.write_log('\t\tCompacting Hot emissions.', message_level=2) + hot_emis_aux = self.compact_hot_expanded(hot_emis.copy()) + df_accum = pd.concat([df_accum, hot_emis_aux]).groupby( + ['tstep', 'Link_ID']).sum() + libc.malloc_trim(0) + + if self.do_cold: + self.logger.write_log('\t\tCalculating Cold emissions.', message_level=2) + cold_emis = self.calculate_cold(hot_emis) + df_accum = pd.concat([df_accum, cold_emis]).groupby( + ['tstep', 'Link_ID']).sum() + libc.malloc_trim(0) + del hot_emis libc.malloc_trim(0) if self.do_tyre_wear: self.logger.write_log('\t\tCalculating Tyre wear emissions.', message_level=2) - df_accum = pd.concat([df_accum, self.calculate_tyre_wear()], sort=False).groupby(['tstep', 'Link_ID']).sum() + tyre_emis = self.calculate_tyre_wear() + df_accum = pd.concat([df_accum, tyre_emis], sort=False).groupby(['tstep', 'Link_ID']).sum() libc.malloc_trim(0) if self.do_brake_wear: self.logger.write_log('\t\tCalculating Brake wear emissions.', message_level=2) - df_accum = pd.concat([df_accum, self.calculate_brake_wear()], sort=False).groupby( + brake_emis = self.calculate_brake_wear() + df_accum = pd.concat([df_accum, brake_emis], sort=False).groupby( ['tstep', 'Link_ID']).sum() libc.malloc_trim(0) if self.do_road_wear: self.logger.write_log('\t\tCalculating Road wear emissions.', message_level=2) - df_accum = pd.concat([df_accum, self.calculate_road_wear()], sort=False).groupby(['tstep', 'Link_ID']).sum() + road_emis = self.calculate_road_wear() + df_accum = pd.concat([df_accum, road_emis], sort=False).groupby(['tstep', 'Link_ID']).sum() libc.malloc_trim(0) if self.do_resuspension: self.logger.write_log('\t\tCalculating Resuspension emissions.', message_level=2) - df_accum = pd.concat([df_accum, self.calculate_resuspension()], sort=False).groupby( + resuspension_emis = self.calculate_resuspension() + df_accum = pd.concat([df_accum, resuspension_emis], sort=False).groupby( ['tstep', 'Link_ID']).sum() libc.malloc_trim(0) df_accum = df_accum.reset_index().merge(self.road_links.reset_index().loc[:, ['Link_ID', 'geometry']],