基本数据结构 与 概念
元组
和列表
对比以及差异
1 都支持切片操作
2 可以随意嵌套
l = [[1, 2, 3], [4, 5]] # 列表的每一个元素也是一个列表
tup = ((1, 2, 3), (4, 5, 6)) # 元组的每一个元素也是一元组
3 都支持负数索引
4 是否是动态的
列表是动态的,长度大小不固定,可以随意地增加、删减或者改变元素(mutable)。
元组是静态的,长度大小固定,无法增加删减或者改变(immutable
列表
和元组存储方式
的差异
1 列表是动态的,所以它需要存储指针,来指向对应的元素
l = [1, 2, 3]
l.__sizeof__()
# 64
tup = (1, 2, 3)
tup.__sizeof__()
# 48
2 由于列表可变,所以需要额外存储已经分配的长度大小(8 字节)
l = []
l.__sizeof__() // 空列表的存储空间为 40 字节
40
l.append(1)
l.__sizeof__()
72 // 加入了元素 1 之后,列表为其分配了可以存储 4 个元素的空间 (72 - 40)/8 = 4
l.append(2)
l.__sizeof__()
72 // 由于之前分配了空间,所以加入元素 2,列表空间不变
l.append(3)
l.__sizeof__()
72 // 同上
l.append(4)
l.__sizeof__()
72 // 同上
l.append(5)
l.__sizeof__()
104 // 加入元素 5 之后,列表的空间不足,所以又额外分配了可以存储 4 个元素的空间
列表
和元组
的性能
元组要比列表更加轻量级一些 元组的性能速度要略优于列表
# 初始化一个相同元素的列表和元组分别所需的时间
python3 -m timeit 'x=(1,2,3,4,5,6)'
20000000 loops, best of 5: 9.97 nsec per loop
python3 -m timeit 'x=[1,2,3,4,5,6]'
5000000 loops, best of 5: 50.1 nsec per loop
列表
和元组
的使用场景
存储的数据和数量不变 -> 元组
比如你有一个函数,需要返回的是一个地点的经纬度,然后直接传给前端渲染
存储的数据或数量是可变的 -> 列表
字典
和集合
在 Python3.7+
,字典被确定为有序
(注意:在 3.6 中,字典有序是一个 implementation detail,
在 3.7 才正式成为语言特性,因此 3.6 中无法 100% 确保其有序性)
字典
是一系列由键(key)和值(value)配对组成的元素的集合
集合
和字典基本相同,唯一的区别,就是集合没有键和值的配对,是一系列无序的、唯一的元素组合
集合并
不支持索引操作
,因为集合本质上是一个哈希表
字典
和集合
的工作原理
字典和集合的内部结构都是一张 哈希表
字典 哈希值(hash)、键和值
记录 {'name': 'mike', 'dob': '1999-01-01', 'gender': 'male'}
old 哈希表结构
--+-------------------------------+
| 哈希值 (hash) 键 (key) 值 (value)
--+-------------------------------+
0 | hash0 key0 value0
--+-------------------------------+
1 | hash1 key1 value1
--+-------------------------------+
2 | hash2 key2 value2
--+-------------------------------+
. | ...
__+_______________________________+
存储形式
entries = [
['--', '--', '--'] // ’---‘,表示这个位置没有元素,但是已经分配了内存
[-230273521, 'dob', '1999-01-01'],
['--', '--', '--'],
['--', '--', '--'],
[1231236123, 'name', 'mike'],
['--', '--', '--'],
[9371539127, 'gender', 'male']
]
new 哈希表结构
Indices
----------------------------------------------------
None | index | None | None | index | None | index ...
----------------------------------------------------
Entries
--------------------
hash0 key0 value0
---------------------
hash1 key1 value1
---------------------
hash2 key2 value2
---------------------
...
---------------------
新的存储形式
indices = [None, 1, None, None, 0, None, 2]
entries = [
[1231236123, 'name', 'mike'],
[-230273521, 'dob', '1999-01-01'],
[9371539127, 'gender', 'male']
]
空间利用率得到很大的提高
集合 只有单一的元素
-
插入操作
字典或集合插入一个元素 --> 计算键的哈希值(hash(key)) -->> 计算这个元素应该插入哈希表的位置 index = hash(key) & mask (和mask = PyDicMinSize - 1 做与操作) ->> 哈希表中 位置空,个元素被插入 位置占用: 若两者都相等,则表明这个元素已经存在,如果值不同,则更新值。 若两者中有一个不相等,这种情况我们通常称为哈希冲突(hash collision), 意思是两个元素的键不相等,但是哈希值相等。 这种情况下,Python 便会继续寻找表中空余的位置,直到找到位置为止。
-
查找操作
和前面的插入操作类似,Python 会根据哈希值,找到其应该处于的位置; 然后,比较哈希表这个位置中元素的哈希值和键,与需要查找的元素是否相等。 如果相等,则直接返回;如果不等,则继续查找,直到找到空位或者抛出异常为止。
-
删除操作
对于删除操作,Python 会暂时对这个位置的元素,赋于一个特殊的值,等到重新调整哈希表的大小时,再将其删除。
哈希冲突的发生,往往会降低字典和集合操作的速度。
因此,为了保证其高效性,字典和集合内的哈希表,通常会保证其至少留有 1/3 的剩余空间
。
随着元素的不停插入,当剩余空间小于 1/3 时,Python 会重新获取更大的内存空间,扩充哈希表。 不过,这种情况下,表内所有的元素位置都会被重新排放
文件输入输出 I/O
, JSON 序列化
I/O 操作需谨慎,一定要进行充分的错误处理,并细心编码,防止出现编码漏洞;
编码时,对内存占用和磁盘占用要有充分的估计,这样在出错时可以更容易找到原因;
简单的 NLP(自然语言处理)任务
读取文件;
去除所有标点符号和换行符,并把所有大写变成小写;
合并相同的词,统计每个词出现的频率,并按照词频从大到小排序;
将结果按行输出到文件 out.txt。
import re
# 你不用太关心这个函数
def parse(text):
# 使用正则表达式去除标点符号和换行符
text = re.sub(r'[^\w ]', ' ', text)
# 转为小写
text = text.lower()
# 生成所有单词的列表
word_list = text.split(' ')
# 去除空白单词
word_list = filter(None, word_list)
# filter(None, Iterable) 这里的 None,严格意义上等于 lambda x: x, 是一个 callable
# 生成单词和词频的字典
word_cnt = {}
for word in word_list:
if word not in word_cnt:
word_cnt[word] = 0
word_cnt[word] += 1
# 按照词频排序
sorted_word_cnt = sorted(word_cnt.items(), key=lambda kv: kv[1], reverse=True)
return sorted_word_cnt
with open('in.txt', 'r') as fin:
text = fin.read()
word_and_freq = parse(text)
with open('out.txt', 'w') as fout:
for word, freq in word_and_freq:
fout.write('{} {}\n'.format(word, freq))
########## 输出 (省略较长的中间结果) ##########
and 15
be 13
will 11
to 11
the 10
of 10
a 8
we 8
day 6
当读取的文件一行的内容过多的时候, 使用生成器可以减少内存消耗
def chunked_file_reader(fp, block_size=1024 * 8):
"""生成器函数:分块读取文件内容
"""
while True:
chunk = fp.read(block_size)
# 当文件没有更多内容时,read 调用将会返回空字符串 ''
if not chunk:
break
yield chunk
# 也可以使用iter函数
for chunk in iter(partial(fp.read, 1024*8), ''):
return chunk
函数
python 函数的特性
1 多态
函数参数 接受任何数据类型(整型,浮点,字符串等等) dynamically typed
和其他语言,比如 Java、C 等很大的一个不同点
2 支持函数的嵌套
函数嵌套
的主要两个作用
-
1 保证内部函数的隐私
内部函数只能被外部函数所调用和访问,不会暴露在全局作用域 比如: 函数内部有一些隐私数据(比如数据库的用户、密码等)
def connect_DB():
def get_DB_configuration():
...
return host, username, password
conn = connector.connect(get_DB_configuration())
return conn
# 提高程序的安全性
# 函数 get_DB_configuration,是内部函数,它无法在 connect_DB() 函数以外被单独调用
-
2 提高程序的运行效率
实际工作中,如果到相似的情况, 输入检查不是很快,还会耗费一定的资源,那么运用函数的嵌套就十分必要了
比如要 使用递归的方式计算一个数的阶乘
def factorial(input):
if not isinstance(input, int):
raise Exception("input must be a integer")
if input < 0 or input > 10000:
raise Exception("invalid input number")
if input <= 1:
return 1
return input * factorial(input - 1)
# 使用 嵌套函数
def factorial2(input):
if not isinstance(input, int):
raise Exception("input must be a integer")
if input < 0 or input > 10000:
raise Exception("invalid input number")
def inner_factorial(input):
if input <= 1:
return 1
return input * inner_factorial(input - 1)
return inner_factorial(input)
import cProfile
cProfile.run("factorial(200)")
cProfile.run("factorial2(200)")
==================================================================
403 function calls (204 primitive calls) in 0.000 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.000 0.000 <string>:1(<module>)
200/1 0.000 0.000 0.000 0.000 test.py:1(factorial)
1 0.000 0.000 0.000 0.000 {built-in method builtins.exec}
200 0.000 0.000 0.000 0.000 {built-in method builtins.isinstance}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
==================================================================
205 function calls (6 primitive calls) in 0.000 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.000 0.000 <string>:1(<module>)
1 0.000 0.000 0.000 0.000 test.py:13(factorial2)
200/1 0.000 0.000 0.000 0.000 test.py:19(inner_factorial)
1 0.000 0.000 0.000 0.000 {built-in method builtins.exec}
1 0.000 0.000 0.000 0.000 {built-in method builtins.isinstance}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
==================================================================
函数变量作用域
局部变量
变量是在函数内部定义
一旦函数执行完毕,局部变量就会被回收,无法访问
全局变量
定义在整个文件层次上
可以在文件内的任何地方被访问
注意全局变量
和 内部函数变量
的修改 , global
and nonlocal
# ===================================
MIN_VALUE = 1
MAX_VALUE = 10
def validation_check(value):
...
MIN_VALUE += 1
# MIN_VALUE = MIN_VALUE + 1
# 这里的 MIN_VALUE 是 函数内部定义的变量 不同于外部全局的 MIN_VALUE
# local variable 'MIN_VALUE' referenced before assignment
...
validation_check(5)
# global
MIN_VALUE = 1
MAX_VALUE = 10
def validation_check(value):
...
gglobal MIN_VALUE
MIN_VALUE += 1
# MIN_VALUE = MIN_VALUE + 1
# 这里的 MIN_VALUE 是 函数内部定义的变量 不同于外部全局的 MIN_VALUE
# local variable 'MIN_VALUE' referenced before assignment
...
validation_check(5)
# ===================================
def outer():
x = "local"
def inner():
x += "nolocal"
print("inner", x)
inner()
print(x)
outer()
# nonlocal
def outer():
x = "local"
def inner():
nonlocal x
x += "nolocal"
print("inner", x)
inner()
print(x)
outer()
闭包 与 嵌套函数
闭包 外部函数返回 一个函数
嵌套函数 外部函数返回 一个值
def nth_power(exponent):
def exponent_of(base):
return base ** exponent
return exponent_of
square = nth_power(2)
cube = nth_power(3)
print(square(2)) 4
print(cube(2)) 8
# 不使用闭包
res1 = nth_power_rewrite(base1, 2)
res2 = nth_power_rewrite(base2, 2)
res3 = nth_power_rewrite(base3, 2)
...
# 使用闭包
# 简化程序的复杂度,提高可读性
square = nth_power(2)
res1 = square(base1)
res2 = square(base2)
res3 = square(base3)
...
闭包常常和装饰器(decorator)一起使用
异常 except block
中的变量
e = 12
try:
1/0
except Exception as e:
pass
print(e) # name 'e' is not defined
在异常处理的 except block 中,把异常赋予了一个变量,那么这个变量会在 except block 执行结束时被删除
e = 1
try:
1 / 0
except ZeroDivisionError as e:
try:
pass
finally:
del e
一定要保证 except
中异常赋予的变量
,在之后的语句中不再被用到
Python 函数式编程
与 lambda匿名函数
lambda匿名函数
匿名函数的代码简洁很多,也更加符合 Python 的编程习惯
可以配合 函数式编程
- 1 lambda 是一个表达式(expression),并不是一个语句(statement)
(1) lambda 可以用在 一些常规函数 def 不能用的地方
(比如,列表)
[(lambda x: x*x)(x) for x in range(10)]
(2) lambda 可以被用作某些 函数参数
l = [(1, 20), (3, 0), (9, 10), (2, -1)]
l.sort(key=lambda x: x[1]) # 按列表中元祖的第二个元素排序
print(l)
# 输出
[(2, -1), (3, 0), (9, 10), (1, 20)]
-
2 lambda 的主体是只有一行的简单表达式,并不能扩展成一个多行的代码块。
出于设计的考虑 lambda 专注于简单的任务
Python 函数式编程
函数式编程
,是指代码中每一块都是不可变的(immutable)
,都由纯函数(pure function)
的形式组成。
这里的纯函数,是指函数本身相互独立、互不影响
对于相同的输入,总会有相同的输出,没有任何副作用
def multiply_2(l):
for index in range(0, len(l)):
l[index] *= 2
return l
# 纯函数的形式
def multiply_2_pure(l):
new_list = []
for item in l:
new_list.append(item * 2)
return new_list
优点 主要在于其纯函数和不可变的特性使程序更加健壮,易于调试(debug)和测试
缺点 主要在于限制多,难写
Python 不同于一些语言(比如 Scala),它并不是一门函数式编程语言
Python
也提供了一些函数式编程的特性
map()、filter() 和 reduce()
,通常结合匿名函数 lambda
一起使用
以 map() 函数
为例,看一下 Python 提供的函数式编程接口的性能
# map
python -m timeit -s "xs=range(10000)" "map(lambda x: x*2, xs)"
# 1000000 loops, best of 5: 269 nsec per loop
# 列表推导式
python -m timeit -s "xs=range(1000000)" "[x * 2 for x in xs]"
# 2 loops, best of 5: 122 msec per loop
# for 循环
python -m timeit -s "xs=range(1000000)" "l=[]" "for i in xs: l.append(i*2)"
# 2 loops, best of 5: 151 msec per loop
map() 函数直接由 C 语言写的,运行时不需要通过 Python 解释器间接调用,
并且内部做了诸多优化,所以运行速度最快
面向对象基础
类
是一群具有相同属性
和函数
的对象
的集合
面向对象编程四要素 类,属性,函数,对象
class Document():
def __init__(self, title, author, context):
print('init function called')
self.title = title
self.author = author
self.__context = context # __ 开头的属性是私有属性
def get_context_length(self):
return len(self.__context)
def intercept_context(self, length):
self.__context = self.__context[:length]
# 1 如何在一个类中定义一些常量,每个对象都可以方便访问这些常量而不用重新构造?
# (类中 全大写来表示常量)
# 2 如果一个函数不涉及到访问修改这个类的属性,而放到类外面有点不恰当,怎么做才能更优雅呢?
# (静态函数)
# 3 既然类是一群相似的对象的集合,那么可不可以是一群相似的类的集合呢?
# (类的继承)
class Document():
WELCOME_STR = 'Welcome! The context for this book is {}.'
# 在一个类中定义一些常量,每个对象都可以方便访问这些常量而不用重新构造
# 全大写来表示常量
# 类中使用 self.WELCOME_STR
# 类外使用 Entity.WELCOME_STR
def __init__(self, title, author, context):
print('init function called')
self.title = title
self.author = author
self.__context = context
# 类函数
# 产生的影响是动态的,能够访问或者修改对象的属性
@classmethod
def create_empty_book(cls, title, author):
return cls(title=title, author=author, context='nothing')
# 成员函数
# 产生的影响是动态的,能够访问或者修改对象的属性
def get_context_length(self):
return len(self.__context)
# 静态函数
# 静态函数则与类没有什么关联,最明显的特征便是,静态函数的第一个参数没有任何特殊性 (相当于类外边)
@staticmethod
def get_welcome(context):
return Document.WELCOME_STR.format(context)
类: 一群有着相似性的事物的集合,这里对应 Python 的 class。
对象:集合中的一个事物,这里对应由 class 生成的某一个 object,比如代码中的 harry_potter_book。
属性:对象的某个静态特征,比如上述代码中的 title、author 和 __context。
函数:对象的某个动态能力,比如上述代码中的 intercept_context () 函数
类的继承
一个类既拥有另一个类的特征,也拥有不同于另一个类的独特特征
这里的第一个类叫做子类,另一个叫做父类,特征其实就是类的属性和函数
# 1
class Entity():
def __init__(self, object_type):
print('parent class init called')
self.object_type = object_type
def get_context_length(self):
raise Exception('get_context_length not implemented')
def print_title(self):
print(self.title)
class Document(Entity):
def __init__(self, title, author, context):
print('Document class init called')
Entity.__init__(self, 'document')
self.title = title
self.author = author
self.__context = context
def get_context_length(self):
return len(self.__context)
class Video(Entity):
def __init__(self, title, author, video_length):
print('Video class init called')
Entity.__init__(self, 'video')
self.title = title
self.author = author
self.__video_length = video_length
def get_context_length(self):
return self.__video_length
harry_potter_book = Document('Harry Potter(Book)', 'J. K. Rowling',
'... Forever Do not believe any thing is capable of thinking independently ...')
harry_potter_movie = Video('Harry Potter(Movie)', 'J. K. Rowling', 120)
print(harry_potter_book.object_type)
print(harry_potter_movie.object_type)
harry_potter_book.print_title()
harry_potter_movie.print_title()
print(harry_potter_book.get_context_length())
print(harry_potter_movie.get_context_length())
构造函数
– init
每个类都有构造函数
继承类在生成对象的时候, 不会调用父类的构造函数, 需要在 init() 函数中显式调用父类的构造函数
执行顺序 子类的构造函数 -> 父类的构造函数
函数重写
– 约束子类行为
子类必须重新写一遍 get_context_length() 函数,来覆盖掉原有函数, 否则会调用父类的函数
继承父类的函数
– 减少重复的代码,降低系统的熵值(即复杂度)
抽象对象 抽象类
from abc import ABCMeta, abstractmethod
class Entity(metaclass=ABCMeta):
@abstractmethod
def get_title(self):
pass
@abstractmethod
def set_title(self, title):
pass
class Document(Entity):
def get_title(self):
return self.title
def set_title(self, title):
self.title = title
document = Document()
document.set_title('Harry Potter')
print(document.get_title())
entity = Entity() # 抽象类是一种特殊的类,它生下来就是作为父类存在的,一旦对象化就会报错
########## 输出 ##########
Harry Potter
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-7-266b2aa47bad> in <module>()
21 print(document.get_title())
22
---> 23 entity = Entity()
24 entity.set_title('Test')
TypeError: Can't instantiate abstract class Entity with abstract methods get_title, set_title
抽象类
是一种特殊的类,它生下来就是作为父类存在
metaclass=ABCMeta
对象化就会报错
抽象函数
定义在抽象类之中,子类必须重写该函数才能使用
@abstractmethod
软件工程中一个很重要的概念,定义接口
PM(Product Manager,产品经理) 写出产品需求文档,然后迭代
TL(Team Leader,项目经理)编写开发文档,开发文档中会定义不同模块的大致功能和接口
每个模块之间如何协作、单元测试和集成测试、线上灰度测试、监测和日志等等一系列开发流程
抽象类
是一种自上而下的设计风范,你只需要用少量的代码描述清楚要做的事情,定义好接口
,
然后就可以交给不同开发人员去开发和对接
面向对象 + 搜索引擎
+ LRU 缓存
+ 多继承
搜索引擎由搜索器
、索引器
、检索器
和用户接口
四个部分组成
搜索器 爬虫(scrawler)
在互联网上大量爬取各类网站的内容,送给索引器
索引器
索引器拿到网页和内容后,会对内容进行处理,形成索引, 存储于内部的数据库等待检索
用户接口 (网页和 App 前端界面,搜索页面 )
通过用户接口,向搜索引擎发出询问(query),询问解析后送达检索器
检索器
效检索后,再将结果返回给用户
# 基类
class SearchEngineBase(object):
def __init__(self):
pass
def add_corpus(self, file_path):
# 抓取数据
with open(file_path, 'r') as fin:
text = fin.read()
self.process_corpus(file_path, text)
def process_corpus(self, id, text):
# 索引器
raise Exception('process_corpus not implemented.')
def search(self, query):
# 检索器
raise Exception('search not implemented.')
def main(search_engine):
for file_path in ['1.txt', '2.txt']:
search_engine.add_corpus(file_path)
while True:
query = input()
results = search_engine.search(query)
print('found {} result(s):'.format(len(results)))
for result in results:
print(result)
# SimpleEngine
# 继承并实现了 process_corpus 和 search 接口
class SimpleEngine(SearchEngineBase):
def __init__(self):
super(SimpleEngine, self).__init__()
self.__id_to_text = {}
def process_corpus(self, id, text):
self.__id_to_text[id] = text
def search(self, query):
results = []
for id, text in self.__id_to_text.items():
if query in text:
results.append(id)
return results
search_engine = SimpleEngine()
main(search_engine)
# BOWInvertedIndexEngine
import re
class BOWInvertedIndexEngine(SearchEngineBase):
def __init__(self):
super(BOWInvertedIndexEngine, self).__init__()
self.inverted_index = {}
def process_corpus(self, id, text):
words = self.parse_text_to_words(text)
for word in words:
if word not in self.inverted_index:
self.inverted_index[word] = []
self.inverted_index[word].append(id)
def search(self, query):
query_words = list(self.parse_text_to_words(query))
query_words_index = list()
for query_word in query_words:
query_words_index.append(0)
# 如果某一个查询单词的倒序索引为空,我们就立刻返回
for query_word in query_words:
if query_word not in self.inverted_index:
return []
result = []
while True:
# 首先,获得当前状态下所有倒序索引的 index
current_ids = []
for idx, query_word in enumerate(query_words):
current_index = query_words_index[idx]
current_inverted_list = self.inverted_index[query_word]
# 已经遍历到了某一个倒序索引的末尾,结束 search
if current_index >= len(current_inverted_list):
return result
current_ids.append(current_inverted_list[current_index])
# 然后,如果 current_ids 的所有元素都一样,那么表明这个单词在这个元素对应的文档中都出现了
if all(x == current_ids[0] for x in current_ids):
result.append(current_ids[0])
query_words_index = [x + 1 for x in query_words_index]
continue
# 如果不是,我们就把最小的元素加一
min_val = min(current_ids)
min_val_pos = current_ids.index(min_val)
query_words_index[min_val_pos] += 1
@staticmethod
def parse_text_to_words(text):
# 使用正则表达式去除标点符号和换行符
text = re.sub(r'[^\w ]', ' ', text)
# 转为小写
text = text.lower()
# 生成所有单词的列表
word_list = text.split(' ')
# 去除空白单词
word_list = filter(None, word_list)
# 返回单词的 set
return set(word_list)
search_engine = BOWInvertedIndexEngine()
main(search_engine)
LRUCache
定义了一个缓存类,你可以通过继承这个类来调用其方法
使用 pylru
import pylru
class LRUCache(object):
def __init__(self, size=32):
self.cache = pylru.lrucache(size)
def has(self, key):
return key in self.cache
def get(self, key):
return self.cache[key]
def set(self, key, value):
self.cache[key] = value
class BOWInvertedIndexEngineWithCache(BOWInvertedIndexEngine, LRUCache):
def __init__(self):
super(BOWInvertedIndexEngineWithCache, self).__init__()
LRUCache.__init__(self)
def search(self, query):
if self.has(query):
print('cache hit!')
return self.get(query)
result = super(BOWInvertedIndexEngineWithCache, self).search(query)
self.set(query, result)
return result
search_engine = BOWInvertedIndexEngineWithCache()
main(search_engine)
多重继承有两种初始化方法
:
1 super(BOWInvertedIndexEngineWithCache, self).__init__() # 父类必须 是 新式类
2 LRUCache.__init__(self) 传统方法
python模块化
├── utils
│ ├── util.py
│ └── class_util.py
├── src
│ └── sub_main.py
└── main.py
# src/sub_main.py
import sys
sys.path.append("..") # 表示将当前程序所在位置向上提了一级,之后就能调用 utils 的模块
from utils.class_util import *
encoder = Encoder()
decoder = Decoder()
print(encoder.encode('abcde'))
print(encoder.encode('edcba'))
# main.py
from utils.utils import get_sum # 只需要使用 . 代替 / 来表示子目录
print(get_sum(1, 2))
# utils/util.py
def get_sum(a, b):
return a + b
# utils/class_util.py
from typing import List
class Encoder(object):
def encode(self, s):
return s[::-1]
class Decoder(object):
def decode(self, s):
return ''.join(reverse(list(s)))
def reverse(L: List) -> List:
L.reverse()
return L
import
同一个模块只会被执行一次
可以防止重复导入模块出现问题
除了一些极其特殊的情况,import 必须位于程序的最前端
关于 __init_.py
的版本区别
Python 2 __init__.py 内容可以为空,也可以用来表述 包对外暴露的模块接口
Python 3 不是必须的
sys.path.append("..")
,则可以改变当前 Python 解释器的位置
项目模块化 –
尽可能使用绝对位置是第一要义
相对路径
和绝对路径
的概念
绝对路径
在 Linux 系统中,每个文件都有一个绝对路径,以 / 开头,来表示从根目录到叶子节点的路径,例如 /home/ubuntu/Desktop/my_project/test.py。
相对路径
'../../Downloads/example.json', .. 表示上一层目录。
最好实践:尽可能使用绝对位置
相对位置是一种很不好的选择
因为代码可能会迁移,相对位置会使得重构既不雅观,也易出错
使用绝对位置的优点:
1 简化依赖管理
整个公司的代码模块,都可以被你写的任何程序所调用,而你写的库和模块也会被其他人调用。
调用的方式,都是从代码的根目录开始索引,也就是前面提到过的相对的绝对路径。
这样极大地提高了代码的分享共用能力,你不需要重复造轮子
2 版本统一
不存在使用了一个新模块,却导致一系列函数崩溃的情况;
并且所有的升级都需要通过单元测试才可以继续
3 代码追溯
很容易追溯,一个 API 是从哪里被调用的,它的历史版本是怎样迭代开发,产生变化的
├── proto
│ ├── mat.py
├── utils
│ └── mat_mul.py
└── src
└── main.py
# proto/mat.py
class Matrix(object):
def __init__(self, data):
self.data = data
self.n = len(data)
self.m = len(data[0])
# utils/mat_mul.py
from proto.mat import Matrix
def mat_mul(matrix_1: Matrix, matrix_2: Matrix):
assert matrix_1.m == matrix_2.n
n, m, s = matrix_1.n, matrix_1.m, matrix_2.m
result = [[0 for _ in range(n)] for _ in range(s)]
for i in range(n):
for j in range(s):
for k in range(m):
result[i][k] += matrix_1.data[i][j] * matrix_2.data[j][k]
return Matrix(result)
# src/main.py
from proto.mat import Matrix # 直接从项目根目录中导入,并依次向下导入模块 mat.py 中的 Matrix
from utils.mat_mul import mat_mul
a = Matrix([[1, 2], [3, 4]])
b = Matrix([[5, 6], [7, 8]])
print(mat_mul(a, b).data)
PYTHONPATH
设置
# 1
import sys
sys.path[0] = '/home/ubuntu/workspace/your_projects'
# 2 对于每一个项目来说,最好要有一个独立的运行环境来保持包和模块的纯净性
# 在activate文件的末尾
# export PYTHONPATH="/home/ubuntu/workspace/your_projects"
巧用
if __name__ == '__main__'
避开import
时执行
import 在导入文件的时候,会自动把所有暴露在外面的代码全都执行一遍
如果你要把一个东西封装成模块,又想让它可以执行的话,你必须将要执行的代码放在 if __name__ == '__main__'
下面
__name__ 作为 Python 的魔术内置参数 (模块对象的一个属性)
使用 import 语句时,__name__ 就会被赋值为 该模块的名字, 自然就不等于 __main__了
is
与 拷贝
'=='
VS'is'
每个对象的身份标识
,都能通过函数 id(object)
获得
a = 10
b = 10
a == b
True
id(a)
4427562448
id(b)
4427562448
a is b
True
Python 内部会对 -5 到 256
的整型维持一个数组,起到一个缓存的作用
每次你试图创建一个 -5 到 256 范围内的整型数字时,Python 都会从这个数组中返回相对应的引用,
而不是重新开辟一块新的内存空间
超过 这个范围, 会开辟不同的内存区域
-
比较操作符
'is'
的速度效率
通常要优于'=='
'is'操作符 'is'操作符 不能被重载, Python 就不需要去寻找,程序中是否有其他地方重载了比较操作符,并去调用。 执行比较操作符'is',就仅仅 比较两个变量的 ID '=='操作符 执行a == b相当于是去执行 a.__eq__(b),而 Python 大部分的数据类型都会去重载__eq__这个函数,其内部的处理通常会复杂一些 比如,对于列表,__eq__函数会去遍历列表中的元素,比较它们的顺序和值是否相等。
浅拷贝 shallow copy
和深度拷贝 deep copy
- 浅拷贝方法
# 1 使用数据类型本身的构造器
s1 = set([1, 2, 3])
s2 = set(s1)
l1 = [1, 2, 3]
l2 = list(l1)
# 2 切片 ':'
l1 = [1, 2, 3]
l2 = l1[:]
# 3 函数 copy.copy()
import copy
l1 = [1, 2, 3]
l2 = copy.copy(l1)
注意元组
使用 tuple() 或者切片操作符':'不会创建一份浅拷贝,相反,它会返回一个指向相同元组的引用
t1 = (1, 2, 3)
t2 = tuple(t1)
t1 == t2
True
t1 is t2
True
- 浅拷贝
是指重新分配一块内存,创建一个新的对象
,里面的元素是原对象中子对象的引用
l1 = [[1, 2], (30, 40)]
l2 = list(l1) # 重新分配一块内存,创建一个新的对象
l1.append(100) # 不会对 l2 产生任何影响
l1[0].append(3) # 操作后 l1 和 l2 都会改变
l1
[[1, 2, 3], (30, 40), 100]
l2
[[1, 2, 3], (30, 40)]
l1[1] += (50, 60) # 重新创建了一个新元组作为 l1 中的第二个元素 , l2 不变
l1
[[1, 2, 3], (30, 40, 50, 60), 100]
[[1, 2, 3], (30, 40)]
- 深度拷贝
是指重新分配一块内存,创建一个新的对象,并且将原对象中的元素,
以递归的方式
,通过创建新的子对象
拷贝到新对象中。
因此,新对象和原对象没有任何关联。
import copy
l1 = [[1, 2], (30, 40)]
l2 = copy.deepcopy(l1)
l1.append(100)
l1[0].append(3)
l1
[[1, 2, 3], (30, 40), 100]
l2
[[1, 2], (30, 40)]
如果被拷贝对象中存在指向自身的引用
,那么程序很容易陷入无限循环
import copy
x = [1]
x.append(x)
x
[1, [...]]
y = copy.deepcopy(x)
y
[1, [...]]
print(x==y) # RecursionError: maximum recursion depth exceeded in comparison
变量与参数
Python 变量的命名与赋值的原理
变量的赋值,只是表示让变量指向了某个对象,并不表示拷贝对象给变量;而一个对象,可以被多个变量所指向。
可变对象(列表,字典,集合等等)的改变,会影响所有指向该对象的变量。
对于不可变对象(字符串,整型,元祖等等),所有指向该对象的变量的值总是一样的,也不会改变。
但是通过某些操作(+= 等等)更新不可变对象的值时,会返回一个新的对象。
变量可以被删除,但是对象无法被删除
函数的参数传递
Python 的参数传递是赋值传递 (pass by assignment),或者叫作对象的引用传递(pass by object reference)
Python 里所有的数据类型都是对象,所以参数传递时,只是让新变量与原变量指向相同的对象而已
不存在值传递或是引用传递一说
如果对象是可变
的,当其改变时,所有指向这个对象的变量都会改变
。
如果对象不可变
,简单的赋值只能改变其中一个变量的值
,其余变量则不受影响
装饰器
带有自定义参数的装饰器
ef repeat(num):
def my_decorator(func):
def wrapper(*args, **kwargs):
for i in range(num):
print('wrapper of decorator')
func(*args, **kwargs)
return wrapper
return my_decorator
@repeat(4)
def greet(message):
print(message)
greet('hello world')
原函数的元信息
@functools.wrap
import functools
def my_decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
print('wrapper of decorator')
func(*args, **kwargs)
return wrapper
@my_decorator
def greet(message):
print(message)
greet.__name__
# 输出
'greet'
类装饰器
类装饰器主要依赖于函数__call_()
每当你调用一个类的示例时,函数__call__()就会被执行一次
class Count:
def __init__(self, func):
self.func = func
self.num_calls = 0
def __call__(self, *args, **kwargs):
self.num_calls += 1
print('num of calls is: {}'.format(self.num_calls))
return self.func(*args, **kwargs)
@Count
def example():
print("hello world")
example()
# 输出
num of calls is: 1
hello world
example()
# 输出
num of calls is: 2
hello world
...
# def log_attr(cls):
origin_attr = cls.__getattribute__
def new_attr(self, name):
print(f"getting: {name}")
return origin_attr(self, name)
cls.__getattribute__ = new_attr
return cls
@log_attr
class A:
def __init__(self, x):
self.x = x
def spam(self):
...
a = A(1)
print(a.x)
# 类装饰器 扩展类的功能 (替代元类的简洁方案)
def log_attr(cls):
origin_attr = cls.__getattribute__
def new_attr(self, name):
print(f"getting: {name}")
return origin_attr(self, name)
cls.__getattribute__ = new_attr
return cls
@log_attr
class A:
def __init__(self, x):
self.x = x
def spam(self):
...
a = A(1)
print(a.x)
# 单例模式
def single_class(cls):
cls._instance = None
def new_(cls, *args):
if cls._instance:
return cls._instance
cls._instance = cv = object.__new__(cls)
return cv
cls.__new__ = new_
return cls
@single_class
class A(object):
def __init__(self, name):
self.name = name
装饰器的嵌套
@decorator1
@decorator2
@decorator3
def func():...
decorator1(decorator2(decorator3(func)))
装饰器用法实例
- 身份认证
import functools
def authenticate(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
request = args[0]
if check_user_logged_in(request): # 如果用户处于登录状态
return func(*args, **kwargs) # 执行函数 post_comment()
else:
raise Exception('Authentication failed')
return wrapper
@authenticate
def post_comment(request, ...)
...
- 日志记录
import time
import functools
def log_execution_time(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
res = func(*args, **kwargs)
end = time.perf_counter()
print('{} took {} ms'.format(func.__name__, (end - start) * 1000))
return res
return wrapper
@log_execution_time
def calculate_similarity(items):
...
- 输入合理性检查
import functools
def validation_check(input):
@functools.wraps(func)
def wrapper(*args, **kwargs):
... # 检查输入是否合法
@validation_check
def neural_network_training(param1, param2, ...):
...
-
缓存
LRU cache,在 Python 中的表示形式是@lru_cache。@lru_cache会缓存进程中的函数参数和结果,当缓存满了以后,会删除 least recenly used 的数据。
@lru_cache
def check(param1, param2, ...) # 检查用户设备类型,版本号等等
...
metaclass
“超越类”和“变形类”
metaclass 的超越变形特性
YAML
的动态序列化/逆序列化
增加 `yaml_tag`
# ======================================================
class Monster(yaml.YAMLObject):
yaml_tag = u'!Monster' # cls.yaml_loader.add_constructor(cls.yaml_tag, cls.from_yaml)
def __init__(self, name, hp, ac, attacks):
self.name = name
self.hp = hp
self.ac = ac
self.attacks = attacks
def __repr__(self):
return "%s(name=%r, hp=%r, ac=%r, attacks=%r)" % (
self.__class__.__name__, self.name, self.hp, self.ac,
self.attacks)
Monster(name='Cave spider', hp=[2, 6], ac=16, attacks=['BITE', 'HURT'])
print(yaml.dump(Monster(
name='Cave lizard', hp=[3, 6], ac=16, attacks=['BITE', 'HURT'])))
# !Monster
# ac: 16
# attacks:
# - BITE
# - HURT
# hp:
# - 3
# - 6
# name: Cave lizard
# ======================================================
# Python 2/3 相同部分
class YAMLObjectMetaclass(type):
def __init__(cls, name, bases, kwds):
super(YAMLObjectMetaclass, cls).__init__(name, bases, kwds)
if 'yaml_tag' in kwds and kwds['yaml_tag'] is not None:
cls.yaml_loader.add_constructor(cls.yaml_tag, cls.from_yaml)
# 省略其余定义
# YAML 应用 metaclass,拦截了所有 YAMLObject 子类的定义。
# 在你定义任何 YAMLObject 子类时,Python 会强行插入运行下面这段代码
# cls.yaml_loader.add_constructor(cls.yaml_tag, cls.from_yaml)
# Python 3
class YAMLObject(metaclass=YAMLObjectMetaclass):
# YAMLObject 把 metaclass 都声明成了 YAMLObjectMetaclass
yaml_loader = Loader
# 省略其余定义
# Python 2
class YAMLObject(object):
__metaclass__ = YAMLObjectMetaclass
# YAMLObject 把 metaclass 都声明成了 YAMLObjectMetaclass
yaml_loader = Loader
# 省略其余定义
Python 底层语言设计层面 是如何实现 metaclass
`metaclass` 拦截 python类的定义
-
所有的
Python 的用户定义类
,都是type
这个类的实例
类本身不过是一个名为 type 类的实例。 在 Python 的类型世界里,type 这个类就是造物的上帝
class Myclass:
...
instance = Myclass()
print(type(instance)) # <class '__main__.Myclass'> instance 是 MyClass 的实例
print(type(Myclass)) # <class 'type'> MyClass 是 type 的实例。
-
用户自定义类
,只不过是type 类
的__call__运算符
重载定义一个类的语句结束时 --> Python 调用 type 的__call__运算符 class MyClass: data = 1 MyClass = type('MyClass', (), {'data': 1})
# 1 定义一个类
class MyClass:
data = 1
# 2 python 真正执行的是
class = type(classname, superclasses, attributedict)
MyClass = type('MyClass', (), {'data': 1})
# type 的__call__运算符重载
# 3 进一步调用
type.__new__(typeclass, classname, superclasses, attributedict)
type.__init__(class, classname, superclasses, attributedict)
-
metaclass
是type
的子类,替换原生的type 的__call__运算符重载机制
通过替换 type 的__call__运算符重载机制,“超越变形”正常的类
一旦你把一个类型 MyClass
的 metaclass
设置成 MyMeta,MyClass 就不再由原生的 type 创建
,而是会调用 MyMeta
的__call__运算符
重载
也就是
class = type(classname, superclasses, attributedict)
# 变为了
class = MyMeta(classname, superclasses, attributedict)
使用
metaclass
的风险
metaclass 会"扭曲变形"正常的 Python 类型模型
如果使用不慎,对于整个代码库造成的风险是不可估量的
> 使用 Python metaclass 很多公司里 需要特例特批
在应用层,metaclass 往往不是很好的选择
迭代器与生成器
可迭代对象
通过 iter()
函数返回一个迭代器
再通过 next()
函数就可以实现遍历
for in 语句将这个过程隐式化
def is_iterable(param):
try:
iter(param)
return True
except TypeError:
return False
params = [
1234,
'1234',
[1, 2, 3, 4],
set([1, 2, 3, 4]),
{1:1, 2:2, 3:3, 4:4},
(1, 2, 3, 4)
]
for param in params:
print('{} is iterable? {}'.format(param, is_iterable(param)))
########## 输出 ##########
# 1234 is iterable? False
# 1234 is iterable? True
# [1, 2, 3, 4] is iterable? True
# {1, 2, 3, 4} is iterable? True
# {1: 1, 2: 2, 3: 3, 4: 4} is iterable? True
# (1, 2, 3, 4) is iterable? True
生成器 (懒人版本的迭代器)
生成器在 Python 2
的版本上,是协程的一种重要实现方式
;
而 Python 3.5 引入 async await
语法糖后,生成器实现协程的方式就已经落后了。
生成器并不会像迭代器一样占用大量内存,只有在被使用的时候才会调用
import os
import psutil
# 显示当前 python 程序占用的内存大小
def show_memory_info(hint):
pid = os.getpid()
p = psutil.Process(pid)
info = p.memory_full_info()
memory = info.uss / 1024. / 1024
print('{} memory used: {} MB'.format(hint, memory))
def test_iterator():
show_memory_info('initing iterator')
list_1 = [i for i in range(100000000)]
show_memory_info('after iterator initiated')
print(sum(list_1))
show_memory_info('after sum called')
def test_generator():
show_memory_info('initing generator')
list_2 = (i for i in range(100000000))
show_memory_info('after generator initiated')
print(sum(list_2))
show_memory_info('after sum called')
%time test_iterator()
%time test_generator()
########## 输出 ##########
initing iterator memory used: 48.9765625 MB
after iterator initiated memory used: 3920.30078125 MB
4999999950000000
after sum called memory used: 3920.3046875 MB
Wall time: 17 s
initing generator memory used: 50.359375 MB
after generator initiated memory used: 50.359375 MB
4999999950000000
after sum called memory used: 50.109375 MB
Wall time: 12.5 s
next()
函数运行的时候,保存了当前的指针
b = (i for i in range(5))
# next() 函数运行的时候,保存了当前的指针
print(2 in b) # true [3, 4, 5] left
print(4 in b) # true [5] left
print(3 in b) # false
给定两个序列,判定第一个是不是第二个的子序列
def is_subsequence(a, b):
b = iter(b) # 转换成迭代器
return all(i in b for i in a)
print(is_subsequence([1, 3, 5], [1, 2, 3, 4, 5]))
print(is_subsequence([1, 4, 3], [1, 2, 3, 4, 5]))
#
########## 输出 ##########
True
False
容器是可迭代对象,可迭代对象调用 iter() 函数,可以得到一个迭代器。
迭代器可以通过 next() 函数来得到下一个元素,从而支持遍历。
生成器是一种特殊的迭代器(注意这个逻辑关系反之不成立)。
使用生成器,你可以写出来更加清晰的代码;合理使用生成器,
可以降低内存占用、优化程序结构、提高程序速度。
生成器在 Python 2 的版本上,是协程的一种重要实现方式;
而 Python 3.5 引入 async await 语法糖后,生成器实现协程的方式就已经落后了。
协程
协程是实现并发编程的一种方式
进程上下文切换占用了大量的资源,线程也顶不住如此巨大的压力
多进程 / 多线程类比为起源于唐朝的藩镇割据,
事件循环,就是宋朝加强的中央集权制
NGINX
+事件循环
事件循环启动一个统一的调度器
,让调度器来决定一个时刻去运行哪个任务
,于是省却了多线程中启动线程、管理线程、同步锁等各种开销
同一时期的 NGINX,在高并发下能保持低资源低消耗高性能,相比 Apache 也支持更多的并发连接
协程的基本使用
一个并发协程的实例(py3.7+
):
async def crawl_page(url): # async 修饰词声明异步函数
print('crawling {}'.format(url))
sleep_time = int(url.split('_')[-1])
await asyncio.sleep(sleep_time)
print('OK {}'.format(url))
# 1 同步
async def main(urls): # async 修饰词声明异步函数
for url in urls:
await crawl_page(url)
# 2 (1)异步
async def main(urls):
tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
# 协程对象,可以通过 asyncio.create_task 来创建任务
for task in tasks:
await task
# 等所有任务都结束
# 2 (2) 异步
async def main(urls):
tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
await asyncio.gather(*tasks)
# *tasks 解包列表,将列表变成了函数的参数
%time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
1 async 修饰词声明异步函数
2 调用异步函数,得到一个协程对象(coroutine object)
举个例子,如果你 print(crawl_page('')),
便会输出<coroutine object crawl_page at 0x000002BEDF141148>,
提示你这是一个 Python 的协程对象,而并不会真正执行这个函数
3 执行协程
3.1 通过 await 来调用
3.2 asyncio.create_task() 来创建任务
3.3 asyncio.run 来触发运行
asyncio.run(main()) 作为主程序的入口函数,在程序运行周期内,只调用一次 asyncio.run
协程运行过程
import asyncio
async def worker_1():
print('worker_1 start')
await asyncio.sleep(1)
print('worker_1 done')
async def worker_2():
print('worker_2 start')
await asyncio.sleep(2)
print('worker_2 done')
async def main():
task1 = asyncio.create_task(worker_1())
task2 = asyncio.create_task(worker_2())
print('before await')
await task1
print('awaited worker_1')
await task2
print('awaited worker_2')
asyncio.run(main())
1 asyncio.run(main()),程序进入 main() 函数,事件循环开启;
2 task1 和 task2 任务被创建,并进入事件循环等待运行;运行到 print,输出 'before await';
3 await task1 执行,用户选择从当前的主任务中切出,事件调度器开始调度 worker_1;
4 worker_1 开始运行,运行 print 输出'worker_1 start',然后运行到 await asyncio.sleep(1), 从当前任务切出,事件调度器开始调度 worker_2;
5 worker_2 开始运行,运行 print 输出 'worker_2 start',然后运行 await asyncio.sleep(2) 从当前任务切出;
6 以上所有事件的运行时间,都应该在 1ms 到 10ms 之间,甚至可能更短,事件调度器从这个时候开始暂停调度;
7 一秒钟后,worker_1 的 sleep 完成,事件调度器将控制权重新传给 task_1,输出 'worker_1 done',task_1 完成任务,从事件循环中退出;
8 await task1 完成,事件调度器将控制器传给主任务,输出 'awaited worker_1',·然后在 await task2 处继续等待;
9 两秒钟后,worker_2 的 sleep 完成,事件调度器将控制权重新传给 task_2,输出 'worker_2 done',task_2 完成任务,从事件循环中退出;
10 主任务输出 'awaited worker_2',协程全任务结束,事件循环结束。
获取协程的异常 (
设置 return_exceptions 参数
)
import asyncio
async def worker_1():
await asyncio.sleep(1)
return 1
async def worker_2():
await asyncio.sleep(2)
return 2 / 0
async def worker_3():
await asyncio.sleep(3)
return 3
async def main():
task_1 = asyncio.create_task(worker_1())
task_2 = asyncio.create_task(worker_2())
task_3 = asyncio.create_task(worker_3())
await asyncio.sleep(2)
task_3.cancel()
# cancel() 请求取消 Task 对象。
# (await 切换回来之后会抛出异常)将安排在下一轮事件循环中抛出一个 CancelledError 异常给被封包的协程
res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
print(res)
asyncio.run(main())
# [1, ZeroDivisionError('division by zero'), CancelledError()]
# worker_1 正常运行,worker_2 运行中出现错误,worker_3 执行时间过长被我们 cancel 掉了,这些信息会全部体现在最终的返回结果 res 中
await asyncio.gather(task_1, task.., return_exceptions=True)
如果不设置这个参数,错误就会完整地 throw 到我们这个执行层,
从而需要 try except 来捕捉,这也就意味着其他还没被执行的任务会被全部取消掉。
为了避免这个局面,我们将 return_exceptions 设置为 True 即可。
Python 中的并发编程——Futures
并发和并行
并发
在 Python 中,并发
并不是指同一时刻有多个操作(thread、task)同时进行。
相反,某个特定的时刻,它只允许有一个操作发生
,只不过线程 / 任务之间会互相切换,直到完成
并发通常应用于 I/O 操作频繁的场景
比如你要从网站上下载多个文件,I/O 操作的时间可能会比 CPU 运行处理的时间长得多。
thread 和 task 两种切换顺序的不同方式分别对应 Python 中并发的两种形式——threading 和 asyncio
1 threading
操作系统知道每个线程的所有信息,因此它会做主在适当的时候做线程切换
容易出现 race condition (可能出现在一个语句执行的过程中(比如 x += 1))
2 asyncio
主程序想要切换任务时,必须得到此任务可以被切换的通知
可以避免刚刚提到的 race condition 的情况
并行
并行,指的是同一时刻、同时发生。
Python 中的 multi-processing
便是这个意思,对于 multi-processing,
你可以简单地这么理解:比如你的电脑是 6 核处理器,那么在运行程序时,就可以强制 Python 开 6 个进程,同时执行,
以加快运行速度
并行则更多应用于 CPU heavy 的场景
比如 MapReduce 中的并行计算,为了加快运行速度,一般会用多台机器、多个处理器来完成。
并发编程之
Futures
线程的数量
根据实际的需求决定
import concurrent.futures
import requests
import threading
import time
def download_one(url):
resp = requests.get(url)
print('Read {} from {}'.format(len(resp.content), url))
def download_all(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_one, sites)
def main():
sites = [
'https://en.wikipedia.org/wiki/Portal:Arts',
'https://en.wikipedia.org/wiki/Portal:History',
'https://en.wikipedia.org/wiki/Portal:Society',
'https://en.wikipedia.org/wiki/Portal:Biography',
'https://en.wikipedia.org/wiki/Portal:Mathematics',
'https://en.wikipedia.org/wiki/Portal:Technology',
'https://en.wikipedia.org/wiki/Portal:Geography',
'https://en.wikipedia.org/wiki/Portal:Science',
'https://en.wikipedia.org/wiki/Computer_science',
'https://en.wikipedia.org/wiki/Python_(programming_language)',
'https://en.wikipedia.org/wiki/Java_(programming_language)',
'https://en.wikipedia.org/wiki/PHP',
'https://en.wikipedia.org/wiki/Node.js',
'https://en.wikipedia.org/wiki/The_C_Programming_Language',
'https://en.wikipedia.org/wiki/Go_(programming_language)'
]
start_time = time.perf_counter()
download_all(sites)
end_time = time.perf_counter()
print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
if __name__ == '__main__':
main()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_one, sites)
创建了一个线程池,总共有 5 个线程可以分配使用
在 download_one() 函数中, requests.get() 方法是线程安全的(thread-safe),因此在多线程的环境下,它也可以安全使用,并不会出现 race condition 的情况
线程的数量
可以自己定义,但是线程数并不是越多越好
因为线程的创建、维护和删除也会有一定的开销。
所以如果你设置的很大,反而可能会导致速度变慢。
需要根据实际的需求做一些测试,来寻找最优的线程数量。
Futures
Python 中的 Futures 模块
,位于 concurrent.futures
和asyncio
中,它们都表示带有延迟的操作
Futures
会将处于等待状态的操作
包裹起来放到队列中,这些操作的状态
随时可以查询,当然,它们的结果
或是异常
,也能够在操作完成后
被获取。
通常来说,作为用户,我们不用考虑如何去创建 Futures,这些 Futures 底层都会帮我们处理好。
我们要做的,实际上是去 schedule 这些 Futures 的执行。
比如,Futures 中的 Executor 类,当我们执行 executor.submit(func) 时,
会安排里面的 func() 函数执行,并返回创建好的 future 实例,以便你之后查询调用。
Futures 中一些常用的函数:
done()
表示相对应的操作
是否完成
True 表示完成,False 表示没有完成
done() 是 non-blocking 的,会立即返回结果
add_done_callback(fn)
表示 Futures 完成后,相对应的参数函数 fn,会被通知并执行调用
result()
表示当 future 完成后,返回其对应的结果或异常
as_completed(fs)
针对给定的 future 迭代器 fs,在其完成后,返回完成后的迭代器
import concurrent.futures
import requests
import time
def download_one(url):
resp = requests.get(url)
print('Read {} from {}'.format(len(resp.content), url))
def download_all(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
to_do = []
for site in sites:
future = executor.submit(download_one, site)
to_do.append(future)
# 将下载每一个网站的内容都放进 future 队列 to_do,等待执行
for future in concurrent.futures.as_completed(to_do):
# 在 future 完成后,便输出结果
future.result()
def main():
sites = [
'https://en.wikipedia.org/wiki/Portal:Arts',
'https://en.wikipedia.org/wiki/Portal:History',
'https://en.wikipedia.org/wiki/Portal:Society',
'https://en.wikipedia.org/wiki/Portal:Biography',
'https://en.wikipedia.org/wiki/Portal:Mathematics',
'https://en.wikipedia.org/wiki/Portal:Technology',
'https://en.wikipedia.org/wiki/Portal:Geography',
'https://en.wikipedia.org/wiki/Portal:Science',
'https://en.wikipedia.org/wiki/Computer_science',
'https://en.wikipedia.org/wiki/Python_(programming_language)',
'https://en.wikipedia.org/wiki/Java_(programming_language)',
'https://en.wikipedia.org/wiki/PHP',
'https://en.wikipedia.org/wiki/Node.js',
'https://en.wikipedia.org/wiki/The_C_Programming_Language',
'https://en.wikipedia.org/wiki/Go_(programming_language)'
]
start_time = time.perf_counter()
download_all(sites)
end_time = time.perf_counter()
print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
if __name__ == '__main__':
main()
# 这里要注意,future 列表中每个 future 完成的顺序,和它在列表中的顺序并不一定完全一致。到底哪个先完成、哪个后完成,取决于系统的调度和每个 future 的执行时间
多线程每次
只能有一个线程执行
Python 的解释器并不是线程安全的, 为了解决由此带来的 race condition 等问题,Python 便引入了全局解释器锁
(也就是同一时刻,只允许一个线程执行)
在执行 I/O
操作时,如果一个线程被 block 了,全局解释器锁便会被释放,从而让另一个线程能够继续执行
Asyncio
多线程的局限性:
多线程运行过程容易被打断,因此有可能出现 race condition 的情况
线程切换本身存在一定的损耗,线程数不能无限增加
因此,如果你的 I/O 操作非常 heavy,多线程很有可能满足不了高效率、高质量的需求
什么是 Asyncio
sync
vs Async
Sync 是指操作一个接一个地执行,下一个操作必须等上一个操作完成后才能执行
Async 是指不同操作间可以相互交替执行,如果其中的某个操作被 block 了,程序并不会等待,而是会找出可执行的操作继续执行
Asyncio 工作原理:
Asyncio
和其他 Python 程序一样,是单线程
的
它只有一个主线程,但是可以进行多个不同的任务(task)
这里的任务,就是特殊的 future 对象
这些不同的任务,被一个叫做 event loop
的对象所控制。
你可以把这里的任务,类比成多线程版本里的多个线程。
假设任务只有 两个状态:
预备状态
(任务目前空闲,但随时待命准备运行);
等待状态
(任务已经运行,但正在等待外部的操作完成,比如 I/O 操作)
event loop
会维护 两个任务列表,分别对应这两种状态;
并且选取预备状态的一个任务
(具体选取哪个任务,和其等待的时间长短、占用的资源等等相关),
使其运行,一直到这个任务把控制权
交还给 event loop
为止。
当任务把控制权
交还给 event loop
时,event loop
会根据其是否完成
,把任务放到预备或等待状态
的列表,
然后遍历等待状态列表的任务
,查看他们是否完成。
如果完成,则将其放到预备状态的列表;
如果未完成,则继续放在等待状态的列表。
当所有任务被重新放置在合适的列表
后,新一轮的循环
又开始了:
event loop
继续从预备状态的列表
中选取一个任务使其执行
… 如此周而复始,直到所有任务完成。
对于 Asyncio 来说,它的任务在运行时不会被外部的一些因素打断,
因此 Asyncio 内的操作不会出现 race condition 的情况,这样你就不需要担心线程安全的问题了
Asyncio 用法
import asyncio
import aiohttp
import time
async def download_one(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print('Read {} from {}'.format(resp.content_length, url))
async def download_all(sites):
tasks = [asyncio.create_task(download_one(site)) for site in sites]
# 对输入的协程 coro 创建一个任务,安排它的执行,并返回此任务对象
await asyncio.gather(*tasks)
def main():
sites = [
'https://en.wikipedia.org/wiki/Portal:Arts',
'https://en.wikipedia.org/wiki/Portal:History',
'https://en.wikipedia.org/wiki/Portal:Society',
'https://en.wikipedia.org/wiki/Portal:Biography',
'https://en.wikipedia.org/wiki/Portal:Mathematics',
'https://en.wikipedia.org/wiki/Portal:Technology',
'https://en.wikipedia.org/wiki/Portal:Geography',
'https://en.wikipedia.org/wiki/Portal:Science',
'https://en.wikipedia.org/wiki/Computer_science',
'https://en.wikipedia.org/wiki/Python_(programming_language)',
'https://en.wikipedia.org/wiki/Java_(programming_language)',
'https://en.wikipedia.org/wiki/PHP',
'https://en.wikipedia.org/wiki/Node.js',
'https://en.wikipedia.org/wiki/The_C_Programming_Language',
'https://en.wikipedia.org/wiki/Go_(programming_language)'
]
start_time = time.perf_counter()
asyncio.run(download_all(sites))
# asyncio.run(coro) 是 Asyncio 的 root call,表示拿到 event loop,运行输入的 coro,直到它结束,最后关闭这个 event loop
end_time = time.perf_counter()
print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
if __name__ == '__main__':
main()
Async 和 await 关键字是 Asyncio 的最新写法,表示这个语句 / 函数是 non-block 的 正好对应前面所讲的 event loop 的概念。
如果任务执行的过程需要等待,则将其放入等待状态的列表中,然后继续执行预备状态列表里的任务
-
Asyncio
的root call
–asyncio.run(coro)
py3.7+ asyncio.run(coro) 表示拿到 event loop,运行输入的 coro,直到它结束,最后关闭这个 event loop 老版本 loop = asyncio.get_event_loop() try: loop.run_until_complete(coro) finally: loop.close()
-
asyncio.create_task(coro)
对输入的协程 coro 创建一个任务,安排它的执行,并返回此任务对象 py3.7+ asyncio.create_task(coro) 老版本 asyncio.ensure_future(coro)
-
asyncio.gather(*aws, loop=None, return_exception=False)
在 event loop 中运行aws序列的所有任务
Asyncio 缺陷
1 Asyncio 软件库的兼容性问题
在 Python3 的早期一直是个大问题
很多情况下,使用 Asyncio 需要特定第三方库的支持,比如前面示例中的 aiohttp。
2 在任务的调度方面有了更大的自主权
写代码时就得更加注意,不然很容易出错
并发模式的选择
Asyncio
I/O bound,并且 I/O 操作很慢,需要很多任务 / 线程协同实现
多线程
I/O bound,但是 I/O 操作很快,只需要有限数量的任务 / 线程
多进程
CPU bound
GIL
(Global Interpreter Lock,即全局解释器锁)
Python 的线程
,的的确确封装了底层的操作系统线程
Linux 系统里是 Pthread(全称为 POSIX Thread)
Windows 系统里是 Windows Thread。
完全受操作系统管理,比如协调何时执行、管理内存资源、管理中断等等
虽然 Python 的线程和 C++ 的线程本质上是不同的抽象,
但它们的底层并没有什么不同
GIL 的定义
GIL,是最流行的 Python 解释器 CPython
中的一个技术术语。
它的意思是全局解释器锁
,本质上是类似操作系统的 Mutex
。
每一个 Python 线程,在 CPython 解释器中
执行时,都会先锁住自己的线程
,阻止别的线程执行
为什么 CPython 需要 GIL
1 设计者为了规避类似于内存管理这样的复杂的竞争风险问题(race condition)
2 CPython 大量使用 C 语言库,但大部分 C 语言库都不是原生线程安全的(线程安全会降低性能和增加复杂度)
>>> import sys
>>> a = []
>>> b = a
>>> sys.getrefcount(a)
3
# a 的引用计数是 3,因为有 a、b 和作为参数传递的 getrefcount 这三个地方
# 如果有两个 Python 线程同时引用了 a,就会造成引用计数的 race condition,引用计数可能最终只增加 1,这样就会造成内存被污染。
# 因为第一个线程结束时,会把引用计数减少 1,这时可能达到条件释放内存,当第二个线程再试图访问 a 时,就找不到有效的内存了
GIL 如何工作
其中,Thread 1、2、3 轮流执行,每一个线程在开始执行时,都会锁住 GIL,
以阻止别的线程执行;同样的,每一个线程执行完一段后,会释放 GIL,以允许别的线程开始利用资源。
check_interval
机制
CPython 解释器会去轮询检查线程 GIL 的锁住情况。
每隔一段时间,Python 解释器就会强制当前线程去释放 GIL,这样别的线程才能有执行的机会
不同版本的 Python 中,check interval
的实现方式并不一样。
早期的 Python 是 100 个 ticks,大致对应了 1000 个 bytecodes;
而 Python 3 以后,interval 是 15 毫秒。
`CPython 解释器`会在一个“合理”的时间范围内释放 GIL 就可以了。
Python的线程安全 (有了GIL还是需要线程锁)
GIL 仅允许一个 Python 线程执行 (安全)
Python 还有 check interval 这样的抢占机制 (不安全)
import threading
n = 0
def foo():
global n
n += 1
threads = []
for i in range(100):
t = threading.Thread(target=foo)
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
print(n)
1 n+=1这一句代码让线程不安全
2 foo 这个函数的 bytecode
>>> import dis
>>> dis.dis(foo)
LOAD_GLOBAL 0 (n)
LOAD_CONST 1 (1)
INPLACE_ADD
STORE_GLOBAL 0 (n)
3 这四行 bytecode 中间都是有可能被打断 (check interval 抢占机制)
4 需要去注意线程安全 (需要 lock 等工具,来确保线程安全)
n = 0
lock = threading.Lock()
def foo():
global n
with lock:
n += 1
如何绕过GIL
GIL 的设计,主要是为了方便 CPython 解释器层面的编写者
,而不是 Python 应用层面的程序员
事实上,很多高性能应用场景都已经有大量的 C 实现的 Python 库
例如 NumPy 的矩阵运算,就都是通过 C 来实现的,并不受 GIL 影响
绕过 GIL 的大致思路:
绕过 CPython,使用 JPython(Java 实现的 Python 解释器)等别的实现;
把关键性能代码,放到别的语言(一般是 C++)中实现
Python内存管理与垃圾回收机制
垃圾回收 --> `引用次数为 0` or `循环引用`
Python 程序在运行的时候,需要在内存中开辟出一块空间
,用于存放运行时产生的临时变量
计算完成后,再将结果输出到永久性存储器中
。
如果数据量过大,内存空间管理不善就很容易出现 OOM(out of memory)
,俗称爆内存
,程序可能被操作系统中止
永不中断的系统 内存管理很重要
不然很容易引发内存泄漏
内存泄漏 (
不中断的系统 容易出现
)
指程序本身没有设计好,导致程序未能释放已不再使用的内存
代码在分配了某段内存后,因为设计错误,失去了对这段内存的控制
,从而造成了内存的浪费
引用计数
Python 中一切皆对象。
看到的一切变量,本质上都是对象的一个 (指针)
# 在函数返回后,`局部变量`的引用会注销掉
import os
import psutil
# 显示当前 python 程序占用的内存大小
def show_memory_info(hint):
pid = os.getpid()
p = psutil.Process(pid)
info = p.memory_full_info()
memory = info.uss / 1024. / 1024
print('{} memory used: {} MB'.format(hint, memory))
def func():
show_memory_info('initial')
a = [i for i in range(10000000)]
show_memory_info('after a created')
func()
show_memory_info('finished')
########## 输出 ##########
initial memory used: 47.19140625 MB
after a created memory used: 433.91015625 MB
finished memory used: 48.109375 MB
数内部声明的列表 a 是局部变量,在函数返回后,局部变量的引用会注销掉;
此时,列表 a 所指代对象的引用数为 0,Python 便会执行垃圾回收,
因此之前占用的大量内存就又回来了。
# 1 将 a 声明为全局变量 即使函数返回后,列表的引用依然存在
def func():
show_memory_info('initial')
global a
a = [i for i in range(10000000)]
show_memory_info('after a created')
func()
show_memory_info('finished')
########## 输出 ##########
initial memory used: 48.88671875 MB
after a created memory used: 433.94921875 MB
finished memory used: 433.94921875 MB
# 2 把生成的列表返回,然后在主程序中接收, 引用依然存在
def func():
show_memory_info('initial')
a = [i for i in derange(10000000)]
show_memory_info('after a created')
return a
a = func()
show_memory_info('finished')
########## 输出 ##########
initial memory used: 47.96484375 MB
after a created memory used: 434.515625 MB
finished memory used: 434.515625 MB
python 内部的引用计数机制
import sys
a = []
print(sys.getrefcount(a))
# 两次引用,一次来自 a,一次来自 getrefcount
def func(a):
print(sys.getrefcount(a))
func(a)
# 四次引用,a,python 的函数调用栈,函数参数,和 getrefcount
print(sys.getrefcount(a))
# 两次引用,一次来自 a,一次来自 getrefcount,函数 func 调用已经不存在
sys.getrefcount()
这个函数,可以查看一个变量的引用次数
getrefcount 本身也会引入一次计数。
sys.getrefcount() 函数并不是统计一个指针,而是要统计一个对象 被引用的次数
手动释放内存
1 先调用 del a 来删除一个对象
2 然后强制调用 gc.collect()
import os
import psutil
import gc
# 显示当前 python 程序占用的内存大小
def show_memory_info(hint):
pid = os.getpid()
p = psutil.Process(pid)
info = p.memory_full_info()
memory = info.uss / 1024. / 1024
print('{} memory used: {} MB'.format(hint, memory))
show_memory_info('initial')
a = [i for i in range(10000000)]
show_memory_info("after a created")
del a
gc.collect()
show_memory_info("finished")
循环引用
import os
import psutil
import gc
# 显示当前 python 程序占用的内存大小
def show_memory_info(hint):
pid = os.getpid()
p = psutil.Process(pid)
info = p.memory_full_info()
memory = info.uss / 1024. / 1024
print('{} memory used: {} MB'.format(hint, memory))
def func():
show_memory_info('initial')
a = [i for i in range(10000000)]
b = [i for i in range(10000000)]
show_memory_info('after a, b created')
a.append(b)
b.append(a)
# a 和 b 互相引用,并且,作为局部变量,在函数 func 调用结束后,a 和 b 这两个指针从程序意义上已经不存在了
# 但是 互相引用,导致它们的引用数都不为 0
func()
show_memory_info('finished')
Python 使用标记清除(mark-sweep)
算法和分代收集(generational)
,来启用针对循环引用
的自动垃圾回收
-
标记清除算法
Python 的垃圾回收实现中,mark-sweep 使用双向链表维护了一个数据结构 只考虑容器类的对象(只有容器类对象才有可能产生循环引用) 对 不可达节点 进行垃圾回收 (对于一个有向图,如果从一个节点出发进行遍历,并标记其经过的所有节点;那么,在遍历结束后,所有没有被标记的节点,我们就称之为不可达节点)
-
分代收集算法
Python 将所有对象分为三代 刚创立的对象是第 0 代, 经过一次垃圾回收后,依然存在的对象,便会依次从上一代挪到下一代 (每一代启动自动垃圾回收的阈值,则是可以单独指定的) 当垃圾回收器中新增对象减去删除对象达到相应的阈值时,就会对这一代对象启动垃圾回收 分代的基本思想 : 新生的对象更有可能被垃圾回收,而存活更久的对象也有更高的概率继续存活。因此,通过这种做法,可以节约不少计算量,从而提高 Python 的性能
调试内存泄漏的问题
objgraph
objgraph
是一个非常好用的可视化引用关系
的包
-
show_refs()
生成清晰的引用关系图
show_backrefs()
代码规范与单元测试
代码风格与规范
code style -> 代码评审(code review) -> 可读性(readability review)
统一编程规范注意点
阅读者的体验 » 编程者的体验 » 机器的体验
1 变量名一定要写完整
# 错误示例
if (a <= 0):
return
elif (a > b):
return
else:
b -= a
# 正确示例
if (transfer_amount <= 0):
raise Exception('...')
elif (transfer_amount > balance):
raise Exception('...')
else:
balance -= transfer_amount
2 Python 代码中的 import 对象,只能是 package 或者 module
避免name collisions
# 错误示例
from mypkg import Obj
from mypkg import my_func
my_func([1, 2, 3])
# 正确示例
import numpy as np
import mypkg
np.array([6, 7, 8])
3 比较对象是否是 None 时,一定要显式地用 is None
# 错误示例
def pay(name, salary=None):
if not salary:
salary = 11
print(name, "is compensated", salary, "dollars")
# 正确示例
def pay(name, salary=None):
if salary is None:
salary = 11
print(name, "is compensated", salary, "dollars")
4 dict.keys() 遍历前生成一个临时的列表
字典容量大的时候导致上面的代码消耗大量内存并且运行缓慢
# 错误示例
adict = {i: i * 2 for i in xrange(10000000)}
for key in adict.keys():
print("{0} = {1}".format(key, adict[key]))
# 正确示例
for key in adict:
5 注意使用 == 和 is 比较的差别
CPython(Python 的 C 实现)的实现中,把 -5 到 256 的整数做成了 singleton,也就是说,这个区间里的数字都会引用同一块内存区域
但是 -5 到 256 之外的数字,会因为你的重新定义而被重新分配内存
# 错误示例
x = 27
y = 27
print(x is y)
x = 721
y = 721
print(x is y)
# 正确示例
x = 27
y = 27
print(x == y)
x = 721
y = 721
print(x == y)
实例
class Model(network.Network):
def fit(self,
x=None,
y=None,
batch_size=None,
epochs=1,
verbose=1,
callbacks=None,
validation_split=0.,
validation_data=None,
shuffle=True,
class_weight=None,
sample_weight=None,
initial_epoch=0,
steps_per_epoch=None,
validation_steps=None,
validation_freq=1,
max_queue_size=10,
workers=1,
use_multiprocessing=False,
**kwargs):
# Legacy support
if 'nb_epoch' in kwargs:
logging.warning(
'The `nb_epoch` argument in `fit` has been renamed `epochs`.')
epochs = kwargs.pop('nb_epoch')
if kwargs:
raise TypeError('Unrecognized keyword arguments: ' + str(kwargs))
self._assert_compile_was_called()
func = self._select_training_loop(x)
return func.fit(
self,
x=x,
y=y,
batch_size=batch_size,
epochs=epochs,
verbose=verbose,
callbacks=callbacks,
validation_split=validation_split,
validation_data=validation_data,
shuffle=shuffle,
class_weight=class_weight,
sample_weight=sample_weight,
initial_epoch=initial_epoch,
steps_per_epoch=steps_per_epoch,
validation_steps=validation_steps,
validation_freq=validation_freq,
max_queue_size=max_queue_size,
workers=workers,
use_multiprocessing=use_multiprocessing)
-
文档规范
1 所有 import 尽量放在开头 2 不要使用 import 一次导入多个模块 3 from module import func 确保 func 在本文件中不会出现命名冲突 from module import func as new_func
-
注释规范
# This is an example to demonstrate how to comment.
# Please note this function must be used carefully.
def solve(x):
if x == 1: # This is only one exception.
return False
return True
-
文档描述
docstring
三个双引号开始、三个双引号结尾
class SpatialDropout2D(Dropout):
"""Spatial 2D version of Dropout.
This version performs the same function as Dropout, however it drops
entire 2D feature maps instead of individual elements. If adjacent pixels
within feature maps are strongly correlated (as is normally the case in
early convolution layers) then regular dropout will not regularize the
activations and will otherwise just result in an effective learning rate
decrease. In this case, SpatialDropout2D will help promote independence
between feature maps and should be used instead.
Arguments:
rate: float between 0 and 1. Fraction of the input units to drop.
data_format: 'channels_first' or 'channels_last'.
In 'channels_first' mode, the channels dimension
(the depth) is at index 1,
in 'channels_last' mode is it at index 3.
It defaults to the `image_data_format` value found in your
Keras config file at `~/.keras/keras.json`.
If you never set it, then it will be "channels_last".
Input shape:
4D tensor with shape:
`(samples, channels, rows, cols)` if data_format='channels_first'
or 4D tensor with shape:
`(samples, rows, cols, channels)` if data_format='channels_last'.
Output shape:
Same as input
References:
- [Efficient Object Localization Using Convolutional
Networks](https://arxiv.org/abs/1411.4280)
"""
def __init__(self, rate, data_format=None, **kwargs):
super(SpatialDropout2D, self).__init__(rate, **kwargs)
if data_format is None:
data_format = K.image_data_format()
if data_format not in {'channels_last', 'channels_first'}:
raise ValueError('data_format must be in '
'{"channels_last", "channels_first"}')
self.data_format = data_format
self.input_spec = InputSpec(ndim=4)
-
命名规范
变量命名 拒绝使用 a b c d 这样毫无意义的单字符 (唯一可以使用单字符的地方是迭代,比如 for i in range(n) 这种,为了精简可以使用) 使用小写,通过下划线串联 (data_format、input_spec) 常量 最好的做法是全部大写,并通过下划线连接,例如:WAIT_TIME、SERVER_ADDRESS、PORT_NUMBER 函数名 使用小写的方式,通过下划线连接 (launch_nuclear_missile()、check_input_validation() 类名 首字母大写,然后合并起来 (class SpatialDropout2D()、class FeatureSet())
…
拆分代码, 提高代码可读性
如何拆分类
class Person:
def __init__(self, name, sex, age, job_title, job_description, company_name):
self.name = name
self.sex = sex
self.age = age
self.job_title = job_title
self.job_description = description
self.company_name = company_name
# job 在其中出现了很多次,而且它们表达的是一个意义实体,这种情况下,我们可以考虑将这部分分解出来,作为单独的类
class Person:
def __init__(self, name, sex, age, job_title, job_description, company_name):
self.name = name
self.sex = sex
self.age = age
self.job = Job(job_title, job_description, company_name)
class Job:
def __init__(self, job_title, job_description, company_name):
self.job_title = job_title
self.job_description = description
self.company_name = company_name
合理使用 assert
什么是 assert?
Python 的 assert 语句
,可以说是一个 debug 的好工具
,主要用于测试一个条件是否满足
对代码做一些 internal 的 self-check
(表示你很确定。这个条件一定会发生或者一定不会发生)
测试的条件满足,则什么也不做,相当于执行了 pass 语句
测试条件不满足,便会抛出异常 AssertionError,并返回具体的错误信息(optional)
assert 语法
assert_stmt ::= "assert" expression ["," expression]
例子
assert 1 == 2, 'assertion is wrong'
# 相当于是
f __debug__:
if not 1==2:
raise AssertionError("assertion is wrong")
# 这里的__debug__是一个常数
# 如果 Python 程序执行时附带了 -O 这个选项,
# 比如Python test.py -O,那么程序中所有的 assert 语句都会失效
# 常数__debug__便为 False;反之__debug__则为 True
这里的__debug__
是一个常数。
如果 Python 程序执行时附带了-O
这个选项,比如Python test.py -O
,那么程序中所有的 assert 语句都会失效
,常数__debug__便为 False
;反之__debug__则为 True
。
不过,需要注意的是,直接对常数__debug__赋值是非法的
,因为它的值在解释器开始运行时就已经决定了,中途无法改变。
assert 的使用
assert 的加入,可以有效预防 bug 的发生,提高程序的健壮性
# 1
def apply_discount(price, discount):
updated_price = price * (1 - discount)
assert 0 <= updated_price <= price, 'price should be greater or equal to 0 and less or equal to original price'
# 检查折后价格,这个值必须大于等于 0、小于等于原来的价格,否则就抛出异常
return updated_price
# 2
def calculate_average_price(total_sales, num_sales):
assert num_sales > 0, 'number of sales should be greater than 0'
# 规定销售数目必须大于 0,这样就可以防止后台计算那些还未开卖的专栏的价格
return total_sales / num_sales
# 3
def func(input):
assert isinstance(input, list), 'input must be type of list'
# 下面的操作都是基于前提:input 必须是 list
if len(input) == 1:
...
elif len(input) == 2:
...
else:
...
assert 并不适用 run-time error 的检查。
比如你试图打开一个文件,但文件不存在;
再或者是你试图从网上下载一个东西,但中途断网了了等等,
这些情况下,还是应该捕捉 错误与异常的内容,进行正确处理
注意 assert 的安全性问题(用户信息相关):
assert 的检查是可以被关闭
def delete_course(user, course_id):
assert user_is_admin(user), 'user must be admin'
assert course_exist(course_id), 'course id must exist'
delete(course_id)
# 加入-O这个选项就会让 assert 失效。因此,一旦 assert 的检查被关闭,user_is_admin() 和 course_exist() 这两个函数便不会被执行
# 导致:
# 任何用户都有权限删除专栏课程;
# 并且,不管这个课程是否存在,他们都可以强行执行删除操作
上下文管理 – 资源管理操作(释放资源)
文件的输入输出
、数据库的连接断开
等,都是很常见的资源管理操作
。
但资源都是有限的,写程序时,我们必须保证这些资源在使用过后得到释放,不然就容易造成资源泄露
,
轻者使得系统处理缓慢,重则会使系统崩溃。
# 打开了太多的文件,占据了太多的资源,造成系统崩溃
for x in range(10000000):
f = open('test.txt', 'w')
f.write('hello')
# with 语句 (上下文管理器)
for x in range(10000000):
with open('test.txt', 'w') as f:
f.write('hello')
# 相当于
f = open('test.txt', 'w')
try:
f.write('hello')
finally:
f.close()
上下文管理器的实现
-
基于类
的上下文管理器保证这个类包括 方法”__enter__()”- 返回需要被管理的资源 方法“__exit__()” - 存在一些释放、清理资源的操作,比如这个例子中的关闭文件等等
# 自定义上下文管理类 FileManager
class FileManager:
def __init__(self, name, mode):
print('calling __init__ method')
self.name = name
self.mode = mode
self.file = None
def __enter__(self):
print('calling __enter__ method')
self.file = open(self.name, self.mode)
return self.file
def __exit__(self, exc_type, exc_val, exc_tb):
# 参数“exc_type, exc_val, exc_tb”,分别表示 exception_type、exception_value 和 traceback
# 有异常抛出,异常的信息就会包含在这三个变量中,传入
print('calling __exit__ method')
if self.file:
self.file.close()
with FileManager('test.txt', 'w') as f:
print('ready to write to file')
f.write('hello world')
## 输出
calling __init__ method
calling __enter__ method
ready to write to file
calling __exit__ method
内部调用过程:
1 方法“__init__()”被调用,程序初始化对象 FileManager,使得文件名(name)是"test.txt",文件模式 (mode) 是'w';
2 方法“__enter__()”被调用,文件“test.txt”以写入的模式被打开,并且返回 FileManager 对象赋予变量 f;
3 字符串“hello world”被写入文件“test.txt”;
4 方法“__exit__()”被调用,负责关闭之前打开的文件流
__exit__
中处理异常:
class Foo:
def __init__(self):
print('__init__ called')
def __enter__(self):
print('__enter__ called')
return self
def __exit__(self, exc_type, exc_value, exc_tb):
print('__exit__ called')
if exc_type:
print(f'exc_type: {exc_type}')
print(f'exc_value: {exc_value}')
print(f'exc_traceback: {exc_tb}')
print('exception handled')
return True
# 如果方法“__exit__()”没有返回 True,异常仍然会被抛出
with Foo() as obj:
raise Exception('exception raised').with_traceback(None)
# 手动抛出了异常“exception raised”
# 输出
__init__ called
__enter__ called
__exit__ called
exc_type: <class 'Exception'>
exc_value: exception raised
exc_traceback: <traceback object at 0x1046036c8>
exception handled
- 基于
生成器
的上下文管理器
使用装饰器 contextlib.contextmanager
,来定义自己所需的基于生成器的上下文管理器
,用以支持 with
语句
from contextlib import contextmanager
@contextmanager
def file_manager(name, mode):
try:
f = open(name, mode)
yield f
finally:
f.close()
# 函数 file_manager() 是一个生成器
with file_manager('test.txt', 'w') as f:
f.write('hello world')
# 执行 with 语句时,便会打开文件,并返回文件对象 f
# 当 with 语句执行完后,finally block 中的关闭文件操作便会执行
注意释放资源
:
方法“__exit__()”或者是 finally block 中释放资源
单元测试 unit test
测试驱动开发(TDD)
单元测试
, 就是编写测试来验证某一个模块的功能正确性
一般会指定输入,验证输出是否符合预期。
单元测试的使用
import unittest
# 将要被测试的排序函数
def sort(arr):
l = len(arr)
for i in range(0, l):
for j in range(i + 1, l):
if arr[i] >= arr[j]:
tmp = arr[i]
arr[i] = arr[j]
arr[j] = tmp
# 编写子类继承 unittest.TestCase
class TestSort(unittest.TestCase):
# 以 test 开头的函数将会被测试
def test_sort(self):
arr = [3, 4, 1, 5, 6]
sort(arr)
# assert 结果跟我们期待的一样
self.assertEqual(arr, [1, 3, 4, 5, 6])
if __name__ == '__main__':
## 如果在 Jupyter 下,请用如下方式运行单元测试
# unittest.main(argv=['first-arg-is-ignored'], exit=False)
## 如果是命令行下运行,则:
unittest.main()
## 输出
# ----------------------------------------------------------------------
# Ran 1 test in 0.000s
#
# OK
1 创建一个类TestSort 继承类‘unittest.TestCase’
2 定义相应的测试函数 test_sort(),进行测试(测试函数要以‘test’开头)
内部,通常使用 assertEqual()、assertTrue()、assertFalse() 和 assertRaise() 等 assert 语句对结果进行验证
3 unittest.main() 运行测试
单元测试的几个技巧
-
mock
(通过一个虚假对象,来代替被测试函数或模块需要的对象)单元测试中最核心重要的一环 mock 替换相关的依赖项(函数)
比如你要测一个后端 API 逻辑的功能性,但一般后端 API 都依赖于数据库、文件系统、网络等。 这样,你就需要通过 mock,来创建一些虚假的数据库层、文件系统层、网络层对象,以便可以简单地对核心后端逻辑单元进行测试
>>> 测试逻辑调用
mock side_effect
mock 的函数,属性是可以根据不同的输入,返回不同的数值,而不只是一个 return_value
import unittest
from unittest.mock import MagicMock
from functools import partial
def side_effect_m2(a):
return a
class A(unittest.TestCase):
def m1(self):
val = self.m2()
self.m3(val)
def m2(self):
...
def mm(self):
...
def m3(self, value):
...
def test_m1(self):
a = A()
# a.m2 = MagicMock()
# a.m2.side_effect = partial(side_effect_m2, "customer_val") // 自定义函数
a.m2 = MagicMock()
a.m2 = MagicMock(return_value="customer_val") # 设置返回值
a.m3 = MagicMock()
a.mm = MagicMock()
a.m1()
self.assertTrue(a.m2.called) # 是否被调用
a.m3.assert_called_with("customer_val") # 是否传入 指定参数 调用
if __name__ == '__main__':
unittest.main()
-
patch (应用 Python 的
decoration 模式
或是context manager
概念,快速自然地 mock 所需的函数)给指定的对象打补丁, 用来断言它们在测试中的期望行为(比如,断言被调用时的参数个数,访问指定的属性等)
from unittest.mock import patch
import example
@patch('example.func')
def test1(x, mock_func):
example.func(x) # Uses patched example.func
mock_func.assert_called_with(x)
# 它还可以被当做一个上下文管理器:
with patch('example.func') as mock_func:
example.func(x) # Uses patched example.func
mock_func.assert_called_with(x)
高质量的单元测试
提高 Test Coverage
+ 模块化
coverage tool 来衡量 Test Coverage
# 模块化
def preprocess(arr):
...
...
return arr
def sort(arr):
...
...
return arr
def postprocess(arr):
...
return arr
def work(self):
arr = preprocess(arr)
arr = sort(arr)
arr = postprocess(arr)
return arr
from unittest.mock import patch
def test_preprocess(self):
...
def test_sort(self):
...
def test_postprocess(self):
...
@patch('%s.preprocess')
@patch('%s.sort')
@patch('%s.postprocess')
def test_work(self,mock_post_process, mock_sort, mock_preprocess):
work()
self.assertTrue(mock_post_process.called)
self.assertTrue(mock_sort.called)
self.assertTrue(mock_preprocess.called)
pdb & cProfile
:调试和性能分析
调试和性能分析的主要场景:
一是代码本身有问题,需要我们找到 root cause 并修复;
二是代码效率有问题,比如过度浪费资源,增加 latency,因此需要我们 debug;
三是在开发新的 feature 时,一般都需要测试
用
pdb
进行代码调试
为 Python 程序提供了 交互式的源代码调试功能,
是命令行版本的 IDE 断点调试器
- 使用步骤
def func():
print('enter func()')
a = 1
b = 2
import pdb
pdb.set_trace()
# 程序已经运行到了“pdb.set_trace()”这行,并且暂停了下来,等待用户输入
func()
c = 3
print(a + b + c)
#
1 打印,语法是"p <expression>": p a p b
2 “n”,表示继续执行代码到下一行
3 ”l“, 表示列举出当前代码行上下的 11 行源代码(箭头标识当前位置)
4 ”s“ 进入了函数 func() 的内部
当执行完函数 func() 内部语句并跳出后,显示”--Return--“
5 命令”b [ ([filename:]lineno | function) [, condition] ]“可以用来设置断点。比方说,我想要在代码中的第 10 行,再加一个断点,那么在 pdb 模式下输入”b 11“即可。
而”c“则表示一直执行程序,直到遇到下一个断点。
cProfile
进行性能分析
profile,是指对代码的每个部分进行动态的分析
比如准确计算出每个模块消耗的时间
计算斐波拉契数列:
def memoize(f):
memo = {}
def helper(x):
if x not in memo:
memo[x] = f(x)
return memo[x]
return helper
# 1 先计算
def fib(n):
if n == 1:
return 1
elif n == 2:
return 1
else:
return fib(n - 1) + fib(n - 2)
def fib_seq(n):
res = []
if n > 1:
res.extend(fib_seq(n - 1))
res.append(fib(n))
return res
# 2 加缓存
@memoize
def fib1(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def fib_seq1(n):
res = []
if n > 0:
res.extend(fib_seq1(n - 1))
res.append(fib(n))
return res
# 用了更多的 容器方法(sum, append...)
def fib2(n):
if n == 1:
ret = [1]
return ret
elif n == 2:
ret = [1, 1]
return ret
else:
ret = []
ret.extend(fib2(n - 1))
ret.append(sum(fib2(n-1)[-2:]))
return ret
# 加缓存
@memoize
def fib3(n):
if n == 1:
ret = [1]
return ret
elif n == 2:
ret = [1, 1]
return ret
else:
ret = []
ret.extend(fib3(n - 1))
ret.append(sum(fib3(n-1)[-2:]))
return ret
import cProfile as cp
cp.run("fib(23)")
cp.run("fib1(23)")
cp.run("fib2(23)")
cp.run("fib3(23)")
# 1 没有缓存
1664082 function calls (4 primitive calls) in 0.701 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.701 0.701 <string>:1(<module>)
1664079/1 0.701 0.000 0.701 0.701 mat.py:1(fib)
1 0.000 0.000 0.701 0.701 {built-in method builtins.exec}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
# 2 给 计算函数 增加缓存
57317 function calls (7 primitive calls) in 0.040 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.040 0.040 <string>:1(<module>)
57312/2 0.040 0.000 0.040 0.020 mat.py:1(fib)
1 0.000 0.000 0.040 0.040 mat.py:25(helper)
1 0.000 0.000 0.040 0.040 mat.py:33(fib1)
1 0.000 0.000 0.040 0.040 {built-in method builtins.exec}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
# 3 调用 sum, append 实现
10485759 function calls (6291457 primitive calls) in 6.082 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 6.082 6.082 <string>:1(<module>)
4194303/1 4.919 0.000 6.082 6.082 mat.py:54(fib2)
1 0.000 0.000 6.082 6.082 {built-in method builtins.exec}
2097151 0.518 0.000 0.518 0.000 {built-in method builtins.sum}
2097151 0.286 0.000 0.286 0.000 {method 'append' of 'list' objects}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
2097151 0.360 0.000 0.360 0.000 {method 'extend' of 'list' objects}
# 4 给 计算函数 增加缓存(从小到大依次增加缓存) (V++++)
131 function calls (68 primitive calls) in 0.000 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.000 0.000 <string>:1(<module>)
43/1 0.000 0.000 0.000 0.000 mat.py:25(helper)
22/1 0.000 0.000 0.000 0.000 mat.py:71(fib3)
1 0.000 0.000 0.000 0.000 {built-in method builtins.exec}
21 0.000 0.000 0.000 0.000 {built-in method builtins.sum}
21 0.000 0.000 0.000 0.000 {method 'append' of 'list' objects}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
21 0.000 0.000 0.000 0.000 {method 'extend' of 'list' objects}
性能指标:
ncalls 是指相应代码 / 函数被调用的次数;
tottime 是指对应代码 / 函数总共执行所需要的时间(注意,并不包括它调用的其他代码 / 函数的执行时间)
tottime percall 就是上述两者相除的结果,也就是tottime / ncalls;
cumtime 是指对应代码 / 函数总共执行所需要的时间,这里包括了它调用的其他代码 / 函数的执行时间;
cumtime percall 则是 cumtime 和 ncalls 相除的平均结果
使用方式:
python3 -m cProfile xxx.py
import cProfile
# def fib(n)
# def fib_seq(n):
cProfile.run('fib_seq(30)')