分享

工作经验分享:Apache Pulsar 基于 Log4j2+Kafka+ELK 实现日志的快速检索

问题导读:
1、一个完整的集中式日志系统,包含几个主要特点?
2、如何添加 Kafka 集群 broker list?
3、如何编写pulsar-kafka 脚本文件?
4、如何编写pulsar-daemon 脚本文件?



关于 Apache Pulsar

Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。

当前已有众多国内外大型互联网和传统行业公司采用 Apache Pulsar,案例分布在人工智能、金融、电信运营商、直播与短视频、物联网、零售与电子商务、在线教育等多个行业,如美国有线电视网络巨头 Comcast、Yahoo!、腾讯、中国电信、中国移动、BIGO、VIPKID 等。

背景介绍

Apache Pulsar 作为一个云原生的分布式消息系统,包括 Zookeeper、bookie、broker、functions-worker、proxy 等多个组件,并且所有的组件以分布式的方式部署在多台主机上,因此每个组件的日志文件也就分散在多台主机上,当组件出现问题时,由于日志比较分散,想要检查各个服务是否有报错信息,需要挨个服务去排查,比较麻烦,通常我们的做法是直接对日志文件进行 grep、awk 等命令就可以获得想要的信息。但是,随着应用和服务体量的增加,支撑的节点也随之增加,那么传统方法暴露出很多问题,比如:效率低下、日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询等等。所以我们希望通过对日志进行聚合、监控,能够快速的找到 Pulsar 各个服务的报错信息,快速的排查,使得运维更加具有目的性、针对性和直接性。

为了解决日志检索的问题,我们团队考虑使用集中式日志收集系统,将 Pulsar 所有节点上的日志统一收集,管理,访问。

一个完整的集中式日志系统,需要包含以下几个主要特点:

•收集-能够采集多种来源的日志数据;•传输-能够稳定的把日志数据传输到中央系统;•存储-如何存储日志数据;•分析-可以支持 UI 分析;•警告-能够提供错误报告,监控机制.

ELK 提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用,是目前主流的一种日志系统。我们公司拥有自研的大数据管理平台,通过大数据管理平台部署和管理 ELK,并且在生产系统中已经使用ELK 为多个业务系统提供了支撑服务。 ELK 是三个开源软件的缩写,分别表示:Elasticsearch、Logstash、Kibana , 它们都是开源软件,最新版本已经改名为 Elastic Stack,并新增了 Beats项目,其中包括 FileBeat,它是一个轻量级的日志收集处理工具 (Agent),Filebeat 占用资源少,适合于在各个服务器上搜集日志后传输给 Logstash。


2021-01-13_191451.jpg

在上图中可以看到,如果 Pulsar 使用这种日志采集模式存在两个问题:

•部署了 Pulsar 服务的主机必须部署一套 Filebeat 服务;
•Pulsar 服务的日志必须以文件的方式落一次磁盘,占用了主机磁盘的 IO。

为此,我们考虑 Apache Pulsar 基于 Log4j2+Kafka+ELK 实现日志的快速检索,Log4j2 默认支持将日志发送到 Kafka 的功能,使用 Kafka 自带的 Log4j2Appender,在 Log4j2 配置文件中进行相应的配置,即可完成将 Log4j2 产生的日志实时发送至 Kafka 中。

如下图所示:
2021-01-13_191525.jpg

实施过程

下面以 Pulsar 2.6.2 版本为例,介绍 Apache Pulsar 基于 Log4j2+Kafka+ELK 实现日志的快速检索的解决方案的详细的实施过程。


一、准备工作

首先需要确定的是在 Kibana 中用于检索日志的字段,可以对这些字段聚合、多维度查询,然后,Elasticsearch 根据检索字段进行分词,并创建索引。
2021-01-13_191600.jpg

如上图所示:我们将对 Pulsar 的日志建立了 8 个检索字段,分别是:集群名、主机名、主机IP、组件名、日志内容、系统时间、日志级别、集群实例。


二、实施过程

说明:为了保证 Pulsar 原生的配置文件和脚本文件的结构不被破坏,我们通过添加新的配置文件和脚本文件来实现此方案。

1. 添加配置文件

在 {PULSAR_HOME}/conf 目录中添加以下两个配置文件:

1)logenv.sh
该文件是将Pulsar 组件启动时需要的 JVM 选项以配置的方式传递到 Pulsar 服务的 Java 进程中,内容示例如下:

  1. KAFKA_CLUSTER=192.168.0.1:9092,192.168.0.2:9092,192.168.0.2:9092
  2. PULSAR_CLUSTER=pulsar_cluster
  3. PULSAR_TOPIC=pulsar_topic
  4. HOST_IP=192.168.0.1
  5. PULSAR_MODULE_INSTANCE_ID=1
复制代码

以上这些字段的意义分别是:

•KAFKA_CLUSTER:Kafka broker list 地址;•PULSAR_CLUSTER:Pulsar 的集群名称;•PULSAR_TOPIC:Kafka 中用于接入 Pulsar 服务日志的 Topic;•HOST_IP:Pulsar 主机的 IP;•PULSAR_MODULE_INSTANCE_ID:Pulsar 服务的实例标识,一个主机上可能会部署多个 Pulsar 集群,集群间通过实例标识来区分。

2)log4j2-kafka.yaml
该配置文件是从 log4j2.yaml 复制而来,在 log4j2.yaml 的基础上添加以下修改: (说明:下图中左侧为 log4j2.yaml,右侧为 log4j2-kafka.yaml。)

•添加 Kafka 集群 broker list,并定义 log4j2 写到 Kafka 中的消息记录格式,一条消息中的 8 个检索字段以空格分割,Elasticsearch 以空格作为分割符对 8 个检索字段进行分词。

2021-01-13_191638.jpg

2021-01-13_191706.jpg

•log4j2-kafka.yaml 配置文件的完整内容如下:

  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements.  See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership.  The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License.  You may obtain a copy of the License at
  9. #
  10. #   http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied.  See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. #
  19. Configuration:
  20.   status: INFO
  21.   monitorInterval: 30
  22.   name: pulsar
  23.   packages: io.prometheus.client.log4j2
  24.   Properties:
  25.     Property:
  26.       - name: "pulsar.log.dir"
  27.         value: "logs"
  28.       - name: "pulsar.log.file"
  29.         value: "pulsar.log"
  30.       - name: "pulsar.log.appender"
  31.         value: "RoutingAppender"
  32.       - name: "pulsar.log.root.level"
  33.         value: "info"
  34.       - name: "pulsar.log.level"
  35.         value: "info"
  36.       - name: "pulsar.routing.appender.default"
  37.         value: "Console"
  38.       - name: "kafkaBrokers"
  39.         value: "${sys:kafka.cluster}"
  40.       - name: "pattern"
  41.         value: "${sys:pulsar.cluster} ${sys:pulsar.hostname} ${sys:pulsar.hostip} ${sys:pulsar.module.type} ${sys:pulsar.module.instanceid} %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%c{10}] %level , %msg%n"
  42.   # Example: logger-filter script
  43.   Scripts:
  44.     ScriptFile:
  45.       name: filter.js
  46.       language: JavaScript
  47.       path: ./conf/log4j2-scripts/filter.js
  48.       charset: UTF-8
  49.   Appenders:
  50.     #Kafka
  51.     Kafka:
  52.       name: "pulsar_kafka"
  53.       topic: "${sys:pulsar.topic}"
  54.       ignoreExceptions: "false"
  55.       PatternLayout:
  56.         pattern: "${pattern}"
  57.       Property:
  58.         - name: "bootstrap.servers"
  59.           value: "${kafkaBrokers}"
  60.         - name: "max.block.ms"
  61.           value: "2000"
  62.     # Console
  63.     Console:
  64.       name: Console
  65.       target: SYSTEM_OUT
  66.       PatternLayout:
  67.         Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
  68.     Failover:
  69.       name: "Failover"
  70.       primary: "pulsar_kafka"
  71.       retryIntervalSeconds: "600"
  72.       Failovers:
  73.         AppenderRef:
  74.           ref: "RollingFile"
  75.     # Rolling file appender configuration
  76.     RollingFile:
  77.       name: RollingFile
  78.       fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
  79.       filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
  80.       immediateFlush: false
  81.       PatternLayout:
  82.         Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
  83.       Policies:
  84.         TimeBasedTriggeringPolicy:
  85.           interval: 1
  86.           modulate: true
  87.         SizeBasedTriggeringPolicy:
  88.           size: 1 GB
  89.       # Delete file older than 30days
  90.       DefaultRolloverStrategy:
  91.           Delete:
  92.             basePath: ${sys:pulsar.log.dir}
  93.             maxDepth: 2
  94.             IfFileName:
  95.               glob: "*/${sys:pulsar.log.file}*log.gz"
  96.             IfLastModified:
  97.               age: 30d
  98.     Prometheus:
  99.       name: Prometheus
  100.     # Routing
  101.     Routing:
  102.       name: RoutingAppender
  103.       Routes:
  104.         pattern: "${ctx:function}"
  105.         Route:
  106.           -
  107.             Routing:
  108.               name: InstanceRoutingAppender
  109.               Routes:
  110.                 pattern: "${ctx:instance}"
  111.                 Route:
  112.                   -
  113.                     RollingFile:
  114.                       name: "Rolling-${ctx:function}"
  115.                       fileName : "${sys:pulsar.log.dir}/functions/${ctx:function}/${ctx:functionname}-${ctx:instance}.log"
  116.                       filePattern : "${sys:pulsar.log.dir}/functions/${sys:pulsar.log.file}-${ctx:instance}-%d{MM-dd-yyyy}-%i.log.gz"
  117.                       PatternLayout:
  118.                         Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
  119.                       Policies:
  120.                         TimeBasedTriggeringPolicy:
  121.                           interval: 1
  122.                           modulate: true
  123.                         SizeBasedTriggeringPolicy:
  124.                           size: "20MB"
  125.                         # Trigger every day at midnight that also scan
  126.                         # roll-over strategy that deletes older file
  127.                         CronTriggeringPolicy:
  128.                           schedule: "0 0 0 * * ?"
  129.                       # Delete file older than 30days
  130.                       DefaultRolloverStrategy:
  131.                           Delete:
  132.                             basePath: ${sys:pulsar.log.dir}
  133.                             maxDepth: 2
  134.                             IfFileName:
  135.                               glob: "*/${sys:pulsar.log.file}*log.gz"
  136.                             IfLastModified:
  137.                               age: 30d
  138.                   - ref: "${sys:pulsar.routing.appender.default}"
  139.                     key: "${ctx:function}"
  140.           - ref: "${sys:pulsar.routing.appender.default}"
  141.             key: "${ctx:function}"
  142.   Loggers:
  143.     # Default root logger configuration
  144.     AsyncRoot:
  145.       level: "${sys:pulsar.log.root.level}"
  146.       additivity: true
  147.       AppenderRef:
  148.         - ref: "Failover"
  149.           level: "${sys:pulsar.log.level}"
  150.         - ref: Prometheus
  151.           level: info
  152.     AsyncLogger:
  153.       - name: org.apache.bookkeeper.bookie.BookieShell
  154.         level: info
  155.         additivity: false
  156.         AppenderRef:
  157.           - ref: Console
  158.       - name: verbose
  159.         level: info
  160.         additivity: false
  161.         AppenderRef:
  162.           - ref: Console
  163.     # Logger to inject filter script
  164. #     - name: org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
  165. #       level: debug
  166. #       additivity: false
  167. #       AppenderRef:
  168. #         ref: "${sys:pulsar.log.appender}"
  169. #         ScriptFilter:
  170. #           onMatch: ACCEPT
  171. #           onMisMatch: DENY
  172. #           ScriptRef:
  173. #             ref: filter.js
复制代码


注意事项:

•日志接入必须异步,绝对不能影响服务性能;•响应要求比较高的系统接入第三方系统,必须依赖解耦,此处的 Failover Appender 就是解耦对 Kafka 的依赖,当 Kafka Crash 时,日志触发 Failover,写本地即可;•log4j2 Failover appender retryIntervalSeconds 的默认值是 1 分钟,是通过异常来切换的,所以可以适量加大间隔,比如上面的 10分钟;•Kafka appender ignoreExceptions 必须设置为 false,否则无法触发 Failover;•这里有个比较大的坑是 max.block.ms Property,KafkaClient 包里默认值是 60000ms,当 Kafka 宕机时,尝试写 Kafka 需要 1 分钟才能返回 Exception,之后才会触发 Failover,当请求量大时,log4j2 队列很快就会打满,之后写日志就 Blocking,严重影响到主服务响应。所以要设置足够短,队列长度足够长。

2、添加脚本文件

在 {PULSAR_HOME}/bin 目录中添加以下两个脚本文件:


1)pulsar-kafka
该脚本文件是从 pulsar 脚本文件复制而来,在 pulsar 脚本文件的基础上添加如下修改: (说明:下图中左侧为 pulsar,右侧为 pulsar-kafka。)
2021-01-13_191754.jpg

•pulsar-kafka 脚本文件的完整内容如下:

  1. #!/usr/bin/env bash
  2. #
  3. # Licensed to the Apache Software Foundation (ASF) under one
  4. # or more contributor license agreements.  See the NOTICE file
  5. # distributed with this work for additional information
  6. # regarding copyright ownership.  The ASF licenses this file
  7. # to you under the Apache License, Version 2.0 (the
  8. # "License"); you may not use this file except in compliance
  9. # with the License.  You may obtain a copy of the License at
  10. #
  11. #   http://www.apache.org/licenses/LICENSE-2.0
  12. #
  13. # Unless required by applicable law or agreed to in writing,
  14. # software distributed under the License is distributed on an
  15. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  16. # KIND, either express or implied.  See the License for the
  17. # specific language governing permissions and limitations
  18. # under the License.
  19. #
  20. BINDIR=$(dirname "$0")
  21. export PULSAR_HOME=`cd -P $BINDIR/..;pwd`
  22. DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf
  23. DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf
  24. DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf
  25. DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf
  26. DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf
  27. DEFAULT_PROXY_CONF=$PULSAR_HOME/conf/proxy.conf
  28. DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
  29. DEFAULT_WEBSOCKET_CONF=$PULSAR_HOME/conf/websocket.conf
  30. DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2-kafka.yaml
  31. DEFAULT_PULSAR_PRESTO_CONF=${PULSAR_HOME}/conf/presto
  32. # functions related variables
  33. FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
  34. DEFAULT_WORKER_CONF=$PULSAR_HOME/conf/functions_worker.yml
  35. DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
  36. JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
  37. DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
  38. PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
  39. DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR=$PULSAR_HOME/instances/deps
  40. FUNCTIONS_EXTRA_DEPS_DIR=${PULSAR_FUNCTIONS_EXTRA_DEPS_DIR:-"${DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR}"}
  41. SQL_HOME=$PULSAR_HOME/pulsar-sql
  42. PRESTO_HOME=${PULSAR_HOME}/lib/presto
  43. # Check bookkeeper env and load bkenv.sh
  44. if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
  45. then
  46.     . "$PULSAR_HOME/conf/bkenv.sh"
  47. fi
  48. # Check pulsar env and load pulser_env.sh
  49. if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
  50. then
  51.     . "$PULSAR_HOME/conf/pulsar_env.sh"
  52. fi
  53. if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
  54. then
  55.     . "$PULSAR_HOME/conf/logenv.sh"
  56. fi
  57. # Check for the java to use
  58. if [[ -z $JAVA_HOME ]]; then
  59.     JAVA=$(which java)
  60.     if [ $? != 0 ]; then
  61.         echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
  62.         exit 1
  63.     fi
  64. else
  65.     JAVA=$JAVA_HOME/bin/java
  66. fi
  67. # exclude tests jar
  68. RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
  69. if [ $? == 0 ]; then
  70.     PULSAR_JAR=$RELEASE_JAR
  71. fi
  72. # exclude tests jar
  73. BUILT_JAR=`ls $PULSAR_HOME/pulsar-broker/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
  74. if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
  75.     echo "\nCouldn't find pulsar jar.";
  76.     echo "Make sure you've run 'mvn package'\n";
  77.     exit 1;
  78. elif [ -e "$BUILT_JAR" ]; then
  79.     PULSAR_JAR=$BUILT_JAR
  80. fi
  81. #
  82. # find the instance locations for pulsar-functions
  83. #
  84. # find the java instance location
  85. if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
  86.     # didn't find a released jar, then search the built jar
  87.     BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
  88.     if [ -z "${BUILT_JAVA_INSTANCE_JAR}" ]; then
  89.         echo "\nCouldn't find pulsar-functions java instance jar.";
  90.         echo "Make sure you've run 'mvn package'\n";
  91.         exit 1;
  92.     fi
  93.     JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
  94. fi
  95. # find the python instance location
  96. if [ ! -f "${PY_INSTANCE_FILE}" ]; then
  97.     # didn't find a released python instance, then search the built python instance
  98.     BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
  99.     if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then
  100.         echo "\nCouldn't find pulsar-functions python instance.";
  101.         echo "Make sure you've run 'mvn package'\n";
  102.         exit 1;
  103.     fi
  104.     PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
  105. fi
  106. # find pulsar sql presto distribution location
  107. check_presto_libraries() {
  108.     if [ ! -d "${PRESTO_HOME}" ]; then
  109.         BUILT_PRESTO_HOME="${SQL_HOME}/presto-distribution/target/pulsar-presto-distribution"
  110.         if [ ! -d "${BUILT_PRESTO_HOME}" ]; then
  111.             echo "\nCouldn't find presto distribution.";
  112.             echo "Make sure you've run 'mvn package'\n";
  113.             exit 1;
  114.         fi
  115.         PRESTO_HOME=${BUILT_PRESTO_HOME}
  116.     fi
  117. }
  118. pulsar_help() {
  119.     cat <<EOF
  120. Usage: pulsar <command>
  121. where command is one of:
  122.     broker              Run a broker server
  123.     bookie              Run a bookie server
  124.     zookeeper           Run a zookeeper server
  125.     configuration-store Run a configuration-store server
  126.     discovery           Run a discovery server
  127.     proxy               Run a pulsar proxy
  128.     websocket           Run a web socket proxy server
  129.     functions-worker    Run a functions worker server
  130.     sql-worker          Run a sql worker server
  131.     sql                 Run sql CLI
  132.     standalone          Run a broker server with local bookies and local zookeeper
  133.     initialize-cluster-metadata     One-time metadata initialization
  134.     delete-cluster-metadata         Delete a cluster's metadata
  135.     initialize-transaction-coordinator-metadata     One-time transaction coordinator metadata initialization
  136.     initialize-namespace     namespace initialization
  137.     compact-topic       Run compaction against a topic
  138.     zookeeper-shell     Open a ZK shell client
  139.     broker-tool         CLI to operate a specific broker
  140.     tokens              Utility to create authentication tokens
  141.     help                This help message
  142. or command is the full name of a class with a defined main() method.
  143. Environment variables:
  144.    PULSAR_LOG_CONF               Log4j configuration file (default $DEFAULT_LOG_CONF)
  145.    PULSAR_BROKER_CONF            Configuration file for broker (default: $DEFAULT_BROKER_CONF)
  146.    PULSAR_BOOKKEEPER_CONF        Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF)
  147.    PULSAR_ZK_CONF                Configuration file for zookeeper (default: $DEFAULT_ZK_CONF)
  148.    PULSAR_CONFIGURATION_STORE_CONF         Configuration file for global configuration store (default: $DEFAULT_CONFIGURATION_STORE_CONF)
  149.    PULSAR_DISCOVERY_CONF         Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
  150.    PULSAR_WEBSOCKET_CONF         Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
  151.    PULSAR_PROXY_CONF             Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF)
  152.    PULSAR_WORKER_CONF            Configuration file for functions worker (default: $DEFAULT_WORKER_CONF)
  153.    PULSAR_STANDALONE_CONF        Configuration file for standalone (default: $DEFAULT_STANDALONE_CONF)
  154.    PULSAR_PRESTO_CONF            Configuration directory for Pulsar Presto (default: $DEFAULT_PULSAR_PRESTO_CONF)
  155.    PULSAR_EXTRA_OPTS             Extra options to be passed to the jvm
  156.    PULSAR_EXTRA_CLASSPATH        Add extra paths to the pulsar classpath
  157.    PULSAR_PID_DIR                Folder where the pulsar server PID file should be stored
  158.    PULSAR_STOP_TIMEOUT           Wait time before forcefully kill the pulsar server instance, if the stop is not successful
  159. These variable can also be set in conf/pulsar_env.sh
  160. EOF
  161. }
  162. add_maven_deps_to_classpath() {
  163.     MVN="mvn"
  164.     if [ "$MAVEN_HOME" != "" ]; then
  165.     MVN=${MAVEN_HOME}/bin/mvn
  166.     fi
  167.     # Need to generate classpath from maven pom. This is costly so generate it
  168.     # and cache it. Save the file into our target dir so a mvn clean will get
  169.     # clean it up and force us create a new one.
  170.     f="${PULSAR_HOME}/distribution/server/target/classpath.txt"
  171.     if [ ! -f "${f}" ]
  172.     then
  173.     ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
  174.     fi
  175.     PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
  176. }
  177. if [ -d "$PULSAR_HOME/lib" ]; then
  178. PULSAR_CLASSPATH=$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*
  179.     ASPECTJ_AGENT_PATH=`ls -1 $PULSAR_HOME/lib/org.aspectj-aspectjweaver-*.jar`
  180. else
  181.     add_maven_deps_to_classpath
  182.     ASPECTJ_VERSION=`grep '<aspectj.version>' $PULSAR_HOME/pom.xml | awk -F'>' '{print $2}' | awk -F'<' '{print $1}'`
  183.     ASPECTJ_AGENT_PATH="$HOME/.m2/repository/org/aspectj/aspectjweaver/$ASPECTJ_VERSION/aspectjweaver-$ASPECTJ_VERSION.jar"
  184. fi
  185. ASPECTJ_AGENT="-javaagent:$ASPECTJ_AGENT_PATH"
  186. # if no args specified, show usage
  187. if [ $# = 0 ]; then
  188.     pulsar_help;
  189.     exit 1;
  190. fi
  191. # get arguments
  192. COMMAND=$1
  193. shift
  194. if [ -z "$PULSAR_WORKER_CONF" ]; then
  195.     PULSAR_WORKER_CONF=$DEFAULT_WORKER_CONF
  196. fi
  197. if [ -z "$PULSAR_BROKER_CONF" ]; then
  198.     PULSAR_BROKER_CONF=$DEFAULT_BROKER_CONF
  199. fi
  200. if [ -z "$PULSAR_BOOKKEEPER_CONF" ]; then
  201.     PULSAR_BOOKKEEPER_CONF=$DEFAULT_BOOKKEEPER_CONF
  202. fi
  203. if [ -z "$PULSAR_ZK_CONF" ]; then
  204.     PULSAR_ZK_CONF=$DEFAULT_ZK_CONF
  205. fi
  206. if [ -z "$PULSAR_GLOBAL_ZK_CONF" ]; then
  207.     PULSAR_GLOBAL_ZK_CONF=$DEFAULT_GLOBAL_ZK_CONF
  208. fi
  209. if [ -z "$PULSAR_CONFIGURATION_STORE_CONF" ]; then
  210.     PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_CONFIGURATION_STORE_CONF
  211. fi
  212. if [ -z "$PULSAR_DISCOVERY_CONF" ]; then
  213.     PULSAR_DISCOVERY_CONF=$DEFAULT_DISCOVERY_CONF
  214. fi
  215. if [ -z "$PULSAR_PROXY_CONF" ]; then
  216.     PULSAR_PROXY_CONF=$DEFAULT_PROXY_CONF
  217. fi
  218. if [ -z "$PULSAR_WEBSOCKET_CONF" ]; then
  219.     PULSAR_WEBSOCKET_CONF=$DEFAULT_WEBSOCKET_CONF
  220. fi
  221. if [ -z "$PULSAR_STANDALONE_CONF" ]; then
  222.     PULSAR_STANDALONE_CONF=$DEFAULT_STANDALONE_CONF
  223. fi
  224. if [ -z "$PULSAR_LOG_CONF" ]; then
  225.     PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
  226. fi
  227. if [ -z "$PULSAR_PRESTO_CONF" ]; then
  228.     PULSAR_PRESTO_CONF=$DEFAULT_PULSAR_PRESTO_CONF
  229. fi
  230. PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
  231. PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH"
  232. OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`"
  233. # Ensure we can read bigger content from ZK. (It might be
  234. # rarely needed when trying to list many z-nodes under a
  235. # directory)
  236. OPTS="$OPTS -Djute.maxbuffer=10485760 -Djava.net.preferIPv4Stack=true"
  237. OPTS="-cp $PULSAR_CLASSPATH $OPTS"
  238. OPTS="$OPTS $PULSAR_EXTRA_OPTS $PULSAR_MEM $PULSAR_GC"
  239. # log directory & file
  240. PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
  241. PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
  242. PULSAR_LOG_ROOT_LEVEL=${PULSAR_LOG_ROOT_LEVEL:-"info"}
  243. PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
  244. PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}
  245. #Configure log configuration system properties
  246. OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
  247. OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
  248. OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
  249. OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"
  250. # Functions related logging
  251. OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
  252. # instance
  253. OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
  254. OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
  255. OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
  256. OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
  257. OPTS="$OPTS -Dpulsar.module.instanceid=${PULSAR_MODULE_INSTANCE_ID} -Dpulsar.module.type=$COMMAND -Dkafka.cluster=${KAFKA_CLUSTER} -Dpulsar.hostname=${HOSTNAME} -Dpulsar.hostip=${HOST_IP} -Dpulsar.cluster=${PULSAR_CLUSTER} -Dpulsar.topic=${PULSAR_TOPIC}"
  258. ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true"
  259. #Change to PULSAR_HOME to support relative paths
  260. cd "$PULSAR_HOME"
  261. if [ $COMMAND == "broker" ]; then
  262.     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-broker.log"}
  263.     exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarBrokerStarter --broker-conf $PULSAR_BROKER_CONF $@
  264. elif [ $COMMAND == "bookie" ]; then
  265.     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"bookkeeper.log"}
  266.     # Pass BOOKIE_EXTRA_OPTS option defined in pulsar_env.sh
  267.     OPTS="$OPTS $BOOKIE_EXTRA_OPTS"
  268.     exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.bookkeeper.proto.BookieServer --conf $PULSAR_BOOKKEEPER_CONF $@
  269. elif [ $COMMAND == "zookeeper" ]; then
  270.     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"}
  271.     exec $JAVA ${ZK_OPTS} $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ZooKeeperStarter $PULSAR_ZK_CONF $@
  272. elif [ $COMMAND == "global-zookeeper" ]; then
  273.     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
  274.     # Allow global ZK to turn into read-only mode when it cannot reach the quorum
  275.     OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
  276.     exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_GLOBAL_ZK_CONF $@
  277. elif [ $COMMAND == "configuration-store" ]; then
  278.     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"configuration-store.log"}
  279.     # Allow global ZK to turn into read-only mode when it cannot reach the quorum
  280.     OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
  281.     exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_CONFIGURATION_STORE_CONF $@
  282. elif [ $COMMAND == "discovery" ]; then
  283.     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"discovery.log"}
  284.     exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter $PULSAR_DISCOVERY_CONF $@
  285. elif [ $COMMAND == "proxy" ]; then
  286.     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-proxy.log"}
  287.     exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.proxy.server.ProxyServiceStarter --config $PULSAR_PROXY_CONF $@
  288. elif [ $COMMAND == "websocket" ]; then
  289.     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-websocket.log"}
  290.     exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@
  291. elif [ $COMMAND == "functions-worker" ]; then
  292.     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-functions-worker.log"}
  293.     exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.functions.worker.FunctionWorkerStarter -c $PULSAR_WORKER_CONF $@
  294. elif [ $COMMAND == "standalone" ]; then
  295.     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
  296.     exec $JAVA $OPTS $ASPECTJ_AGENT ${ZK_OPTS} -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
  297. elif [ $COMMAND == "initialize-cluster-metadata" ]; then
  298.     exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
  299. elif [ $COMMAND == "delete-cluster-metadata" ]; then
  300.     exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataTeardown $@
  301. elif [ $COMMAND == "initialize-transaction-coordinator-metadata" ]; then
  302.     exec $JAVA $OPTS org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup $@
  303. elif [ $COMMAND == "initialize-namespace" ]; then
  304.     exec $JAVA $OPTS org.apache.pulsar.PulsarInitialNamespaceSetup $@
  305. elif [ $COMMAND == "zookeeper-shell" ]; then
  306.     exec $JAVA $OPTS org.apache.zookeeper.ZooKeeperMain $@
  307. elif [ $COMMAND == "broker-tool" ]; then
  308.     exec $JAVA $OPTS org.apache.pulsar.broker.tools.BrokerTool $@
  309. elif [ $COMMAND == "compact-topic" ]; then
  310.     exec $JAVA $OPTS org.apache.pulsar.compaction.CompactorTool --broker-conf $PULSAR_BROKER_CONF $@
  311. elif [ $COMMAND == "sql" ]; then
  312.     check_presto_libraries
  313.     exec $JAVA -cp "${PRESTO_HOME}/lib/*" io.prestosql.cli.Presto --server localhost:8081 "${@}"
  314. elif [ $COMMAND == "sql-worker" ]; then
  315.     check_presto_libraries
  316.     exec ${PRESTO_HOME}/bin/launcher --etc-dir ${PULSAR_PRESTO_CONF} "${@}"
  317. elif [ $COMMAND == "tokens" ]; then
  318.       exec $JAVA $OPTS org.apache.pulsar.utils.auth.tokens.TokensCliUtils $@
  319. elif [ $COMMAND == "help" -o $COMMAND == "--help" -o $COMMAND == "-h" ]; then
  320.     pulsar_help;
  321. else
  322.     echo ""
  323.     echo "-- Invalid command '$COMMAND' -- Use '$0 help' to get a list of valid commands"
  324.     echo ""
  325.     exit 1
  326. fi
复制代码


2)pulsar-daemon-kafka

该脚本文件是从 pulsar-daemon 脚本文件复制而来,在 pulsar-daemon 脚本文件的基础上添加如下修改: (说明:下图中左侧为 pulsar-daemon,右侧为 pulsar-daemon-kafka。)

2021-01-13_191836.jpg

&#8226;pulsar-daemon-kafka 脚本文件的完整内容如下:

  1. #!/usr/bin/env bash
  2. #
  3. # Licensed to the Apache Software Foundation (ASF) under one
  4. # or more contributor license agreements.  See the NOTICE file
  5. # distributed with this work for additional information
  6. # regarding copyright ownership.  The ASF licenses this file
  7. # to you under the Apache License, Version 2.0 (the
  8. # "License"); you may not use this file except in compliance
  9. # with the License.  You may obtain a copy of the License at
  10. #
  11. #   http://www.apache.org/licenses/LICENSE-2.0
  12. #
  13. # Unless required by applicable law or agreed to in writing,
  14. # software distributed under the License is distributed on an
  15. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  16. # KIND, either express or implied.  See the License for the
  17. # specific language governing permissions and limitations
  18. # under the License.
  19. #
  20. usage() {
  21.     cat <<EOF
  22. Usage: pulsar-daemon (start|stop) <command> <args...>
  23. where command is one of:
  24.     broker              Run a broker server
  25.     bookie              Run a bookie server
  26.     zookeeper           Run a zookeeper server
  27.     configuration-store Run a configuration-store server
  28.     discovery           Run a discovery server
  29.     websocket           Run a websocket proxy server
  30.     functions-worker    Run a functions worker server
  31.     standalone          Run a standalone Pulsar service
  32.     proxy               Run a Proxy Pulsar service
  33. where argument is one of:
  34.     -force (accepted only with stop command): Decides whether to stop the server forcefully if not stopped by normal shutdown
  35. EOF
  36. }
  37. BINDIR=$(dirname "$0")
  38. PULSAR_HOME=$(cd -P $BINDIR/..;pwd)
  39. # Check bookkeeper env and load bkenv.sh
  40. if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
  41. then
  42.     . "$PULSAR_HOME/conf/bkenv.sh"
  43. fi
  44. if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
  45. then
  46.     . "$PULSAR_HOME/conf/pulsar_env.sh"
  47. fi
  48. if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
  49. then
  50.     . "$PULSAR_HOME/conf/logenv.sh"
  51. fi
  52. PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RollingFile"}
  53. PULSAR_STOP_TIMEOUT=${PULSAR_STOP_TIMEOUT:-30}
  54. PULSAR_PID_DIR=${PULSAR_PID_DIR:-$PULSAR_HOME/bin}
  55. if [ $# = 0 ]; then
  56.     usage
  57.     exit 1
  58. elif [ $# = 1 ]; then
  59.     if [ $1 == "--help" -o $1 == "-h" ]; then
  60.         usage
  61.         exit 1
  62.     else
  63.         echo "Error: no enough arguments provided."
  64.         usage
  65.         exit 1
  66.     fi
  67. fi
  68. startStop=$1
  69. shift
  70. command=$1
  71. shift
  72. case $command in
  73.     (broker)
  74.         echo "doing $startStop $command ..."
  75.         ;;
  76.     (bookie)
  77.         echo "doing $startStop $command ..."
  78.         ;;
  79.     (zookeeper)
  80.         echo "doing $startStop $command ..."
  81.         ;;
  82.     (global-zookeeper)
  83.         echo "doing $startStop $command ..."
  84.         ;;
  85.     (configuration-store)
  86.         echo "doing $startStop $command ..."
  87.         ;;
  88.     (discovery)
  89.         echo "doing $startStop $command ..."
  90.         ;;
  91.     (websocket)
  92.         echo "doing $startStop $command ..."
  93.         ;;
  94.     (functions-worker)
  95.         echo "doing $startStop $command ..."
  96.         ;;
  97.     (standalone)
  98.         echo "doing $startStop $command ..."
  99.         ;;
  100.     (proxy)
  101.         echo "doing $startStop $command ..."
  102.         ;;
  103.     (*)
  104.         echo "Error: unknown service name $command"
  105.         usage
  106.         exit 1
  107.         ;;
  108. esac
  109. export PULSAR_LOG_DIR=$PULSAR_LOG_DIR
  110. export PULSAR_LOG_APPENDER=$PULSAR_LOG_APPENDER
  111. export PULSAR_LOG_FILE=pulsar-$command-$HOSTNAME.log
  112. pid=$PULSAR_PID_DIR/pulsar-$command.pid
  113. out=$PULSAR_LOG_DIR/pulsar-$command-$HOSTNAME.out
  114. logfile=$PULSAR_LOG_DIR/$PULSAR_LOG_FILE
  115. rotate_out_log ()
  116. {
  117.     log=$1;
  118.     num=5;
  119.     if [ -n "$2" ]; then
  120.        num=$2
  121.     fi
  122.     if [ -f "$log" ]; then # rotate logs
  123.         while [ $num -gt 1 ]; do
  124.             prev=`expr $num - 1`
  125.             [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
  126.             num=$prev
  127.         done
  128.         mv "$log" "$log.$num";
  129.     fi
  130. }
  131. mkdir -p "$PULSAR_LOG_DIR"
  132. case $startStop in
  133.   (start)
  134.     if [ -f $pid ]; then
  135.       if kill -0 `cat $pid` > /dev/null 2>&1; then
  136.         echo $command running as process `cat $pid`.  Stop it first.
  137.         exit 1
  138.       fi
  139.     fi
  140.     rotate_out_log $out
  141.     echo starting $command, logging to $logfile
  142.     echo Note: Set immediateFlush to true in conf/log4j2-kafka.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
  143.     pulsar=$PULSAR_HOME/bin/pulsar-kafka
  144.     nohup $pulsar $command "$@" > "$out" 2>&1 < /dev/null &
  145.     echo $! > $pid
  146.     sleep 1; head $out
  147.     sleep 2;
  148.     if ! ps -p $! > /dev/null ; then
  149.       exit 1
  150.     fi
  151.     ;;
  152.   (stop)
  153.     if [ -f $pid ]; then
  154.       TARGET_PID=$(cat $pid)
  155.       if kill -0 $TARGET_PID > /dev/null 2>&1; then
  156.         echo "stopping $command"
  157.         kill $TARGET_PID
  158.         count=0
  159.         location=$PULSAR_LOG_DIR
  160.         while ps -p $TARGET_PID > /dev/null;
  161.          do
  162.           echo "Shutdown is in progress... Please wait..."
  163.           sleep 1
  164.           count=`expr $count + 1`
  165.           if [ "$count" = "$PULSAR_STOP_TIMEOUT" ]; then
  166.                 break
  167.           fi
  168.          done
  169.         if [ "$count" != "$PULSAR_STOP_TIMEOUT" ]; then
  170.             echo "Shutdown completed."
  171.         fi
  172.         if kill -0 $TARGET_PID > /dev/null 2>&1; then
  173.               fileName=$location/$command.out
  174.               $JAVA_HOME/bin/jstack $TARGET_PID > $fileName
  175.               echo "Thread dumps are taken for analysis at $fileName"
  176.               if [ "$1" == "-force" ]
  177.               then
  178.                  echo "forcefully stopping $command"
  179.                  kill -9 $TARGET_PID >/dev/null 2>&1
  180.                  echo Successfully stopped the process
  181.               else
  182.                  echo "WARNNING :  $command is not stopped completely."
  183.                  exit 1
  184.               fi
  185.         fi
  186.       else
  187.         echo "no $command to stop"
  188.       fi
  189.       rm $pid
  190.     else
  191.       echo no "$command to stop"
  192.     fi
  193.     ;;
  194.   (*)
  195.     usage
  196.     exit 1
  197.     ;;
  198. esac
复制代码

3、添加 Kafka Producer 依赖的 jar

在 pulsar 集群的所有节点上的 {PULSAR_HOME}/lib 目录中添加以下 3 个 jar:
  1. connect-api-2.0.1.jar
  2. disruptor-3.4.2.jar
  3. kafka-clients-2.0.1.jar
复制代码

4、启动Pulsar 服务

1.为了确保 Pulsar 服务的日志能够正确的写入 Kafka,先通过 bin/pulsar-kafka 前台启动,在没有异常的情况下,再通过 bin/pulsar-daemon-kafka 命令后台启动。2.以启动 broker 为例,执行以下命令:
  1. bin/pulsar-daemon-kafka start broker
复制代码

3. 通过 ps 命令查看 broker 进程如下:

2021-01-13_191916.jpg

在上图可以看到,我们通过 logenv.sh 配置的 OPTS 都已经传递到 broker 进程中,log4j2-kafka.yaml 中的 sys 标签便可以通过这些属性值实例化一个 Kafka Producer,broker 进程的日志便会通过 Kafka Producer 发送到 Kafka broker 中。

5、测试 Pulsar 日志是否成功写入 Kafka broker

启动一个Kafka Consumer ,订阅log4j2 发送消息的Topic,读取到的消息内容如下,多个检索字段之间以空格分开:

pulsar-cluster dapp21 192.168.0.1 broker 1 2020-12-26 17:40:14.363 [prometheus-stats-43-1] [org.eclipse.jetty.server.RequestLog] INFO - 192.168.0.1 - - [26/Dec/2020:17:40:14 +0800] "GET /metrics/ HTTP/1.1" 200 23445 "http://192.168.0.1:8080/metrics" "Prometheus/2.22.1" 4

6、日志检索

打开 kibana 页面,根据分词的字段进行检索,检索条件如下:
cluster:"pulsar-cluster" AND hostname:"XXX" AND module:"broker" AND level:"INFO"

2021-01-13_191944.jpg

在上图中可以看到某个时间段内的日志检索结果,并且可以根据需要,在检索结果中添加 Available fields。这样,开发或运维人员可以通过 kibana 从多个维度快速有效的分析 Pulsar 服务异常的原因。至此,就是 Apache Pulsar 基于 Log4j2+Kafka+ELK 实现日志的快速检索的一套完整的解决方案。


总结

目前,分布式、微服务化是比较流行的技术方向,在生产系统中,随着业务的不断发展, 应用和服务体量的快速扩张,从单体/垂直架构转移到分布式/微服务架构是自然而然的选择,它主要表现在降低复杂度、容错、独立部署、水平伸缩等方面。但同时也面临着新的挑战,如问题排查的效率,运维监控的便捷性等。本文以 Apache Pulsar 为例,分享 Java 进程如何使用 Log4j2+Kafka+ELK 实现分布式、微服务化的日志的快速检索,达到服务治理的效果。

作者:ApachePulsar

来源:https://mp.weixin.qq.com/s/igFlFvEYz1pXDiscWdwW2g


最新经典文章,欢迎关注公众号



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /5 下一条