Skip to content

Commit

Permalink
Merge branch 'Gjum-multisocket'
Browse files Browse the repository at this point in the history
  • Loading branch information
gamingrobot committed Apr 3, 2016
2 parents 2cbebd1 + 3e87135 commit 2e10637
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 84 deletions.
4 changes: 3 additions & 1 deletion spockbot/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from spockbot.plugins.core import auth, event, net, taskmanager, ticker, timers
from spockbot.plugins.core import auth, event, net, select, \
taskmanager, ticker, timers
from spockbot.plugins.helpers import auxiliary, channels, chat, clientinfo, \
craft, entities, interact, inventory, movement, \
pathfinding, physics, start, world
Expand All @@ -7,6 +8,7 @@
('auth', auth.AuthPlugin),
('event', event.EventPlugin),
('net', net.NetPlugin),
('select', select.SelectPlugin),
('taskmanager', taskmanager.TaskManager),
('ticker', ticker.TickerPlugin),
('timers', timers.TimersPlugin),
Expand Down
139 changes: 56 additions & 83 deletions spockbot/plugins/core/net.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"""

import logging
import select
import socket
import time

Expand Down Expand Up @@ -36,44 +35,11 @@ def decrypt(self, data):
return self.decryptifier.update(data)


class SelectSocket(socket.socket):
"""
Provides an asynchronous socket with a poll method built on
top of select.select for cross-platform compatiability
"""
def __init__(self, timer):
super(SelectSocket, self).__init__(socket.AF_INET, socket.SOCK_STREAM)
self.sending = False
self.timer = timer

def poll(self):
flags = []
if self.sending:
self.sending = False
slist = [(self,), (self,), (self,)]
else:
slist = [(self,), (), (self,)]
timeout = self.timer.get_timeout()
if timeout >= 0:
slist.append(timeout)
try:
rlist, wlist, xlist = select.select(*slist)
except select.error as e:
logger.error("SELECTSOCKET: Socket Error: %s", str(e))
rlist, wlist, xlist = [], [], []
if rlist:
flags.append('SOCKET_RECV')
if wlist:
flags.append('SOCKET_SEND')
if xlist:
flags.append('SOCKET_ERR')
return flags


class NetCore(object):
def __init__(self, sock, event):
def __init__(self, sock, event, select):
self.sock = sock
self.event = event
self.select = select
self.host = None
self.port = None
self.connected = False
Expand All @@ -84,21 +50,24 @@ def __init__(self, sock, event):
self.sbuff = b''
self.rbuff = BoundBuffer()

def reset(self, sock):
self.__init__(sock, self.event, self.select)

def connect(self, host='localhost', port=25565):
self.host = host
self.port = port
try:
logger.debug("NETCORE: Attempting to connect to host: %s port: %s",
logger.debug('NETCORE: Attempting to connect to host: %s port: %s',
host, port)
# Set the connect to be a blocking operation
self.sock.setblocking(True)
self.sock.connect((self.host, self.port))
self.sock.connect((host, port))
self.sock.setblocking(False)
self.connected = True
self.event.emit('net_connect', (self.host, self.port))
logger.debug("NETCORE: Connected to host: %s port: %s", host, port)
self.event.emit('net_connect', (host, port))
logger.debug('NETCORE: Connected to host: %s port: %s', host, port)
except socket.error as error:
logger.error("NETCORE: Error on Connect")
logger.error('NETCORE: Error on Connect')
self.event.emit('SOCKET_ERR', error)

def set_proto_state(self, state):
Expand All @@ -115,7 +84,7 @@ def push(self, packet):
self.sbuff += (self.cipher.encrypt(data) if self.encrypted else data)
self.event.emit(packet.ident, packet)
self.event.emit(packet.str_ident, packet)
self.sock.sending = True
self.select.schedule_sending(self.sock)

def push_packet(self, ident, data):
self.push(mcpacket.Packet(ident, data))
Expand Down Expand Up @@ -152,21 +121,19 @@ def disable_crypto(self):
self.cipher = None
self.encrypted = False

def reset(self, sock):
self.__init__(sock, self.event)


@pl_announce('Net')
class NetPlugin(PluginBase):
requires = ('Event', 'Timers')
requires = ('Event', 'Select', 'Timers')
defaults = {
'bufsize': 4096,
'sock_quit': True,
}
events = {
'event_tick': 'tick',
'SOCKET_RECV': 'handle_recv',
'SOCKET_SEND': 'handle_send',
'select_recv': 'handle_recv',
'select_send': 'handle_send',
'select_err': 'handle_err',
'SOCKET_ERR': 'handle_err',
'SOCKET_HUP': 'handle_hup',
'PLAY<Disconnect': 'handle_disconnect',
Expand All @@ -182,25 +149,25 @@ def __init__(self, ploader, settings):
super(NetPlugin, self).__init__(ploader, settings)
self.bufsize = self.settings['bufsize']
self.sock_quit = self.settings['sock_quit']
self.sock = SelectSocket(self.timers)
self.net = NetCore(self.sock, self.event)
self.sock = None
self.net = NetCore(self.sock, self.event, self.select)
self.reset_sock()
self.sock_dead = False
ploader.provides('Net', self.net)

def tick(self, name, data):
if self.net.connected:
for flag in self.sock.poll():
self.event.emit(flag)
self.net.select.poll()
else:
timeout = self.timers.get_timeout()
if timeout == -1:
time.sleep(1)
else:
time.sleep(timeout)

# SOCKET_RECV - Socket is ready to recieve data
def handle_recv(self, name, data):
if self.net.connected:
def handle_recv(self, name, fileno):
"""Socket is ready to recieve data"""
if self.net.connected and fileno == self.net.sock.fileno():
try:
data = self.sock.recv(self.bufsize)
if not data:
Expand All @@ -210,67 +177,73 @@ def handle_recv(self, name, data):
except socket.error as error:
self.event.emit('SOCKET_ERR', error)

# SOCKET_SEND - Socket is ready to send data and Send buffer contains
# data to send
def handle_send(self, name, data):
if self.net.connected:
def handle_send(self, name, fileno):
"""Socket is ready to send data and send buffer has data to send"""
if self.net.connected and fileno == self.net.sock.fileno():
try:
sent = self.sock.send(self.net.sbuff)
self.net.sbuff = self.net.sbuff[sent:]
if self.net.sbuff:
self.sock.sending = True
self.net.select.schedule_sending(self.sock)
except socket.error as error:
self.event.emit('SOCKET_ERR', error)

# SOCKET_ERR - Socket Error has occured
def handle_err(self, name, data):
self.sock.close()
self.sock = SelectSocket(self.timers)
self.net.reset(self.sock)
logger.error("NETPLUGIN: Socket Error: %s", data)
self.event.emit('net_disconnect', data)
if self.sock_quit and not self.event.kill_event:
self.sock_dead = True
self.event.kill()
def handle_select_err(self, name, fileno):
if self.net.connected and fileno == self.net.sock.fileno():
self.event.emit('SOCKET_ERR', 'select error')

def handle_err(self, name, error):
"""Socket Error has occured"""
logger.error('NETPLUGIN: Socket Error: %s', error)
self.reset_sock()
self.event.emit('net_disconnect', error)
self.check_quit()

# SOCKET_HUP - Socket has hung up
def handle_hup(self, name, data):
self.sock.close()
self.sock = SelectSocket(self.timers)
"""Socket has hung up"""
logger.error('NETPLUGIN: Socket has hung up')
self.reset_sock()
self.event.emit('net_disconnect', 'Socket Hung Up')
self.check_quit()

def reset_sock(self):
if self.sock:
self.sock.close()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.net.select.register_socket(self.sock)
self.net.reset(self.sock)
logger.error("NETPLUGIN: Socket has hung up")
self.event.emit('net_disconnect', "Socket Hung Up")

def check_quit(self):
if self.sock_quit and not self.event.kill_event:
self.sock_dead = True
self.event.kill()

# Handshake - Change to whatever the next state is going to be
def handle_handshake(self, name, packet):
"""Change to whatever the next state is going to be"""
self.net.set_proto_state(packet.data['next_state'])

# Login Success - Change to Play state
def handle_login_success(self, name, packet):
"""Change to Play state"""
self.net.set_proto_state(proto.PLAY_STATE)

# Handle Set Compression packets
def handle_comp(self, name, packet):
"""Handle Set Compression packets"""
self.net.set_comp_state(packet.data['threshold'])

def handle_disconnect(self, name, packet):
logger.debug("NETPLUGIN: Disconnected: %s", packet.data['reason'])
logger.debug('NETPLUGIN: Disconnected: %s', packet.data['reason'])
self.event.emit('net_disconnect', packet.data['reason'])

def handle_login_disconnect(self, name, packet):

reason = packet.data.get('json_data', {}).get('text', '???')

logger.debug("NETPLUGIN: Disconnected: %s", reason)
self.event.emit('net_disconnect', reason)

# Kill event - Try to shutdown the socket politely
def handle_kill(self, name, data):
"""Try to shutdown the socket politely"""
if self.net.connected:
logger.debug("NETPLUGIN: Kill event received, closing socket")
logger.debug('NETPLUGIN: Kill event received, closing socket')
if not self.sock_dead:
self.sock.shutdown(socket.SHUT_WR)
self.sock.close()
self.net.select.unregister_socket(self.net.sock)
70 changes: 70 additions & 0 deletions spockbot/plugins/core/select.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Provides an asynchronous multi-socket selector with a poll method
built on top of select.select for cross-platform compatibility.
After polling select, two events are emitted for each socket and kind-of-ready,
``select_<kind>`` and ``select_<kind>_<sock.fileno()>``, where
``<kind>`` is one of ``recv, send, err``.
The event payload is always the fileno of the corresponding socket.
(The event plugin deep-copies the payload, but sockets are not serializable)
Note that the event loop is stopped during selecting. This is good in that
the loop does not consume 100% CPU, but it means you have to register
at least a slow timer if you do stuff on ``event_tick`` and
expect it to be emitted frequently.
"""

import logging
import select

from spockbot.plugins.base import PluginBase, pl_announce

logger = logging.getLogger('spockbot')


@pl_announce('Select')
class SelectPlugin(PluginBase):
requires = ('Event', 'Timers')

def __init__(self, ploader, settings):
super(SelectPlugin, self).__init__(ploader, settings)
self.sockets = set()
self.sending = set()
ploader.provides('Select', self)

def register_socket(self, sock):
"""``poll()``ing will emit events when this socket is ready."""
self.sockets.add(sock)

def unregister_socket(self, sock):
self.sockets.remove(sock)

def schedule_sending(self, sock):
"""Emit one event the next time this socket is ready to send."""
self.sending.add(sock)

def poll(self):
timeout = self.timers.get_timeout()
if timeout < 0:
timeout = 5 # do not hang

select_args = [
tuple(self.sockets),
tuple(self.sending),
tuple(self.sockets),
timeout,
]
self.sending.clear()

try:
ready_lists = select.select(*select_args)
except select.error as e:
logger.error('SELECTSOCKET: Socket Error: "%s" %s', str(e), e.args)
return

for ready_socks, kind in zip(ready_lists, ('recv', 'send', 'err')):
for sock in ready_socks:
self.event.emit('select_%s' % kind, sock.fileno())
self.event.emit('select_%s_%s' % (kind, sock.fileno()),
sock.fileno())

0 comments on commit 2e10637

Please sign in to comment.