0
0
Fork 0
dn42-bird2-roa-generator/generate.py

181 lines
8.8 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import re
import sys
import collections
import ipaddress
# This script requires Python 3.7 or higher (it relies on ipaddress' subnet_of())!
def parse_ipv4_filter_rules(registry_path):
    """Parse the IPv4 filter rules found below *registry_path*.

    Reads ``<registry_path>/data/filter.txt`` and returns an
    ``OrderedDict`` that maps each rule number to a dict with the keys
    ``filter_num`` (int), ``route_allowed`` (bool, True for ``permit``),
    ``network`` (str), ``min_length`` (int), ``max_length`` (int) and
    ``comment`` (str). Rules keep the order in which they appear in the
    file; a rule number that occurs twice is overwritten by the later line.

    Lines starting with ``#`` and lines not matching the rule format are
    skipped silently.
    """
    with open(registry_path + '/data/filter.txt') as f:
        filter_data = f.read()

    # <nr> <permit|deny> <a.b.c.d/len> <min> <max> # <comment>
    filter_regex = re.compile(r'(?P<filter_num>\d+)(\s+)(?P<route_allowed>permit|deny)(\s+)(?P<network>\d+\.\d+\.\d+\.\d+\/\d+)(\s+)(?P<min_length>\d+)(\s+)(?P<max_length>\d+)(\s+)#(\s+)(?P<comment>.*)')

    ipv4_rules = collections.OrderedDict()
    for filter_line in filter_data.split('\n'):
        if filter_line.startswith('#'):
            continue
        filter_rule = filter_regex.match(filter_line)
        if filter_rule is None:
            # Not a rule line (blank line, header, malformed entry)
            continue
        filter_num = int(filter_rule.group('filter_num'))
        ipv4_rules[filter_num] = {
            'filter_num': filter_num,
            'route_allowed': filter_rule.group('route_allowed') == 'permit',
            'network': filter_rule.group('network'),
            'min_length': int(filter_rule.group('min_length')),
            'max_length': int(filter_rule.group('max_length')),
            'comment': filter_rule.group('comment'),
        }
    return ipv4_rules
def parse_ipv6_filter_rules(registry_path):
    """Parse the IPv6 filter rules found below *registry_path*.

    Reads ``<registry_path>/data/filter6.txt`` and returns an
    ``OrderedDict`` that maps each rule number to a dict with the keys
    ``filter_num`` (int), ``route_allowed`` (bool, True for ``permit``),
    ``network`` (str), ``min_length`` (int), ``max_length`` (int) and
    ``comment`` (str). Rules keep the order in which they appear in the
    file; a rule number that occurs twice is overwritten by the later line.

    Lines starting with ``#`` and lines not matching the rule format are
    skipped silently. The network pattern only accepts lowercase hex
    digits and colons.
    """
    with open(registry_path + '/data/filter6.txt') as f:
        filter_data = f.read()

    # <nr> <permit|deny> <ipv6-prefix/len> <min> <max> # <comment>
    filter_regex = re.compile(r'(?P<filter_num>\d+)(\s+)(?P<route_allowed>permit|deny)(\s+)(?P<network>([0-9a-f:]*)\/\d+)(\s+)(?P<min_length>\d+)(\s+)(?P<max_length>\d+)(\s+)#(\s+)(?P<comment>.*)')

    ipv6_rules = collections.OrderedDict()
    for filter_line in filter_data.split('\n'):
        if filter_line.startswith('#'):
            continue
        filter_rule = filter_regex.match(filter_line)
        if filter_rule is None:
            # Not a rule line (blank line, header, malformed entry)
            continue
        filter_num = int(filter_rule.group('filter_num'))
        ipv6_rules[filter_num] = {
            'filter_num': filter_num,
            'route_allowed': filter_rule.group('route_allowed') == 'permit',
            'network': filter_rule.group('network'),
            'min_length': int(filter_rule.group('min_length')),
            'max_length': int(filter_rule.group('max_length')),
            'comment': filter_rule.group('comment'),
        }
    return ipv6_rules
def parse_route_objects(folder_path):
    """Parse every file below *folder_path* into a schema object.

    Walks the directory tree with ``os.walk`` and passes each file to
    ``read_schema_object_file``. Returns the parsed objects as a list, in
    the order ``os.walk`` reports the files. A folder that does not exist
    or contains no files yields an empty list.
    """
    route_objects = []
    for root, _dirs, files in os.walk(folder_path):
        for filename in files:
            # Join with a proper separator: plain concatenation only worked
            # when folder_path ended in '/' and broke for sub-directories.
            current_file = os.path.join(root, filename)
            route_objects.append(read_schema_object_file(current_file))
    return route_objects
def _store_schema_attribute(schema_object, key, value):
    """Record one key/value pair, collecting repeated keys into a list."""
    if key not in schema_object:
        schema_object[key] = value
    elif isinstance(schema_object[key], list):
        schema_object[key].append(value)
    else:
        schema_object[key] = [schema_object[key], value]


def read_schema_object_file(file_path):
    """Read a ``key: value`` object file into a dict.

    A line with a ``:`` within its first 20 characters starts a new
    key/value pair; key and value are split at the first ``:`` and
    stripped. A key that occurs once maps to a string, a key that occurs
    several times maps to a list of strings in file order.

    Lines without such a colon continue the current value: a line starting
    with ``+`` appends a newline, a line starting with 20 spaces appends
    the remainder of the line plus a newline. Empty lines, and continuation
    lines that appear before any key, are ignored.

    Returns the resulting dict (empty for an empty file).
    """
    with open(file_path) as f:
        data = f.read()

    schema_object = {}
    current_key = None
    current_value = None
    for line in data.split('\n'):
        # Ignore completely empty lines
        if line == '':
            continue
        # If a new key/value pair comes up (meaning there's a : within the first 20 chars)
        if ':' in line[0:20]:
            # Write last processed key/value-pair into schema_object if needed
            if current_key is not None:
                _store_schema_attribute(schema_object, current_key, current_value)
            # Then begin parsing the next one
            key_part, _, value_part = line.partition(':')
            current_key = key_part.strip()
            current_value = value_part.strip()
        elif current_key is None:
            # Continuation line before any key: there is no value to extend
            continue
        elif line[0] == '+':
            # This denotes a linebreak in a multiline value
            current_value += '\n'
        elif line.startswith(' ' * 20):
            # This is an additional line in a multiline value
            # NOTE(review): no separator is inserted between the first value
            # line and this one; kept as-is -- confirm that this is intended.
            current_value += line[20:] + '\n'

    # The final pair has no following key line to trigger its storage, so
    # it must be written here; otherwise the last attribute would be lost.
    if current_key is not None:
        _store_schema_attribute(schema_object, current_key, current_value)
    return schema_object
def create_roa_entries(route_objects, filter_rules, mode, f):
    """Write one ``route ... max ... as ...;`` line per permitted origin.

    Parameters:
        route_objects: iterable of dicts with an ``origin`` entry (string or
            list of strings), a ``route`` (ipv4) or ``route6`` (ipv6) entry
            and optionally a ``max-length`` entry.
        filter_rules: mapping of rule number to rule dict with the keys
            ``route_allowed``, ``network``, ``min_length`` and
            ``max_length``. Rules are evaluated in iteration order and the
            first rule whose network contains the route decides.
        mode: ``'ipv4'`` or ``'ipv6'``.
        f: writable text file object receiving the output.

    Route objects that match no rule, match a denying rule, or violate the
    matching rule's prefix length bounds are not emitted; a ``#`` comment
    line describing the reason is written instead.

    Side effect: a scalar ``origin`` value is replaced in place by a
    one-element list on the passed route object.

    Raises:
        ValueError: if *mode* is neither ``'ipv4'`` nor ``'ipv6'``, or if a
            network string cannot be parsed by ``ipaddress``.
    """
    if mode == 'ipv4':
        network_class, route_key = ipaddress.IPv4Network, 'route'
    elif mode == 'ipv6':
        network_class, route_key = ipaddress.IPv6Network, 'route6'
    else:
        raise ValueError('unsupported mode: ' + repr(mode))

    # Parse the filter networks once instead of once per route object
    parsed_filters = [
        (network_class(filter_rule['network']), filter_rule)
        for filter_rule in filter_rules.values()
    ]

    for route_object in route_objects:
        # Convert single value origin attribute to list
        if not isinstance(route_object['origin'], list):
            route_object['origin'] = [route_object['origin']]

        # Check if route is permitted by filter rules
        route_network = network_class(route_object[route_key])
        matching_filter_rule = None
        for filter_network, filter_rule in parsed_filters:
            if route_network.subnet_of(filter_network):
                # First final rule match. Take it and leave!
                matching_filter_rule = filter_rule
                break

        if matching_filter_rule is None:
            # No rule covers this route: skip it instead of crashing
            f.write('# FAILED no matching filter rule: ' + str(route_object) + '\n')
            continue

        # Now check if it permits or denies
        if not matching_filter_rule['route_allowed']:
            # Skip non-permitted routes
            f.write('# DENIED' + str(route_object) + ' by filter rule: ' + str(matching_filter_rule) + '\n')
            continue

        # Check if min-length fits filter requirements
        if matching_filter_rule['min_length'] > route_network.prefixlen:
            # Drop route objects which do not satisfy min_length requirement of matching filter rule
            f.write('# FAILED minimum prefix length requirement exceeded: ' + str(route_object) + ' against filter rule: ' + str(matching_filter_rule) + '\n')
            continue

        # Figure out max-length (filter rule wins over route object)
        allowed_max_len = int(matching_filter_rule['max_length'])
        if 'max-length' in route_object:
            allowed_max_len = min(allowed_max_len, int(route_object['max-length']))
        if allowed_max_len < route_network.prefixlen:
            # Drop route objects which do not satisfy max_length requirement of matching filter rule
            f.write('# FAILED maximum prefix length exceeded: ' + str(route_object) + ' against filter rule: ' + str(matching_filter_rule) + '\n')
            continue

        # Create roa for every origin of this route
        for origin in route_object['origin']:
            # Strip the "AS" prefix only; str.lstrip('AS') would strip any
            # leading run of the characters 'A' and 'S' instead.
            as_number = origin[2:] if origin.startswith('AS') else origin
            f.write('route ' + route_object[route_key] + ' max ' + str(allowed_max_len) + ' as ' + as_number + ';' + "\n")
def create_valid_network_function(filter_rules, mode, f):
    """Write a ``dn42_is_valid_<mode>_network()`` function definition to *f*.

    The function body is ``return net ~ [ ... ];`` where the set lists every
    permitting rule of *filter_rules* as ``<network>{<min>,<max>}``, in
    iteration order and separated by commas. Denying rules are left out.
    The output is presumably meant to be included from a BIRD
    configuration -- confirm against the consumer of the generated file.

    Parameters:
        filter_rules: mapping of rule number to rule dict with the keys
            ``route_allowed``, ``network``, ``min_length``, ``max_length``.
        mode: string inserted into the function name (e.g. ``'ipv4'``).
        f: writable text file object receiving the output.
    """
    permitted_entries = [
        ' ' + filter_rule['network'] + '{' + str(filter_rule['min_length']) + ',' + str(filter_rule['max_length']) + '}'
        for filter_rule in filter_rules.values()
        if filter_rule['route_allowed']
    ]
    f.write('function dn42_is_valid_' + mode + '_network() {\n')
    f.write(' return net ~ [\n')
    # join() puts the separator only between entries, so no trailing comma
    f.write(',\n'.join(permitted_entries))
    f.write('\n ];\n')
    f.write('}\n')
if __name__ == '__main__':
    # Get registry path over commandline argument; fail with a usage hint
    # instead of an IndexError traceback when it is missing.
    if len(sys.argv) < 2:
        sys.exit('usage: ' + sys.argv[0] + ' <path-to-registry>')
    registry_path = sys.argv[1]

    # Uncomment this line to test the parser against that evil file in the registry.
    #print(read_schema_object_file(registry_path + '/data/schema/SCHEMA-SCHEMA'))

    # Read filter rules
    ipv4_filter_rules = parse_ipv4_filter_rules(registry_path)
    ipv6_filter_rules = parse_ipv6_filter_rules(registry_path)

    # Read all route objects
    ipv4_route_objects = parse_route_objects(registry_path + '/data/route/')
    ipv6_route_objects = parse_route_objects(registry_path + '/data/route6/')

    # All output files are written into the current working directory.

    # Create routes out of ipv4 route objects
    with open('./dn42_roa_ipv4.conf', 'w') as target_file:
        create_roa_entries(ipv4_route_objects, ipv4_filter_rules, 'ipv4', target_file)

    # Create routes out of ipv6 route objects
    with open('./dn42_roa_ipv6.conf', 'w') as target_file:
        create_roa_entries(ipv6_route_objects, ipv6_filter_rules, 'ipv6', target_file)

    # Create ipv4 network validation function
    with open('./dn42_valid_ipv4.conf', 'w') as target_file:
        create_valid_network_function(ipv4_filter_rules, 'ipv4', target_file)

    # Create ipv6 network validation function
    with open('./dn42_valid_ipv6.conf', 'w') as target_file:
        create_valid_network_function(ipv6_filter_rules, 'ipv6', target_file)