Commit 3c542779 authored by Yann Garcia's avatar Yann Garcia
Browse files

Add log parser

parent e27e2510
Loading
Loading
Loading
Loading
Loading

plantuml/README.md

0 → 100644
+0 −0

Empty file added.

plantuml/__main__.py

0 → 100644
+186 −0
Original line number Original line Diff line number Diff line
# -*- Coding: utf-8 -*-

from __future__ import division # Import floating-point division (1/4=0.25) instead of Euclidian division (1/4=0)

import os
import glob
import sys
import logging

from regex_list import *

MSG_PATH = ""
IN_FILE_PATH = ""
OUT_FILE_PATH = ""
out_streams = object()
logger = object()
parser = object()
data_ctx = dict()

def process_command_line_args():
    global IN_FILE_PATH

    print(f"Arguments count: {len(sys.argv)}")
    for i, arg in enumerate(sys.argv):
        print(f"Argument {i:>6}: {arg}")
    IN_FILE_PATH = sys.argv[1]
    # End of function process_command_line_args

def process_line(line):
    global MSG_PATH, out_streams, logger, parser, data_ctx

    match = parser.parse_line(line)
    if match != None:
        logger.info(f"Matching: {match}")
        if 'testcase' in match:
            logger.info('Got testcase')
            out_streams.write('@startuml\n')
            out_streams.write(f"title {match['testcase']}\n")
            out_streams.write("/'Editor: https://plantuml-editor.kkeisuke.com/ '/\n")
            out_streams.write("/'Syntax: http://plantuml.com/fr/sequence-diagram '/\n")
            out_streams.write('!include etsi-style.iuml\n')
            out_streams.write('!pragma teoz true\n')
            out_streams.write('autonumber\n')
            out_streams.write('participant system\n')
            out_streams.write('box "TTCN-3 Environment" #LightBlue\n')
        elif 'operation' in match:
            logger.info('Got operation')
            if match['operation'] == 'mapped':
                out_streams.write(f"participant \"{match['comp']}:{match['protocol_port']}\"\n")
                data_ctx['protocol_port'].append(f"{match['comp']}:{match['protocol_port']}")
            else:
                if match['protocol_port'] != 'syncPort':
                    out_streams.write(f"\"{match['comp']}:{match['protocol_port']}\" --> system: unmap\n")
        elif 'request' in match:
            logger.info('Got request')
            if data_ctx['box_ended'] == False:
                out_streams.write('endbox\n')
                data_ctx['box_ended'] = True
                for p in data_ctx['protocol_port']:
                    out_streams.write(f"\"{p}\" --> system: map\n")
            out_streams.write(f"\"{match['comp']}:{match['port']}\" -[#green]> system: @{match['module']}.{match['message']}\n")
            s =  match['request'].replace(",", ",\\n").replace("\"", "").strip()
            if len(s) > 128:
                n = match['hours'] + "_" + match['minutes'] + "_" + match['seconds'] + "_" + match['milliseconds'] + ".txt"
                n = os.path.join(MSG_PATH, n)
                with open(n, "w") as msg:
                    msg.write(s)
                s = f'[[file://{n} request := ...]]' #{Click to read the message}
            out_streams.write(f"note right: {s}\n")
            out_streams.write(f"note left: {match['hours']}:{match['minutes']}:{match['seconds']}.{match['milliseconds']}\n")
        elif 'response' in match:
            logger.info('Got response')
            if data_ctx['box_ended'] == False:
                out_streams.write('endbox\n')
                data_ctx['box_ended'] = True
            out_streams.write(f"system -[#green]> \"{match['comp']}:{match['port']}\": @{match['module']}.{match['message']}\n")
            s = match['response'].replace(",", ",\\n").replace("\"", "").strip()
            if len(s) > 128:
                n = match['hours'] + "_" + match['minutes'] + "_" + match['seconds'] + "_" + match['milliseconds'] + ".txt"
                n = os.path.join(MSG_PATH, n)
                with open(n, "w") as msg:
                    msg.write(s)
                s = f'[[file://{n} response := ...]]' #{Click to read the message}
            out_streams.write(f"note right: {s}\n")
            out_streams.write(f"note left: {match['hours']}:{match['minutes']}:{match['seconds']}.{match['milliseconds']}\n")
        elif 'timer_name' in match:
            logger.info('Got timer')
            if match['timer_name'] == 'tc_sync':
                pass
            else:
                for p in data_ctx['protocol_port']:
                    if p.startswith(match['comp']):
                        out_streams.write(f"\"{p}\" -> \"{p}\": {match['start_stop']}, duration: {match['duration']}\n")
                        break
            out_streams.write(f"note left: {match['hours']}:{match['minutes']}:{match['seconds']}.{match['milliseconds']}\n")
        elif 'verdict' in match:
            logger.info('Got verdict')
            if 'ptcid' in match and match['ptcid'] != None:
                s = match['ptcid'].split('(')
                s = s[1].split(')')
                for p in data_ctx['protocol_port']:
                    if p.startswith(s[0]):
                        if match['verdict'] == 'fail':
                            out_streams.write(f"hnote over \"{p}\" #red: fail\n")
                        elif match['verdict'] == 'pass':
                            out_streams.write(f"hnote over \"{p}\" #green: pass\n")
                        elif match['verdict'] == 'inconc':
                            out_streams.write(f"hnote across #yellow: inconc\n")
                        else:
                            out_streams.write(f"hnote over \"{p}\" #gray: error\n")
                        out_streams.write(f"note left: {match['hours']}:{match['minutes']}:{match['seconds']}.{match['milliseconds']}\n")
                        break
        elif 'final_verdict' in match:
            logger.info('Got verdict')
            if match['final_verdict'] == 'fail':
                out_streams.write(f"hnote across #red: fail\n")
            elif match['final_verdict'] == 'pass':
                out_streams.write(f"hnote across #green: pass\n")
            elif match['final_verdict'] == 'inconc':
                out_streams.write(f"hnote across #yellow: inconc\n")
            else:
                out_streams.write(f"hnote across #gray: error\n")
            out_streams.write(f"note left: {match['hours']}:{match['minutes']}:{match['seconds']}.{match['milliseconds']}\n")
        else:
            logger.info('Got unsupported item')
        # End of 'if' statement
        out_streams.flush()
    # End of function process_line

def main():
    global MSG_PATH, OUT_FILE_PATH, out_streams, logger, parser, data_ctx

    process_command_line_args()

    # Initialize the logger
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    l = logging.StreamHandler()
    l.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(l)

    # Prepare outputs
    OUT_FILE_PATH = sys.argv[2]
    CURRENT_PATH = os.getcwd()
    MSG_PATH = os.path.join(CURRENT_PATH, "msgs")
    logger.info(f'CURRENT_PATH:{CURRENT_PATH} - MSG_PATH{MSG_PATH}')
    if os.path.exists(MSG_PATH):
        files = glob.glob(os.path.join(MSG_PATH, '*'))
        for f in files:
            os.remove(f)
    else:
        os.mkdir(MSG_PATH)
    
    # Setup the regex
    parser = regex_list(logger)
    parser.reset_state()

    data_ctx = dict()
    data_ctx['protocol_port'] = []
    data_ctx['box_ended'] = False

    print(f"IN_FILE_PATH= {IN_FILE_PATH}")
    with open(IN_FILE_PATH, "r") as in_stream:
        with open(OUT_FILE_PATH, "w") as out_streams:
            line = in_stream.readline()
            while line != '':
                process_line(line)
                line = in_stream.readline()
            # End of 'for' statement
            # Add the end tag
            out_streams.write('@enduml\n')
            out_streams.flush()
        # End of 'with' statement
    # End of 'with' statement

    # Check if file is not empty
    if os.path.getsize(OUT_FILE_PATH) == 0:
        logger.error(f'Empty result, please check that FileMask is properly set: ERROR | WARNING | USER | MATCHING | EXECUTOR_RUNTIME | VERDICTOP | PORTEVENT | TIMEROP | TESTCASE')
    else:
        os.system('java -DPLANTUML_LIMIT_SIZE=8192 -jar ./plantuml.jar -svg {}'.format(OUT_FILE_PATH))
# End of main function

if __name__ == "__main__":
    main()

# End of file
+46 −0
Original line number Original line Diff line number Diff line
hide footbox
autonumber
skinparam {
    dpi 300
    shadowing false
    'handwritten true
    defaultFontSize 8
}

skinparam note {
    FontSize 8
    FontStyle bold
    FontColor White
    borderColor Black
    backgroundColor DimGray
}

skinparam participant {
    FontSize 8
    FontStyle bold
    FontColor White
    borderColor Navy
    backgroundColor RoyalBlue
}

skinparam actor {
    FontSize 8
    FontStyle bold
    FontColor Navy
    borderColor Navy
    backgroundColor RoyalBlue
}

skinparam sequence {
    ArrowFontColor Black
    ArrowColor SlateGray
    LifeLineBorderColor Navy
    LifeLineBackgroundColor DimGray
}

skinparam ParticipantPadding 5
skinparam BoxPadding 5

!define sendrcvRTP(a,b,c) a <-[#LimeGreen]> b : c
!define sendRTP(a,b,c) a -[#LimeGreen]> b : c
!define setParticipant(a,b) participant a as b

plantuml/plantuml.jar

0 → 100644
+8 MiB

File added.

No diff preview for this file type.

plantuml/regex_list.py

0 → 100644
+124 −0
Original line number Original line Diff line number Diff line
# -*- coding: utf-8 -*-
from __future__ import division # Import floating-point division (1/4=0.25) instead of Euclidian division (1/4=0)

import logging
import re
import os
class regex_list(object):
    """
    For testing purposes: https://regex101.com/
    """
    def __init__(self, p_logger):
        self.__logger = p_logger
        self.__state = 'none'
        self.__flags = re.DOTALL | re.VERBOSE
        self.__regexps = dict()
        self.__regexps['exec_test'] = re.compile(r'^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+).(?P<milliseconds>\d+)(\s((?P<comp>mtc|\d+))){0,1}\sTESTCASE\s.*\sTest\scase\s(?P<testcase>\w+)\sstarted\.$', self.__flags)
        self.__regexps['map_port'] = re.compile(r'^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+).(?P<milliseconds>\d+)(\s((?P<comp>mtc|\d+))){0,1}\sPORTEVENT\s.*\sPort\s(?P<protocol_port>\w+)\swas\s(?P<operation>\w+)\sto\s(?P<system>\w+):(?P<system_port>\w+)\.$', self.__flags)
        self.__regexps['unmap_port'] = re.compile(r'^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+).(?P<milliseconds>\d+)(\s((?P<comp>mtc|\d+))){0,1}\sPORTEVENT\s.*\sPort\s(?P<protocol_port>\w+)\swas\s(?P<operation>\w+)\sfrom\s(?P<system>\w+):(?P<system_port>\w+)\.$', self.__flags)
        self.__regexps['messages'] = self.__regexps['unmap_port']
        self.__regexps['message_out'] = re.compile(r'^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+).(?P<milliseconds>\d+)(\s((?P<comp>mtc|\d+))){0,1}\sPORTEVENT\s.*\sSent\son\s(?P<port>\w+)\sto\ssystem(\(.*\)){0,1}\s@(?P<module>\w+).(?P<message>\w+)\s:\s\{(?P<request>.+)\}$', self.__flags)
        self.__regexps['message_in'] = re.compile(r'^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+).(?P<milliseconds>\d+)(\s((?P<comp>mtc|\d+))){0,1}\sPORTEVENT\s.*\sMessage\senqueued\son\s(?P<port>\w+)\sfrom\ssystem\s@(?P<module>\w+).(?P<message>\w+)\s:\s\{(?P<response>.+)\}\sid\s\w+$', self.__flags)
        self.__regexps['timer'] = re.compile(r'^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+).(?P<milliseconds>\d+)(\s((?P<comp>mtc|\d+))){0,1}\sTIMEROP\s.*\s(?P<start_stop>\w+)\stimer\s(?P<timer_name>\w+):\s(?P<duration>\w+).*$', self.__flags)
        self.__regexps['final_verdict'] = re.compile(r'^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+).(?P<milliseconds>\d+)(\s((?P<comp>mtc|\d+))){0,1}\sTESTCASE\s.*\sTest\scase\s(.+)\sfinished\.\sVerdict:\s(?P<final_verdict>\w+)$', self.__flags)
        self.__regexps['verdict'] = re.compile(r'^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+).(?P<milliseconds>\d+)(\s((?P<comp>mtc|\d+))){0,1}\sVERDICTOP\s.*\sLocal\sverdict\sof\s(MTC|PTC)(\s(?P<ptcid>.+)){0,1}:\s(?P<verdict>\w+).*$', self.__flags)
    # End of __init__ class

    def reset_state(self):
        self.__state = 'exec_test'
        self.__logger.info('Reset state: ' + self.__state)
    # End of reset_state

    def parse_line(self, p_line):
        self.__logger.debug('>>> regex_list::parse_line: ' + self.__get_state())
        self.__logger.debug('regex_list::parse_line: ' + p_line)

        if self.__get_state() == 'exec_test':
            m = self.__regexps[self.__state].match(p_line)
            if m:
                self.__set_next_state()
                return m.groupdict()
        elif self.__get_state() == 'map_port':
            m = self.__regexps[self.__state].match(p_line)
            if m:
                return m.groupdict()
            else: # In map_port, check for TTCN-3 message sent ('message_out') and received ('message_in')
                m = self.__regexps['message_out'].match(p_line)
                if m:
                    self.__set_next_state()
                    return m.groupdict()
                else:
                    m = self.__regexps['message_in'].match(p_line)
                    if m:
                        self.__set_next_state()
                        return m.groupdict()
                    else:
                        m = self.__regexps['timer'].match(p_line)
                        if m:
                            return m.groupdict()
        elif self.__get_state() == 'messages':
            m = self.__regexps[self.__state].match(p_line)
            if m:
                self.__set_next_state()
                return m.groupdict()
            else: # Check for TTCN-3 message sent ('message_out') and received ('message_in')
                m = self.__regexps['message_out'].match(p_line)
                if m:
                    return m.groupdict()
                else:
                    m = self.__regexps['message_in'].match(p_line)
                    if m:
                        return m.groupdict()
                    else:
                        m = self.__regexps['timer'].match(p_line)
                        if m:
                            return m.groupdict()
        elif self.__get_state() == 'unmap_port':
            m = self.__regexps[self.__state].match(p_line)
            if m:
                return m.groupdict()
            else: # In unmap_port, check for final_verdict
                m = self.__regexps['verdict'].match(p_line)
                if m:
                    self.__set_next_state()
                    return m.groupdict()
        elif self.__get_state() == 'verdict' or self.__get_state() == 'final_verdict':
            m = self.__regexps['verdict'].match(p_line)
            if m:
                return m.groupdict()
            else:
                m = self.__regexps['final_verdict'].match(p_line)
                if m:
                    self.__set_next_state()
                    return m.groupdict()
        return None
    # End of method parse_line

    def __set_next_state(self):
        if self.__state == 'exec_test':
            self.__logger.info('regex_list::parse_line: Set state to map_port')
            self.__state = 'map_port'
        elif self.__state == 'map_port':
            self.__logger.info('regex_list::parse_line: Set state to messages')
            self.__state = 'messages'
        elif self.__state == 'messages':
            self.__logger.info('regex_list::parse_line: Set state to unmap_port')
            self.__state = 'unmap_port'
        elif self.__state == 'unmap_port':
            self.__logger.info('regex_list::parse_line: Set state to verdict')
            self.__state = 'verdict'
        elif self.__state == 'verdict':
            self.__logger.info('regex_list::parse_line: Set state to final_verdict')
            self.__state = 'final_verdict'
        elif self.__state == 'final_verdict':
            self.__logger.info('regex_list::parse_line: Terminated')
            self.reset_state()
    # End of method __set_next_state

    def __get_state(self):
        return self.__state
    # End of method __get_state

# End of class regex_list

# End of file
Loading