From 211ab8ef85172d12608fdc570587c41d04792fd9 Mon Sep 17 00:00:00 2001 From: LongHairedHacker Date: Wed, 28 Dec 2016 15:18:19 +0100 Subject: [PATCH] Added simple bus testing code --- software/python/test.py | 34 +++++--- software/python/tester.py | 166 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 190 insertions(+), 10 deletions(-) create mode 100644 software/python/tester.py diff --git a/software/python/test.py b/software/python/test.py index 88b0418..d871f9b 100644 --- a/software/python/test.py +++ b/software/python/test.py @@ -4,6 +4,9 @@ from time import sleep from sss7 import SSS7 +packets = [[1, 0, 33, 2], + [1, 0, 22, 2]] + def to_byte_list(data): return [ord(c) for c in data] @@ -14,21 +17,32 @@ def to_string(data): def main(): SSS7.start("/dev/ttyUSB0") - while not SSS7.can_send() : - sleep(0.1) + for packet in packets: + while not SSS7.can_send(): + sleep(0.1) - SSS7.send(to_byte_list("Hello python")) - while not SSS7.can_send(): - sleep(0.1) + print "Paket: %s" % packet + SSS7.send(packet); - if SSS7.send_failed(): - print "Send failed" + #while(not SSS7.has_received()): + # sleep(0.01) + + #SSS7.get_received() + raw_input("Next ?") - while not SSS7.has_received(): - sleep(0.1) + #for i in [1,2,0]: + #while not SSS7.can_send(): + # sleep(0.1) - print to_string(SSS7.get_received()) + #print "Moving to %d" % i + #SSS7.send([8, 0, 255, i]); + + #while(not SSS7.has_received()): + # sleep(0.01) + + #SSS7.get_received() + raw_input("Next ?") diff --git a/software/python/tester.py b/software/python/tester.py new file mode 100644 index 0000000..d3c9f96 --- /dev/null +++ b/software/python/tester.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python2 + +import sys + +from cmd import Cmd +from time import sleep + +from sss7 import SSS7 + + +class Prompt(Cmd): + prompt = "SSS7> " + air_args = {'push' : 1, + 'pull' : 2, + 'off': 0} + + connect_routers = {'11' : ['0', '1', '2', '50'], + '22' : ['0', '1', '2', '50'], + '33' : ['0', '1', '2', '3', '4', '50']} + + def do_help(self, args): + print "Commands are:" + print "raw ... " + print "connnect " + print "air " + print "exit" + + def do_exit(self, args): + sys.exit() + + def do_raw(self, args): + args = args.strip().split() + + if len(args) > 16: + print "Payload can only contain 16 bytes" + + payload = [] + for byte in args: + + try: + byte = int(byte) + except ValueError: + print "Arguments should be integers" + return + + if byte > 255 or byte < 0: + print "Payload bytes must be in range 0..255" + return + payload += [byte] + + self.send_payload(payload) + + + def do_connect(self, args): + args = args.strip().split() + + if len(args) != 2: + print "Usage: connnect " + return + + try: + router = int(args[0]) + output = int(args[1]) + except ValueError: + print "Arguments should be integers" + return + + self.send_payload_wait([1, 0, router, output]) + + + def complete_connect(self, text, line, begin_index, end_index): + args = line.strip().split()[1:] + + matches = [] + if len(args) == 0 or (len(args) == 1 and not line.endswith(' ')): + matches = self.connect_routers.keys() + matches = filter(lambda x: x.startswith(text), matches) + + elif len(args) == 1 or (len(args) == 2 and not line.endswith(' ')): + if args[0] in self.connect_routers: + matches = self.connect_routers[args[0]] + + matches = filter(lambda x: x.startswith(text), matches) + return matches + + + def do_air(self, args): + args = args.strip().split() + + if len(args) != 1 or not args[0] in self.air_args: + print "Usage: air " + return + + command = self.air_args[args[0]] + if args[0] == "off": + self.send_payload([8, 0, 255, command]) + else: + self.send_payload_wait([8, 0, 255, command]) + + + def complete_air(self, text, line, begin_index, end_index): + matches = self.air_args.keys() + matches = filter(lambda x: x.startswith(text), matches) + return matches + + + def send_payload(self, payload): + while not SSS7.can_send(): + sleep(0.1) + + print "Sending payload %s" % payload + SSS7.send(payload) + + while not SSS7.can_send(): + sleep(0.1) + + if not SSS7.send_failed(): + print "Sucess !" + else: + print "Failed !" + + + def send_payload_wait(self, payload): + while not SSS7.can_send(): + sleep(0.1) + + print "Sending payload %s" % payload + SSS7.send(payload) + + while not SSS7.can_send(): + sleep(0.1) + + if SSS7.send_failed(): + print "Failed !" + + timeout = 20 + while timeout > 0 and not SSS7.has_received(): + sleep(1) + timeout -= 1 + sys.stdout.write('.') + sys.stdout.flush() + + print "" + + if not SSS7.has_received(): + print "No response" + return + + print "Received %s" % SSS7.get_received() + + +def main(): + if len(sys.argv) != 2: + print "Usage %s " % sys.argv[0] + sys.exit(0) + + if not SSS7.start(sys.argv[1]): + print "Could not start SSS7" + sys.exit(-1) + + prompt = Prompt() + prompt.cmdloop() + + +if __name__ == '__main__': + main()