Skip to content
GitLab
Explore
Sign in
LI - Lawful Interception
TC LI schemas definitions
Compare revisions
ff240e65ac364d3054f6e5761071ff30e85101b3 to 94a7ee52e42b8a21458fe31913761949f950c107
Hide whitespace changes
Inline
Side-by-side
testing/xsd_compile_targets.json
View file @
94a7ee52
...
@@ -51,5 +51,19 @@
...
@@ -51,5 +51,19 @@
"exampleFiles"
:
[
"exampleFiles"
:
[
"103120/examples/xml"
"103120/examples/xml"
]
]
}
},
{
"coreSchema"
:
"103707/TS_103_707.xsd"
,
"supportingSchemas"
:
[
"103280/TS_103_280.xsd"
,
"103120/schema/xsd/ts_103120_Common.xsd"
,
"103120/schema/xsd/ts_103120_Core.xsd"
,
"103120/schema/xsd/ts_103120_Task.xsd"
,
"testing/deps/xmldsig/xmldsig-core-schema.xsd"
,
"103707/examples/FooServiceSchema.xsd"
],
"exampleFiles"
:
[
"103707/examples"
]
}
]
]
utils/json_validator.py
0 → 100644
View file @
94a7ee52
import
sys
from
jsonschema
import
validate
,
RefResolver
,
Draft202012Validator
from
jsonschema.exceptions
import
ValidationError
import
json
from
pathlib
import
Path
import
logging
import
argparse
from
itertools
import
chain
class
JsonValidator
:
def
__init__
(
self
,
core_schema
:
str
,
other_schemas
:
dict
):
self
.
_core_schema
=
json
.
load
(
Path
(
core_schema
).
open
())
self
.
_schema_dict
=
{
self
.
_core_schema
[
'
$id
'
]
:
self
.
_core_schema
}
self
.
_supporting_paths
=
[]
for
thing
in
other_schemas
:
path
=
Path
(
thing
)
if
path
.
is_dir
():
logging
.
debug
(
f
"
Searching
{
path
}
for schema files
"
)
self
.
_supporting_paths
.
extend
(
path
.
rglob
(
"
*.schema.json
"
))
else
:
logging
.
debug
(
f
"
Appending
{
path
}
as schema file
"
)
self
.
_supporting_paths
.
append
(
path
)
logging
.
info
(
f
"
Supporting schema paths:
{
self
.
_supporting_paths
}
"
)
self
.
_supporting_schemas
=
[
json
.
load
(
p
.
open
())
for
p
in
self
.
_supporting_paths
]
self
.
_schema_dict
=
self
.
_schema_dict
|
{
s
[
'
$id
'
]
:
s
for
s
in
self
.
_supporting_schemas
}
logging
.
info
(
f
"
Loaded schema IDs:
{
[
k
for
k
in
self
.
_schema_dict
.
keys
()]
}
"
)
self
.
_resolver
=
RefResolver
(
None
,
referrer
=
None
,
store
=
self
.
_schema_dict
)
logging
.
info
(
"
Created RefResolver
"
)
self
.
_validator
=
Draft202012Validator
(
self
.
_core_schema
,
resolver
=
self
.
_resolver
)
logging
.
info
(
"
Created validator
"
)
def
validate
(
self
,
instance_doc
:
str
):
errors
=
list
(
self
.
_validator
.
iter_errors
(
instance_doc
))
return
errors
class
TS103120Validator
(
JsonValidator
):
def
__init__
(
self
,
path_to_repo
):
repo_path
=
Path
(
path_to_repo
)
schema_dirs
=
[
str
(
repo_path
/
"
103120/schema/json
"
),
str
(
"
103280/
"
)]
core_schema
=
str
(
repo_path
/
"
103120/schema/json/ts_103120_Core.schema.json
"
)
JsonValidator
.
__init__
(
self
,
core_schema
,
schema_dirs
)
request_fragment_schema
=
{
"
$ref
"
:
"
ts_103120_Core_2019_10#/$defs/RequestPayload
"
}
self
.
_request_fragment_validator
=
Draft202012Validator
(
request_fragment_schema
,
resolver
=
self
.
_resolver
)
response_fragment_schema
=
{
"
$ref
"
:
"
ts_103120_Core_2019_10#/$defs/ResponsePayload
"
}
self
.
_response_fragment_validator
=
Draft202012Validator
(
response_fragment_schema
,
resolver
=
self
.
_resolver
)
def
expand_request_response_exception
(
self
,
ex
):
if
list
(
ex
.
schema_path
)
==
[
'
properties
'
,
'
Payload
'
,
'
oneOf
'
]:
logging
.
info
(
"
Error detected validating payload oneOf - attempting explicit validation...
"
)
if
'
RequestPayload
'
in
instance_doc
[
'
Payload
'
].
keys
():
ret_list
=
list
(
chain
(
*
[
self
.
expand_action_exception
(
x
)
for
x
in
self
.
_request_fragment_validator
.
iter_errors
(
instance_doc
[
'
Payload
'
][
'
RequestPayload
'
])]))
for
r
in
ret_list
:
r
.
path
=
ex
.
path
+
r
.
path
return
ret_list
elif
'
ResponsePayload
'
in
instance_doc
[
'
Payload
'
].
keys
():
ret_list
=
list
(
chain
(
*
[
self
.
expand_action_exception
(
x
)
for
x
in
self
.
_request_fragment_validator
.
iter_errors
(
instance_doc
[
'
Payload
'
][
'
ResponsePayload
'
])]))
for
r
in
ret_list
:
r
.
path
=
ex
.
path
+
r
.
path
return
ret_list
else
:
logging
.
error
(
"
No RequestPayload or ResponsePayload found - is the Payload malformed?
"
)
return
[
ex
]
else
:
return
[
ex
]
def
expand_action_exception
(
self
,
ex
):
logging
.
error
(
"
Error detected in ActionRequests/ActionResponses
"
)
error_path
=
list
(
ex
.
schema_path
)
if
error_path
!=
[
'
properties
'
,
'
ActionRequests
'
,
'
properties
'
,
'
ActionRequest
'
,
'
items
'
,
'
allOf
'
,
1
,
'
oneOf
'
]
and
error_path
!=
[
'
properties
'
,
'
ActionResponses
'
,
'
properties
'
,
'
ActionResponse
'
,
'
items
'
,
'
allOf
'
,
1
,
'
oneOf
'
]:
logging
.
error
(
"
Error not in inner Request/Response allOf/oneOf constraint
"
)
return
[
ex
]
j
=
ex
.
instance
j
.
pop
(
'
ActionIdentifier
'
)
# Remove ActionIdentifier - one remaining key will be the verb
verb
=
list
(
j
.
keys
())[
0
]
message
=
"
Request
"
if
error_path
[
1
]
==
"
ActionRequests
"
else
"
Response
"
v
=
Draft202012Validator
({
"
$ref
"
:
f
"
ts_103120_Core_2019_10#/$defs/
{
verb
}{
message
}
"
},
resolver
=
self
.
_resolver
)
ret_list
=
list
(
chain
(
*
[
self
.
expand_object_exception
(
x
)
for
x
in
v
.
iter_errors
(
j
[
verb
])]))
for
r
in
ret_list
:
r
.
path
=
ex
.
path
+
r
.
path
return
ret_list
def
expand_object_exception
(
self
,
ex
):
logging
.
error
(
"
Error detected in verb
"
)
# The final level of validation is for the actual HI1Object validation
if
list
(
ex
.
schema_path
)
!=
[
'
properties
'
,
'
HI1Object
'
,
'
oneOf
'
]:
logging
.
error
(
"
Error not inside HI1Object
"
)
return
[
ex
]
object_type
=
ex
.
instance
[
'
@xsi:type
'
].
split
(
'
}
'
)[
-
1
]
object_ref
=
{
'
AuthorisationObject
'
:
'
ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject
'
,
'
LITaskObject
'
:
'
ts_103120_Task_2020_09#/$defs/LITaskObject
'
,
'
LDTaskObject
'
:
'
ts_103120_Task_2020_09#/$defs/LDTaskObject
'
,
'
LPTaskObject
'
:
'
ts_103120_Task_2020_09#/$defs/LPTaskObject
'
,
'
DocumentObject
'
:
'
ts_103120_Document_2020_09#/$defs/DocumentObject
'
,
'
NotificationObject
'
:
'
ts_103120_Notification_2016_02#/$defs/NotificationObject
'
,
'
DeliveryObject
'
:
'
ts_103120_Delivery_2019_10#/$defs/DeliveryObject
'
,
'
TrafficPolicyObject
'
:
'
ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject
'
,
'
TrafficRuleObject
'
:
'
ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject
'
,
}[
object_type
]
v
=
Draft202012Validator
({
"
$ref
"
:
object_ref
},
resolver
=
self
.
_resolver
)
return
list
(
v
.
iter_errors
(
ex
.
instance
))
def
validate
(
self
,
instance_doc
:
str
):
errors
=
JsonValidator
.
validate
(
self
,
instance_doc
)
out_errors
=
list
(
chain
(
*
[
self
.
expand_request_response_exception
(
ex
)
for
ex
in
errors
]))
return
out_errors
if
__name__
==
"
__main__
"
:
parser
=
argparse
.
ArgumentParser
()
parser
.
add_argument
(
'
-s
'
,
'
--schemadir
'
,
action
=
"
append
"
,
help
=
"
Directory containing supporting schema files to use for validation
"
)
parser
.
add_argument
(
'
-v
'
,
'
--verbose
'
,
action
=
"
count
"
,
help
=
"
Verbose logging (can be specified multiple times)
"
)
parser
.
add_argument
(
'
-i
'
,
'
--input
'
,
type
=
argparse
.
FileType
(
'
r
'
),
default
=
sys
.
stdin
,
help
=
"
Path to input file (if absent, stdin is used)
"
)
parser
.
add_argument
(
'
--ts103120
'
,
action
=
"
store_true
"
,
help
=
"
Validate a TS 103 120 JSON document
"
)
parser
.
add_argument
(
'
--schema
'
,
default
=
None
,
help
=
"
Primary schema to validate against
"
)
parser
.
add_argument
(
'
-p
'
,
'
--printerror
'
,
action
=
"
count
"
,
help
=
"
Controls how verbose validation error printing is (can be specified multiple times)
"
)
args
=
parser
.
parse_args
()
match
args
.
verbose
:
case
v
if
v
and
v
>=
2
:
logging
.
basicConfig
(
level
=
logging
.
DEBUG
)
case
1
:
logging
.
basicConfig
(
level
=
logging
.
INFO
)
case
_
:
logging
.
basicConfig
(
level
=
logging
.
WARNING
)
logging
.
debug
(
f
"
Arguments:
{
args
}
"
)
if
(
args
.
ts103120
):
v
=
TS103120Validator
(
"
./
"
)
else
:
v
=
JsonValidator
(
args
.
schema
,
args
.
schemadir
)
logging
.
info
(
f
"
Taking instance doc input from
{
args
.
input
.
name
}
"
)
instance_doc
=
json
.
loads
(
args
.
input
.
read
())
args
.
input
.
close
()
errors
=
v
.
validate
(
instance_doc
)
for
error
in
errors
:
if
args
.
printerror
==
2
:
logging
.
error
(
error
)
elif
args
.
printerror
==
1
:
logging
.
error
(
f
"
{
list
(
error
.
path
)
}
-
{
error
.
message
}
"
)
if
len
(
errors
)
>
0
:
logging
.
error
(
f
"
{
len
(
errors
)
}
errors detected
"
)
else
:
logging
.
info
(
f
"
{
len
(
errors
)
}
errors detected
"
)
if
len
(
errors
)
>
0
:
exit
(
-
1
)
else
:
exit
(
0
)
utils/sign_json.py
0 → 100644
View file @
94a7ee52
import
argparse
import
logging
import
sys
from
jose
import
jws
from
pathlib
import
Path
import
json
def
insert_sig_block
(
j
):
j
[
'
Signature
'
]
=
{
'
protected
'
:
''
,
'
signature
'
:
''
}
return
j
if
__name__
==
"
__main__
"
:
parser
=
argparse
.
ArgumentParser
()
parser
.
add_argument
(
'
-v
'
,
'
--verbose
'
,
action
=
'
count
'
,
help
=
'
Verbose logging (can be specified multiple times)
'
)
parser
.
add_argument
(
'
--pretty
'
,
action
=
"
store_true
"
,
help
=
'
Pretty-print the JSON document before signing
'
)
parser
.
add_argument
(
'
-i
'
,
'
--input
'
,
type
=
argparse
.
FileType
(
'
r
'
),
default
=
sys
.
stdin
,
help
=
"
Path to input file (if absent, stdin is used)
"
)
args
=
parser
.
parse_args
()
match
args
.
verbose
:
case
v
if
v
and
v
>=
2
:
logging
.
basicConfig
(
level
=
logging
.
DEBUG
)
case
1
:
logging
.
basicConfig
(
level
=
logging
.
INFO
)
case
_
:
logging
.
basicConfig
(
level
=
logging
.
WARNING
)
logging
.
debug
(
f
"
Arguments:
{
args
}
"
)
json_text
=
args
.
input
.
read
()
args
.
input
.
close
()
j
=
json
.
loads
(
json_text
)
j
=
insert_sig_block
(
j
)
indent
=
None
if
args
.
pretty
:
indent
=
'
'
presigned_json_text
=
json
.
dumps
(
j
,
indent
=
indent
)
Path
(
'
presigned.json
'
).
write_text
(
presigned_json_text
)
presigned_json_bytes
=
presigned_json_text
.
encode
(
'
utf-8
'
)
signed
=
jws
.
sign
(
presigned_json_bytes
,
'
secret_key
'
,
algorithm
=
"
HS256
"
)
components
=
signed
.
split
(
'
.
'
)
j
[
'
Signature
'
][
'
protected
'
]
=
components
[
0
]
j
[
'
Signature
'
][
'
signature
'
]
=
components
[
2
]
signed_json_text
=
json
.
dumps
(
j
,
indent
=
indent
)
print
(
signed_json_text
)
utils/translate_spec.py
View file @
94a7ee52
...
@@ -10,6 +10,14 @@ from translate import *
...
@@ -10,6 +10,14 @@ from translate import *
logging
.
basicConfig
(
level
=
logging
.
INFO
)
logging
.
basicConfig
(
level
=
logging
.
INFO
)
json_signature_struct
=
{
"
properties
"
:
{
"
protected
"
:
{
"
type
"
:
"
string
"
},
"
signature
"
:
{
"
type
"
:
"
string
"
}
},
"
required
"
:
[
"
protected
"
,
"
signature
"
]
}
def
build_schema_locations
(
paths
):
def
build_schema_locations
(
paths
):
schema_locations
=
[]
schema_locations
=
[]
for
schemaFile
in
paths
:
for
schemaFile
in
paths
:
...
@@ -70,15 +78,17 @@ if __name__ == "__main__":
...
@@ -70,15 +78,17 @@ if __name__ == "__main__":
json_schemas
=
{}
json_schemas
=
{}
for
schema_tuple
in
schema_locations
:
for
schema_tuple
in
schema_locations
:
logging
.
info
(
f
"
Translating
{
schema_tuple
}
"
)
logging
.
info
(
f
"
Translating
{
schema_tuple
}
"
)
if
'
xmldsig
'
in
(
schema_tuple
[
1
]):
if
'
skip
'
in
ns_map
[
schema_tuple
[
0
]]:
# TODO - work out what to do here
logging
.
info
(
f
"
Skipping
{
schema_tuple
[
0
]
}
...
"
)
logging
.
info
(
"
Skipping XML Dsig...
"
)
continue
continue
js
=
translate_schema
(
schema_tuple
[
1
],
ns_map
,
schema_locations
)
js
=
translate_schema
(
schema_tuple
[
1
],
ns_map
,
schema_locations
)
# TODO - Special case, get rid of signature
# TODO - Special case, get rid of XML Dsig signature and insert JSON signature
if
ns_map
[
schema_tuple
[
0
]]
==
'
core.json
'
:
if
schema_tuple
[
0
]
==
'
http://uri.etsi.org/03120/common/2019/10/Core
'
:
js
[
'
$defs
'
][
'
HI1Message
'
][
'
properties
'
].
pop
(
'
Signature
'
)
logging
.
info
(
"
Modifying signature elements
"
)
js
[
'
$defs
'
][
'
HI1Message
'
][
'
properties
'
].
pop
(
'
xmldsig:Signature
'
)
js
[
'
$defs
'
][
'
HI1Message
'
][
'
properties
'
][
'
Signature
'
]
=
json_signature_struct
js_path
=
output_path
/
convert_xsd_to_filename
(
schema_tuple
[
1
])
js_path
=
output_path
/
convert_xsd_to_filename
(
schema_tuple
[
1
])
# TODO - Special case - abstract HI1Object
# TODO - Special case - abstract HI1Object
...
...
utils/ts103280_config.json
0 → 100644
View file @
94a7ee52
{
"schemas"
:
{
"./103280/TS_103_280.xsd"
:
{
"prefix"
:
"etsi280"
}
},
"output"
:
"./103280/"
}
utils/validate_json.py
deleted
100644 → 0
View file @
e648774f
import
sys
from
jsonschema
import
validate
,
RefResolver
,
Draft202012Validator
from
jsonschema.exceptions
import
ValidationError
import
json
from
pathlib
import
Path
import
logging
import
argparse
def
handle_uri
(
u
):
print
(
u
)
def
load_json
(
path
:
str
):
with
open
(
path
)
as
f
:
return
json
.
load
(
f
)
if
__name__
==
"
__main__
"
:
parser
=
argparse
.
ArgumentParser
()
parser
.
add_argument
(
'
-s
'
,
'
--schemadir
'
,
action
=
"
append
"
,
help
=
"
Directory containing supporting schema files to use for validation
"
)
parser
.
add_argument
(
'
-v
'
,
'
--verbose
'
,
action
=
"
count
"
,
help
=
"
Verbose logging (can be specified multiple times)
"
)
parser
.
add_argument
(
'
-i
'
,
'
--input
'
,
type
=
argparse
.
FileType
(
'
r
'
),
default
=
sys
.
stdin
,
help
=
"
Path to input file (if absent, stdin is used)
"
)
parser
.
add_argument
(
'
schema
'
,
help
=
"
Primary schema to validate against
"
)
args
=
parser
.
parse_args
()
match
args
.
verbose
:
case
v
if
v
and
v
>=
2
:
logging
.
basicConfig
(
level
=
logging
.
DEBUG
)
case
1
:
logging
.
basicConfig
(
level
=
logging
.
INFO
)
case
_
:
logging
.
basicConfig
(
level
=
logging
.
WARNING
)
logging
.
debug
(
f
"
Arguments:
{
args
}
"
)
instance_doc
=
json
.
loads
(
args
.
input
.
read
())
args
.
input
.
close
()
main_schema
=
load_json
(
args
.
schema
)
schema_dict
=
{
main_schema
[
'
$id
'
]
:
main_schema
}
if
args
.
schemadir
:
schema_paths
=
[]
for
d
in
args
.
schemadir
:
logging
.
info
(
f
"
Searching
{
d
}
"
)
logging
.
info
(
list
(
Path
(
d
).
rglob
(
"
*.schema.json
"
)))
schema_paths
+=
[
f
for
f
in
Path
(
d
).
rglob
(
"
*.schema.json
"
)]
logging
.
info
(
f
"
Schema files loaded:
{
schema_paths
}
"
)
schemas_json
=
[
json
.
load
(
p
.
open
())
for
p
in
schema_paths
]
schema_dict
=
schema_dict
|
{
s
[
'
$id
'
]
:
s
for
s
in
schemas_json
}
logging
.
info
(
f
"
Schema IDs loaded:
{
[
k
for
k
in
schema_dict
.
keys
()]
}
"
)
logging
.
debug
(
f
"
Instance doc:
{
instance_doc
}
"
)
logging
.
debug
(
f
"
Main schema:
{
main_schema
}
"
)
resolver
=
RefResolver
(
None
,
referrer
=
None
,
store
=
schema_dict
)
v
=
Draft202012Validator
(
main_schema
,
resolver
=
resolver
)
try
:
v
.
validate
(
instance_doc
)
except
ValidationError
as
ex
:
# Any failure within the Payload element results in a failure against the oneOf constraint in the Payload element
# This isn't terribly helpful in working out what is actually wrong, so in this case we attempt an explicit
# validation against the relevant oneOf alternation to try and get a more useful validation error
if
list
(
ex
.
schema_path
)
==
[
'
properties
'
,
'
Payload
'
,
'
oneOf
'
]:
logging
.
error
(
"
Error detected validating payload oneOf - attempting explicit validation...
"
)
try
:
if
'
RequestPayload
'
in
instance_doc
[
'
Payload
'
].
keys
():
request_fragment_schema
=
{
"
$ref
"
:
"
ts_103120_Core_2019_10#/$defs/RequestPayload
"
}
v
=
Draft202012Validator
(
request_fragment_schema
,
resolver
=
resolver
)
v
.
validate
(
instance_doc
[
'
Payload
'
][
'
RequestPayload
'
])
elif
'
ResponsePayload
'
in
instance_doc
[
'
Payload
'
].
keys
():
request_fragment_schema
=
{
"
$ref
"
:
"
ts_103120_Core_2019_10#/$defs/ResponsePayload
"
}
v
=
Draft202012Validator
(
request_fragment_schema
,
resolver
=
resolver
)
v
.
validate
(
instance_doc
[
'
Payload
'
][
'
ResponsePayload
'
])
else
:
logging
.
error
(
"
No RequestPayload or ResponsePayload found - is the Payload malformed?
"
)
raise
ex
except
ValidationError
as
ex2
:
# Similar to above, this is inner validation to try and get a more useful error in the event
# that something fails the verb oneOf constraint
logging
.
error
(
"
Error detected in ActionRequests/ActionResponses
"
)
error_path
=
list
(
ex2
.
schema_path
)
if
error_path
!=
[
'
properties
'
,
'
ActionRequests
'
,
'
properties
'
,
'
ActionRequest
'
,
'
items
'
,
'
allOf
'
,
1
,
'
oneOf
'
]
and
error_path
!=
[
'
properties
'
,
'
ActionResponses
'
,
'
properties
'
,
'
ActionResponse
'
,
'
items
'
,
'
allOf
'
,
1
,
'
oneOf
'
]:
logging
.
error
(
"
Error not in inner Request/Response allOf/oneOf constraint
"
)
raise
ex2
j
=
ex2
.
instance
j
.
pop
(
'
ActionIdentifier
'
)
# Remove ActionIdentifier - one remaining key will be the verb
verb
=
list
(
j
.
keys
())[
0
]
message
=
"
Request
"
if
error_path
[
1
]
==
"
ActionRequests
"
else
"
Response
"
v
=
Draft202012Validator
({
"
$ref
"
:
f
"
ts_103120_Core_2019_10#/$defs/
{
verb
}{
message
}
"
},
resolver
=
resolver
)
try
:
v
.
validate
(
j
[
verb
])
except
ValidationError
as
ex3
:
logging
.
error
(
"
Error detected in verb
"
)
# The final level of validation is for the actual HI1Object validation
if
list
(
ex3
.
schema_path
)
!=
[
'
properties
'
,
'
HI1Object
'
,
'
oneOf
'
]:
logging
.
error
(
"
Error not inside HI1Object
"
)
raise
ex3
object_type
=
ex3
.
instance
[
'
@xsi:type
'
].
split
(
'
}
'
)[
-
1
]
object_ref
=
{
'
AuthorisationObject
'
:
'
ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject
'
,
'
LITaskObject
'
:
'
ts_103120_Task_2020_09#/$defs/LITaskObject
'
,
'
LDTaskObject
'
:
'
ts_103120_Task_2020_09#/$defs/LDTaskObject
'
,
'
LPTaskObject
'
:
'
ts_103120_Task_2020_09#/$defs/LPTaskObject
'
,
'
DocumentObject
'
:
'
ts_103120_Document_2020_09#/$defs/DocumentObject
'
,
'
NotificationObject
'
:
'
ts_103120_Notification_2016_02#/$defs/NotificationObject
'
,
'
DeliveryObject
'
:
'
ts_103120_Delivery_2019_10#/$defs/DeliveryObject
'
,
'
TrafficPolicyObject
'
:
'
ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject
'
,
'
TrafficRuleObject
'
:
'
ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject
'
,
}[
object_type
]
v
=
Draft202012Validator
({
"
$ref
"
:
object_ref
},
resolver
=
resolver
)
v
.
validate
(
ex3
.
instance
)
exit
(
-
1
)
logging
.
info
(
"
Done
"
)
utils/verify_json.py
0 → 100644
View file @
94a7ee52
import
argparse
import
sys
import
logging
import
base64
from
jose
import
jws
from
pathlib
import
Path
import
json
if
__name__
==
"
__main__
"
:
parser
=
argparse
.
ArgumentParser
()
parser
.
add_argument
(
'
-v
'
,
'
--verbose
'
,
action
=
'
count
'
,
help
=
'
Verbose logging (can be specified multiple times)
'
)
parser
.
add_argument
(
'
-i
'
,
'
--input
'
,
type
=
argparse
.
FileType
(
'
r
'
),
default
=
sys
.
stdin
,
help
=
"
Path to input file (if absent, stdin is used)
"
)
args
=
parser
.
parse_args
()
match
args
.
verbose
:
case
v
if
v
and
v
>=
2
:
logging
.
basicConfig
(
level
=
logging
.
DEBUG
)
case
1
:
logging
.
basicConfig
(
level
=
logging
.
INFO
)
case
_
:
logging
.
basicConfig
(
level
=
logging
.
WARNING
)
logging
.
debug
(
f
"
Arguments:
{
args
}
"
)
signed_json_text
=
args
.
input
.
read
()
args
.
input
.
close
()
j
=
json
.
loads
(
signed_json_text
)
protected_header
=
j
[
'
Signature
'
][
'
protected
'
]
signature
=
j
[
'
Signature
'
][
'
signature
'
]
# TODO some safety checks needed here
# Remove the newline that appears from the console
if
signed_json_text
.
endswith
(
'
\n
'
):
signed_json_text
=
signed_json_text
[:
-
1
]
signed_json_text
=
signed_json_text
.
replace
(
protected_header
,
""
).
replace
(
signature
,
""
)
payload_bytes
=
signed_json_text
.
encode
(
'
utf-8
'
)
payload_token
=
base64
.
b64encode
(
payload_bytes
).
decode
(
'
ascii
'
)
# Un-pad the token, as per RFC7515 annex C
payload_token
=
payload_token
.
split
(
'
=
'
)[
0
]
payload_token
=
payload_token
.
replace
(
'
+
'
,
'
-
'
)
payload_token
=
payload_token
.
replace
(
'
/
'
,
'
_
'
)
token
=
protected_header
+
"
.
"
+
payload_token
+
"
.
"
+
signature
result
=
jws
.
verify
(
token
,
key
=
"
secret_key
"
,
algorithms
=
[
'
HS256
'
])
print
(
"
Signature verified
"
)
Prev
1
2
Next