tt-server/server.py
murmur 14aa2427d2 增加分包处理逻辑
扩大接收缓存至约1MB
实测1520字节连发1分钟无丢包
完善部分log
2023-03-13 15:14:36 +08:00

285 lines
7.5 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# coding:utf-8
import socketserver
import subprocess
import pickle
offs = 0 # 偏移量
p = {}
# 保存终端信息ccid与地址对
# p={ccid=addr,}
# 缓存未能正常发送消息的客户端
pp = {}
# cache={}
from collections import defaultdict
cache = defaultdict(list)
def save_cache(dic):
"""缓存未发送成功的消息
仅在目标终端未上线或发送失败时缓存
发送失败的一般原因是终端掉线但未及时更新在线列表"""
# f=open("./cache.txt",mode='w+')
# f.write(str(dic))
# f.close
pickle.dump(dic, open("./cache.txt", "wb"))
def load_cache():
"""程序启动时加载缓存"""
try:
# f=open("./cache.txt")
# c = eval(f.read())
# f.close()
c = pickle.load(open("./cache.txt", "rb"))
return c
except:
return None
def update_pairs(addr, ccid, clear=0):
"""更新终端库
addr:绑定tcp地址
ccid:终端号
clear:清空掉线终端"""
if ccid and not clear:
# 添加或更新客户端
p[ccid] = addr
pp[addr] = ccid
else:
del p[ccid]
del pp[addr]
# 打印在线终端
for i in p:
print(i, "<--->", p[i])
# for i in pp:
# print(i,"-",pp[i])
def get_addr(ccid):
"""获取ccid对应的地址"""
try:
# 如果有直接返回
return p[ccid]
except Exception:
# 没有就返回None
return None
def get_ccid(addr):
try:
# 如果有直接返回
return pp[addr]
except Exception:
# 没有就返回None
print("终端未上报心跳注册ccid。")
return None
"""
+--------+--------------+----------+----------+--------------+------------------------+--------+
| ff | 长度 | 地址信息 | 信令 | 内容 | 校验位 |
+--------+--------------+----------+----------+--------------+------------------------+--------+
| | 2Bytes | 4Bytes | 心跳 | 0x01 | 时间戳32bit单位ms | 2Bytes |
+ $TTMS +--------------+----------+----------+--------------+------------------------+--------+
| | 2Bytes | 4Bytes | 传输数据 | 0xaa | 载荷 | 2Bytes |
+--------+--------------+----------+----------+--------------+------------------------+--------+
"""
def send(fd, data):
try:
fd.send(data)
return 0
except:
return -1
def tt_hh(addr, data):
"""处理心跳"""
# 更新pairs
ccid = data[7 + offs : 11 + offs]
update_pairs(addr, ccid)
# 组帧再返回
# 从缓存中匹配数据并重发
# 仅有新终端上线时才重发
if ccid in cache:
msgs = cache[ccid][:] # 浅拷贝
sccid=msgs[0][7 + offs : 11 + offs]
print("",len(msgs),"包缓存数据待发。",sccid,"-->",ccid)
for i in range(len(msgs)):
try:
# 按缓存先后顺序发送
addr.send(msgs[i])
cache[ccid].pop(0)
print("",i+1,"包缓存数据发送完成。")
except:
# 异常时退出循环不用再尝试发送后续msgs
# 退出前清空已发送成功的缓存数据
# del方法会改变list大小
# while i:
# cache[ccid].pop(0)
# i = i - 1
print("",i+1,"包发送失败,停止尝试")
break
if cache[ccid] == []:
print("缓存数据全部发送完成。",sccid,"-->",ccid)
del cache[ccid]
save_cache(cache)
return 0
def tt_trans(addr, data):
"""发送数据到指定ccid"""
sccid = get_ccid(addr)
if not sccid:
return -1
tccid = data[7 + offs : 11 + offs]
taddr = get_addr(tccid)
# 组帧
msg = bytearray(data[:])
msg[:5] = b"$TTMX"
msg[7 + offs : 11 + offs] = sccid
# 计算校验和
crc = 0
for i in msg[:-1]:
crc = crc ^ i
msg[-1] = crc
if not taddr:
# 未找到匹配的在线终端
# 更新缓存
# {tccid1=[msg1,msg2,...],...}
cache[tccid].append(msg)
save_cache(cache)
print("终端未在线或ccid",tccid,"错误。数据已缓存,待目标终端上线后重发。")
return 0
print("匹配在线终端为:", tccid, "<--->", taddr)
# 发送
try:
taddr.send(msg)
print("数据发送成功。",sccid,"-->",tccid)
except Exception:
# 发送失败
taddr.close()
# 更新pairs清空对应终端
update_pairs(taddr, tccid, 1)
# 更新缓存
# {tccid1=[msg1,msg2,...],...}
cache[tccid].append(msg)
save_cache(cache)
print("发送失败,终端可能已掉线。数据已缓存,待目标终端上线后重发。", taddr)
return 0
def err_handle(flag, addr):
"""错误处理\n
默认打印错误信息
1-非法连接,关闭连接
"""
return 0
def data_split(data):
# 按帧头分割长数据
h= b"$TTMS" #b'\x24\x54\x54\x4d\x53'
dlist= data.split(h)[1:]
for i in range(len(dlist)):
dlist[i] = h + dlist[i]
print("收到",len(dlist),"包连续数据。")
return dlist
def tt_decode(addr, data):
"""处理收到的数据
数据正常返回0
数据异常返回-1"""
flag = -1
# 依据帧头和最小长度初判数据有效性
# 即使是多条数据组成的长数据也默认帧格式是完整的,前几个字节必须是帧头
if data[:5] != b"$TTMS" or len(data) < 12 :
return -1
if len(data)<200:
# 可能大多数场景是小包
# 小包数据不处理分包,速度快
dlist=[data]
else:
# 长数据需要分包
dlist = data_split(data)
for data in dlist:
cmd = data[11 + offs]
# 回退到if-elif而不用match因服务器默认版本3.8
if cmd == 0x01:
# 处理心跳
flag = tt_hh(addr, data)
elif cmd == 0xAA:
# 处理数据
if tt_trans(addr, data):
flag = -1
break
else:
flag=0
else:
break
return flag
class MyServer(socketserver.BaseRequestHandler):
def handle(self): # 回调
print("终端", self.client_address,"已上线等待上报心跳注册ccid。")
conn = self.request
# print(type(conn),conn.fd)
while True:
data = conn.recv(1000000)
# update_pairs(conn,data[7+offs:11+offs])
if not data:
break
print(
"接收到新数据", self.client_address, ",长度", len(data), "\r\n", data.hex(" ")
)
# ack_msg = "got from "+ str(ip) + " to " + str(self.client_address) + data
# conn.send(data[:6])
if tt_decode(conn, data):
conn.close()
print(self.client_address, "疑是非法连接,已切断。")
break
if __name__ == "__main__":
# load_cache
cache = load_cache()
server = socketserver.ThreadingTCPServer(("", 7222), MyServer)
ip, port = server.server_address
print("服务端初始化成功。", ip, port)
server.serve_forever()
"""
以下几种情况服务器会主动断开终端:
单次数据不以帧头开始
单次数据长度不足11
未上报心跳注册ccid而直接发数据
"""