Skip to content
This repository has been archived by the owner on Feb 14, 2024. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
pdeville-ledger committed Jul 5, 2022
2 parents 9ddf529 + 0d19b1e commit 47c645b
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 153 deletions.
5 changes: 4 additions & 1 deletion back/src/main/scala/foresight/indexer/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import akka.util.ByteString
import common.indexer._
import common.model.JRPC
import foresight.indexer._
import foresight.indexer.server.HttpServer
import foresight.indexer.server.WsServer
import foresight.model.Raw
import java.sql.Timestamp
Expand Down Expand Up @@ -44,6 +45,9 @@ object Indexer {
.toMat(Sink.ignore)(Keep.both)
.run()

new WsServer(rawInserter).wsServer
new HttpServer(rawInserter).httpServer

fetcher.newHeads
.via(fetcher.getBlock)
.map { case (x, ts) =>
Expand All @@ -68,6 +72,5 @@ object Indexer {
.toMat(Sink.ignore)(Keep.both)
.run()

new WsServer(rawInserter).wsServer
}
}
185 changes: 181 additions & 4 deletions back/src/main/scala/foresight/indexer/RawInserter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ final case class RawInserter(session: SlickSession) {
.transactionally
}

def getProcessedTransactionQuery =
def getProcessedTransactionByBlockHeightQuery(blockHeight: Int) =
sql"""SELECT
hash,
type,
Expand All @@ -164,22 +164,185 @@ final case class RawInserter(session: SlickSession) {
dropped_at,
block_hash,
sender,
receiver,
value,
gas,
gas_price,
max_fee_per_gas,
max_priority_fee_per_gas,
input,
nonce,
transaction_index,
status,
tip
FROM
processed_transactions
WHERE
block_height = $blockHeight
""".as(
GetResult(r =>
Processed.Transaction(
hash = r.nextString(),
transactionType = r.nextString() match {
case "Legacy" => Processed.TransactionType.Legacy
case _ => Processed.TransactionType.EIP1559
},
blockHeight = r.nextIntOption().map(Height(_)),
createdAt = r.nextTimestamp(),
minedAt = r.nextTimestampOption(),
droppedAt = r.nextTimestampOption(),
blockHash = r.nextStringOption(),
sender = r.nextString(),
receiver = r.nextString(),
value = r.nextBigDecimal(),
gas = r.nextBigDecimal(),
gasPrice = r.nextStringOption().map(HexNumber(_).toBigDecimal),
maxFeePerGas = r.nextStringOption().map(HexNumber(_).toBigDecimal),
maxPriorityFeePerGas =
r.nextStringOption().map(HexNumber(_).toBigDecimal),
input = r.nextString(),
nonce = r.nextBigDecimal(),
transactionIndex =
r.nextStringOption().map(HexNumber(_).toBigDecimal),
tip = r.nextBigDecimalOption()
)
)
)

def getProcessedTransactionQuery =
sql"""
with top_legacy as (select *
from processed_transactions
where mined_at is null
and created_at > now() - interval '3 hours'
and type = 'Legacy'
order by gas_price desc
limit 250),
top_eip1559 as (select *
from processed_transactions
where mined_at is null
and created_at > now() - interval '3 hours'
and type = 'EIP1559'
order by gas_price desc
limit 250)
SELECT
hash,
type,
block_height,
created_at,
mined_at,
dropped_at,
block_hash,
sender,
receiver,
value,
gas,
gas_price,
max_fee_per_gas,
max_priority_fee_per_gas,
input,
nonce,
transaction_index,
value,
status,
tip
FROM
top_eip1559
UNION ALL
SELECT
hash,
type,
block_height,
created_at,
mined_at,
dropped_at,
block_hash,
sender,
receiver,
value,
gas,
gas_price,
max_fee_per_gas,
max_priority_fee_per_gas,
input,
nonce,
transaction_index,
value,
status,
tip
FROM
top_legacy

""".as(
GetResult(r =>
Processed.Transaction(
hash = r.nextString(),
transactionType = r.nextString() match {
case "Legacy" => Processed.TransactionType.Legacy
case _ => Processed.TransactionType.EIP1559
},
blockHeight = r.nextIntOption().map(Height(_)),
createdAt = r.nextTimestamp(),
minedAt = r.nextTimestampOption(),
droppedAt = r.nextTimestampOption(),
blockHash = r.nextStringOption(),
sender = r.nextString(),
receiver = r.nextString(),
value = r.nextBigDecimal(),
gas = r.nextBigDecimal(),
gasPrice = r.nextStringOption().map(HexNumber(_).toBigDecimal),
maxFeePerGas = r.nextStringOption().map(HexNumber(_).toBigDecimal),
maxPriorityFeePerGas =
r.nextStringOption().map(HexNumber(_).toBigDecimal),
input = r.nextString(),
nonce = r.nextBigDecimal(),
transactionIndex =
r.nextStringOption().map(HexNumber(_).toBigDecimal),
tip = r.nextBigDecimalOption()
)
)
)

def getMemPoolQuery =
sql"""SELECT
count(*)
FROM
processed_transactions
WHERE
created_at > NOW() - interval '1 second' OR
mined_at > NOW() - interval '1 second'
mined_at is null and created_at > now() - interval '3 hours'
""".as[Int](
GetResult(r => r.nextInt())
)

def getProcessedTransactionByAddressQuery(address: String) =
sql"""SELECT
hash,
type,
block_height,
created_at,
mined_at,
dropped_at,
block_hash,
sender,
receiver,
value,
gas,
gas_price,
max_fee_per_gas,
max_priority_fee_per_gas,
input,
nonce,
transaction_index,
value,
status,
tip
FROM
processed_transactions
WHERE
(sender = $address OR receiver = $address) AND (
created_at > NOW() - interval '5 second' OR
mined_at > NOW() - interval '5 second'
)
""".as(
GetResult(r =>
Processed.Transaction(
Expand All @@ -203,15 +366,29 @@ final case class RawInserter(session: SlickSession) {
r.nextStringOption().map(HexNumber(_).toBigDecimal),
input = r.nextString(),
nonce = r.nextBigDecimal(),
transactionIndex = r.nextStringOption().map(HexNumber(_).toBigDecimal),
transactionIndex =
r.nextStringOption().map(HexNumber(_).toBigDecimal),
tip = r.nextBigDecimalOption()
)
)
)

def getProcessedTransactionByAddress(address: String): Future[List[Processed.Transaction]] =
session.db.run(getProcessedTransactionByAddressQuery(address)).map(_.toList)

def getProcessedTransaction: Future[List[Processed.Transaction]] =
session.db.run(getProcessedTransactionQuery).map(_.toList)

def getMemPool: Future[List[Int]] =
session.db.run(getMemPoolQuery).map(_.toList)

def getProcessedTransactionByBockHeight(
blockHeight: Int
): Future[List[Processed.Transaction]] =
session.db
.run(getProcessedTransactionByBlockHeightQuery(blockHeight))
.map(_.toList)

def insertBlock() = Flow[Raw.Block].via(Slick.flow(insertBlockQuery))

def updateBaseFee(): Flow[Raw.BaseFeeBatch, Int, NotUsed] =
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 47c645b

Please sign in to comment.