[!NOTE]
This package is experimental. The internals are hacky and subject to change. It is a proof of concept: kicking the tires on a system that can float above the level of a single ‘targets’ pipeline.
The goal of {tarflowr} (say: “tar flower”) is to provide a simple, high-level interface for creating and executing ‘targets’ pipelines, so users can focus more on their specific analysis, not on pipeline orchestration.
The philosophy is based on the concept of arbitrary “work units.” A {tarflowr} workflow is a {targets} pipeline, but work units within that workflow can be {targets} pipelines themselves: either generated by {tarflowr} or custom-made. This abstraction allows one to develop and test each sub-project independently, and then use {tarflowr} as an “orchestrator” to run and combine sub-projects at scale.
The tarflowr_run()
function serves as a high-level interface to the ‘targets’ and ‘crew’ packages. It programmatically generates a _targets.R
pipeline file based on user-provided inputs, runs the pipeline, and returns the final result. It is designed to abstract the complexity of setting up a ‘targets’-based workflow for common “map-reduce” or “split-apply-combine” tasks.
Installation
You can install the development version of tarflowr like so:
if (!require("remotes")) install.packages("remotes")
remotes::install_github("brownag/tarflowr")
Example
This is a basic example of calculating a sum of squares in parallel:
library(tarflowr)
PROJECT_DIR <- file.path(tempdir(), "_my_first_project")
# define the work: a list of numbers
my_work <- as.list(1:10)
# define the processing function to work on one item
square_a_number <- function(x) {
Sys.sleep(1*1)
return(x^2)
}
# define the combine function for the list of results
sum_all_results <- function(results_list) {
sum(unlist(results_list))
}
# run the workflow
final_sum <- tarflowr_run(
work_units = my_work,
process_func = square_a_number,
combine_func = sum_all_results,
project_dir = PROJECT_DIR,
workers = 4
)
#> ℹ Creating project directory: '/tmp/RtmppjxsU0/_my_first_project'
#>
#> ── Starting tarflowr workflow ──────────────────────────────────────────────────
#> • Project directory: '/tmp/RtmppjxsU0/_my_first_project'
#> • Number of work units: 10
#> • Parallel workers: 4
#> ℹ Executing targets pipeline...
#> + user_functions_file dispatched
#> ✔ user_functions_file completed [1ms, 148 B]
#> + work_units_file dispatched
#> ✔ work_units_file completed [0ms, 137 B]
#> + user_functions dispatched
#> ✔ user_functions completed [0ms, 150 B]
#> + project_work_units dispatched
#> ✔ project_work_units completed [0ms, 137 B]
#> + work_seq dispatched
#> ✔ work_seq completed [0ms, 99 B]
#> + processed_unit declared [10 branches]
#> ✔ processed_unit completed [10s, 514 B]
#> + my_first_project_result dispatched
#> ✔ my_first_project_result completed [0ms, 53 B]
#> ✔ ended pipeline [10.4s, 16 completed, 0 skipped]
# result is 385
final_sum
#> [1] 385
# see "_my_first_project" for _targets.R file and _targets/ cache.
# when re-run, we get the result from the cache:
cached_sum <- tarflowr_run(
work_units = my_work,
process_func = square_a_number,
combine_func = sum_all_results,
project_dir = PROJECT_DIR,
workers = 4,
)
#>
#> ── Starting tarflowr workflow ──────────────────────────────────────────────────
#> • Project directory: '/tmp/RtmppjxsU0/_my_first_project'
#> • Number of work units: 10
#> • Parallel workers: 4
#> ℹ Executing targets pipeline...
#> + user_functions dispatched
#> ✔ user_functions completed [1ms, 150 B]
#> + processed_unit declared [10 branches]
#> ✔ ended pipeline [234ms, 1 completed, 15 skipped]
cached_sum
#> [1] 385