|
- #!/usr/local/bin/python3
- # verify the path of python3, it might be /usr/bin/python3
-
- # Copyright (C) 2017-2019 Bernhard Ehlers
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
-
- """
- otrdecoder - decoder for .otrkey files.
-
- usage: otrdecoder [-h] [-v] -i FILE -e EMAIL -p PASSWORD [-o OUTDIR] [-f] [-q]
-
- optional arguments:
- -h, --help, -? prints this screen
- -v prints version
- -i FILE use FILE as input file
- -e EMAIL use EMAIL to fetch the key directly from otr
- -p PASSWORD use PASSWORD to fetch the key directly from otr
- -o OUTDIR use OUTDIR as output directory (default: .)
- -f force overwriting of output file
- -q don't verify input file before processing.
- """
- __version__ = "0.1"
-
- import argparse
- import binascii
- import hashlib
- import os.path
- import random
- import sys
- import tempfile
- import time
- from datetime import datetime
- from http.client import HTTPConnection, HTTPException
-
-
# Number of bytes read per chunk (1 MiB).  decode_file() decrypts full
# chunks directly, so this has to stay a multiple of the cipher block size.
BUF_SIZE = 1 << 20

# Version string sent to the server (User-Agent header and "VN" field).
DECODER_VERSION = "0.4.1133"

# The file starts with the 10 byte magic 'OTRKEYFILE', followed by
# 512 bytes of encrypted header data.
FILE_HDR_LEN = len(b'OTRKEYFILE') + 512

# Host that is queried for the keyphrase.
OTR_HOST = "www.onlinetvrecorder.com"

# Error messages are appended to this file; an empty value disables logging.
LOGFILE = os.path.join(tempfile.gettempdir(), "otrdecoder.log")
-
-
- """
- detect cryptographic module, supported are cryptography and PyCryptodome
- """
# PYCRYPTO tells the BlowfishLE class which library was found:
# True  - PyCryptodome (module "Crypto")
# False - cryptography
PYCRYPTO = False
try:
    from Crypto.Cipher import Blowfish as crypto_Blowfish
    PYCRYPTO = True
except ImportError:
    try:
        from cryptography.hazmat.primitives.ciphers import \
            Cipher as crypto_Cipher, \
            modes as crypto_modes
        from cryptography.hazmat.backends import \
            default_backend as crypto_backend
        # Newer releases of cryptography provide Blowfish in the "decrepit"
        # namespace and deprecate it in the primitives module; older
        # releases only have the primitives module.  Use whichever works.
        try:
            from cryptography.hazmat.decrepit.ciphers import \
                algorithms as crypto_algorithms
            if not hasattr(crypto_algorithms, "Blowfish"):
                raise ImportError("Blowfish not in decrepit algorithms")
        except ImportError:
            from cryptography.hazmat.primitives.ciphers import \
                algorithms as crypto_algorithms
    except ImportError:
        sys.exit("No cryptographic module found - install cryptography or PyCryptodome")
-
class BlowfishLE():
    """
    Blowfish cryptography with non-standard little endian.

    Data (and the IV) are passed to the underlying library with the byte
    order of every 4-byte word reversed, and the result is reversed back.
    The module-level flag PYCRYPTO selects which library is used.
    """

    MODE_ECB = 1    #: Electronic Code Book (ECB)
    MODE_CBC = 2    #: Cipher-Block Chaining (CBC)

    block_size = 8  # Blowfish block size in bytes

    @staticmethod
    def swap_endian(data):
        """ reverse the byte order of every 4-byte word (DWORD) of data """

        swapped = bytearray(len(data))
        # destination byte 0 takes source byte 3, byte 1 takes byte 2, ...
        for dst, src in enumerate((3, 2, 1, 0)):
            swapped[dst::4] = data[src::4]

        return bytes(swapped)

    if PYCRYPTO:

        def __init__(self, key, mode, iv=None):
            if mode == self.MODE_ECB:
                self.cipher = crypto_Blowfish.new(key,
                                                  crypto_Blowfish.MODE_ECB)
            elif mode == self.MODE_CBC:
                if iv is None:
                    raise ValueError("Missing IV")
                self.cipher = crypto_Blowfish.new(key,
                                                  crypto_Blowfish.MODE_CBC,
                                                  self.swap_endian(iv))
            else:
                raise ValueError("Unknown mode {}".format(mode))

        def decrypt(self, data):
            """ decrypt data """
            plain = self.cipher.decrypt(self.swap_endian(data))
            return self.swap_endian(plain)

        def encrypt(self, data):
            """ encrypt data """
            crypted = self.cipher.encrypt(self.swap_endian(data))
            return self.swap_endian(crypted)

    else:

        def __init__(self, key, mode, iv=None):
            # argument checks first, in the same order as the cipher setup
            if mode == self.MODE_CBC and iv is None:
                raise ValueError("Missing IV")
            if mode not in (self.MODE_CBC, self.MODE_ECB):
                raise ValueError("Unknown mode {}".format(mode))

            algorithm = crypto_algorithms.Blowfish(key)
            if mode == self.MODE_CBC:
                chaining = crypto_modes.CBC(self.swap_endian(iv))
            else:
                chaining = crypto_modes.ECB()
            self.cipher = crypto_Cipher(algorithm, chaining,
                                        backend=crypto_backend())
            self.decryptor = self.cipher.decryptor()
            self.encryptor = self.cipher.encryptor()

        def decrypt(self, data):
            """ decrypt data """
            if len(data) % self.block_size:
                raise ValueError("Data length must be multiple of block size")
            plain = self.decryptor.update(self.swap_endian(data))
            return self.swap_endian(plain)

        def encrypt(self, data):
            """ encrypt data """
            if len(data) % self.block_size:
                raise ValueError("Data length must be multiple of block size")
            crypted = self.encryptor.update(self.swap_endian(data))
            return self.swap_endian(crypted)
-
-
def die(*msg_list):
    """
    abort program with error message

    All arguments are converted to strings and joined by a space; trailing
    line breaks are removed.  If LOGFILE is set, the message is appended
    to it with a timestamp.  The function never returns: it raises
    SystemExit with the message as exit status.
    """

    error_msg = ' '.join(str(x) for x in msg_list).rstrip("\r\n")
    if LOGFILE:
        timestamp = datetime.now().strftime("%Y-%m-%d %X: ")
        # Logging is best effort: a log file that cannot be written must
        # not replace the real error message by an unrelated exception.
        try:
            with open(LOGFILE, "a") as errlog:
                errlog.write(timestamp + error_msg + "\n")
        except (OSError, UnicodeError) as err:
            sys.stderr.write("Warning: cannot write log file {}: {}\n"
                             .format(LOGFILE, err))
    sys.exit(error_msg)
-
-
def write(msg):
    """ send msg to stdout as is (no newline appended) and flush at once """

    out = sys.stdout
    out.write(msg)
    out.flush()
-
-
def md5(data):
    """ return the MD5 hash of data as upper case hex string

    A str is hashed in its UTF-8 encoding, anything else is passed
    to hashlib unchanged.
    """

    raw = data.encode('utf-8') if isinstance(data, str) else data
    return hashlib.md5(raw).hexdigest().upper()
-
-
def parse_file_header(file):
    """
    parse OTR file header

    Reads FILE_HDR_LEN bytes from file (opened in binary mode), checks the
    magic 'OTRKEYFILE', decrypts the rest with a fixed key and splits the
    resulting "KEY=VALUE&KEY=VALUE..." text.

    Returns a dict with the header fields; "FN", "FH", "OH" and "SZ" are
    guaranteed to exist and "SZ" is guaranteed to be an integer not smaller
    than FILE_HDR_LEN.  Any problem ends the program via die().
    """

    hardkey = binascii.a2b_hex('EF3AB29CD19F0CAC5759C7ABD12CC92BA3FE0AFEBF960D63FEBD0F45')

    # read header and check magic
    header = file.read(FILE_HDR_LEN)
    if len(header) != FILE_HDR_LEN or header[:10] != b'OTRKEYFILE':
        die("OTRkey file has bad format.")

    # decipher header, everything from '&PD=' on is padding
    cipher = BlowfishLE(hardkey, BlowfishLE.MODE_ECB)
    header = cipher.decrypt(header[10:])
    header_len = header.find(b'&PD=')
    if header_len < 0:
        die("Corrupted file header: could not find padding.")
    try:
        header = header[:header_len].decode("ascii")
    except UnicodeError:
        die("Corrupted file header: invalid data.")

    # split header, options without '=' are ignored
    file_info = {}
    for option in header.split("&"):
        key, separator, value = option.partition("=")
        if separator:
            file_info[key] = value

    # check for important keys
    for key in ("FN", "FH", "OH", "SZ"):
        if key not in file_info:
            die("Corrupted file header: key '{}' is missing.".format(key))

    # The size is converted with int() when verifying and decoding; reject
    # a bad value here with a proper message instead of a later traceback.
    try:
        total_size = int(file_info["SZ"])
    except ValueError:
        die("Corrupted file header: key 'SZ' is not a number.")
    if total_size < FILE_HDR_LEN:
        die("Corrupted file header: file size is too small.")

    return file_info
-
-
def generate_bigkey(email, password, utc_date):
    """ build the key used to encrypt requests to / decrypt responses
    from the OTR server

    The key interleaves parts of the MD5 hashes of email and password
    with year, month and day taken from utc_date ("YYYYMMDD").
    Returns the key as bytes.
    """

    mail_hash = md5(email)
    pass_hash = md5(password)
    year, month, day = utc_date[0:4], utc_date[4:6], utc_date[6:8]

    pieces = (
        mail_hash[0:13],
        year,
        pass_hash[0:11],
        month,
        mail_hash[21:32],
        day,
        pass_hash[19:32],
    )
    return binascii.a2b_hex("".join(pieces))
-
-
def generate_request(email, password, utc_date, file_info, bigkey):
    """
    generate request string

    Builds the "&KEY=VALUE..." request, pads it with random hex digits to
    512 bytes minus one cipher block, encrypts it with bigkey in CBC mode
    and returns the path and query string for the HTTP GET request.
    The random IV is sent in front of the encrypted data.

    Ends the program via die() if the request cannot be encoded.
    """

    fields = (
        ("A", email),
        ("P", password),
        ("FN", file_info["FN"]),
        ("OH", file_info["OH"]),
        ("M", md5("Foo Bar")),
        ("OS", md5("Windows")),
        ("LN", "DE"),
        ("VN", DECODER_VERSION),
        ("IR", "TRUE"),
        ("IK", "aFzW1tL7nP9vXd8yUfB5kLoSyATQ"),
        ("D", ""),              # the padding follows this key
    )
    request = "".join("&{}={}".format(key, value) for key, value in fields)

    # pad with random hex digits, IV and request together are 512 bytes
    pad_len = 512 - BlowfishLE.block_size - len(request)
    request += ''.join(random.choice("0123456789ABCDEF")
                       for _ in range(pad_len))

    try:
        plaintext = request.encode('ascii')
    except UnicodeEncodeError:
        die("E-mail or password contains non-ASCII characters.")
    # only possible when the request is longer than the padded size
    if len(plaintext) % BlowfishLE.block_size != 0:
        die("Request is too long.")

    # encrypt request, the IV comes from the operating system's CSPRNG
    bf_iv = os.urandom(BlowfishLE.block_size)
    cipher = BlowfishLE(bigkey, BlowfishLE.MODE_CBC, bf_iv)
    crypted = bf_iv + cipher.encrypt(plaintext)

    # convert to a HTTP request, [:-1] drops the newline added by base64
    return "/quelle_neu1.php" + \
           "?code=" + binascii.b2a_base64(crypted)[:-1].decode('ascii') + \
           "&AA=" + email + \
           "&ZZ=" + utc_date
-
-
def query_server(email, password, utc_date, file_info, bigkey):
    """
    send request to OTR server and return its response

    Returns the raw response body as bytes.  HTTP errors (status outside
    of 200..299) and connection problems end the program via die().
    """

    request = generate_request(email, password, utc_date, file_info, bigkey)
    headers = {'User-Agent': 'Windows-OTR-Decoder/' + DECODER_VERSION,
               'Accept': '*/*'}

    conn = None
    try:
        # connect to host and send request
        conn = HTTPConnection(OTR_HOST)
        conn.request('GET', request, headers=headers)

        # get response, check for HTTP errors
        resp = conn.getresponse()
        response = resp.read()
        if resp.status < 200 or resp.status >= 300:
            die("HTTP failure: {} - {}".format(resp.status, resp.reason))
    except (HTTPException, IOError, OSError) as err:
        die("HTTP Exception:", err)
    finally:
        # release the socket in any case, also when die() exits
        if conn is not None:
            conn.close()

    return response
-
-
def get_keyphrase(email, password, file_info):
    """
    get keyphrase from remote OTR server

    Queries the server with the given credentials and the header fields
    in file_info, decrypts the response and extracts the "HP" option.

    Returns the keyphrase as bytes (28 bytes, sent as 56 hex digits).
    An error message from the server or a malformed response ends the
    program via die().
    """

    # query server for keyphrase, the current UTC date is part of the key
    utc_date = time.strftime("%Y%m%d", time.gmtime())
    bigkey = generate_bigkey(email, password, utc_date)
    response = query_server(email, password, utc_date, file_info, bigkey)

    # check for error message, only its first line is shown
    marker = b'MessageToBePrintedInDecoder'
    if response.startswith(marker):
        lines = response[len(marker):].lstrip().splitlines()
        # the server may send the marker without any text
        message = lines[0] if lines else b'(no message text)'
        die('Response message:', message.decode('utf-8', 'replace'))

    # decrypt response: base64 encoded, the first block is the IV
    try:
        response = binascii.a2b_base64(response)
    except binascii.Error:
        die("Corrupted response: invalid data.")
    if len(response) < 2 * BlowfishLE.block_size or \
       len(response) % BlowfishLE.block_size != 0:
        die("Corrupted response: length must be a multiple of {}."
            .format(BlowfishLE.block_size))
    bf_iv = response[0:BlowfishLE.block_size]
    cipher = BlowfishLE(bigkey, BlowfishLE.MODE_CBC, bf_iv)
    response = cipher.decrypt(response[BlowfishLE.block_size:])

    # strip padding, everything from '&D=' on is ignored
    resp_len = response.find(b'&D=')
    if resp_len < 0:
        die("Corrupted response: could not find padding")
    try:
        response = response[:resp_len].decode("ascii")
    except UnicodeError:
        die("Corrupted response: invalid data.")

    # search for keyphrase, the first "HP" option wins
    keyphrase = None
    for option in response.split("&"):
        name, separator, value = option.partition("=")
        if separator and name == "HP":
            keyphrase = value
            break
    if keyphrase is None:
        die("Response lacks keyphrase")
    if len(keyphrase) != 56:
        die("Keyphrase has wrong length")
    try:
        keyphrase = binascii.a2b_hex(keyphrase)
    except (TypeError, binascii.Error):
        die("Keyphrase has invalid data.")

    return keyphrase
-
-
class ShowProgress():
    """
    show progress info

    Prints a percentage via write(); the text is only printed again
    when the (integer) percentage has changed.
    """

    def __init__(self, maximum=100, text="Progress: {:3d}%\r"):
        """ maximum is the value shown as 100%, text a format string
        that receives the percentage as integer """
        self._maximum = float(maximum)
        self._text = text
        self._progress = -1     # last percentage printed, -1: none yet

    def show(self, current):
        """ show current percentage """
        if self._maximum > 0:
            # the 0.01 guards against rounding errors just below an integer
            _progress = int(0.01 + 100.0 * float(current) / self._maximum)
        else:
            # nothing to process (e.g. empty payload): avoid a division
            # by zero and report the work as complete
            _progress = 100
        if _progress != self._progress:
            self._progress = _progress
            write(self._text.format(_progress))

    def end(self):
        """ end display of progress info """
        self._progress = -1
        write("\n")
-
-
def verify_file(in_out, file, file_size, file_hash):
    """ compare length and MD5 hash of the data in file with the header

    in_out is used for the messages ("Input" / "Output"), file is read
    from its current position to the end, file_size is the size from the
    file header (including the header itself) and file_hash the hash from
    the file header: 48 characters, of which every third one is skipped.
    Any mismatch ends the program via die().
    """

    write("Verifying {}...\n".format(in_out.lower()))

    # get target MD5
    if len(file_hash) != 48:
        die("Hash in file header has unexpected format.")
    try:
        hex_digits = "".join(file_hash[pos:pos+2] for pos in range(0, 48, 3))
        expected = binascii.a2b_hex(hex_digits)
    except (TypeError, binascii.Error):
        die("Hash in file header has unexpected format.")

    # read file chunk by chunk
    digest = hashlib.md5()
    payload_len = int(file_size) - FILE_HDR_LEN
    progress = ShowProgress(payload_len)
    consumed = 0
    chunk = file.read(BUF_SIZE)
    while chunk:
        consumed += len(chunk)
        if consumed > payload_len:
            progress.show(payload_len)
            progress.end()
            die(in_out + " file contains trailing garbage.")
        digest.update(chunk)
        progress.show(consumed)
        chunk = file.read(BUF_SIZE)

    progress.end()

    if consumed < payload_len:
        die(in_out + " file is too short.")

    # check MD5
    if digest.digest() != expected:
        die(in_out + " file verification failed.")
-
-
def verify_input(input_file, file_info):
    """ check the encrypted payload of the input file against the
    size ('SZ') and hash ('OH') of the file header

    On return the file is positioned at the start of the payload.
    """

    payload_start = FILE_HDR_LEN
    input_file.seek(payload_start, os.SEEK_SET)
    verify_file("Input", input_file, file_info['SZ'], file_info['OH'])
    # rewind to the payload for the decoding
    input_file.seek(payload_start, os.SEEK_SET)
    write("Successfully verified.\n")
-
-
def verify_output(output_file, file_info):
    """ check the decoded output file against the size ('SZ') and
    hash ('FH') of the file header

    On return the file is positioned at its end.
    """

    output_file.seek(0, os.SEEK_SET)
    verify_file("Output", output_file, file_info['SZ'], file_info['FH'])
    output_file.seek(0, os.SEEK_END)
-
-
def decode_file(input_file, output_file, file_info, keyphrase):
    """ decrypt the payload of input_file and write it to output_file

    input_file has to be positioned at the start of the payload.
    The payload length is taken from the header field 'SZ'; data beyond
    it is not decoded.  A trailing partial cipher block is copied as is.
    Length problems are only reported as warnings.
    """

    write("Decoding...\n")

    # decode with keyphrase from OTR server
    cipher = BlowfishLE(keyphrase, BlowfishLE.MODE_ECB)
    payload_len = int(file_info['SZ']) - FILE_HDR_LEN
    progress = ShowProgress(payload_len)
    done = 0
    overlong = False

    # full chunks, the loop is left with the last (short) chunk pending
    while True:
        chunk = input_file.read(BUF_SIZE)
        chunk_len = len(chunk)
        if done + chunk_len > payload_len:  # stop at payload_len
            overlong = True
            chunk_len = payload_len - done
            chunk = chunk[:chunk_len]
        if chunk_len < BUF_SIZE:
            break
        output_file.write(cipher.decrypt(chunk))
        done += chunk_len
        progress.show(done)

    # last chunk: decrypt the complete cipher blocks, copy the rest
    if chunk_len > 0:
        aligned_len = chunk_len - chunk_len % cipher.block_size
        output_file.write(cipher.decrypt(chunk[:aligned_len]) +
                          chunk[aligned_len:])
        done += chunk_len
        progress.show(done)

    progress.end()

    # error checks
    if overlong:
        write("Warning: Input file contains trailing garbage.\n")
    elif done < payload_len:
        write("Warning: Input file is truncated.\n")
-
-
def _build_parser():
    """ create the command line parser """

    parser = argparse.ArgumentParser(
        add_help=False,
        description='%(prog)s - decoder for .otrkey files.')
    parser.add_argument('-h', '--help', '-?', action='help',
                        help='prints this screen')
    parser.add_argument('-v', action='version',
                        version="%(prog)s v" + __version__ +
                        " (emulates v" + DECODER_VERSION + ")",
                        help='prints version')
    parser.add_argument('-i', dest='input', metavar='FILE', required=True,
                        help='use FILE as input file')
    parser.add_argument('-e', dest='email', metavar='EMAIL', required=True,
                        help='use EMAIL to fetch the key directly from otr')
    parser.add_argument('-p', dest='password', metavar='PASSWORD',
                        required=True,
                        help='use PASSWORD to fetch the key directly from otr')
    parser.add_argument('-o', dest='outdir', metavar='OUTDIR', default='.',
                        help='use OUTDIR as output directory (default: .)')
    parser.add_argument('-f', dest='force', action='store_true',
                        help='force overwriting of output file')
    parser.add_argument('-q', dest='verify', action='store_false',
                        help='don\'t verify input file before processing.')
    return parser


def main(argv):
    """
    main

    argv is the complete argument list including the program name.
    Parses the command line, reads the header of the input file, fetches
    the keyphrase from the server and writes the decoded file into the
    output directory.  Errors end the program via die().
    """

    # parse command line
    args = _build_parser().parse_args(argv[1:])

    # process file
    try:
        with open(args.input, "rb") as otr_file:
            # read header
            otr_info = parse_file_header(otr_file)

            # The file name comes from the header of an untrusted file:
            # drop any directory part, so that the output can't be written
            # outside of the output directory.
            out_name = os.path.basename(otr_info['FN'])
            if out_name in ("", ".", "..") or "\0" in out_name:
                die("Corrupted file header: invalid file name.")
            out_fname = os.path.join(args.outdir, out_name)
            if not args.force and os.path.exists(out_fname):
                die("Output file already exists.")

            # verify input file
            if args.verify:
                verify_input(otr_file, otr_info)
                # short pause kept from the original code,
                # its purpose is not evident from this file
                time.sleep(0.1)

            # get keyphrase for decoding rest of file
            otr_keyphrase = get_keyphrase(args.email, args.password, otr_info)

            # decode and save payload, "w+b" as the file is read back
            # for the verification
            with open(out_fname, "w+b") as out_file:
                decode_file(otr_file, out_file, otr_info, otr_keyphrase)
                if args.verify:
                    time.sleep(0.1)
                    verify_output(out_file, otr_info)

    except (IOError, OSError) as err:
        die("I/O Error:", err)


if __name__ == "__main__":
    main(sys.argv)
|