tt-server/server.py

246 lines
6.2 KiB
Python
Raw Normal View History

2023-03-11 08:04:43 +00:00
#!/usr/bin/env python3
2023-03-11 13:25:34 +00:00
# coding:utf-8
2023-03-10 01:34:44 +00:00
import socketserver
import subprocess
2023-03-11 03:51:56 +00:00
import pickle
2023-03-11 13:25:34 +00:00
offs = 0 # 偏移量
p = {}
# 保存终端信息ccid与地址对
# p={ccid=addr,}
# 缓存未能正常发送消息的客户端
pp = {}
2023-03-11 03:51:56 +00:00
# cache={}
2023-03-11 13:25:34 +00:00
from collections import defaultdict
cache = defaultdict(list)
2023-03-11 03:51:56 +00:00
2023-03-10 01:34:44 +00:00
2023-03-11 03:51:56 +00:00
def save_cache(dic):
2023-03-11 13:25:34 +00:00
"""缓存未发送成功的消息
仅在未发送成功时缓存"""
2023-03-11 03:51:56 +00:00
# f=open("./cache.txt",mode='w+')
# f.write(str(dic))
# f.close
2023-03-11 13:25:34 +00:00
pickle.dump(dic, open("./cache.txt", "wb"))
2023-03-11 03:51:56 +00:00
def load_cache():
2023-03-11 13:25:34 +00:00
"""程序启动时加载缓存"""
2023-03-11 03:51:56 +00:00
try:
# f=open("./cache.txt")
# c = eval(f.read())
# f.close()
2023-03-11 13:25:34 +00:00
c = pickle.load(open("./cache.txt", "rb"))
2023-03-11 03:51:56 +00:00
return c
except:
return None
2023-03-10 09:58:19 +00:00
2023-03-11 13:25:34 +00:00
def update_pairs(addr, ccid, clear=0):
"""更新终端库
2023-03-10 09:58:19 +00:00
addr:绑定tcp地址
ccid:终端号
2023-03-11 13:25:34 +00:00
clear:清空掉线终端"""
2023-03-10 09:58:19 +00:00
if ccid and not clear:
2023-03-11 13:25:34 +00:00
# 添加或更新客户端
2023-03-10 09:58:19 +00:00
p[ccid] = addr
pp[addr] = ccid
else:
del p[ccid]
del pp[addr]
# 打印在线终端
for i in p:
2023-03-11 13:25:34 +00:00
print(i, "<--->", p[i])
2023-03-10 09:58:19 +00:00
# for i in pp:
# print(i,"-",pp[i])
2023-03-11 13:25:34 +00:00
2023-03-10 09:58:19 +00:00
def get_addr(ccid):
2023-03-11 13:25:34 +00:00
"""获取ccid对应的地址"""
2023-03-10 09:58:19 +00:00
2023-03-10 01:34:44 +00:00
try:
2023-03-10 09:58:19 +00:00
# 如果有直接返回
return p[ccid]
2023-03-10 01:34:44 +00:00
except Exception:
2023-03-11 13:25:34 +00:00
# 没有就返回None
2023-03-10 09:58:19 +00:00
return None
2023-03-11 13:25:34 +00:00
2023-03-10 09:58:19 +00:00
def get_ccid(addr):
try:
# 如果有直接返回
return pp[addr]
except Exception:
2023-03-11 13:25:34 +00:00
# 没有就返回None
2023-03-10 09:58:19 +00:00
return None
2023-03-11 13:25:34 +00:00
"""
2023-03-10 09:58:19 +00:00
+--------+--------------+----------+----------+--------------+------------------------+--------+
| ff | 长度 | 地址信息 | 信令 | 内容 | 校验位 |
+--------+--------------+----------+----------+--------------+------------------------+--------+
| | 2Bytes | 4Bytes | 心跳 | 0x01 | 时间戳32bit单位ms | 2Bytes |
+ $TTMS +--------------+----------+----------+--------------+------------------------+--------+
| | 2Bytes | 4Bytes | 传输数据 | 0xaa | 载荷 | 2Bytes |
+--------+--------------+----------+----------+--------------+------------------------+--------+
2023-03-11 13:25:34 +00:00
"""
2023-03-10 09:58:19 +00:00
2023-03-11 13:25:34 +00:00
def send(fd, data):
2023-03-11 03:51:56 +00:00
try:
fd.send(data)
return 0
except:
return -1
2023-03-11 13:25:34 +00:00
def tt_hh(addr, data):
"""处理心跳"""
2023-03-10 09:58:19 +00:00
# 更新pairs
2023-03-11 13:25:34 +00:00
ccid = data[7 + offs : 11 + offs]
update_pairs(addr, ccid)
2023-03-10 09:58:19 +00:00
# 组帧再返回
2023-03-11 03:51:56 +00:00
# 从缓存中匹配数据并重发
2023-03-11 08:04:43 +00:00
# 仅有新终端上线时才重发
2023-03-11 03:51:56 +00:00
if ccid in cache:
2023-03-13 02:11:29 +00:00
msgs = cache[ccid][:] # 浅拷贝
sccid=msgs[0][7 + offs : 11 + offs].decode()
print("",len(msgs),"包缓存数据待发源ccid",sccid)
2023-03-11 08:04:43 +00:00
for i in range(len(msgs)):
try:
# 按缓存先后顺序发送
2023-03-11 08:04:43 +00:00
addr.send(msgs[i])
2023-03-13 02:11:29 +00:00
cache[ccid].pop(0)
print("",i+1,"包缓存数据发送完成。")
2023-03-11 08:04:43 +00:00
except:
# 异常时退出循环不用再尝试发送后续msgs
# 退出前清空已发送成功的缓存数据
# del方法会改变list大小
2023-03-13 02:11:29 +00:00
# while i:
# cache[ccid].pop(0)
# i = i - 1
print("",i+1,"包发送失败,停止尝试")
2023-03-11 08:04:43 +00:00
break
2023-03-13 02:11:29 +00:00
if cache[ccid] == []:
print("缓存数据发送完成源ccid",sccid)
del cache[ccid]
save_cache(cache)
2023-03-10 09:58:19 +00:00
return 0
2023-03-11 13:25:34 +00:00
def tt_trans(addr, data):
"""发送数据到指定ccid"""
tccid = data[7 + offs : 11 + offs]
2023-03-10 09:58:19 +00:00
taddr = get_addr(tccid)
# 组帧
msg = bytearray(data[:])
2023-03-11 13:25:34 +00:00
msg[:5] = b"$TTMX"
msg[7 + offs : 11 + offs] = get_ccid(addr)
2023-03-10 09:58:19 +00:00
# 计算校验和
crc = 0
2023-03-11 03:51:56 +00:00
for i in msg[:-1]:
2023-03-10 09:58:19 +00:00
crc = crc ^ i
2023-03-11 13:25:34 +00:00
msg[-1] = crc
2023-03-10 09:58:19 +00:00
2023-03-11 03:51:56 +00:00
if not taddr:
# 未找到匹配的在线终端
2023-03-11 08:04:43 +00:00
2023-03-11 13:25:34 +00:00
# 更新缓存
# {tccid1=[msg1,msg2,...],...}
2023-03-11 03:51:56 +00:00
cache[tccid].append(msg)
save_cache(cache)
2023-03-11 08:04:43 +00:00
print("终端未在线或ccid错。")
2023-03-11 03:51:56 +00:00
return 0
2023-03-11 13:25:34 +00:00
print("匹配终端:", tccid, "<--->", taddr)
2023-03-11 03:51:56 +00:00
2023-03-10 09:58:19 +00:00
# 发送
try:
taddr.send(msg)
except Exception:
2023-03-11 08:04:43 +00:00
# 发送失败
taddr.close()
2023-03-11 03:51:56 +00:00
# 更新pairs清空对应终端
2023-03-11 13:25:34 +00:00
update_pairs(taddr, tccid, 1)
# 更新缓存
# {tccid1=[msg1,msg2,...],...}
2023-03-11 03:51:56 +00:00
cache[tccid].append(msg)
save_cache(cache)
2023-03-11 13:25:34 +00:00
print("发送失败,终端可能已掉线。", taddr)
2023-03-10 09:58:19 +00:00
return 0
2023-03-11 13:25:34 +00:00
def err_handle(flag, addr):
"""错误处理
2023-03-10 09:58:19 +00:00
默认打印错误信息
1-非法连接关闭连接
2023-03-11 13:25:34 +00:00
"""
2023-03-10 09:58:19 +00:00
return 0
2023-03-11 13:25:34 +00:00
def tt_decode(addr, data):
"""处理收到的数据
数据正常返回0
2023-03-11 13:25:34 +00:00
数据异常返回-1"""
2023-03-10 09:58:19 +00:00
# 依据帧头和长度初判数据有效性
2023-03-11 13:25:34 +00:00
if data[:5] != b"$TTMS" or len(data) < 12 or len(data) > 200:
2023-03-10 09:58:19 +00:00
return -1
2023-03-11 13:25:34 +00:00
cmd = data[11 + offs]
2023-03-12 03:43:08 +00:00
# 回退到if-elif服务器默认版本3.8
if cmd == 0x01:
# 处理心跳
tt_hh(addr, data)
elif cmd == 0xAA:
# 处理数据
tt_trans(addr, data)
else:
return -1
2023-03-10 09:58:19 +00:00
2023-03-10 01:34:44 +00:00
class MyServer(socketserver.BaseRequestHandler):
2023-03-11 13:25:34 +00:00
def handle(self): # 回调
2023-03-13 02:11:29 +00:00
print("终端", self.client_address,"已上线,等待上报心跳。")
2023-03-10 01:34:44 +00:00
conn = self.request
2023-03-10 09:58:19 +00:00
# print(type(conn),conn.fd)
2023-03-11 13:25:34 +00:00
2023-03-10 01:34:44 +00:00
while True:
data = conn.recv(200)
2023-03-10 09:58:19 +00:00
# update_pairs(conn,data[7+offs:11+offs])
2023-03-10 01:34:44 +00:00
if not data:
break
2023-03-11 13:25:34 +00:00
print(
"接收到新数据", self.client_address, ",长度", len(data), "\r\n", data.hex(" ")
)
# ack_msg = "got from "+ str(ip) + " to " + str(self.client_address) + data
2023-03-10 09:58:19 +00:00
# conn.send(data[:6])
2023-03-11 13:25:34 +00:00
if tt_decode(conn, data):
2023-03-10 09:58:19 +00:00
conn.close()
2023-03-11 13:25:34 +00:00
print(self.client_address, "疑是非法连接,已切断。")
2023-03-10 09:58:19 +00:00
break
2023-03-10 01:34:44 +00:00
2023-03-11 13:25:34 +00:00
if __name__ == "__main__":
2023-03-11 03:51:56 +00:00
# load_cache
cache = load_cache()
2023-03-12 05:32:32 +00:00
server = socketserver.ThreadingTCPServer(("", 7222), MyServer)
2023-03-10 01:34:44 +00:00
ip, port = server.server_address
2023-03-11 13:25:34 +00:00
print("服务端已建立:", ip, port)
2023-03-10 01:34:44 +00:00
server.serve_forever()