Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 273 additions & 2 deletions irc/client_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
that dispatches events to instance methods is included.

Current limitations:
* DCC chat has not yet been implemented
* DCC file transfers are not suppored
* RFCs 2810, 2811, 2812, and 2813 have not been considered.

Notes:
Expand All @@ -39,15 +37,20 @@

import asyncio
import logging
import socket
import threading
import warnings

from jaraco.stream import buffer
from . import connection
from .client import (
DCCConnectionError,
Event,
Reactor,
ServerConnection,
ServerNotConnectedError,
SimpleIRCClient,
DCCConnection,
_ping_ponger,
)

Expand Down Expand Up @@ -212,6 +215,239 @@ def disconnect(self, message=""):
self._handle_event(Event("disconnect", self.server, "", [message]))


class DCCProtocol(IrcProtocol):
"""
A protocol for handling DCC connections.

Currently, DCCProtocol uses the same methods as `IrcProtocol`
for handling incoming data. This should be fine for most use
cases, but in the unlikely event that a DCC connection needs to
handle incoming data in a different way than an IRC connection,
this class will need to be overridden.
"""

def connection_made(self, transport):
if self.connection.passive and not self.connection.connected:
self.connection.transport = transport
self.connection.connected = True
self.connection.peeraddress, self.connection.peerport = transport.get_extra_info('peername')
log.debug("DCC connection from %s:%d", self.connection.peeraddress, self.connection.peerport)
self.connection.reactor._handle_event(
self.connection, Event("dcc_connect", self.connection.peeraddress, None, None)
)
if hasattr(self.connection, 'server') and self.connection.server:
self.connection.server.close()
return

# For active connections, ensure transport is set if not already
if not getattr(self.connection, 'transport', None):
self.connection.transport = transport


class AioDCCConnection(DCCConnection):
"""
An asyncio-based DCCConnection.

This class overrides select-based methods with asyncio-based ones.
"""

reactor: "AioReactor"
buffer_class = buffer.LineBuffer

protocol_class = DCCProtocol
protocol: DCCProtocol
socket: None
connected: bool
passive: bool
peeraddress: str
peerport: int

async def connect(
self, address: str, port: int, connect_factory: connection.AioFactory = connection.AioFactory()
) -> "AioDCCConnection":
"""Connect/reconnect to a DCC peer.

Arguments:
address -- Host/IP address of the peer.
port -- The port number to connect to.
connect_factory -- A callable that takes the event loop and the
server address, and returns a connection (with a socket interface)

Returns the DCCConnection object.
"""
self.peeraddress = address
self.peerport = port
self.handlers = {}
self.buffer = self.buffer_class()

self.connect_factory = connect_factory
protocol_instance = self.protocol_class(self, self.reactor.loop)
try:
connection = self.connect_factory(protocol_instance, (self.peeraddress, self.peerport))
transport, protocol = await connection
except OSError as ex:
raise DCCConnectionError(f"Couldn't connect to socket: {ex}") from ex

self.transport = transport
self.protocol = protocol

self.connected = True
self.reactor._on_connect(self.protocol, self.transport)
return self

async def listen(self, addr=None, port=None, ipv6=False) -> "AioDCCConnection":
"""Wait for a connection/reconnection from a DCC peer.

Returns the DCCConnection object.

The local IP address and port are available as
self.localaddress and self.localport. After connection from a
peer, the peer address and port are available as
self.peeraddress and self.peerport.

Arguments:
addr -- Host string or (host, port) tuple to bind to.
If a tuple, the port is only used if `port` is None.
port -- Port to listen on. Can be an int, a (min, max) tuple
to try a range, or a list of ports to try in order.
Overrides the port in `addr` if both are provided.
ipv6 -- Use IPv6 if True.
"""
self.passive = True
self.handlers = {}
self.buffer = self.buffer_class()

# Resolve host and default port from addr
if addr is None:
host = socket.gethostbyname(socket.gethostname())
addr_port = 0
elif isinstance(addr, str):
host = addr
addr_port = 0
else:
host, addr_port = addr

# port parameter overrides addr port if specified
if port is None:
port = addr_port

def factory():
return self.protocol_class(self, self.reactor.loop)

family = socket.AF_INET6 if ipv6 else socket.AF_INET

# Build iterable of ports to try
if isinstance(port, int):
ports = [port]
elif isinstance(port, tuple):
ports = range(port[0], port[1] + 1)
else:
ports = port # assume list/iterable

last_error = None
for try_port in ports:
try:
self.server = await self.reactor.loop.create_server(
factory, host, try_port, family=family
)
break
except OSError as ex:
last_error = ex
continue
else:
raise DCCConnectionError(f"Couldn't bind socket: {last_error}") from last_error

# Get the actual bound address and port
socket_obj = self.server.sockets[0]
self.localaddress, self.localport = socket_obj.getsockname()

return self

def disconnect(self, message: str = "") -> None:
"""Hang up the connection and close the object.

Arguments:

message -- Quit message.
"""
try:
del self.connected
except AttributeError:
return

try:
if hasattr(self, 'server') and self.server:
self.server.close()
except AttributeError:
pass

try:
self.transport.close()
except AttributeError:
pass

self.reactor._handle_event(
self, Event("dcc_disconnect", self.peeraddress, "", [message])
)
self.reactor._remove_connection(self)

def process_data(self, new_data: bytes) -> None:
"""
handles incoming data from the `DCCProtocol` connection.
"""

if self.dcctype == "chat":
self.buffer.feed(new_data)

chunks = list(self.buffer)

if len(self.buffer) > 2**14:
# Bad peer! Naughty peer!
log.info(
"Received >16k from a peer without a newline; " "disconnecting."
)
self.disconnect()
return
else:
chunks = [new_data]

command = "dccmsg"
prefix = self.peeraddress
target = None
for chunk in chunks:
log.debug("FROM PEER: %s", chunk)
arguments = [chunk]
log.debug(
"command: %s, source: %s, target: %s, arguments: %s",
command,
prefix,
target,
arguments,
)
event = Event(command, prefix, target, arguments)
self.reactor._handle_event(self, event)

def privmsg(self, text: str) -> None:
"""
Send text to DCC peer.

The text will be padded with a newline if it's a DCC CHAT session.
"""
if self.dcctype == 'chat':
text += '\n'
return self.send_bytes(self.encode(text))

def send_bytes(self, data: bytes) -> None:
"""
Send data to DCC peer.
"""
try:
self.transport.write(data)
log.debug("TO PEER: %r\n", data)
except (OSError, AttributeError):
self.disconnect("Connection reset by peer.")


class AioReactor(Reactor):
"""
Processes message from on or more asyncio-based IRC server connections.
Expand Down Expand Up @@ -248,6 +484,7 @@ async def my_repeating_message(connection):
"""

connection_class = AioConnection
dcc_connection_class = AioDCCConnection

def __do_nothing(*args, **kwargs):
pass
Expand Down Expand Up @@ -275,6 +512,21 @@ class definied above.
"""
self.loop.run_forever()

def dcc(self, dcctype="chat"):
"""Creates and returns a DCCConnection object.

Arguments:

dcctype -- "chat" for DCC CHAT connections or "raw" for
DCC SEND (or other DCC types). If "chat",
incoming data will be split in newline-separated
chunks. If "raw", incoming data is not touched.
"""
with self.mutex:
conn = self.dcc_connection_class(self, dcctype)
self.connections.append(conn)
return conn


class AioSimpleIRCClient(SimpleIRCClient):
"""A simple single-server IRC client class.
Expand All @@ -288,6 +540,25 @@ class AioSimpleIRCClient(SimpleIRCClient):
"""

reactor_class = AioReactor
reactor: AioReactor

def connect(self, *args, **kwargs):
self.reactor.loop.run_until_complete(self.connection.connect(*args, **kwargs))

def dcc_connect(self, address, port, dcctype="chat"):
"""Connect to a DCC peer.

Returns an AioDCCConnection instance.
"""
warnings.warn("Use self.dcc(type).connect()", DeprecationWarning, stacklevel=2)
dcc = self.dcc(dcctype)
return self.reactor.loop.run_until_complete(dcc.connect(address, port))

def dcc_listen(self, dcctype="chat"):
"""Listen for connections from a DCC peer.

Returns an AioDCCConnection instance.
"""
warnings.warn("Use self.dcc(type).listen()", DeprecationWarning, stacklevel=2)
dcc = self.dcc(dcctype)
return self.reactor.loop.run_until_complete(dcc.listen())
Loading