diff --git a/docs/developers/development/telemetry.md b/docs/developers/development/telemetry.md index 442d34b52..0db11ada1 100644 --- a/docs/developers/development/telemetry.md +++ b/docs/developers/development/telemetry.md @@ -58,20 +58,36 @@ observability framework — Qwen Code's observability system provides: All telemetry behavior is controlled through your `.qwen/settings.json` file. These settings can be overridden by environment variables or CLI flags. -| Setting | Environment Variable | CLI Flag | Description | Values | Default | -| -------------- | ------------------------------ | -------------------------------------------------------- | ------------------------------------------------- | ----------------- | ----------------------- | -| `enabled` | `QWEN_TELEMETRY_ENABLED` | `--telemetry` / `--no-telemetry` | Enable or disable telemetry | `true`/`false` | `false` | -| `target` | `QWEN_TELEMETRY_TARGET` | `--telemetry-target ` | Where to send telemetry data | `"gcp"`/`"local"` | `"local"` | -| `otlpEndpoint` | `QWEN_TELEMETRY_OTLP_ENDPOINT` | `--telemetry-otlp-endpoint ` | OTLP collector endpoint | URL string | `http://localhost:4317` | -| `otlpProtocol` | `QWEN_TELEMETRY_OTLP_PROTOCOL` | `--telemetry-otlp-protocol ` | OTLP transport protocol | `"grpc"`/`"http"` | `"grpc"` | -| `outfile` | `QWEN_TELEMETRY_OUTFILE` | `--telemetry-outfile ` | Save telemetry to file (overrides `otlpEndpoint`) | file path | - | -| `logPrompts` | `QWEN_TELEMETRY_LOG_PROMPTS` | `--telemetry-log-prompts` / `--no-telemetry-log-prompts` | Include prompts in telemetry logs | `true`/`false` | `true` | -| `useCollector` | `QWEN_TELEMETRY_USE_COLLECTOR` | - | Use external OTLP collector (advanced) | `true`/`false` | `false` | +| Setting | Environment Variable | CLI Flag | Description | Values | Default | +| --------------------- | -------------------------------------- | -------------------------------------------------------- | ---------------------------------------------------- | 
----------------- | ----------------------- | +| `enabled` | `QWEN_TELEMETRY_ENABLED` | `--telemetry` / `--no-telemetry` | Enable or disable telemetry | `true`/`false` | `false` | +| `target` | `QWEN_TELEMETRY_TARGET` | `--telemetry-target ` | Where to send telemetry data | `"gcp"`/`"local"` | `"local"` | +| `otlpEndpoint` | `QWEN_TELEMETRY_OTLP_ENDPOINT` | `--telemetry-otlp-endpoint ` | OTLP collector endpoint | URL string | `http://localhost:4317` | +| `otlpProtocol` | `QWEN_TELEMETRY_OTLP_PROTOCOL` | `--telemetry-otlp-protocol ` | OTLP transport protocol | `"grpc"`/`"http"` | `"grpc"` | +| `otlpTracesEndpoint` | `QWEN_TELEMETRY_OTLP_TRACES_ENDPOINT` | - | Per-signal endpoint override for traces (HTTP only) | URL string | - | +| `otlpLogsEndpoint` | `QWEN_TELEMETRY_OTLP_LOGS_ENDPOINT` | - | Per-signal endpoint override for logs (HTTP only) | URL string | - | +| `otlpMetricsEndpoint` | `QWEN_TELEMETRY_OTLP_METRICS_ENDPOINT` | - | Per-signal endpoint override for metrics (HTTP only) | URL string | - | +| `outfile` | `QWEN_TELEMETRY_OUTFILE` | `--telemetry-outfile ` | Save telemetry to file (overrides `otlpEndpoint`) | file path | - | +| `logPrompts` | `QWEN_TELEMETRY_LOG_PROMPTS` | `--telemetry-log-prompts` / `--no-telemetry-log-prompts` | Include prompts in telemetry logs | `true`/`false` | `true` | +| `useCollector` | `QWEN_TELEMETRY_USE_COLLECTOR` | - | Use external OTLP collector (advanced) | `true`/`false` | `false` | **Note on boolean environment variables:** For the boolean settings (`enabled`, `logPrompts`, `useCollector`), setting the corresponding environment variable to `true` or `1` will enable the feature. Any other value will disable it. +**HTTP OTLP signal routing:** When using HTTP protocol (`otlpProtocol: "http"`), +Qwen Code automatically appends signal-specific paths (`/v1/traces`, `/v1/logs`, +`/v1/metrics`) to the base `otlpEndpoint`. For example, `http://collector:4318` +becomes `http://collector:4318/v1/traces` for traces. 
If the URL already ends +with a signal path, it is used as-is. Per-signal endpoint overrides +(`otlpTracesEndpoint`, etc.) take precedence over the base endpoint and are used +verbatim. gRPC protocol uses service-based routing and does not append paths. + +The per-signal endpoint environment variables also accept the standard +OpenTelemetry names: `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`, +`OTEL_EXPORTER_OTLP_LOGS_ENDPOINT`, `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`. +The `QWEN_TELEMETRY_OTLP_*` variants take precedence over the `OTEL_*` variants. + For detailed information about all configuration options, see the [Configuration Guide](./cli/configuration.md). @@ -91,6 +107,9 @@ sent to Alibaba Cloud. 1. Enable telemetry in your `.qwen/settings.json` and set the OTLP endpoint: + + **Option A: gRPC protocol** (standard OTLP endpoint): + ```json { "telemetry": { @@ -101,6 +120,29 @@ sent to Alibaba Cloud. } } ``` + + **Option B: HTTP protocol with per-signal endpoints** (for backends + that use non-standard paths, e.g., `/api/otlp/traces` instead of + `/v1/traces`): + + ```json + { + "telemetry": { + "enabled": true, + "otlpProtocol": "http", + "otlpTracesEndpoint": "http://<host>:<port>/api/otlp/traces", + "otlpLogsEndpoint": "http://<host>:<port>/api/otlp/logs", + "otlpMetricsEndpoint": "http://<host>:<port>/api/otlp/metrics" + } + } + ``` + + > **Note:** When using HTTP protocol with only `otlpEndpoint` (no + > per-signal overrides), Qwen Code appends standard OTLP paths + > (`/v1/traces`, `/v1/logs`, `/v1/metrics`) to the base URL. If your + > backend uses different paths, use per-signal endpoint overrides as + > shown in Option B. + 2. If your Alibaba Cloud endpoint requires authentication, provide OTLP headers through standard OpenTelemetry environment variables such as `OTEL_EXPORTER_OTLP_HEADERS` (or the signal-specific variants). 
Qwen diff --git a/packages/cli/src/gemini.test.tsx b/packages/cli/src/gemini.test.tsx index 615e99ace..6820cbd61 100644 --- a/packages/cli/src/gemini.test.tsx +++ b/packages/cli/src/gemini.test.tsx @@ -493,11 +493,19 @@ describe('gemini.tsx main function kitty protocol', () => { let setRawModeSpy: MockInstance< (mode: boolean) => NodeJS.ReadStream & { fd: 0 } >; + let initialSigintListeners: NodeJS.SignalsListener[]; + let initialSigtermListeners: NodeJS.SignalsListener[]; beforeEach(() => { // Set no relaunch in tests since process spawning causing issues in tests originalEnvNoRelaunch = process.env['QWEN_CODE_NO_RELAUNCH']; process.env['QWEN_CODE_NO_RELAUNCH'] = 'true'; + initialSigintListeners = process.listeners( + 'SIGINT', + ) as NodeJS.SignalsListener[]; + initialSigtermListeners = process.listeners( + 'SIGTERM', + ) as NodeJS.SignalsListener[]; // eslint-disable-next-line @typescript-eslint/no-explicit-any if (!(process.stdin as any).setRawMode) { @@ -517,12 +525,24 @@ describe('gemini.tsx main function kitty protocol', () => { }); afterEach(() => { + for (const listener of process.listeners('SIGINT')) { + if (!initialSigintListeners.includes(listener)) { + process.removeListener('SIGINT', listener as NodeJS.SignalsListener); + } + } + for (const listener of process.listeners('SIGTERM')) { + if (!initialSigtermListeners.includes(listener)) { + process.removeListener('SIGTERM', listener as NodeJS.SignalsListener); + } + } + // Restore original env variables if (originalEnvNoRelaunch !== undefined) { process.env['QWEN_CODE_NO_RELAUNCH'] = originalEnvNoRelaunch; } else { delete process.env['QWEN_CODE_NO_RELAUNCH']; } + vi.restoreAllMocks(); }); it('should call setRawMode and detectAndEnableKittyProtocol when isInteractive is true', async () => { @@ -618,6 +638,79 @@ describe('gemini.tsx main function kitty protocol', () => { expect(setRawModeSpy).toHaveBeenCalledWith(true); expect(detectAndEnableKittyProtocol).toHaveBeenCalledTimes(1); }); + + it('should run 
cleanup before exiting on interactive SIGINT', async () => { + const { loadCliConfig, parseArguments } = await import( + './config/config.js' + ); + const { loadSettings } = await import('./config/settings.js'); + const cleanupModule = await import('./utils/cleanup.js'); + const signalHandlers = new Map void>(); + const processOnceSpy = vi.spyOn(process, 'once').mockImplementation((( + eventName: string | symbol, + listener: (...args: unknown[]) => void, + ) => { + if (eventName === 'SIGTERM' || eventName === 'SIGINT') { + signalHandlers.set(eventName, listener); + } + return process; + }) as typeof process.once); + const processExitSpy = vi + .spyOn(process, 'exit') + .mockImplementation((() => undefined) as unknown as typeof process.exit); + const runExitCleanupMock = vi.mocked(cleanupModule.runExitCleanup); + runExitCleanupMock.mockResolvedValue(undefined); + + vi.mocked(loadCliConfig).mockResolvedValue({ + isInteractive: () => true, + getQuestion: () => '', + getSandbox: () => false, + getDebugMode: () => false, + getListExtensions: () => false, + getMcpServers: () => ({}), + initialize: vi.fn(), + getIdeMode: () => false, + getExperimentalZedIntegration: () => false, + getScreenReader: () => false, + getGeminiMdFileCount: () => 0, + getWarnings: () => [], + getModelsConfig: () => ({ + getCurrentAuthType: () => null, + getGenerationConfig: () => ({}), + }), + getProxy: () => undefined, + getUsageStatisticsEnabled: () => true, + getSessionId: () => 'test-session-id', + } as unknown as Config); + vi.mocked(loadSettings).mockReturnValue({ + errors: [], + merged: { + advanced: {}, + security: { auth: {} }, + ui: {}, + }, + setValue: vi.fn(), + forScope: () => ({ settings: {}, originalSettings: {}, path: '' }), + migrationWarnings: [], + getUserHooks: () => undefined, + getProjectHooks: () => undefined, + } as never); + vi.mocked(parseArguments).mockResolvedValue({ + extensions: undefined, + } as never); + + await main(); + signalHandlers.get('SIGINT')?.(); + await 
Promise.resolve(); + await Promise.resolve(); + + expect(setRawModeSpy).toHaveBeenCalledWith(false); + expect(runExitCleanupMock).toHaveBeenCalledTimes(1); + expect(processExitSpy).toHaveBeenCalledWith(130); + + processOnceSpy.mockRestore(); + processExitSpy.mockRestore(); + }); }); describe('validateDnsResolutionOrder', () => { diff --git a/packages/cli/src/gemini.tsx b/packages/cli/src/gemini.tsx index 1add91580..1020f65e9 100644 --- a/packages/cli/src/gemini.tsx +++ b/packages/cli/src/gemini.tsx @@ -164,6 +164,48 @@ ${reason.stack}` }); } +function getSignalExitCode(signal: NodeJS.Signals): number { + return signal === 'SIGINT' ? 130 : 143; +} + +function installInteractiveSignalHandlers(wasRaw: boolean): () => void { + let cleanupStarted = false; + + const handleSignal = (signal: NodeJS.Signals) => { + if (process.stdin.isTTY) { + process.stdin.setRawMode(wasRaw); + } + + if (cleanupStarted) { + return; + } + cleanupStarted = true; + + void runExitCleanup() + .catch((error) => { + debugLogger.error(`Error during ${signal} cleanup:`, error); + }) + .finally(() => { + process.exit(getSignalExitCode(signal)); + }); + }; + + const handleSigterm = () => { + handleSignal('SIGTERM'); + }; + const handleSigint = () => { + handleSignal('SIGINT'); + }; + + process.once('SIGTERM', handleSigterm); + process.once('SIGINT', handleSigint); + + return () => { + process.removeListener('SIGTERM', handleSigterm); + process.removeListener('SIGINT', handleSigint); + }; +} + export async function startInteractiveUI( config: Config, settings: LoadedSettings, @@ -559,6 +601,9 @@ export async function main() { const wasRaw = process.stdin.isRaw; let kittyProtocolDetectionComplete: Promise | undefined; let themeAutoDetectionComplete: Promise | undefined; + if (config.isInteractive()) { + registerCleanup(installInteractiveSignalHandlers(wasRaw)); + } if (config.isInteractive() && !wasRaw && process.stdin.isTTY) { // Set this as early as possible to avoid spurious characters from // input 
showing up in the output. @@ -569,14 +614,6 @@ export async function main() { // Ensure the stdin listener is removed on any exit path (error, signal, etc.) registerCleanup(() => stopAndGetCapturedInput()); - // This cleanup isn't strictly needed but may help in certain situations. - process.on('SIGTERM', () => { - process.stdin.setRawMode(wasRaw); - }); - process.on('SIGINT', () => { - process.stdin.setRawMode(wasRaw); - }); - // Detect and enable Kitty keyboard protocol once at startup. kittyProtocolDetectionComplete = detectAndEnableKittyProtocol(); diff --git a/packages/core/src/config/config.test.ts b/packages/core/src/config/config.test.ts index 7ff007ce9..0ee260852 100644 --- a/packages/core/src/config/config.test.ts +++ b/packages/core/src/config/config.test.ts @@ -15,6 +15,8 @@ import { DEFAULT_TELEMETRY_TARGET, DEFAULT_OTLP_ENDPOINT, QwenLogger, + isTelemetrySdkInitialized, + shutdownTelemetry, } from '../telemetry/index.js'; import type { ContentGenerator, @@ -177,6 +179,8 @@ vi.mock('../telemetry/index.js', async (importOriginal) => { return { ...actual, initializeTelemetry: vi.fn(), + isTelemetrySdkInitialized: vi.fn(() => false), + shutdownTelemetry: vi.fn().mockResolvedValue(undefined), uiTelemetryService: { getLastPromptTokenCount: vi.fn(), }, @@ -275,6 +279,7 @@ describe('Server Config (config.ts)', () => { beforeEach(() => { // Reset mocks if necessary vi.clearAllMocks(); + vi.mocked(isTelemetrySdkInitialized).mockReturnValue(false); vi.spyOn(QwenLogger.prototype, 'logStartSessionEvent').mockImplementation( async () => undefined, ); @@ -908,6 +913,32 @@ describe('Server Config (config.ts)', () => { expect(config.getTelemetryEnabled()).toBe(true); }); + it('Config shutdown should flush telemetry when SDK is initialized', async () => { + const paramsWithTelemetry: ConfigParameters = { + ...baseParams, + telemetry: { enabled: true }, + }; + vi.mocked(isTelemetrySdkInitialized).mockReturnValue(true); + const config = new Config(paramsWithTelemetry); + 
+ await config.shutdown(); + + expect(shutdownTelemetry).toHaveBeenCalledTimes(1); + }); + + it('Config shutdown should skip telemetry shutdown before SDK initialization', async () => { + const paramsWithTelemetry: ConfigParameters = { + ...baseParams, + telemetry: { enabled: true }, + }; + vi.mocked(isTelemetrySdkInitialized).mockReturnValue(false); + const config = new Config(paramsWithTelemetry); + + await config.shutdown(); + + expect(shutdownTelemetry).not.toHaveBeenCalled(); + }); + it('Config constructor should set telemetry to false when provided as false', () => { const paramsWithTelemetry: ConfigParameters = { ...baseParams, @@ -1083,6 +1114,41 @@ describe('Server Config (config.ts)', () => { }); }); + describe('Per-Signal OTLP Endpoint Configuration', () => { + it('should return per-signal endpoints when provided', () => { + const params: ConfigParameters = { + ...baseParams, + telemetry: { + enabled: true, + otlpTracesEndpoint: 'http://traces:4318/v1/traces', + otlpLogsEndpoint: 'http://logs:4318/v1/logs', + otlpMetricsEndpoint: 'http://metrics:4318/v1/metrics', + }, + }; + const config = new Config(params); + expect(config.getTelemetryOtlpTracesEndpoint()).toBe( + 'http://traces:4318/v1/traces', + ); + expect(config.getTelemetryOtlpLogsEndpoint()).toBe( + 'http://logs:4318/v1/logs', + ); + expect(config.getTelemetryOtlpMetricsEndpoint()).toBe( + 'http://metrics:4318/v1/metrics', + ); + }); + + it('should return undefined when per-signal endpoints are not provided', () => { + const params: ConfigParameters = { + ...baseParams, + telemetry: { enabled: true }, + }; + const config = new Config(params); + expect(config.getTelemetryOtlpTracesEndpoint()).toBeUndefined(); + expect(config.getTelemetryOtlpLogsEndpoint()).toBeUndefined(); + expect(config.getTelemetryOtlpMetricsEndpoint()).toBeUndefined(); + }); + }); + describe('UseRipgrep Configuration', () => { it('should default useRipgrep to true when not provided', () => { const config = new 
Config(baseParams); diff --git a/packages/core/src/config/config.ts b/packages/core/src/config/config.ts index 0f80ca112..0da98109e 100644 --- a/packages/core/src/config/config.ts +++ b/packages/core/src/config/config.ts @@ -67,7 +67,9 @@ import { FileReadCache } from '../services/fileReadCache.js'; import { DEFAULT_OTLP_ENDPOINT, DEFAULT_TELEMETRY_TARGET, + isTelemetrySdkInitialized, initializeTelemetry, + shutdownTelemetry, logStartSession, logRipgrepFallback, RipgrepFallbackEvent, @@ -217,6 +219,12 @@ export interface TelemetrySettings { target?: TelemetryTarget; otlpEndpoint?: string; otlpProtocol?: 'grpc' | 'http'; + /** Per-signal endpoint override for traces (HTTP only). Used as-is without path appending. */ + otlpTracesEndpoint?: string; + /** Per-signal endpoint override for logs (HTTP only). Used as-is without path appending. */ + otlpLogsEndpoint?: string; + /** Per-signal endpoint override for metrics (HTTP only). Used as-is without path appending. */ + otlpMetricsEndpoint?: string; logPrompts?: boolean; outfile?: string; useCollector?: boolean; @@ -758,8 +766,11 @@ export class Config { this.telemetrySettings = { enabled: params.telemetry?.enabled ?? false, target: params.telemetry?.target ?? DEFAULT_TELEMETRY_TARGET, - otlpEndpoint: params.telemetry?.otlpEndpoint ?? DEFAULT_OTLP_ENDPOINT, + otlpEndpoint: params.telemetry?.otlpEndpoint, otlpProtocol: params.telemetry?.otlpProtocol, + otlpTracesEndpoint: params.telemetry?.otlpTracesEndpoint, + otlpLogsEndpoint: params.telemetry?.otlpLogsEndpoint, + otlpMetricsEndpoint: params.telemetry?.otlpMetricsEndpoint, logPrompts: params.telemetry?.logPrompts ?? true, outfile: params.telemetry?.outfile, useCollector: params.telemetry?.useCollector, @@ -1636,11 +1647,12 @@ export class Config { * It handles the case where initialization was not completed. 
*/ async shutdown(): Promise { - if (!this.initialized) { - // Nothing to clean up if not initialized - return; - } try { + if (!this.initialized) { + // Nothing else to clean up if not initialized. + return; + } + // Finalize the current session's metadata before cleanup, then drain // the async write queue so no records are lost on exit. try { @@ -1663,6 +1675,10 @@ export class Config { } catch (error) { // Log but don't throw - cleanup should be best-effort this.debugLogger.error('Error during Config shutdown:', error); + } finally { + if (isTelemetrySdkInitialized()) { + await shutdownTelemetry(); + } } } @@ -2003,7 +2019,7 @@ export class Config { return this.telemetrySettings.logPrompts ?? true; } - getTelemetryOtlpEndpoint(): string { + getTelemetryOtlpEndpoint(): string | undefined { return this.telemetrySettings.otlpEndpoint ?? DEFAULT_OTLP_ENDPOINT; } @@ -2011,6 +2027,18 @@ export class Config { return this.telemetrySettings.otlpProtocol ?? 'grpc'; } + getTelemetryOtlpTracesEndpoint(): string | undefined { + return this.telemetrySettings.otlpTracesEndpoint; + } + + getTelemetryOtlpLogsEndpoint(): string | undefined { + return this.telemetrySettings.otlpLogsEndpoint; + } + + getTelemetryOtlpMetricsEndpoint(): string | undefined { + return this.telemetrySettings.otlpMetricsEndpoint; + } + getTelemetryTarget(): TelemetryTarget { return this.telemetrySettings.target ?? 
DEFAULT_TELEMETRY_TARGET; } diff --git a/packages/core/src/telemetry/config.test.ts b/packages/core/src/telemetry/config.test.ts index 443282fd4..f1c0d3f07 100644 --- a/packages/core/src/telemetry/config.test.ts +++ b/packages/core/src/telemetry/config.test.ts @@ -65,7 +65,12 @@ describe('telemetry/config helpers', () => { useCollector: false, }; const resolved = await resolveTelemetrySettings({ settings }); - expect(resolved).toEqual(settings); + expect(resolved).toEqual({ + ...settings, + otlpTracesEndpoint: undefined, + otlpLogsEndpoint: undefined, + otlpMetricsEndpoint: undefined, + }); }); it('uses env over settings and argv over env', async () => { @@ -102,6 +107,9 @@ describe('telemetry/config helpers', () => { target: TelemetryTarget.GCP, otlpEndpoint: 'http://env:4317', otlpProtocol: 'http', + otlpTracesEndpoint: undefined, + otlpLogsEndpoint: undefined, + otlpMetricsEndpoint: undefined, logPrompts: true, outfile: 'env.log', useCollector: true, @@ -117,6 +125,9 @@ describe('telemetry/config helpers', () => { target: TelemetryTarget.LOCAL, otlpEndpoint: 'http://argv:4317', otlpProtocol: 'grpc', + otlpTracesEndpoint: undefined, + otlpLogsEndpoint: undefined, + otlpMetricsEndpoint: undefined, logPrompts: false, outfile: 'argv.log', useCollector: true, // from env as no argv option @@ -151,5 +162,46 @@ describe('telemetry/config helpers', () => { /Invalid telemetry target/i, ); }); + + it('resolves per-signal endpoints from OTEL_ env vars', async () => { + const env = { + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: 'http://traces:4318/v1/traces', + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: 'http://logs:4318/v1/logs', + } as Record; + + const resolved = await resolveTelemetrySettings({ env }); + expect(resolved.otlpTracesEndpoint).toBe('http://traces:4318/v1/traces'); + expect(resolved.otlpLogsEndpoint).toBe('http://logs:4318/v1/logs'); + expect(resolved.otlpMetricsEndpoint).toBeUndefined(); + }); + + it('QWEN_ env vars take precedence over OTEL_ vars for per-signal 
endpoints', async () => { + const env = { + QWEN_TELEMETRY_OTLP_TRACES_ENDPOINT: + 'http://qwen-traces:4318/v1/traces', + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: 'http://otel-traces:4318/v1/traces', + } as Record; + + const resolved = await resolveTelemetrySettings({ env }); + expect(resolved.otlpTracesEndpoint).toBe( + 'http://qwen-traces:4318/v1/traces', + ); + }); + + it('resolves per-signal endpoints from settings', async () => { + const settings = { + otlpTracesEndpoint: 'http://traces-settings:4318/v1/traces', + otlpMetricsEndpoint: 'http://metrics-settings:4318/v1/metrics', + }; + + const resolved = await resolveTelemetrySettings({ settings }); + expect(resolved.otlpTracesEndpoint).toBe( + 'http://traces-settings:4318/v1/traces', + ); + expect(resolved.otlpLogsEndpoint).toBeUndefined(); + expect(resolved.otlpMetricsEndpoint).toBe( + 'http://metrics-settings:4318/v1/metrics', + ); + }); }); }); diff --git a/packages/core/src/telemetry/config.ts b/packages/core/src/telemetry/config.ts index f1037e742..267124ed7 100644 --- a/packages/core/src/telemetry/config.ts +++ b/packages/core/src/telemetry/config.ts @@ -106,11 +106,31 @@ export async function resolveTelemetrySettings(options: { parseBooleanEnvFlag(env['QWEN_TELEMETRY_USE_COLLECTOR']) ?? settings.useCollector; + // Per-signal endpoint overrides (HTTP only). + // Priority: QWEN_ env var > standard OTEL_ env var > settings.json + const otlpTracesEndpoint = + env['QWEN_TELEMETRY_OTLP_TRACES_ENDPOINT'] ?? + env['OTEL_EXPORTER_OTLP_TRACES_ENDPOINT'] ?? + settings.otlpTracesEndpoint; + + const otlpLogsEndpoint = + env['QWEN_TELEMETRY_OTLP_LOGS_ENDPOINT'] ?? + env['OTEL_EXPORTER_OTLP_LOGS_ENDPOINT'] ?? + settings.otlpLogsEndpoint; + + const otlpMetricsEndpoint = + env['QWEN_TELEMETRY_OTLP_METRICS_ENDPOINT'] ?? + env['OTEL_EXPORTER_OTLP_METRICS_ENDPOINT'] ?? 
+ settings.otlpMetricsEndpoint; + return { enabled, target, otlpEndpoint, otlpProtocol, + otlpTracesEndpoint, + otlpLogsEndpoint, + otlpMetricsEndpoint, logPrompts, outfile, useCollector, diff --git a/packages/core/src/telemetry/log-to-span-processor.test.ts b/packages/core/src/telemetry/log-to-span-processor.test.ts new file mode 100644 index 000000000..181b8cc0e --- /dev/null +++ b/packages/core/src/telemetry/log-to-span-processor.test.ts @@ -0,0 +1,386 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { SpanKind, SpanStatusCode, type HrTime } from '@opentelemetry/api'; +import { LogToSpanProcessor } from './log-to-span-processor.js'; +import type { ReadableLogRecord } from '@opentelemetry/sdk-logs'; +import type { SpanExporter } from '@opentelemetry/sdk-trace-base'; + +interface ExportedSpan { + name: string; + kind: number; + spanContext: () => { traceId: string; spanId: string; traceFlags: number }; + startTime: HrTime; + endTime: HrTime; + attributes: Record; + status: { code: number; message?: string }; +} + +describe('LogToSpanProcessor', () => { + let processor: LogToSpanProcessor; + let mockExporter: SpanExporter; + let exportedSpans: ExportedSpan[]; + + beforeEach(() => { + exportedSpans = []; + mockExporter = { + export: vi.fn((spans, cb) => { + exportedSpans.push(...spans); + cb({ code: 0 }); + }), + shutdown: vi.fn().mockResolvedValue(undefined), + forceFlush: vi.fn().mockResolvedValue(undefined), + } as unknown as SpanExporter; + processor = new LogToSpanProcessor(mockExporter, 60000); + }); + + afterEach(async () => { + await processor.shutdown(); + }); + + it('converts a log record to a span on flush', async () => { + const logRecord = { + body: 'test event', + hrTime: [1000, 500000000] as [number, number], + attributes: { key1: 'value1', key2: 42, key3: true }, + } as unknown as ReadableLogRecord; + + 
processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans).toHaveLength(1); + const span = exportedSpans[0]; + expect(span.name).toBe('test event'); + expect(span.kind).toBe(SpanKind.INTERNAL); + expect(span.attributes['key1']).toBe('value1'); + expect(span.attributes['key2']).toBe(42); + expect(span.attributes['key3']).toBe(true); + expect(span.attributes['log.bridge']).toBe(true); + expect(span.startTime).toEqual([1000, 500000000]); + expect(span.endTime).toEqual([1000, 500000000]); + expect(span.status.code).toBe(SpanStatusCode.OK); + }); + + it('uses duration_ms to compute span end time', async () => { + const logRecord = { + body: 'api response', + hrTime: [1000, 0] as [number, number], + attributes: { duration_ms: 250 }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].endTime).toEqual([1000, 250000000]); + }); + + it('ignores non-finite duration_ms values', async () => { + const logRecord = { + body: 'api response', + hrTime: [1000, 0] as [number, number], + attributes: { duration_ms: Infinity }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].endTime).toEqual([1000, 0]); + }); + + it('handles duration_ms that causes second rollover', async () => { + const logRecord = { + body: 'long operation', + hrTime: [1000, 900000000] as [number, number], + attributes: { duration_ms: 500 }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].endTime).toEqual([1001, 400000000]); + }); + + it('serializes object attributes to JSON', async () => { + const logRecord = { + body: 'event with object', + hrTime: [1000, 0] as [number, number], + attributes: { metadata: { nested: true } }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + 
expect(exportedSpans[0].attributes['metadata']).toBe('{"nested":true}'); + }); + + it('handles unserializable object attributes safely', async () => { + const circular: Record = {}; + circular['self'] = circular; + const logRecord = { + body: 'event', + hrTime: [1000, 0] as [number, number], + attributes: { bad: circular }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].attributes['bad']).toBe('[unserializable]'); + }); + + it('drops sensitive attributes before exporting bridged spans', async () => { + const logRecord = { + body: 'event', + hrTime: [1000, 0] as [number, number], + attributes: { + prompt: 'secret prompt', + function_args: '{"token":"secret"}', + response_text: 'secret response', + safe: 'visible', + }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + const attrs = exportedSpans[0].attributes; + expect(attrs).not.toHaveProperty('prompt'); + expect(attrs).not.toHaveProperty('function_args'); + expect(attrs).not.toHaveProperty('response_text'); + expect(attrs['safe']).toBe('visible'); + expect(attrs['log.bridge']).toBe(true); + }); + + it('skips null and undefined attributes', async () => { + const logRecord = { + body: 'event', + hrTime: [1000, 0] as [number, number], + attributes: { valid: 'yes', nullVal: null, undefinedVal: undefined }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + const attrs = exportedSpans[0].attributes; + expect(attrs['valid']).toBe('yes'); + expect(attrs).not.toHaveProperty('nullVal'); + expect(attrs).not.toHaveProperty('undefinedVal'); + expect(attrs['log.bridge']).toBe(true); + }); + + it('uses "unknown" as span name when body is missing', async () => { + const logRecord = { + body: undefined, + hrTime: [1000, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await 
processor.forceFlush(); + + expect(exportedSpans[0].name).toBe('unknown'); + }); + + it('truncates long span names', async () => { + const longName = 'x'.repeat(200); + const logRecord = { + body: longName, + hrTime: [1000, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].name).toBe(`${'x'.repeat(128)}...`); + }); + + it('generates unique trace IDs without session.id', async () => { + const logRecord1 = { + body: 'event1', + hrTime: [1000, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord; + const logRecord2 = { + body: 'event2', + hrTime: [1001, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord1); + processor.onEmit(logRecord2); + await processor.forceFlush(); + + const ctx1 = exportedSpans[0].spanContext(); + const ctx2 = exportedSpans[1].spanContext(); + expect(ctx1.traceId).toHaveLength(32); + expect(ctx1.spanId).toHaveLength(16); + expect(ctx1.traceId).not.toBe(ctx2.traceId); + }); + + it('derives same traceId from same session.id', async () => { + const logRecord1 = { + body: 'event1', + hrTime: [1000, 0] as [number, number], + attributes: { 'session.id': 'session-abc' }, + } as unknown as ReadableLogRecord; + const logRecord2 = { + body: 'event2', + hrTime: [1001, 0] as [number, number], + attributes: { 'session.id': 'session-abc' }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord1); + processor.onEmit(logRecord2); + await processor.forceFlush(); + + const ctx1 = exportedSpans[0].spanContext(); + const ctx2 = exportedSpans[1].spanContext(); + expect(ctx1.traceId).toBe(ctx2.traceId); + expect(ctx1.spanId).not.toBe(ctx2.spanId); + }); + + it('derives different traceIds from different session.ids', async () => { + const logRecord1 = { + body: 'event1', + hrTime: [1000, 0] as [number, number], + attributes: { 'session.id': 
'session-abc' }, + } as unknown as ReadableLogRecord; + const logRecord2 = { + body: 'event2', + hrTime: [1001, 0] as [number, number], + attributes: { 'session.id': 'session-xyz' }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord1); + processor.onEmit(logRecord2); + await processor.forceFlush(); + + const ctx1 = exportedSpans[0].spanContext(); + const ctx2 = exportedSpans[1].spanContext(); + expect(ctx1.traceId).not.toBe(ctx2.traceId); + }); + + it('sets ERROR status for truthy error attributes', async () => { + const logRecord = { + body: 'api error', + hrTime: [1000, 0] as [number, number], + attributes: { + error_message: 'connection refused', + error_type: 'NETWORK', + }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].status.code).toBe(SpanStatusCode.ERROR); + expect(exportedSpans[0].status.message).toBe('connection refused'); + }); + + it('does not set ERROR for success: false (normal decline)', async () => { + const logRecord = { + body: 'tool call declined', + hrTime: [1000, 0] as [number, number], + attributes: { success: false, function_name: 'bash' }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].status.code).toBe(SpanStatusCode.OK); + }); + + it('does not set ERROR for falsy error attributes', async () => { + const logRecord = { + body: 'ok event', + hrTime: [1000, 0] as [number, number], + attributes: { error: null, error_message: '', error_type: '' }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].status.code).toBe(SpanStatusCode.OK); + }); + + it('preserves severity attributes', async () => { + const logRecord = { + body: 'event', + hrTime: [1000, 0] as [number, number], + attributes: {}, + severityNumber: 9, + severityText: 'INFO', + } as unknown as ReadableLogRecord; + + 
processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].attributes['log.severity_number']).toBe(9); + expect(exportedSpans[0].attributes['log.severity_text']).toBe('INFO'); + }); + + it('reuses in-flight exports and flushes queued spans afterwards', async () => { + await processor.shutdown(); + exportedSpans = []; + const exportCallbacks: Array<(result: { code: number }) => void> = []; + let exportCallCount = 0; + mockExporter = { + export: vi.fn((spans, cb) => { + exportCallCount += 1; + exportedSpans.push(...spans); + if (exportCallCount === 1) { + exportCallbacks.push(cb); + } else { + cb({ code: 0 }); + } + }), + shutdown: vi.fn().mockResolvedValue(undefined), + forceFlush: vi.fn().mockResolvedValue(undefined), + } as unknown as SpanExporter; + processor = new LogToSpanProcessor(mockExporter, 60000); + + processor.onEmit({ + body: 'first', + hrTime: [1000, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord); + const firstFlush = processor.forceFlush(); + await Promise.resolve(); + + processor.onEmit({ + body: 'second', + hrTime: [1001, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord); + const secondFlush = processor.forceFlush(); + await Promise.resolve(); + + expect(mockExporter.export).toHaveBeenCalledTimes(1); + expect(exportedSpans.map((span) => span.name)).toEqual(['first']); + + exportCallbacks[0]({ code: 0 }); + await Promise.all([firstFlush, secondFlush]); + + expect(mockExporter.export).toHaveBeenCalledTimes(2); + expect(exportedSpans.map((span) => span.name)).toEqual(['first', 'second']); + }); + + it('shutdown flushes remaining spans and shuts down exporter', async () => { + const logRecord = { + body: 'final event', + hrTime: [1000, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.shutdown(); + + expect(exportedSpans).toHaveLength(1); + 
expect(mockExporter.shutdown).toHaveBeenCalled(); + }); +}); diff --git a/packages/core/src/telemetry/log-to-span-processor.ts b/packages/core/src/telemetry/log-to-span-processor.ts new file mode 100644 index 000000000..cdf57830b --- /dev/null +++ b/packages/core/src/telemetry/log-to-span-processor.ts @@ -0,0 +1,288 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { SpanKind, SpanStatusCode, type HrTime } from '@opentelemetry/api'; +import type { + LogRecordProcessor, + ReadableLogRecord, +} from '@opentelemetry/sdk-logs'; +import type { SpanExporter, ReadableSpan } from '@opentelemetry/sdk-trace-base'; +import { + type Resource, + resourceFromAttributes, +} from '@opentelemetry/resources'; + +import { createHash } from 'node:crypto'; + +import { SERVICE_NAME } from './constants.js'; + +const EXPORT_TIMEOUT_MS = 30_000; +const MAX_SPAN_NAME_LENGTH = 128; +const SENSITIVE_ATTRIBUTE_KEYS = new Set([ + 'prompt', + 'function_args', + 'response_text', +]); + +/** + * A LogRecordProcessor that converts each OTel log record into a span + * and exports it directly through the provided SpanExporter. + * + * This bridges the gap for backends (e.g., Alibaba Cloud) that support + * traces and metrics but not logs over OTLP. Instead of going through + * the global TracerProvider (which can break in bundled environments), + * this processor directly constructs ReadableSpan objects and feeds + * them to the exporter. + * + * When a log record has a `duration_ms` attribute, the resulting span + * will have a matching duration. Otherwise, the span is instantaneous. 
+ */ +export class LogToSpanProcessor implements LogRecordProcessor { + private buffer: ReadableSpanLike[] = []; + private flushTimer: ReturnType | undefined; + private inFlightExport: Promise | undefined; + private readonly flushIntervalMs: number; + + constructor( + private readonly spanExporter: SpanExporter, + flushIntervalMs = 5000, + ) { + this.flushIntervalMs = flushIntervalMs; + this.flushTimer = setInterval(() => { + void this.flush(); + }, this.flushIntervalMs); + this.flushTimer.unref(); + } + + onEmit(logRecord: ReadableLogRecord): void { + const name = sanitizeSpanName(logRecord.body); + const startTime = logRecord.hrTime; + + const attributes: Record = {}; + if (logRecord.attributes) { + for (const [key, value] of Object.entries(logRecord.attributes)) { + if ( + value !== undefined && + value !== null && + !SENSITIVE_ATTRIBUTE_KEYS.has(key) + ) { + attributes[key] = + typeof value === 'object' + ? safeStringify(value) + : (value as string | number | boolean); + } + } + } + attributes['log.bridge'] = true; + + // Preserve severity so downstream queries can filter by log level. + if (logRecord.severityNumber !== undefined) { + attributes['log.severity_number'] = logRecord.severityNumber; + } + if (logRecord.severityText) { + attributes['log.severity_text'] = logRecord.severityText; + } + + let endTime = startTime; + const durationMs = logRecord.attributes?.['duration_ms']; + if ( + typeof durationMs === 'number' && + Number.isFinite(durationMs) && + durationMs > 0 + ) { + const [secs, nanos] = startTime; + const durationNanos = durationMs * 1_000_000; + const endNanos = nanos + durationNanos; + endTime = [secs + Math.floor(endNanos / 1e9), endNanos % 1e9] as HrTime; + } + + // Derive traceId from session.id so all events in one session + // appear under a single trace. spanId is random per event. + const sessionId = logRecord.attributes?.['session.id']; + const traceId = sessionId + ? 
deriveTraceId(String(sessionId)) + : randomHexString(32); + const spanId = randomHexString(16); + + this.buffer.push({ + name, + kind: SpanKind.INTERNAL, + spanContext: () => ({ + traceId, + spanId, + traceFlags: 1, // SAMPLED + }), + startTime, + endTime, + duration: hrTimeDiff(startTime, endTime), + attributes, + status: deriveSpanStatus(logRecord.attributes), + events: [], + links: [], + resource: logRecord.resource ?? resourceFromAttributes({}), + instrumentationScope: logRecord.instrumentationScope ?? { + name: SERVICE_NAME, + version: '', + }, + ended: true, + parentSpanContext: undefined, + droppedAttributesCount: 0, + droppedEventsCount: 0, + droppedLinksCount: 0, + recordException: () => {}, + }); + } + + private flush(): Promise { + if (this.inFlightExport) return this.inFlightExport; + if (this.buffer.length === 0) return Promise.resolve(); + const spans = this.buffer.splice(0); + const exportPromise = new Promise((resolve) => { + const timeout = setTimeout(() => { + process.stderr.write( + `[LogToSpan] export timeout after ${EXPORT_TIMEOUT_MS}ms\n`, + ); + resolve(); + }, EXPORT_TIMEOUT_MS); + timeout.unref(); + + try { + this.spanExporter.export( + spans as unknown as ReadableSpan[], + (result) => { + clearTimeout(timeout); + if (result.code !== 0) { + process.stderr.write( + `[LogToSpan] export failed: code=${result.code} error=${result.error?.message ?? 'unknown'}\n`, + ); + } + resolve(); + }, + ); + } catch (err) { + clearTimeout(timeout); + process.stderr.write( + `[LogToSpan] export threw: ${err instanceof Error ? err.message : String(err)}\n`, + ); + resolve(); + } + }); + this.inFlightExport = exportPromise.finally(() => { + this.inFlightExport = undefined; + }); + return this.inFlightExport; + } + + async shutdown(): Promise { + if (this.flushTimer) { + clearInterval(this.flushTimer); + this.flushTimer = undefined; + } + // Wait for any in-flight interval-triggered export before final flush. 
+ if (this.inFlightExport) { + await this.inFlightExport; + } + await this.flush(); + await this.spanExporter.shutdown(); + } + + async forceFlush(): Promise { + if (this.inFlightExport) { + await this.inFlightExport; + } + await this.flush(); + await this.spanExporter.forceFlush?.(); + } +} + +interface ReadableSpanLike { + name: string; + kind: SpanKind; + spanContext: () => { traceId: string; spanId: string; traceFlags: number }; + startTime: HrTime; + endTime: HrTime; + duration: HrTime; + attributes: Record; + status: { code: SpanStatusCode; message?: string }; + events: never[]; + links: never[]; + resource: Resource; + instrumentationScope: { name: string; version?: string; schemaUrl?: string }; + ended: boolean; + parentSpanContext?: { traceId: string; spanId: string; traceFlags: number }; + droppedAttributesCount: number; + droppedEventsCount: number; + droppedLinksCount: number; + recordException: () => void; +} + +function sanitizeSpanName(body: unknown): string { + const rawName = String(body ?? 'unknown'); + return rawName.length > MAX_SPAN_NAME_LENGTH + ? `${rawName.slice(0, MAX_SPAN_NAME_LENGTH)}...` + : rawName; +} + +/** + * Safely stringify an object value for use as a span attribute. + * Returns a bounded fallback when JSON serialization fails, such as for + * circular references or BigInt values. + */ +function safeStringify(value: unknown): string { + try { + return JSON.stringify(value); + } catch { + return '[unserializable]'; + } +} + +function randomHexString(length: number): string { + const bytes = new Uint8Array(length / 2); + crypto.getRandomValues(bytes); + return Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join(''); +} + +/** + * Derive a deterministic 32-char hex traceId from a session ID. + * All events in the same session will share this traceId, + * making them appear under a single trace in the backend. + * Uses SHA-256 truncated to 32 hex chars (128 bits) to match the + * OTel trace ID format. 
+ */ +function deriveTraceId(sessionId: string): string { + return createHash('sha256').update(sessionId).digest('hex').slice(0, 32); +} + +/** + * Derive span status from log record attributes. + * Marks the span as ERROR when explicit error indicators are present + * (truthy `error`, `error_message`, or `error_type` attributes). + * Does NOT treat `success: false` as an error — declined/cancelled + * operations are a normal outcome, not failures. + */ +function deriveSpanStatus(attrs: Record | undefined): { + code: SpanStatusCode; + message?: string; +} { + if (!attrs) return { code: SpanStatusCode.OK }; + if (!!attrs['error'] || !!attrs['error_message'] || !!attrs['error_type']) { + const msg = String( + attrs['error_message'] ?? attrs['error'] ?? attrs['error_type'] ?? '', + ); + return { code: SpanStatusCode.ERROR, ...(msg && { message: msg }) }; + } + return { code: SpanStatusCode.OK }; +} + +function hrTimeDiff(start: HrTime, end: HrTime): HrTime { + let secs = end[0] - start[0]; + let nanos = end[1] - start[1]; + if (nanos < 0) { + secs -= 1; + nanos += 1e9; + } + return [secs, nanos] as HrTime; +} diff --git a/packages/core/src/telemetry/sdk.test.ts b/packages/core/src/telemetry/sdk.test.ts index 9274631db..f88e9b9aa 100644 --- a/packages/core/src/telemetry/sdk.test.ts +++ b/packages/core/src/telemetry/sdk.test.ts @@ -5,8 +5,14 @@ */ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { diag } from '@opentelemetry/api'; import type { Config } from '../config/config.js'; -import { initializeTelemetry, shutdownTelemetry } from './sdk.js'; +import { + initializeTelemetry, + isTelemetrySdkInitialized, + shutdownTelemetry, + resolveHttpOtlpUrl, +} from './sdk.js'; import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc'; import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-grpc'; import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-grpc'; @@ -27,6 +33,68 @@ 
vi.mock('@opentelemetry/exporter-logs-otlp-http'); vi.mock('@opentelemetry/exporter-metrics-otlp-http'); vi.mock('@opentelemetry/sdk-node'); vi.mock('./gcp-exporters.js'); +vi.mock('./log-to-span-processor.js'); + +import { LogToSpanProcessor } from './log-to-span-processor.js'; + +describe('resolveHttpOtlpUrl', () => { + it('appends signal path to base collector URL', () => { + expect(resolveHttpOtlpUrl('http://collector:4318', 'traces')).toBe( + 'http://collector:4318/v1/traces', + ); + expect(resolveHttpOtlpUrl('http://collector:4318', 'logs')).toBe( + 'http://collector:4318/v1/logs', + ); + expect(resolveHttpOtlpUrl('http://collector:4318', 'metrics')).toBe( + 'http://collector:4318/v1/metrics', + ); + }); + + it('handles trailing slash in base URL', () => { + expect(resolveHttpOtlpUrl('http://collector:4318/', 'traces')).toBe( + 'http://collector:4318/v1/traces', + ); + expect(resolveHttpOtlpUrl('http://collector:4318/', 'logs')).toBe( + 'http://collector:4318/v1/logs', + ); + }); + + it('preserves explicit full signal path URL', () => { + expect( + resolveHttpOtlpUrl('http://collector:4318/v1/traces', 'traces'), + ).toBe('http://collector:4318/v1/traces'); + expect(resolveHttpOtlpUrl('http://collector:4318/v1/logs', 'logs')).toBe( + 'http://collector:4318/v1/logs', + ); + expect( + resolveHttpOtlpUrl('http://collector:4318/v1/metrics', 'metrics'), + ).toBe('http://collector:4318/v1/metrics'); + }); + + it('appends signal path when URL has a non-signal custom path', () => { + expect( + resolveHttpOtlpUrl('http://collector:4318/custom/prefix', 'traces'), + ).toBe('http://collector:4318/custom/prefix/v1/traces'); + }); + + it('handles HTTPS URLs', () => { + expect(resolveHttpOtlpUrl('https://otel.example.com', 'logs')).toBe( + 'https://otel.example.com/v1/logs', + ); + expect(resolveHttpOtlpUrl('https://otel.example.com:4318', 'metrics')).toBe( + 'https://otel.example.com:4318/v1/metrics', + ); + }); + + it('preserves query strings when appending signal paths', 
() => { + expect(resolveHttpOtlpUrl('https://host/otlp?token=abc', 'traces')).toBe( + 'https://host/otlp/v1/traces?token=abc', + ); + expect( + resolveHttpOtlpUrl('https://host/otlp?token=abc&foo=bar', 'logs'), + ).toBe('https://host/otlp/v1/logs?token=abc&foo=bar'); + }); +}); describe('Telemetry SDK', () => { let mockConfig: Config; @@ -37,6 +105,9 @@ describe('Telemetry SDK', () => { getTelemetryEnabled: () => true, getTelemetryOtlpEndpoint: () => 'http://localhost:4317', getTelemetryOtlpProtocol: () => 'grpc', + getTelemetryOtlpTracesEndpoint: () => undefined, + getTelemetryOtlpLogsEndpoint: () => undefined, + getTelemetryOtlpMetricsEndpoint: () => undefined, getTelemetryTarget: () => 'local', getTelemetryUseCollector: () => false, getTelemetryOutfile: () => undefined, @@ -67,7 +138,7 @@ describe('Telemetry SDK', () => { expect(NodeSDK.prototype.start).toHaveBeenCalled(); }); - it('should use HTTP exporters when protocol is http', () => { + it('should use HTTP exporters with signal-specific paths when protocol is http', () => { vi.spyOn(mockConfig, 'getTelemetryEnabled').mockReturnValue(true); vi.spyOn(mockConfig, 'getTelemetryOtlpProtocol').mockReturnValue('http'); vi.spyOn(mockConfig, 'getTelemetryOtlpEndpoint').mockReturnValue( @@ -77,13 +148,13 @@ describe('Telemetry SDK', () => { initializeTelemetry(mockConfig); expect(OTLPTraceExporterHttp).toHaveBeenCalledWith({ - url: 'http://localhost:4318/', + url: 'http://localhost:4318/v1/traces', }); expect(OTLPLogExporterHttp).toHaveBeenCalledWith({ - url: 'http://localhost:4318/', + url: 'http://localhost:4318/v1/logs', }); expect(OTLPMetricExporterHttp).toHaveBeenCalledWith({ - url: 'http://localhost:4318/', + url: 'http://localhost:4318/v1/metrics', }); expect(NodeSDK.prototype.start).toHaveBeenCalled(); }); @@ -98,15 +169,92 @@ describe('Telemetry SDK', () => { ); }); - it('should parse HTTP endpoint correctly', () => { + it('should append signal paths to HTTP endpoint', () => { vi.spyOn(mockConfig, 
'getTelemetryOtlpProtocol').mockReturnValue('http'); vi.spyOn(mockConfig, 'getTelemetryOtlpEndpoint').mockReturnValue( 'https://my-collector.com', ); initializeTelemetry(mockConfig); expect(OTLPTraceExporterHttp).toHaveBeenCalledWith( - expect.objectContaining({ url: 'https://my-collector.com/' }), + expect.objectContaining({ url: 'https://my-collector.com/v1/traces' }), ); + expect(OTLPLogExporterHttp).toHaveBeenCalledWith( + expect.objectContaining({ url: 'https://my-collector.com/v1/logs' }), + ); + expect(OTLPMetricExporterHttp).toHaveBeenCalledWith( + expect.objectContaining({ url: 'https://my-collector.com/v1/metrics' }), + ); + }); + + it('should use per-signal endpoint overrides when provided', () => { + vi.spyOn(mockConfig, 'getTelemetryOtlpProtocol').mockReturnValue('http'); + vi.spyOn(mockConfig, 'getTelemetryOtlpEndpoint').mockReturnValue( + 'http://default-collector:4318', + ); + vi.spyOn(mockConfig, 'getTelemetryOtlpTracesEndpoint').mockReturnValue( + 'http://traces-collector:4318/v1/traces', + ); + + initializeTelemetry(mockConfig); + + // Traces uses the per-signal override + expect(OTLPTraceExporterHttp).toHaveBeenCalledWith({ + url: 'http://traces-collector:4318/v1/traces', + }); + // Logs and metrics use the base endpoint with paths appended + expect(OTLPLogExporterHttp).toHaveBeenCalledWith({ + url: 'http://default-collector:4318/v1/logs', + }); + expect(OTLPMetricExporterHttp).toHaveBeenCalledWith({ + url: 'http://default-collector:4318/v1/metrics', + }); + }); + + it('should use per-signal overrides without base endpoint', () => { + vi.spyOn(mockConfig, 'getTelemetryOtlpProtocol').mockReturnValue('http'); + vi.spyOn(mockConfig, 'getTelemetryOtlpEndpoint').mockReturnValue(''); + vi.spyOn(mockConfig, 'getTelemetryOtlpTracesEndpoint').mockReturnValue( + 'http://traces-host/token/api/otlp/traces', + ); + vi.spyOn(mockConfig, 'getTelemetryOtlpMetricsEndpoint').mockReturnValue( + 'http://metrics-host/token/api/otlp/metrics', + ); + // logs has no 
override and no base endpoint + + initializeTelemetry(mockConfig); + + // Traces and metrics use per-signal override + expect(OTLPTraceExporterHttp).toHaveBeenCalledWith({ + url: 'http://traces-host/token/api/otlp/traces', + }); + expect(OTLPMetricExporterHttp).toHaveBeenCalledWith({ + url: 'http://metrics-host/token/api/otlp/metrics', + }); + // Logs falls back to LogToSpanProcessor (bridges logs → spans) + expect(OTLPLogExporterHttp).not.toHaveBeenCalled(); + expect(LogToSpanProcessor).toHaveBeenCalled(); + expect(NodeSDK.prototype.start).toHaveBeenCalled(); + }); + + it('should warn and skip startup for gRPC per-signal endpoints without base endpoint', () => { + const diagWarnSpy = vi.spyOn(diag, 'warn').mockImplementation(() => {}); + try { + vi.spyOn(mockConfig, 'getTelemetryOtlpProtocol').mockReturnValue('grpc'); + vi.spyOn(mockConfig, 'getTelemetryOtlpEndpoint').mockReturnValue(''); + vi.spyOn(mockConfig, 'getTelemetryOtlpTracesEndpoint').mockReturnValue( + 'http://traces-host/token/api/otlp/traces', + ); + + initializeTelemetry(mockConfig); + + expect(diagWarnSpy).toHaveBeenCalledWith( + expect.stringContaining('Telemetry SDK startup was skipped'), + ); + expect(NodeSDK.prototype.start).not.toHaveBeenCalled(); + expect(isTelemetrySdkInitialized()).toBe(false); + } finally { + diagWarnSpy.mockRestore(); + } }); it('should use OTLP exporters when target is gcp but useCollector is true', () => { @@ -145,4 +293,34 @@ describe('Telemetry SDK', () => { expect(OTLPMetricExporterHttp).not.toHaveBeenCalled(); expect(NodeSDK.prototype.start).toHaveBeenCalled(); }); + + it('should not register async process shutdown handlers', () => { + const processOnSpy = vi.spyOn(process, 'on'); + try { + initializeTelemetry(mockConfig); + + expect(processOnSpy).not.toHaveBeenCalledWith( + 'SIGTERM', + expect.any(Function), + ); + expect(processOnSpy).not.toHaveBeenCalledWith( + 'SIGINT', + expect.any(Function), + ); + expect(processOnSpy).not.toHaveBeenCalledWith( + 'exit', + 
expect.any(Function), + ); + } finally { + processOnSpy.mockRestore(); + } + }); + + it('should mark telemetry uninitialized after shutdown', async () => { + initializeTelemetry(mockConfig); + + await shutdownTelemetry(); + + expect(isTelemetrySdkInitialized()).toBe(false); + }); }); diff --git a/packages/core/src/telemetry/sdk.ts b/packages/core/src/telemetry/sdk.ts index 3dba2acc4..52bdb6712 100644 --- a/packages/core/src/telemetry/sdk.ts +++ b/packages/core/src/telemetry/sdk.ts @@ -15,18 +15,9 @@ import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base'; import { NodeSDK } from '@opentelemetry/sdk-node'; import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'; import { resourceFromAttributes } from '@opentelemetry/resources'; -import { - BatchSpanProcessor, - ConsoleSpanExporter, -} from '@opentelemetry/sdk-trace-node'; -import { - BatchLogRecordProcessor, - ConsoleLogRecordExporter, -} from '@opentelemetry/sdk-logs'; -import { - ConsoleMetricExporter, - PeriodicExportingMetricReader, -} from '@opentelemetry/sdk-metrics'; +import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-node'; +import { BatchLogRecordProcessor } from '@opentelemetry/sdk-logs'; +import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'; import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'; import type { Config } from '../config/config.js'; import { SERVICE_NAME } from './constants.js'; @@ -37,12 +28,48 @@ import { FileSpanExporter, } from './file-exporters.js'; import { createDebugLogger } from '../utils/debugLogger.js'; +import { LogToSpanProcessor } from './log-to-span-processor.js'; // For troubleshooting, set the log level to DiagLogLevel.DEBUG -diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.INFO); +diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.WARN); + +/** + * Standard OTLP HTTP signal-specific paths per the OpenTelemetry specification. 
+ * gRPC uses service-based routing so no path appending is needed. + */ +const OTLP_SIGNAL_PATHS = { + traces: 'v1/traces', + logs: 'v1/logs', + metrics: 'v1/metrics', +} as const; + +type OtlpSignal = keyof typeof OTLP_SIGNAL_PATHS; + +/** + * Resolve the final URL for an HTTP OTLP exporter. + * + * - If the URL path already ends with the signal-specific path (e.g., /v1/traces), + * use it as-is. This supports explicit full-path configuration. + * - Otherwise, append the signal-specific path to the base URL. + */ +export function resolveHttpOtlpUrl( + baseEndpoint: string, + signal: OtlpSignal, +): string { + const signalPath = OTLP_SIGNAL_PATHS[signal]; + const url = new URL(baseEndpoint); + const normalizedPath = url.pathname.replace(/\/+$/, ''); + if (normalizedPath.endsWith(signalPath)) { + return url.href; + } + // Append the signal path to the URL pathname, preserving query/hash. + url.pathname = normalizedPath + '/' + signalPath; + return url.href; +} let sdk: NodeSDK | undefined; let telemetryInitialized = false; +let telemetryShutdownPromise: Promise | undefined; export function isTelemetrySdkInitialized(): boolean { return telemetryInitialized; @@ -73,6 +100,31 @@ function parseOtlpEndpoint( } } +/** + * Validate a URL string. Returns the URL if valid http(s), undefined otherwise. + * Logs an error for invalid URLs instead of throwing. 
+ */ +function validateUrl(url: string | undefined): string | undefined { + if (!url) return undefined; + try { + const parsed = new URL(url); + if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') { + diag.error( + `OTLP endpoint must use http or https, got ${parsed.protocol}`, + ); + return undefined; + } + if (!parsed.hostname) { + diag.error('OTLP endpoint missing hostname'); + return undefined; + } + return url; + } catch { + diag.error('Invalid OTLP signal endpoint URL, skipping:', url); + return undefined; + } +} + export function initializeTelemetry(config: Config): void { if (telemetryInitialized || !config.getTelemetryEnabled()) { return; @@ -89,51 +141,97 @@ export function initializeTelemetry(config: Config): void { const otlpProtocol = config.getTelemetryOtlpProtocol(); const parsedEndpoint = parseOtlpEndpoint(otlpEndpoint, otlpProtocol); const telemetryOutfile = config.getTelemetryOutfile(); - const useOtlp = !!parsedEndpoint && !telemetryOutfile; + const hasPerSignalEndpoint = + !!config.getTelemetryOtlpTracesEndpoint() || + !!config.getTelemetryOtlpLogsEndpoint() || + !!config.getTelemetryOtlpMetricsEndpoint(); + const useOtlp = + (!!parsedEndpoint || hasPerSignalEndpoint) && !telemetryOutfile; let spanExporter: | OTLPTraceExporter | OTLPTraceExporterHttp | FileSpanExporter - | ConsoleSpanExporter; + | undefined; let logExporter: | OTLPLogExporter | OTLPLogExporterHttp | FileLogExporter - | ConsoleLogRecordExporter; - let metricReader: PeriodicExportingMetricReader; + | undefined; + let metricReader: PeriodicExportingMetricReader | undefined; + let logToSpanProcessor: LogToSpanProcessor | undefined; if (useOtlp) { if (otlpProtocol === 'http') { - spanExporter = new OTLPTraceExporterHttp({ - url: parsedEndpoint, - }); - logExporter = new OTLPLogExporterHttp({ - url: parsedEndpoint, - }); - metricReader = new PeriodicExportingMetricReader({ - exporter: new OTLPMetricExporterHttp({ - url: parsedEndpoint, - }), - exportIntervalMillis: 10000, 
- }); + const tracesUrl = validateUrl( + config.getTelemetryOtlpTracesEndpoint() ?? + (parsedEndpoint + ? resolveHttpOtlpUrl(parsedEndpoint, 'traces') + : undefined), + ); + const logsUrl = validateUrl( + config.getTelemetryOtlpLogsEndpoint() ?? + (parsedEndpoint + ? resolveHttpOtlpUrl(parsedEndpoint, 'logs') + : undefined), + ); + const metricsUrl = validateUrl( + config.getTelemetryOtlpMetricsEndpoint() ?? + (parsedEndpoint + ? resolveHttpOtlpUrl(parsedEndpoint, 'metrics') + : undefined), + ); + + debugLogger.debug( + `OTLP HTTP endpoints: traces=${tracesUrl ?? 'none'}, logs=${logsUrl ?? 'none'}, metrics=${metricsUrl ?? 'none'}`, + ); + + if (tracesUrl) { + spanExporter = new OTLPTraceExporterHttp({ url: tracesUrl }); + } + if (logsUrl) { + logExporter = new OTLPLogExporterHttp({ url: logsUrl }); + } else if (tracesUrl) { + // Bridge: no logs endpoint but traces endpoint exists. + // Convert log records to spans. Use a dedicated trace exporter so the + // bridge owns its own forceFlush/shutdown lifecycle. + logToSpanProcessor = new LogToSpanProcessor( + new OTLPTraceExporterHttp({ url: tracesUrl }), + ); + } + if (metricsUrl) { + metricReader = new PeriodicExportingMetricReader({ + exporter: new OTLPMetricExporterHttp({ url: metricsUrl }), + exportIntervalMillis: 10000, + }); + } } else { - // grpc - spanExporter = new OTLPTraceExporter({ - url: parsedEndpoint, - compression: CompressionAlgorithm.GZIP, - }); - logExporter = new OTLPLogExporter({ - url: parsedEndpoint, - compression: CompressionAlgorithm.GZIP, - }); - metricReader = new PeriodicExportingMetricReader({ - exporter: new OTLPMetricExporter({ + // grpc — per-signal endpoints are not supported with gRPC protocol. + if (!parsedEndpoint) { + const warning = + 'Per-signal OTLP endpoints are only supported with HTTP protocol. ' + + 'Set otlpProtocol to "http" or provide a base otlpEndpoint for gRPC. 
' + + 'Telemetry SDK startup was skipped because no supported gRPC endpoint was configured.'; + diag.warn(warning); + debugLogger.warn(warning); + return; + } else { + spanExporter = new OTLPTraceExporter({ url: parsedEndpoint, compression: CompressionAlgorithm.GZIP, - }), - exportIntervalMillis: 10000, - }); + }); + logExporter = new OTLPLogExporter({ + url: parsedEndpoint, + compression: CompressionAlgorithm.GZIP, + }); + metricReader = new PeriodicExportingMetricReader({ + exporter: new OTLPMetricExporter({ + url: parsedEndpoint, + compression: CompressionAlgorithm.GZIP, + }), + exportIntervalMillis: 10000, + }); + } } } else if (telemetryOutfile) { spanExporter = new FileSpanExporter(telemetryOutfile); @@ -142,20 +240,18 @@ export function initializeTelemetry(config: Config): void { exporter: new FileMetricExporter(telemetryOutfile), exportIntervalMillis: 10000, }); - } else { - spanExporter = new ConsoleSpanExporter(); - logExporter = new ConsoleLogRecordExporter(); - metricReader = new PeriodicExportingMetricReader({ - exporter: new ConsoleMetricExporter(), - exportIntervalMillis: 10000, - }); } + // If no exporter is configured for a signal, it is silently skipped. sdk = new NodeSDK({ resource, - spanProcessors: [new BatchSpanProcessor(spanExporter)], - logRecordProcessors: [new BatchLogRecordProcessor(logExporter)], - metricReader, + spanProcessors: spanExporter ? [new BatchSpanProcessor(spanExporter)] : [], + logRecordProcessors: logExporter + ? [new BatchLogRecordProcessor(logExporter)] + : logToSpanProcessor + ? 
[logToSpanProcessor] + : [], + ...(metricReader && { metricReader }), instrumentations: [new HttpInstrumentation()], }); @@ -167,29 +263,28 @@ export function initializeTelemetry(config: Config): void { } catch (error) { debugLogger.error('Error starting OpenTelemetry SDK:', error); } - - process.on('SIGTERM', () => { - shutdownTelemetry(); - }); - process.on('SIGINT', () => { - shutdownTelemetry(); - }); - process.on('exit', () => { - shutdownTelemetry(); - }); } export async function shutdownTelemetry(): Promise { + if (telemetryShutdownPromise) { + return telemetryShutdownPromise; + } if (!telemetryInitialized || !sdk) { return; } + const currentSdk = sdk; const debugLogger = createDebugLogger('OTEL'); - try { - await sdk.shutdown(); - debugLogger.debug('OpenTelemetry SDK shut down successfully.'); - } catch (error) { - debugLogger.error('Error shutting down SDK:', error); - } finally { - telemetryInitialized = false; - } + telemetryShutdownPromise = (async () => { + try { + await currentSdk.shutdown(); + debugLogger.debug('OpenTelemetry SDK shut down successfully.'); + } catch (error) { + debugLogger.error('Error shutting down SDK:', error); + } finally { + telemetryInitialized = false; + sdk = undefined; + telemetryShutdownPromise = undefined; + } + })(); + return telemetryShutdownPromise; }