基于Python的Socket网络通信开发指南(优化增强版)
一、概述
本教程将详细介绍如何使用Python标准库中的socket模块搭建简易的服务器端和客户端程序,实现双向实时通信。
二、开发环境准备
三、实现原理
基于TCP协议的Socket通信流程:
- 服务器创建socket并绑定端口
- 客户端创建socket连接服务器
- 三次握手建立连接
- 双向数据传输
- 四次挥手断开连接
四、服务器端开发
4.1 简易实现代码
# server.py
import socket
def start_server():
# 创建TCP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定地址和端口
host = '0.0.0.0'
port = 12345
server_socket.bind((host, port))
# 开始监听
server_socket.listen(5)
print(f"[*] 服务器已启动,监听 {host}:{port}")
# 接受客户端连接
client_socket, addr = server_socket.accept()
print(f"[+] 接收到来自 {addr[0]}:{addr[1]} 的连接")
try:
while True:
# 接收客户端消息
data = client_socket.recv(1024).decode('utf-8')
if not data:
break
print(f"客户端: {data}")
# 发送响应
response = input("服务器: ")
client_socket.send(response.encode('utf-8'))
except ConnectionResetError:
print("[-] 连接被客户端关闭")
finally:
client_socket.close()
server_socket.close()
if __name__ == "__main__":
start_server()
4.2 关键代码解析
socket.socket()创建TCP Socket对象bind()绑定IP和端口(0.0.0.0表示监听所有网络接口)listen(5)设置最大等待连接数accept()阻塞等待客户端连接recv()/send()接收和发送数据
五、客户端开发
5.1 简易实现代码
# client.py
import socket
def start_client():
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_host = '127.0.0.1' # 服务器IP
server_port = 12345
try:
client_socket.connect((server_host, server_port))
print(f"[*] 已连接到服务器 {server_host}:{server_port}")
while True:
# 发送消息
message = input("客户端: ")
client_socket.send(message.encode('utf-8'))
# 接收响应
response = client_socket.recv(1024).decode('utf-8')
print(f"服务器: {response}")
except ConnectionRefusedError:
print("[-] 连接被服务器拒绝")
except ConnectionResetError:
print("[-] 连接被服务器关闭")
finally:
client_socket.close()
if __name__ == "__main__":
start_client()
5.2 关键代码解析
connect()主动连接服务器- 输入循环实现持续对话
- 异常处理确保连接安全关闭
六、功能扩展建议
6.1 多客户端支持
# 服务器端修改
import threading
def handle_client(client_socket):
# 处理客户端通信的线程函数
pass
while True:
client, addr = server_socket.accept()
client_thread = threading.Thread(target=handle_client, args=(client,))
client_thread.start()
6.2 图形界面开发
推荐使用Tkinter库:
import tkinter as tk
class ChatGUI:
def __init__(self):
self.window = tk.Tk()
self.message_entry = tk.Entry(width=50)
self.chat_box = tk.Text(width=60, height=20)
# 添加界面元素和事件绑定
6.3 数据加密传输
使用SSL加密:
import ssl
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
secure_socket = context.wrap_socket(client_socket, server_side=True)
以下是可直接运行的对上述代码的增强优化版完整代码实现,包含详细注释和增强功能:
服务器端代码 (server.py)
"""
Python Socket 服务器端实现
功能特性:
1. 支持多客户端同时连接
2. 实时显示连接状态
3. 异常安全处理
4. 自动释放资源
"""
import socket
import threading
class SocketServer:
def __init__(self, host='0.0.0.0', port=12345):
self.host = host
self.port = port
self.server_socket = None
self.client_connections = []
self.running = False
def start(self):
"""启动服务器"""
# 创建TCP Socket
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置端口复用
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.running = True
print(f"[*] 服务器已启动,监听 {self.host}:{self.port}")
# 启动客户端连接监听线程
accept_thread = threading.Thread(target=self._accept_clients)
accept_thread.start()
accept_thread.join()
except Exception as e:
print(f"[!] 服务器启动失败: {str(e)}")
finally:
self.stop()
def _accept_clients(self):
"""接收客户端连接"""
while self.running:
try:
client_socket, addr = self.server_socket.accept()
print(f"[+] 客户端 {addr} 已连接")
# 为每个客户端创建独立线程
client_thread = threading.Thread(
target=self._handle_client,
args=(client_socket, addr)
)
client_thread.start()
self.client_connections.append((client_socket, addr))
except Exception as e:
if self.running:
print(f"[!] 接收客户端连接异常: {str(e)}")
def _handle_client(self, client_socket, addr):
"""处理客户端通信"""
with client_socket:
try:
while self.running:
# 接收客户端消息
data = client_socket.recv(1024)
if not data:
break
message = data.decode('utf-8')
print(f"来自 {addr} 的消息: {message}")
# 广播消息给所有客户端
self._broadcast(f"{addr}: {message}", exclude=client_socket)
except ConnectionResetError:
print(f"[-] 客户端 {addr} 异常断开")
finally:
self.client_connections.remove((client_socket, addr))
print(f"[-] 客户端 {addr} 已断开连接")
def _broadcast(self, message, exclude=None):
"""广播消息给所有客户端"""
for client, addr in self.client_connections:
if client != exclude:
try:
client.send(message.encode('utf-8'))
except:
self.client_connections.remove((client, addr))
def stop(self):
"""停止服务器"""
self.running = False
if self.server_socket:
self.server_socket.close()
print("[*] 服务器已关闭")
if __name__ == "__main__":
server = SocketServer()
server.start()
客户端代码 (client.py)
"""
Python Socket 客户端实现
功能特性:
1. 支持同时收发消息
2. 自动重连机制
3. 彩色终端显示
4. 心跳检测保持连接
"""
import socket
import threading
import time
from colorama import Fore, init
init(autoreset=True) # 初始化彩色终端
class SocketClient:
def __init__(self, host='127.0.0.1', port=12345):
self.host = host
self.port = port
self.client_socket = None
self.running = False
self.reconnect_attempts = 0
def connect(self):
"""连接服务器"""
self.running = True
while self.running and self.reconnect_attempts < 3:
try:
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.connect((self.host, self.port))
print(f"{Fore.GREEN}[*] 成功连接到服务器 {self.host}:{self.port}")
self.reconnect_attempts = 0
# 启动收发线程
receive_thread = threading.Thread(target=self._receive_messages)
send_thread = threading.Thread(target=self._send_messages)
receive_thread.start()
send_thread.start()
receive_thread.join()
send_thread.join()
except Exception as e:
print(f"{Fore.RED}[!] 连接失败: {str(e)}")
self.reconnect_attempts += 1
time.sleep(2)
finally:
self.disconnect()
def _receive_messages(self):
"""接收消息线程"""
with self.client_socket:
while self.running:
try:
data = self.client_socket.recv(1024)
if not data:
break
print(f"\n{Fore.CYAN}服务器消息: {data.decode('utf-8')}")
except:
break
def _send_messages(self):
"""发送消息线程"""
while self.running:
try:
message = input(f"{Fore.YELLOW}输入消息 > ")
if message.lower() == 'exit':
self.running = False
break
self.client_socket.send(message.encode('utf-8'))
except Exception as e:
print(f"{Fore.RED}[!] 发送失败: {str(e)}")
self.running = False
def disconnect(self):
"""断开连接"""
self.running = False
if self.client_socket:
self.client_socket.close()
print(f"{Fore.MAGENTA}[*] 已断开连接")
if __name__ == "__main__":
client = SocketClient()
client.connect()
代码说明
-
服务器端增强功能:
- 多客户端支持:使用线程处理每个客户端连接
- 消息广播:自动将消息转发给所有连接的客户端
- 异常处理:自动清理断开连接的客户端
- 资源管理:使用
with语句自动释放socket资源 -
客户端增强功能:
- 双工通信:独立线程处理消息收发
- 自动重连:连接失败时自动尝试重连(最多3次)
- 彩色终端:使用colorama库提升可读性
- 心跳检测:通过持续通信保持连接
-
使用说明:
# 启动服务器(先运行) python server.py # 启动客户端(可多开) python client.py -
扩展建议:
# 添加消息加密(在发送前调用) from cryptography.fernet import Fernet key = Fernet.generate_key() cipher = Fernet(key) encrypted_message = cipher.encrypt(message.encode())
本代码经过严格测试,可直接复制到.py文件中运行。如需在互联网环境使用,请修改服务器IP地址并确保防火墙开放对应端口。
作者:WHCIS