Proof-of-concept prototype - at this stage emails are only marked (an X-From-Checked header is added); every message is still accepted

This commit is contained in:
Jan Philipp Timme 2019-12-18 16:33:54 +01:00
parent d0274f8046
commit e538d99abd
1 changed file with 35 additions and 22 deletions

57
main.py
View File

@ -18,55 +18,66 @@ handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
split_from_regex = re.compile('(?P<from_label>("(.*)")|(.*))(.*)<(?P<from_address>.*)>') split_from_regex = re.compile('(?P<from_label>.*)<(?P<from_address>.*)>$')
address_domain_regex = re.compile('.*@(?P<domain>[\.\w-]+)') address_domain_regex = re.compile('.*@(?P<domain>[\.\w-]+)')
def splitFromHeader(value): def parseFromHeader(value):
"""Split 'From:' header into label and address values.""" """Split 'From:' header into label and address values."""
match = split_from_regex.match(value) match = split_from_regex.match(value)
return { result = {
'label': match.group('from_label').strip(), 'label': match.group('from_label').strip(),
'address': match.group('from_address').strip() 'address': match.group('from_address').strip()
} }
result['label_domain'] = getDomainFromLabel(result['label'])
result['address_domain'] = getDomainFromAddress(result['address'])
return result
def labelContainsAddress(label): def getDomainFromLabel(label):
""" Check whether given 'From:' header label contains something that looks like an email address.""" """ Check whether given 'From:' header label contains something that looks like an email address."""
return address_domain_regex.match(label) is not None match = address_domain_regex.match(label)
return match.group('domain').strip() if match is not None else None
def labelAndAddressDomainsMatch(split): def getDomainFromAddress(address):
label_domain = address_domain_regex.match(split['label']).group('domain').strip() match = address_domain_regex.match(address)
address_domain = address_domain_regex.match(split['address']).group('domain').strip() return match.group('domain').strip() if match is not None else None
return label_domain.lower() == address_domain.lower()
class SuspiciousFrom(Milter.Base): class SuspiciousFrom(Milter.Base):
def __init__(self): def __init__(self):
self.id = Milter.uniqueID() self.id = Milter.uniqueID()
self.reset()
logger.info(f"({self.id}) Instanciated.")
def reset(self):
self.final_result = Milter.ACCEPT self.final_result = Milter.ACCEPT
self.new_headers = [] self.new_headers = []
logger.info(f"{self.id} got fired up.")
def header(self, field, value): def header(self, field, value):
"""Header hook gets called for every header within the email processed.""" """Header hook gets called for every header within the email processed."""
if field.lower() == 'from': if field.lower() == 'from':
logger.info(f"Got \"From:\" header with raw value: '{value}'") logger.debug(f"({self.id}) Got \"From:\" header raw value: '{value}'")
split = splitFromHeader(value) value = value.strip('\n').strip()
logger.info(f"Label: {split['label']}, address: {split['address']}") if value == '':
if labelContainsAddress(split['label']): logger.info(f"Got empty from header value! WTF! Skipping.")
logger.info() return Milter.CONTINUE
if labelAndAddressDomainsMatch(split): data = parseFromHeader(value)
self.new_headers.append({'name': 'X-From-Checked', 'value': 'Maybe multiple domains - no match - BAD!'}) logger.info(f"({self.id}) Label: '{data['label']}', Address: '{data['address']}'")
self.final_result = Milter.ACCEPT if data['label_domain'] is not None:
logger.debug(f"({self.id}) Label '{data['label']}' contains an address with domain '{data['label_domain']}'.")
if data['label_domain'].lower() == data['address_domain'].lower():
logger.info(f"({self.id}) Label domain '{data['label_domain']}' matches address domain '{data['address_domain']}'. Good!")
self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - Label domain matches address domain'})
else: else:
self.new_headers.append({'name': 'X-From-Checked', 'value': 'Multiple domains - no match - BAD!'}) logger.info(f"({self.id}) Label domain '{data['label_domain']}' did NOT match address domain '{data['address_domain']}'. BAD!")
self.final_result = Milter.ACCEPT self.new_headers.append({'name': 'X-From-Checked', 'value': 'FAIL - Label domain does NOT match address domain'})
else: else:
# Supposedly no additional address in the label, accept it for now # Supposedly no additional address in the label, accept it for now
# TODO: Also decode utf-8 weirdness and check in there # TODO: Also decode utf-8 weirdness and check in there
self.new_headers.append({'name': 'X-From-Checked', 'value': 'Yes, no address in label.'}) logger.info(f"({self.id}) Label '{data['label']}' probably did not contain an address. Everything is fine.")
self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - No address found in label'})
self.final_result = Milter.ACCEPT self.final_result = Milter.ACCEPT
# Use continue here, so we can reach eom hook. # Use continue here, so we can reach eom hook.
# TODO: Log and react if multiple From-headers are found? # TODO: Log and react if multiple From-headers are found?
@ -74,9 +85,11 @@ class SuspiciousFrom(Milter.Base):
def eom(self): def eom(self):
"""EOM hook gets called at the end of message processed. Headers and final verdict are applied only here.""" """EOM hook gets called at the end of message processed. Headers and final verdict are applied only here."""
# Finish up message according to results collected on the way. logger.info(f"({self.id}) EOM: Final verdict is {self.final_result}. New headers: {self.new_headers}")
for new_header in self.new_headers: for new_header in self.new_headers:
self.addheader(new_header['name'], new_header['value']) self.addheader(new_header['name'], new_header['value'])
logger.info(f"({self.id}) EOM: Reseting self.")
self.reset()
return self.final_result return self.final_result