We believe empowering engineers drives innovation.

Great Expectations at Rearc (Part 1): Efficient Fault-Detecting Dataflows

By David Maxson
April 1, 2022

At Rearc, we work with hundreds of datasets sourced directly from various authorities, government agencies, and organizations. A core part of the value we provide to our customers is ensuring that they receive clean and reliable data, no matter what the raw original data looks like. While some of this is accomplished by enforcing data types and strict schemas, errors can easily slip through the cracks. To address these problems, we began investigating solutions for a succinct, self-documenting, self-verifying way of declaring what the output data should look like.

This post is the first in a short series, and will focus on our decision to integrate Great Expectations and the benefits our team has reaped from that.

Overview

Data can get ugly. Rearc tames this data, but doing so often requires custom code to handle the issues typically observed when sourcing the data; however, if those nuances change or an assumption falls apart, it can be easy to pass along faulty data that’s schematically fine but logically broken. Some examples of issues our team has encountered (across sources, and sometimes within the same data file) include:

What we wanted was a solution that could proactively detect these kinds of problems and alert our team before the faulty data could get to the customer. Ideally, such a solution would be:

Expectations

After a brief overview of our options, we settled on Great Expectations to address these needs.

Great Expectations is a shared, open standard for data quality. It helps data teams eliminate pipeline debt, through data testing, documentation, and profiling.

We dove into Great Expectations (GE) because it seemed to offer everything we needed for data validation. It ticked all the boxes listed above, and it’s available for use both via command line (CLI) and within custom Python code.

GE’s design is built around writing “expectations” about your data (similar to assertions in programming) in JSON. These expectations are grouped into Expectation Suites, which can be executed against your data.

Let’s take an example expectation suite:

{
  "expectation_suite_name": "us.states.populations",
  "expectations": [
    {
      "expectation_type": "expect_table_columns_to_match_ordered_list",
      "kwargs": {
        "column_list": [
          "date",
          "state",
          "population"
        ]
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "date"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {
        "column": "population",
        "min_value": 0
      },
      "meta": {}
    }
  ]
}

The expectations in this suite specify:

There are a bunch of pre-written expectations, and you can also create your own. So far, so good. These provide a glimpse into what we expect of our data. The expectations are also written in a cross-language markup (JSON), so they can be readily version-controlled and used in automated tooling, even in different languages or in a browser. This makes it easy to auto-generate data dictionaries, validate multiple batches of data, track our data guarantees over time, and more.

This JSON format is the heart of Great Expectations. It can be easily generated using the provided profilers, modified in Jupyter notebooks, and interpreted and executed by the Great Expectations library to validate a particular instance of your data. Each expectation is also implemented not just in Pandas, but also in SQL and in Spark, so the same expectations that work for small CSV files will work on big data too. As your data grows, you can easily migrate your expectation execution from Pandas to SQL or Pyspark.

Great Expectations also has a concept of “batches” which helps encapsulate the various ways you might validate your data: maybe you have non-overlapping chunks of data you want to validate independently (e.g. daily logs), or maybe your data is small enough to validate in its entirety (e.g. a CSV file). Maybe your file is stored in S3, or on the local file system, or in a SQL database – maybe all three! You can use the same expectation suite on each one simply by defining a Datasource for each location the data is stored and generating Batch Requests within those sources. When an Expectation Suite is executed against a particular Batch Request at a particular moment in time, you get an immutable Validation report (as yet another JSON file) that records whether the suite passed, along with helpful information about any expectation failures.

Implementing Great Expectations in AWS

Our team integrated Great Expectations into our AWS-based data ecosystem. This meant integrating it with Apache Airflow running in Amazon Managed Workflows for Apache Airflow (MWAA) and leveraging S3 for data storage. This post will discuss our configuration, along with how we integrated Great Expectations into our tooling. There were some issues we encountered along the way that motivated some of these decisions; those will be discussed in the next post in this series.

Below, we will walk through diffrent components of our implementation, but you can also skip ahead and take a look at our final solution here: CustomGreatExpectationsOperator, which incorporates the practices discussed below. This operator simplifies the official operator by assuming S3 as the storage for everything and using default types.

Execution

Our dataflows are driven by Airflow. Fortunately, Great Expectations provides a plugin for Airflow. We ended up rolling our own operator to simplify our particular use case, but the fundamental differences are insignificant.

GE can be configured either using a YAML file or by providing the configuration as an object in memory. The former is convenient for the CLI, but the latter is easier to use in a programmatic context and is more flexible. There’s a fair amount of boilerplate involved in defining this configuration object, though it’s ultimately fairly straightforward.

The entire YAML configuration file is equivalent to an in-memory object called the Data Context. A Data Context can have multiple Datasources, using the following format:

datasources:
  my_datasource_name:
    module_name: great_expectations.datasource
    class_name: Datasource
    execution_engine:
      module_name: great_expectations.execution_engine
      class_name: PandasExecutionEngine
    data_connectors:
      my_dataconnector_name:
        module_name: great_expectations.datasource.data_connector
        class_name: InferredAssetS3DataConnector
        bucket: some_bucket
        # ...

Note the module_name and class_name keys scattered throughout. This is a convenient, standardized way of describing how to construct Python objects from a YAML file: the library can simply load the module mentioned, locate the specified class, and pass the remainder of the YAML dictionary to the constructor (possibly having pre-constructed the nested objects). This makes plugins super convenient, since you can install any package that provides the same functionality as any component and simply pass the appropriate module and class name.

The Python equivalent is to use the exact same dictionaries as we had encoded in YAML before:

from great_expectations.data_context import BaseDataContext

context = BaseDataContext(project_config={
    'datasources': {
        'my_datasource_name': {
            'module_name': 'great_expectations.datasource',
            'class_name': 'Datasource',
            'execution_engine': {
                'module_name': 'great_expectations.execution_engine',
                'class_name': 'PandasExecutionEngine',
            },
            'data_connectors': {
                'my_dataconnector_name': {
                    'module_name': 'great_expectations.datasource.data_connector',
                    'class_name': 'InferredAssetS3DataConnector',
                    'bucket': 'some_bucket',
                    'prefix': 'some_prefix',
                     'default_regex': {
                         "pattern": "(.*)",
                         "group_names": ["data_asset_name"],  # This name is important, not a placeholder
                     }
                },
            },
        }
    },
    ...
})

If you’re using the built-in classes, there are configuration objects (e.g. DatasourceConfig) which can validate your dictionaries, but they do little more than that. You still define everything as a dictionary, pass that to a Data Context constructor, then get the constructed objects back out of the Data Context. Trying to construct the underlying objects yourself is likely to break, as the dictionary-driven constructor does more than meets the eye. This approach will be critiqued in the next post in this series.

S3 Integration

The Great Expectations library is mature enough to offer drivers for storing things in memory, including expectation suites, data files, validation results, and more. It does this by abstracting the concept of a “file system” out into a generic API, which is then implemented for various platforms. An example Datasource based on S3 might look like the following:

datasources:
  my_datasource_name:
    class_name: Datasource
    data_connectors:
      my_dataconnector_name:
        class_name: InferredAssetS3DataConnector
        bucket: my-bucket

On the surface, this seems ok. If you spin up a simple bucket and try this out, it’ll work great. The problem crops up when your data bucket grows.

At the time of this writing, the issue we encountered was that, when we went to create and load a Batch Request, GE was trying to enumerate the entire “folder” to find the file we had specified. This occurs because InferredAssetS3DataConnector, or really a data connector in general, uses asset names instead of raw paths, and thus has to map the name to a path. It thus enumerates the whole bucket, generates the name mapping, then looks up the path for the asset name you provided and loads it. Since we use file paths as the asset name, this process is entirely redundant.

Note that this issue comes up elsewhere that S3 data storage is used in Great Expectations. This was the only case where the slowdown was so problematic as to warrant a workaround. This issue will be brought up later in this article series.

Our solution was to define a datasource so specific that enumerating all the files it contained would be reasonable. While we couldn’t do this with a hard-coded YML file, it’s straightforward in Python:

# For s3://{bucket}/{folder}/{file_name}
DataContextConfig(
    datasources={
        "my_s3_datasource": DatasourceConfig(
            data_connectors={
                "default_inferred_data_connector_name": DataConnectorConfig(
                    class_name="InferredAssetS3DataConnector",
                    bucket=bucket,
                    prefix=folder,
                    default_regex={
                        "pattern": "(.*)",
                        "group_names": ["data_asset_name"],
                    },
                ),
            },  # ...
        ),  # ...
    },  # ...
)

We then form the Batch Request like this:

batch_request = BatchRequest(
    datasource_name="my_s3_datasource",
    data_connector_name="default_inferred_data_connector_name",
    data_asset_name=file_path,  # {folder}/{file_name}
)

With this configuration, only the folder containing our file is enumerated; so long as we keep files in small, isolated folders, this enumeration is entirely feasible.

There may be a way to accomplish efficient S3 batch requests using RuntimeDataConnector instead. That will be covered later in this series.

Airflow Operator

Putting everything together you can add a validation step in your Airflow DAG as easy as:

validate_data = CustomGreatExpectationsOperator(
  task_id="validate_data",
  s3_path="<s3_path_of_data_to_validate>" # i.e. s3://humor-detection-pds/Humorous.csv,
  expectation_suite_name="<expectation_suite_name>" #i.e. validate.humor,
  config_prefix="<s3_path_of_expectations>" #i.e. s3://my-great-expectations/
)

Conclusion

This first article has discussed Great Expectations at a high level, then discussed how we implemented that in an S3 context. The next article will dive deeper into some of the frustrations we encountered in the process.

Despite the technical issues we encountered, Great Expectations has been a valuable asset to our data flows, forcing better data engineering practices and catching problems that would have been difficult to predict or check in other ways. We intend to continue expanding upon the excellent base that GE provides with auto-generated data dictionaries, customer-accessible data guarantees and validation reports, and introducing additional data wellness checks.

That said, our experience with Great Expectations has been that, for cases beyond the trivial and typical (working locally with local files), some custom engineering effort is likely to be required. The tool is a phenomenal foundation for providing data guarantees, but there is much still to be done to smooth the rough edges, improve performance on atypical use cases, and generally mature to behave more intuitively under more varied circumstances.

If you are trying to use Great Expectations for yourself, check out their docs and reach out to their Slack community when you need help, and consider contributing some time toward improving the project for your own use case, and thus for everyone.

Rearc Data

At Rearc we understand the complexities involved with acquiring data from disparate sources and transforming them to meet your analytical needs. Our goal is to provide our customers with clean, reliable, up-to-date data products and solutions that allow them to focus on their core mission.

Check out our 440+ data products available on AWS Data Exchange here. If none of the products match exactly what you need, reach out to us at data@rearc.io and we would love to work with you.