Commit f018f3c1 authored by Bernhard Heinloth's avatar Bernhard Heinloth
Browse files

validation according to german rules

parent 1ce5f8e2
......@@ -2,3 +2,6 @@ __pycache__/
*.pem
*.crt
*.json
*.csv
*.log
*.err
......@@ -37,7 +37,7 @@ Rekursiv (mit Submodule) klonen, Abhängigkeiten installieren und ausführen, z.
git clone git@gitlab.cs.fau.de:heinloth/covpass-check.git --recursive
cd covpass-check
sudo apt install libzbar0
sudo apt install zenity libzbar0
pip3 install -r requirements.txt
python3 webcam.py
......@@ -49,6 +49,13 @@ Für akustische Teilnehmerbeschränkung bei einer geringen Webcamauflösung beis
python3 webcam.py -a students.txt -s -r 640x480 -l log.txt
Für den Übungsbetrieb einfach die Teilnehmerliste(n) der Veranstaltung(en) im
oben beschriebenen`.csv` Format in den Ordner legen und schlicht
./start.sh
ausführen (bei Bedarf die Parameter in der Variable `PARAMS` anpassen).
Weiterführende Informationen
----------------------------
......
#!/bin/bash
PARAMS="-s -f"
#echo "Downloading root certs"
export CH_TOKEN='0795dc8b-d8d0-4313-abf2-510b12d50939'
export FR_TOKEN='eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJqUG0zZ1BzUlZaMWRRUmhHOG1HMGhFN3Jlb2ZXTTNINzJCV1RtajdJcFd3In0.eyJleHAiOjE2ODUxODU5MDYsImlhdCI6MTYyMjExMzkwNiwianRpIjoiOTdjODgyM2EtNjlhZS00NzA4LWE4N2UtNzYxM2NhNGU3ODU5IiwiaXNzIjoiaHR0cHM6Ly9hdXRoLm1lc3NlcnZpY2VzLmluZ3JvdXBlLmNvbS9hdXRoL3JlYWxtcy9QSU5HIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImVhMWY1NWVlLTUxMGMtNGMxNi05MWQ4LTE1MjI4OGJhZDViYSIsInR5cCI6IkJlYXJlciIsImF6cCI6InRhY3YtY2xpZW50LWxpdGUiLCJzZXNzaW9uX3N0YXRlIjoiNjk5ODExY2YtODFlZS00ZmNkLTk4NDctY2FkMGJmYjZhOTdiIiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJST0xFX1ZFUklGWV9DT05UUk9MXzJERE9DX0wxIiwiUk9MRV9WRVJJRllfQ09OVFJPTF8yRERPQ19CMiIsIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6ImVtYWlsIHByb2ZpbGUgb2ZmbGluZV9hY2Nlc3MiLCJzaXJlbiI6IjAwMDAwMDAwMCIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwicHJlZmVycmVkX3VzZXJuYW1lIjoidGFjdi1tb2JpbGUtbGl0ZSIsImdpdmVuX25hbWUiOiIiLCJmYW1pbHlfbmFtZSI6IiJ9.mpfrIP8ayElTm7yoVayCF11oYrDQEnauk9hbbVBw8idAiE6OsMlWNloZtUbbnwrJZsMX3_NoEyzkiB3HNbxyhPWp7eRZ7qhn8XjZVgg6sVytXqcVZo9R5-Q9JftMKv7JelsY3PsaOo5x-pYOX30ancPRjd78TeenorGopsVN_LLRLQpenfgjjgwx-srZnLa-TFYTcbSvXozfJT7uk5CHyz_MIFLM7pl9Zdt66yTGBkLIyOLFsV5vPeH5SYvgRNDYdxZy4XMo6Gyfz0lAI9Xfcjs20NBoOQMV4JREH4Z-IcJJXeszC9QeA1-tRmxujqIRuyvBal7msLy7Zimd2q7i3Q'
if [[ ! -f 'DE.pem' ]] ; then
echo "Downloading root certs"
python3 verify_ehc.py --download-all-root-certs
fi
......@@ -13,7 +12,6 @@ CERTS="FR,GB"
for ROOT_CERT in *.pem ; do
if [[ ${#ROOT_CERT} -eq 6 ]] ; then
COUNTRY=${ROOT_CERT%.pem}
echo "Root cert from $COUNTRY"
#openssl x509 -text -in $ROOT_CERT
CERTS+=",$COUNTRY"
export ${COUNTRY}_ROOT_CERT=$ROOT_CERT
......@@ -30,4 +28,12 @@ elif [[ $(date --date="$TRUSTLIST_MAX_AGE ago" +%s) -gt $(date -r "$TRUSTLIST" +
bash -c "python3 verify_ehc.py --certs-from $CERTS --save-certs tmp-$TRUSTLIST && mv -f tmp-$TRUSTLIST $TRUSTLIST && echo done" &
fi
python3 webcam.py -s -t $TRUSTLIST
LOGFILE="$(date +%Y%m%d%H%M)"
if compgen -G "*.csv" > /dev/null ; then
ALLOW=$(ls *.csv | zenity --list --title="Teilnehmerliste" --column="Datei" )
if [[ -n "$ALLOW" ]] ; then
PARAMS+=" -a $ALLOW"
LOGFILE="${ALLOW%.csv}-${LOGFILE}"
fi
fi
python3 webcam.py ${PARAMS} -t $TRUSTLIST -l ${LOGFILE}.log -v >${LOGFILE}.err 2>&1
......@@ -13,15 +13,20 @@ import numpy as np
import cv2
import unicodedata
import argparse
import traceback
from playsound import playsound
parser = argparse.ArgumentParser(description='CovPass Check via Webcam (PoC)')
parser.add_argument('-v', '--verbose', action='store_true', help='verbose output')
parser.add_argument('--skip-verification', action='store_true', help='do not verify certificate (for debugging)')
parser.add_argument('--skip-validation', action='store_true', help='do not validate payload (for debugging)')
parser.add_argument('--skip-uniquecheck', action='store_true', help='do not check if certificate owner is unique (for debugging)')
parser.add_argument('-d', '--device', type=int,help='webcam device number (/dev/videoX)', default=0)
parser.add_argument('-r', '--resolution', help='webcam resolution (WxH)', default='1280x720')
parser.add_argument('-c', '--count', type=int, help='initial count value', default=0)
parser.add_argument('-s', '--sound', action='store_true', help='accoustic notification after each new (!) scan')
parser.add_argument('-f', '--freeze', type=float, help='seconds to freeze image after each new (!) scan', default=0.7)
parser.add_argument('-f', '--fullscreen', action='store_true', help='start in full screen')
parser.add_argument('-F', '--freeze', type=float, help='seconds to freeze image after each new (!) scan', default=0.7)
parser.add_argument('-t', '--trustlist', type=argparse.FileType('rb'), help='use given trustlist (json) instead of downloading new one')
parser.add_argument('-a', '--allowed', type=argparse.FileType('r'), help='list of allowed names')
parser.add_argument('-l', '--log', type=argparse.FileType('a'), help='access log file', default=sys.stderr)
......@@ -35,11 +40,12 @@ cap.set(cv2.CAP_PROP_FRAME_WIDTH, int(w))
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, int(h))
font_simplex = cv2.FONT_HERSHEY_SIMPLEX
font_duplex = cv2.FONT_HERSHEY_DUPLEX
cv2.namedWindow(args.window, cv2.WINDOW_NORMAL)
#cv2.namedWindow(args.window, cv2.WND_PROP_FULLSCREEN)
#cv2.setWindowProperty(args.window, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
if args.fullscreen:
cv2.namedWindow(args.window, cv2.WND_PROP_FULLSCREEN)
cv2.setWindowProperty(args.window, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
else:
cv2.namedWindow(args.window, cv2.WINDOW_NORMAL)
qrcodes={}
process=[]
......@@ -66,7 +72,7 @@ def normalize(s):
s = s.replace(f, t)
return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn').strip()
def validate(ehc_msg, ehc_payload):
def verify(ehc_msg, ehc_payload):
try:
issued_at = EPOCH + timedelta(seconds=ehc_payload[6])
......@@ -81,6 +87,101 @@ def validate(ehc_msg, ehc_payload):
print('Failed', sys.exc_info())
return False
# https://github.com/eu-digital-green-certificates/dgc-business-rules-testdata/tree/main/DE
def validate(data):
# vaccinated
if 'v' in data:
# https://github.com/ehn-dcc-development/ehn-dcc-valuesets/blob/release/2.1.0/vaccine-medicinal-product.json
allowed_products = [ 'EU/1/20/1507', 'EU/1/20/1525', 'EU/1/20/1528', 'EU/1/20/1529' ]
dose_num = int(data['v'][0]['dn'])
series_dose = int(data['v'][0]['sd'])
date = datetime.strptime(data['v'][0]['dt'], '%Y-%m-%d')
medicinal_product = data['v'][0]['mp']
# VR-DE-0002
if not medicinal_product in allowed_products:
print(f'Only the following vaccines are accepted: AstraZeneca, Biontech, Janssen, Moderna.')
return False
# VR-DE-0001
elif dose_num < series_dose:
print(f'The vaccination schedule must be complete ({dose_num} / {series_dose}).')
return False
# VR-DE-0003
elif not (date + timedelta(days=15) <= datetime.now() or dose_num > 2 or (dose_num > 1 and medicinal_product == "EU/1/20/1525") or (series_dose == 1 and dose_num == 1 and medicinal_product in [ 'EU/1/20/1507', 'EU/1/20/1528', 'EU/1/20/1529' ])):
print(f'At least 14 days must have elapsed since completing the primary course of immunization ({date}). A booster shot or vaccination of someone who recovered from COVID-19 is valid immediately as long as it is clearly identified as such.')
return False
# VR-DE-0004
elif date + timedelta(days=365) < datetime.now():
print(f'Vaccine is not anymore effective (365 days after {date}!)')
return False
else:
return True
# tested
elif 't' in data:
test_type = data['t'][0]['tt']
test_result = data['t'][0]['tr']
sample_collection = datetime.strptime(data['t'][0]['sc'], '%Y-%m-%dT%H:%M:%SZ')
if sample_collection > datetime.now():
print(f'Invalid sample collection time {sample_collection} (in the future!)')
return False
# TR-DE-0004
if test_result == "260373001":
print(f'The test result must be negative.')
return False
elif test_result != "260415000":
print(f'The test result must be negative ({test_result} is invalid).')
return False
# Rapid antigen test
if test_type == 'LP217198-3':
# TR-DE-0003 / 24h
if sample_collection + timedelta(hours=24) < datetime.now():
print(f'The sample for an antigen test (e.g., rapid test) must have been taken no longer than 24 hours ago ({sample_collection}).')
return False
else:
return True
# Molecular test (nucleic acid amplification test)
elif test_type == 'LP6464-4':
# TR-DE-0003 / 48h
if sample_collection + timedelta(hours=48) < datetime.now():
print(f'The sample for an NAA test (e.g., PCR) must have been taken no longer than 48 hours ago ({sample_collection}).')
return False
else:
return True
# TR-DE-0001
else:
print('This must be an antigen test (e.g., rapid test) or NAA test (e.g., PCR) instead of {test_type}', test_type)
return False
# recovered
elif 'r' in data:
first_positive = datetime.strptime(data['r'][0]['fr'], '%Y-%m-%d')
if first_positive > datetime.now():
print(f'Invalid positive result date {first_positive} (in the future!)')
return False
# RR-DE-0001
elif first_positive + timedelta(days=28) > datetime.now():
print(f'The positive NAA test result ({first_positive}) must be older than 28 days.')
return False
# RR-DE-0001
elif first_positive + timedelta(days=180) < datetime.now():
print(f'The positive NAA test result ({first_positive}) must be no older than 6 months.')
return False
else:
return True
# unknown
else:
print("Unknown payload data type", [i for i in data.keys() if i not in ['dob', 'nam', 'ver']])
return False
class Color:
RED = (41,20,141)
GREEN = (119,155,0)
......@@ -158,29 +259,41 @@ while cap.isOpened():
# Check unprocessed qr codes
for obj, ehc_code in process:
print("New qr code", ehc_code)
qrcodes[ehc_code] = {}
if args.verbose:
print("New qr code", ehc_code)
qrcodes[ehc_code] = { 'valid': False }
try:
ehc_msg = decode_ehc(ehc_code)
ehc_payload = cbor2.loads(ehc_msg.payload)
print("Payload ", ehc_payload[-260][1])
gn = normalize(ehc_payload[-260][1]['nam']['gn'])
fn = normalize(ehc_payload[-260][1]['nam']['fn'])
payload_data = ehc_payload[-260][1]
if args.verbose:
print("Payload ", payload_data)
gn = normalize(payload_data['nam']['gn'])
fn = normalize(payload_data['nam']['fn'])
qrcodes[ehc_code]['name'] = gn + ' ' + fn
uid = ehc_payload[-260][1]['nam']['fnt'] + '<<<' + ehc_payload[-260][1]['nam']['gnt'] + '<<<<' + ehc_payload[-260][1]['dob']
qrcodes[ehc_code]['uid'] = uid
if uid in uniqusers:
print(uid, "not unique!")
qrcodes[ehc_code]['unique'] = False
else:
uniqusers.append(uid)
# check if unique
if args.skip_uniquecheck:
qrcodes[ehc_code]['unique'] = True
qrcodes[ehc_code]['valid'] = validate(ehc_msg, ehc_payload)
if qrcodes[ehc_code]['valid']:
else:
uid = payload_data['nam']['fnt'] + '<<<' + payload_data['nam']['gnt'] + '<<<<' + payload_data['dob']
qrcodes[ehc_code]['uid'] = uid
if uid in uniqusers:
print(uid, "not unique!")
qrcodes[ehc_code]['unique'] = False
else:
uniqusers.append(uid)
qrcodes[ehc_code]['unique'] = True
# verify certificate and validate payload
if (args.skip_verification or verify(ehc_msg, ehc_payload)) and (args.skip_validation or validate(payload_data)):
qrcodes[ehc_code]['valid'] = True
# check if allowed
if len(allowed) > 0:
qrcodes[ehc_code]['allowed'] = False
gnt = ehc_payload[-260][1]['nam']['gnt'].replace('<',' ')
fnt = ehc_payload[-260][1]['nam']['fnt'].replace('<',' ')
gnt = payload_data['nam']['gnt'].replace('<',' ')
fnt = payload_data['nam']['fnt'].replace('<',' ')
for a in allowed:
if (fn in a[0] and gn in a[1]) or (fnt in a[0].upper() and gnt in a[1].upper()):
qrcodes[ehc_code]['allowed'] = True
......@@ -188,25 +301,27 @@ while cap.isOpened():
if len(allowed) == 0 or qrcodes[ehc_code]['allowed']:
validusers = validusers + 1
note = ''
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), qrcodes[ehc_code]['name'],file=args.log)
else:
note = '(not allowed)'
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), qrcodes[ehc_code]['name'], note, file=args.log)
print(qrcodes[ehc_code]['name'], 'is not in list!')
except:
qrcodes[ehc_code]['valid'] = False
if not 'name' in qrcodes[ehc_code]:
qrcodes[ehc_code]['name'] = '(Invalid EHC QR)'
print('Failed', sys.exc_info())
print(traceback.format_exc())
# Graphical highlight of code
highlight_ehc(frame, obj, qrcodes[ehc_code])
cv2.imshow(args.window, frame)
# Play sound
if args.sound:
playsound('ok.mp3' if qrcodes[ehc_code]['valid'] and (len(allowed) == 0 or qrcodes[ehc_code]['allowed']) else 'error.mp3', False)
# Freeze output (for a short time)
wait(int(args.freeze * 1000))
process.clear()
wait(1)
wait()
# When everything done, release the capture
cap.release()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment