sftp_file.py
# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
SFTP file object
"""
  21. from __future__ import with_statement
  22. from binascii import hexlify
  23. from collections import deque
  24. import socket
  25. import threading
  26. import time
  27. from paramiko.common import DEBUG
  28. from paramiko.file import BufferedFile
  29. from paramiko.py3compat import u, long
  30. from paramiko.sftp import (
  31. CMD_CLOSE,
  32. CMD_READ,
  33. CMD_DATA,
  34. SFTPError,
  35. CMD_WRITE,
  36. CMD_STATUS,
  37. CMD_FSTAT,
  38. CMD_ATTRS,
  39. CMD_FSETSTAT,
  40. CMD_EXTENDED,
  41. )
  42. from paramiko.sftp_attr import SFTPAttributes
  43. class SFTPFile(BufferedFile):
  44. """
  45. Proxy object for a file on the remote server, in client mode SFTP.
  46. Instances of this class may be used as context managers in the same way
  47. that built-in Python file objects are.
  48. """
  49. # Some sftp servers will choke if you send read/write requests larger than
  50. # this size.
  51. MAX_REQUEST_SIZE = 32768
  52. def __init__(self, sftp, handle, mode="r", bufsize=-1):
  53. BufferedFile.__init__(self)
  54. self.sftp = sftp
  55. self.handle = handle
  56. BufferedFile._set_mode(self, mode, bufsize)
  57. self.pipelined = False
  58. self._prefetching = False
  59. self._prefetch_done = False
  60. self._prefetch_data = {}
  61. self._prefetch_extents = {}
  62. self._prefetch_lock = threading.Lock()
  63. self._saved_exception = None
  64. self._reqs = deque()
  65. def __del__(self):
  66. self._close(async_=True)
  67. def close(self):
  68. """
  69. Close the file.
  70. """
  71. self._close(async_=False)
  72. def _close(self, async_=False):
  73. # We allow double-close without signaling an error, because real
  74. # Python file objects do. However, we must protect against actually
  75. # sending multiple CMD_CLOSE packets, because after we close our
  76. # handle, the same handle may be re-allocated by the server, and we
  77. # may end up mysteriously closing some random other file. (This is
  78. # especially important because we unconditionally call close() from
  79. # __del__.)
  80. if self._closed:
  81. return
  82. self.sftp._log(DEBUG, "close({})".format(u(hexlify(self.handle))))
  83. if self.pipelined:
  84. self.sftp._finish_responses(self)
  85. BufferedFile.close(self)
  86. try:
  87. if async_:
  88. # GC'd file handle could be called from an arbitrary thread
  89. # -- don't wait for a response
  90. self.sftp._async_request(type(None), CMD_CLOSE, self.handle)
  91. else:
  92. self.sftp._request(CMD_CLOSE, self.handle)
  93. except EOFError:
  94. # may have outlived the Transport connection
  95. pass
  96. except (IOError, socket.error):
  97. # may have outlived the Transport connection
  98. pass
  99. def _data_in_prefetch_requests(self, offset, size):
  100. k = [
  101. x for x in list(self._prefetch_extents.values()) if x[0] <= offset
  102. ]
  103. if len(k) == 0:
  104. return False
  105. k.sort(key=lambda x: x[0])
  106. buf_offset, buf_size = k[-1]
  107. if buf_offset + buf_size <= offset:
  108. # prefetch request ends before this one begins
  109. return False
  110. if buf_offset + buf_size >= offset + size:
  111. # inclusive
  112. return True
  113. # well, we have part of the request. see if another chunk has
  114. # the rest.
  115. return self._data_in_prefetch_requests(
  116. buf_offset + buf_size, offset + size - buf_offset - buf_size
  117. )
  118. def _data_in_prefetch_buffers(self, offset):
  119. """
  120. if a block of data is present in the prefetch buffers, at the given
  121. offset, return the offset of the relevant prefetch buffer. otherwise,
  122. return None. this guarantees nothing about the number of bytes
  123. collected in the prefetch buffer so far.
  124. """
  125. k = [i for i in self._prefetch_data.keys() if i <= offset]
  126. if len(k) == 0:
  127. return None
  128. index = max(k)
  129. buf_offset = offset - index
  130. if buf_offset >= len(self._prefetch_data[index]):
  131. # it's not here
  132. return None
  133. return index
  134. def _read_prefetch(self, size):
  135. """
  136. read data out of the prefetch buffer, if possible. if the data isn't
  137. in the buffer, return None. otherwise, behaves like a normal read.
  138. """
  139. # while not closed, and haven't fetched past the current position,
  140. # and haven't reached EOF...
  141. while True:
  142. offset = self._data_in_prefetch_buffers(self._realpos)
  143. if offset is not None:
  144. break
  145. if self._prefetch_done or self._closed:
  146. break
  147. self.sftp._read_response()
  148. self._check_exception()
  149. if offset is None:
  150. self._prefetching = False
  151. return None
  152. prefetch = self._prefetch_data[offset]
  153. del self._prefetch_data[offset]
  154. buf_offset = self._realpos - offset
  155. if buf_offset > 0:
  156. self._prefetch_data[offset] = prefetch[:buf_offset]
  157. prefetch = prefetch[buf_offset:]
  158. if size < len(prefetch):
  159. self._prefetch_data[self._realpos + size] = prefetch[size:]
  160. prefetch = prefetch[:size]
  161. return prefetch
  162. def _read(self, size):
  163. size = min(size, self.MAX_REQUEST_SIZE)
  164. if self._prefetching:
  165. data = self._read_prefetch(size)
  166. if data is not None:
  167. return data
  168. t, msg = self.sftp._request(
  169. CMD_READ, self.handle, long(self._realpos), int(size)
  170. )
  171. if t != CMD_DATA:
  172. raise SFTPError("Expected data")
  173. return msg.get_string()
  174. def _write(self, data):
  175. # may write less than requested if it would exceed max packet size
  176. chunk = min(len(data), self.MAX_REQUEST_SIZE)
  177. sftp_async_request = self.sftp._async_request(
  178. type(None),
  179. CMD_WRITE,
  180. self.handle,
  181. long(self._realpos),
  182. data[:chunk],
  183. )
  184. self._reqs.append(sftp_async_request)
  185. if not self.pipelined or (
  186. len(self._reqs) > 100 and self.sftp.sock.recv_ready()
  187. ):
  188. while len(self._reqs):
  189. req = self._reqs.popleft()
  190. t, msg = self.sftp._read_response(req)
  191. if t != CMD_STATUS:
  192. raise SFTPError("Expected status")
  193. # convert_status already called
  194. return chunk
  195. def settimeout(self, timeout):
  196. """
  197. Set a timeout on read/write operations on the underlying socket or
  198. ssh `.Channel`.
  199. :param float timeout:
  200. seconds to wait for a pending read/write operation before raising
  201. ``socket.timeout``, or ``None`` for no timeout
  202. .. seealso:: `.Channel.settimeout`
  203. """
  204. self.sftp.sock.settimeout(timeout)
  205. def gettimeout(self):
  206. """
  207. Returns the timeout in seconds (as a `float`) associated with the
  208. socket or ssh `.Channel` used for this file.
  209. .. seealso:: `.Channel.gettimeout`
  210. """
  211. return self.sftp.sock.gettimeout()
  212. def setblocking(self, blocking):
  213. """
  214. Set blocking or non-blocking mode on the underiying socket or ssh
  215. `.Channel`.
  216. :param int blocking:
  217. 0 to set non-blocking mode; non-0 to set blocking mode.
  218. .. seealso:: `.Channel.setblocking`
  219. """
  220. self.sftp.sock.setblocking(blocking)
  221. def seekable(self):
  222. """
  223. Check if the file supports random access.
  224. :return:
  225. `True` if the file supports random access. If `False`,
  226. :meth:`seek` will raise an exception
  227. """
  228. return True
  229. def seek(self, offset, whence=0):
  230. """
  231. Set the file's current position.
  232. See `file.seek` for details.
  233. """
  234. self.flush()
  235. if whence == self.SEEK_SET:
  236. self._realpos = self._pos = offset
  237. elif whence == self.SEEK_CUR:
  238. self._pos += offset
  239. self._realpos = self._pos
  240. else:
  241. self._realpos = self._pos = self._get_size() + offset
  242. self._rbuffer = bytes()
  243. def stat(self):
  244. """
  245. Retrieve information about this file from the remote system. This is
  246. exactly like `.SFTPClient.stat`, except that it operates on an
  247. already-open file.
  248. :returns:
  249. an `.SFTPAttributes` object containing attributes about this file.
  250. """
  251. t, msg = self.sftp._request(CMD_FSTAT, self.handle)
  252. if t != CMD_ATTRS:
  253. raise SFTPError("Expected attributes")
  254. return SFTPAttributes._from_msg(msg)
  255. def chmod(self, mode):
  256. """
  257. Change the mode (permissions) of this file. The permissions are
  258. unix-style and identical to those used by Python's `os.chmod`
  259. function.
  260. :param int mode: new permissions
  261. """
  262. self.sftp._log(
  263. DEBUG, "chmod({}, {!r})".format(hexlify(self.handle), mode)
  264. )
  265. attr = SFTPAttributes()
  266. attr.st_mode = mode
  267. self.sftp._request(CMD_FSETSTAT, self.handle, attr)
  268. def chown(self, uid, gid):
  269. """
  270. Change the owner (``uid``) and group (``gid``) of this file. As with
  271. Python's `os.chown` function, you must pass both arguments, so if you
  272. only want to change one, use `stat` first to retrieve the current
  273. owner and group.
  274. :param int uid: new owner's uid
  275. :param int gid: new group id
  276. """
  277. self.sftp._log(
  278. DEBUG,
  279. "chown({}, {!r}, {!r})".format(hexlify(self.handle), uid, gid),
  280. )
  281. attr = SFTPAttributes()
  282. attr.st_uid, attr.st_gid = uid, gid
  283. self.sftp._request(CMD_FSETSTAT, self.handle, attr)
  284. def utime(self, times):
  285. """
  286. Set the access and modified times of this file. If
  287. ``times`` is ``None``, then the file's access and modified times are
  288. set to the current time. Otherwise, ``times`` must be a 2-tuple of
  289. numbers, of the form ``(atime, mtime)``, which is used to set the
  290. access and modified times, respectively. This bizarre API is mimicked
  291. from Python for the sake of consistency -- I apologize.
  292. :param tuple times:
  293. ``None`` or a tuple of (access time, modified time) in standard
  294. internet epoch time (seconds since 01 January 1970 GMT)
  295. """
  296. if times is None:
  297. times = (time.time(), time.time())
  298. self.sftp._log(
  299. DEBUG, "utime({}, {!r})".format(hexlify(self.handle), times)
  300. )
  301. attr = SFTPAttributes()
  302. attr.st_atime, attr.st_mtime = times
  303. self.sftp._request(CMD_FSETSTAT, self.handle, attr)
  304. def truncate(self, size):
  305. """
  306. Change the size of this file. This usually extends
  307. or shrinks the size of the file, just like the ``truncate()`` method on
  308. Python file objects.
  309. :param size: the new size of the file
  310. """
  311. self.sftp._log(
  312. DEBUG, "truncate({}, {!r})".format(hexlify(self.handle), size)
  313. )
  314. attr = SFTPAttributes()
  315. attr.st_size = size
  316. self.sftp._request(CMD_FSETSTAT, self.handle, attr)
  317. def check(self, hash_algorithm, offset=0, length=0, block_size=0):
  318. """
  319. Ask the server for a hash of a section of this file. This can be used
  320. to verify a successful upload or download, or for various rsync-like
  321. operations.
  322. The file is hashed from ``offset``, for ``length`` bytes.
  323. If ``length`` is 0, the remainder of the file is hashed. Thus, if both
  324. ``offset`` and ``length`` are zero, the entire file is hashed.
  325. Normally, ``block_size`` will be 0 (the default), and this method will
  326. return a byte string representing the requested hash (for example, a
  327. string of length 16 for MD5, or 20 for SHA-1). If a non-zero
  328. ``block_size`` is given, each chunk of the file (from ``offset`` to
  329. ``offset + length``) of ``block_size`` bytes is computed as a separate
  330. hash. The hash results are all concatenated and returned as a single
  331. string.
  332. For example, ``check('sha1', 0, 1024, 512)`` will return a string of
  333. length 40. The first 20 bytes will be the SHA-1 of the first 512 bytes
  334. of the file, and the last 20 bytes will be the SHA-1 of the next 512
  335. bytes.
  336. :param str hash_algorithm:
  337. the name of the hash algorithm to use (normally ``"sha1"`` or
  338. ``"md5"``)
  339. :param offset:
  340. offset into the file to begin hashing (0 means to start from the
  341. beginning)
  342. :param length:
  343. number of bytes to hash (0 means continue to the end of the file)
  344. :param int block_size:
  345. number of bytes to hash per result (must not be less than 256; 0
  346. means to compute only one hash of the entire segment)
  347. :return:
  348. `str` of bytes representing the hash of each block, concatenated
  349. together
  350. :raises:
  351. ``IOError`` -- if the server doesn't support the "check-file"
  352. extension, or possibly doesn't support the hash algorithm requested
  353. .. note:: Many (most?) servers don't support this extension yet.
  354. .. versionadded:: 1.4
  355. """
  356. t, msg = self.sftp._request(
  357. CMD_EXTENDED,
  358. "check-file",
  359. self.handle,
  360. hash_algorithm,
  361. long(offset),
  362. long(length),
  363. block_size,
  364. )
  365. msg.get_text() # ext
  366. msg.get_text() # alg
  367. data = msg.get_remainder()
  368. return data
  369. def set_pipelined(self, pipelined=True):
  370. """
  371. Turn on/off the pipelining of write operations to this file. When
  372. pipelining is on, paramiko won't wait for the server response after
  373. each write operation. Instead, they're collected as they come in. At
  374. the first non-write operation (including `.close`), all remaining
  375. server responses are collected. This means that if there was an error
  376. with one of your later writes, an exception might be thrown from within
  377. `.close` instead of `.write`.
  378. By default, files are not pipelined.
  379. :param bool pipelined:
  380. ``True`` if pipelining should be turned on for this file; ``False``
  381. otherwise
  382. .. versionadded:: 1.5
  383. """
  384. self.pipelined = pipelined
  385. def prefetch(self, file_size=None):
  386. """
  387. Pre-fetch the remaining contents of this file in anticipation of future
  388. `.read` calls. If reading the entire file, pre-fetching can
  389. dramatically improve the download speed by avoiding roundtrip latency.
  390. The file's contents are incrementally buffered in a background thread.
  391. The prefetched data is stored in a buffer until read via the `.read`
  392. method. Once data has been read, it's removed from the buffer. The
  393. data may be read in a random order (using `.seek`); chunks of the
  394. buffer that haven't been read will continue to be buffered.
  395. :param int file_size:
  396. When this is ``None`` (the default), this method calls `stat` to
  397. determine the remote file size. In some situations, doing so can
  398. cause exceptions or hangs (see `#562
  399. <https://github.com/paramiko/paramiko/pull/562>`_); as a
  400. workaround, one may call `stat` explicitly and pass its value in
  401. via this parameter.
  402. .. versionadded:: 1.5.1
  403. .. versionchanged:: 1.16.0
  404. The ``file_size`` parameter was added (with no default value).
  405. .. versionchanged:: 1.16.1
  406. The ``file_size`` parameter was made optional for backwards
  407. compatibility.
  408. """
  409. if file_size is None:
  410. file_size = self.stat().st_size
  411. # queue up async reads for the rest of the file
  412. chunks = []
  413. n = self._realpos
  414. while n < file_size:
  415. chunk = min(self.MAX_REQUEST_SIZE, file_size - n)
  416. chunks.append((n, chunk))
  417. n += chunk
  418. if len(chunks) > 0:
  419. self._start_prefetch(chunks)
  420. def readv(self, chunks):
  421. """
  422. Read a set of blocks from the file by (offset, length). This is more
  423. efficient than doing a series of `.seek` and `.read` calls, since the
  424. prefetch machinery is used to retrieve all the requested blocks at
  425. once.
  426. :param chunks:
  427. a list of ``(offset, length)`` tuples indicating which sections of
  428. the file to read
  429. :return: a list of blocks read, in the same order as in ``chunks``
  430. .. versionadded:: 1.5.4
  431. """
  432. self.sftp._log(
  433. DEBUG, "readv({}, {!r})".format(hexlify(self.handle), chunks)
  434. )
  435. read_chunks = []
  436. for offset, size in chunks:
  437. # don't fetch data that's already in the prefetch buffer
  438. if self._data_in_prefetch_buffers(
  439. offset
  440. ) or self._data_in_prefetch_requests(offset, size):
  441. continue
  442. # break up anything larger than the max read size
  443. while size > 0:
  444. chunk_size = min(size, self.MAX_REQUEST_SIZE)
  445. read_chunks.append((offset, chunk_size))
  446. offset += chunk_size
  447. size -= chunk_size
  448. self._start_prefetch(read_chunks)
  449. # now we can just devolve to a bunch of read()s :)
  450. for x in chunks:
  451. self.seek(x[0])
  452. yield self.read(x[1])
  453. # ...internals...
  454. def _get_size(self):
  455. try:
  456. return self.stat().st_size
  457. except:
  458. return 0
  459. def _start_prefetch(self, chunks):
  460. self._prefetching = True
  461. self._prefetch_done = False
  462. t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
  463. t.daemon = True
  464. t.start()
  465. def _prefetch_thread(self, chunks):
  466. # do these read requests in a temporary thread because there may be
  467. # a lot of them, so it may block.
  468. for offset, length in chunks:
  469. num = self.sftp._async_request(
  470. self, CMD_READ, self.handle, long(offset), int(length)
  471. )
  472. with self._prefetch_lock:
  473. self._prefetch_extents[num] = (offset, length)
  474. def _async_response(self, t, msg, num):
  475. if t == CMD_STATUS:
  476. # save exception and re-raise it on next file operation
  477. try:
  478. self.sftp._convert_status(msg)
  479. except Exception as e:
  480. self._saved_exception = e
  481. return
  482. if t != CMD_DATA:
  483. raise SFTPError("Expected data")
  484. data = msg.get_string()
  485. while True:
  486. with self._prefetch_lock:
  487. # spin if in race with _prefetch_thread
  488. if num in self._prefetch_extents:
  489. offset, length = self._prefetch_extents[num]
  490. self._prefetch_data[offset] = data
  491. del self._prefetch_extents[num]
  492. if len(self._prefetch_extents) == 0:
  493. self._prefetch_done = True
  494. break
  495. def _check_exception(self):
  496. """if there's a saved exception, raise & clear it"""
  497. if self._saved_exception is not None:
  498. x = self._saved_exception
  499. self._saved_exception = None
  500. raise x