分享

使用Kafka和KSQL进行实时系统日志处理:过滤

本帖最后由 pig2 于 2018-7-14 11:10 编辑
问题导读

1.kafka sql的作用是什么?

2.kafka sql如何实现过滤数据?
3.kafka sql对于过去的,现在接受的,以后接受的数据如何过滤?






系统日志是现代计算运行的无处不在的标准之一。 Linux,它在IP摄像机等网络和物联网设备中也很常见。它提供了一种流式传输日志消息的方法,以及源主机,消息严重性等元数据。有时目标只是一个本地日志文件,但更常见的是它是一个集中式系统日志服务器,它可以进一步记录或处理消息。

作为一种高性能的分布式流媒体平台,ApacheKafka®是集中提取系统日志数据的绝佳工具。由于Apache Kafka还持久保存数据并支持本地流处理,因此我们无需在使用数据之前将其置于其中。您可以通过各种方式流式传输数据,包括具有专用syslog插件的Kafka Connect。

在这篇文章中,我们将看到KSQL如何在实时到达时处理系统日志消息。 KSQL是Apache Kafka的SQL流引擎。使用SQL,交互式执行或作为应用程序,我们可以在Kafka中过滤,丰富和聚合数据流。来自KSQL的数据处理可以追溯到Kafka,这意味着我们可以轻松地获取Kafka中的数据。


ksql_syslog01-1024x258.png


使用KSQL过滤syslog数据
使用KSQL处理syslog和流很简单。 首先,下载并安装Confluent Platform(https://www.confluent.io/download/),然后按照GitHub仓库中的说明安装KSQL(https://github.com/confluentinc/ksql/)。 (从Confluent Platform 4.1开始,KSQL将包含在平台中。)你需要安装和配置Kafka Connect syslog插件(https://github.com/rmoff/kafka-connect-syslog/blob/4eeb310bf159487d78711d9d3c56c76c28651edb/usage.adoc),然后配置syslog source,发送数据。 如何操作取决于客户端syslog实现 - 对于rsyslog,请参见此处(http://www.rsyslog.com/sending-messages-to-a-remote-syslog-server/)。

假设数据使用syslog topic接受,我们可以从KSQL提示(运行ksql local开始)开始检查topic上的数据:
[mw_shl_code=bash,true]ksql> PRINT 'syslog' FROM BEGINNING;
Format:AVRO
12/03/18 12:21:06 GMT, */192.168.10.250:47823, {"date": null, "facility": 1, "host": "I", "level": 6, "message": "I   logs\n", "charset": "UTF-8", "remote_address": "/192.168.10.250:47823", "hostname": "proxmox01.moffatt.me"}[/mw_shl_code]
如果让这个运行,将在控制台看到所有的新事件流。按Ctrl-C返回到KSQL提示符。

我们现在可以使用(检测到的格式)在topic上声明KSQL:
[mw_shl_code=bash,true]ksql> CREATE STREAM SYSLOG WITH (KAFKA_TOPIC='syslog',VALUE_FORMAT='AVRO');

Message
----------------
Stream created
----------------[/mw_shl_code]
使用标准SQL命令,我们可以查询和操作事件流。
[mw_shl_code=bash,true]ksql> SELECT HOSTNAME,MESSAGE FROM SYSLOG;
proxmox01.moffatt.me | I  logs
proxmox01.moffatt.me | I still  logs[/mw_shl_code]
这是一个连续查询,并将继续显示到达时新数据。 按Ctrl-C取消并返回KSQL提示符。
Apache Kafka数据仍然存在,因此KSQL不仅可以显示当前和未来的入站(接受)消息 - 我们还可以查询过去! 为此,我们将KSQL设置从主题的开头处理消息:

[mw_shl_code=bash,true]ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'[/mw_shl_code]
现在你可以重新运行select,你将会看到syslog topic的很多内容
[mw_shl_code=bash,true]ksql> SELECT HOSTNAME,MESSAGE FROM SYSLOG;
localhost | X
localhost | foo I love logs
localhost | I   logs
proxmox01.moffatt.me | I   logs
proxmox01.moffatt.me | I still   logs[/mw_shl_code]
我们现在接入实时系统日志,这样我们可以看到KSQL的潜力。我的日志流来自家庭网络,包括一些服务器,容器,一些network应用,和一堆移动设备。
[mw_shl_code=bash,true]ksql> SELECT TIMESTAMPTOSTRING(DATE, 'yyyy-MM-dd HH:mm:ss') AS SYSLOG_TS, HOST, MESSAGE FROM SYSLOG;
2018-03-12 13:30:59 | rpi-03 | rpi-03 sshd[30105]: Invalid user oracle from 185.55.218.153
2018-03-12 13:30:59 | rpi-03 | rpi-03 sshd[30105]: input_userauth_request: invalid user oracle [preauth]
2018-03-12 13:30:59 | rpi-03 | rpi-03 sshd[30105]: Received disconnect from 185.55.218.153: 11: Bye Bye [preauth]
2018-03-12 13:31:00 | rpi-03 | rpi-03 sshd[30109]: reverse mapping checking getaddrinfo for host3.artegix.info [185.55.218.153] failed - POSSIBLE BREAK-IN ATTEMPT!
2018-03-12 13:31:01 | rpi-03 | rpi-03 sshd[30117]: Invalid user test from 185.55.218.153
2018-03-12 13:31:01 | rpi-03 | rpi-03 sshd[30117]: input_userauth_request: invalid user test [preauth]
2018-03-12 13:31:01 | rpi-03 | rpi-03 sshd[30117]: Received disconnect from 185.55.218.153: 11: Bye Bye [preauth]
2018-03-12 13:31:02 | rpi-03 | rpi-03 sshd[30121]: reverse mapping checking getaddrinfo for host3.artegix.info [185.55.218.153] failed - POSSIBLE BREAK-IN ATTEMPT!
2018-03-12 13:31:02 | rpi-03 | rpi-03 sshd[30121]: Invalid user test from 185.55.218.153
2018-03-12 13:31:05 | ("BZ2,24a43c000000,v3.7.40.6115") | ("BZ2,24a43c000000,v3.7.40.6115") syslog: dpi.dpi_stainfo_notify(): dpi not enable
2018-03-12 13:31:05 | ("BZ2,24a43c000000,v3.7.40.6115") | ("BZ2,24a43c000000,v3.7.40.6115") hostapd: ath1: STA xx:xx:xx:xx:xx:xx IEEE 802.11: associated
2018-03-12 13:31:06 | ("U7PG2,f09fc2000000,v3.7.40.6115") | ("U7PG2,f09fc2000000,v3.7.40.6115") hostapd: ath3: STA xx:xx:xx:xx:xx:xx IEEE 802.11: disassociated
2018-03-12 13:31:06 | ("U7PG2,f09fc2000000,v3.7.40.6115") | ("U7PG2,f09fc2000000,v3.7.40.6115") libubnt[9577]: dpi.dpi_stainfo_notify(): dpi not enable
2018-03-12 13:31:06 | ("U7PG2,f09fc2000000,v3.7.40.6115") | ("U7PG2,f09fc2000000,v3.7.40.6115") libubnt[9577]: wevent.ubnt_custom_event(): EVENT_STA_LEAVE ath3: xx:xx:xx:xx:xx:xx / 0
2018-03-12 13:31:06 | ("BZ2,24a43c000000,v3.7.40.6115") | ("BZ2,24a43c000000,v3.7.40.6115") hostapd: ath1: STA xx:xx:xx:xx:xx:xx RADIUS: starting accounting session 5A9BFF48-00000286
2018-03-12 13:31:06 | ("BZ2,24a43c000000,v3.7.40.6115") | ("BZ2,24a43c000000,v3.7.40.6115") hostapd: ath1: STA xx:xx:xx:xx:xx:xx WPA: pairwise key handshake completed (RSN)
2018-03-12 13:31:06 | ("BZ2,24a43c000000,v3.7.40.6115") | ("BZ2,24a43c000000,v3.7.40.6115") syslog: wevent.ubnt_custom_event(): EVENT_STA_JOIN ath1: xx:xx:xx:xx:xx:xx / 4
2018-03-12 13:33:35 | proxmox01 | proxmox01 kernel: [30936072.607801] audit: type=1400 audit(1520861615.501:3062182): apparmor="DENIED" operation="ptrace" profile="docker-default" pid=26854 comm="node" requested_mask="trace" denied_mask="trace" peer="docker-default"
2018-03-12 13:33:38 | proxmox01 | proxmox01 kernel: [30936075.188047] audit: type=1400 audit(1520861618.081:3062183): apparmor="DENIED" operation="ptrace" profile="docker-default" pid=26854 comm="node" requested_mask="trace" denied_mask="trace" peer="docker-default"----[/mw_shl_code]
这只是一个小样本的内容,我们可以发现一些事情:
  • 针对我的面向公共服务器的登录攻击
  • WiFi接入点客户端连接/断开
  • Linux安全模块动作日志

现在我们将看到如何在这个案例中提取某些感兴趣的日志,即登录攻击。

要过滤掉我们的数据系统日志(包括:已经存在的,达到的,以后会到达的),我们只使用SQL语句:
[mw_shl_code=bash,true]ksql> SELECT TIMESTAMPTOSTRING(DATE, 'yyyy-MM-dd HH:mm:ss') AS SYSLOG_TS, HOST, MESSAGE \
FROM SYSLOG \
WHERE HOST='rpi-03' AND MESSAGE LIKE '%Invalid user%'\
LIMIT 5;
2018-03-04 15:14:24 | rpi-03 | rpi-03 sshd[24150]: Invalid user mini from 114.130.4.16
2018-03-04 15:21:49 | rpi-03 | rpi-03 sshd[24200]: Invalid user admin from 103.99.0.209
2018-03-04 15:21:58 | rpi-03 | rpi-03 sshd[24204]: Invalid user support from 103.99.0.209
2018-03-04 15:22:06 | rpi-03 | rpi-03 sshd[24208]: Invalid user user from 103.99.0.209
2018-03-04 15:22:23 | rpi-03 | rpi-03 sshd[24216]: Invalid user 1234 from 103.99.0.209
LIMIT reached for the partition.
Query terminated[/mw_shl_code]
(注意:我这里使用\换行来延续命令,如果sql语句都待在一行也是可以的)

这对于能够快速查询和检查日志非常有用。 但是让我们看看更有用的东西! 我们可以保留这些数据,不止过去的日志过滤,收到的最新的日志也是如此, 为此,只需将CREATE STREAM foo AS(通常称为CSAS)添加到查询的前面:
[mw_shl_code=bash,true]ksql> CREATE STREAM SYSLOG_INVALID_USERS AS \
SELECT * \
FROM SYSLOG \
WHERE HOST='rpi-03' AND MESSAGE LIKE '%Invalid user%';

Message
----------------------------
Stream created and running
----------------------------[/mw_shl_code]
我们创建了一个派生(derived )流,可以像在KSQL中的任何其他对象一样查询:
[mw_shl_code=bash,true]ksql> SELECT * FROM  SYSLOG_INVALID_USERS LIMIT 1;
1520176464386 | //192.168.10.105:38254 | 1520176464000 | 4 | rpi-03 | 6 | rpi-03 sshd[24150]: Invalid user mini from 114.130.4.16 | UTF-8 | /192.168.10.105:38254 | rpi-03.moffatt.me
LIMIT reached for the partition.
Query terminated[/mw_shl_code]
在原理上,KSK实际上创建了一个主题,并使用我们针对源主题定义的任何消息实时填充此主题。 我们可以看到新主题及其中的消息 - 它只是一个Kafka主题:
[mw_shl_code=bash,true]$ kafka-topics --zookeeper localhost:2181 --list|grep SYSLOG
SYSLOG_INVALID_USERS
$
$ kafka-avro-console-consumer \
--bootstrap-server proxmox01.moffatt.me:9092 \
--property schema.registry.url=http://proxmox01.moffatt.me:8081 \
--topic SYSLOG_INVALID_USERS --max-messages=1 --from-beginning|jq '.'
{
   "DATE": {
     "long": 1520176464000
   },
   "FACILITY": {
     "int": 4
   },
   "HOST": {
     "string": "rpi-03"
   },
   "LEVEL": {
     "int": 6
   },
   "MESSAGE": {
     "string": "rpi-03 sshd[24150]: Invalid user mini from 114.130.4.16"
   },
   "CHARSET": {
     "string": "UTF-8"
   },
   "REMOTE_ADDRESS": {
     "string": "/192.168.10.105:38254"
   },
   "HOSTNAME": {
     "string": "rpi-03.moffatt.me"
   }
}
Processed a total of 1 messages[/mw_shl_code]
要跟踪新流,列等的吞吐量,使用DESCRIBE EXTENDED命令:
[mw_shl_code=bash,true]ksql> DESCRIBE EXTENDED SYSLOG_INVALID_USER_LOGIN;

Type                 : STREAM
Key field            :
Timestamp field      : Not set - using <ROWTIME>
Key format           : STRING
Value format         : AVRO
Kafka output topic   : SYSLOG_INVALID_USER_LOGIN (partitions: 4, replication: 1)

Field          | Type
--------------------------------------------
ROWTIME        | BIGINT           (system)
ROWKEY         | VARCHAR(STRING)  (system)
DATE           | BIGINT
FACILITY       | INTEGER
HOST           | VARCHAR(STRING)
LEVEL          | INTEGER
MESSAGE        | VARCHAR(STRING)
CHARSET        | VARCHAR(STRING)
REMOTE_ADDRESS | VARCHAR(STRING)
HOSTNAME       | VARCHAR(STRING)
--------------------------------------------

Queries that write into this STREAM
-----------------------------------
id:CSAS_SYSLOG_INVALID_USER_LOGIN - CREATE STREAM SYSLOG_INVALID_USER_LOGIN AS SELECT * FROM SYSLOG WHERE HOST='rpi-03' AND MESSAGE LIKE '%Invalid user%';

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:     13.46   total-messages:      1335     last-message: 3/12/18 1:59:35 PM GMT
failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
(Statistics of the local KSQL server interaction with the Kafka topic SYSLOG_INVALID_USER_LOGIN)
ksql>[/mw_shl_code]

总结

KSQL使用SQL的简单声明性语言为任何人提供编写流处理应用程序的能力。 在本文中,我们可以看到我们如何检查数据syslog的入站流,并轻松创建写入第二个Kafka主题的过滤消息的实时流。

在下一篇文章中,我们将简要介绍一下KSQL本身,并看看使用Python编写一个非常简单的推送通知系统。 我们将看到一些简单的异常检测,它基于KSQL的有状态聚合功能。

########################
关注公众号,查看about云经典文章



还有更多资源
关注公众号,获取人工智能20套,区块链资源5阶段
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23842





本帖被以下淘专辑推荐:

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条