|
- #!/usr/bin/python3
- # -*- coding: utf-8 -*-
- #
- # Copyright (C) 2018-2019 Bernhard Ehlers
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
-
- """
- Base interface and loopback configuration for Cisco router
- """
-
- import os
- import sys
- import ipaddress
- import re
- import telnetlib
- import uuid
- import xml.etree.ElementTree as ET
- import gns3api
-
-
- def die(*msg_list):
- """ abort program with error message """
- error_msg = ' '.join(str(x) for x in msg_list)
- sys.exit(error_msg.rstrip("\n\r"))
-
-
- def next_ip_address(ip_intf):
- """" next IP address """
- return ipaddress.ip_interface(str(ip_intf.ip + 1) +
- "/" + str(ip_intf.network.prefixlen))
-
-
- def next_ip_network(ip_intf):
- """" next IP network """
- return ipaddress.ip_interface(
- str(ip_intf.ip + ip_intf.network.num_addresses) +
- "/" + str(ip_intf.network.prefixlen))
-
-
- def send_cisco_commands(name, host, port, commands, privileged=True):
- """ send config to Cisco router/switch """
-
- if not commands:
- return True
-
- prompt = b"[>#] ?$"
- status = "???"
- try:
-
- # open telnet connection
- status = "connect"
- tn = telnetlib.Telnet(host, port, 10)
-
- # read old junk
- while tn.read_until(b"xyzzy", timeout=0.3):
- pass
-
- # send a <CR>
- status = "first contact"
- tn.write(b"\r")
- try:
- response = tn.expect([prompt], 5)[2]
- except OSError:
- pass
- tn.write(b"\r") # second <return>
- response = tn.expect([prompt], 5)[2]
- if privileged and response.endswith(b">"):
- sys.stderr.write("{}: Router must be in priviledged mode.\n".format(name))
- return False
-
- # commands
- status = "sending commands"
- for line in commands:
- while tn.read_until(b"xyzzy", timeout=0.1):
- pass
- tn.write((line + "\r").encode('utf-8', errors='replace'))
- # wait for prompt
- tn.expect([prompt])
-
- # close connection
- status = "close"
- tn.close()
-
- except OSError as err:
- sys.stderr.write("{}: I/O error during {} - {}\n".format(name, status, err))
- return False
- except KeyboardInterrupt:
- sys.stderr.write("{}: Aborted\n".format(name))
- return False
-
- return True # No error
-
-
- def get_project_data(api, project_id, sel_items):
- """ get node (with link information) and notes of a project by GNS3 API """
-
- # get all node and link information
- all_nodes = {}
- all_links = {}
- notes = []
- try:
- # check project status
- project = api.request('GET', ('/v2/projects', project_id))
- if project['status'] != 'opened':
- die("Project '{}' is {}, please open it.".format(
- project['name'], project['status']))
-
- compute_host = {}
- for compute in api.request('GET', '/v2/computes'):
- compute_host[compute["compute_id"]] = compute["host"]
- for node in api.request('GET', ('/v2/projects', project_id, 'nodes')):
- console_host = node.get("console_host")
- if console_host in ('0.0.0.0', '::'):
- node["console_host"] = compute_host[node["compute_id"]]
- all_nodes[node["node_id"]] = node
- for link in api.request('GET', ('/v2/projects', project_id, 'links')):
- all_links[link["link_id"]] = link
-
- if sel_items: # get selected notes
- for item in sel_items:
- if item.startswith("text_drawings/"):
- drawing = api.request('GET', \
- ('/v2/projects', project_id, "drawings", item[14:]))
- svg = ET.fromstring(drawing["svg"])
- if svg[0].tag == 'text':
- notes.append(svg[0].text)
- else: # nothing selected: get all of them
- for drawing in api.request('GET', ('/v2/projects', project_id, "drawings")):
- svg = ET.fromstring(drawing["svg"])
- if svg[0].tag == 'text':
- notes.append(svg[0].text)
-
- except gns3api.GNS3ApiException as err:
- die("Can't get node/link information:", err)
-
- nodes = select_nodes(all_nodes, all_links, sel_items)
-
- return nodes, notes
-
-
- def select_nodes(all_nodes, all_links, sel_items):
- """ select nodes and add link information """
- nodes = {}
- try:
- if sel_items: # get selected nodes
- for item in sel_items:
- if item.startswith("nodes/"):
- item = item[6:]
- nodes[all_nodes[item]['name']] = all_nodes[item]
- nodes[all_nodes[item]['name']]['_links'] = []
- else: # nothing selected: get all nodes
- for item in all_nodes:
- nodes[all_nodes[item]['name']] = all_nodes[item]
- nodes[all_nodes[item]['name']]['_links'] = []
-
- # add link informations to nodes
- for link in all_links:
- link_nodes = all_links[link]['nodes']
- node_0_id = link_nodes[0]['node_id']
- node_1_id = link_nodes[1]['node_id']
- node_0_name = all_nodes[node_0_id]['name']
- node_1_name = all_nodes[node_1_id]['name']
- label_0 = link_nodes[0].get("label", {}).get("text")
- label_1 = link_nodes[1].get("label", {}).get("text")
- if node_0_name in nodes:
- nodes[node_0_name]['_links'].append(
- {'link_id': link, 'link_type': all_links[link]['link_type'],
- 'adapter_number': link_nodes[0].get('adapter_number'),
- 'port_number': link_nodes[0].get('port_number'),
- 'label': label_0,
- 'remote_name': node_1_name, 'remote_label': label_1})
- if node_1_name in nodes:
- nodes[node_1_name]['_links'].append(
- {'link_id': link, 'link_type': all_links[link]['link_type'],
- 'adapter_number': link_nodes[1].get('adapter_number'),
- 'port_number': link_nodes[1].get('port_number'),
- 'label': label_1,
- 'remote_name': node_0_name, 'remote_label': label_0})
- except KeyError:
- die("Project informations are inconsistent")
-
- return nodes
-
-
- def get_vlan_interfaces(nodes):
- """ get vlan interfaces in switch groups """
- vlan_interfaces = {}
- for name in nodes:
- is_switch = False
- switch_group = {} # new switch group
- switch_list = [name]
- while switch_list: # process a group of switches
- name = switch_list.pop()
- if name not in nodes: # node not selected
- continue
- if name in vlan_interfaces: # already processed
- continue
- for link in nodes[name]['_links']:
- if not link['label']:
- pass
- elif re.search(r'\btrunk\b', link['label'], re.IGNORECASE):
- is_switch = True
- switch_list.append(link['remote_name'])
- else:
- match = re.search(r'\bvlan *(\d+)\b', link['label'], re.IGNORECASE)
- if match: # add vlan / link to vlan_interfaces
- is_switch = True
- vlan = int(match.group(1))
- switch_group.setdefault(vlan, [])
- switch_group[vlan].append(
- [name, link['label'],
- link['remote_name'], link['remote_label'],
- link['link_id']])
- if is_switch:
- vlan_interfaces[name] = switch_group
- for vlan in switch_group:
- switch_group[vlan].sort(key=lambda k: [k[0].lower(), k[1].lower()])
- for name in vlan_interfaces:
- nodes[name]['_vlans'] = sorted(vlan_interfaces[name].keys())
-
- return vlan_interfaces
-
-
- def base_networks(notes):
- """ get base IP networks for loopbacks and infrastructure interfaces """
-
- loopback_base = None
- infra_base = None
-
- for note in notes:
- # loopback address
- match = re.search(r'^ *loopback: *(\S+)', note,
- flags=re.IGNORECASE|re.MULTILINE)
- if match:
- if loopback_base is None:
- try:
- loopback_base = ipaddress.ip_interface(match.group(1))
- except ValueError:
- die("Invalid loopback address '{}'".format(match.group(1)))
- else:
- die("Multiple loopback addresses")
-
- # infrastructure address
- match = re.search(r'^ *infralink: *(\S+)', note,
- flags=re.IGNORECASE|re.MULTILINE)
- if match:
- if infra_base is None:
- try:
- infra_base = ipaddress.ip_interface(match.group(1))
- except ValueError:
- die("Invalid infrastructure link address '{}'".format(match.group(1)))
- if infra_base.network.num_addresses < 4:
- die("Network '{}' is too small for 2 interface addresses.".format(infra_base.with_prefixlen))
- if infra_base.ip == infra_base.network.network_address:
- infra_base = next_ip_address(infra_base)
- if infra_base.ip + 1 >= infra_base.network.broadcast_address:
- die("'{}' or it's next address is the broadcast address.".format(infra_base.with_prefixlen))
- else:
- die("Multiple infrastructure link addresses")
-
- if loopback_base is None:
- die("No loopback address defined")
- if infra_base is None:
- die("No infrastructure link (InfraLink) address defined")
-
- return loopback_base, infra_base
-
-
- def cisco_router_config(notes):
- """ get additional cisco router config """
-
- router_config = []
-
- for note in notes:
- match = re.search(r'\bcisco +router +config *:', note,
- flags=re.IGNORECASE)
- if match:
- conf_pos = match.end()
- router_config.extend(note[conf_pos:].strip().splitlines())
-
- return router_config
-
-
- def sorted_node_names(nodes):
- """ return sorted list of node names """
- return sorted(nodes, key=lambda k: str(k).lower())
-
-
- def sorted_links(links):
- """ return sorted list of node links """
- return sorted(links, key=lambda k: "" if k['label'] is None else \
- str(k['label']).lower())
-
-
- def add_link_ip(nodes, vlan_interfaces, ip_net_base):
- """" Add IP addresses to the links between the nodes """
-
- for name in sorted_node_names(nodes):
- if '_vlans' in nodes[name]:
- continue
- for link in sorted_links(nodes[name]['_links']):
- if not link['label'] or not link['remote_label']:
- continue
- if 'IP' in link:
- continue
- if link['remote_name'] in vlan_interfaces:
- match = re.search(r'\bvlan *(\d+)\b', link['remote_label'], re.IGNORECASE)
- if match: # link to switch group
- vlan = int(match.group(1))
- ip_addr = ip_net_base
- ip_count = 0
- ip_last_link = None
- for switch_if in vlan_interfaces[link['remote_name']][vlan]:
- rem_name = switch_if[2]
- rem_link_id = switch_if[4]
- if rem_name not in vlan_interfaces:
- if rem_name in nodes:
- for rem_link in nodes[rem_name]['_links']:
- if rem_link['link_id'] == rem_link_id:
- rem_link['IP'] = ip_addr
- ip_count += 1
- ip_last_link = rem_link
- break
- ip_addr = next_ip_address(ip_addr)
- if ip_count == 1: # ignore networks with 1 IP
- del ip_last_link['IP']
- elif ip_count >= 2:
- ip_net_base = next_ip_network(ip_net_base)
- elif link['remote_name'] in nodes:
- for rem_link in nodes[link['remote_name']]['_links']:
- if rem_link['link_id'] == link['link_id']:
- if 'IP' not in rem_link:
- link['IP'] = ip_net_base
- rem_link['IP'] = next_ip_address(ip_net_base)
- ip_net_base = next_ip_network(ip_net_base)
- break
- return ip_net_base
-
-
- def add_loopback(nodes, ip_net_base):
- """" Add loopback to nodes """
-
- for name in sorted_node_names(nodes):
- if '_vlans' in nodes[name]:
- continue
- nodes[name].setdefault("_loopback", {})
- if 0 not in nodes[name]['_loopback']:
- nodes[name]['_loopback'][0] = ip_net_base
- ip_net_base = next_ip_network(ip_net_base)
- return ip_net_base
-
-
- def str_clean(arg):
- """ cleanup string: collapse whitespaces """
- return " ".join(str(arg).split())
-
-
- class CiscoRouter(dict):
- """ cisco router """
-
- def create_config(self):
- """ create router configuration """
-
- config = []
- if self['node_type'] == 'qemu':
- config.append("hostname {}".format(self['name']))
-
- for loopback in sorted(self.get('_loopback', [])):
- ip, mask = self['_loopback'][loopback].with_netmask.split('/', 1)
- config.append("interface lo{}".format(loopback))
- config.append(" ip address {} {}".format(ip, mask))
-
- for link in sorted_links(self['_links']):
- if not link['label']:
- continue
- config.append("interface {}".format(link['label']))
- config.append(" description {} {}".format(
- link['remote_name'], str_clean(link['remote_label'])))
- if 'IP' in link:
- ip, mask = link['IP'].with_netmask.split('/', 1)
- config.append(" ip address {} {}".format(ip, mask))
- config.append(" no shutdown")
-
- config += self.get('_router_config', [])
- if config:
- config = ["configure terminal"] + config + ["end"]
-
- return config
-
- def send_commands(self, commands):
- """ send commands to router """
- return send_cisco_commands(self['name'], self["console_host"],
- self["console"], commands)
-
- class CiscoSwitch(dict):
- """ cisco switch """
-
- def create_config(self):
- """ create switch configuration """
-
- config = []
- vlan_database = []
-
- if self['node_type'] == 'qemu':
- config.append("hostname {}".format(self['name']))
-
- if self['node_type'] == 'dynamips':
- for vlan in self.get('_vlans', []):
- vlan_database.append("vlan {}".format(vlan))
- if vlan_database:
- vlan_database = ["vlan database"] + vlan_database + ["exit"]
- else:
- for vlan in self.get('_vlans', []):
- config.append("vlan {}".format(vlan))
-
- for link in sorted_links(self['_links']):
- if not link['label']:
- continue
- match = re.match(r'([a-zA-Z]+ ?)?[0-9.:/]*[0-9]', link['label'])
- if not match:
- continue
- ifname = match.group(0)
- config.append("interface {}".format(ifname))
- config.append(" description {} {}".format(
- link['remote_name'], str_clean(link['remote_label'])))
- if re.search(r'\btrunk\b', link['label'], re.IGNORECASE):
- config.append(" switchport trunk encapsulation dot1q")
- config.append(" switchport mode trunk")
- else:
- match = re.search(r'\bvlan *(\d+)\b', link['label'], re.IGNORECASE)
- if match: # access link
- vlan = int(match.group(1))
- config.append(" switchport access vlan {}".format(vlan))
- config.append(" switchport mode access")
-
- if config:
- config = ["configure terminal"] + config + ["end"]
- config = vlan_database + config
-
- return config
-
- def send_commands(self, commands):
- """ send commands to switch """
- return send_cisco_commands(self['name'], self["console_host"],
- self["console"], commands)
-
-
- def select_cisco_devices(nodes, notes):
- """ select cisco devices, using some heuristics """
-
- devices = {}
- router_config = cisco_router_config(notes)
- print("Checking for devices, that are non Cisco router/switches...")
- for name in sorted_node_names(nodes):
- node = nodes[name]
- if node["node_type"] == 'dynamips':
- properties = node.get("properties", {})
- if "NM-16ESW" in (properties.get("slot0"), properties.get("slot1"),
- properties.get("slot2"), properties.get("slot3"),
- properties.get("slot4"), properties.get("slot5"),
- properties.get("slot6")):
- devices[name] = CiscoSwitch(node)
- else:
- devices[name] = CiscoRouter(node)
- if router_config:
- devices[name]['_router_config'] = router_config
- elif node["node_type"] == 'iou':
- properties = node.get("properties", {})
- image = properties.get("path", "").lower()
- image = image.split("/")[-1]
- if "l2" in image:
- devices[name] = CiscoSwitch(node)
- elif "l3" in image:
- devices[name] = CiscoRouter(node)
- if router_config:
- devices[name]['_router_config'] = router_config
- else:
- print(" {}: unknown IOU device type".format(node["name"]))
- elif node["node_type"] == 'qemu':
- properties = node.get("properties", {})
- image = properties.get("hda_disk_image", "")
- image = image.replace("\\", "/").split("/")[-1].lower()
- if "ios" in image:
- if "l2" in image:
- devices[name] = CiscoSwitch(node)
- else:
- devices[name] = CiscoRouter(node)
- if router_config:
- devices[name]['_router_config'] = router_config
- else:
- print(" {}: is not an IOS node".format(node["name"]))
- else:
- print(" {}: Non Cisco type '{}'".format(node["name"], node["node_type"]))
-
- return devices
-
-
- def parse_args(argv):
- """ parse command line args and determine the project ID """
-
- argc = len(argv)
- if argc <= 1 or argv[1] == '-h' or argv[1] == '-?':
- prog_name = os.path.splitext(os.path.basename(argv[0]))[0]
- die("Usage: {} [<GNS3 profile>] <project>".format(prog_name))
-
- if argc <= 3: # started as a script
- if argc == 2: # only project, default profile
- profile = None
- project_name = argv[1]
- else:
- profile = argv[1]
- project_name = argv[2]
- sel_items = []
-
- # connect to GNS3 controller
- try:
- api = gns3api.GNS3Api(profile=profile)
- except gns3api.GNS3ApiException as err:
- die("Can't connect to GNS3 controller:", err)
-
- try: # check, if project is UUID
- uuid.UUID(project_name)
- project_id = project_name
- except ValueError: # convert project name to UUID
- # search for the project id
- for proj in api.request('GET', '/v2/projects'):
- if proj['name'] == project_name:
- project_id = proj['project_id']
- break
- else:
- die("Project '{}' not found".format(project_name))
-
- else: # started as an external tool
- try:
- with open(argv[2], "r") as file:
- ctl_url, ctl_user, ctl_passwd, *_ = file.read(512).splitlines()
- if argv[2].endswith(".tmp"):
- os.remove(argv[2])
- except (OSError, ValueError) as err:
- die("Can't get controller connection params:", err)
- project_id = argv[3]
- sel_items = argv[4:]
-
- # connect to GNS3 controller
- try:
- api = gns3api.GNS3Api(ctl_url, ctl_user, ctl_passwd)
- except gns3api.GNS3ApiException as err:
- die("Can't connect to GNS3 controller:", err)
-
- return api, project_id, sel_items
-
-
- def main(argv):
- """ Main function """
-
- api, project_id, sel_items = parse_args(argv)
-
- # get nodes (with link informations) and notes by GNS3 API
- nodes, notes = get_project_data(api, project_id, sel_items)
- if not nodes:
- die("No nodes selected")
- vlan_interfaces = get_vlan_interfaces(nodes)
-
- # get base networks from notes
- loopback_base, infra_base = base_networks(notes)
-
- # select the Cisco devices
- devices = select_cisco_devices(nodes, notes)
- if not devices:
- die("No Cisco routers/switches found")
-
- # assign links and IP addresses
- add_loopback(devices, loopback_base)
- add_link_ip(devices, vlan_interfaces, infra_base)
-
- # configure devices
- print("Configuring...")
- for name in sorted_node_names(devices):
- node = devices[name]
- if node["status"] != "started":
- sys.stderr.write("{}: Node status is '{}'\n".format(name, node["status"]))
- elif node.get("console") is None or \
- node.get("console_host") is None or \
- node.get("console_type") != "telnet":
- sys.stderr.write("{}: Doesn't use telnet console\n".format(name))
- else:
- config = node.create_config()
- if config:
- print("{}...".format(name))
- node.send_commands(config)
- else:
- print("{}: Nothing to configure.".format(name))
-
-
- if __name__ == "__main__":
- main(sys.argv)
|