Loading tests/negtelnetserver.py +12 −21 Original line number Diff line number Diff line Loading @@ -9,7 +9,6 @@ import argparse import os import sys import logging import struct try: # Python 2 import SocketServer as socketserver except ImportError: # Python 3 Loading @@ -22,8 +21,8 @@ IDENT = "NTEL" # The strings that indicate the test framework is checking our aliveness VERIFIED_REQ = b"verifiedserver" VERIFIED_RSP = b"WE ROOLZ: {pid}" VERIFIED_REQ = "verifiedserver" VERIFIED_RSP = "WE ROOLZ: {pid}" def telnetserver(options): Loading @@ -34,7 +33,7 @@ def telnetserver(options): if options.pidfile: pid = os.getpid() with open(options.pidfile, "w") as f: f.write(b"{0}".format(pid)) f.write(str(pid)) local_bind = (HOST, options.port) log.info("Listening on %s", local_bind) Loading Loading @@ -68,9 +67,10 @@ class NegotiatingTelnetHandler(socketserver.BaseRequestHandler): data = neg.recv(1024) log.debug("Incoming data: %r", data) if VERIFIED_REQ in data: if VERIFIED_REQ.encode('ascii') in data: log.debug("Received verification request from test framework") response_data = VERIFIED_RSP.format(pid=os.getpid()) response = VERIFIED_RSP.format(pid=os.getpid()) response_data = response.encode('ascii') else: log.debug("Received normal request - echoing back") response_data = data.strip() Loading Loading @@ -113,11 +113,9 @@ class Negotiator(object): # TCP failed to give us any data. Break out. break for byte in data: byte_int = self.byte_to_int(byte) for byte_int in bytearray(data): if self.state == self.NO_NEG: self.no_neg(byte, byte_int, buffer) self.no_neg(byte_int, buffer) elif self.state == self.START_NEG: self.start_neg(byte_int) elif self.state in [self.WILL, self.WONT, self.DO, self.DONT]: Loading @@ -131,10 +129,7 @@ class Negotiator(object): return buffer def byte_to_int(self, byte): return struct.unpack(b'B', byte)[0] def no_neg(self, byte, byte_int, buffer): def no_neg(self, byte_int, buffer): # Not negotiating anything thus far. Check to see if we # should. if byte_int == NegTokens.IAC: Loading @@ -143,7 +138,7 @@ class Negotiator(object): self.state = self.START_NEG else: # Just append the incoming byte to the buffer buffer.append(byte) buffer.append(byte_int) def start_neg(self, byte_int): # In a negotiation. Loading Loading @@ -192,12 +187,8 @@ class Negotiator(object): self.state) self.state = self.NO_NEG def send_message(self, message): packed_message = self.pack(message) self.tcp.sendall(packed_message) def pack(self, arr): return struct.pack(b'{0}B'.format(len(arr)), *arr) def send_message(self, message_ints): self.tcp.sendall(bytearray(message_ints)) def send_iac(self, arr): message = [NegTokens.IAC] Loading Loading
tests/negtelnetserver.py +12 −21 Original line number Diff line number Diff line Loading @@ -9,7 +9,6 @@ import argparse import os import sys import logging import struct try: # Python 2 import SocketServer as socketserver except ImportError: # Python 3 Loading @@ -22,8 +21,8 @@ IDENT = "NTEL" # The strings that indicate the test framework is checking our aliveness VERIFIED_REQ = b"verifiedserver" VERIFIED_RSP = b"WE ROOLZ: {pid}" VERIFIED_REQ = "verifiedserver" VERIFIED_RSP = "WE ROOLZ: {pid}" def telnetserver(options): Loading @@ -34,7 +33,7 @@ def telnetserver(options): if options.pidfile: pid = os.getpid() with open(options.pidfile, "w") as f: f.write(b"{0}".format(pid)) f.write(str(pid)) local_bind = (HOST, options.port) log.info("Listening on %s", local_bind) Loading Loading @@ -68,9 +67,10 @@ class NegotiatingTelnetHandler(socketserver.BaseRequestHandler): data = neg.recv(1024) log.debug("Incoming data: %r", data) if VERIFIED_REQ in data: if VERIFIED_REQ.encode('ascii') in data: log.debug("Received verification request from test framework") response_data = VERIFIED_RSP.format(pid=os.getpid()) response = VERIFIED_RSP.format(pid=os.getpid()) response_data = response.encode('ascii') else: log.debug("Received normal request - echoing back") response_data = data.strip() Loading Loading @@ -113,11 +113,9 @@ class Negotiator(object): # TCP failed to give us any data. Break out. break for byte in data: byte_int = self.byte_to_int(byte) for byte_int in bytearray(data): if self.state == self.NO_NEG: self.no_neg(byte, byte_int, buffer) self.no_neg(byte_int, buffer) elif self.state == self.START_NEG: self.start_neg(byte_int) elif self.state in [self.WILL, self.WONT, self.DO, self.DONT]: Loading @@ -131,10 +129,7 @@ class Negotiator(object): return buffer def byte_to_int(self, byte): return struct.unpack(b'B', byte)[0] def no_neg(self, byte, byte_int, buffer): def no_neg(self, byte_int, buffer): # Not negotiating anything thus far. Check to see if we # should. if byte_int == NegTokens.IAC: Loading @@ -143,7 +138,7 @@ class Negotiator(object): self.state = self.START_NEG else: # Just append the incoming byte to the buffer buffer.append(byte) buffer.append(byte_int) def start_neg(self, byte_int): # In a negotiation. Loading Loading @@ -192,12 +187,8 @@ class Negotiator(object): self.state) self.state = self.NO_NEG def send_message(self, message): packed_message = self.pack(message) self.tcp.sendall(packed_message) def pack(self, arr): return struct.pack(b'{0}B'.format(len(arr)), *arr) def send_message(self, message_ints): self.tcp.sendall(bytearray(message_ints)) def send_iac(self, arr): message = [NegTokens.IAC] Loading