Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Backend acts as pure Zipkin collector #2424

Merged
merged 10 commits into from
Mar 30, 2019
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions docs/en/setup/backend/backend-receivers.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **receiver-jvm**. gRPC services accept JVM metric data.
1. **istio-telemetry**. Istio telemetry is from Istio official bypass adaptor, this receiver match its gRPC services.
1. **envoy-metric**. Envoy `metrics_service` supported by this receiver. OAL script support all GAUGE type metrics.
1. **receiver_zipkin**. HTTP service accepts Span in Zipkin v1 and v2 formats. Notice, this receiver only
works as expected in backend single node mode. Cluster mode is not supported. Welcome anyone to improve this.
1. **receiver_zipkin**. See [details](#zipkin-receiver).

The sample settings of these receivers should be already in default `application.yml`, and also list here
```yaml
Expand Down Expand Up @@ -59,4 +58,27 @@ receiver-sharing-server:
```

Notice, if you add these settings, make sure they are not as same as core module,
because gRPC/HTTP servers of core are still used for UI and OAP internal communications.
because gRPC/HTTP servers of core are still used for UI and OAP internal communications.

## Zipkin receiver
Zipkin receiver could work in two different mode.
1. Tracing mode(default). Tracing mode is that, skywalking OAP acts like zipkin collector,
fully supports Zipkin v1/v2 formats through HTTP service,
also provide persistence and query in skywalking UI.
But it wouldn't analysis metric from them. In most case, I suggest you could use this feature, when metrics come from service mesh.
Notice, in this mode, Zipkin receiver requires `zipkin-elasticsearch` storage implementation active.
Read [this](backend-storage.md#elasticsearch-6-with-zipkin-trace-extension) to know
how to active.
1. Analysis mode(Not production ready), receive Zipkin v1/v2 formats through HTTP service. Transform the trace to skywalking
native format, and analysis like skywalking trace. This feature can't work in production env right now,
because of Zipkin tag/endpoint value unpredictable, we can't make sure it fits production env requirements.

Active `analysis mode`, you should set `needAnalysis` config.
```yaml
receiver_zipkin:
default:
host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0}
port: ${SW_RECEIVER_ZIPKIN_PORT:9411}
contextPath: ${SW_RECEIVER_ZIPKIN_CONTEXT_PATH:/}
needAnalysis: true
```
20 changes: 20 additions & 0 deletions docs/en/setup/backend/backend-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ storage:
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```

### ElasticSearch 6 with Zipkin trace extension
This implementation shares most of `elasticsearch`, just extend to support zipkin span storage.
It has all same configs.
```yaml
storage:
zipkin-elasticsearch:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
```


### About Namespace
When namespace is set, names of all indexes in ElasticSearch will use it as prefix.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,18 @@ public Trace queryTrace(final String traceId) throws IOException {
Trace trace = new Trace();

List<SegmentRecord> segmentRecords = getTraceQueryDAO().queryByTraceId(traceId);
for (SegmentRecord segment : segmentRecords) {
if (nonNull(segment)) {
if (segment.getVersion() == 2) {
SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary());
trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
} else {
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary());
trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
if (segmentRecords.isEmpty()) {
trace.getSpans().addAll(getTraceQueryDAO().doFlexibleTraceQuery(traceId));
} else {
for (SegmentRecord segment : segmentRecords) {
if (nonNull(segment)) {
if (segment.getVersion() == 2) {
SegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary());
trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
} else {
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getDataBinary());
trace.getSpans().addAll(buildSpanList(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,12 @@
public class KeyValue {
private String key;
private String value;

public KeyValue(String key, String value) {
this.key = key;
this.value = value;
}

public KeyValue() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class DefaultScopeDefine {
public static final int SERVICE_INSTANCE_CLR_GC = 20;
public static final int SERVICE_INSTANCE_CLR_THREAD = 21;
public static final int ENVOY_INSTANCE_METRIC = 22;
public static final int ZIPKIN_SPAN = 23;

/**
* Catalog of scope, the indicator processor could use this to group all generated indicators by oal tool.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,12 @@ TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDurati
int limit, int from, TraceState traceState, QueryOrder queryOrder) throws IOException;

List<SegmentRecord> queryByTraceId(String traceId) throws IOException;

/**
* This method gives more flexible for unnative
* @param traceId
* @return
* @throws IOException
*/
List<Span> doFlexibleTraceQuery(String traceId) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
<artifactId>skywalking-trace-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-zipkin-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-register-receiver-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
package org.apache.skywalking.oap.server.receiver.zipkin;

import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;

public class CoreRegisterLinker {
private static volatile ModuleManager MODULE_MANAGER;
private static volatile IServiceInventoryRegister SERVICE_INVENTORY_REGISTER;
private static volatile IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER;
private static volatile IEndpointInventoryRegister ENDPOINT_INVENTORY_REGISTER;

public static void setModuleManager(ModuleManager moduleManager) {
CoreRegisterLinker.MODULE_MANAGER = moduleManager;
Expand All @@ -45,4 +45,11 @@ public static IServiceInstanceInventoryRegister getServiceInstanceInventoryRegis
}
return SERVICE_INSTANCE_INVENTORY_REGISTER;
}

public static IEndpointInventoryRegister getEndpointInventoryRegister() {
if (ENDPOINT_INVENTORY_REGISTER == null) {
ENDPOINT_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(IEndpointInventoryRegister.class);
}
return ENDPOINT_INVENTORY_REGISTER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,57 +18,20 @@

package org.apache.skywalking.oap.server.receiver.zipkin;

import lombok.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;

/**
* @author wusheng
*/
@Setter
@Getter
public class ZipkinReceiverConfig extends ModuleConfig {
private String host;
private int port;
private String contextPath;

private int expireTime = 20;

private int maxCacheSize = 1_000_000;

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}

public String getContextPath() {
return contextPath;
}

public void setContextPath(String contextPath) {
this.contextPath = contextPath;
}

public int getExpireTime() {
return expireTime;
}

public void setExpireTime(int expireTime) {
this.expireTime = expireTime;
}

public int getMaxCacheSize() {
return maxCacheSize;
}

public void setMaxCacheSize(int maxCacheSize) {
this.maxCacheSize = maxCacheSize;
}
private boolean needAnalysis = false;
private boolean registerZipkinEndpoint = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.skywalking.oap.server.receiver.zipkin;

import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
Expand All @@ -27,9 +28,10 @@
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.*;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;

/**
* @author wusheng
Expand Down Expand Up @@ -65,12 +67,14 @@ public ZipkinReceiverProvider() {
jettyServer = new JettyServer(config.getHost(), config.getPort(), config.getContextPath());
jettyServer.initialize();

jettyServer.addHandler(new SpanV1JettyHandler(config));
jettyServer.addHandler(new SpanV2JettyHandler(config));
jettyServer.addHandler(new SpanV1JettyHandler(config, getManager()));
jettyServer.addHandler(new SpanV2JettyHandler(config, getManager()));

ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class);
Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
if (config.isNeedAnalysis()) {
ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class);
Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
}
}

@Override public void notifyAfterCompleted() throws ModuleStartException {
Expand All @@ -82,6 +86,13 @@ public ZipkinReceiverProvider() {
}

@Override public String[] requiredModules() {
return new String[] {TraceModule.NAME};
if (config.isNeedAnalysis()) {
return new String[] {TraceModule.NAME};
} else {
/**
* In pure trace status, we don't need the trace receiver.
*/
return new String[] {CoreModule.NAME};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*
*/

package org.apache.skywalking.oap.server.receiver.zipkin;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis;

import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.SegmentListener;
import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.SegmentListener;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer;

/**
* Send the segments to Analysis module, like receiving segments from native SkyWalking agents.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.receiver.zipkin.analysis;

import java.util.List;
import org.apache.skywalking.oap.server.receiver.zipkin.*;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.CacheFactory;
import zipkin2.Span;

public class ZipkinSkyWalkingTransfer {
public void doTransfer(ZipkinReceiverConfig config, List<Span> spanList) {
spanList.forEach(span -> {
// In Zipkin, the local service name represents the application owner.
String applicationCode = span.localServiceName();
if (applicationCode != null) {
int applicationId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode, null);
if (applicationId != 0) {
CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(applicationId, applicationCode, applicationCode,
span.timestampAsLong(),
ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode));
}
}

CacheFactory.INSTANCE.get(config).addSpan(span);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.skywalking.oap.server.receiver.zipkin;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis;

import com.google.gson.JsonObject;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*
*/

package org.apache.skywalking.oap.server.receiver.zipkin.cache;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache;

import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine.CaffeineSpanCache;
import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine.CaffeineSpanCache;

/**
* @author wusheng
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.skywalking.oap.server.receiver.zipkin.cache;
package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache;

import zipkin2.Span;

Expand Down
Loading