from flask import Flask, render_template, request, redirect, session, send_from_directory from lib.steganography import img_steg, wav_steg, txt_steg, file_steg import cv2 import os import sys import wave import warnings MAX_SESSION_SIZE = 4093 WORKING_PATH = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), "upload") + os.sep app = Flask(__name__, template_folder='views') app.secret_key = 'b3a5e8d11fb3d8d3647b6cf2e51ad768' def session_clear(): if len(session) > 0: session.clear() @app.route('/') def modeSelection(): session_clear() return render_template('mode_selection.html') @app.route('/encode') def encode(): session_clear() return render_template('encode.html') @app.route("/encoding", methods=['POST']) def encoding(): try: file = request.files['origin'] b2c = [int(x) for x in request.form.getlist("b2c")] payload = request.form['payload'] if file.filename != "": file.save(WORKING_PATH + file.filename) file_extension = os.path.splitext(file.filename)[1][1:] if file_extension in ["bmp", "png"]: encode = img_steg.img_steg(WORKING_PATH + file.filename, b2c).encode(payload) cv2.imwrite(WORKING_PATH + "encoded_" + file.filename, encode) session['image'] = file.filename session['image2'] = "encoded_" + file.filename elif file_extension == "wav": encode = wav_steg.wav_steg(WORKING_PATH + file.filename, b2c).encode(payload) # Write encoded data to file new_wav_file = wave.open(WORKING_PATH + "encoded_" + file.filename, "wb") new_wav_file.setnchannels(encode["num_channels"]) new_wav_file.setsampwidth(encode["sample_width"]) new_wav_file.setframerate(encode["frame_rate"]) new_wav_file.writeframes(encode["num_frames"]) new_wav_file.close() session['wav'] = file.filename session['wav2'] = "encoded_" + file.filename elif file_extension == "txt": encoded_data = txt_steg.txt_steg(WORKING_PATH + file.filename, b2c).encode(payload) with open(os.path.join(WORKING_PATH, "encoded_" + file.filename), "w") as f: f.write(encoded_data) session['txt'] = file.filename session['txt2'] = "encoded_" + file.filename elif file_extension in ["mp3", "mp4", "docx", "xlsx", "pptx"]: encoded_data = file_steg.file_steg(WORKING_PATH + file.filename, b2c).encode(payload) with open(os.path.join(WORKING_PATH, "encoded_" + file.filename), "wb") as f: f.write(encoded_data) if file_extension in ["docx", "xlsx", "pptx"]: file_extension = "document" session[file_extension] = file.filename session[file_extension + '_2'] = "encoded_" + file.filename return redirect("/encode_result") except Exception as exception: print("ENCODE(Exception):", exception) return redirect("/unsupported") except UserWarning as warning: print("ENCODE(warning):", warning) return redirect("/unsupported") @app.route('/encode_result') def encode_result(): if len(session) > 0: return render_template("encode_result.html") else: return redirect("/encode") @app.route('/decode') def decode(): session_clear() return render_template('decode.html') @app.route("/decoding", methods=['POST']) def decoding(): def decode_files(class_name, _path, b2c, ext): class_name_str = class_name.__name__.split(".")[-1] method = getattr(class_name, class_name_str) payload = method(_path, b2c).decode() if class_name_str == "img_steg": session["image"] = file.filename elif ext in ["docx", "pptx", "xlsx"]: session["document"] = file.filename else: session[ext] = file.filename if sys.getsizeof(payload) <= MAX_SESSION_SIZE: session["payload"] = payload else: raise Exception("Payload too big") try: file = request.files['encoded_file'] b2c = [int(x) for x in request.form.getlist("b2c")] if file.filename != "": _path = WORKING_PATH + file.filename file.save(_path) file_extension = os.path.splitext(file.filename)[1][1:] if file_extension in ["bmp", "png"]: decode_files(img_steg, _path, b2c, file_extension) elif file_extension == "wav": decode_files(wav_steg, _path, b2c, file_extension) elif file_extension == "txt": decode_files(txt_steg, _path, b2c, file_extension) elif file_extension in ["mp3", "mp4", "docx", "xlsx", "pptx"]: decode_files(file_steg, _path, b2c, file_extension) return redirect("/decode_result") except Exception as exception: print("DECODE(Exception):", exception) return redirect("/unsupported") except UserWarning as warning: print("DECODE(warning):", warning) return redirect("/unsupported") @app.route('/decode_result') def decode_result(): if len(session) > 0: return render_template("decode_result.html") else: return redirect("/decode") @app.route('/unsupported') def unsupported(): return render_template('unsupported.html') @app.route('/get_session') def get_session(): session_data = dict(session) return session_data @app.route('/upload/') def upload(filename): return send_from_directory('upload', filename) if __name__ == "__main__": app.debug = True app.run(host="localhost", port=8000)