|
- const file = require("./utils/file");
- const {getRustConfig} = require("./config");
- const {spawn, exec, fork, execSync} = require("child_process");
- const {logger} = require("./utils/logger");
- const http = require("./utils/http");
- const e = require("express");
- const {is, tr} = require("date-fns/locale");
- /*******
- * web
- * **** */
- const RobotStatus = Object.freeze({
- // 已停止的
- STOPPED: "STOPPED",
- // 正在停止
- STOP_PENDING: "STOP_PENDING",
- //正在运行
- RUNNING: "RUNNING",
- //启动中
- START_PENDING: "START_PENDING",
- //下载中
- DOWNLOADING: "DOWNLOADING",
- //错误
- ERROR: "ERROR"
- });
- let appMap = new Map();
- function getAppMap() {
- return appMap;
- }
- function getApp(key) {
- let app = {
- id: -1,
- port: -1,
- path: "",
- programName: "",
- strategyName: "",
- childProcess: undefined,
- nowBalance: -1,
- posNum: -1,
- messlist: [],
- predictorState:[],
- threadStatus: RobotStatus.STOPPED,
- errorMessage: "成功",
- threadStartTime: -1,
- status: 0,
- restartStatus: 0,
- closeTime: 0,
- isClearance: false,
- }
- // logger.info(appMap, appMap.has(key));
- if (appMap.has(key)) {
- app = appMap.get(key)
- } else {
- appMap.set(key, app)
- }
- // logger.info(app);
- return app
- }
- function delApp(key) {
- appMap.delete(key)
- }
- async function run(param) {
- return new Promise(async (resolve, reject) => {
- var key = param.id
- var appName = param.path
- let newAppName = "4l_as_" + appName;
- var programName = param.programName
- var strategyName = param.strategyName
- var app = getApp(key)
- var config = getRustConfig()
- //检查当前机器人id 对应的as 是否已经启动,为了防止重复启动
- if (app.status === 1) {
- logger.info(`防止重复启动!结束当前请求`)
- return resolve(false)
- }
- app.status = 1
- // 初始化机器人状态
- app.threadStatus = RobotStatus.START_PENDING
- app.id = key
- app.port = param.callPort
- app.path = param.path
- app.programName = param.programName
- app.strategyName = param.strategyName
- app.isClearance = false
- robotStatus(app)
- /****
- *** 第二步:路径经组装
- * 注意:可能存在一台服务器多个机器人,通过机器人ID创建文件夹区分,需要组装好路径
- ***/
- //系统不同 做不同的路径处理
- const platform = process.platform;
- let exeName = newAppName; //可执行程序
- let configName = "config.json"; //配置文件
- let appPath = "";
- //相对路径存放
- appPath = config.filePath + "/" + app.id + "/" + strategyName + "/" + programName
- //1. 检查目录
- file.checkPathSync(appPath);
- /****
- *** 第三步:rust 启动程序检查(下载更新)
- ***/
- var isDow = false
- var scheduleDow = 0
- var scheduleConfig = 0
- //2、 检查执行程序
- var destination = appPath + "/" + exeName
- if (!file.checkFilePath(destination)) {
- app.threadStatus = RobotStatus.DOWNLOADING
- robotStatus(app)
- isDow = true
- var dowHeaders = {...config.headers};
- if (platform === 'win32') {
- downloadFileWithPowerShell(
- config.baseUrl + config.dowAppURL + '/?path=' + appName,
- appPath + "/" + appName,
- dowHeaders, (err, b) => {
- if (err === null) {
- scheduleDow = 1
- } else {
- app.threadStatus = RobotStatus.ERROR
- app.errorMessage = '下载失败!'
- scheduleDow = -1
- robotStatus(app)
- }
- });
- } else {
- downloadFileWithLinux(
- config.baseUrl + config.dowAppURL + '/?path=' + appName,
- appPath + "/" + appName,
- dowHeaders, (err, b) => {
- if (err === null) {
- scheduleDow = 1
- } else {
- app.threadStatus = RobotStatus.ERROR
- app.errorMessage = '下载失败!'
- scheduleDow = -1
- robotStatus(app)
- }
- });
- }
- } else {
- scheduleDow = 1
- }
- /****
- *** 第四步:rust 启动配置检查(下载更新)
- ***/
- //2 为防止启动指令不同,每次重新写入
- var destination2 = appPath + "/" + configName
- var urrrl = config.baseUrl + config.dowConfigURL + '/?robotId=' + app.id
- var configHeaders = {...config.headers};
- http.request_get(urrrl, {...config.headers})
- .then((data) => {
- logger.info('配置参数:', data);
- const json = JSON.parse(data)
- // 处理成配置文件
- const map = json.data
- var json_obj = JSON.parse("{}")
- for (k in map) {
- json_obj[k] = map[k]
- // logger.info(map[k] + "\t")
- }
- logger.info("参数组装完成!")
- te = JSON.stringify(json_obj)
- file.writeFile(destination2, te, (errer, b) => {
- if (errer === null && b === true) {
- scheduleConfig = 1
- } else {
- logger.info("配置参数写入配置失败!", errer)
- app.threadStatus = RobotStatus.ERROR
- app.errorMessage = '配置参数写入配置失败!'
- scheduleConfig = -1
- robotStatus(app)
- }
- })
- })
- .catch((e) => {
- logger.info("配置参数获取失败", e)
- app.threadStatus = RobotStatus.ERROR
- app.errorMessage = '配置参数获取失败!'
- scheduleConfig = -1
- robotStatus(app)
- })
- //监听下载只有下载完成了才能继续
- while (true) {
- await delay(5000);
- let info_t = ""
- let info_t2 = ""
- //是否开启下载,如果是新下载,下载完成需要授权
- if (isDow) {
- if (scheduleDow === 1) {
- info_t = "启动文件:下载完成!"
- //文件下载好了,设置统一前缀
- execSync(`sudo mv ${appPath + "/" + appName} ${appPath + "/" + newAppName}`, (error, stdout, stderr) => {
- if (error) {
- logger.error(`文件重命名失败: ${error}`);
- }
- logger.info(`文件重命名完成!`);
- });
- //文件授权
- execSync(`chmod +x ${appPath + "/" + newAppName}`, (error, stdout, stderr) => {
- if (error) {
- logger.error(`启动文件:授权失败: ${error}`);
- }
- logger.info(`启动文件:授权完成!`);
- });
- } else if (scheduleDow === -1) {
- info_t = "启动文件:下载失败!"
- } else {
- info_t = "启动文件:还在下载..."
- }
- } else {
- info_t = "启动文件:完整无需下载"
- }
- if (scheduleConfig === 1) {
- info_t2 = "配置文件:读取成功!"
- } else if (scheduleConfig === -1) {
- info_t = "配置文件:读取失败!"
- } else {
- info_t2 = "配置文件:还在读取..."
- }
- logger.info(info_t, info_t2);
- if (scheduleDow === 1 && scheduleConfig === 1) {
- break
- } else if (scheduleConfig === -1 || scheduleDow === -1) {
- return resolve(false);
- }
- }
- app.threadStatus = RobotStatus.START_PENDING
- robotStatus(app)
- logger.info("开始启动程序!");
- //3. spawn启动
- const exePath = appPath + "/" + exeName
- const configPath = appPath + "/" + configName
- logger.info(`文件地址:${exePath}-----${configPath}`);
- const command = exePath
- const args = ['--config=' + configPath, '--port=' + app.port]
- app.childProcess = spawn(command, args)
- app.threadStartTime = new Date().getTime()
- /**********监听*********/
- app.childProcess.stdout.on('data', (msg) => {
- // logger.info('stdout:' + msg.toString())
- })
- app.childProcess.on('message', (msg) => {
- // logger.info(`message: ${msg}`);
- });
- app.childProcess.on('error', (err) => {
- logger.error('子程序-异常:', err);
- app.threadStatus = RobotStatus.ERROR
- app.errorMessage = `子线程发生异常!! ${err}`
- robotStatus(app)
- app.childProcess = undefined
- app.status = 0
- app.closeTime = new Date().getTime()
- });
- app.childProcess.on('exit', (code, signal) => {
- logger.info(`子进程退出-exit,退出码: ${code}, 信号: ${signal}`);
- app.threadStatus = RobotStatus.STOPPED
- robotStatus(app)
- app.childProcess = undefined
- app.status = 0
- app.closeTime = new Date().getTime()
- });
- app.childProcess.on('close', (code) => {
- logger.info(`子进程退出-close,退出码 ${code}`);
- app.threadStatus = RobotStatus.STOPPED
- robotStatus(app)
- app.childProcess = undefined
- app.status = 0
- app.closeTime = new Date().getTime()
- });
- app.threadStatus = RobotStatus.RUNNING
- robotStatus(app)
- return resolve(true)
- })
- }
- function delay(ms) {
- return new Promise(resolve => setTimeout(resolve, ms));
- }
- async function closeApp(param) {
- return new Promise((resolve, reject) => {
- var key = param.id
- var app = getApp(key)
- logger.info(` 信号: `, app.threadStatus);
- /*******新的删除方式*************/
- //文件授权
- if (app.childProcess !== undefined) {
- var pid = app.childProcess.pid
- exec(`sudo kill ${pid}`, (error, stdout, stderr) => {
- if (error) {
- logger.error(`进程${pid} 杀死失败: ${error}`);
- }
- logger.info(`进程${pid} 杀死成功`);
- logger.info(`当前app:`, app);
- app.threadStatus = RobotStatus.STOP_PENDING
- app.isClearance = false
- robotStatus(app)
- });
- }
- return resolve(true)
- })
- /*******新的删除方式*************/
- }
- //重启 RESTART
- async function restartApp(param) {
- var key = param.id
- var app = getApp(key)
- // var restartStatus = app.restartStatus
- // if (restartStatus !== 0) {
- // logger.info("防止重启指令 重复发送~!")
- // return
- // }
- // app.restartStatus = 1
- logger.info("--开始重启!")
- logger.info('当前app', app.id, app.threadStatus);
- const closeResult = await closeApp(param)
- // logger.info('?', JSON.stringify(closeResult))
- while (true) {
- await delay(1000)
- const runResult = await run(param)
- // logger.info('??', JSON.stringify(runResult))
- if (runResult) break
- }
- // app.restartStatus = 0
- logger.info(`重启完成!!!!!`);
- }
- //启动- 检查程序
- async function searchPositions(param) {
- return new Promise(async (resolve, reject) => {
- // {"id":"375","callPort":"1111","path":"80654e330ed67df2880231a67925e984as-rust","programName":"v3.3.2_clear","strategyName":"as","checkId":"467"}
- var key = param.id
- var appName = param.path
- var r_id = param.id
- let newAppName = "4l_as_" + appName;
- let configName = "config.json"; //配置文件
- var programName = param.programName
- var strategyName = param.strategyName
- var config = getRustConfig()
- var app = getApp(key)
- if (app.port === -1) {
- } else {
- }
- //相对路径存放
- var appPath = config.filePath + "/" + param.id + "/" + strategyName + "/" + programName
- const asName = appPath + "/" + newAppName
- const configPath = appPath + "/" + configName
- const appPort = param.callPort
- const command = `sudo ${asName} --config=${configPath} --port=${appPort} --run_mode=1 --r_id=${r_id}`
- exec(command, (error, stdout, stderr) => {
- if (error) {
- logger.error(`命令执行失败~: ${command} 错误:${error}`);
- }
- logger.info(`检查指令发送成功!`);
- logger.info(command);
- });
- return resolve(true)
- })
- }
- async function closeAppAll() {
- return new Promise(async (resolve, reject) => {
- var appMap = getAppMap()
- // console.log(appMap.size); // 输出Map的大小
- // appMap.forEach((value, key) => {
- // console.log(key, value); // 输出Map的键值对
- // // logger.info("???", key, value)
- // var app = value
- // logger.info(` 信号: `, app.threadStatus);
- // /*******新的删除方式*************/
- // //文件授权
- // if (app.childProcess !== undefined) {
- // var pid = app.childProcess.pid
- // exec(`sudo kill ${pid}`, (error, stdout, stderr) => {
- // if (error) {
- // logger.error(`进程${pid} 杀死失败: ${error}`);
- // }
- // logger.info(`进程${pid} 杀死成功`);
- // // logger.info(`当前app:`, app);
- // app.threadStatus = RobotStatus.STOP_PENDING
- // robotStatus(app)
- // });
- // }
- // });
- exec(`sudo pkill 4l_as`, (error, stdout, stderr) => {
- if (error) {
- logger.error(`4l_as杀死失败: ${error}`);
- }
- logger.info(`4l_as 杀死成功`);
- });
- while (true) {
- var z = 0;
- await delay(1000)
- var str = "";
- appMap.forEach((value, key) => {
- str += "机器人:" + key + ",当前状态:" + value.threadStatus + "\t"
- if (value.threadStatus !== RobotStatus.STOPPED) {
- z += 1
- }
- });
- logger.info(str)
- if (z === 0) {
- logger.info(`策略全部关闭~~开始关闭node`);
- break
- } else {
- logger.info(`等待关闭~~策略!`);
- }
- }
- //为了确保as
- // while (true) {
- // execSync(`sudo kill $(pgrep -f "^.*\/4l_as_")`, (error, stdout, stderr) => {
- // if (error) {
- // logger.error(`全杀-进程执行失败: ${error}`);
- // }
- // logger.info(`全杀-进程执行成功`);
- // break
- // });
- // }
- return resolve(true)
- })
- /*******新的删除方式*************/
- }
- // 下载执行程序,使用的实行服务器指令的方式进行下载
- function downloadFileWithPowerShell(url, destination, headers, funBreak) {
- const headersString = Object.entries(headers)
- .map(([key, value]) => `'${key}'='${value}'`) // 使用单引号来确保特殊字符不被解析
- .join('; '); // 使用分号和空格分隔每个键值对
- const command = `powershell -command "Invoke-WebRequest -Uri '${url}' -OutFile '${destination}' -Headers @{${headersString}}"`;
- exec(command, (error, stdout, stderr) => {
- if (error) {
- logger.error(`下载出错: ${error}`);
- return funBreak(error);
- }
- logger.info(`下载完成!`);
- funBreak(null, true); // 通知下载成功,修改了B为true
- });
- }
- function downloadFileWithLinux(url, destination, headers, funBreak) {
- const headersString = Object.entries(headers)
- .map(([key, value]) => `-H '${key}: ${value}'`) // 设置curl的HTTP头参数
- .join(' '); // 使用空格分隔每个头参数
- const command = `curl ${headersString} '${url}' -o '${destination}'`;
- exec(command, (error, stdout, stderr) => {
- if (error) {
- logger.error(`下载出错: ${error}`);
- return funBreak(error);
- }
- logger.info(`下载完成!`);
- funBreak(null, true); // 通知下载成功,修改了B为true
- });
- }
- // 上报-机器人状态
- function robotStatus(app) {
- var config = getRustConfig()
- var msg = (app.threadStatus !== RobotStatus.ERROR ? "完成" : app.errorMessage)
- http.request_post(`${config.reportedUrl}/report/statusReport`, {
- "robotId": app.id,
- "status": app.threadStatus,
- "msg": msg
- }, {...config.headers}).then((data) => {
- // logger.info('??', data);
- logger.info('#####################汇报:状态:', '机器人id:', app.id, '状态:', app.threadStatus);
- }).catch((error) => {
- logger.error(`请求遇到问题1: ${error.message}`); // 处理可能发生的错误
- });
- }
- // 上报-余额
- function robotAmount(app) {
- var accUrl = "http://127.0.0.1:" + app.port
- //拿到策略余额
- try {
- var config = getRustConfig()
- http.request_get(`${accUrl}/account`, {...config.headers})
- .then((data) => {
- var d = JSON.parse(data)
- app.nowBalance = d.now_balance
- app.posNum = d.pos
- //余额有变动上报一次
- // logger.info(`余额当前:${(app.nowBalance + "")}--${(d.now_balance + "")}--`);
- // if ((app.nowBalance + "") !== (d.now_balance + "")) {
- // http.request_post(`${config.reportedUrl}/report/amountReport`, {
- // "robotId": app.id,
- // "amount": d.now_balance
- // }, {...config.headers})
- // .then((data2) => {
- // logger.info('上报响应', data2);
- // logger.info('#####################汇报:余额:pid:', app.childProcess.pid, '机器人id:', app.id, '机器人本地余额:', app.nowBalance, '实际余额:', d.now_balance);
- // app.nowBalance = d.now_balance
- // }).catch((error) => {
- // logger.error(`#####################汇报:余额上报错误: ${error.message}`); // 处理可能发生的错误
- // });
- // }
- }).catch((error) => {
- logger.error(`#####################汇报:余额接口请求失败: ${error.message}`); // 处理可能发生的错误
- });
- } catch (e) {
- logger.error('请求失败!:', e)
- }
- }
- // 定时清理停止状态的 机器人
- function delRobot(app) {
- // const platform = process.platform;
- // if (platform !== 'win32' && app.threadStatus === RobotStatus.STOPPED) {
- // var pid = app.childProcess.pid
- // delApp(app.id)
- // }
- }
- //----------------------------------------------
- module.exports = {
- run,
- closeApp,
- restartApp,
- delRobot,
- robotStatus,
- robotAmount,
- closeAppAll,
- searchPositions,
- appMap,
- getApp,
- getAppMap,
- RobotStatus
- };
|