Modified lsb_rep to add a global supported ext dict. Sorted bit pos list, added else block for some loops. Changed max session size.

This commit is contained in:
devoalda 2023-05-31 07:46:04 +08:00
parent 129ecce981
commit 68c4f85cee
1 changed files with 44 additions and 14 deletions

View File

@ -6,12 +6,22 @@ import sys
import wave import wave
import warnings import warnings
MAX_SESSION_SIZE = 4093 MAX_SESSION_SIZE = 4096 * 2
WORKING_PATH = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), "upload") + os.sep WORKING_PATH = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), "upload") + os.sep
# Supported file extensions
supported_files = {
"txt_files": ["txt", "py", "sql", "html", "css", "js", "php", "c", "cpp", "java", "json", "xml", "yml", "md"],
"gen_files": ["mp3", "mp4"],
"doc_files": ["docx", "xlsx", "pptx"],
"wav_files": ["wav"],
"image_files": ["bmp", "png"]
}
app = Flask(__name__, template_folder='views') app = Flask(__name__, template_folder='views')
app.secret_key = 'b3a5e8d11fb3d8d3647b6cf2e51ad768' app.secret_key = 'b3a5e8d11fb3d8d3647b6cf2e51ad768'
def session_clear(): def session_clear():
if len(session) > 0: if len(session) > 0:
session.clear() session.clear()
@ -22,28 +32,32 @@ def modeSelection():
session_clear() session_clear()
return render_template('mode_selection.html') return render_template('mode_selection.html')
@app.route('/encode') @app.route('/encode')
def encode(): def encode():
session_clear() session_clear()
return render_template('encode.html') return render_template('encode.html')
@app.route("/encoding", methods=['POST']) @app.route("/encoding", methods=['POST'])
def encoding(): def encoding():
try: try:
file = request.files['origin'] file = request.files['origin']
b2c = [int(x) for x in request.form.getlist("b2c")] b2c = [int(x) for x in request.form.getlist("b2c")]
# Sort b2c in ascending order
b2c.sort()
payload = request.form['payload'] payload = request.form['payload']
if file.filename != "": if file.filename != "":
file.save(WORKING_PATH + file.filename) file.save(WORKING_PATH + file.filename)
file_extension = os.path.splitext(file.filename)[1][1:] file_extension = os.path.splitext(file.filename)[1][1:]
if file_extension in ["bmp", "png"]: if file_extension in supported_files["image_files"]:
encode = img_steg.img_steg(WORKING_PATH + file.filename, b2c).encode(payload) encode = img_steg.img_steg(WORKING_PATH + file.filename, b2c).encode(payload)
cv2.imwrite(WORKING_PATH + "encoded_" + file.filename, encode) cv2.imwrite(WORKING_PATH + "encoded_" + file.filename, encode)
session['image'] = file.filename session['image'] = file.filename
session['image2'] = "encoded_" + file.filename session['image2'] = "encoded_" + file.filename
elif file_extension == "wav": elif file_extension in supported_files["wav_files"]:
encode = wav_steg.wav_steg(WORKING_PATH + file.filename, b2c).encode(payload) encode = wav_steg.wav_steg(WORKING_PATH + file.filename, b2c).encode(payload)
# Write encoded data to file # Write encoded data to file
@ -55,20 +69,23 @@ def encoding():
new_wav_file.close() new_wav_file.close()
session['wav'] = file.filename session['wav'] = file.filename
session['wav2'] = "encoded_" + file.filename session['wav2'] = "encoded_" + file.filename
elif file_extension == "txt": elif file_extension in supported_files["txt_files"]:
encoded_data = txt_steg.txt_steg(WORKING_PATH + file.filename, b2c).encode(payload) encoded_data = txt_steg.txt_steg(WORKING_PATH + file.filename, b2c).encode(payload)
with open(os.path.join(WORKING_PATH, "encoded_" + file.filename), "w") as f: with open(os.path.join(WORKING_PATH, "encoded_" + file.filename), "w") as f:
f.write(encoded_data) f.write(encoded_data)
session['txt'] = file.filename session['txt'] = file.filename
session['txt2'] = "encoded_" + file.filename session['txt2'] = "encoded_" + file.filename
elif file_extension in ["mp3", "mp4", "docx", "xlsx", "pptx"]: elif file_extension in supported_files["gen_files"] + supported_files["doc_files"]:
encoded_data = file_steg.file_steg(WORKING_PATH + file.filename, b2c).encode(payload) encoded_data = file_steg.file_steg(WORKING_PATH + file.filename, b2c).encode(payload)
with open(os.path.join(WORKING_PATH, "encoded_" + file.filename), "wb") as f: with open(os.path.join(WORKING_PATH, "encoded_" + file.filename), "wb") as f:
f.write(encoded_data) f.write(encoded_data)
if file_extension in ["docx", "xlsx", "pptx"]: if file_extension in supported_files["doc_files"]:
file_extension = "document" file_extension = "document"
session[file_extension] = file.filename session[file_extension] = file.filename
session[file_extension + '_2'] = "encoded_" + file.filename session[file_extension + '_2'] = "encoded_" + file.filename
else:
print(f"Unsupported file extension: {file_extension}")
return redirect("/unsupported")
return redirect("/encode_result") return redirect("/encode_result")
except Exception as exception: except Exception as exception:
@ -78,6 +95,7 @@ def encoding():
print("ENCODE(warning):", warning) print("ENCODE(warning):", warning)
return redirect("/unsupported") return redirect("/unsupported")
@app.route('/encode_result') @app.route('/encode_result')
def encode_result(): def encode_result():
if len(session) > 0: if len(session) > 0:
@ -85,14 +103,15 @@ def encode_result():
else: else:
return redirect("/encode") return redirect("/encode")
@app.route('/decode') @app.route('/decode')
def decode(): def decode():
session_clear() session_clear()
return render_template('decode.html') return render_template('decode.html')
@app.route("/decoding", methods=['POST']) @app.route("/decoding", methods=['POST'])
def decoding(): def decoding():
def decode_files(class_name, _path, b2c, ext): def decode_files(class_name, _path, b2c, ext):
class_name_str = class_name.__name__.split(".")[-1] class_name_str = class_name.__name__.split(".")[-1]
method = getattr(class_name, class_name_str) method = getattr(class_name, class_name_str)
@ -100,8 +119,10 @@ def decoding():
if class_name_str == "img_steg": if class_name_str == "img_steg":
session["image"] = file.filename session["image"] = file.filename
elif ext in ["docx", "pptx", "xlsx"]: elif ext in supported_files["doc_files"]:
session["document"] = file.filename session["document"] = file.filename
elif ext in supported_files["txt_files"]:
session["txt"] = file.filename
else: else:
session[ext] = file.filename session[ext] = file.filename
@ -113,20 +134,25 @@ def decoding():
try: try:
file = request.files['encoded_file'] file = request.files['encoded_file']
b2c = [int(x) for x in request.form.getlist("b2c")] b2c = [int(x) for x in request.form.getlist("b2c")]
# Sort b2c in ascending order
b2c.sort()
if file.filename != "": if file.filename != "":
_path = WORKING_PATH + file.filename _path = WORKING_PATH + file.filename
file.save(_path) file.save(_path)
file_extension = os.path.splitext(file.filename)[1][1:] file_extension = os.path.splitext(file.filename)[1][1:]
if file_extension in ["bmp", "png"]: if file_extension in supported_files["image_files"]:
decode_files(img_steg, _path, b2c, file_extension) decode_files(img_steg, _path, b2c, file_extension)
elif file_extension == "wav": elif file_extension in supported_files["wav_files"]:
decode_files(wav_steg, _path, b2c, file_extension) decode_files(wav_steg, _path, b2c, file_extension)
elif file_extension == "txt": elif file_extension in supported_files["txt_files"]:
decode_files(txt_steg, _path, b2c, file_extension) decode_files(txt_steg, _path, b2c, file_extension)
elif file_extension in ["mp3", "mp4", "docx", "xlsx", "pptx"]: elif file_extension in supported_files["gen_files"] + supported_files["doc_files"]:
decode_files(file_steg, _path, b2c, file_extension) decode_files(file_steg, _path, b2c, file_extension)
else:
print(f"Unsupported file extension: {file_extension}")
return redirect("/unsupported")
return redirect("/decode_result") return redirect("/decode_result")
except Exception as exception: except Exception as exception:
@ -136,6 +162,7 @@ def decoding():
print("DECODE(warning):", warning) print("DECODE(warning):", warning)
return redirect("/unsupported") return redirect("/unsupported")
@app.route('/decode_result') @app.route('/decode_result')
def decode_result(): def decode_result():
if len(session) > 0: if len(session) > 0:
@ -143,10 +170,12 @@ def decode_result():
else: else:
return redirect("/decode") return redirect("/decode")
@app.route('/unsupported') @app.route('/unsupported')
def unsupported(): def unsupported():
return render_template('unsupported.html') return render_template('unsupported.html')
@app.route('/get_session') @app.route('/get_session')
def get_session(): def get_session():
session_data = dict(session) session_data = dict(session)
@ -157,6 +186,7 @@ def get_session():
def upload(filename): def upload(filename):
return send_from_directory('upload', filename) return send_from_directory('upload', filename)
if __name__ == "__main__": if __name__ == "__main__":
app.debug = True app.debug = True
app.run(host="localhost", port=8000) app.run(host="localhost", port=8000)