tt-server/server.py
2023-03-18 14:53:09 +08:00

310 lines
8.1 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# coding:utf-8
import socketserver
import subprocess
import pickle
from collections import defaultdict
import datetime
import time
import requests
# import json
SERVER = "http://www.pushplus.plus/send"
TOKEN = "ff328cba923a4225bc4acd0086a9014c"
# SERVER = "https://msgpusher.com/push/test111"
# TOKEN = "tt"
def s2wx(title,content):
"""发送至微信.
title:标题
content:内容"""
ppmsg = {"token":TOKEN,
"template":"json",
"channel":"wechat",
"description": content,
}
ppmsg["title"] = title
ppmsg["content"] = content
# ppmsg["description"]= content
requests.post(SERVER, json=ppmsg)
cache = defaultdict(list)
offs = 0 # 偏移量
# 保存终端信息ccid=地址
p = {}
# 地址 ==ccid
pp = {}
def add_timestamp():
print(datetime.datetime.now(), ":")
def save_cache(dic):
"""缓存未发送成功的消息
仅在目标终端未上线或发送失败时缓存
发送失败的一般原因是终端掉线但未及时更新在线列表"""
# f=open("./cache.txt",mode='w+')
# f.write(str(dic))
# f.close
pickle.dump(dic, open("./cache.txt", "wb"))
def load_cache():
"""程序启动时加载缓存"""
try:
# f=open("./cache.txt")
# c = eval(f.read())
# f.close()
c = pickle.load(open("./cache.txt", "rb"))
return c
except:
return None
def update_pairs(addr, ccid, clear=0):
"""更新终端库
addr:绑定tcp地址
ccid:终端号
clear:清空掉线终端"""
if ccid and not clear:
# 添加或更新客户端
if ccid not in p:
t = "终端信息:\nCCID:" + ccid.decode() + "\n地址:" + str(addr.getpeername())
s2wx("天通终端上线",t)
p[ccid] = addr
pp[addr] = ccid
else:
del p[ccid]
del pp[addr]
print("ccid已更新注册。")
# 打印在线终端
for i in p:
print(i, "<--->", p[i])
# for i in pp:
# print(i,"-",pp[i])
def get_addr(ccid):
"""获取ccid对应的地址"""
try:
# 如果有直接返回
return p[ccid]
except Exception:
# 没有就返回None
return None
def get_ccid(addr):
try:
# 如果有直接返回
return pp[addr]
except Exception:
# 没有就返回None
print("终端未上报心跳注册ccid。")
return None
"""
+--------+--------------+----------+----------+--------------+------------------------+--------+
| ff | 长度 | 地址信息 | 信令 | 内容 | 校验位 |
+--------+--------------+----------+----------+--------------+------------------------+--------+
| | 2Bytes | 4Bytes | 心跳 | 0x01 | 时间戳32bit单位ms | 1Bytes |
+ $TTMS +--------------+----------+----------+--------------+------------------------+--------+
| | 2Bytes | 4Bytes | 传输数据 | 0xaa | 载荷 | 1Bytes |
+--------+--------------+----------+----------+--------------+------------------------+--------+
"""
def tt_hh(addr, data):
"""处理心跳"""
# 更新pairs
ccid = data[7 + offs : 11 + offs]
update_pairs(addr, ccid)
# 组帧再返回
msg = bytearray(data[:])
val = "%04x" % int(time.time())
tmp=[]
for i in range(0,len(val),2):
tmp.append(int(val[i:i+2],16))
msg[12:-1] = tmp
# 计算校验和
crc = 0
for i in msg[:-1]:
crc = crc ^ i
msg[-1] = crc
add_timestamp()
addr.send(msg)
print("回复心跳。")
# 从缓存中匹配到数据才重发
# 仅有新终端上线时才重发
if cache and ccid in cache:
msgs = cache[ccid][:] # 浅拷贝
sccid = msgs[0][7 + offs : 11 + offs]
print("", len(msgs), "包缓存数据待发。", sccid, "-->", ccid)
for i in range(len(msgs)):
try:
add_timestamp()
# 按缓存先后顺序发送
addr.send(msgs[i])
# 成功则清空已发送成功的缓存数据
cache[ccid].pop(0)
print("", i + 1, "包缓存数据发送完成。")
except:
# 异常时退出循环不用再尝试发送后续msgs
print("", i + 1, "包发送失败,停止尝试")
break
if cache[ccid] == []:
print("缓存数据全部发送完成。", sccid, "-->", ccid)
del cache[ccid]
save_cache(cache)
return 0
def check_valid(data):
# 检查数据有效性
return bytearray(data[:])
def tt_trans(addr, data):
"""发送数据到指定ccid"""
sccid = get_ccid(addr)
if not sccid:
return -1
tccid = data[7 + offs : 11 + offs]
taddr = get_addr(tccid)
# 组帧
msg = check_valid(data)
msg[:5] = b"$TTMX"
msg[7 + offs : 11 + offs] = sccid
# 计算校验和
crc = 0
for i in msg[:-1]:
crc = crc ^ i
msg[-1] = crc
if not taddr:
# 未找到匹配的在线终端
# 更新缓存
# {tccid1=[msg1,msg2,...],...}
cache[tccid].append(msg)
save_cache(cache)
print("终端未在线或ccid", tccid, "错误。数据已缓存,待目标终端上线后重发。")
return 0
print("匹配在线终端为:", tccid, "<--->", taddr)
# 发送
try:
add_timestamp()
taddr.send(msg)
print("数据发送成功。", sccid, "-->", tccid)
except Exception:
# 发送失败
taddr.close()
# 更新pairs清空对应终端
update_pairs(taddr, tccid, 1)
# 更新缓存
# {tccid1=[msg1,msg2,...],...}
cache[tccid].append(msg)
save_cache(cache)
print("发送失败,终端可能已掉线。数据已缓存,待目标终端上线后重发。", taddr)
return 0
def data_split(data):
# 按帧头分割长数据
h = b"$TTMS" # b'\x24\x54\x54\x4d\x53'
dlist = data.split(h)[1:]
for i in range(len(dlist)):
dlist[i] = h + dlist[i]
print("解析为", len(dlist), "包数据。")
return dlist
def tt_decode(addr, data):
"""处理收到的数据
数据正常返回0
数据异常返回-1"""
flag = -1
# 依据帧头和最小长度初判数据有效性
# 只要接收数据给的缓存足够大,即使是多条数据组成的长数据也默认帧格式是完整的,前几个字节必须是帧头
if data[:5] != b"$TTMS" or len(data) < 12:
return -1
dlist = data_split(data)
for data in dlist:
cmd = data[11 + offs]
# 回退到if-elif而不用match因服务器默认版本3.8
if cmd == 0x01:
# 处理心跳
flag = tt_hh(addr, data)
elif cmd == 0xAA:
# 处理数据
if tt_trans(addr, data):
flag = -1
break
else:
flag = 0
else:
break
return flag
class MyServer(socketserver.BaseRequestHandler):
def handle(self): # 回调
add_timestamp()
print("终端", self.client_address, "已上线等待上报心跳注册ccid。")
conn = self.request
# print(type(conn),conn.fd)
while True:
data = conn.recv(1024000)
if not data:
break
add_timestamp()
print(
"接收到新数据", self.client_address, ",长度", len(data), "\r\n", data.hex(" ")
)
if tt_decode(conn, data):
conn.close()
print(self.client_address, "疑是非法连接,已切断。")
break
if __name__ == "__main__":
# load_cache
cache = load_cache()
server = socketserver.ThreadingTCPServer(("", 7222), MyServer)
add_timestamp()
print("服务端初始化成功。", server.server_address)
server.serve_forever()
"""
以下几种情况服务器会主动断开终端:
单次数据不以帧头开始
单次数据长度不足11
未上报心跳注册ccid而直接发数据
"""