SQL to AVRO using Python3
First, read the data from SQL Server using Python 3 and pyodbc.
Start by installing pyodbc from pypi.org with pip: "pip install pyodbc". I created a file called getsql.py and used the AdventureWorksDW2017 sample database, downloaded from Microsoft, in this example. The customers() function below returns the customer rows that will be used to create the Avro file.
import pyodbc
def customers():
    """Return the customer rows read from DimCustomer in AdventureWorksDW2017.

    The query selects only the first row (``top 1``) and replaces a NULL
    MiddleName with '-'; YearlyIncome is cast to float on the server.

    Returns:
        A list of pyodbc rows; each column is available as an attribute
        (e.g. ``row.FirstName``). All rows are fetched into memory before
        the connection is closed.
    """
    conn = pyodbc.connect('Driver={SQL Server};'
                          'Server=name of the sql server;'
                          'Database=AdventureWorksDW2017;'
                          'Trusted_Connection=yes;')
    try:
        custs = conn.cursor()
        custs.execute("""SELECT top 1 CustomerKey,
GeographyKey,
CustomerAlternateKey,
FirstName,
COALESCE(MiddleName, '-') MiddleName,
LastName,
BirthDate,
Gender,
cast(YearlyIncome as float) YearlyIncome
FROM DimCustomer
""")
        # Materialise the rows so the connection can be released here
        # instead of staying open for as long as the caller keeps a cursor.
        return custs.fetchall()
    finally:
        conn.close()
To create an Avro file, we first need to define a schema. The schema is written in JSON; it is used to validate the input data and can also describe complex (nested) types.
# Avro record schema for one customer row. It is built as a Python dict,
# serialised to JSON and parsed by avro; the field order follows the
# column order of the SQL query.
schema = avro.schema.Parse(json.dumps({
    'namespace': 'AVRO',
    'type': 'record',
    'name': 'Customer',
    'fields': [
        {'name': column, 'type': avro_type}
        for column, avro_type in (
            ('CustomerKey', 'int'),
            ('GeographyKey', 'int'),
            ('CustomerAlternateKey', 'string'),
            ('FirstName', 'string'),
            ('MiddleName', 'string'),
            ('LastName', 'string'),
            ('BirthDate', 'string'),
            ('Gender', 'string'),
            ('YearlyIncome', 'float'),
        )
    ],
}))
The function CreateAvroFile() loads the result of getsql.customers() into the variable customers. The rows are then written to the Avro file one at a time with DataFileWriter, using the schema. Calling row_writer.close() flushes the data and saves the file.
#Function to write Avro file iterated over SQL input in getsql
def CreateAvroFile():
    """Write every row from getsql.customers() to the Avro file `filename`.

    Each row is converted to a dict keyed by the schema's field names and
    appended with DataFileWriter. The output file is opened in a ``with``
    block so the handle is released even if a row is rejected.
    """
    customers = getsql.customers()
    # 'wb' (write binary): the Avro container format is binary.
    with open(filename, 'wb') as file:
        row_writer = DataFileWriter(file, DatumWriter(), schema)
        for customer in customers:
            birth_date = customer.BirthDate
            # Depending on the ODBC driver, a DATE column may arrive as a
            # datetime.date instead of text; the schema declares a string,
            # so convert it to ISO format. Strings pass through unchanged.
            if hasattr(birth_date, 'isoformat'):
                birth_date = birth_date.isoformat()
            row_writer.append({
                'CustomerKey': int(customer.CustomerKey),
                'GeographyKey': int(customer.GeographyKey),
                'CustomerAlternateKey': customer.CustomerAlternateKey,
                'FirstName': customer.FirstName,
                'MiddleName': customer.MiddleName,
                'LastName': customer.LastName,
                'BirthDate': birth_date,
                'Gender': customer.Gender,
                'YearlyIncome': customer.YearlyIncome,
            })
        # Finalise the Avro container before the file handle is released.
        row_writer.close()
To read the contents of the file, we use DataFileReader instead. As the print statements show, the fields of each record can be accessed by name using square brackets and a quoted field name, e.g. rec['FirstName'].
#Read and print function
def PrintOutResult():
    """Read the Avro file `filename` and print every record it contains.

    For each record, first a line built from its FirstName and LastName
    fields is printed, then the whole record. The file is opened in a
    ``with`` block so the handle is released even if reading fails.
    """
    # 'rb' (read binary): the Avro container format is binary.
    with open(filename, "rb") as fileObject:
        filereader = DataFileReader(fileObject, DatumReader())
        # Two ways of showing the data: selected fields, then the full record.
        for rec in filereader:
            print('Row for: ' + rec['FirstName'] + ' ' + rec['LastName'])
            print(rec)
        filereader.close()
The complete code:
#used version avro-python3 1.9.2.1
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader
import getsql
import json
import avro.schema
import os
# Path of the Avro file that this script writes and then reads back.
filename = 'Customer.avro'
# Remove any file left over from a previous run. Attempting the removal and
# ignoring "not found" avoids the race between an exists() check and the
# actual remove().
try:
    os.remove(filename)
except FileNotFoundError:
    pass
# Avro record schema for one customer row. It is built as a Python dict,
# serialised to JSON and parsed by avro; the field order follows the
# column order of the SQL query.
schema = avro.schema.Parse(json.dumps({
    'namespace': 'AVRO',
    'type': 'record',
    'name': 'Customer',
    'fields': [
        {'name': column, 'type': avro_type}
        for column, avro_type in (
            ('CustomerKey', 'int'),
            ('GeographyKey', 'int'),
            ('CustomerAlternateKey', 'string'),
            ('FirstName', 'string'),
            ('MiddleName', 'string'),
            ('LastName', 'string'),
            ('BirthDate', 'string'),
            ('Gender', 'string'),
            ('YearlyIncome', 'float'),
        )
    ],
}))
#Function to write Avro file iterated over SQL input in getsql
def CreateAvroFile():
    """Write every row from getsql.customers() to the Avro file `filename`.

    Each row is converted to a dict keyed by the schema's field names and
    appended with DataFileWriter. The output file is opened in a ``with``
    block so the handle is released even if a row is rejected.
    """
    customers = getsql.customers()
    # 'wb' (write binary): the Avro container format is binary.
    with open(filename, 'wb') as file:
        row_writer = DataFileWriter(file, DatumWriter(), schema)
        for customer in customers:
            birth_date = customer.BirthDate
            # Depending on the ODBC driver, a DATE column may arrive as a
            # datetime.date instead of text; the schema declares a string,
            # so convert it to ISO format. Strings pass through unchanged.
            if hasattr(birth_date, 'isoformat'):
                birth_date = birth_date.isoformat()
            row_writer.append({
                'CustomerKey': int(customer.CustomerKey),
                'GeographyKey': int(customer.GeographyKey),
                'CustomerAlternateKey': customer.CustomerAlternateKey,
                'FirstName': customer.FirstName,
                'MiddleName': customer.MiddleName,
                'LastName': customer.LastName,
                'BirthDate': birth_date,
                'Gender': customer.Gender,
                'YearlyIncome': customer.YearlyIncome,
            })
        # Finalise the Avro container before the file handle is released.
        row_writer.close()
#Read and print function
def PrintOutResult():
    """Read the Avro file `filename` and print every record it contains.

    For each record, first a line built from its FirstName and LastName
    fields is printed, then the whole record. The file is opened in a
    ``with`` block so the handle is released even if reading fails.
    """
    # 'rb' (read binary): the Avro container format is binary.
    with open(filename, "rb") as fileObject:
        filereader = DataFileReader(fileObject, DatumReader())
        # Two ways of showing the data: selected fields, then the full record.
        for rec in filereader:
            print('Row for: ' + rec['FirstName'] + ' ' + rec['LastName'])
            print(rec)
        filereader.close()
# Script entry point: write the Avro file first, then read it back and
# print its records. The order matters because PrintOutResult() opens the
# file that CreateAvroFile() produces.
CreateAvroFile()
PrintOutResult()