Go运维开发之日志收集(7)将日志入库到Elasticsearch并通过Kibana进行展示
Go运维开发之日志收集(7)将日志入库到Elasticsearch并通过Kibana进行展示
Go运维开发之日志收集(3)根据etcd配置项创建多个tailTask
Go运维开发之日志收集(5)根据IP地址去拉取配置日志收集项
前面我们已经kafka取到日志文件了,接下来就是保存到ES中。
项目整理
我们之前开发的叫logAgent和现在的logTransfer是一起的,我把它整理到一个repo了
改成logCollect了,在这个项目下创建2个服务:logAgent和logTransfer
将日志保存到ES中
es包创建一个SendToEs
的方法将数据保存到ES。
es/es.go
1// SendToEs 获取记录存入Es中
2func SendToEs(index string, data interface{}) {
3 // 新增记录
4 // p1 := User{Name: "jerry", Age: 30, Email: "hzde0128@live.cn"}
5 put1, err := client.Index().
6 Index(index).
7 BodyJson(data).
8 Do(context.Background())
9 if err != nil {
10 // Handle error
11 panic(err)
12 }
13 fmt.Printf("Indexed %s %s to index %s, type %s\n", index, put1.Id, put1.Index, put1.Type)
14}
我们从前面的示例中看到它接收的是一组JSON串,之前是直接在终端打印,我们从那个地方开始改造。
kafka/kafka.go
1// LogData 造一个结构体存放json数据
2type LogData struct {
3 Data string `json:"data"`
4}
5
6// Init 初始化kafka连接,准备发送数据给es
7func Init(addr []string, topic string) (err error) {
8 consumer, err := sarama.NewConsumer(addr, nil)
9 if err != nil {
10 fmt.Printf("fail to start consumer, err:%v\n", err)
11 return
12 }
13
14 partitionList, err := consumer.Partitions(topic) // 根据topic取到所有的分区
15 if err != nil {
16 fmt.Printf("fail to get list of partition:err%v\n", err)
17 return
18 }
19 fmt.Printf("topic:%s, partition:%v\n", topic, partitionList)
20 for partition := range partitionList { // 遍历所有的分区
21 // 针对每个分区创建一个对应的分区消费者
22 var cp sarama.PartitionConsumer
23 cp, err = consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
24 if err != nil {
25 fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
26 return
27 }
28 // defer cp.AsyncClose()
29 // 异步从每个分区消费信息
30 var wg sync.WaitGroup
31 defer wg.Done()
32 wg.Add(1)
33 go func(sarama.PartitionConsumer) {
34 for msg := range cp.Messages() {
35 fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
36 // 准备发送数据给es
37 ld := LogData{
38 Data: string(msg.Value),
39 }
40 // 拼装好数据发给ES
41 es.SendToEs(topic, ld)
42 }
43 }(cp)
44 wg.Wait()
45 }
46 return
47}
构建测试
1go build
2./logTransfer
3load config success
4init es success
5topic:system_log, partition:[0]
6Partition:0 Offset:12675 Key:[] Value:Mar 5 15:55:57 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[8024]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/MacOS: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
7Indexed system_log 2uOxqXABInlWuVaaqUAl to index system_log, type _doc
已经看到有数据写入ES了,现在我们通过Kibana进行查看
Kibana查询
打开浏览器,输入kibana服务器的地址,我这是http://172.16.10.10:5601
查看我们的索引列表
可以看到我们之前创建的索引文件都在上面
创建索引模式
创建完再打开discover可以看到我们的日志了,具体操作不赘述。
函数调用函数优化
kafka调用es.SendToEs
的时候是函数调用函数的方式,我们可以先存入一个通道中,将通道的大小放入配置文件
conf/config.ini
1[es]
2address = 172.16.10.10:9200
3max_chan_size = 100000
4nums = 16
修改结构体*conf/config.go
*
1type EsConf struct {
2 Address string `ini:"address"`
3 MaxChanSize int `ini:"max_chan_size"`
4 Nums int `ini:"nums"`
5}
在es包中创建一个channel
es/es.go
1type LogData struct {
2 Topic string `json:"topic"`
3 Data string `json:"data"`
4}
5
6var (
7 client *elastic.Client
8 ch chan *LogData
9)
10
11func Init(address string, maxChanSize, nums int) (err error) {
12 if !strings.HasPrefix(address, "http://") {
13 address = "http://" + address
14 }
15 client, err = elastic.NewClient(elastic.SetURL(address))
16 if err != nil {
17 // Handle error
18 return err
19 }
20 fmt.Println("connect to es success")
21 ch = make(chan *LogData, maxChanSize)
22 for i:=0;i<nums;i++{
23 go sendToEs()
24 }
25 return
26}
27
28func SendToChan(msg *LogData) {
29 ch <- msg
30}
31
32func sendToEs() {
33 // 链式操作
34 for {
35 select {
36 case msg := <-ch:
37 put1, err := client.Index().
38 Index(msg.Topic).
39 BodyJson(msg).
40 Do(context.Background())
41 if err != nil {
42 // Handle error
43 fmt.Println(err)
44 continue
45 }
46 fmt.Printf("Indexed %s %v to index %s, type %s\n", msg.Topic, put1.Id, put1.Index, put1.Type)
47 default:
48 time.Sleep(time.Second)
49 }
50 }
51}
修改kafka/kafka.go
的执行写ES的入口
1 // 准备发送数据给es
2 ld := es.LogData{Topic: topic, Data:string(msg.Value)}
3 //es.SendToES(topic, ld) // 函数调用函数
4 // 优化一下: 直接放到一个chan中
5 es.SendToChan(&ld)
修改main.go
函数es初始化的时候把MaxChanSize加上
main.go
1 err = es.Init(cfg.EsConf.Address, cfg.EsConf.MaxChanSize, cfg.EsConf.Nums)
2 if err != nil {
3 fmt.Printf("init ES client failed,err:%v\n", err)
4 return
5 }
6 fmt.Println("init es success.")
构建测试
删除之前的索引和记录,方便查找
1curl -X DELETE http://172.16.10.10:9200/system_log
2{"acknowledged":true}
重新构建,并观察kibana
1go buid
2./logTransfer
3load config success
4connect to es success
5init es success.
6topic:system_log, partition:[0]
7Partition:0 Offset:15797 Key:[] Value:Mar 5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[14557]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/MacOS: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
8Partition:0 Offset:15798 Key:[] Value:Mar 5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper[14558]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/Helpers: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
9Partition:0 Offset:15799 Key:[] Value:Mar 5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[14557]): Service exited with abnormal code: 78
10Partition:0 Offset:15800 Key:[] Value:Mar 5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.
11Partition:0 Offset:15801 Key:[] Value:Mar 5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper[14558]): Service exited with abnormal code: 78
12Partition:0 Offset:15802 Key:[] Value:Mar 5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.
13Indexed system_log h-P2qXABInlWuVaajUdd to index system_log, type _doc
14Indexed system_log iOP2qXABInlWuVaajUdd to index system_log, type _doc
15Indexed system_log huP2qXABInlWuVaajUdd to index system_log, type _doc
16Indexed system_log heP2qXABInlWuVaajUdb to index system_log, type _doc
17Indexed system_log ieP2qXABInlWuVaajUe9 to index system_log, type _doc
18Indexed system_log hOP2qXABInlWuVaajUda to index system_log, type _doc
看日志是发送成功了,登录kibana进行查看
已经能看到最新的数据了。
后期开发计划:
-
topic通过etcd来进行监视,实现灵活
-
ES索引按天划分,防止日志量过大导致日志查询慢
-
日志优化…
Github代码地址:logCollect
- 原文作者:黄忠德
- 原文链接:https://huangzhongde.cn/post/2020-03-05-golang_devops_logAgent_7_write_to_es/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。