Rate Limiting d'API avec Traefik, Docker, Go, et la mise en cache

Limiter l’utilisation de l’API en fonction d’une règle avancée de limitation du débit n’est pas si facile. Pour y parvenir avec l’API NLP Cloud, nous utilisons une combinaison de Traefik (comme proxy inverse) et de mise en cache locale dans un script Go. Si cela est fait correctement, vous pouvez améliorer considérablement les performances de votre limitation de débit et étrangler correctement les demandes d’API sans sacrifier la vitesse des demandes.

Dans cet exemple, nous montrons comment déléguer la limitation de débit de chaque requête API à un microservice dédié grâce à Traefik et Docker. Ensuite, dans ce microservice dédié, nous allons compter le nombre de requêtes récemment effectuées afin d’autoriser ou non la nouvelle requête.

Traefik comme reverse proxy

Pour mettre en place une passerelle API, Traefik et Docker sont une très bonne combinaison.

Traefik

L’idée est que toutes vos demandes d’API soient d’abord acheminées vers un conteneur Docker contenant une instance Traefik. Cette instance Traefik agit comme un proxy inverse et effectue donc des opérations telles que l’authentification, le filtrage, la réessai, … pour finalement acheminer la demande de l’utilisateur vers le bon conteneur.

Par exemple, si vous faites une demande de résumé de texte sur NLP Cloud, vous passerez d’abord par la passerelle API qui se chargera d’authentifier votre demande et, si elle est authentifiée avec succès, votre demande sera acheminée vers un modèle d’apprentissage automatique de résumé de texte contenu dans un conteneur Docker dédié hébergé sur un serveur spécifique.

Traefik et Docker sont tous deux faciles à utiliser, et ils rendent votre programme assez facile à maintenir.

Pourquoi utiliser Go?

Un script de limitation de débit devra nécessairement gérer un énorme volume de requêtes simultanées.

Go est un bon candidat pour ce type d’application car il traite les demandes très rapidement, et sans consommer trop de CPU et de RAM.

Traefik et Docker ont tous deux été écrits en Go, ce qui ne doit pas être une coïncidence…

Une mise en œuvre naïve consisterait à utiliser la base de données pour stocker l’utilisation de l’API, compter les demandes passées des utilisateurs et limiter les demandes en fonction de cela. Cela soulèvera rapidement des problèmes de performance, car le fait d’effectuer une requête à la base de données chaque fois que vous voulez vérifier une demande surchargera la base de données et créera des tonnes d’accès inutiles au réseau. La meilleure solution est de gérer cela localement en mémoire. Le revers de la médaille, bien sûr, est que les compteurs en mémoire ne sont pas persistants : si vous redémarrez votre application de limitation de débit, vous perdrez tous vos compteurs en cours. En théorie, cela ne devrait pas être un gros problème pour une application de limitation de débit.

Délégation du rate limiting de l’API à un microservice dédié grâce à Traefik et Docker

Traefik possède de nombreuses fonctionnalités intéressantes. L’une d’entre elles est la possibilité de transmettre l’authentification à un service dédié.

Traefik Auth Forwarding

Fondamentalement, chaque demande d’API entrante sera d’abord transmise à un service dédié. Si ce service renvoie un code 2XX, alors la demande est acheminée vers le service approprié, sinon elle est rejetée.

Dans l’exemple suivant, nous allons utiliser un fichier Docker Compose pour un cluster Docker Swarm. Si vous utilisez un autre orchestrateur de conteneurs comme Kubernetes, Traefik fonctionnera très bien aussi.

Tout d’abord, créez un fichier Docker Compose pour votre endpoint API et activez Traefik :

version: "3.8"

services:
  traefik:
    image: "traefik"
    command:
      - --providers.docker.swarmmode
  api_endpoint:
    image: path_to_api_endpoint_image
    deploy:
      labels:
        - traefik.http.routers.api_endpoint.entrypoints=http
        - traefik.http.services.api_endpoint.loadbalancer.server.port=80
        - traefik.http.routers.api_endpoint.rule=Host(`example.com`) && PathPrefix(`/api-endpoint`)

Ensuite, ajoutez un nouveau service dédié à la limitation de débit et demandez à Traefik de lui transmettre toutes les requêtes (nous coderons ce service Go rate limiting un peu plus tard) :

version: "3.8"

services:
  traefik:
    image: traefik
    command:
      - --providers.docker.swarmmode
  api_endpoint:
    image: path_to_your_api_endpoint_image
    deploy:
      labels:
        - traefik.http.routers.api_endpoint.entrypoints=http
        - traefik.http.services.api_endpoint.loadbalancer.server.port=80
        - traefik.http.routers.api_endpoint.rule=Host(`example.com`) && PathPrefix(`/api-endpoint`)
        - traefik.http.middlewares.forward_auth_api_endpoint.forwardauth.address=http://rate_limiting:8080
        - traefik.http.routers.api_endpoint.middlewares=forward_auth_api_endpoint
  rate_limiting:
    image: path_to_your_rate_limiting_image
    deploy:
      labels:
        - traefik.http.routers.rate_limiting.entrypoints=http
        - traefik.http.services.rate_limiting.loadbalancer.server.port=8080

Nous avons maintenant une configuration complète Docker Swarm + Traefik qui transmet d’abord les demandes à un service de limitation de débit avant de router éventuellement la demande vers le point final de l’API. Vous pouvez mettre ce qui précède dans un fichier production.yml et démarrer l’application avec la commande suivante :

docker stack deploy --with-registry-auth -c production.yml application_name

Notez que seuls les en-têtes des demandes sont transférés, et non le contenu des demandes. Ceci est pour des raisons de performance. Donc, si vous voulez authentifier une demande sur la base du corps de cette demande, vous devrez trouver une autre stratégie.

Gérer le rate limiting avec Go et la mise en cache

Les configurations de Traefik et de Docker sont prêtes. Nous devons maintenant coder le microservice Go qui se chargera de limiter le débit des demandes : les utilisateurs n’ont droit qu’à 10 demandes par minute. Au-delà de 10 demandes par minute, chaque demande sera rejetée avec un code HTTP 429.

package main

import (
  "fmt"
  "time"
  "log"
  "net/http"

  "github.com/gorilla/mux"
  "github.com/patrickmn/go-cache"
)

var c *cache.Cache

// updateUsage increments the API calls in local cache.
func updateUsage(token) {
  // We first try to increment the counter for this user.
  // If there is no existing counter, an error is returned, and in that
  // case we create a new counter with a 3 minute expiry (we don't want
  // old counters to stay in memory forever).
  _, err := c.IncrementInt(fmt.Sprintf("%v/%v", token, time.Now().Minute()), 1)
  if err != nil {
  c.Set(fmt.Sprintf("%v/%v", token, time.Now().Minute()), 1, 3*time.Minute)
  }
}

func RateLimitingHandler(w http.ResponseWriter, r *http.Request) {
  // Retrieve user API token from request headers.
  // Not implemented here for the sake of simplicity.
  apiToken := retrieveAPIToken(r)
  
  var count int

  if x, found := c.Get(fmt.Sprintf("%v/%v", apiToken, time.Now().Minute())); found {
    count = x.(int)
  }

  if count >= 10 {
    w.WriteHeader(http.StatusTooManyRequests)
    return
  }

  updateUsage(apiToken)

  w.WriteHeader(http.StatusOK)
}

func main() {
 r := mux.NewRouter()
 r.HandleFunc("/", RateLimitingHandler)

 log.Println("API is ready and listening on 8080.")

 log.Fatal(http.ListenAndServe(":8080", r))
}

Comme vous pouvez le voir, nous utilisons le Gorilla toolkit afin de créer une petite API, écoutant sur le port 8080, qui recevra la requête transmise par Traefik.

Une fois la requête reçue, nous extrayons le jeton API de l’utilisateur de la requête (non implémenté ici pour des raisons de simplicité), et vérifions le nombre de requêtes effectuées par l’utilisateur associé à ce jeton API au cours de la dernière minute.

Le compteur de requêtes est stocké en mémoire grâce à la bibliothèque go-cache. Go-cache est une bibliothèque de mise en cache minimaliste pour Go qui est très similaire à Redis. Elle gère automatiquement les éléments importants comme l’expiration du cache. Le stockage des compteurs d’API en mémoire est crucial car c’est la solution la plus rapide, et nous voulons que ce code soit aussi rapide que possible afin de ne pas trop ralentir les demandes d’API.

Si l’utilisateur a fait plus de 10 demandes pendant la minute en cours, la demande est rejetée avec un code d’erreur HTTP 429. Traefik verra que cette erreur 429 n’est pas un code 2XX, donc il ne permettra pas à la requête de l’utilisateur d’atteindre le endpoint API, et il propagera l’erreur 429 à l’utilisateur.

Si la demande n’est pas limitée en débit, nous incrémentons automatiquement le compteur pour cet utilisateur.

Je vous recommande de déployer cette application Go dans un simple conteneur “scratch” (FROM scratch) : c’est la manière la plus légère de déployer des binaires Go avec Docker.

Conclusion

Comme vous pouvez le constater, la mise en œuvre d’une passerelle de limitation de débit pour votre API n’est pas si difficile, grâce à Traefik, Docker et Go.

Bien sûr, la limitation du débit en fonction du nombre de demandes par minute n’est qu’une première étape. Vous pourriez vouloir faire des choses plus avancées comme :

  • Limitation du débit par minute, par heure, par jour et par mois.
  • Limitation du débit par point de terminaison de l’API
  • Avoir une limite de débit variable par utilisateur en fonction du plan auquel il a souscrit.
  • Vérifier la concurrence

Il y a tellement de choses intéressantes que nous ne pouvons pas mentionner dans cet article !

Si vous avez des questions, n’hésitez pas à m’en faire part.

Also available in English | También existe en Español
Analyse d'API avec les séries chronologiques grâce à TimescaleDB

Le suivi de l’utilisation des API peut être un véritable défi technique en raison de la vitesse et du volume élevés des demandes. Pourtant, il est crucial de disposer d’analyses précises pour votre API, surtout si vous vous en servez pour facturer vos clients. Il est possible d’être à la fois rapide et précis avec une base de données de séries temporelles appelée TimescaleDB. Il s’agit en fait de la solution que nous avons mise en œuvre derrière NLP Cloud.

Qu’est-ce que l’analyse des API et pourquoi est-ce difficile ?

L’analyse des API consiste à récupérer diverses mesures liées à l’utilisation de votre API.

Par exemple, derrière l’API NLP Cloud, nous souhaitons connaître les éléments suivants :

  • Combien de demandes ont été faites au cours de la dernière minute, de la dernière heure, du dernier jour, du dernier mois et de la dernière année ?
  • Combien de requêtes ont été faites par point de terminaison de l’API et par utilisateur ?
  • Combien de mots ont été générés par nos modèles NLP de génération de texte (comme GPT-J) ?
  • Combien de caractères ont été utilisés par notre module NLP multilingue ?

Toutes ces mesures sont importantes pour mieux comprendre comment notre API est utilisée par nos clients. Sans ces données, nous sommes incapables de savoir quels modèles NLP sont les plus utilisés, qui sont nos clients les plus importants, etc.

Mais plus important encore, certaines de ces mesures sont utilisées pour la facturation ! Par exemple, les clients qui ont souscrit à un plan “pay-as-you-go” sont facturés en fonction du nombre de mots qu’ils ont générés avec notre API.

Un très grand volume de données passe par notre passerelle API, ce qui constitue un défi en termes de performances. Il est très facile de ralentir l’API ou de perdre des données.

Il est donc essentiel qu’un tel système d’analyse des API soit à la fois rapide et fiable.

TimescaleDB à la rescousse

TimescaleDB est une base de données PostgreSQL optimisée pour les séries chronologiques.

TimescaleDB

Fondamentalement, Timescale est optimisé pour un volume élevé d’écritures atomiques. Il est parfait pour un cas d’utilisation où vous écrivez des tonnes de données sur une base très régulière, mais ne modifiez presque jamais ces données, et ne les lisez qu’occasionnellement.

Timescale propose des outils intéressants qui facilitent la création de séries chronologiques. Par exemple, il existe des “agrégats continus”. Ces agrégats sont un moyen de réduire automatiquement l’échantillonnage de vos données sur une base régulière. L’échantillonnage réduit signifie que vous supprimez les anciennes données après un certain temps et que vous ne conservez que certains agrégats de ces données (basés sur des sommes, des comptages, des moyennes, etc.) C’est crucial pour deux raisons :

  • Les séries chronologiques peuvent croître très rapidement, c’est donc un très bon moyen d’économiser de l’espace disque.
  • La lecture d’une table remplie de données peut être terriblement lente. Il est beaucoup plus facile de lire les données d’une table agrégée qui contient moins de données.

Contrairement à d’autres solutions comme InfluxDB, TimescaleDB est une solution purement SQL, la courbe d’apprentissage est donc assez faible et l’intégration sera beaucoup plus facile. Par exemple, chez NLP Cloud, nous interagissons avec TimescaleDB dans des applications Python et Go et nous pouvons utiliser nos bibliothèques PostgreSQL habituelles.

Installation

Vous pouvez installer TimescaleDB en tant que package système, mais il est plus simple de l’installer en tant que conteneur Docker.

Commencez par télécharger l’image Docker :

docker pull timescale/timescaledb:latest-pg14

Ensuite, démarrez votre conteneur et passez un mot de passe pour votre DB :

docker run -d --name timescaledb -p 5432:5432 -e POSTGRES_PASSWORD=password timescale/timescaledb:latest-pg14

Structure des données dans TimescaleDB

Dans cet exemple, nous voulons stocker les demandes d’API. Nous voulons que chaque demande contienne les éléments suivants :

  • L’heure de la demande
  • L’identifiant de l’utilisateur qui a fait la demande
  • Le point de terminaison de l’API utilisé pendant la demande

La première fois que vous lancez TimescaleDB, vous devez créer plusieurs choses.

Tout d’abord, lancez l’extension TimescaleDB.

CREATE EXTENSION IF NOT EXISTS timescaledb;

Créez la table qui stockera les demandes d’API, comme nous le ferions dans toute base de données PostgreSQL :

CREATE TABLE IF NOT EXISTS api_calls (
  time TIMESTAMPTZ NOT NULL,
  user_id TEXT NOT NULL,
  endpoint TEXT NOT NULL
);

Nous en faisons ensuite un “hypertable” :

SELECT create_hypertable('api_calls', 'time', if_not_exists => TRUE);

Les tables horizontales sont le cœur de TimescaleDB. Elles ajoutent automatiquement de nombreux éléments intelligents afin de gérer efficacement vos données.

Nous allons maintenant créer une vue spécifique à partir de votre table api_calls appelée api_calls_per_hour. C’est une vue qui stockera les données agrégées provenant de api_calls. Toutes les heures, le nombre de requêtes API dans api_calls sera compté et placé dans api_calls_per_hour. La vue sera beaucoup plus rapide à interroger puisqu’elle contient beaucoup moins de données que la table initiale api_calls.

CREATE MATERIALIZED VIEW IF NOT EXISTS api_calls_per_hour
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', time) as bucket, user_id, endpoint,
COUNT(time)
FROM api_calls
GROUP BY bucket, user_id, endpoint;

Enfin, nous créons une politique d’agrégation continue et une politique de rétention. Les deux seront gérées par des workers en arrière-plan. La plupart du temps, tout fonctionne bien, mais si vous commencez à avoir beaucoup de politiques, vous risquez de manquer de workers en arrière-plan et vous verrez des messages d’erreur dans vos journaux. Dans ce cas, l’astuce est d’augmenter votre nombre de workers en arrière-plan dans /var/lib/postgresql/data/postgresql.conf.

La politique d’agrégation continue se chargera d’échantillonner régulièrement les données de api_calls et de les mettre dans api_calls_per_hour. La politique de rétention se chargera de supprimer les anciennes données de api_calls afin que vous ne manquiez jamais d’espace disque :

SELECT add_continuous_aggregate_policy('api_calls_per_hour',
  start_offset => INTERVAL '1 day',
  end_offset => INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 minute',
  if_not_exists => TRUE);

SELECT add_retention_policy('api_calls', INTERVAL '90 days', if_not_exists => TRUE);

Comme vous pouvez le voir, ce n’était pas trop complexe.

Insertion de données

Dans votre application, vous pouvez maintenant vous connecter à votre base de données Timescale et insérer des requêtes.

Par exemple, voici comment vous pourriez le faire en Python :

import psycopg2

conn = psycopg2.connect(
  "host=timescaledb dbname={} user={} password={}".format("name", "user", "password"))
cur = conn.cursor()
cur.execute("INSERT INTO api_calls (time, user_id, endpoint) VALUES (%s, %s, %s)",
  (datetime.now(), "1584586", "/v1/gpu/bart-large-cnn/summarization"))
conn.commit()
cur.close()
conn.close()

Et maintenant en Go :

import (
  "github.com/jackc/pgx/v4"
  "github.com/jackc/pgx/v4/pgxpool"
)

func main(){
timescaledbURL := fmt.Sprintf("postgres://%s:%s@timescaledb:5432/%s", "user", "password", "name")
timescaledbDatabase, err := pgxpool.Connect(context.Background(), timescaledbURL)
if err != nil {
  log.Fatalf("Cannot connect to TimescaleDB database: %v. Stopping here.", err)
}

query := `INSERT into api_calls (time, user_id, endpoint) VALUES ($1, $2, $3)`
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  _, err := timescaledbDatabase.Exec(ctx, query, time.Now(), "1584586", "/v1/gpu/bart-large-cnn/summarization")
  if err != nil {
    log.Printf("Cannot insert metric in TimescaleDB: %v", err)
  }
}

Point important : vous ne voulez probablement pas ralentir les demandes de l’API utilisateur à cause d’un traitement potentiellement lent du côté de TimescaleDB. La solution consiste à insérer vos données de manière asynchrone, de sorte que la réponse de l’API utilisateur soit renvoyée même si les données ne sont pas encore insérées dans votre base de données. Mais cela dépasse le cadre de cet article.

Afin d’améliorer le débit, vous pouvez également insérer plusieurs requêtes API en une seule fois. L’idée est que vous devez d’abord mettre en cache certaines demandes en mémoire, puis en enregistrer plusieurs dans la base de données en une seule fois après un certain temps.

Visualisation des données

Il existe de nombreux outils de visualisation des données. J’aime Grafana parce qu’il est facile de le brancher sur TimescaleDB et que les possibilités de graphiques sont innombrables.

Voici un bon tutoriel sur la façon de configurer TimescaleDB avec Grafana : voir ici.

Grafana

Conclusion

TimescaleDB est un outil puissant pour les séries chronologiques, et c’est une excellente solution si vous voulez analyser correctement l’utilisation de votre API.

Comme vous pouvez le constater, la configuration et l’utilisation de TimescaleDB sont assez simples. Attention toutefois : TimescaleDB peut rapidement utiliser beaucoup de RAM, alors assurez-vous de garder cela à l’esprit avant de provisionner votre instance de serveur.

Si vous avez des questions, n’hésitez pas à les poser !

Also available in English | También existe en Español
Stocker les données de paiement Stripe en base de donnée

Il est difficile de savoir si les données de paiement Stripe doivent être stockées localement en base de donnée ou non. De nombreux développeurs se demandent quel type de données Stripe ils doivent enregistrer dans leur BDD. Ils peuvent même parfois être tentés de n’enregistrer aucune donnée localement et de seulement se baser sur des appels API Stripe.

Laissez-moi vous montrer comment nous traitons cette question chez NLP Cloud et pourquoi.

NLP Cloud est une API de traitement naturel du language (NLP) basée sur spaCy et HuggingFace Transformers afin de proposer de l’extraction d’entités (NER), analyse de sentiments, classification de texte, résumé de texte, génération de texte, et bien plus. Les clients sont prélevés mensuellement et les paiements sont gérés par Stripe. Il est important pour nous que l’API et le dashboard utilisateur soient rapides comment l’éclair. Nous voulons donc dépendre le moins possible d’appels à l’API de Stripe. Nous souhaitons par ailleurs ne pas trop dépendre d’une potentielle perte de donnée du côté de Stripe.

Scénario typique

Voici un scénario standard pour un service à abonnement comme le notre chez NLP Cloud :

  1. Un client s’inscrit sur votre site
  2. Vous enregistrez le client en BDD
  3. vous créez un client Stripe via l’API de Stripe
  4. Vous enregistrez l’ID client Stripe localement dans votre base

Vous ne pouvez pas vraiment faire mieux que cela.

Maintenant voilà la partie plus ardue.

Par exemple, il est probable que vous ayez besoin de garder une trace en local de l’abonnement Stripe de votre client, afin de lui donner accès à certaines fonctionnalités payantes, ou à plus de requêtes API, ou, s’il s’agit d’un utilisateur gratuit, de désactiver certaines fonctionnalités (par exemple). Certaines fois vous serez amené à mettre à jour un abonnement vous-même, mais d’autres fois ce sera à l’initiative de Stripe (par exemple dans le cas d’un paiement en échec plusieurs fois d’affilée, Stripe marquera l’abonnement comme canceled). Lorsqu’un abonnement est mis à jour par Stripe, ils vous le font savoir via un webhook. Si vous utilisez Stripe Portal, tout sera intégralement géré côté Stripe, et tout changement vous sera signalé via webhook.

Donc l’intégration Stripe est bi-directionnelle : parfois les modifications viennent de vous, parfois d’eux. Il est facile de finir avec des informations non synchronisées !

Considérations autour de la vitesse

On pourrait être tenté de déléguer autant d’information que possible à Stripe de façon à ce que Stripe soit la seule source de données. Dans une configuration de ce type, vous auriez besoin d’appeler l’API Stripe très fréquemment. C’est une mauvaise idée.

Par exemple, si vos données d’abonnement clients sont uniquement stockées chez Stripe, il vous faudra en premier effectuer un appel Stripe avant d’autoriser un client à accéder ou non à une fonctionnalité payante spécifique. Cela ajoute des millisecondes critiques au temps de réponse de votre site, ce qui n’est pas bon. Et si Stripe se met à ramer, votre site va se mettre à ramer aussi. Dans le cas d’une API, c’est tout simplement hors de question : vous ne pouvez pas ralentir un appel API parce que vous êtes dans l’attente d’une réponse de Stripe.

Considérations autour d’un plan de reprise d’activité

Déléguer les informations à Stripe sans copie des données en local est risqué. Même si Stripe est un acteur solide, vous ne serez jamais certain qu’ils ne risquent pas de perdre vos données.

Afin d’assurer une reprise d’activité en cas de potentiel incident, il est crucial de stocker les données des clients en local afin de pouvoir relancer le service quelque part ailleurs en cas d’incident grave, et ceci sans perdre une seule donnée d’abonnement client (ce qui serait un désastre).

Tout mettre en cache localement

La stratégie que nous suivons chez NLP Cloud est de tout mettre en cache localement. C’est plus simple qu’il n’y parait grace au fait que les SGBD modernes comme PostgreSQL savent facilement stocker le JSON sans presque aucun compromis en terme de performances.

En gros, voici ce que vous devrez faire si vous souhaitez suivre cette voie :

  1. Lorsque vous créez un client Stripe via leur API, sauvegardez la réponse JSON de Stripe au sein d’un champ JSON en BDD (avec PostgreSQL, utilisez le type JSONB)
  2. Lorsque vous créez un abonnement Stripe pour ce client, faite de même
  3. Lorsque vous souhaitez accéder aux informations de clients ou d’abonnements Stripe, contentez-vous de les chercher dans votre base de donnée

Voici un exemple d’INSERT dans un champ JSONB PostgreSQL :

CREATE TABLE customers (  
  id serial NOT NULL,
  stripe_customer jsonb
  stripe_subscription jsonb
);
INSERT INTO customers VALUES (1, '{id:1, ...}', '{id:1, ...}');

Et voici comment vous pouvez récupérer les données d’abonnement Stripe par exemple :

SELECT stripe_subscription->'id' AS id FROM customers;  

2 champs en BDD et c’est tout ! Nul besoin de créer toute une batterie de champs afin de stocker les nombreuses informations de clients et d’abonnements.

Rester synchro

Afin de vous assurer que votre cache local soit parfaitement synchronisé avec Stripe, vous devez correctement recevoir et traiter les webhooks de Stripe.

A chaque fois que vous recevez un webhook Stripe concernant un client ou un abonnement, mettez à jour les champs client et abonnement en BDD.

Si vous souhaitez être complètement tranquille, vous pouvez aussi vous préparer à un potentiel dysfonctionnement des webhooks. Dans ce cas, la meilleure stratégie est de pro-activement aller chercher les clients et les abonnements Stripe via l’API à intervalles réguliers, afin d’être certain de ne pas perdre d’infos.

Conclusion

Comme vous le voyez, il est relativement facile de mettre en place un système de cache Stripe qui soit simple et robuste. Cette stratégie épargne un temps de développement non négligeable; elle est rapide, sûre en cas de dysfonctionnement côté Stripe, et vous n’avez plus besoin de vous demander de quelles données Stripe vous avez besoin localement ou non.

J’espère que vous avez trouvé cela utile. Si vous avez des retours, ou si vous pensez à une meilleure stratégie, faites-le moi savoir !

Also available in English
API de Machine Learning NLP avec FastAPI et spaCy

FastAPI est un nouveau framework d’API pour Python qui est de plus en plus utilisé en production aujourd’hui. Nous utilisons FastAPI derrière NLP Cloud. NLP Cloud est une API basée sur spaCy et les transformers HuggingFace, afin de proposer de l’extraction d’entités (NER), de l’analyse de sentiments, de la classification de texte, du résumé de texte, et bien plus. FastAPI nous a aidé à rapidement construire une API de machine learning robuste et rapide afin de servir les modèles NLP.

Laissez-moi vous expliquer les raisons de ce choix, et vous montrer comment mettre en place FastAPI et spaCy pour de l’extraction d’entités.

Pourquoi FastAPI ?

Jusqu’à récemment, j’avais pour habitude d’utiliser Django Rest Framework pour mes APIs Python. Mais FastAPI propose plusieurs fonctionnalités intéressantes :

  • C’est très rapide
  • C’est bien documenté
  • C’est facile à utiliser
  • Cela génère automatiquement les schémas d’APIs pour vous (OpenAPI par exemple)
  • Cela utilise la validation de types avec Pydantic. Pour un développeur Go tel que moi, qui suis habitué au typage statique, c’est très sympa de pouvoir profiter du typage de cette façon. Cela rend le code plus propre, et moins sujet aux erreurs.

Les performances de FastAPI sont censées en faire un excellent candidat pour les APIs de machine learning. Étant donné que l’on sert un grand nombre de modèles NLP exigeants en termes de performances chez NLP Cloud, FastAPI est une très bonne solution.

Mettre en place FastAPI

La première option dont vous disposez est d’installer FastAPI et Uvicorn (le serveur ASGI en amont de FastAPI) par vous-même :

pip install fastapi[all]

Comme vous le voyez, FastAPI tourne derrière un serveur ASGI, ce qui signifie qu’il peut nativement supporter les requêtes Python asynchrones avec asyncio.

Puis vous n’avez qu’à démarrer votre app avec quelque chose comme ça :

uvicorn main:app

Une autre option est d’utiliser une des images Docker généreusement mises à disposition par Sebastián Ramírez, le créateur de FastAPI. Ces images sont maintenues et sont clé-en-main.

Par exemple, l’image Uvicorn + Gunicorn + FastAPI ajoute Gunicorn à la stack afin de gérer plusieurs processus Python en parallèle.

L’application est censée démarrer automatiquement via un docker run si vous avez correctement suivi la documentation de l’image.

Ces images sont personnalisables. Par exemple, vous pouvez modifier le nombre de processus créés par Gunicorn. Il est important de jouer sur ce type de paramètres en fonction des ressources que demande votre API. Si vous API sert un modèle de machine learning utilisant plusieurs gigas de mémoire, vous feriez bien de diminuer la concurrence par défaut de Gunicorn, sans cela votre application consommera vite trop de mémoire.

API de NER simple avec FastAPI et spaCy

Imaginons que vous souhaitiez créer un point d’accès API qui fasse de l’extraction d’entités (NER) avec spaCy. En gros, le but de la NER est d’extraire des entités telles que le nom, l’entreprise, le poste… à partir d’une phrase. Plus d’infos sur la NER ici si besoin.

Ce point d’accès prendra une phrase en entrée, et retournera une liste d’entités. Chaque entité est composée de la position du premier caractère de l’entité, la position du dernier caractère de l’entité, le type de l’entité, et le texte de l’entité lui-même.

Le point d’accès devra être contacté via des requêtes POST de cette façon :

curl "http://127.0.0.1/entities" \
  -X POST \
  -d '{"text":"John Doe is a Go Developer at Google"}'

Et il retournera quelque chose comme ce qui suit :

[
  {
    "end": 8,
    "start": 0,
    "text": "John Doe",
    "type": "PERSON"
  },
  {
    "end": 25,
    "start": 13,
    "text": "Go Developer",
    "type": "POSITION"
  },
  {
    "end": 35,
    "start": 30,
    "text": "Google",
    "type": "ORG"
  },
]

Voici comment s’y prendre :

import spacy
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List

en_core_web_lg = spacy.load("en_core_web_lg")

api = FastAPI()

class Input(BaseModel):
    sentence: str

class Extraction(BaseModel):
    first_index: int
    last_index: int
    name: str
    content: str

class Output(BaseModel):
    extractions: List[Extraction]

@api.post("/extractions", response_model=Output)
def extractions(input: Input):
    document = en_core_web_lg(input.sentence)

    extractions = []
    for entity in document.ents:
      extraction = {}
      extraction["first_index"] = entity.start_char
      extraction["last_index"] = entity.end_char
      extraction["name"] = entity.label_
      extraction["content"] = entity.text
      extractions.append(extraction)

    return {"extractions": extractions}

Le première chose ici est que nous chargeons le modèle spaCy. Dans notre exemple nous utilisons un modèle spaCy pré-entraîné pour l’anglais de taille “large”. Les modèles “large” prennent plus de mémoire et plus d’espace disque, mais ont une meilleure précision car ils ont été entraînés sur de plus grands jeux de données.

en_core_web_lg = spacy.load("en_core_web_lg")

Par la suite nous utilisons ce modèle spaCy pour de la NER en faisant ce qui suit.

document = en_core_web_lg(input.sentence)
# [...]
document.ents

La seconde chose, qui est une super fonctionnalité de FastAPI, est la possibilité de forcer la validation des données via Pydantic. En gros, vous devez déclarer à l’avance quel sera le format des données entrées par vos utilisateurs, et le format de ce que l’API leur retournera. Si vous êtes développeur Go, vous trouverez cela très similaire à l’unmarshalling JSON avec les structs. Par exemple, nous retournons le format d’une entité retournée de cette façon :

class Extraction(BaseModel):
    first_index: int
    last_index: int
    name: str
    content: str

Notez que start et end sont des positions dans la phrase, ce sont donc des entiers, et type et text sont des chaînes de caractères. Si l’API essaie de retourner une entité qui ne répond pas à ce format (par exemple, si start n’est pas un entier), FastAPI va soulever une erreur.

Comme vous pouvez le voir, il est possible d’inclure une classe de validation dans une autre. Ici nous retournons une liste d’entités, nous devons donc déclarer ce qui suit :

class Output(BaseModel):
    extractions: List[Extraction]

Certains types simples tels que int et str sont natifs, mais des types plus complexes tels que List doivent être explicitement importés.

Pour être plus concis, la validation de la réponse peut se fait via un décorateur :

@api.post("/extractions", response_model=Output)

Validation avancée

Vous pouvez effectuer de la validation plus avancée avec FastAPI et Pydantic. Par exemple, si vous souhaitez que l’entrée utilisateur ait une taille minimum de 10 caractères, vous pouvez faire ce qui suit :

from pydantic import BaseModel, constr

class UserRequestIn(BaseModel):
    text: constr(min_length=10)

Maintenant, un autre cas : que se passe-t-il si la validation via Pydantic passe, mais que vous réalisez plus tard qu’il y a un problème avec les données et donc que vous voulez retourner un code HTTP 400 ?

Il vous suffit de soulever une HTTPException :

from fastapi import HTTPException

raise HTTPException(
            status_code=400, detail="Your request is malformed")

Il s’agit juste de quelques exemples, mais vous pouvez faire bien plus ! Pour cela jetez un oeil à la documentation de FastAPI et de Pydantic.

Chemin racine

Il est très commun de faire tourner de telles API derrière un reverse proxy. Par exemple chez NLPCloud.io nous utilisons le reverse proxy Traefik pour cela.

Un point délicat lors que l’on utilise un reverse proxy est que notre sous-application (ici l’API) ne connait pas nécessairement le chemin URL entier. Et c’est en réalité une excellente chose car cela montre que votre API est faiblement couplée au reste de votre application.

Par exemple ici nous souhaitons que notre API croie que l’URL du point d’accès est /entities, mais en réalité la vraie URL peut être quelques chose comme /api/v1/entities. Voici comment s’y prendre en modifiant le chemin racine :

app = FastAPI(root_path="/api/v1")

Vous pouvez aussi arriver au même résultat en passant un paramètre supplémentaire à Uvicorn dans le cas où vous démarrez Uvicorn manuellement :

uvicorn main:app --root-path /api/v1

Conclusion

Comme vous avez pu le voir, créer une API avec FastAPI est simple comme bonjour, et la validation avec Pydantic rend le code très expressif (ce qui permet en retour d’écrire moins de documentation) et moins sujet aux erreurs.

FastAPI possède à la fois de très bonnes performances et la possibilité d’utiliser des requêtes asynchrones de façon native avec asyncio, ce qui est excellent pour les modèles de machine learning exigeants. L’exemple ci-dessus traitant de l’extraction d’entités avec spaCy et FastAPI peut presque être considéré comme prêt pour la production (bien sûr le code de l’API n’est qu’une petite partie dans le cadre d’une vraie application en cluster). Jusqu’à présent, FastAPI n’a jamais été l’élément limitant dans notre infrastructure NLPCloud.io.

Si vous avez des questions, n’hésitez pas !

Also available in English
Reverse Proxy Traefik avec Docker Compose et Docker Swarm

Mon dernier article à propos de Docker Swarm était le premier d’une série d’articles que je tenais à écrire à propos de la stack utilisée par NLP Cloud. NLP Cloud est une API qui se base sur spaCy et les transformers HuggingFace afin de proposer Named Entity Recognition (NER), analyse de sentiments, classification de texte, summarization, et bien plus. Un des challenges est que chaque modèle tourne au sein de son propre conteneur et que de nouveaux modèles sont régulièrement ajoutés au cluster. Nous avons donc besoin d’un reverse proxy qui soit à la fois performant et flexible en face de tous ces conteneurs.

La solution que nous avons choisie est Traefik.

Je me suis dit qu’il serait intéressant d’écrire un article sur la façon dont nous avons implémenté Traefik et pourquoi nous avons fait ce choix plutôt qu’un reverse proxy classique comme Nginx.

Pourquoi Traefik

Traefik est un reverse proxy encore relativement jeune comparé à Nginx ou Apache, mais l’outil gagne rapidement en popularité. Le principal avantage de Traefik est qu’il s’intègre parfaitement à Docker, Docker Compose, et Docker Swarm (et même Kubernetes et autre) : en gros, l’intégralité de votre configuration Traefik peut se place dans votre fichier docker-compose.yml, ce qui est fort pratique. Et, dès que vous ajoutez un nouveau service à votre cluster, Traefik les découvre à la volée sans avoir à être redémarré.

Donc Traefik améliore nettement la maintenabilité d’un cluster, et est un gros progrès du point de vue haute disponibilité.

L’outil est développé en Go alors que Nginx est codé en C, donc je suppose que cela crée une légère différence en termes de performance, mais rien que je n’aie pu concrètement constater. A mon avis la pénalité en termes de performances est négligeable en regard des avantages apportés.

En revanche, l’apprentissage de Traefik demande un certain temps et, bien que la documentation soit assez bonne, il est relativement facile de faire des erreurs de configuration sans en comprendre l’origine. Laissez-moi donc vous donner quelques exemples prêts à l’emploi ci-dessous.

Installer Traefik

Il n’y a, en gros, pas grand chose à faire ici. Traefik est juste une nouvelle image Docker à ajouter à votre cluster en tant que service dans votre docker-compose.yml:

version: '3.8'
services:
    traefik:
        image: traefik:v2.3

Il existe plusieurs façons d’intégrer Traefik mais, comme je le disais ci-dessus, nous allons opter pour l’intégration avec Docker Compose.

Configuration de base

90% de la congfiguration Traefik est effectuée via les labels Docker.

Imaginons que nous ayons 3 services :

Plus de détails au sujet des modèles NLP spaCy ici et FastAPI ici.

version: '3.8'
services:
    traefik:
        image: traefik:v2.4
        ports:
            - "80:80"
        command:
            - --providers.docker
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock:ro
    corporate:
        image: <your corporate image>
        labels:
            - traefik.http.routers.corporate.rule=Host(`localhost`)
    en_core_web_sm:
        image: <your en_core_web_sm model API image>
        labels:
            - traefik.http.routers.en_core_web_sm.rule=Host(`api.localhost`) && PathPrefix(`/en_core_web_sm`)
    en_core_web_lg:
        image: <your en_core_web_lg model API image>
        labels:
            - traefik.http.routers.en_core_web_lg.rule=Host(`api.localhost`) && PathPrefix(`/en_core_web_lg`)

Vous pouvez désormais accéder à votre site vitrine sur http://localhost, votre modèle en_core_web_sm sur http://api.localhost/en_core_web_sm, et votre modèle en_core_web_lg sur http://api.localhost/en_core_web_lg.

Comme vous pouvez le voir, c’est simple comme bonjour.

La configuration ci-dessus était pour notre configuration locale uniquement, donc nous voulons désormais faire de même pour la production dans un cluster Docker Swarm :

version: '3.8'
services:
    traefik:
        image: traefik:v2.4
        ports:
            - "80:80"
        command:
            - --providers.docker.swarmmode
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock:ro
        deploy:
            placement:
                constraints:
                    - node.role == manager
    corporate:
        image: <your corporate image>
        deploy:
            labels:
                - traefik.http.routers.corporate.rule=Host(`nlpcloud.io`)
    en_core_web_sm:
        image: <your en_core_web_sm model API image>
        deploy:
            labels:
                - traefik.http.services.en_core_web_sm.loadbalancer.server.port=80
                - traefik.http.routers.en_core_web_sm.rule=Host(`api.nlpcloud.io`) && PathPrefix(`/en_core_web_sm`)
    en_core_web_lg:
        image: <your en_core_web_lg model API image>
        deploy:
            labels:
                - traefik.http.services.en_core_web_lg.loadbalancer.server.port=80
                - traefik.http.routers.en_core_web_lg.rule=Host(`api.nlpcloud.io`) && PathPrefix(`/en_core_web_lg`)

Vous pouvez désormais accéder à votre site vitrine sur http://nlpcloud.io, votre modèle en_core_web_sm sur http://api.nlpcloud.io/en_core_web_sm, et votre modèle en_core_web_lg sur http://api.nlpcloud.io/en_core_web_lg.

C’est toujours relativement simple, mais il est important de noter les points suivants :

  • il faut explicitement utiliser le provider docker.swarmmode au lieu de docker,
  • les labels doivent désormais être placés dans la section deploy,
  • il nous faut déclarer manuellement le port de chaque service en utilisant la directive loadbalancer (cela doit être fait manuellement car Docker Swarm ne permet pas d’exposer automatiquement les ports),
  • nous devons nous assurer que Traefik sera déployé sur un noeud manager du Swarm en utilisant constraints.

Vous avez désormais un cluster parfaitement fonctionnel grâce à Docker Swarm et Traefik. Maintenant il est probable que vous ayez des exigences spécifiques à votre projet, et aucun doute que la documentation Trafik vous aidera. Mais laissez-moi vous montrer quelques fonctionnalités que nous utilisons chez NLP Cloud.

Déporter l’authentification

Disons que vos points d’accès à l’API NLP sont protégés et que les utilisateurs ont besoin d’un token pour y accéder. Une bonne solution pour ce cas d’usage est de mettre à profit Traefik ForwardAuth.

En deux mots, Traefik redirigera toutes les requêtes utilisateurs vers une page dédiée créée pour l’occasion. Cette page se chargera d’examiner les en-têtes de la requête (et peut-être extraire un token d’authentification par exemple) et de déterminer si l’utilisateur a le droit d’accéder à la ressource. Si c’est le cas, la page doit retourner un code HTTP 2XX.

Si un code 2XX est retourné, Traefik effectuera alors la vraie requête vers le point d’accès API final. Sinon, il retournera une erreur.

Veuillez noter que, pour des raisons de performance, Traefik redirige uniquement les en-têtes des requêtes vers votre page d’authentification, et non pas le corps de la requête. Il n’est donc pas possible d’autoriser une requête utilisateur à partir d’éléments situés dans le corps de la requête.

Voici comment s’y prendre :

version: '3.8'
services:
    traefik:
        image: traefik:v2.4
        ports:
            - "80:80"
        command:
            - --providers.docker.swarmmode
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock:ro
        deploy:
            placement:
                constraints:
                    - node.role == manager
    corporate:
        image: <your corporate image>
        deploy:
            labels:
                - traefik.http.routers.corporate.rule=Host(`nlpcloud.io`)
    en_core_web_sm:
        image: <your en_core_web_sm model API image>
        deploy:
            labels:
                - traefik.http.services.en_core_web_sm.loadbalancer.server.port=80
                - traefik.http.routers.en_core_web_sm.rule=Host(`api.nlpcloud.io`) && PathPrefix(`/en_core_web_sm`)
                - traefik.http.middlewares.forward_auth_api_en_core_web_sm.forwardauth.address=https://api.nlpcloud.io/auth/
                - traefik.http.routers.en_core_web_sm.middlewares=forward_auth_api_en_core_web_sm
    api_auth:
        image: <your api_auth image>
        deploy:
            labels:
                - traefik.http.services.en_core_web_sm.loadbalancer.server.port=80
                - traefik.http.routers.en_core_web_sm.rule=Host(`api.nlpcloud.io`) && PathPrefix(`/auth`)

Chez NLP Cloud, le service api_auth est en réalité une image Django + Django Rest Framework en charge de l’authentification des requêtes.

Pages d’erreur personnalisées

Peut-être que vous ne voulez pas afficher des pages d’erreurs Traefik brutes à vos utilisateurs. Dans ce cas, il est possible de remplacer ces pages d’erreur par défaut par des pages d’erreur personnalisées.

Traefik ne stocke pas de page d’erreur personnalisée en mémoire, mais il peut utiliser des pages d’erreur servies par l’un de vos services. Lorsque vous contactez votre service afin de récupérer la page d’erreur custom, Traefik passe aussi le code erreur HTTP comme argument positionnel, de façon à ce que vous puissiez afficher différentes pages d’erreur selon l’erreur HTTP initiale.

Imaginons que vous ayez un petit site statique servi par Nginx qui serve vos pages d’erreurs personnalisées. Vous voulez désormais utiliser ses pages d’erreur pour toutes les erreurs HTTP 400 à 599. Voici comment faire :

version: '3.8'
services:
    traefik:
        image: traefik:v2.4
        ports:
            - "80:80"
        command:
            - --providers.docker.swarmmode
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock:ro
        deploy:
            placement:
                constraints:
                    - node.role == manager
            labels:
                - traefik.http.middlewares.handle-http-error.errors.status=400-599
                - traefik.http.middlewares.handle-http-error.errors.service=errors_service
                - traefik.http.middlewares.handle-http-error.errors.query=/{status}.html
    corporate:
        image: <your corporate image>
        deploy:
            labels:
                - traefik.http.routers.corporate.rule=Host(`nlpcloud.io`)
                - traefik.http.routers.corporate.middlewares=handle-http-error
    errors_service:
        image: <your static website image>
        deploy:
            labels:
                - traefik.http.routers.corporate.rule=Host(`nlpcloud.io/errors`)

Par exemple, grâce à l’exemple ci-dessus, une page d’erreur 404 utiliserait désormais cette page : http://nlpcloud.io/errors/404.html

HTTPS

Une fonctionnalité sympa de Traefik est sa capacité à provisionner et utiliser automatiquement les certificats TLS Let’s Encrypt.

Il ont un excellent tuto sur la façon de mettre cela en place avec Docker donc je me contente de vous diriger vers la bonne ressource : https://doc.traefik.io/traefik/user-guides/docker-compose/acme-tls/

Réhausser la limite d’upload

La taille limite d’upload par défaut est assez basse pour des raisons de performance (je crois que c’est 4194304 octets mais je ne suis pas 100% certain parce que ce n’est pas dans la doc).

Afin de réhausser cette limite, il faut utiliser la directive maxRequestBodyBytes :

version: '3.8'
services:
    traefik:
        image: traefik:v2.4
        ports:
            - "80:80"
        command:
            - --providers.docker.swarmmode
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock:ro
        deploy:
            placement:
                constraints:
                    - node.role == manager
    corporate:
        image: <your corporate image>
        deploy:
            labels:
                - traefik.http.routers.corporate.rule=Host(`nlpcloud.io`)
                - traefik.http.middlewares.upload-limit.buffering.maxRequestBodyBytes=20000000
                - traefik.http.routers.backoffice.middlewares=upload-limit

Dans l’exemple ci-dessus, nous avons réhaussé la limite d’upload à 20Mo.

Mais n’oubliez pas que le fait d’uploader un gros fichier d’un coup n’est pas nécessairement la meilleure option. A la place il est plus intéressant de découper le fichier en morceaux et d’uploader chaque morceau indépendamment. J’écrirai peut-être un article sur le sujet à l’avenir.

Debugging

Il y a plusieurs options que vous pouvez activer afin de vous aider à débugger Traefik.

La première chose est d’activer le mode debug qui vous montrera des tonnes de trucs à propos de l’exécution de Traefik.

Deuxièmement, activez les logs d’accès afin de voir toutes les requêtes HTTP entrantes.

Enfin, Traefik fournit un dashboard natif très sympa qui aide à débugger votre configuration. Il est très utile car il est parfois ardu de comprendre pourquoi les choses ne fonctionnent pas.

Afin d’activer les fonctionnalités ci-dessus, faites ce qui suit :

version: '3.8'
services:
    traefik:
        image: traefik:v2.4
        ports:
            - "80:80"
        command:
            - --providers.docker.swarmmode
            - --log.level=DEBUG
            - --accesslog
            - --api.dashboard=true
        volumes:
            - /var/run/docker.sock:/var/run/docker.sock:ro
        deploy:
            placement:
                constraints:
                    - node.role == manager
            labels:
                - traefik.http.routers.dashboard.rule=Host(`dashboard.nlpcloud.io`)
                - traefik.http.routers.dashboard.service=api@internal
                - traefik.http.middlewares.auth.basicauth.users=<your basic auth user>.<your basic auth hashed password>.
                - traefik.http.routers.dashboard.middlewares=auth

Dans cet example, nous avons activé le debugging, les logs d’accès, et le dashboard qui est accessible à l’adresse http://dashboard.nlpcloud.io via une authentification basic auth.

Conclusion

Comme vous pouvez le voir, Traefik est parfaitement intégré à votre configuration Docker Compose. Si vous souhaitez changer la config d’un service, ou ajouter ou supprimer un service, il suffit de modifier votre docker-compose.yml, et de redéployer votre cluster Docker Swarm. Les nouveaux changement seront pris en compte, et les services qui n’ont pas été modifiés n’ont même pas besoin d’être redémarrés, ce qui est super question haute disponibilité.

Je vais continuer d’écrire quelques articles à propos de la stack que nous utilisons derrière NLP Cloud. Je pense que le prochain sera à propos de notre frontend et de comment nous utilisons HTMX au lieu de gros frameworks javascript.

Si vous avez des questions, n’hésitez pas !

Also available in English