--- title: "Running simulations in parallel" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Running simulations in parallel} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{r, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, comment = "#>" ) ``` ```{r setup} library(confidenceSim) library(dplyr) library(parallel) library(pbapply) set.seed(613) requireNamespace("plyr", quietly = TRUE) ``` ``` ``` When running thousands of simulations, it is advisable to split the tasks across more than one node and run each node in parallel. This vignette will demonstrate one method to do this. The process is as follows: - Create or load input parameters - Create a cluster of nodes with `parallel` - Split the simulations across nodes in the cluster with `pbapply` (and catch your errors!) - Gather the results from across the clusters into a single data.frame - Summarize results with `dplyr` ## 1. Load inputs Load the example parameter list stored as `inputs`. ``` # load data data(inputs) # create temporary directory directory <- tempdir() ``` ## 2. Initiate cluster For this example we will run across two clusters. ``` clusters <- 2 cl <- makeCluster(clusters) ``` ## 3. Run simulations in parallel `pblapply` allows you to run your simulations as you would with `lapply` across an array of numbers. The numbers get passed to `runSingleTrial` as the first input, `sim.no`. The simulation code is written so that results from one node are kept separate from each other. This is done by using `Sys.getpid()` to name a sub-directory to store the results from that node. This way, results are not overwritten. ``` num.sims <- 4 res.list <- pblapply(1:num.sims, runSingleTrial, inputs=inputs, save.plot=FALSE, directory=directory, cl=cl) ``` ### Error handling Hopefully these run without error. However, if there is an error on any of the nodes, it will stop the whole process. The good news is, it is possible to instruct your code to skip to the next simulation if there is an error. In addition, you can log any errors by sending the error message to a text file. After the simulations have stopped running, you can read the error report. Create an error log: ``` # make sure there is a slash if ((! endsWith(directory, "/")) & (! endsWith(directory, "\\"))){ directory = paste0(directory, "/") } error_log <- paste0(directory, "error_log.txt") # make sure it exists # Ensure the log file exists if (!file.exists(error_log)) { file.create(error_log) } ``` We will use a `tryCatch` structure. It is a bit messier so we have to manually export all variables into the clusters with `clusterExport`, something we didn't have to do above, for some reason. ``` verbose <- FALSE save.plot <- TRUE clusterExport(cl, varlist = c("runSingleTrial", "inputs", "directory", "verbose", "save.plot", "error_log")) ``` Now encase the trial simulation in `tryCatch` and append the error to the error log file. The error message is printed along time a reference to the `special` parameter from `inputs`, which can be used to pass anything to `inputs`. In this case we've created a string for response rates. 
Now encase the trial simulation in `tryCatch` and append any error to the error log file. The error message is written along with the `special` element of `inputs`, which can be used to attach arbitrary extra information to `inputs`; in this case we've stored a string describing the response rates.

```
res.list <- pblapply(1:num.sims, function(i) {
  tryCatch(
    runSingleTrial(i, inputs=inputs, save.plot=save.plot,
                   directory=directory, verbose=verbose),
    error = function(e) {
      # append the error message to the log file
      msg <- paste(Sys.time(), " - Error in simulation ", i,
                   " for effect ", inputs$special, ": ", e$message, "\n")
      cat(msg, file = error_log, append = TRUE)
      NULL # optional: return NULL for this simulation after logging
    }
  )
}, cl=cl)
```

Stop the cluster once you no longer need it:

```
stopCluster(cl)
```

## 4. Collect results from nodes

Now we have to gather the results. I wrote a little code to collect the files together. It finds the "node" folders, named with each node's process ID, and keeps the set of folders that were created at the same time:

```
collect.nodes <- function(clusters, directory){
  dirs = list.dirs(directory)[-1] # exclude parent directory
  basenames = lapply(dirs, basename)

  # keep only directories whose names are numeric (the process IDs)
  numfiles = suppressWarnings(lapply(basenames, as.numeric))
  dirs = dirs[which(!is.na(numfiles))]

  # get the time stamp for each folder as a number, to 2 decimal places
  times = lapply(dirs, function(x){
    info = file.info(x)
    t.str = strptime(info$ctime, "%Y-%m-%d %H:%M:%S")
    round(as.numeric(format(t.str, "%H")) + as.numeric(format(t.str, "%M"))/60, 2)
  })

  # find the folders that were created at the same time, i.e. by this cluster
  dirs = dirs[which(times == unique(times)[unlist(lapply(unique(times), function(x) {
    sum(times == x) == clusters
  }))][[1]])]

  # gather all csv files in those directories
  csvs = unlist(lapply(dirs, function(x){ dir(x, full.names = TRUE, pattern = "\\.csv$") }))
  return(lapply(csvs, read.csv))
}

# execute and bind the results into a single data.frame
res.list <- collect.nodes(clusters, directory)
res.list <- do.call("rbind", res.list)
```

## 5. Summarize with `dplyr`

Now that all the results are in a single data.frame, we can summarize them with `dplyr`:

```
res.list <- data.frame(res.list)

# mutate to make values numeric
# get rid of fields that are only useful for multi-arm designs
# change the 'misc' field to a specific description
res.list %>%
  mutate_at(c("N.arm", "N.pair", "N.known", "N", "interim.arm", "interim.total",
              "arm", "conf.benefit", "conf.lack.meaningful.benefit", "mean",
              "standard.error", "resp.ctrl", "resp.trmt"), as.numeric) %>%
  mutate(interim=interim.total, .keep="unused") %>%
  mutate(fx=misc, .keep="unused") %>%
  select(!c(interim.arm, N.pair, arm)) %>%
  arrange(sim.no) %>%
  relocate(sim.no, interim) -> res.list
```

To get the final outcome of each trial, we can define another data.frame, `res.final`:

```
# get the final outcome for each arm in each simulation
res.list %>%
  group_by(fx, N.looks, sim.no) %>%
  filter(interim == max(interim)) -> res.final
```

Now we can summarize across all the simulations to see trends:

```
res.final %>%
  group_by(fx) %>%
  summarise(NSim = max(sim.no),
            NTrials = n(),
            StopFutility = mean(action=="stop.futile"),
            StopInferiority = mean(action=="stop.inferior"),
            StopEfficacy = mean(action=="stop.efficacy"),
            StopAny = mean(action=="stop.efficacy") + mean(action=="stop.inferior") +
              mean(action=="stop.futile"),
            MedianInterimStop = median(replace(interim, interim==6, NA), na.rm = TRUE),
            StopMaxNoEffect = mean(action=="fail"),
            StopMaxPosEffect = mean(action=="efficacy.significant"),
            OverallPosEffect = mean(action=="efficacy.significant") + mean(action=="stop.efficacy"),
            OverallNoEffect = mean(action=="fail") + mean(action=="stop.futile"),
            OverallNegEffect = mean(action=="stop.inferior") + mean(action=="inferiority.significant"),
            MedianSampleSize = median(N)) -> summary
```
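If you are writing the results up in an R Markdown document, one simple (and entirely optional) way to display this table is with `knitr::kable`; any table-printing function would do:

```
# optional: display the summary table in an R Markdown report
knitr::kable(summary)
```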
As we have only run four simulations, the summary will not be particularly insightful, but now you have an idea of how to summarize tens of thousands of simulations.
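Finally, remember the error log created earlier: reading it back, for example with `readLines`, shows which simulations (if any) were skipped because of an error.

```
# read back any errors that were logged during the run
cat(readLines(error_log), sep = "\n")
```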