BroadcastChannel
https://channel.gandli.eu.org/
https://channel.gandli.eu.org/
import asyncio
import csv
import json
import time
from pathlib import Path
from typing import List, Dict
from tqdm import tqdm
from playwright.async_api import async_playwright, Page
import aiohttp
import argparse
# ---------------------------- 配置常量 ----------------------------
BASE_URL = "http://www.etmoc.com/firms/Brands" # 品牌列表页
OUTPUT_DIR = Path("etmoc_output") # 输出根目录
IMG_DIR = OUTPUT_DIR / "images" # 图片保存目录
CSV_FILE = OUTPUT_DIR / "brands.csv" # CSV 文件路径
JSON_FILE = OUTPUT_DIR / "brands.json" # JSON 文件路径
OUTPUT_DIR.mkdir(exist_ok=True)
IMG_DIR.mkdir(exist_ok=True)
# ---------------------------- 爬虫类 ----------------------------
class ETMOCScraper:
"""
ETMOC 烟草品牌爬虫
"""
def init(self, pages_limit: int = 1):
self.products: List[Dict] = []
self.pages_limit = pages_limit # 0 表示抓取全部页,1 表示默认1页
async def get_total_pages(self, page: Page) -> int:
"""获取品牌列表页的总页数"""
await page.goto(BASE_URL)
try:
await page.wait_for_selector(".pagination", timeout=5000)
pages = await page.query_selector_all(".pagination li a")
numbers = [int(await p.inner_text()) for p in pages if (await p.inner_text()).isdigit()]
total = max(numbers) if numbers else 1
except:
total = 1
# 如果设置了页数限制
return total if self.pages_limit == 0 else min(total, self.pages_limit)
async def scrape_page(self, page: Page, page_num: int) -> List[Dict]:
"""抓取单页品牌信息"""
url = f"{BASE_URL}?page={page_num}"
await page.goto(url)
await page.wait_for_selector(".product-list")
products = await page.query_selector_all(".product-list .item")
result = []
for product in products:
title_el = await product.query_selector("h3 a")
img_el = await product.query_selector("img")
if not title_el:
continue
title = (await title_el.inner_text()).strip()
href = await title_el.get_attribute("href")
img_url = await img_el.get_attribute("src") if img_el else ""
result.append({
"name": title,
"url": f"http://www.etmoc.com{href}",
"img_url": f"http://www.etmoc.com{img_url}" if img_url.startswith("/") else img_url
})
return result
async def scrape_details(self, page: Page, item: Dict) -> Dict:
"""抓取单个产品详情"""
await page.goto(item["url"])
try:
await page.wait_for_selector(".info", timeout=5000)
desc = await page.locator(".info").inner_text()
item["description"] = desc.strip()
except:
item["description"] = ""
return item
async def download_image(self, session: aiohttp.ClientSession, item: Dict):
"""下载单张图片"""
if not item.get("img_url"):
return
filename = IMG_DIR / f"{item['name'].replace('/', '_')}.jpg"
if filename.exists():
return
try:
async with session.get(item["img_url"]) as resp:
if resp.status == 200:
filename.write_bytes(await resp.read())
except:
pass
async def run_scrape(self):
async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
page = await browser.new_page()
total_pages = await self.get_total_pages(page)
print(f"将抓取 {total_pages} 页")
for p in tqdm(range(1, total_pages + 1), desc="分页抓取", unit="页"):
self.products.extend(await self.scrape_page(page, p))
print(f"共发现 {len(self.products)} 个产品")
await self._scrape_details_with_progress(page)
await self._download_images_with_progress()
self._export_csv()
self._export_json()
await browser.close()
print(f"完成。导出至 {OUTPUT_DIR.resolve()}")
import csv
import json
import time
from pathlib import Path
from typing import List, Dict
from tqdm import tqdm
from playwright.async_api import async_playwright, Page
import aiohttp
import argparse
# ---------------------------- 配置常量 ----------------------------
BASE_URL = "http://www.etmoc.com/firms/Brands" # 品牌列表页
OUTPUT_DIR = Path("etmoc_output") # 输出根目录
IMG_DIR = OUTPUT_DIR / "images" # 图片保存目录
CSV_FILE = OUTPUT_DIR / "brands.csv" # CSV 文件路径
JSON_FILE = OUTPUT_DIR / "brands.json" # JSON 文件路径
OUTPUT_DIR.mkdir(exist_ok=True)
IMG_DIR.mkdir(exist_ok=True)
# ---------------------------- 爬虫类 ----------------------------
class ETMOCScraper:
"""
ETMOC 烟草品牌爬虫
"""
def init(self, pages_limit: int = 1):
self.products: List[Dict] = []
self.pages_limit = pages_limit # 0 表示抓取全部页,1 表示默认1页
async def get_total_pages(self, page: Page) -> int:
"""获取品牌列表页的总页数"""
await page.goto(BASE_URL)
try:
await page.wait_for_selector(".pagination", timeout=5000)
pages = await page.query_selector_all(".pagination li a")
numbers = [int(await p.inner_text()) for p in pages if (await p.inner_text()).isdigit()]
total = max(numbers) if numbers else 1
except:
total = 1
# 如果设置了页数限制
return total if self.pages_limit == 0 else min(total, self.pages_limit)
async def scrape_page(self, page: Page, page_num: int) -> List[Dict]:
"""抓取单页品牌信息"""
url = f"{BASE_URL}?page={page_num}"
await page.goto(url)
await page.wait_for_selector(".product-list")
products = await page.query_selector_all(".product-list .item")
result = []
for product in products:
title_el = await product.query_selector("h3 a")
img_el = await product.query_selector("img")
if not title_el:
continue
title = (await title_el.inner_text()).strip()
href = await title_el.get_attribute("href")
img_url = await img_el.get_attribute("src") if img_el else ""
result.append({
"name": title,
"url": f"http://www.etmoc.com{href}",
"img_url": f"http://www.etmoc.com{img_url}" if img_url.startswith("/") else img_url
})
return result
async def scrape_details(self, page: Page, item: Dict) -> Dict:
"""抓取单个产品详情"""
await page.goto(item["url"])
try:
await page.wait_for_selector(".info", timeout=5000)
desc = await page.locator(".info").inner_text()
item["description"] = desc.strip()
except:
item["description"] = ""
return item
async def download_image(self, session: aiohttp.ClientSession, item: Dict):
"""下载单张图片"""
if not item.get("img_url"):
return
filename = IMG_DIR / f"{item['name'].replace('/', '_')}.jpg"
if filename.exists():
return
try:
async with session.get(item["img_url"]) as resp:
if resp.status == 200:
filename.write_bytes(await resp.read())
except:
pass
async def run_scrape(self):
async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
page = await browser.new_page()
total_pages = await self.get_total_pages(page)
print(f"将抓取 {total_pages} 页")
for p in tqdm(range(1, total_pages + 1), desc="分页抓取", unit="页"):
self.products.extend(await self.scrape_page(page, p))
print(f"共发现 {len(self.products)} 个产品")
await self._scrape_details_with_progress(page)
await self._download_images_with_progress()
self._export_csv()
self._export_json()
await browser.close()
print(f"完成。导出至 {OUTPUT_DIR.resolve()}")
为满足大规模GitHub star项目的自动化管理需求,现需开发一个基于GitHub Actions的周期处理系统:该系统每7天自动触发一次增量更新,每次仅处理最多50个新star项目(按star时间倒序获取),利用GitHub Models AI能力(严格遵守≤2并发限制)实现项目智能分类与摘要生成,通过SQLite数据库记录处理状态与最后时间戳确保连续性,最终将结构化结果(分类树形结构+项目摘要)自动更新至README.md实现知识沉淀,让超过 4000+项目能在约20个月内自然完成处理,形成免维护的长期自动化管理机制。
GitHub Stars智能管理自动化系统 PRD
一、文档信息
项目名称 GitHub Stars智能管理自动化系统
版本号 v1.0
提交日期 2025-08-18
二、背景与目标
问题陈述
- 用户拥有4000+ GitHub star项目需持续管理
- 手动分类/摘要耗时且难以维护
- AI API并发限制(≤2)导致批量处理困难
- GitHub Actions存在6小时执行超时限制
核心目标
构建自动化系统实现:
- 📌 免人工干预的增量式star项目管理
- 🤖 基于AI的智能分类/摘要生成
- ⚡️ 长期稳定的知识沉淀机制
- 🛡️ 严格的API调用约束合规
三、功能需求
1. 周期性增量处理引擎
功能点 详细要求
触发机制 每7天自动执行GitHub Action工作流
处理规模 单次最大50个新star项目
项目筛选 按star时间倒序获取最新项目
全量兼容 支持4000+项目跨周期自然完成处理
2. 智能AI处理模块
组件 技术规范
分类引擎 使用GitHub Models AI生成多级分类标签(如:
"Machine Learning/LLM")
摘要生成 生成80-100字中文技术摘要(保留关键技术关键词)
API限制 严格遵循≤2并发请求
重试机制 指数退避重试策略(最大3次)
3. 知识沉淀系统
输出项 规范
README.md 分板块自动更新:<br>- 按分类树形结构组织<br>- 包含项目名称/链接/AI摘要<br>- 保留历史处理记录
知识图谱 结构化存储路径:
"data/knowledge_base.json"
状态看板 README头部显示:<br>- 已处理总数/待处理数<br>- 下次处理时间
4. 状态管理系统
组件 功能描述
时间戳追踪 持久化存储最后处理时间戳(
"last_timestamp")
项目状态库 SQLite数据库记录:<br>-
"project_id"<br>-
"processing_status"<br>-
"category"<br>-
"last_updated"
断点续传 异常中断后自动恢复至最近检查点
四、技术架构
系统流程图
graph TD
A[定时触发] --> B[读取last_timestamp]
B --> C{新项目?}
C -->|无| D[更新状态看板]
C -->|有| E[获取新项目TOP50]
E --> F[AI处理池]
F --> G[分类&摘要生成]
G --> H[更新知识库]
H --> I[生成README]
I --> J[提交变更]
J --> K[更新last_timestamp]
数据流设计
sequenceDiagram
GitHub Action->>GitHub API: 请求star列表(时间过滤)
GitHub API-->>Processing Engine: JSON项目数据
Processing Engine->>AI Processor: 分发任务(≤2并发)
AI Processor-->>Processing Engine: {分类,摘要}
Processing Engine->>SQLite DB: 更新状态
Processing Engine->>Knowledge Base: 结构化存储
Processing Engine->>README.md: Markdown增量生成
五、验收标准
模块 验收指标
增量处理 1. 7天周期精准触发<br>2. 项目处理数≤50/次<br>3. 全量4000+项目20月内完成
AI处理 1. 严格≤2并发调用<br>2. 分类准确率≥85%<br>3. 摘要包含≥3个关键技术点
输出质量 1. README分类层级清晰<br>2. 历史记录永不丢失<br>3. 知识库JSON可被第三方解析
性能指标 1. 单次执行≤15分钟<br>2. 失败率≤5%<br>3. 自动恢复成功率100%
六、附录
风险控制矩阵
风险点 应对措施
API超限 令牌桶算法控制请求速率
处理超时 强制50项目上限+执行超时监控
分类漂移 人工反馈机制收集错误样本
数据丢失 双重备份(DB+JSON)+版本控制
输出示例
## 人工智能
### 自然语言处理
- [bert-base](https://github.com/google-research/bert)
» 摘要:Google开源的Transformer架构预训练模型,支持多语言...
### 计算机视觉
- [yolov7](https://github.com/WongKinYiu/yolov7)
» 摘要:实时目标检测SOTA模型,精度-速度平衡优化...
> 知识图谱状态:已处理 1250/4230 · 下次更新 2025-08-25
PRD批准
角色 签字 日期
产品经理 [签字区] 2025-08-18
技术负责人 [签字区] 2025-08-18
项目经理 [签字区] 2025-08-18
一、文档信息
项目名称 GitHub Stars智能管理自动化系统
版本号 v1.0
提交日期 2025-08-18
二、背景与目标
问题陈述
- 用户拥有4000+ GitHub star项目需持续管理
- 手动分类/摘要耗时且难以维护
- AI API并发限制(≤2)导致批量处理困难
- GitHub Actions存在6小时执行超时限制
核心目标
构建自动化系统实现:
- 📌 免人工干预的增量式star项目管理
- 🤖 基于AI的智能分类/摘要生成
- ⚡️ 长期稳定的知识沉淀机制
- 🛡️ 严格的API调用约束合规
三、功能需求
1. 周期性增量处理引擎
功能点 详细要求
触发机制 每7天自动执行GitHub Action工作流
处理规模 单次最大50个新star项目
项目筛选 按star时间倒序获取最新项目
全量兼容 支持4000+项目跨周期自然完成处理
2. 智能AI处理模块
组件 技术规范
分类引擎 使用GitHub Models AI生成多级分类标签(如:
"Machine Learning/LLM")
摘要生成 生成80-100字中文技术摘要(保留关键技术关键词)
API限制 严格遵循≤2并发请求
重试机制 指数退避重试策略(最大3次)
3. 知识沉淀系统
输出项 规范
README.md 分板块自动更新:<br>- 按分类树形结构组织<br>- 包含项目名称/链接/AI摘要<br>- 保留历史处理记录
知识图谱 结构化存储路径:
"data/knowledge_base.json"
状态看板 README头部显示:<br>- 已处理总数/待处理数<br>- 下次处理时间
4. 状态管理系统
组件 功能描述
时间戳追踪 持久化存储最后处理时间戳(
"last_timestamp")
项目状态库 SQLite数据库记录:<br>-
"project_id"<br>-
"processing_status"<br>-
"category"<br>-
"last_updated"
断点续传 异常中断后自动恢复至最近检查点
四、技术架构
系统流程图
graph TD
A[定时触发] --> B[读取last_timestamp]
B --> C{新项目?}
C -->|无| D[更新状态看板]
C -->|有| E[获取新项目TOP50]
E --> F[AI处理池]
F --> G[分类&摘要生成]
G --> H[更新知识库]
H --> I[生成README]
I --> J[提交变更]
J --> K[更新last_timestamp]
数据流设计
sequenceDiagram
GitHub Action->>GitHub API: 请求star列表(时间过滤)
GitHub API-->>Processing Engine: JSON项目数据
Processing Engine->>AI Processor: 分发任务(≤2并发)
AI Processor-->>Processing Engine: {分类,摘要}
Processing Engine->>SQLite DB: 更新状态
Processing Engine->>Knowledge Base: 结构化存储
Processing Engine->>README.md: Markdown增量生成
五、验收标准
模块 验收指标
增量处理 1. 7天周期精准触发<br>2. 项目处理数≤50/次<br>3. 全量4000+项目20月内完成
AI处理 1. 严格≤2并发调用<br>2. 分类准确率≥85%<br>3. 摘要包含≥3个关键技术点
输出质量 1. README分类层级清晰<br>2. 历史记录永不丢失<br>3. 知识库JSON可被第三方解析
性能指标 1. 单次执行≤15分钟<br>2. 失败率≤5%<br>3. 自动恢复成功率100%
六、附录
风险控制矩阵
风险点 应对措施
API超限 令牌桶算法控制请求速率
处理超时 强制50项目上限+执行超时监控
分类漂移 人工反馈机制收集错误样本
数据丢失 双重备份(DB+JSON)+版本控制
输出示例
## 人工智能
### 自然语言处理
- [bert-base](https://github.com/google-research/bert)
» 摘要:Google开源的Transformer架构预训练模型,支持多语言...
### 计算机视觉
- [yolov7](https://github.com/WongKinYiu/yolov7)
» 摘要:实时目标检测SOTA模型,精度-速度平衡优化...
> 知识图谱状态:已处理 1250/4230 · 下次更新 2025-08-25
PRD批准
角色 签字 日期
产品经理 [签字区] 2025-08-18
技术负责人 [签字区] 2025-08-18
项目经理 [签字区] 2025-08-18
完整需求文档
1. 核心目标
构建基于GitHub Actions的自动化系统,通过周期性的增量处理实现:
- ⭐️ GitHub star项目的智能分类与摘要生成
- 📚 自动化知识沉淀至README.md
- ⚡️ 长期免维护的项目管理
2. 关键需求
2.1 处理周期
- 🕒 频率:每7天自动执行一次
- ⏱ 单次限额:最多处理50个新star项目
2.2 数据处理
- 🔍 增量获取:
- 仅扫描自上次处理后的新增项目
- 按star时间倒序获取最新项目
- 🤖 AI处理:
- 使用GitHub Models AI能力
- 严格遵循并发限制(≤2请求)
- 输出结构化数据(分类标签+摘要文本)
2.3 存储与状态
- 🗃 状态追踪:
- 轻量SQLite数据库记录处理状态
- 持久化存储上次处理的截止时间戳
- 📌 断点续传:
- 意外中断后从断点自动恢复
- 避免重复处理
2.4 输出要求
- 📄 README.md自动更新:
- 按分类生成分级标题(如:
"## Machine Learning")
- 结构化展示项目(名称+链接+AI摘要)
- 保留历史数据形成知识库
2.5 限制与优化
- ⛔ API约束:
- 处理中断时自动重试(指数退避)
- 失败项目记录到待重试队列
- 🚀 性能保障:
- 单次执行时间<15分钟(50个项目)
- 无状态设计降低资源消耗
3. 系统特性
特性 实现方案
零初始化 首次运行自动处理最新50个项目
弹性扩展 项目量激增时自动分周期消化
跨周期完成 4000+项目通过80周自然处理
全自动化 无需人工触发/配置更新
4. 技术规范
graph LR
A[GitHub Action定时触发] --> B[读取上次时间戳]
B --> C{有新项目?}
C -->|是| D[获取最新50个star]
C -->|否| E[跳过执行]
D --> F[AI并发处理≤2]
F --> G[更新SQLite状态]
G --> H[生成README片段]
H --> I[更新主README]
I --> J[提交新时间戳]
5. 输出示例
## 🌐 Web Development
- [next.js](https://github.com/vercel/next.js)
» 摘要:React框架,支持服务端渲染和静态导出...
## 🤖 Machine Learning
- [transformers](https://github.com/huggingface/transformers)
» 摘要:SOTA自然语言处理库,提供预训练模型...
6. 验收标准
- ✅ 每7天新增项目自动出现在README对应分类
- ✅ README保留历史所有已处理项目
- ✅ SQLite数据库时间戳与Git提交记录一致
- ✅ 4000+项目在20个月内完成全部处理
交付物:包含完整工作流配置的GitHub仓库,实现开箱即用的自动化管理能力,形成持续进化的项目知识图谱。
1. 核心目标
构建基于GitHub Actions的自动化系统,通过周期性的增量处理实现:
- ⭐️ GitHub star项目的智能分类与摘要生成
- 📚 自动化知识沉淀至README.md
- ⚡️ 长期免维护的项目管理
2. 关键需求
2.1 处理周期
- 🕒 频率:每7天自动执行一次
- ⏱ 单次限额:最多处理50个新star项目
2.2 数据处理
- 🔍 增量获取:
- 仅扫描自上次处理后的新增项目
- 按star时间倒序获取最新项目
- 🤖 AI处理:
- 使用GitHub Models AI能力
- 严格遵循并发限制(≤2请求)
- 输出结构化数据(分类标签+摘要文本)
2.3 存储与状态
- 🗃 状态追踪:
- 轻量SQLite数据库记录处理状态
- 持久化存储上次处理的截止时间戳
- 📌 断点续传:
- 意外中断后从断点自动恢复
- 避免重复处理
2.4 输出要求
- 📄 README.md自动更新:
- 按分类生成分级标题(如:
"## Machine Learning")
- 结构化展示项目(名称+链接+AI摘要)
- 保留历史数据形成知识库
2.5 限制与优化
- ⛔ API约束:
- 处理中断时自动重试(指数退避)
- 失败项目记录到待重试队列
- 🚀 性能保障:
- 单次执行时间<15分钟(50个项目)
- 无状态设计降低资源消耗
3. 系统特性
特性 实现方案
零初始化 首次运行自动处理最新50个项目
弹性扩展 项目量激增时自动分周期消化
跨周期完成 4000+项目通过80周自然处理
全自动化 无需人工触发/配置更新
4. 技术规范
graph LR
A[GitHub Action定时触发] --> B[读取上次时间戳]
B --> C{有新项目?}
C -->|是| D[获取最新50个star]
C -->|否| E[跳过执行]
D --> F[AI并发处理≤2]
F --> G[更新SQLite状态]
G --> H[生成README片段]
H --> I[更新主README]
I --> J[提交新时间戳]
5. 输出示例
## 🌐 Web Development
- [next.js](https://github.com/vercel/next.js)
» 摘要:React框架,支持服务端渲染和静态导出...
## 🤖 Machine Learning
- [transformers](https://github.com/huggingface/transformers)
» 摘要:SOTA自然语言处理库,提供预训练模型...
6. 验收标准
- ✅ 每7天新增项目自动出现在README对应分类
- ✅ README保留历史所有已处理项目
- ✅ SQLite数据库时间戳与Git提交记录一致
- ✅ 4000+项目在20个月内完成全部处理
交付物:包含完整工作流配置的GitHub仓库,实现开箱即用的自动化管理能力,形成持续进化的项目知识图谱。