skywalking-agent数据收集源码讲解

从源码的角度来探讨,skywalking到底如何收集发送数据

代码引用基于v8.7.0版本

GRPCChannelListener grpc状态监控

skywalking通过grpc发送数据,所以,用grpc发送数据的,都需要实现GRPCChannelListener来查看grpc的连接状态

1
2
3
public interface GRPCChannelListener {
void statusChanged(GRPCChannelStatus status);
}

有很多类型的数据需要发送,但是常见的三种

  1. jvm的信息发送
  2. 服务管理信息发送
  3. span信息,也就是plugin收集的监控发送

TraceSegmentServiceClient 发送监控数据

TraceSegmentServiceClient

TraceSegmentServiceClient负责监控数据的发送,同时实现了四个接口,分别时BootService,IConsumer,TracingContextListener,GRPCChannelListener

1
2
3
4
@DefaultImplementor
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {
// ... omit
}

BootService使用java spi机制初始化,接口里面的主要内容就是生命周期,类似spring bean的生命周期,ServiceManager负责初始化所有的BootService,
SkyWalkingAgent启动的时候完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* The <code>BootService</code> is an interface to all remote, which need to boot when plugin mechanism begins to work.
* {@link #boot()} will be called when <code>BootService</code> start up.
*/
public interface BootService {
void prepare() throws Throwable;

void boot() throws Throwable;

void onComplete() throws Throwable;

void shutdown() throws Throwable;

/**
* {@code BootService}s with higher priorities will be started earlier, and shut down later than those {@code BootService}s with lower priorities.
*
* @return the priority of this {@code BootService}.
*/
default int priority() {
return 0;
}
}

IConsumer负责数据的发送,主要由DataCarrier使用,关键方法consume就是使用生产者消费者模式来消费数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface IConsumer<T> {
void init();

void consume(List<T> data);

void onError(List<T> data, Throwable t);

void onExit();

/**
* Notify the implementation, if there is nothing fetched from the queue. This could be used as a timer to trigger
* reaction if the queue has no element.
*/
default void nothingToConsume() {
return;
}
}

TracingContextListener通知一次监控完成,开始生成消费

1
2
3
public interface TracingContextListener {
void afterFinished(TraceSegment traceSegment);
}

自定义消息发送

LocalLogTraceSegmentServiceClient写到本地文件中,记得添加spi文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@OverrideImplementor(TraceSegmentServiceClient.class)//替换原有的TraceSegmentServiceClient
public class LocalLogTraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener {

private volatile DataCarrier<TraceSegment> carrier;

@Override
public void prepare() throws Throwable {
}

@Override
public void boot() throws Throwable {
//创建dataCarrier
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
carrier.consume(this, 1);
}

@Override
public void onComplete() throws Throwable {
//添加到TracingContext监听
TracingContext.ListenerManager.add(this);
}

@Override
public void shutdown() throws Throwable {
TracingContext.ListenerManager.remove(this);
carrier.shutdownConsumers();
}

@Override
public void init() {
}

@Override
public void consume(List<TraceSegment> data) {
//假设写入本地文件
System.out.print(data);
}

@Override
public void onError(List<TraceSegment> data, Throwable t) {
System.out.print(data);
}

@Override
public void onExit() {
}

@Override
public void afterFinished(TraceSegment traceSegment) {
if (traceSegment.isIgnore()) {
return;
}
//生产数据
carrier.produce(traceSegment);
}
}