This document describes how to connect to the TGM Expert WebSocket server from any JavaScript/TypeScript client (Angular, React, Vue, vanilla JS) and subscribe to real-time events: entity CRUD changes, dynamic queue messages, IoT events, notifications, and more.

Quick Start

Install the STOMP client:

# For any framework
npm install @stomp/stompjs sockjs-client

# Angular-specific (optional, wraps stompjs in RxJS)
npm install @stomp/rx-stomp

Minimal working example:

import { Client } from '@stomp/stompjs';
import SockJS from 'sockjs-client';

const client = new Client({
  webSocketFactory: () => new SockJS('http://localhost:1337/ws'),
  connectHeaders: {
    Authorization: 'Bearer <your-jwt-token>',
  },
  onConnect: () => {
    console.log('Connected');

    // Listen to all entity changes
    client.subscribe('/topic/entities', (message) => {
      const event = JSON.parse(message.body);
      console.log(`${event.action} on ${event.resourceName}:`, event.entityId);
    });
  },
});

client.activate();

That's it. Whenever an entity is created, updated, or deleted via the API, the client receives the corresponding event in real time.


Connection

Endpoints

| Endpoint | Transport | Use Case |
|---|---|---|
| /ws | WebSocket + SockJS | General purpose |
| /ws/chat | WebSocket + SockJS | Chat features |
| /ws-raw | Raw WebSocket (no SockJS) | Native WS clients |

SockJS (browser clients)

import { Client } from '@stomp/stompjs';
import SockJS from 'sockjs-client';

const client = new Client({
  webSocketFactory: () => new SockJS('http://localhost:1337/ws'),
});

Raw WebSocket (mobile/native clients)

const client = new Client({
  brokerURL: 'ws://localhost:1337/ws-raw',
});
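
If the client already stores the HTTP base URL used for the SockJS endpoint, the raw WebSocket URL can be derived from it instead of being configured separately. The sketch below is only an illustration: `toWebSocketUrl` is a hypothetical helper, not part of the server or of @stomp/stompjs, and it assumes the raw endpoint lives at /ws-raw on the same host.

```typescript
// Hypothetical helper (not part of any library): derive the raw WebSocket
// URL from the HTTP base URL. Assumes the endpoint path is /ws-raw.
function toWebSocketUrl(httpBaseUrl: string, path: string = '/ws-raw'): string {
  const url = new URL(path, httpBaseUrl);
  // Map http -> ws and https -> wss so TLS deployments keep working.
  url.protocol = url.protocol === 'https:' ? 'wss:' : 'ws:';
  return url.toString();
}

// Example: pass the result as `brokerURL` when creating the Client.
const brokerURL = toWebSocketUrl('http://localhost:1337');
```

The result can be passed as brokerURL in the Client configuration shown above.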

Authentication

The server authenticates STOMP connections via JWT. Pass the token in the CONNECT headers:

const client = new Client({
  webSocketFactory: () => new SockJS('http://localhost:1337/ws'),
  connectHeaders: {
    Authorization: 'Bearer <jwt-token>',
  },
});

The server's ChatWebSocketAuthHandler validates the token when it receives the STOMP CONNECT frame. If the token is missing or invalid, the connection is still accepted, but as an anonymous session: broadcast topics work, while user-specific queues (/user/queue/...) deliver nothing.
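
Because a missing token does not fail the connection, a client can end up anonymous without noticing. One way to make that case explicit is to build the CONNECT headers in a single place. This is a minimal sketch; `buildConnectHeaders` is a hypothetical helper name, and the only thing taken from this document is the `Authorization: Bearer <token>` header format.

```typescript
// Hypothetical helper: build STOMP CONNECT headers from an optional JWT.
// Returns an empty object when there is no token, which results in an
// anonymous session (user-specific queues will not deliver anything).
function buildConnectHeaders(token?: string | null): Record<string, string> {
  const trimmed = token?.trim();
  if (!trimmed) return {};
  // Avoid sending "Bearer Bearer <token>" if the caller already added the prefix.
  const value = /^Bearer\s+/i.test(trimmed) ? trimmed : `Bearer ${trimmed}`;
  return { Authorization: value };
}

// Callers can check whether the session will be authenticated before connecting.
const headers = buildConnectHeaders('my-jwt');
const willBeAuthenticated = 'Authorization' in headers;
```

The returned object can be passed as connectHeaders in the Client configuration.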


Entity Changes (Real-Time CRUD)

When any entity (inspection, intervention, unit, component, failure, location, work order, etc.) is created, updated, or deleted via BaseApiController, a real-time event is broadcast to all WebSocket subscribers.

No configuration needed. This works automatically for every entity type managed by BaseApiController.

STOMP Destinations

| Destination | Description |
|---|---|
| /topic/entities | All entity changes (firehose) |
| /topic/entities/{resourceName} | Changes for a specific resource only |

The {resourceName} matches the API resource name (plural): inspections, interventions, units, components, failures, locations, work_orders, gate_systems, etc.
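
A mistyped resource name produces a subscription that simply never receives anything, so it can help to build destinations through one function. The sketch below is an assumption-laden illustration: `entityTopic` is a hypothetical helper, and the lowercase-with-underscores pattern is inferred from the names listed above rather than from a documented rule.

```typescript
// Assumed naming pattern, inferred from names such as "work_orders" and
// "gate_systems". Adjust if your resource names differ.
const RESOURCE_NAME_PATTERN = /^[a-z][a-z0-9_]*$/;

// Hypothetical helper: returns the firehose destination when no resource
// name is given, otherwise the resource-specific destination.
function entityTopic(resourceName?: string): string {
  if (resourceName === undefined) return '/topic/entities';
  if (!RESOURCE_NAME_PATTERN.test(resourceName)) {
    throw new Error(`Unexpected resource name: "${resourceName}"`);
  }
  return `/topic/entities/${resourceName}`;
}

const firehose = entityTopic();
const workOrders = entityTopic('work_orders');
```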

Payload Format

{
  "resourceName": "inspections",
  "action": "created",
  "entityId": 42,
  "timestamp": "2026-02-17T14:30:00.123",
  "data": {
    "id": 42,
    "titleOfInspection": "Annual Turbine Check",
    "status": "PENDING",
    "...": "full entity object"
  }
}

| Field | Type | Description |
|---|---|---|
| resourceName | string | API resource name (plural), e.g. inspections, units |
| action | string | "created", "updated", or "deleted" |
| entityId | number | The entity ID |
| timestamp | string | ISO timestamp of the event |
| data | object or null | Full entity object on create/update; null on delete |
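
Since JSON.parse returns an untyped value, a small runtime check can catch malformed or unexpected messages before they reach application code. The following is a sketch based only on the field table above; the function names are hypothetical, and the check is strict about `data` being an object or null.

```typescript
type EntityAction = 'created' | 'updated' | 'deleted';

interface EntityChangeEvent<T = unknown> {
  resourceName: string;
  action: EntityAction;
  entityId: number;
  timestamp: string;
  data: T | null;
}

// Type guard that checks the documented fields and their types.
function isEntityChangeEvent(value: unknown): value is EntityChangeEvent {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.resourceName === 'string' &&
    (v.action === 'created' || v.action === 'updated' || v.action === 'deleted') &&
    typeof v.entityId === 'number' &&
    typeof v.timestamp === 'string' &&
    (v.data === null || typeof v.data === 'object')
  );
}

// Returns null instead of throwing when the body is not valid JSON
// or does not match the documented shape.
function parseEntityChangeEvent(body: string): EntityChangeEvent | null {
  try {
    const parsed: unknown = JSON.parse(body);
    return isEntityChangeEvent(parsed) ? parsed : null;
  } catch {
    return null;
  }
}
```

Inside a subscription callback, `parseEntityChangeEvent(message.body)` can replace the bare `JSON.parse(message.body)` call.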

Example: Subscribe to All Entity Changes

client.subscribe('/topic/entities', (message) => {
  const event = JSON.parse(message.body);

  switch (event.action) {
    case 'created':
      console.log(`New ${event.resourceName} created:`, event.data);
      break;
    case 'updated':
      console.log(`${event.resourceName} #${event.entityId} updated:`, event.data);
      break;
    case 'deleted':
      console.log(`${event.resourceName} #${event.entityId} deleted`);
      break;
  }
});

Example: Subscribe to a Specific Entity Type

// Only inspection changes
client.subscribe('/topic/entities/inspections', (message) => {
  const event = JSON.parse(message.body);
  console.log('Inspection', event.action, event.data);
});

// Only unit changes
client.subscribe('/topic/entities/units', (message) => {
  const event = JSON.parse(message.body);
  console.log('Unit', event.action, event.data);
});

Example: Keep a Local List in Sync

let inspections: any[] = [];

// Load initial data from REST API
fetch('/api/inspections').then(r => r.json()).then(data => {
  inspections = data.data;
});

// Keep it in sync via WebSocket
client.subscribe('/topic/entities/inspections', (message) => {
  const event = JSON.parse(message.body);

  switch (event.action) {
    case 'created':
      inspections.push(event.data);
      break;

    case 'updated': {
      // Braces give `idx` its own scope inside the switch.
      const idx = inspections.findIndex(i => i.id === event.entityId);
      if (idx >= 0) inspections[idx] = event.data;
      break;
    }

    case 'deleted':
      inspections = inspections.filter(i => i.id !== event.entityId);
      break;
  }
});
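
The same sync logic can be expressed as a pure function, which is easier to unit test and also guards against one edge case: if a "created" event arrives for an item that the REST load already returned, the item is replaced rather than added twice. This is a sketch under the assumption that every entity has a numeric `id`; `applyEntityChange` is a hypothetical name.

```typescript
interface Identified {
  id: number;
}

interface EntityChangeEvent<T> {
  resourceName: string;
  action: 'created' | 'updated' | 'deleted';
  entityId: number;
  timestamp: string;
  data: T | null;
}

// Returns a new array; the input list is never mutated.
function applyEntityChange<T extends Identified>(
  list: readonly T[],
  event: EntityChangeEvent<T>,
): T[] {
  const data = event.data;
  const exists = list.some(item => item.id === event.entityId);

  switch (event.action) {
    case 'created':
      if (data === null) return [...list];
      // Replace instead of appending when the item is already present.
      return exists
        ? list.map(item => (item.id === event.entityId ? data : item))
        : [...list, data];
    case 'updated':
      if (data === null) return [...list];
      // Updates for items that are not in the list are ignored.
      return list.map(item => (item.id === event.entityId ? data : item));
    case 'deleted':
      return list.filter(item => item.id !== event.entityId);
    default:
      return [...list];
  }
}
```

In the subscription callback this becomes `inspections = applyEntityChange(inspections, event)`.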

Dynamic Queue Messages

When a message is consumed from a RabbitMQ dynamic queue and successfully processed by its handler, it is broadcast to WebSocket. This lets frontend clients receive queue messages without connecting to RabbitMQ directly.

Prerequisites: The queue must exist in RabbitMQ and have an active listener. Create queues via POST /api/admin/queues.

STOMP Destinations

| Destination | Description |
|---|---|
| /topic/queues | All queue messages (firehose) |
| /topic/queues/{queueName} | Messages from a specific queue only |

The {queueName} is the queue name as defined in QueueConfiguration.name — e.g. tgm_iot, tgm_notifications, tgm_erp_sync, or any custom queue you create.

Payload Format

Queue-specific (/topic/queues/{queueName}):

{
  "queueName": "tgm_iot",
  "handlerType": "iot",
  "routingKey": "tgm.iot",
  "timestamp": "2026-02-17T14:30:00.456",
  "payload": {
    "sensorId": 123,
    "value": 75.5,
    "alertLevel": "warning"
  }
}

Firehose (/topic/queues):

{
  "queueName": "tgm_iot",
  "data": {
    "queueName": "tgm_iot",
    "handlerType": "iot",
    "routingKey": "tgm.iot",
    "timestamp": "2026-02-17T14:30:00.456",
    "payload": { "..." }
  }
}

| Field | Type | Description |
|---|---|---|
| queueName | string | The RabbitMQ queue name |
| handlerType | string | Handler that processed it: generic, iot, notification, erp, webhook |
| routingKey | string | The RabbitMQ routing key the message was published with |
| timestamp | string | ISO timestamp of when it was broadcast |
| payload | object or null | The original message payload from RabbitMQ (JSON) |
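
The firehose wraps each message in an extra `{ queueName, data }` envelope, while the queue-specific destination delivers the message directly. A client that handles both can normalize them to one shape. This is a minimal sketch; `unwrapQueueMessage` is a hypothetical helper, and it assumes queue-specific messages never carry a top-level `data` field.

```typescript
interface QueueMessageEvent<T = unknown> {
  queueName: string;
  handlerType: string;
  routingKey: string;
  timestamp: string;
  payload: T | null;
}

// Shape delivered on /topic/queues (the firehose).
interface QueueFirehoseEnvelope<T = unknown> {
  queueName: string;
  data: QueueMessageEvent<T>;
}

// Returns the inner message regardless of which destination it came from.
function unwrapQueueMessage<T = unknown>(
  raw: QueueMessageEvent<T> | QueueFirehoseEnvelope<T>,
): QueueMessageEvent<T> {
  return 'data' in raw ? raw.data : raw;
}
```

With this in place, the same handler can be attached to both /topic/queues and /topic/queues/{queueName} subscriptions.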

Example: Subscribe to a Specific Queue

// Listen to IoT queue messages
client.subscribe('/topic/queues/tgm_iot', (message) => {
  const event = JSON.parse(message.body);
  console.log('IoT message:', event.payload);
});

// Listen to ERP sync messages
client.subscribe('/topic/queues/tgm_erp_sync', (message) => {
  const event = JSON.parse(message.body);
  console.log('ERP sync:', event.payload);
});

Example: Subscribe to All Queue Messages

client.subscribe('/topic/queues', (message) => {
  const event = JSON.parse(message.body);
  console.log(`Message from queue "${event.queueName}":`, event.data);
});

Example: Custom Queue (Created at Runtime)

// 1. Create a custom queue via the admin API
const response = await fetch('/api/admin/queues', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer <token>',
  },
  body: JSON.stringify({
    name: 'my_alerts_queue',
    displayName: 'My Custom Alerts',
    routingKey: 'tgm.alerts.critical',
    handlerType: 'generic',
    isEnabled: true,
    autoStart: true,
  }),
});

// 2. Subscribe to it immediately — no code changes, no restart
client.subscribe('/topic/queues/my_alerts_queue', (message) => {
  const event = JSON.parse(message.body);
  console.log('Custom alert:', event.payload);
});

// 3. Any message published to RabbitMQ with routing key "tgm.alerts.critical"
//    will now appear here in real-time.

IoT Events

IoT events from the static RabbitMQListener (not dynamic queues) are broadcast here.

STOMP Destination

| Destination | Description |
|---|---|
| /topic/iot-event | All IoT events |

Example

client.subscribe('/topic/iot-event', (message) => {
  const event = JSON.parse(message.body);
  console.log(`Sensor ${event.sensorId}: ${event.value} (${event.alertLevel})`);
});

Notifications

User-specific notifications. Requires authentication — the server routes based on user ID.

STOMP Destination

| Destination | Description |
|---|---|
| /user/queue/notifications | Notifications for the authenticated user |

Example

client.subscribe('/user/queue/notifications', (message) => {
  const notification = JSON.parse(message.body);
  console.log(`[${notification.title}] ${notification.message}`);
  showToast(notification);
});

Calendar Updates

User-specific calendar events (e.g. when an inspection or intervention is scheduled).

STOMP Destination

| Destination | Description |
|---|---|
| /user/queue/calendar | Calendar updates for the authenticated user |

Payload Format

{
  "action": "created",
  "entityType": "inspection",
  "entityId": 42,
  "timestamp": "2026-02-17T14:30:00"
}

Example

client.subscribe('/user/queue/calendar', (message) => {
  const event = JSON.parse(message.body);
  console.log(`Calendar: ${event.action} ${event.entityType} #${event.entityId}`);
  refreshCalendar();
});

All STOMP Destinations Reference

Broadcast Topics (anyone can subscribe)

| Destination | Source | Description |
|---|---|---|
| /topic/entities | WebSocketEntityBroadcastListener | All entity CRUD events |
| /topic/entities/{resourceName} | WebSocketEntityBroadcastListener | CRUD events for one resource type |
| /topic/queues | DynamicQueueService | All dynamic queue messages |
| /topic/queues/{queueName} | DynamicQueueService | Messages from one queue |
| /topic/iot-event | RabbitMQListener | IoT sensor events |
| /topic/{entityType} | RabbitMQListener (generic) | Legacy entity updates via RabbitMQ |
| /topic/reservoir/{id}/sensors | ReservoirWebSocketHandler | Reservoir sensor data stream |
| /topic/reservoir/{id}/status | ReservoirWebSocketHandler | Reservoir status/alerts |

User-Specific Queues (requires authentication)

| Destination | Source | Description |
|---|---|---|
| /user/queue/notifications | WebSocketService | User notifications |
| /user/queue/calendar | WebSocketService | Calendar updates |
| /user/queue/chat/messages | ChatWebSocketHandler | Incoming chat messages |
| /user/queue/chat/typing | ChatWebSocketHandler | Typing indicators |
| /user/queue/chat/presence | ChatWebSocketHandler | Presence updates |
| /user/queue/chat/receipts | ChatWebSocketHandler | Read receipts |
| /user/queue/chat/reactions | ChatWebSocketHandler | Reaction updates |
| /user/queue/video/incoming-call | VideoCallWebSocketHandler | Incoming video call |
| /user/queue/video/call-answered | VideoCallWebSocketHandler | Call answered |
| /user/queue/video/call-ended | VideoCallWebSocketHandler | Call ended |

Application Endpoints (send messages)

| Destination | Handler | Description |
|---|---|---|
| /app/chat.send | ChatWebSocketHandler | Send chat message |
| /app/chat.typing | ChatWebSocketHandler | Send typing indicator |
| /app/chat.markRead | ChatWebSocketHandler | Mark messages as read |
| /app/chat.reaction | ChatWebSocketHandler | Add/remove reaction |
| /app/reservoir.subscribe | ReservoirWebSocketHandler | Subscribe to reservoir data |
| /app/video.call-init | VideoCallWebSocketHandler | Initiate video call |
| /app/video.call-answer | VideoCallWebSocketHandler | Answer video call |
| /app/video.call-end | VideoCallWebSocketHandler | End video call |

Angular Integration (RxStomp)

Service

import { Injectable, OnDestroy } from '@angular/core';
import { RxStomp, RxStompConfig } from '@stomp/rx-stomp';
import { Observable, BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';
import SockJS from 'sockjs-client';

export interface EntityChangeEvent<T = any> {
  resourceName: string;
  action: 'created' | 'updated' | 'deleted';
  entityId: number;
  timestamp: string;
  data: T | null;
}

export interface QueueMessageEvent<T = any> {
  queueName: string;
  handlerType: string;
  routingKey: string;
  timestamp: string;
  payload: T | null;
}

@Injectable({ providedIn: 'root' })
export class RealtimeService implements OnDestroy {
  private rxStomp = new RxStomp();
  private connected$ = new BehaviorSubject<boolean>(false);

  /** Connect to the WebSocket server. Call once at app startup. */
  connect(token: string, serverUrl = 'http://localhost:1337'): void {
    const config: RxStompConfig = {
      webSocketFactory: () => new SockJS(`${serverUrl}/ws`),
      connectHeaders: { Authorization: `Bearer ${token}` },
      heartbeatIncoming: 10000,
      heartbeatOutgoing: 10000,
      reconnectDelay: 5000,
    };

    this.rxStomp.configure(config);
    this.rxStomp.activate();
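    // connectionState$ emits on every state change, including dropped
    // connections, so the flag also resets when the socket closes.
    this.rxStomp.connectionState$.subscribe(() =>
      this.connected$.next(this.rxStomp.connected()),
    );
    this.rxStomp.stompErrors$.subscribe(() => this.connected$.next(false));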
  }

  /** Disconnect from the WebSocket server. */
  disconnect(): void {
    this.rxStomp.deactivate();
    this.connected$.next(false);
  }

  /** Observable of connection status. */
  isConnected(): Observable<boolean> {
    return this.connected$.asObservable();
  }

  // ─── Entity Changes ──────────────────────────────────────────────

  /** Subscribe to ALL entity changes. */
  allEntityChanges(): Observable<EntityChangeEvent> {
    return this.rxStomp.watch('/topic/entities').pipe(
      map(msg => JSON.parse(msg.body))
    );
  }

  /** Subscribe to changes for a specific resource type. */
  entityChanges<T = any>(resourceName: string): Observable<EntityChangeEvent<T>> {
    return this.rxStomp.watch(`/topic/entities/${resourceName}`).pipe(
      map(msg => JSON.parse(msg.body))
    );
  }

  // ─── Dynamic Queues ──────────────────────────────────────────────

  /** Subscribe to ALL dynamic queue messages. */
  allQueueMessages(): Observable<{ queueName: string; data: QueueMessageEvent }> {
    return this.rxStomp.watch('/topic/queues').pipe(
      map(msg => JSON.parse(msg.body))
    );
  }

  /** Subscribe to messages from a specific queue. */
  queueMessages<T = any>(queueName: string): Observable<QueueMessageEvent<T>> {
    return this.rxStomp.watch(`/topic/queues/${queueName}`).pipe(
      map(msg => JSON.parse(msg.body))
    );
  }

  // ─── IoT ─────────────────────────────────────────────────────────

  /** Subscribe to IoT sensor events. */
  iotEvents(): Observable<any> {
    return this.rxStomp.watch('/topic/iot-event').pipe(
      map(msg => JSON.parse(msg.body))
    );
  }

  // ─── User-specific ───────────────────────────────────────────────

  /** Subscribe to notifications for the authenticated user. */
  notifications(): Observable<any> {
    return this.rxStomp.watch('/user/queue/notifications').pipe(
      map(msg => JSON.parse(msg.body))
    );
  }

  /** Subscribe to calendar updates for the authenticated user. */
  calendarUpdates(): Observable<any> {
    return this.rxStomp.watch('/user/queue/calendar').pipe(
      map(msg => JSON.parse(msg.body))
    );
  }

  ngOnDestroy(): void {
    this.disconnect();
  }
}

Usage in a Component

import { Component, OnInit, OnDestroy } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { RealtimeService, EntityChangeEvent } from './realtime.service';

@Component({
  selector: 'app-inspection-list',
  template: `
    <div *ngFor="let insp of inspections" [class.flash]="flashIds.has(insp.id)">
      {{ insp.titleOfInspection }} — {{ insp.status }}
    </div>
  `,
})
export class InspectionListComponent implements OnInit, OnDestroy {
  inspections: any[] = [];
  flashIds = new Set<number>();
  private destroy$ = new Subject<void>();

  constructor(
    private realtimeService: RealtimeService,
    private http: HttpClient,
  ) {}

  ngOnInit(): void {
    // 1. Load initial data
    this.http.get<any>('/api/inspections').subscribe(res => {
      this.inspections = res.data;
    });

    // 2. Subscribe to real-time inspection changes
    this.realtimeService
      .entityChanges('inspections')
      .pipe(takeUntil(this.destroy$))
      .subscribe(event => this.handleChange(event));
  }

  private handleChange(event: EntityChangeEvent): void {
    switch (event.action) {
      case 'created':
        this.inspections.unshift(event.data);
        this.flash(event.entityId);
        break;
      case 'updated': {
        // Braces give `i` its own scope inside the switch.
        const i = this.inspections.findIndex(x => x.id === event.entityId);
        if (i >= 0) this.inspections[i] = event.data;
        this.flash(event.entityId);
        break;
      }
      case 'deleted':
        this.inspections = this.inspections.filter(x => x.id !== event.entityId);
        break;
    }
  }

  private flash(id: number): void {
    this.flashIds.add(id);
    setTimeout(() => this.flashIds.delete(id), 2000);
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Usage: Queue Monitor Component

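import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { RealtimeService } from './realtime.service';

@Component({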
  selector: 'app-queue-monitor',
  template: `
    <h3>Live Queue Messages</h3>
    <div *ngFor="let msg of messages">
      [{{ msg.queueName }}] {{ msg.handlerType }} — {{ msg.routingKey }}
      <pre>{{ msg.payload | json }}</pre>
    </div>
  `,
})
export class QueueMonitorComponent implements OnInit, OnDestroy {
  messages: any[] = [];
  private destroy$ = new Subject<void>();

  constructor(private realtimeService: RealtimeService) {}

  ngOnInit(): void {
    // Listen to ALL dynamic queue messages
    this.realtimeService
      .allQueueMessages()
      .pipe(takeUntil(this.destroy$))
      .subscribe(event => {
        this.messages.unshift(event.data);
        if (this.messages.length > 100) this.messages.pop();
      });
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Usage: Listen to a Custom Queue Created at Runtime

ngOnInit(): void {
  // This queue was created via POST /api/admin/queues with name "my_alerts_queue".
  // No code changes were needed on the backend. Just subscribe:
  this.realtimeService
    .queueMessages('my_alerts_queue')
    .pipe(takeUntil(this.destroy$))
    .subscribe(event => {
      console.log('Alert payload:', event.payload);
    });
}

React Integration (@stomp/stompjs)

Hook

// useRealtime.ts
import { useEffect, useMemo, useRef, useCallback, useState } from 'react';
import { Client, IMessage } from '@stomp/stompjs';
import SockJS from 'sockjs-client';

interface EntityChangeEvent<T = any> {
  resourceName: string;
  action: 'created' | 'updated' | 'deleted';
  entityId: number;
  timestamp: string;
  data: T | null;
}

interface QueueMessageEvent<T = any> {
  queueName: string;
  handlerType: string;
  routingKey: string;
  timestamp: string;
  payload: T | null;
}

export function useStompClient(serverUrl: string, token: string) {
  const clientRef = useRef<Client | null>(null);
  const [connected, setConnected] = useState(false);

  useEffect(() => {
    // Ignore callbacks from a client that this effect has already torn down.
    let active = true;
    const update = (value: boolean) => {
      if (active) setConnected(value);
    };

    const client = new Client({
      webSocketFactory: () => new SockJS(`${serverUrl}/ws`),
      connectHeaders: { Authorization: `Bearer ${token}` },
      reconnectDelay: 5000,
      heartbeatIncoming: 10000,
      heartbeatOutgoing: 10000,
      onConnect: () => update(true),
      onDisconnect: () => update(false),
      // Fires on dropped connections too, unlike onDisconnect.
      onWebSocketClose: () => update(false),
      onStompError: () => update(false),
    });

    client.activate();
    clientRef.current = client;

    return () => {
      active = false;
      setConnected(false);
      client.deactivate();
    };
  }, [serverUrl, token]);

  const subscribe = useCallback(
    (destination: string, callback: (body: any) => void) => {
      const client = clientRef.current;
      // Not connected yet: nothing to clean up. `subscribe` gets a new identity
      // when `connected` changes, so effects that depend on it run again.
      if (!client || !connected) return () => {};

      const sub = client.subscribe(destination, (message: IMessage) => {
        callback(JSON.parse(message.body));
      });

      return () => {
        // After a drop the session is gone, so only unsubscribe while connected.
        if (client.connected) sub.unsubscribe();
      };
    },
    [connected],
  );

  // Memoize so the convenience hooks below only re-subscribe when the
  // connection state changes, not on every render.
  return useMemo(() => ({ connected, subscribe }), [connected, subscribe]);
}

// ─── Convenience hooks ─────────────────────────────────────────────

export function useEntityChanges<T = any>(
  client: ReturnType<typeof useStompClient>,
  resourceName: string,
  onEvent: (event: EntityChangeEvent<T>) => void,
) {
  useEffect(() => {
    return client.subscribe(`/topic/entities/${resourceName}`, onEvent);
  }, [client, resourceName, onEvent]);
}

export function useAllEntityChanges(
  client: ReturnType<typeof useStompClient>,
  onEvent: (event: EntityChangeEvent) => void,
) {
  useEffect(() => {
    return client.subscribe('/topic/entities', onEvent);
  }, [client, onEvent]);
}

export function useQueueMessages<T = any>(
  client: ReturnType<typeof useStompClient>,
  queueName: string,
  onEvent: (event: QueueMessageEvent<T>) => void,
) {
  useEffect(() => {
    return client.subscribe(`/topic/queues/${queueName}`, onEvent);
  }, [client, queueName, onEvent]);
}

Usage in a React Component

function InspectionList() {
  const [inspections, setInspections] = useState<any[]>([]);
  const stomp = useStompClient('http://localhost:1337', authToken);

  // Load initial data
  useEffect(() => {
    fetch('/api/inspections')
      .then(r => r.json())
      .then(res => setInspections(res.data));
  }, []);

  // Subscribe to real-time inspection changes
  useEntityChanges(stomp, 'inspections', useCallback((event) => {
    setInspections(prev => {
      switch (event.action) {
        case 'created':
          return [event.data, ...prev];
        case 'updated':
          return prev.map(i => i.id === event.entityId ? event.data : i);
        case 'deleted':
          return prev.filter(i => i.id !== event.entityId);
        default:
          return prev;
      }
    });
  }, []));

  return (
    <ul>
      {inspections.map(i => (
        <li key={i.id}>{i.titleOfInspection} — {i.status}</li>
      ))}
    </ul>
  );
}

Usage: Queue Monitor

function QueueMonitor({ queueName }: { queueName: string }) {
  const [messages, setMessages] = useState<any[]>([]);
  const stomp = useStompClient('http://localhost:1337', authToken);

  useQueueMessages(stomp, queueName, useCallback((event) => {
    setMessages(prev => [event, ...prev].slice(0, 100));
  }, []));

  return (
    <div>
      <h3>Messages from: {queueName}</h3>
      {messages.map((msg, i) => (
        <pre key={i}>{JSON.stringify(msg.payload, null, 2)}</pre>
      ))}
    </div>
  );
}

// Usage: <QueueMonitor queueName="tgm_iot" />
// Or for a custom queue: <QueueMonitor queueName="my_alerts_queue" />

Error Handling & Reconnection

The @stomp/stompjs client handles reconnection automatically via reconnectDelay.

const client = new Client({
  // ...
  reconnectDelay: 5000,           // Retry every 5 seconds
  heartbeatIncoming: 10000,       // Expect a server heartbeat every 10 s
  heartbeatOutgoing: 10000,       // Send a client heartbeat every 10 s
  onStompError: (frame) => {
    console.error('STOMP error:', frame.headers['message']);
  },
  onWebSocketClose: (event) => {
    console.warn('WebSocket closed, will reconnect...');
  },
});

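With @stomp/stompjs, subscriptions do not survive a reconnect on their own: each reconnect opens a new STOMP session, so create subscriptions inside onConnect, which runs after every successful (re)connect, as in the Quick Start example. RxStomp's watch() re-subscribes automatically after a reconnect.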
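
When subscriptions are created from many places in an application, one common pattern is to keep a registry of the desired subscriptions and replay it from onConnect. The sketch below is illustrative only: `SubscriptionRegistry` and the `StompLike*` interfaces are hypothetical names, declared locally so the example is self-contained. They are intended to be structurally compatible with the @stomp/stompjs Client, which is an assumption to verify against the library version in use.

```typescript
// Minimal structural view of the client, declared here for self-containment.
interface StompLikeSubscription {
  unsubscribe(): void;
}
interface StompLikeClient {
  connected: boolean;
  subscribe(
    destination: string,
    callback: (message: { body: string }) => void,
  ): StompLikeSubscription;
}

type Handler = (body: unknown) => void;

interface Entry {
  destination: string;
  handler: Handler;
  active?: StompLikeSubscription;
}

class SubscriptionRegistry {
  private readonly client: StompLikeClient;
  private readonly entries = new Map<number, Entry>();
  private nextId = 0;

  constructor(client: StompLikeClient) {
    this.client = client;
  }

  // Registers a subscription; it is opened now if connected, otherwise on
  // the next resubscribeAll(). Returns a function that removes it again.
  add(destination: string, handler: Handler): () => void {
    const id = this.nextId++;
    const entry: Entry = { destination, handler };
    this.entries.set(id, entry);
    if (this.client.connected) this.open(entry);
    return () => {
      // Only unsubscribe on a live connection; a dropped session is gone anyway.
      if (this.client.connected) entry.active?.unsubscribe();
      this.entries.delete(id);
    };
  }

  // Call this from the client's onConnect callback.
  resubscribeAll(): void {
    for (const entry of this.entries.values()) this.open(entry);
  }

  private open(entry: Entry): void {
    entry.active = this.client.subscribe(entry.destination, message =>
      entry.handler(JSON.parse(message.body)),
    );
  }
}
```

A typical wiring would be `onConnect: () => registry.resubscribeAll()` in the Client configuration, with components calling `registry.add(...)` at any time.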


Troubleshooting

"WebSocket connection failed"

  • Check the server is running on the expected port
  • Check CORS: the app.cors.allowed-origins property must include your frontend origin
  • If using SockJS, ensure /ws endpoint is accessible (try curl http://localhost:1337/ws/info)

"Not receiving entity change events"

  • Entity changes are only broadcast after the transaction commits (AFTER_COMMIT). If the transaction rolls back, no event is sent.
  • The broadcast is async — there may be a small delay (milliseconds).
  • Check the browser console for STOMP connection errors.
  • Verify you're subscribing to the correct destination: /topic/entities/inspections (plural, matching the API resource name).

"Not receiving queue messages"

  • The queue must exist in RabbitMQ. Create it via POST /api/admin/queues.
  • The queue listener must be running. Start it via POST /api/admin/queues/{id}/start.
  • Messages are only broadcast on successful handler processing. Check queue message history via GET /api/admin/queues/{id}/history.
  • Verify the routing key matches what the publisher uses.

"Not receiving notifications"

  • Notifications are user-specific (/user/queue/notifications). You must be authenticated via JWT.
  • The token must be valid and contain the correct user ID.

"Messages are duplicated"

  • If you subscribe to both /topic/entities (firehose) and /topic/entities/inspections (specific), you will receive inspection events twice. Choose one.
  • Same applies to /topic/queues vs /topic/queues/{queueName}.
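
If subscribing to both a firehose and a specific destination cannot be avoided, duplicates can be filtered on the client. The sketch below is one possible approach, not a documented feature: `createDeduplicator` is a hypothetical helper, and it assumes that both deliveries of the same event carry identical resourceName, entityId, action, and timestamp values.

```typescript
interface EntityEventKeyFields {
  resourceName: string;
  action: string;
  entityId: number;
  timestamp: string;
}

// Returns a function that reports whether an event is seen for the first time.
// Only the most recent `maxRemembered` keys are kept to bound memory use.
function createDeduplicator(maxRemembered: number = 500) {
  const seen = new Set<string>();
  return function isFirstDelivery(event: EntityEventKeyFields): boolean {
    const key = `${event.resourceName}:${event.entityId}:${event.action}:${event.timestamp}`;
    if (seen.has(key)) return false;
    seen.add(key);
    if (seen.size > maxRemembered) {
      // Sets iterate in insertion order, so the first value is the oldest key.
      const oldest = seen.values().next().value;
      if (oldest !== undefined) seen.delete(oldest);
    }
    return true;
  };
}
```

In a subscription callback, the handler would start with `if (!isFirstDelivery(event)) return;`, where `isFirstDelivery` is one shared instance created by `createDeduplicator()`.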