123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- #
- # DEPRECATED: implementation for ffi.verify()
- #
- import sys, os, binascii, shutil, io
- from . import __version_verifier_modules__
- from . import ffiplatform
- from .error import VerificationError
if sys.version_info < (3, 3):
    import imp

    def _extension_suffixes():
        """Return the file suffixes that `imp` reports for C extensions."""
        c_suffixes = []
        for suffix, _mode, kind in imp.get_suffixes():
            if kind == imp.C_EXTENSION:
                c_suffixes.append(suffix)
        return c_suffixes
else:
    import importlib.machinery

    def _extension_suffixes():
        """Return a fresh copy of importlib's extension-module suffixes."""
        # Copy so that callers can never alter importlib's own list.
        return list(importlib.machinery.EXTENSION_SUFFIXES)
# NativeIO is an in-memory file that holds the interpreter's native `str`
# type: text on Python 3, bytes on Python 2.
if sys.version_info >= (3,):
    NativeIO = io.StringIO
else:
    class NativeIO(io.BytesIO):
        """Bytes buffer that also accepts ASCII-only unicode strings."""

        def write(self, s):
            """Write `s`, encoding unicode input as ASCII first.

            Returns whatever the underlying BytesIO.write() returns (the
            number of bytes written), so the method honours the same
            contract as the io class it overrides.
            """
            if isinstance(s, unicode):
                s = s.encode('ascii')
            return super(NativeIO, self).write(s)
class Verifier(object):
    """Write, compile, locate and load the C module behind ffi.verify().

    The work is delegated to a "verification engine" object
    (`self._vengine`); this class keeps track of the file names involved
    and of whether the source / the compiled module already exist.
    """

    def __init__(self, ffi, preamble, tmpdir=None, modulename=None,
                 ext_package=None, tag='', force_generic_engine=False,
                 source_extension='.c', flags=None, relative_to=None, **kwds):
        """Prepare the file names and the engine; nothing is written yet.

        ffi                  -- the FFI object being verified
        preamble             -- C source text; part of the module-name hash
        tmpdir               -- directory for the source and module files;
                                defaults to _caller_dir_pycache()
        modulename           -- explicit module name; excludes `tag`
        ext_package          -- package searched by _locate_module()
        tag                  -- text inserted in the generated module name
        force_generic_engine -- forwarded to _locate_engine_class()
        source_extension     -- extension of the generated source file
        flags                -- passed to the engine's load_library() when
                                not None
        relative_to          -- file whose directory is prepended to the
                                file-name keyword lists
        kwds                 -- keywords for ffiplatform.get_extension()

        Raises VerificationError if the cdef uses a feature that is only
        available with ffi.set_source(), and TypeError if both
        `modulename` and `tag` are given.
        """
        new_feature = ffi._parser._uses_new_feature
        if new_feature:
            raise VerificationError(
                "feature not supported with ffi.verify(), but only "
                "with ffi.set_source(): %s" % (new_feature,))
        self.ffi = ffi
        self.preamble = preamble
        # The keywords are flattened for the hash *before* the engine is
        # given the chance to patch them; keep this order.
        if not modulename:
            flattened_kwds = ffiplatform.flatten(kwds)
        engine_cls = _locate_engine_class(ffi, force_generic_engine)
        self._vengine = engine_cls(self)
        self._vengine.patch_extension_kwds(kwds)
        self.flags = flags
        self.kwds = self.make_relative_to(kwds, relative_to)
        #
        if not modulename:
            # Derive a name from everything that influences the C source.
            key_parts = ['%d.%d' % sys.version_info[:2],
                         __version_verifier_modules__,
                         preamble, flattened_kwds]
            key = '\x00'.join(key_parts + ffi._cdefsources)
            if sys.version_info >= (3,):
                key = key.encode('utf-8')
            crc_even = binascii.crc32(key[0::2]) & 0xffffffff
            crc_odd = binascii.crc32(key[1::2]) & 0xffffffff
            # NOTE: the two strips differ on purpose of compatibility:
            # k2 keeps its leading 'x'.  Changing either one would rename
            # every generated module.
            k1 = hex(crc_even).lstrip('0x').rstrip('L')
            k2 = hex(crc_odd).lstrip('0').rstrip('L')
            modulename = '_cffi_%s_%s%s%s' % (
                tag, self._vengine._class_key, k1, k2)
        elif tag:
            raise TypeError("can't specify both 'modulename' and 'tag'")
        suffix = _get_so_suffixes()[0]
        # _caller_dir_pycache() inspects the call stack, so it has to be
        # called directly from here.
        self.tmpdir = tmpdir or _caller_dir_pycache()
        self.sourcefilename = os.path.join(
            self.tmpdir, modulename + source_extension)
        self.modulefilename = os.path.join(self.tmpdir, modulename + suffix)
        self.ext_package = ext_package
        self._has_source = False
        self._has_module = False

    def write_source(self, file=None):
        """Produce the C source code.

        With no `file`, the code goes to 'self.sourcefilename' (which may
        be adjusted beforehand) and VerificationError is raised if that
        was already done.  With a `file`, the code is written to it.
        """
        with self.ffi._lock:
            already_written = self._has_source and file is None
            if already_written:
                raise VerificationError(
                    "source code already written")
            self._write_source(file)

    def compile_module(self):
        """Compile the C source, writing it first if needed.

        The resulting dynamic link library ends up in
        'self.modulefilename'.  Raises VerificationError if the module
        was compiled already.
        """
        with self.ffi._lock:
            if self._has_module:
                raise VerificationError("module already compiled")
            if not self._has_source:
                self._write_source()
            self._compile_module()

    def load_library(self):
        """Return the C module of this Verifier instance.

        The result is an instance of a FFILibrary class that behaves like
        the objects returned by ffi.dlopen(), but delegates all
        operations to the C module.  The C code is written and compiled
        first if no existing module can be located.
        """
        with self.ffi._lock:
            if not self._has_module:
                self._locate_module()
            if not self._has_module:
                # Nothing usable on disk: build it now.
                if not self._has_source:
                    self._write_source()
                self._compile_module()
            return self._load_library()

    def get_module_name(self):
        """Return the bare module name derived from 'self.modulefilename'."""
        # Cut at the first dot: this drops the .so extension as well as
        # the extra dotted parts added by Python 3, as in
        # 'basename.cpython-33m.so'.
        stem = os.path.basename(self.modulefilename).split('.', 1)[0]
        # Python 2 debug builds add '_d'.  Stay conservative: only strip
        # it on a debug interpreter, so a legitimate '_d' survives.
        is_debug_build = hasattr(sys, 'gettotalrefcount')
        if is_debug_build and stem.endswith('_d'):
            stem = stem[:-2]
        return stem

    def get_extension(self):
        """Return ffiplatform's extension object for the generated source.

        The source file is written first if that has not happened yet.
        """
        ffiplatform._hack_at_distutils()  # backward compatibility hack
        if not self._has_source:
            with self.ffi._lock:
                # Checked again now that the lock is held.
                if not self._has_source:
                    self._write_source()
        source_path = ffiplatform.maybe_relative_path(self.sourcefilename)
        module_name = self.get_module_name()
        return ffiplatform.get_extension(source_path, module_name,
                                         **self.kwds)

    def generates_python_module(self):
        """Return the engine's `_gen_python_module` attribute."""
        return self._vengine._gen_python_module

    def make_relative_to(self, kwds, relative_to):
        """Prefix file-name keywords with the directory of `relative_to`.

        If `relative_to` is empty or has no directory part, `kwds` is
        returned unchanged.  Otherwise a copy is returned in which every
        keyword listed in ffiplatform.LIST_OF_FILE_NAMES has its entries
        joined onto that directory.  Raises TypeError if such a keyword
        is not a list or tuple.
        """
        base_dir = os.path.dirname(relative_to) if relative_to else ''
        if not base_dir:
            return kwds
        adjusted = kwds.copy()
        for name in ffiplatform.LIST_OF_FILE_NAMES:
            if name not in adjusted:
                continue
            entries = adjusted[name]
            if not isinstance(entries, (list, tuple)):
                raise TypeError("keyword '%s' should be a list or tuple"
                                % (name,))
            adjusted[name] = [os.path.join(base_dir, fn) for fn in entries]
        return adjusted

    # ----------

    def _locate_module(self):
        """Look for an existing compiled module; set _has_module if found."""
        if not os.path.isfile(self.modulefilename):
            search_path = None
            if self.ext_package:
                try:
                    pkg = __import__(self.ext_package, None, None,
                                     ['__doc__'])
                except ImportError:
                    # cannot import the package itself, give up (e.g. it
                    # might be called differently before installation)
                    return
                search_path = pkg.__path__
            found = self._vengine.find_module(self.get_module_name(),
                                              search_path,
                                              _get_so_suffixes())
            if found is None:
                return
            self.modulefilename = found
        self._vengine.collect_types()
        self._has_module = True

    def _write_source_to(self, file):
        """Have the engine write the source into the file object `file`."""
        engine = self._vengine
        engine._f = file
        try:
            engine.write_source_to_f()
        finally:
            # Never leave the file attached to the engine.
            del engine._f

    def _write_source(self, file=None):
        """Write the source to `file`, or to 'self.sourcefilename'.

        Only the second form marks the source as written.  The file on
        disk is left untouched when it already has the same content.
        """
        if file is not None:
            self._write_source_to(file)
            return
        # Generate the source in memory first.
        buf = NativeIO()
        self._write_source_to(buf)
        source_data = buf.getvalue()
        # Compare with what is on disk, if anything.
        up_to_date = False
        if os.path.exists(self.sourcefilename):
            with open(self.sourcefilename, "r") as fp:
                up_to_date = (fp.read() == source_data)
        # Only rewrite the file when its content would change.
        if not up_to_date:
            _ensure_dir(self.sourcefilename)
            with open(self.sourcefilename, "w") as fp:
                fp.write(source_data)
        self._has_source = True

    def _compile_module(self):
        """Compile the C source and move the result to modulefilename."""
        build_dir = os.path.dirname(self.sourcefilename)
        produced = ffiplatform.compile(build_dir, self.get_extension())
        try:
            in_place = ffiplatform.samefile(produced, self.modulefilename)
        except OSError:
            in_place = False
        if not in_place:
            _ensure_dir(self.modulefilename)
            shutil.move(produced, self.modulefilename)
        self._has_module = True

    def _load_library(self):
        """Ask the engine for the library, passing `flags` if any were set."""
        assert self._has_module
        if self.flags is None:
            return self._vengine.load_library()
        return self._vengine.load_library(self.flags)
- # ____________________________________________________________
_FORCE_GENERIC_ENGINE = False      # for tests


def _locate_engine_class(ffi, force_generic_engine):
    """Pick the verification engine class to use for `ffi`.

    The generic engine is returned when it is requested (by the argument
    or by the module-level _FORCE_GENERIC_ENGINE), when running on PyPy,
    or when `ffi._backend` is not the `_cffi_backend` module.  Otherwise
    the CPython-specific engine is returned.
    """
    use_generic = force_generic_engine or _FORCE_GENERIC_ENGINE
    if not use_generic:
        if '__pypy__' in sys.builtin_module_names:
            use_generic = True
        else:
            try:
                import _cffi_backend as c_backend
            except ImportError:
                # A placeholder that no backend object is identical to.
                c_backend = '?'
            use_generic = ffi._backend is not c_backend
    if not use_generic:
        from . import vengine_cpy
        return vengine_cpy.VCPythonEngine
    from . import vengine_gen
    return vengine_gen.VGenericEngine
- # ____________________________________________________________
_TMPDIR = None


def _caller_dir_pycache():
    """Return the directory in which generated files should be placed.

    In order of preference: the module-level _TMPDIR, the CFFI_TMPDIR
    environment variable, then the absolute path of a '__pycache__'
    directory next to the source file of the code two frames up the call
    stack (i.e. the caller of whoever called this function).
    """
    override = _TMPDIR or os.environ.get('CFFI_TMPDIR')
    if override:
        return override
    # Must stay a direct call: the frame depth is counted from here.
    caller_file = sys._getframe(2).f_code.co_filename
    pycache = os.path.join(os.path.dirname(caller_file), '__pycache__')
    return os.path.abspath(pycache)
def set_tmpdir(dirname):
    """Use `dirname` as the temporary directory instead of __pycache__.

    The value is stored in the module-level _TMPDIR.
    """
    global _TMPDIR
    _TMPDIR = dirname
def cleanup_tmpdir(tmpdir=None, keep_so=False):
    """Clean up the temporary directory, on a best-effort basis.

    Removes the files directly inside `tmpdir` whose name starts with
    `_cffi_` (case-insensitively) and ends with `.c` or, unless `keep_so`
    is true, with the first extension-module suffix.  Then removes every
    file below the `build` subdirectory; the directories themselves are
    left in place.

    `tmpdir` defaults to _caller_dir_pycache().  A directory that cannot
    be listed or a file that cannot be removed is silently skipped.
    """
    # _caller_dir_pycache() inspects the call stack, so it has to be
    # called directly from here.
    tmpdir = tmpdir or _caller_dir_pycache()
    try:
        names = os.listdir(tmpdir)
    except OSError:
        return
    if keep_so:
        doomed_suffixes = ('.c',)       # only remove .c files
    else:
        doomed_suffixes = (_get_so_suffixes()[0].lower(), '.c')
    for name in names:
        lowered = name.lower()
        if lowered.startswith('_cffi_') and lowered.endswith(doomed_suffixes):
            try:
                os.unlink(os.path.join(tmpdir, name))
            except OSError:
                pass
    # Walk the build tree with an explicit work list.
    pending = [os.path.join(tmpdir, 'build')]
    while pending:
        directory = pending.pop()
        try:
            entries = os.listdir(directory)
        except OSError:
            continue
        for entry in entries:
            path = os.path.join(directory, entry)
            if os.path.isdir(path):
                pending.append(path)
                continue
            # Handle failures per file, so that one file that cannot be
            # removed does not stop the cleanup of its siblings.
            try:
                os.unlink(path)
            except OSError:
                pass
def _get_so_suffixes():
    """Return the list of extension-module suffixes, never empty.

    Falls back to a platform default when _extension_suffixes() reports
    nothing.
    """
    found = _extension_suffixes()
    if found:
        return found
    # bah, no C_EXTENSION available.  Occurs on pypy without cpyext
    return [".pyd"] if sys.platform == 'win32' else [".so"]
def _ensure_dir(filename):
    """Create the directory that is to contain `filename`, if missing.

    Does nothing when `filename` has no directory part or when the
    directory already exists.  Raises OSError if the directory cannot be
    created.
    """
    dirname = os.path.dirname(filename)
    if dirname and not os.path.isdir(dirname):
        try:
            os.makedirs(dirname)
        except OSError:
            # Something else may have created the directory between the
            # check above and makedirs(); that is not an error.
            if not os.path.isdir(dirname):
                raise
|