Ubuntu TechHive
implementing-a-custom-chatbot-with-openai-and-python.md
使用 OpenAI 和 Python 实现自定义聊天机器人
article.细节

使用 OpenAI 和 Python 实现自定义聊天机器人

reading.进展 13 分钟阅读数

使用 OpenAI 和 Python 实现自定义聊天机器人的说明

使用 OpenAI 和 Python 实现自定义聊天机器人

构建模块

Python 生成器 (Generators)

生成器 是允许你声明一个表现得像迭代器的函数的函数,即它们可以在 for 循环中使用。它们允许你在不将整个数据集存储在内存中的情况下遍历数据,这在处理大数据或想要创建无限序列时非常有用。

当你调用一个普通的 Python 函数时,它会运行到结束并返回一个结果。然而,调用生成器函数会创建一个生成器对象,但函数中的代码不会立即运行。相反,当你遍历生成器时(例如,使用 `for` 循环),函数会运行直到遇到 `yield` 语句,该语句会返回生成的值并暂停函数的状态。然后,函数可以在生成器外部从 `yield` 语句之后立即恢复。

这是一个简单的生成器函数及其使用方法的示例:

def simple_generator():
    yield 1
    yield 2
    yield 3

# 生成器对象可以被遍历
for value in simple_generator():
    print(value)

当你运行这段代码时,它将打印:

``` 1 2 3 ```

AsyncIO

Asyncio 是一个 Python 库,它提供了一个框架,用于使用协程编写单线程并发代码,并在套接字和其他资源上复用 I/O 访问。它允许你编写执行高级异步 I/O 操作的代码,而无需担心线程管理。

要使用 asyncio,你需要使用 `async` 关键字定义协程。协程是一种特殊的函数,当等待异步操作的结果时,它可以将控制权交还给调用者而不会丢失其状态。这是通过 `await` 关键字完成的。

以下是一些使用 asyncio 的示例:

import asyncio

# 定义一个协程
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

# 等待其他协程的主协程
async def main():
    print(f"started at {time.strftime('%X')}")

    # 等待两个协程完成
    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

# 运行主协程
asyncio.run(main())

这段代码将在延迟 1 秒后打印 "hello",然后在延迟 2 秒后打印 "world"。请注意,`asyncio.run()` 是运行主协程的函数,控制权通过 `await` 在事件循环和协程之间传递。

总之,生成器允许你在需要时一次生成一个值,从而节省内存;而 asyncio 允许你以高效且可读的方式编写处理异步操作的代码,从而无需使用多线程即可并发执行大量的网络或 I/O 密集型任务。

ASGI 与 WSGI

描述

ASGI (异步服务器网关接口)WSGI (Web 服务器网关接口) 都是 Web 服务器与 Python Web 应用程序通信的规范,但它们的设计考虑了不同的编程范式。

### WSGI

WSGI 是 PEP 333 中定义的同步标准,允许 Web 服务器与 Python Web 应用程序通信。它是一个简单且传统的模型,其中应用程序使用包含请求信息的环境 (environ) 字典和一个用于启动响应的 start_response 函数来调用。由于 WSGI 是同步的,它每个进程一次只能处理一个请求,因此适用于并行处理需求有限的应用程序。

### ASGI

另一方面,ASGI 是一个异步标准,旨在支持异步 Python 特性,例如 `asyncio` 库。ASGI 应用程序可以在单个进程内并发处理多个请求,这使得它们对于管理许多并发连接或在 I/O 操作上有大量等待时间的应用程序更具可扩展性和性能。

### 比较

以下是简要比较:

  • 并发性: ASGI 可以处理许多并发连接,使其适用于 WebSockets、长轮询 HTTP 和其他长连接。
  • 兼容性: WSGI 是一个较旧的标准,几乎所有 Python Web 框架都支持它,而 ASGI 的支持正在增长,但尚未普及。
  • 复杂性: 由于代码的异步特性,ASGI 应用程序可能更复杂。
  • 性能: ASGI 应用程序在资源利用上可能更高效,特别是在 I/O 密集型操作以及利用 HTTP/2 或服务器推送事件 (SSE) 时。

### 带有 HTTP/2 和 SSE 的 ASGI

HTTP/2 是 HTTP 网络协议的重大修订,允许通过单个 TCP 连接进行多个并发请求,这称为多路复用。这与 ASGI 的异步能力非常匹配,允许 ASGI 并发管理这些多个请求,而无需创建多个线程或进程的开销。

服务器推送事件 (SSE) 是一种允许服务器通过 HTTP 连接向客户端推送实时更新的标准。SSE 要求服务器保持连接打开,并在有新更新可用时发送事件。

ASGI 非常适合 SSE,因为:

  • 异步处理: ASGI 可以高效地管理 SSE 中通常使用的打开连接,而无需保持线程阻塞,因为服务器仅在事件可用时才向客户端发送数据。
  • 可扩展性: 凭借在单个进程中处理大量连接的能力,ASGI 服务器可以同时支持许多客户端,并根据需要向每个客户端发送更新。
  • 与 HTTP/2 的兼容性: 将 HTTP/2 的多路复用能力与 ASGI 相结合,可以更轻松地通过单个连接处理多个 SSE 流。

因此,当与 HTTP/2 一起使用时,ASGI 成为构建具有 SSE 的应用程序的绝佳选择,因为它提供了一种高性能且可扩展的方式来处理与客户端的实时、服务器发起的通信。

插图

sequenceDiagram
    participant Client
    participant Server
    Client->>Server: GET /stream
    Note over Client: Header: Accept: text/event-stream
    Note over Server: 建立流连接
    loop 流式传输数据
        Server-->>Client: data: {"event": "message", "data": "JSON payload"}
        Note over Server: Header: Content-Type: text/event-stream
    end
    Note over Client: 处理每个接收到的事件

服务器推送事件 (SSE)

描述

服务器推送事件 (SSE) 是一种允许服务器通过单个持久 HTTP 连接向网页发送实时更新的技术。这是一种用于构建需要实时更新客户端的应用程序的技术,例如新闻源、社交媒体更新或实时比分。

以下是适合 Web 新手的简要说明:

通常,当你访问网页时,你的浏览器会向服务器发出请求,服务器将请求的页面发回,仅此而已。连接会关闭,直到你发出另一个请求(例如点击链接或刷新页面)。但是,有时你希望在不每次都询问(轮询)的情况下接收来自服务器的持续更新。这就是 SSE 的用途。

使用 SSE,服务器在发送初始响应后保持连接打开,然后可以在新数据可用时发送新数据。这对于实时传递更新非常有用。

插图

以下是显示从服务器到客户端的 SSE 流程的图表的文本表示,包括带有主题和数据的有效负载形状:

sequenceDiagram
    participant Client
    participant Server

    Note over Client,Server: 连接初始化
    Client->>Server: GET /events
    Note over Server: 设置 "Content-Type: text/event-stream"
    Server-->>Client: HTTP 200 OK

    Note over Client,Server: 流式传输事件
    loop 每次发送事件时
        Note over Server: 服务器准备带有特定主题和数据的事件
        Server->>Client: event: user-update\n\n
        Server->>Client: data: {"userId": 1, "status": "active"}\n\n
        Client->>Server: 事件已接收,处理 user-update

        Server->>Client: event: message\n\n
        Server->>Client: data: {"chatId": 42, "text": "Hello there!"}\n\n
        Client->>Server: 事件已接收,处理 message
    end

    Note over Client,Server: 连接关闭(由客户端或服务器)
    Server--xClient: 连接关闭
    Client--xServer: 连接关闭

你可以将此文本插入 Mermaid 实时编辑器或将其包含在 GitHub Markdown 文件中以渲染可视化图表。如果你是手动编写代码,则需要创建矩形来表示作为参与者的客户端和服务器,用一条线表示流连接,然后将有效负载绘制为带标签的形状(例如气泡或矩形),其中包含你期望在服务器和客户端之间传递的事件类型和 JSON 数据结构。事件应该是从服务器到客户端的定向箭头,展示 SSE 的单向性质。

代码

以下是使用 org-mode 格式化代码的示例:

客户端代码 (HTML + JavaScript)

以下示例展示了如何实现一个简单的网页,该网页使用 JavaScript 监听 SSE。


<html>
<head>
  <title>服务器推送事件示例title>
head>
<body>
  <h1>实时更新h1>
  <div id="updates">div>

  <script>
    // 创建一个新的 EventSource 实例,连接到 SSE 端点
    const eventSource = new EventSource('/events');

    eventSource.onmessage = function(event) {
      // 接收到消息时调用此函数
      const messageData = event.data;
      
      // 将新数据追加到 'updates' div
      const updatesElement = document.getElementById('updates');
      updatesElement.innerHTML += messageData + '
'
;
}; eventSource.onerror = function(error) { // 处理发生的任何错误 console.log('EventSource 失败: ', error); }; // 当你完成事件监听时 // eventSource.close(); script> body> html>
服务器端代码 (使用 Node.js)

在服务器端,你将拥有一个使用 SSE 向客户端流式传输更新的端点。以下示例使用带有 Express 框架的 Node.js。

const express = require('express');
const app = express();

app.get('/events', (req, res) => {
  // 设置 SSE 的标头
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });

  const sendEvent = (data) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`); // 将数据发送到客户端
  };

  // 每秒发送一次更新
  const intervalId = setInterval(() => {
    const message = { text: 'Hello World', timestamp: new Date() };
    sendEvent(message);
  }, 1000);

  // 当客户端断开连接时关闭连接
  req.on('close', () => {
    clearInterval(intervalId);
  });
});

const PORT = 3000;
app.listen(PORT, () => {
  console.log(`服务器正在端口 ${PORT} 上运行`);
});

要运行此示例,你需要在系统上设置 Node.js,安装 Express (`npm install express`),并将服务器端代码保存到文件中(例如 `server.js`)。然后,你可以简单地使用 `node server.js` 运行你的服务器,并将你的 Web 浏览器指向客户端 HTML 页面以开始接收事件。

支持库

poetry init # 在当前目录中初始化一个新的 poetry 项目

OpenAI

OpenAI 是一家人工智能研究和部署公司。他们的使命是确保通用人工智能造福全人类。

GPT-4 是 OpenAI 最先进的系统,能产生更安全、更有用的响应。得益于其更广泛的通用知识和问题解决能力,GPT-4 可以更准确地解决难题。

DALL·E 3 比我们之前的系统理解更多的细微差别和细节,让你能够轻松地将你的想法转化为极其准确的图像。

poetry add openai # 添加 openai

FastAPI

FastAPI 是一个现代、快速(高性能)的 Web 框架,用于基于标准 Python 类型提示构建 Python 3.8+ API。

poetry add "fastapi[all]" "uvicorn[standard]" # 添加 fastapi 和 uvicorn

Jinja2

Jinja 是一个快速、富有表现力、可扩展的模板引擎。模板中的特殊占位符允许编写类似于 Python 语法的代码。然后将数据传递给模板以渲染最终文档。

安装

poetry add jinja2 # 添加 jinja2

代码

templates/doggo.jinja2

<html>
<head>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
    <script src="https://unpkg.com/htmx.org@1.9.10">script>
    <script src="https://unpkg.com/htmx.org@1.9.10/dist/ext/sse.js">script>
    <title>Formulario | Page Doggotitle>
head>
<body>
    <h1>作为服务器推送事件的犬种h1>
    <hr>
    <div id="doggo-sse-listener" hx-ext="sse" sse-connect="/dogstream" sse-swap="Terminate,DogBreedNoMass,DogBreed">
    div>
    <b>
        <div id="DogBreedNoMass">div>
        <br>
        <div id="DogBreed">div>
    b>
body>
html>

templates/openai.jinja2

<html>
<head>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
    <script src="https://unpkg.com/htmx.org@1.9.10">script>
    <script src="https://unpkg.com/htmx.org@1.9.10/dist/ext/sse.js">script>
    <title>Page | 原始 OpenAI 响应title>
head>
<body>
<h1>作为服务器推送事件的 OpenAI 响应h1>
<hr>
<div hx-ext="sse" sse-connect="/openaistream">
    <article class="message is-info">
        <div class="message-header">
            <p sse-swap="ResponseNoMass">信息p>
            <button class="delete" aria-label="delete">button>
        div>
    article>

    <div class="message-body" sse-swap="Response" hx-swap="innerHTML">
    div>
div>
body>
html>

HTMX

htmx 让你能够直接在 HTML 中使用属性访问 AJAX、CSS 过渡、WebSockets 和服务器推送事件,因此你可以利用超文本的简单性和强大功能构建现代用户界面。

代码

templates/layout.jinja2

<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">

    <script src="https://cdn.tailwindcss.com">script>

    <script src="https://unpkg.com/htmx.org@1.9.10">script>
    <script src="https://unpkg.com/htmx.org@1.9.10/dist/ext/sse.js">script>

    {% block head %}{% endblock %}
head>
<body hx-boost="true">
{% block body %}{% endblock %}
{% block scripts %}{% endblock %}
body>
html>

templates/partials/sse.jinja2

{% macro sse_stream(sse_config) %}
    <div id="{{ sse_config.listener }}" hx-ext="sse" sse-connect="{{ sse_config.path }}" sse-swap="{{ sse_config.topics | join(',') }}">
    div>
{% endmacro %}

templates/partials/streaming_chunk.jinja2

event: {{ event }}
data: <div {% for name, value in attrs.items() %} {{ name }}="{{ value }}" {% endfor %}>{{ chunk }}div>

templates/partials/ai_message.jinja2

{% macro ai_msg(message) %}
    
    <div class="flex gap-3 my-4 text-gray-600 text-sm flex-1">
        <span class="relative flex shrink-0 overflow-hidden rounded-full w-8 h-8">
            <div class="rounded-full bg-gray-100 border p-1">
                <svg stroke="none" fill="black" stroke-width="1.5"
                     viewBox="0 0 24 24" aria-hidden="true" height="20" width="20"
                     xmlns="http://www.w3.org/2000/svg">
                    <path stroke-linecap="round" stroke-linejoin="round"
                          d="M9.813 15.904L9 18.75l-.813-2.846a4.5 4.5 0 00-3.09-3.09L2.25 12l2.846-.813a4.5 4.5 0 003.09-3.09L9 5.25l.813 2.846a4.5 4.5 0 003.09 3.09L15.75 12l-2.846.813a4.5 4.5 0 00-3.09 3.09zM18.259 8.715L18 9.75l-.259-1.035a3.375 3.375 0 00-2.455-2.456L14.25 6l1.036-.259a3.375 3.375 0 002.455-2.456L18 2.25l.259 1.035a3.375 3.375 0 002.456 2.456L21.75 6l-1.035.259a3.375 3.375 0 00-2.456 2.456zM16.894 20.567L16.5 21.75l-.394-1.183a2.25 2.25 0 00-1.423-1.423L13.5 18.75l1.183-.394a2.25 2.25 0 001.423-1.423l.394-1.183.394 1.183a2.25 2.25 0 001.423 1.423l1.183.394-1.183.394a2.25 2.25 0 00-1.423 1.423z">
                    path>
                svg>
            div>
        span>
        <p class="leading-relaxed">
            <span class="block font-bold text-gray-700">AI span>
            {{ message }}
            <span id="Response">span>
        p>
    div>
{% endmacro %}

templates/partials/user_message.jinja2

{% macro user_msg(message) %}

<div class="flex gap-3 my-4 text-gray-600 text-sm flex-1"><span
    class="relative flex shrink-0 overflow-hidden rounded-full w-8 h-8">
    <div class="rounded-full bg-gray-100 border p-1"><svg stroke="none" fill="black" stroke-width="0"
        viewBox="0 0 16 16" height="20" width="20" xmlns="http://www.w3.org/2000/svg">
        <path
          d="M8 8a3 3 0 1 0 0-6 3 3 0 0 0 0 6Zm2-3a2 2 0 1 1-4 0 2 2 0 0 1 4 0Zm4 8c0 1-1 1-1 1H3s-1 0-1-1 1-4 6-4 6 3 6 4Zm-1-.004c-.001-.246-.154-.986-.832-1.664C11.516 10.68 10.289 10 8 10c-2.29 0-3.516.68-4.168 1.332-.678.678-.83 1.418-.832 1.664h10Z">
        path>
      svg>div>
  span>
  <p class="leading-relaxed">
      <span class="block font-bold text-gray-700">你 span>
      {{ message }}
  p>
div>
{% endmacro %}

templates/index.jinja2

{% extends "layout.jinja2" %}

{% from "partials/sse.jinja2" import sse_stream %}
{% from "partials/ai_message.jinja2" import ai_msg %}
{% from "partials/user_message.jinja2" import user_msg %}

{% block body %}
    
    <button class="fixed bottom-4 right-4 inline-flex items-center justify-center text-sm font-medium disabled:pointer-events-none disabled:opacity-50 border rounded-full w-16 h-16 bg-black hover:bg-gray-700 m-0 cursor-pointer border-gray-200 bg-none p-0 normal-case leading-5 hover:text-gray-900"
            type="button" aria-haspopup="dialog" aria-expanded="false" data-state="closed">
    <svg xmlns=" http://www.w3.org/2000/svg" width="30" height="40" viewBox="0 0 24 24" fill="none"
         stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"
         class="text-white block border-gray-200 align-middle">
        <path d="m3 21 1.9-5.7a8.5 8.5 0 1 1 3.8 3.8z" class="border-gray-200">
        path>
    svg>
  button>

  <div class="md:container md:mx-auto" style="box-shadow: 0 0 #0000, 0 0 #0000, 0 1px 2px 0 rgb(0 0 0 / 0.05);"
    class="fixed bottom-[calc(4rem+1.5rem)] right-40 mr-4 bg-white p-6 rounded-lg border border-[#e5e7eb] w-[3/4] h-[634px]">

    
    <div class="flex flex-col space-y-1.5 pb-6">
      <h2 class="font-semibold text-lg tracking-tight">自定义聊天机器人h2>
      <p class="text-sm text-[#6b7280] leading-3">由你的 OpenAI 密钥驱动p>
    div>

    
    <div class="pr-4 h-[474px]" style="min-width: 100%; display: table;">
      {% if sse_config %}
          {{ sse_stream(sse_config) }}
      {% endif %}
      {% for message in messages %}
          {% if message.sender == 'ai' %}
              {{ ai_msg(message.content) }}
          {% endif %}
          {% if message.sender == 'user' %}
              {{ user_msg(message.content) }}
          {% endif %}
      {% endfor %}
    div>

    
    <div class="flex items-center pt-0">
      <form
        action="{{ url_for('openai', req_id=req_id) }}" method="POST"
        class="flex items-center justify-center w-full space-x-2"
      >
        <input
          class="flex h-10 w-full rounded-md border border-[#e5e7eb] px-3 py-2 text-sm placeholder-[#6b7280] focus:outline-none focus:ring-2 focus:ring-[#9ca3af] disabled:cursor-not-allowed disabled:opacity-50 text-[#030712] focus-visible:ring-offset-2"
          type="text" name="user_prompt"
          placeholder="给 ChatGPT 发送消息..." value="">
        <input
          type="submit"
          class="inline-flex items-center justify-center rounded-md text-sm font-medium text-[#f9fafb] disabled:pointer-events-none disabled:opacity-50 bg-black hover:bg-[#111827E6] h-10 px-4 py-2"
          value="发送消息">
      form>
    div>

  div>

{% endblock %}

Markdown

Markdown 是一种轻量级标记语言,你可以使用它为纯文本文件添加格式元素。Markdown 由 John Gruber 于 2004 年创建,现在是世界上最流行的标记语言之一。

sequenceDiagram
    participant M as Markdown 文件
    participant P as Markdown 处理器
    participant H as HTML 文件
    participant B as Web 浏览器
    M->>P: 处理 Markdown
    P->>H: 生成 HTML
    H->>B: 在浏览器中渲染

此序列图说明了从 Markdown 文件经过处理到渲染的简单流程,如下所示:

  1. “Markdown 文件”作为输入。
  2. “Markdown 处理器”将 Markdown 转换为 HTML。
  3. 生成的“HTML 文件”随后就绪。
  4. 最后,“Web 浏览器”渲染 HTML 文件以向用户显示输出。

TailwindCSS

TailwindCSS 是一个实用优先的 CSS 框架,包含诸如 flex、**pt-4**、**text-center** 和 rotate-90 等类,可以直接在你的标记中组合这些类来构建任何设计。

SQLAlchemy

代码

[tool.poetry]
name = "custom-chatbot"
version = "0.1.0"
description = ""
authors = ["ChiefKemist"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.12"
Jinja2 = "^3.1.2"
fastapi = "^0.109.0"
python-multipart = "^0.0.6"
uvicorn = {extras = ["standard"], version = "^0.25.0"}
httpx = "^0.26.0"
openai = "^1.7.2"
sqlalchemy = "^2.0.25"
markdown = "^3.5.2"
pygments = "^2.17.2"
semantic-router = "^0.0.18"
rich = "^13.7.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.4"
black = "^23.12.1"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

CLI 聊天应用

聊天上下文

聊天记录持久化

代码

import logging
import sys
import hashlib

import openai

from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from getpass import getpass

from rich.console import Console

logging.basicConfig(
    stream=sys.stdout, level=logging.DEBUG,
    format='%(asctime)s %(levelname)s [%(module)s] %(message)s',
)
log = logging.getLogger(__name__)

console = Console()

# SQLAlchemy 设置
Base = declarative_base()


class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True)
    password_hash = Column(String)


class Message(Base):
    __tablename__ = 'messages'
    id = Column(Integer, primary_key=True)
    chat_room = Column(String)
    sender = Column(String)
    message = Column(String)
    timestamp = Column(DateTime, default=datetime.now)


# SQLite 数据库连接
engine = create_engine('sqlite:///chat.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()


def hash_password(password):
    return hashlib.sha256(password.encode()).hexdigest()


def register_user():
    username = input("输入新用户名: ")
    if session.query(User).filter_by(username=username).first():
        console.log(
            "用户名已存在。请尝试不同的用户名。",
            style="bold red"
        )
        return None

    password = getpass("输入新密码: ")
    hashed_password = hash_password(password)
    new_user = User(username=username, password_hash=hashed_password)
    session.add(new_user)
    session.commit()
    return username


def login_user():
    username = input("输入用户名: ")
    password = getpass("输入密码: ")
    hashed_password = hash_password(password)

    user = session.query(User).filter_by(username=username, password_hash=hashed_password).first()
    if user:
        return username
    else:
        console.log("用户名或密码无效。", style="bold red")
        return None


def save_message(chat_room, sender, message):
    new_message = Message(chat_room=chat_room, sender=sender, message=message)
    session.add(new_message)
    session.commit()


def get_chat_history(chat_room):
    messages = session.query(Message).filter_by(chat_room=chat_room).order_by(Message.timestamp).all()
    return [f"{message.sender}: {message.message}" for message in messages]


def get_gpt4_response(prompt, chat_history):
    # openai.api_key = 'your-api-key'  # 替换为你的实际 OpenAI API 密钥

    combined_prompt = "\n".join(chat_history[-50:]) + f"\n{prompt}"  # 将历史记录限制为最后 50 条消息
    response = openai.OpenAI().chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "你是一个乐于助人的助手。"},
            {"role": "user", "content": combined_prompt}
        ]
    )
    return response.choices[0].message.content


def run_chat(username):
    chat_room = username
    console.log(
        f"欢迎使用你的个人 GPT-4 聊天 CLI, {username}。输入 'quit' 退出。",
        style="bold blue"
    )

    while True:
        user_input = input("你: ")
        if user_input.lower() == 'quit':
            break

        chat_context = get_chat_history(chat_room)
        gpt_response = get_gpt4_response(user_input, chat_context)

        save_message(chat_room, "你", user_input)
        save_message(chat_room, "GPT-4", gpt_response)

        console.log(f"GPT-4: {gpt_response}", style="bold green")

    console.log("聊天结束。", style="bold blue")


def main():
    log.info("欢迎使用聊天应用程序")
    choice = input("你想 [L]登录 还是 [R]注册? (L/R): ").lower()

    username = None
    while not username:
        if choice == 'r':
            username = register_user()
        elif choice == 'l':
            username = login_user()
        else:
            choice = input("请输入 'L' 登录或 'R' 注册: ").lower()

    if username:
        run_chat(username)


if __name__ == "__main__":
    main()

流式传输狗狗

描述

利用 Python 的 asyncio 和 **生成器**,以及 Htmx,将犬种流式传输到浏览器。 犬种由 dog.ceo 提供。

代码

#begin_src bash

curl https://dog.ceo/api/breeds/list/all

#+end_src

async_doggo.py

#!/usr/bin/env python3

import asyncio
import logging
import sys
import typing

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from jinja2 import Environment, FileSystemLoader, select_autoescape

logging.basicConfig(
    stream=sys.stdout, level=logging.DEBUG,
    format='%(asctime)s %(levelname)s [%(module)s] %(message)s',
)
log = logging.getLogger(__name__)

app = FastAPI()

app.mount("/static", StaticFiles(directory="static"), name="static")


def app_context(request: Request) -> typing.Dict[str, typing.Any]:
    return {'app': request.app}


templates = Jinja2Templates(
    directory="templates",
    context_processors=[app_context],
)


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    return templates.TemplateResponse(
        request=request, name="doggo.jinja2",
    )


def render_sse_html_chunk(event, chunk, attrs=None):
    if attrs is None:
        attrs = {}
    tmpl = Environment(
        loader=FileSystemLoader('templates/partials'),
        autoescape=select_autoescape(['html'])
    ).select_template(['streaming_chunk.jinja2'])
    html_chunk = tmpl.render(**dict(event=event, chunk=chunk, attrs=attrs))
    return html_chunk


async def gen_dog_breeds():
    async with httpx.AsyncClient() as client:
        breeds = (await client.get('https://dog.ceo/api/breeds/list/all')).json()
        for breed in breeds['message'].keys():
            log.info(f"正在生成 {breed}")
            yield breed


@app.get("/dogstream", response_class=StreamingResponse)
async def dogstream(request: Request):
    async def dogbreeds_iter():
        async for breed in gen_dog_breeds():
            await asyncio.sleep(0.2)
            breed_status_chunk = render_sse_html_chunk(
                'DogBreedNoMass',
                '更多狗狗前辈 :-)',
                {
                    'id': 'DogBreedNoMass',
                    'hx-swap-oob': 'true',
                },
            )
            yield f'{breed_status_chunk}\n\n'.encode('utf-8')
            await asyncio.sleep(0.2)
            chunk = render_sse_html_chunk(
                'DogBreed',
                breed,
                {
                    'id': 'DogBreed',
                    'hx-swap-oob': 'true',
                },
            )
            yield f'{chunk}\n\n'.encode('utf-8')
        breed_status_chunk = render_sse_html_chunk(
            'DogBreedNoMass',
            '没有更多狗狗前辈了 :-(',
            {
                'id': 'DogBreedNoMass',
                'hx-swap-oob': 'true',
            },
        )
        yield f'{breed_status_chunk}\n\n'.encode('utf-8')

    return StreamingResponse(
        dogbreeds_iter(),
        media_type='text/event-stream',
    )


if __name__ == '__main__':
    import uvicorn

    uvicorn.run('async_doggo:app', host='0.0.0.0', port=6543, reload=True)

流式传输 OpenAI

原始 / 纯文本

更好看

富文本

async_chat.py

#!/usr/bin/env python3

import asyncio
import functools
import concurrent.futures
import logging
import re
import sys
import queue
import typing
import uuid
from io import StringIO

from time import sleep

import httpx
import markdown
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters.html import HtmlFormatter

from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from jinja2 import Environment, FileSystemLoader, select_autoescape

logging.basicConfig(
    stream=sys.stdout, level=logging.DEBUG,
    format='%(asctime)s %(levelname)s [%(module)s] %(message)s',
)
log = logging.getLogger(__name__)

app = FastAPI()

app.mount("/static", StaticFiles(directory="static"), name="static")

def app_context(request: Request) -> typing.Dict[str, typing.Any]:
    return {'app': request.app}

templates = Jinja2Templates(
    directory="templates",
    context_processors=[app_context],
)


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    req_id = str(uuid.uuid4())
    messages = [
        {'sender': 'ai', 'content': '你好,今天有什么可以帮你的吗?'},
        {'sender': 'user', 'content': 'faslskadalksjioqjeqlkj'},
        {'sender': 'ai',
         'content': '抱歉,我在文档中找不到关于该内容的任何信息。预计答案准确度较低。我在验证来源中找不到此问题的答案。'},
    ]
    return templates.TemplateResponse(
        request=request, name="index.jinja2",
        context=dict(messages=messages, req_id=req_id)
    )


reqs = {}


@app.post("/openai/{req_id}", response_class=HTMLResponse)
async def openai(request: Request, req_id: str, user_prompt: typing.Annotated[str, Form()]):

    log.debug(f'用户提示: {user_prompt}')

    reqs[req_id] = user_prompt

    messages = [
        {'sender': 'user', 'content': user_prompt},
        {'sender': 'ai', 'content': ''},
    ]

    sse_config = dict(
        listener='openai',
        path=f'/openaistream/{req_id}',
        topics=[
            'Response',
            'ResponseNoMass',
            'Terminate',
        ]
    )

    new_req_id = str(uuid.uuid4())

    return templates.TemplateResponse(
        request=request, name="index.jinja2",
        context=dict(
            messages=messages, req_id=new_req_id, sse_config=sse_config
        )
    )


def render_sse_html_chunk(event, chunk, attrs=None):
    if attrs is None:
        attrs = {}
    tmpl = Environment(
        loader=FileSystemLoader('templates/partials'),
        autoescape=select_autoescape(['html'])
    ).select_template(['streaming_chunk.jinja2'])
    html_chunk = tmpl.render(**dict(event=event, chunk=chunk, attrs=attrs))
    return html_chunk


def markdown_to_html_with_highlighting(source_markdown):
    md = markdown.Markdown(extensions=['fenced_code', 'codehilite'])
    html = md.convert(source_markdown)
    css = HtmlFormatter().get_style_defs('.codehilite')
    return f"{html.replace('\n', '
'
)}"
def markdown_to_html_with_inline_highlighting(source_markdown): formatter = HtmlFormatter(style='default', cssclass='', noclasses=True) def inline_highlight(match): language = match.group('lang') code = match.group('code') lexer = get_lexer_by_name(language, stripall=True) highlighted_code = highlight(code.replace('
'
, '\n'), lexer, formatter)
return highlighted_code.replace('\n', '
'
)
highlighted_markdown = re.sub( r'```(?P\w+)\s*(?P.*?)```', inline_highlight, source_markdown, flags=re.DOTALL ) html = markdown.markdown(highlighted_markdown) return html async def run_openai(req_id: str): from time import perf_counter from openai import AsyncOpenAI user_prompt = reqs[req_id] log.debug(f'用户提示: {user_prompt}') messages = [ {'role': 'system', 'content': '请仅使用 markdown 回复。'}, {'role': 'user', 'content': user_prompt}, ] chunks = await AsyncOpenAI().chat.completions.create( model='gpt-4', messages=messages, stream=True, ) last = None result_chunks = [] result_concat = StringIO() async for chunk in chunks: now = perf_counter() if last is not None: t = now - last else: t = 0 text = chunk.choices[0].delta.content if text is not None: result_chunks.append((t, text)) result_concat.write(f"{text}") mdText = markdown_to_html_with_inline_highlighting( result_concat.getvalue().replace('\n', "
"
)
) yield mdText else: log.debug('无文本,添加空格') last = now yield None # 全部完成 text = ''.join(text for _, text in result_chunks) @app.get("/openaistream/{req_id}", response_class=StreamingResponse) async def openaistream(request: Request, req_id: str): log.info(f"请求 ID: {req_id}") async def openai_iter(): async for resp in run_openai(req_id): if resp is None: chunk = render_sse_html_chunk( 'Terminate', '', { 'id': 'openai', 'hx-swap-oob': 'true', }, ) yield f'{chunk}\n\n'.encode('utf-8') break chunk = render_sse_html_chunk( 'Response', resp, { 'id': 'Response', 'hx-swap-oob': 'true', }, ) yield f'{chunk}\n\n'.encode('utf-8') chunk = render_sse_html_chunk( 'Terminate', '', { 'id': 'openai', 'hx-swap-oob': 'true', }, ) yield f'{chunk}\n\n'.encode('utf-8') return StreamingResponse( openai_iter(), media_type='text/event-stream', ) if __name__ == '__main__': import uvicorn uvicorn.run('async_chat:app', host='0.0.0.0', port=6543, reload=True)

带有助手的聊天 Web 应用

助手 API

持久化助手详情

助手数据

具有上下文和持久性的多线程聊天

语义路由器 (Semantic Router)

额外

部署

FastAPI

FastUI

参考资料