TGM Expert uses RabbitMQ as a message broker for real-time event distribution, asynchronous processing, and decoupled communication between services. It enables features like real-time notifications, IoT event streaming, and WebSocket event broadcasting.

Table of Contents


Overview

Key Features

  • Real-Time Events - Instant notification delivery via WebSocket
  • Asynchronous Processing - Offload heavy tasks from request threads
  • Multi-Tenant Isolation - Messages tagged with client/sandbox context
  • Event Broadcasting - Distribute events to multiple consumers
  • Reliable Delivery - Message persistence and acknowledgment
  • Topic Routing - Flexible routing with topic exchanges

Use Cases in TGM Expert

Use Case Routing Key Description
IoT Events tgm.iot Real-time sensor readings
Notifications tgm.notification User notifications
Entity Updates tgm.{entityType} CRUD event broadcasts
Work Orders tgm.workorder Work order status changes
Alerts tgm.alert Sensor threshold alerts

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                   RabbitMQ Message Flow                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Publishers                                                     │
│   ┌───────────┐  ┌───────────┐  ┌───────────┐                   │
│   │IoTService │  │UserService│  │AlertService│                  │
│   └─────┬─────┘  └─────┬─────┘  └─────┬─────┘                   │
│         │              │              │                          │
│         └──────────────┼──────────────┘                          │
│                        ▼                                         │
│   ┌─────────────────────────────────────────────────────┐       │
│   │              RabbitMQPublisher                       │       │
│   │    (Adds tenant context: clientId, sandbox)          │       │
│   └─────────────────────────┬───────────────────────────┘       │
│                             │                                    │
│                             ▼                                    │
│   ┌─────────────────────────────────────────────────────┐       │
│   │                    RabbitMQ                          │       │
│   │  ┌─────────────────────────────────────────────┐    │       │
│   │  │           tgm_exchange (topic)               │    │       │
│   │  └─────────────────────────────────────────────┘    │       │
│   │         │              │              │              │       │
│   │         ▼              ▼              ▼              │       │
│   │  ┌──────────┐  ┌──────────┐  ┌──────────┐          │       │
│   │  │tgm_queue │  │iot_queue │  │alert_queue│          │       │
│   │  └──────────┘  └──────────┘  └──────────┘          │       │
│   └─────────────────────────────────────────────────────┘       │
│                             │                                    │
│                             ▼                                    │
│   ┌─────────────────────────────────────────────────────┐       │
│   │              RabbitMQListener                        │       │
│   │    (Sets tenant context from message)                │       │
│   └─────────────────────────┬───────────────────────────┘       │
│                             │                                    │
│         ┌───────────────────┼───────────────────┐                │
│         ▼                   ▼                   ▼                │
│   ┌───────────┐      ┌───────────┐      ┌───────────┐           │
│   │WebSocket  │      │Notification│      │Email      │           │
│   │Service    │      │Service     │      │Service    │           │
│   └───────────┘      └───────────┘      └───────────┘           │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Configuration

Environment Variables

# Enable RabbitMQ
RABBIT_ENABLED=true

# Connection settings
RABBIT_HOST=localhost
RABBIT_PORT=5672
RABBIT_USER=guest
RABBIT_PASS=guest

# Queue configuration
RABBIT_QUEUE=tgm_queue
RABBIT_EXCHANGE=tgm_exchange

application.yml

spring:
  rabbitmq:
    enabled: ${RABBIT_ENABLED:false}
    host: ${RABBIT_HOST:localhost}
    port: ${RABBIT_PORT:5672}
    username: ${RABBIT_USER:guest}
    password: ${RABBIT_PASS:guest}
    virtual-host: /

app:
  queue:
    name: ${RABBIT_QUEUE:tgm_queue}
    exchange: ${RABBIT_EXCHANGE:tgm_exchange}

RabbitMQ Configuration Class

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.enabled", havingValue = "true")
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "tgm_exchange";
    public static final String QUEUE_NAME = "tgm_queue";
    public static final String IOT_QUEUE = "tgm_iot_queue";
    public static final String NOTIFICATION_QUEUE = "tgm_notification_queue";

    public static final String IOT_ROUTING_KEY = "tgm.iot";
    public static final String NOTIFICATION_ROUTING_KEY = "tgm.notification";

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    public Queue mainQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean
    public Queue iotQueue() {
        return QueueBuilder.durable(IOT_QUEUE).build();
    }

    @Bean
    public Queue notificationQueue() {
        return QueueBuilder.durable(NOTIFICATION_QUEUE).build();
    }

    @Bean
    public Binding mainBinding(Queue mainQueue, TopicExchange exchange) {
        return BindingBuilder.bind(mainQueue).to(exchange).with("tgm.#");
    }

    @Bean
    public Binding iotBinding(Queue iotQueue, TopicExchange exchange) {
        return BindingBuilder.bind(iotQueue).to(exchange).with(IOT_ROUTING_KEY);
    }

    @Bean
    public Binding notificationBinding(Queue notificationQueue, TopicExchange exchange) {
        return BindingBuilder.bind(notificationQueue).to(exchange).with(NOTIFICATION_ROUTING_KEY);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

Message Types

Standard Message Structure

All messages include tenant context for multi-tenant isolation:

{
  "type": "notification",
  "clientId": "client123",
  "sandbox": "sandbox_dev",
  "timestamp": "2026-02-05T10:30:00Z",
  "payload": {
    // Type-specific data
  }
}

IoT Event Message

{
  "type": "iot",
  "clientId": "client123",
  "sandbox": "public",
  "sensorId": 456,
  "unitId": 789,
  "value": 75.5,
  "timestamp": "2026-02-05T10:30:00Z",
  "alertLevel": "warning"
}

Notification Message

{
  "type": "notification",
  "clientId": "client123",
  "sandbox": "public",
  "userId": 123,
  "title": "New Work Order Assigned",
  "message": "Work order #WO-2026-001 has been assigned to you",
  "link": "/work-orders/1",
  "priority": "high"
}

Entity Update Message

{
  "type": "entity",
  "clientId": "client123",
  "sandbox": "public",
  "entityType": "inspection",
  "action": "created",
  "entityId": 456,
  "data": {
    "id": 456,
    "title": "Monthly Inspection",
    "status": "scheduled"
  }
}

Multi-Tenancy

Publisher: Adding Tenant Context

All messages automatically include the current tenant context:

@Component
@ConditionalOnProperty(name = "spring.rabbitmq.enabled", havingValue = "true")
@RequiredArgsConstructor
public class RabbitMQPublisher {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Add multi-tenancy context to a message.
     */
    private Map<String, Object> addTenantContext(Map<String, Object> message) {
        Map<String, Object> enrichedMessage = new HashMap<>(message);
        enrichedMessage.put("clientId", ClientContext.getClient());
        enrichedMessage.put("sandbox", TenantContext.getTenant());
        enrichedMessage.put("timestamp", Instant.now().toString());
        return enrichedMessage;
    }

    public void sendIotEvent(Map<String, Object> event) {
        Map<String, Object> enrichedEvent = addTenantContext(event);
        enrichedEvent.put("type", "iot");
        sendMessage(RabbitMQConfig.IOT_ROUTING_KEY, enrichedEvent);
    }

    public void sendNotification(Long userId, String title, String message) {
        Map<String, Object> notification = new HashMap<>();
        notification.put("type", "notification");
        notification.put("userId", userId);
        notification.put("title", title);
        notification.put("message", message);
        notification.put("clientId", ClientContext.getClient());
        notification.put("sandbox", TenantContext.getTenant());
        sendMessage(RabbitMQConfig.NOTIFICATION_ROUTING_KEY, notification);
    }

    public void sendEntityUpdate(String entityType, String action, Object data) {
        Map<String, Object> update = new HashMap<>();
        update.put("type", "entity");
        update.put("entityType", entityType);
        update.put("action", action);
        update.put("data", data);
        update.put("clientId", ClientContext.getClient());
        update.put("sandbox", TenantContext.getTenant());
        sendMessage("tgm." + entityType, update);
    }

    private void sendMessage(String routingKey, Object message) {
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.EXCHANGE_NAME,
            routingKey,
            message
        );
    }
}

Listener: Restoring Tenant Context

The listener restores tenant context before processing:

@Component
@ConditionalOnProperty(name = "spring.rabbitmq.enabled", havingValue = "true")
@RequiredArgsConstructor
public class RabbitMQListener {

    private final WebSocketService webSocketService;

    @RabbitListener(queues = "${app.queue.name:tgm_queue}")
    public void receiveMessage(Map<String, Object> message) {
        try {
            // Set multi-tenancy context from message
            setTenantContext(message);

            String type = (String) message.getOrDefault("type", "unknown");

            switch (type) {
                case "iot" -> handleIotMessage(message);
                case "notification" -> handleNotificationMessage(message);
                default -> handleGenericMessage(message);
            }
        } finally {
            // Clear context after processing
            clearTenantContext();
        }
    }

    private void setTenantContext(Map<String, Object> message) {
        Object clientId = message.get("clientId");
        Object sandbox = message.get("sandbox");

        if (clientId != null) {
            ClientContext.setClient(clientId.toString());
        }
        if (sandbox != null) {
            TenantContext.setTenant(sandbox.toString());
        }
    }

    private void clearTenantContext() {
        ClientContext.clear();
        TenantContext.clear();
    }

    private void handleIotMessage(Map<String, Object> message) {
        webSocketService.sendIotEvent(message);
    }

    private void handleNotificationMessage(Map<String, Object> message) {
        Object userIdObj = message.get("userId");
        if (userIdObj != null) {
            Long userId = userIdObj instanceof Long ? (Long) userIdObj :
                    Long.parseLong(userIdObj.toString());
            webSocketService.sendNotification(userId, message);
        }
    }

    private void handleGenericMessage(Map<String, Object> message) {
        String entityType = (String) message.getOrDefault("entityType", "generic");
        String action = (String) message.getOrDefault("action", "update");
        webSocketService.sendEntityUpdate(entityType, action, message.get("data"));
    }
}

Publishing Messages

From Services

@Service
@RequiredArgsConstructor
public class InspectionService {

    private final RabbitMQPublisher rabbitMQPublisher;

    public Inspection createInspection(InspectionRequest request) {
        Inspection inspection = // ... create inspection

        // Publish event for real-time updates
        rabbitMQPublisher.sendEntityUpdate(
            "inspection",
            "created",
            InspectionDto.fromEntity(inspection)
        );

        return inspection;
    }

    public void completeInspection(Long inspectionId) {
        Inspection inspection = // ... complete inspection

        // Notify assigned users
        rabbitMQPublisher.sendNotification(
            inspection.getInspector().getId(),
            "Inspection Completed",
            "Inspection #" + inspection.getId() + " has been completed"
        );

        // Broadcast update
        rabbitMQPublisher.sendEntityUpdate(
            "inspection",
            "completed",
            InspectionDto.fromEntity(inspection)
        );
    }
}

IoT Events

@Service
@RequiredArgsConstructor
public class IoTService {

    private final RabbitMQPublisher rabbitMQPublisher;

    public void processSensorReading(Long sensorId, Double value) {
        Map<String, Object> event = new HashMap<>();
        event.put("sensorId", sensorId);
        event.put("value", value);
        event.put("timestamp", Instant.now().toString());

        // Check thresholds
        Sensor sensor = sensorRepository.findById(sensorId).orElseThrow();
        if (value > sensor.getCriticalMax() || value < sensor.getCriticalMin()) {
            event.put("alertLevel", "critical");
        } else if (value > sensor.getWarningMax() || value < sensor.getWarningMin()) {
            event.put("alertLevel", "warning");
        } else {
            event.put("alertLevel", "normal");
        }

        rabbitMQPublisher.sendIotEvent(event);
    }
}

WebSocket Integration

WebSocket Service

@Service
@RequiredArgsConstructor
public class WebSocketService {

    private final SimpMessagingTemplate messagingTemplate;

    public void sendIotEvent(Map<String, Object> event) {
        String clientId = (String) event.get("clientId");
        String sandbox = (String) event.get("sandbox");

        // Send to client-specific topic
        String destination = String.format("/topic/%s/%s/iot",
            clientId != null ? clientId : "default",
            sandbox != null ? sandbox : "public"
        );

        messagingTemplate.convertAndSend(destination, event);
    }

    public void sendNotification(Long userId, Map<String, Object> notification) {
        String clientId = (String) notification.get("clientId");

        // Send to user-specific queue
        String destination = String.format("/queue/%s/user/%d/notifications",
            clientId != null ? clientId : "default",
            userId
        );

        messagingTemplate.convertAndSend(destination, notification);
    }

    public void sendEntityUpdate(String entityType, String action, Object data) {
        String clientId = ClientContext.getClient();
        String sandbox = TenantContext.getTenant();

        Map<String, Object> message = new HashMap<>();
        message.put("entityType", entityType);
        message.put("action", action);
        message.put("data", data);
        message.put("timestamp", Instant.now().toString());

        String destination = String.format("/topic/%s/%s/entities/%s",
            clientId != null ? clientId : "default",
            sandbox != null ? sandbox : "public",
            entityType
        );

        messagingTemplate.convertAndSend(destination, message);
    }
}

Docker Setup

docker-compose.yml

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: tgm-rabbitmq
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
    environment:
      - RABBITMQ_DEFAULT_USER=${RABBIT_USER:-guest}
      - RABBITMQ_DEFAULT_PASS=${RABBIT_PASS:-guest}
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "check_running"]
      interval: 30s
      timeout: 10s
      retries: 3

volumes:
  rabbitmq-data:

Starting RabbitMQ

# Start RabbitMQ
docker-compose up -d rabbitmq

# Access management UI
open http://localhost:15672
# Login: guest / guest

# Check status
docker exec tgm-rabbitmq rabbitmqctl status

Client SDK & Angular/React Integration

For comprehensive client-side documentation including: - Real-time entity CRUD subscriptions - Dynamic queue message subscriptions - IoT, notification, and calendar event subscriptions - Full STOMP destination reference - Angular (RxStomp) and React (@stomp/stompjs) examples - Error handling and troubleshooting

See: Real-Time WebSocket Client SDK

Quick Example (Entity Changes)

// Subscribe to all entity CRUD events — works for any entity type automatically
client.subscribe('/topic/entities', (message) => {
  const event = JSON.parse(message.body);
  // event = { resourceName, action, entityId, timestamp, data }
});

// Or subscribe to a specific resource type
client.subscribe('/topic/entities/inspections', callback);

Quick Example (Dynamic Queue Messages)

// Subscribe to a specific queue (must exist in RabbitMQ)
client.subscribe('/topic/queues/tgm_iot', (message) => {
  const event = JSON.parse(message.body);
  // event = { queueName, handlerType, routingKey, timestamp, payload }
});

// Or subscribe to all queue messages
client.subscribe('/topic/queues', callback);

Monitoring

Management UI

Access the RabbitMQ Management UI at http://localhost:15672:

  • Overview - Node status, message rates
  • Connections - Active client connections
  • Channels - Message channels
  • Exchanges - Exchange configuration
  • Queues - Queue depths, message rates

Health Check

# Check RabbitMQ health
curl http://localhost:15672/api/health/checks/alarms \
  -u guest:guest

# From application
GET /actuator/health

# Response includes RabbitMQ status
{
  "status": "UP",
  "components": {
    "rabbit": {
      "status": "UP",
      "details": {
        "version": "3.12.0"
      }
    }
  }
}

Queue Monitoring

# List queues with message counts
docker exec tgm-rabbitmq rabbitmqctl list_queues name messages consumers

# Queue details
docker exec tgm-rabbitmq rabbitmqctl list_queues name messages_ready messages_unacknowledged

Troubleshooting

Connection Issues

# Check if RabbitMQ is running
docker ps | grep rabbitmq

# Check logs
docker logs tgm-rabbitmq

# Test connection
docker exec tgm-rabbitmq rabbitmq-diagnostics check_port_connectivity

Messages Not Delivered

// Enable debug logging
logging.level.org.springframework.amqp=DEBUG

// Check if exchange/queue exists
docker exec tgm-rabbitmq rabbitmqctl list_exchanges
docker exec tgm-rabbitmq rabbitmqctl list_queues

// Check bindings
docker exec tgm-rabbitmq rabbitmqctl list_bindings

Consumer Not Receiving

# Check consumer count
docker exec tgm-rabbitmq rabbitmqctl list_queues name consumers

# Check if consumer is connected
docker exec tgm-rabbitmq rabbitmqctl list_consumers

High Memory Usage

# Check memory
docker exec tgm-rabbitmq rabbitmqctl status | grep -A5 memory

# Set memory limit
docker exec tgm-rabbitmq rabbitmqctl set_vm_memory_high_watermark 0.4

Message Accumulation

# Purge a queue (development only)
docker exec tgm-rabbitmq rabbitmqctl purge_queue tgm_queue

# Delete a queue
docker exec tgm-rabbitmq rabbitmqctl delete_queue old_queue

Best Practices

  1. Always include tenant context - Use RabbitMQPublisher which automatically adds clientId and sandbox
  2. Use appropriate routing keys - Follow the tgm.{entityType} pattern
  3. Handle reconnection - RxStomp handles automatic reconnection
  4. Acknowledge messages - Spring AMQP handles this automatically
  5. Monitor queue depths - Set alerts for message accumulation
  6. Use dead letter queues - For failed message handling