otrdecoder - decoder for .otrkey files
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

516 lines
17 KiB

#!/usr/bin/env python3
# Copyright (C) 2017-2019 Bernhard Ehlers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
otrdecoder - decoder for .otrkey files.
usage: otrdecoder [-h] [-v] -i FILE -e EMAIL -p PASSWORD [-o OUTDIR] [-f] [-q]
optional arguments:
-h, --help, -? prints this screen
-v prints version
-i FILE use FILE as input file
-e EMAIL use EMAIL to fetch the key directly from otr
-p PASSWORD use PASSWORD to fetch the key directly from otr
-o OUTDIR use OUTDIR as output directory (default: .)
-f force overwriting of output file
-q don't verify input file before processing.
"""
# Version of this script; printed by the -v command line option.
__version__ = "0.1"
import argparse
import binascii
import hashlib
import os.path
import random
import sys
import tempfile
import time
from datetime import datetime
from http.client import HTTPConnection, HTTPException
# Chunk size in bytes used when reading files (1 MiB).
BUF_SIZE = 1024*1024
# Decoder version sent to the OTR server (request field VN and User-Agent).
DECODER_VERSION = "0.4.593" # normal/simple decoder for all users
#DECODER_VERSION = "0.4.1133" # easydecoder only for premium users
# Length of the file header: 10 bytes magic + 512 bytes encrypted header data.
FILE_HDR_LEN = 10 + 512
# Host that is queried for the keyphrase.
OTR_HOST = "www.onlinetvrecorder.com"
OTR_LANGUAGE = "EN" # language of OTR messages: DE or EN
# Log file in the system's temp directory; die() appends its messages here.
LOGFILE = os.path.join(tempfile.gettempdir(), "otrdecoder.log")
"""
detect cryptographic module, supported are cryptography and PyCryptodome
"""
# Pick the crypto backend: the Crypto package is tried first, the
# cryptography package is the fallback. PYCRYPTO records which one is in use.
try:
    from Crypto.Cipher import Blowfish as crypto_Blowfish
except ImportError:
    PYCRYPTO = False
    try:
        from cryptography.hazmat.primitives.ciphers import \
            Cipher as crypto_Cipher, \
            algorithms as crypto_algorithms, \
            modes as crypto_modes
        from cryptography.hazmat.backends import \
            default_backend as crypto_backend
    except ImportError:
        # neither backend is installed, nothing can be decoded
        sys.exit("No cryptographic module found - install cryptography or PyCryptodome")
else:
    PYCRYPTO = True
class BlowfishLE:
    """Blowfish cipher using the non-standard little-endian byte order.

    IV, input and output are byte-swapped per 4-byte word around every call
    into the crypto backend. Which backend is wrapped is decided once, when
    the class body is executed, by the module-level PYCRYPTO flag.
    """

    MODE_ECB = 1  #: Electronic Code Book (ECB)
    MODE_CBC = 2  #: Cipher-Block Chaining (CBC)
    block_size = 8  # Blowfish block size in bytes

    @staticmethod
    def swap_endian(data):
        """Return data with the byte order of each 4-byte word reversed.

        The same operation converts in both directions. The slice
        assignment raises ValueError if len(data) is not a multiple of 4.
        """
        swapped = bytearray(len(data))
        for dst in range(4):
            # byte 0 <- byte 3, byte 1 <- byte 2, ...
            swapped[dst::4] = data[3 - dst::4]
        return bytes(swapped)

    if PYCRYPTO:
        # --- backend: Crypto.Cipher.Blowfish ---

        def __init__(self, key, mode, iv=None):
            """Create a cipher for key; MODE_CBC additionally needs iv."""
            if mode == self.MODE_ECB:
                self.cipher = crypto_Blowfish.new(key,
                                                  crypto_Blowfish.MODE_ECB)
            elif mode == self.MODE_CBC:
                if iv is None:
                    raise ValueError("Missing IV")
                self.cipher = crypto_Blowfish.new(key,
                                                  crypto_Blowfish.MODE_CBC,
                                                  self.swap_endian(iv))
            else:
                raise ValueError("Unknown mode {}".format(mode))

        def decrypt(self, data):
            """Decrypt data and return the plaintext."""
            plain = self.cipher.decrypt(self.swap_endian(data))
            return self.swap_endian(plain)

        def encrypt(self, data):
            """Encrypt data and return the ciphertext."""
            crypted = self.cipher.encrypt(self.swap_endian(data))
            return self.swap_endian(crypted)

    else:
        # --- backend: cryptography ---

        def __init__(self, key, mode, iv=None):
            """Create a cipher for key; MODE_CBC additionally needs iv."""
            if mode == self.MODE_ECB:
                self.cipher = crypto_Cipher(crypto_algorithms.Blowfish(key),
                                            crypto_modes.ECB(),
                                            backend=crypto_backend())
            elif mode == self.MODE_CBC:
                if iv is None:
                    raise ValueError("Missing IV")
                self.cipher = crypto_Cipher(
                    crypto_algorithms.Blowfish(key),
                    crypto_modes.CBC(self.swap_endian(iv)),
                    backend=crypto_backend())
            else:
                raise ValueError("Unknown mode {}".format(mode))
            # one context per direction, reused for all subsequent calls
            self.decryptor = self.cipher.decryptor()
            self.encryptor = self.cipher.encryptor()

        def _check_length(self, data):
            """Raise ValueError unless data consists of whole blocks."""
            if len(data) % self.block_size != 0:
                raise ValueError("Data length must be multiple of block size")

        def decrypt(self, data):
            """Decrypt data and return the plaintext."""
            self._check_length(data)
            plain = self.decryptor.update(self.swap_endian(data))
            return self.swap_endian(plain)

        def encrypt(self, data):
            """Encrypt data and return the ciphertext."""
            self._check_length(data)
            crypted = self.encryptor.update(self.swap_endian(data))
            return self.swap_endian(crypted)
def die(*msg_list):
    """Abort the program with an error message.

    The arguments are converted to strings and joined with single spaces;
    trailing CR/LF characters are stripped. If LOGFILE is set, the message
    is also appended to that file, prefixed with a timestamp.

    Logging is best effort: if the log file cannot be written, a warning
    is printed to stderr and the program still exits with the original
    message instead of failing with an unrelated I/O error.

    Raises:
        SystemExit: always; its code is the error message.
    """
    error_msg = ' '.join(str(x) for x in msg_list).rstrip("\r\n")
    if LOGFILE:
        stamp = datetime.now().strftime("%Y-%m-%d %X: ")
        try:
            with open(LOGFILE, "a") as errlog:
                errlog.write(stamp + error_msg + "\n")
        except (OSError, UnicodeError) as err:
            # never let a logging problem hide the real error
            sys.stderr.write("Warning: can't write log file: {}\n".format(err))
    sys.exit(error_msg)
def write(msg):
    """Write msg to stdout as is (no newline is appended) and flush at once."""
    stream = sys.stdout
    stream.write(msg)
    stream.flush()
def md5(data):
    """Return the MD5 digest of data as an upper-case hex string.

    A str argument is encoded as UTF-8 before hashing; anything else is
    handed to hashlib unchanged.
    """
    raw = data.encode('utf-8') if isinstance(data, str) else data
    return hashlib.md5(raw).hexdigest().upper()
def parse_file_header(file):
    """Read, decrypt and parse the header of an OTRkey file.

    The header consists of the magic b'OTRKEYFILE' followed by 512 bytes
    that are Blowfish/ECB encrypted with a fixed key. The plaintext is a
    list of '&'-separated KEY=VALUE options, terminated by '&PD='.

    Args:
        file: binary file object positioned at the start of the file.

    Returns:
        dict mapping option names to their (string) values. The keys
        "FN", "FH", "OH" and "SZ" are guaranteed to be present and "SZ"
        is guaranteed to be convertible with int().

    Any format problem terminates the program through die().
    """
    hardkey = binascii.a2b_hex('EF3AB29CD19F0CAC5759C7ABD12CC92BA3FE0AFEBF960D63FEBD0F45')

    # read header and check magic
    header = file.read(FILE_HDR_LEN)
    if len(header) != FILE_HDR_LEN or header[:10] != b'OTRKEYFILE':
        die("OTRkey file has bad format.")

    # decipher header
    cipher = BlowfishLE(hardkey, BlowfishLE.MODE_ECB)
    header = cipher.decrypt(header[10:])
    header_len = header.find(b'&PD=')
    if header_len < 0:
        die("Corrupted file header: could not find padding.")
    try:
        header = header[:header_len].decode("ascii")
    except UnicodeError:
        die("Corrupted file header: invalid data.")

    # split header, options without '=' are ignored
    file_info = {}
    for option in header.split("&"):
        key, separator, value = option.partition("=")
        if separator:
            file_info[key] = value

    # check for important keys
    for key in ("FN", "FH", "OH", "SZ"):
        if key not in file_info:
            die("Corrupted file header: key '{}' is missing.".format(key))

    # The size is converted with int() by later processing steps; reject
    # garbage here with a proper message instead of a traceback later on.
    try:
        int(file_info["SZ"])
    except ValueError:
        die("Corrupted file header: invalid file size '{}'."
            .format(file_info["SZ"]))
    return file_info
def generate_bigkey(email, password, utc_date):
    """Build the key used to encrypt requests to / responses from OTR.

    Slices of the MD5 hex digests of email and password are interleaved
    with year, month and day taken from utc_date (a "YYYYMMDD" string, as
    produced by the caller). The resulting hex string is returned as bytes.
    """
    mail_hash = md5(email)
    pass_hash = md5(password)
    pieces = (
        mail_hash[0:13],
        utc_date[0:4],      # year
        pass_hash[0:11],
        utc_date[4:6],      # month
        mail_hash[21:32],
        utc_date[6:8],      # day
        pass_hash[19:32],
    )
    return binascii.a2b_hex("".join(pieces))
def generate_request(email, password, utc_date, file_info, bigkey):
    """Build the request path (including query string) for the OTR server.

    The option string is padded with random hex digits up to
    512 - block size characters, encrypted with Blowfish/CBC using bigkey
    and a random IV, prefixed with that IV and base64 encoded.
    """
    options = (
        ("A", email),
        ("P", password),
        ("FN", file_info["FN"]),
        ("OH", file_info["OH"]),
        ("M", md5("Foo Bar")),
        ("OS", md5("Windows")),
        ("LN", OTR_LANGUAGE),
        ("VN", DECODER_VERSION),
        ("IR", "TRUE"),
        ("IK", "aFzW1tL7nP9vXd8yUfB5kLoSyATQ"),
        ("D", ""),          # the random padding follows this option
    )
    plain = "".join("&" + name + "=" + value for name, value in options)
    pad_len = 512 - BlowfishLE.block_size - len(plain)
    plain += ''.join(random.choice("0123456789ABCDEF") for _ in range(pad_len))

    # encrypt request, the IV is sent in front of the ciphertext
    bf_iv = bytes(random.randint(0, 255) for _ in range(BlowfishLE.block_size))
    cipher = BlowfishLE(bigkey, BlowfishLE.MODE_CBC, bf_iv)
    crypted = bf_iv + cipher.encrypt(plain.encode('ascii'))

    # convert to a HTTP request; [:-1] drops the newline b2a_base64 appends
    code = binascii.b2a_base64(crypted)[:-1].decode('ascii')
    return "/quelle_neu1.php" + "?code=" + code + "&AA=" + email + "&ZZ=" + utc_date
def query_server(email, password, utc_date, file_info, bigkey):
    """Send the key request to the OTR server and return the response body.

    Args:
        email, password, utc_date, file_info, bigkey: passed on unchanged
            to generate_request().

    Returns:
        The raw response body as bytes.

    Connection problems and HTTP status codes outside 2xx terminate the
    program through die(). The connection is always closed before
    returning or exiting.
    """
    request = generate_request(email, password, utc_date, file_info, bigkey)
    headers = {'User-Agent': 'Windows-OTR-Decoder/' + DECODER_VERSION,
               'Accept': '*/*'}
    conn = None
    try:
        # connect to host and send request
        conn = HTTPConnection(OTR_HOST)
        conn.request('GET', request, headers=headers)
        # the body has to be read before the connection is closed
        resp = conn.getresponse()
        response = resp.read()
    except (HTTPException, OSError) as err:
        die("HTTP Exception:", err)
    finally:
        # release the socket on success, on error and on die()
        if conn is not None:
            conn.close()

    # check for HTTP errors
    if resp.status < 200 or resp.status >= 300:
        die("HTTP failure: {} - {}".format(resp.status, resp.reason))
    return response
def get_keyphrase(email, password, file_info):
    """Fetch the keyphrase for decoding the payload from the OTR server.

    Args:
        email: OTR account name.
        password: OTR account password.
        file_info: parsed file header, see parse_file_header().

    Returns:
        The keyphrase as bytes (decoded from 56 hex digits).

    Error messages sent by the server and malformed responses terminate
    the program through die().
    """
    # local import: the module only imports datetime.datetime
    from datetime import timezone

    # query server for keyphrase; the current UTC date is part of the key
    utc_date = datetime.now(timezone.utc).strftime("%Y%m%d")
    bigkey = generate_bigkey(email, password, utc_date)
    response = query_server(email, password, utc_date, file_info, bigkey)

    # check for error message
    marker = b'MessageToBePrintedInDecoder'
    if response.startswith(marker):
        lines = response[len(marker):].lstrip().splitlines()
        # the marker may come without any text, don't fail on that
        raw_msg = lines[0] if lines else b'(no message text)'
        try:
            msg = raw_msg.decode('utf-8')
        except UnicodeError:
            msg = raw_msg.decode('latin-1', 'ignore')
        die('Server response:', msg)

    # decrypt response: base64 encoded IV followed by the ciphertext
    try:
        response = binascii.a2b_base64(response)
    except binascii.Error:
        die("Corrupted response: invalid data.")
    if len(response) < 2 * BlowfishLE.block_size or \
       len(response) % BlowfishLE.block_size != 0:
        die("Corrupted response: length must be a multiple of {}."
            .format(BlowfishLE.block_size))
    bf_iv = response[0:BlowfishLE.block_size]
    cipher = BlowfishLE(bigkey, BlowfishLE.MODE_CBC, bf_iv)
    response = cipher.decrypt(response[BlowfishLE.block_size:])

    # strip padding
    resp_len = response.find(b'&D=')
    if resp_len < 0:
        die("Corrupted response: could not find padding")
    try:
        response = response[:resp_len].decode("ascii")
    except UnicodeError:
        die("Corrupted response: invalid data.")

    # search for keyphrase, it is the value of option HP
    for option in response.split("&"):
        name, separator, value = option.partition("=")
        if separator and name == "HP":
            keyphrase = value
            break
    else:
        die("Response lacks keyphrase")

    if len(keyphrase) != 56:
        die("Keyphrase has wrong length")
    try:
        keyphrase = binascii.a2b_hex(keyphrase)
    except (TypeError, binascii.Error):
        die("Keyphrase has invalid data.")
    return keyphrase
class ShowProgress:
    """Print a percentage display that is only updated when it changes."""

    def __init__(self, maximum=100, text="Progress: {:3d}%\r"):
        """Set up the display.

        Args:
            maximum: value of current that corresponds to 100%.
            text: format string, receives the percentage as an integer.
        """
        self._maximum = float(maximum)
        self._text = text
        self._progress = -1     # last percentage shown, -1: nothing yet

    def show(self, current):
        """Show the percentage for current, if it differs from the last one.

        A maximum of zero (or less) means there is nothing to process;
        this is shown as 100% instead of dividing by zero.
        """
        if self._maximum > 0:
            # 0.01 compensates for floating point rounding errors
            percent = int(0.01 + 100.0 * float(current) / self._maximum)
        else:
            percent = 100
        if percent != self._progress:
            self._progress = percent
            write(self._text.format(percent))

    def end(self):
        """End the display of progress info and start a new line."""
        self._progress = -1
        write("\n")
def verify_file(in_out, file, file_size, file_hash):
    """Check size and MD5 checksum of the data read from file.

    Args:
        in_out: "Input" or "Output", only used in messages.
        file: binary file object, read from its current position to EOF.
        file_size: size from the file header (FILE_HDR_LEN is subtracted
            to get the expected number of bytes).
        file_hash: 48 character hash from the file header; two hex digits
            are taken from every group of three characters.

    Every mismatch terminates the program through die().
    """
    write("Verifying {}...\n".format(in_out.lower()))

    # get target MD5
    bad_hash = "Hash in file header has unexpected format."
    if len(file_hash) != 48:
        die(bad_hash)
    try:
        hex_digits = "".join(file_hash[pos:pos+2] for pos in range(0, 48, 3))
        expected = binascii.a2b_hex(hex_digits)
    except (TypeError, binascii.Error):
        die(bad_hash)

    # read file
    digest = hashlib.md5()
    payload_size = int(file_size) - FILE_HDR_LEN
    progress = ShowProgress(payload_size)
    total = 0
    chunk = file.read(BUF_SIZE)
    while chunk:
        total += len(chunk)
        if total > payload_size:
            # more data than announced in the header
            progress.show(payload_size)
            progress.end()
            die(in_out + " file contains trailing garbage.")
        digest.update(chunk)
        progress.show(total)
        chunk = file.read(BUF_SIZE)
    progress.end()
    if total < payload_size:
        die(in_out + " file is too short.")

    # check MD5
    if digest.digest() != expected:
        die(in_out + " file verification failed.")
def verify_input(input_file, file_info):
    """Verify the payload of the input file against header field OH.

    The file position is set to the start of the payload (directly after
    the file header) before and after the check.
    """
    payload_start = FILE_HDR_LEN
    input_file.seek(payload_start, os.SEEK_SET)
    verify_file("Input", input_file, file_info['SZ'], file_info['OH'])
    # rewind, so that the payload can be read again
    input_file.seek(payload_start, os.SEEK_SET)
    write("Successfully verified.\n")
def verify_output(output_file, file_info):
    """Verify the complete output file against header field FH.

    The file is read from its beginning; afterwards the file position is
    set to the end of the file.
    """
    output_file.seek(0, os.SEEK_SET)
    verify_file("Output", output_file, file_info['SZ'], file_info['FH'])
    output_file.seek(0, os.SEEK_END)
def decode_file(input_file, output_file, file_info, keyphrase):
    """Decrypt the payload of input_file and write it to output_file.

    Args:
        input_file: binary file object positioned at the start of the payload.
        output_file: binary file object that receives the decoded data.
        file_info: parsed file header; SZ minus FILE_HDR_LEN is the
            number of payload bytes.
        keyphrase: Blowfish key (ECB mode) for the payload.

    Data beyond the announced size is ignored; this and a truncated input
    file only result in a warning.
    """
    write("Decoding...\n")

    # decode with keyphrase from OTR server
    cipher = BlowfishLE(keyphrase, BlowfishLE.MODE_ECB)
    payload_size = int(file_info['SZ']) - FILE_HDR_LEN
    progress = ShowProgress(payload_size)
    done = 0
    has_garbage = False

    # full chunks
    while True:
        chunk = input_file.read(BUF_SIZE)
        chunk_len = len(chunk)
        if done + chunk_len > payload_size:
            # stop at payload_size, the rest is trailing garbage
            has_garbage = True
            chunk_len = payload_size - done
            chunk = chunk[:chunk_len]
        if chunk_len < BUF_SIZE:
            # short (or empty) chunk: handled below
            break
        output_file.write(cipher.decrypt(chunk))
        done += chunk_len
        progress.show(done)

    # last chunk: bytes after the last whole cipher block are copied as is
    if chunk_len > 0:
        whole_blocks = chunk_len - chunk_len % cipher.block_size
        output_file.write(cipher.decrypt(chunk[:whole_blocks])
                          + chunk[whole_blocks:])
        done += chunk_len
        progress.show(done)
    progress.end()

    # error checks
    if has_garbage:
        write("Warning: Input file contains trailing garbage.\n")
    elif done < payload_size:
        write("Warning: Input file is truncated.\n")
def main(argv):
    """Decode the .otrkey file named on the command line.

    Args:
        argv: complete argument vector; argv[0] (program name) is skipped.

    The output file is created in the output directory under the name
    stored in the file header (FN). As the header is untrusted input, a
    name containing path components is rejected, so nothing can be written
    outside of the output directory.

    All errors terminate the program through die().
    """
    # parse command line
    parser = argparse.ArgumentParser(
        add_help=False,
        description='%(prog)s - decoder for .otrkey files.')
    parser.add_argument('-h', '--help', '-?', action='help',
                        help='prints this screen')
    parser.add_argument('-v', action='version',
                        version="%(prog)s v" + __version__ +
                        " (emulates v" + DECODER_VERSION + ")",
                        help='prints version')
    parser.add_argument('-i', dest='input', metavar='FILE', required=True,
                        help='use FILE as input file')
    parser.add_argument('-e', dest='email', metavar='EMAIL', required=True,
                        help='use EMAIL to fetch the key directly from otr')
    parser.add_argument('-p', dest='password', metavar='PASSWORD', required=True,
                        help='use PASSWORD to fetch the key directly from otr')
    parser.add_argument('-o', dest='outdir', metavar='OUTDIR', default='.',
                        help='use OUTDIR as output directory (default: .)')
    parser.add_argument('-f', dest='force', action='store_true',
                        help='force overwriting of output file')
    parser.add_argument('-q', dest='verify', action='store_false',
                        help='don\'t verify input file before processing.')
    args = parser.parse_args(argv[1:])

    # process file
    try:
        with open(args.input, "rb") as otr_file:
            # read header
            otr_info = parse_file_header(otr_file)

            # the file name must be a plain name without any directory part
            out_name = os.path.basename(otr_info['FN'])
            if out_name != otr_info['FN'] or out_name in ("", ".", ".."):
                die("Corrupted file header: invalid file name.")
            out_fname = os.path.join(args.outdir, out_name)
            if not args.force and os.path.exists(out_fname):
                die("Output file already exists.")

            # verify input file
            if args.verify:
                verify_input(otr_file, otr_info)
                time.sleep(0.1)

            # get keyphrase for decoding rest of file
            otr_keyphrase = get_keyphrase(args.email, args.password, otr_info)

            # decode and save payload; "w+b" as verification reads it back
            with open(out_fname, "w+b") as out_file:
                decode_file(otr_file, out_file, otr_info, otr_keyphrase)
                if args.verify:
                    time.sleep(0.1)
                    verify_output(out_file, otr_info)
    except (IOError, OSError) as err:
        die("I/O Error:", err)


if __name__ == "__main__":
    main(sys.argv)