Prechádzať zdrojové kódy

уточнил интерфейс бегунка по буферу

kpmy 9 rokov pred
rodič
commit
e546d9efb9

+ 2 - 3
c2s/actors/steps/auth.go

@@ -20,15 +20,14 @@ func (p *PlainAuth) Act() func(stream.Stream) error {
 		*auth = entity.PlainAuthPrototype
 		auth.Init(p.Client.Name, p.Pwd)
 		if err = s.Write(entity.Produce(auth)); err == nil {
-			s.Ring(func(b *bytes.Buffer) (ret *bytes.Buffer) {
+			s.Ring(func(b *bytes.Buffer) (done bool) {
 				var _e entity.Entity
 				if _e, err = entity.Consume(b); err == nil {
 					switch e := _e.(type) {
 					case *entity.Success:
-
+						done = true
 					default:
 						log.Println(reflect.TypeOf(e))
-						ret = b //pass
 					}
 				}
 				return

+ 4 - 8
c2s/actors/steps/bind.go

@@ -19,7 +19,7 @@ func (b *Bind) Act() func(s stream.Stream) error {
 		bind.Resource = b.Rsrc
 		iq := entity.IQ(entity.SET, bind)
 		if err = s.Write(entity.Produce(iq)); err == nil {
-			s.Ring(func(b *bytes.Buffer) (ret *bytes.Buffer) {
+			s.Ring(func(b *bytes.Buffer) (done bool) {
 				var _e entity.Entity
 				if _e, err = entity.Consume(b); err == nil {
 					switch e := _e.(type) {
@@ -27,12 +27,10 @@ func (b *Bind) Act() func(s stream.Stream) error {
 						switch {
 						case e.Id == iq.Id && e.Type == entity.RESULT:
 							stream.Bind(s, e.Inner.(*entity.Bind).Jid)
-						default:
-							ret = b //pass
+							done = true
 						}
 					default:
 						log.Println(reflect.TypeOf(e))
-						ret = b //pass
 					}
 				}
 				return
@@ -45,19 +43,17 @@ func (b *Bind) Act() func(s stream.Stream) error {
 func Session(s stream.Stream) (err error) {
 	iq := entity.IQ(entity.SET, &entity.SessionPrototype)
 	if err = s.Write(entity.Produce(iq)); err == nil {
-		s.Ring(func(b *bytes.Buffer) (ret *bytes.Buffer) {
+		s.Ring(func(b *bytes.Buffer) (done bool) {
 			var _e entity.Entity
 			if _e, err = entity.Consume(b); err == nil {
 				switch e := _e.(type) {
 				case *entity.Iq:
 					switch {
 					case e.Id == iq.Id && e.Type == entity.RESULT:
-					default:
-						ret = b
+						done = true
 					}
 				default:
 					log.Println(reflect.TypeOf(e))
-					ret = b //pass
 				}
 			}
 			return

+ 4 - 5
c2s/actors/steps/first.go

@@ -13,18 +13,17 @@ import (
 
 func Starter(s stream.Stream) (err error) {
 	if err = s.Write(entity.Open(s.Server()).Produce()); err == nil {
-		s.Ring(func(b *bytes.Buffer) (ret *bytes.Buffer) {
+		s.Ring(func(b *bytes.Buffer) (done bool) {
 			_e, e := entity.Consume(b)
 			if _e != nil {
 				switch e := _e.(type) {
 				case *entity.Stream:
 					s.Id(e.Id)
+					done = true
 				default:
 					log.Println(reflect.TypeOf(e))
-					ret = b //pass
 				}
 			} else if e == nil {
-				ret = nil
 				err = errors.New(fmt.Sprint("unknown entity ", string(b.Bytes())))
 			} else {
 				err = e
@@ -41,15 +40,15 @@ type Negotiation struct {
 
 func (n *Negotiation) Act() func(stream.Stream) error {
 	return func(s stream.Stream) (err error) {
-		s.Ring(func(b *bytes.Buffer) (ret *bytes.Buffer) {
+		s.Ring(func(b *bytes.Buffer) (done bool) {
 			var _e entity.Entity
 			if _e, err = entity.Consume(b); err == nil {
 				switch e := _e.(type) {
 				case *entity.Features:
 					n.AuthMechanisms = e.Mechanisms
+					done = true
 				default:
 					log.Println(reflect.TypeOf(e))
-					ret = b //pass
 				}
 			}
 			return

+ 9 - 23
c2s/stream/stream.go

@@ -5,7 +5,6 @@ import (
 	"errors"
 	"github.com/kpmy/ypk/halt"
 	"hash/adler32"
-	"log"
 	"net"
 	"reflect"
 	"time"
@@ -16,7 +15,7 @@ import (
 type Stream interface {
 	Server() *units.Server
 	Write(*bytes.Buffer) error
-	Ring(func(*bytes.Buffer) *bytes.Buffer, time.Duration)
+	Ring(func(*bytes.Buffer) bool, time.Duration)
 	Id(...string) string
 }
 
@@ -26,7 +25,7 @@ type wrapperStream struct {
 
 func (w *wrapperStream) Write(b *bytes.Buffer) error { return w.base.Write(b) }
 
-func (w *wrapperStream) Ring(fn func(*bytes.Buffer) *bytes.Buffer, t time.Duration) {
+func (w *wrapperStream) Ring(fn func(*bytes.Buffer) bool, t time.Duration) {
 	w.base.Ring(fn, t)
 }
 
@@ -38,10 +37,10 @@ type dummyStream struct {
 	to *units.Server
 }
 
-func (d *dummyStream) Ring(func(*bytes.Buffer) *bytes.Buffer, time.Duration) { panic(126) }
-func (d *dummyStream) Write(b *bytes.Buffer) error                           { panic(126) }
-func (d *dummyStream) Server() *units.Server                                 { return d.to }
-func (d *dummyStream) Id(...string) string                                   { return "" }
+func (d *dummyStream) Ring(func(*bytes.Buffer) bool, time.Duration) { panic(126) }
+func (d *dummyStream) Write(b *bytes.Buffer) error                  { panic(126) }
+func (d *dummyStream) Server() *units.Server                        { return d.to }
+func (d *dummyStream) Id(...string) string                          { return "" }
 
 type xmppStream struct {
 	to   *units.Server
@@ -66,15 +65,11 @@ func (x *xmppStream) Id(s ...string) string {
 func (x *xmppStream) Server() *units.Server { return x.to }
 
 func (x *xmppStream) Write(b *bytes.Buffer) (err error) {
-	log.Println("OUT")
-	log.Println(string(b.Bytes()))
-	log.Println()
 	_, err = x.conn.Write(b.Bytes())
 	return
 }
 
-func (x *xmppStream) Ring(fn func(*bytes.Buffer) *bytes.Buffer, timeout time.Duration) {
-	log.Println("wait")
+func (x *xmppStream) Ring(fn func(*bytes.Buffer) bool, timeout time.Duration) {
 	timed := make(chan bool)
 	if timeout > 0 {
 		go func() {
@@ -85,13 +80,10 @@ func (x *xmppStream) Ring(fn func(*bytes.Buffer) *bytes.Buffer, timeout time.Dur
 	for stop := false; !stop; {
 		select {
 		case p := <-x.data:
-			log.Println("try ", p.hash)
-			res := fn(bytes.NewBuffer(p.data))
-			if res != nil {
-				log.Println("passed", p.hash)
+			done := fn(bytes.NewBuffer(p.data))
+			if !done {
 				x.data <- pack{data: p.data, hash: p.hash}
 			} else {
-				log.Println("done", p.hash)
 				stop = true
 			}
 		case <-timed:
@@ -140,13 +132,7 @@ func Dial(_s Stream) (err error) {
 							if n > 0 && err == nil {
 								data := make([]byte, n)
 								copy(data, buf)
-								log.Println("PRE", len(data), adler32.Checksum(data))
-								log.Println(string(data))
-								log.Println()
 								for data := range spl1t(data) {
-									log.Println("IN", len(data), adler32.Checksum(data))
-									log.Println(string(data))
-									log.Println()
 									stream.data <- pack{data: data, hash: adler32.Checksum(data)}
 								}
 							}

+ 1 - 5
c2s/stream/xml.go

@@ -4,8 +4,6 @@ import (
 	"bytes"
 	"encoding/xml"
 	"github.com/kpmy/ypk/halt"
-	"io"
-	"log"
 	"reflect"
 )
 
@@ -69,10 +67,8 @@ func spl1t(bunch []byte) (ret chan []byte) {
 				default:
 					halt.As(100, reflect.TypeOf(t))
 				}
-			} else if err == io.EOF {
-				flush()
 			} else {
-				log.Println(err)
+				flush()
 			}
 		}
 		close(ret)

+ 10 - 7
muc-client/main.go

@@ -10,6 +10,8 @@ import (
 	"github.com/skratchdot/open-golang/open"
 	"html/template"
 	"log"
+	"math/rand"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -60,17 +62,19 @@ func init() {
 	posts = new(Posts)
 }
 
-func conv(fn func(entity.Entity)) func(*bytes.Buffer) *bytes.Buffer {
-	return func(in *bytes.Buffer) (out *bytes.Buffer) {
+func conv(fn func(entity.Entity)) func(*bytes.Buffer) bool {
+	return func(in *bytes.Buffer) (done bool) {
+		done = true
+		log.Println("IN")
+		log.Println(string(in.Bytes()))
+		log.Println()
 		if _e, err := entity.Consume(in); err == nil {
 			switch e := _e.(type) {
 			case *entity.Message:
 				fn(e)
 			}
 		} else {
-			log.Println("ERROR", err)
-			log.Println(string(in.Bytes()))
-			log.Println()
+			log.Println(err)
 		}
 		return
 	}
@@ -93,7 +97,7 @@ func main() {
 			if neg.HasMechanism("PLAIN") {
 				auth := &steps.PlainAuth{Client: c, Pwd: pwd}
 				neg := &steps.Negotiation{}
-				bind := &steps.Bind{Rsrc: resource}
+				bind := &steps.Bind{Rsrc: resource + strconv.Itoa(rand.Intn(500))}
 				actors.With(st).Do(auth.Act(), errHandler).Do(steps.Starter).Do(neg.Act()).Do(bind.Act()).Do(steps.Session).Run()
 				actors.With(st).Do(steps.InitialPresence).Run()
 				actors.With(st).Do(func(st stream.Stream) error {
@@ -106,7 +110,6 @@ func main() {
 									posts.Lock()
 									posts.data = append(posts.data, Post{User: strings.TrimPrefix(e.From, "golang@conference.jabber.ru/"),
 										Msg: e.Body})
-									log.Println(len(posts.data))
 									posts.Unlock()
 								}
 							}