网络编程是现代软件开发中的重要技能。Python提供了强大的网络编程能力,从简单的网络通信到复杂的网络应用都能胜任。
Socket(套接字)是网络通信的基础工具。可以把Socket想象成两个程序之间的电话线:一个程序拨打电话,另一个程序接听电话,这样它们就能相互通信了。
在Python中,使用socket模块来创建网络连接。
import socket
# 创建TCP Socket
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 创建UDP Socket
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("Socket创建成功")参数说明:
AF_INET:使用IPv4地址
SOCK_STREAM:TCP协议,可靠连接
SOCK_DGRAM:UDP协议,无连接
| 函数 | 描述 |
|---|---|
| 服务器端套接字 | |
| s.bind() | 绑定地址(host,port)到套接字, 在AF_INET下,以元组(host,port)的形式表示地址。 |
| s.listen() | 开始TCP监听。backlog指定在拒绝连接之前,操作系统可以挂起的最大连接数量。该值至少为1,大部分应用程序设为5就可以了。 |
| s.accept() | 被动接受TCP客户端连接,(阻塞式)等待连接的到来 |
| 客户端套接字 | |
| s.connect() | 主动初始化TCP服务器连接,。一般address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。 |
| s.connect_ex() | connect()函数的扩展版本,出错时返回出错码,而不是抛出异常 |
| 公共用途的套接字函数 | |
| s.recv() | 接收TCP数据,数据以字符串形式返回,bufsize指定要接收的最大数据量。flag提供有关消息的其他信息,通常可以忽略。 |
| s.send() | 发送TCP数据,将string中的数据发送到连接的套接字。返回值是要发送的字节数量,该数量可能小于string的字节大小。 |
| s.sendall() | 完整发送TCP数据。将string中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据。成功返回None,失败则抛出异常。 |
| s.recvfrom() | 接收UDP数据,与recv()类似,但返回值是(data,address)。其中data是包含接收数据的字符串,address是发送数据的套接字地址。 |
| s.sendto() | 发送UDP数据,将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。 |
| s.close() | 关闭套接字 |
| s.getpeername() | 返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)。 |
| s.getsockname() | 返回套接字自己的地址。通常是一个元组(ipaddr,port) |
| s.setsockopt(level,optname,value) | 设置给定套接字选项的值。 |
| s.getsockopt(level,optname[.buflen]) | 返回套接字选项的值。 |
| s.settimeout(timeout) | 设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如connect()) |
| s.gettimeout() | 返回当前超时期的值,单位是秒,如果没有设置超时期,则返回None。 |
| s.fileno() | 返回套接字的文件描述符。 |
| s.setblocking(flag) | 如果 flag 为 False,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。非阻塞模式下,如果调用 recv() 没有发现任何数据,或 send() 调用无法立即发送数据,那么将引起 socket.error 异常。 |
| s.makefile() | 创建一个与该套接字相关连的文件 |
TCP服务器监听客户端连接,接收和发送数据。
import socket
def start_tcp_server(host='localhost', port=8888):
"""启动TCP服务器"""
# 创建TCP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 允许地址重用
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定地址和端口
server_socket.bind((host, port))
# 开始监听,最多5个等待连接
server_socket.listen(5)
print(f"服务器启动在 {host}:{port}")
try:
while True:
# 等待客户端连接
client_socket, client_address = server_socket.accept()
print(f"客户端连接来自: {client_address}")
try:
# 接收客户端数据
data = client_socket.recv(1024).decode('utf-8')
print(f"收到消息: {data}")
# 发送响应
response = f"服务器已收到你的消息: {data}"
client_socket.send(response.encode('utf-8'))
except Exception as e:
print(f"处理客户端数据时出错: {e}")
finally:
# 关闭客户端连接
client_socket.close()
print(f"与 {client_address} 的连接已关闭")
except KeyboardInterrupt:
print("服务器关闭")
finally:
server_socket.close()
# 启动服务器
if __name__ == "__main__":
start_tcp_server()TCP客户端连接到服务器,发送请求并接收响应。
import socket
def tcp_client(host='localhost', port=8888):
"""TCP客户端"""
try:
# 创建TCP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 连接服务器
client_socket.connect((host, port))
print(f"已连接到服务器 {host}:{port}")
# 发送消息
message = "你好,服务器!"
client_socket.send(message.encode('utf-8'))
print(f"发送消息: {message}")
# 接收响应
response = client_socket.recv(1024).decode('utf-8')
print(f"服务器响应: {response}")
except Exception as e:
print(f"客户端错误: {e}")
finally:
client_socket.close()
print("连接已关闭")
# 运行客户端
if __name__ == "__main__":
tcp_client()UDP是无连接的协议,传输速度更快,但不保证数据一定能到达。
import socket
def start_udp_server(host='localhost', port=8889):
"""启动UDP服务器"""
# 创建UDP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定地址和端口
server_socket.bind((host, port))
print(f"UDP服务器启动在 {host}:{port}")
try:
while True:
# 接收数据
data, client_address = server_socket.recvfrom(1024)
message = data.decode('utf-8')
print(f"收到来自 {client_address} 的消息: {message}")
# 发送响应
response = f"UDP服务器已收到: {message}"
server_socket.sendto(response.encode('utf-8'), client_address)
except KeyboardInterrupt:
print("UDP服务器关闭")
finally:
server_socket.close()
# 启动UDP服务器
if __name__ == "__main__":
start_udp_server()import socket
def udp_client(host='localhost', port=8889):
"""UDP客户端"""
try:
# 创建UDP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 发送消息
message = "你好,UDP服务器!"
client_socket.sendto(message.encode('utf-8'), (host, port))
print(f"发送消息: {message}")
# 接收响应
data, server_address = client_socket.recvfrom(1024)
response = data.decode('utf-8')
print(f"服务器响应: {response}")
except Exception as e:
print(f"UDP客户端错误: {e}")
finally:
client_socket.close()
# 运行UDP客户端
if __name__ == "__main__":
udp_client()import socket
import threading
from concurrent.futures import ThreadPoolExecutor
def scan_port(host, port, timeout=1):
"""扫描单个端口"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
if result == 0:
print(f"端口 {port} 开放")
return port
except Exception:
pass
return None
def port_scanner(host, start_port=1, end_port=1024, max_workers=100):
"""多线程端口扫描器"""
print(f"开始扫描 {host} 的端口 {start_port}-{end_port}")
open_ports = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有扫描任务
futures = [
executor.submit(scan_port, host, port)
for port in range(start_port, end_port + 1)
]
# 收集结果
for future in futures:
result = future.result()
if result:
open_ports.append(result)
print(f"扫描完成,发现 {len(open_ports)} 个开放端口")
return open_ports
# 使用示例
if __name__ == "__main__":
# 扫描本地主机常用端口
open_ports = port_scanner('localhost', 80, 100, 50)
print(f"开放端口: {open_ports}")import socket
import threading
class SimpleHTTPServer:
def __init__(self, host='localhost', port=8080):
self.host = host
self.port = port
self.server_socket = None
def handle_client(self, client_socket, client_address):
"""处理客户端请求"""
try:
# 接收HTTP请求
request = client_socket.recv(1024).decode('utf-8')
print(f"收到来自 {client_address} 的请求")
# 解析请求行
request_line = request.split('\n')[0]
method, path, version = request_line.split()
# 生成响应
if path == '/':
response_body = """
<html>
<head><title>简单HTTP服务器</title></head>
<body>
<h1>欢迎使用Python HTTP服务器</h1>
<p>这是一个简单的HTTP服务器示例</p>
</body>
</html>
"""
response = f"""HTTP/1.1 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: {len(response_body)}
{response_body}"""
else:
response = """HTTP/1.1 404 Not Found
Content-Type: text/plain
页面未找到"""
# 发送响应
client_socket.send(response.encode('utf-8'))
except Exception as e:
print(f"处理请求时出错: {e}")
finally:
client_socket.close()
def start(self):
"""启动HTTP服务器"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
print(f"HTTP服务器启动在 http://{self.host}:{self.port}")
try:
while True:
client_socket, client_address = self.server_socket.accept()
# 为每个客户端创建新线程
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, client_address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("HTTP服务器关闭")
finally:
if self.server_socket:
self.server_socket.close()
# 启动HTTP服务器
if __name__ == "__main__":
server = SimpleHTTPServer()
server.start()import socket
import threading
class ChatServer:
def __init__(self, host='localhost', port=9999):
self.host = host
self.port = port
self.clients = []
self.nicknames = []
self.server_socket = None
def broadcast(self, message, sender_socket=None):
"""向所有客户端广播消息"""
for client in self.clients:
if client != sender_socket:
try:
client.send(message)
except:
# 移除断开连接的客户端
self.remove_client(client)
def remove_client(self, client_socket):
"""移除客户端"""
if client_socket in self.clients:
index = self.clients.index(client_socket)
nickname = self.nicknames[index]
self.clients.remove(client_socket)
self.nicknames.remove(nickname)
broadcast_message = f"{nickname} 离开了聊天室".encode('utf-8')
self.broadcast(broadcast_message)
print(f"{nickname} 断开连接")
def handle_client(self, client_socket):
"""处理客户端消息"""
while True:
try:
message = client_socket.recv(1024)
if message:
self.broadcast(message, client_socket)
else:
self.remove_client(client_socket)
break
except:
self.remove_client(client_socket)
break
def start(self):
"""启动聊天服务器"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
print(f"聊天服务器启动在 {self.host}:{self.port}")
try:
while True:
client_socket, address = self.server_socket.accept()
print(f"新连接来自: {address}")
# 请求昵称
client_socket.send("请输入你的昵称: ".encode('utf-8'))
nickname = client_socket.recv(1024).decode('utf-8').strip()
self.clients.append(client_socket)
self.nicknames.append(nickname)
print(f"用户 {nickname} 加入聊天室")
welcome_message = f"{nickname} 加入了聊天室".encode('utf-8')
self.broadcast(welcome_message)
# 为每个客户端创建线程
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket,)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("聊天服务器关闭")
finally:
if self.server_socket:
self.server_socket.close()
# 启动聊天服务器
if __name__ == "__main__":
chat_server = ChatServer()
chat_server.start()import socket
import threading
class ChatClient:
def __init__(self, host='localhost', port=9999):
self.host = host
self.port = port
self.client_socket = None
self.nickname = ""
def receive_messages(self):
"""接收服务器消息"""
while True:
try:
message = self.client_socket.recv(1024).decode('utf-8')
print(message)
except:
print("与服务器断开连接")
self.client_socket.close()
break
def send_messages(self):
"""发送消息到服务器"""
while True:
try:
message = input()
if message.lower() == '退出':
break
self.client_socket.send(message.encode('utf-8'))
except:
print("发送消息失败")
break
def start(self):
"""启动聊天客户端"""
try:
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.connect((self.host, self.port))
# 接收昵称提示
nickname_prompt = self.client_socket.recv(1024).decode('utf-8')
print(nickname_prompt, end='')
self.nickname = input().strip()
self.client_socket.send(self.nickname.encode('utf-8'))
print(f"连接到聊天室,输入'退出'来离开")
# 启动接收消息线程
receive_thread = threading.Thread(target=self.receive_messages)
receive_thread.daemon = True
receive_thread.start()
# 主线程处理发送消息
self.send_messages()
except Exception as e:
print(f"连接失败: {e}")
finally:
if self.client_socket:
self.client_socket.close()
# 启动聊天客户端
if __name__ == "__main__":
chat_client = ChatClient()
chat_client.start()import socket
import errno
def robust_socket_operation():
"""健壮的Socket操作示例"""
try:
# 创建socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10) # 设置超时
# 连接服务器
sock.connect(('localhost', 8888))
# 发送数据
message = "测试消息"
sock.sendall(message.encode('utf-8'))
# 接收数据
data = sock.recv(1024)
print(f"收到响应: {data.decode('utf-8')}")
except socket.timeout:
print("连接超时")
except socket.gaierror as e:
print(f"地址解析错误: {e}")
except socket.error as e:
if e.errno == errno.ECONNREFUSED:
print("连接被拒绝")
else:
print(f"Socket错误: {e}")
except Exception as e:
print(f"其他错误: {e}")
finally:
if 'sock' in locals():
sock.close()
# 使用示例
robust_socket_operation()以下列出了 Python 网络编程的一些重要模块:
| 协议 | 功能用处 | 端口号 | Python 模块 |
|---|---|---|---|
| HTTP | 网页访问 | 80 | httplib, urllib, xmlrpclib |
| NNTP | 阅读和张贴新闻文章,俗称为"帖子" | 119 | nntplib |
| FTP | 文件传输 | 20 | ftplib, urllib |
| SMTP | 发送邮件 | 25 | smtplib |
| POP3 | 接收邮件 | 110 | poplib |
| IMAP4 | 获取邮件 | 143 | imaplib |
| Telnet | 命令行 | 23 | telnetlib |
| Gopher | 信息查找 | 70 | gopherlib, urllib |
选择合适的协议:
TCP:需要可靠传输,如文件传输、网页访问
UDP:实时性要求高,如视频流、游戏
处理并发连接:
使用多线程处理多个客户端
考虑使用异步编程提高性能
安全性考虑:
验证客户端输入
使用SSL/TLS加密敏感数据
资源管理:
及时关闭不需要的连接
使用with语句自动管理资源
掌握Python网络编程,你就能开发各种网络应用,从简单的工具到复杂的服务器系统。这些技能在Web开发、分布式系统和网络工具开发中都非常有用。
本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!