Documentação / Ingestão Dual (Evolution API) — RabbitMQ + Webhook em Paralelo

Ingestão Dual (Evolution API) — RabbitMQ + Webhook em Paralelo

Entrar

Ingestão Dual (Evolution API) — RabbitMQ + Webhook em Paralelo

Este documento descreve, de forma detalhada e orientada ao código atual, como implementar um sistema de ingestão dual para eventos da Evolution API (WhatsApp), recebendo os mesmos eventos por RabbitMQ e por webhook HTTP em paralelo, com processamento idempotente, rastreabilidade, métricas e testes.

Objetivo

Garantir que todos os eventos enviados pela Evolution sejam capturados e processados mesmo quando um dos canais falhar, mantendo as regras:

  1. Falha individual: se o evento vier só por RabbitMQ, um worker consome, persiste no banco e encaminha para o webhook interno (/api/internal/webhook) para executar as regras de negócio. Se vier só por webhook, o handler persiste e executa as regras.
  2. Duplicação: se o mesmo evento vier pelos dois canais, apenas um deve “ganhar”. O outro não deve “morrer” por engano: se o evento ainda estiver em processamento, ele só faz skip; se já estiver processado com sucesso, aí sim ele pode descartar.
  3. Chave única do evento: gerar uma chave de deduplicação a partir de (payload.data.key.id OU payload.data.keyId) + payload.data.status + payload.event (com validação via exemplos em docs/evolution-events/).
  4. Requisitos técnicos: logs detalhados, métricas por canal, tolerância a falhas com retry exponencial e testes unitários/integração cobrindo os 3 cenários.

Onde isso se encaixa no sistema atual

Estado atual (hoje)

  • Webhook inbound Evolution: o endpoint central é:
    • POST /api/internal/webhook
    • Implementação: route.ts
    • Ele processa connection.update, logout.instance, messages.upsert (inbound) e messages.update (status), atualiza banco e dispara webhooks de cliente quando aplicável.
  • Persistência do evento cru: o handler já salva qualquer payload em EvolutionWebhookEvent (tabela evolution_webhook_events) para reconciliação/debug.
  • Worker atual: roda jobs BullMQ/Redis (envio de mensagens, reconciliação etc.).
  • RabbitMQ: não há cliente AMQP no repositório hoje; precisará ser adicionado ao apps/worker.

Decisão de arquitetura

Para manter o Next.js (apps/fullstack) simples e evitar conexão AMQP longa no runtime web (escala horizontal/cold starts), o consumidor RabbitMQ deve rodar no apps/worker.

O webhook interno /api/internal/webhook continua sendo o processador único das regras de negócio, mas passa a suportar:

  • deduplicação de processamento (idempotência real do lado “processador”)
  • flag de origem/canal (webhook vs rabbitmq)
  • flag already_saved (para requests encaminhados pelo worker, evitando duplicar persistência do evento cru)

Escopo de eventos e filas

RabbitMQ (dados fornecidos)

  • RABBITMQ_URI=amqp://oismaelash:645453@rabbitmq.iaxp.cloud:5672/default
  • RABBITMQ_EXCHANGE_NAME=evolution
  • Filas / routing keys para consumir:
    • evolution.connection.update
    • evolution.messages.update
    • evolution.messages.upsert

Webhook HTTP

  • Endpoint: /api/internal/webhook
  • O middleware do Next permite essa rota sem autenticação (hoje), por ser inbound do provedor.

Modelo de dados: como persistir e deduplicar com segurança

A deduplicação “só em Redis” não é suficiente para atrasos grandes entre canais (TTL expira, um evento tardio pode reprocessar). Por isso, a implementação usa 2 camadas:

  1. Redis SET NX: controla estado de processamento (lock curto) e permite “skip em andamento”.
  2. Banco (unique): registra estado final (processado ou não) e evita reprocessamento “tardio”.

Estados do evento (visão simples)

Para um mesmo dedupKey, o sistema considera 3 estados:

  • NOVO: não existe “processed” no banco e não há lock de processamento.
  • EM_PROCESSAMENTO: há lock no Redis (alguém está processando agora). Quem chegar depois deve skip e tentar mais tarde.
  • PROCESSADO: existe registro no banco com processedAt preenchido (sucesso). Quem chegar depois pode descartar.

Gravar também um marcador de “done” no Redis com TTL longo para atalhar a consulta no DB, mas o DB continua sendo a fonte de verdade do sucesso.

Tabelas propostas

1) Manter evolution_webhook_events como log cru (sem unique)

Continuar salvando o payload bruto como hoje, mas adicionar metadados mínimos para rastreio por canal:

  • channel (WEBHOOK | RABBITMQ)
  • dedupKey (string) para correlação e debug
  • receivedAt (timestamp do canal) se aplicável

Opção A (preferida): criar uma nova tabela para o “log cru por canal” e deixar evolution_webhook_events como está.

2) Criar tabela de “inbox” única para deduplicação e estado de processamento

Criar uma tabela nova (ex.: evolution_ingestion_events) para registrar apenas um registro por evento deduplicado:

Campos sugeridos:

  • id (cuid)
  • dedupKey (string, unique)
  • event (string)
  • instance (string, nullable)
  • channelFirstSeen (WEBHOOK | RABBITMQ)
  • firstSeenAt (DateTime default now)
  • lastSeenAt (DateTime updatedAt)
  • seenCount (int)
  • processedAt (DateTime nullable)
  • forwardedToInternalWebhookAt (DateTime nullable)
  • lastError (text nullable)

Motivos:

  • Permite saber: “chegou por qual canal primeiro?”, “quantas vezes repetiu?”, “processou?”.
  • Permite dedup forte via UNIQUE(dedupKey).
  • Permite métricas e debugging sem depender de Redis.

Chave de deduplicação (dedupKey)

Regra solicitada

Gerar a chave com:

  • (payload.data.key.id OU payload.data.keyId) + payload.data.status + payload.event

Recomendação para evitar colisões

Na prática, para reduzir risco de colisão entre instâncias (mesmo keyId em instâncias diferentes), é recomendável prefixar instance:

  • dedupKey = ${instance}|${event}|${status}|${key}`

Se você precisar seguir estritamente a regra sem instance, mantenha a regra original; mas o documento assume a versão com instance como “hardening” e registra isso como decisão explícita.

Extração dos campos (com base nos exemplos em docs/evolution-events)

Os exemplos em docs/evolution-events/ têm um wrapper (ex.: plataforma de logs de webhook). Nesses arquivos, o payload real da Evolution costuma estar em body (ou seja, o “payload” é input.body). Na aplicação real, o endpoint /api/internal/webhook recebe diretamente o payload da Evolution (sem wrapper).

Normalização e formato final

Para manter a chave segura para Redis e índice no DB:

  • key: string final extraída (data.key.id ou data.keyId) ou fallback
  • status: String(payload.data.status ?? "")
  • event: String(payload.event ?? "")
  • instance: String(payload.instance ?? "")
  • dedupKey: sha256("${instance}|${event}|${status}|${key}") ou string direta

Usar sha256 para padronizar comprimento e evitar chaves longas ou com caracteres inesperados.

Fluxo: ingestão via Webhook HTTP

Entrada

Request real da Evolution chega como:

{
  "event": "messages.update",
  "instance": "ismael-vagas",
  "data": { "keyId": "…", "status": "DELIVERY_ACK", "instanceId": "…", "messageId": "…" },
  "date_time": "…"
}

Passos (webhook)

  1. Parse do JSON no handler /api/internal/webhook.
  2. Extrair dedupKey (com fallback para eventos sem keyId).
  3. Verificar se já foi processado com sucesso (fonte de verdade: DB inbox):
    • Se processedAt já estiver preenchido para esse dedupKey: retornar 200 { ok: true, skipped: "already_processed" } e parar.
  4. Tentar adquirir o lock “em processamento” no Redis:
    • SET NX ps:evolution:processing:${dedupKey} 1 EX <ttl>
    • TTL sugerido: 2–10 minutos (apenas para corrida/concorrrência).
    • Se falhar: retornar 200 { ok: true, skipped: "in_processing" } e parar.
  5. Registrar/atualizar a inbox no DB (evolution_ingestion_events) como “visto”:
    • INSERT ... ON CONFLICT UPDATE lastSeenAt/seenCount
  6. Persistir evento cru (para auditoria/reconciliação), marcando channel=WEBHOOK e dedupKey.
  7. Executar regras de negócio existentes (o código atual em route.ts).
  8. Marcar processedAt na inbox evolution_ingestion_events (sucesso).
  9. Marcar “done” no Redis (opcional, TTL longo) e remover/expirar lock de processamento.

Por que “dedup no início” do webhook

Hoje, o webhook:

  • salva evento cru em evolutionWebhookEvent
  • processa e pode disparar webhooks de cliente

Se o RabbitMQ também encaminhar o mesmo evento para esse endpoint, sem dedup, você corre o risco de:

  • disparar webhooks de cliente duplicados
  • atualizar status/opt-ins/incoming messages em duplicidade (ou gerar logs duplicados)

Portanto, a deduplicação deve ocorrer antes das regras de negócio.

Fluxo: ingestão via RabbitMQ (worker)

Por que no worker

Manter a conexão AMQP e o consumo (prefetch/ack/nack/retry) é responsabilidade típica de um processo de background. O apps/worker já tem o ciclo de vida adequado e logger estruturado.

Passos (Rabbit consumer)

Para cada mensagem recebida em uma routing key evolution.*:

  1. Parse do content do RabbitMQ para obter o JSON do evento.
  2. Extrair dedupKey com a mesma função/algoritmo do webhook.
  3. Verificar se já foi processado com sucesso (DB inbox):
    • Se processedAt já estiver preenchido: ACK e descartar.
  4. Tentar adquirir lock “em processamento” no Redis:
    • SET NX ps:evolution:processing:${dedupKey} 1 EX <ttl>
    • Se falhar: não processar agora. A mensagem pode voltar para a fila (requeue/retry) e tentar novamente.
  5. Registrar/atualizar a inbox no DB (evolution_ingestion_events) como “visto”.
  6. Persistir evento cru em evolutionWebhookEvent com channel=RABBITMQ e dedupKey.
  7. Encaminhar para o webhook interno:
    • POST ${FULLSTACK_BASE_URL}/api/internal/webhook
    • Body: { ...payloadOriginal, already_saved: true, ingestion_source: "rabbitmq" }
  8. Se o POST responder 2xx: marcar forwardedToInternalWebhookAt e processedAt e então ACK.
  9. Se falhar:
    • aplicar retry exponencial (ver seção de retry)
    • não ACK até concluir o encaminhamento (ou mover para DLQ com rastreio)
  10. Em ambos os casos (sucesso/falha final): remover/expirar lock de processamento e, no sucesso, marcar “done” no Redis.

Observação crítica: idempotência no encaminhamento

Mesmo com dedup na ingestão RabbitMQ, o encaminhamento HTTP pode sofrer retries e gerar duplicação de execução no endpoint interno.

Por isso, o endpoint interno deve ser idempotente ao nível de “processamento de evento” (não só “persistência do evento cru”). A deduplicação por dedupKey deve acontecer antes de executar as regras de negócio, inclusive para requests com already_saved: true.

Alterações detalhadas no endpoint /api/internal/webhook

1) Aceitar metadados de ingestão

Adicionar suporte a campos opcionais no body:

  • already_saved?: boolean
  • ingestion_source?: "webhook" | "rabbitmq"

Esses campos são ignorados pela Evolution e só serão usados internamente.

2) Pular persistência do evento cru quando already_saved: true

Hoje o handler sempre executa:

  • prisma.evolutionWebhookEvent.create({ data: { … } })

Para evitar duplicar registros quando o worker RabbitMQ encaminhar para o mesmo endpoint, a regra será:

  • Se already_saved === true: não salvar em evolutionWebhookEvent
  • Caso contrário: salvar normalmente (channel=WEBHOOK)

3) Deduplicar antes de regras de negócio (processamento idempotente)

Adicionar no início do POST:

  • extrair dedupKey
  • executar:
    • Redis SET NX (lock curto)
    • DB inbox INSERT ... ON CONFLICT DO NOTHING

Se a dedup indicar duplicado:

  • retornar 200 { ok: true, dedup: true }
  • não executar regras de negócio (nem disparar webhooks para clientes)

4) (Recomendado) Proteger requests “internos” com segredo opcional

Como /api/internal/webhook é público por necessidade do provedor, mas o encaminhamento do worker adiciona already_saved: true, isso cria um vetor: alguém de fora poderia enviar already_saved: true e alterar semântica.

Mitigação recomendada:

  • Se already_saved === true, exigir header:
    • x-internal-webhook-secret: ${INTERNAL_WEBHOOK_SECRET}
  • Para requests “normais” (Evolution), não exigir esse header.

Isso preserva compatibilidade com Evolution e endurece o caminho Rabbit→Webhook.

Retry exponencial e tolerância a falhas

Webhook (HTTP inbound)

Para o canal webhook, o retry é responsabilidade do provedor (Evolution) ou de um gateway. Do lado do handler, focar em:

  • responder rápido
  • não falhar quando persistência do evento cru falha (já acontece hoje)
  • logar erro e continuar quando possível

Reprocessamento de eventos “skipped” no webhook

Como o webhook HTTP não tem fila nativa, um skip: "in_processing" precisa de um mecanismo interno para garantir que eventos “pendurados” não fiquem para sempre sem processar caso o processador que “ganhou” falhe.

Proposta:

  • Criar uma rotina periódica no apps/worker (semelhante aos reconcilers já existentes) que:
    • busca na inbox (evolution_ingestion_events) eventos com processedAt IS NULL e lastSeenAt mais antigo que um limiar (ex.: > 2 minutos);
    • tenta adquirir o lock ps:evolution:processing:${dedupKey} (caso não exista/tenha expirado);
    • reprocessa chamando o /api/internal/webhook com o payload persistido (do log cru) e already_saved: true.

Esse reprocessamento deve ser rate-limited (batch + backoff) para não sobrecarregar o sistema em incidentes.

RabbitMQ (consumer)

O consumer deve garantir “at-least-once” até encaminhar com sucesso para o endpoint interno. Estratégias:

Estratégia recomendada (RabbitMQ com retry via DLX/TTL)

Criar três níveis de retry por routing key:

  • Fila principal: evolution.messages.update
  • Fila retry1 (TTL curto): evolution.messages.update.retry.1 → dead-letter para a principal
  • Fila retry2 (TTL médio)
  • Fila retry3 (TTL longo)
  • Fila DLQ final: evolution.messages.update.dlq

Em falha de encaminhamento HTTP:

  • publicar a mensagem na fila de retry apropriada com header x-retry-count
  • ACK na original para não travar consumo

Exponencial sugerido (exemplo):

  • retry1: 5s
  • retry2: 30s
  • retry3: 3min
  • depois: DLQ

Logs e rastreabilidade

Padrão do projeto

O projeto já usa logger estruturado (@pilot-status/shared), com child() para contexto.

Objetivo: cada evento deve gerar logs com:

  • dedupKey
  • event, status, instance
  • channel (WEBHOOK/RABBITMQ)
  • outcome (processed/skip_in_processing/skip_already_processed/forward_failed/parse_error)

Pontos de log

  • Início da ingestão (por canal)
  • Resultado da dedup (Redis e DB)
  • Persistência do evento cru (ok/erro)
  • Encaminhamento (Rabbit→Webhook): status HTTP, duração, erro
  • Processamento final (marcar processedAt)

Métricas por canal

Hoje não há Prometheus/OTel integrado no runtime. Para cumprir o requisito com o stack atual, há duas opções:

Opção A (rápida, sem dependências): métricas em Redis

Registrar counters com INCR e TTL (ex.: 7 dias):

  • ps:metrics:evolution:webhook:received
  • ps:metrics:evolution:webhook:processed
  • ps:metrics:evolution:webhook:skip_in_processing
  • ps:metrics:evolution:webhook:skip_already_processed
  • ps:metrics:evolution:rabbitmq:received
  • ps:metrics:evolution:rabbitmq:forward_ok
  • ps:metrics:evolution:rabbitmq:forward_fail
  • ps:metrics:evolution:rabbitmq:skip_in_processing
  • ps:metrics:evolution:rabbitmq:skip_already_processed

E expor um endpoint interno (ou job) para ler esses contadores para observabilidade.

Plano detalhado de implementação (passo a passo)

Passo 1 — Adicionar dependência AMQP no worker

  • Adicionar amqplib (ou lib equivalente) em apps/worker/package.json.
  • Criar módulo apps/worker/src/rabbitmq/* para conexão, consumo e retry.

Passo 2 — Função única de extração e dedupKey (shared)

Criar uma função reutilizável (para worker e fullstack) com assinatura:

  • extractEvolutionDedupFields(payload): { event, instance, status, key, dedupKey }

Regras:

  • key vem de data.key.id ou data.keyId
  • status vem de data.status (ou fallback em connection.update)
  • event vem de payload.event
  • instance vem de payload.instance

Passo 3 — Migração Prisma: inbox table + colunas de canal no log cru

  • Criar tabela evolution_ingestion_events com UNIQUE(dedupKey).
  • Evoluir evolution_webhook_events com dedup_key e channel (ou criar tabela nova “raw per channel”).

Passo 4 — Ajustar /api/internal/webhook para idempotência e already_saved

Alterações principais:

  • Dedup no topo do handler
  • already_saved controla persistência do log cru
  • (recomendado) exigir segredo quando already_saved=true

Passo 5 — Implementar consumer RabbitMQ no worker

  • Conectar no exchange evolution
  • Consumir as 3 filas/routing keys
  • Dedup (Redis + DB)
  • Persistir evento cru (channel=RABBITMQ)
  • Encaminhar para /api/internal/webhook com already_saved:true
  • Retry exponencial no encaminhamento com DLX/TTL e DLQ

Passo 6 — Testes (unitário + integração)

Fullstack (webhook handler)

Adicionar testes cobrindo:

  • Apenas webhook:
    • redis SET NX ok + DB insert ok → processa regras e salva evento cru
  • Duplicação:
    • primeira chamada processa; segunda chamada (SET NX falha ou DB conflict) retorna ok e não executa regras
  • Encaminhado pelo RabbitMQ:
    • already_saved:true → não chama evolutionWebhookEvent.create e ainda executa regras (uma vez)

Basear nos testes existentes em:

Worker (consumer RabbitMQ)

Adicionar testes cobrindo:

  • Parse + extração de dedupKey com exemplos reais (docs/evolution-events/*.json)
  • Cenário “só RabbitMQ”:
    • consumer recebe → persiste → faz POST no webhook interno com already_saved:true
  • Cenário duplicado:
    • SET NX falha → ACK sem encaminhar
  • Retry:
    • falha de POST → envia para retry queue (e não perde a mensagem)

Passo 7 — Operação / rollout seguro

Recomendação de rollout com feature flags:

  • DUAL_INGESTION_ENABLED=true|false
  • RABBITMQ_CONSUMER_ENABLED=true|false
  • RABBITMQ_FORWARD_TO_WEBHOOK_ENABLED=true|false

Estratégia:

  1. Ativar consumer RabbitMQ em modo “shadow” (persistir e contar, mas não encaminhar).
  2. Comparar métricas: eventos recebidos por canal, dedup drops, divergências.
  3. Ativar encaminhamento.
  4. Monitorar DLQ, latência de encaminhamento e volume de dedup.

Padrões de configuração (env vars)

No worker:

  • RABBITMQ_URI
  • RABBITMQ_EXCHANGE_NAME
  • FULLSTACK_INTERNAL_WEBHOOK_URL (ex.: https://app.../api/internal/webhook)
  • INTERNAL_WEBHOOK_SECRET (opcional, recomendado)

No fullstack:

  • INTERNAL_WEBHOOK_SECRET (para validar encaminhamentos com already_saved:true)

Cenários esperados (matriz)

1) Apenas webhook

  • Webhook recebe → dedup ok → persiste cru → processa regras → inbox processedAt

2) Apenas RabbitMQ

  • Consumer recebe → dedup ok → persiste cru → encaminha → handler processa regras (sem salvar cru) → inbox processedAt

3) Duplicado (ambos)

Caso A: webhook chega primeiro

  • webhook “ganha” → processa → rabbit chega depois → SET NX falha ou DB conflict → descarta

Caso B: rabbit chega primeiro

  • rabbit “ganha” → persiste cru → encaminha → webhook interno processa → webhook da Evolution chega depois → dedup bloqueia → descarta

Pontos de atenção

  • connection.update exige fallback de chave (não há keyId).
  • Idempotência deve proteger também efeitos colaterais (disparo de webhooks de clientes).
  • O endpoint /api/internal/webhook é público por design; requests “internos” devem ser distinguíveis e opcionalmente autenticados quando usam already_saved:true.
  • evolutionWebhookEvent hoje não tem unique; não confundir “log cru” com “inbox dedup”.

Referências no repositório