const file = require("./utils/file"); const {getRustConfig} = require("./config"); const {spawn, exec, fork, execSync} = require("child_process"); const {logger} = require("./utils/logger"); const http = require("./utils/http"); const e = require("express"); const {is, tr} = require("date-fns/locale"); /******* * web * **** */ const RobotStatus = Object.freeze({ // 已停止的 STOPPED: "STOPPED", // 正在停止 STOP_PENDING: "STOP_PENDING", //正在运行 RUNNING: "RUNNING", //启动中 START_PENDING: "START_PENDING", //下载中 DOWNLOADING: "DOWNLOADING", //错误 ERROR: "ERROR" }); let appMap = new Map(); function getAppMap() { return appMap; } function getApp(key) { let app = { id: -1, port: -1, path: "", programName: "", strategyName: "", childProcess: undefined, nowBalance: -1, posNum: -1, messlist: [], predictorState:[], threadStatus: RobotStatus.STOPPED, errorMessage: "成功", threadStartTime: -1, status: 0, restartStatus: 0, closeTime: 0, isClearance: false, } // logger.info(appMap, appMap.has(key)); if (appMap.has(key)) { app = appMap.get(key) } else { appMap.set(key, app) } // logger.info(app); return app } function delApp(key) { appMap.delete(key) } async function run(param) { return new Promise(async (resolve, reject) => { var key = param.id var appName = param.path let newAppName = "4l_as_" + appName; var programName = param.programName var strategyName = param.strategyName var app = getApp(key) var config = getRustConfig() //检查当前机器人id 对应的as 是否已经启动,为了防止重复启动 if (app.status === 1) { logger.info(`防止重复启动!结束当前请求`) return resolve(false) } app.status = 1 // 初始化机器人状态 app.threadStatus = RobotStatus.START_PENDING app.id = key app.port = param.callPort app.path = param.path app.programName = param.programName app.strategyName = param.strategyName app.isClearance = false robotStatus(app) /**** *** 第二步:路径经组装 * 注意:可能存在一台服务器多个机器人,通过机器人ID创建文件夹区分,需要组装好路径 ***/ //系统不同 做不同的路径处理 const platform = process.platform; let exeName = newAppName; //可执行程序 let configName = "config.json"; //配置文件 let appPath = ""; //相对路径存放 appPath = config.filePath + "/" + app.id + "/" + strategyName + "/" + programName //1. 检查目录 file.checkPathSync(appPath); /**** *** 第三步:rust 启动程序检查(下载更新) ***/ var isDow = false var scheduleDow = 0 var scheduleConfig = 0 //2、 检查执行程序 var destination = appPath + "/" + exeName if (!file.checkFilePath(destination)) { app.threadStatus = RobotStatus.DOWNLOADING robotStatus(app) isDow = true var dowHeaders = {...config.headers}; if (platform === 'win32') { downloadFileWithPowerShell( config.baseUrl + config.dowAppURL + '/?path=' + appName, appPath + "/" + appName, dowHeaders, (err, b) => { if (err === null) { scheduleDow = 1 } else { app.threadStatus = RobotStatus.ERROR app.errorMessage = '下载失败!' scheduleDow = -1 robotStatus(app) } }); } else { downloadFileWithLinux( config.baseUrl + config.dowAppURL + '/?path=' + appName, appPath + "/" + appName, dowHeaders, (err, b) => { if (err === null) { scheduleDow = 1 } else { app.threadStatus = RobotStatus.ERROR app.errorMessage = '下载失败!' scheduleDow = -1 robotStatus(app) } }); } } else { scheduleDow = 1 } /**** *** 第四步:rust 启动配置检查(下载更新) ***/ //2 为防止启动指令不同,每次重新写入 var destination2 = appPath + "/" + configName var urrrl = config.baseUrl + config.dowConfigURL + '/?robotId=' + app.id var configHeaders = {...config.headers}; http.request_get(urrrl, {...config.headers}) .then((data) => { logger.info('配置参数:', data); const json = JSON.parse(data) // 处理成配置文件 const map = json.data var json_obj = JSON.parse("{}") for (k in map) { json_obj[k] = map[k] // logger.info(map[k] + "\t") } logger.info("参数组装完成!") te = JSON.stringify(json_obj) file.writeFile(destination2, te, (errer, b) => { if (errer === null && b === true) { scheduleConfig = 1 } else { logger.info("配置参数写入配置失败!", errer) app.threadStatus = RobotStatus.ERROR app.errorMessage = '配置参数写入配置失败!' scheduleConfig = -1 robotStatus(app) } }) }) .catch((e) => { logger.info("配置参数获取失败", e) app.threadStatus = RobotStatus.ERROR app.errorMessage = '配置参数获取失败!' scheduleConfig = -1 robotStatus(app) }) //监听下载只有下载完成了才能继续 while (true) { await delay(5000); let info_t = "" let info_t2 = "" //是否开启下载,如果是新下载,下载完成需要授权 if (isDow) { if (scheduleDow === 1) { info_t = "启动文件:下载完成!" //文件下载好了,设置统一前缀 execSync(`sudo mv ${appPath + "/" + appName} ${appPath + "/" + newAppName}`, (error, stdout, stderr) => { if (error) { logger.error(`文件重命名失败: ${error}`); } logger.info(`文件重命名完成!`); }); //文件授权 execSync(`chmod +x ${appPath + "/" + newAppName}`, (error, stdout, stderr) => { if (error) { logger.error(`启动文件:授权失败: ${error}`); } logger.info(`启动文件:授权完成!`); }); } else if (scheduleDow === -1) { info_t = "启动文件:下载失败!" } else { info_t = "启动文件:还在下载..." } } else { info_t = "启动文件:完整无需下载" } if (scheduleConfig === 1) { info_t2 = "配置文件:读取成功!" } else if (scheduleConfig === -1) { info_t = "配置文件:读取失败!" } else { info_t2 = "配置文件:还在读取..." } logger.info(info_t, info_t2); if (scheduleDow === 1 && scheduleConfig === 1) { break } else if (scheduleConfig === -1 || scheduleDow === -1) { return resolve(false); } } app.threadStatus = RobotStatus.START_PENDING robotStatus(app) logger.info("开始启动程序!"); //3. spawn启动 const exePath = appPath + "/" + exeName const configPath = appPath + "/" + configName logger.info(`文件地址:${exePath}-----${configPath}`); const command = exePath const args = ['--config=' + configPath, '--port=' + app.port] app.childProcess = spawn(command, args) app.threadStartTime = new Date().getTime() /**********监听*********/ app.childProcess.stdout.on('data', (msg) => { // logger.info('stdout:' + msg.toString()) }) app.childProcess.on('message', (msg) => { // logger.info(`message: ${msg}`); }); app.childProcess.on('error', (err) => { logger.error('子程序-异常:', err); app.threadStatus = RobotStatus.ERROR app.errorMessage = `子线程发生异常!! ${err}` robotStatus(app) app.childProcess = undefined app.status = 0 app.closeTime = new Date().getTime() }); app.childProcess.on('exit', (code, signal) => { logger.info(`子进程退出-exit,退出码: ${code}, 信号: ${signal}`); app.threadStatus = RobotStatus.STOPPED robotStatus(app) app.childProcess = undefined app.status = 0 app.closeTime = new Date().getTime() }); app.childProcess.on('close', (code) => { logger.info(`子进程退出-close,退出码 ${code}`); app.threadStatus = RobotStatus.STOPPED robotStatus(app) app.childProcess = undefined app.status = 0 app.closeTime = new Date().getTime() }); app.threadStatus = RobotStatus.RUNNING robotStatus(app) return resolve(true) }) } function delay(ms) { return new Promise(resolve => setTimeout(resolve, ms)); } async function closeApp(param) { return new Promise((resolve, reject) => { var key = param.id var app = getApp(key) logger.info(` 信号: `, app.threadStatus); /*******新的删除方式*************/ //文件授权 if (app.childProcess !== undefined) { var pid = app.childProcess.pid exec(`sudo kill ${pid}`, (error, stdout, stderr) => { if (error) { logger.error(`进程${pid} 杀死失败: ${error}`); } logger.info(`进程${pid} 杀死成功`); logger.info(`当前app:`, app); app.threadStatus = RobotStatus.STOP_PENDING app.isClearance = false robotStatus(app) }); } return resolve(true) }) /*******新的删除方式*************/ } //重启 RESTART async function restartApp(param) { var key = param.id var app = getApp(key) // var restartStatus = app.restartStatus // if (restartStatus !== 0) { // logger.info("防止重启指令 重复发送~!") // return // } // app.restartStatus = 1 logger.info("--开始重启!") logger.info('当前app', app.id, app.threadStatus); const closeResult = await closeApp(param) // logger.info('?', JSON.stringify(closeResult)) while (true) { await delay(1000) const runResult = await run(param) // logger.info('??', JSON.stringify(runResult)) if (runResult) break } // app.restartStatus = 0 logger.info(`重启完成!!!!!`); } //启动- 检查程序 async function searchPositions(param) { return new Promise(async (resolve, reject) => { // {"id":"375","callPort":"1111","path":"80654e330ed67df2880231a67925e984as-rust","programName":"v3.3.2_clear","strategyName":"as","checkId":"467"} var key = param.id var appName = param.path var r_id = param.id let newAppName = "4l_as_" + appName; let configName = "config.json"; //配置文件 var programName = param.programName var strategyName = param.strategyName var config = getRustConfig() var app = getApp(key) if (app.port === -1) { } else { } //相对路径存放 var appPath = config.filePath + "/" + param.id + "/" + strategyName + "/" + programName const asName = appPath + "/" + newAppName const configPath = appPath + "/" + configName const appPort = param.callPort const command = `sudo ${asName} --config=${configPath} --port=${appPort} --run_mode=1 --r_id=${r_id}` exec(command, (error, stdout, stderr) => { if (error) { logger.error(`命令执行失败~: ${command} 错误:${error}`); } logger.info(`检查指令发送成功!`); logger.info(command); }); return resolve(true) }) } async function closeAppAll() { return new Promise(async (resolve, reject) => { var appMap = getAppMap() // console.log(appMap.size); // 输出Map的大小 // appMap.forEach((value, key) => { // console.log(key, value); // 输出Map的键值对 // // logger.info("???", key, value) // var app = value // logger.info(` 信号: `, app.threadStatus); // /*******新的删除方式*************/ // //文件授权 // if (app.childProcess !== undefined) { // var pid = app.childProcess.pid // exec(`sudo kill ${pid}`, (error, stdout, stderr) => { // if (error) { // logger.error(`进程${pid} 杀死失败: ${error}`); // } // logger.info(`进程${pid} 杀死成功`); // // logger.info(`当前app:`, app); // app.threadStatus = RobotStatus.STOP_PENDING // robotStatus(app) // }); // } // }); exec(`sudo pkill 4l_as`, (error, stdout, stderr) => { if (error) { logger.error(`4l_as杀死失败: ${error}`); } logger.info(`4l_as 杀死成功`); }); while (true) { var z = 0; await delay(1000) var str = ""; appMap.forEach((value, key) => { str += "机器人:" + key + ",当前状态:" + value.threadStatus + "\t" if (value.threadStatus !== RobotStatus.STOPPED) { z += 1 } }); logger.info(str) if (z === 0) { logger.info(`策略全部关闭~~开始关闭node`); break } else { logger.info(`等待关闭~~策略!`); } } //为了确保as // while (true) { // execSync(`sudo kill $(pgrep -f "^.*\/4l_as_")`, (error, stdout, stderr) => { // if (error) { // logger.error(`全杀-进程执行失败: ${error}`); // } // logger.info(`全杀-进程执行成功`); // break // }); // } return resolve(true) }) /*******新的删除方式*************/ } // 下载执行程序,使用的实行服务器指令的方式进行下载 function downloadFileWithPowerShell(url, destination, headers, funBreak) { const headersString = Object.entries(headers) .map(([key, value]) => `'${key}'='${value}'`) // 使用单引号来确保特殊字符不被解析 .join('; '); // 使用分号和空格分隔每个键值对 const command = `powershell -command "Invoke-WebRequest -Uri '${url}' -OutFile '${destination}' -Headers @{${headersString}}"`; exec(command, (error, stdout, stderr) => { if (error) { logger.error(`下载出错: ${error}`); return funBreak(error); } logger.info(`下载完成!`); funBreak(null, true); // 通知下载成功,修改了B为true }); } function downloadFileWithLinux(url, destination, headers, funBreak) { const headersString = Object.entries(headers) .map(([key, value]) => `-H '${key}: ${value}'`) // 设置curl的HTTP头参数 .join(' '); // 使用空格分隔每个头参数 const command = `curl ${headersString} '${url}' -o '${destination}'`; exec(command, (error, stdout, stderr) => { if (error) { logger.error(`下载出错: ${error}`); return funBreak(error); } logger.info(`下载完成!`); funBreak(null, true); // 通知下载成功,修改了B为true }); } // 上报-机器人状态 function robotStatus(app) { var config = getRustConfig() var msg = (app.threadStatus !== RobotStatus.ERROR ? "完成" : app.errorMessage) http.request_post(`${config.reportedUrl}/report/statusReport`, { "robotId": app.id, "status": app.threadStatus, "msg": msg }, {...config.headers}).then((data) => { // logger.info('??', data); logger.info('#####################汇报:状态:', '机器人id:', app.id, '状态:', app.threadStatus); }).catch((error) => { logger.error(`请求遇到问题1: ${error.message}`); // 处理可能发生的错误 }); } // 上报-余额 function robotAmount(app) { var accUrl = "http://127.0.0.1:" + app.port //拿到策略余额 try { var config = getRustConfig() http.request_get(`${accUrl}/account`, {...config.headers}) .then((data) => { var d = JSON.parse(data) app.nowBalance = d.now_balance app.posNum = d.pos //余额有变动上报一次 // logger.info(`余额当前:${(app.nowBalance + "")}--${(d.now_balance + "")}--`); // if ((app.nowBalance + "") !== (d.now_balance + "")) { // http.request_post(`${config.reportedUrl}/report/amountReport`, { // "robotId": app.id, // "amount": d.now_balance // }, {...config.headers}) // .then((data2) => { // logger.info('上报响应', data2); // logger.info('#####################汇报:余额:pid:', app.childProcess.pid, '机器人id:', app.id, '机器人本地余额:', app.nowBalance, '实际余额:', d.now_balance); // app.nowBalance = d.now_balance // }).catch((error) => { // logger.error(`#####################汇报:余额上报错误: ${error.message}`); // 处理可能发生的错误 // }); // } }).catch((error) => { logger.error(`#####################汇报:余额接口请求失败: ${error.message}`); // 处理可能发生的错误 }); } catch (e) { logger.error('请求失败!:', e) } } // 定时清理停止状态的 机器人 function delRobot(app) { // const platform = process.platform; // if (platform !== 'win32' && app.threadStatus === RobotStatus.STOPPED) { // var pid = app.childProcess.pid // delApp(app.id) // } } //---------------------------------------------- module.exports = { run, closeApp, restartApp, delRobot, robotStatus, robotAmount, closeAppAll, searchPositions, appMap, getApp, getAppMap, RobotStatus };