1. 首页
  2. > 海外公司注册 >

银行卡开户行 卡bin(银行卡bin代码查询开户行)

https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html


https://www.elastic.co/guide/en/beats/filebeat/7.13/filebeat-installation-configuration.html#filebeat-installation-configuration


Logstash是一个具有实时流水线功能的开源数据收集引擎。


为不同的高级下游分析和可视化用例清理和民主化您的所有数据。


虽然Logstash最初推动了日志收集方面的创新,但它的功能远远超出了这一用例。


任何类型的事件都可以通过大量的输入、过滤和输出插件来丰富和转换,使用许多本地编解码器进一步简化了摄取过程。Logstash通过利用更大的容量和种类的数据来加速您的洞察力。


Getting Started with Logstash

本节包括以下主题:


  • Java (JVM) version
  • Installing Logstash
  • Stashing Your First Event
  • Parsing Logs with Logstash
  • Stitching Together Multiple Input and Output Plugins

Logstash需要以下版本之一:


Java 8


Java 11


Java 15(请参阅使用JDK 15的设置信息)


安装

下载并安装公钥:


sudo rpm --import https://artifacts.elastic.co/GPG-KEY-Elasticsearch

在/etc/yum.conf下新建repo文件,添加以下内容。以.repo为后缀的文件中的D /目录,例如logstash.repo ,在里面写入如下:


[logstash-7.x] name=Elastic repository for 7.x packages baseurl=https://artifacts.elastic.co/packages/7.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md

[repo] linux创建文件:touch


最后:


yum install logstash

tar下载地址:


https://www.elastic.co/cn/downloads/logstash


运行

centos 7


systemctl start logstash.service

测试

首先,让我们运行最基本的Logstash管道来测试您的Logstash安装。


Logstash管道有两个必需元素,输入和输出,以及一个可选元素,过滤器。输入插件使用来自源的数据,筛选插件根据您的指定修改数据,输出插件将数据写入目标。


要测试Logstash安装,请运行最基本的Logstash管道。例如:


一般logstash在系统的该目录下:


/usr/share/logstash

cd logstash-7.13.1 bin/logstash -e input { stdin { } } output { stdout {} }

启动Logstash后,等待直到你看到“Pipeline main started”,表示运行成功。


然后在命令提示符输入hello world: 结果如下:


2013-11-21T01:22:14.405 0000 0.0.0.0 hello world 本机测试的是带{} 形式的结果数据。

Logstash向消息添加时间戳和IP地址信息。通过在运行Logstash的shell中发出CTRL-D命令来退出Logstash。


恭喜你!您已经创建并运行了一个基本的Logstash管道。接下来,您将学习如何创建一个更现实的管道。


使用Logstash解析日志

在Stashing Your First Event中,您创建了一个基本的Logstash管道来测试您的Logstash设置。在现实世界中,Logstash管道有点复杂:它通常有一个或多个输入、过滤器和输出插件。


在本节中,您将创建一个Logstash管道,该管道使用Filebeat将Apache web日志作为输入,解析这些日志以从日志中创建特定的、命名的字段,并将解析后的数据写入Elasticsearch集群。您将在配置文件中定义管道,而不是在命令行中定义管道配置。


要开始,请到这里下载filebeat:https://www.elastic.co/cn/downloads/beats/filebeat


Filebeat客户端

作用:将日志行发送到Logstash


在创建Logstash管道之前,您将配置Filebeat将日志行发送到Logstash。Filebeat客户端是一个轻量级的、资源友好的工具,它从服务器上的文件收集日志,并将这些日志转发到您的Logstash实例进行处理。Filebeat是为可靠性和低延迟而设计的。Filebeat在主机上占用了较少的资源,而Beats输入插件将Logstash实例上的资源需求最小化。


安装

由于我们已经安装过logstash 的yum源,所以我们可以直接yum下载:


yum install filebeat

配置

安装Filebeat之后,您需要配置它。打开filebeat。yml文件位于您的Filebeat安装目录中,并使用以下行替换内容。确保路径指向前面下载的示例Apache日志文件logstash-tutorial.log:


[logstash-tutorial.log] 测试web日志下载地址:https://download.elastic.co/demos/logstash/gettingstarted/logstash-tutorial.log.gz


所在目录


usr/share

但是该目录下没有filebeat.yml文件,所以我们搜索一下:


find / -name filebeat.yml

发现在 etc/filebeat 文件下,向里面修改为:


filebeat.inputs: - type: log enable: true #这个CAO,难为了我一天!! paths: - /var/log/logstash-tutorial.log output.logstash: hosts: ["localhost:5044"]

运行

保存您的更改。


为了保持配置简单,您不会像在现实场景中那样指定TLS/SSL设置。


在数据源机器上,使用以下命令运行Filebeat:


sudo ./filebeat -e -c filebeat.yml -d "publish"

[^]: 去 usr下运行 会报错,找不到yml


所以要指定yml的路径运行


sudo ./filebeat -e -c /etc/filebeat/filebeat.yml -d "publish"

;还是会报错:


因为 filebeat的启动 在output里的配置,它只能运行一个,也就是说 配置 output.logstash 的主机后,es的就要注释掉!


Configuring Logstash for Filebeat Input

接下来,创建一个Logstash配置管道,该管道使用Beats输入插件接收来自Beats的事件。


下面的文本代表了一个配置管道的框架:


# The # character at the beginning of a line indicates a comment. Use # comments to describe your configuration. input { } # The filter part of this file is commented out to indicate that it is # optional. # filter { # # } output { }

logstash自带了一个简单的conf文件,我的名字叫 logstash-sample.conf;


打开后 是这样的:


input { beats { port => "5044" } } output{ elasticsearch{ hosts => .. index => ... user .. password ... } }

[..] 注意这边是es,我们上面再yml中注释掉了es的!


这个框架是非功能性的,因为输入和输出部分没有定义任何有效的选项。


接下来,配置你的Logstash实例来使用Beats输入插件,方法是将以下几行添加到第一个pipeline.conf文件的输入部分:


创建一个pipeline.conf; 去etc/logstash ,默认下的conf.d文件夹下是没有东西的,我们新建立一个first-pipeline.conf ,写入如下代码:


ps: 新建conf文件 写不进去东西,所以我们复制一份:


cp logstash-sample.conf my-first-pipeline.conf

重写写入:


input { beats { port => "5044" } } # The filter part of this file is commented out to indicate that it is # optional. # filter { # # } output { stdout { codec => rubydebug } }

要验证您的配置,运行以下命令:


指定pipeline运行logstash

我们指定我们的conf文件所在地


bin/logstash -f /etc/logstash/my-first-pipeline.conf --config.test_and_exit

如果配置测试通过,会显示 configuration OK.


——配置。Test_and_exit选项解析配置文件并报告任何错误。


如果配置文件通过了配置测试,使用以下命令启动Logstash:


bin/logstash -f /etc/logstash/my-first-pipeline.conf --config.reload.automatic

config.reload。automatic选项允许自动重新加载配置,这样每次修改配置文件时都不必停止并重新启动Logstash。


当Logstash启动时,您可能会看到一个或多个关于Logstash忽略管道的警告消息。yml文件。您完全可以忽略此警告。管道。yml文件用于在一个Logstash实例中运行多个管道。对于这里显示的示例,您运行的是单个管道。


如果你的管道工作正常,你应该会看到一系列的事件写入控制台,如下所示:


starting server on port: 5044 ...

解析Web日志与Grok过滤器插件

现在您有了一个从Filebeat读取日志行的工作管道my-first-pipeline。但是,您会注意到日志消息的格式并不理想。您希望解析日志消息以从日志中创建特定的、命名的字段。为此,您将使用grok过滤器插件。


grok过滤器插件是Logstash中默认可用的几个插件之一。关于如何管理Logstash插件的详细信息,请参见插件管理器的参考文档。


grok过滤器插件使您能够将非结构化日志数据解析为结构化和可查询的数据。


因为grok过滤器插件在传入的日志数据中寻找模式,所以配置插件需要您决定如何识别您的用例感兴趣的模式。web服务器日志示例中的一行代表如下:


83.149.9.216 - - [04/Jan/2015:05:13:42 0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

行开头的IP地址很容易识别,括号中的时间戳也很容易识别。要解析数据,你可以使用%{COMBINEDApacheLOG} grok模式,它使用以下模式从Apache日志中构造行:


grok过滤器参数列表

Information


Field Name


IP Address


clientip


User ID


ident


User Authentication


auth


timestamp


timestamp


HTTP Verb


verb


Request body


request


HTTP Version


httpversion


HTTP Status Code


response


Bytes served


bytes


Referrer URL


referrer


User agent


agent


[^] 如果您需要帮助构建grok模式,请尝试grok调试器。Grok调试器是基本许可下的X-Pack特性,因此可以免费使用。


grok过滤器配置

编辑你的pipeline.conf文件,用下面的文本替换整个过滤器部分:


input { beats { port => "5044" } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}"} } } output { stdout { codec => rubydebug } }

保存您的更改。因为您已经启用了自动重新加载配置,所以您不必重新启动Logstash来获取您的更改。但是,您确实需要强制Filebeat从头读取日志文件。要做到这一点,转到Filebeat正在运行的终端窗口,按Ctrl C关闭Filebeat。然后删除Filebeat注册表文件。例如,运行:


sudo rm data/registry

由于Filebeat将它收集的每个文件的状态存储在注册表中,删除注册表文件将强制Filebeat从头读取它收集的所有文件。


接下来,用下面的命令重启Filebeat:


sudo ./filebeat -e -c filebeat.yml -d "publish"

如果Filebeat需要等待Logstash重新加载配置文件,那么在它开始处理事件之前可能会有一点延迟。


在Logstash应用grok模式后,事件将有以下JSON表示:


{ "request" => "/presentations/logstash-monitorama-2013/images/kibana-search.png", "agent" => ""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"", "offset" => 325, "auth" => "-", "ident" => "-", "verb" => "GET", "prospector" => { "type" => "log" }, "input" => { "type" => "log" }, "source" => "/path/to/file/logstash-tutorial.log", "message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"", "tags" => [ [0] "beats_input_codec_plain_applied" ], "referrer" => ""http://semicomplete.com/presentations/logstash-monitorama-2013/"", "@timestamp" => 2017-11-09T02:51:12.416Z, "response" => "200", "bytes" => "203023", "clientip" => "83.149.9.216", "@version" => "1", "beat" => { "name" => "My-MacBook-Pro.local", "hostname" => "My-MacBook-Pro.local", "version" => "6.0.0" }, "host" => "My-MacBook-Pro.local", "httpversion" => "1.1", "timestamp" => "04/Jan/2015:05:13:42 0000" }

请注意,事件包含原始消息,但日志消息也被分解为特定的字段。


增强您的数据与Geoip过滤器插件

除了解析日志数据以进行更好的搜索之外,过滤插件还可以从现有数据中获得补充信息。例如,geoip插件查找IP地址,从地址中获取地理位置信息,并将该位置信息添加到日志中。


配置你的Logstash实例来使用geoip过滤器插件,通过添加以下行到第一个管道.conf文件的过滤器部分:


geoip { source => "clientip" }

geoip插件配置要求您指定包含要查找的IP地址的源字段的名称。在本例中,clientip字段包含IP地址。


因为过滤器是按顺序计算的,所以要确保geoip部分位于配置文件的grok部分之后,并且grok部分和geoip部分都嵌套在过滤器部分中。


当你完成时,first-pipeline.conf的内容应该是这样的:


input { beats { port => "5044" } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}"} } geoip { source => "clientip" } } output { stdout { codec => rubydebug } }

保存您的更改。如前所述,要强制Filebeat从头读取日志文件,请关闭Filebeat(按Ctrl C),删除注册表文件,然后使用以下命令重新启动Filebeat:


sudo ./filebeat -e -c filebeat.yml -d "publish"

注意,事件现在包含了地理位置信息:


{ "request" => "/presentations/logstash-monitorama-2013/images/kibana-search.png", "agent" => ""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"", "geoip" => { "timezone" => "Europe/Moscow", "ip" => "83.149.9.216", "latitude" => 55.7485, "continent_code" => "EU", "city_name" => "Moscow", "country_name" => "Russia", "country_code2" => "RU", "country_code3" => "RU", "region_name" => "Moscow", "location" => { "lon" => 37.6184, "lat" => 55.7485 }, "postal_code" => "101194", "region_code" => "MOW", "longitude" => 37.6184 },

将数据索引到Elasticsearch中

现在,web日志已经被分解成特定的字段,您已经准备好将数据放入Elasticsearch。


Logstash管道可以将数据索引到Elasticsearch集群中。编辑第一个pipeline.conf文件,用下面的文本替换整个输出部分:


// 换输出对象 ,我们输出到es;


output { elasticsearch { hosts => [ "localhost:9200" ] } }

通过这个配置,Logstash使用http协议连接到Elasticsearch。上面的例子假设Logstash和Elasticsearch在同一个实例上运行。您可以通过使用hosts配置来指定像hosts => ["es-machine:9092"]这样的东西来指定远程Elasticsearch实例。


此时,你的第一个pipeline.conf文件已经正确配置了输入、过滤和输出部分,如下所示:


input { beats { port => "5044" } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}"} } geoip { source => "clientip" } } output { elasticsearch { hosts => [ "localhost:9200" ] } }

保存您的更改。如前所述,要强制Filebeat从头读取日志文件,请关闭Filebeat(按Ctrl C),删除注册表文件,然后使用以下命令重新启动Filebeat:


sudo ./filebeat -e -c filebeat.yml -d "publish"

测试pipeline

现在Logstash管道已经配置为将数据索引到Elasticsearch集群中,您可以查询Elasticsearch。


尝试基于grok过滤器插件创建的字段对Elasticsearch进行测试查询。将$DATE替换为当前日期,格式为YYYY.MM.DD:


curl -XGET 192.168.19.130:9200/logstash-$DATE/_search?pretty&q=response=200

索引名称中使用的日期基于UTC,而不是Logstash运行的时区。如果查询返回index_not_found_exception,请确保logstash-$DATE反映了索引的实际名称。要查看可用索引的列表,可以使用以下查询:curl localhost:9200/_cat/indices?v。


我的索引:


logstash-2021.06.07-000001 filebeat-7.13.1-2021.06.07 两个 ,我们看从filebeat中来的数据 所以这么查: curl -XGET 192.168.19.130:9200/filebeat-7.13.1-2021.06.07/_search?pretty&q=response=200

应该会得到多次反击。例如:


{ "took": 50, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 98, "max_score": 2.793642, "hits": [ { "_index": "logstash-2017.11.09", "_type": "doc", "_id": "3IzDnl8BW52sR0fx5wdV", "_score": 2.793642, "_source": { "request": "/presentations/logstash-monitorama-2013/images/frontend-response-codes.png", "agent": """"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"""", "geoip": { "timezone": "Europe/Moscow", "ip": "83.149.9.216", "latitude": 55.7485, "continent_code": "EU", "city_name": "Moscow", "country_name": "Russia", "country_code2": "RU", "country_code3": "RU", "region_name": "Moscow", "location": { "lon": 37.6184, "lat": 55.7485 }, "postal_code": "101194", "region_code": "MOW", "longitude": 37.6184 }, "offset": 2932, "auth": "-", "ident": "-", "verb": "GET", "prospector": { "type": "log" }, "input": { "type": "log" }, "source": "/path/to/file/logstash-tutorial.log", "message": """83.149.9.216 - - [04/Jan/2015:05:13:45 0000] "GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png HTTP/1.1" 200 52878 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"""", "tags": [ "beats_input_codec_plain_applied" ], "referrer": """"http://semicomplete.com/presentations/logstash-monitorama-2013/"""", "@timestamp": "2017-11-09T03:11:35.304Z", "response": "200", "bytes": "52878", "clientip": "83.149.9.216", "@version": "1", "beat": { "name": "My-MacBook-Pro.local", "hostname": "My-MacBook-Pro.local", "version": "6.0.0" }, "host": "My-MacBook-Pro.local", "httpversion": "1.1", "timestamp": "04/Jan/2015:05:13:45 0000" } }, ...

ps:报错503:原因是;我配的es集群,但是只开了一个node1节点,导致报错。


办法一:去es的.yml 将node发现改为1


办法二: 将集群es都打开


办法三: 重新定义一个es


我的反击

"hits" : { "total" : { "value" : 98, "relation" : "eq" }, "max_score" : 3.8152673, "hits" : [ { "_index" : "filebeat-7.13.1-2021.06.07", "_type" : "_doc", "_id" : "UQvM5XkBKi3bYQvDc0GL", "_score" : 3.8152673, "_source" : { "host" : { "id" : "0f725ccead3d4f03900732eca6ab5597", "architecture" : "x86_64", "os" : { "type" : "linux", "codename" : "Core", "kernel" : "3.10.0-1160.el7.x86_64", "version" : "7 (Core)", "family" : "redhat", "platform" : "centos", "name" : "CentOS Linux" }, "mac" : [ "00:0c:29:01:e7:a5", "52:54:00:d1:60:01", "52:54:00:d1:60:01" ], "hostname" : "localhost.localdomain", "containerized" : false, "ip" : [ "192.168.19.130", "fe80::79f3:ba8d:b195:4d8", "192.168.122.1" ], "name" : "localhost.localdomain" }, "message" : "83.149.9.216 - - [04/Jan/2015:05:13:45 0000] "GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png HTTP/1.1" 200 52878 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"", "log" : { "file" : { "path" : "/var/log/logstash-tutorial.log" }, "offset" : 2598 }, "agent" : { "id" : "eb96fc56-cd1d-4931-a979-6b8c86e9e85f", "type" : "filebeat", "hostname" : "localhost.localdomain", "ephemeral_id" : "203946d1-2d3d-48d6-bc9b-ea4e09cb3244", "version" : "7.13.1", "name" : "localhost.localdomain" }, "@version" : "1", "@timestamp" : "2021-06-07T09:27:18.448Z", "cloud" : { "provider" : "openstack", "instance" : { "id" : "<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">r <html xmlns="http://www.w3.org/1999/xhtml">r <head>r <meta content="text/html; charset=utf-8" http-equiv="Content-Type" />r <meta content="no-cache" http-equiv="Pragma" />r <title>Waiting...</title>r <script type="text/javascript">r var pageName = /;r top.location.replace(pageName);r </script>r </head>r <body></body>r </html>r ", "name" : "<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">r <html xmlns="http://www.w3.org/1999/xhtml">r <head>r <meta content="text/html; charset=utf-8" http-equiv="Content-Type" />r <meta content="no-cache" http-equiv="Pragma" />r <title>Waiting...</title>r <script type="text/javascript">r var pageName = /;r top.location.replace(pageName);r </script>r </head>r <body></body>r </html>r " }, "machine" : { "type" : "<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">r <html xmlns="http://www.w3.org/1999/xhtml">r <head>r <meta content="text/html; charset=utf-8" http-equiv="Content-Type" />r <meta content="no-cache" http-equiv="Pragma" />r <title>Waiting...</title>r <script type="text/javascript">r var pageName = /;r top.location.replace(pageName);r </script>r </head>r <body></body>r </html>r " }, "service" : { "name" : "Nova" }, "availability_zone" : "<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">r <html xmlns="http://www.w3.org/1999/xhtml">r <head>r <meta content="text/html; charset=utf-8" http-equiv="Content-Type" />r <meta content="no-cache" http-equiv="Pragma" />r <title>Waiting...</title>r <script type="text/javascript">r var pageName = /;r top.location.replace(pageName);r </script>r </head>r <body></body>r </html>r " }, "input" : { "type" : "log" }, "ecs" : { "version" : "1.8.0" }, "tags" : [ "beats_input_codec_plain_applied" ] } },

尝试另一种从IP地址获取地理信息的搜索。将$DATE替换为当前日期,格式为YYYY.MM.DD:


curl -XGET localhost:9200/filebeat-7.13.1-2021.06.07/_search?pretty&q=geoip.city_name=Buffalo

有一些日志条目来自Buffalo,因此查询产生以下响应:


{ "took": 9, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 2, "max_score": 2.6390574, "hits": [ { "_index": "logstash-2017.11.09", "_type": "doc", "_id": "L4zDnl8BW52sR0fx5whY", "_score": 2.6390574, "_source": { "request": "/blog/geekery/disabling-battery-in-ubuntu-vms.html?utm_source=feedburner&utm_medium=feed&utm_campaign=Feed: semicomplete/main (semicomplete.com - Jordan Sissel)", "agent": """"Tiny Tiny RSS/1.11 (http://tt-rss.org/)"""", "geoip": { "timezone": "America/New_York", "ip": "198.46.149.143", "latitude": 42.8864, "continent_code": "NA", "city_name": "Buffalo", "country_name": "United States", "country_code2": "US", "dma_code": 514, "country_code3": "US", "region_name": "New York", "location": { "lon": -78.8781, "lat": 42.8864 }, "postal_code": "14202", "region_code": "NY", "longitude": -78.8781 }, "offset": 22795, "auth": "-", "ident": "-", "verb": "GET", "prospector": { "type": "log" }, "input": { "type": "log" }, "source": "/path/to/file/logstash-tutorial.log", "message": """198.46.149.143 - - [04/Jan/2015:05:29:13 0000] "GET /blog/geekery/disabling-battery-in-ubuntu-vms.html?utm_source=feedburner&utm_medium=feed&utm_campaign=Feed: semicomplete/main (semicomplete.com - Jordan Sissel) HTTP/1.1" 200 9316 "-" "Tiny Tiny RSS/1.11 (http://tt-rss.org/)"""", "tags": [ "beats_input_codec_plain_applied" ], "referrer": """"-"""", "@timestamp": "2017-11-09T03:11:35.321Z", "response": "200", "bytes": "9316", "clientip": "198.46.149.143", "@version": "1", "beat": { "name": "My-MacBook-Pro.local", "hostname": "My-MacBook-Pro.local", "version": "6.0.0" }, "host": "My-MacBook-Pro.local", "httpversion": "1.1", "timestamp": "04/Jan/2015:05:29:13 0000" } }, ...

如果您正在使用Kibana来可视化您的数据,您也可以在Kibana中探索Filebeat数据。 Filebeat quick start docs


到此,您已经成功创建了一个管道,该管道使用Filebeat将Apache web日志作为输入,解析这些日志以从日志中创建特定的、命名的字段,并将解析后的数据写入Elasticsearch集群。接下来,您将学习如何创建一个使用多个输入和输出插件的管道。


高级:logstash多个 Input and Output

您需要管理的信息通常来自多个不同的源,用例可能需要数据的多个目的地。您的Logstash管道可以使用多个输入和输出插件来处理这些需求。


在本节中,您将创建一个Logstash管道,该管道接收来自Twitter提要和Filebeat客户机的输入,然后将信息发送到Elasticsearch集群,并将信息直接写入文件。


读取twitter上的数据

要添加Twitter提要,您可以使用Twitter输入插件。要配置插件,你需要几个信息:


  • 消费者密钥,唯一标识您的Twitter应用程序。
  • 消费者的秘密,作为你的Twitter应用的密码。
  • 在传入提要中搜索的一个或多个关键词。这个例子展示了使用“cloud”作为关键字,但是您可以使用任何您想要的。
  • 一个oauth令牌,用于识别使用此应用程序的Twitter帐户。
  • 一个oauth令牌secret,用作Twitter帐户的密码。

访问https://dev.twitter.com/apps建立一个Twitter帐户,生成您的用户密钥和密钥,以及您的访问令牌和密钥。如果您不确定如何生成这些键,请参阅文档中的twitter输入插件。


与之前使用Logstash解析日志时所做的一样,创建一个包含配置管道框架的配置文件(称为second-pipeline.conf)。如果愿意,可以重用前面创建的文件,但是要确保在运行Logstash时传入正确的配置文件名。


配置第二个pipeline.conf

将以下几行添加到第二个pipeline.conf文件的输入部分,用这里显示的占位符值替换您的值:


twitter { consumer_key => "enter_your_consumer_key_here" consumer_secret => "enter_your_secret_here" keywords => ["cloud"] oauth_token => "enter_your_access_token_here" oauth_token_secret => "enter_your_access_token_secret_here" }

配置Filebeat发送日志行到日志库

正如您在前面配置Filebeat发送日志行到Logstash中所了解的,Filebeat客户端是一个轻量级的、资源友好的工具,它从服务器上的文件收集日志,并将这些日志转发到您的Logstash实例进行处理。


安装Filebeat之后,您需要配置它。打开filebeat。yml文件位于您的Filebeat安装目录中,并使用以下行替换内容。确保路径指向你的syslog:


filebeat.inputs: - type: log paths: - /var/log/*.log #Filebeat处理的一个或多个文件的绝对路径。 fields: type: syslog #将一个名为type的值为syslog的字段添加到事件中。 output.logstash: hosts: ["localhost:5044"]

保存您的更改。


为了保持配置简单,您不会像在现实场景中那样指定TLS/SSL设置。


配置你的Logstash实例来使用Filebeat输入插件,添加以下行到第二个管道.conf文件的输入部分:


beats { port => "5044" }

向文件写入日志存储数据

您可以通过文件输出插件配置Logstash管道,将数据直接写入文件。


配置你的Logstash实例来使用文件输出插件,添加以下行到第二个管道.conf文件的输出部分:


file { path => "/path/to/target/file" }

写入多个Elasticsearch节点

写入多个Elasticsearch节点可以减轻给定Elasticsearch节点上的资源需求,并在特定节点不可用时提供进入集群的冗余入口点。


要配置你的Logstash实例写入多个Elasticsearch节点,编辑第二个pipeline.conf文件的输出部分来读取:


output { elasticsearch { hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"] } }

在主机行中使用Elasticsearch集群中三个非主节点的IP地址。当hosts参数列出多个IP地址时,Logstash跨地址列表的负载平衡请求。还要注意,Elasticsearch的默认端口是9200,可以在上面的配置中省略。


测试管道

此时,你的第二个pipeline.conf文件看起来像这样:


input { twitter { consumer_key => "enter_your_consumer_key_here" consumer_secret => "enter_your_secret_here" keywords => ["cloud"] oauth_token => "enter_your_access_token_here" oauth_token_secret => "enter_your_access_token_secret_here" } beats { port => "5044" } } output { elasticsearch { hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"] } file { path => "/path/to/target/file" } }

Logstash从您配置的Twitter feed中消耗数据,从Filebeat接收数据,并将这些信息索引到Elasticsearch集群中的三个节点,并写入一个文件。


在数据源机器上,使用以下命令运行Filebeat:


sudo ./filebeat -e -c filebeat.yml -d "publish"

Filebeat将尝试在端口5044上连接。在Logstash启动一个活动的Beats插件之前,该端口上不会有任何答案,所以您看到的关于该端口连接失败的任何消息现在都是正常的。


要验证您的配置,运行以下命令:


bin/logstash -f second-pipeline.conf --config.test_and_exit

——配置。Test_and_exit选项解析配置文件并报告任何错误。当配置文件通过配置测试后,使用以下命令启动Logstash:


bin/logstash -f second-pipeline.conf

使用grep工具在目标文件中搜索,以验证信息是否存在:


grep syslog /path/to/target/file

运行Elasticsearch查询,在Elasticsearch集群中找到相同的信息:


curl -XGET localhost:9200/logstash-$DATE/_search?pretty&q=fields.type:syslog

将$DATE替换为当前日期,格式为YYYY.MM.DD。


curl -XGET http://localhost:9200/logstash-$DATE/_search?pretty&q=client:iphone

同样,记住将$DATE替换为当前日期,格式为YYYY.MM.DD。


版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至123456@qq.com 举报,一经查实,本站将立刻删除。

联系我们

工作日:9:30-18:30,节假日休息