Accueil/Compétences/AWS Kinesis
Logo AWS Kinesis

AWS Kinesis

La plateforme de streaming d'AWS qui permet d'ingérer, de traiter et d'analyser en temps réel des flux massifs de données provenant de diverses sources.

Pour les non-initiés

Qu'est-ce qu'AWS Kinesis ?

Imaginez un immense réseau routier où des millions de véhicules (données) circulent simultanément. Pour comprendre le trafic en temps réel, vous auriez besoin d'un système capable de collecter les informations de tous ces véhicules, de les analyser instantanément, et de prendre des décisions immédiates.

AWS Kinesis est comme ce système de gestion du trafic, mais pour les données numériques. Il permet de collecter, traiter et analyser des flux massifs d'informations en temps réel, qu'il s'agisse de clics sur un site web, de données IoT, de transactions financières, de flux vidéo ou de toute autre source générant continuellement des données.

Pourquoi est-ce important ?

Temps réel

Permet d'obtenir des informations et de réagir instantanément plutôt que d'attendre des analyses rétrospectives.

Volume massif

Gère des milliers, voire des millions d'événements par seconde sans ralentissement ni perte de données.

En résumé, AWS Kinesis est un service qui permet aux entreprises de capturer, de traiter et d'analyser d'énormes volumes de données en mouvement, le tout en temps réel. C'est comme avoir un système nerveux numérique qui permet de sentir et de réagir instantanément à tout ce qui se passe dans votre environnement digital.

Pour les développeurs

Fonctionnement technique

AWS Kinesis est une famille de services pour le traitement de données en temps réel qui comprend quatre composants principaux, chacun répondant à des besoins spécifiques de gestion de flux de données.

Les composants de Kinesis

Kinesis Data Streams

Service fondamental de streaming de données qui permet d'ingérer et de stocker des téraoctets de données par heure provenant de milliers de sources. Il conserve les données de quelques heures à 365 jours (selon la configuration) et permet à plusieurs applications de les consommer simultanément.

La structure de Kinesis Data Streams repose sur des "shards" (partitions) qui déterminent la capacité de débit du stream. Chaque shard peut ingérer jusqu'à 1 Mo/s ou 1000 enregistrements par seconde en écriture, et jusqu'à 2 Mo/s en lecture.

Production de données
Écriture dans Kinesis Data Streams avec Node.js
// Exemple d'écriture de données dans Kinesis Data Streams avec AWS SDK v3 const { KinesisClient, PutRecordCommand } = require('@aws-sdk/client-kinesis'); async function sendDataToKinesis(data) { // Initialiser le client Kinesis const client = new KinesisClient({ region: 'eu-west-3' }); // Préparer les données à envoyer const params = { StreamName: 'my-data-stream', Data: Buffer.from(JSON.stringify(data)), PartitionKey: `partition-${Math.floor(Math.random() * 100)}` // Clé de partitionnement pour distribuer les données }; try { // Envoyer les données à Kinesis const command = new PutRecordCommand(params); const response = await client.send(command); console.log('Données envoyées avec succès :', response); return response; } catch (error) { console.error('Erreur lors de l\'envoi des données :', error); throw error; } } // Exemple d'utilisation pour envoyer un événement utilisateur sendDataToKinesis({ eventType: 'page_view', userId: '12345', timestamp: new Date().toISOString(), page: '/products/42', referrer: 'https://www.google.com', device: { type: 'mobile', browser: 'Chrome', os: 'Android' } });
Consommation de données
Lecture depuis Kinesis Data Streams avec Node.js
// Exemple de consommation de données depuis Kinesis Data Streams avec AWS SDK v3 const { KinesisClient, DescribeStreamCommand, GetShardIteratorCommand, GetRecordsCommand } = require('@aws-sdk/client-kinesis'); async function consumeKinesisData() { const client = new KinesisClient({ region: 'eu-west-3' }); const streamName = 'my-data-stream'; try { // 1. Obtenir la description du stream pour récupérer les shards const describeParams = { StreamName: streamName }; const describeCommand = new DescribeStreamCommand(describeParams); const streamDescription = await client.send(describeCommand); const shards = streamDescription.StreamDescription.Shards; console.log(`Le stream contient ${shards.length} shards`); // 2. Pour chaque shard, récupérer un itérateur for (const shard of shards) { const shardId = shard.ShardId; // Obtenir un itérateur pour ce shard (à partir du début) const iteratorParams = { StreamName: streamName, ShardId: shardId, ShardIteratorType: 'TRIM_HORIZON' // Options: LATEST, TRIM_HORIZON, AT_SEQUENCE_NUMBER, etc. }; const iteratorCommand = new GetShardIteratorCommand(iteratorParams); const iteratorData = await client.send(iteratorCommand); let shardIterator = iteratorData.ShardIterator; // 3. Utiliser l'itérateur pour récupérer les enregistrements let processedRecords = 0; const maxRecords = 100; // Nombre maximum d'enregistrements à traiter while (shardIterator && processedRecords < maxRecords) { // Récupérer les enregistrements const recordsParams = { ShardIterator: shardIterator, Limit: 25 // Nombre d'enregistrements à récupérer par appel }; const recordsCommand = new GetRecordsCommand(recordsParams); const recordsData = await client.send(recordsCommand); // Traiter les enregistrements reçus const records = recordsData.Records; if (records.length > 0) { console.log(`Récupération de ${records.length} enregistrements depuis le shard ${shardId}`); for (const record of records) { // Convertir les données binaires en objet JSON const data = JSON.parse(Buffer.from(record.Data).toString('utf8')); console.log('Traitement des données :', data); // Ici, effectuez le traitement réel des données selon votre cas d'usage // ... processedRecords++; } } // Mise à jour de l'itérateur pour le prochain appel shardIterator = recordsData.NextShardIterator; // Petite pause pour éviter de dépasser les limites d'API await new Promise(resolve => setTimeout(resolve, 1000)); // Si aucun nouvel itérateur ou aucun enregistrement, sortir de la boucle if (!shardIterator || records.length === 0) { break; } } } } catch (error) { console.error('Erreur lors de la consommation des données :', error); throw error; } }

Kinesis Data Firehose

Service de chargement de données en streaming qui capture, transforme et livre des données vers des destinations comme Icône S3S3, Icône Amazon OpenSearchAmazon OpenSearch, Redshift, ou des services tiers comme Datadog, Splunk ou MongoDB.

Data Firehose gère automatiquement le dimensionnement, la compression et le chiffrement. Il permet également la transformation des données en transit via des fonctions Icône LambdaLambda.

Fonction Lambda pour transformer des données dans Firehose
// Configuration du service AWS Lambda qui sera déclenché par Kinesis Data Firehose export const handler = async (event, context) => { // Chaque événement contient des enregistrements provenant de Firehose const output = event.records.map((record) => { // Décoder les données Base64 reçues const payload = Buffer.from(record.data, 'base64').toString('utf-8'); try { // Analyser le JSON const data = JSON.parse(payload); // Transformation des données (exemple: normalisation) const transformed = { event_id: data.id || `${Date.now()}-${Math.random().toString(36).substring(2, 10)}`, event_type: data.type || 'unknown', timestamp: data.timestamp || new Date().toISOString(), // Ajout de champs additionnels et normalisation user: { id: data.userId || data.user_id || 'anonymous', device: (data.device || 'unknown').toLowerCase(), country: (data.country || 'unknown').toUpperCase() }, // ... autres transformations metadata: { processed_at: new Date().toISOString(), version: '1.0' } }; // Convertir en JSON et encoder en Base64 pour Firehose return { recordId: record.recordId, result: 'Ok', data: Buffer.from(JSON.stringify(transformed)).toString('base64'), }; } catch (e) { console.error('Erreur de traitement:', e); // En cas d'erreur, marquer l'enregistrement en échec return { recordId: record.recordId, result: 'ProcessingFailed', data: record.data, }; } }); return { records: output }; };

Kinesis Data Analytics

Service d'analyse en temps réel qui permet de traiter et d'analyser les données de streaming à l'aide de SQL standard ou Apache Flink. Il facilite la création d'applications d'analyse complexes sans avoir à gérer l'infrastructure sous-jacente.

Avec Kinesis Data Analytics pour SQL, vous pouvez écrire des requêtes SQL qui s'exécutent en continu sur vos données de streaming, tandis qu'avec Kinesis Data Analytics pour Apache Flink, vous pouvez utiliser l'API Flink pour des transformations et analyses plus complexes.

Exemple de requête SQL pour Kinesis Data Analytics
-- Requête SQL qui calcule le nombre de vues par page sur une fenêtre glissante de 1 minute CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( page_path VARCHAR(255), view_count INTEGER, minute_window TIMESTAMP ); -- Insérer les résultats agrégés dans le stream de destination CREATE OR REPLACE PUMP "AGGREGATE_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM page_path, COUNT(*) AS view_count, FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS minute_window FROM "SOURCE_SQL_STREAM_001" WHERE event_type = 'page_view' GROUP BY page_path, FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) WINDOW TUMBLING (SIZE 60 SECONDS);

Kinesis Video Streams

Service spécialisé qui permet de diffuser, stocker, traiter et analyser des flux vidéo en temps réel. Il est particulièrement utile pour les applications IoT, la surveillance, la vision par ordinateur et la reconnaissance faciale.

Kinesis Video Streams gère l'ingestion, le stockage durable et le traitement des flux vidéo, audio et autres données séquentielles chronologiques. Il s'intègre avec des services comme Icône RekognitionRekognition pour l'analyse automatisée du contenu vidéo.

Infrastructure as Code

Les ressources Kinesis peuvent être définies et déployées à l'aide de services comme AWS CloudFormation ou AWS CDK :

Définition d'un stream Kinesis avec CloudFormation
# Définition d'un stream Kinesis et d'une fonction Lambda dans CloudFormation Resources: # Définition du stream Kinesis Data Streams AnalyticsDataStream: Type: AWS::Kinesis::Stream Properties: Name: analytics-data-stream RetentionPeriodHours: 24 ShardCount: 2 StreamEncryption: EncryptionType: KMS KeyId: alias/aws/kinesis Tags: - Key: Environment Value: Production # IAM Role pour la fonction Lambda KinesisProcessorRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: lambda.amazonaws.com Action: 'sts:AssumeRole' ManagedPolicyArns: - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' Policies: - PolicyName: KinesisDataStreamAccess PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - 'kinesis:DescribeStream' - 'kinesis:GetShardIterator' - 'kinesis:GetRecords' Resource: !GetAtt AnalyticsDataStream.Arn # Fonction Lambda consommant les données du stream StreamProcessorFunction: Type: AWS::Lambda::Function Properties: FunctionName: analytics-stream-processor Handler: index.handler Runtime: nodejs18.x Timeout: 60 MemorySize: 256 Role: !GetAtt KinesisProcessorRole.Arn Code: ZipFile: | exports.handler = async (event, context) => { console.log('Processing Kinesis records:', JSON.stringify(event, null, 2)); for (const record of event.Records) { // Decode and process the data const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8'); const data = JSON.parse(payload); console.log('Processing record:', data); // Your processing logic here } return { status: 'Success' }; }; # Event Source Mapping pour connecter Lambda au stream Kinesis StreamProcessorMapping: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 100 Enabled: true EventSourceArn: !GetAtt AnalyticsDataStream.Arn FunctionName: !GetAtt StreamProcessorFunction.Arn StartingPosition: LATEST

Concepts clés

  • Producteurs - Applications qui envoient des données au stream (SDKs AWS, Kinesis Producer Library, Kinesis Agent)
  • Consommateurs - Applications qui lisent et traitent les données du stream (Kinesis Consumer Library, Lambda, Firehose)
  • Shards - Unités de capacité qui déterminent le débit du stream
  • Partitionnement - Stratégie de distribution des données entre les shards
  • Rétention - Durée pendant laquelle les données sont conservées dans le stream (24h par défaut, configurable jusqu'à 365 jours)
  • Enhanced Fanout - Fonctionnalité permettant à chaque consommateur de recevoir 2 Mo/s de débit par shard
  • Resharding - Ajustement dynamique du nombre de shards d'un stream pour adapter sa capacité

Sécurité et monitoring

Kinesis intègre différentes couches de sécurité et de surveillance :

  • Sécurité - Chiffrement au repos via KMS, chiffrement en transit via SSL/TLS, contrôles d'accès via IAM
  • Monitoring - Métriques CloudWatch détaillées (GetRecords.IteratorAgeMilliseconds, PutRecord.Success, ReadProvisionedThroughputExceeded, etc.)
  • Logging - Intégration avec CloudTrail pour l'audit des API
  • VPC Endpoints - Accès privé via les points de terminaison VPC

Patterns d'architecture

  • Streaming ETL - Extraction, transformation et chargement en temps réel
  • Continuous Metrics - Surveillance et alerting sur des métriques en temps réel
  • Real-time Analytics - Analyse de données en temps réel pour tableau de bord
  • Complex Event Processing - Détection de modèles dans des flux d'événements
  • Data Lake Integration - Alimentation continue d'un lac de données
  • Fan-out Architecture - Distribution de données à plusieurs applications consommatrices
Applications concrètes

Cas d'usage

Analyse de flux d'activité utilisateur

Collectez et analysez en temps réel les événements d'interaction utilisateur sur votre site ou application pour comprendre le comportement, détecter des anomalies ou générer des recommandations personnalisées.

Détection de fraude en temps réel

Surveillez les transactions financières en temps réel pour identifier des patterns suspects et bloquer les activités frauduleuses avant qu'elles ne soient finalisées, réduisant ainsi les pertes potentielles.

Surveillance IoT et domotique

Ingérez et traitez les données provenant de millions d'appareils connectés pour surveiller leur état, déclencher des alertes ou automatiser des actions en fonction de conditions spécifiques.

Analyse vidéo en temps réel

Utilisez Kinesis Video Streams pour traiter des flux vidéo en direct, permettant la reconnaissance d'objets, la surveillance de sécurité, ou l'analyse de trafic routier avec des services comme Amazon Rekognition.

Industries utilisant Kinesis

Le traitement de données en temps réel de Kinesis bénéficie à de nombreux secteurs :

Finance
E-commerce
Télécommunications
Jeux vidéo
Sécurité
Industrie 4.0
Santé
Transport