Commit 805e55dc authored by Miguel Angel Reina Ortega
Browse files

Rewrite the way changes are extracted into md files

parent dc8eb70b
Loading
Loading
Loading
Loading
+197 −4
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@
#	(c) 2023 by Miguel Angel Reina Ortega
#	License: BSD 3-Clause License. See the LICENSE file for further details.
#

from typing import Tuple
import argparse, os, re, sys
from rich import print
from rich.progress import Progress, TextColumn, TimeElapsedColumn
@@ -341,16 +341,209 @@ https://forge.etsi.org/rep/cdm/pipeline-scripts/-/blob/main/common/Dockerfile.st
    #clauseMDlines.insert(j, "\n\n<br />")
    writeMDFile(progress, clauseMDlines, changed_clause.clause_nr.replace(" ","") + '.md', outDirectory)


def integrate_changes(progress: Progress, mdLines: list[str], mr: MR) -> list[Tuple[str, bool]]:
    '''
    Integrate the changes of a merge request into the lines of the target document.

    Walks every patched file of ``mr.patch_set`` whose source path starts with
    ``a/TS`` or ``a/TR`` and merges its hunks into ``mdLines``: unchanged lines
    are copied, added lines are rendered with ``addedLine`` and removed lines
    with ``removedLine``.

    Args:
        progress: Progress display; a task is registered on it.
        mdLines: Lines of the source document, without line terminators.
        mr: Merge request; only ``mr.patch_set`` is read here.

    Returns:
        A list of ``(text, changed)`` tuples. ``changed`` is True for lines
        that were added or removed by the merge request, False otherwise.
    '''

    _taskID = progress.add_task('[blue]Integrate changes', start=False, total=0)

    spec_with_changes: list[Tuple[str, bool]] = []

    def append_unchanged(text: str) -> None:
        # Headings are followed by an extra blank line, other lines by a single newline.
        terminator = "\n\n" if text.startswith("#") else "\n"
        spec_with_changes.append((text + terminator, False))

    for patched_file in mr.patch_set:
        if not patched_file.source_file.startswith(("a/TS", "a/TR")):
            continue

        logging.debug(f"Looking at changes in {patched_file.source_file}")
        index_source = 1    # 1-based index of the next source line to consume
        lines_added = 0
        lines_removed = 0
        for change in patched_file:
            logging.debug(f'Change from patch details: source_start: {change.source_start} - target_start: {change.target_start}')

            # Copy the unchanged lines located before this hunk
            while index_source < change.source_start:
                append_unchanged(mdLines[index_source - 1])
                index_source += 1

            # Sanity check: the hunk must start where the lines added/removed so far say it should
            expected_target_start = index_source + lines_added - lines_removed
            if change.target_start != expected_target_start:
                logging.warning(
                    f'Something is wrong: hunk target_start is {change.target_start} but {expected_target_start} was expected. Remaining changes are skipped')
                break
            logging.debug(
                f'Change applied correctly, indexes on track. Added {lines_added} lines and removed {lines_removed} lines')

            for line in change:
                if line.is_added:
                    # Added lines do not exist in the source, so the source index stays put
                    spec_with_changes.append((addedLine(line), True))
                    lines_added += 1
                elif line.is_removed:
                    spec_with_changes.append((removedLine(line), True))
                    index_source += 1
                    lines_removed += 1
                else:
                    # Context line of the hunk
                    append_unchanged(line.value)
                    index_source += 1

        # Copy the lines located after the last hunk; without this the document
        # (and the last changed clause) would be cut right after the last change.
        for text in mdLines[index_source - 1:]:
            append_unchanged(text)

        logging.debug(f'Applied changes. Total added lines: {lines_added}.Total removed lines {lines_removed}')

    return spec_with_changes

def addedLine(line: str) -> str:
    '''
    Render a line added by the merge request with "added" change marks.

    NOTE: despite the ``str`` annotation, ``line`` must be an object exposing
    the text of the line through a ``value`` attribute.

    - Table rows: every non-empty cell is wrapped in an underline span. Cells of
      a delimiter row (``---``, ``:--``, ``:-:`` ...) are kept untouched, since
      marking them would stop the table from being recognised as a table.
    - Figures (lines starting with ``![``) are returned unmarked.
    - Blank lines are returned as they are, not marked as added.
    - Any other line is wrapped in an underline span.
    '''
    text = line.value.strip()

    if text.startswith("|"):  # It is a table
        modifiedElements: list[str] = []
        for element in text.split("|"):
            cell = element.strip()
            if cell == '':
                modifiedElements.append(" ")
            elif re.fullmatch(r":?-+:?", cell):
                # Delimiter row cell: must stay plain for the table to render
                modifiedElements.append(cell)
            else:
                modifiedElements.append("<span class=\"underline\">" + cell + "</span>")
        return "|".join(modifiedElements) + "\n"

    if text == '':
        return line.value  # Add an extra line not marked as added

    if line.value.startswith("!["):  # It is a figure
        return text + "\n\n"  # TODO How a figure should be marked if it is modified

    # It works for simple lines, not for lines in a list
    return "<span class=\"underline\">" + text + "</span>" + "\n\n"

def removedLine(line: str) -> str:
    '''
    Render a line removed by the merge request with "removed" change marks.

    NOTE: despite the ``str`` annotation, ``line`` must be an object exposing
    the text of the line through a ``value`` attribute.

    - Table rows: every non-empty cell is wrapped in ``~~`` (strikethrough).
      Cells of a delimiter row (``---``, ``:--``, ``:-:`` ...) are kept
      untouched, since marking them would stop the table from being
      recognised as a table.
    - Blank lines are returned as they are.
    - Any other line is wrapped in ``~~`` and followed by a blank line.
    '''
    text = line.value.strip()

    if text.startswith("|"):  # It is a table
        modifiedElements: list[str] = []
        for element in text.split("|"):
            cell = element.strip()
            if cell == '':
                modifiedElements.append(" ")
            elif re.fullmatch(r":?-+:?", cell):
                # Delimiter row cell: must stay plain for the table to render
                modifiedElements.append(cell)
            else:
                modifiedElements.append("~~" + cell + "~~ ")
        return "|".join(modifiedElements) + "\n"

    if text == '':
        return line.value

    return "~~" + text + "~~" + "\n\n"

def find_clauses_with_changes(progress: Progress, mdLines_changes: list[Tuple[str, bool]]) -> Tuple[list[Clause], list[Clause]]:
    '''
    Scan the document lines to find all clauses and the clauses containing changes.

    Args:
        progress: Progress display; a task is registered on it.
        mdLines_changes: ``(text, changed)`` tuples, one per document line.
            ``changed`` flags the lines touched by the merge request.

    Returns:
        A tuple ``(clauses, changed_clauses)``. Each Clause carries its heading
        line, its 1-based start and end line indexes and its clause number.
        ``changed_clauses`` holds the clauses in which at least one line was
        flagged as changed.
    '''

    _taskID = progress.add_task('[blue]Find clauses with changes', start=False, total=0)

    # Raw string so that \s, \d, \w and \. reach the regex engine without
    # triggering invalid-escape-sequence warnings.
    clauseregex = re.compile(r'^#+\s(\d+(\.\d+)*|Annex \w(\.\d+)*|\w*(\.\d+)*).*')
    clauses: list[Clause] = []
    changed_clauses: list[Clause] = []
    changeInClause = False
    # Clause "0": everything located before the first heading
    clause = Clause("", 1, 1, "0")

    for index, (line, change) in enumerate(mdLines_changes, start=1):
        if change:
            changeInClause = True
        if not line.startswith('#'):
            continue

        match = clauseregex.match(line)  # Match heading
        if match is None:
            print("Unknown heading")
            continue

        clause_nr = match.group(1)
        if index - 2 == clause.from_id:
            # Heading located two lines after the current clause heading:
            # it is a subclause, the current clause is re-anchored on it
            clause.from_id = index
            clause.raw = line
            clause.clause_nr = clause_nr
        else:
            # It is the end of the current clause
            clause.to_id = index - 1
            clauses.append(clause)
            if changeInClause:
                changed_clauses.append(clause)
            clause = Clause(line, index, index, clause_nr)
            changeInClause = False

    # Append last clause (usually History)
    clause.to_id = len(mdLines_changes)
    clauses.append(clause)
    if changeInClause:
        changed_clauses.append(clause)

    logging.debug(f"Number of clauses: {len(clauses)}")
    for found_clause in clauses:
        logging.debug(found_clause.clause_nr)
        logging.debug(found_clause.from_id)
        logging.debug(found_clause.to_id)

    return (clauses, changed_clauses)

def saveChangedClauses(progress: Progress, outDirectory:str, changed_clauses: list[Clause], mdLines: list[str]):
    '''
    Write one markdown file per changed clause into outDirectory.

    The file is named after the clause number with spaces removed, plus '.md'.
    Its content is the lines of mdLines from the clause start index (1-based)
    up to, but not including, the clause end index.
    '''
    # NOTE(review): the end index is treated as exclusive, so the line at
    # clause.to_id itself is never written - confirm this is intended.
    for changed in changed_clauses:
        clause_lines = [mdLines[line_nr - 1] for line_nr in range(changed.from_id, changed.to_id)]
        file_name = changed.clause_nr.replace(" ", "") + '.md'
        writeMDFile(progress, clause_lines, file_name, outDirectory)


def process(document:str, outDirectory:str, mr:MR) -> None:
    '''
    Generate the change-marked clauses of a document for a merge request.

    Retrieves the document text from the target branch of the merge request,
    integrates the changes of the merge request into it, finds the clauses
    containing changes, writes their clause numbers to
    '<outDirectory>/changedClauses.txt' (one per line) and saves each changed
    clause through saveChangedClauses.

    Args:
        document: Document to retrieve from the target branch.
        outDirectory: Directory receiving the generated files.
        mr: Merge request providing the document text and the changes.
    '''
    with Progress(TextColumn('{task.description}'),  TimeElapsedColumn()) as progress:
        sourceText = mr.retrieve_text(mr.target_branch, document)
        sourceMdLines = sourceText.splitlines(keepends=False)

        # NOTE(review): calls of the previous process. Their results are not used
        # below; they are kept because they may have side effects (they receive
        # outDirectory) - confirm whether they can be dropped.
        clauses = find_all_clauses(progress, sourceMdLines)
        find_changed_clauses(progress, sourceMdLines, clauses, mr, outDirectory)

        # New process: merge the changes into the document, then locate the changed clauses
        targetMdLines_changes = integrate_changes(progress, sourceMdLines, mr)
        _all_clauses, changed_clauses = find_clauses_with_changes(progress, targetMdLines_changes)

        # Export list of changed clauses (the with statement closes the file)
        with open(f'{outDirectory}/changedClauses.txt', "w", encoding='utf-8', errors='replace') as f:
            f.write("\n".join(clause.clause_nr for clause in changed_clauses))

        saveChangedClauses(progress, outDirectory, changed_clauses, [text for text, _changed in targetMdLines_changes])


def main(args=None):
    # Parse command line arguments