Building a real-time data warehouse: when TiDB meets Pravega

About the author: Wang Tianyi is an architect in the TiDB community department. He previously worked at Fidelity Investments and SoftBank Investment, has extensive experience designing database high-availability solutions, and has studied the high-availability architectures and ecosystems of TiDB, Oracle, PostgreSQL, MySQL, and other databases in depth.

A data warehouse is a basic service that a company must provide once its data reaches a certain scale, and it is a fundamental building block of "data intelligence". Early data warehouses were mostly offline and mainly processed T+1 data. With the arrival of the Internet era, real-time data processing scenarios have become more and more common, and offline data warehouses can no longer satisfy the real-time needs of the business. To better serve these real-time scenarios, building real-time data warehouses has become an inevitable trend, and real-time analytics is also one of the key capabilities of an HTAP database.

Compared with offline data warehouses, real-time data warehouses mainly process T+0 data, offering much better timeliness and supporting efficient business operation. Architecturally, a real-time data warehouse typically uses Flink to consume data from Kafka and write the stream into the database in real time. This solves the timeliness problem, but in many cases, because Kafka has no built-in mechanism for landing data in long-term storage such as HDFS/S3, data in the message queue can still be lost in extreme situations.

In response to these problems, the author surveyed the databases and storage engines available on the market and found a new real-time data warehouse solution that completely solves the problem of landing Kafka data.

First, for the database, we chose TiDB, a highly scalable distributed database that solves the problem of massive data storage at the database level. Second, we chose Pravega, a distributed stream storage engine, which solves the data-loss and auto-scaling problems of traditional message queues and improves the parallelism, availability, and security of the real-time data warehouse system. The details are analyzed below.

TiDB meets Pravega by chance

Pravega is an open-source streaming storage project from Dell EMC that has entered the CNCF sandbox. Functionally, besides being similar to Apache Kafka and Apache Pulsar, Pravega provides streams and a schema registry. Its most important features are: (1) auto-scaling that is transparent to the application, and (2) a complete storage interface that abstracts everything as streams, so upper-layer compute engines can access data in a unified way.

Distributed messaging is generally built on reliable message queues, with messages delivered asynchronously between client applications and the messaging system. When it comes to message queues, Kafka is impossible to ignore: it is a distributed, multi-partition, multi-replica, multi-subscriber log system coordinated by ZooKeeper. Pravega is a new architecture distilled from practical experience with Kafka.

Pravega refactors the architecture of streaming storage. As a real-time streaming storage solution, it lets applications persist data directly to Pravega. Precisely because Pravega tiers data down to HDFS/S3, it is no longer constrained by retention, and only one copy of the data needs to be kept across the entire big data pipeline.
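To make "applications can persist data directly to Pravega" concrete, here is a minimal sketch using the Pravega Java client. The controller address, scope, and stream names are placeholder assumptions rather than anything from this article, and the scaling policy is only there to illustrate the auto-scaling feature mentioned above.

```java
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.UTF8StringSerializer;

import java.net.URI;

public class PravegaWriteSketch {
    public static void main(String[] args) {
        // Placeholder controller address for a local Pravega standalone deployment
        ClientConfig config = ClientConfig.builder()
                .controllerURI(URI.create("tcp://localhost:9090"))
                .build();

        // Create a scope and a stream; the event-rate scaling policy lets Pravega
        // split and merge stream segments automatically, transparent to the application
        try (StreamManager streamManager = StreamManager.create(config)) {
            streamManager.createScope("demo");
            streamManager.createStream("demo", "orders",
                    StreamConfiguration.builder()
                            .scalingPolicy(ScalingPolicy.byEventRate(100, 2, 1))
                            .build());
        }

        // Write one event; the write completes only after Pravega has durably persisted it
        try (EventStreamClientFactory factory = EventStreamClientFactory.withScope("demo", config);
             EventStreamWriter<String> writer = factory.createEventWriter(
                     "orders", new UTF8StringSerializer(), EventWriterConfig.builder().build())) {
            writer.writeEvent("order-1", "{\"id\": 1, \"amount\": 99.9}").join();
        }
    }
}
```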

Why Pravega reinvents the wheel

As a casual Kafka user, three problems bother me:

Data loss: more data goes in than comes out. If the consumer commits its offset before the data is actually processed, the data can be lost. On the producer side, durability depends on the acks setting (a minimal producer configuration sketch follows this list):

acks = all: the broker returns an ack only after all in-sync replicas have persisted the message, so no data is lost.

acks = 1: the broker returns an ack as soon as the leader has written the message; if the leader crashes before the followers replicate it, the data is lost.

acks = 0: the producer does not wait for any acknowledgement; if the broker goes down, the data is lost.
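As a concrete illustration of how the producer's acks setting trades durability for latency, here is a minimal sketch using the standard Kafka Java producer; the broker address and topic name are placeholders.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks=all : wait for every in-sync replica, strongest durability
        // acks=1   : wait only for the leader; data is lost if the leader dies before followers replicate
        // acks=0   : do not wait for any acknowledgement; data is lost whenever the broker is unreachable
        props.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "order-1", "{\"id\": 1}"));
        }
    }
}
```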

Kafka data is bounded by retention, and there is no simple, efficient built-in way to land it in HDFS/S3. Although the commercial version provides this capability, once data has been moved you must mix two sets of storage interfaces to access data at different tiers. To land data in HDFS you typically introduce another component:

Introduce Flume and go through the path kafka -> flume -> hdfs.

Introduce Kafka Hadoop Loader and go through the path kafka -> kafka hadoop loader -> hdfs.

Introduce kafka-connect-hdfs and go through the path kafka -> kafka-connect-hdfs -> hdfs.

Consumer rebalances are harmful.

During a consumer rebalance, consumption of the queue may pause because a consumer has been added.

During a consumer rebalance, messages may be consumed repeatedly because the commit interval is too long.

Both the pause and the repeated consumption can create a message backlog, and there is a consumption spike to absorb after the rebalance ends.

So, in the process of reinventing the wheel, which problems does Pravega solve? Here is how Pravega compares with Kafka:

What makes Pravega special is that, although it uses Apache BookKeeper to absorb highly parallel, low-latency real-time writes just like many other open-source products, in Pravega BookKeeper serves only as the first stage before data is batch-written to HDFS/S3 (the sole exception is recovery after an unexpected node failure). All Pravega reads go directly to HDFS/S3 to take advantage of its high throughput.

Pravega therefore does not treat BookKeeper as a data-caching layer; instead, it builds a new storage layer on top of HDFS/S3 that satisfies both abstractions of "low-latency tail reads and writes" and "high-throughput catch-up reads". This is unlike most projects with a "tiered" design, in which performance cannot be guaranteed while data moves between BookKeeper and HDFS/S3.

Back to the pain point

Most DBAs and operations engineers care most about three things: data correctness, system stability, and system usability. Data correctness is the foundation of a DBA's work; lost, corrupted, or duplicated data is a huge blow to the company. Stability and usability free the DBA's hands, releasing them from tedious operation and maintenance work so they can spend more time on architecture selection and system adaptation.

On these three points, Pravega does address the pain points of most operations engineers: long-term retention keeps data safe, exactly-once semantics keep data accurate, and auto-scaling makes maintenance itself easy. These characteristics make us more willing to research Pravega further and adapt our systems to it.

TiDB and Pravega's new real-time data warehouse solution

TiDB 5.0's MPP architecture mainly splits a query's workload into several tasks and pushes them down to multiple servers and nodes; after each node finishes its computation, the partial results are merged into the final result and delivered to the user. In TiDB 5.0, TiFlash fully complements TiDB's computing power, and in OLAP scenarios TiDB acts as a master node. Under the MPP architecture, the user sends a SQL query to a TiDB server; the shared pool of TiDB servers parses the query and hands it to the optimizer. The optimizer considers row storage, column storage, indexes, the single-node engine, and the MPP engine, or different combinations of them, generates candidate execution plans, evaluates them all in the same cost model, and finally chooses the best execution plan.

In some order and transaction systems, promotional campaigns can push the business to a traffic peak in a very short time. Such instantaneous peaks often require us to run analytical queries quickly and give feedback within a limited time window to influence decision-making. A traditional real-time data warehouse architecture has difficulty absorbing a short-term traffic peak, and the follow-up analysis can take a long time to complete; with a traditional computing engine, second-level aggregation analysis may simply be impossible. With an MPP computing engine, a predictable traffic peak can be converted into the physical cost of scaling out, achieving second-level response. Backed by the MPP computing engine, TiDB can better handle massive analytical queries.

The architecture of the real-time data warehouse solution

Real-time data warehouses have passed three important milestones:

The emergence of Storm broke MapReduce's single computing model and allowed businesses to process T+0 data;

The evolution from the Lambda architecture to the Kappa architecture turned offline data warehouses into real-time ones;

The emergence of Flink provided a better way to practice unified batch and stream processing.

The architecture of the real-time data warehouse keeps changing. Often, just after we settle on an architecture, the data warehouse technology stack is still iterating at high speed. We cannot predict what architectures will appear after Lambda and Kappa, but we can catch a glimpse through the current ones. Generally, a real-time data warehouse can be divided into four parts: the real-time data collection layer, the data warehouse storage layer, the real-time computing layer, and the real-time application layer. Integrating multiple technology stacks helps us build a boundaryless big data platform that supports analytics and mining, online business, and unified batch and stream processing at the same time.

As digital transformation advances, more and more companies face unprecedented data scale. With business competition intensifying, neither external users nor internal decision makers can rely any longer on offline data analysis with poor timeliness. They need more real-time analysis, even analysis of in-flight transaction data, to support more agile business decisions.

For example:

In risk control, the best outcome is to prevent problems before they happen, so among pre-event, in-event, and post-event schemes, pre-event warning and in-event control work best. This requires the risk-control system to be real time.

During an e-commerce promotion, we want to monitor sales in a stable and timely way, not just look back at historical data.

Traditional data analysis and data warehouse solutions based on Hadoop or analytical databases struggle to support real-time analysis; NoSQL solutions such as HBase offer good scalability and real-time performance but lack the required analytical capabilities; traditional single-node databases cannot provide the scalability that data analysis requires.

With TiFlash integrated, TiDB combines OLTP and OLAP: it can serve transactional scenarios, analytical scenarios, or run analytical queries against an isolated part of an OLTP workload. Together with Flink, TiDB adapts well to Pravega, providing a real-time, high-throughput, and stable data warehouse system that meets users' needs to analyze all kinds of data in big data scenarios.

To make it easier for readers to try this out, we provide a docker-compose based Pravega -> Flink -> TiDB path in the GitHub repository tidb-pravega-quick-start. After docker-compose starts, you can write and submit Flink jobs through the Flink SQL Client and observe their execution at HOST_IP:8081.
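For readers who would rather see the pipeline in code than in the Flink SQL Client, the sketch below expresses a Pravega -> Flink -> TiDB job with Flink's Table API. The schema, addresses, and credentials are made up for illustration, and the Pravega source options in particular are assumptions based on the Pravega Flink connector; the actual option names may differ from what the quick-start repository uses, so treat this as a sketch rather than a drop-in job.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PravegaToTiDBSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Source: a Pravega stream. The connector options are assumptions based on the
        // Pravega Flink connector and may need adjusting for the version you run.
        tEnv.executeSql(
                "CREATE TABLE orders_src (" +
                "  id BIGINT," +
                "  amount DECIMAL(10, 2)," +
                "  ts TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'pravega'," +
                "  'controller-uri' = 'tcp://localhost:9090'," +
                "  'scope' = 'demo'," +
                "  'scan.execution.type' = 'streaming'," +
                "  'scan.streams' = 'orders'," +
                "  'format' = 'json'" +
                ")");

        // Sink: TiDB, reached through Flink's JDBC connector because TiDB speaks the MySQL protocol
        tEnv.executeSql(
                "CREATE TABLE orders_sink (" +
                "  id BIGINT," +
                "  amount DECIMAL(10, 2)," +
                "  ts TIMESTAMP(3)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:4000/test'," +
                "  'table-name' = 'orders'," +
                "  'username' = 'root'," +
                "  'password' = ''" +
                ")");

        // Continuously move events from Pravega into TiDB
        tEnv.executeSql("INSERT INTO orders_sink SELECT id, amount, ts FROM orders_src");
    }
}
```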

The TiDB + Pravega real-time data warehouse solution is currently recruiting experience officers from the community! Be among the first to try the new data warehouse solution, and you can also receive swag from the TiDB and Pravega communities. If you run into problems while exploring and experimenting, the community will provide technical support. If you are interested, please scan the QR code to sign up!

👇 Scan the QR code to try the new data warehouse solution first 👇

Supplementary reading: Why choose TiDB?

TiDB is a database characterized by HTAP: it is positioned as a hybrid transactional/analytical processing (HTAP) database. It offers one-click horizontal scaling, strong consistency with multiple replicas for data safety, distributed transactions, and real-time HTAP, and it is compatible with the MySQL protocol and ecosystem, easy to migrate to, and inexpensive to operate and maintain.

Compared with other open-source databases, when building a real-time data warehouse TiDB can both store highly concurrent transactional data and handle complex analytical queries, which is very friendly to users. Starting with 4.0, TiDB introduced the TiFlash columnar storage engine, which physically isolates real-time transactional workloads from analytical workloads at the storage layer. TiDB also has the following advantages:

HTAP architecture based on row storage and column storage (a minimal sketch follows this list of advantages):

Complete indexes and highly concurrent access for precisely locating detailed data, meeting high-QPS point queries.

A high-performance MPP framework and an updatable columnar storage engine. After data is updated, the change is synchronized to the columnar engine in real time, so the system can read the latest data with the performance of an analytical database and satisfy users' real-time query needs.

A single entry point serves both AP and TP requests. Based on the type of request, the optimizer automatically decides whether to use TP-style access, which index to pick, and whether to use column storage or the MPP compute mode, which simplifies the architecture.

Flexible scaling: the TiDB, PD, and TiKV components can be scaled out or in flexibly and conveniently in an online environment, transparently and without affecting production.

Standard SQL with MySQL compatibility: supports standard SQL syntax, including aggregation, JOIN, sorting, window functions, DML, and online DDL, so users can analyze and compute over data flexibly with standard SQL.

Simple management: the TiUP tool quickly builds and deploys the cluster environment; normal operation does not depend on other systems and is easy to get started with; a built-in monitoring system helps with performance bottleneck analysis and troubleshooting.
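To show what "a single entry point serves both AP and TP" looks like in practice, here is a minimal JDBC sketch against TiDB. The orders table and its columns are hypothetical, and the TiFlash replica has to finish syncing before columnar reads can be served.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class HtapSketch {
    public static void main(String[] args) throws Exception {
        // TiDB speaks the MySQL protocol; 4000 is the default SQL port
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:4000/test", "root", "");
             Statement stmt = conn.createStatement()) {

            // Add a columnar (TiFlash) replica for the hypothetical `orders` table
            stmt.execute("ALTER TABLE orders SET TIFLASH REPLICA 1");

            // Point query: the optimizer normally serves this from the TiKV row store via an index
            stmt.executeQuery("SELECT * FROM orders WHERE id = 1");

            // Wide aggregation: the optimizer can pick the TiFlash column store on its own,
            // or the choice can be pinned with a read_from_storage hint
            stmt.executeQuery(
                    "SELECT /*+ read_from_storage(tiflash[orders]) */ status, COUNT(*) " +
                    "FROM orders GROUP BY status");
        }
    }
}
```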

In addition, in terms of architecture, TiDB has unique advantages in real-time data warehouse scenarios.

First, the kernel design: the TiDB distributed database splits its overall architecture into several modules that communicate with one another to form a complete TiDB system. The main components are as follows:

TiDB Server: the SQL layer. It exposes the MySQL-protocol connection endpoint, accepts client connections, parses and optimizes SQL, and finally generates a distributed execution plan.

PD (Placement Driver) Server: the metadata management module of the whole TiDB cluster. It stores the real-time data distribution of every TiKV node and the overall topology of the cluster, provides the TiDB Dashboard console, and allocates transaction IDs for distributed transactions.

Storage nodes:

TiKV Server: responsible for storing data. Externally, TiKV is a distributed key-value storage engine that supports transactions. The basic unit of storage is the Region; each Region stores the data of one key range (a left-closed, right-open interval from StartKey to EndKey), and each TiKV node serves multiple Regions.

TiFlash: a special type of storage node. Unlike ordinary TiKV nodes, TiFlash stores data in columnar form, and its main job is to accelerate analytical scenarios.

Second, TiDB 5.0 introduces the MPP architecture through TiFlash nodes, so that large table-join queries can be shared and completed by different TiFlash nodes.

When MPP mode is enabled, TiDB decides by cost whether a query should be executed by the MPP framework. In MPP mode, a table join redistributes data by the JOIN key (an Exchange operation) to spread the computation across the TiFlash executor nodes and thereby accelerate it. Together with the aggregation pushdown TiFlash already supports, in MPP mode TiDB can push all of a query's computation down to the TiFlash MPP cluster, using the distributed environment to accelerate the whole execution and greatly improve analytical query speed.
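The sketch below, again against hypothetical orders and customers tables that already carry TiFlash replicas, lets the optimizer consider the MPP engine and prints the plan; when MPP is chosen, the EXPLAIN output contains ExchangeSender and ExchangeReceiver operators for the join-key redistribution described above.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MppPlanSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:4000/test", "root", "");
             Statement stmt = conn.createStatement()) {

            // Let the cost-based optimizer consider the TiFlash MPP engine (on by default in 5.0)
            stmt.execute("SET SESSION tidb_allow_mpp = ON");

            // A join plus aggregation is a typical candidate for MPP execution
            String sql = "EXPLAIN SELECT c.region, SUM(o.amount) " +
                         "FROM orders o JOIN customers c ON o.customer_id = c.id " +
                         "GROUP BY c.region";
            try (ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    // The first column is the operator tree; look for ExchangeSender / ExchangeReceiver
                    System.out.println(rs.getString(1));
                }
            }
        }
    }
}
```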

Benchmarks show that, at TPC-H 100 scale, TiFlash MPP is significantly faster than Greenplum, Apache Spark, and other traditional analytical databases or data-lake analysis engines. With this architecture, large-scale analytical queries over the latest transactional data can run directly and outperform traditional offline analysis solutions. The tests also show that, with the same resources, TiDB 5.0's MPP engine is overall two to three times faster than Greenplum 6.15.0 and Apache Spark 3.1.1, and up to eight times faster on some queries, giving it a significant advantage in data-analysis performance.