クエリ
このセクションでは、リレーショナルデータベースで一般的に実行される基本的なCRUD操作について説明します。
Model.create()
(INSERT クエリを実行する場合)。Model.save()
およびModel.update()
(UPDATE クエリを実行する場合)。Model.delete_instance()
およびModel.delete()
(DELETE クエリを実行する場合)。Model.select()
(SELECT クエリを実行する場合)。
注
Postgresql Exercises Webサイトから取得したクエリの例も多数あります。例は、クエリ例ドキュメントに記載されています。
新しいレコードの作成
Model.create()
を使用して、新しいモデルインスタンスを作成できます。このメソッドは、キーワード引数を受け入れます。キーはモデルのフィールド名に対応します。新しいインスタンスが返され、テーブルに行が追加されます。
>>> User.create(username='Charlie')
<__main__.User object at 0x2529350>
これにより、新しい行がデータベースにINSERTされます。主キーは自動的に取得され、モデルインスタンスに保存されます。
または、モデルインスタンスをプログラムで構築してから、save()
を呼び出すこともできます。
>>> user = User(username='Charlie')
>>> user.save() # save() returns the number of rows modified.
1
>>> user.id
1
>>> huey = User()
>>> huey.username = 'Huey'
>>> huey.save()
1
>>> huey.id
2
モデルに外部キーがある場合、新しいレコードを作成するときに、モデルインスタンスを外部キーフィールドに直接割り当てることができます。
>>> tweet = Tweet.create(user=huey, message='Hello!')
関連オブジェクトの主キーの値を使用することもできます。
>>> tweet = Tweet.create(user=2, message='Hello again!')
データを挿入するだけで、モデルインスタンスを作成する必要がない場合は、Model.insert()
を使用できます。
>>> User.insert(username='Mickey').execute()
3
insertクエリの実行後、新しい行の主キーが返されます。
注
一括挿入操作を高速化する方法はいくつかあります。詳細については、一括挿入のレシピセクションを確認してください。
一括挿入
大量のデータをすばやくロードする方法はいくつかあります。単純な方法は、ループでModel.create()
を呼び出すことです。
data_source = [
{'field1': 'val1-1', 'field2': 'val1-2'},
{'field1': 'val2-1', 'field2': 'val2-2'},
# ...
]
for data_dict in data_source:
MyModel.create(**data_dict)
上記の方法は、いくつかの理由で低速です。
ループをトランザクションでラップしていない場合、
create()
の各呼び出しは独自のトランザクションで発生します。これは非常に遅くなります!途中でかなりの量のPythonロジックが入り込み、各
InsertQuery
を生成してSQLに解析する必要があります。データベースに解析するために送信するデータ(SQLの生のバイト数で)が大量にあります。
場合によっては、追加のクエリが実行される原因となるlast insert idを取得しています。
atomic()
を使用してトランザクションでラップするだけで、大幅な高速化が得られます。
# This is much faster.
with db.atomic():
for data_dict in data_source:
MyModel.create(**data_dict)
上記のコードは、ポイント2、3、および4の影響を受けています。insert_many()
を使用すると、さらに大きな向上が得られます。このメソッドは、タプルまたは辞書のリストを受け入れ、単一のクエリで複数の行を挿入します。
data_source = [
{'field1': 'val1-1', 'field2': 'val1-2'},
{'field1': 'val2-1', 'field2': 'val2-2'},
# ...
]
# Fastest way to INSERT multiple rows.
MyModel.insert_many(data_source).execute()
insert_many()
メソッドは、対応するフィールドも指定した場合、行タプルのリストも受け入れます。
# We can INSERT tuples as well...
data = [('val1-1', 'val1-2'),
('val2-1', 'val2-2'),
('val3-1', 'val3-2')]
# But we need to indicate which fields the values correspond to.
MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()
一括挿入をトランザクションでラップすることも良い習慣です。
# You can, of course, wrap this in a transaction as well:
with db.atomic():
MyModel.insert_many(data, fields=fields).execute()
注
SQLiteユーザーは、一括挿入を使用する際のいくつかの注意事項に注意する必要があります。具体的には、SQLite3のバージョンは、一括挿入APIを利用するには3.7.11.0以降である必要があります。さらに、デフォルトでは、SQLiteは、3.32.0 (2020-05-22) より前のSQLiteバージョンではSQLクエリのバインド変数の数を999
に、3.32.0以降のSQLiteバージョンでは32766に制限しています。
バッチでの行の挿入
データソース内の行数によっては、チャンクに分割する必要がある場合があります。特にSQLiteには、クエリごとの変数数の制限が999または32766あります(バッチサイズは、999 // 行の長さまたは32766 // 行の長さになります)。
ループを記述してデータをチャンクにバッチ処理できます(その場合は、トランザクションを使用することを強くお勧めします)。
# Insert rows 100 at a time.
with db.atomic():
for idx in range(0, len(data_source), 100):
MyModel.insert_many(data_source[idx:idx+100]).execute()
Peeweeには、汎用的なイテラブルを、バッチサイズのイテラブルのシリーズに効率的にチャンク化するために使用できるchunked()
ヘルパー関数が付属しています。
from peewee import chunked
# Insert rows 100 at a time.
with db.atomic():
for batch in chunked(data_source, 100):
MyModel.insert_many(batch).execute()
代替手段
Model.bulk_create()
メソッドは、Model.insert_many()
とほぼ同じように動作しますが、挿入する保存されていないモデルインスタンスのリストを受け入れ、オプションでバッチサイズパラメーターを受け入れます。bulk_create()
APIを使用するには
# Read list of usernames from a file, for example.
with open('user_list.txt') as fh:
# Create a list of unsaved User instances.
users = [User(username=line.strip()) for line in fh.readlines()]
# Wrap the operation in a transaction and batch INSERT the users
# 100 at a time.
with db.atomic():
User.bulk_create(users, batch_size=100)
注
Postgresql(RETURNING
句をサポート)を使用している場合、以前に保存されていなかったモデルインスタンスには、新しい主キー値が自動的に設定されます。
さらに、Peeweeは、モデルのリストで1つ以上の列を効率的に更新できるModel.bulk_update()
も提供します。たとえば、
# First, create 3 users with usernames u1, u2, u3.
u1, u2, u3 = [User.create(username='u%s' % i) for i in (1, 2, 3)]
# Now we'll modify the user instances.
u1.username = 'u1-x'
u2.username = 'u2-y'
u3.username = 'u3-z'
# Update all three users with a single UPDATE query.
User.bulk_update([u1, u2, u3], fields=[User.username])
これにより、次のSQLが実行されます。
UPDATE "users" SET "username" = CASE "users"."id"
WHEN 1 THEN "u1-x"
WHEN 2 THEN "u2-y"
WHEN 3 THEN "u3-z" END
WHERE "users"."id" IN (1, 2, 3);
注
オブジェクトのリストが大きい場合は、適切なbatch_sizeを指定し、bulk_update()
の呼び出しをDatabase.atomic()
でラップする必要があります。
with database.atomic():
User.bulk_update(list_of_users, fields=['username'], batch_size=50)
警告
Model.bulk_update()
は、多数のレコードを更新するための最も効率的な方法ではない可能性があります。この機能は、SQL CASE
ステートメントを使用して、更新されるすべての行の主キーと対応するフィールド値の「マッピング」を作成するように実装されています。
別の方法として、Database.batch_commit()
ヘルパーを使用して、batchサイズのトランザクション内でチャンク単位で行を処理できます。このメソッドは、新規作成された行の主キーを取得する必要がある場合に、Postgresql以外のデータベースに対する回避策も提供します。
# List of row data to insert.
row_data = [{'username': 'u1'}, {'username': 'u2'}, ...]
# Assume there are 789 items in row_data. The following code will result in
# 8 total transactions (7x100 rows + 1x89 rows).
for row in db.batch_commit(row_data, 100):
User.create(**row)
別のテーブルからのバルクローディング
バルクロードしたいデータが別のテーブルに格納されている場合は、ソースがSELECTクエリであるINSERTクエリを作成することもできます。Model.insert_from()
メソッドを使用してください。
res = (TweetArchive
.insert_from(
Tweet.select(Tweet.user, Tweet.message),
fields=[TweetArchive.user, TweetArchive.message])
.execute())
上記のクエリは、次のSQLと同等です
INSERT INTO "tweet_archive" ("user_id", "message")
SELECT "user_id", "message" FROM "tweet";
既存のレコードの更新
モデルインスタンスに主キーがあると、その後のsave()
の呼び出しは、別のINSERTではなくUPDATEになります。モデルの主キーは変更されません
>>> user.save() # save() returns the number of rows modified.
1
>>> user.id
1
>>> user.save()
>>> user.id
1
>>> huey.save()
1
>>> huey.id
2
複数のレコードを更新する場合は、UPDATEクエリを発行します。次の例では、Tweet
オブジェクトをすべて更新し、今日より前に作成された場合はpublishedとしてマークします。Model.update()
は、キーがモデルのフィールド名に対応するキーワード引数を受け入れます
>>> today = datetime.today()
>>> query = Tweet.update(is_published=True).where(Tweet.creation_date < today)
>>> query.execute() # Returns the number of rows that were updated.
4
詳細については、Model.update()
、Update
、およびModel.bulk_update()
のドキュメントを参照してください。
注
アトミックアップデート(列の値のインクリメントなど)の実行の詳細については、アトミックアップデートのレシピを確認してください。
アトミックアップデート
Peeweeでは、アトミックアップデートを実行できます。いくつかのカウンターを更新する必要があると仮定しましょう。単純なアプローチは、次のようなものを書くことです。
>>> for stat in Stat.select().where(Stat.url == request.url):
... stat.counter += 1
... stat.save()
これはしないでください! これは遅いだけでなく、複数のプロセスが同時にカウンターを更新している場合に、競合状態が発生しやすくなります。
代わりに、update()
を使用してカウンターをアトミックに更新できます
>>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()
これらの更新ステートメントは、必要なだけ複雑にすることができます。すべての従業員に、以前のボーナスに給与の10%を加えたボーナスを与えましょう
>>> query = Employee.update(bonus=(Employee.bonus + (Employee.salary * .1)))
>>> query.execute() # Give everyone a bonus!
サブクエリを使用して、列の値を更新することもできます。User
モデルに、ユーザーが作成したツイート数を格納する非正規化された列があり、この値を定期的に更新すると仮定します。そのようなクエリを記述する方法は次のとおりです
>>> subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id)
>>> update = User.update(num_tweets=subquery)
>>> update.execute()
Upsert
Peeweeは、さまざまなタイプのupsert機能をサポートしています。3.24.0より前のSQLiteとMySQLでは、Peeweeはreplace()
を提供しています。これにより、レコードを挿入したり、制約違反が発生した場合は、既存のレコードを置き換えたりできます。Sqlite 3.24+およびPostgresの場合、peeweeはON CONFLICT
クエリを完全にサポートしています。
replace()
とon_conflict_replace()
の使用例
class User(Model):
username = TextField(unique=True)
last_login = DateTimeField(null=True)
# Insert or update the user. The "last_login" value will be updated
# regardless of whether the user existed previously.
user_id = (User
.replace(username='the-user', last_login=datetime.now())
.execute())
# This query is equivalent:
user_id = (User
.insert(username='the-user', last_login=datetime.now())
.on_conflict_replace()
.execute())
注
replaceに加えて、SQLite、MySQL、およびPostgresqlは、挿入して潜在的な制約違反を無視したい場合に、ignoreアクション(on_conflict_ignore()
を参照)を提供します。
MySQLは、ON DUPLICATE KEY UPDATE句を介してupsertをサポートしています。例えば
class User(Model):
username = TextField(unique=True)
last_login = DateTimeField(null=True)
login_count = IntegerField()
# Insert a new user.
User.create(username='huey', login_count=0)
# Simulate the user logging in. The login count and timestamp will be
# either created or updated correctly.
now = datetime.now()
rowid = (User
.insert(username='huey', last_login=now, login_count=1)
.on_conflict(
preserve=[User.last_login], # Use the value we would have inserted.
update={User.login_count: User.login_count + 1})
.execute())
上記の例では、upsertクエリを何度でも安全に呼び出すことができます。ログインカウントはアトミックにインクリメントされ、最後のログイン列が更新され、重複した行は作成されません。
PostgresqlおよびSQLite(3.24.0以降)は、競合解決をトリガーする制約違反と、更新または保持する値をより詳細に制御できる別の構文を提供します。
on_conflict()
を使用して、Postgresqlスタイルのupsert(またはSQLite 3.24+)を実行する例
class User(Model):
username = TextField(unique=True)
last_login = DateTimeField(null=True)
login_count = IntegerField()
# Insert a new user.
User.create(username='huey', login_count=0)
# Simulate the user logging in. The login count and timestamp will be
# either created or updated correctly.
now = datetime.now()
rowid = (User
.insert(username='huey', last_login=now, login_count=1)
.on_conflict(
conflict_target=[User.username], # Which constraint?
preserve=[User.last_login], # Use the value we would have inserted.
update={User.login_count: User.login_count + 1})
.execute())
上記の例では、upsertクエリを何度でも安全に呼び出すことができます。ログインカウントはアトミックにインクリメントされ、最後のログイン列が更新され、重複した行は作成されません。
注
MySQLとPostgresql/SQLiteの主な違いは、PostgresqlとSQLiteでは、conflict_target
を指定する必要があることです。
EXCLUDED
名前空間を使用した、より高度な(作為的な)例を次に示します。EXCLUDED
ヘルパーを使用すると、競合するデータの値を参照できます。例として、一意のキー(文字列)を値(整数)にマッピングする単純なテーブルを想定します。
class KV(Model):
key = CharField(unique=True)
value = IntegerField()
# Create one row.
KV.create(key='k1', value=1)
# Demonstrate usage of EXCLUDED.
# Here we will attempt to insert a new value for a given key. If that
# key already exists, then we will update its value with the *sum* of its
# original value and the value we attempted to insert -- provided that
# the new value is larger than the original value.
query = (KV.insert(key='k1', value=10)
.on_conflict(conflict_target=[KV.key],
update={KV.value: KV.value + EXCLUDED.value},
where=(EXCLUDED.value > KV.value)))
# Executing the above query will result in the following data being
# present in the "kv" table:
# (key='k1', value=11)
query.execute()
# If we attempted to execute the query *again*, then nothing would be
# updated, as the new value (10) is now less than the value in the
# original row (11).
ON CONFLICT
を使用する際には、いくつかの重要な概念を理解する必要があります
conflict_target=
:UNIQUE制約を持つ列。ユーザーテーブルの場合、これはユーザーのメールアドレスになる場合があります。preserve=
:競合が発生した場合、このパラメーターは、更新したい新しいデータのどの値を示すために使用されます。update=
:競合が発生した場合、これは既存の行に適用するデータのマッピングです。EXCLUDED
:この「魔法の」名前空間を使用すると、制約が失敗しなかった場合に挿入されたであろう新しいデータを参照できます。
完全な例
class User(Model):
email = CharField(unique=True) # Unique identifier for user.
last_login = DateTimeField()
login_count = IntegerField(default=0)
ip_log = TextField(default='')
# Demonstrates the above 4 concepts.
def login(email, ip):
rowid = (User
.insert({User.email: email,
User.last_login: datetime.now(),
User.login_count: 1,
User.ip_log: ip})
.on_conflict(
# If the INSERT fails due to a constraint violation on the
# user email, then perform an UPDATE instead.
conflict_target=[User.email],
# Set the "last_login" to the value we would have inserted
# (our call to datetime.now()).
preserve=[User.last_login],
# Increment the user's login count and prepend the new IP
# to the user's ip history.
update={User.login_count: User.login_count + 1,
User.ip_log: fn.CONCAT(EXCLUDED.ip_log, ',', User.ip_log)})
.execute())
return rowid
# This will insert the initial row, returning the new row id (1).
print(login('test@example.com', '127.1'))
# Because test@example.com exists, this will trigger the UPSERT. The row id
# from above is returned again (1).
print(login('test@example.com', '127.2'))
u = User.get()
print(u.login_count, u.ip_log)
# Prints "2 127.2,127.1"
詳細については、Insert.on_conflict()
とOnConflict
を参照してください。
レコードの削除
単一のモデルインスタンスを削除するには、Model.delete_instance()
ショートカットを使用できます。delete_instance()
は、指定されたモデルインスタンスを削除し、オプションで依存オブジェクトを再帰的に削除できます(recursive=Trueを指定することにより)。
>>> user = User.get(User.id == 1)
>>> user.delete_instance() # Returns the number of rows deleted.
1
>>> User.get(User.id == 1)
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."id" = ?
PARAMS: [1]
任意の行のセットを削除するには、DELETEクエリを発行できます。次は、1年以上経過したTweet
オブジェクトをすべて削除します
>>> query = Tweet.delete().where(Tweet.creation_date < one_year_ago)
>>> query.execute() # Returns the number of rows deleted.
7
詳細については、ドキュメントを参照してください
DeleteQuery
単一レコードの選択
指定されたクエリに一致する単一のインスタンスを取得するには、Model.get()
メソッドを使用できます。主キー検索の場合は、ショートカットメソッドModel.get_by_id()
を使用することもできます。
このメソッドは、指定されたクエリでModel.select()
を呼び出すショートカットですが、結果セットを単一行に制限します。さらに、指定されたクエリに一致するモデルがない場合は、DoesNotExist
例外が発生します。
>>> User.get(User.id == 1)
<__main__.User object at 0x25294d0>
>>> User.get_by_id(1) # Same as above.
<__main__.User object at 0x252df10>
>>> User[1] # Also same as above.
<__main__.User object at 0x252dd10>
>>> User.get(User.id == 1).username
u'Charlie'
>>> User.get(User.username == 'Charlie')
<__main__.User object at 0x2529410>
>>> User.get(User.username == 'nobody')
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."username" = ?
PARAMS: ['nobody']
より高度な操作については、SelectBase.get()
を使用できます。次のクエリは、charlieという名前のユーザーの最新のツイートを取得します
>>> (Tweet
... .select()
... .join(User)
... .where(User.username == 'charlie')
... .order_by(Tweet.created_date.desc())
... .get())
<__main__.Tweet object at 0x2623410>
詳細については、ドキュメントを参照してください
Model.get_or_none()
- 一致する行が見つからない場合は、None
を返します。SelectBase.first()
- 結果セットの最初のレコードまたはNone
を返します。
作成または取得
Peeweeには、「get/create」タイプの操作を実行するための1つのヘルパーメソッドがあります。Model.get_or_create()
は、最初に一致する行の取得を試みます。失敗した場合は、新しい行が作成されます。
「作成または取得」タイプのロジックの場合、通常、重複オブジェクトの作成を防ぐために、unique制約または主キーに依存します。例として、サンプルUserモデルを使用して新しいユーザーアカウントを登録するとします。Userモデルにはユーザー名フィールドにunique制約があるため、データベースの整合性保証に依存して、ユーザー名の重複を防ぎます
try:
with db.atomic():
return User.create(username=username)
except peewee.IntegrityError:
# `username` is a unique column, so this username already exists,
# making it safe to call .get().
return User.get(User.username == username)
このタイプのロジックは、独自のModel
クラスのclassmethod
として簡単にカプセル化できます。
上記の例では、最初に作成を試み、次にデータベースに一意制約を適用させることで、取得にフォールバックしています。最初にレコードを取得しようとする場合は、get_or_create()
を使用できます。このメソッドは、同じ名前のDjango関数と同じように実装されています。Djangoスタイルのキーワード引数フィルターを使用して、WHERE
条件を指定できます。関数は、インスタンスとオブジェクトが作成されたかどうかを示すブール値を含む2タプルを返します。
get_or_create()
を使用してユーザーアカウントの作成を実装する方法を次に示します
user, created = User.get_or_create(username=username)
別のモデル Person
があり、Person オブジェクトを取得または作成したいとします。 Person
を取得する際に考慮する条件は、その人の名前と苗字だけですが、新しいレコードを作成する必要がある場合は、生年月日とお気に入りの色も指定します。
person, created = Person.get_or_create(
first_name=first_name,
last_name=last_name,
defaults={'dob': dob, 'favorite_color': 'green'})
get_or_create()
に渡されたキーワード引数はすべて、get()
ロジックの部分で使用されます。ただし、defaults
ディクショナリは除き、これは新規作成されたインスタンスに値を設定するために使用されます。
詳細については、Model.get_or_create()
のドキュメントを参照してください。
複数レコードの選択
Model.select()
を使用して、テーブルから行を取得できます。SELECT クエリを構築すると、データベースはクエリに対応する行を返します。Peewee では、これらの行を反復処理したり、インデックス操作やスライス操作を使用したりできます。
>>> query = User.select()
>>> [user.username for user in query]
['Charlie', 'Huey', 'Peewee']
>>> query[1]
<__main__.User at 0x7f83e80f5550>
>>> query[1].username
'Huey'
>>> query[:2]
[<__main__.User at 0x7f83e80f53a8>, <__main__.User at 0x7f83e80f5550>]
Select
クエリは、反復処理、インデックス処理、スライス処理を複数回実行できますが、クエリは一度しか実行されないため、賢いです。
次の例では、単に select()
を呼び出し、Select
のインスタンスである戻り値を反復処理します。これにより、User テーブルのすべての行が返されます。
>>> for user in User.select():
... print(user.username)
...
Charlie
Huey
Peewee
注
結果はキャッシュされるため、同じクエリの後続の反復処理ではデータベースにアクセスしません。この動作を無効にする(メモリ使用量を削減する)には、反復処理時に Select.iterator()
を呼び出します。
外部キーを含むモデルを反復処理する場合は、関連モデルの値へのアクセス方法に注意してください。誤って外部キーを解決したり、逆参照を反復処理したりすると、N+1 クエリ動作が発生する可能性があります。
Tweet.user
のように外部キーを作成すると、backref を使用して逆参照 (User.tweets
) を作成できます。逆参照は Select
インスタンスとして公開されます。
>>> tweet = Tweet.get()
>>> tweet.user # Accessing a foreign key returns the related model.
<tw.User at 0x7f3ceb017f50>
>>> user = User.get()
>>> user.tweets # Accessing a back-reference returns a query.
<peewee.ModelSelect at 0x7f73db3bafd0>
他の Select
と同様に、user.tweets
逆参照を反復処理できます。
>>> for tweet in user.tweets:
... print(tweet.message)
...
hello world
this is fun
look at this picture of my food
モデルインスタンスを返すだけでなく、Select
クエリはディクショナリ、タプル、および名前付きタプルを返すことができます。たとえば、ユースケースによっては、行をディクショナリとして処理する方が簡単な場合があります。
>>> query = User.select().dicts()
>>> for row in query:
... print(row)
{'id': 1, 'username': 'Charlie'}
{'id': 2, 'username': 'Huey'}
{'id': 3, 'username': 'Peewee'}
詳細については、namedtuples()
、tuples()
、dicts()
を参照してください。
大規模な結果セットの反復処理
デフォルトでは、peewee は Select
クエリを反復処理する際に返される行をキャッシュします。これは、追加のクエリを発生させずに複数の反復処理とインデックス処理およびスライス処理を可能にするための最適化です。ただし、多数の行を反復処理する場合は、このキャッシュが問題になる可能性があります。
クエリを反復処理する際に peewee が使用するメモリ量を削減するには、iterator()
メソッドを使用します。このメソッドを使用すると、返される各モデルをキャッシュすることなく反復処理できるため、大規模な結果セットを反復処理する際のメモリ使用量を大幅に削減できます。
# Let's assume we've got 10 million stat objects to dump to a csv file.
stats = Stat.select()
# Our imaginary serializer class
serializer = CSVSerializer()
# Loop over all the stats and serialize.
for stat in stats.iterator():
serializer.serialize_object(stat)
単純なクエリの場合、行をディクショナリ、名前付きタプル、またはタプルとして返すことで、さらに速度を向上させることができます。次のメソッドは、Select
クエリで使用して、結果の行タイプを変更できます。
メモリ消費量を削減するために、iterator()
メソッド呼び出しを追加することを忘れないでください。たとえば、上記のコードは次のようになる可能性があります。
# Let's assume we've got 10 million stat objects to dump to a csv file.
stats = Stat.select()
# Our imaginary serializer class
serializer = CSVSerializer()
# Loop over all the stats (rendered as tuples, without caching) and serialize.
for stat_tuple in stats.tuples().iterator():
serializer.serialize_tuple(stat_tuple)
複数のテーブルからの列を含む多数の行を反復処理する場合、peewee は返された各行のモデルグラフを再構築します。この操作は、複雑なグラフの場合、遅くなる可能性があります。たとえば、ツイートの作成者のユーザー名とアバターとともにツイートのリストを選択している場合、Peewee は各行に対して 2 つのオブジェクト (ツイートとユーザー) を作成する必要があります。上記の行タイプに加えて、4 番目のメソッド objects()
があります。これは、行をモデルインスタンスとして返しますが、モデルグラフを解決しようとはしません。
例:
query = (Tweet
.select(Tweet, User) # Select tweet and user data.
.join(User))
# Note that the user columns are stored in a separate User instance
# accessible at tweet.user:
for tweet in query:
print(tweet.user.username, tweet.content)
# Using ".objects()" will not create the tweet.user object and assigns all
# user attributes to the tweet instance:
for tweet in query.objects():
print(tweet.username, tweet.content)
パフォーマンスを最大化するために、クエリを実行し、基になるデータベースカーソルを使用して結果を反復処理できます。Database.execute()
はクエリオブジェクトを受け取り、クエリを実行し、DB-API 2.0 Cursor
オブジェクトを返します。カーソルは、生の行タプルを返します。
query = Tweet.select(Tweet.content, User.username).join(User)
cursor = database.execute(query)
for (content, username) in cursor:
print(username, '->', content)
レコードのフィルタリング
通常の python 演算子を使用して、特定のレコードをフィルタリングできます。Peewee は、さまざまな クエリ演算子 をサポートしています。
>>> user = User.get(User.username == 'Charlie')
>>> for tweet in Tweet.select().where(Tweet.user == user, Tweet.is_published == True):
... print(tweet.user.username, '->', tweet.message)
...
Charlie -> hello world
Charlie -> this is fun
>>> for tweet in Tweet.select().where(Tweet.created_date < datetime.datetime(2011, 1, 1)):
... print(tweet.message, tweet.created_date)
...
Really old tweet 2010-01-01 00:00:00
ジョインをまたいでフィルタリングすることもできます。
>>> for tweet in Tweet.select().join(User).where(User.username == 'Charlie'):
... print(tweet.message)
hello world
this is fun
look at this picture of my food
複雑なクエリを表現する場合は、括弧と Python のビット単位の *or* 演算子と *and* 演算子を使用します。
>>> Tweet.select().join(User).where(
... (User.username == 'Charlie') |
... (User.username == 'Peewee Herman'))
注
Peewee は論理演算子 (and
および or
) ではなく、**ビット単位**演算子 (&
および |
) を使用することに注意してください。この理由は、Python が論理演算の戻り値をブール値に強制するためです。これは、「IN」クエリを in
演算子ではなく .in_()
を使用して表現する必要がある理由でもあります。
可能なクエリの種類については、クエリ操作のテーブル を参照してください。
注
クエリの where 句には、次のようなさまざまな楽しいものを入れることができます。
フィールド式 (例:
User.username == 'Charlie'
)関数式 (例:
fn.Lower(fn.Substr(User.username, 1, 1)) == 'a'
)ある列と別の列の比較 (例:
Employee.salary < (Employee.tenure * 1000) + 40000
)
また、クエリをネストすることもできます。たとえば、ユーザー名が「a」で始まるユーザーによるツイートなどです。
# get users whose username starts with "a"
a_users = User.select().where(fn.Lower(fn.Substr(User.username, 1, 1)) == 'a')
# the ".in_()" method signifies an "IN" query
a_user_tweets = Tweet.select().where(Tweet.user.in_(a_users))
クエリのその他の例
注
さまざまなクエリの例については、クエリの例 ドキュメントを参照してください。このドキュメントでは、PostgreSQL Exercises Web サイトからクエリを実装する方法を示しています。
アクティブなユーザーを取得
User.select().where(User.active == True)
スタッフまたはスーパーユーザーのいずれかであるユーザーを取得
User.select().where(
(User.is_staff == True) | (User.is_superuser == True))
「charlie」という名前のユーザーによるツイートを取得
Tweet.select().join(User).where(User.username == 'charlie')
スタッフまたはスーパーユーザーによるツイートを取得 (FK 関係を想定)
Tweet.select().join(User).where(
(User.is_staff == True) | (User.is_superuser == True))
サブクエリを使用したスタッフまたはスーパーユーザーによるツイートの取得
staff_super = User.select(User.id).where(
(User.is_staff == True) | (User.is_superuser == True))
Tweet.select().where(Tweet.user.in_(staff_super))
レコードの並べ替え
行を順に返すには、order_by()
メソッドを使用します。
>>> for t in Tweet.select().order_by(Tweet.created_date):
... print(t.pub_date)
...
2010-01-01 00:00:00
2011-06-07 14:08:48
2011-06-07 14:12:57
>>> for t in Tweet.select().order_by(Tweet.created_date.desc()):
... print(t.pub_date)
...
2011-06-07 14:12:57
2011-06-07 14:08:48
2010-01-01 00:00:00
また、順序を示すために +
および -
の接頭辞演算子を使用することもできます。
# The following queries are equivalent:
Tweet.select().order_by(Tweet.created_date.desc())
Tweet.select().order_by(-Tweet.created_date) # Note the "-" prefix.
# Similarly you can use "+" to indicate ascending order, though ascending
# is the default when no ordering is otherwise specified.
User.select().order_by(+User.username)
ジョインをまたいで並べ替えることもできます。ツイートを、作成者のユーザー名で並べ替え、次に created_date で並べ替える場合を想定します。
query = (Tweet
.select()
.join(User)
.order_by(User.username, Tweet.created_date.desc()))
SELECT t1."id", t1."user_id", t1."message", t1."is_published", t1."created_date"
FROM "tweet" AS t1
INNER JOIN "user" AS t2
ON t1."user_id" = t2."id"
ORDER BY t2."username", t1."created_date" DESC
計算された値で並べ替える場合は、必要な SQL 式を含めるか、値に割り当てられたエイリアスを参照できます。次に、これらのメソッドを示す 2 つの例を示します。
# Let's start with our base query. We want to get all usernames and the number of
# tweets they've made. We wish to sort this list from users with most tweets to
# users with fewest tweets.
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username))
select
句で使用されるのと同じ COUNT 式を使用して並べ替えることができます。次の例では、ツイート ID の COUNT()
で降順に並べ替えています。
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(fn.COUNT(Tweet.id).desc()))
または、select
句で計算された値に割り当てられたエイリアスを参照できます。この方法には、少し読みやすいという利点があります。名前付きエイリアスを直接参照しているのではなく、SQL
ヘルパーを使用してラップしていることに注意してください。
query = (User
.select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(SQL('num_tweets').desc()))
または、「peewee」の方法で実行するには、
ntweets = fn.COUNT(Tweet.id)
query = (User
.select(User.username, ntweets.alias('num_tweets'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(ntweets.desc())
ランダムレコードの取得
場合によっては、データベースからランダムなレコードを取得する必要がある場合があります。これを行うには、(データベースに応じて) *random* または *rand* 関数で並べ替えます。
Postgresql と Sqlite は *Random* 関数を使用します。
# Pick 5 lucky winners:
LotteryNumber.select().order_by(fn.Random()).limit(5)
MySQL は *Rand* を使用します。
# Pick 5 lucky winners:
LotteryNumber.select().order_by(fn.Rand()).limit(5)
レコードのページネーション
paginate()
メソッドを使用すると、レコードのページを簡単に取得できます。paginate()
は、page_number
と items_per_page
の 2 つのパラメーターを受け取ります。
注意
ページ番号は 1 から始まるため、結果の最初のページはページ 1 になります。
>>> for tweet in Tweet.select().order_by(Tweet.id).paginate(2, 10):
... print(tweet.message)
...
tweet 10
tweet 11
tweet 12
tweet 13
tweet 14
tweet 15
tweet 16
tweet 17
tweet 18
tweet 19
レコード数のカウント
任意の SELECT クエリで、行数をカウントできます。
>>> Tweet.select().count()
100
>>> Tweet.select().where(Tweet.id > 50).count()
50
Peewee は、カウントを実行する外部クエリでクエリをラップします。これにより、次のような SQL が生成されます。
SELECT COUNT(1) FROM ( ... your query ... );
レコードの集計
ユーザーがいて、各ユーザーのツイート数とともにユーザーのリストを取得したいとします。
query = (User
.select(User, fn.Count(Tweet.id).alias('count'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User))
結果のクエリは、すべての通常の属性に加えて、各ユーザーのツイート数を含む追加の属性countを持つUserオブジェクトを返します。ツイートがないユーザーを含めるために、左外部結合を使用します。
タグ付けアプリケーションがあり、特定の数の関連オブジェクトを持つタグを見つけたいとします。この例では、多対多構成でいくつかの異なるモデルを使用します。
class Photo(Model):
image = CharField()
class Tag(Model):
name = CharField()
class PhotoTag(Model):
photo = ForeignKeyField(Photo)
tag = ForeignKeyField(Tag)
ここで、少なくとも 5 つの写真が関連付けられているタグを見つけたいとします。
query = (Tag
.select()
.join(PhotoTag)
.join(Photo)
.group_by(Tag)
.having(fn.Count(Photo.id) > 5))
このクエリは、次の SQL と同等です。
SELECT t1."id", t1."name"
FROM "tag" AS t1
INNER JOIN "phototag" AS t2 ON t1."id" = t2."tag_id"
INNER JOIN "photo" AS t3 ON t2."photo_id" = t3."id"
GROUP BY t1."id", t1."name"
HAVING Count(t3."id") > 5
関連付けられたカウントを取得して、タグに保存したいとします。
query = (Tag
.select(Tag, fn.Count(Photo.id).alias('count'))
.join(PhotoTag)
.join(Photo)
.group_by(Tag)
.having(fn.Count(Photo.id) > 5))
スカラー値の取得
Query.scalar()
を呼び出すことで、スカラー値を取得できます。例えば
>>> PageView.select(fn.Count(fn.Distinct(PageView.url))).scalar()
100
as_tuple=True
を渡すことで、複数のスカラー値を取得できます。
>>> Employee.select(
... fn.Min(Employee.salary), fn.Max(Employee.salary)
... ).scalar(as_tuple=True)
(30000, 50000)
ウィンドウ関数
Window
関数とは、SELECT
クエリの一部として処理されるデータのスライディング ウィンドウで動作する集計関数を指します。ウィンドウ関数を使用すると、次のようなことが可能になります。
結果セットのサブセットに対する集計を実行します。
累積合計を計算します。
結果をランク付けします。
行の値を先行(または後続)の行の値と比較します。
peewee には、SQL ウィンドウ関数のサポートが付属しており、Function.over()
を呼び出し、パーティション分割または順序付けパラメーターを渡すことで作成できます。
次の例では、次のモデルとサンプルデータを使用します。
class Sample(Model):
counter = IntegerField()
value = FloatField()
data = [(1, 10),
(1, 20),
(2, 1),
(2, 3),
(3, 100)]
Sample.insert_many(data, fields=[Sample.counter, Sample.value]).execute()
サンプルテーブルには、次のものが含まれています。
id |
カウンター |
値 |
---|---|---|
1 |
1 |
10.0 |
2 |
1 |
20.0 |
3 |
2 |
1.0 |
4 |
2 |
3.0 |
5 |
3 |
100.0 |
順序付きウィンドウ
value
フィールドの累積合計を計算してみましょう。それを「累積」合計にするためには、順序付けする必要があるため、サンプルの id
フィールドに関して順序付けます。
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(order_by=[Sample.id]).alias('total'))
for sample in query:
print(sample.counter, sample.value, sample.total)
# 1 10. 10.
# 1 20. 30.
# 2 1. 31.
# 2 3. 34.
# 3 100 134.
別の例として、id
で順序付けした場合の、現在の値と前の値の差を計算します。
difference = Sample.value - fn.LAG(Sample.value, 1).over(order_by=[Sample.id])
query = Sample.select(
Sample.counter,
Sample.value,
difference.alias('diff'))
for sample in query:
print(sample.counter, sample.value, sample.diff)
# 1 10. NULL
# 1 20. 10. -- (20 - 10)
# 2 1. -19. -- (1 - 20)
# 2 3. 2. -- (3 - 1)
# 3 100 97. -- (100 - 3)
パーティション分割されたウィンドウ
個別の「カウンター」値ごとの value
の平均を計算してみましょう。counter
フィールドには 3 つの可能な値(1、2、3)があることに注意してください。counter
フィールドに応じてパーティション分割されたウィンドウに対して value
列の AVG()
を計算することで、これを行うことができます。
query = Sample.select(
Sample.counter,
Sample.value,
fn.AVG(Sample.value).over(partition_by=[Sample.counter]).alias('cavg'))
for sample in query:
print(sample.counter, sample.value, sample.cavg)
# 1 10. 15.
# 1 20. 15.
# 2 1. 2.
# 2 3. 2.
# 3 100 100.
order_by
パラメーターと partition_by
パラメーターの両方を指定することで、パーティション内の順序付けを使用できます。例として、個別の counter
グループ内の値でサンプルをランク付けしましょう。
query = Sample.select(
Sample.counter,
Sample.value,
fn.RANK().over(
order_by=[Sample.value],
partition_by=[Sample.counter]).alias('rank'))
for sample in query:
print(sample.counter, sample.value, sample.rank)
# 1 10. 1
# 1 20. 2
# 2 1. 1
# 2 3. 2
# 3 100 1
境界付きウィンドウ
デフォルトでは、ウィンドウ関数は、ウィンドウの境界なしの先行開始と、終了として現在の行を使用して評価されます。集計関数が動作するウィンドウの境界を変更するには、Function.over()
の呼び出しで start
および/または end
を指定します。さらに、Peewee には、適切な境界参照を生成するための Window
オブジェクトのヘルパーメソッドが付属しています。
Window.CURRENT_ROW
- 現在の行を参照する属性。Window.preceding()
- 先行する行数を指定します。または、すべて先行する行を示すために数値を省略します。Window.following()
- 後続の行数を指定します。または、すべて後続する行を示すために数値を省略します。
境界がどのように機能するかを調べるために、id
に関して順序付けられた value
列の累積合計を計算します。ただし、現在の行とその前の 2 行の累積合計のみを調べます。
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(
order_by=[Sample.id],
start=Window.preceding(2),
end=Window.CURRENT_ROW).alias('rsum'))
for sample in query:
print(sample.counter, sample.value, sample.rsum)
# 1 10. 10.
# 1 20. 30. -- (20 + 10)
# 2 1. 31. -- (1 + 20 + 10)
# 2 3. 24. -- (3 + 1 + 20)
# 3 100 104. -- (100 + 3 + 1)
注
end=Window.CURRENT
を指定する必要は技術的にはありませんでした。なぜなら、それがデフォルトだからです。デモンストレーションのために例に示しました。
別の例を見てみましょう。この例では、すべての値の合計が、id
で順序付けられたサンプルの値によって減少する、累積合計の「反対」を計算します。これを実現するために、現在の行から最後の行までの合計を計算します。
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(
order_by=[Sample.id],
start=Window.CURRENT_ROW,
end=Window.following()).alias('rsum'))
# 1 10. 134. -- (10 + 20 + 1 + 3 + 100)
# 1 20. 124. -- (20 + 1 + 3 + 100)
# 2 1. 104. -- (1 + 3 + 100)
# 2 3. 103. -- (3 + 100)
# 3 100 100. -- (100)
フィルターされた集計
集計関数は、フィルター関数 (Postgres および Sqlite 3.25+) もサポートする場合があります。これは、FILTER (WHERE...)
句に変換されます。フィルター式は、Function.filter()
メソッドを使用して集計関数に追加されます。
例として、id
に関して value
フィールドの累積合計を計算しますが、counter=2
のサンプルは除外します。
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).filter(Sample.counter != 2).over(
order_by=[Sample.id]).alias('csum'))
for sample in query:
print(sample.counter, sample.value, sample.csum)
# 1 10. 10.
# 1 20. 30.
# 2 1. 30.
# 2 3. 30.
# 3 100 130.
ウィンドウ定義の再利用
複数の集計に同じウィンドウ定義を使用する場合は、Window
オブジェクトを作成できます。Window
オブジェクトは、Function.over()
と同じパラメーターを受け取り、個々のパラメーターの代わりに over()
メソッドに渡すことができます。
ここでは、サンプルの id
に関して順序付けられた単一のウィンドウを宣言し、そのウィンドウ定義を使用して複数のウィンドウ関数を呼び出します。
win = Window(order_by=[Sample.id])
query = Sample.select(
Sample.counter,
Sample.value,
fn.LEAD(Sample.value).over(win),
fn.LAG(Sample.value).over(win),
fn.SUM(Sample.value).over(win)
).window(win) # Include our window definition in query.
for row in query.tuples():
print(row)
# counter value lead() lag() sum()
# 1 10. 20. NULL 10.
# 1 20. 1. 10. 30.
# 2 1. 3. 20. 31.
# 2 3. 100. 1. 34.
# 3 100. NULL 3. 134.
複数のウィンドウ定義
前の例では、Window
定義を宣言し、それを複数の異なる集計に再利用する方法を見てきました。クエリに必要な数のウィンドウ定義を含めることができますが、各ウィンドウに一意のエイリアスがあることを確認する必要があります。
w1 = Window(order_by=[Sample.id]).alias('w1')
w2 = Window(partition_by=[Sample.counter]).alias('w2')
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(w1).alias('rsum'), # Running total.
fn.AVG(Sample.value).over(w2).alias('cavg') # Avg per category.
).window(w1, w2) # Include our window definitions.
for sample in query:
print(sample.counter, sample.value, sample.rsum, sample.cavg)
# counter value rsum cavg
# 1 10. 10. 15.
# 1 20. 30. 15.
# 2 1. 31. 2.
# 2 3. 34. 2.
# 3 100 134. 100.
同様に、類似した定義を共有する複数のウィンドウ定義がある場合は、以前に定義されたウィンドウ定義を拡張することが可能です。たとえば、ここではデータセットをカウンター値でパーティション分割するため、カウンターに関して集計を行います。次に、このパーティション分割を拡張し、順序付け句を追加する 2 番目のウィンドウを定義します。
w1 = Window(partition_by=[Sample.counter]).alias('w1')
# By extending w1, this window definition will also be partitioned
# by "counter".
w2 = Window(extends=w1, order_by=[Sample.value.desc()]).alias('w2')
query = (Sample
.select(Sample.counter, Sample.value,
fn.SUM(Sample.value).over(w1).alias('group_sum'),
fn.RANK().over(w2).alias('revrank'))
.window(w1, w2)
.order_by(Sample.id))
for sample in query:
print(sample.counter, sample.value, sample.group_sum, sample.revrank)
# counter value group_sum revrank
# 1 10. 30. 2
# 1 20. 30. 1
# 2 1. 4. 2
# 2 3. 4. 1
# 3 100. 100. 1
フレームタイプ: RANGE 対 ROWS 対 GROUPS
フレームタイプに応じて、データベースは順序付けられたグループを異なる方法で処理します。違いを視覚化するために、2 つの追加の Sample
行を作成しましょう。
>>> Sample.create(counter=1, value=20.)
<Sample 6>
>>> Sample.create(counter=2, value=1.)
<Sample 7>
テーブルには、次のものが含まれています。
id |
カウンター |
値 |
---|---|---|
1 |
1 |
10.0 |
2 |
1 |
20.0 |
3 |
2 |
1.0 |
4 |
2 |
3.0 |
5 |
3 |
100.0 |
6 |
1 |
20.0 |
7 |
2 |
1.0 |
counter
フィールドと value
フィールドに関して順序付けられたサンプルの「累積合計」を計算することで、違いを調べてみましょう。フレームタイプを指定するには、次のいずれかを使用できます。
論理的な重複がある場合、RANGE
の動作は予期しない結果につながる可能性があります。
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(
order_by=[Sample.counter, Sample.value],
frame_type=Window.RANGE).alias('rsum'))
for sample in query.order_by(Sample.counter, Sample.value):
print(sample.counter, sample.value, sample.rsum)
# counter value rsum
# 1 10. 10.
# 1 20. 50.
# 1 20. 50.
# 2 1. 52.
# 2 1. 52.
# 2 3. 55.
# 3 100 155.
新しい行を含めると、重複する category
および value
値を持つ行がいくつかできました。RANGE
フレームタイプでは、これらの重複は別々に評価されるのではなく、まとめて評価されます。
より期待される結果は、フレームタイプとして ROWS
を使用することで実現できます。
query = Sample.select(
Sample.counter,
Sample.value,
fn.SUM(Sample.value).over(
order_by=[Sample.counter, Sample.value],
frame_type=Window.ROWS).alias('rsum'))
for sample in query.order_by(Sample.counter, Sample.value):
print(sample.counter, sample.value, sample.rsum)
# counter value rsum
# 1 10. 10.
# 1 20. 30.
# 1 20. 50.
# 2 1. 51.
# 2 1. 52.
# 2 3. 55.
# 3 100 155.
Peewee は、使用するフレームタイプを決定するためにこれらのルールを使用します。
ユーザーが
frame_type
を指定した場合、そのフレームタイプが使用されます。もし
start
またはend
の境界が指定された場合、PeeweeはデフォルトでROWS
を使用します。ユーザーがフレームタイプまたは開始/終了境界を指定しなかった場合、Peeweeはデータベースのデフォルトである
RANGE
を使用します。
Window.GROUPS
フレームタイプは、順序付けの項に基づいて、行のグループの観点からウィンドウ範囲の指定を考慮します。GROUPS
を使用することで、フレームを行の明確なグループをカバーするように定義できます。例を見てみましょう。
query = (Sample
.select(Sample.counter, Sample.value,
fn.SUM(Sample.value).over(
order_by=[Sample.counter, Sample.value],
frame_type=Window.GROUPS,
start=Window.preceding(1)).alias('gsum'))
.order_by(Sample.counter, Sample.value))
for sample in query:
print(sample.counter, sample.value, sample.gsum)
# counter value gsum
# 1 10 10
# 1 20 50
# 1 20 50 (10) + (20+0)
# 2 1 42
# 2 1 42 (20+20) + (1+1)
# 2 3 5 (1+1) + 3
# 3 100 103 (3) + 100
お分かりいただけるように、ウィンドウはその順序付け項である(counter, value)
でグループ化されています。前のグループと現在のグループの間を拡張するウィンドウを見ています。
注
ウィンドウ関数のAPIの詳細については、以下を参照してください。
ウィンドウ関数に関する一般的な情報については、Postgresのウィンドウ関数チュートリアルをお読みください。
さらに、PostgresのドキュメントとSQLiteのドキュメントには、多くの有益な情報が含まれています。
行タプル/辞書/名前付きタプルの取得
Model
が提供するすべてのAPIを必要とせずに、モデルインスタンスを作成するオーバーヘッドを必要とせず、単純に行データを反復処理したい場合があります。これを行うには、以下を使用します。
objects()
– 行タプルで呼び出される任意のコンストラクター関数を受け入れます。
stats = (Stat
.select(Stat.url, fn.Count(Stat.url))
.group_by(Stat.url)
.tuples())
# iterate over a list of 2-tuples containing the url and count
for stat_url, stat_count in stats:
print(stat_url, stat_count)
同様に、dicts()
を使用してカーソルから行を辞書として返すことができます。
stats = (Stat
.select(Stat.url, fn.Count(Stat.url).alias('ct'))
.group_by(Stat.url)
.dicts())
# iterate over a list of 2-tuples containing the url and count
for stat in stats:
print(stat['url'], stat['ct'])
RETURNING句
PostgresqlDatabase
は、UPDATE
、INSERT
、およびDELETE
クエリでRETURNING
句をサポートしています。RETURNING
句を指定すると、クエリによってアクセスされた行を反復処理できます。
デフォルトでは、さまざまなクエリの実行時の戻り値は次のとおりです。
INSERT
- 新しく挿入された行の自動インクリメント主キー値。自動インクリメント主キーを使用しない場合、Postgresは新しい行の主キーを返しますが、SQLiteとMySQLは返しません。UPDATE
- 変更された行数DELETE
- 削除された行数
returning句を使用すると、クエリの実行時の戻り値は反復可能なカーソルオブジェクトになります。
Postgresqlでは、RETURNING
句を使用して、クエリによって挿入または変更された行からデータを返すことができます。
たとえば、登録期限が切れたすべてのユーザーアカウントを非アクティブ化するUpdate
があるとします。それらを非アクティブ化した後、各ユーザーにアカウントが非アクティブ化されたことを知らせるメールを送信したいとします。SELECT
とUPDATE
の2つのクエリを作成するのではなく、RETURNING
句を使用して1つのUPDATE
クエリでこれを行うことができます。
query = (User
.update(is_active=False)
.where(User.registration_expired == True)
.returning(User))
# Send an email to every user that was deactivated.
for deactivate_user in query.execute():
send_deactivation_email(deactivated_user.email)
RETURNING
句は、Insert
とDelete
でも利用可能です。INSERT
で使用すると、新しく作成された行が返されます。DELETE
で使用すると、削除された行が返されます。
RETURNING
句の唯一の制限は、クエリのFROM
句にリストされているテーブルの列のみで構成できることです。特定のテーブルのすべての列を選択するには、単にModel
クラスを渡すことができます。
別の例として、ユーザーを追加し、作成日をサーバーで生成された現在のタイムスタンプに設定しましょう。新しいユーザーのID、メール、作成タイムスタンプを1つのクエリで作成して取得します。
query = (User
.insert(email='foo@bar.com', created=fn.now())
.returning(User)) # Shorthand for all columns on User.
# When using RETURNING, execute() returns a cursor.
cursor = query.execute()
# Get the user object we just inserted and log the data:
user = cursor[0]
logger.info('Created user %s (id=%s) at %s', user.email, user.id, user.created)
デフォルトでは、カーソルはModel
インスタンスを返しますが、異なる行タイプを指定できます。
data = [{'name': 'charlie'}, {'name': 'huey'}, {'name': 'mickey'}]
query = (User
.insert_many(data)
.returning(User.id, User.username)
.dicts())
for new_user in query.execute():
print('Added user "%s", id=%s' % (new_user['username'], new_user['id']))
共通テーブル式
Peeweeは、すべてのタイプのクエリに共通テーブル式(CTE)を含めることをサポートしています。CTEは、以下に役立つ場合があります。
共通のサブクエリを因数分解します。
CTEの結果セットで派生した列でグループ化またはフィルタリングします。
再帰クエリを作成します。
CTEとして使用するSelect
クエリを宣言するには、cte()
メソッドを使用します。これは、クエリをCTE
オブジェクトでラップします。クエリの一部としてCTE
を含める必要があることを示すには、Query.with_cte()
メソッドを使用し、CTEオブジェクトのリストを渡します。
簡単な例
たとえば、キーと浮動小数点値で構成されるデータポイントがあるとします。モデルを定義し、いくつかのテストデータを入力しましょう。
class Sample(Model):
key = TextField()
value = FloatField()
data = (
('a', (1.25, 1.5, 1.75)),
('b', (2.1, 2.3, 2.5, 2.7, 2.9)),
('c', (3.5, 3.5)))
# Populate data.
for key, values in data:
Sample.insert_many([(key, value) for value in values],
fields=[Sample.key, Sample.value]).execute()
CTEを使用して、キーごとに、そのキーの平均を上回る値を計算しましょう。
# First we'll declare the query that will be used as a CTE. This query
# simply determines the average value for each key.
cte = (Sample
.select(Sample.key, fn.AVG(Sample.value).alias('avg_value'))
.group_by(Sample.key)
.cte('key_avgs', columns=('key', 'avg_value')))
# Now we'll query the sample table, using our CTE to find rows whose value
# exceeds the average for the given key. We'll calculate how far above the
# average the given sample's value is, as well.
query = (Sample
.select(Sample.key, Sample.value)
.join(cte, on=(Sample.key == cte.c.key))
.where(Sample.value > cte.c.avg_value)
.order_by(Sample.value)
.with_cte(cte))
クエリによって返されたサンプルを反復処理して、どのサンプルが与えられたグループの平均を上回る値を持っていたかを確認できます。
>>> for sample in query:
... print(sample.key, sample.value)
# 'a', 1.75
# 'b', 2.7
# 'b', 2.9
複雑な例
より完全な例として、トップセールス地域のみの製品ごとの売上合計を見つけるために複数のCTEを使用する次のクエリについて考えてみましょう。モデルは次のようになります。
class Order(Model):
region = TextField()
amount = FloatField()
product = TextField()
quantity = IntegerField()
SQLでクエリを記述する方法を次に示します。この例は、postgresqlのドキュメントにあります。
WITH regional_sales AS (
SELECT region, SUM(amount) AS total_sales
FROM orders
GROUP BY region
), top_regions AS (
SELECT region
FROM regional_sales
WHERE total_sales > (SELECT SUM(total_sales) / 10 FROM regional_sales)
)
SELECT region,
product,
SUM(quantity) AS product_units,
SUM(amount) AS product_sales
FROM orders
WHERE region IN (SELECT region FROM top_regions)
GROUP BY region, product;
Peeweeでは、次のように記述します。
reg_sales = (Order
.select(Order.region,
fn.SUM(Order.amount).alias('total_sales'))
.group_by(Order.region)
.cte('regional_sales'))
top_regions = (reg_sales
.select(reg_sales.c.region)
.where(reg_sales.c.total_sales > (
reg_sales.select(fn.SUM(reg_sales.c.total_sales) / 10)))
.cte('top_regions'))
query = (Order
.select(Order.region,
Order.product,
fn.SUM(Order.quantity).alias('product_units'),
fn.SUM(Order.amount).alias('product_sales'))
.where(Order.region.in_(top_regions.select(top_regions.c.region)))
.group_by(Order.region, Order.product)
.with_cte(reg_sales, top_regions))
再帰CTE
Peeweeは再帰CTEをサポートしています。たとえば、親リンクの外部キーで表されるツリーデータ構造がある場合、再帰CTEは役立ちます。たとえば、オンライン書店のカテゴリの階層があるとします。すべてのカテゴリとその絶対的な深さ、およびルートからカテゴリまでのパスを示すテーブルを生成したいと考えています。
各カテゴリが直近の親カテゴリへの外部キーを持つ、次のモデル定義を想定します。
class Category(Model):
name = TextField()
parent = ForeignKeyField('self', backref='children', null=True)
すべてのカテゴリをその深さと親と共に一覧表示するには、再帰CTEを使用できます。
# Define the base case of our recursive CTE. This will be categories that
# have a null parent foreign-key.
Base = Category.alias()
level = Value(1).alias('level')
path = Base.name.alias('path')
base_case = (Base
.select(Base.id, Base.name, Base.parent, level, path)
.where(Base.parent.is_null())
.cte('base', recursive=True))
# Define the recursive terms.
RTerm = Category.alias()
rlevel = (base_case.c.level + 1).alias('level')
rpath = base_case.c.path.concat('->').concat(RTerm.name).alias('path')
recursive = (RTerm
.select(RTerm.id, RTerm.name, RTerm.parent, rlevel, rpath)
.join(base_case, on=(RTerm.parent == base_case.c.id)))
# The recursive CTE is created by taking the base case and UNION ALL with
# the recursive term.
cte = base_case.union_all(recursive)
# We will now query from the CTE to get the categories, their levels, and
# their paths.
query = (cte
.select_from(cte.c.name, cte.c.level, cte.c.path)
.order_by(cte.c.path))
# We can now iterate over a list of all categories and print their names,
# absolute levels, and path from root -> category.
for category in query:
print(category.name, category.level, category.path)
# Example output:
# root, 1, root
# p1, 2, root->p1
# c1-1, 3, root->p1->c1-1
# c1-2, 3, root->p1->c1-2
# p2, 2, root->p2
# c2-1, 3, root->p2->c2-1
データ変更CTE
Peeweeはデータ変更CTEをサポートしています。
1つのクエリを使用して、あるテーブルからアーカイブテーブルにデータを移動するためのデータ変更CTEの使用例
class Event(Model):
name = CharField()
timestamp = DateTimeField()
class Archive(Model):
name = CharField()
timestamp = DateTimeField()
# Move rows older than 24 hours from the Event table to the Archive.
cte = (Event
.delete()
.where(Event.timestamp < (datetime.now() - timedelta(days=1)))
.returning(Event)
.cte('moved_rows'))
# Create a simple SELECT to get the resulting rows from the CTE.
src = Select((cte,), (cte.c.id, cte.c.name, cte.c.timestamp))
# Insert into the archive table whatever data was returned by the DELETE.
res = (Archive
.insert_from(src, (Archive.id, Archive.name, Archive.timestamp))
.with_cte(cte)
.execute())
上記は、おおまかに次のSQLに対応します。
WITH "moved_rows" AS (
DELETE FROM "event" WHERE ("timestamp" < XXXX-XX-XXTXX:XX:XX)
RETURNING "id", "name", "timestamp")
INSERT INTO "archive" ("id", "name", "timestamp")
SELECT "moved_rows"."id", "moved_rows"."name", "moved_rows"."timestamp"
FROM "moved_rows";
追加の例については、models.py
およびsql.py
のテストを参照してください。
外部キーと結合
このセクションは独自のドキュメントに移動されました:リレーションシップと結合。