// index.js
  1. 'use strict'
  2. const MiniPass = require('minipass')
  3. const EE = require('events').EventEmitter
  4. const fs = require('fs')
  5. // for writev
  6. const binding = process.binding('fs')
  7. const writeBuffers = binding.writeBuffers
  8. /* istanbul ignore next */
  9. const FSReqWrap = binding.FSReqWrap || binding.FSReqCallback
  // Private keys used by the stream classes in this file. Each one is a
  // distinct Symbol, so it cannot clash with string-named properties on a
  // parent class or on objects handed out to consumers.

  // --- keys that hold per-stream state ---
  const _autoClose = Symbol('_autoClose')
  const _defaultFlag = Symbol('_defaultFlag')
  const _ended = Symbol('_ended')
  const _fd = Symbol('_fd')
  const _finished = Symbol('_finished')
  const _flags = Symbol('_flags')
  const _mode = Symbol('_mode')
  const _needDrain = Symbol('_needDrain')
  const _path = Symbol('_path')
  const _pos = Symbol('_pos')
  const _queue = Symbol('_queue')
  const _readSize = Symbol('_readSize')
  const _reading = Symbol('_reading')
  const _remain = Symbol('_remain')
  const _size = Symbol('_size')
  const _writing = Symbol('_writing')

  // --- keys that name internal methods ---
  const _close = Symbol('_close')
  const _flush = Symbol('_flush')
  const _handleChunk = Symbol('_handleChunk')
  const _makeBuf = Symbol('_makeBuf')
  const _onerror = Symbol('_onerror')
  const _onopen = Symbol('_onopen')
  const _onread = Symbol('_onread')
  const _onwrite = Symbol('_onwrite')
  const _open = Symbol('_open')
  const _read = Symbol('_read')
  const _write = Symbol('_write')
  /**
   * Readable file stream built on MiniPass.
   *
   * Opens `path` for reading (unless `opt.fd` is a number), then reads
   * chunks of at most `readSize` bytes and hands them to MiniPass through
   * `super.write()`. The stream ends when a read returns 0 bytes or when
   * `size` bytes have been consumed.
   *
   * Options read by this class (the same object is passed to MiniPass):
   * - `fd` {number}: descriptor to read from; skips the open step
   * - `readSize` {number}: max bytes per read, default 16 MiB
   * - `size` {number}: total bytes to read, default Infinity
   * - `autoClose` {boolean}: close the descriptor on end/error, default true
   *
   * Emits 'open' with the descriptor after a successful open, and 'close'
   * after an auto-closed descriptor has been closed.
   */
  class ReadStream extends MiniPass {
    /**
     * @param {string} path - file to read; must be a string even when
     *   `opt.fd` is supplied
     * @param {object} [opt] - options, see the class description
     * @throws {TypeError} when `path` is not a string
     */
    constructor (path, opt) {
      opt = opt || {}
      super(opt)

      this.writable = false

      if (typeof path !== 'string')
        throw new TypeError('path must be a string')

      this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
      this[_path] = path
      this[_readSize] = opt.readSize || 16*1024*1024
      this[_reading] = false
      this[_size] = typeof opt.size === 'number' ? opt.size : Infinity
      this[_remain] = this[_size]
      this[_autoClose] = typeof opt.autoClose === 'boolean' ?
        opt.autoClose : true

      // with a descriptor in hand we can read immediately; otherwise the
      // first read is started from the open callback
      if (typeof this[_fd] === 'number')
        this[_read]()
      else
        this[_open]()
    }

    /** The current file descriptor, or null when not open / already closed. */
    get fd () { return this[_fd] }

    /** The path given to the constructor. */
    get path () { return this[_path] }

    /** Always throws: data only enters this stream from the file. */
    write () {
      throw new TypeError('this is a readable stream')
    }

    /** Always throws: the stream ends itself when the file is exhausted. */
    end () {
      throw new TypeError('this is a readable stream')
    }

    // open the file read-only and continue in _onopen
    [_open] () {
      fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
    }

    // open callback: route failures to _onerror, otherwise record the
    // descriptor, announce it and start reading
    [_onopen] (er, fd) {
      if (er)
        this[_onerror](er)
      else {
        this[_fd] = fd
        this.emit('open', fd)
        this[_read]()
      }
    }

    // allocate the buffer for the next read, never larger than what is
    // still left to read
    [_makeBuf] () {
      return Buffer.allocUnsafe(Math.min(this[_readSize], this[_remain]))
    }

    // start one asynchronous read unless one is already in flight
    [_read] () {
      if (!this[_reading]) {
        this[_reading] = true
        const buf = this[_makeBuf]()
        // nothing left to read: report a zero-byte read on the next tick
        // instead of calling into fs
        /* istanbul ignore if */
        if (buf.length === 0) return process.nextTick(() => this[_onread](null, 0, buf))
        fs.read(this[_fd], buf, 0, buf.length, null, (er, br, buf) =>
          this[_onread](er, br, buf))
      }
    }

    // read callback: on success pass the chunk on, and keep reading for
    // as long as _handleChunk asks for more
    [_onread] (er, br, buf) {
      this[_reading] = false
      if (er)
        this[_onerror](er)
      else if (this[_handleChunk](br, buf))
        this[_read]()
    }

    // close the descriptor when autoClose is on; 'close' is emitted from
    // the fs.close callback, and the descriptor is forgotten right away so
    // a second call is a no-op
    [_close] () {
      if (this[_autoClose] && typeof this[_fd] === 'number') {
        fs.close(this[_fd], _ => this.emit('close'))
        this[_fd] = null
      }
    }

    // failure path: leaving _reading set makes every later _read() a
    // no-op, then the descriptor is released and the error is emitted
    [_onerror] (er) {
      this[_reading] = true
      this[_close]()
      this.emit('error', er)
    }

    // Account for `br` bytes read into `buf` and push them downstream.
    // Returns what super.write() returned when more data should be read,
    // or false once the stream has been ended here.
    [_handleChunk] (br, buf) {
      let ret = false
      // no effect if infinite
      this[_remain] -= br
      if (br > 0)
        ret = super.write(br < buf.length ? buf.slice(0, br) : buf)

      if (br === 0 || this[_remain] <= 0) {
        ret = false
        this[_close]()
        super.end()
      }

      return ret
    }

    /**
     * Event filter on top of MiniPass#emit.
     *
     * 'prefinish' and 'finish' are swallowed, and 'drain' is turned into
     * the next read instead of being emitted. Every other event is passed
     * to MiniPass with all of its arguments, so listeners of events that
     * carry more than one payload value receive every value.
     *
     * @param {string|symbol} ev - event name
     * @param {*} [data] - first payload argument
     * @param {...*} extra - any further payload arguments
     * @returns {boolean|undefined} result of MiniPass#emit, or undefined
     *   for the events handled here
     */
    emit (ev, data, ...extra) {
      switch (ev) {
        case 'prefinish':
        case 'finish':
          break

        case 'drain':
          if (typeof this[_fd] === 'number')
            this[_read]()
          break

        default:
          return super.emit(ev, data, ...extra)
      }
    }
  }
  /**
   * ReadStream variant that does its file work with the synchronous fs
   * calls (openSync / readSync / closeSync). If opening or reading throws,
   * the descriptor is released through _close() and the original exception
   * keeps propagating to the caller.
   */
  class ReadStreamSync extends ReadStream {
    // open synchronously and hand the descriptor to the inherited _onopen
    [_open] () {
      let succeeded = false
      try {
        const fd = fs.openSync(this[_path], 'r')
        this[_onopen](null, fd)
        succeeded = true
      } finally {
        if (!succeeded)
          this[_close]()
      }
    }

    // read chunk after chunk until _handleChunk reports that no more data
    // should be pulled right now
    [_read] () {
      let succeeded = false
      try {
        if (!this[_reading]) {
          this[_reading] = true
          let wantMore = true
          while (wantMore) {
            const chunk = this[_makeBuf]()
            /* istanbul ignore next */
            const bytesRead = chunk.length === 0 ? 0 : fs.readSync(this[_fd], chunk, 0, chunk.length, null)
            wantMore = this[_handleChunk](bytesRead, chunk)
          }
          this[_reading] = false
        }
        succeeded = true
      } finally {
        if (!succeeded)
          this[_close]()
      }
    }

    // best-effort synchronous close: a closeSync failure is ignored, and
    // 'close' is emitted either way
    [_close] () {
      if (!this[_autoClose] || typeof this[_fd] !== 'number')
        return

      try {
        fs.closeSync(this[_fd])
      } catch (er) {}
      this[_fd] = null
      this.emit('close')
    }
  }
  /**
   * Writable file stream implemented directly on EventEmitter.
   *
   * Chunks passed to write() go straight to the file when the descriptor is
   * open and nothing else is pending; otherwise they are queued and flushed
   * later (several queued chunks are written with one writev call).
   *
   * Options read by this class:
   * - `fd` {number}: descriptor to write to; skips the open step
   * - `mode` {number}: mode passed to fs.open, default 0o666
   * - `start` {number}: byte position to start writing at
   * - `flags` {string}: flags passed to fs.open; when omitted, 'r+' is used
   *   if `start` was given and 'w' otherwise
   * - `autoClose` {boolean}: close the descriptor on finish/error,
   *   default true
   *
   * Emits 'open' (with the descriptor), 'drain', 'finish', 'close' and
   * 'error'.
   */
  class WriteStream extends EE {
    /**
     * @param {string} path - file to write
     * @param {object} [opt] - options, see the class description
     */
    constructor (path, opt) {
      opt = opt || {}
      super(opt)
      this.readable = false
      this[_writing] = false
      this[_ended] = false
      // _onwrite reads this flag to make sure 'finish' is emitted only once
      this[_finished] = false
      this[_needDrain] = false
      this[_queue] = []
      this[_path] = path
      this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
      this[_mode] = opt.mode === undefined ? 0o666 : opt.mode
      this[_pos] = typeof opt.start === 'number' ? opt.start : null
      this[_autoClose] = typeof opt.autoClose === 'boolean' ?
        opt.autoClose : true

      // truncating makes no sense when writing into the middle
      const defaultFlag = this[_pos] !== null ? 'r+' : 'w'
      this[_defaultFlag] = opt.flags === undefined
      this[_flags] = this[_defaultFlag] ? defaultFlag : opt.flags

      if (this[_fd] === null)
        this[_open]()
    }

    /** The current file descriptor, or null when not open / already closed. */
    get fd () { return this[_fd] }

    /** The path given to the constructor. */
    get path () { return this[_path] }

    // failure path: release the descriptor, leave _writing set so that
    // later write() calls are only queued, then report the error
    [_onerror] (er) {
      this[_close]()
      this[_writing] = true
      this.emit('error', er)
    }

    // open the file with the configured flags/mode, continue in _onopen
    [_open] () {
      fs.open(this[_path], this[_flags], this[_mode],
        (er, fd) => this[_onopen](er, fd))
    }

    // open callback. When the flag was defaulted to 'r+' and the file does
    // not exist, retry once with 'w' so the file gets created.
    [_onopen] (er, fd) {
      if (this[_defaultFlag] &&
          this[_flags] === 'r+' &&
          er && er.code === 'ENOENT') {
        this[_flags] = 'w'
        this[_open]()
      } else if (er)
        this[_onerror](er)
      else {
        this[_fd] = fd
        this.emit('open', fd)
        this[_flush]()
      }
    }

    /**
     * Mark the stream as ended, optionally writing one last chunk first.
     * 'finish' is emitted once everything pending has been written.
     *
     * @param {Buffer|string} [buf] - final chunk
     * @param {string} [enc] - encoding used when `buf` is a string
     */
    end (buf, enc) {
      if (buf)
        this.write(buf, enc)

      this[_ended] = true

      // synthetic after-write logic, where drain/finish live
      if (!this[_writing] && !this[_queue].length &&
          typeof this[_fd] === 'number')
        this[_onwrite](null, 0)
    }

    /**
     * Write a chunk, or queue it when the descriptor is not open yet or
     * another write is still pending.
     *
     * @param {Buffer|string} buf - chunk to write
     * @param {string} [enc] - encoding used when `buf` is a string
     * @returns {boolean} true when the chunk was handed to the file system
     *   right away; false when it was queued (wait for 'drain') or when
     *   the stream had already been ended (an 'error' is emitted)
     */
    write (buf, enc) {
      // Buffer.from replaces the deprecated `new Buffer(string, encoding)`
      if (typeof buf === 'string')
        buf = Buffer.from(buf, enc)

      if (this[_ended]) {
        this.emit('error', new Error('write() after end()'))
        return false
      }

      if (this[_fd] === null || this[_writing] || this[_queue].length) {
        this[_queue].push(buf)
        this[_needDrain] = true
        return false
      }

      this[_writing] = true
      this[_write](buf)
      return true
    }

    // write a single buffer at the current position
    [_write] (buf) {
      fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
        this[_onwrite](er, bw))
    }

    // after-write bookkeeping: advance the position, keep flushing while
    // chunks are queued, otherwise emit 'finish' (once, after end()) or
    // 'drain' (when a write() had to be queued)
    [_onwrite] (er, bw) {
      if (er)
        this[_onerror](er)
      else {
        if (this[_pos] !== null)
          this[_pos] += bw
        if (this[_queue].length)
          this[_flush]()
        else {
          this[_writing] = false

          if (this[_ended] && !this[_finished]) {
            this[_finished] = true
            this[_close]()
            this.emit('finish')
          } else if (this[_needDrain]) {
            this[_needDrain] = false
            this.emit('drain')
          }
        }
      }
    }

    // write out whatever is queued: nothing queued only matters when the
    // stream has ended, one chunk goes through _write, several chunks are
    // sent together through writev
    [_flush] () {
      if (this[_queue].length === 0) {
        if (this[_ended])
          this[_onwrite](null, 0)
      } else if (this[_queue].length === 1)
        this[_write](this[_queue].pop())
      else {
        const iovec = this[_queue]
        this[_queue] = []
        writev(this[_fd], iovec, this[_pos],
          (er, bw) => this[_onwrite](er, bw))
      }
    }

    // close the descriptor when autoClose is on; 'close' is emitted from
    // the fs.close callback, and the descriptor is forgotten right away so
    // a second call is a no-op
    [_close] () {
      if (this[_autoClose] && typeof this[_fd] === 'number') {
        fs.close(this[_fd], _ => this.emit('close'))
        this[_fd] = null
      }
    }
  }
  /**
   * WriteStream variant that does its file work with the synchronous fs
   * calls (openSync / writeSync / closeSync).
   */
  class WriteStreamSync extends WriteStream {
    // Open synchronously. When the flag was defaulted to 'r+' and the file
    // does not exist, retry with 'w' so the file gets created; any other
    // open failure is rethrown to the caller.
    [_open] () {
      let fd
      try {
        fd = fs.openSync(this[_path], this[_flags], this[_mode])
      } catch (er) {
        if (this[_defaultFlag] &&
            this[_flags] === 'r+' &&
            er && er.code === 'ENOENT') {
          this[_flags] = 'w'
          return this[_open]()
        } else
          throw er
      }
      this[_onopen](null, fd)
    }

    // best-effort synchronous close: a closeSync failure is ignored, and
    // 'close' is emitted either way
    [_close] () {
      if (this[_autoClose] && typeof this[_fd] === 'number') {
        try {
          fs.closeSync(this[_fd])
        } catch (er) {}
        this[_fd] = null
        this.emit('close')
      }
    }

    // Write one buffer synchronously and run the after-write bookkeeping.
    // Only the writeSync call itself is guarded: _onwrite runs listeners
    // ('drain', 'finish', 'close') and nested flushes, and an exception
    // thrown from those must propagate as-is instead of being reported as
    // a failed write.
    [_write] (buf) {
      let bw
      try {
        bw = fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos])
      } catch (er) {
        this[_onwrite](er, 0)
        return
      }
      this[_onwrite](null, bw)
    }
  }
  /**
   * Write several buffers to a file descriptor with one vectored write.
   *
   * Uses the public fs.writev() when the running Node.js provides it and
   * only falls back to the internal fs binding on runtimes that do not.
   *
   * @param {number} fd - descriptor to write to
   * @param {Buffer[]} iovec - buffers to write, in order
   * @param {number|null} pos - file position to write at, or null to write
   *   at the current position
   * @param {function(Error|null, number, Buffer[])} cb - called with the
   *   error (if any), the number of bytes written, and `iovec`
   */
  const writev = (fd, iovec, pos, cb) => {
    const done = (er, bw) => cb(er, bw, iovec)

    if (typeof fs.writev === 'function') {
      fs.writev(fd, iovec, pos, done)
      return
    }

    // legacy path for runtimes without fs.writev
    /* istanbul ignore next */
    const req = new FSReqWrap()
    /* istanbul ignore next */
    req.oncomplete = done
    /* istanbul ignore next */
    binding.writeBuffers(fd, iovec, pos, req)
  }
  // Public API of the module: the asynchronous and synchronous variants of
  // the read and write streams.
  Object.assign(exports, {
    ReadStream,
    ReadStreamSync,
    WriteStream,
    WriteStreamSync
  })