Add products.py
commit 2d39466ece

products.py | 225 (new file)

@@ -0,0 +1,225 @@
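# Scraper for the Albert Heijn ("Appie") webshop API: obtains anonymous and
# refreshed auth tokens, searches the product catalogue, and persists products
# and price history to a MariaDB database through Spark JDBC tables.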
import json

import requests
import pyodbc

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import pyspark.sql.functions as f

from datetime import timedelta, datetime

class Appie:
    def __init__(self, spark: SparkSession):
        self.mSpark = spark
        self.mToken = ""
        self.mAnonToken = ""
        self.mHost = "api.ah.nl"
        self.mHostPort = 443

        self.mHeader = {"User-Agent": "Appie/8.50", "Accept": "*/*", "X-Application": "AHWEBSHOP"}

        self.mSQLHost = '192.168.100.24'
        self.mDB = 'appie'
        self.mDBUser = 'job'
        self.mDBPassword = '1kb3nj0b'
        self.mDBODBCDriver = 'MariaDB Unicode'
        self.mODBCString = ('DRIVER={' + self.mDBODBCDriver + '};Server=' + self.mSQLHost
                            + ';Database=' + self.mDB + ';User=' + self.mDBUser
                            + ';Password=' + self.mDBPassword)

        self.setupTables()
        self.getAnonToken()
        self.newToken()

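    # setupTables (re)registers the MariaDB 'auth' and 'products' tables in the
    # Spark catalog (USING jdbc) so they can be queried with spark.read.table().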
    def setupTables(self):
        query = """
        CREATE TABLE auth
        USING jdbc
        OPTIONS(
            driver 'org.mariadb.jdbc.Driver',
            user 'job',
            url 'jdbc:mysql://192.168.100.24/appie?permitMysqlScheme',
            password '1kb3nj0b',
            dbtable 'auth',
            database 'appie'
        )
        """
        self.mSpark.sql("DROP TABLE IF EXISTS auth;")
        self.mSpark.sql(query)

        query = """
        CREATE TABLE products
        USING jdbc
        OPTIONS(
            driver 'org.mariadb.jdbc.Driver',
            user 'job',
            url 'jdbc:mysql://192.168.100.24/appie?permitMysqlScheme',
            password '1kb3nj0b',
            dbtable 'products',
            database 'appie'
        )
        """
        self.mSpark.sql("DROP TABLE IF EXISTS products;")
        self.mSpark.sql(query)

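    # Tokens are cached in the 'auth' table with an expiry timestamp: return
    # the cached token while it is still valid, otherwise request a fresh one.
    # The mobile-auth endpoint is assumed to return JSON with at least
    # "access_token" and "expires_in" fields, as consumed below.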
    def getAnonToken(self):
        # Do we have a valid token?
        df = self.mSpark.read.table("auth")
        df = df.where((df.tokenType == "anon_token") & (df.validTill > datetime.now()))
        if df.count() > 0:
            self.mAnonToken = df.head()["token"]
            return self.mAnonToken

        tokenJson = self.appiePost("/mobile-auth/v1/auth/token/anonymous", "{\"clientId\":\"appie\"}")
        self.mAnonToken = tokenJson["access_token"]

        validTill = datetime.now() + timedelta(seconds=tokenJson["expires_in"])

        schema = StructType([
            StructField('id', IntegerType(), True),
            StructField('tokenType', StringType(), True),
            StructField('token', StringType(), True),
            StructField('validTill', TimestampType(), True),
        ])
        newDf = self.mSpark.createDataFrame(schema=schema, data=[(0, 'anon_token', tokenJson["access_token"], validTill)])
        newDf.write.mode("append").format("jdbc").saveAsTable("auth")
        return self.mAnonToken

    def appiePost(self, url, body):
        # Work on a copy so the shared default headers are not mutated.
        header = dict(self.mHeader)
        header.update({"Content-type": "application/json"})
        reply = requests.post(url="https://" + self.mHost + url, headers=header, data=body)
        print(reply.content)
        return reply.json()

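    # newToken renews the short-lived access token from the stored refresh
    # token. Spark's JDBC writer cannot update individual rows in place, so the
    # rotated refresh token is written back with a direct pyodbc UPDATE instead.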
    def newToken(self) -> str:
        df = self.mSpark.read.table("auth")
        tokenDf = df.where((df.tokenType == "access_token") & (df.validTill > datetime.now()))
        tokenDf.show()
        if tokenDf.count() > 0:
            self.mToken = tokenDf.head()["token"]
            return self.mToken

        refreshDF = df.where((df.tokenType == "refresh_token") & (df.validTill > datetime.now()))
        refreshDF.show()

        refreshToken = refreshDF.head()["token"]

        refreshBody = "{\"clientId\":\"appie-android\",\"refreshToken\":\"" + refreshToken + "\"}"
        print(refreshBody)
        tokenJson = self.appiePost("/mobile-auth/v1/auth/token/refresh", refreshBody)
        self.mToken = tokenJson["access_token"]

        conn = pyodbc.connect(self.mODBCString)
        curr = conn.cursor()
        # Parameterised query avoids quoting problems in the token value.
        curr.execute("update auth set token=? where tokenType='refresh_token'", tokenJson["refresh_token"])
        conn.commit()
        conn.close()

        validTill = datetime.now() + timedelta(seconds=tokenJson["expires_in"])

        schema = StructType([
            StructField('id', IntegerType(), True),
            StructField('tokenType', StringType(), True),
            StructField('token', StringType(), True),
            StructField('validTill', TimestampType(), True),
        ])
        newDf = self.mSpark.createDataFrame(schema=schema, data=[(0, 'access_token', tokenJson["access_token"], validTill)])
        newDf.write.mode("append").format("jdbc").saveAsTable("auth")
        return self.mToken

    def appieGet(self, url: str, token: str) -> requests.Response:
        link = "https://" + self.mHost + url
        # Work on a copy so the shared default headers are not mutated.
        header = dict(self.mHeader)
        header.update({"Authorization": "Bearer " + token})
        reply = requests.get(url=link, headers=header)
        return reply

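    # searchProduct pages through the search endpoint and diffs the results
    # against the existing tables: 'pricing' rows are appended only when a
    # price field changed (building a price history), 'products' rows only for
    # products not seen before. The response shape assumed below is roughly
    #   {"page": {"totalPages": N}, "products": [{"webshopId": ..., "hqId": ..., ...}]}
    # (fields inferred from the columns selected further down).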
    def searchProduct(self, search: str):
        # Collect products from every result page, not just the last one.
        page = 0
        maxPages = 1
        products = []
        while page < maxPages:
            url = "/mobile-services/product/search/v2?query=" + search + "&sortOn=RELEVANCE&size=1000&page=" + str(page)
            replyJson = self.appieGet(url, self.getAnonToken()).json()
            maxPages = replyJson["page"]["totalPages"]
            products.extend(replyJson["products"])
            page = page + 1

        jsonRDD = self.mSpark.sparkContext.parallelize([json.dumps(products)])
        df = self.mSpark.read.json(jsonRDD, multiLine=True).dropDuplicates(["webshopId"])
        print(search + ": " + str(df.count()))

        oldDF = self.mSpark.read.format("jdbc").table("products")
        oldPricingDf = self.mSpark.read.format("jdbc").table("pricing").drop("pricingDate")

        # Backfill any columns the pricing table expects but this result lacks.
        changes = list(set(oldPricingDf.columns).difference(df.columns))
        pricingDF = df
        for change in changes:
            pricingDF = pricingDF.withColumn(change, f.lit(''))
        pricingDF = pricingDF.select("webshopId", "hqId", "unitPriceDescription", "currentPrice", "priceBeforeBonus")

        # Same for the products table.
        changes = list(set(oldDF.columns).difference(df.columns))
        newDF = df
        for change in changes:
            newDF = newDF.withColumn(change, f.lit(''))
        newDF = newDF.select("webshopId", "hqId", "title", "salesUnitSize", "orderAvailabilityStatus",
                             "mainCategory", "subCategory", "brand", "shopType")

        # Keep rows that are new or whose price fields differ from the stored ones.
        newPricingDf = pricingDF.alias("new").join(oldPricingDf.alias("old"), ['hqId', 'webshopId'], how='left').filter(
            "(old.currentPrice is NULL and old.unitPriceDescription is NULL and old.priceBeforeBonus is NULL)"
            " or old.unitPriceDescription != new.unitPriceDescription"
            " or old.priceBeforeBonus != new.priceBeforeBonus"
            " or old.currentPrice != new.currentPrice"
        ).select('hqId', 'webshopId', 'new.unitPriceDescription', 'new.currentPrice', 'new.priceBeforeBonus')
        newPricingDf = newPricingDf.withColumn("pricingDate", f.current_timestamp())
        newPricingDf.write.mode("append").format("jdbc") \
            .option("driver", "org.mariadb.jdbc.Driver") \
            .option("user", "job").option("password", "1kb3nj0b") \
            .option("dbtable", "pricing") \
            .option("url", "jdbc:mariadb://192.168.100.24") \
            .option("database", "appie") \
            .saveAsTable("pricing")

        cols = ["title", "salesUnitSize", "orderAvailabilityStatus", "mainCategory", "subCategory", "brand", "shopType"]

        # Insert only products the table does not have yet (left join finds no match).
        insertDF = newDF.alias("new").join(oldDF.alias("old"), ['hqId', 'webshopId'], how='left').filter("old.title is NULL").select(
            'new.webshopId', 'new.hqId',
            *(f.coalesce('new.' + col, 'old.' + col).alias(col) for col in cols)
        )
        insertDF.write.mode("append").format("jdbc").saveAsTable("products")

    def getReceipts(self):
        print(self.mToken)
        print(self.appieGet("/mobile-services/v1/receipts", self.mToken).content)

def init_spark():
    spark = SparkSession.builder.appName("Appie").enableHiveSupport().getOrCreate()
    sc = spark.sparkContext
    return spark, sc

def main():
    spark, sc = init_spark()
    appieInstance = Appie(spark)

    appieInstance.getReceipts()

    searchTerms = ["brood", "kaas", "zeep", "koek", "chips", "groente", "fruit",
                   "sap", "salade", "pizza", "hagelslag", "wasmiddel", "kruiden",
                   "ijs", "maandverband", "tonijn", "kat", "chocolade", "olie"]
    for term in searchTerms:
        appieInstance.searchProduct(term)


if __name__ == '__main__':
    main()