4. The Ingestion Architecture

Matt How, Alton, UK

Data does not stand still. As data warehouse developers, this is a known fact on which our careers are based. For data to have value, it has to be reliably moved to a place where that value can be realized, and the method by which we move data should depend on the needs of our users and the frequency of the data, not on the physical or technological limits of the system. As this book examines a modern data warehouse, we need to look beyond the traditional defaults such as batch-based ingestion and simple lift and shift extract, transform, and load (ETL) patterns and explore how we can offer more flexibility to the end users. This chapter outlines an approach for warehouse loading that promotes efficiency and resilience, moving on to describe three ingestion modes. By defining the risks and benefits of batch-based, event-based, and streaming modes, you will know how to implement each approach while also being aware of its additional complexities, ensuring a successful implementation.

Layers of Curation

ETL describes the process of lifting and changing data so that it can be used in an analytical data warehouse. Often this process requires many complex steps involving data cleaning, data transformation, and data integration, and in some systems, there is an attempt to negotiate all of these steps in one single process. Arguments are made regarding the efficiency or compact nature of such an approach, but ultimately, these ETL designs nearly always become slow, difficult to maintain, and a primary reason for rebuilding ETL pipelines.

For these reasons, it is crucial to partition the ETL work up into clearly defined layers that separate loading and cleaning concerns from transformation and integration concerns.

The Raw Layer

The initial layer in your data warehouse loading process should hold your source data in its rawest format. No cleaning, no filtering, just data exactly as it arrives from your source provider. This convention should be followed even in the instances where you collect data directly from a database yourself. Even though that data could go directly into another database, having the forethought to snapshot the data in a raw layer will have numerous benefits downstream. Additionally, data in this area should be truly immutable (never deleted or updated). By storing data in this way, you ensure that in the worst possible case, your warehouse can be truncated and rehydrated from data that exactly matches how it arrived in the first place: an ultimate rollback option from any given point in time. Additionally, if your source datasets need to be consumed and integrated by other areas of the business, you can easily provide access to this consistent raw layer without the need to make any changes to your ETL processing pipeline.

Because of the demands of this layer, the most fitting technology is a data lake. Primarily, data lakes have the ability to scale to limitless capacity and can store files of any type and size without the need for a set column structure or data types, as would be the case in a database environment. In order to make your data lake as efficient as possible, it should be a developer-led initiative that promotes clear organization and, while allowing datasets to be easily ingested, should also enforce a rigorous convention for placing datasets in a well-defined, logical directory structure. In almost all cases, this structure should have an initial layer that is divided by source system. This is so that cleaning and sensitivity concerns can be considered separately and ensures that changes to one source system's processing do not have any knock-on effects on other source systems. Beneath this source system–driven layer, you should then split data by individual dataset with further year, month, and day partitioning below that. This instills a degree of metadata into the lake directory itself but also helps to derive chronology and lineage in a very intuitive way. An example of this structure is shown in Figure 4-1.
Figure 4-1

A folder hierarchy showing the RAW directory with one source system and two datasets
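For illustration, such a hierarchy might look like the following sketch; the source system and dataset names are purely illustrative, and the lowest folders carry the year, month, and day partitions described earlier:
/Raw/Sales System/Daily Sales/2020/03/15/DailySales.csv
/Raw/Sales System/Customers/2020/03/15/Customers.csv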

The Clean Layer

The first step in moving data from its raw source format into a curated, data warehouse–ready format is to clean and standardize that data. By cleaning your data, you ensure that bad records are not processed into the warehouse, and the standardization allows you to integrate data consistently across your platform. Depending on the quality of data that arrives from your source systems, you may find that cleaning rules can become very complex, which is why they should be performed in their own layer, both so that they do not interfere with your immutable source and so that value-adding business logic is not hindered by complex cleaning rules.

The output of the clean layer should be stand-alone datasets that are primed and ready to be integrated. This could mean that data has been filtered, columns have been removed, or values have been transformed in some way so that they align better with values of a similar type from different systems.
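As a minimal sketch of what that standardization might look like (the table and column names are illustrative, and the exact functions will depend on your engine), a clean-layer query could apply rules such as the following:
SELECT
    UPPER(LTRIM(RTRIM([CountryCode]))) AS [CountryCode],  -- standardize casing and trim whitespace
    TRY_CAST([OrderDate] AS DATE)      AS [OrderDate],    -- invalid dates become NULL rather than failing the load
    ISNULL(CAST([Quantity] AS INT), 0) AS [Quantity],     -- enforce a consistent type and a default value
    REPLACE([Price], ',', '')          AS [Price]         -- strip thousands separators before a later cast
FROM Raw.DailySales
WHERE [OrderNo] IS NOT NULL;                              -- filter out records that cannot be keyed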

The Transformed Layer

The final movement of data in this layered system is from its clean location into an analytical data warehouse. The demands of this layer require data to be joined, aggregated, and integrated, and again the processing logic can get very complex. Of course, this is made simpler because you know you are only working with clean, valid data and can therefore focus solely on the business logic that is required. Ultimately this layer will have to implement slowly changing dimensions, surrogate keys, conformed dimensions, as well as many other warehousing concepts and therefore should attempt to utilize patterns that perform each operation consistently as opposed to designing each flow per dataset. In much the same way as you would want to clean strings consistently across all inputs, you would also want to implement data warehouse concepts consistently so that debugging and maintenance can be simplified. Additionally, new feeds of data can then be integrated very quickly because the patterns exist; it is simply a matter of choosing the right pattern and supplying the right columns.

Now that you understand the layers of processing that go into building the warehouse, and the justification for each layer, we can discuss how the differing processing architectures can interact with each layer.

Understanding Ingestion Architecture

At the start of a modern data warehousing project, there should always be a phase of planning and discovery. Part of this phase should be spent understanding the methods by which data arrives and then using this knowledge to plan how that data will be fed into the warehouse. For example, if a source provider delivers datasets at a single point on a daily basis, then there is no need to ingest that data into the warehouse more than once a day. Streaming this data constantly would provide no benefit to the users as nothing would change. Conversely, if a source provider has the ability to stream data into your environment, then this opportunity should be realized. There should be no reason why the users cannot see the data in near real time. By understanding each of the potential ingestion scenarios, you can begin to plan how your data warehouse might handle each of these.

Batch Ingestion

By far the most tried and tested method of populating a data warehouse is to use batch ingestion, a process where data is loaded from raw through clean and into the warehouse in regular, predefined, scheduled increments. The reason for this method being so popular is that it promotes resilience and stability above all other attributes. Optimizations can be made for speed and efficiency, but the batch is still a batch, with a start and an end and a relatively stable amount of processing in the middle. Were the batch to fail, then we can safely say that the entire batch failed and that it would need to be processed again. Additionally, batch loads often have to conform to a fixed window and so users can easily grasp the schedule by which their data arrives and know when to refresh reports and dashboards. For a long time, batch ingestion suited nearly all scenarios; however, there are of course increasing needs to have data arrive more frequently or perhaps not based on a schedule but on the occurrence of an event.

The Risks and Opportunities of Batch Ingestion

The term “risks” is applied loosely here as batch ingestion is by far the most stable ingestion method; however, there are certainly things it cannot do, the risk being you may need to do those exact things some other way.

The ETL Window

In nearly every batch-based scenario, there is an allowable start and end time. This window is known as the ETL window, and it is the role of the developers to ensure that the entire end-to-end processing occurs within these times to avoid disruption to the business. Generally, these times are set to ensure the processing starts a safe amount of time after the last daily transaction and completes a safe amount of time before the next day begins. The hours between midnight and 5 a.m. are often peak processing times for ETL solutions. The rigid nature of this scheduled window gives developers key metrics to work toward, and its simplicity comes from being analogous to a calendar date. All being well, report users can rely on their data being no more than one day out of date and can live in certainty that no numbers will change between the first glance at a dashboard in the morning and a last check before going home.

However, as data feeds increase, it is not long before what may have seemed like a generous ETL window begins to feel constricting. More and more pressure will be placed on performance, but ultimately, things can only go so fast, and while speed is the focus, reliability is likely to suffer. Of course, you can explore options around splitting batches, prioritizing certain workloads, or beefing up servers; however, these only kick the can down the road. Ultimately, there is the risk that your batch becomes too big for your ETL window. There is also always the risk that a user may require data to be processed outside of the ETL window; now you have to handle the fact that transactions are happening throughout your batch load, greatly increasing the chance for error.

A final issue with the ETL window is that there can be times when data arrives late. How can you process a critical dimension if the data for the required day has not yet arrived? What is more, your batch is only aware of schedules, so unless this delay is pre-arranged, there is a high chance of failure or, worse, success but with the wrong data! Ensuring that scenarios such as this are handled is critical to a batch processing architecture, and often the ability to programmatically decide to halt or postpone a batch using a series of checks saves a large amount of headache further down the line.
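As a hedged illustration of such a check (the audit tables here are assumptions rather than a prescribed design), a pre-batch guard might look like the following:
-- Halt the batch if any mandatory file for today has not yet arrived
IF EXISTS (
    SELECT 1
    FROM Audit.ExpectedFile AS e
    LEFT JOIN Audit.ArrivedFile AS a
        ON  a.FileName = e.FileName
        AND a.ArrivalDate = CAST(GETDATE() AS DATE)
    WHERE e.IsMandatory = 1
      AND a.FileName IS NULL
)
BEGIN
    RAISERROR('A mandatory file has not arrived; postponing the batch.', 16, 1);
END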

The ETL Anti-window

Given that a batch process happens within a set window, it follows that there is also an anti-window, the passage of time that is not considered critical for processing. This regular ETL anti-window means that the development teams have a prime opportunity to deploy new code or data feeds into the batch process without the risk of immediately creating problems. New solutions can be deployed and tested safely with the knowledge that if the tests fail, the deployment is rolled back, and the batch continues as normal, again reiterating the point that batch equals resilience and stability.

Failure Investigation and Troubleshooting

Continuing from the idea of the anti-window, this also provides the development team a chance to investigate and resolve any issues that occurred in a nightly batch. Knowing that the system does not need to operate again for several hours allows team members free rein to investigate issues without the risk of accidentally interfering with some ongoing process. Once an issue is determined and a fix implemented, this can be tested and then promoted into the production environment all within the relative safety of the anti-window.

However, while this activity is going on, there are potentially two problems unfolding in the background. The first is that an analyst or C-level exec is waiting for a report to arrive. Because the failure happened overnight, issues are often not discovered until the next morning, and even if an on-call service is provided, there needs to be significant investigation to determine if the whole batch is bad or if only part of the batch needs rerunning.

The second problem is that regardless of whether you must completely restart a batch or can operate on a subset, you will likely still have some amount of processing time ahead once the issue is resolved. The point of a batch is that it is a larger amount of data processed at a convenient time. However, on this occasion, you could be dealing with a large amount of data that needs to be processed at a very inconvenient time.

The Batch Ingestion Tools

An implementation of batch ingestion could be stood up using a variety of tools. Most SQL engines lend themselves nicely to batch-based tasks, and I imagine most developers reading this will have tools in mind to deliver such a solution. To elaborate, Azure Synapse Analytics can connect directly to several cloud data stores and utilize PolyBase and external tables to read data straight from a file into an internal SQL table. This approach requires only that Azure Synapse Analytics is running and that it has a connection to the relevant data store; no other tools would be required.

However, the more common scenario is that a database does not have PolyBase technology, for example, Azure SQL Database, and will need to be fed using some kind of integration engine. In this case, Azure Data Factory is by far the best tool as it supports a multitude of connectivity options and has specialized activities for the task of loading databases. Of course, SQL Server Integration Services (SSIS) is an alternative option here; however, it cannot scale to the realms of big data as easily as Azure Data Factory can.

Finally, there may be times when files are simply too large or too complex to be read using SQL engines, and therefore, extending the processing to a data lake–based tool may be required. One such tool is Azure Databricks, a PaaS implementation of Spark, which will be discussed later in this book as a potential alternative when data exceeds the reasonable limits of Azure SQL engines.

Batch Ingestion for Azure Synapse Analytics

Reading large batch files efficiently is something Azure Synapse Analytics does very well, and when reading from a data lake, there is a huge efficiency gained from using the PolyBase engine. A common pattern is to define an external table root location that is the starting point for a partitioned set of data made up of any number of files, for example:
/Raw/Sales System/Daily Sales/...

This root location is then the starting point for PolyBase when it begins searching for data in the lake. Underneath the root location, you could create many files and folders; PolyBase will be able to see and read them all. Often you would extend from the table root with a year/month/day structure although you could use other partitions as well such as customer, product, and so on.

Many file types are not type safe, meaning the data within each column may not conform to a set data type. In these cases, it is important to either remove the offending rows using the PolyBase rejected rows functionality or, assuming you do not want to lose data, set each column of the external table to be NVARCHAR(1000). However, a Parquet file is type safe and therefore the external table can be strongly typed also, removing the need to cast as part of the ETL; this is a major reason for choosing Parquet files when performing ETL at a large scale.

Once the data is visible to Azure Synapse Analytics through the use of an external table, it needs to be read into a persisted table in the database. There are a few ways to do this; however, the CTAS method provides a minimally logged option that also surfaces the most flexibility for the developer.

The Create Table As Select statement is the staple method to move data around in Azure Synapse Analytics. The reason for this is that it works in a parallelized manner but also provides a great deal of control to the developer. With a CTAS statement, many key parts of the DDL can be changed, such as the distribution type, the index type, the partition values, and even the column data types. These inherent capabilities make the CTAS statement ideal for loading data through a layered processing pipeline because each transformation can be optimized down to the index, distribution, and partitioning level. The following steps show how the CTAS pattern can be used to facilitate ETL through each layer of the warehouse:
1.

    A raw CSV file in the data lake would be exposed as an external table with NVARCHAR(1000) type columns. By using the Create Table As Select statement, the DDL of the produced internal table will be derived from any casting or transformation implemented by the developer. Additionally, indexing and distribution can be configured intelligently as opposed to relying on the defaults. An example of CTAS for this layer is documented in Listing 4-1.

     
IF OBJECT_ID('Clean.DirtyCSVFile') IS NOT NULL
DROP TABLE Clean.DirtyCSVFile;
CREATE TABLE Clean.DirtyCSVFile
WITH
(
    HEAP,
    DISTRIBUTION = HASH([OrderNo])
)
AS
SELECT
    ISNULL(CAST([ID] AS INT), 0)                 AS [ID],
    ISNULL(CAST([SkuItemId] AS VARCHAR(18)), '') AS [SkuItemId],
    ISNULL(CAST([CustomerId] AS INT), 0)         AS [CustomerId],
    ISNULL(CAST([OrderNo] AS INT), 0)            AS [OrderNo],
    ISNULL(CAST([Quantity] AS INT), 0)           AS [Quantity],
    ISNULL(CAST([Price] AS DECIMAL(10,2)), 0)    AS [Price]
FROM [Ext].DirtyCSVFile
OPTION (LABEL = 'Clean.DirtyCSVFile.CTAS');
Listing 4-1

A CTAS statement used to load data into a clean area

Note that in the preceding code, the index has been defined as a HEAP; this is because there is an overhead to creating a formal index, and as the whole dataset will be loaded, there is no benefit to be gained. A further detail is the distribution being set to hash on OrderNo. This ensures that all data relating to the same order will be stored on the same storage node of the server and therefore provides better performance for joining downstream. In the SELECT itself, all the column definitions have an ISNULL and CAST statement, which enforces a NOT NULL constraint and the cast data type on the destination table (Clean.DirtyCSVFile in our case). Finally, a label has been added, which allows the engine to identify this query later for gathering details such as row counts and errors; a sample lookup query follows these steps.
2.

    The now clean data is to be joined and integrated with other tables. The resulting dataset will no longer resemble the source datasets, and so the CTAS offers maximum capability in terms of table definition but also in optimizing the data for its new, enriched purpose. A CTAS statement to carry out this step is shown in Listing 4-2.

     
CREATE TABLE Warehouse.DirtyCSVFile
WITH
(
    CLUSTERED COLUMNSTORE INDEX,
    DISTRIBUTION = HASH([OrderNo])
)
AS
    WITH
    cte_Orders    AS
    (
        SELECT
            OrderNo,
            SkuItemId,
            CustomerId,
            Quantity,
            Price
        FROM Clean.DirtyCSVFile
    ),
    cte_DimCustomer AS
    (
        SELECT
            CustomerKey,
            CustomerBusinessKey
        FROM Dim.Customer
    ),
    cte_DimProduct AS
    (
        SELECT
            ProductKey,
            SkuItemId
        FROM Dim.Product
    )
    SELECT
        CAST(o.OrderNo AS INT) AS OrderNo,
        CAST(dc.CustomerKey AS INT) AS CustomerKey,
        CAST(dp.ProductKey    AS INT) AS ProductKey,
        CAST(o.Quantity AS INT) AS Quantity,
        CAST(o.Price AS DECIMAL(10,2)) AS Price
    FROM cte_Orders AS o
    INNER JOIN cte_DimCustomer AS dc ON dc.CustomerBusinessKey = o.CustomerId
    INNER JOIN cte_DimProduct AS dp ON dp.SkuItemId = o.SkuItemId
OPTION (LABEL = 'Warehouse.DirtyCSVFile.CTAS');
Listing 4-2

CTAS statement to create a warehouse fact table

In the preceding code, the index definition has changed from HEAP to CLUSTERED COLUMNSTORE INDEX so that the data is more efficient for analytical queries such as aggregations; however, the distribution configuration has not changed, which ensures that the least amount of data movement occurs. The SELECT part of the statement uses more complex logic by employing common table expressions (CTEs); these are common in data integration and demonstrate that all SELECT capabilities exist within the CTAS.
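Both listings also attach a query label via the OPTION (LABEL = ...) clause. As a small, hedged illustration, that label can later be used to find the request in the Azure Synapse Analytics request DMV and gather timings or troubleshooting details:
SELECT request_id, status, submit_time, total_elapsed_time
FROM sys.dm_pdw_exec_requests
WHERE [label] = 'Clean.DirtyCSVFile.CTAS';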

While the CTAS pattern offers a number of efficiencies, there is a functionality gap in that when data is selected from the external table, any filter predicates in the WHERE clause cannot be pushed down to the data lake. In practice, all of the data below the root is read and only then is the filtering done – obviously this is not the most efficient way to extract a small daily batch from what may be a much larger set of data.

One solution to this problem is to use an active partition, where the most recent data is stored, and an inactive partition, where the less recent data is located. Once the data has been ingested, it can be copied into the inactive partition so that it remains available if needed but does not unnecessarily increase the volume of data to be loaded into Azure Synapse Analytics. The structure for this might look like that shown in Figure 4-2.
Figure 4-2

An example of a file structure used to load active data and obstruct loading of inactive data

In this case, the external table definition would resemble the code in Listing 4-3.
CREATE EXTERNAL TABLE External.DirtyCSVFile
(
    [ID] NVARCHAR(1000) NULL,
    [SkuItemId] NVARCHAR(1000) NULL,
    [OrderNo] NVARCHAR(1000) NULL,
    [CustomerId] NVARCHAR(1000) NULL,
    [Quantity] NVARCHAR(1000) NULL,
    [Price] NVARCHAR(1000) NULL,
    [LastUpdateDateTime] DATETIME2 NULL
)
WITH (LOCATION='/Raw/Sales System/Active/Sales',
      DATA_SOURCE  = DataLakeSource,
      FILE_FORMAT  = CSV,
      REJECT_TYPE = VALUE,
      REJECT_VALUE = 0);
Listing 4-3

Data definition language (DDL) statement to create an external table in Azure Synapse Analytics

You can see that by specifying the Active folder in the location string, the inactive data will not be read. An alternative solution is to use a stored procedure containing dynamic SQL to create a new external table each time an ETL process is kicked off. This could have the specific location string passed in as a parameter, meaning that only a single file is read at that specific time.
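A hedged sketch of that dynamic SQL approach is shown below; it assumes the DataLakeSource data source and CSV file format from Listing 4-3 already exist, and the procedure, folder, and column names are purely illustrative:
CREATE PROCEDURE Ext.CreateDailySalesExternalTable
    @LoadDate DATE
AS
BEGIN
    -- Build the date-specific folder path, e.g. /Raw/Sales System/Active/Sales/2020/03/15/
    DECLARE @Location NVARCHAR(300) =
        '/Raw/Sales System/Active/Sales/'
        + CONVERT(CHAR(4), YEAR(@LoadDate)) + '/'
        + RIGHT('0' + CONVERT(VARCHAR(2), MONTH(@LoadDate)), 2) + '/'
        + RIGHT('0' + CONVERT(VARCHAR(2), DAY(@LoadDate)), 2) + '/';

    DECLARE @Sql NVARCHAR(MAX) = N'
        IF OBJECT_ID(''Ext.DailySales'') IS NOT NULL
            DROP EXTERNAL TABLE Ext.DailySales;
        CREATE EXTERNAL TABLE Ext.DailySales
        (
            [OrderNo]  NVARCHAR(1000) NULL,
            [Quantity] NVARCHAR(1000) NULL,
            [Price]    NVARCHAR(1000) NULL
        )
        WITH (LOCATION = ''' + @Location + ''',
              DATA_SOURCE = DataLakeSource,
              FILE_FORMAT = CSV,
              REJECT_TYPE = VALUE,
              REJECT_VALUE = 0);';

    EXEC sp_executesql @Sql;
END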

Create External Table As Select (CETAS)

When working with batched data and Azure Synapse Analytics, there may be a need to write transformed data back out into the data lake for consumption by other systems. The way to do this using Azure Synapse Analytics is to use the CREATE EXTERNAL TABLE AS SELECT (CETAS) statement. When considering this statement, remember that a CREATE TABLE AS SELECT (CTAS) statement generates a brand new internal table based on the SELECT that is provided; a CETAS is no different, other than that the data for the table is stored externally, that is, in the data lake. Provided that the required PolyBase objects are created (the external data source and the file format), the SQL engine can use PolyBase to push data back to the lake. The syntax for this statement is shown in Listing 4-4.
CREATE EXTERNAL TABLE ext_FctOrders
WITH (
        LOCATION='/DWH/Fact/Orders.pqt',
        DATA_SOURCE = DataLake,
        FILE_FORMAT = PqtFormat
) AS SELECT TOP 100 PERCENT * FROM Warehouse.FctOrder;
Listing 4-4

Creating an external table as SELECT

One consideration here is that the produced file will be written to the data lake and partitioned according to the storage engine and not how the data was partitioned in Azure Synapse Analytics. There is a method to achieve intelligent partitioning and this is described in more detail in Chapter 7, “Logging, Auditing, and Resilience.”

Event Ingestion

Event ingestion is not dissimilar to batch ingestion although instead of multiple files being processed at once, now a single file is considered your batch. Of course, the challenges and opportunities of single file batches are much closer to traditional batch processing than stream processing, which is based on a record by record flow. The primary difference is that files are processed the minute they arrive within the agreed location and not based on an arbitrary schedule. In nearly all cases, this means that files are processed as a single unit, without any dependence on other files that may also arrive throughout the course of the day.

A warehouse will almost always require multiple files to be ingested in order to be refreshed; however, the early cleaning and validation stages for those files can be entirely independent.

Many services within Azure can generate events when things happen, and many services can also listen to these events and take actions when they do. An example is Blob Storage, and therefore Azure Data Lake Gen 2; the storage engine can fire events when new files are added or existing files are deleted. Subsequently, Azure Data Factory can listen for those events and then trigger a pipeline, utilizing the metadata provided with the event, for example, the file name and location. Generally speaking, event data is implemented using JavaScript Object Notation (JSON) because many services can process and understand this simple object type, and it is not schema bound, meaning additional attributes can be added to the object without disrupting existing processes. The Azure platform also has a wide number of tools for managing and working with event data, some of which will be discussed later in this section; however, an exhaustive list of event-enabled services is out of scope for this book.

The Risks and Opportunities of Event-Based Ingestion

The primary benefit of event-based ingestion is that files can be processed more quickly and the warehouse can be updated sooner. If a file arrives early, then the processing can be completed earlier. If a file arrives late, this should not break the integration process, because the process will not have started until the file arrives. In batch ingestion, files could arrive at any point during the day but still not be processed until the evening, assuming a nightly ETL window is implemented. The only other way to manage file ingestion in such a way is to build intraday ETL loads, but this would require managing multiple schedules and ETL windows; ultimately this can quickly become very complicated. The reason for event-based ingestion being better is that the trigger is the event, perhaps a file arriving in Blob Storage, and the integration engine knows how to respond to the event because of the associated metadata of that event.

This seems like a great way to build upon a batch process; however, it is not without its own pitfalls. In a more simplistic event-based application, you have to realize that you are relinquishing the ability to decide when files are processed. You are no longer telling the engine to get to work when you know the environment is ready; you are granting the data provider that ability. Even if this is done with no intention to negatively affect your system, you must ensure that the platform is always able to process data, day or night. If for some reason it cannot, then you need other options for storing events and returning to them later in the day.

Finally, you will need a mechanism to determine when all the required files have arrived in the clean layer and therefore the warehouse is ready to be refreshed. This can often start out as a simple stored procedure but can quickly become a complex mesh of intertwined dependencies that is very difficult to navigate and resolve. The risk here is that the data warehouse will never be processed because the necessary files were never all ready at the right time; it is essential that you manage this process closely and ensure that your warehouse will not be starved.

Implementing Event Ingestion

As discussed, event-based processing can present many benefits, but there are also challenges and technical considerations. This next section aims to focus on the real-world implementation considerations needed when working with events.

Decoupled Processing

The essence of decoupled processing is in the absence of unnecessary dependencies. When datasets are dependent on each other, there is a higher chance of failure, and particularly at early stages of an ETL pipeline, there is little reason to enforce these dependencies. Instead, each dataset can be processed independently, and if there was a failure, this should not disrupt any of the other datasets being processed at the same time.

Often ETL designs originating from a batch-based paradigm tend to favor complete success or complete failure, whereby all datasets are coupled, and a single error means the entire batch must be fixed and reprocessed. The issue is that this is inefficient both in terms of compute power, where every second counts in a cloud-based environment, and in the time it takes to deliver insights to your users. Instead, all successful datasets can be handled according to their individual needs.

In a data warehouse scenario, decoupled ingestion allows the loading process for an individual file to be triggered by an event, usually the file arriving in a storage repository. An event could occur at any time of day and be handled in several different ways, but this concept allows the file to be ingested as soon as it arrives, allowing BI teams to move away from a single, monolithic nightly ETL load. This approach is illustrated in Figure 4-3.
Figure 4-3

A decoupled process loading data between layers

In the preceding diagram, files 2 and 4 both have failures between layers; however, files 1 and 3 can be fully processed into the warehouse, if appropriate.

Assuming the cleaning process completes successfully, the next procedure is to transform and integrate datasets to produce warehouse tables. In contrast to the load and clean routines, the transformation procedure needs to interact with multiple datasets in order to add value. With the possibility that some datasets may fail to reach the clean layer, a special type of query is needed that can check the dependencies for each warehouse table and tell the ETL engine which tables can be created and which are not yet ready to run.

Referring to the preceding image, a warehouse table that was dependent on files 1 and 3 could be created; however, a warehouse table that required files 1 and 2 could not. In order to resolve this kind of processing logic, a dependency resolution engine is required.

A dependency resolution engine can take many forms depending on the prevalent technologies in your platform, though one common method is to use a stored procedure. This assumes you are storing your dataset processing runs, high watermarks, and dependency mappings in an auditing database, as is the pattern recommended and explained in more detail in Chapter 7, “Logging, Auditing, and Resilience.” A dependency resolution query would be triggered each time a dataset is successfully cleaned and would comprise several steps. These are described as follows using dataset 1 as the dataset just cleaned, with the warehouse tables depending on datasets 1, 2, and 3 in order to be refreshed:
1.

    Knowing the identifier of the dataset just cleaned, query for all of the subsequent tables that are dependent on that dataset.

     
2.

    Using those identifiers, query for all the other datasets that would be required to fulfil each list of dependencies.

     
3.

    Using the high watermarks, determine which of those datasets have a watermark that is greater than that of the warehouse table enforcing the dependency.

     
4.

    For those where all high watermarks are greater, run the proc to generate the table. For the others, do not run the proc and check again when the next file arrives into the clean layer.

     
A more visual example of these steps is shown in Figure 4-4.
Figure 4-4

An example of a simplistic dependency resolution process
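To make these steps more concrete, a hedged sketch of the resolution query is shown below; the audit table and column names are assumptions rather than a prescribed design (Chapter 7 covers the recommended auditing model in detail):
DECLARE @CleanedDatasetId INT = 1;  -- the dataset that has just reached the clean layer

SELECT dep.WarehouseTable
FROM Audit.TableDependency AS dep
LEFT JOIN Audit.DatasetHighWatermark AS hw
    ON hw.DatasetId = dep.DatasetId
LEFT JOIN Audit.WarehouseTableLoad AS wt
    ON wt.WarehouseTable = dep.WarehouseTable
WHERE dep.WarehouseTable IN
    (
        SELECT WarehouseTable
        FROM Audit.TableDependency
        WHERE DatasetId = @CleanedDatasetId          -- step 1: tables dependent on the cleaned dataset
    )
GROUP BY dep.WarehouseTable
HAVING COUNT(*) = SUM(CASE WHEN hw.CleanHighWatermark > ISNULL(wt.LastLoadWatermark, '19000101')
                           THEN 1 ELSE 0 END);       -- steps 2 to 4: every required dataset is newer than the table's last load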

More complex scenarios can develop of course, and a common requirement is to daisy chain dependencies together, otherwise known as recursion. This can be implemented simply by triggering the dependency engine query from later stages in the process. In Figure 4-5, a warehouse table needs to be produced in order to generate subsequent warehouse tables and so the dependency query would be called as the datasets arrive in clean but also when tables are refreshed in the warehouse layer.
Figure 4-5

Using a dependency resolution engine to resolve a meshed dependency structure

In this second scenario, all subsequent processing is blocked until file 2 arrives. If file 2 were a low-priority dataset or did not change often, but its data was still required as in the preceding scenario, it would be reasonable to question whether waiting is a worthwhile endeavor. To overcome this, we can overlay the dependency engine with a simple policy concept that can override the fact that a required file was not refreshed. This is exemplified in Figure 4-6.

Figure 4-6

Showing how optional and mandatory files can ensure processing is not held up by late or infrequently arriving files

In the preceding figure, files 1 and 3 are mandatory; however, file 2 is optional and therefore should not delay processing. A real-world example could be to have file 1 as a product file, file 2 as a product type file, and file 3 as a product sales file. The types change infrequently, so we are happy to take the latest version of that file even if it is not as recent as the other files.

Listening for Events

The ability to listen to events being raised across an Azure subscription is fairly common. While there are many services that can manage and process events, the most relatable to the content of this book is Azure Data Factory, which has the ability to be triggered by either a blob creation or blob deletion event. The event has to come from a Blob Storage account, and so Azure Data Lake Gen 1 (ADL Gen1) is out of the question; however, the architecture of Azure Data Lake Gen 2 means that it is compatible with this event trigger. A limitation here, however, is that the event can only be filtered by the name of the file triggering the event, although there is the ability to wildcard this filter to a point. When filtering for events, you can choose to react to blobs that have either a certain prefix or a certain suffix, meaning you can either be entirely specific about a particular blob to look out for or very generic to pick up on any event that occurs within a set directory. See Table 4-1 for some examples.

Table 4-1

A table demonstrating some implementations of event filters

Filter Type                      | Filter Expression                  | Would Find
Blob path begins with (prefix)   | Sourcefiles/SalesDetail/           | Any file in the SalesDetail folder in any container
Blob path begins with (prefix)   | /Sourcefiles/                      | Any file in the Sourcefiles container
Blob path ends with (suffix)     | .csv                               | Any CSV file in any container
Blob path ends with (suffix)     | /Sourcefiles/SalesDetail/sales.csv | Any file named sales.csv in the specific directory

Queuing Events

In an event-driven architecture, the goal is to listen and process events in real time; however, this is not always possible. Whether it be agreed downtime with the business to maintain analytical consistency throughout the day or a scheduled maintenance window to allow for deployments, there is guaranteed to be a time when your platform cannot respond immediately to events. During these windows, it is essential that the platform has a robust mechanism for queueing the events in the correct order so that they can be processed at a later date or time.

To tackle this, delayed event processing can be employed to listen to all events; however, only process them outside of the agreed downtime, similar to an ETL window. Events that happen within the downtime should be stored in order and processed when the platform resumes. See Figure 4-7 for a diagram of how this would work.
Figure 4-7

A diagram showing an implementation of delayed event processing

A similar but alternative pattern is to implement a period of selective event listening whereby some files are allowed to process as they will not disturb the other activities going on in core business hours. However, some may be withheld until the ETL window opens. A diagram for this pattern is shown in Figure 4-8.
Figure 4-8

A diagram showing an implementation of selective event processing

The benefit in both delayed event processing and selective event processing is that your stable period throughout the day is maintained; however, throughout the course of the evening, data is allowed to process freely as and when it arrives, alleviating pressure compared to a single batch load. Further to this, now that file processes are decoupled, a failure for one file has absolutely no bearing on whether any other files will fail to process or not. This can make debugging much simpler because the affected file will be clearly identified and there is less work to do once the issue is resolved. A full explanation and implementation of this pattern is provided in Chapter 8, “Using Scripting and Automation.”

Event Ingestion for Azure Synapse Analytics

The method for ingesting event-based datasets in Azure Synapse Analytics is broadly the same as for batch-based data. The additional consideration is that you may receive smaller volumes of data more frequently. Because the standard pattern in Azure Synapse Analytics is to use the CREATE TABLE AS SELECT (CTAS) syntax, you essentially have to re-create the entire table for every dataset you want to append, and you will also use up limited concurrency slots more frequently. In some scenarios, however, you may find there is a disconnect between the rate your data arrives and the frequency at which your users need it. Therefore, you should be open to the possibility of batching your data to a frequency that is agreed with your users. Fortunately, because of PolyBase technology, your multiple files can remain separate and Azure Synapse Analytics will be able to read the data in one pass.
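A hedged sketch of that append pattern follows (the table names are illustrative): the existing rows and the newly arrived external rows are combined into a new table with CTAS, and the tables are then swapped with a rename.
CREATE TABLE Clean.Sales_New
WITH
(
    CLUSTERED COLUMNSTORE INDEX,
    DISTRIBUTION = HASH([OrderNo])
)
AS
SELECT [OrderNo], [Quantity], [Price]
FROM Clean.Sales                               -- the rows already loaded
UNION ALL
SELECT
    ISNULL(CAST([OrderNo] AS INT), 0),
    ISNULL(CAST([Quantity] AS INT), 0),
    ISNULL(CAST([Price] AS DECIMAL(10,2)), 0)
FROM Ext.Sales;                                -- the newly arrived files exposed by PolyBase

RENAME OBJECT Clean.Sales TO Sales_Old;        -- swap the tables
RENAME OBJECT Clean.Sales_New TO Sales;
DROP TABLE Clean.Sales_Old;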

Event Ingestion for Azure SQL Database

Azure SQL Database is a good fit for event-based ingestion because it is architected to be more transactional than Azure Synapse Analytics. This means that data arriving in smaller and more regular batches is easily ingested at the same frequency as the originating events, unlike Azure Synapse Analytics where you may group datasets together. Of course, without PolyBase, you still need to use an integration engine such as Azure Data Factory to push the data into the database.

Stream Ingestion

When reading about event-based and batch-based processing, you will see that there are a number of similarities between the two modes. Both use files that can be processed in a decoupled manner, although event-based ingestion perhaps processes smaller files more regularly and is more responsive than a scheduled batch process. Ultimately, however, there is a regular interval in which data is processed and a clear beginning and end to the process routine. This third mode, stream ingestion, is completely different from the previous two and relies on a constant flow of data from a source, which can then be landed into files or passed directly into a destination system. Many modern systems are now able to offer the ability to tap into data streaming outlets, and with the increasing dependence on Internet of Things (IoT) technology, stream-based ingestion is becoming increasingly popular. Azure has a whole set of technologies dedicated to streaming capabilities, and this is in addition to the open source technology options, such as Databricks, that can support a streaming architecture.

The Risks and Opportunities of Stream Ingestion

Data streaming presents a number of benefits against the slower pace of batch- and event-based ingestion, and while the available technologies go a long way to making this approach simple, there are still challenges that need to be overcome. The most obvious benefit is the speed at which data is available to your users. Mere moments after transactions occur in your source system, they will be available to your users via their analytics dashboards and reports. This can lead to incredibly efficient decision making and the ability to react to changes in real time, not hours or days later. This can not only have a commercial benefit to a business but also can allow for critical systems to be monitored continuously. By streaming events from IoT sensors, businesses are able to monitor attributes of hardware such as temperature or pressure to detect the possibility of failure before any real problems occur.

A further benefit of data streaming is that the size of data being processed through the system is likely to be very small, usually single records or micro batches of one or two records at a time, and this means that architecture components can be relatively inexpensive as there is no memory pressure.

Though the speed of data streaming is arguably the key benefit, it also presents the primary challenge. The rate of the arriving data means that solutions must be always running and available to process records successfully or at least store them securely in a system that can preserve the order in which records arrived. Additionally, the speed of the arriving data means you want to limit the number of attributes arriving through your stream to only what is needed to be displayed in real time. This could mean that some attributes are removed, and therefore a mop-up routine that pulls the remaining data into your data warehouse for later analysis could also be required.

A further challenge is the granularity and isolation of the data. Because each record is isolated from all other records and also not processed in the same scope as any other records, aggregating data to achieve a different grain cannot be done as part of the constant stream but would have to be completed in a subsequent processing phase. This can delay the records arriving in your destination and should encourage you to keep data as granular as possible right through to the destination. The other side of this challenge is the isolation. Unlike in event or batch processing where we, at some point at least, have a chunk of static data for a period of time which can then be used to join and transform, the data is constantly in motion, although streamed records could be joined to reference datasets.

Finally, the layered approach used in both batch-based and event-based processing does not really apply here; the data does not sit still for long enough. Therefore, data cleaning and standardization needs to be written directly into the stream and requires developers to maintain multiple sets of cleaning logic.

Implementing Stream Ingestion

In this section, we will look at a method for stream-based ingestion. The core of this method utilizes Azure Event Hubs and Stream Analytics; however, there are some differences depending on the destination. While one version writes data directly into a destination database, the other writes data into a Blob Storage account and assumes the streamed data will be batched and processed in batch mode. At the time of writing, hierarchical namespace–enabled storage accounts are only supported in preview mode.

Stream Ingestion with Azure Event Hubs and Stream Analytics Jobs

As mentioned, the core of this method uses Azure Event Hubs, which by design makes many sources available as inputs to your streaming job, and likewise, there are many options for destination. This gives ultimate flexibility when working in Azure as streams can be routed to wherever needed through a very simple interface. Another great advantage to this method is that the streaming jobs are written in an approximation of SQL and so are easy to pick up for developers already working in a SQL-based environment. The actual development of these streaming jobs is performed from within the Azure portal and can easily be tested using the UI. This does however pose a problem from a source control perspective, as the streaming jobs themselves cannot be source controlled other than in ARM templates.

A key feature of Azure Stream Analytics Jobs is the ability to integrate with reference datasets. Because records are being loaded very regularly and may only carry a minimal amount of data, as with an IoT device, the reference data allows the streamed records to be enriched while in flight. Azure Stream Analytics supports reference data stored in Blob Storage or in an Azure SQL Database. In the case of Blob Storage, the data is stored under a set path and then split by date and time values at lower levels of the directory; the reference query can then load each set of data as and when it becomes relevant. Alternatively, Azure SQL Database reference data can either be fetched once at the start of the job and used throughout or periodically refreshed, down to the grain of a minute. For very static data, a single dataset will likely suffice; however, if you have regularly updated values, then you can make use of this periodic refresh capability. Additionally, if the dataset is very large, then a delta query can be supplied to avoid lengthy operations that would result in timeouts. Reference data is loaded into memory by the streaming runtime, and this allows for very quick joining of data. However, this dataset is limited to a size of 300 MB at a level of six streaming units and above; below six streaming units, the limit is half of that or smaller. This means that care needs to be taken when writing the reference data queries to ensure that the snapshot times are correct and that deltas are employed if needed. Further to this, compression is not supported for reference data.
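As a hedged sketch of such a job (the input, reference input, and output aliases are assumptions that would be configured on the job itself), a Stream Analytics query enriching the stream with reference data might look like the following:
SELECT
    s.OrderNo,
    s.SkuItemId,
    s.Quantity,
    s.Price,
    p.ProductName,                  -- enriched in flight from the reference dataset
    System.Timestamp() AS EventTime -- the time assigned to the event by the job
INTO [sql-output]
FROM [eventhub-input] AS s
INNER JOIN [reference-products] AS p
    ON s.SkuItemId = p.SkuItemId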

Stream Ingestion for Azure Blob Storage

As previously mentioned, Azure Data Lake Gen 2 is not a generally available destination for a streaming analytics job; it is currently in preview for select regions and therefore not a reliable production option. While this does present an issue from an architectural conformity perspective, it does make the route for inputting data from streaming devices simpler. Data is simply written into a file that is hosted in a specific directory. In order to assist with file management, the stream analytics config UI allows for dynamic placeholders to be used so that file names and directory locations can accurately describe the window of data that is contained; an example is shown after this paragraph. Alongside these placeholders, a minimum batch row count and maximum time window can be set to restrict the size of each batched file. For example, the maximum time parameter can be set so that a batch is written every 5 minutes, even if that does not meet the minimum row count. In this scenario, an event can be raised by the Blob Storage account when the batch is completed, which can then be intercepted and processed by Azure Data Factory. These batch windows can be 1 minute or greater, and while this is not quite record-by-record immediacy, it certainly is near real time and would comfortably cover off many fast-moving data scenarios.
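For example (a hedged illustration; the container and folder names here are assumptions), a path pattern for the blob output could use date and time tokens so that each batch lands in a folder describing its window:
streamed/sales/{datetime:yyyy}/{datetime:MM}/{datetime:dd}/{datetime:HH}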

Stream Ingestion for Azure SQL Database

The preceding streaming approach shows how a stream can be rolled into a batch and then ingested; however, direct integration is also very simple to achieve by using Azure SQL Database as a destination. Records can be inserted near to the rate of generation if required, and while this speed is clearly a benefit, it can pose issues when it needs to be integrated with other datasets. Without the rigorous structure of the ETL processing, you must assume that the data arriving from a stream may have some cleaning problems that are only acceptable given the speed at which the records arrive. To work around these problems and blend the data with the slower moving batch or event-based data feeds, you should employ a lambda architecture.

The Lambda Architecture

The lambda architecture approach is defined as a blend of streaming and batch-based ingestion that allows for historical, well-curated data to be seamlessly integrated with high-velocity data, allowing for a cohesive and contextualized view of real-time information. The technical challenges when implementing a lambda solution are not generally the individual feeds; while streaming and batch-based feeds can present their own challenges, the main issue to overcome is that of integrating the data. Slow-paced, batch, or event data will be well cleaned, prepared, and accurate; however, streamed data will often be very raw and close to its original source format. For this reason, there is the need to have a serving layer that can tightly control the way in which data is presented to querying applications and users. Additionally, a modicum of cleaning can be applied here at the frequency defined by the business requirements without slowing down or interfering with the stream.

Blending Streams and Batches

In all cases, stream data should be loaded into a separate table that is designed specifically for the stream and never into a pre-existing warehouse table. This is so that the accuracy and integrity of your warehouse is not compromised but also so that long running processes for loading dimensions and facts are not continually interrupted by frequently arriving stream data. Given the two tables of information, a view should be built that consistently picks common attributes from both tables and presents them in a way that is transparent to the end user or application. Due to the nature of streamed data, the records in the stream table will be far less enriched than those of a proper warehouse table, and therefore the view should utilize logic that can provide defaults or lookup values in order to make the streamed records meaningful alongside the warehouse datasets.

The Serving Layer

Any tables or views that are exposed to a user or application need to be carefully designed; however, with a lambda solution, this is even more critical. The requirements of the reports need to be well understood so that the real-time data can deliver the metric values to fulfil these requirements but also the absolute minimum set of dimension attributes so that the aggregated values can be sliced and diced. In some cases, the warehouse data may already be aggregated and stored at a higher grain than that of the streamed data. In those cases, the logic behind the view layer will need to aggregate the granular data to the correct grain and then blend the two datasets.
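As a hedged sketch of that re-graining logic (the table and column names are assumptions), the serving view might aggregate the granular stream up to the daily grain of the warehouse before blending the two sets:
CREATE VIEW Serving.DailySales
AS
SELECT [SalesDate], [ProductKey], [SalesAmount]
FROM Warehouse.FctDailySales                       -- already aggregated to one row per product per day
UNION ALL
SELECT
    CAST(s.[EventTime] AS DATE) AS [SalesDate],
    p.[ProductKey],
    SUM(s.[SalesAmount])        AS [SalesAmount]
FROM Stream.Sales AS s
INNER JOIN Warehouse.DimProduct AS p
    ON p.ProductId = s.ProductId
GROUP BY CAST(s.[EventTime] AS DATE), p.[ProductKey];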

As mentioned earlier, a degree of cleaning could be performed in this layer and in fact this is highly recommended. By allowing the stream processors to focus solely on pushing data to your database or file system, you ensure that records are passed off quickly and reliably. Any increase in complexity midstream only makes the process more likely to fail while also consuming more streaming units without a returnable benefit. Even if the serving layer was queried every 30 seconds, this would allow ample time to apply common data cleansing techniques on the delta of records since the previous 30 seconds.

In most lambda scenarios, the records arriving from the speed layer would be considered fact information in that they are individual transactions or readings from a sensor, for example. Each of these records is then loaded and stored at its lowest grain and aggregated into the serving layer. To enrich the existing fact data, streamed records can be unioned, that is, joined vertically (stacked) onto the existing data. The key here is that both sets of data must have the same schema, and so the stream records must hold a minimum set of attributes to allow this to happen or at least be enriched to that point within the serving layer. There is, however, the possibility that the stream data applies to existing dimension records, for example, customer statuses that are regularly changing. In this scenario, dimension data that is already enriched is then further enriched with real-time data. In this case, speed data is joined horizontally and so needs to have at least one joining characteristic, for example, customer id, so that both versions of the record can be aligned.

The following code shows how a core warehouse table and a stream table can be unioned together to present a consistent set of facts to an end user. Additionally, the streamed data is being enriched with product names as part of the view definition, instead of these taking up part of the stream. Listing 4-5 shows how the two tables can be unioned to create a single presentation view.
CREATE VIEW Warehouse.Sales
AS
    SELECT
         [SalesPerson]
        ,[SalesAmount]
        ,[ProductName]
        ,[ProductId]
        ,[CustomerId]
    FROM [Clean].[Sales]
UNION ALL
    SELECT
         'Anon Sales'
        ,[SalesAmount]
        ,p.[ProductName]
        ,p.[ProductId]
        ,[CustomerId]
    FROM [Stream].[Sales] AS s
    INNER JOIN Warehouse.DimProduct AS p
        ON p.ProductId = s.ProductId;
GO

SELECT * FROM Warehouse.Sales;
Listing 4-5

Creating a single presentation view

Assessing the Approach

The goal of this chapter was to outline a range or ingestion architectures that can be employed in varying degrees across a data warehouse ETL solution. By having a full understanding of the risks, opportunities, and implementation considerations of each approach, you can determine how each might fit with the data you have to process into your data warehouse. The approaches in this chapter were laid out in order of complexity, and so if ETL in Azure is a new concept to you and the developers you work with, then a batch-based architecture is a great starting point. By implementing this and doing it well, you will have the fundamental building blocks for an event-based architecture. Only once you have a solid grasp of how these two methods hydrate your warehouse should you begin to plan how streamed data could be used to further enrich your data warehouse.