forked from cosmos/cosmos-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
streaming.go
112 lines (99 loc) · 3.48 KB
/
streaming.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package baseapp
import (
"fmt"
"sort"
"strings"
"github.com/spf13/cast"
"cosmossdk.io/store/streaming"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/client/flags"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
)
// Option keys used to look up the streaming configuration in the application
// options. They are joined with "." to form nested option paths such as
// "streaming.abci.keys".
const (
// StreamingTomlKey is the root section holding all streaming services.
StreamingTomlKey = "streaming"
// StreamingABCITomlKey is the sub-section used for the ABCI listener settings.
StreamingABCITomlKey = "abci"
// StreamingABCIPluginTomlKey is the per-service option naming the plugin to load.
StreamingABCIPluginTomlKey = "plugin"
// StreamingABCIKeysTomlKey is the option listing the store key names to expose
// to listeners; an entry of "*" exposes every store key.
StreamingABCIKeysTomlKey = "keys"
// StreamingABCIStopNodeOnErrTomlKey is the boolean option forwarded to the
// streaming manager's StopNodeOnErr field.
StreamingABCIStopNodeOnErrTomlKey = "stop-node-on-err"
)
// RegisterStreamingServices loads and registers the streaming plugins configured
// under the "streaming" section of appOpts.
//
// For every service entry found under StreamingTomlKey, the option
// "streaming.<service>.plugin" is read. When it names a plugin (non-empty after
// trimming whitespace), the plugin is loaded with streaming.NewStreamingPlugin,
// using the log level found under flags.FlagLogLevel, and is then registered
// with the BaseApp using keys as the set of available store keys. Services
// without a plugin name are skipped.
//
// It returns on the first plugin that fails to load or register; the returned
// error names the plugin and wraps the underlying cause.
func (app *BaseApp) RegisterStreamingServices(appOpts servertypes.AppOptions, keys map[string]*storetypes.KVStoreKey) error {
	streamingCfg := cast.ToStringMap(appOpts.Get(StreamingTomlKey))
	for service := range streamingCfg {
		pluginKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, service, StreamingABCIPluginTomlKey)
		pluginName := strings.TrimSpace(cast.ToString(appOpts.Get(pluginKey)))
		if pluginName == "" {
			// No plugin configured for this service: nothing to register.
			continue
		}

		logLevel := cast.ToString(appOpts.Get(flags.FlagLogLevel))
		plugin, err := streaming.NewStreamingPlugin(pluginName, logLevel)
		if err != nil {
			return fmt.Errorf("failed to load streaming plugin %q: %w", pluginName, err)
		}
		if err := app.registerStreamingPlugin(appOpts, keys, plugin); err != nil {
			return fmt.Errorf("failed to register streaming plugin %q: %w", pluginName, err)
		}
	}
	return nil
}
// registerStreamingPlugin registers a loaded streaming plugin with the BaseApp.
//
// streamingPlugin must implement storetypes.ABCIListener, which is the only
// plugin kind handled here; it is then registered as an ABCI listener using
// appOpts and keys. Any other value is rejected with an error that reports the
// dynamic type of the value that was passed in.
func (app *BaseApp) registerStreamingPlugin(
	appOpts servertypes.AppOptions,
	keys map[string]*storetypes.KVStoreKey,
	streamingPlugin interface{},
) error {
	abciListener, ok := streamingPlugin.(storetypes.ABCIListener)
	if !ok {
		// Format the original value, not the assertion result: after a failed
		// assertion the result is a nil interface, which %T prints as "<nil>".
		return fmt.Errorf("unexpected plugin type %T", streamingPlugin)
	}
	app.registerABCIListenerPlugin(appOpts, keys, abciListener)
	return nil
}
// registerABCIListenerPlugin wires an ABCIListener plugin into the BaseApp.
//
// It reads two options from appOpts, "streaming.abci.stop-node-on-err" (as a
// bool) and "streaming.abci.keys" (as a string slice), resolves the requested
// names against keys, adds the resulting store keys as listeners on the
// multistore, and finally calls SetStreamingManager with a manager that holds
// abciListener as its single listener.
func (app *BaseApp) registerABCIListenerPlugin(
	appOpts servertypes.AppOptions,
	keys map[string]*storetypes.KVStoreKey,
	abciListener storetypes.ABCIListener,
) {
	// Both options live under the same "streaming.abci." prefix.
	const abciPrefix = StreamingTomlKey + "." + StreamingABCITomlKey + "."

	haltOnErr := cast.ToBool(appOpts.Get(abciPrefix + StreamingABCIStopNodeOnErrTomlKey))
	requestedKeys := cast.ToStringSlice(appOpts.Get(abciPrefix + StreamingABCIKeysTomlKey))

	app.cms.AddListeners(exposeStoreKeysSorted(requestedKeys, keys))

	manager := storetypes.StreamingManager{
		ABCIListeners: []storetypes.ABCIListener{abciListener},
		StopNodeOnErr: haltOnErr,
	}
	app.SetStreamingManager(manager)
}
// exposeAll reports whether list contains the wildcard entry "*".
// Entries are compared exactly; an empty or nil list yields false.
func exposeAll(list []string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == "*" {
			return true
		}
	}
	return false
}
// exposeStoreKeysSorted resolves the store key names in keysStr against keys
// and returns the matching store keys ordered by ascending Name().
//
// When keysStr contains the wildcard "*", every entry of keys is returned.
// Otherwise only the names present in keys are returned, one result per
// occurrence in keysStr; unknown names are ignored.
func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStoreKey) []storetypes.StoreKey {
	var selected []storetypes.StoreKey

	switch {
	case exposeAll(keysStr):
		selected = make([]storetypes.StoreKey, 0, len(keys))
		for _, storeKey := range keys {
			selected = append(selected, storeKey)
		}
	default:
		selected = make([]storetypes.StoreKey, 0, len(keysStr))
		for _, name := range keysStr {
			storeKey, found := keys[name]
			if !found {
				continue
			}
			selected = append(selected, storeKey)
		}
	}

	// Map iteration order is random, so sort by name for a deterministic result.
	sort.SliceStable(selected, func(a, b int) bool {
		return selected[a].Name() < selected[b].Name()
	})
	return selected
}