ハック

peeweeを使用したハック集。共有したいクールなハックがありますか? GitHubでイシューを開くか、私に連絡してください。

楽観的ロック

楽観的ロックは、通常 *SELECT FOR UPDATE*(またはSQLiteでは *BEGIN IMMEDIATE*)を使用する状況で役立ちます。たとえば、データベースからユーザーレコードを取得し、いくつかの変更を加えて、変更されたユーザーレコードを保存する場合などです。通常、このシナリオでは、選択した瞬間から変更を保存する瞬間まで、トランザクションの期間中、ユーザーレコードをロックする必要があります。

一方、楽観的ロックでは、ロックを取得*せず*、代わりに変更している行の内部 *バージョン* 列に依存します。読み取り時に、行の現在のバージョンを確認し、保存時に、バージョンが最初に読み取ったバージョンと同じ場合にのみ更新が行われるようにします。バージョンが新しい場合、他のプロセスが割り込んで行を変更した可能性があります。変更したバージョンを保存すると、重要な変更が失われる可能性があります。

Peeweeで楽観的ロックを実装するのは非常に簡単です。これは、出発点として使用できる基本クラスです。

from peewee import *

class ConflictDetectedException(Exception): pass

class BaseVersionedModel(Model):
    version = IntegerField(default=1, index=True)

    def save_optimistic(self):
        if not self.id:
            # This is a new record, so the default logic is to perform an
            # INSERT. Ideally your model would also have a unique
            # constraint that made it impossible for two INSERTs to happen
            # at the same time.
            return self.save()

        # Update any data that has changed and bump the version counter.
        field_data = dict(self.__data__)
        current_version = field_data.pop('version', 1)
        self._populate_unsaved_relations(field_data)
        field_data = self._prune_fields(field_data, self.dirty_fields)
        if not field_data:
            raise ValueError('No changes have been made.')

        ModelClass = type(self)
        field_data['version'] = ModelClass.version + 1  # Atomic increment.

        query = ModelClass.update(**field_data).where(
            (ModelClass.version == current_version) &
            (ModelClass.id == self.id))
        if query.execute() == 0:
            # No rows were updated, indicating another process has saved
            # a new version. How you handle this situation is up to you,
            # but for simplicity I'm just raising an exception.
            raise ConflictDetectedException()
        else:
            # Increment local version to match what is now in the db.
            self.version += 1
            return True

これがどのように機能するか、例を挙げて説明します。次のモデル定義があるとします。ユーザー名に一意制約があることに注意してください。これは、二重挿入を防ぐ方法を提供するため重要です。

class User(BaseVersionedModel):
    username = CharField(unique=True)
    favorite_animal = CharField()

>>> u = User(username='charlie', favorite_animal='cat')
>>> u.save_optimistic()
True

>>> u.version
1

>>> u.save_optimistic()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "x.py", line 18, in save_optimistic
    raise ValueError('No changes have been made.')
ValueError: No changes have been made.

>>> u.favorite_animal = 'kitten'
>>> u.save_optimistic()
True

# Simulate a separate thread coming in and updating the model.
>>> u2 = User.get(User.username == 'charlie')
>>> u2.favorite_animal = 'macaw'
>>> u2.save_optimistic()
True

# Now, attempt to change and re-save the original instance:
>>> u.favorite_animal = 'little parrot'
>>> u.save_optimistic()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "x.py", line 30, in save_optimistic
    raise ConflictDetectedException()
ConflictDetectedException: current version is out of sync

グループごとの上位オブジェクト

これらの例では、グループごとの単一の上位項目をクエリするいくつかの方法について説明します。さまざまな手法の詳細な説明については、ブログ投稿Peewee ORMを使用したグループ別の上位項目のクエリをご覧ください。上位 *N* 項目のクエリという、より一般的な問題に興味がある場合は、以下のセクショングループごとの上位N個のオブジェクトをご覧ください。

これらの例では、 *User* モデルと *Tweet* モデルを使用して、各ユーザーとその最新のツイートを見つけます。

テストで最も効率的だった方法は、MAX() 集計関数を使用する方法です。

相関のないサブクエリで集計を実行するため、このメソッドが効率的であると確信できます。アイデアは、投稿を、作成者でグループ化し、タイムスタンプがそのユーザーの最大観測タイムスタンプと等しいものを選択することです。

# When referencing a table multiple times, we'll call Model.alias() to create
# a secondary reference to the table.
TweetAlias = Tweet.alias()

# Create a subquery that will calculate the maximum Tweet created_date for each
# user.
subquery = (TweetAlias
            .select(
                TweetAlias.user,
                fn.MAX(TweetAlias.created_date).alias('max_ts'))
            .group_by(TweetAlias.user)
            .alias('tweet_max_subquery'))

# Query for tweets and join using the subquery to match the tweet's user
# and created_date.
query = (Tweet
         .select(Tweet, User)
         .join(User)
         .switch(Tweet)
         .join(subquery, on=(
             (Tweet.created_date == subquery.c.max_ts) &
             (Tweet.user == subquery.c.user_id))))

SQLiteとMySQLはもう少し緩やかで、選択されている列のサブセットによるグループ化を許可しています。つまり、サブクエリをなくして、非常に簡潔に表現できます。

query = (Tweet
         .select(Tweet, User)
         .join(User)
         .group_by(Tweet.user)
         .having(Tweet.created_date == fn.MAX(Tweet.created_date)))

グループごとの上位N個のオブジェクト

これらの例では、グループごとの上位 *N* 項目をかなり効率的にクエリするいくつかの方法について説明します。さまざまな手法の詳細な説明については、ブログ投稿Peewee ORMを使用したグループ別の上位N個のオブジェクトのクエリをご覧ください。

これらの例では、 *User* モデルと *Tweet* モデルを使用して、各ユーザーとその3つの最新のツイートを見つけます。

Postgresラテラル結合

ラテラル結合は、かなり効率的な相関サブクエリを可能にする優れたPostgres機能です。これらは、多くの場合、SQL for each ループと呼ばれます。

必要なSQLは次のとおりです。

SELECT * FROM
  (SELECT id, username FROM user) AS uq
   LEFT JOIN LATERAL
  (SELECT message, created_date
   FROM tweet
   WHERE (user_id = uq.id)
   ORDER BY created_date DESC LIMIT 3)
  AS pq ON true

peeweeでこれを達成するのは非常に簡単です。

subq = (Tweet
        .select(Tweet.message, Tweet.created_date)
        .where(Tweet.user == User.id)
        .order_by(Tweet.created_date.desc())
        .limit(3))

query = (User
         .select(User, subq.c.content, subq.c.created_date)
         .join(subq, JOIN.LEFT_LATERAL)
         .order_by(User.username, subq.c.created_date.desc()))

# We queried from the "perspective" of user, so the rows are User instances
# with the addition of a "content" and "created_date" attribute for each of
# the (up-to) 3 most-recent tweets for each user.
for row in query:
    print(row.username, row.content, row.created_date)

Tweetモデルの「視点」から同等のクエリを実装するには、代わりに次のように記述できます。

# subq is the same as the above example.
subq = (Tweet
        .select(Tweet.message, Tweet.created_date)
        .where(Tweet.user == User.id)
        .order_by(Tweet.created_date.desc())
        .limit(3))

query = (Tweet
         .select(User.username, subq.c.content, subq.c.created_date)
         .from_(User)
         .join(subq, JOIN.LEFT_LATERAL)
         .order_by(User.username, subq.c.created_date.desc()))

# Each row is a "tweet" instance with an additional "username" attribute.
# This will print the (up-to) 3 most-recent tweets from each user.
for tweet in query:
    print(tweet.username, tweet.content, tweet.created_date)

ウィンドウ関数

ウィンドウ関数は、peeweeでサポートされています。スケーラブルで効率的なパフォーマンスを提供します。

必要なSQLは次のとおりです。

SELECT subq.message, subq.username
FROM (
    SELECT
        t2.message,
        t3.username,
        RANK() OVER (
            PARTITION BY t2.user_id
            ORDER BY t2.created_date DESC
        ) AS rnk
    FROM tweet AS t2
    INNER JOIN user AS t3 ON (t2.user_id = t3.id)
) AS subq
WHERE (subq.rnk <= 3)

peeweeでこれを達成するには、ランク付けされたツイートをフィルタリングを実行する外部クエリにラップします。

TweetAlias = Tweet.alias()

# The subquery will select the relevant data from the Tweet and
# User table, as well as ranking the tweets by user from newest
# to oldest.
subquery = (TweetAlias
            .select(
                TweetAlias.message,
                User.username,
                fn.RANK().over(
                    partition_by=[TweetAlias.user],
                    order_by=[TweetAlias.created_date.desc()]).alias('rnk'))
            .join(User, on=(TweetAlias.user == User.id))
            .alias('subq'))

# Since we can't filter on the rank, we are wrapping it in a query
# and performing the filtering in the outer query.
query = (Tweet
         .select(subquery.c.message, subquery.c.username)
         .from_(subquery)
         .where(subquery.c.rnk <= 3))

その他の方法

Postgresを使用していない場合、残念ながら、理想的とは言えないパフォーマンスを示すオプションしかありません。一般的な方法のより完全な概要については、このブログ投稿をご覧ください。以下に、アプローチと対応するSQLをまとめます。

COUNT を使用すると、より新しいタイムスタンプを持つツイートが *N* 未満存在するすべてのツイートを取得できます。

TweetAlias = Tweet.alias()

# Create a correlated subquery that calculates the number of
# tweets with a higher (newer) timestamp than the tweet we're
# looking at in the outer query.
subquery = (TweetAlias
            .select(fn.COUNT(TweetAlias.id))
            .where(
                (TweetAlias.created_date >= Tweet.created_date) &
                (TweetAlias.user == Tweet.user)))

# Wrap the subquery and filter on the count.
query = (Tweet
         .select(Tweet, User)
         .join(User)
         .where(subquery <= 3))

自己結合を行い、HAVING 句でフィルタリングを実行することで、同様の結果を得ることができます。

TweetAlias = Tweet.alias()

# Use a self-join and join predicates to count the number of
# newer tweets.
query = (Tweet
         .select(Tweet.id, Tweet.message, Tweet.user, User.username)
         .join(User)
         .switch(Tweet)
         .join(TweetAlias, on=(
             (TweetAlias.user == Tweet.user) &
             (TweetAlias.created_date >= Tweet.created_date)))
         .group_by(Tweet.id, Tweet.content, Tweet.user, User.username)
         .having(fn.COUNT(Tweet.id) <= 3))

最後の例では、相関サブクエリでLIMIT 句を使用しています。

TweetAlias = Tweet.alias()

# The subquery here will calculate, for the user who created the
# tweet in the outer loop, the three newest tweets. The expression
# will evaluate to `True` if the outer-loop tweet is in the set of
# tweets represented by the inner query.
query = (Tweet
         .select(Tweet, User)
         .join(User)
         .where(Tweet.id << (
             TweetAlias
             .select(TweetAlias.id)
             .where(TweetAlias.user == Tweet.user)
             .order_by(TweetAlias.created_date.desc())
             .limit(3))))

SQLiteでカスタム関数を記述する

SQLiteは、Pythonで記述されたカスタム関数で非常に簡単に拡張できます。これらの関数は、SQLステートメントから呼び出すことができます。SqliteExtDatabasefunc() デコレータを使用すると、非常に簡単に独自の関数を定義できます。

ユーザーが入力したパスワードのハッシュバージョンを生成する関数の例を次に示します。これを使用して、ユーザーとパスワードを照合するlogin 機能を実装することもできます。

from hashlib import sha1
from random import random
from playhouse.sqlite_ext import SqliteExtDatabase

db = SqliteExtDatabase('my-blog.db')

def get_hexdigest(salt, raw_password):
    data = salt + raw_password
    return sha1(data.encode('utf8')).hexdigest()

@db.func()
def make_password(raw_password):
    salt = get_hexdigest(str(random()), str(random()))[:5]
    hsh = get_hexdigest(salt, raw_password)
    return '%s$%s' % (salt, hsh)

@db.func()
def check_password(raw_password, enc_password):
    salt, hsh = enc_password.split('$', 1)
    return hsh == get_hexdigest(salt, raw_password)

この関数を使用して新しいユーザーを追加し、ハッシュされたパスワードを保存する方法は次のとおりです。

query = User.insert(
    username='charlie',
    password=fn.make_password('testing')).execute()

データベースからユーザーを取得する場合、保存されているパスワードはハッシュされ、ソルトが追加されます。

>>> user = User.get(User.username == 'charlie')
>>> print(user.password)
b76fa$88be1adcde66a1ac16054bc17c8a297523170949

login タイプの機能を実装するには、次のようなコードを記述できます。

def login(username, password):
    try:
        return (User
                .select()
                .where(
                    (User.username == username) &
                    (fn.check_password(password, User.password) == True))
                .get())
    except User.DoesNotExist:
        # Incorrect username and/or password.
        return False

日付計算

Peeweeでサポートされている各データベースは、日付/時刻演算用に独自の関数とセマンティクスのセットを実装しています。

このセクションでは、SQLで動的な日付操作を行うためにPeeweeを利用する方法を示す短いシナリオとサンプルコードを提供します。

シナリオ: *X* 秒ごとに特定のタスクを実行する必要があり、タスク間隔とタスク自体の両方がデータベースで定義されています。特定の時間に実行する必要があるタスクを教えてくれるコードを記述する必要があります。

class Schedule(Model):
    interval = IntegerField()  # Run this schedule every X seconds.


class Task(Model):
    schedule = ForeignKeyField(Schedule, backref='tasks')
    command = TextField()  # Run this command.
    last_run = DateTimeField()  # When was this run last?

ロジックは本質的に次のようになります。

# e.g., if the task was last run at 12:00:05, and the associated interval
# is 10 seconds, the next occurrence should be 12:00:15. So we check
# whether the current time (now) is 12:00:15 or later.
now >= task.last_run + schedule.interval

そのため、次のコードを記述できます。

next_occurrence = something  # ??? how do we define this ???

# We can express the current time as a Python datetime value, or we could
# alternatively use the appropriate SQL function/name.
now = Value(datetime.datetime.now())  # Or SQL('current_timestamp'), e.g.

query = (Task
         .select(Task, Schedule)
         .join(Schedule)
         .where(now >= next_occurrence))

Postgresqlの場合、静的な1秒間隔を乗算してオフセットを動的に計算します。

second = SQL("INTERVAL '1 second'")
next_occurrence = Task.last_run + (Schedule.interval * second)

MySQLの場合、スケジュールの間隔を直接参照できます。

from peewee import NodeList  # Needed to construct sql entity.

interval = NodeList((SQL('INTERVAL'), Schedule.interval, SQL('SECOND')))
next_occurrence = fn.date_add(Task.last_run, interval)

SQLiteの場合、SQLiteには専用の日時タイプがないため、少し注意が必要です。そのため、SQLiteの場合、unixタイムスタンプに変換し、スケジュール秒を追加してから、比較可能な日時表現に戻します。

next_ts = fn.strftime('%s', Task.last_run) + Schedule.interval
next_occurrence = fn.datetime(next_ts, 'unixepoch')