TinyDB是一个小型,简单易用,面向文档的数据库;代码仅1800行,纯python编写。TinyDB项目大小刚好,学习它可以了解NOSQL数据库的实现,本文包括下面几部分:
本次阅读采用的版本号是 4.0.0
, 项目结构如下:
文件 | 描述 |
---|---|
database | database的实现 |
middlewares | 中间件实现,包括cache |
operations | 对database的一些操作方法 |
queries | 查询功能实现 |
storages | 存储功能实现 |
table | 文档;表/集合功能实现 |
utils | 工具类 |
version | 版本号 |
项目的类图:
tinydb类图
从类图可以看到,代码主要集中在database,table和query三个部分。
TinyDB的使用示例:
from tinydb import TinyDB
from tinydb import Query
from tinydb import JSONStorage
from tinydb.middlewares import CachingMiddleware
db = TinyDB('cache_db.json', storage=CachingMiddleware(JSONStorage))
db.purge_tables() # 重置数据
db.insert({'int': 1, 'char': 'a'}) # 插入数据
db.insert({'int': 2, 'char': 'b'})
table = db.table('user')
table.insert({'name': "shawn", "age": 18})
table.insert({'name': "shelton", "age": 28})
print(table.all()) # [{'name': 'shawn', 'age': 18}, {'name': 'shelton', 'age': 28}]
User = Query()
table.update({'name': 'shawn', 'age': 19}, User.name == 'shawn') # 修改数据
print(table.search(User.name == 'shawn')) # [{'name': 'shawn', 'age': 19}]
table.remove(User.name == 'shawn') # 删除数据
db.close()
上面的示例演示了数据库的CRUD等基础功能,可见tinydb的api非常简洁直观:
从下面的数据文件cache_db.json
,可以看到每个文档都有一个int型id,标识doc的唯一性。
{
"_default": {
"9": {
"int": 1,
"char": "a"
},
"10": {
"int": 2,
"char": "b"
}
},
"user": {
"2": {
"name": "shelton",
"age": 28
}
}
}
storages 包括下面3个类:
先看简单的 MemoryStorage 实现:
class MemoryStorage(Storage):
def __init__(self):
super().__init__()
self.memory = None
def read(self) -> Optional[Dict[str, Dict[str, Any]]]:
return self.memory
def write(self, data: Dict[str, Dict[str, Any]]):
self.memory = data
storage的实现就是每次更换数据全量,这里的data是整个database的数据。
再看默认的 JSONStorage 实现:
class JSONStorage(Storage):
"""
Store the data in a JSON file.
"""
def __init__(self, path: str, create_dirs=False, encoding=None, **kwargs):
super().__init__()
touch(path, create_dirs=create_dirs) # 创建文件及目录
self.kwargs = kwargs
self._handle = open(path, 'r+', encoding=encoding) # 文件读写
def close(self) -> None:
self._handle.close() # 文件存储,需要合法的关闭
def read(self) -> Optional[Dict[str, Dict[str, Any]]]:
self._handle.seek(0, os.SEEK_END)
size = self._handle.tell() # 判断文档内容大小
if not size:
return None
else:
self._handle.seek(0)
return json.load(self._handle) # 加载数据
def write(self, data: Dict[str, Dict[str, Any]]):
self._handle.seek(0)
serialized = json.dumps(data, **self.kwargs) # json序列化
self._handle.write(serialized)
self._handle.flush()
os.fsync(self._handle.fileno()) # 强制写入磁盘,保存数据
self._handle.truncate() # 截断原始数据
JSONStorage 主要就是文件的操作,然后进行json数据序列化和反序列化。官方的参考文档中还有扩展的 YAMLStorage ,大家可以通过参考链接自行去查看。
document是普通字典+doc_id属性,非常简单
class Document(dict):
def __init__(self, value: Mapping, doc_id: int):
super().__init__(value)
self.doc_id = doc_id
Table的实现代码较多,先看构造方法:
class Table:
document_class = Document # 存储数据类,可以被扩展
document_id_class = int # 数据主键,默认int,可以被扩展
query_cache_class = LRUCache # 查询缓存
default_query_cache_capacity = 10 # 查询缓存容量
def __init__(
self,
storage: Storage,
name: str,
cache_size: int = default_query_cache_capacity
):
self._storage = storage # 存储的引用
self._name = name # 表名
self._query_cache = self.query_cache_class(capacity=cache_size) \
# type: LRUCache[Query, List[Document]]
self._next_id = None # 主键记录
Table主要包括读和写两部分,我们先看写的代表 search 方法
def search(self, cond: Query) -> List[Document]:
if cond in self._query_cache: # 优先从查询缓存获取
docs = self._query_cache.get(cond)
if docs is not None:
return docs[:]
docs = [doc for doc in self if cond(doc)] # 使用Query判断数据是否符合条件
self._query_cache[cond] = docs[:] # 缓存下次使用
return docs
table 对象可以迭代,是因为实现了 iter
方法
def __iter__(self) -> Iterator[Document]:
"""
Iterate over all documents stored in the table.
:returns: an iterator over all documents.
"""
# Iterate all documents and their IDs
for doc_id, doc in self._read_table().items(): # 读取所有数据
# Convert documents to the document class
yield self.document_class(doc, doc_id) # 包装Document对象
重点之一在 read_table 方法实现:
def _read_table(self) -> Dict[int, Mapping]:
tables = self._storage.read() # 从storage读取数据
...
table = tables[self.name] # 获取当前表
...
return {
self.document_id_class(doc_id): doc
for doc_id, doc in table.items()
} # 生成全部数据
查询中还有一个重点在于查询条件的处理,这是由Query实现的,稍后再介绍。继续查看插入数据方法 insert 的实现:
def insert(self, document: Mapping) -> int:
...
doc_id = self._get_next_id() # 获取自增ID
def updater(table: dict):
...
table[doc_id] = dict(document) # 插入数据
self._update_table(updater) # 抽象的更新表方法
return doc_id
主键自增的 get_next_id 方法
def _get_next_id(self):
if self._next_id is not None: # 第一条记录
next_id = self._next_id
self._next_id = next_id + 1
return next_id # 快速返回
table = self._read_table() # 从存储中读取数据
if not table: # 空表
next_id = 1
self._next_id = next_id + 1
return next_id
# 查找已有数据的最大主键
max_id = max(self.document_id_class(i) for i in table.keys())
self._next_id = max_id + 1 # 主键自增
return self._next_id
更新数据最重要的 update_table :
def _update_table(self, updater: Callable[[Dict[int, Mapping]], None]):
tables = self._storage.read() # 载入已有database数据
if tables is None:
# The database is empty
tables = {}
try:
raw_table = tables[self.name] # 读取table数据
except KeyError:
# The table does not exist yet, so it is empty
raw_table = {}
# 转换document对象
table = {
self.document_id_class(doc_id): doc
for doc_id, doc in raw_table.items()
}
updater(table) # 更新数据
tables[self.name] = {
str(doc_id): doc
for doc_id, doc in table.items()
} # 封装document
self._storage.write(tables) # 写入数据
self.clear_cache() # 数据变动,清空查询缓存
可以看到更新数据模版主要步骤是:
query是可以进行布尔运算和算术运算的conditon,由QueryInstance父类和Query子类两级实现。QueryInstance定义了布尔运算的规则,Query定义了算术运算的规则。
class QueryInstance:
def __init__(self, test: Callable[[Mapping], bool], hashval: Tuple):
self._test = test # 计算函数
self._hash = hashval # hash值
def __call__(self, value: Mapping) -> bool: # 执行获取布尔值
return self._test(value)
def __hash__(self):
return hash(self._hash)
def __repr__(self):
return 'QueryImpl{}'.format(self._hash)
def __eq__(self, other: object):
if isinstance(other, QueryInstance): # 类型和hash相同
return self._hash == other._hash
return False
QueryInstance使用hash值确定对象的唯一性,布尔运算 and
, or
和 not
也都是基于对象的hash判断。frozenset 是给对象计算hash值的关键函数,在utils中提供。
def __and__(self, other: 'QueryInstance') -> 'QueryInstance':
return QueryInstance(lambda value: self(value) and other(value),
('and', frozenset([self._hash, other._hash])))
def __or__(self, other: 'QueryInstance') -> 'QueryInstance':
return QueryInstance(lambda value: self(value) or other(value),
('or', frozenset([self._hash, other._hash])))
def __invert__(self) -> 'QueryInstance':
return QueryInstance(lambda value: not self(value),
('not', self._hash))
Query子类中定义了算术运算的函数,包括:
算术运算和布尔运算不同,是基于Query对象的条件进行判断:
def _generate_test(
self,
test: Callable[[Any], bool],
hashval: Tuple,
) -> QueryInstance:
def runner(value):
try:
# Resolve the path
for part in self._path:
value = value[part] # 字典取值
except (KeyError, TypeError):
return False
else:
return test(value)
return QueryInstance(
lambda value: runner(value),
hashval
)
def __eq__(self, rhs: Any):
"""
Test a dict value for equality.
>>> Query().f1 == 42
:param rhs: The value to compare against
"""
return self._generate_test(
lambda value: value == rhs, # 判断逻辑,使用匿名函数
('==', self._path, freeze(rhs))
)
query对象的条件是这样设置的:
# Query().f1 == 42
def __getattr__(self, item: str):
# Generate a new query object with the new query path
# We use type(self) to get the class of the current query in case
# someone uses a subclass of ``Query``
query = type(self)() # 新生成query对象
# Now we add the accessed item to the query path ...
query._path = self._path + (item,)
# ... and update the query hash
query._hash = ('path', query._path)
return query
query还提供了一些api:exists, matches, search, test, any, all 和 one_of 进行集合判断。
database只是维护表的集合和存储,整体实现很简单:
class TinyDB:
table_class = Table
default_table_name = '_default'
default_storage_class = JSONStorage # 默认存储实现,可以扩展
def __init__(self, *args, **kwargs):
storage = kwargs.pop('storage', self.default_storage_class)
self._storage = storage(*args, **kwargs) # 准备存储实现
self._opened = True
self._tables = {} # 支持多表
def __getattr__(self, name):
return getattr(self.table(self.default_table_name), name)
# db.insert({'int': 1, 'char': 'a'}) # insert语法通过getattr透传到default-table
cache是数据库的重要实现,tinydb提供了2种cache。一种是table的query-cache, 比如之前的search查询:
def search(self, cond: Query) -> List[Document]:
if cond in self._query_cache:
docs = self._query_cache.get(cond)
if docs is not None:
return docs[:]
...
查询缓存使用LRU实现,LRU全称Least Recently Used:最近最少使用淘汰算法。同类的还有,LFU全称Least Frequently Used),最不经常使用淘汰算法。LFU是淘汰一段时间内,使用次数最少的数据;LRU是淘汰最长时间没有被使用的数据,更多说明请见参考链接。
class LRUCache(abc.MutableMapping, Generic[K, V]):
def __init__(self, capacity=None):
self.capacity = capacity # 缓存容量
self.cache = OrderedDict() # 有序字典
def get(self, key: K, default: D = None) -> Optional[Union[V, D]]:
value = self.cache.get(key) # 从换成获取
if value is not None:
del self.cache[key]
self.cache[key] = value # 更新缓存顺序
return value
return default
def set(self, key: K, value: V):
if self.cache.get(key):
del self.cache[key]
self.cache[key] = value # 更新缓存顺序及值
else:
self.cache[key] = value
if self.capacity is not None and self.length > self.capacity:
self.cache.popitem(last=False) # 淘汰最古老的数据
虽然LRUCache的实现比较简单,容量无法字典增长,数据每次淘汰一条比较低效,但是也体现了缓存实现的主要特点:
另外一种cache是,数据写入的cache。默认的storage中,每次有数据更新都要写入磁盘,这样效率较低,cachemiddleware中对数据进行缓存,变成N次数据变动后一次写入:
class CachingMiddleware(Middleware):
WRITE_CACHE_SIZE = 1000 # 变动上限
def __init__(self, storage_cls):
super().__init__(storage_cls)
self.cache = None
self._cache_modified_count = 0 # 变动计数
def read(self):
if self.cache is None:
self.cache = self.storage.read() # 初始化
return self.cache
def write(self, data):
self.cache = data # 缓存数据
self._cache_modified_count += 1 # 计数增长
# 判读是否需要写入磁盘
if self._cache_modified_count >= self.WRITE_CACHE_SIZE:
self.flush()
def close(self): # 安全关闭
self.flush()
self.storage.close()
我们可以简单小结一下文档型数据库的实现:
python 抽象类在 ABC
模块中提供,使用 abstractmethod 配合 NotImplementedError 异常定义:
class Storage(ABC):
@abstractmethod
def read(self) -> Optional[Dict[str, Dict[str, Any]]]:
raise NotImplementedError('To be overridden!')
@abstractmethod
def write(self, data: Dict[str, Dict[str, Any]]) -> None:
raise NotImplementedError('To be overridden!')
def close(self) -> None:
pass
python3 支持数据类型的 annotation ,比如read方法约定了返回值是一个字典嵌套字典的参数或者None,所以是 Optional类型;write方法的数据也是字典嵌套,没有返回值。
from typing import Dict, Any, Optional
@abstractmethod
def read(self) -> Optional[Dict[str, Dict[str, Any]]]:
pass
@abstractmethod
def write(self, data: Dict[str, Dict[str, Any]]) -> None:
pass
类型注释还支持自引用:
def __and__(self, other: 'QueryInstance') -> 'QueryInstance': # 自引用的类型注释,当前QueryInstance类还未创建
return QueryInstance(lambda value: self(value) and other(value),
('and', frozenset([self._hash, other._hash])))
自定义对象的hash
class FrozenDict(dict):
def __hash__(self):
# 转换为元祖,利用元祖不可变的特性计算hash值
return hash(tuple(sorted(self.items())))
def _immutable(self, *args, **kws):
raise TypeError('object is immutable')
# Disable write access to the dict
__setitem__ = _immutable
__delitem__ = _immutable
clear = _immutable
setdefault = _immutable
popitem = _immutable
def update(self, e=None, **f):
raise TypeError('object is immutable')
def pop(self, k, d=None):
raise TypeError('object is immutable')
def freeze(obj):
"""
使用递归方式一个对象转换成可以hash的对象
"""
if isinstance(obj, dict):
# Transform dicts into ``FrozenDict``s
return FrozenDict((k, freeze(v)) for k, v in obj.items())
elif isinstance(obj, list):
# Transform lists into tuples
return tuple(freeze(el) for el in obj)
elif isinstance(obj, set):
# Transform sets into ``frozenset``s
return frozenset(obj)
else:
# Don't handle all other objects
return obj
tinydb项目使用 poetry 进行虚拟环境管理,关于poetry的使用请看Python虚拟环境指南2020版 ;使用 pytest 进行测试用例管理。
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/V5rmHiKlyayUcMEBuzDidA
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。