Coral: A SQL translation, analysis, and rewrite engine for modern data lakehouses

Co-authors: Walaa Eldin Moustafa, Wenye Zhang, Sushant Raikar, Raymond Lam, Ron Hu, Shardul Mahadik, Laura Chen, Khai Tran, Chris Chen, and Nagarathnam Muthusamy

Introduction

At LinkedIn, our big data compute infrastructure continually grows over time, not only to keep pace with the growth in the number of data applications, or their domains spanning data curation, AI, deep learning, business analytics, and system operations, but also to accommodate:

  • Evolving compute engines and architectures, ranging from vanilla MapReduce to declarative compute engines like Spark, Presto, Hive, and Pig.
  • Evolving storage formats, such as Avro, ORC, and Parquet.
  • Evolving table formats, such as Hive, Iceberg, and other custom table formats that can deliver high value for specific access patterns including incremental compute and machine learning data representations.
  • Evolving table schemas, which constantly change to accommodate new business requirements.
  • Evolving data, which needs to be continuously captured and processed in a timely, yet reliable, manner.
  • Evolving business logic; data producers constantly find themselves having to update the logic or data sources producing their data, with zero dataset consumer downtime, and zero data staleness.

The Dali Catalog is a common data access layer at LinkedIn that enables high agility and velocity for data producers while abstracting data access details from compute engines, and, consequently, from the data consumers. For data consumers, the Dali Catalog provides access to the data in the data lakehouse in the form of “relations.” Broadly, there are two types of relations in the Dali Catalog: Dali tables and Dali views. A Dali table is a relation that references data that is physically stored in files, while a Dali view is a relation that references logic that is applied on base tables, or other views. Like tables, views are accessed from queries by their identifier, and they represent a relation that results from applying their logic. Dali views are a key component of the Dali Catalog. They not only enable data transformation, cleaning, and aggregation from multiple sources transparently, but also add a semantic meaning to the data. With views, data is not just files and hardcoded metadata, but also logic that interrelates the data and metadata in algebraically structured ways, introducing the “intelligence” aspect to the data lakehouse.

Introducing Coral
In this blog post, we focus on Coral, a newly open-sourced SQL translation, analysis, and rewrite engine that we integrate to the Dali Catalog, and use to virtualize views, to improve their accessibility, expressibility, and understandability by the engines, and to better control their behavior.

Coral has proven its value at LinkedIn by making Dali views more user-friendly, agile, secure, and portable. Recently, Coral has also been integrated into Presto, at the request of the open source community.

Dali views

Dali views are views that are registered in the Dali Catalog. Dali views are readable in Hive, Spark, Pig, and Presto. Predominantly, Dali views are written in HiveQL, but they are also expressible in other relational languages. Thanks to Coral, Dali views at LinkedIn enjoy a number of characteristics that are not commonly available in traditional views:

  • Dali views are portable. Their query definition language is not necessarily tied to the underlying engine; they are defined once and reused anywhere.
  • Dali views are agile. This means that their schemas are not rigid. If their underlying tables evolve, the view schemas evolve, too.
  • While Dali views are agile, they also have well-defined schema evolution rules. Schema evolution is not regulation-free, since some schema evolution forms are harmful to downstream consumers. Dali views are built on tables that adhere to strict schema evolution rules.
  • User Defined Functions (UDFs) are first-class citizens in Dali views. Their definitions are managed as part of the view definition and their execution is carried out transparently as part of the view execution, with no explicit UDF registration required from the user.
  • Dali views are accessible like tables. They share many contracts with their Dali table counterparts. They are readable using the same APIs, they can be accessed using strongly typed classes, and many metadata-level access methods that are not traditionally defined for views are also defined for them. Examples include data availability triggers, inference of partition values, inference of documentation of fields, propagation of non-SQL-standard data types, and inference of nullability and case preservation.

Further, Coral unlocks powerful data lakehouse intelligence features such as data governance (e.g., by rewriting a view so it applies data obfuscation or filtering operators), query optimization (e.g., to rewrite a query to reuse materialized views or pre-computed results), data lineage, and data-driven workflow DAG execution. Figure 1 depicts the Coral architecture, illustrating possible query rewrites and inferences through conversion to an intermediate representation. Before diving deeper into Coral components, let us get an inside look into the view lifecycle at LinkedIn.

  • diagram-showing-coral-architecture

Figure 1: Coral architecture

View development lifecycle
At LinkedIn, we built developer tools to streamline the view definition and creation process so that it follows the standard software development lifecycle. Users create Git projects and add view SQL files and Gradle dependency files to declare UDF dependencies and UDF class entry points. Users leverage the standard code review process in order to check in their view definitions. Once a view project is checked in, its artifacts are automatically published to an artifact management repository, and consequently the view is registered in the Dali Catalog. Figure 2 shows a high-level overview of the view authoring and creation process.

  • graphic-showing-the-view-authoring-and-creation-process

Figure 2: View authoring and creation process

When creating views in the Dali Catalog, a deployment process associates the view definition with necessary UDFs that are required to evaluate the view using table properties. Two designated table properties, “dependencies” and “functions,” carry necessary information about the UDFs used in the view. The “dependencies” property references a list of artifact coordinates for UDF jars. Conventionally, UDF jars are self-contained, i.e., a single jar contains all dependencies for a certain UDF. The “functions” property references a map from UDF names to UDF classes. The UDF name is prepended with the view name so that if different versions of the UDF are used in nested views, the names do not collide.

Let us say we have two example tables, Member and Company. Their schemas are as follows:

Member
Id         BIGINT
Name       VARCHAR
CompanyID  BIGINT
Position   VARCHAR

Company
Id BIGINT
Name VARCHAR

For the sake of this example, let us assume that in the Member table, member positions are taken directly from the user’s input. For example, members may express a Senior Engineer position as “Senior Engineer,” “Sr Engineer,” or “Sr Eng.” A standardization UDF, standardize(), standardizes the position to one canonical form, e.g., “Senior Engineer.” Below is the view SQL definition for a view that returns the number of engineers at each level in every company.

SELECT Company.Name, STANDARDIZE(Position) AS StdPosition, COUNT(*) AS Cnt
FROM Member JOIN Company ON Member.CompanyId = Company.Id
WHERE INSTR(STANDARDIZE(Position), ‘Engineer’) > 0
GROUP BY Company.Name, STANDARDIZE(Position)

To create the above view using a Gradle project, users leverage a Dali Gradle plugin that exposes Gradle extensions to help users link UDF references in the view text, e.g., standardize(), to UDF artifacts in artifact repository management systems. For example, in their build scripts, users declare UDF dependencies as follows:

dependencies { 
 udf(‘com.linkedin.standardization:standardization-udfs:1.0.0’) { 
   functions[STANDARDIZE] = ‘com.linkedin.standardization.Standardize’ 
 } 

In the above snippet, ‘com.linkedin.standardization.Standardize’ is the class name of the class that contains the definition of the STANDARDIZE UDF, and ‘com.linkedin.standardization:standardization-udfs:1.0.0’ is the Maven coordinate for the artifact that contains that class.

Once the user’s project is checked in, a view creation pipeline uses the published artifacts and creates the view in the Dali Catalog. Information about the view UDFs is kept alongside the view definition. The following is the equivalent CREATE VIEW statement to the above user logic:

CREATE VIEW CompanyEngLevels
TBLPROPERTIES(‘functions’ = ‘STANDARDIZE:com.linkedin.standardization.Standardize’,
‘dependencies’ = ‘com.linkedin.standardization:standardization-udfs:1.0.0’)
AS SELECT Company.Name, STANDARDIZE(Position) AS StdPosition, COUNT(*) AS Cnt
FROM Member JOIN Company ON Member.CompanyId = Company.Id
WHERE INSTR(STANDARDIZE(Position), ‘Engineer’) > 0
GROUP BY Company.Name, STANDARDIZE(Position)

Coral overview

In the following sections, we describe how Coral achieves view portability by accessing views in the Dali Catalog and making those view definitions accessible in a number of other engines, such as Presto, Spark, and Pig. The discussion revolves around three main aspects:

  • View virtualization: the process of accessing the view definition and converting it to an internal representation, called Coral IR (Intermediate Representation). This step also involves inferring view-level metadata, such as field nullability, case preservation, and non-SQL standard data types, that are leverageable by some engines.
  • View translation and rewrite: the process of rewriting the Coral IR to a representation that is suitable to a target engine (e.g., Presto, Spark, Pig), so the engine can use its native APIs to query the Dali view with UDFs.
  • Integration of Coral to target engines: necessary APIs and integration points with various engines to make Coral fit into the overall compute engine architecture, and its various read APIs.

View virtualization

Dali view definitions are currently stored in the Hive Metastore. The Dali Catalog uses Coral Hive to interface with it. This Coral module has two main purposes:

  • Accessing database, table, and view information: The Hive Metastore organizes tables and views in databases. Each table or view has a schema, which describes the column name and types for that table or view. One of the responsibilities of the Coral Hive module is to access table or view information such as database names, table or view names within a database, table or view schemas, and HiveQL view definitions, as well as their UDF properties, e.g., “functions” and “dependencies.” This information is necessary to enable view parsing, validation, and conversion to intermediate representation.
  • Parsing, validation, and conversion to intermediate representation of HiveQL view definitions: The same module houses a parser (based on the HiveQL parser), a validator, and a converter to intermediate representation. The parser converts HiveQL to an Abstract Syntax Tree (AST), and the validator validates the AST and ensures its semantic correctness. The intermediate representation converter converts the AST to Coral IR, which is based on Apache Calcite’s Relational Algebra Expressions. A relational algebra expression is an expression that operates on relations in a database. It consists of standard operators such as scans, filters, projections, joins, unions, aggregates, etc.

Figure 3 depicts the view virtualization process, converting a HiveQL view definition to Coral IR.

  • diagram-showing-view-virtualization

Figure 3: View virtualization process

Dali Operator Table
Operators of relational algebra expressions are associated with Row Expressions. For example, a relational filter operator is associated with a row filter expression to filter rows from the input relation, and a relational join operator is associated with a join condition to specify which rows of the input relations participate in the output. Row expressions are defined using “row expression operators.” In our running example view, the “INSTR(STANDARDIZE(Position), ‘Engineer’) > 0” is a row expression that is associated with the Filter relational operator to specify the filter condition. Dali Operator Table is a table that Coral uses to define row expression operators, and look them up when required. For each operator, the Dali Operator Table tells the query analyzer what its expected input data types are and how to infer the output data type of the operator. For example, an entry for the “INSTR” function states that the input types of this function are two STRING parameters, and the output type is an INTEGER. This is an example of a “built-in” function definition. The Dali Operator Table also contains similar definitions for user-defined functions. In our running example, it would define the “STANDARDIZE” function as a function whose input type is STRING and output type is STRING as well, in addition to tracking the function metadata such as its entry point in the classpath, e.g., “com.linkedin.standardization.Standardize”. UDF definitions in the Dali Operator Table can be defined statically, or resolved on the fly, i.e., dynamically. In dynamic resolution, Coral calls a UDF’s type resolution API dynamically and infers the type at runtime. 

View translation and rewrite

Coral rewrites view definitions into a number of engine-compliant languages and SQL dialects. Further, during that rewrite, it maps functions to their equivalent ones in the target engines so they are semantically equivalent. In this section, we discuss Coral translation and integration into notable engines, but start with explaining how function rewrite works in Coral, since it is common to all modules.

Function rewrite
There are two types of functions in SQL: built-in functions and user-defined functions. Coral leverages two types of translation methods for those types:

  • Built-in function transformation: This type of transformation rewrites built-in functions whose semantics do not directly match across SQL (or SQL-like) dialects. For example, the INSTR functionality in HiveQL is carried out by the function STRPOS in Presto. Therefore, Coral defines a general built-in rewrite transformation framework where a function in HiveQL, f(a1, a2,.. an), is rewritten as another function g(h1(a1,..an), h2(a1,..an),.. hm(a1,..an)) in another dialect. Such a transformation framework allows for transformations like input value adjustment, function renaming, and output value adjustment. Coral also implements a more general framework to map custom row expression operators directly from one implementation to another.
  • User-defined function transformation: Different engines expose different APIs to define user-defined functions. For example, in Hive, user-defined functions extend the GenericUDF API. In Presto, scalar user-defined functions extend the SqlScalarFunction API. Similarly, Spark user-defined functions can be defined by extending the Expression interface. Given such a disparity of UDF APIs, functions that are written for one engine are not usually executable on another. From Coral’s point of view, making UDFs portable means translating a view’s “dependencies” property to point to the correct artifacts implementing the UDF specifically for a target engine. Coral leverages Transport, a framework for writing user-defined functions in a unified API. While Transport users write their UDF once, Transport produces a number of artifacts, each custom-built for an engine, as if the UDF were written in that engine’s API in the first place. For more details, we encourage you to review the Transport UDFs engineering blog post and the Github repo.

Coral Presto
Coral Presto rewrites Dali view definitions to a Presto-compliant SQL query. After obtaining the Coral IR of a view definition using Coral Hive, Coral Presto performs the conversion through a two-step process:

First, it converts the Coral IR to a Presto-compliant IR, i.e., Presto IR. In this step, the relational algebra expression representation corresponding to the Coral IR is converted to another relational algebra expression that is closer to PrestoSQL. The following are some example operator transformations that are specific to Presto:

  • Converting the array subscript operator [] to the element_at function in Presto. Note that Coral IR array indexes start from 1, so the array keeps the indexes of Coral IR, but adjusts the function call. 
  • Adjusting the escape pattern of regexp_extract.
  • Converting the RLIKE operator to REGEXP_LIKE.
  • Converting the base64 and unbase64 functions to to_base64 and from_base64, respectively.

Second, it converts the Presto-IR to a Presto-SQL view definition. This step converts the Presto-oriented relational algebra expression to a Presto-compliant SQL. For example:

  • When writing scan operators, it ensures using the corresponding SQL SELECT clause contains correct catalog names.
  • When writing lateral join operators, it produces an UNNEST clause following Presto’s expected lateral join syntax. 

Figure 4 depicts the Coral Presto translation process from Coral IR to Presto SQL.

  • diagram-translating-coral-into-presto

Figure 4: Coral Presto translation process

Coral is integrated into Presto through Presto’s Hive Connector. Presto’s Hive Connector provides a Hive Metastore Client which is used to obtain a Coral instance. Coral is integrated to the Hive Connector through a View Reader interface as a mechanism for decoding the view and providing the expanded view text and schema expressed in the Presto type system.

Coral Spark
Coral Spark is the rewrite module for the Spark engine. Similar to Coral Presto, it converts Coral IR to Spark SQL by converting the Coral IR to a Spark-oriented relational algebra expression, then converts that expression to Spark SQL.

Coral integrates with Spark through Spark’s Hive Catalog. Using the Hive Metastore Client, Spark Hive Catalog instantiates a Coral instance. When passed a view name, Coral returns the following to the Spark Catalog:

  • The expanded Spark SQL view text.
  • A list of UDFs to register in the Spark session, along with their metadata. The UDF metadata is obtained from the view “dependencies” and “functions” properties, but also mapped to a Spark Expression version if available. Recall that when using Transport to write UDFs, the Spark version of the UDF extends the Expression API. If a Spark Expression version of the UDF is not available, Coral falls back to the Hive UDF, since Spark can also register and execute Hive UDFs. A module inside the Spark Hive Catalog registers the UDFs on the fly.
  • The Spark Schema of the view. The schema indicates the data types of the view columns as seen at view evaluation time. Therefore, if the underlying tables of a view have evolved at some point to include more fields, Coral view expansion and schema inference captures that evolution and reflects it in the view schema at read time.

Coral Spark: Case preservation and nullability

Spark offers a number of data manipulation APIs beyond just SQL. Such APIs allow for programmatic expression of data manipulation operations, and are defined for the Dataset (or its Row-based version, DataFrame) representation of relations. With such APIs, some concepts become first class citizens:

  • Case preservation and inference: When showing a schema of a view, the case must be preserved and inferred from base tables if no explicit casing is given in the view. For example, if a view V is defined as “SELECT * FROM T”, the casing of T schema should propagate to V. The same applies when the view is based on multiple base tables. Base table schemas, along with their proper casings, should be synthesized through the view logic to provide a proper case version of the view.
  • Nullability: Similar to case preservation, nullability information of the fields in the underlying tables should be propagated through the view logic to provide the nullability information at the view level.

There are a number of gaps in extracting this information from a vanilla Hive-based catalog in Spark:

  • The Hive Metastore drops casing information by storing all field names in lowercase.
  • The Hive Metastore drops nullability information by converting all fields to nullable.
  • There is no mechanism to derive this information at a view level starting from base tables.

To address those gaps, the Dali Catalog annotates Dali Tables with Avro schemas to be able to restore their original casing and nullability. Further, Coral Spark leverages Coral Schema for deriving the casing and nullability information at a view level to Spark, and consequently, to the Dataset (or DataFrame) that the user is interacting with programmatically. Coral provides this information in the Spark Schema, along with the field data types inferred at read time, as discussed in the previous section. A derived Avro Schema of our example view is shown below.

{
  “type” : “record”,
  “name” : “CompanyEngLevels”,
  “namespace” : “coral.schema.companyEngLevels”,
  “fields” : [ {
    “name” : “Name”,
    “type” : [ “null”, “string” ],
    “default” : null
  }, {
    “name” : “StdPosition”,
    “type” : [ “null”, “string” ],
    “default” : null
  }, {
    “name” : “Cnt”,
    “type” : [ “null”, “long” ],
    “default” : null
  } ]
}

Optionally, users can switch back to a vanilla Hive Catalog behavior, where all field names are lowercase, and nullable.

In addition to its role in improving Spark Schema, Coral Schema plays an important role for building strongly typed schemas in Spark applications, extending the data types support beyond the standard SQL types. More information about this schema management aspect can be found in our Spark applications schema management blog post.

Coral Pig
Coral Pig is a module used in the Pig engine to generate executable Pig Latin that produces Dali views. Given a view identifier, Coral Pig converts Coral IR into Pig Latin by mapping the Coral IR relational algebra expression to Pig Latin statements. Unlike the declarative nature of SQL, the procedural linguistic structure of Pig Latin almost resembles the relational algebra expression itself. This produces an intuitive mapping between the relational algebra operators to one or more Pig Latin statements which, when put together in a topologically sorted order, generates a Pig script representing the view. 

Figure 5 illustrates the relational algebra representation of the CompanyEngLevels view and its corresponding Pig Latin conversion.

 

  • diagram-translating-coral-into-pig-latin

Figure 5: Coral Pig translation process

As seen in the example above, the Pig script for the view was generated by sorting the relational expression tree topologically and combining the Pig Latin translation of each individual node. We have achieved this translation in Coral Pig by creating PigOperators that mirror the relational algebra operators. Each PigOperator is responsible for generating the Pig Latin for its relational operator. For example, PigLogicalAggregate is responsible for deriving the Pig Latin of an aggregation operator

We provide a seamless integration between Coral and Pig by transparently introducing view translations into users’ scripts upon execution. Users can access views by simply calling a LOAD of the view.

VIEW = LOAD “CompanyEngLevels” USING PigStorage();

Under the hood, a hook is added to the Pig engine’s parser that traverses and updates the Pig script to locate all LOAD statements on Dali views. For every identified Dali view, we query its view definition from the Dali Catalog, generate its Coral IR, then output its equivalent Pig Latin through Coral Pig. By performing an inline substitution of the original LOAD statement with the Dali view’s Pig Latin, a logically equivalent Pig Script is generated.

M = LOAD “Member” USING …;
MF = FILTER M BY …;
C = LOAD “COMPANY” USING …;
MC = JOIN M BY CompanyId, C BY Id …;
MCG = GROUP MC BY …;
MCGA = FOREACH MCG GENERATE COUNT(MCG) …;
MCGAP = FOREACH MCGA GENERATE …;
VIEW = MCGAP;

Conclusions and future work

Coral’s roadmap is exciting. Already, the team at LinkedIn has worked with the Presto community to integrate Coral’s functionality into the Presto Hive connector, enabling querying complex Hive views in Presto.

There is ongoing work to leverage Coral to analyze and translate Spark Catalyst plans. Further, we look forward to implementing more frontend query APIs, such as ones suitable for querying graph data, defining machine learning features, and programming in type-safe APIs. Work is underway to support evaluating Dali views in non-batch engines, such as streaming and online engines. There is also ongoing work to use Coral to:

  • Automatically synthesize field documentation from base tables.
  • Automatically derive field- and table-level lineage.
  • Automatically derive personally identifiable information (PII) at view level.
  • Automatically decide which views or compute subtrees to materialize.
  • Automatically rewrite a view to leverage materialized views.

Join the community
If any of the above roadmap items resonate with you, or if you have any suggestions or ideas to improve Coral, file an issue at Coral’s Github issues, file a pull request, or simply join the discussion on Coral’s public Slack workspace.

Acknowledgements

Coral is a product of collaboration, support, and contributions of many individuals. We are thankful to Adwait Tumbde, Carl Steinbach, Vasanth Rajamani, Azeem Ahmed, Sumitha Poornachandran, Shrikanth Shankar, Sunitha Beeram, Min Shen, Erik Krogen, Mark Wagner, Pratham Desai, Vikram Shukla, Suren Nihalani, Kai Xu, Jiefan Li, Martin Traverso, David Phillips, Dain Sundstrom, Shirshanka Das, Owen O’Malley, Eric Baldeschwieler, Kapil Surlaker, and Igor Perisic, for all they have done, helped, or contributed to Coral for it to grow and succeed.

Read More

Generated by Feedzy