#!/usr/bin/env python3
"""Generate ROA entries and prefix-validation functions from a registry.

The script reads the filter rules (``data/filter.txt`` and
``data/filter6.txt``) and all route objects (``data/route/`` and
``data/route6/``) below the registry path given as first command line
argument and writes four configuration snippets into the current working
directory:

* ``dn42_roa_ipv4.conf`` / ``dn42_roa_ipv6.conf`` -- one ``route ... max ...
  as ...;`` line per permitted route object and origin,
* ``dn42_valid_ipv4.conf`` / ``dn42_valid_ipv6.conf`` -- a function that
  matches ``net`` against all permitted filter networks.
"""

import collections
import ipaddress
import os
import re
import sys

# This script requires python3.7 or higher!

# One filter rule per line:
#   <number> <permit|deny> <network> <min length> <max length> # <comment>
_IPV4_FILTER_REGEX = re.compile(
    r'(?P<filter_num>\d+)(\s+)(?P<route_allowed>permit|deny)(\s+)'
    r'(?P<network>\d+\.\d+\.\d+\.\d+\/\d+)(\s+)(?P<min_length>\d+)(\s+)'
    r'(?P<max_length>\d+)(\s+)#(\s+)(?P<comment>.*)'
)
_IPV6_FILTER_REGEX = re.compile(
    r'(?P<filter_num>\d+)(\s+)(?P<route_allowed>permit|deny)(\s+)'
    r'(?P<network>([0-9a-f:]*)\/\d+)(\s+)(?P<min_length>\d+)(\s+)'
    r'(?P<max_length>\d+)(\s+)#(\s+)(?P<comment>.*)'
)


def _parse_filter_rules(filter_path, filter_regex):
    """Parse one filter file into an ordered mapping of rule number -> rule."""
    with open(filter_path) as f:
        filter_data = f.read()

    rules = collections.OrderedDict()
    for filter_line in filter_data.split('\n'):
        # Lines starting with '#' are comments.
        if filter_line.startswith('#'):
            continue

        filter_rule = filter_regex.match(filter_line)
        if filter_rule is None:
            # Blank or otherwise unparsable lines are ignored.
            continue

        parsed_rule = {
            'filter_num': int(filter_rule.group('filter_num')),
            'route_allowed': filter_rule.group('route_allowed') == 'permit',
            'network': filter_rule.group('network'),
            'min_length': int(filter_rule.group('min_length')),
            'max_length': int(filter_rule.group('max_length')),
            'comment': filter_rule.group('comment'),
        }
        rules[parsed_rule['filter_num']] = parsed_rule

    return rules


def parse_ipv4_filter_rules(registry_path):
    """Read ``<registry_path>/data/filter.txt``.

    Returns an ``OrderedDict`` keyed by filter number, in file order. Every
    value is a dict with the keys ``filter_num``, ``route_allowed`` (bool),
    ``network`` (str), ``min_length``, ``max_length`` (int) and ``comment``.
    """
    filter_path = os.path.join(registry_path, 'data', 'filter.txt')
    return _parse_filter_rules(filter_path, _IPV4_FILTER_REGEX)


def parse_ipv6_filter_rules(registry_path):
    """Read ``<registry_path>/data/filter6.txt``.

    Returns the same structure as :func:`parse_ipv4_filter_rules`.
    """
    filter_path = os.path.join(registry_path, 'data', 'filter6.txt')
    return _parse_filter_rules(filter_path, _IPV6_FILTER_REGEX)


def parse_route_objects(folder_path):
    """Parse every file below ``folder_path`` as a schema object.

    Returns a list with one dict per file (see
    :func:`read_schema_object_file`). Directories and files are visited in
    sorted order so that the generated output is reproducible.
    """
    route_objects = []
    for root, dirs, files in os.walk(folder_path):
        # Sorting in place also fixes the order in which os.walk descends.
        dirs.sort()
        for filename in sorted(files):
            # os.path.join works whether or not root ends with a separator.
            current_file = os.path.join(root, filename)
            route_objects.append(read_schema_object_file(current_file))

    return route_objects


def _store_schema_value(schema_object, key, value):
    """Store ``value`` under ``key``; repeated keys are collected in a list."""
    if key in schema_object:
        existing = schema_object[key]
        if isinstance(existing, list):
            existing.append(value)
        else:
            schema_object[key] = [existing, value]
    else:
        schema_object[key] = value


def read_schema_object_file(file_path):
    """Parse a ``key: value`` object file into a dict.

    A line with a ``:`` within its first 20 characters starts a new
    key/value pair. Other lines extend the current value: a line starting
    with ``+`` appends a line break, a line starting with 20 spaces appends
    the remainder of the line followed by a line break. A key that occurs
    more than once maps to a list of its values in file order.
    """
    with open(file_path) as f:
        data = f.read()

    schema_object = {}
    current_key = None
    current_value = None

    for line in data.split('\n'):
        # Ignore completely empty lines
        if line == '':
            continue

        # A new key/value pair comes up (there is a ':' within the first
        # 20 chars)
        if ':' in line[0:20]:
            # Write the previously processed pair before starting a new one
            if current_key is not None:
                _store_schema_value(schema_object, current_key, current_value)

            key, _, value = line.partition(':')
            current_key = key.strip()
            current_value = value.strip()
        elif current_key is None:
            # Continuation line without a preceding key: nothing to extend.
            continue
        elif line[0] == '+':
            # This denotes a linebreak in a multiline value
            current_value += '\n'
        elif line.startswith(' ' * 20):
            # This is an additional line in a multiline value
            current_value += line[20:] + '\n'

    # The last pair has no following key line that would flush it.
    if current_key is not None:
        _store_schema_value(schema_object, current_key, current_value)

    return schema_object


def _strip_as_prefix(origin):
    """Return ``origin`` without a leading literal ``AS``."""
    if origin.startswith('AS'):
        return origin[2:]
    return origin


def create_roa_entries(route_objects, filter_rules, mode, f):
    """Write one ROA line per permitted route object and origin to ``f``.

    Args:
        route_objects: dicts as returned by :func:`read_schema_object_file`.
            They must contain ``origin`` and ``route`` (``route6`` in ipv6
            mode) and may contain ``max-length``. A single ``origin`` value
            is converted to a one-element list in place.
        filter_rules: mapping as returned by the filter rule parsers. The
            first rule (in iteration order) whose network contains the route
            decides.
        mode: ``'ipv4'`` or ``'ipv6'``.
        f: writable text file object.

    Routes that are denied, match no rule at all, or violate the length
    limits of their rule are written as ``#`` comment lines instead.

    Raises:
        ValueError: if ``mode`` is neither ``'ipv4'`` nor ``'ipv6'``.
    """
    if mode == 'ipv4':
        network_cls = ipaddress.IPv4Network
        route_key = 'route'
    elif mode == 'ipv6':
        network_cls = ipaddress.IPv6Network
        route_key = 'route6'
    else:
        raise ValueError('unsupported mode: ' + repr(mode))

    # Parse the filter networks once instead of once per route object.
    parsed_filters = [
        (network_cls(filter_rule['network']), filter_rule)
        for filter_rule in filter_rules.values()
    ]

    for route_object in route_objects:
        # Convert single value origin attribute to list
        if not isinstance(route_object['origin'], list):
            route_object['origin'] = [route_object['origin']]

        route_network = network_cls(route_object[route_key])

        # Check if route is permitted by filter rules; first match wins.
        matching_filter_rule = None
        for filter_network, filter_rule in parsed_filters:
            if route_network.subnet_of(filter_network):
                matching_filter_rule = filter_rule
                break

        if matching_filter_rule is None:
            # Without a matching rule there is nothing that permits the route.
            f.write('# NO MATCH no filter rule matches: ' + str(route_object) + '\n')
            continue

        # Now check if it permits or denies
        if not matching_filter_rule['route_allowed']:
            # Skip non-permitted routes
            f.write('# DENIED' + str(route_object) + ' by filter rule: ' + str(matching_filter_rule) + '\n')
            continue

        # Check if min-length fits filter requirements
        if matching_filter_rule['min_length'] > route_network.prefixlen:
            # Drop route objects which do not satisfy min_length requirement
            # of matching filter rule
            f.write('# FAILED minimum prefix length requirement exceeded: ' + str(route_object) + ' against filter rule: ' + str(matching_filter_rule) + '\n')
            continue

        # Figure out max-length (filter rule wins over route object)
        allowed_max_len = int(matching_filter_rule['max_length'])
        if 'max-length' in route_object:
            allowed_max_len = min(allowed_max_len, int(route_object['max-length']))

        if allowed_max_len < route_network.prefixlen:
            # Drop route objects which do not satisfy max_length requirement
            # of matching filter rule
            f.write('# FAILED maximum prefix length exceeded: ' + str(route_object) + ' against filter rule: ' + str(matching_filter_rule) + '\n')
            continue

        # Create roa for every origin of this route
        for origin in route_object['origin']:
            f.write('route ' + route_object[route_key] + ' max ' + str(allowed_max_len) + ' as ' + _strip_as_prefix(origin) + ';' + "\n")


def create_valid_network_function(filter_rules, mode, f):
    """Write the ``dn42_is_valid_<mode>_network`` function to ``f``.

    The function body matches ``net`` against the set of all filter rules
    with ``route_allowed`` set, each written as ``network{min,max}``.
    """
    f.write('function dn42_is_valid_' + mode + '_network() {\n')
    f.write(' return net ~ [\n')

    entries = [
        ' ' + filter_rule['network'] + '{' + str(filter_rule['min_length']) + ',' + str(filter_rule['max_length']) + '}'
        for filter_rule in filter_rules.values()
        if filter_rule['route_allowed']
    ]
    f.write(',\n'.join(entries))

    f.write('\n ];\n')
    f.write('}\n')


def main():
    """Generate all four configuration files in the current directory."""
    # Get registry path over commandline argument
    if len(sys.argv) < 2:
        sys.exit('usage: ' + sys.argv[0] + ' <registry_path>')
    registry_path = sys.argv[1]

    # Uncomment this block to test the parser against that evil file in the registry.
    #print(read_schema_object_file(registry_path + '/data/schema/SCHEMA-SCHEMA'))
    #exit

    # Read filter rules
    ipv4_filter_rules = parse_ipv4_filter_rules(registry_path)
    ipv6_filter_rules = parse_ipv6_filter_rules(registry_path)

    # Read all route objects
    ipv4_route_objects = parse_route_objects(os.path.join(registry_path, 'data', 'route'))
    ipv6_route_objects = parse_route_objects(os.path.join(registry_path, 'data', 'route6'))

    # Create routes out of ipv4 route objects
    with open('./dn42_roa_ipv4.conf', 'w') as target_file:
        create_roa_entries(ipv4_route_objects, ipv4_filter_rules, 'ipv4', target_file)

    # Create routes out of ipv6 route objects
    with open('./dn42_roa_ipv6.conf', 'w') as target_file:
        create_roa_entries(ipv6_route_objects, ipv6_filter_rules, 'ipv6', target_file)

    # Create ipv4 network validation function
    with open('./dn42_valid_ipv4.conf', 'w') as target_file:
        create_valid_network_function(ipv4_filter_rules, 'ipv4', target_file)

    # Create ipv6 network validation function
    with open('./dn42_valid_ipv6.conf', 'w') as target_file:
        create_valid_network_function(ipv6_filter_rules, 'ipv6', target_file)


if __name__ == '__main__':
    main()