MENU

【超便利】SQLAlchemy+SQLでデータベース自由自在(応用編)

案件に応じて複数のデータベース(DB)を使い分けることは一般的です。
通常、DBごとに用意された専用ライブラリを使用しますが、SQLAlchemyを利用することで、DB接続やスキーマ情報(テーブル一覧、カラム一覧)の取得など、DB固有の処理を統一化されたインターフェースで実現することが可能です。

しかしながら、SQLAlchemyはテーブルをクラスに、データをオブジェクトとしてマッピングするため、大量のデータに対する一括追加(Insert)、一括更新(Update)、一括削除(Delete)の処理では、パフォーマンスが著しく低下することがあります。

では、統一化されたインターフェースの利用とパフォーマンス低下の抑制を両立する方法はあるのでしょうか?

本記事ではその解決策として、SQLAlchemyの統一化されたインターフェースをDB接続やスキーマ情報の管理に使用し、パフォーマンスに直接関わる処理(大量データの追加、更新、削除、テーブル結合や集計など)では、SQLAlchemyを通じて直接SQLを実行するアプローチを紹介します。

SQLAlchemy の基本的な内容については「【超便利】SQLAlchemyでデータベースを操作しよう(基礎編)」で紹介しているので、必要に応じてご確認ください。

目次

インストール手順

SQLAlchemyを使う場合、SQLAlchemyのインストールに加えて、接続したいDBごとのライブラリをインストールします。

SQLAlchemyは、次のpip コマンドでインストールが可能です。

pip install sqlalchemy

また、PythonプログラムでSQLAlchemyを使用するにあたり、下記のインポートが必要です。

from sqlalchemy import create_engine, text ,MetaData
from sqlalchemy.orm import sessionmaker

下記はDBごとに必要なライブラリのインストール方法と、インポートの記述です。

DB名称インストール方法import すべき内容
Oraclepip install cx_Oracleimport cx_Oracle
MySQLpip install mysqlclient
又は pip install pymysql
import pymysql
PostgreSQLpip install psycopg2
又は pip install psycopg2-binary
import psycopg2
SQL Server、
ODBC
pip install pyodbcimport pyodbc
JDBCpip install JayDeBeApiimport jaydebeapi
SQLite不要import sqlite3

DBへの接続

create_engine()の引数にDB毎のURLを渡すことで、DBに接続できます。
また、create_engine()のecho引数をTrueにすると、SQLAlchemyが内部で生成しているSQLをログ(通常はコンソール)に出力できます。

from sqlalchemy import create_engine

# データベースエンジンを作成(SQLiteへの接続例)
engine = create_engine(r"sqlite:///p:/example.db", echo=True)

# データベースの接続
connection = engine.connect()

create_engine()の引数に渡すURLは、次のルールで記述します。

ドライバ名://ユーザー名:パスワード@ホスト名:ポート/DB名

データベースdriverportdbname に記載すべき内容
Oracleoracle+cx_oracle1521サービス名またはSID
MySQLmysql+mysqlconnector
又は mysql+pymysql
3306データベース名
PostgreSQLpostgresql+psycopg25432データベース名
SQL Servermssql+pyodbc1433ODBCデータソース名(DSN)
SQL Servermssql+pymssql1433データベース名
SQLitesqlite-ファイルのパス
JDBCjdbc+jaydebeapi-JDBC接続URL

SQLの実行

SQLを実行する場合、connectionオブジェクトの execute()を使用します。注意点としては、execute() に渡すSQLは、必ずtextオブジェクトでなければならない点です。

connection.execute(text( SQL文字列 ))

SQLAlchemyは、SQLの不正な書き換え(SQLジャンクションなど)を防ぐため、SQLを記述した文字列を直接execute() で利用できない仕様になっています(詳細は後述)。

例えば、users テーブル作成し、データを追加する一連の処理を実行する場合、次のようになります。
CREATE TABLE だけなら commit は必要ありませんが、INSERT も一緒に行っているため、最後に commit を記述しています。

from sqlalchemy import create_engine, text

# データベースエンジンを作成(SQLiteへの接続例)
engine = create_engine(r"sqlite:///p:/example.db", echo=True)

# データベースの接続
connection = engine.connect()

with engine.connect() as connection:
    # テーブル作成クエリを実行
    connection.execute(text("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            age INTEGER,
            score REAL
        )
        """))

  # データの挿入
    connection.execute(text("""
        INSERT INTO users (name, age, score) VALUES
        ('Alice', 30, 95.0),
        ('Bob', 25, 88.5),
        ('Charlie', 35, 76.0)
        """))
    # 挿入したデータのコミット
    connection.commit()

バインドパラメータについて

バインドパラメータは、SQLインジェクション攻撃を防ぐための手段です。SQL文に直接値を埋め込むのではなく、プレースホルダ(例えば、:パラメータ名)を使用します。そして、execute()メソッドの第二引数で、プレースホルダに対応する値を指定します。これにより、アプリケーションは安全にデータベースとやり取りできます。

バインドパラメータを使って、先ほどの INSERT 文を書き直すと、以下のようになります。

with engine.connect() as connection:
    query = text("""
        INSERT INTO users (name, age, score) VALUES
        (:name1, :age1, :score1),
        (:name2, :age2, :score2),
        (:name3, :age3, :score3)
    """)
    
    params = {
        "name1": "Alice", "age1": 30, "score1": 95.0,
        "name2": "Bob", "age2": 25, "score2": 88.5,
        "name3": "Charlie", "age3": 35, "score3": 76.0
    }

    connection.execute(query, params)
    connection.commit()

データの検索と結果の取得

execute()に検索クエリを指定するだけで、結果を取得できます。

with engine.connect() as connection:
    # データを取得するクエリを実行
    result = connection.execute(text("SELECT * FROM users"))
    for row in result:
        print(row)

execute()の戻り値は、<class 'sqlalchemy.engine.row.Row'>型であり、タプルのリストとして結果が返されます。従って、 for ループで順次値を取得することが可能です。

(1, 'Alice', 30, 95.0)
(2, 'Bob', 25, 88.5)
(3, 'Charlie', 35, 76.0)

トランザクション処理

トランザクション処理は Sesson オブジェクトを使います。

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db", echo=True)
Session = sessionmaker(bind=engine)

# トランザクション処理の例
with Session() as session:
    try:
        # トランザクション開始
        session.execute(text("""
            INSERT INTO users (name, age, score) VALUES ('Dave', 40, 88.0)
        """))
        
        session.execute(text("""
            INSERT INTO users (name, age, score) VALUES ('Eve', 22, 91.5)
        """))
        
        # 操作成功後コミット
        session.commit()
        print("トランザクションがコミットされました")
    except Exception as e:
        # エラー発生時にロールバック
        session.rollback()
        print(f"エラーが発生しました: {e}")

メタ情報(テーブル名一覧、カラム名一覧)の取得

テーブル名やカラム名の一覧を取得するには、MetaData() オブジェクトのreflect() を使います。
これを呼び出すことで、tablesプロパティにテーブルとカラムの情報をまとめて取得できます。

from sqlalchemy import create_engine, MetaData

# SQLite データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db", echo=True)

# メタ情報の取得
metadata = MetaData()
metadata.reflect(bind=engine)

# テーブル一覧を表示
print(metadata.tables)  # テーブル情報の取得

metadata.tablesには、次の形式でメタデータが格納されています。

FacadeDict({'users': Table('users', MetaData(), Column('id', INTEGER(), table=<users>, primary_key=True), Column('name', TEXT(), table=<users>, nullable=False), Column('age', INTEGER(), table=<users>), Column('score', REAL(), table=<users>), schema=None)})

従って、テーブル名の一覧を取得したい場合は tables.keys() を、 カラム名を取得したい場合は tables[テーブル名] を使うことで、結果が取得できます。

# テーブル一覧を表示
print(metadata.tables.keys())  

# 特定のテーブルのメタ情報を表示
users_table = metadata.tables["users"]
for column in users_table.columns:
    print(f"Column: {column.name}, Type: {column.type}")

複数のスキーマが存在する場合の対処方法

下記のコードはデフォルトのスキーマ情報を取得できますが、異なるスキーマが存在する場合、それらの情報は取得できません。

# メタ情報の取得
metadata = MetaData()
metadata.reflect(bind=engine)

デフォルトスキーマ以外に取得したいスキーマがあれば、reflect() の引数に指定することで取得できます。
下記のコードは、"myschema1","myschema2","myschema3"という3つのスキーマ情報を metadata オブジェクトに追加する例です。

# メタ情報の取得と保持
metadata = MetaData()
metadata.reflect(bind=self.engine)

# 情報を取得したいスキーマ一覧
Schemas = [ "myschema1","myschema2","myschema3" ]

# 各スキーマのテーブルをリフレクション
for schema in Schemas:
    metadata.reflect(bind=self.engine, schema=schema)

# スキーマを指定して、users テーブルのカラム一覧を取得する
users_table = metadata.tables["myschema1.users"]

DataFrameを使った書き込み・読み込み

Pandasを使うことで、検索した結果をDataFrameに読み込んだり、一部を修正して保存することが可能です。
注意点としては、データを追加するか、あるいは新しいデータでテーブルを置き換えるかの2択になります。
メモリに入りきらない量のデータを扱う場合は、この方法で扱うことはできません。素直にSQL文を実行しましょう。

検索結果をDataFrameで受け取る

pd.read_sql() を使うことで、簡単にDataFrameにデータを格納できます。read_sql() の引数に指定するクエリは、普通の文字列で構いません。

import pandas as pd
from sqlalchemy import create_engine

# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db", echo=True)

# DataFrameにデータを読み込む
query = "SELECT * FROM users"
df = pd.read_sql(query, engine)

DataFrameの中身を書き換えて保存する

df.loc() などで特定のデータを変更後、to_sql() でDBに保存します。この時、is_exists に"append" を指定するとデータが追加されてしまうため、"replace"を指定する必要があります。
"replace"を指定すると、元のテーブルが丸ごと今回指定したDataFrameで入れ替わってしまう点にご注意ください。

import pandas as pd
from sqlalchemy import create_engine

# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db")

# DataFrameにデータを読み込む
query = "SELECT * FROM users"
df = pd.read_sql(query, engine)

# 条件に一致する行を変更(例: ageが30のユーザーのscoreを+10)
df.loc[df["age"] == 30, "score"] += 10

# 編集後のDataFrameをデータベースに書き込む
df.to_sql("users", con=engine, if_exists="replace", index=False)

print("条件に一致するデータが更新されました!")

to_sql()は既存のテーブルを新しいDataFrameの中身で置き換えてしまいます。
read_sql() で読み込む際のSQLに where 区で抽出条件を指定していた場合、抽出されなかったデータはto_sql() で削除されるため、非常に危険です。
従って、to_sql() を使用する際は、全件メモリに読み込めるサイズのテーブルでしか使えません。

SQLAlchemyを使ったSQL文の生成

SQL文を毎回記述するのは面倒な作業ですが、SQLAlchemyを使ってSQL文を生成することで、ある程度の手間を省くことができます。

SQL文の生成には、 MetaDataクラスを使います。これは接続したテーブルのメタ情報(テーブル、カラムなど)を保持するクラスであり、これを使ってSQLの自動生成を行います。

データの追加(INSERT)

INSERT文の生成は、insert()を使います。この時、引数はテーブルオブジェクト(metadata.tables['users']の戻り値)を指定しなければなりません。

insert(テーブルオブジェクト).values(カラム1=値,カラム2=値,・・・)

from sqlalchemy import create_engine, MetaData, Table, insert, update, delete 

# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db")

# 接続したDBのメタ情報を取得
metadata = MetaData()
metadata.reflect(bind=engine)

# 既存の 'users' テーブルに関するメタ情報を取得
users_table = metadata.tables['users']

# INSERT文を生成
statement = insert(users_table).values(name="Alice", age=30)
# INSERT文を生成して表示
print(str(statement))  

insert() の戻り値をそのまま execute() で実行することは可能ですが、今回はstr()を使って文字列出力しているため、値が入る部分はクエリパラメータで置き換わっています。
従って、これをそのままテンプレートとして使うことが可能です。

INSERT INTO users (name, age) VALUES (:name, :age)

上記のテンプレートを使って INSERTを実行するサンプルは次の様になります。

with engine.connect() as connection:
    # SQLクエリ(バインドパラメータを利用)
    query = text("INSERT INTO users (name, age) VALUES (:name, :age)")

    # 実行時にパラメータを渡す
    connection.execute(query, {"name": "Alice", "age": 30})
    
    # コミット(SQLiteの `execute()` では不要だが、他のDBでは必要)
    connection.commit()

データの追加(UPDATE)

UPDATE文の生成は、update()を使います。

insert(テーブルオブジェクト).where(条件式).values(カラム1=値,カラム2=値,・・・)

where() で指定する条件式にカラム名を指定する場合、テーブルオブジェクトに含まれているカラム名を使います。テーブルオブジェクトのカラム名にアクセスするためには、 c (カラム)オブジェクトを経由する必要があります。

テーブルオブジェクト.c.カラム名

from sqlalchemy import create_engine, MetaData, Table, insert, update, delete 

# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db")

# 接続したDBのメタ情報を取得
metadata = MetaData()
metadata.reflect(bind=engine)

# 既存の 'users' テーブルに関するメタ情報を取得
users_table = metadata.tables['users']

# UPDATE文を生成
statement = update(users_table).where(users_table.c.id == 1).values(name="Bob")
# UPDATE文を生成して表示
print(str(statement))  

UPDATE users SET name=:name WHERE users.id = :id_1

データの削除(DELETE)

DELETE文の生成は、delete()を使います。

update(テーブルオブジェクト).where(条件式)

from sqlalchemy import create_engine, MetaData, Table, insert, update, delete 

# データベースエンジンを作成
engine = create_engine(r"sqlite:///p:/example.db")

# 接続したDBのメタ情報を取得
metadata = MetaData()
metadata.reflect(bind=engine)

# statement  'users' テーブルに関するメタ情報を取得
users_table = metadata.tables['users']

# DELETE文を生成
statement = delete(users_table).where(users_table.c.id == 1)
# DELETE文を生成して表示
print(str(statement))  

便利な自作クラス

メソッド名(引数)説明
__init__(db_type, server, db_name, user_id, password, port,schemas)データベースに接続し、エンジンとメタ情報を構築する。
create_connection_url(db_type, server, db_name, user_id, password, port)指定されたデータベースタイプに基づいて接続URLを構築する。
execute(sql_query)任意のSQLを実行し、必要に応じて結果を返す。
get_tables()データベース内のテーブル名一覧を取得する。
get_columns(table_name)指定したテーブルのカラム情報を取得する。
fetch_to_dataframe(sql_query)SQLの結果をpandas.DataFrameに格納して返す。
save_dataframe_to_table(df, table_name, mode)pandas.DataFrame の内容を指定したテーブルに保存する。
execute_transaction(sql_list)トランザクションを用いて複数のSQLを一括実行する。
bulk_insert_from_file(file_name, table_name, batch_size)CSVファイルをバッチ単位で読み込み、データベースに挿入する。

引数も入れたバージョンにしてみた。これでOK?

import pandas as pd
from sqlalchemy import create_engine, MetaData, text

class DBAccess:
    def __init__(self, db_type, server=None, db_name=None, user_id=None, password=None, port=None, schemas = []):
        """
        コンストラクタで指定された情報を基にデータベースに接続し、メタ情報を取得します。
        """
        self.connection_url = self.__create_connection_url(db_type, server, db_name, user_id, password, port)
        
        # データベースエンジンの作成
        self.engine = create_engine(self.connection_url, echo=False)
        
        # メタ情報の取得と保持
        self.metadata = MetaData()
        self.metadata.reflect(bind=self.engine)

        # 各スキーマのテーブルをリフレクション
        for schema in schemas:
            self.metadata.reflect(bind=self.engine, schema=schema)

    def create_connection_url(self, db_type, server, db_name, user_id, password, port):
        """
        接続URLをデータベースタイプに基づいて構築する。
        
        Args:
            db_type (str): データベースの種類。
            server (str): サーバー名。
            db_name (str): データベース名。
            user_id (str): ユーザーID。
            password (str): パスワード。
            port (int): ポート番号。
        
        Returns:
            str: 接続URL。
        Raises:
            ValueError: サポートされていないデータベースタイプの場合。
        """
        # データベースごとの固定部分とデフォルトポートを辞書で管理
        db_settings = {
            "sqlite": ("sqlite:///", None),
            "postgresql": ("postgresql+psycopg2://", 5432),
            "sqlserver": ("mssql+pymssql://", 1433),
            "oracle": ("oracle+cx_oracle://", 1521),
            "mysql": ("mysql+pymysql://", 3306),
        }

        # サポートされていないDBタイプはエラー
        if db_type not in db_settings:
            raise ValueError(f"サポートされていないデータベースタイプ: {db_type}")

        prefix, default_port = db_settings[db_type]
        port = port or default_port  # ポートが指定されていない場合はデフォルトポートを使用

        # SQLiteの場合は特別に処理(ファイルパスのみ指定)
        if db_type == "sqlite":
            if not db_name:
                raise ValueError("SQLiteの場合、db_name(データベースファイルパス)を指定してください。")
            return f"{prefix}{db_name}"

        # その他の場合は通常の接続URLを構築
        return f"{prefix}{user_id}:{password}@{server}:{port}/{db_name}"

    def execute(self, sql_query):
        """
        任意のSQLを実行するメソッド。SQL文字列をtext()に変換して実行する。
        
        Args:
            sql_query (str): 実行するSQL文。
        
        Returns:
            list or None: 結果を返すクエリの場合はデータのリスト、それ以外はNone。
        """
        with self.engine.connect() as connection:
            result = connection.execute(text(sql_query))
            if result.returns_rows:  # 結果を返すクエリかどうかを判定
                return result.fetchall()  # 行をすべて取得
            else:
                return None

    def get_tables(self):
        """
        データベース内のテーブル名一覧を取得する。
        
        Returns:
            list: テーブル名のリスト。
        """
        return list(self.metadata.tables.keys())

    def get_columns(self, table_name):
        """
        指定されたテーブルのカラム情報を取得する。
        
        Args:
            table_name (str): 対象となるテーブル名。
        
        Returns:
            list: カラム名のリスト。
        Raises:
            ValueError: 指定されたテーブル名が存在しない場合。
        """
        if table_name not in self.metadata.tables:
            raise ValueError(f"指定されたテーブル名 '{table_name}' は存在しません。")

        table = self.metadata.tables[table_name]
        return [column.name for column in table.columns]

    def fetch_to_dataframe(self, sql_query):
        """
        SQLの結果をDataFrameに格納して返す。
        
        Args:
            sql_query (str): 実行するSQL文。
        
        Returns:
            pandas.DataFrame: SQL結果を格納したDataFrame。
        """
        with self.engine.connect() as connection:
            result = connection.execute(text(sql_query))
            df = pd.DataFrame(result.fetchall(), columns=result.keys())  # 結果をDataFrameに変換
        return df

    def save_dataframe_to_table(self, df, table_name, mode):
        """
        DataFrameの内容を指定したテーブルに保存する。
        
        Args:
            df (pandas.DataFrame): 保存対象のDataFrame。
            table_name (str): 保存先のテーブル名。
            mode (str): 'append' または 'replace' を指定する保存モード。
        
        Raises:
            ValueError: mode引数が正しくない場合。
        """
        if mode not in ['append', 'replace']:
            raise ValueError("mode引数は 'append' または 'replace' のいずれかで指定してください。")
        
        df.to_sql(name=table_name, con=self.engine, if_exists=mode, index=False)  # DataFrameを書き込み
        print(f"DataFrameの内容をテーブル '{table_name}' に{mode}モードで保存しました。")

    def execute_transaction(self, sql_list):
        """
        トランザクションをかけて複数のSQL文を一括で実行する。
        
        Args:
            sql_list (list): 実行するSQL文のリスト。
        """
        with self.engine.connect() as connection:
            transaction = connection.begin()  # トランザクション開始
            try:
                for sql in sql_list:
                    connection.execute(text(sql))
                transaction.commit()  # 成功時にコミット
                print("トランザクションが正常にコミットされました。")
            except Exception as e:
                transaction.rollback()  # エラー時にロールバック
                print(f"トランザクションがロールバックされました。エラー: {e}")

    def bulk_insert_from_file(self, file_name, table_name, batch_size):
        """
        ファイルを処理件数づつ読み込んで、マルチインサート文を作成してインサートする。
        
        Args:
            file_name (str): 処理するCSVファイルのパス。
            table_name (str): インサート先のテーブル名。
            batch_size (int): 一度に処理する行数。
        """
        data = pd.read_csv(file_name)  # ファイル読み込み

        with self.engine.connect() as connection:
            for i in range(0, len(data), batch_size):
                batch = data.iloc[i:i+batch_size]  # バッチサイズごとにデータを分割
                batch.to_sql(name=table_name, con=self.engine, if_exists='append', index=False)  # テーブルに追記
                print(f"{i + len(batch)} 件をインサートしました。")

まとめ

本記事では、SQLAlchemyを利用することで、DB固有の記述(DBへの接続、スキーマ情報の取得など)を統一化するとともに、SQLAlchemyの利用時に懸念される処理速度低下を解決する方法として、SQL直接実行機能(executeメソッドの利用)を紹介しました。

また、SQLAlchemyが提供する MetaData() クラスを使って、INSERT/UPDATE/DELETE文のテンプレートを自動生成する方法についても紹介しました。

この方法を適用することで、案件ごとにデータベースが異なる場合でも、それぞれの差異を吸収し、コードを共通化できます。ぜひご活用ください。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

コメント

コメントする

目次