Skip to content

Instantly share code, notes, and snippets.

@halaprix
Created October 16, 2024 10:13
Show Gist options
  • Select an option

  • Save halaprix/610e652be62787533008a933fb6309d5 to your computer and use it in GitHub Desktop.

Select an option

Save halaprix/610e652be62787533008a933fb6309d5 to your computer and use it in GitHub Desktop.
sENA - claim multiple calldata generator
import requests
from eth_abi import encode
from eth_utils import to_checksum_address
import time
import json
import os
def ensure_folder(folder_name):
    """Create the directory ``folder_name`` if it does not already exist.

    Missing parent directories are created as well. Calling this for a
    directory that already exists is a no-op.

    Args:
        folder_name: Path of the directory to create.

    Raises:
        FileExistsError: If ``folder_name`` exists but is not a directory.
    """
    # exist_ok=True replaces the separate os.path.exists() check, which could
    # race with another process creating the directory in between.
    os.makedirs(folder_name, exist_ok=True)
def fetch_and_save_data(address):
    """Return the airdrop proof document for ``address``, using a local cache.

    The address is converted to its checksummed form. If
    ``proofs/<checksummed address>.json`` already exists, its parsed content
    is returned without any network access. Otherwise the JSON document is
    downloaded, written to that cache file and returned.

    Args:
        address: Ethereum address accepted by ``to_checksum_address``.

    Returns:
        The parsed JSON document, or ``None`` when the download fails
        (network error, timeout, non-200 status code, or a response body
        that is not valid JSON).
    """
    checksummed_address = to_checksum_address(address)
    proof_file = f"proofs/{checksummed_address}.json"

    # Reuse previously downloaded data so reruns do not hit the network.
    if os.path.exists(proof_file):
        print(f"Loading existing data for {checksummed_address}")
        with open(proof_file, 'r') as f:
            return json.load(f)

    url = (
        "https://airdrop-data-ethena.s3.us-west-2.amazonaws.com/"
        f"{checksummed_address}/"
        "0x8db60e956b76def53eaa4a2ca736b51f49c219476676dfd85f95e023039650bd"
        f"-{checksummed_address}.json"
    )

    # Short pause between requests to avoid hammering the server.
    sleep_time = 1
    print(f"Waiting {sleep_time} seconds before fetching data for {checksummed_address}...")
    time.sleep(sleep_time)

    # A timeout keeps one stalled connection from hanging the whole batch;
    # transport errors take the same "failed, return None" path as a bad
    # status code instead of aborting the run.
    try:
        response = requests.get(url, timeout=30)
    except requests.RequestException as exc:
        print(f"Failed to fetch data for {checksummed_address}: {exc}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch data for {checksummed_address}")
        return None

    try:
        data = response.json()
    except ValueError as exc:
        print(f"Failed to fetch data for {checksummed_address}: {exc}")
        return None

    # Save the fetched data immediately; make sure the cache directory exists
    # so the function also works when called on its own.
    os.makedirs(os.path.dirname(proof_file), exist_ok=True)
    with open(proof_file, 'w') as f:
        json.dump(data, f, indent=2)
    print(f"Saved data for {checksummed_address}")
    return data
def process_data(data):
    """Extract the claim parameters from a fetched airdrop document.

    Only the first entry of ``data['events']`` is used.

    Args:
        data: Parsed airdrop JSON containing a ``claimed`` flag and an
            ``events`` list whose entries carry ``beneficiary``,
            ``awardAmount``, ``releaseTime`` and ``proofs``.

    Returns:
        A dict with the keys ``beneficiary`` (checksummed address),
        ``amount`` (int), ``releaseTime`` (int) and ``proofs``, or ``None``
        when there is nothing to claim: the airdrop is already claimed or
        the document lists no events.
    """
    if data['claimed']:
        return None

    events = data['events']
    if not events:
        # No award event: indexing [0] would raise IndexError.
        return None

    event = events[0]
    return {
        'beneficiary': to_checksum_address(event['beneficiary']),
        'amount': int(event['awardAmount']),
        # Coerced like the amount so the value can be encoded as uint256
        # whether the JSON stores it as a number or as a numeric string.
        'releaseTime': int(event['releaseTime']),
        'proofs': event['proofs'],
    }
def _hex_to_bytes(hex_string):
    """Decode a hex string to bytes, accepting an optional ``0x`` prefix."""
    if hex_string[:2] in ('0x', '0X'):
        hex_string = hex_string[2:]
    return bytes.fromhex(hex_string)


def encode_calldata(processed_data_list):
    """Build the hex calldata for one batched claim call.

    Args:
        processed_data_list: List of dicts as returned by ``process_data``
            (keys ``beneficiary``, ``amount``, ``releaseTime``, ``proofs``).

    Returns:
        A ``0x``-prefixed hex string: the 4-byte function selector followed
        by the ABI-encoded arguments.
    """
    beneficiaries = [entry['beneficiary'] for entry in processed_data_list]
    amounts = [entry['amount'] for entry in processed_data_list]
    release_times = [entry['releaseTime'] for entry in processed_data_list]
    indexes = [0] * len(processed_data_list)
    # Only strip "0x" when it is actually present; blindly slicing [2:] would
    # drop the first byte of an unprefixed proof.
    proofs = [
        [_hex_to_bytes(proof) for proof in entry['proofs']]
        for entry in processed_data_list
    ]

    # NOTE(review): `amounts` fills both the 2nd and 3rd uint256[] slots, as
    # in the original call - confirm against the contract ABI which two
    # amount parameters these are.
    encoded_params = encode(
        ['address[]', 'uint256[]', 'uint256[]', 'uint256[]', 'uint256[]', 'bytes32[][]'],
        [beneficiaries, amounts, amounts, release_times, indexes, proofs],
    )

    function_selector = "0x8132b321"
    return function_selector + encoded_params.hex()
def _write_calldata_chunk(calldata, chunk_number):
    """Write one calldata string to ``calldata/chunk_<n>.json``; return the file name."""
    chunk_name = f"chunk_{chunk_number}.json"
    with open(f"calldata/{chunk_name}", 'w') as f:
        json.dump(calldata, f)
    return chunk_name


def process_addresses_in_chunks(addresses, chunk_size=25):
    """Generate claim calldata for ``addresses`` in batches of ``chunk_size``.

    For every address the proof data is fetched (or loaded from the
    ``proofs`` cache). Addresses with no data, or for which ``process_data``
    returns nothing, are skipped. Each group of ``chunk_size`` claimable
    addresses is encoded and written to ``calldata/chunk_<n>.json``; a final
    smaller group is written the same way. ``calldata/summary.json`` lists
    the chunk files that were actually written.

    Args:
        addresses: Iterable of Ethereum addresses.
        chunk_size: Number of claims encoded into one calldata chunk.
    """
    ensure_folder('proofs')
    ensure_folder('calldata')

    pending = []
    chunk_files = []

    for address in addresses:
        data = fetch_and_save_data(address)
        if not data:
            print(f"No data found for {to_checksum_address(address)}")
            continue

        processed_data = process_data(data)
        if not processed_data:
            print(f"Skipping {to_checksum_address(address)} as it has already claimed")
            continue

        pending.append(processed_data)
        if len(pending) == chunk_size:
            chunk_number = len(chunk_files) + 1
            chunk_files.append(
                _write_calldata_chunk(encode_calldata(pending), chunk_number)
            )
            print(f"Processed and saved chunk {chunk_number} with {chunk_size} addresses")
            pending = []  # Reset for the next chunk

    # Process any remaining addresses that did not fill a whole chunk.
    if pending:
        chunk_files.append(
            _write_calldata_chunk(encode_calldata(pending), len(chunk_files) + 1)
        )
        print(f"Processed and saved final chunk with {len(pending)} addresses")

    # The summary is built from the files actually written. Deriving it from
    # a running counter over-counted by one (and listed a non-existent file)
    # whenever there was no partial final chunk.
    summary_file = "calldata/summary.json"
    with open(summary_file, 'w') as f:
        json.dump({
            "total_chunks": len(chunk_files),
            "chunks": chunk_files,
        }, f, indent=2)

    print("Calldata chunks have been written to individual files in the 'calldata' folder")
    print(f"Summary of chunks has been written to {summary_file}")
def main():
    """Read addresses from ``addresses.txt`` and generate claim calldata.

    The file is read one address per line. Surrounding whitespace is
    stripped and blank lines are ignored.
    """
    with open('addresses.txt', 'r') as f:
        stripped_lines = (line.strip() for line in f)
        # Drop blank lines so empty strings are never passed on as addresses.
        addresses = [address for address in stripped_lines if address]
    process_addresses_in_chunks(addresses)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment