As a simple example, consider the network equivalent of /dev/zero, below. (Or more realistically, just a web server sending a large file.)
If a client disconnects early, you get a barrage of log messages:
WARNING:asyncio:socket.send() raised exception.
But I'm not finding any way to catch said exception. The hypothetical server continues reading gigabytes from disk and sending them to a dead socket, with no effort on the client's part, and you've got yourself a DoS attack.
The only thing I've found in the docs is to yield from a read, with an empty result (b'') indicating closure. But that's no good here, because a normal client isn't going to send anything, so the read would block the write loop.
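The read-for-EOF check mentioned above can be sketched in modern async/await syntax like this. `reader.read()` returns `b''` once the peer closes its end. The helper names (`wait_for_hangup`, `demo_eof`) and the use of port 0 are assumptions made only to keep the sketch self-contained. As the question points out, the read suspends until data or EOF arrives, so it cannot simply be dropped into the middle of a write loop in the same coroutine.

```python
import asyncio


async def wait_for_hangup(reader: asyncio.StreamReader) -> None:
    """Return once the peer has closed its end of the connection."""
    while True:
        data = await reader.read(4096)  # suspends until data or EOF arrives
        if not data:  # b"" signals EOF: the client closed the connection
            return


async def demo_eof() -> str:
    hung_up = asyncio.Event()

    async def handler(reader: asyncio.StreamReader,
                      writer: asyncio.StreamWriter) -> None:
        await wait_for_hangup(reader)
        writer.close()
        hung_up.set()

    # Port 0 lets the OS pick a free port for this self-contained demo.
    server = await asyncio.start_server(handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    # A client that connects and hangs up without sending anything.
    _, client = await asyncio.open_connection("127.0.0.1", port)
    client.close()
    await client.wait_closed()

    await asyncio.wait_for(hung_up.wait(), timeout=5)
    server.close()
    await server.wait_closed()
    return "EOF detected"


if __name__ == "__main__":
    print(asyncio.run(demo_eof()))
```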
What's the right way to detect failed writes, or be notified that the TCP connection has been closed, with the streams API or otherwise?
Code:
from asyncio import *
import logging

@coroutine
def client_handler(reader, writer):
    while True:
        writer.write(bytes(1))
        yield from writer.drain()

logging.basicConfig(level=logging.INFO)
loop = get_event_loop()
coro = start_server(client_handler, '', 12345)
server = loop.run_until_complete(coro)
loop.run_forever()
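To reproduce the "client disconnects early" case against the server above, a client only needs to read part of the stream and hang up. This is a minimal sketch; `hang_up_early` and its `nbytes` parameter are hypothetical names, and the default host/port assume the server is listening on port 12345 locally. Note that the server code uses `@coroutine`, which was removed from asyncio in Python 3.11, so running it unchanged needs Python 3.10 or earlier.

```python
import asyncio


async def hang_up_early(host: str = "127.0.0.1", port: int = 12345,
                        nbytes: int = 1024) -> int:
    """Connect, read a little of the stream, then disconnect early."""
    reader, writer = await asyncio.open_connection(host, port)
    data = await reader.readexactly(nbytes)  # take only part of the stream
    writer.close()  # hang up while the server is still writing
    await writer.wait_closed()
    return len(data)


if __name__ == "__main__":
    print(asyncio.run(hang_up_early()))
```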
I did some digging into the asyncio source to expand on dano's answer about why the exceptions aren't raised unless control is explicitly passed to the event loop. Here's what I found.
Calling yield from writer.drain() gives control to the StreamWriter.drain coroutine. This coroutine checks for, and raises, any exception that the StreamReaderProtocol has set on the StreamReader. But since we passed control to drain, the protocol hasn't had a chance to set the exception yet. drain then gives control to the FlowControlMixin._drain_helper coroutine. This coroutine returns immediately because some other flags haven't been set yet either, and control ends up back in the coroutine that called yield from writer.drain().
And so we have gone full circle without ever giving control to the event loop, which would let it handle other coroutines and bubble the exception up to writer.drain().
Yielding (a bare yield) before calling drain() gives the transport/protocol a chance to set the appropriate flags and exceptions.
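A minimal sketch of that fix in modern async/await syntax, where `await asyncio.sleep(0)` stands in for the bare yield. It assumes that, once the loss has been recorded, drain() raises a ConnectionError subclass (such as ConnectionResetError or BrokenPipeError), which is why only ConnectionError is caught. The chunk size, `zero_handler`, and the `demo_zero_server` harness with its misbehaving client are hypothetical additions for illustration.

```python
import asyncio

CHUNK = bytes(4096)  # arbitrary chunk size; the original loop writes bytes(1)


async def zero_handler(reader: asyncio.StreamReader,
                       writer: asyncio.StreamWriter) -> None:
    """Stream zero bytes until the client goes away, then stop."""
    try:
        while True:
            writer.write(CHUNK)
            # Stand-in for the bare `yield`: let the event loop run so the
            # transport/protocol can record a lost connection.
            await asyncio.sleep(0)
            # Once the loss is recorded, drain() raises it (assumed here to
            # be a ConnectionError subclass).
            await writer.drain()
    except ConnectionError:
        pass  # client hung up: stop producing data
    finally:
        writer.close()


async def demo_zero_server() -> str:
    stopped = asyncio.Event()

    async def handler(reader, writer):
        await zero_handler(reader, writer)
        stopped.set()

    server = await asyncio.start_server(handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    # Hypothetical misbehaving client: read a little, then disconnect early.
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    await reader.readexactly(1024)
    writer.close()
    await writer.wait_closed()

    # If the handler never notices the hang-up, this times out.
    await asyncio.wait_for(stopped.wait(), timeout=5)
    server.close()
    await server.wait_closed()
    return "handler stopped"


if __name__ == "__main__":
    print(asyncio.run(demo_zero_server()))
```

The sleep(0) costs one extra pass through the event loop per write. That is the price of giving the protocol a chance to report the error before the next drain().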
Here's a mock-up of what's going on, with all the nested calls collapsed:
import asyncio as aio

def set_exception(ctx, exc):
    ctx["exc"] = exc

@aio.coroutine
def drain(ctx):
    if ctx["exc"] is not None:
        raise ctx["exc"]
    return

@aio.coroutine
def client_handler(ctx):
    i = 0
    while True:
        i += 1
        print("write", i)
        # yield  # Uncommenting this lets the callback registered with loop.call_later run.
        yield from drain(ctx)

CTX = {"exc": None}
loop = aio.get_event_loop()
# Set the exception in 5 seconds
loop.call_later(5, set_exception, CTX, Exception("connection lost"))
loop.run_until_complete(client_handler(CTX))
loop.close()
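Since @asyncio.coroutine was removed in Python 3.11, the mock above no longer runs on current Python. Here is a rough async/await port under a few stated assumptions. It uses loop.call_soon instead of call_later(5, ...) so the sketch is fast and deterministic. A hypothetical `max_writes` bound makes the non-yielding variant terminate instead of spinning forever. ConnectionError replaces the bare Exception. The `run` and `yield_to_loop` names are also hypothetical.

```python
import asyncio


def set_exception(ctx: dict, exc: Exception) -> None:
    # Plays the role of the protocol recording a lost connection.
    ctx["exc"] = exc


async def drain(ctx: dict) -> None:
    # Like the collapsed drain() in the mock: raise a recorded error,
    # otherwise return immediately without suspending.
    if ctx["exc"] is not None:
        raise ctx["exc"]


async def client_handler(ctx: dict, yield_to_loop: bool, max_writes: int) -> int:
    for _ in range(max_writes):
        if yield_to_loop:
            await asyncio.sleep(0)  # equivalent of the commented-out bare `yield`
        await drain(ctx)  # awaiting a non-suspending coroutine never yields
    return max_writes


async def run(yield_to_loop: bool, max_writes: int = 1000) -> str:
    ctx = {"exc": None}
    loop = asyncio.get_running_loop()
    loop.call_soon(set_exception, ctx, ConnectionError("connection lost"))
    try:
        writes = await client_handler(ctx, yield_to_loop, max_writes)
    except ConnectionError as exc:
        return f"stopped: {exc}"
    return f"never noticed after {writes} writes"


if __name__ == "__main__":
    print(asyncio.run(run(yield_to_loop=False)))
    print(asyncio.run(run(yield_to_loop=True)))
```

Without yielding, all 1000 iterations complete before the scheduled callback can run, so the error is never seen. With the sleep(0), the callback runs during the first suspension and the next drain() raises.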
This should probably be fixed upstream in the Streams API by the asyncio developers.