Skip to content

Commit

Permalink
Merge branch 'master' into utf8
Browse files Browse the repository at this point in the history
  • Loading branch information
turt2live committed Mar 19, 2020
2 parents 6b46f2a + d0c319d commit f850218
Show file tree
Hide file tree
Showing 26 changed files with 798 additions and 42 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
/logs
/vendor
/config
/gdpr-data
/ipfs

media-repo*.yaml
homeserver.yaml
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
/vendor
/config
/gdpr-data
/ipfs

# Generated files
assets.bin.go
Expand Down
31 changes: 7 additions & 24 deletions api/unstable/ipfs_download.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package unstable

import (
"bytes"
"net/http"

"github.com/gorilla/mux"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-http-client"
"github.com/ipfs/interface-go-ipfs-core/path"
"github.com/sirupsen/logrus"
"github.com/turt2live/matrix-media-repo/api"
"github.com/turt2live/matrix-media-repo/api/r0"
"github.com/turt2live/matrix-media-repo/common/rcontext"
"github.com/turt2live/matrix-media-repo/util"
"github.com/turt2live/matrix-media-repo/ipfs_proxy"
)

func IPFSDownload(r *http.Request, rctx rcontext.RequestContext, user api.UserInfo) interface{} {
Expand All @@ -26,29 +22,16 @@ func IPFSDownload(r *http.Request, rctx rcontext.RequestContext, user api.UserIn
"server": server,
})

ipfs, err := httpapi.NewLocalApi()
obj, err := ipfs_proxy.GetObject(ipfsContentId, rctx)
if err != nil {
rctx.Log.Error(err)
return api.InternalServerError("Unexpected error connecting to IPFS")
}

ipfsCid, err := cid.Decode(ipfsContentId)
if err != nil {
rctx.Log.Error(err)
return api.InternalServerError("Unexpected error decoding content ID")
}

ipfsPath := path.IpfsPath(ipfsCid)
node, err := ipfs.ResolveNode(rctx.Context, ipfsPath)
if err != nil {
rctx.Log.Error(err)
return api.InternalServerError("Unexpected error resolving object from IPFS")
return api.InternalServerError("unexpected error")
}

return &r0.DownloadMediaResponse{
ContentType: "application/octet-stream",
Filename: "ipfs.dat", // TODO: Figure out how to get a name out of this
SizeBytes: int64(len(node.RawData())),
Data: util.BufferToStream(bytes.NewBuffer(node.RawData())), // stream to avoid log spam
ContentType: obj.ContentType,
Filename: obj.FileName,
SizeBytes: obj.SizeBytes,
Data: obj.Data,
}
}
4 changes: 4 additions & 0 deletions cmd/media_repo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/turt2live/matrix-media-repo/common/runtime"
"github.com/turt2live/matrix-media-repo/common/version"
"github.com/turt2live/matrix-media-repo/internal_cache"
"github.com/turt2live/matrix-media-repo/ipfs_proxy"
"github.com/turt2live/matrix-media-repo/metrics"
"github.com/turt2live/matrix-media-repo/tasks"
)
Expand Down Expand Up @@ -60,6 +61,9 @@ func main() {
defer watcher.Close()
setupReloads()

logrus.Info("Starting IPFS (if enabled)...")
ipfs_proxy.Reload()

logrus.Info("Starting media repository...")
metrics.Init()
web := webserver.Init()
Expand Down
16 changes: 16 additions & 0 deletions cmd/media_repo/reloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/turt2live/matrix-media-repo/api/webserver"
"github.com/turt2live/matrix-media-repo/common/globals"
"github.com/turt2live/matrix-media-repo/common/runtime"
"github.com/turt2live/matrix-media-repo/ipfs_proxy"
"github.com/turt2live/matrix-media-repo/metrics"
"github.com/turt2live/matrix-media-repo/storage"
"github.com/turt2live/matrix-media-repo/tasks"
Expand All @@ -15,6 +16,7 @@ func setupReloads() {
reloadDatabaseOnChan(globals.DatabaseReloadChan)
reloadDatastoresOnChan(globals.DatastoresReloadChan)
reloadRecurringTasksOnChan(globals.RecurringTasksReloadChan)
reloadIpfsOnChan(globals.IPFSReloadChan)
}

func stopReloads() {
Expand All @@ -24,6 +26,7 @@ func stopReloads() {
globals.DatabaseReloadChan <- false
globals.DatastoresReloadChan <- false
globals.RecurringTasksReloadChan <- false
globals.IPFSReloadChan <- false
}

func reloadWebOnChan(reloadChan chan bool) {
Expand Down Expand Up @@ -93,3 +96,16 @@ func reloadRecurringTasksOnChan(reloadChan chan bool) {
}
}()
}

func reloadIpfsOnChan(reloadChan chan bool) {
go func() {
for {
shouldReload := <-reloadChan
if shouldReload {
ipfs_proxy.Reload()
} else {
ipfs_proxy.Stop()
}
}
}()
}
4 changes: 4 additions & 0 deletions common/config/conf_min_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func NewDefaultMinimumRepoConfig() MinimumRepoConfig {
},
IPFS: IPFSConfig{
Enabled: false,
Daemon: IPFSDaemonConfig{
Enabled: true,
RepoPath: "./ipfs",
},
},
},
}
Expand Down
8 changes: 7 additions & 1 deletion common/config/models_domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,11 @@ type MSC2448Config struct {
}

type IPFSConfig struct {
Enabled bool `yaml:"enabled"`
Enabled bool `yaml:"enabled"`
Daemon IPFSDaemonConfig `yaml:"builtInDaemon"`
}

type IPFSDaemonConfig struct {
Enabled bool `yaml:"enabled"`
RepoPath string `yaml:"repoPath"`
}
7 changes: 7 additions & 0 deletions common/config/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ func onFileChanged() {
logrus.Warn("Log configuration changed - restart the media repo to apply changes")
}

ipfsDaemonChange := configNew.Features.IPFS.Daemon.Enabled != configNow.Features.IPFS.Daemon.Enabled
ipfsDaemonPathChange := configNew.Features.IPFS.Daemon.RepoPath != configNow.Features.IPFS.Daemon.RepoPath
if ipfsDaemonChange || ipfsDaemonPathChange {
logrus.Warn("IPFS Daemon options changed - reloading")
globals.IPFSReloadChan <- true
}

// Always update the datastores
logrus.Warn("Updating datastores to ensure accuracy")
globals.DatastoresReloadChan <- true
Expand Down
1 change: 1 addition & 0 deletions common/globals/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ var MetricsReloadChan = make(chan bool)
var DatabaseReloadChan = make(chan bool)
var DatastoresReloadChan = make(chan bool)
var RecurringTasksReloadChan = make(chan bool)
var IPFSReloadChan = make(chan bool)
31 changes: 28 additions & 3 deletions config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ datastores:
forKinds: ["thumbnails"]
opts:
path: /var/matrix/media

- type: s3
enabled: false # Enable this to set up s3 uploads
forKinds: ["thumbnails", "remote_media", "local_media", "archives"]
Expand All @@ -115,6 +116,20 @@ datastores:
# some providers will need this (like Scaleway). Uncomment to use.
#region: "sfo2"

# The media repo does support an IPFS datastore, but only if the IPFS feature is enabled. If
# the feature is not enabled, this will not work. Note that IPFS support is experimental at
# the moment and not recommended for general use.
#
# NOTE: Everything you upload to IPFS will be publicly accessible, even when the media repo
# puts authentication on the download endpoints. Only use this option for cases where you
# expect your media to be publicly accessible.
- type: ipfs
enabled: false # Enable this to use IPFS support
forKinds: ["local_media"]
# The IPFS datastore currently has no options. It will use the daemon or HTTP API configured
# in the IPFS section of your main config.
opts: {}

# Options for controlling archives. Archives are exports of a particular user's content for
# the purpose of GDPR or moving media to a different server.
archiving:
Expand Down Expand Up @@ -430,9 +445,19 @@ featureSupport:
punch: 1

# IPFS Support
# This is currently beta and might not work at all. It also assumes you have a local IPFS
# agent running so it can access the HTTP API. In future these options might be moved and
# more configurable.
# This is currently experimental and might not work at all.
IPFS:
# Whether or not IPFS support is enabled for use in the media repo.
enabled: false

# Options for the built in IPFS daemon
builtInDaemon:
# Enable this to spawn an in-process IPFS node to use instead of a localhost
# HTTP agent. If this is disabled, the media repo will assume you have an HTTP
# IPFS agent running and accessible. Defaults to using a daemon (true).
enabled: true

# If the Daemon is enabled, set this to the location where the IPFS files should
# be stored. If you're using Docker, this should be something like "/data/ipfs"
# so it can be mapped to a volume.
repoPath: "./ipfs"
4 changes: 2 additions & 2 deletions controllers/data_controller/import_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func doImport(updateChannel chan *importUpdate, taskId int, importId string, ctx
if found {
ctx.Log.Info("Using file from memory")
closer := util.BufferToStream(buf)
_, err := upload_controller.StoreDirect(closer, record.SizeBytes, record.ContentType, record.FileName, userId, record.Origin, record.MediaId, kind, ctx)
_, err := upload_controller.StoreDirect(nil, closer, record.SizeBytes, record.ContentType, record.FileName, userId, record.Origin, record.MediaId, kind, ctx)
if err != nil {
ctx.Log.Errorf("Error importing file: %s", err.Error())
continue
Expand Down Expand Up @@ -310,7 +310,7 @@ func doImport(updateChannel chan *importUpdate, taskId int, importId string, ctx
continue
}

_, err = upload_controller.StoreDirect(r.Body, r.ContentLength, record.ContentType, record.FileName, userId, record.Origin, record.MediaId, kind, ctx)
_, err = upload_controller.StoreDirect(nil, r.Body, r.ContentLength, record.ContentType, record.FileName, userId, record.Origin, record.MediaId, kind, ctx)
if err != nil {
ctx.Log.Errorf("Error importing file: %s", err.Error())
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func downloadResourceWorkFn(request *resource_handler.WorkRequest) interface{} {
defer fileStream.Close()

userId := upload_controller.NoApplicableUploadUser
media, err := upload_controller.StoreDirect(fileStream, downloaded.ContentLength, downloaded.ContentType, downloaded.DesiredFilename, userId, info.origin, info.mediaId, common.KindRemoteMedia, ctx)
media, err := upload_controller.StoreDirect(nil, fileStream, downloaded.ContentLength, downloaded.ContentType, downloaded.DesiredFilename, userId, info.origin, info.mediaId, common.KindRemoteMedia, ctx)
if err != nil {
ctx.Log.Error("Error persisting file: ", err)
return &workerDownloadResponse{err: err}
Expand Down
52 changes: 43 additions & 9 deletions controllers/upload_controller/upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package upload_controller

import (
"database/sql"
"fmt"
"io"
"io/ioutil"
"strconv"
Expand All @@ -19,6 +20,11 @@ import (

const NoApplicableUploadUser = ""

type AlreadyUploadedFile struct {
DS *datastore.DatastoreRef
ObjectInfo *types.ObjectInfo
}

func IsRequestTooLarge(contentLength int64, contentLengthHeader string, ctx rcontext.RequestContext) bool {
if ctx.Config.Uploads.MaxSizeBytes <= 0 {
return false
Expand Down Expand Up @@ -124,7 +130,25 @@ func UploadMedia(contents io.ReadCloser, contentLength int64, contentType string
}
}

return StoreDirect(data, contentLength, contentType, filename, userId, origin, mediaId, common.KindLocalMedia, ctx)
var existingFile *AlreadyUploadedFile = nil
ds, err := datastore.PickDatastore(common.KindLocalMedia, ctx)
if err != nil {
return nil, err
}
if ds.Type == "ipfs" {
// Do the upload now so we can pick the media ID to point to IPFS
info, err := ds.UploadFile(data, contentLength, ctx)
if err != nil {
return nil, err
}
existingFile = &AlreadyUploadedFile{
DS: ds,
ObjectInfo: info,
}
mediaId = fmt.Sprintf("ipfs:%s", info.Location[len("ipfs/"):])
}

return StoreDirect(existingFile, data, contentLength, contentType, filename, userId, origin, mediaId, common.KindLocalMedia, ctx)
}

func trackUploadAsLastAccess(ctx rcontext.RequestContext, media *types.Media) {
Expand Down Expand Up @@ -177,14 +201,24 @@ func IsAllowed(contentType string, reportedContentType string, userId string, ct
return allowed
}

func StoreDirect(contents io.ReadCloser, expectedSize int64, contentType string, filename string, userId string, origin string, mediaId string, kind string, ctx rcontext.RequestContext) (*types.Media, error) {
ds, err := datastore.PickDatastore(kind, ctx)
if err != nil {
return nil, err
}
info, err := ds.UploadFile(contents, expectedSize, ctx)
if err != nil {
return nil, err
func StoreDirect(f *AlreadyUploadedFile, contents io.ReadCloser, expectedSize int64, contentType string, filename string, userId string, origin string, mediaId string, kind string, ctx rcontext.RequestContext) (*types.Media, error) {
var ds *datastore.DatastoreRef
var info *types.ObjectInfo
if f == nil {
dsPicked, err := datastore.PickDatastore(kind, ctx)
if err != nil {
return nil, err
}
ds = dsPicked

fInfo, err := ds.UploadFile(contents, expectedSize, ctx)
if err != nil {
return nil, err
}
info = fInfo
} else {
ds = f.DS
info = f.ObjectInfo
}

stream, err := ds.DownloadFile(info.Location)
Expand Down
1 change: 1 addition & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Any options from the main config can then be overridden per-domain with the exce
* `downloads.expireAfterDays` - because remote media downloads are not for any particular domain.
* `thumbnails.expireAfterDays` - because thumbnails aren't associated with any particular domain.
* `urlPreviews.expireAfterDays` - because previews aren't associated with any particular domain.
* `featureSupport.IPFS.builtInDaemon` - because spawning multiple daemons doesn't make sense.

To override a value, simply provide it in any valid per-domain config:

Expand Down
12 changes: 12 additions & 0 deletions docs/ipfs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# IPFS Support

TODO: An actual instructional document.

## Bugs/TODO

* [ ] Copies nearly everything into memory instead of streaming
* [ ] Downloads don't work because the nodes can be split sometimes (multiple links, RawData off the parent doesn't work)
* [ ] Delete support (see TODO)
* [ ] Overwrite support (if possible, might need to add support for changing media locations)
* [ ] General stability testing
* [ ] Use file names from database, not IPFS (because IPFS doesn't have any)
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ require (
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0
github.com/gorilla/mux v1.7.4
github.com/h2non/filetype v1.0.12
github.com/ipfs/go-cid v0.0.3
github.com/ipfs/go-cid v0.0.4
github.com/ipfs/go-ipfs v0.4.22-0.20191119151441-b8ec598d5801
github.com/ipfs/go-ipfs-config v0.0.11
github.com/ipfs/go-ipfs-files v0.0.4
github.com/ipfs/go-ipfs-http-client v0.0.5
github.com/ipfs/interface-go-ipfs-core v0.2.6
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
Expand All @@ -35,8 +38,11 @@ require (
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/lib/pq v1.3.0
github.com/libp2p/go-libp2p-core v0.2.5
github.com/libp2p/go-libp2p-peerstore v0.1.3
github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
github.com/minio/minio-go v6.0.14+incompatible
github.com/multiformats/go-multiaddr v0.1.2
github.com/olebedev/emitter v0.0.0-20190110104742-e8d1457e6aee
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea // indirect
Expand Down
Loading

0 comments on commit f850218

Please sign in to comment.