Automatic IOS Base Configuration for GNS3
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

608 lines
22 KiB

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018-2019 Bernhard Ehlers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Base interface and loopback configuration for Cisco router
"""
import os
import sys
import ipaddress
import re
import telnetlib
import uuid
import xml.etree.ElementTree as ET
import gns3api
def die(*msg_list):
""" abort program with error message """
error_msg = ' '.join(str(x) for x in msg_list)
sys.exit(error_msg.rstrip("\n\r"))
def next_ip_address(ip_intf):
"""" next IP address """
return ipaddress.ip_interface(str(ip_intf.ip + 1) +
"/" + str(ip_intf.network.prefixlen))
def next_ip_network(ip_intf):
"""" next IP network """
return ipaddress.ip_interface(
str(ip_intf.ip + ip_intf.network.num_addresses) +
"/" + str(ip_intf.network.prefixlen))
def send_cisco_commands(name, host, port, commands, privileged=True):
""" send config to Cisco router/switch """
if not commands:
return True
prompt = b"[>#] ?$"
status = "???"
try:
# open telnet connection
status = "connect"
tn = telnetlib.Telnet(host, port, 10)
# read old junk
while tn.read_until(b"xyzzy", timeout=0.3):
pass
# send a <CR>
status = "first contact"
tn.write(b"\r")
try:
response = tn.expect([prompt], 5)[2]
except OSError:
pass
tn.write(b"\r") # second <return>
response = tn.expect([prompt], 5)[2]
if privileged and response.endswith(b">"):
sys.stderr.write("{}: Router must be in priviledged mode.\n".format(name))
return False
# commands
status = "sending commands"
for line in commands:
while tn.read_until(b"xyzzy", timeout=0.1):
pass
tn.write((line + "\r").encode('utf-8', errors='replace'))
# wait for prompt
tn.expect([prompt])
# close connection
status = "close"
tn.close()
except OSError as err:
sys.stderr.write("{}: I/O error during {} - {}\n".format(name, status, err))
return False
except KeyboardInterrupt:
sys.stderr.write("{}: Aborted\n".format(name))
return False
return True # No error
def get_project_data(api, project_id, sel_items):
""" get node (with link information) and notes of a project by GNS3 API """
# get all node and link information
all_nodes = {}
all_links = {}
notes = []
try:
# check project status
project = api.request('GET', ('/v2/projects', project_id))
if project['status'] != 'opened':
die("Project '{}' is {}, please open it.".format(
project['name'], project['status']))
compute_host = {}
for compute in api.request('GET', '/v2/computes'):
compute_host[compute["compute_id"]] = compute["host"]
for node in api.request('GET', ('/v2/projects', project_id, 'nodes')):
console_host = node.get("console_host")
if console_host in ('0.0.0.0', '::'):
node["console_host"] = compute_host[node["compute_id"]]
all_nodes[node["node_id"]] = node
for link in api.request('GET', ('/v2/projects', project_id, 'links')):
all_links[link["link_id"]] = link
if sel_items: # get selected notes
for item in sel_items:
if item.startswith("text_drawings/"):
drawing = api.request('GET', \
('/v2/projects', project_id, "drawings", item[14:]))
svg = ET.fromstring(drawing["svg"])
if svg[0].tag == 'text':
notes.append(svg[0].text)
else: # nothing selected: get all of them
for drawing in api.request('GET', ('/v2/projects', project_id, "drawings")):
svg = ET.fromstring(drawing["svg"])
if svg[0].tag == 'text':
notes.append(svg[0].text)
except gns3api.GNS3ApiException as err:
die("Can't get node/link information:", err)
nodes = select_nodes(all_nodes, all_links, sel_items)
return nodes, notes
def select_nodes(all_nodes, all_links, sel_items):
""" select nodes and add link information """
nodes = {}
try:
if sel_items: # get selected nodes
for item in sel_items:
if item.startswith("nodes/"):
item = item[6:]
nodes[all_nodes[item]['name']] = all_nodes[item]
nodes[all_nodes[item]['name']]['_links'] = []
else: # nothing selected: get all nodes
for item in all_nodes:
nodes[all_nodes[item]['name']] = all_nodes[item]
nodes[all_nodes[item]['name']]['_links'] = []
# add link informations to nodes
for link in all_links:
link_nodes = all_links[link]['nodes']
node_0_id = link_nodes[0]['node_id']
node_1_id = link_nodes[1]['node_id']
node_0_name = all_nodes[node_0_id]['name']
node_1_name = all_nodes[node_1_id]['name']
label_0 = link_nodes[0].get("label", {}).get("text")
label_1 = link_nodes[1].get("label", {}).get("text")
if node_0_name in nodes:
nodes[node_0_name]['_links'].append(
{'link_id': link, 'link_type': all_links[link]['link_type'],
'adapter_number': link_nodes[0].get('adapter_number'),
'port_number': link_nodes[0].get('port_number'),
'label': label_0,
'remote_name': node_1_name, 'remote_label': label_1})
if node_1_name in nodes:
nodes[node_1_name]['_links'].append(
{'link_id': link, 'link_type': all_links[link]['link_type'],
'adapter_number': link_nodes[1].get('adapter_number'),
'port_number': link_nodes[1].get('port_number'),
'label': label_1,
'remote_name': node_0_name, 'remote_label': label_0})
except KeyError:
die("Project informations are inconsistent")
return nodes
def get_vlan_interfaces(nodes):
""" get vlan interfaces in switch groups """
vlan_interfaces = {}
for name in nodes:
is_switch = False
switch_group = {} # new switch group
switch_list = [name]
while switch_list: # process a group of switches
name = switch_list.pop()
if name not in nodes: # node not selected
continue
if name in vlan_interfaces: # already processed
continue
for link in nodes[name]['_links']:
if not link['label']:
pass
elif re.search(r'\btrunk\b', link['label'], re.IGNORECASE):
is_switch = True
switch_list.append(link['remote_name'])
else:
match = re.search(r'\bvlan *(\d+)\b', link['label'], re.IGNORECASE)
if match: # add vlan / link to vlan_interfaces
is_switch = True
vlan = int(match.group(1))
switch_group.setdefault(vlan, [])
switch_group[vlan].append(
[name, link['label'],
link['remote_name'], link['remote_label'],
link['link_id']])
if is_switch:
vlan_interfaces[name] = switch_group
for vlan in switch_group:
switch_group[vlan].sort(key=lambda k: [k[0].lower(), k[1].lower()])
for name in vlan_interfaces:
nodes[name]['_vlans'] = sorted(vlan_interfaces[name].keys())
return vlan_interfaces
def base_networks(notes):
""" get base IP networks for loopbacks and infrastructure interfaces """
loopback_base = None
infra_base = None
for note in notes:
# loopback address
match = re.search(r'^ *loopback: *(\S+)', note,
flags=re.IGNORECASE|re.MULTILINE)
if match:
if loopback_base is None:
try:
loopback_base = ipaddress.ip_interface(match.group(1))
except ValueError:
die("Invalid loopback address '{}'".format(match.group(1)))
else:
die("Multiple loopback addresses")
# infrastructure address
match = re.search(r'^ *infralink: *(\S+)', note,
flags=re.IGNORECASE|re.MULTILINE)
if match:
if infra_base is None:
try:
infra_base = ipaddress.ip_interface(match.group(1))
except ValueError:
die("Invalid infrastructure link address '{}'".format(match.group(1)))
if infra_base.network.num_addresses < 4:
die("Network '{}' is too small for 2 interface addresses.".format(infra_base.with_prefixlen))
if infra_base.ip == infra_base.network.network_address:
infra_base = next_ip_address(infra_base)
if infra_base.ip + 1 >= infra_base.network.broadcast_address:
die("'{}' or it's next address is the broadcast address.".format(infra_base.with_prefixlen))
else:
die("Multiple infrastructure link addresses")
if loopback_base is None:
die("No loopback address defined")
if infra_base is None:
die("No infrastructure link (InfraLink) address defined")
return loopback_base, infra_base
def cisco_router_config(notes):
""" get additional cisco router config """
router_config = []
for note in notes:
match = re.search(r'\bcisco +router +config *:', note,
flags=re.IGNORECASE)
if match:
conf_pos = match.end()
router_config.extend(note[conf_pos:].strip().splitlines())
return router_config
def sorted_node_names(nodes):
""" return sorted list of node names """
return sorted(nodes, key=lambda k: str(k).lower())
def sorted_links(links):
""" return sorted list of node links """
return sorted(links, key=lambda k: "" if k['label'] is None else \
str(k['label']).lower())
def add_link_ip(nodes, vlan_interfaces, ip_net_base):
"""" Add IP addresses to the links between the nodes """
for name in sorted_node_names(nodes):
if '_vlans' in nodes[name]:
continue
for link in sorted_links(nodes[name]['_links']):
if not link['label'] or not link['remote_label']:
continue
if 'IP' in link:
continue
if link['remote_name'] in vlan_interfaces:
match = re.search(r'\bvlan *(\d+)\b', link['remote_label'], re.IGNORECASE)
if match: # link to switch group
vlan = int(match.group(1))
ip_addr = ip_net_base
ip_count = 0
ip_last_link = None
for switch_if in vlan_interfaces[link['remote_name']][vlan]:
rem_name = switch_if[2]
rem_link_id = switch_if[4]
if rem_name not in vlan_interfaces:
if rem_name in nodes:
for rem_link in nodes[rem_name]['_links']:
if rem_link['link_id'] == rem_link_id:
rem_link['IP'] = ip_addr
ip_count += 1
ip_last_link = rem_link
break
ip_addr = next_ip_address(ip_addr)
if ip_count == 1: # ignore networks with 1 IP
del ip_last_link['IP']
elif ip_count >= 2:
ip_net_base = next_ip_network(ip_net_base)
elif link['remote_name'] in nodes:
for rem_link in nodes[link['remote_name']]['_links']:
if rem_link['link_id'] == link['link_id']:
if 'IP' not in rem_link:
link['IP'] = ip_net_base
rem_link['IP'] = next_ip_address(ip_net_base)
ip_net_base = next_ip_network(ip_net_base)
break
return ip_net_base
def add_loopback(nodes, ip_net_base):
"""" Add loopback to nodes """
for name in sorted_node_names(nodes):
if '_vlans' in nodes[name]:
continue
nodes[name].setdefault("_loopback", {})
if 0 not in nodes[name]['_loopback']:
nodes[name]['_loopback'][0] = ip_net_base
ip_net_base = next_ip_network(ip_net_base)
return ip_net_base
def str_clean(arg):
""" cleanup string: collapse whitespaces """
return " ".join(str(arg).split())
class CiscoRouter(dict):
""" cisco router """
def create_config(self):
""" create router configuration """
config = []
if self['node_type'] == 'qemu':
config.append("hostname {}".format(self['name']))
for loopback in sorted(self.get('_loopback', [])):
ip, mask = self['_loopback'][loopback].with_netmask.split('/', 1)
config.append("interface lo{}".format(loopback))
config.append(" ip address {} {}".format(ip, mask))
for link in sorted_links(self['_links']):
if not link['label']:
continue
config.append("interface {}".format(link['label']))
config.append(" description {} {}".format(
link['remote_name'], str_clean(link['remote_label'])))
if 'IP' in link:
ip, mask = link['IP'].with_netmask.split('/', 1)
config.append(" ip address {} {}".format(ip, mask))
config.append(" no shutdown")
config += self.get('_router_config', [])
if config:
config = ["configure terminal"] + config + ["end"]
return config
def send_commands(self, commands):
""" send commands to router """
return send_cisco_commands(self['name'], self["console_host"],
self["console"], commands)
class CiscoSwitch(dict):
""" cisco switch """
def create_config(self):
""" create switch configuration """
config = []
vlan_database = []
if self['node_type'] == 'qemu':
config.append("hostname {}".format(self['name']))
if self['node_type'] == 'dynamips':
for vlan in self.get('_vlans', []):
vlan_database.append("vlan {}".format(vlan))
if vlan_database:
vlan_database = ["vlan database"] + vlan_database + ["exit"]
else:
for vlan in self.get('_vlans', []):
config.append("vlan {}".format(vlan))
for link in sorted_links(self['_links']):
if not link['label']:
continue
match = re.match(r'([a-zA-Z]+ ?)?[0-9.:/]*[0-9]', link['label'])
if not match:
continue
ifname = match.group(0)
config.append("interface {}".format(ifname))
config.append(" description {} {}".format(
link['remote_name'], str_clean(link['remote_label'])))
if re.search(r'\btrunk\b', link['label'], re.IGNORECASE):
config.append(" switchport trunk encapsulation dot1q")
config.append(" switchport mode trunk")
else:
match = re.search(r'\bvlan *(\d+)\b', link['label'], re.IGNORECASE)
if match: # access link
vlan = int(match.group(1))
config.append(" switchport access vlan {}".format(vlan))
config.append(" switchport mode access")
if config:
config = ["configure terminal"] + config + ["end"]
config = vlan_database + config
return config
def send_commands(self, commands):
""" send commands to switch """
return send_cisco_commands(self['name'], self["console_host"],
self["console"], commands)
def select_cisco_devices(nodes, notes):
""" select cisco devices, using some heuristics """
devices = {}
router_config = cisco_router_config(notes)
print("Checking for devices, that are non Cisco router/switches...")
for name in sorted_node_names(nodes):
node = nodes[name]
if node["node_type"] == 'dynamips':
properties = node.get("properties", {})
if "NM-16ESW" in (properties.get("slot0"), properties.get("slot1"),
properties.get("slot2"), properties.get("slot3"),
properties.get("slot4"), properties.get("slot5"),
properties.get("slot6")):
devices[name] = CiscoSwitch(node)
else:
devices[name] = CiscoRouter(node)
if router_config:
devices[name]['_router_config'] = router_config
elif node["node_type"] == 'iou':
properties = node.get("properties", {})
image = properties.get("path", "").lower()
image = image.split("/")[-1]
if "l2" in image:
devices[name] = CiscoSwitch(node)
elif "l3" in image:
devices[name] = CiscoRouter(node)
if router_config:
devices[name]['_router_config'] = router_config
else:
print(" {}: unknown IOU device type".format(node["name"]))
elif node["node_type"] == 'qemu':
properties = node.get("properties", {})
image = properties.get("hda_disk_image", "")
image = image.replace("\\", "/").split("/")[-1].lower()
if "ios" in image:
if "l2" in image:
devices[name] = CiscoSwitch(node)
else:
devices[name] = CiscoRouter(node)
if router_config:
devices[name]['_router_config'] = router_config
else:
print(" {}: is not an IOS node".format(node["name"]))
else:
print(" {}: Non Cisco type '{}'".format(node["name"], node["node_type"]))
return devices
def parse_args(argv):
""" parse command line args and determine the project ID """
argc = len(argv)
if argc <= 1 or argv[1] == '-h' or argv[1] == '-?':
prog_name = os.path.splitext(os.path.basename(argv[0]))[0]
die("Usage: {} [<GNS3 profile>] <project>".format(prog_name))
if argc <= 3: # started as a script
if argc == 2: # only project, default profile
profile = None
project_name = argv[1]
else:
profile = argv[1]
project_name = argv[2]
sel_items = []
# connect to GNS3 controller
try:
api = gns3api.GNS3Api(profile=profile)
except gns3api.GNS3ApiException as err:
die("Can't connect to GNS3 controller:", err)
try: # check, if project is UUID
uuid.UUID(project_name)
project_id = project_name
except ValueError: # convert project name to UUID
# search for the project id
for proj in api.request('GET', '/v2/projects'):
if proj['name'] == project_name:
project_id = proj['project_id']
break
else:
die("Project '{}' not found".format(project_name))
else: # started as an external tool
try:
with open(argv[2], "r") as file:
ctl_url, ctl_user, ctl_passwd, *_ = file.read(512).splitlines()
if argv[2].endswith(".tmp"):
os.remove(argv[2])
except (OSError, ValueError) as err:
die("Can't get controller connection params:", err)
project_id = argv[3]
sel_items = argv[4:]
# connect to GNS3 controller
try:
api = gns3api.GNS3Api(ctl_url, ctl_user, ctl_passwd)
except gns3api.GNS3ApiException as err:
die("Can't connect to GNS3 controller:", err)
return api, project_id, sel_items
def main(argv):
""" Main function """
api, project_id, sel_items = parse_args(argv)
# get nodes (with link informations) and notes by GNS3 API
nodes, notes = get_project_data(api, project_id, sel_items)
if not nodes:
die("No nodes selected")
vlan_interfaces = get_vlan_interfaces(nodes)
# get base networks from notes
loopback_base, infra_base = base_networks(notes)
# select the Cisco devices
devices = select_cisco_devices(nodes, notes)
if not devices:
die("No Cisco routers/switches found")
# assign links and IP addresses
add_loopback(devices, loopback_base)
add_link_ip(devices, vlan_interfaces, infra_base)
# configure devices
print("Configuring...")
for name in sorted_node_names(devices):
node = devices[name]
if node["status"] != "started":
sys.stderr.write("{}: Node status is '{}'\n".format(name, node["status"]))
elif node.get("console") is None or \
node.get("console_host") is None or \
node.get("console_type") != "telnet":
sys.stderr.write("{}: Doesn't use telnet console\n".format(name))
else:
config = node.create_config()
if config:
print("{}...".format(name))
node.send_commands(config)
else:
print("{}: Nothing to configure.".format(name))
if __name__ == "__main__":
main(sys.argv)