AWS Kinesis
La plateforme de streaming d'AWS qui permet d'ingérer, de traiter et d'analyser en temps réel des flux massifs de données provenant de diverses sources.
Qu'est-ce qu'AWS Kinesis ?
Imaginez un immense réseau routier où des millions de véhicules (données) circulent simultanément. Pour comprendre le trafic en temps réel, vous auriez besoin d'un système capable de collecter les informations de tous ces véhicules, de les analyser instantanément, et de prendre des décisions immédiates.
AWS Kinesis est comme ce système de gestion du trafic, mais pour les données numériques. Il permet de collecter, traiter et analyser des flux massifs d'informations en temps réel, qu'il s'agisse de clics sur un site web, de données IoT, de transactions financières, de flux vidéo ou de toute autre source générant continuellement des données.
Pourquoi est-ce important ?
Temps réel
Permet d'obtenir des informations et de réagir instantanément plutôt que d'attendre des analyses rétrospectives.
Volume massif
Gère des milliers, voire des millions d'événements par seconde sans ralentissement ni perte de données.
En résumé, AWS Kinesis est un service qui permet aux entreprises de capturer, de traiter et d'analyser d'énormes volumes de données en mouvement, le tout en temps réel. C'est comme avoir un système nerveux numérique qui permet de sentir et de réagir instantanément à tout ce qui se passe dans votre environnement digital.
Fonctionnement technique
AWS Kinesis est une famille de services pour le traitement de données en temps réel qui comprend quatre composants principaux, chacun répondant à des besoins spécifiques de gestion de flux de données.
Les composants de Kinesis
Kinesis Data Streams
Service fondamental de streaming de données qui permet d'ingérer et de stocker des téraoctets de données par heure provenant de milliers de sources. Il conserve les données de quelques heures à 365 jours (selon la configuration) et permet à plusieurs applications de les consommer simultanément.
La structure de Kinesis Data Streams repose sur des "shards" (partitions) qui déterminent la capacité de débit du stream. Chaque shard peut ingérer jusqu'à 1 Mo/s ou 1000 enregistrements par seconde en écriture, et jusqu'à 2 Mo/s en lecture.
Production de données
// Exemple d'écriture de données dans Kinesis Data Streams avec AWS SDK v3
const { KinesisClient, PutRecordCommand } = require('@aws-sdk/client-kinesis');
async function sendDataToKinesis(data) {
// Initialiser le client Kinesis
const client = new KinesisClient({ region: 'eu-west-3' });
// Préparer les données à envoyer
const params = {
StreamName: 'my-data-stream',
Data: Buffer.from(JSON.stringify(data)),
PartitionKey: `partition-${Math.floor(Math.random() * 100)}` // Clé de partitionnement pour distribuer les données
};
try {
// Envoyer les données à Kinesis
const command = new PutRecordCommand(params);
const response = await client.send(command);
console.log('Données envoyées avec succès :', response);
return response;
} catch (error) {
console.error('Erreur lors de l\'envoi des données :', error);
throw error;
}
}
// Exemple d'utilisation pour envoyer un événement utilisateur
sendDataToKinesis({
eventType: 'page_view',
userId: '12345',
timestamp: new Date().toISOString(),
page: '/products/42',
referrer: 'https://www.google.com',
device: {
type: 'mobile',
browser: 'Chrome',
os: 'Android'
}
});
Consommation de données
// Exemple de consommation de données depuis Kinesis Data Streams avec AWS SDK v3
const { KinesisClient, DescribeStreamCommand, GetShardIteratorCommand, GetRecordsCommand } = require('@aws-sdk/client-kinesis');
async function consumeKinesisData() {
const client = new KinesisClient({ region: 'eu-west-3' });
const streamName = 'my-data-stream';
try {
// 1. Obtenir la description du stream pour récupérer les shards
const describeParams = { StreamName: streamName };
const describeCommand = new DescribeStreamCommand(describeParams);
const streamDescription = await client.send(describeCommand);
const shards = streamDescription.StreamDescription.Shards;
console.log(`Le stream contient ${shards.length} shards`);
// 2. Pour chaque shard, récupérer un itérateur
for (const shard of shards) {
const shardId = shard.ShardId;
// Obtenir un itérateur pour ce shard (à partir du début)
const iteratorParams = {
StreamName: streamName,
ShardId: shardId,
ShardIteratorType: 'TRIM_HORIZON' // Options: LATEST, TRIM_HORIZON, AT_SEQUENCE_NUMBER, etc.
};
const iteratorCommand = new GetShardIteratorCommand(iteratorParams);
const iteratorData = await client.send(iteratorCommand);
let shardIterator = iteratorData.ShardIterator;
// 3. Utiliser l'itérateur pour récupérer les enregistrements
let processedRecords = 0;
const maxRecords = 100; // Nombre maximum d'enregistrements à traiter
while (shardIterator && processedRecords < maxRecords) {
// Récupérer les enregistrements
const recordsParams = {
ShardIterator: shardIterator,
Limit: 25 // Nombre d'enregistrements à récupérer par appel
};
const recordsCommand = new GetRecordsCommand(recordsParams);
const recordsData = await client.send(recordsCommand);
// Traiter les enregistrements reçus
const records = recordsData.Records;
if (records.length > 0) {
console.log(`Récupération de ${records.length} enregistrements depuis le shard ${shardId}`);
for (const record of records) {
// Convertir les données binaires en objet JSON
const data = JSON.parse(Buffer.from(record.Data).toString('utf8'));
console.log('Traitement des données :', data);
// Ici, effectuez le traitement réel des données selon votre cas d'usage
// ...
processedRecords++;
}
}
// Mise à jour de l'itérateur pour le prochain appel
shardIterator = recordsData.NextShardIterator;
// Petite pause pour éviter de dépasser les limites d'API
await new Promise(resolve => setTimeout(resolve, 1000));
// Si aucun nouvel itérateur ou aucun enregistrement, sortir de la boucle
if (!shardIterator || records.length === 0) {
break;
}
}
}
} catch (error) {
console.error('Erreur lors de la consommation des données :', error);
throw error;
}
}
Kinesis Data Firehose
Service de chargement de données en streaming qui capture, transforme et livre des données vers des destinations comme S3,
Amazon OpenSearch, Redshift, ou des services tiers comme Datadog, Splunk ou MongoDB.
Data Firehose gère automatiquement le dimensionnement, la compression et le chiffrement. Il permet également la transformation des données en transit via des fonctions Lambda.
// Configuration du service AWS Lambda qui sera déclenché par Kinesis Data Firehose
export const handler = async (event, context) => {
// Chaque événement contient des enregistrements provenant de Firehose
const output = event.records.map((record) => {
// Décoder les données Base64 reçues
const payload = Buffer.from(record.data, 'base64').toString('utf-8');
try {
// Analyser le JSON
const data = JSON.parse(payload);
// Transformation des données (exemple: normalisation)
const transformed = {
event_id: data.id || `${Date.now()}-${Math.random().toString(36).substring(2, 10)}`,
event_type: data.type || 'unknown',
timestamp: data.timestamp || new Date().toISOString(),
// Ajout de champs additionnels et normalisation
user: {
id: data.userId || data.user_id || 'anonymous',
device: (data.device || 'unknown').toLowerCase(),
country: (data.country || 'unknown').toUpperCase()
},
// ... autres transformations
metadata: {
processed_at: new Date().toISOString(),
version: '1.0'
}
};
// Convertir en JSON et encoder en Base64 pour Firehose
return {
recordId: record.recordId,
result: 'Ok',
data: Buffer.from(JSON.stringify(transformed)).toString('base64'),
};
} catch (e) {
console.error('Erreur de traitement:', e);
// En cas d'erreur, marquer l'enregistrement en échec
return {
recordId: record.recordId,
result: 'ProcessingFailed',
data: record.data,
};
}
});
return { records: output };
};
Kinesis Data Analytics
Service d'analyse en temps réel qui permet de traiter et d'analyser les données de streaming à l'aide de SQL standard ou Apache Flink. Il facilite la création d'applications d'analyse complexes sans avoir à gérer l'infrastructure sous-jacente.
Avec Kinesis Data Analytics pour SQL, vous pouvez écrire des requêtes SQL qui s'exécutent en continu sur vos données de streaming, tandis qu'avec Kinesis Data Analytics pour Apache Flink, vous pouvez utiliser l'API Flink pour des transformations et analyses plus complexes.
-- Requête SQL qui calcule le nombre de vues par page sur une fenêtre glissante de 1 minute
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
page_path VARCHAR(255),
view_count INTEGER,
minute_window TIMESTAMP
);
-- Insérer les résultats agrégés dans le stream de destination
CREATE OR REPLACE PUMP "AGGREGATE_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
page_path,
COUNT(*) AS view_count,
FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS minute_window
FROM "SOURCE_SQL_STREAM_001"
WHERE event_type = 'page_view'
GROUP BY page_path, FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE)
WINDOW TUMBLING (SIZE 60 SECONDS);
Kinesis Video Streams
Service spécialisé qui permet de diffuser, stocker, traiter et analyser des flux vidéo en temps réel. Il est particulièrement utile pour les applications IoT, la surveillance, la vision par ordinateur et la reconnaissance faciale.
Kinesis Video Streams gère l'ingestion, le stockage durable et le traitement des flux vidéo, audio et autres données séquentielles chronologiques. Il s'intègre avec des services comme Rekognition pour l'analyse automatisée du contenu vidéo.
Infrastructure as Code
Les ressources Kinesis peuvent être définies et déployées à l'aide de services comme AWS CloudFormation ou AWS CDK :
# Définition d'un stream Kinesis et d'une fonction Lambda dans CloudFormation
Resources:
# Définition du stream Kinesis Data Streams
AnalyticsDataStream:
Type: AWS::Kinesis::Stream
Properties:
Name: analytics-data-stream
RetentionPeriodHours: 24
ShardCount: 2
StreamEncryption:
EncryptionType: KMS
KeyId: alias/aws/kinesis
Tags:
- Key: Environment
Value: Production
# IAM Role pour la fonction Lambda
KinesisProcessorRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: 'sts:AssumeRole'
ManagedPolicyArns:
- 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
Policies:
- PolicyName: KinesisDataStreamAccess
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 'kinesis:DescribeStream'
- 'kinesis:GetShardIterator'
- 'kinesis:GetRecords'
Resource: !GetAtt AnalyticsDataStream.Arn
# Fonction Lambda consommant les données du stream
StreamProcessorFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: analytics-stream-processor
Handler: index.handler
Runtime: nodejs18.x
Timeout: 60
MemorySize: 256
Role: !GetAtt KinesisProcessorRole.Arn
Code:
ZipFile: |
exports.handler = async (event, context) => {
console.log('Processing Kinesis records:', JSON.stringify(event, null, 2));
for (const record of event.Records) {
// Decode and process the data
const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
const data = JSON.parse(payload);
console.log('Processing record:', data);
// Your processing logic here
}
return { status: 'Success' };
};
# Event Source Mapping pour connecter Lambda au stream Kinesis
StreamProcessorMapping:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: 100
Enabled: true
EventSourceArn: !GetAtt AnalyticsDataStream.Arn
FunctionName: !GetAtt StreamProcessorFunction.Arn
StartingPosition: LATEST
Concepts clés
- Producteurs - Applications qui envoient des données au stream (SDKs AWS, Kinesis Producer Library, Kinesis Agent)
- Consommateurs - Applications qui lisent et traitent les données du stream (Kinesis Consumer Library, Lambda, Firehose)
- Shards - Unités de capacité qui déterminent le débit du stream
- Partitionnement - Stratégie de distribution des données entre les shards
- Rétention - Durée pendant laquelle les données sont conservées dans le stream (24h par défaut, configurable jusqu'à 365 jours)
- Enhanced Fanout - Fonctionnalité permettant à chaque consommateur de recevoir 2 Mo/s de débit par shard
- Resharding - Ajustement dynamique du nombre de shards d'un stream pour adapter sa capacité
Sécurité et monitoring
Kinesis intègre différentes couches de sécurité et de surveillance :
- Sécurité - Chiffrement au repos via KMS, chiffrement en transit via SSL/TLS, contrôles d'accès via IAM
- Monitoring - Métriques CloudWatch détaillées (GetRecords.IteratorAgeMilliseconds, PutRecord.Success, ReadProvisionedThroughputExceeded, etc.)
- Logging - Intégration avec CloudTrail pour l'audit des API
- VPC Endpoints - Accès privé via les points de terminaison VPC
Patterns d'architecture
- Streaming ETL - Extraction, transformation et chargement en temps réel
- Continuous Metrics - Surveillance et alerting sur des métriques en temps réel
- Real-time Analytics - Analyse de données en temps réel pour tableau de bord
- Complex Event Processing - Détection de modèles dans des flux d'événements
- Data Lake Integration - Alimentation continue d'un lac de données
- Fan-out Architecture - Distribution de données à plusieurs applications consommatrices
Cas d'usage
Analyse de flux d'activité utilisateur
Collectez et analysez en temps réel les événements d'interaction utilisateur sur votre site ou application pour comprendre le comportement, détecter des anomalies ou générer des recommandations personnalisées.
Détection de fraude en temps réel
Surveillez les transactions financières en temps réel pour identifier des patterns suspects et bloquer les activités frauduleuses avant qu'elles ne soient finalisées, réduisant ainsi les pertes potentielles.
Surveillance IoT et domotique
Ingérez et traitez les données provenant de millions d'appareils connectés pour surveiller leur état, déclencher des alertes ou automatiser des actions en fonction de conditions spécifiques.
Analyse vidéo en temps réel
Utilisez Kinesis Video Streams pour traiter des flux vidéo en direct, permettant la reconnaissance d'objets, la surveillance de sécurité, ou l'analyse de trafic routier avec des services comme Amazon Rekognition.
Industries utilisant Kinesis
Le traitement de données en temps réel de Kinesis bénéficie à de nombreux secteurs :