添加缓存机制,处理粘包
debug及trlog采用文件夹归类
This commit is contained in:
parent
d0f6d22824
commit
0f8e0dd8fa
17
s2f.py
17
s2f.py
@ -1,8 +1,19 @@
|
||||
import datetime
|
||||
import os
|
||||
def gettm():
|
||||
t= datetime.datetime.now()
|
||||
d = str(t)
|
||||
return d,d[:10]
|
||||
|
||||
def save2file(data,isR=1):
|
||||
# pass
|
||||
t= datetime.datetime.now()
|
||||
l=f"{'[R]' if isR else '[T]'} - {str(t)} --- {data.hex(' ')}\n"
|
||||
f=open('./rxlog.txt','a')
|
||||
t,d = gettm()
|
||||
|
||||
p = '/'.join(['trlog',d])
|
||||
if not os.path.exists(p):
|
||||
os.makedirs(p)
|
||||
|
||||
l=f"{'[R]' if isR else '[T]'} - {t} --- {data.hex(' ')}\n"
|
||||
f=open('/'.join(['trlog',d,'trlog.txt']),'a')
|
||||
f.write(l)
|
||||
f.close
|
80
server.py
80
server.py
@ -12,6 +12,7 @@ import socket
|
||||
from threading import Timer
|
||||
import s2f
|
||||
from loguru import logger
|
||||
import os
|
||||
|
||||
SERVER = "http://www.pushplus.plus/send"
|
||||
TOKEN = "ff328cba923a4225bc4acd0086a9014c"
|
||||
@ -284,6 +285,7 @@ def tt_hh(addr, data):
|
||||
msg[-1] = crc
|
||||
add_timestamp()
|
||||
addr.send(msg)
|
||||
s2f.save2file(msg,0)
|
||||
logger.debug("服务器回复心跳包。")
|
||||
|
||||
# 从缓存中匹配到数据才重发
|
||||
@ -298,6 +300,7 @@ def tt_hh(addr, data):
|
||||
add_timestamp()
|
||||
# 按缓存先后顺序发送
|
||||
addr.send(msgs[i])
|
||||
s2f.save2file(msg)
|
||||
time.sleep(1)
|
||||
# 成功则清空已发送成功的缓存数据
|
||||
cache[ccid].pop(0)
|
||||
@ -392,16 +395,61 @@ def tt_trans(addr, data):
|
||||
s2wx("","",f"发送失败,终端{taddr}可能已掉线。 \n数据已缓存,待目标终端上线后重发。")
|
||||
return 0
|
||||
|
||||
def isInlst(tar,lst):
|
||||
"""
|
||||
在列表中查找目标子列表
|
||||
|
||||
def data_split(data):
|
||||
返回匹配的列表
|
||||
"""
|
||||
index = []
|
||||
for x in range(len(lst)):
|
||||
if lst[x:x+len(tar)] == tar :
|
||||
index.append(x)
|
||||
|
||||
rst = []
|
||||
for x in range(len(index)):
|
||||
if x<len(index)-1:
|
||||
rst.append(lst[index[x]:index[x+1]])
|
||||
if x == len(index)-1:
|
||||
rst.append(lst[index[-1]:])
|
||||
|
||||
return rst
|
||||
|
||||
# 便于拼合数据的缓存
|
||||
recCache = {}
|
||||
def data_split(addr):
|
||||
# 按帧头分割长数据
|
||||
|
||||
h = b"$TTMS" # b'\x24\x54\x54\x4d\x53'
|
||||
dlist = data.split(h)[1:]
|
||||
dlist = isInlst([0x24,0x54,0x54,0x4d,0x53],recCache[addr])
|
||||
# print(dlist)
|
||||
|
||||
#校验每帧数据
|
||||
for i in range(len(dlist)):
|
||||
dlist[i] = h + dlist[i]
|
||||
flen = (dlist[i][5]<<8)+dlist[i][6]
|
||||
if flen != len(dlist[i]) :
|
||||
logger.error("第{}数据帧不完整{}--{}",i+1,flen,len(dlist[i]))
|
||||
|
||||
# 清空缓存
|
||||
recCache[addr].clear()
|
||||
|
||||
# 因为无帧尾,需添加帧是否完整的校验
|
||||
if len(dlist) :
|
||||
# 目前情形多为两包粘包,故最后一包肯定不完整
|
||||
logger.debug(bytes(dlist[-1]).hex(" "))
|
||||
flen = (dlist[-1][5]<<8)+dlist[-1][6]
|
||||
if flen <= len(dlist[-1]) :
|
||||
recCache[addr].extend(dlist[-1][flen:])
|
||||
for x in range(len(dlist[-1])-flen):
|
||||
dlist[-1].pop()
|
||||
else:
|
||||
recCache[addr].extend(dlist[-1][:])
|
||||
dlist.pop()
|
||||
|
||||
logger.debug(f"解析为 {len(dlist)} 包数据。")
|
||||
return dlist
|
||||
if len(recCache[addr]):
|
||||
logger.warning("缓存数据为{}",bytes(recCache[addr]).hex(" "))
|
||||
return dlist[:]
|
||||
|
||||
|
||||
def tt_decode(addr, data):
|
||||
@ -409,16 +457,19 @@ def tt_decode(addr, data):
|
||||
|
||||
数据正常返回0
|
||||
数据异常返回-1"""
|
||||
# 追加至缓存末尾
|
||||
if addr not in recCache:
|
||||
recCache[addr]=[]
|
||||
recCache[addr].extend(data)
|
||||
|
||||
flag = -1
|
||||
|
||||
# 依据帧头和最小长度初判数据有效性
|
||||
# 只要接收数据给的缓存足够大,即使是多条数据组成的长数据也默认帧格式是完整的,前几个字节必须是帧头
|
||||
if data[:5] != b"$TTMS" or len(data) < 12:
|
||||
return -1
|
||||
|
||||
dlist = data_split(data)
|
||||
dlist = data_split(addr)
|
||||
if len(dlist)==0:
|
||||
logger.warning("not enough data")
|
||||
|
||||
for data in dlist:
|
||||
data = bytes(data)
|
||||
cmd = data[11 + offs]
|
||||
# 回退到if-elif而不用match,因服务器默认版本3.8
|
||||
if cmd == 0x01:
|
||||
@ -471,7 +522,7 @@ class MyServer(socketserver.BaseRequestHandler):
|
||||
# print(self.recDataBuff[:])
|
||||
if tt_decode(conn, data):
|
||||
self.err= "切断。"
|
||||
logger.warning(f"{self.client_address}疑是非法连接")
|
||||
# logger.warning(f"{self.client_address}疑是非法连接")
|
||||
# conn.close()
|
||||
# break
|
||||
|
||||
@ -484,18 +535,23 @@ CCID:{pp[self.request].decode()}
|
||||
update_pairs(self.request,get_ccid(self.request),1)
|
||||
logger.info(t)
|
||||
s2wx("","天通终端下线了",t)
|
||||
recCache[self.request].clear()
|
||||
# print("终端",self.client_address,"下线了。",self.err)
|
||||
# update_pairs(self.request,get_ccid(self.request),1)
|
||||
else:
|
||||
t= f"TCP客户端{str(self.client_address)}已断开连接。{self.err}"
|
||||
logger.info(t)
|
||||
recCache[self.request].clear()
|
||||
# s2wx("天通消息","TCP客户端断开",t)
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logger.add("log/log_{time}.log")
|
||||
_,t = s2f.gettm()
|
||||
if not os.path.exists('debug'):
|
||||
os.mkdir('debug')
|
||||
logger.add('/'.join(['debug',t,'/log_{time}.log']))
|
||||
# load_cache
|
||||
if load_cache():
|
||||
cache = load_cache()
|
||||
|
Loading…
Reference in New Issue
Block a user