filesystem.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package api
  17. import (
  18. "bufio"
  19. "context"
  20. "fmt"
  21. "io"
  22. "net/http"
  23. "os"
  24. "path"
  25. "path/filepath"
  26. "sync"
  27. "github.com/ethereum/go-ethereum/common"
  28. "github.com/ethereum/go-ethereum/swarm/log"
  29. "github.com/ethereum/go-ethereum/swarm/storage"
  30. )
  31. const maxParallelFiles = 5
  32. type FileSystem struct {
  33. api *API
  34. }
  35. func NewFileSystem(api *API) *FileSystem {
  36. return &FileSystem{api}
  37. }
  38. // Upload replicates a local directory as a manifest file and uploads it
  39. // using FileStore store
  40. // This function waits the chunks to be stored.
  41. // TODO: localpath should point to a manifest
  42. //
  43. // DEPRECATED: Use the HTTP API instead
  44. func (fs *FileSystem) Upload(lpath, index string, toEncrypt bool) (string, error) {
  45. var list []*manifestTrieEntry
  46. localpath, err := filepath.Abs(filepath.Clean(lpath))
  47. if err != nil {
  48. return "", err
  49. }
  50. f, err := os.Open(localpath)
  51. if err != nil {
  52. return "", err
  53. }
  54. stat, err := f.Stat()
  55. if err != nil {
  56. return "", err
  57. }
  58. var start int
  59. if stat.IsDir() {
  60. start = len(localpath)
  61. log.Debug(fmt.Sprintf("uploading '%s'", localpath))
  62. err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error {
  63. if (err == nil) && !info.IsDir() {
  64. if len(path) <= start {
  65. return fmt.Errorf("Path is too short")
  66. }
  67. if path[:start] != localpath {
  68. return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath)
  69. }
  70. entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(path)}, nil)
  71. list = append(list, entry)
  72. }
  73. return err
  74. })
  75. if err != nil {
  76. return "", err
  77. }
  78. } else {
  79. dir := filepath.Dir(localpath)
  80. start = len(dir)
  81. if len(localpath) <= start {
  82. return "", fmt.Errorf("Path is too short")
  83. }
  84. if localpath[:start] != dir {
  85. return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir)
  86. }
  87. entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(localpath)}, nil)
  88. list = append(list, entry)
  89. }
  90. cnt := len(list)
  91. errors := make([]error, cnt)
  92. done := make(chan bool, maxParallelFiles)
  93. dcnt := 0
  94. awg := &sync.WaitGroup{}
  95. for i, entry := range list {
  96. if i >= dcnt+maxParallelFiles {
  97. <-done
  98. dcnt++
  99. }
  100. awg.Add(1)
  101. go func(i int, entry *manifestTrieEntry, done chan bool) {
  102. f, err := os.Open(entry.Path)
  103. if err == nil {
  104. stat, _ := f.Stat()
  105. var hash storage.Address
  106. var wait func(context.Context) error
  107. ctx := context.TODO()
  108. hash, wait, err = fs.api.fileStore.Store(ctx, f, stat.Size(), toEncrypt)
  109. if hash != nil {
  110. list[i].Hash = hash.Hex()
  111. }
  112. err = wait(ctx)
  113. awg.Done()
  114. if err == nil {
  115. first512 := make([]byte, 512)
  116. fread, _ := f.ReadAt(first512, 0)
  117. if fread > 0 {
  118. mimeType := http.DetectContentType(first512[:fread])
  119. if filepath.Ext(entry.Path) == ".css" {
  120. mimeType = "text/css"
  121. }
  122. list[i].ContentType = mimeType
  123. }
  124. }
  125. f.Close()
  126. }
  127. errors[i] = err
  128. done <- true
  129. }(i, entry, done)
  130. }
  131. for dcnt < cnt {
  132. <-done
  133. dcnt++
  134. }
  135. trie := &manifestTrie{
  136. fileStore: fs.api.fileStore,
  137. }
  138. quitC := make(chan bool)
  139. for i, entry := range list {
  140. if errors[i] != nil {
  141. return "", errors[i]
  142. }
  143. entry.Path = RegularSlashes(entry.Path[start:])
  144. if entry.Path == index {
  145. ientry := newManifestTrieEntry(&ManifestEntry{
  146. ContentType: entry.ContentType,
  147. }, nil)
  148. ientry.Hash = entry.Hash
  149. trie.addEntry(ientry, quitC)
  150. }
  151. trie.addEntry(entry, quitC)
  152. }
  153. err2 := trie.recalcAndStore()
  154. var hs string
  155. if err2 == nil {
  156. hs = trie.ref.Hex()
  157. }
  158. awg.Wait()
  159. return hs, err2
  160. }
  161. // Download replicates the manifest basePath structure on the local filesystem
  162. // under localpath
  163. //
  164. // DEPRECATED: Use the HTTP API instead
  165. func (fs *FileSystem) Download(bzzpath, localpath string) error {
  166. lpath, err := filepath.Abs(filepath.Clean(localpath))
  167. if err != nil {
  168. return err
  169. }
  170. err = os.MkdirAll(lpath, os.ModePerm)
  171. if err != nil {
  172. return err
  173. }
  174. //resolving host and port
  175. uri, err := Parse(path.Join("bzz:/", bzzpath))
  176. if err != nil {
  177. return err
  178. }
  179. addr, err := fs.api.Resolve(context.TODO(), uri)
  180. if err != nil {
  181. return err
  182. }
  183. path := uri.Path
  184. if len(path) > 0 {
  185. path += "/"
  186. }
  187. quitC := make(chan bool)
  188. trie, err := loadManifest(context.TODO(), fs.api.fileStore, addr, quitC)
  189. if err != nil {
  190. log.Warn(fmt.Sprintf("fs.Download: loadManifestTrie error: %v", err))
  191. return err
  192. }
  193. type downloadListEntry struct {
  194. addr storage.Address
  195. path string
  196. }
  197. var list []*downloadListEntry
  198. var mde error
  199. prevPath := lpath
  200. err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) {
  201. log.Trace(fmt.Sprintf("fs.Download: %#v", entry))
  202. addr = common.Hex2Bytes(entry.Hash)
  203. path := lpath + "/" + suffix
  204. dir := filepath.Dir(path)
  205. if dir != prevPath {
  206. mde = os.MkdirAll(dir, os.ModePerm)
  207. prevPath = dir
  208. }
  209. if (mde == nil) && (path != dir+"/") {
  210. list = append(list, &downloadListEntry{addr: addr, path: path})
  211. }
  212. })
  213. if err != nil {
  214. return err
  215. }
  216. wg := sync.WaitGroup{}
  217. errC := make(chan error)
  218. done := make(chan bool, maxParallelFiles)
  219. for i, entry := range list {
  220. select {
  221. case done <- true:
  222. wg.Add(1)
  223. case <-quitC:
  224. return fmt.Errorf("aborted")
  225. }
  226. go func(i int, entry *downloadListEntry) {
  227. defer wg.Done()
  228. err := retrieveToFile(quitC, fs.api.fileStore, entry.addr, entry.path)
  229. if err != nil {
  230. select {
  231. case errC <- err:
  232. case <-quitC:
  233. }
  234. return
  235. }
  236. <-done
  237. }(i, entry)
  238. }
  239. go func() {
  240. wg.Wait()
  241. close(errC)
  242. }()
  243. select {
  244. case err = <-errC:
  245. return err
  246. case <-quitC:
  247. return fmt.Errorf("aborted")
  248. }
  249. }
  250. func retrieveToFile(quitC chan bool, fileStore *storage.FileStore, addr storage.Address, path string) error {
  251. f, err := os.Create(path) // TODO: basePath separators
  252. if err != nil {
  253. return err
  254. }
  255. reader, _ := fileStore.Retrieve(context.TODO(), addr)
  256. writer := bufio.NewWriter(f)
  257. size, err := reader.Size(quitC)
  258. if err != nil {
  259. return err
  260. }
  261. if _, err = io.CopyN(writer, reader, size); err != nil {
  262. return err
  263. }
  264. if err := writer.Flush(); err != nil {
  265. return err
  266. }
  267. return f.Close()
  268. }