Skip to content
Algorand Developer Portal

Watermark Persistence

← Back to Examples

This example demonstrates file-backed watermark persistence across polls.

  • Implement get/set callbacks for file-based watermark storage
  • Verify watermark advances after each poll
  • Demonstrate at-least-once delivery semantics
Prerequisites:

  • LocalNet running (via `algokit localnet start`)

From the repository root, change into the examples/subscriber directory and run the example:

Terminal window
cd examples/subscriber
npx tsx 11-watermark-persistence.ts

View source on GitHub

11-watermark-persistence.ts
/**
* Example: Watermark Persistence
*
* This example demonstrates file-backed watermark persistence across polls.
* - Implement get/set callbacks for file-based watermark storage
* - Verify watermark advances after each poll
* - Demonstrate at-least-once delivery semantics
*
* Prerequisites:
* - LocalNet running (via `algokit localnet start`)
*/
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils';
import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber';
import {
printHeader,
printStep,
printInfo,
printSuccess,
printError,
shortenAddress,
formatAlgo,
} from './shared/utils.js';
/**
 * Runs the watermark-persistence walkthrough against LocalNet.
 *
 * The script connects to LocalNet, obtains a sender and a receiver account,
 * and configures a `get`/`set` watermark store backed by a file in the OS
 * temp directory. It then sends two batches of two payments, polling with a
 * freshly constructed subscriber after each batch, and checks that:
 *   - each poll matches exactly two transactions,
 *   - the second poll only contains `batch2*` notes, and
 *   - the value stored in the watermark file increases after every poll.
 *
 * The watermark file is removed even when a step fails part-way through.
 *
 * @throws Error when a payment has no confirmed round, when a poll does not
 *   match the expected transactions, or when the watermark fails to advance.
 */
async function main(): Promise<void> {
  printHeader('11 — Watermark Persistence');

  // Step 1: Connect to LocalNet
  printStep(1, 'Connect to LocalNet');
  const algorand = AlgorandClient.defaultLocalNet();
  const status = await algorand.client.algod.status();
  printInfo(`Current round: ${status.lastRound.toString()}`);
  printSuccess('Connected to LocalNet');

  // Step 2: Create and fund accounts
  printStep(2, 'Create and fund accounts');
  const sender = await algorand.account.fromEnvironment('WM_SENDER', algo(100));
  const receiver = await algorand.account.fromEnvironment('WM_RECEIVER', algo(10));
  const senderAddr = sender.addr.toString();
  const receiverAddr = receiver.addr.toString();
  printInfo(`Sender: ${shortenAddress(senderAddr)}`);
  printInfo(`Receiver: ${shortenAddress(receiverAddr)}`);
  printSuccess('Accounts created and funded');

  // Step 3: Set up file-backed watermark persistence
  printStep(3, 'Set up file-backed watermark persistence');
  const watermarkFile = path.join(os.tmpdir(), 'example-watermark.txt');

  // Deletes the watermark file when present; a missing file is not an error.
  const removeWatermarkFile = (): void => {
    if (fs.existsSync(watermarkFile)) {
      fs.unlinkSync(watermarkFile);
    }
  };

  // Clean up any leftover file from a previous run
  removeWatermarkFile();

  const watermarkPersistence = {
    // Reads the stored watermark; falls back to 0 while no file exists yet.
    get: async (): Promise<bigint> => {
      if (fs.existsSync(watermarkFile)) {
        const content = fs.readFileSync(watermarkFile, 'utf-8').trim();
        return BigInt(content);
      }
      return 0n;
    },
    // Overwrites the file with the decimal representation of the watermark.
    set: async (newWatermark: bigint): Promise<void> => {
      fs.writeFileSync(watermarkFile, newWatermark.toString(), 'utf-8');
    },
  };
  printInfo(`Watermark file: ${watermarkFile}`);
  printInfo(`Initial watermark: ${(await watermarkPersistence.get()).toString()}`);
  printSuccess('File-backed watermark persistence configured');

  try {
    // Step 4: Send first batch of 2 transactions
    printStep(4, 'Send first batch of 2 payments');
    let firstRound: bigint | undefined;
    for (const note of ['batch1-txn1', 'batch1-txn2']) {
      const result = await algorand.send.payment({
        sender: sender.addr,
        receiver: receiver.addr,
        amount: algo(1),
        note,
      });
      const round = result.confirmation.confirmedRound;
      if (round == null) {
        throw new Error(`Payment ${note} was sent but has no confirmed round`);
      }
      // Compare against undefined explicitly: 0n is a falsy bigint, so a
      // truthiness test would treat round 0 as "not yet recorded".
      if (firstRound === undefined) firstRound = round;
      printInfo(`Sent ${note}: round ${round}`);
    }
    if (firstRound === undefined) {
      throw new Error('No confirmed round was recorded for the first batch');
    }
    printSuccess('First batch of 2 payments sent');

    // Step 5: Set initial watermark so we only scan from our first transaction
    printStep(5, 'Set initial watermark to isolate test transactions');
    const startWatermark = firstRound - 1n;
    await watermarkPersistence.set(startWatermark);
    printInfo(`Watermark set to: ${startWatermark.toString()}`);
    printSuccess('Watermark positioned before first batch');

    // Step 6: First poll — should catch exactly 2 transactions
    printStep(6, 'First poll — expect 2 transactions from batch 1');

    // Builds a new subscriber that matches sender -> receiver payments and
    // shares the file-backed watermark store defined above.
    const createSubscriber = () =>
      new AlgorandSubscriber(
        {
          filters: [
            {
              name: 'payments',
              filter: {
                sender: senderAddr,
                receiver: receiverAddr,
              },
            },
          ],
          syncBehaviour: 'sync-oldest',
          maxRoundsToSync: 100,
          watermarkPersistence,
        },
        algorand.client.algod,
      );

    const subscriber1 = createSubscriber();
    const result1 = await subscriber1.pollOnce();
    const poll1Txns = result1.subscribedTransactions;

    // Decodes a matched transaction's note as UTF-8 ('' when there is none).
    const decodeNote = (txn: (typeof poll1Txns)[number]): string =>
      txn.note ? Buffer.from(txn.note).toString('utf-8') : '';

    printInfo(`Transactions matched: ${poll1Txns.length.toString()}`);
    for (const txn of poll1Txns) {
      printInfo(` ${decodeNote(txn)}: id: ${txn.id.slice(0, 12)}...`);
    }
    if (poll1Txns.length !== 2) {
      throw new Error(`Expected 2 transactions in first poll, got ${poll1Txns.length}`);
    }
    printSuccess('First poll caught exactly 2 transactions');

    // Step 7: Verify watermark was saved to file
    printStep(7, 'Verify watermark persisted to file');
    const savedWatermark = await watermarkPersistence.get();
    const fileContent = fs.readFileSync(watermarkFile, 'utf-8').trim();
    printInfo(`File content: ${fileContent}`);
    printInfo(`Watermark value: ${savedWatermark.toString()}`);
    if (savedWatermark <= startWatermark) {
      throw new Error(
        `Watermark should have advanced past ${startWatermark}, but is ${savedWatermark}`,
      );
    }
    printSuccess(`Watermark advanced: ${startWatermark} -> ${savedWatermark}`);

    // Step 8: Send second batch of 2 transactions
    printStep(8, 'Send second batch of 2 payments');
    for (const note of ['batch2-txn1', 'batch2-txn2']) {
      const result = await algorand.send.payment({
        sender: sender.addr,
        receiver: receiver.addr,
        amount: algo(2),
        note,
      });
      printInfo(`Sent ${note}: round ${result.confirmation.confirmedRound}`);
    }
    printSuccess('Second batch of 2 payments sent');

    // Step 9: Second poll — should catch ONLY the 2 new transactions (not the old ones)
    printStep(9, 'Second poll — expect only 2 NEW transactions from batch 2');
    // Create a fresh subscriber instance — it reads watermark from file,
    // proving persistence works across subscriber instances
    const subscriber2 = createSubscriber();
    const result2 = await subscriber2.pollOnce();
    const poll2Txns = result2.subscribedTransactions;
    const poll2Notes = poll2Txns.map(txn => decodeNote(txn));
    printInfo(`Transactions matched: ${poll2Txns.length.toString()}`);
    for (const txn of poll2Txns) {
      printInfo(` ${decodeNote(txn)}: id: ${txn.id.slice(0, 12)}...`);
    }
    if (poll2Txns.length !== 2) {
      throw new Error(`Expected 2 transactions in second poll, got ${poll2Txns.length}`);
    }
    // Verify these are batch2 transactions, not batch1
    const allBatch2 = poll2Notes.every(note => note.startsWith('batch2'));
    if (!allBatch2) {
      throw new Error(`Expected only batch2 transactions, got: ${poll2Notes.join(', ')}`);
    }
    printSuccess('Second poll caught exactly 2 NEW transactions (batch2 only)');

    // Step 10: Verify final watermark advanced again
    printStep(10, 'Verify final watermark');
    const finalWatermark = await watermarkPersistence.get();
    printInfo(`Final watermark: ${finalWatermark.toString()}`);
    if (finalWatermark <= savedWatermark) {
      throw new Error(
        `Final watermark should have advanced past ${savedWatermark}, but is ${finalWatermark}`,
      );
    }
    printSuccess(`Watermark advanced: ${savedWatermark} -> ${finalWatermark}`);

    // Step 11: Explain at-least-once delivery semantics
    printStep(11, 'At-least-once delivery semantics');
    console.log();
    console.log(' ┌─────────────────────────────────────────────────────────────┐');
    console.log(' │ Watermark Persistence & Delivery Semantics │');
    console.log(' ├─────────────────────────────────────────────────────────────┤');
    console.log(' │ │');
    console.log(' │ The watermark is updated AFTER processing completes: │');
    console.log(' │ │');
    console.log(' │ 1. get() -> read current watermark │');
    console.log(' │ 2. Fetch transactions from watermark to tip │');
    console.log(' │ 3. Fire on/onBatch handlers │');
    console.log(' │ 4. set(newWatermark) -> persist new watermark │');
    console.log(' │ │');
    console.log(' │ If the process crashes between steps 3 and 4, the │');
    console.log(' │ watermark is NOT updated. On restart, the same │');
    console.log(' │ transactions will be re-fetched and re-processed. │');
    console.log(' │ │');
    console.log(' │ This gives AT-LEAST-ONCE delivery: │');
    console.log(' │ - Every transaction is guaranteed to be processed │');
    console.log(' │ - Some transactions MAY be processed more than once │');
    console.log(' │ - Handlers should be idempotent (safe to re-run) │');
    console.log(' │ │');
    console.log(' │ To achieve exactly-once semantics, persist the watermark │');
    console.log(' │ in the same atomic transaction as your business logic │');
    console.log(' │ (e.g., in a database transaction). │');
    console.log(' │ │');
    console.log(' └─────────────────────────────────────────────────────────────┘');
    console.log();

    // Step 12: Clean up temp file
    printStep(12, 'Clean up temp file');
    fs.unlinkSync(watermarkFile);
    printInfo(`Deleted: ${watermarkFile}`);
    printSuccess('Temp file cleaned up');
    printHeader('Example complete');
  } finally {
    // If a step above threw before step 12 ran, the temp file still exists;
    // remove it so a failed run does not leave it behind. No-op on success.
    removeWatermarkFile();
  }
}
// Entry point: run the example and, on any rejection, print the failure and
// exit with status 1. The rejection reason is not guaranteed to be an Error,
// so it is narrowed before reading `.message`; other values are stringified.
main().catch((err: unknown) => {
  printError(err instanceof Error ? err.message : String(err));
  process.exit(1);
});