Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: utf-8 -*-
from __future__ import division # Import floating-point division (1/4=0.25) instead of Euclidian division (1/4=0)
import logging
import re
import os
class regex_list(object):
    """
    Line-by-line state machine that extracts events from a TTCN-3 execution log.

    Each call to parse_line() tries a small, state-dependent set of regular
    expressions against one log line and returns the named groups of the first
    one that matches. Some matches also move the parser to its next state:

        exec_test -> map_port -> messages -> unmap_port -> verdict -> final_verdict

    A freshly constructed instance is in the inert state 'none', in which every
    line is rejected; call reset_state() before feeding the first line.

    For testing the expressions: https://regex101.com/
    """

    # Common start of every recognised line: "hh:mm:ss.fraction", an optional
    # component name ('mtc' or a number), then one whitespace character.
    # The separator before the fraction is a literal dot.
    _LINE_PREFIX = (
        r'^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)\.(?P<milliseconds>\d+)'
        r'(\s((?P<comp>mtc|\d+))){0,1}\s'
    )

    # (regex name, remainder of the pattern after the common prefix).
    # The patterns are compiled with re.VERBOSE, so whitespace must always be
    # written as \s - a literal blank inside a pattern would be ignored.
    _LINE_TAILS = (
        ('exec_test',
         r'TESTCASE\s.*\sTest\scase\s(?P<testcase>\w+)\sstarted\.$'),
        ('map_port',
         r'PORTEVENT\s.*\sPort\s(?P<protocol_port>\w+)\swas\s(?P<operation>\w+)'
         r'\sto\s(?P<system>\w+):(?P<system_port>\w+)\.$'),
        ('unmap_port',
         r'PORTEVENT\s.*\sPort\s(?P<protocol_port>\w+)\swas\s(?P<operation>\w+)'
         r'\sfrom\s(?P<system>\w+):(?P<system_port>\w+)\.$'),
        ('message_out',
         r'PORTEVENT\s.*\sSent\son\s(?P<port>\w+)\sto\ssystem(\(.*\)){0,1}'
         r'\s@(?P<module>\w+)\.(?P<message>\w+)\s:\s\{(?P<request>.+)\}$'),
        ('message_in',
         r'PORTEVENT\s.*\sMessage\senqueued\son\s(?P<port>\w+)\sfrom\ssystem'
         r'\s@(?P<module>\w+)\.(?P<message>\w+)\s:\s\{(?P<response>.+)\}\sid\s\w+$'),
        ('timer',
         r'TIMEROP\s.*\s(?P<start_stop>\w+)\stimer\s(?P<timer_name>\w+):'
         r'\s(?P<duration>\w+).*$'),
        ('final_verdict',
         r'TESTCASE\s.*\sTest\scase\s(.+)\sfinished\.\sVerdict:'
         r'\s(?P<final_verdict>\w+)$'),
        ('verdict',
         r'VERDICTOP\s.*\sLocal\sverdict\sof\s(MTC|PTC)(\s(?P<ptcid>.+)){0,1}:'
         r'\s(?P<verdict>\w+).*$'),
    )

    # For each state: the regexes to try, in order, and whether a match moves
    # the parser to the next state (True) or leaves the state unchanged (False).
    _CANDIDATES = {
        'exec_test': (('exec_test', True),),
        'map_port': (('map_port', False), ('message_out', True),
                     ('message_in', True), ('timer', False)),
        'messages': (('messages', True), ('message_out', False),
                     ('message_in', False), ('timer', False)),
        'unmap_port': (('unmap_port', False), ('verdict', True)),
        'verdict': (('verdict', False), ('final_verdict', True)),
        'final_verdict': (('verdict', False), ('final_verdict', True)),
    }

    # Successor of each state; 'final_verdict' is handled separately because
    # leaving it goes through reset_state().
    _NEXT_STATE = {
        'exec_test': 'map_port',
        'map_port': 'messages',
        'messages': 'unmap_port',
        'unmap_port': 'verdict',
        'verdict': 'final_verdict',
    }

    def __init__(self, p_logger):
        """
        Compile all regular expressions and start in the inert state 'none'.

        :param p_logger: Logger-like object; its info() and debug() methods are
                         called with a format string followed by its arguments.
        """
        self.__logger = p_logger
        self.__state = 'none'
        self.__flags = re.DOTALL | re.VERBOSE
        self.__regexps = dict(
            (name, re.compile(self._LINE_PREFIX + tail, self.__flags))
            for name, tail in self._LINE_TAILS
        )
        # While in state 'messages', the line that ends the state is an unmap line.
        self.__regexps['messages'] = self.__regexps['unmap_port']
    # End of method __init__

    def reset_state(self):
        """Put the parser into state 'exec_test', i.e. waiting for a test case start line."""
        self.__state = 'exec_test'
        self.__logger.info('Reset state: %s', self.__state)
    # End of method reset_state

    def parse_line(self, p_line):
        """
        Match one log line against the expressions allowed in the current state.

        :param p_line: The log line to analyse (a string).
        :return: A dict of the named groups of the first matching expression
                 (optional groups that did not take part are None), or None
                 when no expression allowed in the current state matches.

        NOTE(review): after the first final verdict line the parser sits in
        state 'final_verdict', where only verdict lines are recognised; it
        returns to 'exec_test' on a second final verdict line or when the
        caller invokes reset_state(). This is the original behaviour and is
        kept as is - confirm against the callers that it is intended.
        """
        self.__logger.debug('>>> regex_list::parse_line: %s', self.__get_state())
        self.__logger.debug('regex_list::parse_line: %s', p_line)
        for regex_name, advance in self._CANDIDATES.get(self.__get_state(), ()):
            matched = self.__regexps[regex_name].match(p_line)
            if matched:
                if advance:
                    self.__set_next_state()
                return matched.groupdict()
        return None
    # End of method parse_line

    def __set_next_state(self):
        """Move to the successor of the current state; leaving 'final_verdict' resets the parser."""
        if self.__state == 'final_verdict':
            self.__logger.info('regex_list::parse_line: Terminated')
            self.reset_state()
            return
        next_state = self._NEXT_STATE.get(self.__state)
        if next_state is not None:  # unknown states (e.g. 'none') are left untouched
            self.__logger.info('regex_list::parse_line: Set state to %s', next_state)
            self.__state = next_state
    # End of method __set_next_state

    def __get_state(self):
        """Return the name of the current state."""
        return self.__state
    # End of method __get_state
# End of class regex_list
# End of file