Start.R 129 KB
Newer Older
                dims_to_crop[[file_dim]] <- c(dims_to_crop[[file_dim]], list(chunks_to_keep))
#                found_indices <- Subset(found_indices, file_dim, chunks_to_keep)
#                # Crop dataset variables file dims.
#                for (picked_var in names(picked_vars[[i]])) {
#                  if (file_dim %in% names(dim(picked_vars[[i]][[picked_var]]))) {
#                    picked_vars[[i]][[picked_var]] <- Subset(picked_vars[[i]][[picked_var]], file_dim, chunks_to_keep)
#                  }
#                }
              }
            #}
            dat[[i]][['selectors']][[inner_dim]] <- list(fri = fri, sri = sri)
            # Crop dataset variables inner dims.
            # Crop common variables inner dims.
            types_of_var_to_crop <- 'picked'
            if (with_transform) {
              types_of_var_to_crop <- c(types_of_var_to_crop, 'transformed')
            }
            if (!is.null(dim_reorder_params[[inner_dim]])) {
              types_of_var_to_crop <- c(types_of_var_to_crop, 'reordered')
            }
            for (type_of_var_to_crop in types_of_var_to_crop) {
              if (type_of_var_to_crop == 'transformed') {
                if (is.null(tvi)) {
                  if (!is.null(dim_reorder_params[[inner_dim]])) {
                    crop_indices <- unique(unlist(ordered_sri))
                  } else {
                    crop_indices <- unique(unlist(sri))
                  }
                } else {
                  crop_indices <- unique(unlist(tvi))
                }
                vars_to_crop <- transformed_vars[[i]]
                common_vars_to_crop <- transformed_common_vars
              } else if (type_of_var_to_crop == 'reordered') {
                crop_indices <- unique(unlist(ordered_fri))
                vars_to_crop <- picked_vars_ordered[[i]]
                common_vars_to_crop <- picked_common_vars_ordered
              } else {
                crop_indices <- unique(unlist(fri))
                vars_to_crop <- picked_vars[[i]]
                common_vars_to_crop <- picked_common_vars
              }
              for (var_to_crop in names(vars_to_crop)) {
                if (inner_dim %in% names(dim(vars_to_crop[[var_to_crop]]))) {
                  if (!is.null(crop_indices)) {
                    if (type_of_var_to_crop == 'transformed') {
Nicolau Manubens's avatar
Nicolau Manubens committed
                      if (!aiat) {
                        vars_to_crop[[var_to_crop]] <- Subset(transformed_subset_var, inner_dim, crop_indices)
                      } else {
                        vars_to_crop[[var_to_crop]] <- Subset(vars_to_crop[[var_to_crop]], inner_dim, crop_indices)
                      }
                    } else {
                      vars_to_crop[[var_to_crop]] <- Subset(vars_to_crop[[var_to_crop]], inner_dim, crop_indices)
                    }
                  }
                }
              }
              if (i == length(dat)) {
                for (common_var_to_crop in names(common_vars_to_crop)) {
                  if (inner_dim %in% names(dim(common_vars_to_crop[[common_var_to_crop]]))) {
                    common_vars_to_crop[[common_var_to_crop]] <- Subset(common_vars_to_crop[[common_var_to_crop]], inner_dim, crop_indices)
                  }
                }
              }
              if (type_of_var_to_crop == 'transformed') {
                if (!is.null(vars_to_crop)) {
                  transformed_vars[[i]] <- vars_to_crop
                }
                if (i == length(dat)) {
                  transformed_common_vars <- common_vars_to_crop
                }
              } else if (type_of_var_to_crop == 'reordered') {
                if (!is.null(vars_to_crop)) {
                  picked_vars_ordered[[i]] <- vars_to_crop
                }
                if (i == length(dat)) {
                  picked_common_vars_ordered <- common_vars_to_crop
                }
              } else {
                if (!is.null(vars_to_crop)) {
                  picked_vars[[i]] <- vars_to_crop
                }
                if (i == length(dat)) {
                  picked_common_vars <- common_vars_to_crop
                }
              }
            }
Nicolau Manubens Gil's avatar
Nicolau Manubens Gil committed
          #}
        }
        # After the selectors have been picked (using the original variables), 
        # the variables are transformed. At that point, the original selectors
        # for the transformed variables are also kept in the variable original_selectors.
#print("L")
      }
    }
  }
#  if (!is.null(transformed_common_vars)) {
#    picked_common_vars[names(transformed_common_vars)] <- transformed_common_vars
#  }
  # Remove the trailing chunks, if any.
  for (file_dim in names(dims_to_crop)) {
#    indices_to_keep <- min(sapply(dims_to_crop[[file_dim]], min)):max(sapply(dims_to_crop[[file_dim]], max))
    ## TODO: Merge indices in dims_to_crop with some advanced mechanism?
    indices_to_keep <- unique(unlist(dims_to_crop[[file_dim]]))
    array_of_files_to_load <- Subset(array_of_files_to_load, file_dim, indices_to_keep)
    array_of_not_found_files <- Subset(array_of_not_found_files, file_dim, indices_to_keep)
    for (i in 1:length(dat)) {
      # Crop selectors
      for (selector_dim in names(dat[[i]][['selectors']])) {
        if (selector_dim == file_dim) {
          for (j in 1:length(dat[[i]][['selectors']][[selector_dim]][['fri']])) {
            dat[[i]][['selectors']][[selector_dim]][['fri']][[j]] <- dat[[i]][['selectors']][[selector_dim]][['fri']][[j]][indices_to_keep]
          }
          for (j in 1:length(dat[[i]][['selectors']][[selector_dim]][['sri']])) {
            dat[[i]][['selectors']][[selector_dim]][['sri']][[j]] <- dat[[i]][['selectors']][[selector_dim]][['sri']][[j]][indices_to_keep]
          }
        }
        if (file_dim %in% names(dim(dat[[i]][['selectors']][[selector_dim]][['fri']]))) {
          dat[[i]][['selectors']][[selector_dim]][['fri']] <- Subset(dat[[i]][['selectors']][[selector_dim]][['fri']], file_dim, indices_to_keep)
          dat[[i]][['selectors']][[selector_dim]][['sri']] <- Subset(dat[[i]][['selectors']][[selector_dim]][['sri']], file_dim, indices_to_keep)
        }
      }
      # Crop dataset variables file dims.
      for (picked_var in names(picked_vars[[i]])) {
        if (file_dim %in% names(dim(picked_vars[[i]][[picked_var]]))) {
          picked_vars[[i]][[picked_var]] <- Subset(picked_vars[[i]][[picked_var]], file_dim, indices_to_keep)
        }
      }
      for (transformed_var in names(transformed_vars[[i]])) {
        if (file_dim %in% names(dim(transformed_vars[[i]][[transformed_var]]))) {
          transformed_vars[[i]][[transformed_var]] <- Subset(transformed_vars[[i]][[transformed_var]], file_dim, indices_to_keep)
        }
      }
    }
    # Crop common variables file dims.
Nicolau Manubens's avatar
Nicolau Manubens committed
    for (picked_common_var in names(picked_common_vars)) {
2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488
      if (file_dim %in% names(dim(picked_common_vars[[picked_common_var]]))) {
        picked_common_vars[[picked_common_var]] <- Subset(picked_common_vars[[picked_common_var]], file_dim, indices_to_keep)
      }
    }
    for (transformed_common_var in names(transformed_common_vars)) {
      if (file_dim %in% names(dim(transformed_common_vars[[transformed_common_var]]))) {
        transformed_common_vars[[transformed_common_var]] <- Subset(transformed_common_vars[[transformed_common_var]], file_dim, indices_to_keep)
      }
    }
  }
  # Calculate the size of the final array.
  total_inner_dims <- NULL
  for (i in 1:length(dat)) {
    if (dataset_has_files[i]) {
      inner_dims <- expected_inner_dims[[i]]
      inner_dims <- sapply(inner_dims, 
        function(x) {
          if (!all(sapply(dat[[i]][['selectors']][[x]][['sri']], is.null))) {
            max(sapply(dat[[i]][['selectors']][[x]][['sri']], length))
          } else {
            if (length(var_params[[x]]) > 0) {
              if (var_params[[x]] %in% names(transformed_vars[[i]])) {
                length(transformed_vars[[i]][[var_params[[x]]]])
              } else if (var_params[[x]] %in% names(transformed_common_vars)) {
                length(common_transformed_vars[[var_params[[x]]]])
              } else {
                max(sapply(dat[[i]][['selectors']][[x]][['fri']], length))
              }
            } else {
              max(sapply(dat[[i]][['selectors']][[x]][['fri']], length))
            }
          }
        })
      names(inner_dims) <- expected_inner_dims[[i]]
      if (is.null(total_inner_dims)) {
        total_inner_dims <- inner_dims
      } else {
        new_dims <- .MergeArrayDims(total_inner_dims, inner_dims)
        total_inner_dims <- pmax(new_dims[[1]], new_dims[[2]])
      }
    }
  }
  new_dims <- .MergeArrayDims(dim(array_of_files_to_load), total_inner_dims)
  final_dims <- pmax(new_dims[[1]], new_dims[[2]])[dim_names]
  # TODO: try performance of storing all in cols instead of rows
  # Create the shared memory array, and a pointer to it, to be sent
  # to the work pieces.
  data_array <- big.matrix(nrow = prod(final_dims), ncol = 1)
  shared_matrix_pointer <- describe(data_array)
  if (is.null(num_procs)) {
    num_procs <- ceiling(availableCores() / 2)
  }
  # Build the work pieces, each with:
  # - file path
  # - total size (dims) of store array
  # - start position in store array
  # - file selectors (to provide extra info. useful e.g. to select variable)
  # - indices to take from file
  work_pieces <- list()
  for (i in 1:length(dat)) {
    if (dataset_has_files[i]) {
      selectors <- dat[[i]][['selectors']]
      file_dims <- found_file_dims[[i]]
      inner_dims <- expected_inner_dims[[i]]
      sub_array_dims <- final_dims[file_dims]
      sub_array_dims[found_pattern_dim] <- 1
      sub_array_of_files_to_load <- array(1:prod(sub_array_dims), 
                                          dim = sub_array_dims,
                                          dimnames = names(sub_array_dims))
      # Detect which of the dimensions of the dataset go across files.
      file_dim_across_files <- lapply(inner_dims, 
        function(x) {
          dim_across <- sapply(inner_dims_across_files, function(y) x %in% y)
          if (any(dim_across)) {
            names(inner_dims_across_files)[which(dim_across)[1]]
          } else {
            NULL
          }
        })
      names(file_dim_across_files) <- inner_dims
      j <- 1
      while (j <= prod(sub_array_dims)) {
        # Work out file path.
        file_to_load_sub_indices <- which(sub_array_of_files_to_load == j, arr.ind = TRUE)[1, ]
        names(file_to_load_sub_indices) <- names(sub_array_dims)
        file_to_load_sub_indices[found_pattern_dim] <- i
        big_dims <- rep(1, length(dim(array_of_files_to_load)))
        names(big_dims) <- names(dim(array_of_files_to_load))
        file_to_load_indices <- .MergeArrayDims(file_to_load_sub_indices, big_dims)[[1]]
        file_to_load <- do.call('[[', c(list(array_of_files_to_load), 
                                        as.list(file_to_load_indices)))
        not_found_file <- do.call('[[', c(list(array_of_not_found_files),
                                          as.list(file_to_load_indices)))

        if (!is.na(file_to_load) && !not_found_file) {
          # Work out indices to take
          first_round_indices <- lapply(inner_dims, 
            function (x) {
              if (is.null(file_dim_across_files[[x]])) {
                selectors[[x]][['fri']][[1]]
              } else {
                var_file_dims <- dim(selectors[inner_dims][[x]][['fri']])
                which_indices <- file_to_load_sub_indices[which(names(sub_array_dims) %in% names(var_file_dims))] 
                do.call('[[', c(list(selectors[[x]][['fri']]), as.list(which_indices)))
              }
            })
          names(first_round_indices) <- inner_dims
          second_round_indices <- lapply(inner_dims, 
            function (x) {
              if (is.null(file_dim_across_files[[x]])) {
                selectors[[x]][['sri']][[1]]
              } else {
                var_file_dims <- dim(selectors[inner_dims][[x]][['sri']])
                which_indices <- file_to_load_sub_indices[which(names(sub_array_dims) %in% names(var_file_dims))] 
                do.call('[[', c(list(selectors[[x]][['sri']]), as.list(which_indices)))
              }
            })
if (debug) {
print("-> BUILDING A WORK PIECE")
#print(str(selectors))
}
          names(second_round_indices) <- inner_dims
          if (!any(sapply(first_round_indices, length) == 0)) {
            work_piece <- list()
            work_piece[['first_round_indices']] <- first_round_indices
            work_piece[['second_round_indices']] <- second_round_indices
            work_piece[['file_indices_in_array_of_files']] <- file_to_load_indices
            work_piece[['file_path']] <- file_to_load
            work_piece[['store_dims']] <- final_dims
            # Work out store position
            store_position <- final_dims
            store_position[names(file_to_load_indices)] <- file_to_load_indices
            store_position[inner_dims] <- rep(1, length(inner_dims))
            work_piece[['store_position']] <- store_position
            # Work out file selectors
            file_selectors <- sapply(file_dims, 
              function (x) {
                vector_to_pick <- 1
                if (x %in% names(depending_file_dims)) {
                  vector_to_pick <- file_to_load_indices[depending_file_dims[[x]]]
                }
                selectors[file_dims][[x]][[vector_to_pick]][file_to_load_indices[x]]
              })
            names(file_selectors) <- file_dims
            work_piece[['file_selectors']] <- file_selectors
            # Send variables for transformation
            if (!is.null(transform) && (length(transform_vars) > 0)) {
              vars_to_transform <- NULL
              picked_vars_to_transform <- which(names(picked_vars[[i]]) %in% transform_vars)
              if (length(picked_vars_to_transform) > 0) {
                picked_vars_to_transform <- names(picked_vars[[i]])[picked_vars_to_transform]
                vars_to_transform <- c(vars_to_transform, picked_vars[[i]][picked_vars_to_transform])
                if (any(picked_vars_to_transform %in% names(picked_vars_ordered[[i]]))) {
                  picked_vars_ordered_to_transform <- picked_vars_to_transform[which(picked_vars_to_transform %in% names(picked_vars_ordered[[i]]))]
                  vars_to_transform[picked_vars_ordered_to_transform] <- picked_vars_ordered[[i]][picked_vars_ordered_to_transform]
                }
              }
              picked_common_vars_to_transform <- which(names(picked_common_vars) %in% transform_vars)
              if (length(picked_common_vars_to_transform) > 0) {
                picked_common_vars_to_transform <- names(picked_common_vars)[picked_common_vars_to_transform]
                vars_to_transform <- c(vars_to_transform, picked_common_vars[picked_common_vars_to_transform])
                if (any(picked_common_vars_to_transform %in% names(picked_common_vars_ordered))) {
                  picked_common_vars_ordered_to_transform <- picked_common_vars_to_transform[which(picked_common_vars_to_transform %in% names(picked_common_vars_ordered))]
                  vars_to_transform[picked_common_vars_ordered_to_transform] <- picked_common_vars_ordered[picked_common_vars_ordered_to_transform]
                }
              }
              work_piece[['vars_to_transform']] <- vars_to_transform
            }
            work_pieces <- c(work_pieces, list(work_piece))
          }
        }
        j <- j + 1
      }
    }
  }
#print("N")
if (debug) {
print("-> WORK PIECES BUILT")
}

  # Calculate the progress %s that will be displayed and assign them to 
  # the appropriate work pieces.
  if (length(work_pieces) / num_procs >= 2 && !silent) {
    if (length(work_pieces) / num_procs < 10) {
      amount <- 100 / ceiling(length(work_pieces) / num_procs)
      reps <- ceiling(length(work_pieces) / num_procs)
    } else {
      amount <- 10
      reps <- 10
    }
    progress_steps <- rep(amount, reps)
    if (length(work_pieces) < (reps + 1)) {
      selected_pieces <- length(work_pieces)
      progress_steps <- c(sum(head(progress_steps, reps)),
                          tail(progress_steps, reps))
    } else {
      selected_pieces <- round(seq(1, length(work_pieces), 
                                   length.out = reps + 1))[-1]
    }
    progress_steps <- paste0(' + ', round(progress_steps, 2), '%')
    progress_message <- 'Progress: 0%'
  } else {
    progress_message <- ''
    selected_pieces <- NULL
  }
  piece_counter <- 1
  step_counter <- 1
  work_pieces <- lapply(work_pieces, 
    function (x) {
      if (piece_counter %in% selected_pieces) {
        wp <- c(x, list(progress_amount = progress_steps[step_counter]))
        step_counter <<- step_counter + 1
      } else {
        wp <- x
      }
      piece_counter <<- piece_counter + 1
      wp
    })
  if (!silent) {
    .message("Detected dimension sizes:")
    longest_dim_len <- max(sapply(names(final_dims), nchar))
    longest_size_len <- max(sapply(paste0(final_dims, ''), nchar))
    sapply(names(final_dims), 
      function(x) {
        message(paste0("*   ", paste(rep(' ', longest_dim_len - nchar(x)), collapse = ''), 
                       x, ": ", paste(rep(' ', longest_size_len - nchar(paste0(final_dims[x], ''))), collapse = ''), 
                       final_dims[x]))
      })
    bytes <- prod(c(final_dims, 8))
    dim_sizes <- paste(final_dims, collapse = ' x ')
    .message(paste("Total size of requested data:"))
    .message(paste(dim_sizes, " x 8 bytes =", 
                   format(structure(bytes, class = "object_size"), units = "auto")), 
                   indent = 2)
    .message("If the size of the requested data is close to or above the free shared RAM memory, R may crash.")
    .message("If the size of the requested data is close to or above the half of the free RAM memory, R may crash.")
    .message(paste0("Will now proceed to read and process ", length(work_pieces), " data files:"))
    if (length(work_pieces) < 30) {
      lapply(work_pieces, function (x) .message(x[['file_path']], indent = 2))
    } else {
      .message("The list of files is long. You can check it after .Load() finishes in the output '$Files'.", indent = 2, exdent = 5)
    }
  }

  # Build the cluster of processes that will do the work and dispatch work pieces.
  # The function .LoadDataFile is applied to each work piece. This function will
  # open the data file, regrid if needed, subset, apply the mask, 
  # compute and apply the weights if needed,
  # disable extreme values and store in the shared memory matrix.
#print("O")
  if (!silent) {
    .message("Loading... This may take several minutes...")
    if (progress_message != '') {
      .message(progress_message, appendLF = FALSE)
    }
  }
  if (num_procs == 1) {
    found_files <- lapply(work_pieces, .LoadDataFile, 
                          shared_matrix_pointer = shared_matrix_pointer,
                          file_data_reader = file_data_reader, 
                          transform = transform, 
                          transform_params = transform_params,
                          silent = silent, debug = debug)
  } else {
    cluster <- makeCluster(num_procs, outfile = "")
    # Send the heavy work to the workers
    work_errors <- try({
      found_files <- clusterApplyLB(cluster, work_pieces, .LoadDataFile, 
                                    shared_matrix_pointer = shared_matrix_pointer,
                                    file_data_reader = file_data_reader,
                                    transform = transform, 
                                    transform_params = transform_params,
                                    silent = silent, debug = debug)
    })
    stopCluster(cluster)
  }

  if (!silent) {
    if (progress_message != '') {
      .message("\n", tag = '')
    }
  }
#print("P")
  data_array <- array(bigmemory::as.matrix(data_array), dim = final_dims)
  gc()

  failed_pieces <- work_pieces[which(unlist(found_files))]
  for (failed_piece in failed_pieces) {
    array_of_not_found_files <- do.call('[<-', 
      c(list(array_of_not_found_files), 
        as.list(failed_piece[['file_indices_in_array_of_files']]),
        list(value = TRUE)))
  }
  if (any(array_of_not_found_files)) {
    for (i in 1:prod(dim(array_of_files_to_load))) {
      if (is.na(array_of_not_found_files[i])) {
        array_of_files_to_load[i] <- NA
      } else {
        if (array_of_not_found_files[i]) {
          array_of_not_found_files[i] <- array_of_files_to_load[i]
          array_of_files_to_load[i] <- NA
        } else {
          array_of_not_found_files[i] <- NA
        }
      }
    }
  } else {
    array_of_not_found_files <- NULL
  }
  #Hard:
  #- Reordering lon and lat: after .Load
  #- Interpolating: in LoadDataFile
  #- New config file system ?
  #Medium:
  #- Do that if a vector of selectors is provided without dimname, it's assigned the name of the inner dimension.
  #- Tolerance for date matching (0.5 or 'X days').
  #- Translation of dim and var names.
  #- Define dimension names in file
  #- Define variable names in file
  #- Allow to match closest dates (does it work already?)
  #- Add option to collapse dimensions
  #- Sort out problem of repeated open() and missing close()
  #- Recover all Load options
  #- Wrapper -> new Load()
  #- Review TODOs
  #Bonus:
  #- Read metadata
  #- Enhance regexps
  ## TODO: varmin and varmax
  ## TODO: use the dim_checkers
  ## TODO: change all prints to warnings
  ## TODO: make all warnings only appear if silent = FALSE.

  # Replace the vars and common vars by the transformed vars and common vars
  for (i in 1:length(dat)) {
    if (length(names(transformed_vars[[i]])) > 0) {
      picked_vars[[i]][names(transformed_vars[[i]])] <- transformed_vars[[i]]
    } else if (length(names(picked_vars_ordered[[i]])) > 0) {
      picked_vars[[i]][names(picked_vars_ordered[[i]])] <- picked_vars_ordered[[i]]
    }
  }
  if (length(names(transformed_common_vars)) > 0) {
    picked_common_vars[names(transformed_common_vars)] <- transformed_common_vars
  } else if (length(names(picked_common_vars_ordered)) > 0) {
    picked_common_vars[names(picked_common_vars_ordered)] <- picked_common_vars_ordered
  }
if (debug) {
print("-> THE TRANSFORMED VARS:")
print(str(transformed_vars))
print("-> THE PICKED VARS:")
print(str(picked_vars))
}
  if (!silent) {
    .message("Successfully retrieved data.")
  }

  list(Data = data_array, 
       Variables = c(list(common = picked_common_vars), picked_vars),
       Files = array_of_files_to_load, 
       NotFoundFiles = array_of_not_found_files)
}

# Loads the data of a single work piece and stores it in the shared memory
# matrix.
#
# The file in work_piece[['file_path']] is read with 'file_data_reader'. If a
# 'transform' is provided it is applied to the read array, and the result is 
# then subset with the second round of indices. The resulting values are 
# written into the shared big.matrix at the linear positions derived from 
# work_piece[['store_position']] and work_piece[['store_dims']].
#
# Parameters:
#   work_piece: list with the elements 'store_position', 
#     'first_round_indices', 'second_round_indices', 'file_path', 
#     'file_selectors' and 'store_dims', and optionally 'vars_to_transform' 
#     and 'progress_amount'.
#   shared_matrix_pointer: descriptor handed to 
#     bigmemory::attach.big.matrix() to reach the shared matrix.
#   file_data_reader: function called as 
#     file_data_reader(file_path, NULL, file_selectors, first_round_indices).
#     A NULL result means that no data could be read from the file.
#   transform: function or NULL. When not NULL it is called through do.call()
#     with the arguments 'data_array', 'variables' and 'file_dims' plus the
#     entries of 'transform_params', and must return a list with the 
#     element 'data_array'.
#   transform_params: list of extra arguments for 'transform'.
#   silent: if FALSE, the progress amount attached to the work piece (if any)
#     is emitted with message().
#   debug: if TRUE, diagnostic information is printed.
#
# Returns TRUE if the reader returned NULL (nothing was stored) and FALSE 
# otherwise.
.LoadDataFile <- function(work_piece, shared_matrix_pointer, 
                          file_data_reader, 
                          transform, transform_params,
                          silent = FALSE, debug = FALSE) {
  ### TODO: Specify dependencies as parameter

  # Auxiliar function to convert array indices to lineal indices.
  # This function expects a numeric vector of single indices or a list of 
  # numeric vectors with multiple indices for each dimension, and a 
  # numeric vector of dimension sizes.
  .arrayIndices2VectorIndices <- function(indices, dims) {
    # Check indices
    if (!is.list(indices)) {
      if (is.numeric(indices)) {
        indices <- as.list(indices)
      } else {
        stop("Parameter 'indices' must be a numeric vector or a list of ",
             "numeric vectors.")
      }
    }
    for (x in indices) {
      if (!is.numeric(x)) {
        stop("Parameter 'indices' must be a numeric vector or a list ",
             "of numeric vectors.")
      } else if (any(is.na(x) | is.nan(x) | is.infinite(x))) {
        stop("Parameter 'indices' must not contain NA/NaN/Inf values.")
      } else if (length(x) < 1) {
        stop("Parameter 'indices' must contain at least one index ",
             "for each dimension.")
      }
    }

    # Check dims
    if (!is.numeric(dims)) {
      stop("Parameter 'dims' must be a numeric vector.")
    } else if (any(is.na(dims) | is.nan(dims) | is.infinite(dims))) {
      stop("Parameter 'dims' must not contain NA/NaN/Inf values.")
    } else if (any(vapply(dims, length, integer(1)) != 1)) {
      stop("Parameter 'dims' must contain a single numeric element for ",
           "each dimension.")
    } else if (length(indices) != length(dims)) {
      stop("There must be as many dimension selectors in 'indices' as ",
           "dimensions in 'dims'.")
    }

    # Recursive helper. It receives the indices and the dims in reversed 
    # order, i.e. the first element corresponds to the slowest-varying 
    # dimension.
    find_indices <- function(indices, dims) {
      if (max(indices[[1]]) > dims[1] || min(indices[[1]]) < 1) {
        stop("Provided indices out of range.")
      }
      if (length(dims) == 1) {
        indices[[1]]
      } else {
        found_indices <- find_indices(indices[-1], dims[-1])
        # Each index of the current dimension shifts the positions found 
        # for the remaining dimensions by a whole block. outer() builds all
        # the combinations at once, with 'found_indices' varying fastest, 
        # which is the order in which the values have to be stored.
        offsets <- (indices[[1]] - 1) * prod(dims[-1])
        as.vector(outer(found_indices, offsets, "+"))
      }
    }

    find_indices(rev(indices), rev(dims))
  }

  store_position <- as.list(work_piece[['store_position']])
  first_round_indices <- work_piece[['first_round_indices']]
  second_round_indices <- work_piece[['second_round_indices']]
  file_to_open <- work_piece[['file_path']]

  # The detailed debug output is only printed for the work piece(s) whose
  # first six store positions are all 1.
  debug_this_piece <- debug && all(unlist(store_position[1:6]) == 1)

  sub_array <- file_data_reader(file_to_open, NULL, 
                                work_piece[['file_selectors']],
                                first_round_indices)
  if (debug_this_piece) {
    print("-> LOADING A WORK PIECE")
    print("-> STRUCTURE OF READ UNTRANSFORMED DATA:")
    print(str(sub_array))
    print("-> STRUCTURE OF VARIABLES TO TRANSFORM:")
    print(str(work_piece[['vars_to_transform']]))
    print("-> COMMON ARRAY DIMENSIONS:")
    print(str(work_piece[['store_dims']]))
  }

  if (!is.null(sub_array)) {
    # Apply data transformation once we have the data arrays.
    if (!is.null(transform)) {
      if (debug_this_piece) {
        print("-> PROCEEDING TO TRANSFORM ARRAY")
        print("-> DIMENSIONS OF ARRAY RIGHT BEFORE TRANSFORMING:")
        print(dim(sub_array))
      }
      sub_array <- do.call(transform, c(list(data_array = sub_array,
                                             variables = work_piece[['vars_to_transform']],
                                             file_dims = names(work_piece[['file_selectors']])),
                                        transform_params))
      if (debug_this_piece) {
        print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER TRANSFORMING:")
        print(str(sub_array))
        print("-> DIMENSIONS OF ARRAY RIGHT AFTER TRANSFORMING:")
        print(dim(sub_array$data_array))
      }
      sub_array <- sub_array$data_array
      # Subset with second round of indices
      dims_to_crop <- which(!sapply(second_round_indices, is.null))
      if (length(dims_to_crop) > 0) {
        dimnames_to_crop <- names(second_round_indices)[dims_to_crop]
        sub_array <- Subset(sub_array, dimnames_to_crop, 
                            second_round_indices[dimnames_to_crop])
      }
      if (debug_this_piece) {
        print("-> STRUCTURE OF ARRAY AND VARIABLES RIGHT AFTER SUBSETTING WITH 2nd ROUND INDICES:")
        print(str(sub_array))
      }
    }

    # Work out, for each dimension of the store array, the positions in 
    # which the values of this work piece are stored.
    store_indices <- lapply(names(store_position), 
      function (x) {
        if (!(x %in% names(first_round_indices))) {
          # File dimension: a single, already known position.
          store_position[[x]]
        } else if (is.null(second_round_indices[[x]])) {
          # Inner dimension without second round selection: take all the 
          # positions along that dimension of the read array.
          seq_len(dim(sub_array)[x])
        } else {
          ## TODO: Review carefully this line. Inner indices are all 
          ## aligned to the left-most positions. If dataset A has longitudes
          ## 1, 2, 3, 4 but dataset B has only longitudes 3 and 4, then
          ## they will be stored as follows:
          ## 1, 2, 3, 4
          ## 3, 4, NA, NA
          seq_along(second_round_indices[[x]])
        }
      })
    if (debug) {
      if (all(unlist(store_indices) == 1)) {
        print("-> STRUCTURE OF FIRST ROUND INDICES FOR THIS WORK PIECE:")
        print(str(first_round_indices))
        print("-> STRUCTURE OF SECOND ROUND INDICES FOR THIS WORK PIECE:")
        print(str(second_round_indices))
        print("-> STRUCTURE OF STORE INDICES FOR THIS WORK PIECE:")
        print(str(store_indices))
      }
    }
    matrix_indices <- .arrayIndices2VectorIndices(store_indices, work_piece[['store_dims']])
    data_array <- bigmemory::attach.big.matrix(shared_matrix_pointer)
    data_array[matrix_indices] <- as.vector(sub_array)
    rm(data_array)
    gc()
  }
  if (!is.null(work_piece[['progress_amount']]) && !silent) {
    message(work_piece[['progress_amount']], appendLF = FALSE)
  }
  is.null(sub_array)
}