From Design to Implementation
In the previous article, I outlined the design goals and high-level architecture of a reactive Telegram bot framework — why non-blocking I/O matters, how separating polling from processing enables horizontal scaling, and why sealed interfaces make response dispatch type-safe at compile time.
That article showed simplified code to explain the concepts. This one goes deeper. We will walk through the actual implementation: how Flux.create and expand drive the polling loop with back-pressure, how atomic variables coordinate concurrent state without locks, how the channel abstraction separates read and write concerns, and how the standard Apache Pulsar client is bridged into Project Reactor streams.
If you are building the delivery layer for something like the RAG-powered assistant from the previous article — where users interact with an LLM through Telegram — this is the infrastructure that makes it reactive end-to-end.
Architecture — What Changed
The previous article described a single ReactiveQueue<T> interface with publish, subscribe, and close. In practice, this conflation caused problems: a component that only reads from a queue should not have access to publish, and a publisher should not accidentally call subscribe. The interface was therefore split into a readable channel and a writable channel.
This is Interface Segregation applied to reactive channels. The polling pipeline also changed fundamentally: from Flux.interval with a blanket retry to Flux.create with demand-driven expansion. Let me walk through each component.
The Polling Pipeline — Flux.create Meets expand
The heart of the framework is TelegramBotUpdatePoller. Its job: continuously fetch updates from the Telegram Bot API and emit them as a Flux<Update> that respects downstream back-pressure.
The previous article showed a simplified version using Flux.interval:
// Simplified version from the previous article
Flux.interval(pollingInterval)
.flatMap(tick -> client.getUpdates(offset, limit, timeout, allowedUpdates))
.flatMap(response -> Flux.fromIterable(response.getResult()))
.doOnNext(update -> offset = update.updateId() + 1)
.retry()
.subscribe();
This works, but has a fundamental issue: it ignores back-pressure. The interval ticks regardless of whether downstream consumers have processed previous updates. If the handler is slow, updates pile up in memory.
The actual implementation uses Flux.create with a demand-driven loop:
public Flux<Update> subscribe() {
return Flux.<Update>create(sink -> {
sink.onRequest(requested -> {
Disposable disposable = pollUpdates(sink, lastOffset.get())
.doOnNext(sink::next)
.collectList()
.expand(list -> {
if (list.size() >= limit
&& sink.requestedFromDownstream() > 0
&& !sink.isCancelled()) {
return pollUpdates(sink, lastOffset.get()).doOnNext(sink::next).collectList();
}
sink.complete();
return Flux.empty();
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
sink.onDispose(disposable);
sink.onCancel(disposable);
});
});
}
There is a lot happening in these 20 lines. Let me break it down.
Flux.create — Bridging Push and Pull
Flux.create gives us a FluxSink — a programmatic handle for pushing elements into a reactive stream. Unlike Flux.generate (which is pull-based and synchronous), Flux.create supports multi-threaded emission and lets us control the lifecycle explicitly.
The sink.onRequest(requested -> ...) callback fires when the downstream subscriber signals demand. This is the back-pressure entry point: we only start polling when someone is ready to consume.
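To make this hand-off concrete, here is a minimal self-contained sketch (hypothetical DemandDrivenCounter class, not framework code) of a Flux.create source that emits only what the subscriber has requested:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

class DemandDrivenCounter {
    // Emits the numbers 0..max-1, but only in response to downstream demand:
    // nothing is produced until onRequest fires with a positive count.
    static Flux<Long> counter(long max) {
        AtomicLong next = new AtomicLong();
        return Flux.create(sink ->
            sink.onRequest(requested -> {
                for (long i = 0; i < requested && !sink.isCancelled(); i++) {
                    long value = next.getAndIncrement();
                    if (value >= max) {
                        sink.complete();
                        return;
                    }
                    sink.next(value);
                }
            }));
    }

    public static void main(String[] args) {
        // collectList requests Long.MAX_VALUE, so the loop runs to completion
        List<Long> all = counter(3).collectList().block();
        System.out.println(all); // [0, 1, 2]
    }
}
```

The poller's onRequest callback plays the same role, except that "producing a value" means issuing an HTTP long poll instead of incrementing a counter.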
expand — Recursive Continuation Without Stack Overflow
After the first poll returns, we need to decide: should we poll again immediately, or stop? This is where expand comes in. It works like a breadth-first recursive flatMap:
- Collect the poll results into a List
- If the list is full (size >= limit), downstream still wants data (requestedFromDownstream() > 0), and the sink is not cancelled — poll again
- Otherwise, complete the stream
This is critical for Telegram’s long-polling model. When there are many pending updates, the API returns a full batch immediately. expand chains these batch fetches without returning to the event loop between each one, reducing latency during catch-up scenarios.
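The continuation logic can be seen in isolation. The sketch below (toy Page record and fetch method, not the framework's API) uses expand to drain a paged source, re-fetching only while the previous batch was full:

```java
import java.util.List;
import reactor.core.publisher.Mono;

class ExpandPaging {
    static final int LIMIT = 3;

    // Toy stand-in for a paged API: returns up to LIMIT items starting at
    // offset, together with the next offset to fetch from.
    record Page(int nextOffset, List<Integer> items) {}

    static Mono<Page> fetch(List<Integer> source, int offset) {
        int end = Math.min(offset + LIMIT, source.size());
        return Mono.just(new Page(end, source.subList(offset, end)));
    }

    static List<Integer> drain(List<Integer> source) {
        return fetch(source, 0)
            // Same condition as the poller: a full batch means there may be more.
            .expand(page -> page.items().size() >= LIMIT
                ? fetch(source, page.nextOffset())
                : Mono.empty())
            .flatMapIterable(Page::items)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(drain(List.of(1, 2, 3, 4, 5, 6, 7))); // [1, 2, 3, 4, 5, 6, 7]
    }
}
```

Because expand is breadth-first and operator-managed, the "recursion" never grows the call stack, no matter how many batches are chained.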
Why Not Flux.interval?
| Aspect | Flux.interval | Flux.create + expand |
|---|---|---|
| Back-pressure | Ignores downstream demand | Checks requestedFromDownstream() before each poll |
| Batch chaining | Fixed interval between polls | Immediate continuation when batch is full |
| Cancellation | Requires external Disposable management | Built-in via sink.onCancel |
| Thread model | Timer thread + flatMap scheduler | Single boundedElastic subscription |
| Error propagation | Requires explicit onErrorResume in the chain | Errors flow directly to sink.error |
Atomic State — Thread Safety Without Locks
The poller maintains three pieces of mutable state, all using java.util.concurrent.atomic types:
private final AtomicLong lastOffset = new AtomicLong(0);
private final AtomicBoolean inFlight = new AtomicBoolean(false);
private final AtomicReference<Disposable> subscription = new AtomicReference<>();
Offset Tracking
Each Telegram update has a monotonically increasing updateId. The next poll must send offset = lastUpdateId + 1 to avoid receiving duplicates. The update happens atomically:
.doOnNext(update -> {
lastOffset.updateAndGet(currentVal -> {
if (update.updateId() >= currentVal) {
return update.updateId() + 1;
}
return currentVal;
});
})
The updateAndGet with a conditional ensures we never move the offset backward — even if updates arrive out of order due to retry or concurrent emission.
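The guard is easy to verify in isolation. Here is a plain-JDK sketch (hypothetical MonotonicOffset class) applying the same conditional updateAndGet under concurrent, out-of-order arrivals:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

class MonotonicOffset {
    private final AtomicLong lastOffset = new AtomicLong(0);

    // Same rule as the poller: advance to updateId + 1, never move backward.
    void record(long updateId) {
        lastOffset.updateAndGet(current ->
            updateId >= current ? updateId + 1 : current);
    }

    long next() {
        return lastOffset.get();
    }

    public static void main(String[] args) {
        MonotonicOffset offset = new MonotonicOffset();
        // Simulate out-of-order delivery (e.g. after a retry), concurrently.
        LongStream.of(5, 3, 7, 6).parallel().forEach(offset::record);
        System.out.println(offset.next()); // 8 — highest updateId seen, plus one
    }
}
```

Whatever interleaving the scheduler produces, the compare inside updateAndGet retries on contention, so the result is always the highest updateId plus one.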
Flight Control
The inFlight boolean prevents overlapping HTTP requests:
Flux<Update> pollUpdates(FluxSink<Update> sink, long offset) {
if (sink.isCancelled() || !inFlight.compareAndSet(false, true)) {
return Flux.empty();
}
if (sink.requestedFromDownstream() <= 0) {
inFlight.set(false);
return Flux.empty();
}
return botClient.getUpdates(offset, limit, timeout, allowedUpdates)
.onErrorResume(WebClientRequestException.class, e -> {
if (e.getCause() instanceof ReadTimeoutException) {
return Flux.empty();
}
return Flux.error(e);
})
.doOnNext(update -> { /* offset tracking */ })
.doFinally(signal -> inFlight.set(false)); // release the guard when the request ends
}
Three guard clauses protect against invalid states:
- sink.isCancelled() — subscriber disconnected, stop immediately
- inFlight.compareAndSet(false, true) — another poll is already in progress, skip
- requestedFromDownstream() <= 0 — no demand, do not fetch
The ReadTimeoutException handling is important. Telegram's long poll holds the request open for up to the configured timeout, so a slow cycle can trip the HTTP client's read timeout even though nothing is wrong. An expired poll with no updates is normal operation, not an error: we return Flux.empty() and let expand decide whether to poll again.
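The flight guard itself is independent of Reactor and can be sketched with plain JDK atomics (hypothetical FlightControl class; the real poller embeds this logic in pollUpdates). Note the finally block releasing the guard, which the pattern requires so that later polls are not locked out forever:

```java
import java.util.concurrent.atomic.AtomicBoolean;

class FlightControl {
    private final AtomicBoolean inFlight = new AtomicBoolean(false);

    // Runs the request only if no other request is in flight; returns whether
    // this caller won the guard. The finally block releases the guard even
    // when the request throws.
    boolean tryPoll(Runnable request) {
        if (!inFlight.compareAndSet(false, true)) {
            return false; // another poll is already in progress — skip
        }
        try {
            request.run(); // stands in for the HTTP call
            return true;
        } finally {
            inFlight.set(false);
        }
    }

    public static void main(String[] args) {
        FlightControl control = new FlightControl();
        boolean[] overlapping = {true};
        // An overlapping poll attempted while the first is running is rejected.
        boolean first = control.tryPoll(() -> overlapping[0] = control.tryPoll(() -> {}));
        System.out.println(first);          // true
        System.out.println(overlapping[0]); // false — the nested attempt was skipped
    }
}
```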
The HTTP Client — WebClient and Non-Blocking I/O
TelegramBotClient wraps Spring’s WebClient to call every Telegram Bot API method reactively:
public Flux<Update> getUpdates(long offset, int limit, int timeout,
List<String> allowedUpdates) {
return webClient.get()
.uri(endpoint + "/bot" + token, uriBuilder ->
uriBuilder
.pathSegment("getUpdates")
.queryParam("offset", offset)
.queryParam("limit", limit)
.queryParam("timeout", timeout)
.queryParam("allowed_updates", allowedUpdates)
.build()
)
.exchangeToMono(clientResponse ->
extractResponse(clientResponse,
new ParameterizedTypeReference<Response<List<Update>>>() {})
)
.flatMapMany(response -> Flux.fromIterable(response.getResult()));
}
Key details:
- exchangeToMono gives access to the full ClientResponse, including status code and headers, before committing to a body type. This is where error responses are transformed into TelegramApiException.
- flatMapMany unpacks the List<Update> from the response into individual elements in the stream. This is what allows the poller to process updates one by one rather than in batches.
- The timeout parameter is passed directly to Telegram’s API — this is the long-polling timeout in seconds, not an HTTP timeout. The HTTP client should have its own read timeout configured slightly higher than this value.
Every send method (sendMessage, sendPhoto, editMessageText, etc.) follows the same pattern — webClient.post() with form data, returning Mono<Response<T>>:
public Mono<Response<Message>> sendMessage(
long chatId, String text,
@Nullable ParseMode parseMode, @Nullable Object replyMarkup,
@Nullable List<MessageEntity> entities,
@Nullable Boolean disableNotification, @Nullable Boolean protectContent) {
BodyInserters.FormInserter<String> inserter =
BodyInserters.fromFormData("chat_id", String.valueOf(chatId))
.with("text", text);
if (parseMode != null) {
inserter = inserter.with("parse_mode", parseMode.name());
}
if (replyMarkup != null) {
inserter = inserter.with("reply_markup", toJson(replyMarkup));
}
// entities, disableNotification and protectContent are appended the same way (omitted here for brevity)
return webClient.post()
.uri(endpoint + "/bot" + token,
uriBuilder -> uriBuilder.pathSegment("sendMessage").build())
.contentType(MediaType.APPLICATION_FORM_URLENCODED)
.body(inserter)
.exchangeToMono(clientResponse ->
extractResponse(clientResponse,
new ParameterizedTypeReference<Response<Message>>() {}));
}
No thread is ever blocked waiting for a Telegram API response.
Building the Processing Pipeline — Pure Functional Composition
There is no dedicated handler interface or abstraction layer between the poller and your business logic. The Flux<Update> returned by TelegramBotUpdatePoller.subscribe() is a standard Reactor stream — you compose your processing pipeline using regular operators like flatMap, map, filter, and doOnNext.
The simplest pipeline — poll, process, reply:
poller.subscribe()
.filter(update -> update.message() != null && update.message().text() != null)
.flatMap(update -> {
long chatId = update.message().chat().id();
String text = update.message().text();
return replier.dispatch(
new BotResponse.SendMessage(chatId, "Echo: " + text, ParseMode.HTML)
);
})
.subscribe();
No framework abstractions, no handler interfaces — just Reactor operators. This is deliberate. A dedicated UpdateHandler interface would add indirection without adding value: the Function<Update, Publisher<?>> that flatMap already accepts is the handler.
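To illustrate, here is a toy example (with a stand-in Update record, not the real model) showing that the handler is just a function passed to flatMap:

```java
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class HandlerAsFunction {
    // Stand-in for the framework's Update type.
    record Update(long updateId, String text) {}

    public static void main(String[] args) {
        // The "handler" is nothing more than a Function — no interface needed.
        Function<Update, Publisher<String>> handler =
            update -> Mono.just("Echo: " + update.text());

        List<String> replies = Flux.just(new Update(1, "hi"), new Update(2, "hello"))
            .flatMap(handler)
            .collectList()
            .block();
        System.out.println(replies);
    }
}
```

Swapping handlers, composing them, or testing them in isolation is then ordinary function manipulation rather than framework configuration.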
Multi-Step Responses
A single update often requires multiple outbound actions. Since each step returns a Mono, you chain them with then or thenReturn:
poller.subscribe()
.filter(update -> update.message() != null)
.flatMap(update -> {
long chatId = update.message().chat().id();
// 1. Show typing indicator, then send reply
return replier.dispatch(new BotResponse.SendChatAction(chatId, ChatAction.TYPING))
.then(replier.dispatch(
new BotResponse.SendMessage(chatId, "Processed: " + update.message().text())
));
})
.subscribe();
Custom Error Handling and Retry
Because the pipeline is a plain Flux, you attach retry and error handling where it makes sense — per-update, per-section, or globally:
poller.subscribe()
.flatMap(update -> processUpdate(update)
.onErrorResume(TelegramApiException.class, e -> {
log.warn("Telegram API error for update {}: {}", update.updateId(), e.getMessage());
return Mono.empty(); // skip this update, continue processing
})
)
.retry() // restart the entire pipeline on unrecoverable errors
.subscribe();
The inner onErrorResume isolates failures per update — one bad message does not kill the stream. The outer retry restarts the pipeline if the poller itself fails (network outage, Telegram API down).
The Channel Abstraction
The previous article used a single ReactiveQueue<T> interface. The current design splits this into two:
public interface ReactiveChannel extends AutoCloseable {
void close();
}
public interface ReadableReactiveChannel<V> extends ReactiveChannel {
Flux<V> subscribe();
}
public interface WritableReactiveChannel<I, O> extends ReactiveChannel {
Mono<O> publish(I message);
}
The split has a practical consequence: WritableReactiveChannel<I, O> uses two type parameters. The input type I is what you publish; the output type O is what you get back (typically a message ID or acknowledgment). This matters for Pulsar, where publishing returns a MessageId that you might want to log or track.
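A hypothetical in-memory implementation (illustration only, not part of the framework) makes the two type parameters tangible: you publish a String and get back a synthetic sequence number as the acknowledgment.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

interface ReactiveChannel extends AutoCloseable { void close(); }
interface ReadableReactiveChannel<V> extends ReactiveChannel { Flux<V> subscribe(); }
interface WritableReactiveChannel<I, O> extends ReactiveChannel { Mono<O> publish(I message); }

// Toy loopback channel: I = String (what you publish),
// O = Long (a sequence number returned as the acknowledgment).
class InMemoryChannel
        implements WritableReactiveChannel<String, Long>, ReadableReactiveChannel<String> {
    private final Sinks.Many<String> sink = Sinks.many().replay().all();
    private final AtomicLong sequence = new AtomicLong();

    @Override
    public Mono<Long> publish(String message) {
        return Mono.fromSupplier(() -> {
            sink.tryEmitNext(message);
            return sequence.incrementAndGet();
        });
    }

    @Override
    public Flux<String> subscribe() {
        return sink.asFlux();
    }

    @Override
    public void close() {
        sink.tryEmitComplete();
    }

    public static void main(String[] args) {
        InMemoryChannel channel = new InMemoryChannel();
        System.out.println(channel.publish("a").block()); // 1
        System.out.println(channel.publish("b").block()); // 2
        channel.close();
        List<String> received = channel.subscribe().collectList().block();
        System.out.println(received); // [a, b]
    }
}
```

Such an in-memory channel is also handy in tests: it satisfies both interfaces without a broker, so pipelines can be exercised with StepVerifier alone.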
Reactive Pulsar — Bridging Consumer and Producer to Reactor
The Pulsar queue module wraps the standard Apache Pulsar client (4.1.3) — Consumer<T> and Producer<T> — in the channel abstractions using Flux.create and Mono.fromFuture. No reactive adapter library is needed; the bridging is explicit and lightweight.
WritableReactiveChannel
public class PulsarWritableReactiveChannel<T>
implements WritableReactiveChannel<T, MessageId> {
private final Producer<T> producer;
public PulsarWritableReactiveChannel(Producer<T> producer) {
this.producer = producer;
}
@Override
public Mono<MessageId> publish(T message) {
return Mono.fromFuture(producer.sendAsync(message));
}
}
Mono.fromFuture bridges Pulsar’s CompletableFuture<MessageId> into Reactor — fully non-blocking, with the Mono completing when Pulsar acknowledges the write. The input type is T directly, not a wrapper — the channel accepts any payload that the producer’s Schema<T> can serialize.
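The bridge is easy to observe in isolation. Here is a small sketch (hypothetical FutureBridge class) with a plain CompletableFuture standing in for Pulsar's sendAsync:

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;

class FutureBridge {
    public static void main(String[] args) {
        // Stand-in for producer.sendAsync(message): completes later, off-thread.
        CompletableFuture<String> ack =
            CompletableFuture.supplyAsync(() -> "message-id-42");

        // Mono.fromFuture completes when the future does; no thread blocks
        // while waiting (block() here is only for the demo).
        String messageId = Mono.fromFuture(ack).block();
        System.out.println(messageId); // message-id-42
    }
}
```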
ReadableReactiveChannel
public class PulsarReadableReactiveChannel<T>
implements ReadableReactiveChannel<Message<T>> {
private final Consumer<T> consumer;
public PulsarReadableReactiveChannel(Consumer<T> consumer) {
this.consumer = consumer;
}
@Override
public Flux<Message<T>> subscribe() {
return Flux.<Message<T>>create(sink -> {
AtomicReference<Disposable> disposableRef = new AtomicReference<>(null);
sink.onRequest(requested -> {
Disposable d = fetchNext(sink)
.subscribeOn(Schedulers.boundedElastic())
.doOnComplete(sink::complete)
.subscribe();
disposableRef.set(d);
});
sink.onDispose(() -> {
disposableRef.getAndUpdate(d -> {
if (d != null) d.dispose();
return null;
});
});
}, FluxSink.OverflowStrategy.ERROR);
}
}
The same Flux.create pattern from the poller appears here — a FluxSink bridges push-based consumption into a demand-driven Reactor stream. The OverflowStrategy.ERROR signals downstream that it must keep up; there is no internal buffer silently absorbing messages.
The actual consumption happens in fetchNext:
private Flux<Message<T>> fetchNext(FluxSink<Message<T>> sink) {
return Mono.<Message<T>>create(s -> {
try {
// receive returns null on timeout; MonoSink.success(null) completes
// the Mono empty, which ends the stream (end of topic)
s.success(consumer.receive(100, TimeUnit.MILLISECONDS));
} catch (PulsarClientException e) {
s.error(e);
}
}).flux()
.flatMap(msg -> {
sink.next(msg);
if (sink.isCancelled() || sink.requestedFromDownstream() <= 0) {
return Flux.empty();
}
return fetchNext(sink); // recursive continuation
})
.onErrorResume(Exception.class, e -> {
sink.error(e);
return Mono.empty();
});
}
Three details matter:
- Timeout-based polling — consumer.receive(100, TimeUnit.MILLISECONDS) returns null when no message is available within the timeout. This is how end-of-topic detection works: a null result means the consumer has caught up, and the stream completes.
- Recursive continuation — when a message arrives and downstream still has demand, fetchNext calls itself. This is the same pattern as expand in the poller, but expressed as direct recursion within a flatMap.
- Back-pressure guards — sink.isCancelled() and requestedFromDownstream() <= 0 prevent fetching when there is no demand, just like the poller’s flight control.
Explicit Acknowledgment
The readable channel returns raw Message<T> objects — not unwrapped payloads. This is deliberate: the caller controls when to acknowledge:
public Mono<Void> acknowledge(Message<T> message) {
return Mono.fromFuture(consumer.acknowledgeAsync(message));
}
public void negativeAcknowledge(Message<T> message) {
consumer.negativeAcknowledge(message);
}
Positive acknowledgment is async via Mono.fromFuture, letting you chain it into the pipeline. Negative acknowledgment is synchronous — it tells Pulsar to redeliver the message (or route to a dead-letter topic, depending on configuration). This separation gives you retry semantics without framework magic.
Wiring It Up
Setting up the Pulsar channels uses the standard Pulsar client API:
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// Reader — wraps a Pulsar Consumer
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://public/default/bot-updates")
.subscriptionName("bot-worker")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(1)
.subscribe();
var reader = new PulsarReadableReactiveChannel<>(consumer);
// Sender — wraps a Pulsar Producer
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://public/default/bot-responses")
.create();
var sender = new PulsarWritableReactiveChannel<>(producer);
A few configuration choices worth noting:
- SubscriptionType.Shared — messages are distributed across all consumers with the same subscription name. This is how you scale workers horizontally: start more instances, and Pulsar balances the load.
- receiverQueueSize(1) — limits the internal prefetch buffer to one message. This gives the reactive channel explicit control over when to fetch the next message, rather than letting Pulsar prefetch a batch that might sit unprocessed.
- SubscriptionInitialPosition.Earliest — starts consuming from the beginning of the topic. For production, you would typically use Latest or rely on the cursor position from a durable subscription.
Putting It All Together
Here is how the full reactive pipeline looks when all components are assembled:
Direct pipeline — everything in one process:
TelegramBotClient client = new TelegramBotClient(botToken);
TelegramBotUpdatePoller poller = new TelegramBotUpdatePoller(client, 100, 30,
List.of("message", "callback_query"));
TelegramBotReplier replier = new TelegramBotReplier(client);
poller.subscribe()
.filter(update -> update.message() != null)
.flatMap(update -> {
long chatId = update.message().chat().id();
return replier.dispatch(
new BotResponse.SendMessage(chatId, "Processed: " + update.message().text())
);
})
.onErrorResume(e -> {
log.error("Pipeline error", e);
return Mono.empty();
})
.retry()
.subscribe();
The entire pipeline is a single Flux chain — poll, filter, process, dispatch. No handler interfaces, no framework callbacks. Standard Reactor composition gives you full control over error handling, retry strategy, and concurrency.
Distributed pipeline via Pulsar — split across services:
// Service 1: Poller — fetches updates and publishes to Pulsar
poller.subscribe()
.flatMap(update -> updateSender.publish(update))
.retry()
.subscribe();
// Service 2: Worker — reads from Pulsar, processes, writes results back
updateReader.subscribe()
.flatMap(msg -> {
Update update = msg.getValue();
long chatId = update.message().chat().id();
BotResponse response = new BotResponse.SendMessage(chatId, "Processed");
return responseSender.publish(response)
.then(updateReader.acknowledge(msg)); // ack only after the result is published
})
.retry()
.subscribe();
// Service 3: Replier — reads responses from Pulsar, dispatches to Telegram
responseReader.subscribe()
.flatMap(msg -> replier.dispatch(msg.getValue())
.then(responseReader.acknowledge(msg))) // ack only after successful dispatch
.retry()
.subscribe();
Each service is a standalone Flux pipeline. The worker explicitly acknowledges each message after processing — if it crashes before acknowledging, Pulsar redelivers the message to another consumer. For per-conversation ordering, configure the producer with a message key (producer.newMessage().key(String.valueOf(chatId)).value(update).sendAsync()) and use SubscriptionType.Key_Shared on the consumer side. Scaling workers is a matter of starting more instances — Pulsar distributes messages across them automatically.
Testing Reactive Pipelines
Reactor Test’s StepVerifier is indispensable for testing reactive code. The framework uses three testing layers:
Unit tests — verify pipeline logic by testing each flatMap stage in isolation:
@Test
void shouldProcessTextMessage() {
Update update = createTestUpdate("hello");
Mono<BotResponse> result = Mono.just(update)
.filter(u -> u.message() != null && u.message().text() != null)
.map(u -> new BotResponse.SendMessage(
u.message().chat().id(), "Echo: " + u.message().text()));
StepVerifier.create(result)
.assertNext(r -> assertEquals("Echo: hello",
((BotResponse.SendMessage) r).text()))
.verifyComplete();
}
Integration tests — HTTP client against MockServer:
@Test
void shouldHandleLongPollTimeout() {
mockServer.when(request().withPath("/bot.*/getUpdates"))
.respond(response().withDelay(TimeUnit.SECONDS, 35));
StepVerifier.create(poller.subscribe().take(Duration.ofSeconds(5)))
.expectComplete()
.verify();
}
End-to-end tests — Pulsar channels with Docker Compose:
@Test
void publishAndSubscribe_singleMessage() {
StepVerifier.create(
sender.publish("hello").then()
).verifyComplete();
StepVerifier.create(reader.subscribe().take(1))
.assertNext(received -> {
assertEquals("hello", received.getValue());
})
.expectComplete()
.verify(Duration.ofSeconds(30));
}
And a multi-message test with explicit acknowledgment:
@Test
void publishAndSubscribe_multipleMessages() {
for (int i = 0; i < 4; i++) {
StepVerifier.create(sender.publish("hello-%d".formatted(i)))
.expectNextCount(1)
.verifyComplete();
}
StepVerifier.create(reader.subscribe()
.flatMap(msg -> reader.acknowledge(msg).thenReturn(msg.getValue()))
.take(4))
.expectNext("hello-0")
.expectNext("hello-1")
.expectNext("hello-2")
.expectNext("hello-3")
.expectComplete()
.verify(Duration.ofSeconds(10));
}
Conclusion
The gap between describing a reactive architecture and implementing one is where most of the interesting decisions live. Flux.create with expand gives us demand-driven polling that respects back-pressure. Atomic variables provide thread-safe state management without locks. Sealed interfaces with pattern matching make response dispatch exhaustive at compile time. And the Pulsar channel wrappers bridge Consumer/Producer into Reactor streams with explicit acknowledgment control — keeping the integration lightweight while preserving back-pressure semantics end-to-end.
The framework is open-source and available on GitHub.