更新心跳回复函数

This commit is contained in:
murmur 2023-03-13 17:20:43 +08:00
parent ac787a2fde
commit a6ff83f9b7

View File

@ -4,6 +4,8 @@ import socketserver
import subprocess import subprocess
import pickle import pickle
from collections import defaultdict from collections import defaultdict
import datetime
import time
cache = defaultdict(list) cache = defaultdict(list)
offs = 0 # 偏移量 offs = 0 # 偏移量
@ -13,7 +15,8 @@ p = {}
# 地址 ==ccid # 地址 ==ccid
pp = {} pp = {}
def add_timestamp():
print(datetime.datetime.now(),":")
def save_cache(dic): def save_cache(dic):
@ -53,6 +56,7 @@ def update_pairs(addr, ccid, clear=0):
del p[ccid] del p[ccid]
del pp[addr] del pp[addr]
print("ccid已更新注册。")
# 打印在线终端 # 打印在线终端
for i in p: for i in p:
print(i, "<--->", p[i]) print(i, "<--->", p[i])
@ -85,9 +89,9 @@ def get_ccid(addr):
+--------+--------------+----------+----------+--------------+------------------------+--------+ +--------+--------------+----------+----------+--------------+------------------------+--------+
| ff | 长度 | 地址信息 | 信令 | 内容 | 校验位 | | ff | 长度 | 地址信息 | 信令 | 内容 | 校验位 |
+--------+--------------+----------+----------+--------------+------------------------+--------+ +--------+--------------+----------+----------+--------------+------------------------+--------+
| | 2Bytes | 4Bytes | 心跳 | 0x01 | 时间戳32bit单位ms | 2Bytes | | | 2Bytes | 4Bytes | 心跳 | 0x01 | 时间戳32bit单位ms | 1Bytes |
+ $TTMS +--------------+----------+----------+--------------+------------------------+--------+ + $TTMS +--------------+----------+----------+--------------+------------------------+--------+
| | 2Bytes | 4Bytes | 传输数据 | 0xaa | 载荷 | 2Bytes | | | 2Bytes | 4Bytes | 传输数据 | 0xaa | 载荷 | 1Bytes |
+--------+--------------+----------+----------+--------------+------------------------+--------+ +--------+--------------+----------+----------+--------------+------------------------+--------+
""" """
@ -98,8 +102,18 @@ def tt_hh(addr, data):
ccid = data[7 + offs : 11 + offs] ccid = data[7 + offs : 11 + offs]
update_pairs(addr, ccid) update_pairs(addr, ccid)
# 组帧再返回 # 组帧再返回
msg=data[:]
msg[12:-1]=val = '%04x'%int(time.time())
# 计算校验和
crc = 0
for i in msg[:-1]:
crc = crc ^ i
msg[-1] = crc
add_timestamp()
addr.send(msg)
print("回复心跳。")
# 从缓存中匹配数据并重发 # 从缓存中匹配到数据才重发
# 仅有新终端上线时才重发 # 仅有新终端上线时才重发
if ccid in cache: if ccid in cache:
msgs = cache[ccid][:] # 浅拷贝 msgs = cache[ccid][:] # 浅拷贝
@ -107,6 +121,7 @@ def tt_hh(addr, data):
print("",len(msgs),"包缓存数据待发。",sccid,"-->",ccid) print("",len(msgs),"包缓存数据待发。",sccid,"-->",ccid)
for i in range(len(msgs)): for i in range(len(msgs)):
try: try:
add_timestamp()
# 按缓存先后顺序发送 # 按缓存先后顺序发送
addr.send(msgs[i]) addr.send(msgs[i])
# 成功则清空已发送成功的缓存数据 # 成功则清空已发送成功的缓存数据
@ -121,8 +136,13 @@ def tt_hh(addr, data):
del cache[ccid] del cache[ccid]
save_cache(cache) save_cache(cache)
return 0 return 0
def check_valid(data):
# 检查数据有效性
return bytearray(data[:])
def tt_trans(addr, data): def tt_trans(addr, data):
"""发送数据到指定ccid""" """发送数据到指定ccid"""
@ -133,7 +153,7 @@ def tt_trans(addr, data):
tccid = data[7 + offs : 11 + offs] tccid = data[7 + offs : 11 + offs]
taddr = get_addr(tccid) taddr = get_addr(tccid)
# 组帧 # 组帧
msg = bytearray(data[:]) msg = check_valid(data)
msg[:5] = b"$TTMX" msg[:5] = b"$TTMX"
msg[7 + offs : 11 + offs] = sccid msg[7 + offs : 11 + offs] = sccid
@ -157,6 +177,7 @@ def tt_trans(addr, data):
# 发送 # 发送
try: try:
add_timestamp()
taddr.send(msg) taddr.send(msg)
print("数据发送成功。",sccid,"-->",tccid) print("数据发送成功。",sccid,"-->",tccid)
except Exception: except Exception:
@ -180,7 +201,7 @@ def data_split(data):
dlist= data.split(h)[1:] dlist= data.split(h)[1:]
for i in range(len(dlist)): for i in range(len(dlist)):
dlist[i] = h + dlist[i] dlist[i] = h + dlist[i]
print("收到",len(dlist),"连续数据。") print("解析为",len(dlist),"数据。")
return dlist return dlist
@ -193,16 +214,10 @@ def tt_decode(addr, data):
flag = -1 flag = -1
# 依据帧头和最小长度初判数据有效性 # 依据帧头和最小长度初判数据有效性
# 即使是多条数据组成的长数据也默认帧格式是完整的,前几个字节必须是帧头 # 只要接收数据给的缓存足够大,即使是多条数据组成的长数据也默认帧格式是完整的,前几个字节必须是帧头
if data[:5] != b"$TTMS" or len(data) < 12 : if data[:5] != b"$TTMS" or len(data) < 12 :
return -1 return -1
if len(data)<200:
# 可能大多数场景是小包
# 小包数据不处理分包,速度快
dlist=[data]
else:
# 长数据需要分包
dlist = data_split(data) dlist = data_split(data)
for data in dlist: for data in dlist:
cmd = data[11 + offs] cmd = data[11 + offs]
@ -224,7 +239,9 @@ def tt_decode(addr, data):
class MyServer(socketserver.BaseRequestHandler): class MyServer(socketserver.BaseRequestHandler):
def handle(self): # 回调 def handle(self): # 回调
add_timestamp()
print("终端", self.client_address,"已上线等待上报心跳注册ccid。") print("终端", self.client_address,"已上线等待上报心跳注册ccid。")
conn = self.request conn = self.request
# print(type(conn),conn.fd) # print(type(conn),conn.fd)
@ -233,6 +250,7 @@ class MyServer(socketserver.BaseRequestHandler):
data = conn.recv(1024000) data = conn.recv(1024000)
if not data: if not data:
break break
add_timestamp()
print( print(
"接收到新数据", self.client_address, ",长度", len(data), "\r\n", data.hex(" ") "接收到新数据", self.client_address, ",长度", len(data), "\r\n", data.hex(" ")
) )
@ -247,8 +265,8 @@ if __name__ == "__main__":
cache = load_cache() cache = load_cache()
server = socketserver.ThreadingTCPServer(("", 7222), MyServer) server = socketserver.ThreadingTCPServer(("", 7222), MyServer)
ip, port = server.server_address add_timestamp()
print("服务端初始化成功。", ip, port) print("服务端初始化成功。",server.server_address)
server.serve_forever() server.serve_forever()