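SkyWalking OAP data reception

Code references are based on v8.7.0.

Starting from GRPCHandler, we look at how SkyWalking collects data. GRPCHandler is only a marker interface; it declares no methods and has no behavior of its own.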
```java
public interface GRPCHandler extends ServerHandler {
}
```
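To illustrate what a marker interface buys you, here is a minimal sketch. All names in it (`Handler`, `GrpcMarker`, `TraceHandler`, `HttpOnlyHandler`, `HandlerRegistry`) are hypothetical stand-ins, not SkyWalking classes; the point is only that an empty interface lets a server pick out one family of handlers by type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are NOT SkyWalking types.
interface Handler {}

// Marker interface: declares no methods, it only tags a handler as gRPC-capable.
interface GrpcMarker extends Handler {}

class TraceHandler implements GrpcMarker {}

class HttpOnlyHandler implements Handler {}

class HandlerRegistry {
    // Keeps only the handlers tagged with the marker interface.
    static List<GrpcMarker> grpcHandlers(List<Handler> all) {
        List<GrpcMarker> result = new ArrayList<>();
        for (Handler h : all) {
            if (h instanceof GrpcMarker) {
                result.add((GrpcMarker) h);
            }
        }
        return result;
    }
}
```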
 
 
It has many implementations (receivers), each handling a different kind of reported data.
Receivers for monitoring data
 
TraceSegmentReportServiceHandler

TraceSegmentReportServiceHandler is a gRPC service class that extends the relevant gRPC base classes. Its collect method receives each SegmentObject and passes it on for parsing.
```java
@Override
public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
    return new StreamObserver<SegmentObject>() {
        @Override
        public void onNext(SegmentObject segment) {
            if (log.isDebugEnabled()) {
                log.debug("received segment in streaming");
            }

            HistogramMetrics.Timer timer = histogram.createTimer();
            try {
                // Hand the segment to the parser service.
                segmentParserService.send(segment);
            } catch (Exception e) {
                errorCounter.inc();
                log.error(e.getMessage(), e);
            } finally {
                timer.finish();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            log.error(throwable.getMessage(), throwable);
            responseObserver.onCompleted();
        }

        @Override
        public void onCompleted() {
            responseObserver.onNext(Commands.newBuilder().build());
            responseObserver.onCompleted();
        }
    };
}
```
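The handler above follows the client-streaming shape: many requests arrive through onNext, and a single reply is sent when the client completes. The sketch below reproduces that shape without the gRPC library. The local `StreamObserver` interface is a stand-in that only mirrors the three callbacks of `io.grpc.stub.StreamObserver`; `CollectSketch`, the `String` payloads, the `"ack"` reply and the `Consumer` parser are assumptions made for illustration.

```java
import java.util.function.Consumer;

// Local stand-in with the same three callbacks as io.grpc.stub.StreamObserver,
// so the sketch compiles without the gRPC library.
interface StreamObserver<T> {
    void onNext(T value);

    void onError(Throwable t);

    void onCompleted();
}

class CollectSketch {
    int errorCount = 0;                     // stands in for errorCounter
    private final Consumer<String> parser;  // stands in for segmentParserService

    CollectSketch(Consumer<String> parser) {
        this.parser = parser;
    }

    // Client streaming: many requests in, one reply when the client completes.
    StreamObserver<String> collect(StreamObserver<String> responseObserver) {
        return new StreamObserver<String>() {
            @Override
            public void onNext(String segment) {
                try {
                    parser.accept(segment);
                } catch (Exception e) {
                    errorCount++;           // a bad segment is counted, the stream continues
                }
            }

            @Override
            public void onError(Throwable t) {
                responseObserver.onCompleted();
            }

            @Override
            public void onCompleted() {
                responseObserver.onNext("ack");   // hypothetical reply payload
                responseObserver.onCompleted();
            }
        };
    }
}
```

Catching the exception inside onNext is the design choice worth noting: one malformed segment increments a counter instead of terminating the whole stream.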
 
SegmentParserServiceImpl is responsible for parsing the SegmentObject, but it mostly delegates: the real logic runs in TraceAnalyzer.
```java
@RequiredArgsConstructor
public class SegmentParserServiceImpl implements ISegmentParserService {
    private final ModuleManager moduleManager;
    private final AnalyzerModuleConfig config;
    @Setter
    private SegmentParserListenerManager listenerManager;

    @Override
    public void send(SegmentObject segment) {
        final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
        // Delegate the actual analysis to TraceAnalyzer.
        traceAnalyzer.doAnalysis(segment);
    }
}
```
 
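TraceAnalyzer inspects the type of each span. There are three main types (Entry, Exit and Local), and each type is routed to its own listener callback so the spans are collected separately.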
```java
public void doAnalysis(SegmentObject segmentObject) {
    if (segmentObject.getSpansList().size() == 0) {
        return;
    }

    createSpanListeners();

    notifySegmentListener(segmentObject);

    segmentObject.getSpansList().forEach(spanObject -> {
        if (spanObject.getSpanId() == 0) {
            notifyFirstListener(spanObject, segmentObject);
        }

        if (SpanType.Exit.equals(spanObject.getSpanType())) {
            notifyExitListener(spanObject, segmentObject);
        } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
            notifyEntryListener(spanObject, segmentObject);
        } else if (SpanType.Local.equals(spanObject.getSpanType())) {
            notifyLocalListener(spanObject, segmentObject);
        } else {
            log.error("span type value was unexpected, span type name: {}",
                      spanObject.getSpanType().name());
        }
    });

    notifyListenerToBuild();
}
```
 
```java
private void notifyListenerToBuild() {
    analysisListeners.forEach(AnalysisListener::build);
}
```
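The routing in doAnalysis can be reduced to a small, self-contained sketch. The types below (`SpanKind`, `Span`, `CollectingListener`, `AnalyzerSketch`) are simplified, hypothetical stand-ins rather than SkyWalking classes; they only show the order of events: the span with id 0 gets an extra "first" notification, every span is routed by type, and build runs once at the end.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified types for illustration only.
enum SpanKind { ENTRY, EXIT, LOCAL }

class Span {
    final int id;
    final SpanKind kind;

    Span(int id, SpanKind kind) {
        this.id = id;
        this.kind = kind;
    }
}

class CollectingListener {
    final List<Span> entries = new ArrayList<>();
    final List<Span> exits = new ArrayList<>();
    final List<Span> locals = new ArrayList<>();
    Span first;      // the span whose id is 0
    boolean built;

    void onFirst(Span s) { first = s; }
    void onEntry(Span s) { entries.add(s); }
    void onExit(Span s) { exits.add(s); }
    void onLocal(Span s) { locals.add(s); }
    void build() { built = true; }
}

class AnalyzerSketch {
    static void analyze(List<Span> spans, CollectingListener listener) {
        if (spans.isEmpty()) {
            return;                       // nothing to analyze, build() is never called
        }
        for (Span s : spans) {
            if (s.id == 0) {
                listener.onFirst(s);      // notified in addition to the type callback
            }
            switch (s.kind) {
                case EXIT:
                    listener.onExit(s);
                    break;
                case ENTRY:
                    listener.onEntry(s);
                    break;
                case LOCAL:
                    listener.onLocal(s);
                    break;
            }
        }
        listener.build();                 // runs once, after every span was routed
    }
}
```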
 
MultiScopesAnalysisListener

Following AnalysisListener::build leads to MultiScopesAnalysisListener. Its job is to classify the spans and hand the resulting sources to sourceReceiver.
```java
@Override
public void build() {
    entrySourceBuilders.forEach(entrySourceBuilder -> {
        entrySourceBuilder.prepare();
        sourceReceiver.receive(entrySourceBuilder.toAll());
        sourceReceiver.receive(entrySourceBuilder.toService());
        sourceReceiver.receive(entrySourceBuilder.toServiceInstance());
        sourceReceiver.receive(entrySourceBuilder.toEndpoint());
        sourceReceiver.receive(entrySourceBuilder.toServiceRelation());
        sourceReceiver.receive(entrySourceBuilder.toServiceInstanceRelation());
        EndpointRelation endpointRelation = entrySourceBuilder.toEndpointRelation();
        // Forward the endpoint relation only when one was built.
        if (endpointRelation != null) {
            sourceReceiver.receive(endpointRelation);
        }
    });

    exitSourceBuilders.forEach(exitSourceBuilder -> {
        exitSourceBuilder.prepare();
        sourceReceiver.receive(exitSourceBuilder.toServiceRelation());

        // Forward the instance relation only when one was built.
        final ServiceInstanceRelation serviceInstanceRelation = exitSourceBuilder.toServiceInstanceRelation();
        if (serviceInstanceRelation != null) {
            sourceReceiver.receive(serviceInstanceRelation);
        }
        if (RequestType.DATABASE.equals(exitSourceBuilder.getType())) {
            sourceReceiver.receive(exitSourceBuilder.toServiceMeta());
            sourceReceiver.receive(exitSourceBuilder.toDatabaseAccess());
        }
    });

    dbSlowStatementBuilders.forEach(dbSlowStatBuilder -> {
        dbSlowStatBuilder.prepare();
        sourceReceiver.receive(dbSlowStatBuilder.toDatabaseSlowStatement());
    });

    logicEndpointBuilders.forEach(logicEndpointBuilder -> {
        logicEndpointBuilder.prepare();
        sourceReceiver.receive(logicEndpointBuilder.toEndpoint());
    });
}
```
 
SourceReceiver collects and forwards the monitoring data. Its main implementation is SourceReceiverImpl, which contains the forwarding logic.
```java
public interface SourceReceiver extends Service {
    void receive(ISource source);

    DispatcherDetectorListener getDispatcherDetectorListener();
}
```
 
```java
public class SourceReceiverImpl implements SourceReceiver {
    @Getter
    private final DispatcherManager dispatcherManager;

    public SourceReceiverImpl() {
        this.dispatcherManager = new DispatcherManager();
    }

    @Override
    public void receive(ISource source) {
        // Forward the source to the dispatcher manager.
        dispatcherManager.forward(source);
    }

    @Override
    public DispatcherDetectorListener getDispatcherDetectorListener() {
        return getDispatcherManager();
    }

    public void scan() throws IOException, InstantiationException, IllegalAccessException {
        dispatcherManager.scan();
    }
}
```
 
DispatcherManager forwards each source to the SourceDispatcher instances registered for its scope.
```java
private Map<Integer, List<SourceDispatcher>> dispatcherMap;

public DispatcherManager() {
    this.dispatcherMap = new HashMap<>();
}

public void forward(ISource source) {
    if (source == null) {
        return;
    }

    // Look up the dispatchers registered for this source's scope.
    List<SourceDispatcher> dispatchers = dispatcherMap.get(source.scope());

    // A scope without dispatchers is skipped.
    if (dispatchers != null) {
        source.prepare();
        for (SourceDispatcher dispatcher : dispatchers) {
            // Hand the source to every dispatcher of that scope.
            dispatcher.dispatch(source);
        }
    }
}
```
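The forward method above is a scope-keyed fan-out: one map lookup, then every dispatcher of that scope receives the source. A minimal sketch of that idea follows. `Source`, `Dispatcher` and `ForwardingSketch` are hypothetical stand-ins; the explicit `register` method and the `int` return value of `forward` are assumptions added so the behavior is observable, since the code shown above fills the map elsewhere and returns nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for ISource and SourceDispatcher.
interface Source {
    int scope();
}

interface Dispatcher {
    void dispatch(Source source);
}

class ForwardingSketch {
    private final Map<Integer, List<Dispatcher>> dispatcherMap = new HashMap<>();

    // Assumed registration API for the sketch.
    void register(int scope, Dispatcher dispatcher) {
        dispatcherMap.computeIfAbsent(scope, k -> new ArrayList<>()).add(dispatcher);
    }

    // Returns how many dispatchers received the source (return value added for the sketch).
    int forward(Source source) {
        if (source == null) {
            return 0;
        }
        List<Dispatcher> dispatchers = dispatcherMap.get(source.scope());
        if (dispatchers == null) {
            return 0;                 // unknown scope: the source is dropped
        }
        for (Dispatcher dispatcher : dispatchers) {
            dispatcher.dispatch(source);
        }
        return dispatchers.size();
    }
}
```

Note that a source whose scope has no registered dispatcher is dropped without an error, both in this sketch and in the forward method shown above.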
 
SourceDispatcher has a large number of implementations; each one decides where its source is forwarded.
```java
public interface SourceDispatcher<SOURCE extends ISource> {
    void dispatch(SOURCE source);
}
```
 
 
We focus on SegmentDispatcher.
```java
public class SegmentDispatcher implements SourceDispatcher<Segment> {

    @Override
    public void dispatch(Segment source) {
        SegmentRecord segment = new SegmentRecord();
        segment.setSegmentId(source.getSegmentId());
        segment.setTraceId(source.getTraceId());
        segment.setServiceId(source.getServiceId());
        segment.setServiceInstanceId(source.getServiceInstanceId());
        segment.setEndpointName(source.getEndpointName());
        segment.setEndpointId(source.getEndpointId());
        segment.setStartTime(source.getStartTime());
        segment.setEndTime(source.getEndTime());
        segment.setLatency(source.getLatency());
        segment.setIsError(source.getIsError());
        segment.setDataBinary(source.getDataBinary());
        segment.setTimeBucket(source.getTimeBucket());
        segment.setVersion(source.getVersion());
        segment.setTagsRawData(source.getTags());
        segment.setTags(Tag.Util.toStringList(source.getTags()));

        RecordStreamProcessor.getInstance().in(segment);
    }
}
```
 
RecordStreamProcessor is again a proxy: internally it forwards the record to a RecordPersistentWorker.
```java
@Override
public void in(Record record) {
    // Pick the worker registered for this record's class.
    RecordPersistentWorker worker = workers.get(record.getClass());
    if (worker != null) {
        worker.in(record);
    }
}
```
 
```java
public class RecordPersistentWorker extends AbstractWorker<Record> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RecordPersistentWorker.class);

    private final Model model;
    private final IRecordDAO recordDAO;
    private final IBatchDAO batchDAO;

    RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO) {
        super(moduleDefineHolder);
        this.model = model;
        this.recordDAO = recordDAO;
        this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
    }

    @Override
    public void in(Record record) {
        try {
            InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
            // Hand the prepared insert request to the batch DAO.
            batchDAO.insert(insertRequest);
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
```
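The last hop selects a worker by the record's class and lets that worker persist the record. The sketch below shows the lookup under simplified assumptions: `Rec`, `SegmentRec`, `LogRec`, `Worker` and `ProcessorSketch` are hypothetical stand-ins, and the `create` registration method is an assumption for the sketch rather than the real processor's signature.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Record and its subclasses.
abstract class Rec {}

class SegmentRec extends Rec {
    final String id;

    SegmentRec(String id) {
        this.id = id;
    }
}

class LogRec extends Rec {}

// Stand-in for a persistent worker.
interface Worker {
    void in(Rec record);
}

class ProcessorSketch {
    private final Map<Class<? extends Rec>, Worker> workers = new HashMap<>();

    // Assumed registration API for the sketch.
    void create(Class<? extends Rec> type, Worker worker) {
        workers.put(type, worker);
    }

    void in(Rec record) {
        // Exact-class lookup: the runtime class of the record is the key.
        Worker worker = workers.get(record.getClass());
        if (worker != null) {
            worker.in(record);
        }
    }
}
```

Because the key is `record.getClass()`, the match in this sketch is exact: a record whose class has no registered worker is skipped silently, and an instance of a subclass would not reach a worker registered for its parent class.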
 
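Summary

The code leans heavily on delegation: each layer hands the work to the next one, and there are many abstractions along the way.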
TraceSegmentReportServiceHandler receives the data
SegmentParserServiceImpl parses
TraceAnalyzer parses
MultiScopesAnalysisListener parses
SourceReceiver forwards
DispatcherManager forwards
SourceDispatcher forwards
RecordStreamProcessor finally persists the record through the DAO