remove scrobbledPlays and prepare queue next query

This commit is contained in:
FoxxMD 2026-04-30 13:09:26 +00:00
parent b8494986d5
commit cfb4131d5d
2 changed files with 55 additions and 30 deletions

View file

@ -8,7 +8,7 @@ import { PlayNew, PlaySelect, PlayInputNew, FindWhere, FindMany, CompareOpKey, Q
import { MarkOptional, MarkRequired, PathValue } from "ts-essentials";
import { genGroupIdStrFromPlay, removeEmptyArrays, removeUndefinedKeys } from "../../../../utils.js";
import dayjs, { Dayjs } from "dayjs";
import { RelationsFieldFilter, eq, inArray, ne, notInArray, desc, asc, and, sql } from "drizzle-orm";
import { RelationsFieldFilter, eq, inArray, ne, notInArray, desc, asc, and, sql, Placeholder } from "drizzle-orm";
import { CompactableProperty, RetentionOptions, retentionPlayTypes } from "../../../infrastructure/config/database.js";
import { shortTodayAwareFormat } from "../../../../../core/TimeUtils.js";
import { buildDateCompare, CompareDateOp, ComponentConstrainedRepoOpts, DrizzleBaseRepository, DrizzleRepositoryOpts, PaginatedQueryResponse, PaginatedResponse } from "./BaseRepository.js";
@ -53,6 +53,7 @@ export type RepositoryCreatePlayOpts = PlayEntityOpts
export class DrizzlePlayRepository extends DrizzleBaseRepository<'plays'> {
protected hasQueueNextPrepared?: ReturnType<typeof this.prepareHasQueueNext>
protected getQueueNextPrepared?: ReturnType<typeof this.prepareGetQueueNext>
constructor(db: ReturnType<typeof getDb>, opts: DrizzleRepositoryOpts = {}) {
super(db, 'plays', 'Plays', opts);
@ -389,41 +390,67 @@ export class DrizzlePlayRepository extends DrizzleBaseRepository<'plays'> {
return recentPlatformIds.map(x => x.platformId);
}
protected prepareGetQueueNext = () => this.db.query.plays.findFirst({
where: {
componentId: sql.placeholder('componentId'),
queueStates: {
queueName: sql.placeholder('queueName'),
queueStatus: 'queued',
retries: {
lte: sql.placeholder('retries')
}
},
},
with: {
queueStates: true
},
orderBy: {
seenAt: 'asc'
},
}).prepare()
public getQueueNext = async (queueName: string, opts: {order?: 'asc' | 'desc', retries?: number} & ComponentConstrainedRepoOpts = {}): Promise<MarkRequired<PlaySelectRel, 'queueStates'> | undefined> => {
const {
retries,
retries = 0,
order = 'asc',
componentId = this.componentId
} = opts;
let where: FindWhere<'plays'> = {
componentId
// let where: FindWhere<'plays'> = {
// componentId
// }
// if(retries !== undefined) {
// where.queueStates = {
// queueName,
// queueStatus: 'queued',
// retries: {
// lte: retries
// }
// }
// } else {
// where.queueStates = {
// queueName,
// queueStatus: 'queued'
// }
// }
// const res = await this.db.query.plays.findFirst({
// where: where,
// orderBy: {
// seenAt: order
// },
// with: {
// queueStates: true
// }
// });
if(this.getQueueNextPrepared === undefined) {
this.getQueueNextPrepared = this.prepareGetQueueNext();
}
if(retries !== undefined) {
where.queueStates = {
queueName,
queueStatus: 'queued',
retries: {
lte: retries
}
}
} else {
where.queueStates = {
queueName,
queueStatus: 'queued'
}
}
const res = await this.db.query.plays.findFirst({
where: where,
orderBy: {
seenAt: order
},
with: {
queueStates: true
}
});
const res = await this.getQueueNextPrepared.execute({queueName, retries, componentId});
if(res === undefined) {
return undefined;
}

View file

@ -96,7 +96,6 @@ export default abstract class AbstractScrobbleClient extends AbstractComponent i
protected MAX_INITIAL_SCROBBLES_FETCH = this.MAX_STORED_SCROBBLES;
scrobbleSOTRanges: PaginatedTimeRangeOptions[] = [];
scrobbledPlayObjs: FixedSizeList<ScrobbledPlayObject>;
tracksScrobbled: number = 0;
lastScrobbleAttempt: Dayjs = dayjs(0)
@ -170,7 +169,6 @@ export default abstract class AbstractScrobbleClient extends AbstractComponent i
this.deadLogger = childLogger(this.logger, CLIENT_DEAD_QUEUE);
this.notifier = notifier;
this.emitter = emitter;
this.scrobbledPlayObjs = new FixedSizeList<ScrobbledPlayObject>(this.MAX_STORED_SCROBBLES);
this.scrobbleQueue = new Queue<{payload: QueuedScrobble<PlayObject>}, {scrobbled: ScrobbledPlayObject}>({
storage: new MemoryStorage(),
concurrency: 1