Skip to content

Commit

Permalink
fix: default to bitswap-client-only
Browse files Browse the repository at this point in the history
This restores bitswap setup we had in v1.0.0 where only the client
is initialized by default.
  • Loading branch information
lidel committed May 14, 2024
1 parent ad18bc8 commit 0a9838b
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 107 deletions.
2 changes: 1 addition & 1 deletion docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ Default: 100
### `RAINBOW_PEERING_SHARED_CACHE`

> [!WARNING]
> Experimental feature.
> Experimental feature, will result in increased network I/O due to Bitswap server being run in addition to the lean client.
Enable sharing of local cache to peers safe-listed with `RAINBOW_PEERING`
or `RAINBOW_SEED_PEERING`.
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ Generate an identity seed and launch a gateway:
Name: "peering-shared-cache",
Value: false,
EnvVars: []string{"RAINBOW_PEERING_SHARED_CACHE"},
Usage: "Enable sharing of local cache to peers safe-listed with --peering. Rainbow will respond to Bitswap queries from these peers, serving locally cached data as needed.",
Usage: "(EXPERIMENTAL: increased network I/O) Enable sharing of local cache to peers safe-listed with --peering. Rainbow will respond to Bitswap queries from these peers, serving locally cached data as needed.",
},
&cli.StringFlag{
Name: "blockstore",
Expand Down Expand Up @@ -360,7 +360,7 @@ share the same seed as long as the indexes are different.
IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"),
DenylistSubs: cctx.StringSlice("denylists"),
Peering: peeringAddrs,
PeeringCache: cctx.Bool("peering-shared-cache"),
PeeringSharedCache: cctx.Bool("peering-shared-cache"),

Check warning on line 363 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L363

Added line #L363 was not covered by tests
Seed: seed,
SeedIndex: index,
SeedPeering: cctx.Bool("seed-peering"),
Expand Down
109 changes: 7 additions & 102 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ import (
"github.com/dgraph-io/badger/v4/options"
nopfs "github.com/ipfs-shipyard/nopfs"
nopfsipfs "github.com/ipfs-shipyard/nopfs/ipfs"
"github.com/ipfs/boxo/bitswap"
wl "github.com/ipfs/boxo/bitswap/client/wantlist"
bsmspb "github.com/ipfs/boxo/bitswap/message/pb"
bsnet "github.com/ipfs/boxo/bitswap/network"
bsserver "github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
Expand All @@ -27,13 +22,9 @@ import (
"github.com/ipfs/boxo/namesys"
"github.com/ipfs/boxo/path/resolver"
"github.com/ipfs/boxo/peering"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
badger4 "github.com/ipfs/go-ds-badger4"
flatfs "github.com/ipfs/go-ds-flatfs"
delay "github.com/ipfs/go-ipfs-delay"
metri "github.com/ipfs/go-metrics-interface"
mprome "github.com/ipfs/go-metrics-prometheus"
"github.com/ipfs/go-unixfsnode"
dagpb "github.com/ipld/go-codec-dagpb"
Expand Down Expand Up @@ -76,7 +67,7 @@ type Node struct {
dataDir string
datastore datastore.Batching
blockstore blockstore.Blockstore
bs *bitswap.Bitswap
exchange exchange.Interface
bsrv blockservice.BlockService
resolver resolver.Resolver
ns namesys.NameSystem
Expand Down Expand Up @@ -107,8 +98,9 @@ type Config struct {
IpnsMaxCacheTTL time.Duration

DenylistSubs []string
Peering []peer.AddrInfo
PeeringCache bool

Peering []peer.AddrInfo
PeeringSharedCache bool

Seed string
SeedIndex int
Expand Down Expand Up @@ -208,7 +200,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
return nil, err
}

bswap := setupBitswap(ctx, cfg, h, cr, blkst)
bswap := setupBitswapExchange(ctx, cfg, h, cr, blkst)

err = os.Mkdir(filepath.Join(cfg.DataDir, "denylists"), 0755)
if err != nil && !errors.Is(err, fs.ErrExist) {
Expand All @@ -233,7 +225,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
return nil, err
}

bsrv := blockservice.New(blkst, &noNotifyExchange{bswap},
bsrv := blockservice.New(blkst, bswap,
// if we are doing things right, our bitswap wantlists should
// not have blocks that we already have (see
// https://github.com/ipfs/boxo/blob/e0d4b3e9b91e9904066a10278e366c9a6d9645c7/blockservice/blockservice.go#L272). Thus
Expand Down Expand Up @@ -269,7 +261,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
blockstore: blkst,
dataDir: cfg.DataDir,
datastore: ds,
bs: bswap,
exchange: bswap,
ns: ns,
vs: vs,
bsrv: bsrv,
Expand Down Expand Up @@ -398,90 +390,3 @@ func setupPeering(cfg Config, h host.Host) error {

return nil
}

func setupBitswap(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) *bitswap.Bitswap {
var (
peerBlockRequestFilter bsserver.PeerBlockRequestFilter
)
if cfg.PeeringCache && len(cfg.Peering) > 0 {
peers := make(map[peer.ID]struct{}, len(cfg.Peering))
for _, a := range cfg.Peering {
peers[a.ID] = struct{}{}
}

peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
_, ok := peers[p]
return ok
}
} else {
peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
return false
}
}

bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
bn := bsnet.NewFromIpfsHost(h, cr)
bswap := bitswap.New(bsctx, bn, bstore,
// --- Client Options
// default is 1 minute to search for a random live-want (1
// CID). I think we want to search for random live-wants more
// often although probably it overlaps with general
// rebroadcasts.
bitswap.RebroadcastDelay(delay.Fixed(10*time.Second)),
// ProviderSearchDelay: default is 1 second.
bitswap.ProviderSearchDelay(time.Second),
bitswap.WithoutDuplicatedBlockStats(),

// ---- Server Options
bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter),
bitswap.ProvideEnabled(false),
// Do not keep track of other peer's wantlists, we only want to reply if we
// have a block. If we get it later, it's no longer relevant.
bitswap.WithPeerLedger(&noopPeerLedger{}),
// When we don't have a block, don't reply. This reduces processment.
bitswap.SetSendDontHaves(false),
)
bn.Start(bswap)

return bswap
}

type noopPeerLedger struct{}

func (*noopPeerLedger) Wants(p peer.ID, e wl.Entry) {}

func (*noopPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
return false
}

func (*noopPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ bsmspb.Message_Wantlist_WantType) {
}

func (*noopPeerLedger) Peers(k cid.Cid) []bsserver.PeerEntry {
return nil
}

func (*noopPeerLedger) CollectPeerIDs() []peer.ID {
return nil
}

func (*noopPeerLedger) WantlistSizeForPeer(p peer.ID) int {
return 0
}

func (*noopPeerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
return nil
}

func (*noopPeerLedger) ClearPeerWantlist(p peer.ID) {}

func (*noopPeerLedger) PeerDisconnected(p peer.ID) {}

type noNotifyExchange struct {
exchange.Interface
}

func (e *noNotifyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// Rainbow does not notify when we get new blocks in our Blockservice.
return nil
}
122 changes: 122 additions & 0 deletions setup_bitswap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"context"
"time"

"github.com/ipfs/boxo/bitswap"
bsclient "github.com/ipfs/boxo/bitswap/client"
wl "github.com/ipfs/boxo/bitswap/client/wantlist"
bsmspb "github.com/ipfs/boxo/bitswap/message/pb"
bsnet "github.com/ipfs/boxo/bitswap/network"
bsserver "github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
metri "github.com/ipfs/go-metrics-interface"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface {
bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
bn := bsnet.NewFromIpfsHost(h, cr)

// --- Client Options
// bitswap.RebroadcastDelay: default is 1 minute to search for a random
// live-want (1 CID). I think we want to search for random live-wants more
// often although probably it overlaps with general rebroadcasts.
rebroadcastDelay := delay.Fixed(10 * time.Second)
// bitswap.ProviderSearchDelay: default is 1 second.
providerSearchDelay := 1 * time.Second

// If peering and shared cache are both enabled, we initialize both a
// Client and a Server with custom request filter and custom options.
// client+server is more expensive but necessary when deployment requires
// serving cached blocks to safelisted peerids
if cfg.PeeringSharedCache && len(cfg.Peering) > 0 {
var peerBlockRequestFilter bsserver.PeerBlockRequestFilter

// Set up request filter to only respond to request for safelisted (peered) nodes
peers := make(map[peer.ID]struct{}, len(cfg.Peering))
for _, a := range cfg.Peering {
peers[a.ID] = struct{}{}
}
peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
_, ok := peers[p]
return ok
}

// Initialize client+server
bswap := bitswap.New(bsctx, bn, bstore,
// --- Client Options
bitswap.RebroadcastDelay(rebroadcastDelay),
bitswap.ProviderSearchDelay(providerSearchDelay),
bitswap.WithoutDuplicatedBlockStats(),

// ---- Server Options
bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter),
bitswap.ProvideEnabled(false),
// Do not keep track of other peer's wantlists, we only want to reply if we
// have a block. If we get it later, it's no longer relevant.
bitswap.WithPeerLedger(&noopPeerLedger{}),
// When we don't have a block, don't reply. This reduces processment.
bitswap.SetSendDontHaves(false),
)
bn.Start(bswap)
return &noNotifyExchange{bswap}
}

// By default, rainbow runs with bitswap client alone
bswap := bsclient.New(bsctx, bn, bstore,
// --- Client Options
bsclient.RebroadcastDelay(rebroadcastDelay),
bsclient.ProviderSearchDelay(providerSearchDelay),
bsclient.WithoutDuplicatedBlockStats(),
)
bn.Start(bswap)
return bswap
}

type noopPeerLedger struct{}

func (*noopPeerLedger) Wants(p peer.ID, e wl.Entry) {}

func (*noopPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
return false
}

func (*noopPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ bsmspb.Message_Wantlist_WantType) {
}

func (*noopPeerLedger) Peers(k cid.Cid) []bsserver.PeerEntry {
return nil

Check warning on line 96 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L95-L96

Added lines #L95 - L96 were not covered by tests
}

func (*noopPeerLedger) CollectPeerIDs() []peer.ID {
return nil

Check warning on line 100 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}

func (*noopPeerLedger) WantlistSizeForPeer(p peer.ID) int {
return 0
}

func (*noopPeerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
return nil

Check warning on line 108 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L107-L108

Added lines #L107 - L108 were not covered by tests
}

func (*noopPeerLedger) ClearPeerWantlist(p peer.ID) {}

Check warning on line 111 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L111

Added line #L111 was not covered by tests

func (*noopPeerLedger) PeerDisconnected(p peer.ID) {}

Check warning on line 113 in setup_bitswap.go

View check run for this annotation

Codecov / codecov/patch

setup_bitswap.go#L113

Added line #L113 was not covered by tests

type noNotifyExchange struct {
exchange.Interface
}

func (e *noNotifyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// Rainbow does not notify when we get new blocks in our Blockservice.
return nil
}
4 changes: 2 additions & 2 deletions setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func mustPeeredNodes(t *testing.T, configuration [][]int, peeringShareCache bool
RoutingV1Endpoints: []string{},
ListenAddrs: []string{mas[i].String()},
Peering: []peer.AddrInfo{},
PeeringCache: peeringShareCache,
PeeringSharedCache: peeringShareCache,
}

for _, j := range configuration[i] {
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestPeering(t *testing.T) {
}, false)
}

func TestPeeringCache(t *testing.T) {
func TestPeeringSharedCache(t *testing.T) {
nodes := mustPeeredNodes(t, [][]int{
{1}, // 0 peered to 1
{0}, // 1 peered to 0
Expand Down

0 comments on commit 0a9838b

Please sign in to comment.