filesystem.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package api
  17. import (
  18. "bufio"
  19. "fmt"
  20. "io"
  21. "net/http"
  22. "os"
  23. "path/filepath"
  24. "sync"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/logger"
  27. "github.com/ethereum/go-ethereum/logger/glog"
  28. "github.com/ethereum/go-ethereum/swarm/storage"
  29. )
  30. const maxParallelFiles = 5
  31. type FileSystem struct {
  32. api *Api
  33. }
  34. func NewFileSystem(api *Api) *FileSystem {
  35. return &FileSystem{api}
  36. }
  37. // Upload replicates a local directory as a manifest file and uploads it
  38. // using dpa store
  39. // TODO: localpath should point to a manifest
  40. func (self *FileSystem) Upload(lpath, index string) (string, error) {
  41. var list []*manifestTrieEntry
  42. localpath, err := filepath.Abs(filepath.Clean(lpath))
  43. if err != nil {
  44. return "", err
  45. }
  46. f, err := os.Open(localpath)
  47. if err != nil {
  48. return "", err
  49. }
  50. stat, err := f.Stat()
  51. if err != nil {
  52. return "", err
  53. }
  54. var start int
  55. if stat.IsDir() {
  56. start = len(localpath)
  57. glog.V(logger.Debug).Infof("uploading '%s'", localpath)
  58. err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error {
  59. if (err == nil) && !info.IsDir() {
  60. //fmt.Printf("lp %s path %s\n", localpath, path)
  61. if len(path) <= start {
  62. return fmt.Errorf("Path is too short")
  63. }
  64. if path[:start] != localpath {
  65. return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath)
  66. }
  67. entry := &manifestTrieEntry{
  68. Path: filepath.ToSlash(path),
  69. }
  70. list = append(list, entry)
  71. }
  72. return err
  73. })
  74. if err != nil {
  75. return "", err
  76. }
  77. } else {
  78. dir := filepath.Dir(localpath)
  79. start = len(dir)
  80. if len(localpath) <= start {
  81. return "", fmt.Errorf("Path is too short")
  82. }
  83. if localpath[:start] != dir {
  84. return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir)
  85. }
  86. entry := &manifestTrieEntry{
  87. Path: filepath.ToSlash(localpath),
  88. }
  89. list = append(list, entry)
  90. }
  91. cnt := len(list)
  92. errors := make([]error, cnt)
  93. done := make(chan bool, maxParallelFiles)
  94. dcnt := 0
  95. awg := &sync.WaitGroup{}
  96. for i, entry := range list {
  97. if i >= dcnt+maxParallelFiles {
  98. <-done
  99. dcnt++
  100. }
  101. awg.Add(1)
  102. go func(i int, entry *manifestTrieEntry, done chan bool) {
  103. f, err := os.Open(entry.Path)
  104. if err == nil {
  105. stat, _ := f.Stat()
  106. var hash storage.Key
  107. wg := &sync.WaitGroup{}
  108. hash, err = self.api.dpa.Store(f, stat.Size(), wg, nil)
  109. if hash != nil {
  110. list[i].Hash = hash.String()
  111. }
  112. wg.Wait()
  113. awg.Done()
  114. if err == nil {
  115. first512 := make([]byte, 512)
  116. fread, _ := f.ReadAt(first512, 0)
  117. if fread > 0 {
  118. mimeType := http.DetectContentType(first512[:fread])
  119. if filepath.Ext(entry.Path) == ".css" {
  120. mimeType = "text/css"
  121. }
  122. list[i].ContentType = mimeType
  123. }
  124. }
  125. f.Close()
  126. }
  127. errors[i] = err
  128. done <- true
  129. }(i, entry, done)
  130. }
  131. for dcnt < cnt {
  132. <-done
  133. dcnt++
  134. }
  135. trie := &manifestTrie{
  136. dpa: self.api.dpa,
  137. }
  138. quitC := make(chan bool)
  139. for i, entry := range list {
  140. if errors[i] != nil {
  141. return "", errors[i]
  142. }
  143. entry.Path = RegularSlashes(entry.Path[start:])
  144. if entry.Path == index {
  145. ientry := &manifestTrieEntry{
  146. Path: "",
  147. Hash: entry.Hash,
  148. ContentType: entry.ContentType,
  149. }
  150. trie.addEntry(ientry, quitC)
  151. }
  152. trie.addEntry(entry, quitC)
  153. }
  154. err2 := trie.recalcAndStore()
  155. var hs string
  156. if err2 == nil {
  157. hs = trie.hash.String()
  158. }
  159. awg.Wait()
  160. return hs, err2
  161. }
  162. // Download replicates the manifest path structure on the local filesystem
  163. // under localpath
  164. func (self *FileSystem) Download(bzzpath, localpath string) error {
  165. lpath, err := filepath.Abs(filepath.Clean(localpath))
  166. if err != nil {
  167. return err
  168. }
  169. err = os.MkdirAll(lpath, os.ModePerm)
  170. if err != nil {
  171. return err
  172. }
  173. //resolving host and port
  174. key, _, path, err := self.api.parseAndResolve(bzzpath, true)
  175. if err != nil {
  176. return err
  177. }
  178. if len(path) > 0 {
  179. path += "/"
  180. }
  181. quitC := make(chan bool)
  182. trie, err := loadManifest(self.api.dpa, key, quitC)
  183. if err != nil {
  184. glog.V(logger.Warn).Infof("fs.Download: loadManifestTrie error: %v", err)
  185. return err
  186. }
  187. type downloadListEntry struct {
  188. key storage.Key
  189. path string
  190. }
  191. var list []*downloadListEntry
  192. var mde error
  193. prevPath := lpath
  194. err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) {
  195. glog.V(logger.Detail).Infof("fs.Download: %#v", entry)
  196. key = common.Hex2Bytes(entry.Hash)
  197. path := lpath + "/" + suffix
  198. dir := filepath.Dir(path)
  199. if dir != prevPath {
  200. mde = os.MkdirAll(dir, os.ModePerm)
  201. prevPath = dir
  202. }
  203. if (mde == nil) && (path != dir+"/") {
  204. list = append(list, &downloadListEntry{key: key, path: path})
  205. }
  206. })
  207. if err != nil {
  208. return err
  209. }
  210. wg := sync.WaitGroup{}
  211. errC := make(chan error)
  212. done := make(chan bool, maxParallelFiles)
  213. for i, entry := range list {
  214. select {
  215. case done <- true:
  216. wg.Add(1)
  217. case <-quitC:
  218. return fmt.Errorf("aborted")
  219. }
  220. go func(i int, entry *downloadListEntry) {
  221. defer wg.Done()
  222. err := retrieveToFile(quitC, self.api.dpa, entry.key, entry.path)
  223. if err != nil {
  224. select {
  225. case errC <- err:
  226. case <-quitC:
  227. }
  228. return
  229. }
  230. <-done
  231. }(i, entry)
  232. }
  233. go func() {
  234. wg.Wait()
  235. close(errC)
  236. }()
  237. select {
  238. case err = <-errC:
  239. return err
  240. case <-quitC:
  241. return fmt.Errorf("aborted")
  242. }
  243. }
  244. func retrieveToFile(quitC chan bool, dpa *storage.DPA, key storage.Key, path string) error {
  245. f, err := os.Create(path) // TODO: path separators
  246. if err != nil {
  247. return err
  248. }
  249. reader := dpa.Retrieve(key)
  250. writer := bufio.NewWriter(f)
  251. size, err := reader.Size(quitC)
  252. if err != nil {
  253. return err
  254. }
  255. if _, err = io.CopyN(writer, reader, size); err != nil {
  256. return err
  257. }
  258. if err := writer.Flush(); err != nil {
  259. return err
  260. }
  261. return f.Close()
  262. }