Start.R 168 KB
Newer Older
  } else {
    progress_message <- ''
    selected_pieces <- NULL
  }
  piece_counter <- 1
  step_counter <- 1
  work_pieces <- lapply(work_pieces, 
    function (x) {
      if (piece_counter %in% selected_pieces) {
        wp <- c(x, list(progress_amount = progress_steps[step_counter]))
        step_counter <<- step_counter + 1
      } else {
        wp <- x
      }
      piece_counter <<- piece_counter + 1
      wp
    })
  if (!silent) {
    .message("If the size of the requested data is close to or above the free shared RAM memory, R may crash.")
    .message("If the size of the requested data is close to or above the half of the free RAM memory, R may crash.")
    .message(paste0("Will now proceed to read and process ", length(work_pieces), " data files:"))
    if (length(work_pieces) < 30) {
      lapply(work_pieces, function (x) .message(x[['file_path']], indent = 2))
    } else {
      .message("The list of files is long. You can check it after Start() finishes in the output '$Files'.", indent = 2, exdent = 5)
    }
  }

  # Build the cluster of processes that will do the work and dispatch work pieces.
  # The function .LoadDataFile is applied to each work piece. This function will
  # open the data file, regrid if needed, subset, apply the mask, 
  # compute and apply the weights if needed,
  # disable extreme values and store in the shared memory matrix.
#print("O")
  if (!silent) {
    .message("Loading... This may take several minutes...")
    if (progress_message != '') {
      .message(progress_message, appendLF = FALSE)
    }
  }
  if (num_procs == 1) {
    found_files <- lapply(work_pieces, .LoadDataFile, 
                          shared_matrix_pointer = shared_matrix_pointer,
                          file_data_reader = file_data_reader, 
                          synonims = synonims,
                          transform = transform, 
                          transform_params = transform_params,
                          silent = silent, debug = debug)
  } else {
    cluster <- makeCluster(num_procs, outfile = "")
    # Send the heavy work to the workers
    work_errors <- try({
      found_files <- clusterApplyLB(cluster, work_pieces, .LoadDataFile, 
                                    shared_matrix_pointer = shared_matrix_pointer,
                                    file_data_reader = file_data_reader,
                                    synonims = synonims,
                                    transform = transform, 
                                    transform_params = transform_params,
                                    silent = silent, debug = debug)
    })
    stopCluster(cluster)
  }

  if (!silent) {
    if (progress_message != '') {
      .message("\n", tag = '')
    }
  }
#print("P")

# NOTE: If merge_across_dims = TRUE, there might be additional NAs due to
#       unequal inner_dim ('time') length across file_dim ('file_date').
#       If merge_across_dims_narm = TRUE, add additional lines to remove these NAs.
# TODO: Now it assumes that only one '_across'. Add a for loop for more-than-one case. 
   if (merge_across_dims_narm) {

     across_inner_dim <- inner_dims_across_files[[1]]  #TODO: more than one?
     across_file_dim <- names(inner_dims_across_files)  #TODO: more than one?
    # Get the length of each inner_dim ('time') along each file_dim ('file_date')
    wp_inner_across_dim <- lapply(lapply(work_pieces, '[[', 'first_round_indices'),
                                  '[[', across_inner_dim)
    length_inner_across_dim <- lapply(wp_inner_across_dim, length)

    # Get the length of these two dimensions in final_dims
    length_inner_across_store_dims <- final_dims[across_inner_dim]
    length_file_across_store_dims <- final_dims[across_file_dim]

    # Create a logical array for merge_across_dims
    logi_array <- array(rep(FALSE,
                            length_file_across_store_dims * length_inner_across_store_dims),
                            dim = c(length_inner_across_store_dims, length_file_across_store_dims))
    for (i in 1:length_file_across_store_dims) {  #1:4
      logi_array[1:length_inner_across_dim[[i]], i] <- TRUE
    }

    # First, get the data array with final_dims dimension
    data_array_final_dims <- array(bigmemory::as.matrix(data_array), dim = final_dims)

    # Change the NA derived from additional spaces to -9999, then remove these -9999
    func_remove_blank <- function(data_array, logi_array) {
      # dim(data_array) = [time, file_date]
      # dim(logi_array) = [time, file_date]
      # Change the blank spaces from NA to -9999
      data_array[which(!logi_array)] <- -9999
      return(data_array)
    }
    data_array_final_dims <- Apply(data_array_final_dims,
                                   target_dims = c(across_inner_dim, across_file_dim),  #c('time', 'file_date')
                                   output_dims = c(across_inner_dim, across_file_dim),
                                   fun = func_remove_blank,
                                   logi_array = logi_array)$output1
    ## reorder back to the correct dim
    tmp <- match(names(final_dims), names(dim(data_array_final_dims)))
    data_array_final_dims <- s2dverification:::.aperm2(data_array_final_dims, tmp)
    data_array_tmp <- data_array_final_dims[which(data_array_final_dims != -9999)]  # become a vector

    if (!split_multiselected_dims) {
    # only merge_across_dims -> the 'time' dim length needs to be adjusted
      final_dims_fake_name <- names(final_dims_fake)
      pos_across_inner_dim <- which(final_dims_fake_name == across_inner_dim)
      new_length_inner_dim <- sum(unlist(length_inner_across_dim))
      if (pos_across_inner_dim != length(final_dims_fake)) {
      final_dims_fake <- c(final_dims_fake[1:(pos_across_inner_dim - 1)],
                           new_length_inner_dim,
                           final_dims_fake[(pos_across_inner_dim + 1):length(final_dims_fake)])
      } else {
      final_dims_fake <- c(final_dims_fake[1:(pos_across_inner_dim - 1)],
                           new_length_inner_dim)
      }
      names(final_dims_fake) <- final_dims_fake_name
    }
    data_array <- array(data_array_tmp, dim = final_dims_fake)

  } else {  # merge_across_dims_narm = F (old version)
    data_array <- array(bigmemory::as.matrix(data_array), dim = final_dims_fake)
  }

  # Load metadata and remove the metadata folder
  if (!is.null(metadata_dims)) {
    loaded_metadata_files <- list.files(metadata_folder)
    loaded_metadata <- lapply(paste0(metadata_folder, '/', loaded_metadata_files), readRDS)
    unlink(metadata_folder, recursive = TRUE)
    return_metadata <- vector('list', length = prod(dim(array_of_metadata_flags)[metadata_dims]))
    return_metadata[as.numeric(loaded_metadata_files)] <- loaded_metadata
    dim(return_metadata) <- dim(array_of_metadata_flags[metadata_dims])
    attr(data_array, 'Variables') <- return_metadata
    # TODO: Try to infer data type from loaded_metadata
    # as.integer(data_array) 
  }
  failed_pieces <- work_pieces[which(unlist(found_files))]
  for (failed_piece in failed_pieces) {
    array_of_not_found_files <- do.call('[<-', 
      c(list(array_of_not_found_files), 
        as.list(failed_piece[['file_indices_in_array_of_files']]),
        list(value = TRUE)))
  }
  if (any(array_of_not_found_files)) {
    for (i in 1:prod(dim(array_of_files_to_load))) {
      if (is.na(array_of_not_found_files[i])) {
        array_of_files_to_load[i] <- NA
      } else {
        if (array_of_not_found_files[i]) {
          array_of_not_found_files[i] <- array_of_files_to_load[i]
          array_of_files_to_load[i] <- NA
        } else {
          array_of_not_found_files[i] <- NA
        }
      }
    }
  } else {
    array_of_not_found_files <- NULL
  }

  } # End if (retrieve)
  # Replace the vars and common vars by the transformed vars and common vars
  for (i in 1:length(dat)) {
    if (length(names(transformed_vars[[i]])) > 0) {
      picked_vars[[i]][names(transformed_vars[[i]])] <- transformed_vars[[i]]
    } else if (length(names(picked_vars_ordered[[i]])) > 0) {
      picked_vars[[i]][names(picked_vars_ordered[[i]])] <- picked_vars_ordered[[i]]
    }
  }
  if (length(names(transformed_common_vars)) > 0) {
    picked_common_vars[names(transformed_common_vars)] <- transformed_common_vars
  } else if (length(names(picked_common_vars_ordered)) > 0) {
    picked_common_vars[names(picked_common_vars_ordered)] <- picked_common_vars_ordered
  }
if (debug) {
print("-> THE TRANSFORMED VARS:")
print(str(transformed_vars))
print("-> THE PICKED VARS:")
print(str(picked_vars))
}
Nicolau Manubens's avatar
Nicolau Manubens committed
  file_selectors <- NULL
  for (i in 1:length(dat)) {
    file_selectors[[dat[[i]][['name']]]] <- dat[[i]][['selectors']][which(names(dat[[i]][['selectors']]) %in% found_file_dims[[i]])]
  }
  if (retrieve) {
    if (!silent) {
      .message("Successfully retrieved data.")
    }
Nicolau Manubens Gil's avatar
Nicolau Manubens Gil committed
    var_backup <- attr(data_array, 'Variables')[[1]]
    attr(data_array, 'Variables') <- NULL
    attributes(data_array) <- c(attributes(data_array), 
                                list(Variables = c(list(common = c(picked_common_vars, var_backup)), 
                                                   picked_vars),
                                     Files = array_of_files_to_load, 
                                     NotFoundFiles = array_of_not_found_files,
                                     FileSelectors = file_selectors,
                                     PatternDim = found_pattern_dim)
Nicolau Manubens's avatar
Nicolau Manubens committed
    attr(data_array, 'class') <- c('startR_array', attr(data_array, 'class'))
  } else {
    if (!silent) {
      .message("Successfully discovered data dimensions.")
    }
    start_call <- match.call()
Nicolau Manubens's avatar
Nicolau Manubens committed
    for (i in 2:length(start_call)) {
Nicolau Manubens's avatar
Nicolau Manubens committed
      if (class(start_call[[i]]) %in% c('name', 'call')) {
        start_call[[i]] <- eval.parent(start_call[[i]])
Nicolau Manubens's avatar
Nicolau Manubens committed
      }
    }
    start_call[['retrieve']] <- TRUE
    attributes(start_call) <- c(attributes(start_call),
                                     Variables = c(list(common = picked_common_vars), picked_vars),
                                     ExpectedFiles = array_of_files_to_load,
Nicolau Manubens's avatar
Nicolau Manubens committed
                                     FileSelectors = file_selectors,
                                     PatternDim = found_pattern_dim,
Nicolau Manubens's avatar
Nicolau Manubens committed
                                     MergedDims = if (merge_across_dims) {
                                                    inner_dims_across_files
                                                  } else {
                                                    NULL
                                                  },
                                     SplitDims = if (split_multiselected_dims) {
                                                   all_split_dims
                                                 } else {
                                                   NULL
                                                 })
Nicolau Manubens's avatar
Nicolau Manubens committed
    attr(start_call, 'class') <- c('startR_cube', attr(start_call, 'class'))
}

# This function is the responsible for loading the data of each work
# piece.
.LoadDataFile <- function(work_piece, shared_matrix_pointer, 
                          file_data_reader, synonims,
                          transform, transform_params,
                          silent = FALSE, debug = FALSE) {
#  suppressPackageStartupMessages({library(bigmemory)})
### TODO: Specify dependencies as parameter
#  suppressPackageStartupMessages({library(ncdf4)})

#print("1")
  store_indices <- as.list(work_piece[['store_position']])
  first_round_indices <- work_piece[['first_round_indices']]
  second_round_indices <- work_piece[['second_round_indices']]
#print("2")
  file_to_open <- work_piece[['file_path']]
  sub_array <- file_data_reader(file_to_open, NULL, 
                                work_piece[['file_selectors']],
                                first_round_indices, synonims)
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> LOADING A WORK PIECE")
print("-> STRUCTURE OF READ UNTRANSFORMED DATA:")
print(str(sub_array))
print("-> STRUCTURE OF VARIABLES TO TRANSFORM:")
print(str(work_piece[['vars_to_transform']]))
print("-> COMMON ARRAY DIMENSIONS:")
print(str(work_piece[['store_dims']]))
}
}
  if (!is.null(sub_array)) {
    # Apply data transformation once we have the data arrays.
    if (!is.null(transform)) {
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> PROCEEDING TO TRANSFORM ARRAY")
print("-> DIMENSIONS OF ARRAY RIGHT BEFORE TRANSFORMING:")
print(dim(sub_array))
}
}
      sub_array <- do.call(transform, c(list(data_array = sub_array,
                                             variables = work_piece[['vars_to_transform']],
                                             file_selectors = work_piece[['file_selectors']]),
                                             transform_params))
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER TRANSFORMING:")
print(str(sub_array))
print("-> DIMENSIONS OF ARRAY RIGHT AFTER TRANSFORMING:")
print(dim(sub_array$data_array))
}
}
      sub_array <- sub_array$data_array
      # Subset with second round of indices
      dims_to_crop <- which(!sapply(second_round_indices, is.null))
      if (length(dims_to_crop) > 0) {
        dimnames_to_crop <- names(second_round_indices)[dims_to_crop]
        sub_array <- Subset(sub_array, dimnames_to_crop, 
                            second_round_indices[dimnames_to_crop])
      }
if (debug) {
if (all(unlist(store_indices[1:6]) == 1)) {
print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER SUBSETTING WITH 2nd ROUND INDICES:")
print(str(sub_array))
}
}
    }

    metadata <- attr(sub_array, 'variables')

    store_indices <- lapply(names(store_indices), 
      function (x) {
        if (!(x %in% names(first_round_indices))) {
          store_indices[[x]]
        } else if (is.null(second_round_indices[[x]])) {
          1:dim(sub_array)[x]
        } else {
          if (is.numeric(second_round_indices[[x]])) {
            ## TODO: Review carefully this line. Inner indices are all 
            ## aligned to the left-most positions. If dataset A has longitudes
            ## 1, 2, 3, 4 but dataset B has only longitudes 3 and 4, then
            ## they will be stored as follows:
            ## 1, 2, 3, 4
            ## 3, 4, NA, NA
            ##x - min(x) + 1
            1:length(second_round_indices[[x]])
          } else {
            1:length(second_round_indices[[x]])
          }
        }
      })
Nicolau Manubens Gil's avatar
Nicolau Manubens Gil committed
if (all(unlist(store_indices) == 1)) {
print("-> STRUCTURE OF FIRST ROUND INDICES FOR THIS WORK PIECE:")
print(str(first_round_indices))
print("-> STRUCTURE OF SECOND ROUND INDICES FOR THIS WORK PIECE:")
print(str(second_round_indices))
print("-> STRUCTURE OF STORE INDICES FOR THIS WORK PIECE:")
print(str(store_indices))
}
}
    store_dims <- work_piece[['store_dims']]
    # split the storage work of the loaded subset in parts
    largest_dim_name <- names(dim(sub_array))[which.max(dim(sub_array))]
    max_parts <- length(store_indices[[largest_dim_name]])

    # Indexing a data file of N MB with expand.grid takes 30*N MB
    # The peak ram of Start is, minimum, 2 * total data to load from all files
    # due to inefficiencies in other regions of the code
    # The more parts we split the indexing done below in, the lower
    # the memory footprint of the indexing and the fast. 
    # But more than 10 indexing iterations (parts) for each MB processed 
    # makes the iteration slower (tested empirically on BSC workstations).
    subset_size_in_mb <- prod(dim(sub_array)) * 8 / 1024 / 1024
    best_n_parts <- ceiling(subset_size_in_mb * 10)
    # We want to set n_parts to a greater value than the one that would 
    # result in a memory footprint (of the subset indexing code below) equal
    # to 2 * total data to load from all files.
    # s = subset size in MB
    # p = number of parts to break it in
    # T = total size of data to load
    # then, s / p * 30 = 2 * T
    # then, p = s * 15 / T
    min_n_parts <- ceiling(prod(dim(sub_array)) * 15 / prod(store_dims))
    # Make sure we pick n_parts much greater than the minimum calculated
    n_parts <- min_n_parts * 10
    if (n_parts > best_n_parts) {
      n_parts <- best_n_parts
    }
    # Boundary checks
    if (n_parts < 1) {
      n_parts <- 1
    }
    if (n_parts > max_parts) {
      n_parts <- max_parts
    }

    if (n_parts > 1) {
      make_parts <- function(length, n) {
        clusters <- cut(1:length, n, labels = FALSE)
        lapply(1:n, function(y) which(clusters == y))
      }
      part_indices <- make_parts(max_parts, n_parts)
      parts <- lapply(part_indices, 
                      function(x) {
                        store_indices[[largest_dim_name]][x]
                      })
    } else {
      part_indices <- list(1:max_parts)
      parts <- store_indices[largest_dim_name]
    }
    # do the storage work
    weights <- sapply(1:length(store_dims), 
                      function(i) prod(c(1, store_dims)[1:i]))
    part_indices_in_sub_array <- as.list(rep(TRUE, length(dim(sub_array))))
    names(part_indices_in_sub_array) <- names(dim(sub_array))
    data_array <- bigmemory::attach.big.matrix(shared_matrix_pointer)
    for (i in 1:n_parts) {
      store_indices[[largest_dim_name]] <- parts[[i]]
      # Converting array indices to vector indices
      matrix_indices <- do.call("expand.grid", store_indices)
      # Given a matrix where each row is a set of array indices of an element
      # the vector indices are computed
      matrix_indices <- 1 + colSums(t(matrix_indices - 1) * weights)
      part_indices_in_sub_array[[largest_dim_name]] <- part_indices[[i]]
      data_array[matrix_indices] <- as.vector(do.call('[',
        c(list(x = sub_array), 
          part_indices_in_sub_array)))
    }
    rm(data_array)
    gc()

    if (!is.null(work_piece[['save_metadata_in']])) {
      saveRDS(metadata, file = work_piece[['save_metadata_in']])
    }
  }
  if (!is.null(work_piece[['progress_amount']]) && !silent) {
    message(work_piece[['progress_amount']], appendLF = FALSE)
  }
  is.null(sub_array)
}