Commit 2c7cea8f authored by canterburym

Bringing into line with SA3LI

parent 1d23738c
Pipeline #11024 failed with stage in 18 seconds
[
[
"./102232-1/LI-PS-PDU.asn",
"./103280/TS_103_280.asn1",
"./testing/deps/33128/TS33128Payloads.asn",
"./testing/deps/33108/Three3gppHI1Notifications.asn",
"./testing/deps/33108/UMTSHI2Operations.asn",
"./testing/deps/33108/UMTSHI3PS.asn",
"./testing/deps/33108/EpsHI3PS.asn",
"./testing/deps/33108/ConfHI3IMS.asn",
"./testing/deps/33108/VoipHI3IMS.asn",
"./testing/deps/33108/GCSEHI3.asn",
"./testing/deps/33108/CSVoiceHI3IP.asn",
"./testing/deps/33108/UMTSCSHI2Operations.asn",
"./testing/deps/33108/EpsHI2Operations.asn",
"./testing/deps/33108/ConfHI2Operations.asn",
"./testing/deps/33108/ProSeHI2Operations.asn",
"./testing/deps/33108/GCSEHI2Operations.asn",
"./testing/deps/101671/HI1NotificationOperations,ver7.asn",
"./testing/deps/101671/HI2Operations,ver18.asn",
"./testing/deps/101909/TS101909201.asn",
"./testing/deps/101909/TS101909202.asn",
"./testing/deps/101909/PCESP.asn",
"./testing/deps/301040/06132v203_C01.asn",
"./103462/ILHIPDU.asn",
"./102232-2/EmailPDU.asn",
"./102232-3/IPAccessPDU.asn",
"./102232-4/L2AccessPDU.asn",
"./102232-5/IPMultimediaPDU.asn",
"./102232-6/PstnIsdnPDU.asn"
],
["./102657/RDMessage.asn"]
]
deps
temp
dependencies
import logging
import json
from pathlib import Path
from subprocess import run
from pycrate_asn1c.asnproc import *
def reconstrainInteger (filename):
    # asn1c cannot handle the INTEGER bound 18446744073709551615 (2^64 - 1),
    # so write out a temporary copy with a smaller bound for syntax checking.
    Path('temp.asn').write_text(Path(filename).read_text().replace("18446744073709551615", "65536"))
    return 'temp.asn'
filesWithBigInts = [
'102232-1/LI-PS-PDU.asn',
'102232-3/IPAccessPDU.asn',
'102232-4/L2AccessPDU.asn'
]
def syntaxCheckASN (fileList):
"""
    Performs ASN.1 syntax checking on a list of filenames (or pathlib Paths)
    :param fileList: List of filenames (str or pathlib Path)
:returns: Dict with result, return code and message for each filename
Calls the open-source asn1c compiler with the "syntax only" option.
As a result, asn1c must be available to run.
"""
results = {}
for file in fileList:
try:
if file.as_posix() in filesWithBigInts:
newFile = reconstrainInteger(str(file))
p = run(['asn1c', '-E', newFile], capture_output=True)
Path(newFile).unlink()
else:
p = run(['asn1c', '-E', str(file)], capture_output=True)
            if (p.returncode != 0):
                errorMessage = p.stderr.decode().splitlines()[0]
                # asn1c cannot represent the 2^64-1 bound; treat that specific
                # complaint as a pass rather than a syntax error.
                if errorMessage.startswith(' Value "18446744073709551615" at line'):
                    results[str(file)] = { 'ok' : True }
                    continue
                results[str(file)] = {
                    'ok' : False,
                    'code' : p.returncode,
                    'message' : errorMessage
                }
else:
results[str(file)] = {
'ok' : True
}
except Exception as ex:
results[str(file)] = {
'ok' : False,
'code' : -1,
'message' : f"{ex!r}"
}
return results
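# Usage sketch: the checker expects pathlib Paths (so the filesWithBigInts
# lookup via as_posix() works) and reports a per-file outcome dict.
#
#   outcome = syntaxCheckASN([Path('102232-1/LI-PS-PDU.asn')])
#   if not outcome['102232-1/LI-PS-PDU.asn']['ok']:
#       print(outcome['102232-1/LI-PS-PDU.asn']['message'])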
duplicateObjects = {
'102232-1/LI-PS-PDU.asn' : [
'CCPayload',
'IRIPayload',
'Location'
],
'testing/mod1.asn' : [
'ClashField'
]
}
def fixDuplicateObjects(filename):
    """
    Rewrites the objects listed in duplicateObjects to Native<name> so that
    they no longer clash with identically-named objects in other modules.
    """
    stringContent = filename.read_text()
    for objectName in duplicateObjects[filename.as_posix()]:
        stringContent = stringContent.replace(f'{objectName} ::=', f'Native{objectName} ::=')
        stringContent = stringContent.replace(f'SEQUENCE OF {objectName}', f'SEQUENCE OF Native{objectName}')
    Path('temp.asn').write_text(stringContent)
    return 'temp.asn'
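# Illustration of the rewrite, using the 'Location' entry above: in the
# temporary copy of LI-PS-PDU.asn,
#
#   Location ::= SEQUENCE { ... }   becomes   NativeLocation ::= SEQUENCE { ... }
#   SEQUENCE OF Location            becomes   SEQUENCE OF NativeLocation
#
# so the definition no longer collides with Location types in other modules.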
def compileAllTargets (compileTargets):
"""
Attempts to compile a set of compile targets using the pycrate ASN1 tools
:param compileTargets: list of compile targets, each of which is a list of filenames
:returns: A dict of outcome against the first filename of each compile target. Return code and message are included for failures.
    For each compile target (list of filenames) the first filename is assumed
    to be the "primary" file. This doesn't have any relevance to the compilation,
    but will be used as the identifier when reporting any compile errors.
The compilation is performed by the pycrate ASN compile functions; errors
are caught as exceptions and rendered into a list.
Unfortunately, the pycrate compiler doesn't report line numbers.
The asn1c compiler does, but doesn't properly handle identifiers with the
same name in different modules; as this occurs multiple times in TS 33.108,
we can't use it.
"""
results = {}
for target in compileTargets:
firstTarget = target[0]
logging.debug(f"Compiling {firstTarget}")
try:
fileTexts = []
fileNames = []
GLOBAL.clear()
for filename in target:
pFile = Path(filename)
if pFile.as_posix() in duplicateObjects:
tmpFile = Path(fixDuplicateObjects(pFile))
fileTexts.append(tmpFile.read_text())
#tmpFile.unlink()
else:
fileTexts.append(pFile.read_text())
fileNames.append(filename)
logging.debug (f" Loading {filename}")
compile_text(fileTexts, filenames = fileNames)
results[str(firstTarget)] = {
'ok' : True,
}
except Exception as ex:
results[str(firstTarget)] = {
'ok' : False,
'code' : -1,
'message' : f"{ex!r}"
}
continue
return results
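# Usage sketch, mirroring the __main__ block below: load the compile targets
# from JSON and compile each one, keyed by its first ("primary") filename.
#
#   targets = json.loads(Path('testing/asn_compile_targets.json').read_text())
#   outcome = compileAllTargets(targets)
#   # e.g. outcome['./102232-1/LI-PS-PDU.asn'] == { 'ok' : True }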
def processResults (results, stageName):
"""
    Counts the number of errors and writes out the outcome per filename
    :param results: Dict of results keyed by filename, as returned by the check functions
:param stageName: Name to decorate the output with
:returns: The number of files which had errors
"""
print("")
errorCount = sum([1 for r in results.values() if not r['ok']])
logging.info(f"{errorCount} {stageName} errors encountered")
print(f"{'-':-<60}")
print(f"{stageName} results:")
print(f"{'-':-<60}")
for filename, result in results.items():
print(f" {filename:.<55}{'..OK' if result['ok'] else 'FAIL'}")
if not result['ok']:
if isinstance(result['message'], list):
for thing in result['message']:
print(f" {thing['message']}")
else:
print(f" {result['message']}")
print(f"{'-':-<60}")
print(f"{stageName} errors: {errorCount}")
print(f"{'-':-<60}")
return errorCount
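# Illustrative shape of the report this writes (trimmed to one file):
#
#   ------------------------------------------------------------
#   Parsing results:
#   ------------------------------------------------------------
#    102232-1/LI-PS-PDU.asn...............................OK
#   ------------------------------------------------------------
#   Parsing errors: 0
#   ------------------------------------------------------------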
if __name__ == '__main__':
logging.info('Searching for ASN.1 files')
fileList = list(Path(".").rglob("*.asn1")) + list(Path(".").rglob("*.asn"))
logging.info(f'{len(fileList)} ASN.1 files found')
for file in fileList:
logging.debug(f' {file}')
ignoreList = Path('testing/asn_ignore.txt').read_text().splitlines()
ignoredFiles = []
for ignore in ignoreList:
logging.debug(f'Ignoring pattern {ignore}')
for file in fileList:
if ignore in str(file):
ignoredFiles.append(file)
logging.debug(f" Ignoring {str(file)} as contains {ignore}")
ignoredFiles = list(set(ignoredFiles))
logging.info(f'{len(ignoredFiles)} files ignored')
for file in ignoredFiles:
logging.debug(f' {file}')
fileList = [file for file in fileList if file not in ignoredFiles]
logging.info(f'{len(fileList)} files to process')
for file in fileList:
logging.debug(f' {file}')
if len(fileList) == 0:
logging.warning ("No files specified")
exit(0)
logging.info("Parsing ASN1 files")
parseResults = syntaxCheckASN(fileList)
if processResults(parseResults, "Parsing") > 0:
exit(-1)
logging.info ("Getting compile targets")
compileTargets = json.loads(Path('testing/asn_compile_targets.json').read_text())
logging.info (f"{len(compileTargets)} compile targets found")
compileResults = compileAllTargets(compileTargets)
if processResults(compileResults, "Compiling") > 0:
exit(-1)
exit(0)
import logging
from compile_asn import *
if __name__ == '__main__':
log = logging.getLogger()
log.setLevel(logging.INFO)
parseErrors, compileErrors, parser = validateAllASN1FilesInPath("./")
parseErrorCount = 0
print ("ASN.1 Parser checks:")
print ("-----------------------------")
for filename, errors in parseErrors.items():
if len(errors) > 0:
parseErrorCount += len(errors)
print (f"{filename}: {len(errors)} errors")
for error in errors:
print (" " + str(error))
else:
print (f"{filename}: OK")
print ("-----------------------------")
print ("ASN.1 Compilation:")
print ("-----------------------------")
if len(compileErrors) > 0:
for error in compileErrors:
print (" " + str(error))
else:
print ("Compilation OK")
print ("-----------------------------")
print (f"{parseErrorCount} parse errors, {len(compileErrors)} compile errors")
exit (parseErrorCount + len(compileErrors))
import logging
import glob
import sys
import argparse
from pathlib import Path
from pprint import pprint
import os
from lxml import etree
from xml.etree.ElementTree import ParseError
from xmlschema import XMLSchema, XMLSchemaParseError
def BuildSchemaDictionary (fileList):
if len(fileList) == 0:
logging.info("No schema files provided")
return []
logging.info("Schema locations:")
schemaLocations = []
for schemaFile in fileList:
try:
xs = XMLSchema(schemaFile, validation='skip')
schemaLocations.append((xs.target_namespace, str(Path(schemaFile).resolve())))
logging.info(" [ {0} -> {1} ]".format(xs.default_namespace, schemaFile))
except ParseError as ex:
logging.warning (" [ {0} failed to parse: {1} ]".format(schemaFile, ex))
return schemaLocations
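# Usage sketch (hypothetical filenames): build the (namespace, path) pairs that
# xmlschema uses to resolve imports across a set of related schemas.
#
#   locations = BuildSchemaDictionary(['core.xsd', 'extensions.xsd'])
#   # e.g. [('urn:example:core', '/abs/path/to/core.xsd'), ...]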
def BuildSchema (coreFile, fileList = None):
schemaLocations = []
if fileList and len(fileList) > 0:
        schemaLocations = BuildSchemaDictionary(fileList)
coreSchema = XMLSchema(str(Path(coreFile)), locations=schemaLocations)
return coreSchema
def ValidateSingleFile (schemaFile):
try:
xs = XMLSchema(schemaFile, validation='skip')
except ParseError as ex:
logging.warning (" [ {0} failed to parse: {1} ]".format(schemaFile, ex))
return ex
return None
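# Usage sketch (hypothetical filename): None means the file parsed cleanly,
# anything else is the ParseError that was raised.
#
#   if ValidateSingleFile('core.xsd') is not None:
#       print('core.xsd is not well-formed')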
def ValidateXSDFiles (fileList):
if len(fileList) == 0:
logging.info("No schema files provided")
        return {}, {}
    schemaLocations = BuildSchemaDictionary(fileList)
errors = {}
schemaDictionary = {}
logging.info("Schema validation:")
for schemaFile in fileList:
try:
schema = XMLSchema(schemaFile, locations = schemaLocations)
logging.info(schemaFile + ": OK")
errors[schemaFile] = []
schemaDictionary[schema.target_namespace] = schema
except XMLSchemaParseError as ex:
if (ex.schema_url) and (ex.schema_url != ex.origin_url):
logging.info(" Error {1} comes from {0}, suppressing".format(ex.schema_url, ex.message))
errors[schemaFile] = []
else:
logging.warning(schemaFile + ": Failed validation ({0})".format(ex))
errors[schemaFile] = [ex.message]
return errors, schemaDictionary
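# Usage sketch (hypothetical filenames): validate schemas as a set so that
# imports between them resolve, then pick schemas out of the returned
# dictionary by target namespace.
#
#   errors, schemas = ValidateXSDFiles(['core.xsd', 'extensions.xsd'])
#   failed = {f: e for f, e in errors.items() if e}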
def ValidateInstanceDocuments (coreFile, supportingSchemas, instanceDocs):
if (instanceDocs is None) or len(instanceDocs) == 0:
logging.warning ("No instance documents provided")
return []
schema = BuildSchema(coreFile, supportingSchemas)
errors = []
for instanceDoc in instanceDocs:
try:
schema.validate(instanceDoc)
logging.info ("{0} passed validation".format(instanceDoc))
        except Exception as ex:
            logging.error ("{0} failed validation: {1}".format(instanceDoc, ex))
            errors.append(ex)
    return errors
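# Usage sketch (hypothetical filenames): validate instance documents against a
# core schema built alongside its supporting schemas.
#
#   ValidateInstanceDocuments('core.xsd', ['extensions.xsd'], ['example.xml'])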
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbosity", help="verbosity level", action="count", default=0)
parser.add_argument("input", help="include a directory or file", action="append", nargs="+")
parser.add_argument("-p", "--primaryNamespace", help="Primary schema namespace for instance doc validation")
args = parser.parse_args()
logging.getLogger().setLevel(logging.WARNING)
if (args.verbosity >= 1):
logging.getLogger().setLevel(logging.INFO)
if (args.verbosity >= 2):
logging.getLogger().setLevel(logging.DEBUG)
logging.debug("Very verbose selected")
logging.debug(f"Path: {args.input}")
includeFileList = []
includeInstanceDocList = []
for path in args.input[0]:
p = Path(path)
if not p.exists():
logging.error(f"Include path {path} not found")
exit(1)
if p.is_dir():
logging.debug(f"Expanding directory")
for g in glob.glob(os.path.join(str(p), "*.xsd")):
logging.info(f">Including {g}")
includeFileList.append(g)
for g in glob.glob(os.path.join(str(p), "*.xml")):
logging.info(f">Including instance doc {g}")
includeInstanceDocList.append(g)
else:
logging.info(f">Including {p.absolute()}")
if str(p.absolute()).endswith('.xml'):
includeInstanceDocList.append(str(p.absolute()))
elif str(p.absolute()).endswith('.xsd'):
includeFileList.append(str(p.absolute()))
else:
logging.warning(f'Ignoring file {p.absolute()}')
if len(includeInstanceDocList) and (args.primaryNamespace is None):
print("Cannot validate instance documents without specifying a primary namespace (use -h for usage guidelines)")
exit(-1)
syntaxErrors = 0
print ("=============================")
print ("XSD syntax checks:")
print ("-----------------------------")
for file in includeFileList:
error = ValidateSingleFile(file)
if (error):
print (f" {file} : Syntax error [{error}]")
syntaxErrors += 1
else:
print (f" {file} : OK")
print ("-----------------------------")
if (syntaxErrors > 0):
print (f"{syntaxErrors} syntax errors detected")
exit(syntaxErrors)
else:
print ("0 syntax errors detected")
results, schemaDict = ValidateXSDFiles(includeFileList)
print ("=============================")
print ("XSD build checks:")
print ("-----------------------------")
errorCount = 0
for fileName, errors in results.items():
if len(errors) > 0:
errorCount += len(errors)
print (f" {fileName}: {len(errors)} errors")
for error in errors:
if isinstance(error, XMLSchemaParseError):
print (error.msg)
else:
print (f" {str(error.strip())}")
else:
print (f" {fileName}: OK")
print ("-----------------------------")
print (f"{errorCount} build errors detected")
if (errorCount > 0):
exit(errorCount)
print ("=============================")
print ("Instance document checks")
print ("-----------------------------")
errorCount = 0
    # Use .get() so a run with no instance documents (and hence no primary
    # namespace) doesn't raise a KeyError here.
    primarySchema = schemaDict.get(args.primaryNamespace)
for instanceDoc in includeInstanceDocList:
try:
results = primarySchema.validate(instanceDoc)
print (f" {instanceDoc} : OK")
except Exception as ex:
errorCount += 1
print (f" {instanceDoc} : {str(ex)}")
print ("-----------------------------")
print (f"{errorCount} instance doc errors detected")
print ("=============================")
exit(errorCount)
import logging
import copy
from asn1tools import parse_files, compile_dict, ParseError, CompileError
from glob import glob
from pathlib import Path
from pprint import pprint
def parseASN1File (asnFile):
try:
parse_files(asnFile)
except ParseError as ex:
return [ex]
return []
def parseASN1Files (fileList):
if len(fileList) == 0:
logging.warning ("No files specified")
return {}
errors = {}
logging.info("Parsing files...")
for f in fileList:
ex = parseASN1File(f)
if ex:
logging.info (f" {f}: Failed - {ex!r}")
else:
logging.info (f" {f}: OK")
errors[f] = ex
return errors
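# Usage sketch: parse a tree of ASN.1 files and keep only the failures (the
# returned dict maps each filename to a possibly-empty list of errors).
#
#   errors = parseASN1Files(glob('**/*.asn', recursive=True))
#   failures = {f: e for f, e in errors.items() if e}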
def fixDottedReference (moduleDict, importingModule, importingType, importingMember, importedModule, importedType):
    """
    Works around dotted cross-module references (e.g. "UmtsHI2Operations.Location"),
    which the compiler does not resolve: clones the imported type into a uniquely
    named copy and repoints the referencing member at the clone.
    """
    newName = importedModule + "_" + importedType
    moduleDict[importedModule]['types'][newName] = copy.deepcopy(moduleDict[importedModule]['types'][importedType])
    moduleDict[importingModule]['imports'][importedModule].append(newName)
    member = [x for x in moduleDict[importingModule]['types'][importingType]['members'] if x is not None and x['name'] == importingMember][0]
    member['type'] = newName
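# Illustration, based on the first call below: LI-PS-PDU's Location type has a
# member "umtsHI2Location UmtsHI2Operations.Location". After the fix, the parse
# tree looks as though the module had read
#
#   umtsHI2Location UmtsHI2Operations_Location
#
# with UmtsHI2Operations_Location cloned from Location in UmtsHI2Operations.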
def compileASN1Files (fileList):
logging.info("Compiling files...")
errors = []
imports = {}
#p = re.compile(r"]\s+\S+\.\S+")
#for f in fileList:
# with open(f) as fh:
# s = fh.read()
# for match in p.findall(s):
# print (f"In {f}: {match}")
#exit()
try:
dr = parse_files(fileList)
for modulename, module in dr.items():
# Weird fix because the compiler doesn't like RELATIVE-OID as a type
# Not sure if the on-the-wire encoding would be affected or not
# but for most checking purposes this doesn't matter
module['types']["RELATIVE-OID"] = {'type' : 'OBJECT IDENTIFIER'}
for k,v in module['imports'].items():
if not k in imports:
imports[k] = []
imports[k].append({
"in" : modulename,
"types" : v
})
for k,v in imports.items():
if not k in dr.keys():
importers = [i['in'] for i in v]
errors.append(f"Unsatisfied import [{k}] for {importers}")
fixDottedReference(dr, 'LI-PS-PDU', 'Location', 'umtsHI2Location', 'UmtsHI2Operations', 'Location')
fixDottedReference(dr, 'LI-PS-PDU', 'Location', 'epsLocation', 'EpsHI2Operations', 'Location')
fixDottedReference(dr, 'LI-PS-PDU', 'Location', 'eTSI671HI2Location', 'HI2Operations', 'Location')
fixDottedReference(dr, 'LI-PS-PDU', 'UMTSIRI', 'iRI-Parameters', 'UmtsHI2Operations', 'IRI-Parameters')
fixDottedReference(dr, 'LI-PS-PDU', 'UMTSIRI', 'iRI-CS-Parameters', 'UmtsCS-HI2Operations', 'IRI-Parameters')
fixDottedReference(dr, 'LI-PS-PDU', 'ETSI671IRI', 'iRI-Parameters', 'HI2Operations', 'IRI-Parameters')
fixDottedReference(dr, 'LI-PS-PDU', 'EPSIRI', 'iRI-EPS-Parameters', 'EpsHI2Operations', 'IRI-Parameters')
fixDottedReference(dr, 'LI-PS-PDU', 'ConfIRI', 'iRI-Conf-Parameters', 'CONFHI2Operations', 'IRI-Parameters')
fixDottedReference(dr, 'LI-PS-PDU', 'ProSeIRI', 'iRI-ProSe-Parameters', 'ProSeHI2Operations', 'IRI-Parameters')
fixDottedReference(dr, 'LI-PS-PDU', 'GcseIRI', 'iRI-Gcse-Parameters', 'GCSEHI2Operations', 'IRI-Parameters')
fixDottedReference(dr, 'LI-PS-PDU', 'CCContents', 'tTRAFFIC-1', 'TS101909201', 'TTRAFFIC')
fixDottedReference(dr, 'LI-PS-PDU', 'CCContents', 'cTTRAFFIC-1', 'TS101909201', 'CTTRAFFIC')
fixDottedReference(dr, 'LI-PS-PDU', 'CCContents', 'tTRAFFIC-2', 'TS101909202', 'TTRAFFIC')
fixDottedReference(dr, 'LI-PS-PDU', 'CCContents', 'cTTRAFFIC-2', 'TS101909202', 'CTTRAFFIC')
#fixDottedReference(dr, 'LI-PS-PDU', 'CCContents', 'cCIPPacketHeader', 'CDMA2000CCModule', 'CCIPPacketHeader')
fixDottedReference(dr, 'LI-PS-PDU', 'CCContents', 'uMTSCC-CC-PDU', 'Umts-HI3-PS', 'CC-PDU')
fixDottedReference(dr, 'LI-PS-PDU', 'CCContents', 'ePSCC-CC-PDU', 'Eps-HI3-PS', 'CC-PDU')
fixDottedReference(dr, 'LI-PS-PDU', 'CCContents', 'confCC-CC-PDU', 'CONF-HI3-IMS', 'Conf-CC-PDU')
fixDottedReference(dr, 'LI-PS-PDU', 'CCContents', 'voipCC-CC-PDU', 'VoIP-HI3-IMS', 'Voip-CC-PDU')
fixDottedReference(dr, 'LI-PS-PDU', 'CCContents', 'gcseCC-CC-PDU', 'GCSE-HI3', 'Gcse-CC-PDU')
fixDottedReference(dr, 'LI-PS-PDU', 'CCContents', 'cSvoice-CC-PDU', 'CSvoice-HI3-IP', 'CSvoice-CC-PDU')
fixDottedReference(dr, 'LI-PS-PDU', 'IRIContents', 'tARGETACTIVITYMONITOR-1', 'TS101909201', 'TARGETACTIVITYMONITOR-1')
fixDottedReference(dr, 'LI-PS-PDU', 'IRIContents', 'tARGETACTIVITYMONITOR-2', 'TS101909202', 'TARGETACTIVITYMONITOR')
#fixDottedReference(dr, 'LI-PS-PDU', 'IRIContents', 'lAESProtocol', 'Laesp-j-std-025-b', 'LAESProtocol')
#fixDottedReference(dr, 'LI-PS-PDU', 'IRIContents', 'cDMA2000LAESMessage', 'CDMA2000CIIModule', 'CDMA2000LAESMessage')
fixDottedReference(dr, 'LI-PS-PDU', 'HI4Payload', 'threeGPP-LI-Notification', 'TS33128Payloads', 'LINotificationPayload')
fixDottedReference(dr, 'ILHIPDU', 'TimestampMapping', 'timeStampQualifier', 'LI-PS-PDU', 'TimeStampQualifier')
fixDottedReference(dr, 'ILHIPDU', 'ILHITimestamp', 'qualifiedDateTime', 'Common-Parameters', 'QualifiedDateTime')
fixDottedReference(dr, 'ILHIPDU', 'ILHITimestamp', 'qualifiedMicrosecondDateTime', 'Common-Parameters', 'QualifiedMicrosecondDateTime')
fixDottedReference(dr, 'ILHIPDU', 'OriginalTimestamp', 'microSecondTimeStamp', 'LI-PS-PDU', 'MicroSecondTimeStamp')
fixDottedReference(dr, 'ILHIPDU', 'LocationMapping', 'originalLocation', 'LI-PS-PDU', 'Location')
fixDottedReference(dr, 'ILHIPDU', 'GeocodedLocationData', 'wGS84CoordinateDecimal', 'Common-Parameters', 'WGS84CoordinateDecimal')
fixDottedReference(dr, 'ILHIPDU', 'GeocodedLocationData', 'wGS84CoordinateAngular', 'Common-Parameters', 'WGS84CoordinateAngular')
c = compile_dict(dr)
except CompileError as ex:
logging.info (f"Compiler error: {ex}")
errors.append(ex)
return errors, None
except ParseError as ex:
logging.info (f"Parse error: {ex}")
errors.append(ex)
return errors, None