Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/migration ipfs download #8064

Merged
merged 33 commits into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
bddfa4d
Move ipfsdir.go into separate package
gammazero Apr 7, 2021
0565e80
move defaultFetchLimit to fetcher.go
gammazero Apr 8, 2021
b838384
Create IpfsFetcher
gammazero Apr 8, 2021
b5aae2c
Use IpfsFetcher to download migrations
gammazero Apr 8, 2021
6963d0c
Check return from connect()
gammazero Apr 8, 2021
5bdef8c
Move ipfsdir pkg back into migrations, and put ipfsfetcher into a sep…
gammazero Apr 8, 2021
f65110a
Review changes and more tests
gammazero Apr 8, 2021
806831d
log ipfs fetcher close error
gammazero Apr 8, 2021
df9a2ca
Multifetcher only return fetch errors if all fetchers fail
gammazero Apr 9, 2021
0994fba
Read IPFS peers from existing config
gammazero Apr 9, 2021
8a7a4c3
Disable listening for inbound connections
gammazero Apr 9, 2021
de0eaba
Download migrations, and add them to IPFS, based on CLI flags.
gammazero Apr 12, 2021
f9da061
Read peers from CLI flag instead of from existing config file
gammazero Apr 13, 2021
1e16ba1
fix test
gammazero Apr 13, 2021
18bdb52
Add migrations from temp IPFS node after migration
gammazero Apr 14, 2021
55f239b
Build on go < 1.16
gammazero Apr 14, 2021
ac8aa16
fix missed error check
gammazero Apr 14, 2021
890bd7e
Migration temp node listens on any port available
gammazero Apr 16, 2021
e5254d2
Configure migration from config and not from CLI flags
gammazero Apr 17, 2021
163c003
check error return
gammazero Apr 17, 2021
9768016
Review changes
gammazero Apr 19, 2021
3a0be4c
Use latest config
gammazero Apr 20, 2021
ee577fe
Document Migration config
gammazero Apr 20, 2021
9727b4f
error message formatting
gammazero Apr 20, 2021
7d16a95
manually load the config when migrating
Stebalien Apr 20, 2021
96bad6f
fix: factor the migration file reading code into a function
Stebalien Apr 20, 2021
d40d0f1
Move readMigrationConfig into migration.go
gammazero Apr 23, 2021
8d1dbfa
Attempt to read Bootstrap and Peering from config
gammazero May 3, 2021
03454b7
Revert to previous multihash version
gammazero May 4, 2021
bd6646f
Additional unit tests
gammazero May 4, 2021
3a4d85e
Update documentation
gammazero May 4, 2021
40d412a
Better error messages
gammazero May 4, 2021
e37896d
Review changes
gammazero May 6, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
_ "expvar"
"fmt"
"io/ioutil"
"net"
"net/http"
_ "net/http/pprof"
Expand Down Expand Up @@ -268,6 +269,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
}
}

var cacheMigrations, pinMigrations bool
var fetcher migrations.Fetcher

// acquire the repo lock _before_ constructing a node. we need to make
// sure we are permitted to access the resources (datastore, etc.)
repo, err := fsrepo.Open(cctx.ConfigRoot)
Expand All @@ -288,8 +292,38 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
return fmt.Errorf("fs-repo requires migration")
}

// Fetch migrations from current distribution, or location from environ
fetcher := migrations.NewHttpFetcher(migrations.GetDistPathEnv(migrations.CurrentIpfsDist), "", "go-ipfs", 0)
migrationCfg, err := readMigrationConfig(cctx.ConfigRoot)
if err != nil {
return err
}

fetcher, err = getMigrationFetcher(migrationCfg, &cctx.ConfigRoot)
if err != nil {
return err
}
defer fetcher.Close()

if migrationCfg.Keep == "cache" {
cacheMigrations = true
} else if migrationCfg.Keep == "pin" {
pinMigrations = true
}

if cacheMigrations || pinMigrations {
// Create temp directory to store downloaded migration archives
migrations.DownloadDirectory, err = ioutil.TempDir("", "migrations")
if err != nil {
return err
}
// Defer cleanup of download directory so that it gets cleaned up
// if daemon returns early due to error
defer func() {
if migrations.DownloadDirectory != "" {
os.RemoveAll(migrations.DownloadDirectory)
}
}()
Comment on lines +320 to +324
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup is done in a deferred function here so that it happens if the daemon returns early due to an error. It is also done explicitly later, right after the migration work finishes, so that the directory is not sitting around for the lifetime of the daemon and is not left behind if the daemon exits ungracefully.

Added comment to explain.

}

err = migrations.RunMigration(cctx.Context(), fetcher, fsrepo.RepoVersion, "", false)
if err != nil {
fmt.Println("The migrations of fs-repo failed:")
Expand Down Expand Up @@ -420,6 +454,26 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
return err
}

// Add any files downloaded by migration.
if cacheMigrations || pinMigrations {
err = addMigrations(cctx.Context(), node, fetcher, pinMigrations)
if err != nil {
fmt.Fprintln(os.Stderr, "Could not add migragion to IPFS:", err)
}
// Remove download directory so that it does not remain for lifetime of
// daemon or get left behind if daemon has a hard exit
os.RemoveAll(migrations.DownloadDirectory)
migrations.DownloadDirectory = ""
}
if fetcher != nil {
// If there is an error closing the IpfsFetcher, then print error, but
// do not fail because of it.
err = fetcher.Close()
if err != nil {
log.Errorf("error closing IPFS fetcher: %s", err)
}
}

// construct http gateway
gwErrc, err := serveHTTPGateway(req, cctx)
if err != nil {
Expand Down
300 changes: 300 additions & 0 deletions cmd/ipfs/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"

config "github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreapi"
"github.com/ipfs/go-ipfs/repo/fsrepo/migrations"
"github.com/ipfs/go-ipfs/repo/fsrepo/migrations/ipfsfetcher"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
ipath "github.com/ipfs/interface-go-ipfs-core/path"
"github.com/libp2p/go-libp2p-core/peer"
)

// readMigrationConfig reads the migration config out of the config, avoiding
// reading anything other than the migration section. That way, we're free to
// make arbitrary changes to all _other_ sections in migrations.
//
// An empty Keep value is replaced with config.DefaultMigrationKeep, and an
// empty DownloadSources list is replaced with
// config.DefaultMigrationDownloadSources. An unrecognized Keep value results
// in an error.
func readMigrationConfig(repoRoot string) (*config.Migration, error) {
	var cfg struct {
		Migration config.Migration
	}

	cfgPath, err := config.Filename(repoRoot)
	if err != nil {
		return nil, err
	}

	cfgFile, err := os.Open(cfgPath)
	if err != nil {
		return nil, err
	}
	defer cfgFile.Close()

	err = json.NewDecoder(cfgFile).Decode(&cfg)
	if err != nil {
		return nil, err
	}

	switch cfg.Migration.Keep {
	case "":
		cfg.Migration.Keep = config.DefaultMigrationKeep
	case "discard", "cache", "pin":
		// Valid values, matching those named in the error message below.
	case "keep":
		// Previously accepted in place of "pin"; still tolerated so that
		// existing config files continue to load.
		// NOTE(review): callers appear to act only on "cache" and "pin" --
		// confirm whether "keep" should be mapped to one of them or rejected.
	default:
		return nil, errors.New("unknown config value, Migrations.Keep must be 'cache', 'pin', or 'discard'")
	}

	if len(cfg.Migration.DownloadSources) == 0 {
		cfg.Migration.DownloadSources = config.DefaultMigrationDownloadSources
	}

	return &cfg.Migration, nil
}

// readIpfsConfig makes a best-effort attempt to load the bootstrap addresses
// and the peering peers from the config file located under repoRoot. Problems
// are reported on stderr instead of being returned, so either result may be
// empty. A nil repoRoot yields empty results without touching the filesystem.
func readIpfsConfig(repoRoot *string) (bootstrap []string, peers []peer.AddrInfo) {
	if repoRoot == nil {
		return
	}

	path, err := config.Filename(*repoRoot)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	f, err := os.Open(path)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	defer f.Close()

	// First pass over the file: pick out only the Bootstrap section.
	var bootstrapSection struct {
		Bootstrap []string
	}
	if err = json.NewDecoder(f).Decode(&bootstrapSection); err == nil {
		bootstrap = bootstrapSection.Bootstrap
	} else {
		fmt.Fprintln(os.Stderr, "cannot read bootstrap peers from config")
	}

	// Rewind so the file can be decoded a second time.
	if _, err = f.Seek(0, 0); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}

	// Second pass: pick out only the Peering section.
	var peeringSection struct {
		Peering config.Peering
	}
	if err = json.NewDecoder(f).Decode(&peeringSection); err == nil {
		peers = peeringSection.Peering.Peers
	} else {
		fmt.Fprintln(os.Stderr, "cannot read peering from config")
	}

	return
}

// getMigrationFetcher creates one or more fetchers according to
// config.Migration.DownloadSources. If an IpfsFetcher is required, then
// bootstrap and peer information in read from the config file in repoRoot,
// unless repoRoot is nil.
func getMigrationFetcher(cfg *config.Migration, repoRoot *string) (migrations.Fetcher, error) {
const httpUserAgent = "go-ipfs"

// Fetch migrations from current distribution, or location from environ
fetchDistPath := migrations.GetDistPathEnv(migrations.CurrentIpfsDist)

var fetchers []migrations.Fetcher
for _, src := range cfg.DownloadSources {
src := strings.TrimSpace(src)
switch src {
case "IPFS", "ipfs":
bootstrap, peers := readIpfsConfig(repoRoot)
fetchers = append(fetchers, ipfsfetcher.NewIpfsFetcher(fetchDistPath, 0, bootstrap, peers))
case "HTTPS", "https", "HTTP", "http":
fetchers = append(fetchers, migrations.NewHttpFetcher(fetchDistPath, "", httpUserAgent, 0))
default:
u, err := url.Parse(src)
if err != nil {
return nil, fmt.Errorf("bad gateway address: %s", err)
}
switch u.Scheme {
case "":
u.Scheme = "https"
case "https", "http":
default:
return nil, errors.New("bad gateway address: url scheme must be http or https")
}
fetchers = append(fetchers, migrations.NewHttpFetcher(fetchDistPath, u.String(), httpUserAgent, 0))
case "":
// Ignore empty string
Comment on lines +145 to +146
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we state that if they leave the DownloadSources empty, that results in the default behavior. I thought it might be considered reasonable by some to think [""] means empty.

}
}
if len(fetchers) == 0 {
return nil, errors.New("no sources specified")
}

if len(fetchers) == 1 {
return fetchers[0], nil
}

// Wrap fetchers in a MultiFetcher to try them in order
return migrations.NewMultiFetcher(fetchers...), nil
}

// addMigrations adds the migrations obtained through fetcher to the given
// node, pinning them when pin is true. If fetcher is a MultiFetcher, each of
// its underlying fetchers is processed in turn.
//
// For an IpfsFetcher, the fetched paths are retrieved from the fetcher's
// temporary node. For an HttpFetcher, every regular file found under
// migrations.DownloadDirectory is added (nothing is done if that directory is
// unset). Any other fetcher type results in an error.
func addMigrations(ctx context.Context, node *core.IpfsNode, fetcher migrations.Fetcher, pin bool) error {
	var fetchers []migrations.Fetcher
	if mf, ok := fetcher.(*migrations.MultiFetcher); ok {
		fetchers = mf.Fetchers()
	} else {
		fetchers = []migrations.Fetcher{fetcher}
	}

	for _, fetcher := range fetchers {
		switch f := fetcher.(type) {
		case *ipfsfetcher.IpfsFetcher:
			// Add migrations by connecting to temp node and getting from IPFS
			err := addMigrationPaths(ctx, node, f.AddrInfo(), f.FetchedPaths(), pin)
			if err != nil {
				return err
			}
		case *migrations.HttpFetcher:
			// Add the downloaded migration files directly
			if migrations.DownloadDirectory != "" {
				var paths []string
				err := filepath.Walk(migrations.DownloadDirectory, func(filePath string, info os.FileInfo, err error) error {
					if err != nil {
						// Walk passes a non-nil err when it could not stat or
						// read an entry; info may be nil in that case, so it
						// must not be dereferenced.
						return err
					}
					if info.IsDir() {
						return nil
					}
					paths = append(paths, filePath)
					return nil
				})
				if err != nil {
					return err
				}
				err = addMigrationFiles(ctx, node, paths, pin)
				if err != nil {
					return err
				}
			}
		default:
			return errors.New("Cannot get migrations from unknown fetcher type")
		}
	}

	return nil
}

// addMigrationFiles adds the files at paths to IPFS, optionally pinning them
func addMigrationFiles(ctx context.Context, node *core.IpfsNode, paths []string, pin bool) error {
Comment on lines +204 to +205
Copy link
Contributor

@aschmahmann aschmahmann May 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this is the wrong way to move DAGs around (i.e. the CIDs could not match, but in the near term will) can we first check the CIDs of the files via ipfs add --only-hash (should be a UnixFS CoreAPI option) and only if it matches then pin.

If this is a pain to do then I think we should just scrap storing the files downloaded over HTTP until we allow DAG export via gateways so users can't end up with pins that they can't really track (since pins don't have names and so they'd have to do ipfs get/cat on the pin to figure out what it is).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we just merged CAR export on gateways (#8111) I feel much less strongly about this and think we can leave it as it is.

What we can do is in a later RC (or next release) switch the HTTP Fetcher to really be a CAR fetcher and then this problem goes away. For the time being we control how the migrations are created so we can just leave the defaults.

cc @Stebalien as a heads up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if you want me to remove the support for adding downloads from HTTP to IPFS. ...since the CAR download is on the horizon, and less code to worry about.

if len(paths) == 0 {
return nil
}
ifaceCore, err := coreapi.NewCoreAPI(node)
if err != nil {
return err
}
ufs := ifaceCore.Unixfs()

// Add migration files
for _, filePath := range paths {
f, err := os.Open(filePath)
if err != nil {
return err
}

fi, err := f.Stat()
if err != nil {
return err
}

ipfsPath, err := ufs.Add(ctx, files.NewReaderStatFile(f, fi), options.Unixfs.Pin(pin))
if err != nil {
return err
}
fmt.Printf("Added migration file %q: %s\n", filepath.Base(filePath), ipfsPath)
}

return nil
}

// addMigrationPaths makes the given IPFS paths available on node after first
// connecting node to the peer described by peerInfo. With pin set, every path
// is pinned through the Pin API; otherwise every path is read through ipfsGet.
// It fails when paths is empty, when peerInfo carries no addresses, when the
// connection cannot be made, or on the first path that cannot be processed.
func addMigrationPaths(ctx context.Context, node *core.IpfsNode, peerInfo peer.AddrInfo, paths []ipath.Path, pin bool) error {
	switch {
	case len(paths) == 0:
		return errors.New("nothing downloaded by ipfs fetcher")
	case len(peerInfo.Addrs) == 0:
		return errors.New("no local swarm address for migration node")
	}

	api, err := coreapi.NewCoreAPI(node)
	if err != nil {
		return err
	}

	// The temp node must be reachable before anything can be retrieved from it.
	err = api.Swarm().Connect(ctx, peerInfo)
	if err != nil {
		return fmt.Errorf("could not connect to migration peer %q: %s", peerInfo.ID, err)
	}
	fmt.Printf("connected to migration peer %q\n", peerInfo)

	if !pin {
		// No pinning requested: just read each migration file.
		unixfs := api.Unixfs()
		for _, p := range paths {
			if getErr := ipfsGet(ctx, unixfs, p); getErr != nil {
				return getErr
			}
		}
		return nil
	}

	pinner := api.Pin()
	for _, p := range paths {
		if pinErr := pinner.Add(ctx, p); pinErr != nil {
			return pinErr
		}
		fmt.Printf("Added and pinned migration file: %q\n", p)
	}
	return nil
}

// ipfsGet retrieves the node at ipfsPath through ufs and reads its entire
// content, discarding the bytes. It returns an error if the node cannot be
// retrieved, is not a file, or cannot be read completely.
func ipfsGet(ctx context.Context, ufs coreiface.UnixfsAPI, ipfsPath ipath.Path) error {
	node, err := ufs.Get(ctx, ipfsPath)
	if err != nil {
		return err
	}
	defer node.Close()

	file, isFile := node.(files.File)
	if !isFile {
		return fmt.Errorf("not a file node: %q", ipfsPath)
	}

	// Only the act of reading matters here; the content itself is thrown away.
	if _, copyErr := io.Copy(ioutil.Discard, file); copyErr != nil {
		return fmt.Errorf("cannot read migration: %w", copyErr)
	}

	fmt.Printf("Added migration file: %q\n", ipfsPath)
	return nil
}
Loading