# -*- coding: utf-8 -*- from __future__ import division # Import floating-point division (1/4=0.25) instead of Euclidian division (1/4=0) import logging import re import os class regex_list(object): """ For testing purposes: https://regex101.com/ """ def __init__(self, p_logger): self.__logger = p_logger self.__state = 'none' self.__flags = re.DOTALL | re.VERBOSE self.__regexps = dict() self.__regexps['exec_test'] = re.compile(r'^(?P\d+):(?P\d+):(?P\d+).(?P\d+)(\s((?Pmtc|\d+))){0,1}\sTESTCASE\s.*\sTest\scase\s(?P\w+)\sstarted\.$', self.__flags) self.__regexps['map_port'] = re.compile(r'^(?P\d+):(?P\d+):(?P\d+).(?P\d+)(\s((?Pmtc|\d+))){0,1}\sPORTEVENT\s.*\sPort\s(?P\w+)\swas\s(?P\w+)\sto\s(?P\w+):(?P\w+)\.$', self.__flags) self.__regexps['unmap_port'] = re.compile(r'^(?P\d+):(?P\d+):(?P\d+).(?P\d+)(\s((?Pmtc|\d+))){0,1}\sPORTEVENT\s.*\sPort\s(?P\w+)\swas\s(?P\w+)\sfrom\s(?P\w+):(?P\w+)\.$', self.__flags) self.__regexps['messages'] = self.__regexps['unmap_port'] self.__regexps['message_out'] = re.compile(r'^(?P\d+):(?P\d+):(?P\d+).(?P\d+)(\s((?Pmtc|\d+))){0,1}\sPORTEVENT\s.*\sSent\son\s(?P\w+)\sto\ssystem(\(.*\)){0,1}\s@(?P\w+).(?P\w+)\s:\s\{(?P.+)\}$', self.__flags) self.__regexps['message_in'] = re.compile(r'^(?P\d+):(?P\d+):(?P\d+).(?P\d+)(\s((?Pmtc|\d+))){0,1}\sPORTEVENT\s.*\sMessage\senqueued\son\s(?P\w+)\sfrom\ssystem\s@(?P\w+).(?P\w+)\s:\s\{(?P.+)\}\sid\s\w+$', self.__flags) self.__regexps['timer'] = re.compile(r'^(?P\d+):(?P\d+):(?P\d+).(?P\d+)(\s((?Pmtc|\d+))){0,1}\sTIMEROP\s.*\s(?P\w+)\stimer\s(?P\w+):\s(?P\w+).*$', self.__flags) self.__regexps['final_verdict'] = re.compile(r'^(?P\d+):(?P\d+):(?P\d+).(?P\d+)(\s((?Pmtc|\d+))){0,1}\sTESTCASE\s.*\sTest\scase\s(.+)\sfinished\.\sVerdict:\s(?P\w+)$', self.__flags) self.__regexps['verdict'] = re.compile(r'^(?P\d+):(?P\d+):(?P\d+).(?P\d+)(\s((?Pmtc|\d+))){0,1}\sVERDICTOP\s.*\sLocal\sverdict\sof\s(MTC|PTC)(\s(?P.+)){0,1}:\s(?P\w+).*$', self.__flags) # End of __init__ class def reset_state(self): self.__state = 'exec_test' self.__logger.info('Reset state: ' + self.__state) # End of reset_state def parse_line(self, p_line): self.__logger.debug('>>> regex_list::parse_line: ' + self.__get_state()) self.__logger.debug('regex_list::parse_line: ' + p_line) if self.__get_state() == 'exec_test': m = self.__regexps[self.__state].match(p_line) if m: self.__set_next_state() return m.groupdict() elif self.__get_state() == 'map_port': m = self.__regexps[self.__state].match(p_line) if m: return m.groupdict() else: # In map_port, check for TTCN-3 message sent ('message_out') and received ('message_in') m = self.__regexps['message_out'].match(p_line) if m: self.__set_next_state() return m.groupdict() else: m = self.__regexps['message_in'].match(p_line) if m: self.__set_next_state() return m.groupdict() else: m = self.__regexps['timer'].match(p_line) if m: return m.groupdict() elif self.__get_state() == 'messages': m = self.__regexps[self.__state].match(p_line) if m: self.__set_next_state() return m.groupdict() else: # Check for TTCN-3 message sent ('message_out') and received ('message_in') m = self.__regexps['message_out'].match(p_line) if m: return m.groupdict() else: m = self.__regexps['message_in'].match(p_line) if m: return m.groupdict() else: m = self.__regexps['timer'].match(p_line) if m: return m.groupdict() elif self.__get_state() == 'unmap_port': m = self.__regexps[self.__state].match(p_line) if m: return m.groupdict() else: # In unmap_port, check for final_verdict m = self.__regexps['verdict'].match(p_line) if m: self.__set_next_state() return m.groupdict() elif self.__get_state() == 'verdict' or self.__get_state() == 'final_verdict': m = self.__regexps['verdict'].match(p_line) if m: return m.groupdict() else: m = self.__regexps['final_verdict'].match(p_line) if m: self.__set_next_state() return m.groupdict() return None # End of method parse_line def __set_next_state(self): if self.__state == 'exec_test': self.__logger.info('regex_list::parse_line: Set state to map_port') self.__state = 'map_port' elif self.__state == 'map_port': self.__logger.info('regex_list::parse_line: Set state to messages') self.__state = 'messages' elif self.__state == 'messages': self.__logger.info('regex_list::parse_line: Set state to unmap_port') self.__state = 'unmap_port' elif self.__state == 'unmap_port': self.__logger.info('regex_list::parse_line: Set state to verdict') self.__state = 'verdict' elif self.__state == 'verdict': self.__logger.info('regex_list::parse_line: Set state to final_verdict') self.__state = 'final_verdict' elif self.__state == 'final_verdict': self.__logger.info('regex_list::parse_line: Terminated') self.reset_state() # End of method __set_next_state def __get_state(self): return self.__state # End of method __get_state # End of class regex_list # End of file