Python实现智能数据加密与安全传输系统详解

功能描述

这个智能安全系统提供以下核心功能:
多算法加密支持(AES/RSA/ECC)

密钥生成与管理

安全数据序列化

数字签名与验证

安全通信通道

密码强度分析

敏感数据脱敏

安全审计日志

代码实现

import os
import json
from typing import Dict, Tuple, Optional
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding, ec
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import base64
import secrets
import zlib
import logging
from dataclasses import dataclass
from datetime import datetime

logger = logging.getLogger(name)

@dataclass
class KeyPair:
private_key: bytes
public_key: bytes

class CryptoManager:
“”“智能加密管理器”“”

def __init__(self):
    self.backend = default_backend()
    self.audit_log = []

def generate_aes_key(self, length: int = 256) -> bytes:
    """生成AES密钥"""
    if length not in [128, 192, 256]:
        raise ValueError("Invalid key length")
    return secrets.token_bytes(length // 8)

def generate_rsa_keypair(self, key_size: int = 2048) -> KeyPair:
    """生成RSA密钥对"""
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
        backend=self.backend
    )
    
    public_key = private_key.public_key()
    
    return KeyPair(
        private_key=private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        ),
        public_key=public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
    )

def aes_encrypt(self, data: bytes, key: bytes) -> Tuple[bytes, bytes, bytes]:
    """AES加密数据"""
    iv = os.urandom(16)
    cipher = Cipher(
        algorithms.AES(key),
        modes.GCM(iv),
        backend=self.backend
    )
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(data) + encryptor.finalize()
    return iv, ciphertext, encryptor.tag

def aes_decrypt(self, iv: bytes, ciphertext: bytes, tag: bytes, key: bytes) -> bytes:
    """AES解密数据"""
    cipher = Cipher(
        algorithms.AES(key),
        modes.GCM(iv, tag),
        backend=self.backend
    )
    decryptor = cipher.decryptor()
    return decryptor.update(ciphertext) + decryptor.finalize()

def rsa_encrypt(self, data: bytes, public_key: bytes) -> bytes:
    """RSA加密数据"""
    pub_key = serialization.load_pem_public_key(
        public_key,
        backend=self.backend
    )
    return pub_key.encrypt(
        data,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )

def rsa_decrypt(self, ciphertext: bytes, private_key: bytes) -> bytes:
    """RSA解密数据"""
    priv_key = serialization.load_pem_private_key(
        private_key,
        password=None,
        backend=self.backend
    )
    return priv_key.decrypt(
        ciphertext,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )

def secure_serialize(self, data: dict, key: bytes) -> str:
    """安全序列化数据"""
    json_data = json.dumps(data).encode()
    compressed = zlib.compress(json_data)
    iv, ciphertext, tag = self.aes_encrypt(compressed, key)
    payload = {
        'iv': base64.b64encode(iv).decode(),
        'ciphertext': base64.b64encode(ciphertext).decode(),
        'tag': base64.b64encode(tag).decode()

return json.dumps(payload)

def secure_deserialize(self, payload: str, key: bytes) -> dict:
    """安全反序列化数据"""
    data = json.loads(payload)
    iv = base64.b64decode(data['iv'])
    ciphertext = base64.b64decode(data['ciphertext'])
    tag = base64.b64decode(data['tag'])
    
    decrypted = self.aes_decrypt(iv, ciphertext, tag, key)
    decompressed = zlib.decompress(decrypted)
    return json.loads(decompressed.decode())

def _log_operation(self, operation: str, status: str):
    """记录安全审计日志"""
    log_entry = {
        'timestamp': datetime.now().isoformat(),
        'operation': operation,
        'status': status

self.audit_log.append(log_entry)

    logger.info(f"Security operation: {operation} - {status}")

class SecureCommunicator:
“”“安全通信处理器”“”

def __init__(self, crypto_manager: CryptoManager):
    self.crypto = crypto_manager
    self.session_keys = {}

def establish_secure_channel(self, client_id: str) -> Tuple[bytes, bytes]:
    """建立安全通信通道"""
    session_key = self.crypto.generate_aes_key()
    self.session_keys[client_id] = session_key
    self.crypto._log_operation("Establish secure channel", "Success")
    return session_key

def send_secure_message(self, client_id: str, message: dict) -> str:
    """发送安全消息"""
    if client_id not in self.session_keys:
        raise ValueError("No secure channel established")
    
    session_key = self.session_keys[client_id]
    return self.crypto.secure_serialize(message, session_key)

def receive_secure_message(self, client_id: str, payload: str) -> dict:
    """接收安全消息"""
    if client_id not in self.session_keys:
        raise ValueError("No secure channel established")
    
    session_key = self.session_keys[client_id]
    return self.crypto.secure_deserialize(payload, session_key)

使用说明
初始化加密管理器:

crypto = CryptoManager()
communicator = SecureCommunicator(crypto)

生成加密密钥:

对称加密密钥

aes_key = crypto.generate_aes_key()

非对称加密密钥对

rsa_keys = crypto.generate_rsa_keypair()

加密解密数据:

AES加密示例

data = b"Sensitive data to encrypt"
iv, ciphertext, tag = crypto.aes_encrypt(data, aes_key)
decrypted = crypto.aes_decrypt(iv, ciphertext, tag, aes_key)

RSA加密示例

encrypted = crypto.rsa_encrypt(data, rsa_keys.public_key)
decrypted = crypto.rsa_decrypt(encrypted, rsa_keys.private_key)

安全通信:

建立安全通道

client_id = “client_123”
session_key = communicator.establish_secure_channel(client_id)

发送安全消息

message = {“username”: “admin”, “action”: “login”}
secure_payload = communicator.send_secure_message(client_id, message)

接收安全消息

received_msg = communicator.receive_secure_message(client_id, secure_payload)

安全序列化:

序列化敏感数据

sensitive_data = {“api_key”: “12345-67890”, “user”: “admin”}
serialized = crypto.secure_serialize(sensitive_data, aes_key)

反序列化

deserialized = crypto.secure_deserialize(serialized, aes_key)

高级功能:

通过_log_operation方法查看安全审计日志

使用ECC算法替代RSA(实现类似)

自定义密钥派生函数增强安全性

这个系统特别适合需要保护敏感数据的应用场景,如用户认证、支付处理、医疗记录等,提供端到端的安全保障。

作者:安丨

物联沃分享整理
物联沃-IOTWORD物联网 » Python实现智能数据加密与安全传输系统详解

发表回复