sENA - claim multiple calldata generator
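
A helper script for batch-claiming the Ethena sENA airdrop. It reads addresses from addresses.txt (one per line), fetches each address's proof JSON from the Ethena airdrop S3 bucket (caching it under proofs/), skips addresses that have already claimed, and ABI-encodes the remaining claims into batched calldata for function selector 0x8132b321, written in chunks of up to 25 claims to calldata/chunk_<n>.json along with a summary.json index.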

import requests
from eth_abi import encode
from eth_utils import to_checksum_address
import time
import json
import os


def ensure_folder(folder_name):
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)


def fetch_and_save_data(address):
    checksummed_address = to_checksum_address(address)
    proof_file = f"proofs/{checksummed_address}.json"
    # Check if we already have the data cached locally
    if os.path.exists(proof_file):
        print(f"Loading existing data for {checksummed_address}")
        with open(proof_file, 'r') as f:
            return json.load(f)
    url = f"https://airdrop-data-ethena.s3.us-west-2.amazonaws.com/{checksummed_address}/0x8db60e956b76def53eaa4a2ca736b51f49c219476676dfd85f95e023039650bd-{checksummed_address}.json"
    sleep_time = 1
    print(f"Waiting {sleep_time} seconds before fetching data for {checksummed_address}...")
    time.sleep(sleep_time)  # brief delay between requests to avoid hammering the endpoint
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        # Save the fetched data immediately so reruns can reuse it
        with open(proof_file, 'w') as f:
            json.dump(data, f, indent=2)
        print(f"Saved data for {checksummed_address}")
        return data
    else:
        print(f"Failed to fetch data for {checksummed_address} (HTTP {response.status_code})")
        return None


def process_data(data):
    # Skip addresses that have already claimed their award
    if data['claimed']:
        return None
    event = data['events'][0]
    return {
        'beneficiary': to_checksum_address(event['beneficiary']),
        'amount': int(event['awardAmount']),
        'releaseTime': int(event['releaseTime']),  # normalize in case the JSON holds a numeric string
        'proofs': event['proofs']
    }


def encode_calldata(processed_data_list):
    beneficiaries = [data['beneficiary'] for data in processed_data_list]
    amounts = [data['amount'] for data in processed_data_list]
    release_times = [data['releaseTime'] for data in processed_data_list]
    indexes = [0] * len(amounts)
    # Proofs arrive as 0x-prefixed hex strings; strip the prefix and decode to bytes32
    proofs = [[bytes.fromhex(proof[2:]) for proof in data['proofs']] for data in processed_data_list]
    # Note that `amounts` is intentionally passed for two of the uint256[] parameters
    encoded_params = encode(
        ['address[]', 'uint256[]', 'uint256[]', 'uint256[]', 'uint256[]', 'bytes32[][]'],
        [beneficiaries, amounts, amounts, release_times, indexes, proofs]
    )
    function_selector = "0x8132b321"
    full_calldata = function_selector + encoded_params.hex()
    return full_calldata


def process_addresses_in_chunks(addresses, chunk_size=25):
    ensure_folder('proofs')
    ensure_folder('calldata')
    processed_data_list = []
    chunk_counter = 1
    calldata_chunks = []
    for address in addresses:
        data = fetch_and_save_data(address)
        if data:
            processed_data = process_data(data)
            if processed_data:
                processed_data_list.append(processed_data)
                if len(processed_data_list) == chunk_size:
                    calldata = encode_calldata(processed_data_list)
                    calldata_chunks.append(calldata)
                    # Write the current chunk to a separate file
                    chunk_file = f"calldata/chunk_{chunk_counter}.json"
                    with open(chunk_file, 'w') as f:
                        json.dump(calldata, f)
                    print(f"Processed and saved chunk {chunk_counter} with {chunk_size} addresses")
                    chunk_counter += 1
                    processed_data_list = []  # Reset for the next chunk
            else:
                print(f"Skipping {to_checksum_address(address)} as it has already claimed")
        else:
            print(f"No data found for {to_checksum_address(address)}")
    # Encode and write any remaining addresses as a final, smaller chunk
    if processed_data_list:
        calldata = encode_calldata(processed_data_list)
        calldata_chunks.append(calldata)
        chunk_file = f"calldata/chunk_{chunk_counter}.json"
        with open(chunk_file, 'w') as f:
            json.dump(calldata, f)
        print(f"Processed and saved final chunk with {len(processed_data_list)} addresses")
        chunk_counter += 1
    # Write a summary of all chunks; chunk_counter is always one past the last written chunk
    total_chunks = chunk_counter - 1
    summary_file = "calldata/summary.json"
    with open(summary_file, 'w') as f:
        json.dump({
            "total_chunks": total_chunks,
            "chunks": [f"chunk_{i}.json" for i in range(1, total_chunks + 1)]
        }, f, indent=2)
    print("Calldata chunks have been written to individual files in the 'calldata' folder")
    print(f"Summary of chunks has been written to {summary_file}")


def main():
    # Load addresses from a file (one address per line), skipping blank lines
    with open('addresses.txt', 'r') as f:
        addresses = [line.strip() for line in f if line.strip()]
    process_addresses_in_chunks(addresses)


if __name__ == "__main__":
    main()