-
-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/migration ipfs download #8064
Changes from all commits
bddfa4d
0565e80
b838384
b5aae2c
6963d0c
5bdef8c
f65110a
806831d
df9a2ca
0994fba
8a7a4c3
de0eaba
f9da061
1e16ba1
18bdb52
55f239b
ac8aa16
890bd7e
e5254d2
163c003
9768016
3a0be4c
ee577fe
9727b4f
7d16a95
96bad6f
d40d0f1
8d1dbfa
03454b7
bd6646f
3a4d85e
40d412a
e37896d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,300 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"net/url" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
|
||
config "github.com/ipfs/go-ipfs-config" | ||
"github.com/ipfs/go-ipfs-files" | ||
"github.com/ipfs/go-ipfs/core" | ||
"github.com/ipfs/go-ipfs/core/coreapi" | ||
"github.com/ipfs/go-ipfs/repo/fsrepo/migrations" | ||
"github.com/ipfs/go-ipfs/repo/fsrepo/migrations/ipfsfetcher" | ||
coreiface "github.com/ipfs/interface-go-ipfs-core" | ||
"github.com/ipfs/interface-go-ipfs-core/options" | ||
ipath "github.com/ipfs/interface-go-ipfs-core/path" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
) | ||
|
||
// readMigrationConfig reads the migration config out of the config, avoiding | ||
// reading anything other than the migration section. That way, we're free to | ||
// make arbitrary changes to all _other_ sections in migrations. | ||
func readMigrationConfig(repoRoot string) (*config.Migration, error) { | ||
var cfg struct { | ||
Migration config.Migration | ||
} | ||
|
||
cfgPath, err := config.Filename(repoRoot) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
cfgFile, err := os.Open(cfgPath) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer cfgFile.Close() | ||
|
||
err = json.NewDecoder(cfgFile).Decode(&cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
switch cfg.Migration.Keep { | ||
case "": | ||
cfg.Migration.Keep = config.DefaultMigrationKeep | ||
case "discard", "cache", "keep": | ||
default: | ||
return nil, errors.New("unknown config value, Migrations.Keep must be 'cache', 'pin', or 'discard'") | ||
} | ||
|
||
if len(cfg.Migration.DownloadSources) == 0 { | ||
cfg.Migration.DownloadSources = config.DefaultMigrationDownloadSources | ||
} | ||
|
||
return &cfg.Migration, nil | ||
} | ||
|
||
func readIpfsConfig(repoRoot *string) (bootstrap []string, peers []peer.AddrInfo) { | ||
if repoRoot == nil { | ||
return | ||
} | ||
|
||
cfgPath, err := config.Filename(*repoRoot) | ||
if err != nil { | ||
fmt.Fprintln(os.Stderr, err) | ||
return | ||
} | ||
|
||
cfgFile, err := os.Open(cfgPath) | ||
if err != nil { | ||
fmt.Fprintln(os.Stderr, err) | ||
return | ||
} | ||
defer cfgFile.Close() | ||
|
||
// Attempt to read bootstrap addresses | ||
var bootstrapCfg struct { | ||
Bootstrap []string | ||
} | ||
err = json.NewDecoder(cfgFile).Decode(&bootstrapCfg) | ||
if err != nil { | ||
fmt.Fprintln(os.Stderr, "cannot read bootstrap peers from config") | ||
} else { | ||
bootstrap = bootstrapCfg.Bootstrap | ||
} | ||
|
||
if _, err = cfgFile.Seek(0, 0); err != nil { | ||
fmt.Fprintln(os.Stderr, err) | ||
} | ||
|
||
// Attempt to read peers | ||
var peeringCfg struct { | ||
Peering config.Peering | ||
} | ||
err = json.NewDecoder(cfgFile).Decode(&peeringCfg) | ||
if err != nil { | ||
fmt.Fprintln(os.Stderr, "cannot read peering from config") | ||
} else { | ||
peers = peeringCfg.Peering.Peers | ||
} | ||
|
||
return | ||
} | ||
|
||
// getMigrationFetcher creates one or more fetchers according to | ||
// config.Migration.DownloadSources. If an IpfsFetcher is required, then | ||
// bootstrap and peer information in read from the config file in repoRoot, | ||
// unless repoRoot is nil. | ||
func getMigrationFetcher(cfg *config.Migration, repoRoot *string) (migrations.Fetcher, error) { | ||
const httpUserAgent = "go-ipfs" | ||
|
||
// Fetch migrations from current distribution, or location from environ | ||
fetchDistPath := migrations.GetDistPathEnv(migrations.CurrentIpfsDist) | ||
|
||
var fetchers []migrations.Fetcher | ||
for _, src := range cfg.DownloadSources { | ||
src := strings.TrimSpace(src) | ||
switch src { | ||
case "IPFS", "ipfs": | ||
bootstrap, peers := readIpfsConfig(repoRoot) | ||
fetchers = append(fetchers, ipfsfetcher.NewIpfsFetcher(fetchDistPath, 0, bootstrap, peers)) | ||
case "HTTPS", "https", "HTTP", "http": | ||
fetchers = append(fetchers, migrations.NewHttpFetcher(fetchDistPath, "", httpUserAgent, 0)) | ||
default: | ||
u, err := url.Parse(src) | ||
if err != nil { | ||
return nil, fmt.Errorf("bad gateway address: %s", err) | ||
} | ||
switch u.Scheme { | ||
case "": | ||
u.Scheme = "https" | ||
case "https", "http": | ||
default: | ||
return nil, errors.New("bad gateway address: url scheme must be http or https") | ||
} | ||
fetchers = append(fetchers, migrations.NewHttpFetcher(fetchDistPath, u.String(), httpUserAgent, 0)) | ||
case "": | ||
// Ignore empty string | ||
Comment on lines
+145
to
+146
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because we state that if they leave the DownloadSources empty, that results in the default behavior. I thought it might be considered reasonable by some to think |
||
} | ||
} | ||
if len(fetchers) == 0 { | ||
return nil, errors.New("no sources specified") | ||
} | ||
|
||
if len(fetchers) == 1 { | ||
return fetchers[0], nil | ||
} | ||
|
||
// Wrap fetchers in a MultiFetcher to try them in order | ||
return migrations.NewMultiFetcher(fetchers...), nil | ||
} | ||
|
||
func addMigrations(ctx context.Context, node *core.IpfsNode, fetcher migrations.Fetcher, pin bool) error { | ||
var fetchers []migrations.Fetcher | ||
if mf, ok := fetcher.(*migrations.MultiFetcher); ok { | ||
fetchers = mf.Fetchers() | ||
} else { | ||
fetchers = []migrations.Fetcher{fetcher} | ||
} | ||
|
||
for _, fetcher := range fetchers { | ||
switch f := fetcher.(type) { | ||
case *ipfsfetcher.IpfsFetcher: | ||
// Add migrations by connecting to temp node and getting from IPFS | ||
err := addMigrationPaths(ctx, node, f.AddrInfo(), f.FetchedPaths(), pin) | ||
if err != nil { | ||
return err | ||
} | ||
case *migrations.HttpFetcher: | ||
// Add the downloaded migration files directly | ||
if migrations.DownloadDirectory != "" { | ||
var paths []string | ||
err := filepath.Walk(migrations.DownloadDirectory, func(filePath string, info os.FileInfo, err error) error { | ||
if info.IsDir() { | ||
return nil | ||
} | ||
paths = append(paths, filePath) | ||
return nil | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
err = addMigrationFiles(ctx, node, paths, pin) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
default: | ||
return errors.New("Cannot get migrations from unknown fetcher type") | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// addMigrationFiles adds the files at paths to IPFS, optionally pinning them | ||
func addMigrationFiles(ctx context.Context, node *core.IpfsNode, paths []string, pin bool) error { | ||
Comment on lines
+204
to
+205
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that this is the wrong way to move DAGs around (i.e. the CIDs could not match, but in the near term will) can we first check the CIDs of the files via If this is a pain to do then I think we should just scrap storing the files downloaded over HTTP until we allow DAG export via gateways so users can't end up with pins that they can't really track (since pins don't have names and so they'd have to do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that we just merged CAR export on gateways (#8111) I feel much less strongly about this and think we can leave it as it is. What we can do is in a later RC (or next release) switch the HTTP Fetcher to really be a CAR fetcher and then this problem goes away. For the time being we control how the migrations are created so we can just leave the defaults. cc @Stebalien as a heads up There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me know if you want me to remove the support for adding downloads from HTTP to IPFS. ...since the CAR download is on the horizon, and less code to worry about. |
||
if len(paths) == 0 { | ||
return nil | ||
} | ||
ifaceCore, err := coreapi.NewCoreAPI(node) | ||
if err != nil { | ||
return err | ||
} | ||
ufs := ifaceCore.Unixfs() | ||
|
||
// Add migration files | ||
for _, filePath := range paths { | ||
f, err := os.Open(filePath) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
fi, err := f.Stat() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
ipfsPath, err := ufs.Add(ctx, files.NewReaderStatFile(f, fi), options.Unixfs.Pin(pin)) | ||
if err != nil { | ||
return err | ||
} | ||
fmt.Printf("Added migration file %q: %s\n", filepath.Base(filePath), ipfsPath) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// addMigrationPaths adds the files at paths to IPFS, optionally pinning | ||
// them. This is done after connecting to the peer. | ||
func addMigrationPaths(ctx context.Context, node *core.IpfsNode, peerInfo peer.AddrInfo, paths []ipath.Path, pin bool) error { | ||
if len(paths) == 0 { | ||
return errors.New("nothing downloaded by ipfs fetcher") | ||
} | ||
if len(peerInfo.Addrs) == 0 { | ||
return errors.New("no local swarm address for migration node") | ||
} | ||
|
||
ipfs, err := coreapi.NewCoreAPI(node) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Connect to temp node | ||
if err := ipfs.Swarm().Connect(ctx, peerInfo); err != nil { | ||
return fmt.Errorf("could not connect to migration peer %q: %s", peerInfo.ID, err) | ||
} | ||
fmt.Printf("connected to migration peer %q\n", peerInfo) | ||
|
||
if pin { | ||
pinApi := ipfs.Pin() | ||
for _, ipfsPath := range paths { | ||
err := pinApi.Add(ctx, ipfsPath) | ||
if err != nil { | ||
return err | ||
} | ||
fmt.Printf("Added and pinned migration file: %q\n", ipfsPath) | ||
} | ||
return nil | ||
} | ||
|
||
ufs := ipfs.Unixfs() | ||
|
||
// Add migration files | ||
for _, ipfsPath := range paths { | ||
err = ipfsGet(ctx, ufs, ipfsPath) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func ipfsGet(ctx context.Context, ufs coreiface.UnixfsAPI, ipfsPath ipath.Path) error { | ||
nd, err := ufs.Get(ctx, ipfsPath) | ||
if err != nil { | ||
return err | ||
} | ||
defer nd.Close() | ||
|
||
fnd, ok := nd.(files.File) | ||
if !ok { | ||
return fmt.Errorf("not a file node: %q", ipfsPath) | ||
} | ||
_, err = io.Copy(ioutil.Discard, fnd) | ||
if err != nil { | ||
return fmt.Errorf("cannot read migration: %w", err) | ||
} | ||
fmt.Printf("Added migration file: %q\n", ipfsPath) | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we do this both here and later (https://github.com/ipfs/go-ipfs/pull/8064/files#diff-73163a91b837dff9f7108907361d53ed19b735ca59e5231eed6b49ade275cacfR455)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done in a deferred function here, so that cleanup will happen if the daemon returns early due to an error. It is done later after finishing migration work, so that that directory is not sitting around for the lifetime of the daemon and is not left behind if the daemon exits ungracefully.
Added comment to explain.