Newer
Older
if (!silent) {
.message("Loading... This may take several minutes...")
if (progress_message != '') {
.message(progress_message, appendLF = FALSE)
}
}
if (num_procs == 1) {
found_files <- lapply(work_pieces, .LoadDataFile,
shared_matrix_pointer = shared_matrix_pointer,
file_data_reader = file_data_reader,
transform = transform,
transform_params = transform_params,
silent = silent, debug = debug)
} else {
cluster <- makeCluster(num_procs, outfile = "")
# Send the heavy work to the workers
work_errors <- try({
found_files <- clusterApplyLB(cluster, work_pieces, .LoadDataFile,
shared_matrix_pointer = shared_matrix_pointer,
file_data_reader = file_data_reader,
transform = transform,
transform_params = transform_params,
silent = silent, debug = debug)
})
stopCluster(cluster)
}
if (!silent) {
if (progress_message != '') {
.message("\n", tag = '')
}
}
#print("P")
Nicolau Manubens
committed
data_array <- array(bigmemory::as.matrix(data_array), dim = final_dims_fake)
# Load metadata and remove the metadata folder
if (!is.null(metadata_dims)) {
loaded_metadata_files <- list.files(metadata_folder)
loaded_metadata <- lapply(paste0(metadata_folder, '/', loaded_metadata_files), readRDS)
unlink(metadata_folder, recursive = TRUE)
return_metadata <- vector('list', length = prod(dim(array_of_metadata_flags)[metadata_dims]))
return_metadata[as.numeric(loaded_metadata_files)] <- loaded_metadata
dim(return_metadata) <- dim(array_of_metadata_flags[metadata_dims])
attr(data_array, 'Variables') <- return_metadata
# TODO: Try to infer data type from loaded_metadata
# as.integer(data_array)
}
3052
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
failed_pieces <- work_pieces[which(unlist(found_files))]
for (failed_piece in failed_pieces) {
array_of_not_found_files <- do.call('[<-',
c(list(array_of_not_found_files),
as.list(failed_piece[['file_indices_in_array_of_files']]),
list(value = TRUE)))
}
if (any(array_of_not_found_files)) {
for (i in 1:prod(dim(array_of_files_to_load))) {
if (is.na(array_of_not_found_files[i])) {
array_of_files_to_load[i] <- NA
} else {
if (array_of_not_found_files[i]) {
array_of_not_found_files[i] <- array_of_files_to_load[i]
array_of_files_to_load[i] <- NA
} else {
array_of_not_found_files[i] <- NA
}
}
}
} else {
array_of_not_found_files <- NULL
}
# Replace the vars and common vars by the transformed vars and common vars
for (i in 1:length(dat)) {
if (length(names(transformed_vars[[i]])) > 0) {
picked_vars[[i]][names(transformed_vars[[i]])] <- transformed_vars[[i]]
} else if (length(names(picked_vars_ordered[[i]])) > 0) {
picked_vars[[i]][names(picked_vars_ordered[[i]])] <- picked_vars_ordered[[i]]
}
}
if (length(names(transformed_common_vars)) > 0) {
picked_common_vars[names(transformed_common_vars)] <- transformed_common_vars
} else if (length(names(picked_common_vars_ordered)) > 0) {
picked_common_vars[names(picked_common_vars_ordered)] <- picked_common_vars_ordered
}
if (debug) {
print("-> THE TRANSFORMED VARS:")
print(str(transformed_vars))
print("-> THE PICKED VARS:")
print(str(picked_vars))
}
file_selectors <- NULL
for (i in 1:length(dat)) {
file_selectors[[dat[[i]][['name']]]] <- dat[[i]][['selectors']][which(names(dat[[i]][['selectors']]) %in% found_file_dims[[i]])]
}
if (retrieve) {
if (!silent) {
.message("Successfully retrieved data.")
}
var_backup <- attr(data_array, 'Variables')[[1]]
attr(data_array, 'Variables') <- NULL
attributes(data_array) <- c(attributes(data_array),
list(Variables = c(list(common = c(picked_common_vars, var_backup)),
picked_vars),
Files = array_of_files_to_load,
NotFoundFiles = array_of_not_found_files,
FileSelectors = file_selectors,
PatternDim = found_pattern_dim)
attr(data_array, 'class') <- c('startR_array', attr(data_array, 'class'))
} else {
if (!silent) {
.message("Successfully discovered data dimensions.")
}
start_call <- match.call()
start_call[[i]] <- eval.parent(start_call[[i]])
start_call[['retrieve']] <- TRUE
attributes(start_call) <- c(attributes(start_call),
Nicolau Manubens
committed
list(Dimensions = final_dims_fake,
Variables = c(list(common = picked_common_vars), picked_vars),
ExpectedFiles = array_of_files_to_load,
PatternDim = found_pattern_dim,
MergedDims = if (merge_across_dims) {
inner_dims_across_files
} else {
NULL
},
SplitDims = if (split_multiselected_dims) {
all_split_dims
} else {
NULL
})
attr(start_call, 'class') <- c('startR_cube', attr(start_call, 'class'))
}
# This function is the responsible for loading the data of each work
# piece.
.LoadDataFile <- function(work_piece, shared_matrix_pointer,
file_data_reader, synonims,
transform, transform_params,
silent = FALSE, debug = FALSE) {
# suppressPackageStartupMessages({library(bigmemory)})
### TODO: Specify dependencies as parameter
# suppressPackageStartupMessages({library(ncdf4)})
#print("1")
store_indices <- as.list(work_piece[['store_position']])
first_round_indices <- work_piece[['first_round_indices']]
second_round_indices <- work_piece[['second_round_indices']]
#print("2")
file_to_open <- work_piece[['file_path']]
sub_array <- file_data_reader(file_to_open, NULL,
work_piece[['file_selectors']],
first_round_indices, synonims)
3170
3171
3172
3173
3174
3175
3176
3177
3178
3179
3180
3181
3182
3183
3184
3185
3186
3187
3188
3189
3190
3191
3192
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> LOADING A WORK PIECE")
print("-> STRUCTURE OF READ UNTRANSFORMED DATA:")
print(str(sub_array))
print("-> STRUCTURE OF VARIABLES TO TRANSFORM:")
print(str(work_piece[['vars_to_transform']]))
print("-> COMMON ARRAY DIMENSIONS:")
print(str(work_piece[['store_dims']]))
}
}
if (!is.null(sub_array)) {
# Apply data transformation once we have the data arrays.
if (!is.null(transform)) {
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> PROCEEDING TO TRANSFORM ARRAY")
print("-> DIMENSIONS OF ARRAY RIGHT BEFORE TRANSFORMING:")
print(dim(sub_array))
}
}
sub_array <- do.call(transform, c(list(data_array = sub_array,
variables = work_piece[['vars_to_transform']],
file_selectors = work_piece[['file_selectors']]),
3194
3195
3196
3197
3198
3199
3200
3201
3202
3203
3204
3205
3206
3207
3208
3209
3210
3211
3212
3213
3214
3215
3216
3217
transform_params))
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER TRANSFORMING:")
print(str(sub_array))
print("-> DIMENSIONS OF ARRAY RIGHT AFTER TRANSFORMING:")
print(dim(sub_array$data_array))
}
}
sub_array <- sub_array$data_array
# Subset with second round of indices
dims_to_crop <- which(!sapply(second_round_indices, is.null))
if (length(dims_to_crop) > 0) {
dimnames_to_crop <- names(second_round_indices)[dims_to_crop]
sub_array <- Subset(sub_array, dimnames_to_crop,
second_round_indices[dimnames_to_crop])
}
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER SUBSETTING WITH 2nd ROUND INDICES:")
print(str(sub_array))
}
}
}
metadata <- attr(sub_array, 'variables')
Nicolau Manubens
committed
names_bk <- names(store_indices)
3222
3223
3224
3225
3226
3227
3228
3229
3230
3231
3232
3233
3234
3235
3236
3237
3238
3239
3240
3241
3242
store_indices <- lapply(names(store_indices),
function (x) {
if (!(x %in% names(first_round_indices))) {
store_indices[[x]]
} else if (is.null(second_round_indices[[x]])) {
1:dim(sub_array)[x]
} else {
if (is.numeric(second_round_indices[[x]])) {
## TODO: Review carefully this line. Inner indices are all
## aligned to the left-most positions. If dataset A has longitudes
## 1, 2, 3, 4 but dataset B has only longitudes 3 and 4, then
## they will be stored as follows:
## 1, 2, 3, 4
## 3, 4, NA, NA
##x - min(x) + 1
1:length(second_round_indices[[x]])
} else {
1:length(second_round_indices[[x]])
}
}
})
Nicolau Manubens
committed
names(store_indices) <- names_bk
print("-> STRUCTURE OF FIRST ROUND INDICES FOR THIS WORK PIECE:")
print(str(first_round_indices))
print("-> STRUCTURE OF SECOND ROUND INDICES FOR THIS WORK PIECE:")
print(str(second_round_indices))
print("-> STRUCTURE OF STORE INDICES FOR THIS WORK PIECE:")
print(str(store_indices))
}
}
Nicolau Manubens
committed
store_indices <- lapply(store_indices, as.integer)
Nicolau Manubens
committed
3258
3259
3260
3261
3262
3263
3264
3265
3266
3267
3268
3269
3270
3271
3272
3273
3274
3275
3276
3277
3278
3279
3280
3281
3282
3283
3284
3285
3286
3287
3288
3289
3290
3291
3292
3293
3294
3295
3296
3297
3298
3299
3300
3301
3302
3303
3304
3305
3306
# split the storage work of the loaded subset in parts
largest_dim_name <- names(dim(sub_array))[which.max(dim(sub_array))]
max_parts <- length(store_indices[[largest_dim_name]])
# Indexing a data file of N MB with expand.grid takes 30*N MB
# The peak ram of Start is, minimum, 2 * total data to load from all files
# due to inefficiencies in other regions of the code
# The more parts we split the indexing done below in, the lower
# the memory footprint of the indexing and the fast.
# But more than 10 indexing iterations (parts) for each MB processed
# makes the iteration slower (tested empirically on BSC workstations).
subset_size_in_mb <- prod(dim(sub_array)) * 8 / 1024 / 1024
best_n_parts <- ceiling(subset_size_in_mb * 10)
# We want to set n_parts to a greater value than the one that would
# result in a memory footprint (of the subset indexing code below) equal
# to 2 * total data to load from all files.
# s = subset size in MB
# p = number of parts to break it in
# T = total size of data to load
# then, s / p * 30 = 2 * T
# then, p = s * 15 / T
min_n_parts <- ceiling(prod(dim(sub_array)) * 15 / prod(store_dims))
# Make sure we pick n_parts much greater than the minimum calculated
n_parts <- min_n_parts * 10
if (n_parts > best_n_parts) {
n_parts <- best_n_parts
}
# Boundary checks
if (n_parts < 1) {
n_parts <- 1
}
if (n_parts > max_parts) {
n_parts <- max_parts
}
if (n_parts > 1) {
make_parts <- function(length, n) {
clusters <- cut(1:length, n, labels = FALSE)
lapply(1:n, function(y) which(clusters == y))
}
part_indices <- make_parts(max_parts, n_parts)
parts <- lapply(part_indices,
function(x) {
store_indices[[largest_dim_name]][x]
})
} else {
part_indices <- list(1:max_parts)
parts <- store_indices[largest_dim_name]
}
Nicolau Manubens
committed
# do the storage work
weights <- sapply(1:length(store_dims),
function(i) prod(c(1, store_dims)[1:i]))
part_indices_in_sub_array <- as.list(rep(TRUE, length(dim(sub_array))))
names(part_indices_in_sub_array) <- names(dim(sub_array))
data_array <- bigmemory::attach.big.matrix(shared_matrix_pointer)
for (i in 1:n_parts) {
store_indices[[largest_dim_name]] <- parts[[i]]
# Converting array indices to vector indices
matrix_indices <- do.call("expand.grid", store_indices)
# Given a matrix where each row is a set of array indices of an element
# the vector indices are computed
matrix_indices <- 1 + colSums(t(matrix_indices - 1) * weights)
part_indices_in_sub_array[[largest_dim_name]] <- part_indices[[i]]
data_array[matrix_indices] <- as.vector(do.call('[',
c(list(x = sub_array),
part_indices_in_sub_array)))
}
rm(data_array)
gc()
if (!is.null(work_piece[['save_metadata_in']])) {
saveRDS(metadata, file = work_piece[['save_metadata_in']])
}
}
if (!is.null(work_piece[['progress_amount']]) && !silent) {
message(work_piece[['progress_amount']], appendLF = FALSE)
}
is.null(sub_array)
}