Newer
Older
}
# Replace sub_array with its 'data_array' element.
sub_array <- sub_array$data_array
# Subset with second round of indices
# Only the dimensions whose entry in second_round_indices is not NULL are
# cropped; if every entry is NULL the array is left as it is.
dims_to_crop <- which(!sapply(second_round_indices, is.null))
if (length(dims_to_crop) > 0) {
dimnames_to_crop <- names(second_round_indices)[dims_to_crop]
# NOTE(review): Subset() is defined outside this chunk; presumably it takes
# the array, the names of the dimensions to crop and the matching list of
# indices -- confirm against its definition.
sub_array <- Subset(sub_array, dimnames_to_crop,
second_round_indices[dimnames_to_crop])
}
# Debug dump of the cropped array. It is printed only when every value in the
# first six entries of store_indices equals 1.
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER SUBSETTING WITH 2nd ROUND INDICES:")
print(str(sub_array))
}
}
}
metadata <- attr(sub_array, 'variables')
Nicolau Manubens
committed
names_bk <- names(store_indices)
3021
3022
3023
3024
3025
3026
3027
3028
3029
3030
3031
3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
# Rebuild the store indices, one entry per name in store_indices:
#  - names absent from first_round_indices keep their current indices;
#  - names with no second-round selection span the whole extent of that
#    dimension in sub_array;
#  - names with a second-round selection are numbered 1..length(selection).
# The result of lapply() over a character vector carries no names.
store_indices <- lapply(names(store_indices), function(dim_name) {
  if (!(dim_name %in% names(first_round_indices))) {
    return(store_indices[[dim_name]])
  }
  inner_selection <- second_round_indices[[dim_name]]
  if (is.null(inner_selection)) {
    return(1:dim(sub_array)[dim_name])
  }
  ## TODO: Review carefully this line. Inner indices are all
  ##       aligned to the left-most positions. If dataset A has longitudes
  ##       1, 2, 3, 4 but dataset B has only longitudes 3 and 4, then
  ##       they will be stored as follows:
  ##       1, 2, 3, 4
  ##       3, 4, NA, NA
  ## Alternative considered for numeric selections: x - min(x) + 1
  ## Numeric and non-numeric selections are currently handled the same way.
  1:length(inner_selection)
})
Nicolau Manubens
committed
names(store_indices) <- names_bk
print("-> STRUCTURE OF FIRST ROUND INDICES FOR THIS WORK PIECE:")
print(str(first_round_indices))
print("-> STRUCTURE OF SECOND ROUND INDICES FOR THIS WORK PIECE:")
print(str(second_round_indices))
print("-> STRUCTURE OF STORE INDICES FOR THIS WORK PIECE:")
print(str(store_indices))
}
}
Nicolau Manubens
committed
store_indices <- lapply(store_indices, as.integer)
Nicolau Manubens
committed
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
3096
3097
3098
3099
3100
3101
3102
3103
3104
3105
# Split the storage work of the loaded subset into parts, cutting along the
# largest dimension of the subset.
sub_dims <- dim(sub_array)
largest_dim_name <- names(sub_dims)[which.max(sub_dims)]
max_parts <- length(store_indices[[largest_dim_name]])
n_sub_elements <- prod(sub_dims)
# Memory considerations:
#  - Indexing a data file of N MB with expand.grid takes 30 * N MB.
#  - The peak RAM of Start is, at minimum, 2 * total data to load from all
#    files, due to inefficiencies in other regions of the code.
# The more parts the indexing below is split into, the lower its memory
# footprint and the faster it runs. However, more than 10 indexing
# iterations (parts) per MB processed makes the iteration slower (tested
# empirically on BSC workstations).
# Subset size assumes 8 bytes per element.
subset_size_in_mb <- n_sub_elements * 8 / 1024 / 1024
best_n_parts <- ceiling(subset_size_in_mb * 10)
# n_parts must be greater than the value at which the memory footprint of
# the subset indexing code below equals 2 * total data to load from all
# files. With
#   s = subset size in MB
#   p = number of parts to break it in
#   T = total size of data to load
# the break-even point is s / p * 30 = 2 * T, hence p = s * 15 / T.
min_n_parts <- ceiling(n_sub_elements * 15 / prod(store_dims))
# Aim well above that minimum (ten times), but never above the empirical
# optimum computed above.
n_parts <- 10 * min_n_parts
if (best_n_parts < n_parts) {
  n_parts <- best_n_parts
}
# Boundary checks: keep n_parts at or above 1 and at or below max_parts.
n_parts <- if (n_parts < 1) 1 else n_parts
n_parts <- if (max_parts < n_parts) max_parts else n_parts
if (n_parts > 1) {
# Partition the positions 1..length into n consecutive groups.
#
# length: number of positions to partition.
# n:      number of groups; cut() requires at least 2 intervals.
#
# Returns an unnamed list of n integer vectors; element k holds the
# positions that cut() assigned to the k-th equal-width interval.
make_parts <- function(length, n) {
  bin_ids <- cut(1:length, breaks = n, labels = FALSE)
  chunks <- vector("list", n)
  for (k in 1:n) {
    chunks[[k]] <- which(bin_ids == k)
  }
  chunks
}
part_indices <- make_parts(max_parts, n_parts)
parts <- lapply(part_indices,
function(x) {
store_indices[[largest_dim_name]][x]
})
} else {
part_indices <- list(1:max_parts)
parts <- store_indices[largest_dim_name]
}
Nicolau Manubens
committed
# do the storage work
# weights[i] is the product of the first i - 1 entries of store_dims (1 for
# i = 1): the step in the flattened vector for a unit step along dimension i.
weights <- sapply(1:length(store_dims),
function(i) prod(c(1, store_dims)[1:i]))
# One subscript per dimension of sub_array; TRUE keeps the whole dimension.
part_indices_in_sub_array <- as.list(rep(TRUE, length(dim(sub_array))))
names(part_indices_in_sub_array) <- names(dim(sub_array))
# NOTE(review): shared_matrix_pointer is defined outside this chunk;
# presumably a descriptor of the shared big.matrix that receives the data --
# confirm.
data_array <- bigmemory::attach.big.matrix(shared_matrix_pointer)
# Store one part at a time so that the index table built by expand.grid only
# covers the current part.
for (i in 1:n_parts) {
# Restrict the target indices along the largest dimension to this part.
store_indices[[largest_dim_name]] <- parts[[i]]
# Converting array indices to vector indices
matrix_indices <- do.call("expand.grid", store_indices)
# Given a matrix where each row is a set of array indices of an element
# the vector indices are computed
matrix_indices <- 1 + colSums(t(matrix_indices - 1) * weights)
# Take the matching positions of sub_array along the largest dimension (all
# positions along the other dimensions) and write them to the computed
# vector positions of the shared matrix.
part_indices_in_sub_array[[largest_dim_name]] <- part_indices[[i]]
data_array[matrix_indices] <- as.vector(do.call('[',
c(list(x = sub_array),
part_indices_in_sub_array)))
}
# Drop the local handle to the attached matrix and trigger garbage collection.
rm(data_array)
gc()
# Write the metadata to disk as an RDS file, but only when the work piece
# carries a 'save_metadata_in' entry; that entry is used as the file path.
if (!is.null(work_piece[['save_metadata_in']])) {
saveRDS(metadata, file = work_piece[['save_metadata_in']])
}
}
# Emit the work piece's 'progress_amount' entry as a message, without a
# trailing newline, unless it is absent or silent is set.
if (!is.null(work_piece[['progress_amount']]) && !silent) {
message(work_piece[['progress_amount']], appendLF = FALSE)
}
# Last expression before the closing brace: TRUE when sub_array is NULL,
# FALSE otherwise.
# NOTE(review): presumably this is the value returned by the enclosing
# function, whose header is outside this chunk -- confirm.
is.null(sub_array)
}