From 5d1052a3587c9def221af368bf6a7f3e92cf62b4 Mon Sep 17 00:00:00 2001 From: jinye Date: Fri, 1 May 2026 22:47:01 +0800 Subject: [PATCH] feat(telemetry): define HTTP OTLP endpoint behavior and signal routing (#3779) * feat(telemetry): define HTTP OTLP endpoint behavior and signal routing - Add resolveHttpOtlpUrl() that appends /v1/traces, /v1/logs, /v1/metrics to base HTTP OTLP endpoints per the OpenTelemetry specification - Add per-signal endpoint overrides (otlpTracesEndpoint, otlpLogsEndpoint, otlpMetricsEndpoint) for backends with non-standard paths (e.g. Alibaba Cloud) - Add LogToSpanProcessor that bridges OTel log records to spans for traces-only backends, with session-based traceId correlation and error status propagation - Auto-wire LogToSpanProcessor when traces URL exists but logs URL doesn't - Validate per-signal URLs gracefully (log error + skip, don't crash) - Preserve query strings when appending signal paths to URLs - Guard gRPC branch against missing base endpoint with per-signal config - Update telemetry documentation with signal routing semantics and Alibaba Cloud HTTP per-signal endpoint examples Closes #3734 Co-authored-by: Qwen-Coder * fix(telemetry): fix TS noPropertyAccessFromIndexSignature errors in tests Use typed ExportedSpan interface and bracket notation for index signature properties to satisfy strict TypeScript checks in CI. Co-authored-by: Qwen-Coder * fix(telemetry): replace MD5 with SHA-256 for traceId derivation CodeQL flagged MD5 as a weak cryptographic algorithm when used with session.id (considered sensitive data). Switch to SHA-256 truncated to 32 hex chars to satisfy CodeQL while maintaining the same traceId format required by the OTel specification. Co-authored-by: Qwen-Coder * fix(telemetry): address review feedback for LogToSpanProcessor robustness - Wrap JSON.stringify in try/catch to handle circular refs and BigInt - Add export timeout (30s) and try/catch to prevent hung shutdown - Track in-flight exports to avoid interval-vs-shutdown race condition - Fix deriveSpanStatus: use truthy checks (!!), drop success===false heuristic since declined tool calls are normal, not errors - Enforce http(s) scheme in validateUrl to reject file:/javascript: URLs - Change DiagLogLevel from ERROR to WARN to preserve operational diagnostics - Preserve logRecord.instrumentationScope instead of hardcoding - Forward severityNumber/severityText as span attributes - Add tests for circular refs, error status edge cases, severity Co-authored-by: Qwen-Coder * fix(telemetry): flush sdk shutdown through cleanup Remove async process exit handlers from telemetry initialization and route SDK shutdown through Config cleanup so normal CLI exit paths await pending telemetry exports. Keep shutdown idempotent while an SDK shutdown is in flight. Co-authored-by: Qwen-Coder * fix(telemetry): harden bridged log shutdown Co-authored-by: Qwen-Coder * fix(telemetry): address review follow-ups Co-authored-by: Qwen-Coder --------- Co-authored-by: Qwen-Coder --- docs/developers/development/telemetry.md | 60 ++- packages/cli/src/gemini.test.tsx | 93 +++++ packages/cli/src/gemini.tsx | 53 ++- packages/core/src/config/config.test.ts | 66 +++ packages/core/src/config/config.ts | 40 +- packages/core/src/telemetry/config.test.ts | 54 ++- packages/core/src/telemetry/config.ts | 20 + .../telemetry/log-to-span-processor.test.ts | 386 ++++++++++++++++++ .../src/telemetry/log-to-span-processor.ts | 288 +++++++++++++ packages/core/src/telemetry/sdk.test.ts | 192 ++++++++- packages/core/src/telemetry/sdk.ts | 237 +++++++---- 11 files changed, 1387 insertions(+), 102 deletions(-) create mode 100644 packages/core/src/telemetry/log-to-span-processor.test.ts create mode 100644 packages/core/src/telemetry/log-to-span-processor.ts diff --git a/docs/developers/development/telemetry.md b/docs/developers/development/telemetry.md index 442d34b52..0db11ada1 100644 --- a/docs/developers/development/telemetry.md +++ b/docs/developers/development/telemetry.md @@ -58,20 +58,36 @@ observability framework — Qwen Code's observability system provides: All telemetry behavior is controlled through your `.qwen/settings.json` file. These settings can be overridden by environment variables or CLI flags. -| Setting | Environment Variable | CLI Flag | Description | Values | Default | -| -------------- | ------------------------------ | -------------------------------------------------------- | ------------------------------------------------- | ----------------- | ----------------------- | -| `enabled` | `QWEN_TELEMETRY_ENABLED` | `--telemetry` / `--no-telemetry` | Enable or disable telemetry | `true`/`false` | `false` | -| `target` | `QWEN_TELEMETRY_TARGET` | `--telemetry-target ` | Where to send telemetry data | `"gcp"`/`"local"` | `"local"` | -| `otlpEndpoint` | `QWEN_TELEMETRY_OTLP_ENDPOINT` | `--telemetry-otlp-endpoint ` | OTLP collector endpoint | URL string | `http://localhost:4317` | -| `otlpProtocol` | `QWEN_TELEMETRY_OTLP_PROTOCOL` | `--telemetry-otlp-protocol ` | OTLP transport protocol | `"grpc"`/`"http"` | `"grpc"` | -| `outfile` | `QWEN_TELEMETRY_OUTFILE` | `--telemetry-outfile ` | Save telemetry to file (overrides `otlpEndpoint`) | file path | - | -| `logPrompts` | `QWEN_TELEMETRY_LOG_PROMPTS` | `--telemetry-log-prompts` / `--no-telemetry-log-prompts` | Include prompts in telemetry logs | `true`/`false` | `true` | -| `useCollector` | `QWEN_TELEMETRY_USE_COLLECTOR` | - | Use external OTLP collector (advanced) | `true`/`false` | `false` | +| Setting | Environment Variable | CLI Flag | Description | Values | Default | +| --------------------- | -------------------------------------- | -------------------------------------------------------- | ---------------------------------------------------- | ----------------- | ----------------------- | +| `enabled` | `QWEN_TELEMETRY_ENABLED` | `--telemetry` / `--no-telemetry` | Enable or disable telemetry | `true`/`false` | `false` | +| `target` | `QWEN_TELEMETRY_TARGET` | `--telemetry-target ` | Where to send telemetry data | `"gcp"`/`"local"` | `"local"` | +| `otlpEndpoint` | `QWEN_TELEMETRY_OTLP_ENDPOINT` | `--telemetry-otlp-endpoint ` | OTLP collector endpoint | URL string | `http://localhost:4317` | +| `otlpProtocol` | `QWEN_TELEMETRY_OTLP_PROTOCOL` | `--telemetry-otlp-protocol ` | OTLP transport protocol | `"grpc"`/`"http"` | `"grpc"` | +| `otlpTracesEndpoint` | `QWEN_TELEMETRY_OTLP_TRACES_ENDPOINT` | - | Per-signal endpoint override for traces (HTTP only) | URL string | - | +| `otlpLogsEndpoint` | `QWEN_TELEMETRY_OTLP_LOGS_ENDPOINT` | - | Per-signal endpoint override for logs (HTTP only) | URL string | - | +| `otlpMetricsEndpoint` | `QWEN_TELEMETRY_OTLP_METRICS_ENDPOINT` | - | Per-signal endpoint override for metrics (HTTP only) | URL string | - | +| `outfile` | `QWEN_TELEMETRY_OUTFILE` | `--telemetry-outfile ` | Save telemetry to file (overrides `otlpEndpoint`) | file path | - | +| `logPrompts` | `QWEN_TELEMETRY_LOG_PROMPTS` | `--telemetry-log-prompts` / `--no-telemetry-log-prompts` | Include prompts in telemetry logs | `true`/`false` | `true` | +| `useCollector` | `QWEN_TELEMETRY_USE_COLLECTOR` | - | Use external OTLP collector (advanced) | `true`/`false` | `false` | **Note on boolean environment variables:** For the boolean settings (`enabled`, `logPrompts`, `useCollector`), setting the corresponding environment variable to `true` or `1` will enable the feature. Any other value will disable it. +**HTTP OTLP signal routing:** When using HTTP protocol (`otlpProtocol: "http"`), +Qwen Code automatically appends signal-specific paths (`/v1/traces`, `/v1/logs`, +`/v1/metrics`) to the base `otlpEndpoint`. For example, `http://collector:4318` +becomes `http://collector:4318/v1/traces` for traces. If the URL already ends +with a signal path, it is used as-is. Per-signal endpoint overrides +(`otlpTracesEndpoint`, etc.) take precedence over the base endpoint and are used +verbatim. gRPC protocol uses service-based routing and does not append paths. + +The per-signal endpoint environment variables also accept the standard +OpenTelemetry names: `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`, +`OTEL_EXPORTER_OTLP_LOGS_ENDPOINT`, `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`. +The `QWEN_TELEMETRY_OTLP_*` variants take precedence over the `OTEL_*` variants. + For detailed information about all configuration options, see the [Configuration Guide](./cli/configuration.md). @@ -91,6 +107,9 @@ sent to Alibaba Cloud. 1. Enable telemetry in your `.qwen/settings.json` and set the OTLP endpoint: + + **Option A: gRPC protocol** (standard OTLP endpoint): + ```json { "telemetry": { @@ -101,6 +120,29 @@ sent to Alibaba Cloud. } } ``` + + **Option B: HTTP protocol with per-signal endpoints** (for backends + that use non-standard paths, e.g., `/api/otlp/traces` instead of + `/v1/traces`): + + ```json + { + "telemetry": { + "enabled": true, + "otlpProtocol": "http", + "otlpTracesEndpoint": "http:////api/otlp/traces", + "otlpLogsEndpoint": "http:////api/otlp/logs", + "otlpMetricsEndpoint": "http:////api/otlp/metrics" + } + } + ``` + + > **Note:** When using HTTP protocol with only `otlpEndpoint` (no + > per-signal overrides), Qwen Code appends standard OTLP paths + > (`/v1/traces`, `/v1/logs`, `/v1/metrics`) to the base URL. If your + > backend uses different paths, use per-signal endpoint overrides as + > shown in Option B. + 2. If your Alibaba Cloud endpoint requires authentication, provide OTLP headers through standard OpenTelemetry environment variables such as `OTEL_EXPORTER_OTLP_HEADERS` (or the signal-specific variants). Qwen diff --git a/packages/cli/src/gemini.test.tsx b/packages/cli/src/gemini.test.tsx index 615e99ace..6820cbd61 100644 --- a/packages/cli/src/gemini.test.tsx +++ b/packages/cli/src/gemini.test.tsx @@ -493,11 +493,19 @@ describe('gemini.tsx main function kitty protocol', () => { let setRawModeSpy: MockInstance< (mode: boolean) => NodeJS.ReadStream & { fd: 0 } >; + let initialSigintListeners: NodeJS.SignalsListener[]; + let initialSigtermListeners: NodeJS.SignalsListener[]; beforeEach(() => { // Set no relaunch in tests since process spawning causing issues in tests originalEnvNoRelaunch = process.env['QWEN_CODE_NO_RELAUNCH']; process.env['QWEN_CODE_NO_RELAUNCH'] = 'true'; + initialSigintListeners = process.listeners( + 'SIGINT', + ) as NodeJS.SignalsListener[]; + initialSigtermListeners = process.listeners( + 'SIGTERM', + ) as NodeJS.SignalsListener[]; // eslint-disable-next-line @typescript-eslint/no-explicit-any if (!(process.stdin as any).setRawMode) { @@ -517,12 +525,24 @@ describe('gemini.tsx main function kitty protocol', () => { }); afterEach(() => { + for (const listener of process.listeners('SIGINT')) { + if (!initialSigintListeners.includes(listener)) { + process.removeListener('SIGINT', listener as NodeJS.SignalsListener); + } + } + for (const listener of process.listeners('SIGTERM')) { + if (!initialSigtermListeners.includes(listener)) { + process.removeListener('SIGTERM', listener as NodeJS.SignalsListener); + } + } + // Restore original env variables if (originalEnvNoRelaunch !== undefined) { process.env['QWEN_CODE_NO_RELAUNCH'] = originalEnvNoRelaunch; } else { delete process.env['QWEN_CODE_NO_RELAUNCH']; } + vi.restoreAllMocks(); }); it('should call setRawMode and detectAndEnableKittyProtocol when isInteractive is true', async () => { @@ -618,6 +638,79 @@ describe('gemini.tsx main function kitty protocol', () => { expect(setRawModeSpy).toHaveBeenCalledWith(true); expect(detectAndEnableKittyProtocol).toHaveBeenCalledTimes(1); }); + + it('should run cleanup before exiting on interactive SIGINT', async () => { + const { loadCliConfig, parseArguments } = await import( + './config/config.js' + ); + const { loadSettings } = await import('./config/settings.js'); + const cleanupModule = await import('./utils/cleanup.js'); + const signalHandlers = new Map void>(); + const processOnceSpy = vi.spyOn(process, 'once').mockImplementation((( + eventName: string | symbol, + listener: (...args: unknown[]) => void, + ) => { + if (eventName === 'SIGTERM' || eventName === 'SIGINT') { + signalHandlers.set(eventName, listener); + } + return process; + }) as typeof process.once); + const processExitSpy = vi + .spyOn(process, 'exit') + .mockImplementation((() => undefined) as unknown as typeof process.exit); + const runExitCleanupMock = vi.mocked(cleanupModule.runExitCleanup); + runExitCleanupMock.mockResolvedValue(undefined); + + vi.mocked(loadCliConfig).mockResolvedValue({ + isInteractive: () => true, + getQuestion: () => '', + getSandbox: () => false, + getDebugMode: () => false, + getListExtensions: () => false, + getMcpServers: () => ({}), + initialize: vi.fn(), + getIdeMode: () => false, + getExperimentalZedIntegration: () => false, + getScreenReader: () => false, + getGeminiMdFileCount: () => 0, + getWarnings: () => [], + getModelsConfig: () => ({ + getCurrentAuthType: () => null, + getGenerationConfig: () => ({}), + }), + getProxy: () => undefined, + getUsageStatisticsEnabled: () => true, + getSessionId: () => 'test-session-id', + } as unknown as Config); + vi.mocked(loadSettings).mockReturnValue({ + errors: [], + merged: { + advanced: {}, + security: { auth: {} }, + ui: {}, + }, + setValue: vi.fn(), + forScope: () => ({ settings: {}, originalSettings: {}, path: '' }), + migrationWarnings: [], + getUserHooks: () => undefined, + getProjectHooks: () => undefined, + } as never); + vi.mocked(parseArguments).mockResolvedValue({ + extensions: undefined, + } as never); + + await main(); + signalHandlers.get('SIGINT')?.(); + await Promise.resolve(); + await Promise.resolve(); + + expect(setRawModeSpy).toHaveBeenCalledWith(false); + expect(runExitCleanupMock).toHaveBeenCalledTimes(1); + expect(processExitSpy).toHaveBeenCalledWith(130); + + processOnceSpy.mockRestore(); + processExitSpy.mockRestore(); + }); }); describe('validateDnsResolutionOrder', () => { diff --git a/packages/cli/src/gemini.tsx b/packages/cli/src/gemini.tsx index 1add91580..1020f65e9 100644 --- a/packages/cli/src/gemini.tsx +++ b/packages/cli/src/gemini.tsx @@ -164,6 +164,48 @@ ${reason.stack}` }); } +function getSignalExitCode(signal: NodeJS.Signals): number { + return signal === 'SIGINT' ? 130 : 143; +} + +function installInteractiveSignalHandlers(wasRaw: boolean): () => void { + let cleanupStarted = false; + + const handleSignal = (signal: NodeJS.Signals) => { + if (process.stdin.isTTY) { + process.stdin.setRawMode(wasRaw); + } + + if (cleanupStarted) { + return; + } + cleanupStarted = true; + + void runExitCleanup() + .catch((error) => { + debugLogger.error(`Error during ${signal} cleanup:`, error); + }) + .finally(() => { + process.exit(getSignalExitCode(signal)); + }); + }; + + const handleSigterm = () => { + handleSignal('SIGTERM'); + }; + const handleSigint = () => { + handleSignal('SIGINT'); + }; + + process.once('SIGTERM', handleSigterm); + process.once('SIGINT', handleSigint); + + return () => { + process.removeListener('SIGTERM', handleSigterm); + process.removeListener('SIGINT', handleSigint); + }; +} + export async function startInteractiveUI( config: Config, settings: LoadedSettings, @@ -559,6 +601,9 @@ export async function main() { const wasRaw = process.stdin.isRaw; let kittyProtocolDetectionComplete: Promise | undefined; let themeAutoDetectionComplete: Promise | undefined; + if (config.isInteractive()) { + registerCleanup(installInteractiveSignalHandlers(wasRaw)); + } if (config.isInteractive() && !wasRaw && process.stdin.isTTY) { // Set this as early as possible to avoid spurious characters from // input showing up in the output. @@ -569,14 +614,6 @@ export async function main() { // Ensure the stdin listener is removed on any exit path (error, signal, etc.) registerCleanup(() => stopAndGetCapturedInput()); - // This cleanup isn't strictly needed but may help in certain situations. - process.on('SIGTERM', () => { - process.stdin.setRawMode(wasRaw); - }); - process.on('SIGINT', () => { - process.stdin.setRawMode(wasRaw); - }); - // Detect and enable Kitty keyboard protocol once at startup. kittyProtocolDetectionComplete = detectAndEnableKittyProtocol(); diff --git a/packages/core/src/config/config.test.ts b/packages/core/src/config/config.test.ts index 7ff007ce9..0ee260852 100644 --- a/packages/core/src/config/config.test.ts +++ b/packages/core/src/config/config.test.ts @@ -15,6 +15,8 @@ import { DEFAULT_TELEMETRY_TARGET, DEFAULT_OTLP_ENDPOINT, QwenLogger, + isTelemetrySdkInitialized, + shutdownTelemetry, } from '../telemetry/index.js'; import type { ContentGenerator, @@ -177,6 +179,8 @@ vi.mock('../telemetry/index.js', async (importOriginal) => { return { ...actual, initializeTelemetry: vi.fn(), + isTelemetrySdkInitialized: vi.fn(() => false), + shutdownTelemetry: vi.fn().mockResolvedValue(undefined), uiTelemetryService: { getLastPromptTokenCount: vi.fn(), }, @@ -275,6 +279,7 @@ describe('Server Config (config.ts)', () => { beforeEach(() => { // Reset mocks if necessary vi.clearAllMocks(); + vi.mocked(isTelemetrySdkInitialized).mockReturnValue(false); vi.spyOn(QwenLogger.prototype, 'logStartSessionEvent').mockImplementation( async () => undefined, ); @@ -908,6 +913,32 @@ describe('Server Config (config.ts)', () => { expect(config.getTelemetryEnabled()).toBe(true); }); + it('Config shutdown should flush telemetry when SDK is initialized', async () => { + const paramsWithTelemetry: ConfigParameters = { + ...baseParams, + telemetry: { enabled: true }, + }; + vi.mocked(isTelemetrySdkInitialized).mockReturnValue(true); + const config = new Config(paramsWithTelemetry); + + await config.shutdown(); + + expect(shutdownTelemetry).toHaveBeenCalledTimes(1); + }); + + it('Config shutdown should skip telemetry shutdown before SDK initialization', async () => { + const paramsWithTelemetry: ConfigParameters = { + ...baseParams, + telemetry: { enabled: true }, + }; + vi.mocked(isTelemetrySdkInitialized).mockReturnValue(false); + const config = new Config(paramsWithTelemetry); + + await config.shutdown(); + + expect(shutdownTelemetry).not.toHaveBeenCalled(); + }); + it('Config constructor should set telemetry to false when provided as false', () => { const paramsWithTelemetry: ConfigParameters = { ...baseParams, @@ -1083,6 +1114,41 @@ describe('Server Config (config.ts)', () => { }); }); + describe('Per-Signal OTLP Endpoint Configuration', () => { + it('should return per-signal endpoints when provided', () => { + const params: ConfigParameters = { + ...baseParams, + telemetry: { + enabled: true, + otlpTracesEndpoint: 'http://traces:4318/v1/traces', + otlpLogsEndpoint: 'http://logs:4318/v1/logs', + otlpMetricsEndpoint: 'http://metrics:4318/v1/metrics', + }, + }; + const config = new Config(params); + expect(config.getTelemetryOtlpTracesEndpoint()).toBe( + 'http://traces:4318/v1/traces', + ); + expect(config.getTelemetryOtlpLogsEndpoint()).toBe( + 'http://logs:4318/v1/logs', + ); + expect(config.getTelemetryOtlpMetricsEndpoint()).toBe( + 'http://metrics:4318/v1/metrics', + ); + }); + + it('should return undefined when per-signal endpoints are not provided', () => { + const params: ConfigParameters = { + ...baseParams, + telemetry: { enabled: true }, + }; + const config = new Config(params); + expect(config.getTelemetryOtlpTracesEndpoint()).toBeUndefined(); + expect(config.getTelemetryOtlpLogsEndpoint()).toBeUndefined(); + expect(config.getTelemetryOtlpMetricsEndpoint()).toBeUndefined(); + }); + }); + describe('UseRipgrep Configuration', () => { it('should default useRipgrep to true when not provided', () => { const config = new Config(baseParams); diff --git a/packages/core/src/config/config.ts b/packages/core/src/config/config.ts index 0f80ca112..0da98109e 100644 --- a/packages/core/src/config/config.ts +++ b/packages/core/src/config/config.ts @@ -67,7 +67,9 @@ import { FileReadCache } from '../services/fileReadCache.js'; import { DEFAULT_OTLP_ENDPOINT, DEFAULT_TELEMETRY_TARGET, + isTelemetrySdkInitialized, initializeTelemetry, + shutdownTelemetry, logStartSession, logRipgrepFallback, RipgrepFallbackEvent, @@ -217,6 +219,12 @@ export interface TelemetrySettings { target?: TelemetryTarget; otlpEndpoint?: string; otlpProtocol?: 'grpc' | 'http'; + /** Per-signal endpoint override for traces (HTTP only). Used as-is without path appending. */ + otlpTracesEndpoint?: string; + /** Per-signal endpoint override for logs (HTTP only). Used as-is without path appending. */ + otlpLogsEndpoint?: string; + /** Per-signal endpoint override for metrics (HTTP only). Used as-is without path appending. */ + otlpMetricsEndpoint?: string; logPrompts?: boolean; outfile?: string; useCollector?: boolean; @@ -758,8 +766,11 @@ export class Config { this.telemetrySettings = { enabled: params.telemetry?.enabled ?? false, target: params.telemetry?.target ?? DEFAULT_TELEMETRY_TARGET, - otlpEndpoint: params.telemetry?.otlpEndpoint ?? DEFAULT_OTLP_ENDPOINT, + otlpEndpoint: params.telemetry?.otlpEndpoint, otlpProtocol: params.telemetry?.otlpProtocol, + otlpTracesEndpoint: params.telemetry?.otlpTracesEndpoint, + otlpLogsEndpoint: params.telemetry?.otlpLogsEndpoint, + otlpMetricsEndpoint: params.telemetry?.otlpMetricsEndpoint, logPrompts: params.telemetry?.logPrompts ?? true, outfile: params.telemetry?.outfile, useCollector: params.telemetry?.useCollector, @@ -1636,11 +1647,12 @@ export class Config { * It handles the case where initialization was not completed. */ async shutdown(): Promise { - if (!this.initialized) { - // Nothing to clean up if not initialized - return; - } try { + if (!this.initialized) { + // Nothing else to clean up if not initialized. + return; + } + // Finalize the current session's metadata before cleanup, then drain // the async write queue so no records are lost on exit. try { @@ -1663,6 +1675,10 @@ export class Config { } catch (error) { // Log but don't throw - cleanup should be best-effort this.debugLogger.error('Error during Config shutdown:', error); + } finally { + if (isTelemetrySdkInitialized()) { + await shutdownTelemetry(); + } } } @@ -2003,7 +2019,7 @@ export class Config { return this.telemetrySettings.logPrompts ?? true; } - getTelemetryOtlpEndpoint(): string { + getTelemetryOtlpEndpoint(): string | undefined { return this.telemetrySettings.otlpEndpoint ?? DEFAULT_OTLP_ENDPOINT; } @@ -2011,6 +2027,18 @@ export class Config { return this.telemetrySettings.otlpProtocol ?? 'grpc'; } + getTelemetryOtlpTracesEndpoint(): string | undefined { + return this.telemetrySettings.otlpTracesEndpoint; + } + + getTelemetryOtlpLogsEndpoint(): string | undefined { + return this.telemetrySettings.otlpLogsEndpoint; + } + + getTelemetryOtlpMetricsEndpoint(): string | undefined { + return this.telemetrySettings.otlpMetricsEndpoint; + } + getTelemetryTarget(): TelemetryTarget { return this.telemetrySettings.target ?? DEFAULT_TELEMETRY_TARGET; } diff --git a/packages/core/src/telemetry/config.test.ts b/packages/core/src/telemetry/config.test.ts index 443282fd4..f1c0d3f07 100644 --- a/packages/core/src/telemetry/config.test.ts +++ b/packages/core/src/telemetry/config.test.ts @@ -65,7 +65,12 @@ describe('telemetry/config helpers', () => { useCollector: false, }; const resolved = await resolveTelemetrySettings({ settings }); - expect(resolved).toEqual(settings); + expect(resolved).toEqual({ + ...settings, + otlpTracesEndpoint: undefined, + otlpLogsEndpoint: undefined, + otlpMetricsEndpoint: undefined, + }); }); it('uses env over settings and argv over env', async () => { @@ -102,6 +107,9 @@ describe('telemetry/config helpers', () => { target: TelemetryTarget.GCP, otlpEndpoint: 'http://env:4317', otlpProtocol: 'http', + otlpTracesEndpoint: undefined, + otlpLogsEndpoint: undefined, + otlpMetricsEndpoint: undefined, logPrompts: true, outfile: 'env.log', useCollector: true, @@ -117,6 +125,9 @@ describe('telemetry/config helpers', () => { target: TelemetryTarget.LOCAL, otlpEndpoint: 'http://argv:4317', otlpProtocol: 'grpc', + otlpTracesEndpoint: undefined, + otlpLogsEndpoint: undefined, + otlpMetricsEndpoint: undefined, logPrompts: false, outfile: 'argv.log', useCollector: true, // from env as no argv option @@ -151,5 +162,46 @@ describe('telemetry/config helpers', () => { /Invalid telemetry target/i, ); }); + + it('resolves per-signal endpoints from OTEL_ env vars', async () => { + const env = { + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: 'http://traces:4318/v1/traces', + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: 'http://logs:4318/v1/logs', + } as Record; + + const resolved = await resolveTelemetrySettings({ env }); + expect(resolved.otlpTracesEndpoint).toBe('http://traces:4318/v1/traces'); + expect(resolved.otlpLogsEndpoint).toBe('http://logs:4318/v1/logs'); + expect(resolved.otlpMetricsEndpoint).toBeUndefined(); + }); + + it('QWEN_ env vars take precedence over OTEL_ vars for per-signal endpoints', async () => { + const env = { + QWEN_TELEMETRY_OTLP_TRACES_ENDPOINT: + 'http://qwen-traces:4318/v1/traces', + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: 'http://otel-traces:4318/v1/traces', + } as Record; + + const resolved = await resolveTelemetrySettings({ env }); + expect(resolved.otlpTracesEndpoint).toBe( + 'http://qwen-traces:4318/v1/traces', + ); + }); + + it('resolves per-signal endpoints from settings', async () => { + const settings = { + otlpTracesEndpoint: 'http://traces-settings:4318/v1/traces', + otlpMetricsEndpoint: 'http://metrics-settings:4318/v1/metrics', + }; + + const resolved = await resolveTelemetrySettings({ settings }); + expect(resolved.otlpTracesEndpoint).toBe( + 'http://traces-settings:4318/v1/traces', + ); + expect(resolved.otlpLogsEndpoint).toBeUndefined(); + expect(resolved.otlpMetricsEndpoint).toBe( + 'http://metrics-settings:4318/v1/metrics', + ); + }); }); }); diff --git a/packages/core/src/telemetry/config.ts b/packages/core/src/telemetry/config.ts index f1037e742..267124ed7 100644 --- a/packages/core/src/telemetry/config.ts +++ b/packages/core/src/telemetry/config.ts @@ -106,11 +106,31 @@ export async function resolveTelemetrySettings(options: { parseBooleanEnvFlag(env['QWEN_TELEMETRY_USE_COLLECTOR']) ?? settings.useCollector; + // Per-signal endpoint overrides (HTTP only). + // Priority: QWEN_ env var > standard OTEL_ env var > settings.json + const otlpTracesEndpoint = + env['QWEN_TELEMETRY_OTLP_TRACES_ENDPOINT'] ?? + env['OTEL_EXPORTER_OTLP_TRACES_ENDPOINT'] ?? + settings.otlpTracesEndpoint; + + const otlpLogsEndpoint = + env['QWEN_TELEMETRY_OTLP_LOGS_ENDPOINT'] ?? + env['OTEL_EXPORTER_OTLP_LOGS_ENDPOINT'] ?? + settings.otlpLogsEndpoint; + + const otlpMetricsEndpoint = + env['QWEN_TELEMETRY_OTLP_METRICS_ENDPOINT'] ?? + env['OTEL_EXPORTER_OTLP_METRICS_ENDPOINT'] ?? + settings.otlpMetricsEndpoint; + return { enabled, target, otlpEndpoint, otlpProtocol, + otlpTracesEndpoint, + otlpLogsEndpoint, + otlpMetricsEndpoint, logPrompts, outfile, useCollector, diff --git a/packages/core/src/telemetry/log-to-span-processor.test.ts b/packages/core/src/telemetry/log-to-span-processor.test.ts new file mode 100644 index 000000000..181b8cc0e --- /dev/null +++ b/packages/core/src/telemetry/log-to-span-processor.test.ts @@ -0,0 +1,386 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { SpanKind, SpanStatusCode, type HrTime } from '@opentelemetry/api'; +import { LogToSpanProcessor } from './log-to-span-processor.js'; +import type { ReadableLogRecord } from '@opentelemetry/sdk-logs'; +import type { SpanExporter } from '@opentelemetry/sdk-trace-base'; + +interface ExportedSpan { + name: string; + kind: number; + spanContext: () => { traceId: string; spanId: string; traceFlags: number }; + startTime: HrTime; + endTime: HrTime; + attributes: Record; + status: { code: number; message?: string }; +} + +describe('LogToSpanProcessor', () => { + let processor: LogToSpanProcessor; + let mockExporter: SpanExporter; + let exportedSpans: ExportedSpan[]; + + beforeEach(() => { + exportedSpans = []; + mockExporter = { + export: vi.fn((spans, cb) => { + exportedSpans.push(...spans); + cb({ code: 0 }); + }), + shutdown: vi.fn().mockResolvedValue(undefined), + forceFlush: vi.fn().mockResolvedValue(undefined), + } as unknown as SpanExporter; + processor = new LogToSpanProcessor(mockExporter, 60000); + }); + + afterEach(async () => { + await processor.shutdown(); + }); + + it('converts a log record to a span on flush', async () => { + const logRecord = { + body: 'test event', + hrTime: [1000, 500000000] as [number, number], + attributes: { key1: 'value1', key2: 42, key3: true }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans).toHaveLength(1); + const span = exportedSpans[0]; + expect(span.name).toBe('test event'); + expect(span.kind).toBe(SpanKind.INTERNAL); + expect(span.attributes['key1']).toBe('value1'); + expect(span.attributes['key2']).toBe(42); + expect(span.attributes['key3']).toBe(true); + expect(span.attributes['log.bridge']).toBe(true); + expect(span.startTime).toEqual([1000, 500000000]); + expect(span.endTime).toEqual([1000, 500000000]); + expect(span.status.code).toBe(SpanStatusCode.OK); + }); + + it('uses duration_ms to compute span end time', async () => { + const logRecord = { + body: 'api response', + hrTime: [1000, 0] as [number, number], + attributes: { duration_ms: 250 }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].endTime).toEqual([1000, 250000000]); + }); + + it('ignores non-finite duration_ms values', async () => { + const logRecord = { + body: 'api response', + hrTime: [1000, 0] as [number, number], + attributes: { duration_ms: Infinity }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].endTime).toEqual([1000, 0]); + }); + + it('handles duration_ms that causes second rollover', async () => { + const logRecord = { + body: 'long operation', + hrTime: [1000, 900000000] as [number, number], + attributes: { duration_ms: 500 }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].endTime).toEqual([1001, 400000000]); + }); + + it('serializes object attributes to JSON', async () => { + const logRecord = { + body: 'event with object', + hrTime: [1000, 0] as [number, number], + attributes: { metadata: { nested: true } }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].attributes['metadata']).toBe('{"nested":true}'); + }); + + it('handles unserializable object attributes safely', async () => { + const circular: Record = {}; + circular['self'] = circular; + const logRecord = { + body: 'event', + hrTime: [1000, 0] as [number, number], + attributes: { bad: circular }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].attributes['bad']).toBe('[unserializable]'); + }); + + it('drops sensitive attributes before exporting bridged spans', async () => { + const logRecord = { + body: 'event', + hrTime: [1000, 0] as [number, number], + attributes: { + prompt: 'secret prompt', + function_args: '{"token":"secret"}', + response_text: 'secret response', + safe: 'visible', + }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + const attrs = exportedSpans[0].attributes; + expect(attrs).not.toHaveProperty('prompt'); + expect(attrs).not.toHaveProperty('function_args'); + expect(attrs).not.toHaveProperty('response_text'); + expect(attrs['safe']).toBe('visible'); + expect(attrs['log.bridge']).toBe(true); + }); + + it('skips null and undefined attributes', async () => { + const logRecord = { + body: 'event', + hrTime: [1000, 0] as [number, number], + attributes: { valid: 'yes', nullVal: null, undefinedVal: undefined }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + const attrs = exportedSpans[0].attributes; + expect(attrs['valid']).toBe('yes'); + expect(attrs).not.toHaveProperty('nullVal'); + expect(attrs).not.toHaveProperty('undefinedVal'); + expect(attrs['log.bridge']).toBe(true); + }); + + it('uses "unknown" as span name when body is missing', async () => { + const logRecord = { + body: undefined, + hrTime: [1000, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].name).toBe('unknown'); + }); + + it('truncates long span names', async () => { + const longName = 'x'.repeat(200); + const logRecord = { + body: longName, + hrTime: [1000, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].name).toBe(`${'x'.repeat(128)}...`); + }); + + it('generates unique trace IDs without session.id', async () => { + const logRecord1 = { + body: 'event1', + hrTime: [1000, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord; + const logRecord2 = { + body: 'event2', + hrTime: [1001, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord1); + processor.onEmit(logRecord2); + await processor.forceFlush(); + + const ctx1 = exportedSpans[0].spanContext(); + const ctx2 = exportedSpans[1].spanContext(); + expect(ctx1.traceId).toHaveLength(32); + expect(ctx1.spanId).toHaveLength(16); + expect(ctx1.traceId).not.toBe(ctx2.traceId); + }); + + it('derives same traceId from same session.id', async () => { + const logRecord1 = { + body: 'event1', + hrTime: [1000, 0] as [number, number], + attributes: { 'session.id': 'session-abc' }, + } as unknown as ReadableLogRecord; + const logRecord2 = { + body: 'event2', + hrTime: [1001, 0] as [number, number], + attributes: { 'session.id': 'session-abc' }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord1); + processor.onEmit(logRecord2); + await processor.forceFlush(); + + const ctx1 = exportedSpans[0].spanContext(); + const ctx2 = exportedSpans[1].spanContext(); + expect(ctx1.traceId).toBe(ctx2.traceId); + expect(ctx1.spanId).not.toBe(ctx2.spanId); + }); + + it('derives different traceIds from different session.ids', async () => { + const logRecord1 = { + body: 'event1', + hrTime: [1000, 0] as [number, number], + attributes: { 'session.id': 'session-abc' }, + } as unknown as ReadableLogRecord; + const logRecord2 = { + body: 'event2', + hrTime: [1001, 0] as [number, number], + attributes: { 'session.id': 'session-xyz' }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord1); + processor.onEmit(logRecord2); + await processor.forceFlush(); + + const ctx1 = exportedSpans[0].spanContext(); + const ctx2 = exportedSpans[1].spanContext(); + expect(ctx1.traceId).not.toBe(ctx2.traceId); + }); + + it('sets ERROR status for truthy error attributes', async () => { + const logRecord = { + body: 'api error', + hrTime: [1000, 0] as [number, number], + attributes: { + error_message: 'connection refused', + error_type: 'NETWORK', + }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].status.code).toBe(SpanStatusCode.ERROR); + expect(exportedSpans[0].status.message).toBe('connection refused'); + }); + + it('does not set ERROR for success: false (normal decline)', async () => { + const logRecord = { + body: 'tool call declined', + hrTime: [1000, 0] as [number, number], + attributes: { success: false, function_name: 'bash' }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].status.code).toBe(SpanStatusCode.OK); + }); + + it('does not set ERROR for falsy error attributes', async () => { + const logRecord = { + body: 'ok event', + hrTime: [1000, 0] as [number, number], + attributes: { error: null, error_message: '', error_type: '' }, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].status.code).toBe(SpanStatusCode.OK); + }); + + it('preserves severity attributes', async () => { + const logRecord = { + body: 'event', + hrTime: [1000, 0] as [number, number], + attributes: {}, + severityNumber: 9, + severityText: 'INFO', + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.forceFlush(); + + expect(exportedSpans[0].attributes['log.severity_number']).toBe(9); + expect(exportedSpans[0].attributes['log.severity_text']).toBe('INFO'); + }); + + it('reuses in-flight exports and flushes queued spans afterwards', async () => { + await processor.shutdown(); + exportedSpans = []; + const exportCallbacks: Array<(result: { code: number }) => void> = []; + let exportCallCount = 0; + mockExporter = { + export: vi.fn((spans, cb) => { + exportCallCount += 1; + exportedSpans.push(...spans); + if (exportCallCount === 1) { + exportCallbacks.push(cb); + } else { + cb({ code: 0 }); + } + }), + shutdown: vi.fn().mockResolvedValue(undefined), + forceFlush: vi.fn().mockResolvedValue(undefined), + } as unknown as SpanExporter; + processor = new LogToSpanProcessor(mockExporter, 60000); + + processor.onEmit({ + body: 'first', + hrTime: [1000, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord); + const firstFlush = processor.forceFlush(); + await Promise.resolve(); + + processor.onEmit({ + body: 'second', + hrTime: [1001, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord); + const secondFlush = processor.forceFlush(); + await Promise.resolve(); + + expect(mockExporter.export).toHaveBeenCalledTimes(1); + expect(exportedSpans.map((span) => span.name)).toEqual(['first']); + + exportCallbacks[0]({ code: 0 }); + await Promise.all([firstFlush, secondFlush]); + + expect(mockExporter.export).toHaveBeenCalledTimes(2); + expect(exportedSpans.map((span) => span.name)).toEqual(['first', 'second']); + }); + + it('shutdown flushes remaining spans and shuts down exporter', async () => { + const logRecord = { + body: 'final event', + hrTime: [1000, 0] as [number, number], + attributes: {}, + } as unknown as ReadableLogRecord; + + processor.onEmit(logRecord); + await processor.shutdown(); + + expect(exportedSpans).toHaveLength(1); + expect(mockExporter.shutdown).toHaveBeenCalled(); + }); +}); diff --git a/packages/core/src/telemetry/log-to-span-processor.ts b/packages/core/src/telemetry/log-to-span-processor.ts new file mode 100644 index 000000000..cdf57830b --- /dev/null +++ b/packages/core/src/telemetry/log-to-span-processor.ts @@ -0,0 +1,288 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { SpanKind, SpanStatusCode, type HrTime } from '@opentelemetry/api'; +import type { + LogRecordProcessor, + ReadableLogRecord, +} from '@opentelemetry/sdk-logs'; +import type { SpanExporter, ReadableSpan } from '@opentelemetry/sdk-trace-base'; +import { + type Resource, + resourceFromAttributes, +} from '@opentelemetry/resources'; + +import { createHash } from 'node:crypto'; + +import { SERVICE_NAME } from './constants.js'; + +const EXPORT_TIMEOUT_MS = 30_000; +const MAX_SPAN_NAME_LENGTH = 128; +const SENSITIVE_ATTRIBUTE_KEYS = new Set([ + 'prompt', + 'function_args', + 'response_text', +]); + +/** + * A LogRecordProcessor that converts each OTel log record into a span + * and exports it directly through the provided SpanExporter. + * + * This bridges the gap for backends (e.g., Alibaba Cloud) that support + * traces and metrics but not logs over OTLP. Instead of going through + * the global TracerProvider (which can break in bundled environments), + * this processor directly constructs ReadableSpan objects and feeds + * them to the exporter. + * + * When a log record has a `duration_ms` attribute, the resulting span + * will have a matching duration. Otherwise, the span is instantaneous. + */ +export class LogToSpanProcessor implements LogRecordProcessor { + private buffer: ReadableSpanLike[] = []; + private flushTimer: ReturnType | undefined; + private inFlightExport: Promise | undefined; + private readonly flushIntervalMs: number; + + constructor( + private readonly spanExporter: SpanExporter, + flushIntervalMs = 5000, + ) { + this.flushIntervalMs = flushIntervalMs; + this.flushTimer = setInterval(() => { + void this.flush(); + }, this.flushIntervalMs); + this.flushTimer.unref(); + } + + onEmit(logRecord: ReadableLogRecord): void { + const name = sanitizeSpanName(logRecord.body); + const startTime = logRecord.hrTime; + + const attributes: Record = {}; + if (logRecord.attributes) { + for (const [key, value] of Object.entries(logRecord.attributes)) { + if ( + value !== undefined && + value !== null && + !SENSITIVE_ATTRIBUTE_KEYS.has(key) + ) { + attributes[key] = + typeof value === 'object' + ? safeStringify(value) + : (value as string | number | boolean); + } + } + } + attributes['log.bridge'] = true; + + // Preserve severity so downstream queries can filter by log level. + if (logRecord.severityNumber !== undefined) { + attributes['log.severity_number'] = logRecord.severityNumber; + } + if (logRecord.severityText) { + attributes['log.severity_text'] = logRecord.severityText; + } + + let endTime = startTime; + const durationMs = logRecord.attributes?.['duration_ms']; + if ( + typeof durationMs === 'number' && + Number.isFinite(durationMs) && + durationMs > 0 + ) { + const [secs, nanos] = startTime; + const durationNanos = durationMs * 1_000_000; + const endNanos = nanos + durationNanos; + endTime = [secs + Math.floor(endNanos / 1e9), endNanos % 1e9] as HrTime; + } + + // Derive traceId from session.id so all events in one session + // appear under a single trace. spanId is random per event. + const sessionId = logRecord.attributes?.['session.id']; + const traceId = sessionId + ? deriveTraceId(String(sessionId)) + : randomHexString(32); + const spanId = randomHexString(16); + + this.buffer.push({ + name, + kind: SpanKind.INTERNAL, + spanContext: () => ({ + traceId, + spanId, + traceFlags: 1, // SAMPLED + }), + startTime, + endTime, + duration: hrTimeDiff(startTime, endTime), + attributes, + status: deriveSpanStatus(logRecord.attributes), + events: [], + links: [], + resource: logRecord.resource ?? resourceFromAttributes({}), + instrumentationScope: logRecord.instrumentationScope ?? { + name: SERVICE_NAME, + version: '', + }, + ended: true, + parentSpanContext: undefined, + droppedAttributesCount: 0, + droppedEventsCount: 0, + droppedLinksCount: 0, + recordException: () => {}, + }); + } + + private flush(): Promise { + if (this.inFlightExport) return this.inFlightExport; + if (this.buffer.length === 0) return Promise.resolve(); + const spans = this.buffer.splice(0); + const exportPromise = new Promise((resolve) => { + const timeout = setTimeout(() => { + process.stderr.write( + `[LogToSpan] export timeout after ${EXPORT_TIMEOUT_MS}ms\n`, + ); + resolve(); + }, EXPORT_TIMEOUT_MS); + timeout.unref(); + + try { + this.spanExporter.export( + spans as unknown as ReadableSpan[], + (result) => { + clearTimeout(timeout); + if (result.code !== 0) { + process.stderr.write( + `[LogToSpan] export failed: code=${result.code} error=${result.error?.message ?? 'unknown'}\n`, + ); + } + resolve(); + }, + ); + } catch (err) { + clearTimeout(timeout); + process.stderr.write( + `[LogToSpan] export threw: ${err instanceof Error ? err.message : String(err)}\n`, + ); + resolve(); + } + }); + this.inFlightExport = exportPromise.finally(() => { + this.inFlightExport = undefined; + }); + return this.inFlightExport; + } + + async shutdown(): Promise { + if (this.flushTimer) { + clearInterval(this.flushTimer); + this.flushTimer = undefined; + } + // Wait for any in-flight interval-triggered export before final flush. + if (this.inFlightExport) { + await this.inFlightExport; + } + await this.flush(); + await this.spanExporter.shutdown(); + } + + async forceFlush(): Promise { + if (this.inFlightExport) { + await this.inFlightExport; + } + await this.flush(); + await this.spanExporter.forceFlush?.(); + } +} + +interface ReadableSpanLike { + name: string; + kind: SpanKind; + spanContext: () => { traceId: string; spanId: string; traceFlags: number }; + startTime: HrTime; + endTime: HrTime; + duration: HrTime; + attributes: Record; + status: { code: SpanStatusCode; message?: string }; + events: never[]; + links: never[]; + resource: Resource; + instrumentationScope: { name: string; version?: string; schemaUrl?: string }; + ended: boolean; + parentSpanContext?: { traceId: string; spanId: string; traceFlags: number }; + droppedAttributesCount: number; + droppedEventsCount: number; + droppedLinksCount: number; + recordException: () => void; +} + +function sanitizeSpanName(body: unknown): string { + const rawName = String(body ?? 'unknown'); + return rawName.length > MAX_SPAN_NAME_LENGTH + ? `${rawName.slice(0, MAX_SPAN_NAME_LENGTH)}...` + : rawName; +} + +/** + * Safely stringify an object value for use as a span attribute. + * Returns a bounded fallback when JSON serialization fails, such as for + * circular references or BigInt values. + */ +function safeStringify(value: unknown): string { + try { + return JSON.stringify(value); + } catch { + return '[unserializable]'; + } +} + +function randomHexString(length: number): string { + const bytes = new Uint8Array(length / 2); + crypto.getRandomValues(bytes); + return Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join(''); +} + +/** + * Derive a deterministic 32-char hex traceId from a session ID. + * All events in the same session will share this traceId, + * making them appear under a single trace in the backend. + * Uses SHA-256 truncated to 32 hex chars (128 bits) to match the + * OTel trace ID format. + */ +function deriveTraceId(sessionId: string): string { + return createHash('sha256').update(sessionId).digest('hex').slice(0, 32); +} + +/** + * Derive span status from log record attributes. + * Marks the span as ERROR when explicit error indicators are present + * (truthy `error`, `error_message`, or `error_type` attributes). + * Does NOT treat `success: false` as an error — declined/cancelled + * operations are a normal outcome, not failures. + */ +function deriveSpanStatus(attrs: Record | undefined): { + code: SpanStatusCode; + message?: string; +} { + if (!attrs) return { code: SpanStatusCode.OK }; + if (!!attrs['error'] || !!attrs['error_message'] || !!attrs['error_type']) { + const msg = String( + attrs['error_message'] ?? attrs['error'] ?? attrs['error_type'] ?? '', + ); + return { code: SpanStatusCode.ERROR, ...(msg && { message: msg }) }; + } + return { code: SpanStatusCode.OK }; +} + +function hrTimeDiff(start: HrTime, end: HrTime): HrTime { + let secs = end[0] - start[0]; + let nanos = end[1] - start[1]; + if (nanos < 0) { + secs -= 1; + nanos += 1e9; + } + return [secs, nanos] as HrTime; +} diff --git a/packages/core/src/telemetry/sdk.test.ts b/packages/core/src/telemetry/sdk.test.ts index 9274631db..f88e9b9aa 100644 --- a/packages/core/src/telemetry/sdk.test.ts +++ b/packages/core/src/telemetry/sdk.test.ts @@ -5,8 +5,14 @@ */ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { diag } from '@opentelemetry/api'; import type { Config } from '../config/config.js'; -import { initializeTelemetry, shutdownTelemetry } from './sdk.js'; +import { + initializeTelemetry, + isTelemetrySdkInitialized, + shutdownTelemetry, + resolveHttpOtlpUrl, +} from './sdk.js'; import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc'; import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-grpc'; import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-grpc'; @@ -27,6 +33,68 @@ vi.mock('@opentelemetry/exporter-logs-otlp-http'); vi.mock('@opentelemetry/exporter-metrics-otlp-http'); vi.mock('@opentelemetry/sdk-node'); vi.mock('./gcp-exporters.js'); +vi.mock('./log-to-span-processor.js'); + +import { LogToSpanProcessor } from './log-to-span-processor.js'; + +describe('resolveHttpOtlpUrl', () => { + it('appends signal path to base collector URL', () => { + expect(resolveHttpOtlpUrl('http://collector:4318', 'traces')).toBe( + 'http://collector:4318/v1/traces', + ); + expect(resolveHttpOtlpUrl('http://collector:4318', 'logs')).toBe( + 'http://collector:4318/v1/logs', + ); + expect(resolveHttpOtlpUrl('http://collector:4318', 'metrics')).toBe( + 'http://collector:4318/v1/metrics', + ); + }); + + it('handles trailing slash in base URL', () => { + expect(resolveHttpOtlpUrl('http://collector:4318/', 'traces')).toBe( + 'http://collector:4318/v1/traces', + ); + expect(resolveHttpOtlpUrl('http://collector:4318/', 'logs')).toBe( + 'http://collector:4318/v1/logs', + ); + }); + + it('preserves explicit full signal path URL', () => { + expect( + resolveHttpOtlpUrl('http://collector:4318/v1/traces', 'traces'), + ).toBe('http://collector:4318/v1/traces'); + expect(resolveHttpOtlpUrl('http://collector:4318/v1/logs', 'logs')).toBe( + 'http://collector:4318/v1/logs', + ); + expect( + resolveHttpOtlpUrl('http://collector:4318/v1/metrics', 'metrics'), + ).toBe('http://collector:4318/v1/metrics'); + }); + + it('appends signal path when URL has a non-signal custom path', () => { + expect( + resolveHttpOtlpUrl('http://collector:4318/custom/prefix', 'traces'), + ).toBe('http://collector:4318/custom/prefix/v1/traces'); + }); + + it('handles HTTPS URLs', () => { + expect(resolveHttpOtlpUrl('https://otel.example.com', 'logs')).toBe( + 'https://otel.example.com/v1/logs', + ); + expect(resolveHttpOtlpUrl('https://otel.example.com:4318', 'metrics')).toBe( + 'https://otel.example.com:4318/v1/metrics', + ); + }); + + it('preserves query strings when appending signal paths', () => { + expect(resolveHttpOtlpUrl('https://host/otlp?token=abc', 'traces')).toBe( + 'https://host/otlp/v1/traces?token=abc', + ); + expect( + resolveHttpOtlpUrl('https://host/otlp?token=abc&foo=bar', 'logs'), + ).toBe('https://host/otlp/v1/logs?token=abc&foo=bar'); + }); +}); describe('Telemetry SDK', () => { let mockConfig: Config; @@ -37,6 +105,9 @@ describe('Telemetry SDK', () => { getTelemetryEnabled: () => true, getTelemetryOtlpEndpoint: () => 'http://localhost:4317', getTelemetryOtlpProtocol: () => 'grpc', + getTelemetryOtlpTracesEndpoint: () => undefined, + getTelemetryOtlpLogsEndpoint: () => undefined, + getTelemetryOtlpMetricsEndpoint: () => undefined, getTelemetryTarget: () => 'local', getTelemetryUseCollector: () => false, getTelemetryOutfile: () => undefined, @@ -67,7 +138,7 @@ describe('Telemetry SDK', () => { expect(NodeSDK.prototype.start).toHaveBeenCalled(); }); - it('should use HTTP exporters when protocol is http', () => { + it('should use HTTP exporters with signal-specific paths when protocol is http', () => { vi.spyOn(mockConfig, 'getTelemetryEnabled').mockReturnValue(true); vi.spyOn(mockConfig, 'getTelemetryOtlpProtocol').mockReturnValue('http'); vi.spyOn(mockConfig, 'getTelemetryOtlpEndpoint').mockReturnValue( @@ -77,13 +148,13 @@ describe('Telemetry SDK', () => { initializeTelemetry(mockConfig); expect(OTLPTraceExporterHttp).toHaveBeenCalledWith({ - url: 'http://localhost:4318/', + url: 'http://localhost:4318/v1/traces', }); expect(OTLPLogExporterHttp).toHaveBeenCalledWith({ - url: 'http://localhost:4318/', + url: 'http://localhost:4318/v1/logs', }); expect(OTLPMetricExporterHttp).toHaveBeenCalledWith({ - url: 'http://localhost:4318/', + url: 'http://localhost:4318/v1/metrics', }); expect(NodeSDK.prototype.start).toHaveBeenCalled(); }); @@ -98,15 +169,92 @@ describe('Telemetry SDK', () => { ); }); - it('should parse HTTP endpoint correctly', () => { + it('should append signal paths to HTTP endpoint', () => { vi.spyOn(mockConfig, 'getTelemetryOtlpProtocol').mockReturnValue('http'); vi.spyOn(mockConfig, 'getTelemetryOtlpEndpoint').mockReturnValue( 'https://my-collector.com', ); initializeTelemetry(mockConfig); expect(OTLPTraceExporterHttp).toHaveBeenCalledWith( - expect.objectContaining({ url: 'https://my-collector.com/' }), + expect.objectContaining({ url: 'https://my-collector.com/v1/traces' }), ); + expect(OTLPLogExporterHttp).toHaveBeenCalledWith( + expect.objectContaining({ url: 'https://my-collector.com/v1/logs' }), + ); + expect(OTLPMetricExporterHttp).toHaveBeenCalledWith( + expect.objectContaining({ url: 'https://my-collector.com/v1/metrics' }), + ); + }); + + it('should use per-signal endpoint overrides when provided', () => { + vi.spyOn(mockConfig, 'getTelemetryOtlpProtocol').mockReturnValue('http'); + vi.spyOn(mockConfig, 'getTelemetryOtlpEndpoint').mockReturnValue( + 'http://default-collector:4318', + ); + vi.spyOn(mockConfig, 'getTelemetryOtlpTracesEndpoint').mockReturnValue( + 'http://traces-collector:4318/v1/traces', + ); + + initializeTelemetry(mockConfig); + + // Traces uses the per-signal override + expect(OTLPTraceExporterHttp).toHaveBeenCalledWith({ + url: 'http://traces-collector:4318/v1/traces', + }); + // Logs and metrics use the base endpoint with paths appended + expect(OTLPLogExporterHttp).toHaveBeenCalledWith({ + url: 'http://default-collector:4318/v1/logs', + }); + expect(OTLPMetricExporterHttp).toHaveBeenCalledWith({ + url: 'http://default-collector:4318/v1/metrics', + }); + }); + + it('should use per-signal overrides without base endpoint', () => { + vi.spyOn(mockConfig, 'getTelemetryOtlpProtocol').mockReturnValue('http'); + vi.spyOn(mockConfig, 'getTelemetryOtlpEndpoint').mockReturnValue(''); + vi.spyOn(mockConfig, 'getTelemetryOtlpTracesEndpoint').mockReturnValue( + 'http://traces-host/token/api/otlp/traces', + ); + vi.spyOn(mockConfig, 'getTelemetryOtlpMetricsEndpoint').mockReturnValue( + 'http://metrics-host/token/api/otlp/metrics', + ); + // logs has no override and no base endpoint + + initializeTelemetry(mockConfig); + + // Traces and metrics use per-signal override + expect(OTLPTraceExporterHttp).toHaveBeenCalledWith({ + url: 'http://traces-host/token/api/otlp/traces', + }); + expect(OTLPMetricExporterHttp).toHaveBeenCalledWith({ + url: 'http://metrics-host/token/api/otlp/metrics', + }); + // Logs falls back to LogToSpanProcessor (bridges logs → spans) + expect(OTLPLogExporterHttp).not.toHaveBeenCalled(); + expect(LogToSpanProcessor).toHaveBeenCalled(); + expect(NodeSDK.prototype.start).toHaveBeenCalled(); + }); + + it('should warn and skip startup for gRPC per-signal endpoints without base endpoint', () => { + const diagWarnSpy = vi.spyOn(diag, 'warn').mockImplementation(() => {}); + try { + vi.spyOn(mockConfig, 'getTelemetryOtlpProtocol').mockReturnValue('grpc'); + vi.spyOn(mockConfig, 'getTelemetryOtlpEndpoint').mockReturnValue(''); + vi.spyOn(mockConfig, 'getTelemetryOtlpTracesEndpoint').mockReturnValue( + 'http://traces-host/token/api/otlp/traces', + ); + + initializeTelemetry(mockConfig); + + expect(diagWarnSpy).toHaveBeenCalledWith( + expect.stringContaining('Telemetry SDK startup was skipped'), + ); + expect(NodeSDK.prototype.start).not.toHaveBeenCalled(); + expect(isTelemetrySdkInitialized()).toBe(false); + } finally { + diagWarnSpy.mockRestore(); + } }); it('should use OTLP exporters when target is gcp but useCollector is true', () => { @@ -145,4 +293,34 @@ describe('Telemetry SDK', () => { expect(OTLPMetricExporterHttp).not.toHaveBeenCalled(); expect(NodeSDK.prototype.start).toHaveBeenCalled(); }); + + it('should not register async process shutdown handlers', () => { + const processOnSpy = vi.spyOn(process, 'on'); + try { + initializeTelemetry(mockConfig); + + expect(processOnSpy).not.toHaveBeenCalledWith( + 'SIGTERM', + expect.any(Function), + ); + expect(processOnSpy).not.toHaveBeenCalledWith( + 'SIGINT', + expect.any(Function), + ); + expect(processOnSpy).not.toHaveBeenCalledWith( + 'exit', + expect.any(Function), + ); + } finally { + processOnSpy.mockRestore(); + } + }); + + it('should mark telemetry uninitialized after shutdown', async () => { + initializeTelemetry(mockConfig); + + await shutdownTelemetry(); + + expect(isTelemetrySdkInitialized()).toBe(false); + }); }); diff --git a/packages/core/src/telemetry/sdk.ts b/packages/core/src/telemetry/sdk.ts index 3dba2acc4..52bdb6712 100644 --- a/packages/core/src/telemetry/sdk.ts +++ b/packages/core/src/telemetry/sdk.ts @@ -15,18 +15,9 @@ import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base'; import { NodeSDK } from '@opentelemetry/sdk-node'; import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'; import { resourceFromAttributes } from '@opentelemetry/resources'; -import { - BatchSpanProcessor, - ConsoleSpanExporter, -} from '@opentelemetry/sdk-trace-node'; -import { - BatchLogRecordProcessor, - ConsoleLogRecordExporter, -} from '@opentelemetry/sdk-logs'; -import { - ConsoleMetricExporter, - PeriodicExportingMetricReader, -} from '@opentelemetry/sdk-metrics'; +import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-node'; +import { BatchLogRecordProcessor } from '@opentelemetry/sdk-logs'; +import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'; import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'; import type { Config } from '../config/config.js'; import { SERVICE_NAME } from './constants.js'; @@ -37,12 +28,48 @@ import { FileSpanExporter, } from './file-exporters.js'; import { createDebugLogger } from '../utils/debugLogger.js'; +import { LogToSpanProcessor } from './log-to-span-processor.js'; // For troubleshooting, set the log level to DiagLogLevel.DEBUG -diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.INFO); +diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.WARN); + +/** + * Standard OTLP HTTP signal-specific paths per the OpenTelemetry specification. + * gRPC uses service-based routing so no path appending is needed. + */ +const OTLP_SIGNAL_PATHS = { + traces: 'v1/traces', + logs: 'v1/logs', + metrics: 'v1/metrics', +} as const; + +type OtlpSignal = keyof typeof OTLP_SIGNAL_PATHS; + +/** + * Resolve the final URL for an HTTP OTLP exporter. + * + * - If the URL path already ends with the signal-specific path (e.g., /v1/traces), + * use it as-is. This supports explicit full-path configuration. + * - Otherwise, append the signal-specific path to the base URL. + */ +export function resolveHttpOtlpUrl( + baseEndpoint: string, + signal: OtlpSignal, +): string { + const signalPath = OTLP_SIGNAL_PATHS[signal]; + const url = new URL(baseEndpoint); + const normalizedPath = url.pathname.replace(/\/+$/, ''); + if (normalizedPath.endsWith(signalPath)) { + return url.href; + } + // Append the signal path to the URL pathname, preserving query/hash. + url.pathname = normalizedPath + '/' + signalPath; + return url.href; +} let sdk: NodeSDK | undefined; let telemetryInitialized = false; +let telemetryShutdownPromise: Promise | undefined; export function isTelemetrySdkInitialized(): boolean { return telemetryInitialized; @@ -73,6 +100,31 @@ function parseOtlpEndpoint( } } +/** + * Validate a URL string. Returns the URL if valid http(s), undefined otherwise. + * Logs an error for invalid URLs instead of throwing. + */ +function validateUrl(url: string | undefined): string | undefined { + if (!url) return undefined; + try { + const parsed = new URL(url); + if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') { + diag.error( + `OTLP endpoint must use http or https, got ${parsed.protocol}`, + ); + return undefined; + } + if (!parsed.hostname) { + diag.error('OTLP endpoint missing hostname'); + return undefined; + } + return url; + } catch { + diag.error('Invalid OTLP signal endpoint URL, skipping:', url); + return undefined; + } +} + export function initializeTelemetry(config: Config): void { if (telemetryInitialized || !config.getTelemetryEnabled()) { return; @@ -89,51 +141,97 @@ export function initializeTelemetry(config: Config): void { const otlpProtocol = config.getTelemetryOtlpProtocol(); const parsedEndpoint = parseOtlpEndpoint(otlpEndpoint, otlpProtocol); const telemetryOutfile = config.getTelemetryOutfile(); - const useOtlp = !!parsedEndpoint && !telemetryOutfile; + const hasPerSignalEndpoint = + !!config.getTelemetryOtlpTracesEndpoint() || + !!config.getTelemetryOtlpLogsEndpoint() || + !!config.getTelemetryOtlpMetricsEndpoint(); + const useOtlp = + (!!parsedEndpoint || hasPerSignalEndpoint) && !telemetryOutfile; let spanExporter: | OTLPTraceExporter | OTLPTraceExporterHttp | FileSpanExporter - | ConsoleSpanExporter; + | undefined; let logExporter: | OTLPLogExporter | OTLPLogExporterHttp | FileLogExporter - | ConsoleLogRecordExporter; - let metricReader: PeriodicExportingMetricReader; + | undefined; + let metricReader: PeriodicExportingMetricReader | undefined; + let logToSpanProcessor: LogToSpanProcessor | undefined; if (useOtlp) { if (otlpProtocol === 'http') { - spanExporter = new OTLPTraceExporterHttp({ - url: parsedEndpoint, - }); - logExporter = new OTLPLogExporterHttp({ - url: parsedEndpoint, - }); - metricReader = new PeriodicExportingMetricReader({ - exporter: new OTLPMetricExporterHttp({ - url: parsedEndpoint, - }), - exportIntervalMillis: 10000, - }); + const tracesUrl = validateUrl( + config.getTelemetryOtlpTracesEndpoint() ?? + (parsedEndpoint + ? resolveHttpOtlpUrl(parsedEndpoint, 'traces') + : undefined), + ); + const logsUrl = validateUrl( + config.getTelemetryOtlpLogsEndpoint() ?? + (parsedEndpoint + ? resolveHttpOtlpUrl(parsedEndpoint, 'logs') + : undefined), + ); + const metricsUrl = validateUrl( + config.getTelemetryOtlpMetricsEndpoint() ?? + (parsedEndpoint + ? resolveHttpOtlpUrl(parsedEndpoint, 'metrics') + : undefined), + ); + + debugLogger.debug( + `OTLP HTTP endpoints: traces=${tracesUrl ?? 'none'}, logs=${logsUrl ?? 'none'}, metrics=${metricsUrl ?? 'none'}`, + ); + + if (tracesUrl) { + spanExporter = new OTLPTraceExporterHttp({ url: tracesUrl }); + } + if (logsUrl) { + logExporter = new OTLPLogExporterHttp({ url: logsUrl }); + } else if (tracesUrl) { + // Bridge: no logs endpoint but traces endpoint exists. + // Convert log records to spans. Use a dedicated trace exporter so the + // bridge owns its own forceFlush/shutdown lifecycle. + logToSpanProcessor = new LogToSpanProcessor( + new OTLPTraceExporterHttp({ url: tracesUrl }), + ); + } + if (metricsUrl) { + metricReader = new PeriodicExportingMetricReader({ + exporter: new OTLPMetricExporterHttp({ url: metricsUrl }), + exportIntervalMillis: 10000, + }); + } } else { - // grpc - spanExporter = new OTLPTraceExporter({ - url: parsedEndpoint, - compression: CompressionAlgorithm.GZIP, - }); - logExporter = new OTLPLogExporter({ - url: parsedEndpoint, - compression: CompressionAlgorithm.GZIP, - }); - metricReader = new PeriodicExportingMetricReader({ - exporter: new OTLPMetricExporter({ + // grpc — per-signal endpoints are not supported with gRPC protocol. + if (!parsedEndpoint) { + const warning = + 'Per-signal OTLP endpoints are only supported with HTTP protocol. ' + + 'Set otlpProtocol to "http" or provide a base otlpEndpoint for gRPC. ' + + 'Telemetry SDK startup was skipped because no supported gRPC endpoint was configured.'; + diag.warn(warning); + debugLogger.warn(warning); + return; + } else { + spanExporter = new OTLPTraceExporter({ url: parsedEndpoint, compression: CompressionAlgorithm.GZIP, - }), - exportIntervalMillis: 10000, - }); + }); + logExporter = new OTLPLogExporter({ + url: parsedEndpoint, + compression: CompressionAlgorithm.GZIP, + }); + metricReader = new PeriodicExportingMetricReader({ + exporter: new OTLPMetricExporter({ + url: parsedEndpoint, + compression: CompressionAlgorithm.GZIP, + }), + exportIntervalMillis: 10000, + }); + } } } else if (telemetryOutfile) { spanExporter = new FileSpanExporter(telemetryOutfile); @@ -142,20 +240,18 @@ export function initializeTelemetry(config: Config): void { exporter: new FileMetricExporter(telemetryOutfile), exportIntervalMillis: 10000, }); - } else { - spanExporter = new ConsoleSpanExporter(); - logExporter = new ConsoleLogRecordExporter(); - metricReader = new PeriodicExportingMetricReader({ - exporter: new ConsoleMetricExporter(), - exportIntervalMillis: 10000, - }); } + // If no exporter is configured for a signal, it is silently skipped. sdk = new NodeSDK({ resource, - spanProcessors: [new BatchSpanProcessor(spanExporter)], - logRecordProcessors: [new BatchLogRecordProcessor(logExporter)], - metricReader, + spanProcessors: spanExporter ? [new BatchSpanProcessor(spanExporter)] : [], + logRecordProcessors: logExporter + ? [new BatchLogRecordProcessor(logExporter)] + : logToSpanProcessor + ? [logToSpanProcessor] + : [], + ...(metricReader && { metricReader }), instrumentations: [new HttpInstrumentation()], }); @@ -167,29 +263,28 @@ export function initializeTelemetry(config: Config): void { } catch (error) { debugLogger.error('Error starting OpenTelemetry SDK:', error); } - - process.on('SIGTERM', () => { - shutdownTelemetry(); - }); - process.on('SIGINT', () => { - shutdownTelemetry(); - }); - process.on('exit', () => { - shutdownTelemetry(); - }); } export async function shutdownTelemetry(): Promise { + if (telemetryShutdownPromise) { + return telemetryShutdownPromise; + } if (!telemetryInitialized || !sdk) { return; } + const currentSdk = sdk; const debugLogger = createDebugLogger('OTEL'); - try { - await sdk.shutdown(); - debugLogger.debug('OpenTelemetry SDK shut down successfully.'); - } catch (error) { - debugLogger.error('Error shutting down SDK:', error); - } finally { - telemetryInitialized = false; - } + telemetryShutdownPromise = (async () => { + try { + await currentSdk.shutdown(); + debugLogger.debug('OpenTelemetry SDK shut down successfully.'); + } catch (error) { + debugLogger.error('Error shutting down SDK:', error); + } finally { + telemetryInitialized = false; + sdk = undefined; + telemetryShutdownPromise = undefined; + } + })(); + return telemetryShutdownPromise; }