filesystem.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package api
  17. import (
  18. "bufio"
  19. "context"
  20. "fmt"
  21. "io"
  22. "os"
  23. "path"
  24. "path/filepath"
  25. "sync"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/swarm/log"
  28. "github.com/ethereum/go-ethereum/swarm/storage"
  29. )
  const (
  	// maxParallelFiles is the capacity of the semaphore channels used by
  	// Upload and Download, i.e. the number of files handled concurrently.
  	maxParallelFiles = 5
  )
  31. type FileSystem struct {
  32. api *API
  33. }
  34. func NewFileSystem(api *API) *FileSystem {
  35. return &FileSystem{api}
  36. }
  37. // Upload replicates a local directory as a manifest file and uploads it
  38. // using FileStore store
  39. // This function waits the chunks to be stored.
  40. // TODO: localpath should point to a manifest
  41. //
  42. // DEPRECATED: Use the HTTP API instead
  43. func (fs *FileSystem) Upload(lpath, index string, toEncrypt bool) (string, error) {
  44. var list []*manifestTrieEntry
  45. localpath, err := filepath.Abs(filepath.Clean(lpath))
  46. if err != nil {
  47. return "", err
  48. }
  49. f, err := os.Open(localpath)
  50. if err != nil {
  51. return "", err
  52. }
  53. stat, err := f.Stat()
  54. if err != nil {
  55. return "", err
  56. }
  57. var start int
  58. if stat.IsDir() {
  59. start = len(localpath)
  60. log.Debug(fmt.Sprintf("uploading '%s'", localpath))
  61. err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error {
  62. if (err == nil) && !info.IsDir() {
  63. if len(path) <= start {
  64. return fmt.Errorf("Path is too short")
  65. }
  66. if path[:start] != localpath {
  67. return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath)
  68. }
  69. entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(path)}, nil)
  70. list = append(list, entry)
  71. }
  72. return err
  73. })
  74. if err != nil {
  75. return "", err
  76. }
  77. } else {
  78. dir := filepath.Dir(localpath)
  79. start = len(dir)
  80. if len(localpath) <= start {
  81. return "", fmt.Errorf("Path is too short")
  82. }
  83. if localpath[:start] != dir {
  84. return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir)
  85. }
  86. entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(localpath)}, nil)
  87. list = append(list, entry)
  88. }
  89. errors := make([]error, len(list))
  90. sem := make(chan bool, maxParallelFiles)
  91. defer close(sem)
  92. for i, entry := range list {
  93. sem <- true
  94. go func(i int, entry *manifestTrieEntry) {
  95. defer func() { <-sem }()
  96. f, err := os.Open(entry.Path)
  97. if err != nil {
  98. errors[i] = err
  99. return
  100. }
  101. defer f.Close()
  102. stat, err := f.Stat()
  103. if err != nil {
  104. errors[i] = err
  105. return
  106. }
  107. var hash storage.Address
  108. var wait func(context.Context) error
  109. ctx := context.TODO()
  110. hash, wait, err = fs.api.fileStore.Store(ctx, f, stat.Size(), toEncrypt)
  111. if hash != nil {
  112. list[i].Hash = hash.Hex()
  113. }
  114. if err := wait(ctx); err != nil {
  115. errors[i] = err
  116. return
  117. }
  118. list[i].ContentType, err = DetectContentType(f.Name(), f)
  119. if err != nil {
  120. errors[i] = err
  121. return
  122. }
  123. }(i, entry)
  124. }
  125. for i := 0; i < cap(sem); i++ {
  126. sem <- true
  127. }
  128. trie := &manifestTrie{
  129. fileStore: fs.api.fileStore,
  130. }
  131. quitC := make(chan bool)
  132. for i, entry := range list {
  133. if errors[i] != nil {
  134. return "", errors[i]
  135. }
  136. entry.Path = RegularSlashes(entry.Path[start:])
  137. if entry.Path == index {
  138. ientry := newManifestTrieEntry(&ManifestEntry{
  139. ContentType: entry.ContentType,
  140. }, nil)
  141. ientry.Hash = entry.Hash
  142. trie.addEntry(ientry, quitC)
  143. }
  144. trie.addEntry(entry, quitC)
  145. }
  146. err2 := trie.recalcAndStore()
  147. var hs string
  148. if err2 == nil {
  149. hs = trie.ref.Hex()
  150. }
  151. return hs, err2
  152. }
  153. // Download replicates the manifest basePath structure on the local filesystem
  154. // under localpath
  155. //
  156. // DEPRECATED: Use the HTTP API instead
  157. func (fs *FileSystem) Download(bzzpath, localpath string) error {
  158. lpath, err := filepath.Abs(filepath.Clean(localpath))
  159. if err != nil {
  160. return err
  161. }
  162. err = os.MkdirAll(lpath, os.ModePerm)
  163. if err != nil {
  164. return err
  165. }
  166. //resolving host and port
  167. uri, err := Parse(path.Join("bzz:/", bzzpath))
  168. if err != nil {
  169. return err
  170. }
  171. addr, err := fs.api.Resolve(context.TODO(), uri.Addr)
  172. if err != nil {
  173. return err
  174. }
  175. path := uri.Path
  176. if len(path) > 0 {
  177. path += "/"
  178. }
  179. quitC := make(chan bool)
  180. trie, err := loadManifest(context.TODO(), fs.api.fileStore, addr, quitC, NOOPDecrypt)
  181. if err != nil {
  182. log.Warn(fmt.Sprintf("fs.Download: loadManifestTrie error: %v", err))
  183. return err
  184. }
  185. type downloadListEntry struct {
  186. addr storage.Address
  187. path string
  188. }
  189. var list []*downloadListEntry
  190. var mde error
  191. prevPath := lpath
  192. err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) {
  193. log.Trace(fmt.Sprintf("fs.Download: %#v", entry))
  194. addr = common.Hex2Bytes(entry.Hash)
  195. path := lpath + "/" + suffix
  196. dir := filepath.Dir(path)
  197. if dir != prevPath {
  198. mde = os.MkdirAll(dir, os.ModePerm)
  199. prevPath = dir
  200. }
  201. if (mde == nil) && (path != dir+"/") {
  202. list = append(list, &downloadListEntry{addr: addr, path: path})
  203. }
  204. })
  205. if err != nil {
  206. return err
  207. }
  208. wg := sync.WaitGroup{}
  209. errC := make(chan error)
  210. done := make(chan bool, maxParallelFiles)
  211. for i, entry := range list {
  212. select {
  213. case done <- true:
  214. wg.Add(1)
  215. case <-quitC:
  216. return fmt.Errorf("aborted")
  217. }
  218. go func(i int, entry *downloadListEntry) {
  219. defer wg.Done()
  220. err := retrieveToFile(quitC, fs.api.fileStore, entry.addr, entry.path)
  221. if err != nil {
  222. select {
  223. case errC <- err:
  224. case <-quitC:
  225. }
  226. return
  227. }
  228. <-done
  229. }(i, entry)
  230. }
  231. go func() {
  232. wg.Wait()
  233. close(errC)
  234. }()
  235. select {
  236. case err = <-errC:
  237. return err
  238. case <-quitC:
  239. return fmt.Errorf("aborted")
  240. }
  241. }
  242. func retrieveToFile(quitC chan bool, fileStore *storage.FileStore, addr storage.Address, path string) error {
  243. f, err := os.Create(path) // TODO: basePath separators
  244. if err != nil {
  245. return err
  246. }
  247. reader, _ := fileStore.Retrieve(context.TODO(), addr)
  248. writer := bufio.NewWriter(f)
  249. size, err := reader.Size(context.TODO(), quitC)
  250. if err != nil {
  251. return err
  252. }
  253. if _, err = io.CopyN(writer, reader, size); err != nil {
  254. return err
  255. }
  256. if err := writer.Flush(); err != nil {
  257. return err
  258. }
  259. return f.Close()
  260. }