This commit is contained in:
talorr
2026-03-27 03:36:08 +03:00
parent 8a97ce6d54
commit cda36918e8
225 changed files with 35641 additions and 0 deletions

99
backend/src/app.ts Normal file
View File

@@ -0,0 +1,99 @@
import cookieParser from "cookie-parser";
import cors from "cors";
import express from "express";
import rateLimit from "express-rate-limit";
import helmet from "helmet";
import morgan from "morgan";
import { corsOrigins } from "./config/env.js";
import { isMailConfigured, verifyMailTransport } from "./lib/mail.js";
import { errorHandler } from "./middleware/error-handler.js";
import { appVersionRouter } from "./modules/app-version/app-version.routes.js";
import { adminRouter } from "./modules/admin/admin.routes.js";
import { authRouter } from "./modules/auth/auth.routes.js";
import { internalRouter } from "./modules/internal/internal.routes.js";
import { pushRouter } from "./modules/push/push.routes.js";
import { rssRouter } from "./modules/rss/rss.routes.js";
import { signalsRouter } from "./modules/signals/signals.routes.js";
import { usersRouter } from "./modules/users/users.routes.js";
export const app = express();
if (isMailConfigured()) {
void verifyMailTransport()
.then(() => {
console.info("SMTP transport verified");
})
.catch((error) => {
console.error("SMTP transport verification failed", error instanceof Error ? error.message : String(error));
});
} else {
console.warn("SMTP is not fully configured. Password reset emails are disabled.");
}
app.set("trust proxy", 1);
app.use(helmet());
app.use(
cors({
origin(origin, callback) {
if (!origin || corsOrigins.includes(origin)) {
callback(null, true);
return;
}
callback(new Error(`CORS blocked for origin: ${origin}`));
},
credentials: true
})
);
const apiLimiter = rateLimit({
windowMs: 15 * 60 * 1000,
max: 300,
standardHeaders: true,
legacyHeaders: false
});
const authLimiter = rateLimit({
windowMs: 15 * 60 * 1000,
max: 10,
standardHeaders: true,
legacyHeaders: false,
message: { message: "Too many authentication requests. Try again later." }
});
const publicWriteLimiter = rateLimit({
windowMs: 15 * 60 * 1000,
max: 60,
standardHeaders: true,
legacyHeaders: false,
message: { message: "Too many requests. Try again later." }
});
const internalLimiter = rateLimit({
windowMs: 60 * 1000,
max: 30,
standardHeaders: true,
legacyHeaders: false,
message: { message: "Too many internal requests. Try again later." }
});
app.use(apiLimiter);
app.use(express.json({ limit: "1mb" }));
app.use(cookieParser());
app.use(morgan("dev"));
app.use("/auth/login", authLimiter);
app.use("/auth/register", authLimiter);
app.use("/auth/forgot-password", authLimiter);
app.use("/auth/reset-password", authLimiter);
app.use("/public", publicWriteLimiter);
app.use("/internal", internalLimiter);
app.get("/health", (_req, res) => {
res.json({ ok: true });
});
app.use("/", appVersionRouter);
app.use("/auth", authRouter);
app.use("/signals", signalsRouter);
app.use("/", rssRouter);
app.use("/", pushRouter);
app.use("/", usersRouter);
app.use("/internal", internalRouter);
app.use("/admin", adminRouter);
app.use(errorHandler);

210
backend/src/config/env.ts Normal file
View File

@@ -0,0 +1,210 @@
import dotenv from "dotenv";
import fs from "node:fs";
import path from "node:path";
import webpush from "web-push";
import { z } from "zod";
dotenv.config();
const isMissingOrPlaceholder = (value: string | undefined) =>
!value || value.trim().length === 0 || value.startsWith("replace_");
const isValidVapidConfig = (publicKey: string | undefined, privateKey: string | undefined, subject: string | undefined) => {
if (isMissingOrPlaceholder(publicKey) || isMissingOrPlaceholder(privateKey)) {
return false;
}
const resolvedSubject = isMissingOrPlaceholder(subject) ? "mailto:admin@example.com" : subject!;
try {
webpush.setVapidDetails(resolvedSubject, publicKey!, privateKey!);
return true;
} catch {
return false;
}
};
const parseEnvFile = (content: string) => {
const entries = new Map<string, string>();
for (const line of content.split(/\r?\n/)) {
const trimmed = line.trim();
if (!trimmed || trimmed.startsWith("#")) continue;
const separatorIndex = trimmed.indexOf("=");
if (separatorIndex === -1) continue;
const key = trimmed.slice(0, separatorIndex).trim();
const value = trimmed.slice(separatorIndex + 1).trim();
if (key) entries.set(key, value);
}
return entries;
};
const ensureVapidConfig = () => {
const stateDir = process.env.VAPID_STATE_DIR?.trim() || path.join(process.cwd(), ".runtime");
const stateFile = path.join(stateDir, "vapid.env");
const currentPublic = process.env.VAPID_PUBLIC_KEY;
const currentPrivate = process.env.VAPID_PRIVATE_KEY;
const currentSubject = process.env.VAPID_SUBJECT;
const readState = () => {
try {
if (!fs.existsSync(stateFile)) return null;
return parseEnvFile(fs.readFileSync(stateFile, "utf8"));
} catch {
return null;
}
};
const saveState = (publicKey: string, privateKey: string, subject: string) => {
try {
fs.mkdirSync(stateDir, { recursive: true });
const payload = [
`VAPID_PUBLIC_KEY=${publicKey}`,
`VAPID_PRIVATE_KEY=${privateKey}`,
`VAPID_SUBJECT=${subject}`
].join("\n");
fs.writeFileSync(stateFile, `${payload}\n`, "utf8");
} catch {
// Ignore write errors and continue with in-memory env values.
}
};
if (isValidVapidConfig(currentPublic, currentPrivate, currentSubject)) {
const subject = isMissingOrPlaceholder(currentSubject) ? "mailto:admin@example.com" : currentSubject!;
if (subject !== currentSubject) {
process.env.VAPID_SUBJECT = subject;
}
saveState(currentPublic!, currentPrivate!, subject);
return;
}
const state = readState();
const statePublic = state?.get("VAPID_PUBLIC_KEY");
const statePrivate = state?.get("VAPID_PRIVATE_KEY");
const stateSubject = state?.get("VAPID_SUBJECT");
if (isValidVapidConfig(statePublic, statePrivate, stateSubject || currentSubject)) {
process.env.VAPID_PUBLIC_KEY = statePublic;
process.env.VAPID_PRIVATE_KEY = statePrivate;
process.env.VAPID_SUBJECT =
!isMissingOrPlaceholder(currentSubject) ? currentSubject : stateSubject || "mailto:admin@example.com";
return;
}
const generated = webpush.generateVAPIDKeys();
const subject =
!isMissingOrPlaceholder(currentSubject) && currentSubject ? currentSubject : "mailto:admin@example.com";
process.env.VAPID_PUBLIC_KEY = generated.publicKey;
process.env.VAPID_PRIVATE_KEY = generated.privateKey;
process.env.VAPID_SUBJECT = subject;
saveState(generated.publicKey, generated.privateKey, subject);
console.warn(`Generated new VAPID keys and saved them to ${stateFile}`);
};
ensureVapidConfig();
const stringWithDefault = (fallback: string) =>
z.preprocess((value) => {
if (typeof value !== "string") return value;
const trimmed = value.trim();
return trimmed.length === 0 ? undefined : trimmed;
}, z.string().default(fallback));
const trimmedString = () =>
z.preprocess((value) => {
if (typeof value !== "string") return value;
return value.trim();
}, z.string());
const buildCorsOrigins = (rawCorsOrigin: string, appPublicUrl: string) => {
const defaults = [
appPublicUrl,
"https://antigol.ru",
"http://localhost:3000",
"http://localhost:5173",
"http://localhost",
"https://localhost"
];
const configured = rawCorsOrigin
.split(",")
.map((origin) => origin.trim())
.filter(Boolean);
return Array.from(new Set([...configured, ...defaults]));
};
const parseBotNameAliasesObject = (parsed: unknown) => {
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
return new Map<string, string>();
}
return new Map(
Object.entries(parsed)
.filter((entry): entry is [string, string] => {
const [key, value] = entry;
return key.trim().length > 0 && typeof value === "string" && value.trim().length > 0;
})
.map(([key, value]) => [key.trim(), value.trim()])
);
};
const loadBotNameAliases = (aliasesFilePath: string) => {
try {
const content = fs.readFileSync(aliasesFilePath, "utf8");
return parseBotNameAliasesObject(JSON.parse(content) as unknown);
} catch (error) {
const nodeError = error as NodeJS.ErrnoException;
if (nodeError.code !== "ENOENT") {
console.warn(`Failed to load bot aliases from ${aliasesFilePath}: ${nodeError.message}`);
}
return new Map<string, string>();
}
};
const envSchema = z.object({
PORT: z.coerce.number().default(4000),
REDIS_URL: stringWithDefault("redis://127.0.0.1:6379"),
DATABASE_URL: trimmedString().pipe(z.string().min(1)),
JWT_SECRET: trimmedString().pipe(z.string().min(8)),
JWT_EXPIRES_IN: stringWithDefault("7d"),
CORS_ORIGIN: stringWithDefault(""),
APP_PUBLIC_URL: stringWithDefault("https://antigol.ru").pipe(z.string().url()),
SMTP_HOST: stringWithDefault(""),
SMTP_PORT: z.coerce.number().default(587),
SMTP_SECURE: z
.union([z.boolean(), z.string()])
.transform((value) => {
if (typeof value === "boolean") return value;
const normalized = value.trim().toLowerCase();
return normalized === "true" || normalized === "1" || normalized === "yes";
})
.default(false),
SMTP_USER: stringWithDefault(""),
SMTP_PASSWORD: stringWithDefault(""),
SMTP_FROM_EMAIL: stringWithDefault(""),
SMTP_FROM_NAME: stringWithDefault("Alpinbet"),
PASSWORD_RESET_TTL_MINUTES: z.coerce.number().default(60),
APP_LATEST_VERSION: stringWithDefault(""),
APP_MIN_SUPPORTED_VERSION: stringWithDefault(""),
APP_UPDATE_URL: stringWithDefault(""),
BOT_NAME_ALIASES_FILE: stringWithDefault("./config/bot-name-aliases.json"),
APP_UPDATE_MESSAGE: z.string().default("Доступна новая версия приложения"),
VAPID_PUBLIC_KEY: stringWithDefault(""),
VAPID_PRIVATE_KEY: stringWithDefault(""),
VAPID_SUBJECT: stringWithDefault("mailto:admin@example.com"),
FIREBASE_PROJECT_ID: stringWithDefault(""),
FIREBASE_CLIENT_EMAIL: stringWithDefault(""),
FIREBASE_PRIVATE_KEY: stringWithDefault(""),
FIREBASE_SERVICE_ACCOUNT_JSON: stringWithDefault(""),
SETTLEMENT_INTERVAL_MS: z.coerce.number().default(60000),
SIGNALS_WORKER_CONCURRENCY: z.coerce.number().default(1),
PUSH_WORKER_CONCURRENCY: z.coerce.number().default(1),
PARSER_INTERNAL_SECRET: trimmedString().pipe(z.string().min(16))
});
export const env = envSchema.parse(process.env);
export const corsOrigins = buildCorsOrigins(env.CORS_ORIGIN, env.APP_PUBLIC_URL);
export const botNameAliases = loadBotNameAliases(path.resolve(process.cwd(), env.BOT_NAME_ALIASES_FILE));

4
backend/src/db/prisma.ts Normal file
View File

@@ -0,0 +1,4 @@
import { PrismaClient } from "@prisma/client";
export const prisma = new PrismaClient();

22
backend/src/index.ts Normal file
View File

@@ -0,0 +1,22 @@
import { env } from "./config/env.js";
import { prisma } from "./db/prisma.js";
import { closeRedisConnection } from "./lib/redis.js";
import { app } from "./app.js";
import { startSettlementWorker } from "./modules/workers/settlement.worker.js";
const server = app.listen(env.PORT, () => {
console.log(`API запущен на порту ${env.PORT}`);
});
const interval = startSettlementWorker();
async function shutdown() {
clearInterval(interval);
server.close();
await closeRedisConnection();
await prisma.$disconnect();
process.exit(0);
}
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);

26
backend/src/lib/auth.ts Normal file
View File

@@ -0,0 +1,26 @@
import bcrypt from "bcryptjs";
import jwt, { type SignOptions } from "jsonwebtoken";
import { env } from "../config/env.js";
type JwtPayload = {
userId: string;
role: "admin" | "user";
};
export async function hashPassword(password: string) {
return bcrypt.hash(password, 10);
}
export async function verifyPassword(password: string, passwordHash: string) {
return bcrypt.compare(password, passwordHash);
}
export function signToken(payload: JwtPayload) {
return jwt.sign(payload, env.JWT_SECRET, {
expiresIn: env.JWT_EXPIRES_IN as SignOptions["expiresIn"]
});
}
export function verifyToken(token: string) {
return jwt.verify(token, env.JWT_SECRET) as JwtPayload;
}

16
backend/src/lib/dedupe.ts Normal file
View File

@@ -0,0 +1,16 @@
export function createSignalDedupeKey(input: {
providerId?: string | null;
eventId: string;
marketType: string;
selection: string;
lineValue?: number | null;
}) {
return [
input.providerId ?? "manual",
input.eventId.trim().toLowerCase(),
input.marketType.trim().toLowerCase(),
input.selection.trim().toLowerCase(),
input.lineValue ?? "na"
].join(":");
}

View File

@@ -0,0 +1,9 @@
export class HttpError extends Error {
statusCode: number;
constructor(statusCode: number, message: string) {
super(message);
this.statusCode = statusCode;
}
}

86
backend/src/lib/mail.ts Normal file
View File

@@ -0,0 +1,86 @@
import nodemailer from "nodemailer";
import { env } from "../config/env.js";
let transporterPromise: Promise<nodemailer.Transporter> | null = null;
let verifyPromise: Promise<void> | null = null;
export function isMailConfigured() {
return Boolean(
env.SMTP_HOST.trim() &&
env.SMTP_PORT > 0 &&
env.SMTP_USER.trim() &&
env.SMTP_PASSWORD.trim() &&
env.SMTP_FROM_EMAIL.trim()
);
}
async function getTransporter() {
if (!isMailConfigured()) {
throw new Error("SMTP is not configured");
}
if (!transporterPromise) {
transporterPromise = Promise.resolve(
nodemailer.createTransport({
host: env.SMTP_HOST,
port: env.SMTP_PORT,
secure: env.SMTP_SECURE,
auth: {
user: env.SMTP_USER,
pass: env.SMTP_PASSWORD
}
})
);
}
return transporterPromise;
}
export async function verifyMailTransport() {
if (!isMailConfigured()) {
throw new Error("SMTP is not configured");
}
if (!verifyPromise) {
verifyPromise = getTransporter().then(async (transporter) => {
await transporter.verify();
});
}
return verifyPromise;
}
export async function sendMail(options: {
to: string;
subject: string;
html: string;
text: string;
}) {
const transporter = await getTransporter();
const from = env.SMTP_FROM_NAME.trim()
? `"${env.SMTP_FROM_NAME.replace(/"/g, '\\"')}" <${env.SMTP_FROM_EMAIL}>`
: env.SMTP_FROM_EMAIL;
try {
await transporter.sendMail({
from,
to: options.to,
subject: options.subject,
html: options.html,
text: options.text
});
} catch (error) {
console.error("SMTP send failed", {
host: env.SMTP_HOST,
port: env.SMTP_PORT,
secure: env.SMTP_SECURE,
user: env.SMTP_USER,
from: env.SMTP_FROM_EMAIL,
to: options.to,
subject: options.subject,
error: error instanceof Error ? error.message : String(error)
});
throw error;
}
}

View File

@@ -0,0 +1,87 @@
import { cert, getApp, getApps, initializeApp } from "firebase-admin/app";
import { getMessaging } from "firebase-admin/messaging";
import { env } from "../config/env.js";
type NativePushResult =
| { ok: true; messageId: string }
| { ok: false; reason: string; code?: string };
function getFirebaseApp() {
if (
env.FIREBASE_SERVICE_ACCOUNT_JSON.trim().length === 0 &&
(env.FIREBASE_PROJECT_ID.trim().length === 0 ||
env.FIREBASE_CLIENT_EMAIL.trim().length === 0 ||
env.FIREBASE_PRIVATE_KEY.trim().length === 0)
) {
return null;
}
if (getApps().length > 0) {
return getApp();
}
const credentials =
env.FIREBASE_SERVICE_ACCOUNT_JSON.trim().length > 0
? JSON.parse(env.FIREBASE_SERVICE_ACCOUNT_JSON)
: {
projectId: env.FIREBASE_PROJECT_ID,
clientEmail: env.FIREBASE_CLIENT_EMAIL,
privateKey: env.FIREBASE_PRIVATE_KEY.replace(/\\n/g, "\n")
};
return initializeApp({
credential: cert(credentials)
});
}
export function isNativePushConfigured() {
return getFirebaseApp() !== null;
}
export async function sendNativePush(
token: string,
payload: {
title: string;
body: string;
data?: Record<string, string>;
}
): Promise<NativePushResult> {
const app = getFirebaseApp();
if (!app) {
return { ok: false, reason: "Firebase is not configured", code: "messaging/not-configured" };
}
try {
const messageId = await getMessaging(app).send({
token,
notification: {
title: payload.title,
body: payload.body
},
data: payload.data,
android: {
priority: "high",
notification: {
channelId: "signals",
clickAction: "OPEN_SIGNAL"
}
},
apns: {
payload: {
aps: {
sound: "default"
}
}
}
});
return { ok: true, messageId };
} catch (error) {
const details = error as { code?: string; message?: string };
return {
ok: false,
reason: details.message || "Native push delivery failed",
code: details.code
};
}
}

56
backend/src/lib/push.ts Normal file
View File

@@ -0,0 +1,56 @@
import webpush from "web-push";
import { env } from "../config/env.js";
type PushErrorLike = Error & {
statusCode?: number;
body?: unknown;
};
let pushConfigured = false;
try {
webpush.setVapidDetails(env.VAPID_SUBJECT, env.VAPID_PUBLIC_KEY, env.VAPID_PRIVATE_KEY);
pushConfigured = true;
} catch (error) {
console.warn(
"Web Push disabled: failed to initialize VAPID configuration",
error instanceof Error ? error.message : String(error)
);
}
export function getVapidPublicKey() {
return env.VAPID_PUBLIC_KEY;
}
export async function sendWebPush(
subscription: { endpoint: string; p256dh: string; auth: string },
payload: Record<string, unknown>
) {
if (!pushConfigured) {
return { ok: false, reason: "VAPID not configured" } as const;
}
try {
await webpush.sendNotification(
{
endpoint: subscription.endpoint,
keys: {
p256dh: subscription.p256dh,
auth: subscription.auth
}
},
JSON.stringify(payload)
);
return { ok: true, statusCode: 201 as const } as const;
} catch (error) {
const pushError = error as PushErrorLike;
return {
ok: false,
reason: error instanceof Error ? error.message : "Push error",
statusCode: pushError?.statusCode,
details: typeof pushError?.body === "string" ? pushError.body : undefined
} as const;
}
}

10
backend/src/lib/redis.ts Normal file
View File

@@ -0,0 +1,10 @@
import { Redis } from "ioredis";
import { env } from "../config/env.js";
export const redisConnection = new Redis(env.REDIS_URL, {
maxRetriesPerRequest: null
});
export async function closeRedisConnection() {
await redisConnection.quit();
}

View File

@@ -0,0 +1,52 @@
import { Signal, SignalStatus } from "@prisma/client";
type EventScore = {
homeScore: number;
awayScore: number;
};
function evaluate1X2(selection: string, score: EventScore): SignalStatus {
const normalized = selection.trim().toUpperCase();
if (score.homeScore === score.awayScore) return normalized === "X" ? SignalStatus.win : SignalStatus.lose;
const homeWon = score.homeScore > score.awayScore;
if (normalized === "1") return homeWon ? SignalStatus.win : SignalStatus.lose;
if (normalized === "2") return homeWon ? SignalStatus.lose : SignalStatus.win;
return SignalStatus.manual_review;
}
function evaluateTotal(selection: string, lineValue: number | null, score: EventScore): SignalStatus {
if (lineValue === null) return SignalStatus.manual_review;
const total = score.homeScore + score.awayScore;
const normalized = selection.trim().toLowerCase();
if (total === lineValue) return SignalStatus.void;
if (normalized.includes("over")) return total > lineValue ? SignalStatus.win : SignalStatus.lose;
if (normalized.includes("under")) return total < lineValue ? SignalStatus.win : SignalStatus.lose;
return SignalStatus.manual_review;
}
function evaluateBtts(selection: string, score: EventScore): SignalStatus {
const bothScored = score.homeScore > 0 && score.awayScore > 0;
const normalized = selection.trim().toLowerCase();
if (normalized.includes("yes")) return bothScored ? SignalStatus.win : SignalStatus.lose;
if (normalized.includes("no")) return bothScored ? SignalStatus.lose : SignalStatus.win;
return SignalStatus.manual_review;
}
function evaluateHandicap(selection: string, lineValue: number | null, score: EventScore): SignalStatus {
if (lineValue === null) return SignalStatus.manual_review;
const normalized = selection.trim().toLowerCase();
const isHome = normalized.includes("1") || normalized.includes("home");
const value = isHome ? score.homeScore + lineValue - score.awayScore : score.awayScore + lineValue - score.homeScore;
if (value === 0) return SignalStatus.void;
return value > 0 ? SignalStatus.win : SignalStatus.lose;
}
export function settleSignal(signal: Signal, score: EventScore) {
const market = signal.marketType.trim().toLowerCase();
if (market === "1x2") return evaluate1X2(signal.selection, score);
if (market.includes("total")) return evaluateTotal(signal.selection, signal.lineValue, score);
if (market.includes("both teams to score") || market.includes("btts")) return evaluateBtts(signal.selection, score);
if (market.includes("handicap")) return evaluateHandicap(signal.selection, signal.lineValue, score);
return SignalStatus.manual_review;
}

View File

@@ -0,0 +1,35 @@
import { NextFunction, Request, Response } from "express";
import { prisma } from "../db/prisma.js";
import { verifyToken } from "../lib/auth.js";
export type AuthenticatedRequest = Request & {
user?: {
id: string;
role: "admin" | "user";
};
};
export async function requireAuth(req: AuthenticatedRequest, res: Response, next: NextFunction) {
const header = req.headers.authorization;
const bearerToken = header?.startsWith("Bearer ") ? header.slice(7) : undefined;
const cookieToken = typeof req.cookies?.auth_token === "string" ? req.cookies.auth_token : undefined;
const token = bearerToken || cookieToken;
if (!token) return res.status(401).json({ message: "Требуется авторизация" });
try {
const payload = verifyToken(token);
const user = await prisma.user.findUnique({ where: { id: payload.userId } });
if (!user || !user.active) return res.status(401).json({ message: "Пользователь недоступен" });
req.user = { id: user.id, role: user.role };
next();
} catch {
return res.status(401).json({ message: "Невалидный токен" });
}
}
export function requireAdmin(req: AuthenticatedRequest, res: Response, next: NextFunction) {
if (req.user?.role !== "admin") return res.status(403).json({ message: "Требуются права администратора" });
next();
}

View File

@@ -0,0 +1,14 @@
import { NextFunction, Request, Response } from "express";
import { ZodError } from "zod";
import { HttpError } from "../lib/errors.js";
export function errorHandler(error: unknown, _req: Request, res: Response, _next: NextFunction) {
if (error instanceof ZodError) {
return res.status(400).json({ message: "Ошибка валидации", issues: error.flatten() });
}
if (error instanceof HttpError) {
return res.status(error.statusCode).json({ message: error.message });
}
console.error(error);
return res.status(500).json({ message: "Внутренняя ошибка сервера" });
}

View File

@@ -0,0 +1,314 @@
import { Router } from "express";
import { Prisma, SignalStatus } from "@prisma/client";
import { prisma } from "../../db/prisma.js";
import { HttpError } from "../../lib/errors.js";
import { requireAdmin, requireAuth, type AuthenticatedRequest } from "../../middleware/auth.js";
import { enqueueBroadcastPush, enqueueSignalPush } from "../queues/push.queue.js";
import { signalCreateSchema, signalStatusSchema, signalUpdateSchema } from "../signals/signals.schemas.js";
import { createSignal, updateSignal } from "../signals/signals.service.js";
export const adminRouter = Router();
adminRouter.use(requireAuth, requireAdmin);
type PushEventPayload = {
subscriptionId?: string;
endpoint?: string;
endpointHost?: string;
ok?: boolean;
statusCode?: number | null;
reason?: string | null;
notificationType?: string | null;
};
async function logAdminAction(
adminId: string,
action: string,
entityType: string,
entityId: string,
metadata?: Record<string, unknown>
) {
await prisma.adminActionLog.create({
data: {
adminId,
action,
entityType,
entityId,
metadata: metadata ? (metadata as Prisma.InputJsonObject) : undefined
}
});
}
adminRouter.post("/signals", async (req: AuthenticatedRequest, res, next) => {
try {
const payload = signalCreateSchema.parse(req.body);
const signal = await createSignal(payload);
await logAdminAction(req.user!.id, "create", "signal", signal.id);
res.status(201).json(signal);
} catch (error) {
next(error);
}
});
adminRouter.patch("/signals/:id", async (req: AuthenticatedRequest, res, next) => {
try {
const payload = signalUpdateSchema.parse(req.body);
const signal = await updateSignal(String(req.params.id), payload);
await logAdminAction(req.user!.id, "update", "signal", signal.id, payload);
res.json(signal);
} catch (error) {
next(error);
}
});
adminRouter.delete("/signals/:id", async (req: AuthenticatedRequest, res) => {
await prisma.signal.delete({ where: { id: String(req.params.id) } });
await logAdminAction(req.user!.id, "delete", "signal", String(req.params.id));
res.status(204).send();
});
adminRouter.post("/signals/:id/duplicate", async (req: AuthenticatedRequest, res, next) => {
try {
const source = await prisma.signal.findUnique({ where: { id: String(req.params.id) } });
if (!source) {
throw new HttpError(404, "Сигнал не найден");
}
const duplicate = await createSignal({
providerId: source.providerId,
eventId: `${source.eventId}-${Date.now()}`,
sportType: source.sportType,
leagueName: source.leagueName,
homeTeam: source.homeTeam,
awayTeam: source.awayTeam,
eventStartTime: source.eventStartTime.toISOString(),
marketType: source.marketType,
selection: source.selection,
lineValue: source.lineValue,
odds: source.odds,
signalTime: new Date().toISOString(),
sourceType: source.sourceType,
comment: source.comment,
rawPayload: source.rawPayload as Record<string, unknown> | null,
published: false
});
await logAdminAction(req.user!.id, "duplicate", "signal", duplicate.id, { sourceId: source.id });
res.status(201).json(duplicate);
} catch (error) {
next(error);
}
});
adminRouter.post("/signals/:id/send-push", async (req: AuthenticatedRequest, res, next) => {
try {
const job = await enqueueSignalPush({
type: "signal",
signalId: String(req.params.id),
force: true
});
await logAdminAction(req.user!.id, "send_push", "signal", String(req.params.id), {
queued: true,
jobId: job.id
});
res.status(202).json({ queued: true, jobId: job.id });
} catch (error) {
next(error);
}
});
adminRouter.post("/signals/:id/set-status", async (req: AuthenticatedRequest, res, next) => {
try {
const payload = signalStatusSchema.parse(req.body);
await prisma.signal.update({
where: { id: String(req.params.id) },
data: { status: payload.status as SignalStatus }
});
const settlement = await prisma.settlement.upsert({
where: { signalId: String(req.params.id) },
update: { result: payload.status as SignalStatus, explanation: payload.explanation },
create: {
signalId: String(req.params.id),
result: payload.status as SignalStatus,
explanation: payload.explanation
}
});
await logAdminAction(req.user!.id, "set_status", "signal", String(req.params.id), payload);
res.json(settlement);
} catch (error) {
next(error);
}
});
adminRouter.post("/broadcast", async (req: AuthenticatedRequest, res, next) => {
try {
const title = String(req.body.title ?? "").trim();
const body = String(req.body.body ?? "").trim();
if (!title || !body) {
throw new HttpError(400, "Нужны title и body");
}
const job = await enqueueBroadcastPush({
type: "broadcast",
title,
body,
url: req.body.url ? String(req.body.url) : "/",
tag: req.body.tag ? String(req.body.tag) : undefined,
renotify: Boolean(req.body.renotify)
});
await logAdminAction(req.user!.id, "broadcast", "notification", "broadcast", {
title,
body,
url: req.body.url,
tag: req.body.tag,
renotify: Boolean(req.body.renotify),
queued: true,
jobId: job.id
});
res.status(202).json({ queued: true, jobId: job.id });
} catch (error) {
next(error);
}
});
adminRouter.get("/logs", async (_req, res) => {
const [pushLogs, adminActions, integrationLogs] = await Promise.all([
prisma.notificationLog.findMany({ orderBy: { createdAt: "desc" }, take: 50 }),
prisma.adminActionLog.findMany({ orderBy: { createdAt: "desc" }, take: 50, include: { admin: true } }),
prisma.integrationLog.findMany({ orderBy: { createdAt: "desc" }, take: 50 })
]);
res.json({ pushLogs, adminActions, integrationLogs });
});
adminRouter.get("/push-subscriptions", async (_req, res) => {
const [subscriptions, recentEvents, notificationLogs] = await Promise.all([
prisma.pushSubscription.findMany({
orderBy: { updatedAt: "desc" },
include: {
user: {
select: {
id: true,
email: true,
role: true,
active: true,
notificationSetting: true
}
}
}
}),
prisma.integrationLog.findMany({
where: { provider: "web-push" },
orderBy: { createdAt: "desc" },
take: 500
}),
prisma.notificationLog.findMany({
orderBy: { createdAt: "desc" },
take: 20
})
]);
const latestEventBySubscription = new Map<string, (typeof recentEvents)[number]>();
for (const event of recentEvents) {
const payload =
event.payload && typeof event.payload === "object" ? (event.payload as unknown as PushEventPayload) : null;
const subscriptionId = payload?.subscriptionId;
if (!subscriptionId || latestEventBySubscription.has(subscriptionId)) {
continue;
}
latestEventBySubscription.set(subscriptionId, event);
}
const items = subscriptions.map((subscription) => {
const latestEvent = latestEventBySubscription.get(subscription.id);
const payload =
latestEvent?.payload && typeof latestEvent.payload === "object"
? (latestEvent.payload as unknown as PushEventPayload)
: null;
const status = !subscription.active
? "inactive"
: payload?.ok === false
? "error"
: payload?.ok === true
? "ok"
: "ready";
return {
id: subscription.id,
endpoint: subscription.endpoint,
endpointHost: payload?.endpointHost ?? (() => {
try {
return new URL(subscription.endpoint).hostname;
} catch {
return "unknown";
}
})(),
active: subscription.active,
createdAt: subscription.createdAt,
updatedAt: subscription.updatedAt,
status,
user: {
id: subscription.user.id,
email: subscription.user.email,
role: subscription.user.role,
active: subscription.user.active,
notificationSetting: subscription.user.notificationSetting
},
latestEvent: latestEvent
? {
createdAt: latestEvent.createdAt,
level: latestEvent.level,
message: latestEvent.message,
ok: payload?.ok ?? null,
statusCode: payload?.statusCode ?? null,
reason: payload?.reason ?? null,
notificationType: payload?.notificationType ?? null
}
: null
};
});
const summary = {
total: items.length,
active: items.filter((item) => item.active).length,
inactive: items.filter((item) => !item.active).length,
ok: items.filter((item) => item.status === "ok").length,
ready: items.filter((item) => item.status === "ready").length,
error: items.filter((item) => item.status === "error").length
};
res.json({
summary,
items,
recentNotificationLogs: notificationLogs
});
});
adminRouter.post("/event-results", async (req: AuthenticatedRequest, res) => {
const result = await prisma.eventResult.upsert({
where: { eventId: req.body.eventId },
update: {
homeScore: req.body.homeScore,
awayScore: req.body.awayScore,
status: req.body.status ?? "finished",
payload: req.body.payload ?? null
},
create: {
eventId: req.body.eventId,
homeScore: req.body.homeScore,
awayScore: req.body.awayScore,
status: req.body.status ?? "finished",
payload: req.body.payload ?? null
}
});
await logAdminAction(req.user!.id, "upsert_event_result", "event_result", result.id, req.body);
res.status(201).json(result);
});

View File

@@ -0,0 +1,13 @@
import { Router } from "express";
import { env } from "../../config/env.js";
export const appVersionRouter = Router();
appVersionRouter.get("/app-version", (_req, res) => {
res.json({
latestVersion: env.APP_LATEST_VERSION || null,
minSupportedVersion: env.APP_MIN_SUPPORTED_VERSION || null,
updateUrl: env.APP_UPDATE_URL || null,
message: env.APP_UPDATE_MESSAGE || "Доступна новая версия приложения"
});
});

View File

@@ -0,0 +1,361 @@
import crypto from "node:crypto";
import { type Response, Router } from "express";
import { Prisma, SubscriptionStatus } from "@prisma/client";
import { env } from "../../config/env.js";
import { prisma } from "../../db/prisma.js";
import { hashPassword, signToken, verifyPassword } from "../../lib/auth.js";
import { HttpError } from "../../lib/errors.js";
import { isMailConfigured, sendMail } from "../../lib/mail.js";
import { requireAuth, type AuthenticatedRequest } from "../../middleware/auth.js";
import {
forgotPasswordSchema,
loginSchema,
registerSchema,
resetPasswordSchema
} from "./auth.schemas.js";
export const authRouter = Router();
type PasswordResetTokenRow = {
id: string;
userId: string;
expiresAt: Date;
usedAt: Date | null;
userActive: boolean;
};
function logForgotPassword(event: string, payload: Record<string, unknown>) {
console.info("[auth/forgot-password]", event, payload);
}
function isSecureCookie() {
try {
return new URL(env.APP_PUBLIC_URL).protocol === "https:";
} catch {
return true;
}
}
function getAuthCookieDomain() {
try {
const { hostname } = new URL(env.APP_PUBLIC_URL);
if (!hostname || hostname === "localhost" || /^\d{1,3}(\.\d{1,3}){3}$/.test(hostname)) {
return undefined;
}
return `.${hostname}`;
} catch {
return undefined;
}
}
function setAuthCookie(res: Response, token: string) {
res.cookie("auth_token", token, {
httpOnly: true,
secure: isSecureCookie(),
sameSite: "lax",
path: "/",
domain: getAuthCookieDomain()
});
}
function clearAuthCookie(res: Response) {
res.clearCookie("auth_token", {
httpOnly: true,
secure: isSecureCookie(),
sameSite: "lax",
path: "/",
domain: getAuthCookieDomain()
});
}
function isSubscriptionActive(access: {
status: SubscriptionStatus;
startsAt: Date;
expiresAt: Date | null;
}) {
const now = Date.now();
return (
access.status === SubscriptionStatus.active &&
access.startsAt.getTime() <= now &&
(access.expiresAt === null || access.expiresAt.getTime() >= now)
);
}
function hashResetToken(token: string) {
return crypto.createHash("sha256").update(token).digest("hex");
}
function createPasswordResetUrl(token: string) {
const url = new URL("/reset-password", env.APP_PUBLIC_URL);
url.searchParams.set("token", token);
return url.toString();
}
async function sendPasswordResetEmail(email: string, token: string) {
const resetUrl = createPasswordResetUrl(token);
await sendMail({
to: email,
subject: "Восстановление пароля",
text: [
"Вы запросили восстановление пароля.",
`Перейдите по ссылке: ${resetUrl}`,
`Ссылка действует ${env.PASSWORD_RESET_TTL_MINUTES} минут.`
].join("\n"),
html: `
<div style="font-family: Arial, sans-serif; line-height: 1.6; color: #10203b;">
<h2 style="margin: 0 0 16px;">Восстановление пароля</h2>
<p style="margin: 0 0 12px;">Вы запросили смену пароля для вашего аккаунта.</p>
<p style="margin: 0 0 20px;">
<a
href="${resetUrl}"
style="display: inline-block; padding: 12px 18px; border-radius: 12px; background: #21c58b; color: #ffffff; text-decoration: none; font-weight: 700;"
>
Сбросить пароль
</a>
</p>
<p style="margin: 0 0 12px;">Или откройте ссылку вручную:</p>
<p style="margin: 0 0 12px; word-break: break-all;">${resetUrl}</p>
<p style="margin: 0; color: #5f7395;">Ссылка действует ${env.PASSWORD_RESET_TTL_MINUTES} минут.</p>
</div>
`
});
}
authRouter.post("/register", async (req, res, next) => {
try {
const data = registerSchema.parse(req.body);
const existingUser = await prisma.user.findUnique({ where: { email: data.email } });
if (existingUser) throw new HttpError(409, "Пользователь уже существует");
const user = await prisma.user.create({
data: {
email: data.email,
passwordHash: await hashPassword(data.password),
notificationSetting: {
create: { signalsPushEnabled: true, resultsPushEnabled: false }
}
},
select: { id: true, email: true, role: true }
});
const token = signToken({ userId: user.id, role: user.role });
setAuthCookie(res, token);
res.status(201).json({ token, user: { id: user.id, email: user.email, role: user.role } });
} catch (error) {
next(error);
}
});
authRouter.post("/login", async (req, res, next) => {
try {
const data = loginSchema.parse(req.body);
const user = await prisma.user.findUnique({ where: { email: data.email } });
if (!user || !(await verifyPassword(data.password, user.passwordHash))) {
throw new HttpError(401, "Неверный email или пароль");
}
const token = signToken({ userId: user.id, role: user.role });
setAuthCookie(res, token);
res.json({
token,
user: { id: user.id, email: user.email, role: user.role }
});
} catch (error) {
next(error);
}
});
authRouter.post("/logout", (_req, res) => {
clearAuthCookie(res);
res.status(204).send();
});
authRouter.post("/forgot-password", async (req, res, next) => {
try {
const data = forgotPasswordSchema.parse(req.body);
logForgotPassword("requested", { email: data.email });
if (!isMailConfigured()) {
logForgotPassword("smtp_not_configured", { email: data.email });
throw new HttpError(503, "Восстановление пароля по email пока не настроено");
}
const user = await prisma.user.findUnique({
where: { email: data.email },
select: { id: true, email: true, active: true }
});
if (!user) {
logForgotPassword("user_not_found", { email: data.email });
} else if (!user.active) {
logForgotPassword("user_inactive", { email: user.email, userId: user.id });
} else {
const token = crypto.randomBytes(32).toString("hex");
const tokenHash = hashResetToken(token);
const expiresAt = new Date(Date.now() + env.PASSWORD_RESET_TTL_MINUTES * 60 * 1000);
await prisma.$executeRaw`
DELETE FROM "PasswordResetToken"
WHERE "userId" = ${user.id} AND "usedAt" IS NULL
`;
await prisma.$executeRaw`
INSERT INTO "PasswordResetToken" ("id", "userId", "tokenHash", "expiresAt")
VALUES (${crypto.randomUUID()}, ${user.id}, ${tokenHash}, ${expiresAt})
`;
logForgotPassword("token_created", {
email: user.email,
userId: user.id,
expiresAt: expiresAt.toISOString()
});
await sendPasswordResetEmail(user.email, token);
logForgotPassword("email_sent", { email: user.email, userId: user.id });
}
res.json({
message: "Если аккаунт с таким email существует, мы отправили письмо для восстановления пароля"
});
} catch (error) {
next(error);
}
});
authRouter.post("/reset-password", async (req, res, next) => {
try {
const data = resetPasswordSchema.parse(req.body);
const tokenHash = hashResetToken(data.token);
const [resetToken] = await prisma.$queryRaw<PasswordResetTokenRow[]>(Prisma.sql`
SELECT
prt."id",
prt."userId",
prt."expiresAt",
prt."usedAt",
u."active" AS "userActive"
FROM "PasswordResetToken" prt
INNER JOIN "User" u ON u."id" = prt."userId"
WHERE prt."tokenHash" = ${tokenHash}
LIMIT 1
`);
if (
!resetToken ||
resetToken.usedAt ||
resetToken.expiresAt.getTime() < Date.now() ||
!resetToken.userActive
) {
throw new HttpError(400, "Ссылка для восстановления недействительна или уже истекла");
}
const passwordHash = await hashPassword(data.password);
await prisma.$transaction(async (tx) => {
await tx.user.update({
where: { id: resetToken.userId },
data: {
passwordHash
}
});
await tx.$executeRaw`
UPDATE "PasswordResetToken"
SET "usedAt" = ${new Date()}
WHERE "id" = ${resetToken.id}
`;
await tx.$executeRaw`
DELETE FROM "PasswordResetToken"
WHERE "userId" = ${resetToken.userId}
AND "usedAt" IS NULL
AND "id" <> ${resetToken.id}
`;
});
res.json({ message: "Пароль успешно обновлен" });
} catch (error) {
next(error);
}
});
authRouter.get("/me", requireAuth, async (req: AuthenticatedRequest, res) => {
await prisma.userBotAccess.updateMany({
where: {
userId: req.user!.id,
status: SubscriptionStatus.active,
expiresAt: {
lt: new Date()
}
},
data: {
status: SubscriptionStatus.expired
}
});
const user = await prisma.user.findUnique({
where: { id: req.user!.id },
select: {
id: true,
email: true,
role: true,
active: true,
createdAt: true,
notificationSetting: true,
botAccesses: {
include: {
bot: {
select: {
id: true,
key: true,
name: true,
sourceUrl: true,
active: true
}
}
},
orderBy: [{ startsAt: "desc" }, { createdAt: "desc" }]
}
}
});
if (!user) {
throw new HttpError(404, "Пользователь не найден");
}
const activeBotAccesses = user.botAccesses.filter(isSubscriptionActive);
if (user.role === "admin" && user.botAccesses.length === 0) {
const bots = await prisma.bot.findMany({
where: { active: true },
select: {
id: true,
key: true,
name: true,
sourceUrl: true,
active: true
},
orderBy: { name: "asc" }
});
res.json({
...user,
botAccesses: bots.map((bot) => ({
id: `admin-${bot.id}`,
grantedAt: user.createdAt,
bot
}))
});
return;
}
res.json({
...user,
botAccesses: activeBotAccesses.map((access) => ({
...access,
isActiveNow: true
}))
});
});

View File

@@ -0,0 +1,24 @@
import { z } from "zod";
export const registerSchema = z.object({
email: z.string().email("Некорректный email"),
password: z.string().min(8, "Минимум 8 символов")
});
export const loginSchema = registerSchema;
export const forgotPasswordSchema = z.object({
email: z.string().email("Некорректный email")
});
export const resetPasswordSchema = z
.object({
token: z.string().min(1, "Токен восстановления обязателен"),
password: z.string().min(8, "Минимум 8 символов"),
confirmPassword: z.string().min(8, "Минимум 8 символов")
})
.refine((data) => data.password === data.confirmPassword, {
message: "Пароли не совпадают",
path: ["confirmPassword"]
});

View File

@@ -0,0 +1,58 @@
import { Router } from "express";
import { env } from "../../config/env.js";
import { HttpError } from "../../lib/errors.js";
import { enqueueBroadcastPush, enqueueSignalPush } from "../queues/push.queue.js";
export const internalRouter = Router();
internalRouter.use((req, res, next) => {
const secret = req.header("x-parser-secret");
if (!env.PARSER_INTERNAL_SECRET || secret !== env.PARSER_INTERNAL_SECRET) {
return res.status(403).json({ message: "Forbidden" });
}
next();
});
internalRouter.post("/parser/signals-changed", async (req, res, next) => {
try {
const signalIds = Array.isArray(req.body?.signalIds)
? req.body.signalIds.filter((value: unknown): value is string => typeof value === "string")
: [];
const jobs = await Promise.all(
signalIds.map((signalId: string) =>
enqueueSignalPush({
type: "signal",
signalId
})
)
);
res.json({
processed: signalIds.length,
queued: jobs.length,
jobs: jobs.map((job) => ({ id: job.id, signalId: job.data.signalId }))
});
} catch (error) {
next(error);
}
});
internalRouter.post("/broadcast", async (req, res, next) => {
try {
const title = String(req.body?.title ?? "").trim();
const body = String(req.body?.body ?? "").trim();
const url = req.body?.url ? String(req.body.url) : "/";
const tag = req.body?.tag ? String(req.body.tag) : undefined;
const renotify = Boolean(req.body?.renotify);
if (!title || !body) {
throw new HttpError(400, "Нужны title и body");
}
const job = await enqueueBroadcastPush({ type: "broadcast", title, body, url, tag, renotify });
res.status(202).json({ queued: true, jobId: job.id });
} catch (error) {
next(error);
}
});

View File

@@ -0,0 +1,428 @@
import { Router } from "express";
import { prisma } from "../../db/prisma.js";
import { getVapidPublicKey } from "../../lib/push.js";
import { requireAuth, type AuthenticatedRequest } from "../../middleware/auth.js";
import {
deactivateNativePushSubscriptionSchema,
deactivatePushSubscriptionSchema,
anonymousNativePushSubscriptionSchema,
anonymousPushSubscriptionSchema,
nativePushSubscriptionSchema,
notificationSettingsSchema,
pushSubscriptionSchema,
replaceableAnonymousPushSubscriptionSchema,
replaceablePushSubscriptionSchema
} from "./push.schemas.js";
export const pushRouter = Router();
function normalizeSubscriptionPayload(body: unknown) {
if (!body || typeof body !== "object") {
return body;
}
const payload = body as Record<string, unknown>;
const nestedKeys =
payload.keys && typeof payload.keys === "object" ? (payload.keys as Record<string, unknown>) : undefined;
return {
...payload,
keys: {
p256dh: nestedKeys?.p256dh ?? payload.p256dh,
auth: nestedKeys?.auth ?? payload.auth
}
};
}
async function getOrCreateAnonymousUser(clientId: string) {
const email = `anon+${clientId}@push.local`;
const user = await prisma.user.upsert({
where: { email },
update: { active: true },
create: {
email,
passwordHash: "anonymous",
role: "user",
active: true,
notificationSetting: {
create: {
signalsPushEnabled: true,
resultsPushEnabled: false
}
}
}
});
await prisma.notificationSetting.upsert({
where: { userId: user.id },
update: {
signalsPushEnabled: true
},
create: {
userId: user.id,
signalsPushEnabled: true,
resultsPushEnabled: false
}
});
return user;
}
async function upsertNativePushSubscription(userId: string, payload: { token: string; platform: "android" | "ios"; deviceId?: string }) {
return prisma.nativePushSubscription.upsert({
where: {
userId_token: {
userId,
token: payload.token
}
},
update: {
active: true,
platform: payload.platform,
deviceId: payload.deviceId
},
create: {
userId,
token: payload.token,
platform: payload.platform,
deviceId: payload.deviceId,
active: true
}
});
}
pushRouter.get("/vapid-public-key", (_req, res) => {
res.json({ publicKey: getVapidPublicKey() });
});
pushRouter.post("/public/push-subscriptions", async (req, res, next) => {
try {
const payload = replaceableAnonymousPushSubscriptionSchema.parse(normalizeSubscriptionPayload(req.body));
const anonymousUser = await getOrCreateAnonymousUser(payload.clientId);
const subscription = await prisma.$transaction(async (tx) => {
if (payload.previousEndpoint && payload.previousEndpoint !== payload.endpoint) {
await tx.pushSubscription.updateMany({
where: {
userId: anonymousUser.id,
endpoint: payload.previousEndpoint
},
data: { active: false }
});
}
return tx.pushSubscription.upsert({
where: {
userId_endpoint: {
userId: anonymousUser.id,
endpoint: payload.endpoint
}
},
update: {
active: true,
p256dh: payload.keys.p256dh,
auth: payload.keys.auth
},
create: {
userId: anonymousUser.id,
endpoint: payload.endpoint,
p256dh: payload.keys.p256dh,
auth: payload.keys.auth,
active: true
}
});
});
res.status(201).json(subscription);
} catch (error) {
console.error("Public push subscription failed", {
body: req.body,
error
});
next(error);
}
});
pushRouter.post("/public/native-push-subscriptions", async (req, res, next) => {
try {
const payload = anonymousNativePushSubscriptionSchema.parse(req.body);
const anonymousUser = await getOrCreateAnonymousUser(payload.clientId);
const subscription = await upsertNativePushSubscription(anonymousUser.id, payload);
res.status(201).json(subscription);
} catch (error) {
console.error("Public native push subscription failed", {
body: req.body,
error
});
next(error);
}
});
pushRouter.get("/public/push-subscriptions/:clientId", async (req, res, next) => {
try {
const payload = anonymousPushSubscriptionSchema.pick({ clientId: true }).parse(req.params);
const email = `anon+${payload.clientId}@push.local`;
const anonymousUser = await prisma.user.findUnique({
where: { email },
include: {
pushSubscriptions: {
where: { active: true },
select: { id: true, endpoint: true, active: true, createdAt: true, updatedAt: true }
}
}
});
const items = anonymousUser?.pushSubscriptions ?? [];
res.json({
items,
hasActiveSubscription: items.length > 0
});
} catch (error) {
next(error);
}
});
pushRouter.get("/public/native-push-subscriptions/:clientId", async (req, res, next) => {
try {
const payload = anonymousPushSubscriptionSchema.pick({ clientId: true }).parse(req.params);
const email = `anon+${payload.clientId}@push.local`;
const anonymousUser = await prisma.user.findUnique({
where: { email },
include: {
nativePushSubscriptions: {
where: { active: true },
select: { id: true, platform: true, active: true, createdAt: true, updatedAt: true }
}
}
});
const items = anonymousUser?.nativePushSubscriptions ?? [];
res.json({
items,
hasActiveSubscription: items.length > 0
});
} catch (error) {
next(error);
}
});
pushRouter.delete("/public/push-subscriptions/:clientId/:id", async (req, res, next) => {
try {
const payload = anonymousPushSubscriptionSchema.pick({ clientId: true }).parse(req.params);
const email = `anon+${payload.clientId}@push.local`;
const anonymousUser = await prisma.user.findUnique({ where: { email } });
if (!anonymousUser) {
return res.status(204).send();
}
await prisma.pushSubscription.updateMany({
where: { id: String(req.params.id), userId: anonymousUser.id },
data: { active: false }
});
res.status(204).send();
} catch (error) {
next(error);
}
});
pushRouter.delete("/public/native-push-subscriptions/:clientId/:id", async (req, res, next) => {
try {
const payload = anonymousPushSubscriptionSchema.pick({ clientId: true }).parse(req.params);
const email = `anon+${payload.clientId}@push.local`;
const anonymousUser = await prisma.user.findUnique({ where: { email } });
if (!anonymousUser) {
return res.status(204).send();
}
await prisma.nativePushSubscription.updateMany({
where: { id: String(req.params.id), userId: anonymousUser.id },
data: { active: false }
});
res.status(204).send();
} catch (error) {
next(error);
}
});
pushRouter.post("/me/push-subscriptions", requireAuth, async (req: AuthenticatedRequest, res, next) => {
try {
const payload = replaceablePushSubscriptionSchema.parse(normalizeSubscriptionPayload(req.body));
const subscription = await prisma.$transaction(async (tx) => {
if (payload.previousEndpoint && payload.previousEndpoint !== payload.endpoint) {
await tx.pushSubscription.updateMany({
where: {
userId: req.user!.id,
endpoint: payload.previousEndpoint
},
data: { active: false }
});
}
return tx.pushSubscription.upsert({
where: {
userId_endpoint: {
userId: req.user!.id,
endpoint: payload.endpoint
}
},
update: {
active: true,
p256dh: payload.keys.p256dh,
auth: payload.keys.auth
},
create: {
userId: req.user!.id,
endpoint: payload.endpoint,
p256dh: payload.keys.p256dh,
auth: payload.keys.auth,
active: true
}
});
});
res.status(201).json(subscription);
} catch (error) {
console.error("Authenticated push subscription failed", {
userId: req.user?.id,
body: req.body,
error
});
next(error);
}
});
pushRouter.post("/me/native-push-subscriptions", requireAuth, async (req: AuthenticatedRequest, res, next) => {
try {
const payload = nativePushSubscriptionSchema.parse(req.body);
const subscription = await upsertNativePushSubscription(req.user!.id, payload);
res.status(201).json(subscription);
} catch (error) {
console.error("Authenticated native push subscription failed", {
userId: req.user?.id,
body: req.body,
error
});
next(error);
}
});
pushRouter.get("/me/push-subscriptions", requireAuth, async (req: AuthenticatedRequest, res) => {
const subscriptions = await prisma.pushSubscription.findMany({
where: { userId: req.user!.id, active: true },
select: {
id: true,
endpoint: true,
active: true,
createdAt: true,
updatedAt: true
}
});
res.json({
items: subscriptions,
hasActiveSubscription: subscriptions.length > 0
});
});
pushRouter.get("/me/native-push-subscriptions", requireAuth, async (req: AuthenticatedRequest, res) => {
const subscriptions = await prisma.nativePushSubscription.findMany({
where: { userId: req.user!.id, active: true },
select: {
id: true,
platform: true,
active: true,
createdAt: true,
updatedAt: true
}
});
res.json({
items: subscriptions,
hasActiveSubscription: subscriptions.length > 0
});
});
pushRouter.delete("/me/push-subscriptions/:id", requireAuth, async (req: AuthenticatedRequest, res) => {
await prisma.pushSubscription.updateMany({
where: { id: String(req.params.id), userId: req.user!.id },
data: { active: false }
});
res.status(204).send();
});
pushRouter.delete("/me/native-push-subscriptions/:id", requireAuth, async (req: AuthenticatedRequest, res) => {
await prisma.nativePushSubscription.updateMany({
where: { id: String(req.params.id), userId: req.user!.id },
data: { active: false }
});
res.status(204).send();
});
pushRouter.post("/me/push-subscriptions/deactivate", requireAuth, async (req: AuthenticatedRequest, res, next) => {
try {
const payload = deactivatePushSubscriptionSchema.parse(req.body);
await prisma.pushSubscription.updateMany({
where: {
userId: req.user!.id,
endpoint: payload.endpoint,
active: true
},
data: { active: false }
});
res.status(204).send();
} catch (error) {
next(error);
}
});
pushRouter.post("/me/native-push-subscriptions/deactivate", requireAuth, async (req: AuthenticatedRequest, res, next) => {
try {
const payload = deactivateNativePushSubscriptionSchema.parse(req.body);
await prisma.nativePushSubscription.updateMany({
where: {
userId: req.user!.id,
token: payload.token,
active: true
},
data: { active: false }
});
res.status(204).send();
} catch (error) {
next(error);
}
});
pushRouter.get("/me/notification-settings", requireAuth, async (req: AuthenticatedRequest, res) => {
const settings = await prisma.notificationSetting.findUnique({
where: { userId: req.user!.id }
});
res.json(settings);
});
pushRouter.patch("/me/notification-settings", requireAuth, async (req: AuthenticatedRequest, res, next) => {
try {
const payload = notificationSettingsSchema.parse(req.body);
const settings = await prisma.notificationSetting.upsert({
where: { userId: req.user!.id },
update: payload,
create: {
userId: req.user!.id,
...payload
}
});
res.json(settings);
} catch (error) {
next(error);
}
});

View File

@@ -0,0 +1,44 @@
import { z } from "zod";
export const pushSubscriptionSchema = z.object({
endpoint: z.string().url(),
keys: z.object({
p256dh: z.string().min(1),
auth: z.string().min(1)
})
});
export const replaceablePushSubscriptionSchema = pushSubscriptionSchema.extend({
previousEndpoint: z.string().url().optional()
});
export const anonymousPushSubscriptionSchema = pushSubscriptionSchema.extend({
clientId: z.string().min(1).max(128).regex(/^[a-zA-Z0-9_-]+$/)
});
export const replaceableAnonymousPushSubscriptionSchema = anonymousPushSubscriptionSchema.extend({
previousEndpoint: z.string().url().optional()
});
export const nativePushSubscriptionSchema = z.object({
token: z.string().min(16),
platform: z.enum(["android", "ios"]),
deviceId: z.string().min(1).max(255).optional()
});
export const anonymousNativePushSubscriptionSchema = nativePushSubscriptionSchema.extend({
clientId: z.string().min(1).max(128).regex(/^[a-zA-Z0-9_-]+$/)
});
export const notificationSettingsSchema = z.object({
signalsPushEnabled: z.boolean(),
resultsPushEnabled: z.boolean()
});
export const deactivatePushSubscriptionSchema = z.object({
endpoint: z.string().url()
});
export const deactivateNativePushSubscriptionSchema = z.object({
token: z.string().min(16)
});

View File

@@ -0,0 +1,569 @@
import { SubscriptionStatus } from "@prisma/client";
import { botNameAliases } from "../../config/env.js";
import { prisma } from "../../db/prisma.js";
import { sendNativePush } from "../../lib/native-push.js";
import { sendWebPush } from "../../lib/push.js";
type BroadcastInput = {
title: string;
body: string;
url?: string;
tag?: string;
renotify?: boolean;
};
type DeliveryContext = {
notificationType: "signal" | "broadcast";
title: string;
body: string;
signalId?: string;
tag?: string;
};
type NotifySignalOptions = {
force?: boolean;
};
type PushSubscriptionRecord = {
id: string;
endpoint: string;
p256dh: string;
auth: string;
userId: string;
active: boolean;
};
type NativePushSubscriptionRecord = {
id: string;
token: string;
platform: string;
deviceId: string | null;
userId: string;
active: boolean;
};
function buildActiveSubscriptionWhere(botKey: string) {
const now = new Date();
return {
status: SubscriptionStatus.active,
startsAt: {
lte: now
},
OR: [
{
expiresAt: null
},
{
expiresAt: {
gte: now
}
}
],
bot: {
key: botKey
}
};
}
function isInactiveSignal(rawPayload: unknown) {
if (!rawPayload || typeof rawPayload !== "object" || Array.isArray(rawPayload)) {
return false;
}
const payload = rawPayload as Record<string, unknown>;
return payload.forecastInactive === true || payload.activeTab === 2 || payload.activeTab === "2";
}
function getAliasedBotName(rawPayload: Record<string, unknown> | null) {
const rawBotKey = typeof rawPayload?.botKey === "string" ? rawPayload.botKey.trim() : "";
const rawBotName = typeof rawPayload?.botName === "string" ? rawPayload.botName.trim() : "";
if (rawBotKey) {
const aliasByKey = botNameAliases.get(rawBotKey);
if (aliasByKey) {
return aliasByKey;
}
}
if (rawBotName) {
const aliasByName = botNameAliases.get(rawBotName);
if (aliasByName) {
return aliasByName;
}
}
return rawBotName || "New signal";
}
function getSignalPushPayload(signal: {
id: string;
eventId: string;
homeTeam: string;
awayTeam: string;
selection: string;
odds: unknown;
forecast: string | null;
rawPayload: unknown;
}) {
const rawPayload = signal.rawPayload && typeof signal.rawPayload === "object" && !Array.isArray(signal.rawPayload)
? signal.rawPayload as Record<string, unknown>
: null;
const botName = getAliasedBotName(rawPayload);
const teams = [signal.homeTeam, signal.awayTeam]
.map((team) => team.trim())
.filter((team) => team.length > 0)
.join(" - ");
const forecast = signal.forecast?.trim() || "";
const odds = typeof signal.odds === "number" && Number.isFinite(signal.odds)
? signal.odds.toFixed(2)
: String(signal.odds).trim();
const bodyParts = [odds, forecast, botName].filter((part) => part.length > 0);
const body = bodyParts.join(" - ");
return {
title: teams || botName,
body
};
}
function getSignalBotKey(signal: { rawPayload: unknown }) {
if (!signal.rawPayload || typeof signal.rawPayload !== "object" || Array.isArray(signal.rawPayload)) {
return null;
}
const payload = signal.rawPayload as Record<string, unknown>;
if (typeof payload.botKey !== "string") {
return null;
}
const botKey = payload.botKey.trim();
return botKey.length > 0 ? botKey : null;
}
function getSignalNotificationFingerprint(signal: {
eventId: string;
homeTeam: string;
awayTeam: string;
selection: string;
odds: unknown;
forecast: string | null;
rawPayload: unknown;
}) {
const rawPayload = signal.rawPayload && typeof signal.rawPayload === "object" && !Array.isArray(signal.rawPayload)
? signal.rawPayload as Record<string, unknown>
: null;
return JSON.stringify({
eventId: signal.eventId,
homeTeam: signal.homeTeam,
awayTeam: signal.awayTeam,
selection: signal.selection,
odds: Number(signal.odds),
forecast: signal.forecast?.trim() || null,
botName: typeof rawPayload?.botName === "string" ? rawPayload.botName.trim() : null,
rawForecast: typeof rawPayload?.forecast === "string" ? rawPayload.forecast.trim() : null
});
}
function getEndpointHost(endpoint: string) {
try {
return new URL(endpoint).hostname;
} catch {
return "unknown";
}
}
async function recordDeliveryEvent(
subscription: PushSubscriptionRecord,
result:
| { ok: true; statusCode?: number }
| { ok: false; reason: string; statusCode?: number; details?: string },
context: DeliveryContext
) {
await prisma.integrationLog.create({
data: {
provider: "web-push",
level: result.ok ? "info" : "error",
message: result.ok ? "Web push delivered" : "Web push delivery failed",
payload: {
subscriptionId: subscription.id,
userId: subscription.userId,
endpoint: subscription.endpoint,
endpointHost: getEndpointHost(subscription.endpoint),
active: subscription.active,
notificationType: context.notificationType,
signalId: context.signalId ?? null,
title: context.title,
body: context.body,
tag: context.tag ?? null,
ok: result.ok,
statusCode: result.statusCode ?? null,
reason: result.ok ? null : result.reason,
details: result.ok ? null : result.details ?? null
}
}
});
}
async function deactivateInvalidSubscription(
subscription: Pick<PushSubscriptionRecord, "id" | "endpoint" | "userId">,
result: { ok: false; reason: string; statusCode?: number; details?: string },
context: DeliveryContext
) {
if (result.statusCode !== 404 && result.statusCode !== 410) {
return;
}
await prisma.pushSubscription.updateMany({
where: { id: subscription.id, active: true },
data: { active: false }
});
await prisma.integrationLog.create({
data: {
provider: "web-push",
level: "warn",
message: "Push subscription deactivated after provider rejection",
payload: {
subscriptionId: subscription.id,
userId: subscription.userId,
endpoint: subscription.endpoint,
endpointHost: getEndpointHost(subscription.endpoint),
notificationType: context.notificationType,
signalId: context.signalId ?? null,
tag: context.tag ?? null,
statusCode: result.statusCode,
reason: result.reason,
details: result.details ?? null
}
}
});
}
async function deliverToSubscription(
subscription: PushSubscriptionRecord,
payload: Record<string, unknown>,
context: DeliveryContext
) {
const result = await sendWebPush(subscription, payload);
await recordDeliveryEvent(subscription, result, context);
if (!result.ok) {
await deactivateInvalidSubscription(subscription, result, context);
}
return result;
}
async function recordNativeDeliveryEvent(
subscription: NativePushSubscriptionRecord,
result: { ok: true; messageId: string } | { ok: false; reason: string; code?: string },
context: DeliveryContext
) {
await prisma.integrationLog.create({
data: {
provider: "native-push",
level: result.ok ? "info" : "error",
message: result.ok ? "Native push delivered" : "Native push delivery failed",
payload: {
subscriptionId: subscription.id,
userId: subscription.userId,
platform: subscription.platform,
deviceId: subscription.deviceId,
active: subscription.active,
notificationType: context.notificationType,
signalId: context.signalId ?? null,
title: context.title,
body: context.body,
tag: context.tag ?? null,
ok: result.ok,
code: result.ok ? null : result.code ?? null,
reason: result.ok ? null : result.reason
}
}
});
}
async function deactivateInvalidNativeSubscription(
subscription: Pick<NativePushSubscriptionRecord, "id" | "userId" | "token">,
result: { ok: false; reason: string; code?: string },
context: DeliveryContext
) {
if (result.code !== "messaging/registration-token-not-registered") {
return;
}
await prisma.nativePushSubscription.updateMany({
where: { id: subscription.id, active: true },
data: { active: false }
});
await prisma.integrationLog.create({
data: {
provider: "native-push",
level: "warn",
message: "Native push subscription deactivated after provider rejection",
payload: {
subscriptionId: subscription.id,
userId: subscription.userId,
token: subscription.token,
notificationType: context.notificationType,
signalId: context.signalId ?? null,
tag: context.tag ?? null,
code: result.code ?? null,
reason: result.reason
}
}
});
}
async function deliverToNativeSubscription(
subscription: NativePushSubscriptionRecord,
payload: Record<string, string>,
context: DeliveryContext
) {
const result = await sendNativePush(subscription.token, {
title: context.title,
body: context.body,
data: payload
});
await recordNativeDeliveryEvent(subscription, result, context);
if (!result.ok) {
await deactivateInvalidNativeSubscription(subscription, result, context);
}
return result;
}
export async function notifyUsersForSignal(signalId: string, options: NotifySignalOptions = {}) {
const signal = await prisma.signal.findUnique({ where: { id: signalId } });
if (!signal) return { recipients: 0, successCount: 0, failedCount: 0 };
if (isInactiveSignal(signal.rawPayload)) {
return {
recipients: 0,
successCount: 0,
failedCount: 0,
skipped: true,
reason: "inactive_signal"
};
}
const fingerprint = getSignalNotificationFingerprint(signal);
if (!options.force) {
const previousNotification = await prisma.notificationLog.findFirst({
where: {
signalId,
type: "signal"
},
orderBy: {
createdAt: "desc"
}
});
const previousPayload = previousNotification?.payload
&& typeof previousNotification.payload === "object"
&& !Array.isArray(previousNotification.payload)
? previousNotification.payload as Record<string, unknown>
: null;
if (previousPayload?.fingerprint === fingerprint) {
return {
recipients: 0,
successCount: 0,
failedCount: 0,
skipped: true,
reason: "duplicate_signal_notification"
};
}
}
const botKey = getSignalBotKey(signal);
const users = await prisma.user.findMany({
where: {
active: true,
notificationSetting: { is: { signalsPushEnabled: true } },
...(botKey
? {
OR: [
{
role: "admin"
},
{
botAccesses: {
some: buildActiveSubscriptionWhere(botKey)
}
}
]
}
: {})
},
include: {
pushSubscriptions: { where: { active: true } },
nativePushSubscriptions: { where: { active: true } }
}
});
const { title, body } = getSignalPushPayload(signal);
const tag = `event-${signal.eventId}`;
let recipients = 0;
let successCount = 0;
let failedCount = 0;
for (const user of users) {
for (const subscription of user.pushSubscriptions) {
recipients += 1;
const result = await deliverToSubscription(
subscription,
{
title,
body,
url: `/signals/${signal.id}`,
tag,
data: {
url: `/signals/${signal.id}`,
eventId: signal.eventId,
signalId: signal.id,
type: "signal"
}
},
{
notificationType: "signal",
title,
body,
signalId: signal.id,
tag
}
);
if (result.ok) {
successCount += 1;
} else {
failedCount += 1;
}
}
for (const subscription of user.nativePushSubscriptions) {
recipients += 1;
const result = await deliverToNativeSubscription(
subscription,
{
url: `/signals/${signal.id}`,
eventId: signal.eventId,
signalId: signal.id,
type: "signal"
},
{
notificationType: "signal",
title,
body,
signalId: signal.id,
tag
}
);
if (result.ok) {
successCount += 1;
} else {
failedCount += 1;
}
}
}
await prisma.notificationLog.create({
data: {
signalId,
type: "signal",
recipients,
successCount,
failedCount,
payload: { title, body, fingerprint }
}
});
return { recipients, successCount, failedCount };
}
export async function broadcastNotification({ title, body, url = "/", tag, renotify = false }: BroadcastInput) {
const subscriptions = await prisma.pushSubscription.findMany({
where: { active: true, user: { active: true } }
});
const nativeSubscriptions = await prisma.nativePushSubscription.findMany({
where: { active: true, user: { active: true } }
});
let successCount = 0;
let failedCount = 0;
const finalTag = tag || `broadcast-${Date.now()}`;
for (const subscription of subscriptions) {
const result = await deliverToSubscription(
subscription,
{
title,
body,
url,
tag: finalTag,
renotify,
data: {
url,
type: "broadcast"
}
},
{
notificationType: "broadcast",
title,
body,
tag: finalTag
}
);
if (result.ok) {
successCount += 1;
} else {
failedCount += 1;
}
}
for (const subscription of nativeSubscriptions) {
const result = await deliverToNativeSubscription(
subscription,
{
url,
type: "broadcast",
tag: finalTag,
renotify: String(renotify)
},
{
notificationType: "broadcast",
title,
body,
tag: finalTag
}
);
if (result.ok) {
successCount += 1;
} else {
failedCount += 1;
}
}
await prisma.notificationLog.create({
data: {
type: "broadcast",
recipients: subscriptions.length + nativeSubscriptions.length,
successCount,
failedCount,
payload: { title, body, url, tag: finalTag, renotify }
}
});
return { recipients: subscriptions.length + nativeSubscriptions.length, successCount, failedCount };
}

View File

@@ -0,0 +1,33 @@
import { Queue } from "bullmq";
import { redisConnection } from "../../lib/redis.js";
import {
PUSH_BROADCAST_JOB_NAME,
PUSH_QUEUE_NAME,
PUSH_SIGNAL_JOB_NAME
} from "./queue.constants.js";
import type {
PushBroadcastJobData,
PushJobData,
PushSignalJobData
} from "./queue.types.js";
export const pushQueue = new Queue<PushJobData>(PUSH_QUEUE_NAME, {
connection: redisConnection,
defaultJobOptions: {
attempts: 5,
backoff: {
type: "exponential",
delay: 5_000
},
removeOnComplete: 500,
removeOnFail: 500
}
});
export async function enqueueSignalPush(payload: PushSignalJobData) {
return pushQueue.add(PUSH_SIGNAL_JOB_NAME, payload);
}
export async function enqueueBroadcastPush(payload: PushBroadcastJobData) {
return pushQueue.add(PUSH_BROADCAST_JOB_NAME, payload);
}

View File

@@ -0,0 +1,6 @@
export const SIGNALS_QUEUE_NAME = "signals";
export const SIGNALS_SNAPSHOT_JOB_NAME = "signals.snapshot";
export const PUSH_QUEUE_NAME = "push";
export const PUSH_SIGNAL_JOB_NAME = "push.signal";
export const PUSH_BROADCAST_JOB_NAME = "push.broadcast";

View File

@@ -0,0 +1,45 @@
export type QueueSignalPayload = {
providerId: string;
eventId: string;
sportType: string;
leagueName: string;
homeTeam: string;
awayTeam: string;
eventStartTime: string;
marketType: string;
selection: string;
forecast: string | null;
lineValue: number | null;
odds: number;
signalTime: string;
status: "pending" | "won" | "lost" | "void" | "unpublished";
sourceType: "provider" | "manual";
comment: string | null;
published: boolean;
dedupeKey: string;
rawPayload: Record<string, unknown> | null;
};
export type SignalsSnapshotJobData = {
providerId: string;
items: QueueSignalPayload[];
meta?: Record<string, unknown>;
syncEligibleBotKeys: string[];
};
export type PushSignalJobData = {
type: "signal";
signalId: string;
force?: boolean;
};
export type PushBroadcastJobData = {
type: "broadcast";
title: string;
body: string;
url?: string;
tag?: string;
renotify?: boolean;
};
export type PushJobData = PushSignalJobData | PushBroadcastJobData;

View File

@@ -0,0 +1,21 @@
import { Queue } from "bullmq";
import { redisConnection } from "../../lib/redis.js";
import { SIGNALS_QUEUE_NAME, SIGNALS_SNAPSHOT_JOB_NAME } from "./queue.constants.js";
import type { SignalsSnapshotJobData } from "./queue.types.js";
export const signalsQueue = new Queue<SignalsSnapshotJobData>(SIGNALS_QUEUE_NAME, {
connection: redisConnection,
defaultJobOptions: {
attempts: 5,
backoff: {
type: "exponential",
delay: 5_000
},
removeOnComplete: 200,
removeOnFail: 500
}
});
export async function enqueueSignalsSnapshot(payload: SignalsSnapshotJobData) {
return signalsQueue.add(SIGNALS_SNAPSHOT_JOB_NAME, payload);
}

View File

@@ -0,0 +1,118 @@
import { Router } from "express";
import type { Request } from "express";
import { prisma } from "../../db/prisma.js";
import { env } from "../../config/env.js";
export const rssRouter = Router();
function escapeXml(value: string) {
return value
.replaceAll("&", "&amp;")
.replaceAll("<", "&lt;")
.replaceAll(">", "&gt;")
.replaceAll('"', "&quot;")
.replaceAll("'", "&apos;");
}
function toRfc822(value: Date) {
return value.toUTCString();
}
function buildFrontendUrl(path: string) {
return new URL(path, env.APP_PUBLIC_URL).toString();
}
function buildRequestUrl(req: Request) {
const protoHeader = req.get("x-forwarded-proto");
const hostHeader = req.get("x-forwarded-host") || req.get("host");
const protocol = protoHeader?.split(",")[0]?.trim() || req.protocol || "http";
const host = hostHeader?.split(",")[0]?.trim();
if (!host) {
return "";
}
return `${protocol}://${host}${req.originalUrl}`;
}
function formatSignalDescription(signal: {
leagueName: string;
marketType: string;
selection: string;
lineValue: number | null;
odds: number;
status: string;
comment: string | null;
eventStartTime: Date;
signalTime: Date;
}) {
const parts = [
`League: ${signal.leagueName}`,
`Market: ${signal.marketType}`,
`Selection: ${signal.selection}${signal.lineValue !== null ? ` ${signal.lineValue}` : ""}`,
`Odds: ${signal.odds}`,
`Status: ${signal.status}`,
`Event start: ${signal.eventStartTime.toISOString()}`,
`Signal time: ${signal.signalTime.toISOString()}`
];
if (signal.comment) {
parts.push(`Comment: ${signal.comment}`);
}
return parts.join(" | ");
}
rssRouter.get("/rss.xml", async (req, res, next) => {
try {
const signals = await prisma.signal.findMany({
where: { published: true },
orderBy: [{ signalTime: "desc" }, { createdAt: "desc" }],
take: 50
});
const siteUrl = env.APP_PUBLIC_URL;
const selfUrl = buildRequestUrl(req);
const latestDate = signals[0]?.updatedAt ?? new Date();
const itemsXml = signals
.map((signal) => {
const title = `${signal.homeTeam} vs ${signal.awayTeam}: ${signal.selection} @ ${signal.odds}`;
const link = buildFrontendUrl(`/signals/${signal.id}`);
const description = formatSignalDescription(signal);
return [
"<item>",
`<title>${escapeXml(title)}</title>`,
`<link>${escapeXml(link)}</link>`,
`<guid isPermaLink="false">${escapeXml(signal.id)}</guid>`,
`<pubDate>${toRfc822(signal.signalTime)}</pubDate>`,
`<description>${escapeXml(description)}</description>`,
"</item>"
].join("");
})
.join("");
const channelParts = [
'<?xml version="1.0" encoding="UTF-8"?>',
'<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">',
"<channel>",
"<title>Betting Signals</title>",
"<description>Latest published betting signals</description>",
`<link>${escapeXml(siteUrl)}</link>`,
`<lastBuildDate>${toRfc822(latestDate)}</lastBuildDate>`,
"<language>ru</language>"
];
if (selfUrl) {
channelParts.push(`<atom:link href="${escapeXml(selfUrl)}" rel="self" type="application/rss+xml" />`);
}
channelParts.push(itemsXml, "</channel>", "</rss>");
res.setHeader("Content-Type", "application/rss+xml; charset=utf-8");
res.send(channelParts.join(""));
} catch (error) {
next(error);
}
});

View File

@@ -0,0 +1,271 @@
import crypto from "node:crypto";
import { Prisma } from "@prisma/client";
import { prisma } from "../../db/prisma.js";
import { enqueueSignalPush } from "../queues/push.queue.js";
import type { QueueSignalPayload, SignalsSnapshotJobData } from "../queues/queue.types.js";
function parseActiveTab(value: unknown) {
if (value === null || value === undefined || value === "") return null;
const parsed = Number(value);
return Number.isInteger(parsed) ? parsed : null;
}
function normalizeDateValue(value: unknown) {
if (!value) return null;
const date = value instanceof Date ? value : new Date(value as string);
const time = date.getTime();
return Number.isNaN(time) ? null : new Date(time).toISOString();
}
function getMeaningfulRawPayload(rawPayload: QueueSignalPayload["rawPayload"]) {
if (!rawPayload || typeof rawPayload !== "object" || Array.isArray(rawPayload)) {
return rawPayload ?? null;
}
return {
botKey: rawPayload.botKey || null,
botName: rawPayload.botName || null,
botUrl: rawPayload.botUrl || null,
id: rawPayload.id || null,
eventUrl: rawPayload.eventUrl || null,
publicationType: rawPayload.publicationType || null,
selectionText: rawPayload.selectionText || null,
publicationTimer: rawPayload.publicationTimer || null,
forecast: rawPayload.forecast || null,
forecastRaw: rawPayload.forecastRaw || null,
forecastImageUrl: rawPayload.forecastImageUrl || null,
stake: rawPayload.stake || null,
stakePercent: rawPayload.stakePercent || null,
activeTab: parseActiveTab(rawPayload.activeTab),
forecastInactive: rawPayload.forecastInactive === true,
parserDetectedInactive: rawPayload.parserDetectedInactive === true
};
}
function hasMeaningfulSignalChanges(existingSignal: {
sportType: string;
leagueName: string;
homeTeam: string;
awayTeam: string;
forecast: string | null;
eventStartTime: Date;
odds: unknown;
published: boolean;
rawPayload: unknown;
} | null, nextSignal: QueueSignalPayload) {
if (!existingSignal) return true;
return (
existingSignal.sportType !== nextSignal.sportType ||
existingSignal.leagueName !== nextSignal.leagueName ||
existingSignal.homeTeam !== nextSignal.homeTeam ||
existingSignal.awayTeam !== nextSignal.awayTeam ||
existingSignal.forecast !== nextSignal.forecast ||
normalizeDateValue(existingSignal.eventStartTime) !== normalizeDateValue(nextSignal.eventStartTime) ||
Number(existingSignal.odds) !== Number(nextSignal.odds) ||
existingSignal.published !== nextSignal.published ||
JSON.stringify(getMeaningfulRawPayload(existingSignal.rawPayload as QueueSignalPayload["rawPayload"])) !==
JSON.stringify(getMeaningfulRawPayload(nextSignal.rawPayload))
);
}
async function upsertSignalWithTransaction(
tx: Prisma.TransactionClient,
signal: QueueSignalPayload
) {
const existingRows = await tx.$queryRawUnsafe<
Array<{
id: string;
published: boolean;
sportType: string;
leagueName: string;
homeTeam: string;
awayTeam: string;
forecast: string | null;
eventStartTime: Date;
odds: unknown;
rawPayload: unknown;
}>
>(
`
SELECT
id,
"published",
"sportType",
"leagueName",
"homeTeam",
"awayTeam",
"forecast",
"eventStartTime",
"odds",
"rawPayload"
FROM "Signal"
WHERE "dedupeKey" = $1
LIMIT 1
`,
signal.dedupeKey
);
const existingSignal = existingRows[0] || null;
const result = await tx.$queryRawUnsafe<Array<{ id: string }>>(
`
INSERT INTO "Signal" (
id, "providerId", "eventId", "sportType", "leagueName", "homeTeam", "awayTeam",
"eventStartTime", "marketType", "selection", "forecast", "lineValue", "odds", "signalTime",
"status", "sourceType", "comment", "published", "dedupeKey", "rawPayload",
"createdAt", "updatedAt"
)
VALUES (
$1, $2, $3, $4, $5, $6, $7,
$8, $9, $10, $11, $12, $13, $14,
$15::"SignalStatus", $16::"SourceType", $17, $18, $19, $20::jsonb,
NOW(), NOW()
)
ON CONFLICT ("dedupeKey")
DO UPDATE SET
"sportType" = EXCLUDED."sportType",
"leagueName" = EXCLUDED."leagueName",
"homeTeam" = EXCLUDED."homeTeam",
"awayTeam" = EXCLUDED."awayTeam",
"eventStartTime" = EXCLUDED."eventStartTime",
"forecast" = EXCLUDED."forecast",
"odds" = EXCLUDED."odds",
"signalTime" = CASE
WHEN ABS(EXTRACT(EPOCH FROM ("Signal"."updatedAt" - "Signal"."signalTime"))) <= 120
THEN EXCLUDED."signalTime"
ELSE "Signal"."signalTime"
END,
"comment" = EXCLUDED."comment",
"published" = true,
"status" = 'pending'::"SignalStatus",
"rawPayload" = EXCLUDED."rawPayload",
"updatedAt" = NOW()
RETURNING id
`,
existingSignal?.id || crypto.randomUUID(),
signal.providerId,
signal.eventId,
signal.sportType,
signal.leagueName,
signal.homeTeam,
signal.awayTeam,
new Date(signal.eventStartTime),
signal.marketType,
signal.selection,
signal.forecast,
signal.lineValue,
signal.odds,
new Date(signal.signalTime),
signal.status,
signal.sourceType,
signal.comment,
signal.published,
signal.dedupeKey,
JSON.stringify(signal.rawPayload)
);
return {
signalId: result[0]?.id || null,
existed: Boolean(existingSignal),
wasPublished: existingSignal?.published === true,
changed: hasMeaningfulSignalChanges(existingSignal, signal)
};
}
export async function syncParserSignalsSnapshot(job: SignalsSnapshotJobData) {
const currentDedupeKeys = job.items.map((signal) => signal.dedupeKey);
const changedSignalIds = await prisma.$transaction(async (tx) => {
const collectedChangedSignalIds: string[] = [];
for (const signal of job.items) {
const result = await upsertSignalWithTransaction(tx, signal);
if (result.signalId && (!result.existed || !result.wasPublished || result.changed)) {
collectedChangedSignalIds.push(result.signalId);
}
}
if (job.syncEligibleBotKeys.length > 0 && currentDedupeKeys.length > 0) {
await tx.$executeRawUnsafe(
`
UPDATE "Signal"
SET
"published" = true,
"status" = CASE
WHEN "status" = 'unpublished'::"SignalStatus" THEN 'pending'::"SignalStatus"
ELSE "status"
END,
"comment" = 'Signal marked inactive by parser',
"rawPayload" = COALESCE("rawPayload", '{}'::jsonb) || '{"parserDetectedInactive":true,"forecastInactive":true,"activeTab":2}'::jsonb,
"updatedAt" = NOW()
WHERE "sourceType" = 'provider'::"SourceType"
AND COALESCE("providerId", '') = $1
AND COALESCE("rawPayload"->>'botKey', '') = ANY($2::text[])
AND NOT ("dedupeKey" = ANY($3::text[]))
AND (
"published" = false
OR COALESCE("rawPayload"->>'forecastInactive', 'false') <> 'true'
OR COALESCE("rawPayload"->>'activeTab', '') <> '2'
OR "status" = 'unpublished'::"SignalStatus"
)
`,
job.providerId,
job.syncEligibleBotKeys,
currentDedupeKeys
);
} else if (job.syncEligibleBotKeys.length > 0) {
await tx.$executeRawUnsafe(
`
UPDATE "Signal"
SET
"published" = true,
"status" = CASE
WHEN "status" = 'unpublished'::"SignalStatus" THEN 'pending'::"SignalStatus"
ELSE "status"
END,
"comment" = 'Signal marked inactive by parser',
"rawPayload" = COALESCE("rawPayload", '{}'::jsonb) || '{"parserDetectedInactive":true,"forecastInactive":true,"activeTab":2}'::jsonb,
"updatedAt" = NOW()
WHERE "sourceType" = 'provider'::"SourceType"
AND COALESCE("providerId", '') = $1
AND COALESCE("rawPayload"->>'botKey', '') = ANY($2::text[])
AND (
"published" = false
OR COALESCE("rawPayload"->>'forecastInactive', 'false') <> 'true'
OR COALESCE("rawPayload"->>'activeTab', '') <> '2'
OR "status" = 'unpublished'::"SignalStatus"
)
`,
job.providerId,
job.syncEligibleBotKeys
);
}
await tx.integrationLog.create({
data: {
provider: job.providerId,
level: "info",
message: "Parser snapshot synchronized from queue",
payload: {
meta: job.meta ?? null,
itemsCount: job.items.length,
syncEligibleBotKeys: job.syncEligibleBotKeys
} as Prisma.InputJsonValue
}
});
return collectedChangedSignalIds;
});
for (const signalId of changedSignalIds) {
await enqueueSignalPush({
type: "signal",
signalId
});
}
return {
processed: job.items.length,
changedSignalIds
};
}

View File

@@ -0,0 +1,99 @@
import { Router } from "express";
import { SubscriptionStatus } from "@prisma/client";
import { prisma } from "../../db/prisma.js";
import { requireAuth, type AuthenticatedRequest } from "../../middleware/auth.js";
import { signalListSchema } from "./signals.schemas.js";
import { listActiveSignalCountsByBot, listSignals } from "./signals.service.js";
export const signalsRouter = Router();
signalsRouter.use(requireAuth);
function withCreatedAtAsSignalTime<T extends { signalTime: Date; createdAt: Date }>(signal: T) {
return {
...signal,
signalTime: signal.createdAt
};
}
function activeSubscriptionWhere(userId: string, botKey: string) {
const now = new Date();
return {
userId,
status: SubscriptionStatus.active,
startsAt: {
lte: now
},
OR: [
{
expiresAt: null
},
{
expiresAt: {
gte: now
}
}
],
bot: {
key: botKey
}
};
}
signalsRouter.get("/", async (req: AuthenticatedRequest, res, next) => {
try {
const filters = signalListSchema.parse(req.query);
const signals = await listSignals(filters, {
userId: req.user?.id,
role: req.user?.role
});
res.json(signals);
} catch (error) {
next(error);
}
});
signalsRouter.get("/active-counts", async (req: AuthenticatedRequest, res, next) => {
try {
const result = await listActiveSignalCountsByBot({
userId: req.user?.id,
role: req.user?.role
});
res.json(result);
} catch (error) {
next(error);
}
});
signalsRouter.get("/:id", async (req: AuthenticatedRequest, res) => {
const signal = await prisma.signal.findUnique({
where: { id: String(req.params.id) },
include: { settlement: true, notifications: true }
});
if (!signal) {
return res.status(404).json({ message: "Сигнал не найден" });
}
if (req.user?.role !== "admin" && signal.sourceType !== "manual") {
const payload = signal.rawPayload && typeof signal.rawPayload === "object"
? (signal.rawPayload as Record<string, unknown>)
: null;
const botKey = payload?.botKey ? String(payload.botKey) : "";
if (!botKey) {
return res.status(403).json({ message: "Нет доступа к сигналу" });
}
const access = await prisma.userBotAccess.findFirst({
where: activeSubscriptionWhere(req.user!.id, botKey)
});
if (!access) {
return res.status(403).json({ message: "Нет доступа к сигналу" });
}
}
return res.json(withCreatedAtAsSignalTime(signal));
});

View File

@@ -0,0 +1,42 @@
import { z } from "zod";
export const signalCreateSchema = z.object({
providerId: z.string().optional().nullable(),
eventId: z.string().min(1),
sportType: z.string().min(1),
leagueName: z.string().min(1),
homeTeam: z.string().min(1),
awayTeam: z.string().min(1),
eventStartTime: z.string().datetime(),
marketType: z.string().min(1),
selection: z.string().min(1),
forecast: z.string().optional().nullable(),
lineValue: z.number().optional().nullable(),
odds: z.number().positive(),
signalTime: z.string().datetime(),
sourceType: z.enum(["manual", "provider"]).default("manual"),
comment: z.string().optional().nullable(),
rawPayload: z.record(z.any()).optional().nullable(),
published: z.boolean().default(true)
});
export const signalUpdateSchema = signalCreateSchema.partial().extend({
status: z.enum(["pending", "win", "lose", "void", "manual_review", "unpublished"]).optional()
});
export const signalStatusSchema = z.object({
status: z.enum(["pending", "win", "lose", "void", "manual_review", "unpublished"]),
explanation: z.string().min(1)
});
export const signalListSchema = z.object({
status: z.string().optional(),
sportType: z.string().optional(),
sourceType: z.string().optional(),
q: z.string().optional(),
published: z.string().optional(),
botKey: z.string().optional(),
activeTab: z.coerce.number().int().min(1).max(2).optional(),
page: z.coerce.number().int().min(1).default(1),
perPage: z.coerce.number().int().min(1).max(200).default(20)
});

View File

@@ -0,0 +1,420 @@
import { Prisma, SignalStatus, SourceType, SubscriptionStatus } from "@prisma/client";
import { prisma } from "../../db/prisma.js";
import { createSignalDedupeKey } from "../../lib/dedupe.js";
import { HttpError } from "../../lib/errors.js";
import { settleSignal } from "../../lib/settlement.js";
type UpsertSignalInput = {
providerId?: string | null;
eventId: string;
sportType: string;
leagueName: string;
homeTeam: string;
awayTeam: string;
eventStartTime: string;
marketType: string;
selection: string;
forecast?: string | null;
lineValue?: number | null;
odds: number;
signalTime: string;
sourceType: "manual" | "provider";
comment?: string | null;
rawPayload?: Record<string, unknown> | null;
published?: boolean;
};
function normalizeJson(value: Record<string, unknown> | null | undefined): Prisma.InputJsonValue | typeof Prisma.JsonNull | undefined {
if (value === undefined) return undefined;
if (value === null) return Prisma.JsonNull;
return value as Prisma.InputJsonObject;
}
function asWhereArray(value: Prisma.SignalWhereInput["AND"]): Prisma.SignalWhereInput[] {
if (!value) return [];
return Array.isArray(value) ? value : [value];
}
const PARSER_INACTIVE_COMMENT = "Signal marked inactive by parser";
function withCreatedAtAsSignalTime<T extends { signalTime: Date; createdAt: Date }>(signal: T) {
return {
...signal,
signalTime: signal.createdAt
};
}
function buildActiveSubscriptionWhere(userId: string): Prisma.UserBotAccessWhereInput {
const now = new Date();
return {
userId,
status: SubscriptionStatus.active,
startsAt: {
lte: now
},
OR: [
{
expiresAt: null
},
{
expiresAt: {
gte: now
}
}
]
};
}
function buildInactiveSignalWhere(): Prisma.SignalWhereInput {
return {
comment: PARSER_INACTIVE_COMMENT
};
}
function buildActiveSignalWhere(): Prisma.SignalWhereInput {
return {
OR: [
{
comment: null
},
{
comment: {
not: PARSER_INACTIVE_COMMENT
}
}
]
};
}
export async function createSignal(input: UpsertSignalInput) {
const dedupeKey = createSignalDedupeKey(input);
try {
return await prisma.signal.create({
data: {
providerId: input.providerId ?? null,
eventId: input.eventId,
sportType: input.sportType,
leagueName: input.leagueName,
homeTeam: input.homeTeam,
awayTeam: input.awayTeam,
eventStartTime: new Date(input.eventStartTime),
marketType: input.marketType,
selection: input.selection,
forecast: input.forecast ?? null,
lineValue: input.lineValue ?? null,
odds: input.odds,
signalTime: new Date(input.signalTime),
status: SignalStatus.pending,
sourceType: input.sourceType as SourceType,
comment: input.comment ?? null,
published: input.published ?? true,
dedupeKey,
rawPayload: normalizeJson(input.rawPayload)
}
});
} catch (error) {
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002") {
throw new HttpError(409, "Такой сигнал уже существует");
}
throw error;
}
}
export async function updateSignal(id: string, input: Partial<UpsertSignalInput> & { status?: SignalStatus }) {
const current = await prisma.signal.findUnique({ where: { id } });
if (!current) throw new HttpError(404, "Сигнал не найден");
const dedupeKey = createSignalDedupeKey({
providerId: input.providerId ?? current.providerId,
eventId: input.eventId ?? current.eventId,
marketType: input.marketType ?? current.marketType,
selection: input.selection ?? current.selection,
lineValue: input.lineValue ?? current.lineValue
});
return prisma.signal.update({
where: { id },
data: {
providerId: input.providerId ?? undefined,
eventId: input.eventId,
sportType: input.sportType,
leagueName: input.leagueName,
homeTeam: input.homeTeam,
awayTeam: input.awayTeam,
eventStartTime: input.eventStartTime ? new Date(input.eventStartTime) : undefined,
marketType: input.marketType,
selection: input.selection,
forecast: input.forecast === null ? null : input.forecast,
lineValue: input.lineValue === null ? null : input.lineValue,
odds: input.odds,
signalTime: input.signalTime ? new Date(input.signalTime) : undefined,
sourceType: input.sourceType ? (input.sourceType as SourceType) : undefined,
comment: input.comment === null ? null : input.comment,
published: input.published,
status: input.status,
dedupeKey,
rawPayload: normalizeJson(input.rawPayload)
}
});
}
export async function listSignals(filters: {
status?: string;
sportType?: string;
sourceType?: string;
q?: string;
published?: string;
botKey?: string;
activeTab?: number;
page: number;
perPage: number;
}, options: { userId?: string; role?: "admin" | "user" } = {}) {
const publishedFilter = filters.published === undefined ? true : filters.published === "true";
const page = filters.page;
const perPage = filters.perPage;
const where: Prisma.SignalWhereInput = {
status: filters.status as SignalStatus | undefined,
sportType: filters.sportType,
sourceType: filters.sourceType as "manual" | "provider" | undefined,
published: publishedFilter,
OR: filters.q
? [
{ homeTeam: { contains: filters.q, mode: "insensitive" } },
{ awayTeam: { contains: filters.q, mode: "insensitive" } },
{ leagueName: { contains: filters.q, mode: "insensitive" } }
]
: undefined
};
const andConditions: Prisma.SignalWhereInput[] = [];
let activeTabCondition: Prisma.SignalWhereInput | null = null;
if (filters.activeTab !== undefined) {
activeTabCondition = filters.activeTab === 2
? buildInactiveSignalWhere()
: buildActiveSignalWhere();
}
if (filters.botKey) {
andConditions.push({
OR: [
{
rawPayload: {
path: ["botKey"],
equals: filters.botKey
}
},
{
rawPayload: {
path: ["botKey"],
equals: String(filters.botKey)
}
}
]
});
}
if (options.role === "user" && options.userId) {
const accesses = await prisma.userBotAccess.findMany({
where: buildActiveSubscriptionWhere(options.userId),
select: {
bot: {
select: {
key: true
}
}
}
});
const allowedBotKeys = accesses.map((entry) => entry.bot.key);
andConditions.push({
OR: [
{
sourceType: SourceType.manual
},
...allowedBotKeys.flatMap((key) => [
{
rawPayload: {
path: ["botKey"],
equals: key
}
},
{
rawPayload: {
path: ["botKey"],
equals: String(key)
}
}
])
]
});
}
if (activeTabCondition) {
andConditions.push(activeTabCondition);
}
if (andConditions.length > 0) {
where.AND = andConditions;
}
const tabCountAndConditions = activeTabCondition
? andConditions.filter((condition) => condition !== activeTabCondition)
: andConditions;
const tabCountsWhere: Prisma.SignalWhereInput | null = filters.botKey || options.role === "user"
? {
status: undefined,
sportType: filters.sportType,
sourceType: filters.sourceType as "manual" | "provider" | undefined,
published: publishedFilter,
OR: undefined
}
: null;
if (tabCountsWhere && tabCountAndConditions.length > 0) {
tabCountsWhere.AND = tabCountAndConditions;
}
const activeTab1Where: Prisma.SignalWhereInput | null = tabCountsWhere
? {
...tabCountsWhere,
AND: [
...asWhereArray(tabCountsWhere.AND),
buildActiveSignalWhere()
]
}
: null;
const activeTab2Where: Prisma.SignalWhereInput | null = tabCountsWhere
? {
...tabCountsWhere,
AND: [
...asWhereArray(tabCountsWhere.AND),
buildInactiveSignalWhere()
]
}
: null;
const [items, total, allTabTotal, activeTabTotal, inactiveTabTotal] = await Promise.all([
prisma.signal.findMany({
where,
orderBy: [{ eventStartTime: "desc" }, { signalTime: "desc" }],
skip: (page - 1) * perPage,
take: perPage
}),
prisma.signal.count({ where }),
tabCountsWhere ? prisma.signal.count({ where: tabCountsWhere }) : Promise.resolve(0),
activeTab1Where ? prisma.signal.count({ where: activeTab1Where }) : Promise.resolve(0),
activeTab2Where ? prisma.signal.count({ where: activeTab2Where }) : Promise.resolve(0)
]);
return {
items: items.map(withCreatedAtAsSignalTime),
pagination: {
page,
perPage,
total,
totalPages: Math.max(1, Math.ceil(total / perPage))
},
tabCounts: tabCountsWhere
? {
all: allTabTotal,
"1": activeTabTotal,
"2": inactiveTabTotal
}
: undefined
};
}
export async function listActiveSignalCountsByBot(options: { userId?: string; role?: "admin" | "user" } = {}) {
const botWhere: Prisma.BotWhereInput = {
active: true
};
if (options.role === "user" && options.userId) {
botWhere.userAccesses = {
some: buildActiveSubscriptionWhere(options.userId)
};
}
const bots = await prisma.bot.findMany({
where: botWhere,
select: {
id: true,
key: true,
name: true
},
orderBy: {
name: "asc"
}
});
const items = await Promise.all(
bots.map(async (bot) => {
const activeSignals = await prisma.signal.count({
where: {
published: true,
OR: [
{
rawPayload: {
path: ["botKey"],
equals: bot.key
}
},
{
rawPayload: {
path: ["botKey"],
equals: String(bot.key)
}
}
],
AND: [
buildActiveSignalWhere()
]
}
});
return {
botId: bot.id,
botKey: bot.key,
botName: bot.name,
activeSignals
};
})
);
return {
items
};
}
export async function settlePendingSignals() {
const pendingSignals = await prisma.signal.findMany({
where: { status: SignalStatus.pending, published: true }
});
for (const signal of pendingSignals) {
const eventResult = await prisma.eventResult.findUnique({ where: { eventId: signal.eventId } });
if (!eventResult || eventResult.homeScore === null || eventResult.awayScore === null) continue;
const result = settleSignal(signal, {
homeScore: eventResult.homeScore,
awayScore: eventResult.awayScore
});
await prisma.signal.update({ where: { id: signal.id }, data: { status: result } });
await prisma.settlement.upsert({
where: { signalId: signal.id },
update: { result, explanation: `Автоматический settlement по счету ${eventResult.homeScore}:${eventResult.awayScore}` },
create: {
signalId: signal.id,
result,
explanation: `Автоматический settlement по счету ${eventResult.homeScore}:${eventResult.awayScore}`
}
});
}
}

View File

@@ -0,0 +1,296 @@
import { Router } from "express";
import { SubscriptionStatus } from "@prisma/client";
import { prisma } from "../../db/prisma.js";
import { requireAdmin, requireAuth, type AuthenticatedRequest } from "../../middleware/auth.js";
export const usersRouter = Router();
function isSubscriptionActive(access: {
status: SubscriptionStatus;
startsAt: Date;
expiresAt: Date | null;
}) {
const now = Date.now();
const startsAt = access.startsAt.getTime();
const expiresAt = access.expiresAt?.getTime() ?? null;
return access.status === SubscriptionStatus.active && startsAt <= now && (expiresAt === null || expiresAt >= now);
}
async function refreshExpiredSubscriptions(userId?: string) {
await prisma.userBotAccess.updateMany({
where: {
...(userId ? { userId } : {}),
status: SubscriptionStatus.active,
expiresAt: {
lt: new Date()
}
},
data: {
status: SubscriptionStatus.expired
}
});
}
usersRouter.get("/admin/users", requireAuth, requireAdmin, async (req, res) => {
await refreshExpiredSubscriptions();
const query = String(req.query.q ?? "").trim();
const where = {
AND: [
{
NOT: {
email: {
startsWith: "anon+"
}
}
},
{
NOT: {
email: {
endsWith: "@push.local"
}
}
},
...(query
? [
{
email: {
contains: query,
mode: "insensitive" as const
}
}
]
: [])
]
};
const users = await prisma.user.findMany({
where,
include: {
pushSubscriptions: true,
notificationSetting: true,
botAccesses: {
include: {
bot: true
},
orderBy: [
{
status: "asc"
},
{
createdAt: "desc"
}
]
}
},
orderBy: { createdAt: "desc" }
});
res.json(
users.map((user) => ({
...user,
botAccesses: user.botAccesses.map((access) => ({
...access,
isActiveNow: isSubscriptionActive(access)
}))
}))
);
});
usersRouter.patch("/admin/users/:id/active", requireAuth, requireAdmin, async (req, res) => {
const user = await prisma.user.update({
where: { id: String(req.params.id) },
data: { active: Boolean(req.body.active) }
});
res.json(user);
});
usersRouter.get("/admin/bots", requireAuth, requireAdmin, async (_req, res) => {
const bots = await prisma.bot.findMany({
where: { active: true },
orderBy: { name: "asc" }
});
res.json(bots);
});
usersRouter.put("/admin/users/:id/bot-access", requireAuth, requireAdmin, async (req: AuthenticatedRequest, res) => {
const userId = String(req.params.id);
const requestedBotIds = Array.isArray(req.body?.botIds) ? req.body.botIds.map((entry: unknown) => String(entry)) : [];
const availableBots = await prisma.bot.findMany({
where: {
id: {
in: requestedBotIds
}
},
select: {
id: true
}
});
const validBotIds = availableBots.map((bot) => bot.id);
await prisma.$transaction(async (tx) => {
await tx.userBotAccess.updateMany({
where: {
userId,
botId: {
notIn: validBotIds.length > 0 ? validBotIds : ["__none__"]
}
},
data: {
status: SubscriptionStatus.canceled,
expiresAt: new Date()
}
});
for (const botId of validBotIds) {
await tx.userBotAccess.upsert({
where: {
userId_botId: {
userId,
botId
}
},
update: {
status: SubscriptionStatus.active,
startsAt: new Date(),
expiresAt: null,
notes: null,
grantedById: req.user!.id
},
create: {
userId,
botId,
grantedById: req.user!.id,
status: SubscriptionStatus.active,
startsAt: new Date()
}
});
}
});
const updatedUser = await prisma.user.findUnique({
where: { id: userId },
include: {
pushSubscriptions: true,
notificationSetting: true,
botAccesses: {
include: {
bot: true
},
orderBy: {
bot: {
name: "asc"
}
}
}
}
});
res.json(
updatedUser
? {
...updatedUser,
botAccesses: updatedUser.botAccesses.map((access) => ({
...access,
isActiveNow: isSubscriptionActive(access)
}))
}
: null
);
});
usersRouter.patch("/admin/users/:id/subscriptions/:botId", requireAuth, requireAdmin, async (req: AuthenticatedRequest, res) => {
const userId = String(req.params.id);
const botId = String(req.params.botId);
const status = String(req.body?.status ?? SubscriptionStatus.active) as SubscriptionStatus;
const startsAt = req.body?.startsAt ? new Date(String(req.body.startsAt)) : new Date();
const expiresAt = req.body?.expiresAt ? new Date(String(req.body.expiresAt)) : null;
const notes = typeof req.body?.notes === "string" ? req.body.notes.trim() || null : null;
if (!Object.values(SubscriptionStatus).includes(status)) {
return res.status(400).json({ message: "Некорректный статус подписки" });
}
const bot = await prisma.bot.findUnique({
where: { id: botId },
select: { id: true }
});
if (!bot) {
return res.status(404).json({ message: "Бот не найден" });
}
if (Number.isNaN(startsAt.getTime()) || (expiresAt && Number.isNaN(expiresAt.getTime()))) {
return res.status(400).json({ message: "Некорректная дата подписки" });
}
if (expiresAt && expiresAt < startsAt) {
return res.status(400).json({ message: "Дата окончания не может быть раньше даты начала" });
}
const access = await prisma.userBotAccess.upsert({
where: {
userId_botId: {
userId,
botId
}
},
update: {
status,
startsAt,
expiresAt,
notes,
grantedById: req.user!.id,
grantedAt: new Date()
},
create: {
userId,
botId,
status,
startsAt,
expiresAt,
notes,
grantedById: req.user!.id
},
include: {
bot: true
}
});
res.json({
...access,
isActiveNow: isSubscriptionActive(access)
});
});
usersRouter.get("/me/subscriptions", requireAuth, async (req: AuthenticatedRequest, res) => {
await refreshExpiredSubscriptions(req.user!.id);
const subscriptions = await prisma.userBotAccess.findMany({
where: {
userId: req.user!.id
},
include: {
bot: true
},
orderBy: [
{
status: "asc"
},
{
startsAt: "desc"
}
]
});
res.json(
subscriptions.map((subscription) => ({
...subscription,
isActiveNow: isSubscriptionActive(subscription)
}))
);
});

View File

@@ -0,0 +1,29 @@
export type ProviderSignal = {
providerId: string;
eventId: string;
sportType: string;
leagueName: string;
homeTeam: string;
awayTeam: string;
eventStartTime: string;
marketType: string;
selection: string;
lineValue?: number | null;
odds: number;
signalTime: string;
rawPayload?: Record<string, unknown>;
};
export interface SignalProvider {
providerName: string;
fetchSignals(): Promise<ProviderSignal[]>;
}
export class StubProvider implements SignalProvider {
providerName = "stub";
async fetchSignals() {
return [];
}
}

View File

@@ -0,0 +1,16 @@
import { env } from "../../config/env.js";
import { settlePendingSignals } from "../signals/signals.service.js";
export function startSettlementWorker() {
const run = async () => {
try {
await settlePendingSignals();
} catch (error) {
console.error("Settlement worker error", error);
}
};
void run();
return setInterval(run, env.SETTLEMENT_INTERVAL_MS);
}

2
backend/src/types/web-push.d.ts vendored Normal file
View File

@@ -0,0 +1,2 @@
declare module "web-push";

View File

@@ -0,0 +1,59 @@
import { Worker } from "bullmq";
import { prisma } from "../db/prisma.js";
import { closeRedisConnection, redisConnection } from "../lib/redis.js";
import {
PUSH_BROADCAST_JOB_NAME,
PUSH_QUEUE_NAME,
PUSH_SIGNAL_JOB_NAME
} from "../modules/queues/queue.constants.js";
import type { PushJobData } from "../modules/queues/queue.types.js";
import { broadcastNotification, notifyUsersForSignal } from "../modules/push/push.service.js";
import { env } from "../config/env.js";
const worker = new Worker<PushJobData>(
PUSH_QUEUE_NAME,
async (job) => {
if (job.name === PUSH_SIGNAL_JOB_NAME && job.data.type === "signal") {
return notifyUsersForSignal(job.data.signalId, { force: job.data.force });
}
if (job.name === PUSH_BROADCAST_JOB_NAME && job.data.type === "broadcast") {
return broadcastNotification({
title: job.data.title,
body: job.data.body,
url: job.data.url,
tag: job.data.tag,
renotify: job.data.renotify
});
}
throw new Error(`Unsupported push job: ${job.name}`);
},
{
connection: redisConnection,
concurrency: env.PUSH_WORKER_CONCURRENCY
}
);
worker.on("completed", (job) => {
console.log(`[push-worker] completed ${job.id}`);
});
worker.on("failed", (job, error) => {
console.error(`[push-worker] failed ${job?.id ?? "unknown"}`, error);
});
async function shutdown() {
await worker.close();
await closeRedisConnection();
await prisma.$disconnect();
process.exit(0);
}
process.on("SIGINT", () => {
void shutdown();
});
process.on("SIGTERM", () => {
void shutdown();
});

View File

@@ -0,0 +1,47 @@
import { Worker } from "bullmq";
import { env } from "../config/env.js";
import { prisma } from "../db/prisma.js";
import { closeRedisConnection, redisConnection } from "../lib/redis.js";
import {
SIGNALS_QUEUE_NAME,
SIGNALS_SNAPSHOT_JOB_NAME
} from "../modules/queues/queue.constants.js";
import { syncParserSignalsSnapshot } from "../modules/signals/parser-signals.service.js";
const worker = new Worker(
SIGNALS_QUEUE_NAME,
async (job) => {
if (job.name !== SIGNALS_SNAPSHOT_JOB_NAME) {
throw new Error(`Unsupported signals job: ${job.name}`);
}
return syncParserSignalsSnapshot(job.data);
},
{
connection: redisConnection,
concurrency: env.SIGNALS_WORKER_CONCURRENCY
}
);
worker.on("completed", (job) => {
console.log(`[signals-worker] completed ${job.id}`);
});
worker.on("failed", (job, error) => {
console.error(`[signals-worker] failed ${job?.id ?? "unknown"}`, error);
});
async function shutdown() {
await worker.close();
await closeRedisConnection();
await prisma.$disconnect();
process.exit(0);
}
process.on("SIGINT", () => {
void shutdown();
});
process.on("SIGTERM", () => {
void shutdown();
});