RabbitMQ
TGM Expert uses RabbitMQ as a message broker for real-time event distribution, asynchronous processing, and decoupled communication between services. It enables features like real-time notifications, IoT event streaming, and WebSocket event broadcasting.
Table of Contents¶
- Overview
- Architecture
- Configuration
- Message Types
- Multi-Tenancy
- Publishing Messages
- WebSocket Integration
- Docker Setup
- Client SDK & Angular/React Integration
- Monitoring
- Troubleshooting
- Best Practices
- Related Documentation
Overview¶
Key Features¶
- Real-Time Events - Instant notification delivery via WebSocket
- Asynchronous Processing - Offload heavy tasks from request threads
- Multi-Tenant Isolation - Messages tagged with client/sandbox context
- Event Broadcasting - Distribute events to multiple consumers
- Reliable Delivery - Message persistence and acknowledgment
- Topic Routing - Flexible routing with topic exchanges
Use Cases in TGM Expert¶
| Use Case | Routing Key | Description |
|---|---|---|
| IoT Events | tgm.iot | Real-time sensor readings |
| Notifications | tgm.notification | User notifications |
| Entity Updates | tgm.{entityType} | CRUD event broadcasts |
| Work Orders | tgm.workorder | Work order status changes |
| Alerts | tgm.alert | Sensor threshold alerts |
Architecture¶
┌─────────────────────────────────────────────────────────────────┐
│                      RabbitMQ Message Flow                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Publishers                                                    │
│       ┌────────────┐    ┌────────────┐    ┌────────────┐        │
│       │ IoTService │    │UserService │    │AlertService│        │
│       └──────┬─────┘    └──────┬─────┘    └──────┬─────┘        │
│              │                 │                 │              │
│              └─────────────────┼─────────────────┘              │
│                                ▼                                │
│     ┌─────────────────────────────────────────────────────┐     │
│     │                  RabbitMQPublisher                  │     │
│     │      (Adds tenant context: clientId, sandbox)       │     │
│     └──────────────────────────┬──────────────────────────┘     │
│                                │                                │
│                                ▼                                │
│     ┌─────────────────────────────────────────────────────┐     │
│     │                      RabbitMQ                       │     │
│     │   ┌─────────────────────────────────────────────┐   │     │
│     │   │            tgm_exchange (topic)             │   │     │
│     │   └────┬─────────────────┬─────────────────┬────┘   │     │
│     │        │                 │                 │        │     │
│     │        ▼                 ▼                 ▼        │     │
│     │ ┌────────────┐    ┌────────────┐    ┌────────────┐  │     │
│     │ │ tgm_queue  │    │ iot_queue  │    │alert_queue │  │     │
│     │ └────────────┘    └────────────┘    └────────────┘  │     │
│     └──────────────────────────┬──────────────────────────┘     │
│                                │                                │
│                                ▼                                │
│     ┌─────────────────────────────────────────────────────┐     │
│     │                  RabbitMQListener                   │     │
│     │         (Sets tenant context from message)          │     │
│     └──────────────────────────┬──────────────────────────┘     │
│                                │                                │
│              ┌─────────────────┼─────────────────┐              │
│              ▼                 ▼                 ▼              │
│       ┌────────────┐    ┌────────────┐    ┌────────────┐        │
│       │ WebSocket  │    │Notification│    │   Email    │        │
│       │  Service   │    │  Service   │    │  Service   │        │
│       └────────────┘    └────────────┘    └────────────┘        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Configuration¶
Environment Variables¶
# Enable RabbitMQ
RABBIT_ENABLED=true
# Connection settings
RABBIT_HOST=localhost
RABBIT_PORT=5672
RABBIT_USER=guest
RABBIT_PASS=guest
# Queue configuration
RABBIT_QUEUE=tgm_queue
RABBIT_EXCHANGE=tgm_exchange
application.yml¶
spring:
  rabbitmq:
    enabled: ${RABBIT_ENABLED:false}
    host: ${RABBIT_HOST:localhost}
    port: ${RABBIT_PORT:5672}
    username: ${RABBIT_USER:guest}
    password: ${RABBIT_PASS:guest}
    virtual-host: /
app:
  queue:
    name: ${RABBIT_QUEUE:tgm_queue}
    exchange: ${RABBIT_EXCHANGE:tgm_exchange}
RabbitMQ Configuration Class¶
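import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.enabled", havingValue = "true")
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "tgm_exchange";
    public static final String QUEUE_NAME = "tgm_queue";
    public static final String IOT_QUEUE = "tgm_iot_queue";
    public static final String NOTIFICATION_QUEUE = "tgm_notification_queue";
    public static final String IOT_ROUTING_KEY = "tgm.iot";
    public static final String NOTIFICATION_ROUTING_KEY = "tgm.notification";

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    // Durable queues survive a broker restart
    @Bean
    public Queue mainQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean
    public Queue iotQueue() {
        return QueueBuilder.durable(IOT_QUEUE).build();
    }

    @Bean
    public Queue notificationQueue() {
        return QueueBuilder.durable(NOTIFICATION_QUEUE).build();
    }

    // The main queue receives every message whose routing key starts with "tgm."
    @Bean
    public Binding mainBinding(Queue mainQueue, TopicExchange exchange) {
        return BindingBuilder.bind(mainQueue).to(exchange).with("tgm.#");
    }

    @Bean
    public Binding iotBinding(Queue iotQueue, TopicExchange exchange) {
        return BindingBuilder.bind(iotQueue).to(exchange).with(IOT_ROUTING_KEY);
    }

    @Bean
    public Binding notificationBinding(Queue notificationQueue, TopicExchange exchange) {
        return BindingBuilder.bind(notificationQueue).to(exchange).with(NOTIFICATION_ROUTING_KEY);
    }

    // Serialize message bodies as JSON
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}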
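The bindings above rely on topic-exchange pattern matching: routing keys are split into dot-separated words, `*` stands for exactly one word, and `#` stands for zero or more words. The following is a minimal sketch of that matching rule in plain Java, intended only to show which keys a pattern such as `tgm.#` accepts. The class `TopicPatternSketch` is hypothetical and is not RabbitMQ's own implementation.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative matcher for topic binding patterns (not RabbitMQ's implementation). */
final class TopicPatternSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(split(pattern), 0, split(routingKey), 0);
    }

    private static List<String> split(String s) {
        return s.isEmpty() ? List.of() : Arrays.asList(s.split("\\.", -1));
    }

    private static boolean matches(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return j == k.size();
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.size(); next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false;
        }
        // '*' matches exactly one word; any other word must match literally.
        return (word.equals("*") || word.equals(k.get(j))) && matches(p, i + 1, k, j + 1);
    }
}
```

With the configuration above, a message published with `tgm.iot` matches both the `tgm.#` binding and the `tgm.iot` binding, so a copy is routed to `tgm_queue` and to `tgm_iot_queue`.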
Message Types¶
Standard Message Structure¶
All messages include tenant context for multi-tenant isolation:
{
  "type": "notification",
  "clientId": "client123",
  "sandbox": "sandbox_dev",
  "timestamp": "2026-02-05T10:30:00Z",
  "payload": {
    // Type-specific data
  }
}
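Because tenant isolation depends on these envelope fields being present, a consumer may want to reject messages that lack them before doing any work. The sketch below is one possible check, assuming messages arrive as a `Map<String, Object>` as in the listener shown later. The class `EnvelopeCheckSketch` and its required-field list are illustrative assumptions, not part of TGM Expert.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that reports which standard envelope fields are missing. */
final class EnvelopeCheckSketch {

    // Field names taken from the standard message structure above.
    static final List<String> REQUIRED_FIELDS =
            List.of("type", "clientId", "sandbox", "timestamp");

    /** Returns the required fields that are absent or blank, in declaration order. */
    static List<String> missingFields(Map<String, Object> message) {
        List<String> missing = new ArrayList<>();
        for (String field : REQUIRED_FIELDS) {
            Object value = message.get(field);
            if (value == null || value.toString().isBlank()) {
                missing.add(field);
            }
        }
        return missing;
    }
}
```

An empty result means the envelope is complete; a non-empty result could be logged and the message routed to whatever failure handling the deployment uses.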
IoT Event Message¶
{
  "type": "iot",
  "clientId": "client123",
  "sandbox": "public",
  "sensorId": 456,
  "unitId": 789,
  "value": 75.5,
  "timestamp": "2026-02-05T10:30:00Z",
  "alertLevel": "warning"
}
Notification Message¶
{
  "type": "notification",
  "clientId": "client123",
  "sandbox": "public",
  "userId": 123,
  "title": "New Work Order Assigned",
  "message": "Work order #WO-2026-001 has been assigned to you",
  "link": "/work-orders/1",
  "priority": "high"
}
Entity Update Message¶
{
  "type": "entity",
  "clientId": "client123",
  "sandbox": "public",
  "entityType": "inspection",
  "action": "created",
  "entityId": 456,
  "data": {
    "id": 456,
    "title": "Monthly Inspection",
    "status": "scheduled"
  }
}
Multi-Tenancy¶
Publisher: Adding Tenant Context¶
All messages automatically include the current tenant context:
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty(name = "spring.rabbitmq.enabled", havingValue = "true")
@RequiredArgsConstructor
public class RabbitMQPublisher {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Copy the message and add the multi-tenancy context plus a timestamp.
     */
    private Map<String, Object> addTenantContext(Map<String, Object> message) {
        Map<String, Object> enrichedMessage = new HashMap<>(message);
        enrichedMessage.put("clientId", ClientContext.getClient());
        enrichedMessage.put("sandbox", TenantContext.getTenant());
        enrichedMessage.put("timestamp", Instant.now().toString());
        return enrichedMessage;
    }

    public void sendIotEvent(Map<String, Object> event) {
        Map<String, Object> enrichedEvent = addTenantContext(event);
        enrichedEvent.put("type", "iot");
        sendMessage(RabbitMQConfig.IOT_ROUTING_KEY, enrichedEvent);
    }

    public void sendNotification(Long userId, String title, String message) {
        Map<String, Object> notification = new HashMap<>();
        notification.put("type", "notification");
        notification.put("userId", userId);
        notification.put("title", title);
        notification.put("message", message);
        // Enrich through the shared helper so the timestamp is set as well
        sendMessage(RabbitMQConfig.NOTIFICATION_ROUTING_KEY, addTenantContext(notification));
    }

    public void sendEntityUpdate(String entityType, String action, Object data) {
        Map<String, Object> update = new HashMap<>();
        update.put("type", "entity");
        update.put("entityType", entityType);
        update.put("action", action);
        update.put("data", data);
        // Routing key follows the tgm.{entityType} pattern
        sendMessage("tgm." + entityType, addTenantContext(update));
    }

    private void sendMessage(String routingKey, Object message) {
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.EXCHANGE_NAME,
            routingKey,
            message
        );
    }
}
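The publisher reads the tenant from `ClientContext` and `TenantContext`, whose implementations are not shown here. The sketch below assumes they behave like thread-local holders and uses two hypothetical `ThreadLocal` fields as stand-ins, to illustrate what the enrichment step does: it copies the caller's map, stamps the context of the calling thread, and leaves the original map untouched.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Stand-alone illustration of tenant enrichment; not the TGM Expert classes. */
final class TenantEnrichmentSketch {

    // Hypothetical stand-ins for ClientContext and TenantContext (assumed thread-local).
    static final ThreadLocal<String> CLIENT = new ThreadLocal<>();
    static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    /** Returns a copy of the message with clientId, sandbox and timestamp added. */
    static Map<String, Object> enrich(Map<String, Object> message) {
        Map<String, Object> copy = new HashMap<>(message);
        copy.put("clientId", CLIENT.get());   // null if no context is set on this thread
        copy.put("sandbox", TENANT.get());
        copy.put("timestamp", Instant.now().toString());
        return copy;
    }
}
```

If the real context holders are thread-local, as assumed here, publishing from a different thread (for example an async executor) would see no context and emit `null` for `clientId` and `sandbox`, so the context would need to be propagated explicitly in that case.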
Listener: Restoring Tenant Context¶
The listener restores tenant context before processing:
import java.util.Map;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty(name = "spring.rabbitmq.enabled", havingValue = "true")
@RequiredArgsConstructor
public class RabbitMQListener {

    private final WebSocketService webSocketService;

    @RabbitListener(queues = "${app.queue.name:tgm_queue}")
    public void receiveMessage(Map<String, Object> message) {
        try {
            // Set multi-tenancy context from message
            setTenantContext(message);
            String type = (String) message.getOrDefault("type", "unknown");
            switch (type) {
                case "iot" -> handleIotMessage(message);
                case "notification" -> handleNotificationMessage(message);
                default -> handleGenericMessage(message);
            }
        } finally {
            // Always clear the context so it cannot leak into the next message
            clearTenantContext();
        }
    }

    private void setTenantContext(Map<String, Object> message) {
        Object clientId = message.get("clientId");
        Object sandbox = message.get("sandbox");
        if (clientId != null) {
            ClientContext.setClient(clientId.toString());
        }
        if (sandbox != null) {
            TenantContext.setTenant(sandbox.toString());
        }
    }

    private void clearTenantContext() {
        ClientContext.clear();
        TenantContext.clear();
    }

    private void handleIotMessage(Map<String, Object> message) {
        webSocketService.sendIotEvent(message);
    }

    private void handleNotificationMessage(Map<String, Object> message) {
        Object userIdObj = message.get("userId");
        if (userIdObj != null) {
            // JSON numbers may arrive as Integer, so fall back to parsing the text form
            Long userId = userIdObj instanceof Long ? (Long) userIdObj :
                Long.parseLong(userIdObj.toString());
            webSocketService.sendNotification(userId, message);
        }
    }

    private void handleGenericMessage(Map<String, Object> message) {
        String entityType = (String) message.getOrDefault("entityType", "generic");
        String action = (String) message.getOrDefault("action", "update");
        webSocketService.sendEntityUpdate(entityType, action, message.get("data"));
    }
}
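The `userId` handling in the listener has to cope with the way JSON numbers are deserialized into a `Map<String, Object>`: with Jackson, a small integer such as `123` typically arrives as an `Integer`, not a `Long`. The sketch below shows one way to coerce the value defensively. The helper `UserIdCoercionSketch.toUserId` is hypothetical and returns `null` for unusable input instead of throwing, which is a design choice of this sketch, not of the listener above.

```java
/** Hypothetical helper for reading a numeric id out of a deserialized JSON map. */
final class UserIdCoercionSketch {

    /** Converts Integer, Long or numeric text to Long; returns null if not convertible. */
    static Long toUserId(Object raw) {
        if (raw == null) {
            return null;
        }
        if (raw instanceof Number) {
            // Covers Integer and Long; fractional values would be truncated.
            return ((Number) raw).longValue();
        }
        try {
            return Long.parseLong(raw.toString().trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

Returning `null` lets the caller skip delivery for a malformed message, which avoids an exception escaping the listener method.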
Publishing Messages¶
From Services¶
@Service
@RequiredArgsConstructor
public class InspectionService {

    private final RabbitMQPublisher rabbitMQPublisher;

    public Inspection createInspection(InspectionRequest request) {
        Inspection inspection = // ... create inspection

        // Publish event for real-time updates
        rabbitMQPublisher.sendEntityUpdate(
            "inspection",
            "created",
            InspectionDto.fromEntity(inspection)
        );
        return inspection;
    }

    public void completeInspection(Long inspectionId) {
        Inspection inspection = // ... complete inspection

        // Notify assigned users
        rabbitMQPublisher.sendNotification(
            inspection.getInspector().getId(),
            "Inspection Completed",
            "Inspection #" + inspection.getId() + " has been completed"
        );

        // Broadcast update
        rabbitMQPublisher.sendEntityUpdate(
            "inspection",
            "completed",
            InspectionDto.fromEntity(inspection)
        );
    }
}
IoT Events¶
@Service
@RequiredArgsConstructor
public class IoTService {

    private final RabbitMQPublisher rabbitMQPublisher;
    // Used for the threshold lookup below; the type name is assumed
    private final SensorRepository sensorRepository;

    public void processSensorReading(Long sensorId, Double value) {
        Map<String, Object> event = new HashMap<>();
        event.put("sensorId", sensorId);
        event.put("value", value);
        event.put("timestamp", Instant.now().toString());

        // Check thresholds: critical limits take precedence over warning limits
        Sensor sensor = sensorRepository.findById(sensorId).orElseThrow();
        if (value > sensor.getCriticalMax() || value < sensor.getCriticalMin()) {
            event.put("alertLevel", "critical");
        } else if (value > sensor.getWarningMax() || value < sensor.getWarningMin()) {
            event.put("alertLevel", "warning");
        } else {
            event.put("alertLevel", "normal");
        }

        rabbitMQPublisher.sendIotEvent(event);
    }
}
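The threshold check in `processSensorReading` can be isolated as a pure function, which makes the precedence (critical before warning) and the boundary behavior easy to test. The sketch below mirrors the comparisons above; the class `AlertLevelSketch` and the example threshold values in the usage note are assumptions for illustration.

```java
/** Stand-alone version of the threshold classification used for IoT events. */
final class AlertLevelSketch {

    enum AlertLevel { NORMAL, WARNING, CRITICAL }

    /**
     * Classifies a reading. Comparisons are strict, so a value exactly on a
     * limit is still inside that band.
     */
    static AlertLevel classify(double value,
                               double criticalMin, double warningMin,
                               double warningMax, double criticalMax) {
        if (value > criticalMax || value < criticalMin) {
            return AlertLevel.CRITICAL;
        }
        if (value > warningMax || value < warningMin) {
            return AlertLevel.WARNING;
        }
        return AlertLevel.NORMAL;
    }
}
```

For example, with hypothetical limits of critical 0 to 90 and warning 10 to 70, a reading of 75.5 classifies as `WARNING`, 95 as `CRITICAL`, and 70 as `NORMAL` because the comparison is strict.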
WebSocket Integration¶
WebSocket Service¶
@Service
@RequiredArgsConstructor
public class WebSocketService {

    private final SimpMessagingTemplate messagingTemplate;

    public void sendIotEvent(Map<String, Object> event) {
        String clientId = (String) event.get("clientId");
        String sandbox = (String) event.get("sandbox");

        // Send to client-specific topic
        String destination = String.format("/topic/%s/%s/iot",
            clientId != null ? clientId : "default",
            sandbox != null ? sandbox : "public"
        );
        messagingTemplate.convertAndSend(destination, event);
    }

    public void sendNotification(Long userId, Map<String, Object> notification) {
        String clientId = (String) notification.get("clientId");

        // Send to user-specific queue
        String destination = String.format("/queue/%s/user/%d/notifications",
            clientId != null ? clientId : "default",
            userId
        );
        messagingTemplate.convertAndSend(destination, notification);
    }

    public void sendEntityUpdate(String entityType, String action, Object data) {
        String clientId = ClientContext.getClient();
        String sandbox = TenantContext.getTenant();

        Map<String, Object> message = new HashMap<>();
        message.put("entityType", entityType);
        message.put("action", action);
        message.put("data", data);
        message.put("timestamp", Instant.now().toString());

        String destination = String.format("/topic/%s/%s/entities/%s",
            clientId != null ? clientId : "default",
            sandbox != null ? sandbox : "public",
            entityType
        );
        messagingTemplate.convertAndSend(destination, message);
    }
}
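The three destination formats used by `WebSocketService` follow one naming scheme with the same fallbacks (`default` for a missing client, `public` for a missing sandbox). As a rough sketch, the scheme can be captured in a small helper so that clients and tests can derive the same strings. The class `DestinationSketch` is hypothetical; the format strings and fallbacks are copied from the service above.

```java
/** Hypothetical helper that builds the STOMP destinations used by WebSocketService. */
final class DestinationSketch {

    private static String orDefault(String value, String fallback) {
        return value != null ? value : fallback;
    }

    /** Tenant-wide IoT topic, e.g. /topic/{clientId}/{sandbox}/iot */
    static String iotTopic(String clientId, String sandbox) {
        return String.format("/topic/%s/%s/iot",
                orDefault(clientId, "default"), orDefault(sandbox, "public"));
    }

    /** Per-user notification queue, e.g. /queue/{clientId}/user/{userId}/notifications */
    static String userNotifications(String clientId, long userId) {
        return String.format("/queue/%s/user/%d/notifications",
                orDefault(clientId, "default"), userId);
    }

    /** Tenant-wide entity topic, e.g. /topic/{clientId}/{sandbox}/entities/{entityType} */
    static String entityTopic(String clientId, String sandbox, String entityType) {
        return String.format("/topic/%s/%s/entities/%s",
                orDefault(clientId, "default"), orDefault(sandbox, "public"), entityType);
    }
}
```

For instance, `DestinationSketch.iotTopic("client123", "sandbox_dev")` yields `/topic/client123/sandbox_dev/iot`, and a missing client and sandbox fall back to `/topic/default/public/iot`.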
Docker Setup¶
docker-compose.yml¶
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: tgm-rabbitmq
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
    environment:
      - RABBITMQ_DEFAULT_USER=${RABBIT_USER:-guest}
      - RABBITMQ_DEFAULT_PASS=${RABBIT_PASS:-guest}
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "check_running"]
      interval: 30s
      timeout: 10s
      retries: 3
volumes:
  rabbitmq-data:
Starting RabbitMQ¶
# Start RabbitMQ
docker-compose up -d rabbitmq
# Access management UI
open http://localhost:15672
# Login: guest / guest
# Check status
docker exec tgm-rabbitmq rabbitmqctl status
Client SDK & Angular/React Integration¶
For comprehensive client-side documentation, including:
- Real-time entity CRUD subscriptions
- Dynamic queue message subscriptions
- IoT, notification, and calendar event subscriptions
- Full STOMP destination reference
- Angular (RxStomp) and React (@stomp/stompjs) examples
- Error handling and troubleshooting
See: Real-Time WebSocket Client SDK
Quick Example (Entity Changes)¶
// Subscribe to all entity CRUD events — works for any entity type automatically
client.subscribe('/topic/entities', (message) => {
  const event = JSON.parse(message.body);
  // event = { resourceName, action, entityId, timestamp, data }
});
// Or subscribe to a specific resource type
client.subscribe('/topic/entities/inspections', callback);
Quick Example (Dynamic Queue Messages)¶
// Subscribe to a specific queue (must exist in RabbitMQ)
client.subscribe('/topic/queues/tgm_iot', (message) => {
  const event = JSON.parse(message.body);
  // event = { queueName, handlerType, routingKey, timestamp, payload }
});
// Or subscribe to all queue messages
client.subscribe('/topic/queues', callback);
Monitoring¶
Management UI¶
Access the RabbitMQ Management UI at http://localhost:15672:
- Overview - Node status, message rates
- Connections - Active client connections
- Channels - Message channels
- Exchanges - Exchange configuration
- Queues - Queue depths, message rates
Health Check¶
# Check RabbitMQ health
curl http://localhost:15672/api/health/checks/alarms \
-u guest:guest
# From application
GET /actuator/health
# Response includes RabbitMQ status
{
  "status": "UP",
  "components": {
    "rabbit": {
      "status": "UP",
      "details": {
        "version": "3.12.0"
      }
    }
  }
}
Queue Monitoring¶
# List queues with message counts
docker exec tgm-rabbitmq rabbitmqctl list_queues name messages consumers
# Queue details
docker exec tgm-rabbitmq rabbitmqctl list_queues name messages_ready messages_unacknowledged
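The output of `list_queues name messages consumers` can be scanned automatically to flag queues that are backing up. The sketch below assumes each data row holds three whitespace-separated columns (queue name, message count, consumer count) and skips any line that does not fit, such as banner or header lines. The class `QueueBacklogSketch`, the threshold, and the sample output in the usage note are all illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical scanner for "name messages consumers" queue listings. */
final class QueueBacklogSketch {

    /**
     * Returns queues whose depth exceeds maxMessages, or that hold messages
     * while no consumer is attached.
     */
    static List<String> stalledQueues(String listing, long maxMessages) {
        List<String> flagged = new ArrayList<>();
        for (String line : listing.split("\\R")) {
            String[] cols = line.trim().split("\\s+");
            if (cols.length != 3) {
                continue; // banner or blank line
            }
            try {
                long messages = Long.parseLong(cols[1]);
                long consumers = Long.parseLong(cols[2]);
                if (messages > maxMessages || (messages > 0 && consumers == 0)) {
                    flagged.add(cols[0]);
                }
            } catch (NumberFormatException e) {
                // Header row such as "name messages consumers": ignore it.
            }
        }
        return flagged;
    }
}
```

Given a made-up listing in which `tgm_iot_queue` holds 1500 messages with 0 consumers while the other queues are small and consumed, `stalledQueues(listing, 1000)` returns only `tgm_iot_queue`.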
Troubleshooting¶
Connection Issues¶
# Check if RabbitMQ is running
docker ps | grep rabbitmq
# Check logs
docker logs tgm-rabbitmq
# Test connection
docker exec tgm-rabbitmq rabbitmq-diagnostics check_port_connectivity
Messages Not Delivered¶
# Enable debug logging (add to application.properties)
logging.level.org.springframework.amqp=DEBUG
# Check that the exchange and queues exist
docker exec tgm-rabbitmq rabbitmqctl list_exchanges
docker exec tgm-rabbitmq rabbitmqctl list_queues
# Check bindings
docker exec tgm-rabbitmq rabbitmqctl list_bindings
Consumer Not Receiving¶
# Check consumer count
docker exec tgm-rabbitmq rabbitmqctl list_queues name consumers
# Check if consumer is connected
docker exec tgm-rabbitmq rabbitmqctl list_consumers
High Memory Usage¶
# Check memory
docker exec tgm-rabbitmq rabbitmqctl status | grep -A5 memory
# Set the memory high watermark to 40% of available RAM
# (runtime change only; it is lost when the node restarts)
docker exec tgm-rabbitmq rabbitmqctl set_vm_memory_high_watermark 0.4
Message Accumulation¶
# Purge a queue (development only)
docker exec tgm-rabbitmq rabbitmqctl purge_queue tgm_queue
# Delete a queue
docker exec tgm-rabbitmq rabbitmqctl delete_queue old_queue
Best Practices¶
- Always include tenant context - Use RabbitMQPublisher, which automatically adds clientId and sandbox
- Use appropriate routing keys - Follow the tgm.{entityType} pattern
- Handle reconnection - RxStomp handles automatic reconnection
- Acknowledge messages - Spring AMQP handles this automatically
- Monitor queue depths - Set alerts for message accumulation
- Use dead letter queues - For failed message handling
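The configuration shown earlier does not declare a dead letter queue. In RabbitMQ, dead lettering can be enabled per queue through the optional arguments `x-dead-letter-exchange` and `x-dead-letter-routing-key`. The sketch below only builds that argument map in plain Java; the class `DeadLetterArgsSketch` and the names `tgm_dlx` and `tgm.dead` in the usage note are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds the queue arguments for dead lettering. */
final class DeadLetterArgsSketch {

    /**
     * @param deadLetterExchange   exchange that receives rejected or expired messages
     * @param deadLetterRoutingKey optional replacement routing key; when null the
     *                             message keeps its original routing key
     */
    static Map<String, Object> deadLetterArguments(String deadLetterExchange,
                                                   String deadLetterRoutingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchange);
        if (deadLetterRoutingKey != null) {
            args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        }
        return args;
    }
}
```

A map such as `deadLetterArguments("tgm_dlx", "tgm.dead")` could then be supplied when the queue is declared, for example through `QueueBuilder.withArguments(...)` in Spring AMQP. Note that the arguments of an existing queue cannot be changed by redeclaring it, so an already created durable queue would have to be deleted and recreated, or dead lettering applied through a policy instead.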
Related Documentation¶
- WebSocket Integration - Real-time chat and WebSocket setup
- Multi-Tenancy - Tenant isolation architecture
- IoT & Sensors - Sensor event streaming