# Appie/products.py
#
# (Header captured from the repository web view: 226 lines, 10 KiB, Python;
#  page controls: Raw | Permalink | Normal View | History.)
#
# Last commit: 2024-05-31 21:10:24 +02:00
import json
from pyspark import SparkConf
import requests
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,TimestampType
import pyspark.sql.functions as f
from datetime import timedelta,datetime
import pyodbc
class Appie:
    """Client for the Albert Heijn ("Appie") mobile API that mirrors search
    results into MariaDB through Spark JDBC tables.

    Constructing an instance registers the ``auth`` and ``products`` Spark
    tables, then loads (or fetches) an anonymous token and an access token.
    """

    # Seconds to wait for the AH API before giving up; without a timeout a
    # stalled connection would block the whole job indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, spark: SparkSession):
        """Store connection settings, register tables and obtain tokens.

        Args:
            spark: Active Spark session used for all table reads/writes.
        """
        self.mSpark = spark
        self.mToken = ""
        self.mAnonToken = ""
        self.mHost = "api.ah.nl"
        self.mHostPort = 443
        # Default headers shared by every request; never mutate in place.
        self.mHeader = {"User-Agent": "Appie/8.50", "Accept": "*/*", "X-Application": "AHWEBSHOP"}
        # NOTE(review): database credentials are hard-coded in source control;
        # consider loading them from configuration or the environment.
        self.mSQLHost = '192.168.100.24'
        self.mDB = 'appie'
        self.mDBUser = 'job'
        self.mDBPassword = '1kb3nj0b'
        self.mDBODBCDriver = 'MariaDB Unicode'
        self.mODBCString = ('DRIVER={' + self.mDBODBCDriver + '};Server=' + self.mSQLHost
                            + ';Database=' + self.mDB + ';User=' + self.mDBUser
                            + ';Password=' + self.mDBPassword)
        self.setupTables()
        self.getAnonToken()
        self.newToken()

    def _jdbcTableDDL(self, table: str) -> str:
        """Return a CREATE TABLE statement exposing MariaDB table *table* to Spark via JDBC."""
        # NOTE(review): JDBC URL options normally follow '?', not '&'; kept
        # unchanged because it is the value the job currently runs with.
        return f"""
            CREATE TABLE {table}
            USING jdbc
            OPTIONS(
                driver 'org.mariadb.jdbc.Driver',
                user '{self.mDBUser}',
                url 'jdbc:mysql://{self.mSQLHost}/{self.mDB}&permitMysqlScheme',
                password '{self.mDBPassword}',
                dbtable '{table}',
                database '{self.mDB}'
            )
        """

    def setupTables(self):
        """Drop and re-create the ``auth`` and ``products`` Spark JDBC tables."""
        for table in ("auth", "products"):
            self.mSpark.sql("DROP TABLE IF EXISTS " + table + ";")
            self.mSpark.sql(self._jdbcTableDDL(table))

    def _validToken(self, tokenType: str):
        """Return a not-yet-expired token of *tokenType* from ``auth``, or None."""
        df = self.mSpark.read.table("auth")
        row = df.where((df.tokenType == tokenType) & (df.validTill > datetime.now())).head()
        return None if row is None else row["token"]

    def _storeToken(self, tokenType: str, token: str, expiresIn) -> None:
        """Append a *tokenType* row valid for *expiresIn* seconds to ``auth``."""
        validTill = datetime.now() + timedelta(seconds=expiresIn)
        schema = StructType([
            StructField('id', IntegerType(), True),
            StructField('tokenType', StringType(), True),
            StructField('token', StringType(), True),
            StructField('validTill', TimestampType(), True),
        ])
        newDf = self.mSpark.createDataFrame(schema=schema, data=[(0, tokenType, token, validTill)])
        newDf.write.mode("append").format("jdbc").saveAsTable("auth")

    def getAnonToken(self):
        """Return a valid anonymous token, requesting and storing a new one if needed.

        Also sets ``self.mAnonToken``.
        """
        token = self._validToken("anon_token")
        if token is not None:
            self.mAnonToken = token
            return token
        tokenJson = self.appiePost("/mobile-auth/v1/auth/token/anonymous", "{\"clientId\":\"appie\"}")
        self.mAnonToken = tokenJson["access_token"]
        self._storeToken('anon_token', self.mAnonToken, tokenJson["expires_in"])
        return self.mAnonToken

    def appiePost(self, url, body):
        """POST JSON *body* to API path *url* and return the decoded JSON reply."""
        # Per-request copy: updating self.mHeader in place leaked these headers
        # into every later request.
        header = {**self.mHeader, "Content-type": "application/json"}
        reply = requests.post(url="https://" + self.mHost + url, headers=header,
                              data=body, timeout=self.REQUEST_TIMEOUT)
        return reply.json()

    def newToken(self) -> str:
        """Return a valid access token, refreshing it with the stored refresh token if needed.

        Also sets ``self.mToken``.

        Raises:
            RuntimeError: if neither a valid access token nor a valid refresh
                token is stored in ``auth``.
        """
        token = self._validToken("access_token")
        if token is not None:
            self.mToken = token
            return token
        refreshToken = self._validToken("refresh_token")
        if refreshToken is None:
            raise RuntimeError("No valid refresh_token in the auth table; cannot obtain an access token")
        # json.dumps escapes the token; compact separators give the same bytes
        # as the previous hand-built string for ordinary tokens.
        refreshBody = json.dumps({"clientId": "appie-android", "refreshToken": refreshToken},
                                 separators=(",", ":"))
        tokenJson = self.appiePost("/mobile-auth/v1/auth/token/refresh", refreshBody)
        self.mToken = tokenJson["access_token"]
        # Persist the refresh token returned by the API. The query is
        # parameterized so the token text cannot alter the SQL.
        conn = pyodbc.connect(self.mODBCString)
        try:
            cursor = conn.cursor()
            cursor.execute("update auth set token=? where tokenType='refresh_token'",
                           tokenJson["refresh_token"])
            conn.commit()
        finally:
            conn.close()
        self._storeToken('access_token', self.mToken, tokenJson["expires_in"])
        return self.mToken

    def appieGet(self, url: str, token: str) -> requests.Response:
        """GET API path *url* with bearer *token* and return the raw response."""
        link = "https://" + self.mHost + url
        header = {**self.mHeader, "Authorization": "Bearer " + token}
        return requests.get(url=link, headers=header, timeout=self.REQUEST_TIMEOUT)

    def _fetchAllProducts(self, search: str) -> list:
        """Return the product dicts from every result page of the search for *search*."""
        from urllib.parse import quote
        products = []
        page = 0
        maxPages = 1
        while page < maxPages:
            url = ("/mobile-services/product/search/v2?query=" + quote(search, safe="")
                   + "&sortOn=RELEVANCE&size=1000&page=" + str(page))
            replyJson = self.appieGet(url, self.getAnonToken()).json()
            maxPages = replyJson["page"]["totalPages"]
            # Collect each page; previously only the last page was kept.
            products.extend(replyJson["products"])
            page += 1
        return products

    def _appendPriceChanges(self, df) -> None:
        """Append a ``pricing`` row for each product in *df* that is new or whose price changed."""
        pricingTable = self.mSpark.read.format("jdbc").table("pricing")
        # Compare only against each product's newest pricing row. Joining the
        # full history duplicated inserts and re-inserted unchanged prices.
        newestFirst = pyspark.sql.Window.partitionBy("hqId", "webshopId").orderBy(f.col("pricingDate").desc())
        oldPricingDf = (pricingTable
                        .withColumn("_rowNumber", f.row_number().over(newestFirst))
                        .where(f.col("_rowNumber") == 1)
                        .drop("_rowNumber", "pricingDate"))
        pricingDF = df
        # Columns the table has but this search result lacks become ''.
        for missing in set(oldPricingDf.columns).difference(df.columns):
            pricingDF = pricingDF.withColumn(missing, f.lit(''))
        pricingDF = pricingDF.select("webshopId", "hqId", "unitPriceDescription", "currentPrice", "priceBeforeBonus")
        # <=> is null-safe, so NULL <-> value transitions count as changes.
        changed = ("(old.currentPrice is NULL and old.unitPriceDescription is NULL and old.priceBeforeBonus is NULL)"
                   " or not (old.unitPriceDescription <=> new.unitPriceDescription)"
                   " or not (old.priceBeforeBonus <=> new.priceBeforeBonus)"
                   " or not (old.currentPrice <=> new.currentPrice)")
        newPricingDf = (pricingDF.alias("new")
                        .join(oldPricingDf.alias("old"), ['hqId', 'webshopId'], how='left')
                        .filter(changed)
                        .select('hqId', 'webshopId', 'new.unitPriceDescription', 'new.currentPrice', 'new.priceBeforeBonus')
                        .withColumn("pricingDate", f.current_timestamp()))
        (newPricingDf.write.mode("append").format("jdbc")
            .option("driver", "org.mariadb.jdbc.Driver")
            .option("user", self.mDBUser)
            .option("password", self.mDBPassword)
            .option("dbtable", "pricing")
            .option("url", "jdbc:mariadb://" + self.mSQLHost)
            .option("database", self.mDB)
            .saveAsTable("pricing"))

    def _insertNewProducts(self, df) -> None:
        """Append the products in *df* that have no titled row in ``products`` yet."""
        cols = ["title", "salesUnitSize", "orderAvailabilityStatus", "mainCategory", "subCategory", "brand", "shopType"]
        oldDF = self.mSpark.read.format("jdbc").table("products")
        newDF = df
        for missing in set(oldDF.columns).difference(df.columns):
            newDF = newDF.withColumn(missing, f.lit(''))
        newDF = newDF.select("webshopId", "hqId", *cols)
        insertDF = (newDF.alias("new")
                    .join(oldDF.alias("old"), ['hqId', 'webshopId'], how='left')
                    .filter("old.title is NULL")
                    .select('new.webshopId', 'new.hqId',
                            *(f.coalesce('new.' + col, 'old.' + col).alias(col) for col in cols)))
        insertDF.write.mode("append").format("jdbc").saveAsTable("products")

    def searchProduct(self, search: str):
        """Search the AH catalogue for *search* and store new products and price changes.

        Prints ``"<search>: <number of distinct products>"``.
        """
        productsDict = self._fetchAllProducts(search)
        if not productsDict:
            # An empty result has no webshopId column; nothing to store.
            print(search + ": 0")
            return
        jsonRDD = self.mSpark.sparkContext.parallelize([json.dumps(productsDict)])
        df = self.mSpark.read.json(jsonRDD, multiLine=True).dropDuplicates(["webshopId"])
        print(search + ": " + str(df.count()))
        self._appendPriceChanges(df)
        self._insertNewProducts(df)

    def getReceipts(self):
        """Print the raw receipts response fetched with the current access token."""
        print(self.appieGet("/mobile-services/v1/receipts", self.mToken).content)
def init_spark():
    """Create (or reuse) the Hive-enabled Spark session named "Appie".

    Returns:
        A ``(spark, sparkContext)`` tuple.
    """
    builder = SparkSession.builder.appName("Appie").enableHiveSupport()
    session = builder.getOrCreate()
    context = session.sparkContext
    return session, context
def main():
    """Authenticate, print the receipts, then scrape a fixed list of searches in order."""
    spark, _context = init_spark()
    client = Appie(spark)
    client.getReceipts()
    searchTerms = (
        "brood", "kaas", "zeep", "koek", "chips", "groente", "fruit", "sap",
        "salade", "pizza", "hagelslag", "wasmiddel", "kruiden", "ijs",
        "maandverband", "tonijn", "kat", "chocolade", "olie",
    )
    for term in searchTerms:
        client.searchProduct(term)
# Run the scraper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()