Data Streams SDK (TypeScript)

The Data Streams SDK provides access to Chainlink Data Streams, with real-time streaming and historical data retrieval.

Requirements

  • Node.js >= 20.0.0
  • TypeScript >= 5.3.x
  • Valid Chainlink Data Streams credentials

Features

Installation

npm install @chainlink/data-streams-sdk

Configuration

Configuration Interface

interface Config {
  // Required
  apiKey: string // API key for authentication
  userSecret: string // User secret for authentication
  endpoint: string // REST API URL
  wsEndpoint: string // WebSocket URL

  // Optional - Request & Retry
  timeout?: number // Request timeout (default: 30000ms)
  retryAttempts?: number // Retry attempts (default: 3)
  retryDelay?: number // Retry delay (default: 1000ms)

  // Optional - High Availability
  haMode?: boolean // Enable HA mode (default: false)
  haConnectionTimeout?: number // HA connection timeout (default: 10000ms)
  connectionStatusCallback?: (isConnected: boolean, host: string, origin: string) => void

  // Optional - Logging
  logging?: LoggingConfig // See Logging Configuration section
}

Basic Usage

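import { createClient } from "@chainlink/data-streams-sdk"

// Credentials are read from the environment; both values must be set.
const client = createClient({
  apiKey: process.env.API_KEY,
  userSecret: process.env.USER_SECRET,
  endpoint: "https://api.dataengine.chain.link",
  wsEndpoint: "wss://ws.dataengine.chain.link",
})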
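Because environment values can be missing at runtime while the required Config fields are plain strings, it can help to fail fast before creating the client. The following is a minimal sketch, not part of the SDK: `requireEnv`, `settingsFromEnv`, and the `DS_ENDPOINT` / `DS_WS_ENDPOINT` variable names are hypothetical, and the fallback URLs are simply the ones used in the examples on this page.

```typescript
// Hypothetical helper, not part of the SDK: collects the four required
// Config fields from an environment-like object and throws if a credential is missing.
type Env = Record<string, string | undefined>

interface RequiredSettings {
  apiKey: string
  userSecret: string
  endpoint: string
  wsEndpoint: string
}

function requireEnv(env: Env, name: string): string {
  const value = env[name]
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`)
  }
  return value
}

function settingsFromEnv(env: Env): RequiredSettings {
  return {
    apiKey: requireEnv(env, "API_KEY"),
    userSecret: requireEnv(env, "USER_SECRET"),
    // DS_ENDPOINT and DS_WS_ENDPOINT are made-up names for this sketch.
    endpoint: env["DS_ENDPOINT"] ?? "https://api.dataengine.chain.link",
    wsEndpoint: env["DS_WS_ENDPOINT"] ?? "wss://ws.dataengine.chain.link",
  }
}
```

The returned object could then be passed to createClient, for example `createClient(settingsFromEnv(process.env))`.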

High Availability Example

const haClient = createClient({
  apiKey: process.env.API_KEY,
  userSecret: process.env.USER_SECRET,
  endpoint: "https://api.dataengine.chain.link", // Mainnet only
  wsEndpoint: "wss://ws.dataengine.chain.link", // Single endpoint with origin discovery
  haMode: true,
})

Note: High Availability mode is only available on mainnet, not testnet.

Examples

Quick Commands:

# Real-time streaming
npx ts-node examples/stream-reports.ts 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782

# High Availability streaming
npx ts-node examples/stream-reports.ts 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782 --ha

# Get latest report
npx ts-node examples/get-latest-report.ts 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782

# List all available feeds
npx ts-node examples/list-feeds.ts

Complete examples: See the SDK repo examples for detailed usage and setup. Available examples include:

  • Streaming: Basic streaming, HA mode, metrics monitoring
  • REST API: Latest reports, historical data, bulk operations, feed management
  • Configuration: Logging setup, debugging, monitoring integration

API Reference

Streaming

// Create stream
const stream = client.createStream(feedIds, options?);

// Events
stream.on('report', (report) => { ... });
stream.on('error', (error) => { ... });
stream.on('disconnected', () => { ... });
stream.on('reconnecting', (info) => { ... });

// Control
await stream.connect();
await stream.close();

// Metrics
const metrics = stream.getMetrics();

Stream Options

interface StreamOptions {
  maxReconnectAttempts?: number // Default: 5
  // Base delay (in ms) for exponential backoff.
  // Actual delay grows as: base * 2^(attempt-1) with jitter, capped at 10000ms.
  // Default: 1000ms; user-provided values are clamped to the safe range [200ms, 10000ms].
  reconnectInterval?: number
}
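To make the reconnect schedule concrete, the documented formula can be sketched as a pure function. This is an illustration of the comment in StreamOptions, not the SDK's source: the function names are hypothetical, and the jitter model (plus or minus 10%) is an assumption, since the actual jitter is not specified here.

```typescript
// Illustrative only: mirrors the documented formula, not the SDK's implementation.
const MIN_BASE_MS = 200
const MAX_DELAY_MS = 10_000

function clamp(value: number, min: number, max: number): number {
  return Math.min(Math.max(value, min), max)
}

// Delay before the given 1-based reconnect attempt, without jitter:
// base * 2^(attempt-1), capped at 10000ms, with the base clamped to [200, 10000].
function backoffDelay(attempt: number, reconnectInterval: number = 1000): number {
  const base = clamp(reconnectInterval, MIN_BASE_MS, MAX_DELAY_MS)
  return Math.min(base * 2 ** (attempt - 1), MAX_DELAY_MS)
}

// Assumed jitter model (not documented by the SDK): scale by a factor in [0.9, 1.1).
function withJitter(delayMs: number, random: () => number = Math.random): number {
  const jittered = delayMs * (0.9 + 0.2 * random())
  return Math.min(Math.round(jittered), MAX_DELAY_MS)
}

// With the default base of 1000ms and the default of 5 attempts:
const schedule = [1, 2, 3, 4, 5].map((attempt) => backoffDelay(attempt))
// schedule is [1000, 2000, 4000, 8000, 10000]
```

Under these assumptions the fifth attempt would be 16000ms uncapped, so the 10000ms cap is what applies.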

REST API

// Get feeds
const feeds = await client.listFeeds();

// Get latest report
const report = await client.getLatestReport(feedId);

// Get historical report
const historicalReport = await client.getReportByTimestamp(feedId, timestamp);

// Get report page
const reportsPage = await client.getReportsPage(feedId, startTime, limit?);

// Get bulk reports
const bulkReports = await client.getReportsBulk(feedIds, timestamp);

Report Format

Quick Decoder Usage

import { decodeReport } from "@chainlink/data-streams-sdk"
const decoded = decodeReport(report.fullReport, report.feedID)

Schema Auto-Detection

The SDK automatically detects and decodes all report versions based on Feed ID patterns:

  • V2: Feed IDs starting with 0x0002
  • V3: Feed IDs starting with 0x0003 (Crypto Streams)
  • V4: Feed IDs starting with 0x0004 (Real-World Assets)
  • V5: Feed IDs starting with 0x0005
  • V6: Feed IDs starting with 0x0006 (Multiple Price Values)
  • V7: Feed IDs starting with 0x0007
  • V8: Feed IDs starting with 0x0008 (Non-OTC RWA)
  • V9: Feed IDs starting with 0x0009 (NAV Fund Data)
  • V10: Feed IDs starting with 0x000a (Tokenized Equity)
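The prefix rule in the list above can be sketched as a small lookup. This is a hypothetical helper, not the SDK's decoder: it assumes a feed ID is `0x` followed by 64 hex characters and reads the first four hex digits as the schema version.

```typescript
type SchemaVersion = "V2" | "V3" | "V4" | "V5" | "V6" | "V7" | "V8" | "V9" | "V10"

// Hypothetical helper: derives the schema version from the feed ID prefix.
function schemaVersionFromFeedId(feedId: string): SchemaVersion {
  // Assumption: feed IDs are 32 bytes, written as 0x plus 64 hex characters.
  if (!/^0x[0-9a-fA-F]{64}$/.test(feedId)) {
    throw new Error(`Invalid feed ID format: ${feedId}`)
  }
  // The four hex digits after "0x" encode the version (0x0002 ... 0x000a).
  const version = parseInt(feedId.slice(2, 6), 16)
  if (version < 2 || version > 10) {
    throw new Error(`Unsupported report version: ${version}`)
  }
  return `V${version}` as SchemaVersion
}

// The feed ID used in the Quick Commands section starts with 0x0003:
const exampleVersion = schemaVersionFromFeedId(
  "0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782"
)
// exampleVersion is "V3"
```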

Common Fields

All reports include standard metadata:

interface BaseFields {
  version: "V2" | "V3" | "V4" | "V5" | "V6" | "V7" | "V8" | "V9" | "V10"
  nativeFee: bigint
  linkFee: bigint
  expiresAt: number
  feedID: string
  validFromTimestamp: number
  observationsTimestamp: number
}
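As one possible use of these fields, a consumer might check whether a decoded report is still inside its validity window before acting on it. The sketch below is not an SDK function: it assumes `validFromTimestamp` and `expiresAt` are Unix timestamps in seconds and treats both bounds as inclusive.

```typescript
// Only the two BaseFields members this check needs.
interface ValidityWindow {
  validFromTimestamp: number
  expiresAt: number
}

// Hypothetical helper. Assumes Unix seconds and inclusive bounds.
function isWithinValidity(report: ValidityWindow, nowSeconds: number): boolean {
  return nowSeconds >= report.validFromTimestamp && nowSeconds <= report.expiresAt
}

// Current time in seconds, for comparison with report timestamps.
function nowInSeconds(): number {
  return Math.floor(Date.now() / 1000)
}
```

A call such as `isWithinValidity(decoded, nowInSeconds())` would then return false for a report that has expired or is not yet valid.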

Schema-Specific Fields

  • V2/V3/V4: price: bigint - Standard price data
  • V3: bid: bigint, ask: bigint - Crypto bid/ask spreads
  • V4: marketStatus: MarketStatus - Real-world asset market status
  • V5: rate: bigint, timestamp: number, duration: number - Interest rate data with observation timestamp and duration
  • V6: price: bigint, price2: bigint, price3: bigint, price4: bigint, price5: bigint - Multiple price values in a single payload
  • V7: exchangeRate: bigint - Exchange rate data
  • V8: midPrice: bigint, lastUpdateTimestamp: number, marketStatus: MarketStatus - Non-OTC RWA data
  • V9: navPerShare: bigint, navDate: number, aum: bigint, ripcord: number - NAV fund data
  • V10: price: bigint, lastUpdateTimestamp: number, marketStatus: MarketStatus, currentMultiplier: bigint, newMultiplier: bigint, activationDateTime: number, tokenizedPrice: bigint - Tokenized equity data

For complete field definitions, see the complete list of available reports and their schemas.
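Because numeric fields such as `price` are `bigint` values, they need scaling before display. The helper below is a hypothetical sketch, not part of the SDK: the number of decimals is an assumption you must confirm against the schema for your feed (18 is used here only as an example).

```typescript
// Hypothetical helper: renders a fixed-point bigint as a decimal string
// without going through floating point.
function formatFixed(value: bigint, decimals: number): string {
  const zero = BigInt(0)
  const negative = value < zero
  const abs = negative ? -value : value
  const scale = BigInt(10) ** BigInt(decimals)
  const whole = abs / scale
  // Left-pad the fractional digits, then drop trailing zeros.
  const fraction = (abs % scale).toString().padStart(decimals, "0").replace(/0+$/, "")
  const body = fraction.length > 0 ? `${whole}.${fraction}` : `${whole}`
  return negative ? `-${body}` : body
}

// Example with an assumed 18-decimal value representing 1234.5:
const formatted = formatFixed(BigInt("12345" + "0".repeat(17)), 18)
// formatted is "1234.5"
```

Keeping the arithmetic in `bigint` avoids the precision loss that converting large values to `number` would cause.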

High Availability Mode

HA mode establishes multiple simultaneous connections for zero-downtime operation:

  • Automatic failover between connections
  • Report deduplication across connections
  • Automatic origin discovery to find available endpoints
  • Per-connection monitoring and statistics

const client = createClient({
  // ...config
  haMode: true,
  wsEndpoint: "wss://ws.dataengine.chain.link", // Single endpoint (mainnet only)
})

How it works: When haMode is true, the SDK automatically discovers multiple origin endpoints behind the single URL and establishes separate connections to each origin.
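With several connections delivering the same stream, each report can arrive more than once, which is why deduplication is needed. The sketch below shows one way such a filter could work. It is not the SDK's internal implementation: the class is hypothetical, and keying on `feedID` plus `observationsTimestamp` is an assumption.

```typescript
// Only the fields this sketch uses to identify a report.
interface ReportIdentity {
  feedID: string
  observationsTimestamp: number
}

// Hypothetical deduplicator: accepts the first copy of a report and
// counts every later copy as a duplicate.
class ReportDeduplicator {
  private readonly seen = new Set<string>()
  accepted = 0
  deduplicated = 0

  // Returns true the first time a (feedID, observationsTimestamp) pair is seen.
  accept(report: ReportIdentity): boolean {
    const key = `${report.feedID}:${report.observationsTimestamp}`
    if (this.seen.has(key)) {
      this.deduplicated += 1
      return false
    }
    this.seen.add(key)
    this.accepted += 1
    return true
  }
}
```

In a long-running process the set of seen keys would need pruning (for example, by dropping entries older than some window); that is left out to keep the sketch short.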

Connection monitoring: The optional connectionStatusCallback can be used to integrate with external monitoring systems. The SDK already provides comprehensive connection logs, so this callback is primarily useful for custom alerting or metrics collection. See examples/metrics-monitoring.ts for a complete implementation example.
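As a small illustration of custom metrics collection, the callback can record the latest state per connection. The callback signature is the one from the Config interface; `createConnectionTracker` and its return shape are hypothetical, and combining host and origin into one key is an assumption made for this sketch.

```typescript
// Signature as declared on Config.connectionStatusCallback.
type ConnectionStatusCallback = (isConnected: boolean, host: string, origin: string) => void

// Hypothetical tracker: remembers the last reported state of each connection.
function createConnectionTracker() {
  const status = new Map<string, boolean>()

  const callback: ConnectionStatusCallback = (isConnected, host, origin) => {
    // Assumption: host plus origin identifies one connection.
    status.set(`${host}#${origin}`, isConnected)
  }

  // Number of connections whose last reported state was "connected".
  const connectedCount = (): number =>
    Array.from(status.values()).filter((connected) => connected).length

  return { callback, connectedCount, status }
}
```

The `callback` member could be passed as `connectionStatusCallback` in the client config, and `connectedCount()` polled by an alerting job.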

Important: HA mode is only available on mainnet endpoints.

Error Handling

Error Types Overview

Error Type                     | When Thrown                                   | Key Properties
ValidationError                | Invalid feed IDs, timestamps, parameters      | message
AuthenticationError            | Invalid credentials, HMAC failures            | message
APIError                       | HTTP 4xx/5xx, network timeouts, rate limits   | statusCode, message
ReportDecodingError            | Corrupted report data, unsupported versions   | message
WebSocketError                 | Connection failures, protocol errors          | message
OriginDiscoveryError           | HA discovery failures                         | cause, message
MultiConnectionError           | All HA connections failed                     | message
PartialConnectionFailureError  | Some HA connections failed                    | failedConnections, totalConnections
InsufficientConnectionsError   | HA degraded performance                       | availableConnections, requiredConnections

Usage Examples

import {
  ValidationError,
  AuthenticationError,
  APIError,
  ReportDecodingError,
  WebSocketError,
  OriginDiscoveryError,
  MultiConnectionError,
} from "@chainlink/data-streams-sdk"

// REST API error handling
try {
  const report = await client.getLatestReport(feedId)
} catch (error) {
  if (error instanceof ValidationError) {
    // Invalid feed ID or parameters
  } else if (error instanceof AuthenticationError) {
    // Check API credentials
  } else if (error instanceof APIError) {
    // Server error - check error.statusCode (429, 500, etc.)
  } else if (error instanceof ReportDecodingError) {
    // Corrupted or unsupported report format
  }
}

// Streaming error handling
stream.on("error", (error) => {
  if (error instanceof WebSocketError) {
    // Connection issues - retry or fallback
  } else if (error instanceof OriginDiscoveryError) {
    // HA discovery failed - falls back to static config
  } else if (error instanceof MultiConnectionError) {
    // All HA connections failed - critical
  }
})

Catch-all error handling:

import { DataStreamsError } from "@chainlink/data-streams-sdk"

try {
  // Any SDK operation
} catch (error) {
  if (error instanceof DataStreamsError) {
    // Handles ANY SDK error (base class for all error types above)
    console.log("SDK error:", error.message)
  } else {
    // Non-SDK error (network, system, etc.)
    console.log("System error:", error)
  }
}

Observability (Logs & Metrics)

The SDK is designed to plug into your existing observability stack.

Logging (Pino/Winston/Console)

Pass your logger to the SDK and choose a verbosity level. For detailed WebSocket diagnostics, also set enableConnectionDebug: true.

Quick Start

import { createClient } from "@chainlink/data-streams-sdk"

// Silent mode (default) - Zero overhead
const silentClient = createClient({
  /* ... config without logging */
})

// Basic console logging
const loggingClient = createClient({
  // ... other config
  logging: {
    logger: {
      info: console.log,
      warn: console.warn,
      error: console.error,
    },
  },
})

Using Pino (structured JSON):

import pino from "pino"
import { createClient, LogLevel } from "@chainlink/data-streams-sdk"

const root = pino({ level: process.env.PINO_LEVEL || "info" })
const sdk = root.child({ component: "sdk" })

const client = createClient({
  // ...config
  logging: {
    logger: {
      info: sdk.info.bind(sdk),
      warn: sdk.warn.bind(sdk),
      error: sdk.error.bind(sdk),
      debug: sdk.debug.bind(sdk),
    },
    logLevel: LogLevel.INFO,
    // For very verbose WS diagnostics, set DEBUG + enableConnectionDebug
    // logLevel: LogLevel.DEBUG,
    // enableConnectionDebug: true,
  },
})

Command-line with pretty output:

PINO_LEVEL=info npx ts-node examples/metrics-monitoring.ts | npx pino-pretty

Log Levels

🔴 ERROR

Critical failures only

  • Authentication failures
  • Network connection errors
  • Report decoding failures
  • API request failures
  • Unexpected crashes

Example Use: Production alerts & monitoring


🟡 WARN

Everything in ERROR +

  • Partial reconnections
  • Fallback to static origins
  • Retry attempts
  • Connection timeouts
  • Invalid data warnings

Example Use: Production environments


🔵 INFO

Everything in WARN +

  • Client initialization
  • Successful API calls
  • Stream connections
  • Report retrievals
  • Connection status changes
  • Connection mode determination

Example Use: Development & staging


🔍 DEBUG

Everything in INFO +

  • Feed ID validation
  • Report decoding steps
  • Auth header generation
  • Request/response details
  • WebSocket ping/pong
  • Origin discovery process
  • Configuration validation
  • Origin tracking (HA mode)

Example Use: Debugging & development only

Logging Configuration Options

interface LoggingConfig {
  /** External logger functions (console, winston, pino, etc.) */
  logger?: {
    debug?: (message: string, ...args: any[]) => void
    info?: (message: string, ...args: any[]) => void
    warn?: (message: string, ...args: any[]) => void
    error?: (message: string, ...args: any[]) => void
  }

  /** Minimum logging level - filters out lower priority logs */
  logLevel?: LogLevel // DEBUG (0) | INFO (1) | WARN (2) | ERROR (3)

  /** Enable WebSocket ping/pong and connection state debugging logs */
  enableConnectionDebug?: boolean
}

Compatible with: console, winston, pino, and any logger with debug/info/warn/error methods. See examples/logging-basic.ts for complete integration examples.

For debugging: Use LogLevel.DEBUG for full diagnostics and enableConnectionDebug: true to see WebSocket ping/pong messages and connection state transitions.

Origin tracking in HA mode shows which specific endpoint received each report.
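The logLevel threshold can be pictured as a wrapper that forwards a message only when its level is at or above the configured minimum. The sketch below is an illustration, not the SDK's logging code: the numeric ordering comes from the LoggingConfig comment (DEBUG 0, INFO 1, WARN 2, ERROR 3), while `filterByLevel` and the lowercase level names are hypothetical.

```typescript
// Ordering taken from the LoggingConfig comment: DEBUG (0) | INFO (1) | WARN (2) | ERROR (3).
const LEVELS = { debug: 0, info: 1, warn: 2, error: 3 } as const
type LevelName = keyof typeof LEVELS
type LogFn = (message: string, ...args: unknown[]) => void
type Logger = Record<LevelName, LogFn>

// Hypothetical wrapper: drops calls below `minimum`, forwards the rest unchanged.
function filterByLevel(logger: Logger, minimum: LevelName): Logger {
  const wrap = (level: LevelName): LogFn => (message, ...args) => {
    if (LEVELS[level] >= LEVELS[minimum]) {
      logger[level](message, ...args)
    }
  }
  return {
    debug: wrap("debug"),
    info: wrap("info"),
    warn: wrap("warn"),
    error: wrap("error"),
  }
}
```

With a minimum of "warn", for instance, debug and info calls are dropped while warn and error calls reach the underlying logger.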

Metrics (stream.getMetrics())

The stream.getMetrics() API provides a complete snapshot for dashboards and alerts:

const m = stream.getMetrics()
// m.accepted, m.deduplicated, m.totalReceived
// m.partialReconnects, m.fullReconnects
// m.activeConnections, m.configuredConnections
// m.originStatus: { [origin]: ConnectionStatus }

Simple periodic print (example):

setInterval(() => {
  const m = stream.getMetrics()
  console.log(`accepted=${m.accepted} dedup=${m.deduplicated} active=${m.activeConnections}/${m.configuredConnections}`)
}, 30000)

Refer to examples/metrics-monitoring.ts for a full metrics dashboard example.
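For alerting, a metrics snapshot can be reduced to a few derived signals. The sketch below uses only field names shown in the snippet above; the `MetricsSnapshot` type lists just those fields (the real return type may contain more), and `summarizeMetrics` with its thresholds is a hypothetical helper.

```typescript
// Subset of the fields shown for stream.getMetrics(); all assumed to be numbers.
interface MetricsSnapshot {
  accepted: number
  deduplicated: number
  totalReceived: number
  activeConnections: number
  configuredConnections: number
}

interface StreamHealth {
  dedupRatio: number // share of received reports that were duplicates
  degraded: boolean // some, but not all, configured connections are active
  down: boolean // no active connections
}

// Hypothetical helper: derives simple health signals from one snapshot.
function summarizeMetrics(m: MetricsSnapshot): StreamHealth {
  const dedupRatio = m.totalReceived === 0 ? 0 : m.deduplicated / m.totalReceived
  return {
    dedupRatio,
    degraded: m.activeConnections > 0 && m.activeConnections < m.configuredConnections,
    down: m.activeConnections === 0,
  }
}
```

In HA mode a noticeable `dedupRatio` is expected, since each connection delivers its own copy of every report; `degraded` and `down` are the more natural alert conditions.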

Testing

npm test                 # All tests
npm run test:unit        # Unit tests only
npm run test:integration # Integration tests only

Feed IDs

For available feed IDs, select your desired report from the report schema overview.
