Tuesday, February 10, 2009

Threading and SQLAlchemy

I thought I would make my first post special and actually put some examples in it. I spent the day struggling with SQLAlchemy, figuring out how to log to my own log file and how to access a MySQL database from within a thread. I finally got it working, so I thought I would post some examples, because I had a heck of a time finding clear and simple ones.

First, to use threading you have to create the session differently than you would in a single-threaded program.

Here is the import:
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker
Here is how you create the scoped_session:

self.mysql_db = create_engine('mysql://root:passwd@localhost/databasename', echo=False, pool_size=80)
self.metadata = MetaData(bind=self.mysql_db)
self.Session = scoped_session(sessionmaker(self.mysql_db, autoflush=True, autocommit=True))
self.session = self.Session()

Then you can pass the database into a thread and create a new session in the thread. The scoped_session keeps a separate session for each thread, so it does all the heavy lifting for you.
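
To give a feel for what that heavy lifting looks like, here is a toy sketch of a thread-local registry. This is not SQLAlchemy's code: the ThreadLocalRegistry class and the worker function are made-up names, and a plain object stands in for a Session. It only illustrates the idea of one object per thread that is reused until it is removed.

```python
import threading


class ThreadLocalRegistry(object):
    """Toy stand-in for the idea behind scoped_session (not SQLAlchemy code)."""

    def __init__(self, factory):
        self.factory = factory
        self._local = threading.local()

    def __call__(self):
        # Build the object on first use in this thread, then keep reusing it.
        if not hasattr(self._local, "value"):
            self._local.value = self.factory()
        return self._local.value

    def remove(self):
        # Forget this thread's object so the next call builds a new one.
        if hasattr(self._local, "value"):
            del self._local.value


registry = ThreadLocalRegistry(object)  # `object` stands in for a Session
seen = {}


def worker(key):
    first = registry()
    second = registry()   # same thread, so the same object comes back
    registry.remove()
    third = registry()    # after remove(), a new object is created
    seen[key] = (first, second, third)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

same_within_thread = all(a is b for a, b, _ in seen.values())
new_after_remove = all(c is not a for a, _, c in seen.values())
different_across_threads = seen[0][0] is not seen[1][0]
```

Each thread only ever sees its own object, which is why the same Session name can be shared by every thread without them stepping on each other.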


Here is the code:
import logging
import threading

class DoCheckin(threading.Thread):
    def __init__(self, url, name, db=None, testid=3, save=False, ql=None):
        # Initialise the Thread machinery before setting any attributes on it.
        threading.Thread.__init__(self)
        self.url = url
        self.name = name
        self.db = db
        self.ql = ql
        self.testid = testid
        self.save = save
        self.host = 'localhost'
        self.usr = 'root'
        self.pwd = 'password'
        self.database = 'loadtest'
        self.logger = logging.getLogger('ILAPPhase2')
        #self.con = MySQLdb.connect(host=self.host, user=self.usr, passwd=self.pwd, db=self.database)
        #self.cur = self.con.cursor()

    def run(self):
        # run() executes in the new thread, so the session is requested here
        # rather than in __init__, which runs in the thread creating the object.
        self.session = self.db.Session()
        self.logger.info('Thread %s Started', self.getName())
        # ZipCodes is a mapped class defined elsewhere.
        count = self.session.query(ZipCodes).count()
        self.logger.debug('%s: %s', self.getName(), count)
        self.logger.info('Thread %s Ended', self.getName())
        #self.db.Session.close()
        self.db.Session.remove()

    def getUrl(self):
        pass

    def RetrieveDataFromURL(self):
        pass


You will notice that I have passed in my database and set the session from it. Technically, I could eliminate this line:

self.session = self.db.Session()

because, if I have read the documentation correctly, the scoped_session object itself exposes most of the Session methods, so I could call them on self.db.Session directly.
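
Here is a minimal sketch of that equivalence. It assumes an in-memory SQLite engine in place of my MySQL one so it runs without a server, and it uses a trivial SELECT instead of a mapped class; the variable names are only for illustration.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: in-memory SQLite stands in for the MySQL URL used in this post.
engine = create_engine("sqlite://")
Session = scoped_session(sessionmaker(bind=engine))

# Option 1: ask the registry for this thread's session and call methods on it.
session = Session()
via_instance = session.execute(text("SELECT 42")).scalar()

# Option 2: call the method on the scoped_session itself; it is forwarded
# to the same thread-local session.
via_registry = Session.execute(text("SELECT 42")).scalar()

# Both routes end up at the very same session object.
same_session = Session() is session

Session.remove()
```

Holding on to a self.session attribute is therefore a matter of taste; calling the methods on the scoped_session saves a line.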

In the run() method you will see that I use the session to run a query that gets the row count of a table. Not very fancy, but I was just trying to get this to work.

I then close() and remove() the session; with autocommit=True, this returns the Connection to the pool.
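
A small sketch of what remove() does to the registry, again assuming an in-memory SQLite engine rather than MySQL: until remove() is called, Session() keeps handing back the same session in a given thread, and afterwards the next call creates a fresh one.

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: in-memory SQLite stands in for the MySQL URL used in this post.
engine = create_engine("sqlite://")
Session = scoped_session(sessionmaker(bind=engine))

first = Session()
again = Session()               # same thread, so the same session object
reused = first is again

Session.remove()                # closes the session and drops it from the registry

fresh = Session()               # the registry builds a new session on demand
replaced = fresh is not first

Session.remove()
print(reused, replaced)
```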

I originally had autocommit=False and was calling self.session.commit(), but that did nothing except make my program crash very quickly.

So my recommendation is to use autocommit=True and call remove(). After some more testing I found that you don't need close(), so I have commented it out in the code above.

So there you have it: a simple, thread-safe way to use SQLAlchemy with threading.
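
To tie the pieces together, here is a minimal end-to-end sketch. The db object is assumed to be any object that exposes the scoped_session as a Session attribute; the Database and CountWorker classes are hypothetical names, and an in-memory SQLite URL stands in for the MySQL one so the example runs without a server. It leaves out autocommit=True, which SQLAlchemy 2.0 and later no longer accept, and runs a trivial SELECT instead of querying a mapped class.

```python
import threading

from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker


class Database(object):
    """Hypothetical holder for the engine and the scoped_session."""

    def __init__(self, url):
        self.engine = create_engine(url)
        self.Session = scoped_session(sessionmaker(bind=self.engine))


class CountWorker(threading.Thread):
    """Hypothetical worker that runs one query in its own session."""

    def __init__(self, db, results, name):
        threading.Thread.__init__(self, name=name)
        self.db = db
        self.results = results

    def run(self):
        # Ask for the session inside run(), so it belongs to this thread.
        session = self.db.Session()
        try:
            value = session.execute(text("SELECT 1")).scalar()
            self.results.append((self.name, value))
        finally:
            # Always hand the connection back, even if the query fails.
            self.db.Session.remove()


# Assumption: in-memory SQLite instead of the MySQL URL used in this post.
db = Database("sqlite://")
results = []
workers = [CountWorker(db, results, "worker-%d" % i) for i in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(sorted(results))
```

The threads share one Database object, but each one gets its own session from it and removes that session when it is finished.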

Let me know if you have any comments.

My next post, Logging SQLAlchemy to your common logfile.

1 comment:

  1. Thank-you very much for this post as I am struggling with the same issue. What do you pass as the "db" argument in the DoCheckin argument?
