Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query protocol and codebase for log type record #2449

Merged
merged 12 commits into from
Apr 7, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ SRC_ENDPOINT_RELATION_SERVER_SIDE: 'endpoint_relation_server_side';
SRC_SERVICE_RELATION_SERVER_SIDE: 'service_relation_server_side';
SRC_SERVICE_RELATION_CLIENT_SIDE: 'service_relation_client_side';
SRC_ALARM_RECORD: 'alarm_record';
SRC_HTTP_ACCESS_LOG: 'http_access_log';

SRC_ZIPKIN_SPAN: 'zipkin_span';
SRC_JAEGER_SPAN: 'jaeger_span';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ source

disableSource
: SRC_SEGMENT | SRC_TOP_N_DB_STATEMENT | SRC_ENDPOINT_RELATION_SERVER_SIDE | SRC_SERVICE_RELATION_SERVER_SIDE |
SRC_SERVICE_RELATION_CLIENT_SIDE | SRC_ALARM_RECORD
SRC_SERVICE_RELATION_CLIENT_SIDE | SRC_ALARM_RECORD | SRC_HTTP_ACCESS_LOG
;

sourceAttribute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ private void addQueryService(List<Class> classes) {
classes.add(TopologyQueryService.class);
classes.add(MetricQueryService.class);
classes.add(TraceQueryService.class);
classes.add(LogQueryService.class);
classes.add(MetadataQueryService.class);
classes.add(AggregationQueryService.class);
classes.add(AlarmQueryService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public CoreModuleProvider() {
this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
this.registerServiceImplementation(MetricQueryService.class, new MetricQueryService(getManager()));
this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
this.registerServiceImplementation(LogQueryService.class, new LogQueryService(getManager()));
this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis.manual.log;

import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.query.entity.ContentType;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;

/**
* @author wusheng
*/

public abstract class AbstractLogRecord extends Record {

public static final String SERVICE_ID = "service_id";
public static final String SERVICE_INSTANCE_ID = "service_instance_id";
public static final String ENDPOINT_ID = "endpoint_id";
public static final String TRACE_ID = "trace_id";
public static final String IS_ERROR = "is_error";
public static final String STATUS_CODE = "status_code";
public static final String CONTENT_TYPE = "content_type";
public static final String CONTENT = "content";
public static final String TIMESTAMP = "timestamp";

@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) private int serviceInstanceId;
@Setter @Getter @Column(columnName = ENDPOINT_ID) private int endpointId;
@Setter @Getter @Column(columnName = TRACE_ID) private String traceId;
@Setter @Getter @Column(columnName = IS_ERROR) private int isError;
@Setter @Getter @Column(columnName = STATUS_CODE) private String statusCode;
@Setter @Getter @Column(columnName = CONTENT_TYPE) private int contentType = ContentType.NONE.value();
@Setter @Getter @Column(columnName = CONTENT) private String content;
@Setter @Getter @Column(columnName = TIMESTAMP) private long timestamp;

@Override public String id() {
throw new UnexpectedException("AbstractLogRecord doesn't provide id()");
}

public static abstract class Builder<T extends AbstractLogRecord> implements StorageBuilder<T> {
protected void map2Data(T record, Map<String, Object> dbMap) {
record.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
record.setServiceInstanceId(((Number)dbMap.get(SERVICE_INSTANCE_ID)).intValue());
record.setEndpointId(((Number)dbMap.get(ENDPOINT_ID)).intValue());
record.setIsError(((Number)dbMap.get(IS_ERROR)).intValue());
record.setTraceId((String)dbMap.get(TRACE_ID));
record.setStatusCode((String)dbMap.get(STATUS_CODE));
record.setContentType(((Number)dbMap.get(CONTENT_TYPE)).intValue());
record.setContent((String)dbMap.get(CONTENT));
record.setTimestamp(((Number)dbMap.get(TIMESTAMP)).longValue());
record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
}

@Override public Map<String, Object> data2Map(AbstractLogRecord record) {
Map<String, Object> map = new HashMap<>();
map.put(SERVICE_ID, record.getServiceId());
map.put(SERVICE_INSTANCE_ID, record.getServiceInstanceId());
map.put(ENDPOINT_ID, record.getEndpointId());
map.put(TRACE_ID, record.getTraceId());
map.put(IS_ERROR, record.getIsError());
map.put(STATUS_CODE, record.getStatusCode());
map.put(TIME_BUCKET, record.getTimeBucket());
map.put(CONTENT_TYPE, record.getContentType());
map.put(CONTENT, record.getContent());
map.put(TIMESTAMP, record.getTimestamp());
return map;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis.manual.log;

import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
import org.apache.skywalking.oap.server.core.source.HTTPAccessLog;

/**
* @author wusheng
*/
public class HTTPAccessLogDispatcher implements SourceDispatcher<HTTPAccessLog> {
@Override public void dispatch(HTTPAccessLog source) {
HTTPAccessLogRecord record = new HTTPAccessLogRecord();
record.setTimestamp(source.getTimestamp());
record.setTimeBucket(source.getTimeBucket());
record.setServiceId(source.getServiceId());
record.setServiceInstanceId(source.getServiceInstanceId());
record.setEndpointId(source.getEndpointId());
record.setTraceId(source.getTraceId());
record.setIsError(source.getIsError());
record.setStatusCode(source.getStatusCode());
record.setContentType(source.getContentType().value());
record.setContent(source.getContent());

RecordProcess.INSTANCE.in(record);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis.manual.log;

import java.util.Map;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;

import static org.apache.skywalking.oap.server.core.analysis.manual.log.HTTPAccessLogRecord.INDEX_NAME;

@StorageEntity(name = INDEX_NAME, builder = HTTPAccessLogRecord.Builder.class, sourceScopeId = DefaultScopeDefine.HTTP_ACCESS_LOG)
public class HTTPAccessLogRecord extends AbstractLogRecord {
public static final String INDEX_NAME = "http_access_log";

public static class Builder extends AbstractLogRecord.Builder<HTTPAccessLogRecord> {
@Override public HTTPAccessLogRecord map2Data(Map<String, Object> dbMap) {
HTTPAccessLogRecord record = new HTTPAccessLogRecord();
super.map2Data(record, dbMap);
return record;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,18 @@ public class SegmentRecord extends Record {
public static final String DATA_BINARY = "data_binary";
public static final String VERSION = "version";

@Setter @Getter @Column(columnName = SEGMENT_ID) @IDColumn private String segmentId;
@Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId;
@Setter @Getter @Column(columnName = SERVICE_ID) @IDColumn private int serviceId;
@Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) @IDColumn private int serviceInstanceId;
@Setter @Getter @Column(columnName = ENDPOINT_NAME, matchQuery = true) @IDColumn private String endpointName;
@Setter @Getter @Column(columnName = ENDPOINT_ID) @IDColumn private int endpointId;
@Setter @Getter @Column(columnName = START_TIME) @IDColumn private long startTime;
@Setter @Getter @Column(columnName = END_TIME) @IDColumn private long endTime;
@Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency;
@Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError;
@Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary;
@Setter @Getter @Column(columnName = VERSION) @IDColumn private int version;
@Setter @Getter @Column(columnName = SEGMENT_ID) private String segmentId;
@Setter @Getter @Column(columnName = TRACE_ID) private String traceId;
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) private int serviceInstanceId;
@Setter @Getter @Column(columnName = ENDPOINT_NAME, matchQuery = true) private String endpointName;
@Setter @Getter @Column(columnName = ENDPOINT_ID) private int endpointId;
@Setter @Getter @Column(columnName = START_TIME) private long startTime;
@Setter @Getter @Column(columnName = END_TIME) private long endTime;
@Setter @Getter @Column(columnName = LATENCY) private int latency;
@Setter @Getter @Column(columnName = IS_ERROR) private int isError;
@Setter @Getter @Column(columnName = DATA_BINARY) private byte[] dataBinary;
@Setter @Getter @Column(columnName = VERSION) private int version;

@Override public String id() {
return segmentId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.query;

import java.io.IOException;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.library.module.*;

/**
* @author wusheng
*/
public class LogQueryService implements Service {
private final ModuleManager moduleManager;
private ILogQueryDAO logQueryDAO;
private ServiceInventoryCache serviceInventoryCache;
private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
private EndpointInventoryCache endpointInventoryCache;

public LogQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}

private ILogQueryDAO getLogQueryDAO() {
if (logQueryDAO == null) {
this.logQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(ILogQueryDAO.class);
}
return logQueryDAO;
}

private ServiceInventoryCache getServiceInventoryCache() {
if (serviceInventoryCache == null) {
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
}
return serviceInventoryCache;
}

private ServiceInstanceInventoryCache getServiceInstanceInventoryCache() {
if (serviceInstanceInventoryCache == null) {
this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
}
return serviceInstanceInventoryCache;
}

private EndpointInventoryCache getEndpointInventoryCache() {
if (endpointInventoryCache == null) {
this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
}
return endpointInventoryCache;
}

public Logs queryLogs(final String metricName, int serviceId, int serviceInstanceId, int endpointId,
String traceId, LogState state, String stateCode, Pagination paging, final long startTB,
final long endTB) throws IOException {
PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(paging);

Logs logs = getLogQueryDAO().queryLogs(metricName, serviceId, serviceInstanceId, endpointId,
traceId, state, stateCode, paging, page.getFrom(), page.getLimit(), startTB, endTB);
logs.getLogs().forEach(log -> {
if (log.getServiceId() != Const.NONE) {
log.setServiceName(getServiceInventoryCache().get(log.getServiceId()).getName());
}
if (log.getServiceInstanceId() != Const.NONE) {
log.setServiceInstanceName(getServiceInstanceInventoryCache().get(log.getServiceInstanceId()).getName());
}
if (log.getEndpointId() != Const.NONE) {
log.setEndpointName(getEndpointInventoryCache().get(log.getEndpointId()).getName());
}
});
return logs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.query.entity;

import org.apache.skywalking.oap.server.core.UnexpectedException;

/**
* @author wusheng
*/
public enum ContentType {
TEXT(1), JSON(2), NONE(0);

private int value;

ContentType(int value) {
this.value = value;
}

public int value() {
return value;
}

public static ContentType instanceOf(int value) {
switch (value) {
case 1:
return TEXT;
case 2:
return JSON;
case 0:
return NONE;
default:
throw new UnexpectedException("unexpected value=" + value);
}
}
}
Loading