Source code for cvfe.data.functional

"""
Contains implementations of functions that can be used for processing data anywhere and
    are not necessarily bound to a class.

"""

# The docstring above must be the first statement of the module so that it is
# actually stored as the module's ``__doc__``; names listed below form the
# public API exported by ``from cvfe.data.functional import *``.
__all__ = [
    "dict_summarizer",
    "dict_to_csv",
    "key_dropper",
    "fillna_datetime",
    "aggregate_datetime",
    "tag_to_regex_compatible",
    "change_dtype",
    "flatten_dict",
    "xml_to_flattened_dict",
    "process_directory",
]

import csv
import datetime
import logging
import os
import re
from copy import deepcopy
from fnmatch import fnmatch
from typing import Any, Callable, Iterable, Optional, cast

import xmltodict
from dateutil import parser

from cvfe.data.constant import *
from cvfe.data.preprocessor import FileTransformCompose

# module-level logger, named after this module via ``__name__``
logger = logging.getLogger(__name__)


def drop(dictionary: dict[str, Any], keys: Iterable[str]) -> None:
    """Delete the given ``keys`` from ``dictionary`` in place

    Args:
        dictionary (dict[str, Any]): The dictionary to be modified
        keys (Iterable[str]): Keys that should be removed. A key that is
            not present in ``dictionary`` is silently ignored.
    """

    for name in keys:
        # absent keys are skipped instead of raising ``KeyError``
        if name in dictionary:
            del dictionary[name]


def fillna(original: Any, new: Any) -> Any:
    """Returns ``original`` unless it is None, in which case ``new`` is returned

    Args:
        original (Any): Value to keep when it is not None
        new (Any): Replacement used only when ``original`` is None

    Returns:
        Any: ``new`` if ``original`` is None, otherwise ``original``
    """

    if original is None:
        return new
    return original


def dict_summarizer(
    data_dict: dict[str, Any],
    cutoff_term: str,
    KEY_ABBREVIATION_DICT: Optional[dict] = None,
    VALUE_ABBREVIATION_DICT: Optional[dict] = None,
) -> dict[str, Any]:
    """Takes a flattened dictionary and shortens its keys (and values)

    Args:
        data_dict (dict[str, Any]): The dictionary to be shortened
        cutoff_term (str): The string to look for in keys/values; everything
            up to and including it (plus one separator character) is removed
        KEY_ABBREVIATION_DICT (dict, optional): A dictionary containing
            abbreviation mapping (regex pattern -> replacement) for keys.
            If None, keys are left untouched. Defaults to None.
        VALUE_ABBREVIATION_DICT (dict, optional): A dictionary containing
            abbreviation mapping (regex pattern -> replacement) for values.
            If None, values are left untouched. Defaults to None.

    Note:
        Only string values are shortened; ``None`` and any other non-string
        value is copied to the result unchanged.

    Returns:
        dict[str, Any]: A new dict with shortened keys (and values) obtained
        by throwing away the part before ``cutoff_term`` and applying the
        abbreviation dictionaries.
    """

    def shorten(text: str, abbreviations: dict) -> str:
        # FIXME: cutoff part should be outside of abbreviation
        if cutoff_term in text:
            # the extra ``+ 1`` also skips the separator that follows the term
            text = text[text.index(cutoff_term) + len(cutoff_term) + 1 :]
        # add any filtering here
        # abbreviation
        for word, abbr in abbreviations.items():
            text = re.sub(word, abbr, text)
        return text

    # Build the result item by item. Going through an intermediate
    # ``{old_value: new_value}`` mapping would require hashable values and
    # would merge values that compare equal (e.g. ``1`` and ``True``).
    summarized: dict[str, Any] = {}
    for key, value in data_dict.items():
        new_key = key
        if KEY_ABBREVIATION_DICT is not None:
            new_key = shorten(key, KEY_ABBREVIATION_DICT)

        new_value = value
        if VALUE_ABBREVIATION_DICT is not None and isinstance(value, str):
            new_value = shorten(value, VALUE_ABBREVIATION_DICT)

        summarized[new_key] = new_value
    return summarized
def dict_to_csv(data_dict: dict[str, Any], path: str) -> None:
    """Takes a flattened dictionary and writes it to a CSV file.

    The file contains a header row made of the dictionary keys followed by
    a single row holding the dictionary values.

    Args:
        data_dict (dict[str, Any]): A dictionary to be saved
        path (str): Path to the output file (will be created if not exist)
    """

    # ``newline=""`` is required by the ``csv`` module: the writer emits its
    # own line terminators, and letting the text layer translate them again
    # produces blank lines between rows on platforms such as Windows.
    with open(path, "w", newline="") as f:
        w = csv.DictWriter(f, data_dict.keys())
        w.writeheader()
        w.writerow(data_dict)
def key_dropper(
    data_dict: dict[str, Any],
    string: str,
    exclude: Optional[str] = None,
    regex: bool = False,
    inplace: bool = True,
) -> Optional[dict[str, Any]]:
    """Takes a dictionary and drops keys matching a pattern

    Args:
        data_dict (dict[str, Any]): Dictionary to be processed
        string (str): string to look for in ``data_dict`` keys
        exclude (Optional[str], optional): keys containing this string are
            protected from being dropped. Defaults to None.
        regex (bool, optional): compile ``string`` as a regex and test it
            with ``re.match`` (i.e. anchored at the beginning of the key).
            Defaults to False.
        inplace (bool, optional): whether to modify ``data_dict`` itself or
            a deep copy of it. Defaults to True.

    Returns:
        Optional[dict[str, Any]]: ``None`` when ``inplace`` is True,
        otherwise a deep copy of ``data_dict`` without the selected keys.
    """

    # choose how a single key is tested against ``string``
    if regex:
        is_selected = re.compile(string).match
    else:

        def is_selected(key):
            return string in key

    doomed = [key for key in list(data_dict.keys()) if is_selected(key)]
    if exclude is not None:
        doomed = [key for key in doomed if exclude not in key]

    # work either on the caller's dictionary or on an independent copy
    target = data_dict if inplace else deepcopy(data_dict)
    for key in doomed:
        target.pop(key, None)

    if inplace:
        return None
    return target
def fillna_datetime(
    data_dict: dict[str, Any],
    key_base_name: str,
    date: str,
    doc_type: DocTypes,
    one_sided: str | bool = False,
    inplace: bool = False,
) -> Optional[dict[str, Any]]:
    """Takes names of two keys with dates value (start, end) and fills them with a predefined value

    Args:
        data_dict (dict[str, Any]): A dictionary to be processed
        key_base_name (str): Base key name that accepts ``'From'`` and ``'To'``
            for extracting dates of same category
        date (str): The desired date used to replace ``None`` values
        doc_type (DocTypes): :class:`DocTypes <cvfe.data.constant.DocTypes>`
            used to use rules for matching tags and filling appropriately.
        one_sided (str | bool, optional): When falsy, keys of the form
            ``<key_base_name>.From*`` / ``<key_base_name>.To*`` are filled.
            When truthy (e.g. ``'right'`` or ``'left'``), there is no
            ``From``/``To`` suffix and every key starting with
            ``key_base_name`` is filled. Defaults to False.
        inplace (bool, optional): whether or not use an inplace operation.
            Defaults to False.

    Note:
        In transformation operations such as :func:`aggregate_datetime`
        function, this would be converted to period of zero. It is useful
        for filling periods of non existing items (e.g. age of children for
        single person).

    Returns:
        Optional[dict[str, Any]]: ``None`` when ``inplace`` is True.
        Otherwise a deep copy of ``data_dict`` in which the matched keys
        that had no value (None) are set to ``date``.
    """

    pattern = tag_to_regex_compatible(string=key_base_name, doc_type=doc_type)
    if not one_sided:
        # two-sided: same convention as ``aggregate_datetime`` -- the keys
        # carry a ``From``/``To`` suffix
        pattern += r"\.(From|To).+"
    matcher = re.compile(pattern)

    # copy once (not per key) and fill the copy, so the caller's dictionary
    # is left untouched when ``inplace`` is False
    target = data_dict if inplace else deepcopy(data_dict)
    keys_to_fillna_names = [key for key in target if matcher.match(key)]
    for key in keys_to_fillna_names:
        target[key] = fillna(original=target[key], new=date)

    return None if inplace else target
def aggregate_datetime(
    data_dict: dict[str, Any],
    key_base_name: str,
    new_key_name: str,
    doc_type: DocTypes,
    if_nan: str | Callable = "skip",
    one_sided: Optional[str] = None,
    reference_date: Optional[str] = None,
    current_date: Optional[str] = None,
    **kwargs,
) -> dict[str, Any]:
    """Takes two keys of dates in string form and calculates the period of them

    Args:
        data_dict (dict[str, Any]): A dictionary to be processed (it is
            modified in place and also returned)
        key_base_name (str): Base key name that accepts ``'From'`` and ``'To'``
            for extracting dates of same category
        new_key_name (str): The key name that extends ``key_base_name`` and
            will be the final key containing the period.
        doc_type (DocTypes): document type used to use rules for matching
            tags and filling appropriately.
            See :class:`DocTypes <cvfe.data.constant.DocTypes>`.
        if_nan (str | Callable, optional): What to do with None s (NaN).
            Only the predefined state ``'skip'`` is acted upon here:

                1. ``'skip'``: return ``data_dict`` unchanged when either
                   date is ``None``.

            Defaults to ``'skip'``.
        one_sided (Optional[str], optional): Different ways of filling empty
            date keys. Defaults to None. Could be one of the following:

                1. ``'right'``: Uses the ``current_date`` as the final time
                2. ``'left'``: Uses the ``reference_date`` as the starting time

        reference_date (Optional[str], optional): Assumed ``reference_date``
            (t0<t1). When given, it is used as the start date instead of the
            value stored in ``data_dict``. Defaults to None.
        current_date (Optional[str], optional): Assumed ``current_date``
            (t1>t0). When given, it is used as the end date instead of the
            value stored in ``data_dict``. Defaults to None.
        default_datetime: accepts `datetime.datetime`_ to set default date
            for `dateutil.parser.parse`_ (passed via ``kwargs``).

    Raises:
        ValueError: if ``one_sided`` is ``'left'`` without a
            ``reference_date`` or ``'right'`` without a ``current_date``.
        IndexError: if no key of ``data_dict`` matches the expected pattern.

    Returns:
        dict[str, Any]: ``data_dict`` with a new key
        ``<key_base_name>.<new_key_name>`` holding the period between the
        two dates as an integer number of days. The keys used for the
        calculation are dropped.

    .. _datetime.datetime: https://docs.python.org/3/library/datetime.html
    .. _dateutil.parser.parse: https://dateutil.readthedocs.io/en/stable/parser.html
    """

    default_datetime = kwargs.get(
        "default_datetime",
        datetime.datetime(
            year=DATEUTIL_DEFAULT_DATETIME["year"],
            month=DATEUTIL_DEFAULT_DATETIME["month"],
            day=DATEUTIL_DEFAULT_DATETIME["day"],
        ),
    )

    def parse_explicit(value: Any) -> Any:
        # explicitly supplied dates are parsed only when given as strings
        if isinstance(value, str):
            return parser.parse(value, default=default_datetime)
        return value

    def parse_entry(key: str) -> Any:
        # Parse ``data_dict[key]`` and store the parsed value back, so that
        # already parsed data (e.g. coming from fillna) can be reused.
        value = data_dict[key]
        if value is not None and not isinstance(value, datetime.datetime):
            value = parser.parse(value, default=default_datetime)
            data_dict[key] = value
        return value

    aggregated_key_name = key_base_name + "." + new_key_name
    pattern = tag_to_regex_compatible(string=key_base_name, doc_type=doc_type)
    if one_sided is None:
        # when one_sided, we no longer have *From* or *To*
        pattern += r"\.(From|To).+"
    matcher = re.compile(pattern)
    keys_to_aggregate_names = [key for key in data_dict if matcher.match(key)]

    # *.FromDate and *.ToDate --> *.Period
    from_date = to_date = None
    if one_sided == "left":
        to_date = keys_to_aggregate_names[0]
        if reference_date is None:
            raise ValueError('`reference_date` is required when `one_sided="left"`.')
    elif one_sided == "right":
        from_date = keys_to_aggregate_names[0]
        if current_date is None:
            raise ValueError('`current_date` is required when `one_sided="right"`.')
    else:
        from_date = [col for col in keys_to_aggregate_names if "From" in col][0]
        to_date = [col for col in keys_to_aggregate_names if "To" in col][0]

    # explicit dates take precedence; the dictionary entries are only
    # consulted when the corresponding explicit date is not provided
    key_to_date = parse_explicit(current_date)
    key_from_date = parse_explicit(reference_date)
    if key_from_date is None:
        key_from_date = parse_entry(from_date)
    if key_to_date is None:
        key_to_date = parse_entry(to_date)

    if if_nan == "skip" and (key_from_date is None or key_to_date is None):
        return data_dict

    # combination of dates
    data_dict[aggregated_key_name] = (key_to_date - key_from_date).days
    assert isinstance(data_dict[aggregated_key_name], int)  # days must be int

    # drop start and end date keys
    drop(dictionary=data_dict, keys=keys_to_aggregate_names)
    return data_dict
def tag_to_regex_compatible(string: str, doc_type: DocTypes) -> str:
    """Takes a string and makes it regex compatible for XML parsed string

    Note:
        This is specialized method and it may be better to override it for
        your own case.

    Args:
        string (str): input string to get manipulated
        doc_type (DocTypes): specified
            :class:`DocTypes <cvfe.data.constant.DocTypes>` to determine
            regex rules

    Returns:
        str: For the Canada document types, ``string`` with ``.``, ``[`` and
        ``]`` backslash-escaped; for any other document type, ``string``
        unchanged.
    """

    canada_doc_types = (
        DocTypes.CANADA_5257E,
        DocTypes.CANADA_5645E,
        DocTypes.CANADA,
    )
    if doc_type in canada_doc_types:
        # raw strings: ``"\."`` in a normal literal is an invalid escape
        # sequence (same runtime value, but warns on recent Python versions)
        string = string.replace(".", r"\.").replace("[", r"\[").replace("]", r"\]")
    return string
def change_dtype(
    data_dict: dict[str, Any],
    key_name: str,
    dtype: Callable,
    if_nan: str | Callable = "skip",
    **kwargs,
) -> dict[str, Any]:
    """Changes the data type of a key with ability to fill ``None`` s (fillna)

    Args:
        data_dict (dict[str, Any]): A dictionary that ``key_name`` will be
            searched on (it is modified in place and also returned)
        key_name (str): Desired key name of the dictionary
        dtype (Callable): target data type as a function e.g. ``float``
        if_nan (str, Callable, optional): What to do with None s (NaN).
            Defaults to ``'skip'``. Could be a function (called with the
            ``None`` value, its result is stored) or one of the predefined
            states as follow:

                1. ``'skip'``: do nothing (i.e. ignore ``None`` s)
                2. ``'fill'``: fill the None with ``value`` argument
                   via ``kwargs``

        default_datetime(optional): accepts `datetime.datetime`_ to set
            default date for `dateutil.parser.parse`_ (passed via ``kwargs``)

    Raises:
        ValueError: if string mode passed to ``if_nan`` does not exist.
            It won't raise if ``if_nan`` is ``Callable``.
        KeyError: if ``key_name`` is not in ``data_dict`` or if
            ``if_nan='fill'`` is used without a ``value`` keyword argument.

    Returns:
        dict[str, Any]: ``data_dict`` where the value of ``key_name`` is
        converted with ``dtype`` (as an ISO formatted string when ``dtype``
        is ``dateutil.parser.parse``), or handled according to ``if_nan``
        when the value is ``None``.

    .. _datetime.datetime: https://docs.python.org/3/library/datetime.html
    .. _dateutil.parser.parse: https://dateutil.readthedocs.io/en/stable/parser.html
    """

    default_datetime = kwargs.get(
        "default_datetime",
        datetime.datetime(
            year=DATEUTIL_DEFAULT_DATETIME["year"],
            month=DATEUTIL_DEFAULT_DATETIME["month"],
            day=DATEUTIL_DEFAULT_DATETIME["day"],
        ),
    )

    # define `func` (the handler of ``None`` values) for the different cases
    if isinstance(if_nan, str):  # predefined `if_nan` cases
        if if_nan == "skip":

            def func(x):
                return x

        elif if_nan == "fill":
            value = kwargs["value"]

            def func(x):
                return value

        else:
            raise ValueError(f'Unknown mode "{if_nan}".')
    elif callable(if_nan):
        # user supplied handler, as promised by the signature
        func = if_nan
    else:

        def func(x):
            # only reached when a ``None`` value actually has to be handled
            raise TypeError(
                f"`if_nan` must be a str or a callable, got {type(if_nan).__name__}."
            )

    def standardize(value: Any) -> Any:
        """Takes a value and make it standard for the target function that is going to parse it

        Args:
            value (Any): the input value that need to be standardized

        Note:
            This is mostly hardcoded and cannot be written better (I think!).
            So, you can remove it entirely, and see what errors you get, and
            change this accordingly to errors and exceptions you get.

        Returns:
            Any: Standardized value
        """
        if dtype == parser.parse:  # datetime parser
            try:
                parser.parse(value)
            except ValueError:  # bad input format for `parser.parse`
                value = cast(str, value)
                # we want YYYY-MM-DD
                # 8-digit compact format (Canada Common Forms).
                # NOTE(review): the slicing below reads the input as
                # DDMMYYYY, while the original comment said MMDDYYYY -- confirm.
                if len(value) == 8 and value.isnumeric():
                    value = f"{value[4:]}-{value[2:4]}-{value[0:2]}"

                # fix values
                if value[5:7] == "02" and value[8:10] == "30":
                    # using >28 for February: replace the last "30" by "28"
                    value = "28".join(value.rsplit("30", 1))
        return value

    def apply_dtype(x: Any) -> Any:
        """Handles the default ``datetime.datetime`` for ``dateutil.parser.parse`` during casting dtypes

        Note:
            This function is only used to handle for a specific case of
            casting it is hardcoded

        Args:
            x (Any): Any value that its dtype going to be casted

        Returns:
            Any: ``x`` that is casted to a new type
        """
        if dtype == parser.parse:
            return dtype(x, default=default_datetime).isoformat()
        return dtype(x)

    # apply the rules and data type change
    current = data_dict[key_name]
    if current is not None:
        data_dict[key_name] = apply_dtype(standardize(current))
    else:
        data_dict[key_name] = func(current)
    return data_dict
def flatten_dict(dictionary: dict[str, Any]) -> dict[str, Any]:
    """Takes a (nested) multilevel dictionary and flattens it

    Args:
        dictionary (dict[str, Any]): A dictionary (could be multilevel).
            Anything that is not a ``dict`` yields an empty result.

    References:
        1. https://stackoverflow.com/a/67744709/18971263

    Returns:
        dict[str, Any]: Flattened dictionary where keys and values of
        returned dict are:

            * ``new_keys[i] = f'{old_keys[level]}.{old_keys[level+1]}.[...].{old_keys[level+n]}'``
            * ``new_value = old_value``

        Items of a list are addressed by their position, e.g.
        ``{'a': [{'b': 1}, 'x']}`` becomes ``{'a.[0].b': 1, 'a.[1]': 'x'}``.
    """

    flattened: dict[str, Any] = {}
    if not isinstance(dictionary, dict):
        return flattened

    for key, value in dictionary.items():
        # nested subtree
        if isinstance(value, dict):
            for subkey, subvalue in flatten_dict(value).items():
                flattened[f"{key}.{subkey}"] = subvalue
        # nested list
        elif isinstance(value, list):
            for num, elem in enumerate(value):
                # Wrapping the element under its indexed key lets the
                # recursion handle dicts, nested lists and plain leaf values
                # uniformly, so non-dict list items are kept instead of
                # being silently discarded.
                flattened.update(flatten_dict({f"{key}.[{num}]": elem}))
        # everything else (only leafs should remain)
        else:
            flattened[key] = value
    return flattened
def xml_to_flattened_dict(xml: str) -> dict:
    """Parses an XML string and flattens the result via :func:`flatten_dict`

    Args:
        xml (str): A XML string (possibly nested)

    Returns:
        dict: A flattened dictionary of given XML
    """

    # XML -> (nested) dict -> flat dict
    return flatten_dict(xmltodict.parse(xml))
def process_directory(
    src_dir: str,
    dst_dir: str,
    compose: FileTransformCompose,
    file_pattern: str = "*",
) -> None:
    """Transforms all files that match pattern in given dir and saves new files preserving dir structure

    Note:
        A methods used for handling files from manually processed dataset
        to raw-dataset see
        :class:`FileTransform <cvfe.data.preprocessor.FileTransform>` for
        more information.

    References:
        1. https://stackoverflow.com/a/24041933/18971263

    Args:
        src_dir (str): Source directory to be processed
        dst_dir (str): Destination directory to write processed files
        compose (FileTransformCompose): An instance of transform composer,
            called as ``compose(input_path, output_path)`` for every file.
            see :class:`Compose <cvfe.data.preprocessor.FileTransformCompose>`.
        file_pattern (str, optional): pattern to match files, default to
            ``'*'`` for all files. Defaults to ``'*'``.
    """

    # compare normalized paths so that e.g. "data" and "data/" are detected
    # as the same directory
    assert os.path.normpath(src_dir) != os.path.normpath(
        dst_dir
    ), "Source and destination dir must differ."
    if src_dir[-1] != "/":
        src_dir += "/"

    # process directories
    for dirpath, _, all_filenames in os.walk(src_dir):
        # filter out files that match pattern only; a real list is needed
        # because a lazy ``filter`` object is always truthy
        filenames = [fname for fname in all_filenames if fnmatch(fname, file_pattern)]
        # last path component (empty for ``src_dir`` itself)
        dirname = dirpath.rpartition("/")[2]
        logger.info('Processing directory="%s"...', dirname)
        if filenames:
            # ``dirpath`` always starts with ``src_dir``: strip only that
            # prefix, not every occurrence of it inside the path
            dir_ = os.path.join(dst_dir, dirpath[len(src_dir) :])
            os.makedirs(dir_, exist_ok=True)
            for fname in filenames:
                in_fname = os.path.join(dirpath, fname)  # original path
                out_fname = os.path.join(dir_, fname)  # processed path
                compose(in_fname, out_fname)  # composition of transforms
                logger.info('Processed file="%s"', fname)
        logger.info("Processed the data entry.")
def extended_dict_get( string: str, dic: dict, if_nan: str, condition: Optional[Callable | bool] = None ): """Takes a string and looks for it inside a dictionary with default value if condition is satisfied Args: string (str): the ``string`` to look for inside dictionary ``dic`` dic (dict): the dictionary that ``string`` is expected to be if_nan (str): the value returned if ``string`` could not be found in ``dic`` condition (Optional[Callable | bool], optional): look for ``string`` in ``dic`` only if ``condition`` is True. Defaults to None. Examples: >>> d = {'1': 'a', '2': 'b', '3': 'c'} >>> extended_dict_get('1', d, 'z', str.isnumeric) 'a' >>> extended_dict_get('x', d, 'z', str.isnumeric) 'x' Returns: Any: Substituted value instead of `string` """ condition = (lambda x: True) if condition is None else condition condition = cast(Callable, condition) # check given `condition` is true or not if condition(string): return dic.get(string, if_nan) # look for `string` if not use `if_nan` else: logger.info( ( f'"{string}" is not True for the given `condition`', "==> `false_condition_value` will be applied.", ) ) return string