Realtime WebSocket SDK
This document describes how to connect to the TGM Expert WebSocket server from any JavaScript/TypeScript client (Angular, React, Vue, vanilla JS) and subscribe to real-time events: entity CRUD changes, dynamic queue messages, IoT events, notifications, and more.
Table of Contents
- Quick Start
- Connection
- Authentication
- Entity Changes (Real-Time CRUD)
- Dynamic Queue Messages
- IoT Events
- Notifications
- Calendar Updates
- All STOMP Destinations Reference
- Angular Integration (RxStomp)
- React Integration (@stomp/stompjs)
- Error Handling & Reconnection
- Troubleshooting
Quick Start
Install the STOMP client:
# For any framework
npm install @stomp/stompjs sockjs-client
# Angular-specific (optional, wraps stompjs in RxJS)
npm install @stomp/rx-stomp
Minimal working example:
import { Client } from '@stomp/stompjs';
import SockJS from 'sockjs-client';
const client = new Client({
webSocketFactory: () => new SockJS('http://localhost:1337/ws'),
connectHeaders: {
Authorization: 'Bearer <your-jwt-token>',
},
onConnect: () => {
console.log('Connected');
// Listen to all entity changes
client.subscribe('/topic/entities', (message) => {
const event = JSON.parse(message.body);
console.log(`${event.action} on ${event.resourceName}:`, event.entityId);
});
},
});
client.activate();
That's it. Whenever an entity is created, updated, or deleted via the API, the client receives the change event in real time.
Connection
Endpoints
| Endpoint | Transport | Use Case |
|---|---|---|
| /ws | WebSocket + SockJS | General purpose |
| /ws/chat | WebSocket + SockJS | Chat features |
| /ws-raw | Raw WebSocket (no SockJS) | Native WS clients |
SockJS (recommended for browsers)
import { Client } from '@stomp/stompjs';
import SockJS from 'sockjs-client';
const client = new Client({
webSocketFactory: () => new SockJS('http://localhost:1337/ws'),
});
Raw WebSocket (mobile/native clients)
const client = new Client({
brokerURL: 'ws://localhost:1337/ws-raw',
});
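The two transports take different URL schemes: SockJS is given an http(s) URL, while brokerURL needs ws(s). A minimal sketch of deriving both from one base URL follows. The helper name endpointUrl is hypothetical, and it assumes the server is reachable over TLS as wss:// when the base URL is https://, which this document does not confirm.

```typescript
// Hypothetical helper (not part of the SDK): build the endpoint URL for a transport.
type Transport = 'sockjs' | 'raw';

function endpointUrl(baseUrl: string, transport: Transport): string {
  const base = baseUrl.replace(/\/+$/, ''); // drop trailing slashes
  if (transport === 'sockjs') {
    // SockJS takes an http(s) URL and negotiates the transport itself.
    return `${base}/ws`;
  }
  // Raw WebSocket needs a ws(s) URL: http -> ws, https -> wss.
  return `${base.replace(/^http/, 'ws')}/ws-raw`;
}

console.log(endpointUrl('http://localhost:1337', 'sockjs'));
console.log(endpointUrl('http://localhost:1337/', 'raw'));
```

The results can be passed to new SockJS(...) and to brokerURL respectively.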
Authentication
The server authenticates STOMP connections via JWT. Pass the token in the CONNECT headers:
const client = new Client({
webSocketFactory: () => new SockJS('http://localhost:1337/ws'),
connectHeaders: {
Authorization: 'Bearer <jwt-token>',
},
});
The token is validated by ChatWebSocketAuthHandler on the STOMP CONNECT frame. If the token is missing or invalid, the connection is still accepted, but as an anonymous session, so user-specific queues (/user/queue/...) will not work.
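Because a missing token does not cause the connection to be rejected, it can help to make the anonymous case explicit on the client. The sketch below builds the connectHeaders object and omits Authorization entirely when there is no token. buildConnectHeaders is a hypothetical helper name, not part of the SDK.

```typescript
// Hypothetical helper: returns headers for the STOMP CONNECT frame.
// An empty object means the session will be anonymous.
function buildConnectHeaders(token?: string | null): Record<string, string> {
  const trimmed = token?.trim();
  return trimmed ? { Authorization: `Bearer ${trimmed}` } : {};
}

const headers = buildConnectHeaders('abc.def.ghi');
const isAnonymous = Object.keys(headers).length === 0;
console.log(headers, isAnonymous);
```

A caller can check for the empty object before subscribing to /user/queue/... destinations, rather than waiting for messages that never arrive.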
Entity Changes (Real-Time CRUD)
When any entity (inspection, intervention, unit, component, failure, location, work order, etc.) is created, updated, or deleted via BaseApiController, a real-time event is broadcast to WebSocket.
No configuration needed. This works automatically for every entity type managed by BaseApiController.
STOMP Destinations
| Destination | Description |
|---|---|
| /topic/entities | All entity changes (firehose) |
| /topic/entities/{resourceName} | Changes for a specific resource only |
The {resourceName} matches the API resource name (plural): inspections, interventions, units, components, failures, locations, work_orders, gate_systems, etc.
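As a small illustration of the naming rule, the following sketch builds the destination string from a resource name. entityTopic is a hypothetical helper, and the validation pattern assumes resource names are lowercase snake_case, as in the examples listed above. The server may accept other forms.

```typescript
// Hypothetical helper: destination for all entities, or for one resource type.
function entityTopic(resourceName?: string): string {
  if (resourceName === undefined) return '/topic/entities'; // firehose
  // Assumption: names look like "inspections" or "work_orders".
  if (!/^[a-z][a-z0-9_]*$/.test(resourceName)) {
    throw new Error(`Unexpected resource name: ${resourceName}`);
  }
  return `/topic/entities/${resourceName}`;
}

console.log(entityTopic());
console.log(entityTopic('work_orders'));
```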
Payload Format
{
"resourceName": "inspections",
"action": "created",
"entityId": 42,
"timestamp": "2026-02-17T14:30:00.123",
"data": {
"id": 42,
"titleOfInspection": "Annual Turbine Check",
"status": "PENDING",
"...": "full entity object"
}
}
| Field | Type | Description |
|---|---|---|
| resourceName | string | API resource name (plural), e.g. inspections, units |
| action | string | "created", "updated", or "deleted" |
| entityId | number | The entity ID |
| timestamp | string | ISO timestamp of the event |
| data | object or null | Full entity object on create/update. null on delete. |
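Since message bodies arrive as plain strings, a client may want to validate them against the field table before use. The sketch below is one way to do that with a type guard. The names isEntityChangeEvent and parseEntityChange are hypothetical, and the checks mirror only the fields documented above.

```typescript
type EntityAction = 'created' | 'updated' | 'deleted';

interface EntityChangeEvent {
  resourceName: string;
  action: EntityAction;
  entityId: number;
  timestamp: string;
  data: Record<string, unknown> | null;
}

// Runtime check that a parsed value has the documented shape.
function isEntityChangeEvent(value: unknown): value is EntityChangeEvent {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.resourceName === 'string' &&
    (v.action === 'created' || v.action === 'updated' || v.action === 'deleted') &&
    typeof v.entityId === 'number' &&
    typeof v.timestamp === 'string' &&
    (v.data === null || typeof v.data === 'object')
  );
}

// Returns null instead of throwing on malformed JSON or an unexpected shape.
function parseEntityChange(body: string): EntityChangeEvent | null {
  try {
    const parsed: unknown = JSON.parse(body);
    return isEntityChangeEvent(parsed) ? parsed : null;
  } catch {
    return null;
  }
}
```

Inside a subscription callback this would be called as parseEntityChange(message.body), with null results skipped or logged.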
Example: Subscribe to All Entity Changes
client.subscribe('/topic/entities', (message) => {
const event = JSON.parse(message.body);
switch (event.action) {
case 'created':
console.log(`New ${event.resourceName} created:`, event.data);
break;
case 'updated':
console.log(`${event.resourceName} #${event.entityId} updated:`, event.data);
break;
case 'deleted':
console.log(`${event.resourceName} #${event.entityId} deleted`);
break;
}
});
Example: Subscribe to a Specific Entity Type
// Only inspection changes
client.subscribe('/topic/entities/inspections', (message) => {
const event = JSON.parse(message.body);
console.log('Inspection', event.action, event.data);
});
// Only unit changes
client.subscribe('/topic/entities/units', (message) => {
const event = JSON.parse(message.body);
console.log('Unit', event.action, event.data);
});
Example: Keep a Local List in Sync
let inspections: any[] = [];
// Load initial data from REST API
fetch('/api/inspections').then(r => r.json()).then(data => {
inspections = data.data;
});
// Keep it in sync via WebSocket
client.subscribe('/topic/entities/inspections', (message) => {
const event = JSON.parse(message.body);
switch (event.action) {
case 'created':
inspections.push(event.data);
break;
case 'updated':
const idx = inspections.findIndex(i => i.id === event.entityId);
if (idx >= 0) inspections[idx] = event.data;
break;
case 'deleted':
inspections = inspections.filter(i => i.id !== event.entityId);
break;
}
});
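The sync logic above can also be written as a pure function, which makes it easier to test and to reuse in a state store. The sketch below additionally treats create and update as an upsert, on the assumption that a "created" event may arrive for an entity that the initial REST fetch already returned (the two are not ordered relative to each other). applyEntityChange is a hypothetical name.

```typescript
interface Entity { id: number }

interface Change<T extends Entity> {
  action: 'created' | 'updated' | 'deleted';
  entityId: number;
  data: T | null;
}

// Returns a new list. The input list is never mutated.
function applyEntityChange<T extends Entity>(list: readonly T[], event: Change<T>): T[] {
  if (event.action === 'deleted') {
    return list.filter((item) => item.id !== event.entityId);
  }
  const data = event.data;
  if (data === null) return [...list]; // create/update without data: ignore
  const exists = list.some((item) => item.id === event.entityId);
  return exists
    ? list.map((item) => (item.id === event.entityId ? data : item)) // replace in place
    : [...list, data]; // append if not yet known
}
```

In the subscription callback, this would be used as inspections = applyEntityChange(inspections, event).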
Dynamic Queue Messages
When a message is consumed from a RabbitMQ dynamic queue and successfully processed by its handler, it is broadcast to WebSocket. This lets frontend clients receive queue messages without connecting to RabbitMQ directly.
Prerequisites: The queue must exist in RabbitMQ and have an active listener. Create queues via POST /api/admin/queues.
STOMP Destinations
| Destination | Description |
|---|---|
| /topic/queues | All queue messages (firehose) |
| /topic/queues/{queueName} | Messages from a specific queue only |
The {queueName} is the queue name as defined in QueueConfiguration.name — e.g. tgm_iot, tgm_notifications, tgm_erp_sync, or any custom queue you create.
Payload Format
Queue-specific (/topic/queues/{queueName}):
{
"queueName": "tgm_iot",
"handlerType": "iot",
"routingKey": "tgm.iot",
"timestamp": "2026-02-17T14:30:00.456",
"payload": {
"sensorId": 123,
"value": 75.5,
"alertLevel": "warning"
}
}
Firehose (/topic/queues):
{
"queueName": "tgm_iot",
"data": {
"queueName": "tgm_iot",
"handlerType": "iot",
"routingKey": "tgm.iot",
"timestamp": "2026-02-17T14:30:00.456",
"payload": { "..." }
}
}
| Field | Type | Description |
|---|---|---|
| queueName | string | The RabbitMQ queue name |
| handlerType | string | Handler that processed it: generic, iot, notification, erp, webhook |
| routingKey | string | The RabbitMQ routing key the message was published with |
| timestamp | string | ISO timestamp of when it was broadcast |
| payload | object or null | The original message payload from RabbitMQ (JSON) |
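The firehose wraps each message in an extra { queueName, data } envelope, while the queue-specific destination delivers the message directly. A handler shared between both destinations can normalize the two shapes first. The sketch below assumes the shapes are exactly as documented above. unwrapQueueMessage is a hypothetical name.

```typescript
interface QueueMessageEvent {
  queueName: string;
  handlerType: string;
  routingKey: string;
  timestamp: string;
  payload: unknown;
}

interface QueueFirehoseEnvelope {
  queueName: string;
  data: QueueMessageEvent;
}

// Returns the inner message regardless of which destination delivered it.
function unwrapQueueMessage(body: QueueMessageEvent | QueueFirehoseEnvelope): QueueMessageEvent {
  // Only the firehose envelope has a `data` property.
  return 'data' in body ? body.data : body;
}
```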
Example: Subscribe to a Specific Queue
// Listen to IoT queue messages
client.subscribe('/topic/queues/tgm_iot', (message) => {
const event = JSON.parse(message.body);
console.log('IoT message:', event.payload);
});
// Listen to ERP sync messages
client.subscribe('/topic/queues/tgm_erp_sync', (message) => {
const event = JSON.parse(message.body);
console.log('ERP sync:', event.payload);
});
Example: Subscribe to All Queue Messages
client.subscribe('/topic/queues', (message) => {
const event = JSON.parse(message.body);
console.log(`Message from queue "${event.queueName}":`, event.data);
});
Example: Custom Queue (Created at Runtime)
// 1. Create a custom queue via the admin API
const response = await fetch('/api/admin/queues', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer <token>',
},
body: JSON.stringify({
name: 'my_alerts_queue',
displayName: 'My Custom Alerts',
routingKey: 'tgm.alerts.critical',
handlerType: 'generic',
isEnabled: true,
autoStart: true,
}),
});
// 2. Subscribe to it immediately — no code changes, no restart
client.subscribe('/topic/queues/my_alerts_queue', (message) => {
const event = JSON.parse(message.body);
console.log('Custom alert:', event.payload);
});
// 3. Any message published to RabbitMQ with routing key "tgm.alerts.critical"
// will now appear here in real-time.
IoT Events
IoT events from the static RabbitMQListener (not dynamic queues) are broadcast here.
STOMP Destination
| Destination | Description |
|---|---|
| /topic/iot-event | All IoT events |
Example
client.subscribe('/topic/iot-event', (message) => {
const event = JSON.parse(message.body);
console.log(`Sensor ${event.sensorId}: ${event.value} (${event.alertLevel})`);
});
Notifications
User-specific notifications. Requires authentication — the server routes based on user ID.
STOMP Destination
| Destination | Description |
|---|---|
| /user/queue/notifications | Notifications for the authenticated user |
Example
client.subscribe('/user/queue/notifications', (message) => {
const notification = JSON.parse(message.body);
console.log(`[${notification.title}] ${notification.message}`);
showToast(notification);
});
Calendar Updates
User-specific calendar events (e.g. when an inspection or intervention is scheduled).
STOMP Destination
| Destination | Description |
|---|---|
| /user/queue/calendar | Calendar updates for the authenticated user |
Payload Format
{
"action": "created",
"entityType": "inspection",
"entityId": 42,
"timestamp": "2026-02-17T14:30:00"
}
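The timestamp values shown in these payloads carry no UTC offset, and JavaScript's Date parses such date-time strings as local time. If the server emits UTC (an assumption: this document does not state the server's time zone), the client has to say so explicitly. A minimal sketch, with the hypothetical helper parseEventTimestamp:

```typescript
// Hypothetical helper: parse an event timestamp, optionally treating
// offset-less values as UTC instead of the browser's local time.
function parseEventTimestamp(timestamp: string, assumeUtc: boolean): Date {
  // True if the string already ends in "Z" or an offset such as "+02:00".
  const hasOffset = /(Z|[+-]\d{2}:?\d{2})$/.test(timestamp);
  return new Date(hasOffset || !assumeUtc ? timestamp : `${timestamp}Z`);
}

const when = parseEventTimestamp('2026-02-17T14:30:00', true);
console.log(when.toISOString()); // 2026-02-17T14:30:00.000Z
```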
Example
client.subscribe('/user/queue/calendar', (message) => {
const event = JSON.parse(message.body);
console.log(`Calendar: ${event.action} ${event.entityType} #${event.entityId}`);
refreshCalendar();
});
All STOMP Destinations Reference
Broadcast Topics (anyone can subscribe)
| Destination | Source | Description |
|---|---|---|
| /topic/entities | WebSocketEntityBroadcastListener | All entity CRUD events |
| /topic/entities/{resourceName} | WebSocketEntityBroadcastListener | CRUD events for one resource type |
| /topic/queues | DynamicQueueService | All dynamic queue messages |
| /topic/queues/{queueName} | DynamicQueueService | Messages from one queue |
| /topic/iot-event | RabbitMQListener | IoT sensor events |
| /topic/{entityType} | RabbitMQListener (generic) | Legacy entity updates via RabbitMQ |
| /topic/reservoir/{id}/sensors | ReservoirWebSocketHandler | Reservoir sensor data stream |
| /topic/reservoir/{id}/status | ReservoirWebSocketHandler | Reservoir status/alerts |
User-Specific Queues (requires authentication)
| Destination | Source | Description |
|---|---|---|
| /user/queue/notifications | WebSocketService | User notifications |
| /user/queue/calendar | WebSocketService | Calendar updates |
| /user/queue/chat/messages | ChatWebSocketHandler | Incoming chat messages |
| /user/queue/chat/typing | ChatWebSocketHandler | Typing indicators |
| /user/queue/chat/presence | ChatWebSocketHandler | Presence updates |
| /user/queue/chat/receipts | ChatWebSocketHandler | Read receipts |
| /user/queue/chat/reactions | ChatWebSocketHandler | Reaction updates |
| /user/queue/video/incoming-call | VideoCallWebSocketHandler | Incoming video call |
| /user/queue/video/call-answered | VideoCallWebSocketHandler | Call answered |
| /user/queue/video/call-ended | VideoCallWebSocketHandler | Call ended |
Application Endpoints (send messages)
| Destination | Handler | Description |
|---|---|---|
| /app/chat.send | ChatWebSocketHandler | Send chat message |
| /app/chat.typing | ChatWebSocketHandler | Send typing indicator |
| /app/chat.markRead | ChatWebSocketHandler | Mark messages as read |
| /app/chat.reaction | ChatWebSocketHandler | Add/remove reaction |
| /app/reservoir.subscribe | ReservoirWebSocketHandler | Subscribe to reservoir data |
| /app/video.call-init | VideoCallWebSocketHandler | Initiate video call |
| /app/video.call-answer | VideoCallWebSocketHandler | Answer video call |
| /app/video.call-end | VideoCallWebSocketHandler | End video call |
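The three tables differ by prefix: /topic/ for broadcasts, /user/queue/ for per-user delivery, and /app/ for sending. A client can use that to catch mistakes early, for example subscribing to a user queue while anonymous. The sketch below is illustrative only. destinationKind and canSubscribe are hypothetical names, and the rules are inferred from the prefixes in the tables above.

```typescript
type DestinationKind = 'broadcast' | 'user' | 'send' | 'unknown';

// Classify a destination by its prefix.
function destinationKind(destination: string): DestinationKind {
  if (destination.startsWith('/topic/')) return 'broadcast';
  if (destination.startsWith('/user/queue/')) return 'user';
  if (destination.startsWith('/app/')) return 'send';
  return 'unknown';
}

// Broadcast topics are open to everyone. User queues need an authenticated
// session. /app/ destinations are for sending, not subscribing.
function canSubscribe(destination: string, authenticated: boolean): boolean {
  const kind = destinationKind(destination);
  return kind === 'broadcast' || (kind === 'user' && authenticated);
}

console.log(canSubscribe('/user/queue/notifications', false)); // false
```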
Angular Integration (RxStomp)
Service
import { Injectable, OnDestroy } from '@angular/core';
import { RxStomp, RxStompConfig } from '@stomp/rx-stomp';
import { Observable, BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';
import SockJS from 'sockjs-client';
export interface EntityChangeEvent<T = any> {
resourceName: string;
action: 'created' | 'updated' | 'deleted';
entityId: number;
timestamp: string;
data: T | null;
}
export interface QueueMessageEvent<T = any> {
queueName: string;
handlerType: string;
routingKey: string;
timestamp: string;
payload: T | null;
}
@Injectable({ providedIn: 'root' })
export class RealtimeService implements OnDestroy {
private rxStomp = new RxStomp();
private connected$ = new BehaviorSubject<boolean>(false);
/** Connect to the WebSocket server. Call once at app startup. */
connect(token: string, serverUrl = 'http://localhost:1337'): void {
const config: RxStompConfig = {
webSocketFactory: () => new SockJS(`${serverUrl}/ws`),
connectHeaders: { Authorization: `Bearer ${token}` },
heartbeatIncoming: 10000,
heartbeatOutgoing: 10000,
reconnectDelay: 5000,
};
this.rxStomp.configure(config);
this.rxStomp.activate();
// Mirror the actual connection state, so dropped connections and reconnects are reflected too.
this.rxStomp.connectionState$.subscribe(() => this.connected$.next(this.rxStomp.connected()));
}
/** Disconnect from the WebSocket server. */
disconnect(): void {
this.rxStomp.deactivate();
this.connected$.next(false);
}
/** Observable of connection status. */
isConnected(): Observable<boolean> {
return this.connected$.asObservable();
}
// ─── Entity Changes ──────────────────────────────────────────────
/** Subscribe to ALL entity changes. */
allEntityChanges(): Observable<EntityChangeEvent> {
return this.rxStomp.watch('/topic/entities').pipe(
map(msg => JSON.parse(msg.body))
);
}
/** Subscribe to changes for a specific resource type. */
entityChanges<T = any>(resourceName: string): Observable<EntityChangeEvent<T>> {
return this.rxStomp.watch(`/topic/entities/${resourceName}`).pipe(
map(msg => JSON.parse(msg.body))
);
}
// ─── Dynamic Queues ──────────────────────────────────────────────
/** Subscribe to ALL dynamic queue messages. */
allQueueMessages(): Observable<{ queueName: string; data: QueueMessageEvent }> {
return this.rxStomp.watch('/topic/queues').pipe(
map(msg => JSON.parse(msg.body))
);
}
/** Subscribe to messages from a specific queue. */
queueMessages<T = any>(queueName: string): Observable<QueueMessageEvent<T>> {
return this.rxStomp.watch(`/topic/queues/${queueName}`).pipe(
map(msg => JSON.parse(msg.body))
);
}
// ─── IoT ─────────────────────────────────────────────────────────
/** Subscribe to IoT sensor events. */
iotEvents(): Observable<any> {
return this.rxStomp.watch('/topic/iot-event').pipe(
map(msg => JSON.parse(msg.body))
);
}
// ─── User-specific ───────────────────────────────────────────────
/** Subscribe to notifications for the authenticated user. */
notifications(): Observable<any> {
return this.rxStomp.watch('/user/queue/notifications').pipe(
map(msg => JSON.parse(msg.body))
);
}
/** Subscribe to calendar updates for the authenticated user. */
calendarUpdates(): Observable<any> {
return this.rxStomp.watch('/user/queue/calendar').pipe(
map(msg => JSON.parse(msg.body))
);
}
ngOnDestroy(): void {
this.disconnect();
}
}
Usage in a Component
import { Component, OnInit, OnDestroy } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { RealtimeService, EntityChangeEvent } from './realtime.service';
@Component({
selector: 'app-inspection-list',
template: `
<div *ngFor="let insp of inspections" [class.flash]="flashIds.has(insp.id)">
{{ insp.titleOfInspection }} — {{ insp.status }}
</div>
`,
})
export class InspectionListComponent implements OnInit, OnDestroy {
inspections: any[] = [];
flashIds = new Set<number>();
private destroy$ = new Subject<void>();
constructor(
private realtimeService: RealtimeService,
private http: HttpClient,
) {}
ngOnInit(): void {
// 1. Load initial data
this.http.get<any>('/api/inspections').subscribe(res => {
this.inspections = res.data;
});
// 2. Subscribe to real-time inspection changes
this.realtimeService
.entityChanges('inspections')
.pipe(takeUntil(this.destroy$))
.subscribe(event => this.handleChange(event));
}
private handleChange(event: EntityChangeEvent): void {
switch (event.action) {
case 'created':
this.inspections.unshift(event.data);
this.flash(event.entityId);
break;
case 'updated':
const i = this.inspections.findIndex(x => x.id === event.entityId);
if (i >= 0) this.inspections[i] = event.data;
this.flash(event.entityId);
break;
case 'deleted':
this.inspections = this.inspections.filter(x => x.id !== event.entityId);
break;
}
}
private flash(id: number): void {
this.flashIds.add(id);
setTimeout(() => this.flashIds.delete(id), 2000);
}
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
}
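Usage: Queue Monitor Component
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { RealtimeService } from './realtime.service';
@Component({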
selector: 'app-queue-monitor',
template: `
<h3>Live Queue Messages</h3>
<div *ngFor="let msg of messages">
[{{ msg.queueName }}] {{ msg.handlerType }} — {{ msg.routingKey }}
<pre>{{ msg.payload | json }}</pre>
</div>
`,
})
export class QueueMonitorComponent implements OnInit, OnDestroy {
messages: any[] = [];
private destroy$ = new Subject<void>();
constructor(private realtimeService: RealtimeService) {}
ngOnInit(): void {
// Listen to ALL dynamic queue messages
this.realtimeService
.allQueueMessages()
.pipe(takeUntil(this.destroy$))
.subscribe(event => {
this.messages.unshift(event.data);
if (this.messages.length > 100) this.messages.pop();
});
}
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
}
Usage: Listen to a Custom Queue Created at Runtime
ngOnInit(): void {
// This queue was created via POST /api/admin/queues with name "my_alerts_queue".
// No code changes were needed on the backend. Just subscribe:
this.realtimeService
.queueMessages('my_alerts_queue')
.pipe(takeUntil(this.destroy$))
.subscribe(event => {
console.log('Alert payload:', event.payload);
});
}
React Integration (@stomp/stompjs)
Hook
// useRealtime.ts
import { useEffect, useRef, useCallback, useState } from 'react';
import { Client, IMessage } from '@stomp/stompjs';
import SockJS from 'sockjs-client';
interface EntityChangeEvent<T = any> {
resourceName: string;
action: 'created' | 'updated' | 'deleted';
entityId: number;
timestamp: string;
data: T | null;
}
interface QueueMessageEvent<T = any> {
queueName: string;
handlerType: string;
routingKey: string;
timestamp: string;
payload: T | null;
}
type Listener = {
  destination: string;
  callback: (body: any) => void;
  sub?: { unsubscribe: () => void };
};

export function useStompClient(serverUrl: string, token: string) {
  const clientRef = useRef<Client | null>(null);
  // Registered listeners, kept in a ref so they can be (re)subscribed on every connect.
  const listenersRef = useRef(new Set<Listener>());
  const [connected, setConnected] = useState(false);

  useEffect(() => {
    const client = new Client({
      webSocketFactory: () => new SockJS(`${serverUrl}/ws`),
      connectHeaders: { Authorization: `Bearer ${token}` },
      reconnectDelay: 5000,
      heartbeatIncoming: 10000,
      heartbeatOutgoing: 10000,
      onConnect: () => {
        // STOMP subscriptions belong to one connection: re-create them on every (re)connect.
        listenersRef.current.forEach((l) => {
          l.sub = client.subscribe(l.destination, (message: IMessage) => {
            l.callback(JSON.parse(message.body));
          });
        });
        setConnected(true);
      },
      onDisconnect: () => setConnected(false),
      onWebSocketClose: () => setConnected(false),
      onStompError: () => setConnected(false),
    });
    client.activate();
    clientRef.current = client;
    return () => {
      clientRef.current = null;
      client.deactivate();
    };
  }, [serverUrl, token]);

  const subscribe = useCallback(
    (destination: string, callback: (body: any) => void) => {
      const listener: Listener = { destination, callback };
      listenersRef.current.add(listener);
      const client = clientRef.current;
      // If already connected, subscribe now. Otherwise onConnect will do it.
      if (client?.connected) {
        listener.sub = client.subscribe(destination, (message: IMessage) => {
          callback(JSON.parse(message.body));
        });
      }
      return () => {
        listenersRef.current.delete(listener);
        // Only send UNSUBSCRIBE while the connection is still open.
        if (clientRef.current?.connected) listener.sub?.unsubscribe();
      };
    },
    [],
  );

  return { connected, subscribe };
}
// ─── Convenience hooks ─────────────────────────────────────────────
// These hooks depend on the stable `subscribe` function rather than on `client`:
// the object returned by useStompClient is re-created on every render, which
// would otherwise tear down and re-create the subscription each time.
export function useEntityChanges<T = any>(
  client: ReturnType<typeof useStompClient>,
  resourceName: string,
  onEvent: (event: EntityChangeEvent<T>) => void,
) {
  const { subscribe } = client;
  useEffect(() => {
    return subscribe(`/topic/entities/${resourceName}`, onEvent);
  }, [subscribe, resourceName, onEvent]);
}
export function useAllEntityChanges(
  client: ReturnType<typeof useStompClient>,
  onEvent: (event: EntityChangeEvent) => void,
) {
  const { subscribe } = client;
  useEffect(() => {
    return subscribe('/topic/entities', onEvent);
  }, [subscribe, onEvent]);
}
export function useQueueMessages<T = any>(
  client: ReturnType<typeof useStompClient>,
  queueName: string,
  onEvent: (event: QueueMessageEvent<T>) => void,
) {
  const { subscribe } = client;
  useEffect(() => {
    return subscribe(`/topic/queues/${queueName}`, onEvent);
  }, [subscribe, queueName, onEvent]);
}
Usage in a React Component
function InspectionList() {
const [inspections, setInspections] = useState<any[]>([]);
const stomp = useStompClient('http://localhost:1337', authToken);
// Load initial data
useEffect(() => {
fetch('/api/inspections')
.then(r => r.json())
.then(res => setInspections(res.data));
}, []);
// Subscribe to real-time inspection changes
useEntityChanges(stomp, 'inspections', useCallback((event) => {
setInspections(prev => {
switch (event.action) {
case 'created':
return [event.data, ...prev];
case 'updated':
return prev.map(i => i.id === event.entityId ? event.data : i);
case 'deleted':
return prev.filter(i => i.id !== event.entityId);
default:
return prev;
}
});
}, []));
return (
<ul>
{inspections.map(i => (
<li key={i.id}>{i.titleOfInspection} — {i.status}</li>
))}
</ul>
);
}
Usage: Queue Monitor
function QueueMonitor({ queueName }: { queueName: string }) {
const [messages, setMessages] = useState<any[]>([]);
const stomp = useStompClient('http://localhost:1337', authToken);
useQueueMessages(stomp, queueName, useCallback((event) => {
setMessages(prev => [event, ...prev].slice(0, 100));
}, []));
return (
<div>
<h3>Messages from: {queueName}</h3>
{messages.map((msg, i) => (
<pre key={i}>{JSON.stringify(msg.payload, null, 2)}</pre>
))}
</div>
);
}
// Usage: <QueueMonitor queueName="tgm_iot" />
// Or for a custom queue: <QueueMonitor queueName="my_alerts_queue" />
Error Handling & Reconnection
The @stomp/stompjs client handles reconnection automatically via reconnectDelay.
const client = new Client({
// ...
reconnectDelay: 5000, // Retry every 5 seconds
heartbeatIncoming: 10000, // Server pings every 10s
heartbeatOutgoing: 10000, // Client pings every 10s
onStompError: (frame) => {
console.error('STOMP error:', frame.headers['message']);
},
onWebSocketClose: (event) => {
console.warn('WebSocket closed, will reconnect...');
},
});
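Subscriptions do not survive a reconnect on their own when using @stomp/stompjs directly: each new connection starts with none. Create subscriptions inside onConnect (as in the Quick Start) so they are re-established every time the client reconnects. With RxStomp, watch() re-subscribes automatically after a reconnect.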
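A fixed reconnectDelay retries at a constant interval. If the server may be down for longer periods, a growing delay is gentler on it. The sketch below only computes such a delay. backoffDelay is a hypothetical helper, and wiring it in (for example by assigning the result to client.reconnectDelay from onWebSocketClose while counting attempts) is an assumption you should verify against your @stomp/stompjs version.

```typescript
// Hypothetical helper: exponential backoff, capped at maxMs.
// attempt 0 -> baseMs, attempt 1 -> 2 * baseMs, attempt 2 -> 4 * baseMs, ...
function backoffDelay(attempt: number, baseMs = 5000, maxMs = 60000): number {
  const exponent = Math.max(0, Math.floor(attempt));
  return Math.min(maxMs, baseMs * 2 ** exponent);
}

console.log([0, 1, 2, 3, 4].map((n) => backoffDelay(n)));
// [5000, 10000, 20000, 40000, 60000]
```

Resetting the attempt counter to 0 in onConnect restores the short delay after a successful connection.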
Troubleshooting
"WebSocket connection failed"
- Check the server is running on the expected port.
- Check CORS: the app.cors.allowed-origins property must include your frontend origin.
- If using SockJS, ensure the /ws endpoint is accessible (try curl http://localhost:1337/ws/info).
"Not receiving entity change events"
- Entity changes are only broadcast after the transaction commits (AFTER_COMMIT). If the transaction rolls back, no event is sent.
- The broadcast is asynchronous, so there may be a small delay (milliseconds).
- Check the browser console for STOMP connection errors.
- Verify you're subscribing to the correct destination: /topic/entities/inspections (plural, matching the API resource name).
"Not receiving queue messages"
- The queue must exist in RabbitMQ. Create it via POST /api/admin/queues.
- The queue listener must be running. Start it via POST /api/admin/queues/{id}/start.
- Messages are only broadcast on successful handler processing. Check queue message history via GET /api/admin/queues/{id}/history.
- Verify the routing key matches what the publisher uses.
"Not receiving notifications"
- Notifications are user-specific (/user/queue/notifications). You must be authenticated via JWT.
- The token must be valid and contain the correct user ID.
"Messages are duplicated"
- If you subscribe to both /topic/entities (firehose) and /topic/entities/inspections (specific), you will receive inspection events twice. Choose one.
- The same applies to /topic/queues vs /topic/queues/{queueName}.
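If subscribing to both a firehose and a specific destination cannot be avoided, events can be de-duplicated on the client. The sketch below keys each event on resourceName, action, entityId, and timestamp. It assumes both destinations carry identical field values for the same change, which this document does not guarantee. createDeduper is a hypothetical name.

```typescript
interface EntityEventKey {
  resourceName: string;
  action: string;
  entityId: number;
  timestamp: string;
}

// Returns a function that reports whether an event is seen for the first time.
// Only the most recent `maxRemembered` keys are kept, to bound memory use.
function createDeduper(maxRemembered = 500): (event: EntityEventKey) => boolean {
  const seen = new Set<string>();
  return (event) => {
    const key = `${event.resourceName}:${event.action}:${event.entityId}:${event.timestamp}`;
    if (seen.has(key)) return false;
    seen.add(key);
    if (seen.size > maxRemembered) {
      // Sets iterate in insertion order, so the first value is the oldest key.
      const oldest = seen.values().next().value;
      if (oldest !== undefined) seen.delete(oldest);
    }
    return true;
  };
}
```

A subscription callback would call the returned function first and return early when it yields false.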