Browse Source

Merge pull request #9 from mechmind/hookexecutor-jsonserver

json server for hook executor
kpmy 9 years ago
parent
commit
164f4fc063
1 changed files with 75 additions and 10 deletions
  1. 75 10
      hookexecutor/executor.go

+ 75 - 10
hookexecutor/executor.go

@@ -18,6 +18,7 @@ import (
 
 const (
 	DefaultAddr             = "0.0.0.0:1984"
+	DefaultJSONAddr         = "0.0.0.0:1985"
 	DefaultInboxBufferSize  = 4
 	DefaultOutboxBufferSize = 4
 	DefaultClientBufferSize = 8
@@ -36,6 +37,9 @@ type Message struct {
 	ID int
 }
 
+type MessageWriter func(conn net.Conn, timeout time.Duration, msg *Message) error
+type MessageReader func(conn net.Conn, timeout time.Duration) (*Message, error)
+
 type clientReply struct {
 	outbox chan *Message
 	info   *clientInfo
@@ -47,9 +51,10 @@ type clientInfo struct {
 }
 
 type Executor struct {
-	listener   net.Listener
-	xmppStream stream.Stream
-	logger     *log.Logger
+	listener     net.Listener
+	jsonListener net.Listener
+	xmppStream   stream.Stream
+	logger       *log.Logger
 
 	inbox          chan *IncomingEvent
 	outbox         chan *Message
@@ -62,6 +67,7 @@ type Executor struct {
 
 func NewExecutor(s stream.Stream) *Executor {
 	return &Executor{
+		nil,
 		nil,
 		s,
 		log.New(os.Stderr, "[hookexecutor] ", log.LstdFlags),
@@ -76,6 +82,7 @@ func NewExecutor(s stream.Stream) *Executor {
 
 func (exc *Executor) Start() {
 	go exc.ListenAndServe(DefaultAddr)
+	go exc.ListenAndServeJSON(DefaultJSONAddr)
 	go exc.processEvents()
 }
 
@@ -129,13 +136,39 @@ func (exc *Executor) ListenAndServe(addr string) {
 		inbox, outbox := exc.createClient()
 		stop := make(chan struct{})
 		errors := make(chan error, 2)
-		go exc.clientWriter(inbox, conn, errors, stop)
-		go exc.clientReader(outbox, conn, errors, stop)
+		go exc.clientWriter(inbox, WriteMessage, conn, errors, stop)
+		go exc.clientReader(outbox, ReadMessage, conn, errors, stop)
 		go exc.stopOnError(stop, errors)
 	}
 }
 
-func (exc *Executor) clientWriter(inbox chan *Message, conn net.Conn, errors chan error, stop chan struct{}) {
+func (exc *Executor) ListenAndServeJSON(addr string) {
+	defer stopPanic(exc, "listener", func(_ error) { exc.ListenAndServeJSON(addr) })
+
+	listener, err := net.Listen("tcp", addr)
+	if err != nil {
+		exc.logger.Printf("failed to start listener, hooker disabled: %v", err)
+		return
+	}
+	defer listener.Close()
+
+	for {
+		conn, err := listener.Accept()
+		if err != nil {
+			exc.logger.Printf("failed to accept new connection: %v", err)
+			return
+		}
+
+		inbox, outbox := exc.createClient()
+		stop := make(chan struct{})
+		errors := make(chan error, 2)
+		go exc.clientWriter(inbox, WriteJSONMessage, conn, errors, stop)
+		go exc.clientReader(outbox, ReadJSONMessage, conn, errors, stop)
+		go exc.stopOnError(stop, errors)
+	}
+}
+
+func (exc *Executor) clientWriter(inbox chan *Message, write MessageWriter, conn net.Conn, errors chan error, stop chan struct{}) {
 	defer stopPanic(exc, "clientWriter",
 		func(err error) {
 			exc.logger.Printf("catched panic in writer: %v", err)
@@ -155,7 +188,7 @@ func (exc *Executor) clientWriter(inbox chan *Message, conn net.Conn, errors cha
 				return
 			}
 
-			err := WriteMessage(conn, DefaultHeartbeatTimeout, msg)
+			err := write(conn, DefaultHeartbeatTimeout, msg)
 			if err != nil {
 				exc.logger.Printf("failed to write message: %v", err)
 				errors <- err
@@ -163,7 +196,7 @@ func (exc *Executor) clientWriter(inbox chan *Message, conn net.Conn, errors cha
 			}
 		case <-heartbeatTicker.C:
 			ping := &Message{&IncomingEvent{"ping", nil}, -1}
-			err := WriteMessage(conn, DefaultHeartbeatTimeout, ping)
+			err := write(conn, DefaultHeartbeatTimeout, ping)
 			if err != nil {
 				exc.logger.Printf("failed to write ping message: %v", err)
 				errors <- err
@@ -175,7 +208,7 @@ func (exc *Executor) clientWriter(inbox chan *Message, conn net.Conn, errors cha
 	}
 }
 
-func (exc *Executor) clientReader(outbox chan *Message, conn net.Conn, errors chan error, stop chan struct{}) {
+func (exc *Executor) clientReader(outbox chan *Message, read MessageReader, conn net.Conn, errors chan error, stop chan struct{}) {
 	defer stopPanic(exc, "clientReader",
 		func(err error) {
 			exc.logger.Printf("catched panic in reader: %v", err)
@@ -184,7 +217,7 @@ func (exc *Executor) clientReader(outbox chan *Message, conn net.Conn, errors ch
 	defer conn.Close()
 
 	for {
-		msg, err := ReadMessage(conn, DefaultHeartbeatTimeout)
+		msg, err := read(conn, DefaultHeartbeatTimeout)
 		if err != nil {
 			exc.logger.Printf("failed to read message: %v", err)
 			errors <- err
@@ -240,6 +273,19 @@ func ReadMessage(conn net.Conn, timeout time.Duration) (*Message, error) {
 	return result, nil
 }
 
+func ReadJSONMessage(conn net.Conn, timeout time.Duration) (*Message, error) {
+	conn.SetReadDeadline(time.Now().Add(DefaultHeartbeatTimeout))
+	var handle = &codec.JsonHandle{}
+	var decoder = codec.NewDecoder(conn, handle)
+	var result = &Message{}
+	err := decoder.Decode(result)
+	if err != nil {
+		return nil, err
+	}
+
+	return result, nil
+}
+
 func WriteMessage(conn net.Conn, timeout time.Duration, msg *Message) error {
 	var handle = &codec.MsgpackHandle{}
 	var buf []byte
@@ -267,6 +313,25 @@ func WriteMessage(conn net.Conn, timeout time.Duration, msg *Message) error {
 	return err
 }
 
+func WriteJSONMessage(conn net.Conn, timeout time.Duration, msg *Message) error {
+	var handle = &codec.JsonHandle{}
+	var buf []byte
+	var encoder = codec.NewEncoderBytes(&buf, handle)
+	err := encoder.Encode(msg)
+	if err != nil {
+		return err
+	}
+
+	conn.SetWriteDeadline(time.Now().Add(timeout))
+	_, err = io.Copy(conn, bytes.NewBuffer(buf))
+	if err != nil {
+		return err
+	}
+
+	_, err = conn.Write([]byte{'\n'})
+	return err
+}
+
 func (exc *Executor) createClient() (inbox, outbox chan *Message) {
 	reply := make(chan clientReply, 1)
 	exc.clientRequests <- reply