Playhouse、Peewee の拡張機能
Peeweeには、playhouse
名前空間にまとめられた多数の拡張モジュールが付属しています。ばかげた名前にもかかわらず、特にSQLite拡張機能やPostgresql拡張機能などのベンダー固有のデータベース機能を公開する、非常に便利な拡張機能がいくつかあります。
以下に、playhouse
を構成するさまざまなモジュールの大まかに整理されたリストを示します。
データベースドライバ/ベンダー固有のデータベース機能
SQLite拡張機能(独自のページ)
高レベルの機能
データベース管理とフレームワークの統合
Sqlite拡張機能
Sqlite拡張機能は、独自のページに移動しました。
SqliteQ
playhouse.sqliteq
モジュールは、SQLiteデータベースへの同時書き込みをシリアル化するSqliteExtDatabase
のサブクラスを提供します。SqliteQueueDatabase
は、複数の**スレッド**からSQLiteデータベースへのシンプルな**読み取りと書き込み**アクセスが必要な場合に、通常のSqliteDatabase
のドロップイン代替として使用できます。
SQLiteは、特定の時間にデータベースに書き込むことができる接続を1つしか許可しません。その結果、データベースへの書き込みが必要なマルチスレッドアプリケーション(たとえば、Webサーバーなど)がある場合、書き込みを試みている1つ以上のスレッドがロックを取得できないときに、エラーが発生する場合があります。
SqliteQueueDatabase
は、すべての書き込みクエリを単一の長寿命接続を介して送信することで、処理を簡略化するように設計されています。利点は、競合やタイムアウトなしに、複数のスレッドがデータベースに書き込んでいるように見えることです。ただし、欠点は、複数のクエリを包含する書き込みトランザクションを発行できないことです。すべての書き込みは、本質的に自動コミットモードで実行されます。
注意
モジュール名は、すべての書き込みクエリがスレッドセーフなキューに入れられるという事実に基づいています。単一のワーカースレッドがキューをリッスンし、そこに送信されたすべてのクエリを実行します。
トランザクション
すべてのクエリはシリアル化され、単一のワーカースレッドによって実行されるため、別のスレッドからのトランザクションSQLが順序どおりに実行されない可能性があります。以下の例では、スレッド「B」によって開始されたトランザクションがスレッド「A」によってロールバックされます(悪い結果になります!)。
スレッドA:UPDATE transplants SET organ='liver'、…;
スレッドB:BEGIN TRANSACTION;
スレッドB:UPDATE life_support_system SET timer += 60 …;
スレッドA:ROLLBACK; – ああ…。
別々のトランザクションからのクエリがインターリーブされる可能性があるため、transaction()
メソッドとatomic()
メソッドはSqliteQueueDatabase
では無効になっています。
別のスレッドから一時的にデータベースに書き込みたい場合は、pause()
メソッドとunpause()
メソッドを使用できます。これらのメソッドは、ライタースレッドが現在のワークロードを終了するまで呼び出し側をブロックします。次に、ライターが切断し、unpause
が呼び出されるまで、呼び出し側が引き継ぎます。
stop()
、start()
、およびis_stopped()
メソッドを使用して、ライタースレッドを制御することもできます。
注意
SQLiteが同時接続をどのように処理するかについての詳細については、SQLiteの分離に関するドキュメントを参照してください。
コード例
データベースインスタンスの作成には、特別な処理は必要ありません。SqliteQueueDatabase
は、注意する必要がある特別なパラメータを受け入れます。 geventを使用している場合は、データベースをインスタンス化するときにuse_gevent=True
を指定する必要があります。これにより、Peeweeはキューイング、スレッド作成、ロックの処理に適切なオブジェクトを使用することを認識します。
from playhouse.sqliteq import SqliteQueueDatabase
db = SqliteQueueDatabase(
'my_app.db',
use_gevent=False, # Use the standard library "threading" module.
autostart=False, # The worker thread now must be started manually.
queue_max_size=64, # Max. # of pending writes that can accumulate.
results_timeout=5.0) # Max. time to wait for query to be executed.
上記の例のようにautostart=False
の場合、実際に書き込みクエリの実行を行うワーカースレッドを起動するには、start()
を呼び出す必要があります。
@app.before_first_request
def _start_worker_threads():
db.start()
SELECTクエリを実行したり、一般的にデータベースにアクセスしたりする場合は、他のデータベースインスタンスと同様に、connect()
とclose()
を呼び出す必要があります。
アプリケーションを終了する準備が整ったら、stop()
メソッドを使用してワーカースレッドをシャットダウンします。バックログがある場合、このメソッドは保留中のすべての作業が完了するまでブロックされます(ただし、新しい作業は許可されません)。
import atexit
@atexit.register
def _stop_worker_threads():
db.stop()
最後に、is_stopped()
メソッドを使用して、データベースライターが起動して実行されているかどうかを判断できます。
Sqliteユーザー定義関数
playhouseモジュールのsqlite_udf
には、便利なユーザー定義関数、集計関数、テーブル値関数が多数含まれています。これらの関数はコレクションにグループ化されており、ユーザー定義の拡張機能を個別に、コレクションごとに、またはすべて登録できます。
スカラー関数は、いくつかのパラメーターを受け取り、単一の値を返す関数です。たとえば、文字列を大文字に変換したり、MD5の16進ダイジェストを計算したりします。
集計関数は、複数のデータの行に対して動作し、単一の結果を生成するスカラー関数のようなものです。たとえば、整数のリストの合計を計算したり、特定の列の最小値を見つけたりします。
テーブル値関数は、複数のデータの行を返すことができる単なる関数です。たとえば、特定の文字列内のすべての一致を返す正規表現検索関数や、2つの日付を受け入れて、その間のすべての日を生成する関数などがあります。
注意
テーブル値関数を使用するには、playhouse._sqlite_ext
C拡張機能をビルドする必要があります。
ユーザー定義関数の登録
db = SqliteDatabase('my_app.db')
# Register *all* functions.
register_all(db)
# Alternatively, you can register individual groups. This will just
# register the DATE and MATH groups of functions.
register_groups(db, 'DATE', 'MATH')
# If you only wish to register, say, the aggregate functions for a
# particular group or groups, you can:
register_aggregate_groups(db, 'DATE')
# If you only wish to register a single function, then you can:
from playhouse.sqlite_udf import gzip, gunzip
db.register_function(gzip, 'gzip')
db.register_function(gunzip, 'gunzip')
ライブラリ関数(「hostname」)の使用
# Assume we have a model, Link, that contains lots of arbitrary URLs.
# We want to discover the most common hosts that have been linked.
query = (Link
.select(fn.hostname(Link.url).alias('host'), fn.COUNT(Link.id))
.group_by(fn.hostname(Link.url))
.order_by(fn.COUNT(Link.id).desc())
.tuples())
# Print the hostname along with number of links associated with it.
for host, count in query:
print('%s: %s' % (host, count))
コレクション名でリストされた関数
スカラー関数は(f)
、集計関数は(a)
、テーブル値関数は(t)
で示されています。
CONTROL_FLOW
- if_then_else(cond, truthy[, falsey=None])
単純な三項演算子のようなもので、
cond
パラメーターの真偽値に応じて、truthy
値またはfalsey
値のいずれかが返されます。
DATE
- strip_tz(date_str)
- パラメーター
date_str – 文字列としてエンコードされたdatetime。
- 戻り値
タイムゾーン情報が削除されたdatetime。
時間は一切調整されず、タイムゾーンが単に削除されるだけです。
- humandelta(nseconds[, glue=', '])
- パラメーター
nseconds (int) – timedeltaの合計秒数。
glue (str) – 値を結合するフラグメント。
- 戻り値
timedeltaを読みやすく説明します。
例:86471 -> 「1 day, 1 minute, 11 seconds」
- mintdiff(datetime_value)
- パラメーター
datetime_value – 日時。
- 戻り値
リスト内の任意の2つの値間の最小差。
任意の2つのdatetime間の最小差を計算する集計関数。
- avgtdiff(datetime_value)
- パラメーター
datetime_value – 日時。
- 戻り値
リスト内の値間の平均差。
リスト内の連続する値間の平均差を計算する集計関数。
- duration(datetime_value)
- パラメーター
datetime_value – 日時。
- 戻り値
リスト内の最小値から最大値までの期間(秒単位)。
リスト内の最小値から最大値までの期間を計算し、秒単位で返す集計関数。
- date_series(start, stop[, step_seconds=86400])
- パラメーター
start (datetime) – 開始日時
stop (datetime) – 終了日時
step_seconds (int) – ステップを構成する秒数。
startからstopまで、一度に
step_seconds
ずつ反復処理することで発生した日付/時刻の値で構成される行を返すテーブル値関数。さらに、startに時刻コンポーネントがなく、step_secondsが1日(86400秒)以上の場合、返される値は日付になります。逆に、startに日付コンポーネントがない場合、値は時間として返されます。それ以外の場合、値はdatetimeとして返されます。
例
SELECT * FROM date_series('2017-01-28', '2017-02-02'); value ----- 2017-01-28 2017-01-29 2017-01-30 2017-01-31 2017-02-01 2017-02-02
FILE
- file_ext(filename)
- パラメーター
filename (str) – 拡張子を抽出するファイル名。
- 戻り値
先頭の「.」を含むファイル拡張子を返します。
- file_read(filename)
- パラメーター
filename (str) – 読み取るファイル名。
- 戻り値
ファイルの内容。
HELPER
- gzip(data[, compression=9])
- パラメーター
data (bytes) – 圧縮するデータ。
compression (int) – 圧縮レベル(9が最大)。
- 戻り値
圧縮されたバイナリデータ。
- gunzip(data)
- パラメーター
data (bytes) – 圧縮されたデータ。
- 戻り値
圧縮されていないバイナリデータ。
- hostname(url)
- パラメーター
url (str) – ホスト名を抽出するURL。
- 戻り値
URLのホスト名部分
- toggle(key)
- パラメーター
key – トグルするキー。
キーをTrue/Falseの状態間で切り替えます。例
>>> toggle('my-key') True >>> toggle('my-key') False >>> toggle('my-key') True
- setting(key[, value=None])
- パラメーター
key – 設定/取得するキー。
value – 設定する値。
- 戻り値
キーに関連付けられた値。
設定をメモリに保存/取得し、アプリケーションの存続期間中に永続化します。現在の値を取得するには、キーのみを指定します。新しい値を設定するには、キーと新しい値を指定して呼び出します。
MATH
- randomrange(start[, stop=None[, step=None]])
- パラメーター
start (int) – 範囲の開始(包括的)
end (int) – 範囲の終了(包括的でない)
step (int) – 値を返す間隔。
[start, end)
の間のランダムな整数を返します。
- gauss_distribution(mean, sigma)
- パラメーター
mean (float) – 平均値
sigma (float) – 標準偏差
- sqrt(n)
n
の平方根を計算します。
- tonumber(s)
- パラメーター
s (str) – 数値に変換する文字列。
- 戻り値
整数、浮動小数点数、または失敗した場合はNULL。
- mode(val)
- パラメーター
val – リスト内の数値。
- 戻り値
最頻値、または最も一般的な数値。
値の最頻値を計算する集計関数。
- minrange(val)
- パラメーター
val – 値
- 戻り値
2つの値間の最小差。
シーケンス内の2つの数値間の最小距離を計算する集計関数。
- avgrange(val)
- パラメーター
val – 値
- 戻り値
値間の平均差。
シーケンス内の2つの連続する数値間の平均距離を計算する集計関数。
- range(val)
- パラメーター
val – 値
- 戻り値
シーケンス内の最小値から最大値までの範囲。
観測された値の範囲を返す集計関数。
- median(val)
- パラメーター
val – 値
- 戻り値
シーケンスの中央値、または中央の値。
シーケンスの中央の値を計算する集計関数。
注意
_sqlite_udf
拡張機能をコンパイルした場合にのみ使用できます。
STRING
- substr_count(haystack, needle)
haystack
内にneedle
が出現する回数を返します。
- strip_chars(haystack, chars)
haystack
の先頭と末尾から、chars
に含まれる任意の文字を取り除きます。
- damerau_levenshtein_dist(s1, s2)
レーベンシュタインアルゴリズムのダメラウ変種を使用して、s1からs2への編集距離を計算します。
注意
_sqlite_udf
拡張機能をコンパイルした場合にのみ使用できます。
- levenshtein_dist(s1, s2)
レーベンシュタインアルゴリズムを使用して、s1からs2への編集距離を計算します。
注意
_sqlite_udf
拡張機能をコンパイルした場合にのみ使用できます。
- str_dist(s1, s2)
標準ライブラリのSequenceMatcherのアルゴリズムを使用して、s1からs2への編集距離を計算します。
注意
_sqlite_udf
拡張機能をコンパイルした場合にのみ使用できます。
- regex_search(regex, search_string)
- パラメーター
regex (str) – 正規表現
search_string (str) – 正規表現のインスタンスを検索する文字列。
指定された
regex
に一致するサブストリングを文字列内で検索するテーブル値関数です。見つかった一致ごとに1行を返します。例
SELECT * FROM regex_search('\w+', 'extract words, ignore! symbols'); value ----- extract words ignore symbols
apsw, 高度なsqliteドライバ
apsw_ext
モジュールには、apsw sqliteドライバで使用するのに適したデータベースクラスが含まれています。
APSWプロジェクトページ: https://github.com/rogerbinns/apsw
APSWは、SQLiteのCインターフェースの上に薄いラッパーを提供する非常に優れたライブラリであり、SQLiteのすべての高度な機能を使用できるようにします。
ドキュメントから引用した、APSWを使用するほんの一部の理由を以下に示します。
APSWは、仮想テーブル、仮想ファイルシステム、BLOB I/O、バックアップ、ファイル制御など、SQLiteのすべての機能を提供します。
接続は、追加のロックなしでスレッド間で共有できます。
トランザクションはコードによって明示的に管理されます。
APSWはネストされたトランザクションを処理できます。
Unicodeは正しく処理されます。
APSWは高速です。
apswとpysqliteの違いの詳細については、apswドキュメントを確認してください。
APSWDatabaseの使用方法
from apsw_ext import *
db = APSWDatabase(':memory:')
class BaseModel(Model):
class Meta:
database = db
class SomeModel(BaseModel):
col1 = CharField()
col2 = DateTimeField()
apsw_ext APIノート
APSWDatabase
は SqliteExtDatabase
を拡張し、その高度な機能を継承します。
- class APSWDatabase(database, **connect_kwargs)
- パラメーター
database (string) – sqliteデータベースのファイル名
connect_kwargs – 接続を開くときにapswに渡されるキーワード引数
- register_module(mod_name, mod_inst)
モジュールをグローバルに登録する方法を提供します。詳細については、仮想テーブルに関するドキュメントを参照してください。
- パラメーター
mod_name (string) – モジュールに使用する名前
mod_inst (object) – 仮想テーブルインターフェースを実装するオブジェクト
- unregister_module(mod_name)
モジュールを登録解除します。
- パラメーター
mod_name (string) – モジュールに使用する名前
注意
ストレージ用にデータ型を適切に調整するため、apsw_ext
モジュールで定義された Field
サブクラスを使用してください。
たとえば、peewee.DateTimeField
を使用する代わりに、必ず playhouse.apsw_ext.DateTimeField
をインポートして使用してください。
Sqlcipherバックエンド
注意
この拡張機能のコードは短いですが、まだ適切にピアレビューされておらず、脆弱性が導入されている可能性があります。
また、このコードは sqlcipher3 (pythonバインディング)と sqlcipher に依存しており、そちらのコードにも脆弱性がある可能性がありますが、これらは広く使用されている暗号化モジュールであるため、そこでは「短いゼロデイ」が予想できます。
sqlcipher_ext APIノート
- class SqlCipherDatabase(database, passphrase, **kwargs)
SqliteDatabase
のサブクラスで、データベースを暗号化して保存します。標準のsqlite3
バックエンドの代わりに、sqlcipher3 を使用します。これは、sqlcipher のpythonラッパーであり、これはsqlite3
の暗号化ラッパーであるため、APIはSqliteDatabase
の同一です。ただし、オブジェクトの構築パラメーターは異なります。- パラメーター
database – 暗号化されたデータベースを開く[または作成する]ファイルへのパス。
passphrase – データベースの暗号化パスフレーズ:少なくとも8文字以上にする必要がありますが、実装ではより適切なパスフレーズの強度基準を適用することを強くお勧めします。
database
ファイルが存在しない場合は、passhprase
から派生したキーで暗号化されて作成されます。既存のデータベースを開こうとすると、
passhprase
は作成時に使用したものと同一である必要があります。パスフレーズが正しくないと、データベースに最初にアクセスしようとしたときにエラーが発生します。
- rekey(passphrase)
- パラメーター
passphrase (str) – データベースの新しいパスフレーズ。
データベースのパスフレーズを変更します。
注意
SQLCipherは、多数の拡張PRAGMAを使用して構成できます。 PRAGMAとその説明のリストは、SQLCipherドキュメントにあります。
たとえば、キー導出のためのPBKDF2反復回数(SQLCipher 3.xでは64K、SQLCipher 4.xではデフォルトで256K)を指定するには、
# Use 1,000,000 iterations.
db = SqlCipherDatabase('my_app.db', pragmas={'kdf_iter': 1000000})
16KBの暗号ページサイズと10,000ページのキャッシュサイズを使用するには、
db = SqlCipherDatabase('my_app.db', passphrase='secret!!!', pragmas={
'cipher_page_size': 1024 * 16,
'cache_size': 10000}) # 10,000 16KB pages, or 160MB.
パスフレーズをユーザーに求める例
db = SqlCipherDatabase(None)
class BaseModel(Model):
"""Parent for all app's models"""
class Meta:
# We won't have a valid db until user enters passhrase.
database = db
# Derive our model subclasses
class Person(BaseModel):
name = TextField(primary_key=True)
right_passphrase = False
while not right_passphrase:
db.init(
'testsqlcipher.db',
passphrase=get_passphrase_from_user())
try: # Actually execute a query against the db to test passphrase.
db.get_tables()
except DatabaseError as exc:
# This error indicates the password was wrong.
if exc.args[0] == 'file is encrypted or is not a database':
tell_user_the_passphrase_was_wrong()
db.init(None) # Reset the db.
else:
raise exc
else:
# The password was correct.
right_passphrase = True
以下も参照してください:もう少し詳しい例。
Postgresql拡張機能
postgresql拡張モジュールは、現在、多数の「postgresのみ」の機能を提供します。
jsonサポート、Postgres 9.4用のjsonbを含む。
配列を格納するための
ArrayField
フィールドタイプ。キー/値ペアを格納するための
HStoreField
フィールドタイプ。timedelta
オブジェクトを格納するためのIntervalField
フィールドタイプ。JSONデータを格納するための
JSONField
フィールドタイプ。jsonb
JSONデータタイプ用のBinaryJSONField
フィールドタイプ。全文検索データを格納するための
TSVectorField
フィールドタイプ。DateTimeTZField
フィールド型。タイムゾーン対応の日付時刻フィールドです。
将来的には、PostgreSQLのより多くの機能へのサポートを追加したいと考えています。追加してほしい特定の機能があれば、Github issueを開いてください。
警告
以下に説明する機能を使用するには、PostgresqlDatabase
の代わりに、拡張機能である PostgresqlExtDatabase
クラスを使用する必要があります。
以下のコードは、次のデータベースとベースモデルを使用していることを前提としています。
from playhouse.postgres_ext import *
ext_db = PostgresqlExtDatabase('peewee_test', user='postgres')
class BaseExtModel(Model):
class Meta:
database = ext_db
JSONサポート
peeweeは、PostgresのネイティブJSONデータ型を、JSONField
の形で基本的なサポートをしています。バージョン2.4.7からは、peeweeはPostgres 9.4のバイナリJSON jsonb
型も、BinaryJSONField
を介してサポートしています。
警告
Postgresは9.2(9.3で完全サポート)以降、ネイティブにJSONデータ型をサポートしています。この機能を使用するには、psycopg2 バージョン2.5以上で、正しいPostgresのバージョンを使用する必要があります。
パフォーマンスとクエリに多くの利点がある BinaryJSONField
を使用するには、Postgres 9.4以降が必要です。
注意
JSONField を使用するには、データベースが PostgresqlExtDatabase
のインスタンスであることを確認する必要があります。
JSONフィールドを持つモデルを宣言する方法の例を以下に示します。
import json
import urllib2
from playhouse.postgres_ext import *
db = PostgresqlExtDatabase('my_database')
class APIResponse(Model):
url = CharField()
response = JSONField()
class Meta:
database = db
@classmethod
def request(cls, url):
fh = urllib2.urlopen(url)
return cls.create(url=url, response=json.loads(fh.read()))
APIResponse.create_table()
# Store a JSON response.
offense = APIResponse.request('http://crime-api.com/api/offense/')
booking = APIResponse.request('http://crime-api.com/api/booking/')
# Query a JSON data structure using a nested key lookup:
offense_responses = APIResponse.select().where(
APIResponse.response['meta']['model'] == 'offense')
# Retrieve a sub-key for each APIResponse. By calling .as_json(), the
# data at the sub-key will be returned as Python objects (dicts, lists,
# etc) instead of serialized JSON.
q = (APIResponse
.select(
APIResponse.data['booking']['person'].as_json().alias('person'))
.where(APIResponse.data['meta']['model'] == 'booking'))
for result in q:
print(result.person['name'], result.person['dob'])
BinaryJSONField
は、通常の JSONField
と同じように動作し、同じ操作をサポートしますが、**包含**をテストするための追加の操作をいくつか提供します。バイナリJSONフィールドを使用すると、JSONデータが他の部分的なJSON構造を含むかどうか(contains()
, contains_any()
, contains_all()
)、またはより大きなJSONドキュメントのサブセットであるかどうか(contained_by()
)をテストできます。
詳細な例については、以下の JSONField
および BinaryJSONField
のAPIドキュメントを参照してください。
hstoreサポート
Postgresql hstore は、組み込みのキー/バリューストアです。hstoreを使用すると、構造化されたリレーショナルデータと一緒に、任意のキー/バリューペアをデータベースに保存できます。
hstore
を使用するには、PostgresqlExtDatabase
をインスタンス化するときに追加のパラメータを指定する必要があります。
# Specify "register_hstore=True":
db = PostgresqlExtDatabase('my_db', register_hstore=True)
現在、postgres_ext
モジュールは、次の操作をサポートしています。
任意の辞書を保存および取得する
キーまたは部分的な辞書でフィルタリングする
既存の辞書に1つ以上のキーを更新/追加する
既存の辞書から1つ以上のキーを削除する
キー、値、またはキーと値をzipで選択する
キー/値のスライスを取得する
キーの存在をテストする
キーが非NULLの値を持っていることをテストする
hstoreの使用
まず、カスタムデータベースクラスと、playhouse.postgres_ext
からのhstore関数をインポートする必要があります(上記のコードスニペットを参照)。次に、モデルに HStoreField
を追加するだけです。
class House(BaseExtModel):
address = CharField()
features = HStoreField()
House
インスタンスに、任意のキー/バリューペアを保存できるようになりました。
>>> h = House.create(
... address='123 Main St',
... features={'garage': '2 cars', 'bath': '2 bath'})
...
>>> h_from_db = House.get(House.id == h.id)
>>> h_from_db.features
{'bath': '2 bath', 'garage': '2 cars'}
個々のキー、複数のキー、または部分的な辞書でフィルタリングできます。
>>> query = House.select()
>>> garage = query.where(House.features.contains('garage'))
>>> garage_and_bath = query.where(House.features.contains(['garage', 'bath']))
>>> twocar = query.where(House.features.contains({'garage': '2 cars'}))
家の原子的な更新を実行したいとします。
>>> new_features = House.features.update({'bath': '2.5 bath', 'sqft': '1100'})
>>> query = House.update(features=new_features)
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'bath': '2.5 bath', 'garage': '2 cars', 'sqft': '1100'}
または、代わりに原子的な削除を実行できます。
>>> query = House.update(features=House.features.delete('bath'))
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'garage': '2 cars', 'sqft': '1100'}
複数のキーを同時に削除できます。
>>> query = House.update(features=House.features.delete('garage', 'sqft'))
キーのみ、値のみを選択したり、2つをzipしたりできます。
>>> for h in House.select(House.address, House.features.keys().alias('keys')):
... print(h.address, h.keys)
123 Main St [u'bath', u'garage']
>>> for h in House.select(House.address, House.features.values().alias('vals')):
... print(h.address, h.vals)
123 Main St [u'2 bath', u'2 cars']
>>> for h in House.select(House.address, House.features.items().alias('mtx')):
... print(h.address, h.mtx)
123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]
データのスライス、たとえば、すべてのガレージデータを取得できます。
>>> query = House.select(House.address, House.features.slice('garage').alias('garage_data'))
>>> for house in query:
... print(house.address, house.garage_data)
123 Main St {'garage': '2 cars'}
キーの存在を確認し、それに応じて行をフィルタリングできます。
>>> has_garage = House.features.exists('garage')
>>> for house in House.select(House.address, has_garage.alias('has_garage')):
... print(house.address, house.has_garage)
123 Main St True
>>> for house in House.select().where(House.features.exists('garage')):
... print(house.address, house.features['garage']) # <-- just houses w/garage data
123 Main St 2 cars
Intervalサポート
Postgresは、INTERVAL
データ型(ドキュメント)を通じて期間をサポートします。
- class IntervalField([null=False[, ...]])
Pythonの
datetime.timedelta
インスタンスを格納できるフィールドクラス。例
from datetime import timedelta from playhouse.postgres_ext import * db = PostgresqlExtDatabase('my_db') class Event(Model): location = CharField() duration = IntervalField() start_time = DateTimeField() class Meta: database = db @classmethod def get_long_meetings(cls): return cls.select().where(cls.duration > timedelta(hours=1))
サーバーサイドカーソル
psycopg2がクエリを実行すると、通常、すべての結果がフェッチされ、バックエンドからクライアントに返されます。これにより、大きなクエリを作成するときに、アプリケーションが大量のメモリを使用する可能性があります。サーバーサイドカーソルを使用すると、結果は少しずつ(デフォルトでは2000レコード)返されます。決定版のリファレンスについては、psycopg2ドキュメントを参照してください。
注意
サーバーサイド(または名前付き)カーソルを使用するには、PostgresqlExtDatabase
を使用する必要があります。
サーバーサイドカーソルを使用してクエリを実行するには、ServerSide()
ヘルパーを使用して、selectクエリをラップするだけです。
large_query = PageView.select() # Build query normally.
# Iterate over large query inside a transaction.
for page_view in ServerSide(large_query):
# do some interesting analysis here.
pass
# Server-side resources are released.
すべての SELECT
クエリでサーバーサイドカーソルを自動的に使用する場合は、PostgresqlExtDatabase
を作成するときにこれを指定できます。
from postgres_ext import PostgresqlExtDatabase
ss_db = PostgresqlExtDatabase('my_db', server_side_cursors=True)
注意
サーバーサイドカーソルはトランザクションが存在する限りしか存在しないため、peeweeは SELECT
クエリを実行した後に自動的に commit()
を呼び出しません。反復処理が完了した後に commit
しない場合、接続が閉じられる(または後でトランザクションがコミットされる)まで、サーバーサイドのリソースは解放されません。さらに、peeweeはデフォルトでカーソルによって返された行をキャッシュするため、大きなクエリを反復処理するときは常に .iterator()
を呼び出す必要があります。
ServerSide()
ヘルパーを使用している場合、トランザクションと iterator()
の呼び出しは透過的に処理されます。
全文検索
Postgresqlは、特別なデータ型(tsvector
および tsquery
)を使用して、高度な全文検索を提供します。ドキュメントは tsvector
型に格納または変換する必要があり、検索クエリは tsquery
に変換する必要があります。
簡単なケースでは、Match()
関数を使用するだけで、適切な変換が自動的に実行され、スキーマの変更は不要です。
def blog_search(search_term):
return Blog.select().where(
(Blog.status == Blog.STATUS_PUBLISHED) &
Match(Blog.content, search_term))
Match()
関数は、左側のオペランドを tsvector
に、右側のオペランドを tsquery
に自動的に変換します。パフォーマンスを向上させるには、検索する列に GIN
インデックスを作成することをお勧めします。
CREATE INDEX blog_full_text_search ON blog USING gin(to_tsvector(content));
または、TSVectorField
を使用して、tsvector
データを格納するための専用の列を維持できます。
class Blog(Model):
content = TextField()
search_content = TSVectorField()
注意
TSVectorField
は、GINインデックスを使用して自動的に作成されます。
search_content
フィールドを挿入または更新するときは、入ってくるテキストデータを明示的に tsvector
に変換する必要があります。
content = 'Excellent blog post about peewee ORM.'
blog_entry = Blog.create(
content=content,
search_content=fn.to_tsvector(content))
全文検索を実行するには、TSVectorField.match()
を使用します。
terms = 'python & (sqlite | postgres)'
results = Blog.select().where(Blog.search_content.match(terms))
詳細については、Postgresの全文検索ドキュメントを参照してください。
postgres_ext APIノート
- class PostgresqlExtDatabase(database[, server_side_cursors=False[, register_hstore=False[, ...]]])
PostgresqlDatabase
と同一ですが、以下のサポートのために必要となります。- パラメーター
database (str) – 接続先のデータベース名。
server_side_cursors (bool) –
SELECT
クエリがサーバーサイドカーソルを利用するかどうか。register_hstore (bool) – HStore拡張機能を接続に登録するかどうか。
HStore拡張機能を使用したい場合は、
register_hstore=True
を指定する必要があります。server_side_cursors
を使用する場合は、クエリをServerSide()
でラップするようにしてください。
- ServerSide(select_query)
- パラメーター
select_query –
SelectQuery
インスタンス。- Rtype ジェネレーター
指定された select クエリをトランザクションでラップし、行インスタンスのキャッシュを避けるために
iterator()
メソッドを呼び出します。サーバーサイドのリソースを解放するためには、ジェネレーターを使い切る(すべての行を反復処理する)必要があります。使い方
large_query = PageView.select() for page_view in ServerSide(large_query): # Do something interesting. pass # At this point server side resources are released.
- class ArrayField([field_class=IntegerField[, field_kwargs=None[, dimensions=1[, convert_values=False]]]])
- パラメーター
field_class –
Field
のサブクラス。例:IntegerField
。field_kwargs (dict) –
field_class
を初期化するための引数。dimensions (int) – 配列の次元数。
convert_values (bool) – 配列データに
field_class
の値変換を適用するかどうか。
提供された field_class の配列を格納できるフィールド。
注意
デフォルトでは、ArrayField は GIN インデックスを使用します。これを無効にするには、
index=False
を指定してフィールドを初期化します。リスト(またはリストのリスト)を格納および取得できます。
class BlogPost(BaseModel): content = TextField() tags = ArrayField(CharField) post = BlogPost(content='awesome', tags=['foo', 'bar', 'baz'])
さらに、データベース内の値やスライスをクエリするために
__getitem__
API を使用できます。# Get the first tag on a given blog post. first_tag = (BlogPost .select(BlogPost.tags[0].alias('first_tag')) .where(BlogPost.id == 1) .dicts() .get()) # first_tag = {'first_tag': 'foo'}
値のスライスを取得
# Get the first two tags. two_tags = (BlogPost .select(BlogPost.tags[:2].alias('two')) .dicts() .get()) # two_tags = {'two': ['foo', 'bar']}
- contains(*items)
- パラメーター
items – 指定された配列フィールドに存在する必要がある1つ以上の項目。
# Get all blog posts that are tagged with both "python" and "django". Blog.select().where(Blog.tags.contains('python', 'django'))
- contains_any(*items)
- パラメーター
items – 指定された配列フィールド内で検索する1つ以上の項目。
contains()
と同様ですが、配列に指定された項目のいずれかが含まれる行に一致します。# Get all blog posts that are tagged with "flask" and/or "django". Blog.select().where(Blog.tags.contains_any('flask', 'django'))
- class DateTimeTZField(*args, **kwargs)
DateTimeField
のタイムゾーン対応サブクラス。
- class HStoreField(*args, **kwargs)
任意のキー/値ペアを格納および取得するためのフィールド。使い方の詳細については、hstore サポートを参照してください。
注意
HStoreField
を使用するには、hstore 拡張機能が接続に登録されていることを確認する必要があります。これを実現するには、PostgresqlExtDatabase
をregister_hstore=True
でインスタンス化します。注意
デフォルトでは、
HStoreField
は GiST インデックスを使用します。これを無効にするには、index=False
を指定してフィールドを初期化します。- keys()
指定された行のキーを返します。
>>> for h in House.select(House.address, House.features.keys().alias('keys')): ... print(h.address, h.keys) 123 Main St [u'bath', u'garage']
- values()
指定された行の値を返します。
>>> for h in House.select(House.address, House.features.values().alias('vals')): ... print(h.address, h.vals) 123 Main St [u'2 bath', u'2 cars']
- items()
Python の
dict
のように、キーと値をリストのリストで返します。>>> for h in House.select(House.address, House.features.items().alias('mtx')): ... print(h.address, h.mtx) 123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]
- slice(*args)
キーのリストが与えられたとき、データのスライスを返します。
>>> for h in House.select(House.address, House.features.slice('garage').alias('garage_data')): ... print(h.address, h.garage_data) 123 Main St {'garage': '2 cars'}
- exists(key)
指定されたキーが存在するかどうかをクエリします。
>>> for h in House.select(House.address, House.features.exists('garage').alias('has_garage')): ... print(h.address, h.has_garage) 123 Main St True >>> for h in House.select().where(House.features.exists('garage')): ... print(h.address, h.features['garage']) # <-- just houses w/garage data 123 Main St 2 cars
- defined(key)
指定されたキーに値が関連付けられているかどうかをクエリします。
- update(**data)
指定された行または複数行のキー/値をアトミックに更新します。
>>> query = House.update(features=House.features.update( ... sqft=2000, ... year_built=2012)) >>> query.where(House.id == 1).execute()
- delete(*keys)
指定された行または複数行の指定されたキーを削除します。
注意
UPDATE
クエリを使用します。>>> query = House.update(features=House.features.delete( ... 'sqft', 'year_built')) >>> query.where(House.id == 1).execute()
- contains(value)
- パラメーター
value –
dict
、キーのlist
、または単一のキーのいずれか。
以下のいずれかの存在について行をクエリします。
部分的な辞書。
キーのリスト。
単一のキー。
>>> query = House.select() >>> has_garage = query.where(House.features.contains('garage')) >>> garage_bath = query.where(House.features.contains(['garage', 'bath'])) >>> twocar = query.where(House.features.contains({'garage': '2 cars'}))
- contains_any(*keys)
- パラメーター
keys – 検索する1つ以上のキー。
いずれかのキーの存在について行をクエリします。
- class JSONField(dumps=None, *args, **kwargs)
- パラメーター
dumps – デフォルトでは、json.dumps() または dumps 関数が呼び出されます。このメソッドをオーバーライドして、カスタマイズされた JSON ラッパーを作成できます。
任意のJSONを格納およびクエリするのに適したフィールドクラス。モデルでこれを使用する場合、フィールドの値をPythonオブジェクト(
dict
またはlist
)に設定します。データベースから値を取得すると、Pythonデータ構造として返されます。注意
Postgres 9.2 / psycopg2 2.5 以降を使用している必要があります。
注意
Postgres 9.4 を使用している場合は、パフォーマンスが向上し、より強力なクエリ オプションを提供する
BinaryJSONField
の使用を強く検討してください。モデル宣言の例
db = PostgresqlExtDatabase('my_db') class APIResponse(Model): url = CharField() response = JSONField() class Meta: database = db
JSON データの保存例
url = 'http://foo.com/api/resource/' resp = json.loads(urllib2.urlopen(url).read()) APIResponse.create(url=url, response=resp) APIResponse.create(url='http://foo.com/baz/', response={'key': 'value'})
クエリには、ネストされたキーまたは配列の検索を指定するために、Python の
[]
演算子を使用します。APIResponse.select().where( APIResponse.response['key1']['nested-key'] == 'some-value')
[]
演算子の使用例として、APIResponse
に次のデータが保存されていると想像してください。{ "foo": { "bar": ["i1", "i2", "i3"], "baz": { "huey": "mickey", "peewee": "nugget" } } }
いくつかのクエリの結果を以下に示します。
def get_data(expression): # Helper function to just retrieve the results of a # particular expression. query = (APIResponse .select(expression.alias('my_data')) .dicts() .get()) return query['my_data'] # Accessing the foo -> bar subkey will return a JSON # representation of the list. get_data(APIResponse.data['foo']['bar']) # '["i1", "i2", "i3"]' # In order to retrieve this list as a Python list, # we will call .as_json() on the expression. get_data(APIResponse.data['foo']['bar'].as_json()) # ['i1', 'i2', 'i3'] # Similarly, accessing the foo -> baz subkey will # return a JSON representation of the dictionary. get_data(APIResponse.data['foo']['baz']) # '{"huey": "mickey", "peewee": "nugget"}' # Again, calling .as_json() will return an actual # python dictionary. get_data(APIResponse.data['foo']['baz'].as_json()) # {'huey': 'mickey', 'peewee': 'nugget'} # When dealing with simple values, either way works as # you expect. get_data(APIResponse.data['foo']['bar'][0]) # 'i1' # Calling .as_json() when the result is a simple value # will return the same thing as the previous example. get_data(APIResponse.data['foo']['bar'][0].as_json()) # 'i1'
- class BinaryJSONField(dumps=None, *args, **kwargs)
- パラメーター
dumps – デフォルトでは、json.dumps() または dumps 関数が呼び出されます。このメソッドをオーバーライドして、カスタマイズされた JSON ラッパーを作成できます。
任意の JSON ドキュメントを保存およびクエリします。データは通常の Python
dict
およびlist
オブジェクトを使用して保存する必要があり、データベースからデータが返されるときもdict
およびlist
を使用して返されます。基本的なクエリ操作の例については、
JSONField
の上記のコードサンプルを参照してください。以下のクエリ例では、上記で説明した同じAPIResponse
モデルを使用します。注意
デフォルトでは、BinaryJSONField は GiST インデックスを使用します。これを無効にするには、フィールドを
index=False
で初期化します。注意
Postgres 9.4 / psycopg2 2.5 以降を使用している必要があります。Postgres 9.2 または 9.3 を使用している場合は、代わりに通常の
JSONField
を使用できます。- contains(other)
指定された JSON データに、指定された JSON フラグメントまたはキーが含まれているかどうかをテストします。
例
search_fragment = { 'foo': {'bar': ['i2']} } query = (APIResponse .select() .where(APIResponse.data.contains(search_fragment))) # If we're searching for a list, the list items do not need to # be ordered in a particular way: query = (APIResponse .select() .where(APIResponse.data.contains({ 'foo': {'bar': ['i2', 'i1']}})))
シンプルなキーを渡すこともできます。トップレベルにキー
foo
を含む APIResponse を検索するには、次のようにします。APIResponse.select().where(APIResponse.data.contains('foo'))
角かっこを使用してサブキーを検索することもできます。
APIResponse.select().where( APIResponse.data['foo']['bar'].contains(['i2', 'i1']))
- contains_any(*items)
指定された 1 つ以上の項目の存在を検索します。
APIResponse.select().where( APIResponse.data.contains_any('foo', 'baz', 'nugget'))
contains()
と同様に、サブキーを検索することもできます。APIResponse.select().where( APIResponse.data['foo']['bar'].contains_any('i2', 'ix'))
- contains_all(*items)
指定されたすべての項目の存在を検索します。
APIResponse.select().where( APIResponse.data.contains_all('foo'))
contains_any()
と同様に、サブキーを検索することもできます。APIResponse.select().where( APIResponse.data['foo']['bar'].contains_all('i1', 'i2', 'i3'))
- contained_by(other)
指定された JSON ドキュメントが、指定された JSON ドキュメントに含まれている(サブセットである)かどうかをテストします。このメソッドは、
contains()
の逆です。big_doc = { 'foo': { 'bar': ['i1', 'i2', 'i3'], 'baz': { 'huey': 'mickey', 'peewee': 'nugget', } }, 'other_key': ['nugget', 'bear', 'kitten'], } APIResponse.select().where( APIResponse.data.contained_by(big_doc))
- concat(data)
2 つのフィールドデータと提供されたデータを連結します。この操作は、マージや「ディープ連結」を実行しないことに注意してください。
- has_key(key)
JSON オブジェクトのトップレベルにキーが存在するかどうかをテストします。
- remove(*keys)
JSON オブジェクトのトップレベルから 1 つ以上のキーを削除します。
- Match(field, query)
フルテキスト検索式を生成し、左側のオペランドを
tsvector
に、右側のオペランドをtsquery
に自動的に変換します。例
def blog_search(search_term): return Blog.select().where( (Blog.status == Blog.STATUS_PUBLISHED) & Match(Blog.content, search_term))
- class TSVectorField
tsvector
データを保存するのに適したフィールドタイプ。このフィールドは、検索パフォーマンスを向上させるために、自動的にGIN
インデックスで作成されます。注意
このフィールドに格納されたデータは、
tsvector
タイプに手動で変換する必要があります。注意
デフォルトでは、TSVectorField は GIN インデックスを使用します。これを無効にするには、フィールドを
index=False
で初期化します。使用例
class Blog(Model): content = TextField() search_content = TSVectorField() content = 'this is a sample blog entry.' blog_entry = Blog.create( content=content, search_content=fn.to_tsvector(content)) # Note `to_tsvector()`.
- match(query[, language=None[, plain=False]])
- パラメーター
query (str) – フルテキスト検索クエリ。
language (str) – 言語名(オプション)。
plain (bool) – プレーン(シンプル)パーサーを使用して検索クエリを解析します。
- 戻り値
フルテキスト検索/一致を表す式。
例
# Perform a search using the "match" method. terms = 'python & (sqlite | postgres)' results = Blog.select().where(Blog.search_content.match(terms))
Cockroach Database
CockroachDB (CRDB) は peewee で十分にサポートされています。
from playhouse.cockroachdb import CockroachDatabase
db = CockroachDatabase('my_app', user='root', host='10.1.0.8')
Cockroach Cloud を使用している場合は、接続文字列を使用して接続パラメータを指定する方が簡単な場合があります。
db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')
注意
CockroachDB には、psycopg2
(postgres) Python ドライバーが必要です。
注意
CockroachDB のインストールと入門ガイドはこちらにあります: https://www.cockroachlabs.com/docs/stable/install-cockroachdb.html
SSL 構成
Cockroach クラスターを実行する場合は、SSL 証明書を強くお勧めします。Psycopg2 は SSL をすぐにサポートしますが、データベースを初期化するときにいくつかの追加オプションを指定する必要がある場合があります。
db = CockroachDatabase(
'my_app',
user='root',
host='10.1.0.8',
sslmode='verify-full', # Verify the cert common-name.
sslrootcert='/path/to/root.crt')
# Or, alternatively, specified as part of a connection-string:
db = CockroachDatabase('postgresql://root:secret@host:26257/dbname'
'?sslmode=verify-full&sslrootcert=/path/to/root.crt'
'&options=--cluster=my-cluster-xyz')
クライアント検証の詳細については、libpq ドキュメントを参照してください。
Cockroach 拡張 API
playhouse.cockroachdb
拡張モジュールは、次のクラスとヘルパーを提供します。
CockroachDatabase
- CRDB での作業に特化して設計されたPostgresqlDatabase
のサブクラス。PooledCockroachDatabase
- 上記と同様ですが、接続プーリングを実装します。run_transaction()
- トランザクション内で関数を実行し、自動クライアント側再試行ロジックを提供します。
CRDB を使用する場合に役立つ可能性のある特別なフィールドタイプ
UUIDKeyField
- デフォルトでランダムに生成された UUID を使用して、CRDB のUUID
型を使用する主キーフィールドの実装。RowIDField
- デフォルトでunique_rowid()
を使用して、CRDB のINT
型を使用する主キーフィールドの実装。JSONField
- CRDB が JSON を JSONB として扱うため、Postgres のBinaryJSONField
と同じです。ArrayField
- Postgres 拡張機能と同じ(ただし、多次元配列はサポートされていません)。
CRDB は Postgres のワイヤプロトコルと互換性があり、非常に類似した SQL インターフェイスを公開しているため、PostgresqlDatabase
を CRDB で使用することも可能ですが(お勧めしません)。
CRDBはネストされたトランザクション(セーブポイント)をサポートしていないため、
atomic()
メソッドは、CockroachDatabase
を使用する際にこれを強制するように実装されています。詳細については、CRDBトランザクションを参照してください。CRDBは、フィールドタイプ、日付関数、イントロスペクションにおいて、Postgresと微妙な違いがある場合があります。
CRDB固有の機能は、トランザクションの優先度や
AS OF SYSTEM TIME
句の指定など、CockroachDatabase
によって公開されます。
CRDBトランザクション
CRDBはネストされたトランザクション(セーブポイント)をサポートしていないため、atomic()
メソッドは、CockroachDatabase
において、無効なネストが検出された場合に例外を発生するように変更されました。トランザクションコードをネストできるようにしたい場合は、transaction()
メソッドを使用できます。これにより、最も外側のブロックがトランザクションを管理することが保証されます(例:ネストされたブロックを終了しても、早期コミットは発生しません)。
例
@db.transaction()
def create_user(username):
return User.create(username=username)
def some_other_function():
with db.transaction() as txn:
# do some stuff...
# This function is wrapped in a transaction, but the nested
# transaction will be ignored and folded into the outer
# transaction, as we are already in a wrapped-block (via the
# context manager).
create_user('some_user@example.com')
# do other stuff.
# At this point we have exited the outer-most block and the transaction
# will be committed.
return
CRDBはクライアント側のトランザクション再試行を提供しており、特別なrun_transaction()
ヘルパーを使用して利用できます。このヘルパーメソッドは、再試行が必要になる可能性のあるトランザクションステートメントを実行する役割を担う、呼び出し可能なオブジェクトを受け入れます。
run_transaction()
の最も単純な例
def create_user(email):
# Callable that accepts a single argument (the database instance) and
# which is responsible for executing the transactional SQL.
def callback(db_ref):
return User.create(email=email)
return db.run_transaction(callback, max_attempts=10)
huey = create_user('huey@example.com')
注意
指定された試行回数後にトランザクションをコミットできない場合、cockroachdb.ExceededMaxAttempts
例外が発生します。SQLが不正な形式の場合、制約に違反する場合などは、関数は呼び出し元に例外を発生させます。
あるアカウントから別のアカウントに金額を移動するトランザクションに対して、クライアント側の再試行を実装するためのrun_transaction()
の使用例
from playhouse.cockroachdb import CockroachDatabase
db = CockroachDatabase('my_app')
def transfer_funds(from_id, to_id, amt):
"""
Returns a 3-tuple of (success?, from balance, to balance). If there are
not sufficient funds, then the original balances are returned.
"""
def thunk(db_ref):
src, dest = (Account
.select()
.where(Account.id.in_([from_id, to_id])))
if src.id != from_id:
src, dest = dest, src # Swap order.
# Cannot perform transfer, insufficient funds!
if src.balance < amt:
return False, src.balance, dest.balance
# Update each account, returning the new balance.
src, = (Account
.update(balance=Account.balance - amt)
.where(Account.id == from_id)
.returning(Account.balance)
.execute())
dest, = (Account
.update(balance=Account.balance + amt)
.where(Account.id == to_id)
.returning(Account.balance)
.execute())
return True, src.balance, dest.balance
# Perform the queries that comprise a logical transaction. In the
# event the transaction fails due to contention, it will be auto-
# matically retried (up to 10 times).
return db.run_transaction(thunk, max_attempts=10)
CRDB API
- class CockroachDatabase(database[, **kwargs])
PostgresqlDatabase
に基づいており、psycopg2
ドライバーを使用する、CockroachDBの実装。追加のキーワード引数は、psycopg2接続コンストラクタに渡され、データベースの
user
、port
などを指定するために使用できます。または、接続詳細はURL形式で指定できます。
- run_transaction(callback[, max_attempts=None[, system_time=None[, priority=None]]])
- パラメーター
callback – 単一の
db
パラメータ(このメソッドが呼び出されたデータベースインスタンスになります)を受け入れる、呼び出し可能なオブジェクト。max_attempts (int) – 諦めるまでに試行する最大回数。
system_time (datetime) – 指定された値を基準として、トランザクションを
AS OF SYSTEM TIME
で実行します。priority (str) – "low"、"normal"、または "high" のいずれか。
- 戻り値
コールバックによって返された値を返します。
- 発生
max_attempts
を超過した場合、ExceededMaxAttempts
が発生します。
自動クライアント側再試行を使用して、トランザクションでSQLを実行します。
ユーザーが提供する
callback
必ずトランザクションが実行されている接続を表す
db
インスタンスを1つのパラメータとして受け入れる必要があります。必ずコミット、ロールバック、またはそれ以外でトランザクションを管理しようとしないでください。
複数回呼び出すことができます。
理想的にはSQL操作のみを含める必要があります。
さらに、CRDBはネストされたトランザクションをサポートしていないため、この関数が呼び出された時点でデータベースに開いているトランザクションがあってはなりません。そうしようとすると、
NotImplementedError
が発生します。最も単純な例
def create_user(email): def callback(db_ref): return User.create(email=email) return db.run_transaction(callback, max_attempts=10) user = create_user('huey@example.com')
- class PooledCockroachDatabase(database[, **kwargs])
PooledPostgresqlDatabase
に基づく、CockroachDBの接続プール実装。CockroachDatabase
と同じAPIを実装しますが、クライアント側の接続プールを行います。
- run_transaction(db, callback[, max_attempts=None[, system_time=None[, priority=None]]])
自動クライアント側再試行を使用して、トランザクションでSQLを実行します。詳細については、
CockroachDatabase.run_transaction()
を参照してください。- パラメーター
db (CockroachDatabase) – データベースインスタンス。
callback – 単一の
db
パラメータ(上記で渡された値と同じになります)を受け入れる、呼び出し可能なオブジェクト。
注意
この関数は、
CockroachDatabase
クラスの同じ名前のメソッドと同等です。
- class UUIDKeyField
CRDBの
gen_random_uuid()
関数を使用して、初期値を自動的に設定するUUID主キーフィールド。
- class RowIDField
CRDBの
unique_rowid()
関数を使用して、初期値を自動的に設定する自動インクリメント整数主キーフィールド。
参考資料
Postgresql拡張機能の
BinaryJSONField
(cockroachdb
拡張モジュールで利用可能で、JSONField
にエイリアスされています)。Postgresql拡張機能の
ArrayField
。
MySQL拡張機能
Peeweeは、mysql-connectorドライバーまたはmariadb-connectorを使用するための代替のデータベース実装を提供します。実装はplayhouse.mysql_ext
にあります。
- class MySQLConnectorDatabase(database, **kwargs)
mysql-connectorを使用するデータベース実装。サポートされている接続パラメーターの完全なリスト。
mysql-connectorの使用例
from playhouse.mysql_ext import MySQLConnectorDatabase # MySQL database implementation that utilizes mysql-connector driver. db = MySQLConnectorDatabase('my_database', host='1.2.3.4', user='mysql')
- class MariaDBConnectorDatabase(database, **kwargs)
mariadb-connectorを使用するデータベース実装。サポートされている接続パラメーターの完全なリスト。
mariadb-connectorの使用例
from playhouse.mysql_ext import MariaDBConnectorDatabase # MySQL database implementation that utilizes mysql-connector driver. db = MariaDBConnectorDatabase('my_database', host='1.2.3.4', user='mysql')
追加のMySQL固有のヘルパー
DataSet
datasetモジュールには、人気のある同名のプロジェクトをモデルにした、データベースを扱うための高レベルAPIが含まれています。datasetモジュールの目的は、以下を提供することです。
JSONを扱うような、リレーショナルデータを扱うための簡略化されたAPI。
リレーショナルデータをJSONまたはCSVとして簡単にエクスポートする方法。
JSONまたはCSVデータをリレーショナルデータベースに簡単にインポートする方法。
最小限のデータローディングスクリプトは次のようになります。
from playhouse.dataset import DataSet
db = DataSet('sqlite:///:memory:')
table = db['sometable']
table.insert(name='Huey', age=3)
table.insert(name='Mickey', age=5, gender='male')
huey = table.find_one(name='Huey')
print(huey)
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
for obj in table:
print(obj)
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
# {'age': 5, 'gender': 'male', 'id': 2, 'name': 'Mickey'}
辞書APIを使用して、挿入、更新、削除もできます。
huey = table.find_one(name='Huey')
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
# Perform an update by supplying a partial record of changes.
table[1] = {'gender': 'male', 'age': 4}
print(table[1])
# {'age': 4, 'gender': 'male', 'id': 1, 'name': 'Huey'}
# Or insert a new record:
table[3] = {'name': 'Zaizee', 'age': 2}
print(table[3])
# {'age': 2, 'gender': None, 'id': 3, 'name': 'Zaizee'}
# Or delete a record:
del table[3] # Remove the row we just added.
freeze()
およびthaw()
を使用して、データをエクスポートまたはインポートできます。
# Export table content to the `users.json` file.
db.freeze(table.all(), format='json', filename='users.json')
# Import data from a CSV file into a new table. Columns will be automatically
# created for each field in the CSV file.
new_table = db['stats']
new_table.thaw(format='csv', filename='monthly_stats.csv')
はじめに
DataSet
オブジェクトは、dialect://user:password@host/dbname
形式のデータベースURLを渡すことで初期化されます。様々なデータベースへの接続例については、データベースURLセクションを参照してください。
# Create an in-memory SQLite database.
db = DataSet('sqlite:///:memory:')
データの保存
データを保存するには、まずテーブルへの参照を取得する必要があります。テーブルが存在しない場合は、自動的に作成されます。
# Get a table reference, creating the table if it does not exist.
table = db['users']
これでテーブルに新しい行をinsert()
できます。列が存在しない場合は、自動的に作成されます。
table.insert(name='Huey', age=3, color='white')
table.insert(name='Mickey', age=5, gender='male')
テーブル内の既存のエントリを更新するには、新しい値とフィルター条件を含む辞書を渡します。フィルターとして使用する列のリストは、columns引数で指定します。フィルター列が指定されていない場合は、すべての行が更新されます。
# Update the gender for "Huey".
table.update(name='Huey', gender='male', columns=['name'])
# Update all records. If the column does not exist, it will be created.
table.update(favorite_orm='peewee')
データのインポート
JSONファイルやCSVファイルなどの外部ソースからデータをインポートするには、thaw()
メソッドを使用できます。デフォルトでは、検出された属性に対して新しい列が作成されます。テーブルですでに定義されている列のみをデータ入力したい場合は、strict=True
を渡すことができます。
# Load data from a JSON file containing a list of objects.
table = dataset['stock_prices']
table.thaw(filename='stocks.json', format='json')
table.all()[:3]
# Might print...
[{'id': 1, 'ticker': 'GOOG', 'price': 703},
{'id': 2, 'ticker': 'AAPL', 'price': 109},
{'id': 3, 'ticker': 'AMZN', 'price': 300}]
トランザクションの使用
DataSetは、シンプルなコンテキストマネージャーを使用したネストされたトランザクションをサポートしています。
table = db['users']
with db.transaction() as txn:
table.insert(name='Charlie')
with db.transaction() as nested_txn:
# Set Charlie's favorite ORM to Django.
table.update(name='Charlie', favorite_orm='django', columns=['name'])
# jk/lol
nested_txn.rollback()
データベースの検査
現在のデータベース内のテーブルを一覧表示するには、tables()
メソッドを使用できます。
>>> print(db.tables)
['sometable', 'user']
また、特定のテーブルに対しては、列を印刷できます。
>>> table = db['user']
>>> print(table.columns)
['id', 'age', 'name', 'gender', 'favorite_orm']
テーブル内の行数も調べることができます。
>>> print(len(db['user']))
3
データの読み取り
すべての行を取得するには、all()
メソッドを使用できます。
# Retrieve all the users.
users = db['user'].all()
# We can iterate over all rows without calling `.all()`
for user in db['user']:
print(user['name'])
特定のオブジェクトは、find()
およびfind_one()
を使用して取得できます。
# Find all the users who like peewee.
peewee_users = db['user'].find(favorite_orm='peewee')
# Find Huey.
huey = db['user'].find_one(name='Huey')
データのエクスポート
データをエクスポートするには、エクスポートしたいクエリを渡してfreeze()
メソッドを使用します。
peewee_users = db['user'].find(favorite_orm='peewee')
db.freeze(peewee_users, format='json', filename='peewee_users.json')
API
- class DataSet(url, **kwargs)
- パラメーター
url – データベースURLまたは
Database
インスタンス。URLの使用方法の詳細については、例についてはデータベースURLを参照してください。kwargs – DBをイントロスペクトするときに、
Introspector.generate_models()
に渡される追加のキーワード引数。
DataSetクラスは、リレーショナルデータベースを扱うための高レベルAPIを提供します。
- tables
データベースに保存されているテーブルのリストを返します。このリストは、アクセスされるたびに動的に計算されます。
- query(sql[, params=None[, commit=True]])
- パラメーター
sql (str) – SQLクエリ。
params (list) – クエリのオプションのパラメーター。
commit (bool) – クエリを実行時にコミットするかどうか。
- 戻り値
データベースカーソル。
指定されたクエリをデータベースに対して実行します。
- transaction()
新しいトランザクション(またはセーブポイント)を表すコンテキストマネージャーを作成します。
- freeze(query[, format='csv'[, filename=None[, file_obj=None[, encoding='utf8'[, **kwargs]]]]])
- パラメーター
query –
SelectQuery
。all()
または〜Table.findを使用して生成されます。format – 出力形式。デフォルトでは、csvとjsonがサポートされています。
filename – 出力を書き込むファイル名。
file_obj – 出力を書き込むファイルのようなオブジェクト。
encoding (str) – ファイルエンコーディング。
kwargs – エクスポート固有の機能のための任意のパラメーター。
- thaw(table[, format='csv'[, filename=None[, file_obj=None[, strict=False[, encoding='utf8'[, **kwargs]]]]]])
- パラメーター
table (str) – データをロードするテーブルの名前。
format – 入力形式。デフォルトでは、csvとjsonがサポートされています。
filename – データを読み取るファイル名。
file_obj – データを読み取るファイルのようなオブジェクト。
strict (bool) – テーブルにまだ存在しない列の値を保存するかどうか。
encoding (str) – ファイルエンコーディング。
kwargs – インポート固有の機能のための任意のパラメーター。
- connect()
基になるデータベースへの接続を開きます。接続が明示的に開かれていない場合は、クエリが最初に実行されるときに接続が開かれます。
- close()
基になるデータベースへの接続を閉じます。
- class Table(dataset, name, model_class)
- Noindex
指定されたテーブル内の行を操作するための高レベルAPIを提供します。
- columns
指定されたテーブルの列のリストを返します。
- create_index(columns[, unique=False])
指定された列にインデックスを作成します。
# Create a unique index on the `username` column. db['users'].create_index(['username'], unique=True)
- insert(**data)
指定されたデータ辞書をテーブルに挿入し、必要に応じて新しい列を作成します。
- update(columns=None, conjunction=None, **data)
提供されたデータを使用してテーブルを更新します。columnsパラメータに1つ以上の列が指定されている場合、その列のdata辞書内の値を使用して、更新する行を決定します。
# Update all rows. db['users'].update(favorite_orm='peewee') # Only update Huey's record, setting his age to 3. db['users'].update(name='Huey', age=3, columns=['name'])
- find(**query)
指定された等価条件に一致する行についてテーブルをクエリします。クエリが指定されていない場合は、すべての行が返されます。
peewee_users = db['users'].find(favorite_orm='peewee')
- find_one(**query)
指定された等価条件に一致する単一行を返します。一致する行が見つからない場合は、
None
が返されます。huey = db['users'].find_one(name='Huey')
- all()
指定されたテーブルのすべての行を返します。
- delete(**query)
指定された等価条件に一致するすべての行を削除します。クエリが提供されない場合は、すべての行が削除されます。
# Adios, Django! db['users'].delete(favorite_orm='Django') # Delete all the secret messages. db['secret_messages'].delete()
- freeze([format='csv'[, filename=None[, file_obj=None[, **kwargs]]]])
- パラメーター
format – 出力形式。デフォルトでは、csvとjsonがサポートされています。
filename – 出力を書き込むファイル名。
file_obj – 出力を書き込むファイルのようなオブジェクト。
kwargs – エクスポート固有の機能のための任意のパラメーター。
- thaw([format='csv'[, filename=None[, file_obj=None[, strict=False[, **kwargs]]]]])
- パラメーター
format – 入力形式。デフォルトでは、csvとjsonがサポートされています。
filename – データを読み取るファイル名。
file_obj – データを読み取るファイルのようなオブジェクト。
strict (bool) – テーブルにまだ存在しない列の値を保存するかどうか。
kwargs – インポート固有の機能のための任意のパラメーター。
フィールド
これらのフィールドは、playhouse.fields
モジュールにあります。
ハイブリッド属性
ハイブリッド属性は、PythonレベルとSQLレベルの両方で動作する機能をカプセル化します。ハイブリッド属性のアイデアは、SQLAlchemyの同名の機能に由来します。次の例を考えてみましょう。
class Interval(Model):
start = IntegerField()
end = IntegerField()
@hybrid_property
def length(self):
return self.end - self.start
@hybrid_method
def contains(self, point):
return (self.start <= point) & (point < self.end)
ハイブリッド属性は、length
属性がInterval
クラス経由でアクセスされるか、Interval
インスタンス経由でアクセスされるかによって異なる動作をするという事実からその名前が付けられています。
インスタンス経由でアクセスされた場合、期待どおりに動作します。
ただし、Interval.length
クラス属性経由でアクセスされた場合、長さの計算はSQL式として表されます。例えば
query = Interval.select().where(Interval.length > 5)
このクエリは次のSQLと同等になります。
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."end" - "t1"."start") > 5)
playhouse.hybrid
モジュールには、パラメータを受け入れることができるハイブリッドメソッドを実装するためのデコレータも含まれています。ハイブリッドプロパティと同様に、モデルインスタンス経由でアクセスされた場合、関数は記述されたとおりに正常に実行されます。ただし、ハイブリッドメソッドがクラスで呼び出されると、SQL式が生成されます。
例
query = Interval.select().where(Interval.contains(2))
このクエリは次のSQLと同等です。
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))
Pythonの実装がSQLの実装とわずかに異なる状況のための追加のAPIがあります。Interval
モデルにradius
メソッドを追加しましょう。このメソッドは絶対値を計算するため、インスタンス部分にはPythonのabs()
関数を使用し、クラス部分にはfn.ABS()
SQL関数を使用します。
class Interval(Model):
start = IntegerField()
end = IntegerField()
@hybrid_property
def length(self):
return self.end - self.start
@hybrid_property
def radius(self):
return abs(self.length) / 2
@radius.expression
def radius(cls):
return fn.ABS(cls.length) / 2
素晴らしいのは、radius
の実装の両方がlength
ハイブリッド属性を参照していることです!Interval
インスタンス経由でアクセスすると、半径の計算はPythonで実行されます。Interval
クラス経由で呼び出すと、適切なSQLが得られます。
例
query = Interval.select().where(Interval.radius < 3)
このクエリは次のSQLと同等です。
SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE ((abs("t1"."end" - "t1"."start") / 2) < 3)
素晴らしいですよね?クールなアイデアをありがとう、SQLAlchemy!
ハイブリッドAPI
- class hybrid_method(func[, expr=None])
インスタンスレベルとクラスレベルの両方の動作を持つPythonオブジェクトメソッドの定義を可能にするメソッドデコレータ。
例
class Interval(Model): start = IntegerField() end = IntegerField() @hybrid_method def contains(self, point): return (self.start <= point) & (point < self.end)
Interval
インスタンスで呼び出された場合、contains
メソッドは期待どおりに動作します。ただし、クラスメソッドとして呼び出された場合は、SQL式が生成されます。query = Interval.select().where(Interval.contains(2))
次のSQLが生成されます。
SELECT "t1"."id", "t1"."start", "t1"."end" FROM "interval" AS t1 WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))
- expression(expr)
SQL式を生成するメソッドを指定するためのメソッドデコレータ。
- class hybrid_property(fget[, fset=None[, fdel=None[, expr=None]]])
インスタンスレベルとクラスレベルの両方の振る舞いを持つ Python オブジェクトプロパティの定義を可能にするメソッドデコレータ。
例
class Interval(Model): start = IntegerField() end = IntegerField() @hybrid_property def length(self): return self.end - self.start @hybrid_property def radius(self): return abs(self.length) / 2 @radius.expression def radius(cls): return fn.ABS(cls.length) / 2
Interval
インスタンスでアクセスすると、length
およびradius
プロパティは期待どおりに動作します。ただし、クラス属性としてアクセスすると、代わりに SQL 式が生成されます。query = (Interval .select() .where( (Interval.length > 6) & (Interval.radius >= 3)))
次のSQLが生成されます。
SELECT "t1"."id", "t1"."start", "t1"."end" FROM "interval" AS t1 WHERE ( (("t1"."end" - "t1"."start") > 6) AND ((abs("t1"."end" - "t1"."start") / 2) >= 3) )
キー/バリューストア
playhouse.kv
モジュールには、永続的な辞書の実装が含まれています。
- class KeyValue([key_field=None[, value_field=None[, ordered=False[, database=None[, table_name='keyvalue']]]]])
- パラメーター
key_field (Field) – キーに使用するフィールド。デフォルトは
CharField
です。必ずprimary_key=True
を持つ必要があります。value_field (Field) – 値に使用するフィールド。デフォルトは
PickleField
です。ordered (bool) – データをキーでソートされた順序で返す必要があるかどうか。
database (Database) – キー/値データが格納されるデータベース。指定しない場合、インメモリ SQLite データベースが使用されます。
table_name (str) – データストレージのテーブル名。
キー/値データを格納するための辞書のような API。辞書と同様に、期待される API をサポートしますが、アイテムの取得、設定、削除に式を受け入れる機能も追加されています。
KeyValue
がインスタンス化されると、テーブルは自動的に (存在しない場合) 作成されます。キー/値のペアの設定と更新/上書きには、効率的な upsert 実装を使用します。
基本的な例
# Create a key/value store, which uses an in-memory SQLite database # for data storage. KV = KeyValue() # Set (or overwrite) the value for "k1". KV['k1'] = 'v1' # Set (or update) multiple keys at once (uses an efficient upsert). KV.update(k2='v2', k3='v3') # Getting values works as you'd expect. assert KV['k2'] == 'v2' # We can also do this: for value in KV[KV.key > 'k1']: print(value) # 'v2' # 'v3' # Update multiple values at once using expression: KV[KV.key > 'k1'] = 'vx' # What's stored in the KV? print(dict(KV)) # {'k1': 'v1', 'k2': 'vx', 'k3': 'vx'} # Delete a single item. del KV['k2'] # How many items are stored in the KV? print(len(KV)) # 2 # Delete items that match the given condition. del KV[KV.key > 'k1']
- __contains__(expr)
- パラメーター
expr – 単一のキーまたは式
- 戻り値
キー/式が存在するかどうかのブール値。
例
>>> kv = KeyValue() >>> kv.update(k1='v1', k2='v2') >>> 'k1' in kv True >>> 'kx' in kv False >>> (KV.key < 'k2') in KV True >>> (KV.key > 'k2') in KV False
- __len__()
- 戻り値
格納されているアイテムの数。
- __getitem__(expr)
- パラメーター
expr – 単一のキーまたは式。
- 戻り値
キー/式に対応する値。
- 発生
単一のキーが指定されていて見つからない場合は、
KeyError
。
例
>>> KV = KeyValue() >>> KV.update(k1='v1', k2='v2', k3='v3') >>> KV['k1'] 'v1' >>> KV['kx'] KeyError: "kx" not found >>> KV[KV.key > 'k1'] ['v2', 'v3'] >>> KV[KV.key < 'k1'] []
- __setitem__(expr, value)
- パラメーター
expr – 単一のキーまたは式。
value – キーに設定する値
指定されたキーの値を設定します。
expr
が式の場合、その式に一致するすべてのキーの値が更新されます。例
>>> KV = KeyValue() >>> KV.update(k1='v1', k2='v2', k3='v3') >>> KV['k1'] = 'v1-x' >>> print(KV['k1']) 'v1-x' >>> KV[KV.key >= 'k2'] = 'v99' >>> dict(KV) {'k1': 'v1-x', 'k2': 'v99', 'k3': 'v99'}
- __delitem__(expr)
- パラメーター
expr – 単一のキーまたは式。
指定されたキーを削除します。式が指定されている場合、その式に一致するすべてのキーを削除します。
例
>>> KV = KeyValue() >>> KV.update(k1=1, k2=2, k3=3) >>> del KV['k1'] # Deletes "k1". >>> del KV['k1'] KeyError: "k1" does not exist >>> del KV[KV.key > 'k2'] # Deletes "k3". >>> del KV[KV.key > 'k99'] # Nothing deleted, no keys match.
- keys()
- 戻り値
テーブル内のすべてのキーのイテラブル。
- values()
- 戻り値
テーブル内のすべての値のイテラブル。
- items()
- 戻り値
テーブル内のすべてのキー/値ペアのイテラブル。
- update([__data=None[, **mapping]])
指定されたキー/値ペアを効率的に一括挿入または置換します。
例
>>> KV = KeyValue() >>> KV.update(k1=1, k2=2) # Sets 'k1'=1, 'k2'=2. >>> dict(KV) {'k1': 1, 'k2': 2} >>> KV.update(k2=22, k3=3) # Updates 'k2'->22, sets 'k3'=3. >>> dict(KV) {'k1': 1, 'k2': 22, 'k3': 3} >>> KV.update({'k2': -2, 'k4': 4}) # Also can pass a dictionary. >>> dict(KV) {'k1': 1, 'k2': -2, 'k3': 3, 'k4': 4}
- get(expr[, default=None])
- パラメーター
expr – 単一のキーまたは式。
default – キーが見つからない場合のデフォルト値。
- 戻り値
指定されたキー/式の値、または単一のキーが見つからない場合はデフォルト値。
指定されたキーの値を取得します。キーが存在しない場合、キーが式の場合は空のリストが返される場合を除き、デフォルト値が返されます。
- pop(expr[, default=Sentinel])
- パラメーター
expr – 単一のキーまたは式。
default – キーが存在しない場合のデフォルト値。
- 戻り値
指定されたキー/式の値、または単一のキーが見つからない場合はデフォルト値。
値を取得し、指定されたキーを削除します。キーが存在しない場合、キーが式の場合は空のリストが返される場合を除き、デフォルト値が返されます。
- clear()
キーと値のテーブルからすべてのアイテムを削除します。
ショートカット
このモジュールには、peewee の API を使用して、やや冗長または面倒になる可能性のあるものを表現するためのヘルパー関数が含まれています。モデルを辞書にシリアル化したり、その逆を行うためのヘルパーもあります。
- model_to_dict(model[, recurse=True[, backrefs=False[, only=None[, exclude=None[, extra_attrs=None[, fields_from_query=None[, max_depth=None[, manytomany=False]]]]]]]])
- パラメーター
recurse (bool) – 外部キーを再帰的に処理するかどうか。
backrefs (bool) – 関連オブジェクトのリストを再帰的に処理するかどうか。
only – 結果の辞書に含める必要があるフィールドインスタンスのリスト (またはセット)。
exclude – 結果の辞書から除外する必要があるフィールドインスタンスのリスト (またはセット)。
extra_attrs – 辞書に含める必要があるインスタンスの属性またはメソッド名のリスト。
fields_from_query (Select) – このモデルインスタンスを作成した
SelectQuery
。クエリで明示的に選択されたフィールドと値のみがシリアル化されます。max_depth (int) – 再帰時の最大深度。
manytomany (bool) – 多対多フィールドを処理するかどうか。
モデルインスタンス (およびオプションで関連するインスタンス) を辞書に変換します。
例
>>> user = User.create(username='charlie') >>> model_to_dict(user) {'id': 1, 'username': 'charlie'} >>> model_to_dict(user, backrefs=True) {'id': 1, 'tweets': [], 'username': 'charlie'} >>> t1 = Tweet.create(user=user, message='tweet-1') >>> t2 = Tweet.create(user=user, message='tweet-2') >>> model_to_dict(user, backrefs=True) { 'id': 1, 'tweets': [ {'id': 1, 'message': 'tweet-1'}, {'id': 2, 'message': 'tweet-2'}, ], 'username': 'charlie' } >>> model_to_dict(t1) { 'id': 1, 'message': 'tweet-1', 'user': { 'id': 1, 'username': 'charlie' } } >>> model_to_dict(t2, recurse=False) {'id': 1, 'message': 'tweet-2', 'user': 1}
model_to_dict
の実装は、サポートしようとするさまざまな用途があるため、かなり複雑です。特別な用途がある場合は、この関数にいくつかの奇妙なパラメータの組み合わせを無理やり押し込もうとしないことを強くお勧めします。あなたがやろうとしていることを正確に実行するシンプルな関数を作成してください。
- dict_to_model(model_class, data[, ignore_unknown=False])
- パラメーター
model_class (Model) – 作成するモデルクラス。
data (dict) – データの辞書。外部キーはネストされた辞書として含めることができ、バックリファレンスは辞書のリストとして含めることができます。
ignore_unknown (bool) – 認識されない (非フィールド) 属性を許可するかどうか。
データの辞書をモデルインスタンスに変換し、必要に応じて関連インスタンスを作成します。
例
>>> user_data = {'id': 1, 'username': 'charlie'} >>> user = dict_to_model(User, user_data) >>> user <__main__.User at 0x7fea8fa4d490> >>> user.username 'charlie' >>> note_data = {'id': 2, 'text': 'note text', 'user': user_data} >>> note = dict_to_model(Note, note_data) >>> note.text 'note text' >>> note.user.username 'charlie' >>> user_with_notes = { ... 'id': 1, ... 'username': 'charlie', ... 'notes': [{'id': 1, 'text': 'note-1'}, {'id': 2, 'text': 'note-2'}]} >>> user = dict_to_model(User, user_with_notes) >>> user.notes[0].text 'note-1' >>> user.notes[0].user.username 'charlie'
- update_model_from_dict(instance, data[, ignore_unknown=False])
- パラメーター
instance (Model) – 更新するモデルインスタンス。
data (dict) – データの辞書。外部キーはネストされた辞書として含めることができ、バックリファレンスは辞書のリストとして含めることができます。
ignore_unknown (bool) – 認識されない (非フィールド) 属性を許可するかどうか。
指定されたデータ辞書を使用してモデルインスタンスを更新します。
- resolve_multimodel_query(query[, key='_model_identifier'])
- パラメーター
query – 複合 select クエリ。
key (str) – モデル識別子の格納に使用するキー
- 戻り値
複合的なSELECTクエリで選択された各行に対して適切なモデルインスタンスを生成するイテラブルなカーソルです。
複合的なSELECTクエリで返された行を、正しいモデルインスタンス型に解決するためのヘルパーです。たとえば、2つの異なるテーブルの和集合がある場合、このヘルパーはクエリ結果を反復処理する際に、各行を適切なモデルに解決します。
- class ThreadSafeDatabaseMetadata
マルチスレッドアプリケーションで実行時にデータベースを安全に切り替えられるように、
database
属性へのスレッドセーフなアクセスを提供するモデルMetadata
実装。使い方
from playhouse.shortcuts import ThreadSafeDatabaseMetadata # Our multi-threaded application will sometimes swap out the primary # for the read-replica at run-time. primary = PostgresqlDatabase(...) read_replica = PostgresqlDatabase(...) class BaseModel(Model): class Meta: database = primary model_metadata_class = ThreadSafeDatabaseMetadata
シグナルサポート
シグナル(djangoのような)のフックを持つモデルは、playhouse.signals
で提供されています。シグナルを使用するには、プロジェクトのすべてのモデルがplayhouse.signals.Model
のサブクラスである必要があります。これは、さまざまなシグナルをサポートするために必要なメソッドをオーバーライドします。
from playhouse.signals import Model, post_save
class MyModel(Model):
data = IntegerField()
@post_save(sender=MyModel)
def on_save_handler(model_class, instance, created):
put_data_in_cache(instance.data)
警告
私が期待するところでは、明らかな理由から、PeeweeシグナルはModel.insert()
、Model.update()
、またはModel.delete()
メソッドを使用する場合は機能しません。これらのメソッドは、ORMのスコープ外で実行されるクエリを生成し、クエリが実行されたときにどのモデルインスタンスが影響を受ける可能性があるか、または影響を受けない可能性があるかをORMは認識しません。
シグナルは、影響を受けるモデルインスタンスが事前にわかっているModel.save()
やModel.delete_instance()
のような高レベルのpeewee APIにフックすることで機能します。
次のシグナルが提供されます
pre_save
オブジェクトがデータベースに保存される直前に呼び出されます。モデルが初めて保存されるのか、更新されるのかを示す追加のキーワード引数
created
を提供します。post_save
オブジェクトがデータベースに保存された直後に呼び出されます。モデルが初めて保存されるのか、更新されるのかを示す追加のキーワード引数
created
を提供します。pre_delete
Model.delete_instance()
が使用されたときに、オブジェクトがデータベースから削除される直前に呼び出されます。post_delete
Model.delete_instance()
が使用されたときに、オブジェクトがデータベースから削除された直後に呼び出されます。pre_init
モデルクラスが最初にインスタンス化されるときに呼び出されます
ハンドラーの接続
シグナルがディスパッチされるたびに、登録されたハンドラーが呼び出されます。これにより、完全に分離されたコードがモデルの保存や削除などのイベントに応答できるようになります。
Signal
クラスは、コールバック関数と「sender」と「name」の2つのオプションのパラメータを受け取るconnect()
メソッドを提供します。「sender」パラメータを指定した場合、単一のモデルクラスである必要があり、コールバックがその1つのモデルクラスからのシグナルのみを受信できるようにします。「name」パラメータは、シグナルハンドラーを登録解除する場合に便利なエイリアスとして使用されます。
使用例
from playhouse.signals import *
def post_save_handler(sender, instance, created):
print('%s was just saved' % instance)
# our handler will only be called when we save instances of SomeModel
post_save.connect(post_save_handler, sender=SomeModel)
すべてのシグナルハンドラーは、最初の2つの引数としてsender
とinstance
を受け取ります。sender
はモデルクラスであり、instance
は実際に処理されているモデルです。
必要に応じて、デコレータを使用してシグナルハンドラーを接続することもできます。これは上記の例と機能的に同等です
@post_save(sender=SomeModel)
def post_save_handler(sender, instance, created):
print('%s was just saved' % instance)
シグナルAPI
- class Signal
レシーバー(コールバック)のリストを格納し、「send」メソッドが呼び出されたときにそれらを呼び出します。
- connect(receiver[, name=None[, sender=None]])
- パラメーター
receiver (callable) – 少なくとも2つのパラメータ、「sender」(シグナルをトリガーしたモデルのサブクラス)と「instance」(実際のモデルインスタンス)を受け取るcallable。
name (string) – 短いエイリアス
sender (Model) – 指定した場合、このモデルクラスのインスタンスのみがレシーバーコールバックをトリガーします。
シグナルが送信されるたびに呼び出されるレシーバーの内部リストにレシーバーを追加します。
from playhouse.signals import post_save from project.handlers import cache_buster post_save.connect(cache_buster, name='project.cache_buster')
- disconnect([receiver=None[, name=None[, sender=None]]])
- パラメーター
receiver (callable) – 切断するコールバック
name (string) – 短いエイリアス
sender (Model) – モデル固有のハンドラーを切断します。
指定されたレシーバー(または指定された名前エイリアスを持つレシーバー)を、呼び出されないように切断します。レシーバーまたは名前のいずれかを指定する必要があります。
post_save.disconnect(name='project.cache_buster')
- send(instance, *args, **kwargs)
- パラメーター
instance – モデルインスタンス
レシーバーを反復処理し、接続された順序でそれらを呼び出します。レシーバーがsenderを指定した場合、インスタンスがsenderのインスタンスである場合にのみ呼び出されます。
pwiz、モデルジェネレーター
pwiz
は、peeweeに付属している小さなスクリプトで、既存のデータベースをイントロスペクトし、基盤となるデータと対話するのに適したモデルコードを生成できます。すでにデータベースがある場合、pwizは正しい列の関連性と外部キーを持つスケルトンコードを生成することで、優れたブーストを提供できます。
setup.py install
を使用してpeeweeをインストールすると、pwizは「スクリプト」としてインストールされ、次のように実行できます
python -m pwiz -e postgresql -u postgres my_postgres_db
これにより、標準出力に多数のモデルが出力されます。したがって、次のように実行できます
python -m pwiz -e postgresql my_postgres_db > mymodels.py
python # <-- fire up an interactive shell
>>> from mymodels import Blog, Entry, Tag, Whatever
>>> print([blog.name for blog in Blog.select()])
コマンドラインオプション
pwizは次のコマンドラインオプションを受け付けます
オプション |
意味 |
例 |
---|---|---|
-h |
ヘルプを表示 |
|
-e |
データベースバックエンド |
-e mysql |
-H |
接続するホスト |
-H remote.db.server |
-p |
接続するポート |
-p 9001 |
-u |
データベースユーザー |
-u postgres |
-P |
データベースパスワード |
-P (パスワードの入力を求められます) |
-s |
スキーマ |
-s public |
-t |
生成するテーブル |
-t tweet,users,relationships |
-v |
VIEWのモデルを生成 |
(引数なし) |
-i |
生成されたファイルに情報メタデータを追加 |
(引数なし) |
-o |
テーブル列の順序が保持されます |
(引数なし) |
engine
(-e
) の有効なパラメータは次のとおりです。
sqlite
mysql
postgresql
警告
データベースにアクセスするためにパスワードが必要な場合は、安全なプロンプトを使用してパスワードを入力するように求められます。
パスワードは出力に含まれます。具体的には、ファイルの先頭に、パスワードを含む必要なパラメータとともにDatabase
が定義されます。
pwizの例
さまざまなデータベースをイントロスペクトする例
# Introspect a Sqlite database.
python -m pwiz -e sqlite path/to/sqlite_database.db
# Introspect a MySQL database, logging in as root. You will be prompted
# for a password ("-P").
python -m pwiz -e mysql -u root -P mysql_db_name
# Introspect a Postgresql database on a remote server.
python -m pwiz -e postgres -u postgres -H 10.1.0.3 pg_db_name
完全な例
$ sqlite3 example.db << EOM
CREATE TABLE "user" ("id" INTEGER NOT NULL PRIMARY KEY, "username" TEXT NOT NULL);
CREATE TABLE "tweet" (
"id" INTEGER NOT NULL PRIMARY KEY,
"content" TEXT NOT NULL,
"timestamp" DATETIME NOT NULL,
"user_id" INTEGER NOT NULL,
FOREIGN KEY ("user_id") REFERENCES "user" ("id"));
CREATE UNIQUE INDEX "user_username" ON "user" ("username");
EOM
$ python -m pwiz -e sqlite example.db
次の出力を生成します
from peewee import *
database = SqliteDatabase('example.db', **{})
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta:
database = database
class User(BaseModel):
username = TextField(unique=True)
class Meta:
table_name = 'user'
class Tweet(BaseModel):
content = TextField()
timestamp = DateTimeField()
user = ForeignKeyField(column_name='user_id', field='id', model=User)
class Meta:
table_name = 'tweet'
観察
外部キー
Tweet.user_id
が検出され、正しくマッピングされています。User.username
UNIQUE制約が検出されています。各モデルは、必須ではない場合でも(Peeweeはクラス名を適切なテーブル名に自動的に変換するため)、テーブル名を明示的に宣言します。
ForeignKeyField
のすべてのパラメータは、Peeweeがデフォルトで使用する規則に従っている場合でも、明示的に宣言されています。
注意
UnknownField
は、スキーマにPeeweeがフィールドクラスにマッピングする方法を知らない列の宣言が含まれている場合に使用されるプレースホルダーです。
スキーマ移行
Peeweeは、Postgresql、SQLite、MySQLに対して十分にテストされたサポートを備えたスキーマ移行をサポートするようになりました。他のスキーマ移行ツールとは異なり、peeweeの移行はイントロスペクションとデータベースの「バージョン管理」を処理しません。代わりに、peeweeはスキーマ変更ステートメントを生成および実行するための多数のヘルパー関数を提供します。このエンジンは、いつかより高度なツールを構築できる基礎を提供します。
移行は単純なpythonスクリプトとして記述し、コマンドラインから実行できます。移行はアプリケーションのDatabase
オブジェクトにのみ依存するため、依存関係を導入せずに、モデル定義の変更を管理し、移行スクリプトのセットを維持することが簡単になります。
使用例
migrateモジュールからヘルパーをインポートすることから始めます
from playhouse.migrate import *
migrator
をインスタンス化します。SchemaMigrator
クラスは、スキーマ変更操作を生成する役割を担い、これはmigrate()
ヘルパーによって順番に実行できます。
# Postgres example:
my_db = PostgresqlDatabase(...)
migrator = PostgresqlMigrator(my_db)
# SQLite example:
my_db = SqliteDatabase('my_database.db')
migrator = SqliteMigrator(my_db)
migrate()
を使用して、1つ以上の操作を実行します。
title_field = CharField(default='')
status_field = IntegerField(null=True)
migrate(
migrator.add_column('some_table', 'title', title_field),
migrator.add_column('some_table', 'status', status_field),
migrator.drop_column('some_table', 'old_column'),
)
警告
マイグレーションはトランザクション内では実行されません。トランザクション内でマイグレーションを実行したい場合は、migrate の呼び出しを atomic()
コンテキストマネージャーでラップする必要があります。例:
with my_db.atomic():
migrate(...)
サポートされている操作
既存のモデルに新しいフィールドを追加する
# Create your field instances. For non-null fields you must specify a
# default value.
pubdate_field = DateTimeField(null=True)
comment_field = TextField(default='')
# Run the migration, specifying the database table, field name and field.
migrate(
migrator.add_column('comment_tbl', 'pub_date', pubdate_field),
migrator.add_column('comment_tbl', 'comment', comment_field),
)
注意
Peeweeは、Djangoの規約に従い、デフォルトで ForeignKeyField
のカラム名に _id
を付加します。外部キーを追加する際には、適切なカラム名を指定する必要があります。たとえば、Tweet
モデルに user
外部キーを追加する場合:
# Our desired model will look like this:
class Tweet(BaseModel):
user = ForeignKeyField(User) # I want to add this field.
# ... other fields ...
# Migration code:
user = ForeignKeyField(User, field=User.id, null=True)
migrate(
# Note that the column name given is "user_id".
migrator.add_column(Tweet._meta.table_name, 'user_id', user),
)
フィールドの名前を変更する
# Specify the table, original name of the column, and its new name.
migrate(
migrator.rename_column('story', 'pub_date', 'publish_date'),
migrator.rename_column('story', 'mod_date', 'modified_date'),
)
フィールドを削除する
migrate(
migrator.drop_column('story', 'some_old_field'),
)
フィールドをnullableにするか、nullableでないようにする
# Note that when making a field not null that field must not have any
# NULL values present.
migrate(
# Make `pub_date` allow NULL values.
migrator.drop_not_null('story', 'pub_date'),
# Prevent `modified_date` from containing NULL values.
migrator.add_not_null('story', 'modified_date'),
)
フィールドのデータ型を変更する
# Change a VARCHAR(50) field to a TEXT field.
migrate(
migrator.alter_column_type('person', 'email', TextField())
)
テーブルの名前を変更する
migrate(
migrator.rename_table('story', 'stories_tbl'),
)
インデックスを追加する
# Specify the table, column names, and whether the index should be
# UNIQUE or not.
migrate(
# Create an index on the `pub_date` column.
migrator.add_index('story', ('pub_date',), False),
# Create a multi-column index on the `pub_date` and `status` fields.
migrator.add_index('story', ('pub_date', 'status'), False),
# Create a unique index on the category and title fields.
migrator.add_index('story', ('category_id', 'title'), True),
)
インデックスを削除する
# Specify the index name.
migrate(migrator.drop_index('story', 'story_pub_date_status'))
テーブル制約を追加または削除する
# Add a CHECK() constraint to enforce the price cannot be negative.
migrate(migrator.add_constraint(
'products',
'price_check',
Check('price >= 0')))
# Remove the price check constraint.
migrate(migrator.drop_constraint('products', 'price_check'))
# Add a UNIQUE constraint on the first and last names.
migrate(migrator.add_unique('person', 'first_name', 'last_name'))
列のデータベースレベルのデフォルト値を追加または削除する
# Add a default value for a status column.
migrate(migrator.add_column_default(
'entries',
'status',
'draft'))
# Remove the default.
migrate(migrator.drop_column_default('entries', 'status'))
# Use a function for the default value (does not work with Sqlite):
migrate(migrator.add_column_default(
'entries',
'timestamp',
fn.now()))
# Or alternatively (works with Sqlite):
migrate(migrator.add_column_default(
'entries',
'timestamp',
'now()'))
注意
Postgresユーザーは、非標準スキーマを使用する場合、検索パスを設定する必要がある場合があります。これは次のように行うことができます。
new_field = TextField(default='', null=False)
migrator = PostgresqlMigrator(db)
migrate(migrator.set_search_path('my_schema_name'),
migrator.add_column('table', 'field_name', new_field))
マイグレーションAPI
- migrate(*operations)
1つ以上のスキーマ変更操作を実行します。
使い方
migrate( migrator.add_column('some_table', 'new_column', CharField(default='')), migrator.create_index('some_table', ('new_column',)), )
- class SchemaMigrator(database)
- パラメーター
database –
Database
インスタンス。
SchemaMigrator
は、スキーマ変更ステートメントの生成を担当します。- add_column(table, column_name, field)
-
指定されたテーブルに新しい列を追加します。提供された
field
は、適切な列定義を生成するために使用されます。注意
フィールドがnullableでない場合は、デフォルト値を指定する必要があります。
注意
非nullフィールドの場合、フィールドは最初にnullフィールドとして追加され、次に
UPDATE
ステートメントが実行されて列にデフォルト値が設定されます。最後に、列は非nullとしてマークされます。
- drop_column(table, column_name[, cascade=True])
- パラメーター
table (str) – 列を削除するテーブルの名前。
column_name (str) – 削除する列の名前。
cascade (bool) – 列を CASCADE 付きで削除するかどうか。
- rename_column(table, old_name, new_name)
- パラメーター
table (str) – 名前を変更する列を含むテーブルの名前。
old_name (str) – 列の現在の名前。
new_name (str) – 列の新しい名前。
- add_not_null(table, column)
- パラメーター
table (str) – 列を含むテーブルの名前。
column (str) – nullableでないようにする列の名前。
- drop_not_null(table, column)
- パラメーター
table (str) – 列を含むテーブルの名前。
column (str) – nullableにする列の名前。
- add_column_default(table, column, default)
- パラメーター
table (str) – 列を含むテーブルの名前。
column (str) – デフォルトを追加する列の名前。
default – 列の新しいデフォルト値。以下の注記を参照してください。
Peeweeは、デフォルト値が文字列リテラルであると思われる場合、適切に引用符で囲みます。それ以外の場合、デフォルト値は文字通りに扱われます。PostgresとMySQLは、デフォルトをpeewee式(例:
fn.NOW()
)として指定することをサポートしていますが、Sqliteユーザーは代わりにdefault='now()'
を使用する必要があります。
- drop_column_default(table, column)
- パラメーター
table (str) – 列を含むテーブルの名前。
column (str) – デフォルトを削除する列の名前。
- alter_column_type(table, column, field[, cast=None])
- パラメーター
列のデータ型を変更します。互換性のない型を使用するとデータベースで適切にサポートされない可能性があるため、このメソッドは慎重に使用する必要があります。
- rename_table(old_name, new_name)
- パラメーター
old_name (str) – テーブルの現在の名前。
new_name (str) – テーブルの新しい名前。
- add_index(table, columns[, unique=False[, using=None]])
- パラメーター
table (str) – インデックスを作成するテーブルの名前。
columns (list) – インデックスを作成する必要がある列のリスト。
unique (bool) – 新しいインデックスに一意制約を指定するかどうか。
using (str) – インデックスの種類(サポートされている場合)。例:GiSTまたはGIN。
- drop_index(table, index_name)
- パラメーター
table (str) – 削除するインデックスを含むテーブルの名前。
index_name (str) – 削除するインデックスの名前。
- add_constraint(table, name, constraint)
- drop_constraint(table, name)
- パラメーター
table (str) – 制約を削除するテーブル。
name (str) – 削除する制約の名前。
- add_unique(table, *column_names)
- パラメーター
table (str) – 制約を追加するテーブル。
column_names (str) – UNIQUE 制約のための1つ以上のカラム。
- class PostgresqlMigrator(database)
Postgresqlデータベースのマイグレーションを生成します。
- set_search_path(schema_name)
- パラメーター
schema_name (str) – 使用するスキーマ。
後続の操作のための検索パス(スキーマ)を設定します。
- class SqliteMigrator(database)
SQLiteデータベースのマイグレーションを生成します。
SQLiteは
ALTER TABLE
クエリのサポートが限られているため、次の操作は現在SQLiteではサポートされていません。add_constraint
drop_constraint
add_unique
- class MySQLMigrator(database)
MySQLデータベースのマイグレーションを生成します。
リフレクション
リフレクションモジュールには、既存のデータベースをイントロスペクトするためのヘルパーが含まれています。このモジュールは、DataSet や pwiz, モデルジェネレータ など、playhouseの他のいくつかのモジュールで内部的に使用されます。
- generate_models(database[, schema=None[, **options]])
- パラメーター
database (Database) – イントロスペクトするデータベースインスタンス。
schema (str) – イントロスペクトするオプションのスキーマ。
options – 任意のオプション、詳細は
Introspector.generate_models()
を参照してください。
- 戻り値
テーブル名をモデルクラスにマッピングする
dict
。
指定されたデータベース内のテーブルのモデルを生成します。この関数の使用方法の例については、「Peeweeをインタラクティブに使用する」のセクションを参照してください。
例
>>> from peewee import * >>> from playhouse.reflection import generate_models >>> db = PostgresqlDatabase('my_app') >>> models = generate_models(db) >>> list(models.keys()) ['account', 'customer', 'order', 'orderitem', 'product'] >>> globals().update(models) # Inject models into namespace. >>> for cust in customer.select(): # Query using generated model. ... print(cust.name) ... Huey Kitty Mickey Dog
- print_model(model)
- パラメーター
model (Model) – 表示するモデルクラス
- 戻り値
戻り値はありません
モデルクラスのユーザーフレンドリーな説明を表示します。デバッグやインタラクティブな使用に役立ちます。現在、テーブル名、およびすべてのフィールドとそれらのデータ型を表示します。「Peeweeをインタラクティブに使用する」のセクションに例があります。
出力例
>>> from playhouse.reflection import print_model >>> print_model(User) user id AUTO PK email TEXT name TEXT dob DATE index(es) email UNIQUE >>> print_model(Tweet) tweet id AUTO PK user INT FK: User.id title TEXT content TEXT timestamp DATETIME is_published BOOL index(es) user_id is_published, timestamp
- print_table_sql(model)
- パラメーター
model (Model) – 表示するモデル
- 戻り値
戻り値はありません
指定されたモデルクラスの SQL
CREATE TABLE
を出力します。これは、デバッグやインタラクティブな使用に役立つ場合があります。使用例については、「Peeweeをインタラクティブに使用する」のセクションを参照してください。インデックスと制約は、この関数の出力には含まれていないことに注意してください。出力例
>>> from playhouse.reflection import print_table_sql >>> print_table_sql(User) CREATE TABLE IF NOT EXISTS "user" ( "id" INTEGER NOT NULL PRIMARY KEY, "email" TEXT NOT NULL, "name" TEXT NOT NULL, "dob" DATE NOT NULL ) >>> print_table_sql(Tweet) CREATE TABLE IF NOT EXISTS "tweet" ( "id" INTEGER NOT NULL PRIMARY KEY, "user_id" INTEGER NOT NULL, "title" TEXT NOT NULL, "content" TEXT NOT NULL, "timestamp" DATETIME NOT NULL, "is_published" INTEGER NOT NULL, FOREIGN KEY ("user_id") REFERENCES "user" ("id") )
- class Introspector(metadata[, schema=None])
メタデータは、
Introspector
をインスタンス化することにより、データベースから抽出できます。このクラスを直接インスタンス化するのではなく、ファクトリメソッドfrom_database()
を使用することをお勧めします。- classmethod from_database(database[, schema=None])
- パラメーター
database –
Database
インスタンス。schema (str) – オプションのスキーマ(一部のデータベースでサポートされています)。
指定されたデータベースで使用するのに適した
Introspector
インスタンスを作成します。使い方
db = SqliteDatabase('my_app.db') introspector = Introspector.from_database(db) models = introspector.generate_models() # User and Tweet (assumed to exist in the database) are # peewee Model classes generated from the database schema. User = models['user'] Tweet = models['tweet']
- generate_models([skip_invalid=False[, table_names=None[, literal_column_names=False[, bare_fields=False[, include_views=False]]]]])
- パラメーター
skip_invalid (bool) – 名前が有効なpython識別子ではないテーブルをスキップします。
table_names (list) – 生成するテーブル名のリスト。指定しない場合は、すべてのテーブルのモデルが生成されます。
literal_column_names (bool) – カラム名をそのまま使用します。デフォルトでは、カラム名は「python化」されます。つまり、混合ケースは小文字になります。
bare_fields – SQLiteのみ。イントロスペクトされたカラムのデータ型を指定しないでください。
include_views – VIEWのモデルも生成します。
- 戻り値
テーブル名をモデルクラスにマッピングする辞書。
データベースをイントロスペクトし、テーブル、カラム、および外部キー制約を読み込んでから、各データベーステーブルを動的に生成された
Model
クラスにマッピングする辞書を生成します。
データベースURL
このモジュールには、URL接続文字列からデータベース接続を生成するためのヘルパー関数が含まれています。
- connect(url, **connect_params)
指定された接続URLから
Database
インスタンスを作成します。例
sqlite:///my_database.db は、現在のディレクトリ内のファイル
my_database.db
のSqliteDatabase
インスタンスを作成します。sqlite:///:memory: は、インメモリの
SqliteDatabase
インスタンスを作成します。postgresql://postgres:my_password@localhost:5432/my_database は、
PostgresqlDatabase
インスタンスを作成します。ユーザー名とパスワード、および接続先のホストとポートが提供されます。mysql://user:passwd@ip:port/my_db は、ローカルMySQLデータベースmy_dbの
MySQLDatabase
インスタンスを作成します。mysql+pool://user:passwd@ip:port/my_db?max_connections=20&stale_timeout=300 は、max_connectionsが20に設定され、stale_timeout設定が300秒のローカルMySQLデータベースmy_dbの
PooledMySQLDatabase
インスタンスを作成します。
サポートされているスキーム
apsw
:APSWDatabase
mysql
:MySQLDatabase
mysql+pool
:PooledMySQLDatabase
postgres
:PostgresqlDatabase
postgres+pool
:PooledPostgresqlDatabase
postgresext
:PostgresqlExtDatabase
postgresext+pool
:PooledPostgresqlExtDatabase
sqlite
:SqliteDatabase
sqliteext
:SqliteExtDatabase
sqlite+pool
:PooledSqliteDatabase
sqliteext+pool
:PooledSqliteExtDatabase
使い方
import os from playhouse.db_url import connect # Connect to the database URL defined in the environment, falling # back to a local Sqlite database if no database URL is specified. db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')
- parse(url)
指定されたURL内の情報を、
database
、host
、port
、user
、および/またはpassword
を含む辞書に解析します。追加の接続引数は、URLクエリ文字列で渡すことができます。カスタムデータベースクラスを使用している場合は、
parse()
関数を使用してURLから情報を抽出し、それをデータベースオブジェクトに渡すことができます。
- register_database(db_class, *names)
- パラメーター
db_class –
Database
のサブクラス。names – URLスキームとして使用する名前のリスト(例: 'sqlite' または 'firebird')。
指定された名前で追加のデータベースクラスを登録します。この関数を使用すると、
connect()
関数を拡張して、追加のスキームをサポートできます。たとえば、Firebird
用のカスタムデータベースクラスがあり、それがFirebirdDatabase
という名前であるとします。from playhouse.db_url import connect, register_database register_database(FirebirdDatabase, 'firebird') db = connect('firebird://my-firebird-db')
接続プール
pool
モジュールには、PostgreSQL、MySQL、およびSQLiteデータベースの接続プーリングを提供するDatabase
クラスが多数含まれています。プールは、バックエンドへの接続を開閉するDatabase
クラスのメソッドをオーバーライドすることによって機能します。プールでは、接続がリサイクルされるまでのタイムアウトと、オープン接続数の上限を指定できます。
マルチスレッドアプリケーションでは、最大でmax_connectionsの接続が開かれます。各スレッド(または gevent を使用している場合はグリーンレット)は、独自の接続を持ちます。
シングルスレッドアプリケーションでは、1つの接続のみが作成されます。これは、ステールタイムアウトを超えるか、明示的に閉じられる(.manual_close()を使用)まで継続的にリサイクルされます。
デフォルトでは、アプリケーションに必要なことは、使用が終わったときに接続が閉じられることを確認するだけであり、それらはプールに戻されます。Webアプリケーションの場合、これは通常、リクエストの開始時に接続を開き、レスポンスを返すときに接続を閉じることを意味します。
シンプルなPostgresプールコードの例
# Use the special postgresql extensions.
from playhouse.pool import PooledPostgresqlExtDatabase
db = PooledPostgresqlExtDatabase(
'my_app',
max_connections=32,
stale_timeout=300, # 5 minutes.
user='postgres')
class BaseModel(Model):
class Meta:
database = db
以上です!接続のプールをより詳細に制御したい場合は、接続管理セクションを確認してください。
プールAPI
- class PooledDatabase(database[, max_connections=20[, stale_timeout=None[, timeout=None[, **kwargs]]]])
- パラメーター
database (str) – データベース名またはデータベースファイル名。
max_connections (int) – 最大接続数。無制限の場合は
None
を指定します。stale_timeout (int) – 接続の使用を許可する秒数。
timeout (int) – プールが満杯になったときにブロックする秒数。デフォルトでは、peeweeはプールが満杯になってもブロックせず、単に例外をスローします。無期限にブロックするには、この値を
0
に設定します。kwargs – データベースクラスに渡される任意のキーワード引数。
Database
のサブクラスで使用することを目的としたミックスインクラス。注意
接続は、stale_timeoutを超えたときに正確に閉じられるわけではありません。代わりに、ステール接続は、新しい接続が要求されたときにのみ閉じられます。
注意
オープン接続数がmax_connectionsを超えると、ValueErrorが発生します。
- manual_close()
プールに戻さずに、現在開いている接続を閉じます。
- close_idle()
すべてのアイドル接続を閉じます。これには、現在使用中の接続は含まれません。以前に作成されてプールに戻された接続のみが含まれます。
- close_stale([age=600])
- パラメーター
age (int) – 接続がステールと見なされるまでの時間。
- 戻り値
閉じられた接続の数。
使用中だが、指定された期間を超えている接続を閉じます。このメソッドを呼び出すときは注意してください!
- close_all()
すべての接続を閉じます。これには、その時点で使用中の可能性のある接続も含まれます。このメソッドを呼び出すときは注意してください!
- class PooledPostgresqlDatabase
PostgresqlDatabase
のサブクラスで、PooledDatabase
ヘルパーをミックスインします。
- class PooledPostgresqlExtDatabase
PostgresqlExtDatabase
のサブクラスで、PooledDatabase
ヘルパーをミックスインします。PostgresqlExtDatabase
は、Postgresql拡張機能モジュールの一部であり、多くのPostgres固有の機能のサポートを提供します。
- class PooledMySQLDatabase
MySQLDatabase
のサブクラスで、PooledDatabase
ヘルパーをミックスインします。
- class PooledSqliteDatabase
SQLiteアプリの永続的な接続。
- class PooledSqliteExtDatabase
SQLite拡張機能アドバンスドデータベースドライバー
SqliteExtDatabase
を使用する、SQLiteアプリの永続的な接続。
テストユーティリティ
peeweeプロジェクトのテストに役立つユーティリティが含まれています。
- class count_queries([only_select=False])
コンテキスト内で実行されたクエリの数をカウントするコンテキストマネージャー。
- パラメーター
only_select (bool) – SELECTクエリのみをカウントします。
with count_queries() as counter: huey = User.get(User.username == 'huey') huey_tweets = [tweet.message for tweet in huey.tweets] assert counter.count == 2
- count
実行されたクエリの数。
- get_queries()
SQLクエリとパラメータのリストからなる2タプルのリストを返します。
- assert_query_count(expected[, only_select=False])
デコレートされた関数内で実行されたクエリの数が期待される数と等しくない場合、
AssertionError
を発生させる関数またはメソッドデコレーター。class TestMyApp(unittest.TestCase): @assert_query_count(1) def test_get_popular_blogs(self): popular_blogs = Blog.get_popular() self.assertEqual( [blog.title for blog in popular_blogs], ["Peewee's Playhouse!", "All About Huey", "Mickey's Adventures"])
この関数はコンテキストマネージャーとしても使用できます。
class TestMyApp(unittest.TestCase): def test_expensive_operation(self): with assert_query_count(1): perform_expensive_operation()
Flaskユーティリティ
playhouse.flask_utils
モジュールには、peeweeをFlask Webフレームワークと統合するためのいくつかのヘルパーが含まれています。
データベースラッパー
FlaskDB
クラスは、Flaskアプリケーション内からPeeweeデータベースを構成および参照するためのラッパーです。その名前に惑わされないでください。これはpeeweeデータベースと同じものではありません。FlaskDB
は、Flaskアプリから次のボイラープレートを削除するように設計されています。
アプリの設定データに基づいてPeeweeデータベースインスタンスを動的に作成します。
アプリケーションのすべてのモデルが継承する基本クラスを作成します。
データベース接続を開閉するためのリクエストの開始時と終了時にフックを登録します。
基本的な使い方
import datetime
from flask import Flask
from peewee import *
from playhouse.flask_utils import FlaskDB
DATABASE = 'postgresql://postgres:password@localhost:5432/my_database'
# If we want to exclude particular views from the automatic connection
# management, we list them this way:
FLASKDB_EXCLUDED_ROUTES = ('logout',)
app = Flask(__name__)
app.config.from_object(__name__)
db_wrapper = FlaskDB(app)
class User(db_wrapper.Model):
username = CharField(unique=True)
class Tweet(db_wrapper.Model):
user = ForeignKeyField(User, backref='tweets')
content = TextField()
timestamp = DateTimeField(default=datetime.datetime.now)
上記のコード例では、指定されたデータベースURLによって指定されたpeewee PostgresqlDatabase
が作成およびインスタンス化されます。リクエストが受信されると接続を確立し、レスポンスが送信されると自動的に接続を閉じるようにリクエストフックが構成されます。最後に、FlaskDB
クラスは、アプリケーションのモデルの基本として使用できるFlaskDB.Model
プロパティを公開します。
FlaskDB
ラッパーによって構成された、ラップされたPeeweeデータベースインスタンスにアクセスする方法を次に示します。
# Obtain a reference to the Peewee database instance.
peewee_db = db_wrapper.database
@app.route('/transfer-funds/', methods=['POST'])
def transfer_funds():
with peewee_db.atomic():
# ...
return jsonify({'transfer-id': xid})
注意
実際のpeeweeデータベースには、FlaskDB.database
属性を使用してアクセスできます。
FlaskDB
を使用してPeeweeデータベースを構成する別の方法を次に示します。
app = Flask(__name__)
db_wrapper = FlaskDB(app, 'sqlite:///my_app.db')
上記の例ではデータベースURLの使用を示していますが、より高度な使用法では、構成オプションの辞書を指定したり、単にpeewee Database
インスタンスを渡したりできます。
DATABASE = {
'name': 'my_app_db',
'engine': 'playhouse.pool.PooledPostgresqlDatabase',
'user': 'postgres',
'max_connections': 32,
'stale_timeout': 600,
}
app = Flask(__name__)
app.config.from_object(__name__)
wrapper = FlaskDB(app)
pooled_postgres_db = wrapper.database
peewee Database
オブジェクトの使用
peewee_db = PostgresqlExtDatabase('my_app')
app = Flask(__name__)
db_wrapper = FlaskDB(app, peewee_db)
アプリケーションファクトリを使用したデータベース
アプリケーションファクトリパターンを使用する場合は、FlaskDB
クラスはinit_app()
メソッドを実装します。
ファクトリとして使用
db_wrapper = FlaskDB()
# Even though the database is not yet initialized, you can still use the
# `Model` property to create model classes.
class User(db_wrapper.Model):
username = CharField(unique=True)
def create_app():
app = Flask(__name__)
app.config['DATABASE'] = 'sqlite:////home/code/apps/my-database.db'
db_wrapper.init_app(app)
return app
クエリユーティリティ
flask_utils
モジュールは、Webアプリでクエリを管理するためのいくつかのヘルパーを提供します。一般的なパターンには次のようなものがあります。
- get_object_or_404(query_or_model, *query)
- パラメーター
query_or_model –
Model
クラス、または事前にフィルタリングされたSelectQuery
のいずれか。query – 任意の複雑なpeewee式。
指定されたクエリに一致するオブジェクトを取得するか、404 not foundレスポンスを返します。一般的なユースケースとしては、Weblogの詳細ページなどがあります。指定されたURLに一致する投稿を取得するか、404を返す必要があります。
例
@app.route('/blog/<slug>/') def post_detail(slug): public_posts = Post.select().where(Post.published == True) post = get_object_or_404(public_posts, (Post.slug == slug)) return render_template('post_detail.html', post=post)
- object_list(template_name, query[, context_variable='object_list'[, paginate_by=20[, page_var='page'[, check_bounds=True[, **kwargs]]]]])
- パラメーター
template_name – レンダリングするテンプレートの名前。
query – ページネーションする
SelectQuery
インスタンス。context_variable – ページ分割されたオブジェクトリストに使用するコンテキスト変数名。
paginate_by – 1ページあたりのオブジェクト数。
page_var – ページを含む
GET
引数の名前。check_bounds – 指定されたページが有効なページであることを確認するかどうか。
check_bounds
がTrue
で、無効なページが指定された場合、404が返されます。kwargs – テンプレートコンテキストに渡す任意のキー/値ペア。
指定されたクエリで指定されたオブジェクトのページ分割されたリストを取得します。ページ分割されたオブジェクトリストは、指定された
context_variable
、現在のページとページ総数に関するメタデータ、およびキーワード引数として渡された任意のコンテキストデータを使用してコンテキストにドロップされます。ページは、
page
GET
引数を使用して指定されます。たとえば、/my-object-list/?page=3
は3番目のページを返します。例
@app.route('/blog/') def post_index(): public_posts = (Post .select() .where(Post.published == True) .order_by(Post.timestamp.desc())) return object_list( 'post_index.html', query=public_posts, context_variable='post_list', paginate_by=10)
テンプレートには次のコンテキストがあります
post_list
。最大10件の投稿を含むリスト。page
。page
GET
パラメーターの値に基づいて現在のページが含まれます。pagination
。PaginatedQuery
インスタンス。
- class PaginatedQuery(query_or_model, paginate_by[, page_var='page'[, check_bounds=False]])
- パラメーター
query_or_model –
Model
または、ページ分割するレコードのコレクションを含むSelectQuery
インスタンスのいずれか。paginate_by – 1ページあたりのオブジェクト数。
page_var – ページを含む
GET
引数の名前。check_bounds – 指定されたページが有効なページであることを確認するかどうか。
check_bounds
がTrue
で、無効なページが指定された場合、404が返されます。
GET
引数に基づいてページネーションを実行するヘルパークラス。- get_page()
page_var
GET
パラメーターの値で示される、現在選択されているページを返します。ページが明示的に選択されていない場合、このメソッドは最初のページを示す1を返します。
- get_page_count()
可能なページの総数を返します。
- get_object_list()
get_page()
の値を使用して、ユーザーがリクエストしたオブジェクトのページを返します。戻り値は、適切なLIMIT
句とOFFSET
句を持つSelectQuery
です。check_bounds
がTrue
に設定されていて、リクエストされたページにオブジェクトが含まれていない場合、404が発生します。