Merging Data dengan Perintah Insert dan Update Menggunakan Hashing di Python
Berbeda apabila kita melakukan merging menggunakan library pandas. Pendekatan yang akan kita gunakan adalah dengan melakukan merging insert dan update pada database.
Berikut beberapa cara yang akan disajikan untuk melakukan proses penggabungan antara dataset yang sebelumnya sudah ada dengan dataset yang baru masuk dengan menggunakan kondisi penggabungan yang kemudian dilakukan proses inserting atau update berdasarkan tabel yang sesuai. Pada tutorial kali ini akan menggunakan dua sampel data yaitu sampel data user dan new_user. Anggap saya data user merupakan data yang sudah ada (existing data ) sedangkan new_user merupakan data yang masuk secara harian dan secara mendetail.
user_data :
new_user :
Selanjutnya kita akan membuat sebuah kode dimana kode ini akan melakukan import beberapa library yang dibutuhkan dalam proses merging data. Pada proses ini pun menggunakan library hashlib untuk menghasilkan hashing pada beberapa kolom yang digunakan sebagai proses perbandingan.
Percobaan 1
Pada percobaan pertama, dilakukan pendekatan dengan menggunakan fungsi hashing pada kedua data tersebut untuk membuat nilai hasing pada kolom yang berisi nilai string, kemudian kita akan menggunakannya untuk proses perbandingan. Sebelumnya kita pastikan terlebih dahulu bahwa datanya sudah ada di dalam database.
Pertama tama kita masukan library yang dibutuhkan seperti pandas, driver postgresql dan hashlib untuk proses hashingnya.
import pandas as pd
import psycopg2 as psy
import hashlib
Selanjutnya menghubungkan ke RDBMS postgressql dengan perintah dibawah ini.
# connect to postgresqlconnect = psy.connect(
host = 'localhost',
database = 'testing',
user = 'postgres',
password = '*****'
)
cur = connect.cursor()
Kemdian apabila sudah terhubung di postgresqlnya kita dapat melakukan proses insert dan updatenya.Namun sebelumnya kita baca terlebih dahulu dataset yang akan digunakan pada percobaan kali ini
user_data = pd.read_csv('user_data.csv')
new_data = pd.read_csv('new_user.csv')
Selanjutnya kita buat fungsi untuk proses hashingnya
def hash(sourcedf,destinationdf,*column):
columnName = ''
destinationdf['hash_id'] = \
pd.DataFrame(sourcedf[list(column)].values.sum(axis = 1 ))\
[0].str.encode('utf-8').apply(lambda x : \
(hashlib.sha512(x).hexdigest().upper()))
hash(user_data, user_data,'name', 'address')
hash(new_data, new_data,'name', 'address')
Dalam proses hashing pada kolom alasan mengapa menggunakan perintah lambda di bandingkan jika kita menggunakan for loop bisa di lihat di artikel di bawah ini :
Selanjutnya untuk dapat melakukan proses insert dan update mula mula kita akan membagi kedalam dua variabel list yang isinya data data yang akan dilakukan proses update dan list data yang akan dilakukan proses insert (data baru )
#membuat list id dan nilai hash dari data yang ada
user_id = user_data['id'].values.tolist()
user_hash = user_data['hash_id'].values.tolist()# generate update data berdasarkan comparing id di dataset new_data dengan list yang ada pada user_id dan melakukan checking has_id di dataset new_data apakah match dengan user_hash kalau match akan dilakukan update jika tidak berarti akan ada data baru yang masukupdate = new_data[new_data.id.isin(user_id) & \
(~new_data.hash_id.isin(user_hash))]insert =new_data[~new_data.id.isin(user_id)]
insert = insert[["id","name","address"]]
Kedua list yang sudah digenerate kita masukan kedalam fungsi masing masing (update & insert).
Fungsi Insert
def gen_insert (dataframe):
inserts = []
for index, row in dataframe.iterrows():
inserts.append(" INSERT INTO user_data VALUES" + str(tuple(row.values))+ ";")
return insertsfor i in gen_insert(insert):
cur.execute(i)
cur.close()
Fungsi Update
def gen_update (dataframe):
updates = []
for i in range (len(dataframe)):
updates.append(" UPDATE user_data SET name= '%s', address = '%s' where id =%d;"%(dataframe['name'][i], dataframe['address'][i], dataframe['id'][i]) )
return updatesfor i in gen_update(update):
cur.execute(i)
cur.close()
Percobaan 2
Pada percobaan kedua, disini kita akan melakukan proses load data ke dalam stagging terlebih dahulu, setiap kali ada data yang masuk, kita akan melakukan truncate pada tabel existing yang ada di stagging dan lalu menyimpan data yang baru masuk teersebut.
new_data = new_data[["id","name","address"]]
def gen_data_news (dataframe):
insert_news= []
for index, row in dataframe.iterrows():
inserts.append(" INSERT INTO new_data VALUES" + str(tuple(row.values))+ ";")
return insert_newstruncate data
cur.execute ('''truncate table new_data cascade''')
for i in gen_data_news(new_data):
cur.execute(i)
cur.close()# merge with upsertquery = ''' MERGE INTO user_data ud USING new_data nd
ON (ud.id = nd.id AND ud.address = nd.address)
WHEN MATCHED THEN
UPDATE SET ud.name=nd.name,ud.address = nd.address
WHERE ud.id = nd.id;
WHEN NOT MATCHED THEN
INSERT ( ud.id, ud.name, ud.address )
VALUES ( nd.id, nd.name, nd.address )
WHERE ud.id=nd.id'''
cur.execute(query)
cur.close()
FYI proses hashing biasa digunakan untuk melindungi data sensitif dalam berbagai cara misalnya ketika ingin menyembunyikan data sebelum membagikannya atau menggunakannya untuk menyimpan kata sandi.