This repository has been archived by the owner on Feb 5, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
4aec82b
commit 39a1cd7
Showing
6 changed files
with
415 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
from Crypto.PublicKey import RSA | ||
from Crypto.Cipher import PKCS1_OAEP | ||
from cryptography.hazmat.primitives import hashes, serialization | ||
from cryptography.hazmat.primitives.asymmetric import padding | ||
from cryptography.hazmat.backends import default_backend | ||
import os | ||
from constants import * | ||
from config import * | ||
|
||
|
||
# 生成RSA密钥对 | ||
def generate_key_pair(): | ||
key = RSA.generate(4096) # 4096位的RSA密钥对 | ||
private_key = key.export_key() | ||
public_key = key.publickey().export_key() | ||
return private_key, public_key | ||
|
||
|
||
# 使用公钥加密消息 | ||
def encrypt_with_public_key(message, public_key): | ||
key = RSA.import_key(public_key) | ||
cipher = PKCS1_OAEP.new(key) | ||
encrypted_message = cipher.encrypt(message.encode()) | ||
return encrypted_message | ||
|
||
|
||
# 使用私钥解密消息 | ||
def decrypt_with_private_key(encrypted_message_str, private_key): | ||
encrypted_message = eval(encrypted_message_str) | ||
key = RSA.import_key(private_key) | ||
cipher = PKCS1_OAEP.new(key) | ||
decrypted_message = cipher.decrypt(encrypted_message).decode() | ||
return decrypted_message | ||
|
||
|
||
# 使用私钥签名消息 | ||
def sign_message(message, private_key_bytes): | ||
private_key = serialization.load_pem_private_key( | ||
private_key_bytes, | ||
password=None, | ||
backend=default_backend() | ||
) | ||
signature = private_key.sign( | ||
message.encode('utf-8'), # 将消息编码为字节 | ||
padding.PSS( | ||
mgf=padding.MGF1(hashes.SHA256()), | ||
salt_length=padding.PSS.MAX_LENGTH | ||
), | ||
hashes.SHA256() | ||
) | ||
return signature | ||
|
||
|
||
# 使用公钥验证签名 | ||
def verify_signature(message, signature, public_key_bytes): | ||
public_key = serialization.load_pem_public_key( | ||
public_key_bytes, | ||
backend=default_backend() | ||
) | ||
try: | ||
public_key.verify( | ||
signature, | ||
message.encode('utf-8'), # 将消息编码为字节 | ||
padding.PSS( | ||
mgf=padding.MGF1(hashes.SHA256()), | ||
salt_length=padding.PSS.MAX_LENGTH | ||
), | ||
hashes.SHA256() | ||
) | ||
return True | ||
except Exception as e: | ||
print("Signature is invalid:", str(e)) | ||
return False | ||
|
||
|
||
# 检查是否有私钥文件,如果没有生成私钥,并保存在本地,如果有读取并返回 | ||
def load_private_key(key_name): | ||
private_key_path = key_path + key_name + '_private_key.pem' | ||
public_key_path = key_path + key_name + '_public_key.pem' | ||
if not os.path.exists(private_key_path): | ||
private_key, public_key = generate_key_pair() | ||
with open(private_key_path, "wb") as f: | ||
f.write(private_key) | ||
with open(public_key_path, "wb") as f: | ||
f.write(public_key) | ||
return private_key | ||
else: | ||
with open(private_key_path, "rb") as f: | ||
private_key = f.read() | ||
return private_key | ||
|
||
|
||
# 读取公钥 | ||
def load_public_key(key_name): | ||
public_key_path = key_path + key_name + '_public_key.pem' | ||
with open(public_key_path, "rb") as f: | ||
public_key = f.read() | ||
return public_key | ||
|
||
def start_private_key(): | ||
private_key_name =search_config("private_key_name") | ||
if not (private_key_name== None): | ||
private_key =load_private_key(search_config("private_key_name")) | ||
else: | ||
print("找不到密钥,现在创建密钥") | ||
key_name = input("请输入密钥名称:") | ||
private_key = load_private_key(key_name) | ||
save_config("private_key_name", key_name) | ||
return private_key |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import os | ||
import csv | ||
from constants import * | ||
|
||
|
||
def config_check(): | ||
config_file_path = config_path + 'config.csv' | ||
if not os.path.exists(config_file_path): | ||
# 如果文件不存在,创建文件 | ||
with open(config_file_path, 'w', newline='') as csvfile: | ||
fieldnames = ['key', 'value'] | ||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames) | ||
writer.writeheader() | ||
return True | ||
|
||
|
||
# 保存项目与值,如果遇到相同的项目就覆写 | ||
def save_config(key, value): | ||
config_file_path = config_path + 'config.csv' | ||
|
||
# 读取现有配置 | ||
existing_config = {} | ||
if os.path.exists(config_file_path): | ||
with open(config_file_path, 'r') as csvfile: | ||
reader = csv.DictReader(csvfile) | ||
existing_config = {row['key']: row['value'] for row in reader} | ||
|
||
# 更新或添加新的配置项 | ||
existing_config[key] = value | ||
|
||
# 写入配置到文件 | ||
with open(config_file_path, 'w', newline='') as csvfile: | ||
fieldnames = ['key', 'value'] | ||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames) | ||
writer.writeheader() | ||
for key, value in existing_config.items(): | ||
writer.writerow({'key': key, 'value': value}) | ||
|
||
|
||
def read_config(): | ||
config_file_path = config_path + 'config.csv' | ||
|
||
config_data = {} | ||
if os.path.exists(config_file_path): | ||
with open(config_file_path, 'r') as csvfile: | ||
reader = csv.DictReader(csvfile) | ||
for row in reader: | ||
config_data[row['key']] = row['value'] | ||
|
||
return config_data | ||
|
||
|
||
def search_config(key): | ||
config_file_path = config_path + 'config.csv' | ||
|
||
if os.path.exists(config_file_path): | ||
with open(config_file_path, 'r') as csvfile: | ||
reader = csv.DictReader(csvfile) | ||
for row in reader: | ||
if row['key'] == key: | ||
return row['value'] | ||
|
||
# 如果键不存在,返回None或者你认为合适的默认值 | ||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
key_path = "keys/" | ||
config_path = "config/" | ||
input_path = "input/" | ||
output_path = "output/" | ||
all_folders = ("keys", "config", "input", "output") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
from utils import * | ||
from RSA import * | ||
from config import * | ||
|
||
|
||
def encode_message2img(private_key,public_key): | ||
image_name = input("请输入图片名称:") | ||
encrypted_image = ImageProcessor("encrypted_image",image_name) | ||
message = input("请输入要加密的信息:") | ||
encrypted_message = encrypt_with_public_key(message, public_key) | ||
#print("加密后的信息:", encrypted_message) | ||
zero_count, one_count = encrypted_image.image_decode() | ||
#print(f"0的数量:{zero_count}") | ||
#print(f"1的数量:{one_count}") | ||
temp_file = encrypted_image.name + "_PBE.tem" | ||
with open(temp_file, 'r') as file: | ||
binary_ends = file.read() # 读取文件 | ||
binary_message = encode_massage(encrypted_message) | ||
binary_ends = overwrite_binary(binary_ends, binary_message) | ||
with open(temp_file, 'w') as file: | ||
file.write(binary_ends) | ||
encrypted_image.image_encode() | ||
|
||
|
||
def decode_img2message(private_key,public_key): | ||
image_name = input("请输入图片名称:") | ||
decrypted_image = ImageProcessor("decrypted_image",image_name) | ||
zero_count, one_count = decrypted_image.image_decode() | ||
#print(f"0的数量:{zero_count}") | ||
#print(f"1的数量:{one_count}") | ||
temp_file = decrypted_image.name + "_PBE.tem" | ||
with open(temp_file, 'r') as file: | ||
binary_ends = file.read() # 读取文件 | ||
binary_message =read_binary_message(binary_ends) | ||
encrypted_message = decode_massage(binary_message) | ||
decrypted_message = decrypt_with_private_key(encrypted_message, private_key) | ||
print(decrypted_message) | ||
|
||
|
||
def OptionMenu(): | ||
private_key = start_private_key() | ||
print("请选择操作:") | ||
print("1.加密图片") | ||
print("2.解密图片") | ||
option = int(input("请输入选项:")) | ||
if option == 1: | ||
public_key_name = input("请选择公钥:") | ||
public_key=load_public_key(public_key_name) | ||
encode_message2img(private_key,public_key) | ||
print("图片保存在output文件夹") | ||
elif option == 2: | ||
public_key_name = input("请选择公钥:") | ||
public_key=load_public_key(public_key_name) | ||
decode_img2message(private_key,public_key) | ||
else: | ||
print("无效的选项") | ||
|
||
|
||
def main(): | ||
start_folder_check() | ||
config_check() | ||
OptionMenu() | ||
|
||
|
||
#encode_message2img() | ||
#decode_img2message() | ||
delete_temp_files() | ||
|
||
|
||
|
||
|
||
|
||
if __name__ == '__main__': | ||
main() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
cryptography | ||
Pillow |
Oops, something went wrong.