|
|
@@ -0,0 +1,118 @@
|
|
|
+import argparse
|
|
|
+import json
|
|
|
+import os
|
|
|
+import time
|
|
|
+from typing import Any, Dict, List
|
|
|
+
|
|
|
+import requests
|
|
|
+
|
|
|
+DEFAULT_AUTH = (
|
|
|
+ "1761464345:272589:0:77f74abf549ad0394e88323b73c56d09cdc66f400a4df9fc52a53887471e0cf2caaa77ac7891aa5b6f0ac8f854c3abe06d2853fd7d5bc74d6b6a8e0fef1240f473d97b07cf43300ddedf4e79ec671039"
|
|
|
+)
|
|
|
+BASE_URL = "https://mainnet.zklighter.elliot.ai/api/v1/leaderboard"
|
|
|
+
|
|
|
+
|
|
|
+def load_filtered_accounts(path: str) -> List[Dict[str, Any]]:
|
|
|
+ with open(path, "r", encoding="utf-8") as f:
|
|
|
+ return json.load(f)
|
|
|
+
|
|
|
+
|
|
|
+def extract_addresses(accounts: List[Dict[str, Any]]) -> List[str]:
|
|
|
+ seen = set()
|
|
|
+ addrs: List[str] = []
|
|
|
+ for acc in accounts:
|
|
|
+ addr = acc.get("l1_address")
|
|
|
+ if isinstance(addr, str) and addr and addr not in seen:
|
|
|
+ seen.add(addr)
|
|
|
+ addrs.append(addr)
|
|
|
+ return addrs
|
|
|
+
|
|
|
+
|
|
|
+def read_existing_output(path: str) -> Dict[str, Any]:
|
|
|
+ if os.path.exists(path):
|
|
|
+ try:
|
|
|
+ with open(path, "r", encoding="utf-8") as f:
|
|
|
+ return json.load(f)
|
|
|
+ except Exception:
|
|
|
+ return {}
|
|
|
+ return {}
|
|
|
+
|
|
|
+
|
|
|
+def save_output(path: str, data: Dict[str, Any]) -> None:
|
|
|
+ tmp_path = path + ".tmp"
|
|
|
+ with open(tmp_path, "w", encoding="utf-8") as f:
|
|
|
+ json.dump(data, f, ensure_ascii=False, indent=2)
|
|
|
+ os.replace(tmp_path, path)
|
|
|
+
|
|
|
+
|
|
|
+def try_json(resp: requests.Response) -> Any:
|
|
|
+ try:
|
|
|
+ return resp.json()
|
|
|
+ except ValueError:
|
|
|
+ return {"text": resp.text}
|
|
|
+
|
|
|
+
|
|
|
+def fetch(session: requests.Session, address: str, auth: str, timeout: int = 15) -> Dict[str, Any]:
|
|
|
+ headers = {"authorization": auth}
|
|
|
+ params = {"type": "weekly", "l1_address": address}
|
|
|
+ print(params)
|
|
|
+ print(headers)
|
|
|
+ try:
|
|
|
+ resp = session.get(BASE_URL, headers=headers, params=params, timeout=timeout)
|
|
|
+ return {"status_code": resp.status_code, "body": try_json(resp)}
|
|
|
+ except requests.RequestException as e:
|
|
|
+ return {"error": str(e)}
|
|
|
+
|
|
|
+
|
|
|
+def run(input_path: str, output_path: str, period: float, auth: str) -> None:
|
|
|
+ accounts = load_filtered_accounts(input_path)
|
|
|
+ addrs = extract_addresses(accounts)
|
|
|
+ results = read_existing_output(output_path)
|
|
|
+ session = requests.Session()
|
|
|
+
|
|
|
+ count = 0
|
|
|
+ for addr in addrs:
|
|
|
+ if addr in results:
|
|
|
+ continue
|
|
|
+ data = fetch(session, addr, auth)
|
|
|
+ results[addr] = data
|
|
|
+ save_output(output_path, results)
|
|
|
+ count += 1
|
|
|
+ print(f"Fetched {addr}: {'OK' if 'body' in data else 'ERR'}, {data}")
|
|
|
+ time.sleep(period)
|
|
|
+
|
|
|
+ print(f"Done. Total fetched: {count}. Output: {output_path}")
|
|
|
+
|
|
|
+
|
|
|
+def main() -> None:
|
|
|
+ parser = argparse.ArgumentParser(
|
|
|
+ description="Fetch leaderboard data for l1_addresses from filtered_accounts.json"
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--input",
|
|
|
+ default=os.path.join("src", "lighter", "filtered_accounts.json"),
|
|
|
+ help="Input filtered accounts JSON path",
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--output",
|
|
|
+ default=os.path.join("src", "lighter", "leaderboard_results.json"),
|
|
|
+ help="Output JSON path",
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--period",
|
|
|
+ type=float,
|
|
|
+ default=2.0,
|
|
|
+ help="Seconds between requests (default: 2.0)",
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--auth",
|
|
|
+ default=DEFAULT_AUTH,
|
|
|
+ help="Authorization header value",
|
|
|
+ )
|
|
|
+ args = parser.parse_args()
|
|
|
+
|
|
|
+ run(args.input, args.output, args.period, args.auth)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|