1
0

xml.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package stream
  2. import (
  3. "bytes"
  4. "encoding/xml"
  5. "github.com/kpmy/ypk/halt"
  6. "io"
  7. "reflect"
  8. )
  9. func spl1t(bunch io.Reader) (ret chan []byte) {
  10. ret = make(chan []byte)
  11. go func() {
  12. d := xml.NewDecoder(bunch)
  13. d.Strict = false
  14. var (
  15. _t xml.Token
  16. err error
  17. buf *bytes.Buffer
  18. e *xml.Encoder
  19. )
  20. init := func() {
  21. buf = bytes.NewBuffer(nil)
  22. e = xml.NewEncoder(buf)
  23. }
  24. flush := func() {
  25. e.Flush()
  26. if buf.Len() > 0 {
  27. ret <- buf.Bytes()
  28. }
  29. init()
  30. }
  31. join := func(n xml.Name) (ret string) {
  32. if n.Space != "" {
  33. ret = n.Space + ":"
  34. }
  35. ret = ret + n.Local
  36. return
  37. }
  38. depth := 0
  39. init()
  40. for stop := false; !stop && err == nil; {
  41. if _t, err = d.RawToken(); err == nil {
  42. switch t := _t.(type) {
  43. case xml.ProcInst:
  44. e.EncodeToken(t.Copy())
  45. case xml.StartElement:
  46. tt := t.Copy()
  47. tt.Name = xml.Name{Local: join(t.Name)}
  48. var tmp []xml.Attr
  49. for _, a := range tt.Attr {
  50. a.Name = xml.Name{Local: join(a.Name)}
  51. tmp = append(tmp, a)
  52. }
  53. tt.Attr = tmp
  54. e.EncodeToken(tt)
  55. if tt.Name.Local == "stream:stream" {
  56. depth--
  57. flush()
  58. }
  59. depth++
  60. case xml.EndElement:
  61. tt := t
  62. tt.Name = xml.Name{Local: join(t.Name)}
  63. e.EncodeToken(tt)
  64. depth--
  65. if depth == 0 {
  66. flush()
  67. }
  68. case xml.CharData:
  69. e.EncodeToken(t)
  70. default:
  71. halt.As(100, reflect.TypeOf(t))
  72. }
  73. } else {
  74. flush()
  75. }
  76. }
  77. close(ret)
  78. }()
  79. return
  80. }