python协程记录

Python 协程(Coroutine)详解

一、什么是协程

协程(Coroutine) 是一种比线程更轻量级的并发编程方式。

👉 简单理解:

  • 线程:由操作系统调度
  • 协程:由程序自己调度

协程允许函数在执行过程中 主动暂停并在之后恢复执行


二、为什么需要协程

传统并发模型:

模型 优点 缺点
多线程 编程直观 线程切换开销大
多进程 稳定性高 资源消耗大
协程 轻量高效 需要异步编程思维

协程适合场景

  • 网络请求(HTTP)
  • 数据库访问
  • IO密集型任务
  • 爬虫
  • 微服务通信

三、协程核心特点

1. 用户态调度

协程切换不依赖操作系统,而是程序控制。

2. 非阻塞执行

一个任务等待IO时,可以切换执行其他任务。

3. 单线程并发

协程通常运行在单线程中,但可以实现高并发。


四、Python 中的协程实现

Python 3.5 之后正式支持 async/await 语法。


五、协程基本语法

1. 定义协程函数

1
2
async def hello():
print("Hello Coroutine")

⚠️ 注意:

async def 定义协程函数 调用后不会立即执行

2. 运行协程

1
2
3
4
5
6
import asyncio

async def hello():
print("Hello Coroutine")

asyncio.run(hello())

3.await 关键字

await用于等待另一个协程执行完成。

1
2
3
4
5
6
7
import asyncio

async def task():
await asyncio.sleep(1)
print("Task finished")

asyncio.run(task())

六、事件循环(Event Loop)

事件循环是协程运行的核心。

👉 作用:

  • 调度任务

  • 管理IO等待

  • 切换协程执行

流程:

1
创建任务 → 注册事件循环 → 执行协程 → IO等待 → 切换任务

七、并发执行多个协程

使用 asyncio.gather

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def task1():
await asyncio.sleep(1)
print("Task1 done")

async def task2():
await asyncio.sleep(2)
print("Task2 done")

async def main():
await asyncio.gather(task1(), task2())

asyncio.run(main())

👉 两个任务会并发执行

👉 两个任务会并发执行

八、Task 对象

Task 是对协程的封装,用于调度执行。

1
2
3
4
5
6
async def main():
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())

await t1
await t2

九、协程 vs 线程

对比 协程 线程
调度方式 用户控制 OS控制
切换开销 非常小 较大
并发能力
编程复杂度 较高 较低

十、协程执行流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
协程开始

执行代码

遇到 await

挂起当前协程

执行其他协程

IO完成

恢复协程

十一、常见误区

❌ 协程不是多线程

协程通常运行在单线程。

❌ 没有 await 就不是异步

如果协程内部没有 await,实际上是同步执行。

十二、真实应用示例(HTTP请求)

1
2
3
4
5
6
7
8
9
10
11
12
13
import aiohttp
import asyncio

async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()

async def main():
html = await fetch("https://example.com")
print(len(html))

asyncio.run(main())

十三、协程优缺点总结

优点

  • 高并发

  • 资源占用低

  • IO性能强

缺点

  • 不适合CPU密集型
  • 代码理解难度稍高

十四、适用场景总结

✔ 网络编程
✔ 微服务
✔ 爬虫系统
✔ 高并发IO任务

十五、demo

数据库读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import pymysql
import re
import aiomysql

SQL = '''SELECT
mbt.id,
mbt.summary,
mbt.version,
mbtt.steps_to_reproduce,
mbtt.additional_information,
notes.notes,
revisions.revision_values
FROM mantis_bug_table mbt
LEFT JOIN mantis_bug_text_table mbtt
ON mbt.id = mbtt.id
LEFT JOIN (
SELECT
mbn.bug_id,
GROUP_CONCAT(
mbnt.note
ORDER BY mbn.date_submitted
SEPARATOR '\n---\n'
) AS notes
FROM mantis_bugnote_table mbn
JOIN mantis_bugnote_text_table mbnt
ON mbnt.id = mbn.bugnote_text_id
GROUP BY mbn.bug_id
) notes
ON notes.bug_id = mbt.id
LEFT JOIN (
SELECT
bug_id,
GROUP_CONCAT(value ORDER BY id SEPARATOR ',') AS revision_values
FROM mantis_bug_revision_table
GROUP BY bug_id
) revisions
ON revisions.bug_id = mbt.id WHERE mbt.id = %s;
'''

pool = None

async def init_db():
global pool
pool = await aiomysql.create_pool(
host="12.12.254.242",
port=3306,
user="root",
password="eve-ng",
db="bugtracker",
charset="utf8mb4",
minsize=1,
maxsize=30
)

async def close_db():
global pool
if pool:
pool.close()
await pool.wait_closed()


async def get_bug_by_id(bug_id):
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(SQL,(bug_id,))
row = await cursor.fetchone()
if not row:
return None
return format_(row)


def format_(sql_data):
def extract(text):
if not text:
return '', ''
r = re.search(r'问题原因分析[::]\s*(.*?)[。*\n]', text)
f = re.search(r'修改方案描述[::]\s*(.*?)[。*\n]', text)
return (r.group(1) if r else ''), (f.group(1) if f else '')

result, fix = extract(sql_data.get('additional_information'))
if not result:
result, fix = extract(sql_data.get('notes'))
if not result:
result, fix = extract(sql_data.get('revision_values'))
return {
'bug_id': sql_data['id'],
'bug_title': sql_data['summary'],
'version': sql_data['version'],
'step': sql_data['steps_to_reproduce'],
'reply': {
'cause_of_bug': result,
'bug_solution': fix
}
}


API接口调用数据库查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from util.db_tool import init_db, close_db, get_bug_by_id
from loguru import logger
import uvicorn

@asynccontextmanager
async def lifespan(app: FastAPI):
# ===== 启动阶段 =====
await init_db()
logger.info("数据库连接池已启动")
yield
# ===== 关闭阶段 =====
await close_db()
logger.info("数据库连接池已关闭")

app = FastAPI(lifespan=lifespan)

app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)


@app.get('/id/{bug_id}')
async def search_(bug_id: int):
logger.info(f"收到查询请求: {bug_id}")
result = await get_bug_by_id(bug_id)
if not result:
return {'error': '未找到该 bug'}
return result


app.mount("/", StaticFiles(directory="static", html=True), name="static")


if __name__ == '__main__':
uvicorn.run('demo:app', host="0.0.0.0", port=8910)