CMMS (GMAO) Inter-Microservice Communication
Integration Patterns and Technologies
1. Communication Patterns
Synchronous Communication
Use: requests that need an immediate response
Protocol: REST/HTTP with JSON
Use cases: validation, real-time data lookup
Asynchronous Communication
Use: events, notifications, deferred processing
Protocols: message queues, event streaming
Use cases: business events, state updates, notifications
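The difference between the two patterns can be sketched in a few lines. This is a minimal illustration only: `get_equipment` stands in for an HTTP call and an in-process `asyncio.Queue` stands in for a message broker; both names and the event shape are assumptions made for the example.

```python
import asyncio


async def get_equipment(equipment_id: int) -> dict:
    """Synchronous pattern: the caller waits for the answer before continuing."""
    await asyncio.sleep(0)  # stands in for an HTTP round trip
    return {"id": equipment_id, "status": "OPERATIONAL"}


async def notification_worker(queue: asyncio.Queue, handled: list) -> None:
    """Asynchronous pattern: events are consumed later, independently of the producer."""
    while True:
        event = await queue.get()
        if event is None:  # sentinel value: stop the worker
            break
        handled.append(event["type"])


async def demo() -> tuple:
    equipment = await get_equipment(42)  # the caller needs the reply right now

    queue: asyncio.Queue = asyncio.Queue()
    handled: list = []
    worker = asyncio.create_task(notification_worker(queue, handled))
    # The producer only enqueues the event; it does not wait for it to be processed
    await queue.put({"type": "WorkOrderCreated", "equipmentId": 42})
    await queue.put(None)
    await worker
    return equipment, handled


result = asyncio.run(demo())
```

The synchronous call couples the caller to the availability of the other service, while the queued event is processed whenever the consumer is ready.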
2. Integration Technologies
A. REST API - Synchronous Communication
Standard Format: OpenAPI/Swagger
Every service publishes its API as an OpenAPI 3.0 document:
yaml
# Example: Equipment Service API
openapi: 3.0.0
info:
  title: Equipment Service API
  version: 1.0.0
paths:
  /equipment/{id}:
    get:
      summary: Get equipment details
      parameters:
        - name: id
          in: path
          required: true
          schema:
            type: integer
      responses:
        '200':
          description: Equipment details
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Equipment'
components:
  schemas:
    Equipment:
      type: object
      properties:
        id:
          type: integer
        serialNumber:
          type: string
        status:
          type: string
          enum: [OPERATIONAL, MAINTENANCE, FAULTY]
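A client can mirror the `Equipment` schema in its own types so that off-schema responses are rejected early. The sketch below is one possible way to do that with the standard library; `parse_equipment` and the class names are hypothetical, and it assumes all three properties are present even though the schema above does not mark them as required.

```python
from dataclasses import dataclass
from enum import Enum


class EquipmentStatus(Enum):
    OPERATIONAL = "OPERATIONAL"
    MAINTENANCE = "MAINTENANCE"
    FAULTY = "FAULTY"


@dataclass(frozen=True)
class Equipment:
    id: int
    serial_number: str
    status: EquipmentStatus


def parse_equipment(payload: dict) -> Equipment:
    """Build an Equipment from a decoded JSON body, rejecting off-schema values."""
    equipment_id = payload["id"]
    # bool is a subclass of int in Python, so exclude it explicitly
    if not isinstance(equipment_id, int) or isinstance(equipment_id, bool):
        raise ValueError("id must be an integer")
    serial_number = payload["serialNumber"]
    if not isinstance(serial_number, str):
        raise ValueError("serialNumber must be a string")
    # The Enum lookup raises ValueError for values outside the declared enum
    return Equipment(equipment_id, serial_number, EquipmentStatus(payload["status"]))
```

For example, `parse_equipment({"id": 7, "serialNumber": "SN-001", "status": "FAULTY"})` yields an `Equipment` whose status is `EquipmentStatus.FAULTY`, while a status of `"BROKEN"` raises `ValueError`.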
Implementation by Technology
Java/Spring Boot:
java
@RestController
@RequestMapping("/api/v1/equipment")
public class EquipmentController {

    @Autowired
    private EquipmentService equipmentService; // service name assumed

    @GetMapping("/{id}")
    public ResponseEntity<Equipment> getEquipment(@PathVariable Long id) {
        Equipment equipment = equipmentService.findById(id);
        return ResponseEntity.ok(equipment);
    }
}

// HTTP client using WebClient
@Component
public class MaintenanceServiceClient {

    @Autowired
    private WebClient webClient;

    public Equipment getEquipment(Long id) {
        return webClient
            .get()
            // In-cluster URL of the Equipment Service (assumed)
            .uri("http://equipment-service:8080/api/v1/equipment/{id}", id)
            .header("Authorization", "Bearer " + getToken())
            .retrieve()
            .bodyToMono(Equipment.class)
            .block(); // blocks the calling thread until the response arrives
    }
}
Node.js/Express:
javascript
// Server
app.get('/api/v1/equipment/:id', async (req, res) => {
  try {
    const equipment = await equipmentService.findById(req.params.id); // service name assumed
    res.json(equipment);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Client using axios
const axios = require('axios');

class MaintenanceServiceClient {
  async getEquipment(id) {
    const response = await axios.get(
      `http://equipment-service:8080/api/v1/equipment/${id}`, // in-cluster URL assumed
      {
        headers: {
          'Authorization': `Bearer ${await this.getToken()}`,
          'Accept': 'application/json'
        },
        timeout: 5000 // milliseconds
      }
    );
    return response.data;
  }
}
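.NET Core:
csharp
[ApiController]
[Route("api/v1/[controller]")]
public class EquipmentController : ControllerBase
{
    private readonly IEquipmentService _equipmentService; // interface name assumed

    public EquipmentController(IEquipmentService equipmentService)
    {
        _equipmentService = equipmentService;
    }

    [HttpGet("{id}")]
    public async Task<ActionResult<Equipment>> GetEquipment(long id)
    {
        var equipment = await _equipmentService.GetByIdAsync(id);
        if (equipment is null)
        {
            return NotFound();
        }
        return Ok(equipment);
    }
}

// HTTP client
public class MaintenanceServiceClient
{
    // Web defaults map camelCase JSON properties onto PascalCase C# properties
    private static readonly JsonSerializerOptions JsonOptions =
        new JsonSerializerOptions(JsonSerializerDefaults.Web);

    private readonly HttpClient _httpClient;

    public MaintenanceServiceClient(HttpClient httpClient)
    {
        _httpClient = httpClient;
    }

    public async Task<Equipment> GetEquipmentAsync(long id)
    {
        var response = await _httpClient.GetAsync($"/api/v1/equipment/{id}");
        response.EnsureSuccessStatusCode();
        var json = await response.Content.ReadAsStringAsync();
        return JsonSerializer.Deserialize<Equipment>(json, JsonOptions);
    }
}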
Python/FastAPI:
python
# Server
@app.get("/api/v1/equipment/{equipment_id}")
async def get_equipment(equipment_id: int) -> Equipment:
    equipment = await equipment_service.get_by_id(equipment_id)
    return equipment

# Client using httpx
import httpx

class MaintenanceServiceClient:
    async def get_equipment(self, equipment_id: int) -> Equipment:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                # In-cluster URL of the Equipment Service (assumed)
                f"http://equipment-service:8080/api/v1/equipment/{equipment_id}",
                headers={"Authorization": f"Bearer {await self.get_token()}"},
                timeout=5.0,
            )
            response.raise_for_status()
            return Equipment(**response.json())
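Go/Gin:
go
// Server
func GetEquipment(c *gin.Context) {
    id := c.Param("id")
    equipment, err := equipmentService.GetByID(id) // service and method names assumed
    if err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }
    c.JSON(http.StatusOK, equipment)
}

// Client
type MaintenanceServiceClient struct {
    httpClient *http.Client
    baseURL    string
}

func (m *MaintenanceServiceClient) GetEquipment(id string) (*Equipment, error) {
    req, err := http.NewRequest(http.MethodGet,
        fmt.Sprintf("%s/api/v1/equipment/%s", m.baseURL, id), nil)
    if err != nil {
        return nil, err
    }
    req.Header.Set("Authorization", "Bearer "+m.getToken())
    req.Header.Set("Accept", "application/json")

    resp, err := m.httpClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    // Do not try to decode error responses as an Equipment
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("equipment service returned %s", resp.Status)
    }

    var equipment Equipment
    if err := json.NewDecoder(resp.Body).Decode(&equipment); err != nil {
        return nil, err
    }
    return &equipment, nil
}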
B. Apache Kafka - Asynchronous Communication
Kafka Configuration
yaml
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      # Single broker: the offsets topic cannot use the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
Event Definitions (Avro Schema)
json
{
  "namespace": "[Link]",
  "type": "record",
  "name": "WorkOrderCreated",
  "fields": [
    {"name": "workOrderId", "type": "long"},
    {"name": "equipmentId", "type": "long"},
    {"name": "technicianId", "type": ["null", "long"], "default": null},
    {"name": "priority", "type": {"type": "enum", "name": "Priority", "symbols": ["LOW", "MEDIUM", "HIGH", "CRITICAL"]}},
    {"name": "scheduledDate", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "createdAt", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "metadata", "type": {"type": "map", "values": "string"}, "default": {}}
  ]
}
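To make the field rules of this schema concrete, the following sketch builds a `WorkOrderCreated` record as a plain dictionary, applying the two defaults (`technicianId` is null, `metadata` is an empty map) and rejecting priorities outside the enum. It does not perform Avro binary encoding; `build_work_order_created` is a hypothetical helper written for this example.

```python
PRIORITY_SYMBOLS = ("LOW", "MEDIUM", "HIGH", "CRITICAL")


def build_work_order_created(work_order_id: int, equipment_id: int, priority: str,
                             scheduled_date_ms: int, created_at_ms: int,
                             technician_id=None, metadata=None) -> dict:
    """Return a dict shaped like the WorkOrderCreated record, with schema defaults applied."""
    if priority not in PRIORITY_SYMBOLS:
        raise ValueError(f"unknown priority symbol: {priority!r}")
    # technicianId is the union ["null", "long"]: either None or an integer
    if technician_id is not None and not isinstance(technician_id, int):
        raise ValueError("technicianId must be null or a long")
    return {
        "workOrderId": work_order_id,
        "equipmentId": equipment_id,
        "technicianId": technician_id,
        "priority": priority,
        "scheduledDate": scheduled_date_ms,  # milliseconds since the Unix epoch
        "createdAt": created_at_ms,          # milliseconds since the Unix epoch
        "metadata": dict(metadata or {}),    # map of string to string
    }
```

A work order that has not been assigned yet simply omits `technician_id`, which is why the union lists `null` first and declares `null` as the default.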
Implementation by Technology
Java/Spring Boot (Producer and Listener):
java
@Component
public class MaintenanceEventProducer {

    @Autowired
    private KafkaTemplate<String, WorkOrderCreated> kafkaTemplate;

    public void publishWorkOrderCreated(WorkOrder workOrder) {
        // Getter names on WorkOrder are assumed
        WorkOrderCreated event = WorkOrderCreated.newBuilder()
            .setWorkOrderId(workOrder.getId())
            .setEquipmentId(workOrder.getEquipmentId())
            .setTechnicianId(workOrder.getTechnicianId())
            .setPriority(Priority.valueOf(workOrder.getPriority().name()))
            .setScheduledDate(workOrder.getScheduledDate().toEpochMilli())
            .setCreatedAt(Instant.now().toEpochMilli())
            .build();
        kafkaTemplate.send("workorder-events", event);
    }
}

@KafkaListener(topics = "workorder-events")
public void handleWorkOrderCreated(WorkOrderCreated event) {
    // Process the event (handler name assumed)
    notificationService.handle(event);
}
Node.js (Consumer):
javascript
const { Kafka } = require('kafkajs');
const avro = require('avsc');

const kafka = new Kafka({
  clientId: 'notification-service',
  brokers: ['kafka:9092']
});
const consumer = kafka.consumer({ groupId: 'notification-group' });

// Avro schema
const workOrderSchema = avro.Type.forSchema({
  type: 'record',
  name: 'WorkOrderCreated',
  fields: [
    { name: 'workOrderId', type: 'long' },
    { name: 'equipmentId', type: 'long' },
    { name: 'technicianId', type: ['null', 'long'], default: null },
    // ... other fields
  ]
});

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'workorder-events' });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      // Assumes raw Avro binary; Confluent Schema Registry serializers
      // prepend a 5-byte header that must be stripped first
      const event = workOrderSchema.fromBuffer(message.value);
      // Process the event (notifier name assumed)
      await notificationService.send({
        workOrderId: event.workOrderId,
        technicianId: event.technicianId,
        priority: event.priority
      });
    },
  });
};

run().catch(console.error);
.NET Core (Producer and Consumer):
csharp
// Producer
public class InventoryEventProducer
{
    private readonly IProducer<string, byte[]> _producer;

    public async Task PublishStockLevelChanged(StockMovement stockMovement)
    {
        // Property names on StockMovement are assumed
        var stockEvent = new StockLevelChanged
        {
            PartId = stockMovement.PartId,
            NewLevel = stockMovement.NewLevel,
            Threshold = stockMovement.Threshold,
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };

        using var stream = new MemoryStream();
        using var writer = new BinaryWriter(stream);
        // Avro serialization of stockEvent into the stream goes here

        await _producer.ProduceAsync("stock-events", new Message<string, byte[]>
        {
            Key = stockEvent.PartId.ToString(),
            Value = stream.ToArray()
        });
    }
}

// Consumer
public class StockEventConsumer : BackgroundService
{
    private readonly IConsumer<string, byte[]> _consumer;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() blocks, so yield first to let the host finish starting up
        await Task.Yield();
        _consumer.Subscribe("stock-events");
        while (!stoppingToken.IsCancellationRequested)
        {
            var consumeResult = _consumer.Consume(stoppingToken);
            // Deserialize and process
            var stockEvent = DeserializeStockEvent(consumeResult.Message.Value);
            await ProcessStockEvent(stockEvent);
        }
    }
}
Python/FastAPI:
python
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import avro.schema
import avro.io
import io

class ReportingEventConsumer:
    def __init__(self):
        self.consumer = AIOKafkaConsumer(
            'workorder-events', 'stock-events',
            bootstrap_servers='kafka:9092',
            group_id='reporting-group'
        )

    async def start_consuming(self):
        await self.consumer.start()
        try:
            async for msg in self.consumer:
                if msg.topic == 'workorder-events':
                    await self.process_workorder_event(msg.value)
                elif msg.topic == 'stock-events':
                    await self.process_stock_event(msg.value)
        finally:
            await self.consumer.stop()

    async def process_workorder_event(self, data: bytes):
        # Avro deserialization
        event = self.deserialize_avro(data, self.workorder_schema)
        # Update the metrics
        await self.update_maintenance_metrics(event)
Go:
go
package main

import (
    "context"
    "log"

    "github.com/linkedin/goavro/v2"
    "github.com/segmentio/kafka-go"
)

type MobileEventConsumer struct {
    reader *kafka.Reader
    codec  *goavro.Codec
}

func (m *MobileEventConsumer) StartConsuming() {
    m.reader = kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"kafka:9092"},
        Topic:   "workorder-events",
        GroupID: "mobile-group",
    })

    for {
        message, err := m.reader.ReadMessage(context.Background())
        if err != nil {
            log.Printf("Error reading message: %v", err)
            continue
        }
        // Avro deserialization
        native, _, err := m.codec.NativeFromBinary(message.Value)
        if err != nil {
            log.Printf("Error deserializing: %v", err)
            continue
        }
        // Process the event (method name assumed)
        m.handleEvent(native)
    }
}
C. Service Discovery & Load Balancing
Kubernetes Service Discovery
yaml
# Equipment Service
apiVersion: v1
kind: Service
metadata:
  name: equipment-service
  labels:
    app: equipment-service
spec:
  selector:
    app: equipment-service
  ports:
    - port: 8080
      targetPort: 8080
  type: ClusterIP
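Client Configuration with Service Discovery
Java/Spring Boot (Kubernetes as an alternative to Eureka):
java
@Configuration
public class ServiceConfig {

    @Bean
    @LoadBalanced
    public WebClient.Builder webClientBuilder() {
        return WebClient.builder();
    }

    @Bean
    public WebClient webClient(WebClient.Builder builder) {
        return builder.build();
    }
}

// Usage
public class MaintenanceServiceClient {

    private final WebClient webClient;

    public MaintenanceServiceClient(WebClient webClient) {
        this.webClient = webClient;
    }

    public Mono<Equipment> getEquipment(Long id) {
        return webClient
            .get()
            // Logical service name (assumed), resolved by the load-balanced builder
            .uri("http://equipment-service/api/v1/equipment/{id}", id)
            .retrieve()
            .bodyToMono(Equipment.class);
    }
}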
Configuration with Consul (alternative):
yaml
# Consul Helm chart values
global:
  datacenter: dc1
client:
  enabled: true
connectInject:
  enabled: true
controller:
  enabled: true
D. API Gateway - Unified Entry Point
Kong Configuration
yaml
# kong.yml
_format_version: "3.0"
services:
  - name: equipment-service
    url: http://equipment-service:8080   # in-cluster address assumed
    routes:
      - name: equipment-route
        paths:
          - /api/v1/equipment
        methods:
          - GET
          - POST
          - PUT
          - DELETE
  - name: maintenance-service
    url: http://maintenance-service:8080   # in-cluster address assumed
    routes:
      - name: maintenance-route
        paths:
          - /api/v1/workorders
          - /api/v1/maintenance
plugins:
  - name: jwt
    config:
      secret_is_base64: false
      key_claim_name: iss
  - name: rate-limiting
    config:
      minute: 100
      hour: 1000
  - name: cors
    config:
      origins:
        - "*"
      methods:
        - GET
        - POST
        - PUT
        - DELETE
      headers:
        - Accept
        - Authorization
        - Content-Type
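The `minute: 100` setting of the rate-limiting plugin caps each client at 100 requests per one-minute window. The sketch below shows the general idea with a simple fixed-window counter. It illustrates the technique only and is not Kong's implementation; `FixedWindowLimiter` and the client identifiers are hypothetical.

```python
class FixedWindowLimiter:
    """Allow at most `limit` requests per client in each fixed time window."""

    def __init__(self, limit: int, window_seconds: int) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self._state: dict = {}  # client -> (window index, requests seen)

    def allow(self, client: str, now: float) -> bool:
        window = int(now // self.window_seconds)
        seen_window, count = self._state.get(client, (window, 0))
        if seen_window != window:
            count = 0  # a new window has started: reset the counter
        if count >= self.limit:
            self._state[client] = (window, count)
            return False
        self._state[client] = (window, count + 1)
        return True


# Mirrors the `minute: 100` limit from the configuration above
per_minute = FixedWindowLimiter(limit=100, window_seconds=60)
```

Passing `now` explicitly keeps the limiter deterministic in tests; a real caller would pass `time.time()`. An `hour: 1000` limit would be a second instance with `window_seconds=3600`, and a request passes only if both instances allow it.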
3. Resilience Patterns
A. Circuit Breaker Pattern
Java (Resilience4j):
java
@Component
public class EquipmentServiceClient {

    @Autowired
    private WebClient webClient;

    @CircuitBreaker(name = "equipment-service", fallbackMethod = "fallbackGetEquipment")
    @TimeLimiter(name = "equipment-service")
    @Retry(name = "equipment-service")
    public CompletableFuture<Equipment> getEquipment(Long id) {
        return CompletableFuture.supplyAsync(() ->
            webClient.get()
                .uri("/equipment/{id}", id)
                .retrieve()
                .bodyToMono(Equipment.class)
                .block());
    }

    // Used when the circuit is open or the call fails: returns a placeholder with UNKNOWN status
    public CompletableFuture<Equipment> fallbackGetEquipment(Long id, Exception ex) {
        return CompletableFuture.completedFuture(
            Equipment.builder()
                .id(id)
                .status(EquipmentStatus.UNKNOWN) // enum type name assumed
                .build()
        );
    }
}
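Node.js (opossum):
javascript
const axios = require('axios');
const CircuitBreaker = require('opossum');

const options = {
  timeout: 3000,                // fail calls that take longer than 3 s
  errorThresholdPercentage: 50, // open the circuit when 50% of calls fail
  resetTimeout: 30000           // allow a trial call again after 30 s
};

const equipmentServiceCall = (id) => {
  // In-cluster URL of the Equipment Service (assumed)
  return axios.get(`http://equipment-service:8080/api/v1/equipment/${id}`);
};

const breaker = new CircuitBreaker(equipmentServiceCall, options);

// Fallback
breaker.fallback((id) => ({
  data: { id: id, status: 'UNKNOWN' }
}));

// Usage
const getEquipment = async (id) => {
  try {
    const response = await breaker.fire(id);
    return response.data;
  } catch (error) {
    console.error('Equipment service call failed:', error.message);
    throw error;
  }
};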
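The libraries above hide the state machine behind the pattern. As a minimal sketch, a breaker moves between three states: CLOSED (calls pass through), OPEN (calls are rejected immediately) and HALF_OPEN (one trial call is allowed after the reset timeout). For brevity the example below opens after a number of consecutive failures rather than a failure percentage over a rolling window as opossum does; all names are hypothetical.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock=time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock  # injectable so tests can control time
        self._failures = 0
        self._opened_at = 0.0
        self.state = "CLOSED"

    def call(self, func, *args):
        if self.state == "OPEN":
            if self._clock() - self._opened_at >= self.reset_timeout:
                self.state = "HALF_OPEN"  # let one trial call through
            else:
                raise CircuitOpenError("circuit is open")
        try:
            result = func(*args)
        except Exception:
            self._failures += 1
            # A failed trial call, or too many failures in a row, opens the circuit
            if self.state == "HALF_OPEN" or self._failures >= self.failure_threshold:
                self.state = "OPEN"
                self._opened_at = self._clock()
            raise
        # Any success closes the circuit and clears the failure count
        self._failures = 0
        self.state = "CLOSED"
        return result
```

While the circuit is open the downstream service receives no traffic at all, which gives it time to recover instead of being hit by retries.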
B. Retry Pattern with Exponential Backoff
Python:
python
import asyncio
import random

class ServiceClient:
    async def call_with_retry(self,
                              func,
                              max_retries: int = 3,
                              base_delay: float = 1.0,
                              max_delay: float = 60.0):
        for attempt in range(max_retries + 1):
            try:
                return await func()
            except Exception:
                if attempt == max_retries:
                    raise  # out of retries: propagate the last error
                # Exponential backoff capped at max_delay, plus up to 10% jitter
                delay = min(base_delay * (2 ** attempt), max_delay)
                jitter = random.uniform(0, 0.1) * delay
                await asyncio.sleep(delay + jitter)
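To see what the backoff formula produces, the helper below computes the base delay before each retry, without jitter, using the same expression as `call_with_retry`. `backoff_delays` is a hypothetical helper added for illustration.

```python
def backoff_delays(max_retries: int = 3, base_delay: float = 1.0,
                   max_delay: float = 60.0) -> list:
    """Base delay in seconds before each retry, jitter excluded."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(max_retries)]


default_schedule = backoff_delays()             # [1.0, 2.0, 4.0]
long_schedule = backoff_delays(max_retries=8)   # doubles until capped at 60.0
```

With the defaults, a call that keeps failing waits roughly 1 s, 2 s and 4 s before the fourth and final attempt. With eight retries the delay doubles up to 32 s and the last two waits are capped at 60 s. The jitter adds at most 10% to each value so that clients that failed together do not all retry at the same instant.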
4. Monitoring & Observability
Distributed Tracing
yaml
# Jaeger configuration
jaeger:
  agent:
    host: jaeger-agent
    port: 6831
  sampler:
    type: const
    param: 1
Implementation in each service:
Java/Spring Boot:
java
// Automatic with Spring Cloud Sleuth + Zipkin
@RestController
public class EquipmentController {
    @GetMapping("/{id}")
    public Equipment getEquipment(@PathVariable Long id) {
        // Tracing is applied automatically
        return equipmentService.findById(id); // service name assumed
    }
}
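Node.js (OpenTelemetry):
javascript
const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
const { BatchSpanProcessor } = require('@opentelemetry/sdk-trace-base');
const { JaegerExporter } = require('@opentelemetry/exporter-jaeger');

const provider = new NodeTracerProvider();
const exporter = new JaegerExporter({
  // Collector host assumed; 14268 is Jaeger's default HTTP collector port
  endpoint: 'http://jaeger-collector:14268/api/traces'
});
provider.addSpanProcessor(new BatchSpanProcessor(exporter));
provider.register(); // installs the provider as the global tracer provider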
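For a trace to span several services, each outgoing request has to carry the trace identifier. OpenTelemetry SDKs do this by default with the W3C `traceparent` header, formatted as `version-traceid-spanid-flags`. The sketch below builds and propagates such a header by hand to show what the instrumentation does automatically. The function names are hypothetical and this is not a replacement for the SDK.

```python
import re
import secrets

# version "00", 16-byte trace id, 8-byte span id, 1-byte flags, all lowercase hex
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def new_traceparent(sampled: bool = True) -> str:
    """Start a new trace: fresh trace id and span id."""
    trace_id = secrets.token_hex(16)
    span_id = secrets.token_hex(8)
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"


def child_traceparent(parent: str) -> str:
    """Header for an outgoing call: same trace id and flags, new span id."""
    match = _TRACEPARENT.match(parent)
    if match is None:
        raise ValueError("malformed traceparent header")
    trace_id, _parent_span_id, flags = match.groups()
    return f"00-{trace_id}-{secrets.token_hex(8)}-{flags}"
```

A service that receives a request with a `traceparent` header calls the equivalent of `child_traceparent` before contacting the next service, so every span recorded along the way shares one trace id.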
This approach provides robust, observable communication between all your microservices,
regardless of their underlying technology.