Delta Lake 0.4.0: Python APIs and converting Parquet tables to Delta Lake

We are excited to announce the release of Delta Lake 0.4.0, which introduces Python APIs for manipulating and managing data in Delta tables. The key features of this release are:
Python APIs for DML and utility operations (#89) - You can now use Python APIs to update/delete/merge data in Delta Lake tables and to run utility operations (namely, vacuum and history) on them. These are great for building complex workloads in Python, such as Slowly Changing Dimension (SCD) operations, merging change data for replication, and upserts from streaming queries. See the documentation for more details.
Convert-to-Delta (#78) - You can now convert a Parquet table in place to a Delta Lake table without rewriting any of the data. This is great for converting very large Parquet tables that would be costly to rewrite as Delta tables. Furthermore, the process is reversible: you can convert a Parquet table to Delta Lake, operate on it (e.g., delete or merge), and easily convert it back to Parquet. See the documentation for more details.
For the full list of changes in this release, see the Delta Lake 0.4.0 release notes.
In this post we will demonstrate, using Python and the new Python APIs in Delta Lake 0.4.0 on Apache Spark™ 2.4.3, how to work with data in the context of an on-time flight performance scenario. We will show how to upsert and delete data, query old versions of a table with time travel, and vacuum older versions for cleanup.
How to start using Delta Lake
The Delta Lake package is installed via the --packages option. In our example, we will also demonstrate the ability to run VACUUM on a table and execute Delta Lake SQL commands within Apache Spark. Since this is a short demonstration, we will also enable the following configurations:
spark.databricks.delta.retentionDurationCheck.enabled=false
This allows us to vacuum files with a retention period shorter than the default of 7 days. Note that it is only required for the SQL command VACUUM.
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
This enables Delta Lake SQL commands within Apache Spark; it is not required for the Python and Scala API calls.
# Using Spark packages
./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
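If you start Spark from a script rather than the pyspark shell, the same package and settings can be supplied when building the SparkSession. Below is a minimal sketch (the application name is arbitrary, and spark.jars.packages is assumed to be an acceptable way to resolve the package in your environment):

from pyspark.sql import SparkSession

# A minimal sketch: the same package and flags as the shell command above,
# supplied when the SparkSession is built from a standalone script
spark = SparkSession.builder \
    .appName("delta-lake-0.4.0-demo") \
    .config("spark.jars.packages", "io.delta:delta-core_2.11:0.4.0") \
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()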
Loading and saving Delta Lake data
In this scenario we will use the on-time flight performance (departure delays) dataset generated from the RITA BTS flight departure statistics; examples of working with this data include 2014 Flight Departure Performance via d3.js Crossfilter and On-Time Flight Performance with GraphFrames for Apache Spark™. The dataset can be downloaded locally from github. In pyspark, start by reading the dataset.
# Location variables
tripdelaysFilePath = "/root/data/departuredelays.csv"
pathToEventsTable = "/root/deltalake/departureDelays.delta"
# Read the flight delay data
departureDelays = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv(tripdelaysFilePath)
Next, let's save our departureDelays dataset to a Delta Lake table. By saving the table to Delta Lake storage, we will be able to take advantage of its features, including ACID transactions, unified batch and streaming, and time travel.
# Save flight delay data in Delta Lake format
departureDelays \
.write \
.format("delta") \
.mode("overwrite") \
.save("departureDelays.delta")
Note that this approach is similar to how you would normally save Parquet data; instead of format("parquet"), you now specify format("delta"). If you look at the underlying file system, you will notice that four files have been created for the departureDelays Delta Lake table.
/departureDelays.delta$ ls -l
.
..
_delta_log
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
Note that the _delta_log folder contains the Delta Lake transaction log. For more details, see Diving Into Delta Lake: Unpacking the Transaction Log.
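To get a feel for what the transaction log stores, you can read one of its commit files directly; each commit is a JSON file named after its zero-padded version number. This is for illustration only, as the log format is internal to Delta Lake:

# Illustration: version 0 of the table lives in a JSON commit file
# whose name is the zero-padded version number
log_v0 = spark.read.json("departureDelays.delta/_delta_log/00000000000000000000.json")
log_v0.printSchema()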
Now, let's reload the data, but this time our DataFrame will be backed by Delta Lake.
# Load flight delay data in Delta Lake format
delays_delta = spark \
.read \
.format("delta") \
.load("departureDelays.delta")
# Create a temporary view
delays_delta.createOrReplaceTempView("delays_delta")
# How many flights are there between Seattle (SEA) and San Francisco (SFO)?
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()
Finally, let's determine the number of flights from Seattle to San Francisco; in this dataset there are 1698 such flights.
In-place conversion to Delta Lake
If you have existing Parquet tables, you can convert them to Delta Lake format in place, i.e. without rewriting the table. To convert a table, run the following commands.
from delta.tables import *
# Convert an unpartitioned parquet table at path '/path/to/table'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")
# Convert a partitioned parquet table at path '/path/to/table', partitioned by an integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")
For more information, including how to perform this conversion in Scala and SQL, see the Delta Lake documentation on converting to Delta Lake.
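As a quick sanity check after a conversion, you can verify that the directory is now recognized as a Delta table. A small sketch, assuming your Delta Lake release provides DeltaTable.isDeltaTable (the path is the hypothetical one used above):

from delta.tables import DeltaTable

# '/path/to/table' is the hypothetical path from the conversion above;
# isDeltaTable returns True once the directory contains a _delta_log
print(DeltaTable.isDeltaTable(spark, "/path/to/table"))

# A converted table behaves like any other Delta table
DeltaTable.forPath(spark, "/path/to/table").toDF().show(5)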
Deleting our flight data

To delete data from a traditional data lake table, you would need to do all of the following (a sketch of this manual approach follows the list):
Select all of the data from the table, excluding the rows you want to delete.
Create a new table based on that query.
Delete the original table.
Rename the new table to the original name for downstream dependencies.
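For comparison, the manual approach above would look roughly like the following with a plain Parquet table (a sketch only; the paths are hypothetical):

# The manual approach: rewrite everything you want to keep
kept = spark.read.parquet("/data/departureDelays.parquet") \
    .where("delay >= 0")  # select everything except the rows to delete
kept.write.mode("overwrite").parquet("/data/departureDelays_new")
# ...then drop the old directory and rename the new one so that
# downstream consumers see the original table name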
Instead of performing all of these steps, Delta Lake lets us simplify the process to a single DELETE statement. To demonstrate, let's delete all flights that arrived early or on time (i.e. delay < 0).
from delta.tables import *
from pyspark.sql.functions import *
# Access the Delta Lake table
deltaTable = DeltaTable.forPath(spark, pathToEventsTable)
# Delete all on-time and early flights
deltaTable.delete("delay < 0")
# How many flights are there between Seattle (SEA) and San Francisco (SFO)?
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()
After deleting the data (more on this below), the preceding query shows 837 delayed flights from Seattle to San Francisco. If you look at the file system again, you will notice that there are more files, even though we deleted data.
/departureDelays.delta$ ls -l
_delta_log
part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet
In traditional data lakes, a delete is performed by rewriting the entire table, excluding the values to be deleted. Delta Lake instead performs a delete by selectively writing new versions of the files that contain the affected data, and only marks the previous files as deleted. This is because Delta Lake uses multiversion concurrency control (MVCC) to perform atomic operations on the table: while one user deletes data, another can still query the previous version of the table. This multi-version model also lets us travel back in time (time travel) and query previous versions, as we will see later.
Updating our flight data

To update data in a traditional data lake table, you would need to (see the sketch after this list):
Select all of the data from the table, excluding the rows you want to modify.
Modify the rows that need to be updated or changed.
Merge the two tables to create a new table.
Delete the original table.
Rename the new table to the original name for downstream dependencies.
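Again for comparison, a rough sketch of this manual flow for a plain Parquet table (hypothetical paths; illustration only):

from pyspark.sql.functions import when, col

# The manual approach: modify the affected rows, then rewrite the table
df = spark.read.parquet("/data/departureDelays.parquet")
updated = df.withColumn(
    "origin",
    when(col("origin") == "DTW", "SEA").otherwise(col("origin"))
)
updated.write.mode("overwrite").parquet("/data/departureDelays_new")
# ...then drop the old directory and rename the new one for downstream consumers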
Instead of performing all of these steps, Delta Lake lets us simplify the process with an UPDATE statement. To demonstrate, let's update all of the flights originating from Detroit so that they originate from Seattle.
# Update all flights originating from Detroit (DTW) to originate from Seattle (SEA)
deltaTable.update("origin = 'DTW'", { "origin": "'SEA'" } )
# How many flights are there between Seattle (SEA) and San Francisco (SFO)?
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()
Now that the Detroit flights are tagged as Seattle flights, we have 986 flights from Seattle to San Francisco. If you list the contents of the departureDelays folder (e.g., $../departureDelays/ls -l), you will notice that there are now 11 files (instead of 8 after deleting the data and 4 after creating the table).
Merging our flight data

A common scenario when working with a data lake is continuously appending data to a table. This often produces duplicate rows (which you do not want to insert into the table again), new rows that need to be inserted, and existing rows that need to be updated. With Delta Lake, all of this can be done with the merge operation (similar to the SQL MERGE statement).
Let's start with a sample of the data that we will want to update, insert, or deduplicate, using the following query.
# Flights between Seattle (SEA) and San Francisco (SFO) for these date periods
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()
The output of this query is a small set of flights; among them are rows that will be deduplicated, rows that will be updated, and rows that will be inserted. Next, let's generate our own merge_table, which contains the data we will insert, update, or deduplicate, using the following code snippet.
items = [(1010710, 31, 590, 'SEA', 'SFO'), (1010521, 10, 590, 'SEA', 'SFO'), (1010822, 31, 590, 'SEA', 'SFO')]
cols = ['date', 'delay', 'distance', 'origin', 'destination']
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()
In the merge_table above there are three rows, each with a unique date value:
1010521: this row updates the delay value for the corresponding row in the flights table (update)
1010710: this row is a duplicate (deduplicate)
1010822: this is a new row to be inserted (insert)
With Delta Lake, this can be achieved with a single merge statement, as in the following code snippet.
# Merge merge_table with flights
deltaTable.alias("flights") \
.merge(merge_table.alias("updates"),"flights.date = updates.date") \
.whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
.whenNotMatchedInsertAll() \
.execute()
# Flights between Seattle (SEA) and San Francisco (SFO) for these date periods
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()
All three actions (deduplicate, update, and insert) were completed efficiently in a single statement.
Viewing table history

As noted earlier, after each of our transactions (delete, update) more files were created in the file system. This is because each transaction creates a new version of the Delta Lake table, which can be seen with the DeltaTable.history() method, as shown below.
deltaTable.history().show()
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
| 2|2019-09-29 15:41:22| null| null| UPDATE|[predicate -> (or...|null| null| null| 1| null| false|
| 1|2019-09-29 15:40:45| null| null| DELETE|[predicate -> ["(...|null| null| null| 0| null| false|
| 0|2019-09-29 15:40:14| null| null| WRITE|[mode -> Overwrit...|null| null| null| null| null| false|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
Note, you can also perform the same task with SQL:
spark.sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show()
As you can see, there are three rows representing the different versions of the table, one for each operation (create table, delete, and update); below is an abridged version that is easier to read:
version | timestamp | operation | operationParameters |
2 | 2019-09-29 15:41:22 | UPDATE | [predicate -> (or… |
1 | 2019-09-29 15:40:45 | DELETE | [predicate -> ["(… |
0 | 2019-09-29 15:40:14 | WRITE | [mode -> Overwrit… |
Traveling back in time with table history

With time travel you can view a Delta Lake table as of a specific version or timestamp. For more information, see the Delta Lake documentation > Read older versions of data using Time Travel. To view historical data, specify the version or timestamp option; in the code snippet below we specify the version option.
# Load DataFrames for each version
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta")
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta")
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta")
# Calculate the SEA -> SFO flight counts for each version of the table
cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()
# Print out the counts
print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))
## Output
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986
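Besides versionAsOf, a snapshot can also be addressed by time using the timestampAsOf option. A brief sketch (the timestamp is illustrative and must fall within your table's history):

# Read the table as of a point in time instead of a version number;
# the timestamp must fall within the table's retained history
df_ts = spark.read.format("delta") \
    .option("timestampAsOf", "2019-09-29 15:40:45") \
    .load("departureDelays.delta")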
Cleaning up old table versions with vacuum
By default, the Delta Lake vacuum method deletes all rows (and files) that are older than 7 days (for reference, see Delta Lake documentation > Vacuum). If you look at the file system, you will notice the 11 files for the table.
/departureDelays.delta$ ls -l
_delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
To delete all of the files so that only the current snapshot of the data is kept, specify a small retention value for the vacuum method (instead of the default retention of 7 days).
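With the Python API this is a single call:

# Remove all files older than 0 hours (keep only the current snapshot)
deltaTable.vacuum(0)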
You can perform the same task with SQL syntax:
# Remove all files older than 0 hours
spark.sql("VACUUM '" + pathToEventsTable + "' RETAIN 0 HOURS")
Once the vacuum has completed and you review the file system again, you will notice fewer files, since the historical data has been removed.
/departureDelays.delta$ ls -l
_delta_log
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
Note that after running vacuum, the ability to travel back to a version older than the retention period is lost.
What's next?
Try Delta Lake today by running the code snippets above on your Apache Spark 2.4.3 (or newer) instance. With Delta Lake you can make your data lakes more reliable, whether you are creating a new one or migrating an existing data lake. To learn more, visit https://delta.io/ and join the Delta Lake community on Slack and in the Google Group. You can follow upcoming releases and planned features in the github milestones.