123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
- package hookexecutor
- import (
- "bytes"
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- "log"
- "net"
- "os"
- "time"
- "github.com/kpmy/xippo/c2s/stream"
- "github.com/kpmy/xippo/entity"
- "github.com/ugorji/go/codec"
- )
- const (
- DefaultAddr = "0.0.0.0:1984"
- DefaultJSONAddr = "0.0.0.0:1985"
- DefaultInboxBufferSize = 4
- DefaultOutboxBufferSize = 4
- DefaultClientBufferSize = 8
- DefaultHeartbeatTrigger = 5 * time.Second
- DefaultHeartbeatTimeout = 10 * time.Second
- DefaultMessageLengthCap = 4 * 1024
- DefaultDelayForMessages = 1 * time.Second
- )
- type IncomingEvent struct {
- Type string
- Data map[string]string
- }
- type Message struct {
- *IncomingEvent
- ID int
- }
- type MessageWriter func(conn net.Conn, timeout time.Duration, msg *Message) error
- type MessageReader func(conn net.Conn, timeout time.Duration) (*Message, error)
- type clientReply struct {
- outbox chan *Message
- info *clientInfo
- }
- type clientInfo struct {
- inbox chan *Message
- stop chan struct{}
- }
- type Executor struct {
- listener net.Listener
- jsonListener net.Listener
- xmppStream stream.Stream
- logger *log.Logger
- inbox chan *IncomingEvent
- outbox chan *Message
- cmdInbox chan string
- clientRequests chan chan clientReply
- stop chan struct{}
- clients []*clientInfo
- counter int
- }
- func NewExecutor(s stream.Stream) *Executor {
- return &Executor{
- nil,
- nil,
- s,
- log.New(os.Stderr, "[hookexecutor] ", log.LstdFlags),
- make(chan *IncomingEvent, DefaultInboxBufferSize),
- make(chan *Message, DefaultOutboxBufferSize),
- make(chan string, DefaultInboxBufferSize),
- make(chan chan clientReply, DefaultInboxBufferSize),
- make(chan struct{}),
- nil,
- 0,
- }
- }
- func (exc *Executor) Start() {
- go exc.ListenAndServe(DefaultAddr)
- go exc.ListenAndServeJSON(DefaultJSONAddr)
- go exc.processEvents()
- go exc.sendToBot()
- }
- func (exc *Executor) Stop() {
- close(exc.inbox)
- close(exc.cmdInbox)
- close(exc.stop)
- }
- func (exc *Executor) Run(cmd string) {
- exc.cmdInbox <- cmd
- }
- func (exc *Executor) NewEvent(e IncomingEvent) {
- select {
- case exc.inbox <- &e:
- default:
- log.Println("executor is blocked")
- }
- }
- func stopPanic(exc *Executor, where string, callback func(err error)) {
- if err := recover(); err != nil {
- exc.logger.Printf("catched panic in %s: %s", where, err)
- if callback != nil {
- if realErr, ok := err.(error); ok {
- go callback(realErr)
- } else {
- go callback(fmt.Errorf("%v", err))
- }
- }
- }
- }
- func (exc *Executor) ListenAndServe(addr string) {
- defer stopPanic(exc, "listener", func(_ error) { exc.ListenAndServe(addr) })
- listener, err := net.Listen("tcp", addr)
- if err != nil {
- exc.logger.Printf("failed to start listener, hooker disabled: %v", err)
- return
- }
- defer listener.Close()
- for {
- conn, err := listener.Accept()
- if err != nil {
- exc.logger.Printf("failed to accept new connection: %v", err)
- return
- }
- inbox, outbox := exc.createClient()
- stop := make(chan struct{})
- errors := make(chan error, 2)
- go exc.clientWriter(inbox, WriteMessage, conn, errors, stop)
- go exc.clientReader(outbox, ReadMessage, conn, errors, stop)
- go exc.stopOnError(stop, errors)
- }
- }
- func (exc *Executor) ListenAndServeJSON(addr string) {
- defer stopPanic(exc, "listener", func(_ error) { exc.ListenAndServeJSON(addr) })
- listener, err := net.Listen("tcp", addr)
- if err != nil {
- exc.logger.Printf("failed to start listener, hooker disabled: %v", err)
- return
- }
- defer listener.Close()
- for {
- conn, err := listener.Accept()
- if err != nil {
- exc.logger.Printf("failed to accept new connection: %v", err)
- return
- }
- inbox, outbox := exc.createClient()
- stop := make(chan struct{})
- errors := make(chan error, 2)
- go exc.clientWriter(inbox, WriteJSONMessage, conn, errors, stop)
- go exc.clientReader(outbox, ReadJSONMessage, conn, errors, stop)
- go exc.stopOnError(stop, errors)
- }
- }
- func (exc *Executor) clientWriter(inbox chan *Message, write MessageWriter, conn net.Conn, errors chan error, stop chan struct{}) {
- defer stopPanic(exc, "clientWriter",
- func(err error) {
- exc.logger.Printf("catched panic in writer: %v", err)
- errors <- err
- })
- defer conn.Close()
- heartbeatTicker := time.NewTicker(DefaultHeartbeatTrigger)
- defer heartbeatTicker.Stop()
- for {
- select {
- case msg, ok := <-inbox:
- if !ok {
- close(stop)
- return
- }
- err := write(conn, DefaultHeartbeatTimeout, msg)
- if err != nil {
- exc.logger.Printf("failed to write message: %v", err)
- errors <- err
- return
- }
- case <-heartbeatTicker.C:
- ping := &Message{&IncomingEvent{"ping", nil}, -1}
- err := write(conn, DefaultHeartbeatTimeout, ping)
- if err != nil {
- exc.logger.Printf("failed to write ping message: %v", err)
- errors <- err
- return
- }
- case <-stop:
- return
- }
- }
- }
- func (exc *Executor) clientReader(outbox chan *Message, read MessageReader, conn net.Conn, errors chan error, stop chan struct{}) {
- defer stopPanic(exc, "clientReader",
- func(err error) {
- exc.logger.Printf("catched panic in reader: %v", err)
- errors <- err
- })
- defer conn.Close()
- for {
- msg, err := read(conn, DefaultHeartbeatTimeout)
- if err != nil {
- exc.logger.Printf("failed to read message: %v", err)
- errors <- err
- return
- }
- if msg.Type == "pong" {
- // ignore pongs, they are for resetting timeouts
- continue
- }
- select {
- case outbox <- msg:
- case <-stop:
- return
- }
- }
- }
- func (exc *Executor) stopOnError(stop chan struct{}, errors chan error) {
- defer stopPanic(exc, "stopper", nil)
- <-errors
- close(stop)
- }
- func ReadMessage(conn net.Conn, timeout time.Duration) (*Message, error) {
- conn.SetReadDeadline(time.Now().Add(DefaultHeartbeatTimeout))
- var lengthBuf [2]byte
- _, err := conn.Read(lengthBuf[:])
- if err != nil {
- return nil, err
- }
- length := int(binary.BigEndian.Uint16(lengthBuf[:]))
- if length > DefaultMessageLengthCap {
- return nil, errors.New("message is too long")
- }
- buf := bytes.NewBuffer(make([]byte, 0, length))
- _, err = io.CopyN(buf, conn, int64(length))
- if err != nil {
- return nil, err
- }
- var handle = &codec.MsgpackHandle{}
- var decoder = codec.NewDecoder(buf, handle)
- var result = &Message{}
- err = decoder.Decode(result)
- if err != nil {
- return nil, err
- }
- return result, nil
- }
- func ReadJSONMessage(conn net.Conn, timeout time.Duration) (*Message, error) {
- conn.SetReadDeadline(time.Now().Add(DefaultHeartbeatTimeout))
- var handle = &codec.JsonHandle{}
- var decoder = codec.NewDecoder(conn, handle)
- var result = &Message{}
- err := decoder.Decode(result)
- if err != nil {
- return nil, err
- }
- return result, nil
- }
- func WriteMessage(conn net.Conn, timeout time.Duration, msg *Message) error {
- var handle = &codec.MsgpackHandle{}
- var buf []byte
- var encoder = codec.NewEncoderBytes(&buf, handle)
- err := encoder.Encode(msg)
- if err != nil {
- return err
- }
- length := len(buf)
- if length > DefaultMessageLengthCap {
- return errors.New("message is too long")
- }
- var lengthBuf [2]byte
- binary.BigEndian.PutUint16(lengthBuf[:], uint16(length))
- conn.SetWriteDeadline(time.Now().Add(timeout))
- _, err = conn.Write(lengthBuf[:])
- if err != nil {
- return err
- }
- _, err = io.Copy(conn, bytes.NewBuffer(buf))
- return err
- }
- func WriteJSONMessage(conn net.Conn, timeout time.Duration, msg *Message) error {
- var handle = &codec.JsonHandle{}
- var buf []byte
- var encoder = codec.NewEncoderBytes(&buf, handle)
- err := encoder.Encode(msg)
- if err != nil {
- return err
- }
- conn.SetWriteDeadline(time.Now().Add(timeout))
- _, err = io.Copy(conn, bytes.NewBuffer(buf))
- if err != nil {
- return err
- }
- _, err = conn.Write([]byte{'\n'})
- return err
- }
- func (exc *Executor) createClient() (inbox, outbox chan *Message) {
- reply := make(chan clientReply, 1)
- exc.clientRequests <- reply
- r := <-reply
- return r.info.inbox, r.outbox
- }
- func (exc *Executor) processEvents() {
- defer stopPanic(exc, "processEvents", func(_ error) { exc.processEvents() })
- for {
- select {
- case msg := <-exc.inbox:
- message := &Message{msg, exc.counter}
- exc.sendMessage(message)
- exc.counter++
- case cmd := <-exc.cmdInbox:
- // TODO(mechmind): handle cmds
- exc.logger.Printf("ignoring cmd: '%s'", cmd)
- case req := <-exc.clientRequests:
- outbox := exc.outbox
- info := &clientInfo{
- inbox: make(chan *Message, DefaultClientBufferSize),
- stop: make(chan struct{}),
- }
- exc.clients = append(exc.clients, info)
- req <- clientReply{outbox, info}
- }
- }
- }
- func (exc *Executor) sendMessage(msg *Message) {
- deadClientIDs := []int{}
- for idx, ch := range exc.clients {
- select {
- case ch.inbox <- msg:
- default:
- deadClientIDs = append(deadClientIDs, idx)
- }
- }
- if len(deadClientIDs) == 0 {
- return
- }
- aliveClients := make([]*clientInfo, 0, len(exc.clients)-len(deadClientIDs))
- currentID := 0
- for idx, client := range exc.clients {
- if currentID < len(deadClientIDs) && idx == deadClientIDs[currentID] {
- // client is dead, drop him
- close(client.inbox)
- currentID++
- } else {
- // client alive, take him
- aliveClients = append(aliveClients, client)
- }
- }
- exc.clients = aliveClients
- }
- func (exc *Executor) sendToBot() {
- defer stopPanic(exc, "sendToBot", func(_ error) { go exc.sendToBot() })
- for {
- select {
- case msg := <-exc.outbox:
- exc.SendMessageToBot(msg)
- time.Sleep(DefaultDelayForMessages)
- case <-exc.stop:
- return
- }
- }
- }
- func (exc *Executor) SendMessageToBot(msg *Message) {
- m := entity.MSG(entity.GROUPCHAT)
- m.To = "golang@conference.jabber.ru"
- m.Body = msg.IncomingEvent.Data["body"]
- err := exc.xmppStream.Write(entity.ProduceStatic(m))
- if err != nil {
- exc.logger.Printf("failed to write message to xmpp stream: %v", err)
- }
- }
|