Go运维开发之日志收集(5)根据IP地址去拉取配置日志收集项
Go运维开发之日志收集(5)根据IP地址去拉取配置日志收集项
Go运维开发之日志收集(3)根据etcd配置项创建多个tailTask
前面我们已经能够从etcd实时获取到日志收集项并进行日志收集。想象这样一个问题,有一堆的服务器,而且服务部署的也不尽相同,直接简单粗暴的向所有服务器收集全部的配置项也不现实。这个时候就需要我们对这些业务进行区分,比如根据IP地址去拉取自己服务器的配置项。
获取当前服务器IP地址
我们使用net标准库来获取本机IP
新建一个ip_demo的目录,并创建main.go文件
ip_demo/main.go
1package main
2
3import (
4 "fmt"
5 "net"
6 "strings"
7)
8
9func main() {
10 conn, err := net.Dial("udp", "8.8.8.8:80")
11 if err != nil {
12 return
13 }
14 defer conn.Close()
15 localAddr := conn.LocalAddr().(*net.UDPAddr)
16 fmt.Println(localAddr.String())
17 ip := strings.Split(localAddr.IP.String(), ":")[0]
18 fmt.Println(ip)
19}
运行测试一下
1go build
2./ip_demo
3192.168.10.117:50081
4192.168.10.117
这个拿到的是跟外网通信的服务的IP地址,如果你的业务服务器在内网不能连上互联网或者服务器有多个网卡,内部通信用的单独的网断,获取到的将会不准确。这个时候需要将Dail的地址改成中心服务器的IP。
新增utils包
创建utils目录并创建ip.go
文件
utils/ip.go
1package utils
2
3import (
4"net"
5"strings"
6)
7
8// GetOutboundIP 获取本地对外IP
9func GetOutboundIP(addr string) (ip string, err error) {
10 conn, err := net.Dial("udp", addr)
11 if err != nil {
12 return
13 }
14 defer conn.Close()
15 localAddr := conn.LocalAddr().(*net.UDPAddr)
16 //fmt.Println(localAddr.String())
17 ip = strings.Split(localAddr.IP.String(), ":")[0]
18 return
19}
这里我假设一下,我的中心服务器的IP172.16.10.10(包括etcd和kafka都部署在这台服务器上),我日志收集客户端在172.16.10.1(也就是我这台电脑)
新增配置项,增加运维平台服务器的IP地址和端口
conf/config.ini
1[center]
2address = 172.16.10.10:80
3
4[kafka]
5address = 172.16.10.10:9092
6chan_max_size=100000
7
8[etcd]
9address = 172.16.10.10:2379
10timeout = 5
11collect_log_key = /logagent/%s/collect # 根据本机IP收集的日志的key
然后修改对应的结构体文件conf/config.go
1type AppConf struct {
2 CenterConf `ini:"center"`
3 KafkaConf `ini:"kafka"`
4 EtcdConf `ini:"etcd"`
5}
6
7type CenterConf struct {
8 Address string `ini:"address"`
9}
修改main.go文件中的main函数,在初始化etcd之后,先获取到当前服务器的IP
1 // 为了实现每个logagent都拉取自己独有的配置,所以要以自己的IP地址作为区分
2 ipStr, err := utils.GetOutboundIP(cfg.CenterConf.Address)
3 if err != nil {
4 panic(err) // 获取不到,直接panic
5 }
6 etcdConfKey := fmt.Sprintf(cfg.EtcdConf.Key, ipStr)
7 // 2.1 从etcd中获取日志收集项的配置信息
8 logEntryConf, err := etcd.GetConf(etcdConfKey)
9 if err != nil {
10 fmt.Printf("get conf from etcd failed,err:%v\n", err)
11 return
12 }
13 fmt.Printf("get conf from etcd success, %v\n", logEntryConf)
14 for index, value := range logEntryConf {
15 fmt.Printf("index:%v value:%v\n", index, value)
16 }
17
18 // 3. 收集日志发往Kafka
19 taillog.Init(logEntryConf)
20 // 因为NewConfChan访问了tskMgr的newConfChan, 这个channel是在taillog.Init(logEntryConf) 执行的初始化
21 newConfChan := taillog.NewConfChan() // 从taillog包中获取对外暴露的通道
22 wg.Add(1)
23 go etcd.WatchConf(etcdConfKey, newConfChan) // 哨兵发现最新的配置信息会通知上面的那个通道
24 wg.Wait()
定义好key的格式如下/logagent/172.16.10.1/collect
构建测试
重新构建程序,并项etcd服务器put不同的key进行测试
1go build
2./logAgent
3init kafka success
4init etcd success.
5get conf from etcd success, []
put到原先的key上
1etcdctl --endpoints=http://127.0.0.1:2379 put /logagent/collect '[{"path":"./mysql.log","topic":"mysql_log"}]'
2OK
put之前的key,没反应。我们再put到/logagent/collect_172.16.10.1
这个key试试
1etcdctl --endpoints=http://127.0.0.1:2379 put /logagent/172.16.10.1/collect '[{"path":"/private/var/log/system.log","topic":"system_log"}]'
2OK
logAgent输出如下
1Type:PUT key:/logagent/172.16.10.1/collect value:[{"path":"/private/var/log/system.log","topic":"system_log"}]
2 get new conf:[0xc00000e500]
3新的配置来了! [0xc00000e500]
42020/03/04 21:23:54 Seeked /private/var/log/system.log - &{Offset:0 Whence:2}
5get log data from /private/var/log/system.log success, log:Mar 4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[43963]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/MacOS: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
6get log data from /private/var/log/system.log success, log:Mar 4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper[43964]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/Helpers: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
7get log data from /private/var/log/system.log success, log:Mar 4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[43963]): Service exited with abnormal code: 78
8get log data from /private/var/log/system.log success, log:Mar 4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.
9get log data from /private/var/log/system.log success, log:Mar 4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper[43964]): Service exited with abnormal code: 78
10get log data from /private/var/log/system.log success, log:Mar 4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.
我们再检查下kafka的system_log
1bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic system_log --from-beginning
2Mar 4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[43963]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/MacOS: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
3Mar 4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper[43964]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/Helpers: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
4Mar 4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[43963]): Service exited with abnormal code: 78
5Mar 4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.
已经成功获取到日志。logAgent的开发暂时就到这里,后面再优化,比如用第三方日志库logrus等记录日志,记录当前日志的读取位置方便重启之后继续从上次的位置开始读等。后续开发logTransfer将从kafka里面获取日志信息入库。可以存elasticsearch,供kibana进行可视化查询。
- 原文作者:黄忠德
- 原文链接:https://huangzhongde.cn/post/2020-03-04-golang_devops_logAgent_5_get_conf_adapter_ipaddr/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。