zzz.R 70.4 KB
Newer Older
    }
    j <- j + 1
  }

  return(work_pieces)
}

# Calculate the progress %s that will be displayed and assign them to the appropriate work pieces.
retrieve_progress_message <- function(work_pieces, num_procs, silent) {
  if (length(work_pieces) / num_procs >= 2 && !silent) {
    if (length(work_pieces) / num_procs < 10) {
      amount <- 100 / ceiling(length(work_pieces) / num_procs)
      reps <- ceiling(length(work_pieces) / num_procs)
    } else {
      amount <- 10
      reps <- 10
    }
    progress_steps <- rep(amount, reps)
    if (length(work_pieces) < (reps + 1)) {
      selected_pieces <- length(work_pieces)
      progress_steps <- c(sum(head(progress_steps, reps)),
                          tail(progress_steps, reps))
    } else {
      selected_pieces <- round(seq(1, length(work_pieces), 
                                   length.out = reps + 1))[-1]
    }
    progress_steps <- paste0(' + ', round(progress_steps, 2), '%')
    progress_message <- 'Progress: 0%'
  } else {
    progress_message <- ''
    selected_pieces <- NULL
  }
  piece_counter <- 1
  step_counter <- 1
  work_pieces <- lapply(work_pieces, 
                        function (x) {
                          if (piece_counter %in% selected_pieces) {
                            wp <- c(x, list(progress_amount = progress_steps[step_counter]))
                            step_counter <<- step_counter + 1
                          } else {
                            wp <- x
                          }
                          piece_counter <<- piece_counter + 1
                          wp
                        })
  if (!silent) {
    .message("If the size of the requested data is close to or above the free shared RAM memory, R may crash.")
    .message("If the size of the requested data is close to or above the half of the free RAM memory, R may crash.")
    .message(paste0("Will now proceed to read and process ", length(work_pieces), " data files:"))
    if (length(work_pieces) < 30) {
      lapply(work_pieces, function (x) .message(x[['file_path']], indent = 2))
    } else {
      .message("The list of files is long. You can check it after Start() finishes in the output '$Files'.", indent = 2, exdent = 5)
    }
  }
    
  # Build the cluster of processes that will do the work and dispatch work pieces.
  # The function .LoadDataFile is applied to each work piece. This function will
  # open the data file, regrid if needed, subset, apply the mask, 
  # compute and apply the weights if needed,
  # disable extreme values and store in the shared memory matrix.
  #print("O")
  if (!silent) {
    .message("Loading... This may take several minutes...")
    if (progress_message != '') {
      .message(progress_message, appendLF = FALSE)
    }
  }
  return(work_pieces)
}

# If merge_across_dims = TRUE and merge_across_dims_narm = TRUE, remove the additional NAs 
# due to unequal inner_dim ('time') length across file_dim ('sdate').
aho's avatar
aho committed
remove_additional_na_from_merge <- function(data_array = NULL, merge_dim_metadata = NULL,
                                            inner_dims_across_files, final_dims, length_inner_across_dim) {
  # data_array is a vector from bigmemory::as.matrix
  # merge_dim_metadata is an array

  across_file_dim <- names(inner_dims_across_files)  #TODO: more than one?
aho's avatar
aho committed
  across_inner_dim <- inner_dims_across_files[[1]]  #TODO: more than one?
  # Get the length of these two dimensions in final_dims
  length_inner_across_store_dims <- final_dims[across_inner_dim]
  length_file_across_store_dims <- final_dims[across_file_dim]
       
  # Create a logical array for merge_across_dims
  logi_array <- array(rep(FALSE,
                          length_file_across_store_dims * length_inner_across_store_dims),
                      dim = c(length_inner_across_store_dims, length_file_across_store_dims))
  for (i in 1:length_file_across_store_dims) {  #1:4
    logi_array[1:length_inner_across_dim[[i]], i] <- TRUE
  }
 
  if (!is.null(data_array)) {
    # First, turn the data vector into array with final_dims
    data_array_final_dims <- array(data_array, dim = final_dims)
  }

  # Change the NA derived from additional spaces to -9999, then remove these -9999
  func_remove_blank <- function(data_array, logi_array) {
    # dim(data_array) = [time, file_date]
    # dim(logi_array) = [time, file_date]
    # data_array can be data or metadata; if data, change the blank spaces from 
    # NA to -9999; if metadata (supposed to be 'time'), change the corresponding
    # spaces to -12^10.
aho's avatar
aho committed
    if (is(data_array, "POSIXct")) {
      # change to numeric first
      data_array <- array(as.vector(data_array), dim = dim(data_array))
aho's avatar
aho committed
      data_array[which(!logi_array)] <- -12^10
    } else {
      data_array[which(!logi_array)] <- -9999
    }
    return(data_array)
  }

  if (!is.null(data_array)) {
    data_array_final_dims <- multiApply::Apply(data_array_final_dims,
                                               target_dims = c(across_inner_dim, across_file_dim),  #c('time', 'file_date')
                                               output_dims = c(across_inner_dim, across_file_dim),
                                               fun = func_remove_blank,
                                               logi_array = logi_array)$output1
  }
  if (!is.null(merge_dim_metadata)) {
    tmp_attr <- attributes(merge_dim_metadata)$variables
    merge_dim_metadata <- multiApply::Apply(merge_dim_metadata,
                            target_dims = c(across_inner_dim, across_file_dim),
                            output_dims = c(across_inner_dim, across_file_dim),
                            fun = func_remove_blank,
                            logi_array = logi_array)$output1
  }

  if (!is.null(data_array)) {
    ## reorder back to the correct dim
    tmp <- match(names(final_dims), names(dim(data_array_final_dims)))
    data_array_final_dims <- .aperm2(data_array_final_dims, tmp)
    data_array_tmp <- data_array_final_dims[data_array_final_dims != -9999]  # become a vector
  } else {
    data_array_tmp <- NULL
  }
  if (!is.null(merge_dim_metadata)) {
    # Reorder metadata dim as final dim
    tmp <- match(names(final_dims), names(dim(merge_dim_metadata)))
    merge_dim_metadata <- aperm(merge_dim_metadata, tmp[!is.na(tmp)])
    merge_dim_metadata <- merge_dim_metadata[merge_dim_metadata != -12^10]
    attr(merge_dim_metadata, 'variables') <- tmp_attr
  }
  #NOTE: both outputs are vectors. If 'merge_dim_metadata' is actually time, it is just numeric here.
  return(list(data_array = data_array_tmp, merge_dim_metadata = merge_dim_metadata))
}


# When merge_across_dims = TRUE and split_multiselected_dims = TRUE, rearrange the chunks 
# (i.e., work_piece) is necessary if one file contains values for discrete dimensions
rebuild_array_merge_split <- function(data_array = NULL, metadata = NULL, indices_chunk,
                                      all_split_dims, final_dims_fake, across_inner_dim, length_inner_across_dim) {

  rebuild_data <- ifelse(is.null(data_array), FALSE, TRUE)
  rebuild_metadata <- ifelse(is.null(metadata), FALSE, TRUE)

  # generate the correct order list from indices_chunk 
  final_order_list <- list()
  i <- 1
  j <- 1
  a <- indices_chunk[i]
  while (i <= length(indices_chunk)) {
    while (indices_chunk[i+1] == indices_chunk[i] & i < length(indices_chunk)) {
      a <- c(a, indices_chunk[i+1])
        i <- i + 1
    }
    final_order_list[[j]] <- a
    a <- indices_chunk[i+1]
    i <- i + 1
    j <- j + 1
  }
  names(final_order_list) <- sapply(final_order_list, '[[', 1)
  final_order_list <- lapply(final_order_list, length)
        
  if (!all(diff(as.numeric(names(final_order_list))) > 0)) {
    # shape the vector into the array without split_dims
    split_dims_pos <- match(names(all_split_dims[[1]]), names(final_dims_fake))
    new_dims <- c()
    if (split_dims_pos[1] > 1) {
      new_dims <- c(new_dims, final_dims_fake[1:(split_dims_pos[1] - 1)])
    }
    new_dims <- c(new_dims,  prod(all_split_dims[[1]]))
    names(new_dims)[split_dims_pos[1]] <- across_inner_dim
    if (split_dims_pos[length(split_dims_pos)] < length(final_dims_fake)) {
      new_dims <- c(new_dims, final_dims_fake[(split_dims_pos[length(split_dims_pos)] + 1):length(final_dims_fake)])
    }
    if (rebuild_data) {
      data_array <- array(data_array, dim = new_dims)
      # seperate 'time' dim into each work_piece length
      data_array_seperate <- vector('list', length = length(length_inner_across_dim))
      array_piece <- vector('list', length = length(final_order_list))
    if (rebuild_metadata) {
      metadata <- array(metadata, dim = length(metadata)) #metadata_no_split
      names(dim(metadata)) <- across_inner_dim
      metadata_seperate <- vector('list', length = length(length_inner_across_dim))
      metadata_piece <- vector('list', length = length(final_order_list))
    tmp <- cumsum(unlist(length_inner_across_dim))
    tmp <- c(0, tmp)
    for (i in 1:length(length_inner_across_dim)) {
      if (rebuild_data) {
        data_array_seperate[[i]] <- ClimProjDiags::Subset(data_array,
                                                          across_inner_dim,
                                                          (tmp[i] + 1):tmp[i + 1])
      }
      if (rebuild_metadata) {
        metadata_seperate[[i]] <- ClimProjDiags::Subset(metadata,
                                                        across_inner_dim,
                                                        (tmp[i] + 1):tmp[i + 1])
    # re-build the array: chunk 
    which_chunk <- as.numeric(names(final_order_list))
    sort_which_chunk <- sort(unique(which_chunk))
    which_chunk <- sapply(lapply(which_chunk, '==', sort_which_chunk), which)
    how_many_indices <- unlist(final_order_list)
    if (rebuild_data) {
      ind_in_array_seperate <- as.list(rep(1, length(data_array_seperate)))
    } else if (rebuild_metadata) {
      ind_in_array_seperate <- as.list(rep(1, length(metadata_seperate)))
    }

    for (i in 1:length(final_order_list)) {
      if (rebuild_data) {
        array_piece[[i]] <- ClimProjDiags::Subset(
                              data_array_seperate[[which_chunk[i]]], across_inner_dim,
                              ind_in_array_seperate[[which_chunk[i]]]:(ind_in_array_seperate[[which_chunk[i]]] + how_many_indices[i] - 1))
      }
      if (rebuild_metadata) {
        metadata_piece[[i]] <- ClimProjDiags::Subset(
                                 metadata_seperate[[which_chunk[i]]], across_inner_dim,
                                 ind_in_array_seperate[[which_chunk[i]]]:(ind_in_array_seperate[[which_chunk[i]]] + how_many_indices[i] - 1))
      }
      ind_in_array_seperate[[which_chunk[i]]] <- ind_in_array_seperate[[which_chunk[i]]] + how_many_indices[i]
    }
    # re-build the array: paste
    if (rebuild_data) {
      data_array_tmp <- array_piece[[1]]
    } else {
      data_array_tmp <- NULL
    }
    if (rebuild_metadata) {
      metadata_tmp <- metadata_piece[[1]]
    } else {
      metadata_tmp <- NULL
    }

    if (rebuild_data) {
      along_pos <- which(names(dim(data_array_tmp)) == across_inner_dim)
      length_piece <- length(array_piece)
    } 
    if (rebuild_metadata) {
      along_pos_metadata <- which(names(dim(metadata_tmp)) == across_inner_dim)
      if (!rebuild_data)
        length_piece <- length(metadata_piece)
    }

    if (length_piece > 1) {
      for (i in 2:length_piece) {
        if (rebuild_data) {
          data_array_tmp <- abind::abind(data_array_tmp, array_piece[[i]],
                                         along = along_pos)
        }
        if (rebuild_metadata) {
          metadata_tmp <- abind::abind(metadata_tmp, metadata_piece[[i]],
                                       along = along_pos_metadata)
  } else {
    data_array_tmp <- data_array
    metadata_tmp <- metadata
  }
  return(list(data_array = data_array_tmp, metadata = metadata_tmp))
}


# Create a list of metadata of the variable (e.g., tas)
create_metadata_list <- function(array_of_metadata_flags, metadata_dims, pattern_dims,
                                 loaded_metadata_files, loaded_metadata, dat_names,
                                 dataset_has_files) {
  #NOTE: Here, metadata can be saved in one of two ways: one for $common and the other for $dat
  #      for $common, it is a list of metadata length. For $dat, it is a list of dat length,
  #      and each sublist has the metadata for each dat.
aho's avatar
aho committed
  dim_of_metadata <- dim(array_of_metadata_flags)[metadata_dims]
  if (!any(names(dim_of_metadata) == pattern_dims) |
      (any(names(dim_of_metadata) == pattern_dims) &
       dim_of_metadata[pattern_dims] == 1)) {  # put under $common; old code
    return_metadata <- vector('list',
                              length = prod(dim_of_metadata))
    return_metadata[as.numeric(loaded_metadata_files)] <- loaded_metadata
    dim(return_metadata) <- dim_of_metadata

  } else { # put under $dat. metadata_dims has 'dat' and dat length > 1
    return_metadata <- vector('list',
                              length = dim_of_metadata[pattern_dims])
    names(return_metadata) <- dat_names
    for (kk in 1:length(return_metadata)) {
      return_metadata[[kk]] <- vector('list', length = prod(dim_of_metadata[-1])) # 1 is dat
    }
    loaded_metadata_count <- 1
    for (kk in 1:length(return_metadata)) {
      for (jj in 1:length(return_metadata[[kk]])) {
        if (dataset_has_files[kk]) {
          if (loaded_metadata_count %in% loaded_metadata_files) {
            return_metadata[[kk]][jj] <- loaded_metadata[[which(loaded_metadata_files == loaded_metadata_count)]]
            names(return_metadata[[kk]])[jj] <- names(loaded_metadata[[which(loaded_metadata_files == loaded_metadata_count)]])

          } else {
            return_metadata[[kk]][jj] <- NULL
aho's avatar
aho committed
          loaded_metadata_count <- loaded_metadata_count + 1
        } else {
          return_metadata[[kk]][jj] <- NULL

  return(return_metadata)
}

# This function adds the metadata of the variable (e.g., tas) into the list of picked_vars or
# picked_common_vars. The metadata is only retrieved when 'retrieve = TRUE'.
combine_metadata_picked_vars <- function(return_metadata, picked_vars, picked_common_vars,
                                         metadata_dims, pattern_dims, length_dat) {
#NOTE: The metadata of variables can be saved in one of the two different structures.
#      (1) metadata_dims != 'dat', or (metadata_dims == 'dat' & length(dat) == 1):
#          put under $common
#      (2) (metadata_dims == 'dat' & length(dat) > 1):
#          put under $dat1, $dat2, .... Put it in picked_vars list
#TODO: The current (2) uses the inefficient method. Should define the list structure first
#      then fill the list, rather than expand it in the for loop.

  if (any(metadata_dims == pattern_dims) & length_dat > 1) { # (2)
    for (kk in 1:length(return_metadata)) {
      sublist_names <- lapply(return_metadata, names)[[kk]]
      if (!is.null(sublist_names)) {
        for (jj in 1:length(sublist_names)) {
          picked_vars[[kk]][[sublist_names[jj]]] <- return_metadata[[kk]][[jj]]
        }
      }
    }
    Variables_list <- c(list(common = picked_common_vars), picked_vars)

  } else {  #(1)
    len <- unlist(lapply(return_metadata, length))
    len <- sum(len) + length(which(len == 0))  #0 means NULL
    name_list <- lapply(return_metadata, names)
    new_list <- vector('list', length = len)
    count <- 1

    for (kk in 1:length(return_metadata)) {
      if (length(return_metadata[[kk]]) == 0) {  #NULL
        count <- count + 1
      } else {
        for (jj in 1:length(return_metadata[[kk]])) {
          new_list[[count]] <- return_metadata[[kk]][[jj]]
          names(new_list)[count] <- name_list[[kk]][jj]
          count <- count + 1
        }
      }
    }
    Variables_list <- c(list(common = c(picked_common_vars, new_list)), picked_vars)
  }

  return(Variables_list)
}

# This function generates a list of 3, containing picked(_common)_vars, 
# picked(_common)_vars_ordered, and picked(_common)_vars_unorder_indices for the 'var_to_read'
# of this dataset (i) and file (j).
generate_picked_var_of_read <- function(var_to_read, var_to_check, array_of_files_to_load,
                                        var_dims, array_of_var_files, file_var_reader,
                                        file_object, synonims, associated_dim_name,
                                        dim_reorder_params, aiat, current_indices, var_params,
                                        either_picked_vars,
                                        either_picked_vars_ordered,
                                        either_picked_vars_unorder_indices) {
  var_file_dims <- NULL

  if (any(names(dim(array_of_files_to_load)) %in% var_to_check)) {
    var_file_dims <- dim(array_of_files_to_load)[which(names(dim(array_of_files_to_load)) %in%
                                                       var_to_check)]
  }
  if (is.null(either_picked_vars)) {

    if (any(names(var_file_dims) %in% names(var_dims))) {
      stop("Found a requested var in 'return_var' requested for a ",
           "file dimension which also appears in the dimensions of ",
           "the variable inside the file.\n", array_of_var_files)
    }
    first_sample <- file_var_reader(NULL, file_object, NULL,
                                    var_to_read, synonims)
    if (any(class(first_sample) %in% names(time_special_types()))) {
      array_size <- prod(c(var_file_dims, var_dims))
      new_array <- rep(time_special_types()[[class(first_sample)[1]]](NA), array_size)
      dim(new_array) <- c(var_file_dims, var_dims)
    } else {
      new_array <- array(dim = c(var_file_dims, var_dims))
    }
    attr(new_array, 'variables') <- attr(first_sample, 'variables')

    either_picked_vars <- new_array
    pick_ordered <- FALSE
    if (var_to_read %in% unlist(var_params)) {
      if (associated_dim_name %in% names(dim_reorder_params) && !aiat) {
        either_picked_vars_ordered <- new_array
        pick_ordered <- TRUE
      }
    }
    if (!pick_ordered) {
      either_picked_vars_ordered <- NULL
    }

  } else {
    array_var_dims <- dim(either_picked_vars)
    full_array_var_dims <- array_var_dims
    if (any(names(array_var_dims) %in% names(var_file_dims))) {
      array_var_dims <- array_var_dims[-which(names(array_var_dims) %in% names(var_file_dims))]
    }
    if (any(names(array_var_dims) != names(var_dims))) {
      stop("Error while reading the variable '", var_to_read, "' from ",
           "the file. Dimensions do not match.\nExpected ",
           paste(paste0("'", names(array_var_dims), "'"), collapse = ', '),
           " but found ",
           paste(paste0("'", names(var_dims), "'"), collapse = ', '),
aho's avatar
aho committed
           ".\n", array_of_var_files)
    }
    if (any(var_dims > array_var_dims)) {
      longer_dims <- which(var_dims > array_var_dims)
      if (length(longer_dims) == 1) {
        longer_dims_in_full_array <- longer_dims
        if (any(names(full_array_var_dims) %in% names(var_file_dims))) {
          candidates <- (1:length(full_array_var_dims))[-which(names(full_array_var_dims) %in% names(var_file_dims))]
          longer_dims_in_full_array <- candidates[longer_dims]
        }
        padding_dims <- full_array_var_dims
        padding_dims[longer_dims_in_full_array] <-
          var_dims[longer_dims] - array_var_dims[longer_dims]

        var_class <- class(either_picked_vars)
        if (any(var_class %in% names(time_special_types()))) {
          padding_size <- prod(padding_dims)
          padding <- rep(time_special_types()[[var_class[1]]](NA), padding_size)
          dim(padding) <- padding_dims
        } else {
          padding <- array(dim = padding_dims)
        }
        tmp_attr <- attributes(either_picked_vars)$variables
        either_picked_vars <- .abind2(either_picked_vars, padding,
                                      names(full_array_var_dims)[longer_dims_in_full_array])
        attr(either_picked_vars, 'variables') <- tmp_attr

      } else {
        stop("Error while reading the variable '", var_to_read, "' from ",
             "the file. Found size (", paste(var_dims, collapse = ' x '),
             ") is greater than expected maximum size (", array_var_dims, ").")
      }
    }
  }

  var_store_indices <- c(as.list(current_indices[names(var_file_dims)]),
                         lapply(var_dims, function(x) 1:x))
  var_values <- file_var_reader(NULL, file_object, NULL, var_to_read, synonims)
  if (var_to_read %in% unlist(var_params)) {
    if ((associated_dim_name %in% names(dim_reorder_params)) && !aiat) {
      ## Is this check really needed?
      if (length(dim(var_values)) > 1) {
        stop("Requested a '", associated_dim_name, "_reorder' for a dimension ",
             "whose coordinate variable that has more than 1 dimension. This is ",
             "not supported.")
      }
      ordered_var_values <- dim_reorder_params[[associated_dim_name]](var_values)
      attr(ordered_var_values$x, 'variables') <- attr(var_values, 'variables')
      if (!all(c('x', 'ix') %in% names(ordered_var_values))) {
        stop("All the dimension reorder functions must return a list with the components 'x' and 'ix'.")
      }
      # Save the indices to reorder the ordered variable values back to original order.
      # 'unorder' refers to the indices of 'ordered_var_values' if it is unordered.
      # This will be used to define the first round indices.
      unorder <- sort(ordered_var_values$ix, index.return = TRUE)$ix
      either_picked_vars_ordered <- do.call('[<-',
                                            c(list(x = either_picked_vars_ordered),
                                                   var_store_indices,
                                                   list(value = ordered_var_values$x)))
      either_picked_vars_unorder_indices <- do.call('[<-',
                                            c(list(x = either_picked_vars_unorder_indices),
                                                   var_store_indices,
                                                   list(value = unorder)))


    }
  }

  either_picked_vars <- do.call('[<-',
                                c(list(x = either_picked_vars),
                                       var_store_indices,
                                       list(value = var_values)))
  # Turn time zone back to UTC if this var_to_read is 'time'
  if (all(class(either_picked_vars) == names(time_special_types))) {
    attr(either_picked_vars, "tzone") <- 'UTC'
  }


  return(list(either_picked_vars = either_picked_vars,
              either_picked_vars_ordered = either_picked_vars_ordered,
              either_picked_vars_unorder_indices = either_picked_vars_unorder_indices))
}


# Trnasforms a vector of indices v expressed in a world of 
# length N from 1 to N, into a world of length M, from
# 1 to M. Repeated adjacent indices are collapsed.
transform_indices <- function(v, n, m) {
  #unique2 turns e.g. 1 1 2 2 2 3 3 1 1 1 into 1 2 3 1
  unique2 <- function(v) {
    if (length(v) < 2) {
      v
    } else {
      v[c(1, v[2:length(v)] - v[1:(length(v) - 1)]) != 0]
    }
  }
  unique2(round(((v - 1) / (n - 1)) * (m - 1))) + 1 # this rounding may generate 0s. what then?
}

aho's avatar
aho committed
replace_character_with_indices <- function(selectors, data_dims, chunk_amount) {
aho's avatar
aho committed
  if (selectors == 'all') {
aho's avatar
aho committed
    selectors <- indices(1:(data_dims * chunk_amount))
aho's avatar
aho committed
  } else if (selectors == 'first') {
    selectors <- indices(1)
  } else if (selectors == 'last') {
aho's avatar
aho committed
    selectors <- indices(data_dims * chunk_amount)
aho's avatar
aho committed
  }
  return(selectors)
}