
SQL to Avro using Python 3

First we read from SQL Server using Python 3 and pyodbc.

Start by installing pyodbc from pypi.org with pip: “pip install pyodbc”. The Avro library used later in this post is installed the same way (“pip install avro-python3”). I created a file called getsql.py and used the AdventureWorksDW2017 sample database, downloaded from Microsoft, in this example. The function below returns the customer rows that will be used to create an Avro file; the query uses TOP 1 while testing, so remove it to export every customer.

import pyodbc

def customers():
    conn = pyodbc.connect('Driver={SQL Server};'
                          'Server=name of the sql server;'
                          'Database=AdventureWorksDW2017;'
                          'Trusted_Connection=yes;')
    custs = conn.cursor()

    customers = custs.execute("""SELECT top 1 CustomerKey,
                                 GeographyKey,
                                 CustomerAlternateKey,
                                 FirstName,
                                 COALESCE(MiddleName, '-') MiddleName,
                                 LastName,
                                 BirthDate,
                                 Gender,
                                 cast(YearlyIncome as float) YearlyIncome
                                 FROM DimCustomer
                                 """)
    return customers
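
If you want to sanity-check the query on its own, the rows pyodbc returns support attribute access by column name, which is what the Avro writer below relies on. A minimal sketch, assuming getsql.py is importable and the connection succeeds:

import getsql

#Each row is a pyodbc Row; columns are exposed as attributes
for customer in getsql.customers():
    print(customer.CustomerKey, customer.FirstName, customer.LastName)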

To create an Avro file, we first need to define a schema. The schema is written in JSON; it is used to validate the input data and can describe complex objects such as nested records and unions.

#JSON schema used to format the Avro file
schema = avro.schema.Parse(json.dumps({
    'namespace': 'AVRO',
    'type': 'record',
    'name': 'Customer',
    'fields': [
        {'name': 'CustomerKey', 'type': 'int'},
        {'name': 'GeographyKey', 'type': 'int'},
        {'name': 'CustomerAlternateKey', 'type': 'string'},
        {'name': 'FirstName', 'type': 'string'},
        {'name': 'MiddleName', 'type': 'string'},
        {'name': 'LastName', 'type': 'string'},
        {'name': 'BirthDate', 'type': 'string'},
        {'name': 'Gender', 'type': 'string'},
        {'name': 'YearlyIncome', 'type': 'float'}
    ]
}))
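
As an aside, the COALESCE(MiddleName, '-') in the SQL query is only needed because the schema declares MiddleName as a plain string. An Avro union can make the field nullable instead, so a NULL from SQL could be written as-is. A hedged alternative for that one entry in the fields list (not used in the rest of this post):

#Nullable variant: allows None to be appended for MiddleName
{'name': 'MiddleName', 'type': ['null', 'string'], 'default': None}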

The function CreateAvroFile() fetches the rows returned by getsql.customers() and writes them to Avro format row by row using a DataFileWriter together with the schema. Calling row_writer.close() flushes the writer and saves the file.

#Function to write an Avro file, iterating over the SQL rows from getsql
def CreateAvroFile():
    customers = getsql.customers()

    #Opens the file object using wb (write binary)
    file = open(filename, 'wb')
    datum_writer = DatumWriter()
    row_writer = DataFileWriter(file, datum_writer, schema)
    for customer in customers:
        row_writer.append({
            'CustomerKey': int(customer.CustomerKey),
            'GeographyKey': int(customer.GeographyKey),
            'CustomerAlternateKey': customer.CustomerAlternateKey,
            'FirstName': customer.FirstName,
            'MiddleName': customer.MiddleName,
            'LastName': customer.LastName,
            #pyodbc returns BirthDate as a date object; convert it to match the schema's string field
            'BirthDate': str(customer.BirthDate),
            'Gender': customer.Gender,
            'YearlyIncome': customer.YearlyIncome
            })
    row_writer.close()
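
One caution: if anything raises between open() and close(), for example a row that fails schema validation, the file handle stays open and the Avro container is left incomplete. A defensive sketch of the same write loop using a standard try/finally, not something the original code requires:

#Hypothetical variant of CreateAvroFile() that always closes the writer
def CreateAvroFileSafe():
    row_writer = DataFileWriter(open(filename, 'wb'), DatumWriter(), schema)
    try:
        for customer in getsql.customers():
            row_writer.append({
                'CustomerKey': int(customer.CustomerKey),
                'GeographyKey': int(customer.GeographyKey),
                'CustomerAlternateKey': customer.CustomerAlternateKey,
                'FirstName': customer.FirstName,
                'MiddleName': customer.MiddleName,
                'LastName': customer.LastName,
                'BirthDate': str(customer.BirthDate),
                'Gender': customer.Gender,
                'YearlyIncome': customer.YearlyIncome
                })
    finally:
        #Runs even on error, so the file handle is never leaked
        row_writer.close()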

To read the content of the file back we use the DataFileReader instead. Each record comes back as a Python dict, so as the print output shows, individual fields can be read using square brackets and the field name in quotes.

#Read and print function
def PrintOutResult():
    #Opens the file object using rb (read binary)
    fileObject = open(filename, "rb")
    datum_reader = DatumReader()
    filereader = DataFileReader(fileObject, datum_reader)
    #Iterates over the records to show two ways of printing the data
    for rec in filereader:
        print('Row for: ' + rec['FirstName'] + ' ' + rec['LastName'])
        print(rec)
    filereader.close()
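
Because every record is a plain dict of strings, ints, and floats, it can be handed straight to other tooling. For instance, a sketch of a hypothetical helper that re-serializes each record as one JSON line (json is imported in the complete script below):

#Hypothetical helper: print each Avro record as a JSON line
def DumpAsJsonLines():
    filereader = DataFileReader(open(filename, 'rb'), DatumReader())
    for rec in filereader:
        print(json.dumps(rec))
    filereader.close()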

The complete code:

#used version avro-python3 1.9.2.1
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader
import getsql
import json
import avro.schema
import os

filename = 'Customer.avro'
#Deletes the file if it already exists
if os.path.exists(filename):
    os.remove(filename)

 

#JSON schema used to format the Avro file
schema = avro.schema.Parse(json.dumps({
    'namespace': 'AVRO',
    'type': 'record',
    'name': 'Customer',
    'fields': [
        {'name': 'CustomerKey', 'type': 'int'},
        {'name': 'GeographyKey', 'type': 'int'},
        {'name': 'CustomerAlternateKey', 'type': 'string'},
        {'name': 'FirstName', 'type': 'string'},
        {'name': 'MiddleName', 'type': 'string'},
        {'name': 'LastName', 'type': 'string'},
        {'name': 'BirthDate', 'type': 'string'},
        {'name': 'Gender', 'type': 'string'},
        {'name': 'YearlyIncome', 'type': 'float'}
    ]
}))

 

#Function to write an Avro file, iterating over the SQL rows from getsql
def CreateAvroFile():
    customers = getsql.customers()

    #Opens the file object using wb (write binary)
    file = open(filename, 'wb')
    datum_writer = DatumWriter()
    row_writer = DataFileWriter(file, datum_writer, schema)
    for customer in customers:
        row_writer.append({
            'CustomerKey': int(customer.CustomerKey),
            'GeographyKey': int(customer.GeographyKey),
            'CustomerAlternateKey': customer.CustomerAlternateKey,
            'FirstName': customer.FirstName,
            'MiddleName': customer.MiddleName,
            'LastName': customer.LastName,
            #pyodbc returns BirthDate as a date object; convert it to match the schema's string field
            'BirthDate': str(customer.BirthDate),
            'Gender': customer.Gender,
            'YearlyIncome': customer.YearlyIncome
            })
    row_writer.close()

 

#Read and print function
def PrintOutResult():
    #Opens the file object using rb (read binary)
    fileObject = open(filename, "rb")
    datum_reader = DatumReader()
    filereader = DataFileReader(fileObject, datum_reader)
    #Iterates over the records to show two ways of printing the data
    for rec in filereader:
        print('Row for: ' + rec['FirstName'] + ' ' + rec['LastName'])
        print(rec)
    filereader.close()

 

CreateAvroFile()
PrintOutResult()
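
If the script grows, a common Python convention is to guard these two calls so they only run when the file is executed directly rather than imported; a sketch:

if __name__ == '__main__':
    CreateAvroFile()
    PrintOutResult()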
