Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/migration ipfs download #8064

Merged
merged 33 commits into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
bddfa4d
Move ipfsdir.go into separate package
gammazero Apr 7, 2021
0565e80
move defaultFetchLimit to fetcher.go
gammazero Apr 8, 2021
b838384
Create IpfsFetcher
gammazero Apr 8, 2021
b5aae2c
Use IpfsFetcher to download migrations
gammazero Apr 8, 2021
6963d0c
Check return from connect()
gammazero Apr 8, 2021
5bdef8c
Move ipfsdir pkg back into migrations, and put ipfsfetcher into a sep…
gammazero Apr 8, 2021
f65110a
Review changes and more tests
gammazero Apr 8, 2021
806831d
log ipfs fetcher close error
gammazero Apr 8, 2021
df9a2ca
Multifetcher only return fetch errors if all fetchers fail
gammazero Apr 9, 2021
0994fba
Read IPFS peers from existing config
gammazero Apr 9, 2021
8a7a4c3
Disable listening for inbound connections
gammazero Apr 9, 2021
de0eaba
Download migrations, and add them to IPFS, based on CLI flags.
gammazero Apr 12, 2021
f9da061
Read peers from CLI flag instead of from existing config file
gammazero Apr 13, 2021
1e16ba1
fix test
gammazero Apr 13, 2021
18bdb52
Add migrations from temp IPFS node after migration
gammazero Apr 14, 2021
55f239b
Build on go < 1.16
gammazero Apr 14, 2021
ac8aa16
fix missed error check
gammazero Apr 14, 2021
890bd7e
Migration temp node listens on any port available
gammazero Apr 16, 2021
e5254d2
Configure migration from config and not from CLI flags
gammazero Apr 17, 2021
163c003
check error return
gammazero Apr 17, 2021
9768016
Review changes
gammazero Apr 19, 2021
3a0be4c
Use latest config
gammazero Apr 20, 2021
ee577fe
Document Migration config
gammazero Apr 20, 2021
9727b4f
error message formatting
gammazero Apr 20, 2021
7d16a95
manually load the config when migrating
Stebalien Apr 20, 2021
96bad6f
fix: factor the migration file reading code into a function
Stebalien Apr 20, 2021
d40d0f1
Move readMigrationConfig into migration.go
gammazero Apr 23, 2021
8d1dbfa
Attempt to read Bootstrap and Peering from config
gammazero May 3, 2021
03454b7
Revert to previous multihash version
gammazero May 4, 2021
bd6646f
Additional unit tests
gammazero May 4, 2021
3a4d85e
Update documentation
gammazero May 4, 2021
40d412a
Better error messages
gammazero May 4, 2021
e37896d
Review changes
gammazero May 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Create IpfsFetcher
  • Loading branch information
gammazero committed May 11, 2021
commit b838384f45db08317fc32f56bbfd7c84ebc43d16
293 changes: 293 additions & 0 deletions repo/fsrepo/migrations/ipfsfetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
package migrations

import (
"context"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"
"sync"

"github.com/ipfs/go-ipfs-config"
files "github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreapi"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/plugin/loader"
"github.com/ipfs/go-ipfs/repo/fsrepo"
"github.com/ipfs/go-ipfs/repo/fsrepo/migrations/ipfsdir"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
ipath "github.com/ipfs/interface-go-ipfs-core/path"
peer "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)

type IpfsFetcher struct {
distPath string
limit int64
peers []string

openOnce sync.Once
closeOnce sync.Once
err error
mvdan marked this conversation as resolved.
Show resolved Hide resolved

ipfs iface.CoreAPI
ipfsCancel context.CancelFunc
ipfsCtx context.Context
ipfsTmpDir string
}

// NewIpfsFetcher creates a new IpfsFetcher
//
// Specifying "" for distPath sets the default IPNS path.
// Specifying 0 for fetchLimit sets the default, -1 means no limit.
func NewIpfsFetcher(distPath string, fetchLimit int64, peers []string) *IpfsFetcher {
f := &IpfsFetcher{
limit: defaultFetchLimit,
distPath: LatestIpfsDist,
peers: peers,
}

if distPath != "" {
if !strings.HasPrefix(distPath, "/") {
mvdan marked this conversation as resolved.
Show resolved Hide resolved
distPath = "/" + distPath
}
f.distPath = distPath
}

if fetchLimit != 0 {
if fetchLimit == -1 {
mvdan marked this conversation as resolved.
Show resolved Hide resolved
fetchLimit = 0
}
f.limit = fetchLimit
}

return f
}

// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
// Initialize and start IPFS node on first call to Fetch, since the fetcher
// may be created by not used.
f.openOnce.Do(func() {
f.ipfsTmpDir, f.err = initTempNode(ctx)
if f.err != nil {
return
}

f.err = f.startTempNode()
})

fmt.Printf("Fetching with IPFS: %q\n", filePath)

if f.err != nil {
return nil, f.err
}

iPath, err := parsePath(filepath.Join(f.distPath, filePath))
if err != nil {
return nil, err
}

nd, err := f.ipfs.Unixfs().Get(ctx, iPath)
if err != nil {
return nil, err
}

fileNode, ok := nd.(files.File)
if !ok {
return nil, fmt.Errorf("%q is not a file", filePath)
}

if f.limit != 0 {
return NewLimitReadCloser(fileNode, f.limit), nil
}
return fileNode, nil
}

func (f *IpfsFetcher) Close() error {
f.closeOnce.Do(func() {
if f.ipfsCancel != nil {
// Tell ipfs to stop
f.ipfsCancel()

// Wait until ipfs is stopped
<-f.ipfsCtx.Done()
}

if f.ipfsTmpDir != "" {
// Remove the temp ipfs dir
if err := os.RemoveAll(f.ipfsTmpDir); err != nil {
fmt.Fprintln(os.Stderr, err)
mvdan marked this conversation as resolved.
Show resolved Hide resolved
}
}
})
return nil
}

func initTempNode(ctx context.Context) (string, error) {
defaultPath, err := ipfsdir.IpfsDir("")
if err != nil {
return "", err
}

// TODO: Is there a better way to check it plugins are loaded first?
err = setupPlugins(defaultPath)
// Need to ignore errors here because plugins may already be loaded when
// run from ipfs daemon.
if err != nil {
fmt.Println("Ignored plugin error:", err)
mvdan marked this conversation as resolved.
Show resolved Hide resolved
}

identity, err := config.CreateIdentity(ioutil.Discard, []options.KeyGenerateOption{
options.Key.Type(options.Ed25519Key),
})
if err != nil {
return "", err
}
cfg, err := config.InitWithIdentity(identity)
if err != nil {
return "", err
}

// create temporary ipfs directory
dir, err := ioutil.TempDir("", "ipfs-temp")
if err != nil {
return "", fmt.Errorf("failed to get temp dir: %s", err)
}

// configure the temporary node
cfg.Routing.Type = "dhtclient"

err = fsrepo.Init(dir, cfg)
if err != nil {
os.RemoveAll(dir)
return "", fmt.Errorf("failed to init ephemeral node: %s", err)
}

return dir, nil
}

func (f *IpfsFetcher) startTempNode() error {
// Open the repo
r, err := fsrepo.Open(f.ipfsTmpDir)
if err != nil {
return err
}

// Create a new lifetime context that is used to stop the temp ipfs node
ctxIpfsLife, cancel := context.WithCancel(context.Background())

// Construct the node
node, err := core.NewNode(ctxIpfsLife, &core.BuildCfg{
Online: true,
Routing: libp2p.DHTClientOption,
Repo: r,
})
if err != nil {
cancel()
r.Close()
return err
}

ifaceCore, err := coreapi.NewCoreAPI(node)
if err != nil {
cancel()
return err
}

f.ipfs = ifaceCore
f.ipfsCancel = cancel // stops node
f.ipfsCtx = node.Context() // signals when node is stopped

// Connect to any specified peers
go connect(ctxIpfsLife, ifaceCore, f.peers)

return nil
}

func parsePath(path string) (ipath.Path, error) {
ipfsPath := ipath.New(path)
if ipfsPath.IsValid() == nil {
return ipfsPath, nil
}

u, err := url.Parse(path)
if err != nil {
return nil, fmt.Errorf("%q could not be parsed: %s", path, err)
}

switch proto := u.Scheme; proto {
case "ipfs", "ipld", "ipns":
ipfsPath = ipath.New(filepath.Join("/", proto, u.Host, u.Path))
mvdan marked this conversation as resolved.
Show resolved Hide resolved
case "http", "https":
ipfsPath = ipath.New(u.Path)
default:
return nil, fmt.Errorf("%q is not recognized as an IPFS path", path)
}
return ipfsPath, ipfsPath.IsValid()
}

func setupPlugins(path string) error {
// Load plugins. This will skip the repo if not available.
plugins, err := loader.NewPluginLoader(filepath.Join(path, "plugins"))
if err != nil {
return fmt.Errorf("error loading plugins: %s", err)
}

if err := plugins.Initialize(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}

if err := plugins.Inject(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}

return nil
}

func connect(ctx context.Context, ipfs iface.CoreAPI, peers []string) error {
if len(peers) == 0 {
return nil
}

pinfos := make(map[peer.ID]*peer.AddrInfo, len(peers))
for _, addrStr := range peers {
addr, err := ma.NewMultiaddr(addrStr)
if err != nil {
return err
}
pii, err := peer.AddrInfoFromP2pAddr(addr)
if err != nil {
return err
}
pi, ok := pinfos[pii.ID]
if !ok {
pi = &peer.AddrInfo{ID: pii.ID}
pinfos[pi.ID] = pi
}
pi.Addrs = append(pi.Addrs, pii.Addrs...)
}

var wg sync.WaitGroup
wg.Add(len(pinfos))
for _, pi := range pinfos {
go func(pi *peer.AddrInfo) {
defer wg.Done()
fmt.Fprintf(os.Stderr, "attempting to connect to peer: %q\n", pi)
err := ipfs.Swarm().Connect(ctx, *pi)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to connect to %s: %s", pi.ID, err)
}
fmt.Fprintf(os.Stderr, "successfully connected to %s\n", pi.ID)
}(pi)
}
wg.Wait()
return nil
mvdan marked this conversation as resolved.
Show resolved Hide resolved
}
47 changes: 47 additions & 0 deletions repo/fsrepo/migrations/ipfsfetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package migrations

import (
"bufio"
"context"
"testing"
)

func TestIpfsFetcher(t *testing.T) {
t.Skip("manually-run dev test only")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe hide it behind a flag, then :) otherwise the only way to run it is by editing the source.

is there any way to eventually make this test work in a local way without extra setup? e.g. by setting up some IPFS/HTTP servers/nodes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - hidden behind flag

For the HttpFetcher I created a dummy test server. I am not really sure there is a great way to do this for IPFS. Mocking interfaces seems a bit excessive.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just a bit worried that we have a test that will practically never be run. Mocking isn't great, but if running a real IPFS node for the test is too difficult, at least testing with a mock by default would be better than testing nothing by default. The flag could still be used to test against a real IPFS node instead of a mock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another pattern is to use an environment variable and put the name of it in the Skip message. Inspired by this blog post https://peter.bourgon.org/blog/2021/04/02/dont-use-build-tags-for-integration-tests.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was handled by adding a skipUnlessEpic function like as is done here:
https://github.com/ipfs/go-ipfs/blob/ef866a1400b3b2861e5e8b6cc9edc8633b890a0a/test/integration/addcat_test.go#L173?


ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := NewIpfsFetcher("", 0, nil)
defer fetcher.Close()

rc, err := fetcher.Fetch(ctx, "go-ipfs/versions")
if err != nil {
t.Fatal(err)
}
defer rc.Close()

var out []string
mvdan marked this conversation as resolved.
Show resolved Hide resolved
scan := bufio.NewScanner(rc)
for scan.Scan() {
out = append(out, scan.Text())
}
err = scan.Err()
if err != nil {
t.Fatal("could not read versions:", err)
}

if len(out) < 6 {
t.Fatal("do not get all expected data")
}
if out[0] != "v0.3.2" {
t.Fatal("expected v1.0.0 as first line, got", out[0])
}

// Check not found
_, err = fetcher.Fetch(ctx, "/no_such_file")
if err == nil {
t.Fatal("expected error 404")
}

}