    failed_pieces <- work_pieces[which(unlist(found_files))]
    for (failed_piece in failed_pieces) {
      array_of_not_found_files <- do.call('[<-', 
                                          c(list(array_of_not_found_files), 
                                            as.list(failed_piece[['file_indices_in_array_of_files']]),
                                            list(value = TRUE)))
    }
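    # After the loop above, 'array_of_not_found_files' is TRUE at the positions of the
    # files that could not be found. The block below swaps the bookkeeping so that
    # 'array_of_files_to_load' keeps only the paths of found files (NA elsewhere) and
    # 'array_of_not_found_files' keeps only the paths of missing files (NA elsewhere),
    # or becomes NULL if every expected file was found.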
    if (any(array_of_not_found_files)) {
      for (i in 1:prod(dim(array_of_files_to_load))) {
        if (is.na(array_of_not_found_files[i])) {
          array_of_files_to_load[i] <- NA
        } else {
          if (array_of_not_found_files[i]) {
            array_of_not_found_files[i] <- array_of_files_to_load[i]
            array_of_files_to_load[i] <- NA
          } else {
            array_of_not_found_files[i] <- NA
          }
        }
      }
    } else {
      array_of_not_found_files <- NULL
    }
    
  } # End if (retrieve)
  else { # if retrieve = FALSE, the metadata still needs to be reshaped
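    # Even without retrieving data, the metadata kept in 'picked_common_vars' (e.g., the
    # dates of the dimension merged across files) must be reshaped to match the expected
    # output dimensions. The branches below mirror the reshaping applied to the data array:
    # first the merge_across_dims / merge_across_dims_narm / split_multiselected_dims
    # combination, then the simpler cases handled by the old code path.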

    if (merge_across_dims & (split_multiselected_dims | merge_across_dims_narm)) {
      if (!merge_across_dims_narm) {
        tmp <- match(names(final_dims), names(dims_of_merge_dim))
        if (any(diff(tmp[!is.na(tmp)]) < 0)) { #need to reorder
          picked_common_vars[[across_inner_dim]] <- .aperm2(picked_common_vars[[across_inner_dim]], tmp[!is.na(tmp)])
        }
        metadata_tmp <- picked_common_vars[[across_inner_dim]]
      } else {
        tmp <- remove_additional_na_from_merge(
                 data_array = NULL,
                 merge_dim_metadata = picked_common_vars[[across_inner_dim]],
                 inner_dims_across_files, final_dims,
                 length_inner_across_dim)
        metadata_tmp <- tmp$merge_dim_metadata
      }

      if (length(metadata_tmp) != prod(final_dims_fake_metadata)) {
        stop(paste0("After reshaping, the metadata do not fit into the expected output dimension. ",
                    "Check if the reshaping parameters are used correctly or contact support."))
      }

      #NOTE: When one file contains values for discrete dimensions, rearranging the 
      #      chunks (i.e., work_piece) is necessary.
      if (split_multiselected_dims) {
        tmp <- rebuild_array_merge_split(
                 data_array = NULL, metadata = metadata_tmp, indices_chunk,
                 all_split_dims, final_dims_fake, across_inner_dim, length_inner_across_dim)
        metadata_tmp <- tmp$metadata
      }
      metadata_tmp <- array(metadata_tmp, dim = final_dims_fake_metadata)

      # If split_multiselected_dims + merge_across_dims, the dimension order may change above.
      # To get the user-required dim order, we need to reorder the array again.
      if (split_multiselected_dims) {
        if (inner_dim_pos_in_split_dims != 1) {
          correct_order <- match(names(final_dims_fake_output), names(final_dims_fake))
#          data_array <- .aperm2(data_array, correct_order)
          correct_order_metadata <- match(names(final_dims_fake_output), names(all_split_dims[[across_inner_dim]]))
          metadata_tmp <- .aperm2(metadata_tmp, correct_order_metadata[!is.na(correct_order_metadata)])
        }
      }
      # Convert numeric back to dates
      if ('time' %in% synonims[[across_inner_dim]]) {
        metadata_tmp <- as.POSIXct(metadata_tmp, origin = "1970-01-01", tz = 'UTC')
      }
      picked_common_vars[[across_inner_dim]] <- metadata_tmp
      attr(picked_common_vars[[across_inner_dim]], 'variables') <- saved_reshaped_attr
    } else {  # ! (merge_across_dims + split_multiselected_dims) (old version)
      if (merge_across_dims) {
        # merge_across_dims = TRUE but (merge_across_dims_narm = F & split_multiselected_dims = F)

        inner_dim_pos <- which(names(dims_of_merge_dim) == inner_dims_across_files)
        file_dim_pos <- which(names(dims_of_merge_dim) == names(inner_dims_across_files))
        if (file_dim_pos < inner_dim_pos) {  #need to reorder
          tmp <- seq(1, length(dims_of_merge_dim))
          tmp[inner_dim_pos] <- file_dim_pos
          tmp[file_dim_pos] <- inner_dim_pos
          picked_common_vars[[across_inner_dim]] <- .aperm2(picked_common_vars[[across_inner_dim]], tmp)
        }
        metadata_tmp <- array(picked_common_vars[[across_inner_dim]], dim = final_dims_fake_metadata)
        # Convert numeric back to dates
        if ('time' %in% synonims[[across_inner_dim]]) {
          metadata_tmp <- as.POSIXct(metadata_tmp, origin = "1970-01-01", tz = 'UTC')
        }
        picked_common_vars[[across_inner_dim]] <- metadata_tmp
        attr(picked_common_vars[[across_inner_dim]], 'variables') <- saved_reshaped_attr
      }
      if (split_multiselected_dims & !is.null(picked_common_vars)) {
        if (!identical(inner_dim_has_split_dim, character(0))) {
          metadata_tmp <- array(picked_common_vars[[inner_dim_has_split_dim]], dim = final_dims_fake_metadata)
          # Convert numeric back to dates
          if (is(picked_common_vars[[inner_dim_has_split_dim]], 'POSIXct')) {
            metadata_tmp <- as.POSIXct(metadata_tmp, origin = "1970-01-01", tz = 'UTC')
          }
          picked_common_vars[[inner_dim_has_split_dim]] <- metadata_tmp
          attr(picked_common_vars[[inner_dim_has_split_dim]], 'variables') <- saved_reshaped_attr
        }
      }
    }
  }
  # Print the warnings from transform
  if (!is.null(c(warnings1, warnings2, warnings3))) {
    transform_warnings_list <- lapply(c(warnings1, warnings2, warnings3), function(x) {
      return(x$message)
    })
    transform_warnings_list <- unique(transform_warnings_list)
    for (i in 1:length(transform_warnings_list)) {
      .warning(transform_warnings_list[[i]])
    }
  }
  # Change final_dims_fake back because retrieve = FALSE will use it for attributes later
  if (exists("final_dims_fake_output")) {
    final_dims_fake <- final_dims_fake_output
  }
  # Replace the vars and common vars by the transformed vars and common vars
  for (i in 1:length(dat)) {
    if (length(names(transformed_vars[[i]])) > 0) {
      picked_vars[[i]][names(transformed_vars[[i]])] <- transformed_vars[[i]]
    } else if (length(names(picked_vars_ordered[[i]])) > 0) {
      picked_vars[[i]][names(picked_vars_ordered[[i]])] <- picked_vars_ordered[[i]]
    }
  }
  if (length(names(transformed_common_vars)) > 0) {
    picked_common_vars[names(transformed_common_vars)] <- transformed_common_vars
  } else if (length(names(picked_common_vars_ordered)) > 0) {
    picked_common_vars[names(picked_common_vars_ordered)] <- picked_common_vars_ordered
  }
  if (debug) {
    print("-> THE TRANSFORMED VARS:")
    print(str(transformed_vars))
    print("-> THE PICKED VARS:")
    print(str(picked_vars))
  }
  
  file_selectors <- NULL
  for (i in 1:length(dat)) {
    file_selectors[[dat[[i]][['name']]]] <- dat[[i]][['selectors']][which(names(dat[[i]][['selectors']]) %in% found_file_dims[[i]])]
  }
  if (retrieve) {
    if (!silent) {
      .message("Successfully retrieved data.")
    }

    if (all(sapply(return_metadata, is.null))) {
      # We don't have the metadata of the variable (e.g., tas). The returned metadata list
      # only contains the variables specified in the argument "return_vars".
      Variables_list <- c(list(common = picked_common_vars), picked_vars)
      .warning(paste0("Metadata cannot be retrieved. The reason may be the ",
                      "non-existence of the first file. Use parameter 'metadata_dims'",
                      " to assign to file dimensions along which to return metadata, ",
                      "or check the existence of the first file."))
    } else {
      # Add the metadata of the variable (e.g., tas) into the list of picked_vars or
      # picked_common_vars.
      Variables_list <- combine_metadata_picked_vars(
                          return_metadata, picked_vars, picked_common_vars,
                          metadata_dims, pattern_dims, length(dat))
    }
    attributes(data_array) <- c(attributes(data_array), 
                                list(Variables = Variables_list,
                                     Files = array_of_files_to_load, 
                                     NotFoundFiles = array_of_not_found_files,
                                     FileSelectors = file_selectors,
                                     PatternDim = found_pattern_dim,
                                     ObjectBigmemory = name_bigmemory_obj) #attr(shared_matrix_pointer, 'description')$sharedName)
    )
    attr(data_array, 'class') <- c('startR_array', attr(data_array, 'class'))
    data_array

  } else { # retrieve = FALSE
    if (!silent) {
      .message("Successfully discovered data dimensions.")
    }
    start_call <- match.call()
    for (i in 2:length(start_call)) {
      if (class(start_call[[i]]) %in% c('name', 'call')) {
        start_call[[i]] <- eval.parent(start_call[[i]])
      }
    }
    start_call[['retrieve']] <- TRUE
    attributes(start_call) <- c(attributes(start_call),
                                list(Dimensions = final_dims_fake,
                                     Variables = c(list(common = picked_common_vars), picked_vars),
                                     ExpectedFiles = array_of_files_to_load,
                                     FileSelectors = file_selectors,
                                     PatternDim = found_pattern_dim,
                                     MergedDims = if (merge_across_dims) {
                                       inner_dims_across_files
                                     } else {
                                       NULL
                                     },
                                     SplitDims = if (split_multiselected_dims) {
                                       all_split_dims
                                     } else {
                                       NULL
                                     })
    )
    attr(start_call, 'class') <- c('startR_cube', attr(start_call, 'class'))
    start_call
  }
}
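
# Illustrative usage sketch (the file path pattern and dimension names below are
# placeholders, not taken from this file): with retrieve = FALSE, Start() returns the
# modified call as a 'startR_cube' whose attributes describe the expected dimensions and
# files. Because 'retrieve' is forced to TRUE in the stored call, evaluating that call
# (e.g., with eval()) performs the actual data retrieval.
#   hdr <- Start(dat = '/path/to/$var$_$sdate$.nc', var = 'tas',
#                sdate = '20000101', retrieve = FALSE)
#   attr(hdr, 'Dimensions')   # expected dimensions of the output array
#   arr <- eval(hdr)          # re-runs Start() with retrieve = TRUE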

# This function is responsible for loading the data of each work
# piece.
.LoadDataFile <- function(work_piece, shared_matrix_pointer, 
                          file_data_reader, synonims,
                          transform, transform_params, transform_crop_domain = NULL,
                          silent = FALSE, debug = FALSE) {
  #warning(attr(shared_matrix_pointer, 'description')$sharedName)
  #  suppressPackageStartupMessages({library(bigmemory)})
  ### TODO: Specify dependencies as parameter
  #  suppressPackageStartupMessages({library(ncdf4)})
  
  #print("1")
  store_indices <- as.list(work_piece[['store_position']])
  first_round_indices <- work_piece[['first_round_indices']]
  second_round_indices <- work_piece[['second_round_indices']]
  #print("2")
  file_to_open <- work_piece[['file_path']]
  sub_array <- file_data_reader(file_to_open, NULL, 
                                work_piece[['file_selectors']],
                                first_round_indices, synonims)
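  # 'file_data_reader' (e.g., NcDataReader) returns the requested hyperslab as a named
  # array, carrying the associated metadata in its 'variables' attribute, or NULL if the
  # file could not be read.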
  if (debug) {
    if (all(unlist(store_indices[1:6]) == 1)) {
      print("-> LOADING A WORK PIECE")
      print("-> STRUCTURE OF READ UNTRANSFORMED DATA:")
      print(str(sub_array))
      print("-> STRUCTURE OF VARIABLES TO TRANSFORM:")
      print(str(work_piece[['vars_to_transform']]))
      print("-> COMMON ARRAY DIMENSIONS:")
      print(str(work_piece[['store_dims']]))
    }
  }
  if (!is.null(sub_array)) {
    # Apply data transformation once we have the data arrays.
    if (!is.null(transform)) {
      if (debug) {
        if (all(unlist(store_indices[1:6]) == 1)) {
          print("-> PROCEEDING TO TRANSFORM ARRAY")
          print("-> DIMENSIONS OF ARRAY RIGHT BEFORE TRANSFORMING:")
          print(dim(sub_array))
        }
      }
      sub_array <- do.call(transform, c(list(data_array = sub_array,
                                             variables = work_piece[['vars_to_transform']],
                                             file_selectors = work_piece[['file_selectors']],
                                             crop_domain = transform_crop_domain),
                                        transform_params))
      if (debug) {
        if (all(unlist(store_indices[1:6]) == 1)) {
          print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER TRANSFORMING:")
          print(str(sub_array))
          print("-> DIMENSIONS OF ARRAY RIGHT AFTER TRANSFORMING:")
          print(dim(sub_array$data_array))
        }
      }
      sub_array <- sub_array$data_array
      # Subset with second round of indices
      dims_to_crop <- which(!sapply(second_round_indices, is.null))
      if (length(dims_to_crop) > 0) {
        dimnames_to_crop <- names(second_round_indices)[dims_to_crop]
        sub_array <- ClimProjDiags::Subset(sub_array, dimnames_to_crop, 
                                           second_round_indices[dimnames_to_crop])
      }
      if (debug) {
        if (all(unlist(store_indices[1:6]) == 1)) {
          print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER SUBSETTING WITH 2nd ROUND INDICES:")
          print(str(sub_array))
        }
      }
    }
    
    metadata <- attr(sub_array, 'variables')
    
    names_bk <- names(store_indices)
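    # Remap 'store_indices': dimensions not read from this file keep their fixed position
    # in the shared output array, while dimensions read in the first round are replaced by
    # positions 1:n of the values kept after the (optional) second-round subsetting.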
    store_indices <- lapply(names(store_indices), 
                            function (x) {
                              if (!(x %in% names(first_round_indices))) {
                                store_indices[[x]]
                              } else if (is.null(second_round_indices[[x]])) {
                                1:dim(sub_array)[x]
                              } else {
                                if (is.numeric(second_round_indices[[x]])) {
                                  ## TODO: Review carefully this line. Inner indices are all 
                                  ## aligned to the left-most positions. If dataset A has longitudes
                                  ## 1, 2, 3, 4 but dataset B has only longitudes 3 and 4, then
                                  ## they will be stored as follows:
                                  ## 1, 2, 3, 4
                                  ## 3, 4, NA, NA
                                  ##x - min(x) + 1
                                  1:length(second_round_indices[[x]])
                                } else {
                                  1:length(second_round_indices[[x]])
                                }
                              }
                            })
    names(store_indices) <- names_bk
    if (debug) {
      if (all(unlist(store_indices) == 1)) {
        print("-> STRUCTURE OF FIRST ROUND INDICES FOR THIS WORK PIECE:")
        print(str(first_round_indices))
        print("-> STRUCTURE OF SECOND ROUND INDICES FOR THIS WORK PIECE:")
        print(str(second_round_indices))
        print("-> STRUCTURE OF STORE INDICES FOR THIS WORK PIECE:")
        print(str(store_indices))
      }
    }
    
    store_indices <- lapply(store_indices, as.integer)
    store_dims <- work_piece[['store_dims']]
    
    # split the storage work of the loaded subset into parts
    largest_dim_name <- names(dim(sub_array))[which.max(dim(sub_array))]
    max_parts <- length(store_indices[[largest_dim_name]])
    
    # Indexing a data file of N MB with expand.grid takes 30*N MB.
    # The peak RAM of Start is, at a minimum, 2 * the total data to load from all files,
    # due to inefficiencies in other regions of the code.
    # The more parts we split the indexing done below into, the lower
    # the memory footprint of the indexing and the faster it runs.
    # But more than 10 indexing iterations (parts) for each MB processed
    # makes the iteration slower (tested empirically on BSC workstations).
    subset_size_in_mb <- prod(dim(sub_array)) * 8 / 1024 / 1024
    best_n_parts <- ceiling(subset_size_in_mb * 10)
    # We want to set n_parts to a greater value than the one that would 
    # result in a memory footprint (of the subset indexing code below) equal
    # to 2 * total data to load from all files.
    # s = subset size in MB
    # p = number of parts to break it in
    # T = total size of data to load
    # then, s / p * 30 = 2 * T
    # then, p = s * 15 / T
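    # Worked example (illustrative numbers): for a 100-MB subset (s = 100) out of 1000 MB
    # of total data to load (T = 1000), best_n_parts = ceiling(100 * 10) = 1000 and
    # min_n_parts = ceiling(100 * 15 / 1000) = 2, so n_parts = min(2 * 10, 1000) = 20 and
    # the indexing peaks at roughly 100 / 20 * 30 = 150 MB. Sizes assume 8-byte doubles.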
    min_n_parts <- ceiling(prod(dim(sub_array)) * 15 / prod(store_dims))
    # Make sure we pick n_parts much greater than the minimum calculated
    n_parts <- min_n_parts * 10
    if (n_parts > best_n_parts) {
      n_parts <- best_n_parts
    }
    # Boundary checks
    if (n_parts < 1) {
      n_parts <- 1
    }
    if (n_parts > max_parts) {
      n_parts <- max_parts
    }
    
    if (n_parts > 1) {
      make_parts <- function(length, n) {
        clusters <- cut(1:length, n, labels = FALSE)
        lapply(1:n, function(y) which(clusters == y))
      }
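      # For example, make_parts(10, 3) splits 1:10 into three contiguous chunks of roughly
      # equal size (1:4, 5:7, and 8:10), so each part indexes a consecutive slice of the
      # largest dimension.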
      part_indices <- make_parts(max_parts, n_parts)
      parts <- lapply(part_indices, 
                      function(x) {
                        store_indices[[largest_dim_name]][x]
                      })
    } else {
      part_indices <- list(1:max_parts)
      parts <- store_indices[largest_dim_name]
    }
    
    # do the storage work
    weights <- sapply(1:length(store_dims), 
                      function(i) prod(c(1, store_dims)[1:i]))
    part_indices_in_sub_array <- as.list(rep(TRUE, length(dim(sub_array))))
    names(part_indices_in_sub_array) <- names(dim(sub_array))
    data_array <- bigmemory::attach.big.matrix(shared_matrix_pointer)
    for (i in 1:n_parts) {
      store_indices[[largest_dim_name]] <- parts[[i]]
      # Converting array indices to vector indices
      matrix_indices <- do.call("expand.grid", store_indices)
      # Given a matrix where each row is a set of array indices of an element,
      # the vector indices are computed
      matrix_indices <- 1 + colSums(t(matrix_indices - 1) * weights)
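      # Example of the index arithmetic: with store_dims = c(4, 3) the weights are c(1, 4),
      # so the element at array position (2, 3) maps to vector index
      # 1 + (2 - 1) * 1 + (3 - 1) * 4 = 10, consistent with R's column-major storage.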
      part_indices_in_sub_array[[largest_dim_name]] <- part_indices[[i]]
      data_array[matrix_indices] <- as.vector(do.call('[',
                                                      c(list(x = sub_array), 
                                                        part_indices_in_sub_array)))
    }
    rm(data_array)
    gc()
    
    if (!is.null(work_piece[['save_metadata_in']])) {
      saveRDS(metadata, file = work_piece[['save_metadata_in']])
    }
  }
  if (!is.null(work_piece[['progress_amount']]) && !silent) {
    message(work_piece[['progress_amount']], appendLF = FALSE)
  }
  is.null(sub_array)
}