|
- #!/usr/bin/env python3
- """
- gns3feed - generate ATOM/RSS feed from GNS3 community website
-
- usage: gns3feed [-h] [-v] [-a file] [-r file]
-
- optional arguments:
- -h, --help, -? prints this screen
- -v, --version prints version
- -a file, --atom file generate ATOM feed
- -r file, --rss file generate RSS feed
- """
- __version__ = "0.2"
-
- import os
- import sys
- import argparse
- import hashlib
- import re
- import datetime
- import dateutil.parser
- import urllib3
- import requests
- from feedgen.feed import FeedGenerator
-
# Base URL of the GNS3 website. Request URLs, the Referer header and all
# feed ids/links in this script are built from it.
HOST = "https://gns3.com"
-
def requests_session_retry(retries=2, backoff=0.3):
    """ create a requests session that retries failed requests

    retries -- total number of retries handed to urllib3's Retry
    backoff -- backoff factor handed to urllib3's Retry

    Returns the session. A single retrying adapter is shared by the
    http:// and https:// mounts.
    """

    retry_policy = urllib3.util.retry.Retry(total=retries,
                                            backoff_factor=backoff)
    retrying_adapter = requests.adapters.HTTPAdapter(max_retries=retry_policy)

    http = requests.Session()
    for scheme in ('http://', 'https://'):
        http.mount(scheme, retrying_adapter)
    return http
-
-
def get_data(session, url, data):
    """ post a JSON request to the GNS3 website and return the decoded reply

    session -- requests session used to send the request
    url     -- path appended to HOST, e.g. "/api/graphql"
    data    -- JSON-serializable request body

    Returns the decoded JSON body of the response.

    The function never returns on failure: a request error, an HTTP error
    status, a missing or non-JSON Content-Type, or an undecodable body all
    end the program through sys.exit() with a message prefixed by url.
    """

    headers = {'User-Agent': 'gns3feed/' + __version__,
               'Content-Type': 'application/json',
               'Referer': HOST + "/"}

    try:
        resp = session.post(HOST + url, headers=headers, timeout=30,
                            json=data)
        resp.raise_for_status()
        # Use .get(): a response without a Content-Type header must end in
        # the clean exit below, not in an uncaught KeyError.
        content_type = resp.headers.get('Content-Type')
        if not (content_type or '').startswith('application/json'):
            sys.exit("{} - Unexpected content type {}".format(
                url, content_type))
        result = resp.json()
    except (requests.exceptions.RequestException, ValueError) as err:
        msg = str(err)
        # Shorten messages of the form "... (Caused by Name('...: reason'))"
        # to "Name: reason for host: HOST".
        match = re.search(r"\(Caused by ([a-zA-Z0-9_]+)\('[^:]*: (.*)'\)", msg)
        if match:
            msg = "{}: {} for host: {}".format(*match.groups(), HOST)
        sys.exit("{} - {}".format(url, msg))

    return result
-
-
def date_id(date):
    """ turn a datetime into a compact UTC id string (YYYYmmddHHMMSS) """
    utc_time = date.astimezone(datetime.timezone.utc)
    return "{:%Y%m%d%H%M%S}".format(utc_time)
-
-
# Characters that are replaced before text is handed to the feed generator:
#   - C0 control characters other than tab, line feed and carriage return
#     (U+000C form feed included: it is not a legal XML 1.0 character),
#   - U+007F-U+009F,
#   - surrogate code points,
#   - the Unicode noncharacters U+FDD0-U+FDEF and U+xFFFE/U+xFFFF of
#     every plane.
INVALID_CHARS = re.compile(
    "["
    "\u0000-\u0008\u000B\u000C\u000E-\u001F\u007F-\u009F"
    "\uD800-\uDFFF\uFDD0-\uFDEF\uFFFE\uFFFF"
    "\U0001FFFE\U0001FFFF\U0002FFFE\U0002FFFF\U0003FFFE\U0003FFFF"
    "\U0004FFFE\U0004FFFF\U0005FFFE\U0005FFFF\U0006FFFE\U0006FFFF"
    "\U0007FFFE\U0007FFFF\U0008FFFE\U0008FFFF\U0009FFFE\U0009FFFF"
    "\U000AFFFE\U000AFFFF\U000BFFFE\U000BFFFF\U000CFFFE\U000CFFFF"
    "\U000DFFFE\U000DFFFF\U000EFFFE\U000EFFFF\U000FFFFE\U000FFFFF"
    "\U0010FFFE\U0010FFFF"
    "]")
# U+FFFD REPLACEMENT CHARACTER is substituted for every invalid character.
REPLACEMENT_CHAR = "\uFFFD"


def clean_string(string):
    """ replace every invalid character of a string

    string -- text to clean

    Returns a copy of string in which each character matched by
    INVALID_CHARS is replaced by REPLACEMENT_CHAR; all other characters,
    including tab, line feed and carriage return, are kept.
    """
    return INVALID_CHARS.sub(REPLACEMENT_CHAR, string)
-
-
def clean_content(content):
    """ tidy up the HTML content of a post

    Invalid characters are replaced first, then empty <p></p> paragraphs
    are dropped and a <br> that directly follows a closing </p> is removed.
    Returns the cleaned text.
    """
    cleanups = (
        (r'<p>\s*</p>[ \t]*', ""),               # empty paragraph
        (r'(</p>\s*)<br */?>[ \t]*', r'\1'),     # <br> right after </p>
    )

    text = clean_string(content)
    for pattern, replacement in cleanups:
        text = re.sub(pattern, replacement, text)
    return text
-
-
# Request body that add_articles() posts to /api/graphql: the
# "getInitiativeList" operation with its variables and query text.
# The query aliases the date fields to dateTimeAdded, dateTimeLastUpdated
# and lastEntryDateTime, each formatted as "YYYY-MM-DDTHH:mm:ssZ".
# NOTE(review): the typeIds are opaque ids - presumably the community
# content types to include; confirm against the website.
ARTICLE_LIST_PARAMS = {
    "operationName": "getInitiativeList",
    "variables": {
        "limit": 50,
        "archived": "exclude",
        "sortBy": "dateLastUpdated",
        "typeIds": [
            "5cc2471c7979fb3b5d34a6cd",
            "5e47720e5f1d753f4500ec92",
            "5ebe1b516c92855b3c810e20"
        ]
    },
    "query": 'query getInitiativeList($archived: InitiativeListArchivedOptions, $start: ID, $limit: Int = 50, $sortBy: InitiativeListSortOptions, $typeIds: [ID!]) {initiativeList(archived: $archived, start: $start, limit: $limit, sortBy: $sortBy, typeIds: $typeIds) {items {...InitiativeListItemFragment __typename} __typename}}\nfragment InitiativeListItemFragment on Initiative {id addedBy {id fullName __typename} archived body {summary __typename} class dateTimeAdded: dateAdded(format: "YYYY-MM-DDTHH:mm:ssZ") dateTimeLastUpdated: dateLastUpdated(format: "YYYY-MM-DDTHH:mm:ssZ") discussionFormat discussionStatus {closed __typename} labels {id color name __typename} name numNotes numViews pinned privacy slug thread {id lastEntryDateTime: lastEntryDate(format: "YYYY-MM-DDTHH:mm:ssZ") numEntries __typename} type {id slug __typename} __typename}'
}
-
def add_articles(feed, session):
    """ fetch the article list and append one feed entry per article

    feed    -- feed object the entries are appended to
    session -- requests session passed on to get_data()

    Each article is stamped with the date of its last thread entry, or with
    the date it was added when there is no such entry. Articles are then
    ordered newest first and added to the feed in that order.
    """

    reply = get_data(session, "/api/graphql", ARTICLE_LIST_PARAMS)
    fetched = reply['data']['initiativeList']['items']

    # Stamp every article with the date used for sorting and for its id.
    for article in fetched:
        thread = article.get('thread')
        if thread and thread.get('lastEntryDateTime'):
            stamp = thread['lastEntryDateTime']
        else:
            stamp = article['dateTimeAdded']
        article['_last_modified'] = dateutil.parser.parse(stamp)

    newest_first = sorted(fetched, key=lambda item: item['_last_modified'],
                          reverse=True)
    # NOTE(review): this slice drops the 20 oldest articles (it does not keep
    # the newest 20) and yields nothing when 20 or fewer were fetched -
    # presumably deliberate; confirm.
    selected = newest_first[:-20]

    for article in selected:
        title = article['name']

        status = article.get('discussionStatus')
        if status and status.get('closed'):
            # A closed discussion gets a shifted date, which also changes
            # the entry id built from it below.
            article['_last_modified'] += datetime.timedelta(seconds=1)
            title = "[Solved] " + title

        thread = article.get('thread')
        if thread and thread['numEntries'] > 0:
            title += " ({})".format(thread['numEntries'])

        entry = feed.add_entry(order='append')
        entry.id("".join((HOST, '/feed/article/', article['id'],
                          '-', date_id(article['_last_modified']))))
        entry.title(title)
        entry.published(article['_last_modified'])
        entry.updated(article['_last_modified'])
        entry.author(name=article['addedBy']['fullName'])
        entry.dc.dc_creator(article['addedBy']['fullName'])
        entry.content(content=clean_content(article['body']['summary']),
                      type="html")
        link_parts = (HOST, "community",
                      article['type']['slug'], article['slug'])
        entry.link(href="/".join(link_parts), rel='alternate')
-
-
def generate_feed(atom_file, rss_file):
    """ build the feed and write it as ATOM and/or RSS

    atom_file -- path of the ATOM file to write, or a false value to skip it
    rss_file  -- path of the RSS file to write, or a false value to skip it

    The feed id is derived from the MD5 digest over all entry ids. Each
    output is written to "<path>.new" and then moved over the target with
    os.replace(). An OSError while writing ends the program via sys.exit().
    """

    feed = FeedGenerator()
    feed.load_extension("dc", atom=False, rss=True)
    feed.title("GNS3 Discussions")
    feed.description("GNS3 Discussions")
    feed.link(href=HOST+"/community/featured", rel='alternate')
    feed.language('en')

    add_articles(feed, requests_session_retry())

    # Hash the concatenated entry ids into the id of the feed itself.
    entry_ids = b"".join(entry.id().encode('utf-8') for entry in feed.entry())
    feed.id(HOST + '/feed/' + hashlib.md5(entry_ids).hexdigest())

    try:
        outputs = (("ATOM", atom_file, feed.atom_file),
                   ("RSS", rss_file, feed.rss_file))
        for feed_type, target, write_feed in outputs:
            if not target:
                continue
            target = os.path.expanduser(target)
            write_feed(target + ".new")
            os.replace(target + ".new", target)
    except OSError as err:
        sys.exit("Can't create {} feed: {}".format(feed_type, err))
-
-
# Main: parse the command line, then generate the requested feed file(s)
parser = argparse.ArgumentParser(
    description='%(prog)s - generate ATOM/RSS feed from GNS3 community website',
    add_help=False)
parser.add_argument('-h', '--help', '-?',
                    help='prints this screen', action='help')
parser.add_argument('-v', '--version',
                    help='prints version', action='version',
                    version="%(prog)s v" + __version__)
parser.add_argument('-a', '--atom',
                    help='generate ATOM feed', metavar="file")
parser.add_argument('-r', '--rss',
                    help='generate RSS feed', metavar="file")
args = parser.parse_args()

# An output file is mandatory: without one there is nothing to write.
if not (args.atom or args.rss):
    parser.error("at least one of the options -a/--atom or -r/--rss must be used.")

generate_feed(args.atom, args.rss)
|