PySpark. Solving the problem of finding sessions

Good afternoon, dear readers! A few days ago, while rereading Anthony Molinaro's book "SQL Cookbook", I came across a chapter devoted to finding the beginning and end of a range of consecutive values. Skimming the material, I immediately remembered that I had already met this question as a test assignment, though there it was framed as "the task of finding sessions". The catch of that technical interview was not the review of the completed work but the interviewer's question about how to get the same result with Spark. While preparing for the interview I did not know that the company used (or maybe it didn't...) Apache Spark, so I had not gathered any information on what was then a new tool for me. All I could do was put forward the hypothesis that the desired solution might look like a script written with the Pandas library. I landed somewhere near the target, but I did not get the job.





To be fair, I should note that since then I have made only modest progress in learning Apache Spark. Still, I want to share what I have learned with readers, since many analysts have never encountered this tool at all, while others may face a similar interview. If you are a Spark professional, you can always suggest a more optimal solution in the comments to the post.





That was the preamble; let's get down to the topic itself. We will start by writing a SQL script, but first we need to create a database and fill it with values. Since this is a demo example, I suggest using SQLite. This database is inferior to its more powerful peers, but its capabilities are more than enough for developing our script. To automate these steps, I wrote the following Python code.





# Import the module for working with SQLite
import sqlite3

# Demo data: project start and end dates
projects = [
    ('2020-01-01', '2020-01-02'),
    ('2020-01-02', '2020-01-03'),
    ('2020-01-03', '2020-01-04'),
    ('2020-01-04', '2020-01-05'),
    ('2020-01-06', '2020-01-07'),
    ('2020-01-16', '2020-01-17'),
    ('2020-01-17', '2020-01-18'),
    ('2020-01-18', '2020-01-19'),
    ('2020-01-19', '2020-01-20'),
    ('2020-01-21', '2020-01-22'),
    ('2020-01-26', '2020-01-27'),
    ('2020-01-27', '2020-01-28'),
    ('2020-01-28', '2020-01-29'),
    ('2020-01-29', '2020-01-30')
]

try:
    # Connect to the database (the file is created if it does not exist)
    con = sqlite3.connect("projects.sqlite")
    # Get a cursor
    cur = con.cursor()
    # Create the table of projects
    cur.execute("""CREATE TABLE IF NOT EXISTS projects (
                    proj_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    proj_start TEXT,
                    proj_end TEXT)""")
    # Insert the demo rows
    cur.executemany("INSERT INTO projects VALUES(NULL, ?,?)", projects)
    # Commit the transaction
    con.commit()
    # Close the cursor
    cur.close()
except sqlite3.Error as err:
    print("SQLite error:", err)
finally:
    # Close the connection
    con.close()
    print("Connection closed")



I checked the result in DBeaver: the table is in place and filled correctly, so we can move on to the SQL query.
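If DBeaver is not at hand, a quick sanity check can also be done straight from Python; the snippet below is my own addition rather than part of the original article.

import sqlite3

# Print a few rows to make sure the table was filled
con = sqlite3.connect("projects.sqlite")
for row in con.execute("SELECT proj_id, proj_start, proj_end FROM projects LIMIT 5"):
    print(row)
con.close()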





A few words about the idea of the solution. Two projects belong to the same chain if the start date of the current project coincides with the end date of the previous one; wherever this is not the case, a new chain begins. It remains to number these chains (sessions) and collapse each of them into a single row.





select 
    p3.proj_group, 
    min(p3.proj_start) as date_start,
    max(p3.proj_end) as date_end,
    julianday(max(p3.proj_end)) - julianday(min(p3.proj_start)) as delta
from
    (select 
        p2.*,
        sum(p2.flag) over(order by p2.proj_id) as proj_group
    from 
        (select 
            p.proj_id, 
            p.proj_start, 
            p.proj_end, 
            case 
                when lag(p.proj_end) over(order by p.proj_id) = p.proj_start then 0 
                else 1 
            end as flag
        from projects as p) as p2) as p3
group by p3.proj_group



A few comments on the query. The innermost subquery uses the lag window function to pull up the end date of the previous project. If that date coincides with the start date of the current project, the flag is set to 0; otherwise it is 1. The next level computes a running sum over the flag, which turns these marks into group numbers: every 1 starts a new group of consecutive projects. Finally, the outer query groups the rows by this number and takes the minimum start date, the maximum end date, and the difference between them in days (SQLite has no datediff function, so julianday is used instead). The SQL part is done; now let's see how to do the same in Spark.





A few words about the tool itself. Apache Spark is a framework for distributed processing of large volumes of data, usually mentioned alongside the Hadoop ecosystem. Besides Java, Scala and R, Spark can be used from Python via the PySpark interface, which is what I will do. For the experiments I used Google Colab, since it requires no local installation and the environment can be prepared in a couple of cells; the same steps can be repeated on a local machine if you prefer.





In Colab you first install OpenJDK on the Linux virtual machine and download a Spark distribution. Then the findspark package is installed: it tells Python where Spark lives, after which a session can be created and we can start working.
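As a rough sketch, the setup cell might look like the one below; the specific Spark and JDK versions, the download URL and the paths are my assumptions and may need to be adjusted.

# Install Java, download Spark and install findspark (run inside a Colab cell)
!apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

import os
# Tell findspark where Java and Spark live (paths assume a standard Colab VM)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

import findspark
findspark.init()

# Create a local Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()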





The next step is to get the rows from the SQLite database into a Spark DataFrame. There is more than one way to do this, and the choice does not affect the rest of the script.
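One simple option, shown here as my own sketch since the loading code is not reproduced in this section, is to read the table with pandas and convert the result into a Spark DataFrame.

import sqlite3
import pandas as pd

# Read the table into a pandas DataFrame...
con = sqlite3.connect("projects.sqlite")
pdf = pd.read_sql("SELECT proj_id, proj_start, proj_end FROM projects", con)
con.close()

# ...and hand it over to Spark (a JDBC connection with a SQLite driver would also work)
df = spark.createDataFrame(pdf)
df.show()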





A DataFrame in Spark can be processed in two ways. Firstly, you can register it as a temporary view and write an ordinary SQL query against it; secondly, you can build the same logic from DataFrame API methods. As one of the books on Spark points out, both variants end up with the same execution plan, so the choice is largely a matter of taste. For this article I went with the DataFrame API; a sketch of the SQL route is shown right below for comparison.
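The sketch of the first route is my own illustration; it assumes the SparkSession spark and the DataFrame df created in the previous steps.

# Register the DataFrame as a temporary view and run almost the same query as in SQLite
df.createOrReplaceTempView("projects")

spark.sql("""
    SELECT proj_group,
           MIN(proj_start) AS date_start,
           MAX(proj_end)   AS date_end,
           DATEDIFF(MAX(proj_end), MIN(proj_start)) AS delta
    FROM (
        SELECT p2.*,
               SUM(flag) OVER (ORDER BY proj_id) AS proj_group
        FROM (
            SELECT proj_id, proj_start, proj_end,
                   CASE WHEN LAG(proj_end) OVER (ORDER BY proj_id) = proj_start
                        THEN 0 ELSE 1 END AS flag
            FROM projects
        ) AS p2
    ) AS p3
    GROUP BY proj_group
""").show()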





The logic of the DataFrame solution repeats the SQL script step by step: the lag over the project id, the flag, the running sum that numbers the groups, the grouping itself, and finally the difference between the boundary dates (this time computed with the datediff function).





I will not comment on the code line by line: if you compare it with the SQL query, the correspondence between the SQL constructs and the Spark functions is easy to follow. The key fragments of the script are shown below.





from pyspark.sql import functions as F
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

# Equivalent of the Pandas DataFrame.shift() method (SQL LAG)
w = Window.partitionBy().orderBy(col("proj_id"))
df_dataframe = df.withColumn('lag', F.lag("proj_end").over(w))
#...
# Equivalent of SQL CASE WHEN ... THEN ... ELSE ... END
df_dataframe = df_dataframe.withColumn('flag', F.when(df_dataframe["proj_start"] == df_dataframe["lag"], 0).otherwise(1))
#...
# Cumulative sum over the flag column numbers the groups
w = Window.partitionBy().orderBy(col("proj_id"))
df_dataframe = df_dataframe.withColumn("proj_group", F.sum("flag").over(w))
#...
# Equivalent of SQL GROUP BY with min/max aggregates
df_group = df_dataframe.groupBy("proj_group").agg(F.min("proj_start").alias("date_start"),
                                                  F.max("proj_end").alias("date_end"))
df_group = df_group.withColumn("delta", F.datediff(df_group.date_end, df_group.date_start))
df_group.show()



The output matches the result of the SQL query, so it is time to draw some conclusions.





Conclusion 1. The task of finding sessions is a classic one: it comes up both in books on SQL and in test assignments at interviews, and the core of the solution, window functions, stays the same whether you write plain SQL or PySpark.





Conclusion 2. Even if you have never worked with Spark before, that is no reason to withdraw from the competition for a vacant position. The basics of PySpark can be mastered in a short time, provided you already have programming experience with the Pandas library.





Conclusion 3. There is no shortage of books on Spark.





That's all. Good health, good luck, and professional success to everyone!







