

          Spring Source Code: How BeanFactoryPostProcessor Is Executed (6)

          The empty postProcessBeanFactory method provided by AbstractApplicationContext

          Although this postProcessBeanFactory method shares its name with the method on the BeanFactoryPostProcessor interface, it is a different thing: it is an empty hook left for subclasses to add extra behavior, such as registering BeanPostProcessor implementations or any other customization. Since the method hands you the BeanFactory, you can tailor it however you need. A minimal sketch of such a subclass follows.
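
          As an illustration, here is a minimal sketch of a context subclass using this hook; the class name MyApplicationContext and the anonymous BeanPostProcessor are just for demonstration:

          import org.springframework.beans.BeansException;
          import org.springframework.beans.factory.config.BeanPostProcessor;
          import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
          import org.springframework.context.annotation.AnnotationConfigApplicationContext;

          // Hypothetical subclass: postProcessBeanFactory is protected in AbstractApplicationContext,
          // so any concrete context subclass may override it to customize the BeanFactory.
          public class MyApplicationContext extends AnnotationConfigApplicationContext {

              @Override
              protected void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
                  // The BeanFactory is fully built here but no singletons have been created yet,
                  // which makes it a good place to add a BeanPostProcessor.
                  beanFactory.addBeanPostProcessor(new BeanPostProcessor() {
                      @Override
                      public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                          System.out.println("initialized bean: " + beanName);
                          return bean;
                      }
                  });
              }
          }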

          Here is how GenericWebApplicationContext, a subclass provided by Spring, implements it:

          @Override
          protected void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
            if (this.servletContext != null) {
              beanFactory.addBeanPostProcessor(new ServletContextAwareProcessor(this.servletContext));
              beanFactory.ignoreDependencyInterface(ServletContextAware.class);
            }
            WebApplicationContextUtils.registerWebApplicationScopes(beanFactory, this.servletContext);
            WebApplicationContextUtils.registerEnvironmentBeans(beanFactory, this.servletContext);
          }
          

          Here it registers a ServletContextAwareProcessor with the beanFactory; ServletContextAwareProcessor is an implementation of the BeanPostProcessor interface.

          The main event: BeanFactoryPostProcessor

          Next, let's analyze the invokeBeanFactoryPostProcessors method in AbstractApplicationContext#refresh, which registers and executes the BeanFactoryPostProcessors.

          Straight to the source:

          protected void invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory beanFactory) {
            // Execute all BeanFactoryPostProcessors
            PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());
          
            // Detect a LoadTimeWeaver and prepare for weaving, if found in the meantime
            // (e.g. through an @Bean method registered by ConfigurationClassPostProcessor)
            // LoadTimeWeaver handling (used e.g. for load-time weaving AOP)
            if (beanFactory.getTempClassLoader() == null && beanFactory.containsBean(LOAD_TIME_WEAVER_BEAN_NAME)) {
              beanFactory.addBeanPostProcessor(new LoadTimeWeaverAwareProcessor(beanFactory));
              beanFactory.setTempClassLoader(new ContextTypeMatchClassLoader(beanFactory.getBeanClassLoader()));
            }
          }
          

          The key is this line:

          PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());
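
          The second argument, getBeanFactoryPostProcessors(), returns the processors registered programmatically on the context. A minimal sketch of how such an "external" BFPP gets added (the anonymous processor below is just for illustration):

          import org.springframework.beans.BeansException;
          import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
          import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
          import org.springframework.context.annotation.AnnotationConfigApplicationContext;

          public class ExternalBfppDemo {
              public static void main(String[] args) {
                  AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
                  // Added to the context itself, not as a bean definition, so it ends up
                  // in the list returned by getBeanFactoryPostProcessors().
                  context.addBeanFactoryPostProcessor(new BeanFactoryPostProcessor() {
                      @Override
                      public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
                          System.out.println("bean definitions so far: " + beanFactory.getBeanDefinitionCount());
                      }
                  });
                  context.refresh();
              }
          }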
          

          First it takes the collection of BeanFactoryPostProcessors; these are the ones added programmatically when customizing the BeanFactory (for example via addBeanFactoryPostProcessor). Now step into the method:

          public static void invokeBeanFactoryPostProcessors(
            ConfigurableListableBeanFactory beanFactory, List<BeanFactoryPostProcessor> beanFactoryPostProcessors) {
          
            // Invoke BeanDefinitionRegistryPostProcessors first, if any.
            // Beans that have already been processed
            Set<String> processedBeans = new HashSet<>();
            // First handle the externally supplied BFPPs; also check whether the current factory is a BeanDefinitionRegistry
            if (beanFactory instanceof BeanDefinitionRegistry) {
              BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
              // Holds the plain BFPPs
              List<BeanFactoryPostProcessor> regularPostProcessors = new ArrayList<>();
              // Holds the BDRPPs
              List<BeanDefinitionRegistryPostProcessor> registryProcessors = new ArrayList<>();
              // Process the externally supplied BFPPs
              for (BeanFactoryPostProcessor postProcessor : beanFactoryPostProcessors) {
                // Handle BDRPPs first
                if (postProcessor instanceof BeanDefinitionRegistryPostProcessor) {
                  BeanDefinitionRegistryPostProcessor registryProcessor =
                    (BeanDefinitionRegistryPostProcessor) postProcessor;
                  // Invoke the BDRPP callback right away; its postProcessBeanFactory method is handled later together with the others
                  registryProcessor.postProcessBeanDefinitionRegistry(registry);
                  // Add it to the BDRPP (registry processor) collection
                  registryProcessors.add(registryProcessor);
                }
                else {
                  // Add it to the regular BFPP collection
                  regularPostProcessors.add(postProcessor);
                }
              }
          
              // Do not initialize FactoryBeans here: We need to leave all regular beans
              // uninitialized to let the bean factory post-processors apply to them!
              // Separate between BeanDefinitionRegistryPostProcessors that implement
              // PriorityOrdered, Ordered, and the rest.
              // Holds the BDRPPs of the current phase
              List<BeanDefinitionRegistryPostProcessor> currentRegistryProcessors = new ArrayList<>();
          
              // First, invoke the BeanDefinitionRegistryPostProcessors that implement PriorityOrdered.
              // Get the bean names by type
              String[] postProcessorNames =
                beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
              for (String ppName : postProcessorNames) {
                // Check whether this bean implements PriorityOrdered
                if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
                  // Add it to the current batch of BDRPPs
                  currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
                  // Mark it as processed
                  processedBeans.add(ppName);
                }
              }
              // Sort the current batch of BDRPPs
              sortPostProcessors(currentRegistryProcessors, beanFactory);
              // Add the current batch to the overall registryProcessors list defined above
              registryProcessors.addAll(currentRegistryProcessors);
              // Invoke postProcessBeanDefinitionRegistry on the current batch
              invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
              // Clear the current batch
              currentRegistryProcessors.clear();
          
              // Next, invoke the BeanDefinitionRegistryPostProcessors that implement Ordered.
              // Fetch the BDRPP names again, because the invocations above may have registered new ones
              postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
              for (String ppName : postProcessorNames) {
                // Skip anything already processed; pick the ones implementing Ordered
                if (!processedBeans.contains(ppName) && beanFactory.isTypeMatch(ppName, Ordered.class)) {
                  // Add it to the current batch of BDRPPs
                  currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
                  // Mark it as processed
                  processedBeans.add(ppName);
                }
              }
              // Sort
              sortPostProcessors(currentRegistryProcessors, beanFactory);
              // Add the batch to the overall registryProcessors list
              registryProcessors.addAll(currentRegistryProcessors);
              // Invoke postProcessBeanDefinitionRegistry on the batch
              invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
              // Clear the current batch
              currentRegistryProcessors.clear();
          
              // Finally, invoke all other BeanDefinitionRegistryPostProcessors until no further ones appear.
              boolean reiterate = true;
              // Keep fetching, sorting and invoking BDRPPs until no new ones appear
              while (reiterate) {
                reiterate = false;
                // Fetch the BDRPP names
                postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
                for (String ppName : postProcessorNames) {
                  // Anything not processed yet is picked up here; finding one means another iteration is needed
                  if (!processedBeans.contains(ppName)) {
                    currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
                    processedBeans.add(ppName);
                    reiterate = true;
                  }
                }
                // Sort
                sortPostProcessors(currentRegistryProcessors, beanFactory);
                // Add the batch to the overall registryProcessors list
                registryProcessors.addAll(currentRegistryProcessors);
                // Invoke postProcessBeanDefinitionRegistry on the batch
                invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
                currentRegistryProcessors.clear();
              }
          
              // Now, invoke the postProcessBeanFactory callback of all processors handled so far.
              // Invoke postProcessBeanFactory on all BDRPPs collected above
              invokeBeanFactoryPostProcessors(registryProcessors, beanFactory);
              // Invoke postProcessBeanFactory on the externally supplied regular BFPPs
              invokeBeanFactoryPostProcessors(regularPostProcessors, beanFactory);
            }
          
            else {
              // The factory is not a BeanDefinitionRegistry, so just invoke postProcessBeanFactory on the supplied BFPPs
              // Invoke factory processors registered with the context instance.
              invokeBeanFactoryPostProcessors(beanFactoryPostProcessors, beanFactory);
            }
          
            // Do not initialize FactoryBeans here: We need to leave all regular beans
            // uninitialized to let the bean factory post-processors apply to them!
            // Get the bean names of all BeanFactoryPostProcessors registered as bean definitions
            String[] postProcessorNames =
              beanFactory.getBeanNamesForType(BeanFactoryPostProcessor.class, true, false);
          
            // Separate between BeanFactoryPostProcessors that implement PriorityOrdered,
            // Ordered, and the rest.
            // BFPPs implementing PriorityOrdered
            List<BeanFactoryPostProcessor> priorityOrderedPostProcessors = new ArrayList<>();
            // BFPPs implementing Ordered
            //		List<String> orderedPostProcessorNames = new ArrayList<>();
            List<BeanFactoryPostProcessor> orderedPostProcessors = new ArrayList<>();
            // BFPPs with no ordering
            //		List<String> nonOrderedPostProcessorNames = new ArrayList<>();
            List<BeanFactoryPostProcessor> nonOrderedPostProcessors = new ArrayList<>();
            for (String ppName : postProcessorNames) {
              // Skip beans already handled in the registry phase above
              if (processedBeans.contains(ppName)) {
                // skip - already processed in first phase above
              }
              else if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
                priorityOrderedPostProcessors.add(beanFactory.getBean(ppName, BeanFactoryPostProcessor.class));
              }
              else if (beanFactory.isTypeMatch(ppName, Ordered.class)) {
                //				orderedPostProcessorNames.add(ppName);
                orderedPostProcessors.add(beanFactory.getBean(ppName,BeanFactoryPostProcessor.class));
              }
              else {
                //				nonOrderedPostProcessorNames.add(ppName);
                nonOrderedPostProcessors.add(beanFactory.getBean(ppName,BeanFactoryPostProcessor.class));
              }
            }
          
            // First, invoke the BeanFactoryPostProcessors that implement PriorityOrdered.
            // Sort
            sortPostProcessors(priorityOrderedPostProcessors, beanFactory);
            // Invoke the BFPPs implementing PriorityOrdered first
            invokeBeanFactoryPostProcessors(priorityOrderedPostProcessors, beanFactory);
          
            // Next, invoke the BeanFactoryPostProcessors that implement Ordered.
            // In the original Spring code the Ordered BFPP names are resolved into beans here, then sorted and invoked;
            // that step was merged into the loop above by adding the beans straight into orderedPostProcessors
            //		List<BeanFactoryPostProcessor> orderedPostProcessors = new ArrayList<>(orderedPostProcessorNames.size());
            //		for (String postProcessorName : orderedPostProcessorNames) {
            //			orderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
            //		}
            sortPostProcessors(orderedPostProcessors, beanFactory);
            invokeBeanFactoryPostProcessors(orderedPostProcessors, beanFactory);
          
            // Finally, invoke all other BeanFactoryPostProcessors.
            // Finally, handle the ones with no ordering
            //		List<BeanFactoryPostProcessor> nonOrderedPostProcessors = new ArrayList<>(nonOrderedPostProcessorNames.size());
            //		for (String postProcessorName : nonOrderedPostProcessorNames) {
            //			nonOrderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
            //		}
            invokeBeanFactoryPostProcessors(nonOrderedPostProcessors, beanFactory);
          
            // Clear cached merged bean definitions since the post-processors might have
            // modified the original metadata, e.g. replacing placeholders in values...
            // Clear the cached merged bean definitions, because the BFPPs may have modified them, e.g. by resolving placeholder values
            beanFactory.clearMetadataCache();
          }
          

          This method is quite long, but it really only does the following:

          • Execute the externally supplied BeanFactoryPostProcessor implementations first
          • Among those, handle implementations of the sub-interface BeanDefinitionRegistryPostProcessor before plain BeanFactoryPostProcessors
          • When handling BeanDefinitionRegistryPostProcessors, process the ones implementing PriorityOrdered first
          • After the PriorityOrdered ones, process the ones implementing Ordered
          • After the Ordered ones, process the unordered ones
          • After all BeanDefinitionRegistryPostProcessors are done, process the BeanFactoryPostProcessor implementations
          • Again in the order PriorityOrdered, Ordered, then unordered

          That is the rough logic. It may be hard to grasp from the code alone, so picture it as a flow chart:

          The flow chart boils down to three parts: first iterate over and execute the externally supplied BFPPs, then the BDRPPs, then the BFPPs. Each part may sort its processors before executing them, in the order PriorityOrdered, Ordered, then unsorted.

          A word about BeanDefinitionRegistryPostProcessor: this interface extends BeanFactoryPostProcessor and adds a method called postProcessBeanDefinitionRegistry. That method is extremely important: the implementation class ConfigurationClassPostProcessor uses it to parse annotations, and that class is also the key to Spring Boot's auto-configuration. A minimal custom implementation is sketched below.
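
          A minimal sketch of a custom implementation (the class and bean names are made up for illustration): it registers an extra BeanDefinition in postProcessBeanDefinitionRegistry, and because it implements PriorityOrdered it is picked up in the first batch described above.

          import org.springframework.beans.BeansException;
          import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
          import org.springframework.beans.factory.support.BeanDefinitionRegistry;
          import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
          import org.springframework.beans.factory.support.RootBeanDefinition;
          import org.springframework.core.PriorityOrdered;
          import org.springframework.stereotype.Component;

          @Component
          public class MyRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor, PriorityOrdered {

              @Override
              public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
                  // Runs before regular BeanFactoryPostProcessors; new bean definitions can still be added here.
                  registry.registerBeanDefinition("myExtraBean", new RootBeanDefinition(MyExtraBean.class));
              }

              @Override
              public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
                  // Invoked later, together with all other BeanFactoryPostProcessors.
              }

              @Override
              public int getOrder() {
                  return 0;
              }

              // Hypothetical bean type registered above.
              public static class MyExtraBean {
              }
          }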

          So when does ConfigurationClassPostProcessor get added to the Spring container?

          When we start the container, Spring scans for BeanDefinitions. If annotation scanning is enabled in the XML configuration file:

          <context:component-scan base-package="com.redwinter.test"/>
          

          then several BeanDefinitions are added to the Spring container automatically, one of them under the beanName org.springframework.context.annotation.internalConfigurationAnnotationProcessor, along with a few other internal processors.

          An earlier article, https://www.cnblogs.com/redwinter/p/16165878.html, covered custom tags: when Spring parses XML it distinguishes between the default namespace and custom namespaces, and context is a custom namespace. The parser for this tag is ComponentScanBeanDefinitionParser, and its parse method carries the parsing logic:

          @Override
          @Nullable
          public BeanDefinition parse(Element element, ParserContext parserContext) {
            String basePackage = element.getAttribute(BASE_PACKAGE_ATTRIBUTE);
            basePackage = parserContext.getReaderContext().getEnvironment().resolvePlaceholders(basePackage);
            String[] basePackages = StringUtils.tokenizeToStringArray(basePackage,
                                                                      ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS);
            // Actually scan for bean definitions and register them.
            // Configure the scanner
            ClassPathBeanDefinitionScanner scanner = configureScanner(parserContext, element);
            // Scan for BeanDefinitions under the given packages
            Set<BeanDefinitionHolder> beanDefinitions = scanner.doScan(basePackages);
            // Register the infrastructure components
            registerComponents(parserContext.getReaderContext(), beanDefinitions, element);
          
            return null;
          }
          

          The method's execution flow:

          • Create and configure a scanner
          • Scan the given packages for annotated classes and parse them into BeanDefinitions
          • Call registerComponents to register the infrastructure components

          registerComponents is where ConfigurationClassPostProcessor gets added. Since the method is long, only part of it is shown here:

          // ... part of the code omitted
          Set<BeanDefinitionHolder> beanDefs = new LinkedHashSet<>(8);
          // If the registry does not yet contain
          // org.springframework.context.annotation.internalConfigurationAnnotationProcessor,
          // add a ConfigurationClassPostProcessor BeanDefinition, used to parse annotations
          if (!registry.containsBeanDefinition(CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME)) {
            // Create a BeanDefinition for ConfigurationClassPostProcessor
            RootBeanDefinition def = new RootBeanDefinition(ConfigurationClassPostProcessor.class);
            def.setSource(source);
            // Register it under the beanName
            // org.springframework.context.annotation.internalConfigurationAnnotationProcessor
            beanDefs.add(registerPostProcessor(registry, def, CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME));
          }
          // Create an AutowiredAnnotationBeanPostProcessor BeanDefinition, used for autowiring
          if (!registry.containsBeanDefinition(AUTOWIRED_ANNOTATION_PROCESSOR_BEAN_NAME)) {
            RootBeanDefinition def = new RootBeanDefinition(AutowiredAnnotationBeanPostProcessor.class);
            def.setSource(source);
            beanDefs.add(registerPostProcessor(registry, def, AUTOWIRED_ANNOTATION_PROCESSOR_BEAN_NAME));
          }
          // ... part of the code omitted
          

          The source registers a bean under the name held in the constant CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME, which is org.springframework.context.annotation.internalConfigurationAnnotationProcessor, with class ConfigurationClassPostProcessor.

          So how are the annotations actually parsed? That is a long story, so it will be covered in the next post.


          If this post helped you, don't forget the triple combo: like, share, comment. See you next time.

          Bookmarking without liking is freeloading; a like is true love.





          Original post: https://www.cnblogs.com/redwinter/p/16196359.html


          If you have ever implemented WeChat or Alipay payments, you may have run into this: you have to supply a payment-result callback URL, which Alipay calls after the payment finishes to tell us whether the user paid successfully, so we can run the follow-up business logic. On a test server the callback must point at the test environment, and once deployed to production it must be changed to the production address.

          For that scenario, we usually go with something like the following, so the address is resolved dynamically and we don't have to change it by hand every time.

          public class PayTest {
          
              @Value("${spring.profiles.active}")
              private String environment;
          
              public Object notify(HttpServletRequest request) {
          
                  if ("prod".equals(environment)) {
                    // production environment
                  } else if ("test".equals(environment)) {
          
                    // test environment
                  }
                  return "SUCCESS";
              }
          }

          The code above looks perfectly fine, but as professional brick-movers we can't haul bricks like that; the posture is all wrong!

          The problems:

          Poor extensibility. If we need this value somewhere else, do we fetch it again with another @Value annotation? And if one day the leader decides the word test looks too low-rent and wants it renamed to dev, do we hunt down every test in the project? Fine if there are only a few, but with many of them we are in real trouble.

          So can we turn these configuration values into global static variables? Then we just reference them everywhere, and even if they do change, there is only one place to update.

          Watch out for this pitfall

          Some of you may be feeling confident: just slap a static modifier on the field, done. If that is really your plan, start packing. A static field annotated like that actually comes back as null; as for why, go review the loading order of classes and static fields versus when Spring performs injection. A sketch of the broken version follows.
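
          A minimal sketch of the broken version (the class name is made up for illustration); Spring only resolves @Value on instance fields of a managed bean, so the static field below is simply never populated:

          @Component
          public class BrokenConstant {

              // WRONG: @Value is not applied to static fields, so this stays null at runtime.
              @Value("${spring.profiles.active}")
              public static String environment;
          }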

          The @PostConstruct annotation

          Since I raised the problem, of course there is a fix; I'm not just toying with you.

          First of all, this annotation is provided by Java itself (javax.annotation). It marks a non-static void method, which runs when the server loads the Servlet (that is, after the bean's dependencies have been injected), and it runs only once.

          The rework:

          @Component
          public class SystemConstant {
          
              public static String surroundings;
          
              @Value("${spring.profiles.active}")
              public String environment;
          
              @PostConstruct
              public void initialize() {
                  System.out.println("Initializing environment...");
                  surroundings = this.environment;
              }
          }

          The result:

          We can see that the initialization runs when the project starts up.


          At this point we can tell whether we are running in the test or the production environment, which gives us the dynamic configuration we wanted; a usage sketch follows.
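
          For illustration, reading the value through the static field in the earlier PayTest example looks roughly like this (a sketch reusing the PayTest and SystemConstant classes from above):

          public class PayTest {

              public Object notify(HttpServletRequest request) {
                  // Read the environment from the global constant populated in @PostConstruct,
                  // instead of injecting @Value("${spring.profiles.active}") again.
                  if ("prod".equals(SystemConstant.surroundings)) {
                      // production logic
                  } else if ("test".equals(SystemConstant.surroundings)) {
                      // test logic
                  }
                  return "SUCCESS";
              }
          }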


          One last thing

          This annotation is useful for far more than that. Take the Redis utility class I wrote earlier: it uses RedisTemplate to talk to Redis, so its methods could not be static, and every caller had to inject the utility bean before using it. With this annotation that awkwardness disappears. The code looks like the following.
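
          The original snippet is not captured here; a minimal sketch of that pattern, assuming Spring Data Redis (class and method names are illustrative):

          import javax.annotation.PostConstruct;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.data.redis.core.RedisTemplate;
          import org.springframework.stereotype.Component;

          @Component
          public class RedisUtil {

              private static RedisTemplate<String, Object> redisTemplate;

              @Autowired
              private RedisTemplate<String, Object> template;

              // Copy the injected instance into the static field once the bean has been initialized,
              // so the static helpers below can be used without injecting RedisUtil anywhere.
              @PostConstruct
              public void init() {
                  redisTemplate = this.template;
              }

              public static void set(String key, Object value) {
                  redisTemplate.opsForValue().set(key, value);
              }

              public static Object get(String key) {
                  return redisTemplate.opsForValue().get(key);
              }
          }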

          1 - Introduction

          This series started after I finished learning Flink and wanted to strengthen my Flink skills, so I began making up business scenarios of my own and implementing them.

          These are more like notes than a tutorial, so they won't be especially detailed; please bear with me.

          This is hands-on material: basics such as environment setup, Docker, or Flink syntax are not explained (except where it matters), so a certain technical baseline is assumed.

          2 - Preparation

          • flink - 1.12.0
          • elasticsearch - 7.12
          • kafka - 2.12-2.5.0
          • kibana - 7.12
          • filebeat - 7.12

          I won't share download links here; please download these yourself.

          3 - Code

          Flink code

          The Maven POM dependencies. Don't ask why there are so many; if you ask, I'll say I don't know. Just copy them.

          <?xml version="1.0" encoding="UTF-8"?>
          <project xmlns="http://maven.apache.org/POM/4.0.0"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
              <modelVersion>4.0.0</modelVersion>
          
              <groupId>com.iolo</groupId>
              <artifactId>flink_study</artifactId>
              <version>1.0.0</version>
              <!-- Repository locations: aliyun, apache and cloudera, in that order -->
              <repositories>
                  <repository>
                      <id>aliyun</id>
                      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                  </repository>
                  <repository>
                      <id>apache</id>
                      <url>https://repository.apache.org/content/repositories/snapshots/</url>
                  </repository>
                  <repository>
                      <id>cloudera</id>
                      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
                  </repository>
              </repositories>
          
              <properties>
                  <encoding>UTF-8</encoding>
                  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                  <maven.compiler.source>1.8</maven.compiler.source>
                  <maven.compiler.target>1.8</maven.compiler.target>
                  <java.version>1.8</java.version>
                  <scala.version>2.12</scala.version>
                  <flink.version>1.12.0</flink.version>
              </properties>
              <dependencies>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-clients_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-scala_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-java</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-streaming-scala_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-streaming-java_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-api-java-bridge_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <!-- Blink planner, the default since Flink 1.11+ -->
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-planner-blink_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-common</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <!-- Flink connectors -->
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-connector-kafka_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-sql-connector-kafka_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-connector-jdbc_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-csv</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-json</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
          
                  <dependency>
                      <groupId>org.apache.bahir</groupId>
                      <artifactId>flink-connector-redis_2.11</artifactId>
                      <version>1.0</version>
                      <exclusions>
                          <exclusion>
                              <artifactId>flink-streaming-java_2.11</artifactId>
                              <groupId>org.apache.flink</groupId>
                          </exclusion>
                          <exclusion>
                              <artifactId>flink-runtime_2.11</artifactId>
                              <groupId>org.apache.flink</groupId>
                          </exclusion>
                          <exclusion>
                              <artifactId>flink-core</artifactId>
                              <groupId>org.apache.flink</groupId>
                          </exclusion>
                          <exclusion>
                              <artifactId>flink-java</artifactId>
                              <groupId>org.apache.flink</groupId>
                          </exclusion>
                      </exclusions>
                  </dependency>
          
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-connector-hive_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
          
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-shaded-hadoop-2-uber</artifactId>
                      <version>2.7.5-10.0</version>
                  </dependency>
          
                  <dependency>
                      <groupId>mysql</groupId>
                      <artifactId>mysql-connector-java</artifactId>
                      <version>5.1.38</version>
                  </dependency>
          
                  <!-- Logging -->
                  <dependency>
                      <groupId>org.slf4j</groupId>
                      <artifactId>slf4j-log4j12</artifactId>
                      <version>1.7.7</version>
                      <scope>runtime</scope>
                  </dependency>
                  <dependency>
                      <groupId>log4j</groupId>
                      <artifactId>log4j</artifactId>
                      <version>1.2.17</version>
                      <scope>runtime</scope>
                  </dependency>
          
                  <dependency>
                      <groupId>com.alibaba</groupId>
                      <artifactId>fastjson</artifactId>
                      <version>1.2.44</version>
                  </dependency>
          
                  <dependency>
                      <groupId>org.projectlombok</groupId>
                      <artifactId>lombok</artifactId>
                      <version>1.18.2</version>
                      <scope>provided</scope>
                  </dependency>
          
                  <dependency>
                      <groupId>com.squareup.okhttp3</groupId>
                      <artifactId>okhttp</artifactId>
                      <version>4.9.1</version>
                  </dependency>
          
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
          
              </dependencies>
          
              <build>
                  <sourceDirectory>src/main/java</sourceDirectory>
                  <plugins>
                      <!-- Compiler plugin -->
                      <plugin>
                          <groupId>org.apache.maven.plugins</groupId>
                          <artifactId>maven-compiler-plugin</artifactId>
                          <version>3.5.1</version>
                          <configuration>
                              <source>1.8</source>
                              <target>1.8</target>
                              <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                          </configuration>
                      </plugin>
                      <plugin>
                          <groupId>org.apache.maven.plugins</groupId>
                          <artifactId>maven-surefire-plugin</artifactId>
                          <version>2.18.1</version>
                          <configuration>
                              <useFile>false</useFile>
                              <disableXmlReport>true</disableXmlReport>
                              <includes>
                                  <include>**/*Test.*</include>
                                  <include>**/*Suite.*</include>
                              </includes>
                          </configuration>
                      </plugin>
                      <!-- Shade plugin (bundles all dependencies into the jar) -->
                      <plugin>
                          <groupId>org.apache.maven.plugins</groupId>
                          <artifactId>maven-shade-plugin</artifactId>
                          <version>2.3</version>
                          <executions>
                              <execution>
                                  <phase>package</phase>
                                  <goals>
                                      <goal>shade</goal>
                                  </goals>
                                  <configuration>
                                      <filters>
                                          <filter>
                                              <artifact>*:*</artifact>
                                              <excludes>
                                                  <!--
                                                  zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                                  <exclude>META-INF/*.SF</exclude>
                                                  <exclude>META-INF/*.DSA</exclude>
                                                  <exclude>META-INF/*.RSA</exclude>
                                              </excludes>
                                          </filter>
                                      </filters>
                                      <transformers>
                                          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                              <!-- Set the jar's main class (optional) -->
                                              <mainClass></mainClass>
                                          </transformer>
                                      </transformers>
                                  </configuration>
                              </execution>
                          </executions>
                      </plugin>
                  </plugins>
              </build>
          
          </project>

          Below is the Flink job; the explanations are in the code comments.

          package com.iolo.flink.cases;
          
          import com.alibaba.fastjson.JSONObject;
          import com.iolo.common.util.DingDingUtil;
          import org.apache.flink.api.common.RuntimeExecutionMode;
          import org.apache.flink.api.common.functions.FlatMapFunction;
          import org.apache.flink.api.common.functions.RuntimeContext;
          import org.apache.flink.api.common.serialization.SimpleStringSchema;
          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.shaded.hadoop2.com.google.gson.Gson;
          import org.apache.flink.streaming.api.datastream.DataStreamSource;
          import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
          import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
          import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
          import org.apache.flink.util.Collector;
          import org.apache.http.HttpHost;
          import org.elasticsearch.action.index.IndexRequest;
          import org.elasticsearch.client.Requests;
          
          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;
          import java.util.Properties;
          
          /**
           * @author iolo
           * @date 2021/3/17
            * Real-time log monitoring and alerting
            * <p>
            * Environment:
            * 1 - flink 1.12
            * 2 - kafka
            * 3 - filebeat
            * 4 - a Spring Boot service (an endpoint that can emit logs of every level)
            * 5 - es + kibana
            * <p>
            * Filebeat tails the Spring Boot service logs and ships them to Kafka (topic sever_log_to_flink_consumer).
            * Flink consumes the Kafka topic, reshapes the records, and on ERROR logs sends an email or DingTalk alert
            * (either by calling the Spring Boot service or directly from Flink), then writes all logs to Elasticsearch for Kibana analysis.
           **/
          public class case_1_kafka_es_log {
              public static void main(String[] args) throws Exception {
                  // TODO env
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
          
                  Properties props = new Properties();
                  // Kafka broker address
                  props.setProperty("bootstrap.servers", "127.0.0.1:9092");
                  // Consumer group id
                  props.setProperty("group.id", "test-consumer-group");
                  // latest: if there is a committed offset, resume from it; otherwise start from the newest messages
                  // earliest: if there is a committed offset, resume from it; otherwise start from the earliest messages
                  props.setProperty("auto.offset.reset", "latest");
                  // Starts a background thread that checks Kafka partition changes every 5s (dynamic partition discovery)
                  props.setProperty("flink.partition-discovery.interval-millis", "5000");
                  // Auto-commit offsets (to the default topic; once checkpointing is introduced later, offsets are stored with the checkpoint as well)
                  props.setProperty("enable.auto.commit", "true");
                  // Auto-commit interval
                  props.setProperty("auto.commit.interval.ms", "2000");
          
                  // TODO source
                  FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>("sever_log_to_flink_consumer", new SimpleStringSchema(), props);
          
                  DataStreamSource<String> ds = env.addSource(source);
          
                  // TODO transformation
                  SingleOutputStreamOperator<Tuple3<String, String, String>> result = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, String>>() {
                      @Override
                      public void flatMap(String s, Collector<Tuple3<String, String, String>> collector) throws Exception {
                          JSONObject json = (JSONObject) JSONObject.parse(s);
                          String timestamp = json.getString("@timestamp");
                          String message = json.getString("message");
                          String[] split = message.split(" ");
                          String level = split[3];
                          if ("[ERROR]".equalsIgnoreCase(level)) {
                              System.out.println("error!");
                              DingDingUtil.dingdingPost("error");
                          }
          
                          collector.collect(Tuple3.of(timestamp, level, message));
                      }
                  });
          
                  // TODO sink
                  result.print();
          
                  /**
                   * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/elasticsearch.html
                   */
                  List<HttpHost> httpHosts = new ArrayList<>();
                  httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
                  ElasticsearchSink.Builder<Tuple3<String, String, String>> esSinkBuilder = new ElasticsearchSink.Builder<>(
                          httpHosts,
                          new ElasticsearchSinkFunction<Tuple3<String, String, String>>() {
                              @Override
                              public void process(Tuple3<String, String, String> stringStringStringTuple3, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                                  Map<String, String> json = new HashMap<>();
                                  json.put("@timestamp", stringStringStringTuple3.f0);
                                  json.put("level", stringStringStringTuple3.f1);
                                  json.put("message", stringStringStringTuple3.f2);
                                  IndexRequest item = Requests.indexRequest()
                                          .index("my-log")
                                          .source(json);
                                  requestIndexer.add(item);
                              }
                          });
                  esSinkBuilder.setBulkFlushMaxActions(1);
                  result.addSink(esSinkBuilder.build());
                  // TODO execute
                  env.execute("case_1_kafka_es_log");
              }
          }

          For the alerting part I set up a DingTalk custom-robot notification; if you need one, a quick search will show you how, it's very easy.

          https://developers.dingtalk.com/document/app/custom-robot-access/title-jfe-yo9-jl2

          package com.iolo.common.util;
          
          import lombok.extern.slf4j.Slf4j;
          import okhttp3.MediaType;
          import okhttp3.OkHttpClient;
          import okhttp3.Request;
          import okhttp3.RequestBody;
          import okhttp3.Response;
          
          import java.io.IOException;
          
          /**
           * @author iolo
           * @date 2021/3/30
           * https://developers.dingtalk.com/document/app/custom-robot-access/title-jfe-yo9-jl2
           **/
          @Slf4j
          public class DingDingUtil {
               private static final String url = "https://oapi.dingtalk.com/robot/send?access_token=<replace-with-your-own-token>";
          
               /**
                * Send a plain-text message to the DingTalk custom-robot webhook (the secret token is in the URL above)
                *
                * @param text the message body
                * @author fengxinxin
                * @date 2021/3/30 5:03 PM
                **/
              public static void dingdingPost(String text) throws Exception {
                  MediaType JSON = MediaType.parse("application/json");
                  OkHttpClient client = new OkHttpClient();
                  String json = "{\"msgtype\": \"text\",\"text\": {\"content\": \"FlinkLog:" + text + "\"}}";
                  RequestBody body = RequestBody.create(JSON, json);
                  Request request = new Request.Builder()
                          .url(url)
                          .post(body)
                          .build();
                  try (Response response = client.newCall(request).execute()) {
                      String responseBody = response.body().string();
                      log.info(responseBody);
          
                  } catch (IOException e) {
                      log.error(e.getMessage());
                  }
              }
          }

          Then you can start this main method directly from the IDE.


          Springboot

          Download it from the gitee repository; I won't walk through it in detail.

          Endpoint: http://127.0.0.1:8080/test/log?level=error&count=10
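
          The service itself is not shown in this post; a rough sketch of what the log-generating endpoint behind that URL might look like (controller, annotations and parameter names are guesses based on the URL):

          import lombok.extern.slf4j.Slf4j;
          import org.springframework.web.bind.annotation.GetMapping;
          import org.springframework.web.bind.annotation.RequestMapping;
          import org.springframework.web.bind.annotation.RequestParam;
          import org.springframework.web.bind.annotation.RestController;

          @Slf4j
          @RestController
          @RequestMapping("/test")
          public class LogController {

              // e.g. GET /test/log?level=error&count=10 writes ten ERROR lines into the log file
              // that Filebeat is tailing.
              @GetMapping("/log")
              public String log(@RequestParam String level, @RequestParam int count) {
                  for (int i = 0; i < count; i++) {
                      if ("error".equalsIgnoreCase(level)) {
                          log.error("test error log {}", i);
                      } else if ("warn".equalsIgnoreCase(level)) {
                          log.warn("test warn log {}", i);
                      } else {
                          log.info("test info log {}", i);
                      }
                  }
                  return "OK";
              }
          }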


          Kafka

          The commands below are all run from Kafka's bin directory; the ZooKeeper used is the one bundled with Kafka.

          # Start ZooKeeper
          ./zookeeper-server-start.sh ../config/zookeeper.properties
          # Start Kafka
          ./kafka-server-start.sh ../config/server.properties
          # Create the topic sever_log_to_flink_consumer
          ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sever_log_to_flink_consumer
          # Check that the topic was created
          ./kafka-topics.sh --list --zookeeper localhost:2181
          # Console producer
          ./kafka-console-producer.sh --broker-list localhost:9092 --topic sever_log_to_flink_consumer
          # Console consumer
          ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sever_log_to_flink_consumer --from-beginning

          Elasticsearch

          From here on I use Docker (set the environment up however you like; Docker will come up more and more later). Straight to the command:

          docker run \
          --name fxx-es \
          -p 9200:9200 \
          -p 9300:9300 \
          -v /Users/iOLO/dev/docker/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
          -e "discovery.type=single-node" \
          docker.elastic.co/elasticsearch/elasticsearch:7.12.0

          Verify:


          Kibana

          docker run \
          --name fxx-kibana \
          --link fxx-es:elasticsearch \
          -p 5601:5601 \
          docker.elastic.co/kibana/kibana:7.12.0

          I went into the container to switch the UI to Chinese; you can skip this.

          To do it, add i18n.locale: "zh-CN" to the kibana.yml configuration file.

          To verify, open 127.0.0.1:5601.

          The concrete steps are shown with screenshots when we get to the hands-on part.

          Filebeat

          Download: https://www.elastic.co/cn/downloads/beats/filebeat

          Pick the build for your own OS; I'm on a Mac.

          After unpacking, edit the configuration file; here it is:

          # ============================== Filebeat inputs ===============================
          
          filebeat.inputs:
          
          # Each - is an input. Most options can be set at the input level, so
          # you can use different inputs for various configurations.
          # Below are the input specific configurations.
          
          - type: log
          
            # Change to true to enable this input configuration. (You do need to change this.)
            enabled: true
          
            # Paths that should be crawled and fetched. Glob based paths. Point this at the log files written by the Spring Boot service on your machine.
            paths:
              - /Users/iOLO/dev/Java/flinklog/logs/flink-log.*.log
          
          # ------------------------------ Kafka Output -------------------------------
          output.kafka:
            # initial brokers for reading cluster metadata; this is the Kafka address, pasted straight from the official docs:
            # https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
            hosts: ["127.0.01:9092"]
          
            # message topic selection + partitioning; this is the topic to publish to, everything else keeps the documented defaults
            topic: 'sever_log_to_flink_consumer'
            partition.round_robin:
              reachable_only: false
          
            required_acks: 1
            compression: gzip
            max_message_bytes: 1000000


          4 - Hands-on

          With the environment and the job ready, don't forget to start the Spring Boot service.

          Then hit the endpoint 127.0.0.1:8080/test/log?level=error&count=10 to generate some logs.

          Check DingTalk to see whether the alert came through.


          DingTalk alert received!!!

          Next up, Kibana.

          Here is the result page.

          And the steps to get there:

          First, go to the Elasticsearch section and select the index.

          Second, click through the highlighted box into the index query page.

          Finally, search for the index you just created in the input box; in our code it is my-log. Follow the prompts to finish; I have already created it, so I won't demonstrate that again. After that you should see the page shown earlier.

          5 - Closing words

          That's the whole thing. A lot of people will surely object that Filebeat plus the ELK stack alone can do the same job. Fine, no need to argue: this is me inventing a business scenario to practice Flink after learning it; if you have a better solution, go with yours.

          And if there are scenarios you'd like to see or have run into, let me know; I'll weigh them up and take some on in future hands-on posts.

          Finally, thanks for reading.

