1. 理解 cNFT 及其事件生命周期

在 Solana 上,压缩 NFT (cNFT) 与传统 NFT 有一个关键区别:
  • 传统 NFT 每个都有自己的铸造地址和代币账户。
  • 压缩 NFT 将数据存储在由相关 cNFT 程序管理的 Merkle 树中。这个单一的链上账户持有 Merkle 树的根,每个 NFT 被表示为该树中的一个叶子。

关键点:

  • cNFT 铸造 添加一个新的叶子。
  • cNFT 转移 更新现有叶子中的所有权数据。
  • cNFT 销毁 从树中移除或使叶子无效。
由于 cNFT 缺乏典型的代币账户,标准的 Solana NFT 跟踪方法(例如,“监视铸造地址”或“订阅代币账户”)将不起作用。相反,你需要关注程序指令或 Merkle 树 账户。

2. 为什么监听 cNFT 事件?

想象一下围绕 cNFT 构建一个市场、钱包或分析仪表板。你需要知道:
  • 何时铸造新的 cNFT。
  • 哪些 cNFT 被转移到用户或从用户转移。
  • 是否有 cNFT 被销毁。
实时接收这些更新可以帮助你保持界面或数据层与链上状态完美同步。结合历史查询,你可以获得 cNFT 活动的完整时间线,从创建到当前状态。

3. 事件监听方法

Helius 提供了三种主要方式来监听 cNFT 事件:
  • 标准 WebSockets:用于基本程序监控的简单持久连接
  • 增强 WebSockets:具有更好交易定位的高级过滤
  • gRPC (Yellowstone):通过 LaserStream 或专用节点实现最大性能和灵活性
根据您的需求进行选择:标准 WebSockets 适合简单性,增强 WebSockets 适合更好的过滤,gRPC 适合企业级应用的最高性能。

3.1 标准 WebSockets

标准 WebSockets
  • 持久连接:您订阅账户或程序,Solana 推送更新。
const WebSocket = require('ws');

// Replace <API_KEY> with your actual API key
const HELIUS_WS_URL = 'wss://mainnet.helius-rpc.com/?api-key=<API_KEY>';
const ws = new WebSocket(HELIUS_WS_URL);

// Keep connection alive with periodic pings
function startPing(ws) {
  setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
    }
  }, 30000); // Ping every 30 seconds
}

ws.on('open', () => {
  console.log('Connected to Helius WebSocket');
  
  // Start pinging to keep connection alive
  startPing(ws);

  // Subscribe to the Bubblegum program to catch cNFT events
  const subscribeMsg = {
    jsonrpc: '2.0',
    id: 1,
    method: 'programSubscribe',
    params: [
      'BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY', // Bubblegum program ID
      { 
        commitment: 'confirmed',
        encoding: 'jsonParsed'
      }
    ]
  };

  ws.send(JSON.stringify(subscribeMsg));
});

ws.on('message', (rawData) => {
  try {
    const data = JSON.parse(rawData);
    console.log('New cNFT event:', data);
    
    // Check if this is a subscription result or notification
    if (data.method === 'programNotification') {
      console.log('Program notification received:', data.params);
      // Parse for mints/transfers/burns in the instruction data
    }
  } catch (err) {
    console.error('Failed to parse WS message:', err);
  }
});

ws.on('error', (err) => {
  console.error('WebSocket error:', err);
});

ws.on('close', () => {
  console.log('WebSocket closed');
});

3.2 增强 WebSockets

增强 WebSockets
  • 增强 WebSocket,具有高级过滤功能(accountInclude, accountRequired, 等)。
  • 减少解析开销,因为您只会收到与您的地址相关的交易。
  • 实时过滤确保您只获得您关心的交易。
const WebSocket = require('ws');

// Note: No /? in the URL, just ?
const HELIUS_ENHANCED_WS_URL = 'wss://atlas-mainnet.helius-rpc.com?api-key=<API_KEY>';
const ws = new WebSocket(HELIUS_ENHANCED_WS_URL);

ws.on('open', () => {
  console.log('Connected to Enhanced WebSocket');
  
  // Send subscription request
  const subscribeRequest = {
    jsonrpc: '2.0',
    id: 420,
    method: 'transactionSubscribe',
    params: [
      // Filter object
      {
        failed: false,
        vote: false,
        accountInclude: ['MERKLE_TREE_ADDRESS'] // Replace with your Merkle tree address
      },
      // Options object
      {
        commitment: 'confirmed',
        encoding: 'jsonParsed',
        transactionDetails: 'full',
        showRewards: false,
        maxSupportedTransactionVersion: 0
      }
    ]
  };

  ws.send(JSON.stringify(subscribeRequest));
  
  // Keep connection alive with periodic pings
  setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
      console.log('Ping sent');
    }
  }, 30000); // Ping every 30 seconds
});

ws.on('message', (data) => {
  const messageStr = data.toString('utf8');
  try {
    const payload = JSON.parse(messageStr);
    const result = payload.params?.result;
    
    if (!result) {
      console.log('Subscription confirmation:', payload);
      return;
    }

    console.log('Enhanced cNFT transaction detected!');
    console.log('Signature:', result.signature);
    console.log('Slot:', result.slot);
    
    // Check transaction logs for cNFT operations
    const logs = result.transaction?.meta?.logMessages || [];
    const hasBubblegumLogs = logs.some(log => 
      log.includes('BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY')
    );
    
    if (hasBubblegumLogs) {
      console.log('Bubblegum program interaction detected - likely cNFT operation');
      console.log('Transaction logs:', logs);
    }

    // Access transaction instructions
    const instructions = result.transaction?.transaction?.message?.instructions || [];
    instructions.forEach((instruction, index) => {
      console.log(`Instruction ${index}:`, instruction);
    });

  } catch (err) {
    console.error('Failed to parse Enhanced WS message:', err);
  }
});

ws.on('error', (err) => {
  console.error('Enhanced WebSocket error:', err);
});

ws.on('close', () => {
  console.log('Enhanced WebSocket closed');
});
用于监控特定 cNFT 程序:
// Monitor Bubblegum program directly
const BUBBLEGUM_PROGRAM_ID = 'BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY';

const subscribeRequest = {
  jsonrpc: '2.0',
  id: 420,
  method: 'transactionSubscribe',
  params: [
    {
      failed: false,
      vote: false,
      accountInclude: [BUBBLEGUM_PROGRAM_ID] // Monitor all Bubblegum transactions
    },
    {
      commitment: 'confirmed',
      encoding: 'jsonParsed',
      transactionDetails: 'full',
      maxSupportedTransactionVersion: 0
    }
  ]
};

3.3 Webhooks

Webhooks 允许 Helius 在链上事件发生时通过 HTTP POST 通知您的服务器。非常适合不需要持久连接的情况。
  1. 通过 APISDKDashboard 创建 webhook。
  2. 指定要监控的地址(Merkle 树地址、用户钱包等)。
  3. 接收服务器端点上的交易数据;解析 cNFT 指令。
创建 Webhook(API 示例):
curl -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "webhookURL": "https://myapp.com/cnft-webhook",
    "transactionTypes": ["ANY"],
    "accountAddresses": ["MERKLE_TREE_ADDRESS"],
    "webhookType": "enhanced",
    "authHeader": "Bearer your-auth-token",
    "txnStatus": "all",
    "encoding": "jsonParsed"
  }' \
  "https://api.helius.xyz/v0/webhooks?api-key=<YOUR_API_KEY>"
JavaScript 示例:
const fetch = require('node-fetch');

async function createCNFTWebhook() {
  const webhookData = {
    webhookURL: 'https://myapp.com/cnft-webhook',
    transactionTypes: ['ANY'], // Monitor all transaction types
    accountAddresses: ['MERKLE_TREE_ADDRESS'], // Replace with your Merkle tree address
    webhookType: 'enhanced', // Get parsed transaction data
    authHeader: 'Bearer your-auth-token', // Optional: secure your webhook
    txnStatus: 'all', // Monitor both success and failed transactions
    encoding: 'jsonParsed'
  };

  const response = await fetch('https://api.helius.xyz/v0/webhooks?api-key=<YOUR_API_KEY>', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(webhookData),
  });

  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }

  const webhook = await response.json();
  console.log('Webhook created:', webhook);
  return webhook;
}

createCNFTWebhook().catch(console.error);
Webhook 负载结构: 当您的 webhook 端点收到通知时,它将包含如下结构化数据:
{
  "accountData": [
    {
      "account": "MERKLE_TREE_ADDRESS",
      "nativeBalanceChange": 0,
      "tokenBalanceChanges": []
    }
  ],
  "description": "Compressed NFT Mint",
  "events": {
    "compressed": {
      "type": "COMPRESSED_NFT_MINT",
      "treeId": "MERKLE_TREE_ADDRESS",
      "assetId": "...",
      "leafIndex": 12,
      "instructionIndex": 1,
      "newLeafOwner": "UserWalletAddress"
    }
  },
  "fee": 5000,
  "feePayer": "PAYER_ADDRESS",
  "signature": "TRANSACTION_SIGNATURE",
  "slot": 1234567,
  "timestamp": 1234567890000,
  "type": "COMPRESSED_NFT_MINT"
}

3.4 gRPC (Yellowstone)

gRPC 是最灵活且高性能的事件监听解决方案,可通过 LaserStreamDedicated Nodes 使用。
  • 高级过滤(memcmp,所有者,账户等)。
  • 企业级吞吐量,适用于大规模应用。
  • LaserStream:具有自动故障转移和历史重播的多租户 gRPC 服务。
  • Dedicated Nodes:专属 gRPC 端点,保证资源隔离。
完整示例(具有强大流管理的 TypeScript):
# First install dependencies
npm install @triton-one/yellowstone-grpc
npm install typescript @types/node --save-dev
import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";

class CNFTStreamManager {
  private client: Client;
  private stream: any;
  private isConnected = false;
  private reconnectAttempts = 0;
  private readonly maxReconnectAttempts = 10;
  private readonly baseReconnectDelay = 1000;

  constructor(
    private endpoint: string,
    private apiKey: string,
    private onCNFTEvent: (data: any) => void
  ) {
    this.client = new Client(endpoint, apiKey, {
      "grpc.max_receive_message_length": 64 * 1024 * 1024
    });
  }

  async connect(merkleTreeAddress: string): Promise<void> {
    try {
      console.log(`Connecting to gRPC endpoint: ${this.endpoint}`);
      
      const subscribeRequest: SubscribeRequest = {
        commitment: CommitmentLevel.CONFIRMED,
        accounts: {
          merkleTreeAccount: {
            account: [merkleTreeAddress],
            owner: [],
            filters: []
          }
        },
        accountsDataSlice: [],
        transactions: {
          cnftTransactions: {
            accountInclude: [merkleTreeAddress],
            accountExclude: [],
            accountRequired: [],
            vote: false,
            failed: false
          }
        },
        blocks: {},
        blocksMeta: {},
        entry: {},
        slots: {},
        transactionsStatus: {},
        ping: { id: 1 } // Keep connection alive
      };

      this.stream = await this.client.subscribe();
      
      this.stream.on("data", (data: any) => {
        if (data.account) {
          console.log("Merkle tree account update:", data.account);
          this.onCNFTEvent({
            type: 'account_update',
            account: data.account
          });
        }
        
        if (data.transaction) {
          console.log("Transaction involving cNFT:", data.transaction.signature);
          this.onCNFTEvent({
            type: 'transaction',
            transaction: data.transaction
          });
        }
      });

      this.stream.on("error", (error: any) => {
        console.error("Stream error:", error);
        this.handleReconnect();
      });

      this.stream.on("close", () => {
        console.log("Stream closed");
        this.isConnected = false;
      });

      // Send subscription request
      await this.writeRequest(subscribeRequest);
      this.isConnected = true;
      this.reconnectAttempts = 0;
      
      console.log("Successfully connected to gRPC stream");
      
    } catch (error) {
      console.error("Failed to connect:", error);
      this.handleReconnect();
    }
  }

  private async writeRequest(request: SubscribeRequest): Promise<void> {
    return new Promise((resolve, reject) => {
      this.stream.write(request, (err: any) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  }

  private async handleReconnect(): Promise<void> {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }

    this.reconnectAttempts++;
    const delay = this.baseReconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
    
    console.log(`Reconnecting in ${delay}ms... (attempt ${this.reconnectAttempts})`);
    
    setTimeout(() => {
      this.connect("MERKLE_TREE_ADDRESS").catch(console.error);
    }, delay);
  }

  disconnect(): void {
    if (this.stream) {
      this.stream.end();
    }
    this.client.close();
    this.isConnected = false;
  }
}

// Usage example
async function monitorCNFTTree() {
  const endpoint = "https://laserstream-mainnet-ewr.helius-rpc.com"; // Choose your region
  const apiKey = "your-api-key";
  const merkleTreeAddress = "MERKLE_TREE_ADDRESS";

  const manager = new CNFTStreamManager(
    endpoint,
    apiKey,
    (eventData) => {
      console.log("cNFT Event received:", eventData);
      
      // Process different event types
      if (eventData.type === 'account_update') {
        console.log("Merkle tree state changed");
        // Handle account update - tree root may have changed
      } else if (eventData.type === 'transaction') {
        console.log("Transaction affecting cNFT detected");
        // Parse transaction for mint/transfer/burn operations
      }
    }
  );

  await manager.connect(merkleTreeAddress);
  
  // Keep running until interrupted
  process.on('SIGINT', () => {
    console.log("Shutting down...");
    manager.disconnect();
    process.exit(0);
  });
}

monitorCNFTTree().catch(console.error);

4. 检索历史 cNFT 数据

实时事件馈送非常适合捕获未来事件。但如果您需要过去——cNFT 的整个生命周期或影响 Merkle 树或钱包的所有交易,该怎么办? 在本节中,我们将探讨历史查询的种主要方法:
  1. Helius 的解析交易 API
  2. 常规 Solana RPC 调用:getSignaturesForAddress + getParsedTransactiongetTransaction

4.1 Helius 的增强交易 API

Helius 提供了一个增强的交易 API。它会自动将 NFT、SPL 和 Swap 交易解码为人类可读的格式。这使您无需手动解析原始数据。 4.1.1 单个或批量交易(/v0/transactions) 端点
  • Mainnet: https://api.helius.xyz/v0/transactions?api-key=<YOUR_API_KEY>
  • Devnet: https://api-devnet.helius.xyz/v0/transactions?api-key=<YOUR_API_KEY>
您可以在请求体中发送最多 100 个交易签名,Helius 返回一个数组的解析交易。 示例
async function parseMultipleTransactions(signatures) {
  const url = 'https://api.helius.xyz/v0/transactions?api-key=<YOUR_API_KEY>';

  const response = await fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ 
      transactions: signatures,
      commitment: 'confirmed' // Optional: specify commitment level
    })
  });

  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }

  const parsedTxs = await response.json();
  
  // Filter for cNFT-related transactions
  const cnftTransactions = parsedTxs.filter(tx => {
    return tx.events && tx.events.compressed;
  });

  console.log("Parsed cNFT transactions:", JSON.stringify(cnftTransactions, null, 2));
  return cnftTransactions;
}

// Usage with error handling
parseMultipleTransactions([
  "5rfFLBUp5YPr6rC2g1KBBW8LGZBcZ8Lvs7gKAdgrBjmQvFf6EKkgc5cpAQUTwGxDJbNqtLYkjV5vS5zVK4tb6JtP",
  "4jzQxVTaJ4Fe4Fct9y1aaT9hmVyEjpCqE2bL8JMnuLZbzHZwaL4kZZvNEZ6bEj6fGmiAdCPjmNQHCf8v994PAgDf"
]).catch(console.error);
在每个解析的交易中,您可能会在 events 下找到一个 compressed 对象,指示 cNFT 的铸造、转移或销毁:
"compressed": {
  "type": "COMPRESSED_NFT_MINT",
  "treeId": "MERKLE_TREE_ADDRESS",
  "assetId": "...",
  "leafIndex": 12,
  "instructionIndex": 1,
  "newLeafOwner": "UserWalletAddress",
  ...
}
4.1.2 按地址解析交易(/v0/addresses/{address}/transactions) 如果您想要获取特定地址的解析交易,例如Merkle树用户钱包,您可以调用:
  • Mainnet: https://api.helius.xyz/v0/addresses/{address}/transactions?api-key=<YOUR_API_KEY>
  • Devnet: https://api-devnet.helius.xyz/v0/addresses/{address}/transactions?api-key=<YOUR_API_KEY>
示例
async function getParsedHistoryForAddress(address) {
  // Add pagination support and error handling
  const url = `https://api.helius.xyz/v0/addresses/${address}/transactions?api-key=<YOUR_API_KEY>&limit=50`;

  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }

  const parsedHistory = await response.json();
  
  // Filter for cNFT-related transactions
  const cnftTransactions = parsedHistory.filter(tx => {
    return tx.events && tx.events.compressed;
  });

  console.log("Parsed cNFT history:", JSON.stringify(cnftTransactions, null, 2));

  // Example of pagination - get next page if needed
  if (parsedHistory.length === 50) {
    const lastSignature = parsedHistory[parsedHistory.length - 1].signature;
    console.log(`More transactions available. Use before=${lastSignature} for next page`);
  }

  return cnftTransactions;
}

// Usage with error handling
getParsedHistoryForAddress("MERKLE_TREE_ADDRESS_OR_USER_WALLET").catch(console.error);

4.2 常规方法:getSignaturesForAddress + getParsedTransaction / getTransaction

如果您更喜欢传统的Solana方法或希望获得最大控制权,您可以调用Solana的本地RPC方法:
  1. getSignaturesForAddress:返回涉及给定地址的交易签名数组(例如,Merkle树或用户的钱包)。
  2. getParsedTransaction:返回给定签名的Solana解析JSON。
  3. getTransaction:返回原始二进制编码的交易,您可以使用外部库(例如,Blockbuster)进行解析,如果您需要专门的cNFT解码。
4.2.1 getSignaturesForAddress 这是一种支持分页的方法。您可以传递beforeuntil参数。 示例
async function fetchSignatures(address, limit = 10) {
  const rpcUrl = "https://mainnet.helius-rpc.com/?api-key=<YOUR_API_KEY>";
  const body = {
    jsonrpc: "2.0",
    id: 1,
    method: "getSignaturesForAddress",
    params: [
      address,
      { 
        limit: limit,
        commitment: "confirmed"
      }
    ]
  };

  const response = await fetch(rpcUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(body)
  });

  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }

  const data = await response.json();
  
  if (data.error) {
    throw new Error(`RPC error: ${data.error.message}`);
  }

  console.log("Signatures for address:", data.result);
  return data.result;
}

// Usage with error handling
fetchSignatures("MERKLE_TREE_ADDRESS").catch(console.error);
4.2.2 getParsedTransactiongetTransaction 一旦您获得签名,检索每个交易的详细信息:
async function fetchTransaction(signature, encoding = "jsonParsed") {
  const rpcUrl = "https://mainnet.helius-rpc.com/?api-key=<YOUR_API_KEY>";
  const body = {
    jsonrpc: "2.0",
    id: 1,
    method: "getTransaction",
    params: [
      signature, 
      {
        encoding: encoding,
        commitment: "confirmed",
        maxSupportedTransactionVersion: 0
      }
    ]
  };

  const response = await fetch(rpcUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(body)
  });

  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }

  const data = await response.json();
  
  if (data.error) {
    throw new Error(`RPC error: ${data.error.message}`);
  }

  console.log("Transaction:", JSON.stringify(data.result, null, 2));
  return data.result;
}

// Usage:
fetchTransaction("TransactionSignature").catch(console.error);
查找引用cNFT程序或Merkle树的指令。如果您使用getTransaction,您将获得原始数据(例如,base64),您需要使用专门的解析器进行解码。

5. 综合应用

  1. 监听cNFT事件
    • 选择一种方法:WebSockets、Webhooks或gRPC。
    • 过滤cNFT程序或Merkle树地址。
    • 实时解析指令以跟踪铸造、转移和销毁。
  2. 检索历史数据
    • Helius增强交易API:获取cNFT操作的人类可读分解的最简单方法。
    • 常规RPCgetSignaturesForAddress + getParsedTransaction(或getTransaction + 手动解析)以获得最大灵活性,或如果您已经依赖于标准Solana RPC调用。
  3. 构建完整时间线
    • 合并未来(实时)事件与过去(历史)数据。
    • 如果您的事件监听解决方案出现故障,通过获取您的Merkle树或用户地址的最近交易签名来填补空白。

6. 下一步和最佳实践

  • 利用 Helius: Enhanced Transaction API 特别适合于需要以简单 JSON 格式获取 cNFT 事件(铸造、转移、销毁)的情况。
  • 分页: 对于活动频繁的地址,您可能需要使用 beforeuntil 进行迭代以获取较旧的数据。
  • 验证: 为了额外的安全性,您可以验证 Merkle 证明以确认 cNFT 叶子在链上根下是有效的。
  • 索引: 如果您正在构建一个大规模解决方案,考虑将解析的 cNFT 事件存储在您自己的数据库中以便快速查询。
  • 性能: 对于高容量事件监听,gRPC(通过 LaserStream 或 Dedicated Nodes)提供了顶级性能和高级过滤器。
  • 错误处理: 始终为生产应用程序实现适当的错误处理和重试逻辑。
  • 连接管理: 对于 WebSocket 连接,实施心跳/ ping 机制以维持连接。
  • 速率限制: 注意 API 速率限制,并在您的应用程序中实现适当的节流。
祝您构建愉快!