Go运维开发之日志收集(6)从kafka中获取日志信息
Go运维开发之日志收集(6)从kafka中获取日志信息
Go运维开发之日志收集(3)根据etcd配置项创建多个tailTask
Go运维开发之日志收集(5)根据IP地址去拉取配置日志收集项
日志收集这块主要的功能已经实现,优化工作放到后面再做,先把流程走通。
前面已经实现了数据写入到kafka中了,我们需要将数据落地到elastciseartch中,再通过kibana进行展示,关于elasticsearch和kibana的安装可以看下我之前的文章ELKStack-6.7.1部署,这里不做具体介绍。
备注:Elasticsearch版本需要和Kibana版本保持一致。
什么是 Elasticsearch
Elasticsearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据,包括文本、数字、地理空间、结构化和非结构化数据。Elasticsearch 在 Apache Lucene 的基础上开发而成,由 Elasticsearch N.V.(即现在的 Elastic)于 2010 年首次发布。Elasticsearch 以其简单的 REST 风格 API、分布式特性、速度和可扩展性而闻名,是 Elastic Stack 的核心组件;Elastic Stack 是适用于数据采集、充实、存储、分析和可视化的一组开源工具。人们通常将 Elastic Stack 称为 ELK Stack(代指 Elasticsearch、Logstash 和 Kibana),目前 Elastic Stack 包括一系列丰富的轻量型数据采集代理,这些代理统称为 Beats,可用来向 Elasticsearch 发送数据。
Elasticsearch API基本操作
目前最新的版本是7.6.1
1curl 172.16.10.10:9200
输出结果
1{
2 "name" : "node0",
3 "cluster_name" : "elasticsearch",
4 "cluster_uuid" : "K99BHGyST3aW1_kYb-AzeQ",
5 "version" : {
6 "number" : "7.6.1",
7 "build_flavor" : "default",
8 "build_type" : "tar",
9 "build_hash" : "aa751e09be0a5072e8570670309b1f12348f023b",
10 "build_date" : "2020-02-29T00:15:25.529771Z",
11 "build_snapshot" : false,
12 "lucene_version" : "8.4.0",
13 "minimum_wire_compatibility_version" : "6.8.0",
14 "minimum_index_compatibility_version" : "6.0.0-beta1"
15 },
16 "tagline" : "You Know, for Search"
17}
创建索引
1curl -X PUT 172.16.10.10:9200/web_index
输出结果
1{"acknowledged":true,"shards_acknowledged":true,"index":"web_index"}
插入记录
1curl -H "Content-Type:application/json" -X POST 172.16.10.10:9200/web_index/logs -d '
2{
3 "time": "2020-03-05 10:20:25",
4 "msg": "GET /js/common.js HTTP/1.1",
5 "status": 200,
6 "agent": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)"
7}'
输出结果
1{"_index":"web_index","_type":"logs","_id":"zuOBqHABInlWuVaas0D8","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1}
查询记录
查询所有
1curl -X GET 172.16.10.10:9200/web_index/logs/_search?pretty
输出结果
1{
2 "took" : 10,
3 "timed_out" : false,
4 "_shards" : {
5 "total" : 1,
6 "successful" : 1,
7 "skipped" : 0,
8 "failed" : 0
9 },
10 "hits" : {
11 "total" : {
12 "value" : 1,
13 "relation" : "eq"
14 },
15 "max_score" : 1.0,
16 "hits" : [
17 {
18 "_index" : "web_index",
19 "_type" : "logs",
20 "_id" : "zuOBqHABInlWuVaas0D8",
21 "_score" : 1.0,
22 "_source" : {
23 "time" : "2020-03-05 10:20:25",
24 "msg" : "GET /js/common.js HTTP/1.1",
25 "status" : 200,
26 "agent" : "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)"
27 }
28 }
29 ]
30 }
31}
按条件查询
1curl -H "Content-Type:application/json" -X PUT 172.16.10.10:9200/web_index/logs/_query?pretty -d '
2{
3 "query":{
4 "match": {"status": 200}
5 }
6}'
输出结果
1{
2 "_index" : "web_index",
3 "_type" : "logs",
4 "_id" : "_query",
5 "_version" : 2,
6 "result" : "updated",
7 "_shards" : {
8 "total" : 2,
9 "successful" : 1,
10 "failed" : 0
11 },
12 "_seq_no" : 2,
13 "_primary_term" : 1
14}
删除索引
1curl -X DELETE 172.16.10.10:9200/web_index
输出结果
1{"acknowledged":true}
更多相关的操作可以查看官方的文档Elasticsearch入门
go操作elasticsearch
我们使用第三方库github.com/olivere/elastic
elastic库的官方文档https://olivere.github.io/elastic/上面有更加详细的实例
根据readme的介绍:6.X包路径为github.com/olivere/elastic
7.X的包路径为github.com/olivere/elastic/v7
,根据自己安装的elasticsearch版本进行区分。
新建es_demo目录,并创建main.go文件
es_demo/main.go
1package main
2
3import (
4 "context"
5 "fmt"
6 "olivere/elastic"
7
8 "github.com/olivere/elastic/v7"
9)
10
11type User struct {
12 Name string `json:"name"`
13 Age int `json:"age"`
14 Email string `json:"email"`
15}
16
17func main() {
18 client, err := elastic.NewClient(elastic.SetURL("http://172.16.10.10:9200"))
19 if err != nil {
20 panic(err)
21 }
22
23 fmt.Println("connect to es success")
24
25 // 新增记录
26 p1 := User{Name: "jerry", Age: 30, Email: "hzde0128@live.cn"}
27 put1, err := client.Index().
28 Index("user").
29 BodyJson(p1).
30 Do(context.Background())
31 if err != nil {
32 // Handle error
33 panic(err)
34 }
35 fmt.Printf("Indexed user %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)
36}
运行测试
1go mod init
2go run main.go
3connect to es success
4Indexed user z-OpqHABInlWuVaa2EDw to index user, type _doc
查询ES看有没有写进去
1curl -XGET http://172.16.10.10:9200/user/_doc/_search?pretty
2{
3 "took" : 4,
4 "timed_out" : false,
5 "_shards" : {
6 "total" : 1,
7 "successful" : 1,
8 "skipped" : 0,
9 "failed" : 0
10 },
11 "hits" : {
12 "total" : {
13 "value" : 1,
14 "relation" : "eq"
15 },
16 "max_score" : 1.0,
17 "hits" : [
18 {
19 "_index" : "user",
20 "_type" : "_doc",
21 "_id" : "z-OpqHABInlWuVaa2EDw",
22 "_score" : 1.0,
23 "_source" : {
24 "name" : "jerry",
25 "age" : 30,
26 "email" : "hzde0128@live.cn"
27 }
28 }
29 ]
30 }
31}
好了,已经有了。
删除修改操作这些暂时不介绍,可以看官方文档。
kafka消费示例
前面介绍了写数据到kafka中,数据需要从消息队列里面取出最终落地到es中。简单介绍下从kafka中获取数据
创建kakfa_demo目录,并创建main.go文件
kafka_demo/main.go
1package main
2
3import (
4 "fmt"
5
6 "github.com/Shopify/sarama"
7)
8
9// kafka 消费示例
10
11func main() {
12 broker := []string{"172.16.10.10:9092"}
13 topic := "system_log"
14
15 client, err := sarama.NewConsumer(broker, nil)
16 if err != nil {
17 fmt.Printf("fail to start consumer, err:%v\n", err)
18 return
19 }
20 fmt.Println("connect kafka success")
21
22 partitionList, err := client.Partitions(topic) // 根据topic取到所有的分区
23 if err != nil {
24 fmt.Printf("fail to get list of partition:err%v\n", err)
25 return
26 }
27 fmt.Printf("topic:%s, partition:%v\n",topic,partitionList)
28 for partition := range partitionList { // 遍历所有的分区
29 // 针对每个分区创建一个对应的分区消费者
30 cp, err := client.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
31 if err != nil {
32 fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
33 return
34 }
35 defer cp.AsyncClose()
36 // 异步从每个分区消费信息
37 go func(sarama.PartitionConsumer) {
38 for msg := range cp.Messages() {
39 fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
40 }
41 }(cp)
42 select {
43
44 }
45 }
46}
把之前的logAgent运行起来
1./logAgent
构建测试
1go mod init
2go build
3./kafka_demo
4connect kafka success
5topic:system_log, partition:[0]
6Partition:0 Offset:3308 Key:[] Value:Mar 5 12:28:19 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent[67874]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/Helpers: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
7Partition:0 Offset:3309 Key:[] Value:Mar 5 12:28:19 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent[67874]): Service exited with abnormal code: 78
8Partition:0 Offset:3310 Key:[] Value:Mar 5 12:28:19 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.
已经能够从kafka中拿到日志信息了。
下面开始logTransfer服务的开发,数据已经到kafka了,现在要从kafka中消费数据,然后写到es中,logTransfer做的就是这个工作。
初始化项目目录
创建logTransfer目录,并创建kafka、es、conf三个子目录,目录结构如下
1tree
2.
3├── conf // 配置文件目录
4│ └── config.go
5│ └── config.ini
6├── es // es包
7│ └── es.go
8├── go.mod
9├── kafka // kafka包
10│ └── kafka.go
11└── main.go // 主程序
12
133 directories, 5 files
加载配置文件
跟之前的logAgent一样,先加载配置文件,创建结构体映射,main函数调用
conf/config.ini
1[kafka]
2address = 172.16.10.10:9092
3topic = web_log
4
5[es]
6address = 172.16.10.10:9200
conf/config.go
1package conf
2
3type AppConf struct{
4 KafkaConf `ini:"kafka"`
5 EsConf `ini:"es"`
6}
7
8type KafkaConf struct{
9 Address string `ini:"address"`
10 Topic string `ini:"topic"`
11}
12
13type EsConf struct{
14 Address string `ini:"address"`
15}
main.go
1package main
2
3import (
4 "fmt"
5 "logTransfer/conf"
6
7 "gopkg.in/ini.v1"
8)
9
10var (
11 cfg = new(conf.AppConf)
12)
13
14func main() {
15 err := ini.MapTo(cfg, "conf/config.ini")
16 if err != nil {
17 fmt.Printf("load config failed, err:%v\n", err)
18 return
19 }
20 fmt.Println("load config success")
21}
初始化连接
es/es.go
1package es
2
3import (
4 "fmt"
5 "strings"
6
7 "github.com/olivere/elastic/v7"
8)
9
10var (
11 client *elastic.Client
12)
13
14// Init 初始化连接
15func Init(addr string) (err error) {
16 if !strings.HasPrefix(addr, "http://") {
17 addr = "http://" + addr
18 }
19 client, err = elastic.NewClient(elastic.SetURL(addr))
20 if err != nil {
21 fmt.Printf("es connect failed, err:%v\n", err)
22 return
23 }
24 return
25}
kafka/kafka.go
1package kafka
2
3import (
4 "fmt"
5 "sync"
6
7 "github.com/Shopify/sarama"
8)
9
10// Init 初始化kafka连接,准备发送数据给es
11func Init(addr []string, topic string) (err error) {
12 consumer, err := sarama.NewConsumer(addr, nil)
13 if err != nil {
14 fmt.Printf("fail to start consumer, err:%v\n", err)
15 return
16 }
17
18 partitionList, err := consumer.Partitions(topic) // 根据topic取到所有的分区
19 if err != nil {
20 fmt.Printf("fail to get list of partition:err%v\n", err)
21 return
22 }
23 fmt.Printf("topic:%s, partition:%v\n", topic, partitionList)
24 for partition := range partitionList { // 遍历所有的分区
25 // 针对每个分区创建一个对应的分区消费者
26 var cp sarama.PartitionConsumer
27 cp, err = consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
28 if err != nil {
29 fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
30 return
31 }
32 // defer cp.AsyncClose()
33 // 异步从每个分区消费信息
34 var wg sync.WaitGroup
35 defer wg.Done()
36 wg.Add(1)
37 go func(sarama.PartitionConsumer) {
38 for msg := range cp.Messages() {
39 fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
40 // 准备发送数据给es
41 }
42 }(cp)
43 wg.Wait()
44 }
45 return
46}
main.go
1package main
2
3import (
4 "fmt"
5 "logTransfer/conf"
6 "logTransfer/es"
7
8 "logTransfer/kafka"
9
10 "gopkg.in/ini.v1"
11)
12
13var (
14 cfg = new(conf.AppConf)
15)
16
17func main() {
18 // 1.加载配置文件
19 err := ini.MapTo(cfg, "conf/config.ini")
20 if err != nil {
21 fmt.Printf("load config failed, err:%v\n", err)
22 return
23 }
24 fmt.Println("load config success")
25
26 // 2.初始化es连接
27 err = es.Init(cfg.EsConf.Address)
28 if err != nil {
29 fmt.Printf("init es failed, err:%v\n", err)
30 return
31 }
32 fmt.Println("init es success")
33
34 // 3.初始化kafka连接
35 err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.Topic)
36 if err != nil {
37 fmt.Printf("init kafka failed, err:%v\n", err)
38 return
39 }
40 fmt.Println("init kafka success")
41}
构建测试
1go build
2./logTransfer
3load config success
4init es success
5topic:system_log, partition:[0]
6Partition:0 Offset:10595 Key:[] Value:Mar 5 14:43:01 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent[97352]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/Helpers: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
7Partition:0 Offset:10596 Key:[] Value:Mar 5 14:43:01 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent[97352]): Service exited with abnormal code: 78
8Partition:0 Offset:10597 Key:[] Value:Mar 5 14:43:01 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.
OK,能正常获取到数据,后面就是将获取到的数据存入ES中。
- 原文作者:黄忠德
- 原文链接:https://huangzhongde.cn/post/2020-03-05-golang_devops_logAgent_6_get_data_from_kafka/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。