| Viewing file:  test_file.py (13.26 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
#!/usr/bin/env python#
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
 # Copyright (c) 2008-2016 California Institute of Technology.
 # Copyright (c) 2016-2023 The Uncertainty Quantification Foundation.
 # License: 3-clause BSD.  The full license text is available at:
 #  - https://github.com/uqfoundation/dill/blob/master/LICENSE
 
 import os
 import sys
 import string
 import random
 
 import dill
 
 
 dill.settings['recurse'] = True
 
 fname = "_test_file.txt"
 rand_chars = list(string.ascii_letters) + ["\n"] * 40  # bias newline
 
 buffer_error = ValueError("invalid buffer size")
 dne_error = FileNotFoundError("[Errno 2] No such file or directory: '%s'" % fname)
 
 
 def write_randomness(number=200):
 f = open(fname, "w")
 for i in range(number):
 f.write(random.choice(rand_chars))
 f.close()
 f = open(fname, "r")
 contents = f.read()
 f.close()
 return contents
 
 
 def trunc_file():
 open(fname, "w").close()
 
 
 def throws(op, args, exc):
 try:
 op(*args)
 except type(exc):
 return sys.exc_info()[1].args == exc.args
 else:
 return False
 
 
 def teardown_module():
 if os.path.exists(fname):
 os.remove(fname)
 
 
 def bench(strictio, fmode, skippypy):
 import platform
 if skippypy and platform.python_implementation() == 'PyPy':
 # Skip for PyPy...
 return
 
 # file exists, with same contents
 # read
 
 write_randomness()
 
 f = open(fname, "r")
 _f = dill.loads(dill.dumps(f, fmode=fmode))#, strictio=strictio))
 assert _f.mode == f.mode
 assert _f.tell() == f.tell()
 assert _f.read() == f.read()
 f.close()
 _f.close()
 
 # write
 
 f = open(fname, "w")
 f.write("hello")
 f_dumped = dill.dumps(f, fmode=fmode)#, strictio=strictio)
 f1mode = f.mode
 ftell = f.tell()
 f.close()
 f2 = dill.loads(f_dumped) #FIXME: fails due to pypy/issues/1233
 # TypeError: expected py_object instance instead of str
 f2mode = f2.mode
 f2tell = f2.tell()
 f2name = f2.name
 f2.write(" world!")
 f2.close()
 
 if fmode == dill.HANDLE_FMODE:
 assert open(fname).read() == " world!"
 assert f2mode == f1mode
 assert f2tell == 0
 elif fmode == dill.CONTENTS_FMODE:
 assert open(fname).read() == "hello world!"
 assert f2mode == f1mode
 assert f2tell == ftell
 assert f2name == fname
 elif fmode == dill.FILE_FMODE:
 assert open(fname).read() == "hello world!"
 assert f2mode == f1mode
 assert f2tell == ftell
 else:
 raise RuntimeError("Unknown file mode '%s'" % fmode)
 
 # append
 
 trunc_file()
 
 f = open(fname, "a")
 f.write("hello")
 f_dumped = dill.dumps(f, fmode=fmode)#, strictio=strictio)
 f1mode = f.mode
 ftell = f.tell()
 f.close()
 f2 = dill.loads(f_dumped)
 f2mode = f2.mode
 f2tell = f2.tell()
 f2.write(" world!")
 f2.close()
 
 assert f2mode == f1mode
 if fmode == dill.CONTENTS_FMODE:
 assert open(fname).read() == "hello world!"
 assert f2tell == ftell
 elif fmode == dill.HANDLE_FMODE:
 assert open(fname).read() == "hello world!"
 assert f2tell == ftell
 elif fmode == dill.FILE_FMODE:
 assert open(fname).read() == "hello world!"
 assert f2tell == ftell
 else:
 raise RuntimeError("Unknown file mode '%s'" % fmode)
 
 # file exists, with different contents (smaller size)
 # read
 
 write_randomness()
 
 f = open(fname, "r")
 fstr = f.read()
 f_dumped = dill.dumps(f, fmode=fmode)#, strictio=strictio)
 f1mode = f.mode
 ftell = f.tell()
 f.close()
 _flen = 150
 _fstr = write_randomness(number=_flen)
 
 if strictio:  # throw error if ftell > EOF
 assert throws(dill.loads, (f_dumped,), buffer_error)
 else:
 f2 = dill.loads(f_dumped)
 assert f2.mode == f1mode
 if fmode == dill.CONTENTS_FMODE:
 assert f2.tell() == _flen
 assert f2.read() == ""
 f2.seek(0)
 assert f2.read() == _fstr
 assert f2.tell() == _flen  # 150
 elif fmode == dill.HANDLE_FMODE:
 assert f2.tell() == 0
 assert f2.read() == _fstr
 assert f2.tell() == _flen  # 150
 elif fmode == dill.FILE_FMODE:
 assert f2.tell() == ftell  # 200
 assert f2.read() == ""
 f2.seek(0)
 assert f2.read() == fstr
 assert f2.tell() == ftell  # 200
 else:
 raise RuntimeError("Unknown file mode '%s'" % fmode)
 f2.close()
 
 # write
 
 write_randomness()
 
 f = open(fname, "w")
 f.write("hello")
 f_dumped = dill.dumps(f, fmode=fmode)#, strictio=strictio)
 f1mode = f.mode
 ftell = f.tell()
 f.close()
 fstr = open(fname).read()
 
 f = open(fname, "w")
 f.write("h")
 _ftell = f.tell()
 f.close()
 
 if strictio:  # throw error if ftell > EOF
 assert throws(dill.loads, (f_dumped,), buffer_error)
 else:
 f2 = dill.loads(f_dumped)
 f2mode = f2.mode
 f2tell = f2.tell()
 f2.write(" world!")
 f2.close()
 if fmode == dill.CONTENTS_FMODE:
 assert open(fname).read() == "h world!"
 assert f2mode == f1mode
 assert f2tell == _ftell
 elif fmode == dill.HANDLE_FMODE:
 assert open(fname).read() == " world!"
 assert f2mode == f1mode
 assert f2tell == 0
 elif fmode == dill.FILE_FMODE:
 assert open(fname).read() == "hello world!"
 assert f2mode == f1mode
 assert f2tell == ftell
 else:
 raise RuntimeError("Unknown file mode '%s'" % fmode)
 f2.close()
 
 # append
 
 trunc_file()
 
 f = open(fname, "a")
 f.write("hello")
 f_dumped = dill.dumps(f, fmode=fmode)#, strictio=strictio)
 f1mode = f.mode
 ftell = f.tell()
 f.close()
 fstr = open(fname).read()
 
 f = open(fname, "w")
 f.write("h")
 _ftell = f.tell()
 f.close()
 
 if strictio:  # throw error if ftell > EOF
 assert throws(dill.loads, (f_dumped,), buffer_error)
 else:
 f2 = dill.loads(f_dumped)
 f2mode = f2.mode
 f2tell = f2.tell()
 f2.write(" world!")
 f2.close()
 assert f2mode == f1mode
 if fmode == dill.CONTENTS_FMODE:
 # position of writes cannot be changed on some OSs
 assert open(fname).read() == "h world!"
 assert f2tell == _ftell
 elif fmode == dill.HANDLE_FMODE:
 assert open(fname).read() == "h world!"
 assert f2tell == _ftell
 elif fmode == dill.FILE_FMODE:
 assert open(fname).read() == "hello world!"
 assert f2tell == ftell
 else:
 raise RuntimeError("Unknown file mode '%s'" % fmode)
 f2.close()
 
 # file does not exist
 # read
 
 write_randomness()
 
 f = open(fname, "r")
 fstr = f.read()
 f_dumped = dill.dumps(f, fmode=fmode)#, strictio=strictio)
 f1mode = f.mode
 ftell = f.tell()
 f.close()
 
 os.remove(fname)
 
 if strictio:  # throw error if file DNE
 assert throws(dill.loads, (f_dumped,), dne_error)
 else:
 f2 = dill.loads(f_dumped)
 assert f2.mode == f1mode
 if fmode == dill.CONTENTS_FMODE:
 # FIXME: this fails on systems where f2.tell() always returns 0
 # assert f2.tell() == ftell # 200
 assert f2.read() == ""
 f2.seek(0)
 assert f2.read() == ""
 assert f2.tell() == 0
 elif fmode == dill.FILE_FMODE:
 assert f2.tell() == ftell  # 200
 assert f2.read() == ""
 f2.seek(0)
 assert f2.read() == fstr
 assert f2.tell() == ftell  # 200
 elif fmode == dill.HANDLE_FMODE:
 assert f2.tell() == 0
 assert f2.read() == ""
 assert f2.tell() == 0
 else:
 raise RuntimeError("Unknown file mode '%s'" % fmode)
 f2.close()
 
 # write
 
 write_randomness()
 
 f = open(fname, "w+")
 f.write("hello")
 f_dumped = dill.dumps(f, fmode=fmode)#, strictio=strictio)
 ftell = f.tell()
 f1mode = f.mode
 f.close()
 
 os.remove(fname)
 
 if strictio:  # throw error if file DNE
 assert throws(dill.loads, (f_dumped,), dne_error)
 else:
 f2 = dill.loads(f_dumped)
 f2mode = f2.mode
 f2tell = f2.tell()
 f2.write(" world!")
 f2.close()
 if fmode == dill.CONTENTS_FMODE:
 assert open(fname).read() == " world!"
 assert f2mode == 'w+'
 assert f2tell == 0
 elif fmode == dill.HANDLE_FMODE:
 assert open(fname).read() == " world!"
 assert f2mode == f1mode
 assert f2tell == 0
 elif fmode == dill.FILE_FMODE:
 assert open(fname).read() == "hello world!"
 assert f2mode == f1mode
 assert f2tell == ftell
 else:
 raise RuntimeError("Unknown file mode '%s'" % fmode)
 
 # append
 
 trunc_file()
 
 f = open(fname, "a")
 f.write("hello")
 f_dumped = dill.dumps(f, fmode=fmode)#, strictio=strictio)
 ftell = f.tell()
 f1mode = f.mode
 f.close()
 
 os.remove(fname)
 
 if strictio:  # throw error if file DNE
 assert throws(dill.loads, (f_dumped,), dne_error)
 else:
 f2 = dill.loads(f_dumped)
 f2mode = f2.mode
 f2tell = f2.tell()
 f2.write(" world!")
 f2.close()
 assert f2mode == f1mode
 if fmode == dill.CONTENTS_FMODE:
 assert open(fname).read() == " world!"
 assert f2tell == 0
 elif fmode == dill.HANDLE_FMODE:
 assert open(fname).read() == " world!"
 assert f2tell == 0
 elif fmode == dill.FILE_FMODE:
 assert open(fname).read() == "hello world!"
 assert f2tell == ftell
 else:
 raise RuntimeError("Unknown file mode '%s'" % fmode)
 
 # file exists, with different contents (larger size)
 # read
 
 write_randomness()
 
 f = open(fname, "r")
 fstr = f.read()
 f_dumped = dill.dumps(f, fmode=fmode)#, strictio=strictio)
 f1mode = f.mode
 ftell = f.tell()
 f.close()
 _flen = 250
 _fstr = write_randomness(number=_flen)
 
 # XXX: no safe_file: no way to be 'safe'?
 
 f2 = dill.loads(f_dumped)
 assert f2.mode == f1mode
 if fmode == dill.CONTENTS_FMODE:
 assert f2.tell() == ftell  # 200
 assert f2.read() == _fstr[ftell:]
 f2.seek(0)
 assert f2.read() == _fstr
 assert f2.tell() == _flen  # 250
 elif fmode == dill.HANDLE_FMODE:
 assert f2.tell() == 0
 assert f2.read() == _fstr
 assert f2.tell() == _flen  # 250
 elif fmode == dill.FILE_FMODE:
 assert f2.tell() == ftell  # 200
 assert f2.read() == ""
 f2.seek(0)
 assert f2.read() == fstr
 assert f2.tell() == ftell  # 200
 else:
 raise RuntimeError("Unknown file mode '%s'" % fmode)
 f2.close()  # XXX: other alternatives?
 
 # write
 
 f = open(fname, "w")
 f.write("hello")
 f_dumped = dill.dumps(f, fmode=fmode)#, strictio=strictio)
 f1mode = f.mode
 ftell = f.tell()
 
 fstr = open(fname).read()
 
 f.write(" and goodbye!")
 _ftell = f.tell()
 f.close()
 
 # XXX: no safe_file: no way to be 'safe'?
 
 f2 = dill.loads(f_dumped)
 f2mode = f2.mode
 f2tell = f2.tell()
 f2.write(" world!")
 f2.close()
 if fmode == dill.CONTENTS_FMODE:
 assert open(fname).read() == "hello world!odbye!"
 assert f2mode == f1mode
 assert f2tell == ftell
 elif fmode == dill.HANDLE_FMODE:
 assert open(fname).read() == " world!"
 assert f2mode == f1mode
 assert f2tell == 0
 elif fmode == dill.FILE_FMODE:
 assert open(fname).read() == "hello world!"
 assert f2mode == f1mode
 assert f2tell == ftell
 else:
 raise RuntimeError("Unknown file mode '%s'" % fmode)
 f2.close()
 
 # append
 
 trunc_file()
 
 f = open(fname, "a")
 f.write("hello")
 f_dumped = dill.dumps(f, fmode=fmode)#, strictio=strictio)
 f1mode = f.mode
 ftell = f.tell()
 fstr = open(fname).read()
 
 f.write(" and goodbye!")
 _ftell = f.tell()
 f.close()
 
 # XXX: no safe_file: no way to be 'safe'?
 
 f2 = dill.loads(f_dumped)
 f2mode = f2.mode
 f2tell = f2.tell()
 f2.write(" world!")
 f2.close()
 assert f2mode == f1mode
 if fmode == dill.CONTENTS_FMODE:
 assert open(fname).read() == "hello and goodbye! world!"
 assert f2tell == ftell
 elif fmode == dill.HANDLE_FMODE:
 assert open(fname).read() == "hello and goodbye! world!"
 assert f2tell == _ftell
 elif fmode == dill.FILE_FMODE:
 assert open(fname).read() == "hello world!"
 assert f2tell == ftell
 else:
 raise RuntimeError("Unknown file mode '%s'" % fmode)
 f2.close()
 
 
 def test_nostrictio_handlefmode():
 bench(False, dill.HANDLE_FMODE, False)
 teardown_module()
 
 
 def test_nostrictio_filefmode():
 bench(False, dill.FILE_FMODE, False)
 teardown_module()
 
 
 def test_nostrictio_contentsfmode():
 bench(False, dill.CONTENTS_FMODE, True)
 teardown_module()
 
 
 #bench(True, dill.HANDLE_FMODE, False)
 #bench(True, dill.FILE_FMODE, False)
 #bench(True, dill.CONTENTS_FMODE, True)
 
 
 if __name__ == '__main__':
 test_nostrictio_handlefmode()
 test_nostrictio_filefmode()
 test_nostrictio_contentsfmode()
 
 |