executor.go 9.5 KB


  1. package hookexecutor
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net"
  10. "os"
  11. "time"
  12. "github.com/kpmy/xippo/c2s/stream"
  13. "github.com/kpmy/xippo/entity"
  14. "github.com/ugorji/go/codec"
  15. )
  // Default settings used throughout the executor.
  const (
  	// DefaultAddr is the address Start hands to ListenAndServe, the endpoint
  	// that speaks length-prefixed msgpack.
  	DefaultAddr = "0.0.0.0:1984"
  	// DefaultJSONAddr is the address Start hands to ListenAndServeJSON.
  	DefaultJSONAddr = "0.0.0.0:1985"

  	// DefaultInboxBufferSize is the capacity of the executor's event, command
  	// and client-request queues.
  	DefaultInboxBufferSize = 4
  	// DefaultOutboxBufferSize is the capacity of the queue of messages waiting
  	// to be forwarded to the bot.
  	DefaultOutboxBufferSize = 4
  	// DefaultClientBufferSize is the capacity of each client's private queue.
  	DefaultClientBufferSize = 8

  	// DefaultHeartbeatTrigger is the interval between pings written to a client.
  	DefaultHeartbeatTrigger = 5 * time.Second
  	// DefaultHeartbeatTimeout is the timeout the client loops pass to the
  	// message reader and writer functions.
  	DefaultHeartbeatTimeout = 10 * time.Second

  	// DefaultMessageLengthCap is the largest msgpack payload, in bytes, that
  	// ReadMessage and WriteMessage accept.
  	DefaultMessageLengthCap = 4 << 10
  	// DefaultDelayForMessages is how long sendToBot sleeps after each message.
  	DefaultDelayForMessages = time.Second
  )
  // IncomingEvent is a typed event with a free-form string payload.
  //
  // Type names the kind of event; the client loops use the values "ping" and
  // "pong" for heartbeats. Data is the payload; SendMessageToBot reads its
  // "body" entry.
  type IncomingEvent struct {
  	Type string
  	Data map[string]string
  }
  31. type Message struct {
  32. *IncomingEvent
  33. ID int
  34. }
  35. type MessageWriter func(conn net.Conn, timeout time.Duration, msg *Message) error
  36. type MessageReader func(conn net.Conn, timeout time.Duration) (*Message, error)
  37. type clientReply struct {
  38. outbox chan *Message
  39. info *clientInfo
  40. }
  41. type clientInfo struct {
  42. inbox chan *Message
  43. stop chan struct{}
  44. }
  45. type Executor struct {
  46. listener net.Listener
  47. jsonListener net.Listener
  48. xmppStream stream.Stream
  49. logger *log.Logger
  50. inbox chan *IncomingEvent
  51. outbox chan *Message
  52. cmdInbox chan string
  53. clientRequests chan chan clientReply
  54. stop chan struct{}
  55. clients []*clientInfo
  56. counter int
  57. }
  58. func NewExecutor(s stream.Stream) *Executor {
  59. return &Executor{
  60. nil,
  61. nil,
  62. s,
  63. log.New(os.Stderr, "[hookexecutor] ", log.LstdFlags),
  64. make(chan *IncomingEvent, DefaultInboxBufferSize),
  65. make(chan *Message, DefaultOutboxBufferSize),
  66. make(chan string, DefaultInboxBufferSize),
  67. make(chan chan clientReply, DefaultInboxBufferSize),
  68. make(chan struct{}),
  69. nil,
  70. 0,
  71. }
  72. }
  73. func (exc *Executor) Start() {
  74. go exc.ListenAndServe(DefaultAddr)
  75. go exc.ListenAndServeJSON(DefaultJSONAddr)
  76. go exc.processEvents()
  77. go exc.sendToBot()
  78. }
  79. func (exc *Executor) Stop() {
  80. close(exc.inbox)
  81. close(exc.cmdInbox)
  82. close(exc.stop)
  83. }
  84. func (exc *Executor) Run(cmd string) {
  85. exc.cmdInbox <- cmd
  86. }
  87. func (exc *Executor) NewEvent(e IncomingEvent) {
  88. select {
  89. case exc.inbox <- &e:
  90. default:
  91. log.Println("executor is blocked")
  92. }
  93. }
  94. func stopPanic(exc *Executor, where string, callback func(err error)) {
  95. if err := recover(); err != nil {
  96. exc.logger.Printf("catched panic in %s: %s", where, err)
  97. if callback != nil {
  98. if realErr, ok := err.(error); ok {
  99. go callback(realErr)
  100. } else {
  101. go callback(fmt.Errorf("%v", err))
  102. }
  103. }
  104. }
  105. }
  106. func (exc *Executor) ListenAndServe(addr string) {
  107. defer stopPanic(exc, "listener", func(_ error) { exc.ListenAndServe(addr) })
  108. listener, err := net.Listen("tcp", addr)
  109. if err != nil {
  110. exc.logger.Printf("failed to start listener, hooker disabled: %v", err)
  111. return
  112. }
  113. defer listener.Close()
  114. for {
  115. conn, err := listener.Accept()
  116. if err != nil {
  117. exc.logger.Printf("failed to accept new connection: %v", err)
  118. return
  119. }
  120. inbox, outbox := exc.createClient()
  121. stop := make(chan struct{})
  122. errors := make(chan error, 2)
  123. go exc.clientWriter(inbox, WriteMessage, conn, errors, stop)
  124. go exc.clientReader(outbox, ReadMessage, conn, errors, stop)
  125. go exc.stopOnError(stop, errors)
  126. }
  127. }
  128. func (exc *Executor) ListenAndServeJSON(addr string) {
  129. defer stopPanic(exc, "listener", func(_ error) { exc.ListenAndServeJSON(addr) })
  130. listener, err := net.Listen("tcp", addr)
  131. if err != nil {
  132. exc.logger.Printf("failed to start listener, hooker disabled: %v", err)
  133. return
  134. }
  135. defer listener.Close()
  136. for {
  137. conn, err := listener.Accept()
  138. if err != nil {
  139. exc.logger.Printf("failed to accept new connection: %v", err)
  140. return
  141. }
  142. inbox, outbox := exc.createClient()
  143. stop := make(chan struct{})
  144. errors := make(chan error, 2)
  145. go exc.clientWriter(inbox, WriteJSONMessage, conn, errors, stop)
  146. go exc.clientReader(outbox, ReadJSONMessage, conn, errors, stop)
  147. go exc.stopOnError(stop, errors)
  148. }
  149. }
  150. func (exc *Executor) clientWriter(inbox chan *Message, write MessageWriter, conn net.Conn, errors chan error, stop chan struct{}) {
  151. defer stopPanic(exc, "clientWriter",
  152. func(err error) {
  153. exc.logger.Printf("catched panic in writer: %v", err)
  154. errors <- err
  155. })
  156. defer conn.Close()
  157. heartbeatTicker := time.NewTicker(DefaultHeartbeatTrigger)
  158. defer heartbeatTicker.Stop()
  159. for {
  160. select {
  161. case msg, ok := <-inbox:
  162. if !ok {
  163. close(stop)
  164. return
  165. }
  166. err := write(conn, DefaultHeartbeatTimeout, msg)
  167. if err != nil {
  168. exc.logger.Printf("failed to write message: %v", err)
  169. errors <- err
  170. return
  171. }
  172. case <-heartbeatTicker.C:
  173. ping := &Message{&IncomingEvent{"ping", nil}, -1}
  174. err := write(conn, DefaultHeartbeatTimeout, ping)
  175. if err != nil {
  176. exc.logger.Printf("failed to write ping message: %v", err)
  177. errors <- err
  178. return
  179. }
  180. case <-stop:
  181. return
  182. }
  183. }
  184. }
  185. func (exc *Executor) clientReader(outbox chan *Message, read MessageReader, conn net.Conn, errors chan error, stop chan struct{}) {
  186. defer stopPanic(exc, "clientReader",
  187. func(err error) {
  188. exc.logger.Printf("catched panic in reader: %v", err)
  189. errors <- err
  190. })
  191. defer conn.Close()
  192. for {
  193. msg, err := read(conn, DefaultHeartbeatTimeout)
  194. if err != nil {
  195. exc.logger.Printf("failed to read message: %v", err)
  196. errors <- err
  197. return
  198. }
  199. if msg.Type == "pong" {
  200. // ignore pongs, they are for resetting timeouts
  201. continue
  202. }
  203. select {
  204. case outbox <- msg:
  205. case <-stop:
  206. return
  207. }
  208. }
  209. }
  210. func (exc *Executor) stopOnError(stop chan struct{}, errors chan error) {
  211. defer stopPanic(exc, "stopper", nil)
  212. <-errors
  213. close(stop)
  214. }
  215. func ReadMessage(conn net.Conn, timeout time.Duration) (*Message, error) {
  216. conn.SetReadDeadline(time.Now().Add(DefaultHeartbeatTimeout))
  217. var lengthBuf [2]byte
  218. _, err := conn.Read(lengthBuf[:])
  219. if err != nil {
  220. return nil, err
  221. }
  222. length := int(binary.BigEndian.Uint16(lengthBuf[:]))
  223. if length > DefaultMessageLengthCap {
  224. return nil, errors.New("message is too long")
  225. }
  226. buf := bytes.NewBuffer(make([]byte, 0, length))
  227. _, err = io.CopyN(buf, conn, int64(length))
  228. if err != nil {
  229. return nil, err
  230. }
  231. var handle = &codec.MsgpackHandle{}
  232. var decoder = codec.NewDecoder(buf, handle)
  233. var result = &Message{}
  234. err = decoder.Decode(result)
  235. if err != nil {
  236. return nil, err
  237. }
  238. return result, nil
  239. }
  240. func ReadJSONMessage(conn net.Conn, timeout time.Duration) (*Message, error) {
  241. conn.SetReadDeadline(time.Now().Add(DefaultHeartbeatTimeout))
  242. var handle = &codec.JsonHandle{}
  243. var decoder = codec.NewDecoder(conn, handle)
  244. var result = &Message{}
  245. err := decoder.Decode(result)
  246. if err != nil {
  247. return nil, err
  248. }
  249. return result, nil
  250. }
  251. func WriteMessage(conn net.Conn, timeout time.Duration, msg *Message) error {
  252. var handle = &codec.MsgpackHandle{}
  253. var buf []byte
  254. var encoder = codec.NewEncoderBytes(&buf, handle)
  255. err := encoder.Encode(msg)
  256. if err != nil {
  257. return err
  258. }
  259. length := len(buf)
  260. if length > DefaultMessageLengthCap {
  261. return errors.New("message is too long")
  262. }
  263. var lengthBuf [2]byte
  264. binary.BigEndian.PutUint16(lengthBuf[:], uint16(length))
  265. conn.SetWriteDeadline(time.Now().Add(timeout))
  266. _, err = conn.Write(lengthBuf[:])
  267. if err != nil {
  268. return err
  269. }
  270. _, err = io.Copy(conn, bytes.NewBuffer(buf))
  271. return err
  272. }
  273. func WriteJSONMessage(conn net.Conn, timeout time.Duration, msg *Message) error {
  274. var handle = &codec.JsonHandle{}
  275. var buf []byte
  276. var encoder = codec.NewEncoderBytes(&buf, handle)
  277. err := encoder.Encode(msg)
  278. if err != nil {
  279. return err
  280. }
  281. conn.SetWriteDeadline(time.Now().Add(timeout))
  282. _, err = io.Copy(conn, bytes.NewBuffer(buf))
  283. if err != nil {
  284. return err
  285. }
  286. _, err = conn.Write([]byte{'\n'})
  287. return err
  288. }
  289. func (exc *Executor) createClient() (inbox, outbox chan *Message) {
  290. reply := make(chan clientReply, 1)
  291. exc.clientRequests <- reply
  292. r := <-reply
  293. return r.info.inbox, r.outbox
  294. }
  295. func (exc *Executor) processEvents() {
  296. defer stopPanic(exc, "processEvents", func(_ error) { exc.processEvents() })
  297. for {
  298. select {
  299. case msg := <-exc.inbox:
  300. message := &Message{msg, exc.counter}
  301. exc.sendMessage(message)
  302. exc.counter++
  303. case cmd := <-exc.cmdInbox:
  304. // TODO(mechmind): handle cmds
  305. exc.logger.Printf("ignoring cmd: '%s'", cmd)
  306. case req := <-exc.clientRequests:
  307. outbox := exc.outbox
  308. info := &clientInfo{
  309. inbox: make(chan *Message, DefaultClientBufferSize),
  310. stop: make(chan struct{}),
  311. }
  312. exc.clients = append(exc.clients, info)
  313. req <- clientReply{outbox, info}
  314. }
  315. }
  316. }
  317. func (exc *Executor) sendMessage(msg *Message) {
  318. deadClientIDs := []int{}
  319. for idx, ch := range exc.clients {
  320. select {
  321. case ch.inbox <- msg:
  322. default:
  323. deadClientIDs = append(deadClientIDs, idx)
  324. }
  325. }
  326. if len(deadClientIDs) == 0 {
  327. return
  328. }
  329. aliveClients := make([]*clientInfo, 0, len(exc.clients)-len(deadClientIDs))
  330. currentID := 0
  331. for idx, client := range exc.clients {
  332. if currentID < len(deadClientIDs) && idx == deadClientIDs[currentID] {
  333. // client is dead, drop him
  334. close(client.inbox)
  335. currentID++
  336. } else {
  337. // client alive, take him
  338. aliveClients = append(aliveClients, client)
  339. }
  340. }
  341. exc.clients = aliveClients
  342. }
  343. func (exc *Executor) sendToBot() {
  344. defer stopPanic(exc, "sendToBot", func(_ error) { go exc.sendToBot() })
  345. for {
  346. select {
  347. case msg := <-exc.outbox:
  348. exc.SendMessageToBot(msg)
  349. time.Sleep(DefaultDelayForMessages)
  350. case <-exc.stop:
  351. return
  352. }
  353. }
  354. }
  355. func (exc *Executor) SendMessageToBot(msg *Message) {
  356. m := entity.MSG(entity.GROUPCHAT)
  357. m.To = "golang@conference.jabber.ru"
  358. m.Body = msg.IncomingEvent.Data["body"]
  359. err := exc.xmppStream.Write(entity.ProduceStatic(m))
  360. if err != nil {
  361. exc.logger.Printf("failed to write message to xmpp stream: %v", err)
  362. }
  363. }