使用 Couchbase 的 NoSQL(分散式 / 大數據)資料庫 (已棄用)¶
「已棄用」
本教學已棄用,將在未來版本中移除。
FastAPI 也可以與任何 NoSQL 整合。
這裡我們將看到一個使用 Couchbase 的範例,它是一個以 文件 為基礎的 NoSQL 資料庫。
您可以將其調整以適用於任何其他 NoSQL 資料庫,例如
- MongoDB
- Cassandra
- CouchDB
- ArangoDB
- ElasticSearch 等等。
提示
有一個官方專案產生器,使用 FastAPI 和 Couchbase,所有都基於 Docker,包含前端和其他工具:https://github.com/tiangolo/full-stack-fastapi-couchbase
匯入 Couchbase 元件¶
目前,請先忽略其他部分,只注意匯入的部分
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
定義一個常數以用作「文件類型」¶
我們稍後將在文件中將其用作固定欄位 type
。
Couchbase 並不要求這樣做,但這是一個良好的做法,之後會對您有所幫助。
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
新增一個函式以取得 Bucket
¶
在 Couchbase 中,bucket 是一組文件,可以是不同類型的。
它們通常都與同一個應用程式相關。
在關聯式資料庫世界中的類比是一個「資料庫」(一個特定的資料庫,而不是資料庫伺服器)。
在 MongoDB 中的類比是一個「集合」。
在程式碼中,Bucket
代表與資料庫通訊的主要入口點。
這個工具函式將
- 連接到一個 Couchbase 叢集(可能是一台單機)。
- 設定逾時的預設值。
- 在叢集中進行驗證。
- 取得一個
Bucket
實例。- 設定逾時的預設值。
- 回傳它。
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
建立 Pydantic 模型¶
由於 Couchbase 的「文件」實際上只是「JSON 物件」,我們可以用 Pydantic 對它們進行建模。
User
模型¶
首先,讓我們建立一個 User
模型
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
我們將在我們的*路徑操作函式*中使用這個模型,因此,我們不在其中包含 hashed_password
。
UserInDB
模型¶
現在,讓我們建立一個 UserInDB
模型。
這將包含實際儲存在資料庫中的資料。
我們並非將它創建為 Pydantic 的 BaseModel
的子類別,而是創建為我們自己的 User
的子類別,因為它將擁有 User
中的所有屬性,再加上一些額外的屬性。
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
備註
請注意,我們有一個 hashed_password
和一個 type
欄位,它們將儲存在資料庫中。
但它不是一般 User
模型(我們將在*路徑操作*中返回的模型)的一部分。
取得使用者¶
現在創建一個函式,它將
- 取得使用者名稱。
- 從使用者名稱產生文件 ID。
- 取得具有該 ID 的文件。
- 將文件內容放入
UserInDB
模型中。
藉由創建一個專用於從 username
(或任何其他參數)取得使用者的函式,使其獨立於您的*路徑操作函式*,您可以更輕鬆地在多個部分重複使用它,並且還可以為它新增單元測試。
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
f 字串¶
如果您不熟悉 f"userprofile::{username}"
,它是 Python 的「f 字串」。
在 f 字串中放在 {}
內的任何變數都將在字串中展開/注入。
dict
解包¶
如果您不熟悉 UserInDB(**result.value)
,它正在使用 dict
「解包」。
它將取得 result.value
中的 dict
,並取得其每個鍵和值,並將它們作為鍵值對以關鍵字引數的形式傳遞給 UserInDB
。
因此,如果 dict
包含
{
"username": "johndoe",
"hashed_password": "some_hash",
}
它將作為以下形式傳遞給 UserInDB
UserInDB(username="johndoe", hashed_password="some_hash")
創建您的 FastAPI 程式碼¶
創建 FastAPI
應用程式¶
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
創建*路徑操作函式*¶
由於我們的程式碼正在呼叫 Couchbase,而且我們沒有使用實驗性的 Python await
支援,我們應該使用普通的 def
而不是 async def
來宣告我們的函式。
此外,Couchbase 建議不要在多個「執行緒」中使用單個 Bucket
物件,因此,我們可以直接取得 bucket 並將其傳遞給我們的工具函式。
from typing import Union
from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI
from pydantic import BaseModel
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
cluster = Cluster(
"couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
)
authenticator = PasswordAuthenticator("username", "password")
cluster.authenticate(authenticator)
bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
bucket.timeout = 30
bucket.n1ql_timeout = 300
return bucket
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
type: str = USERPROFILE_DOC_TYPE
hashed_password: str
def get_user(bucket: Bucket, username: str):
doc_id = f"userprofile::{username}"
result = bucket.get(doc_id, quiet=True)
if not result.value:
return None
user = UserInDB(**result.value)
return user
# FastAPI specific code
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
bucket = get_bucket()
user = get_user(bucket=bucket, username=username)
return user
摘要¶
您可以整合任何第三方 NoSQL 資料庫,只需使用它們的標準套件即可。
這同樣適用於任何其他外部工具、系統或 API。