Title: | R Bindings for 'ZeroMQ' |
---|---|
Description: | Interface to the 'ZeroMQ' lightweight messaging kernel (see <https://zeromq.org/> for more information). |
Authors: | Whit Armstrong [aut], Michael Schubert [ctb], Jeroen Ooms [aut, cre] |
Maintainer: | Jeroen Ooms <[email protected]> |
License: | GPL-3 |
Version: | 0.9.15 |
Built: | 2025-01-15 06:10:46 UTC |
Source: | https://github.com/ropensci/rzmq |
The zmq_bind() function shall create an endpoint for accepting connections and bind it to the socket referenced by the socket argument.
The endpoint argument is a string consisting of two parts as follows: transport ://address. The transport part specifies the underlying transport protocol to use. The meaning of the address part is specific to the underlying transport protocol selected.
The following transports are defined:
inproc local in-process (inter-thread) communication transport, see zmq_inproc(7) ipc local inter-process communication transport, see zmq_ipc(7) tcp unicast transport using TCP, see zmq_tcp(7) pgm, epgm reliable multicast transport using PGM, see zmq_pgm(7) With the exception of ZMQ_PAIR sockets, a single socket may be connected to multiple endpoints using zmq_connect(), while simultaneously accepting incoming connections from multiple endpoints bound to the socket using zmq_bind(). Refer to zmq_socket(3) for a description of the exact semantics involved when connecting or binding a socket to multiple endpoints.
bind.socket(socket, address)
bind.socket(socket, address)
socket |
a zmq socket object. |
address |
a transport as described above. |
TRUE if operation succeeds or FALSE if the operation fails
ZMQ was written by Martin Sustrik <[email protected]> and Martin Lucina <[email protected]>. rzmq was written by Whit Armstrong.
http://www.zeromq.org http://api.zeromq.org http://zguide.zeromq.org/page:all
connect.socket,bind.socket,receive.socket,send.socket,poll.socket
## Not run: library(rzmq) context = init.context() in.socket = init.socket(context,"ZMQ_PULL") bind.socket(in.socket,"tcp://*:5557") out.socket = init.socket(context,"ZMQ_PUSH") bind.socket(out.socket,"tcp://*:5558") ## End(Not run)
## Not run: library(rzmq) context = init.context() in.socket = init.socket(context,"ZMQ_PULL") bind.socket(in.socket,"tcp://*:5557") out.socket = init.socket(context,"ZMQ_PUSH") bind.socket(out.socket,"tcp://*:5558") ## End(Not run)
The zmq_connect() function shall connect the socket referenced by the socket argument to the endpoint specified by the endpoint argument.
The endpoint argument is a string consisting of two parts as follows: transport ://address. The transport part specifies the underlying transport protocol to use. The meaning of the address part is specific to the underlying transport protocol selected.
The following transports are defined:
inproc local in-process (inter-thread) communication transport, see zmq_inproc(7) ipc local inter-process communication transport, see zmq_ipc(7) tcp unicast transport using TCP, see zmq_tcp(7) pgm, epgm reliable multicast transport using PGM, see zmq_pgm(7) With the exception of ZMQ_PAIR sockets, a single socket may be connected to multiple endpoints using zmq_connect(), while simultaneously accepting incoming connections from multiple endpoints bound to the socket using zmq_bind(). Refer to zmq_socket(3) for a description of the exact semantics involved when connecting or binding a socket to multiple endpoints.
connect.socket(socket, address) disconnect.socket(socket, address)
connect.socket(socket, address) disconnect.socket(socket, address)
socket |
a zmq socket object. |
address |
a transport as described above. |
TRUE if operation succeeds or FALSE if the operation fails
ZMQ was written by Martin Sustrik <[email protected]> and Martin Lucina <[email protected]>. rzmq was written by Whit Armstrong.
http://www.zeromq.org http://api.zeromq.org http://zguide.zeromq.org/page:all
connect.socket,bind.socket,receive.socket,send.socket,poll.socket
## Not run: library(rzmq) context = init.context() in.socket = init.socket(context,"ZMQ_PULL") bind.socket(in.socket,"tcp://*:5557") out.socket = init.socket(context,"ZMQ_PUSH") bind.socket(out.socket,"tcp://*:5558") ## End(Not run)
## Not run: library(rzmq) context = init.context() in.socket = init.socket(context,"ZMQ_PULL") bind.socket(in.socket,"tcp://*:5557") out.socket = init.socket(context,"ZMQ_PUSH") bind.socket(out.socket,"tcp://*:5558") ## End(Not run)
initialize zmq context and zmq socket for to be used for further zmq operations.
init.context(threads=1L) init.socket(context, socket.type)
init.context(threads=1L) init.socket(context, socket.type)
threads |
number of threads for the context to use |
context |
a zmq context object. |
socket.type |
The ZMQ socket type requested e.g. ZMQ_REQ,ZMQ_REP,ZMQ_PULL,ZMQ_PUSH, etc. |
init.context
returns a zmq context object. init.socket
returns a zmq socket object.
ZMQ was written by Martin Sustrik <[email protected]> and Martin Lucina <[email protected]>. rzmq was written by Whit Armstrong.
http://www.zeromq.org http://api.zeromq.org http://zguide.zeromq.org/page:all
connect.socket,bind.socket,receive.socket,send.socket,poll.socket
## Not run: library(rzmq) context = init.context() in.socket = init.socket(context,"ZMQ_PULL") ## End(Not run)
## Not run: library(rzmq) context = init.context() in.socket = init.socket(context,"ZMQ_PULL") ## End(Not run)
Create a ZeroMQ message object that can be sent multiple times
init.message(data, serialize=TRUE, xdr=.Platform$endian=="big")
init.message(data, serialize=TRUE, xdr=.Platform$endian=="big")
data |
the R object to be sent |
serialize |
whether to call serialize before sending the data |
xdr |
passed directly to serialize command if serialize is requested |
a ZeroMQ message object as external pointer
ZMQ was written by Martin Sustrik <[email protected]> and Martin Lucina <[email protected]>. rzmq was written by Whit Armstrong.
http://www.zeromq.org http://api.zeromq.org http://zguide.zeromq.org/page:all
## Not run: ## remote execution server in rzmq library(rzmq) data = list(x=5) msg = init.message(data) ## End(Not run)
## Not run: ## remote execution server in rzmq library(rzmq) data = list(x=5) msg = init.message(data) ## End(Not run)
The zmq_poll() function shall poll a list of a sockets for either read, write, or error conditions subject to a millisecond resolution timeout.
poll.socket(sockets, events, timeout=0L)
poll.socket(sockets, events, timeout=0L)
sockets |
a list of zmq socket objects. |
events |
a list of character vectors containing one or more events in {read, write, error}. The first element in the list corresponds to the first zmq socket, and so on... |
timeout |
the numbers of seconds to wait for events. Fractional seconds are supported. ZeroMQ guarantees at most millisecond resolution. A timeout of -1L blocks until an event occurs; a timeout of 0L is non-blocking. |
A list of pairlists corresponding to the polled zmq sockets. Each list has one of more tags from {read, write, error} with logical values indicating the results of the poll operation.
ZMQ was written by Martin Sustrik <[email protected]> and Martin Lucina <[email protected]>. rzmq was written by Whit Armstrong.
http://www.zeromq.org http://api.zeromq.org http://zguide.zeromq.org/page:all
connect.socket,bind.socket,receive.socket,send.socket,poll.socket
## Not run: library(rzmq) # Create a set of REP-REQ sockets that # have a Send, Receive, Send, Receive, ... # pattern. context = init.context() in.socket = init.socket(context,"ZMQ_REP") bind.socket(in.socket,"tcp://*:5557") out.socket = init.socket(context,"ZMQ_REQ") connect.socket(out.socket,"tcp://*:5557") # Poll the REP and REQ sockets for all events. events <- poll.socket(list(in.socket, out.socket), list(c("read", "write", "error"), c("read", "write", "error")), timeout=0L) # The REQ socket is writable without blocking. paste("Is upstream REP socket readable without blocking?", events[[1]]$read) paste("Is upstream REP socket writable without blocking?", events[[1]]$write) paste("Is downstream REQ socket readable without blocking?", events[[2]]$read) paste("Is downstream REQ socket writable without blocking?", events[[2]]$write) # Send a message to the REP socket from the REQ socket. The # REQ socket must respond before the REP socket can send # another message. send.socket(out.socket, "Hello World") events <- poll.socket(list(in.socket, out.socket), list(c("read", "write", "error"), c("read", "write", "error")), timeout=0L) # The incoming message is readable on the REP socket. paste("Is upstream REP socket readable without blocking?", events[[1]]$read) paste("Is upstream REP socket writable without blocking?", events[[1]]$write) paste("Is downstream REQ socket readable without blocking?", events[[2]]$read) paste("Is downstream REQ socket writable without blocking?", events[[2]]$write) receive.socket(in.socket) events <- poll.socket(list(in.socket, out.socket), list(c("read", "write", "error"), c("read", "write", "error")), timeout=0L) # The REQ socket is waiting for a response from the REP socket. paste("Is upstream REP socket readable without blocking?", events[[1]]$read) paste("Is upstream REP socket writable without blocking?", events[[1]]$write) paste("Is downstream REQ socket readable without blocking?", events[[2]]$read) paste("Is downstream REQ socket writable without blocking?", events[[2]]$write) send.socket(in.socket, "Greetings") events <- poll.socket(list(in.socket, out.socket), list(c("read", "write", "error"), c("read", "write", "error")), timeout=0L) # The REP response is waiting to be read on the REQ socket. paste("Is upstream REP socket readable without blocking?", events[[1]]$read) paste("Is upstream REP socket writable without blocking?", events[[1]]$write) paste("Is downstream REQ socket readable without blocking?", events[[2]]$read) paste("Is downstream REQ socket writable without blocking?", events[[2]]$write) # Complete the REP-REQ transaction cycle by reading # the REP response. receive.socket(out.socket) ## End(Not run)
## Not run: library(rzmq) # Create a set of REP-REQ sockets that # have a Send, Receive, Send, Receive, ... # pattern. context = init.context() in.socket = init.socket(context,"ZMQ_REP") bind.socket(in.socket,"tcp://*:5557") out.socket = init.socket(context,"ZMQ_REQ") connect.socket(out.socket,"tcp://*:5557") # Poll the REP and REQ sockets for all events. events <- poll.socket(list(in.socket, out.socket), list(c("read", "write", "error"), c("read", "write", "error")), timeout=0L) # The REQ socket is writable without blocking. paste("Is upstream REP socket readable without blocking?", events[[1]]$read) paste("Is upstream REP socket writable without blocking?", events[[1]]$write) paste("Is downstream REQ socket readable without blocking?", events[[2]]$read) paste("Is downstream REQ socket writable without blocking?", events[[2]]$write) # Send a message to the REP socket from the REQ socket. The # REQ socket must respond before the REP socket can send # another message. send.socket(out.socket, "Hello World") events <- poll.socket(list(in.socket, out.socket), list(c("read", "write", "error"), c("read", "write", "error")), timeout=0L) # The incoming message is readable on the REP socket. paste("Is upstream REP socket readable without blocking?", events[[1]]$read) paste("Is upstream REP socket writable without blocking?", events[[1]]$write) paste("Is downstream REQ socket readable without blocking?", events[[2]]$read) paste("Is downstream REQ socket writable without blocking?", events[[2]]$write) receive.socket(in.socket) events <- poll.socket(list(in.socket, out.socket), list(c("read", "write", "error"), c("read", "write", "error")), timeout=0L) # The REQ socket is waiting for a response from the REP socket. paste("Is upstream REP socket readable without blocking?", events[[1]]$read) paste("Is upstream REP socket writable without blocking?", events[[1]]$write) paste("Is downstream REQ socket readable without blocking?", events[[2]]$read) paste("Is downstream REQ socket writable without blocking?", events[[2]]$write) send.socket(in.socket, "Greetings") events <- poll.socket(list(in.socket, out.socket), list(c("read", "write", "error"), c("read", "write", "error")), timeout=0L) # The REP response is waiting to be read on the REQ socket. paste("Is upstream REP socket readable without blocking?", events[[1]]$read) paste("Is upstream REP socket writable without blocking?", events[[1]]$write) paste("Is downstream REQ socket readable without blocking?", events[[2]]$read) paste("Is downstream REQ socket writable without blocking?", events[[2]]$write) # Complete the REP-REQ transaction cycle by reading # the REP response. receive.socket(out.socket) ## End(Not run)
Returns a list of raw vectors for the parts of a multipart message.
receive.multipart(socket)
receive.multipart(socket)
socket |
The ZMQ socket from which to receive data |
The zmq_recv() function shall receive a message from the socket referenced by the socket argument. If there are no messages available on the specified socket, by default the function shall block until the request can be satisfied. A non-blocking receive can be obtained by setting dont.wait to TRUE If there are no messages available on the specified socket, the receive.socket() call will return NULL immediately.
receive.socket(socket, unserialize=TRUE, dont.wait=FALSE) receive.null.msg(socket) receive.string(socket) receive.int(socket) receive.double(socket)
receive.socket(socket, unserialize=TRUE, dont.wait=FALSE) receive.null.msg(socket) receive.string(socket) receive.int(socket) receive.double(socket)
socket |
a zmq socket object |
unserialize |
whether to call unserialize on the received data |
dont.wait |
defaults to false, for blocking receive. Set to TRUE for non-blocking receive. |
the value sent from the remote server or NULL on failure. If dont.wait was TRUE and a message was not immediately available for receipt, NULL is returned and get.zmq.errno() is set to 11 or get.zmq.strerror() is set to EAGAIN.
ZMQ was written by Martin Sustrik <[email protected]> and Martin Lucina <[email protected]>. rzmq was written by Whit Armstrong.
http://www.zeromq.org http://api.zeromq.org http://zguide.zeromq.org/page:all
connect.socket,bind.socket,receive.socket,send.socket,poll.socket
## Not run: library(rzmq) remote.exec <- function(out.socket,in.socket,fun,...) { send.socket(out.socket,data=list(fun=fun,args=list(...))) receive.socket(in.socket) } context = init.context() out.socket = init.socket(context,"ZMQ_PUSH") bind.socket(out.socket,"tcp://*:5557") in.socket = init.socket(context,"ZMQ_PULL") bind.socket(in.socket,"tcp://*:5558") myfun <- function(x) { sum(abs(x)) } remote.exec(out.socket,in.socket,myfun,rnorm(1e3)) ## End(Not run)
## Not run: library(rzmq) remote.exec <- function(out.socket,in.socket,fun,...) { send.socket(out.socket,data=list(fun=fun,args=list(...))) receive.socket(in.socket) } context = init.context() out.socket = init.socket(context,"ZMQ_PUSH") bind.socket(out.socket,"tcp://*:5557") in.socket = init.socket(context,"ZMQ_PULL") bind.socket(in.socket,"tcp://*:5558") myfun <- function(x) { sum(abs(x)) } remote.exec(out.socket,in.socket,myfun,rnorm(1e3)) ## End(Not run)
Queue a list of raw vectors to be sent as a series of ZMQ message parts. Each part before the last will be sent with the SNDMORE flag.
send.multipart(socket, parts)
send.multipart(socket, parts)
socket |
The ZMQ socket on which to send data |
parts |
A list of raw vectors; each component will be sent as one part of the message, in the order of the list |
Queue the message referenced by the msg argument to be sent to the socket referenced by the socket argument.
A successful invocation of send.socket does not indicate that the message has been transmitted to the network, only that it has been queued on the socket and ZMQ has assumed responsibility for the message.
send.socket(socket, data, send.more=FALSE, serialize=TRUE, xdr=.Platform$endian=="big") send.null.msg(socket, send.more=FALSE) send.raw.string(socket,data,send.more=FALSE)
send.socket(socket, data, send.more=FALSE, serialize=TRUE, xdr=.Platform$endian=="big") send.null.msg(socket, send.more=FALSE) send.raw.string(socket,data,send.more=FALSE)
socket |
a zmq socket object |
data |
the R object to be sent |
send.more |
whether this message has more frames to be sent |
serialize |
whether to call serialize before sending the data |
xdr |
passed directly to serialize command if serialize is requested |
a boolean indicating success or failure of the operation.
ZMQ was written by Martin Sustrik <[email protected]> and Martin Lucina <[email protected]>. rzmq was written by Whit Armstrong.
http://www.zeromq.org http://api.zeromq.org http://zguide.zeromq.org/page:all
connect.socket,bind.socket,receive.socket,send.socket,poll.socket
## Not run: ## remote execution server in rzmq library(rzmq) context = init.context() in.socket = init.socket(context,"ZMQ_PULL") bind.socket(in.socket,"tcp://*:5557") out.socket = init.socket(context,"ZMQ_PUSH") bind.socket(out.socket,"tcp://*:5558") while(1) { msg = receive.socket(in.socket) fun <- msg$fun args <- msg$args print(args) ans <- do.call(fun,args) send.socket(out.socket,ans) } ## End(Not run)
## Not run: ## remote execution server in rzmq library(rzmq) context = init.context() in.socket = init.socket(context,"ZMQ_PULL") bind.socket(in.socket,"tcp://*:5557") out.socket = init.socket(context,"ZMQ_PUSH") bind.socket(out.socket,"tcp://*:5558") while(1) { msg = receive.socket(in.socket) fun <- msg$fun args <- msg$args print(args) ans <- do.call(fun,args) send.socket(out.socket,ans) } ## End(Not run)
The zmq_setsockopt() function shall set the option specified by the option_name argument to the value pointed to by the option_value argument for the ZMQ socket pointed to by the socket argument.
set.hwm(socket, option.value) set.swap(socket, option.value) set.affinity(socket, option.value) set.identity(socket, option.value) subscribe(socket, option.value) unsubscribe(socket, option.value) set.rate(socket, option.value) set.recovery.ivl(socket, option.value) set.recovery.ivl.msec(socket, option.value) set.mcast.loop(socket, option.value) set.sndbuf(socket, option.value) set.rcvbuf(socket, option.value) set.linger(socket, option.value) set.reconnect.ivl(socket, option.value) set.zmq.backlog(socket, option.value) set.reconnect.ivl.max(socket, option.value) get.rcvmore(socket) get.last.endpoint(socket) get.send.timeout(socket) set.send.timeout(socket, option.value) get.rcv.timeout(socket) set.rcv.timeout(socket, option.value)
set.hwm(socket, option.value) set.swap(socket, option.value) set.affinity(socket, option.value) set.identity(socket, option.value) subscribe(socket, option.value) unsubscribe(socket, option.value) set.rate(socket, option.value) set.recovery.ivl(socket, option.value) set.recovery.ivl.msec(socket, option.value) set.mcast.loop(socket, option.value) set.sndbuf(socket, option.value) set.rcvbuf(socket, option.value) set.linger(socket, option.value) set.reconnect.ivl(socket, option.value) set.zmq.backlog(socket, option.value) set.reconnect.ivl.max(socket, option.value) get.rcvmore(socket) get.last.endpoint(socket) get.send.timeout(socket) set.send.timeout(socket, option.value) get.rcv.timeout(socket) set.rcv.timeout(socket, option.value)
socket |
a zmq socket object |
option.value |
the new option value to bet set |
a boolean indicating success or failure of the operation or in the case of getsocketoptions, the value of the requsted option.
ZMQ was written by Martin Sustrik <[email protected]> and Martin Lucina <[email protected]>. rzmq was written by Whit Armstrong.
http://www.zeromq.org http://api.zeromq.org http://zguide.zeromq.org/page:all
connect.socket,bind.socket,receive.socket,send.socket,poll.socket
## Not run: library(rzmq) context = init.context() socket = init.socket(context,"ZMQ_REQ") set.hwm(socket, 1L) set.swap(socket, 100L) set.identity(socket, "big.ass.socket") ## End(Not run)
## Not run: library(rzmq) context = init.context() socket = init.socket(context,"ZMQ_REQ") set.hwm(socket, 1L) set.swap(socket, 100L) set.identity(socket, "big.ass.socket") ## End(Not run)
return the error number or error description after a zmq call
zmq.errno() zmq.strerror()
zmq.errno() zmq.strerror()
an integer for zmq.errno or a string for zmq.strerror
ZMQ was written by Martin Sustrik <[email protected]> and Martin Lucina <[email protected]>. rzmq was written by Whit Armstrong.
http://www.zeromq.org http://api.zeromq.org http://zguide.zeromq.org/page:all
connect.socket,bind.socket,receive.socket,send.socket
## Not run: library(rzmq) zmq.errno() zmq.strerror() ## End(Not run)
## Not run: library(rzmq) zmq.errno() zmq.strerror() ## End(Not run)
return the version string of the system zmq library
zmq.version()
zmq.version()
a string of the following format: major.minor.patch
ZMQ was written by Martin Sustrik <[email protected]> and Martin Lucina <[email protected]>. rzmq was written by Whit Armstrong.
http://www.zeromq.org http://api.zeromq.org http://zguide.zeromq.org/page:all
connect.socket,bind.socket,receive.socket,send.socket
## Not run: library(rzmq) zmq.version() ## End(Not run)
## Not run: library(rzmq) zmq.version() ## End(Not run)