Title: | 'Foreach' Parallel Adapter Using the 'Redis' Database |
---|---|
Description: | Create and manage fault-tolerant task queues for the 'foreach' package using the 'Redis' key/value database. |
Authors: | B. W. Lewis <[email protected]> |
Maintainer: | B. W. Lewis <[email protected]> |
License: | GPL-2 |
Version: | 3.0.0 |
Built: | 2024-11-09 05:23:18 UTC |
Source: | https://github.com/bwlewis/doredis |
The doRedis package implements an elastic parallel back end for foreach using the Redis key/value database.
registerDoRedis, startLocalWorkers
internal function called by foreach
.doRedis(obj, expr, envir, data)
obj | a foreach object |
expr | the expression to evaluate |
envir | the expression environment |
data | a list of parameters from registerDoRedis |
the foreach result
List doRedis jobs
jobs(queue = "*")
queue | List jobs for the specified queue, or set to "*" to list jobs for all queues |
a data frame listing jobs by row with variables queue, id, user, host and time (submitted).
Use to help debug remote doRedis workers.
logger(msg)
msg | a character message to print to the standard error stream |
The character message that was printed, decorated with time and system info.
This function is normally not needed; use the redux package functions instead, or simply registerDoRedis.
redisConnect(host = "localhost", port = 6379L, password, ...)
host | character Redis host name |
port | integer Redis port number |
password | optional character Redis password |
... | optional additional arguments for compatibility with old rredis, ignored |
registerDoRedis, redisWorker, startLocalWorkers
A convenience function to delete a Redis key
redisDelete(key)
key | (character or raw) Redis key name to delete |
Redis status message
This function assumes the value associated with the Redis key is a serialized (binary) R value and unserializes it on return.
redisGet(key)
key | (character or raw) Redis key name |
Unserialized R value.
This function serializes the val argument.
redisSet(key, val)
key | (character or raw) Redis key name |
val | R value to set |
Redis status message
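A minimal sketch of how redisConnect, redisSet, redisGet and redisDelete fit together, assuming the doRedis and redux packages are installed and a Redis server is reachable on the local host at the standard port. The key name "doRedis_demo_key" is hypothetical and chosen only for illustration; the guard mirrors the one used in the package examples.

```r
# Sketch only: round-trip an R value through Redis with the helpers above.
# Assumption: doRedis and redux are installed and Redis runs on localhost:6379.
redis_ok <- requireNamespace("doRedis", quietly = TRUE) &&
  requireNamespace("redux", quietly = TRUE) &&
  redux::redis_available()

if (redis_ok) {
  library(doRedis)
  redisConnect(host = "localhost", port = 6379L)

  x <- list(a = 1:3, b = "text")
  redisSet("doRedis_demo_key", x)     # serializes x before storing it
  y <- redisGet("doRedis_demo_key")   # unserializes the stored value
  stopifnot(identical(x, y))

  redisDelete("doRedis_demo_key")     # remove the hypothetical demo key
}
```

Because redisGet assumes a serialized R value, it is best paired with keys written by redisSet rather than keys written by other Redis clients.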
The redisWorker function enrolls the current R session in one or more doRedis worker pools specified by the work queue names. The worker loop takes over the R session until the work queue(s) are deleted, after which the worker loop exits once the linger period elapses, or until the worker has processed iter tasks.
Running workers also terminate when network activity with Redis remains inactive for longer than the timeout period set in the redisConnect function. That value defaults internally to 30 seconds in redisWorker. You can increase it by including a timeout=n argument value.
redisWorker( queue, host = "localhost", port = 6379, iter = Inf, linger = 30, log = stderr(), connected = FALSE, password = NULL, loglevel = 0, timelimit = 0, ... )
queue | work queue name or a vector of queue names |
host | Redis database host name or IP address |
port | Redis database port number |
iter | maximum number of tasks to process before exiting the worker loop |
linger | number of seconds the worker lingers after the work queue is deleted before it terminates |
log | print messages to the specified file connection |
connected | set to |
password | optional Redis database password |
loglevel | set to > 0 to increase verbosity in the log |
timelimit | set to > 0 to specify a task time limit in seconds, after which worker processes are killed; beware that setting this value > 0 will terminate any R worker process whose task takes too long. |
... | Optional additional parameters passed to |
NULL is invisibly returned.
The worker connection to Redis uses a TCP timeout value of 30 seconds by default, so the worker exits after about 30 seconds of inactivity. To keep the worker active for longer periods, set the timeout option to a larger value.
Use the linger option to instruct the worker to linger for up to the indicated number of seconds after the work queue it is listening on has been removed. The worker exits at most that many seconds after the queue is removed.
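The timeout and linger settings described above might be combined as in the following sketch. The wrapper name start_worker and the values 5 and 300 are hypothetical; timeout is passed through the ... argument, as the text above describes. The wrapper is only defined here, not called, because redisWorker takes over the R session.

```r
# Sketch only: a wrapper that enrolls the calling R session as a worker.
# start_worker is a hypothetical helper name, not part of doRedis.
start_worker <- function(queue = "jobs") {
  doRedis::redisWorker(
    queue   = queue,
    host    = "localhost",
    port    = 6379,
    linger  = 5,     # exit at most about 5 seconds after the queue is removed
    timeout = 300    # passed via ...; raises the 30 second inactivity default
  )
}

# Call start_worker() in a dedicated R session (for example one started with
# Rscript). It blocks until the worker loop exits.
```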
registerDoRedis, startLocalWorkers
The doRedis package implements a simple but flexible parallel back end for foreach that uses Redis for inter-process communication. The work queue name specifies the base name of a small set of Redis keys that the coordinator and worker processes use to exchange data.
registerDoRedis( queue, host = "localhost", port = 6379, password, ftinterval = 30, chunkSize = 1, progress = FALSE, ... )
queue | A work queue name |
host | The Redis server host name or IP address |
port | The Redis server port number |
password | An optional Redis database password |
ftinterval | Default fault tolerance interval in seconds |
chunkSize | Default iteration granularity, see |
progress | (logical) Show progress bar for computations? |
... | Optional arguments passed to |
Back-end worker R processes advertise their availability for work with the redisWorker function.
The doRedis parallel back end tolerates faults among the worker processes and automatically resubmits failed tasks. It is also portable and supports heterogeneous sets of workers, even across operating systems. The back end supports dynamic pools of worker processes: new workers may be added to work queues at any time and can be used by running foreach computations.
NULL is invisibly returned; this function is called for the side effect of registering a foreach backend.
All doRedis functions require access to a Redis database server (not included with this package).
By default, worker processes use the same random number generator as the coordinator process, with seeds set per iteration rather than per worker to yield reproducible output independent of the number of worker processes. The L'Ecuyer-CMRG RNG available from the parallel package is recommended when high-quality distributed pseudorandom numbers are needed. See the package vignette for more details and additional options.
Avoid using fork-based parallel functions within doRedis expressions. Use of mclapply and similar functions in the body of a doRedis foreach loop can result in worker faults.
foreach, doRedis-package, setChunkSize, removeQueue
# Only run if a Redis server is running
if (redux::redis_available()) {
  ## The example assumes that a Redis server is running on the local host
  ## and standard port.

  # 1. Start a single local R worker process
  startLocalWorkers(n=1, queue="jobs", linger=1)

  # 2. Run a simple sampling approximation of pi:
  registerDoRedis("jobs")
  # Braces are needed: %dopar% binds more tightly than * and /, so without
  # them only the constant 4 would be evaluated on the workers.
  pie = foreach(j=1:10, .combine=sum, .multicombine=TRUE) %dopar% {
    4 * sum((runif(1000000) ^ 2 + runif(1000000) ^ 2) < 1) / 10000000
  }
  removeQueue("jobs")
  print(pie)

  # Note that removing the work queue automatically terminates worker processes.
}
Remove Redis keys associated with one or more doRedis jobs
removeJob(job)
job | Either a named character vector with "queue" and "id" entries corresponding to a doRedis job queue and job id, or a list with equal-length "queue" and "id" entries, or a data frame with "queue" and "id" entries, for example as returned by |
NULL is invisibly returned; this function is used for its side effect, in particular removing all Redis keys associated with the specified job.
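A hedged sketch of cleaning up leftover jobs. It assumes that the data frame returned by jobs (which has queue and id columns) is an acceptable input to removeJob, as the argument description suggests, and that doRedis, redux and a local Redis server are available. The queue name "jobs" is only an example.

```r
# Sketch only: list the jobs on one queue and remove their Redis keys.
# Assumption: doRedis and redux are installed and Redis runs on localhost:6379.
redis_ok <- requireNamespace("doRedis", quietly = TRUE) &&
  requireNamespace("redux", quietly = TRUE) &&
  redux::redis_available()

if (redis_ok) {
  library(doRedis)
  registerDoRedis("jobs")          # connects the coordinator to Redis

  j <- jobs(queue = "jobs")        # one row per job: queue, id, user, host, time
  print(j)

  # Only do this for jobs you intend to discard: it deletes their keys.
  if (NROW(j) > 0) removeJob(j)

  removeQueue("jobs")
}
```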
Removing a doRedis queue cleans up associated keys in the Redis database and signals workers listening on the queue to terminate. After their work queue is deleted, workers terminate once their timeout period elapses.
removeQueue(queue)
queue | the doRedis queue name |
NULL is invisibly returned; this function is called for the side effect of removing Redis keys associated with the specified queue.
Workers listening for work on more than one queue will only terminate after all their queues have been deleted. See registerDoRedis for an example.
A job is the collection of all tasks in a foreach loop. A task is a collection of loop iterations of at most size chunkSize. R workers are assigned work by task in blocks of at most chunkSize loop iterations per task. The default value is one iteration per task.
Setting the default chunk size larger for shorter-running jobs can substantially improve performance. Setting this value too high can negatively impact load-balancing across workers, however.
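The relationship between iterations, tasks and chunkSize can be illustrated without Redis. The helper split_into_tasks below is hypothetical and is not the package's internal code; it only reproduces the grouping arithmetic described above, with contiguous blocks chosen as an assumption.

```r
# Illustration only: group loop iterations into tasks of at most chunk_size.
# split_into_tasks is a hypothetical helper, not a doRedis function.
split_into_tasks <- function(n_iter, chunk_size = 1L) {
  stopifnot(n_iter >= 1, chunk_size >= 1)
  idx <- seq_len(n_iter)
  # Iterations 1..chunk_size form task 1, the next chunk_size form task 2, ...
  unname(split(idx, ceiling(idx / chunk_size)))
}

tasks_1 <- split_into_tasks(4, chunk_size = 1)   # one iteration per task
tasks_2 <- split_into_tasks(4, chunk_size = 2)   # two iterations per task
tasks_3 <- split_into_tasks(10, chunk_size = 4)  # last task is smaller
print(lengths(tasks_3))
```

Larger chunks mean fewer round trips through Redis per job, but also fewer tasks to spread across workers, which is the load-balancing trade-off noted above.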
setChunkSize(value = 1)
value | positive integer chunk size setting |
value is invisibly returned; this function is called for its side effect.
# Only run if a Redis server is running
if (redux::redis_available()) {
  # Start a single local R worker process
  startLocalWorkers(n=1, queue="jobs", linger=1)

  # Register the work queue with the coordinator R process
  registerDoRedis("jobs")

  # Compare verbose task submission output from...
  setChunkSize(1)
  foreach(j=1:4, .combine=c, .verbose=TRUE) %dopar% j

  # with the verbose task submission output from:
  setChunkSize(2)
  foreach(j=1:4, .combine=c, .verbose=TRUE) %dopar% j

  # Clean up
  removeQueue("jobs")
}
The setExport function lets users manually declare symbol names of corresponding objects that should be exported to workers.
setExport(names = c())
names | A character vector of symbol names to export. |
The foreach function includes a similar .export parameter. We provide this supplemental export option for users without direct access to the foreach function, for example, when foreach is used inside another package.
The value of names is invisibly returned (this function is used for its side effect).
## Not run:
registerDoRedis("work queue")
startLocalWorkers(n=1, queue="work queue", linger=1)

f <- function() pi

(foreach(1) %dopar% tryCatch(eval(call("f")), error = as.character))
# Returns the error converted to a message:
# Error in eval(call("f")) : task 1 failed - could not find function "f"

# Manually export the symbol f:
setExport("f")
(foreach(1) %dopar% eval(call("f")))
# Now f is found.

removeQueue("work queue")
## End(Not run)
Failed tasks are automatically re-submitted to the work queue. The setFtinterval function sets an upper bound on how frequently the system checks for failure. See the package vignette for discussion and examples.
setFtinterval(value = 30)
value | positive integer number of seconds |
value is invisibly returned (this function is used for its side effect).
The setPackages function lets users manually declare packages that R worker processes need to load before running their tasks.
setPackages(packages = c())
packages | A character vector of package names. |
The foreach function includes a similar .packages parameter. setPackages provides a way to set the foreach .packages option for users without direct access to the foreach function, for example, when foreach is used inside another package.
The value of packages is invisibly returned (this function is used for its side effect).
Progress bar
setProgress(value = FALSE)
value | if |
value is invisibly returned (this function is used for its side effect).
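The setter functions documented above (setPackages, setFtinterval and setProgress) might be used together as in this sketch. It assumes doRedis, redux and a local Redis server are available, that setProgress(TRUE) enables the progress bar (by analogy with the progress argument of registerDoRedis), and that the setters may be called after registerDoRedis. The queue name and the values shown are examples only.

```r
# Sketch only: adjust back-end settings before running a foreach loop.
# Assumption: doRedis and redux are installed and Redis runs on localhost:6379.
redis_ok <- requireNamespace("doRedis", quietly = TRUE) &&
  requireNamespace("redux", quietly = TRUE) &&
  redux::redis_available()

if (redis_ok) {
  library(doRedis)
  library(foreach)

  startLocalWorkers(n = 1, queue = "jobs", linger = 1)
  registerDoRedis("jobs")

  setPackages("stats")   # workers load 'stats' before running tasks
  setFtinterval(10)      # fault tolerance interval of 10 seconds
  setProgress(TRUE)      # assumed to switch the progress bar on

  ans <- foreach(j = 1:4, .combine = c) %dopar% {
    median(c(j, 2 * j, 3 * j))
  }
  print(ans)

  removeQueue("jobs")    # also signals the local worker to terminate
}
```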
Use startLocalWorkers to start one or more doRedis R worker processes in the background. The worker processes are started on the local system using the redisWorker function.
startLocalWorkers( n, queue, host = "localhost", port = 6379, iter = Inf, linger = 30, log = stdout(), Rbin = paste(R.home(component = "bin"), "R", sep = "/"), password, ... )
n | number of workers to start |
queue | work queue name |
host | Redis database host name or IP address |
port | Redis database port number |
iter | maximum number of tasks to process before exiting the worker loop |
linger | number of seconds the worker lingers after the work queue is deleted before it terminates |
log | print messages to the specified file connection |
Rbin | full path to the command-line R program |
password | optional Redis database password |
... | optional additional parameters passed to the |
Running workers self-terminate after a linger period if their work queues are deleted with the removeQueue function, or when network activity with Redis remains inactive for longer than the timeout period set in the redisConnect function. That value defaults internally to 3600 (one hour) in startLocalWorkers. You can increase it by including a timeout=n argument value.
NULL is invisibly returned.
# Only run if a Redis server is running
if (redux::redis_available()) {
  ## The example assumes that a Redis server is running on the local host
  ## and standard port.

  # Start a single local R worker process
  startLocalWorkers(n=1, queue="R jobs", linger=1)

  # Run a simple sampling approximation of pi:
  registerDoRedis("R jobs")
  # Braces are needed: %dopar% binds more tightly than * and /, so without
  # them only the constant 4 would be evaluated on the workers.
  print(foreach(j=1:10, .combine=sum, .multicombine=TRUE) %dopar% {
    4 * sum((runif(1000000) ^ 2 + runif(1000000) ^ 2) < 1) / 10000000
  })

  # Clean up
  removeQueue("R jobs")
}
List running doRedis tasks
tasks(queue = "*", id = "*")
queue | List jobs for the specified queue, or set to "*" to list jobs for all queues |
id | List tasks for the specified job id, or set to "*" to list tasks for all job ids |
a data frame listing jobs by row with variables queue, id, user, coordinator, time, iter, host, pid (see Note)
The returned values indicate:
queue | the doRedis queue name |
id | the doRedis job id |
user | the user running the job |
coordinator | the host name or IP address where the job was submitted (and the coordinator R process runs) |
time | system time on the worker node when the task was started |
iter | the loop iterations being run by the task |
host | the host name or IP address where the task is running |
pid | the process ID of the R worker running the task on host |
Tasks are listed until a key associated with them expires in Redis. Thus running tasks are not explicitly removed from the task list immediately when they terminate, but may linger on the list for a short while after (a few seconds).