Solved Problem: A Story About Big Data in a Little Space

Python    2012-07-02

As part of the API project for this iPad app I'm working on, we have to pull down, filter and ingest thousands of GB of song data from a major music information source. This data is offered to commercial partners in two formats - a relational database or a collection of plain text files. They don't offer any tools for filtering - you have to take all of the data and then whittle it down to just what you need.

The relational database turned out not to be an option. It comes in four separate tables, but includes metadata for EVERYTHING this source provides - apps, songs, books, TV shows, etc. While it would have been easier to manage the filtering via SQL, it would have meant downloading the entire database and then eliminating what we didn't need. In a truly bonehead error in judgement, I did try pulling the database pieces down to our dev db server, not realizing how full it already was. A few bad things happened, mostly tools that broke down, but it was all on staging so no real harm done. Long story short, we didn't have adequate server space to go this route, and putting in a dedicated db server just for this project was not an option due to budget limitations. This may not be a problem for you.

So I took a look at the text-based option. Text files were at least available per object type - that is, I could get text files listing music records only, as opposed to having to take app, book, and other data as well. But the text files are split by region, and as of this writing, this music data source supports 51 different regions (they just added another 32, but that change hasn't been reflected in this flat file output yet). The current text file for each region is about 2GB zipped and 10GB expanded. Again, due to budget and various other limitations, I'm processing these files on a staging box, so space is a concern. I couldn't download, expand and work with all 51 files at once, as much as I would have preferred to do that.

So what I came up with was a rotation system. This data is not something that changes rapidly - at least not the portion of it that we use - and updating our own tables once a month was perfectly acceptable. I set up a dictionary that maps each region code to a day of the month, two regions per day (with 51 regions, the last one gets a day to itself).


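import os, smtplib, datetime
from shutil import rmtree
from time import strftime
from email.mime.text import MIMEText

# 'now' is used by getregions() and sendMail() below
now = datetime.datetime.today()
# base filepath on the server where everything is stored (placeholder path)
savepath = '/your/home/path/musicdata/'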

allregions = {
	'arg': 1, 'aus': 1, 'aut': 2, 'bel': 2,
	'bgr': 3, 'bol': 3, 'bra': 4, 'can': 4,
	'che': 5, 'chl': 5, 'col': 6, 'cri': 6,
	'cyp': 7, 'cze': 7, 'deu': 8, 'dnk': 8,
	'dom': 9, 'ecu': 9, 'esp': 10, 'est': 10,
	'fin': 11, 'fra': 11, 'gbr': 12, 'grc': 12,
	'gtm': 13, 'hnd': 13, 'hun': 14, 'irl': 14,
	'ita': 15, 'jpn': 15, 'ltu': 16, 'lux': 16,
	'lva': 17, 'mex': 17, 'mlt': 18, 'nic': 18,
	'nld': 19, 'nor': 19, 'nzl': 20, 'pan': 20,
	'per': 21, 'pol': 21, 'prt': 22, 'pry': 22,
	'rou': 23, 'slv': 23, 'svk': 24, 'svn': 24,
	'swe': 25, 'ven': 25, 'usa': 26,
}

def getregions(regions):
	""" 
	For each region, check if the current day of the month 
	matches the number listed in the region dict
	"""
	regionlist = []
	for key, value in regions.iteritems():
		if value == now.day:
			regionlist.append(key)
	return regionlist
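
As a quick way to sanity-check the rotation, here is a minimal sketch (written for Python 3, unlike the Python 2 code in this post) that looks up the regions for a given day and inverts the schedule. The names `sample_regions`, `regions_for_day` and `schedule_by_day` are hypothetical, and the sample dict is only a small slice of the full one above.

```python
from collections import defaultdict

# Small sample of the schedule; the full dict above maps all 51 regions.
sample_regions = {'arg': 1, 'aus': 1, 'aut': 2, 'bel': 2, 'usa': 26}


def regions_for_day(regions, day):
    """Return the region codes scheduled for the given day of the month, sorted."""
    return sorted(code for code, scheduled in regions.items() if scheduled == day)


def schedule_by_day(regions):
    """Invert the dict: day of month -> sorted list of region codes."""
    by_day = defaultdict(list)
    for code, day in regions.items():
        by_day[day].append(code)
    return {day: sorted(codes) for day, codes in by_day.items()}
```

Passing the day in as a parameter, rather than reading a global `now`, makes it easy to check any day of the month, including days with nothing scheduled.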


Obviously, this is set up via cron to run daily. As each of the current day's regions is identified, I pull down the corresponding zipped text file, unpack it, extract the records I need and then discard the source (as it is too large to store permanently):


def getzipfiles(regionlist):
	"""
	Get the zip files for the current region
	Unpack each one, extract artist records to a new file
	Cleanup - remove old files and folders
	"""

	# regionlist is the current day's regions, e.g. ['arg', 'aus',]
	for region in regionlist:
		# this savepath var is the base filepath on the server where I'm storing everything
		path = savepath + region + "/"

		# attempt to get all of the current zipped files (extension is .tbz) for this region
		os.system("wget -r -nH -nd -P"+ path + " -A.tbz http://user:password@domain.com/feed/path/"+ region +"/current/")

		"""
		One of the unfortunate things I discovered in working with wget is that 
		I could pull files with a specific extension, but didn't find a way to 
		match a specific pattern in the file name.  (wget's documentation says -A 
		also accepts wildcard patterns such as 'song-*.tbz', which may remove the 
		need for this step.)  So until I find a better way, I'm downloading 
		all of the tbz files in the region directory, then identifying and working 
		with only the ones with the expression 'song-' prepended.
		"""
		newregionfile = ''
		for root, subFolders, files in os.walk(path):
			for file in files:
				if file[0:8] == 'song-'+region and file[-4:] == '.tbz':
					newregionfile = file

		if newregionfile:
			# unpack the file, give it a new, unique name, and move it to a 
			# folder where I can work on it without impacting other region files
			os.system("tar xjvf " + path + newregionfile + " -C " + path)
			source = path + newregionfile[:-4] + "/song-" + region + ".txt"
			target = savepath + "song-" + region + "-MYARTIST.txt"
			os.system("mv " + source + " " + target)

			# extract any lines containing the string 'MYARTIST' and put them in a new file
			parsetxtfile(target)

			# remove the folder created by unpacking the tar
			rmpath = path + newregionfile[:-4] + "/"
			rmtree(rmpath)

			# remove the downloaded source files
			os.system("rm -f " + path + "*")

			# email myself to let me know whether the file was processed or not
			region = region.upper()
			sendMail("new file for " + region + " - " + newregionfile)
		else:
			region = region.upper()
			sendMail("no new file for " + region)
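
The find-and-unpack step can also be done without shelling out. The following is a rough sketch (Python 3, not the code used in this project) that picks the song archive by file-name pattern with `fnmatch` and streams a single member out of the bzip2 tar with `tarfile`. The function names are hypothetical, and the archive layout (a folder containing `song-<region>.txt`) is assumed from the code above.

```python
import fnmatch
import os
import shutil
import tarfile


def find_song_archive(path, region):
    """Return the name of a 'song-<region>*.tbz' file directly inside path, or None."""
    pattern = 'song-' + region + '*.tbz'
    matches = sorted(fnmatch.filter(os.listdir(path), pattern))
    # If several archives match, take the last in sorted order
    # (an arbitrary choice made for this sketch).
    return matches[-1] if matches else None


def extract_song_file(archive_path, dest_dir, region):
    """Copy only song-<region>.txt out of the archive; return its new path or None."""
    wanted = 'song-' + region + '.txt'
    with tarfile.open(archive_path, 'r:bz2') as archive:
        for member in archive:
            if member.isfile() and os.path.basename(member.name) == wanted:
                target = os.path.join(dest_dir, wanted)
                # Stream the member to disk instead of loading it into memory.
                with archive.extractfile(member) as src, open(target, 'wb') as dst:
                    shutil.copyfileobj(src, dst)
                return target
    return None
```

Because only the one text file is written out, there is no unpacked folder to remove afterwards, which may matter when disk space is tight.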


This is the parse function referenced above. For each valid source file, it extracts only those records where the third col ('artist') matches the name of the band I need ('MYARTIST' for this example), reformats the release dates so that I can compare them to current records in the db, writes the resulting records to a new file, then deletes the original source file:


def parsetxtfile(sourcefile):
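	# the file is named song-<region>-MYARTIST.txt, so the region code is the
	# second dash-separated piece (a fixed slice breaks if the artist name changes length)
	region = os.path.basename(sourcefile).split('-')[1]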
	# name the new files to be created
	targetone = sourcefile.replace('.txt', '-int1.txt')
	targettwo = sourcefile.replace('.txt', '-int2.txt')
	targetfile = sourcefile.replace('.txt', '-final.txt')

	# just reading lines in the source file eats up too much memory, 
	# so the first thing that must be done is to strip out just the lines you need
	os.system("grep -w ' MYARTIST ' " + sourcefile + " > " + targetone)
	os.system("sed 's/\"//g' " + targetone + " > " + targettwo)

	ft = open(targetfile, 'w')
	# the grep/sed output is small, so walking it line by line is cheap
	fh = open(targettwo, 'r')
	for i in fh:
		cols = i.split('\t')
		if cols[2] == "MYARTIST":
			r = '\t'.join(cols[:17])
			original_release_date = cols[17].replace(' ', '-') + " 00:00:00"
			release_date = cols[18].replace(' ', '-') + " 00:00:00"
			record = r + '\t' + original_release_date + '\t' + release_date
			record = record + '\t' + '\t' + region.upper() + '\t' + '\n'
			ft.write(record)
	fh.close()
	ft.close()

	os.system("rm -f " + sourcefile)
	os.system("rm -f " + targetone)
	os.system("rm -f " + targettwo)

	# and again, send myself a notice
	sendMail("files ready for insert: " + targetfile)
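
The memory problem mentioned in the comments comes from reading every line at once. Iterating over the file object reads one line at a time, so the filtering can be sketched in pure Python as an alternative to grep and sed. This is a Python 3 sketch under the same column assumptions as the function above (artist in the third column, dates in columns 18 and 19), and `filter_artist_rows` is a hypothetical name.

```python
def filter_artist_rows(source_path, target_path, artist, region):
    """Stream source_path and write reformatted rows for one artist.

    Returns the number of rows written to target_path.
    """
    written = 0
    with open(source_path, 'r') as source, open(target_path, 'w') as target:
        for line in source:  # the file object yields one line at a time
            # Strip the newline and double quotes, then split on tabs.
            fields = line.rstrip('\n').replace('"', '').split('\t')
            if len(fields) < 19 or fields[2] != artist:
                continue
            # 'YYYY MM DD' -> 'YYYY-MM-DD 00:00:00' for both date columns.
            dates = [fields[n].replace(' ', '-') + ' 00:00:00' for n in (17, 18)]
            # Same layout as above: 17 columns, 2 dates, empty column, region, empty column.
            record = fields[:17] + dates + ['', region.upper(), '']
            target.write('\t'.join(record) + '\n')
            written += 1
    return written
```

Whether this is fast enough on a 10GB file compared with grep is something to measure rather than assume.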


And in case you're curious, my simple sendMail function:


def sendMail(message, addmessage='', filename=''):
    """ send notification of file activity """

    emailto = ['my.address@at.com',]
    fromaddr = "staging "
    toaddrs  = emailto

    msg = MIMEText(message + addmessage + " " + filename)
    # build the subject once - assigning msg['Subject'] a second time
    # adds another Subject header instead of replacing the first
    subject = "Music data build - " + now.strftime("%Y-%m-%d")
    msg['Subject'] = subject + " - " + message + addmessage
    msg['From'] = fromaddr
    msg['To'] = ", ".join(toaddrs)

    server = smtplib.SMTP('localhost')
    server.sendmail(fromaddr, toaddrs, msg.as_string())
    server.quit()
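
Separating the message building from the sending makes the notification easier to check without an SMTP server. A minimal Python 3 sketch follows. `build_message` and its `sender`, `recipients` and `today` parameters are hypothetical, and the optional arguments mirror the one-argument calls used earlier.

```python
import datetime
from email.mime.text import MIMEText


def build_message(message, addmessage='', filename='', sender='staging',
                  recipients=('my.address@at.com',), today=None):
    """Build (but do not send) the notification email."""
    today = today or datetime.date.today()
    summary = message + addmessage
    # Skip empty parts so the body has no stray trailing space.
    body = ' '.join(part for part in (summary, filename) if part)
    msg = MIMEText(body)
    # Set Subject exactly once; a second assignment would add a second header.
    msg['Subject'] = 'Music data build - %s - %s' % (today.strftime('%Y-%m-%d'), summary)
    msg['From'] = sender
    msg['To'] = ', '.join(recipients)
    return msg
```

The returned object can then be handed to `smtplib.SMTP(...).sendmail(sender, list(recipients), msg.as_string())` as in the function above.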


So this runs daily, from the 1st through the 26th of each month. Then on the 27th, I run another series of scripts** that packages all that extracted data, compares it to what we already have in the database, decides what's new, what needs to be updated, etc.

** This last series of scripts has to be run manually for a few reasons - the staging server I have to use to process all this data doesn't have direct write access to the production database server, and in spite of how much of this process is automated, the new inserts and updates still need to be reviewed by a human, someone familiar with this recording artist's music, to make sure there are no errors in the music source data. (You'd be surprised how often I spot errors - mostly songs that are attributed to this artist that shouldn't be. Unfortunately, there's just no substitute for a real, live human in this case.)

The first of these scripts takes a walk through the main folder, identifies all of those target files, and inserts the data in them into a holding table:


import os, smtplib, datetime, MySQLdb as Database
from time import strftime
from email.mime.text import MIMEText

now = datetime.datetime.today()
musicdatafolder = '/your/home/path/musicdata/'
outputfile = "/your/home/path/musicdata_deletions.txt"

def findfiles():
    """
    do a non-recursive walk of the main folder
    identify eligible files
    """
    files = []
    for file in os.listdir(musicdatafolder):
        fullpathname = os.path.join(musicdatafolder, file)
        if os.path.isfile(fullpathname):
            x = len(musicdatafolder)
            name = fullpathname[x:]
            if name[8:] == '-MYARTIST-final.txt':
                files.append(file)
    return files

def do_inserts(files):
    """ 
    parse the eligible music data files and 
    insert the records into a holding table on dev
    """

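    # list.append() takes a single item, so use extend() to add several names.
    # note: the INSERT below supplies 21 values per row, so this list
    # needs one column name for each of them
    columns = []
    columns.extend(['song_name','album_name','artist_name','composer_name'])
    columns.extend(['isrc','upc','song_price','album_price','country_code'])
    insertcolumns = ", ".join(columns)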

    db = Database.connect("ip", "user", "pwd", "db")
    cursor = db.cursor()

    sqlTruncate = """TRUNCATE MYARTIST.music_data_holding"""
    cursor.execute(sqlTruncate)

    for file in files:
        sourcefile = musicdatafolder + file

        print sourcefile

        fh = open(sourcefile,'r').readlines()
        for i in fh:
            insertrow = '", "'.join(i.split('\t')[x] for x in range(21))
            sql = """INSERT INTO MYARTIST.music_data_holding (%s) 
                    VALUES ("%s")""" %(insertcolumns, insertrow)
            # print sql
            cursor.execute(sql)

        print "rm -f " + sourcefile
        # os.system("rm -f " + sourcefile)

    # MySQLdb leaves autocommit off by default, so commit before closing
    db.commit()
    cursor.close()
    db.close()
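
Building the INSERT by joining values into the SQL string breaks as soon as a value contains a double quote. A parameterized insert avoids that. The sketch below (Python 3) uses the standard library's `sqlite3` as a stand-in so it can run anywhere; it is not the MySQL setup used in this project. `COLUMNS` is a shortened, hypothetical column list, and with MySQLdb the placeholder would be `%s` rather than `?`.

```python
import sqlite3

# Shortened, hypothetical column list for the demo table.
COLUMNS = ['song_name', 'album_name', 'artist_name']


def insert_rows(connection, rows):
    """Insert rows using placeholders so the driver handles quoting."""
    placeholders = ', '.join('?' for _ in COLUMNS)
    sql = 'INSERT INTO music_data_holding (%s) VALUES (%s)' % (
        ', '.join(COLUMNS), placeholders)
    connection.executemany(sql, rows)
    connection.commit()


def demo():
    """Load one tab-separated line into an in-memory table and read it back."""
    connection = sqlite3.connect(':memory:')
    connection.execute(
        'CREATE TABLE music_data_holding '
        '(song_name TEXT, album_name TEXT, artist_name TEXT)')
    line = 'Song "One"\tSome Album\tMYARTIST\n'
    fields = line.rstrip('\n').split('\t')[:len(COLUMNS)]
    insert_rows(connection, [fields])
    return connection.execute(
        'SELECT song_name, album_name, artist_name '
        'FROM music_data_holding').fetchall()
```

Only the column names are formatted into the statement; the values travel separately, so embedded quotes survive intact.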


Next, I compare what's in the holding table to what's in our existing music data - any new record with an album name that we don't already have is marked for deletion. (When this artist releases a new album, I know about it - but just in case, that's another reason for the manual review.)


def recommend_deletions():
    """
    recommend records that should be deleted from the holding table
    this data needs to be reviewed manually
    """
    db = Database.connect("ip", "user", "pwd", "db")
    cursor = db.cursor()

    sql = """SELECT music_data_holding.id, music_data_holding.song_name, 
            music_data_holding.album_name, music_data_holding.country_code 
            FROM music_data_holding WHERE 
            music_data_holding.album_name NOT IN (SELECT DISTINCT album_name 
            FROM music_data)"""
    cursor.execute(sql)
    mismatch = cursor.fetchall()

    ofile  = open(outputfile, "wb")
    ofile.writelines('ID'+'\t'+'Song Name'+'\t'+'Album Name'+'\t'+'Country Code'+'\n')

    delete_list = []
    for record in mismatch:
        ofile.writelines(str(record[0])+'\t'+str(record[1])+'\t')
        ofile.writelines(str(record[2])+'\t'+str(record[3])+'\n')
        delete_list.append(record[0])

    ofile.close()
    cursor.close()
    db.close()

    deletions = ','.join(map(str, delete_list))
    message = "DELETE FROM music_data_holding WHERE music_data_holding.id "
    message = message + "IN (" + str(deletions) + ");" + "\r\n"
    message = message + "Review attachment for new songs before deleting." + "\r\n"
    message = message + "After deletions, run musicdata_update.py." + "\r\n"
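    # assumption: send_mail takes (message, label, attachment) as in the other scripts
    send_mail(message, 'Recommended Deletions', outputfile)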

    return delete_list
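
One edge case in the generated SQL: if nothing is flagged, the statement ends in `IN ()`, which is not valid. A small guard can be sketched like this (Python 3, with `build_delete_sql` a hypothetical helper name):

```python
def build_delete_sql(ids):
    """Return the DELETE statement for the given ids, or None if there are none."""
    # int() rejects anything that is not a whole number before it reaches the SQL text.
    clean = [str(int(record_id)) for record_id in ids]
    if not clean:
        return None
    return ('DELETE FROM music_data_holding WHERE music_data_holding.id '
            'IN (%s);' % ','.join(clean))
```

The caller can then skip the DELETE line in the email, or say explicitly that there is nothing to delete.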


The next script identifies records that might be new or are updates to existing songs. This one generates two emails - both with attachments for me to review, both with SQL already printed out for me to run once I've completed the manual review.


import os, smtplib, datetime, MySQLdb as Database
from time import strftime
from email.mime.text import MIMEText

now = datetime.datetime.today()
outfile1 = "/your/home/path/musicdata_newrecords.txt"
outfile2 = "/your/home/path/musicdata_updates.txt"

def find_new_records():
    """
    determine which records are new, not already in music_data
    these need to be reviewed, inserted into the database
    """
    db = Database.connect("ip", "user", "pwd", "db")
    cursor = db.cursor()

    sql = """SELECT music_data_holding.id, music_data_holding.country_code, 
            music_data_holding.song_name, music_data_holding.album_name  
            FROM music_data_holding WHERE CONCAT(music_data_holding.song_name, 
            ' - ', music_data_holding.album_name, ' - ', 
            music_data_holding.country_code) NOT IN (SELECT DISTINCT 
            CONCAT(music_data.song_name, ' - ', music_data.album_name, ' - ', 
            music_data.country_code) FROM music_data) ORDER BY 
            music_data_holding.country_code"""
    cursor.execute(sql)
    newrecords = cursor.fetchall()

    ofile  = open(outfile1, "wb")
    ofile.writelines('ID' + '\t' + 'Region' + '\t' + 'Song Name' + '\t' + 'Album Name' + '\n')
    insert_ids = []
    for record in newrecords:
        ofile.writelines(str(record[0])+'\t'+str(record[1])+'\t')
        ofile.writelines(str(record[2])+'\t'+str(record[3])+'\n')
        insert_ids.append(record[0])
    ofile.close()

    cursor.close()
    db.close()

    new_ids = ','.join(map(str, insert_ids))
    # adjacent string literals inside parentheses are joined into one string
    message = ("INSERT INTO music_data (song_name, album_name, artist_name, "
               "composer_name, isrc, upc, song_price, album_price, country_code, "
               "active) SELECT song_name, album_name, artist_name, composer_name, "
               "isrc, upc, song_price, album_price, country_code, '1' "
               "FROM music_data_holding WHERE music_data_holding.id IN "
               "(" + str(new_ids) + ");" + "\r\n")
    message = message + "Review attachment before inserting new records to music_data." + "\r\n"
    message = message + "After completing inserts and updates, run musicdata_regions.py" + "\r\n"
    send_mail(message, 'New Records', outfile1)

def find_update_records():
    """
    determine which records are potential updates, versions already in music_data
    these need review only - no mapping necessary
    """
    db = Database.connect("ip", "user", "pwd", "db")
    cursor = db.cursor()

    sql = """SELECT * FROM music_data_holding 
            WHERE CONCAT(music_data_holding.song_name, ' - ', 
            music_data_holding.album_name, ' - ', music_data_holding.country_code) 
            IN (SELECT DISTINCT CONCAT(music_data.song_name, ' - ', 
            music_data.album_name, ' - ', music_data.country_code) FROM 
            music_data) ORDER BY music_data_holding.country_code"""
    cursor.execute(sql)
    updaterecords = cursor.fetchall()

    ofile = open(outfile2, "wb")
    ofile.writelines('id'+'\t'+'song_name'+'\t'+'album_name'+'\t'+'artist_name'+'\t')
    ofile.writelines('composer_name'+'\t'+'isrc'+'\t'+'upc'+'\t'+'song_price'+'\t')
    ofile.writelines('album_price'+'\t'+'country_code'+'\t'+'last_modified'+'\t'+'active')
    ofile.writelines('\n')

    update_ids = []
    for record in updaterecords:
        ofile.writelines(str(record[0])+'\t'+str(record[1])+'\t'+str(record[2])+'\t')
        ofile.writelines(str(record[3])+'\t'+str(record[4])+'\t'+str(record[5])+'\t')
        ofile.writelines(str(record[6])+'\t'+str(record[7])+'\t'+str(record[8])+'\t')
        ofile.writelines(str(record[9])+'\t'+str(record[10])+'\t'+str(record[11]))
        ofile.writelines('\n')
        update_ids.append(record[0])
    ofile.close()

    cursor.close()
    db.close()

    new_ids = ','.join(map(str, update_ids))
    message = "SELECT * FROM music_data_holding WHERE music_data_holding.id"
    message = message+" IN ("+str(new_ids)+");"+"\r\n"+"Review attachment"
    message = message+" before updating records in music_data."+"\r\n"
    send_mail(message, 'Update Records', outfile2)
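
Both queries above compare the same song / album / country key, one with NOT IN and one with IN. The same split can be sketched in plain Python (3) with a set of tuples, which may be handy for testing the logic away from the database. `split_new_and_updates` is a hypothetical name and the rows are assumed to be dicts keyed by column name. One thing to keep in mind on the SQL side: MySQL's CONCAT returns NULL if any argument is NULL, so rows with a NULL in any of the three columns will not match either query.

```python
KEY_COLUMNS = ('song_name', 'album_name', 'country_code')


def row_key(row):
    """Composite key used to decide whether a holding row already exists."""
    return tuple(row[column] for column in KEY_COLUMNS)


def split_new_and_updates(holding_rows, existing_rows):
    """Split holding rows into (new, updates) by composite key."""
    existing_keys = set(row_key(row) for row in existing_rows)
    new, updates = [], []
    for row in holding_rows:
        if row_key(row) in existing_keys:
            updates.append(row)
        else:
            new.append(row)
    return new, updates
```

Using a tuple as the key also avoids the ambiguity of joining values with ' - ' when a song or album name itself contains that separator.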


There is a final script that does some mapping once new records are in place, but it's written in much the same style as these others - pull the data, generate a text file for review, attach that to an email with the sql to run embedded in the body.
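
The send_mail function these review scripts call isn't shown in this post. As a rough idea of the pattern only (SQL in the body, review file attached), here is a Python 3 sketch using the standard `email` package. `build_review_mail` and all of its parameters are hypothetical, not the function actually used.

```python
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def build_review_mail(sql_text, label, attachment_path, sender, recipients):
    """Build a message with the SQL in the body and the review file attached."""
    msg = MIMEMultipart()
    msg['Subject'] = 'Music data build - ' + label
    msg['From'] = sender
    msg['To'] = ', '.join(recipients)

    # First part: the SQL to run once the manual review is done.
    msg.attach(MIMEText(sql_text))

    # Second part: the tab-separated review file, sent as a text attachment.
    with open(attachment_path, 'r') as handle:
        attachment = MIMEText(handle.read())
    attachment.add_header('Content-Disposition', 'attachment',
                          filename=os.path.basename(attachment_path))
    msg.attach(attachment)
    return msg
```

Sending works the same way as in sendMail above: pass `msg.as_string()` to `smtplib.SMTP(...).sendmail(...)`.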