#!/usr/bin/env python3 # coding:utf-8 import socketserver import subprocess import pickle from collections import defaultdict import datetime import time import requests import socket # import json from threading import Timer from loguru import logger import os def gettm(): """ 返回时戳和日期 """ t= datetime.datetime.now() d = str(t) return d,d[:10] def save2file(data,isR=1): """ 存储数据至文件 """ t,d = gettm() p = '/'.join(['./trlog',d]) if not os.path.exists(p): os.makedirs(p) l=f"{'[R]' if isR else '[T]'} - {t} --- {data}\n" f=open('/'.join(['./trlog',d,'trlog.txt']),'a') f.write(l) f.close SERVER = "http://www.pushplus.plus/send" TOKEN = "ff328cba923a4225bc4acd0086a9014c" # SERVER = "http://push.nmhd.eu.org:12722/push/murmur" # TOKEN = "tt" def pp2wx(title,content,description): """通过pushplus发送至微信,有频次限制. title:标题 content:内容""" ppmsg = {"token":TOKEN, "template":"json", "channel":"wechat", "description": description, } ppmsg["title"] = title ppmsg["content"] = content # ppmsg["description"]= content requests.post(SERVER, json=ppmsg) msgcache=[] pushisbusy=0 def s2wx(title="",description="", content=""): """通过自建msgpusher发送至微信 当前title值无效 """ # logger.debug(content) # return # 受API限制,频繁发送会失败 # 拟合并发送,满3条已发送,超时时间10s if len(content): t=f""" **时戳:** {datetime.datetime.now()} **信息:** {content}""" msgcache.append(t) global pushisbusy if pushisbusy: Timer(10,s2wx).start() return pushisbusy=1 le = len(msgcache) def send(): c= " \n".join(msgcache[:3]) try: res = requests.post("http://8.137.110.19:4014/push/murmur", json={ "title": title, "description": description, "content": c, "channel":"all-TT", "token": "gjvL2gMZqj326isgafmz"}) logger.debug("msgpusher {}",res) except: logger.error("msgpusher error"); try: res = requests.post("https://push.020824.xyz/push/murmur", json={ "title": title, "description": description, "content": c, "channel":"all-TT", "token": "gjvL2gMZqj326isgafmz"}) logger.debug("msgpusher {}",res) except: logger.error("msgpusher error"); # try: # try: # res = res.json() # except: # s="http://push.nmhd.eu.org:12722/push/murmur" # # s="http://47.108.213.155:11722/push/murmur" # res = requests.post(s, json={ # "title": title, # "description": description, # "content": c, # "token": "tt"}) # print("nmhd",s) # else: # if res["success"]: # return None # else: # return res["message"] if le >2: send() msgcache.pop(0) msgcache.pop(0) msgcache.pop(0) # Timer(10,s2wx).start() elif (not len(content)) and le: # 等效超时 send() for i in range(le): msgcache.pop(0) # Timer(10,s2wx).start() pushisbusy=0 Timer(10,s2wx).start() # GET 方式 # res = requests.get(f"{SERVER}/push/{USERNAME}?title={title}" # f"&description={description}&content={content}&token={TOKEN}") # POST 方式 # res = requests.post("http://push.nmhd.eu.org:12722/push/murmur", json={ # "title": title, # "description": description, # "content": f""" # *时戳:* # _{datetime.datetime.now()}_ # *信息:* # {content}""", # "token": "tt" # }) # try: # res = res.json() # except: # return # else: # if res["success"]: # return None # else: # return res["message"] Timer(10,s2wx).start() cache = defaultdict(list) offs = 0 # 偏移量 # 保存终端信息,ccid=地址 p = {} # 地址 ==ccid pp = {} def add_timestamp(): logger.info("-------------------------↓ {} ↓ -------------------------",datetime.datetime.now() ) def save_cache(dic): """缓存未发送成功的消息 仅在目标终端未上线或发送失败时缓存 发送失败的一般原因是终端掉线但未及时更新在线列表""" # f=open("./cache.txt",mode='w+') # f.write(str(dic)) # f.close pickle.dump(dic, open("./cache.txt", "wb")) def load_cache(): """程序启动时加载缓存""" try: # f=open("./cache.txt") # c = eval(f.read()) # f.close() c = pickle.load(open("./cache.txt", "rb")) return c except: return None def update_pairs(addr, ccid, clear=0,cnt=[0]): """更新终端库 addr:绑定tcp地址 ccid:终端号 clear:清空掉线终端 cnt:心跳包数量""" if not clear: # 添加或更新客户端 pp[addr] = ccid p.clear if ccid not in p: # 新终端上线,推送在线消息 cnt[0] = 0 t = f"""*信息:* 天通终端上线。 CCID:{ccid.decode()} 地址:{str(addr.getpeername())}""" logger.info(t) s2wx("","天通终端上线",t) # p[ccid] = addr else: del p[ccid] del pp[addr] # 打印在线终端 # for i in p: # print(i, "<--->", p[i]) od= [] for i in pp: p[pp[i]] = i # od.append(f"{pp[i].decode()}<--->{i.getpeername()}") # 可能客户端未断开,导致多个连接指向相同的ccid,故需更新后再遍历打印 for i in p: od.append(f"{i.decode()}<--->{p[i].getpeername()}") # print('\r\n'.join(zd)) clents = ' \n'+'\r\n'.join(od) if len(od) else "" t=f"在线终端已更新,数量{len(od)}。{clents}" logger.info(t) # cnt[0]是一个列表,它的默认值只会在函数定义时初始化一次,因此它的值在多次调用时保持不变 # cnt[0]为10的倍数 if cnt[0]%10 == 0: s2wx("","",t) cnt[0] += 1 def get_addr(ccid): """获取ccid对应的地址""" try: # 如果有直接返回 return p[ccid] except Exception: # 没有就返回None return None def get_ccid(addr): try: # 如果有直接返回 return pp[addr] except Exception: # 没有就返回None logger.warning("终端未上报心跳注册ccid。") return None """ +--------+--------------+----------+----------+--------------+------------------------+--------+ | ff | 长度 | 地址信息 | 信令 | 内容 | 校验位 | +--------+--------------+----------+----------+--------------+------------------------+--------+ | | 2Bytes | 4Bytes | 心跳 | 0x01 | 时间戳32bit(单位ms) | 1Bytes | + $TTMS +--------------+----------+----------+--------------+------------------------+--------+ | | 2Bytes | 4Bytes | 传输数据 | 0xaa | 载荷 | 1Bytes | +--------+--------------+----------+----------+--------------+------------------------+--------+ """ # 回复ACK给终端,使用命令0xAA,载荷为四字节0xFFFFFFFF def ack2client(addr,data): msg = bytearray(data[:]) msg[11 + offs] = 0xAA # 覆写32bit时戳为FF msg[12 + offs:16 + offs] = int(0xFFFFFFFF).to_bytes(4,byteorder="big") # 计算校验和 crc = 0 for i in msg[:-1]: crc = crc ^ i msg[-1] = crc addr.send(msg) # save2file(f"[SERVER --> {get_ccid(addr).decode() if get_ccid(addr) else 'None'}][{len(msg)}] {msg.hex(' ')}",0) # logger.debug("服务器回复ACK。") def tt_hh(addr, data): """处理心跳""" # 更新pairs ccid = data[7 + offs : 11 + offs] update_pairs(addr, ccid) # 组帧再返回 msg = bytearray(data[:]) msg[:5] = b"$TTMX" val = "%04x" % int(time.time()) #取Unix时间戳,以s为单位 tmp=[] for i in range(0,len(val),2): tmp.append(int(val[i:i+2],16)) msg[12:-1] = tmp # 计算校验和 crc = 0 for i in msg[:-1]: crc = crc ^ i msg[-1] = crc add_timestamp() addr.send(msg) save2file(f"[SERVER --> {get_ccid(addr).decode() if get_ccid(addr) else 'None'}][{len(msg)}] {msg.hex(' ')}",0) logger.debug("服务器回复心跳包。") ack2client(addr,msg) # 从缓存中匹配到数据才重发 # 仅有新终端上线时才重发 if cache and ccid in cache: msgs = cache[ccid][:] # 浅拷贝 sccid = msgs[0][7 + offs : 11 + offs] print("有", len(msgs), "包缓存数据待发。", sccid, "-->", ccid) s2wx("","",f"有{len(msgs)}包缓存数据待发。 \n{sccid} --> {ccid}") for i in range(len(msgs)): try: add_timestamp() # 按缓存先后顺序发送 addr.send(msgs[i]) save2file(f"[SERVER --> {get_ccid(addr).decode() if get_ccid(addr) else 'None'}][{len(msgs[i])}] {msgs[i].hex(' ')}") time.sleep(1) # 成功则清空已发送成功的缓存数据 cache[ccid].pop(0) logger.debug(f"第 {i + 1} 包缓存数据发送完成。") except: # 异常时退出循环,不用再尝试发送后续msgs logger.error(f"第 {i + 1} 包发送失败,停止尝试") s2wx("","",f"第{i + 1}包发送失败,停止尝试。") break if cache[ccid] == []: logger.info(f"缓存数据全部发送完成。 {sccid} --> {ccid}") s2wx("","",f"缓存数据全部发送完成。 \n{sccid} --> {ccid}") del cache[ccid] save_cache(cache) return 0 def check_valid(data): # 检查数据有效性 return bytearray(data[:]) def get_info(data): # print(type(data[12])) o=0 def h2s(x): # x= int.from_bytes(x) return str(x) pass sn = "_".join(list(map(h2s,data[12:19]))) index=int.from_bytes(data[20:21],byteorder="big") cnt=int.from_bytes(data[21:22],byteorder="big") t=f"流水号20{sn} \n序号\[{index}/{cnt}]" logger.info(t) s2wx("","",t) def tt_trans(addr, data): """发送数据到指定ccid""" get_info(data) sccid = get_ccid(addr) if not sccid: return -1 tccid = data[7 + offs : 11 + offs] # taddr = get_addr(tccid) # 组帧 msg = check_valid(data) msg[:5] = b"$TTMX" msg[7 + offs : 11 + offs] = sccid # 计算校验和 crc = 0 for i in msg[:-1]: crc = crc ^ i msg[-1] = crc if tccid not in p: # 未找到匹配的在线终端 # 更新缓存 # {tccid1=[msg1,msg2,...],...} cache[tccid].append(msg) save_cache(cache) t=f"目标终端{tccid.decode()}未在线。 \n数据已缓存,待目标终端上线后重发。" logger.info(t) s2wx("","",t) return 0 taddr = get_addr(tccid) logger.info(f"匹配在线终端为:{tccid} <---> {taddr}") # 发送 try: add_timestamp() taddr.send(msg) save2file(f"[SERVER --> {get_ccid(taddr).decode() if get_ccid(taddr) else 'None'}][{len(msg)}] {msg.hex(' ')}",0) logger.info(f"数据发送成功。 {sccid} --> {tccid}") s2wx("","",f"匹配到在线终端, 数据发送成功。 \n{sccid.decode()} ---> {tccid.decode()}") except Exception: # 发送失败 taddr.close() # 更新pairs,清空对应终端 update_pairs(taddr, tccid, 1) # 更新缓存 # {tccid1=[msg1,msg2,...],...} cache[tccid].append(msg) save_cache(cache) logger.error("发送失败,终端{}可能已掉线。数据已缓存,待目标终端上线后重发。", taddr) s2wx("","",f"发送失败,终端{taddr}可能已掉线。 \n数据已缓存,待目标终端上线后重发。") return 0 def isInlst(tar,lst): """ 在列表中查找目标子列表 返回匹配的列表 """ index = [] for x in range(len(lst)): if lst[x:x+len(tar)] == tar : index.append(x) rst = [] for x in range(len(index)): if x实收{}",i+1,flen,len(dlist[i])) # 清空缓存 recCache[addr].clear() # 因为无帧尾,需添加帧是否完整的校验 if len(dlist) : # 目前情形多为两包粘包,故最后一包肯定不完整 # logger.debug(bytes(dlist[-1]).hex(" ")) flen = (dlist[-1][5]<<8)+dlist[-1][6] if flen <= len(dlist[-1]) : recCache[addr].extend(dlist[-1][flen:]) for x in range(len(dlist[-1])-flen): dlist[-1].pop() else: recCache[addr].extend(dlist[-1][:]) dlist.pop() logger.debug(f"解析为 {len(dlist)} 包数据。") if len(recCache[addr]): logger.warning("缓存数据为{}",bytes(recCache[addr]).hex(" ")) return dlist[:] def tt_decode(addr, data): """处理收到的数据 数据正常返回0 数据异常返回-1""" # 追加至缓存末尾 if addr not in recCache: recCache[addr]=[] recCache[addr].extend(data) flag = -1 dlist = data_split(addr) if len(dlist)==0: logger.warning("not enough data") for data in dlist: data = bytes(data) cmd = data[11 + offs] # 回退到if-elif而不用match,因服务器默认版本3.8 if cmd == 0x01: # 处理心跳 flag = tt_hh(addr, data) elif cmd == 0xAA: # 处理数据 if tt_trans(addr, data): flag = -1 break else: flag = 0 else: break return flag class MyServer(socketserver.BaseRequestHandler): def setup(self): self.request.settimeout(60*1.5) self.err = " " self.recDataBuff=[] def handle(self): # 回调 add_timestamp() t=f"TCP客户端{str(self.client_address)}已连接,等待上报心跳注册ccid。" logger.info("TCP客户端{}已连接,等待上报心跳注册ccid。",str(self.client_address)) s2wx("天通消息","TCP客户端接入",t) conn = self.request # print(type(conn),conn.fd) while True: try: data = conn.recv(1024000) except socket.timeout: self.err = "心跳超时。" logger.warning("{self.client_address}超时。") break if not data: break add_timestamp() self.recDataBuff.extend(data) # logger.debug("[R] {}",data.hex(" ")) save2file(f"[{get_ccid(conn).decode() if get_ccid(conn) else 'None'} --> SERVER][{len(data)}] {data.hex(' ')}") t = f"从{str(self.client_address)}<->[{get_ccid(conn).decode() if get_ccid(conn) else 'None'}]接收到新数据,长度{len(data)}字节。" logger.info(t) logger.debug("[R] - {}",data.hex(" ")) # print(data.hex(" ")) # 心跳报文17字节,避免推送心跳报文刷屏 if len(data) > 17: s2wx("","",t) # print("--",len(self.recDataBuff)) # print(self.recDataBuff[:]) if tt_decode(conn, data): self.err= "切断。" # logger.warning(f"{self.client_address}疑是非法连接") # conn.close() # break def finish(self): if self.request in pp: t = f"""终端下线了。{self.err} CCID:{pp[self.request].decode()} 地址:{str(self.client_address)}""" update_pairs(self.request,get_ccid(self.request),1) logger.info(t) s2wx("","天通终端下线了",t) recCache[self.request].clear() # print("终端",self.client_address,"下线了。",self.err) # update_pairs(self.request,get_ccid(self.request),1) else: t= f"TCP客户端{str(self.client_address)}已断开连接。{self.err}" logger.info(t) recCache[self.request].clear() # s2wx("天通消息","TCP客户端断开",t) if __name__ == "__main__": _,t = gettm() if not os.path.exists('./debug'): os.mkdir('./debug') logger.add('/'.join(['./debug',t,'/log_{time}.log'])) # load_cache if load_cache(): cache = load_cache() socketserver.TCPServer.allow_reuse_address = True server = socketserver.ThreadingTCPServer(("", 7222), MyServer) add_timestamp() logger.info("服务端{}初始化成功。", server.server_address) s2wx("天通消息","服务器上线","服务器端初始化完成,等待终端连接。") server.timeout = 10 server.serve_forever() """ 以下几种情况服务器会主动断开终端: 单次数据不以帧头开始 单次数据长度不足11 未上报心跳(注册ccid)而直接发数据 """