123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593 |
- import os
- import sys
- import json
- import stat
- import time
- import socket
- import shutil
- import base64
- import linecache #读取日志文件
- import requests
- import threading #多线程
- import subprocess
- import traceback #输出报错
- """
- 接受Web指令,上传日志
- """
def printf(*args):
    """Print *args* behind a ``[-]`` marker and the local time (HH:MM:SS).

    An empty string is passed between the timestamp and the arguments, so
    the output carries two spaces after the time.
    """
    stamp = time.strftime("%H:%M:%S", time.localtime())
    print('[-]', stamp, '', *args)
- """ 判断端口是否开启"""
def is_port_open(ip, port):
    """Return True if a TCP connection to ``(ip, port)`` can be established.

    Args:
        ip: Host name or IPv4 address to connect to.
        port: Port number; anything ``int()`` accepts (e.g. ``"8080"``).

    Returns:
        True when the connect and the following shutdown succeed, False on
        any socket error or when *port* is not a valid port number.
    """
    try:
        # The context manager closes the socket on every path; the previous
        # version never closed it and leaked one descriptor per probe.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((ip, int(port)))
            s.shutdown(socket.SHUT_RDWR)
        return True
    except (OSError, ValueError, TypeError, OverflowError):
        # OSError: refused / unreachable / resolution failure.
        # ValueError, TypeError, OverflowError: unusable port value.
        return False
- """ 放入缓存防止内存过载"""
def get_line_count(filename):
    """Return the number of newline characters in *filename*.

    The file is opened in text mode with the default encoding and consumed
    in pieces of 1024 characters, so it is never held in memory as a whole.
    """
    total = 0
    with open(filename, 'r') as handle:
        # iter() with a sentinel stops at the first empty read (end of file).
        for piece in iter(lambda: handle.read(1024 * 1), ''):
            total += piece.count('\n')
    return total
- """ 每隔N秒实现"""
# Throttle table: key -> unix timestamp (whole seconds) stored by tlog().
Log_time = {}


def tlog(data, xsTime, set=0, look=0):
    """Rate-limit helper: report whether *xsTime* seconds passed for a key.

    Args:
        data: Hashable key identifying the throttled event.
        xsTime: Minimum number of seconds between two accepted events.
        set: When truthy and the interval has NOT elapsed, store the current
            time for the key anyway (pushes the next accepted event back).
        look: When truthy, only peek; an elapsed interval is reported but
            the stored timestamp is left unchanged.

    Returns:
        1 if at least *xsTime* seconds passed since the stored timestamp
        (a key seen for the first time starts at 0, so it normally passes),
        otherwise 0.
    """
    # The dict is only mutated, never rebound, so no ``global`` statement is
    # needed (the old one named a non-existent ``Log_Time``).
    last = Log_time.setdefault(data, 0)
    nowTime = int(time.time())
    if nowTime - last >= xsTime:
        if not look:
            Log_time[data] = nowTime
        return 1
    if set:
        Log_time[data] = nowTime
    return 0
- ''' 获取本地ip'''
def get_local_ip_list():
    """Collect the IPv4 addresses of the machine's ens*/eth*/enp* interfaces.

    Requires the third-party ``netifaces`` package, imported lazily so the
    module can load without it.

    Returns:
        List of address strings, in interface order, without duplicates.
    """
    import netifaces as ni

    ipList = []
    for dev in ni.interfaces():
        print('dev:', dev)
        if not any(tag in dev for tag in ('ens', 'eth', 'enp')):
            continue
        # An interface without an IPv4 address has no AF_INET entry; the old
        # hard-coded ``[2]`` lookup raised KeyError for it.
        for entry in ni.ifaddresses(dev).get(ni.AF_INET, []):
            ip = entry['addr']
            print(f"检测到私有ip:{ip}")
            if ip not in ipList:
                ipList.append(ip)
    print(f"当前服务器私有ip为{ipList}")
    return ipList
-
- """文件写入内容"""
def write(filePath, data):
    """Write *data* to *filePath*, creating missing parent directories.

    Paths containing ``.pyc`` are written in binary mode (*data* must be
    bytes); everything else is written as UTF-8 text. A failed write is
    reported on stdout and otherwise ignored (best effort).

    Args:
        filePath: Destination path.
        data: ``str`` for text files, ``bytes`` for ``.pyc`` paths.
    """
    dirPath = os.path.dirname(filePath)
    # dirname is '' for a bare file name (and for a backslash-only path on
    # POSIX); os.makedirs('') would raise, so skip it in that case.
    if dirPath and not os.path.exists(dirPath):
        # exist_ok covers another thread creating the directory meanwhile.
        os.makedirs(dirPath, exist_ok=True)
    try:
        if ".pyc" in filePath:
            handle = open(filePath, 'wb+')
        else:
            handle = open(filePath, 'w+', encoding='utf-8')
        # ``with`` closes the handle even when write() raises.
        with handle as f:
            f.write(data)
    except Exception:
        print("写入失败", filePath, data)
- """读取文件内容"""
def read(filename):
    """Return the UTF-8 text content of *filename*, or 0 if it is missing.

    Args:
        filename: Path of the file to read.

    Returns:
        The file content as ``str``, or the integer 0 when the path does
        not exist (callers test the result for truthiness).
    """
    if not os.path.exists(filename):
        return 0
    try:
        # ``with`` closes the handle even if read() raises (e.g. on a
        # decoding error).
        with open(filename, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        # The file vanished between the existence check and open().
        return 0
class api():
    """Agent-side client of the control panel.

    Talks to the remote HTTP API, downloads strategy binaries, starts and
    stops robot subprocesses (one per port) and reports status, logs and
    errors back to the server.
    """

    def __init__(self):
        self.url = "https://hhh.liangjiang.cc/api/as"
        self.url2 = "http://208.87.206.108:1999/sapp99asd8asvc4"
        self.url3 = "https://hhh.liangjiang.cc/api/log/addError"
        self.rTime = int(time.time())  # start-up time, added to every POST
        self.debug = 0
        self.pop = {}  # port -> subprocess.Popen of the robot on that port
        self.upTime = 10  # seconds between profit-status pushes
        self.downStatus = 'no'  # 'down' while a download runs (one at a time)

    def getUrl(self, url, post=0, chongshi=0):
        """Request *url* and return the decoded JSON body.

        Args:
            url: Target URL.
            post: Dict of form fields; when truthy a POST is sent and the
                dict is extended in place with ``rTime``. Falsy means GET.
            chongshi: Retry marker; truthy on the second (last) attempt.

        Returns:
            The parsed JSON value, or False after two failed attempts.
        """
        r = None
        try:
            if post:
                post['rTime'] = self.rTime
                r = requests.post(url, data=post, timeout=30)
            else:
                print("GET: ", url.replace(self.url, ''))
                r = requests.get(url, timeout=30)
            r.encoding = 'utf-8'
            return json.loads(r.text)
        except Exception:
            # NOTE(review): the original indentation was lost; it is assumed
            # that ``debug`` only gates the traceback and that the back-off
            # sleep runs before every retry -- confirm.
            if self.debug:
                traceback.print_exc()
            print(url, post)
            if r is None:
                # No response object at all (connection error, timeout).
                # The old code hit an unbound ``r`` here.
                traceback.print_exc()
            else:
                # A response arrived but was not valid JSON: keep a copy.
                try:
                    print(r.text)
                    log = "Url:"+url+"\n\nPost:"
                    log += str(post) if post else "False"
                    log += "\n\n"+r.text
                    path = "../error/Api_"+time.strftime("%H-%M-%S", time.localtime())+".html"
                    print('Error:', path)
                    write(path, log)
                except Exception:
                    traceback.print_exc()
            time.sleep(10)

            if not chongshi:
                return self.getUrl(url, post, 1)
            return False

    def _report(self, r):
        """Print ``r['msg']`` and return *r*; print a malformed reply and return None."""
        try:
            printf(r['msg'])
        except (TypeError, KeyError):
            # r is False (request failed) or not a dict carrying 'msg'.
            print(r)
            return None
        return r

    def downLoad(self, id, name):
        """Download the strategy file *name* (about 20 MB) next to the script.

        Only one download runs at a time: while another one is active this
        call reports state 5 once per second for up to five minutes.

        Args:
            id: Robot id used for the progress reports sent via upRobot.
            name: File name, appended to ``self.url2`` and used as the local
                path ``./<name>``.

        Returns:
            1 when the file is present and executable, 0 otherwise.
        """
        for _ in range(5 * 60):
            if self.downStatus != 'down':
                break
            self.upRobot(id, 5)
            time.sleep(1)
        else:
            # Still busy after five minutes: give up.
            self.upRobot(id, 6)
            return 0

        status = 0
        self.downStatus = 'down'
        try:
            path = f"./{name}"
            url = f"{self.url2}/{name}"
            try:
                fileSize = os.path.getsize(path)
            except OSError:
                fileSize = 0
            # ``<name>.size`` remembers the size advertised by the server
            # for the last download attempt.
            preFileSize = read(path+'.size')
            if preFileSize and int(fileSize) == int(preFileSize):
                print(path, '策略文件已存在,跳过下载...')
            else:
                self.upRobot(id, 4)
                # timeout: a stalled server must not hold the download slot
                # forever (applies to connect and to each read).
                with requests.get(url=url, stream=True, timeout=30) as fget:
                    # Refuse to store an HTTP error page as the strategy file.
                    fget.raise_for_status()
                    # Only the response headers have been fetched so far.
                    WebFileSize = int(fget.headers["Content-Length"])
                    write(path+'.size', str(WebFileSize))
                    if int(fileSize) != WebFileSize:
                        print('-' * 32)
                        print(f"文件名: {name}")
                        print(f"大小: {WebFileSize/(1000**2)}Mb")
                        print('-' * 32)

                        count = 0
                        count_tmp = 0
                        time1 = time.time()
                        with open(path, "wb") as fw:
                            for chunk in fget.iter_content(chunk_size=64 * 1024):
                                fw.write(chunk)
                                count += len(chunk)
                                if count >= WebFileSize:
                                    print("\n下载完成~")
                                elapsed = time.time() - time1
                                if elapsed > 1:
                                    p = count / WebFileSize * 100
                                    # MiB per second over the real interval.
                                    speed = (count - count_tmp) / 1024 / 1024 / elapsed
                                    count_tmp = count
                                    p2 = '{:.2f}'.format(p)
                                    speed2 = '{:.2f}'.format(speed)
                                    print(f'下载进度: ' + p2 + '%' + ' 速度: ' + speed2 + 'M/S', end='\r')
                                    time1 = time.time()

            os.chmod(path, stat.S_IRWXU)
            status = 1
        except Exception:
            self.upError(traceback.format_exc())
        finally:
            # Always release the slot, whatever happened above.
            self.downStatus = 'no'

        if status:
            self.upRobot(id, 7)
        else:
            self.upRobot(id, 6)
        return status

    def delPy(self):
        """Delete the ``./aspy/`` directory -- currently disabled.

        The method returns immediately; the removal code below is kept but
        unreachable.
        """
        return
        dirName = "./aspy/"
        try:
            if os.path.exists(dirName):
                shutil.rmtree(dirName)
            printf("删除文件成功")
        except Exception:
            printf("删除文件失败 文件被占用")

    def getTasks(self, post=''):
        """Fetch the pending tasks from the server.

        Args:
            post: Text sent base64-encoded in the ``data`` field.

        Returns:
            The server reply, or None when it carries no ``msg`` entry
            (including a failed request).
        """
        r = self.getUrl(self.url, {'data': base64.b64encode(post.encode())})
        return self._report(r)

    def upTask(self, id, status, name, msg):
        """Report the outcome of task *id* to the server.

        Does nothing (returns None) when *id* is falsy. Otherwise returns
        the server reply, or None when the reply carries no ``msg`` entry.
        """
        if not id:
            return None
        r = self.getUrl(self.url+"/upTask", {'id': id, 'name': name, 'status': status, 'msg': msg})
        return self._report(r)

    def upRobot(self, id, status, data=''):
        """Report the run state of robot *id* to the server.

        Status codes:
             0  stopped
            -1  error
             1  running
             2  starting
             3  stopping
             4  downloading
             5  waiting for another download
             6  download failed
             7  download finished

        Args:
            id: Robot id; a falsy id makes the call a no-op.
            status: One of the codes above.
            data: Optional payload; when truthy it is sent as base64-encoded
                JSON and *status* is forced to 1.

        Returns:
            The server reply, or None (no-op, or reply without ``msg``).
        """
        if not id:
            return None
        if data:
            data = base64.b64encode(json.dumps(data).encode())
            status = 1
        r = self.getUrl(self.url+"/upRobot", {'id': id, 'status': status, 'data': data})
        return self._report(r)

    def runRobot(self, data):
        """Start the robot described by the task dict *data*.

        Reads the keys ``port``, ``name``, ``id``, ``task_id``, ``ip2``,
        ``can`` and ``ver``. Refuses to start twice within 40 seconds or
        when the port already answers. Any exception is uploaded and the
        robot is flagged with state -1.
        """
        try:
            port = data['port']
            printf(data['name'], data['port'], data['id'], '准备运行机器人')
            if not tlog(f"{port}启动", 40):
                return self.upTask(data['task_id'], 1, data['name'], '40s禁止重复启动')
            if self.status(port):
                return self.upTask(data['task_id'], 1, data['name'], '已经在运行')

            if len(str(data['ip2'])) > 1:
                # The config takes the index of the requested address in the
                # local address list.
                for ip_i, k in enumerate(get_local_ip_list()):
                    if k == data['ip2']:
                        print(data['port'], "找到了IP", ip_i, k)
                        data['can'] += "ip = "+str(ip_i)
                        break
                else:
                    return self.upTask(data['task_id'], 1, data['name'], '未找到此IP '+data['ip2'])
            else:
                data['can'] += "ip = 0"

            config = "./config/"+str(data['id'])+".toml"
            pypath = './'+data['ver']
            # Fetch the strategy binary.
            self.downLoad(data['id'], data['ver'])
            # Write the config file.
            write(config, data['can'])
            # Launch the process.
            print(pypath, '--config=', config)
            self.pop[port] = subprocess.Popen([pypath, f'--config={config}'])

            self.upRobot(data['id'], 2)
            self.upTask(data['task_id'], 1, data['name'], '运行成功')
        except Exception:
            # Use ``self``: the module-level name ``api`` is only the
            # instance when the file runs as a script.
            self.upError(traceback.format_exc())
            self.upRobot(data['id'], -1)

    def stopRobot(self, data):
        """Ask the robot on ``data['port']`` to exit and reap its process.

        Returns:
            1 when the robot was stopped, was not running, or a stop was
            requested less than 20 seconds ago; 0 when stopping failed.
        """
        port = data['port']
        printf(data['name'], data['port'], data['id'], '准备停止机器人')
        if not tlog(f"{data['port']}停止", 20):
            self.upTask(data['task_id'], 1, data['name'], '20s禁止重复停止')
            return 1
        if not self.status(port):
            self.upTask(data['task_id'], 1, data['name'], '重复停止')
            return 1

        try:
            r = requests.post(f"http://127.0.0.1:{port}/exit", data='{"stop": true}', timeout=1)
            print(port, r.text)
            if "退出" in r.text:
                self.upRobot(data['id'], 3)
                self.upTask(data['task_id'], -1, data['name'], '停机成功')
                proc = self.pop.get(port)
                if proc is not None:
                    try:
                        print(port, "Stop Wait 20s...")
                        proc.wait(20)
                    except Exception:
                        traceback.print_exc()
                    try:
                        # Harmless when the process has already exited.
                        print(port, "Kill")
                        proc.kill()
                    except Exception:
                        traceback.print_exc()
                    # status() may remove the entry concurrently, so do not
                    # use ``del`` (KeyError would turn success into failure).
                    self.pop.pop(port, None)
                return 1
            # str(): ``data`` is a dict and cannot be concatenated directly.
            self.upError(f"停机{data['port']}失败:"+str(data))
        except Exception:
            # Best effort: the failure is reported to the server below.
            traceback.print_exc()
        self.upTask(data['task_id'], -1, data['name'], '停止失败')
        return 0

    def rebootRebot(self, data):
        """Stop the robot of *data*, wait until its port is quiet, start it again."""
        try:
            port = data['port']
            self.stopRobot(data)
            # Wait for the robot to exit (at most 200 polls).
            for _ in range(40 * 5):
                if not self.status(data['port']):
                    break
                print(port, "rebot 等待机器人退出")
                time.sleep(0.5)
            self.runRobot(data)
        except Exception:
            self.upError(traceback.format_exc())

    def rebotAs(self, data):
        """Stop every known robot, then terminate this script.

        Raises:
            SystemExit: always, once the robots are gone or the wait limit
                (200 polls) is reached.
        """
        ports = list(self.pop.keys())
        if not ports:
            self.upTask(data['task_id'], 1, data['name'], '无机器人')
        for port in ports:
            # Each thread gets its own copy; mutating one shared dict made
            # the threads race on ``data['port']``.
            task = dict(data, port=port)
            t = threading.Thread(target=self.stopRobot, args=(task, ))
            t.daemon = True
            t.start()

        # Wait until no robot answers any more. The old loop left after its
        # first pass over the ports.
        for _ in range(40 * 5):
            running = [port for port in ports if self.status(port)]
            if not running:
                break
            for port in running:
                print(port, "rebotAs等待机器人退出")
            time.sleep(0.5)

        sys.exit(0)

    def upError(self, error):
        """Print *error*, store it under ``../error/`` and upload it.

        Sleeps five seconds afterwards.

        Args:
            error: Error text (usually a formatted traceback).
        """
        print(error)
        print("----------")
        filePath = '../error/'+time.strftime("%Y-%m-%d_%H_%M_%S", time.localtime())+'.txt'
        write(filePath, error)
        # NOTE(review): the API key is hard-coded in the URL below.
        self.getUrl(self.url3+'?key=d64a8sc874sa8c4as5', {'serverName': 'As', 'data': base64.b64encode(error.encode())})
        time.sleep(5)

    def status(self, port, upload=0):
        """Return the robot's ``/account`` reply, or 0 when it is not running.

        A process this agent started that no longer answers 40 seconds after
        its start is killed, dropped from ``self.pop`` and reported with
        state -1.

        Args:
            port: Local port of the robot.
            upload: Robot id used for the error report; 0 disables it.

        Returns:
            The response text when the robot answers, else 0.
        """
        status = 0
        try:
            if is_port_open(f"127.0.0.1", port):
                r = requests.get(f"http://127.0.0.1:{port}/account", timeout=1)
                status = r.text
        except Exception:
            # An unreachable robot simply counts as not running.
            status = 0

        # It was started by us but has gone down.
        if port in self.pop and not status and tlog(f"{port}启动", 40, look=1):
            proc = self.pop.pop(port, None)
            if proc is not None:
                try:
                    print(port, "Kill")
                    proc.kill()
                except Exception:
                    traceback.print_exc()
            self.upRobot(upload, -1)  # mark the robot as failed

        if status:
            printf(port, '机器人已在运行', status)
        else:
            printf(port, '机器人没在运行')
        return status

    def getLog(self, port):
        """Return the last 200 lines of the newest file in ``./logs<port>``.

        The lines are returned newest first. Returns ``"未找到文件"`` when
        no log file exists and ``"读取出错"`` (after uploading the
        traceback) when reading fails.
        """
        from collections import deque

        try:
            log_dir = os.path.join("./", f"logs{port}")
            newest = None
            newest_mtime = 0
            if os.path.isdir(log_dir):
                # Pick the most recently modified entry.
                for entry in os.listdir(log_dir):
                    candidate = os.path.join(log_dir, entry)
                    mtime = os.stat(candidate).st_mtime
                    if mtime > newest_mtime:
                        newest, newest_mtime = candidate, mtime
            if not newest:
                return "未找到文件"

            n = 200
            # deque(maxlen=n) streams the file and keeps only the tail; the
            # old index arithmetic dropped the final two lines and broke on
            # files shorter than n lines.
            with open(newest, 'r', encoding='utf-8', errors='replace') as fh:
                tail = deque(fh, maxlen=n)
            return "".join(
                line if line.endswith('\n') else line + '\n'
                for line in reversed(tail)
            )
        except Exception:
            self.upError(traceback.format_exc())
            return "读取出错"
if __name__ == '__main__':
    # The class name is rebound to its single instance here.
    # NOTE(review): other code in this file may rely on the global name
    # ``api`` being the instance -- keep the name.
    api = api()

    # Remove stale copies of the script; a missing file is fine.
    for stale in ("./as.py", "./as.pyc"):
        try:
            os.remove(stale)
        except OSError:
            pass

    while 1:
        try:
            tasks = api.getTasks()
            if tasks and tasks['status']:
                tasks = tasks['data']

                # Commands that run in their own daemon thread.
                threaded = {
                    'run': api.runRobot,
                    'stop': api.stopRobot,
                    'reboot': api.rebootRebot,
                }
                for data in tasks:
                    name = data['name']
                    # 'look' tasks are handled in the second pass below.
                    if name == 'look':
                        continue

                    # A reboot is accepted at most once per 40 s per port.
                    if name in threaded and (
                        name != 'reboot' or tlog(f"{data['port']}重启", 40)
                    ):
                        t = threading.Thread(target=threaded[name], args=(data, ))
                        t.daemon = True
                        t.start()

                    if name == 'rebootAs':
                        api.rebotAs(data)

                up = []
                for data in tasks:
                    if data['name'] == 'look':
                        log = api.getLog(data['port']) if data['log'] else 0
                        status = api.status(data['port'], data['id'])
                        if status:
                            up.append([data['id'], 1, status, log])
                        else:
                            up.append([data['id'], 0, '', log])
                if up:
                    api.upRobot(1, 1, up)  # upload the profit status
            # NOTE(review): nesting reconstructed; the pause is assumed to
            # apply to every poll, not only to successful ones.
            time.sleep(5)  # poll for tasks every 5 seconds
            if tlog("锁定", 30, look=1):  # 30 s lock (peek only)
                api.delPy()
        except Exception:
            api.upError(traceback.format_exc())
|