增强型 WebSocket
为什么选择增强型 WebSocket – 快速的 JSON 流,包含解码的账户和交易。无需自定义网络堆栈。
可用于商业及更高计划。
工作原理 – 连接到 Atlas 端点,订阅 Pump AMM 交易,并监听更新。示例重试五次,采用指数退避策略。
要求
- Node.js ≥ 18(已在 v20 上测试)
- 如果计划使用
ts‑node运行.ts示例,则需要 TypeScript ≥ 5 - 需要 Helius 商业计划或更高
- 一个名为
HELIUS_API_KEY的环境变量,用于存储您的 API 密钥
全局安装依赖项:
npm i -g typescript ts‑node实施
1
安装依赖项
报告错误代码
复制
询问AI
npm install ws
2
创建 WebSocket 客户端
创建一个名为
enhanced-ws-pump.ts 的文件,并包含以下代码:报告错误代码
复制
询问AI
// enhanced-ws-pump.ts
import WebSocket from 'ws';
// Configuration for reconnection
const MAX_RETRIES = 5;
const INITIAL_RETRY_DELAY = 1000; // 1 second
let retryCount = 0;
let retryDelay = INITIAL_RETRY_DELAY;
// Function to create a new WebSocket connection
function createWebSocket() {
return new WebSocket(`wss://atlas-mainnet.helius-rpc.com/?api-key=${process.env.HELIUS_API_KEY}`);
}
// Function to send a request to the WebSocket server
function sendRequest(ws: WebSocket) {
const request = {
jsonrpc: "2.0",
id: 420,
method: "transactionSubscribe",
params: [
{
accountInclude: ["pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA"]
},
{
commitment: "processed",
encoding: "jsonParsed",
transactionDetails: "full",
maxSupportedTransactionVersion: 0
}
]
};
ws.send(JSON.stringify(request));
}
// Function to send a ping to the WebSocket server
function startPing(ws: WebSocket) {
return setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.ping();
console.log('Ping sent');
}
}, 30000); // Ping every 30 seconds
}
// Function to handle reconnection
function reconnect() {
if (retryCount >= MAX_RETRIES) {
console.error('Maximum retry attempts reached.');
return;
}
console.log(`Attempting to reconnect in ${retryDelay/1000} seconds... (Attempt ${retryCount + 1}/${MAX_RETRIES})`);
setTimeout(() => {
retryCount++;
retryDelay *= 2; // Exponential backoff
initializeWebSocket();
}, retryDelay);
}
// Function to initialize WebSocket with all event handlers
function initializeWebSocket() {
const ws = createWebSocket();
let pingInterval: NodeJS.Timeout;
ws.on('open', function open() {
console.log('WebSocket is open');
retryCount = 0; // Reset retry count on successful connection
retryDelay = INITIAL_RETRY_DELAY; // Reset retry delay
sendRequest(ws);
pingInterval = startPing(ws);
});
ws.on('message', function incoming(data: WebSocket.Data) {
const messageStr = data.toString('utf8');
try {
const messageObj = JSON.parse(messageStr);
// Check if it's a subscription confirmation
if (messageObj.result !== undefined) {
console.log('Subscription confirmed:', messageObj);
return;
}
// Check if it's transaction data
if (messageObj.params && messageObj.params.result) {
const transaction = messageObj.params.result;
console.log('Received transaction:', JSON.stringify(transaction, null, 2));
}
} catch (e) {
console.error('Failed to parse JSON:', e);
}
});
ws.on('error', function error(err: Error) {
console.error('WebSocket error:', err);
});
ws.on('close', function close() {
console.log('WebSocket is closed');
if (pingInterval) {
clearInterval(pingInterval);
}
reconnect();
});
}
// Start the WebSocket connection
initializeWebSocket();
// Handle program termination
process.on('SIGINT', () => {
console.log('Shutting down...');
process.exit(0);
});
3
设置环境变量
将您的 Helius API 密钥添加为环境变量:用您在仪表板中实际的 Helius API 密钥替换
报告错误代码
复制
询问AI
export HELIUS_API_KEY=your-helius-api-key
your-helius-api-key。4
运行应用程序
执行脚本以开始流式传输 Pump AMM 数据:您将在终端中看到解析后的 Pump AMM 交易。当套接字关闭时,客户端会自动重试。
报告错误代码
复制
询问AI
npx ts-node enhanced-ws-pump.ts
主要优势
- 浏览器兼容 - WebSocket 协议在 Node.js 和浏览器环境中均可使用
- 丰富的数据 - 获取完全解析的交易对象,包含解码的指令和账户
- 简单的实现 - 除了标准的 WebSocket 客户端外,不需要特殊的库
- 自动重连 - 内置重试逻辑确保稳定连接
常见问题和解决方案
401 Unauthorized
401 Unauthorized
验证您的HELIUS_API_KEY是否正确。
No logs received
No logs received
确保Pump AMM程序地址正确,并且程序上有活动。
Connection dropping
Connection dropping
实施更强大的重连逻辑或检查网络稳定性。
下一步
1
创建UI仪表板
使用React或Vue.js构建一个网页界面,以实时可视化传入的Pump AMM交易。
2
实现数据库存储
将交易数据存储在MongoDB或PostgreSQL等数据库中以进行历史分析:
报告错误代码
复制
询问AI
import { MongoClient } from 'mongodb';
// Setup MongoDB connection
async function setupDatabase() {
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
return client.db('pump-amm').collection('transactions');
}
// Then in your message handler:
ws.on('message', async function incoming(data: WebSocket.Data) {
const messageStr = data.toString('utf8');
try {
const messageObj = JSON.parse(messageStr);
if (messageObj.params && messageObj.params.result) {
const transaction = messageObj.params.result;
// Store in database
const collection = await setupDatabase();
await collection.insertOne({
timestamp: new Date(),
transaction: transaction
});
console.log('Transaction stored in database');
}
} catch (e) {
console.error('Failed to process message:', e);
}
});
3
设置警报系统
使用Discord webhooks等服务配置高价值交易或特定模式的警报:
报告错误代码
复制
询问AI
import axios from 'axios';
// Send alert to Discord webhook
async function sendAlert(message: string) {
await axios.post('YOUR_DISCORD_WEBHOOK_URL', {
content: message
});
}
// Then in your message handler:
if (messageObj.params && messageObj.params.result) {
const transaction = messageObj.params.result;
// Example: Check for transactions above a certain value
const isHighValue = checkIfHighValueTransaction(transaction);
if (isHighValue) {
sendAlert(`High-value transaction detected: ${transaction.signature}`);
}
}
4
实现心跳监控
添加一个更强大的心跳系统以确保持续连接:
报告错误代码
复制
询问AI
// Enhanced heartbeat system
function setupHeartbeat(ws: WebSocket) {
let lastPongTime = Date.now();
// Send ping regularly
const pingInterval = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.ping();
}
}, 30000);
// Track pong responses
ws.on('pong', () => {
lastPongTime = Date.now();
});
// Check connection health
const healthCheck = setInterval(() => {
const now = Date.now();
if (now - lastPongTime > 90000) { // No pong for 90 seconds
console.warn('Connection seems unresponsive, reconnecting...');
ws.terminate();
clearInterval(pingInterval);
clearInterval(healthCheck);
}
}, 30000);
return { pingInterval, healthCheck };
}