diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9a277f2..0dfd3c7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,15 @@
+## v0.7.0 (2026-03-12)
+
+### Feat
+
+- add row index (#57)
+- add option in csv readers to clean and null empty strings (#64)
+
+### Fix
+
+- add ability to strictly enforce date format in conformatteddate (#70)
+- add postcode type to model gen (#69)
+
## v0.6.2 (2026-03-09)
### Fix
diff --git a/README.md b/README.md
index af86b44..6b51ab6 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
-
+
Data Validation Engine
@@ -25,11 +25,11 @@ If you'd like more detailed documentation around these services the please read
The DVE has been designed in a way that's modular and can support users who just want to utilise specific "services" from the DVE (i.e. just the file transformation + data contract). Additionally, the DVE is designed to support different backend implementations. As part of the base installation of DVE, you will find backend support for `Spark` and `DuckDB`. So, if you need a `MySQL` backend implementation, you can implement this yourself. Given our organisation's requirements, it is unlikely that we will add any more specific backend implementations into the base package beyond Spark and DuckDB. So, if you are unable to implement this yourself, I would recommend reading the guidance on [requesting new features and raising bug reports here](#requesting-new-features-and-raising-bug-reports).
-Additionally, if you'd like to contribute a new backend implementation into the base DVE package, then please look at the [Contributing][#Contributing] section.
+Additionally, if you'd like to contribute a new backend implementation into the base DVE package, then please look at the [Contributing](#Contributing) section.
## Installation and usage
-The DVE is a Python package and can be installed using `pip`. As of release v0.6.1 we currently support Python 3.10 & 3.11, with Spark version 3.4 and DuckDB version of 1.1. In the future we will be looking to upgrade the DVE to working on a higher versions of Python, DuckDB and Spark.
+The DVE is a Python package and can be installed using package managers such as [pip](https://pypi.org/project/pip/). As of the latest release, we support Python 3.10 & 3.11, with Spark v3.4 and DuckDB v1.1. In the future we will be looking to upgrade the DVE to work on higher versions of Python, DuckDB and Spark.
If you're planning to use the Spark backend implementation, you will also need OpenJDK 11 installed.
@@ -38,12 +38,12 @@ Python dependencies are listed in `pyproject.toml`.
To install the DVE package you can simply install using a package manager such as [pip](https://pypi.org/project/pip/).
```
-pip install git+https://github.com/NHSDigital/data-validation-engine.git@v0.6.1
+pip install data-validation-engine
```
-Once you have installed the DVE you are ready to use it. For guidance on how to create your dischema JSON document (configuration), please read the [documentation](./docs/).
+*Note - Only versions >=0.6.2 are available on PyPI. For older versions, please install directly from the git repo or build from source.*
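+
+For example, to install one of the older releases directly from the repository (the v0.6.1 tag is shown for illustration):
+
+```
+pip install git+https://github.com/NHSDigital/data-validation-engine.git@v0.6.1
+```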
-Please note - The long term aim is to make the DVE available via PyPi and Conda but we are not quite there yet. Once available this documentation will be updated to contain the new installation options.
+Once you have installed the DVE you are ready to use it. For guidance on how to create your dischema JSON document (configuration), please read the [documentation](./docs/).
Version 0.0.1 does support a working Python 3.7 installation. However, we will not be supporting any issues with that version of the DVE if you choose to use it. __Use at your own risk__.
@@ -60,10 +60,10 @@ Below is a list of features that we would like to implement or have been request
| ------- | --------------- | --------- |
| Open source release | 0.1.0 | Yes |
| Uplift to Python 3.11 | 0.2.0 | Yes |
-| Upgrade to Pydantic 2.0 | Not yet confirmed | No |
+| Upgrade to Pydantic 2.0 | Before 1.0 release | No |
| Create a more user friendly interface for building and modifying dischema files | Not yet confirmed | No |
-Beyond the Python upgrade, we cannot confirm the other features will be made available anytime soon. Therefore, if you have the interest and desire to make these features available, then please read the [Contributing](#contributing) section and get involved.
+Beyond the Python and Pydantic upgrades, we cannot confirm that the other features will be made available anytime soon. Therefore, if you have the interest and desire to make these features available, then please read the [Contributing](#contributing) section and get involved.
## Contributing
Please see guidance [here](./CONTRIBUTE.md).
diff --git a/poetry.lock b/poetry.lock
index 7b1987a..ca9cf31 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2533,6 +2533,13 @@ optional = false
python-versions = ">=3.8"
groups = ["dev", "test"]
files = [
+ {file = "PyYAML-6.0.3-cp38-cp38-macosx_10_13_x86_64.whl", hash = "sha256:c2514fceb77bc5e7a2f7adfaa1feb2fb311607c9cb518dbc378688ec73d8292f"},
+ {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9c57bb8c96f6d1808c030b1687b9b5fb476abaa47f0db9c0101f5e9f394e97f4"},
+ {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:efd7b85f94a6f21e4932043973a7ba2613b059c4a000551892ac9f1d11f5baf3"},
+ {file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:22ba7cfcad58ef3ecddc7ed1db3409af68d023b7f940da23c6c2a1890976eda6"},
+ {file = "PyYAML-6.0.3-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:6344df0d5755a2c9a276d4473ae6b90647e216ab4757f8426893b5dd2ac3f369"},
+ {file = "PyYAML-6.0.3-cp38-cp38-win32.whl", hash = "sha256:3ff07ec89bae51176c0549bc4c63aa6202991da2d9a6129d7aef7f1407d3f295"},
+ {file = "PyYAML-6.0.3-cp38-cp38-win_amd64.whl", hash = "sha256:5cf4e27da7e3fbed4d6c3d8e797387aaad68102272f8f9752883bc32d61cb87b"},
{file = "pyyaml-6.0.3-cp310-cp310-macosx_10_13_x86_64.whl", hash = "sha256:214ed4befebe12df36bcc8bc2b64b396ca31be9304b8f59e25c11cf94a4c033b"},
{file = "pyyaml-6.0.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:02ea2dfa234451bbb8772601d7b8e426c2bfa197136796224e50e35a78777956"},
{file = "pyyaml-6.0.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b30236e45cf30d2b8e7b3e85881719e98507abed1011bf463a8fa23e9c3e98a8"},
@@ -2963,14 +2970,14 @@ files = [
[[package]]
name = "werkzeug"
-version = "3.1.5"
+version = "3.1.6"
description = "The comprehensive WSGI web application library."
optional = false
python-versions = ">=3.9"
groups = ["dev", "test"]
files = [
- {file = "werkzeug-3.1.5-py3-none-any.whl", hash = "sha256:5111e36e91086ece91f93268bb39b4a35c1e6f1feac762c9c822ded0a4e322dc"},
- {file = "werkzeug-3.1.5.tar.gz", hash = "sha256:6a548b0e88955dd07ccb25539d7d0cc97417ee9e179677d22c7041c8f078ce67"},
+ {file = "werkzeug-3.1.6-py3-none-any.whl", hash = "sha256:7ddf3357bb9564e407607f988f683d72038551200c704012bb9a4c523d42f131"},
+ {file = "werkzeug-3.1.6.tar.gz", hash = "sha256:210c6bede5a420a913956b4791a7f4d6843a43b6fcee4dfa08a65e93007d0d25"},
]
[package.dependencies]
@@ -3120,4 +3127,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.12"
-content-hash = "08ea1eedf25a896fdc21f03d04f4403d47d655fc90eb5eb310ff7cde7e3b7a6d"
+content-hash = "0b0b9c546709542f314418c15b0c6151803a006891b74808fd80b1f86ff28d94"
diff --git a/pyproject.toml b/pyproject.toml
index c4ca268..482c438 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "data-validation-engine"
-version = "0.6.2"
+version = "0.7.0"
description = "`nhs data validation engine` is a framework used to validate data"
authors = ["NHS England "]
readme = "README.md"
@@ -51,7 +51,7 @@ faker = "18.11.1"
behave = "1.3.3"
coverage = "7.11.0"
moto = {extras = ["s3"], version = "4.0.13"}
-Werkzeug = "3.1.5"
+Werkzeug = "3.1.6"
pytest = "8.4.2"
pytest-lazy-fixtures = "1.4.0" # switched from https://github.com/TvoroG/pytest-lazy-fixture as it's no longer supported
xlsx2csv = "0.8.2"
@@ -119,341 +119,70 @@ init-hook = "import sys; sys.path.append('./pylint_checkers')"
load-plugins = "check_typing_imports"
[tool.pylint.main]
-# Analyse import fallback blocks. This can be used to support both Python 2 and 3
-# compatible code, which means that the block might have code that exists only in
-# one or another interpreter, leading to false positives when analysed.
-# analyse-fallback-blocks =
-
-# Always return a 0 (non-error) status code, even if lint errors are found. This
-# is primarily useful in continuous integration scripts.
-# exit-zero =
-
-# A comma-separated list of package or module names from where C extensions may
-# be loaded. Extensions are loading into the active Python interpreter and may
-# run arbitrary code.
extension-pkg-allow-list = ["pyspark", "lxml", "pydantic"]
-
-# A comma-separated list of package or module names from where C extensions may
-# be loaded. Extensions are loading into the active Python interpreter and may
-# run arbitrary code. (This is an alternative name to extension-pkg-allow-list
-# for backward compatibility.)
-# extension-pkg-whitelist =
-
-# Return non-zero exit code if any of these messages/categories are detected,
-# even if score is above --fail-under value. Syntax same as enable. Messages
-# specified are enabled, while categories only check already-enabled messages.
-# fail-on =
-
-# Specify a score threshold to be exceeded before program exits with error.
fail-under = 10.0
-
-# Interpret the stdin as a python script, whose filename needs to be passed as
-# the module_or_package argument.
-# from-stdin =
-
-# Files or directories to be skipped. They should be base names, not paths.
ignore = ["CVS"]
-
-# Add files or directories matching the regex patterns to the ignore-list. The
-# regex matches against paths and can be in Posix or Windows format.
-# ignore-paths =
-
-# Files or directories matching the regex patterns are skipped. The regex matches
-# against base names, not paths. The default value ignores Emacs file locks
ignore-patterns = ["^\\.#"]
-
-# List of module names for which member attributes should not be checked (useful
-# for modules/projects where namespaces are manipulated during runtime and thus
-# existing member attributes cannot be deduced by static analysis). It supports
-# qualified module names, as well as Unix pattern matching.
-# ignored-modules =
-
-# Python code to execute, usually for sys.path manipulation such as
-# pygtk.require().
-# init-hook =
-
-# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the
-# number of processors available to use, and will cap the count on Windows to
-# avoid hangs.
jobs = 1
-
-# Control the amount of potential inferred values when inferring a single object.
-# This can help the performance when dealing with large functions or complex,
-# nested conditions.
limit-inference-results = 100
-
-# List of plugins (as comma separated values of python module names) to load,
-# usually to register additional checkers.
-# load-plugins =
-
-# Pickle collected data for later comparisons.
persistent = true
-
-# Minimum Python version to use for version dependent checks. Will default to the
-# version used to run pylint.
-py-version = "3.11"
-
-# Discover python modules and packages in the file system subtree.
-# recursive =
-
-# When enabled, pylint would attempt to guess common misconfiguration and emit
-# user-friendly hints instead of false-positive error messages.
+py-version = "3.10"
suggestion-mode = true
-# Allow loading of arbitrary C extensions. Extensions are imported into the
-# active Python interpreter and may run arbitrary code.
-# unsafe-load-any-extension =
-
[tool.pylint.basic]
-# Naming style matching correct argument names.
argument-naming-style = "snake_case"
-
-# Regular expression matching correct argument names. Overrides argument-naming-
-# style. If left empty, argument names will be checked with the set naming style.
-# argument-rgx =
-
-# Naming style matching correct attribute names.
attr-naming-style = "snake_case"
-
-# Regular expression matching correct attribute names. Overrides attr-naming-
-# style. If left empty, attribute names will be checked with the set naming
-# style.
-# attr-rgx =
-
-# Bad variable names which should always be refused, separated by a comma.
bad-names = ["foo", "bar", "baz", "toto", "tutu", "tata"]
-
-# Bad variable names regexes, separated by a comma. If names match any regex,
-# they will always be refused
-# bad-names-rgxs =
-
-# Naming style matching correct class attribute names.
class-attribute-naming-style = "any"
-
-# Regular expression matching correct class attribute names. Overrides class-
-# attribute-naming-style. If left empty, class attribute names will be checked
-# with the set naming style.
-# class-attribute-rgx =
-
-# Naming style matching correct class constant names.
class-const-naming-style = "UPPER_CASE"
-
-# Regular expression matching correct class constant names. Overrides class-
-# const-naming-style. If left empty, class constant names will be checked with
-# the set naming style.
-# class-const-rgx =
-
-# Naming style matching correct class names.
class-naming-style = "PascalCase"
-
-# Regular expression matching correct class names. Overrides class-naming-style.
-# If left empty, class names will be checked with the set naming style.
-# class-rgx =
-
-# Naming style matching correct constant names.
const-naming-style = "UPPER_CASE"
-
-# Regular expression matching correct constant names. Overrides const-naming-
-# style. If left empty, constant names will be checked with the set naming style.
-# const-rgx =
-
-# Minimum line length for functions/classes that require docstrings, shorter ones
-# are exempt.
docstring-min-length = -1
-
-# Naming style matching correct function names.
function-naming-style = "snake_case"
-
-# Regular expression matching correct function names. Overrides function-naming-
-# style. If left empty, function names will be checked with the set naming style.
-# function-rgx =
-
-# Good variable names which should always be accepted, separated by a comma.
good-names = ["i", "j", "k", "ex", "Run", "df", "_", "f"]
-
-# Good variable names regexes, separated by a comma. If names match any regex,
-# they will always be accepted
-# good-names-rgxs =
-
-# Include a hint for the correct naming format with invalid-name.
-# include-naming-hint =
-
-# Naming style matching correct inline iteration names.
inlinevar-naming-style = "any"
-
-# Regular expression matching correct inline iteration names. Overrides
-# inlinevar-naming-style. If left empty, inline iteration names will be checked
-# with the set naming style.
-# inlinevar-rgx =
-
-# Naming style matching correct method names.
method-naming-style = "snake_case"
-
-# Regular expression matching correct method names. Overrides method-naming-
-# style. If left empty, method names will be checked with the set naming style.
-# method-rgx =
-
-# Naming style matching correct module names.
module-naming-style = "snake_case"
-
-# Regular expression matching correct module names. Overrides module-naming-
-# style. If left empty, module names will be checked with the set naming style.
-# module-rgx =
-
-# Colon-delimited sets of names that determine each other's naming style when the
-# name regexes allow several styles.
-# name-group =
-
-# Regular expression which should only match function or class names that do not
-# require a docstring.
no-docstring-rgx = "^_"
-
-# List of decorators that produce properties, such as abc.abstractproperty. Add
-# to this list to register other decorators that produce valid properties. These
-# decorators are taken in consideration only for invalid-name.
property-classes = ["abc.abstractproperty"]
-
-# Regular expression matching correct type variable names. If left empty, type
-# variable names will be checked with the set naming style.
-# typevar-rgx =
-
-# Naming style matching correct variable names.
variable-naming-style = "snake_case"
-# Regular expression matching correct variable names. Overrides variable-naming-
-# style. If left empty, variable names will be checked with the set naming style.
-# variable-rgx =
-
[tool.pylint.classes]
-# Warn about protected attribute access inside special methods
-# check-protected-access-in-special-methods =
-
-# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods = ["__init__", "__new__", "setUp", "__post_init__"]
-
-# List of member names, which should be excluded from the protected access
-# warning.
exclude-protected = ["_asdict", "_fields", "_replace", "_source", "_make"]
-
-# List of valid names for the first argument in a class method.
valid-classmethod-first-arg = ["cls"]
-
-# List of valid names for the first argument in a metaclass class method.
valid-metaclass-classmethod-first-arg = ["cls"]
[tool.pylint.design]
-# List of regular expressions of class ancestor names to ignore when counting
-# public methods (see R0903)
-# exclude-too-few-public-methods =
-
-# List of qualified class names to ignore when counting class parents (see R0901)
-# ignored-parents =
-
-# Maximum number of arguments for function / method.
max-args = 8
-
-# Maximum number of attributes for a class (see R0902).
max-attributes = 10
-
-# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr = 5
-
-# Maximum number of branch for function / method body.
max-branches = 15
-
-# Maximum number of locals for function / method body.
max-locals = 20
-
-# Maximum number of parents for a class (see R0901).
max-parents = 7
-
-# Maximum number of public methods for a class (see R0904).
max-public-methods = 20
-
-# Maximum number of return / yield for function / method body.
max-returns = 6
-
-# Maximum number of statements in function / method body.
max-statements = 50
-
-# Minimum number of public methods for a class (see R0903).
min-public-methods = 2
[tool.pylint.exceptions]
-# Exceptions that will emit a warning when caught.
overgeneral-exceptions = ["BaseException", "Exception"]
[tool.pylint.format]
-# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
-# expected-line-ending-format =
-
-# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines = "^\\s*(# )?<?https?://\\S+>?$"
-
-# Number of spaces of indent required inside a hanging or continued line.
indent-after-paren = 4
-
-# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
-# tab).
indent-string = " "
-
-# Maximum number of characters on a single line.
max-line-length = 100
-
-# Maximum number of lines in a module.
max-module-lines = 1000
-# Allow the body of a class to be on the same line as the declaration if body
-# contains single statement.
-# single-line-class-stmt =
-
-# Allow the body of an if to be on the same line as the test if there is no else.
-# single-line-if-stmt =
-
[tool.pylint.imports]
-# List of modules that can be imported at any level, not just the top level one.
-# allow-any-import-level =
-
-# Allow wildcard imports from modules that define __all__.
-# allow-wildcard-with-all =
-
-# Deprecated modules which should not be used, separated by a comma.
-# deprecated-modules =
-
-# Output a graph (.gv or any supported image format) of external dependencies to
-# the given file (report RP0402 must not be disabled).
-# ext-import-graph =
-
-# Output a graph (.gv or any supported image format) of all (i.e. internal and
-# external) dependencies to the given file (report RP0402 must not be disabled).
-# import-graph =
-
-# Output a graph (.gv or any supported image format) of internal dependencies to
-# the given file (report RP0402 must not be disabled).
-# int-import-graph =
-
-# Force import order to recognize a module as part of the standard compatibility
-# libraries.
-# known-standard-library =
-
-# Force import order to recognize a module as part of a third party library.
known-third-party = ["enchant"]
-# Couples of modules and preferred modules, separated by a comma.
-# preferred-modules =
-
[tool.pylint.logging]
-# The type of string formatting that logging methods do. `old` means using %
-# formatting, `new` is for `{}` formatting.
logging-format-style = "old"
-
-# Logging modules to check that the string format arguments are in logging
-# function parameter format.
logging-modules = ["logging"]
[tool.pylint."messages control"]
-# Only show warnings with the listed confidence levels. Leave empty to show all.
-# Valid levels: HIGH, CONTROL_FLOW, INFERENCE, INFERENCE_FAILURE, UNDEFINED.
confidence = [
"HIGH",
"CONTROL_FLOW",
@@ -461,16 +190,6 @@ confidence = [
"INFERENCE_FAILURE",
"UNDEFINED",
]
-
-# Disable the message, report, category or checker with the given id(s). You can
-# either give multiple identifiers separated by comma (,) or put this option
-# multiple times (only on the command line, not in the configuration file where
-# it should appear only once). You can also use "--disable=all" to disable
-# everything first and then re-enable specific checks. For example, if you want
-# to run only the similarities checker, you can use "--disable=all
-# --enable=similarities". If you want to run only the classes checker, but have
-# no Warning level messages displayed, use "--disable=all --enable=classes
-# --disable=W".
disable = [
"raw-checker-failed",
"bad-inline-option",
@@ -483,188 +202,56 @@ disable = [
"logging-fstring-interpolation",
"fixme",
]
-
-# Enable the message, report, category or checker with the given id(s). You can
-# either give multiple identifier separated by comma (,) or put this option
-# multiple time (only on the command line, not in the configuration file where it
-# should appear only once). See also the "--disable" option for examples.
enable = ["c-extension-no-member"]
[tool.pylint.miscellaneous]
-# List of note tags to take in consideration, separated by a comma.
notes = ["FIXME", "XXX", "TODO"]
-# Regular expression of note tags to take in consideration.
-# notes-rgx =
-
[tool.pylint.refactoring]
-# Maximum number of nested blocks for function / method body
max-nested-blocks = 5
-
-# Complete name of functions that never returns. When checking for inconsistent-
-# return-statements if a never returning function is called then it will be
-# considered as an explicit return statement and no message will be printed.
never-returning-functions = ["sys.exit", "argparse.parse_error"]
[tool.pylint.reports]
-# Python expression which should return a score less than or equal to 10. You
-# have access to the variables 'fatal', 'error', 'warning', 'refactor',
-# 'convention', and 'info' which contain the number of messages in each category,
-# as well as 'statement' which is the total number of statements analyzed. This
-# score is used by the global evaluation report (RP0004).
evaluation = "max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10))"
-
-# Template used to display messages. This is a python new-style format string
-# used to format the message information. See doc for all details.
-# msg-template =
-
-# Set the output format. Available formats are text, parseable, colorized, json
-# and msvs (visual studio). You can also give a reporter class, e.g.
-# mypackage.mymodule.MyReporterClass.
-# output-format =
-
-# Tells whether to display a full report or only the messages.
-# reports =
-
-# Activate the evaluation score.
score = true
[tool.pylint.similarities]
-# Comments are removed from the similarity computation
ignore-comments = true
-
-# Docstrings are removed from the similarity computation
ignore-docstrings = true
-
-# Imports are removed from the similarity computation
ignore-imports = true
-
-# Signatures are removed from the similarity computation
ignore-signatures = true
-
-# Minimum lines number of a similarity.
min-similarity-lines = 10
[tool.pylint.spelling]
-# Limits count of emitted suggestions for spelling mistakes.
max-spelling-suggestions = 4
-
-# Spelling dictionary name. Available dictionaries: none. To make it work,
-# install the 'python-enchant' package.
-# spelling-dict =
-
-# List of comma separated words that should be considered directives if they
-# appear at the beginning of a comment and should not be checked.
spelling-ignore-comment-directives = "fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy:"
-# List of comma separated words that should not be checked.
-# spelling-ignore-words =
-
-# A path to a file that contains the private dictionary; one word per line.
-# spelling-private-dict-file =
-
-# Tells whether to store unknown words to the private dictionary (see the
-# --spelling-private-dict-file option) instead of raising a message.
-# spelling-store-unknown-words =
-
-[tool.pylint.string]
-# This flag controls whether inconsistent-quotes generates a warning when the
-# character used as a quote delimiter is used inconsistently within a module.
-# check-quote-consistency =
-
-# This flag controls whether the implicit-str-concat should generate a warning on
-# implicit string concatenation in sequences defined over several lines.
-# check-str-concat-over-line-jumps =
-
[tool.pylint.typecheck]
-# List of decorators that produce context managers, such as
-# contextlib.contextmanager. Add to this list to register other decorators that
-# produce valid context managers.
contextmanager-decorators = ["contextlib.contextmanager"]
-
-# List of members which are set dynamically and missed by pylint inference
-# system, and so shouldn't trigger E1101 when accessed. Python regular
-# expressions are accepted.
-# generated-members =
-
-# Tells whether missing members accessed in mixin class should be ignored. A
-# class is considered mixin if its name matches the mixin-class-rgx option.
-# Tells whether to warn about missing members when the owner of the attribute is
-# inferred to be None.
ignore-none = true
-
-# This flag controls whether pylint should warn about no-member and similar
-# checks whenever an opaque object is returned when inferring. The inference can
-# return multiple potential results while evaluating a Python object, but some
-# branches might not be evaluated, which results in partial inference. In that
-# case, it might be useful to still emit no-member and other checks for the rest
-# of the inferred objects.
ignore-on-opaque-inference = true
-
-# List of symbolic message names to ignore for Mixin members.
ignored-checks-for-mixins = [
"no-member",
"not-async-context-manager",
"not-context-manager",
"attribute-defined-outside-init",
]
-
-# List of class names for which member attributes should not be checked (useful
-# for classes with dynamically set attributes). This supports the use of
-# qualified names.
ignored-classes = [
"optparse.Values",
"thread._local",
"_thread._local",
"argparse.Namespace",
]
-
-# Show a hint with possible names when a member name was not found. The aspect of
-# finding the hint is based on edit distance.
missing-member-hint = true
-
-# The minimum edit distance a name should have in order to be considered a
-# similar match for a missing member name.
missing-member-hint-distance = 1
-
-# The total number of similar names that should be taken in consideration when
-# showing a hint for a missing member.
missing-member-max-choices = 1
-
-# Regex pattern to define which classes are considered mixins.
mixin-class-rgx = ".*[Mm]ixin"
-# List of decorators that change the signature of a decorated function.
-# signature-mutators =
-
[tool.pylint.variables]
-# List of additional names supposed to be defined in builtins. Remember that you
-# should avoid defining new builtins when possible.
-# additional-builtins =
-
-# Tells whether unused global variables should be treated as a violation.
allow-global-unused-variables = true
-
-# List of names allowed to shadow builtins
-# allowed-redefined-builtins =
-
-# List of strings which can identify a callback function by name. A callback name
-# must start or end with one of those strings.
callbacks = ["cb_", "_cb"]
-
-# A regular expression matching the name of dummy variables (i.e. expected to not
-# be used).
dummy-variables-rgx = "_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_"
-
-# Argument names that match this expression will be ignored. Default to name with
-# leading underscore.
ignored-argument-names = "_.*|^ignored_|^unused_"
-
-# Tells whether we should check for unused import in __init__ files.
-# init-import =
-
-# List of qualified module names which can have objects that can redefine
-# builtins.
redefining-builtins-modules = [
"six.moves",
"past.builtins",
diff --git a/src/dve/core_engine/backends/base/backend.py b/src/dve/core_engine/backends/base/backend.py
index 9d6abaa..29e8644 100644
--- a/src/dve/core_engine/backends/base/backend.py
+++ b/src/dve/core_engine/backends/base/backend.py
@@ -163,7 +163,7 @@ def apply(
return entities, get_parent(processing_errors_uri), successful
for entity_name, entity in entities.items():
- entities[entity_name] = self.step_implementations.add_row_id(entity)
+ entities[entity_name] = self.step_implementations.add_record_index(entity)
# TODO: Handle entity manager creation errors.
entity_manager = EntityManager(entities, reference_data)
@@ -172,9 +172,6 @@ def apply(
# TODO: and return uri to errors
_ = self.step_implementations.apply_rules(working_dir, entity_manager, rule_metadata)
- for entity_name, entity in entity_manager.entities.items():
- entity_manager.entities[entity_name] = self.step_implementations.drop_row_id(entity)
-
return entity_manager.entities, get_parent(dc_feedback_errors_uri), True
def process(
diff --git a/src/dve/core_engine/backends/base/contract.py b/src/dve/core_engine/backends/base/contract.py
index fc7da4d..948ff77 100644
--- a/src/dve/core_engine/backends/base/contract.py
+++ b/src/dve/core_engine/backends/base/contract.py
@@ -337,9 +337,9 @@ def read_raw_entities(
successful = True
for entity_name, resource in entity_locations.items():
reader_metadata = contract_metadata.reader_metadata[entity_name]
- extension = "." + (
- get_file_suffix(resource) or ""
- ).lower() # Already checked that extension supported.
+ extension = (
+ "." + (get_file_suffix(resource) or "").lower()
+ ) # Already checked that extension supported.
reader_config = reader_metadata[extension]
reader_type = get_reader(reader_config.reader)
@@ -369,6 +369,14 @@ def read_raw_entities(
return entities, dedup_messages(messages), successful
+ def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
+ """Add a record index to the entity"""
+ raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")
+
+ def drop_record_index(self, entity: EntityType, **kwargs) -> EntityType:
+ """Drop a record index from the entity"""
+ raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")
+
@abstractmethod
def apply_data_contract(
self,
diff --git a/src/dve/core_engine/backends/base/reader.py b/src/dve/core_engine/backends/base/reader.py
index 54abaa9..ac30111 100644
--- a/src/dve/core_engine/backends/base/reader.py
+++ b/src/dve/core_engine/backends/base/reader.py
@@ -127,6 +127,14 @@ def read_to_entity_type(
return reader_func(self, resource, entity_name, schema)
+ def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
+ """Add a record index to the entity"""
+ raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")
+
+ def drop_record_index(self, entity: EntityType, **kwargs) -> EntityType:
+        """Drop a record index from the entity"""
+ raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")
+
def write_parquet(
self,
entity: EntityType,
diff --git a/src/dve/core_engine/backends/base/rules.py b/src/dve/core_engine/backends/base/rules.py
index 97a6b4d..b66b3ae 100644
--- a/src/dve/core_engine/backends/base/rules.py
+++ b/src/dve/core_engine/backends/base/rules.py
@@ -135,15 +135,13 @@ def register_udfs(cls, **kwargs):
"""Method to register all custom dve functions for use during business rules application"""
raise NotImplementedError()
- @staticmethod
- def add_row_id(entity: EntityType) -> EntityType:
- """Add a unique row id field to an entity"""
- raise NotImplementedError()
+ def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
+ """Add a record index to the entity"""
+ raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")
- @staticmethod
- def drop_row_id(entity: EntityType) -> EntityType:
- """Add a unique row id field to an entity"""
- raise NotImplementedError()
+ def drop_record_index(self, entity: EntityType) -> EntityType:
+        """Drop the record index from the entity"""
+ raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")
@classmethod
def _raise_notimplemented_error(
diff --git a/src/dve/core_engine/backends/implementations/duckdb/contract.py b/src/dve/core_engine/backends/implementations/duckdb/contract.py
index 860f06b..25fb8a7 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/contract.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/contract.py
@@ -29,6 +29,7 @@
)
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
duckdb_read_parquet,
+ duckdb_record_index,
duckdb_write_parquet,
get_duckdb_type_from_annotation,
relation_is_empty,
@@ -37,6 +38,7 @@
from dve.core_engine.backends.metadata.contract import DataContractMetadata
from dve.core_engine.backends.types import StageSuccessful
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, EntityLocations
from dve.core_engine.validation import RowValidator, apply_row_validator_helper
@@ -54,6 +56,7 @@ def __call__(self, row: pd.Series):
return row # no op
+@duckdb_record_index
@duckdb_write_parquet
@duckdb_read_parquet
class DuckDBDataContract(BaseDataContract[DuckDBPyRelation]):
@@ -144,10 +147,12 @@ def apply_data_contract(
fld.name: get_duckdb_type_from_annotation(fld.annotation)
for fld in entity_fields.values()
}
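+            # include the record index column in both the DuckDB and Polars schemas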
+ ddb_schema[RECORD_INDEX_COLUMN_NAME] = get_duckdb_type_from_annotation(int)
polars_schema: dict[str, PolarsType] = {
fld.name: get_polars_type_from_annotation(fld.annotation)
for fld in entity_fields.values()
}
+ polars_schema[RECORD_INDEX_COLUMN_NAME] = get_polars_type_from_annotation(int)
if relation_is_empty(relation):
self.logger.warning(f"+ Empty relation for {entity_name}")
empty_df = pl.DataFrame([], schema=polars_schema) # type: ignore # pylint: disable=W0612
@@ -170,6 +175,9 @@ def apply_data_contract(
self.logger.info(f"Data contract found {msg_count} issues in {entity_name}")
+ if RECORD_INDEX_COLUMN_NAME not in relation.columns:
+ relation = self.add_record_index(relation)
+
casting_statements = [
(
self.generate_ddb_cast_statement(column, dtype)
diff --git a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py
index 843ee40..f5b0fe9 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py
@@ -12,13 +12,14 @@
import duckdb.typing as ddbtyp
import numpy as np
-from duckdb import DuckDBPyConnection, DuckDBPyRelation
+from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression
from duckdb.typing import DuckDBPyType
from pandas import DataFrame
from pydantic import BaseModel
from typing_extensions import Annotated, get_args, get_origin, get_type_hints
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.type_hints import URI
from dve.parser.file_handling.service import LocalFilesystemImplementation, _get_implementation
@@ -286,3 +287,29 @@ def duckdb_rel_to_dictionaries(
cols: tuple[str] = tuple(entity.columns) # type: ignore
while rows := entity.fetchmany(batch_size):
yield from (dict(zip(cols, rw)) for rw in rows)
+
+
+def _add_duckdb_record_index(
+ self, entity: DuckDBPyRelation # pylint: disable=W0613
+) -> DuckDBPyRelation:
+ """Add record index to duckdb relation"""
+ if RECORD_INDEX_COLUMN_NAME in entity.columns:
+ return entity
+
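+    # row_number() OVER () assigns a sequential 1-based index in the relation's current row order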
+ return entity.select(f"*, row_number() OVER () as {RECORD_INDEX_COLUMN_NAME}")
+
+
+def _drop_duckdb_record_index(
+ self, entity: DuckDBPyRelation # pylint: disable=W0613
+) -> DuckDBPyRelation:
+ """Drop record index from duckdb relation"""
+ if RECORD_INDEX_COLUMN_NAME not in entity.columns:
+ return entity
+ return entity.select(StarExpression(exclude=[RECORD_INDEX_COLUMN_NAME]))
+
+
+def duckdb_record_index(cls):
+ """Class decorator to add record index methods for duckdb implementations"""
+ setattr(cls, "add_record_index", _add_duckdb_record_index)
+ setattr(cls, "drop_record_index", _drop_duckdb_record_index)
+ return cls
diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
index ff65d9f..2018010 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
@@ -6,23 +6,32 @@
import duckdb as ddb
import polars as pl
-from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection, read_csv
+from duckdb import (
+ DuckDBPyConnection,
+ DuckDBPyRelation,
+ StarExpression,
+ default_connection,
+ read_csv,
+)
from pydantic import BaseModel
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
+ duckdb_record_index,
duckdb_write_parquet,
get_duckdb_type_from_annotation,
)
from dve.core_engine.backends.implementations.duckdb.types import SQLType
from dve.core_engine.backends.readers.utilities import check_csv_header_expected
-from dve.core_engine.backends.utilities import get_polars_type_from_annotation
+from dve.core_engine.backends.utilities import get_polars_type_from_annotation, polars_record_index
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import get_content_length
+@duckdb_record_index
@duckdb_write_parquet
class DuckDBCSVReader(BaseFileReader):
"""A reader for CSV files including the ability to compare the passed model
@@ -46,6 +55,7 @@ def __init__(
field_check: bool = False,
field_check_error_code: Optional[str] = "ExpectedVsActualFieldMismatch",
field_check_error_message: Optional[str] = "The submitted header is missing fields",
+ null_empty_strings: bool = False,
**_,
):
self.header = header
@@ -55,6 +65,7 @@ def __init__(
self.field_check = field_check
self.field_check_error_code = field_check_error_code
self.field_check_error_message = field_check_error_message
+ self.null_empty_strings = null_empty_strings
super().__init__()
@@ -109,9 +120,19 @@ def read_to_relation( # pylint: disable=unused-argument
}
reader_options["columns"] = ddb_schema
- return read_csv(resource, **reader_options)
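+        # parallel=False keeps the CSV scan single-threaded so the appended row_number() index follows the file's row order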
+ rel = self.add_record_index(read_csv(resource, **reader_options, parallel=False))
+
+ if self.null_empty_strings:
+            # trim whitespace and convert empty strings to NULL, keeping the record index column intact
+            cleaned_cols = ",".join(
+                [f"NULLIF(TRIM({c}), '') as {c}" for c in reader_options["columns"].keys()]
+                + [RECORD_INDEX_COLUMN_NAME]
+            )
+            rel = rel.select(cleaned_cols)
+
+ return rel
+
+@polars_record_index
class PolarsToDuckDBCSVReader(DuckDBCSVReader):
"""
Utilises the polars lazy csv reader which is then converted into a DuckDBPyRelation object.
@@ -145,7 +166,19 @@ def read_to_relation( # pylint: disable=unused-argument
# there is a raise_if_empty arg for 0.18+. Future reference when upgrading. Makes L85
# redundant
- df = pl.scan_csv(resource, **reader_options).select(list(polars_types.keys())) # type: ignore # pylint: disable=W0612
+ df = self.add_record_index( # pylint: disable=W0612
+ pl.scan_csv(resource, **reader_options).select( # type: ignore
+ list(polars_types.keys())
+ )
+ )
+
+ if self.null_empty_strings:
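+            # strip whitespace and null-out empty strings in each data column; the record index passes through unchanged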
+ pl_exprs = [
+ pl.col(c).str.strip_chars().replace("", None)
+ for c in df.columns
+ if c != RECORD_INDEX_COLUMN_NAME
+ ] + [pl.col(RECORD_INDEX_COLUMN_NAME)]
+ df = df.select(pl_exprs)
return ddb.sql("SELECT * FROM df")
@@ -189,8 +222,10 @@ def __init__(
def read_to_relation( # pylint: disable=unused-argument
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
) -> DuckDBPyRelation:
- entity = super().read_to_relation(resource=resource, entity_name=entity_name, schema=schema)
- entity = entity.distinct()
+ entity: DuckDBPyRelation = super().read_to_relation(
+ resource=resource, entity_name=entity_name, schema=schema
+ )
+ entity = entity.select(StarExpression(exclude=[RECORD_INDEX_COLUMN_NAME])).distinct()
no_records = entity.shape[0]
if no_records != 1:
@@ -219,4 +254,4 @@ def read_to_relation( # pylint: disable=unused-argument
],
)
- return entity
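+        # after de-duplication a single record is expected, so a constant index of 1 is assigned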
+ return entity.select(f"*, 1 as {RECORD_INDEX_COLUMN_NAME}")
diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/json.py b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py
index b1a3ad4..8afb5a4 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/readers/json.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py
@@ -9,6 +9,7 @@
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
+ duckdb_record_index,
duckdb_write_parquet,
get_duckdb_type_from_annotation,
)
@@ -16,6 +17,7 @@
from dve.core_engine.type_hints import URI, EntityName
+@duckdb_record_index
@duckdb_write_parquet
class DuckDBJSONReader(BaseFileReader):
"""A reader for JSON files"""
@@ -47,4 +49,6 @@ def read_to_relation( # pylint: disable=unused-argument
for fld in schema.__fields__.values()
}
- return read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore
+ return self.add_record_index(
+ read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore
+ )
diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py
index a955946..a10998c 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py
@@ -11,10 +11,15 @@
from dve.core_engine.backends.exceptions import MessageBearingError
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet
from dve.core_engine.backends.readers.xml import XMLStreamReader
-from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
+from dve.core_engine.backends.utilities import (
+ get_polars_type_from_annotation,
+ polars_record_index,
+ stringify_model,
+)
from dve.core_engine.type_hints import URI
+@polars_record_index
@duckdb_write_parquet
class DuckDBXMLStreamReader(XMLStreamReader):
"""A reader for XML files"""
@@ -39,7 +44,9 @@ def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseMod
for fld in stringify_model(schema).__fields__.values()
}
- _lazy_frame = pl.LazyFrame(
- data=self.read_to_py_iterator(resource, entity_name, schema), schema=polars_schema
+ _lazy_frame = self.add_record_index(
+ pl.LazyFrame(
+ data=self.read_to_py_iterator(resource, entity_name, schema), schema=polars_schema
+ )
)
return self.ddb_connection.sql("select * from _lazy_frame")
diff --git a/src/dve/core_engine/backends/implementations/duckdb/rules.py b/src/dve/core_engine/backends/implementations/duckdb/rules.py
index e556c6b..7ed775c 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/rules.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/rules.py
@@ -23,6 +23,7 @@
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
DDBStruct,
duckdb_read_parquet,
+ duckdb_record_index,
duckdb_rel_to_dictionaries,
duckdb_write_parquet,
get_all_registered_udfs,
@@ -51,13 +52,13 @@
SemiJoin,
TableUnion,
)
-from dve.core_engine.constants import ROWID_COLUMN_NAME
from dve.core_engine.functions import implementations as functions
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.templating import template_object
from dve.core_engine.type_hints import Messages
+@duckdb_record_index
@duckdb_write_parquet
@duckdb_read_parquet
class DuckDBStepImplementations(BaseStepImplementations[DuckDBPyRelation]):
@@ -106,20 +107,6 @@ def register_udfs( # type: ignore
connection.sql(_sql)
return cls(connection=connection, **kwargs)
- @staticmethod
- def add_row_id(entity: DuckDBPyRelation) -> DuckDBPyRelation:
- """Adds a row identifier to the Relation"""
- if ROWID_COLUMN_NAME not in entity.columns:
- entity = entity.project(f"*, ROW_NUMBER() OVER () as {ROWID_COLUMN_NAME}")
- return entity
-
- @staticmethod
- def drop_row_id(entity: DuckDBPyRelation) -> DuckDBPyRelation:
- """Drops the row identiifer from a Relation"""
- if ROWID_COLUMN_NAME in entity.columns:
- entity = entity.select(StarExpression(exclude=[ROWID_COLUMN_NAME]))
- return entity
-
def add(self, entities: DuckDBEntities, *, config: ColumnAddition) -> Messages:
"""A transformation step which adds a column to an entity."""
entity: DuckDBPyRelation = entities[config.entity_name]
diff --git a/src/dve/core_engine/backends/implementations/spark/backend.py b/src/dve/core_engine/backends/implementations/spark/backend.py
index 742e9e3..3999b62 100644
--- a/src/dve/core_engine/backends/implementations/spark/backend.py
+++ b/src/dve/core_engine/backends/implementations/spark/backend.py
@@ -11,7 +11,7 @@
from dve.core_engine.backends.implementations.spark.rules import SparkStepImplementations
from dve.core_engine.backends.implementations.spark.spark_helpers import get_type_from_annotation
from dve.core_engine.backends.implementations.spark.types import SparkEntities
-from dve.core_engine.constants import ROWID_COLUMN_NAME
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.loggers import get_child_logger, get_logger
from dve.core_engine.models import SubmissionInfo
from dve.core_engine.type_hints import URI, EntityParquetLocations
@@ -58,7 +58,7 @@ def write_entities_to_parquet(
locations = {}
self.logger.info(f"Writing entities to the output location: {cache_prefix}")
for entity_name, entity in entities.items():
- entity = entity.drop(ROWID_COLUMN_NAME)
+ entity = entity.drop(RECORD_INDEX_COLUMN_NAME)
self.logger.info(f"Entity: {entity_name}")
diff --git a/src/dve/core_engine/backends/implementations/spark/contract.py b/src/dve/core_engine/backends/implementations/spark/contract.py
index d8078bd..7215113 100644
--- a/src/dve/core_engine/backends/implementations/spark/contract.py
+++ b/src/dve/core_engine/backends/implementations/spark/contract.py
@@ -10,7 +10,7 @@
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.functions import col, lit
-from pyspark.sql.types import ArrayType, DataType, MapType, StringType, StructType
+from pyspark.sql.types import ArrayType, DataType, LongType, MapType, StructField, StructType
from dve.common.error_utils import (
BackgroundMessageWriter,
@@ -28,19 +28,21 @@
df_is_empty,
get_type_from_annotation,
spark_read_parquet,
+ spark_record_index,
spark_write_parquet,
)
from dve.core_engine.backends.implementations.spark.types import SparkEntities
from dve.core_engine.backends.metadata.contract import DataContractMetadata
from dve.core_engine.backends.readers import CSVFileReader
from dve.core_engine.backends.types import StageSuccessful
-from dve.core_engine.constants import ROWID_COLUMN_NAME
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.type_hints import URI, EntityLocations, EntityName
COMPLEX_TYPES: set[type[DataType]] = {StructType, ArrayType, MapType}
"""Spark types indicating complex types."""
+@spark_record_index
@spark_write_parquet
@spark_read_parquet
class SparkDataContract(BaseDataContract[DataFrame]):
@@ -84,6 +86,7 @@ def create_entity_from_py_iterator(
schema=get_type_from_annotation(schema),
)
+ # pylint: disable=R0915
def apply_data_contract(
self,
working_dir: URI,
@@ -100,14 +103,16 @@ def apply_data_contract(
successful = True
for entity_name, record_df in entities.items():
spark_schema = get_type_from_annotation(contract_metadata.schemas[entity_name])
-
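+            # the contract schema now carries the record index column as well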
+ spark_schema.add(StructField(RECORD_INDEX_COLUMN_NAME, LongType()))
if df_is_empty(record_df):
self.logger.warning(f"+ Empty dataframe for {entity_name}")
entities[entity_name] = self.spark_session.createDataFrame( # type: ignore
[], schema=spark_schema
- ).withColumn(ROWID_COLUMN_NAME, lit(None).cast(StringType()))
+ )
continue
+ if RECORD_INDEX_COLUMN_NAME not in record_df.columns:
+ record_df = self.add_record_index(record_df)
if self.debug:
# Note, the count will realise the dataframe, so only do this
diff --git a/src/dve/core_engine/backends/implementations/spark/readers/csv.py b/src/dve/core_engine/backends/implementations/spark/readers/csv.py
index 95db464..8c2b137 100644
--- a/src/dve/core_engine/backends/implementations/spark/readers/csv.py
+++ b/src/dve/core_engine/backends/implementations/spark/readers/csv.py
@@ -3,6 +3,7 @@
from collections.abc import Iterator
from typing import Any, Optional
+import pyspark.sql.functions as psf
from pydantic import BaseModel
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType
@@ -11,12 +12,14 @@
from dve.core_engine.backends.exceptions import EmptyFileError
from dve.core_engine.backends.implementations.spark.spark_helpers import (
get_type_from_annotation,
+ spark_record_index,
spark_write_parquet,
)
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import get_content_length
+@spark_record_index
@spark_write_parquet
class SparkCSVReader(BaseFileReader):
"""A Spark reader for CSV files."""
@@ -30,6 +33,7 @@ def __init__(
header: bool = True,
multi_line: bool = False,
encoding: str = "utf-8-sig",
+ null_empty_strings: bool = False,
spark_session: Optional[SparkSession] = None,
**_,
) -> None:
@@ -40,6 +44,7 @@ def __init__(
self.quote_char = quote_char
self.header = header
self.multi_line = multi_line
+ self.null_empty_strings = null_empty_strings
self.spark_session = spark_session if spark_session else SparkSession.builder.getOrCreate() # type: ignore # pylint: disable=C0301
super().__init__()
@@ -70,8 +75,15 @@ def read_to_dataframe(
"multiLine": self.multi_line,
}
- return (
+ df = self.add_record_index(
self.spark_session.read.format("csv")
.options(**kwargs) # type: ignore
.load(resource, schema=spark_schema)
)
+
+ if self.null_empty_strings:
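+            # trim every declared column and convert empty strings to nulls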
+ df = df.select(
+ *[psf.trim(psf.col(c.name)).alias(c.name) for c in spark_schema.fields]
+ ).replace("", None)
+
+ return df
diff --git a/src/dve/core_engine/backends/implementations/spark/readers/json.py b/src/dve/core_engine/backends/implementations/spark/readers/json.py
index c336ee0..0b4a09f 100644
--- a/src/dve/core_engine/backends/implementations/spark/readers/json.py
+++ b/src/dve/core_engine/backends/implementations/spark/readers/json.py
@@ -11,12 +11,14 @@
from dve.core_engine.backends.exceptions import EmptyFileError
from dve.core_engine.backends.implementations.spark.spark_helpers import (
get_type_from_annotation,
+ spark_record_index,
spark_write_parquet,
)
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import get_content_length
+@spark_record_index
@spark_write_parquet
class SparkJSONReader(BaseFileReader):
"""A Spark reader for JSON files."""
@@ -59,7 +61,7 @@ def read_to_dataframe(
"multiline": self.multi_line,
}
- return (
+ return self.add_record_index(
self.spark_session.read.format("json")
.options(**kwargs) # type: ignore
.load(resource, schema=spark_schema)
diff --git a/src/dve/core_engine/backends/implementations/spark/readers/xml.py b/src/dve/core_engine/backends/implementations/spark/readers/xml.py
index 30d6756..39433b3 100644
--- a/src/dve/core_engine/backends/implementations/spark/readers/xml.py
+++ b/src/dve/core_engine/backends/implementations/spark/readers/xml.py
@@ -17,6 +17,7 @@
from dve.core_engine.backends.implementations.spark.spark_helpers import (
df_is_empty,
get_type_from_annotation,
+ spark_record_index,
spark_write_parquet,
)
from dve.core_engine.backends.readers.xml import BasicXMLFileReader, XMLStreamReader
@@ -28,6 +29,7 @@
"""The mode to use when parsing XML files with Spark."""
+@spark_record_index
@spark_write_parquet
class SparkXMLStreamReader(XMLStreamReader):
"""An XML stream reader that adds a method to read to a dataframe"""
@@ -45,12 +47,15 @@ def read_to_dataframe(
if not self.spark:
self.spark = SparkSession.builder.getOrCreate() # type: ignore
spark_schema = get_type_from_annotation(schema)
- return self.spark.createDataFrame( # type: ignore
- list(self.read_to_py_iterator(resource, entity_name, schema)),
- schema=spark_schema,
+ return self.add_record_index(
+ self.spark.createDataFrame( # type: ignore
+ list(self.read_to_py_iterator(resource, entity_name, schema)),
+ schema=spark_schema,
+ )
)
+@spark_record_index
@spark_write_parquet
class SparkXMLReader(BasicXMLFileReader): # pylint: disable=too-many-instance-attributes
"""A reader for XML files built atop Spark-XML."""
@@ -177,7 +182,7 @@ def read_to_dataframe(
df = self._add_missing_columns(df, spark_schema)
df = self._sanitise_columns(df)
- return df
+ return self.add_record_index(df)
def _add_missing_columns(self, df: DataFrame, fields: Iterable[StructField]) -> DataFrame:
for field in fields:
diff --git a/src/dve/core_engine/backends/implementations/spark/rules.py b/src/dve/core_engine/backends/implementations/spark/rules.py
index 15afa09..5d1cfe0 100644
--- a/src/dve/core_engine/backends/implementations/spark/rules.py
+++ b/src/dve/core_engine/backends/implementations/spark/rules.py
@@ -15,6 +15,7 @@
get_all_registered_udfs,
object_to_spark_literal,
spark_read_parquet,
+ spark_record_index,
spark_write_parquet,
)
from dve.core_engine.backends.implementations.spark.types import (
@@ -43,13 +44,13 @@
SemiJoin,
TableUnion,
)
-from dve.core_engine.constants import ROWID_COLUMN_NAME
from dve.core_engine.functions import implementations as functions
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.templating import template_object
from dve.core_engine.type_hints import Messages
+@spark_record_index
@spark_write_parquet
@spark_read_parquet
class SparkStepImplementations(BaseStepImplementations[DataFrame]):
@@ -100,18 +101,6 @@ def register_udfs(
return cls(spark_session=spark_session, **kwargs)
- @staticmethod
- def add_row_id(entity: DataFrame) -> DataFrame:
- if ROWID_COLUMN_NAME not in entity.columns:
- entity = entity.withColumn(ROWID_COLUMN_NAME, sf.expr("uuid()"))
- return entity
-
- @staticmethod
- def drop_row_id(entity: DataFrame) -> DataFrame:
- if ROWID_COLUMN_NAME in entity.columns:
- entity = entity.drop(ROWID_COLUMN_NAME)
- return entity
-
def add(self, entities: SparkEntities, *, config: ColumnAddition) -> Messages:
entity: DataFrame = entities[config.entity_name]
entity = entity.withColumn(config.column_name, sf.expr(config.expression))
diff --git a/src/dve/core_engine/backends/implementations/spark/spark_helpers.py b/src/dve/core_engine/backends/implementations/spark/spark_helpers.py
index 7cb7b17..07a4a04 100644
--- a/src/dve/core_engine/backends/implementations/spark/spark_helpers.py
+++ b/src/dve/core_engine/backends/implementations/spark/spark_helpers.py
@@ -17,14 +17,16 @@
from delta.exceptions import ConcurrentAppendException, DeltaConcurrentModificationException
from pydantic import BaseModel
from pydantic.types import ConstrainedDecimal
-from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql import functions as sf
from pyspark.sql import types as st
from pyspark.sql.column import Column
from pyspark.sql.functions import lit, udf
+from pyspark.sql.types import LongType, StructField, StructType
from typing_extensions import Annotated, Protocol, TypedDict, get_args, get_origin, get_type_hints
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.type_hints import URI
# It would be really nice if there was a more parameterisable
@@ -410,3 +412,30 @@ def _inner(*args, **kwargs):
return _inner
return _wrapper
+
+
+def _add_spark_record_index(self, entity: DataFrame) -> DataFrame: # pylint: disable=W0613
+ """Add a record index to spark dataframe"""
+ if RECORD_INDEX_COLUMN_NAME in entity.columns:
+ return entity
+ schema: StructType = entity.schema
+ schema.add(StructField(RECORD_INDEX_COLUMN_NAME, LongType()))
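+ # zipWithIndex pairs each row with its 0-based position; shift to 1-based and append it
+ # as the last field. The final column name comes from the extended schema passed to
+ # toDF, not from the keyword used in Row().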
+ return (
+ entity.rdd.zipWithIndex()
+ .map(lambda x: Row(**x[0].asDict(True), RECORD_INDEX_COLUMN_NAME=x[1] + 1))
+ .toDF(schema=schema)
+ )
+
+
+def _drop_spark_record_index(self, entity: DataFrame) -> DataFrame: # pylint: disable=W0613
+ """Drop record index from spark dataframe"""
+ if RECORD_INDEX_COLUMN_NAME not in entity.columns:
+ return entity
+ return entity.drop(RECORD_INDEX_COLUMN_NAME)
+
+
+def spark_record_index(cls):
+ """Class decorator to add record index methods for spark implementations"""
+ setattr(cls, "add_record_index", _add_spark_record_index)
+ setattr(cls, "drop_record_index", _drop_spark_record_index)
+ return cls
diff --git a/src/dve/core_engine/backends/readers/csv.py b/src/dve/core_engine/backends/readers/csv.py
index bc05b58..edd6bf0 100644
--- a/src/dve/core_engine/backends/readers/csv.py
+++ b/src/dve/core_engine/backends/readers/csv.py
@@ -16,6 +16,7 @@
MissingHeaderError,
)
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.type_hints import EntityName
from dve.parser.file_handling import get_content_length, open_stream
from dve.parser.file_handling.implementations.file import file_uri_to_local_path
@@ -204,7 +205,9 @@ def read_to_py_iterator(
)
coerce_func = partial(self._coerce, field_names=field_names)
- yield from map(coerce_func, reader)
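+ # Number records from 1 as they stream out of the reader; the index travels with each
+ # record into the parquet output and any error reporting.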
+ for idx, record in enumerate(map(coerce_func, reader), start=1):
+ record[RECORD_INDEX_COLUMN_NAME] = idx # type: ignore
+ yield record
def write_parquet( # type: ignore
self,
@@ -223,6 +226,7 @@ def write_parquet( # type: ignore
fld.name: get_polars_type_from_annotation(fld.annotation)
for fld in stringify_model(schema).__fields__.values()
}
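+ # Records from read_to_py_iterator now carry __record_index__, so the sink schema
+ # needs the matching integer column.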
+ polars_schema[RECORD_INDEX_COLUMN_NAME] = get_polars_type_from_annotation(int)
pl.LazyFrame(data=entity, schema=polars_schema).sink_parquet(
path=target_location, compression="snappy"
diff --git a/src/dve/core_engine/backends/readers/xml.py b/src/dve/core_engine/backends/readers/xml.py
index e7480f1..4620402 100644
--- a/src/dve/core_engine/backends/readers/xml.py
+++ b/src/dve/core_engine/backends/readers/xml.py
@@ -14,6 +14,7 @@
from dve.core_engine.backends.exceptions import EmptyFileError
from dve.core_engine.backends.readers.xml_linting import run_xmllint
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.loggers import get_logger
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, EntityName
@@ -310,7 +311,9 @@ def read_to_py_iterator(
raise EmptyFileError(f"File at {resource!r} is empty")
with open_stream(resource, "rb") as stream:
- yield from self._parse_xml(stream, schema)
+ for idx, record in enumerate(self._parse_xml(stream, schema), start=1):
+ record[RECORD_INDEX_COLUMN_NAME] = idx # type: ignore
+ yield record
def write_parquet( # type: ignore
self,
@@ -329,6 +332,7 @@ def write_parquet( # type: ignore
fld.name: get_polars_type_from_annotation(fld.type_)
for fld in stringify_model(schema).__fields__.values()
}
+ polars_schema[RECORD_INDEX_COLUMN_NAME] = get_polars_type_from_annotation(int)
pl.LazyFrame(data=entity, schema=polars_schema).sink_parquet(
path=target_location, compression="snappy", **kwargs
)
diff --git a/src/dve/core_engine/backends/utilities.py b/src/dve/core_engine/backends/utilities.py
index 9261806..d94d0bf 100644
--- a/src/dve/core_engine/backends/utilities.py
+++ b/src/dve/core_engine/backends/utilities.py
@@ -11,6 +11,7 @@
from pydantic import BaseModel, create_model
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.type_hints import Messages
# We need to rely on a Python typing implementation detail in Python <= 3.7.
@@ -175,3 +176,24 @@ def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType:
if polars_type:
return polars_type
raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")
+
+
+def _add_polars_record_index(self, entity: pl.LazyFrame) -> pl.LazyFrame: # pylint: disable=W0613
+ """Add a record index to polars dataframe"""
+ if RECORD_INDEX_COLUMN_NAME in entity.columns:
+ return entity
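+ # with_row_index adds a monotonically increasing index column; offset=1 keeps the
+ # numbering consistent with the 1-based Spark and Python readers.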
+ return entity.with_row_index(name=RECORD_INDEX_COLUMN_NAME, offset=1)
+
+
+def _drop_polars_record_index(self, entity: pl.LazyFrame) -> pl.LazyFrame: # pylint: disable=W0613
+ """Drop record index from polars dataframe"""
+ if RECORD_INDEX_COLUMN_NAME not in entity.columns:
+ return entity
+ return entity.drop(RECORD_INDEX_COLUMN_NAME)
+
+
+def polars_record_index(cls):
+ """Class decorator to add record index methods for polars implementations"""
+ setattr(cls, "add_record_index", _add_polars_record_index)
+ setattr(cls, "drop_record_index", _drop_polars_record_index)
+ return cls
diff --git a/src/dve/core_engine/constants.py b/src/dve/core_engine/constants.py
index d452c9b..a2a4a65 100644
--- a/src/dve/core_engine/constants.py
+++ b/src/dve/core_engine/constants.py
@@ -1,7 +1,7 @@
"""Constant values used in mutiple places."""
-ROWID_COLUMN_NAME: str = "__rowid__"
-"""The name of the column containing the row ID for each entity."""
+RECORD_INDEX_COLUMN_NAME: str = "__record_index__"
+"""The name of the column containing the record index for each entity."""
CONTRACT_ERROR_VALUE_FIELD_NAME: str = "__error_value"
"""The name of the field that can be used to extract the field value that caused
diff --git a/src/dve/core_engine/engine.py b/src/dve/core_engine/engine.py
index 28a2ac5..c5d1ba9 100644
--- a/src/dve/core_engine/engine.py
+++ b/src/dve/core_engine/engine.py
@@ -15,7 +15,7 @@
from dve.core_engine.backends.implementations.spark.types import SparkEntities
from dve.core_engine.configuration.base import BaseEngineConfig
from dve.core_engine.configuration.v1 import V1EngineConfig
-from dve.core_engine.constants import ROWID_COLUMN_NAME
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.loggers import get_child_logger, get_logger
from dve.core_engine.models import EngineRunValidation, SubmissionInfo
from dve.core_engine.type_hints import EntityName, JSONstring
@@ -200,7 +200,7 @@ def _write_entity_outputs(self, entities: SparkEntities) -> SparkEntities:
self.main_log.info(f"Writing entities to the output location: {self.output_prefix_uri}")
for entity_name, entity in entities.items():
- entity = entity.drop(ROWID_COLUMN_NAME)
+ entity = entity.drop(RECORD_INDEX_COLUMN_NAME)
self.main_log.info(f"Entity: {entity_name} {type(entity)}")
diff --git a/src/dve/core_engine/message.py b/src/dve/core_engine/message.py
index f2a4e52..627ae3a 100644
--- a/src/dve/core_engine/message.py
+++ b/src/dve/core_engine/message.py
@@ -13,7 +13,7 @@
from pydantic import BaseModel, ValidationError, validator
from pydantic.dataclasses import dataclass
-from dve.core_engine.constants import CONTRACT_ERROR_VALUE_FIELD_NAME, ROWID_COLUMN_NAME
+from dve.core_engine.constants import CONTRACT_ERROR_VALUE_FIELD_NAME, RECORD_INDEX_COLUMN_NAME
from dve.core_engine.templating import template_object
from dve.core_engine.type_hints import (
EntityName,
@@ -116,6 +116,8 @@ class UserMessage:
"The offending values"
Category: ErrorCategory
"The category of error"
+ RecordIndex: Optional[int] = None
+ "The record index where the error occurred (if applicable)"
@property
def is_informational(self) -> bool:
@@ -187,6 +189,7 @@ class FeedbackMessage: # pylint: disable=too-many-instance-attributes
"ErrorMessage",
"ErrorCode",
"ReportingField",
+ "RecordIndex",
"Value",
"Category",
]
@@ -224,15 +227,6 @@ def _validate_error_location(cls, value: Any) -> Optional[str]:
return str(value)
- @validator("record")
- def _strip_rowid( # pylint: disable=no-self-argument
- cls, value: Optional[dict[str, Any]]
- ) -> Optional[dict[str, Any]]:
- """Strip the row ID column from the record, if present."""
- if isinstance(value, dict):
- value.pop(ROWID_COLUMN_NAME, None)
- return value
-
@property
def is_critical(self) -> bool:
"""Whether the error is unrecoverable."""
@@ -333,6 +327,7 @@ def to_row(
error_message,
self.error_code,
self.reporting_field_name or reporting_field,
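+ # The record index stays on the record (the old row-ID stripping validator is gone),
+ # so it can be read straight off the record dict for the RecordIndex column.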
+ (self.record.get(RECORD_INDEX_COLUMN_NAME) if self.record else None),
value,
self.category,
)
diff --git a/src/dve/core_engine/type_hints.py b/src/dve/core_engine/type_hints.py
index afb6d9d..3112e28 100644
--- a/src/dve/core_engine/type_hints.py
+++ b/src/dve/core_engine/type_hints.py
@@ -135,6 +135,8 @@
"""The value that caused the error."""
ErrorCategory = Literal["Blank", "Wrong format", "Bad value", "Bad file"]
"""A string indicating the category of the error."""
+RecordIndex = Optional[int]
+"""The record index that the error relates to (if applicable)"""
MessageTuple = tuple[
Optional[EntityName],
@@ -146,6 +148,7 @@
ErrorMessage,
ErrorCode,
ReportingField,
+ RecordIndex,
Optional[FieldValue],
Optional[ErrorCategory],
]
diff --git a/src/dve/metadata_parser/domain_types.py b/src/dve/metadata_parser/domain_types.py
index 6c4a5c4..545429f 100644
--- a/src/dve/metadata_parser/domain_types.py
+++ b/src/dve/metadata_parser/domain_types.py
@@ -260,6 +260,8 @@ class ConFormattedDate(dt.date):
DATE_FORMAT: ClassVar[Optional[str]] = None
"""The specific format of the date as a Python 'strptime' string."""
+ strict: ClassVar[Optional[bool]] = False
+ """Add additional check to ensure that date supplied meets the date format exactly."""
ge: ClassVar[Optional[dt.date]] = None
"""The earliest date allowed."""
le: ClassVar[Optional[dt.date]] = None
@@ -280,6 +282,8 @@ def validate(cls, value: Optional[Union[dt.date, str]]) -> Optional[dt.date]:
elif cls.DATE_FORMAT is not None:
try:
date = dt.datetime.strptime(value, cls.DATE_FORMAT).date()
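+ # strptime is lenient (e.g. it accepts "2026-1-2" for "%Y-%m-%d"); in strict mode,
+ # round-trip the parsed date and reject input that does not reproduce itself exactly.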
+ if cls.strict and (date.strftime(cls.DATE_FORMAT) != value):
+ raise ValueError
except ValueError as err:
raise ValueError(
f"Unable to parse provided datetime in format {cls.DATE_FORMAT}"
@@ -317,6 +321,7 @@ def __get_validators__(cls) -> Iterator[classmethod]:
@validate_arguments
def conformatteddate(
date_format: Optional[str] = None,
+ strict: Optional[bool] = False,
ge: Optional[dt.date] = None, # pylint: disable=invalid-name
le: Optional[dt.date] = None, # pylint: disable=invalid-name
gt: Optional[dt.date] = None, # pylint: disable=invalid-name
@@ -331,6 +336,7 @@ def conformatteddate(
dict_ = ConFormattedDate.__dict__.copy()
dict_["DATE_FORMAT"] = date_format
+ dict_["strict"] = strict
dict_["ge"] = ge
dict_["le"] = le
dict_["gt"] = gt
diff --git a/src/dve/metadata_parser/model_generator.py b/src/dve/metadata_parser/model_generator.py
index 7681b7f..2706458 100644
--- a/src/dve/metadata_parser/model_generator.py
+++ b/src/dve/metadata_parser/model_generator.py
@@ -65,7 +65,7 @@ def constr(
"conint": pyd.validate_arguments(pyd.conint),
"condate": pyd.validate_arguments(pyd.condate),
"condecimal": pyd.validate_arguments(pyd.condecimal),
- "postcode": domain_types.Postcode,
+ "postcode": domain_types.postcode,
"nhsnumber": domain_types.NHSNumber,
"permissivenhsno": domain_types.permissive_nhs_number(),
"alphanumeric": domain_types.alphanumeric,
diff --git a/src/dve/metadata_parser/models.py b/src/dve/metadata_parser/models.py
index 18cdc68..73e6b5c 100644
--- a/src/dve/metadata_parser/models.py
+++ b/src/dve/metadata_parser/models.py
@@ -371,6 +371,7 @@ class Config(pyd.BaseConfig):
fields = self.aliases # type: ignore
anystr_strip_whitespace = True
allow_population_by_field_name = True
+ extra = pyd.Extra.ignore
return pyd.create_model( # type: ignore
model_name,
diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py
index 46a89c2..b14ada1 100644
--- a/src/dve/pipeline/pipeline.py
+++ b/src/dve/pipeline/pipeline.py
@@ -432,7 +432,9 @@ def apply_data_contract(
for path, _ in fh.iter_prefix(read_from):
entity_locations[fh.get_file_name(path)] = path
- entities[fh.get_file_name(path)] = self.data_contract.read_parquet(path)
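+ # add_record_index is a no-op when the parquet already carries __record_index__;
+ # otherwise the column is backfilled before the data contract is applied.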
+ entities[fh.get_file_name(path)] = self.data_contract.add_record_index(
+ self.data_contract.read_parquet(path)
+ )
key_fields = {model: conf.reporting_fields for model, conf in model_config.items()}
@@ -563,8 +565,9 @@ def apply_business_rules(
for parquet_uri, _ in fh.iter_prefix(contract):
file_name = fh.get_file_name(parquet_uri)
- entities[file_name] = self.step_implementations.read_parquet(parquet_uri) # type: ignore
- entities[file_name] = self.step_implementations.add_row_id(entities[file_name]) # type: ignore
+ entities[file_name] = self.step_implementations.add_record_index( # type: ignore
+ self.step_implementations.read_parquet(parquet_uri) # type: ignore
+ )
entities[f"Original{file_name}"] = self.step_implementations.read_parquet(parquet_uri) # type: ignore
sub_info_entity = (
@@ -742,6 +745,7 @@ def _get_error_dataframes(self, submission_id: str):
pl.col("ErrorCode").alias("Error_Code"), # type: ignore
pl.col("ReportingField").alias("Data_Item"), # type: ignore
pl.col("ErrorMessage").alias("Error"), # type: ignore
+ pl.col("RecordIndex").alias("Record_Index"),
pl.col("Value"), # type: ignore
pl.col("Key").alias("ID"), # type: ignore
pl.col("Category"), # type: ignore
diff --git a/src/dve/reporting/error_report.py b/src/dve/reporting/error_report.py
index 8852fcb..9e947bf 100644
--- a/src/dve/reporting/error_report.py
+++ b/src/dve/reporting/error_report.py
@@ -18,6 +18,7 @@
"Error_Code": Utf8(),
"Data_Item": Utf8(),
"Error": Utf8(),
+ "Record_Index": pl.UInt32(),
"Value": Utf8(),
"ID": Utf8(),
"Category": Utf8(),
diff --git a/tests/features/books.feature b/tests/features/books.feature
index f13658a..60cc5db 100644
--- a/tests/features/books.feature
+++ b/tests/features/books.feature
@@ -4,33 +4,6 @@ Feature: Pipeline tests using the books dataset
This tests submissions using nested, complex JSON datasets with arrays, and
introduces more complex transformations that require aggregation.
- Scenario: Validate complex nested XML data (spark)
- Given I submit the books file nested_books.XML for processing
- And A spark pipeline is configured with schema file 'nested_books.dischema.json'
- And I add initial audit entries for the submission
- Then the latest audit record for the submission is marked with processing status file_transformation
- When I run the file transformation phase
- Then the header entity is stored as a parquet after the file_transformation phase
- And the nested_books entity is stored as a parquet after the file_transformation phase
- And the latest audit record for the submission is marked with processing status data_contract
- When I run the data contract phase
- Then there is 1 record rejection from the data_contract phase
- And the header entity is stored as a parquet after the data_contract phase
- And the nested_books entity is stored as a parquet after the data_contract phase
- And the latest audit record for the submission is marked with processing status business_rules
- When I run the business rules phase
- Then The rules restrict "nested_books" to 3 qualifying records
- And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books"
- And the nested_books entity is stored as a parquet after the business_rules phase
- And the latest audit record for the submission is marked with processing status error_report
- When I run the error report phase
- Then An error report is produced
- And The statistics entry for the submission shows the following information
- | parameter | value |
- | record_count | 4 |
- | number_record_rejections | 2 |
- | number_warnings | 0 |
-
Scenario: Validate complex nested XML data (duckdb)
Given I submit the books file nested_books.XML for processing
And A duckdb pipeline is configured with schema file 'nested_books_ddb.dischema.json'
diff --git a/tests/features/movies.feature b/tests/features/movies.feature
index d737574..fa041ea 100644
--- a/tests/features/movies.feature
+++ b/tests/features/movies.feature
@@ -21,18 +21,18 @@ Feature: Pipeline tests using the movies dataset
When I run the data contract phase
Then there are 3 record rejections from the data_contract phase
And there are errors with the following details and associated error_count from the data_contract phase
- | Entity | ErrorCode | ErrorMessage | error_count |
- | movies | BLANKYEAR | year not provided | 1 |
- | movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 |
- | movies | DODGYDATE | date_joined value is not valid: daft_date | 1 |
+ | Entity | ErrorCode | ErrorMessage | RecordIndex | error_count |
+ | movies | BLANKYEAR | year not provided | 2 | 1 |
+ | movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 | 1 |
+ | movies | DODGYDATE | date_joined value is not valid: daft_date | 1 | 1 |
And the movies entity is stored as a parquet after the data_contract phase
And the latest audit record for the submission is marked with processing status business_rules
When I run the business rules phase
Then The rules restrict "movies" to 4 qualifying records
And there are errors with the following details and associated error_count from the business_rules phase
- | ErrorCode | ErrorMessage | error_count |
- | LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 |
- | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
+ | ErrorCode | ErrorMessage | RecordIndex | error_count |
+ | LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 4 | 1 |
+ | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 | 1 |
And the latest audit record for the submission is marked with processing status error_report
When I run the error report phase
Then An error report is produced
@@ -57,18 +57,18 @@ Feature: Pipeline tests using the movies dataset
When I run the data contract phase
Then there are 3 record rejections from the data_contract phase
And there are errors with the following details and associated error_count from the data_contract phase
- | Entity | ErrorCode | ErrorMessage | error_count |
- | movies | BLANKYEAR | year not provided | 1 |
- | movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 |
- | movies | DODGYDATE | date_joined value is not valid: daft_date | 1 |
+ | Entity | ErrorCode | ErrorMessage | RecordIndex | error_count |
+ | movies | BLANKYEAR | year not provided | 2 | 1 |
+ | movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 | 1 |
+ | movies | DODGYDATE | date_joined value is not valid: daft_date | 1 | 1 |
And the movies entity is stored as a parquet after the data_contract phase
And the latest audit record for the submission is marked with processing status business_rules
When I run the business rules phase
Then The rules restrict "movies" to 4 qualifying records
And there are errors with the following details and associated error_count from the business_rules phase
- | ErrorCode | ErrorMessage | error_count |
- | LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 |
- | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
+ | ErrorCode | ErrorMessage | RecordIndex | error_count |
+ | LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 4 | 1 |
+ | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 | 1 |
And the latest audit record for the submission is marked with processing status error_report
When I run the error report phase
Then An error report is produced
diff --git a/tests/features/steps/utilities.py b/tests/features/steps/utilities.py
index aa9adc1..58edc67 100644
--- a/tests/features/steps/utilities.py
+++ b/tests/features/steps/utilities.py
@@ -23,6 +23,7 @@
"ErrorType",
"ErrorLocation",
"ErrorMessage",
+ "RecordIndex",
"ReportingField",
"Category",
]
diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py
index 0300808..d382ecb 100644
--- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py
+++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py
@@ -14,6 +14,7 @@
from dve.core_engine.backends.implementations.duckdb.readers.xml import DuckDBXMLStreamReader
from dve.core_engine.backends.metadata.contract import DataContractMetadata, ReaderConfig
from dve.core_engine.backends.utilities import stringify_model
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.message import UserMessage
from dve.core_engine.type_hints import URI
from dve.core_engine.validation import RowValidator
@@ -93,10 +94,12 @@ def test_duckdb_data_contract_csv(temp_csv_file):
data_contract: DuckDBDataContract = DuckDBDataContract(connection)
entities, feedback_errors_uri, stage_successful = data_contract.apply_data_contract(get_parent(uri.as_posix()), entities, entity_locations, dc_meta)
rel: DuckDBPyRelation = entities.get("test_ds")
- assert dict(zip(rel.columns, rel.dtypes)) == {
+ expected_schema = {
fld.name: str(get_duckdb_type_from_annotation(fld.annotation))
for fld in mdl.__fields__.values()
}
+ expected_schema[RECORD_INDEX_COLUMN_NAME] = get_duckdb_type_from_annotation(int)
+ assert dict(zip(rel.columns, rel.dtypes)) == expected_schema
assert not get_resource_exists(feedback_errors_uri)
assert stage_successful
@@ -195,10 +198,12 @@ def test_duckdb_data_contract_xml(temp_xml_file):
fld.name: get_duckdb_type_from_annotation(fld.type_)
for fld in header_model.__fields__.values()
}
+ header_expected_schema[RECORD_INDEX_COLUMN_NAME] = get_duckdb_type_from_annotation(int)
class_data_expected_schema: Dict[str, DuckDBPyType] = {
fld.name: get_duckdb_type_from_annotation(fld.type_)
for fld in class_model.__fields__.values()
}
+ class_data_expected_schema[RECORD_INDEX_COLUMN_NAME] = get_duckdb_type_from_annotation(int)
class_data_rel: DuckDBPyRelation = entities.get("test_class_info")
assert not get_resource_exists(feedback_errors_uri)
assert header_rel.count("*").fetchone()[0] == 1
@@ -223,7 +228,7 @@ def test_ddb_data_contract_read_and_write_basic_parquet(
"id": "VARCHAR",
"datefield": "VARCHAR",
"strfield": "VARCHAR",
- "datetimefield": "VARCHAR",
+ "datetimefield": "VARCHAR"
}
# check processes entity
contract_dict = json.loads(contract_meta).get("contract")
@@ -266,6 +271,7 @@ def test_ddb_data_contract_read_and_write_basic_parquet(
"datefield": "DATE",
"strfield": "VARCHAR",
"datetimefield": "TIMESTAMP",
+ RECORD_INDEX_COLUMN_NAME: get_duckdb_type_from_annotation(int)
}
@@ -282,7 +288,7 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet):
"id": "VARCHAR",
"strfield": "VARCHAR",
"datetimefield": "VARCHAR",
- "subfield": "STRUCT(id VARCHAR, substrfield VARCHAR, subarrayfield VARCHAR[])[]",
+ "subfield": "STRUCT(id VARCHAR, substrfield VARCHAR, subarrayfield VARCHAR[])[]"
}
# check processes entity
contract_dict = json.loads(contract_meta).get("contract")
@@ -325,6 +331,7 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet):
"strfield": "VARCHAR",
"datetimefield": "TIMESTAMP",
"subfield": "STRUCT(id BIGINT, substrfield VARCHAR, subarrayfield DATE[])[]",
+ RECORD_INDEX_COLUMN_NAME: get_duckdb_type_from_annotation(int)
}
def test_duckdb_data_contract_custom_error_details(nested_all_string_parquet_w_errors,
diff --git a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py
index 921c9be..70c6b9c 100644
--- a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py
+++ b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py
@@ -16,6 +16,7 @@
from dve.core_engine.backends.implementations.spark.contract import SparkDataContract
from dve.core_engine.backends.metadata.contract import DataContractMetadata, ReaderConfig
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.message import UserMessage
from dve.core_engine.type_hints import URI
from dve.core_engine.validation import RowValidator
@@ -89,6 +90,7 @@ def test_spark_data_contract_read_and_write_basic_parquet(
StructField("datefield", DateType()),
StructField("strfield", StringType()),
StructField("datetimefield", TimestampType()),
+ StructField(RECORD_INDEX_COLUMN_NAME, LongType())
]
)
@@ -173,6 +175,7 @@ def test_spark_data_contract_read_nested_parquet(nested_all_string_parquet):
)
),
),
+ StructField(RECORD_INDEX_COLUMN_NAME, LongType())
]
)
diff --git a/tests/test_core_engine/test_backends/test_readers/test_csv.py b/tests/test_core_engine/test_backends/test_readers/test_csv.py
index 4cd7e07..0737ad2 100644
--- a/tests/test_core_engine/test_backends/test_readers/test_csv.py
+++ b/tests/test_core_engine/test_backends/test_readers/test_csv.py
@@ -12,6 +12,7 @@
from dve.core_engine.backends.exceptions import EmptyFileError, FieldCountMismatch
from dve.core_engine.backends.readers import CSVFileReader
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from ....conftest import get_test_file_path
from ....fixtures import temp_dir
@@ -25,10 +26,13 @@ def planet_location() -> Iterator[str]:
@pytest.fixture(scope="function")
def planet_data() -> Iterator[Dict[str, Dict[str, str]]]:
- """The planet data, as loaded by Python's default parser."""
+ """The expected planet data after reading, as loaded by Python's default parser."""
with get_test_file_path("planets/planets.csv").open("r", encoding="utf-8") as file:
reader = csv.DictReader(file)
- yield {row["planet"]: row for row in reader}
+ data = {}
+ for idx, row in enumerate(reader, start=1):
+ data[row["planet"]] = {RECORD_INDEX_COLUMN_NAME: idx, **row}
+ yield data
@pytest.fixture(scope="function")
@@ -138,7 +142,7 @@ def test_csv_file_get_subset(
# Keep only keys in the subset from the source
subset_keys = set(PlanetsSubset.__fields__.keys())
for data in planet_data.values():
- to_pop = set(data.keys()) - subset_keys
+ to_pop = set(data.keys()) - subset_keys - {RECORD_INDEX_COLUMN_NAME}
for key in to_pop:
del data[key]
@@ -160,7 +164,7 @@ def test_csv_file_get_subset_add_missing(
# Keep only keys in the subset from the source
subset_keys = set(PlanetsSubset.__fields__.keys())
for data in planet_data.values():
- to_pop = set(data.keys()) - subset_keys
+ to_pop = set(data.keys()) - subset_keys - {RECORD_INDEX_COLUMN_NAME}
for key in to_pop:
del data[key]
data["random_null"] = None # type: ignore
@@ -182,7 +186,10 @@ def test_csv_file_filled_from_provided(
results = list(reader.read_to_py_iterator(planet_location, "", Planets))
parsed = {row["planet"]: row for row in results}
del parsed["planet"]
+ for rec in parsed.values():
+ rec[RECORD_INDEX_COLUMN_NAME] -= 1
assert parsed == planet_data
+
def test_csv_file_raises_missing_cols(self, planet_location: str):
"""
@@ -235,7 +242,7 @@ def test_csv_file_can_be_pipe_delimited(
"""Test that a pipe-delimited CSV file can be parsed."""
reader = CSVFileReader(delimiter="|")
results = list(reader.read_to_py_iterator(pipe_delimited_location, "", BasicModel))
- assert results == [{"ColumnA": "1", "ColumnB": "2", "ColumnC": "3"}]
+ assert results == [{"ColumnA": "1", "ColumnB": "2", "ColumnC": "3", RECORD_INDEX_COLUMN_NAME: 1}]
@pytest.mark.parametrize(["schema"], [(None,), (Planets,)])
def test_base_csv_reader_parquet_write(
@@ -252,5 +259,5 @@ def test_base_csv_reader_parquet_write(
reader.write_parquet(entity=entity, target_location=target_location, schema=schema)
assert sorted(
pd.read_parquet(target_location).to_dict(orient="records"),
- key=lambda x: x.get("planet"),
- ) == sorted([dict(val) for val in planet_data.values()], key=lambda x: x.get("planet"))
+ key=lambda x: x.get(RECORD_INDEX_COLUMN_NAME),
+ ) == sorted([dict(val) for val in planet_data.values()], key=lambda x: x.get(RECORD_INDEX_COLUMN_NAME))
diff --git a/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py
index 8f9d40d..f364045 100644
--- a/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py
+++ b/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py
@@ -2,6 +2,7 @@
from pathlib import Path
from tempfile import TemporaryDirectory
+import polars as pl
import pytest
from duckdb import DuckDBPyRelation, default_connection
from pydantic import BaseModel
@@ -16,6 +17,7 @@
PolarsToDuckDBCSVReader,
)
from dve.core_engine.backends.utilities import stringify_model
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from tests.test_core_engine.test_backends.fixtures import duckdb_connection
# pylint: disable=C0116
@@ -33,6 +35,10 @@ class SimpleHeaderModel(BaseModel):
header_2: str
+class VerySimpleModel(BaseModel):
+ test_col: str
+
+
@pytest.fixture
def temp_dir():
with TemporaryDirectory(prefix="ddb_test_csv_reader") as temp_dir:
@@ -69,21 +75,25 @@ def test_ddb_csv_reader_all_str(temp_csv_file):
rel: DuckDBPyRelation = reader.read_to_entity_type(
DuckDBPyRelation, str(uri), "test", stringify_model(mdl)
)
- assert rel.columns == header.split(",")
- assert dict(zip(rel.columns, rel.dtypes)) == {fld: "VARCHAR" for fld in header.split(",")}
- assert rel.fetchall() == [tuple(str(val) for val in rw) for rw in data]
+ expected_dtypes = {**{fld: "VARCHAR" for fld in header.split(",")}, RECORD_INDEX_COLUMN_NAME: "BIGINT"}
+ expected_data = [(*[str(val) for val in rw], idx) for idx, rw in enumerate(data, start=1)]
+ assert rel.columns == header.split(",") + [RECORD_INDEX_COLUMN_NAME]
+ assert dict(zip(rel.columns, rel.dtypes)) == expected_dtypes
+ assert rel.fetchall() == expected_data
def test_ddb_csv_reader_cast(temp_csv_file):
uri, header, data, mdl = temp_csv_file
reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection)
rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, str(uri), "test", mdl)
- assert rel.columns == header.split(",")
- assert dict(zip(rel.columns, rel.dtypes)) == {
+ expected_dtypes = {**{
fld.name: str(get_duckdb_type_from_annotation(fld.annotation))
for fld in mdl.__fields__.values()
- }
- assert rel.fetchall() == [tuple(rw) for rw in data]
+ }, RECORD_INDEX_COLUMN_NAME: get_duckdb_type_from_annotation(int)}
+ expected_data = [(*rw, idx) for idx, rw in enumerate(data, start=1)]
+ assert rel.columns == header.split(",") + [RECORD_INDEX_COLUMN_NAME]
+ assert dict(zip(rel.columns, rel.dtypes)) == expected_dtypes
+ assert rel.fetchall() == expected_data
def test_ddb_csv_write_parquet(temp_csv_file):
@@ -95,7 +105,7 @@ def test_ddb_csv_write_parquet(temp_csv_file):
target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix()
reader.write_parquet(rel, target_loc)
parquet_rel = reader._connection.read_parquet(target_loc)
- assert parquet_rel.df().to_dict(orient="records") == rel.df().to_dict(orient="records")
+ assert sorted(
+     parquet_rel.df().to_dict(orient="records"), key=lambda x: x.get(RECORD_INDEX_COLUMN_NAME)
+ ) == sorted(
+     [{**rec, RECORD_INDEX_COLUMN_NAME: idx} for idx, rec in enumerate(rel.df().to_dict(orient="records"), start=1)],
+     key=lambda x: x.get(RECORD_INDEX_COLUMN_NAME),
+ )
def test_ddb_csv_read_empty_file(temp_empty_csv_file):
@@ -157,3 +167,74 @@ def test_ddb_csv_repeating_header_reader_with_more_than_one_set_of_distinct_valu
with pytest.raises(MessageBearingError):
reader.read_to_relation(str(file_uri), "test", SimpleHeaderModel)
+
+
+def test_DuckDBCSVReader_with_null_empty_strings(temp_dir):
+ test_df = pl.DataFrame({"test_col": ["fine", " ", " "]})
+ file_uri = temp_dir.joinpath("test_empty_string1.csv").as_posix()
+ test_df.write_csv(
+ file_uri,
+ include_header=True,
+ quote_style="always"
+ )
+
+ reader = DuckDBCSVReader(
+ header=True,
+ delim=",",
+ quotechar='"',
+ connection=default_connection,
+ null_empty_strings=True,
+ )
+
+ entity = reader.read_to_relation(file_uri, "test", VerySimpleModel)
+
+ assert entity.shape[0] == 3
+ assert entity.filter("test_col IS NULL").shape[0] == 2
+
+
+def test_DuckDBCSVRepeatingHeaderReader_with_null_empty_strings(temp_dir):
+ test_df = pl.DataFrame({
+ "header_1": ["fine",], "header_2": [" "],
+ })
+ file_uri = temp_dir.joinpath("test_empty_string2.csv").as_posix()
+ test_df.write_csv(
+ file_uri,
+ include_header=True,
+ quote_style="always"
+ )
+
+ reader = DuckDBCSVRepeatingHeaderReader(
+ header=True,
+ delim=",",
+ quotechar='"',
+ connection=default_connection,
+ null_empty_strings=True,
+ )
+
+ entity = reader.read_to_relation(file_uri, "test", SimpleHeaderModel)
+
+ assert entity.shape[0] == 1
+ assert entity.filter("header_2 IS NULL").shape[0] == 1
+
+
+def test_PolarsToDuckDBCSVReader_with_null_empty_strings(temp_dir):
+ test_df = pl.DataFrame({"test_col": ["fine", " ", " "]})
+ file_uri = temp_dir.joinpath("test_empty_string3.csv").as_posix()
+ test_df.write_csv(
+ file_uri,
+ include_header=True,
+ quote_style="always"
+ )
+
+ reader = PolarsToDuckDBCSVReader(
+ header=True,
+ delim=",",
+ quotechar='"',
+ connection=default_connection,
+ null_empty_strings=True,
+ )
+
+ entity = reader.read_to_relation(file_uri, "test", VerySimpleModel)
+
+ assert entity.shape[0] == 3
+ assert entity.filter("test_col IS NULL").shape[0] == 2
diff --git a/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py
index c326fef..6942c6a 100644
--- a/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py
+++ b/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py
@@ -13,6 +13,7 @@
)
from dve.core_engine.backends.implementations.duckdb.readers.json import DuckDBJSONReader
from dve.core_engine.backends.utilities import stringify_model
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from tests.test_core_engine.test_backends.fixtures import duckdb_connection
@@ -59,9 +60,9 @@ def test_ddb_json_reader_all_str(temp_json_file):
rel: DuckDBPyRelation = reader.read_to_entity_type(
DuckDBPyRelation, uri.as_posix(), "test", stringify_model(mdl)
)
- assert rel.columns == expected_fields
- assert dict(zip(rel.columns, rel.dtypes)) == {fld: "VARCHAR" for fld in expected_fields}
- assert rel.fetchall() == [tuple(str(val) for val in rw.values()) for rw in data]
+ assert rel.columns == expected_fields + [RECORD_INDEX_COLUMN_NAME]
+ assert dict(zip(rel.columns, rel.dtypes)) == {**{fld: "VARCHAR" for fld in expected_fields}, RECORD_INDEX_COLUMN_NAME: "BIGINT"}
+ assert rel.fetchall() == [(*[str(val) for val in rw.values()], idx) for idx, rw in enumerate(data, start=1)]
def test_ddb_json_reader_cast(temp_json_file):
@@ -70,15 +71,15 @@ def test_ddb_json_reader_cast(temp_json_file):
reader = DuckDBJSONReader()
rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, uri.as_posix(), "test", mdl)
- assert rel.columns == expected_fields
- assert dict(zip(rel.columns, rel.dtypes)) == {
+ assert rel.columns == expected_fields + [RECORD_INDEX_COLUMN_NAME]
+ assert dict(zip(rel.columns, rel.dtypes)) == {**{
fld.name: str(get_duckdb_type_from_annotation(fld.annotation))
for fld in mdl.__fields__.values()
- }
- assert rel.fetchall() == [tuple(rw.values()) for rw in data]
+ }, RECORD_INDEX_COLUMN_NAME: "BIGINT"}
+ assert rel.fetchall() == [(*rw.values(), idx) for idx, rw in enumerate(data, start=1)]
-def test_ddb_csv_write_parquet(temp_json_file):
+def test_ddb_json_write_parquet(temp_json_file):
uri, _, mdl = temp_json_file
reader = DuckDBJSONReader()
rel: DuckDBPyRelation = reader.read_to_entity_type(
diff --git a/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py
index dad5b06..585f7b7 100644
--- a/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py
+++ b/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py
@@ -9,6 +9,7 @@
from pydantic import BaseModel
from dve.core_engine.backends.implementations.duckdb.readers.xml import DuckDBXMLStreamReader
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
@pytest.fixture
@@ -19,15 +20,15 @@ def temp_dir():
@pytest.fixture
def temp_xml_file(temp_dir: Path):
- header_data: Dict[str, str] = {
+ header_data: list[dict[str, str]] = [{
"school_name": "Meadow Fields",
"category": "Primary",
"headteacher": "Mrs Smith",
- }
- class_data: Dict[str, Dict[str, str]] = {
+ }]
+ class_data: list[dict[str, Dict[str, str]]] = [{
"year_1": {"class_size": "10", "teacher": "Mrs Armitage"},
"year_2": {"class_size": "12", "teacher": "Mr Barney"},
- }
+ }]
class HeaderModel(BaseModel):
school_name: str
@@ -44,16 +45,17 @@ class ClassDataModel(BaseModel):
root = ET.Element("root")
header = ET.SubElement(root, "Header")
- for nm, val in header_data.items():
+ for nm, val in header_data[0].items():
_tag = ET.SubElement(header, nm)
_tag.text = val
- data = ET.SubElement(root, "ClassData")
- for nm, val in class_data.items():
- _parent_tag = ET.SubElement(data, nm)
- for sub_nm, sub_val in val.items():
- _child_tag = ET.SubElement(_parent_tag, sub_nm)
- _child_tag.text = sub_val
+ for dta in class_data:
+ data = ET.SubElement(root, "ClassData")
+ for nm, val in dta.items():
+ _parent_tag = ET.SubElement(data, nm)
+ for sub_nm, sub_val in val.items():
+ _child_tag = ET.SubElement(_parent_tag, sub_nm)
+ _child_tag.text = sub_val
with open(temp_dir.joinpath("test.xml"), mode="wb") as xml_fle:
xml_fle.write(ET.tostring(root))
@@ -76,10 +78,12 @@ def test_ddb_xml_reader_all_str(temp_xml_file):
class_rel: DuckDBPyRelation = class_reader.read_to_relation(
uri.as_uri(), "class_data", class_data_model
)
+ expected_header = [{**recs, RECORD_INDEX_COLUMN_NAME: idx} for idx, recs in enumerate(header_data, start=1)]
+ expected_class = [{**recs, RECORD_INDEX_COLUMN_NAME: idx} for idx, recs in enumerate(class_data, start=1)]
assert header_rel.count("*").fetchone()[0] == 1
- assert header_rel.df().to_dict("records")[0] == header_data
+ assert header_rel.df().to_dict("records") == expected_header
assert class_rel.count("*").fetchone()[0] == 1
- assert class_rel.df().to_dict("records")[0] == class_data
+ assert class_rel.df().to_dict("records") == expected_class
def test_ddb_xml_reader_write_parquet(temp_xml_file):
diff --git a/tests/test_core_engine/test_backends/test_readers/test_spark/test_spark.py b/tests/test_core_engine/test_backends/test_readers/test_spark/test_spark.py
new file mode 100644
index 0000000..d5dccc1
--- /dev/null
+++ b/tests/test_core_engine/test_backends/test_readers/test_spark/test_spark.py
@@ -0,0 +1,56 @@
+"""Test Spark readers"""
+
+# pylint: disable=W0621
+# pylint: disable=C0116
+# pylint: disable=C0103
+# pylint: disable=C0115
+
+import tempfile
+from pathlib import Path
+
+import polars as pl
+import pytest
+from pydantic import BaseModel
+from pyspark.sql import DataFrame, Row, SparkSession
+from pyspark.sql.types import StringType, StructField, StructType
+
+from dve.core_engine.backends.implementations.spark.readers.csv import SparkCSVReader
+
+
+class SparkCSVTestModel(BaseModel):
+ test_col: str
+
+
+@pytest.fixture
+def spark_null_csv_resource():
+ test_df = pl.DataFrame({"test_col": ["fine", " ", " "]})
+
+ with tempfile.TemporaryDirectory() as tdir:
+ resource_uri = Path(tdir, "test_spark_csv_reader.csv").as_posix()
+ test_df.write_csv(resource_uri, include_header=True, quote_style="always")
+
+ yield resource_uri
+
+
+def test_SparkCSVReader_clean_empty_strings(spark: SparkSession, spark_null_csv_resource):
+ resource_uri = spark_null_csv_resource
+ expected_df = spark.createDataFrame(
+ [
+ Row(
+ test_col="fine",
+ ),
+ Row(
+ test_col=None,
+ ),
+ Row(test_col=None),
+ ],
+ StructType([StructField("test_field", StringType())]),
+ )
+
+ reader = SparkCSVReader(null_empty_strings=True, spark_session=spark)
+
+ result_df: DataFrame = reader.read_to_dataframe(
+ resource=resource_uri, entity_name="test", schema=SparkCSVTestModel
+ )
+
+ assert result_df.exceptAll(expected_df).count() == 0
diff --git a/tests/test_core_engine/test_backends/test_readers/test_spark_json.py b/tests/test_core_engine/test_backends/test_readers/test_spark_json.py
index 3cbecb8..24674ca 100644
--- a/tests/test_core_engine/test_backends/test_readers/test_spark_json.py
+++ b/tests/test_core_engine/test_backends/test_readers/test_spark_json.py
@@ -7,13 +7,14 @@
import pytest
from pydantic import BaseModel
from pyspark.sql import DataFrame
-from pyspark.sql.types import StructType, StructField, StringType
+from pyspark.sql.types import LongType, StructType, StructField, StringType
from dve.core_engine.backends.implementations.spark.spark_helpers import (
get_type_from_annotation,
)
from dve.core_engine.backends.implementations.spark.readers.json import SparkJSONReader
from dve.core_engine.backends.utilities import stringify_model
+from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
class SimpleModel(BaseModel):
@@ -54,25 +55,25 @@ class SimpleModel(BaseModel):
def test_spark_json_reader_all_str(temp_json_file):
uri, data, mdl = temp_json_file
- expected_fields = [fld for fld in mdl.__fields__]
+ expected_fields = [fld for fld in mdl.__fields__] + [RECORD_INDEX_COLUMN_NAME]
reader = SparkJSONReader()
df: DataFrame = reader.read_to_entity_type(
DataFrame, uri.as_posix(), "test", stringify_model(mdl)
)
assert df.columns == expected_fields
- assert df.schema == StructType([StructField(nme, StringType()) for nme in expected_fields])
- assert [rw.asDict() for rw in df.collect()] == [{k: str(v) for k, v in rw.items()} for rw in data]
+ assert df.schema == StructType(
+     [StructField(nme, LongType() if nme == RECORD_INDEX_COLUMN_NAME else StringType()) for nme in expected_fields]
+ )
+ assert [rw.asDict() for rw in df.collect()] == [
+     {**{k: str(v) for k, v in rw.items()}, RECORD_INDEX_COLUMN_NAME: idx} for idx, rw in enumerate(data, start=1)
+ ]
def test_spark_json_reader_cast(temp_json_file):
uri, data, mdl = temp_json_file
- expected_fields = [fld for fld in mdl.__fields__]
+ expected_fields = [fld for fld in mdl.__fields__] + [RECORD_INDEX_COLUMN_NAME]
reader = SparkJSONReader()
df: DataFrame = reader.read_to_entity_type(DataFrame, uri.as_posix(), "test", mdl)
assert df.columns == expected_fields
assert df.schema == StructType([StructField(fld.name, get_type_from_annotation(fld.annotation))
- for fld in mdl.__fields__.values()])
- assert [rw.asDict() for rw in df.collect()] == data
+ for fld in mdl.__fields__.values()] + [StructField(RECORD_INDEX_COLUMN_NAME, get_type_from_annotation(int))])
+ assert [rw.asDict() for rw in df.collect()] == [{**rw, RECORD_INDEX_COLUMN_NAME: idx} for idx, rw in enumerate(data, start=1)]
def test_spark_json_write_parquet(spark, temp_json_file):
diff --git a/tests/test_core_engine/test_message.py b/tests/test_core_engine/test_message.py
index edf89fc..ccb6736 100644
--- a/tests/test_core_engine/test_message.py
+++ b/tests/test_core_engine/test_message.py
@@ -8,20 +8,8 @@
from pydantic import BaseModel, ValidationError
import pytest
-from dve.core_engine.constants import ROWID_COLUMN_NAME
from dve.core_engine.message import DEFAULT_ERROR_DETAIL, DataContractErrorDetail, FeedbackMessage
-
-def test_rowid_column_stripped():
- """Ensure that the rowID column is stripped from FeedbackMessages."""
-
- message = FeedbackMessage(
- entity="entity", record={"key": "value", ROWID_COLUMN_NAME: "some identifier"}
- )
-
- assert message.record.get(ROWID_COLUMN_NAME) is None
-
-
@pytest.mark.parametrize(
("derived_column", "expected"),
[
diff --git a/tests/test_pipeline/pipeline_helpers.py b/tests/test_pipeline/pipeline_helpers.py
index ddd4ef8..b13bef3 100644
--- a/tests/test_pipeline/pipeline_helpers.py
+++ b/tests/test_pipeline/pipeline_helpers.py
@@ -152,6 +152,7 @@ def dodgy_planet_data_after_file_transformation() -> Iterator[Tuple[SubmissionIn
"numberOfMoons": "-1",
"hasRingSystem": "false",
"hasGlobalMagneticField": "sometimes",
+ "__record_index__": "1"
}
planet_contract_df = pl.DataFrame(
planet_contract_data, {k: pl.Utf8() for k in planet_contract_data}
@@ -381,7 +382,8 @@ def error_data_after_business_rules() -> Iterator[Tuple[SubmissionInfo, str]]:
"ErrorCode": "LONG_ORBIT",
"ReportingField": "orbitalPeriod",
"Value": "365.20001220703125",
- "Category": "Bad value"
+ "Category": "Bad value",
+ "RecordIndex": "1"
},
{
"Entity": "planets",
@@ -394,7 +396,8 @@ def error_data_after_business_rules() -> Iterator[Tuple[SubmissionInfo, str]]:
"ErrorCode": "STRONG_GRAVITY",
"ReportingField": "gravity",
"Value": "9.800000190734863",
- "Category": "Bad value"
+ "Category": "Bad value",
+ "RecordIndex": "1"
}
]"""
)
diff --git a/tests/test_pipeline/test_spark_pipeline.py b/tests/test_pipeline/test_spark_pipeline.py
index 910626a..262d84f 100644
--- a/tests/test_pipeline/test_spark_pipeline.py
+++ b/tests/test_pipeline/test_spark_pipeline.py
@@ -175,6 +175,7 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name
"ErrorMessage": "is invalid",
"ErrorCode": "BadValue",
"ReportingField": "planet",
+ "RecordIndex": "1",
"Value": "EarthEarthEarthEarthEarthEarthEarthEarthEarth",
"Category": "Bad value",
},
@@ -188,6 +189,7 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name
"ErrorMessage": "is invalid",
"ErrorCode": "BadValue",
"ReportingField": "numberOfMoons",
+ "RecordIndex": "1",
"Value": "-1",
"Category": "Bad value",
},
@@ -201,6 +203,7 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name
"ErrorMessage": "is invalid",
"ErrorCode": "BadValue",
"ReportingField": "hasGlobalMagneticField",
+ "RecordIndex": "1",
"Value": "sometimes",
"Category": "Bad value",
},
@@ -347,6 +350,7 @@ def test_apply_business_rules_with_data_errors( # pylint: disable=redefined-out
"ReportingField": "orbitalPeriod",
"Value": "365.20001220703125",
"Category": "Bad value",
+ "RecordIndex": "1"
},
{
"Entity": "planets",
@@ -360,6 +364,7 @@ def test_apply_business_rules_with_data_errors( # pylint: disable=redefined-out
"ReportingField": "gravity",
"Value": "9.800000190734863",
"Category": "Bad value",
+ "RecordIndex": "1"
},
]
@@ -504,6 +509,7 @@ def test_error_report_where_report_is_expected( # pylint: disable=redefined-out
"Error Code": "LONG_ORBIT",
"Data Item Submission Name": "orbitalPeriod",
"Errors and Warnings": "Planet has long orbital period",
+ "Record Index": 1,
"Value": 365.20001220703125,
"ID": None,
"Category": "Bad value",
@@ -516,6 +522,7 @@ def test_error_report_where_report_is_expected( # pylint: disable=redefined-out
"Error Code": "STRONG_GRAVITY",
"Data Item Submission Name": "gravity",
"Errors and Warnings": "Planet has too strong gravity",
+ "Record Index": 1,
"Value": 9.800000190734863,
"ID": None,
"Category": "Bad value",
diff --git a/tests/testdata/demographics/basic_demographics.dischema.json b/tests/testdata/demographics/basic_demographics.dischema.json
index e9c8944..02e4367 100644
--- a/tests/testdata/demographics/basic_demographics.dischema.json
+++ b/tests/testdata/demographics/basic_demographics.dischema.json
@@ -9,7 +9,9 @@
"Forename": "str",
"Surname": "str",
"Date_Of_Birth": "date",
- "Postcode": "postcode",
+ "Postcode": {
+ "callable": "postcode"
+ },
"Postcode_Country": "str",
"Postcode_Format_Valid": "str",
"Postcode_Valid": "str"
diff --git a/tests/testdata/demographics/basic_demographics_ddb.dischema.json b/tests/testdata/demographics/basic_demographics_ddb.dischema.json
index 44e2f77..d58ace1 100644
--- a/tests/testdata/demographics/basic_demographics_ddb.dischema.json
+++ b/tests/testdata/demographics/basic_demographics_ddb.dischema.json
@@ -9,7 +9,9 @@
"Forename": "str",
"Surname": "str",
"Date_Of_Birth": "date",
- "Postcode": "postcode",
+ "Postcode": {
+ "callable": "postcode"
+ },
"Postcode_Country": "str",
"Postcode_Format_Valid": "str",
"Postcode_Valid": "str"
diff --git a/tests/testdata/movies/movies_ddb_rule_store.json b/tests/testdata/movies/movies_ddb_rule_store.json
index 843d4fa..6a51fd6 100644
--- a/tests/testdata/movies/movies_ddb_rule_store.json
+++ b/tests/testdata/movies/movies_ddb_rule_store.json
@@ -61,7 +61,7 @@
"name": "Get median sequel rating",
"operation": "group_by",
"entity": "with_sequels",
- "group_by": "title",
+ "group_by": ["__record_index__", "title"],
"agg_columns": {
"list_aggregate(sequel_rating, 'median')": "median_sequel_rating"
}
diff --git a/tests/testdata/movies/movies_spark_rule_store.json b/tests/testdata/movies/movies_spark_rule_store.json
index 08ad641..e8204c5 100644
--- a/tests/testdata/movies/movies_spark_rule_store.json
+++ b/tests/testdata/movies/movies_spark_rule_store.json
@@ -63,6 +63,7 @@
"entity": "with_sequels",
"columns": {
"title": "title",
+ "__record_index__": "__record_index__",
"explode(sequel_rating)": "sequel_rating"
}
},
@@ -70,7 +71,7 @@
"name": "Get median sequel rating",
"operation": "group_by",
"entity": "with_sequels",
- "group_by": "title",
+ "group_by": ["__record_index__","title"],
"agg_columns": {
"percentile_approx(sequel_rating, 0.5)": "median_sequel_rating"
}
diff --git a/tests/testdata/planets/planets.dischema.json b/tests/testdata/planets/planets.dischema.json
index 7a0387c..b44bb2e 100644
--- a/tests/testdata/planets/planets.dischema.json
+++ b/tests/testdata/planets/planets.dischema.json
@@ -114,8 +114,8 @@
},
{
"entity": "planets",
- "name": "has_row_id",
- "expression": "__rowid__ IS NOT NULL"
+ "name": "has_record_index",
+ "expression": "__record_index__ IS NOT NULL"
},
{
"entity": "planets",
diff --git a/tests/testdata/planets/planets_ddb.dischema.json b/tests/testdata/planets/planets_ddb.dischema.json
index 51e6650..0869aad 100644
--- a/tests/testdata/planets/planets_ddb.dischema.json
+++ b/tests/testdata/planets/planets_ddb.dischema.json
@@ -115,7 +115,7 @@
{
"entity": "planets",
"name": "has_row_id",
- "expression": "__rowid__ IS NOT NULL"
+ "expression": "__record_index__ IS NOT NULL"
},
{
"entity": "planets",