简介
主要描述
Apache Flume是一个分布式,可靠且可用的系统:用于有效地从许多不同的源收集、聚合和移动大量日志数据到一个集中式的数据存储区。
不只限于日志数据。因为数据源可以定制也可以被用来传输大量事件数据
这些数据不仅仅包括网络通讯数据、社交媒体产生的数据、电子邮件信息等等。
历史
Apache Flume 是 Apache 基金会的顶级项目,在加入 Apache 之前由 cloudera 公司开发以及维护。
Apache Flume 目前有两种主版本: 0.9.x 和 1.x。
其中 0.9.x 是历史版本,我们称之为 Flume OG(original generation)。
2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构
重构后的版本统称为 Flume NG(next generation),也就是 1.x 版本。
对比Logstash优势
- Flume注重数据事务,注重传输的事务正确性
- 因为有些Flume Channel支持数据会持久化,所以不存在Logstash Buffer的数据丢失
架构
数据流模型
其中最核心的三个组件是 Source、Chanel 以及 Slink。
Source
消费由外部源(如Web服务器)传递给它的事件。外部源以一定的格式发送数据给 Flume,这个格式的定义由目标 Flume Source 来确定。
例如:一个Avro Flume source
可以从 Avro(Avro是一个hadoop的一个子项目,基于二进制数据传输的高性能中间件)
客户端接收 Avro 事件,也可以从其他Flume Agents
(该 Flume agents 有 Avro sink)接收 Avro 事件。
同样,我们可以定义一个Thrift Flume Source
接收来自Thrift Sink
、Flume Thrift RPC
客户端或者其他任意客户端(该客户端可以使用任何语言编写,只要满足 Flume thrift 协议)的事件。
Channel
可以理解为缓存区,用来保存从Source
那拿到的数据,直到Flume Slink
将数据消费。
File Chanel
是一个例子,它将数据保存在文件系统中(当然你可以将数据放在内存中)。
Slink
从 channel 消费完数据就会将数据从Channel
中清除,随后将数据放到外部存储系统例如HDFS(使用 Flume HDFS Sink)
或发送到其他 Flume Agent
的Source
中。
不管是Source
还是Slink
都是异步发送和消费数据。
复杂的流
合并情景
多Agent流情景
优势
可靠性
事件被存储在每个Flume Agent
的Channel
中。
随后这些事件会发送到流中的下一个Flume Agent
或者设备存储中(例如 HDFS)。
只有事件已经被存储在下一个Flume Agent
的Channel
中或设备存储中时,当前Channel
会清除该事件。
这种机制保证了流在端到端的传输中具有可靠性。
Flume使用事务方法(transactional approach)来保证事件的可靠传输。
在 source 和 slink 中,事件的存储以及恢复作为事务进行封装,存放事件到 channel 中以及从 channel 中拉取事件均是事务性的。
这保证了流中的事件在节点之间传输是可靠的。
可恢复
事件在 channel 中进行,该 channel 负责保障事件从故障中恢复。
Flume 支持一个由本地文件系统支持的持久化文件(文件模式:channel.type="file")
channel。
同样也支持内存模式(channel.type="memmory")
,即将事件保存在内存队列中。
显然内存模式相对与文件模型性能会更好,但是当 agent 进程不幸挂掉时,内存模式下存储在 channel 中的事件将丢失,无法进行恢复。
使用
Flume agent 的配置保存在一个本地配置文件中conf/flume-conf.properties
。
可以在同一配置文件中指定一个或多个 agent 的配置。
配置文件指定了 agnet 中每个 source、channel、slink 的属性,以及三者如何组合形成数据流。
流中的每一个组件(source、channel、slink)都有自己的名称、类型以及一系列配置属性。例如以下组件
Channel
Memory Channel
Memory Channel是使用内存来存储Event,使用内存的意味着数据传输速率会很快,但是当Agent挂掉后,存储在Channel中的数据将会丢失。
Property Name | Default | Description |
---|---|---|
type | – | 类型指定为:memory |
capacity | 100 | 存储在channel中的最大容量 |
transactionCapacity | 100 | 从一个source中去或者给一个sink,每个事务中最大的事件数 |
keep-alive | 3 | 对于添加或者删除一个事件的超时的秒钟 |
byteCapacityBufferPercentage | 20 | 定义缓存百分比 |
byteCapacity | see description | Channel中允许存储的最大字节总数 |
配置
1 | a1.channels = c1 |
除此之外还有FileChannel,KafkaChannel,JDBC Channel,具体介绍于:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
Source
Spooling Directory Source
Spooling Directory Source可以获取硬盘上”spooling”目录的数据
这个Source将监视指定目录是否有新文件,如果有新文件的话,就解析这个新文件。
事件的解析逻辑是可插拔的。在文件的内容所有的都读取到Channel之后
Spooling Directory Source会重名或者是删除该文件以表示文件已经读取完成。
不像Exec Source,这个Source是可靠的,且不会丢失数据。即使Flume重启或者被Kill。但是需要注意如下两点:
- 1:如果文件在放入spooling目录之后还在写,那么Flume会打印错误日志,并且停止处理该文件。
- 2:如果文件之后重复使用,Flume将打印错误日志,并且停止处理。
为了避免以上问题,我们可以使用唯一的标识符来命令文件,例如:时间戳。
配置
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | 组件类型名称需要为“spooldir”。 |
spoolDir | – | 要从中读取文件的目录。 |
fileSuffix | .COMPLETED | 文件的后缀 |
deletePolicy | never | 何时删除已完成的文件: never or immediate |
fileHeader | false | 是否添加存储绝对路径添加到header中,解析出来的event在header上将添加一个属性 |
fileHeaderKey | file | 当将绝对路径附加到event header使用的Key。 |
basenameHeader | false | 是否添加存储文件名的标题到event header中。 |
basenameHeaderKey | basename | 将文件名附加到event header时使用的Key。 |
includePattern | ^.*$ | 指定要包含哪些文件的正则表达式。它可以和ignorePattern一起使用。 如果一个文件同时匹配’ ignorePattern ‘和’ includePattern ‘ regex,该文件将被忽略。 |
ignorePattern | ^$ | 指定要忽略(跳过)哪些文件的正则表达式。它可以和includePattern一起使用。 如果一个文件同时匹配’ ignorePattern ‘和’ includePattern ‘ regex,该文件将被忽略。 |
trackerDir | .flumespool | 目录存储与文件处理相关的元数据。 如果此路径不是绝对路径,则将其解释为相对于spoolDir。 |
consumeOrder | oldest | 消费spooling目录文件的规则,分别有:oldest,youngest和random。在oldest 和 youngest的情况下, 通过文件的最后修改时间来比较文件。如果最后修改时间相同,就根据字典的序列从小开始。 在随机的情况下,就随意读取文件。如果文件列表很长,采用oldest/youngest可能会很慢,因为用oldest/youngest要扫描文件。 但是如果采用random的话,就可能造成新的文件消耗的很快,老的文件一直都没有被消费。 |
pollDelay | 500 | 轮询新文件时使用的延迟(毫秒)。 |
recursiveDirectorySearch | false | 是否监视要读取的新文件的子目录。 |
maxBackoff | 4000 | 如果Channel已经满了,那么该Source连续尝试写入该Channel的最长时间(单位:毫秒)。 |
batchSize | 100 | 批量传输到Channel的粒度。 |
inputCharset | UTF-8 | 字符集 |
decodeErrorPolicy | FAIL |
在文件中有不可解析的字符时的解析策略。FAIL : 抛出一个异常,并且不能解析该文件。REPLACE : 取代不可解析的字符,通常用Unicode U+FFFD. IGNORE : 丢弃不可能解析字符序列。 |
deserializer | LINE |
自定序列化的方式,自定的话,必须实现EventDeserializer.Builder . |
deserializer.* | ||
bufferMaxLines | – | 已废弃 |
bufferMaxLineLength | 5000 | (不推荐使用) 一行中最大的长度,可以使用deserializer.maxLineLength代替。 |
selector.type | replicating | replicating(复制) 或 multiplexing(复用) |
selector.* | 取决于selector.type的值 | |
interceptors | – | 空格分割的interceptor列表。 |
interceptors.* |
案例
一个a-1的Agent Sources
的例子:
1 | a1.channels = ch-1 |
完整的例子
1 |
|
组件存放目录
192.168.4.218 /data/workspace/apache-flume-1.8.0-bin
执行命令
bin/flume-ng agent --conf conf --conf-file conf/flume-directory.properties --name a1 -Dflume.root.logger=INFO,console
解析
配置相关
1 |
|
启动
1 |
|
执行流程
1 | private class SpoolDirectoryRunnable implements Runnable { |
Taildir Source
由于SpoolDirectorySource
监听目录下的文件不允许动态变化以及无法监听目录嵌套子目录
所以使用这个组件即可消除限制,ExecSource
+ SpoolDirectorySource
的功能
配置
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | 组件类型名称需要为TAILDIR . |
filegroups | – | 文件组的以空格分隔的列表。每个文件组表示要跟踪的一组文件. |
filegroups.<filegroupName> |
– | 文件组的绝对路径。正则表达式(而不是文件系统模式)只能用于文件名. |
positionFile | ~/.flume/taildir_position.json | 文件的JSON格式,以记录inode、绝对路径和每个尾随文件的最后位置. |
headers.<filegroupName>.<headerKey> |
– | |
byteOffsetHeader | false | 是否将尾部行的字节偏移量添加到名为“byteoffset”的header中。 |
skipToEnd | false | 如果文件没有写在位置文件上,是否跳过位置到EOF。 |
idleTimeout | 120000 | 关闭非活动文件的时间(ms)。 如果将已关闭的文件追加到新行,则该源将自动重新打开它。 |
writePosInterval | 3000 | 将每个文件的最后一个位置写入位置文件的间隔时间(ms)。 |
batchSize | 100 | 读取的行数,默认是100 |
backoffSleepIncrement | 1000 | 重新尝试轮询新数据之前的时间延迟增量,上一次尝试时未发现任何新数据。 |
maxBackoffSleep | 5000 | 每次重新尝试轮询新数据之间的最大时间延迟,上一次尝试未发现任何新数据。 |
cachePatternMatching | true | 对于包含数千个文件的目录,列出目录并应用filename regex模式可能会很耗时。 缓存匹配的文件列表可以提高性能。 文件的使用顺序也将被缓存。要求文件系统以至少1秒的粒度跟踪修改时间。 |
fileHeader | false | 是否添加header存储文件绝对路径 |
fileHeaderKey | file | fileHeader启用时,使用的key |
案例
1 | a1.sources = source1 |
组件存放目录
192.168.4.218 /data/workspace/apache-flume-1.8.0-bin
执行命令
bin/flume-ng agent --conf conf --conf-file conf/flume-taildir.properties --name a1 -Dflume.root.logger=INFO,console
问题
- 由于log更名重复获取数据
- 合并异常行功能缺失
以上问题可直接使用以下项目解决,将target打出的jar包丢入${flume目录}/lib
下
http://192.168.4.210/yanfa/flume-taildir-source
并按照以上项目的flume source
配置方式进行配置
Interceptor
简介
拦截在source
层对event
的包装、筛选过滤、提取相关数据等作用
Timestamp Interceptor
Host Interceptor
Static Interceptor
Regex Filtering Interceptor
Regex Extractor Interceptor
案例
1 | #sources描述/配置Source |
Sink
ElasticSearchSink
配置
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | 组件名称org.apache.flume.sink.elasticsearch.ElasticSearchSink |
hostNames | – | 主机名的逗号分隔列表:端口,如果端口不存在将使用默认端口’ 9300 ‘ |
indexName | flume | 将添加日期的索引的名称。 示例“flume”->“flume-yyyy- mm -dd”支持任意标题的替换 例如。%{header}用指定事件头的值替换 |
indexType | logs | 支持为文档建立索引的类型,默认为“log”。%{header}用指定事件头的值替换 |
clusterName | elasticsearch | 要连接的ElasticSearch集群的名称 |
batchSize | 100 | 每个txn要写入的事件数。 |
ttl | – | TTL在设置的时候会自动删除过期的文档,如果不设置则永远不会自动删除。TTL只接受前面整数形式的整数, 还有限定词ms(毫秒)、s(秒)、m(分钟)、h(小时)、d(天)和w(周)。 示例ttl = 5d将ttl设置为5天。更多信息请访问http://www.elasticsearch.org/guide/reference/mapping/ttl-field/。 |
serializer | org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer | 要使用的ElasticSearchIndexRequestBuilderFactory或ElasticSearchEventSerializer。这两个类的实现都可以接受,但最好是ElasticSearchIndexRequestBuilderFactory。 |
serializer.* | – | 要传递给序列化器的属性。 |
案例
1 | a1.sources = source1 |
组件存放目录
192.168.4.218 /data/workspace/apache-flume-1.8.0-bin
因flume不支持es的新版本,所以需要自定义ElasticSearchSink
当前问题可直接使用以下项目解决,将以下项目编译,在target打出的zip包内的jar包丢入${flume目录}/lib
下
flume-elasticsearch-sink
由以上项目得到的配置文件
Property Name | Default | Description |
---|---|---|
channel | - | |
type | - | The component type name, has to be com.cognitree.flume.sink.elasticsearch.ElasticSearchSink |
es.cluster.name | elasticsearch | Name of the elasticsearch cluster to connect to |
es.client.hosts | - | Comma separated hostname:port pairs ex: host1:9300,host2:9300. The default port is 9300 |
es.bulkActions | 1000 | The number of actions to batch into a request |
es.bulkProcessor.name | flume | Name of the bulk processor |
es.bulkSize | 5 | Flush the bulk request every mentioned size |
es.bulkSize.unit | MB | Bulk request unit, supported values are KB and MB |
es.concurrent.request | 1 | The maximum number of concurrent requests to allow while accumulating new bulk requests |
es.flush.interval.time | 10s | Flush a batch as a bulk request every mentioned seconds irrespective of the number of requests |
es.backoff.policy.time.interval | 50M | Backoff policy time interval, wait initially for the 50 miliseconds |
es.backoff.policy.retries | 8 | Number of backoff policy retries |
es.client.transport.sniff | false | Enable or disable the sniff feature of the elastic search |
es.client.transport.ignore_cluster_name | false | Ignore cluster name validation of connected nodes |
es.client.transport.ping_timeout | 5s | The time to wait for a ping response from a node |
es.client.transport.nodes_sampler_interval | 5s | How often to sample / ping the nodes listed and connected |
es.index | default | Index name to be used to store the documents |
es.type | default | Type to be used to store the documents |
es.index.builder | com.cognitree. flume.sink. elasticsearch. StaticIndexBuilder | com.cognitree.flume.sink.elasticsearch的实现。IndexBuilder接口 |
es.serializer | com.cognitree. flume.sink. elasticsearch. SimpleSerializer | com.cognitree.flume.sink.elasticsearch的实现。序列化器接口 |
es.serializer.csv.fields | - | 逗号分隔的csv字段名与数据类型,即column1:type1,column2:type2,支持的数据类型有string, boolean, int和float |
es.serializer.csv.delimiter | ,(comma) | 事件体中数据的分隔符 |
es.serializer.avro.schema.file | - | 模式配置文件的绝对路径 |
1 | a1.sinks.sink1.type=com.cognitree.flume.sink.elasticsearch.ElasticSearchSink |
执行命令
bin/flume-ng agent --conf conf --conf-file conf/flume-es.properties --name a1 -Dflume.root.logger=INFO,console
Kibana
增加Shield设置不同的权限级别
基础查询
基于Lucene查询语法
如输入@type: INFO
,则将所有@type
字段为INFO
的文档搜索列举出来
为了指定复杂的查询条件可以用布尔操作符 AND , OR , 和 NOT
- 如果不打引号
""
,则@message: servlet web
,会搜索servlet
,web
两个关键词
Visualize
Line
Area
HorizontalBar
Pie
HeatMap
VerticalBar
Metric
Timelion
DashBoard
通过之前制造的各类图表放入仪表盘,然后通过search
可以搜索具体呈现的各类仪表
未完待续……..
国内查看评论需要代理~