优化添加待发送文件的逻辑

优化发送文件逻辑
添加数据重发功能
修复网口接收数据不缓存的bug
This commit is contained in:
CSSC-WORK\murmur 2023-08-24 11:06:44 +08:00
parent f22b5de75f
commit f30132d825
9 changed files with 138 additions and 115 deletions

View File

@ -37,41 +37,13 @@ static void clearLock()
} }
extern rt_sem_t cfgUpdate; extern rt_sem_t cfgUpdate;
int get_cfg(const char *k); int get_cfg(const char *k);
static struct rt_messagequeue upfilelist;
typedef struct typedef struct
{ {
char fname[60]; char fname[60];
uint8_t index; uint8_t index;
}FILE_INFO; }FILE_INFO;
static uint8_t msg_pool[512] ;
void addToList_thread_entry(void *parameter);
static void iniUFMsg(void)
{
/* 初始化消息队列 */
rt_mq_init(&upfilelist, "uplist",
msg_pool, /* 存放消息的缓冲区 */
sizeof(FILE_INFO), /* 一条消息的最大长度 */
sizeof(msg_pool), /* 存放消息的缓冲区大小 */
RT_IPC_FLAG_FIFO); /* 如果有多个线程等待,按照先来先得到的方法分配消息 */
/* 创建 serial 线程 */
rt_thread_t thread = rt_thread_create("filelist", addToList_thread_entry, RT_NULL, 1024*3, 30-2, 10);
/* 创建成功则启动线程 */
if (thread != RT_NULL)
{
rt_thread_startup(thread);
}
else
{
LOG_E("thread 'updatelist' create failure.");
}
}
/* 导出到自动初始化 */
INIT_APP_EXPORT(iniUFMsg);
/** /**
* config项 * config项
* @param s * @param s
@ -81,7 +53,7 @@ INIT_APP_EXPORT(iniUFMsg);
*/ */
int set_cfg(const char *k, long v) int set_cfg(const char *k, long v)
{ {
return; // return;
// setLock(); // setLock();
// if (rt_strcmp(v,"NULL") == 0) {//delete key // if (rt_strcmp(v,"NULL") == 0) {//delete key
@ -341,28 +313,53 @@ static void nclearLock()
{ {
nislock=0; nislock=0;
} }
/**
*
* @param f static int setFileToSend_thread_entry(void *parameter)
* @param v
* @return
*/
int setFileToSend(const char *f, int v)
{ {
FILE_INFO *msg;
msg=(FILE_INFO * )parameter;
nsetLock(); nsetLock();
int rst = ini_putl(SECTION_TO_SEND, f, v, FILE_TO_SEND); int rst = ini_putl(SECTION_TO_SEND, msg->fname, msg->index, FILE_TO_SEND);
if (!rst) { if (!rst) {
LOG_E("add file to send error."); LOG_E("add file to send error.");
nclearLock(); nclearLock();
return RT_ERROR; // return RT_ERROR;
} }
LOG_D("add %s-%d to list.",f,v); LOG_D("add %s-%d to list.",msg->fname,msg->index);
nclearLock(); nclearLock();
return RT_EOK; fileIsReady();
// list_thread();
// return RT_EOK;
} }
/** /**
* *
* @param fin
* @param index
*/
void postFileInfo(const char *fin, uint8_t index)
{
static FILE_INFO msg;
strcpy(msg.fname,fin);
msg.index=index;
/* 创建 serial 线程 */
rt_thread_t thread = rt_thread_create("filelist", setFileToSend_thread_entry, (void *)&msg, 1024*2, 30-2, 10);
/* 创建成功则启动线程 */
if (thread != RT_NULL)
{
rt_thread_startup(thread);
}
else
{
LOG_E("thread 'updatelist' create failure.");
}
}
/**
* 5
* @param kstr * @param kstr
* @param v 0 * @param v 0
* @return * @return
@ -418,35 +415,16 @@ static void gf()
static void add(int argc, char **argv) static void add(int argc, char **argv)
{ {
setFileToSend(argv[1],atoi(argv[2])); postFileInfo(argv[1],atoi(argv[2]));
gf();
clearFileToSend(argv[1]);
gf(); gf();
// clearFileToSend(argv[1]);
// gf();
} }
void addToList_thread_entry(void *parameter)
{
FILE_INFO msg;
while(1)
{
rt_memset(&msg, 0, sizeof(msg));
if (rt_mq_recv(&upfilelist, &msg, sizeof(msg), RT_WAITING_FOREVER) == RT_EOK) {
LOG_D("get %s",msg.fname);
setFileToSend(msg.fname,msg.index);
fileIsReady();
}
}
}
void postFileInfo(const char *fin, uint8_t index)
{
// setFileToSend(fin,0);
FILE_INFO msg;
strcpy(msg.fname,fin);
msg.index=index;
rt_mq_send(&upfilelist, &msg, sizeof(msg));
}
/** /**
* *

View File

@ -41,6 +41,6 @@ int add_val(const char *k);
long get_val(const char *k); long get_val(const char *k);
int set_val(const char *k, long v); int set_val(const char *k, long v);
int setFileToSend(const char *f, int v); //int setFileToSend(const char *f, int v);
size_t getFilesToSend(char (*kstr)[MAX_KEY_LEN], int *v); size_t getFilesToSend(char (*kstr)[MAX_KEY_LEN], int *v);
int clearFileToSend(const char *k); int clearFileToSend(const char *k);

View File

@ -88,10 +88,10 @@ static void updatecfg(void)
void sysSemInit() void sysSemInit()
{ {
okTosend = rt_sem_create("okTosend", 0, RT_IPC_FLAG_PRIO); okTosend = rt_sem_create("okTosend", 0, RT_IPC_FLAG_PRIO);//同一时间仅一个线程发送文件
cfgUpdate = rt_sem_create("cfgUpdate", 0, RT_IPC_FLAG_PRIO); cfgUpdate = rt_sem_create("cfgUpdate", 0, RT_IPC_FLAG_PRIO);//更新cfg
shuntDownTT = rt_sem_create("shuntDNTT", 0, RT_IPC_FLAG_PRIO); shuntDownTT = rt_sem_create("shuntDNTT", 0, RT_IPC_FLAG_PRIO);//关闭TT
rt_event_init(&chkSta, "chkSta", RT_IPC_FLAG_PRIO); rt_event_init(&chkSta, "chkSta", RT_IPC_FLAG_PRIO);//检查天通状态
rt_sem_release(cfgUpdate); //上电更新值 rt_sem_release(cfgUpdate); //上电更新值
updatecfg(); updatecfg();
@ -181,7 +181,7 @@ static void upSendFile_thread_entry(void *parameter)
static rt_uint8_t d[BUFFER_ROW][200] = { };//need static? static rt_uint8_t d[BUFFER_ROW][200] = { };//need static?
static rt_uint8_t s[BUFFER_ROW] = { }; static rt_uint8_t s[BUFFER_ROW] = { };
if (getFileSize(f->fname) > scfg.maxSizePerFile+200) { if (getFileSize(f->fname) > scfg.maxSizePerFile+200) {//部分demo数据体积>>1k
LOG_W("file '%s' is too large to send.",f->fname); LOG_W("file '%s' is too large to send.",f->fname);
clearFileToSend(f->fname); clearFileToSend(f->fname);
rt_sem_release(okTosend); rt_sem_release(okTosend);
@ -193,7 +193,7 @@ static void upSendFile_thread_entry(void *parameter)
LOG_HEX("sendFile",27,d[i],s[i]); LOG_HEX("sendFile",27,d[i],s[i]);
} }
// list_thread(); // list_thread();
if (len)//部分demo数据体积>>1k if (len)
{ {
LOG_D("%d pack(s) to send", f->index ? 1 : len); LOG_D("%d pack(s) to send", f->index ? 1 : len);
for (rt_uint8_t var = 0; var < len; var++) for (rt_uint8_t var = 0; var < len; var++)
@ -206,7 +206,7 @@ static void upSendFile_thread_entry(void *parameter)
rt_thread_mdelay(3000);//发送间隔目前服务器未处理暂设为3s避免粘包 rt_thread_mdelay(3000);//发送间隔目前服务器未处理暂设为3s避免粘包
} }
} }
LOG_I("upSendFile done."); LOG_I("upSendFile '%s' done.",f->fname);
clearFileToSend(f->fname); clearFileToSend(f->fname);
// list_thread(); // list_thread();
@ -255,7 +255,7 @@ void d_upSendFile(int argc, char **argv)
void getAndSendFile() void getAndSendFile()
{ {
static int index[MAX_KEY_LEN]; static int index[MAX_KEY_LEN];
static char f[10][MAX_KEY_LEN]; static char f[5][MAX_KEY_LEN];
int cnt = getFilesToSend(f, index); int cnt = getFilesToSend(f, index);
if(cnt) if(cnt)
{ {
@ -266,18 +266,19 @@ void getAndSendFile()
return; return;
} }
rt_sem_release(okTosend); rt_sem_release(okTosend);//初始赋值
while(cnt) while(cnt)
{ {
int i=0; int i=0;
for (i = cnt-1; i > -1; i--) for (i = cnt-1; i > -1; i--)//倒序发送
{ {
if (rt_sem_take(okTosend, RT_WAITING_FOREVER) == RT_EOK) { if (rt_sem_take(okTosend, RT_WAITING_FOREVER) == RT_EOK) {
rt_thread_mdelay(1000);//thread close rt_thread_mdelay(1000);//thread close
upSendFile(f[i],index[i]); upSendFile(f[i],index[i]);
} }
} }
if (i == -1) {//上一队列发送完成 rt_thread_mdelay(3000);
if (rt_sem_take(okTosend, RT_WAITING_FOREVER) == RT_EOK) {//等待上一轮最后一个文件发送完毕
cnt = getFilesToSend(f, index); cnt = getFilesToSend(f, index);
} }
@ -414,6 +415,8 @@ void d_remain()
rt_timer_control(tmrToPNTT, RT_TIMER_CTRL_GET_TIME, (void*)&arg1); rt_timer_control(tmrToPNTT, RT_TIMER_CTRL_GET_TIME, (void*)&arg1);
rt_timer_control(tmrToPNTT, RT_TIMER_CTRL_GET_REMAIN_TIME, (void*)&arg2); rt_timer_control(tmrToPNTT, RT_TIMER_CTRL_GET_REMAIN_TIME, (void*)&arg2);
LOG_D("%d / %d min",arg2/60000,arg1/60000); LOG_D("%d / %d min",arg2/60000,arg1/60000);
rt_timer_control(tmrToPNTT, RT_TIMER_CTRL_GET_STATE, (void*)&arg1);
LOG_D("%s",arg1?"YES":"NO");
} }
/* 定时器超时函数 */ /* 定时器超时函数 */
@ -460,6 +463,7 @@ void initTT_thread_entry()
// tcpInit(); // tcpInit();
if (tcpInit() != RT_EOK) { if (tcpInit() != RT_EOK) {
cmd_free(); cmd_free();
//此处
} }
// rt_thread_mdelay(1000); // rt_thread_mdelay(1000);
if (isTCPok()) if (isTCPok())
@ -468,8 +472,7 @@ void initTT_thread_entry()
tcpRecMQ(); //开启tcp接收线程 tcpRecMQ(); //开启tcp接收线程
recTT(); recTT();
repGetTT(); repGetTT();
// upSendLoc(); reportLoc();
// chkAndSendFile();
} }
} }
@ -489,26 +492,18 @@ void deInitTT_thread_entry()
{ {
deinitThread =RT_NULL; deinitThread =RT_NULL;
tcpClose(); tcpClose();
rt_hw_stm32_eth_deinit(); //qu激活网口 if (isEthUP()) {
// rt_thread_mdelay(3000); rt_hw_stm32_eth_deinit(); //qu激活网口
}
pwTT_thread_entry("0"); pwTT_thread_entry("0");
// stopTM();
startAlarm(); startAlarm();
clearWindowMode(); clearWindowMode();
// if (0&& tmrToPNTT)
// {
// rt_timer_delete(tmrToPNTT); //关闭倒计时
// tmrToPNTT = RT_NULL;
// }
LOG_W("shunt down TT DONE"); LOG_W("shunt down TT DONE");
} }
initThread = RT_NULL; // initThread = RT_NULL;
deinitThread = RT_NULL; deinitThread = RT_NULL;
// list_thread();
// cmd_free();
} }
/** /**

View File

@ -22,7 +22,7 @@ extern int pointInPolygon(int polyCorners,float polyX[], float polyY[],float x,f
* ASCII字符形式存入log * ASCII字符形式存入log
* @param din * @param din
* @param len * @param len
* @return 1-0- * @return 1-0-
*/ */
int trDataTolog(uint8_t *din, size_t len, uint8_t isTx) int trDataTolog(uint8_t *din, size_t len, uint8_t isTx)
{ {
@ -47,8 +47,20 @@ int trDataTolog(uint8_t *din, size_t len, uint8_t isTx)
// LOG_HEX("d",16,din,len); // LOG_HEX("d",16,din,len);
int fd = open(fn, O_WRONLY | O_CREAT | O_APPEND); int fd = open(fn, O_WRONLY | O_CREAT | O_APPEND);
if (fd < 0)
//没有加锁,多试几次再报错
int trycnt=0;
while (fd < 0)
{ {
rt_thread_mdelay(500);
fd = open(fn, O_WRONLY | O_CREAT | O_APPEND);
trycnt +=1;
if (trycnt>5) {
break;
}
}
if (trycnt>5) {
LOG_E("open file %s failed!", fn); LOG_E("open file %s failed!", fn);
return -RT_ERROR; return -RT_ERROR;
} }
@ -628,21 +640,19 @@ int isInFence(uint8_t *loc);
*/ */
static void packAndSendLoc_thread_entry(void *parameter) static void packAndSendLoc_thread_entry(void *parameter)
{ {
#define CHECK_INTERVAL 15000
size_t i = 0;
while (isTCPok()) while (isTCPok())
{ {
static size_t i = 0; // static size_t i = 0;
int isReadyToSendLoc=0; int isReadyToSendLoc=0;
uint8_t rst[200]; uint8_t rst[200];
int len = packLocMsg(rst); int len = packLocMsg(rst);
if (!len) {//无有效位置数据 if (!len) {//无有效位置数据
return; // return;
continue; continue;
} }
// LOG_D("len=%d", len);
// len = cryptSingleMsg(rst, len, rst);
// LOG_D("len=%d", len);
// LOG_HEX("crypt", 16, rst, len);
//检测是否在围栏内 //检测是否在围栏内
if (!isInFence(rst+len-11))//不在围栏内 if (!isInFence(rst+len-11))//不在围栏内
@ -650,9 +660,9 @@ static void packAndSendLoc_thread_entry(void *parameter)
rst[8]=1; rst[8]=1;
isReadyToSendLoc = 1; isReadyToSendLoc = 1;
} }
rt_thread_mdelay(10000); //默认10s刷新一次 rt_thread_mdelay(CHECK_INTERVAL); //默认4s刷新一次
i += 1 ;
if (i++ > scfg.locRepInterval * 60)//定时发送默认5分钟 if (i*CHECK_INTERVAL > scfg.locRepInterval * 60 * 1000)//定时发送默认5分钟
{ {
i=0; i=0;
isReadyToSendLoc = 1; isReadyToSendLoc = 1;
@ -693,7 +703,7 @@ static void packAndSendLoc_thread_entry(void *parameter)
// } // }
//} //}
void upSendLoc() void reportLoc()
{ {
// isReadyToSendLoc = rt_sem_create("SendLoc", 0, RT_IPC_FLAG_PRIO); // isReadyToSendLoc = rt_sem_create("SendLoc", 0, RT_IPC_FLAG_PRIO);
// repLoc = rt_timer_create("repLoc", cb_upSendLoc, // repLoc = rt_timer_create("repLoc", cb_upSendLoc,
@ -712,8 +722,7 @@ void upSendLoc()
// LOG_E("thread 'sendLoc' create failure."); // LOG_E("thread 'sendLoc' create failure.");
// return; // return;
// } // }
rt_thread_t thread = rt_thread_create("RPLoc", packAndSendLoc_thread_entry, RT_NULL, 1024 * 2, 28, 10);
rt_thread_t thread = rt_thread_create("PCLoc", packAndSendLoc_thread_entry, RT_NULL, 1024 * 2, 27, 10);
/* 创建成功则启动线程 */ /* 创建成功则启动线程 */
if (thread != RT_NULL) if (thread != RT_NULL)
{ {
@ -829,7 +838,32 @@ void clearAllData()
mkfs("elm", "W25Q128");//format flash mkfs("elm", "W25Q128");//format flash
} }
/**
*
* @param din
* @param len
*/
void reSend(uint8_t *din, uint8_t len)
{
//file example: sd/rxdata/2023_08_23/23_08_23_08_31_44_14.bin
LOG_I("FUNC = resend file");
char f[60]="";
strcat(f,ROOT_PATH_DATA);
char str[30]="";
bytes2str(din, len-1, 10, "_", str);
strncat(f,str,8);
strcat(f,"/");
strcat(f,str);
strcat(f,".bin");
LOG_D("resend '%s'",f);
for (size_t var = 7; var < len; var++) {
postFileInfo(f,din[var]);
}
}
/** /**
@ -891,6 +925,9 @@ void ttRunCMD(uint8_t *din, size_t len)
reportSysCfg(); reportSysCfg();
} }
break; break;
case _CMD_RETRY_DATA:
reSend(din+8, din[7]);
break;
default: default:
LOG_W("0x%04X=未支持的指令。",cmd); LOG_W("0x%04X=未支持的指令。",cmd);
break; break;
@ -962,7 +999,8 @@ void parse3SData(uint8_t *din, size_t count)
if (chk3SDataValid(din, count) != RT_EOK) { if (chk3SDataValid(din, count) != RT_EOK) {
return; return;
} }
//有效的数据才能复位超时
resetTM();
// uint8_t dout[200]; // uint8_t dout[200];
// 未采用switch case // 未采用switch case
@ -1315,7 +1353,7 @@ int isEthUP()
MSH_CMD_EXPORT(d_getFreeSpace,getFreeSpace); MSH_CMD_EXPORT(d_getFreeSpace,getFreeSpace);
MSH_CMD_EXPORT(selfTest,sysSelfTest); MSH_CMD_EXPORT(selfTest,sysSelfTest);
MSH_CMD_EXPORT(d_packLocMsg,dpackLocMsg); MSH_CMD_EXPORT(d_packLocMsg,dpackLocMsg);
MSH_CMD_EXPORT(upSendLoc,); MSH_CMD_EXPORT(reportLoc,);
MSH_CMD_EXPORT(d_cacheData,d_cacheData); MSH_CMD_EXPORT(d_cacheData,d_cacheData);
MSH_CMD_EXPORT(d_getFileSize,d_getFileSize); MSH_CMD_EXPORT(d_getFileSize,d_getFileSize);
MSH_CMD_EXPORT(isEthUP,isEthUP); MSH_CMD_EXPORT(isEthUP,isEthUP);

View File

@ -305,7 +305,7 @@ static int webclient_get_data(const char *url)
static unsigned char *buffer = RT_NULL; static unsigned char *buffer = RT_NULL;
size_t length = 0; size_t length = 0;
if (!isTCPok() || webclient_request(url, RT_NULL, RT_NULL, 0, (void **) &buffer, &length) < 0) if (!isTCPok() || webclient_request(url, RT_NULL, RT_NULL, 0, (void **) &buffer, &length) <= 0)
{ {
LOG_E("TT server or webclient is not ready."); LOG_E("TT server or webclient is not ready.");
if (buffer) if (buffer)
@ -318,7 +318,7 @@ static int webclient_get_data(const char *url)
LOG_D("webclient GET request response data :"); LOG_D("webclient GET request response data :");
LOG_D("%s", buffer); LOG_D("%s", buffer);
if (buffer && length && buffer[length-1] == 0x7d) //0x7d='}' if (buffer && length && buffer[length-1] == 0x7d) //0x7d='}',首次返回的数据不完整,此处直接丢掉避免报错
{ {
tt_parse(buffer); tt_parse(buffer);
web_free(buffer); web_free(buffer);
@ -455,7 +455,7 @@ void repGetTT_thread_entry(void* parameter)
appendInfo(rstInfo, RST_LEN, buffer, len); appendInfo(rstInfo, RST_LEN, buffer, len);
} }
rulecheck(); rulecheck();
rt_thread_mdelay(3 * 1000);//间隔3s更新一次数据 rt_thread_mdelay(10 * 1000);//间隔3s更新一次数据
} }
memset(rstInfo,0x3f,RST_LEN);//初始化 memset(rstInfo,0x3f,RST_LEN);//初始化
} }

View File

@ -1,3 +1,6 @@
# V1.0 # V1.0
[tosend] [tosend]
sd/rxdata/2023_08_23/23_08_23_08_31_44_14.bin 0
sd/rxdata/2023_08_23/23_08_23_08_31_44_14.bin 0
[lstfile] [lstfile]

View File

@ -453,9 +453,10 @@ void recTT_thread_entry()
} }
LOG_I("%d Bytes received from TT",msg.size); LOG_I("%d Bytes received from TT",msg.size);
LOG_HEX("TTrec", 27, msg.data, msg.size); LOG_HEX("TTrec", 27, msg.data, msg.size);
//存入log
char tmp[200]="";//接收的数据不会很长200足矣
trDataTolog(bytes2str(msg.data, msg.size, 16, " ", tmp), strlen(tmp), 0);
//此处调用处理函数 //此处调用处理函数
// LOG_D("try to parse data.");
// resetTM();
parseTTData(msg.data,msg.size); parseTTData(msg.data,msg.size);
} }
} }

View File

@ -169,15 +169,21 @@ static int uart_dma_sample(int argc, char *argv[])
return ret; return ret;
} }
/* 导出到 msh 命令列表中 */ /* 导出到 msh 命令列表中 */
MSH_CMD_EXPORT(uart_dma_sample, uart device dma sample); //MSH_CMD_EXPORT(uart_dma_sample, uart device dma sample);
INIT_COMPONENT_EXPORT(uart_dma_sample); INIT_COMPONENT_EXPORT(uart_dma_sample);
/**
* 3S
* @param din
* @param len
*/
void sendTo3S(uint8_t *din, size_t len) void sendTo3S(uint8_t *din, size_t len)
{ {
size_t rst = rt_device_write(serial, 0, din, len); size_t rst = rt_device_write(serial, 0, din, len);
LOG_D("send %d Bytes done.",rst); LOG_D("send %d Bytes done.",rst);
} }
#ifdef DEMO
void sendData(int argc, char *argv[]) void sendData(int argc, char *argv[])
{ {
//待发数据 //待发数据
@ -198,3 +204,4 @@ void sendData(int argc, char *argv[])
LOG_D("send %d Bytes ok.",rst); LOG_D("send %d Bytes ok.",rst);
} }
MSH_CMD_EXPORT(sendData,3) MSH_CMD_EXPORT(sendData,3)
#endif

View File

@ -51,6 +51,7 @@ typedef struct
#define ROOT_PATH_DATA "/sd/rxdata/" #define ROOT_PATH_DATA "/sd/rxdata/"
#define CRYPT_BEFRE_PACK #define CRYPT_BEFRE_PACK
#define UPDATE_INTERVAL 10
#define RELAY #define RELAY
#endif /* APPLICATIONS_USRCFG_H_ */ #endif /* APPLICATIONS_USRCFG_H_ */