From e7c0c807bc1f0de1c5d47c61f2f1e31112507e87 Mon Sep 17 00:00:00 2001 From: gammazero Date: Fri, 31 Mar 2023 14:06:44 -0700 Subject: [PATCH 1/2] No internal packages All package in this repo are potentially useful by others and should be shared. - Expose test package. Has functionality commonly used in ipni testing. - Expose p2pclient package. Useful for creating p2p clients in addition to the ipni find client in this repo. Update version for newly available packages. --- dhash/dhash_test.go | 2 +- find/client/p2p/client.go | 13 ++- find/model/model_test.go | 2 +- ingest/model/ingest_request_test.go | 2 +- ingest/schema/envelope_test.go | 2 +- ingest/schema/types_test.go | 2 +- metadata/graphsync_filecoinv1_test.go | 4 +- metadata/metadata_test.go | 4 +- {internal/p2pclient => p2pclient}/client.go | 94 +++++++++++-------- {internal/p2pclient => p2pclient}/doc.go | 4 +- .../p2pclient => p2pclient}/message_writer.go | 0 {internal/test => test}/util.go | 35 +++---- version.json | 2 +- 13 files changed, 89 insertions(+), 77 deletions(-) rename {internal/p2pclient => p2pclient}/client.go (70%) rename {internal/p2pclient => p2pclient}/doc.go (52%) rename {internal/p2pclient => p2pclient}/message_writer.go (100%) rename {internal/test => test}/util.go (69%) diff --git a/dhash/dhash_test.go b/dhash/dhash_test.go index 80cd6f4..4448fac 100644 --- a/dhash/dhash_test.go +++ b/dhash/dhash_test.go @@ -6,7 +6,7 @@ import ( "math/rand" "testing" - "github.com/ipni/go-libipni/internal/test" + "github.com/ipni/go-libipni/test" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" ) diff --git a/find/client/p2p/client.go b/find/client/p2p/client.go index fd897a5..75384c3 100644 --- a/find/client/p2p/client.go +++ b/find/client/p2p/client.go @@ -9,7 +9,7 @@ import ( "github.com/ipni/go-libipni/find/client" "github.com/ipni/go-libipni/find/model" pb "github.com/ipni/go-libipni/find/pb" - "github.com/ipni/go-libipni/internal/p2pclient" + "github.com/ipni/go-libipni/p2pclient" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" @@ -128,12 +128,15 @@ func (c *Client) GetStats(ctx context.Context) (*model.Stats, error) { } func (c *Client) sendRecv(ctx context.Context, req *pb.FindMessage, expectRspType pb.FindMessage_MessageType) ([]byte, error) { + r := c.p2pc.SendRequest(ctx, req) + if r.Err != nil { + return nil, r.Err + } var resp pb.FindMessage - err := c.p2pc.SendRequest(ctx, req, func(data []byte) error { - return proto.Unmarshal(data, &resp) - }) + err := proto.Unmarshal(r.Data, &resp) + r.Close() if err != nil { - return nil, fmt.Errorf("failed to send request to indexer: %s", err) + return nil, err } if resp.GetType() != expectRspType { if resp.GetType() == pb.FindMessage_ERROR_RESPONSE { diff --git a/find/model/model_test.go b/find/model/model_test.go index a1a6513..87e1df8 100644 --- a/find/model/model_test.go +++ b/find/model/model_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/ipni/go-libipni/find/model" - "github.com/ipni/go-libipni/internal/test" + "github.com/ipni/go-libipni/test" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" diff --git a/ingest/model/ingest_request_test.go b/ingest/model/ingest_request_test.go index 6a6d1bc..63047b3 100644 --- a/ingest/model/ingest_request_test.go +++ b/ingest/model/ingest_request_test.go @@ -4,7 +4,7 @@ import ( "bytes" "testing" - "github.com/ipni/go-libipni/internal/test" + "github.com/ipni/go-libipni/test" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" p2ptest "github.com/libp2p/go-libp2p/core/test" diff --git a/ingest/schema/envelope_test.go b/ingest/schema/envelope_test.go index e6a6696..8e24e1f 100644 --- a/ingest/schema/envelope_test.go +++ b/ingest/schema/envelope_test.go @@ -11,7 +11,7 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/storage/memstore" stischema "github.com/ipni/go-libipni/ingest/schema" - "github.com/ipni/go-libipni/internal/test" + "github.com/ipni/go-libipni/test" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" p2ptest "github.com/libp2p/go-libp2p/core/test" diff --git a/ingest/schema/types_test.go b/ingest/schema/types_test.go index 5c5f1d8..aa4a7d4 100644 --- a/ingest/schema/types_test.go +++ b/ingest/schema/types_test.go @@ -14,7 +14,7 @@ import ( ipldSchema "github.com/ipld/go-ipld-prime/schema" "github.com/ipld/go-ipld-prime/storage/memstore" stischema "github.com/ipni/go-libipni/ingest/schema" - "github.com/ipni/go-libipni/internal/test" + "github.com/ipni/go-libipni/test" "github.com/multiformats/go-multicodec" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" diff --git a/metadata/graphsync_filecoinv1_test.go b/metadata/graphsync_filecoinv1_test.go index 930d039..ba4b7f7 100644 --- a/metadata/graphsync_filecoinv1_test.go +++ b/metadata/graphsync_filecoinv1_test.go @@ -3,15 +3,15 @@ package metadata_test import ( "testing" - "github.com/ipni/go-libipni/internal/test" "github.com/ipni/go-libipni/metadata" + "github.com/ipni/go-libipni/test" "github.com/multiformats/go-multicodec" "github.com/multiformats/go-varint" "github.com/stretchr/testify/require" ) func TestRoundTripDataTransferFilecoin(t *testing.T) { - cids := test.RandomCids(t, 4) + cids := test.RandomCids(4) filecoinV1Datas := []*metadata.GraphsyncFilecoinV1{ { PieceCID: cids[0], diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 32f3b90..2d03ece 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -4,15 +4,15 @@ import ( "math/rand" "testing" - "github.com/ipni/go-libipni/internal/test" "github.com/ipni/go-libipni/metadata" + "github.com/ipni/go-libipni/test" "github.com/multiformats/go-multicodec" "github.com/multiformats/go-varint" "github.com/stretchr/testify/require" ) func TestMetadata(t *testing.T) { - cids := test.RandomCids(t, 4) + cids := test.RandomCids(4) tests := []struct { name string givenTransports []metadata.Protocol diff --git a/internal/p2pclient/client.go b/p2pclient/client.go similarity index 70% rename from internal/p2pclient/client.go rename to p2pclient/client.go index 6390345..61f2ff4 100644 --- a/internal/p2pclient/client.go +++ b/p2pclient/client.go @@ -20,7 +20,7 @@ import ( ) // Client is responsible for sending requests and receiving responses to and -// from libp2p peers. Each instance of Client communicates with a single peer +// from libp2p peers. Each instance of Client communicates with a single peer // using a single protocolID. type Client struct { host host.Host @@ -32,25 +32,37 @@ type Client struct { sendLock sync.Mutex } -// DecodeResponseFunc is a function that is passed into this generic libp2p -// Client to decode a response message. This is needed because the generic -// client cannot decode the response message since the message is of a type -// only know to a specific libp2p client using this generic client. -type DecodeResponseFunc func([]byte) error +// Response is returned by SendRequest and contains the response to the +// request. It is the caller's responsibility to call Response.Close() after +// reading the data, to free the message buffer. +type Response struct { + Data []byte + Err error + msgReader msgio.Reader +} + +// Close frees the message buffer that holds the response data. +func (r *Response) Close() { + if r.Data != nil { + r.msgReader.ReleaseMsg(r.Data) + r.Data = nil + r.msgReader = nil + } +} const ( - // default port for libp2p client to connect to + // default IPNI port for libp2p client to connect to defaultLibp2pPort = 3003 // Timeout to wait for a response after a request is sent readMessageTimeout = 10 * time.Second ) // ErrReadTimeout is an error that occurs when no message is read within the -// timeout period +// timeout period. var ErrReadTimeout = fmt.Errorf("timed out reading response") -// New creates a new libp2pclient Client that communicates with a specific peer identified by -// protocolID. If host is nil, then one is created. +// New creates a new Client that communicates with a specific peer identified +// by protocolID. If host is nil, then one is created. func New(p2pHost host.Host, peerID peer.ID, protoID protocol.ID) (*Client, error) { // If no host was given, create one. var ownHost bool @@ -73,7 +85,7 @@ func New(p2pHost host.Host, peerID peer.ID, protoID protocol.ID) (*Client, error } // Connect connects the client to the host at the location specified by -// hostname. The value of hostname is a host or host:port, where the host is a +// hostname. The value of hostname is a host or host:port, where the host is a // hostname or IP address. func (c *Client) Connect(ctx context.Context, hostname string) error { port := defaultLibp2pPort @@ -127,7 +139,7 @@ func (c *Client) Self() peer.ID { return c.host.ID() } -// Close resets and closes the network stream if one exists, +// Close resets and closes the network stream if one exists. func (c *Client) Close() error { c.sendLock.Lock() defer c.sendLock.Unlock() @@ -137,32 +149,32 @@ func (c *Client) Close() error { } if c.ownHost { - if err := c.host.Close(); err != nil { - return err - } + return c.host.Close() } return nil } -// SendRequest sends out a request. -func (c *Client) SendRequest(ctx context.Context, msg proto.Message, decodeRsp DecodeResponseFunc) error { +// SendRequest sends out a request and reads a response. +func (c *Client) SendRequest(ctx context.Context, msg proto.Message) *Response { c.sendLock.Lock() defer c.sendLock.Unlock() err := c.sendMessage(ctx, msg) if err != nil { - return fmt.Errorf("cannot sent request: %w", err) + return &Response{ + Err: fmt.Errorf("cannot sent request: %w", err), + } } - if err = c.ctxReadMsg(ctx, decodeRsp); err != nil { + rsp := c.readResponse(ctx) + if rsp.Err != nil { c.closeStream() - return fmt.Errorf("cannot read response: %w", err) } - return nil + return rsp } -// SendMessage sends out a message +// SendMessage sends out a message. func (c *Client) SendMessage(ctx context.Context, msg proto.Message) error { c.sendLock.Lock() defer c.sendLock.Unlock() @@ -207,30 +219,38 @@ func (c *Client) closeStream() { c.r = nil } -func (c *Client) ctxReadMsg(ctx context.Context, decodeRsp DecodeResponseFunc) error { - done := make(chan struct{}) - var err error - go func(r msgio.ReadCloser) { - defer close(done) - var data []byte - data, err = r.ReadMsg() - defer r.ReleaseMsg(data) +func (c *Client) readResponse(ctx context.Context) *Response { + rspCh := make(chan *Response, 1) + go func(r msgio.ReadCloser, rsp chan<- *Response) { + data, err := r.ReadMsg() if err != nil { + if data != nil { + r.ReleaseMsg(data) + } + rsp <- &Response{ + Err: err, + } return } - err = decodeRsp(data) - }(c.r) + rsp <- &Response{ + Data: data, + msgReader: r, + } + }(c.r, rspCh) t := time.NewTimer(readMessageTimeout) defer t.Stop() select { - case <-done: + case response := <-rspCh: + return response case <-ctx.Done(): - return ctx.Err() + return &Response{ + Err: ctx.Err(), + } case <-t.C: - return ErrReadTimeout + return &Response{ + Err: ErrReadTimeout, + } } - - return err } diff --git a/internal/p2pclient/doc.go b/p2pclient/doc.go similarity index 52% rename from internal/p2pclient/doc.go rename to p2pclient/doc.go index f076add..0838592 100644 --- a/internal/p2pclient/doc.go +++ b/p2pclient/doc.go @@ -1,6 +1,6 @@ // Package p2pclient provides general libp2p client functionality // // This package supplies functionality to communicate raw data with libp2p -// peers, and is useful when building higher-level client that are responsible -// for processing the data send and and received using this package. +// peers. It is useful for building higher-level clients that process data +// which is sent and received by this package. package p2pclient diff --git a/internal/p2pclient/message_writer.go b/p2pclient/message_writer.go similarity index 100% rename from internal/p2pclient/message_writer.go rename to p2pclient/message_writer.go diff --git a/internal/test/util.go b/test/util.go similarity index 69% rename from internal/test/util.go rename to test/util.go index d44de7b..15324ec 100644 --- a/internal/test/util.go +++ b/test/util.go @@ -4,52 +4,51 @@ import ( "fmt" "math/rand" "sync/atomic" - "testing" "github.com/ipfs/go-cid" "github.com/ipni/go-libipni/ingest/schema" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/test" - "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" - "github.com/stretchr/testify/require" ) var globalSeed atomic.Int64 func RandomAddrs(n int) []string { rng := rand.New(rand.NewSource(globalSeed.Add(1))) - addrs := make([]string, n) for i := 0; i < n; i++ { - addrs[i] = fmt.Sprintf("/ip4/%d.%d.%d.%d/tcp/%d", rng.Int()%255, rng.Int()%255, rng.Int()%255, rng.Int()%255, rng.Int()%10751) + addrs[i] = fmt.Sprintf("/ip4/%d.%d.%d.%d/tcp/%d", rng.Int()%255, rng.Intn(254)+1, rng.Intn(254)+1, rng.Intn(254)+1, rng.Intn(48157)+1024) } return addrs } -func RandomCids(t testing.TB, n int) []cid.Cid { +func RandomCids(n int) []cid.Cid { rng := rand.New(rand.NewSource(globalSeed.Add(1))) - prefix := schema.Linkproto.Prefix - cids := make([]cid.Cid, n) for i := 0; i < n; i++ { b := make([]byte, 10*n) rng.Read(b) c, err := prefix.Sum(b) - require.NoError(t, err) + if err != nil { + panic(err) + } cids[i] = c } return cids } -func RandomIdentity(t *testing.T) (peer.ID, crypto.PrivKey, crypto.PubKey) { +func RandomIdentity() (peer.ID, crypto.PrivKey, crypto.PubKey) { privKey, pubKey, err := test.RandTestKeyPair(crypto.Ed25519, 256) - require.NoError(t, err) - + if err != nil { + panic(err) + } providerID, err := peer.IDFromPublicKey(pubKey) - require.NoError(t, err) + if err != nil { + panic(err) + } return providerID, privKey, pubKey } @@ -75,13 +74,3 @@ func RandomMultihashes(n int) []multihash.Multihash { } return mhashes } - -func StringToMultiaddrs(t *testing.T, addrs []string) []multiaddr.Multiaddr { - mAddrs := make([]multiaddr.Multiaddr, len(addrs)) - for i, addr := range addrs { - ma, err := multiaddr.NewMultiaddr(addr) - require.NoError(t, err) - mAddrs[i] = ma - } - return mAddrs -} diff --git a/version.json b/version.json index fd1e38a..4f0adde 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.0.2" + "version": "v0.0.3" } From 52c16ce876202afe50745430f3144ff787b0c37c Mon Sep 17 00:00:00 2001 From: gammazero Date: Fri, 31 Mar 2023 14:17:16 -0700 Subject: [PATCH 2/2] Only require go.1.19 --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 4a8ce0e..f042b52 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ipni/go-libipni -go 1.20 +go 1.19 require ( github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5