I want to know everything about the client! Or how to enrich dry DWH facts with digital paths and client properties from Amplitude

The Betting League corporate data warehouse was created long before Amplitude was introduced. It is used primarily by analysts and researchers: product managers and marketers had to go to the analysts for reports, because querying the warehouse requires programming skills.







The facts in the DWH have always lacked a product-centric, digital view: data that follows the customer through the product and shows us their paths. With the arrival of Amplitude we began to see the value of the data accumulated in that system. It is great to use inside Amplitude itself, but the idea of a symbiosis between the two systems, DWH and Amplitude, gave us no rest. So we built the mechanics for transferring data from Amplitude into the corporate warehouse for in-house analysis, and wrote up instructions for setting up the transfer from Amplitude to DWH. We also invite you to the Betting League and Adventum webinar on analyzing and optimizing conversions in the product.














How Combining Amplitude Data with the DWH Can Help



1. The facts in the DWH get enriched with the behavioral, digital data collected by Amplitude.







2. Amplitude events can be analyzed alongside the rest of the warehouse data with the usual in-house tools.







3. The transfer can be automated: Amplitude provides an Export API, so the load can run on a schedule without manual work.







Moving Data from Amplitude to the DWH



Amplitude provides an Export API that returns the raw event data for a requested time interval. A few things to keep in mind: the export comes back as a zip archive of gzipped JSON files, and all timestamps are in UTC, so the interval you request may not match your local reporting day exactly.
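For reference, a single Export API request looks roughly like this; the keys, dates and output path here are placeholders, not values from our setup:

import requests

# Placeholders: substitute your own project keys and dates
API_KEY = "<amplitude api key>"
SECRET_KEY = "<amplitude secret key>"
start, end = "20200501T00", "20200501T23"  # yyyymmddThh, interpreted in UTC

# The Export API returns a zip archive with gzipped JSON files inside
resp = requests.get(
    "https://amplitude.com/api/2/export",
    params={"start": start, "end": end},
    auth=(API_KEY, SECRET_KEY),
    timeout=60,
)
resp.raise_for_status()

with open("export.zip", "wb") as f:
    f.write(resp.content)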







The whole pipeline is a single Python script plus a little SQL; no extra infrastructure is required. Amplitude itself does the heavy lifting of collecting the events.







The alternative is to export data from Amplitude by hand, for example as CSV files, and push it through ETL manually, which does not scale.







ETL stands for Extract, Transform, Load: data is extracted from a source, transformed into the required shape, and loaded into the DWH.







Below is the script we use, broken into numbered steps. Each step is commented, so it can be adapted to your own environment.







The script is written in Python 3.7. We do not use any flow orchestrators (the kind that build DAGs): the script lives on a Windows machine and is launched from a .bat file (via the Task Scheduler). Let's walk through it step by step.







1. Imports



# Libraries used by the script
import os
import requests
import pandas as pd
import zipfile
import gzip
import time
from tqdm import tqdm
import pymssql
import datetime
import pyodbc
from urllib.parse import quote_plus
from sqlalchemy import create_engine, event
      
      





2. Working directory and timestamps



Set the working directory where the export will be downloaded, and record the start time: one timestamp for measuring the run, another for naming the files.







# Working directory and timestamps
os.chdir("C:\\Agents\\AMPL\\TEMP")  # directory where the export will be downloaded
dts1 = time.time()  # start of the whole run (used for total timing)
a = time.time()  # second timer used at the very end of the script
now = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")  # launch timestamp used in file names
      
      





3. Amplitude API keys



The API key and secret key of the project can be found in the Amplitude project settings (Settings => Projects => General).







# API key and secret key of the Amplitude project (fill in your own)
api_key = ''
secret_key = ''
      
      





4. Getting the export date from the DWH



The date for which data should be exported is stored in the DWH, so we pick it up with a SQL query. Keep in mind that the Export API expects start and end in the yyyymmddThh format (see the API docs), and that these boundaries are treated as UTC.
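For example, a date can be brought to this format like so (the date here is made up for illustration):

import datetime

# A date stored in the DWH, e.g. as a datetime or a 'YYYY-MM-DD' string
export_day = datetime.date(2020, 5, 1)  # hypothetical value

# The Export API expects yyyymmddThh; request the whole day from hour 00 to hour 23
start = export_day.strftime("%Y%m%dT") + "00"
end = export_day.strftime("%Y%m%dT") + "23"
print(start, end)  # 20200501T00 20200501T23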







# Connection parameters for the DWH (SQL Server)
server = ""    # server address
user = ""      # user name
password = ""  # password

# Connect and run the query that returns the date to export
conn = pymssql.connect(server, user, password, "")  # last argument is the database name
cursor = conn.cursor()
cursor.execute("<your SELECT that returns the date to export, in yyyymmddThh format>")
      
      





5. Reading the date and closing the connection



Read the resulting date from the cursor; it will be passed to the Amplitude API. Then close the connection.







# Read the export date from the query result and close the connection
for row in cursor:
    dt = row[0]
conn.close()
      
      





6. File name and working directory



Build the name of the file to be downloaded from the export date and the launch timestamp, and get the path to the working directory (note the double backslashes required on Windows).







# File name built from the export date and the launch timestamp
filename = 'AMPL_PROD_' + dt + '_' + now

# Working directory; note the double backslash (\\) required in Windows paths
# We take the current directory because it was already set above with os.chdir
working_dir = os.getcwd() + '\\'

      
      





7. SQL connection string for the load



Set the parameters of the DWH (SQL Server) database, schema and table into which the data will be loaded, and build the connection string.







# Target objects in the DWH (SQL Server) where the raw Amplitude data will land
server = ''           # server address
database = ''         # database name
schema = ''           # schema name
table_to_export = ''  # target table for the raw data

# Connection string for the DWH (SQL Server)
params = 'DRIVER={SQL Server};SERVER='+server+';DATABASE='+database+';User='+user+';Password='+password+';'
quoted = quote_plus(params)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
      
      





8. Downloading data from Amplitude



Download the archive from Amplitude, extract it, and read the gzipped JSON files into a single DataFrame. A small class and a helper function do all of this.







# Class that requests the export from the API, downloads the archive and extracts the json files
class GetFile():

    def __init__(self, now, dt, api_key, secret_key, filename, working_dir):

        self.now = now
        self.dt = dt
        self.api_key = api_key
        self.secret_key = secret_key
        self.filename = filename
        self.working_dir = working_dir

    def response(self):
        """
        Request the export from the Amplitude Export API, retrying until a response is received
        """
        print('Requesting the export from Amplitude!', end='\n')
        count = 0
        while True:
            count += 1
            print(f'Attempt {count}.....', end='')
            try:
                response = requests.get('https://amplitude.com/api/2/export?start='+self.dt+'&end='+self.dt,
                                        auth=(self.api_key, self.secret_key),
                                        timeout=10)
                print('OK', end='\n')
                print('1. Response received from the Export API', end='\n')
                break
            except:
                print('failed, retrying', end='\n')
                time.sleep(1)

        return response

    def download(self):
        '''
        Save the downloaded archive to disk
        '''
        with open(self.working_dir + self.filename + '.zip', "wb") as code:
            file = self.response()
            print('2. Writing the archive to disk.....', end='')
            code.write(file.content)
        print('OK', end='\n')

    def extraction(self):
        '''
        Extract the zip archive into a folder with the same name
        '''
        z = zipfile.ZipFile(self.working_dir + self.filename + '.zip', 'r')
        z.extractall(path=self.working_dir + self.filename)
        print('3. Archive extracted to folder ' + self.filename)

    def getfilename(self):
        '''
        Return the file name and the full path to the archive
        '''
        return 'File: {} \nPath: {}'.format(self.filename, self.working_dir + self.filename + '.zip')

def unzip_and_create_df(working_dir, filename):
    '''
    Read the JSON.gz files extracted from the archive
    and combine them into a single DataFrame.
    '''
    # inside the archive Amplitude puts a folder named after the project id (here 274440)
    directory = working_dir + filename + '\\274440'
    files = os.listdir(directory)
    df = pd.DataFrame()
    print('Reading the extracted files:')
    time.sleep(1)
    for i in tqdm(files):
        with gzip.open(directory + '\\' + i) as f:
            add = pd.read_json(f, lines=True)
        df = pd.concat([df, add], axis=0)
    time.sleep(1)
    print('4. JSON files combined into a single DataFrame')
    return df

# Create an instance of the class
file = GetFile(now, dt, api_key, secret_key, filename, working_dir)

# Download the archive (the request is retried inside until it succeeds)
file.download()

# Extract the gz files from the archive
file.extraction()

# Build the DataFrame from the extracted json.gz files
adf = unzip_and_create_df(working_dir, filename)

      
      





9. Preparing the DataFrame (selecting columns)



Not every column from the export needs to be stored. The list of columns to keep lives in a settings table in the DWH, so we read it with a SQL query, normalize the column names, add a service column with the download file name, and convert the values so they load cleanly.







# Prepare the DataFrame for loading
print('5. Selecting the needed columns and preparing the DataFrame.....', end='')

# The settings table in the DWH stores the list of columns we want to keep;
# the query below is a placeholder -- it must return a SAVE_COLUMN_NAME column
sql_query_columns = ("""
                        <your query against the settings table returning SAVE_COLUMN_NAME>
                    """)

settdf = pd.read_sql_query(sql_query_columns, new_con)

# Bring the names to lower case on both sides so the SAVE_COLUMN_NAME values from the DWH
# match the DataFrame columns; lower() removes any differences in capitalization
settdf['SAVE_COLUMN_NAME'] = settdf['SAVE_COLUMN_NAME'].apply(lambda x: x.lower())
adf.columns = [''.join(j.title() for j in i.split('_')).lower() for i in adf.columns]

# List of columns to keep
needed_columns = [i for i in settdf['SAVE_COLUMN_NAME'].to_list()]

# Add a service column with the name of the downloaded file
needed_columns.append('DOWNLOAD_FILE_NAME')

# Write the file name into the DataFrame
adf['DOWNLOAD_FILE_NAME'] = filename

# Reset the index (it was broken by concatenating the extracted files)
adf.reset_index(inplace=True)

# Convert all values to strings and replace NaN with None so they are loaded as NULL
adf = adf.astype('unicode_').where(pd.notnull(adf), None)

# Keep only the needed columns for the load
df_to_sql = adf[needed_columns]

# The DataFrame is ready
print('OK', end='\n')
      
      





10. Loading the data into the DWH



Load the prepared DataFrame into the target table in the DWH. To speed up the insert we switch on fast_executemany for the driver.







# Load the data into the DWH
# Separate timer for the load itself
dts2 = time.time()
print('6. Loading the data into the DWH...', end='')

# Connect to the DWH
connection = pyodbc.connect(params)
engine = create_engine(new_con)

# Event listener that switches on fast_executemany (bulk insert) to speed up the load
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

# chunksize is set explicitly: with None the whole frame is sent at once and may not fit in RAM
df_to_sql.to_sql(table_to_export, engine, schema=schema, if_exists='append', chunksize=100000, index=False)

# Close the connection
connection.close()
print('OK', end='\n')

dtf = time.time()
diff1, diff2 = [str(int((dtf - i) // 60)) + ' min ' + str(int((dtf - i) % 60)) + ' sec' for i in (dts1, dts2)]
print(f'Elapsed since start: {diff1}, load took: {diff2}')
print('Data loaded into the DWH')

      
      





11. Logging the load



The data is in! Now record the fact of the load: write the loaded date to a log table in the DWH so the next run knows which day to request.







# Record the fact of the load
# Connect to the DWH again
conn2 = pymssql.connect(server, user, password, "")  # database name
cursor2 = conn2.cursor()
query = "<your INSERT/UPDATE that writes the loaded date to the log table>"

# Run the query
cursor2.execute(query)

# Commit and close the connection
conn2.commit()
conn2.close()

      
      





12. Running the ETL procedure



The raw data has been loaded, but on its own it is not very convenient to work with. The last step on the database side is an ETL procedure that processes the freshly loaded rows; the script simply calls it.







print('Starting the ETL procedure')

# Connect to the DWH and run the ETL procedure that processes the freshly loaded data
conn3 = pymssql.connect(server, user, password, "")  # database name
cursor3 = conn3.cursor()
query = "<call your ETL procedure here, e.g. EXEC dbo.SP>"

cursor3.execute(query)

conn3.commit()
conn3.close()
      
      





13. Measuring the run time



Finally, measure and print how long the whole script took.







# Total run time of the script
b = time.time()
diff = b - a
minutes = diff // 60
print('Script run time: {:.0f} min'.format(minutes))

      
      





That is the whole pipeline: from a raw Amplitude export to a table in the corporate warehouse.







After the load, the ETL procedure on the DWH side turns the raw events into tables that analysts can work with.







Data can also move in the opposite direction, from the DWH back into Amplitude: a server-to-server (s2s) integration lets you send warehouse data into the product analytics tool.
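As a rough sketch (not the pipeline described above), one way to push a property from the DWH into Amplitude is its HTTP API v2; the identifier, event name and property below are invented for illustration:

import requests

AMPLITUDE_API_KEY = "<amplitude api key>"  # the project API key

# A hypothetical attribute pulled from the DWH for a given client
payload = {
    "api_key": AMPLITUDE_API_KEY,
    "events": [
        {
            "user_id": "client-123",                     # made-up identifier
            "event_type": "dwh_profile_sync",            # made-up service event
            "user_properties": {"ltv_segment": "high"},  # made-up property from the DWH
        }
    ],
}

resp = requests.post("https://api2.amplitude.com/2/httpapi", json=payload, timeout=30)
resp.raise_for_status()
print(resp.json())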







The Betting League and Adventum webinar on analyzing and optimizing product conversions takes place on the 20th at 17:00. Come and join us.







