filesystem.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package api
  17. import (
  18. "bufio"
  19. "fmt"
  20. "io"
  21. "net/http"
  22. "os"
  23. "path"
  24. "path/filepath"
  25. "sync"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/swarm/storage"
  29. )
  30. const maxParallelFiles = 5
  31. type FileSystem struct {
  32. api *Api
  33. }
  34. func NewFileSystem(api *Api) *FileSystem {
  35. return &FileSystem{api}
  36. }
  37. // Upload replicates a local directory as a manifest file and uploads it
  38. // using dpa store
  39. // TODO: localpath should point to a manifest
  40. //
  41. // DEPRECATED: Use the HTTP API instead
  42. func (self *FileSystem) Upload(lpath, index string) (string, error) {
  43. var list []*manifestTrieEntry
  44. localpath, err := filepath.Abs(filepath.Clean(lpath))
  45. if err != nil {
  46. return "", err
  47. }
  48. f, err := os.Open(localpath)
  49. if err != nil {
  50. return "", err
  51. }
  52. stat, err := f.Stat()
  53. if err != nil {
  54. return "", err
  55. }
  56. var start int
  57. if stat.IsDir() {
  58. start = len(localpath)
  59. log.Debug(fmt.Sprintf("uploading '%s'", localpath))
  60. err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error {
  61. if (err == nil) && !info.IsDir() {
  62. //fmt.Printf("lp %s path %s\n", localpath, path)
  63. if len(path) <= start {
  64. return fmt.Errorf("Path is too short")
  65. }
  66. if path[:start] != localpath {
  67. return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath)
  68. }
  69. entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(path)}, nil)
  70. list = append(list, entry)
  71. }
  72. return err
  73. })
  74. if err != nil {
  75. return "", err
  76. }
  77. } else {
  78. dir := filepath.Dir(localpath)
  79. start = len(dir)
  80. if len(localpath) <= start {
  81. return "", fmt.Errorf("Path is too short")
  82. }
  83. if localpath[:start] != dir {
  84. return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir)
  85. }
  86. entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(localpath)}, nil)
  87. list = append(list, entry)
  88. }
  89. cnt := len(list)
  90. errors := make([]error, cnt)
  91. done := make(chan bool, maxParallelFiles)
  92. dcnt := 0
  93. awg := &sync.WaitGroup{}
  94. for i, entry := range list {
  95. if i >= dcnt+maxParallelFiles {
  96. <-done
  97. dcnt++
  98. }
  99. awg.Add(1)
  100. go func(i int, entry *manifestTrieEntry, done chan bool) {
  101. f, err := os.Open(entry.Path)
  102. if err == nil {
  103. stat, _ := f.Stat()
  104. var hash storage.Key
  105. wg := &sync.WaitGroup{}
  106. hash, err = self.api.dpa.Store(f, stat.Size(), wg, nil)
  107. if hash != nil {
  108. list[i].Hash = hash.String()
  109. }
  110. wg.Wait()
  111. awg.Done()
  112. if err == nil {
  113. first512 := make([]byte, 512)
  114. fread, _ := f.ReadAt(first512, 0)
  115. if fread > 0 {
  116. mimeType := http.DetectContentType(first512[:fread])
  117. if filepath.Ext(entry.Path) == ".css" {
  118. mimeType = "text/css"
  119. }
  120. list[i].ContentType = mimeType
  121. }
  122. }
  123. f.Close()
  124. }
  125. errors[i] = err
  126. done <- true
  127. }(i, entry, done)
  128. }
  129. for dcnt < cnt {
  130. <-done
  131. dcnt++
  132. }
  133. trie := &manifestTrie{
  134. dpa: self.api.dpa,
  135. }
  136. quitC := make(chan bool)
  137. for i, entry := range list {
  138. if errors[i] != nil {
  139. return "", errors[i]
  140. }
  141. entry.Path = RegularSlashes(entry.Path[start:])
  142. if entry.Path == index {
  143. ientry := newManifestTrieEntry(&ManifestEntry{
  144. ContentType: entry.ContentType,
  145. }, nil)
  146. ientry.Hash = entry.Hash
  147. trie.addEntry(ientry, quitC)
  148. }
  149. trie.addEntry(entry, quitC)
  150. }
  151. err2 := trie.recalcAndStore()
  152. var hs string
  153. if err2 == nil {
  154. hs = trie.hash.String()
  155. }
  156. awg.Wait()
  157. return hs, err2
  158. }
  159. // Download replicates the manifest path structure on the local filesystem
  160. // under localpath
  161. //
  162. // DEPRECATED: Use the HTTP API instead
  163. func (self *FileSystem) Download(bzzpath, localpath string) error {
  164. lpath, err := filepath.Abs(filepath.Clean(localpath))
  165. if err != nil {
  166. return err
  167. }
  168. err = os.MkdirAll(lpath, os.ModePerm)
  169. if err != nil {
  170. return err
  171. }
  172. //resolving host and port
  173. uri, err := Parse(path.Join("bzz:/", bzzpath))
  174. if err != nil {
  175. return err
  176. }
  177. key, err := self.api.Resolve(uri)
  178. if err != nil {
  179. return err
  180. }
  181. path := uri.Path
  182. if len(path) > 0 {
  183. path += "/"
  184. }
  185. quitC := make(chan bool)
  186. trie, err := loadManifest(self.api.dpa, key, quitC)
  187. if err != nil {
  188. log.Warn(fmt.Sprintf("fs.Download: loadManifestTrie error: %v", err))
  189. return err
  190. }
  191. type downloadListEntry struct {
  192. key storage.Key
  193. path string
  194. }
  195. var list []*downloadListEntry
  196. var mde error
  197. prevPath := lpath
  198. err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) {
  199. log.Trace(fmt.Sprintf("fs.Download: %#v", entry))
  200. key = common.Hex2Bytes(entry.Hash)
  201. path := lpath + "/" + suffix
  202. dir := filepath.Dir(path)
  203. if dir != prevPath {
  204. mde = os.MkdirAll(dir, os.ModePerm)
  205. prevPath = dir
  206. }
  207. if (mde == nil) && (path != dir+"/") {
  208. list = append(list, &downloadListEntry{key: key, path: path})
  209. }
  210. })
  211. if err != nil {
  212. return err
  213. }
  214. wg := sync.WaitGroup{}
  215. errC := make(chan error)
  216. done := make(chan bool, maxParallelFiles)
  217. for i, entry := range list {
  218. select {
  219. case done <- true:
  220. wg.Add(1)
  221. case <-quitC:
  222. return fmt.Errorf("aborted")
  223. }
  224. go func(i int, entry *downloadListEntry) {
  225. defer wg.Done()
  226. err := retrieveToFile(quitC, self.api.dpa, entry.key, entry.path)
  227. if err != nil {
  228. select {
  229. case errC <- err:
  230. case <-quitC:
  231. }
  232. return
  233. }
  234. <-done
  235. }(i, entry)
  236. }
  237. go func() {
  238. wg.Wait()
  239. close(errC)
  240. }()
  241. select {
  242. case err = <-errC:
  243. return err
  244. case <-quitC:
  245. return fmt.Errorf("aborted")
  246. }
  247. }
  248. func retrieveToFile(quitC chan bool, dpa *storage.DPA, key storage.Key, path string) error {
  249. f, err := os.Create(path) // TODO: path separators
  250. if err != nil {
  251. return err
  252. }
  253. reader := dpa.Retrieve(key)
  254. writer := bufio.NewWriter(f)
  255. size, err := reader.Size(quitC)
  256. if err != nil {
  257. return err
  258. }
  259. if _, err = io.CopyN(writer, reader, size); err != nil {
  260. return err
  261. }
  262. if err := writer.Flush(); err != nil {
  263. return err
  264. }
  265. return f.Close()
  266. }