executor.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package luaexecutor
  2. import (
  3. "fmt"
  4. "github.com/kpmy/go-lua"
  5. "github.com/kpmy/xippo/c2s/stream"
  6. "github.com/kpmy/xippo/entity"
  7. "sync"
  8. "time"
  9. )
  10. const sleepDuration time.Duration = 1 * time.Second
  11. const callbacksLocation string = "chatclbks"
  12. // An utility struct for incoming messages.
  13. type IncomingMessage struct {
  14. Sender string
  15. Body string
  16. }
  17. // Executor executes Lua scripts in a shared Lua VM.
  18. type Executor struct {
  19. incomingScripts chan string
  20. outgoingMsgs chan string
  21. incomingMsgs chan IncomingMessage
  22. stateMutex sync.Mutex
  23. state *lua.State
  24. xmppStream stream.Stream
  25. }
  26. func NewExecutor(s stream.Stream) *Executor {
  27. e := &Executor{
  28. incomingScripts: make(chan string),
  29. outgoingMsgs: make(chan string),
  30. incomingMsgs: make(chan IncomingMessage),
  31. }
  32. e.xmppStream = s
  33. e.state = lua.NewState()
  34. lua.OpenLibraries(e.state)
  35. send := func(l *lua.State) int {
  36. str, _ := l.ToString(1)
  37. e.outgoingMsgs <- str
  38. return 0
  39. }
  40. registerClbk := func(l *lua.State) int {
  41. // get callbacks table
  42. l.PushString(callbacksLocation)
  43. l.Table(lua.RegistryIndex)
  44. // set callback
  45. l.PushValue(1)
  46. l.PushValue(2)
  47. l.SetTable(-3)
  48. l.SetTop(0)
  49. return 0
  50. }
  51. listClbks := func(l *lua.State) int {
  52. clbkNames := []string{}
  53. // get callbacks table
  54. l.PushString(callbacksLocation)
  55. l.Table(lua.RegistryIndex)
  56. // loop
  57. l.PushNil()
  58. for l.Next(-2) {
  59. key, ok := l.ToString(-2)
  60. // ignore non-string shit
  61. if ok {
  62. clbkNames = append(clbkNames, key)
  63. }
  64. l.Pop(1)
  65. }
  66. l.SetTop(0)
  67. l.NewTable()
  68. // build a list from callback names
  69. for i, key := range clbkNames {
  70. l.PushInteger(i + 1)
  71. l.PushString(key)
  72. l.SetTable(-3)
  73. }
  74. return 1
  75. }
  76. var chatLibrary = []lua.RegistryFunction{
  77. lua.RegistryFunction{"send", send},
  78. lua.RegistryFunction{"onmessage", registerClbk},
  79. lua.RegistryFunction{"listmsghandlers", listClbks},
  80. }
  81. lua.NewLibrary(e.state, chatLibrary)
  82. e.state.SetGlobal("chat")
  83. // set up callbacks table
  84. e.state.PushString(callbacksLocation)
  85. e.state.NewTable()
  86. e.state.SetTable(lua.RegistryIndex)
  87. return e
  88. }
  89. func (e *Executor) execute() {
  90. for script := range e.incomingScripts {
  91. e.stateMutex.Lock()
  92. err := lua.DoString(e.state, script)
  93. if err != nil {
  94. fmt.Printf("lua fucking shit error: %s\n", err)
  95. m := entity.MSG(entity.GROUPCHAT)
  96. m.To = "golang@conference.jabber.ru"
  97. m.Body = err.Error()
  98. e.xmppStream.Write(entity.ProduceStatic(m))
  99. }
  100. e.stateMutex.Unlock()
  101. }
  102. }
  103. func (e *Executor) sendingRoutine() {
  104. for msg := range e.outgoingMsgs {
  105. m := entity.MSG(entity.GROUPCHAT)
  106. m.To = "golang@conference.jabber.ru"
  107. m.Body = msg
  108. err := e.xmppStream.Write(entity.ProduceStatic(m))
  109. if err != nil {
  110. fmt.Printf("send error: %s", err)
  111. }
  112. time.Sleep(sleepDuration)
  113. }
  114. }
  115. func (e *Executor) processIncomingMsgs() {
  116. for msg := range e.incomingMsgs {
  117. e.stateMutex.Lock()
  118. // get callbacks table
  119. e.state.PushString(callbacksLocation)
  120. e.state.Table(lua.RegistryIndex)
  121. // loop over callbacks
  122. e.state.PushNil()
  123. for e.state.Next(-2) {
  124. if e.state.IsFunction(-1) {
  125. e.state.PushString(msg.Sender)
  126. e.state.PushString(msg.Body)
  127. err := e.state.ProtectedCall(2, 0, 0)
  128. if err != nil {
  129. m := entity.MSG(entity.GROUPCHAT)
  130. m.To = "golang@conference.jabber.ru"
  131. m.Body, _ = e.state.ToString(-1)
  132. e.xmppStream.Write(entity.ProduceStatic(m))
  133. e.state.Pop(1)
  134. }
  135. } else {
  136. e.state.Pop(1)
  137. }
  138. }
  139. // pop callbacks table
  140. e.state.Pop(1)
  141. e.stateMutex.Unlock()
  142. }
  143. }
  144. func (e *Executor) Start() {
  145. go e.sendingRoutine()
  146. go e.execute()
  147. go e.processIncomingMsgs()
  148. }
  149. func (e *Executor) Stop() {
  150. close(e.incomingScripts)
  151. close(e.incomingMsgs)
  152. close(e.outgoingMsgs)
  153. }
  154. func (e *Executor) Run(script string) {
  155. e.incomingScripts <- script
  156. }
  157. // Call this on every incoming message - it's required for
  158. // chat.onmessage to work.
  159. func (e *Executor) NewMessage(msg IncomingMessage) {
  160. e.incomingMsgs <- msg
  161. }