Created
May 6, 2022 13:59
-
-
Save munanadi/952cd38648d680dc738034b2e98de180 to your computer and use it in GitHub Desktop.
Watches the latest txns for SwapEvents and adds them to the table
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const express = require("express"); | |
| const axios = require("axios"); | |
| const { Connection, Keypair, PublicKey } = require("@solana/web3.js"); | |
| const { IDL } = require("@cykura/sdk"); | |
| const anchor = require("@project-serum/anchor"); | |
| const { buildCoderMap } = require("@saberhq/anchor-contrib"); | |
| const { Program, Provider, Wallet } = anchor; | |
// Address of the on-chain program whose transactions this script indexes.
const PROGRAM_ADD = new PublicKey(
  "cysPXAjehMpVKUapzbMCCnpFxUFFryEWEaLgnb9NrR8"
);

// Port the HTTP server listens on; overridable through the environment.
const PORT = process.env.PORT || 5000;
const app = express();

// GraphQL endpoint the swaps are written to and read from.
// HASURA_GRAPHQL_URL overrides the built-in default.
const POST_URL =
  process.env.HASURA_GRAPHQL_URL ||
  "http://hasuragraphqlapi-loadbalancer-1955656816.ap-south-1.elb.amazonaws.com/v1/graphql";

// SECURITY(review): the admin secret below is committed in source. It is kept
// only as a fallback so existing deployments keep working; set
// HASURA_ADMIN_SECRET in the environment and rotate the committed value.
const graphqlHeaders = {
  "Content-Type": "application/json",
  "x-hasura-admin-secret":
    process.env.HASURA_ADMIN_SECRET || "k3!njf5!45!sa4!f",
};

// Query served by `GET /`: hashes of the 5 rows with the highest txn_blocktime.
const fetchLatest5Txns = `
  query fetchLatest5Txns {
    good_txns (order_by : { txn_blocktime: desc }, limit: 5 ) {
      txn_hash
    }
  }
`;

// Counter exposed by `GET /count`; a timer elsewhere in this file increments it.
let COUNT = 0;

// Coder used to decode the program's events out of transaction log messages.
const cykuraBuilder = buildCoderMap(
  { cykura: IDL },
  {
    cykura: PROGRAM_ADD,
  }
);

// The Provider needs a wallet object; a freshly generated keypair is used
// because nothing in this file signs a transaction.
const wallet = new Wallet(Keypair.generate());

// SECURITY(review): the default RPC URL embeds an access token. Prefer
// SOLANA_RPC_URL from the environment and rotate the committed token.
const connection = new Connection(
  process.env.SOLANA_RPC_URL ||
    "https://summer-snowy-wave.solana-mainnet.quiknode.pro/1822e9e649576157616dca62aff4b2f69945c436/"
);
const provider = new Provider(connection, wallet, { commitment: "finalized" });
const cyclosCore = new Program(IDL, PROGRAM_ADD, provider);
// HTTP API
//   GET /       runs the `fetchLatest5Txns` query against POST_URL and relays
//               the GraphQL response body to the caller.
//   GET /count  returns the current value of COUNT, JSON-encoded.
app
  .get("/", async (req, res) => {
    try {
      const resp = await axios.post(
        POST_URL,
        {
          query: fetchLatest5Txns,
        },
        {
          headers: graphqlHeaders,
        }
      );
      console.log(resp?.data);
      return res.send(resp?.data);
    } catch (e) {
      console.log("Something went wrong");
      console.log(e);
      // Always answer the client: without a response the request would stay
      // open until the caller's own timeout fires.
      return res
        .status(502)
        .send({ error: "Failed to fetch the latest transactions" });
    }
  })
  .get("/count", (req, res) => res.send(JSON.stringify(COUNT)));

app.listen(PORT, () => console.log(`Listening on ${PORT}`));
// Cursor: signature up to which transactions have already been processed.
// Stays undefined until a starting signature has been found.
let lastFetchedTxnHash;

// True while a fetch cycle is in flight, so timer ticks never overlap and
// never race on `lastFetchedTxnHash`.
let isFetching = false;

/**
 * Picks the signature the indexer starts from: the last entry of the (up to)
 * 20 finalized signatures the RPC returns for the program address.
 *
 * @returns {Promise<string|undefined>} The starting signature, or undefined
 *   when the RPC returned no signatures at all.
 */
async function findStartingSignature() {
  const data = await connection.getConfirmedSignaturesForAddress2(
    PROGRAM_ADD,
    {
      limit: 20,
    },
    "finalized"
  );
  // The RPC may return fewer than `limit` entries, so take the last one that
  // is actually present instead of indexing a fixed slot.
  return data.length > 0 ? data[data.length - 1].signature : undefined;
}

/**
 * Runs one fetch cycle: finds a starting signature if there is none yet, then
 * fetches everything newer than the cursor and advances the cursor.
 * Never rejects; failures are logged and retried on the next tick.
 */
async function runFetchCycle() {
  if (isFetching) {
    console.log("Previous fetch is still running, skipping this run");
    return;
  }
  isFetching = true;
  try {
    if (lastFetchedTxnHash === undefined) {
      const startSignature = await findStartingSignature();
      if (startSignature === undefined) {
        console.log("No signatures found to start from");
        return;
      }
      console.log(
        startSignature,
        " is the latest transaction fetched from where the indexer will continue fetching forward looking to present time"
      );
      lastFetchedTxnHash = startSignature;
    }
    const nextCursor = await fetchTransactions(lastFetchedTxnHash);
    // Only move the cursor when a signature came back; an empty result must
    // not erase it, otherwise the next fetch would run without `until`.
    if (nextCursor) {
      lastFetchedTxnHash = nextCursor;
    }
  } catch (e) {
    console.log("Something went wrong");
    console.log(e);
  } finally {
    isFetching = false;
  }
}

// Kick off immediately, then poll every 5 seconds. `runFetchCycle` handles
// its own errors, so the returned promises are intentionally not awaited.
runFetchCycle();
setInterval(() => {
  console.log(
    "Auto fetching runs now",
    new Date().toLocaleString("en-Us", { timeZone: "Asia/Kolkata" })
  );
  runFetchCycle();
}, 5000);
/**
 * Fetches the finalized transactions of PROGRAM_ADD that are newer than
 * `lastFetchedTxnHash`, extracts their `SwapEvent`s and inserts one row per
 * event through the GraphQL endpoint.
 *
 * @param {string} [lastFetchedTxnHash] Signature to fetch "until" (exclusive
 *   lower bound of the batch). When undefined the RPC applies no lower bound.
 * @returns {Promise<string|undefined>} The cursor for the next call: the
 *   signature of the newest transaction in the batch, or the unchanged
 *   `lastFetchedTxnHash` when there was nothing new, the batch could not be
 *   loaded, or the insert failed (so the same range is retried).
 */
async function fetchTransactions(lastFetchedTxnHash) {
  console.log(`Fetching until ${lastFetchedTxnHash}`);
  const data = await connection.getConfirmedSignaturesForAddress2(
    PROGRAM_ADD,
    {
      until: lastFetchedTxnHash,
    },
    "finalized"
  );
  const signArr = data.map((d) => d.signature);
  console.log(`Got ${signArr.length} signatures`);
  if (signArr.length === 0) {
    console.log("Ran out of signatures.");
    // Nothing new yet: hand the cursor back unchanged so the caller does not
    // lose its position.
    return lastFetchedTxnHash;
  }
  console.log(
    `First txn is ${signArr[0]} and last txn is ${signArr[signArr.length - 1]}`
  );

  const txnArr = await connection.getParsedTransactions(signArr);
  if (!txnArr) {
    console.log("txnArr is null");
    return lastFetchedTxnHash;
  }
  console.log(txnArr.length, " number of txns fethced");
  console.log("Adding Txns");

  // One GraphQL input object (as text) per SwapEvent found in the batch.
  const rows = [];
  let largestBlockTime = 0;
  let largestTxnHash;

  for (const txn of txnArr) {
    if (!txn) {
      continue;
    }
    const signer = txn.transaction?.message?.accountKeys[0].pubkey.toString();
    // A transaction can carry several signatures (one per signer); the first
    // one is the transaction id.
    const txnHash = txn.transaction?.signatures?.[0];
    const txnSlot = txn.slot;
    const txnBlockTime = txn.blockTime;
    const txnsLogs = txn.meta?.logMessages ?? [];

    // Track the newest transaction of the batch, failed or successful.
    // The RPC lists signatures newest first, so on equal block times the one
    // seen first wins (strict comparison).
    const blockTime = txnBlockTime ?? 0;
    if (largestTxnHash === undefined || blockTime > largestBlockTime) {
      largestBlockTime = blockTime;
      largestTxnHash = txnHash;
    }

    // Skip failed transactions for now
    if (txn.meta?.err !== null) {
      continue;
    }

    let events;
    try {
      events = cykuraBuilder.cykura.parseProgramLogEvents(txnsLogs);
    } catch (e) {
      // Handle things like
      // 1. upgrade of contracts = "2jAwQP4HgfRkD7M5ry6HVuHiqPstgXaULfQqhJMbFEaXcZqnyWQaLCcKv5xvgmQEWaso1RQGjscmCD4CSnVEAv5z"
      console.log("parseProgramLogEvents fails");
      console.log(txn?.meta.logMessages);
      process.exit(1);
    }

    for (const e of events) {
      if (e.name !== "SwapEvent") {
        continue;
      }
      const swap = e.data;
      console.log(
        ` - ${txnHash} ${new Date(txnBlockTime * 1000).toLocaleString(
          "en-Us",
          { timeZone: "Asia/Kolkata" }
        )}`
      );
      // NOTE(review): amount0/amount1 are stored crosswise (amount1 <- amount0,
      // amount0 <- amount1), exactly as before; confirm this matches the
      // hard-coded token0/token1 labels.
      rows.push(`{
        txn_hash: "${txnHash}",
        txn_slot: "${txnSlot.toString()}",
        txn_blocktime: "${txnBlockTime.toString()}",
        token1: "SOL",
        token0: "USDC",
        tick: "${swap.tick.toString()}",
        sqrtprice_x32: "${swap.sqrtPriceX32.toString()}",
        signer: "${signer.toString()}",
        sender: "${swap.sender.toString()}",
        pool_addr: "${swap.poolState.toString()}",
        liquidity: "${swap.liquidity.toString()}",
        amount1: "${swap.amount0.toString()}",
        amount0: "${swap.amount1.toString()}",
      }`);
    }
  }
  console.log("Found", rows.length, "swaps");

  if (rows.length > 0) {
    // Query to insert swaps if not exits.
    // TODO: The scheme needs some changing. This is a POC test ex. poolAddr and txnHash should be the PK, which is not the case now
    const query = `
      mutation MyMutation {
        insert_good_txns(objects: [${rows.join(
          ",\n"
        )}], on_conflict: {constraint: good_txns_pkey} ) {
          affected_rows
          returning {
            txn_hash
          }
        }
      }
    `;

    // If the insert fails the cursor is not advanced, so the next run
    // re-fetches the same range; if it succeeds the cursor moves forward.
    try {
      const resp = await axios.post(
        POST_URL,
        {
          query,
        },
        {
          headers: graphqlHeaders,
        }
      );
      console.log(resp?.data);
      console.log(
        `Inserted ${resp?.data?.data?.insert_good_txns.affected_rows} rows of data`
      );
      console.log(
        resp?.data.data.insert_good_txns.returning.map((r) => r.txn_hash)
      );
    } catch (e) {
      console.log("Something went wrong");
      console.log(e);
      return lastFetchedTxnHash;
    }
  }

  // TODO: The idea is since we are fetching close to the latest hash we will not get more than 1000 txns at a given point
  // unless the write to DB fails which will prompt the function to run again that might cause it to fetch >1000 txns
  // Handle such cases such that it doesn't interfere with setInterval call
  console.log(
    `The latest hash in the batch is ${largestTxnHash} @ ${new Date(
      largestBlockTime * 1000
    ).toLocaleString("en-Us", { timeZone: "Asia/Kolkata" })}`
  );
  // When none of the transactions could be loaded there is no new cursor;
  // keep the old one so the range is retried.
  return largestTxnHash ?? lastFetchedTxnHash;
}
// Heartbeat: bump the counter served by `GET /count` once every 5 seconds.
const bumpCount = () => {
  COUNT++;
};
setInterval(bumpCount, 5000);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment