案件に応じて複数のデータベース(DB)を使い分けることは一般的です。
通常、DBごとに用意された専用ライブラリを使用しますが、SQLAlchemyを利用することで、DB接続やスキーマ情報(テーブル一覧、カラム一覧)の取得など、DB固有の処理を統一化されたインターフェースで実現することが可能です。
しかしながら、SQLAlchemyはテーブルをクラスに、データをオブジェクトとしてマッピングするため、大量のデータに対する一括追加(Insert)、一括更新(Update)、一括削除(Delete)の処理では、パフォーマンスが著しく低下することがあります。
では、統一化されたインターフェースの利用とパフォーマンス低下の抑制を両立する方法はあるのでしょうか?
本記事ではその解決策として、SQLAlchemyの統一化されたインターフェースをDB接続やスキーマ情報の管理に使用し、パフォーマンスに直接関わる処理(大量データの追加、更新、削除、テーブル結合や集計など)では、SQLAlchemyを通じて直接SQLを実行するアプローチを紹介します。
SQLAlchemy の基本的な内容については「【超便利】SQLAlchemyでデータベースを操作しよう(基礎編)」で紹介しているので、必要に応じてご確認ください。
インストール手順
SQLAlchemyを使う場合、SQLAlchemyのインストールに加えて、接続したいDBごとのライブラリをインストールします。
SQLAlchemyは、次のpip コマンドでインストールが可能です。
pip install sqlalchemy
また、PythonプログラムでSQLAlchemyを使用するにあたり、下記のインポートが必要です。
from sqlalchemy import create_engine, text ,MetaData
from sqlalchemy.orm import sessionmaker
下記はDBごとに必要なライブラリのインストール方法と、インポートの記述です。
DB名称 | インストール方法 | import すべき内容 |
---|---|---|
Oracle | pip install cx_Oracle | import cx_Oracle |
MySQL | pip install mysqlclient 又は pip install pymysql | import pymysql |
PostgreSQL | pip install psycopg2 又は pip install psycopg2-binary | import psycopg2 |
SQL Server、 ODBC | pip install pyodbc | import pyodbc |
JDBC | pip install JayDeBeApi | import jaydebeapi |
SQLite | 不要 | import sqlite3 |
DBへの接続
create_engine()
の引数にDB毎のURLを渡すことで、DBに接続できます。
また、create_engine()
のecho引数をTrueにすると、SQLAlchemyが内部で生成しているSQLをログ(通常はコンソール)に出力できます。
from sqlalchemy import create_engine
# データベースエンジンを作成(SQLiteへの接続例)
engine = create_engine(r"sqlite:///p:/example.db", echo=True)
# データベースの接続
connection = engine.connect()
create_engine()
の引数に渡すURLは、次のルールで記述します。
ドライバ名://ユーザー名:パスワード@ホスト名:ポート/DB名
データベース | driver | port | dbname に記載すべき内容 |
---|---|---|---|
Oracle | oracle+cx_oracle | 1521 | サービス名またはSID |
MySQL | mysql+mysqlconnector 又は mysql+pymysql | 3306 | データベース名 |
PostgreSQL | postgresql+psycopg2 | 5432 | データベース名 |
SQL Server | mssql+pyodbc | 1433 | ODBCデータソース名(DSN) |
SQL Server | mssql+pymssql | 1433 | データベース名 |
SQLite | sqlite | - | ファイルのパス |
JDBC | jdbc+jaydebeapi | - | JDBC接続URL |
SQLの実行
SQLを実行する場合、connection
オブジェクトの execute()
を使用します。注意点としては、execute()
に渡すSQLは、必ずtextオブジェクトでなければならない点です。
connection.execute(text( SQL文字列 ))
SQLAlchemyは、SQLの不正な書き換え(SQLジャンクションなど)を防ぐため、SQLを記述した文字列を直接execute()
で利用できない仕様になっています(詳細は後述)。
例えば、users テーブル作成し、データを追加する一連の処理を実行する場合、次のようになります。
CREATE TABLE だけなら commit は必要ありませんが、INSERT も一緒に行っているため、最後に commit を記述しています。
from sqlalchemy import create_engine, text
# データベースエンジンを作成(SQLiteへの接続例)
engine = create_engine(r"sqlite:///p:/example.db", echo=True)
# データベースの接続
connection = engine.connect()
with engine.connect() as connection:
# テーブル作成クエリを実行
connection.execute(text("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
age INTEGER,
score REAL
)
"""))
# データの挿入
connection.execute(text("""
INSERT INTO users (name, age, score) VALUES
('Alice', 30, 95.0),
('Bob', 25, 88.5),
('Charlie', 35, 76.0)
"""))
# 挿入したデータのコミット
connection.commit()
バインドパラメータについて
バインドパラメータは、SQLインジェクション攻撃を防ぐための手段です。SQL文に直接値を埋め込むのではなく、プレースホルダ(例えば、:パラメータ名
)を使用します。そして、execute()
メソッドの第二引数で、プレースホルダに対応する値を指定します。これにより、アプリケーションは安全にデータベースとやり取りできます。
バインドパラメータを使って、先ほどの INSERT 文を書き直すと、以下のようになります。
with engine.connect() as connection:
query = text("""
INSERT INTO users (name, age, score) VALUES
(:name1, :age1, :score1),
(:name2, :age2, :score2),
(:name3, :age3, :score3)
""")
params = {
"name1": "Alice", "age1": 30, "score1": 95.0,
"name2": "Bob", "age2": 25, "score2": 88.5,
"name3": "Charlie", "age3": 35, "score3": 76.0
}
connection.execute(query, params)
connection.commit()
データの検索と結果の取得
execute()
に検索クエリを指定するだけで、結果を取得できます。
with engine.connect() as connection:
# データを取得するクエリを実行
result = connection.execute(text("SELECT * FROM users"))
for row in result:
print(row)
execute()
の戻り値は、<class 'sqlalchemy.engine.row.Row'>型であり、タプルのリストとして結果が返されます。従って、 for ループで順次値を取得することが可能です。
(1, 'Alice', 30, 95.0)
(2, 'Bob', 25, 88.5)
(3, 'Charlie', 35, 76.0)
トランザクション処理
トランザクション処理は Sesson オブジェクトを使います。
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db", echo=True)
Session = sessionmaker(bind=engine)
# トランザクション処理の例
with Session() as session:
try:
# トランザクション開始
session.execute(text("""
INSERT INTO users (name, age, score) VALUES ('Dave', 40, 88.0)
"""))
session.execute(text("""
INSERT INTO users (name, age, score) VALUES ('Eve', 22, 91.5)
"""))
# 操作成功後コミット
session.commit()
print("トランザクションがコミットされました")
except Exception as e:
# エラー発生時にロールバック
session.rollback()
print(f"エラーが発生しました: {e}")
メタ情報(テーブル名一覧、カラム名一覧)の取得
テーブル名やカラム名の一覧を取得するには、MetaData()
オブジェクトのreflect()
を使います。
これを呼び出すことで、tablesプロパティにテーブルとカラムの情報をまとめて取得できます。
from sqlalchemy import create_engine, MetaData
# SQLite データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db", echo=True)
# メタ情報の取得
metadata = MetaData()
metadata.reflect(bind=engine)
# テーブル一覧を表示
print(metadata.tables) # テーブル情報の取得
metadata.tables
には、次の形式でメタデータが格納されています。
FacadeDict({'users': Table('users', MetaData(), Column('id', INTEGER(), table=<users>, primary_key=True), Column('name', TEXT(), table=<users>, nullable=False), Column('age', INTEGER(), table=<users>), Column('score', REAL(), table=<users>), schema=None)})
従って、テーブル名の一覧を取得したい場合は tables.keys() を、 カラム名を取得したい場合は tables[テーブル名] を使うことで、結果が取得できます。
# テーブル一覧を表示
print(metadata.tables.keys())
# 特定のテーブルのメタ情報を表示
users_table = metadata.tables["users"]
for column in users_table.columns:
print(f"Column: {column.name}, Type: {column.type}")
複数のスキーマが存在する場合の対処方法
下記のコードはデフォルトのスキーマ情報を取得できますが、異なるスキーマが存在する場合、それらの情報は取得できません。
# メタ情報の取得
metadata = MetaData()
metadata.reflect(bind=engine)
デフォルトスキーマ以外に取得したいスキーマがあれば、reflect()
の引数に指定することで取得できます。
下記のコードは、"myschema1","myschema2","myschema3"という3つのスキーマ情報を metadata
オブジェクトに追加する例です。
# メタ情報の取得と保持
metadata = MetaData()
metadata.reflect(bind=self.engine)
# 情報を取得したいスキーマ一覧
Schemas = [ "myschema1","myschema2","myschema3" ]
# 各スキーマのテーブルをリフレクション
for schema in Schemas:
metadata.reflect(bind=self.engine, schema=schema)
# スキーマを指定して、users テーブルのカラム一覧を取得する
users_table = metadata.tables["myschema1.users"]
DataFrameを使った書き込み・読み込み
Pandasを使うことで、検索した結果をDataFrameに読み込んだり、一部を修正して保存することが可能です。
注意点としては、データを追加するか、あるいは新しいデータでテーブルを置き換えるかの2択になります。
メモリに入りきらない量のデータを扱う場合は、この方法で扱うことはできません。素直にSQL文を実行しましょう。
検索結果をDataFrameで受け取る
pd.read_sql() を使うことで、簡単にDataFrameにデータを格納できます。read_sql() の引数に指定するクエリは、普通の文字列で構いません。
import pandas as pd
from sqlalchemy import create_engine
# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db", echo=True)
# DataFrameにデータを読み込む
query = "SELECT * FROM users"
df = pd.read_sql(query, engine)
DataFrameの中身を書き換えて保存する
df.loc()
などで特定のデータを変更後、to_sql()
でDBに保存します。この時、is_exists
に"append" を指定するとデータが追加されてしまうため、"replace"を指定する必要があります。
"replace"を指定すると、元のテーブルが丸ごと今回指定したDataFrameで入れ替わってしまう点にご注意ください。
import pandas as pd
from sqlalchemy import create_engine
# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db")
# DataFrameにデータを読み込む
query = "SELECT * FROM users"
df = pd.read_sql(query, engine)
# 条件に一致する行を変更(例: ageが30のユーザーのscoreを+10)
df.loc[df["age"] == 30, "score"] += 10
# 編集後のDataFrameをデータベースに書き込む
df.to_sql("users", con=engine, if_exists="replace", index=False)
print("条件に一致するデータが更新されました!")
to_sql()
は既存のテーブルを新しいDataFrameの中身で置き換えてしまいます。read_sql()
で読み込む際のSQLに where 区で抽出条件を指定していた場合、抽出されなかったデータはto_sql()
で削除されるため、非常に危険です。
従って、to_sql()
を使用する際は、全件メモリに読み込めるサイズのテーブルでしか使えません。
SQLAlchemyを使ったSQL文の生成
SQL文を毎回記述するのは面倒な作業ですが、SQLAlchemyを使ってSQL文を生成することで、ある程度の手間を省くことができます。
SQL文の生成には、 MetaData
クラスを使います。これは接続したテーブルのメタ情報(テーブル、カラムなど)を保持するクラスであり、これを使ってSQLの自動生成を行います。
データの追加(INSERT)
INSERT文の生成は、insert()
を使います。この時、引数はテーブルオブジェクト(metadata.tables['users']の戻り値)を指定しなければなりません。
insert(テーブルオブジェクト).values(カラム1=値,カラム2=値,・・・)
from sqlalchemy import create_engine, MetaData, Table, insert, update, delete
# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db")
# 接続したDBのメタ情報を取得
metadata = MetaData()
metadata.reflect(bind=engine)
# 既存の 'users' テーブルに関するメタ情報を取得
users_table = metadata.tables['users']
# INSERT文を生成
statement = insert(users_table).values(name="Alice", age=30)
# INSERT文を生成して表示
print(str(statement))
insert()
の戻り値をそのまま execute()
で実行することは可能ですが、今回はstr()
を使って文字列出力しているため、値が入る部分はクエリパラメータで置き換わっています。
従って、これをそのままテンプレートとして使うことが可能です。
INSERT INTO users (name, age) VALUES (:name, :age)
上記のテンプレートを使って INSERTを実行するサンプルは次の様になります。
with engine.connect() as connection:
# SQLクエリ(バインドパラメータを利用)
query = text("INSERT INTO users (name, age) VALUES (:name, :age)")
# 実行時にパラメータを渡す
connection.execute(query, {"name": "Alice", "age": 30})
# コミット(SQLiteの `execute()` では不要だが、他のDBでは必要)
connection.commit()
データの追加(UPDATE)
UPDATE文の生成は、update()
を使います。
insert(テーブルオブジェクト).where(条件式).values(カラム1=値,カラム2=値,・・・)
where() で指定する条件式にカラム名を指定する場合、テーブルオブジェクトに含まれているカラム名を使います。テーブルオブジェクトのカラム名にアクセスするためには、 c (カラム)オブジェクトを経由する必要があります。
テーブルオブジェクト.c.カラム名
from sqlalchemy import create_engine, MetaData, Table, insert, update, delete
# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db")
# 接続したDBのメタ情報を取得
metadata = MetaData()
metadata.reflect(bind=engine)
# 既存の 'users' テーブルに関するメタ情報を取得
users_table = metadata.tables['users']
# UPDATE文を生成
statement = update(users_table).where(users_table.c.id == 1).values(name="Bob")
# UPDATE文を生成して表示
print(str(statement))
UPDATE users SET name=:name WHERE users.id = :id_1
データの削除(DELETE)
DELETE文の生成は、delete()
を使います。
update(テーブルオブジェクト).where(条件式)
from sqlalchemy import create_engine, MetaData, Table, insert, update, delete
# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db")
# 接続したDBのメタ情報を取得
metadata = MetaData()
metadata.reflect(bind=engine)
# statement 'users' テーブルに関するメタ情報を取得
users_table = metadata.tables['users']
# DELETE文を生成
statement = delete(users_table).where(users_table.c.id == 1)
# DELETE文を生成して表示
print(str(statement))
便利な自作クラス
メソッド名(引数) | 説明 |
---|---|
__init__(db_type, server, db_name, user_id, password, port,schemas) | データベースに接続し、エンジンとメタ情報を構築する。 |
create_connection_url(db_type, server, db_name, user_id, password, port) | 指定されたデータベースタイプに基づいて接続URLを構築する。 |
execute(sql_query) | 任意のSQLを実行し、必要に応じて結果を返す。 |
get_tables() | データベース内のテーブル名一覧を取得する。 |
get_columns(table_name) | 指定したテーブルのカラム情報を取得する。 |
fetch_to_dataframe(sql_query) | SQLの結果をpandas.DataFrame に格納して返す。 |
save_dataframe_to_table(df, table_name, mode) | pandas.DataFrame の内容を指定したテーブルに保存する。 |
execute_transaction(sql_list) | トランザクションを用いて複数のSQLを一括実行する。 |
bulk_insert_from_file(file_name, table_name, batch_size) | CSVファイルをバッチ単位で読み込み、データベースに挿入する。 |
引数も入れたバージョンにしてみた。これでOK?
import pandas as pd
from sqlalchemy import create_engine, MetaData, text
class DBAccess:
def __init__(self, db_type, server=None, db_name=None, user_id=None, password=None, port=None, schemas = []):
"""
コンストラクタで指定された情報を基にデータベースに接続し、メタ情報を取得します。
"""
self.connection_url = self.__create_connection_url(db_type, server, db_name, user_id, password, port)
# データベースエンジンの作成
self.engine = create_engine(self.connection_url, echo=False)
# メタ情報の取得と保持
self.metadata = MetaData()
self.metadata.reflect(bind=self.engine)
# 各スキーマのテーブルをリフレクション
for schema in schemas:
self.metadata.reflect(bind=self.engine, schema=schema)
def create_connection_url(self, db_type, server, db_name, user_id, password, port):
"""
接続URLをデータベースタイプに基づいて構築する。
Args:
db_type (str): データベースの種類。
server (str): サーバー名。
db_name (str): データベース名。
user_id (str): ユーザーID。
password (str): パスワード。
port (int): ポート番号。
Returns:
str: 接続URL。
Raises:
ValueError: サポートされていないデータベースタイプの場合。
"""
# データベースごとの固定部分とデフォルトポートを辞書で管理
db_settings = {
"sqlite": ("sqlite:///", None),
"postgresql": ("postgresql+psycopg2://", 5432),
"sqlserver": ("mssql+pymssql://", 1433),
"oracle": ("oracle+cx_oracle://", 1521),
"mysql": ("mysql+pymysql://", 3306),
}
# サポートされていないDBタイプはエラー
if db_type not in db_settings:
raise ValueError(f"サポートされていないデータベースタイプ: {db_type}")
prefix, default_port = db_settings[db_type]
port = port or default_port # ポートが指定されていない場合はデフォルトポートを使用
# SQLiteの場合は特別に処理(ファイルパスのみ指定)
if db_type == "sqlite":
if not db_name:
raise ValueError("SQLiteの場合、db_name(データベースファイルパス)を指定してください。")
return f"{prefix}{db_name}"
# その他の場合は通常の接続URLを構築
return f"{prefix}{user_id}:{password}@{server}:{port}/{db_name}"
def execute(self, sql_query):
"""
任意のSQLを実行するメソッド。SQL文字列をtext()に変換して実行する。
Args:
sql_query (str): 実行するSQL文。
Returns:
list or None: 結果を返すクエリの場合はデータのリスト、それ以外はNone。
"""
with self.engine.connect() as connection:
result = connection.execute(text(sql_query))
if result.returns_rows: # 結果を返すクエリかどうかを判定
return result.fetchall() # 行をすべて取得
else:
return None
def get_tables(self):
"""
データベース内のテーブル名一覧を取得する。
Returns:
list: テーブル名のリスト。
"""
return list(self.metadata.tables.keys())
def get_columns(self, table_name):
"""
指定されたテーブルのカラム情報を取得する。
Args:
table_name (str): 対象となるテーブル名。
Returns:
list: カラム名のリスト。
Raises:
ValueError: 指定されたテーブル名が存在しない場合。
"""
if table_name not in self.metadata.tables:
raise ValueError(f"指定されたテーブル名 '{table_name}' は存在しません。")
table = self.metadata.tables[table_name]
return [column.name for column in table.columns]
def fetch_to_dataframe(self, sql_query):
"""
SQLの結果をDataFrameに格納して返す。
Args:
sql_query (str): 実行するSQL文。
Returns:
pandas.DataFrame: SQL結果を格納したDataFrame。
"""
with self.engine.connect() as connection:
result = connection.execute(text(sql_query))
df = pd.DataFrame(result.fetchall(), columns=result.keys()) # 結果をDataFrameに変換
return df
def save_dataframe_to_table(self, df, table_name, mode):
"""
DataFrameの内容を指定したテーブルに保存する。
Args:
df (pandas.DataFrame): 保存対象のDataFrame。
table_name (str): 保存先のテーブル名。
mode (str): 'append' または 'replace' を指定する保存モード。
Raises:
ValueError: mode引数が正しくない場合。
"""
if mode not in ['append', 'replace']:
raise ValueError("mode引数は 'append' または 'replace' のいずれかで指定してください。")
df.to_sql(name=table_name, con=self.engine, if_exists=mode, index=False) # DataFrameを書き込み
print(f"DataFrameの内容をテーブル '{table_name}' に{mode}モードで保存しました。")
def execute_transaction(self, sql_list):
"""
トランザクションをかけて複数のSQL文を一括で実行する。
Args:
sql_list (list): 実行するSQL文のリスト。
"""
with self.engine.connect() as connection:
transaction = connection.begin() # トランザクション開始
try:
for sql in sql_list:
connection.execute(text(sql))
transaction.commit() # 成功時にコミット
print("トランザクションが正常にコミットされました。")
except Exception as e:
transaction.rollback() # エラー時にロールバック
print(f"トランザクションがロールバックされました。エラー: {e}")
def bulk_insert_from_file(self, file_name, table_name, batch_size):
"""
ファイルを処理件数づつ読み込んで、マルチインサート文を作成してインサートする。
Args:
file_name (str): 処理するCSVファイルのパス。
table_name (str): インサート先のテーブル名。
batch_size (int): 一度に処理する行数。
"""
data = pd.read_csv(file_name) # ファイル読み込み
with self.engine.connect() as connection:
for i in range(0, len(data), batch_size):
batch = data.iloc[i:i+batch_size] # バッチサイズごとにデータを分割
batch.to_sql(name=table_name, con=self.engine, if_exists='append', index=False) # テーブルに追記
print(f"{i + len(batch)} 件をインサートしました。")
まとめ
本記事では、SQLAlchemyを利用することで、DB固有の記述(DBへの接続、スキーマ情報の取得など)を統一化するとともに、SQLAlchemyの利用時に懸念される処理速度低下を解決する方法として、SQL直接実行機能(executeメソッドの利用)を紹介しました。
また、SQLAlchemyが提供する MetaData() クラスを使って、INSERT/UPDATE/DELETE文のテンプレートを自動生成する方法についても紹介しました。
この方法を適用することで、案件ごとにデータベースが異なる場合でも、それぞれの差異を吸収し、コードを共通化できます。ぜひご活用ください。
コメント