PostgreSQL Python
Introduction
Connection to db with psycopg2
Create DB
Create table
Insert data
Update data
Drop database
Check connection in docker
Related Articles
Introduction
In this article you can learn how to use PostgreSQL with Python. Before trying to connect to Postgres, check that it is installed and running.
You can find a manual on installing Postgres 17 on Windows 11 here.
Simple connection with psycopg2
Let's look at a simple example of connecting to a database using psycopg2. We will store the credentials in a database.ini file. To read the config we will use a config.py script based on configparser, and we will implement the connection itself in pg_demo.py.
pg_demo/
|-- .gitignore
|-- config.py
|-- database.ini
`-- pg_demo.py
# .gitignore
database.ini
__pycache__
In the database.ini file the most important field is the password. You need to specify the password that was set when installing Postgres.
The default port 5432 can be omitted, but if there are several servers, they will be on different ports.
[postgresql]
host=localhost
port=5432
database=postgres
user=postgres
password=secret
#!/usr/bin/python
# config.py
import os
from configparser import ConfigParser


def config(filename="database.ini", section="postgresql"):
    # create a parser
    parser = ConfigParser()
    file_path = os.path.join(os.path.dirname(__file__), filename)
    # read config file
    parser.read(file_path)

    # get section, default to postgresql
    db = {}
    if parser.has_section(section):
        params = parser.items(section)
        for param in params:
            db[param[0]] = param[1]
    else:
        raise Exception(f"Section {section} not found in the {filename} file")

    return db
#!/usr/bin/python
# pg_demo.py
import psycopg2
from config import config


def connect():
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # read connection parameters
        params = config()

        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(**params)

        # create a cursor
        cur = conn.cursor()

        # execute a statement
        print('PostgreSQL database version:')
        cur.execute('SELECT version()')

        # display the PostgreSQL database server version
        db_version = cur.fetchone()
        print(db_version)

        # close the communication with the PostgreSQL
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()
            print('Database connection closed.')


if __name__ == '__main__':
    connect()
python pg_demo.py

Connecting to the PostgreSQL database...
PostgreSQL database version:
('PostgreSQL 12.8, compiled by Visual C++ build 1914, 64-bit',)
Database connection closed.
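Note that the version arrives as a one-element tuple containing a single descriptive string. If you only need the numeric version, a small helper can pull it out of that string (parse_pg_version is our illustrative name, not a psycopg2 API):

```python
# Sketch: extract the numeric version from the string returned by
# SELECT version(). Illustrative helper, not part of psycopg2.
def parse_pg_version(row: tuple) -> str:
    # row looks like ('PostgreSQL 12.8, compiled by ...',)
    first_part = row[0].split(",")[0]   # 'PostgreSQL 12.8'
    return first_part.split()[1]        # '12.8'

row = ('PostgreSQL 12.8, compiled by Visual C++ build 1914, 64-bit',)
print(parse_pg_version(row))  # 12.8
```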
Create DB
To create a database, we use the setup from the previous example and add a create_db.py file to it. We will connect using config.py.
Please note that to create a new database we connect to the default postgres database; after creating the new database, you will need to connect directly to it to work with it.
If you are using a freshly installed Postgres, it will look like this in pgAdmin:
pgAdmin
Create my_db database
# create_db.py
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from config import config

params = config()
con = psycopg2.connect(**params)
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = con.cursor()
cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier("my_db")))
cur.close()
con.close()
print("Database created successfully.")
python create_db.py

Database created successfully.
pgAdmin
Create table
To create a new table in our new database my_db, first we will create a dedicated settings section in the database.ini file:
[my_db]
host=localhost
port=5432
database=my_db
user=postgres
password=secret
Let's create a new script create_table.py. To establish a connection we will still use config.py, but now we should explicitly pass the section argument.
In pgAdmin we can see that the database is empty.
pgAdmin
# create_table.py
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from config import config

params = config(filename="database.ini", section="my_db")
con = psycopg2.connect(**params)
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = con.cursor()

create_table_query = '''
    CREATE TABLE cities (
        id SERIAL PRIMARY KEY,
        name TEXT,
        country TEXT,
        population INT,
        founded DATE,
        capital BOOL,
        visited BOOL,
        upd_timestamp BIGINT
    );
'''
cur.execute(create_table_query)
cur.close()
con.close()
print("Table created successfully.")
python create_table.py

Table created successfully.
In pgAdmin we see that an empty table cities has appeared in the my_db database.
pgAdmin
Insert data into the table
Let's create the insert_to_table.py script, which will add new rows to the cities table of the my_db database.
First, let's look at the classic way of adding data.
# insert_to_table_classic.py
import time
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from config import config
from datetime import date


def insert_to_table(table_name: str, data: tuple) -> None:
    params = config(filename="database.ini", section="my_db")
    con = psycopg2.connect(**params)
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cur = con.cursor()
    cur.execute(f"INSERT INTO {table_name} "
                f"(name, "
                f"country, "
                f"population, "
                f"founded, "
                f"capital, "
                f"visited, "
                f"upd_timestamp) "
                f"VALUES (%s, %s, %s, %s, %s, %s, %s)",
                data)
    cur.close()
    con.close()
    print("Data inserted successfully.")


data = ("Narva",
        "Estonia",
        40000,
        date(1172, 1, 1),
        False,
        True,
        int(time.time()))

if __name__ == "__main__":
    insert_to_table("cities", data)
python insert_to_table_classic.py
Data inserted successfully.
In pgAdmin we see that a record about Narva has appeared in the cities table.
pgAdmin
To add a record, we had to list all the columns of the table and provide a matching number of %s placeholders. We passed the data to the function as a tuple.
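That pairing can be sketched as a small helper which derives the placeholder list from the column list, so the two counts always match (build_insert is an illustrative helper of ours, not a psycopg2 API):

```python
# Sketch: derive the %s placeholders from the column list so their
# count always matches the number of columns. Illustrative helper only.
def build_insert(table_name: str, columns: list) -> str:
    placeholders = ", ".join(["%s"] * len(columns))
    return (f"INSERT INTO {table_name} "
            f"({', '.join(columns)}) VALUES ({placeholders})")

print(build_insert("cities", ["name", "country", "population"]))
# INSERT INTO cities (name, country, population) VALUES (%s, %s, %s)
```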
Add data using a dictionary
There is a way to add data to a table without explicitly specifying the column names: all the necessary values are taken from a dictionary. With INSERT the advantage of this method is debatable; it shows much better when updating the table.
# insert_to_table.py
import time
import psycopg2
from psycopg2.extensions import AsIs
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from config import config
from datetime import date


def insert_to_table(table_name: str, data: dict) -> None:
    params = config(filename="database.ini", section="my_db")
    con = psycopg2.connect(**params)
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cur = con.cursor()
    data.update({"upd_timestamp": int(time.time())})
    columns = data.keys()
    values = [data[column] for column in columns]
    insert_statement = f"INSERT INTO {table_name} (%s) VALUES %s"
    cur.execute(insert_statement, (AsIs(','.join(columns)), tuple(values)))
    cur.close()
    con.close()
    print("Data inserted successfully.")


data = {
    "name": "Riga",
    "country": "Latvia",
    "population": 200000,
    "founded": date(1201, 1, 1),
    "capital": True,
    "visited": True
}

if __name__ == "__main__":
    insert_to_table("cities", data)
python insert_to_table.py
Data inserted successfully.
In pgAdmin we see that a record about Riga has appeared in the cities table.
pgAdmin
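The script above relies on the dictionary keeping its insertion order (guaranteed since Python 3.7), so the column string and the value tuple stay aligned. That pairing step can be sketched in isolation (split_dict is our illustrative name):

```python
# Sketch: split a row dict into the column string and value tuple that
# the AsIs-based INSERT above consumes. Relies on dict insertion order.
def split_dict(data: dict) -> tuple:
    columns = list(data)                      # insertion order, Python 3.7+
    values = tuple(data[c] for c in columns)
    return ",".join(columns), values

cols, vals = split_dict({"name": "Riga", "country": "Latvia"})
print(cols)  # name,country
print(vals)  # ('Riga', 'Latvia')
```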
Update table
Let's update the data about Narva in the classic way.
# update_table_classic.py
import time
import psycopg2
from datetime import date
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from config import config


def update_table(name: str, table_name: str, values: tuple) -> None:
    params = config(filename="database.ini", section="my_db")
    con = psycopg2.connect(**params)
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cur = con.cursor()
    update_query = (f"UPDATE {table_name} SET "
                    f"name = %s, "
                    f"country = %s, "
                    f"population = %s, "
                    f"founded = %s, "
                    f"capital = %s, "
                    f"visited = %s, "
                    f"upd_timestamp = %s "
                    f"WHERE name = '{name}'")
    cur.execute(update_query, values)
    cur.close()
    con.close()
    print("Data updated successfully.")


values = ("Narva",
          "Estonia",
          53626,
          date(1172, 1, 1),
          False,
          True,
          int(time.time()))

if __name__ == "__main__":
    update_table("Narva", "cities", values)
python update_table_classic.py

Data updated successfully.
In pgAdmin we see that the population data and the upd_timestamp field have been updated.
pgAdmin
Updating individual fields
In the previous example, all fields were updated. Some were updated with the same values, others with new ones. If you need to update only certain fields, you need to exclude the unnecessary ones from the query.
Let's look at an example of updating the fields population and upd_timestamp in the classic way.
# update_table_classic.py
import time
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from config import config


def update_table(name: str, table_name: str, values: tuple) -> None:
    params = config(filename="database.ini", section="my_db")
    con = psycopg2.connect(**params)
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cur = con.cursor()
    update_query = (f"UPDATE {table_name} SET "
                    f"population = %s, "
                    f"upd_timestamp = %s "
                    f"WHERE name = '{name}'")
    cur.execute(update_query, values)
    cur.close()
    con.close()
    print("Data updated successfully.")


values = (53627, int(time.time()))

if __name__ == "__main__":
    update_table("Narva", "cities", values)
python update_table_classic.py

Data updated successfully.
In pgAdmin we see that in the cities table the population of Narva and the update time have changed
pgAdmin
If you now want to update another field, you will need a separate function, and for each combination of fields yet another one. This is not very convenient, so you can update from a dictionary instead.
Update from dictionary
Let's assume that we receive data with updated values for some table columns. Say a city population update has arrived and the visit status has changed. Next time, updates may arrive for other fields. In order not to write a script for each possible combination, we will update from the dictionary.
Let's create an update_table.py script, which will update only those values that are contained in the dictionary.
# update_table.py
import time
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from config import config


def update_table(name: str, table_name: str, data: dict) -> None:
    params = config(filename="database.ini", section="my_db")
    con = psycopg2.connect(**params)
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cur = con.cursor()
    data.update({"upd_timestamp": int(time.time())})
    set_clause = ', '.join([f"{key} = %({key})s" for key in data if key != 'id'])
    update_statement = f"UPDATE {table_name} SET {set_clause} WHERE name = '{name}'"
    cur.execute(update_statement, data)
    cur.close()
    con.close()
    print("Data updated successfully.")


data = {
    "population": 619000,
    "visited": False
}

if __name__ == "__main__":
    update_table("Riga", "cities", data)
python update_table.py

Data updated successfully.
In pgAdmin we see that in the cities table the population of Riga has changed, as well as the visit status and timestamp.
pgAdmin
Updating from a dictionary using concatenation
A similar result can be achieved by manipulating the SQL query string.
# update_table.py
import time
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from config import config


def update_table(name: str, table_name: str, data: dict) -> None:
    params = config(filename="database.ini", section="my_db")
    con = psycopg2.connect(**params)
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cur = con.cursor()
    upd_string_start = f"UPDATE {table_name} SET "
    upd_string_mid = ""
    for key in data.keys():
        upd_string_mid += f"{key} = %s, "
    upd_string_end = f"upd_timestamp = %s WHERE name = '{name}'"
    update_query = upd_string_start + upd_string_mid + upd_string_end
    list_values = []
    for value in data.values():
        list_values.append(value)
    list_values.append(int(time.time()))
    values = tuple(list_values)
    cur.execute(update_query, values)
    cur.close()
    con.close()
    print("Data updated successfully.")


data = {
    "population": 619001,
    "visited": True
}

if __name__ == "__main__":
    update_table("Riga", "cities", data)
python update_table.py

Data updated successfully.
In pgAdmin we see that in the cities table the population of Riga has changed again, as well as the visit status and timestamp.
pgAdmin
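Both dictionary variants reduce to the same step: deriving the SET clause from the dictionary keys. That step can be sketched in isolation (build_set_clause is our illustrative name, using the named %(key)s placeholder style from update_table.py):

```python
# Sketch: build the SET clause with named %(key)s placeholders from
# the dict keys, skipping the primary key. Illustrative helper only.
def build_set_clause(data: dict) -> str:
    return ", ".join(f"{key} = %({key})s" for key in data if key != "id")

print(build_set_clause({"population": 619001, "visited": True}))
# population = %(population)s, visited = %(visited)s
```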
Drop database
To delete a database, we use the setup from the previous examples. We will connect using config.py.
It is usually convenient to set conn.autocommit = True.
Also note that the database name is enclosed in double quotes; this is done in case it contains hyphens.
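What the double quotes do can be sketched with a tiny helper: wrap the name and double any embedded quotes. This is a simplified illustration of what psycopg2's sql.Identifier handles for you; in real code prefer sql.Identifier, as in create_db.py above.

```python
# Sketch of SQL identifier quoting: wrap in double quotes and escape
# embedded double quotes by doubling them. Simplified illustration of
# what psycopg2's sql.Identifier does properly.
def quote_ident(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'

print(quote_ident("obsoleted-db"))  # "obsoleted-db"
print(quote_ident('we"ird'))        # "we""ird"
```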
import psycopg2
from config import config


def drop_db(db_name: str):
    print(f"drop_db({db_name})")
    params = config()
    print('Connecting to the PostgreSQL database...')
    conn = psycopg2.connect(**params)
    cur = conn.cursor()
    conn.autocommit = True
    try:
        cur.execute(f'DROP DATABASE IF EXISTS "{db_name}"')
    except (Exception, psycopg2.DatabaseError) as e:
        print(e)
    finally:
        cur.close()
        conn.close()


if __name__ == '__main__':
    drop_db("obsoleted_db")
Check the connection in the compose setup
Let's check the connection to a Docker container running Postgres from another container.
import os
import psycopg2

print(os.environ['POSTGRES_USER'])

pg_connection_dict = {
    'dbname': os.environ['POSTGRES_DB'],
    'user': os.environ['POSTGRES_USER'],
    'password': os.environ['POSTGRES_PASSWORD'],
    'port': 5432,
    'host': "postgres_service"
}
print("pg_connection_dict", pg_connection_dict)

con = psycopg2.connect(**pg_connection_dict)
print("con", con)

# alternative: list all tables, including system ones
# sql = "SELECT * FROM pg_catalog.pg_tables;"
sql = """SELECT table_schema || '.' || table_name
         FROM information_schema.tables
         WHERE table_type = 'BASE TABLE'
         AND table_schema NOT IN ('pg_catalog', 'information_schema');"""

cur = con.cursor()
cur.execute(sql)
print(cur.fetchall())
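Reading the environment variables directly raises a bare KeyError if one is missing. A hedged sketch that validates them up front and falls back to defaults for host and port (pg_params_from_env, POSTGRES_HOST and POSTGRES_PORT are our own assumptions; the official postgres image only defines POSTGRES_DB, POSTGRES_USER and POSTGRES_PASSWORD):

```python
import os

# Sketch: collect connection parameters from environment variables with
# an explicit error for missing ones. "postgres_service" is the compose
# service name used above; POSTGRES_HOST/POSTGRES_PORT are optional
# additions of ours, not part of the official image contract.
def pg_params_from_env(env) -> dict:
    required = ["POSTGRES_DB", "POSTGRES_USER", "POSTGRES_PASSWORD"]
    missing = [k for k in required if k not in env]
    if missing:
        raise RuntimeError(f"missing environment variables: {missing}")
    return {
        "dbname": env["POSTGRES_DB"],
        "user": env["POSTGRES_USER"],
        "password": env["POSTGRES_PASSWORD"],
        "port": int(env.get("POSTGRES_PORT", "5432")),
        "host": env.get("POSTGRES_HOST", "postgres_service"),
    }

# In the container you would call pg_params_from_env(os.environ)
# and pass the result to psycopg2.connect(**params).
```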
Related Articles

SQLite3 Python
Real example
Errors
Python
Databases