Viewing file: test_largefile.py (10.17 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
"""Test largefile support on system where this makes sense. """
import os import stat import sys import unittest import socket import shutil import threading from test.support import requires, bigmemtest, requires_resource from test.support import SHORT_TIMEOUT from test.support import socket_helper from test.support.os_helper import TESTFN, unlink import io # C implementation of io import _pyio as pyio # Python implementation of io
# size of file to create (>2 GiB; 2 GiB == 2,147,483,648 bytes) size = 2_500_000_000 TESTFN2 = TESTFN + '2'
class LargeFileTest:
def setUp(self): if os.path.exists(TESTFN): mode = 'r+b' else: mode = 'w+b'
with self.open(TESTFN, mode) as f: current_size = os.fstat(f.fileno())[stat.ST_SIZE] if current_size == size+1: return
if current_size == 0: f.write(b'z')
f.seek(0) f.seek(size) f.write(b'a') f.flush() self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size+1)
@classmethod def tearDownClass(cls): with cls.open(TESTFN, 'wb'): pass if not os.stat(TESTFN)[stat.ST_SIZE] == 0: raise cls.failureException('File was not truncated by opening ' 'with mode "wb"') unlink(TESTFN2)
class TestFileMethods(LargeFileTest): """Test that each file function works as expected for large (i.e. > 2 GiB) files. """
# _pyio.FileIO.readall() uses a temporary bytearray then casted to bytes, # so memuse=2 is needed @bigmemtest(size=size, memuse=2, dry_run=False) def test_large_read(self, _size): # bpo-24658: Test that a read greater than 2GB does not fail. with self.open(TESTFN, "rb") as f: self.assertEqual(len(f.read()), size + 1) self.assertEqual(f.tell(), size + 1)
def test_osstat(self): self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size+1)
def test_seek_read(self): with self.open(TESTFN, 'rb') as f: self.assertEqual(f.tell(), 0) self.assertEqual(f.read(1), b'z') self.assertEqual(f.tell(), 1) f.seek(0) self.assertEqual(f.tell(), 0) f.seek(0, 0) self.assertEqual(f.tell(), 0) f.seek(42) self.assertEqual(f.tell(), 42) f.seek(42, 0) self.assertEqual(f.tell(), 42) f.seek(42, 1) self.assertEqual(f.tell(), 84) f.seek(0, 1) self.assertEqual(f.tell(), 84) f.seek(0, 2) # seek from the end self.assertEqual(f.tell(), size + 1 + 0) f.seek(-10, 2) self.assertEqual(f.tell(), size + 1 - 10) f.seek(-size-1, 2) self.assertEqual(f.tell(), 0) f.seek(size) self.assertEqual(f.tell(), size) # the 'a' that was written at the end of file above self.assertEqual(f.read(1), b'a') f.seek(-size-1, 1) self.assertEqual(f.read(1), b'z') self.assertEqual(f.tell(), 1)
def test_lseek(self): with self.open(TESTFN, 'rb') as f: self.assertEqual(os.lseek(f.fileno(), 0, 0), 0) self.assertEqual(os.lseek(f.fileno(), 42, 0), 42) self.assertEqual(os.lseek(f.fileno(), 42, 1), 84) self.assertEqual(os.lseek(f.fileno(), 0, 1), 84) self.assertEqual(os.lseek(f.fileno(), 0, 2), size+1+0) self.assertEqual(os.lseek(f.fileno(), -10, 2), size+1-10) self.assertEqual(os.lseek(f.fileno(), -size-1, 2), 0) self.assertEqual(os.lseek(f.fileno(), size, 0), size) # the 'a' that was written at the end of file above self.assertEqual(f.read(1), b'a')
def test_truncate(self): with self.open(TESTFN, 'r+b') as f: if not hasattr(f, 'truncate'): raise unittest.SkipTest("open().truncate() not available " "on this system") f.seek(0, 2) # else we've lost track of the true size self.assertEqual(f.tell(), size+1) # Cut it back via seek + truncate with no argument. newsize = size - 10 f.seek(newsize) f.truncate() self.assertEqual(f.tell(), newsize) # else pointer moved f.seek(0, 2) self.assertEqual(f.tell(), newsize) # else wasn't truncated # Ensure that truncate(smaller than true size) shrinks # the file. newsize -= 1 f.seek(42) f.truncate(newsize) self.assertEqual(f.tell(), 42) f.seek(0, 2) self.assertEqual(f.tell(), newsize) # XXX truncate(larger than true size) is ill-defined # across platform; cut it waaaaay back f.seek(0) f.truncate(1) self.assertEqual(f.tell(), 0) # else pointer moved f.seek(0) self.assertEqual(len(f.read()), 1) # else wasn't truncated
def test_seekable(self): # Issue #5016; seekable() can return False when the current position # is negative when truncated to an int. for pos in (2**31-1, 2**31, 2**31+1): with self.open(TESTFN, 'rb') as f: f.seek(pos) self.assertTrue(f.seekable())
def skip_no_disk_space(path, required): def decorator(fun): def wrapper(*args, **kwargs): if not hasattr(shutil, "disk_usage"): raise unittest.SkipTest("requires shutil.disk_usage") if shutil.disk_usage(os.path.realpath(path)).free < required: hsize = int(required / 1024 / 1024) raise unittest.SkipTest( f"required {hsize} MiB of free disk space") return fun(*args, **kwargs) return wrapper return decorator
class TestCopyfile(LargeFileTest, unittest.TestCase): open = staticmethod(io.open)
# Exact required disk space would be (size * 2), but let's give it a # bit more tolerance. @skip_no_disk_space(TESTFN, size * 2.5) @requires_resource('cpu') def test_it(self): # Internally shutil.copyfile() can use "fast copy" methods like # os.sendfile(). size = os.path.getsize(TESTFN) shutil.copyfile(TESTFN, TESTFN2) self.assertEqual(os.path.getsize(TESTFN2), size) with open(TESTFN2, 'rb') as f: self.assertEqual(f.read(5), b'z\x00\x00\x00\x00') f.seek(size - 5) self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
@unittest.skipIf(not hasattr(os, 'sendfile'), 'sendfile not supported') class TestSocketSendfile(LargeFileTest, unittest.TestCase): open = staticmethod(io.open) timeout = SHORT_TIMEOUT
def setUp(self): super().setUp() self.thread = None
def tearDown(self): super().tearDown() if self.thread is not None: self.thread.join(self.timeout) self.thread = None
def tcp_server(self, sock): def run(sock): with sock: conn, _ = sock.accept() conn.settimeout(self.timeout) with conn, open(TESTFN2, 'wb') as f: event.wait(self.timeout) while True: chunk = conn.recv(65536) if not chunk: return f.write(chunk)
event = threading.Event() sock.settimeout(self.timeout) self.thread = threading.Thread(target=run, args=(sock, )) self.thread.start() event.set()
# Exact required disk space would be (size * 2), but let's give it a # bit more tolerance. @skip_no_disk_space(TESTFN, size * 2.5) @requires_resource('cpu') def test_it(self): port = socket_helper.find_unused_port() with socket.create_server(("", port)) as sock: self.tcp_server(sock) with socket.create_connection(("127.0.0.1", port)) as client: with open(TESTFN, 'rb') as f: client.sendfile(f) self.tearDown()
size = os.path.getsize(TESTFN) self.assertEqual(os.path.getsize(TESTFN2), size) with open(TESTFN2, 'rb') as f: self.assertEqual(f.read(5), b'z\x00\x00\x00\x00') f.seek(size - 5) self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
def setUpModule(): try: import signal # The default handler for SIGXFSZ is to abort the process. # By ignoring it, system calls exceeding the file size resource # limit will raise OSError instead of crashing the interpreter. signal.signal(signal.SIGXFSZ, signal.SIG_IGN) except (ImportError, AttributeError): pass
# On Windows and Mac OSX this test consumes large resources; It # takes a long time to build the >2 GiB file and takes >2 GiB of disk # space therefore the resource must be enabled to run this test. # If not, nothing after this line stanza will be executed. if sys.platform[:3] == 'win' or sys.platform == 'darwin': requires('largefile', 'test requires %s bytes and a long time to run' % str(size)) else: # Only run if the current filesystem supports large files. # (Skip this test on Windows, since we now always support # large files.) f = open(TESTFN, 'wb', buffering=0) try: # 2**31 == 2147483648 f.seek(2147483649) # Seeking is not enough of a test: you must write and flush, too! f.write(b'x') f.flush() except (OSError, OverflowError): raise unittest.SkipTest("filesystem does not have " "largefile support") finally: f.close() unlink(TESTFN)
class CLargeFileTest(TestFileMethods, unittest.TestCase): open = staticmethod(io.open)
class PyLargeFileTest(TestFileMethods, unittest.TestCase): open = staticmethod(pyio.open)
def tearDownModule(): unlink(TESTFN) unlink(TESTFN2)
if __name__ == '__main__': unittest.main()
|