o7planning

Connect to MySQL Database in Python using PyMySQL

  1. What is PyMySQL?
  2. Install PyMySQL
  3. Sample Database
  4. Connect MySQL from Python with PyMySQL
  5. Query Example
  6. Insert Example
  7. Update Example
  8. Delete Example
  9. Call Procedure
  10. Call Function

1. What is PyMySQL?

In order to connect Python to a database you need a driver, which is a library used to Interact with the database. For MySQL database, you have such 3 Driver choices:
  • MySQL/connector for Python
  • MySQLdb
  • PyMySQL
Driver
Discription
MySQL/Connector for Python
This is a library provided by the MySQL community.
MySQLdb
MySQLdb is a library that connects to MySQL from Python, it is written in C language and it is free and open source software.
PyMySQL
This is a library that connects to MySQL from Python and it is a pure Python library. PyMySQL's goal is to replace MySQLdb and work on CPython, PyPy and IronPython.
PyMySQL is an open source project, and you can see its source code here:

2. Install PyMySQL

In order to install PyMySQL on Windows (or Ubuntu/Linux) you need to open the CMD window, and run the following statement:
pip install PyMySQL

3. Sample Database

"Simplehr" is a sample database used in many tutorials on o7planning. In the post, I also use it. You can create the database based on the guide below:

4. Connect MySQL from Python with PyMySQL

The following simple example uses Python to connect to MySQL and query the Department table:
connectExample.py
import pymysql.cursors   
# Connect to the database.
connection = pymysql.connect(host='192.168.5.134',
                             user='root',
                             password='1234',                             
                             db='simplehr',
                             charset='utf8mb4',
                             cursorclass=pymysql.cursors.DictCursor) 
print ("connect successful!!") 
try:  
    with connection.cursor() as cursor: 
        # SQL 
        sql = "SELECT Dept_No, Dept_Name FROM Department " 
        # Execute query.
        cursor.execute(sql) 
        print ("cursor.description: ", cursor.description) 
        print() 
        for row in cursor:
            print(row) 
finally:
    # Close connection.
    connection.close()
Results of the example:
connect successful!!
cursor.description: (('Dept_No', 253, None, 80, 80, 0, False), ('Dept_Name', 253, None, 1020, 1020, 0, False))

{'Dept_No': 'D10', 'Dept_Name': 'ACCOUNTING'}
{'Dept_No': 'D20', 'Dept_Name': 'RESEARCH'}
{'Dept_No': 'D30', 'Dept_Name': 'SALES'}
{'Dept_No': 'D40', 'Dept_Name': 'OPERATIONS'}
Utility Module:
The advice here is that you should create a utility module to connect to the database. In case I create a module named as "myconnutils", which defines the getConnection () function to returns a connection.
myconnutils.py
import pymysql.cursors   
# Function return a connection.
def getConnection(): 
    # You can change the connection arguments.
    connection = pymysql.connect(host='192.168.5.129',
                                 user='root',
                                 password='1234',                             
                                 db='simplehr',
                                 charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor)
    return connection

5. Query Example

The following example queries the Employee table, Python uses %s as a "placeholder" for the parameter, which is independent of the parameter type. For example:
sql1 = "Insert into Department (Dept_Id, Dept_No, Dept_Name) values (%s, %s, %s) "

sql2 = "Select * from Employee Where Dept_Id = %s "
queryExample.py
# Use your utility module.
import myconnutils 

connection = myconnutils.getConnection() 
print ("Connect successful!") 
sql = "Select Emp_No, Emp_Name, Hire_Date from Employee Where Dept_Id = %s " 
try :
    cursor = connection.cursor() 
    # Execute sql, and pass 1 parameter.
    cursor.execute(sql, ( 10 ) )  
    print ("cursor.description: ", cursor.description) 
    print() 
    for row in cursor:
        print (" ----------- ")
        print("Row: ", row)
        print ("Emp_No: ", row["Emp_No"])
        print ("Emp_Name: ", row["Emp_Name"])
        print ("Hire_Date: ", row["Hire_Date"] , type(row["Hire_Date"]) ) 
finally:
    # Close connection.
    connection.close()

6. Insert Example

insertExample.py
# Use your utility module.
import myconnutils
import pymysql.cursors  

connection = myconnutils.getConnection() 
print ("Connect successful!")  
try :
    cursor = connection.cursor() 
    sql = "Select max(Grade) as Max_Grade from Salary_Grade "
    cursor.execute(sql) 
    # 1 row.
    oneRow = cursor.fetchone()      

    # Output: {'Max_Grade': 4} or {'Max_Grade': None}
    print ("Row Result: ", oneRow) 
    grade = 1
    
    if oneRow != None and oneRow["Max_Grade"] != None:
        grade = oneRow["Max_Grade"] + 1 
    cursor = connection.cursor()  
    sql =  "Insert into Salary_Grade (Grade, High_Salary, Low_Salary) " \
         + " values (%s, %s, %s) " 
    print ("Insert Grade: ", grade)  
    # Execute sql, and pass 3 parameters.
    cursor.execute(sql, (grade, 2000, 1000 ) ) 
    connection.commit()  
finally: 
    connection.close()
Output:
connect successful!!
Row Result: {'Max_Grade': 2}
Insert Grade: 3

7. Update Example

updateExample.py
# Use your utility module.
import myconnutils
import pymysql.cursors 
import datetime 

connection = myconnutils.getConnection() 
print ("Connect successful!")  
try :
    cursor = connection.cursor() 
    sql = "Update Employee set Salary = %s, Hire_Date = %s where Emp_Id = %s "   
    # Hire_Date
    newHireDate = datetime.date(2002, 10, 11) 
    # Execute sql, and pass 3 parameters.
    rowCount = cursor.execute(sql, (850, newHireDate, 7369 ) ) 
    connection.commit()  
    print ("Updated! ", rowCount, " rows") 
finally:
    # Close connection.
    connection.close()
Output:
connect successful!
Update! 1 rows

8. Delete Example

deleteExample.py
# Use your utility module.
import myconnutils 

connection = myconnutils.getConnection() 
print ("Connect successful!")  
try :
    cursor = connection.cursor() 
    sql = "Delete from Salary_Grade where Grade = %s"  
    
    # Execute sql, and pass 1 parameters.
    rowCount = cursor.execute(sql, ( 3 ) ) 
    connection.commit()  
    print ("Deleted! ", rowCount, " rows") 
finally:
    # Close connection.
    connection.close()
Output:
connect successful!
Deleted! 1 rows

9. Call Procedure

There are some problems when you call a function or procedure in Python. I set up a situation like this:
You have a procedure:
  • Get_Employee_Info(p_Emp_Id, v_Emp_No, v_First_Name, v_Last_Name, v_Hire_Date)
get_Employee_Info
DELIMITER $$

-- This procedure retrieves information of an employee,
-- Input parameter: p_Emp_ID (Integer)
-- There are four output parameters v_Emp_No, v_First_Name, v_Last_Name, v_Hire_Date

CREATE PROCEDURE get_Employee_Info(p_Emp_ID     Integer,
                                 out       v_Emp_No        Varchar(50) ,
                                 out       v_First_Name    Varchar(50) ,
                                 Out       v_Last_name    Varchar(50) ,
                                 Out       v_Hire_date      Date)
BEGIN
set v_Emp_No  = concat( 'E' , Cast(p_Emp_Id as char(15)) );
--
set v_First_Name = 'Michael';
set v_Last_Name  = 'Smith';
set v_Hire_date  = curdate();
END
The procedure above has an input parameter p_Emp_Id and the four output parameters v_Emp_No, v_First_Name, v_Last_Name, v_Hire_Date, and you call this procedure from Python passing the value to p_Emp_Id to get 4 output values. Unfortunately, the value received is not guaranteed to be true (as stated in the DB-API specification). Python can only retrieve values from a SELECT clause.
DB-API specification:
def callproc(self, procname, args=()):
    """Execute stored procedure procname with args

    procname -- string, name of procedure to execute on server

    args -- Sequence of parameters to use with procedure

    Returns the original args.

    Compatibility warning: PEP-249 specifies that any modified
    parameters must be returned. This is currently impossible
    as they are only available by storing them in a server
    variable and then retrieved by a query. Since stored
    procedures return zero or more result sets, there is no
    reliable way to get at OUT or INOUT parameters via callproc.
    The server variables are named @_procname_n, where procname
    is the parameter above and n is the position of the parameter
    (from zero). Once all result sets generated by the procedure
    have been fetched, you can issue a SELECT @_procname_0, ...
    query using .execute() to get any OUT or INOUT values.

    Compatibility warning: The act of calling a stored procedure
    itself creates an empty result set. This appears after any
    result sets generated by the procedure. This is non-standard
    behavior with respect to the DB-API. Be sure to use nextset()
    to advance through all result sets; otherwise you may get
    disconnected.
    """
However you can still solve the problem above, you need to wrap Get_Employee_Info procedure by another procedure (for example Get_Employee_Info_Wrap), this procedure returns the values from the SELECT clause.
get_Employee_Info_Wrap
DROP procedure IF EXISTS `get_Employee_Info_Wrap`;

DELIMITER $$

-- This procedure wrap Get_Employee_info
CREATE PROCEDURE get_Employee_Info_Wrap(p_Emp_ID     Integer,
                                   out       v_Emp_No        Varchar(50) ,
                                   out       v_First_Name    Varchar(50) ,
                                   Out       v_Last_name    Varchar(50) ,
                                   Out       v_Hire_date      Date)
BEGIN
Call get_Employee_Info( p_Emp_Id, v_Emp_No, v_First_Name, v_Last_Name, v_Hire_Date); 
-- SELECT
Select v_Emp_No, v_First_Name, v_Last_Name, v_Hire_Date;  
END
Instead of calling the Get_Employee_Info procedure in Python, call Get_Employee_Info_Wrap procedure.
callProcedureExample.py
# Use your utility module.
import myconnutils 
import datetime 

connection = myconnutils.getConnection() 
print ("Connect successful!")  
try :
    cursor = connection.cursor() 
    # Get_Employee_Info_Wrap               
    # @p_Emp_Id       Integer ,
    # @v_Emp_No       Varchar(50)   OUTPUT
    # @v_First_Name   Varchar(50)   OUTPUT
    # @v_Last_Name    Varchar(50)   OUTPUT
    # @v_Hire_Date    Date          OUTPUT   
    v_Emp_No = ""
    v_First_Name= ""
    v_Last_Name= ""
    v_Hire_Date = None
    
    inOutParams = ( 100, v_Emp_No, v_First_Name , v_Last_Name, v_Hire_Date ) 
    resultArgs = cursor.callproc("Get_Employee_Info_Wrap" , inOutParams   )  
    
    print ('resultArgs:', resultArgs )
    print ( 'inOutParams:', inOutParams ) 
    print (' ----------------------------------- ') 
    for row in cursor:
        print('Row: ',  row )
        print('Row[v_Emp_No]: ',  row['v_Emp_No'] )
        print('Row[v_First_Name]: ',  row['v_First_Name'] )
        print('Row[v_Last_Name]: ',  row['v_Last_Name'] ) 
        # datetime.date
        v_Hire_Date =  row['v_Hire_Date'] 
        print('Row[v_Hire_Date]: ', v_Hire_Date )  
finally:
    # Close connection.
    connection.close()
Run the example:
connect successful!
resultArgs: (100, '', '', '', None)
inOutParams: (100, '', '', '', None)
 -----------------------------------
Row: {'v_Emp_No': 'E100', 'v_First_Name': 'Michael', 'v_Last_Name': 'Smith', 'v_Hire_Date': datetime.date(2017, 5, 17)}
Row[v_Emp_No]: E100
Row[v_First_Name]: Michael
Row[v_Last_Name]: Smith
Row[v_Hire_Date]: 2017-05-17

10. Call Function

To call a function in Python, you should create a query clause, and execute this query.
Here is the Get_Emp_No function, the input parameter is p_Emp_Id and returns Emp_No (Employee Code).
Get_Emp_No
DROP function  if Exists `Get_Emp_No`;
 
DELIMITER $$

CREATE Function Get_Emp_No (p_Emp_Id  Integer) Returns Varchar(50)
Begin    

   return  concat('E', CAST(p_Emp_Id  as  char)) ;
  
END;
callFunctionExample.py
# Use your utility module.
import myconnutils 
import datetime 

connection = myconnutils.getConnection() 
print ("Connect successful!")  
try :
    cursor = connection.cursor() 
    # Get_Employee_Info_Wrap               
    # @p_Emp_Id       Integer  
    v_Emp_No = ""  
    inOutParams = ( 100 ) 
    sql = "Select Get_Emp_No(%s) as Emp_No " 
    cursor.execute(sql, ( 100 ) )  
    print (' ----------------------------------- ') 
    for row in cursor:
        print('Row: ',  row )
        print('Row[Emp_No]: ',  row['Emp_No'] )  
finally:
    # Close connection.
    connection.close()
Running the example:
connect successful!
 -----------------------------------  
Row: {'Emp_No': 'E100'}
Row[Emp_No]: E100