Skip to content

Commit

Permalink
Merge pull request #12 from ipni/no-internal-pkgs
Browse files Browse the repository at this point in the history
No internal packages
  • Loading branch information
gammazero authored Mar 31, 2023
2 parents 8bfbc24 + 52c16ce commit f35bb8a
Show file tree
Hide file tree
Showing 14 changed files with 90 additions and 78 deletions.
2 changes: 1 addition & 1 deletion dhash/dhash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"math/rand"
"testing"

"github.com/ipni/go-libipni/internal/test"
"github.com/ipni/go-libipni/test"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
)
Expand Down
13 changes: 8 additions & 5 deletions find/client/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/ipni/go-libipni/find/client"
"github.com/ipni/go-libipni/find/model"
pb "github.com/ipni/go-libipni/find/pb"
"github.com/ipni/go-libipni/internal/p2pclient"
"github.com/ipni/go-libipni/p2pclient"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -128,12 +128,15 @@ func (c *Client) GetStats(ctx context.Context) (*model.Stats, error) {
}

func (c *Client) sendRecv(ctx context.Context, req *pb.FindMessage, expectRspType pb.FindMessage_MessageType) ([]byte, error) {
r := c.p2pc.SendRequest(ctx, req)
if r.Err != nil {
return nil, r.Err
}
var resp pb.FindMessage
err := c.p2pc.SendRequest(ctx, req, func(data []byte) error {
return proto.Unmarshal(data, &resp)
})
err := proto.Unmarshal(r.Data, &resp)
r.Close()
if err != nil {
return nil, fmt.Errorf("failed to send request to indexer: %s", err)
return nil, err
}
if resp.GetType() != expectRspType {
if resp.GetType() == pb.FindMessage_ERROR_RESPONSE {
Expand Down
2 changes: 1 addition & 1 deletion find/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"

"github.com/ipni/go-libipni/find/model"
"github.com/ipni/go-libipni/internal/test"
"github.com/ipni/go-libipni/test"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ipni/go-libipni

go 1.20
go 1.19

require (
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5
Expand Down
2 changes: 1 addition & 1 deletion ingest/model/ingest_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bytes"
"testing"

"github.com/ipni/go-libipni/internal/test"
"github.com/ipni/go-libipni/test"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
p2ptest "github.com/libp2p/go-libp2p/core/test"
Expand Down
2 changes: 1 addition & 1 deletion ingest/schema/envelope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/memstore"
stischema "github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/go-libipni/internal/test"
"github.com/ipni/go-libipni/test"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
p2ptest "github.com/libp2p/go-libp2p/core/test"
Expand Down
2 changes: 1 addition & 1 deletion ingest/schema/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
ipldSchema "github.com/ipld/go-ipld-prime/schema"
"github.com/ipld/go-ipld-prime/storage/memstore"
stischema "github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/go-libipni/internal/test"
"github.com/ipni/go-libipni/test"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
Expand Down
4 changes: 2 additions & 2 deletions metadata/graphsync_filecoinv1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package metadata_test
import (
"testing"

"github.com/ipni/go-libipni/internal/test"
"github.com/ipni/go-libipni/metadata"
"github.com/ipni/go-libipni/test"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-varint"
"github.com/stretchr/testify/require"
)

func TestRoundTripDataTransferFilecoin(t *testing.T) {
cids := test.RandomCids(t, 4)
cids := test.RandomCids(4)
filecoinV1Datas := []*metadata.GraphsyncFilecoinV1{
{
PieceCID: cids[0],
Expand Down
4 changes: 2 additions & 2 deletions metadata/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"math/rand"
"testing"

"github.com/ipni/go-libipni/internal/test"
"github.com/ipni/go-libipni/metadata"
"github.com/ipni/go-libipni/test"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-varint"
"github.com/stretchr/testify/require"
)

func TestMetadata(t *testing.T) {
cids := test.RandomCids(t, 4)
cids := test.RandomCids(4)
tests := []struct {
name string
givenTransports []metadata.Protocol
Expand Down
94 changes: 57 additions & 37 deletions internal/p2pclient/client.go → p2pclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

// Client is responsible for sending requests and receiving responses to and
// from libp2p peers. Each instance of Client communicates with a single peer
// from libp2p peers. Each instance of Client communicates with a single peer
// using a single protocolID.
type Client struct {
host host.Host
Expand All @@ -32,25 +32,37 @@ type Client struct {
sendLock sync.Mutex
}

// DecodeResponseFunc is a function that is passed into this generic libp2p
// Client to decode a response message. This is needed because the generic
// client cannot decode the response message since the message is of a type
// only know to a specific libp2p client using this generic client.
type DecodeResponseFunc func([]byte) error
// Response is returned by SendRequest and contains the response to the
// request. It is the caller's responsibility to call Response.Close() after
// reading the data, to free the message buffer.
type Response struct {
Data []byte
Err error
msgReader msgio.Reader
}

// Close frees the message buffer that holds the response data.
func (r *Response) Close() {
if r.Data != nil {
r.msgReader.ReleaseMsg(r.Data)
r.Data = nil
r.msgReader = nil
}
}

const (
// default port for libp2p client to connect to
// default IPNI port for libp2p client to connect to
defaultLibp2pPort = 3003
// Timeout to wait for a response after a request is sent
readMessageTimeout = 10 * time.Second
)

// ErrReadTimeout is an error that occurs when no message is read within the
// timeout period
// timeout period.
var ErrReadTimeout = fmt.Errorf("timed out reading response")

// New creates a new libp2pclient Client that communicates with a specific peer identified by
// protocolID. If host is nil, then one is created.
// New creates a new Client that communicates with a specific peer identified
// by protocolID. If host is nil, then one is created.
func New(p2pHost host.Host, peerID peer.ID, protoID protocol.ID) (*Client, error) {
// If no host was given, create one.
var ownHost bool
Expand All @@ -73,7 +85,7 @@ func New(p2pHost host.Host, peerID peer.ID, protoID protocol.ID) (*Client, error
}

// Connect connects the client to the host at the location specified by
// hostname. The value of hostname is a host or host:port, where the host is a
// hostname. The value of hostname is a host or host:port, where the host is a
// hostname or IP address.
func (c *Client) Connect(ctx context.Context, hostname string) error {
port := defaultLibp2pPort
Expand Down Expand Up @@ -127,7 +139,7 @@ func (c *Client) Self() peer.ID {
return c.host.ID()
}

// Close resets and closes the network stream if one exists,
// Close resets and closes the network stream if one exists.
func (c *Client) Close() error {
c.sendLock.Lock()
defer c.sendLock.Unlock()
Expand All @@ -137,32 +149,32 @@ func (c *Client) Close() error {
}

if c.ownHost {
if err := c.host.Close(); err != nil {
return err
}
return c.host.Close()
}
return nil
}

// SendRequest sends out a request.
func (c *Client) SendRequest(ctx context.Context, msg proto.Message, decodeRsp DecodeResponseFunc) error {
// SendRequest sends out a request and reads a response.
func (c *Client) SendRequest(ctx context.Context, msg proto.Message) *Response {
c.sendLock.Lock()
defer c.sendLock.Unlock()

err := c.sendMessage(ctx, msg)
if err != nil {
return fmt.Errorf("cannot sent request: %w", err)
return &Response{
Err: fmt.Errorf("cannot sent request: %w", err),
}
}

if err = c.ctxReadMsg(ctx, decodeRsp); err != nil {
rsp := c.readResponse(ctx)
if rsp.Err != nil {
c.closeStream()
return fmt.Errorf("cannot read response: %w", err)
}

return nil
return rsp
}

// SendMessage sends out a message
// SendMessage sends out a message.
func (c *Client) SendMessage(ctx context.Context, msg proto.Message) error {
c.sendLock.Lock()
defer c.sendLock.Unlock()
Expand Down Expand Up @@ -207,30 +219,38 @@ func (c *Client) closeStream() {
c.r = nil
}

func (c *Client) ctxReadMsg(ctx context.Context, decodeRsp DecodeResponseFunc) error {
done := make(chan struct{})
var err error
go func(r msgio.ReadCloser) {
defer close(done)
var data []byte
data, err = r.ReadMsg()
defer r.ReleaseMsg(data)
func (c *Client) readResponse(ctx context.Context) *Response {
rspCh := make(chan *Response, 1)
go func(r msgio.ReadCloser, rsp chan<- *Response) {
data, err := r.ReadMsg()
if err != nil {
if data != nil {
r.ReleaseMsg(data)
}
rsp <- &Response{
Err: err,
}
return
}
err = decodeRsp(data)
}(c.r)
rsp <- &Response{
Data: data,
msgReader: r,
}
}(c.r, rspCh)

t := time.NewTimer(readMessageTimeout)
defer t.Stop()

select {
case <-done:
case response := <-rspCh:
return response
case <-ctx.Done():
return ctx.Err()
return &Response{
Err: ctx.Err(),
}
case <-t.C:
return ErrReadTimeout
return &Response{
Err: ErrReadTimeout,
}
}

return err
}
4 changes: 2 additions & 2 deletions internal/p2pclient/doc.go → p2pclient/doc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package p2pclient provides general libp2p client functionality
//
// This package supplies functionality to communicate raw data with libp2p
// peers, and is useful when building higher-level client that are responsible
// for processing the data send and and received using this package.
// peers. It is useful for building higher-level clients that process data
// which is sent and received by this package.
package p2pclient
File renamed without changes.
35 changes: 12 additions & 23 deletions internal/test/util.go → test/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,51 @@ import (
"fmt"
"math/rand"
"sync/atomic"
"testing"

"github.com/ipfs/go-cid"
"github.com/ipni/go-libipni/ingest/schema"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/test"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
)

var globalSeed atomic.Int64

func RandomAddrs(n int) []string {
rng := rand.New(rand.NewSource(globalSeed.Add(1)))

addrs := make([]string, n)
for i := 0; i < n; i++ {
addrs[i] = fmt.Sprintf("/ip4/%d.%d.%d.%d/tcp/%d", rng.Int()%255, rng.Int()%255, rng.Int()%255, rng.Int()%255, rng.Int()%10751)
addrs[i] = fmt.Sprintf("/ip4/%d.%d.%d.%d/tcp/%d", rng.Int()%255, rng.Intn(254)+1, rng.Intn(254)+1, rng.Intn(254)+1, rng.Intn(48157)+1024)
}
return addrs
}

func RandomCids(t testing.TB, n int) []cid.Cid {
func RandomCids(n int) []cid.Cid {
rng := rand.New(rand.NewSource(globalSeed.Add(1)))

prefix := schema.Linkproto.Prefix

cids := make([]cid.Cid, n)
for i := 0; i < n; i++ {
b := make([]byte, 10*n)
rng.Read(b)
c, err := prefix.Sum(b)
require.NoError(t, err)
if err != nil {
panic(err)
}
cids[i] = c
}
return cids
}

func RandomIdentity(t *testing.T) (peer.ID, crypto.PrivKey, crypto.PubKey) {
func RandomIdentity() (peer.ID, crypto.PrivKey, crypto.PubKey) {
privKey, pubKey, err := test.RandTestKeyPair(crypto.Ed25519, 256)
require.NoError(t, err)

if err != nil {
panic(err)
}
providerID, err := peer.IDFromPublicKey(pubKey)
require.NoError(t, err)
if err != nil {
panic(err)
}
return providerID, privKey, pubKey
}

Expand All @@ -75,13 +74,3 @@ func RandomMultihashes(n int) []multihash.Multihash {
}
return mhashes
}

func StringToMultiaddrs(t *testing.T, addrs []string) []multiaddr.Multiaddr {
mAddrs := make([]multiaddr.Multiaddr, len(addrs))
for i, addr := range addrs {
ma, err := multiaddr.NewMultiaddr(addr)
require.NoError(t, err)
mAddrs[i] = ma
}
return mAddrs
}
2 changes: 1 addition & 1 deletion version.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "v0.0.2"
"version": "v0.0.3"
}

0 comments on commit f35bb8a

Please sign in to comment.