Go DevOps Log Collection (6): Consuming Log Messages from Kafka

Go DevOps Log Collection (1): Collecting Application Logs into Kafka

Go DevOps Log Collection (2): Fetching Configuration from etcd

Go DevOps Log Collection (3): Creating Multiple tailTasks from etcd Config Entries

Go DevOps Log Collection (4): Watching etcd Config Entries for Changes

Go DevOps Log Collection (5): Pulling Log Collection Config Entries by IP Address

The core log-collection functionality is now in place. Optimization can wait; the goal for now is to get the whole pipeline working end to end.

We can already write data into Kafka; the next step is to land that data in Elasticsearch and present it through Kibana. For installing Elasticsearch and Kibana, see my earlier post ELKStack-6.7.1部署; the setup is not covered again here.

Note: the Elasticsearch version must match the Kibana version.

What is Elasticsearch

Elasticsearch is a distributed, open-source search and analytics engine for all types of data: textual, numerical, geospatial, structured, and unstructured. Built on Apache Lucene, it was first released in 2010 by Elasticsearch N.V. (now Elastic). Known for its simple REST-style APIs, distributed nature, speed, and scalability, Elasticsearch is the core component of the Elastic Stack, a set of open-source tools for data ingestion, enrichment, storage, analysis, and visualization. The Elastic Stack is commonly called the ELK Stack (for Elasticsearch, Logstash, and Kibana), and today it also includes a family of lightweight data-shipping agents, collectively known as Beats, for sending data to Elasticsearch.

Basic Elasticsearch API operations

The latest version at the time of writing is 7.6.1.

curl 172.16.10.10:9200

Output:

{
  "name" : "node0",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "K99BHGyST3aW1_kYb-AzeQ",
  "version" : {
    "number" : "7.6.1",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "aa751e09be0a5072e8570670309b1f12348f023b",
    "build_date" : "2020-02-29T00:15:25.529771Z",
    "build_snapshot" : false,
    "lucene_version" : "8.4.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

Create an index

curl -X PUT 172.16.10.10:9200/web_index

Output:

1{"acknowledged":true,"shards_acknowledged":true,"index":"web_index"}

Insert a document

curl -H "Content-Type:application/json" -X POST 172.16.10.10:9200/web_index/logs -d '
{
	"time": "2020-03-05 10:20:25",
	"msg": "GET /js/common.js HTTP/1.1",
	"status": 200,
	"agent": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)"
}'

Output:

1{"_index":"web_index","_type":"logs","_id":"zuOBqHABInlWuVaas0D8","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1}

Query documents

Query all

curl -X GET 172.16.10.10:9200/web_index/logs/_search?pretty

Output:

{
  "took" : 10,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "web_index",
        "_type" : "logs",
        "_id" : "zuOBqHABInlWuVaas0D8",
        "_score" : 1.0,
        "_source" : {
          "time" : "2020-03-05 10:20:25",
          "msg" : "GET /js/common.js HTTP/1.1",
          "status" : 200,
          "agent" : "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)"
        }
      }
    ]
  }
}

Query with conditions

curl -H "Content-Type:application/json" -X GET 172.16.10.10:9200/web_index/_search?pretty -d '
{
	"query": {
		"match": {"status": 200}
	}
}'

Output: the response has the same structure as the query-all result above, returning only the document whose status matches 200.

Delete the index

curl -X DELETE 172.16.10.10:9200/web_index

Output:

1{"acknowledged":true}

For more operations, see the official "Getting started with Elasticsearch" documentation.

Working with Elasticsearch from Go

We use the third-party library github.com/olivere/elastic.

The library's documentation at https://olivere.github.io/elastic/ has more detailed examples.

According to the README, the import path for Elasticsearch 6.x is github.com/olivere/elastic, while for 7.x it is github.com/olivere/elastic/v7; pick the one that matches your installed Elasticsearch version.
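For example, with Go modules the 7.x client is pulled in through its versioned import path:

go get github.com/olivere/elastic/v7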

Create an es_demo directory with a main.go file:

es_demo/main.go

package main

import (
	"context"
	"fmt"

	"github.com/olivere/elastic/v7"
)

type User struct {
	Name  string `json:"name"`
	Age   int    `json:"age"`
	Email string `json:"email"`
}

func main() {
	client, err := elastic.NewClient(elastic.SetURL("http://172.16.10.10:9200"))
	if err != nil {
		panic(err)
	}

	fmt.Println("connect to es success")

	// Index a new document
	p1 := User{Name: "jerry", Age: 30, Email: "hzde0128@live.cn"}
	put1, err := client.Index().
		Index("user").
		BodyJson(p1).
		Do(context.Background())
	if err != nil {
		// Handle error
		panic(err)
	}
	fmt.Printf("Indexed user %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)
}

Run it:

go mod init es_demo
go run main.go
connect to es success
Indexed user z-OpqHABInlWuVaa2EDw to index user, type _doc

Query ES to check whether the document was written:

curl -XGET http://172.16.10.10:9200/user/_doc/_search?pretty
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "user",
        "_type" : "_doc",
        "_id" : "z-OpqHABInlWuVaa2EDw",
        "_score" : 1.0,
        "_source" : {
          "name" : "jerry",
          "age" : 30,
          "email" : "hzde0128@live.cn"
        }
      }
    ]
  }
}

Good, the document is there.

Update and delete operations are not covered in detail here (see the official documentation), but a minimal sketch follows.
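As a hedged sketch, continuing inside main from the demo above (client and err are already in scope; the ID is the one returned by the insert, and error handling is trimmed), updating and deleting by document ID with olivere/elastic v7 look like this:

// Update one field of an existing document by ID.
_, err = client.Update().
	Index("user").
	Id("z-OpqHABInlWuVaa2EDw").
	Doc(map[string]interface{}{"age": 31}).
	Do(context.Background())

// Delete the document by ID.
_, err = client.Delete().
	Index("user").
	Id("z-OpqHABInlWuVaa2EDw").
	Do(context.Background())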

Kafka consumer example

Earlier posts covered writing data into Kafka; that data now has to be read back off the message queue and eventually land in ES. Here is a quick look at consuming data from Kafka.

Create a kafka_demo directory with a main.go file:

kafka_demo/main.go

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// Kafka consumer example

func main() {
	broker := []string{"172.16.10.10:9092"}
	topic := "system_log"

	client, err := sarama.NewConsumer(broker, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	fmt.Println("connect kafka success")

	partitionList, err := client.Partitions(topic) // get all partitions of the topic
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Printf("topic:%s, partition:%v\n", topic, partitionList)
	for _, partition := range partitionList { // iterate over all partitions
		// create a dedicated consumer for each partition
		cp, err := client.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer cp.AsyncClose()
		// consume messages from each partition asynchronously
		go func(pc sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
			}
		}(cp)
	}
	select {} // block forever so the consumer goroutines keep running
}

Start the logAgent from the previous posts:

./logAgent

Build and test:

go mod init kafka_demo
go build
./kafka_demo
connect kafka success
topic:system_log, partition:[0]
Partition:0 Offset:3308 Key:[] Value:Mar  5 12:28:19 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent[67874]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/Helpers: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
Partition:0 Offset:3309 Key:[] Value:Mar  5 12:28:19 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent[67874]): Service exited with abnormal code: 78
Partition:0 Offset:3310 Key:[] Value:Mar  5 12:28:19 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.

We can now pull log messages from Kafka.

Next comes the logTransfer service. The data is already in Kafka; logTransfer's job is to consume it from Kafka and write it into ES.

Initialize the project layout

Create a logTransfer directory with three subdirectories, kafka, es, and conf. The layout looks like this:

tree
.
├── conf              // configuration
│   ├── config.go
│   └── config.ini
├── es                // es package
│   └── es.go
├── go.mod
├── kafka             // kafka package
│   └── kafka.go
└── main.go           // main program

3 directories, 6 files

Load the configuration file

As with logAgent, we first load the config file, define structs to map it onto, and call the loader from main.

conf/config.ini

[kafka]
address = 172.16.10.10:9092
topic = system_log

[es]
address = 172.16.10.10:9200

conf/config.go

package conf

type AppConf struct {
	KafkaConf `ini:"kafka"`
	EsConf    `ini:"es"`
}

type KafkaConf struct {
	Address string `ini:"address"`
	Topic   string `ini:"topic"`
}

type EsConf struct {
	Address string `ini:"address"`
}

main.go

package main

import (
	"fmt"
	"logTransfer/conf"

	"gopkg.in/ini.v1"
)

var (
	cfg = new(conf.AppConf)
)

func main() {
	err := ini.MapTo(cfg, "conf/config.ini")
	if err != nil {
		fmt.Printf("load config failed, err:%v\n", err)
		return
	}
	fmt.Println("load config success")
}

Initialize the connections

es/es.go

package es

import (
	"fmt"
	"strings"

	"github.com/olivere/elastic/v7"
)

var (
	client *elastic.Client
)

// Init initializes the ES client connection
func Init(addr string) (err error) {
	if !strings.HasPrefix(addr, "http://") {
		addr = "http://" + addr
	}
	client, err = elastic.NewClient(elastic.SetURL(addr))
	if err != nil {
		fmt.Printf("es connect failed, err:%v\n", err)
		return
	}
	return
}
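One caveat worth noting: elastic.NewClient sniffs the cluster topology by default, which can fail even when curl reaches ES directly (common with Docker or NAT setups). If Init fails that way, disabling sniffing is a well-known workaround:

client, err = elastic.NewClient(
	elastic.SetURL(addr),
	elastic.SetSniff(false), // skip sniffing; connect to the given URL only
)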

kafka/kafka.go

package kafka

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

var wg sync.WaitGroup

// Init initializes the Kafka consumer; the consumed data will be sent to ES
func Init(addr []string, topic string) (err error) {
	consumer, err := sarama.NewConsumer(addr, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}

	partitionList, err := consumer.Partitions(topic) // get all partitions of the topic
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Printf("topic:%s, partition:%v\n", topic, partitionList)
	for _, partition := range partitionList { // iterate over all partitions
		// create a dedicated consumer for each partition
		var cp sarama.PartitionConsumer
		cp, err = consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		// consume messages from each partition asynchronously
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
				// TODO: send the data to ES
			}
		}(cp)
	}
	wg.Wait() // block while the consumer goroutines run
	return
}

main.go

package main

import (
	"fmt"

	"logTransfer/conf"
	"logTransfer/es"
	"logTransfer/kafka"

	"gopkg.in/ini.v1"
)

var (
	cfg = new(conf.AppConf)
)

func main() {
	// 1. load the config file
	err := ini.MapTo(cfg, "conf/config.ini")
	if err != nil {
		fmt.Printf("load config failed, err:%v\n", err)
		return
	}
	fmt.Println("load config success")

	// 2. initialize the ES connection
	err = es.Init(cfg.EsConf.Address)
	if err != nil {
		fmt.Printf("init es failed, err:%v\n", err)
		return
	}
	fmt.Println("init es success")

	// 3. initialize the Kafka consumer (blocks while consuming)
	err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.Topic)
	if err != nil {
		fmt.Printf("init kafka failed, err:%v\n", err)
		return
	}
	fmt.Println("init kafka success")
}

Build and test. Note that kafka.Init blocks inside wg.Wait() while consuming, so "init kafka success" never appears in the output:

go build
./logTransfer
load config success
init es success
topic:system_log, partition:[0]
Partition:0 Offset:10595 Key:[] Value:Mar  5 14:43:01 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent[97352]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/Helpers: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
Partition:0 Offset:10596 Key:[] Value:Mar  5 14:43:01 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent[97352]): Service exited with abnormal code: 78
Partition:0 Offset:10597 Key:[] Value:Mar  5 14:43:01 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.desktopagent): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.

OK, the data comes through. What remains is writing the fetched data into ES; a rough sketch of that next step follows.
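As a preview, here is a minimal sketch under the assumption that we add a SendToES helper to the es package. The LogData type and SendToES are illustrations, not code from this post; client is the package-level *elastic.Client set by Init, and "context" must be added to the imports in es/es.go:

// Additions to es/es.go (sketch)

// LogData is an assumed payload shape for each log line.
type LogData struct {
	Topic string `json:"topic"`
	Data  string `json:"data"`
}

// SendToES indexes one log entry, using the Kafka topic as the index name.
func SendToES(topic string, data []byte) error {
	_, err := client.Index().
		Index(topic).
		BodyJson(LogData{Topic: topic, Data: string(data)}).
		Do(context.Background())
	return err
}

The TODO in the consumer goroutine in kafka/kafka.go would then become a call like this:

// inside the for msg := range pc.Messages() loop
if err := es.SendToES(topic, msg.Value); err != nil {
	fmt.Printf("send to es failed, err:%v\n", err)
}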