一、 引言
本文章将介绍elasticsearch从采集数据到分析数据,在不同阶段进行字段的过滤的详尽方法。
二、背景
由于收集的日志信息经过解析后字段非常多,很多都不是目标字段,客户需要我们将字段进行过滤操作。
从收集的流程来看一般的日志数据处理架构是由beats进行数据处理,由kafka进行数据缓存,流入logstash进行数据的解析,推送至elasticsearch进行数据的分析存储,最后由kibana进行分析展示,因此可以从源端、管道端、分析端,这三个阶段考虑完成这个目标需求。
技术架构:
三、 解决方案
1、源端
Filebeat是elastic stack中一个日志文件托运工具,在服务器上安装客户端后,Filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读),并且转发这些信息到ElasticSearch或者Logstash中存放。
当开启Filebeat程序的时候,它会启动一个或多个探测器(prospectors)去检测指定的日志目录或文件,对于探测器找出的每一个日志文件,Filebeat启动收割进程(harvester),每一个收割进程读取一个日志文件的新内容,并发送这些新的日志数据到处理程序(spooler),处理程序会集合这些事件,最后filebeat会发送集合的数据到指定的地点。Filebeat其中包含了processors模块可以进行数据的初步处理,例如过滤数据等。
可以在filebeat.yml文件中找到这个模块进行自定义的字段操作
采用drop_fields方法清除非目标字段,当满足条件时,将非目标字段进行弃用。例如想去掉非目标字段http.response.code,使用has_fields方法进行判断,存在目标字段时将其弃用。
processors:
- drop_fields:
when:
has_fields: ['http.response.code']
fields: [“http.response.code”]
ignore_missing: false #如果指定字段不存在时,不会返回错误
2、管道端
Logstash 是elastic stack开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到存储库中。数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。
Logstash是基于pipeline方式进行数据处理的,pipeline可以理解为数据处理流程的抽象。在一条pipeline数据经过上游数据源汇总到消息队列中,然后由多个工作线程进行数据的转换处理,最后输出到下游组件。一个logstash中可以包含多个pipeline。
Logstash管道有两个必需的元素,输入和输出,以及一个可选元素过滤器:
Input:数据输入组件,用于对接各种数据源,接入数据,支持解码器,允许对数据进行编码解码操作;必选组件;
output:数据输出组件,用于对接下游组件,发送处理后的数据,支持解码器,允许对数据进行编码解码操作;必选组件;
filter:数据过滤组件,负责对输入数据进行加工处理;
针对过滤数据字段的需求,我们可以采用drop filter插件进行数据的处理,使用filebeat从源端采集数据后进入kafka集群进行队列缓存,数据流入logstash进行过滤,使用drop remove_field可以 将非目标字段进行移除。
filter {
drop {
remove_field => [ "http.response.code" ]
}
}
2、分析端
Elastic 增加了Ingest node功能,Ingest node可以自定义pipeline,用户可在pipeline中自定processor,对数据进行过滤转换提取等操作,功能很强大,为elastic提供了一种文档预处理和增强转换的轻量级方案。
数据流入elastisearch,我们将采用Elasticsearch ingest pipeline移除已经存在的字段lasticsearch ingest pipeline用于预处理数据,pipeline是一系列处理管道,一系列的processors,先来看下pipeline的处理过程:
常用pipeline如下:
1、Trim:去除空格,如果是字符串类型的数组,数组中所有字符串都会被替换空格处理;
2、Split:切分字符串,使用指定切分符,切分字符串为数组结构,只作用与字符串类型;
3、Rename:重命名字段;
4、Foreach:对一组数据进行相同的预处理,可以使用Foreach;
5、Lowercase/Uppercase:对字段进行大小写转换;
6、Remove:移除字段;
7、Set:设置字段值;
本次需求是去除非目标字段,因此选用remove方法进行操作:
#定义remove pipeline
PUT _ingest/pipeline/remove_pipeline
{
"description": "remove processor",
"processors": [
{
"remove": {
"field": "http.response.code"
}
}
]
}
#测试
POST _ingest/pipeline/remove_pipeline/_simulate
{
"docs": [
{
"_source": {
" http.response.code ": [
"200",
"400"
]
}
}
]
}
#返回,可以看到http.response.code字段已经被移除
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : { }
}
}
}
]
}
四、 综合对比
filebeat | logstash | elasticsearch | |
内存 | 小 | 大 | 大 |
功能 | 传输 | 从多种输入端采集并实时解析和转换数据并输出到多种输出端 | 分析、处理、存储数据 |
轻重 | 轻量级二进制文件 | 相对较重 | 较重 |
过滤能力 | 有过滤能力但是较弱 | 强大的过滤能力 | 无过滤能力,但是可以将已经导入的数据进行处理 |
进程 | 十分稳定 | 一台服务器只允许一个logstash进程,挂掉之后需要手动拉起 | 十分稳定 |
集群 | 单节点 | 单节点 | 分布式 |
Filebeat:
设计目的写Logstash和ES,轻量型日志采集工具,与logstash协作,更稳健少宕机(数十MB级别)
Logstash:
使用简单较好上手,插件丰富,扩展功能更强,注重数据预处理(GB级别)
Elasticsearch:
上手难度大于前两个,注重于数据导入后的数据处理(GB级别)