From 358ec62af629b2df451f125b606e6ce921248796 Mon Sep 17 00:00:00 2001 From: Jan Philipp Timme Date: Thu, 19 Dec 2019 11:34:18 +0100 Subject: [PATCH] Use builtin python tools to do address/header parsing --- main.py | 78 +++++++++++++++++++++++++++------------------------------ 1 file changed, 37 insertions(+), 41 deletions(-) diff --git a/main.py b/main.py index 92dda03..df96d9b 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,10 @@ import Milter import re +from email.header import decode_header +from email.utils import getaddresses + + # Basic logger that also logs to stdout # TODO: Improve this a lot. logger = logging.getLogger(__name__) @@ -18,36 +22,27 @@ handler.setFormatter(formatter) logger.addHandler(handler) -split_from_regex = re.compile('(?P.*)<(?P.*)>$') address_domain_regex = re.compile('.*@(?P[\.\w-]+)') +def get_decoded_header(value): + decoded_header_items = decode_header(value) + decoded_header_value = '' + for item in decoded_header_items: + decoded_item = item[0].decode(item[1]) if item[1] is not None else item[0] + if isinstance(decoded_item, bytes): + decoded_item = decoded_item.decode('ascii') + decoded_header_value += decoded_item + return getaddresses([decoded_header_value])[0] + + def normalizeRawFromHeader(value): return value.replace('\n', '').replace('\r', '').strip() -def parseFromHeader(value): - """Split 'From:'-header into label and address values.""" - match = split_from_regex.match(value) - if match is None: - return None - result = { - 'label': match.group('from_label').strip(), - 'address': match.group('from_address').strip() - } - result['label_domain'] = getDomainFromLabel(result['label']) - result['address_domain'] = getDomainFromAddress(result['address']) - return result - - -def getDomainFromLabel(label): +def getDomainFromValue(value): """ Check whether given 'From:' header label contains something that looks like an email address.""" - match = address_domain_regex.match(label) - return match.group('domain').strip() if match is not None else None - - -def getDomainFromAddress(address): - match = address_domain_regex.match(address) + match = address_domain_regex.match(value) return match.group('domain').strip() if match is not None else None @@ -67,27 +62,28 @@ class SuspiciousFrom(Milter.Base): logger.debug(f"({self.id}) Got \"From:\" header raw value: '{value}'") value = normalizeRawFromHeader(value) if value == '': - logger.info(f"Got empty from header value! WTF! Skipping.") + logger.warn(f"Got empty from header value! WTF! Skipping.") return Milter.CONTINUE - data = parseFromHeader(value) - if data is None: - logger.info(f"Failed to parse given from header value! Skipping.") - return Milter.CONTINUE - logger.info(f"({self.id}) Label: '{data['label']}', Address: '{data['address']}'") - if data['label_domain'] is not None: - logger.debug(f"({self.id}) Label '{data['label']}' contains an address with domain '{data['label_domain']}'.") - if data['label_domain'].lower() == data['address_domain'].lower(): - logger.info(f"({self.id}) Label domain '{data['label_domain']}' matches address domain '{data['address_domain']}'. Good!") - self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - Label domain matches address domain'}) - else: - logger.info(f"({self.id}) Label domain '{data['label_domain']}' did NOT match address domain '{data['address_domain']}'. BAD!") - self.new_headers.append({'name': 'X-From-Checked', 'value': 'FAIL - Label domain does NOT match address domain'}) + data = get_decoded_header(value) + logger.info(f"({self.id}) Label: '{data[0]}', Address: '{data[1]}'") + if data[0] == '': + logger.info(f"({self.id}) No label in from header, OK!") + self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - No label specified'}) else: - # Supposedly no additional address in the label, accept it for now - # TODO: Also decode utf-8 weirdness and check in there - logger.info(f"({self.id}) Label '{data['label']}' probably did not contain an address. Everything is fine.") - self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - No address found in label'}) - self.final_result = Milter.ACCEPT + label_domain = getDomainFromValue(data[0]) + address_domain = getDomainFromValue(data[1]) + logger.info(f"({self.id})Extracted label_domain '{label_domain}' and address_domain '{address_domain}'") + if label_domain is not None: + logger.debug(f"({self.id}) Label '{data[0]}' contains an address with domain '{label_domain}'.") + if label_domain.lower() == address_domain.lower(): + logger.info(f"({self.id}) Label domain '{label_domain}' matches address domain '{address_domain}'. Good!") + self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - Label domain matches address domain'}) + else: + logger.info(f"({self.id}) Label domain '{label_domain}' did NOT match address domain '{address_domain}'. BAD!") + self.new_headers.append({'name': 'X-From-Checked', 'value': 'FAIL - Label domain does NOT match address domain'}) + else: + logger.info(f"({self.id}) No domain found in label. Good!") + self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - No domain found in label.'}) # Use continue here, so we can reach eom hook. # TODO: Log and react if multiple From-headers are found? return Milter.CONTINUE