
Connecting to an API and extracting event data into the RAW layer (part 1)

Hello there! My name is Lucas Ricci Lucchi and I had the pleasure of working alongside Carl, developing a Data Lakehouse (Delta Lake) from the ground up at a company that was moving its entire IT landscape into the cloud. It was a big project and everyone within said company was involved. My role was to build a solution that connects to an API and extracts data into the Data Lakehouse through its layers: Raw, Trusted and Curated. But more about that down below. Let’s dive in! (Get it? ....nvm.)


The client in question had a web-based booking system. Whenever a booking was made, an Azure Service Bus event was sent. We consumed the Service Bus messages with an Azure Function (Service Bus trigger).
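Just to make that concrete, here is a rough, hypothetical sketch of what such a Python Azure Function with a Service Bus trigger can look like. This is not the project's actual function; the queue name, connection setting and handler name are placeholders.

import logging
import azure.functions as func

app = func.FunctionApp()

# Hypothetical queue name and connection setting; the real function passed the
# booking event on so it could be captured into the raw layer.
@app.service_bus_queue_trigger(arg_name="msg",
                               queue_name="booking-events",
                               connection="ServiceBusConnection")
def booking_event_handler(msg: func.ServiceBusMessage):
    body = msg.get_body().decode("utf-8")
    logging.info("Received booking event: %s", body)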


This blog post will only focus on the Raw stage. I built pipelines in Azure Synapse Analytics to orchestrate the process. Perhaps in a future post we can look into the pipelines further, but for now let’s just skim through them. ‘Activities’ in the pipeline extract API credentials (parameters) from a DW table I hard coded. The authorization credentials were sent to the API, which granted an access token for downloading data.
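To give a feel for that step, the token exchange boils down to something like the sketch below. The endpoint path, payload fields and response key are made-up placeholders; in the real pipeline the credentials come from the hard-coded DW table via the pipeline activities.

import requests

# Hypothetical token endpoint and payload; the real values are pipeline parameters.
token_response = requests.post(
    'https://api.beforeprod.company/auth/token',
    json={'accountname': 'suitableNameOfChoice', 'apikey': '1e0c250bZZZz'}
)
Authorization = token_response.json()['access_token']  # bearer token used further down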

Among these pipeline activities I also had a Notebook activity. As you might have guessed, that activity executed my Notebook, which was written in PySpark.


The first cell in the Notebook handled the input parameters for the API, mentioned above. Since they were hard coded into a DW table, the fields can be left empty (‘’, “”). However, for illustration I left some examples of how they can look. 'Authorization' is the actual token we get from the API, and it is valid for a certain amount of time.

Authorization = 'ayAiOiJKV1QiLCJhbGcgRbbuU4519.ayMggTAt35SG6z4'
APIKey = '1e0c250bZZZz'
baseURL = "api.beforeprod.company"
account_name = 'suitableNameOfChoice'
daysBackInTime = '2022-01-25 07:00:00'

# setting paths from environment
eventhubname = 'event-hub-name-beforeprod'
relativeURL = "/typeofdata-api-befprod/Booking/Get"

TIP: The parameters can also be filled in manually if you only want to run individual cells or the Notebook on its own, so you can avoid running the entire pipeline that normally extracts said parameters from somewhere else via Activities. When the Notebook runs as part of the pipeline, however, the parameters are “filled in” with the values from the DW no matter what you have written in the code above.


Next, we import some libraries to make our lives easier.

import json
import datetime
from pyspark.sql.functions import *
from pyspark.sql import *
import requests
import re
import pytz
import threading
import time
from pyspark.sql.types import StringType, StructField, StructType

Adding the current local time and the current UTC time.

current_time = datetime.datetime.now(pytz.timezone('Europe/Stockholm'))
current_timeUtc = datetime.datetime.now(pytz.timezone('UTC'))

Now we proceed with using Authorization and getting that sweet, sweet data, as well as establishing file paths for our data and JSON files.
I added a missing, and thoroughly needed, datetime field to see when the event was created, utilizing EnqueuedTimeUtc. I needed a time field in order to connect my ‘input’ data (when a booking was made) with the event data (from Event Hubs), but also with other data that had its own pipelines and Notebooks. The date fields were important in this environment, for example when connecting data later on, and we needed to ensure data quality, so a datetime field was required.

def threadAPIsave(InputGUID: str, EnqueuedTimeUtc: str, SrcNr: list):
    """
    Makes the API call for a GUID and saves the result in the Raw repository.
    """
    data = {"InputIds": [InputGUID], "SrcIds": SrcNr}  # SrcNr comes from the caller (10, 11, 13, 14 in our case)

    # Get REST data
    url = 'https://' + baseURL + relativeURL + '/'
    heads = {
        'Content-Type': 'application/json',
        'User-Agent': 'PostmanRuntime/7.28.4',
        'Authorization': 'Bearer {}'.format(Authorization),
        'Ocp-Apim-Subscription-Key': "{key}".format(key=APIKey)
        }

    try:
        response = requests.post(url, headers=heads, json=data)
    except Exception as e:
        print("Execution error:", e)
        return

    df = spark.read.json(sc.parallelize([response.text]))

    # 1. Get Bus date and InputID to be inserted into a separate path:
    #    create a dataframe and fill it with InputGUID and EnqueuedTimeUtc
    columns = ["InputID", "EnqueuedTimeUtc"]
    data = [(InputGUID, EnqueuedTimeUtc)]
    dfBusDate = spark.createDataFrame(data).toDF(*columns)
    current_time = datetime.datetime.now(pytz.timezone('Europe/Stockholm'))

    # Sets the file path for the data files
    container_name = 'raw'
    filename = 'inputdata_' + current_time.strftime('%H%M%S') + '_' + InputGUID
    relative_path = 'FolderA/FolderB/FolderC/FolderD/FolderInput/' + str(current_time.strftime('%Y')) + '/' + str(current_time.strftime('%m')) + '/' + str(current_time.strftime('%d')) + '/' + str(current_time.strftime('%H')) + '/' + filename
    outputPath = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path)

    # 2. Sets the file path for inputEventJSON
    filename_bustime = 'inputEventJSON_' + current_time.strftime('%H%M%S') + '_' + InputGUID
    relative_path_bustime = 'FolderA/FolderB/FolderC/FolderD/FolderJSON/' + str(current_time.strftime('%Y')) + '/' + str(current_time.strftime('%m')) + '/' + str(current_time.strftime('%d')) + '/' + str(current_time.strftime('%H')) + '/' + filename_bustime
    outputPathBus = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path_bustime)

    # 3. Write the files, using coalesce to force the engine to write just one Input file.
    if (df.count() != 0):
        df.coalesce(1).write.mode('overwrite').json(outputPath)
        print("Input Data has been uploaded!")
        dfBusDate.coalesce(1).write.mode('overwrite').json(outputPathBus)
        print("inputEventJSON file has been uploaded")
    else:
        print('no print file. no data received')
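If you want to smoke-test the function outside the threading loop further down, you can call it directly with a single GUID and enqueued time. Both values below are made up, purely for illustration.

# Made-up GUID and EnqueuedTimeUtc, only to exercise threadAPIsave once.
threadAPIsave('0a1b2c3d-0000-1111-2222-333344445555',
              '01/25/2022 07:15:00 AM',
              [10, 11, 13, 14])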

The events are stored in Avro format in their own event folder, so next we read the latest of them from there.

# get the input events -> create a stream (later; for now, iterate over what is there)
container_name = 'raw'

# help navigating the folder tree
dyear = datetime.datetime.strptime(daysBackInTime, '%Y-%m-%d %H:%M:%S').strftime('%Y')
dmonth = datetime.datetime.strptime(daysBackInTime, '%Y-%m-%d %H:%M:%S').strftime('%m')
dday = datetime.datetime.strptime(daysBackInTime, '%Y-%m-%d %H:%M:%S').strftime('%d')
dhour = datetime.datetime.strptime(daysBackInTime, '%Y-%m-%d %H:%M:%S').strftime('%H')

# just look at the latest hour's files, to save time
event_path = 'FolderA/FolderB/FolderC/FolderD/FolderInputEvent/' + eventhubname + '/inputevent/*/' + dyear + '/' + dmonth + '/' + dday + '/' + dhour + '/*/*.avro'

eventPath = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, event_path)

# read the events in the folder - but only events newer than the latest event
try:
    dfread = spark.read.format('avro').load(eventPath).where(to_timestamp('EnqueuedTimeUtc', 'MM/dd/yyyy hh:mm:ss a') > daysBackInTime)
except:
    # if the hourly folder does not exist, look at day level; this solves midnight issues.
    event_path = 'FolderA/FolderB/FolderC/FolderD/FolderInputEvent/' + eventhubname + '/inputevent/*/' + dyear + '/' + dmonth + '/' + dday + '/*/*/*.avro'
    eventPath = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, event_path)
    dfread = spark.read.format('avro').load(eventPath).where(to_timestamp('EnqueuedTimeUtc', 'MM/dd/yyyy hh:mm:ss a') > daysBackInTime)

# add a datetime-formatted column to do the time comparisons
dfWithTimeStamp = dfread.withColumn('timestamp', to_timestamp('EnqueuedTimeUtc', 'MM/dd/yyyy hh:mm:ss a'))

# if we didn't have any new events, set the timestamp for the next iteration to the current UTC timestamp.
# This allows for hour/day/year changes. Else take the latest timestamp from the queue.
if dfWithTimeStamp.rdd.isEmpty():
    row = [[str(current_timeUtc.strftime('%Y-%m-%d %H:%M:%S'))]]
    column = ['currentTime']
    newLatestTime = spark.createDataFrame(row, column)
else:
    newLatestTime = dfWithTimeStamp.agg({'timestamp': 'max'})

# extract the InputGUID to an InputGUID column, keeping one row per GUID
dfInput = dfWithTimeStamp\
    .withColumn('InputGUID', regexp_extract('Body', '[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}', 0))\
    .withColumn('rowno', row_number().over(Window.partitionBy('InputGUID').orderBy('InputGUID')))\
    .where('rowno = 1')\
    .select('InputGUID', 'EnqueuedTimeUtc')
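To show what the regexp_extract step actually does, here is a tiny, made-up example. The real Body is the serialized event from the Avro files, but the idea is the same: pull out the first GUID-shaped substring.

# Made-up message body, only to illustrate the GUID pattern matching.
demo = spark.createDataFrame(
    [('{"BookingId": "0a1b2c3d-4e5f-6789-abcd-0123456789ab"}',)], ['Body'])
demo.withColumn('InputGUID',
    regexp_extract('Body', '[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}', 0)
).show(truncate=False)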

Threading step.

# loop through the events, "Get" the inputs and save the JSONs.
# Added the EnqueuedTimeUtc variable.

thread_list = []

for x in dfInput.rdd.collect():
    InputGUID = x['InputGUID']
    EnqueuedTimeUtc = x['EnqueuedTimeUtc']
    SrcNr = [10, 11, 13, 14]
    thread = threading.Thread(target=threadAPIsave, args=(InputGUID, EnqueuedTimeUtc, SrcNr))
    thread_list.append(thread)

# start the threads
for thread in thread_list:
    thread.start()

# wait for all threads to finish
for thread in thread_list:
    thread.join()
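A small design note: every GUID gets its own thread here. If the number of events per run ever grows large, a bounded thread pool is a drop-in alternative; below is a minimal sketch assuming the same threadAPIsave function, with an arbitrary worker limit.

from concurrent.futures import ThreadPoolExecutor

# Cap the number of simultaneous API calls (the limit of 8 is arbitrary).
with ThreadPoolExecutor(max_workers=8) as pool:
    for x in dfInput.rdd.collect():
        pool.submit(threadAPIsave, x['InputGUID'], x['EnqueuedTimeUtc'], [10, 11, 13, 14])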

And to finish it all off, we return the timestamp of the latest load to the pipeline, so the next run proceeds from there.

# return the new timestamp to pipeline

mssparkutils.notebook.exit(newLatestTime.collect()[0][0])

That’s it. I didn’t go into detail about everything; there are more pipelines and Notebooks for the Trusted and Curated layers. Perhaps those can be blog posts in and of themselves (part 2, 3, ..).

 

Special thanks to Carl for lending me some space on his blog!


Best regards,

Lucas Ricci Lucchi
