skywalking-oap数据接收

代码引用基于v8.7.0版本

GRPCHandler入手,查看skywalking的收集体系

GRPCHandler只是一个标识接口,没有具体的含义

1
2
public interface GRPCHandler extends ServerHandler {
}

可以看到里面有很多的接收器,分别处理不同的发送

  1. 监控数据的接收器

TraceSegmentReportServiceHandler

TraceSegmentReportServiceHandler是一个grpc的类,继承了grpc相关的类,collect里面负责解析SegmentObject

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
return new StreamObserver<SegmentObject>() {
@Override
public void onNext(SegmentObject segment) {
if (log.isDebugEnabled()) {
log.debug("received segment in streaming");
}

HistogramMetrics.Timer timer = histogram.createTimer();
try {
//segmentParserService解析SegmentObject
segmentParserService.send(segment);
} catch (Exception e) {
errorCounter.inc();
log.error(e.getMessage(), e);
} finally {
timer.finish();
}
}

@Override
public void onError(Throwable throwable) {
log.error(throwable.getMessage(), throwable);
responseObserver.onCompleted();
}

@Override
public void onCompleted() {
responseObserver.onNext(Commands.newBuilder().build());
responseObserver.onCompleted();
}
};
}

SegmentParserServiceImpl

负责SegmentObject的解析,但主要使用了委托模式,真正执行逻辑的是TraceAnalyzer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RequiredArgsConstructor
public class SegmentParserServiceImpl implements ISegmentParserService {
private final ModuleManager moduleManager;
private final AnalyzerModuleConfig config;
@Setter
private SegmentParserListenerManager listenerManager;

@Override
public void send(SegmentObject segment) {
final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
// 委托给TraceAnalyzer执行
traceAnalyzer.doAnalysis(segment);
}
}

traceAnalyzer

分析Span的类型,主要有三种,然后分别构建到不同的队列里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void doAnalysis(SegmentObject segmentObject) {
if (segmentObject.getSpansList().size() == 0) {
return;
}

createSpanListeners();

notifySegmentListener(segmentObject);

segmentObject.getSpansList().forEach(spanObject -> {
if (spanObject.getSpanId() == 0) {
notifyFirstListener(spanObject, segmentObject);
}

if (SpanType.Exit.equals(spanObject.getSpanType())) {
notifyExitListener(spanObject, segmentObject);
} else if (SpanType.Entry.equals(spanObject.getSpanType())) {
notifyEntryListener(spanObject, segmentObject);
} else if (SpanType.Local.equals(spanObject.getSpanType())) {
notifyLocalListener(spanObject, segmentObject);
} else {
log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
.name());
}
});

notifyListenerToBuild();
}
1
2
3
private void notifyListenerToBuild() {
analysisListeners.forEach(AnalysisListener::build);
}

MultiScopesAnalysisListener

AnalysisListener::build顺藤摸瓜,就找到了MultiScopesAnalysisListener,MultiScopesAnalysisListener的作用就是把Span分类,通过不同的sourceReceiver接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Override
public void build() {
entrySourceBuilders.forEach(entrySourceBuilder -> {
entrySourceBuilder.prepare();
sourceReceiver.receive(entrySourceBuilder.toAll());
sourceReceiver.receive(entrySourceBuilder.toService());
sourceReceiver.receive(entrySourceBuilder.toServiceInstance());
sourceReceiver.receive(entrySourceBuilder.toEndpoint());
sourceReceiver.receive(entrySourceBuilder.toServiceRelation());
sourceReceiver.receive(entrySourceBuilder.toServiceInstanceRelation());
EndpointRelation endpointRelation = entrySourceBuilder.toEndpointRelation();
/*
* Parent endpoint could be none, because in SkyWalking Cross Process Propagation Headers Protocol v2,
* endpoint in ref could be empty, based on that, endpoint relation maybe can't be established.
* So, I am making this source as optional.
*
* Also, since 6.6.0, source endpoint could be none, if this trace begins by an internal task(local span or exit span), such as Timer,
* rather than, normally begin as an entry span, like a RPC server side.
*/
if (endpointRelation != null) {
sourceReceiver.receive(endpointRelation);
}
});

exitSourceBuilders.forEach(exitSourceBuilder -> {
exitSourceBuilder.prepare();
sourceReceiver.receive(exitSourceBuilder.toServiceRelation());

/*
* Some of the agent can not have the upstream real network address, such as https://github.com/apache/skywalking-nginx-lua.
*/
final ServiceInstanceRelation serviceInstanceRelation = exitSourceBuilder.toServiceInstanceRelation();
if (serviceInstanceRelation != null) {
sourceReceiver.receive(serviceInstanceRelation);
}
if (RequestType.DATABASE.equals(exitSourceBuilder.getType())) {
sourceReceiver.receive(exitSourceBuilder.toServiceMeta());
sourceReceiver.receive(exitSourceBuilder.toDatabaseAccess());
}
});

dbSlowStatementBuilders.forEach(dbSlowStatBuilder -> {
dbSlowStatBuilder.prepare();
sourceReceiver.receive(dbSlowStatBuilder.toDatabaseSlowStatement());
});

logicEndpointBuilders.forEach(logicEndpointBuilder -> {
logicEndpointBuilder.prepare();
sourceReceiver.receive(logicEndpointBuilder.toEndpoint());
});
}

SourceReceiver

负责监控的收集和转发,主要实现是SourceReceiverImpl,里面有转发的逻辑

1
2
3
4
5
public interface SourceReceiver extends Service {
void receive(ISource source);

DispatcherDetectorListener getDispatcherDetectorListener();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SourceReceiverImpl implements SourceReceiver {
@Getter
private final DispatcherManager dispatcherManager;

public SourceReceiverImpl() {
this.dispatcherManager = new DispatcherManager();
}

@Override
public void receive(ISource source) {
//在这里转发
dispatcherManager.forward(source);
}

@Override
public DispatcherDetectorListener getDispatcherDetectorListener() {
return getDispatcherManager();
}

public void scan() throws IOException, InstantiationException, IllegalAccessException {
dispatcherManager.scan();
}
}

DispatcherManager

负责SourceDispatcher的转发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private Map<Integer, List<SourceDispatcher>> dispatcherMap;

public DispatcherManager() {
this.dispatcherMap = new HashMap<>();
}

public void forward(ISource source) {
if (source == null) {
return;
}

List<SourceDispatcher> dispatchers = dispatcherMap.get(source.scope());

/**
* Dispatcher is only generated by oal script analysis result.
* So these will/could be possible, the given source doesn't have the dispatcher,
* when the receiver is open, and oal script doesn't ask for analysis.
*/
if (dispatchers != null) {
source.prepare();
for (SourceDispatcher dispatcher : dispatchers) {
//就是在这里转发
dispatcher.dispatch(source);
}
}
}

SourceDispatcher

有非常多得实现类,具体转发到那里

1
2
3
public interface SourceDispatcher<SOURCE extends ISource> {
void dispatch(SOURCE source);
}

我们重点关注SegmentDispatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SegmentDispatcher implements SourceDispatcher<Segment> {

@Override
public void dispatch(Segment source) {
SegmentRecord segment = new SegmentRecord();
segment.setSegmentId(source.getSegmentId());
segment.setTraceId(source.getTraceId());
segment.setServiceId(source.getServiceId());
segment.setServiceInstanceId(source.getServiceInstanceId());
segment.setEndpointName(source.getEndpointName());
segment.setEndpointId(source.getEndpointId());
segment.setStartTime(source.getStartTime());
segment.setEndTime(source.getEndTime());
segment.setLatency(source.getLatency());
segment.setIsError(source.getIsError());
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
segment.setVersion(source.getVersion());
segment.setTagsRawData(source.getTags());
segment.setTags(Tag.Util.toStringList(source.getTags()));

//这里就是要存到数据库了
RecordStreamProcessor.getInstance().in(segment);
}
}

RecordStreamProcessor

还是代理模式,内部使用RecordPersistentWorker转发

1
2
3
4
5
6
7
8
@Override
public void in(Record record) {
//使用RecordPersistentWorker来处理
RecordPersistentWorker worker = workers.get(record.getClass());
if (worker != null) {
worker.in(record);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RecordPersistentWorker extends AbstractWorker<Record> {

private static final Logger LOGGER = LoggerFactory.getLogger(RecordPersistentWorker.class);

private final Model model;
private final IRecordDAO recordDAO;
//数据库的dao
private final IBatchDAO batchDAO;

RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO) {
super(moduleDefineHolder);
this.model = model;
this.recordDAO = recordDAO;
this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
}

@Override
public void in(Record record) {
try {
InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
//在这里,终于存到数据库dao了
batchDAO.insert(insertRequest);
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
}
}

总结

可以看到大量使用了装饰者模式,层层委托,抽象非常多

  1. TraceSegmentReportServiceHandler处理数据
  2. SegmentParserServiceImpl解析
  3. traceAnalyzer解析
  4. MultiScopesAnalysisListener解析
  5. SourceReceiver转发
  6. DispatcherManager转发
  7. SourceDispatcher转发
  8. RecordStreamProcessor最终使用dao执行