feat(logs): add tag attribute autocomplete for logs (SigNoz#2404)
* chore: add payload types for autocomplete requests

* chore: update the query params file location and payload

* chore: add query range v3 API request/response payload types

* feat: metric attribute autocomplete for the aggregation type

* feat: add attrs filters autocomplete endpoints

* feat(logs): add tag attribute autocomplete for logs

* chore: add support for multiple data types in the value suggestion API for attributes

* feat: int64/float64 added for AttributeKeyDataType along with validation

* feat: filterAttributeValueResponse type updated

* fix: number type updated and query updated

* feat: remove tagType in keys autocomplete

* feat: return isColumn value correctly for attribute keys

* Update pkg/query-service/app/clickhouseReader/reader.go

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>

* fix: don't skip empty strings in value autocomplete

* fix: allow empty string search

* feat: add top-level column names of logs in key suggestion

* fix: tagType column removed

* feat: get attribute values from logs table for top level fields

* feat: don't throw an error if dataType and tagType are not present

* feat: time range selection corrected

* feat: autocomplete for int/float added

* fix: reverted attributeValueResponse change

* fix: null values handled for int and float

* feat: add support for getting log aggregate attributes

* feat: aggregate attribute logic updated and body added in keys autocomplete

* fix: constants updated

* fix: body type updated and empty response for noop and count

* fix: isColumn logic updated

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
Co-authored-by: Vishal Sharma <makeavish786@gmail.com>
Co-authored-by: palashgdev <palashgdev@gmail.com>
4 people committed Apr 6, 2023
1 parent 5f73a82 commit d092905
Showing 7 changed files with 340 additions and 16 deletions.
3 changes: 3 additions & 0 deletions pkg/query-service/app/clickhouseReader/options.go
@@ -33,6 +33,7 @@ const (
defaultLogsLocalTable string = "logs"
defaultLogAttributeKeysTable string = "distributed_logs_attribute_keys"
defaultLogResourceKeysTable string = "distributed_logs_resource_keys"
defaultLogTagAttributeTable string = "distributed_tag_attributes"
defaultLiveTailRefreshSeconds int = 10
defaultWriteBatchDelay time.Duration = 5 * time.Second
defaultWriteBatchSize int = 10000
@@ -69,6 +70,7 @@ type namespaceConfig struct {
LogsLocalTable string
LogsAttributeKeysTable string
LogsResourceKeysTable string
LogsTagAttributeTable string
LiveTailRefreshSeconds int
WriteBatchDelay time.Duration
WriteBatchSize int
@@ -137,6 +139,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s
LogsLocalTable: defaultLogsLocalTable,
LogsAttributeKeysTable: defaultLogAttributeKeysTable,
LogsResourceKeysTable: defaultLogResourceKeysTable,
LogsTagAttributeTable: defaultLogTagAttributeTable,
LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds,
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
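For context, the new table name follows the same plumbing as the existing log tables: a package-level default seeds the per-namespace config, and NewReader later copies it onto the reader. A minimal, self-contained sketch of that defaulting pattern, trimmed to the one field this diff adds (the real structs carry every table name):

package main

import "fmt"

// Package-level default, mirroring options.go above.
const defaultLogTagAttributeTable = "distributed_tag_attributes"

// Trimmed stand-in for namespaceConfig.
type namespaceConfig struct {
	LogsTagAttributeTable string
}

func main() {
	// NewOptions seeds each namespace with the defaults; NewReader then copies
	// the value onto ClickHouseReader.logsTagAttributeTable (see next file).
	cfg := namespaceConfig{LogsTagAttributeTable: defaultLogTagAttributeTable}
	fmt.Println(cfg.LogsTagAttributeTable) // distributed_tag_attributes
}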
248 changes: 246 additions & 2 deletions pkg/query-service/app/clickhouseReader/reader.go
@@ -3,6 +3,7 @@ package clickhouseReader
import (
"bytes"
"context"
"database/sql"
"encoding/json"

"fmt"
@@ -101,6 +102,7 @@ type ClickHouseReader struct {
logsLocalTable string
logsAttributeKeys string
logsResourceKeys string
logsTagAttributeTable string
queryEngine *promql.Engine
remoteStorage *remote.Storage
fanoutStorage *storage.Storage
@@ -150,6 +152,7 @@ func NewReader(localDB *sqlx.DB, configFile string, featureFlag interfaces.Featu
logsLocalTable: options.primary.LogsLocalTable,
logsAttributeKeys: options.primary.LogsAttributeKeysTable,
logsResourceKeys: options.primary.LogsResourceKeysTable,
logsTagAttributeTable: options.primary.LogsTagAttributeTable,
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
promConfigFile: configFile,
featureFlags: featureFlag,
@@ -3385,14 +3388,18 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe
func extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) {
for _, field := range *fields {
field.Type = fieldType
- if strings.Contains(tableStatement, fmt.Sprintf("INDEX %s_idx", field.Name)) {
+ if isSelectedField(tableStatement, field.Name) {
response.Selected = append(response.Selected, field)
} else {
response.Interesting = append(response.Interesting, field)
}
}
}

func isSelectedField(tableStatement, field string) bool {
return strings.Contains(tableStatement, fmt.Sprintf("INDEX %s_idx", field))
}

func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError {
// if a field is selected it means that the field needs to be indexed
if field.Selected {
@@ -3706,7 +3713,7 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, req
}
key := v3.AttributeKey{
Key: metricName,
- DataType: v3.AttributeKeyDataTypeNumber,
+ DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyTypeTag,
}
response.AttributeKeys = append(response.AttributeKeys, key)
@@ -3782,6 +3789,243 @@ func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3
return &attributeValues, nil
}

func isColumn(tableStatement, field string) bool {
return strings.Contains(tableStatement, fmt.Sprintf("`%s` ", field))
}
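Both helpers inspect the raw SHOW CREATE TABLE text: isSelectedField looks for a named index, isColumn for a backtick-quoted column. A stand-alone illustration of the heuristic; the statement below is a simplified stand-in for the real logs schema:

package main

import (
	"fmt"
	"strings"
)

func isSelectedField(tableStatement, field string) bool {
	return strings.Contains(tableStatement, fmt.Sprintf("INDEX %s_idx", field))
}

func isColumn(tableStatement, field string) bool {
	return strings.Contains(tableStatement, fmt.Sprintf("`%s` ", field))
}

func main() {
	stmt := "CREATE TABLE signoz_logs.logs ( `timestamp` UInt64, `trace_id` String, INDEX trace_id_idx trace_id TYPE bloom_filter(0.01) GRANULARITY 64 )"
	fmt.Println(isColumn(stmt, "trace_id"))        // true: backtick-quoted column
	fmt.Println(isColumn(stmt, "http.method"))     // false: an attribute, not a column
	fmt.Println(isSelectedField(stmt, "trace_id")) // true: trace_id_idx index exists
}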

func (r *ClickHouseReader) GetLogAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {

var query string
var err error
var rows driver.Rows
var response v3.AggregateAttributeResponse

where := ""
switch req.Operator {
case v3.AggregateOperatorCountDistinct:
where = "tagKey ILIKE $1"
case
v3.AggregateOperatorRateSum,
v3.AggregateOperatorRateMax,
v3.AggregateOperatorRateAvg,
v3.AggregateOperatorRate,
v3.AggregateOperatorRateMin,
v3.AggregateOperatorP05,
v3.AggregateOperatorP10,
v3.AggregateOperatorP20,
v3.AggregateOperatorP25,
v3.AggregateOperatorP50,
v3.AggregateOperatorP75,
v3.AggregateOperatorP90,
v3.AggregateOperatorP95,
v3.AggregateOperatorP99,
v3.AggregateOperatorAvg,
v3.AggregateOperatorSum,
v3.AggregateOperatorMin,
v3.AggregateOperatorMax:
where = "tagKey ILIKE $1 AND (tagDataType='int64' or tagDataType='float64')"
case
v3.AggregateOpeatorCount,
v3.AggregateOperatorNoOp:
return &v3.AggregateAttributeResponse{}, nil
default:
return nil, fmt.Errorf("unsupported aggregate operator")
}

query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, tagDataType from %s.%s WHERE %s limit $2", r.logsDB, r.logsTagAttributeTable, where)
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit)
if err != nil {
zap.S().Error(err)
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
}

var tagKey string
var dataType string
var attType string
for rows.Next() {
if err := rows.Scan(&tagKey, &attType, &dataType); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
key := v3.AttributeKey{
Key: tagKey,
DataType: v3.AttributeKeyDataType(dataType),
Type: v3.AttributeKeyType(attType),
IsColumn: isColumn(statements[0].Statement, tagKey),
}
response.AttributeKeys = append(response.AttributeKeys, key)
}
// add the static top-level log fields that match the search text
for _, f := range constants.StaticFieldsLogsV3 {
if len(req.SearchText) == 0 || strings.Contains(f.Key, req.SearchText) {
f.IsColumn = isColumn(statements[0].Statement, f.Key)
response.AttributeKeys = append(response.AttributeKeys, f)
}
}

return &response, nil
}
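To make the generated SQL concrete: for every numeric aggregation (sums, averages, rates, percentiles) the function above runs a statement of the following shape, with $1 bound to %searchText% and $2 to the limit. The database name signoz_logs is an assumption here — it is the usual default, but it comes from options.go rather than this diff:

SELECT DISTINCT(tagKey), tagType, tagDataType
from signoz_logs.distributed_tag_attributes
WHERE tagKey ILIKE $1 AND (tagDataType='int64' or tagDataType='float64')
limit $2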

func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
var query string
var err error
var rows driver.Rows
var response v3.FilterAttributeKeyResponse

if len(req.SearchText) != 0 {
query = fmt.Sprintf("select distinct tagKey, tagType, tagDataType from %s.%s where tagKey ILIKE $1 limit $2", r.logsDB, r.logsTagAttributeTable)
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit)
} else {
query = fmt.Sprintf("select distinct tagKey, tagType, tagDataType from %s.%s limit $1", r.logsDB, r.logsTagAttributeTable)
rows, err = r.db.Query(ctx, query, req.Limit)
}

if err != nil {
zap.S().Error(err)
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()

statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsLocalTable)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, fmt.Errorf("error while fetching logs schema: %s", err.Error())
}

var attributeKey string
var attributeDataType string
var tagType string
for rows.Next() {
if err := rows.Scan(&attributeKey, &tagType, &attributeDataType); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}

key := v3.AttributeKey{
Key: attributeKey,
DataType: v3.AttributeKeyDataType(attributeDataType),
Type: v3.AttributeKeyType(tagType),
IsColumn: isColumn(statements[0].Statement, attributeKey),
}

response.AttributeKeys = append(response.AttributeKeys, key)
}

// add the static top-level log fields that match the search text
for _, f := range constants.StaticFieldsLogsV3 {
if len(req.SearchText) == 0 || strings.Contains(f.Key, req.SearchText) {
f.IsColumn = isColumn(statements[0].Statement, f.Key)
response.AttributeKeys = append(response.AttributeKeys, f)
}
}

return &response, nil
}
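Both key endpoints finish by merging the static top-level columns from constants.StaticFieldsLogsV3 into the dynamic keys. A hedged sketch of that merge — the two static keys below are hypothetical stand-ins, since the constants file is not part of this excerpt:

package main

import (
	"fmt"
	"strings"
)

type attributeKey struct {
	Key      string
	IsColumn bool
}

func main() {
	// Hypothetical stand-ins for constants.StaticFieldsLogsV3.
	static := []attributeKey{{Key: "trace_id"}, {Key: "severity_text"}}
	searchText := "trace"
	stmt := "CREATE TABLE signoz_logs.logs ( `trace_id` String )"

	var merged []attributeKey
	for _, f := range static {
		// Plain substring match, mirroring the loop above; note that it is
		// case-sensitive, unlike the ILIKE used for the dynamic keys.
		if len(searchText) == 0 || strings.Contains(f.Key, searchText) {
			f.IsColumn = strings.Contains(stmt, fmt.Sprintf("`%s` ", f.Key))
			merged = append(merged, f)
		}
	}
	fmt.Println(merged) // [{trace_id true}]
}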

func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
var err error
var filterValueColumn string
var rows driver.Rows
var attributeValues v3.FilterAttributeValueResponse

// return an empty response if dataType or tagType is missing, or if the key is the log body
if len(req.FilterAttributeKeyDataType) == 0 || len(req.TagType) == 0 || req.FilterAttributeKey == "body" {
return &v3.FilterAttributeValueResponse{}, nil
}

// if data type is bool, return true and false
if req.FilterAttributeKeyDataType == v3.AttributeKeyDataTypeBool {
return &v3.FilterAttributeValueResponse{
BoolAttributeValues: []bool{true, false},
}, nil
}

query := "select distinct"
switch req.FilterAttributeKeyDataType {
case v3.AttributeKeyDataTypeInt64:
filterValueColumn = "int64TagValue"
case v3.AttributeKeyDataTypeFloat64:
filterValueColumn = "float64TagValue"
case v3.AttributeKeyDataTypeString:
filterValueColumn = "stringTagValue"
}

searchText := fmt.Sprintf("%%%s%%", req.SearchText)

// check if the tagKey is a topLevelColumn
if _, ok := constants.LogsTopLevelColumnsV3[req.FilterAttributeKey]; ok {
// query the column for the last 48 hours
filterValueColumnWhere := req.FilterAttributeKey
selectKey := req.FilterAttributeKey
if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString {
filterValueColumnWhere = fmt.Sprintf("toString(%s)", req.FilterAttributeKey)
selectKey = fmt.Sprintf("toInt64(%s)", req.FilterAttributeKey)
}

// prepare the query and run
if len(req.SearchText) != 0 {
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) and %s ILIKE $1 limit $2", selectKey, r.logsDB, r.logsTable, filterValueColumnWhere)
rows, err = r.db.Query(ctx, query, searchText, req.Limit)
} else {
query = fmt.Sprintf("select distinct %s from %s.%s where timestamp >= toInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)*1000000000) limit $1", selectKey, r.logsDB, r.logsTable)
rows, err = r.db.Query(ctx, query, req.Limit)
}
} else if len(req.SearchText) != 0 {
filterValueColumnWhere := filterValueColumn
if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString {
filterValueColumnWhere = fmt.Sprintf("toString(%s)", filterValueColumn)
}
query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and %s ILIKE $2 and tagType=$3 limit $4", filterValueColumn, r.logsDB, r.logsTagAttributeTable, filterValueColumnWhere)
rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, searchText, req.TagType, req.Limit)
} else {
query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and tagType=$2 limit $3", filterValueColumn, r.logsDB, r.logsTagAttributeTable)
rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, req.TagType, req.Limit)
}

if err != nil {
zap.S().Error(err)
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()

var strAttributeValue string
var float64AttributeValue sql.NullFloat64
var int64AttributeValue sql.NullInt64
for rows.Next() {
switch req.FilterAttributeKeyDataType {
case v3.AttributeKeyDataTypeInt64:
if err := rows.Scan(&int64AttributeValue); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
if int64AttributeValue.Valid {
attributeValues.NumberAttributeValues = append(attributeValues.NumberAttributeValues, int64AttributeValue.Int64)
}
case v3.AttributeKeyDataTypeFloat64:
if err := rows.Scan(&float64AttributeValue); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
if float64AttributeValue.Valid {
attributeValues.NumberAttributeValues = append(attributeValues.NumberAttributeValues, float64AttributeValue.Float64)
}
case v3.AttributeKeyDataTypeString:
if err := rows.Scan(&strAttributeValue); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
attributeValues.StringAttributeValues = append(attributeValues.StringAttributeValues, strAttributeValue)
}
}

return &attributeValues, nil

}
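The sql.NullInt64/sql.NullFloat64 wrappers are what the "null values handled for int and float" bullet refers to: the tag-attributes table keeps one typed value column per data type, so rows carry NULL in the columns that don't apply, and scanning into plain int64/float64 would fail or inject zeroes. A minimal stand-alone sketch of the same guard, written against database/sql rows for illustration (the reader itself scans the ClickHouse driver's rows the same way):

package example

import "database/sql"

// scanFloats drains a single nullable Float64 column, keeping only non-NULL
// values — the same Valid check GetLogAttributeValues applies above.
func scanFloats(rows *sql.Rows) ([]float64, error) {
	var out []float64
	var v sql.NullFloat64
	for rows.Next() {
		if err := rows.Scan(&v); err != nil {
			return nil, err
		}
		if v.Valid { // skip NULLs instead of appending 0
			out = append(out, v.Float64)
		}
	}
	return out, rows.Err()
}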

func readRow(vars []interface{}, columnNames []string) ([]string, map[string]string, v3.Point) {
// Each row will have a value and a timestamp, and an optional list of label values
// example: {Timestamp: ..., Value: ...}
6 changes: 3 additions & 3 deletions pkg/query-service/app/http_handler.go
@@ -2369,7 +2369,7 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req)
case v3.DataSourceLogs:
- // TODO: implement
+ response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
case v3.DataSourceTraces:
// TODO: implement
default:
@@ -2398,7 +2398,7 @@ func (aH *APIHandler) autoCompleteAttributeKeys(w http.ResponseWriter, r *http.R
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAttributeKeys(r.Context(), req)
case v3.DataSourceLogs:
- // TODO: implement
+ response, err = aH.reader.GetLogAttributeKeys(r.Context(), req)
case v3.DataSourceTraces:
// TODO: implement
default:
@@ -2427,7 +2427,7 @@ func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAttributeValues(r.Context(), req)
case v3.DataSourceLogs:
- // TODO: implement
+ response, err = aH.reader.GetLogAttributeValues(r.Context(), req)
case v3.DataSourceTraces:
// TODO: implement
default:
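With the three handlers wired to the new reader methods, logs now serve the same v3 autocomplete flow that metrics already did. As a usage sketch, a request along the following lines should return matching keys — the route path, port, and query-parameter names are assumptions based on the handler signatures, since route registration is not shown in this diff:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed route and parameters; adjust to the actual registration.
	url := "http://localhost:8080/api/v3/autocomplete/attribute_keys" +
		"?dataSource=logs&aggregateOperator=count&searchText=method&limit=50"
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // JSON with key, dataType, type and isColumn per attribute
}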