Skip to content

Commit

Permalink
v0.9.7
Browse files Browse the repository at this point in the history
  • Loading branch information
karimodm authored Sep 5, 2022
2 parents 7d42ba3 + 14b62d8 commit d4aef1c
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 39 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# v0.9.7 - 2022-09-05

> This release introduces major bugfixes to the networking and the faucet.
- Fix notarization and networking bugs (#2417)
- Make wallet stateless to prevent faucet getting stuck (#2415)
- Fix: /healthz endpoint (#2379)

# v0.9.6 - 2022-09-01

> This release introduces major bugfixes to epoch notarization and networking.
Expand Down
7 changes: 7 additions & 0 deletions client/wallet/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,10 @@ func GenericConnector(connector Connector) Option {
wallet.connector = connector
}
}

// Stateless allows to run the wallet in a stateless mode, meaning outputs will always be refreshed from the connector.
func Stateless(stateless bool) Option {
return func(wallet *Wallet) {
wallet.Stateless = stateless
}
}
14 changes: 10 additions & 4 deletions client/wallet/output_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ type OutputManager struct {
addressManager *AddressManager
connector Connector
unspentOutputs OutputsByAddressAndOutputID

optsStateless bool
}

// NewUnspentOutputManager creates a new UnspentOutputManager.
func NewUnspentOutputManager(addressManager *AddressManager, connector Connector) (outputManager *OutputManager) {
func NewUnspentOutputManager(addressManager *AddressManager, connector Connector, stateless bool) (outputManager *OutputManager) {
outputManager = &OutputManager{
addressManager: addressManager,
connector: connector,
unspentOutputs: NewAddressToOutputs(),
optsStateless: stateless,
}

if err := outputManager.Refresh(true); err != nil {
Expand All @@ -44,9 +47,12 @@ func (o *OutputManager) Refresh(includeSpentAddresses ...bool) error {
if _, addressExists := o.unspentOutputs[addy]; !addressExists {
o.unspentOutputs[addy] = make(map[utxo.OutputID]*Output)
}
// mark the output as spent if we already marked it as spent locally
if existingOutput, outputExists := o.unspentOutputs[addy][outputID]; outputExists && existingOutput.Spent {
output.Spent = true

// mark the output as spent if we already marked it as spent locally, only in stateful mode.
if !o.optsStateless {
if existingOutput, outputExists := o.unspentOutputs[addy][outputID]; outputExists && existingOutput.Spent {
output.Spent = true
}
}
o.unspentOutputs[addy][outputID] = output
}
Expand Down
3 changes: 2 additions & 1 deletion client/wallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Wallet struct {
reusableAddress bool
ConfirmationPollInterval time.Duration
ConfirmationTimeout time.Duration
Stateless bool
}

// New is the factory method of the wallet. It either creates a new wallet or restores the wallet backup that is handed
Expand Down Expand Up @@ -93,7 +94,7 @@ func New(options ...Option) (wallet *Wallet) {
}

// initialize output manager
wallet.outputManager = NewUnspentOutputManager(wallet.addressManager, wallet.connector)
wallet.outputManager = NewUnspentOutputManager(wallet.addressManager, wallet.connector, wallet.Stateless)
err := wallet.outputManager.Refresh(true)
if err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion packages/core/epoch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

var (
// GenesisTime is the time (Unix in seconds) of the genesis.
GenesisTime int64 = 1662035280
GenesisTime int64 = 1662385954
// Duration is the default epoch duration in seconds.
Duration int64 = 10
)
Expand Down
21 changes: 19 additions & 2 deletions packages/core/notarization/commitments.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ func (f *EpochCommitmentFactory) removeStateMutationLeaf(ei epoch.Index, txID ut
return removeLeaf(commitment.stateMutationTree, txID.Bytes())
}

// hasStateMutationLeaf returns if the leaf is part of the state mutation sparse merkle tree.
func (f *EpochCommitmentFactory) hasStateMutationLeaf(ei epoch.Index, txID utxo.TransactionID) (has bool, err error) {
commitment, err := f.getCommitmentTrees(ei)
if err != nil {
return false, errors.Wrap(err, "could not get commitment while deleting state mutation leaf")
}
return commitment.stateMutationTree.Has(txID.Bytes())
}

// insertTangleLeaf inserts blk to the Tangle sparse merkle tree.
func (f *EpochCommitmentFactory) insertTangleLeaf(ei epoch.Index, blkID tangleold.BlockID) error {
commitment, err := f.getCommitmentTrees(ei)
Expand Down Expand Up @@ -250,11 +259,19 @@ func (f *EpochCommitmentFactory) storeDiffUTXOs(ei epoch.Index, spent, created [
epochDiffStorage := f.storage.getEpochDiffStorage(ei)

for _, spentOutputWithMetadata := range spent {
epochDiffStorage.spent.Store(spentOutputWithMetadata).Release()
cachedObj, stored := epochDiffStorage.spent.StoreIfAbsent(spentOutputWithMetadata)
if !stored {
continue
}
cachedObj.Release()
}

for _, createdOutputWithMetadata := range created {
epochDiffStorage.created.Store(createdOutputWithMetadata).Release()
cachedObj, stored := epochDiffStorage.created.StoreIfAbsent(createdOutputWithMetadata)
if !stored {
continue
}
cachedObj.Release()
}
}

Expand Down
15 changes: 15 additions & 0 deletions packages/core/notarization/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,15 @@ func (m *Manager) OnTransactionInclusionUpdated(event *ledger.TransactionInclusi

txID := event.TransactionID

has, err := m.isTransactionInEpoch(event.TransactionID, oldEpoch);
if err != nil {
m.log.Error(err)
return
}
if !has {
return
}

var spent, created []*ledger.OutputWithMetadata
m.tangle.Ledger.Storage.CachedTransaction(txID).Consume(func(tx utxo.Transaction) {
spent, created = m.resolveOutputs(tx)
Expand Down Expand Up @@ -576,6 +585,8 @@ func (m *Manager) includeTransactionInEpoch(txID utxo.TransactionID, ei epoch.In
if err := m.epochCommitmentFactory.insertStateMutationLeaf(ei, txID); err != nil {
return err
}
// TODO: in case of a reorg, a transaction spending the same output of another TX will cause a duplicate element
// in cache in the objectstorage if we don't hook to the reorged transaction "orphanage".
m.epochCommitmentFactory.storeDiffUTXOs(ei, spent, created)

m.Events.StateMutationTreeInserted.Trigger(&StateMutationTreeUpdatedEvent{TransactionID: txID})
Expand All @@ -596,6 +607,10 @@ func (m *Manager) removeTransactionFromEpoch(txID utxo.TransactionID, ei epoch.I
return nil
}

func (m *Manager) isTransactionInEpoch(txID utxo.TransactionID, ei epoch.Index) (has bool, err error) {
return m.epochCommitmentFactory.hasStateMutationLeaf(ei, txID)
}

// isCommittable returns if the epoch is committable, if all conflicts are resolved and the epoch is old enough.
func (m *Manager) isCommittable(ei epoch.Index) bool {
return m.isOldEnough(ei) && m.allPastConflictsAreResolved(ei)
Expand Down
1 change: 1 addition & 0 deletions packages/node/p2p/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (m *Manager) Send(packet proto.Message, protocolID protocol.ID, to ...ident
stream := nbr.GetStream(protocolID)
if stream == nil {
m.log.Warnw("send error, no stream for protocol", "peer-id", nbr.ID(), "protocol", protocolID)
nbr.Close()
continue
}
if err := stream.WritePacket(packet); err != nil {
Expand Down
48 changes: 29 additions & 19 deletions packages/node/p2p/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (m *Manager) dialPeer(ctx context.Context, p *peer.Peer, opts []ConnectPeer
}

if len(streams) == 0 {
return nil, fmt.Errorf("no streams initiated with peer %s / %s", address, p.ID())
return nil, errors.Errorf("no streams initiated with peer %s / %s", address, p.ID())
}

return streams, nil
Expand All @@ -102,7 +102,7 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe
ctx, cancel = context.WithTimeout(ctx, defaultConnectionTimeout)
defer cancel()
}
am, err := m.newAcceptMatcher(p, protocolID)
amCtx, am, err := m.newAcceptMatcher(ctx, p, protocolID)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -118,11 +118,11 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe
select {
case ps := <-streamCh:
if ps.Protocol() != protocolID {
return nil, fmt.Errorf("accepted stream has wrong protocol: %s != %s", ps.Protocol(), protocolID)
return nil, errors.Errorf("accepted stream has wrong protocol: %s != %s", ps.Protocol(), protocolID)
}
return ps, nil
case <-ctx.Done():
err := ctx.Err()
case <-amCtx.Done():
err := amCtx.Err()
if errors.Is(err, context.DeadlineExceeded) {
m.log.Debugw("accept timeout", "id", am.Peer.ID(), "proto", protocolID)
return nil, errors.WithStack(ErrTimeout)
Expand Down Expand Up @@ -166,7 +166,7 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe
}

if len(streams) == 0 {
return nil, fmt.Errorf("no streams accepted from peer %s", p.ID())
return nil, errors.Errorf("no streams accepted from peer %s", p.ID())
}

return streams, nil
Expand All @@ -175,7 +175,7 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe
func (m *Manager) initiateStream(ctx context.Context, libp2pID libp2ppeer.ID, protocolID protocol.ID) (*PacketsStream, error) {
protocolHandler, registered := m.registeredProtocols[protocolID]
if !registered {
return nil, fmt.Errorf("cannot initiate stream protocol %s is not registered", protocolID)
return nil, errors.Errorf("cannot initiate stream protocol %s is not registered", protocolID)
}
stream, err := m.GetP2PHost().NewStream(ctx, libp2pID, protocolID)
if err != nil {
Expand Down Expand Up @@ -210,11 +210,14 @@ func (m *Manager) handleStream(stream network.Stream) {
am := m.matchNewStream(stream)
if am != nil {
am.StreamChMutex.RLock()
defer am.StreamChMutex.RUnlock()
streamCh := am.StreamCh[protocolID]
am.StreamChMutex.RUnlock()

m.log.Debugw("incoming stream matched", "id", am.Peer.ID(), "proto", protocolID)
streamCh <- ps
select {
case <-am.Ctx.Done():
case streamCh <- ps:
m.log.Debugw("incoming stream matched", "id", am.Peer.ID(), "proto", protocolID)
}
} else {
// close the connection if not matched
m.log.Debugw("unexpected connection", "addr", stream.Conn().RemoteMultiaddr(),
Expand All @@ -230,54 +233,61 @@ type AcceptMatcher struct {
Libp2pID libp2ppeer.ID
StreamChMutex sync.RWMutex
StreamCh map[protocol.ID]chan *PacketsStream
Ctx context.Context
CtxCancel context.CancelFunc
}

func (m *Manager) newAcceptMatcher(p *peer.Peer, protocolID protocol.ID) (*AcceptMatcher, error) {
func (m *Manager) newAcceptMatcher(ctx context.Context, p *peer.Peer, protocolID protocol.ID) (context.Context, *AcceptMatcher, error) {
m.acceptMutex.Lock()
defer m.acceptMutex.Unlock()

libp2pID, err := libp2putil.ToLibp2pPeerID(p)
if err != nil {
return nil, errors.WithStack(err)
return nil, nil, errors.WithStack(err)
}

acceptMatcher, acceptExists := m.acceptMap[libp2pID]
if acceptExists {
acceptMatcher.StreamChMutex.Lock()
defer acceptMatcher.StreamChMutex.Unlock()
if _, streamChanExists := acceptMatcher.StreamCh[protocolID]; streamChanExists {
return nil, nil
return nil, nil, nil
}
acceptMatcher.StreamCh[protocolID] = make(chan *PacketsStream)
return acceptMatcher, nil
return acceptMatcher.Ctx, acceptMatcher, nil
}

cancelCtx, cancelCtxFunc := context.WithCancel(ctx)

am := &AcceptMatcher{
Peer: p,
Libp2pID: libp2pID,
StreamCh: make(map[protocol.ID]chan *PacketsStream),
Peer: p,
Libp2pID: libp2pID,
StreamCh: make(map[protocol.ID]chan *PacketsStream),
Ctx: cancelCtx,
CtxCancel: cancelCtxFunc,
}

am.StreamCh[protocolID] = make(chan *PacketsStream)

m.acceptMap[libp2pID] = am

return am, nil
return cancelCtx, am, nil
}

func (m *Manager) removeAcceptMatcher(am *AcceptMatcher, protocolID protocol.ID) {
m.acceptMutex.Lock()
defer m.acceptMutex.Unlock()

existingAm := m.acceptMap[am.Libp2pID]

existingAm.StreamChMutex.Lock()
defer existingAm.StreamChMutex.Unlock()

close(existingAm.StreamCh[protocolID])
delete(existingAm.StreamCh, protocolID)

if len(existingAm.StreamCh) == 0 {
delete(m.acceptMap, am.Libp2pID)
existingAm.CtxCancel()
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/autopeering/discovery/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import "github.com/iotaledger/goshimmer/plugins/config"
// ParametersDefinitionDiscovery contains the definition of configuration parameters used by the autopeering peer discovery.
type ParametersDefinitionDiscovery struct {
// NetworkVersion defines the config flag of the network version.
NetworkVersion uint32 `default:"64" usage:"autopeering network version"`
NetworkVersion uint32 `default:"65" usage:"autopeering network version"`

// EntryNodes defines the config flag of the entry nodes.
EntryNodes []string `default:"2PV5487xMw5rasGBXXWeqSi4hLz7r19YBt8Y1TGAsQbj@analysisentry-01.devnet.shimmer.iota.cafe:15626,5EDH4uY78EA6wrBkHHAVBWBMDt7EcksRq6pjzipoW15B@entry-0.devnet.tanglebay.com:14646,CAB87iQZR6BjBrCgEBupQJ4gpEBgvGKKv3uuGVRBKb4n@entry-1.devnet.tanglebay.com:14646" usage:"list of trusted entry nodes for auto peering"`
Expand Down
2 changes: 1 addition & 1 deletion plugins/banner/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (
Plugin = node.NewPlugin(PluginName, nil, node.Enabled, configure, run)

// AppVersion version number
AppVersion = "v0.9.6"
AppVersion = "v0.9.7"
// SimplifiedAppVersion is the version number without commit hash
SimplifiedAppVersion = simplifiedVersion(AppVersion)
)
Expand Down
2 changes: 1 addition & 1 deletion plugins/database/versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
const (
// DBVersion defines the version of the database schema this version of GoShimmer supports.
// Every time there's a breaking change regarding the stored data, this version flag should be adjusted.
DBVersion = 64
DBVersion = 65
)

var (
Expand Down
5 changes: 0 additions & 5 deletions plugins/faucet/connector.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package faucet

import (
"fmt"

"github.com/iotaledger/hive.go/core/types/confirmation"
"github.com/pkg/errors"

Expand Down Expand Up @@ -33,10 +31,8 @@ func (f *FaucetConnector) UnspentOutputs(addresses ...address.Address) (unspentO
unspentOutputs = make(map[address.Address]map[utxo.OutputID]*wallet.Output)

for _, addr := range addresses {
fmt.Println("> Getting unspent outputs for ", addr.Base58())
f.indexer.CachedAddressOutputMappings(addr.Address()).Consume(func(mapping *indexer.AddressOutputMapping) {
f.tangle.Ledger.Storage.CachedOutput(mapping.OutputID()).Consume(func(output utxo.Output) {
fmt.Println("> > Found output ", output.String())
if typedOutput, ok := output.(devnetvm.Output); ok {
f.tangle.Ledger.Storage.CachedOutputMetadata(typedOutput.ID()).Consume(func(outputMetadata *ledger.OutputMetadata) {
if !outputMetadata.IsSpent() {
Expand All @@ -61,7 +57,6 @@ func (f *FaucetConnector) UnspentOutputs(addresses ...address.Address) (unspentO
})
})
}
fmt.Printf("%+v\n", unspentOutputs)
return
}

Expand Down
1 change: 1 addition & 0 deletions plugins/faucet/faucet.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewFaucet(faucetSeed *seed.Seed) (f *Faucet) {
wallet.FaucetPowDifficulty(Parameters.PowDifficulty),
wallet.ConfirmationTimeout(Parameters.MaxAwait),
wallet.ConfirmationPollingInterval(500*time.Millisecond),
wallet.Stateless(true),
)}
// We use index 1 as a proxy address from which we send the funds to the requester.
f.Wallet.NewReceiveAddress()
Expand Down
8 changes: 4 additions & 4 deletions plugins/webapi/healthz/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/labstack/echo"
"go.uber.org/dig"

"github.com/iotaledger/goshimmer/packages/core/tangleold"
"github.com/iotaledger/goshimmer/packages/core/bootstrapmanager"

"github.com/iotaledger/goshimmer/packages/node/shutdown"
)
Expand All @@ -20,8 +20,8 @@ const PluginName = "WebAPIHealthzEndpoint"
type dependencies struct {
dig.In

Server *echo.Echo
Tangle *tangleold.Tangle `optional:"true"`
Server *echo.Echo
BootstrapManager *bootstrapmanager.Manager `optional:"true"`
}

var (
Expand Down Expand Up @@ -50,7 +50,7 @@ func worker(ctx context.Context) {
}

func getHealthz(c echo.Context) error {
if deps.Tangle != nil {
if deps.BootstrapManager != nil && !deps.BootstrapManager.Bootstrapped() {
return c.NoContent(http.StatusServiceUnavailable)
}
return c.NoContent(http.StatusOK)
Expand Down

0 comments on commit d4aef1c

Please sign in to comment.