Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
update testing backend
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Mar 23, 2023
1 parent 386e3c1 commit 34e97fb
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 13 deletions.
249 changes: 239 additions & 10 deletions backend/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,29 @@ import (
"bytes"
"context"
"fmt"
bsfetcher "github.com/ipfs/go-fetcher/impl/blockservice"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-libipfs/blocks"
"github.com/ipfs/go-libipfs/files"
"github.com/ipfs/go-libipfs/gateway"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-path"
"github.com/ipfs/go-path/resolver"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipfs/go-unixfsnode"
"github.com/ipld/go-car"
"github.com/ipld/go-car/util"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/schema"
"io"
"net/http"
"os"
"net/url"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -76,7 +90,7 @@ func makeGatewayCARHandler(bsrv blockservice.BlockService, port int) (*http.Serv
if formatParam := r.URL.Query().Get("format"); formatParam != "" {
isCar = formatParam == "car"
if !isCar {
http.Error(w, "only raw format supported", http.StatusBadRequest)
http.Error(w, "only car format supported", http.StatusBadRequest)
return
}
} else {
Expand Down Expand Up @@ -104,7 +118,7 @@ func makeGatewayCARHandler(bsrv blockservice.BlockService, port int) (*http.Serv
return
}

carStream, err := simpleSelectorToCar(contentPath)
carStream, err := simpleSelectorToCar(ctx, bsrv, contentPath.String(), r.URL.Query())
if err != nil {
http.Error(w, "only the ipfs names is supported", http.StatusBadRequest)
return
Expand Down Expand Up @@ -147,8 +161,8 @@ func makeGatewayCARHandler(bsrv blockservice.BlockService, port int) (*http.Serv
}, nil
}

func simpleSelectorToCar(ipfsPath ipath.Path) (io.ReadCloser, error) {
pathSegs := strings.Split(ipfsPath.String(), "/")
func simpleSelectorToCar(ctx context.Context, bsrv blockservice.BlockService, p string, params url.Values) (io.ReadCloser, error) {
pathSegs := strings.Split(p, "/")
if len(pathSegs) < 3 || !(pathSegs[0] == "" && pathSegs[1] == "ipfs") {
return nil, fmt.Errorf("invalid path")
}
Expand All @@ -159,6 +173,11 @@ func simpleSelectorToCar(ipfsPath ipath.Path) (io.ReadCloser, error) {
return nil, err
}

ipfspath, err := path.ParsePath(p)
if err != nil {
return nil, err
}

r, w := io.Pipe()
// Setup header for the output car
err = car.WriteHeader(&car.CarHeader{
Expand All @@ -169,20 +188,230 @@ func simpleSelectorToCar(ipfsPath ipath.Path) (io.ReadCloser, error) {
return nil, fmt.Errorf("writing car header: %w", err)
}

rangeStr, hasRange := params.Get("bytes"), params.Has("bytes")
depthStr, hasDepth := params.Get("depth"), params.Has("depth")

if hasDepth && !(depthStr == "0" || depthStr == "1" || depthStr == "all") {
return nil, fmt.Errorf("depth type: %s not supported", depthStr)
}
var getRange *gateway.GetRange
if hasRange {
getRange, err = rangeStrToGetRange(rangeStr)
if err != nil {
return nil, err
}
}

go func() {
defer w.Close()
remainingPath := pathSegs[1:]
unixfile.NewUnixfsFile()
blockGetter := merkledag.NewDAGService(bsrv).Session(ctx)
blockGetter = &nodeGetterToCarExporer{
ng: blockGetter,
w: w,
mhSet: make(map[string]struct{}),
}
dsrv := merkledag.NewReadOnlyDagService(blockGetter)

// Setup the UnixFS resolver.
fetcherConfig := bsfetcher.NewFetcherConfig(bsrv)
fetcherConfig.PrototypeChooser = dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) {
if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok {
return tlnkNd.LinkTargetNodePrototype(), nil
}
return basicnode.Prototype.Any, nil
})
fetcher := fetcherConfig.WithReifier(unixfsnode.Reify)
r := resolver.NewBasicResolver(fetcher)

lastCid, remainder, err := r.ResolveToLastNode(ctx, ipfspath)
if err != nil {
goLog.Error(err)
return
}

if hasDepth && depthStr == "0" {
return
}

lastCidNode, err := dsrv.Get(ctx, lastCid)
if err != nil {
goLog.Error(err)
return
}

err = util.LdWrite(os.Stdout, block.Cid().Bytes(), block.RawData()) // write to the output car
ufsNode, err := unixfile.NewUnixfsFile(ctx, dsrv, lastCidNode)
if err != nil {
return fmt.Errorf("writing to output car: %w", err)
// It's not UnixFS

// If it's all fetch the graph recursively
if depthStr == "all" {
if err := merkledag.FetchGraph(ctx, lastCid, dsrv); err != nil {
goLog.Error(err)
}
return
}

//if not then either this is an error (which we can't report) or this is the last block for us to return
return
}
if f, ok := ufsNode.(files.File); ok {
if len(remainder) > 0 {
// this is an error, so we're done
return
}
if hasRange {
// TODO: testing + check off by one errors
var numToRead int64
if *getRange.To < 0 {
size, err := f.Seek(0, io.SeekEnd)
if err != nil {
return
}
numToRead = (size - *getRange.To) - int64(getRange.From)
} else {
numToRead = int64(getRange.From) - *getRange.To
}

if _, err := f.Seek(int64(getRange.From), io.SeekStart); err != nil {
return
}
_, _ = io.CopyN(io.Discard, f, numToRead)
return
}
} else if d, ok := ufsNode.(files.Directory); ok {
if depthStr == "1" {
for d.Entries().Next() {
}
return
}
if depthStr == "all" {
// TODO: being lazy here
w, err := files.NewTarWriter(io.Discard)
if err != nil {
goLog.Error(fmt.Errorf("could not create tar write %w", err))
return
}
if err := w.WriteFile(d, "tmp"); err != nil {
goLog.Error(err)
return
}
return
}
} else {
return
}
}()
_ = rootCid
return r, nil
}

type nodeGetterToCarExporer struct {
ng format.NodeGetter
w io.Writer

lk sync.RWMutex
mhSet map[string]struct{}
}

func (n *nodeGetterToCarExporer) Get(ctx context.Context, c cid.Cid) (format.Node, error) {
nd, err := n.ng.Get(ctx, c)
if err != nil {
return nil, err
}

if err := n.trySendBlock(nd); err != nil {
return nil, err
}

return nd, nil
}

func (n *nodeGetterToCarExporer) GetMany(ctx context.Context, cids []cid.Cid) <-chan *format.NodeOption {
ndCh := n.ng.GetMany(ctx, cids)
outCh := make(chan *format.NodeOption)
go func() {
defer close(outCh)
for nd := range ndCh {
if nd.Err == nil {
if err := n.trySendBlock(nd.Node); err != nil {
select {
case outCh <- &format.NodeOption{Err: err}:
case <-ctx.Done():
}
return
}
select {
case outCh <- nd:
case <-ctx.Done():
}
}
}
}()
return outCh
}

func (n *nodeGetterToCarExporer) trySendBlock(block blocks.Block) error {
h := string(block.Cid().Hash())
n.lk.RLock()
_, found := n.mhSet[h]
n.lk.RUnlock()
if !found {
doSend := false
n.lk.Lock()
_, found := n.mhSet[h]
if !found {
doSend = true
n.mhSet[h] = struct{}{}
}
n.lk.Unlock()
if doSend {
err := util.LdWrite(n.w, block.Cid().Bytes(), block.RawData()) // write to the output car
if err != nil {
return fmt.Errorf("writing to output car: %w", err)
}
}
}
return nil
}

var _ format.NodeGetter = (*nodeGetterToCarExporer)(nil)

func rangeStrToGetRange(rangeStr string) (*gateway.GetRange, error) {
rangeElems := strings.Split(rangeStr, ":")
if len(rangeElems) > 2 {
return nil, fmt.Errorf("invalid range")
}
first, err := strconv.ParseUint(rangeElems[0], 10, 64)
if err != nil {
return nil, err
}

if rangeElems[1] == "*" {
return &gateway.GetRange{
From: first,
To: nil,
}, nil
}

second, err := strconv.ParseInt(rangeElems[1], 10, 64)
if err != nil {
return nil, err
}

if second < 0 {
// TODO: fix, might also require a fix in boxo/gateway
return nil, fmt.Errorf("unsupported")
}

if uint64(second) < first {
return nil, fmt.Errorf("invalid range")
}

return &gateway.GetRange{
From: first,
To: &second,
}, nil
}

func makeGatewayBlockHandler(bsrv blockservice.BlockService, port int) (*http.Server, error) {
mux := http.NewServeMux()
mux.HandleFunc("/ipfs/", func(w http.ResponseWriter, r *http.Request) {
Expand Down
17 changes: 14 additions & 3 deletions backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func init() {
rootCmd.Flags().Int("gateway-port", 8081, "gateway port")
rootCmd.Flags().Int("metrics-port", 8041, "metrics port")
rootCmd.Flags().String("car-blockstore", "", "a CAR file to use for serving data instead of network requests")
golog.SetLogLevel("bifrost-gateway-backend", "debug")
}

var rootCmd = &cobra.Command{
Expand Down Expand Up @@ -80,9 +81,19 @@ var rootCmd = &cobra.Command{

log.Printf("Starting %s %s", name, version)

gatewaySrv, err := makeGatewayBlockHandler(bsrv, gatewayPort)
if err != nil {
return err
var gatewaySrv *http.Server
var err error

if true {
gatewaySrv, err = makeGatewayCARHandler(bsrv, gatewayPort)
if err != nil {
return err
}
} else {
gatewaySrv, err = makeGatewayBlockHandler(bsrv, gatewayPort)
if err != nil {
return err
}
}

metricsSrv, err := makeMetricsHandler(metricsPort)
Expand Down

0 comments on commit 34e97fb

Please sign in to comment.