executor.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package luaexecutor
  2. import (
  3. "fmt"
  4. "github.com/Shopify/go-lua"
  5. "github.com/kpmy/xippo/c2s/stream"
  6. "github.com/kpmy/xippo/entity"
  7. "path/filepath"
  8. "sync"
  9. "time"
  10. )
  11. const sleepDuration time.Duration = 1 * time.Second
  12. const callbacksLocation string = "clbks"
  13. // An utility struct for incoming events.
  14. type IncomingEvent struct {
  15. Type string
  16. Data map[string]string
  17. }
  18. // Executor executes Lua scripts in a shared Lua VM.
  19. type Executor struct {
  20. incomingScripts chan string
  21. outgoingMsgs chan string
  22. incomingEvents chan IncomingEvent
  23. stateMutex sync.Mutex
  24. state *lua.State
  25. xmppStream stream.Stream
  26. }
  27. func NewExecutor(s stream.Stream) *Executor {
  28. e := &Executor{
  29. incomingScripts: make(chan string),
  30. outgoingMsgs: make(chan string),
  31. incomingEvents: make(chan IncomingEvent),
  32. }
  33. e.xmppStream = s
  34. e.state = lua.NewState()
  35. lua.OpenLibraries(e.state)
  36. send := func(l *lua.State) int {
  37. str, _ := l.ToString(1)
  38. e.outgoingMsgs <- str
  39. return 0
  40. }
  41. registerClbk := func(l *lua.State) int {
  42. // get events table
  43. l.PushString(callbacksLocation)
  44. l.Table(lua.RegistryIndex)
  45. // get callbacks table
  46. evtName := lua.CheckString(l, 1)
  47. l.PushString(evtName)
  48. l.Table(-2)
  49. // create new table if one doesn't exist
  50. if l.IsNil(-1) {
  51. l.Pop(1)
  52. // create and add table to the events table
  53. l.PushString(evtName)
  54. l.NewTable()
  55. l.SetTable(-3)
  56. // and get it back
  57. l.PushString(evtName)
  58. l.Table(-2)
  59. }
  60. // set callback
  61. l.PushValue(2)
  62. l.PushValue(3)
  63. l.SetTable(-3)
  64. l.SetTop(0)
  65. return 0
  66. }
  67. listClbks := func(l *lua.State) int {
  68. clbkNames := []string{}
  69. // get events table
  70. l.PushString(callbacksLocation)
  71. l.Table(lua.RegistryIndex)
  72. // get callbacks for the event
  73. evtName := lua.CheckString(l, 1)
  74. l.PushString(evtName)
  75. l.Table(-2)
  76. if !l.IsNil(-1) {
  77. // loop
  78. l.PushNil()
  79. for l.Next(-2) {
  80. key, ok := l.ToString(-2)
  81. // ignore non-string shit
  82. if ok {
  83. clbkNames = append(clbkNames, key)
  84. }
  85. l.Pop(1)
  86. }
  87. l.SetTop(0)
  88. l.NewTable()
  89. // build a list from callback names
  90. for i, key := range clbkNames {
  91. l.PushInteger(i + 1)
  92. l.PushString(key)
  93. l.SetTable(-3)
  94. }
  95. } else {
  96. l.SetTop(0)
  97. l.PushNil()
  98. }
  99. return 1
  100. }
  101. var chatLibrary = []lua.RegistryFunction{
  102. lua.RegistryFunction{"send", send},
  103. lua.RegistryFunction{"addEventHandler", registerClbk},
  104. lua.RegistryFunction{"listEventHandlers", listClbks},
  105. }
  106. lua.NewLibrary(e.state, chatLibrary)
  107. e.state.SetGlobal("chat")
  108. // set up callbacks table
  109. e.state.PushString(callbacksLocation)
  110. e.state.NewTable()
  111. e.state.SetTable(lua.RegistryIndex)
  112. lua.DoFile(e.state, filepath.Join("static", "bootstrap.lua"))
  113. return e
  114. }
  115. func (e *Executor) execute() {
  116. for script := range e.incomingScripts {
  117. e.stateMutex.Lock()
  118. err := lua.DoString(e.state, script)
  119. if err != nil {
  120. fmt.Printf("lua fucking shit error: %s\n", err)
  121. m := entity.MSG(entity.GROUPCHAT)
  122. m.To = "golang@conference.jabber.ru"
  123. m.Body = err.Error()
  124. e.xmppStream.Write(entity.ProduceStatic(m))
  125. }
  126. e.stateMutex.Unlock()
  127. }
  128. }
  129. func (e *Executor) sendingRoutine() {
  130. for msg := range e.outgoingMsgs {
  131. m := entity.MSG(entity.GROUPCHAT)
  132. m.To = "golang@conference.jabber.ru"
  133. m.Body = msg
  134. err := e.xmppStream.Write(entity.ProduceStatic(m))
  135. if err != nil {
  136. fmt.Printf("send error: %s", err)
  137. }
  138. time.Sleep(sleepDuration)
  139. }
  140. }
  141. func (e *Executor) processIncomingEvents() {
  142. for evt := range e.incomingEvents {
  143. e.stateMutex.Lock()
  144. // get events table
  145. e.state.PushString(callbacksLocation)
  146. e.state.Table(lua.RegistryIndex)
  147. // get callbacks table for the specific event
  148. e.state.PushString(evt.Type)
  149. e.state.Table(-2)
  150. // loop over callbacks
  151. if !e.state.IsNil(-1) {
  152. e.state.PushNil()
  153. for e.state.Next(-2) {
  154. if e.state.IsFunction(-1) {
  155. // create the table which will be passed to the handler
  156. e.state.NewTable()
  157. // loop over the event data, populating the table
  158. for k, v := range evt.Data {
  159. e.state.PushString(k)
  160. e.state.PushString(v)
  161. e.state.SetTable(-3)
  162. }
  163. err := e.state.ProtectedCall(1, 0, 0)
  164. if err != nil {
  165. m := entity.MSG(entity.GROUPCHAT)
  166. m.To = "golang@conference.jabber.ru"
  167. m.Body, _ = e.state.ToString(-1)
  168. e.xmppStream.Write(entity.ProduceStatic(m))
  169. e.state.Pop(1)
  170. }
  171. } else {
  172. e.state.Pop(1)
  173. }
  174. }
  175. }
  176. // pop callbacks table or nil value
  177. e.state.Pop(1)
  178. e.stateMutex.Unlock()
  179. }
  180. }
  181. func (e *Executor) Start() {
  182. go e.sendingRoutine()
  183. go e.execute()
  184. go e.processIncomingEvents()
  185. }
  186. func (e *Executor) Stop() {
  187. close(e.incomingScripts)
  188. close(e.incomingEvents)
  189. close(e.outgoingMsgs)
  190. }
  191. func (e *Executor) Run(script string) {
  192. e.incomingScripts <- script
  193. }
  194. // Call this on every event - it's required for event handlers to work
  195. func (e *Executor) NewEvent(evt IncomingEvent) {
  196. e.incomingEvents <- evt
  197. }