Skip to content
This repository was archived by the owner on Oct 14, 2018. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 37 additions & 34 deletions pyzipcode/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
from settings import db_location
from .settings import db_location
try:
import sqlite3
except ImportError:
from pysqlite2 import dbapi2 as sqlite3
import math
import time

class ConnectionManager(object):
"""
Assumes a database that will work with cursor objects
"""

def __init__(self):
# test out the connection...
conn = sqlite3.connect(db_location)
conn.close()

def query(self, sql, args):
conn = None
retry_count = 0
Expand All @@ -24,13 +23,13 @@ def query(self, sql, args):
# then just give up...
try:
conn = sqlite3.connect(db_location)
except sqlite3.OperationalError, x:
except sqlite3.OperationalError:
retry_count += 1
time.sleep(0.001)

if not conn and retry_count > 10:
raise sqlite3.OperationalError("Can't connect to sqlite database.")

cursor = conn.cursor()
cursor.execute(sql, args)
res = cursor.fetchall()
Expand All @@ -39,7 +38,6 @@ def query(self, sql, args):

ZIP_QUERY = "SELECT * FROM ZipCodes WHERE zip=?"
ZIP_RANGE_QUERY = "SELECT * FROM ZipCodes WHERE longitude >= %s and longitude <= %s AND latitude >= %s and latitude <= %s"
ZIP_FIND_QUERY = "SELECT * FROM ZipCodes WHERE city LIKE ? AND state LIKE ?"

class ZipCode(object):
def __init__(self, data):
Expand All @@ -59,56 +57,61 @@ def format_result(zips):

class ZipNotFoundException(Exception):
pass

class ZipCodeDatabase(object):

def __init__(self, conn_manager=None):
if conn_manager is None:
conn_manager = ConnectionManager()
self.conn_manager = conn_manager

def get_zipcodes_around_radius(self, zip, radius):
zips = self.get(zip)
if zips is None:
raise ZipNotFoundException("Could not find zip code you're searching by.")
else:
zip = zips[0]

radius = float(radius)

long_range = (zip.longitude-(radius/69.0), zip.longitude+(radius/69.0))
lat_range = (zip.latitude-(radius/49.0), zip.latitude+(radius/49.0))

return format_result(self.conn_manager.query(ZIP_RANGE_QUERY % (
long_range[0], long_range[1],
lat_range[0], lat_range[1]
)))

def find_zip(self, city=None, state=None):
if city is None:
city = "%"
else:
city = city.upper()

if state is None:
state = "%"

def find_zip(self, city=None, state=None, limit=None, exact=False):
cmp = ' LIKE ' if not exact else '='
if city is None and state is None:
q = "SELECT * FROM ZipCodes"
elif state is not None and city is not None:
q = "SELECT * FROM ZipCodes WHERE city%(cmp)s? AND state%(cmp)s?" % {'cmp': cmp}
else:
state = state.upper()

return format_result(self.conn_manager.query(ZIP_FIND_QUERY, [city, state]))

if city is None:
q = "SELECT * FROM ZipCodes WHERE state%s?" % cmp
else:
city = city.upper()

if state is None:
q = "SELECT * FROM ZipCodes WHERE city%s?" % cmp
else:
state = state.upper()

if limit is not None:
q+= ' LIMIT 0, %d' % limit

args = [arg for arg in [city, state] if arg is not None]
return format_result(self.conn_manager.query(q, args))

def get(self, zip):
return format_result(self.conn_manager.query(ZIP_QUERY, [zip]))

def __getitem__(self, zip):
zip = self.get(str(zip))
if zip is None:
raise IndexError("Couldn't find zip")
else:
return zip[0]