diff --git a/apps/rssPush.js b/apps/rssPush.js index 71043d7..edfab1e 100644 --- a/apps/rssPush.js +++ b/apps/rssPush.js @@ -37,7 +37,10 @@ export default class RssPlugin extends plugin { }, ], }); - schedule.scheduleJob('*/10 * * * *', () => this.pushFeeds()); + if (!global.__rss_job_scheduled) { + schedule.scheduleJob('*/10 * * * *', () => this.pushFeeds()); + global.__rss_job_scheduled = true; + } } /** @@ -128,10 +131,12 @@ export default class RssPlugin extends plugin { for (const feed of feeds) { const latest = await rssTools.fetchFeed(feed.url); if (!latest || !latest.length) continue; - - const lastLink = await rssCache.get(feed.url); - const newItems = lastLink ? latest.filter((i) => i.link !== lastLink) : latest; - + const newItems = []; + for (const item of latest) { + if (!(await rssCache.has(feed.url, item.link))) { + newItems.push(item); + } + } if (newItems.length) { await rssCache.set(feed.url, newItems[0].link); diff --git a/lib/rss/rssCache.js b/lib/rss/rssCache.js index d5b045c..04ebfa1 100644 --- a/lib/rss/rssCache.js +++ b/lib/rss/rssCache.js @@ -2,9 +2,11 @@ import crypto from 'crypto'; import paths from '../../constants/path.js'; import path from 'path'; import fs from 'fs'; +import configControl from '../config/configControl.js'; const redis = global.redis; const cachePath = path.join(paths.rssCache, 'rss_cache.json'); +const MAX_CACHE = configControl.get('maxFeed'); const rssCache = { /** @@ -22,34 +24,56 @@ const rssCache = { }, /** - * 从redis中获取数据 + * 获取已缓存的 link 数组 * @param url - * @returns {Promise<*>} + * @returns {Promise} */ async get(url) { const key = this.urlToKey(url); - return await redis.get(key); + const raw = await redis.get(key); + if (!raw) return []; + try { + const parsed = JSON.parse(raw); + if (Array.isArray(parsed)) return parsed; + return [parsed]; // 向后兼容老数据 + } catch { + return [raw]; + } }, /** - * 保存数据至redis和本地 + * 判断某条是否已缓存 + * @param url + * @param link + * @returns {Promise} + */ + async has(url, link) { + const cached = await this.get(url); + return cached.includes(link); + }, + + /** + * 添加新 link 到缓存 * @param url * @param latestLink * @returns {Promise} */ async set(url, latestLink) { const key = this.urlToKey(url); - await redis.set(key, latestLink); - await this.saveToLocal(url, latestLink); + let cached = await this.get(url); + cached = [latestLink, ...cached.filter((l) => l !== latestLink)].slice(0, MAX_CACHE); + + await redis.set(key, JSON.stringify(cached)); + await this.saveToLocal(url, cached); }, /** - * 保存至本地 + * 保存到本地文件 * @param url - * @param latestLink + * @param latestLinks * @returns {Promise} */ - async saveToLocal(url, latestLink) { + async saveToLocal(url, latestLinks) { let localData = {}; try { if (fs.existsSync(cachePath)) { @@ -58,22 +82,28 @@ const rssCache = { } catch (err) { logger.error(`本地rss缓存读取失败..`, err); } - localData[url] = latestLink; + localData[url] = latestLinks; fs.writeFileSync(cachePath, JSON.stringify(localData, null, 2), 'utf-8'); }, /** - * 从本地加载数据至redis + * 从本地缓存加载到 redis * @returns {Promise} */ async loadLocalToRedis() { if (!fs.existsSync(cachePath)) return; - const data = JSON.parse(fs.readFileSync(cachePath, 'utf-8')); - for (const [url, link] of Object.entries(data)) { - const key = this.urlToKey(url); - await redis.set(key, link); + + try { + const data = JSON.parse(fs.readFileSync(cachePath, 'utf-8')); + for (const [url, value] of Object.entries(data)) { + const key = this.urlToKey(url); + const safeArray = Array.isArray(value) ? value : [value]; + await redis.set(key, JSON.stringify(safeArray)); + } + logger.info(`[RSS] 本地缓存已加载至 Redis`); + } catch (err) { + logger.error(`[RSS] 加载本地缓存失败`, err); } - logger.info(`[RSS]本地缓存已加载至redis..`); }, };