Răsfoiți Sursa

fixed ratelimiting

Vladimir Petrov 9 ani în urmă
părinte
comite
c1a9dc5657
3 a modificat fișierele cu 62 adăugiri și 14 ștergeri
  1. 38 9
      hookexecutor/executor.go
  2. 19 1
      hookexecutor/hookclient/client.go
  3. 5 4
      hookexecutor/trafcontrol.go

+ 38 - 9
hookexecutor/executor.go

@@ -20,7 +20,7 @@ const (
 	DefaultAddr             = "0.0.0.0:1984"
 	DefaultJSONAddr         = "0.0.0.0:1985"
 	DefaultInboxBufferSize  = 4
-	DefaultOutboxBufferSize = 4
+	DefaultOutboxBufferSize = 10
 	DefaultClientBufferSize = 8
 	DefaultHeartbeatTrigger = 5 * time.Second
 	DefaultHeartbeatTimeout = 10 * time.Second
@@ -29,6 +29,8 @@ const (
 	DefaultDelayForMessages = 1 * time.Second
 )
 
+var defaultShaper = ShaperConfig{RatePerMinute: 10, RatePer10sec: 3}
+
 type IncomingEvent struct {
 	Type string
 	Data map[string]string
@@ -36,7 +38,24 @@ type IncomingEvent struct {
 
 type Message struct {
 	*IncomingEvent
-	ID int
+	ID    int
+	Error string
+}
+
+func (m *Message) String() string {
+	var handle = &codec.JsonHandle{}
+	var buf []byte
+	var encoder = codec.NewEncoderBytes(&buf, handle)
+	err := encoder.Encode(m.Data)
+	if err != nil {
+		buf = []byte("<malformed data>")
+	}
+
+	if m.Error != "" {
+		return fmt.Sprintf("err_msg of %s [%d]: '%s', content: %s", m.Type, m.ID, m.Error, string(buf))
+	} else {
+		return fmt.Sprintf("msg of %s [%d] content: %s", m.Type, m.ID, string(buf))
+	}
 }
 
 type MessageWriter func(conn net.Conn, timeout time.Duration, msg *Message) error
@@ -202,7 +221,7 @@ func (exc *Executor) clientWriter(inbox chan *Message, write MessageWriter, conn
 				return
 			}
 		case <-heartbeatTicker.C:
-			ping := &Message{&IncomingEvent{"ping", nil}, -1}
+			ping := &Message{&IncomingEvent{"ping", nil}, -1, ""}
 			err := write(conn, DefaultHeartbeatTimeout, ping)
 			if err != nil {
 				exc.logger.Printf("failed to write ping message: %v", err)
@@ -346,26 +365,25 @@ func (exc *Executor) createClient() (inbox, outbox chan *Message) {
 	return r.info.inbox, r.outbox
 }
 
-var defaultShaper = ShaperConfig{RatePerMinute: 10, RatePer10sec: 5}
-
 func (exc *Executor) processEvents() {
 	defer stopPanic(exc, "processEvents", func(_ error) { exc.processEvents() })
 
 	for {
 		select {
 		case msg := <-exc.inbox:
-			message := &Message{msg, exc.counter}
+			message := &Message{msg, exc.counter, ""}
 			exc.sendMessage(message)
 			exc.counter++
 		case cmd := <-exc.cmdInbox:
 			// TODO(mechmind): handle cmds
 			exc.logger.Printf("ignoring cmd: '%s'", cmd)
 		case req := <-exc.clientRequests:
-			outbox := exc.outbox
 			inbox := make(chan *Message, DefaultClientBufferSize)
 			tcOutbox := make(chan *Message)
 
-			tc := NewTrafficController(defaultShaper, nil, tcOutbox, outbox, exc.logger)
+			rejecter := simpleRejecter(inbox, exc.logger)
+
+			tc := NewTrafficController(defaultShaper, rejecter, tcOutbox, exc.outbox, exc.logger)
 			tc.Start()
 			info := &clientInfo{
 				inbox:             inbox,
@@ -374,7 +392,18 @@ func (exc *Executor) processEvents() {
 			}
 
 			exc.clients = append(exc.clients, info)
-			req <- clientReply{outbox, info}
+			req <- clientReply{tcOutbox, info}
+		}
+	}
+}
+
+func simpleRejecter(inbox chan *Message, logger *log.Logger) Rejecter {
+	return func(msg *Message, reason string) {
+		msg.Error = reason
+		logger.Printf("rejecting message: %s", msg.String())
+		select {
+		case inbox <- msg:
+		default:
 		}
 	}
 }

+ 19 - 1
hookexecutor/hookclient/client.go

@@ -20,6 +20,7 @@ type Client struct {
 
 	prefixHandlers []stringMatchHandler
 	substrHandlers []stringMatchHandler
+	errorHandler   Handler
 
 	logger *log.Logger
 	stop   chan struct{}
@@ -46,6 +47,7 @@ func NewClient(addr string) *Client {
 		nil,
 		nil,
 		nil,
+		nil,
 		log.New(os.Stderr, "[hookclient] ", log.LstdFlags),
 		nil,
 	}
@@ -94,7 +96,7 @@ func (c *Client) run() {
 			}
 
 			if msg.Type == "ping" {
-				pong := &hookexecutor.Message{&hookexecutor.IncomingEvent{"pong", nil}, -1}
+				pong := &hookexecutor.Message{&hookexecutor.IncomingEvent{"pong", nil}, -1, ""}
 				outbox <- pong
 				continue
 			}
@@ -113,6 +115,18 @@ func (c *Client) run() {
 func (c *Client) selectHandlers(msg *hookexecutor.Message) []Handler {
 	handlers := []Handler{}
 
+	if msg.Error != "" {
+		if c.errorHandler != nil {
+			return []Handler{c.errorHandler}
+		} else {
+			logger := func(msg *hookexecutor.Message) (*hookexecutor.Message, error) {
+				c.logger.Printf("remote failure: %s", msg.String())
+				return nil, nil
+			}
+			return []Handler{HandlerFunc(logger)}
+		}
+	}
+
 	text := msg.Data["body"]
 	for _, prefixHandler := range c.prefixHandlers {
 		if strings.HasPrefix(text, prefixHandler.prefix) {
@@ -208,3 +222,7 @@ func (c *Client) HandlePrefix(prefix string, handler Handler) {
 func (c *Client) HandleSubstr(needle string, handler Handler) {
 	c.substrHandlers = append(c.substrHandlers, stringMatchHandler{needle, handler})
 }
+
+func (c *Client) HandleError(handler Handler) {
+	c.errorHandler = handler
+}

+ 5 - 4
hookexecutor/trafcontrol.go

@@ -89,19 +89,20 @@ func (tc *TrafficController) run() {
 func checkRateLimit(tsLog []*TrafficEvent, event *TrafficEvent, limit int, period time.Duration) rateLimitCheck {
 	target := time.Now().Add(-period)
 	total := 0
+
 	for i := len(tsLog) - 1; i > 0; i-- {
 		item := tsLog[i]
-		if target.After(tsLog[i].timestamp) {
+		if target.After(item.timestamp) {
 			break
 		}
 
 		total += item.score
-		if total > limit {
+		if total >= limit {
 			return RLOverflow
 		}
 	}
 
-	if total+event.score < limit {
+	if total+event.score <= limit {
 		return RLOK
 	} else {
 		return RLPartialOverflow
@@ -113,7 +114,7 @@ func addToLog(tsLog []*TrafficEvent, event *TrafficEvent, period time.Duration)
 
 	first := time.Now().Add(-period)
 	for idx, event := range tsLog {
-		if first.After(event.timestamp) {
+		if first.Before(event.timestamp) {
 			return tsLog[idx:]
 		}
 	}