"""cQuery - Content Object Model traversal.
Attributes:
CONTAINER (str): Metadata storage prefix
Metadata associated with directories are prefixed
with a so-called "container". In Open Metadata land, this means an
additional directory by the name of `~openmetadata.Path.CONTAINER`
UP (flag): Search direction
A flag for :func:`cquery.matches` specifying that the content
traversal should proceed up from the `root` directory. Use this to
retrieve a hierarchy of matches.
DOWN (flag): Search direction
The opposite of the above UP. Use this to retrieve
multiple matches within a given hierarchy, located under `root`
"""
import os
import errno
__all__ = [
'NONE',
'UP',
'DOWN',
'tag',
'detag',
'matches',
'qualify',
'has_class',
'has_id',
'first_match',
'convert',
'TagExists',
'RootExists',
'CONTAINER'
]
# As per the Open Metadata RFC
# http://rfc.abstractfactory.io/spec/10/
CONTAINER = ".meta"
# Directions
NONE = 1 << 0
UP = 1 << 1
DOWN = 1 << 2
class TagExists(OSError):
"""Raised when a selector either exists or does not exist"""
class RootExists(OSError):
"""Raised when a root either exists or does not exist"""
def has_class(root, selector):
"""Test absolute path `root` for class `selector`
Returns:
True if `root` has been tagged with class `selector` else False
"""
return True if first_match(root, selector) is not None else False
def has_id(root, selector):
"""Test absolute path `root` for id `selector`
Returns:
True if `root` has been tagged with id `selector` else False
"""
return True if first_match(root, selector) is not None else False
def tag(root, selector):
"""Tag absolute path `root` with selector `selector`
This function physically tags a directory with metadata
relevant for queries.
Arguments:
root (str): Absolute path at which to write
selector (str): CSS3-compliant selector
Returns:
status (bool): True if success
Raises:
TagExists: If selector `selector` already exists.
RootExists: If root does not exist
Example:
>>> import tempfile
>>> root = tempfile.mkdtemp()
>>> tag(root, ".User")
True
>>> detag(root, ".User")
True
"""
selector = convert(selector)
if not os.path.exists(root):
raise RootExists("{} did not exist".format(root))
container = os.path.join(root, CONTAINER)
if not os.path.exists(container):
os.makedirs(container)
path = os.path.join(container, selector)
# Use os.open() as opposed to __builtin__.open()
# due to support for low-level flags. This only
# creates a new file if no file already exists.
try:
f = os.open(path, os.O_CREAT | os.O_EXCL)
os.close(f)
except OSError as e:
if e.errno == errno.EEXIST:
raise TagExists("%s already exists." % selector)
raise
return True
def detag(root, selector):
"""Detag selector `selector` from absolute path `root`
This function is the inverse of :func:`tag` and in effect
removes a tag from the given directory.
Precondition:
Selector must exists within root
Returns:
status (bool): True if successful, False otherwise
Raises:
TagExist: If selector `selector` at absolute path `root`
does not exists.
Example:
>>> import tempfile
>>> root = tempfile.mkdtemp()
>>> tag(root, ".User")
True
>>> detag(root, ".User")
True
"""
if not os.path.exists(root):
raise RootExists("Root did not exist")
selector = convert(selector)
container = os.path.join(root, CONTAINER)
path = os.path.join(container, selector)
try:
os.remove(path)
except OSError as e:
if e.errno == errno.ENOENT:
raise TagExists("%s does not exist." % selector)
raise
return True
[docs]def convert(selector):
"""Convert CSS3 selector `selector` into compatible file-path
Arguments:
selector (str): CSS3 selector, e.g. .Asset
Returns:
str: Resolved selector
Example:
.. code-block:: bash
$ .Asset --> Asset.class
$ #MyId --> MyId.id
"""
# By Class
if selector.startswith("."):
selector = selector[1:] + '.class'
# By ID
elif selector.startswith("#"):
selector = selector[1:] + '.id'
# By Name
else:
pass
return selector
def qualify(selector):
"""Return fully-qualified selector from `selector`
This function converts `selector` into an a searchable
relative path for :func:`matches`
Arguments:
selector (str): CSS3-compliant selector
Returns:
path (str): Relative path to selector within a root directory
Example:
>>> import os
>>> path = qualify('.Asset')
>>> if os.name == 'nt':
... assert path == ".meta\\Asset.class"
... else:
... assert path == ".meta/Asset.class"
"""
return os.path.join(CONTAINER, convert(selector))
[docs]def matches(root, selector, direction=DOWN, depth=-1):
"""Yield matches at absolute path `root` for selector `selector`
given the direction `direction`.
When looking for a first match only, use :func:`first_match`
Arguments:
root (str): Absolute path from which where to start looking
selector (str): CSS3-compliant selector, e.g. ".Asset"
direction (enum, optional): Search either up or down a hierarchy
depth (int): Depth of traversal; a value of -1 means infinite
Yields:
path (str): Absolute path of next match.
Attributes:
errors: Collection of errors occured during os.walk (NotImplemented)
Raises:
OSError: ENOTDIR is raised if path `root` is not a directory
Example:
>>> import os
>>> paths = list()
>>> for match in matches(os.getcwd(), ".Asset"):
... paths.append(match)
"""
assert isinstance(direction, int)
if os.path.exists(root) and not os.path.isdir(root):
raise OSError(errno.ENOTDIR, "{} is not a directory".format(root))
errors = list()
selector = qualify(selector)
def error_collector(exception):
errors.append(exception)
if direction & DOWN:
for base, dirs, _ in os.walk(root,
topdown=True,
onerror=error_collector):
if os.path.basename(base).startswith("."):
continue
if depth >= 0:
head = base[len(root)+len(os.path.sep) - 1:]
level = head.count(os.path.sep)
if level >= depth:
dirs[:] = [] # Don't recurse any deeper
path = os.path.join(base, selector)
if os.path.isfile(path):
yield base
elif direction & UP:
level = 0
while True:
path = os.path.join(root, selector)
if os.path.isfile(path):
yield root
old_root = root
root = os.path.dirname(root)
if depth >= 0:
level += 1
if level >= depth:
break
if root == old_root:
# Top-level reached
break
elif direction & NONE:
path = os.path.join(root, selector)
if os.path.isfile(path):
yield root
else:
raise ValueError("Direction not recognised: %s" % direction)
[docs]def first_match(root, selector, direction=DOWN, depth=-1):
"""Convenience function for returning a first match from :func:`matches`.
Arguments:
root (str): Absolute path from which where to start loo
selector (str): CSS-style selector, e.g. .Asset
direction (enum): Search either up or down a hierarchy
Returns:
path (str): Absolute path if successful, None otherwise.
Example:
>>> import os
>>> path = first_match(os.getcwd(), ".Asset")
"""
try:
return next(matches(root=root,
selector=selector,
direction=direction,
depth=depth))
except StopIteration:
return None
if __name__ == '__main__':
import doctest
doctest.testmod()