Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ipni-sync http over libp2p #113

Merged
merged 29 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
cb47480
Implement ipni-sync http over libp2p
gammazero Aug 17, 2023
58a980e
Move libp2phttp functionality inside ipnisync
gammazero Aug 18, 2023
a8fe746
Subscriber does not use existing http client
gammazero Aug 18, 2023
37ef775
Fix handler path processing
gammazero Aug 18, 2023
4c6762a
Change option name
gammazero Aug 19, 2023
f9322b0
Update comments
gammazero Aug 19, 2023
b91b8e8
Remove usused field
gammazero Aug 19, 2023
696de20
Sync gets addrs from peerstore if none supplied
gammazero Aug 19, 2023
6fae9ff
Use dtsync if publisher server if libp2p without HTTP
gammazero Aug 19, 2023
d7b76b3
If server no libp2phttp, then use address to choose plain HTTP or dts…
gammazero Aug 20, 2023
03da4f6
Fix error handling in syncer creation
gammazero Aug 20, 2023
62961ad
Fix race when accessing libp2phttp.HTTPHost
gammazero Aug 20, 2023
ab495c8
If publisher HTTP no availabe at IPNI path, retry without IPNI path. …
gammazero Aug 21, 2023
ef38410
Use latest libp2phttp
gammazero Aug 21, 2023
faa5859
latest libp2phttp
gammazero Aug 21, 2023
f1e3263
ignore emtpy http listen addr
gammazero Aug 23, 2023
0669ee8
update to latest libp2phttp
gammazero Aug 24, 2023
999067b
Update log messages
gammazero Aug 24, 2023
309ba51
Log peer.ID consistently as peer
gammazero Aug 24, 2023
1e89b86
Update libp2p
gammazero Aug 25, 2023
352032b
Convert most tests to use ipnisync publisher
gammazero Aug 25, 2023
f5acb82
update comments
gammazero Aug 25, 2023
2194d3a
Update libp2phttp
gammazero Aug 26, 2023
6828f75
Update libp2p
gammazero Aug 29, 2023
1ee93ea
gostream relocated
gammazero Aug 29, 2023
0c9c4ca
update graphsync
gammazero Aug 29, 2023
c4fda5e
Update to release version of libp2p
gammazero Aug 30, 2023
b2617d4
update comment
gammazero Aug 30, 2023
e6d63d9
Use IPNIPath for libp2p protocol ID
gammazero Aug 31, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions announce/httpsender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ type Sender struct {
userAgent string
}

// New creates a new Sender that sends announce messages over HTTP. Announce
// messages are sent to the specified URLs. The addresses in announce messages
// are modified to include the specified peerID, which is necessary to
// communicate the publisher ID over HTTP.
// New creates a new Sender that sends advertisement announcement messages over
// HTTP. Announcements are sent directly to the specified URLs. The specified
// peerID is added to the multiaddrs contained in the announcements, which is
// how the publisher ID is communicated over HTTP.
func New(announceURLs []*url.URL, peerID peer.ID, options ...Option) (*Sender, error) {
if len(announceURLs) == 0 {
return nil, errors.New("no announce urls")
Expand Down
55 changes: 25 additions & 30 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,29 @@ import (

func TestAnnounceReplace(t *testing.T) {
t.Parallel()
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost := test.MkTestHost(t)
srcHostInfo := peer.AddrInfo{
ID: srcHost.ID(),
Addrs: srcHost.Addrs(),
}
srcLnkS := test.MkLinkSystem(srcStore)
dstHost := test.MkTestHost(t)

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
//dstLnkS := test.MkLinkSystem(dstStore)

dstLnkS, blocked := test.MkBlockedLinkSystem(dstStore)
blocksSeenByHook := make(map[cid.Cid]struct{})
blockHook := func(p peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) {
blocksSeenByHook[c] = struct{}{}
}

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
require.NoError(t, err)
defer pub.Close()

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(),
dagsync.BlockHook(blockHook))
require.NoError(t, err)
defer sub.Close()

require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))
srcHost, srcPrivKey := test.MkTestHostPK(t)
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcLnkS := test.MkLinkSystem(srcStore)

pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic(testTopic))
require.NoError(t, err)
defer pub.Close()

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)

watcher, cncl := sub.OnSyncFinished()
defer cncl()
Expand All @@ -67,6 +60,11 @@ func TestAnnounceReplace(t *testing.T) {
firstCid := chainLnks[2].(cidlink.Link).Cid
pub.SetRoot(firstCid)

srcHostInfo := peer.AddrInfo{
ID: srcHost.ID(),
Addrs: srcHost.Addrs(),
}

// Have the subscriber receive an announce. This is the same as if it was
// published by the publisher without having to wait for it to arrive.
err = sub.Announce(context.Background(), firstCid, srcHostInfo)
Expand Down Expand Up @@ -154,7 +152,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) {
pubh := test.MkTestHost(t)
pubds := dssync.MutexWrap(datastore.NewMapDatastore())
publs := test.MkLinkSystem(pubds)
pub, err := ipnisync.NewPublisher("0.0.0.0:0", publs, pubh.Peerstore().PrivKey(pubh.ID()))
pub, err := ipnisync.NewPublisher(publs, pubh.Peerstore().PrivKey(pubh.ID()), ipnisync.WithHTTPListenAddrs("0.0.0.0:0"))
require.NoError(t, err)
defer pub.Close()

Expand Down Expand Up @@ -214,11 +212,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) {
func TestAnnounceRepublish(t *testing.T) {
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost := test.MkTestHost(t)
srcHostInfo := peer.AddrInfo{
ID: srcHost.ID(),
Addrs: srcHost.Addrs(),
}
srcHost, srcPrivKey := test.MkTestHostPK(t)
srcLnkS := test.MkLinkSystem(srcStore)
dstHost := test.MkTestHost(t)

Expand All @@ -243,10 +237,9 @@ func TestAnnounceRepublish(t *testing.T) {
require.NoError(t, err)
defer sub1.Close()

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic(testTopic))
require.NoError(t, err)
defer pub.Close()
require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))

watcher2, cncl := sub2.OnSyncFinished()
defer cncl()
Expand All @@ -258,7 +251,11 @@ func TestAnnounceRepublish(t *testing.T) {
pub.SetRoot(firstCid)

// Announce one CID to subscriber1.
err = sub1.Announce(context.Background(), firstCid, srcHostInfo)
pubInfo := peer.AddrInfo{
ID: pub.ID(),
Addrs: pub.Addrs(),
}
err = sub1.Announce(context.Background(), firstCid, pubInfo)
require.NoError(t, err)
t.Log("Sent announce for first CID", firstCid)

Expand Down Expand Up @@ -444,7 +441,7 @@ func mkLnk(t *testing.T, srcStore datastore.Batching) cid.Cid {
}

func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer func(peer.ID) bool) (host.Host, host.Host, dagsync.Publisher, *dagsync.Subscriber, announce.Sender) {
srcHost := test.MkTestHost(t)
srcHost, srcPrivKey := test.MkTestHostPK(t)
dstHost := test.MkTestHost(t)
topics := test.WaitForMeshWithMessage(t, testTopic, srcHost, dstHost)

Expand All @@ -453,7 +450,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0]), p2psender.WithExtraData([]byte("t01000")))
require.NoError(t, err)

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic(testTopic))
require.NoError(t, err)

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
Expand All @@ -467,7 +464,5 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID()))
require.NoError(t, err)

require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))

return srcHost, dstHost, pub, sub, p2pSender
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
gostream "github.com/libp2p/go-libp2p-gostream"
"github.com/libp2p/go-libp2p/core/host"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
multistream "github.com/multiformats/go-multistream"
"github.com/libp2p/go-libp2p/p2p/net/gostream"
"github.com/multiformats/go-multistream"
)

const closeTimeout = 30 * time.Second
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipni/go-libipni/dagsync/p2p/protocol/head"
"github.com/ipni/go-libipni/dagsync/dtsync/head"
"github.com/ipni/go-libipni/dagsync/test"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
Expand Down
2 changes: 1 addition & 1 deletion dagsync/dtsync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/ipni/go-libipni/dagsync/p2p/protocol/head"
"github.com/ipni/go-libipni/dagsync/dtsync/head"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
Expand Down
2 changes: 1 addition & 1 deletion dagsync/dtsync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipni/go-libipni/dagsync/p2p/protocol/head"
"github.com/ipni/go-libipni/dagsync/dtsync/head"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand Down
13 changes: 8 additions & 5 deletions dagsync/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dagsync_test
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io"
"log"
Expand All @@ -15,8 +16,9 @@ import (
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipni/go-libipni/dagsync"
"github.com/ipni/go-libipni/dagsync/dtsync"
"github.com/ipni/go-libipni/dagsync/ipnisync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/multiformats/go-multicodec"
)
Expand All @@ -25,12 +27,13 @@ var srcHost host.Host

func ExamplePublisher() {
// Init dagsync publisher and subscriber.
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost, _ = libp2p.New()
srcPrivKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
srcHost, _ = libp2p.New(libp2p.Identity(srcPrivKey))
defer srcHost.Close()
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcLnkS := makeLinkSystem(srcStore)

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic("/indexer/ingest/testnet"))
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -65,7 +68,7 @@ func ExampleSubscriber() {
srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkSys, "/indexer/ingest/testnet", nil)
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkSys, "/indexer/ingest/testnet")
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion dagsync/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func setupPublisherSubscriber(t *testing.T, subscriberOptions []dagsync.Option)
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcLinkSys := test.MkLinkSystem(srcStore)

pub, err := ipnisync.NewPublisher("127.0.0.1:0", srcLinkSys, srcPrivKey, ipnisync.WithServer(true))
pub, err := ipnisync.NewPublisher(srcLinkSys, srcPrivKey, ipnisync.WithHTTPListenAddrs("127.0.0.1:0"))
require.NoError(t, err)
t.Cleanup(func() {
pub.Close()
Expand Down
2 changes: 1 addition & 1 deletion dagsync/ipnisync/ipnipath.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import "path"

const protoVersion = "v1"

var IpniPath = path.Join("/ipni", protoVersion, "ad")
var IPNIPath = path.Join("/ipni", protoVersion, "ad")
116 changes: 109 additions & 7 deletions dagsync/ipnisync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@ package ipnisync

import (
"fmt"
"time"

"github.com/libp2p/go-libp2p/core/host"
)

// pubConfig contains all options for configuring Publisher.
// config contains all options for configuring Publisher.
type config struct {
handlerPath string
startServer bool
topic string

streamHost host.Host
requireTLS bool
httpAddrs []string
}

// Option is a function that sets a value in a config.
Expand All @@ -27,8 +34,28 @@ func getOpts(opts []Option) (config, error) {
return cfg, nil
}

// WithHTTPListenAddrs sets the HTTP addresses to listen on. These are in
// addresses:port format and may be prefixed with "https://" or "http://" or to
// specify whether or not TLS is required. If there is no prefix, then one is
// assumed based on the value specified by WithRequireTLS.
//
// Setting HTTP listen addresses is optional when a stream host is provided by
// the WithStreamHost option.
func WithHTTPListenAddrs(addrs ...string) Option {
return func(c *config) error {
for _, addr := range addrs {
if addr != "" {
c.httpAddrs = append(c.httpAddrs, addr)
}
}
return nil
}
}

// WithHandlerPath sets the path used to handle requests to this publisher.
// This should only include the path before the /ipni/v1/ad/ part of the path.
// This specifies the portion of the path before the implicit /ipni/v1/ad/ part
// of the path. Calling WithHandlerPath("/foo/bar") configures the publisher to
// handle HTTP requests on the path "/foo/bar/ipni/v1/ad/".
func WithHandlerPath(urlPath string) Option {
return func(c *config) error {
c.handlerPath = urlPath
Expand All @@ -45,12 +72,87 @@ func WithHeadTopic(topic string) Option {
}
}

// WithServer, if true, starts an http server listening on the given address.
// an HTTP server. If this option is not specified, then no server is started
// and this will need to be done by the caller.
func WithServer(serve bool) Option {
// WithStartServer, if true, starts an http server listening on the given
// address. an HTTP server. If this option is not specified, then no server is
// started and this will need to be done by the caller.
func WithStartServer(start bool) Option {
return func(c *config) error {
c.startServer = serve
c.startServer = start
return nil
}
}

// WithRequireTLS tells whether to allow the publisher to require https (true)
// or to serve non-secure http (false). Default is false, allowing non-secure
// HTTP.
func WithRequireTLS(require bool) Option {
return func(c *config) error {
c.requireTLS = require
return nil
}
}

// WithStreamHost specifies an optional stream based libp2p host used to do
// HTTP over libp2p streams.
func WithStreamHost(h host.Host) Option {
return func(c *config) error {
c.streamHost = h
return nil
}
}

type clientConfig struct {
authPeerID bool
streamHost host.Host

httpTimeout time.Duration
httpRetryMax int
httpRetryWaitMin time.Duration
httpRetryWaitMax time.Duration
}

// Option is a function that sets a value in a config.
type ClientOption func(*clientConfig)

// getClientOpts creates a pubConfig and applies Options to it.
func getClientOpts(opts []ClientOption) clientConfig {
var cfg clientConfig
for _, opt := range opts {
opt(&cfg)
}
return cfg
}

// ClientAuthServerPeerID tells the sync client that it must authenticate the
// server's peer ID.
func ClientAuthServerPeerID(require bool) ClientOption {
return func(c *clientConfig) {
c.authPeerID = require
}
}

// ClientHTTPTimeout specifies a time limit for HTTP requests made by the sync
// client. A value of zero means no timeout.
func ClientHTTPTimeout(to time.Duration) ClientOption {
return func(c *clientConfig) {
c.httpTimeout = to
}
}

// ClientStreamHost specifies an optional stream based libp2p host used by the
// sync client to do HTTP over libp2p streams.
func ClientStreamHost(h host.Host) ClientOption {
return func(c *clientConfig) {
c.streamHost = h
}
}

// ClientHTTPRetry configures a retriable HTTP client. Setting retryMax to
// zero, the default, disables the retriable client.
func ClientHTTPRetry(retryMax int, waitMin, waitMax time.Duration) ClientOption {
return func(c *clientConfig) {
c.httpRetryMax = retryMax
c.httpRetryWaitMin = waitMin
c.httpRetryWaitMax = waitMax
}
}
Loading