@@ -0,0 +1,593 @@
+import os
+import sys
+import json
+import stat
+import time
+import socket
+import shutil
+import base64
+import linecache #读取日志文件
+import requests
+import threading #多线程
+import subprocess
+import traceback #输出报错
+def printf(*args):
+ print('[-]', time.strftime("%H:%M:%S", time.localtime()),'',*args)
+""" 判断端口是否开启"""
+def is_port_open(ip, port):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ s.connect((ip, int(port)))
+ s.shutdown(socket.SHUT_RDWR)
+ return True
+ except Exception as e:
+ pass
+ return False
+""" 放入缓存防止内存过载"""
+def get_line_count(filename):
+ count = 0
+ with open(filename, 'r') as f:
+ while True:
+ buffer = f.read(1024 * 1)
+ if not buffer:
+ break
+ count += buffer.count('\n')
+ return count
+""" 每隔N秒实现"""
+Log_time = {}
+def tlog(data, xsTime, set=0, look=0):
+ global Log_Time
+ if data not in Log_time.keys():
+ Log_time[data] = 0
+ nowTime = int(time.time())
+ if nowTime - Log_time[data] >= xsTime:
+ if not look:
+ Log_time[data] = nowTime
+ return 1
+ elif set:
+ Log_time[data] = nowTime
+ return 0
+''' 获取本地ip'''
+def get_local_ip_list():
+ import netifaces as ni
+ ipList = []
+ # print('检测服务器网络配置')
+ for dev in ni.interfaces():
+ print('dev:',dev)
+ if 'ens' in dev or 'eth' in dev or 'enp' in dev:
+ # print(ni.ifaddresses(dev))
+ for i in ni.ifaddresses(dev)[2]:
+ ip=i['addr']
+ print(f"检测到私有ip:{ip}")
+ if ip not in ipList:
+ ipList.append(ip)
+ print(f"当前服务器私有ip为{ipList}")
+ return ipList
+def write(filePath, data):
+ if "/" in filePath or "\\" in filePath:
+ dirPath = os.path.dirname(filePath)
+ if not os.path.exists(dirPath):
+ os.makedirs(dirPath)
+ try:
+ f = open(filePath, 'wb+') if ".pyc" in filePath else open(filePath, 'w+', encoding='utf-8')
+ f.write(data)
+ f.close()
+ except:
+ print("写入失败", filePath, data)
+def read(filename):
+ if not os.path.exists(filename):
+ return 0
+ f = open(filename, 'r',encoding='utf-8')
+ if f:
+ x = f.read()
+ f.close()
+ return x
+ else:
+ return False
+class api():
+ def __init__(self):
+ self.url = "https://hhh.liangjiang.cc/api/as"
+ self.url2 = ""
+ self.url3 = "https://hhh.liangjiang.cc/api/log/addError"
+ self.rTime = int(time.time()) #启动时间
+ self.debug = 0
+ self.pop = {}
+ self.upTime = 10 #多少秒推送一次盈利状态
+ self.downStatus = 'no' #锁,只能同时进行一个
+ """请求URL"""
+ def getUrl(self, url, post=0, chongshi=0):
+ try:
+ if post:
+ post['rTime'] = self.rTime
+ # print("POST: ", url.replace(self.url, '').replace(self.url2, '').replace(self.url3, ''))
+ r = requests.post(url,data=post,timeout=30)
+ else:
+ print("GET: ",url.replace(self.url, ''))
+ r = requests.get(url,timeout=30)
+ r.encoding = 'utf-8'
+ return json.loads(r.text)
+ except Exception:
+ if self.debug:
+ traceback.print_exc()
+ print(url, post)
+ try:
+ print(r.text)
+ log = "Url:"+url+"\n\nPost:"
+ if post:
+ log += str(post)
+ else:
+ log += "False"
+ log += "\n\n"+r.text
+ path = "../error/Api_"+time.strftime("%H-%M-%S", time.localtime())+".html"
+ print('Error:', path)
+ write(path, log)
+ except:
+ traceback.print_exc()
+ time.sleep(10)
+ if not chongshi:
+ return self.getUrl(url,post,1)
+ return False
+ """ 下载策略文件,20mb"""
+ def downLoad(self, id, name):
+ for i in range(5 * 60):
+ if self.downStatus == 'down':
+ self.upRobot(id, 5)
+ time.sleep(1)
+ else:
+ break
+ else:
+ self.upRobot(id, 6)
+ return 0
+ status = 0
+ self.downStatus = 'down'
+ try:
+ path = f"./{name}"
+ url = f"{self.url2}/{name}"
+ fileSize = 0
+ try:
+ fileSize = os.path.getsize(path)
+ except:
+ fileSize = 0
+ preFileSize = read(path+'.size')
+ if preFileSize and int(fileSize) == int(preFileSize):
+ status = 1
+ print(path, '策略文件已存在,跳过下载...')
+ else:
+ self.upRobot(id, 4)
+ with requests.get(url = url, stream=True) as fget: #get方法
+ # 此时只有响应头被下载
+ WebFileSize = int(fget.headers["Content-Length"])
+ write(path+'.size', str(WebFileSize))
+ if int(fileSize) != WebFileSize:
+ print('-' * 32)
+ print(f"文件名: {name}")
+ print(f"大小: {WebFileSize/(1000**2)}Mb")
+ print('-' * 32)
+ count = 0
+ count_tmp = 0
+ time1 = time.time()
+ with open(path, "wb") as fw:
+ for chunk in fget.iter_content(chunk_size = 512):
+ fw.write(chunk)
+ count += len(chunk)
+ if count >= WebFileSize:
+ print("\n下载完成~")
+ if time.time() - time1 > 1:
+ p = count / WebFileSize * 100
+ speed = (count - count_tmp) / 1024 / 1024 / 2
+ count_tmp = count
+ p2 = '{:.2f}'.format(p)
+ speed2 = '{:.2f}'.format(speed)
+ print(f'下载进度: ' + p2 + '%' + ' 速度: ' + speed2 + 'M/S', end='\r')
+ time1 = time.time()
+ status = 1
+ else:
+ status = 1
+ os.chmod(path, stat.S_IRWXU)
+ except Exception as e:
+ self.upError(traceback.format_exc())
+ self.downStatus = 'no'
+ if status:
+ self.upRobot(id, 7)
+ else:
+ self.upRobot(id, 6)
+ return status
+ """ 删除Py文件"""
+ def delPy(self):
+ return
+ dirName = "./aspy/"
+ try:
+ if os.path.exists(dirName):
+ shutil.rmtree(dirName)
+ printf("删除文件成功")
+ except Exception:
+ printf("删除文件失败 文件被占用")
+ """ 获取任务"""
+ def getTasks(self, post=''):
+ r = self.getUrl(self.url, {'data': base64.b64encode(post.encode())})
+ try:
+ printf(r['msg'])
+ return r
+ except:
+ print(r)
+ """ 更新任务状态"""
+ def upTask(self, id, status, name, msg):
+ if id:
+ r = self.getUrl(self.url+"/upTask", {'id': id, 'name': name, 'status': status, 'msg': msg})
+ try:
+ printf(r['msg'])
+ return r
+ except:
+ print(r)
+ """ 更新机器人运行状态"""
+ def upRobot(self, id, status, data=''):
+ """
+ 0 停止
+ -1 错误
+ 1 运行中
+ 2 启动中
+ 3 停机中
+ 4 下载中
+ 5 等待其他下载
+ 6 下载失败
+ 7 下载完成
+ """
+ if not id:
+ return
+ if data:
+ data = base64.b64encode(json.dumps(data).encode())
+ status = 1
+ r = self.getUrl(self.url+"/upRobot", {'id': id, 'status': status, 'data': data})
+ try:
+ printf(r['msg'])
+ return r
+ except:
+ print(r)
+ """ 运行机器人"""
+ def runRobot(self, data):
+ try:
+ port = data['port']
+ printf(data['name'], data['port'], data['id'], '准备运行机器人')
+ if not tlog(f"{port}启动", 40):
+ return self.upTask(data['task_id'], 1, data['name'], '40s禁止重复启动')
+ if self.status(port):
+ return self.upTask(data['task_id'], 1, data['name'], '已经在运行')
+ if len(str(data['ip2'])) > 1:
+ ip_i = 0
+ iplist = get_local_ip_list()
+ for k in iplist:
+ if k == data['ip2']:
+ print(data['port'], "找到了IP", ip_i, k)
+ data['can'] += "ip = "+str(ip_i)
+ break
+ ip_i += 1
+ else:
+ return self.upTask(data['task_id'], 1, data['name'], '未找到此IP '+data['ip2'])
+ else:
+ data['can'] += "ip = 0"
+ config = "./config/"+str(data['id'])+".toml"
+ pypath = './'+data['ver']
+ """ 下载策略"""
+ self.downLoad(data['id'], data['ver'])
+ """ 写入配置文件"""
+ write(config, data['can'])
+ """ 执行进程"""
+ print(pypath, '--config=', config)
+ self.pop[port] = subprocess.Popen([pypath, f'--config={config}'])
+ self.upRobot(data['id'], 2)
+ self.upTask(data['task_id'], 1, data['name'], '运行成功')
+ except Exception as e:
+ api.upError(traceback.format_exc())
+ self.upRobot(data['id'], -1)
+ """ 终止机器人"""
+ def stopRobot(self, data):
+ port = data['port']
+ printf(data['name'], data['port'], data['id'], '准备停止机器人')
+ if not tlog(f"{data['port']}停止", 20):
+ self.upTask(data['task_id'], 1, data['name'], '20s禁止重复停止')
+ return 1
+ if not self.status(port):
+ self.upTask(data['task_id'], 1, data['name'], '重复停止')
+ return 1
+ try:
+ r = requests.post(f"{port}/exit", data='{"stop": true}', timeout=1)
+ print(port, r.text)
+ if "退出" in r.text:
+ self.upRobot(data['id'], 3)
+ self.upTask(data['task_id'], -1, data['name'], '停机成功')
+ if port in self.pop:
+ try:
+ print(port, "Stop Wait 20s...")
+ self.pop[port].wait(20)
+ except:
+ traceback.print_exc()
+ try:
+ print(port, "Kill")
+ self.pop[port].kill()
+ except:
+ traceback.print_exc()
+ del self.pop[port]
+ return 1
+ else:
+ self.upError(f"停机{data['port']}失败:"+data)
+ except Exception as e:
+ pass
+ self.upTask(data['task_id'], -1, data['name'], '停止失败')
+ return 0
+ """ 重启机器人"""
+ def rebootRebot(self, data):
+ try:
+ port = data['port']
+ self.stopRobot(data)
+ """ 等待退出"""
+ for i in range(40 * 5):
+ if not self.status(data['port']):
+ break
+ print(port, "rebot 等待机器人退出")
+ time.sleep(0.5)
+ continue
+ self.runRobot(data)
+ except Exception as e:
+ self.upError(traceback.format_exc())
+ """ 停止所有机器人并且重启脚本"""
+ def rebotAs(self, data):
+ ports = list(self.pop.keys())
+ if not ports:
+ self.upTask(data['task_id'], 1, data['name'], '无机器人')
+ for port in ports:
+ data['port'] = port
+ t = threading.Thread(target=api.stopRobot, args=(data, ))
+ t.daemon = True
+ t.start()
+ """ 等待退出"""
+ for i in range(40 * 5):
+ for port in ports:
+ if self.status(port):
+ print(port, "rebotAs等待机器人退出")
+ time.sleep(0.5)
+ continue
+ break
+ exit(0)
+ """ 上传错误日志"""
+ def upError(self, error):
+ print(error)
+ print("----------")
+ filePath = '../error/'+time.strftime("%Y-%m-%d_%H_%M_%S", time.localtime())+'.txt'
+ write(filePath, error)
+ self.getUrl(self.url3+'?key=d64a8sc874sa8c4as5', {'serverName': 'As', 'data': base64.b64encode(error.encode())})
+ time.sleep(5)
+ """ 获取机器人运行状态,并且上传状态"""
+ def status(self, port, upload=0):
+ status = 0
+ try:
+ if is_port_open(f"", port):
+ r = requests.get(f"{port}/account", timeout=1)
+ status = r.text
+ except Exception as e:
+ pass
+ # 运行过,但是停机了
+ if port in self.pop and not status and tlog(f"{port}启动", 40, look=1):
+ try:
+ print(port, "Kill")
+ self.pop[port].kill()
+ except:
+ traceback.print_exc()
+ del self.pop[port]
+ self.upRobot(upload, -1) #更新状态为出错
+ if status:
+ printf(port, '机器人已在运行', status)
+ else:
+ printf(port, '机器人没在运行')
+ return status
+ """ 读取log"""
+ def getLog(self, port):
+ try:
+ a_dir = "./"
+ file = 0
+ cTime = 0
+ for listname in os.listdir(a_dir):
+ cur_path = os.path.join(a_dir, listname) #组合新的文件名
+ if f"logs{port}" == listname:
+ for listname2 in os.listdir(cur_path):
+ cur_path2 = os.path.join(cur_path, listname2) #组合新的文件名
+ #找出最新修改的文件
+ obj = os.stat(cur_path2).st_mtime
+ if obj > cTime:
+ cTime = obj
+ file = cur_path2
+ data = ""
+ if file:
+ n = 200
+ linecache.clearcache()
+ line_count = get_line_count(file)
+ line_count = line_count - n - 1
+ for i in range(n):
+ last_line = linecache.getline(file, line_count)
+ data = last_line+data
+ line_count += 1
+ else:
+ data = "未找到文件"
+ return data
+ except Exception as e:
+ api.upError(traceback.format_exc())
+ return "读取出错"
+if __name__=='__main__':
+ api = api()
+ try:
+ os.remove("./as.py")
+ except:
+ pass
+ try:
+ os.remove("./as.pyc")
+ except:
+ pass
+ while 1:
+ try:
+ tasks = api.getTasks()
+ if tasks and tasks['status']:
+ tasks = tasks['data']
+ for data in tasks:
+ #放在后面执行
+ if data['name'] == 'look':
+ continue
+ #启动机器人
+ if data['name'] == 'run':
+ t = threading.Thread(target=api.runRobot, args=(data, ))
+ t.daemon = True
+ t.start()
+ if data['name'] == 'stop':
+ t = threading.Thread(target=api.stopRobot, args=(data, ))
+ t.daemon = True
+ t.start()
+ if data['name'] == 'reboot' and tlog(f"{data['port']}重启", 40):
+ t = threading.Thread(target=api.rebootRebot, args=(data, ))
+ t.daemon = True
+ t.start()
+ if data['name'] == 'rebootAs':
+ api.rebotAs(data)
+ up = []
+ for data in tasks:
+ if data['name'] == 'look':
+ log = api.getLog(data['port']) if data['log'] else 0
+ status = api.status(data['port'], data['id'])
+ if status:
+ up.append([data['id'], 1, status, log])
+ else:
+ up.append([data['id'], 0, '', log])
+ if up:
+ api.upRobot(1, 1, up) #上传盈利状态
+ time.sleep(5) #1s 获取一次任务
+ if tlog("锁定", 30, look=1): #锁定30s
+ api.delPy()
+ except Exception as e:
+ api.upError(traceback.format_exc())