executor.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package luaexecutor
  2. import (
  3. "fmt"
  4. "github.com/kpmy/go-lua"
  5. "github.com/kpmy/xippo/c2s/stream"
  6. "github.com/kpmy/xippo/entity"
  7. "sync"
  8. "time"
  9. )
  10. const sleepDuration time.Duration = 1 * time.Second
  11. const callbackLocation string = "chatclbk"
  12. // An utility struct for incoming messages.
  13. type IncomingMessage struct {
  14. Sender string
  15. Body string
  16. }
  17. // Executor executes Lua scripts in a shared Lua VM.
  18. type Executor struct {
  19. incomingScripts chan string
  20. outgoingMsgs chan string
  21. incomingMsgs chan IncomingMessage
  22. stateMutex sync.Mutex
  23. state *lua.State
  24. xmppStream stream.Stream
  25. }
  26. func NewExecutor(s stream.Stream) *Executor {
  27. e := &Executor{
  28. incomingScripts: make(chan string),
  29. outgoingMsgs: make(chan string),
  30. incomingMsgs: make(chan IncomingMessage),
  31. }
  32. e.xmppStream = s
  33. e.state = lua.NewState()
  34. lua.OpenLibraries(e.state)
  35. send := func(l *lua.State) int {
  36. str, _ := l.ToString(1)
  37. e.outgoingMsgs <- str
  38. return 0
  39. }
  40. registerClbk := func(l *lua.State) int {
  41. l.PushString(callbackLocation)
  42. l.PushValue(-2)
  43. l.SetTable(lua.RegistryIndex)
  44. return 0
  45. }
  46. var chatLibrary = []lua.RegistryFunction{
  47. lua.RegistryFunction{"send", send},
  48. lua.RegistryFunction{"onmessage", registerClbk},
  49. }
  50. lua.NewLibrary(e.state, chatLibrary)
  51. e.state.SetGlobal("chat")
  52. return e
  53. }
  54. func (e *Executor) execute() {
  55. for script := range e.incomingScripts {
  56. e.stateMutex.Lock()
  57. err := lua.DoString(e.state, script)
  58. if err != nil {
  59. fmt.Printf("lua fucking shit error: %s\n", err)
  60. m := entity.MSG(entity.GROUPCHAT)
  61. m.To = "golang@conference.jabber.ru"
  62. m.Body = err.Error()
  63. e.xmppStream.Write(entity.ProduceStatic(m))
  64. }
  65. e.stateMutex.Unlock()
  66. }
  67. }
  68. func (e *Executor) sendingRoutine() {
  69. for msg := range e.outgoingMsgs {
  70. m := entity.MSG(entity.GROUPCHAT)
  71. m.To = "golang@conference.jabber.ru"
  72. m.Body = msg
  73. err := e.xmppStream.Write(entity.ProduceStatic(m))
  74. if err != nil {
  75. fmt.Printf("send error: %s", err)
  76. }
  77. time.Sleep(sleepDuration)
  78. }
  79. }
  80. func (e *Executor) processIncomingMsgs() {
  81. for msg := range e.incomingMsgs {
  82. e.stateMutex.Lock()
  83. e.state.PushString(callbackLocation)
  84. e.state.Table(lua.RegistryIndex)
  85. if e.state.IsFunction(-1) {
  86. e.state.PushString(msg.Sender)
  87. e.state.PushString(msg.Body)
  88. err := e.state.ProtectedCall(2, 0, 0)
  89. if err != nil {
  90. m := entity.MSG(entity.GROUPCHAT)
  91. m.To = "golang@conference.jabber.ru"
  92. m.Body, _ = e.state.ToString(-1)
  93. e.xmppStream.Write(entity.ProduceStatic(m))
  94. }
  95. } else {
  96. e.state.Pop(1)
  97. }
  98. e.stateMutex.Unlock()
  99. }
  100. }
  101. func (e *Executor) Start() {
  102. go e.sendingRoutine()
  103. go e.execute()
  104. go e.processIncomingMsgs()
  105. }
  106. func (e *Executor) Stop() {
  107. close(e.incomingScripts)
  108. close(e.incomingMsgs)
  109. close(e.outgoingMsgs)
  110. }
  111. func (e *Executor) Run(script string) {
  112. e.incomingScripts <- script
  113. }
  114. // Call this on every incoming message - it's required for
  115. // chat.onmessage to work.
  116. func (e *Executor) NewMessage(msg IncomingMessage) {
  117. select {
  118. case e.incomingMsgs <- msg:
  119. default:
  120. }
  121. }