filesystem.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  // Copyright 2016 The go-ethereum Authors
  // This file is part of the go-ethereum library.
  //
  // The go-ethereum library is free software: you can redistribute it and/or modify
  // it under the terms of the GNU Lesser General Public License as published by
  // the Free Software Foundation, either version 3 of the License, or
  // (at your option) any later version.
  //
  // The go-ethereum library is distributed in the hope that it will be useful,
  // but WITHOUT ANY WARRANTY; without even the implied warranty of
  // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  // GNU Lesser General Public License for more details.
  //
  // You should have received a copy of the GNU Lesser General Public License
  // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package api
  17. import (
  18. "bufio"
  19. "context"
  20. "fmt"
  21. "io"
  22. "os"
  23. "path"
  24. "path/filepath"
  25. "sync"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/swarm/log"
  28. "github.com/ethereum/go-ethereum/swarm/storage"
  29. )
  // maxParallelFiles is the capacity of the semaphore channels in this file,
  // i.e. the upper bound on how many files are transferred concurrently.
  const (
  	maxParallelFiles = 5
  )
  31. type FileSystem struct {
  32. api *API
  33. }
  34. func NewFileSystem(api *API) *FileSystem {
  35. return &FileSystem{api}
  36. }
  37. // Upload replicates a local directory as a manifest file and uploads it
  38. // using FileStore store
  39. // This function waits the chunks to be stored.
  40. // TODO: localpath should point to a manifest
  41. //
  42. // DEPRECATED: Use the HTTP API instead
  43. func (fs *FileSystem) Upload(lpath, index string, toEncrypt bool) (string, error) {
  44. var list []*manifestTrieEntry
  45. localpath, err := filepath.Abs(filepath.Clean(lpath))
  46. if err != nil {
  47. return "", err
  48. }
  49. f, err := os.Open(localpath)
  50. if err != nil {
  51. return "", err
  52. }
  53. stat, err := f.Stat()
  54. if err != nil {
  55. return "", err
  56. }
  57. var start int
  58. if stat.IsDir() {
  59. start = len(localpath)
  60. log.Debug(fmt.Sprintf("uploading '%s'", localpath))
  61. err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error {
  62. if (err == nil) && !info.IsDir() {
  63. if len(path) <= start {
  64. return fmt.Errorf("Path is too short")
  65. }
  66. if path[:start] != localpath {
  67. return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath)
  68. }
  69. entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(path)}, nil)
  70. list = append(list, entry)
  71. }
  72. return err
  73. })
  74. if err != nil {
  75. return "", err
  76. }
  77. } else {
  78. dir := filepath.Dir(localpath)
  79. start = len(dir)
  80. if len(localpath) <= start {
  81. return "", fmt.Errorf("Path is too short")
  82. }
  83. if localpath[:start] != dir {
  84. return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir)
  85. }
  86. entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(localpath)}, nil)
  87. list = append(list, entry)
  88. }
  89. errors := make([]error, len(list))
  90. sem := make(chan bool, maxParallelFiles)
  91. defer close(sem)
  92. for i, entry := range list {
  93. sem <- true
  94. go func(i int, entry *manifestTrieEntry) {
  95. defer func() { <-sem }()
  96. f, err := os.Open(entry.Path)
  97. if err != nil {
  98. errors[i] = err
  99. return
  100. }
  101. defer f.Close()
  102. stat, err := f.Stat()
  103. if err != nil {
  104. errors[i] = err
  105. return
  106. }
  107. var hash storage.Address
  108. var wait func(context.Context) error
  109. ctx := context.TODO()
  110. hash, wait, err = fs.api.fileStore.Store(ctx, f, stat.Size(), toEncrypt)
  111. if err != nil {
  112. errors[i] = err
  113. return
  114. }
  115. if hash != nil {
  116. list[i].Hash = hash.Hex()
  117. }
  118. if err := wait(ctx); err != nil {
  119. errors[i] = err
  120. return
  121. }
  122. list[i].ContentType, err = DetectContentType(f.Name(), f)
  123. if err != nil {
  124. errors[i] = err
  125. return
  126. }
  127. }(i, entry)
  128. }
  129. for i := 0; i < cap(sem); i++ {
  130. sem <- true
  131. }
  132. trie := &manifestTrie{
  133. fileStore: fs.api.fileStore,
  134. }
  135. quitC := make(chan bool)
  136. for i, entry := range list {
  137. if errors[i] != nil {
  138. return "", errors[i]
  139. }
  140. entry.Path = RegularSlashes(entry.Path[start:])
  141. if entry.Path == index {
  142. ientry := newManifestTrieEntry(&ManifestEntry{
  143. ContentType: entry.ContentType,
  144. }, nil)
  145. ientry.Hash = entry.Hash
  146. trie.addEntry(ientry, quitC)
  147. }
  148. trie.addEntry(entry, quitC)
  149. }
  150. err2 := trie.recalcAndStore()
  151. var hs string
  152. if err2 == nil {
  153. hs = trie.ref.Hex()
  154. }
  155. return hs, err2
  156. }
  157. // Download replicates the manifest basePath structure on the local filesystem
  158. // under localpath
  159. //
  160. // DEPRECATED: Use the HTTP API instead
  161. func (fs *FileSystem) Download(bzzpath, localpath string) error {
  162. lpath, err := filepath.Abs(filepath.Clean(localpath))
  163. if err != nil {
  164. return err
  165. }
  166. err = os.MkdirAll(lpath, os.ModePerm)
  167. if err != nil {
  168. return err
  169. }
  170. //resolving host and port
  171. uri, err := Parse(path.Join("bzz:/", bzzpath))
  172. if err != nil {
  173. return err
  174. }
  175. addr, err := fs.api.Resolve(context.TODO(), uri.Addr)
  176. if err != nil {
  177. return err
  178. }
  179. path := uri.Path
  180. if len(path) > 0 {
  181. path += "/"
  182. }
  183. quitC := make(chan bool)
  184. trie, err := loadManifest(context.TODO(), fs.api.fileStore, addr, quitC, NOOPDecrypt)
  185. if err != nil {
  186. log.Warn(fmt.Sprintf("fs.Download: loadManifestTrie error: %v", err))
  187. return err
  188. }
  189. type downloadListEntry struct {
  190. addr storage.Address
  191. path string
  192. }
  193. var list []*downloadListEntry
  194. var mde error
  195. prevPath := lpath
  196. err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) {
  197. log.Trace(fmt.Sprintf("fs.Download: %#v", entry))
  198. addr = common.Hex2Bytes(entry.Hash)
  199. path := lpath + "/" + suffix
  200. dir := filepath.Dir(path)
  201. if dir != prevPath {
  202. mde = os.MkdirAll(dir, os.ModePerm)
  203. prevPath = dir
  204. }
  205. if (mde == nil) && (path != dir+"/") {
  206. list = append(list, &downloadListEntry{addr: addr, path: path})
  207. }
  208. })
  209. if err != nil {
  210. return err
  211. }
  212. wg := sync.WaitGroup{}
  213. errC := make(chan error)
  214. done := make(chan bool, maxParallelFiles)
  215. for i, entry := range list {
  216. select {
  217. case done <- true:
  218. wg.Add(1)
  219. case <-quitC:
  220. return fmt.Errorf("aborted")
  221. }
  222. go func(i int, entry *downloadListEntry) {
  223. defer wg.Done()
  224. err := retrieveToFile(quitC, fs.api.fileStore, entry.addr, entry.path)
  225. if err != nil {
  226. select {
  227. case errC <- err:
  228. case <-quitC:
  229. }
  230. return
  231. }
  232. <-done
  233. }(i, entry)
  234. }
  235. go func() {
  236. wg.Wait()
  237. close(errC)
  238. }()
  239. select {
  240. case err = <-errC:
  241. return err
  242. case <-quitC:
  243. return fmt.Errorf("aborted")
  244. }
  245. }
  246. func retrieveToFile(quitC chan bool, fileStore *storage.FileStore, addr storage.Address, path string) error {
  247. f, err := os.Create(path) // TODO: basePath separators
  248. if err != nil {
  249. return err
  250. }
  251. reader, _ := fileStore.Retrieve(context.TODO(), addr)
  252. writer := bufio.NewWriter(f)
  253. size, err := reader.Size(context.TODO(), quitC)
  254. if err != nil {
  255. return err
  256. }
  257. if _, err = io.CopyN(writer, reader, size); err != nil {
  258. return err
  259. }
  260. if err := writer.Flush(); err != nil {
  261. return err
  262. }
  263. return f.Close()
  264. }