-
由 spiritysdx 创作于由 spiritysdx 创作于
scripts_trigger_ckorder.py 10.90 KiB
# -*- coding:utf-8 -*-
# 作者仓库:https://jihulab.com/spiritlhl/qinglong_auto_tools.git
# 觉得不错麻烦点个star谢谢
# 频道:https://t.me/qinglong_auto_tools
'''
cron: 1
new Env('单容器 二叉树随机ck顺序');
'''
# 初次运行生成原始顺序模板文件 trigger_cookies.json,备份原始ck文件 allck.txt
# 可每次运行重新生成模板,在配置文件中配置 export ec_write_cks="true" 开启该功能
# 默认保持前6位ck顺序不变,有需要在配置文件中配置 export ec_head_cks="具体几个" 更改数量
# 可配置随机顺序时给ck备注标上原始顺序,如果备注已存在,则保留原始备注不更改
# 禁用的ck自动后置,非ck变量全部自动前置
# 默认任务定时自行修改
import os
import time
import json
import re
import random
print("不要在主容器运行!!!本脚本每运行一次少一个ck,请在分容器运行!!!")
try:
import requests
except Exception as e:
print(e, "\n缺少requests 模块,请执行命令安装:python3 -m pip install requests")
exit(3)
requests.packages.urllib3.disable_warnings()
ql_auth_path = '/ql/config/auth.json'
try:
head = int(os.environ["ec_head_cks"])
print("已配置保留前{}位ck顺序做车头".format(head))
except:
head = 6
print("#默认只保留前6位ck做车头,有需求")
print("#请在配置文件中配置\nexport ec_head_cks=\"具体几个\" \n#更改车头数量\n")
print("===================================")
print("已设置保留前{}位顺序不改变\n".format(head))
def __get_token() -> str or None:
with open(ql_auth_path, 'r', encoding='utf-8') as f:
j_data = json.load(f)
return j_data.get('token')
def __get__headers() -> dict:
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json;charset=UTF-8',
'Authorization': 'Bearer ' + __get_token()
}
return headers
# 查询环境变量
def get_envs(name: str = None) -> list:
params = {
't': int(time.time() * 1000)
}
if name is not None:
params['searchValue'] = name
res = requests.get(ql_url + '/api/envs', headers=__get__headers(), params=params)
j_data = res.json()
if j_data['code'] == 200:
return j_data['data']
return []
def gettimestamp():
return str(int(time.time() * 1000))
def gettoken(self):
self.headers.update({"Authorization": "Bearer " + __get_token()})
def login(self):
gettoken(self)
def getallenv(self, baseurl, typ):
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
r = self.get(url)
item = json.loads(r.text)["data"]
return item
def getitem(self, baseurl, key, typ):
url = baseurl + typ + "/envs?searchValue=%s&t=%s" % (key, gettimestamp())
r = self.get(url)
item = json.loads(r.text)["data"]
return item
def getckitem(self, baseurl, key, typ):
url = baseurl + typ + "/envs?searchValue=JD_COOKIE&t=%s" % gettimestamp() # JD_COOKIE为默认的环境变量名,该变量里的值默认含pt_pin和pt_key,其他类似默认按照下面注释改
r = self.get(url)
for i in json.loads(r.text)["data"]:
if key in i["value"]:
return i
return []
def update(self, baseurl, typ, value, qlid, remarks):
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
data = {
"name": "JD_COOKIE",
"value": value,
"_id": qlid,
"remarks": remarks
}
r = self.put(url, data=json.dumps(data))
if json.loads(r.text)["code"] == 200:
return True
else:
data = {
"name": "JD_COOKIE",
"value": value,
"id": qlid,
"remarks": remarks
}
r = self.put(url, data=json.dumps(data))
if json.loads(r.text)["code"] == 200:
return True
return False
def insert(self, baseurl, typ, name, value, remarks):
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
data = []
if remarks == None:
data_json = {
"value": value,
"name": name
}
else:
data_json = {
"value": value,
"name": name,
'remarks': remarks
}
data.append(data_json)
r = self.post(url, json.dumps(data))
if json.loads(r.text)["code"] == 200:
return r.json()['data']
else:
return False
def delete(self, baseurl, typ, value):
url = baseurl + typ + "/envs?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
data = value
r = self.delete(url, data=json.dumps(data))
if json.loads(r.text)["code"] == 200:
return True
else:
return False
def disable(self, baseurl, typ, ids):
url = baseurl + typ + "/envs/disable?t=%s" % gettimestamp()
self.headers.update({"Content-Type": "application/json;charset=UTF-8", 'Connection': 'close'})
r = self.put(url, data=json.dumps(ids))
if json.loads(r.text)["code"] == 200:
return True
else:
return False
if __name__ == '__main__':
s = requests.session()
login(s)
try:
ql_url = 'http://localhost:5700/'
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
except:
ql_url = 'http://localhost:5600/'
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
# # 备份ck
# if os.path.exists("./allck.txt") == False:
# temp = []
# for i in cookies:
# temp.append(i["value"] + "\n")
# with open("./allck.txt", "w") as ffp:
# ffp.writelines(temp)
# print("初次运行,自动备份ck到allck.txt文件里,有需要恢复初始状态自取\n")
# else:
# print("脚本管理根目录下allck.txt文件已存在,有需要恢复初始状态自取\n")
# pass
# 备份
tt = "trigger_cookies.json"
allenv = getallenv(s, ql_url, "api")
try:
os.environ['ec_backup_ck']
ec_backup_ck = os.environ['ec_backup_ck']
except:
ec_backup_ck = 'true'
if ec_backup_ck == 'true' and os.path.exists('./' + tt) != True:
with open(tt, "w", encoding="utf-8") as fp:
json.dump(allenv, fp)
print("已备份原有环境变量至{},有需要还原请使用二叉树还原环境变量脚本还原".format(tt))
print("如果不需要备份,请设置export ec_backup_ck=\"false\"")
else:
print("上次备份文件{}还存在或已配置无需备份,本次运行不进行备份".format(tt))
####################################################################################
try:
ql_url = 'http://localhost:5700/'
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
except:
ql_url = 'http://localhost:5600/'
cookies = getitem(s, ql_url, "JD_COOKIE", "api")
allenv = getallenv(s, ql_url, "api")
jdcount = 0
result_list = []
he_list = []
head_list = []
random_list = []
disable_list = []
for i in allenv:
if "JD_COOKIE" == i["name"]:
jdcount += 1
else:
he_list.append(i)
continue
if jdcount <= head:
head_list.append(i)
continue
if i["status"] != 0:
disable_list.append(i)
continue
head_len = len(head_list) + len(he_list)
# 随机化启用的cookie顺序,保留指定位置不变
res = random.sample(range(head, len(cookies)), len(cookies) - head)
print("固定的ck顺序")
print("pin\t\t\t\t上次\t 本次")
for i in range(0, head):
c = re.findall(r"pt_pin=(.*?);", cookies[i]["value"])[0][-16:].ljust(16)
print("{}\t\t{}\t\t{}".format(c, i + 1, i + 1))
print("本次随机ck的顺序")
print("pin\t\t\t\t上次\t 本次")
temp = []
ct = 1
for i in res:
random_list.append(cookies[i - 1])
c = re.findall(r"pt_pin=(.*?);", cookies[i - 1]["value"])[0][-16:].ljust(16)
print("{}\t\t{}\t\t{}".format(c, i, ct + head))
ct += 1
result_list.extend(head_list)
# 与后置黑号脚本匹配
tp1 = []
tp2 = []
try:
ec_blackkey = os.environ["ec_blackkey"]
for i in random_list:
try:
if i['remarks'] == ec_blackkey:
tp2.append(i)
else:
tp1.append(i)
except:
tp1.append(i)
random_list = tp1 + tp2
except:
pass
tp1 = []
tp2 = []
try:
for i in random_list:
try:
if i['remarks'] == '黑号':
tp2.append(i)
else:
tp1.append(i)
except:
tp1.append(i)
random_list = tp1 + tp2
except:
pass
# 去重
tp1 = []
value_tp = []
for i in random_list:
if i['value'] not in value_tp:
tp1.append(i)
value_tp.append(i['value'])
random_list = tp1
he_count = len(he_list)
result_list.extend(random_list)
result_list.extend(disable_list)
# 去重
tp1 = []
value_tp = []
for i in result_list:
if i['value'] not in value_tp:
tp1.append(i)
value_tp.append(i['value'])
result_list = tp1
he_list.extend(result_list)
result_list = he_list
for i in allenv:
try:
c = delete(s, ql_url, "api", [i['_id']])
except:
c = delete(s, ql_url, "api", [i['id']])
time.sleep(2)
print(len(result_list))
for i in result_list:
try:
i['remarks']
c = insert(s, ql_url, "api", i['name'], i['value'], i['remarks'])
if i["status"] != 0:
try:
disable(s, ql_url, 'api', [c[0]['_id']])
except:
disable(s, ql_url, 'api', [c[0]['id']])
except:
c = insert(s, ql_url, "api", i['name'], i['value'], None)
if i["status"] != 0:
try:
disable(s, ql_url, 'api', [c[0]['_id']])
except:
disable(s, ql_url, 'api', [c[0]['id']])
print("已前置非ck变量共{}个,车头ck共{}个,随机顺序ck共{}个,后置禁用ck共{}个".format(he_count, len(head_list), len(random_list),
len(disable_list)))
print("============================================")
print("脚本执行完毕")