Initial import
This commit is contained in:
commit
535d9420a3
|
@ -0,0 +1,10 @@
|
||||||
|
# bird2-roa-generator
|
||||||
|
|
||||||
|
This script reads data from a local git clone of the dn42 registry and generates roa rules for bird2.
|
||||||
|
It requires python3.7 or greater to run.
|
||||||
|
|
||||||
|
## How to use
|
||||||
|
|
||||||
|
* Clone dn42 registry repository (or refresh your existing clone with `git pull`)
|
||||||
|
* `./generate.py '/path/to/your/registry-clone'`
|
||||||
|
* Énjoy your two new roa files! :-)
|
|
@ -0,0 +1,133 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
import collections
|
||||||
|
import ipaddress
|
||||||
|
|
||||||
|
# This script requires python3.7 or higher!
|
||||||
|
|
||||||
|
def parse_ipv4_filter_rules(registry_path):
|
||||||
|
with open(registry_path + '/data/filter.txt') as f:
|
||||||
|
filter_data = f.read()
|
||||||
|
filter_regex = re.compile(r'(?P<filter_num>\d+)(\s+)(?P<route_allowed>permit|deny)(\s+)(?P<network>\d+\.\d+\.\d+\.\d+\/\d+)(\s+)(?P<min_length>\d+)(\s+)(?P<max_length>\d+)(\s+)#(\s+)(?P<comment>.*)')
|
||||||
|
ipv4_rules = collections.OrderedDict()
|
||||||
|
for filter_line in filter_data.split('\n'):
|
||||||
|
if filter_line.startswith('#'): continue
|
||||||
|
filter_rule = filter_regex.match(filter_line)
|
||||||
|
if filter_rule == None: continue
|
||||||
|
parsed_rule = {
|
||||||
|
'filter_num': int(filter_rule.group('filter_num')),
|
||||||
|
'route_allowed': (True if filter_rule.group('route_allowed') == 'permit' else False),
|
||||||
|
'network': filter_rule.group('network'),
|
||||||
|
'min_length': int(filter_rule.group('min_length')),
|
||||||
|
'max_length': int(filter_rule.group('max_length')),
|
||||||
|
'comment': filter_rule.group('comment')
|
||||||
|
}
|
||||||
|
ipv4_rules[parsed_rule['filter_num']] = parsed_rule
|
||||||
|
return ipv4_rules
|
||||||
|
|
||||||
|
def parse_ipv6_filter_rules(registry_path):
|
||||||
|
with open(registry_path + '/data/filter6.txt') as f:
|
||||||
|
filter_data = f.read()
|
||||||
|
filter_regex = re.compile(r'(?P<filter_num>\d+)(\s+)(?P<route_allowed>permit|deny)(\s+)(?P<network>([0-9a-f:]*)\/\d+)(\s+)(?P<min_length>\d+)(\s+)(?P<max_length>\d+)(\s+)#(\s+)(?P<comment>.*)')
|
||||||
|
ipv6_rules = collections.OrderedDict()
|
||||||
|
for filter_line in filter_data.split('\n'):
|
||||||
|
if filter_line.startswith('#'): continue
|
||||||
|
filter_rule = filter_regex.match(filter_line)
|
||||||
|
if filter_rule == None: continue
|
||||||
|
parsed_rule = {
|
||||||
|
'filter_num': int(filter_rule.group('filter_num')),
|
||||||
|
'route_allowed': (True if filter_rule.group('route_allowed') == 'permit' else False),
|
||||||
|
'network': filter_rule.group('network'),
|
||||||
|
'min_length': int(filter_rule.group('min_length')),
|
||||||
|
'max_length': int(filter_rule.group('max_length')),
|
||||||
|
'comment': filter_rule.group('comment')
|
||||||
|
}
|
||||||
|
ipv6_rules[parsed_rule['filter_num']] = parsed_rule
|
||||||
|
return ipv6_rules
|
||||||
|
|
||||||
|
def parse_route_objects(folder_path):
|
||||||
|
route_objects = []
|
||||||
|
for root, dirs, files in os.walk(folder_path):
|
||||||
|
for filename in files:
|
||||||
|
current_file = str(root) + str(filename)
|
||||||
|
route_object = read_route_object_file(current_file)
|
||||||
|
route_objects.append(route_object)
|
||||||
|
return route_objects
|
||||||
|
|
||||||
|
def read_route_object_file(file_path):
|
||||||
|
with open(file_path) as f:
|
||||||
|
data = f.read()
|
||||||
|
route_object = {}
|
||||||
|
for line in data.split('\n'):
|
||||||
|
if line.strip() == '': continue
|
||||||
|
items = line.split(':', 1)
|
||||||
|
key = items[0].strip()
|
||||||
|
value = items[1].strip()
|
||||||
|
if key in route_object.keys():
|
||||||
|
if isinstance(route_object[key], list):
|
||||||
|
route_object[key].append(value)
|
||||||
|
else:
|
||||||
|
route_object[key] = [route_object[key], value]
|
||||||
|
else:
|
||||||
|
route_object[key] = value
|
||||||
|
return route_object
|
||||||
|
|
||||||
|
def create_roa_entries(route_objects, filter_rules, mode, f):
|
||||||
|
for route_object in route_objects:
|
||||||
|
# Convert single value origin attribute to list
|
||||||
|
if not isinstance(route_object['origin'], list):
|
||||||
|
route_object['origin'] = [route_object['origin']]
|
||||||
|
# Check if route is permitted by filter rules
|
||||||
|
if mode == 'ipv4':
|
||||||
|
route_network = ipaddress.IPv4Network(route_object['route'])
|
||||||
|
elif mode == 'ipv6':
|
||||||
|
route_network = ipaddress.IPv6Network(route_object['route6'])
|
||||||
|
matching_filter_rule = None
|
||||||
|
for id, filter_rule in filter_rules.items():
|
||||||
|
if mode == 'ipv4':
|
||||||
|
filter_network = ipaddress.IPv4Network(filter_rule['network'])
|
||||||
|
elif mode == 'ipv6':
|
||||||
|
filter_network = ipaddress.IPv6Network(filter_rule['network'])
|
||||||
|
if route_network.subnet_of(filter_network):
|
||||||
|
# First final rule match. Take it and leave!
|
||||||
|
#print('# ' + str(filter_rule) + ' matched for ' + str(route_object))
|
||||||
|
matching_filter_rule = filter_rule
|
||||||
|
break
|
||||||
|
# Now check if it permits or denies
|
||||||
|
if matching_filter_rule['route_allowed'] == False:
|
||||||
|
# Skip non-permitted routes
|
||||||
|
f.write('# route object not permitted: ' + str(route_network) + "\n")
|
||||||
|
f.write('# route object was denied by filter rule: ' + str(matching_filter_rule) + "\n")
|
||||||
|
continue
|
||||||
|
# Figure out max-length (filter rule wins over route object)
|
||||||
|
if 'max-length' in route_object.keys():
|
||||||
|
allowed_max_len = min(int(matching_filter_rule['max_length']), int(route_object['max-length']))
|
||||||
|
else:
|
||||||
|
allowed_max_len = int(matching_filter_rule['max_length'])
|
||||||
|
# Create roa for every origin of this route
|
||||||
|
for origin in route_object['origin']:
|
||||||
|
# Make sure to strip "AS" prefix from AS value
|
||||||
|
origin = origin.lstrip('AS')
|
||||||
|
if mode == 'ipv4':
|
||||||
|
f.write('route ' + route_object['route'] + ' max ' + str(allowed_max_len) + ' as ' + origin + ';' + "\n")
|
||||||
|
elif mode == 'ipv6':
|
||||||
|
f.write('route ' + route_object['route6'] + ' max ' + str(allowed_max_len) + ' as ' + origin + ';' + "\n")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# Get registry path over commandline argument
|
||||||
|
registry_path = sys.argv[1]
|
||||||
|
# Read filter rules
|
||||||
|
ipv4_filter_rules = parse_ipv4_filter_rules(registry_path)
|
||||||
|
ipv6_filter_rules = parse_ipv6_filter_rules(registry_path)
|
||||||
|
# Read all route objects
|
||||||
|
ipv4_route_objects = parse_route_objects(registry_path + '/data/route/')
|
||||||
|
ipv6_route_objects = parse_route_objects(registry_path + '/data/route6/')
|
||||||
|
# Create routes out of ipv4 route objects
|
||||||
|
with open('./roa_ipv4.conf', 'w') as target_file:
|
||||||
|
create_roa_entries(ipv4_route_objects, ipv4_filter_rules, 'ipv4', target_file)
|
||||||
|
# Create routes out of ipv6 route objects
|
||||||
|
with open('./roa_ipv6.conf', 'w') as target_file:
|
||||||
|
create_roa_entries(ipv6_route_objects, ipv6_filter_rules, 'ipv6', target_file)
|
Loading…
Reference in New Issue