tt-server/server.py

441 lines
12 KiB
Python
Raw Normal View History

2023-03-11 08:04:43 +00:00
#!/usr/bin/env python3
2023-03-11 13:25:34 +00:00
# coding:utf-8
2023-03-10 01:34:44 +00:00
import socketserver
import subprocess
2023-03-11 03:51:56 +00:00
import pickle
2023-03-13 07:27:36 +00:00
from collections import defaultdict
2023-03-13 09:20:43 +00:00
import datetime
import time
2023-03-18 06:53:09 +00:00
import requests
2023-03-19 05:32:33 +00:00
import socket
2023-03-18 06:53:09 +00:00
# import json
from threading import Timer
2023-03-18 06:53:09 +00:00
SERVER = "http://www.pushplus.plus/send"
TOKEN = "ff328cba923a4225bc4acd0086a9014c"
2023-03-18 09:06:09 +00:00
# SERVER = "http://push.nmhd.eu.org:12722/push/murmur"
2023-03-18 06:53:09 +00:00
# TOKEN = "tt"
2023-03-18 09:06:09 +00:00
def pp2wx(title,content,description):
"""通过pushplus发送至微信有频次限制.
2023-03-18 06:53:09 +00:00
title:标题
content:内容"""
ppmsg = {"token":TOKEN,
2023-03-18 09:06:09 +00:00
"template":"json",
"channel":"wechat",
"description": description,
}
2023-03-18 06:53:09 +00:00
ppmsg["title"] = title
ppmsg["content"] = content
# ppmsg["description"]= content
requests.post(SERVER, json=ppmsg)
2023-03-11 03:51:56 +00:00
msgcache=[]
def s2wx(title="",description="", content=""):
2023-03-18 09:06:09 +00:00
"""通过自建msgpusher发送至微信
当前title值无效
"""
2023-03-22 06:04:13 +00:00
# return
# 受API限制频繁发送会失败
# 拟合并发送满3条已发送超时时间10s
if len(content):
t=f"""
*时戳*
_{datetime.datetime.now()}_
*信息*
{content}"""
msgcache.append(t)
def send():
c= " \n".join(msgcache[:3])
requests.post("http://push.nmhd.eu.org:12722/push/murmur", json={
"title": title,
"description": description,
"content": c,
"token": "tt"})
if len(msgcache) >3:
send()
msgcache.pop(0)
msgcache.pop(0)
msgcache.pop(0)
t= Timer(10,s2wx).start()
elif (not len(content)) and len(msgcache):
# 等效超时
send()
msgcache.clear()
t= Timer(10,s2wx).start()
2023-03-18 09:06:09 +00:00
# GET 方式
# res = requests.get(f"{SERVER}/push/{USERNAME}?title={title}"
# f"&description={description}&content={content}&token={TOKEN}")
# POST 方式
# res = requests.post("http://push.nmhd.eu.org:12722/push/murmur", json={
# "title": title,
# "description": description,
# "content": f"""
# *时戳:*
# _{datetime.datetime.now()}_
# *信息:*
# {content}""",
# "token": "tt"
# })
# try:
# res = res.json()
# except:
# return
# else:
# if res["success"]:
# return None
# else:
# return res["message"]
t=Timer(10,s2wx).start()
2023-03-18 09:06:09 +00:00
2023-03-13 07:27:36 +00:00
cache = defaultdict(list)
2023-03-11 13:25:34 +00:00
offs = 0 # 偏移量
2023-03-13 07:27:36 +00:00
# 保存终端信息ccid=地址
2023-03-11 13:25:34 +00:00
p = {}
2023-03-13 07:27:36 +00:00
# 地址 ==ccid
2023-03-11 13:25:34 +00:00
pp = {}
2023-03-13 11:27:54 +00:00
2023-03-13 09:20:43 +00:00
def add_timestamp():
2023-03-19 02:27:02 +00:00
print("-------------------------↓",datetime.datetime.now(), "↓-------------------------")
2023-03-11 03:51:56 +00:00
2023-03-10 01:34:44 +00:00
2023-03-11 03:51:56 +00:00
def save_cache(dic):
2023-03-11 13:25:34 +00:00
"""缓存未发送成功的消息
仅在目标终端未上线或发送失败时缓存
发送失败的一般原因是终端掉线但未及时更新在线列表"""
2023-03-11 03:51:56 +00:00
# f=open("./cache.txt",mode='w+')
# f.write(str(dic))
# f.close
2023-03-11 13:25:34 +00:00
pickle.dump(dic, open("./cache.txt", "wb"))
2023-03-11 03:51:56 +00:00
def load_cache():
2023-03-11 13:25:34 +00:00
"""程序启动时加载缓存"""
2023-03-11 03:51:56 +00:00
try:
# f=open("./cache.txt")
# c = eval(f.read())
# f.close()
2023-03-11 13:25:34 +00:00
c = pickle.load(open("./cache.txt", "rb"))
2023-03-11 03:51:56 +00:00
return c
except:
return None
2023-03-10 09:58:19 +00:00
2023-03-11 13:25:34 +00:00
def update_pairs(addr, ccid, clear=0):
"""更新终端库
2023-03-10 09:58:19 +00:00
addr:绑定tcp地址
ccid:终端号
2023-03-11 13:25:34 +00:00
clear:清空掉线终端"""
2023-03-10 09:58:19 +00:00
if not clear:
2023-03-11 13:25:34 +00:00
# 添加或更新客户端
pp[addr] = ccid
p.clear
2023-03-18 06:53:09 +00:00
if ccid not in p:
2023-03-20 15:10:13 +00:00
t = f"""*信息:*
天通终端上线
2023-03-18 09:06:09 +00:00
CCID:{ccid.decode()}
地址:{str(addr.getpeername())}"""
print(t)
2023-03-19 09:48:50 +00:00
s2wx("","天通终端上线",t)
2023-03-20 15:10:13 +00:00
# p[ccid] = addr
2023-03-10 09:58:19 +00:00
else:
del p[ccid]
del pp[addr]
# 打印在线终端
2023-03-20 15:10:13 +00:00
# for i in p:
# print(i, "<--->", p[i])
od= []
2023-03-20 15:10:13 +00:00
for i in pp:
p[pp[i]] = i
# print(pp[i],"<--->",i)
od.append(f"{pp[i].decode()}<--->{i.getpeername()}")
# print('\r\n'.join(zd))
clents = ' \n'+'\r\n'.join(od) if len(od) else ""
print("在线终端已更新,数量",len(od),clents)
s2wx("","",f"在线终端已更新,数量{len(od)}{clents}")
2023-03-10 09:58:19 +00:00
2023-03-11 13:25:34 +00:00
2023-03-10 09:58:19 +00:00
def get_addr(ccid):
2023-03-11 13:25:34 +00:00
"""获取ccid对应的地址"""
2023-03-10 09:58:19 +00:00
2023-03-10 01:34:44 +00:00
try:
2023-03-10 09:58:19 +00:00
# 如果有直接返回
return p[ccid]
2023-03-10 01:34:44 +00:00
except Exception:
2023-03-11 13:25:34 +00:00
# 没有就返回None
2023-03-10 09:58:19 +00:00
return None
2023-03-11 13:25:34 +00:00
2023-03-10 09:58:19 +00:00
def get_ccid(addr):
try:
# 如果有直接返回
return pp[addr]
except Exception:
2023-03-11 13:25:34 +00:00
# 没有就返回None
print("终端未上报心跳注册ccid。")
2023-03-10 09:58:19 +00:00
return None
2023-03-11 13:25:34 +00:00
"""
2023-03-10 09:58:19 +00:00
+--------+--------------+----------+----------+--------------+------------------------+--------+
| ff | 长度 | 地址信息 | 信令 | 内容 | 校验位 |
+--------+--------------+----------+----------+--------------+------------------------+--------+
2023-03-13 09:20:43 +00:00
| | 2Bytes | 4Bytes | 心跳 | 0x01 | 时间戳32bit单位ms | 1Bytes |
2023-03-10 09:58:19 +00:00
+ $TTMS +--------------+----------+----------+--------------+------------------------+--------+
2023-03-13 09:20:43 +00:00
| | 2Bytes | 4Bytes | 传输数据 | 0xaa | 载荷 | 1Bytes |
2023-03-10 09:58:19 +00:00
+--------+--------------+----------+----------+--------------+------------------------+--------+
2023-03-11 13:25:34 +00:00
"""
2023-03-13 11:27:54 +00:00
2023-03-11 13:25:34 +00:00
def tt_hh(addr, data):
"""处理心跳"""
2023-03-10 09:58:19 +00:00
# 更新pairs
2023-03-11 13:25:34 +00:00
ccid = data[7 + offs : 11 + offs]
update_pairs(addr, ccid)
2023-03-10 09:58:19 +00:00
# 组帧再返回
2023-03-14 13:21:21 +00:00
msg = bytearray(data[:])
msg[:5] = b"$TTMX"
2023-03-14 14:04:51 +00:00
val = "%04x" % int(time.time())
tmp=[]
for i in range(0,len(val),2):
tmp.append(int(val[i:i+2],16))
msg[12:-1] = tmp
2023-03-13 09:20:43 +00:00
# 计算校验和
crc = 0
for i in msg[:-1]:
crc = crc ^ i
msg[-1] = crc
add_timestamp()
addr.send(msg)
2023-03-19 02:27:02 +00:00
print("服务器回复心跳包。")
2023-03-10 09:58:19 +00:00
2023-03-13 09:20:43 +00:00
# 从缓存中匹配到数据才重发
2023-03-11 08:04:43 +00:00
# 仅有新终端上线时才重发
2023-03-14 14:13:41 +00:00
if cache and ccid in cache:
2023-03-13 11:27:54 +00:00
msgs = cache[ccid][:] # 浅拷贝
sccid = msgs[0][7 + offs : 11 + offs]
print("", len(msgs), "包缓存数据待发。", sccid, "-->", ccid)
2023-03-20 15:10:13 +00:00
s2wx("","",f"{len(msgs)}包缓存数据待发。 \n{sccid} --> {ccid}")
2023-03-11 08:04:43 +00:00
for i in range(len(msgs)):
try:
2023-03-13 09:20:43 +00:00
add_timestamp()
# 按缓存先后顺序发送
2023-03-11 08:04:43 +00:00
addr.send(msgs[i])
2023-03-13 07:27:36 +00:00
# 成功则清空已发送成功的缓存数据
2023-03-13 02:11:29 +00:00
cache[ccid].pop(0)
2023-03-13 11:27:54 +00:00
print("", i + 1, "包缓存数据发送完成。")
2023-03-11 08:04:43 +00:00
except:
# 异常时退出循环不用再尝试发送后续msgs
2023-03-13 11:27:54 +00:00
print("", i + 1, "包发送失败,停止尝试")
2023-03-20 15:10:13 +00:00
s2wx("","",f"{i + 1}包发送失败,停止尝试。")
2023-03-11 08:04:43 +00:00
break
2023-03-13 02:11:29 +00:00
if cache[ccid] == []:
2023-03-13 11:27:54 +00:00
print("缓存数据全部发送完成。", sccid, "-->", ccid)
2023-03-20 15:10:13 +00:00
s2wx("","",f"缓存数据全部发送完成。 \n{sccid} --> {ccid}")
2023-03-13 02:11:29 +00:00
del cache[ccid]
save_cache(cache)
2023-03-10 09:58:19 +00:00
return 0
2023-03-13 11:27:54 +00:00
2023-03-13 09:20:43 +00:00
def check_valid(data):
# 检查数据有效性
return bytearray(data[:])
2023-03-10 09:58:19 +00:00
2023-03-22 03:26:46 +00:00
def get_info(data):
# print(type(data[12]))
o=0
def h2s(x):
# x= int.from_bytes(x)
return str(x)
pass
sn = "-".join(list(map(h2s,data[12:17])))
2023-03-22 06:04:13 +00:00
index=int.from_bytes(data[19:20],byteorder="big")
cnt=int.from_bytes(data[20:21],byteorder="big")
t=f"流水号20{sn} \n序号\[{index}/{cnt}]"
2023-03-22 03:26:46 +00:00
print(t)
s2wx("","",t)
2023-03-13 11:27:54 +00:00
2023-03-11 13:25:34 +00:00
def tt_trans(addr, data):
"""发送数据到指定ccid"""
2023-03-22 03:26:46 +00:00
get_info(data)
sccid = get_ccid(addr)
if not sccid:
return -1
2023-03-11 13:25:34 +00:00
tccid = data[7 + offs : 11 + offs]
2023-03-20 15:10:13 +00:00
# taddr = get_addr(tccid)
2023-03-10 09:58:19 +00:00
# 组帧
2023-03-13 09:20:43 +00:00
msg = check_valid(data)
2023-03-11 13:25:34 +00:00
msg[:5] = b"$TTMX"
msg[7 + offs : 11 + offs] = sccid
2023-03-10 09:58:19 +00:00
# 计算校验和
crc = 0
2023-03-11 03:51:56 +00:00
for i in msg[:-1]:
2023-03-10 09:58:19 +00:00
crc = crc ^ i
2023-03-11 13:25:34 +00:00
msg[-1] = crc
2023-03-10 09:58:19 +00:00
2023-03-20 15:10:13 +00:00
if tccid not in p:
# 未找到匹配的在线终端
2023-03-11 08:04:43 +00:00
2023-03-11 13:25:34 +00:00
# 更新缓存
# {tccid1=[msg1,msg2,...],...}
2023-03-11 03:51:56 +00:00
cache[tccid].append(msg)
save_cache(cache)
2023-03-13 11:27:54 +00:00
print("终端未在线或ccid", tccid, "错误。数据已缓存,待目标终端上线后重发。")
2023-03-20 15:10:13 +00:00
s2wx("","",f"终端未在线或ccid{tccid} 错误。 \n数据已缓存,待目标终端上线后重发。")
2023-03-11 03:51:56 +00:00
return 0
2023-03-20 15:10:13 +00:00
taddr = get_addr(tccid)
print("匹配在线终端为:", tccid, "<--->", taddr)
2023-03-11 03:51:56 +00:00
2023-03-10 09:58:19 +00:00
# 发送
try:
2023-03-13 09:20:43 +00:00
add_timestamp()
2023-03-10 09:58:19 +00:00
taddr.send(msg)
2023-03-13 11:27:54 +00:00
print("数据发送成功。", sccid, "-->", tccid)
2023-03-20 15:10:13 +00:00
s2wx("","",f"匹配到在线终端, 数据发送成功。 \n{sccid.decode()} ---> {tccid.decode()}")
2023-03-10 09:58:19 +00:00
except Exception:
2023-03-11 08:04:43 +00:00
# 发送失败
taddr.close()
2023-03-11 03:51:56 +00:00
# 更新pairs清空对应终端
2023-03-11 13:25:34 +00:00
update_pairs(taddr, tccid, 1)
# 更新缓存
# {tccid1=[msg1,msg2,...],...}
2023-03-11 03:51:56 +00:00
cache[tccid].append(msg)
save_cache(cache)
print("发送失败,终端可能已掉线。数据已缓存,待目标终端上线后重发。", taddr)
2023-03-20 15:10:13 +00:00
s2wx("","",f"发送失败,终端{taddr}可能已掉线。 \n数据已缓存,待目标终端上线后重发。")
2023-03-10 09:58:19 +00:00
return 0
2023-03-11 13:25:34 +00:00
def data_split(data):
# 按帧头分割长数据
2023-03-13 11:27:54 +00:00
h = b"$TTMS" # b'\x24\x54\x54\x4d\x53'
dlist = data.split(h)[1:]
for i in range(len(dlist)):
dlist[i] = h + dlist[i]
2023-03-13 11:27:54 +00:00
print("解析为", len(dlist), "包数据。")
return dlist
2023-03-10 09:58:19 +00:00
2023-03-11 13:25:34 +00:00
def tt_decode(addr, data):
"""处理收到的数据
数据正常返回0
2023-03-11 13:25:34 +00:00
数据异常返回-1"""
2023-03-10 09:58:19 +00:00
flag = -1
# 依据帧头和最小长度初判数据有效性
2023-03-13 09:20:43 +00:00
# 只要接收数据给的缓存足够大,即使是多条数据组成的长数据也默认帧格式是完整的,前几个字节必须是帧头
2023-03-13 11:27:54 +00:00
if data[:5] != b"$TTMS" or len(data) < 12:
2023-03-10 09:58:19 +00:00
return -1
2023-03-11 13:25:34 +00:00
2023-03-13 09:20:43 +00:00
dlist = data_split(data)
for data in dlist:
cmd = data[11 + offs]
# 回退到if-elif而不用match因服务器默认版本3.8
if cmd == 0x01:
# 处理心跳
flag = tt_hh(addr, data)
elif cmd == 0xAA:
# 处理数据
if tt_trans(addr, data):
flag = -1
break
else:
2023-03-13 11:27:54 +00:00
flag = 0
else:
break
return flag
2023-03-10 09:58:19 +00:00
2023-03-10 01:34:44 +00:00
class MyServer(socketserver.BaseRequestHandler):
2023-03-19 02:27:02 +00:00
def setup(self):
2023-03-19 05:32:33 +00:00
self.request.settimeout(60*2)
2023-03-22 03:26:46 +00:00
self.err = " "
2023-03-19 02:27:02 +00:00
2023-03-11 13:25:34 +00:00
def handle(self): # 回调
2023-03-13 09:20:43 +00:00
add_timestamp()
2023-03-19 02:27:02 +00:00
print("客户端", self.client_address, "已连接等待上报心跳注册ccid。")
2023-03-20 15:10:13 +00:00
s2wx("天通消息","TCP客户端接入",f"TCP客户端{str(self.client_address)}已连接等待上报心跳注册ccid。")
2023-03-10 01:34:44 +00:00
conn = self.request
2023-03-10 09:58:19 +00:00
# print(type(conn),conn.fd)
2023-03-11 13:25:34 +00:00
2023-03-10 01:34:44 +00:00
while True:
2023-03-19 05:32:33 +00:00
try:
data = conn.recv(1024000)
except socket.timeout:
2023-03-20 15:10:13 +00:00
self.err = "心跳超时。"
2023-03-19 05:32:33 +00:00
print(self.client_address, "超时。")
break
2023-03-19 09:48:50 +00:00
if not data:
break
add_timestamp()
2023-03-23 02:23:58 +00:00
t = f"接收到新数据,长度{len(data)}字节。"
print(t)
s2wx("","",t)
2023-03-19 09:48:50 +00:00
if tt_decode(conn, data):
self.err= "切断。"
print(self.client_address, "疑是非法连接")
# conn.close()
# break
2023-03-19 02:27:02 +00:00
def finish(self):
if self.request in pp:
2023-03-20 15:10:13 +00:00
t = f"""终端下线了。{self.err}
CCID:{pp[self.request].decode()}
地址:{str(self.client_address)}"""
print(t)
2023-03-19 02:27:02 +00:00
s2wx("","天通终端下线了",t)
# print("终端",self.client_address,"下线了。",self.err)
2023-03-19 02:27:02 +00:00
update_pairs(self.request,get_ccid(self.request),1)
else:
t= f"TCP客户端{str(self.client_address)}已断开连接。{self.err}"
print(t)
s2wx("天通消息","TCP客户端断开",t)
2023-03-19 02:27:02 +00:00
2023-03-10 09:58:19 +00:00
2023-03-10 01:34:44 +00:00
2023-03-11 13:25:34 +00:00
if __name__ == "__main__":
2023-03-11 03:51:56 +00:00
# load_cache
if load_cache():
cache = load_cache()
socketserver.TCPServer.allow_reuse_address = True
2023-03-12 05:32:32 +00:00
server = socketserver.ThreadingTCPServer(("", 7222), MyServer)
2023-03-13 09:20:43 +00:00
add_timestamp()
2023-03-13 11:27:54 +00:00
print("服务端初始化成功。", server.server_address)
2023-03-20 15:10:13 +00:00
s2wx("天通消息","服务器上线","服务器端初始化完成,等待终端连接。")
2023-03-19 02:27:02 +00:00
server.timeout = 10
2023-03-10 01:34:44 +00:00
server.serve_forever()
"""
以下几种情况服务器会主动断开终端
单次数据不以帧头开始
单次数据长度不足11
未上报心跳注册ccid而直接发数据
2023-03-13 11:27:54 +00:00
"""