
This function serves as a high-level interface to the 'targets' and 'crew' packages. It programmatically generates a _targets.R pipeline file from user-provided inputs, runs the pipeline, and returns the final result. It is designed to abstract away the complexity of setting up a 'targets'-based workflow for common "map-reduce" or "split-apply-combine" tasks.

Usage

tarflowr_run(
  work_units,
  process_func,
  combine_func = NULL,
  project_dir,
  result_target_name = NULL,
  packages = c(),
  metadata = list(),
  workers = 1L,
  crew_controller = NULL,
  seed = NULL,
  error = "stop",
  force = FALSE,
  callr_function = callr::r,
  callr_arguments = list(stdout = "|", stderr = "2>&1")
)

Arguments

work_units

list or vector. Each element represents a single unit of work to be processed.

process_func

function. Takes one element of work_units as its first argument and returns the "processed" result.

combine_func

function. Takes a list of all processed results from process_func and combines them into a single, final object. If NULL, the final result will be the list of all processed results.
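Ignoring caching and parallelism, the value returned by tarflowr_run() should match what a plain sequential map followed by a combine step produces. The sketch below is a hypothetical in-session reference (sequential_equivalent() is not part of tarflowr) that can be handy for checking process_func and combine_func on a few work units before launching a full pipeline.

```r
# Hypothetical helper (not part of tarflowr): a sequential, in-session
# equivalent of the map-reduce that tarflowr_run() distributes via targets/crew.
sequential_equivalent <- function(work_units, process_func, combine_func = NULL) {
  # "map" step: apply process_func to each work unit, collecting results in a list
  results <- lapply(work_units, process_func)
  # "reduce" step: with no combine_func, the list of results is returned as-is
  if (is.null(combine_func)) {
    return(results)
  }
  combine_func(results)
}

square_a_number <- function(x) x^2
sum_all_results <- function(results_list) sum(unlist(results_list))

sequential_equivalent(as.list(1:10), square_a_number, sum_all_results)
#> [1] 385
sequential_equivalent(as.list(1:3), square_a_number)  # list(1, 4, 9)
```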

project_dir

character. Path to the directory where the tarflowr project will be created. It will be created if it does not exist.

result_target_name

character. Name of the last target in the pipeline, which contains the result of evaluating combine_func on the list of processed results. If NULL (default), the final target name is based on the project directory name, with the suffix "_result".
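As an illustration of the documented default, a hypothetical helper might derive the name from the last component of project_dir. This is only a sketch: tarflowr's actual rule is described as "based on" the directory name and may, for example, sanitize it into a valid target name.

```r
# Hypothetical helper (not tarflowr's code): derive a default final target
# name from the project directory name plus the suffix "_result".
default_result_name <- function(project_dir) {
  paste0(basename(project_dir), "_result")
}

default_result_name("analyses/soil_survey")
#> [1] "soil_survey_result"
```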

packages

character. Vector of R package names required by process_func and combine_func. These are passed as the packages argument of targets::tar_option_set() so that they are loaded on each worker.

metadata

list. Named list of elements to write to the _tarflowr_meta.yaml file after a successful run.
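To show what a metadata file of this kind can look like, here is a minimal base-R sketch that writes a flat named list of scalar values as simple "key: value" YAML lines. The helper write_meta_sketch() is hypothetical; the documentation does not say how tarflowr serializes metadata, and a real implementation would more likely use a YAML library to handle nested or non-scalar values.

```r
# Hypothetical sketch (not tarflowr's writer): write a flat, named list of
# scalar values as "key: value" lines. Non-scalar entries make vapply() fail.
write_meta_sketch <- function(metadata, path) {
  stopifnot(is.list(metadata), !is.null(names(metadata)), all(nzchar(names(metadata))))
  values <- vapply(metadata, function(v) format(v), character(1))
  writeLines(paste0(names(metadata), ": ", values), path)
  invisible(path)
}

meta_file <- file.path(tempdir(), "_tarflowr_meta.yaml")
write_meta_sketch(list(project = "demo", n_units = 10L), meta_file)
readLines(meta_file)
```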

workers

integer. Number of local parallel workers to use via the crew package. Only used when crew_controller is NULL (the default).

crew_controller

crew_class_controller. Custom crew controller. The default, NULL, uses crew::crew_controller_local() with the number of workers (parallel processes) given by workers. targets::tar_make() is called with the R option "targets.controller" set to the controller in use.

seed

integer. Random number seed. Passed to targets::tar_option_set() via argument seed.

error

character. Error behavior. Either "stop" (default) or "continue". Passed to targets::tar_option_set() via argument error.
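The packages, seed, and error arguments all map onto targets::tar_option_set(). As a hypothetical sketch of how such a line could be generated for a _targets.R file without evaluating it, base R's call() machinery and deparse() can build the code text. options_line() is an illustrative helper, not tarflowr's code, and the real generated script may set additional options.

```r
# Hypothetical helper: build (but do not evaluate) a tar_option_set() line
# that a generated _targets.R could contain.
options_line <- function(packages = character(), seed = NULL, error = "stop") {
  error <- match.arg(error, c("stop", "continue"))
  args <- list(packages = packages, error = error)
  # only pass a seed when one was supplied
  if (!is.null(seed)) args$seed <- seed
  expr <- as.call(c(list(quote(targets::tar_option_set)), args))
  paste(deparse(expr, width.cutoff = 500L), collapse = "")
}

options_line(packages = c("sf", "terra"), seed = 123, error = "continue")
```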

force

logical. If FALSE (default), a hash of the input arguments is checked to determine whether the work units or the _targets.R file need to be updated. If TRUE, the work units and the _targets.R script are always rewritten.
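The force logic can be pictured as comparing a stored hash with a fresh one. The sketch below assumes the inputs are hashed by serializing them to a temporary file and taking its MD5 checksum with tools::md5sum(); hash_inputs(), needs_rewrite(), and the single-line hash file are hypothetical and not tarflowr's actual mechanism.

```r
# Hypothetical sketch of hash-based change detection (not tarflowr's code).
# Serialize the inputs to a temporary file and take its MD5 checksum.
hash_inputs <- function(...) {
  f <- tempfile(fileext = ".rds")
  on.exit(unlink(f), add = TRUE)
  saveRDS(list(...), f, compress = FALSE)
  unname(tools::md5sum(f))
}

# Rewrite when forced, when no hash was stored yet, or when the hash changed.
needs_rewrite <- function(new_hash, hash_file, force = FALSE) {
  if (isTRUE(force) || !file.exists(hash_file)) return(TRUE)
  !identical(readLines(hash_file, n = 1L), new_hash)
}

hash_file <- tempfile(fileext = ".txt")
h <- hash_inputs(work_units = as.list(1:10), seed = 123L)
needs_rewrite(h, hash_file)                # TRUE: no stored hash yet
writeLines(h, hash_file)
needs_rewrite(h, hash_file)                # FALSE: inputs unchanged
needs_rewrite(h, hash_file, force = TRUE)  # TRUE: rewrite forced
```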

callr_function

function. Passed to targets::tar_make(). The default, callr::r(), runs the pipeline in a new R session. Use callr::r_bg() to run it in the background and suppress the targets pipeline output.

callr_arguments

list. Arguments for callr_function, passed to targets::tar_make() via its callr_arguments argument.

Value

The final combined result of the workflow, as returned by combine_func, or the list of all processed results when combine_func is NULL.

Details

The function works by creating a self-contained project in project_dir. It serializes the user's work_units and functions (process_func, combine_func) into this directory, generates a _targets.R script that defines the pipeline, runs it, and loads the result. The overall workflow is:

  1. Load the work_units

  2. Map process_func over each element of work_units using the specified crew controller

  3. Combine the results of the processing step using combine_func (optional)

  4. Execute the pipeline with targets::tar_make()

  5. Load the final result into the original R session
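The five steps can be sketched in R as a script generator (steps 1 to 3) plus a runner (steps 4 and 5). This is a hypothetical illustration, not the script tarflowr writes: the file names process_func.rds, combine_func.rds, and work_units.rds are assumptions, the sketch assumes combine_func is supplied, and it builds the crew controller inline instead of using the "targets.controller" option described above. Dynamic branching uses pattern = map(), and iteration = "list" makes each branch receive a single list element.

```r
# Hypothetical sketch of a generated _targets.R (steps 1-3).
write_targets_script_sketch <- function(project_dir, result_name, workers = 1L) {
  dir.create(project_dir, showWarnings = FALSE, recursive = TRUE)
  script <- c(
    "library(targets)",
    # serialized user functions, assumed to be saved next to the script
    'process_func <- readRDS("process_func.rds")',
    'combine_func <- readRDS("combine_func.rds")',
    sprintf(
      "tar_option_set(controller = crew::crew_controller_local(workers = %dL))",
      as.integer(workers)
    ),
    "list(",
    # step 1: load the work units as a list target
    '  tar_target(work_units, readRDS("work_units.rds"), iteration = "list"),',
    # step 2: one dynamic branch per work unit
    "  tar_target(processed, process_func(work_units),",
    '             pattern = map(work_units), iteration = "list"),',
    # step 3: combine the list of branch results into the final target
    sprintf("  tar_target(%s, combine_func(processed))", result_name),
    ")"
  )
  path <- file.path(project_dir, "_targets.R")
  writeLines(script, path)
  invisible(path)
}

# Steps 4-5: run the pipeline from inside project_dir, then read the result.
run_and_read_sketch <- function(project_dir, result_name,
                                callr_function = callr::r,
                                callr_arguments = list(stdout = "|", stderr = "2>&1")) {
  old_wd <- setwd(project_dir)
  on.exit(setwd(old_wd), add = TRUE)
  targets::tar_make(callr_function = callr_function, callr_arguments = callr_arguments)
  targets::tar_read_raw(result_name)
}
```

To try the sketch end to end, save work_units, process_func, and combine_func into project_dir with saveRDS() under the assumed file names, then call run_and_read_sketch(); this requires the targets, crew, and callr packages.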

Author

Andrew G. Brown

Examples

if (FALSE) { # \dontrun{

td <- file.path(tempdir(), "_my_first_project")

# define the work: a list of numbers
my_work <- as.list(1:10)

# define the processing function to work on one item
square_a_number <- function(x) {
  Sys.sleep(1)
  return(x^2)
}

# define the combine function for the list of results
sum_all_results <- function(results_list) {
  sum(unlist(results_list))
}

# run the workflow
final_sum <- tarflowr_run(
  work_units = my_work,
  process_func = square_a_number,
  combine_func = sum_all_results,
  project_dir = td,
  workers = 4
)

# final target value is 385
print(final_sum)

# now inspect the "_my_first_project" folder to see the
# generated _targets.R file and the _targets/ cache

# rerun and get the result instantly from the cache:
cached_sum <- tarflowr_run(
  work_units = my_work,
  process_func = square_a_number,
  combine_func = sum_all_results,
  project_dir = td,
  workers = 4
)
print(cached_sum)

} # }