Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for custom collectors #307

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
257 changes: 257 additions & 0 deletions apstra/api_custom_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
package apstra

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
)

const (
apiUrlCollectors = "/api/telemetry/collectors"
apiUrlCollectorsByServiceName = apiUrlCollectors + apiUrlPathDelim + "%s"
)

//var (
// _ json.Marshaler = (*Collector)(nil)
// _ json.Unmarshaler = (*Collector)(nil)
//)

type CollectorPlatform struct {
OsType CollectorOSType
OsVersion CollectorOSVersion
OsFamily []CollectorOSVariant
Model string
}

func (o *CollectorPlatform) UnmarshalJSON(data []byte) error {
var raw struct {
OsType string `json:"os_type"`
OsVersion string `json:"os_version"`
OsFamily string `json:"family"`
Model string `json:"model"`
}

err := json.Unmarshal(data, &raw)
if err != nil {
return err
}

err = o.OsType.FromString(raw.OsType)
if err != nil {
return err
}

err = o.OsVersion.FromString(raw.OsVersion)
if err != nil {
return err
}

o.Model = raw.Model

for _, v := range strings.Split(raw.OsFamily, ",") {
var variant CollectorOSVariant
err = variant.FromString(v)
if err != nil {
return err
}
o.OsFamily = append(o.OsFamily, variant)
}

return nil
}

func (o *CollectorPlatform) MarshalJSON() ([]byte, error) {
var raw struct {
OsType string `json:"os_type"`
OsVersion string `json:"os_version"`
OsFamily string `json:"family"`
Model string `json:"model"`
}
raw.OsType = o.OsType.String()
raw.OsVersion = o.OsVersion.String()
raw.Model = o.Model
raw.OsFamily = o.OsFamily[0].String()
for _, v := range o.OsFamily[1:] {
raw.OsFamily = raw.OsFamily + "," + v.String()
}
return json.Marshal(raw)
}

type Query struct {
rajagopalans marked this conversation as resolved.
Show resolved Hide resolved
Accessors map[string]string `json:"accessors"`
Keys map[string]string `json:"keys"`
Value string `json:"value"`
Filter string `json:"filter"`
}
type Collector struct {
ServiceName string
Platform CollectorPlatform `json:"platform"`
SourceType string `json:"source_type"`
Cli string `json:"cli"`
Query Query `json:"query"`
RelaxedSchemaValidation bool `json:"relaxed_schema_validation"`
}

// GetAllCollectors gets all the Collectors for all services
func (o *Client) GetAllCollectors(ctx context.Context) ([]Collector, error) {
var response struct {
Items map[string]interface{} `json:"items"`
}
var collectors []Collector
// First Reach out to /collectors , we are interested really only in the keys, so that we can pull the collectors
err := o.talkToApstra(ctx, &talkToApstraIn{
method: http.MethodGet,
urlStr: fmt.Sprintf(apiUrlCollectors),
apiResponse: &response,
})
if err != nil {
return nil, convertTtaeToAceWherePossible(err)
}

for k := range response.Items {
cs, err := o.GetCollectorsByServiceName(ctx, k)
if err != nil {
return nil, err
}
for _, v := range cs {
v.ServiceName = k
collectors = append(collectors, v)
}
}
return collectors, nil
}

// GetCollectorsByServiceName gets all the Collectors that correspond to a particular service
func (o *Client) GetCollectorsByServiceName(ctx context.Context, name string) ([]Collector, error) {
var ace ClientErr
var Response struct {
Items []Collector `json:"items"`
}
err := o.talkToApstra(ctx, &talkToApstraIn{
method: http.MethodGet,
urlStr: fmt.Sprintf(apiUrlCollectorsByServiceName, name),
apiResponse: &Response,
})

if err != nil {
err = convertTtaeToAceWherePossible(err)
if errors.As(err, &ace) && ace.Type() == ErrNotfound {
return Response.Items, nil
rajagopalans marked this conversation as resolved.
Show resolved Hide resolved
}
return nil, err
}

for i := range Response.Items {
Response.Items[i].ServiceName = name
}
return Response.Items, nil
}

// CreateCollector creates a collector
func (o *Client) CreateCollector(ctx context.Context, in *Collector) error {
// Check if this is the first collector for that service name
cs, err := o.GetCollectorsByServiceName(ctx, in.ServiceName)
rajagopalans marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
var Request struct {
ServiceName string `json:"service_name"`
Items []Collector `json:"collectors"`
}
Request.ServiceName = in.ServiceName
Request.Items = append(Request.Items, *in)
// This is the first collector for this service name
// So we POST
if len(cs) == 0 {
err = o.talkToApstra(ctx, &talkToApstraIn{
method: http.MethodPost,
urlStr: fmt.Sprintf(apiUrlCollectors),
apiInput: &Request,
})
return err
}

// There are other collectors, so this is a patch
err = o.talkToApstra(ctx, &talkToApstraIn{
method: http.MethodPatch,
urlStr: fmt.Sprintf(apiUrlCollectorsByServiceName, in.ServiceName),
apiInput: &Request,
})
return err
rajagopalans marked this conversation as resolved.
Show resolved Hide resolved
}

// Update Collector Updates a collector
rajagopalans marked this conversation as resolved.
Show resolved Hide resolved
func (o *Client) UpdateCollector(ctx context.Context, in *Collector) error {
var Request struct {
Items []Collector `json:"collectors"`
}
Request.Items = append(Request.Items, *in)
return o.talkToApstra(ctx, &talkToApstraIn{
method: http.MethodPatch,
urlStr: fmt.Sprintf(apiUrlCollectorsByServiceName, in.ServiceName),
apiInput: &Request,
})
}

// DeleteAllCollectorsInService deletes all the collectors under a service
func (o *Client) DeleteAllCollectorsInService(ctx context.Context, name string) error {
return o.talkToApstra(ctx, &talkToApstraIn{
rajagopalans marked this conversation as resolved.
Show resolved Hide resolved
method: http.MethodDelete,
urlStr: fmt.Sprintf(apiUrlCollectorsByServiceName, name),
})
}

func (p1 *CollectorPlatform) Equals(p2 *CollectorPlatform) bool {
if p1.OsType != p2.OsType {
return false
}
if p1.OsVersion != p2.OsVersion {
return false
}
if p1.Model != p2.Model {
return false
}
if len(p1.OsFamily) != len(p2.OsFamily) {
return false
}

m := make(map[CollectorOSVariant]bool, len(p1.OsFamily))
for _, v := range p1.OsFamily {
m[v] = true
}
for _, v := range p2.OsFamily {
_, ok := m[v]
if !ok {
return false
}
}
return true
}

// DeleteCollector deletes the collector described in the object
func (o *Client) DeleteCollector(ctx context.Context, in *Collector) error {
var Request struct {
ServiceName string `json:"service_name"`
Items []Collector `json:"collectors"`
}

cs, err := o.GetCollectorsByServiceName(ctx, in.ServiceName)
if err != nil {
return err
}

Request.ServiceName = in.ServiceName
for _, c := range cs {
if !c.Platform.Equals(&in.Platform) {
Request.Items = append(Request.Items, c)
}
}
return o.talkToApstra(ctx, &talkToApstraIn{
rajagopalans marked this conversation as resolved.
Show resolved Hide resolved
method: http.MethodPut,
urlStr: fmt.Sprintf(apiUrlCollectorsByServiceName, in.ServiceName),
apiInput: &Request,
})
}
141 changes: 141 additions & 0 deletions apstra/api_custom_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//go:build integration
// +build integration

package apstra

import (
"context"
"github.com/stretchr/testify/require"
"log"
"testing"
)

func TestCollector(t *testing.T) {
ctx := context.Background()

clients, err := getTestClients(ctx, t)
require.NoError(t, err)

for clientName, client := range clients {
log.Printf("Testing Custom Collectors against %s %s (%s)", client.clientType, clientName, client.client.ApiVersion())

ts, err := client.client.GetAllTelemetryServiceRegistryEntries(ctx)
for _, tsr := range ts {
c, err := client.client.GetCollectorsByServiceName(ctx, tsr.ServiceName)
if err != nil {
t.Fatalf(err.Error())
}
for _, d := range c {
log.Printf("%v", d)
}
}

name := randString(10, "hex")
schema := `{
"properties": {
"key": {
"properties": {
"schemakey1": {
"type": "string"
}
},
"required": [
"schemakey1"
],
"type": "object"
},
"value": {
"type": "string"
}
},
"required": [
"key",
"value"
],
"type": "object"
}`

entry := TelemetryServiceRegistryEntry{
ServiceName: name,
StorageSchemaPath: StorageSchemaPathIBA_STRING_DATA,
ApplicationSchema: []byte(schema),
Builtin: false,
Description: "Test Service %s",
}
ServiceName, err := client.client.CreateTelemetryServiceRegistryEntry(ctx, &entry)
log.Printf("Service Name %s Created ", ServiceName)
require.NoError(t, err)
cs, err := client.client.GetCollectorsByServiceName(ctx, name)
require.NoError(t, err)
if len(cs) != 0 {
log.Println("There should be no collectors, this is a new service")
}

c1 := Collector{
ServiceName: name,
Platform: CollectorPlatform{
OsType: CollectorOSTypeJunosEvo,
OsVersion: CollectorOSVersion22_2r2,
OsFamily: []CollectorOSVariant{CollectorOSVariantACX},
Model: "",
},
SourceType: "cli",
Cli: "cli show interfaces extensive",
Query: Query{
Accessors: map[string]string{"telemetrykey1": "/interface-information/docsis-information/docsis-media-properties/downstream-buffers-free"},
Keys: map[string]string{"schemakey1": "telemetrykey1"},
Value: "telemetrykey1",
Filter: "",
},
RelaxedSchemaValidation: true,
}

err = client.client.CreateCollector(ctx, &c1)
require.NoError(t, err)

cs, err = client.client.GetCollectorsByServiceName(ctx, name)
require.NoError(t, err)
if len(cs) != 1 {
log.Printf("There should be one collector, got %d", len(cs))
}

c1.Platform.OsFamily = []CollectorOSVariant{CollectorOSVariantACX_F, CollectorOSVariantJunos}
err = client.client.CreateCollector(ctx, &c1)
require.NoError(t, err)
cs, err = client.client.GetCollectorsByServiceName(ctx, name)
require.NoError(t, err)
if len(cs) != 2 {
log.Printf("There should be two collectors, got %d", len(cs))
}

c1.Query.Accessors["telemetrykey1"] = "/interface-information/docsis-information/docsis-media-properties/downstream-buffers-used"
err = client.client.UpdateCollector(ctx, &c1)
require.NoError(t, err)
cs, err = client.client.GetCollectorsByServiceName(ctx, name)
require.NoError(t, err)
if len(cs) != 2 {
log.Printf("There should be two collectors, got %d", len(cs))
}

err = client.client.DeleteCollector(ctx, &c1)
require.NoError(t, err)
cs, err = client.client.GetCollectorsByServiceName(ctx, name)
require.NoError(t, err)
if len(cs) != 1 {
log.Printf("There should be one collector, got %d", len(cs))
}

err = client.client.DeleteAllCollectorsInService(ctx, name)
require.NoError(t, err)

cs, err = client.client.GetCollectorsByServiceName(ctx, name)
require.NoError(t, err)
if len(cs) != 0 {
log.Println("There should be no collectors, this is a new service")
}

err = client.client.DeleteTelemetryServiceRegistryEntry(ctx, name)
require.NoError(t, err)

}
}
Loading
Loading