Spark schemaEvolution in practice

Dear readers, good day!



In this article, a lead consultant from the Big Data Solutions business line at Neoflex describes in detail the options for building data marts with a variable structure using Apache Spark.



Data analysis projects often involve the task of building data marts on top of loosely structured data.



Usually these are logs or responses from various systems, saved as JSON or XML. The data is uploaded to Hadoop, and then a mart needs to be built from it. Access to the resulting mart can be provided, for example, through Impala.



In this case, the schema of the target mart is not known beforehand. Moreover, it cannot be drawn up in advance at all, since it depends on the data, and we are dealing with this very loosely structured data.



For example, today the following response is logged:



{"source": "app1", "error_code": ""}


and tomorrow the following response comes from the same system:



{"source": "app1", "error_code": "error", "description": "Network error"}


As a result, one more field, description, should be added to the mart, and no one knows whether it will appear again or not.
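This behavior is easy to check on the toy responses above. A minimal sketch (inline test data rather than real logs, assuming an active SparkSession named spark): when both responses are read together, Spark merges the schemas on its own and simply marks the missing field as nullable:

rows = spark.sparkContext.parallelize([
    '{"source": "app1", "error_code": ""}',
    '{"source": "app1", "error_code": "error", "description": "Network error"}'
])
spark.read.json(rows).printSchema()
# root
#  |-- description: string (nullable = true)
#  |-- error_code: string (nullable = true)
#  |-- source: string (nullable = true)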



The task of building a mart on such data is fairly standard, and Spark has a number of tools for it. Both JSON and XML are supported for parsing the raw data, and for a schema that is not known in advance there is schemaEvolution (schema merging) support.



At first glance, the solution looks simple. Take a folder with JSON and read it into a dataframe. Spark will infer a schema and turn the nested data into structs. Then everything is saved to Parquet, which Impala also supports, and the mart is registered in the Hive metastore.



Everything seems to be simple.



However, the short examples in the documentation make it unclear how to deal with a number of problems that arise in practice.



The documentation describes an approach not for building a mart, but for reading JSON or XML into a dataframe.



Namely, it simply shows how to read and parse JSON:



df = spark.read.json(path...)


This is enough to make the data available to Spark.



In practice, the scenario is much more complicated than just reading JSON files from a folder and creating a dataframe. The situation looks like this: there is already an existing mart, new data arrives every day, it needs to be added to the mart, and we must not forget that the schema may differ.



The usual scheme for building a mart is as follows:



Step 1. The data is loaded into Hadoop, reloaded daily, and each load is added as a new partition. The result is a folder with the raw data partitioned by day.



Step 2. During the initial load, this folder is read and parsed by Spark. The resulting dataframe is saved in a format suitable for analysis, for example Parquet, which can then be imported into Impala. This creates the target mart with all the data accumulated up to this point.



Step 3. A load is created that will update the mart every day.

This raises the questions of incremental loading, the need to partition the mart, and maintaining the mart's overall schema.



Let's take an example. Suppose the first step of building the storage is already implemented: the export of JSON files to a folder is configured.



Creating a dataframe from them and then saving it as a mart is not a problem. This is the very first step, easy to find in the Spark documentation:



df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: struct (nullable = true)
 |    |-- d: long (nullable = true)


Everything seems to be fine.



We read and parsed the JSON; now we save the dataframe as Parquet, registering it in Hive in any convenient way:



df.write.format("parquet").option('path','<External Table Path>').saveAsTable('<Table Name>')


We get a mart.



But the next day, new data arrives from the source. We have a folder with JSON and a mart built on top of that folder. After the next chunk of data is loaded from the source, the mart is missing one day's worth of data.



A logical solution is to partition the mart by day, so that a new partition can be added each following day. The mechanism for this is also well known: Spark allows writing partitions separately.



First, we do the initial load, saving the data as described above but adding partitioning. This step is called mart initialization and is done only once:



df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)


The next day, we only load a new partition:



df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")


All that remains is to re-register the table in Hive so that the schema is updated.

However, this is where the problems begin.



The first problem. Sooner or later, the resulting Parquet cannot be read. This comes from how differently Parquet and JSON treat empty fields.



Consider a typical situation. For example, yesterday this JSON arrives:



 1: {"a": {"b": 1}},


and today the same JSON looks like this:



 2: {"a": null}


Suppose we have two different partitions, each with one row.

When we read all the raw data at once, Spark is able to infer the type and understand that "a" is a field of type struct with a nested field "b" of type INT. But if each partition was saved separately, the result is a Parquet with incompatible partition schemas:



df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
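This incompatibility is easy to reproduce. A minimal sketch with inline data and a hypothetical /tmp path (not the real mart): each row is written as its own partition, and reading them back together fails, because one partition stores "a" as a struct and the other as a string:

spark.read.json(spark.sparkContext.parallelize(['{"a": {"b": 1}}'])) \
    .write.parquet("/tmp/pq_demo/date_load=2019-12-11")
spark.read.json(spark.sparkContext.parallelize(['{"a": null}'])) \
    .write.parquet("/tmp/pq_demo/date_load=2019-12-12")

# Fails with an error about merging incompatible types (struct vs string)
spark.read.option("mergeSchema", True).parquet("/tmp/pq_demo/*")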


This situation is well known, so an option was specifically added that drops fields whose values are all null when parsing the raw data:



df = spark.read.json("...", dropFieldIfAllNull=True)


In this case, the Parquet will consist of partitions that can be read together.

Although anyone who has done this in practice will laugh bitterly here. Why? Because two more situations are likely to arise. Or three. Or four. The first, which will almost certainly appear, is that numeric values will look different in different JSON files. For example, {intField: 1} and {intField: 1.1}. If such fields end up in the same chunk, schema merging reads everything correctly, promoting to the most precise type. But if they end up in different chunks, one will have intField inferred as an integer type and the other as double, and such partition schemas will not merge.
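The conflict is easy to see on two test reads (inline data, for illustration only): each file taken by itself is inferred with a different numeric type, so partitions written separately end up incompatible:

spark.read.json(spark.sparkContext.parallelize(['{"intField": 1}'])).printSchema()
# root
#  |-- intField: long (nullable = true)

spark.read.json(spark.sparkContext.parallelize(['{"intField": 1.1}'])).printSchema()
# root
#  |-- intField: double (nullable = true)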



There is the following flag to handle this situation:



df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)


Now we have a folder with partitions that can be read into a single dataframe, and a valid Parquet for the whole mart. Right? No.



Recall that we registered the table in Hive. Hive is not case-sensitive in field names, while Parquet is. Therefore, partitions with the schemas field1: int and Field1: int are the same for Hive but not for Spark. Remember to convert the field names to lowercase.
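There is no ready-made option for this, so the renaming has to be done by hand before writing. A minimal sketch that only handles top-level columns (nested struct fields would need a recursive rename):

df = df.toDF(*[c.lower() for c in df.columns])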



After that, everything seems to be fine.



However, it is not that simple. A second, also well-known, problem arises. Since each new partition is saved separately, the partition folder will contain Spark service files, for example the _SUCCESS job completion flag. This causes an error when reading the resulting Parquet. To avoid it, configure Spark so that it does not add service files to the folder:



hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


It seems that now, every day, a new Parquet partition with that day's parsed data is added to the target mart folder. We took care in advance that there would be no partitions with conflicting data types.



But now we face a third problem. The overall schema is unknown; moreover, the table in Hive has the wrong schema, since each new partition has most likely introduced a distortion into the schema.



The table needs to be re-registered. This can be done simply: read the mart's Parquet again, take its schema, build a DDL from it, and use that DDL to re-register the folder in Hive as an external table, updating the schema of the target mart.



Here we run into a fourth problem. The first time we registered the table, we relied on Spark. Now we do it ourselves, and we have to remember that Parquet fields may start with characters that are not allowed in Hive. For example, Spark puts lines it could not parse into the _corrupt_record field. Such a field cannot be registered in Hive without escaping.



Knowing this, we build the schema:



# Build the column list, escaping names that Hive cannot accept as-is
f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")

# Re-register the external table over the mart folder with the updated schema
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)


The chain of replace calls, f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<"), makes the DDL safe, that is, instead of:



create table tname (_field1 string, 1field string)


we get a safe DDL in which field names such as _field1 and 1field are escaped: create table `tname` (`_field1` string, `1field` string).



The question arises: how do we correctly obtain a dataframe with the full schema (pf in the code above)? How do we get this pf? This is the fifth problem. Re-read the schemas of all partitions from the folder with the target mart's Parquet files? This is the safest method, but also the heaviest.
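For reference, this heaviest option is a one-liner, reusing the path variables from the snippets above: re-read the whole mart with schema merging and take the schema from there. On a large mart this touches every partition, which is exactly what we would like to avoid:

pf = spark.read.option("mergeSchema", True).parquet(dbpath + "/" + db + "/" + destTable + "/*")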



But the schema is already in Hive. A new schema can be obtained by combining the schema of the whole table with that of the new partition. So we need to take the table schema from Hive and merge it with the schema of the new partition. This can be done by reading a test row of the table through Hive, saving it to a temporary folder, and reading both partitions with Spark at once.



In fact, we have everything we need: the original table schema is in Hive, and there is a new partition. We also have the data. All that remains is to obtain a new schema that combines the mart's schema with the new fields from the freshly created partition:



from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)

# Parse the new day's JSON and write it as a new partition
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")

# Dump one test row of the existing table into a temporary "fake partition"
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")

# Read the new partition and the fake partition together, merging their schemas
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")


Next, we create the DDL to register the table, as in the previous snippet.

If the whole chain works correctly, namely, the initial load was done and the table was created correctly in Hive, then we get an updated table schema.



And the last problem: you cannot simply add a partition to the Hive table, or it will break. You have to force Hive to repair its partition structure:



from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)


The seemingly simple task of reading JSON and building a mart from it turns into overcoming a number of implicit difficulties, each of which has to be solved separately. And although these solutions are simple, finding them takes a lot of time.



To implement the construction of the mart, I had to do the following (a consolidated sketch of the resulting daily load follows the list):



  • Add partitions to the mart while getting rid of service files
  • Deal with empty fields in the source data that Spark had typed
  • Cast simple types to string
  • Convert field names to lowercase
  • Separate the data dump from the table registration in Hive (DDL generation)
  • Remember to escape field names that may be incompatible with Hive
  • Learn how to update the table registration in Hive
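To tie the pieces together, below is a rough sketch of what the resulting daily load might look like. It is assembled from the snippets above for illustration only, not production code: the paths, database and table names are placeholders, and spark.sql from a Hive-enabled session is used instead of the HiveContext shown earlier.

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Keep service files (_SUCCESS, summary metadata) out of the partition folders
hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

def load_day(src_path, dbpath, db, dest_table, date_load):
    # 1. Read the day's JSON, dropping all-null fields and forcing primitives to string
    df = spark.read.json(src_path, dropFieldIfAllNull=True, primitivesAsString=True)

    # 2. Lowercase top-level field names for Hive compatibility
    df = df.toDF(*[c.lower() for c in df.columns])

    # 3. Write the new partition into the mart folder
    part_path = dbpath + "/" + db + "/" + dest_table + "/date_load=" + date_load + "/"
    df.coalesce(1).write.mode("overwrite").parquet(part_path)

    # 4. Merge the current table schema (via a one-row "fake partition") with the new one
    spark.sql("select * from " + db + "." + dest_table + " limit 1") \
        .write.mode("overwrite").parquet(dbpath + "/tmp/fakePartition/")
    pf = spark.read.option("mergeSchema", True) \
        .parquet(part_path, dbpath + "/tmp/fakePartition/")

    # 5. Rebuild the DDL with escaped field names and re-register the external table
    f_def = ""
    for f in pf.dtypes:
        if f[0] != "date_load":
            f_def += "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + \
                f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
    ddl = ("CREATE EXTERNAL TABLE " + db + "." + dest_table + " (" + f_def[1:] + ") "
           "PARTITIONED BY (date_load string) STORED AS PARQUET "
           "LOCATION '" + dbpath + "/" + db + "/" + dest_table + "'")
    spark.sql("drop table if exists " + db + "." + dest_table)
    spark.sql(ddl)

    # 6. Let Hive rediscover the partition directories
    spark.sql("MSCK REPAIR TABLE " + db + "." + dest_table)

The initial one-time load stays as described earlier in the article; this sketch covers only the daily increment.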


Summing up, we note that building such marts is fraught with many pitfalls. So if difficulties arise during implementation, it is better to turn to an experienced partner with proven expertise.



Thank you for reading this article, we hope you find the information useful.


