エンジニアの頭の中

トレードbotエンジニアが書く技術と投資のブログ

SQLAlchemyでMySQLのINSERT .. DUPLICATE ON KEY UPDATE (UPSERT)を使用する

f:id:mitsu3204:20180815160850j:plain

最近、PythonMySQLデータベースへ接続するコードを書いていたのですが、直接SQLを書くのが嫌になりSQLAlchemyを使用することにしました。

既存のMySQLへの接続処理をSQLAlchemyを使用するコードへと置き換えていたのですが、その際に、MySQLへのレコード登録時にキー重複が発生した場合は、該当レコードを更新する「UPSERT」をする必要がありました。

元々は、MySQLduplicate on key update機能をSQLを書いて使用していたのですが、SQLを書かずにSQLAlchemyで実現する方法を調べてみたので、メモとして残しておきます。

UPSERTはSQLAlchemy 1.2系でサポートされている

SQLAlchemyでは、1.2系からUPSERT (ON DUPLICATE KEY UPDATE)をサポートしているようです。

1.2系は、現時点(2017/11/5)では、プレリリース版です。私が使用しているSQLAlchemyは、1.1系でしたが、この機能を使用したかったので1.2系へ更新してみました。

SQLAlchemy 1.2系をインストールする

pipで検索します。

$ pip search SQLAlchemy

1.2.0b3(ベータ版)が出てきました。

SQLAlchemy (1.2.0b3) - Database Abstraction Library
INSTALLED: 1.1.15
LATEST: 1.2.0b3

SQLAlchemyを1.2.0b3バージョンに更新します。pipでベータ版へアップデートする場合は、通常のアップデート時とは異なり、--preオプションを付与して、install -Uを実行します。

$ pip install SQLAlchemy -U --pre
Collecting SQLAlchemy
Downloading SQLAlchemy-1.2.0b3.tar.gz (5.4MB)
100% |████████████████████████████████| 5.4MB 331kB/s
Installing collected packages: SQLAlchemy
Found existing installation: SQLAlchemy 1.1.15
Uninstalling SQLAlchemy-1.1.15:
Successfully uninstalled SQLAlchemy-1.1.15
Running setup.py install for SQLAlchemy ... done
Successfully installed SQLAlchemy-1.2.0b3

SQLAlchemyのHPのサンプルコードを読む

公式ドキュメントの、Dialect Improvements and Changes - MySQLの項には、UPSERTを実行するコードとして、以下のサンプルが掲載されています。

from sqlalchemy.dialects.mysql import insert

insert_stmt = insert(my_table). \
values(id='some_id', data='some data to insert')

on_conflict_stmt = insert_stmt.on_duplicate_key_update(
data=insert_stmt.inserted.data,
status='U'
)

conn.execute(on_conflict_stmt)

insert(my_table)関数の引数に指定されているmy_tableは、sqlalchemy.Table型のオブジェクトです。

肝心なのは、insert_stmt.on_duplicate_key_updateの部分です。

on_duplicate_key_updateの引数として、datastatusというキーの引数が指定されていますが、これらは、レコードのキー重複が発生した際に、「どのカラムをどんな値で更新するか」を指定するためのものです。

上記のサンプルの場合だと、以下のように解釈できます。

  • dataという名前の列を、insert文のdata列に指定した値で更新する。(つまりvalues関数の引数であるdataに指定された'some data to insert'で更新せよという意味)
  • statusという名前の列を、'U'という値で更新する。

というものです。

datastatusという引数は、on_duplicate_key_update関数特有の引数ではなく、更新対象とするカラムの名前と値を、key=valueの形式で表している点に注意してください。

実際にUPSERTを書いてみる

サンプルに従い、私が使っていたテーブルに合わせて書いてみたコードです。テーブル定義のコードとセットで載せておきます。

# table定義
sample = Table('sample', metadata,
    Column('context_id', Integer, nullable=False),
    Column('name', String(256), nullable=False),
    Column('amount', BigInteger, nullable=False),
    Column('created_at', TIMESTAMP, nullable=False, server_default=text('current_timestamp')),
    Column('updated_at', TIMESTAMP, nullable=False, server_default=text('current_timestamp'), server_onupdate=text('current_timestamp')),
    UniqueConstraint('context_id', 'name')
)

# insertを作成
insert_stmt = insert(sample).values(
    context_id=<context_idの値>, name=<nameの値>, amount=<amountの値>)

# ユニーク制約であるcontext_idとnameの重複が発生した場合は、既存レコードのamount列をinsert文のamountに指定した値で更新する
on_conflict_stmt = insert_stmt.on_duplicate_key_update(
amount=insert_stmt.inserted.amount)

# upsert実行
conn.execute(on_conflict_stmt)

以上です。簡単に書けました。