#!/usr/bin/env python3 # coding:utf-8 import socketserver import subprocess import pickle from collections import defaultdict import datetime import time import requests import socket # import json SERVER = "http://www.pushplus.plus/send" TOKEN = "ff328cba923a4225bc4acd0086a9014c" # SERVER = "http://push.nmhd.eu.org:12722/push/murmur" # TOKEN = "tt" def pp2wx(title,content,description): """通过pushplus发送至微信,有频次限制. title:标题 content:内容""" ppmsg = {"token":TOKEN, "template":"json", "channel":"wechat", "description": description, } ppmsg["title"] = title ppmsg["content"] = content # ppmsg["description"]= content requests.post(SERVER, json=ppmsg) def s2wx(title,description, content): """通过自建msgpusher发送至微信 当前title值无效 """ # return # GET 方式 # res = requests.get(f"{SERVER}/push/{USERNAME}?title={title}" # f"&description={description}&content={content}&token={TOKEN}") # POST 方式 res = requests.post("http://push.nmhd.eu.org:12722/push/murmur", json={ "title": title, "description": description, "content": f""" ### 时间戳: {datetime.datetime.now()} {content}""", "token": "tt" }) res = res.json() if res["success"]: return None else: return res["message"] cache = defaultdict(list) offs = 0 # 偏移量 # 保存终端信息,ccid=地址 p = {} # 地址 ==ccid pp = {} def add_timestamp(): print("-------------------------↓",datetime.datetime.now(), "↓-------------------------") def save_cache(dic): """缓存未发送成功的消息 仅在目标终端未上线或发送失败时缓存 发送失败的一般原因是终端掉线但未及时更新在线列表""" # f=open("./cache.txt",mode='w+') # f.write(str(dic)) # f.close pickle.dump(dic, open("./cache.txt", "wb")) def load_cache(): """程序启动时加载缓存""" try: # f=open("./cache.txt") # c = eval(f.read()) # f.close() c = pickle.load(open("./cache.txt", "rb")) return c except: return None def update_pairs(addr, ccid, clear=0): """更新终端库 addr:绑定tcp地址 ccid:终端号 clear:清空掉线终端""" if ccid and not clear: # 添加或更新客户端 if ccid not in p: t = f""" ### **终端信息:** CCID:{ccid.decode()} 地址:{str(addr.getpeername())} """ s2wx("","天通终端上线了",t) p[ccid] = addr pp[addr] = ccid else: del p[ccid] del pp[addr] print("在线终端已更新。") # 打印在线终端 for i in p: print(i, "<--->", p[i]) # for i in pp: # print(i,"-",pp[i]) def get_addr(ccid): """获取ccid对应的地址""" try: # 如果有直接返回 return p[ccid] except Exception: # 没有就返回None return None def get_ccid(addr): try: # 如果有直接返回 return pp[addr] except Exception: # 没有就返回None print("终端未上报心跳注册ccid。") return None """ +--------+--------------+----------+----------+--------------+------------------------+--------+ | ff | 长度 | 地址信息 | 信令 | 内容 | 校验位 | +--------+--------------+----------+----------+--------------+------------------------+--------+ | | 2Bytes | 4Bytes | 心跳 | 0x01 | 时间戳32bit(单位ms) | 1Bytes | + $TTMS +--------------+----------+----------+--------------+------------------------+--------+ | | 2Bytes | 4Bytes | 传输数据 | 0xaa | 载荷 | 1Bytes | +--------+--------------+----------+----------+--------------+------------------------+--------+ """ def tt_hh(addr, data): """处理心跳""" # 更新pairs ccid = data[7 + offs : 11 + offs] update_pairs(addr, ccid) # 组帧再返回 msg = bytearray(data[:]) val = "%04x" % int(time.time()) tmp=[] for i in range(0,len(val),2): tmp.append(int(val[i:i+2],16)) msg[12:-1] = tmp # 计算校验和 crc = 0 for i in msg[:-1]: crc = crc ^ i msg[-1] = crc add_timestamp() addr.send(msg) print("服务器回复心跳包。") # 从缓存中匹配到数据才重发 # 仅有新终端上线时才重发 if cache and ccid in cache: msgs = cache[ccid][:] # 浅拷贝 sccid = msgs[0][7 + offs : 11 + offs] print("有", len(msgs), "包缓存数据待发。", sccid, "-->", ccid) for i in range(len(msgs)): try: add_timestamp() # 按缓存先后顺序发送 addr.send(msgs[i]) # 成功则清空已发送成功的缓存数据 cache[ccid].pop(0) print("第", i + 1, "包缓存数据发送完成。") except: # 异常时退出循环,不用再尝试发送后续msgs print("第", i + 1, "包发送失败,停止尝试") break if cache[ccid] == []: print("缓存数据全部发送完成。", sccid, "-->", ccid) del cache[ccid] save_cache(cache) return 0 def check_valid(data): # 检查数据有效性 return bytearray(data[:]) def tt_trans(addr, data): """发送数据到指定ccid""" sccid = get_ccid(addr) if not sccid: return -1 tccid = data[7 + offs : 11 + offs] taddr = get_addr(tccid) # 组帧 msg = check_valid(data) msg[:5] = b"$TTMX" msg[7 + offs : 11 + offs] = sccid # 计算校验和 crc = 0 for i in msg[:-1]: crc = crc ^ i msg[-1] = crc if not taddr: # 未找到匹配的在线终端 # 更新缓存 # {tccid1=[msg1,msg2,...],...} cache[tccid].append(msg) save_cache(cache) print("终端未在线或ccid", tccid, "错误。数据已缓存,待目标终端上线后重发。") return 0 print("匹配在线终端为:", tccid, "<--->", taddr) # 发送 try: add_timestamp() taddr.send(msg) print("数据发送成功。", sccid, "-->", tccid) except Exception: # 发送失败 taddr.close() # 更新pairs,清空对应终端 update_pairs(taddr, tccid, 1) # 更新缓存 # {tccid1=[msg1,msg2,...],...} cache[tccid].append(msg) save_cache(cache) print("发送失败,终端可能已掉线。数据已缓存,待目标终端上线后重发。", taddr) return 0 def data_split(data): # 按帧头分割长数据 h = b"$TTMS" # b'\x24\x54\x54\x4d\x53' dlist = data.split(h)[1:] for i in range(len(dlist)): dlist[i] = h + dlist[i] print("解析为", len(dlist), "包数据。") return dlist def tt_decode(addr, data): """处理收到的数据 数据正常返回0 数据异常返回-1""" flag = -1 # 依据帧头和最小长度初判数据有效性 # 只要接收数据给的缓存足够大,即使是多条数据组成的长数据也默认帧格式是完整的,前几个字节必须是帧头 if data[:5] != b"$TTMS" or len(data) < 12: return -1 dlist = data_split(data) for data in dlist: cmd = data[11 + offs] # 回退到if-elif而不用match,因服务器默认版本3.8 if cmd == 0x01: # 处理心跳 flag = tt_hh(addr, data) elif cmd == 0xAA: # 处理数据 if tt_trans(addr, data): flag = -1 break else: flag = 0 else: break return flag class MyServer(socketserver.BaseRequestHandler): def setup(self): self.request.settimeout(60*2) def handle(self): # 回调 add_timestamp() print("客户端", self.client_address, "已连接,等待上报心跳注册ccid。") s2wx("天通消息","TCP客户端接入",str(self.client_address) + "已连接,等待上报心跳注册ccid。") conn = self.request # print(type(conn),conn.fd) while True: try: data = conn.recv(1024000) except socket.timeout: print(self.client_address, "超时。") break if not data: break add_timestamp() print( "接收到新数据", self.client_address, ",长度", len(data), ":\r\n", data.hex(" ") ) if tt_decode(conn, data): conn.close() print(self.client_address, "疑是非法连接,已切断。") break def finish(self): if self.request in pp: t = f""" ### **终端信息:** CCID:{get_ccid(self.request)} 地址:{str(self.client_address)} """ s2wx("","天通终端下线了",t) print("终端",self.client_address,"下线了。") update_pairs(self.request,get_ccid(self.request),1) else: print("客户端",self.client_address,"已断开。") s2wx("天通消息","TCP客户端断开",str(self.client_address) + "已断开连接。") if __name__ == "__main__": # load_cache cache = load_cache() server = socketserver.ThreadingTCPServer(("", 7222), MyServer) add_timestamp() print("服务端初始化成功。", server.server_address) server.timeout = 10 server.serve_forever() """ 以下几种情况服务器会主动断开终端: 单次数据不以帧头开始 单次数据长度不足11 未上报心跳(注册ccid)而直接发数据 """