This function serves as a high-level interface to the 'targets' and 'crew'
packages. It programmatically generates a _targets.R pipeline file based on
user-provided inputs, runs the pipeline, and returns the final result. It is
designed to abstract the complexity of setting up a 'targets'-based workflow
for common "map-reduce" or "split-apply-combine" tasks.
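The "map-reduce" contract can be illustrated without 'targets' or 'crew'. The base R helper run_in_memory() below is hypothetical and not part of tarflowr. It applies process_func to each work unit one after another and passes the resulting list to combine_func. It has no caching, no parallelism and no project directory.

```r
# Hypothetical, sequential, in-memory sketch of the map-reduce contract.
# It only illustrates what process_func and combine_func are expected to do.
run_in_memory <- function(work_units, process_func, combine_func = NULL) {
  # map: apply process_func to each unit, collecting results in a list
  results <- lapply(work_units, process_func)
  # reduce: combine_func receives the whole list of processed results;
  # without it, the list itself is returned
  if (is.null(combine_func)) results else combine_func(results)
}

run_in_memory(as.list(1:10), function(x) x^2, function(r) sum(unlist(r)))
```

tarflowr_run() keeps this same contract. The mapping and combining are handed to a 'targets' pipeline, which adds caching and parallel workers via 'crew'.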
Usage
tarflowr_run(
work_units,
process_func,
combine_func = NULL,
project_dir,
result_target_name = NULL,
packages = c(),
metadata = list(),
workers = 1L,
crew_controller = NULL,
seed = NULL,
error = "stop",
force = FALSE,
callr_function = callr::r,
callr_arguments = list(stdout = "|", stderr = "2>&1")
)
Arguments
- work_units
list or vector. Each element represents a single unit of work to be processed.
- process_func
function. Takes one element of work_units as its first argument and returns the "processed" result.
- combine_func
function. Takes a list of all processed results from process_func and combines them into a single, final object. If NULL, the final result is the list of all processed results.
- project_dir
character. Path to the directory where the tarflowr project will be created. The directory is created if it does not exist.
- result_target_name
character. Name of the last target in the pipeline, which holds the result of evaluating combine_func on the list of processed results. If NULL (default), the final target name is based on the project directory name, with the suffix "_result".
- packages
character. Vector of R package names required by process_func and combine_func. These are passed as the packages option to targets::tar_option_set() so that they are loaded on each worker.
- metadata
list. Named list of elements written to the _tarflowr_meta.yaml file after a successful run.
- workers
integer. Number of local parallel workers to use via the crew package. Only used when crew_controller is NULL (the default).
- crew_controller
crew_class_controller. Custom crew controller. The default, NULL, uses crew::crew_controller_local() with the specified number of workers (parallel processes). targets::tar_make() is called with the R option "targets.controller" set.
- seed
integer. Random number seed. Passed to targets::tar_option_set() via argument seed.
- error
character. Error behavior, either "stop" (default) or "continue". Passed to targets::tar_option_set() via argument error.
- force
logical. If FALSE (default), a hash of the input arguments is checked to determine whether the work units or the _targets.R file have changed and need to be rewritten. If TRUE, new work units and a new _targets.R script are always written.
- callr_function
function. Passed to targets::tar_make(). The default, callr::r(), runs the pipeline in a new R session. Use callr::r_bg() to run it in the background and suppress the targets pipeline output.
- callr_arguments
list. Passed to targets::tar_make() as its callr_arguments, i.e. the list of arguments given to callr_function.
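The force argument implies a change-detection step. The following is a minimal sketch that assumes inputs are hashed by serializing them with base R serialize() and taking an MD5 digest with tools::md5sum(). The helpers hash_inputs() and needs_rewrite() are hypothetical. tarflowr's actual hashing scheme and where it stores the hash may differ.

```r
# Hypothetical sketch of hash-based change detection behind `force`.
hash_inputs <- function(...) {
  tmp <- tempfile(fileext = ".bin")
  on.exit(unlink(tmp), add = TRUE)
  # serialize all inputs together to raw bytes and write them to disk
  writeBin(serialize(list(...), connection = NULL), tmp)
  unname(tools::md5sum(tmp))
}

# Decide whether work units and _targets.R must be (re)written:
# always when force = TRUE or no stored hash exists, otherwise only
# when the stored hash differs from the new one.
needs_rewrite <- function(hash_file, new_hash, force = FALSE) {
  if (force || !file.exists(hash_file)) {
    return(TRUE)
  }
  !identical(readLines(hash_file, n = 1L, warn = FALSE), new_hash)
}
```

In this reading, force = TRUE skips the comparison entirely. An unchanged hash lets the existing project, and therefore the 'targets' cache, be reused.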
Value
The final combined result of the workflow, as returned by
combine_func, or the list of all processed results when combine_func is
NULL.
Details
The function works by creating a self-contained project in
project_dir. It serializes the user's work_units and functions
(process_func, combine_func) into this directory, generates a
_targets.R script, and runs it. The workflow consists of the following steps:
1. Load the work_units.
2. Map process_func over each element of work_units using the specified crew controller.
3. Combine the results of the processing step using combine_func (optional).
4. Execute the pipeline with targets::tar_make().
5. Load the final result into the original R session.
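The load, map and combine steps can be sketched as a generated _targets.R script. The helper write_pipeline_sketch() below is hypothetical and is not tarflowr's actual generated file. The .rds file names, the final target name result, and the controller construction inside the script are all assumptions. The sketch also assumes combine_func is not NULL. It does rely on real 'targets' features: tar_option_set(), tar_target(), and dynamic branching with pattern = map(). It also uses iteration = "list", under which each branch receives one element via [[ and the branch results are aggregated back into a list.

```r
# Hypothetical sketch: write a minimal _targets.R implementing
# load -> map -> combine. File names and the target name `result`
# are illustrative assumptions.
write_pipeline_sketch <- function(project_dir, packages = character(),
                                  workers = 1L, seed = NULL,
                                  error = "stop") {
  dir.create(project_dir, showWarnings = FALSE, recursive = TRUE)
  script <- c(
    "library(targets)",
    "process_func <- readRDS('process_func.rds')",
    "combine_func <- readRDS('combine_func.rds')",
    # packages, seed and error map onto tar_option_set() arguments
    sprintf(
      "tar_option_set(packages = %s, seed = %s, error = %s,",
      paste(deparse(packages), collapse = ""),
      paste(deparse(seed), collapse = ""),
      deparse(error)
    ),
    sprintf("  controller = crew::crew_controller_local(workers = %dL))",
            as.integer(workers)),
    "list(",
    # load: one target holding all work units, branched element-wise
    "  tar_target(work_units, readRDS('work_units.rds'), iteration = 'list'),",
    # map: one dynamic branch per work unit
    "  tar_target(processed, process_func(work_units),",
    "             pattern = map(work_units), iteration = 'list'),",
    # combine: receives the list of all branch results
    "  tar_target(result, combine_func(processed))",
    ")"
  )
  path <- file.path(project_dir, "_targets.R")
  writeLines(script, path)
  invisible(path)
}
```

The function only writes text, so it can be inspected without 'targets' or 'crew' installed. Running the resulting script with targets::tar_make() would additionally require the referenced .rds files in the project directory.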
Examples
if (FALSE) { # \dontrun{
td <- file.path(tempdir(), "_my_first_project")
# define the work: a list of numbers
my_work <- as.list(1:10)
# define the processing function to work on one item
square_a_number <- function(x) {
Sys.sleep(1)
return(x^2)
}
# define the combine function for the list of results
sum_all_results <- function(results_list) {
sum(unlist(results_list))
}
# run the workflow
final_sum <- tarflowr_run(
work_units = my_work,
process_func = square_a_number,
combine_func = sum_all_results,
project_dir = td,
workers = 4
)
# final target value is 385
print(final_sum)
# now inspect "_my_first_project" folder to see the
# generated _targets.R file and the _targets/ cache
# rerun and get the result instantly from the cache:
cached_sum <- tarflowr_run(
work_units = my_work,
process_func = square_a_number,
combine_func = sum_all_results,
project_dir = td,
workers = 4
)
print(cached_sum)
} # }