Skip to content

Commit

Permalink
Bleve server (#186)
Browse files Browse the repository at this point in the history
  • Loading branch information
timburks authored Apr 20, 2023
1 parent a275c34 commit 7cecddb
Show file tree
Hide file tree
Showing 7 changed files with 410 additions and 42 deletions.
35 changes: 35 additions & 0 deletions cmd/registry-experimental/cmd/bleve/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ This directory contains an experimental search implementation built with
https://blevesearch.com. Specs are indexed as full-text blobs and queried with
the Bleve default queries.

Note that all calls below require that `registry-experimental` be configured to
use a `registry-server` instance.

Index specs with the following, where PATTERN should match one or more specs:

```
Expand All @@ -18,3 +21,35 @@ Search the index with the following:
```
registry-experimental bleve search QUERY
```

Indexing and search are also available with a simple REST API that is provided
by `bleve serve`.

First run `registry-experimental bleve serve`. While it is running,
specs can be indexed and searched as follows:

Specs can be indexed by posting JSON to the `/index` endpoint:

```
curl http://localhost:8888/index \
-X POST \
-H "Content-Type: application/json" \
-d @- \
<<EOF
{
  "pattern": "projects/${PROJECT_ID}/locations/global/apis/-/versions/-/specs/-",
  "filter": "mime_type.contains('openapi')"
}
EOF
```

Note that the `filter` value is optional.

Specs can be searched with `/search`:

```
curl "http://localhost:8888/search?q=domain"
```

This searches for specs containing the word "domain".
1 change: 1 addition & 0 deletions cmd/registry-experimental/cmd/bleve/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func Command() *cobra.Command {
}
cmd.AddCommand(indexCommand())
cmd.AddCommand(searchCommand())
cmd.AddCommand(serveCommand())
cmd.PersistentFlags().StringVar(&bleveDir, "bleve", "registry.bleve", "path to local bleve search index")
return cmd
}
102 changes: 98 additions & 4 deletions cmd/registry-experimental/cmd/bleve/bleve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"github.com/apigee/registry/cmd/registry/compress"
"github.com/apigee/registry/pkg/connection/grpctest"
Expand All @@ -33,10 +39,7 @@ func TestMain(m *testing.M) {
grpctest.TestMain(m, registry.Config{})
}

func TestSearch(t *testing.T) {
ctx := context.Background()
blevePath := filepath.Join(t.TempDir(), "registry.bleve")

func buildTestRegistry(ctx context.Context, t *testing.T) {
bookstore_protos, err := compress.ZipArchiveOfPath("testdata/examples", "testdata/", true)
if err != nil {
t.Fatalf("Failed to initialize search test: %s", err)
Expand Down Expand Up @@ -80,6 +83,14 @@ func TestSearch(t *testing.T) {
Contents: []byte("Hello, this is an http API."),
},
})
}

func TestSearch(t *testing.T) {
var err error
ctx := context.Background()
blevePath := filepath.Join(t.TempDir(), "registry.bleve")

buildTestRegistry(ctx, t)

cmd := Command()
cmd.SetArgs([]string{"index", "projects/search-test/locations/global/apis/-/versions/-/specs/-", "--bleve", blevePath})
Expand Down Expand Up @@ -181,3 +192,86 @@ func TestSearch(t *testing.T) {
}
})
}

// TestServer exercises the REST API provided by the "serve" subcommand: it
// starts the server in the background, indexes the test registry by posting
// to /index, and then checks the hit count returned by /search.
func TestServer(t *testing.T) {
	ctx := context.Background()
	blevePath := filepath.Join(t.TempDir(), "registry.bleve")
	port := "8891"
	baseURL := "http://localhost:" + port

	buildTestRegistry(ctx, t)

	// Start the server. Nothing in this test shuts it down, so it keeps
	// running until the test process exits.
	go func() {
		args := []string{"serve", "--bleve", blevePath, "--port", port}
		cmd := Command()
		cmd.SetArgs(args)
		if err := cmd.Execute(); err != nil {
			// t.Fatalf must only be called from the goroutine running the
			// test, so a server failure aborts the process through log.
			log.Fatalf("Execute() with args %+v returned error: %s", args, err)
		}
	}()

	// Wait for the server to start. A dial is refused immediately while the
	// listener is not up yet, so poll until it succeeds or the deadline passes.
	deadline := time.Now().Add(10 * time.Second)
	for {
		conn, err := net.DialTimeout("tcp", "localhost:"+port, 2*time.Second)
		if err == nil {
			// The connection was only a readiness probe.
			_ = conn.Close()
			break
		}
		if time.Now().After(deadline) {
			t.Fatalf("Failed to connect to test server: %s", err)
		}
		time.Sleep(50 * time.Millisecond)
	}

	// One client with a timeout is shared by both subtests so that a hung
	// server fails the test instead of blocking it forever.
	client := &http.Client{Timeout: 30 * time.Second}

	// Call the indexing API.
	t.Run("server-indexing", func(t *testing.T) {
		postBody, err := json.Marshal(struct {
			Pattern string `json:"pattern"`
			Filter  string `json:"filter"`
		}{
			Pattern: "projects/search-test/locations/global/apis/-/versions/-/specs/-",
			Filter:  "mime_type.contains('openapi')",
		})
		if err != nil {
			t.Fatalf("failed to marshal request body %s", err)
		}
		request, err := http.NewRequest(http.MethodPost, baseURL+"/index", bytes.NewBuffer(postBody))
		if err != nil {
			t.Fatalf("failed to create request %s", err)
		}
		request.Header.Set("Content-Type", "application/json; charset=UTF-8")
		response, err := client.Do(request)
		if err != nil {
			t.Fatalf("failed to call index API %s", err)
		}
		defer response.Body.Close()
		if response.StatusCode != http.StatusOK {
			t.Fatalf("unexpected code from index API %d", response.StatusCode)
		}
		fmt.Println("response Headers:", response.Header)
	})

	// Call the search API.
	t.Run("server-search", func(t *testing.T) {
		request, err := http.NewRequest(http.MethodGet, baseURL+"/search?q=pet", nil)
		if err != nil {
			t.Fatalf("failed to create request %s", err)
		}
		response, err := client.Do(request)
		if err != nil {
			t.Fatalf("failed to call search API %s", err)
		}
		defer response.Body.Close()
		if response.StatusCode != http.StatusOK {
			t.Fatalf("unexpected code from search API %d", response.StatusCode)
		}
		fmt.Println("response Headers:", response.Header)
		body, err := ioutil.ReadAll(response.Body)
		if err != nil {
			t.Fatalf("failed to read search API response %s", err)
		}
		// Only the hit count is checked; other response fields are ignored.
		var searchResponse struct {
			TotalHits int `json:"total_hits"`
		}
		if err = json.Unmarshal(body, &searchResponse); err != nil {
			t.Fatalf("failed to unmarshal search API response %s", err)
		}
		if searchResponse.TotalHits != 1 {
			t.Fatalf("failed to get expected number of hits (1), got %d", searchResponse.TotalHits)
		}
	})
}
126 changes: 101 additions & 25 deletions cmd/registry-experimental/cmd/bleve/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"github.com/apigee/registry/pkg/connection"
"github.com/apigee/registry/pkg/log"
"github.com/apigee/registry/pkg/mime"
"github.com/apigee/registry/pkg/names"
"github.com/apigee/registry/pkg/visitor"
"github.com/apigee/registry/rpc"
"github.com/blevesearch/bleve"
"github.com/spf13/cobra"
"google.golang.org/protobuf/proto"
)

var bleveMutex sync.Mutex
Expand All @@ -43,7 +43,7 @@ func indexCommand() *cobra.Command {
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
client, err := connection.NewRegistryClient(ctx)
registryClient, err := connection.NewRegistryClient(ctx)
if err != nil {
return fmt.Errorf("failed to get client: %s", err)
}
Expand All @@ -55,41 +55,60 @@ func indexCommand() *cobra.Command {
// Initialize task queue.
taskQueue, wait := tasks.WorkerPoolIgnoreError(ctx, jobs)
defer wait()
// Generate tasks.
if spec, err := names.ParseSpec(pattern); err == nil {
err = visitor.ListSpecs(ctx, client, spec, filter, false, func(ctx context.Context, spec *rpc.ApiSpec) error {
taskQueue <- &indexSpecTask{
client: client,
specName: spec.Name,
}
return nil
})
if err != nil {
return fmt.Errorf("failed to list specs: %s", err)
}
} else {
return fmt.Errorf("unsupported pattern: %s", pattern)
v := &indexVisitor{
taskQueue: taskQueue,
registryClient: registryClient,
}
return nil
return visitor.Visit(ctx, v, visitor.VisitorOptions{
RegistryClient: registryClient,
Pattern: pattern,
Filter: filter,
})
},
}
cmd.Flags().StringVar(&filter, "filter", "", "filter selected resources")
cmd.Flags().IntVarP(&jobs, "jobs", "j", 10, "number of actions to perform concurrently")
return cmd
}

// indexVisitor is the visitor passed to visitor.Visit by the index command.
// Its handlers turn each visited spec or artifact into an indexing task.
type indexVisitor struct {
	// NOTE(review): presumably supplies default handlers for the resource
	// kinds this visitor does not override — confirm against pkg/visitor.
	visitor.Unsupported
	// taskQueue is the send-only channel on which handlers queue tasks.
	taskQueue chan<- tasks.Task
	// registryClient is handed to every queued task as its client.
	registryClient connection.RegistryClient
}

// SpecHandler returns the callback used for specs: each visited spec is
// wrapped in an indexSpecTask and sent to the visitor's task queue.
func (v *indexVisitor) SpecHandler() visitor.SpecHandler {
	enqueue := func(_ context.Context, spec *rpc.ApiSpec) error {
		task := &indexSpecTask{
			client: v.registryClient,
			name:   spec.Name,
		}
		v.taskQueue <- task
		return nil
	}
	return enqueue
}

// ArtifactHandler returns the callback used for artifacts: each visited
// artifact is wrapped in an indexArtifactTask and sent to the task queue.
func (v *indexVisitor) ArtifactHandler() visitor.ArtifactHandler {
	enqueue := func(_ context.Context, artifact *rpc.Artifact) error {
		task := &indexArtifactTask{
			client: v.registryClient,
			name:   artifact.Name,
		}
		v.taskQueue <- task
		return nil
	}
	return enqueue
}

type indexSpecTask struct {
client connection.RegistryClient
specName string
client connection.RegistryClient
name string
}

func (task *indexSpecTask) String() string {
return "index " + task.specName
return "index " + task.name
}

func (task *indexSpecTask) Run(ctx context.Context) error {
request := &rpc.GetApiSpecRequest{
Name: task.specName,
Name: task.name,
}
spec, err := task.client.GetApiSpec(ctx, request)
if err != nil {
Expand All @@ -107,7 +126,7 @@ func (task *indexSpecTask) Run(ctx context.Context) error {
mime.IsOpenAPIv3(spec.GetMimeType()) ||
mime.IsDiscovery(spec.GetMimeType()):
message = map[string]string{spec.GetFilename(): string(data)}
case mime.IsProto(spec.GetMimeType()):
case mime.IsZipArchive(spec.GetMimeType()):
m, err := compress.UnzipArchiveToMap(data)
if err != nil {
return err
Expand All @@ -119,9 +138,66 @@ func (task *indexSpecTask) Run(ctx context.Context) error {
}
message = m2
default:
return fmt.Errorf("unable to generate descriptor for style %s", spec.GetMimeType())
return fmt.Errorf("unable to index style %s", spec.GetMimeType())
}

// The bleve index requires serialized updates.
bleveMutex.Lock()
defer bleveMutex.Unlock()
// Open the index, creating a new one if necessary.
index, err := bleve.Open(bleveDir)
if err != nil {
mapping := bleve.NewIndexMapping()
index, err = bleve.New(bleveDir, mapping)
if err != nil {
return err
}
}
defer index.Close()
// Index the spec.
log.Infof(ctx, "Indexing %s", task.name)
return index.Index(task.name, message)
}

// indexArtifactTask is a queued unit of work that fetches one artifact from
// the registry and adds it to the bleve index.
type indexArtifactTask struct {
	// client is the registry client used to fetch the artifact.
	client connection.RegistryClient
	// name is the artifact's resource name; it is also used as the index key.
	name string
}

// String describes the task as "index " followed by the artifact name.
func (task *indexArtifactTask) String() string {
	const prefix = "index "
	return prefix + task.name
}

func (task *indexArtifactTask) Run(ctx context.Context) error {
request := &rpc.GetArtifactRequest{
Name: task.name,
}
artifact, err := task.client.GetArtifact(ctx, request)
if err != nil {
return err
}
messageType, err := mime.MessageTypeForMimeType(artifact.GetMimeType())
if err != nil {
return err
}
switch messageType {
case "google.cloud.apigeeregistry.v1.apihub.FieldSet":
// supported
default:
return fmt.Errorf("unable to index type %s", messageType)
}
err = visitor.FetchArtifactContents(ctx, task.client, artifact)
if err != nil {
return nil
}
message, err := mime.MessageForMimeType(artifact.GetMimeType())
if err != nil {
return nil
}
err = proto.Unmarshal(artifact.GetContents(), message)
if err != nil {
return nil
}
// The bleve index requires serialized updates.
bleveMutex.Lock()
defer bleveMutex.Unlock()
Expand All @@ -136,6 +212,6 @@ func (task *indexSpecTask) Run(ctx context.Context) error {
}
defer index.Close()
// Index the spec.
log.Debugf(ctx, "Indexing %s", task.specName)
return index.Index(task.specName, message)
log.Infof(ctx, "Indexing %s", task.name)
return index.Index(task.name, message)
}
Loading

0 comments on commit 7cecddb

Please sign in to comment.