Go Ops Development: Log Collection (5) Pulling Log Collection Configs by IP Address

Go Ops Development: Log Collection (1) Collecting Application Logs into Kafka

Go Ops Development: Log Collection (2) Fetching Configuration from etcd

Go Ops Development: Log Collection (3) Creating Multiple tailTasks from etcd Config Items

Go Ops Development: Log Collection (4) Watching etcd for Config Changes

In the previous parts we got logAgent fetching log collection items from etcd in real time. Now consider this problem: with a whole fleet of servers, each deployed with different services, bluntly pushing every config item to every server is impractical. We need to distinguish between machines, for example by having each server pull only its own config items based on its IP address.

Getting the current server's IP address

We use the net standard library to get the local IP.

Create a directory named ip_demo with a main.go file inside it.

ip_demo/main.go

package main

import (
	"fmt"
	"net"
	"strings"
)

func main() {
	// Dialing UDP sends no packets; it only asks the kernel which
	// local address would be used to reach the target.
	conn, err := net.Dial("udp", "8.8.8.8:80")
	if err != nil {
		return
	}
	defer conn.Close()
	localAddr := conn.LocalAddr().(*net.UDPAddr)
	fmt.Println(localAddr.String())
	ip := strings.Split(localAddr.IP.String(), ":")[0]
	fmt.Println(ip)
}

Build and run it to test:

go build
./ip_demo
192.168.10.117:50081
192.168.10.117

The address obtained this way is the one used to talk to the outside world. If your business servers sit on an intranet with no internet access, or a server has multiple NICs with internal traffic on a separate network segment, the result will be wrong. In that case, change the Dial target to the IP of your central server.

Adding a utils package

Create a utils directory with an ip.go file.

utils/ip.go

package utils

import (
	"net"
	"strings"
)

// GetOutboundIP returns the local IP used for outbound traffic to addr
func GetOutboundIP(addr string) (ip string, err error) {
	conn, err := net.Dial("udp", addr)
	if err != nil {
		return
	}
	defer conn.Close()
	localAddr := conn.LocalAddr().(*net.UDPAddr)
	//fmt.Println(localAddr.String())
	ip = strings.Split(localAddr.IP.String(), ":")[0]
	return
}

Let's assume my central server's IP is 172.16.10.10 (etcd and Kafka are both deployed on it), and the log collection client runs on 172.16.10.1 (this machine).

Add a new config section with the ops platform server's IP address and port.

conf/config.ini

[center]
address = 172.16.10.10:80

[kafka]
address = 172.16.10.10:9092
chan_max_size=100000

[etcd]
address = 172.16.10.10:2379
timeout = 5
collect_log_key = /logagent/%s/collect # key for the logs collected by this machine's IP

Then update the corresponding struct file, conf/config.go:

type AppConf struct {
	CenterConf `ini:"center"`
	KafkaConf  `ini:"kafka"`
	EtcdConf   `ini:"etcd"`
}

type CenterConf struct {
	Address string `ini:"address"`
}

Modify the main function in main.go: after initializing etcd, first obtain the current server's IP.

	// Each logagent pulls its own config, keyed by its own IP address
	ipStr, err := utils.GetOutboundIP(cfg.CenterConf.Address)
	if err != nil {
		panic(err) // cannot proceed without an IP, so panic
	}
	etcdConfKey := fmt.Sprintf(cfg.EtcdConf.Key, ipStr)
	// 2.1 Fetch the log collection config from etcd
	logEntryConf, err := etcd.GetConf(etcdConfKey)
	if err != nil {
		fmt.Printf("get conf from etcd failed,err:%v\n", err)
		return
	}
	fmt.Printf("get conf from etcd success, %v\n", logEntryConf)
	for index, value := range logEntryConf {
		fmt.Printf("index:%v value:%v\n", index, value)
	}

	// 3. Collect logs and send them to Kafka
	taillog.Init(logEntryConf)
	// NewConfChan reads tskMgr's newConfChan, which is initialized inside taillog.Init(logEntryConf)
	newConfChan := taillog.NewConfChan() // the channel the taillog package exposes
	wg.Add(1)
	go etcd.WatchConf(etcdConfKey, newConfChan) // the watcher pushes any new config to the channel above
	wg.Wait()

The key format thus works out to /logagent/172.16.10.1/collect.

Build and test

Rebuild the program, then put different keys to the etcd server to test.

go build
./logAgent
init kafka success
init etcd success.
get conf from etcd success, []

Put to the original key:

etcdctl --endpoints=http://127.0.0.1:2379 put /logagent/collect '[{"path":"./mysql.log","topic":"mysql_log"}]'
OK

Putting to the old key gets no reaction. Now let's try putting to the /logagent/172.16.10.1/collect key:

etcdctl --endpoints=http://127.0.0.1:2379 put /logagent/172.16.10.1/collect '[{"path":"/private/var/log/system.log","topic":"system_log"}]'
OK

logAgent outputs the following:

Type:PUT key:/logagent/172.16.10.1/collect value:[{"path":"/private/var/log/system.log","topic":"system_log"}]
get new conf:[0xc00000e500]
new config arrived! [0xc00000e500]
2020/03/04 21:23:54 Seeked /private/var/log/system.log - &{Offset:0 Whence:2}
get log data from /private/var/log/system.log success, log:Mar  4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[43963]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/MacOS: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
get log data from /private/var/log/system.log success, log:Mar  4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper[43964]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/Helpers: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
get log data from /private/var/log/system.log success, log:Mar  4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[43963]): Service exited with abnormal code: 78
get log data from /private/var/log/system.log success, log:Mar  4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.
get log data from /private/var/log/system.log success, log:Mar  4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper[43964]): Service exited with abnormal code: 78
get log data from /private/var/log/system.log success, log:Mar  4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.

Then check the system_log topic in Kafka:

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic system_log --from-beginning
Mar  4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[43963]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/MacOS: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
Mar  4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper[43964]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/Helpers: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
Mar  4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[43963]): Service exited with abnormal code: 78
Mar  4 21:24:00 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.

The logs have come through successfully. That's it for logAgent development for now. Optimizations can come later, such as switching to a third-party logging library like logrus, and persisting the current read offset so that after a restart collection resumes from the last position. Next up is logTransfer, which will consume log messages from Kafka and store them, for example in Elasticsearch, for visual querying with Kibana.