基本概念
确认python版本
# 命令行
python --version
# 程序内
import sys
print(sys.version_info)
print(sys.version)
# 2to3 与 six 等工具可以把代码轻松地适配到 Python 3及其后续版本上面
遵循 PEP 8
-
空白
1 使用 space(空格)来表示缩进 2 每行的字符数不应超过 79 3 对于占据多行的长表达式来说,除了首行之外的其余各行都应该在通常的缩进级别之上再加 4 个空格 4 函数与类之间应该用 两个空行 隔开 5 在同一个类中,各方法之间应该用 一个空行 隔开 6 在使用下标来获取列表元素、调用函数或给关键字参数赋值的时候,不要在两旁添加空格 7 为变量赋值的时候,赋值符号的左侧和右侧应该各自写上一个空格,而且只写就好
-
命名
1 函数、变量及属性应该用小写字母来拼写,各单词之间以下划线相连 lowercase_underscore 2 受保护的实例属性,应该以单个下划线开头,例如,_leading_underscore 3 私有的实例属性,应该以两个下划线开头,例如,__double_leading_underscore 4 类与异常,应该以每个单词首字母均大写的形式来命名,例如,CapitalizedWord 5 模块级别的常量,应该全部采用大写字母来拼写,各单词之间以下划线相连,例如,ALL_CAPS 6 类中的实例方法(instance method),应该把首个参数命名为 self,以表示该对象自身 7 类方法(class method)的首个参数,应该命名为 cls,以表示该类自身
-
表达式和语句
1 采用内联形式的否定词,而不要把否定词放在整个表达式的前面,例如,应该写 if a is not b 而不是 if not a is b 2 不要通过检测长度的办法(如 if len(somelist) == 0)来判断 somelist 是否为 []或 '' 等空值,而是应该采用 if not somelist 这种写法来判断,它会假定:空值将自动评估为 False 3 检测 somelist 是否为 [1] 或 'hi' 等非空值时,也应如此,if somelist 语句默认会把非空的值判断为 True 4 不要编写单行的 if 语句、for 循环、while 循环及 except 复合语句,而是应该把这些语句分成多行来书写,以示清晰 5 import 语句应该总是放在文件开头 6 引入模块的时候,总是应该使用绝对名称,而不应该根据当前模块的路径来使用相对名称。例如,引入 bar 包中的 foo 模块时,应该完整地写出 from bar import foo,而不应该简写为 import foo 7 import 语句应该按顺序划分成三个部分,分别表示标准库模块、第三方模块、自用模块
了解 bytes
、str
与 unicode
的区别
Python 3 有两种表示字符序列的类型
:bytes 和 str。
bytes
的实例包含原始的 8 位值
str
的实例包含 Unicode 字符
程序的核心部分应该使用 Unicode 字符类型(也就是 Python 3 中的 str、Python 2 中的 unicode),而且不要对字符编码做任何假设
def to_str(bytes_or_str):
if isinstance(bytes_to_str, bytes):
value = bytes_or_str.decode('utf-8')
else:
value = bytes_to_str
return value
def to_bytes(bytes_or_str):
if isinstance(bytes_to_str, str):
value = bytes_or_str.encode('utf-8')
else:
value = bytes_to_str
return value
用列表推导来取代 map
和 filter
a = [1, 2, 3, 4, 5, 6]
squares = [x**2 for x in a]
# squares = map(lambda x: x**2, a)
用生成器表达式来改写数据量较大的列表推导
当输入的数据量较大时,列表推导可能会因为占用太多内存而出问题。
由生成器表达式所返回的迭代器,可以逐次产生输出值,从而避免了内存用量问题
把某个生成器表达式所返回的迭代器,放在另一个生成器表达式的 for 子表达式中,即可将二者组合起来
it = (len(x) for x in open('/tmp/xxx.txt'))
roots = ((x, x**2) for x in it)
# 这种连锁生成器表达式,可以迅速在 Python 中执行
# 面这种连锁生成器表达式,可以迅速在 Python 中执行
尽量用 enumerate
取代 range
enumerate 函数提供了一种精简的写法,可以在遍历迭代器时获知每个元素的索引
用 zip
函数同时遍历两个迭代器
Python 3 中的 zip 相当于生成器,会在遍历过程中逐次产生元组
如果提供的迭代器长度不等,那么 zip 就会自动提前终止
itertools
内置模块中的 zip_longest
函数可以平行地遍历多个迭代器,而不用在乎
它们的长度是否相等
a = [1 , 2, 3, ]
b = ["a", "ab", "abc", "apple"]
# 如果提供的迭代器长度不等,那么 zip 就会自动提前终止
for name, count in zip(b, a):
print(name, count)
# a 1
# ab 2
# abc 3
# `itertools `内置模块中的 `zip_longest` 函数可以平行地遍历多个迭代器,而不用在乎
# 它们的长度是否相等
from itertools import zip_longest
for name, count in zip_longest(b, a):
print(name, count)
# a 1
# ab 2
# abc 3
# apple None
需要注意 try ... else ...
, for ... else ...
代码块
1 Python 有 种 特 殊 语 法, 可 在 for 及 while 循环的内部语句块之后紧跟一个else 块。
2 只有当整个循环主体都没遇到 break 语句时,循环后面的 else 块才会执行。
3 不要在循环后面使用 else 块,因为这种写法既不直观,又容易引人误解。
合理利用 try/except/else/finally
结构中的每个代码块
1 无论 try 块是否发生异常,都可利用 try/finally 复合语句中的 finally 块来执行清理工作。
2 else 块可以用来缩减 try 块中的代码量,并把没有发生异常时所要执行的语句与try/except 代码块隔开。
3 顺利运行 try 块后,若想使某些操作能在 finally 块的清理代码之前执行,则可将这些操作写到 else 块中
####
函数
尽量用异常来表示特殊情况,而不要返回 None
1 用 None 这个返回值来表示特殊意义的函数,很容易使调用者犯错,因为 None
和 0 及空字符串之类的值,在条件表达式里都会评估为 False。
2 函数在遇到特殊情况时,应该抛出异常,而不要返回 None。调用者看到该函数
的文档中所描述的异常之后,应该就会编写相应的代码来处理它们了
考虑用生成器
来改写直接返回列表的函数
1 使用生成器比把收集到的结果放入列表里返回给调用者更加清晰。
2 由生成器函数所返回的那个迭代器,可以把生成器函数体中,传给 yield 表达式
的那些值,逐次产生出来。
3 无论输入量有多大,生成器都能产生一系列输出,因为这些输入量和输出量,
都不会影响它在执行时所耗的内存。
元类 metaclass
可以在每次定义具体的类的时候, 提供独特的行为
注意要遵循 最小惊讶原则(rule of least surprise
)
用纯属性取代 get 和 set (设置,读取属性)
# bad
class OldRegistor(object):
def __init__(self, ohms):
self._ohms = ohms
def get_ohms(self):
return self._ohms
def set_ohms(self, ohms):
self._obms = ohms
r0 = OldRegistor(50e3)
r0.set_ohms(10e3)
r0.set_ohms(r0.get_ohms() + 5e3)
# good
class Registor(object):
def __init__(self, ohms):
self.ohms = ohms
self.voltage = 0
self.current = 0
r1 = Registior(50e3)
r1.ohms = 10e3
r1.ohms += 5e3
# 设置特殊行为(属性间的关联行为)
class VoltageRegistor(Registor):
def __init__(self, ohms):
super().__init__(ohms)
self._voltage = 0
@property
def voltage(self):
return self._voltage
@voltage.setter
def voltage(self, voltage):
self._voltage = voltage
self.current = self._voltage / self.ohms
r2 = VoltageRegistor(1e3)
r2.voltage = 10 # 执行voltage.setter, self.current = 0.01
# 验证属性的类型和数值
class BoundedRegistor(Registor):
def __int__(self, ohms):
super(BoundedRegistor, self).__int__(ohms)
@property
def ohms(self):
return self._ohms
@ohms.setter
def ohms(self, ohms):
if ohms <= 0:
raise ValueError(f"{ohms} ohms must be > 0")
self._ohms = ohms
r3 = BoundedRegistor(1e3)
# r3.ohms = 0
# raise ValueError(f"{ohms} ohms must be > 0")
r3.ohms = 1
print(r3.__dict__)
# 防止修改父类的属性
@ohms.setter
def ohms(self, ohms):
if hasattr(self, '_ohms'):
raise AttributeError("can't set attribute")
self._ohms = ohms
r4 = BoundedRegistor(1e3)
r4.ohms = 0
# raise attribute error
# property 给现有的实例 添加新的功能
from datetime import timedelta, datetime
class Bucket(object):
def __int__(self, period):
self.period_delta = timedelta(seconds=period)
self.reset_time = datetime.now()
self.max_quota = 0
self.quota_consumed = 0
def __repr__(self):
return f'Bucket(max_quota={self.max_quota}, quota_consumed={self.quota_consumed})'
@property
def quota(self):
return self.max_quota - self.quota_consumed
@quota.setter
def quota(self, amount):
delta = self.max_quota - amount
if delta == 0:
self.quota_consumed = 0
self.max_quota = 0
elif delta < 0:
assert self.quota_consumed == 0
self.max_quota = amount
else:
assert self.max_quota >= self.quota_consumed
self.quota_consumed += delta
@property
方法执行速度需要快一些, 缓慢或者复杂的工作应该放在普通的方法里面
描述符改写需要重复使用的 @property
方法
python 内置的 @property
修饰器, 不方便复用
@property
def writing_grade(self):
...
@writing_grade.setter
def writing_grade(self):
...
@property
def math_grade(self):
...
@math_grade.setter
def math_grade(self):
...
Python 会对访问操作进行一定的转译 根据描述器协议
基于 object类的 __getattribute__
描述符可以把同一套逻辑运用在类中的不同属性上
class Grade(object):
def __get__(self, instance, owner):
...
def __set__(self, instance, value):
...
class Exam(object):
math_grade = Grade()
writing_grade = Grade()
science_grade = Grade()
exam = Exam()
# 属性赋值
exam.writing_grade = 40
Exam.__dict__["writing_grade"].__set__(exam, 40)
# 获取属性
print(exam.writing_grade)
print(Exam.__dict__["writing_grade"].__get__(exam, Exam))
# 如果exam实例没有 writing_grade, python 会转向 Exam
from weakref import WeakKeyDictionary
class Grade(object):
def __init__(self):
# self.values = {} # 程序生命周期内, values都会保存一份 exam实例的引用 (内存泄漏)
self.values = WeakKeyDictionary() # 如果没有强引用, 会把这key 删除
def __get__(self, instance, owner):
if instance is None:
return self
return self.values.get(instance)
def __set__(self, instance, value):
if not (0 <= value <= 100):
raise ValueError("grade must be between 0 and 100")
# print(self.__dict__)
if value <= 60:
value = "不合格"
elif 60 < value <= 85:
value = "不错"
else:
value = "优秀"
self.values[instance] = value
class Exam(object):
math_grade = Grade()
writing_grade = Grade()
science_grade = Grade()
first_exam = Exam()
# second_exam = Exam()
first_exam.writing_grade = 82
first_exam.science_grade = 56
first_exam.math_grade = 98
# second_exam.writing_grade = 75
print(f"writing_grade: {first_exam.writing_grade}")
print(f"science_grade: {first_exam.science_grade}")
print(f"math_grade: {first_exam.math_grade}")
writing_grade: 不错
science_grade: 不合格
math_grade: 优秀
用 __getattr__
, __getattribute__
和 __setattr__
实现 按需生成的属性
class LazyDB(object):
def __init__(self):
self.exists = 5
def __getattr__(self, name):
value = f"Value for {name}"
setattr(self, name, value)
return value
def __setattr__(self, key, value):
# 只要是实例的赋值, 都会触发 __setattr__
super(LazyDB, self).__setattr__(key, value)
def __getattribute__(self, name):
# 只要是实例属性获取, 都会触发 __getattribute__
return super(LazyDB, self).__getattribute__(name)
# 查询特定的数据
# specified_data = super(LazyDB, self).__getattribute__("specified_data")
# return specified_data["name"]
data = LazyDB()
# data.a = 1
print(data.__dict__)
print(data.__dict__)
# print(data.__dict__) # {'exists': 5}
# print(data.foo) # Value for foo
# print(data.__dict__) # {'exists': 5, 'foo': 'Value for foo'}
用元类验证子类
# 生成对象前 验证子类的定义
class Meta(type):
def __new__(meta, name, bases, class_dict):
print(meta, name, bases, class_dict)
if class_dict["stuff"] <= 3:
raise ValueError("stuff should be over 3")
return type.__new__(meta, name, bases, class_dict)
class MyClass(object, metaclass=Meta):
stuff = 123
def foo(self):
pass
用元类注解类的属性
在类定义之前, 率先修改该类的属性
class Meta(type):
def __new__(meta, name, bases, class_dict):
for key, value in class_dict.items():
if isinstance(value, Field):
value.name = key
value.internal_name = "_" + key
cls = type.__new__(meta, name, bases, class_dict)
return cls
class DatabaseRow(object, metaclass=Meta):
...
class Field(object):
def __init__(self, name):
self.name = name
self.internal_name = "_" + self.name
def __get__(self, instance, instance_type):
if instance is None:
return self
return getattr(instance, self.internal_name, '')
def __set__(self, instance, value):
setattr(instance, self.internal_name, value)
class Customer(object):
first_name = Field("first_name")
last_name = Field("last_name")
prefix = Field("prefix")
suffix = Field("suffix")
class BetterField(object):
def __init__(self):
self.name = None
self.internal_name = None
class BetterCustomer(DatabaseRow):
first_name = BetterField()
last_name = BetterField()
prefix = BetterField()
suffix = BetterField()
并发与并行
使用subprocess
模块来管理子进程
# 子进程独立于父进程(python解释器)运行
#
import subprocess
#
# proc = subprocess.Popen(
# ["echo", "hello world"],
# stdout=subprocess.PIPE
# )
# out, err = proc.communicate()
#
# print(out.decode('utf-8'), err)
# 管理输入流和输出流
import subprocess
import os
def run_md5(input_stdin):
proc = subprocess.Popen(
['md5'],
stdin=input_stdin,
stdout=subprocess.PIPE,
)
return proc
def run_openssl(data):
env = os.environ.copy()
env["password"] = b'xe24U\n\xd0Q135\x11'
proc = subprocess.Popen(
['openssl', 'enc', '-des3', '-pass', 'env:password'],
env=env,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
proc.stdin.write(data)
proc.stdin.flush() # ensure child get input
return proc
input_procs = []
hash_procs = []
for _ in range(3):
data = os.urandom(10)
proc = run_openssl(data)
input_procs.append(proc)
hash_proc = run_md5(proc.stdout)
hash_procs.append(hash_proc)
for proc in input_procs:
proc.communicate()
for proc in hash_procs:
out, err = proc.communicate(time_out=3) # 增加超时
print(out.strip())
使用线程执行阻塞式的I/O, 但是不要用来平行计算
CPython 分两步运行
1 文本形式的源代码 --> 解析并编译成字节码
2 用一种基于栈的解释器运行 这份字节码
在字节码解释器运行时, 通过GIL来保证协调一致性
C++
和 Java
等语言可以同时执行多条线程, 可以充分利用多核心
python
在同一时刻只能有一个线程向前执行
from threading import Thread
import time
def factorize(number):
for i in range(1, number + 1):
if number % i == 0:
yield i
numbers = [2139079, 1232344, 2132332, 21321312]
start = time.time()
for number in numbers:
list(factorize(number))
# 2.261014461517334 串行
print(f"{time.time() - start}")
class FactorizeThread(Thread):
def __init__(self, number):
super(FactorizeThread, self).__init__()
self.number = number
def run(self):
self.factors = list(factorize(self.number))
start_ = time.time()
threads = []
for number in numbers:
thread = FactorizeThread(number)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print(f"{time.time() - start}") # 4.63801383972168
# 多线程时间更慢
因为 GIL (全局解释器锁)的限制, 多条python线程不能再多个CPU核心上 平行的执行
Lock
防止线程间数据竞争
GIL
不能保证 线程间的数据安全 (线程可能中断,无法保证数据的一致性)
from threading import Thread, Lock
class LockingCounter(object):
def __init__(self):
self.lock = Lock()
self.count = 0
def increment(self, offset):
with self.lock:
self.count += offset
def run_threads(func, how_many, counter):
threads = []
for i in range(5):
args = (i, how_many, counter)
thread = Thread(target=func, args=args)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
def woker(sensor_index, how_many, counter):
for _ in range(how_many):
counter.increment(1)
how_many = 10 ** 5
counter = LockingCounter()
run_threads(woker, how_many, counter)
print(f"counter should be {5 * how_many}, got {counter.count}")
# counter should be 500000, got 500000
Queue
协调线程间的工作
内置的queue.Queue
get
会持续阻塞, 知道有新的数据加入
# 自定义 Queue 重写 __iter__
from queue import Queue
from threading import Thread
class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return
yield item
finally:
self.task_done()
class StoppableWorker(Thread):
def __init__(self, func, in_queue: ClosableQueue, out_queue):
super(StoppableWorker, self).__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)
threads = [
# StoppableWorker(...),
# ...
]
in_queue = ClosableQueue()
for thread in threads:
thread.start()
for _ in range(100):
in_queue.put(object())
in_queue.close() # put signal
in_queue.join()
# ...
协程并发运行多个函数
协程的实现方式实际是一种对生成器
的扩展
启动生成器协程的开销
和调用函数的开销相仿
处于活跃状态的协程, 在耗尽前, 只会占用不到1KB的内存
工作原理:
def my_coroutine():
while 1:
received = yield
print(f"received: {received}")
it = my_coroutine()
next(it) # start
# it.send(None) # start
# can't send non-None value to a just-started generator
it.send("first")
it.send("second")
内置模块
考虑contextlib
和 with
改写可复用的 try .. finally..
代码
定义一种情景管理器, 临时改变上下文代码块
的条件
from contextlib import contextmanager
import logging
def my_function():
logging.debug("debug")
logging.error("error")
logging.debug("debug")
@contextmanager
def debug_logging(level):
logger = logging.getLogger()
old_level = logger.getEffectiveLevel()
logger.setLevel(level)
try:
yield
finally:
logger.setLevel(old_level)
my_function() # ERROR:root:error
with debug_logging(logging.DEBUG):
my_function()
# DEBUG:root:debug
# ERROR:root:error
# DEBUG:root:debug
my_function()
# ERROR:root:error
pickle操作并不可靠
pickle序列化的数据是不安全
的数据格式
1 经过pickle处理的数据(原始的python对象), 如果混入恶意的信息, 反序列化的时候可能对程序有损害
2 json的序列化(只是简单的描述信息)
不信任的通讯, 需要使用 JSON
使用singledispatch
实现泛型
len
, type
, print
等函数可以对任意的数据类型进行处理.
使用singledispatch
来实现
from functools import singledispatch, wraps
def check_type(func):
@wraps(func)
def wrapper(*args):
first_arg = args[0]
type_first_arg = type(first_arg)
# print(f"first_arg is type: {type_first_arg}")
for item in args[1:]:
if not isinstance(item, type_first_arg):
raise ValueError(f"all value should be with same type")
return func(*args)
return wrapper
@singledispatch
def join_all(*args):
raise TypeError
@join_all.register(str)
@check_type
def _(*args):
return ''.join(args)
@join_all.register(int)
@check_type
def _(*args):
s = 0
for i in args:
s += i
return s
@join_all.register(list)
@check_type
def _(*args):
s = []
for i in args:
s.extend(i)
return s
@join_all.register(dict)
@check_type
def _(*args):
s = {}
for i in args:
s.update(i)
return s
@join_all.register(set)
@check_type
def _(*args):
s = set()
for i in args:
s.update(i)
return s
@join_all.register(tuple)
@check_type
def _(*args):
s = tuple()
for i in args:
s += i
return s
print(join_all(1, 2, 3))
print(join_all("a", "b", "c"))
print(join_all([1,2], [2,3], [34]))
print(join_all({"a":1}, {"b":2}, {"a":3}))
print(join_all((1,2), (2,3), (3,4)))
print(join_all({1,2}, {2,3}, {2,3}))
print(join_all({1,2}, {2,3}, {2,3}))