import os import sys import json import stat import time import socket import shutil import base64 import linecache #读取日志文件 import requests import threading #多线程 import subprocess import traceback #输出报错 """ 接受Web指令,上传日志 """ def printf(*args): print('[-]', time.strftime("%H:%M:%S", time.localtime()),'',*args) """ 判断端口是否开启""" def is_port_open(ip, port): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: s.connect((ip, int(port))) s.shutdown(socket.SHUT_RDWR) return True except Exception as e: pass return False """ 放入缓存防止内存过载""" def get_line_count(filename): count = 0 with open(filename, 'r') as f: while True: buffer = f.read(1024 * 1) if not buffer: break count += buffer.count('\n') return count """ 每隔N秒实现""" Log_time = {} def tlog(data, xsTime, set=0, look=0): global Log_Time if data not in Log_time.keys(): Log_time[data] = 0 nowTime = int(time.time()) if nowTime - Log_time[data] >= xsTime: if not look: Log_time[data] = nowTime return 1 elif set: Log_time[data] = nowTime return 0 ''' 获取本地ip''' def get_local_ip_list(): import netifaces as ni ipList = [] # print('检测服务器网络配置') for dev in ni.interfaces(): print('dev:',dev) if 'ens' in dev or 'eth' in dev or 'enp' in dev: # print(ni.ifaddresses(dev)) for i in ni.ifaddresses(dev)[2]: ip=i['addr'] print(f"检测到私有ip:{ip}") if ip not in ipList: ipList.append(ip) print(f"当前服务器私有ip为{ipList}") return ipList """文件写入内容""" def write(filePath, data): if "/" in filePath or "\\" in filePath: dirPath = os.path.dirname(filePath) if not os.path.exists(dirPath): os.makedirs(dirPath) try: f = open(filePath, 'wb+') if ".pyc" in filePath else open(filePath, 'w+', encoding='utf-8') f.write(data) f.close() except: print("写入失败", filePath, data) """读取文件内容""" def read(filename): if not os.path.exists(filename): return 0 f = open(filename, 'r',encoding='utf-8') if f: x = f.read() f.close() return x else: return False class api(): def __init__(self): self.url = "https://hhh.liangjiang.cc/api/as" self.url2 = "http://208.87.206.108:1999/sapp99asd8asvc4" self.url3 = "https://hhh.liangjiang.cc/api/log/addError" self.rTime = int(time.time()) #启动时间 self.debug = 0 self.pop = {} self.upTime = 10 #多少秒推送一次盈利状态 self.downStatus = 'no' #锁,只能同时进行一个 """请求URL""" def getUrl(self, url, post=0, chongshi=0): try: if post: post['rTime'] = self.rTime # print("POST: ", url.replace(self.url, '').replace(self.url2, '').replace(self.url3, '')) r = requests.post(url,data=post,timeout=30) else: print("GET: ",url.replace(self.url, '')) r = requests.get(url,timeout=30) r.encoding = 'utf-8' return json.loads(r.text) except Exception: if self.debug: traceback.print_exc() print(url, post) try: print(r.text) log = "Url:"+url+"\n\nPost:" if post: log += str(post) else: log += "False" log += "\n\n"+r.text path = "../error/Api_"+time.strftime("%H-%M-%S", time.localtime())+".html" print('Error:', path) write(path, log) except: traceback.print_exc() time.sleep(10) if not chongshi: return self.getUrl(url,post,1) return False """ 下载策略文件,20mb""" def downLoad(self, id, name): for i in range(5 * 60): if self.downStatus == 'down': self.upRobot(id, 5) time.sleep(1) else: break else: self.upRobot(id, 6) return 0 status = 0 self.downStatus = 'down' try: path = f"./{name}" url = f"{self.url2}/{name}" fileSize = 0 try: fileSize = os.path.getsize(path) except: fileSize = 0 preFileSize = read(path+'.size') if preFileSize and int(fileSize) == int(preFileSize): status = 1 print(path, '策略文件已存在,跳过下载...') else: self.upRobot(id, 4) with requests.get(url = url, stream=True) as fget: #get方法 # 此时只有响应头被下载 WebFileSize = int(fget.headers["Content-Length"]) write(path+'.size', str(WebFileSize)) if int(fileSize) != WebFileSize: print('-' * 32) print(f"文件名: {name}") print(f"大小: {WebFileSize/(1000**2)}Mb") print('-' * 32) count = 0 count_tmp = 0 time1 = time.time() with open(path, "wb") as fw: for chunk in fget.iter_content(chunk_size = 512): fw.write(chunk) count += len(chunk) if count >= WebFileSize: print("\n下载完成~") if time.time() - time1 > 1: p = count / WebFileSize * 100 speed = (count - count_tmp) / 1024 / 1024 / 2 count_tmp = count p2 = '{:.2f}'.format(p) speed2 = '{:.2f}'.format(speed) print(f'下载进度: ' + p2 + '%' + ' 速度: ' + speed2 + 'M/S', end='\r') time1 = time.time() status = 1 else: status = 1 os.chmod(path, stat.S_IRWXU) except Exception as e: self.upError(traceback.format_exc()) self.downStatus = 'no' if status: self.upRobot(id, 7) else: self.upRobot(id, 6) return status """ 删除Py文件""" def delPy(self): return dirName = "./aspy/" try: if os.path.exists(dirName): shutil.rmtree(dirName) printf("删除文件成功") except Exception: printf("删除文件失败 文件被占用") """ 获取任务""" def getTasks(self, post=''): r = self.getUrl(self.url, {'data': base64.b64encode(post.encode())}) try: printf(r['msg']) return r except: print(r) """ 更新任务状态""" def upTask(self, id, status, name, msg): if id: r = self.getUrl(self.url+"/upTask", {'id': id, 'name': name, 'status': status, 'msg': msg}) try: printf(r['msg']) return r except: print(r) """ 更新机器人运行状态""" def upRobot(self, id, status, data=''): """ 0 停止 -1 错误 1 运行中 2 启动中 3 停机中 4 下载中 5 等待其他下载 6 下载失败 7 下载完成 """ if not id: return if data: data = base64.b64encode(json.dumps(data).encode()) status = 1 r = self.getUrl(self.url+"/upRobot", {'id': id, 'status': status, 'data': data}) try: printf(r['msg']) return r except: print(r) """ 运行机器人""" def runRobot(self, data): try: port = data['port'] printf(data['name'], data['port'], data['id'], '准备运行机器人') if not tlog(f"{port}启动", 40): return self.upTask(data['task_id'], 1, data['name'], '40s禁止重复启动') if self.status(port): return self.upTask(data['task_id'], 1, data['name'], '已经在运行') if len(str(data['ip2'])) > 1: ip_i = 0 iplist = get_local_ip_list() for k in iplist: if k == data['ip2']: print(data['port'], "找到了IP", ip_i, k) data['can'] += "ip = "+str(ip_i) break ip_i += 1 else: return self.upTask(data['task_id'], 1, data['name'], '未找到此IP '+data['ip2']) else: data['can'] += "ip = 0" config = "./config/"+str(data['id'])+".toml" pypath = './'+data['ver'] """ 下载策略""" self.downLoad(data['id'], data['ver']) """ 写入配置文件""" write(config, data['can']) """ 执行进程""" print(pypath, '--config=', config) self.pop[port] = subprocess.Popen([pypath, f'--config={config}']) self.upRobot(data['id'], 2) self.upTask(data['task_id'], 1, data['name'], '运行成功') except Exception as e: api.upError(traceback.format_exc()) self.upRobot(data['id'], -1) """ 终止机器人""" def stopRobot(self, data): port = data['port'] printf(data['name'], data['port'], data['id'], '准备停止机器人') if not tlog(f"{data['port']}停止", 20): self.upTask(data['task_id'], 1, data['name'], '20s禁止重复停止') return 1 if not self.status(port): self.upTask(data['task_id'], 1, data['name'], '重复停止') return 1 try: r = requests.post(f"http://127.0.0.1:{port}/exit", data='{"stop": true}', timeout=1) print(port, r.text) if "退出" in r.text: self.upRobot(data['id'], 3) self.upTask(data['task_id'], -1, data['name'], '停机成功') if port in self.pop: try: print(port, "Stop Wait 20s...") self.pop[port].wait(20) except: traceback.print_exc() try: print(port, "Kill") self.pop[port].kill() except: traceback.print_exc() del self.pop[port] return 1 else: self.upError(f"停机{data['port']}失败:"+data) except Exception as e: pass self.upTask(data['task_id'], -1, data['name'], '停止失败') return 0 """ 重启机器人""" def rebootRebot(self, data): try: port = data['port'] self.stopRobot(data) """ 等待退出""" for i in range(40 * 5): if not self.status(data['port']): break print(port, "rebot 等待机器人退出") time.sleep(0.5) continue self.runRobot(data) except Exception as e: self.upError(traceback.format_exc()) """ 停止所有机器人并且重启脚本""" def rebotAs(self, data): ports = list(self.pop.keys()) if not ports: self.upTask(data['task_id'], 1, data['name'], '无机器人') for port in ports: data['port'] = port t = threading.Thread(target=api.stopRobot, args=(data, )) t.daemon = True t.start() """ 等待退出""" for i in range(40 * 5): for port in ports: if self.status(port): print(port, "rebotAs等待机器人退出") time.sleep(0.5) continue break exit(0) """ 上传错误日志""" def upError(self, error): print(error) print("----------") filePath = '../error/'+time.strftime("%Y-%m-%d_%H_%M_%S", time.localtime())+'.txt' write(filePath, error) self.getUrl(self.url3+'?key=d64a8sc874sa8c4as5', {'serverName': 'As', 'data': base64.b64encode(error.encode())}) time.sleep(5) """ 获取机器人运行状态,并且上传状态""" def status(self, port, upload=0): status = 0 try: if is_port_open(f"127.0.0.1", port): r = requests.get(f"http://127.0.0.1:{port}/account", timeout=1) status = r.text except Exception as e: pass # 运行过,但是停机了 if port in self.pop and not status and tlog(f"{port}启动", 40, look=1): try: print(port, "Kill") self.pop[port].kill() except: traceback.print_exc() del self.pop[port] self.upRobot(upload, -1) #更新状态为出错 if status: printf(port, '机器人已在运行', status) else: printf(port, '机器人没在运行') return status """ 读取log""" def getLog(self, port): try: a_dir = "./" file = 0 cTime = 0 for listname in os.listdir(a_dir): cur_path = os.path.join(a_dir, listname) #组合新的文件名 if f"logs{port}" == listname: for listname2 in os.listdir(cur_path): cur_path2 = os.path.join(cur_path, listname2) #组合新的文件名 #找出最新修改的文件 obj = os.stat(cur_path2).st_mtime if obj > cTime: cTime = obj file = cur_path2 data = "" if file: n = 200 linecache.clearcache() line_count = get_line_count(file) line_count = line_count - n - 1 for i in range(n): last_line = linecache.getline(file, line_count) data = last_line+data line_count += 1 else: data = "未找到文件" return data except Exception as e: api.upError(traceback.format_exc()) return "读取出错" if __name__=='__main__': api = api() try: os.remove("./as.py") except: pass try: os.remove("./as.pyc") except: pass while 1: try: tasks = api.getTasks() if tasks and tasks['status']: tasks = tasks['data'] for data in tasks: #放在后面执行 if data['name'] == 'look': continue #启动机器人 if data['name'] == 'run': t = threading.Thread(target=api.runRobot, args=(data, )) t.daemon = True t.start() if data['name'] == 'stop': t = threading.Thread(target=api.stopRobot, args=(data, )) t.daemon = True t.start() if data['name'] == 'reboot' and tlog(f"{data['port']}重启", 40): t = threading.Thread(target=api.rebootRebot, args=(data, )) t.daemon = True t.start() if data['name'] == 'rebootAs': api.rebotAs(data) up = [] for data in tasks: if data['name'] == 'look': log = api.getLog(data['port']) if data['log'] else 0 status = api.status(data['port'], data['id']) if status: up.append([data['id'], 1, status, log]) else: up.append([data['id'], 0, '', log]) if up: api.upRobot(1, 1, up) #上传盈利状态 time.sleep(5) #1s 获取一次任务 if tlog("锁定", 30, look=1): #锁定30s api.delPy() except Exception as e: api.upError(traceback.format_exc())