const fs = require('fs'); const path = require('path'); const {logger} = require("./logger"); const {rustConfig} = require("../config"); const http = require("./http"); const readline = require('readline'); // 检查目录是否存在,不存在新建 function checkPathSync(filePath) { // 将相对路径转换为绝对路径 const directoryPath = path.resolve(filePath); // 同步检查目录是否存在 if (!fs.existsSync(directoryPath)) { // 目录不存在,需要创建它 logger.info('目录不存在,正在创建...'); try { // 同步创建目录 fs.mkdirSync(directoryPath, {recursive: true}); logger.info('目录已成功创建'); } catch (error) { // 创建目录失败 logger.error('创建目录失败:', error); return false } } else { // 目录已存在 // logger.info('目录已存在'); } return true } // 检查文件是否存在 function checkFilePath(filePath) { // 同步检查文件是否存在 const directoryPath = path.resolve(filePath); if (fs.existsSync(directoryPath)) { logger.info('文件存在:' + filePath); return true } else { logger.error('文件不存在:' + filePath); return false } } // 创建文件 function writeFile(filePath, text, breakFun) { fs.writeFile(filePath, text, (err) => { if (err) { logger.error('写入文件时发生错误:', err); breakFun(err) } else { logger.log('文件已成功写入!'); breakFun(null, true) } }); } // 同步复制 function copyFileSync(source, destination) { try { // 同步复制文件 fs.copyFileSync(source, destination); // logger.info("成功"); return true; } catch (err) { return false; // logger.info("失败"); } } // // 读取日志(倒叙)指定行数 // function readLastNLines(filePath, n) { // const content = fs.readFileSync(filePath, 'utf8'); // const lines = content.trim().split('\n'); // const lastNLines = lines.slice(-n).join('\n'); // // // 此处处理或输出最后n行的内容 // // console.log(lastNLines); // return lastNLines.split("\n").reverse(); // } // 下载文件 function dowFile(url, fileName, dowPath, headers) { return http.dowFile(url, fileName, dowPath, headers) } // 删除文件 function delFile() { } /***********************************/ function getLastFile(dirPath, number, callback) { fs.readdir(dirPath, (err, files) => { // logger.info(`---------------------4`) if (err) { logger.error(`无法列出目录。`, err); // process.exit(1); } else { let fileList = []; //提示日志默认是根据日志生成的,拿到的files 也是 按照生成日志排序的,所以 // 按照倒叙获取,如果一个文件内容满足num,就不在继续查询,如果不够,继续获取文件 for (file of files) { const filePath = path.join(dirPath, file); // logger.info("所有文件:",filePath) fs.promises.stat(filePath).then((stats) =>{ if (stats.isFile()) { let tt = stats.birthtime.getTime() fileList.push({name: file, time: tt}); logger.info("----------------所有文件:" + filePath + "时间:" + tt) } }); } for (a of fileList) { const filePath = path.join(dirPath, a.name); logger.info("复查:" + filePath + "\ttime:" + a.time) } fileList.sort((a, b) => b.time - a.time); for (a of fileList) { const filePath = path.join(dirPath, a.name); logger.info("排序:" + filePath + "\ttime:" + a.time) } let lastFileList = fileList.slice(0, number != -1 ? number : fileList.length); let lastFileNameList = lastFileList.map((item) => item.name); callback(lastFileNameList.reverse(), lastFileList.reverse()); } }); } // 读取日志(倒叙)指定行数 async function readLastNLines(dirPath, filePathList, n) { try { const fileList = []; var count = 0; for (var i = filePathList.length - 1; i >= 0; i--) { var item = filePathList[i]; const filepath = `${dirPath}/${item}`; logger.info("日志文件:" + filepath) //老版本 加载整个文件 // const content = fs.readFileSync(`${filepath}`, "utf8"); // let lines = content.trim().split("\n"); //新版本:流式读取和逐行处理 const lines = await readLinesFromEnd(filepath, n - count); // logger.info("日:" + lines) fileList.push({ filePath: item, lines: lines, }) count += lines.length; if (count >= n) { break } } fileList.reverse(); const allFile = [].concat(...fileList.map((item) => item.lines)); const lastNLines = allFile.slice(-n).reverse(); return lastNLines; } catch (e) { logger.info('读取文件发生异常咯~~2', e); } } async function getRecentLogs(dirPath, logFiles, requiredLogs = 100) { let logs = []; try { let logsSize = requiredLogs for (const file of logFiles) { let log = []; // 创建文件流 const filepath = `${dirPath}/${file}`; const fileStream = fs.createReadStream(filepath); const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); for await (const line of rl) { log.push(line); } for (const l of log.reverse()) { logs.push(l); if (logs.length >= requiredLogs) { break } } } } catch (e) { logger.info('读取文件发生异常咯~~~', e.message); } return logs; // 如果日志不够500条,返回现有日志 } async function getLatestLogEntries(dirPath, logFiles, requiredLogs = 100) { const MAX_LOGS = requiredLogs; let logs = []; try { for (const file of logFiles) { // 创建文件流 const filepath = `${dirPath}/${file}`; logger.info('文件~~~', filepath); // const readStream = fs.createReadStream(filepath, { highWaterMark: 2 * 1024 * 1024 }); // 每次读取10MB // let buffer = ''; // // logs = await new Promise((resolve,reject) =>{ // // 逐块读取并处理数据 // readStream.on('data', (chunk) => { // buffer += chunk.toString(); // 将块数据转成字符串 // const lines = buffer.split('\n'); // 按行拆分 // // // 处理新行 // for (let i = 0; i < lines.length - 1; i++) { // logs.unshift(lines[i]); // 添加到数组前面以实现倒序 // // logs.push(lines[i]); // 添加到数组前面以实现倒序 // logger.info( lines[i]); // // if (logs.length >= MAX_LOGS) break; // 达到所需条数则停止 // } // // // 保留未处理的最后一行(如果是半行,继续留在缓冲区) // buffer = lines[lines.length - 1]; // }); // // // 读取结束后处理剩余缓冲区 // readStream.on('end', () => { // if (buffer) { // logs.unshift(buffer); // 加入最后的未处理行 // } // resolve(logs); // 完成时解析 Promise // }); // // readStream.on('error', (err) => { // reject(err); // 如果出错,拒绝 Promise // }); // // // 确保在获取到足够记录时停止读取 // readStream.on('close', () => { // if (logs.length >= MAX_LOGS) { // readStream.destroy(); // 如果达到了最大记录数,停止读取流 // }else{ // logger.info('----继续读取~~',logs.length); // } // }); // }) // // logger.info( logs); // // 如果已达到所需行数,停止继续读取文件 // if (logs.length >= MAX_LOGS) { // break; // 退出循环 // } // 文件信息 const stats = fs.statSync(filepath); // 获取文件信息 let position = stats.size; // 从文件末尾开始 const chunkSize = 1 * 1024 * 1024; // 每次读取1MB // 文件读取方式 while (position > 0 && logs.length < MAX_LOGS) { const readSize = Math.min(chunkSize, position); // 确保不读取超过文件的大小 const readStream = fs.createReadStream(filepath, { start: position - readSize, // 从当前位置向上读取 end: position - 1, // 读取到当前位置 highWaterMark: readSize // 一次性读取指定大小 }); // 缓存 let buffer = ''; await new Promise((resolve, reject) => { readStream.on('data', (chunk) => { buffer = chunk.toString() + buffer; // 新数据放在前面 }); readStream.on('end', () => { // 按行处理数据 const lines = buffer.split('\n'); while (lines.length > 0) { const line = lines.pop(); // 取出最新的一行 if (line) { logs.unshift(line); // 添加到日志数组开头 // logger.info(line); if (logs.length >= requiredLogs) break; // 达到要求的条数 } } resolve(); }); readStream.on('error', (err) => { console.error('读取文件发生错误:', err); reject(err); // 确保不会因错误而卡住 }); readStream.on('close', () => { if (logs.length >= MAX_LOGS) { readStream.destroy(); // 如果达到了最大记录数,停止读取流 } else { logger.info('----继续读取~~', logs.length); } }); }); position -= readSize; // 递减位置 } // 如果已达到所需行数,停止继续读取文件 if (logs.length >= MAX_LOGS) { break; // 退出循环 } } } catch (e) { logger.info('读取文件发生异常咯~~~', e.message); } // logger.info( logs); return logs.reverse(); // 如果日志不够500条,返回现有日志 } async function readLinesFromEnd(filePath, maxLines) { const lines = []; const fileSize = fs.statSync(filePath).size; const chunkSize = 1024 * 10 * 5; // 每次读取的块大小 try { if (fileSize < chunkSize) { const fileStream = fs.createReadStream(filePath, { start: Math.max(0, 0), end: fileSize }); let buffer = ''; for await (const chunk of fileStream) { buffer = chunk + buffer; let lineEndIndex = buffer.length; while (lineEndIndex >= 0 && lines.length < maxLines) { const lineStartIndex = buffer.lastIndexOf('\n', lineEndIndex - 1); if (lineStartIndex === -1) { break; } const line = buffer.substring(lineStartIndex + 1, lineEndIndex).trim(); if (line) { lines.push(line); } lineEndIndex = lineStartIndex; } if (lines.length >= maxLines) { break; } } } else { let forNum = (fileSize / chunkSize) + 1; for (let i = 1; i <= forNum; i++) { let position = fileSize - chunkSize * i; let z = position + chunkSize const fileStream = fs.createReadStream(filePath, { start: Math.max(position, 0), end: z }); let buffer = ''; for await (const chunk of fileStream) { buffer = chunk + buffer; let lineEndIndex = buffer.length; while (lineEndIndex >= 0 && lines.length < maxLines) { const lineStartIndex = buffer.lastIndexOf('\n', lineEndIndex - 1); if (lineStartIndex === -1) { break; } const line = buffer.substring(lineStartIndex + 1, lineEndIndex).trim(); if (line) { lines.push(line); } lineEndIndex = lineStartIndex; } if (lines.length >= maxLines) { break; } position -= chunkSize; if (position < 0) { position = 0; } } } } } catch (e) { logger.info('读取文件发生异常咯~~~1', e.message); } return lines.reverse(); } async function readLines(filePath, maxLines) { const fileStream = fs.createReadStream(filePath); const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); const lines = []; for await (const line of rl) { lines.push(line); if (lines.length >= maxLines) { break; } } rl.close(); fileStream.close(); return lines; } module.exports = { dowFile, checkFilePath, checkPathSync, writeFile, copyFileSync, readLastNLines, getLatestLogEntries, getRecentLogs, getLastFile }