"""Define the specification for the dataset."""
import re
import os
import logging
import pandas as pd
from string import Formatter
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy_json import NestedMutableJson
from .core import Base
from .core import uniquify
from .indexer import index
from .utils.gen import copy
from .utils.gen import filename
from .utils.gen import read_yaml
from .utils.gen import get_dir_contents
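# Tags in path patterns take the form "{name<valid1|valid2>|default}"; both
# the "<...>" list of accepted values and the "|default" fallback are
# optional, e.g. "{extension<.txt|.csv>|.txt}" or "{subject}".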
_TAG_PATTERN_TEMPLATE = re.compile(
r"({([\w\d]*?)(?:<([^>]+)>)?(?:\|((?:\.?[\w])+))?\})"
)
@uniquify(index)
class Specification(Base):
"""Generic specification representation."""
__tablename__ = "specifications"
__identifier_attrs__ = {"name"}
name: Mapped[str] = mapped_column(primary_key=True, nullable=False)
    details: Mapped[dict] = mapped_column(NestedMutableJson, nullable=False)
def __init__(self, *, name, details):
self.name = name
self.details = details
def build_path(self, strict=True, **tags):
"""
        Construct a path from a set of tags.

        Parameters
        ----------
        strict : bool
            If True, only patterns with a placeholder for every provided tag
            are considered. If False, extra tags are ignored and the first
            pattern that can be completed is used.
        tags : key-value mappings
            ``name=value`` tag pairs used to fill the path pattern.

        Returns
        -------
        path : str
            The constructed path if successful, else `None`.
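
        Examples
        --------
        A minimal sketch; the tag names and the pattern below are
        illustrative, not taken from a real specification::

            pattern = "{subject}/{subject}_{session}{extension}"
            spec = Specification(
                name="example", details={"path_patterns": [pattern]}
            )
            spec.build_path(subject="01", session="pre", extension="txt")
            # -> '01/01_pre.txt'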
"""
path_patterns = self.details.get("path_patterns")
logging.debug(f"Building path with tags : {tags}")
# Remove none values
tags = {k: v for k, v in tags.items() if v or v == 0}
# Work with extension with or without .
if "extension" in tags:
ext = tags.get("extension")
tags["extension"] = ext if ext.startswith(".") else "." + ext
# Attempt to match pattern with tags and return first match
for pattern in path_patterns:
path = pattern
matches = _TAG_PATTERN_TEMPLATE.findall(pattern)
# Do not tamper with tags provided so that
# it can be used for other patterns
tags_copy = tags.copy()
            # In strict mode, skip patterns that lack a placeholder for any
            # provided tag
            if strict and set(tags_copy.keys()) - {t[1] for t in matches}:
continue
# Validate and fill in missing tags with default value
for subpat, name, valid_vals, default in matches:
                valid = valid_vals.split("|") if valid_vals else []
if valid and name in tags_copy and tags_copy[name] not in valid:
continue
if name not in tags_copy and default:
tags_copy[name] = default
if valid and default and default not in valid:
raise ValueError(f"Inconsistent default in pattern {subpat}")
# Simplify path
path = path.replace(subpat, "{%s}" % name)
# Keep or remove optional tags
optional_patterns = re.findall(r"(\[.*?\])", path)
for op in optional_patterns:
optional_tag = re.findall(r"\{(.*?)\}", op)[0]
path = (
path.replace(op, op[1:-1])
if optional_tag in tags_copy.keys()
else path.replace(op, "")
)
            # Collect the format fields remaining in the path
            fields = [f[1] for f in Formatter().parse(path) if f[1] is not None]
# Proceed only if all field data available
if set(fields) - set(tags_copy.keys()):
continue
# Fill in the fields
path = path.format_map(tags_copy)
return path
return None
@staticmethod
def create_from_file(path):
"""Create Specification from yaml file."""
n, d = filename(path), read_yaml(path)
spec = Specification(name=n, details=d)
index.add(spec)
return spec
    @classmethod
    def get(cls, **identifiers):
        """Fetch a Specification from the index by its identifiers.

        If ``details`` is passed along with the identifiers, the stored
        details must also match it exactly; otherwise ``None`` is returned.
        """
        d = identifiers.pop("details", None)
        obj = index.get(cls, **identifiers)
        if obj and d and obj.details != d:
            return None
        return obj
def organize(self, rules):
"""
        Create an organized copy of the source directory based on the
        given rules.

        Parameters
        ----------
        rules : dict
            Dictionary describing how files are matched, tagged, and copied.
            Must contain the keys 'source', 'destination', 'pattern' and
            'tag_rules'.
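
        Examples
        --------
        A sketch of a minimal ``rules`` mapping; the paths, patterns, and
        tag names below are illustrative only::

            rules = {
                "source": "/data/incoming",
                "destination": "/data/organized",
                "pattern": "[.]dcm$",
                "tag_rules": [
                    {"name": "subject", "pattern": "sub-([0-9]+)"},
                    {"name": "extension", "value": ".dcm"},
                ],
            }
            spec.organize(rules)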
"""
# Check for required keys
for mandatory_key in [
"source",
"destination",
"pattern",
"tag_rules",
]:
if mandatory_key not in rules:
raise KeyError(f"Expected '{mandatory_key}' key in rule.")
src = rules.get("source")
dst = rules.get("destination")
logging.info(f"Organizing {src} based on {self.name} -> {dst}")
overwrite = rules.get("overwrite", False)
if overwrite:
logging.warning("Overwrite set: Existing files will be overwritten")
add = rules.get("add", None)
for a in add or []:
logging.info(f"File {a['path']} will be added as {a['position']}.")
logging.debug(f"Matching contents with pattern {rules.get('pattern')}")
# Organize files matching pattern using rules
matches = get_dir_contents(src, rules["pattern"], rules.get("skip", None))
for file in matches or []:
logging.info(f"Found match with file {file}")
# Extract tags for file
tags = {}
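            # A tag rule either sets a fixed "value" or extracts one with a
            # regex "pattern"; optional keys such as "prepend", "length",
            # "iffy_prepend", "pad", "case", "default", "padding" and
            # "replace" post-process the value before it is stored as a tag.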
for rule in rules.get("tag_rules"):
tag_name = rule.get("name")
logging.debug(f"Foraging for {tag_name} tag")
if "value" in rule:
val = rule.get("value")
tags[tag_name] = val
logging.debug(f"Setting tag with {val}")
else:
match = re.findall(rule.get("pattern"), file)
if match and len(match) != 1:
logging.warning("Expected single match, found more.")
# Choose last match always
val = match[-1] if match else None
logging.debug(f"Matching with pattern yields {val}")
if "prepend" in rule and val:
val = "".join([str(rule.get("prepend")), val])
logging.debug(f"Prepending tag value to get {val}")
if "length" in rule and val and len(val) != rule.get("length"):
if "iffy_prepend" in rule:
logging.debug("Insufficient length, prepending")
val = "".join([str(rule.get("iffy_prepend")), val])
if len(val) != rule.get("length"):
logging.debug("Tag value of insufficient length")
val = None
if "pad" in rule and val:
pad_args = rule["pad"]
val = val.rjust(pad_args["length"], str(pad_args["character"]))
                    if (c := rule.get("case")) in ["lower", "upper"] and val:
                        val = val.lower() if c == "lower" else val.upper()
if "default" in rule and not val:
val = rule.get("default")
logging.debug(f"Using default value of {val} for tag")
if "padding" in rule and val:
pad = rule.get("padding")
# Set defaults
direction = (pad.get("direction", "left"),)
char = (pad.get("char", "0"),)
length = pad["length"]
if direction == "left":
val.rjust(length, char)
elif direction == "right":
val.ljust(length, char)
if "replace" in rule and val:
rep = rule.get("replace")
col, with_, from_ = [rep[x] for x in ["col", "with", "from"]]
logging.info(f"File {from_} will be used to map tag values")
mapping = pd.read_csv(from_, dtype=str)
# TODO: Modularize below code snippet
m = mapping.where(mapping[col] == val).dropna()
if len(m) == 0:
logging.error(f"No mapping found for {val}")
continue
if len(m) > 1:
logging.error(f"Expected unique map for {val}, found many")
continue
val = m[with_].values[0]
if not val:
logging.error(f"Value for {tag_name} tag not found in {file}.")
logging.info(f"File marked with {tag_name}:{val} tag")
tags.update({tag_name: val})
# Warning, clunky code ahead. To be made better
rel_path = self.build_path(**tags)
if not rel_path:
logging.error("Unable to build destination path for file")
continue
new_path = os.path.join(dst, rel_path)
logging.info(f"Target destination path is {new_path}")
copy(file, new_path, overwrite)
logging.info("Moved file to target")
if add:
for addition in add:
if addition["position"] == "content":
addition_path = os.path.join(
new_path, os.path.basename(addition["path"])
)
elif addition["position"] == "fellow":
addition_path = os.path.join(
os.path.dirname(new_path), addition["path"]
)
else:
raise ValueError(
"Expected position to be either content or fellow"
)
copy(addition["path"], addition_path, overwrite)
logging.info(f"Added addition at {addition_path}")
if not rules.get("copy_fellows", False):
continue
# Find fellows
logging.info("Initiating copying of fellow files")
fellows = [
f.path
for f in os.scandir(os.path.dirname(file))
if f.name != os.path.basename(file) and not f.is_dir()
]
logging.info(f"Found {len(fellows)} fellows accompanying the file")
for fellow in fellows:
# Tag changes for fellow
logging.info(f"Changing tags for fellow {fellow}")
tags_copy = tags.copy()
tags_copy.update({"extension": os.path.splitext(fellow)[1]})
for rule in rules.get("rename_rules", []):
if re.findall(rule.get("target"), fellow):
tag_val = rule.get("suffix")
tags_copy.update({"suffix": tag_val})
logging.info(f"File marked with suffix:{tag_val} tag")
# Copy fellow files
rel_path = self.build_path(**tags_copy)
if not rel_path:
logging.error(f"Unable to build destination path for file {file}")
continue
new_path = os.path.join(dst, rel_path)
logging.info(f"Target destination path is {new_path}")
copy(fellow, new_path, overwrite)
@property
def tags(self):
"""Return list of tags defined in the specification."""
        return [t.get("name") for t in self.details.get("tags", [])]
def validate_path(self, path):
"""Return True if path is valid according to specification."""
tags = self.extract_tags(path)
if self.build_path(**tags) == path:
return True
return False
def __repr__(self):
return f"<Specification name: '{self.name}'>"