Delta Lake Python API - Simple and Reliable Upsert and Delete Operations

Delta Lake 0.4.0 introduces a Python API and in-place conversion of Parquet tables into Delta Lake tables.

We are pleased to announce the release of Delta Lake 0.4.0, which introduces a Python API for manipulating and managing data in Delta tables. The key features of this release are:

  • Convert-to-Delta (#78) - You can now convert a Parquet table in place to a Delta Lake table without rewriting any of the data. This is useful for very large Parquet tables that would be expensive to rewrite as Delta tables. The process is also reversible: you can convert a Parquet table to Delta Lake, work with it (for example, run deletes or merges), and then easily convert it back to a Parquet table. See the documentation for details.

  • SQL support for utility operations - You can now use SQL to run the vacuum and history utility operations. See below for how to configure Spark to execute these Delta Lake SQL commands.

For more information, see the Delta Lake 0.4.0 release notes and the Delta Lake documentation > Table Deletes, Updates, and Merges.

In this article we will show how to use the Python API in Delta Lake 0.4.0 with Apache Spark™ 2.4.3: how to upsert and delete data, and how to use utility operations such as time travel and vacuum.

Getting started with Delta Lake

The Delta Lake package is installed via the --packages option. In our example we will also demonstrate the VACUUM operation and Delta Lake SQL commands inside Apache Spark. Since this is a short demo, we also enable the following configuration options:

  • spark.databricks.delta.retentionDurationCheck.enabled=false, which allows vacuum to use a retention period shorter than the default 7 days. It is needed only for the SQL command VACUUM.

  • spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension, which enables Delta Lake SQL commands in Apache Spark; it is not required for calls to the Python or Scala API.

# Using Spark packages

./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
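The same options can also be set when building a SparkSession from a standalone Python script rather than the pyspark shell. A minimal sketch, assuming no Spark session is running yet so the delta-core package can still be resolved at JVM startup:

from pyspark.sql import SparkSession

# Build a session with the Delta Lake package and the same configuration
# options used in the pyspark command above
spark = SparkSession.builder \
    .appName("delta-lake-0.4.0-demo") \
    .config("spark.jars.packages", "io.delta:delta-core_2.11:0.4.0") \
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()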

Loading and saving Delta Lake data

In this scenario we will use the flight departure delays dataset generated from the RITA BTS flight departure statistics; examples of working with this data include 2014 Flight Departure Performance via d3.js Crossfilter and On-Time Flight Performance with GraphFrames for Apache Spark™. The dataset can be downloaded from github. Open pyspark and read the dataset.

# Location variables
tripdelaysFilePath = "/root/data/departuredelays.csv"
pathToEventsTable = "/root/deltalake/departureDelays.delta"

# Read the flight delay data
departureDelays = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv(tripdelaysFilePath)

Next, let's save the departureDelays dataset to a Delta Lake table. Storing it in Delta Lake gives us, among other things, ACID transactions, unified batch and streaming processing, and time travel.

# Save the flight delay data in Delta Lake format
departureDelays \
.write \
.format("delta") \
.mode("overwrite") \
.save("departureDelays.delta")

 

Note that this is essentially the same as the usual way of saving Parquet data; instead of format("parquet") you simply specify format("delta"). If you look at the underlying file system, you will see that four files were created for the departureDelays Delta Lake table.

/departureDelays.delta$ ls -l
.
..
_delta_log
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet

Note that _delta_log is the directory containing the Delta Lake transaction log. For more details, see the article "Diving Into Delta Lake: Unpacking the Transaction Log".
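To get a feel for what the transaction log stores, you can read a commit file directly. A minimal sketch; the file name below assumes the table has only its initial commit, version 0:

# Each commit is recorded as a JSON file in _delta_log, named by its
# zero-padded version number
log_entry = spark.read.json("departureDelays.delta/_delta_log/00000000000000000000.json")

# The entries describe the table protocol and metadata, plus the data
# files added by this commit
log_entry.printSchema()
log_entry.show(truncate=False)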

Now let's reload the data, but this time our DataFrame will be backed by Delta Lake.

# Load the flight delay data in Delta Lake format
delays_delta = spark \
.read \
.format("delta") \
.load("departureDelays.delta")

# Create a temporary view
delays_delta.createOrReplaceTempView("delays_delta")
 
# How many flights are there between Seattle and San Francisco?
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

Finally, let's count the flights from Seattle to San Francisco; in this dataset there are 1698 of them.

In-place conversion to Delta Lake

If you already have Parquet tables, you can convert them to Delta Lake in place, without rewriting the table. To convert a table, run the following commands.

from delta.tables import *

# Convert an unpartitioned parquet table at the path '/path/to/table'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

# Convert a partitioned parquet table at the path '/path/to/table', partitioned by an integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")

For more information, including how to perform this conversion in Scala and SQL, see the Delta Lake documentation on Convert to Delta.

Deleting flight data

To delete data from a traditional data lake table, you would have to:

  1. Select all of the data from the table, excluding the rows you want to delete.

  2. Create a new table based on that query.

  3. Delete the original table.

  4. Rename the new table to the original name for downstream dependencies.

With Delta Lake this entire process is replaced by a single DELETE statement. To demonstrate, let's delete all flights that arrived on time or early (i.e., delay < 0).

from delta.tables import *
from pyspark.sql.functions import *

# Access the Delta Lake table
deltaTable = DeltaTable.forPath(spark, pathToEventsTable)

# Delete all on-time and early flights
deltaTable.delete("delay < 0")

# How many flights are there between Seattle and San Francisco?
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

After deleting the on-time and early flights (more on that below), the query shows that there are 837 delayed flights from Seattle to San Francisco. If you look at the file system, you will notice that there are now more files, even though data was deleted.

/departureDelays.delta$ ls -l
_delta_log
part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet

In a traditional data lake, a delete is performed by rewriting the entire table without the deleted rows. In Delta Lake, deletes are instead performed by selectively writing new versions of the files that contain the deleted data, while the previous files are only marked as removed. This is because Delta Lake uses multiversion concurrency control (MVCC) to perform atomic operations on the table: while one user deletes data, another can still query the previous version of the table. This multi-version model also lets us travel back in time (time travel) and query previous versions, as we will see below.
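As a small illustration of this multi-version model (time travel is covered in more detail below), the snapshot taken before the delete is still readable; a minimal sketch:

# Version 0 is the snapshot created by the initial write, before the DELETE
df_before_delete = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("departureDelays.delta")

# The pre-delete count (1698 SEA -> SFO flights) is still available, even
# though the current version of the table contains only 837 of them
print(df_before_delete.where("origin = 'SEA' and destination = 'SFO'").count())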

Updating flight data

To update data in a traditional data lake table, you would have to:

  1. Select all of the data from the table, excluding the rows you want to modify.

  2. Modify the rows that need to be updated/changed.

  3. Merge the two datasets into a new table.

  4. Delete the original table.

  5. Rename the new table to the original name for downstream dependencies.

With Delta Lake all of this is replaced by a single UPDATE statement. To demonstrate, let's update all flights originating from Detroit so that they originate from Seattle.

# Update all flights originating from Detroit so that they originate from Seattle
deltaTable.update("origin = 'DTW'", { "origin": "'SEA'" } ) 

# How many flights are there between Seattle and San Francisco?
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

With the Detroit flights now tagged as Seattle flights, there are 986 flights from Seattle to San Francisco. If you list the contents of the departureDelays folder (e.g., $../departureDelays/ls -l), you will see that there are now 11 files (instead of the 8 after the delete).

Merging flight data

A common scenario when working with a data lake is continuously appending data to a table. This often produces duplicate rows (which should not be inserted again), new rows that need to be inserted, and existing rows that need to be updated. In Delta Lake all of this is handled by the merge operation (similar to the SQL MERGE statement).

Let's start with a sample of the data that we will want to update, insert, or de-duplicate, using the following query.

# What flights were there between Seattle (SEA) and San Francisco (SFO) in this date range?
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()

 

The result of this query contains rows that need to be de-duplicated, rows whose values need to be updated, and new rows that need to be inserted.

Next, let's build our own merge_table containing the data that will be inserted, updated, or de-duplicated, using the following code snippet.

items = [(1010710, 31, 590, 'SEA', 'SFO'), (1010521, 10, 590, 'SEA', 'SFO'), (1010822, 31, 590, 'SEA', 'SFO')]
cols = ['date', 'delay', 'distance', 'origin', 'destination']
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()

In merge_table there are three rows, each with a unique date value:

  1. 1010521: this row should update the delay value for the corresponding row in flights (update)

  2. 1010710: this row is a duplicate (de-duplication)

  3. 1010822: this is a new row that should be inserted (insert)

With Delta Lake all of this is easily achieved with a single merge statement, as shown in the following code snippet.

# Merge merge_table with flights
deltaTable.alias("flights") \
    .merge(merge_table.alias("updates"),"flights.date = updates.date") \
    .whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
    .whenNotMatchedInsertAll() \
    .execute()

# What flights were there between Seattle (SEA) and San Francisco (SFO) in this date range?
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()

All three actions (de-duplication, update, and insert) were performed efficiently with a single statement.

Viewing table history

As noted above, after each of our transactions (delete, update) more files appeared in the file system. That is because every transaction creates a new version of the Delta Lake table. This can be seen with the DeltaTable.history() method, as shown below.

deltaTable.history().show()
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      2|2019-09-29 15:41:22|  null|    null|   UPDATE|[predicate -> (or...|null|    null|     null|          1|          null|        false|
|      1|2019-09-29 15:40:45|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|          null|        false|
|      0|2019-09-29 15:40:14|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|       null|          null|        false|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+

The same can be done with SQL:

spark.sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show()

As you can see, each operation (create table, delete, and update) produced its own version of the table (below is an abridged version of the output for readability):

version | timestamp           | operation | operationParameters
2       | 2019-09-29 15:41:22 | UPDATE    | [predicate -> (or…
1       | 2019-09-29 15:40:45 | DELETE    | [predicate -> ["(…
0       | 2019-09-29 15:40:14 | WRITE     | [mode -> Overwrit…

Time travel

With Delta Lake you can query previous versions of a table. For more information, see the Delta Lake documentation > Time Travel. To view historical data, specify the version or timestamp option; in the code snippet below we use the version option.

# Load a DataFrame for each version
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta")
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta")
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta")

# Calculate the Seattle (SEA) to San Francisco (SFO) flight count for each version of history
cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()

# Print the results
print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))

## Output
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986

Vacuum

By default, the Delta Lake vacuum method removes all rows (and files) older than 7 days (see: Delta Lake Vacuum). If you look at the file system, you will see the 11 files of the table.

/departureDelays.delta$ ls -l
_delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

To remove all of the files so that only the current snapshot of the data remains, specify a small retention value for the vacuum method (instead of the default 7 days).
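In the Python API this is a single call; a retention of 0 hours means every file not needed by the current version of the table is removed:

# Remove all files that are no longer referenced by the current table
# version and are older than 0 hours
deltaTable.vacuum(0)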

The same task can also be performed with SQL syntax:

# Remove all files older than 0 hours

spark.sql("VACUUM '" + pathToEventsTable + "' RETAIN 0 HOURS")

After vacuum completes, a look at the file system shows fewer files, since the historical data has been removed.

/departureDelays.delta$ ls -l
_delta_log
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

Note that after running vacuum you lose the ability to travel back to versions older than the retention period.
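A minimal sketch illustrating this: once the files backing version 0 have been vacuumed away, reading that version is expected to fail.

# Version 0 is still listed in the transaction log, but its data files
# were removed by vacuum, so materializing it should raise an error
try:
    spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta").count()
except Exception as e:
    print("Time travel to version 0 is no longer possible after vacuum:", e)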

What's next?

Try Delta Lake today by running the code snippets above on your Apache Spark 2.4.3 (or later) instance. Delta Lake makes your data lake more reliable, whether you are building a new one or migrating an existing one. To learn more, visit https://delta.io/ and join the Delta Lake community in Slack and the Google Group. You can follow upcoming releases and planned features in the github milestones.

