Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add last synced blockNumber to each syncer #235

Merged
merged 1 commit into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add last synced blockNumber to each syncer
  • Loading branch information
esen committed Jun 1, 2022
commit d43508cf8b266cb4706fd32c0d518294a1f05fee
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,27 @@ import io.horizontalsystems.ethereumkit.core.ITransactionSyncer
import io.horizontalsystems.ethereumkit.models.Eip20Event
import io.horizontalsystems.ethereumkit.models.ProviderTokenTransaction
import io.horizontalsystems.ethereumkit.models.Transaction
import io.reactivex.Single

class Erc20TransactionSyncer(
private val transactionProvider: ITransactionProvider,
private val storage: IEip20Storage
): ITransactionSyncer {
private val transactionProvider: ITransactionProvider,
private val storage: IEip20Storage
) : ITransactionSyncer {

private fun handle(transactions: List<ProviderTokenTransaction>) {
if (transactions.isEmpty()) return

val events = transactions.map { tx ->
Eip20Event(tx.hash, tx.contractAddress, tx.from, tx.to, tx.value, tx.tokenName, tx.tokenSymbol, tx.tokenDecimal, )
Eip20Event(tx.hash, tx.blockNumber, tx.contractAddress, tx.from, tx.to, tx.value, tx.tokenName, tx.tokenSymbol, tx.tokenDecimal)
}

storage.save(events)
}

override fun getTransactionsSingle(lastTransactionBlockNumber: Long) =
transactionProvider.getTokenTransactions(lastTransactionBlockNumber + 1)
override fun getTransactionsSingle(): Single<List<Transaction>> {
val lastTransactionBlockNumber = storage.getLastEvent()?.blockNumber ?: 0

return transactionProvider.getTokenTransactions(lastTransactionBlockNumber + 1)
.doOnSuccess { providerTokenTransactions -> handle(providerTokenTransactions) }
.map { providerTokenTransactions ->
providerTokenTransactions.map { transaction ->
Expand All @@ -40,5 +43,6 @@ class Erc20TransactionSyncer(
)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.horizontalsystems.ethereumkit.api.models.EthereumKitState
import io.horizontalsystems.ethereumkit.api.storage.ApiStorage
import io.horizontalsystems.ethereumkit.core.storage.Eip20Storage
import io.horizontalsystems.ethereumkit.core.storage.TransactionStorage
import io.horizontalsystems.ethereumkit.core.storage.TransactionSyncerStateStorage
import io.horizontalsystems.ethereumkit.crypto.CryptoUtils
import io.horizontalsystems.ethereumkit.crypto.InternalBouncyCastleProvider
import io.horizontalsystems.ethereumkit.decorations.DecorationManager
Expand Down Expand Up @@ -398,11 +399,12 @@ class EthereumKit(

val transactionDatabase = EthereumDatabaseManager.getTransactionDatabase(application, walletId, chain)
val transactionStorage = TransactionStorage(transactionDatabase)
val transactionSyncerStateStorage = TransactionSyncerStateStorage(transactionDatabase)

val erc20Database = EthereumDatabaseManager.getErc20Database(application, walletId, chain)
val erc20Storage = Eip20Storage(erc20Database)

val ethereumTransactionSyncer = EthereumTransactionSyncer(transactionProvider)
val ethereumTransactionSyncer = EthereumTransactionSyncer(transactionProvider, transactionSyncerStateStorage)
val internalTransactionsSyncer = InternalTransactionSyncer(transactionProvider, transactionStorage)

val decorationManager = DecorationManager(address, transactionStorage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ interface IBlockchainListener {
}

interface ITransactionStorage {
fun getLastTransaction(): Transaction?
fun getTransactions(hashes: List<ByteArray>): List<Transaction>
fun getTransaction(hash: ByteArray): Transaction?
fun getTransactionsBeforeAsync(tags: List<List<String>>, hash: ByteArray?, limit: Int?): Single<List<Transaction>>
Expand All @@ -76,6 +75,7 @@ interface ITransactionStorage {
fun getPendingTransactions(tags: List<List<String>>): List<Transaction>
fun getNonPendingTransactionsByNonces(pendingTransactionNonces: List<Long>): List<Transaction>

fun getLastInternalTransaction(): InternalTransaction?
fun getInternalTransactions(): List<InternalTransaction>
fun getInternalTransactionsByHashes(hashes: List<ByteArray>): List<InternalTransaction>
fun saveInternalTransactions(internalTransactions: List<InternalTransaction>)
Expand All @@ -84,13 +84,14 @@ interface ITransactionStorage {
}

interface IEip20Storage {
fun getLastEvent(): Eip20Event?
fun save(events: List<Eip20Event>)
fun getEvents(): List<Eip20Event>
fun getEventsByHashes(hashes: List<ByteArray>): List<Eip20Event>
}

interface ITransactionSyncer {
fun getTransactionsSingle(lastTransactionBlockNumber: Long): Single<List<Transaction>>
fun getTransactionsSingle(): Single<List<Transaction>>
}

interface IMethodDecorator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,14 @@ import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.subjects.PublishSubject
import java.math.BigInteger
import java.util.logging.Logger

class TransactionManager(
private val storage: ITransactionStorage,
private val decorationManager: DecorationManager,
private val blockchain: IBlockchain,
private val provider: ITransactionProvider
) {
val lastTransaction: Transaction?
get() = storage.getLastTransaction()

private val logger = Logger.getLogger(this.javaClass.simpleName)
private val fullTransactionsSubject = PublishSubject.create<List<FullTransaction>>()
private val fullTransactionsWithTagsSubject = PublishSubject.create<List<TransactionWithTags>>()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import io.horizontalsystems.ethereumkit.models.Eip20Event
entities = [
Eip20Event::class
],
version = 1,
version = 2,
exportSchema = false
)
@TypeConverters(RoomTypeConverters::class, Eip20Database.TypeConverters::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import io.horizontalsystems.ethereumkit.models.Eip20Event
@Dao
interface Eip20EventDao {

@Query("SELECT * FROM Eip20Event ORDER BY blockNumber DESC LIMIT 1")
fun getLastEip20Event(): Eip20Event?

@Insert
fun insertEip20Events(events: List<Eip20Event>)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import io.horizontalsystems.ethereumkit.models.Eip20Event
class Eip20Storage(database: Eip20Database) : IEip20Storage {
private val erc20EventDao = database.eip20EventDao()

override fun getLastEvent(): Eip20Event? =
erc20EventDao.getLastEip20Event()

override fun save(events: List<Eip20Event>) {
erc20EventDao.insertEip20Events(events)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ interface TransactionDao {
@Query("SELECT * FROM `Transaction` WHERE hash=:hash")
fun getTransaction(hash: ByteArray): Transaction?

@Query("SELECT * FROM `Transaction` WHERE blockNumber IS NOT NULL ORDER BY blockNumber DESC LIMIT 1")
fun getLastTransaction() : Transaction?
@Query("SELECT * FROM `InternalTransaction` ORDER BY blockNumber DESC LIMIT 1")
fun getLastInternalTransaction() : InternalTransaction?

@Query("SELECT * FROM `Transaction` WHERE hash IN (:hashes)")
fun getTransactions(hashes: List<ByteArray>): List<Transaction>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,25 @@ import androidx.room.*
import io.horizontalsystems.ethereumkit.api.storage.RoomTypeConverters
import io.horizontalsystems.ethereumkit.models.InternalTransaction
import io.horizontalsystems.ethereumkit.models.Transaction
import io.horizontalsystems.ethereumkit.models.TransactionSyncerState
import io.horizontalsystems.ethereumkit.models.TransactionTag

@Database(
entities = [
Transaction::class,
InternalTransaction::class,
TransactionTag::class
TransactionTag::class,
TransactionSyncerState::class
],
version = 11,
version = 12,
exportSchema = false
)
@TypeConverters(RoomTypeConverters::class, TransactionDatabase.TypeConverters::class)
abstract class TransactionDatabase : RoomDatabase() {

abstract fun transactionDao(): TransactionDao
abstract fun transactionTagDao(): TransactionTagDao
abstract fun transactionSyncerStateDao(): TransactionSyncerStateDao

companion object {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ class TransactionStorage(database: TransactionDatabase) : ITransactionStorage {
private val transactionDao = database.transactionDao()
private val tagsDao = database.transactionTagDao()

override fun getLastTransaction(): Transaction? =
transactionDao.getLastTransaction()

override fun getTransactions(hashes: List<ByteArray>): List<Transaction> =
transactionDao.getTransactions(hashes)

Expand Down Expand Up @@ -124,6 +121,9 @@ class TransactionStorage(database: TransactionDatabase) : ITransactionStorage {
override fun getNonPendingTransactionsByNonces(pendingTransactionNonces: List<Long>): List<Transaction> =
transactionDao.getNonPendingByNonces(pendingTransactionNonces)

override fun getLastInternalTransaction(): InternalTransaction? =
transactionDao.getLastInternalTransaction()

override fun getInternalTransactions(): List<InternalTransaction> =
transactionDao.getInternalTransactions()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.horizontalsystems.ethereumkit.core.storage

import androidx.room.Dao
import androidx.room.Insert
import androidx.room.OnConflictStrategy
import androidx.room.Query
import io.horizontalsystems.ethereumkit.models.TransactionSyncerState

@Dao
interface TransactionSyncerStateDao {

@Query("SELECT * FROM `TransactionSyncerState` WHERE syncerId = :syncerId LIMIT 1")
fun get(syncerId: String) : TransactionSyncerState?

@Insert(onConflict = OnConflictStrategy.REPLACE)
fun save(transactionSyncerState: TransactionSyncerState)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.horizontalsystems.ethereumkit.core.storage

import io.horizontalsystems.ethereumkit.models.TransactionSyncerState

class TransactionSyncerStateStorage(database: TransactionDatabase) {
private val dao = database.transactionSyncerStateDao()

fun get(syncerId: String): TransactionSyncerState? =
dao.get(syncerId)

fun save(transactionSyncerState: TransactionSyncerState) {
dao.save(transactionSyncerState)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import java.math.BigInteger
@Entity
class Eip20Event(
val hash: ByteArray,
val blockNumber: Long,
val contractAddress: Address,
val from: Address,
val to: Address,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import java.util.*
@Entity
data class InternalTransaction(
val hash: ByteArray,
val blockNumber: Long,
val from: Address,
val to: Address,
val value: BigInteger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ data class ProviderInternalTransaction(
val traceId: String
) {

fun internalTransaction() = InternalTransaction(hash, from, to, value)
fun internalTransaction() = InternalTransaction(hash, blockNumber, from, to, value)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.horizontalsystems.ethereumkit.models

import androidx.room.Entity
import androidx.room.PrimaryKey

@Entity
class TransactionSyncerState(
@PrimaryKey
val syncerId: String,
val lastBlockNumber: Long
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,27 @@ package io.horizontalsystems.ethereumkit.transactionsyncers

import io.horizontalsystems.ethereumkit.core.ITransactionProvider
import io.horizontalsystems.ethereumkit.core.ITransactionSyncer
import io.horizontalsystems.ethereumkit.core.storage.TransactionSyncerStateStorage
import io.horizontalsystems.ethereumkit.models.ProviderTransaction
import io.horizontalsystems.ethereumkit.models.Transaction
import io.horizontalsystems.ethereumkit.models.TransactionSyncerState
import io.reactivex.Single

class EthereumTransactionSyncer(
private val transactionProvider: ITransactionProvider
private val transactionProvider: ITransactionProvider,
private val storage: TransactionSyncerStateStorage
): ITransactionSyncer {

override fun getTransactionsSingle(lastTransactionBlockNumber: Long) =
transactionProvider.getTransactions(lastTransactionBlockNumber + 1).map { providerTransactions ->
companion object {
const val SyncerId = "ethereum-transaction-syncer"
}

override fun getTransactionsSingle(): Single<List<Transaction>> {
val lastTransactionBlockNumber = storage.get(SyncerId)?.lastBlockNumber ?: 0

return transactionProvider.getTransactions(lastTransactionBlockNumber + 1)
.doOnSuccess { providerTransactions -> handle(providerTransactions) }
.map { providerTransactions ->
providerTransactions.map { transaction ->
val isFailed = when {
transaction.txReceiptStatus != null -> {
Expand Down Expand Up @@ -42,5 +55,13 @@ class EthereumTransactionSyncer(
)
}
}
}

private fun handle(transactions: List<ProviderTransaction>) {
val maxBlockNumber = transactions.maxOfOrNull { it.blockNumber } ?: return
val syncerState = TransactionSyncerState(SyncerId, maxBlockNumber)

storage.save(syncerState)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.horizontalsystems.ethereumkit.core.ITransactionSyncer
import io.horizontalsystems.ethereumkit.models.InternalTransaction
import io.horizontalsystems.ethereumkit.models.ProviderInternalTransaction
import io.horizontalsystems.ethereumkit.models.Transaction
import io.reactivex.Single

class InternalTransactionSyncer(
private val transactionProvider: ITransactionProvider,
Expand All @@ -16,24 +17,27 @@ class InternalTransactionSyncer(
if (transactions.isEmpty()) return

val internalTransactions = transactions.map { tx ->
InternalTransaction(tx.hash, tx.from, tx.to, tx.value)
InternalTransaction(tx.hash, tx.blockNumber, tx.from, tx.to, tx.value)
}

storage.saveInternalTransactions(internalTransactions)
}

override fun getTransactionsSingle(lastTransactionBlockNumber: Long) =
transactionProvider.getInternalTransactions(lastTransactionBlockNumber + 1)
override fun getTransactionsSingle(): Single<List<Transaction>> {
val lastTransactionBlockNumber = storage.getLastInternalTransaction()?.blockNumber ?: 0

return transactionProvider.getInternalTransactions(lastTransactionBlockNumber + 1)
.doOnSuccess { providerInternalTransactions -> handle(providerInternalTransactions) }
.map { providerInternalTransactions ->
providerInternalTransactions.map { transaction ->
Transaction(
hash = transaction.hash,
timestamp = transaction.timestamp,
isFailed = false,
blockNumber = transaction.blockNumber,
)
providerInternalTransactions.map { transaction ->
Transaction(
hash = transaction.hash,
timestamp = transaction.timestamp,
isFailed = false,
blockNumber = transaction.blockNumber,
)
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class TransactionSyncManager(

syncState = EthereumKit.SyncState.Syncing()

Single.zip(syncers.map { it.getTransactionsSingle(transactionManager.lastTransaction?.blockNumber ?: 0) }) { array ->
Single.zip(syncers.map {
it.getTransactionsSingle().onErrorReturnItem(listOf())
}) { array ->
array
.map { it as List<Transaction> }
.reduce { acc, list -> acc + list }
Expand Down