tt-server/server.py
2024-10-24 16:10:28 +08:00

612 lines
18 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# coding:utf-8
import socketserver
import subprocess
import pickle
from collections import defaultdict
import datetime
import time
import requests
import socket
# import json
from threading import Timer
from loguru import logger
import os
def gettm():
"""
返回时戳和日期
"""
t= datetime.datetime.now()
d = str(t)
return d,d[:10]
def save2file(data,isR=1):
"""
存储数据至文件
"""
t,d = gettm()
p = '/'.join(['./trlog',d])
if not os.path.exists(p):
os.makedirs(p)
l=f"{'[R]' if isR else '[T]'} - {t} --- {data}\n"
f=open('/'.join(['./trlog',d,'trlog.txt']),'a')
f.write(l)
f.close
SERVER = "http://www.pushplus.plus/send"
TOKEN = "ff328cba923a4225bc4acd0086a9014c"
# SERVER = "http://push.nmhd.eu.org:12722/push/murmur"
# TOKEN = "tt"
def pp2wx(title,content,description):
"""通过pushplus发送至微信有频次限制.
title:标题
content:内容"""
ppmsg = {"token":TOKEN,
"template":"json",
"channel":"wechat",
"description": description,
}
ppmsg["title"] = title
ppmsg["content"] = content
# ppmsg["description"]= content
requests.post(SERVER, json=ppmsg)
msgcache=[]
pushisbusy=0
def s2wx(title="",description="", content=""):
"""通过自建msgpusher发送至微信
当前title值无效
"""
# logger.debug(content)
# return
# 受API限制频繁发送会失败
# 拟合并发送满3条已发送超时时间10s
if len(content):
t=f"""
**时戳:**
{datetime.datetime.now()}
**信息:**
{content}"""
msgcache.append(t)
global pushisbusy
if pushisbusy:
Timer(10,s2wx).start()
return
pushisbusy=1
le = len(msgcache)
def send():
c= " \n".join(msgcache[:3])
try:
res = requests.post("http://8.137.110.19:4014/push/murmur", json={
"title": title,
"description": description,
"content": c,
"channel":"all-TT",
"token": "gjvL2gMZqj326isgafmz"})
logger.debug("msgpusher {}",res)
except:
logger.error("msgpusher error");
try:
res = requests.post("https://push.020824.xyz/push/murmur", json={
"title": title,
"description": description,
"content": c,
"channel":"all-TT",
"token": "gjvL2gMZqj326isgafmz"})
logger.debug("msgpusher {}",res)
except:
logger.error("msgpusher error");
# try:
# try:
# res = res.json()
# except:
# s="http://push.nmhd.eu.org:12722/push/murmur"
# # s="http://47.108.213.155:11722/push/murmur"
# res = requests.post(s, json={
# "title": title,
# "description": description,
# "content": c,
# "token": "tt"})
# print("nmhd",s)
# else:
# if res["success"]:
# return None
# else:
# return res["message"]
if le >2:
send()
msgcache.pop(0)
msgcache.pop(0)
msgcache.pop(0)
# Timer(10,s2wx).start()
elif (not len(content)) and le:
# 等效超时
send()
for i in range(le):
msgcache.pop(0)
# Timer(10,s2wx).start()
pushisbusy=0
Timer(10,s2wx).start()
# GET 方式
# res = requests.get(f"{SERVER}/push/{USERNAME}?title={title}"
# f"&description={description}&content={content}&token={TOKEN}")
# POST 方式
# res = requests.post("http://push.nmhd.eu.org:12722/push/murmur", json={
# "title": title,
# "description": description,
# "content": f"""
# *时戳:*
# _{datetime.datetime.now()}_
# *信息:*
# {content}""",
# "token": "tt"
# })
# try:
# res = res.json()
# except:
# return
# else:
# if res["success"]:
# return None
# else:
# return res["message"]
Timer(10,s2wx).start()
cache = defaultdict(list)
offs = 0 # 偏移量
# 保存终端信息ccid=地址
p = {}
# 地址 ==ccid
pp = {}
def add_timestamp():
logger.info("-------------------------↓ {} ↓ -------------------------",datetime.datetime.now() )
def save_cache(dic):
"""缓存未发送成功的消息
仅在目标终端未上线或发送失败时缓存
发送失败的一般原因是终端掉线但未及时更新在线列表"""
# f=open("./cache.txt",mode='w+')
# f.write(str(dic))
# f.close
pickle.dump(dic, open("./cache.txt", "wb"))
def load_cache():
"""程序启动时加载缓存"""
try:
# f=open("./cache.txt")
# c = eval(f.read())
# f.close()
c = pickle.load(open("./cache.txt", "rb"))
return c
except:
return None
def update_pairs(addr, ccid, clear=0,cnt=[0]):
"""更新终端库
addr:绑定tcp地址
ccid:终端号
clear:清空掉线终端
cnt:心跳包数量"""
if not clear:
# 添加或更新客户端
pp[addr] = ccid
p.clear
if ccid not in p:
# 新终端上线,推送在线消息
cnt[0] = 0
t = f"""*信息:*
天通终端上线。
CCID:{ccid.decode()}
地址:{str(addr.getpeername())}"""
logger.info(t)
s2wx("","天通终端上线",t)
# p[ccid] = addr
else:
del p[ccid]
del pp[addr]
# 打印在线终端
# for i in p:
# print(i, "<--->", p[i])
od= []
for i in pp:
p[pp[i]] = i
# od.append(f"{pp[i].decode()}<--->{i.getpeername()}")
# 可能客户端未断开导致多个连接指向相同的ccid故需更新后再遍历打印
for i in p:
od.append(f"{i.decode()}<--->{p[i].getpeername()}")
# print('\r\n'.join(zd))
clents = ' \n'+'\r\n'.join(od) if len(od) else ""
t=f"在线终端已更新,数量{len(od)}{clents}"
logger.info(t)
# cnt[0]是一个列表,它的默认值只会在函数定义时初始化一次,因此它的值在多次调用时保持不变
# cnt[0]为10的倍数
if cnt[0]%10 == 0:
s2wx("","",t)
cnt[0] += 1
def get_addr(ccid):
"""获取ccid对应的地址"""
try:
# 如果有直接返回
return p[ccid]
except Exception:
# 没有就返回None
return None
def get_ccid(addr):
try:
# 如果有直接返回
return pp[addr]
except Exception:
# 没有就返回None
logger.warning("终端未上报心跳注册ccid。")
return None
"""
+--------+--------------+----------+----------+--------------+------------------------+--------+
| ff | 长度 | 地址信息 | 信令 | 内容 | 校验位 |
+--------+--------------+----------+----------+--------------+------------------------+--------+
| | 2Bytes | 4Bytes | 心跳 | 0x01 | 时间戳32bit单位ms | 1Bytes |
+ $TTMS +--------------+----------+----------+--------------+------------------------+--------+
| | 2Bytes | 4Bytes | 传输数据 | 0xaa | 载荷 | 1Bytes |
+--------+--------------+----------+----------+--------------+------------------------+--------+
"""
# 回复ACK给终端使用命令0xAA载荷为四字节0xFFFFFFFF
def ack2client(addr,data):
msg = bytearray(data[:])
msg[11 + offs] = 0xAA
# 覆写32bit时戳为FF
msg[12 + offs:16 + offs] = int(0xFFFFFFFF).to_bytes(4,byteorder="big")
# 计算校验和
crc = 0
for i in msg[:-1]:
crc = crc ^ i
msg[-1] = crc
addr.send(msg)
# save2file(f"[SERVER --> {get_ccid(addr).decode() if get_ccid(addr) else 'None'}][{len(msg)}] {msg.hex(' ')}",0)
# logger.debug("服务器回复ACK。")
def tt_hh(addr, data):
"""处理心跳"""
# 更新pairs
ccid = data[7 + offs : 11 + offs]
update_pairs(addr, ccid)
# 组帧再返回
msg = bytearray(data[:])
msg[:5] = b"$TTMX"
val = "%04x" % int(time.time()) #取Unix时间戳以s为单位
tmp=[]
for i in range(0,len(val),2):
tmp.append(int(val[i:i+2],16))
msg[12:-1] = tmp
# 计算校验和
crc = 0
for i in msg[:-1]:
crc = crc ^ i
msg[-1] = crc
add_timestamp()
addr.send(msg)
save2file(f"[SERVER --> {get_ccid(addr).decode() if get_ccid(addr) else 'None'}][{len(msg)}] {msg.hex(' ')}",0)
logger.debug("服务器回复心跳包。")
ack2client(addr,msg)
# 从缓存中匹配到数据才重发
# 仅有新终端上线时才重发
if cache and ccid in cache:
msgs = cache[ccid][:] # 浅拷贝
sccid = msgs[0][7 + offs : 11 + offs]
print("", len(msgs), "包缓存数据待发。", sccid, "-->", ccid)
s2wx("","",f"{len(msgs)}包缓存数据待发。 \n{sccid} --> {ccid}")
for i in range(len(msgs)):
try:
add_timestamp()
# 按缓存先后顺序发送
addr.send(msgs[i])
save2file(f"[SERVER --> {get_ccid(addr).decode() if get_ccid(addr) else 'None'}][{len(msgs[i])}] {msgs[i].hex(' ')}")
time.sleep(1)
# 成功则清空已发送成功的缓存数据
cache[ccid].pop(0)
logger.debug(f"{i + 1} 包缓存数据发送完成。")
except:
# 异常时退出循环不用再尝试发送后续msgs
logger.error(f"{i + 1} 包发送失败,停止尝试")
s2wx("","",f"{i + 1}包发送失败,停止尝试。")
break
if cache[ccid] == []:
logger.info(f"缓存数据全部发送完成。 {sccid} --> {ccid}")
s2wx("","",f"缓存数据全部发送完成。 \n{sccid} --> {ccid}")
del cache[ccid]
save_cache(cache)
return 0
def check_valid(data):
# 检查数据有效性
return bytearray(data[:])
def get_info(data):
# print(type(data[12]))
o=0
def h2s(x):
# x= int.from_bytes(x)
return str(x)
pass
sn = "_".join(list(map(h2s,data[12:19])))
index=int.from_bytes(data[20:21],byteorder="big")
cnt=int.from_bytes(data[21:22],byteorder="big")
t=f"流水号20{sn} \n序号\[{index}/{cnt}]"
logger.info(t)
s2wx("","",t)
def tt_trans(addr, data):
"""发送数据到指定ccid"""
get_info(data)
sccid = get_ccid(addr)
if not sccid:
return -1
tccid = data[7 + offs : 11 + offs]
# taddr = get_addr(tccid)
# 组帧
msg = check_valid(data)
msg[:5] = b"$TTMX"
msg[7 + offs : 11 + offs] = sccid
# 计算校验和
crc = 0
for i in msg[:-1]:
crc = crc ^ i
msg[-1] = crc
if tccid not in p:
# 未找到匹配的在线终端
# 更新缓存
# {tccid1=[msg1,msg2,...],...}
cache[tccid].append(msg)
save_cache(cache)
t=f"目标终端{tccid.decode()}未在线。 \n数据已缓存,待目标终端上线后重发。"
logger.info(t)
s2wx("","",t)
return 0
taddr = get_addr(tccid)
logger.info(f"匹配在线终端为:{tccid} <---> {taddr}")
# 发送
try:
add_timestamp()
taddr.send(msg)
save2file(f"[SERVER --> {get_ccid(taddr).decode() if get_ccid(taddr) else 'None'}][{len(msg)}] {msg.hex(' ')}",0)
logger.info(f"数据发送成功。 {sccid} --> {tccid}")
s2wx("","",f"匹配到在线终端, 数据发送成功。 \n{sccid.decode()} ---> {tccid.decode()}")
except Exception:
# 发送失败
taddr.close()
# 更新pairs清空对应终端
update_pairs(taddr, tccid, 1)
# 更新缓存
# {tccid1=[msg1,msg2,...],...}
cache[tccid].append(msg)
save_cache(cache)
logger.error("发送失败,终端{}可能已掉线。数据已缓存,待目标终端上线后重发。", taddr)
s2wx("","",f"发送失败,终端{taddr}可能已掉线。 \n数据已缓存,待目标终端上线后重发。")
return 0
def isInlst(tar,lst):
"""
在列表中查找目标子列表
返回匹配的列表
"""
index = []
for x in range(len(lst)):
if lst[x:x+len(tar)] == tar :
index.append(x)
rst = []
for x in range(len(index)):
if x<len(index)-1:
rst.append(lst[index[x]:index[x+1]])
if x == len(index)-1:
rst.append(lst[index[-1]:])
return rst
# 便于拼合数据的缓存
recCache = {}
def data_split(addr):
# 按帧头分割长数据
h = b"$TTMS" # b'\x24\x54\x54\x4d\x53'
dlist = isInlst([0x24,0x54,0x54,0x4d,0x53],recCache[addr])
# print(dlist)
#校验每帧数据
for i in range(len(dlist)):
flen = (dlist[i][5]<<8)+dlist[i][6]
if flen != len(dlist[i]) :
logger.warning("{}数据帧不匹配,应收{}<-->实收{}",i+1,flen,len(dlist[i]))
# 清空缓存
recCache[addr].clear()
# 因为无帧尾,需添加帧是否完整的校验
if len(dlist) :
# 目前情形多为两包粘包,故最后一包肯定不完整
# logger.debug(bytes(dlist[-1]).hex(" "))
flen = (dlist[-1][5]<<8)+dlist[-1][6]
if flen <= len(dlist[-1]) :
recCache[addr].extend(dlist[-1][flen:])
for x in range(len(dlist[-1])-flen):
dlist[-1].pop()
else:
recCache[addr].extend(dlist[-1][:])
dlist.pop()
logger.debug(f"解析为 {len(dlist)} 包数据。")
if len(recCache[addr]):
logger.warning("缓存数据为{}",bytes(recCache[addr]).hex(" "))
return dlist[:]
def tt_decode(addr, data):
"""处理收到的数据
数据正常返回0
数据异常返回-1"""
# 追加至缓存末尾
if addr not in recCache:
recCache[addr]=[]
recCache[addr].extend(data)
flag = -1
dlist = data_split(addr)
if len(dlist)==0:
logger.warning("not enough data")
for data in dlist:
data = bytes(data)
cmd = data[11 + offs]
# 回退到if-elif而不用match因服务器默认版本3.8
if cmd == 0x01:
# 处理心跳
flag = tt_hh(addr, data)
elif cmd == 0xAA:
# 处理数据
if tt_trans(addr, data):
flag = -1
break
else:
flag = 0
else:
break
return flag
class MyServer(socketserver.BaseRequestHandler):
def setup(self):
self.request.settimeout(60*1.5)
self.err = " "
self.recDataBuff=[]
def handle(self): # 回调
add_timestamp()
t=f"TCP客户端{str(self.client_address)}已连接等待上报心跳注册ccid。"
logger.info("TCP客户端{}已连接等待上报心跳注册ccid。",str(self.client_address))
s2wx("天通消息","TCP客户端接入",t)
conn = self.request
# print(type(conn),conn.fd)
while True:
try:
data = conn.recv(1024000)
except socket.timeout:
self.err = "心跳超时。"
logger.warning("{self.client_address}超时。")
break
if not data:
break
add_timestamp()
self.recDataBuff.extend(data)
# logger.debug("[R] {}",data.hex(" "))
save2file(f"[{get_ccid(conn).decode() if get_ccid(conn) else 'None'} --> SERVER][{len(data)}] {data.hex(' ')}")
t = f"{str(self.client_address)}<->[{get_ccid(conn).decode() if get_ccid(conn) else 'None'}]接收到新数据,长度{len(data)}字节。"
logger.info(t)
logger.debug("[R] - {}",data.hex(" "))
# print(data.hex(" "))
# 心跳报文17字节避免推送心跳报文刷屏
if len(data) > 17:
s2wx("","",t)
# print("--",len(self.recDataBuff))
# print(self.recDataBuff[:])
if tt_decode(conn, data):
self.err= "切断。"
# logger.warning(f"{self.client_address}疑是非法连接")
# conn.close()
# break
def finish(self):
if self.request in pp:
t = f"""终端下线了。{self.err}
CCID:{pp[self.request].decode()}
地址:{str(self.client_address)}"""
update_pairs(self.request,get_ccid(self.request),1)
logger.info(t)
s2wx("","天通终端下线了",t)
recCache[self.request].clear()
# print("终端",self.client_address,"下线了。",self.err)
# update_pairs(self.request,get_ccid(self.request),1)
else:
t= f"TCP客户端{str(self.client_address)}已断开连接。{self.err}"
logger.info(t)
recCache[self.request].clear()
# s2wx("天通消息","TCP客户端断开",t)
if __name__ == "__main__":
_,t = gettm()
if not os.path.exists('./debug'):
os.mkdir('./debug')
logger.add('/'.join(['./debug',t,'/log_{time}.log']))
# load_cache
if load_cache():
cache = load_cache()
socketserver.TCPServer.allow_reuse_address = True
server = socketserver.ThreadingTCPServer(("", 7222), MyServer)
add_timestamp()
logger.info("服务端{}初始化成功。", server.server_address)
s2wx("天通消息","服务器上线","服务器端初始化完成,等待终端连接。")
server.timeout = 10
server.serve_forever()
"""
以下几种情况服务器会主动断开终端:
单次数据不以帧头开始
单次数据长度不足11
未上报心跳注册ccid而直接发数据
"""