|
@@ -1,8 +1,10 @@
|
|
|
package main
|
|
|
|
|
|
import (
|
|
|
+ "bufio"
|
|
|
"bytes"
|
|
|
"github.com/abbot/go-http-auth"
|
|
|
+ "github.com/kpmy/mipfs/ipfs_api"
|
|
|
"github.com/kpmy/mipfs/wdfs"
|
|
|
. "github.com/kpmy/ypk/tc"
|
|
|
"github.com/streamrail/concurrent-map"
|
|
@@ -12,6 +14,8 @@ import (
|
|
|
"log"
|
|
|
"net/http"
|
|
|
"net/url"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
var um cmap.ConcurrentMap = cmap.New()
|
|
@@ -23,6 +27,57 @@ type FileLockSys struct {
|
|
|
dav *webdav.Handler
|
|
|
}
|
|
|
|
|
|
+const HistoryLimit = 256
|
|
|
+
|
|
|
+type pin struct {
|
|
|
+ hash string
|
|
|
+ pin bool
|
|
|
+}
|
|
|
+
|
|
|
+func writeRoot(ch chan string, user string) {
|
|
|
+ pinCh := make(chan pin, 1024)
|
|
|
+ go func() {
|
|
|
+ for p := range pinCh {
|
|
|
+ if p.pin {
|
|
|
+ ipfs_api.Shell().Pin(p.hash)
|
|
|
+ log.Println("pin", p.hash)
|
|
|
+ } else {
|
|
|
+ ipfs_api.Shell().Unpin(p.hash)
|
|
|
+ log.Println("unpin", p.hash)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ for {
|
|
|
+ for s := range ch {
|
|
|
+ if s != "" {
|
|
|
+ if old, err := KV.Read(user + ".root"); err == nil && s != string(old) {
|
|
|
+ history := new(bytes.Buffer)
|
|
|
+ history.Write(old)
|
|
|
+ history.Write([]byte("\n"))
|
|
|
+ if hs, err := KV.Read(user + ".root.history"); err == nil {
|
|
|
+ oldHistory := bytes.NewBuffer(hs)
|
|
|
+ io.CopyN(history, oldHistory, int64(history.Len()*HistoryLimit)) //лимит истории
|
|
|
+ rd := bufio.NewReader(oldHistory)
|
|
|
+ for {
|
|
|
+ if us, err := rd.ReadString('\n'); err == nil && us != "" {
|
|
|
+ us = strings.TrimSpace(us)
|
|
|
+ pinCh <- pin{hash: us}
|
|
|
+ } else {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ KV.Write(user+".root.history", history.Bytes())
|
|
|
+ }
|
|
|
+ KV.Write(user+".root", []byte(s))
|
|
|
+ pinCh <- pin{hash: s, pin: true}
|
|
|
+ } else {
|
|
|
+ Halt(100, "empty root")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func handler() http.Handler {
|
|
|
return auth.NewBasicAuthenticator("ipfs", func(user, realm string) (ret string) {
|
|
|
un := zbase32.EncodeToString([]byte(user))
|
|
@@ -35,39 +90,28 @@ func handler() http.Handler {
|
|
|
user := zbase32.EncodeToString([]byte(req.Username))
|
|
|
fl := new(FileLockSys)
|
|
|
|
|
|
- rootWr := make(chan string, 256)
|
|
|
- go func(ch chan string, user string) {
|
|
|
- for {
|
|
|
- i := 0
|
|
|
- for s := range ch {
|
|
|
- if s != "" {
|
|
|
- if old, err := KV.Read(user + ".root"); err == nil && s != string(old) {
|
|
|
- history := new(bytes.Buffer)
|
|
|
- history.Write(old)
|
|
|
- history.Write([]byte("\n"))
|
|
|
- if hs, err := KV.Read(user + ".root.history"); err == nil {
|
|
|
- io.CopyN(history, bytes.NewBuffer(hs), int64(history.Len()*128)) //лимит истории
|
|
|
- }
|
|
|
- KV.Write(user+".root.history", history.Bytes())
|
|
|
- i++
|
|
|
- }
|
|
|
- KV.Write(user+".root", []byte(s))
|
|
|
-
|
|
|
- } else {
|
|
|
- Halt(100, "empty root")
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }(rootWr, user)
|
|
|
+ rootWr := make(chan string, 1024)
|
|
|
+ go writeRoot(rootWr, user)
|
|
|
|
|
|
defaultRoot := wdfs.EmptyDirHash
|
|
|
if r, err := KV.Read(user + ".root"); err == nil && len(r) > 0 {
|
|
|
defaultRoot = string(r)
|
|
|
+ found := make(chan string)
|
|
|
+ go func() {
|
|
|
+ if _, err := ipfs_api.Shell().BlockGet(defaultRoot); err != nil {
|
|
|
+ defaultRoot = wdfs.EmptyDirHash
|
|
|
+ }
|
|
|
+ found <- defaultRoot
|
|
|
+ }()
|
|
|
+ select {
|
|
|
+ case <-found:
|
|
|
+ case <-time.After(10 * time.Second):
|
|
|
+ defaultRoot = wdfs.EmptyDirHash
|
|
|
+ }
|
|
|
} else {
|
|
|
KV.Write(user+".root", []byte(defaultRoot))
|
|
|
}
|
|
|
rm.Set(user+".root", defaultRoot)
|
|
|
-
|
|
|
fs := wdfs.NewFS(func() string {
|
|
|
r, _ := rm.Get(user + ".root")
|
|
|
return r.(string)
|