Connecting to an API and extracting event data into the RAW layer (part 1)
Hello, there! My name is Lucas Ricci Lucchi and I had the pleasure of working alongside Carl, developing a Data Lakehouse (Delta Lake) from the ground up at a company that was moving its entire IT landscape into the cloud. It was a big project and everyone within said company was involved. My role was to build a solution that connects to an API and extracts data into the Data Lakehouse through its layers: Raw, Trusted and Curated. But more about that down below. Let's dive in! (Get it? ....nvm.)
​
The client in question had a web-based booking system. Whenever a booking was made, an Azure Service Bus event was sent. We consumed the Service Bus messages with an Azure Function using a Service Bus trigger.
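To give a rough idea of what such a consumer can look like, here is a minimal sketch using the Python v2 programming model for Azure Functions. The topic, subscription and connection names are made up for illustration, and what the real function did with the message is left out:

import logging
import azure.functions as func

app = func.FunctionApp()

# Hypothetical names: 'bookings' topic, 'lakehouse' subscription, 'ServiceBusConnection' app setting
@app.service_bus_topic_trigger(arg_name="msg", topic_name="bookings", subscription_name="lakehouse", connection="ServiceBusConnection")
def booking_event(msg: func.ServiceBusMessage):
    body = msg.get_body().decode("utf-8")
    # forward/persist the booking event here (details omitted)
    logging.info("Received booking event: %s", body)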
​
This blog post will only focus on the Raw stage. I built pipelines in Azure Synapse Analytics to orchestrate the process. Perhaps in a future post we can look into the pipelines further, but for now let's just skim through them. 'Activities' in the pipeline extract the API credentials (parameters) from a table I had hard coded in the DW. The authorization credentials were sent to the API, which granted an access token for downloading data.
Among these pipeline activities I also had a Notebook activity. As you might have guessed, that activity executed my Notebook, which was written in PySpark.
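The exact authentication flow depends on the API, but conceptually the token step boils down to something like the sketch below. The endpoint path, payload field names and placeholder credentials are mine for illustration, not the client's actual API:

import requests

# hypothetical token endpoint and payload; the real values came from the DW table
token_url = 'https://api.beforeprod.company/auth/token'
payload = {'clientId': '<from DW table>', 'clientSecret': '<from DW table>'}

resp = requests.post(token_url, json=payload)
resp.raise_for_status()
Authorization = resp.json()['accessToken']   # bearer token handed on to the Notebook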
​
The first Cell in the Notebook handled the input parameters for the API, mentioned above. Since they were hard coded into a DW table, the fields can be left empty ('', ""). However, for illustration I left some examples of how they can look. 'Authorization' is the actual token we get from the API, and it is valid for a certain amount of time.
Authorization = 'ayAiOiJKV1QiLCJhbGcgRbbuU4519.ayMggTAt35SG6z4'
APIKey = '1e0c250bZZZz'
baseURL = "api.beforeprod.company"
account_name = 'suitableNameOfChoice'
daysBackInTime = '2022-01-25 07:00:00'
# setting paths from environment
eventhubname = 'event-hub-name-beforeprod'
relativeURL = "/typeofdata-api-befprod/Booking/Get"
TIP: The parameters can also be filled in manually if you only want to run individual cells or the Notebook on its own, to avoid running an entire Pipeline when you normally use Activities to fetch said parameters from somewhere else. Otherwise they will be "filled in" when running the pipeline, and the parameters from the DW override whatever you have written in the code above.
Next, we import some libraries to make our life easier.
import json
import datetime
from pyspark.sql.functions import *
from pyspark.sql import *
import requests
import re
import pytz
import threading
import time
from pyspark.sql.types import StringType, StructField,StructType
Then we add the current local time and the current time in UTC.
current_time = datetime.datetime.now(pytz.timezone('Europe/Stockholm'))
current_timeUtc = datetime.datetime.now(pytz.timezone('UTC'))
Now we proceed with using Authorization and getting that sweet, sweet data, as well as establishing the file paths for our data and JSON files.
I also added a missing, and thoroughly needed, datetime field to see when the event was created, utilizing EnqueuedTimeUtc. I needed a time field to connect my 'input' data (when a booking was made) with the event data (from Event Hubs), but also with other data that had its own pipelines and Notebooks. The date fields were important in this environment, e.g. when connecting data later on, and they helped us ensure data quality.
def threadAPIsave(InputGUID: str, EnqueuedTimeUtc: str, SrcNr: int):
    """
    Makes the API call for a GUID and saves the result in the Raw repository.
    """
    data = {"InputIds": [InputGUID], "SrcIds": [10, 11, 13, 14]}
    # Get REST data
    url = 'https://' + baseURL + relativeURL + '/'
    heads = {
        'Content-Type': 'application/json',
        'User-Agent': 'PostmanRuntime/7.28.4',
        'Authorization': 'Bearer {}'.format(Authorization),
        'Ocp-Apim-Subscription-Key': "{key}".format(key=APIKey)
    }

    try:
        response = requests.post(url, headers=heads, json=data)
    except Exception:
        print("Execution error")
        return
    df = spark.read.json(sc.parallelize([response.text]))

    # 1. Get Bus date and InputID to be inserted into a separate path
    # create a dataframe and fill it with InputGUID and EnqueuedTimeUtc
    columns = ["InputID", "EnqueuedTimeUtc"]
    data = [(InputGUID, EnqueuedTimeUtc)]
    dfBusDate = spark.createDataFrame(data).toDF(*columns)

    current_time = datetime.datetime.now(pytz.timezone('Europe/Stockholm'))
    # Set the file path for the data files
    container_name = 'raw'
    filename = 'inputdata_' + current_time.strftime('%H%M%S') + '_' + InputGUID
    relative_path = 'FolderA/FolderB/FolderC/FolderD/FolderInput/' + str(current_time.strftime('%Y')) + '/' + str(current_time.strftime('%m')) + '/' + str(current_time.strftime('%d')) + '/' + str(current_time.strftime('%H')) + '/' + filename
    outputPath = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path)

    # 2. Set the file path for inputEventJSON
    filename_bustime = 'inputEventJSON_' + current_time.strftime('%H%M%S') + '_' + InputGUID
    relative_path_bustime = 'FolderA/FolderB/FolderC/FolderD/FolderJSON/' + str(current_time.strftime('%Y')) + '/' + str(current_time.strftime('%m')) + '/' + str(current_time.strftime('%d')) + '/' + str(current_time.strftime('%H')) + '/' + filename_bustime
    outputPathBus = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path_bustime)

    # 3. Write the files, using coalesce to force the engine to write just one Input file.
    if (df.count() != 0):
        df.coalesce(1).write.mode('overwrite').json(outputPath)
        print("Input Data has been uploaded!")
        dfBusDate.coalesce(1).write.mode('overwrite').json(outputPathBus)
        print("inputEventJSON file has been uploaded")
    else:
        print('no print file. no data received')
The events are stored in Avro format, in their own event folder; here we read them back in.
# get the input events -> create a stream (Later. For now, iterate over the stuff that's there)
container_name = 'raw'
# help navigating the folder tree
dyear = datetime.datetime.strptime(daysBackInTime, '%Y-%m-%d %H:%M:%S').strftime('%Y')
dmonth = datetime.datetime.strptime(daysBackInTime, '%Y-%m-%d %H:%M:%S').strftime('%m')
dday = datetime.datetime.strptime(daysBackInTime, '%Y-%m-%d %H:%M:%S').strftime('%d')
dhour = datetime.datetime.strptime(daysBackInTime, '%Y-%m-%d %H:%M:%S').strftime('%H')
# just look at the latest hour's files, to save time
event_path = 'FolderA/FolderB/FolderC/FolderD/FolderInputEvent/' + eventhubname + '/inputevent/*/' + dyear + '/' + dmonth + '/' + dday + '/' + dhour + '/*/*.avro'
eventPath = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, event_path)
# read the events in the folder - but only events newer than the latest event
try:
    dfread = spark.read.format('avro').load(eventPath).where(to_timestamp('EnqueuedTimeUtc', 'MM/dd/yyyy hh:mm:ss a') > datetime.datetime.strptime(daysBackInTime, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d %H:%M:%S'))
except Exception:
    # if the hourly folder does not exist, look at day level; this solves midnight issues.
    event_path = 'FolderA/FolderB/FolderC/FolderD/FolderInputEvent/' + eventhubname + '/inputevent/*/' + dyear + '/' + dmonth + '/' + dday + '/*/*/*.avro'
    eventPath = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, event_path)
    dfread = spark.read.format('avro').load(eventPath).where(to_timestamp('EnqueuedTimeUtc', 'MM/dd/yyyy hh:mm:ss a') > datetime.datetime.strptime(daysBackInTime, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d %H:%M:%S'))
# add a datetime-formatted column to do time stuff
dfWithTimeStamp = dfread.withColumn('timestamp', to_timestamp('EnqueuedTimeUtc', 'MM/dd/yyyy hh:mm:ss a'))
# if we didn't have any new events, set the timestamp for the next iteration to the current timestamp in UTC.
# This allows for hour/day/year changes. Else take the latest timestamp from the queue.
if dfWithTimeStamp.rdd.isEmpty():
    row = [[str(current_timeUtc.strftime('%Y-%m-%d %H:%M:%S'))]]
    column = ['currentTime']
    newLatestTime = spark.createDataFrame(row, column)
else:
    newLatestTime = dfWithTimeStamp.agg({'timestamp': 'max'})
#extract the InputGUID to a InputGUID column
dfinput = dfWithTimeStamp\
.withColumn('InputGUID', regexp_extract('Body', '[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}', 0))\
.withColumn('rowno', row_number().over(Window.partitionBy('InputGUID').orderBy('InputGUID')))\
.where('rowno = 1')\
.select('InputGUID', 'EnqueuedTimeUtc')
Threading step.
# loop through the events, "get" the inputs and save the JSONs.
# Added EnqueuedTimeUtc variable
thread_list = []
for x in dfinput.rdd.collect():
    InputGUID = x['InputGUID']
    EnqueuedTimeUtc = x['EnqueuedTimeUtc']
    SrcNr = 10, 11, 13, 14
    thread = threading.Thread(target=threadAPIsave, args=(InputGUID, EnqueuedTimeUtc, SrcNr))
    thread_list.append(thread)

# start the threads
for thread in thread_list:
    thread.start()

# wait for all threads to finish
for thread in thread_list:
    thread.join()
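A side note on the design: the loop above spins up one thread per event with no upper bound. If the number of events per run grows, a ThreadPoolExecutor is one way to cap the concurrency. The sketch below is an illustrative alternative (with an arbitrary worker count), not what we actually ran:

from concurrent.futures import ThreadPoolExecutor

# illustrative alternative: at most 8 API calls in flight at once
rows = dfinput.rdd.collect()
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [
        pool.submit(threadAPIsave, x['InputGUID'], x['EnqueuedTimeUtc'], (10, 11, 13, 14))
        for x in rows
    ]
    for f in futures:
        f.result()   # re-raises any exception from a worker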
And to finish it all off, we return a timestamp marking when the latest load occurred, so the next run proceeds from there.
# return the new timestamp to pipeline
mssparkutils.notebook.exit(newLatestTime.collect()[0][0])
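On the pipeline side, the exit value can then be picked up from the Notebook activity's output (in Synapse this is an expression along the lines of @activity('NotebookName').output.status.Output.result.exitValue, where 'NotebookName' is whatever your activity is called) and fed back in as daysBackInTime for the next run.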
That's it. I didn't go into detail about everything; there are more pipelines and Notebooks for the Trusted and Curated repositories. Perhaps those can become blog posts in and of themselves (part 2, 3, ...).
Special thanks to Carl for lending me some space on his blog!
​
Best regards,