gns3api - Simple python module to access the GNS3 API
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

335 lines
11 KiB

# Copyright (C) 2017-2019 Bernhard Ehlers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Access GNS3 controller via API
"""
import os
import sys
import ssl
import json
from base64 import b64encode
import configparser
import http.client
from urllib.parse import urlsplit
class GNS3ApiException(OSError):
"""
GNS3 API Exceptions, base class
"""
def __init__(self, *args):
super().__init__()
self.args = args
class GNS3ConfigurationError(GNS3ApiException):
"""
GNS3 configuration error
"""
def __init__(self, message="Missing/invalid GNS3 configuration"):
GNS3ApiException.__init__(self, message)
class HTTPClientError(GNS3ApiException):
"""
HTTP client library error
"""
def __str__(self):
return ": ".join(str(x) for x in self.args)
class HTTPError(GNS3ApiException):
"""
HTTP response error
"""
def __str__(self):
if len(self.args) >= 2:
return '[Status {}] '.format(self.args[0]) + \
" ".join(str(x) for x in self.args[1:])
return str(self.args[0])
class GNS3Api:
"""
GNS3 API - an API to GNS3
"""
def __init__(self, url=None, user=None, password=None,
profile=None, version=None, verify=True):
"""
GNS3 API
:param url: Server URL, if None the connection parameters
are read from the GNS3 configuration file
:param user: User name, None for no authentification
:param password: Password
:param profile: GNS3 configuration profile
:param version: API version, None for autodetect
:param verify: Verify CERT (on https), default True
False: no CERT verification
True: verification using the system CA certificates
file: verification using the file and the system CA
"""
if not url:
(url, user, password) = \
GNS3Api.get_controller_connection(profile, version)
# split URL
try:
url_tuple = urlsplit(url, "http")
if not url_tuple.netloc: # fix missing "//" before host
url_tuple = urlsplit(url_tuple.scheme + "://" + url_tuple.path)
proto = url_tuple.scheme
host = url_tuple.hostname
port = url_tuple.port or 3080
if user is None:
user = url_tuple.username
if password is None:
password = url_tuple.password
except ValueError as err:
raise HTTPClientError("UrlError", str(err))
if host == '0.0.0.0':
host = '127.0.0.1'
elif host == '::':
host = '::1'
if ':' in host: # IPv6
self.controller = "{}://[{}]:{}".format(proto, host, port)
else:
self.controller = "{}://{}:{}".format(proto, host, port)
# authentication
self._auth = {}
if user is not None and user != '':
if password is None:
password = ''
self._auth['Authorization'] = 'Basic ' + \
b64encode((user+':'+password).encode('utf-8')).decode('ascii')
# open connection
self.status_code = None
try:
if proto == 'http':
self._conn = http.client.HTTPConnection(host, port, timeout=10)
elif proto == 'https':
context = ssl.create_default_context()
if isinstance(verify, str):
context.check_hostname = False
context.load_verify_locations(cafile=verify)
elif not verify:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
self._conn = http.client.HTTPSConnection(host, port, timeout=10,
context=context)
else:
raise HTTPClientError("UnknownProtocol", proto)
self._conn.connect()
except (OSError, http.client.HTTPException) as err:
raise HTTPClientError(type(err).__name__, str(err))
@staticmethod
def get_controller_settings(profile=None, version=None):
"""
Get GNS3 controller settings
:param profile: GNS3 configuration profile
:param version: API version, None for autodetect
:returns: controller settings
"""
# find config base directory
if sys.platform.startswith('win'):
fn_conf = os.path.join(os.path.expandvars('%APPDATA%'), 'GNS3')
fn_file = 'gns3_server.ini'
else:
fn_conf = os.path.join(os.path.expanduser('~'), '.config', 'GNS3')
fn_file = 'gns3_server.conf'
# add version
if version is None: # search for highest version
version = ""
version_split = []
for name in os.listdir(fn_conf):
try:
name_split = list(map(int, name.split('.')))
if name_split > version_split:
version = name
version_split = name_split
except ValueError:
pass
else: # use only <major>.<minor> part
version = '.'.join(version.split('.', 2)[0:2])
if version not in ('', '2.0', '2.1'):
fn_conf = os.path.join(fn_conf, version)
# add profile
if profile and profile != "default":
fn_conf = os.path.join(fn_conf, 'profiles', profile)
# add config filename
fn_conf = os.path.join(fn_conf, fn_file)
# parse config
config = configparser.ConfigParser()
serv_conf = None
try:
if config.read(fn_conf):
serv_conf = config['Server']
except (OSError, KeyError, configparser.Error) as err:
raise GNS3ConfigurationError("Error reading GNS3 configuration: {}".format(err))
if serv_conf is None:
raise GNS3ConfigurationError("Missing GNS3 configuration file '{}'".format(fn_conf))
# check for mandatory host key
if 'host' not in serv_conf:
raise GNS3ConfigurationError("GNS3 configuration: Missing host")
return serv_conf
@staticmethod
def get_controller_connection(profile=None, version=None):
"""
Get GNS3 controller connection parameters
:param profile: GNS3 configuration profile
:param version: API version, None for autodetect
:returns: Tuple of url, user, password
"""
serv_conf = GNS3Api.get_controller_settings(profile, version)
host = serv_conf['host']
proto = serv_conf.get('protocol', 'http')
port = int(serv_conf.get('port', 3080))
if serv_conf.get('auth'):
user = serv_conf.get('user', None)
password = serv_conf.get('password', None)
else:
user = None
password = None
# create URL
if ':' in host: # IPv6
url = "{}://[{}]:{}".format(proto, host, port)
else:
url = "{}://{}:{}".format(proto, host, port)
return (url, user, password)
def request_file(self, method, path, args=None, timeout=60):
"""
API request
:param method: HTTP method ('GET'/'PUT'/'POST'/'DELETE')
:param path: URL path, can be a list or tuple
:param args: arguments to the API endpoint
:param timeout: timeout, default 60
:returns: HTTPResponse, a file object
"""
headers = {'User-Agent': 'GNS3Api'}
self.status_code = None
# json encode args
if isinstance(args, dict):
body = json.dumps(args, separators=(',', ':'))
headers['Content-Type'] = 'application/json'
else:
body = args
headers['Content-Type'] = 'application/octet-stream'
# methods are upper case
method = method.upper()
if method not in ('GET', 'PUT', 'POST', 'DELETE'):
raise HTTPError(405, "405: Method Not Allowed")
# make path variable to an URL path
if isinstance(path, (list, tuple)):
path = "/".join(str(x) for x in path)
else:
path = str(path)
if not path.startswith("/"):
path = "/" + path
# update timeout
if self._conn.timeout != timeout:
self._conn.timeout = timeout
if self._conn.sock:
self._conn.sock.settimeout(timeout)
# send request / get response
headers.update(self._auth)
try:
self._conn.request(method, path, body, headers=headers)
resp = self._conn.getresponse()
self.status_code = resp.status
except (OSError, http.client.HTTPException) as err:
raise HTTPClientError(type(err).__name__, str(err))
# check for errors
if self.status_code < 200 or self.status_code >= 300:
try:
message = resp.read()
except (OSError, http.client.HTTPException):
message = resp.reason
else:
if message:
message = message.decode('utf-8', errors='ignore')
if resp.getheader('Content-Type') == 'application/json':
message = json.loads(message)
try:
message = message['message']
except (TypeError, KeyError):
pass
else:
message = resp.reason
raise HTTPError(self.status_code, message)
return resp
def request(self, method, path, args=None, timeout=60):
"""
API request
:param method: HTTP method ('GET'/'PUT'/'POST'/'DELETE')
:param path: URL path, can be a list or tuple
:param args: arguments to the API endpoint
:param timeout: timeout, default 60
:returns: result
"""
resp = self.request_file(method, path, args, timeout)
# read response
try:
result = resp.read()
except (OSError, http.client.HTTPException) as err:
raise HTTPClientError(type(err).__name__, str(err))
if resp.getheader('Content-Type') == 'application/json':
result = json.loads(result.decode('utf-8', errors='ignore'))
return result
def close(self):
"""
Closes HTTP(S) connection
"""
self._conn.close()