@munanadi · Created May 6, 2022
Watches the latest txns for SwapEvents and adds them to the table
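A small poller: an Express app fetches new transactions against the Cykura program from Solana every 5 seconds, decodes their logs for SwapEvents, and inserts each swap into a Hasura-backed good_txns table over GraphQL. To try it, install the dependencies listed in the requires below and run the file with node (the entry filename is whatever you save it as).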
const express = require("express");
const axios = require("axios");
const { Connection, Keypair, PublicKey } = require("@solana/web3.js");
const { IDL } = require("@cykura/sdk");
const anchor = require("@project-serum/anchor");
const { buildCoderMap } = require("@saberhq/anchor-contrib");
const { Program, Provider, Wallet } = anchor;
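// Cykura AMM program id on mainnet; its IDL (imported above) is used to decode event logs.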
const PROGRAM_ADD = new PublicKey(
  "cysPXAjehMpVKUapzbMCCnpFxUFFryEWEaLgnb9NrR8"
);
const PORT = process.env.PORT || 5000;
const app = express();
const POST_URL =
  "http://hasuragraphqlapi-loadbalancer-1955656816.ap-south-1.elb.amazonaws.com/v1/graphql";
const graphqlHeaders = {
  "Content-Type": "application/json",
  "x-hasura-admin-secret": "k3!njf5!45!sa4!f",
};
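// GraphQL query used by the "/" route to return the 5 most recently indexed swaps.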
const fetchLatest5Txns = `
  query fetchLatest5Txns {
    good_txns(order_by: { txn_blocktime: desc }, limit: 5) {
      txn_hash
    }
  }
`;
let COUNT = 0;
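// Build a coder map from the Cykura IDL so raw program logs can be parsed into typed events.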
const cykuraBuilder = buildCoderMap(
  { cykura: IDL },
  {
    cykura: PROGRAM_ADD,
  }
);
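// Read-only setup: a throwaway keypair is fine here since this script never signs anything.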
const wallet = new Wallet(Keypair.generate());
const connection = new Connection(
  "https://summer-snowy-wave.solana-mainnet.quiknode.pro/1822e9e649576157616dca62aff4b2f69945c436/"
);
const provider = new Provider(connection, wallet, { commitment: "finalized" });
const cyclosCore = new Program(IDL, PROGRAM_ADD, provider);
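// HTTP endpoints: "/" returns the 5 latest indexed swaps from Hasura, "/count" a liveness counter.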
app
  .get("/", async (req, res) => {
    try {
      const resp = await axios.post(
        POST_URL,
        {
          query: fetchLatest5Txns,
        },
        {
          headers: graphqlHeaders,
        }
      );
      console.log(resp?.data);
      return res.send(resp?.data);
    } catch (e) {
      console.log("Something went wrong");
      console.log(e);
      // Respond on the error path too, so the client doesn't hang.
      return res.status(500).send("Something went wrong");
    }
  })
  .get("/count", (req, res) => res.send(JSON.stringify(COUNT)));
app.listen(PORT, () => console.log(`Listening on ${PORT}`));
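// Bootstrap: seed lastFetchedTxnHash from a recent signature, then poll forward every 5 seconds.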
let lastFetchedTxnHash;
(async function () {
  /** Fetch the most recent transactions once and continue the loop from there. */
  const data = await connection.getConfirmedSignaturesForAddress2(
    PROGRAM_ADD,
    {
      limit: 20,
    },
    "finalized"
  );
  // Signatures come back newest-first, so the last element is the oldest of the
  // batch; indexing continues forward from it toward the present.
  const startSignature = data[data.length - 1].signature;
  console.log(
    startSignature,
    " is the transaction from which the indexer will continue fetching forward to the present"
  );
  lastFetchedTxnHash = await fetchTransactions(startSignature);
})();
setInterval(async () => {
  console.log(
    "Auto fetching runs now",
    new Date().toLocaleString("en-US", { timeZone: "Asia/Kolkata" })
  );
  lastFetchedTxnHash = await fetchTransactions(lastFetchedTxnHash);
}, 5000);
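// Fetches all signatures newer than lastFetchedTxnHash, decodes their SwapEvents,
// inserts them into Hasura, and returns the newest hash seen (or the old cursor on failure).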
async function fetchTransactions(lastFetchedTxnHash) {
  console.log(`Fetching until ${lastFetchedTxnHash}`);
  const data = await connection.getConfirmedSignaturesForAddress2(
    PROGRAM_ADD,
    {
      until: lastFetchedTxnHash,
    },
    "finalized"
  );
  const signArr = data.map((d) => d.signature);
  console.log(`Got ${signArr.length} signatures`);
  if (signArr.length === 0) {
    console.log("Ran out of signatures.");
    // Keep the cursor unchanged so the next tick retries from the same point.
    return lastFetchedTxnHash;
  }
  console.log(
    `First txn is ${signArr[0]} and last txn is ${signArr[signArr.length - 1]}`
  );
  const txnArr = await connection.getParsedTransactions(signArr);
  // Accumulates GraphQL object literals for the insert mutation below.
  let t = "";
  if (!txnArr) {
    console.log("txnArr is null");
    return lastFetchedTxnHash;
  }
  console.log(txnArr.length, " number of txns fetched");
  console.log("Adding Txns");
  let count = 0;
  let largestBlockTime = 0;
  let largestTxnHash;
  for (const txn of txnArr) {
    if (!txn) {
      continue;
    }
    const signer = txn?.transaction?.message?.accountKeys[0].pubkey.toString();
    const txnHash = txn?.transaction?.signatures.toString();
    const txnSlot = txn?.slot;
    const txnBlockTime = txn?.blockTime;
    const txnsLogs = txn?.meta?.logMessages ?? [];
    // Track the latest hash w.r.t. blocktime across the fetched signatures.
    // It can come from a failed or a successful txn.
    if (txnBlockTime > largestBlockTime) {
      largestBlockTime = txnBlockTime;
      largestTxnHash = txnHash;
    }
    // Skip failed transactions for now
    if (txn.meta?.err !== null) {
      continue;
    }
    let events;
    try {
      events = cykuraBuilder.cykura.parseProgramLogEvents(txnsLogs);
    } catch (e) {
      // Handle things like
      // 1. upgrade of contracts = "2jAwQP4HgfRkD7M5ry6HVuHiqPstgXaULfQqhJMbFEaXcZqnyWQaLCcKv5xvgmQEWaso1RQGjscmCD4CSnVEAv5z"
      console.log("parseProgramLogEvents fails");
      console.log(txn?.meta.logMessages);
      process.exit(1);
    }
    for (const e of events) {
      if (e.name === "SwapEvent") {
        const event = e;
        const data = event.data;
        console.log(
          ` - ${txnHash} ${new Date(txnBlockTime * 1000).toLocaleString(
            "en-US",
            { timeZone: "Asia/Kolkata" }
          )}`
        );
        // Just to keep count
        count++;
        t += `{
          txn_hash: "${txnHash}",
          txn_slot: "${txnSlot.toString()}",
          txn_blocktime: "${txnBlockTime.toString()}",
          token1: "SOL",
          token0: "USDC",
          tick: "${data.tick.toString()}",
          sqrtprice_x32: "${data.sqrtPriceX32.toString()}",
          signer: "${signer.toString()}",
          sender: "${data.sender.toString()}",
          pool_addr: "${data.poolState.toString()}",
          liquidity: "${data.liquidity.toString()}",
          amount1: "${data.amount0.toString()}",
          amount0: "${data.amount1.toString()}",
        }`;
      }
    }
  }
console.log("Found", count, "swaps");
// Query to insert swaps if not exits.
// TODO: The scheme needs some changing. This is a POC test ex. poolAddr and txnHash should be the PK, which is not the case now
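  // A rough sketch of the on_conflict clause once that composite PK exists
  // (the constraint name is hypothetical -- Hasura derives it from the real schema):
  //   on_conflict: { constraint: good_txns_pool_addr_txn_hash_key, update_columns: [] }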
  const query = `
    mutation MyMutation {
      insert_good_txns(objects: [${t}], on_conflict: {constraint: good_txns_pkey}) {
        affected_rows
        returning {
          txn_hash
        }
      }
    }
  `;
  // Insert the data using a mutation, wrapped in try/catch:
  // - if it fails, don't update the last fetched value => the next run retries with the previous lastFetchedTxnHash;
  // - if it succeeds, update lastFetchedTxnHash so newer data gets fetched.
  try {
    const resp = await axios.post(
      POST_URL,
      {
        query,
      },
      {
        headers: graphqlHeaders,
      }
    );
    console.log(resp?.data);
    console.log(
      `Inserted ${resp?.data?.data?.insert_good_txns.affected_rows} rows of data`
    );
    console.log(
      resp?.data.data.insert_good_txns.returning.map((r) => r.txn_hash)
    );
  } catch (e) {
    console.log("Something went wrong");
    console.log(e);
    return lastFetchedTxnHash;
  }
  // TODO: Since we fetch close to the latest hash, we should never get more than 1000 txns at once,
  // unless the DB write fails and the function re-runs, which might make it fetch >1000 txns.
  // Handle such cases so they don't interfere with the setInterval call.
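  // One way to page past that cap if it ever happens (untested sketch;
  // getConfirmedSignaturesForAddress2 returns at most 1000 signatures per
  // call, newest-first, and accepts a `before` cursor):
  //
  //   let before;
  //   let all = [];
  //   while (true) {
  //     const page = await connection.getConfirmedSignaturesForAddress2(
  //       PROGRAM_ADD,
  //       { before, until: lastFetchedTxnHash, limit: 1000 },
  //       "finalized"
  //     );
  //     all = all.concat(page);
  //     if (page.length < 1000) break;
  //     before = page[page.length - 1].signature;
  //   }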
  console.log(
    `The latest hash in the batch is ${largestTxnHash} @ ${new Date(
      largestBlockTime * 1000
    ).toLocaleString("en-US", { timeZone: "Asia/Kolkata" })}`
  );
  return largestTxnHash;
}
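// Heartbeat for the /count endpoint: bumps COUNT every 5 seconds so uptime can be eyeballed.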
setInterval(() => {
  COUNT += 1;
}, 5000);