From 436c7db707211475209f05571baf25e0976920ee Mon Sep 17 00:00:00 2001 From: Jan Philipp Timme Date: Fri, 20 Dec 2019 12:16:26 +0100 Subject: [PATCH] Big refactoring, getaddresses() is unreliable. :-( --- main.py | 62 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/main.py b/main.py index c373088..52222c7 100644 --- a/main.py +++ b/main.py @@ -23,7 +23,8 @@ handler.setFormatter(formatter) logger.addHandler(handler) # Rough regex to fetch domain values from address-like text -address_domain_regex = re.compile('.*@(?P[\.\w-]+)') +# Not matching the part in front of @ because greedy and stuff :( +address_domain_regex = re.compile('\@(?P[\.\w-]+)') def get_decoded_header(value): @@ -61,43 +62,42 @@ class SuspiciousFrom(Milter.Base): self.final_result = Milter.ACCEPT self.new_headers = [] + def set_suspicious_headers(self, is_suspicious, reason): + str_okay = "PASS" if not is_suspicious else "FAIL" + str_suspicious = "YES" if is_suspicious else "NO" + self.new_headers.append({'name': 'X-From-Checked', 'value': f"{str_okay} - {reason}"}) + self.new_headers.append({'name': 'X-From-Suspicious', 'value': str_suspicious}) + def header(self, field, value): """Header hook gets called for every header within the email processed.""" if field.lower() == 'from': - logger.debug(f"({self.id}) Got \"From:\" header raw value: '{value}'") + logger.debug(f"({self.id}) \"From:\" raw: '{value}'") value = normalizeRawFromHeader(value) + logger.info(f"({self.id}) \"From:\" cleaned: '{value}'") if value == '': - logger.warn(f"Got empty from header value! WTF! Skipping.") - return Milter.CONTINUE - decoded_from = get_decoded_header(value) - logger.debug(f"({self.id}) Decoded from as a whole: '{decoded_from}'") - data = getaddresses([decoded_from])[0] - logger.info(f"({self.id}) Label: '{data[0]}', Address: '{data[1]}'") - if data[0] == '': - logger.info(f"({self.id}) No label in from header, should be OK!") - if decoded_from.count('@') > 1: - logger.info(f"({self.id}) Raw decoded from header contains multiple '@'-charecters - investigating!") - self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - No label specified'}) - self.new_headers.append({'name': 'X-From-Suspicious', 'value': 'NO'}) + logger.warning(f"\"From:\" header empty! WTF, but nothing to do. OK for now.") + self.set_suspicious_headers(False, "EMPTY FROM HEADER - WTF") else: - label_domain = getDomainFromValue(data[0]) - address_domain = getDomainFromValue(data[1]) - logger.info(f"({self.id}) Extracted label_domain '{label_domain}' and address_domain '{address_domain}'") - if label_domain is not None: - logger.debug(f"({self.id}) Label '{data[0]}' contains an address with domain '{label_domain}'.") - if label_domain.lower() == address_domain.lower(): - logger.info(f"({self.id}) Label domain '{label_domain}' matches address domain '{address_domain}'. Good!") - self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - Label domain matches address domain'}) - self.new_headers.append({'name': 'X-From-Suspicious', 'value': 'NO'}) - else: - logger.info(f"({self.id}) Label domain '{label_domain}' did NOT match address domain '{address_domain}'. BAD!") - self.new_headers.append({'name': 'X-From-Checked', 'value': 'FAIL - Label domain does NOT match address domain'}) - self.new_headers.append({'name': 'X-From-Suspicious', 'value': 'YES'}) + decoded_from = get_decoded_header(value) + logger.debug(f"({self.id}) \"From:\" decoded raw: '{value}'") + decoded_from = normalizeRawFromHeader(decoded_from) + logger.info(f"({self.id}) \"From:\" decoded cleaned: '{decoded_from}'") + all_domains = address_domain_regex.findall(decoded_from) + if len(all_domains) == 0: + logger.warning(f"({self.id}) No domain in decoded \"From:\" - WTF! OK, though") + self.set_suspicious_headers(False, "No domains in decoded FROM") + elif len(all_domains) == 1: + logger.debug(f"({self.id}) Only one domain in decoded \"From:\": '{all_domains[0]}' - OK") + self.set_suspicious_headers(False, "Only one domain in decoded FROM") else: - logger.info(f"({self.id}) No domain found in label. Good!") - self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - No domain found in label.'}) - self.new_headers.append({'name': 'X-From-Suspicious', 'value': 'NO'}) - # Use continue here, so we can reach eom hook. + logger.info(f"({self.id}) Raw decoded from header contains multiple domains: '{all_domains}' - Checking") + if len(set(all_domains)) > 1: + logger.info(f"({self.id}) Multiple different domains in decoded \"From:\". - NOT OK") + self.set_suspicious_headers(True, "Multiple domains in decoded FROM are different") + else: + logger.info(f"({self.id}) All domains in decoded \"From:\" are identical - OK") + self.set_suspicious_headers(False, "Multiple domains in decoded FROM match properly") + # CONTINUE so we reach eom hook. # TODO: Log and react if multiple From-headers are found? return Milter.CONTINUE