背景

由于工作需要,要在 Web 端执行相关的部署操作,并在页面上实时展示部署任务的日志信息,因此使用 WebSocket 来实现。

websocket通信特点

  • 全双工通信协议
  • 允许服务端主动向客户端推送数据
  • 浏览器和服务器只需要完成一次握手,两者之间就可以直接创建持久性的连接,并进行双向数据传输。

实现过程

参考了 gorilla/websocket 仓库中的 command 示例,并将其改写为基于 gin 框架的实现。

代码如下:

  1package main
  2
  3import (
  4  "bufio"
  5  "flag"
  6  "io"
  7  "log"
  8  "net/http"
  9  "os"
 10  "time"
 11
 12  "github.com/gin-gonic/gin"
 13  "github.com/gorilla/websocket"
 14)
 15
 16const (
 17  // Time allowed to write a message to the peer.
 18  writeWait = 10 * time.Second
 19
 20  // Maximum message size allowed from peer.
 21  maxMessageSize = 8192
 22
 23  // Time allowed to read the next pong message from the peer.
 24  pongWait = 60 * time.Second
 25
 26  // Send pings to peer with this period. Must be less than pongWait.
 27  pingPeriod = (pongWait * 9) / 10
 28
 29  // Time to wait before force close on connection.
 30  closeGracePeriod = 10 * time.Second
 31)
 32
 33var upgrader = websocket.Upgrader{}
 34
 35func main() {
 36
 37  r := gin.Default()
 38  r.LoadHTMLFiles("index.html")
 39  r.GET("/", func(ctx *gin.Context) {
 40    ctx.HTML(http.StatusOK, "index.html", nil)
 41  })
 42  r.GET("/ws", serveWS)
 43  r.Run()
 44}
 45
 46func pumpStdin(ws *websocket.Conn, w io.Writer) {
 47  defer ws.Close()
 48  ws.SetReadLimit(maxMessageSize)
 49  ws.SetReadDeadline(time.Now().Add(pongWait))
 50  ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
 51  for {
 52    _, message, err := ws.ReadMessage()
 53    if err != nil {
 54      break
 55    }
 56    message = append(message, '\n')
 57    if _, err := w.Write(message); err != nil {
 58      break
 59    }
 60  }
 61}
 62
 63func pumpStdout(ws *websocket.Conn, r io.Reader, done chan struct{}) {
 64  defer func() {
 65  }()
 66  s := bufio.NewScanner(r)
 67  for s.Scan() {
 68    ws.SetWriteDeadline(time.Now().Add(writeWait))
 69    if err := ws.WriteMessage(websocket.TextMessage, s.Bytes()); err != nil {
 70      ws.Close()
 71      break
 72    }
 73  }
 74  if s.Err() != nil {
 75    log.Println("scan:", s.Err())
 76  }
 77  close(done)
 78
 79  ws.SetWriteDeadline(time.Now().Add(writeWait))
 80  ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
 81  time.Sleep(closeGracePeriod)
 82  ws.Close()
 83}
 84
 85func serveWS(ctx *gin.Context) {
 86  ws, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
 87  if err != nil {
 88    log.Fatalf("upgrade failed, %s", err.Error())
 89    return
 90  }
 91
 92  defer ws.Close()
 93
 94  outr, outw, err := os.Pipe()
 95  if err != nil {
 96    ctx.JSON(http.StatusOK, gin.H{"stdout": err.Error()})
 97    return
 98  }
 99  defer outr.Close()
100  defer outw.Close()
101
102  inr, inw, err := os.Pipe()
103  if err != nil {
104    log.Println("stdin", err)
105    ws.WriteMessage(websocket.TextMessage, []byte("Internal server error."))
106    return
107  }
108  defer inr.Close()
109  defer inw.Close()
110
111  proc, err := os.StartProcess("/bin/sh", flag.Args(), &os.ProcAttr{
112    Files: []*os.File{inr, outw, outw},
113  })
114  if err != nil {
115    log.Println("start", err)
116    ws.WriteMessage(websocket.TextMessage, []byte("Internal server error."))
117    return
118  }
119
120  inr.Close()
121  outw.Close()
122
123  stdoutDone := make(chan struct{})
124  go pumpStdout(ws, outr, stdoutDone)
125  go ping(ws, stdoutDone)
126
127  pumpStdin(ws, inw)
128
129  // Some commands will exit when stdin is closed.
130  inw.Close()
131
132  // Other commands need a bonk on the head.
133  if err := proc.Signal(os.Interrupt); err != nil {
134    log.Println("inter:", err)
135  }
136
137  select {
138  case <-stdoutDone:
139  case <-time.After(time.Second):
140    // A bigger bonk on the head.
141    if err := proc.Signal(os.Kill); err != nil {
142      log.Println("term:", err)
143    }
144    <-stdoutDone
145  }
146
147  if _, err := proc.Wait(); err != nil {
148    log.Println("wait:", err)
149  }
150}
151
152func ping(ws *websocket.Conn, done chan struct{}) {
153  ticker := time.NewTicker(pingPeriod)
154  defer ticker.Stop()
155  for {
156    select {
157    case <-ticker.C:
158      if err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)); err != nil {
159        log.Println("ping:", err)
160      }
161    case <-done:
162      return
163    }
164  }
165}

效果演示

执行ansible任务

websocket-ansible

执行ping操作

websocket-ping