分享

有没有人讨论下从flume的sink数据到solr中?

wtaisi 发表于 2017-3-30 14:19:18 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 8721
我的数据来源于从程序到kafka中,通过flume采集到solr,但是配置这里难住了,不太懂 morphlines.conf这个里面是怎么写的。
我的数据格式是:xxx|xxx|xxx|xxxx|xxxxxxxx|xxxxx|xxxxx|xxxxx|xxx


这个是我的flume配置文件(参照http://www.aboutyun.com/thread-14925-1-1.html):
#指定本sink从哪个channel中读取数据
agent.sinks.solrSink.channel = memoryChannel
#指定sink类型,MorphlineSolrSink
agent.sinks.solrSink.type= org.apache.flume.sink.solr.morphline.MorphlineSolrSink
#指定morphline的配置文件路径,如果不加前面的路径,则默认读取该flume.conf文件所在同级路径的morphline文件。
agent.sinks.solrSink.morphlineFile = morphlines.conf
agent.sinks.solrSink.morphlineId = morphline1
#当channel中存在100条数据开始处理
agent.sinks.solrSink.batchSize = 100
#当channel中数据存在超过1000ms时开始处理。batchSize与batchDurationMillis采取优先原则,哪个参数值先到,哪个先处理。
agent.sinks.solrSink.batchDurationMillis = 1000


已有(3)人评论

跳转到指定楼层
yaojiank 发表于 2017-3-30 19:11:41
其实都有案例:
规律来讲:

第一部分:前面先配置SOLR_LOCATOR
如下面
SOLR_LOCATOR : {
  # Name of solr collection
  collection : collection1

  # ZooKeeper ensemble
  zkHost : "127.0.0.1:2181/solr"
}

这些如果懂solr,这个不难的。

第二部分:指定一个或则多个morphlines。
每一个定义一个ETL transformation链。一个morphline包含一个或则多个(多个潜在的嵌套)命令。一个morphline是consume记录的一种方式。把他们转换为stream记录,和pipe流,通过一组配置transformations的目标应用程序的方式比如solr.


morphlines : [
  {
    # 如果有多个,下面是名字morphline1
    id : morphline1


#导入所有的morphline命令,在Java包和子包中
#可能在classpath存在其它命令,这是不可见的
    importCommands : ["com.cloudera.**", "org.apache.solr.**"]

    commands : [
      {
        # 解析 Avro container 文件和提交记录为每一个Avro object
        readAvroContainer {
          # 可选, 需要输入匹配的MIME类型:
          # supportedMimeTypes : [avro/binary]

          #可选,使用定制的 Avro schema 以 JSON 格式 inline:
          # readerSchemaString : """<json can go here>"""

          # 可选, use a custom Avro schema file in JSON format:
          # readerSchemaFile : /path/to/syslog.avsc
        }
      }

      {
#下面都是英文,如果楼主哪里不明白可以在继续讨论
        # Consume the output record of the previous command and pipe another
        # record downstream.
        #
        # extractAvroPaths is a command that uses zero or more Avro path
        # expressions to extract values from an Avro object. Each expression
        # consists of a record output field name (on the left side of the
        # colon ':') as well as zero or more path steps (on the right hand
        # side), each path step separated by a '/' slash. Avro arrays are
        # traversed with the '[]' notation.
        #
        # The result of a path expression is a list of objects, each of which
        # is added to the given record output field.
        #
        # The path language supports all Avro concepts, including nested
        # structures, records, arrays, maps, unions, etc, as well as a flatten
        # option that collects the primitives in a subtree into a flat list.
        extractAvroPaths {
          flatten : false
          paths : {
            id : /id
            username : /user_screen_name
            created_at : /created_at
            text : /text
          }
        }
      }

      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # convert timestamp field to native Solr timestamp format
      # e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : created_at
          inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd"]
          inputTimezone : America/Los_Angeles
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : UTC
        }
      }

      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # Command that deletes record fields that are unknown to Solr
      # schema.xml.
      #
      # Recall that Solr throws an exception on any attempt to load a document
      # that contains a field that isn't specified in schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }

      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]





回复

使用道具 举报

yaojiank 发表于 2017-3-30 19:13:22
比方下面比较简单的,就是
第一步:命名一个morphline1,
第二步导入包
第三步:使用命令
morphlines : [
  {
    id : morphline1

    # Import all morphline commands in these java packages and their subpackages.
    # Other commands that may be present on the classpath are not visible to this morphline.
    importCommands : ["org.kitesdk.**"]

    commands : [
        {
                 readLine {
                        charset : UTF-8
                 }
        }
    ]
  }
]
楼主可以从简单的开始


回复

使用道具 举报

wtaisi 发表于 2017-5-5 15:24:59
yaojiank 发表于 2017-3-30 19:11
其实都有案例:
规律来讲:

那。原生环境的Flume sink 数据到solr怎么配置呢?需要morphline吗?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条