# Start.R (excerpt)
                    if (any(na.omit(unlist(sub_array_of_selectors)) < 1) ||
                        any(na.omit(unlist(sub_array_of_selectors)) > data_dims[inner_dim] * chunk_amount)) {
                      stop("Provided indices out of range for dimension '", inner_dim, "' ", 
                           "for dataset '", dat[[i]][['name']], "' (accepted range: 1 to ", 
                           data_dims[inner_dim] * chunk_amount, ").")
                    }
                  } else {
                    if (inner_dim %in% names(dim(sub_array_of_values))) {
                      inner_dim_pos_in_sub_array <- which(names(dim(sub_array_of_values)) == inner_dim)
                      if (inner_dim_pos_in_sub_array != 1) {
                        new_sub_array_order <- (1:length(dim(sub_array_of_values)))[-inner_dim_pos_in_sub_array]
                        new_sub_array_order <- c(inner_dim_pos_in_sub_array, new_sub_array_order)
                        sub_array_of_values <- .aperm2(sub_array_of_values, new_sub_array_order)
                      }
                    }
                  }
                  inner_dim_pos_in_sub_array <- which(names(dim(sub_array_of_selectors)) == inner_dim)
                  if (inner_dim_pos_in_sub_array != 1) {
                    new_sub_array_order <- (1:length(dim(sub_array_of_selectors)))[-inner_dim_pos_in_sub_array]
                    new_sub_array_order <- c(inner_dim_pos_in_sub_array, new_sub_array_order)
                    sub_array_of_selectors <- .aperm2(sub_array_of_selectors, new_sub_array_order)
                  }
                  sub_array_of_indices <- selector_checker(sub_array_of_selectors, sub_array_of_values,
                                                           tolerance = tolerance_params[[inner_dim]])
                  sub_array_of_indices <- sub_array_of_indices[chunk_indices(length(sub_array_of_indices),
                                                                             chunks[[inner_dim]]['chunk'],
                                                                             chunks[[inner_dim]]['n_chunks'],
                                                                             inner_dim)]
                  sub_array_is_list <- FALSE
                  if (is.list(sub_array_of_indices)) {
                    sub_array_is_list <- TRUE
                    sub_array_of_indices <- unlist(sub_array_of_indices)
                  }
                  if (is.null(var_with_selectors_name)) {
                    indices_chunk <- floor((sub_array_of_indices - 1) / data_dims[inner_dim]) + 1
                    transformed_indices <- ((sub_array_of_indices - 1) %% data_dims[inner_dim]) + 1
                  } else {
                    indices_chunk <- floor((sub_array_of_indices - 1) / var_full_dims[inner_dim]) + 1
                    transformed_indices <- ((sub_array_of_indices - 1) %% var_full_dims[inner_dim]) + 1
                  }
                  if (sub_array_is_list) {
                    sub_array_of_indices <- as.list(sub_array_of_indices)
                  }
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> GOING TO ITERATE ALONG CHUNKS.")
}
}
                  for (chunk in 1:chunk_amount) {
                    if (!is.null(names(selector_store_position))) {
                      selector_store_position[file_dim] <- chunk
                    } else {
                      selector_store_position <- chunk
                    }
                    chunk_selectors <- transformed_indices[which(indices_chunk == chunk)]
                    sub_array_of_indices <- chunk_selectors
                    if (with_transform) {
                      # If the provided selectors are expressed in the world
                      # before transformation
                      if (!aiat) {
                        first_index <- min(unlist(sub_array_of_indices))
                        last_index <- max(unlist(sub_array_of_indices))
                        sub_array_of_fri <- max(c(first_index - beta, 1)):min(c(last_index + beta, n))
                        sub_array_of_sri <- transform_indices(unlist(sub_array_of_indices) - first_index + 1, n, m)
                        if (is.list(sub_array_of_indices)) {
                          if (length(sub_array_of_sri) > 1) {
                            sub_array_of_sri <- sub_array_of_sri[[1]]:sub_array_of_sri[[2]]
                          }
                        }
##TODO: TRANSFORM SUBSET VARIABLE AS ABOVE, TO COMPUTE SRI
                      # If the selectors are expressed after transformation
                      } else {
                        first_index <- min(unlist(sub_array_of_indices))
                        last_index <- max(unlist(sub_array_of_indices))
                        first_index_before_transform <- max(transform_indices(first_index, m, n) - beta, 1)
                        last_index_before_transform <- min(transform_indices(last_index, m, n) + beta, n)
                        sub_array_of_fri <- first_index_before_transform:last_index_before_transform
                        if (is.list(sub_array_of_indices) && (length(sub_array_of_indices) > 1)) {
                          sub_array_of_sri <- 1:(last_index - first_index + 1) + 
                                              round(beta / n * m) 
                        } else {
                          sub_array_of_sri <- sub_array_of_indices - first_index + 1 +
                                              round(beta / n * m)
                        }
##TODO: FILL IN TVI
                      }
                      sri <- do.call('[[<-', c(list(x = sri), as.list(selector_store_position),
                                               list(value = sub_array_of_sri)))
                      if (length(sub_array_of_sri) > 0) {
                        taken_chunks[chunk] <- TRUE
                      }
                    } else {
                      sub_array_of_fri <- sub_array_of_indices
                      if (length(sub_array_of_fri) > 0) {
                        taken_chunks[chunk] <- TRUE
                      }
                    }
                    if (!is.null(var_unorder_indices)) {
                      ordered_fri <- sub_array_of_fri
                      sub_array_of_fri <- var_unorder_indices[sub_array_of_fri]
                    }
                    fri <- do.call('[[<-', c(list(x = fri), as.list(selector_store_position),
                                             list(value = sub_array_of_fri)))
                  }
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> FINISHED ITERATING ALONG CHUNKS")
}
}
                } else {
                  stop("Provided array of indices for dimension '", inner_dim, "', ",
                       "which goes across the file dimension '", file_dim, "', but ",
                       "the provided array does not have the dimension '", inner_dim, 
                       "', which is mandatory.")
                }
              }
            }
          }
if (debug) {
if (inner_dim %in% dims_to_check) {
print("-> PROCEEDING TO CROP VARIABLES")
}
}
          #if ((length(selector_array) == 1) && (selector_array %in% c('all', 'first', 'last'))) {
            #if (!is.null(var_with_selectors_name) || (is.null(var_with_selectors_name) && is.character(selector_array) &&
            #    (length(selector_array) == 1) && (selector_array %in% c('all', 'first', 'last')))) {
              empty_chunks <- which(!taken_chunks)
              if (length(empty_chunks) >= length(taken_chunks)) {
                stop("Selectors do not match any of the possible values for the dimension '", inner_dim, "'.")
              }
              if (length(empty_chunks) > 0) {
#                # Get the first group of chunks to remove, and remove them. 
#                # E.g., from c(1, 2, 4, 5, 6, 8, 9) remove only 1 and 2
#                dist <- abs(rev(empty_chunks) - c(rev(empty_chunks)[1] - 1, head(rev(empty_chunks), length(rev(empty_chunks)) - 1)))
#                if (all(dist == 1)) {
#                  start_chunks_to_remove <- NULL
#                } else {
#                  first_chunk_to_remove <- tail(which(dist > 1), 1)
#                  start_chunks_to_remove <- rev(rev(empty_chunks)[first_chunk_to_remove:length(empty_chunks)])
#                }
#                # Get the last group of chunks to remove, and remove them. 
#                # E.g., from c(1, 2, 4, 5, 6, 8, 9) remove only 8 and 9
#                dist <- abs(empty_chunks - c(empty_chunks[1] - 1, head(empty_chunks, length(empty_chunks) - 1)))
#                if (all(dist == 1)) {
#                  first_chunk_to_remove <- 1
#                } else {
#                  first_chunk_to_remove <- tail(which(dist > 1), 1)
#                }
#                end_chunks_to_remove <- empty_chunks[first_chunk_to_remove:length(empty_chunks)]
#                chunks_to_keep <- which(!((1:length(taken_chunks)) %in% c(start_chunks_to_remove, end_chunks_to_remove)))
                chunks_to_keep <- which(taken_chunks)
                dims_to_crop[[file_dim]] <- c(dims_to_crop[[file_dim]], list(chunks_to_keep))
#                found_indices <- Subset(found_indices, file_dim, chunks_to_keep)
#                # Crop dataset variables file dims.
#                for (picked_var in names(picked_vars[[i]])) {
#                  if (file_dim %in% names(dim(picked_vars[[i]][[picked_var]]))) {
#                    picked_vars[[i]][[picked_var]] <- Subset(picked_vars[[i]][[picked_var]], file_dim, chunks_to_keep)
#                  }
#                }
              }
            #}
            dat[[i]][['selectors']][[inner_dim]] <- list(fri = fri, sri = sri)
            # Crop dataset variables inner dims.
            # Crop common variables inner dims.
            types_of_var_to_crop <- 'picked'
            if (with_transform) {
              types_of_var_to_crop <- c(types_of_var_to_crop, 'transformed')
            }
            if (!is.null(dim_reorder_params[[inner_dim]])) {
              types_of_var_to_crop <- c(types_of_var_to_crop, 'reordered')
            }
            for (type_of_var_to_crop in types_of_var_to_crop) {
              if (type_of_var_to_crop == 'transformed') {
                if (is.null(tvi)) {
                  if (!is.null(dim_reorder_params[[inner_dim]])) {
                    crop_indices <- unique(unlist(ordered_sri))
                  } else {
                    crop_indices <- unique(unlist(sri))
                  }
                } else {
                  crop_indices <- unique(unlist(tvi))
                }
                vars_to_crop <- transformed_vars[[i]]
                common_vars_to_crop <- transformed_common_vars
              } else if (type_of_var_to_crop == 'reordered') {
                crop_indices <- unique(unlist(ordered_fri))
                vars_to_crop <- picked_vars_ordered[[i]]
                common_vars_to_crop <- picked_common_vars_ordered
              } else {
                crop_indices <- unique(unlist(fri))
                vars_to_crop <- picked_vars[[i]]
                common_vars_to_crop <- picked_common_vars
              }
              for (var_to_crop in names(vars_to_crop)) {
                if (inner_dim %in% names(dim(vars_to_crop[[var_to_crop]]))) {
                  if (!is.null(crop_indices)) {
                    # Crop the variable along inner_dim with the selected indices.
                    # For transformed variables whose selectors were given before
                    # the transformation (!aiat), the crop is taken from
                    # transformed_subset_var; in every other case the variable
                    # being iterated is cropped in place.
                    if (type_of_var_to_crop == 'transformed' && !aiat) {
                      vars_to_crop[[var_to_crop]] <- Subset(transformed_subset_var, inner_dim, crop_indices)
                    } else {
                      vars_to_crop[[var_to_crop]] <- Subset(vars_to_crop[[var_to_crop]], inner_dim, crop_indices)
                    }
                  }
                }
              }
              if (i == length(dat)) {
                for (common_var_to_crop in names(common_vars_to_crop)) {
                  if (inner_dim %in% names(dim(common_vars_to_crop[[common_var_to_crop]]))) {
                    common_vars_to_crop[[common_var_to_crop]] <- Subset(common_vars_to_crop[[common_var_to_crop]], inner_dim, crop_indices)
                  }
                }
              }
              if (type_of_var_to_crop == 'transformed') {
                if (!is.null(vars_to_crop)) {
                  transformed_vars[[i]] <- vars_to_crop
                }
                if (i == length(dat)) {
                  transformed_common_vars <- common_vars_to_crop
                }
              } else if (type_of_var_to_crop == 'reordered') {
                if (!is.null(vars_to_crop)) {
                  picked_vars_ordered[[i]] <- vars_to_crop
                }
                if (i == length(dat)) {
                  picked_common_vars_ordered <- common_vars_to_crop
                }
              } else {
                if (!is.null(vars_to_crop)) {
                  picked_vars[[i]] <- vars_to_crop
                }
                if (i == length(dat)) {
                  picked_common_vars <- common_vars_to_crop
                }
              }
            }
          #}
        }
        # After the selectors have been picked (using the original variables), 
        # the variables are transformed. At that point, the original selectors
        # for the transformed variables are also kept in the variable original_selectors.
#print("L")
      }
    }
  }
#  if (!is.null(transformed_common_vars)) {
#    picked_common_vars[names(transformed_common_vars)] <- transformed_common_vars
#  }
  # Remove the trailing chunks, if any.
  for (file_dim in names(dims_to_crop)) {
#    indices_to_keep <- min(sapply(dims_to_crop[[file_dim]], min)):max(sapply(dims_to_crop[[file_dim]], max))
    ## TODO: Merge indices in dims_to_crop with some advanced mechanism?
    indices_to_keep <- unique(unlist(dims_to_crop[[file_dim]]))
    array_of_files_to_load <- Subset(array_of_files_to_load, file_dim, indices_to_keep)
    array_of_not_found_files <- Subset(array_of_not_found_files, file_dim, indices_to_keep)
    for (i in 1:length(dat)) {
      # Crop selectors
      for (selector_dim in names(dat[[i]][['selectors']])) {
        if (selector_dim == file_dim) {
          for (j in 1:length(dat[[i]][['selectors']][[selector_dim]][['fri']])) {
            dat[[i]][['selectors']][[selector_dim]][['fri']][[j]] <- dat[[i]][['selectors']][[selector_dim]][['fri']][[j]][indices_to_keep]
          }
          for (j in 1:length(dat[[i]][['selectors']][[selector_dim]][['sri']])) {
            dat[[i]][['selectors']][[selector_dim]][['sri']][[j]] <- dat[[i]][['selectors']][[selector_dim]][['sri']][[j]][indices_to_keep]
          }
        }
        if (file_dim %in% names(dim(dat[[i]][['selectors']][[selector_dim]][['fri']]))) {
          dat[[i]][['selectors']][[selector_dim]][['fri']] <- Subset(dat[[i]][['selectors']][[selector_dim]][['fri']], file_dim, indices_to_keep)
          dat[[i]][['selectors']][[selector_dim]][['sri']] <- Subset(dat[[i]][['selectors']][[selector_dim]][['sri']], file_dim, indices_to_keep)
        }
      }
      # Crop dataset variables file dims.
      for (picked_var in names(picked_vars[[i]])) {
        if (file_dim %in% names(dim(picked_vars[[i]][[picked_var]]))) {
          picked_vars[[i]][[picked_var]] <- Subset(picked_vars[[i]][[picked_var]], file_dim, indices_to_keep)
        }
      }
      for (transformed_var in names(transformed_vars[[i]])) {
        if (file_dim %in% names(dim(transformed_vars[[i]][[transformed_var]]))) {
          transformed_vars[[i]][[transformed_var]] <- Subset(transformed_vars[[i]][[transformed_var]], file_dim, indices_to_keep)
        }
      }
    }
    # Crop common variables file dims.
    for (picked_common_var in names(picked_common_vars)) {
      if (file_dim %in% names(dim(picked_common_vars[[picked_common_var]]))) {
        picked_common_vars[[picked_common_var]] <- Subset(picked_common_vars[[picked_common_var]], file_dim, indices_to_keep)
      }
    }
    for (transformed_common_var in names(transformed_common_vars)) {
      if (file_dim %in% names(dim(transformed_common_vars[[transformed_common_var]]))) {
        transformed_common_vars[[transformed_common_var]] <- Subset(transformed_common_vars[[transformed_common_var]], file_dim, indices_to_keep)
      }
    }
  }
  # Calculate the size of the final array.
  # For every dataset that has files, work out the length of each expected
  # inner dimension, then combine the per-dataset lengths into
  # total_inner_dims by taking the element-wise maximum.
  total_inner_dims <- NULL
  for (i in 1:length(dat)) {
    if (dataset_has_files[i]) {
      inner_dims <- expected_inner_dims[[i]]
      # Length of inner dimension x for dataset i, in order of preference:
      #  1. the longest 'sri' entry, when at least one 'sri' entry is not NULL;
      #  2. the length of the transformed variable named by var_params[[x]]
      #     (dataset-specific first, then common), when there is one;
      #  3. the longest 'fri' entry otherwise.
      inner_dims <- sapply(inner_dims, 
        function(x) {
          if (!all(sapply(dat[[i]][['selectors']][[x]][['sri']], is.null))) {
            max(sapply(dat[[i]][['selectors']][[x]][['sri']], length))
          } else {
            if (length(var_params[[x]]) > 0) {
              if (var_params[[x]] %in% names(transformed_vars[[i]])) {
                length(transformed_vars[[i]][[var_params[[x]]]])
              } else if (var_params[[x]] %in% names(transformed_common_vars)) {
                length(transformed_common_vars[[var_params[[x]]]])
              } else {
                max(sapply(dat[[i]][['selectors']][[x]][['fri']], length))
              }
            } else {
              max(sapply(dat[[i]][['selectors']][[x]][['fri']], length))
            }
          }
        })
      names(inner_dims) <- expected_inner_dims[[i]]
      if (is.null(total_inner_dims)) {
        # First dataset with files: its sizes are the starting point.
        total_inner_dims <- inner_dims
      } else {
        # NOTE(review): .MergeArrayDims is defined elsewhere; presumably it
        # aligns the two named dimension vectors so they can be compared
        # element by element -- confirm. The larger size of each pair is kept.
        new_dims <- .MergeArrayDims(total_inner_dims, inner_dims)
        total_inner_dims <- pmax(new_dims[[1]], new_dims[[2]])
      }
    }
  }
  new_dims <- .MergeArrayDims(dim(array_of_files_to_load), total_inner_dims)
  final_dims <- pmax(new_dims[[1]], new_dims[[2]])[dim_names]
  if (!silent) {
    .message("Detected dimension sizes:")
    longest_dim_len <- max(sapply(names(final_dims), nchar))
    longest_size_len <- max(sapply(paste0(final_dims, ''), nchar))
    sapply(names(final_dims), 
      function(x) {
        message(paste0("*   ", paste(rep(' ', longest_dim_len - nchar(x)), collapse = ''), 
                       x, ": ", paste(rep(' ', longest_size_len - nchar(paste0(final_dims[x], ''))), collapse = ''), 
                       final_dims[x]))
      })
    bytes <- prod(c(final_dims, 8))
    dim_sizes <- paste(final_dims, collapse = ' x ')
    if (retrieve) {
      .message(paste("Total size of requested data:"))
    } else {
      .message(paste("Total size of involved data:"))
    }
    .message(paste(dim_sizes, " x 8 bytes =", 
                   format(structure(bytes, class = "object_size"), units = "auto")), 
                   indent = 2)
  }
  # The following several lines will only be run if retrieve = TRUE
  if (retrieve) {
  ########## CREATING THE SHARED MATRIX AND DISPATCHING WORK PIECES ###########
  # TODO: try performance of storing all in cols instead of rows
  # Create the shared memory array, and a pointer to it, to be sent
  # to the work pieces.
  data_array <- big.matrix(nrow = prod(final_dims), ncol = 1)
  shared_matrix_pointer <- describe(data_array)
  # No process count supplied by the caller: fall back to availableCores().
  if (is.null(num_procs)) {
    num_procs <- availableCores()
  }
  # Creating a shared tmp folder to store metadata from each chunk
  array_of_metadata_flags <- array(FALSE, dim = dim(array_of_files_to_load))
  if (!is.null(metadata_dims)) {
    metadata_indices_to_load <- as.list(rep(1, length(dim(array_of_files_to_load))))
    names(metadata_indices_to_load) <- names(dim(array_of_files_to_load))
    metadata_indices_to_load[metadata_dims] <- as.list(rep(TRUE, length(metadata_dims)))
    array_of_metadata_flags <- do.call('[<-', c(list(array_of_metadata_flags),  metadata_indices_to_load,
                                                list(value = rep(TRUE, prod(dim(array_of_files_to_load)[metadata_dims])))))
  }
  metadata_file_counter <- 0
  metadata_folder <- tempfile('metadata')
  dir.create(metadata_folder)
  # Build the work pieces, each with:
  # - file path
  # - total size (dims) of store array
  # - start position in store array
  # - file selectors (to provide extra info. useful e.g. to select variable)
  # - indices to take from file
  work_pieces <- list()
  for (i in 1:length(dat)) {
    if (dataset_has_files[i]) {
      selectors <- dat[[i]][['selectors']]
      file_dims <- found_file_dims[[i]]
      inner_dims <- expected_inner_dims[[i]]
      sub_array_dims <- final_dims[file_dims]
      sub_array_dims[found_pattern_dim] <- 1
      # Array with the same shape as this dataset's file dimensions whose
      # cells hold their own linear position (1, 2, ...); it is used below to
      # turn the counter j into per-dimension file indices.
      sub_array_of_files_to_load <- array(seq_len(prod(sub_array_dims)), 
                                          dim = sub_array_dims)
      names(dim(sub_array_of_files_to_load)) <- names(sub_array_dims)
      # Detect which of the dimensions of the dataset go across files.
      file_dim_across_files <- lapply(inner_dims, 
        function(x) {
          dim_across <- sapply(inner_dims_across_files, function(y) x %in% y)
          if (any(dim_across)) {
            names(inner_dims_across_files)[which(dim_across)[1]]
          } else {
            NULL
          }
        })
      names(file_dim_across_files) <- inner_dims
      j <- 1
      while (j <= prod(sub_array_dims)) {
        # Work out file path.
        file_to_load_sub_indices <- which(sub_array_of_files_to_load == j, arr.ind = TRUE)[1, ]
        names(file_to_load_sub_indices) <- names(sub_array_dims)
        file_to_load_sub_indices[found_pattern_dim] <- i
        big_dims <- rep(1, length(dim(array_of_files_to_load)))
        names(big_dims) <- names(dim(array_of_files_to_load))
        file_to_load_indices <- .MergeArrayDims(file_to_load_sub_indices, big_dims)[[1]]
        file_to_load <- do.call('[[', c(list(array_of_files_to_load), 
                                        as.list(file_to_load_indices)))
        not_found_file <- do.call('[[', c(list(array_of_not_found_files),
                                          as.list(file_to_load_indices)))
        load_file_metadata <- do.call('[', c(list(array_of_metadata_flags), 
                                             as.list(file_to_load_indices)))
        if (load_file_metadata) {
          metadata_file_counter <- metadata_file_counter + 1
        }
        if (!is.na(file_to_load) && !not_found_file) {
          # Work out indices to take
          first_round_indices <- lapply(inner_dims, 
            function (x) {
              if (is.null(file_dim_across_files[[x]])) {
                selectors[[x]][['fri']][[1]]
              } else {
                var_file_dims <- dim(selectors[inner_dims][[x]][['fri']])
                which_indices <- file_to_load_sub_indices[which(names(sub_array_dims) %in% names(var_file_dims))] 
                do.call('[[', c(list(selectors[[x]][['fri']]), as.list(which_indices)))
              }
            })
          names(first_round_indices) <- inner_dims
          second_round_indices <- lapply(inner_dims, 
            function (x) {
              if (is.null(file_dim_across_files[[x]])) {
                selectors[[x]][['sri']][[1]]
              } else {
                var_file_dims <- dim(selectors[inner_dims][[x]][['sri']])
                which_indices <- file_to_load_sub_indices[which(names(sub_array_dims) %in% names(var_file_dims))] 
                do.call('[[', c(list(selectors[[x]][['sri']]), as.list(which_indices)))
              }
            })
if (debug) {
print("-> BUILDING A WORK PIECE")
#print(str(selectors))
}
          names(second_round_indices) <- inner_dims
          if (!any(sapply(first_round_indices, length) == 0)) {
            work_piece <- list()
            work_piece[['first_round_indices']] <- first_round_indices
            work_piece[['second_round_indices']] <- second_round_indices
            work_piece[['file_indices_in_array_of_files']] <- file_to_load_indices
            work_piece[['file_path']] <- file_to_load
            work_piece[['store_dims']] <- final_dims
            # Work out store position
            store_position <- final_dims
            store_position[names(file_to_load_indices)] <- file_to_load_indices
            store_position[inner_dims] <- rep(1, length(inner_dims))
            work_piece[['store_position']] <- store_position
            # Work out file selectors
            file_selectors <- sapply(file_dims, 
              function (x) {
                vector_to_pick <- 1
                if (x %in% names(depending_file_dims)) {
                  vector_to_pick <- file_to_load_indices[depending_file_dims[[x]]]
                }
                selectors[file_dims][[x]][[vector_to_pick]][file_to_load_indices[x]]
              })
            names(file_selectors) <- file_dims
            work_piece[['file_selectors']] <- file_selectors
            # Send variables for transformation
            if (!is.null(transform) && (length(transform_vars) > 0)) {
              vars_to_transform <- NULL
              picked_vars_to_transform <- which(names(picked_vars[[i]]) %in% transform_vars)
              if (length(picked_vars_to_transform) > 0) {
                picked_vars_to_transform <- names(picked_vars[[i]])[picked_vars_to_transform]
                vars_to_transform <- c(vars_to_transform, picked_vars[[i]][picked_vars_to_transform])
                if (any(picked_vars_to_transform %in% names(picked_vars_ordered[[i]]))) {
                  picked_vars_ordered_to_transform <- picked_vars_to_transform[which(picked_vars_to_transform %in% names(picked_vars_ordered[[i]]))]
                  vars_to_transform[picked_vars_ordered_to_transform] <- picked_vars_ordered[[i]][picked_vars_ordered_to_transform]
                }
              }
              picked_common_vars_to_transform <- which(names(picked_common_vars) %in% transform_vars)
              if (length(picked_common_vars_to_transform) > 0) {
                picked_common_vars_to_transform <- names(picked_common_vars)[picked_common_vars_to_transform]
                vars_to_transform <- c(vars_to_transform, picked_common_vars[picked_common_vars_to_transform])
                if (any(picked_common_vars_to_transform %in% names(picked_common_vars_ordered))) {
                  picked_common_vars_ordered_to_transform <- picked_common_vars_to_transform[which(picked_common_vars_to_transform %in% names(picked_common_vars_ordered))]
                  vars_to_transform[picked_common_vars_ordered_to_transform] <- picked_common_vars_ordered[picked_common_vars_ordered_to_transform]
                }
              }
              work_piece[['vars_to_transform']] <- vars_to_transform
            }
            # Send flag to load metadata
            if (load_file_metadata) {
              work_piece[['save_metadata_in']] <- paste0(metadata_folder, '/', metadata_file_counter)
            }
            work_pieces <- c(work_pieces, list(work_piece))
          }
        }
        j <- j + 1
      }
    }
  }
#print("N")
if (debug) {
print("-> WORK PIECES BUILT")
}

  # Work out which work pieces will report progress and the percentage each
  # of them adds. Progress is only reported when there are at least two work
  # pieces per process and the call is not silent.
  pieces_per_process <- length(work_pieces) / num_procs
  if (pieces_per_process >= 2 && !silent) {
    # One step per round of pieces across the processes, capped at ten steps.
    reps <- if (pieces_per_process < 10) ceiling(pieces_per_process) else 10
    amount <- 100 / reps
    progress_steps <- rep(amount, reps)
    if (length(work_pieces) <= reps) {
      # Not enough pieces to spread the steps: only the last piece reports,
      # and the first step carries the whole accumulated amount.
      selected_pieces <- length(work_pieces)
      progress_steps <- c(sum(progress_steps), progress_steps)
    } else {
      # Spread the reporting pieces evenly; the first point of the sequence
      # (piece 1) is dropped.
      evenly_spaced <- seq(1, length(work_pieces), length.out = reps + 1)
      selected_pieces <- round(evenly_spaced)[-1]
    }
    progress_steps <- paste0(' + ', round(progress_steps, 2), '%')
    progress_message <- 'Progress: 0%'
  } else {
    progress_message <- ''
    selected_pieces <- NULL
  }
  piece_counter <- 1
  step_counter <- 1
  work_pieces <- lapply(work_pieces, 
    function (x) {
      if (piece_counter %in% selected_pieces) {
        wp <- c(x, list(progress_amount = progress_steps[step_counter]))
        step_counter <<- step_counter + 1
      } else {
        wp <- x
      }
      piece_counter <<- piece_counter + 1
      wp
    })
  if (!silent) {
    .message("If the size of the requested data is close to or above the free shared RAM memory, R may crash.")
    .message("If the size of the requested data is close to or above the half of the free RAM memory, R may crash.")
    .message(paste0("Will now proceed to read and process ", length(work_pieces), " data files:"))
    if (length(work_pieces) < 30) {
      lapply(work_pieces, function (x) .message(x[['file_path']], indent = 2))
    } else {
      .message("The list of files is long. You can check it after Start() finishes in the output '$Files'.", indent = 2, exdent = 5)
    }
  }

  # Build the cluster of processes that will do the work and dispatch work pieces.
  # The function .LoadDataFile is applied to each work piece. This function will
  # open the data file, regrid if needed, subset, apply the mask, 
  # compute and apply the weights if needed,
  # disable extreme values and store in the shared memory matrix.
#print("O")
  if (!silent) {
    .message("Loading... This may take several minutes...")
    if (progress_message != '') {
      .message(progress_message, appendLF = FALSE)
    }
  }
  if (num_procs == 1) {
    # Single process: apply .LoadDataFile to every work piece in this session.
    found_files <- lapply(work_pieces, .LoadDataFile, 
                          shared_matrix_pointer = shared_matrix_pointer,
                          file_data_reader = file_data_reader, 
                          synonims = synonims,
                          transform = transform, 
                          transform_params = transform_params,
                          silent = silent, debug = debug)
  } else {
    cluster <- makeCluster(num_procs, outfile = "")
    # Send the heavy work to the workers. The call is wrapped in try() so the
    # cluster is always stopped, even when loading fails.
    work_errors <- try({
      found_files <- clusterApplyLB(cluster, work_pieces, .LoadDataFile, 
                                    shared_matrix_pointer = shared_matrix_pointer,
                                    file_data_reader = file_data_reader,
                                    synonims = synonims,
                                    transform = transform, 
                                    transform_params = transform_params,
                                    silent = silent, debug = debug)
    })
    stopCluster(cluster)
    # Re-raise the failure once the cluster has been stopped. Without this,
    # found_files is not assigned by this branch and the code that follows
    # would fail later with a message unrelated to the real cause.
    if (inherits(work_errors, 'try-error')) {
      stop("Failed to load the data files in the worker processes: ",
           conditionMessage(attr(work_errors, 'condition')))
    }
  }

  if (!silent) {
    if (progress_message != '') {
      .message("\n", tag = '')
    }
  }
#print("P")
  data_array <- array(bigmemory::as.matrix(data_array), dim = final_dims)
  gc()

  # Load metadata and remove the metadata folder
  if (!is.null(metadata_dims)) {
    loaded_metadata_files <- list.files(metadata_folder)
    loaded_metadata <- lapply(paste0(metadata_folder, '/', loaded_metadata_files), readRDS)
    unlink(metadata_folder, recursive = TRUE)
    return_metadata <- vector('list', length = prod(dim(array_of_metadata_flags)[metadata_dims]))
    return_metadata[as.numeric(loaded_metadata_files)] <- loaded_metadata
    dim(return_metadata) <- dim(array_of_metadata_flags[metadata_dims])
    attr(data_array, 'variables') <- return_metadata
    # TODO: Try to infer data type from loaded_metadata
    # as.integer(data_array) 
  }
  failed_pieces <- work_pieces[which(unlist(found_files))]
  for (failed_piece in failed_pieces) {
    array_of_not_found_files <- do.call('[<-', 
      c(list(array_of_not_found_files), 
        as.list(failed_piece[['file_indices_in_array_of_files']]),
        list(value = TRUE)))
  }
  if (any(array_of_not_found_files)) {
    for (i in 1:prod(dim(array_of_files_to_load))) {
      if (is.na(array_of_not_found_files[i])) {
        array_of_files_to_load[i] <- NA
      } else {
        if (array_of_not_found_files[i]) {
          array_of_not_found_files[i] <- array_of_files_to_load[i]
          array_of_files_to_load[i] <- NA
        } else {
          array_of_not_found_files[i] <- NA
        }
      }
    }
  } else {
    array_of_not_found_files <- NULL
  }

  } # End if (retrieve)
  # Replace the vars and common vars by the transformed vars and common vars
  for (i in 1:length(dat)) {
    if (length(names(transformed_vars[[i]])) > 0) {
      picked_vars[[i]][names(transformed_vars[[i]])] <- transformed_vars[[i]]
    } else if (length(names(picked_vars_ordered[[i]])) > 0) {
      picked_vars[[i]][names(picked_vars_ordered[[i]])] <- picked_vars_ordered[[i]]
    }
  }
  if (length(names(transformed_common_vars)) > 0) {
    picked_common_vars[names(transformed_common_vars)] <- transformed_common_vars
  } else if (length(names(picked_common_vars_ordered)) > 0) {
    picked_common_vars[names(picked_common_vars_ordered)] <- picked_common_vars_ordered
  }
if (debug) {
print("-> THE TRANSFORMED VARS:")
print(str(transformed_vars))
print("-> THE PICKED VARS:")
print(str(picked_vars))
}
Nicolau Manubens's avatar
Nicolau Manubens committed
  file_selectors <- NULL
  for (i in 1:length(dat)) {
    file_selectors[[dat[[i]][['name']]]] <- dat[[i]][['selectors']][which(names(dat[[i]][['selectors']]) %in% found_file_dims[[i]])]
  }
  if (retrieve) {
    if (!silent) {
      .message("Successfully retrieved data.")
    }
    attributes(data_array) <- c(attributes(data_array), 
                                list(Variables = c(list(common = picked_common_vars), picked_vars),
                                     Files = array_of_files_to_load, 
                                     NotFoundFiles = array_of_not_found_files,
                                     FileSelectors = file_selectors)
                               )
    attr(data_array, 'class') <- c('startR_cube', attr(data_array, 'class'))
    data_array
  } else {
    if (!silent) {
      .message("Successfully discovered data dimensions.")
    }
    start_call <- match.call()
Nicolau Manubens's avatar
Nicolau Manubens committed
    for (i in 2:length(start_call)) {
      if (class(start_call[[i]]) == 'name') {
        start_call[[i]] <- eval(start_call[[i]])
      }
    }
    start_call[['retrieve']] <- TRUE
    attributes(start_call) <- c(attributes(start_call),
                                list(Dimensions = final_dims,
                                     Variables = c(list(common = picked_common_vars), picked_vars),
                                     ExpectedFiles = array_of_files_to_load,
                                     FileSelectors = file_selectors)
                               )
    attr(start_call, 'class') <- c('startR_header', attr(start_call, 'class'))
    start_call
}

# Loads the data of one work piece and stores it in the shared matrix.
#
# The function is applied to every work piece by Start(), either directly
# or through clusterApplyLB(), so everything it needs (including the index
# helper below) is defined inside its own body.
#
# Parameters:
#   work_piece: list describing the piece of work. The elements read here
#     are 'file_path', 'file_selectors', 'first_round_indices',
#     'second_round_indices', 'store_position', 'store_dims',
#     'vars_to_transform' and, optionally, 'save_metadata_in' and
#     'progress_amount'.
#   shared_matrix_pointer: descriptor handed to
#     bigmemory::attach.big.matrix() to reach the shared matrix.
#   file_data_reader: function called as
#     file_data_reader(file_path, NULL, file_selectors,
#                      first_round_indices, synonims).
#     A NULL result means that no data could be read for this piece.
#   synonims: forwarded untouched to 'file_data_reader'.
#   transform: NULL, or a function called with the arguments 'data_array',
#     'variables' and 'file_selectors' plus everything in
#     'transform_params'. Its result must contain a 'data_array' element.
#   transform_params: list of extra arguments for 'transform'.
#   silent: if TRUE, the progress mark of the work piece is not emitted.
#   debug: if TRUE, diagnostic information is printed for the work piece
#     whose store positions are all 1.
#
# Returns TRUE if the reader returned NULL (nothing was stored) and FALSE
# otherwise. Start() uses this value to flag the files that were not found.
.LoadDataFile <- function(work_piece, shared_matrix_pointer, 
                          file_data_reader, synonims,
                          transform, transform_params,
                          silent = FALSE, debug = FALSE) {
  ### TODO: Specify dependencies as parameter

  # Auxiliary function to convert array indices to linear indices.
  # It expects a numeric vector of single indices or a list of numeric
  # vectors with multiple indices for each dimension, and a numeric vector
  # of dimension sizes. The result enumerates the selected cells with the
  # first dimension varying fastest.
  .arrayIndices2VectorIndices <- function(indices, dims) {
    # Check indices
    if (!is.list(indices)) {
      if (is.numeric(indices)) {
        indices <- as.list(indices)
      } else {
        stop("Parameter 'indices' must be a numeric vector or a list of ",
             "numeric vectors.")
      }
    }
    for (dim_indices in indices) {
      if (!is.numeric(dim_indices)) {
        stop("Parameter 'indices' must be a numeric vector or a list ",
             "of numeric vectors.")
      } else if (any(is.na(dim_indices) | is.infinite(dim_indices))) {
        # is.na() is also TRUE for NaN, so no separate is.nan() test is needed.
        stop("Parameter 'indices' must not contain NA/NaN/Inf values.")
      } else if (length(dim_indices) < 1) {
        stop("Parameter 'indices' must contain at least one index ",
             "for each dimension.")
      }
    }

    # Check dims
    if (!is.numeric(dims)) {
      stop("Parameter 'dims' must be a numeric vector.")
    } else if (any(is.na(dims) | is.infinite(dims))) {
      stop("Parameter 'dims' must not contain NA/NaN/Inf values.")
    } else if (length(indices) != length(dims)) {
      stop("There must be as many dimension selectors in 'indices' as ",
           "dimensions in 'dims'.")
    }

    # Recursive worker. It receives the selectors and the sizes in reversed
    # order (slowest-varying dimension first) and, for each index of the
    # leading dimension, shifts the linear indices of the remaining
    # dimensions by the corresponding offset.
    find_indices <- function(indices, dims) {
      if (max(indices[[1]]) > dims[1] || min(indices[[1]]) < 1) {
        stop("Provided indices out of range.")
      }
      if (length(dims) == 1) {
        indices[[1]]
      } else {
        inner_indices <- find_indices(indices[-1], dims[-1])
        offsets <- (indices[[1]] - 1) * prod(dims[-1])
        # One full copy of 'inner_indices' per offset, in the order of the
        # offsets ('inner_indices' is recycled an exact number of times).
        rep(offsets, each = length(inner_indices)) + inner_indices
      }
    }

    find_indices(rev(indices), rev(dims))
  }

  store_position <- as.list(work_piece[['store_position']])
  first_round_indices <- work_piece[['first_round_indices']]
  second_round_indices <- work_piece[['second_round_indices']]

  file_to_open <- work_piece[['file_path']]
  sub_array <- file_data_reader(file_to_open, NULL, 
                                work_piece[['file_selectors']],
                                first_round_indices, synonims)

  # Debug output is restricted to a single work piece to keep it readable.
  # Only the first six store positions are inspected here.
  debug_first_piece <- debug && all(unlist(store_position[1:6]) == 1)
  if (debug_first_piece) {
    print("-> LOADING A WORK PIECE")
    print("-> STRUCTURE OF READ UNTRANSFORMED DATA:")
    print(str(sub_array))
    print("-> STRUCTURE OF VARIABLES TO TRANSFORM:")
    print(str(work_piece[['vars_to_transform']]))
    print("-> COMMON ARRAY DIMENSIONS:")
    print(str(work_piece[['store_dims']]))
  }

  if (!is.null(sub_array)) {
    # Apply data transformation once we have the data arrays.
    if (!is.null(transform)) {
      if (debug_first_piece) {
        print("-> PROCEEDING TO TRANSFORM ARRAY")
        print("-> DIMENSIONS OF ARRAY RIGHT BEFORE TRANSFORMING:")
        print(dim(sub_array))
      }
      sub_array <- do.call(transform, c(list(data_array = sub_array,
                                             variables = work_piece[['vars_to_transform']],
                                             file_selectors = work_piece[['file_selectors']]),
                                        transform_params))
      if (debug_first_piece) {
        print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER TRANSFORMING:")
        print(str(sub_array))
        print("-> DIMENSIONS OF ARRAY RIGHT AFTER TRANSFORMING:")
        print(dim(sub_array$data_array))
      }
      sub_array <- sub_array$data_array
      # Subset with second round of indices
      dims_to_crop <- which(!vapply(second_round_indices, is.null, logical(1)))
      if (length(dims_to_crop) > 0) {
        dimnames_to_crop <- names(second_round_indices)[dims_to_crop]
        sub_array <- Subset(sub_array, dimnames_to_crop, 
                            second_round_indices[dimnames_to_crop])
      }
      if (debug_first_piece) {
        print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER SUBSETTING WITH 2nd ROUND INDICES:")
        print(str(sub_array))
      }
    }

    metadata <- attr(sub_array, 'variables')

    # Work out, for each store dimension, the positions to write to:
    #  - dimensions that are not among the first round indices keep the
    #    position given in the work piece;
    #  - dimensions without second round indices take the whole extent of
    #    the loaded array along that dimension;
    #  - otherwise, as many positions as second round indices.
    ## TODO: Review carefully the last case. Inner indices are all 
    ## aligned to the left-most positions. If dataset A has longitudes
    ## 1, 2, 3, 4 but dataset B has only longitudes 3 and 4, then
    ## they will be stored as follows:
    ## 1, 2, 3, 4
    ## 3, 4, NA, NA
    store_indices <- lapply(names(store_position), 
      function (x) {
        if (!(x %in% names(first_round_indices))) {
          store_position[[x]]
        } else if (is.null(second_round_indices[[x]])) {
          seq_len(dim(sub_array)[x])
        } else {
          seq_len(length(second_round_indices[[x]]))
        }
      })
    if (debug && all(unlist(store_indices) == 1)) {
      print("-> STRUCTURE OF FIRST ROUND INDICES FOR THIS WORK PIECE:")
      print(str(first_round_indices))
      print("-> STRUCTURE OF SECOND ROUND INDICES FOR THIS WORK PIECE:")
      print(str(second_round_indices))
      print("-> STRUCTURE OF STORE INDICES FOR THIS WORK PIECE:")
      print(str(store_indices))
    }

    matrix_indices <- .arrayIndices2VectorIndices(store_indices, work_piece[['store_dims']])
    data_array <- bigmemory::attach.big.matrix(shared_matrix_pointer)
    data_array[matrix_indices] <- as.vector(sub_array)

    if (!is.null(work_piece[['save_metadata_in']])) {
      saveRDS(metadata, file = work_piece[['save_metadata_in']])
    }
  }
  if (!is.null(work_piece[['progress_amount']]) && !silent) {
    message(work_piece[['progress_amount']], appendLF = FALSE)
  }
  # TRUE flags a work piece for which no data could be read.
  is.null(sub_array)
}