分享

flume-ng 求助

tail 采集日志入库,flume-ng跑一段时间后取到的日志不完整了,,一条数据 前面被截了几十个字符采集日志为nginx 日志一条 大约 几百字符。。 感谢
配置如下
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail  -F /opt/logs/tengine/access.log
agent1.sources.source1.channels = channel1

# Describe sink1
agent1.sinks.sink1.type = cn.com.flaginfo.flume.sink.db.DbSink
agent1.sinks.sink1.proxool.path = /opt/flume/conf/ProxoolConf.xml
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapactiy = 1000

# Bind the source and sink to the channel  
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1



已有(11)人评论

跳转到指定楼层
sstutu 发表于 2014-7-17 09:28:03
你们这个是不是自定义的插件,需要看源码
回复

使用道具 举报

2278 发表于 2014-7-17 09:33:17
sstutu 发表于 2014-7-17 09:28
你们这个是不是自定义的插件,需要看源码
  1. public class DbSink extends AbstractSink
  2.   implements Configurable
  3. {
  4.   private static final Logger log = LoggerFactory.getLogger(DbSink.class);
  5.   public Sink.Status process() throws EventDeliveryException {
  6.         Channel channel = getChannel();
  7.     Transaction tx = channel.getTransaction();
  8.     Sink.Status localStatus;
  9.     try {
  10.       tx.begin();
  11.       Event e = channel.take();
  12.       if (e == null) {
  13.         tx.rollback();
  14.         localStatus = Sink.Status.BACKOFF;
  15.         return localStatus;
  16.       }
  17.        String msg = new String(e.getBody());
  18.        log.debug(msg); //丢失部分数据点
  19.        JSONObject json = parse(msg); //格式化
  20.        tx.commit();
  21.       return Sink.Status.READY;
  22.     } catch (Exception e) {
  23.             e.printStackTrace();
  24.         log.error(e.toString());
  25.       tx.rollback();
  26.       return Sink.Status.BACKOFF;
  27.     } finally {
  28.       tx.close();
  29.     } }
  30.   public void configure(Context context)
  31.   {
  32.     String path = JafkaUtil.getJafkaConfigParameter(context, "proxool.path");
  33.     try {
  34.       log.info("proxool.dbpool appDir:" + path);
  35.       JAXPConfigurator.configure(path, false);
  36.       log.info("proxool.dbpool init succ!");
  37.     } catch (ProxoolException e) {
  38.       e.printStackTrace();
  39.     }
  40.   }
复制代码

回复

使用道具 举报

2278 发表于 2014-7-17 10:11:36
sstutu 发表于 2014-7-17 09:28
你们这个是不是自定义的插件,需要看源码

通过日志 我查到,在不出错的正常情况下,body.length 在200-300之间 一般小于500  而出错的那条。都大于1000 。。而我的内存设置是1000  会是内存设太小?、、、nginx 日志一般不会变,在nginx日志里查看基本上每条都差不多长度,,可是为什么body。length 有条会忽然比其他的大很多。。 这是为啥呢,,正在开大内存测试中。。。。
回复

使用道具 举报

hyj 发表于 2014-7-17 13:21:51
2278 发表于 2014-7-17 10:11
通过日志 我查到,在不出错的正常情况下,body.length 在200-300之间 一般小于500  而出错的那条。都大 ...
你们是不是爬虫,爬完内容,然后通过flume传递,然后数据分析。
如果Memory Channel那就关系比较大了。你可以调大试一下。
回复

使用道具 举报

lbwahoo 发表于 2014-7-17 13:21:58
留下来看看 !
回复

使用道具 举报

2278 发表于 2014-7-17 16:10:29
hyj 发表于 2014-7-17 13:21
你们是不是爬虫,爬完内容,然后通过flume传递,然后数据分析。
如果Memory Channel那就关系比较大了。你 ...

  就是直接tail -f 形式取nginx日志,split 下 丢数据库,遇到的问题是,程序在正常运行一段时间后。。就取不到数据了。而body的长度又不为0。太郁闷了。。。

谢谢

谢谢


回复

使用道具 举报

hyj 发表于 2014-7-17 16:16:12
2278 发表于 2014-7-17 16:10
就是直接tail -f 形式取nginx日志,split 下 丢数据库,遇到的问题是,程序在正常运行一段时间后。。就 ...

建议从两方面检查:
1.程序问题
从你的错误来看,是数组越界了。
2.内存或则其他原因,数据累计
造成错误



回复

使用道具 举报

2278 发表于 2014-7-17 16:18:49
hyj 发表于 2014-7-17 16:16
建议从两方面检查:
1.程序问题
从你的错误来看,是数组越界了。

数组越界是因为 没取到body  ,没做处理才报的
我再找找原因。困扰我几天了
回复

使用道具 举报

hyj 发表于 2014-7-17 16:22:43
2278 发表于 2014-7-17 16:18
数组越界是因为 没取到body  ,没做处理才报的
我再找找原因。困扰我几天了

不了解你的实际情况,觉得你的方向应该这样:刚开始正常,后来不正常,你需要从数据不断累加方向考虑。数据的累加,会带来什么问题。

回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条