Ingestão Dual (Evolution API) — RabbitMQ + Webhook em Paralelo
Este documento descreve, de forma detalhada e orientada ao código atual, como implementar um sistema de ingestão dual para eventos da Evolution API (WhatsApp), recebendo os mesmos eventos por RabbitMQ e por webhook HTTP em paralelo, com processamento idempotente, rastreabilidade, métricas e testes.
Objetivo
Garantir que todos os eventos enviados pela Evolution sejam capturados e processados mesmo quando um dos canais falhar, mantendo as regras:
- Falha individual: se o evento vier só por RabbitMQ, um worker consome, persiste no banco e encaminha para o webhook interno (
/api/internal/webhook) para executar as regras de negócio. Se vier só por webhook, o handler persiste e executa as regras. - Duplicação: se o mesmo evento vier pelos dois canais, apenas um deve “ganhar”. O outro não deve “morrer” por engano: se o evento ainda estiver em processamento, ele só faz skip; se já estiver processado com sucesso, aí sim ele pode descartar.
- Chave única do evento: gerar uma chave de deduplicação a partir de
(payload.data.key.id OU payload.data.keyId) + payload.data.status + payload.event(com validação via exemplos emdocs/evolution-events/). - Requisitos técnicos: logs detalhados, métricas por canal, tolerância a falhas com retry exponencial e testes unitários/integração cobrindo os 3 cenários.
Onde isso se encaixa no sistema atual
Estado atual (hoje)
- Webhook inbound Evolution: o endpoint central é:
POST /api/internal/webhook- Implementação: route.ts
- Ele processa
connection.update,logout.instance,messages.upsert(inbound) emessages.update(status), atualiza banco e dispara webhooks de cliente quando aplicável.
- Persistência do evento cru: o handler já salva qualquer payload em
EvolutionWebhookEvent(tabelaevolution_webhook_events) para reconciliação/debug.- Model: schema.prisma
- Worker atual: roda jobs BullMQ/Redis (envio de mensagens, reconciliação etc.).
- Bootstrap: apps/worker/src/index.ts
- RabbitMQ: não há cliente AMQP no repositório hoje; precisará ser adicionado ao
apps/worker.
Decisão de arquitetura
Para manter o Next.js (apps/fullstack) simples e evitar conexão AMQP longa no runtime web (escala horizontal/cold starts), o consumidor RabbitMQ deve rodar no apps/worker.
O webhook interno /api/internal/webhook continua sendo o processador único das regras de negócio, mas passa a suportar:
- deduplicação de processamento (idempotência real do lado “processador”)
- flag de origem/canal (
webhookvsrabbitmq) - flag
already_saved(para requests encaminhados pelo worker, evitando duplicar persistência do evento cru)
Escopo de eventos e filas
RabbitMQ (dados fornecidos)
RABBITMQ_URI=amqp://oismaelash:645453@rabbitmq.iaxp.cloud:5672/defaultRABBITMQ_EXCHANGE_NAME=evolution- Filas / routing keys para consumir:
evolution.connection.updateevolution.messages.updateevolution.messages.upsert
Webhook HTTP
- Endpoint:
/api/internal/webhook - O middleware do Next permite essa rota sem autenticação (hoje), por ser inbound do provedor.
Modelo de dados: como persistir e deduplicar com segurança
A deduplicação “só em Redis” não é suficiente para atrasos grandes entre canais (TTL expira, um evento tardio pode reprocessar). Por isso, a implementação usa 2 camadas:
- Redis
SET NX: controla estado de processamento (lock curto) e permite “skip em andamento”. - Banco (unique): registra estado final (processado ou não) e evita reprocessamento “tardio”.
Estados do evento (visão simples)
Para um mesmo dedupKey, o sistema considera 3 estados:
- NOVO: não existe “processed” no banco e não há lock de processamento.
- EM_PROCESSAMENTO: há lock no Redis (alguém está processando agora). Quem chegar depois deve skip e tentar mais tarde.
- PROCESSADO: existe registro no banco com
processedAtpreenchido (sucesso). Quem chegar depois pode descartar.
Gravar também um marcador de “done” no Redis com TTL longo para atalhar a consulta no DB, mas o DB continua sendo a fonte de verdade do sucesso.
Tabelas propostas
1) Manter evolution_webhook_events como log cru (sem unique)
Continuar salvando o payload bruto como hoje, mas adicionar metadados mínimos para rastreio por canal:
channel(WEBHOOK|RABBITMQ)dedupKey(string) para correlação e debugreceivedAt(timestamp do canal) se aplicável
Opção A (preferida): criar uma nova tabela para o “log cru por canal” e deixar evolution_webhook_events como está.
2) Criar tabela de “inbox” única para deduplicação e estado de processamento
Criar uma tabela nova (ex.: evolution_ingestion_events) para registrar apenas um registro por evento deduplicado:
Campos sugeridos:
id(cuid)dedupKey(string, unique)event(string)instance(string, nullable)channelFirstSeen(WEBHOOK|RABBITMQ)firstSeenAt(DateTime default now)lastSeenAt(DateTime updatedAt)seenCount(int)processedAt(DateTime nullable)forwardedToInternalWebhookAt(DateTime nullable)lastError(text nullable)
Motivos:
- Permite saber: “chegou por qual canal primeiro?”, “quantas vezes repetiu?”, “processou?”.
- Permite dedup forte via
UNIQUE(dedupKey). - Permite métricas e debugging sem depender de Redis.
Chave de deduplicação (dedupKey)
Regra solicitada
Gerar a chave com:
(payload.data.key.id OU payload.data.keyId) + payload.data.status + payload.event
Recomendação para evitar colisões
Na prática, para reduzir risco de colisão entre instâncias (mesmo keyId em instâncias diferentes), é recomendável prefixar instance:
dedupKey =${instance}|${event}|${status}|${key}`
Se você precisar seguir estritamente a regra sem instance, mantenha a regra original; mas o documento assume a versão com instance como “hardening” e registra isso como decisão explícita.
Extração dos campos (com base nos exemplos em docs/evolution-events)
Os exemplos em docs/evolution-events/ têm um wrapper (ex.: plataforma de logs de webhook). Nesses arquivos, o payload real da Evolution costuma estar em body (ou seja, o “payload” é input.body). Na aplicação real, o endpoint /api/internal/webhook recebe diretamente o payload da Evolution (sem wrapper).
messages.updatetempayload.data.keyIdepayload.data.status:messages.upserttempayload.data.key.ide geralmentepayload.data.status(ex.:SERVER_ACKem upsert de message sent):- Exemplo: message.sended.whatsapp-server.json
connection.updatenão temkeyId/key.id; para esse evento será necessário um fallback:- Sugestão:
dedupKey = ${instance}|connection.update|${data.state}|${data.statusReason ?? ""} - Exemplo: connection.update.close.json
- Sugestão:
Normalização e formato final
Para manter a chave segura para Redis e índice no DB:
key: string final extraída (data.key.idoudata.keyId) ou fallbackstatus:String(payload.data.status ?? "")event:String(payload.event ?? "")instance:String(payload.instance ?? "")dedupKey:sha256("${instance}|${event}|${status}|${key}")ou string direta
Usar sha256 para padronizar comprimento e evitar chaves longas ou com caracteres inesperados.
Fluxo: ingestão via Webhook HTTP
Entrada
Request real da Evolution chega como:
{
"event": "messages.update",
"instance": "ismael-vagas",
"data": { "keyId": "…", "status": "DELIVERY_ACK", "instanceId": "…", "messageId": "…" },
"date_time": "…"
}
Passos (webhook)
- Parse do JSON no handler
/api/internal/webhook. - Extrair
dedupKey(com fallback para eventos semkeyId). - Verificar se já foi processado com sucesso (fonte de verdade: DB inbox):
- Se
processedAtjá estiver preenchido para essededupKey: retornar200 { ok: true, skipped: "already_processed" }e parar.
- Se
- Tentar adquirir o lock “em processamento” no Redis:
SET NX ps:evolution:processing:${dedupKey} 1 EX <ttl>- TTL sugerido: 2–10 minutos (apenas para corrida/concorrrência).
- Se falhar: retornar
200 { ok: true, skipped: "in_processing" }e parar.
- Registrar/atualizar a inbox no DB (
evolution_ingestion_events) como “visto”:INSERT ... ON CONFLICT UPDATE lastSeenAt/seenCount
- Persistir evento cru (para auditoria/reconciliação), marcando
channel=WEBHOOKededupKey. - Executar regras de negócio existentes (o código atual em
route.ts). - Marcar
processedAtna inboxevolution_ingestion_events(sucesso). - Marcar “done” no Redis (opcional, TTL longo) e remover/expirar lock de processamento.
Por que “dedup no início” do webhook
Hoje, o webhook:
- salva evento cru em
evolutionWebhookEvent - processa e pode disparar webhooks de cliente
Se o RabbitMQ também encaminhar o mesmo evento para esse endpoint, sem dedup, você corre o risco de:
- disparar webhooks de cliente duplicados
- atualizar status/opt-ins/incoming messages em duplicidade (ou gerar logs duplicados)
Portanto, a deduplicação deve ocorrer antes das regras de negócio.
Fluxo: ingestão via RabbitMQ (worker)
Por que no worker
Manter a conexão AMQP e o consumo (prefetch/ack/nack/retry) é responsabilidade típica de um processo de background. O apps/worker já tem o ciclo de vida adequado e logger estruturado.
Passos (Rabbit consumer)
Para cada mensagem recebida em uma routing key evolution.*:
- Parse do
contentdo RabbitMQ para obter o JSON do evento. - Extrair
dedupKeycom a mesma função/algoritmo do webhook. - Verificar se já foi processado com sucesso (DB inbox):
- Se
processedAtjá estiver preenchido: ACK e descartar.
- Se
- Tentar adquirir lock “em processamento” no Redis:
SET NX ps:evolution:processing:${dedupKey} 1 EX <ttl>- Se falhar: não processar agora. A mensagem pode voltar para a fila (requeue/retry) e tentar novamente.
- Registrar/atualizar a inbox no DB (
evolution_ingestion_events) como “visto”. - Persistir evento cru em
evolutionWebhookEventcomchannel=RABBITMQededupKey. - Encaminhar para o webhook interno:
POST ${FULLSTACK_BASE_URL}/api/internal/webhook- Body:
{ ...payloadOriginal, already_saved: true, ingestion_source: "rabbitmq" }
- Se o POST responder 2xx: marcar
forwardedToInternalWebhookAteprocessedAte então ACK. - Se falhar:
- aplicar retry exponencial (ver seção de retry)
- não ACK até concluir o encaminhamento (ou mover para DLQ com rastreio)
- Em ambos os casos (sucesso/falha final): remover/expirar lock de processamento e, no sucesso, marcar “done” no Redis.
Observação crítica: idempotência no encaminhamento
Mesmo com dedup na ingestão RabbitMQ, o encaminhamento HTTP pode sofrer retries e gerar duplicação de execução no endpoint interno.
Por isso, o endpoint interno deve ser idempotente ao nível de “processamento de evento” (não só “persistência do evento cru”). A deduplicação por dedupKey deve acontecer antes de executar as regras de negócio, inclusive para requests com already_saved: true.
Alterações detalhadas no endpoint /api/internal/webhook
1) Aceitar metadados de ingestão
Adicionar suporte a campos opcionais no body:
already_saved?: booleaningestion_source?: "webhook" | "rabbitmq"
Esses campos são ignorados pela Evolution e só serão usados internamente.
2) Pular persistência do evento cru quando already_saved: true
Hoje o handler sempre executa:
prisma.evolutionWebhookEvent.create({ data: { … } })
Para evitar duplicar registros quando o worker RabbitMQ encaminhar para o mesmo endpoint, a regra será:
- Se
already_saved === true: não salvar emevolutionWebhookEvent - Caso contrário: salvar normalmente (
channel=WEBHOOK)
3) Deduplicar antes de regras de negócio (processamento idempotente)
Adicionar no início do POST:
- extrair
dedupKey - executar:
- Redis
SET NX(lock curto) - DB inbox
INSERT ... ON CONFLICT DO NOTHING
- Redis
Se a dedup indicar duplicado:
- retornar
200 { ok: true, dedup: true } - não executar regras de negócio (nem disparar webhooks para clientes)
4) (Recomendado) Proteger requests “internos” com segredo opcional
Como /api/internal/webhook é público por necessidade do provedor, mas o encaminhamento do worker adiciona already_saved: true, isso cria um vetor: alguém de fora poderia enviar already_saved: true e alterar semântica.
Mitigação recomendada:
- Se
already_saved === true, exigir header:x-internal-webhook-secret: ${INTERNAL_WEBHOOK_SECRET}
- Para requests “normais” (Evolution), não exigir esse header.
Isso preserva compatibilidade com Evolution e endurece o caminho Rabbit→Webhook.
Retry exponencial e tolerância a falhas
Webhook (HTTP inbound)
Para o canal webhook, o retry é responsabilidade do provedor (Evolution) ou de um gateway. Do lado do handler, focar em:
- responder rápido
- não falhar quando persistência do evento cru falha (já acontece hoje)
- logar erro e continuar quando possível
Reprocessamento de eventos “skipped” no webhook
Como o webhook HTTP não tem fila nativa, um skip: "in_processing" precisa de um mecanismo interno para garantir que eventos “pendurados” não fiquem para sempre sem processar caso o processador que “ganhou” falhe.
Proposta:
- Criar uma rotina periódica no
apps/worker(semelhante aos reconcilers já existentes) que:- busca na inbox (
evolution_ingestion_events) eventos comprocessedAt IS NULLelastSeenAtmais antigo que um limiar (ex.: > 2 minutos); - tenta adquirir o lock
ps:evolution:processing:${dedupKey}(caso não exista/tenha expirado); - reprocessa chamando o
/api/internal/webhookcom o payload persistido (do log cru) ealready_saved: true.
- busca na inbox (
Esse reprocessamento deve ser rate-limited (batch + backoff) para não sobrecarregar o sistema em incidentes.
RabbitMQ (consumer)
O consumer deve garantir “at-least-once” até encaminhar com sucesso para o endpoint interno. Estratégias:
Estratégia recomendada (RabbitMQ com retry via DLX/TTL)
Criar três níveis de retry por routing key:
- Fila principal:
evolution.messages.update - Fila retry1 (TTL curto):
evolution.messages.update.retry.1→ dead-letter para a principal - Fila retry2 (TTL médio)
- Fila retry3 (TTL longo)
- Fila DLQ final:
evolution.messages.update.dlq
Em falha de encaminhamento HTTP:
- publicar a mensagem na fila de retry apropriada com header
x-retry-count ACKna original para não travar consumo
Exponencial sugerido (exemplo):
- retry1: 5s
- retry2: 30s
- retry3: 3min
- depois: DLQ
Logs e rastreabilidade
Padrão do projeto
O projeto já usa logger estruturado (@pilot-status/shared), com child() para contexto.
Objetivo: cada evento deve gerar logs com:
dedupKeyevent,status,instancechannel(WEBHOOK/RABBITMQ)outcome(processed/skip_in_processing/skip_already_processed/forward_failed/parse_error)
Pontos de log
- Início da ingestão (por canal)
- Resultado da dedup (Redis e DB)
- Persistência do evento cru (ok/erro)
- Encaminhamento (Rabbit→Webhook): status HTTP, duração, erro
- Processamento final (marcar processedAt)
Métricas por canal
Hoje não há Prometheus/OTel integrado no runtime. Para cumprir o requisito com o stack atual, há duas opções:
Opção A (rápida, sem dependências): métricas em Redis
Registrar counters com INCR e TTL (ex.: 7 dias):
ps:metrics:evolution:webhook:receivedps:metrics:evolution:webhook:processedps:metrics:evolution:webhook:skip_in_processingps:metrics:evolution:webhook:skip_already_processedps:metrics:evolution:rabbitmq:receivedps:metrics:evolution:rabbitmq:forward_okps:metrics:evolution:rabbitmq:forward_failps:metrics:evolution:rabbitmq:skip_in_processingps:metrics:evolution:rabbitmq:skip_already_processed
E expor um endpoint interno (ou job) para ler esses contadores para observabilidade.
Plano detalhado de implementação (passo a passo)
Passo 1 — Adicionar dependência AMQP no worker
- Adicionar
amqplib(ou lib equivalente) emapps/worker/package.json. - Criar módulo
apps/worker/src/rabbitmq/*para conexão, consumo e retry.
Passo 2 — Função única de extração e dedupKey (shared)
Criar uma função reutilizável (para worker e fullstack) com assinatura:
extractEvolutionDedupFields(payload): { event, instance, status, key, dedupKey }
Regras:
keyvem dedata.key.idoudata.keyIdstatusvem dedata.status(ou fallback emconnection.update)eventvem depayload.eventinstancevem depayload.instance
Passo 3 — Migração Prisma: inbox table + colunas de canal no log cru
- Criar tabela
evolution_ingestion_eventscomUNIQUE(dedupKey). - Evoluir
evolution_webhook_eventscomdedup_keyechannel(ou criar tabela nova “raw per channel”).
Passo 4 — Ajustar /api/internal/webhook para idempotência e already_saved
Alterações principais:
- Dedup no topo do handler
already_savedcontrola persistência do log cru- (recomendado) exigir segredo quando
already_saved=true
Passo 5 — Implementar consumer RabbitMQ no worker
- Conectar no exchange
evolution - Consumir as 3 filas/routing keys
- Dedup (Redis + DB)
- Persistir evento cru (
channel=RABBITMQ) - Encaminhar para
/api/internal/webhookcomalready_saved:true - Retry exponencial no encaminhamento com DLX/TTL e DLQ
Passo 6 — Testes (unitário + integração)
Fullstack (webhook handler)
Adicionar testes cobrindo:
- Apenas webhook:
redis SET NXok + DB insert ok → processa regras e salva evento cru
- Duplicação:
- primeira chamada processa; segunda chamada (SET NX falha ou DB conflict) retorna ok e não executa regras
- Encaminhado pelo RabbitMQ:
already_saved:true→ não chamaevolutionWebhookEvent.createe ainda executa regras (uma vez)
Basear nos testes existentes em:
Worker (consumer RabbitMQ)
Adicionar testes cobrindo:
- Parse + extração de dedupKey com exemplos reais (
docs/evolution-events/*.json) - Cenário “só RabbitMQ”:
- consumer recebe → persiste → faz POST no webhook interno com
already_saved:true
- consumer recebe → persiste → faz POST no webhook interno com
- Cenário duplicado:
SET NXfalha → ACK sem encaminhar
- Retry:
- falha de POST → envia para retry queue (e não perde a mensagem)
Passo 7 — Operação / rollout seguro
Recomendação de rollout com feature flags:
DUAL_INGESTION_ENABLED=true|falseRABBITMQ_CONSUMER_ENABLED=true|falseRABBITMQ_FORWARD_TO_WEBHOOK_ENABLED=true|false
Estratégia:
- Ativar consumer RabbitMQ em modo “shadow” (persistir e contar, mas não encaminhar).
- Comparar métricas: eventos recebidos por canal, dedup drops, divergências.
- Ativar encaminhamento.
- Monitorar DLQ, latência de encaminhamento e volume de dedup.
Padrões de configuração (env vars)
No worker:
RABBITMQ_URIRABBITMQ_EXCHANGE_NAMEFULLSTACK_INTERNAL_WEBHOOK_URL(ex.:https://app.../api/internal/webhook)INTERNAL_WEBHOOK_SECRET(opcional, recomendado)
No fullstack:
INTERNAL_WEBHOOK_SECRET(para validar encaminhamentos comalready_saved:true)
Cenários esperados (matriz)
1) Apenas webhook
- Webhook recebe → dedup ok → persiste cru → processa regras → inbox processedAt
2) Apenas RabbitMQ
- Consumer recebe → dedup ok → persiste cru → encaminha → handler processa regras (sem salvar cru) → inbox processedAt
3) Duplicado (ambos)
Caso A: webhook chega primeiro
- webhook “ganha” → processa → rabbit chega depois →
SET NXfalha ou DB conflict → descarta
Caso B: rabbit chega primeiro
- rabbit “ganha” → persiste cru → encaminha → webhook interno processa → webhook da Evolution chega depois → dedup bloqueia → descarta
Pontos de atenção
connection.updateexige fallback de chave (não hákeyId).- Idempotência deve proteger também efeitos colaterais (disparo de webhooks de clientes).
- O endpoint
/api/internal/webhooké público por design; requests “internos” devem ser distinguíveis e opcionalmente autenticados quando usamalready_saved:true. evolutionWebhookEventhoje não tem unique; não confundir “log cru” com “inbox dedup”.
Referências no repositório
- Webhook interno Evolution (ponto central de processamento):
- route.ts
- Documento atual: webhook-interno-evolution.md
- Model Prisma do evento cru:
- Exemplos de payload Evolution: