Microservices Communication Patterns & Tech

Uploaded by

Hassam

Inter-Microservice Communication for the CMMS (GMAO)

Integration Patterns and Technologies

1. Communication Patterns

Synchronous Communication
Use: Requests that need an immediate response

Protocol: REST/HTTP with JSON

Use cases: Validation, real-time data lookups

Asynchronous Communication
Use: Events, notifications, deferred processing

Protocols: Message queues, event streaming

Use cases: Business events, state updates, notifications
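
To make the contrast between the two patterns concrete, here is a minimal in-process sketch. It is only an illustration: `get_equipment`, `demo`, and the `EQUIPMENT` dictionary are hypothetical names, and an `asyncio.Queue` stands in for a real broker topic.

```python
import asyncio

# Hypothetical in-memory data standing in for the equipment service.
EQUIPMENT = {1: {"id": 1, "status": "OPERATIONAL"}}


def get_equipment(equipment_id: int) -> dict:
    """Synchronous style: the caller blocks until it has the answer."""
    return EQUIPMENT[equipment_id]


async def demo() -> list:
    """Asynchronous style: the producer enqueues events and moves on."""
    queue: asyncio.Queue = asyncio.Queue()  # stands in for a broker topic
    handled = []

    async def consumer() -> None:
        while True:
            event = await queue.get()
            handled.append(event["type"])
            queue.task_done()

    task = asyncio.create_task(consumer())
    # The producer does not wait for the consumer to process each event.
    queue.put_nowait({"type": "WorkOrderCreated", "equipmentId": 1})
    queue.put_nowait({"type": "StockLevelChanged", "partId": 7})
    await queue.join()  # only the demo waits, to make the result observable
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return handled


if __name__ == "__main__":
    print(get_equipment(1))
    print(asyncio.run(demo()))
```

The synchronous call returns the data directly to the caller, while the asynchronous producer never learns when, or by whom, its events were handled.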

2. Integration Technologies

A. REST API - Synchronous Communication

Standard Format: OpenAPI/Swagger

All services expose their APIs in OpenAPI 3.0 format:

yaml
# Example: Equipment Service API
openapi: 3.0.0
info:
  title: Equipment Service API
  version: 1.0.0
paths:
  /equipment/{id}:
    get:
      summary: Get equipment details
      parameters:
        - name: id
          in: path
          required: true
          schema:
            type: integer
      responses:
        '200':
          description: Equipment details
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Equipment'
components:
  schemas:
    Equipment:
      type: object
      properties:
        id:
          type: integer
        serialNumber:
          type: string
        status:
          type: string
          enum: [OPERATIONAL, MAINTENANCE, FAULTY]
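
A consumer of this API may want to check that a response matches the `Equipment` schema before using it. The sketch below hand-codes the three properties from the specification above using only the standard library. `validate_equipment` is a hypothetical helper, and a real project would more likely use a schema-validation library driven by the OpenAPI document itself.

```python
# Mirrors the Equipment schema's status enum from the specification above.
ALLOWED_STATUSES = {"OPERATIONAL", "MAINTENANCE", "FAULTY"}


def validate_equipment(payload: dict) -> list:
    """Return a list of problems; an empty list means the payload conforms.

    The schema declares no required properties, so only present fields
    are checked.
    """
    problems = []
    if "id" in payload:
        value = payload["id"]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            problems.append("id must be an integer")
    if "serialNumber" in payload and not isinstance(payload["serialNumber"], str):
        problems.append("serialNumber must be a string")
    if "status" in payload:
        status = payload["status"]
        if not isinstance(status, str) or status not in ALLOWED_STATUSES:
            allowed = ", ".join(sorted(ALLOWED_STATUSES))
            problems.append(f"status must be one of {allowed}")
    return problems


if __name__ == "__main__":
    print(validate_equipment({"id": 1, "serialNumber": "SN-1", "status": "OPERATIONAL"}))
    print(validate_equipment({"id": "1", "status": "BROKEN"}))
```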

Implementation by Technology

Java/Spring Boot:

java
@RestController
@RequestMapping("/api/v1/equipment")
public class EquipmentController {

    @GetMapping("/{id}")
    public ResponseEntity<Equipment> getEquipment(@PathVariable Long id) {
        Equipment equipment = [Link](id);
        return ResponseEntity.ok(equipment);
    }
}

// HTTP client with RestTemplate/WebClient
@Component
public class MaintenanceServiceClient {

    @Autowired
    private WebClient webClient;

    public Equipment getEquipment(Long id) {
        return webClient
            .get()
            .uri("[Link] id)
            .header("Authorization", "Bearer " + getToken())
            .retrieve()
            .bodyToMono(Equipment.class)
            .block(); // blocks the calling thread until the response arrives
    }
}

Node.js/Express:

javascript
// Server
[Link]('/api/v1/equipment/:id', async (req, res) => {
  try {
    const equipment = await [Link](req.params.id);
    res.json(equipment);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Client with axios
const axios = require('axios');

class MaintenanceServiceClient {
  async getEquipment(id) {
    const response = await axios.get(
      `[Link]
      {
        headers: {
          'Authorization': `Bearer ${await this.getToken()}`,
          'Content-Type': 'application/json'
        },
        timeout: 5000 // milliseconds
      }
    );
    return response.data;
  }
}

.NET Core:

csharp
[ApiController]
[Route("api/v1/[controller]")]
public class EquipmentController : ControllerBase
{
    [HttpGet("{id}")]
    public async Task<ActionResult<Equipment>> GetEquipment(long id)
    {
        var equipment = await _equipmentService.GetByIdAsync(id);
        return Ok(equipment);
    }
}

// HTTP client
public class MaintenanceServiceClient
{
    private readonly HttpClient _httpClient;

    public async Task<Equipment> GetEquipmentAsync(long id)
    {
        var response = await _httpClient.GetAsync($"/api/v1/equipment/{id}");
        response.EnsureSuccessStatusCode(); // throws on non-2xx responses

        var json = await response.Content.ReadAsStringAsync();
        return [Link]<Equipment>(json);
    }
}

Python/FastAPI:

python
# Server
@[Link]("/api/v1/equipment/{equipment_id}")
async def get_equipment(equipment_id: int) -> Equipment:
    equipment = await equipment_service.get_by_id(equipment_id)
    return equipment

# Client with httpx
import httpx

class MaintenanceServiceClient:
    async def get_equipment(self, equipment_id: int) -> Equipment:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"[Link]
                headers={"Authorization": f"Bearer {await self.get_token()}"},
                timeout=5.0
            )
            response.raise_for_status()
            return Equipment(**response.json())

Go/Gin:

go
// Server
func GetEquipment(c *gin.Context) {
    id := c.Param("id")
    equipment, err := [Link](id)
    if err != nil {
        c.JSON([Link], gin.H{"error": err.Error()})
        return
    }
    c.JSON(http.StatusOK, equipment)
}

// Client
type MaintenanceServiceClient struct {
    httpClient *http.Client
    baseURL    string
}

func (m *MaintenanceServiceClient) GetEquipment(id string) (*Equipment, error) {
    req, err := http.NewRequest("GET",
        fmt.Sprintf("%s/api/v1/equipment/%s", m.baseURL, id), nil)
    if err != nil {
        return nil, err
    }
    req.Header.Set("Authorization", "Bearer "+[Link]())
    req.Header.Set("Content-Type", "application/json")

    resp, err := m.httpClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    // Surface decoding failures instead of returning a zero-value Equipment.
    var equipment Equipment
    if err := json.NewDecoder(resp.Body).Decode(&equipment); err != nil {
        return nil, err
    }
    return &equipment, nil
}

B. Apache Kafka - Asynchronous Communication

Kafka Configuration

yaml
# [Link]
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      # Compose requires boolean-like environment values to be quoted.
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      # A single broker cannot satisfy the default offsets-topic replication factor of 3.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092

Event Definitions (Avro Schema)

json
{
  "namespace": "[Link]",
  "type": "record",
  "name": "WorkOrderCreated",
  "fields": [
    {"name": "workOrderId", "type": "long"},
    {"name": "equipmentId", "type": "long"},
    {"name": "technicianId", "type": ["null", "long"], "default": null},
    {"name": "priority", "type": {"type": "enum", "name": "Priority", "symbols": ["LOW", "MEDIUM", "HIGH", "CRITICAL"]}},
    {"name": "scheduledDate", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "createdAt", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "metadata", "type": {"type": "map", "values": "string"}, "default": {}}
  ]
}
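
The `default` entries in the schema matter when a producer omits a field or when the schema evolves. The following sketch, using only the standard library, shows the idea of filling defaults and rejecting records that miss a field without one. It is not an Avro implementation: `apply_defaults` is a hypothetical helper, `SCHEMA` reproduces only a subset of the fields above, and real services would rely on an Avro library for encoding and resolution.

```python
import copy

# Subset of the WorkOrderCreated schema, written as a Python dict.
SCHEMA = {
    "type": "record",
    "name": "WorkOrderCreated",
    "fields": [
        {"name": "workOrderId", "type": "long"},
        {"name": "equipmentId", "type": "long"},
        {"name": "technicianId", "type": ["null", "long"], "default": None},
        {"name": "metadata", "type": {"type": "map", "values": "string"}, "default": {}},
    ],
}


def apply_defaults(schema: dict, record: dict) -> dict:
    """Return a copy of record with schema defaults filled in.

    Raises ValueError when a field has neither a value nor a default.
    """
    result = {}
    for field in schema["fields"]:
        name = field["name"]
        if name in record:
            result[name] = record[name]
        elif "default" in field:
            # Deep-copy so callers never share a mutable default.
            result[name] = copy.deepcopy(field["default"])
        else:
            raise ValueError(f"missing required field: {name}")
    return result


if __name__ == "__main__":
    print(apply_defaults(SCHEMA, {"workOrderId": 1, "equipmentId": 2}))
```

Fields such as `technicianId` can therefore be left out by the producer, while `workOrderId` and `equipmentId` cannot.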

Implementation by Technology

Java/Spring Boot (Producer):

java
@Component
public class MaintenanceEventProducer {

    @Autowired
    private KafkaTemplate<String, WorkOrderCreated> kafkaTemplate;

    public void publishWorkOrderCreated(WorkOrder workOrder) {
        WorkOrderCreated event = WorkOrderCreated.newBuilder()
            .setWorkOrderId([Link]())
            .setEquipmentId([Link]())
            .setTechnicianId([Link]())
            .setPriority([Link]([Link]().name()))
            .setScheduledDate([Link]().toEpochMilli())
            .setCreatedAt([Link]().toEpochMilli())
            .build();

        kafkaTemplate.send("workorder-events", event);
    }
}

// Listener method, declared in a component of the consuming service
@KafkaListener(topics = "workorder-events")
public void handleWorkOrderCreated(WorkOrderCreated event) {
    // Process the event
    [Link](event);
}

Node.js (Consumer):

javascript
// kafkajs exports the Kafka class, which must be instantiated with `new`.
const { Kafka } = require('kafkajs');
const avro = require('avsc');

const client = new Kafka({
  clientId: 'notification-service',
  brokers: ['kafka:9092']
});

const consumer = client.consumer({ groupId: 'notification-group' });

// Avro schema
const workOrderSchema = avro.Type.forSchema({
  type: 'record',
  name: 'WorkOrderCreated',
  fields: [
    { name: 'workOrderId', type: 'long' },
    { name: 'equipmentId', type: 'long' },
    { name: 'technicianId', type: ['null', 'long'], default: null },
    // ... other fields
  ]
});

const run = async () => {
  await consumer.connect(); // kafkajs needs an explicit connect before subscribing
  await consumer.subscribe({ topic: 'workorder-events' });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = workOrderSchema.fromBuffer(message.value);

      // Process the event
      await [Link]({
        workOrderId: event.workOrderId,
        technicianId: event.technicianId,
        priority: event.priority
      });
    },
  });
};

run().catch(console.error);

.NET Core (Producer & Consumer):

csharp
// Producer
public class InventoryEventProducer
{
    private readonly IProducer<string, byte[]> _producer;

    public async Task PublishStockLevelChanged(StockMovement stockMovement)
    {
        var stockEvent = new StockLevelChanged
        {
            PartId = [Link],
            NewLevel = [Link],
            Threshold = [Link],
            Timestamp = [Link]()
        };

        using var stream = new MemoryStream();
        using var writer = new BinaryWriter(stream);
        // Avro serialization of stockEvent into the stream goes here

        await _producer.ProduceAsync("stock-events", new Message<string, byte[]>
        {
            Key = [Link](),
            Value = stream.ToArray()
        });
    }
}

// Consumer
public class StockEventConsumer : BackgroundService
{
    private readonly IConsumer<string, byte[]> _consumer;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe("stock-events");

        while (!stoppingToken.IsCancellationRequested)
        {
            var consumeResult = _consumer.Consume(stoppingToken);

            // Deserialize and process
            var stockEvent = DeserializeStockEvent(consumeResult.Message.Value);
            await ProcessStockEvent(stockEvent);
        }
    }
}

Python/FastAPI:

python
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import avro.schema
import avro.io
import io

class ReportingEventConsumer:
    def __init__(self):
        # One consumer subscribed to both topics, in a single consumer group
        self.consumer = AIOKafkaConsumer(
            'workorder-events', 'stock-events',
            bootstrap_servers='kafka:9092',
            group_id='reporting-group'
        )

    async def start_consuming(self):
        await self.consumer.start()
        try:
            async for msg in self.consumer:
                if msg.topic == 'workorder-events':
                    await self.process_workorder_event(msg.value)
                elif msg.topic == 'stock-events':
                    await self.process_stock_event(msg.value)
        finally:
            # Always leave the group cleanly, even on error
            await self.consumer.stop()

    async def process_workorder_event(self, data: bytes):
        # Avro deserialization
        event = self.deserialize_avro(data, self.workorder_schema)
        # Update the metrics
        await self.update_maintenance_metrics(event)

Go:

go
package main

import (
    "context"
    "log"

    "github.com/linkedin/goavro/v2"
    "github.com/segmentio/kafka-go"
)

type MobileEventConsumer struct {
    reader *kafka.Reader
    codec  *goavro.Codec
}

func (m *MobileEventConsumer) StartConsuming() {
    m.reader = kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"kafka:9092"},
        Topic:   "workorder-events",
        GroupID: "mobile-group",
    })

    for {
        message, err := m.reader.ReadMessage(context.Background())
        if err != nil {
            log.Printf("Error reading message: %v", err)
            continue
        }

        // Avro deserialization
        native, _, err := m.codec.NativeFromBinary(message.Value)
        if err != nil {
            log.Printf("Error deserializing: %v", err)
            continue
        }

        // Process the event
        [Link](native)
    }
}

C. Service Discovery & Load Balancing

Kubernetes Service Discovery

yaml
# Equipment Service
apiVersion: v1
kind: Service
metadata:
  name: equipment-service
  labels:
    app: equipment-service
spec:
  selector:
    app: equipment-service
  ports:
    - port: 8080
      targetPort: 8080
  type: ClusterIP
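
Inside the cluster, a ClusterIP Service like this one is reachable through a DNS name built from the Service name and its namespace. The helper below sketches how a client could derive the base URL. `service_url` is a hypothetical function, and the `default` namespace and `cluster.local` domain are assumptions (they are Kubernetes defaults, not values stated in this document).

```python
def service_url(name: str,
                namespace: str = "default",
                port: int = 8080,
                cluster_domain: str = "cluster.local") -> str:
    """Build the in-cluster URL for a Kubernetes Service.

    Follows the <service>.<namespace>.svc.<cluster-domain> DNS convention.
    """
    return f"http://{name}.{namespace}.svc.{cluster_domain}:{port}"


if __name__ == "__main__":
    print(service_url("equipment-service"))
```

Callers in the same namespace can usually use the short name (`equipment-service`) directly; the fully qualified form is the safer choice across namespaces.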

Client Configuration with Service Discovery

Java/Spring Boot (Kubernetes as an alternative to Eureka):

java
@Configuration
public class ServiceConfig {

    @Bean
    @LoadBalanced
    public WebClient.Builder webClientBuilder() {
        return WebClient.builder();
    }

    @Bean
    public WebClient webClient(WebClient.Builder builder) {
        return builder.build();
    }
}

// Usage
public class MaintenanceServiceClient {
    private final WebClient webClient;

    // Constructor injection, needed to initialize the final field
    public MaintenanceServiceClient(WebClient webClient) {
        this.webClient = webClient;
    }

    public Mono<Equipment> getEquipment(Long id) {
        return webClient
            .get()
            .uri("[Link] id)
            .retrieve()
            .bodyToMono(Equipment.class);
    }
}

Configuration with Consul (alternative):

yaml
# [Link]
global:
  datacenter: dc1

client:
  enabled: true

connectInject:
  enabled: true

controller:
  enabled: true

D. API Gateway - Unified Entry Point

Kong Configuration

yaml
# [Link]
_format_version: "3.0"
services:
  - name: equipment-service
    url: [Link]
    routes:
      - name: equipment-route
        paths:
          - /api/v1/equipment
        methods:
          - GET
          - POST
          - PUT
          - DELETE

  - name: maintenance-service
    url: [Link]
    routes:
      - name: maintenance-route
        paths:
          - /api/v1/workorders
          - /api/v1/maintenance

plugins:
  - name: jwt
    config:
      secret_is_base64: false
      key_claim_name: iss

  - name: rate-limiting
    config:
      minute: 100
      hour: 1000

  - name: cors
    config:
      origins:
        - "*"
      methods:
        - GET
        - POST
        - PUT
        - DELETE
      headers:
        - Accept
        - Authorization
        - Content-Type
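
The `rate-limiting` settings above (100 requests per minute, 1000 per hour) combine two limits that must both hold. The sketch below illustrates that policy with simple fixed-window counters. It is not Kong's implementation: `FixedWindowLimiter` is a hypothetical class, and the clock is passed in explicitly so the behaviour is easy to follow.

```python
class FixedWindowLimiter:
    """Allow a request only if every configured window still has capacity."""

    def __init__(self, limits: dict):
        # limits maps window length in seconds to the max requests per window.
        self.limits = limits
        # (window_seconds, window_index) -> requests counted so far.
        # Old windows are never pruned here; a real limiter would expire them.
        self.counters = {}

    def allow(self, now: float) -> bool:
        keys = [(window, int(now // window)) for window in self.limits]
        # Reject if any window is already full.
        if any(self.counters.get(key, 0) >= self.limits[key[0]] for key in keys):
            return False
        for key in keys:
            self.counters[key] = self.counters.get(key, 0) + 1
        return True


if __name__ == "__main__":
    limiter = FixedWindowLimiter({60: 100, 3600: 1000})
    results = [limiter.allow(now=0.0) for _ in range(101)]
    print(results.count(True), results[-1])
```

In the usage above, the 101st request inside the same minute is rejected, while a request in the next minute is accepted again as long as the hourly budget is not exhausted.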
3. Resilience Patterns

A. Circuit Breaker Pattern

Java (Hystrix/Resilience4j):

java
@Component
public class EquipmentServiceClient {

    @CircuitBreaker(name = "equipment-service", fallbackMethod = "fallbackGetEquipment")
    @TimeLimiter(name = "equipment-service")
    @Retry(name = "equipment-service")
    public CompletableFuture<Equipment> getEquipment(Long id) {
        return CompletableFuture.supplyAsync(() -> {
            return webClient.get()
                .uri("/equipment/{id}", id)
                .retrieve()
                .bodyToMono(Equipment.class)
                .block();
        });
    }

    // Fallback: same parameters as the protected method, plus the triggering exception
    public CompletableFuture<Equipment> fallbackGetEquipment(Long id, Exception ex) {
        return CompletableFuture.completedFuture(
            Equipment.builder()
                .id(id)
                .status([Link])
                .build()
        );
    }
}

Node.js (opossum):

javascript
const CircuitBreaker = require('opossum');

const options = {
  timeout: 3000,                 // fail calls that take longer than 3 s
  errorThresholdPercentage: 50,  // open the circuit once 50% of calls fail
  resetTimeout: 30000            // allow a trial call again after 30 s
};

const equipmentServiceCall = (id) => {
  return [Link](`[Link]
};

const breaker = new CircuitBreaker(equipmentServiceCall, options);

// Fallback
breaker.fallback((id) => ({
  data: { id: id, status: 'UNKNOWN' }
}));

// Usage
const getEquipment = async (id) => {
  try {
    const response = await breaker.fire(id);
    return response.data;
  } catch (error) {
    console.error('Circuit breaker opened:', error.message);
    throw error;
  }
};
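
The libraries above hide the state machine behind the pattern. As a rough sketch of what happens underneath, here is a minimal breaker with the three usual states (closed, open, half-open). It is a simplification under stated assumptions: it counts consecutive failures rather than an error percentage, it is not thread-safe, and `CircuitBreaker`, `CircuitOpenError`, and the injectable `clock` are hypothetical names.

```python
import time


class CircuitOpenError(Exception):
    """Raised when the circuit is open and no fallback was supplied."""


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"  # one trial call is allowed through
        return "open"

    def call(self, func, *args, fallback=None):
        if self.state == "open":
            # Short-circuit: do not touch the failing dependency at all.
            if fallback is not None:
                return fallback(*args)
            raise CircuitOpenError("circuit is open")
        try:
            result = func(*args)
        except Exception:
            self.failures += 1
            # A failed trial call in half-open state re-opens immediately.
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            if fallback is not None:
                return fallback(*args)
            raise
        # Success closes the circuit and resets the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

A caller would wrap the remote call, for example `breaker.call(fetch_equipment, 42, fallback=lambda i: {"id": i, "status": "UNKNOWN"})`, where `fetch_equipment` is whatever function performs the HTTP request.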

B. Retry Pattern with Exponential Backoff

Python:

python
import asyncio
import random
from typing import Optional

class ServiceClient:
    async def call_with_retry(self,
                              func,
                              max_retries: int = 3,
                              base_delay: float = 1.0,
                              max_delay: float = 60.0):
        for attempt in range(max_retries + 1):
            try:
                return await func()
            except Exception:
                if attempt == max_retries:
                    raise  # out of retries: re-raise with the original traceback

                # Exponential delay capped at max_delay, plus up to 10% jitter
                delay = min(base_delay * (2 ** attempt), max_delay)
                jitter = random.uniform(0, 0.1) * delay
                await asyncio.sleep(delay + jitter)
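
To see what this policy means in practice, the helper below computes the delay schedule (without jitter) that the formula `min(base_delay * 2 ** attempt, max_delay)` produces. `backoff_delays` is a hypothetical function added only for illustration; its parameters mirror those of `call_with_retry`.

```python
def backoff_delays(max_retries: int = 3,
                   base_delay: float = 1.0,
                   max_delay: float = 60.0) -> list:
    """Return the wait before each retry, in seconds, ignoring jitter."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(max_retries)]


if __name__ == "__main__":
    print(backoff_delays())             # defaults used by call_with_retry
    print(backoff_delays(max_retries=8))
```

With the defaults, a call that keeps failing waits 1 s, 2 s, then 4 s before the final attempt; with eight retries the delay doubles up to 32 s and is then held at the 60 s cap.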

4. Monitoring & Observability

Distributed Tracing

yaml
# Jaeger configuration
jaeger:
  agent:
    host: jaeger-agent
    port: 6831
  sampler:
    type: const
    param: 1

Implementation in each service:

Java/Spring Boot:

java
// Automatic with Spring Cloud Sleuth + Zipkin
@RestController
public class EquipmentController {

    @GetMapping("/{id}")
    public Equipment getEquipment(@PathVariable Long id) {
        // Tracing is automatic
        return [Link](id);
    }
}

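Node.js (OpenTelemetry):

javascript
const opentelemetry = require('@opentelemetry/api');
// NodeTracerProvider is exported by sdk-trace-node, BatchSpanProcessor by sdk-trace-base.
const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
const { BatchSpanProcessor } = require('@opentelemetry/sdk-trace-base');
const { JaegerExporter } = require('@opentelemetry/exporter-jaeger');

const provider = new NodeTracerProvider();

const exporter = new JaegerExporter({
  endpoint: '[Link]
});

provider.addSpanProcessor(new BatchSpanProcessor(exporter));
opentelemetry.trace.setGlobalTracerProvider(provider);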

This approach supports robust, observable communication between all your microservices,
regardless of their underlying technology.
