What to do... loading JSON into the Data Platform

Hello everyone!





In a recent article, we described how we built our Data Platform. Today I'd like to dive deeper into its "stomach" and, along the way, explain how we solved one of the problems caused by the growing variety of integrated data sources.





In other words, going back to the final diagram from that article (repeated below for the reader's convenience), today we will look more closely at the "right side" of the scheme: everything that comes after Apache NiFi.





A diagram from our previous article.

As a reminder, our company has more than 350 relational databases. Naturally, not all of them are "unique": many are copies of the same system installed in every store of the retail chain. Still, the result is quite a "zoo". That is why we could not do without a framework that simplifies and speeds up the integration of sources into the Data Platform.





The general scheme for delivering data from sources to the Greenplum ODS layer using the framework we developed is shown below:





General scheme of data delivery to the Greenplum ODS layer
  1. Data from the sources arrives in Kafka as AVRO messages, is processed by Apache NiFi and saved to S3 as parquet files.





  2. The "raw" files are then processed by Spark jobs in two steps:





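    1. Compaction: small files of one load period are merged ("compacted") into larger ones using distinct() and coalesce(), and the result is written back to S3. This speeds up parsing and avoids the "small files" problem;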





    2. Parsing: the data is read and converted to the target structure, and the result is written to S3 as gzip-compressed CSV files.





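  3. Loading the CSV files into the ODS layer: an external table over S3 is created via the PXF S3 connector, and a pgsql procedure loads the data into the ODS layer in Greenplum.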





  4. All of this is orchestrated by Airflow.





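Airflow DAGs are generated dynamically, and the Parsing step is driven by configuration as well. So, to integrate a new source, it is enough to: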





  • create the ODS table and its load procedure;





  • add YAML files to Git describing:





    • the DAG settings (source name, schedule, S3 paths, notification email, etc.);





    • the parsing settings for ODS (which fields to take, their types, and the target ODS table columns).
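The idea that "a new source is a config, not code" can be sketched without Airflow. In the sketch below, one function turns each source config into the ordered list of stage tasks that a generated DAG would run. All names (`SourceConfig`, its fields, the task ids) are hypothetical. In Airflow, the same loop would create one DAG object per config.

```python
from __future__ import annotations

from dataclasses import dataclass, field


# Hypothetical settings for one source; in the real setup they would be
# loaded from that source's YAML file in Git.
@dataclass
class SourceConfig:
    name: str
    schedule: str
    s3_path: str
    alert_email: str
    columns: dict[str, str] = field(default_factory=dict)  # ODS column -> type


# The stages of the delivery scheme, in execution order.
STAGES = ("compaction", "parsing", "load_ods")


def build_pipeline(cfg: SourceConfig) -> list[str]:
    """Task ids one generated DAG would contain, in dependency order."""
    return [f"{cfg.name}.{stage}" for stage in STAGES]


def build_all(configs: list[SourceConfig]) -> dict[str, list[str]]:
    """One pipeline per config: a new source means a new config, not new code."""
    return {cfg.name: build_pipeline(cfg) for cfg in configs}


orders = SourceConfig("mongo_orders", "@hourly", "s3://raw-bucket/orders/", "team@example.com")
pipelines = build_all([orders])
```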





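This worked well until, as the number of sources grew, some of them started delivering data as JSON. One example is MongoDB, whose data reaches Kafka through the MongoDB Kafka source connector. The framework had to be adapted. In addition, JSON lands on S3 "as is", without the conversion to parquet that Apache NiFi performs for other sources.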





Compaction. Recall that the "standard" compaction step for parquet looked like this:





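# read all small files of one load period and drop duplicate rows
df = spark.read.format(in_format) \
               .options(**in_options) \
               .load(in_path) \
               .distinct()
# merge into `div` partitions, i.e. `div` output files
new_df = df.coalesce(div)
# write to a separate path: overwriting the input path in the same job is unsafe
new_df.write.mode("overwrite") \
            .format(out_format) \
            .options(**out_options) \
            .save(out_path)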
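The snippet leaves `div` undefined. One common way to pick it, offered here as an assumption rather than the authors' rule, is to divide the total input size by a target file size. The 128 MB target below is a hypothetical value, and the total byte count would have to come from an S3 listing (not shown).

```python
import math


def output_partitions(total_bytes: int, target_file_bytes: int = 128 * 1024 * 1024) -> int:
    """Number of output files so that each is roughly target_file_bytes (at least one)."""
    if total_bytes <= 0:
        return 1
    return max(1, math.ceil(total_bytes / target_file_bytes))


# e.g. 300 MB of small files -> 3 output files of about 100 MB each
div = output_partitions(300 * 1024 * 1024)
```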
      
      



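For JSON files this approach does not work as is. When reading JSON, Spark infers a schema from the data and merges it across files, much like mergeSchema does for parquet. That costs an extra pass over all the data. That is "not our way".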





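So we decided to take a different route. At this stage we don't need the structure of the data at all; we only need to merge files on S3. The solution: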





read the JSON files into a DataFrame not as JSON, but as plain text lines.





Let's look at an example. Suppose we have the following file:





file1:





{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
{"productId": 2, "price": 10.01, "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
      
      



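Note that this is JSON Lines: 1 line = 1 record. The file as a whole is not a single JSON document but a sequence of JSON objects, one per line. This is the form in which JSON files land on S3 (written by Apache NiFi).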
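Here is a quick way to see what "one line = one record" means: each line parses on its own, while the file as a whole is not valid JSON. This is a plain-Python sketch; the sample lines are a trimmed version of file1.

```python
import json

raw = (
    '{"productId": 1, "productName": "ProductName 1"}\n'
    '{"productId": 2, "price": 10.01}\n'
)


def parse_json_lines(text: str) -> list:
    """Parse JSON Lines: every non-empty line is one complete JSON object."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


records = parse_json_lines(raw)

# The file as a whole is not one JSON document: json.loads rejects it ("Extra data").
try:
    json.loads(raw)
    whole_file_is_json = True
except json.JSONDecodeError:
    whole_file_is_json = False
```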





We read it like this:





# read the file as CSV; every line ends up in one column
df = spark.read \
          .format("csv") \
          .option("sep", "\a") \
          .load("file1.json")

# print the DataFrame schema
df.printSchema()

root
 |-- _c0: string (nullable = true)

# show the contents
df.show()

+--------------------+
|                 _c0|
+--------------------+
|{"productId": 1, ...|
|{"productId": 2, ...|
+--------------------+
      
      



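We read the JSON as CSV, using a separator that never occurs in the data. We chose the Bell character. The result is a single-column DataFrame, to which distinct() and coalesce() can be applied exactly as before. Only the read parameters change: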





# for parquet
in_format = "parquet"
in_options = {}

# for JSON
in_format = "csv"
in_options = {"sep": "\a"}
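You can check why the Bell character is a safe separator against the JSON spec. RFC 8259 forbids unescaped control characters (U+0000 to U+001F) inside strings, and only space, tab, LF and CR are allowed between tokens, so a raw BEL (U+0007) cannot occur in valid JSON. The plain-Python sketch below (no Spark) shows a record that contains BEL surviving both serialization and a split on BEL, which is what the CSV reader does with sep="\a". It assumes producers emit valid JSON; a malformed line with a raw BEL would be split.

```python
import json

BELL = "\a"  # U+0007, the separator used when reading JSON "as CSV"

# The value deliberately contains a BEL and commas.
record = {"comment": "odd text with a bell \a inside", "tags": ["a,b", "c;d"]}
line = json.dumps(record)

# The encoder escapes the control character as \u0007, so no raw BEL remains.
assert BELL not in line

# Splitting on BEL keeps the whole line as a single column.
columns = line.split(BELL)
```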
      
      



We write the resulting DataFrame back to S3 as follows:





df.write.mode("overwrite") \
        .format(out_format) \
        .options(**out_options) \
        .save(out_path)

# for JSON: the single string column is written back as gzip-compressed text
out_format = "text"
out_options = {"compression": "gzip"}

# for parquet
out_format = in_format
out_options = {"compression": "snappy"}
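Both pairs of settings can be kept in one place, so the compaction code itself does not depend on the source format. This is a minimal sketch with a hypothetical helper name, `compaction_io`. The option values are exactly the ones above. Writing with format "text" works because the JSON DataFrame has a single string column (`_c0`), and Spark's text writer accepts only one string column.

```python
from __future__ import annotations


def compaction_io(fmt: str) -> tuple[str, dict, str, dict]:
    """Return (in_format, in_options, out_format, out_options) for compaction."""
    if fmt == "json":
        # Each JSON line is read as one opaque string and written back as text.
        return "csv", {"sep": "\a"}, "text", {"compression": "gzip"}
    if fmt == "parquet":
        return "parquet", {}, "parquet", {"compression": "snappy"}
    raise ValueError(f"unsupported source format: {fmt!r}")


in_format, in_options, out_format, out_options = compaction_io("json")
```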
      
      



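Parsing. This is where it gets more interesting. Unlike parquet, JSON files carry no schema. When reading JSON, Spark infers the schema from the data, and if several files are read at once, their schemas are merged, much like mergeSchema does. The catch: if some field, say "field_1", does not occur in any record of the batch being read, it is simply absent from the inferred schema, and the DataFrame has no such column. The Parsing step, which expects that column, then fails, even though the field is perfectly valid and may show up in the very next batch.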





Let's look at an example. Suppose we have two files:





file1 (the same as above):





{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
{"productId": 2, "price": 10.01, "tags": ["tag 1", "tag 2"], "dimensions": {"length": 10, "width": 12, "height": 12.5}}
      
      



file2:





{"productId": 3, "productName": "ProductName 3", "dimensions": {"length": 10, "width": 12, "height": 12.5, "package": [10, 20.5, 30]}}
      
      



We read each of them with Spark into a DataFrame:





df = spark.read \
          .format("json") \
          .option("multiline", "false") \
          .load(path)
df.printSchema()
df.show()
      
      



Result for file1:





root
 |-- dimensions: struct (nullable = true)
 |    |-- height: double (nullable = true)
 |    |-- length: long (nullable = true)
 |    |-- width: long (nullable = true)
 |-- price: double (nullable = true)
 |-- productId: long (nullable = true)
 |-- productName: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------+-----+---------+-------------+--------------+
|    dimensions|price|productId|  productName|          tags|
+--------------+-----+---------+-------------+--------------+
|[12.5, 10, 12]| null|        1|ProductName 1|[tag 1, tag 2]|
|[12.5, 10, 12]|10.01|        2|         null|[tag 1, tag 2]|
+--------------+-----+---------+-------------+--------------+
      
      



Result for file2:





root
 |-- dimensions: struct (nullable = true)
 |    |-- height: double (nullable = true)
 |    |-- length: long (nullable = true)
 |    |-- package: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- width: long (nullable = true)
 |-- productId: long (nullable = true)
 |-- productName: string (nullable = true)

+--------------------+---------+-------------+
|          dimensions|productId|  productName|
+--------------------+---------+-------------+
|[12.5, 10, [10.0,...|        3|ProductName 3|
+--------------------+---------+-------------+
      
      



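As you can see, Spark infers the schema from the data of each file. Where a field is present in some records of a file but missing in others, the DataFrame contains null (price and productName in file1).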
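The effect can be reproduced without Spark by collecting the union of top-level keys per batch. This is a deliberately simplified model of schema inference: types and nested structs are ignored, and the nested "dimensions" field is left out of the samples. The point is the file2 result: nothing in it mentions "price", so no column for it can appear.

```python
from __future__ import annotations

import json

FILE1 = [
    '{"productId": 1, "productName": "ProductName 1", "tags": ["tag 1", "tag 2"]}',
    '{"productId": 2, "price": 10.01, "tags": ["tag 1", "tag 2"]}',
]
FILE2 = ['{"productId": 3, "productName": "ProductName 3"}']


def inferred_columns(lines: list[str]) -> list[str]:
    """Top-level columns a schema-on-read engine could see: the union of keys."""
    keys = set()
    for line in lines:
        keys.update(json.loads(line))
    return sorted(keys)


cols_file1 = inferred_columns(FILE1)  # "price" appears because one record has it
cols_file2 = inferred_columns(FILE2)  # "price" and "tags" are simply gone
```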





Now suppose the target schema, i.e. the one the Parsing step expects, is:





root
 |-- price: double (nullable = true)
 |-- productId: long (nullable = true)
 |-- productName: string (nullable = true)
      
      



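If we read only file2, which has no "price" field, Spark knows nothing about that field, and the DataFrame has no "price" column at all. Parquet files do not have this problem: their schema comes from the AVRO schema, so every parquet file carries the full set of columns.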





So, to keep using the framework for JSON sources, we had to solve one problem: the lack of a schema for the JSON files on S3.





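Fortunately, Spark can read JSON with an explicitly specified schema. The DataFrame then always has the columns of that schema, and fields missing from the JSON are filled with null: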





df = spark.read \
          .format("json") \
          .option("multiline","false") \
          .schema(df_schema) \
          .load(path)
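Conceptually, reading with an explicit schema projects every record onto a fixed set of columns. The plain-Python sketch below models only that projection, using a hypothetical helper `project`. Spark also casts values and handles malformed records according to its read mode, which is not modeled here.

```python
from __future__ import annotations

import json

# Hypothetical target columns, matching the target schema from the example above.
TARGET_COLUMNS = ["price", "productId", "productName"]


def project(line: str, columns: list[str]) -> dict:
    """Shape one JSON record to a fixed column list.

    Missing fields become None and fields outside the list are dropped.
    Type casting and malformed-record handling are not modeled.
    """
    record = json.loads(line)
    return {name: record.get(name) for name in columns}


row = project('{"productId": 3, "productName": "ProductName 3", "dimensions": {"length": 10}}', TARGET_COLUMNS)
```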
      
      



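The remaining question was where to get the schema from. Describing it by hand in the YAML configs is laborious. Since the data comes through Kafka, the natural place is Kafka Schema Registry, which can also store JSON schemas (provided the producing team registers them there).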





We considered the following options:





  • Kafka Schema Registry





  • pyspark.sql.types.StructType, built directly from the schema JSON:





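from pyspark.sql.types import StructType

# 1. fetch the schema from Kafka Schema Registry via its REST API
# 2. build a Spark schema from it; fromJson expects a dict in Spark's own schema format:
df_schema = StructType.fromJson(schema)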
      
      



  • JSON Schema
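Fetching a registered schema takes a single REST call. The sketch assumes Confluent Schema Registry, whose `GET /subjects/{subject}/versions/latest` endpoint returns the schema as a JSON-encoded string in the `schema` field. The registry URL and the subject name (`orders-value`, following the default `<topic>-value` naming) are hypothetical. For a JSON source, what comes back is a JSON Schema document, not Spark's schema format.

```python
from __future__ import annotations

import json
import urllib.parse
import urllib.request


def latest_schema_url(registry_url: str, subject: str) -> str:
    """REST endpoint for the newest registered version of a subject."""
    quoted = urllib.parse.quote(subject, safe="")
    return f"{registry_url.rstrip('/')}/subjects/{quoted}/versions/latest"


def parse_registry_response(body: str) -> dict:
    """The response's "schema" field holds the schema itself as a JSON string."""
    payload = json.loads(body)
    return json.loads(payload["schema"])


def fetch_latest_schema(registry_url: str, subject: str, timeout: float = 10.0) -> dict:
    """Fetch and decode the latest schema (needs network access to the registry)."""
    url = latest_schema_url(registry_url, subject)
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return parse_registry_response(resp.read().decode("utf-8"))
```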





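But... a JSON Schema is not a Spark schema. Spark has its own JSON representation of a schema. For example, for file1 it can be obtained like this: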





df.schema.json()  
      
      



{
    "fields":
    [
        {
            "metadata": {},
            "name": "dimensions",
            "nullable": true,
            "type":
            {
                "fields":
                [
                    {"metadata":{},"name":"height","nullable":true,"type":"double"},
                    {"metadata":{},"name":"length","nullable":true,"type":"long"},
                    {"metadata":{},"name":"width","nullable":true,"type":"long"}
                ],
                "type": "struct"
            }
        },
        {
            "metadata": {},
            "name": "price",
            "nullable": true,
            "type": "double"
        },
        {
            "metadata": {},
            "name": "productId",
            "nullable": true,
            "type": "long"
        },
        {
            "metadata": {},
            "name": "productName",
            "nullable": true,
            "type": "string"
        },
        {
            "metadata": {},
            "name": "tags",
            "nullable": true,
            "type":
            {
                "containsNull": true,
                "elementType": "string",
                "type": "array"
            }
        }
    ],
    "type": "struct"
}

      
      



As you can see, this is not JSON Schema.





"Surely someone has already written a converter from JSON Schema to a Spark schema," we thought... and here is what we found:





  • advice to infer the DataFrame schema from a sample JSON document,





  • https://github.com/zalando-incubator/spark-json-schema, but it is written in Scala, while we use pySpark…





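So we decided to port its SchemaConverter to Python. The idea is simple: walk the JSON Schema and build a Spark schema from it. But, as usual, the devil is in the details.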
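Here is a minimal Python sketch that makes the idea concrete. It is neither the authors' SparkJsonSchemaConverter nor a port of zalando's SchemaConverter; it only handles simple schemas (objects, arrays, primitives, and `["type", "null"]` unions). The mapping integer→long and number→double is our assumption, chosen because it matches what Spark inferred for file1. The result has the same shape as the output of `df.schema.json()` shown above, so it should be usable with `StructType.fromJson` in pySpark.

```python
from __future__ import annotations

from typing import Any

# Assumed mapping of JSON Schema primitive types to Spark type names.
PRIMITIVES = {"string": "string", "integer": "long", "number": "double", "boolean": "boolean"}


def to_spark_type(node: dict) -> Any:
    """Convert one JSON Schema node into Spark's JSON schema representation."""
    kind = node.get("type")
    if isinstance(kind, list):
        # e.g. ["number", "null"]: Spark fields are nullable anyway, so drop "null"
        non_null = [k for k in kind if k != "null"]
        if len(non_null) != 1:
            raise ValueError(f"cannot map type union {kind!r}")
        kind = non_null[0]
    if kind == "object":
        return {
            "type": "struct",
            "fields": [
                {"metadata": {}, "name": name, "nullable": True, "type": to_spark_type(sub)}
                for name, sub in node.get("properties", {}).items()
            ],
        }
    if kind == "array":
        return {"type": "array", "elementType": to_spark_type(node["items"]), "containsNull": True}
    if kind in PRIMITIVES:
        return PRIMITIVES[kind]
    raise ValueError(f"unsupported JSON Schema node: {node!r}")


# A hand-written JSON Schema describing file1 (an illustration, not a registry schema).
PRODUCT_SCHEMA = {
    "type": "object",
    "properties": {
        "dimensions": {
            "type": "object",
            "properties": {
                "height": {"type": "number"},
                "length": {"type": "integer"},
                "width": {"type": "integer"},
            },
        },
        "price": {"type": ["number", "null"]},
        "productId": {"type": "integer"},
        "productName": {"type": "string"},
        "tags": {"type": "array", "items": {"type": "string"}},
    },
}

spark_schema_json = to_spark_type(PRODUCT_SCHEMA)
```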





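The thing is, our schemas are "real-world" JSON schemas. Recall how the rest of the Data Platform works: NiFi reads from Kafka, converts the data to parquet using the AVRO schema stored "inside" NiFi, and writes the files to S3. Here is one of the schemas we work with, in Spark's printSchema form: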





Hold on tight :)
root
 |-- taskId: string (nullable = true)
 |-- extOrderId: string (nullable = true)
 |-- taskStatus: string (nullable = true)
 |-- taskControlStatus: string (nullable = true)
 |-- documentVersion: long (nullable = true)
 |-- buId: long (nullable = true)
 |-- storeId: long (nullable = true)
 |-- priority: string (nullable = true)
 |-- created: struct (nullable = true)
 |    |-- createdBy: string (nullable = true)
 |    |-- created: string (nullable = true)
 |-- lastUpdateInformation: struct (nullable = true)
 |    |-- updatedBy: string (nullable = true)
 |    |-- updated: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- employeeId: string (nullable = true)
 |-- pointOfGiveAway: struct (nullable = true)
 |    |-- selected: string (nullable = true)
 |    |-- available: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- dateOfGiveAway: string (nullable = true)
 |-- dateOfGiveAwayEnd: string (nullable = true)
 |-- pickingDeadline: string (nullable = true)
 |-- storageLocation: string (nullable = true)
 |-- currentStorageLocations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- customerType: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- totalAmount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- stockDecrease: boolean (nullable = true)
 |-- offline: boolean (nullable = true)
 |-- trackId: string (nullable = true)
 |-- transportationType: string (nullable = true)
 |-- stockRebook: boolean (nullable = true)
 |-- notificationStatus: string (nullable = true)
 |-- lines: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lineId: string (nullable = true)
 |    |    |-- extOrderLineId: string (nullable = true)
 |    |    |-- productId: string (nullable = true)
 |    |    |-- lineStatus: string (nullable = true)
 |    |    |-- lineControlStatus: string (nullable = true)
 |    |    |-- orderedQuantity: double (nullable = true)
 |    |    |-- confirmedQuantity: double (nullable = true)
 |    |    |-- assignedQuantity: double (nullable = true)
 |    |    |-- pickedQuantity: double (nullable = true)
 |    |    |-- controlledQuantity: double (nullable = true)
 |    |    |-- allowedForGiveAwayQuantity: double (nullable = true)
 |    |    |-- givenAwayQuantity: double (nullable = true)
 |    |    |-- returnedQuantity: double (nullable = true)
 |    |    |-- sellingScheme: string (nullable = true)
 |    |    |-- stockSource: string (nullable = true)
 |    |    |-- productPrice: double (nullable = true)
 |    |    |-- lineAmount: double (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- markingFlag: string (nullable = true)
 |    |    |-- operations: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- operationId: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- reason: string (nullable = true)
 |    |    |    |    |-- quantity: double (nullable = true)
 |    |    |    |    |-- dmCodes: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- timeStamp: string (nullable = true)
 |    |    |    |    |-- updatedBy: string (nullable = true)
 |    |    |-- source: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- items: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- assignedQuantity: double (nullable = true)
 |-- linkedObjects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- objectType: string (nullable = true)
 |    |    |-- objectId: string (nullable = true)
 |    |    |-- objectStatus: string (nullable = true)
 |    |    |-- objectLines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- objectLineId: string (nullable = true)
 |    |    |    |    |-- taskLineId: string (nullable = true)

      
      



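As you can see, the structures are deeply nested, and each schema exists twice: as Avro and as JSON Schema. What we needed was a converter that takes the JSON Schema from Kafka Schema Registry and converts it "as is".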





SparkJsonSchemaConverter is a converter that supports definitions, refs and oneOf. It turns a "real-world" JSON Schema into pyspark.sql.types.StructType.
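Real schemas like the one above rely on `definitions` and `$ref`. One way to handle them is to inline every local reference before converting and to reject recursive references, because a self-referencing schema has no finite Spark struct equivalent. This is sketched below as an assumption about the approach; it is not SparkJsonSchemaConverter's code, and `oneOf` is not covered. Note that the walk also visits the `definitions` section itself.

```python
from __future__ import annotations

from typing import Any


def resolve_pointer(root: Any, ref: str) -> Any:
    """Follow a local reference such as '#/definitions/Line' (an RFC 6901 pointer)."""
    if not ref.startswith("#"):
        raise ValueError(f"only local refs are handled in this sketch: {ref!r}")
    node = root
    for token in ref[1:].split("/")[1:]:  # '#/a/b' -> ['a', 'b']; '#' -> []
        token = token.replace("~1", "/").replace("~0", "~")  # RFC 6901 unescaping
        node = node[int(token)] if isinstance(node, list) else node[token]
    return node


def inline_refs(node: Any, root: Any, stack: tuple[str, ...] = ()) -> Any:
    """Return a copy of node with every local $ref replaced by its target."""
    if isinstance(node, dict):
        if "$ref" in node:
            ref = node["$ref"]
            if ref in stack:
                # a recursive schema has no finite Spark struct equivalent
                raise ValueError(f"recursive $ref: {ref}")
            return inline_refs(resolve_pointer(root, ref), root, stack + (ref,))
        return {key: inline_refs(value, root, stack) for key, value in node.items()}
    if isinstance(node, list):
        return [inline_refs(item, root, stack) for item in node]
    return node


SCHEMA = {
    "definitions": {
        "Qty": {"type": "number"},
        "Line": {"type": "object", "properties": {"lineId": {"type": "string"},
                                                  "qty": {"$ref": "#/definitions/Qty"}}},
    },
    "type": "object",
    "properties": {"lines": {"type": "array", "items": {"$ref": "#/definitions/Line"}}},
}

inlined = inline_refs(SCHEMA, SCHEMA)
```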





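We think the converter may be useful to others, so we have published it as Open Source. This is our first Open Source project, so feedback, issues and pull requests are very welcome!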





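With SparkJsonSchemaConverter, the Parsing step reads the "raw" data from S3 with an explicit schema. Depending on the format, files are read from S3 as follows: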





# for JSON
df = spark.read.format(in_format)\
            .option("multiline", "false")\
            .schema(json_schema) \
            .load(path)

# for parquet:
df = spark.read.format(in_format)\
            .load(path)
      
      



After that, the DataFrame is processed as before and written out as CSV files.
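What happens between the DataFrame and the CSV files is configured per source. The plain-Python sketch below is a rough, hypothetical illustration of that step. Nested objects are flattened into `parent_child` column names, arrays are kept as JSON text, and rows are written as gzip-compressed CSV with a fixed column list, so missing fields become empty values. The naming convention and the array handling are assumptions, not the framework's rules.

```python
from __future__ import annotations

import csv
import gzip
import io
import json


def flatten(record: dict, prefix: str = "") -> dict:
    """Flatten nested objects into 'parent_child' columns; lists stay JSON text."""
    flat = {}
    for key, value in record.items():
        column = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, f"{column}_"))
        elif isinstance(value, list):
            flat[column] = json.dumps(value)  # assumption: arrays kept as JSON strings
        else:
            flat[column] = value
    return flat


def to_gzip_csv(rows: list[dict], columns: list[str]) -> bytes:
    """Write rows with a fixed header; absent keys become empty fields, extras are dropped."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=columns, extrasaction="ignore")
    writer.writeheader()
    for row in rows:
        writer.writerow(row)
    return gzip.compress(buffer.getvalue().encode("utf-8"))


row = flatten({"productId": 1, "dimensions": {"length": 10, "width": 12}, "tags": ["a"]})
payload = to_gzip_csv([row], ["productId", "dimensions_length", "price"])
```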





So we extended our framework, and the Data Platform can now ingest JSON sources. The results so far:





  • 4 JSON sources have already been integrated!





  • new JSON sources are connected through the same "configurable" framework, without writing extra code.







