mirror of
https://github.com/ruvnet/RuVector.git
synced 2026-05-26 07:44:05 +00:00
Major additions: - Complete Next.js studio application with 1600+ components - Docker support (Dockerfile.combined, docker-compose.yml) - GCP deployment documentation and benchmarks - SQL benchmark scripts for performance testing - Sentry integration for monitoring - Comprehensive test suite and mocks Studio features: - Dashboard and admin interfaces - Data visualization components - Authentication and user management - API integration with RuVector backend - Static data and public assets 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
111 lines
3.7 KiB
TypeScript
111 lines
3.7 KiB
TypeScript
import { useInfiniteQuery } from '@tanstack/react-query'
|
|
import dayjs from 'dayjs'
|
|
import { last } from 'lodash'
|
|
|
|
import { isQueueNameValid } from 'components/interfaces/Integrations/Queues/Queues.utils'
|
|
import { QUEUE_MESSAGE_TYPE } from 'components/interfaces/Integrations/Queues/SingleQueue/Queue.utils'
|
|
import { executeSql } from 'data/sql/execute-sql-query'
|
|
import { DATE_FORMAT } from 'lib/constants'
|
|
import type { ResponseError, UseCustomInfiniteQueryOptions } from 'types'
|
|
import { databaseQueuesKeys } from './keys'
|
|
|
|
export type DatabaseQueueVariables = {
|
|
projectRef?: string
|
|
connectionString?: string | null
|
|
queueName: string
|
|
status: QUEUE_MESSAGE_TYPE[]
|
|
}
|
|
|
|
export type PostgresQueueMessage = {
|
|
msg_id: number
|
|
read_ct: number
|
|
enqueued_at: string
|
|
archived_at: string
|
|
vt: Date
|
|
message: Record<string, never>
|
|
}
|
|
|
|
export const QUEUE_MESSAGES_PAGE_SIZE = 30
|
|
|
|
export async function getDatabaseQueue({
|
|
projectRef,
|
|
connectionString,
|
|
queueName,
|
|
afterTimestamp,
|
|
status,
|
|
}: DatabaseQueueVariables & { afterTimestamp: string | undefined }) {
|
|
if (!projectRef) throw new Error('Project ref is required')
|
|
if (!isQueueNameValid(queueName)) {
|
|
throw new Error(
|
|
'Invalid queue name: must contain only alphanumeric characters, underscores, and hyphens'
|
|
)
|
|
}
|
|
|
|
if (status.length === 0) {
|
|
return []
|
|
}
|
|
|
|
// handles when scheduled and available are deselected
|
|
let queueQuery = ``
|
|
if (status.includes('available') && status.includes('scheduled')) {
|
|
queueQuery = `SELECT msg_id, enqueued_at, read_ct, vt, message, NULL as archived_at FROM "pgmq"."q_${queueName}"`
|
|
} else if (status.includes('available') && !status.includes('scheduled')) {
|
|
queueQuery = `SELECT msg_id, enqueued_at, read_ct, vt, message, NULL as archived_at FROM "pgmq"."q_${queueName}" WHERE vt < '${dayjs(new Date()).format(DATE_FORMAT)}'`
|
|
} else if (!status.includes('available') && status.includes('scheduled')) {
|
|
queueQuery = `SELECT msg_id, enqueued_at, read_ct, vt, message, NULL as archived_at FROM "pgmq"."q_${queueName}" WHERE vt > '${dayjs(new Date()).format(DATE_FORMAT)}'`
|
|
}
|
|
|
|
let archivedQuery = ``
|
|
if (status.includes('archived')) {
|
|
archivedQuery = `SELECT msg_id, enqueued_at, read_ct, vt, message, archived_at FROM "pgmq"."a_${queueName}"`
|
|
}
|
|
|
|
let query = `SELECT
|
|
*
|
|
FROM
|
|
(
|
|
${[queueQuery, archivedQuery].filter(Boolean).join(' UNION ALL ')}
|
|
) AS combined`
|
|
if (afterTimestamp) {
|
|
query += ` WHERE enqueued_at > '${afterTimestamp}'`
|
|
}
|
|
|
|
const { result } = await executeSql({
|
|
projectRef,
|
|
connectionString,
|
|
sql: `${query} order by enqueued_at LIMIT ${QUEUE_MESSAGES_PAGE_SIZE}`,
|
|
})
|
|
return result as DatabaseQueueData
|
|
}
|
|
|
|
export type DatabaseQueueData = PostgresQueueMessage[]
|
|
export type DatabaseQueueError = ResponseError
|
|
|
|
export const useQueueMessagesInfiniteQuery = <TData = DatabaseQueueData>(
|
|
{ projectRef, connectionString, queueName, status }: DatabaseQueueVariables,
|
|
{
|
|
enabled = true,
|
|
...options
|
|
}: UseCustomInfiniteQueryOptions<DatabaseQueueData, DatabaseQueueError, TData> = {}
|
|
) =>
|
|
useInfiniteQuery<DatabaseQueueData, DatabaseQueueError, TData>({
|
|
queryKey: databaseQueuesKeys.getMessagesInfinite(projectRef, queueName, { status }),
|
|
queryFn: ({ pageParam }) => {
|
|
return getDatabaseQueue({
|
|
projectRef,
|
|
connectionString,
|
|
queueName,
|
|
afterTimestamp: pageParam,
|
|
status,
|
|
})
|
|
},
|
|
staleTime: 0,
|
|
enabled: enabled && typeof projectRef !== 'undefined',
|
|
|
|
getNextPageParam(lastPage) {
|
|
const hasNextPage = lastPage.length <= QUEUE_MESSAGES_PAGE_SIZE
|
|
if (!hasNextPage) return undefined
|
|
return last(lastPage)?.enqueued_at
|
|
},
|
|
...options,
|
|
})
|