使用 OpenAI 和 Python 实现自定义聊天机器人
构建模块
Python 生成器 (Generators)
生成器 是允许你声明一个表现得像迭代器的函数的函数,即它们可以在 for 循环中使用。它们允许你在不将整个数据集存储在内存中的情况下遍历数据,这在处理大数据或想要创建无限序列时非常有用。
当你调用一个普通的 Python 函数时,它会运行到结束并返回一个结果。然而,调用生成器函数会创建一个生成器对象,但函数中的代码不会立即运行。相反,当你遍历生成器时(例如,使用 `for` 循环),函数会运行直到遇到 `yield` 语句,该语句会返回生成的值并暂停函数的状态。然后,函数可以在生成器外部从 `yield` 语句之后立即恢复。
这是一个简单的生成器函数及其使用方法的示例:
def simple_generator():
yield 1
yield 2
yield 3
# 生成器对象可以被遍历
for value in simple_generator():
print(value)当你运行这段代码时,它将打印:
``` 1 2 3 ```
AsyncIO
Asyncio 是一个 Python 库,它提供了一个框架,用于使用协程编写单线程并发代码,并在套接字和其他资源上复用 I/O 访问。它允许你编写执行高级异步 I/O 操作的代码,而无需担心线程管理。
要使用 asyncio,你需要使用 `async` 关键字定义协程。协程是一种特殊的函数,当等待异步操作的结果时,它可以将控制权交还给调用者而不会丢失其状态。这是通过 `await` 关键字完成的。
以下是一些使用 asyncio 的示例:
import asyncio
# 定义一个协程
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
# 等待其他协程的主协程
async def main():
print(f"started at {time.strftime('%X')}")
# 等待两个协程完成
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
# 运行主协程
asyncio.run(main())这段代码将在延迟 1 秒后打印 "hello",然后在延迟 2 秒后打印 "world"。请注意,`asyncio.run()` 是运行主协程的函数,控制权通过 `await` 在事件循环和协程之间传递。
总之,生成器允许你在需要时一次生成一个值,从而节省内存;而 asyncio 允许你以高效且可读的方式编写处理异步操作的代码,从而无需使用多线程即可并发执行大量的网络或 I/O 密集型任务。
ASGI 与 WSGI
描述
ASGI (异步服务器网关接口) 和 WSGI (Web 服务器网关接口) 都是 Web 服务器与 Python Web 应用程序通信的规范,但它们的设计考虑了不同的编程范式。
### WSGI
WSGI 是 PEP 333 中定义的同步标准,允许 Web 服务器与 Python Web 应用程序通信。它是一个简单且传统的模型,其中应用程序使用包含请求信息的环境 (environ) 字典和一个用于启动响应的 start_response 函数来调用。由于 WSGI 是同步的,它每个进程一次只能处理一个请求,因此适用于并行处理需求有限的应用程序。
### ASGI
另一方面,ASGI 是一个异步标准,旨在支持异步 Python 特性,例如 `asyncio` 库。ASGI 应用程序可以在单个进程内并发处理多个请求,这使得它们对于管理许多并发连接或在 I/O 操作上有大量等待时间的应用程序更具可扩展性和性能。
### 比较
以下是简要比较:
- 并发性: ASGI 可以处理许多并发连接,使其适用于 WebSockets、长轮询 HTTP 和其他长连接。
- 兼容性: WSGI 是一个较旧的标准,几乎所有 Python Web 框架都支持它,而 ASGI 的支持正在增长,但尚未普及。
- 复杂性: 由于代码的异步特性,ASGI 应用程序可能更复杂。
- 性能: ASGI 应用程序在资源利用上可能更高效,特别是在 I/O 密集型操作以及利用 HTTP/2 或服务器推送事件 (SSE) 时。
### 带有 HTTP/2 和 SSE 的 ASGI
HTTP/2 是 HTTP 网络协议的重大修订,允许通过单个 TCP 连接进行多个并发请求,这称为多路复用。这与 ASGI 的异步能力非常匹配,允许 ASGI 并发管理这些多个请求,而无需创建多个线程或进程的开销。
服务器推送事件 (SSE) 是一种允许服务器通过 HTTP 连接向客户端推送实时更新的标准。SSE 要求服务器保持连接打开,并在有新更新可用时发送事件。
ASGI 非常适合 SSE,因为:
- 异步处理: ASGI 可以高效地管理 SSE 中通常使用的打开连接,而无需保持线程阻塞,因为服务器仅在事件可用时才向客户端发送数据。
- 可扩展性: 凭借在单个进程中处理大量连接的能力,ASGI 服务器可以同时支持许多客户端,并根据需要向每个客户端发送更新。
- 与 HTTP/2 的兼容性: 将 HTTP/2 的多路复用能力与 ASGI 相结合,可以更轻松地通过单个连接处理多个 SSE 流。
因此,当与 HTTP/2 一起使用时,ASGI 成为构建具有 SSE 的应用程序的绝佳选择,因为它提供了一种高性能且可扩展的方式来处理与客户端的实时、服务器发起的通信。
插图
sequenceDiagram
participant Client
participant Server
Client->>Server: GET /stream
Note over Client: Header: Accept: text/event-stream
Note over Server: 建立流连接
loop 流式传输数据
Server-->>Client: data: {"event": "message", "data": "JSON payload"}
Note over Server: Header: Content-Type: text/event-stream
end
Note over Client: 处理每个接收到的事件服务器推送事件 (SSE)
描述
服务器推送事件 (SSE) 是一种允许服务器通过单个持久 HTTP 连接向网页发送实时更新的技术。这是一种用于构建需要实时更新客户端的应用程序的技术,例如新闻源、社交媒体更新或实时比分。
以下是适合 Web 新手的简要说明:
通常,当你访问网页时,你的浏览器会向服务器发出请求,服务器将请求的页面发回,仅此而已。连接会关闭,直到你发出另一个请求(例如点击链接或刷新页面)。但是,有时你希望在不每次都询问(轮询)的情况下接收来自服务器的持续更新。这就是 SSE 的用途。
使用 SSE,服务器在发送初始响应后保持连接打开,然后可以在新数据可用时发送新数据。这对于实时传递更新非常有用。
插图
以下是显示从服务器到客户端的 SSE 流程的图表的文本表示,包括带有主题和数据的有效负载形状:
sequenceDiagram
participant Client
participant Server
Note over Client,Server: 连接初始化
Client->>Server: GET /events
Note over Server: 设置 "Content-Type: text/event-stream"
Server-->>Client: HTTP 200 OK
Note over Client,Server: 流式传输事件
loop 每次发送事件时
Note over Server: 服务器准备带有特定主题和数据的事件
Server->>Client: event: user-update\n\n
Server->>Client: data: {"userId": 1, "status": "active"}\n\n
Client->>Server: 事件已接收,处理 user-update
Server->>Client: event: message\n\n
Server->>Client: data: {"chatId": 42, "text": "Hello there!"}\n\n
Client->>Server: 事件已接收,处理 message
end
Note over Client,Server: 连接关闭(由客户端或服务器)
Server--xClient: 连接关闭
Client--xServer: 连接关闭你可以将此文本插入 Mermaid 实时编辑器或将其包含在 GitHub Markdown 文件中以渲染可视化图表。如果你是手动编写代码,则需要创建矩形来表示作为参与者的客户端和服务器,用一条线表示流连接,然后将有效负载绘制为带标签的形状(例如气泡或矩形),其中包含你期望在服务器和客户端之间传递的事件类型和 JSON 数据结构。事件应该是从服务器到客户端的定向箭头,展示 SSE 的单向性质。
代码
以下是使用 org-mode 格式化代码的示例:
客户端代码 (HTML + JavaScript)
以下示例展示了如何实现一个简单的网页,该网页使用 JavaScript 监听 SSE。
<html>
<head>
<title>服务器推送事件示例title>
head>
<body>
<h1>实时更新h1>
<div id="updates">div>
<script>
// 创建一个新的 EventSource 实例,连接到 SSE 端点
const eventSource = new EventSource('/events');
eventSource.onmessage = function(event) {
// 接收到消息时调用此函数
const messageData = event.data;
// 将新数据追加到 'updates' div
const updatesElement = document.getElementById('updates');
updatesElement.innerHTML += messageData + '
';
};
eventSource.onerror = function(error) {
// 处理发生的任何错误
console.log('EventSource 失败: ', error);
};
// 当你完成事件监听时
// eventSource.close();
script>
body>
html>服务器端代码 (使用 Node.js)
在服务器端,你将拥有一个使用 SSE 向客户端流式传输更新的端点。以下示例使用带有 Express 框架的 Node.js。
const express = require('express');
const app = express();
app.get('/events', (req, res) => {
// 设置 SSE 的标头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
const sendEvent = (data) => {
res.write(`data: ${JSON.stringify(data)}\n\n`); // 将数据发送到客户端
};
// 每秒发送一次更新
const intervalId = setInterval(() => {
const message = { text: 'Hello World', timestamp: new Date() };
sendEvent(message);
}, 1000);
// 当客户端断开连接时关闭连接
req.on('close', () => {
clearInterval(intervalId);
});
});
const PORT = 3000;
app.listen(PORT, () => {
console.log(`服务器正在端口 ${PORT} 上运行`);
});要运行此示例,你需要在系统上设置 Node.js,安装 Express (`npm install express`),并将服务器端代码保存到文件中(例如 `server.js`)。然后,你可以简单地使用 `node server.js` 运行你的服务器,并将你的 Web 浏览器指向客户端 HTML 页面以开始接收事件。
支持库
poetry init # 在当前目录中初始化一个新的 poetry 项目OpenAI
FastAPI
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于基于标准 Python 类型提示构建 Python 3.8+ API。
poetry add "fastapi[all]" "uvicorn[standard]" # 添加 fastapi 和 uvicornJinja2
Jinja 是一个快速、富有表现力、可扩展的模板引擎。模板中的特殊占位符允许编写类似于 Python 语法的代码。然后将数据传递给模板以渲染最终文档。
安装
poetry add jinja2 # 添加 jinja2代码
templates/doggo.jinja2
<html>
<head>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
<script src="https://unpkg.com/htmx.org@1.9.10">script>
<script src="https://unpkg.com/htmx.org@1.9.10/dist/ext/sse.js">script>
<title>Formulario | Page Doggotitle>
head>
<body>
<h1>作为服务器推送事件的犬种h1>
<hr>
<div id="doggo-sse-listener" hx-ext="sse" sse-connect="/dogstream" sse-swap="Terminate,DogBreedNoMass,DogBreed">
div>
<b>
<div id="DogBreedNoMass">div>
<br>
<div id="DogBreed">div>
b>
body>
html>templates/openai.jinja2
<html>
<head>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
<script src="https://unpkg.com/htmx.org@1.9.10">script>
<script src="https://unpkg.com/htmx.org@1.9.10/dist/ext/sse.js">script>
<title>Page | 原始 OpenAI 响应title>
head>
<body>
<h1>作为服务器推送事件的 OpenAI 响应h1>
<hr>
<div hx-ext="sse" sse-connect="/openaistream">
<article class="message is-info">
<div class="message-header">
<p sse-swap="ResponseNoMass">信息p>
<button class="delete" aria-label="delete">button>
div>
article>
<div class="message-body" sse-swap="Response" hx-swap="innerHTML">
div>
div>
body>
html>HTMX
htmx 让你能够直接在 HTML 中使用属性访问 AJAX、CSS 过渡、WebSockets 和服务器推送事件,因此你可以利用超文本的简单性和强大功能构建现代用户界面。
代码
templates/layout.jinja2
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="https://cdn.tailwindcss.com">script>
<script src="https://unpkg.com/htmx.org@1.9.10">script>
<script src="https://unpkg.com/htmx.org@1.9.10/dist/ext/sse.js">script>
{% block head %}{% endblock %}
head>
<body hx-boost="true">
{% block body %}{% endblock %}
{% block scripts %}{% endblock %}
body>
html>templates/partials/sse.jinja2
{% macro sse_stream(sse_config) %}
<div id="{{ sse_config.listener }}" hx-ext="sse" sse-connect="{{ sse_config.path }}" sse-swap="{{ sse_config.topics | join(',') }}">
div>
{% endmacro %}templates/partials/streaming_chunk.jinja2
event: {{ event }}
data: <div {% for name, value in attrs.items() %} {{ name }}="{{ value }}" {% endfor %}>{{ chunk }}div>templates/partials/ai_message.jinja2
{% macro ai_msg(message) %}
<div class="flex gap-3 my-4 text-gray-600 text-sm flex-1">
<span class="relative flex shrink-0 overflow-hidden rounded-full w-8 h-8">
<div class="rounded-full bg-gray-100 border p-1">
<svg stroke="none" fill="black" stroke-width="1.5"
viewBox="0 0 24 24" aria-hidden="true" height="20" width="20"
xmlns="http://www.w3.org/2000/svg">
<path stroke-linecap="round" stroke-linejoin="round"
d="M9.813 15.904L9 18.75l-.813-2.846a4.5 4.5 0 00-3.09-3.09L2.25 12l2.846-.813a4.5 4.5 0 003.09-3.09L9 5.25l.813 2.846a4.5 4.5 0 003.09 3.09L15.75 12l-2.846.813a4.5 4.5 0 00-3.09 3.09zM18.259 8.715L18 9.75l-.259-1.035a3.375 3.375 0 00-2.455-2.456L14.25 6l1.036-.259a3.375 3.375 0 002.455-2.456L18 2.25l.259 1.035a3.375 3.375 0 002.456 2.456L21.75 6l-1.035.259a3.375 3.375 0 00-2.456 2.456zM16.894 20.567L16.5 21.75l-.394-1.183a2.25 2.25 0 00-1.423-1.423L13.5 18.75l1.183-.394a2.25 2.25 0 001.423-1.423l.394-1.183.394 1.183a2.25 2.25 0 001.423 1.423l1.183.394-1.183.394a2.25 2.25 0 00-1.423 1.423z">
path>
svg>
div>
span>
<p class="leading-relaxed">
<span class="block font-bold text-gray-700">AI span>
{{ message }}
<span id="Response">span>
p>
div>
{% endmacro %}templates/partials/user_message.jinja2
{% macro user_msg(message) %}
<div class="flex gap-3 my-4 text-gray-600 text-sm flex-1"><span
class="relative flex shrink-0 overflow-hidden rounded-full w-8 h-8">
<div class="rounded-full bg-gray-100 border p-1"><svg stroke="none" fill="black" stroke-width="0"
viewBox="0 0 16 16" height="20" width="20" xmlns="http://www.w3.org/2000/svg">
<path
d="M8 8a3 3 0 1 0 0-6 3 3 0 0 0 0 6Zm2-3a2 2 0 1 1-4 0 2 2 0 0 1 4 0Zm4 8c0 1-1 1-1 1H3s-1 0-1-1 1-4 6-4 6 3 6 4Zm-1-.004c-.001-.246-.154-.986-.832-1.664C11.516 10.68 10.289 10 8 10c-2.29 0-3.516.68-4.168 1.332-.678.678-.83 1.418-.832 1.664h10Z">
path>
svg>div>
span>
<p class="leading-relaxed">
<span class="block font-bold text-gray-700">你 span>
{{ message }}
p>
div>
{% endmacro %}templates/index.jinja2
{% extends "layout.jinja2" %}
{% from "partials/sse.jinja2" import sse_stream %}
{% from "partials/ai_message.jinja2" import ai_msg %}
{% from "partials/user_message.jinja2" import user_msg %}
{% block body %}
<button class="fixed bottom-4 right-4 inline-flex items-center justify-center text-sm font-medium disabled:pointer-events-none disabled:opacity-50 border rounded-full w-16 h-16 bg-black hover:bg-gray-700 m-0 cursor-pointer border-gray-200 bg-none p-0 normal-case leading-5 hover:text-gray-900"
type="button" aria-haspopup="dialog" aria-expanded="false" data-state="closed">
<svg xmlns=" http://www.w3.org/2000/svg" width="30" height="40" viewBox="0 0 24 24" fill="none"
stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"
class="text-white block border-gray-200 align-middle">
<path d="m3 21 1.9-5.7a8.5 8.5 0 1 1 3.8 3.8z" class="border-gray-200">
path>
svg>
button>
<div class="md:container md:mx-auto" style="box-shadow: 0 0 #0000, 0 0 #0000, 0 1px 2px 0 rgb(0 0 0 / 0.05);"
class="fixed bottom-[calc(4rem+1.5rem)] right-40 mr-4 bg-white p-6 rounded-lg border border-[#e5e7eb] w-[3/4] h-[634px]">
<div class="flex flex-col space-y-1.5 pb-6">
<h2 class="font-semibold text-lg tracking-tight">自定义聊天机器人h2>
<p class="text-sm text-[#6b7280] leading-3">由你的 OpenAI 密钥驱动p>
div>
<div class="pr-4 h-[474px]" style="min-width: 100%; display: table;">
{% if sse_config %}
{{ sse_stream(sse_config) }}
{% endif %}
{% for message in messages %}
{% if message.sender == 'ai' %}
{{ ai_msg(message.content) }}
{% endif %}
{% if message.sender == 'user' %}
{{ user_msg(message.content) }}
{% endif %}
{% endfor %}
div>
<div class="flex items-center pt-0">
<form
action="{{ url_for('openai', req_id=req_id) }}" method="POST"
class="flex items-center justify-center w-full space-x-2"
>
<input
class="flex h-10 w-full rounded-md border border-[#e5e7eb] px-3 py-2 text-sm placeholder-[#6b7280] focus:outline-none focus:ring-2 focus:ring-[#9ca3af] disabled:cursor-not-allowed disabled:opacity-50 text-[#030712] focus-visible:ring-offset-2"
type="text" name="user_prompt"
placeholder="给 ChatGPT 发送消息..." value="">
<input
type="submit"
class="inline-flex items-center justify-center rounded-md text-sm font-medium text-[#f9fafb] disabled:pointer-events-none disabled:opacity-50 bg-black hover:bg-[#111827E6] h-10 px-4 py-2"
value="发送消息">
form>
div>
div>
{% endblock %}Markdown
Markdown 是一种轻量级标记语言,你可以使用它为纯文本文件添加格式元素。Markdown 由 John Gruber 于 2004 年创建,现在是世界上最流行的标记语言之一。
sequenceDiagram
participant M as Markdown 文件
participant P as Markdown 处理器
participant H as HTML 文件
participant B as Web 浏览器
M->>P: 处理 Markdown
P->>H: 生成 HTML
H->>B: 在浏览器中渲染此序列图说明了从 Markdown 文件经过处理到渲染的简单流程,如下所示:
- “Markdown 文件”作为输入。
- “Markdown 处理器”将 Markdown 转换为 HTML。
- 生成的“HTML 文件”随后就绪。
- 最后,“Web 浏览器”渲染 HTML 文件以向用户显示输出。
TailwindCSS
TailwindCSS 是一个实用优先的 CSS 框架,包含诸如 flex、**pt-4**、**text-center** 和 rotate-90 等类,可以直接在你的标记中组合这些类来构建任何设计。
SQLAlchemy
代码
[tool.poetry]
name = "custom-chatbot"
version = "0.1.0"
description = ""
authors = ["ChiefKemist"]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.12"
Jinja2 = "^3.1.2"
fastapi = "^0.109.0"
python-multipart = "^0.0.6"
uvicorn = {extras = ["standard"], version = "^0.25.0"}
httpx = "^0.26.0"
openai = "^1.7.2"
sqlalchemy = "^2.0.25"
markdown = "^3.5.2"
pygments = "^2.17.2"
semantic-router = "^0.0.18"
rich = "^13.7.0"
[tool.poetry.group.dev.dependencies]
pytest = "^7.4.4"
black = "^23.12.1"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"CLI 聊天应用
聊天上下文
聊天记录持久化
代码
import logging
import sys
import hashlib
import openai
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from getpass import getpass
from rich.console import Console
logging.basicConfig(
stream=sys.stdout, level=logging.DEBUG,
format='%(asctime)s %(levelname)s [%(module)s] %(message)s',
)
log = logging.getLogger(__name__)
console = Console()
# SQLAlchemy 设置
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String, unique=True)
password_hash = Column(String)
class Message(Base):
__tablename__ = 'messages'
id = Column(Integer, primary_key=True)
chat_room = Column(String)
sender = Column(String)
message = Column(String)
timestamp = Column(DateTime, default=datetime.now)
# SQLite 数据库连接
engine = create_engine('sqlite:///chat.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
def hash_password(password):
return hashlib.sha256(password.encode()).hexdigest()
def register_user():
username = input("输入新用户名: ")
if session.query(User).filter_by(username=username).first():
console.log(
"用户名已存在。请尝试不同的用户名。",
style="bold red"
)
return None
password = getpass("输入新密码: ")
hashed_password = hash_password(password)
new_user = User(username=username, password_hash=hashed_password)
session.add(new_user)
session.commit()
return username
def login_user():
username = input("输入用户名: ")
password = getpass("输入密码: ")
hashed_password = hash_password(password)
user = session.query(User).filter_by(username=username, password_hash=hashed_password).first()
if user:
return username
else:
console.log("用户名或密码无效。", style="bold red")
return None
def save_message(chat_room, sender, message):
new_message = Message(chat_room=chat_room, sender=sender, message=message)
session.add(new_message)
session.commit()
def get_chat_history(chat_room):
messages = session.query(Message).filter_by(chat_room=chat_room).order_by(Message.timestamp).all()
return [f"{message.sender}: {message.message}" for message in messages]
def get_gpt4_response(prompt, chat_history):
# openai.api_key = 'your-api-key' # 替换为你的实际 OpenAI API 密钥
combined_prompt = "\n".join(chat_history[-50:]) + f"\n{prompt}" # 将历史记录限制为最后 50 条消息
response = openai.OpenAI().chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "你是一个乐于助人的助手。"},
{"role": "user", "content": combined_prompt}
]
)
return response.choices[0].message.content
def run_chat(username):
chat_room = username
console.log(
f"欢迎使用你的个人 GPT-4 聊天 CLI, {username}。输入 'quit' 退出。",
style="bold blue"
)
while True:
user_input = input("你: ")
if user_input.lower() == 'quit':
break
chat_context = get_chat_history(chat_room)
gpt_response = get_gpt4_response(user_input, chat_context)
save_message(chat_room, "你", user_input)
save_message(chat_room, "GPT-4", gpt_response)
console.log(f"GPT-4: {gpt_response}", style="bold green")
console.log("聊天结束。", style="bold blue")
def main():
log.info("欢迎使用聊天应用程序")
choice = input("你想 [L]登录 还是 [R]注册? (L/R): ").lower()
username = None
while not username:
if choice == 'r':
username = register_user()
elif choice == 'l':
username = login_user()
else:
choice = input("请输入 'L' 登录或 'R' 注册: ").lower()
if username:
run_chat(username)
if __name__ == "__main__":
main()流式传输狗狗
描述
利用 Python 的 asyncio 和 **生成器**,以及 Htmx,将犬种流式传输到浏览器。 犬种由 dog.ceo 提供。
代码
#begin_src bash
curl https://dog.ceo/api/breeds/list/all
#+end_src
async_doggo.py
#!/usr/bin/env python3
import asyncio
import logging
import sys
import typing
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from jinja2 import Environment, FileSystemLoader, select_autoescape
logging.basicConfig(
stream=sys.stdout, level=logging.DEBUG,
format='%(asctime)s %(levelname)s [%(module)s] %(message)s',
)
log = logging.getLogger(__name__)
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
def app_context(request: Request) -> typing.Dict[str, typing.Any]:
return {'app': request.app}
templates = Jinja2Templates(
directory="templates",
context_processors=[app_context],
)
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
return templates.TemplateResponse(
request=request, name="doggo.jinja2",
)
def render_sse_html_chunk(event, chunk, attrs=None):
if attrs is None:
attrs = {}
tmpl = Environment(
loader=FileSystemLoader('templates/partials'),
autoescape=select_autoescape(['html'])
).select_template(['streaming_chunk.jinja2'])
html_chunk = tmpl.render(**dict(event=event, chunk=chunk, attrs=attrs))
return html_chunk
async def gen_dog_breeds():
async with httpx.AsyncClient() as client:
breeds = (await client.get('https://dog.ceo/api/breeds/list/all')).json()
for breed in breeds['message'].keys():
log.info(f"正在生成 {breed}")
yield breed
@app.get("/dogstream", response_class=StreamingResponse)
async def dogstream(request: Request):
async def dogbreeds_iter():
async for breed in gen_dog_breeds():
await asyncio.sleep(0.2)
breed_status_chunk = render_sse_html_chunk(
'DogBreedNoMass',
'更多狗狗前辈 :-)',
{
'id': 'DogBreedNoMass',
'hx-swap-oob': 'true',
},
)
yield f'{breed_status_chunk}\n\n'.encode('utf-8')
await asyncio.sleep(0.2)
chunk = render_sse_html_chunk(
'DogBreed',
breed,
{
'id': 'DogBreed',
'hx-swap-oob': 'true',
},
)
yield f'{chunk}\n\n'.encode('utf-8')
breed_status_chunk = render_sse_html_chunk(
'DogBreedNoMass',
'没有更多狗狗前辈了 :-(',
{
'id': 'DogBreedNoMass',
'hx-swap-oob': 'true',
},
)
yield f'{breed_status_chunk}\n\n'.encode('utf-8')
return StreamingResponse(
dogbreeds_iter(),
media_type='text/event-stream',
)
if __name__ == '__main__':
import uvicorn
uvicorn.run('async_doggo:app', host='0.0.0.0', port=6543, reload=True)流式传输 OpenAI
原始 / 纯文本
更好看
富文本
async_chat.py
#!/usr/bin/env python3
import asyncio
import functools
import concurrent.futures
import logging
import re
import sys
import queue
import typing
import uuid
from io import StringIO
from time import sleep
import httpx
import markdown
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters.html import HtmlFormatter
from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from jinja2 import Environment, FileSystemLoader, select_autoescape
logging.basicConfig(
stream=sys.stdout, level=logging.DEBUG,
format='%(asctime)s %(levelname)s [%(module)s] %(message)s',
)
log = logging.getLogger(__name__)
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
def app_context(request: Request) -> typing.Dict[str, typing.Any]:
return {'app': request.app}
templates = Jinja2Templates(
directory="templates",
context_processors=[app_context],
)
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
req_id = str(uuid.uuid4())
messages = [
{'sender': 'ai', 'content': '你好,今天有什么可以帮你的吗?'},
{'sender': 'user', 'content': 'faslskadalksjioqjeqlkj'},
{'sender': 'ai',
'content': '抱歉,我在文档中找不到关于该内容的任何信息。预计答案准确度较低。我在验证来源中找不到此问题的答案。'},
]
return templates.TemplateResponse(
request=request, name="index.jinja2",
context=dict(messages=messages, req_id=req_id)
)
reqs = {}
@app.post("/openai/{req_id}", response_class=HTMLResponse)
async def openai(request: Request, req_id: str, user_prompt: typing.Annotated[str, Form()]):
log.debug(f'用户提示: {user_prompt}')
reqs[req_id] = user_prompt
messages = [
{'sender': 'user', 'content': user_prompt},
{'sender': 'ai', 'content': ''},
]
sse_config = dict(
listener='openai',
path=f'/openaistream/{req_id}',
topics=[
'Response',
'ResponseNoMass',
'Terminate',
]
)
new_req_id = str(uuid.uuid4())
return templates.TemplateResponse(
request=request, name="index.jinja2",
context=dict(
messages=messages, req_id=new_req_id, sse_config=sse_config
)
)
def render_sse_html_chunk(event, chunk, attrs=None):
if attrs is None:
attrs = {}
tmpl = Environment(
loader=FileSystemLoader('templates/partials'),
autoescape=select_autoescape(['html'])
).select_template(['streaming_chunk.jinja2'])
html_chunk = tmpl.render(**dict(event=event, chunk=chunk, attrs=attrs))
return html_chunk
def markdown_to_html_with_highlighting(source_markdown):
md = markdown.Markdown(extensions=['fenced_code', 'codehilite'])
html = md.convert(source_markdown)
css = HtmlFormatter().get_style_defs('.codehilite')
return f"{html.replace('\n', '
')}"
def markdown_to_html_with_inline_highlighting(source_markdown):
formatter = HtmlFormatter(style='default', cssclass='', noclasses=True)
def inline_highlight(match):
language = match.group('lang')
code = match.group('code')
lexer = get_lexer_by_name(language, stripall=True)
highlighted_code = highlight(code.replace('
', '\n'), lexer, formatter)
return highlighted_code.replace('\n', '
')
highlighted_markdown = re.sub(
r'```(?P\w+)\s*(?P.*?)```',
inline_highlight,
source_markdown,
flags=re.DOTALL
)
html = markdown.markdown(highlighted_markdown)
return html
async def run_openai(req_id: str):
from time import perf_counter
from openai import AsyncOpenAI
user_prompt = reqs[req_id]
log.debug(f'用户提示: {user_prompt}')
messages = [
{'role': 'system', 'content': '请仅使用 markdown 回复。'},
{'role': 'user', 'content': user_prompt},
]
chunks = await AsyncOpenAI().chat.completions.create(
model='gpt-4',
messages=messages,
stream=True,
)
last = None
result_chunks = []
result_concat = StringIO()
async for chunk in chunks:
now = perf_counter()
if last is not None:
t = now - last
else:
t = 0
text = chunk.choices[0].delta.content
if text is not None:
result_chunks.append((t, text))
result_concat.write(f"{text}")
mdText = markdown_to_html_with_inline_highlighting(
result_concat.getvalue().replace('\n', "
")
)
yield mdText
else:
log.debug('无文本,添加空格')
last = now
yield None # 全部完成
text = ''.join(text for _, text in result_chunks)
@app.get("/openaistream/{req_id}", response_class=StreamingResponse)
async def openaistream(request: Request, req_id: str):
log.info(f"请求 ID: {req_id}")
async def openai_iter():
async for resp in run_openai(req_id):
if resp is None:
chunk = render_sse_html_chunk(
'Terminate',
'',
{
'id': 'openai',
'hx-swap-oob': 'true',
},
)
yield f'{chunk}\n\n'.encode('utf-8')
break
chunk = render_sse_html_chunk(
'Response',
resp,
{
'id': 'Response',
'hx-swap-oob': 'true',
},
)
yield f'{chunk}\n\n'.encode('utf-8')
chunk = render_sse_html_chunk(
'Terminate',
'',
{
'id': 'openai',
'hx-swap-oob': 'true',
},
)
yield f'{chunk}\n\n'.encode('utf-8')
return StreamingResponse(
openai_iter(),
media_type='text/event-stream',
)
if __name__ == '__main__':
import uvicorn
uvicorn.run('async_chat:app', host='0.0.0.0', port=6543, reload=True)带有助手的聊天 Web 应用
助手 API
持久化助手详情
助手数据
具有上下文和持久性的多线程聊天
语义路由器 (Semantic Router)
额外
部署
FastAPI
FastUI
参考资料
- 阅读在线书籍 Hypermedia Systems。
- 阅读 Hypermedia Systems 第 5 章中的 Htmx 模式。
- 阅读 Hypermedia Systems 第 8 章中的 Htmx 大师技巧。
- ///_hyperscript 是一种用于现代 Web 前端的简单易用的语言。
- H/ Hyperview 是一种用于开发服务器驱动移动应用的新型超媒体格式和 React Native 客户端。

