This module implements asynchronous IO. This includes a dispatcher, a Future type implementation, and an async macro which allows asynchronous code to be written in a synchronous style with the await keyword.
The dispatcher acts as a kind of event loop. You must call poll on it (or a function which does so for you, such as waitFor or runForever) in order to poll for any outstanding events. The underlying implementation is based on epoll on Linux, IO Completion Ports on Windows and select on other operating systems.
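As a minimal sketch of what driving the dispatcher by hand can look like, the following polls until a timer future completes. It assumes sleepAsync (documented further down) as a convenient source of a pending operation; the variable names timer and polls are only illustrative.

```nim
import asyncdispatch

# sleepAsync registers a timer with the global dispatcher and
# returns a Future[void] that completes when the timer expires.
let timer = sleepAsync(20)

var polls = 0
# Drive the event loop manually until the future is finished.
while not timer.finished:
  poll()      # processes outstanding events; futures complete as a side effect
  inc polls

echo "timer future finished after ", polls, " call(s) to poll"
```

This loop is roughly what waitFor does for you, so in practice calling waitFor or runForever is usually simpler.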
The poll function will not, on its own, return any events. Instead an appropriate Future object will be completed. A Future is a type which holds a value which is not yet available, but which may be available in the future. You can check whether a future is finished by using the finished function. When a future is finished it means that either the value that it holds is now available or it holds an error instead. The latter situation occurs when the operation to complete a future fails with an exception. You can distinguish between the two situations with the failed function.
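The two outcomes described above can be illustrated without any IO by completing futures by hand. This is only a sketch: newFuture, complete and fail are not described in this section (they come from the Future implementation that this module exposes), and the names ok and bad are illustrative.

```nim
import asyncdispatch

# A future that ends up holding a value.
let ok = newFuture[int]("example.ok")
echo ok.finished                  # not finished yet: no value is available
ok.complete(42)
echo ok.finished, " ", ok.failed  # finished, and not failed
echo ok.read                      # read returns the stored value

# A future that ends up holding an error instead of a value.
let bad = newFuture[int]("example.bad")
bad.fail(newException(ValueError, "boom"))
echo bad.finished, " ", bad.failed

try:
  discard bad.read                # read re-raises the stored error
except ValueError:
  echo "read raised the stored ValueError"
```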
Future objects can also store a callback procedure which will be called automatically once the future completes.
Futures therefore can be thought of as an implementation of the proactor pattern. In this pattern you make a request for an action, and once that action is fulfilled a future is completed with the result of that action. Requests can be made by calling the appropriate functions. For example: calling the recv function will create a request for some data to be read from a socket. The future which the recv function returns will then complete once the requested amount of data is read or an exception occurs.
Code to read some data from a socket may look something like this:
    var future = socket.recv(100)
    future.addCallback(
      proc () =
        echo(future.read)
    )
Asynchronous functions that return a Future do not block. They do not, however, return immediately: an asynchronous function first runs the code that precedes the asynchronous request, which in most cases sets up the request.
In the above example, the recv function will return a brand new Future instance once the request for data to be read from the socket is made. This Future instance will complete once the requested amount of data is read; in this case it is 100 bytes. The second line sets a callback on this future which will be called once the future completes. All the callback does is write the data stored in the future to stdout. The read function is used for this: it checks for you whether the future completed with an error (if it did, it simply raises the error); if there is no error, it returns the value of the future.
Asynchronous procedures remove the pain of working with callbacks. They do this by allowing you to write asynchronous code the same way as you would write synchronous code.
An asynchronous procedure is marked using the {.async.} pragma. When marking a procedure with the {.async.} pragma it must have a Future[T] return type or no return type at all. If you do not specify a return type then Future[void] is assumed.
Inside asynchronous procedures await can be used to call any procedures which return a Future; this includes asynchronous procedures. When a procedure is "awaited", the asynchronous procedure it is awaited in will suspend its execution until the awaited procedure's Future completes, at which point the asynchronous procedure will resume its execution. While an asynchronous procedure is suspended, other asynchronous procedures will be run by the dispatcher.
The await call may be used in many contexts. It can be used on the right hand side of a variable declaration: var data = await socket.recv(100), in which case the variable will be set to the value of the future automatically. It can be used to await a Future object, and it can be used to await a procedure returning a Future[void]: await socket.send("foobar").
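The three uses of await listed above can be sketched without sockets by using sleepAsync as a stand-in for real IO. The procedures delayedDouble and sumOfTwo are hypothetical helpers written for this illustration, not part of the module.

```nim
import asyncdispatch

# Hypothetical async procedure: "works" for a moment, then returns a value.
proc delayedDouble(x: int): Future[int] {.async.} =
  await sleepAsync(10)          # awaiting a procedure that returns Future[void]
  return x * 2

proc sumOfTwo(): Future[int] {.async.} =
  # await on the right hand side of a declaration:
  # a is set to the value of the future.
  let a = await delayedDouble(1)
  # Start a request, keep the Future object, and await it later.
  let fut = delayedDouble(20)
  let b = await fut
  return a + b

# waitFor polls the dispatcher until the future completes,
# then returns its value.
echo waitFor(sumOfTwo())        # prints 42
```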
If an awaited future completes with an error, then await will re-raise this error. To avoid this, you can use the yield keyword instead of await. The following section shows different ways that you can handle exceptions in async procs.
The most reliable way to handle exceptions is to use yield on a future then check the future's failed property. For example:

    var future = sock.recv(100)
    yield future
    if future.failed:
      # Handle exception
The async procedures also offer limited support for the try statement.

    try:
      let data = await sock.recv(100)
      echo("Received ", data)
    except:
      # Handle exception
Unfortunately the semantics of the try statement may not always be correct, and occasionally the compilation may fail altogether. As such it is better to use the former style when possible.
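A self-contained sketch of the yield-then-check style, without sockets, might look as follows. The procedures failing and viaYield are hypothetical and exist only to produce and observe a failed future.

```nim
import asyncdispatch

# Hypothetical async procedure whose future always completes with an error.
proc failing(): Future[int] {.async.} =
  await sleepAsync(1)
  raise newException(ValueError, "boom")

proc viaYield(): Future[string] {.async.} =
  let fut = failing()
  yield fut            # suspend until fut completes; does not re-raise its error
  if fut.failed:
    return "failed"    # handle the error here instead of propagating it
  return "ok"

echo waitFor(viaYield())
```

Because the error is inspected through failed rather than re-raised, viaYield itself completes normally.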
Futures should never be discarded. This is because they may contain errors. If you do not care for the result of a Future then you should use the asyncCheck procedure instead of the discard keyword.
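As a rough illustration, a fire-and-forget call with asyncCheck could look like this. The counter ticks and the procedures background and main are hypothetical names used only for the sketch.

```nim
import asyncdispatch

var ticks = 0

# Hypothetical background task whose result nobody waits for.
proc background() {.async.} =
  await sleepAsync(5)
  inc ticks

proc main() {.async.} =
  # Not `discard background()`: asyncCheck attaches a callback so that an
  # error stored in the future is raised instead of being silently lost.
  asyncCheck background()
  # Keep the dispatcher busy long enough for the background task to finish.
  await sleepAsync(50)

waitFor main()
echo "background task ran ", ticks, " time(s)"
```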
For examples take a look at the documentation for the modules implementing asynchronous IO. A good place to start is the asyncnet module.
The effect system (raises: []) does not work with async procedures.
CompletionData = object
  fd*: AsyncFD
  cb*: proc (fd: AsyncFD; bytesTransferred: DWORD; errcode: OSErrorCode) {.closure, gcsafe.}
  cell*: ForeignCell
PDispatcher = ref object of PDispatcherBase
  ioPort: Handle
  handles: HashSet[AsyncFD]
PCustomOverlapped = ref CustomOverlapped
AsyncFD = distinct int
AsyncEvent = ptr AsyncEventImpl
proc `==`(x: AsyncFD; y: AsyncFD): bool {.borrow.}
proc newDispatcher(): PDispatcher {.raises: [], tags: [].}
proc setGlobalDispatcher(disp: PDispatcher) {.raises: [Exception], tags: [RootEffect].}
Raises AssertionError with msg if cond is false. Note that AssertionError is hidden from the effect system, so it doesn't produce {.raises: [AssertionError].}. This exception is only supposed to be caught by unit testing frameworks.
The compiler may not generate any code at all for assert if it is advised to do so through the -d:release or --assertions:off command line switches.
proc getGlobalDispatcher(): PDispatcher {.raises: [Exception], tags: [RootEffect].}
proc getIoHandler(disp: PDispatcher): Handle {.raises: [], tags: [].}
proc register(fd: AsyncFD) {.raises: [Exception, OSError], tags: [RootEffect].}
Registers fd with the dispatcher.
proc hasPendingOperations(): bool {.raises: [Exception], tags: [RootEffect].}
proc recv(socket: AsyncFD; size: int; flags = {SafeDisconn}): Future[string] {.raises: [Exception, ValueError, FutureError], tags: [RootEffect].}
Reads up to size bytes from socket. The returned future will complete once all the data requested is read, a part of the data has been read, or the socket has disconnected, in which case the future will complete with a value of "".
Warning: The Peek socket flag is not supported on Windows.
proc recvInto(socket: AsyncFD; buf: pointer; size: int; flags = {SafeDisconn}): Future[int] {.raises: [Exception, ValueError, FutureError], tags: [RootEffect].}
Reads up to size bytes from socket into buf, which must be at least of that size. The returned future will complete once all the data requested is read, a part of the data has been read, or the socket has disconnected, in which case the future will complete with a value of 0.
Warning: The Peek socket flag is not supported on Windows.
proc send(socket: AsyncFD; buf: pointer; size: int; flags = {SafeDisconn}): Future[void] {.raises: [Exception, ValueError, FutureError], tags: [RootEffect].}
Sends size bytes from buf to socket. The returned future will complete once all data has been sent.
WARNING: Use it with caution. If buf refers to a GC'ed object, you must use GC_ref/GC_unref calls to prevent the buffer from being freed early.
proc sendTo(socket: AsyncFD; data: pointer; size: int; saddr: ptr SockAddr; saddrLen: SockLen; flags = {SafeDisconn}): Future[void] {.raises: [Exception, ValueError, FutureError], tags: [RootEffect].}
Sends data to specified destination saddr, using socket socket. The returned future will complete once all data has been sent.
proc recvFromInto(socket: AsyncFD; data: pointer; size: int; saddr: ptr SockAddr; saddrLen: ptr SockLen; flags = {SafeDisconn}): Future[int] {.raises: [Exception, ValueError, FutureError], tags: [RootEffect].}
Receives a datagram from socket into the buffer data, which must be at least of size size. The address of the datagram's sender will be stored into saddr and saddrLen. The returned future will complete once one datagram has been received, and will return the size of the packet received.
proc acceptAddr(socket: AsyncFD; flags = {SafeDisconn}): Future[tuple[address: string, client: AsyncFD]] {.raises: [Exception, ValueError, OSError, FutureError, FutureError, Exception], tags: [RootEffect].}
Accepts a new connection. Returns a future containing the client socket corresponding to that connection and the remote address of the client. The future will complete when the connection is successfully accepted.
The resulting client socket is automatically registered to the dispatcher.
The accept call may result in an error if the connecting socket disconnects during the duration of the accept. If the SafeDisconn flag is specified then this error will not be raised and instead accept will be called again.
proc closeSocket(socket: AsyncFD) {.raises: [Exception], tags: [RootEffect].}
proc unregister(fd: AsyncFD) {.raises: [Exception], tags: [RootEffect].}
Unregisters fd.
proc contains(disp: PDispatcher; fd: AsyncFD): bool {.raises: [], tags: [].}
proc addRead(fd: AsyncFD; cb: Callback) {.raises: [Exception, OSError], tags: [RootEffect].}
Starts watching the file descriptor for read availability and then calls the callback cb.
This is not a pure mechanism for Windows Completion Ports (IOCP), so avoid it if you can. Use addRead only if you really need it (the main use case is adapting Unix-like libraries to be asynchronous on Windows).
If you use this function, you don't need to use asyncdispatch.recv() or asyncdispatch.accept(), because they use IOCP; use nativesockets.recv() and nativesockets.accept() instead.
Be sure your callback cb returns true if you want to stop watching for read notifications, and false if you want to continue receiving notifications.
proc addWrite(fd: AsyncFD; cb: Callback) {.raises: [Exception, OSError], tags: [RootEffect].}
Starts watching the file descriptor for write availability and then calls the callback cb.
This is not a pure mechanism for Windows Completion Ports (IOCP), so avoid it if you can. Use addWrite only if you really need it (the main use case is adapting Unix-like libraries to be asynchronous on Windows).
If you use this function, you don't need to use asyncdispatch.send() or asyncdispatch.connect(), because they use IOCP; use nativesockets.send() and nativesockets.connect() instead.
Be sure your callback cb returns true if you want to stop watching for write notifications, and false if you want to continue receiving notifications.
proc addTimer(timeout: int; oneshot: bool; cb: Callback) {.raises: [Exception, OSError], tags: [RootEffect].}
Registers callback cb to be called when the timer expires.
Parameters:
timeout - timeout value in milliseconds.
oneshot
proc addProcess(pid: int; cb: Callback) {.raises: [Exception, OSError], tags: [RootEffect].}
Registers callback cb to be called when the process with process ID pid has exited.
proc newAsyncEvent(): AsyncEvent {.raises: [OSError, Exception], tags: [].}
Creates a new thread-safe AsyncEvent object.
A new AsyncEvent object is not automatically registered with the dispatcher, unlike AsyncSocket.
proc trigger(ev: AsyncEvent) {.raises: [OSError], tags: [].}
Sets event ev to signaled state.
proc unregister(ev: AsyncEvent) {.raises: [Exception, OSError], tags: [RootEffect].}
Unregisters event ev.
proc close(ev: AsyncEvent) {.raises: [Exception, OSError], tags: [].}
Closes event ev.
proc addEvent(ev: AsyncEvent; cb: Callback) {.raises: [Exception, OSError], tags: [RootEffect].}
Registers callback cb to be called when ev is signaled.
proc drain(timeout = 500) {.raises: [Exception, ValueError, FutureError, UnpackError, OSError, IndexError], tags: [RootEffect, TimeEffect].}
Waits for completion events and processes them. Raises ValueError if there are no pending operations. In contrast to poll this processes as many events as are available.
proc poll(timeout = 500) {.raises: [Exception, ValueError, FutureError, UnpackError, OSError, IndexError], tags: [RootEffect, TimeEffect].}
Waits for completion events and processes them. Raises ValueError if there are no pending operations. This runs the underlying OS epoll or kqueue primitive only once.
proc sleepAsync(ms: int | float): Future[void]
Suspends the execution of the current async procedure for the next ms milliseconds.
proc withTimeout[T](fut: Future[T]; timeout: int): Future[bool]
Returns a future which will complete once fut completes or after timeout milliseconds have elapsed.
If fut completes first the returned future will hold true; otherwise, if timeout milliseconds have elapsed first, the returned future will hold false.
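The two possible results of withTimeout can be sketched by racing timers against each other. Using sleepAsync as the "operation" is an assumption made for the sake of a self-contained example, and demo is a hypothetical procedure name.

```nim
import asyncdispatch

proc demo(): Future[(bool, bool)] {.async.} =
  # The 10 ms operation finishes well before the 1000 ms timeout: holds true.
  let fast = await withTimeout(sleepAsync(10), 1000)
  # The 1000 ms operation is still pending when the 10 ms timeout elapses: holds false.
  let slow = await withTimeout(sleepAsync(1000), 10)
  return (fast, slow)

let (fast, slow) = waitFor demo()
echo "fast completed in time: ", fast
echo "slow completed in time: ", slow
```

Note that in this sketch the timed-out operation is left running; only the returned future reports that the timeout elapsed first.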
proc accept(socket: AsyncFD; flags = {SafeDisconn}): Future[AsyncFD] {.raises: [Exception, ValueError, OSError, FutureError], tags: [RootEffect].}
proc send(socket: AsyncFD; data: string; flags = {SafeDisconn}): Future[void] {.raises: [Exception, ValueError, FutureError], tags: [RootEffect].}
Sends data to socket. The returned future will complete once all data has been sent.
proc readAll(future: FutureStream[string]): Future[string] {.raises: [FutureError], tags: [RootEffect].}
proc recvLine(socket: AsyncFD): Future[string] {.deprecated, raises: [FutureError], tags: [RootEffect].}
Reads a line of data from socket. The returned future will complete once a full line is read or an error occurs.
If a full line is read, \r\L is not added to line; however, if solely \r\L is read then line will be set to it.
If the socket is disconnected, line will be set to "".
If the socket is disconnected in the middle of a line (before \r\L is read) then line will be set to "". The partial line will be lost.
Warning: This assumes that lines are delimited by \r\L.
Note: This procedure is mostly used for testing. You likely want to use asyncnet.recvLine instead.
Deprecated since version 0.15.0: Use asyncnet.recvLine() instead.
proc runForever() {.raises: [Exception, ValueError, FutureError, UnpackError, OSError, IndexError], tags: [RootEffect, TimeEffect].}
proc waitFor[T](fut: Future[T]): T
proc setEvent(ev: AsyncEvent) {.deprecated, raises: [OSError], tags: [].}
Sets event ev to signaled state.
Deprecated since v0.18.0: Use trigger instead.
© 2006–2018 Andreas Rumpf
Licensed under the MIT License.
https://nim-lang.org/docs/asyncdispatch.html