Implementing mclapply() on Windows: a primer on embarrassingly parallel computation on multicore systems with R

An easy way to run R code in parallel on a multicore system is with the mclapply() function. Unfortunately, mclapply() does not work on Windows machines because its implementation relies on forking, which Windows does not support.

For me, this is somewhat of a headache because I am used to using mclapply(), and yet I need to support Windows users for one of my projects.

My hackish solution is to implement a fake mclapply() for Windows users with one of the Windows-compatible parallel R strategies. For the impatient, it works like this:

require(parallel) 

## On Windows, the following line will take about 40 seconds to run
## because by default, mclapply is implemented as a serial function
## on Windows systems.
system.time( mclapply(1:4, function(xx){ Sys.sleep(10) }) )
##    user  system elapsed 
##    0.00    0.00   40.06 

## If we try to force  mclapply() to use multiple cores on Windows, it doesn't work:
system.time( mclapply(1:4, function(xx){ Sys.sleep(10) }, mc.cores=4) )
## Error in mclapply(1:4, function(xx) { : 
##   'mc.cores' > 1 is not supported on Windows

## Using the ideas developed in this post, we can implement
## a parallel (as it should be!) mclapply() on Windows. 
source("http://www.stat.cmu.edu/~nmv/setup/mclapply.hack.R")
## 
##     *** Microsoft Windows detected ***
##     
##     For technical reasons, the MS Windows version of mclapply()
##     is implemented as a serial function instead of a parallel
##     function.    
## 
##     As a quick hack, we replace this serial version of mclapply()
##     with a wrapper to parLapply() for this R session. Please see
## 
##       http://www.stat.cmu.edu/~nmv/2014/07/14/implementing-mclapply-on-windows 
## 
##     for details.

## And now the code from above will take about 10 seconds (plus overhead). 
system.time( mclapply(1:4, function(xx){ Sys.sleep(10) }) )
##    user  system elapsed 
##    0.01    0.06   11.25 

As we will see, however, there are a few reasons why no one has done this in the past.

Our goal: easy, Linux/Mac-like parallelization

On Linux or Mac, it is very simple to parallelize R code across multiple cores. Consider the following function:

wait.then.square <- function(xx){
    # Wait for one second
    Sys.sleep(1);
    # Square the argument 
    xx^2 } 

If we want to run it on the integers from 1 to 4 in serial, it will take about 4 seconds:

## Run in serial 
system.time( serial.output <- lapply( 1:4, wait.then.square ) )
##  user  system elapsed 
## 0.000   0.000   4.004 

If we run it in parallel, it will take about 1 second:

## Run in parallel
require(parallel) 
## Note two changes: 
##   (1)  lapply to mclapply 
##   (2)  mc.cores (the number of processors to use in parallel) 
system.time( par.output <- mclapply( 1:4, wait.then.square,
                                     mc.cores=4             ) )
##  user  system elapsed 
## 0.008   0.000   1.008 

And we can verify that the output is, in fact, the same:

## Check if the output is the same
all.equal( serial.output, par.output )
## [1] TRUE

This toy example is a little unrealistic. In my work, at least, the function to be parallelized usually either (i) uses an R library that isn't loaded at startup by default, e.g. the Matrix library for sparse matrices, or (ii) needs to access an object in the global environment, e.g. a variable.

The magic of mclapply() is that it uses fork to replicate the R process into several child processes, tells the children to do the work, and then aggregates the children's results for you. Since it forks, the entire R session -- all of its variables, functions, and packages -- is replicated into each child. Therefore, you can do things like this:

## Setup a global variable that uses a non-base package
require(Matrix)
( a.global.variable <- Diagonal(3) )
## 3 x 3 diagonal matrix of class "ddiMatrix"
##      [,1] [,2] [,3]
## [1,]    1    .    .
## [2,]    .    1    .
## [3,]    .    .    1

## Write a proof-of-concept lapply
serial.output <- lapply( 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    })

## Parallelize it
par.output <- mclapply( 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    }, mc.cores=4)

## Check that they are equal 
all.equal(serial.output, par.output)
## [1] TRUE

It is, at least to me, a little magical! I don't have to think much.

The problem: Windows parallelization requires more setup

Windows doesn't fork. It is a limitation of the operating system that there is no easy way to replicate the parent R session to create child R sessions that can do the work.

R gets around this with socket clusters (the default "PSOCK" type): it treats each core on the machine as if it were an entirely separate machine and starts a fresh R session on each. This makes the setup more involved because the user must:

  1. create a "cluster" of child processes,
  2. load the necessary R packages on the cluster,
  3. copy the necessary R objects to the cluster,
  4. distribute work to the cluster, and finally
  5. stop the cluster.

Recall that the setup of the example is as follows:

## Load packages 
   require(parallel)
   require(Matrix)
##
## Define example function and the global variable 
   wait.then.square <- function(xx){
       # Wait for one second
       Sys.sleep(1);
       # Square the argument 
      xx^2 } 
   a.global.variable <- Diagonal(3) 

and the serial version of the code is:

serial.output <- lapply( 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    })

Parallelizing this code with the "cluster" approach requires more setup.

## Step 1: Create a cluster of child processes 
cl <- makeCluster( 4 )

## Step 2: Load the necessary R package(s)
## N.B. length(cl) is the number of child processes
##      in the cluster 
par.setup <- parLapply( cl, 1:length(cl),
    function(xx) {
        require(Matrix) 
    })

## Step 3: Distribute the necessary R objects 
clusterExport( cl, c('wait.then.square', 'a.global.variable') )

## Step 4: Do the computation
par.output <- parLapply(cl, 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    })

## Step 5: Remember to stop the cluster!
stopCluster(cl)

## Check that the parallel and serial output are the same
all.equal(serial.output, par.output)
## [1] TRUE

This approach works on Windows, Linux, and Mac, but it requires a bit more bookkeeping.

The hack: implementing mclapply() with parLapply()

Even though Windows doesn't fork, I'd like to pretend that it does so that I can use the simpler syntax of mclapply(). My approach is to wrap the bookkeeping code for parLapply() into a single function: mclapply.hack().

This is likely a bad idea for general use. Creating and destroying a cluster on every mclapply.hack() call throws away the main advantage of a persistent cluster that work can be repeatedly farmed out to. Copying every R object from the parent session to all of the worker sessions also takes far more memory (and time!) than simply forking processes. Use this approach with caution!
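For comparison, the persistent-cluster pattern that the hack gives up looks roughly like this (a sketch, reusing the wait.then.square() function defined above; the setup cost is paid once and amortized over every subsequent parLapply() call):

## Sketch: pay the cluster setup cost once and reuse the workers.
## (Contrast with mclapply.hack(), which rebuilds all of this on every call.)
require(parallel)

cl <- makeCluster( detectCores() )
clusterExport( cl, c('wait.then.square') )  # export only what is needed

## Repeated calls reuse the same workers with no new setup cost:
batch.1 <- parLapply( cl, 1:4, wait.then.square )
batch.2 <- parLapply( cl, 5:8, wait.then.square )

stopCluster(cl)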

The final code is as follows.

mclapply.hack <- function(...) {
    ## Create a cluster
    ## ... How many workers do you need?
    ## ... N.B. list(...)[[1]] returns the first 
    ##          argument passed to the function. In
    ##          this case it is the list to iterate over
    size.of.list <- length(list(...)[[1]])
    cl <- makeCluster( min(size.of.list, detectCores()) )

    ## Find out the names of the loaded packages 
    loaded.package.names <- c(
        ## Base packages
        sessionInfo()$basePkgs,
        ## Additional packages
        names( sessionInfo()$otherPkgs ))

    ## N.B. tryCatch() allows us to properly shut down the 
    ##      cluster if an error in our code halts execution
    ##      of the function. For details see: help(tryCatch)
    tryCatch( {

       ## Copy over all of the objects within scope to
       ## all clusters. 
       ## 
       ## The approach is as follows: Beginning with the 
       ## current environment, copy over all objects within
       ## the environment to all clusters, and then repeat
       ## the process with the parent environment. 
       ##
       this.env <- environment()
       while( !identical( this.env, globalenv() ) ) {
           clusterExport(cl,
                         ls(all.names=TRUE, env=this.env),
                         envir=this.env)
           ## N.B. climb from this.env, not environment(), so that
           ##      the loop actually walks up the enclosure chain
           this.env <- parent.env(this.env)
       }
       ## repeat for the global environment
       clusterExport(cl,
                     ls(all.names=TRUE, env=globalenv()),
                     envir=globalenv())
       
       ## Load the libraries on all the clusters
       ## N.B. length(cl) returns the number of workers
       parLapply( cl, 1:length(cl), function(xx){
           lapply(loaded.package.names, function(yy) {
               ## N.B. the character.only option of 
               ##      require() allows you to give the 
               ##      name of a package as a string. 
               require(yy, character.only=TRUE)})
       })
       
       ## Run the lapply in parallel 
       return( parLapply( cl, ...) )
    }, finally = {        
       ## Stop the cluster
       stopCluster(cl)
    })
}

We can test it as follows:

system.time( serial.output <- lapply( 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    })) 
##    user  system elapsed 
##   0.020   0.000   4.022 

system.time( par.output <- mclapply.hack( 1:4,
    function(xx) {
       return( wait.then.square(xx) + a.global.variable )
    })) 
##    user  system elapsed 
##   0.024   0.012   3.683 

all.equal( serial.output, par.output )
## [1] TRUE

In this case, it works, but we don't save much time because of the bookkeeping required to set up the cluster for parLapply(). If we run a more computationally intense function, say one that takes 10 seconds per iteration, then we begin to see gains:

wait.longer.then.square <- function(xx){
    ## Wait for ten seconds
    Sys.sleep(10);
    ## Square the argument
    xx^2 } 

system.time( serial.output <- lapply( 1:4,
    function(xx) {
       return( wait.longer.then.square(xx) + a.global.variable )
    })) 
##    user  system elapsed 
##   0.020   0.000  40.059

system.time( par.output <- mclapply.hack( 1:4,
    function(xx) {
       return( wait.longer.then.square(xx) + a.global.variable )
    })) 
##    user  system elapsed 
##  0.024   0.008  12.794 

all.equal( serial.output, par.output )
## [1] TRUE

The final answer: A multi-platform wrapper for mclapply()

My motivation for implementing mclapply() on Windows is so that code I wrote on Linux will "just work" on Windows.

I wrote a quick script to implement mclapply.hack() as mclapply() on Windows machines and leave mclapply() alone on Linux and Mac machines. The code is as follows:

##
## mclapply.hack.R
##
## Nathan VanHoudnos
## nathanvan AT northwestern FULL STOP edu
## July 14, 2014
##
## A script to implement a hackish version of 
## parallel::mclapply() on Windows machines.
## On Linux or Mac, the script has no effect
## beyond loading the parallel library. 

require(parallel)    

## Define the hack
mclapply.hack <- function(...) {
    ## Create a cluster
    size.of.list <- length(list(...)[[1]])
    cl <- makeCluster( min(size.of.list, detectCores()) )

    ## Find out the names of the loaded packages 
    loaded.package.names <- c(
        ## Base packages
        sessionInfo()$basePkgs,
        ## Additional packages
        names( sessionInfo()$otherPkgs ))

    tryCatch( {

       ## Copy over all of the objects within scope to
       ## all clusters. 
       this.env <- environment()
       while( !identical( this.env, globalenv() ) ) {
           clusterExport(cl,
                         ls(all.names=TRUE, env=this.env),
                         envir=this.env)
           this.env <- parent.env(this.env)
       }
       clusterExport(cl,
                     ls(all.names=TRUE, env=globalenv()),
                     envir=globalenv())
       
       ## Load the libraries on all the clusters
       ## N.B. length(cl) returns the number of workers
       parLapply( cl, 1:length(cl), function(xx){
           lapply(loaded.package.names, function(yy) {
               require(yy, character.only=TRUE)})
       })
       
       ## Run the lapply in parallel 
       return( parLapply( cl, ...) )
    }, finally = {        
       ## Stop the cluster
       stopCluster(cl)
    })
}

## Warn the user if they are using Windows
if( Sys.info()[['sysname']] == 'Windows' ){
    message(paste(
      "\n", 
      "   *** Microsoft Windows detected ***\n",
      "   \n",
      "   For technical reasons, the MS Windows version of mclapply()\n",
      "   is implemented as a serial function instead of a parallel\n",
      "   function.",
      "   \n\n",
      "   As a quick hack, we replace this serial version of mclapply()\n",
      "   with a wrapper to parLapply() for this R session. Please see\n\n",
      "     http://www.stat.cmu.edu/~nmv/2014/07/14/implementing-mclapply-on-windows \n\n",
      "   for details.\n\n"))
}

## If the OS is Windows, set mclapply to the
## hackish version. Otherwise, leave the
## definition alone.
## N.B. the unnamed default branch keeps mclapply intact on
##      any other Unix, where switch() would otherwise
##      return NULL.
mclapply <- switch( Sys.info()[['sysname']],
   Windows = mclapply.hack,
   mclapply )

## end mclapply.hack.R

I posted the script at http://www.stat.cmu.edu/~nmv/setup/mclapply.hack.R. You can use it with

source('http://www.stat.cmu.edu/~nmv/setup/mclapply.hack.R')

as shown in the beginning of the post.

I would be grateful for any comments or suggestions for improving it. If there is sufficient interest, I can wrap it into a simple R package.

11 thoughts on “Implementing mclapply() on Windows: a primer on embarrassingly parallel computation on multicore systems with R”

  1. ajdamico

    hi nathan, thank you for making this code so clean and easy to use. i tried hacking mclapply the same way (see http://stackoverflow.com/questions/24737166/is-it-possible-to-get-the-r-survey-packages-svyby-function-multicore-paramet ) but failed. i'm trying to come up with a speed test using your mclapply.hack() that doesn't involve Sys.sleep(). could you provide an example where the processing is actually faster? most of my results look like this, with mclapply.hack losing to lapply--

    > # load the windows-specific mclapply hack
    > source('http://www.stat.cmu.edu/~nmv/setup/mclapply.hack.R')
    >
    > system.time( lapply( rep( 50000000 , 6 ) , runif ) )
    user system elapsed
    11.95 0.27 12.21
    >
    > system.time( mclapply( rep( 50000000 , 6 ) , runif ) )
    user system elapsed
    4.15 5.08 21.72

    thanks! i really hope we can get this to work..

    1. nmv Post author

      mclapply.hack() will lose to lapply() unless the savings from doing the work in parallel overcome the necessary overhead to do that work in parallel. For your example, random number generation is fast, so the majority of the time is likely taken in transferring the giant uniform vector back to the parent process, not generating the giant vector.

      Here is an example where the computation on each of the parallel workers takes some time:

      source("http://www.stat.cmu.edu/~nmv/setup/mclapply.hack.R")
      
      ## This machine has four cores.
      detectCores()
      ## [1] 4
      
      ## Run the parallel version. 
      system.time( par.out <- mclapply( (1:4+rep(2500, 4)), function(xx) {
         set.seed(xx)
         test.matrix <- matrix( rnorm(xx^2), nrow=xx )
         pd.matrix   <- t(test.matrix) %*% test.matrix
         return( chol(pd.matrix) )
      }) )
      ##    user  system elapsed 
      ##    1.16    1.41   40.06 
      
      ## Run the serial version. 
      system.time( serial.out <- lapply( (1:4+rep(2500, 4)), function(xx) {
         set.seed(xx)
         test.matrix <- matrix( rnorm(xx^2), nrow=xx )
         pd.matrix   <- t(test.matrix) %*% test.matrix
         return( chol(pd.matrix) )
      }) )
      ##    user  system elapsed 
      ##   75.80    0.25   76.13 
      ##
      ## ... parallel version is about 30 seconds faster. 
      
      ## Did we get the same answer? 
      all.equal( serial.out, par.out )
      ## [1] TRUE
      
      ## Note that if we run the parallel version again, it will take 
      ## longer because par.out and serial.out are being copied to 
      ## all of the clusters... even though the function doesn't use them. 
      
      ## ... How big are they? 
      
         print(object.size(serial.out), units='Mb')
         ## 191.1 Mb
      
         print(object.size(par.out), units='Mb')
         ## 191.1 Mb
      
      ## ... How much longer does it take? 
      system.time( par.second.out <- mclapply( (1:4+rep(2500, 4)), function(xx) {
         set.seed(xx)
         test.matrix <- matrix( rnorm(xx^2), nrow=xx )
         pd.matrix   <- t(test.matrix) %*% test.matrix
         return( chol(pd.matrix) )
      }) )
      ##    user  system elapsed 
      ##    3.13   18.84   59.21 
      
      ## ... Therefore about 20 seconds was lost to needlessly transferring
      ##     the previous R objects. 
      

      Hope this helps.

      1. ajdamico

        i think i understand. re-summarizing to make sure: if the object that needs to be transferred to mclapply.hack() is large and the processing is quick, parallelizing on windows isn't worth it. speed improvements are only feasible when it's a small object that gets transferred to each child process - and lots of processing needs to happen on that relatively small object. is that right?

        so that's why svyby's multicore=TRUE will not benefit from this on windows, because the survey design object is large and needs to be transferred to each of the child processes, eliminating any gains. http://stackoverflow.com/questions/24737166/is-it-possible-to-get-the-r-survey-packages-svyby-function-multicore-paramet/24737167#24737167

        thank you!!!

        1. nmv Post author

          Close. The object passed to the child workers must be small AND the object passed from the child workers back to the parent must be small. Your original example had a small object passed to the workers, rep( 50000000 , 6 ), but it had a 50,000,000-element vector being passed back. That's why it was slow.

          For example, if we do something like sum the 50,000,000-element vector on the workers, all we pass back is a single number. In this case mclapply.hack() wins.

           
          source("http://www.stat.cmu.edu/~nmv/setup/mclapply.hack.R")
          
          system.time( lapply( rep( 50000000 , 6 ) , function(xx) {
           sum(runif(xx))
          }))
          ##    user  system elapsed 
          ##   10.75    0.64   11.43 
          
          system.time( mclapply( rep( 50000000 , 6 ) , function(xx) {
           sum(runif(xx))
          }))
          ##    user  system elapsed 
          ##    0.04    0.03    5.14 
          

          As to your question about svyby, go ahead and give it a shot. I don't know enough (really anything!) about that package to know in which direction it will go.

  2. Carl Witthoft

    Wrappers which check the type of operating system are a common fix for this sort of thing, so you don't really need to call it a "hack" :-) . I would, however, suggest you reduce or eliminate your massive warning message - it'll just annoy users. At the very least, put in a "verbose=FALSE" argument to your function so people can skip the message. You might add a list element to the output which provides the OS version so anyone examining the results of the calculation can tell what underlying parallel-processing tool was called.
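    Carl's suggestions might look something like the following sketch. Note that the function name, the verbose argument, and the returned OS value are all his proposals as I read them, not part of the posted mclapply.hack.R script:

    ## Hypothetical sketch of Carl's suggestions: gate the startup message
    ## behind a verbose flag, and report which OS (and hence which
    ## parallel backend) was used. None of these names appear in the
    ## posted script.
    install.mclapply.hack <- function(verbose = FALSE) {
        os <- Sys.info()[['sysname']]
        if ( os == 'Windows' ) {
            if (verbose) {
                message("*** Windows detected: mclapply() will wrap parLapply() ***")
            }
            assign('mclapply', mclapply.hack, envir = globalenv())
        }
        ## Callers can record which backend was selected
        invisible(os)
    }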

    1. nmv Post author

      Are you running the latest version of R? Could you (i) recreate the error, and then (ii) paste the output of sessionInfo()?

      1. Yong Cha

        Yes, I am running the latest version of R (3.1.1) and RStudio (version 0.98.1056).

        I got these messages:

        (i) When I recreate the error

        Error in serialize(data, node$con) : error writing to connection
        In addition: Warning messages:
        1: closing unused connection 10 (<-WIN-CJ4G1VCP036:11205)
        2: closing unused connection 9 (<-WIN-CJ4G1VCP036:11205)
        3: closing unused connection 8 (<-WIN-CJ4G1VCP036:11205)
        4: closing unused connection 7 (<-WIN-CJ4G1VCP036:11205)
        5: closing unused connection 6 (<-WIN-CJ4G1VCP036:11205)
        6: closing unused connection 5 (<-WIN-CJ4G1VCP036:11205)
        7: closing unused connection 4 (<-WIN-CJ4G1VCP036:11205)
        8: closing unused connection 3 (<-WIN-CJ4G1VCP036:11205)

        (ii) My sessionInfo()

        R version 3.1.1 (2014-07-10)
        Platform: x86_64-w64-mingw32/x64 (64-bit)

        locale:
        [1] LC_COLLATE=Korean_Korea.949 LC_CTYPE=Korean_Korea.949
        [3] LC_MONETARY=Korean_Korea.949 LC_NUMERIC=C
        [5] LC_TIME=Korean_Korea.949

        attached base packages:
        [1] parallel stats graphics grDevices utils datasets methods
        [8] base

        other attached packages:
        [1] mgcv_1.8-3 nlme_3.1-117 doParallel_1.0.8 iterators_1.0.7
        [5] foreach_1.4.2 rpart_4.1-8 ggplot2_1.0.0 plyr_1.8.1
        [9] pbapply_1.1-1

        loaded via a namespace (and not attached):
        [1] codetools_0.2-8 colorspace_1.2-4 digest_0.6.4 grid_3.1.1
        [5] gtable_0.1.2 labeling_0.3 lattice_0.20-29 MASS_7.3-33
        [9] Matrix_1.1-4 munsell_0.4.2 proto_0.3-10 Rcpp_0.11.2
        [13] reshape2_1.4 scales_0.2.4 stringr_0.6.2 tools_3.1.1
        Warning messages:
        1: closing unused connection 10 (<-WIN-CJ4G1VCP036:11205)
        2: closing unused connection 9 (<-WIN-CJ4G1VCP036:11205)
        3: closing unused connection 8 (<-WIN-CJ4G1VCP036:11205)
        4: closing unused connection 7 (<-WIN-CJ4G1VCP036:11205)
        5: closing unused connection 6 (<-WIN-CJ4G1VCP036:11205)
        6: closing unused connection 5 (<-WIN-CJ4G1VCP036:11205)
        7: closing unused connection 4 (<-WIN-CJ4G1VCP036:11205)
        8: closing unused connection 3 (<-WIN-CJ4G1VCP036:11205)
