token-gallery-backend/scraper.js

const utils = require('ethereumjs-util');
const fs = require('fs');
const { Client } = require('pg');
const Web3 = require('web3');
const fetch = require("node-fetch");
const BigNumber = require("bignumber.js");
const BN = require('bn.js');
const { create } = require('ipfs-http-client');
const all = require('it-all');
const uint8ArrayConcat = require('uint8arrays/concat');
const uint8ArrayToString = require('uint8arrays/to-string');
const Sentry = require("@sentry/node");
const Tracing = require("@sentry/tracing");
const settings = require(process.argv[2]);
Sentry.init({
dsn: settings.sentry.scraper,
tracesSampleRate: 1.0
});
const ipfsNodes = settings.ipfs.nodes.map(n => create(n) );
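// Fetch a file from IPFS by racing the same cat() across every configured node
// and taking whichever response resolves first.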
async function fetchFromIpfs(hash) {
if (hash.startsWith("ipfs://")) {
hash = hash.substring(7);
}
const promises = ipfsNodes.map(n => all(n.cat(hash)));
const result = await Promise.any(promises);
const rawData = uint8ArrayConcat(result);
return uint8ArrayToString(rawData);
}
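// Web3 over a websocket provider so we can subscribe to contract events.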
const provider = new Web3.providers.WebsocketProvider(settings.web3.url, settings.web3.options)
const web3 = new Web3(provider, null, { transactionConfirmationBlocks: 1 });
provider.on("error", e => {
Sentry.captureException(e);
console.error("Websocket error, exiting", e);
process.exit(1)
});
provider.on("connect", () => {
console.log("Websocket connected");
});
// "end" event is handled further down
async function fetchJson(url) { return await (await fetch(url)).json() }
// This interval is just here to keep the process from exiting when it reaches the end of the script.
setInterval(() => {}, 1000 * 60 );
(async () => {
const client = new Client(settings.db);
await client.connect();
const factoryAbi = JSON.parse(await fs.promises.readFile("./factory.abi.json"));
const storeAbi = JSON.parse(await fs.promises.readFile("./store.abi.json"));
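// Small key/value helpers backed by the lookup table, used to persist the last factory block we processed.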
async function lookup(key) {
const result = await client.query("select val from lookup where key = $1 limit 1", [ key ]);
return result.rows.length == 0 ? null : result.rows[0].val;
}
async function setLatestBlock(blockNumber) {
const result = await client.query("update lookup set val = $1 where key = 'lastFactoryBlock' and cast(val as integer) < cast($1 as integer)", [ blockNumber ]);
if (result.rowCount == 0) console.log(`Ignoring last block update ${blockNumber} as it was older than the current value`);
}
async function getLatestBlock() {
const result = await client.query("select cast(val as integer) as val from lookup where key = 'lastFactoryBlock' limit 1");
return result.rows[0].val;
}
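// Record a raw contract event. Returns the new row id, or -1 if the event was already stored
// (conflict on address, log index, transaction index and block).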
async function insertEvent(event) {
const result = await client.query(
"insert into event( inserted, name, address, return_values, log_index, transaction_index, block) values ( now(), $1, $2, $3, $4, $5, $6 ) on conflict ( address, log_index, transaction_index, block ) do nothing returning *",
[ event.event, event.address, event.returnValues, event.logIndex, event.transactionIndex, event.blockNumber ]
);
if (result.rowCount == 0) {
return -1;
}
else {
return result.rows[0].id;
}
}
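// Load one NFT by store and on-chain id, mapped to camelCase fields; returns null if it isn't in the database yet.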
async function getNft(storeAddress, nftId) {
const result = await client.query("select * from nft where nft_id = $1 and store = $2 limit 1", [ nftId, storeAddress ]);
if (result.rowCount == 0) { return null; }
else {
const r = result.rows[0];
return {
id: r.id,
nftId: r.nft_id,
storeAddress: r.store,
ownerAddress: r.owner,
inserted: r.inserted,
base: r.base,
metadata: r.metadata,
metadataUri: r.metadata_uri,
hidden: r.hidden,
ethPrice: new BigNumber(r.eth_price),
tokenPrice: new BigNumber(r.token_price)
}
}
}
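// Subscribe to the store contract events we track, resuming from the last block we've seen.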
function registerStoreEventHandlers(contract) {
contract.events.TransferSingle({ fromBlock: lastSeenBlock }).on("data", handleNftTransferEvent);
contract.events.TransferBatch({ fromBlock: lastSeenBlock }).on("data", handleNftBatchTransferEvent);
contract.events.BuySingleNft({ fromBlock: lastSeenBlock }).on("data", handleBuySingleNft);
contract.events.TokenBuySingleNft({ fromBlock: lastSeenBlock }).on("data", handleTokenBuySingleNft);
contract.events.CreatorTransferred({ fromBlock: lastSeenBlock }).on("data", handleStoreTransferEvent);
contract.events.PriceChange({ fromBlock: lastSeenBlock }).on("data", handlePriceChangeEvent);
}
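// NewStore factory event: read name, symbol and creator from the new store contract,
// cache it in memory, persist it, and start listening to its events.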
async function handleNewStoreEvent(event) {
const address = event.returnValues.store;
console.log(`New store: ${address}`);
try {
var contract = new web3.eth.Contract(storeAbi, address);
var name = await contract.methods.name().call();
var symbol = await contract.methods.symbol().call();
var creator = await contract.methods.creator().call();
}
catch (error) {
Sentry.captureException(error);
// This seems to happen when nodes aren't fully in sync and we continue after a single confirmation.
console.error("There was an error handling new store, bailing and restarting to resume from where we left off.", error);
process.exit(1);
}
if (!stores[address]) {
stores[address] = {
address,
name,
symbol,
creator,
contract
}
}
const eventId = await insertEvent(event);
if (eventId == -1) {
console.log("Store already in database, not inserting.");
}
else {
const result = await client.query(
"insert into store(address, name, symbol, creator, inserted) values($1, $2, $3, $4, now()) on conflict(address) do nothing",
[address, name, symbol, creator]
);
if (result.rowCount == 0) console.error("Store not inserted for some reason.");
}
if (event.blockNumber > lastSeenBlock) {
lastSeenBlock = event.blockNumber;
await setLatestBlock(event.blockNumber);
}
registerStoreEventHandlers(contract);
}
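// CreatorTransferred event: record it and point the store at its new creator in the database.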
async function handleStoreTransferEvent(event) {
console.log("Store transfer event");
const from = event.returnValues.from;
const to = event.returnValues.to;
const storeAddress = event.address;
const eventId = await insertEvent(event);
if (eventId == -1) {
console.log("Skipping transfer store event.");
}
else {
const result = await client.query("update store set creator = $1 where address = $2", [ to, storeAddress ]);
}
if (event.blockNumber > lastSeenBlock) {
lastSeenBlock = event.blockNumber;
await setLatestBlock(event.blockNumber);
}
}
// sometimes these come in before the mint so they have to be deferred.
const handlePriceChangeEvent = async function (event, deferred = false) {
console.log("Nft price change event");
async function updateNft(price, forSale, tokenPrice, tokenForSale, storeAddress, nftId) {
const params = [ price, forSale, tokenPrice, tokenForSale, storeAddress, nftId ];
const result = await client.query("update nft set eth_price = $1, eth_for_sale = $2, token_price = $3, token_for_sale = $4 where store = $5 and nft_id = $6", params);
if (result.rowCount != 1) {
console.error(`Updated ${result.rowCount} rows instead of 1 for some reason, deferring and trying again.`);
setTimeout(async function(){ await handlePriceChangeEvent(event, true); }, 5000);
}
}
const storeAddress = event.address;
const nftId = event.returnValues.id;
const price = event.returnValues.price;
const forSale = event.returnValues.forSale;
const tokenPrice = event.returnValues.tokenPrice;
const tokenForSale = event.returnValues.tokenForSale;
if (deferred) {
// We KNOW there's already an event so skip trying to insert it
await updateNft(price, forSale, tokenPrice, tokenForSale, storeAddress, nftId);
}
else {
const eventId = await insertEvent(event);
if (eventId == -1) {
console.log("Skipping nft price change event.");
}
else {
await updateNft(price, forSale, tokenPrice, tokenForSale, storeAddress, nftId);
}
}
if (event.blockNumber > lastSeenBlock) {
lastSeenBlock = event.blockNumber;
await setLatestBlock(event.blockNumber);
}
}
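// BuySingleNft event: record it, then hide the matching TransferSingle event since the buy
// already carries the same information. Retries because the two events can arrive out of order.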
async function handleBuySingleNft(event, deferred = false, count = 5) {
console.log("NFT buy event (using Ubiq)");
if (!deferred) {
const eventId = await insertEvent(event);
}
// Don't need to show the transfer event because the buy event has same info.
const params = [
event.blockNumber,
event.transactionIndex,
event.returnValues.from,
event.returnValues.to,
event.returnValues.id
];
console.log(params);
const result = await client.query("update event set hidden=true where name = 'TransferSingle' and block = $1 and transaction_index = $2 and return_values->>'_from' = $3 and return_values->>'_to' = $4 and return_values->>'_id' = $5", params);
if (result.rowCount == 0) {
// Sometimes the events come out of order
if (count > 0) {
console.log("No transfer activity was found to hide, so waiting and trying again later.");
setTimeout(() => { handleBuySingleNft(event, true, count-1); }, 5000);
}
}
}
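// Same as handleBuySingleNft, but for purchases paid with the token instead of Ubiq.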
async function handleTokenBuySingleNft(event, deferred = false, count = 5) {
console.log("NFT buy event (using token)");
if (!deferred) {
const eventId = await insertEvent(event);
}
// Don't need to show the transfer event because the buy event has same info.
const params = [
event.blockNumber,
event.transactionIndex,
event.returnValues.from,
event.returnValues.to,
event.returnValues.id
];
const result = await client.query("update event set hidden=true where name = 'TransferSingle' and block = $1 and transaction_index = $2 and return_values->>'_from' = $3 and return_values->>'_to' = $4 and return_values->>'_id' = $5", params);
if (result.rowCount == 0) {
// Sometimes the events come out of order
if (count > 0) {
console.log("No transfer activity was found to hide, so waiting and trying again later.");
setTimeout(() => { handleTokenBuySingleNft(event, true, count-1); }, 5000);
}
}
}
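// TransferBatch event: store the raw batch, then fan it out into synthetic per-id TransferSingle
// events (with offset log indexes) so they reuse the single-transfer handling.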
async function handleNftBatchTransferEvent(event) {
console.log("NFT batch transfer event");
const eventId = await insertEvent(event); // Save it even if we're not using it.
const ids = event.returnValues.ids;
// remove aggregate fields
delete event.returnValues['3'];
delete event.returnValues['4'];
delete event.returnValues.ids;
delete event.returnValues.values;
event.event = 'TransferSingle';
event.returnValues.value = '1';
// Derived events can't all share the original log index, so fake unique ones in a predictable, recognizable range.
let logIndex = event.logIndex + 10000;
for (const id of ids) {
event.returnValues.id = id;
event.logIndex = logIndex;
await handleNftTransferEvent(event);
++logIndex;
}
}
/**
* Handles ERC-1155 TransferSingle events: mints, burns, and normal transfers.
*/
async function handleNftTransferEvent(event) {
console.log("NFT transfer event");
const eventId = await insertEvent(event);
const operator = event.returnValues.operator;
const from = event.returnValues.from;
const to = event.returnValues.to;
const nftId = event.returnValues.id;
const isMintOperation = (from == "0x0000000000000000000000000000000000000000");
const isBurnOperation = (!isMintOperation && to == "0x0000000000000000000000000000000000000000");
const storeAddress = event.address;
const contract = stores[storeAddress].contract;
try {
// erc1155 can hold both fungible and nonfungible
var isNft = await contract.methods.isNonFungibleItem(nftId).call();
}
catch (error) {
console.error("Error getting if contract is nonfungible. bailing and starting from where we left off.", error);
process.exit(1);
}
if (isNft) {
if (isMintOperation) {
console.log("Mint operation");
try {
var metaId = await contract.methods.metaId(nftId).call();
}
catch (error) {
console.error("Error getting meta id. bailing and starting from where we left off.", error);
process.exit(1);
}
if (eventId != -1) {
const result = await client.query(
"insert into nft(nft_id, store, owner, inserted, base, meta_id) values($1, $2, $3, now(), false, $4) on conflict(nft_id, store) do nothing",
[ nftId, storeAddress, to, metaId ]
);
if (result.rowCount == 0) {
console.log("NFT wasn't inserted for some reason.");
}
}
const metadataCountResult = await client.query("select count(*) from nft where nft_id = $1 and store = $2 and metadata is not null", [ nftId, storeAddress ]);
const hasMetadata = parseInt(metadataCountResult.rows[0].count, 10) > 0;
if (!hasMetadata) {
// insert metadata if it exists.
const contract = stores[storeAddress].contract;
// this shouldn't ever fail because we call the contract earlier and it succeeded.
const uri = await contract.methods.uri(nftId).call();
try {
let metadata;
if (uri.startsWith("ipfs://")) {
metadata = await fetchFromIpfs(uri);
} else {
metadata = await fetchJson(uri);
}
const result = await client.query("update nft set metadata = $1, metadata_uri = $2 where nft_id = $3 and store = $4 and metadata is null", [ metadata, uri, nftId, storeAddress ]);
if (result.rowCount == 1) {
console.log("Inserted NFT metadata");
}
} catch (e) {
if (e instanceof AggregateError) {
console.error("Aggregate error fetching metadata: ", e.errors);
} else {
console.error("Error fetching metadata", e);
}
}
}
}
else if (isBurnOperation) {
console.log("Burn operation");
if (eventId != -1) {
console.log("Burning in database");
const result = await client.query(
"update nft set owner = $1, eth_price=0, eth_for_sale=false, token_price=0, token_for_sale=false where nft_id = $2 and store = $3",
[ to, nftId, storeAddress ]
);
if (result.rowCount == 0) console.log("Burn not updated in db for some reason.");
}
else {
console.log("Already recorded in db.");
}
}
else {
console.log("Normal transfer operation");
if (eventId != -1) {
const result = await client.query(
"update nft set owner = $1, eth_price=0, eth_for_sale=false, token_price=0, token_for_sale=false where nft_id = $2 and store = $3",
[ to, nftId, storeAddress ]
);
if (result.rowCount == 0) console.log("Skipping NFT transfer, probably because it was already processed");
}
}
}
else {
console.log("Not an NFT, ignoring")
}
if (event.blockNumber > lastSeenBlock) {
lastSeenBlock = event.blockNumber;
await setLatestBlock(event.blockNumber);
}
}
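// Advance lastSeenBlock (and its persisted copy) as new block headers arrive so a restart resumes from close to where we left off.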
async function handleNewBlock(blockHeader) {
if (blockHeader.number == lastSeenBlock) {
console.log(`Ignoring duplicate block ${lastSeenBlock}`);
}
else if (blockHeader.number > lastSeenBlock) {
lastSeenBlock = blockHeader.number;
await setLatestBlock(blockHeader.number);
}
else {
console.log("Received block header out of order, assuming already processed and ignoring.");
}
}
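// Startup: load known stores and the resume block from the database, then attach event listeners.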
const stores = {};
let lastSeenBlock = await getLatestBlock();
const storeResult = await client.query("select address, name, symbol, creator from store order by inserted asc");
console.log(`Number of stores queried: ${storeResult.rowCount}`);
for (const row of storeResult.rows) {
const contract = new web3.eth.Contract(storeAbi, row.address);
registerStoreEventHandlers(contract);
stores[row.address] = {
address: row.address,
name: row.name,
symbol: row.symbol,
creator: row.creator,
contract
}
}
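// Watch the factory for stores created since the last processed block.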
const factory = new web3.eth.Contract(factoryAbi, settings.factory.address);
console.log(`Watching for new stores since block: ${lastSeenBlock}`);
factory.events.NewStore({ fromBlock: lastSeenBlock })
.on("data", handleNewStoreEvent);
// handle provider disconnects
provider.on("error", e => {
Sentry.captureException(e);
console.log("Connection ended, re-establishing connection")
web3.eth.clearSubscriptions();
web3.setProvider(provider);
web3.eth.subscribe('newBlockHeaders')
.on('data', handleNewBlock)
.on('error', e => {
Sentry.captureException(e);
});
console.log("Re-setting all event listeners");
factory.events.NewStore({ fromBlock: lastSeenBlock }).on("data", handleNewStoreEvent);
for (const address of Object.keys(stores)) {
const store = stores[address];
registerStoreEventHandlers(store.contract);
}
});
// this is also done in the reconnect above.
web3.eth.subscribe('newBlockHeaders')
.on('data', handleNewBlock)
.on('error', e => {
Sentry.captureException(e);
});
// for each store, add a watch, starting at last block number above,
// for transfer and mint events
})().catch(e => {
Sentry.captureException(e);
console.error("Top-level failure, bailing and starting over from the top.", e);
process.exit(1);
});