executor.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package luaexecutor
  2. import (
  3. "fmt"
  4. "github.com/kpmy/go-lua"
  5. "github.com/kpmy/xippo/c2s/stream"
  6. "github.com/kpmy/xippo/entity"
  7. "path/filepath"
  8. "sync"
  9. "time"
  10. )
  11. const sleepDuration time.Duration = 1 * time.Second
  12. const callbacksLocation string = "chatclbks"
  13. // An utility struct for incoming messages.
  14. type IncomingMessage struct {
  15. Sender string
  16. Body string
  17. }
  18. // Executor executes Lua scripts in a shared Lua VM.
  19. type Executor struct {
  20. incomingScripts chan string
  21. outgoingMsgs chan string
  22. incomingMsgs chan IncomingMessage
  23. stateMutex sync.Mutex
  24. state *lua.State
  25. xmppStream stream.Stream
  26. }
  27. func NewExecutor(s stream.Stream) *Executor {
  28. e := &Executor{
  29. incomingScripts: make(chan string),
  30. outgoingMsgs: make(chan string),
  31. incomingMsgs: make(chan IncomingMessage),
  32. }
  33. e.xmppStream = s
  34. e.state = lua.NewState()
  35. lua.OpenLibraries(e.state)
  36. send := func(l *lua.State) int {
  37. str, _ := l.ToString(1)
  38. e.outgoingMsgs <- str
  39. return 0
  40. }
  41. registerClbk := func(l *lua.State) int {
  42. // get callbacks table
  43. l.PushString(callbacksLocation)
  44. l.Table(lua.RegistryIndex)
  45. // set callback
  46. l.PushValue(1)
  47. l.PushValue(2)
  48. l.SetTable(-3)
  49. l.SetTop(0)
  50. return 0
  51. }
  52. listClbks := func(l *lua.State) int {
  53. clbkNames := []string{}
  54. // get callbacks table
  55. l.PushString(callbacksLocation)
  56. l.Table(lua.RegistryIndex)
  57. // loop
  58. l.PushNil()
  59. for l.Next(-2) {
  60. key, ok := l.ToString(-2)
  61. // ignore non-string shit
  62. if ok {
  63. clbkNames = append(clbkNames, key)
  64. }
  65. l.Pop(1)
  66. }
  67. l.SetTop(0)
  68. l.NewTable()
  69. // build a list from callback names
  70. for i, key := range clbkNames {
  71. l.PushInteger(i + 1)
  72. l.PushString(key)
  73. l.SetTable(-3)
  74. }
  75. return 1
  76. }
  77. var chatLibrary = []lua.RegistryFunction{
  78. lua.RegistryFunction{"send", send},
  79. lua.RegistryFunction{"onmessage", registerClbk},
  80. lua.RegistryFunction{"listmsghandlers", listClbks},
  81. }
  82. lua.NewLibrary(e.state, chatLibrary)
  83. e.state.SetGlobal("chat")
  84. // set up callbacks table
  85. e.state.PushString(callbacksLocation)
  86. e.state.NewTable()
  87. e.state.SetTable(lua.RegistryIndex)
  88. lua.DoFile(e.state, filepath.Join("static", "bootstrap.lua"))
  89. return e
  90. }
  91. func (e *Executor) execute() {
  92. for script := range e.incomingScripts {
  93. e.stateMutex.Lock()
  94. err := lua.DoString(e.state, script)
  95. if err != nil {
  96. fmt.Printf("lua fucking shit error: %s\n", err)
  97. m := entity.MSG(entity.GROUPCHAT)
  98. m.To = "golang@conference.jabber.ru"
  99. m.Body = err.Error()
  100. e.xmppStream.Write(entity.ProduceStatic(m))
  101. }
  102. e.stateMutex.Unlock()
  103. }
  104. }
  105. func (e *Executor) sendingRoutine() {
  106. for msg := range e.outgoingMsgs {
  107. m := entity.MSG(entity.GROUPCHAT)
  108. m.To = "golang@conference.jabber.ru"
  109. m.Body = msg
  110. err := e.xmppStream.Write(entity.ProduceStatic(m))
  111. if err != nil {
  112. fmt.Printf("send error: %s", err)
  113. }
  114. time.Sleep(sleepDuration)
  115. }
  116. }
  117. func (e *Executor) processIncomingMsgs() {
  118. for msg := range e.incomingMsgs {
  119. e.stateMutex.Lock()
  120. // get callbacks table
  121. e.state.PushString(callbacksLocation)
  122. e.state.Table(lua.RegistryIndex)
  123. // loop over callbacks
  124. e.state.PushNil()
  125. for e.state.Next(-2) {
  126. if e.state.IsFunction(-1) {
  127. e.state.PushString(msg.Sender)
  128. e.state.PushString(msg.Body)
  129. err := e.state.ProtectedCall(2, 0, 0)
  130. if err != nil {
  131. m := entity.MSG(entity.GROUPCHAT)
  132. m.To = "golang@conference.jabber.ru"
  133. m.Body, _ = e.state.ToString(-1)
  134. e.xmppStream.Write(entity.ProduceStatic(m))
  135. e.state.Pop(1)
  136. }
  137. } else {
  138. e.state.Pop(1)
  139. }
  140. }
  141. // pop callbacks table
  142. e.state.Pop(1)
  143. e.stateMutex.Unlock()
  144. }
  145. }
  146. func (e *Executor) Start() {
  147. go e.sendingRoutine()
  148. go e.execute()
  149. go e.processIncomingMsgs()
  150. }
  151. func (e *Executor) Stop() {
  152. close(e.incomingScripts)
  153. close(e.incomingMsgs)
  154. close(e.outgoingMsgs)
  155. }
  156. func (e *Executor) Run(script string) {
  157. e.incomingScripts <- script
  158. }
  159. // Call this on every incoming message - it's required for
  160. // chat.onmessage to work.
  161. func (e *Executor) NewMessage(msg IncomingMessage) {
  162. e.incomingMsgs <- msg
  163. }