Source code for cvfe.data.preprocessor
__all__ = [
"DataDictPreprocessor",
"CanadaDataDictPreprocessor",
"FileTransformCompose",
"FileTransform",
"CopyFile",
"MakeContentCopyProtectedMachineReadable",
]
import csv
import logging
import shutil
from typing import Any, Callable, Optional
import pikepdf
from dateutil import parser
from dateutil.relativedelta import *
from cvfe.configs import CANADA_COUNTRY_CODE_TO_NAME
from cvfe.data import functional
from cvfe.data.constant import *
from cvfe.data.pdf import CanadaXFA
# config logger
logger = logging.getLogger(__name__)
[docs]
class DataDictPreprocessor:
"""A set of utilities over dictionary of data to make it easier for data preprocessing
A class that contains methods for dealing with dictionaries regarding
transformation of data such as filling missing values, dropping keys,
or aggregating multiple keys into a single more meaningful one.
This class needs to be extended for file specific preprocessing where tags are
unique and need to be done entirely manually.
In this case, :func:`file_specific_basic_transform` needs to be implemented.
"""
[docs]
def __init__(self, data_dict: Optional[dict[str, Any]] = None) -> None:
"""
Args:
data_dict (Optional[dict[str, Any]], optional): Main dictionary of data to
be preprocessed. Defaults to None.
"""
self.data_dict = data_dict
[docs]
def key_dropper(
self,
string: str,
exclude: Optional[str] = None,
regex: bool = False,
inplace: bool = True,
) -> Optional[dict[str, Any]]:
"""See :func:`cvfe.data.functional.key_dropper` for more information"""
return functional.key_dropper(
data_dict=self.data_dict,
string=string,
exclude=exclude,
regex=regex,
inplace=inplace,
)
[docs]
def file_specific_basic_transform(
self, doc_type: DocTypes, path: str
) -> dict[str, Any]:
"""
Takes a specific file then does data type fixing, missing value filling, discretization, etc.
Note:
Since each files has its own unique tags and requirements,
it is expected that all these transformation being hardcoded for each file,
hence this method exists to just improve readability without any generalization
to other problems or even files.
Args:
doc_type (DocTypes): The input document type
(see :class:`DocTypes <cvfe.data.constant.DocTypes>`)
path (str): Path to the input document
"""
raise NotImplementedError
[docs]
def change_dtype(
self,
key_name: str,
dtype: Callable,
if_nan: str | Callable = "skip",
**kwargs,
):
"""See :func:`cvfe.data.functional.change_dtype` for more details"""
return functional.change_dtype(
data_dict=self.data_dict,
key_name=key_name,
dtype=dtype,
if_nan=if_nan,
**kwargs,
)
[docs]
def config_csv_to_dict(self, path: str) -> dict:
"""
Take a config CSV and return a dictionary of key and values
Args:
path (str): string path to config file
"""
with open(path, newline="") as f:
config_dict = csv.DictReader(f, delimiter=",")
return config_dict
[docs]
class CanadaDataDictPreprocessor(DataDictPreprocessor):
[docs]
def __init__(self, data_dict: Optional[dict[str, Any]] = None) -> None:
super().__init__(data_dict)
self.base_date = (
None # the time forms were filled, considered "today" for forms
)
# get country code to name dict
self.config_path = CANADA_COUNTRY_CODE_TO_NAME
self.CANADA_COUNTRY_CODE_TO_NAME = self.config_csv_to_dict(self.config_path)
[docs]
def convert_country_code_to_name(self, string: str) -> str:
"""
Converts the (custom and non-standard) code of a country to its name given the XFA docs LOV section.
Args:
string (str): input code string
"""
country = [c for c in self.CANADA_COUNTRY_CODE_TO_NAME.keys() if string in c]
if country:
return self.CANADA_COUNTRY_CODE_TO_NAME[country]
else:
logger.debug(
(
f'"{string}" country code could not be found'
f'in the config file="{self.config_path}".'
)
)
return CanadaFillna.COUNTRY_CODE_5257E
[docs]
def file_specific_basic_transform(
self, doc_type: DocTypes, path: str
) -> dict[str, Any]:
canada_xfa = CanadaXFA() # Canada PDF to XML
if doc_type == DocTypes.CANADA_5257E:
# XFA to XML
xml = canada_xfa.extract_raw_content(path)
xml = canada_xfa.clean_xml_for_csv(xml=xml, type=DocTypes.CANADA_5257E)
# XML to flattened dict
data_dict = canada_xfa.xml_to_flattened_dict(xml=xml)
data_dict = canada_xfa.flatten_dict(data_dict)
# clean flattened dict
data_dict = functional.dict_summarizer(
data_dict,
cutoff_term=CanadaCutoffTerms.CA5257E,
KEY_ABBREVIATION_DICT=CANADA_5257E_KEY_ABBREVIATION,
VALUE_ABBREVIATION_DICT=CANADA_5257E_VALUE_ABBREVIATION,
)
self.data_dict = data_dict
# drop pepeg keys
functional.drop(dictionary=data_dict, keys=CANADA_5257E_DROP_COLUMNS)
# Adult binary state: adult=True or child=False
feature = "P1.AdultFlag"
data_dict[feature] = True if data_dict[feature] == "adult" else False
# service language: 1=En, 2=Fr -> need to be changed to categorical
feature = "P1.PD.ServiceIn.ServiceIn"
data_dict = self.change_dtype(key_name=feature, dtype=int, if_nan="skip")
# AliasNameIndicator: 1=True, 0=False
feature = "P1.PD.AliasName.AliasNameIndicator.AliasNameIndicator"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# VisaType: String -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.VisaType.VisaType",
dtype=str,
if_nan="fill",
value=CanadaFillna.VISA_TYPE_5257E,
)
# Birth City: String -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.PlaceBirthCity",
dtype=str,
if_nan="fill",
value=CanadaFillna.PLACE_BIRTH_CITY_5257E,
)
# Birth country: string -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.PlaceBirthCountry",
dtype=str,
if_nan="fill",
value=CanadaFillna.COUNTRY_5257E,
)
# citizen of: string -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.Citizenship.Citizenship",
dtype=str,
if_nan="fill",
value=CanadaFillna.CITIZENSHIP_5257E,
)
# current country of residency: string -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.CurrCOR.Row2.Country",
dtype=str,
if_nan="fill",
value=CanadaFillna.COUNTRY_5257E,
)
# current country of residency status: string -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.CurrCOR.Row2.Status",
dtype=int,
if_nan="fill",
value=int(CanadaFillna.RESIDENCY_STATUS_5257E),
)
# current country of residency other description: bool -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.CurrCOR.Row2.Other",
dtype=bool,
if_nan="fill",
value=CanadaFillna.OTHER_DESCRIPTION_INDICATOR_5257E,
)
# validation date of information, i.e. current date: datetime
data_dict = self.change_dtype(
key_name="P3.Sign.C1CertificateIssueDate",
dtype=parser.parse,
if_nan="skip",
)
# keep it so we can access for other file if that was None
feature = "P3.Sign.C1CertificateIssueDate"
if data_dict[feature] is not None:
self.base_date = data_dict[feature]
# date of birth in year: string -> datetime
data_dict = self.change_dtype(
key_name="P1.PD.DOBYear", dtype=parser.parse, if_nan="skip"
)
# current country of residency period: None -> Datetime (=age period)
data_dict = self.change_dtype(
key_name="P1.PD.CurrCOR.Row2.FromDate",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P1.PD.DOBYear"],
)
data_dict = self.change_dtype(
key_name="P1.PD.CurrCOR.Row2.ToDate",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
# has previous country of residency: bool -> categorical
feature = "P1.PD.PCRIndicator"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# clean previous country of residency features
country_tag_list = [
c for c in list(data_dict.keys()) if "P1.PD.PrevCOR." in c
]
PREV_COUNTRY_MAX_FEATURES = 4
for i in range(len(country_tag_list) // PREV_COUNTRY_MAX_FEATURES):
# in XLA extracted file, this section start from `Row2` (ie. i+2)
i += 2
# previous country of residency 02: string -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.PrevCOR.Row" + str(i) + ".Country",
dtype=str,
if_nan="fill",
value=CanadaFillna.PREVIOUS_COUNTRY_5257E,
)
# previous country of residency status 02: string -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.PrevCOR.Row" + str(i) + ".Status",
dtype=int,
if_nan="fill",
value=int(CanadaFillna.RESIDENCY_STATUS_5257E),
)
# previous country of residency 02 period (P1.PD.PrevCOR.Row2): string -> datetime -> int days
data_dict = self.change_dtype(
key_name="P1.PD.PrevCOR.Row" + str(i) + ".FromDate",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
data_dict = self.change_dtype(
key_name="P1.PD.PrevCOR.Row" + str(i) + ".ToDate",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
# apply from country of residency (cwa=country where apply): Y=True, N=False
feature = "P1.PD.SameAsCORIndicator"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# country where applying: string -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.CWA.Row2.Country",
dtype=str,
if_nan="fill",
value=CanadaFillna.COUNTRY_WHERE_APPLYING_5257E,
)
# country where applying status: string -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.CWA.Row2.Status",
dtype=int,
if_nan="fill",
value=int(CanadaFillna.RESIDENCY_STATUS_5257E),
)
# country where applying other: string -> categorical
data_dict = self.change_dtype(
key_name="P1.PD.CWA.Row2.Other",
dtype=bool,
if_nan="fill",
value=CanadaFillna.OTHER_DESCRIPTION_INDICATOR_5257E,
)
# country where applying period: datetime -> int days
data_dict = self.change_dtype(
key_name="P1.PD.CWA.Row2.FromDate",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
data_dict = self.change_dtype(
key_name="P1.PD.CWA.Row2.ToDate",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
# marriage period: datetime -> int days
data_dict = self.change_dtype(
key_name="P1.MS.SecA.DateOfMarr",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
# previous marriage: Y=True, N=False
feature = "P2.MS.SecA.PrevMarrIndicator"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# previous marriage type of relationship
data_dict = self.change_dtype(
key_name="P2.MS.SecA.TypeOfRelationship",
dtype=str,
if_nan="fill",
value=CanadaFillna.MARRIAGE_TYPE_5257E,
)
# previous spouse age period: string -> datetime -> int days
data_dict = self.change_dtype(
key_name="P2.MS.SecA.PrevSpouseDOB.DOBYear",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
# previous marriage period: string -> datetime -> int days
data_dict = self.change_dtype(
key_name="P2.MS.SecA.FromDate",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
data_dict = self.change_dtype(
key_name="P2.MS.SecA.ToDate.ToDate",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
# passport country of issue: string -> categorical
data_dict = self.change_dtype(
key_name="P2.MS.SecA.Psprt.CountryofIssue.CountryofIssue",
dtype=str,
if_nan="fill",
value=CanadaFillna.PASSPORT_COUNTRY_5257E,
)
# expiry remaining period: datetime -> int days
# if None, fill with 1 year ago, ie. period=1year
feature = "P3.Sign.C1CertificateIssueDate"
temp_date = parser.parse(data_dict[feature]) + relativedelta(years=-1)
data_dict = self.change_dtype(
key_name="P2.MS.SecA.Psprt.ExpiryDate",
dtype=parser.parse,
if_nan="fill",
value=temp_date,
)
# native lang: string -> categorical
data_dict = self.change_dtype(
key_name="P2.MS.SecA.Langs.languages.nativeLang.nativeLang",
dtype=str,
if_nan="fill",
value=CanadaFillna.NATIVE_LANG_5257E,
)
# communication lang: Eng, Fr, both, none -> categorical
data_dict = self.change_dtype(
key_name="P2.MS.SecA.Langs.languages.ableToCommunicate.ableToCommunicate",
dtype=str,
if_nan="fill",
value=CanadaFillna.LANGUAGES_ABLE_TO_COMMUNICATE_5257E,
)
# language official test: bool -> binary
feature = "P2.MS.SecA.Langs.LangTest"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# have national ID: bool -> binary
feature = "P2.natID.q1.natIDIndicator"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# national ID country of issue: string -> categorical
data_dict = self.change_dtype(
key_name="P2.natID.natIDdocs.CountryofIssue.CountryofIssue",
dtype=str,
if_nan="fill",
value=CanadaFillna.ID_COUNTRY_5257E,
)
# United States doc: bool -> binary
feature = "P2.USCard.q1.usCardIndicator"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# US Canada phone number: bool -> binary
feature = "P2.CI.cntct.PhnNums.Phn.CanadaUS"
data_dict[feature] = True if data_dict[feature] == "1" else False
# US Canada alt phone number: bool -> binary
feature = "P2.CI.cntct.PhnNums.AltPhn.CanadaUS"
data_dict[feature] = True if data_dict[feature] == "1" else False
# purpose of visit: string, 8 states -> categorical
data_dict = self.change_dtype(
key_name="P3.DOV.PrpsRow1.PrpsOfVisit.PrpsOfVisit",
dtype=int,
if_nan="fill",
value=int(CanadaFillna.PURPOSE_OF_VISIT_5257E),
) # 7 is other in the form
# purpose of visit description: string -> binary
data_dict = self.change_dtype(
key_name="P3.DOV.PrpsRow1.Other.Other",
dtype=bool,
if_nan="fill",
value=CanadaFillna.OTHER_DESCRIPTION_INDICATOR_5257E,
)
# how long going to stay: None -> datetime (0 days)
data_dict = self.change_dtype(
key_name="P3.DOV.PrpsRow1.HLS.FromDate",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
data_dict = self.change_dtype(
key_name="P3.DOV.PrpsRow1.HLS.ToDate",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
# fund to integer
data_dict = self.change_dtype(
key_name="P3.DOV.PrpsRow1.Funds.Funds", dtype=int, if_nan="skip"
)
# relation to applicant of purpose of visit 01: string -> categorical
data_dict = self.change_dtype(
key_name="P3.DOV.cntcts_Row1.RelationshipToMe.RelationshipToMe",
dtype=str,
if_nan="fill",
value=CanadaFillna.CONTACT_TYPE_5257E,
)
# relation to applicant of purpose of visit 02: string -> categorical
data_dict = self.change_dtype(
key_name="P3.cntcts_Row2.Relationship.RelationshipToMe",
dtype=str,
if_nan="fill",
value=CanadaFillna.CONTACT_TYPE_5257E,
)
# higher education: bool -> binary
feature = "P3.Edu.EduIndicator"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# higher education period: string -> datetime -> int days
data_dict = self.change_dtype(
key_name="P3.Edu.Edu_Row1.FromYear",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
data_dict = self.change_dtype(
key_name="P3.Edu.Edu_Row1.ToYear",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
# higher education country: string -> categorical
data_dict = self.change_dtype(
key_name="P3.Edu.Edu_Row1.Country.Country",
dtype=str,
if_nan="fill",
value=CanadaFillna.COUNTRY_5257E,
)
# field of study: string -> categorical
feature = "P3.Edu.Edu_Row1.FieldOfStudy"
data_dict[feature] = str(data_dict[feature])
# clean occupation features
feature = "P3.Occ.OccRow"
occupation_tag_list = [c for c in list(data_dict.keys()) if feature in c]
PREV_OCCUPATION_MAX_FEATURES = 9
for i in range(len(occupation_tag_list) // PREV_OCCUPATION_MAX_FEATURES):
i += 1 # in the form, it starts from Row1 (ie. i+1)
# occupation period 01: none -> string year -> int days
data_dict = self.change_dtype(
key_name="P3.Occ.OccRow" + str(i) + ".FromYear",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
data_dict = self.change_dtype(
key_name="P3.Occ.OccRow" + str(i) + ".ToYear",
dtype=parser.parse,
if_nan="fill",
value=data_dict["P3.Sign.C1CertificateIssueDate"],
)
# occupation type 01: string -> categorical
data_dict = self.change_dtype(
key_name="P3.Occ.OccRow" + str(i) + ".Occ.Occ",
dtype=str,
if_nan="fill",
value=CanadaFillna.OCCUPATION_5257E,
)
# occupation country: string -> categorical
data_dict = self.change_dtype(
key_name="P3.Occ.OccRow" + str(i) + ".Country.Country",
dtype=str,
if_nan="fill",
value=CanadaFillna.COUNTRY_5257E,
)
# medical details: string -> binary
data_dict = self.change_dtype(
key_name="P3.BGI.Details.MedicalDetails",
dtype=bool,
if_nan="fill",
value=CanadaFillna.INDICATOR_FIELD_5257E,
)
# other than medical: string -> binary
data_dict = self.change_dtype(
key_name="P3.BGI.otherThanMedic",
dtype=bool,
if_nan="fill",
value=CanadaFillna.INDICATOR_FIELD_5257E,
)
# without authentication stay, work, etc: bool -> binary
feature = "P3.noAuthStay"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# deported or refused entry: bool -> binary
feature = "P3.refuseDeport"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# previously applied: bool -> binary
feature = "P3.BGI2.PrevApply"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# criminal record: bool -> binary
feature = "P3.PWrapper.criminalRec"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# military record: bool -> binary
feature = "P3.PWrapper.Military.Choice"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# political, violent movement record: bool -> binary
feature = "P3.PWrapper.politicViol"
data_dict[feature] = True if data_dict[feature] == "Y" else False
# witness of ill treatment: bool -> binary
feature = "P3.PWrapper.witnessIllTreat"
data_dict[feature] = True if data_dict[feature] == "Y" else False
return data_dict
if doc_type == DocTypes.CANADA_5645E:
# XFA to XML
xml = canada_xfa.extract_raw_content(path)
xml = canada_xfa.clean_xml_for_csv(xml=xml, type=DocTypes.CANADA_5645E)
# XML to flattened dict
data_dict = canada_xfa.xml_to_flattened_dict(xml=xml)
data_dict = canada_xfa.flatten_dict(data_dict)
# clean flattened dict
data_dict = functional.dict_summarizer(
data_dict,
cutoff_term=CanadaCutoffTerms.CA5645E,
KEY_ABBREVIATION_DICT=CANADA_5645E_KEY_ABBREVIATION,
VALUE_ABBREVIATION_DICT=None,
)
self.data_dict = data_dict
# drop pepeg keys
functional.drop(dictionary=data_dict, keys=CANADA_5645E_DROP_COLUMNS)
# transform multiple pleb keys into a single chad one and fixing key data types
# type of application: (already one hot) string -> int
keys = [key for key in list(data_dict.keys()) if "p1.Subform1" in key]
for k in keys:
data_dict = self.change_dtype(
key_name=k,
dtype=int,
if_nan="fill",
value=int(CanadaFillna.VISA_APPLICATION_TYPE_5645E),
)
# drop all Accompany=No and only rely on Accompany=Yes using binary state
self.key_dropper(string="No", inplace=True)
# applicant marriage status: string to integer
data_dict = self.change_dtype(
key_name="p1.SecA.App.ChdMStatus",
dtype=int,
if_nan="fill",
value=int(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E),
)
# validation date of information, i.e. current date: datetime
data_dict = self.change_dtype(
key_name="p1.SecC.SecCdate",
dtype=parser.parse,
if_nan="fill",
value=self.base_date,
)
# spouse date of birth: string -> datetime
data_dict = self.change_dtype(
key_name="p1.SecA.Sps.SpsDOB",
dtype=parser.parse,
if_nan="fill",
value=data_dict["p1.SecC.SecCdate"],
)
# spouse country of birth: string -> categorical
data_dict = self.change_dtype(
key_name="p1.SecA.Sps.SpsCOB", dtype=str, if_nan="skip"
)
# spouse occupation type (issue #2): string -> categorical
data_dict = self.change_dtype(
key_name="p1.SecA.Sps.SpsOcc",
dtype=str,
if_nan="fill",
value=CanadaFillna.OCCUPATION_5257E,
)
# spouse accompanying: coming=True or not_coming=False
feature = "p1.SecA.Sps.SpsAccomp"
data_dict[feature] = True if data_dict[feature] == "1" else False
# mother date of birth: string -> datetime
data_dict = self.change_dtype(
key_name="p1.SecA.Mo.MoDOB",
dtype=parser.parse,
if_nan="fill",
value=data_dict["p1.SecC.SecCdate"],
)
# mother occupation type (issue #2): string -> categorical
data_dict = self.change_dtype(
key_name="p1.SecA.Mo.MoOcc",
dtype=str,
if_nan="fill",
value=CanadaFillna.OCCUPATION_5257E,
)
# mother marriage status: int -> categorical
data_dict = self.change_dtype(
key_name="p1.SecA.Mo.ChdMStatus",
dtype=int,
if_nan="fill",
value=int(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E),
)
# mother accompanying: coming=True or not_coming=False
feature = "p1.SecA.Mo.MoAccomp"
data_dict[feature] = True if data_dict[feature] == "1" else False
# father date of birth: string -> datetime
data_dict = self.change_dtype(
key_name="p1.SecA.Fa.FaDOB",
dtype=parser.parse,
if_nan="fill",
value=data_dict["p1.SecC.SecCdate"],
)
# mother occupation type (issue #2): string -> categorical
data_dict = self.change_dtype(
key_name="p1.SecA.Fa.FaOcc",
dtype=str,
if_nan="fill",
value=CanadaFillna.OCCUPATION_5257E,
)
# father marriage status: int -> categorical
data_dict = self.change_dtype(
key_name="p1.SecA.Fa.ChdMStatus",
dtype=int,
if_nan="fill",
value=int(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E),
)
# father accompanying: coming=True or not_coming=False
feature = "p1.SecA.Fa.FaAccomp"
data_dict[feature] = True if data_dict[feature] == "1" else False
# children's status
children_tag_list = [
c for c in list(data_dict.keys()) if "p1.SecB.Chd" in c
]
CHILDREN_MAX_FEATURES = 7
for i in range(len(children_tag_list) // CHILDREN_MAX_FEATURES):
# child's marriage status 01: string to integer
data_dict = self.change_dtype(
key_name="p1.SecB.Chd.[" + str(i) + "].ChdMStatus",
dtype=int,
if_nan="fill",
value=int(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E),
)
# child's relationship 01: string -> categorical
data_dict = self.change_dtype(
key_name="p1.SecB.Chd.[" + str(i) + "].ChdRel",
dtype=str,
if_nan="fill",
value=CanadaFillna.CHILD_RELATION_5645E,
)
# child's date of birth 01: string -> datetime
data_dict = self.change_dtype(
key_name="p1.SecB.Chd.[" + str(i) + "].ChdDOB",
dtype=parser.parse,
if_nan="skip",
)
# child's country of birth 01: string -> categorical
data_dict = self.change_dtype(
key_name="p1.SecB.Chd.[" + str(i) + "].ChdCOB",
dtype=str,
if_nan="fill",
value=CanadaFillna.COUNTRY_5257E,
)
# child's occupation type 01 (issue #2): string -> categorical
data_dict = self.change_dtype(
key_name="p1.SecB.Chd.[" + str(i) + "].ChdOcc",
dtype=str,
if_nan="fill",
value=CanadaFillna.OCCUPATION_5257E,
)
# child's marriage status: int -> categorical
data_dict = self.change_dtype(
key_name="p1.SecB.Chd.[" + str(i) + "].ChdMStatus",
dtype=int,
if_nan="fill",
value=int(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E),
)
# child's accompanying 01: coming=True or not_coming=False
feature = "p1.SecB.Chd.[" + str(i) + "].ChdAccomp"
data_dict[feature] = True if data_dict[feature] == "1" else False
# check if the child does not exist and fill it properly (ghost case monkaS)
if (
(
data_dict["p1.SecB.Chd.[" + str(i) + "].ChdMStatus"]
== CanadaFillna.CHILD_MARRIAGE_STATUS_5645E
)
and (data_dict["p1.SecB.Chd.[" + str(i) + "].ChdRel"] == "OTHER")
and (data_dict["p1.SecB.Chd.[" + str(i) + "].ChdDOB"] is None)
and (data_dict["p1.SecB.Chd.[" + str(i) + "].ChdAccomp"] == False)
):
# ghost child's date of birth: None -> datetime (current date) -> 0 days
data_dict = self.change_dtype(
key_name="p1.SecB.Chd.[" + str(i) + "].ChdDOB",
dtype=parser.parse,
if_nan="fill",
value=data_dict["p1.SecC.SecCdate"],
)
# siblings' status
siblings_tag_list = [
c for c in list(data_dict.keys()) if "p1.SecC.Chd" in c
]
SIBLINGS_MAX_FEATURES = 8
for i in range(len(siblings_tag_list) // SIBLINGS_MAX_FEATURES):
# sibling's marriage status 01: string to integer
data_dict = self.change_dtype(
key_name="p1.SecC.Chd.[" + str(i) + "].ChdMStatus",
dtype=int,
if_nan="fill",
value=int(CanadaFillna.CHILD_MARRIAGE_STATUS_5645E),
)
# sibling's relationship 01: string -> categorical
data_dict = self.change_dtype(
key_name="p1.SecC.Chd.[" + str(i) + "].ChdRel",
dtype=str,
if_nan="fill",
value=CanadaFillna.CHILD_RELATION_5645E,
)
# sibling's date of birth 01: string -> datetime
data_dict = self.change_dtype(
key_name="p1.SecC.Chd.[" + str(i) + "].ChdDOB",
dtype=parser.parse,
if_nan="skip",
)
# sibling's country of birth 01: string -> categorical
data_dict = self.change_dtype(
key_name="p1.SecC.Chd.[" + str(i) + "].ChdCOB",
dtype=str,
if_nan="fill",
value=CanadaFillna.COUNTRY_5257E,
)
# sibling's occupation type 01 (issue #2): string -> categorical
data_dict = self.change_dtype(
key_name="p1.SecC.Chd.[" + str(i) + "].ChdOcc",
dtype=str,
if_nan="fill",
value=CanadaFillna.OCCUPATION_5257E,
)
# sibling's accompanying: coming=True or not_coming=False
feature = "p1.SecC.Chd.[" + str(i) + "].ChdAccomp"
data_dict[feature] = True if data_dict[feature] == "1" else False
# check if the sibling does not exist and fill it properly (ghost case monkaS)
if (
(
data_dict["p1.SecC.Chd.[" + str(i) + "].ChdMStatus"]
== CanadaFillna.CHILD_MARRIAGE_STATUS_5645E
)
and (data_dict["p1.SecC.Chd.[" + str(i) + "].ChdRel"] == "OTHER")
and (data_dict["p1.SecC.Chd.[" + str(i) + "].ChdOcc"] is None)
and (data_dict["p1.SecC.Chd.[" + str(i) + "].ChdAccomp"] == False)
):
# ghost sibling's date of birth: None -> datetime (current date) -> 0 days
data_dict = self.change_dtype(
key_name="p1.SecC.Chd.[" + str(i) + "].ChdDOB",
dtype=parser.parse,
if_nan="fill",
value=data_dict["p1.SecC.SecCdate"],
)
return data_dict
if doc_type == DocTypes.CANADA_LABEL:
with open(path, newline="") as f:
data_dict = csv.DictReader(f, delimiter=" ", fieldnames=["VisaResult"])
functional.change_dtype(
data_dict=data_dict,
key_name="VisaResult",
dtype=int,
if_nan="fill",
value=int(CanadaFillna.VISA_RESULT),
)
return data_dict
[docs]
class FileTransform:
"""A base class for applying transforms as a composable object over files.
Any behavior over the files itself (not the content of files)
must extend this class.
"""
[docs]
def __call__(self, src: str, dst: str, *args: Any, **kwds: Any) -> Any:
"""
Args:
src (str): source file to be processed
dst (str): the pass that the processed file to be saved
"""
pass
[docs]
class CopyFile(FileTransform):
"""Only copies a file, a wrapper around `shutil`'s copying methods
Default is set to 'cf', i.e. `shutil.copyfile`. For more info see
shutil_ documentation.
Reference:
1. https://stackoverflow.com/a/30359308/18971263
"""
[docs]
def __init__(self, mode: str) -> None:
super().__init__()
self.COPY_MODES = ["c", "cf", "c2"]
self.mode = mode if mode is not None else "cf"
self.__check_mode(mode=mode)
[docs]
def __call__(self, src: str, dst: str, *args: Any, **kwds: Any) -> Any:
if self.mode == "c":
shutil.copy(src=src, dst=dst)
elif self.mode == "cf":
shutil.copyfile(src=src, dst=dst)
elif self.mode == "c2":
shutil.copy2(src=src, dst=dst)
def __check_mode(self, mode: str):
"""Checks copying mode to be available in shutil_
Args:
mode: copying mode in `shutil`, one of `'c'`, `'cf'`, `'c2'`
.. _shutil: https://docs.python.org/3/library/shutil.html
"""
if not mode in self.COPY_MODES:
raise ValueError(
(f"Mode {mode} does not exist,", f'choose one of "{self.COPY_MODES}".')
)
[docs]
class MakeContentCopyProtectedMachineReadable(FileTransform):
"""Reads a 'content-copy' protected PDF and removes this restriction
Removing the protection is done by saving a "printed" version of via pikepdf_
References:
1. https://www.reddit.com/r/Python/comments/t32z2o/simple_code_to_unlock_all_readonly_pdfs_in/
2. https://pikepdf.readthedocs.io/en/latest/
.. _pikepdf: https://pikepdf.readthedocs.io/en/latest/
"""
[docs]
def __call__(self, src: str, dst: str, *args: Any, **kwds: Any) -> Any:
"""
Args:
src (str): source file to be processed
dst (str): destination to save the processed file
Returns:
Any: None
"""
pdf = pikepdf.open(src, allow_overwriting_input=True)
pdf.save(dst)
[docs]
class FileTransformCompose:
"""Composes several transforms operating on files together
The transforms should be tied to files with keyword and this will be only applying
functions on files that match the keyword using a dictionary
Transformation dictionary over files in the following structure::
{
FileTransform: 'filter_str',
...,
}
Note:
Transforms will be applied in order of the keys in the dictionary
"""
[docs]
def __init__(self, transforms: dict[FileTransform, str]) -> None:
"""
Args:
transforms (dict[FileTransform, str]): a dictionary of transforms, where the key is the instance of
FileTransform and the value is the keyword that the transform will be
applied to
Raises:
ValueError: if the keyword is not a string
"""
if transforms is not None:
for k in transforms.keys():
if not issubclass(k.__class__, FileTransform):
raise TypeError(f"Keys must be {FileTransform} instance.")
self.transforms = transforms
[docs]
def __call__(self, src: str, dst: str, *args: Any, **kwds: Any) -> Any:
"""Applies transforms in order
Args:
src (str): source file path to be processed
dst (str): destination to save the processed file
"""
for transform, file_filter in self.transforms.items():
if file_filter in src:
transform(src, dst)