Darcy's Blog

不如烂笔头


  • 首页

  • 标签

  • 分类

  • 归档

  • 搜索

Fluentd性能优化实践

发表于 2018-10-04 | 更新于 2019-01-11 | 分类于 日志收集 |

我们的日志收集系统使用的是Fluentd,使用Fluentd的原因大概是因为配置简单、插件比较多、而且能够比较容易的定制自己的插件。但是随着日志越来越多以后,Fluentd会出现性能上的问题,以下的文章将回顾我们进行Fluentd性能优化的操作。

Fluentd 性能问题的主要原因?

抛开自己配置错误的一些原因,Fluentd性能问题的最主要原因是因为Fluentd是使用Ruby写的,而Ruby有全局锁(GIL),因而在一个Ruby进程里面同时最多只有一个线程在运行。这样的话,Ruby的多线程对需要更多计算资源的操作显得无能为力,具体的体现可以用top查看进程的运行情况,如果Fluentd到达性能瓶颈的话,Fluentd的进程会一直占用100%左右的计算资源,再也不能提升,对于有四个核的计算机来说,最多也就使用的1/4的计算能力,这是极其浪费的。而且当Fluentd进程到达瓶颈后,数据会处理不完,导致数据收集的速度落后于数据产生的速度。

Fluentd 多进程优化一

既然已经知道了Fluentd性能瓶颈的问题主要是因为单进程不能使用多核的计算能力,那解决的方法也是很简单的,可以把收集的日志按照不同的类型来拆分成不同的进程,这样就能充分的利用多核的计算能力了。

多进程架构

Fluentd多进程架构一 如上图所示,之前单进程的架构可以改成如上的多进程架构,这样每个进程最多可以使用100%的cpu资源,理论上四核的机器起四个Fluentd进程就可以充分的利用机器的计算资源的了,但是实际上这个架构还有一些问题未能解决。

主要问题

这个多进程的架构相比于单进程的架构在性能上已经有很大的提升了,不过还有如下两个问题:

  1. 该架构要求Log的拆分要比较均衡,这样每个进程都能合理的利用计算资源,不然会出现有些进程非常繁忙,但是有些进程却非常的空闲。然而Log的拆分是按照之前的经验来拆分的,不可能做到绝对的均衡,而且拆分完后是直接写到配置文件里面的,也不能进行实时调整。
  2. 即使是按照现在的架构进行日志拆分了,但是有些日志的计算任务比较繁重,有可能导致即使一个进程只处理一个类型的Log也会到达性能瓶颈。如上图的Process3只处理Log5,但是在top中却看到Process3的cpu使用率一直是100%,这说明Process3已经到达了性能瓶颈,但是Log5已经不能再进行拆分了。

基于上面的这两个问题,这种多进程架构还是会遇到性能瓶颈,因此需要对架构再继续进行优化,接下来介绍新的架构来优化Fluentd的性能。

Fluentd 多进程优化二

上面的日志拆分架构其实在我们的系统中已经用过了一段时间了,但是我们发现有些Fluentd进程一直很繁忙,我们本来是想着怎么把日志拆分的更加均衡一些,但是无意中在Fluentd官网中看到了他们推荐的架构,觉得这才是真正正确的做法,之前走的是弯路。

新的多进程架构

Fluentd多进程架构二 这是Fluentd官网给出的多进程架构图,该架构采用两层的结构,前面的一层只负责数据的路由,把数据按照一定的比例路由到第二层,然后第二层才对数据进行处理。第二层可以根据机器的资源起不同个数的进程,甚至可以把第二层部署到不同的机器上去。这样就能解决旧的架构的分配不均和计算瓶颈的问题了,每种日志都能很均衡的使用机器的计算资源,甚至可以分布式扩展。同时也不用苦恼于怎么拆分Log来让进程的计算资源更加的均衡。

注意事项

新的架构虽然在理论上是非常好的架构,但是在配置的过程中需要注意一些问题:

  1. 新架构第一层使用的是forward插件把log路由到第二层的,需要注意的是forward的插件也是需要把buffer_type配置成文件的,不然如果第二层的处理能力不够的话,就会导致第一层的buffer数据一直堆在内存里面,导致内存不够。还会造成在停Fluentd进程的时候,如果是第二层进程先停的话,那么第一层的数据会不能发送的到第二层,第一层的进程会一直停不掉。如果机器不小心关掉的话,还会造成数据丢失。
  2. 新架构第一层的flush_interval(推荐1秒)和buffer_chunk_size(推荐1M)要配置的尽量小,这样数据才能尽快的发送到第二层进行处理。其实如果配置的比较大的话,到时候第二层会有报警的。
  3. 新架构的缺点是同一种Log会有多个处理进程,这样的话就会导致一些只能单进程处理的操作变得不那么优美了。比如webhdfs插件,现在使用新的多进程架构后,因为每个hdfs文件只能由一个进程写入,所以现在同一种Log是由多个进程写入的,只能写入到多个不同的文件,这样会造成hdfs文件数量成倍的增加。
配置实例

下面给出了一个比较简单的配置实例,该实例只适用于单机版本。如果数据比较多的话,还可以把out_fluent.x.conf的配置文件扩展到多台机器。

multiprocess.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
</source>

<source>
@type multiprocess
<process>
cmdline -c in_fluentd.conf --log logs/in_fluentd.conf.log
sleep_before_start 1s
sleep_before_shutdown 5s
</process>
<process>
cmdline -c out_fluent.0.conf --log logs/out_fluent.0.conf.log
sleep_before_start 1s
sleep_before_shutdown 5s
</process>
<process>
cmdline -c out_fluent.1.conf --log logs/out_fluent.1.conf.log
sleep_before_start 1s
sleep_before_shutdown 5s
</process>
</source>

in_fluentd.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<source>
@type tail
@log_level warn
format tsv
keys source,version,event_time
time_key event_time
time_format %Y-%m-%d %H:%M:%S
path /data/*/rolelogout.*
pos_file logs/fluentd/pos/rolelogout.pos
refresh_interval 10s
read_from_head true
keep_time_key true
tag pro_role_logout
</source>

<match pro_role_logout>
@type forward
num_threads 4
buffer_type file
buffer_queue_limit 2048
buffer_chunk_limit 10m
flush_interval 10s
buffer_path logs/fluentd/buffer/in_pro_role_logout.buffer
<server>
host 127.0.0.1
port 24000
</server>
<server>
host 127.0.0.1
port 24001
</server>
</match>

out_fluent.0.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<source>
@type forward
port 24000
bind 0.0.0.0
</source>

<match pro_role_logout>
@type forest
subtype webhdfs
<template>
username webuser
namenode tsh-hdp-namenode-001:50070
standby_namenode tsh-hdp-namenode-002:50070
path /raw_logs/dt=%Y-%m-%d/role_logout.${tag}.%Y%m%d%H.VPROFLTDSG.0.log
flush_interval 10s
field_separator TAB
buffer_queue_limit 1024
buffer_chunk_limit 16m
buffer_type file
buffer_path logs/fluentd/buffer/webhdfs_role_logout.${tag}.VPROFLTDSG.0.buffer
output_include_time false
output_include_tag false
output_data_type attr:version,event_time
flush_at_shutdown true
retry_wait 30s
num_threads 1
read_timeout 180
open_timeout 120
</template>
</match>

out_fluent.1.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<source>
@type forward
port 24001
bind 0.0.0.0
</source>

<match pro_role_logout>
@type forest
subtype webhdfs
<template>
username webuser
namenode tsh-hdp-namenode-001:50070
standby_namenode tsh-hdp-namenode-002:50070
path /raw_logs/dt=%Y-%m-%d/rrole_logout.${tag}.%Y%m%d%H.VPROFLTDSG.1.log
flush_interval 10s
field_separator TAB
buffer_queue_limit 1024
buffer_chunk_limit 16m
buffer_type file
buffer_path logs/fluentd/buffer/webhdfs_role_logout.${tag}.VPROFLTDSG.0.buffer
output_include_time false
output_include_tag false
output_data_type attr:version,event_time
flush_at_shutdown true
retry_wait 30s
num_threads 1
read_timeout 180
open_timeout 120
</template>
</match>

总结

通过这次架构的升级,Fluentd的性能已经得到了很大的提升,而且配置也变得更加简单了,好的架构往往能够事半功倍。

Python的Iterator不能当List用

发表于 2018-09-09 |

之前在重构流处理框架的时候,把在每个模块里面处理的数据类型从List,变成了pyspark里面foreachPartition输入的函数的参数,其实就是一个Iterator类型的参数,用来遍历整个Partition的数据。但是后面发现有些模块没有执行,最后发现竟然是误用Iterator造成的bug。

问题

Iterator类型的数据只能遍历一次,但是List可以一直遍历,很简单的一段代码就可以说明这个问题:

1
2
3
4
5
6
lst = [1, 2, 3, 4, 5]
it = iter(lst)
print([v for v in lst])
print([v for v in lst])
print([v for v in it])
print([v for v in it])

这段代码的输出是:

1
2
3
4
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[]

很简单的就可以看出Iterator和List的不同了,我项目中的问题就对Iterator遍历了两次,第二次遍历的代码等于没执行。这样说来Iterator是不是就都可以用List替换了,或者说Iterator就没有优势了?答案显然不是的。

Iterator

什么是Iterator

Iterator是访问集合元素的一种方式。Iterator对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。Iterator只能单向访问,且不能回退。

Iterator的优势

我觉得Iterator的主要优势是延迟计算,他并不像List那样需要事先把所有的元素都放到List,而是访问到的时候才产生所需要的元素,访问之后的元素如果其他地方没有用到,其占用的内存也可以被回收掉,大大的减少了集合遍历所需的内存。想象一下,如果需要访问的集合是非常巨大的话,这样的话List就需要分配非常多的内存。下面的代码举个例子:

1
2
3
4
5
6
7
8
import random
def gen():
i = 0
while i < 1000000000:
yield random.random()

for rand in gen():
print(rand)

总结

这篇文章主要了解了下List和Iterator的区别,不过在有些方法中不管你传的是Iterator还是List都会转成List类型,用List来求集合的长度,比如进程池的map函数。当集合比较小的时候不管是使用List还是Iterator,都行。

fabric远程执行后台脚本卡住的问题

发表于 2018-07-28 |

最近用fabric开发了一个简单的运维系统,该系统可以在远程机器上执行一些命令,在批量执行后台脚本的启动的时候,发现fabric会卡住,不能正常的返回。在fabric官方有给出一些解决方案Why can’t I run programs in the background with &? It makes Fabric hang,但是这些方案都要额外的工具,有些麻烦。接下来简单的介绍下fabric的机制和我的解决方案。

fabric是怎么在远程机器上执行命令的?

fabric是先用ssh连接到远程机器上,然后再执行相关的命令。

fabric为什么会卡住不返回?

ssh连接到远程机器上的时候会起一个session,fabric卡住不返回是因为这个session一直结束不掉,这个session之所以结束不掉是因为我们起的后台进程有输出是定向到这个session的。

怎么解决?

只要把后台进程的输出重定向到其他地方,fabric就可以正常返回。比如把后台脚本的启动方式改成yourcommand > /dev/null 2>&1 &,这样就会把yourcommand脚本的所有输出重定向到/dev/null,因此fabric就可以正常返回了。

注意事项

我之前后台脚本的启动方式是yourcommand 2>&1 1> /dev/null,按照我本来的理解,应该是把stderr和stdout都重定向到/dev/null了,但是这个命令只是把stdout重定向到/dev/null,stderr还是没有被重定向。

使用yourcommand &> /dev/null这种方式也可以把所有的输出重定向到/dev/null,这种方式的命令也更短一些。

python标准开发环境

发表于 2018-06-23 |

很多初学python的同学可能都知道pip,通过pip可以安装自己的需要的python第三方库。但是有很多同学可能不知道pipenv,在这篇文章我将介绍如何用pipenv来构建自己的开发环境,以及用pipenv快速部署生产环境

什么是pipenv?

pipenv是一个把包管理工具和虚拟环境结合的工具,使用pipenv install和使用pip install一样,可以安装任何你想安装的第三方库,不同的是pipenv会记录你安装的库和使用的python版本,在部署生产环境的时候直接使用pipenv install就可以安装所有的依赖、甚至特定的python版本。pipenv项目地址

如何使用pipenv?

  1. 开发环境使用
    项目开发的时候使用pip install在项目的目录下面安装所需要的库,pipenv会自动在项目的根目录下面产生Pipfile,Pipfile.lock这两个文件,这两个文件记录了当前项目的使用第三方库和python版本的信息,同时pipenv会为这个项目构建一个虚拟环境(pipenv –venv可以查看虚拟环境的位置)。pipenv shell可以把当前的命令行环境切换到当前项目的虚拟环境下执行。要用虚拟环境运行一个python文件的话,则使用pipenv run python a.py。
  2. 生产环境使用
    当项目开发完成以后,把项目上传到需要部署该项目的服务器。然后在项目的根目录下运行pipenv install,这时候pipenv会自动为该项目创建所依赖的python的版本的虚拟环境,同时在该虚拟环境下安装所有需要的第三方库。有一个问题需要注意的是:如果机器上没有安装项目所需要的python版本的话,这时候虚拟环境就会创建失败。要解决这个失败的话有两个办法:1.安装项目所依赖的python版本。2.安装pyenv,让pipenv在创建虚拟环境的时候使用pyenv自动安装所需要的python版本。

使用pyenv

pyenv是一个python包管理工具,可以在一台机器上面安装多个python版本。我推荐至少在生产环境上面安装pyenv,这样在项目代码部署到生产环境的时候根据项目的实际需要安装不同的python版本。pyenv项目地址

总结

python程序员在开发项目的时候推荐使用pipenv,这样在项目分享给别人或者部署的时候都可以无比的轻松。同时推荐使用pyenv,pipenv和pyenv配合使用不要太爽了。这篇文章只是简单的介绍pipenv和pyenv,具体的使用方法请查阅他们的官方文档。

在Spark中使用Python单例的技巧

发表于 2018-05-20 | 更新于 2018-10-27 | 分类于 python |

在使用spark编程的时候,我们需要知道我们编写的某一段代码是在driver端执行,还是在executor端执行。如果是在executor端执行的话,就要注意这些代码是否是可以序列化发送到executor端执行,如果不行的话就会报错。接下来这篇文章主要讨论Python单例在使用Spark的时候解决的问题。

问题描述

在使用spark的时候经常会用到map来对数据进行处理,在map函数里面的代码会被序列化,然后发送到executor端执行。但是并不是所有的代码都是可以被序列化的,比如一个mysql连接的实例就不能被序列化,然后发送到executor端执行。要解决这个问题很简单,如果一个mysql连接的实例不能被序列化发送的话,可以在executor端实例化一个mysql连接,然后使用这个连接。

1
2
3
4
def transform(row):
connector = mysql.connector.connect(**mysql_conf)
# Do something
rdd.map(transform)

上述的代码可以正确的运行,但是有一个比较严重的问题:map函数在每执行一次转换的时候都需要实例化一个mysql的实例,在实例化的时候要与mysql服务器建立连接,这个开销是比较大的,可想而知效率会非常的低。

要解决这个问题我们可以用mapPartitions接口来代替map,mapPartitions接口不是对每行数据进行操作的时候都执行一个mysql实例化操作,而是对一个partition的数据执行操作时候才执行一个mysql实例化操作,这样效率会大大的提升,上述的代码可以改成如下的代码。

1
2
3
4
5
def transform(rows):
connector = mysql.connector.connect(**mysql_conf)
for row in rows:
# Do something
rdd.mapPartitions(transform)

这样问题似乎已经解决了,但是解决的还不完全,或者说只解决rdd的问题,因为虽然spark的rdd支持mapPartitions操作,但是DataFrame并不支持mapPartitions的操作(对DataFrame执行map操作可以注册一个udf函数,然后调用这个udf函数)。有一个不太优美的做法是先把DataFrame转化成rdd进行mapPartitions,然后再把处理后的rdd转化成DataFrame。要在两种数据结构之间来回转化,这个操作实在是不够优美,而且效率低下。所以我们需要找到一个让mysql在map操作中只实例化一次的方法。

问题解决

要优美的解决上面提到的这个问题的话,我们就需要用到Python的单例了。Python的单例实现方式有很多种,这在边我只介绍最简单的一种。利用Python的import机制可以确保一个模块只被导入一次,如果在这个模块里面实例化一个mysql实例的话,那么即使被多次导入,也只会被实例化一次。

1
2
# SingletonMysql.py
connector = mysql.connector.connect(**mysql_conf)

1
2
3
4
5
6
def transform(row):
from SingletonMysql import connector
# Do something
rdd.map(transform)
spark.udf.register("transform", transform)
spark.sql("select transform(name) from table")

通过上面的代码,我们就可以在map函数里面使用mysql实例,而且能够保证它只会在第一次使用的时候被实例化。

总结

在一些第三方库的接口已经确定的情况下,有时候我们必须使用一些语言的特性才能达到一些特定的目标。这个Python单例的使用只是其中的一个例子,如果Spark本身的接口支持在executor端进行一些初始化操作的话就不需要用到单例了。还有,比如map里面的函数只能是一个参数的函数,但是有时候我们需要一些额外的信息,所以这时候就要用匿名函数来达到这个目的了。

Spark报错——java.lang.outofmemoryerror: java heap space问题处理

发表于 2018-05-20 | 分类于 错误处理 |

最近在用spark处理数据的时候遇到内存不足的报错,主要的报错信息是在executor端的log中显示java.lang.outofmemoryerror: java heap space。

问题描述

具体的问题是spark在执行到最后一个stage后有一个task一直执行不成功,每次都是重试四次后失败。下面的两张图是具体失败的信息: 四次task失败信息 具体失败的log

task的失败的信息图中显示:失败的任务的Shffle Read Size是0,这个是不对的,因为这个信息在任务失败的时候都会被置零,实际上在任务在运行的时候这个值是六百多M,远远大于其他task的输入的20多M。

从上面失败的信息中我们可以看到失败的原因是有一个task的输入的数据量太大,以至于spark executor运行的时候需要的内存大大增加,这才导致了内存不足的异常。

问题解决

解决尝试一

最简单直接的解决方法是直接通过增大executor-memory的值来增加executor最大的内存使用量,由于yarn默认的每个executor的core是一个,如果本身启动的executor比较多的话,增加executor-memory的值的话,yarn集群就要多消耗executor的数量✖️增加的内存量的内存,内存的消耗会比较大。所以可以减少executor的数量,为每个executor分配多个core,这样需要的内存量就大大的减少了,但是每个executor可以使用的内存量又可以增加,这样的配置可以减少因为数据倾斜导致任务失败的概率。

最终我们用这个方法把每个executor的executor-memory值增大到了12G,但是最后还是由于内存不够失败了。

解决尝试二

由于某个task需要的内存量非常的大,然而其他task的内存量都很小,这应该不是简单的数据倾斜。spark sql只是对玩家的登陆数据进行以device_id为key的group by操作,数据的倾斜不可能这么严重。

在重新观察了玩家的登陆数据后,我发现有很多数据的device_id为null。这下就很清楚的知道数据倾斜的原因了,接着对device_id为null的数据进行过滤后,问题就迎刃而解了。

总结

在处理数据倾斜问题的时候可以通过调整spark的参数来优化任务的执行。但是如果想更彻底的优化任务的执行的话,要观察数据,知道是什么原因造成的数据倾斜。这样才能进行更彻底的优化。

使用spark进行流处理

发表于 2018-05-04 | 更新于 2018-05-05 | 分类于 流处理 |

最近在做一个假量检测的项目,主要是用来检测是否有一些伪造的广告点击之类的,然后该项目使用了spark来做在线的流处理

spark的使用场景

spark主要用来读取kafka里面的一些点击、安装、登入和登出等数据,然后使用spark的流处理模块对这些数据进行处理,最后把处理完的数据存储到相应的数据库中,供后面的数据分析使用。

使用的spark流处理模块

spark的流处理模块有两个:

  • Spark Streaming(Dstream) 老的接口
  • Structured Streaming 新的接口

我们的项目使用了Dstream实现流处理,一个主要的原因是在新的Structured Streaming中我们不能获取到读取的kafka的offset,这样当我们有数据处理失败的时候就不能从相应的offset中恢复继续运行,虽然可以设置checkpoint来恢复失败的任务,但是checkpoint的恢复是基于任务的,不能对该任务进行修改,然后再重新运行。
对于怎么在Structured Streaming中获取offset,我查了一些资料,如果实在是想获取offset的话也可以通过读取checkpoint文件夹下面的offset文件夹来获取当前的offset,不过这种方法比较奇怪。还有一个方法是使用StreamingQueryListener类里面的onQueryProgress回调来获取当前执行的状态,其中包括offset的信息,但是非常遗憾这种方法只支持scala和java,而我们的开发语言是python。下面的链接是该问题的具体讨论:如何从Structed streaming中获取offset的问题。

在一个流中处理多个topic

感觉spark的api设置的非常不友好,想要在一个流中处理多个topic也挺麻烦的,主要的问题如下:

  • 如果使用Dstream,在创建Dstream的时候可以传入多个topic,这样貌似可以解决读取多个topic的问题,但是有一个很严重的问题,读取到的内容你不知道是属于哪个topic,这样你就不能对不同的topic执行不同的处理了。
  • 如果使用Structured Streaming,也可以在DataStreamReader中指定多个topic,而且传入的每行数据中也有相应的topic信息,是可以根据不同的topic来调用不同的处理方法的。但是如上面所说的,Structured Streaming不支持获取offset让我们放弃了它。

最后我们的处理方法是在一个流中建立多个Dstream,在每个Dstream中拉取和处理同一个topic的数据,这样一个流就可以处理多个topic了,示例代码如下所示:

1
2
3
4
5
6
for topic in topic_info:
from_offsets = restore_off_sets(topic)
DStream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params, from_offsets)
DStream.transform((lambda t: lambda rdd: get_offset_ranges(t, rdd))(topic))\
.map(lambda x: x[1])\
.foreachRDD((lambda t: lambda rdd: process_rdd(t, rdd))(topic))

总结

第一次使用spark,感觉spark的接口设置不是很友好,而且文档写的也不是很友好。比如foreachRDD的回调函数如果是两个参数的函数的话,第一个参数就是时间,这个在文档中没有提及,一不注意就有奇怪的bug了。总之自己还是一个菜鸟,还要多多学习。

Fluentd语法速记

发表于 2018-05-01 | 更新于 2018-05-04 | 分类于 日志收集 |

最近开始转行做大数据,大数据中很重要的一部分是数据的收集,我们公司主要用的数据收集工具是Fluentd,由于Fluentd的配置比较多,有可能配置过一次后就会忘了。我这边在学习Fluentd配置的同时也对这些配置进行一些记录,方便后面再用到时可以快速的查找。

Fluentd简介

Fluentd是一款完全免费且完全开源的日志收集器,拥有“Log Everything”的体系结构,能够与125种以上的系统对接。

Fluentd-architecture

配置文件语法

Fluentd事件的生命周期
  1. 每个输入的事件会带有一个tag
  2. Fluentd通过tag匹配output
  3. Fluentd发送事件到匹配的output
  4. Fluentd支持多个数据源和数据输出
  5. 通过过滤器,事件可以被重新触发
“source”: 定义数据源

数据源可以在source指令中定义,比如我们可以定义http和forward的数据源。http数据源可以通过http协议来接收数据,forward可以通过tcp协议来接收数据。

1
2
3
4
5
6
7
8
9
10
11
12
# Receive events from 24224/tcp
# This is used by log forwarding and the fluent-cat command
<source>
@type forward
port 24224
</source>

# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
@type http
port 9880
</source>

所有source指令中必须包含@type参数,该参数用来指定使用哪个输入插件,比如我们还可以用tail插件来读取文件的内容。

路由

source指令把事件提交到Fluentd的路由引擎。一个事件由三个实体组成:tag、time和record。tag是由’.’分割的字符串组成,被内部路由引擎使用。time由input插件指定,必须是Unix时间戳格式。record是一个Json对象。

强烈推荐使用小写字母、数字和下划线来命名tag,虽然其他的字符也是合法的。

“match”: 定义数据的输出目标

match指令通过匹配tag字段来将事件输出到其他的系统。同样match指令也必须指定@type参数,该参数用来指定使用哪个输出插件。在下面的例子中,只有myapp.access的tag能够匹配到该输出插件。

1
2
3
4
<match myapp.access>
@type file
path /var/log/fluent/access
</match>

匹配模式

下面的这些匹配模式可以在<match>中使用,用来匹配tag:

  • *用来匹配tag的一部分(比如:a.*可以匹配a.b,但是不能匹配a或者a.b.c)
  • **可以用来匹配tag的0个或多个部分(比如:a.**可以匹配a、a.b和a.b.c)
  • {X,Y,Z}匹配X,Y或者Z(比如:{a,b}可以匹配a和b,但是不能匹配c。他可以和*或者**结合起来一起使用。)
  • 如果有多个匹配模式写在<match>里面,则可以用空格分开(比如:<match a b>能够匹配a和b。<match a.** b.* >能够匹配a,a.b,a.b.c和b.d。)
匹配顺序

Fluentd是按顺序匹配的,先在配置文件里面出现的match会先匹配。下面的例子中myapp.access永远都不会被匹配到。

1
2
3
4
5
6
7
8
9
# ** matches all tags. Bad :(
<match **>
@type blackhole_plugin
</match>

<match myapp.access>
@type file
path /var/log/fluent/access
</match>

“filter”:事件处理管道

“filter”指令的语法和”match”指令的语法相同,但是”filter”能够在管道中被连起来处理,如下所示:

1
Input -> filter 1 -> ... -> filter N -> Output

下面的例子展示了record_transformer fliter的用法。source首先会接收到一个{“event”:”data”}的事件,然后该事件会首先被路由到filter,filter会增加一个host_param的字段到record中,然后再把该事件发送到match中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
@type http
port 9880
</source>

<filter myapp.access>
@type record_transformer
<record>
host_param "#{Socket.gethostname}"
</record>
</filter>

<match myapp.access>
@type file
path /var/log/fluent/access
</match>

“system”:设置系统范围配置

以下的配置能够由”system”指令指定。也可以通过Fluentd的配置选项设置相同的配置:

  • log_level
  • suppress_repeated_stacktrace
  • emit_error_log_interval
  • suppress_config_dump
  • without_source
  • process_name (只能用”system”指令指定)

下面是一些例子:

1
2
3
4
5
6
<system>
# 等价于-qq选项
log_level error
#等价于--without-source选项
without_source
</system>

1
2
3
<system>
process_name fluentd1
</system>

process_name用来指定Fluentd监控进程和工作进程的名字,通过ps可以看到

1
2
3
% ps aux | grep fluentd1
foo 45673 0.4 0.2 2523252 38620 s001 S+ 7:04AM 0:00.44 worker:fluentd1
foo 45647 0.0 0.1 2481260 23700 s001 S+ 7:04AM 0:00.40 supervisor:fluentd1

“label”:用来组织filter和match

“label”指令用来降低tag路由的复杂度,通过”label”指令可以用来组织filter和match的内部路由。下面是一个配置的例子,由于”label”是内建的插件,所以他的参数需要以@开头。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<source>
@type forward
</source>

<source>
@type tail
@label @SYSTEM
</source>

<filter access.**>
@type record_transformer
<record>
# ...
</record>
</filter>
<match **>
@type elasticsearch
# ...
</match>

<label @SYSTEM>
<filter var.log.middleware.**>
@type grep
# ...
</filter>
<match **>
@type s3
# ...
</match>
</label>

在上面的例子中,forward的数据源的事件被路由到record_transformer filter和elasticsearch output中。tail数据源被路由到@system里面的grep filter和s3 output中。

@ERROR label

@ERROR label是内建的label,用来记录emit_error_event错误事件的。如果在配置文件里面设置了

“@include”:重用配置

可以通过”@include”来导入其他的配置文件,配置文件是按顺序导入的。如果使用模式匹配的话,文件是按字母顺序导入的。

1
2
3
4
5
6
7
8
# If you have a.conf,b.conf,...,z.conf and a.conf / z.conf are important...
# This is bad
@include *.conf

# This is good
@include a.conf
@include config.d/*.conf
@include z.conf

如果导入的文件有顺序的要求的话,最好自己主动写导入的语句,模式匹配导入容易出错。

支持的数据类型

每个插件都需要一些参数。例如:in_tail插件有rotate_wait和pos_file这两个参数。每个参数都有对应的类型与其关联。下面是这些类型的定义:

  • string 类型:该类型被解析成一个字符串。string类型可以有三种形式:不带引号的字符串、带单引号的字符串和带双引号的字符串。
  • integer 类型:该类型被解析成一个整数。
  • float 类型:该类型被解析成一个浮点数。
  • size 类型:该类型用来解析成有多少个字节。可以在整数后面加上k/K、m/M、g/G、t/T,对应的是计算机学科的度量单位。比如:12k表示为12*1024后的数值。
  • time 类型:该类型被解析成时间。可以在浮点数后面加上s、m、h和d分别表示为秒、分、小时、天。可以用0.1表示100ms。
  • array 类型:该类型被解析成JSON数组。这种类型还支持缩写,比如:[“key1”, “key2”]可以缩写成key1,key2。
  • hash 类型:该类型被解析成JSON对象。这种类型也支持缩写,比如:{“key1”:”value1”, “key2”:”value2”}可以缩写成key1:value1,key2:value2。

常见的插件参数

这些参数是系统保留的并且带有@前缀。

  • @type: 指定插件的类型。
  • @id: 指定插件的id。
  • @label:用来指定标签。
  • @log_level:用来指定每个插件的log级别。

检查配置文件

通过–dry-run选项,可以在不启动插件的情况下检查配置文件。

1
$ fluentd --dry-run -c fluent.conf

格式建议

双引号包起来的字符串、数组和哈希类型支持多行
1
2
3
4
5
6
7
8
9
str_param "foo  # This line is converted to "foo\nbar". NL is kept in the parameter
bar"
array_param [
"a", "b"
]
hash_param {
"k":"v",
"k1":10
}

如果想让[或者{开头的字符串不被解析成数组或者对象,则需要用’或者“把该字符串包起来。

1
2
3
4
5
6
7
8
9
<match **>
@type mail
subject "[CRITICAL] foo's alert system"
</match>
<match tag>
@type map
map '[["code." + tag, time, { "code" => record["code"].to_i}], ["time." + tag, time, { "time" => record["time"].to_i}]]'
multi true
</match>

嵌入Ruby代码

可以在”包住的#{}里面执行Ruby代码,这可以用来获取一些机器的信息,比如hostname。

1
2
host_param "#{hostname}"  # This is same with Socket.gethostname
@id "out_foo#{worker_id}" # This is same with ENV["SERVERENGINE_WORKER_ID"]

在双引号字符串中,\是转义字符

\被解释为转义字符。你需要用\来设置”,\r,\n,\t,\或双引号字符串中的多个字符。

1
str_param "foo\nbar" # \n is interpreted as actual LF character

图片转像素风实现

发表于 2018-03-30 | 更新于 2018-10-27 | 分类于 python |

Python用来写各种小工具简直是神器,昨天晚上花了点时间实现了一个图片转像素风的小工具,下面附上图片Demo和代码

下面是实现的具体代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from PIL import Image  
import argparse

# 命令行输入参数处理
parser = argparse.ArgumentParser()

parser.add_argument('file') # 输入文件
parser.add_argument('-o', '--output') # 输出文件
parser.add_argument('--maxlen', type=int, default=150)

# 获取参数
args = parser.parse_args()

IMG = args.file
MAXLEN = args.maxlen
OUTPUT = args.output

def resize_image(im):
(width, height) = im.size
max_len = max(width, height)

if max_len <= MAXLEN:
return im
else:
scale = max_len / MAXLEN
size = (int(width//scale), int(height//scale))
return im.resize(size, Image.NEAREST)

if __name__ == '__main__':

im = Image.open(IMG)
im = resize_image(im)

txt = ""
(width, height) = im.size
for i in range(height):
for j in range(width):
(r, b, g) = im.getpixel((j, i))
txt += "<font style='background:rgb({},{},{});display:inline-block;width:3px;height:3px;margin:1px;'></font>".format(r, b, g)
txt += '</br>'

if not OUTPUT:
OUTPUT = "output.txt"

with open(OUTPUT,'w') as f:
f.write(txt)

使用erlang:get_stacktrace注意避开的坑

发表于 2018-03-29 | 更新于 2018-05-04 | 分类于 Erlang |

之前在使用erlang:get_stacktrace()函数的时候发现不能正确的获取发生异常的栈内容,但是错误类型和原因却是正常,感觉非常奇怪,下面是具体的代码:

1
2
3
4
5
6
7
8
9
10
dispatch_cmd(User, Mod, Msg) ->
try Mod:req(User, Msg) of
Result ->
Result
catch
Class:Reason ->
monitor:notify(ws_dispatch_crash, io_lib:format("<error-info: ~p:req ~p:~p>", [Mod, Class, Reason])),
?ERROR("Req Msg: ~p.~nStacktrace: ~s", [?PR(Msg), ?PR_ST(erlang:get_stacktrace(), {Class, Reason})]),
?ERR_AT_DISPATCH_CMD
end.

上面的代码有什么问题呢? 主要的问题是在调用erlang:get_stacktrace()之前执行了其他有可能会有异常捕获的语句,而在io_lib:format里面会有catch函数,如果io_lib:format函数里面的catch被调用的话,erlang:get_stacktrace()返回的就不是我们想要打印的异常栈,而是io_lib:format里面的异常栈。 如何解决? 在catch之后里面立马调用erlang:get_stacktrace()

1
2
3
4
5
6
7
8
9
10
11
dispatch_cmd(User, Mod, Msg) ->
try Mod:req(User, Msg) of
Result ->
Result
catch
Class:Reason ->
Stacktrace = erlang:get_stacktrace(),
monitor:notify(ws_dispatch_crash, io_lib:format("<error-info: ~p:req ~p:~p>", [Mod, Class, Reason])),
?ERROR("Req Msg: ~p.~nStacktrace: ~s", [?PR(Msg), ?PR_ST(Stacktrace, {Class, Reason})]),
?ERR_AT_DISPATCH_CMD
end.

据说erlang的开发团队也认为erlang:get_stacktrace()是一个不好的东西,会在OTP 21中把它废弃掉,有一位叫@peterdmv的开发人员是这样说的:

1
2
3
4
5
6
7
erlang:get_stacktrace/0 is deprecated in OTP 21, you can use the following expression instead:

try Expr
catch
Class:Reason:Stacktrace ->
{Class,Reason,Stacktrace}
end

我试了下这个新语法,在OTP 20.3上面还不行,应该在接下来的OTP 21中能够使用它吧~

123…5
Darcy

Darcy

欢迎来到我的个人站

50 日志
17 分类
40 标签
RSS
GitHub E-Mail
© 2019 Darcy
由 Hexo 强力驱动 v3.8.0
|
主题 – NexT.Mist v6.4.2