zeromq3-haskell-0.4: Bindings to ZeroMQ 3.x

Portability: non-portable
Stability: experimental
Maintainer: Toralf Wittner <tw@dtex.org>
Safe Haskell: None

System.ZMQ3

Description

0MQ Haskell binding. The API closely follows the C API of 0MQ, with the main difference that sockets are typed. The documentation of the individual socket types is copied from 0MQ's man pages, authored by Martin Sustrik. For details, please refer to http://api.zeromq.org

Differences to zeromq-haskell 2.x

Socket Types

Socket Options

Instead of a single SocketOption data-type, getter and setter functions are provided, e.g. one would write: affinity sock instead of getOption sock (Affinity 0)

Restrictions

Many option setters use a Restriction to further constrain the range of possible values of their integral types. For example, the maximum message size can be given as -1, which means no limit, or as a greater value, which denotes the message size in bytes. The type of setMaxMessageSize is therefore:

setMaxMessageSize :: Integral i => Restricted Nneg1 Int64 i -> Socket a -> IO ()

which means that any integral value in the range of -1 to (maxBound :: Int64) can be given. To create a restricted value from a plain value, use toRestricted or restrict.
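As a minimal sketch of how these pieces fit together, the following program sets the maximum message size with both restrict and toRestricted. It assumes that a Restriction instance exists for Nneg1, Int64 and plain Int values, which the signature above suggests but this page does not list. The Req socket type and the literal sizes are chosen only for illustration.

```haskell
import System.ZMQ3
  ( Req (..), maxMessageSize, restrict, setMaxMessageSize
  , toRestricted, withContext, withSocket )

main :: IO ()
main = withContext $ \ctx ->
  withSocket ctx Req $ \sock -> do
    -- restrict always yields a value; out-of-range input is adjusted.
    setMaxMessageSize (restrict (4096 :: Int)) sock

    -- toRestricted refuses out-of-range input instead of adjusting it.
    -- (-5) lies below the lower bound of -1, so Nothing is expected here.
    case toRestricted (-5 :: Int) of
      Just limit -> setMaxMessageSize limit sock
      Nothing    -> putStrLn "-5 is outside the permitted range"

    -- Read the option back through the matching getter.
    maxMessageSize sock >>= print
```

restrict is convenient for constants known to be valid, while toRestricted is the safer choice for values that come from configuration or user input.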

Devices

Devices are no longer present in 0MQ 3.x and consequently have been removed from this binding as well.

Error Handling

The type ZMQError is introduced, together with the inspection functions errno, source and message. zmq_strerror is used underneath to retrieve the correct error message. A ZMQError is thrown when a native 0MQ procedure returns an error status, and it can be caught as an Exception.
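A short sketch of catching a ZMQError with try from Control.Exception follows. The malformed endpoint string is a made-up value intended to make bind fail. The sketch assumes that the native call rejects it with an error status.

```haskell
import Control.Exception (try)
import System.ZMQ3
  ( Rep (..), bind, errno, message, source, withContext, withSocket )

main :: IO ()
main = withContext $ \ctx ->
  withSocket ctx Rep $ \sock -> do
    -- The endpoint lacks a transport prefix, so bind is expected to fail.
    result <- try (bind sock "not-a-valid-endpoint")
    case result of
      -- Using errno on e fixes the exception type to ZMQError.
      Left e ->
        putStrLn $ "0MQ error " ++ show (errno e)
                ++ " in " ++ source e ++ ": " ++ message e
      Right () ->
        putStrLn "bound successfully"
```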

Type Definitions

data Context

A 0MQ context representation.

data Socket a

A 0MQ Socket.

data Flag

Flags to apply on send operations (cf. man zmq_send)

Constructors

SendMore

ZMQ_SNDMORE

data Switch

Configuration switch

Constructors

Default

Use default setting

On

Activate setting

Off

De-activate setting

data Event

Socket events.

Constructors

In

ZMQ_POLLIN (incoming messages)

Out

ZMQ_POLLOUT (outgoing messages, i.e. at least 1 byte can be written)

Err

ZMQ_POLLERR

data Poll m where

Type representing a descriptor that poll waits on (either a 0MQ socket or a file descriptor), plus the types of event to wait for.

Constructors

Sock :: Socket s -> [Event] -> Maybe ([Event] -> m ()) -> Poll m 
File :: Fd -> [Event] -> Maybe ([Event] -> m ()) -> Poll m 

Type Classes

class Subscriber a

Sockets which can subscribe.

Socket Types

data Pair

Socket to communicate with a single peer. Allows for only a single connect or a single bind. There's no message routing or message filtering involved. Compatible peer sockets: Pair.

Constructors

Pair 

data Pub

Socket to distribute data. The receive function is not implemented for this socket type. Messages are distributed in fanout fashion to all the peers. Compatible peer sockets: Sub.

Constructors

Pub 

data Sub

Socket to subscribe for data. The send function is not implemented for this socket type. Initially, the socket is subscribed to no messages. Use subscribe to specify which messages to subscribe to. Compatible peer sockets: Pub.

Constructors

Sub 

data XPub

Same as Pub except that you can receive subscriptions from the peers in the form of incoming messages. A subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. Compatible peer sockets: Sub, XSub.

Constructors

XPub 

data XSub

Same as Sub except that you subscribe by sending subscription messages to the socket. A subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. Compatible peer sockets: Pub, XPub.

Constructors

XSub 
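The subscription message layout described for XPub and XSub can be sketched as a small pure helper. The name subscriptionMessage is hypothetical and not part of this binding. It only builds the bytes described above.

```haskell
import           Data.ByteString (ByteString)
import qualified Data.ByteString as BS

-- Hypothetical helper: build the frame described above.
-- True  -> leading byte 1 (subscribe)
-- False -> leading byte 0 (unsubscribe)
subscriptionMessage :: Bool -> ByteString -> ByteString
subscriptionMessage subscribing topic =
  BS.cons (if subscribing then 1 else 0) topic
```

On an XSub socket the result would presumably be passed to send, for example send sock [] (subscriptionMessage True topic), assuming XSub has a Sender instance.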

data Req

Socket to send requests and receive replies. Requests are load-balanced among all the peers. This socket type allows only an alternating sequence of sends and receives. Compatible peer sockets: Rep, Router.

Constructors

Req 

data Rep

Socket to receive requests and send replies. This socket type allows only an alternating sequence of receives and sends. Each send is routed to the peer that issued the last received request. Compatible peer sockets: Req, Dealer.

Constructors

Rep 

data Dealer

Each message sent is round-robined among all connected peers, and each message received is fair-queued from all connected peers. Compatible peer sockets: Router, Req, Rep.

Constructors

Dealer 

data Router

When receiving messages a Router socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. Messages received are fair-queued from among all connected peers. When sending messages a Router socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to. If the peer does not exist anymore the message shall be silently discarded. Compatible peer sockets: Dealer, Req, Rep.

Constructors

Router 

type XReq = Dealer

Deprecated: Use Dealer

Deprecated Alias

type XRep = Router

Deprecated: Use Router

Deprecated Alias

data Pull

A socket of type Pull is used by a pipeline node to receive messages from upstream pipeline nodes. Messages are fair-queued from among all connected upstream nodes. The zmq_send() function is not implemented for this socket type.

Constructors

Pull 

data Push

A socket of type Push is used by a pipeline node to send messages to downstream pipeline nodes. Messages are load-balanced to all connected downstream nodes. The zmq_recv() function is not implemented for this socket type.

When a Push socket enters an exceptional state due to having reached the high water mark for all downstream nodes, or if there are no downstream nodes at all, then any zmq_send(3) operations on the socket shall block until the exceptional state ends or at least one downstream node becomes available for sending; messages are not discarded.

Constructors

Push 

General Operations

withContext :: (Context -> IO a) -> IO a

Run an action with a 0MQ context. The Context supplied to your action will not be valid after the action either returns or throws an exception.

withSocket :: SocketType a => Context -> a -> (Socket a -> IO b) -> IO b

Run an action with a 0MQ socket. The socket will be closed after running the supplied action even if an error occurs. The socket supplied to your action will not be valid after the action terminates.

bind :: Socket a -> String -> IO ()

Bind the socket to the given address (cf. zmq_bind)

unbind :: Socket a -> String -> IO ()

Unbind the socket from the given address (cf. zmq_unbind)

connect :: Socket a -> String -> IO ()

Connect the socket to the given address (cf. zmq_connect).

send :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()

Send the given ByteString over the socket (cf. zmq_sendmsg).

Note: This function always calls zmq_sendmsg in a non-blocking way, i.e. there is no need to provide the ZMQ_DONTWAIT flag as this is used by default. Still, send blocks the calling thread, using GHC's threadWaitWrite, for as long as the message cannot be queued on the socket.

send' :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()

Send the given lazy ByteString over the socket (cf. zmq_sendmsg). This is operationally identical to send socket flags (Strict.concat (Lazy.toChunks lbs)) but may be more efficient.

Note: This function always calls zmq_sendmsg in a non-blocking way, i.e. there is no need to provide the ZMQ_DONTWAIT flag as this is used by default. Still, send' blocks the calling thread, using GHC's threadWaitWrite, for as long as the message cannot be queued on the socket.

sendMulti :: Sender a => Socket a -> NonEmpty ByteString -> IO ()

Send a multi-part message. This function applies the SendMore Flag between all message parts. 0MQ guarantees atomic delivery of a multi-part message (cf. zmq_sendmsg for details).

receive :: Receiver a => Socket a -> IO ByteString

Receive a ByteString from socket (cf. zmq_recvmsg).

Note: This function always calls zmq_recvmsg in a non-blocking way, i.e. there is no need to provide the ZMQ_DONTWAIT flag as this is used by default. Still, receive blocks the calling thread, using GHC's threadWaitRead, for as long as no data is available.
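To illustrate send and receive together with withContext and withSocket, here is a minimal request/reply round trip inside a single process. The inproc endpoint name is made up. The sketch also assumes that the inproc transport queues the request, so that one thread can play both roles in turn.

```haskell
import qualified Data.ByteString.Char8 as BS
import System.ZMQ3
  ( Rep (..), Req (..), bind, connect, receive, send
  , withContext, withSocket )

main :: IO ()
main = withContext $ \ctx ->
  withSocket ctx Rep $ \server ->
    withSocket ctx Req $ \client -> do
      -- Bind first: an inproc endpoint must exist before connecting to it.
      bind server "inproc://echo-demo"
      connect client "inproc://echo-demo"

      -- Req must send before it receives; Rep must receive before it sends.
      send client [] (BS.pack "ping")
      request <- receive server
      send server [] (BS.append (BS.pack "reply to ") request)

      receive client >>= BS.putStrLn
```

In a real application the two sockets would normally live in different threads or processes and use a tcp or ipc endpoint.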

receiveMulti :: Receiver a => Socket a -> IO [ByteString]

Receive a multi-part message. This function collects all message parts sent via sendMulti.
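A sketch of sendMulti and receiveMulti over a Push/Pull pair follows. It assumes that NonEmpty is the type from Data.List.NonEmpty. The endpoint name and the part contents are invented for the example.

```haskell
import qualified Data.ByteString.Char8 as BS
import           Data.List.NonEmpty (NonEmpty (..))
import System.ZMQ3
  ( Pull (..), Push (..), bind, connect, receiveMulti, sendMulti
  , withContext, withSocket )

main :: IO ()
main = withContext $ \ctx ->
  withSocket ctx Pull $ \puller ->
    withSocket ctx Push $ \pusher -> do
      bind puller "inproc://multipart-demo"
      connect pusher "inproc://multipart-demo"

      -- NonEmpty guarantees at least one part; (:|) separates the
      -- first part from the (possibly empty) list of remaining parts.
      sendMulti pusher
        (BS.pack "header" :| [BS.pack "body", BS.pack "trailer"])

      -- All parts of the message arrive together, in order.
      parts <- receiveMulti puller
      mapM_ BS.putStrLn parts
```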

version :: IO (Int, Int, Int)

Return the runtime version of the underlying 0MQ library as a (major, minor, patch) triple.

monitor :: [EventType] -> Context -> Socket a -> IO (Bool -> IO (Maybe EventMsg))

Monitor socket events. This function returns a function which can be invoked to retrieve the next socket event, potentially blocking until the next one becomes available. When applied to False, monitoring will terminate, i.e. internal monitoring resources will be disposed. Consequently after monitor has been invoked, the returned function must be applied once to False.

poll :: MonadIO m => Timeout -> [Poll m] -> m [[Event]]

Polls for events on the given Poll descriptors. Returns the same list of Poll descriptors with an updated PollEvent field (cf. zmq_poll). Sockets which have seen no activity have None in their PollEvent field.
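A hedged sketch of calling poll on a single socket follows. It assumes that Timeout is a numeric type counted in milliseconds (as in zmq_poll) and that the returned list holds one list of events per descriptor, as the signature suggests. Neither assumption is confirmed by this page. The endpoint name is made up, and no callback is registered (Nothing).

```haskell
import qualified Data.ByteString.Char8 as BS
import System.ZMQ3
  ( Event (..), Poll (..), Pull (..), Push (..), bind, connect
  , poll, receive, send, withContext, withSocket )

main :: IO ()
main = withContext $ \ctx ->
  withSocket ctx Pull $ \puller ->
    withSocket ctx Push $ \pusher -> do
      bind puller "inproc://poll-demo"
      connect pusher "inproc://poll-demo"
      send pusher [] (BS.pack "ready")

      -- Wait for incoming data on puller; 1000 is assumed to mean 1 second.
      results <- poll 1000 [Sock puller [In] Nothing]

      -- An empty event list for every descriptor is treated as a timeout.
      if all null results
        then putStrLn "no events before the timeout"
        else receive puller >>= BS.putStrLn
```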

subscribe :: Subscriber a => Socket a -> ByteString -> IO ()

Subscribe Socket to given subscription.

unsubscribe :: Subscriber a => Socket a -> ByteString -> IO ()

Unsubscribe Socket from given subscription.
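As an illustration, a subscriber might be set up as follows. The endpoint and the topic prefix are hypothetical, and a publisher is assumed to be running elsewhere. 0MQ matches subscriptions by message prefix, so the filter below accepts every message that starts with "weather.".

```haskell
import           Control.Monad (forever)
import qualified Data.ByteString.Char8 as BS
import System.ZMQ3
  ( Sub (..), connect, receive, subscribe, withContext, withSocket )

main :: IO ()
main = withContext $ \ctx ->
  withSocket ctx Sub $ \sub -> do
    -- Hypothetical publisher address.
    connect sub "tcp://localhost:5556"

    -- A Sub socket starts with no subscriptions and would receive nothing.
    subscribe sub (BS.pack "weather.")

    -- Print each matching message as it arrives; runs until interrupted.
    forever (receive sub >>= BS.putStrLn)
```

Subscribing with an empty ByteString would accept every message.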

Context Options (Read)

ioThreads :: Context -> IO Word

Cf. zmq_ctx_get ZMQ_IO_THREADS

maxSockets :: Context -> IO Word

Cf. zmq_ctx_get ZMQ_MAX_SOCKETS

Context Options (Write)

setIoThreads :: Word -> Context -> IO ()

Cf. zmq_ctx_set ZMQ_IO_THREADS

setMaxSockets :: Word -> Context -> IO ()

Cf. zmq_ctx_set ZMQ_MAX_SOCKETS

Socket Options (Read)

affinity :: Socket a -> IO Word64

Cf. zmq_getsockopt ZMQ_AFFINITY

backlog :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_BACKLOG

delayAttachOnConnect :: Socket a -> IO Bool

Cf. zmq_getsockopt ZMQ_DELAY_ATTACH_ON_CONNECT

events :: Socket a -> IO [Event]

Cf. zmq_getsockopt ZMQ_EVENTS

fileDescriptor :: Socket a -> IO Fd

Cf. zmq_getsockopt ZMQ_FD

identity :: Socket a -> IO ByteString

Cf. zmq_getsockopt ZMQ_IDENTITY

ipv4Only :: Socket a -> IO Bool

Cf. zmq_getsockopt ZMQ_IPV4ONLY

lastEndpoint :: Socket a -> IO String

Cf. zmq_getsockopt ZMQ_LAST_ENDPOINT

linger :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_LINGER

maxMessageSize :: Socket a -> IO Int64

Cf. zmq_getsockopt ZMQ_MAXMSGSIZE

mcastHops :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_MULTICAST_HOPS

moreToReceive :: Socket a -> IO Bool

Cf. zmq_getsockopt ZMQ_RCVMORE

rate :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RATE

receiveBuffer :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RCVBUF

receiveHighWM :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RCVHWM

receiveTimeout :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RCVTIMEO

reconnectInterval :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RECONNECT_IVL

reconnectIntervalMax :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RECONNECT_IVL_MAX

recoveryInterval :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_RECOVERY_IVL

sendBuffer :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_SNDBUF

sendHighWM :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_SNDHWM

sendTimeout :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_SNDTIMEO

tcpKeepAlive :: Socket a -> IO Switch

Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE

tcpKeepAliveCount :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE_CNT

tcpKeepAliveIdle :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE_IDLE

tcpKeepAliveInterval :: Socket a -> IO Int

Cf. zmq_getsockopt ZMQ_TCP_KEEPALIVE_INTVL

Socket Options (Write)

setAffinity :: Word64 -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_AFFINITY

setBacklog :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_BACKLOG

setDelayAttachOnConnect :: Bool -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_DELAY_ATTACH_ON_CONNECT

setIdentity :: Restricted N1 N254 ByteString -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_IDENTITY

setIpv4Only :: Bool -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_IPV4ONLY

setLinger :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_LINGER

setMaxMessageSize :: Integral i => Restricted Nneg1 Int64 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_MAXMSGSIZE

setMcastHops :: Integral i => Restricted N1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_MULTICAST_HOPS

setRate :: Integral i => Restricted N1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RATE

setReceiveBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RCVBUF

setReceiveHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RCVHWM

setReceiveTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RCVTIMEO

setReconnectInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RECONNECT_IVL

setReconnectIntervalMax :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RECONNECT_IVL_MAX

setRecoveryInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_RECOVERY_IVL

setRouterMandatory :: Bool -> Socket Router -> IO ()

Cf. zmq_setsockopt ZMQ_ROUTER_MANDATORY

setSendBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_SNDBUF

setSendHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_SNDHWM

setSendTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_SNDTIMEO

setTcpAcceptFilter :: Maybe ByteString -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_TCP_ACCEPT_FILTER

setTcpKeepAlive :: Switch -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE

setTcpKeepAliveCount :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE_CNT

setTcpKeepAliveIdle :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE_IDLE

setTcpKeepAliveInterval :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()

Cf. zmq_setsockopt ZMQ_TCP_KEEPALIVE_INTVL

setXPubVerbose :: Bool -> Socket XPub -> IO ()

Cf. zmq_setsockopt ZMQ_XPUB_VERBOSE

Restrictions

restrict :: Restriction l u v => v -> Restricted l u v

Create a restricted value. If the given value does not satisfy the restrictions, a modified variant is used instead, e.g. if an integer is larger than the upper bound, the upper bound value is used.

toRestricted :: Restriction l u v => v -> Maybe (Restricted l u v)

Create a restricted value. Returns Nothing if the given value does not satisfy all restrictions.
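The difference between the two constructors can be modelled with plain integers. The functions below are hypothetical stand-ins, not the library's implementation. They assume that restrict also raises values below the lower bound, which the description above does not state explicitly.

```haskell
-- Hypothetical model of restrict on a closed range [lower, upper]:
-- out-of-range values are moved to the nearest bound.
clampToRange :: Integer -> Integer -> Integer -> Integer
clampToRange lower upper = max lower . min upper

-- Hypothetical model of toRestricted on the same range:
-- out-of-range values are rejected rather than adjusted.
checkRange :: Integer -> Integer -> Integer -> Maybe Integer
checkRange lower upper v
  | lower <= v && v <= upper = Just v
  | otherwise                = Nothing
```

With a range of -1 to 100, clampToRange (-1) 100 500 yields the upper bound 100, whereas checkRange (-1) 100 500 yields Nothing.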

Error Handling

data ZMQError

ZMQError encapsulates information about errors, which occur when using the native 0MQ API, such as error number and message.

errno :: ZMQError -> Int

Error number value.

source :: ZMQError -> String

Source where this error originates from.

message :: ZMQError -> String

Actual error message.

Low-level Functions

init :: Size -> IO Context

Deprecated: Use context

term :: Context -> IO ()

Deprecated: Use destroy

context :: IO Context

Initialize a 0MQ context (cf. zmq_ctx_new for details). You should normally prefer to use withContext instead.

destroy :: Context -> IO ()

Terminate a 0MQ context (cf. zmq_ctx_destroy). You should normally prefer to use withContext instead.

socket :: SocketType a => Context -> a -> IO (Socket a)

Create a new 0MQ socket within the given context. withSocket provides automatic socket closing and may be safer to use.

close :: Socket a -> IO ()

Close a 0MQ socket. withSocket provides automatic socket closing and may be safer to use.

waitRead :: Socket a -> IO ()

Wait until data is available for reading from the given Socket. After this function returns, a call to receive will essentially be non-blocking.

waitWrite :: Socket a -> IO ()

Wait until data can be written to the given Socket. After this function returns, a call to send will essentially be non-blocking.

Utils

proxy :: Socket a -> Socket b -> Maybe (Socket c) -> IO ()

Starts the built-in 0MQ proxy.

The proxy connects the frontend socket to the backend socket.

Before calling proxy, all sockets should be bound.

If the capture socket is not Nothing, the proxy shall send all messages, received on both frontend and backend, to the capture socket.
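A minimal sketch of a request/reply broker built on proxy follows. The Router/Dealer pairing follows the compatibility notes in the socket type descriptions. The port numbers are arbitrary, and no capture socket is used.

```haskell
import System.ZMQ3
  ( Dealer (..), Router (..), bind, proxy, withContext, withSocket )

main :: IO ()
main = withContext $ \ctx ->
  withSocket ctx Router $ \frontend ->
    withSocket ctx Dealer $ \backend -> do
      -- Both sockets are bound before the proxy is started.
      bind frontend "tcp://*:5559"  -- Req clients connect here
      bind backend  "tcp://*:5560"  -- Rep workers connect here

      -- Nothing: no capture socket. The call is expected to block
      -- for as long as the proxy is running.
      proxy frontend backend Nothing
```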