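skywalking-oap data reception
Code references are based on v8.7.0.
To see how SkyWalking collects data, start from GRPCHandler. GRPCHandler is only a marker interface: it declares no methods and carries no behavior of its own.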
    public interface GRPCHandler extends ServerHandler {
    }
GRPCHandler has many implementations. Each one is a receiver for a different kind of monitoring data reported to the OAP.
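To make the marker-interface idea concrete, here is a minimal sketch of how a server could pick handlers by marker type. All names below (Handler, RpcHandler, RestHandler, MarkerDemo) are hypothetical stand-ins, not SkyWalking classes, and this is not how the OAP registers its handlers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; these are NOT the SkyWalking types.
interface Handler {}
interface RpcHandler extends Handler {}   // marker: declares no methods
interface RestHandler extends Handler {}  // marker: declares no methods

class TraceHandler implements RpcHandler {}
class HealthHandler implements RestHandler {}

class MarkerDemo {
    // Keep only the handlers that carry the given marker type.
    static <T extends Handler> List<T> select(List<Handler> all, Class<T> marker) {
        List<T> result = new ArrayList<>();
        for (Handler h : all) {
            if (marker.isInstance(h)) {
                result.add(marker.cast(h));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Handler> all = new ArrayList<>();
        all.add(new TraceHandler());
        all.add(new HealthHandler());
        System.out.println(select(all, RpcHandler.class).size()); // prints 1
    }
}
```

The marker adds no behavior; it only lets the caller tell one family of handlers from another through the type system.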
TraceSegmentReportServiceHandler
TraceSegmentReportServiceHandler is a gRPC service class that extends the gRPC base classes. Its collect method receives each streamed SegmentObject and hands it over for parsing.
    @Override
    public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
        return new StreamObserver<SegmentObject>() {
            @Override
            public void onNext(SegmentObject segment) {
                if (log.isDebugEnabled()) {
                    log.debug("received segment in streaming");
                }

                HistogramMetrics.Timer timer = histogram.createTimer();
                try {
                    segmentParserService.send(segment);
                } catch (Exception e) {
                    errorCounter.inc();
                    log.error(e.getMessage(), e);
                } finally {
                    timer.finish();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                log.error(throwable.getMessage(), throwable);
                responseObserver.onCompleted();
            }

            @Override
            public void onCompleted() {
                responseObserver.onNext(Commands.newBuilder().build());
                responseObserver.onCompleted();
            }
        };
    }
SegmentParserServiceImpl is responsible for parsing the SegmentObject, but it is mostly a delegate: the logic actually runs in TraceAnalyzer.
    @RequiredArgsConstructor
    public class SegmentParserServiceImpl implements ISegmentParserService {
        private final ModuleManager moduleManager;
        private final AnalyzerModuleConfig config;
        @Setter
        private SegmentParserListenerManager listenerManager;

        @Override
        public void send(SegmentObject segment) {
            final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
            traceAnalyzer.doAnalysis(segment);
        }
    }
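TraceAnalyzer checks the type of each Span. There are three main types (Exit, Entry and Local), and each is routed to its own listener callback, so spans of different types end up in different collections before the final build step.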
    public void doAnalysis(SegmentObject segmentObject) {
        if (segmentObject.getSpansList().size() == 0) {
            return;
        }

        createSpanListeners();

        notifySegmentListener(segmentObject);

        segmentObject.getSpansList().forEach(spanObject -> {
            if (spanObject.getSpanId() == 0) {
                notifyFirstListener(spanObject, segmentObject);
            }

            if (SpanType.Exit.equals(spanObject.getSpanType())) {
                notifyExitListener(spanObject, segmentObject);
            } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
                notifyEntryListener(spanObject, segmentObject);
            } else if (SpanType.Local.equals(spanObject.getSpanType())) {
                notifyLocalListener(spanObject, segmentObject);
            } else {
                log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType().name());
            }
        });

        notifyListenerToBuild();
    }

    private void notifyListenerToBuild() {
        analysisListeners.forEach(AnalysisListener::build);
    }
MultiScopesAnalysisListener
Following AnalysisListener::build leads to MultiScopesAnalysisListener. Its job is to classify the Spans into different kinds of sources and pass each one to sourceReceiver.
    @Override
    public void build() {
        entrySourceBuilders.forEach(entrySourceBuilder -> {
            entrySourceBuilder.prepare();
            sourceReceiver.receive(entrySourceBuilder.toAll());
            sourceReceiver.receive(entrySourceBuilder.toService());
            sourceReceiver.receive(entrySourceBuilder.toServiceInstance());
            sourceReceiver.receive(entrySourceBuilder.toEndpoint());
            sourceReceiver.receive(entrySourceBuilder.toServiceRelation());
            sourceReceiver.receive(entrySourceBuilder.toServiceInstanceRelation());
            EndpointRelation endpointRelation = entrySourceBuilder.toEndpointRelation();
            if (endpointRelation != null) {
                sourceReceiver.receive(endpointRelation);
            }
        });

        exitSourceBuilders.forEach(exitSourceBuilder -> {
            exitSourceBuilder.prepare();
            sourceReceiver.receive(exitSourceBuilder.toServiceRelation());

            final ServiceInstanceRelation serviceInstanceRelation = exitSourceBuilder.toServiceInstanceRelation();
            if (serviceInstanceRelation != null) {
                sourceReceiver.receive(serviceInstanceRelation);
            }
            if (RequestType.DATABASE.equals(exitSourceBuilder.getType())) {
                sourceReceiver.receive(exitSourceBuilder.toServiceMeta());
                sourceReceiver.receive(exitSourceBuilder.toDatabaseAccess());
            }
        });

        dbSlowStatementBuilders.forEach(dbSlowStatBuilder -> {
            dbSlowStatBuilder.prepare();
            sourceReceiver.receive(dbSlowStatBuilder.toDatabaseSlowStatement());
        });

        logicEndpointBuilders.forEach(logicEndpointBuilder -> {
            logicEndpointBuilder.prepare();
            sourceReceiver.receive(logicEndpointBuilder.toEndpoint());
        });
    }
SourceReceiver collects the monitoring sources and forwards them. Its main implementation is SourceReceiverImpl, which contains the forwarding logic.
    public interface SourceReceiver extends Service {
        void receive(ISource source);

        DispatcherDetectorListener getDispatcherDetectorListener();
    }

    public class SourceReceiverImpl implements SourceReceiver {
        @Getter
        private final DispatcherManager dispatcherManager;

        public SourceReceiverImpl() {
            this.dispatcherManager = new DispatcherManager();
        }

        @Override
        public void receive(ISource source) {
            dispatcherManager.forward(source);
        }

        @Override
        public DispatcherDetectorListener getDispatcherDetectorListener() {
            return getDispatcherManager();
        }

        public void scan() throws IOException, InstantiationException, IllegalAccessException {
            dispatcherManager.scan();
        }
    }
DispatcherManager forwards each source to the SourceDispatcher instances registered for that source's scope.
    private Map<Integer, List<SourceDispatcher>> dispatcherMap;

    public DispatcherManager() {
        this.dispatcherMap = new HashMap<>();
    }

    public void forward(ISource source) {
        if (source == null) {
            return;
        }

        List<SourceDispatcher> dispatchers = dispatcherMap.get(source.scope());

        if (dispatchers != null) {
            source.prepare();
            for (SourceDispatcher dispatcher : dispatchers) {
                dispatcher.dispatch(source);
            }
        }
    }
SourceDispatcher has a large number of implementation classes, and each one decides where its source is actually forwarded.
    public interface SourceDispatcher<SOURCE extends ISource> {
        void dispatch(SOURCE source);
    }
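The scope-keyed fan-out can be modelled in a few lines. The sketch below is a simplified illustration under stated assumptions: Source, Dispatcher and ScopeRouter are hypothetical stand-ins for ISource, SourceDispatcher and DispatcherManager, and the register method is invented for the example because the article does not show how scan() fills the map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for ISource and SourceDispatcher.
interface Source {
    int scope();
}

interface Dispatcher {
    void dispatch(Source source);
}

class ScopeRouter {
    private final Map<Integer, List<Dispatcher>> dispatcherMap = new HashMap<>();

    // Assumed registration step, invented for this sketch.
    void register(int scope, Dispatcher dispatcher) {
        dispatcherMap.computeIfAbsent(scope, k -> new ArrayList<>()).add(dispatcher);
    }

    // Same shape as forward(): look up by scope, then fan out.
    // Returns how many dispatchers were invoked.
    int forward(Source source) {
        if (source == null) {
            return 0;
        }
        List<Dispatcher> dispatchers = dispatcherMap.get(source.scope());
        if (dispatchers == null) {
            return 0; // no dispatcher for this scope: the source is silently dropped
        }
        for (Dispatcher dispatcher : dispatchers) {
            dispatcher.dispatch(source);
        }
        return dispatchers.size();
    }
}

class ScopeRouterDemo {
    public static void main(String[] args) {
        ScopeRouter router = new ScopeRouter();
        List<String> seen = new ArrayList<>();
        router.register(1, s -> seen.add("a:" + s.scope()));
        router.register(1, s -> seen.add("b:" + s.scope()));
        router.forward(() -> 1); // reaches both dispatchers
        router.forward(() -> 2); // no dispatcher registered for scope 2
        System.out.println(seen); // prints [a:1, b:1]
    }
}
```

One source can reach several dispatchers, and a source whose scope has no dispatcher goes nowhere, which matches the null check in forward above.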
Here we focus on SegmentDispatcher.
    public class SegmentDispatcher implements SourceDispatcher<Segment> {

        @Override
        public void dispatch(Segment source) {
            SegmentRecord segment = new SegmentRecord();
            segment.setSegmentId(source.getSegmentId());
            segment.setTraceId(source.getTraceId());
            segment.setServiceId(source.getServiceId());
            segment.setServiceInstanceId(source.getServiceInstanceId());
            segment.setEndpointName(source.getEndpointName());
            segment.setEndpointId(source.getEndpointId());
            segment.setStartTime(source.getStartTime());
            segment.setEndTime(source.getEndTime());
            segment.setLatency(source.getLatency());
            segment.setIsError(source.getIsError());
            segment.setDataBinary(source.getDataBinary());
            segment.setTimeBucket(source.getTimeBucket());
            segment.setVersion(source.getVersion());
            segment.setTagsRawData(source.getTags());
            segment.setTags(Tag.Util.toStringList(source.getTags()));

            RecordStreamProcessor.getInstance().in(segment);
        }
    }
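RecordStreamProcessor is yet another delegate. It looks up the RecordPersistentWorker registered for the record's class and passes the record on to it; the worker then prepares an insert request and submits it through the batch DAO.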
    @Override
    public void in(Record record) {
        RecordPersistentWorker worker = workers.get(record.getClass());
        if (worker != null) {
            worker.in(record);
        }
    }

    public class RecordPersistentWorker extends AbstractWorker<Record> {
        private static final Logger LOGGER = LoggerFactory.getLogger(RecordPersistentWorker.class);

        private final Model model;
        private final IRecordDAO recordDAO;
        private final IBatchDAO batchDAO;

        RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO) {
            super(moduleDefineHolder);
            this.model = model;
            this.recordDAO = recordDAO;
            this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
        }

        @Override
        public void in(Record record) {
            try {
                InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
                batchDAO.insert(insertRequest);
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
    }
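One detail of the lookup is worth spelling out: a map keyed by record.getClass() matches only the exact runtime class, not its subclasses. The sketch below illustrates that behavior with hypothetical stand-ins (Rec, SegmentRec, SpecialSegmentRec, Worker, Processor); the create method is an assumption made for the example, not the SkyWalking API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Record and its subclasses.
abstract class Rec {}
class SegmentRec extends Rec {}
class SpecialSegmentRec extends SegmentRec {}

// Stand-in for a persistent worker; it only remembers what it received.
class Worker {
    final List<Rec> received = new ArrayList<>();

    void in(Rec record) {
        received.add(record);
    }
}

class Processor {
    private final Map<Class<? extends Rec>, Worker> workers = new HashMap<>();

    // Assumed registration step, invented for this sketch.
    void create(Class<? extends Rec> type, Worker worker) {
        workers.put(type, worker);
    }

    // Same shape as in(): exact-class lookup, then delegate.
    // Returns false when no worker is registered for the record's class.
    boolean in(Rec record) {
        Worker worker = workers.get(record.getClass());
        if (worker == null) {
            return false;
        }
        worker.in(record);
        return true;
    }
}

class ProcessorDemo {
    public static void main(String[] args) {
        Processor processor = new Processor();
        processor.create(SegmentRec.class, new Worker());
        System.out.println(processor.in(new SegmentRec()));        // prints true
        System.out.println(processor.in(new SpecialSegmentRec())); // prints false
    }
}
```

In this model a subclass of a registered record type finds no worker, so each concrete record class needs its own registration.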
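Summary
The code relies heavily on delegation: each layer hands the work to the next one, and there are many abstractions along the way.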
TraceSegmentReportServiceHandler: receives the data
SegmentParserServiceImpl: parsing
TraceAnalyzer: parsing
MultiScopesAnalysisListener: parsing
SourceReceiver: forwarding
DispatcherManager: forwarding
SourceDispatcher: forwarding
RecordStreamProcessor: finally persists through the DAO
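As a compact illustration of this layering, the sketch below chains eight stages so that each one knows only the next. It is a toy model: the stage names are plain labels echoing the classes above, and Pipeline and stage are hypothetical helpers, not SkyWalking code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class Pipeline {
    // Wrap the next stage: record that this stage ran, then delegate.
    static Consumer<String> stage(String name, List<String> trace, Consumer<String> next) {
        return segment -> {
            trace.add(name);
            next.accept(segment);
        };
    }

    // Build the chain from the last stage backwards, then push one segment through it.
    static List<String> run(String segment) {
        List<String> trace = new ArrayList<>();
        Consumer<String> dao = s -> trace.add("dao:" + s);
        Consumer<String> processor = stage("processor", trace, dao);
        Consumer<String> dispatcher = stage("dispatcher", trace, processor);
        Consumer<String> manager = stage("manager", trace, dispatcher);
        Consumer<String> receiver = stage("receiver", trace, manager);
        Consumer<String> listener = stage("listener", trace, receiver);
        Consumer<String> analyzer = stage("analyzer", trace, listener);
        Consumer<String> parser = stage("parser", trace, analyzer);
        Consumer<String> handler = stage("handler", trace, parser);
        handler.accept(segment);
        return trace;
    }

    public static void main(String[] args) {
        // prints [handler, parser, analyzer, listener, receiver, manager, dispatcher, processor, dao:seg-1]
        System.out.println(run("seg-1"));
    }
}
```

Only the last stage touches storage; every other stage does a small amount of work and passes the segment along, which is the pattern the list above describes.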