Day37 - 迭代器与生成器详解

详细讲解

1. 迭代器基础

1.1 可迭代对象与迭代器

可迭代对象(Iterable):可以使用 for 循环遍历的对象,如 list, dict, str, file 等。

迭代器(Iterator):实现了 __iter__()__next__() 方法的对象。

# 可迭代对象
numbers = [1, 2, 3]  # list 是可迭代对象

# 获取迭代器
iterator = iter(numbers)
print(f"迭代器: {iterator}")

# 手动迭代
print(next(iterator))  # 1
print(next(iterator))  # 2
print(next(iterator))  # 3
print(next(iterator))  # StopIteration 异常

1.2 迭代器协议

# 迭代器必须实现的两个方法
class MyIterator:
    def __init__(self, data):
        self.data = data
        self.index = 0
    
    def __iter__(self):
        """返回迭代器本身"""
        return self
    
    def __next__(self):
        """返回下一个元素"""
        if self.index >= len(self.data):
            raise StopIteration
        value = self.data[self.index]
        self.index += 1
        return value

# 使用
it = MyIterator([10, 20, 30])
for item in it:
    print(item)

1.3 iter() 和 next() 内置函数

# iter() 可以接受两个参数:可迭代对象 + 哨兵值
# 这样会在每次 next() 时调用可迭代对象,直到返回值等于哨兵值

# 示例:读取行直到空行
with open('lines.txt', 'r') as f:
    for line in iter(f.readline, ''):
        print(line.strip())

# next() 可以指定默认值
numbers = [1, 2, 3]
it = iter(numbers)
print(next(it, '默认值'))  # 1
print(next(it, '默认值'))  # 2
print(next(it, '默认值'))  # 3
print(next(it, '默认值'))  # 默认值(不会抛出 StopIteration)

2. 生成器基础

生成器是一种特殊的迭代器,使用 yield 关键字来产生值,而不是 return

2.1 生成器函数

def countdown(n):
    """倒计时生成器"""
    print(f"开始倒计时从 {n}")
    while n > 0:
        yield n  # 暂停,返回值
        n -= 1
    print("倒计时结束")

# 创建生成器对象(函数不会立即执行)
gen = countdown(5)
print(f"生成器对象: {gen}")

# 迭代获取值
print(next(gen))  # 5
print(next(gen))  # 4
print(next(gen))  # 3

# 也可以用 for 循环
for num in countdown(3):
    print(f"倒计时: {num}")

2.2 生成器表达式

# 类似列表推导式,但使用圆括号(惰性求值)
gen = (x ** 2 for x in range(10))
print(gen)  # <generator object>

# 惰性求值 - 每次只计算一个值
for value in gen:
    print(value)

# 转换为列表
squares = list(x ** 2 for x in range(5))
print(squares)  # [0, 1, 4, 9, 16]

# 在函数参数中使用
print(sum(x ** 2 for x in range(5)))  # 30
print(max(x for x in range(10) if x % 2 == 0))  # 8

2.3 生成器 vs 列表

import sys

# 列表 - 一次性生成所有元素
list_comp = [x ** 2 for x in range(10000)]
print(f"列表大小: {sys.getsizeof(list_comp)} 字节")  # ~80KB

# 生成器 - 惰性求值,节省内存
gen_exp = (x ** 2 for x in range(10000))
print(f"生成器大小: {sys.getsizeof(gen_exp)} 字节")  # ~200 bytes

# 对比:生成 1000 万个数字的内存使用
def using_list():
    return [x for x in range(10000000)]

def using_generator():
    return (x for x in range(10000000))

import sys
print(f"列表: {sys.getsizeof(using_list())} bytes")
print(f"生成器: {sys.getsizeof(using_generator())} bytes")
# 列表: 约 80MB
# 生成器: 约 200 bytes

3. 生成器的实际应用

3.1 处理大数据文件

def read_large_file(filepath, chunk_size=1024):
    """分块读取大文件(节省内存)"""
    with open(filepath, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

# 使用示例 - 处理大日志文件
def find_errors(log_file):
    """从大日志文件中查找错误"""
    error_count = 0
    for chunk in read_large_file(log_file):
        error_count += chunk.count(b'ERROR')
    return error_count

# 使用
for chunk in read_large_file('huge_file.dat'):
    process(chunk)  # 每块 1KB

3.2 无限序列

def fibonacci():
    """无限斐波那契数列生成器"""
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

# 获取前 10 个斐波那契数
fib = fibonacci()
for _ in range(10):
    print(next(fib), end=' ')  # 0 1 1 2 3 5 8 13 21 34

# 生成无限自然数
def natural_numbers():
    n = 1
    while True:
        yield n
        n += 1

# 累加前 100 个自然数的平方
result = sum(x**2 for x in range(1, 101))

3.3 管道处理

def numbers():
    """生成数字"""
    for i in range(1, 6):
        yield i

def square(nums):
    """平方"""
    for n in nums:
        yield n ** 2

def filter_odd(nums):
    """过滤奇数"""
    for n in nums:
        if n % 2 == 0:
            yield n

def pipeline():
    """数据处理管道"""
    return filter_odd(square(numbers()))

# 使用管道
for num in pipeline():
    print(num)  # 4, 16

4. yield from 语句

yield from 用于委托给子生成器,简化嵌套生成器的写法。

def inner():
    yield 1
    yield 2
    yield 3

# 不用 yield from
def without_yield_from():
    for item in inner():
        yield item

# 使用 yield from(更简洁)
def with_yield_from():
    yield from inner()

# 验证
print(list(without_yield_from()))  # [1, 2, 3]
print(list(with_yield_from()))     # [1, 2, 3]

# yield from 的实际应用 - 扁平化
def flatten(nested_list):
    """扁平化嵌套列表"""
    for item in nested_list:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item

# 使用
nested = [1, [2, [3, 4], 5], 6]
print(list(flatten(nested)))  # [1, 2, 3, 4, 5, 6]

5. 生成器的方法(协程通信)

生成器有以下方法,可以向正在运行的生成器发送值:

def coro():
    """协程生成器"""
    while True:
        received = yield
        print(f"收到: {received}")

c = coro()
next(c)  # 启动生成器到第一个 yield
c.send('Hello')  # 发送值
c.send('World')  # 发送另一个值
c.close()  # 关闭生成器
def counter(initial=0):
    """计数器协程"""
    count = initial
    while True:
        increment = yield count
        if increment is None:
            increment = 1
        count += increment

# 使用
c = counter(10)
print(next(c))   # 10
print(c.send(5))  # 15
print(c.send(5))  # 20
print(next(c))    # 21(send(None) 等价于 next())

6. itertools 模块

import itertools

# count() - 无限计数
for i in itertools.count(10, 2):  # 从 10 开始,步长 2
    if i > 20:
        break
    print(i, end=' ')  # 10 12 14 16 18 20

# cycle() - 无限循环
counter = 0
for item in itertools.cycle(['A', 'B', 'C']):
    print(item, end=' ')
    counter += 1
    if counter > 8:
        break  # A B C A B C A B C

# repeat() - 重复
for i in itertools.repeat(5, 3):  # 重复 5,3 次
    print(i, end=' ')  # 5 5 5

# chain() - 连接多个迭代器
print(list(itertools.chain([1, 2], ['a', 'b'], [3, 4])))
# [1, 2, 'a', 'b', 3, 4]

# islice() - 切片迭代器
print(list(itertools.islice(range(10), 2, 8, 2)))  # [2, 4, 6]

# compress() - 按条件筛选
data = ['A', 'B', 'C', 'D', 'E']
selector = [1, 0, 1, 0, 1]
print(list(itertools.compress(data, selector)))  # ['A', 'C', 'E']

# takewhile / dropwhile
print(list(itertools.takewhile(lambda x: x < 5, [1, 4, 6, 4, 1])))  # [1, 4]
print(list(itertools.dropwhile(lambda x: x < 5, [1, 4, 6, 4, 1])))  # [6, 4, 1]

# groupby() - 分组
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4)]
for key, group in itertools.groupby(data, key=lambda x: x[0]):
    print(f"{key}: {list(group)}")

7. 生成器在 Python 3.11+ 的改进

# Python 3.11+ 中生成器的类型注解改进
from typing import Generator

def my_generator(n: int) -> Generator[int, None, None]:
    for i in range(n):
        yield i

# 或者使用更简洁的类型别名
from collections.abc import Iterator

def my_generator2(n: int) -> Iterator[int]:
    for i in range(n):
        yield i

背诵版

核心速查

┌─────────────────────────────────────────────────────────────┐ │ 迭代器 vs 生成器 │ ├─────────────────────────────────────────────────────────────┤ │ 迭代器:实现了 __iter__() 和 __next__() │ │ 生成器:使用 yield 的特殊迭代器 │ ├─────────────────────────────────────────────────────────────┤ │ iter(obj) → 获取迭代器 │ │ next(it) → 获取下一个元素 │ │ next(it, def) → 提供默认值避免 StopIteration │ │ yield from → 委托给子生成器 │ └─────────────────────────────────────────────────────────────┘

生成器特点

生成器 ───── 惰性求值 ───── 节省内存 │ ├── yield ─ 暂停函数,返回值 ├── yield from ─ 委托给子生成器 └── send() ─ 向生成器发送值

itertools 常用函数

函数 说明 示例
count() 无限计数 count(10, 2)
cycle() 无限循环 cycle('AB')
repeat() 重复 repeat(5, 3)
chain() 连接 chain([1], [2])
islice() 切片 islice(range(10), 5)
takewhile() 条件取 takewhile(cond, iter)
dropwhile() 条件丢 dropwhile(cond, iter)
groupby() 分组 groupby(data, key)

考前记忆

面试重点

  1. 迭代器协议

    • __iter__() 返回迭代器本身
    • __next__() 返回下一个元素,结束时抛出 StopIteration
  2. 生成器的特点

    • 惰性求值,节省内存
    • 使用 yield 暂停函数
    • 函数调用返回生成器对象
  3. yield vs return

    • return 结束函数并返回值
    • yield 暂停函数,保存状态,下次调用继续执行
  4. 生成器表达式 vs 列表推导式

    • 列表推导式:[x**2 for x in range(10)]
    • 生成器表达式:(x**2 for x in range(10))
    • 生成器惰性求值,列表立即求值
  5. yield from 的作用

    • 委托给子生成器
    • 简化嵌套循环

记忆口诀

迭代器有 iter 和 next, StopIteration 结束收。 生成器用 yield 惰性求, 省内存管道解耦不用愁。

测试题

选择题

1. 以下哪个是生成器函数?

# A.
def func1():
    return [1, 2, 3]

# B.
def func2():
    return (x ** 2 for x in range(10))

# C.
def func3():
    yield 1
    yield 2
    yield 3

# D.
func4 = [x ** 2 for x in range(10)]

答案:C(包含 yield 的函数)


2. 生成器相比列表的优势是?

# A. 速度更快
# B. 可以使用更多操作
# C. 惰性求值,节省内存
# D. 可以无限大

答案:C


3. next(it, 'default') 的作用是?

# A. 返回第 next 个元素
# B. 迭代两次
# C. 提供默认值,迭代器耗尽时返回默认值而非抛异常
# D. 跳过元素

答案:C


4. yield from 的作用是?

# A. 产生值并继续
# B. 委托给子生成器
# C. 结束生成器
# D. 接收值

答案:B


5. 生成器表达式 (x ** 2 for x in range(5)) 的类型是?

# A. list
# B. tuple
# C. generator
# D. set

答案:C


编程题

1. 实现一个生成器函数:

def prime_generator(limit):
    """生成素数生成器"""
    def is_prime(n):
        if n < 2:
            return False
        for i in range(2, int(n**0.5) + 1):
            if n % i == 0:
                return False
        return True
    
    num = 2
    while num <= limit:
        if is_prime(num):
            yield num
        num += 1

# 使用
for p in prime_generator(50):
    print(p, end=' ')  # 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47

2. 实现文件行生成器:

def file_lines(filepath):
    """逐行读取文件(内存高效)"""
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            yield line.strip()

# 使用
for line in file_lines('large_file.txt'):
    if 'error' in line.lower():
        print(line)

3. 实现数据处理管道:

def generate_numbers(start, end):
    """生成范围内的数字"""
    for i in range(start, end + 1):
        yield i

def filter_positive(numbers):
    """过滤正数"""
    for n in numbers:
        if n > 0:
            yield n

def square(numbers):
    """平方"""
    for n in numbers:
        yield n ** 2

def take(numbers, count):
    """取前 n 个"""
    for i, n in enumerate(numbers):
        if i >= count:
            break
        yield n

def pipeline(start, end, count):
    """数据处理管道"""
    return take(square(filter_positive(generate_numbers(start, end))), count)

# 使用
print(list(pipeline(-5, 10, 5)))  # [1, 4, 9, 16, 25]

4. 使用 itertools 实现复杂操作:

import itertools

def solve():
    # 问题:找出前 10 个同时是 2, 3, 5 的倍数的数
    numbers = itertools.cycle([2, 3, 5])
    return [x for x in itertools.takewhile(lambda x: len([n for n in [2, 3, 5] if x % n == 0]) == 3, 
                                           itertools.count())][:10]

# 简单版本
def simple_solution():
    result = []
    n = 1
    while len(result) < 10:
        if n % 30 == 0:  # 同时是 2, 3, 5 的倍数
            result.append(n)
        n += 1
    return result

print(simple_solution())  # [30, 60, 90, 120, 150, 180, 210, 240, 270, 300]

问答题

Q1: 什么是迭代器协议?请描述其核心方法。

迭代器协议要求对象实现两个方法:

  • __iter__():返回迭代器本身
  • __next__():返回下一个元素,当没有更多元素时抛出 StopIteration 异常

这是 Python 中所有迭代器必须遵循的协议。


Q2: 生成器相比普通函数有什么特点?

  1. 惰性求值:不会立即执行,只有调用 next() 时才产生值
  2. 节省内存:不需要一次性生成所有值
  3. 状态保持:函数执行到 yield 时暂停,保存局部状态
  4. 可迭代:生成器是迭代器,可使用 for 循环遍历

Q3: 什么时候应该使用生成器而不是列表?

  1. 处理大数据:不需要一次性加载所有数据
  2. 无限序列:如斐波那契数列、自然数等
  3. 内存敏感:内存有限时
  4. 流处理:数据来自持续输入(如网络、传感器)
  5. 管道处理:多步骤处理,避免创建中间列表

参考资料