Chapter 2. Real-Time Continuous Data Collection
As a starting point for all streaming integration solutions, data needs to be continuously collected in real time. This is referred to as a streaming-first approach, and neither streaming integration nor streaming analytics solutions can function without this initial step. The way in which this is achieved varies depending on the data source, but all share some common requirements:
Collect data as soon as it is generated by the source
Capture metadata and schema information from the source to place alongside the data
Turn the data into a common event structure for use in processing and delivery
Record source position if applicable for lineage and recovery purposes
Handle data schema changes
Scale through multithreading and parallelism
Handle error and failure scenarios with recovery to ensure that no data is missed
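To make the idea of a common event structure concrete, here is a minimal Python sketch. The StreamEvent class, its field names, and the two helper functions are hypothetical and chosen only for illustration; a real platform would define its own event model.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class StreamEvent:
    """Hypothetical common event wrapper; all field names are illustrative."""
    source: str                       # where the data came from (table, file path, topic)
    operation: str                    # e.g. "INSERT", "UPDATE", "DELETE", "APPEND"
    data: Dict[str, Any]              # the payload itself
    metadata: Dict[str, Any] = field(default_factory=dict)   # schema and source details
    position: Optional[str] = None    # source position, kept for lineage and recovery
    collected_at: float = field(default_factory=time.time)   # collection timestamp


def from_db_change(table, op, row, log_position):
    """Wrap a database change in the common structure."""
    return StreamEvent(source=table, operation=op, data=dict(row),
                       metadata={"columns": sorted(row)},
                       position=str(log_position))


def from_file_line(path, line, offset):
    """Wrap a line read from a file in the same structure."""
    return StreamEvent(source=path, operation="APPEND",
                       data={"line": line.rstrip("\n")},
                       metadata={"format": "text"},
                       position=str(offset))


db_event = from_db_change("orders", "INSERT", {"id": 1, "amount": 9.5}, 1042)
file_event = from_file_line("/tmp/app.log", "hello\n", 6)
```

Because both sources produce the same structure, downstream processing and delivery code does not need to know whether an event started life as a database change or a log line.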
The following sections explain how we can implement these requirements for a variety of different source categories – databases, files and logs, messaging systems, cloud and APIs, and devices and IoT – and will provide examples to clarify each case.
Databases and Change Data Capture
A database represents the current state of some real-world application and is most meaningful in the context of transaction processing. Applications submit queries and updates from a number of network endpoints that are managed as a series of transactions for state observance and transition.
From the late 1970s to the beginning of this century, the term “database” was commonly used to refer to a relational database in which the underlying entities and the relationships between those entities are modeled as tables. Over the past two decades, it has become an umbrella term for relational database systems as well as the emerging NoSQL systems; NoSQL, in turn, has itself become an umbrella term for key-value stores, document stores, graph databases, and others. These databases can be centralized or distributed. They can also be maintained on-premises or stored in the cloud.
However, since databases represent the current state of the data within them, and querying them only returns that state, they are not inherently suited to streaming integration through the query mechanism. Another approach is required to turn the database into a source of streaming data: change data capture (CDC).
When applications interact with databases, they use inserts, updates, and deletes to manipulate the data. CDC directly intercepts the database activity and collects all the inserts, updates, and deletes as they happen, turning them into streaming events.
Several CDC methods have been in use for decades, each with its own merits depending on the use case. In high-velocity data environments in which time-sensitive decisions are made, low-latency, reliable, and scalable CDC-powered data flows are imperative.
The business transactions captured in relational databases are critical to understanding the state of business operations. Traditional batch-based approaches to move data once or several times a day introduce latency and reduce the operational value to the organization. CDC provides real-time or near-real-time movement of data by moving and processing data continuously as new database events occur. Moving the data continuously, throughout the day, also uses network bandwidth more efficiently.
There are several CDC methods to identify changes that need to be captured and moved. Figure 2-1 illustrates the common methods.
Let’s discuss the advantages and shortcomings of CDC methods:
- Timestamps
By using existing LAST_UPDATED or DATE_MODIFIED columns, or by adding one if not available in the application, you can create your own CDC solution at the application level. This approach retrieves only the rows that have been changed since the data was last extracted. There might be issues with the integrity of the data in this method; for instance, if a row in the table has been deleted, no row remains to carry a DATE_MODIFIED value, so the deletion will not be captured. This approach also requires CPU resources to scan the tables for the changed data and maintenance resources to ensure that the DATE_MODIFIED column is applied reliably across all source tables.
- Table differencing
By comparing the tables to be replicated in the source and target systems by running a diff, this approach loads only the data that differs, bringing the two systems back into consistency. Although this works better for managing deleted rows, the CPU resources required to identify the differences are significant, and the requirement increases in line with the volume of data. The diff method also introduces latency and cannot be performed in real time.
- Triggers
Another method for building CDC at the application level is defining triggers and creating your own change log in shadow tables. Triggers fire before or after INSERT, UPDATE, or DELETE commands (that indicate a change) and are used to create a change log. Because this approach operates at the SQL level, some users prefer it. However, triggers are required for each table in the source database, and running them on operational tables while the changes are being made adds overhead. In addition to having a significant impact on the performance of the application, maintaining the triggers as the application changes creates a management burden. Many application users do not want to risk the application behavior by introducing triggers to operational tables.
- Log-based CDC
Databases contain transaction (sometimes called redo) logs that store all database events, allowing the database to be recovered in the event of a crash. With log-based CDC, new database transactions (including inserts, updates, and deletes) are read from the source databases’ transaction or redo logs. The changes are captured without making application-level changes and without having to scan operational tables, both of which add workload and reduce the source systems’ performance.
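The trade-offs among the three application-level methods above can be seen in a small experiment. The following sketch uses Python's built-in sqlite3 module with an in-memory database; the orders table, its integer last_updated column, and the orders_changes shadow table are assumptions made purely for illustration. It applies one update and one delete, then shows what each method observes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, last_updated INTEGER);
-- Shadow table acting as a hand-built change log (trigger method).
CREATE TABLE orders_changes (
    seq INTEGER PRIMARY KEY AUTOINCREMENT, op TEXT, id INTEGER, status TEXT);
CREATE TRIGGER orders_ins AFTER INSERT ON orders BEGIN
    INSERT INTO orders_changes (op, id, status) VALUES ('INSERT', NEW.id, NEW.status);
END;
CREATE TRIGGER orders_upd AFTER UPDATE ON orders BEGIN
    INSERT INTO orders_changes (op, id, status) VALUES ('UPDATE', NEW.id, NEW.status);
END;
CREATE TRIGGER orders_del AFTER DELETE ON orders BEGIN
    INSERT INTO orders_changes (op, id, status) VALUES ('DELETE', OLD.id, OLD.status);
END;
""")


def snapshot(conn):
    """Full copy of the table keyed by primary key (table differencing input)."""
    return {row[0]: row[1] for row in conn.execute("SELECT id, status FROM orders")}


def changed_since(conn, watermark):
    """Timestamp method: rows modified after the last extraction."""
    return conn.execute(
        "SELECT id, status, last_updated FROM orders "
        "WHERE last_updated > ? ORDER BY last_updated", (watermark,)).fetchall()


def diff(before, after):
    """Table differencing: compare two snapshots by primary key."""
    return {
        "inserted": sorted(after.keys() - before.keys()),
        "updated": sorted(k for k in before.keys() & after.keys()
                          if before[k] != after[k]),
        "deleted": sorted(before.keys() - after.keys()),
    }


conn.executemany("INSERT INTO orders VALUES (?, ?, ?)",
                 [(1, "new", 100), (2, "new", 101)])
before = snapshot(conn)   # stands in for the target system's copy
watermark = 101           # highest last_updated value already extracted

conn.execute("UPDATE orders SET status = 'shipped', last_updated = 102 WHERE id = 1")
conn.execute("DELETE FROM orders WHERE id = 2")

by_timestamp = changed_since(conn, watermark)   # sees the update only
by_diff = diff(before, snapshot(conn))          # sees update and delete, after a full scan
by_trigger = conn.execute(
    "SELECT op, id FROM orders_changes ORDER BY seq").fetchall()   # sees every change
```

In this run the timestamp query returns only the updated row and misses the delete, the diff finds both changes but must read the whole table twice, and the trigger log records every operation at the cost of an extra write per change. Log-based CDC is not shown because reading a transaction log is specific to each database product.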
Log-Based CDC Best Suited for Streaming Integration
CDC and, in particular, log-based CDC (Figure 2-2), has become popular in the past two decades as organizations have discovered that sharing real-time transactional data from Online Transaction Processing (OLTP) databases enables a wide variety of use cases. The fast adoption of cloud solutions requires building real-time data pipelines from in-house databases in order to ensure that the cloud systems are continually up to date. Turning enterprise databases into a streaming source, without the constraints of batch windows, lays the foundation for today’s modern data architectures.
Streaming integration should utilize log-based CDC for multiple reasons. It minimizes the overhead on the source systems, reducing the chances of performance degradation. In addition, it is nonintrusive: it does not require changes to the application, as adding triggers to tables would. It is a lightweight yet highly performant way to ingest change data. Although Data Manipulation Language (DML) operations (inserts, updates, deletes) are read from the database logs, the source systems continue to run with high performance for their end users.
The ingestion of change data through CDC is only the first, but most important, step. In addition, the streaming integration platform needs to incorporate the following:
Log-based CDC from multiple databases for nonintrusive, low-impact, real-time data ingestion that minimizes CPU overhead on sources and does not require application changes.
Ingestion from multiple, concurrent data sources to combine database transactions with semi-structured and unstructured data.
End-to-end change data integration, including:
- Zero data loss guarantee
Data loss cannot be tolerated by a downstream application due to the nature of data tracked in database systems. This means that if the external database system or the CDC process fails, event checkpointing must guarantee that the oldest active events of interest are tracked by the CDC process so that collection can resume from them.
- Event delivery guarantees
Exactly-once processing (E1P) and/or at-least-once processing guarantees must be preserved. This requires an understanding of consuming systems and the atomicity semantics they support.
- Ordering guarantees
Events are propagated in commit order or in generation order. Data that is generated in transactional order must retain that order and, where required, honor source-side transactional boundaries.
- Transaction integrity
When ingesting change data from database logs, the committed transactions should have their transactional context maintained. Throughout the whole data movement, processing, and delivery steps, this transactional context should be preserved so that users can create reliable replica databases.
- In-flight change data processing
Users should be able to filter, aggregate, mask, transform, and enrich change data while it is in motion, without losing transactional context.
- Schema change replication
When a source database schema is modified and a Data Definition Language (DDL) statement is issued, the streaming integration platform should be able to apply the schema change to the target system without pausing.
Turning change data into time-sensitive insights. In addition to building real-time integration solutions for change data, it should be possible to perform streaming analytics on the change data to gain immediate insights.
Log-based CDC is the modern way to turn databases into streaming data sources. However, ingesting the change data is only the first of many concerns that streaming integration solutions should address.
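The zero data loss, ordering, and transaction integrity requirements described above interact closely. The following Python sketch shows one way they might fit together; it assumes a simplified log in which each record carries a position, a transaction ID, and a kind of "op", "commit", or "rollback". The TransactionBuffer class and the record layout are hypothetical, not the format of any real database log.

```python
from collections import OrderedDict


class TransactionBuffer:
    """Holds change records per transaction until a commit or rollback arrives."""

    def __init__(self):
        # txn_id -> list of (log_position, operation), in arrival order
        self._open = OrderedDict()

    def apply(self, record):
        """Consume one log record and return the operations to emit, if any."""
        txn = record["txn"]
        if record["kind"] == "op":
            self._open.setdefault(txn, []).append((record["pos"], record["op"]))
            return []
        ops = self._open.pop(txn, [])
        if record["kind"] == "commit":
            return [op for _, op in ops]   # emitted together, in commit order
        return []                          # rollback: discard the buffered operations

    def restart_position(self, next_pos):
        """Checkpoint: the oldest log position still needed after a failure."""
        if self._open:
            return min(ops[0][0] for ops in self._open.values())
        return next_pos


# A simplified, made-up log with three interleaved transactions.
log = [
    {"pos": 1, "txn": "A", "kind": "op", "op": "INSERT a1"},
    {"pos": 2, "txn": "B", "kind": "op", "op": "INSERT b1"},
    {"pos": 3, "txn": "B", "kind": "commit"},
    {"pos": 4, "txn": "A", "kind": "op", "op": "UPDATE a1"},
    {"pos": 5, "txn": "C", "kind": "op", "op": "DELETE c1"},
    {"pos": 6, "txn": "C", "kind": "rollback"},
]

buffer = TransactionBuffer()
emitted = []
for record in log:
    ops = buffer.apply(record)
    if ops:
        emitted.append(ops)

# Transaction A is still open, so recovery must restart from its first record.
checkpoint = buffer.restart_position(next_pos=7)
```

Here only transaction B is emitted, C is discarded, and the checkpoint stays at position 1 because A has not yet committed. Restarting from that position would reread B, so a real implementation would also need to suppress transactions it has already delivered in order to offer exactly-once semantics.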
Files and Logs
Many applications such as web servers, application servers, IoT edge servers, or enterprise applications continually generate data records that are written to files or logs. These files can be on a local disk subsystem, a distributed filesystem, or in a cloud store.
This data contains valuable information needed for operational analytics. In batch processing Extract, Transform, and Load (ETL) systems, these files are written to and closed before being read by ETL. However, for real-time systems it is essential to be able to perform real-time data collection on files that are currently being written to (open files).
Data Collection from Filesystems
Collecting real-time file data requires a set of algorithms to detect changes to the files/directories/nodes. This involves the following:
Understanding the contents of the underlying file formats to be able to parse the file records
Maintaining position offsets to reflect the current EOF (end of file) markers for subsequent collection
Identifying torn/partial records
Recovery handling to address various failure scenarios
Traditional ETL has successfully managed to extract data from files after a file is complete. But for real-time processing, new records need to be collected as soon as they are written, so that propagation latency is determined by individual records rather than by the time it takes to complete a whole file.
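Position tracking and torn-record handling can be sketched in a few lines of Python. This is a simplified illustration that assumes newline-delimited UTF-8 records; the read_new_records function name and the temporary file are hypothetical, and real collectors must also deal with rollover, truncation, and persisting the offset.

```python
import os
import tempfile


def read_new_records(path, offset):
    """Return (complete_records, new_offset) for data appended since `offset`.

    A trailing fragment without a newline is treated as a torn record and
    left in place to be picked up on a later call.
    """
    with open(path, "rb") as f:
        f.seek(offset)
        chunk = f.read()
    end = chunk.rfind(b"\n")
    if end == -1:
        return [], offset                  # nothing complete yet
    complete = chunk[: end + 1]
    records = [line.decode("utf-8") for line in complete[:-1].split(b"\n")]
    return records, offset + len(complete)


# Simulate a writer that is interrupted partway through the third record.
path = os.path.join(tempfile.mkdtemp(), "app.log")
with open(path, "wb") as f:
    f.write(b"one\ntwo\nthr")
first, offset = read_new_records(path, 0)        # "thr" is held back

with open(path, "ab") as f:
    f.write(b"ee\n")                             # the writer finishes the record
second, offset = read_new_records(path, offset)  # now "three" is complete
```

The offset advances only past complete records, so storing it durably after each successful delivery gives a natural recovery point: after a restart, reading resumes from the last saved offset.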
There are several common patterns that occur in real-time stream processing during ongoing file generation that need to be supported and pose significant technical challenges. Some examples include the following:
Support for multiple filesystems including Linux (ext*), Windows (NTFS), Hadoop (HDFS), network-based (NFS), and cloud storage systems (AWS S3, Azure ADLS, Google GCS, etc.).
Support for multiple file formats such as JSON, DSV, XML, Avro, Thrift, Protocol Buffers, and Binary.
Support for reading from multiple directories and subdirectories from which files need to be read. It’s not always possible to have one central repository where all files can be generated.
Support for data parsing using static and dynamic record delimiters.
Support for data collection using wildcarding at the levels of files and directories.
Support for data collection when files are written in sequence and roll over to the base sequence.
Managing the number of open file descriptors.
Event guarantees with respect to data loss, at-least-once, or at-most-once processing.
Handling schema changes.
Messaging Systems
Of all the types of sources that can provide data for streaming integration, messaging systems are the most natural fit. They are inherently real time, and push data to consumers. In fact, messaging systems are often a required component of a streaming integration solution, being necessary for the continuous movement of data.
Messaging systems usually consist of producers that deliver messages to brokers to be read by consumers. To continuously collect data from a messaging system, the streaming integration solution needs to be able to connect to a broker as a consumer.
With the rapid increase in the adoption of cloud technologies in the past few years, messaging systems have also been introduced by cloud providers. Microsoft Azure Event Hubs, Amazon Kinesis, and Google Pub/Sub all provide cloud-based messaging platforms that are designed to elastically scale and support streaming and message-driven applications in the cloud.
Because heterogeneous integration and the collection of data from any enterprise (or cloud system) is an essential part of streaming integration, you need to consider all of these different types of messaging systems. Scalability of continuous collection is key given that most of these systems can handle tens of thousands to millions of messages per second.
Data Collection from Messaging Systems
There are two major considerations when working with messaging systems. First, the system needs to connect to the messaging provider and subscribe to receive messages utilizing some API. There are often security, compression, encryption, and architectural pieces to this that need to be resolved within a messaging adaptor.
Second, data needs to be extracted from the message. In addition to a data payload that can be in text, binary, key-value, or other forms, there are additional system and header properties that can contain useful information.
Different messaging systems require different APIs. Aside from Kafka, which has its own API, most messaging systems support the JMS API or the AMQP protocol.
Collecting data from Java Message Service systems
When connecting to Java Message Service (JMS) systems, you need to first create an initial context that contains information about connecting to the provider, such as broker URL and security credentials. This information varies by provider. From this, you can obtain a connection factory either directly, or by looking up the service through Java Naming and Directory Interface (JNDI). The factory then allows you to create a connection to the provider and create a session through which you will send and receive messages.
For data collection, the interest is in receiving messages and these can be either from queues or topics. Queues are typically point-to-point and only one consumer will receive a message sent to a queue. Topics provide a publish/subscribe topology in which every subscriber will receive a copy of a published message. Queues and topics each have their own separate issues in the areas of scalability and reliability.
Because queues allow only a single consumer to receive a copy of a message, it is impossible to use an existing queue as a data source without breaking the existing data flow. Instead, additional queues (or topics) need to be added, with existing messages also routed to these new destinations.
Reading from queues has delivery guarantees that will ensure all messages are seen, but this might require persistent options to handle failure scenarios. Topics are more suited for data collection because they can have multiple subscribers. However, it is important that such subscribers are durable. This means that messages are kept until every subscriber has received them; otherwise they will just be discarded.
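The difference between queue and topic semantics is easy to see in a toy model. The following Python sketch is an in-memory simulation only; it does not use the JMS API, and the PointToPointQueue and Topic classes are hypothetical stand-ins for a real broker.

```python
from collections import deque


class PointToPointQueue:
    """Toy queue: each message is delivered to exactly one receiver."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        return self._messages.popleft() if self._messages else None


class Topic:
    """Toy topic: every durable subscriber gets its own copy of each message."""

    def __init__(self):
        self._backlogs = {}   # subscriber name -> messages not yet received

    def subscribe_durable(self, name):
        self._backlogs.setdefault(name, deque())

    def publish(self, message):
        for backlog in self._backlogs.values():
            backlog.append(message)

    def receive(self, name):
        backlog = self._backlogs[name]
        return backlog.popleft() if backlog else None


# Adding a collector to an existing queue takes messages away from the application.
queue = PointToPointQueue()
queue.send("m1")
queue.send("m2")
app_from_queue = queue.receive()          # the application gets m1
collector_from_queue = queue.receive()    # the collector takes m2; the application never sees it

# With a topic, both durable subscribers see every message, even if one is offline.
topic = Topic()
topic.subscribe_durable("app")
topic.subscribe_durable("collector")
topic.publish("m1")
topic.publish("m2")                       # the collector is "offline" during both publishes
app_from_topic = [topic.receive("app"), topic.receive("app")]
collector_from_topic = [topic.receive("collector"), topic.receive("collector")]
```

In the queue case the collector diverts a message from the application, which is why routing to an additional destination is needed. In the topic case the per-subscriber backlog models durability: messages wait until each subscriber has received them.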
The biggest problem with collecting JMS data is recovery. Although JMS supports transactions, it does not permit repositioning, or rewinding within a queue or topic. In complex stateful processing pipelines utilizing windows or event buffers, recovery often requires replaying old events, which is not possible using the JMS API.
Collecting data from Apache Kafka
Apache Kafka is a high-throughput distributed messaging system. It utilizes a publish/subscribe mechanism and is inherently persistent, writing all messages to a distributed commit log. Clients connect to brokers as either producers or consumers, with producers sending messages to topics and consumers receiving them as subscribers to that topic. When a producer sends a message, it is stored in an append-only log on disk. The broker can be clustered over a large number of machines, with the data partitioned and replicated over the cluster.
When producers send a message to a broker, a partition key is used to determine which partition, and therefore which machines in a cluster, need to write the data to the log, with each partition written to a separate physical file. The broker can write the data to one or more machines for reliability and failover purposes. The logs are retained for a period of time, and consumers manage their own read location in the log. This enables consumers to come and go, and run at their own speeds without affecting other consumers.
Consumers belong to a consumer group, with each consumer in a group being assigned to one or more partitions. Each consumer group subscribed to a topic will receive all the messages sent to that topic, but each individual consumer in that group will receive only those messages belonging to its partitions. There cannot be more active consumers in a group than partitions (any additional consumers sit idle), so deciding on a partitioning scheme for a topic is an essential early consideration. Importantly, because each consumer needs to keep track of the log position it is reading from, it is possible for consumers to position backwards and replay old messages, as long as they are retained on disk.
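The relationship between keys, partitions, and consumers in a group can be sketched without a running broker. The following Python illustration makes two assumptions that are labeled in the code: CRC32 stands in for whatever hash function a real client uses, and partitions are handed out round-robin, which is only one of several possible assignment strategies. Neither function is part of any Kafka client API.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition.

    Assumption: CRC32 is used here only as a stand-in hash function.
    """
    return zlib.crc32(key) % num_partitions


def assign_partitions(num_partitions, consumers):
    """Spread partitions across the consumers of one group, round-robin.

    Assumption: round-robin is an illustrative strategy, not the only one.
    """
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# The same key always lands on the same partition, preserving per-key order.
same_partition = partition_for(b"customer-42", 4) == partition_for(b"customer-42", 4)

# Two consumers share four partitions evenly.
balanced = assign_partitions(4, ["c1", "c2"])

# With only two partitions, a third consumer receives nothing and sits idle.
oversubscribed = assign_partitions(2, ["c1", "c2", "c3"])
```

The last case shows why the partition count caps read parallelism: once every partition has an owner, adding consumers to the group does not increase throughput.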
When collecting data from Kafka, it is important to consider both scalability and reliability scenarios.
Scalability of reading from Kafka is directly related to the number of partitions specified for a topic. To read in parallel from a topic using multiple consumers, it is necessary to have at least as many partitions as consumers. It is possible to add additional partitions to a topic later, but this affects only new data, and it is impossible to reduce the number of partitions. Adding new consumers to a group dynamically (either as additional threads or in separate processes or machines) up to the partition limit enables more data to be read in parallel.
The major difference between Kafka and the other messaging systems is that Kafka requires consumers to track their read positions. This helps with reliability considerations because, in the case of failure, consumers can not only pick up where they left off, but can also rewind and replay old messages. By tracking the read position of consumers, and understanding how far those messages have progressed through a processing pipeline, it is possible to determine how far back consumers need to rewind to rebuild state before processing can resume.
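To illustrate how tracked read positions support recovery, here is a small in-memory simulation in Python. The PartitionLog and Consumer classes are hypothetical and only model the idea of offsets, commits, and seeking; they are not a Kafka client.

```python
class PartitionLog:
    """Toy append-only log for a single partition."""

    def __init__(self):
        self._records = []

    def append(self, value):
        self._records.append(value)
        return len(self._records) - 1          # offset of the new record

    def read(self, offset, max_records):
        chunk = self._records[offset: offset + max_records]
        return list(enumerate(chunk, start=offset))


class Consumer:
    """Toy consumer that manages its own read position."""

    def __init__(self, log, start_offset=0):
        self.log = log
        self.position = start_offset     # next offset to read
        self.committed = start_offset    # last position known to be fully processed

    def poll(self, max_records=100):
        batch = self.log.read(self.position, max_records)
        self.position += len(batch)
        return batch

    def commit(self):
        self.committed = self.position

    def seek(self, offset):
        self.position = offset


log = PartitionLog()
for value in ["a", "b", "c", "d", "e"]:
    log.append(value)

consumer = Consumer(log)
consumer.poll(max_records=3)       # reads offsets 0 to 2
consumer.commit()                  # offsets 0 to 2 are fully processed
consumer.poll(max_records=2)       # reads offsets 3 and 4, then "crashes" before committing

# A replacement consumer picks up from the last committed position.
restarted = Consumer(log, start_offset=consumer.committed)
resumed = restarted.poll()

# To rebuild window state, it can also rewind further than the commit point.
restarted.seek(1)
replayed = restarted.poll()
```

The resumed batch contains offsets 3 and 4 again, which gives at-least-once delivery, and the replayed batch starts at offset 1, which is how a stateful pipeline could reload older events it needs before continuing.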
Handling different data formats
The messaging systems described previously have different approaches to understanding the data being transmitted. JMS supports several types of messages including raw bytes, a serialized Java Object, text, and name/value pairs. Both AMQP and Kafka inherently send data as raw bytes, but AMQP can also specify the content-type in a way consistent with HTTP, whereas Kafka can utilize a separate schema registry to define the data structure of messages on a topic.
In most practical situations, however, the data is text serialized as bytes and formatted as delimited data, log file entries, JSON, or XML. From a collection perspective, it is important, then, to enable flexible parsing of text (similar to files) as part of working with messaging systems.
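A flexible parser for byte payloads might look like the following Python sketch, which uses only the standard library. The parse_payload function, its fmt argument, and the flat XML layout it expects are assumptions made for illustration; real payloads will often need richer handling.

```python
import csv
import io
import json
import xml.etree.ElementTree as ET


def parse_payload(payload: bytes, fmt: str, delimiter=",", columns=None):
    """Decode a message payload and parse it according to a declared format."""
    text = payload.decode("utf-8")
    if fmt == "json":
        return json.loads(text)
    if fmt == "delimited":
        row = next(csv.reader(io.StringIO(text), delimiter=delimiter))
        # Pair values with column names when they are known.
        return dict(zip(columns, row)) if columns else row
    if fmt == "xml":
        root = ET.fromstring(text)
        # Assumption: a flat document whose child elements are the fields.
        return {child.tag: child.text for child in root}
    raise ValueError(f"unsupported format: {fmt}")


from_json = parse_payload(b'{"id": 1, "name": "widget"}', "json")
from_delimited = parse_payload(b"1|widget", "delimited",
                               delimiter="|", columns=["id", "name"])
from_xml = parse_payload(b"<item><id>1</id><name>widget</name></item>", "xml")
```

Note that the JSON parser preserves the numeric type of id, whereas the delimited and XML paths return every value as text, so a type-conversion step driven by schema information would usually follow.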
Cloud and APIs
Increasingly, enterprise applications are being deployed in the cloud in a SaaS multitenant pattern. Apps like Salesforce’s Sales Cloud, Workday Payroll, and HCM offer a subscription-based model to streamline business processes and enable rapid business transformations, and promise to incorporate newer intelligence in their applications using AI and machine learning. Many enterprises thus are gradually adopting a hybrid cloud deployment model in which newer applications are moving to the cloud.
Rarely will all of a corporation’s business applications run on a single public cloud. Often there’s a compute grid that spans multiple clouds and on-premises systems, across operations and analytics environments. To gain real-time visibility, data from these cloud SaaS applications also needs to be made available in a streaming manner. In fact, if on-premises systems are set up to receive streaming changes from on-premises applications, a SaaS checklist must include requirements to get data transferred in real time from the SaaS environment.
Some of the techniques we have discussed in previous sections might not pertain to SaaS environments because of inaccessibility of underlying platforms/databases due to security considerations (for example, the opening of certain network ports), service-level agreement (SLA) requirements (ad hoc CDC initial loads), or multitenancy manageability concerns (special trigger handling for CDC). Generally, data for business objects is made available in a batch via a bulk API or in real time via streaming APIs.
Devices and the IoT
The IoT has garnered a lot of attention as a big driver of digital transformation within many industries. Simply put, IoT is the worldwide collection of devices, sensors, and actuators that can collect, transfer, and receive data over a network without requiring human interaction. The “things” in IoT can refer to the devices themselves, or the objects they are monitoring, including people, animals, vehicles, appliances, and machinery.
Despite the reference to the “internet” in the name, IoT does not need to transfer data over the web. Internet here is a reference to Internet Protocol (IP) which enables data packets to be delivered from a source to destination based only on IP addresses. IoT works across any IP-based network, facilitating internal as well as external use cases. The notion that IoT requires devices to deliver data to the cloud is restrictive given that many commercial, industrial, and healthcare use cases keep the data private.
Data Collection from IoT Devices
The term IoT device is very broad and encompasses a wide range of hardware. A single temperature sensor sending data over WiFi can be considered an IoT device. However, a device that includes a temperature sensor, alongside other measurements and logic, such as a smart thermostat, weather station, or fire alarm, could also be an IoT device. Devices can be further combined to produce larger-scale “devices,” such as connected cars, smart refrigerators, or home security and control systems.
The size and electrical power available for the device will dictate to some degree how much computing power the device has and the types of protocol it can support. Smaller devices tend to have very little memory or CPU capabilities and require lightweight protocols for transmitting data. Larger devices can do more processing, utilize more complex code, and support heavier-weight, more resilient protocols. Figure 2-3 shows streaming integration for IoT.
The simplest protocols used by IoT are TCP and UDP at the transport layer of the TCP/IP network model, sending data directly as network packets to a destination. At the application layer, existing protocols can be used, and new protocols have emerged. HTTP and HTTPS (secure HTTP) are common, often implemented as JSON being sent over Representational State Transfer (REST) calls.
Message Queuing Telemetry Transport (MQTT), a publish/subscribe protocol, and WebSockets, which provides full-duplex connections, are common choices for two-way communication with devices. OPC-UA (OPC Unified Architecture from the OPC Foundation) is a next-generation standard that defines a client/server protocol mainly for industrial applications, along with a publish/subscribe model that can utilize UDP or MQTT for data transport.
Aside from the transfer protocol, the other consideration is the data format. There is no real standard for IoT devices, so integration needs to be considered on a case-by-case basis. JSON is common, but data can also be binary, delimited, XML, or appear in a proprietary textual form.
IoT Scalability Considerations
Any discussion of IoT data almost always includes the concept of edge processing. Edge processing is when computation is placed as close as possible to the physical edge device – usually in it – making the IoT device as “smart” as possible. IoT is fairly unique in the data world in how much data it generates, in that there can be hundreds, thousands, or even millions of individual devices all generating small amounts of data. Even if a single sensor or device generates data only 10 times per second, when this is multiplied by the number of devices, it can quickly become overwhelming.
A lot of this data is repetitive, redundant, or just not that interesting. It is the information content present in that data that is really required. (See the previous discussion on Value for the difference between data and information.)
A simple example is a temperature sensor. If a single sensor reads the temperature 10 times per second, it will generate 36,000 data points per hour. If the temperature stays at 70 degrees throughout that time, the information content is one item: “70 degrees for an hour.”
To reduce the amount of data generated by IoT, data from multiple individual sensors can be collected through a single edge device. Here, the data can be filtered, aggregated, and transformed to extract the information content. The important thing here is to not only do statistical analyses and send summary information, but also to be able to react instantly to change. Most edge processing is a combination of statistical summarization plus immediate change detection and sensor health signals.
Using the temperature sensor example, a combination of statistical summarization (min, max, average, standard deviation, etc.) within a specific period, combined with pings every minute to indicate the sensor is alive, and immediate messages if the temperature changes dramatically (by more than two standard deviations from the mean) will drastically reduce the amount of data that needs to be collected.
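The combination of summarization and immediate change detection can be sketched as follows. This Python example is a minimal illustration: the EdgeSummarizer class, its parameter names, and the message layouts are hypothetical, the per-minute liveness ping is omitted for brevity, and the statistics are recomputed on every reading, which a real edge device would likely replace with running totals.

```python
import statistics


class EdgeSummarizer:
    """Buffers readings, flags sharp deviations, and emits periodic summaries."""

    def __init__(self, threshold_sigmas=2.0, min_samples=10):
        self.threshold_sigmas = threshold_sigmas
        self.min_samples = min_samples   # readings needed before alerts are raised
        self.readings = []

    def add(self, value):
        """Record a reading; return an alert if it deviates sharply, else None."""
        alert = None
        if len(self.readings) >= self.min_samples:
            mean = statistics.fmean(self.readings)
            stdev = statistics.pstdev(self.readings)
            if abs(value - mean) > self.threshold_sigmas * stdev:
                alert = {"type": "alert", "value": value, "mean": mean}
        self.readings.append(value)
        return alert

    def flush(self):
        """Return one summary for the buffered period and start a new period."""
        if not self.readings:
            return None
        summary = {
            "type": "summary",
            "count": len(self.readings),
            "min": min(self.readings),
            "max": max(self.readings),
            "mean": statistics.fmean(self.readings),
            "stdev": statistics.pstdev(self.readings),
        }
        self.readings = []
        return summary


edge = EdgeSummarizer()
# Ten seconds of steady readings at 10 per second produce no messages at all.
steady_alerts = [edge.add(70.0) for _ in range(100)]
# A sudden jump is reported immediately rather than waiting for the summary.
spike_alert = edge.add(85.0)
# At the end of the period, 101 readings collapse into a single summary.
summary = edge.flush()
```

Applied to the earlier arithmetic, an hour of steady readings (10 per second for 3,600 seconds, or 36,000 data points) would reduce to a handful of summary messages plus an alert for each significant change.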
The types of processing that can be done at the edge can be quite complex, even incorporating machine learning and advanced analytics for rapid response to important events. This is discussed in more detail in Chapter 6.