HAL module: consumes CloudEvents from AMQP and forwards as email notifications
Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

79 Zeilen
2.2KB

  1. const amqp = require("amqplib");
  2. const nodemailer = require("nodemailer");
  3. const {
  4. AMQP_URL,
  5. AMQP_EXCHANGE,
  6. AMQP_QUEUE = "email-forwarder",
  7. SMTP_HOST,
  8. SMTP_PORT = "25",
  9. NOTIFY_TO,
  10. NOTIFY_FROM = "hal@novox.be",
  11. } = process.env;
  12. async function main() {
  13. console.log(`Connecting to AMQP: ${AMQP_URL?.replace(/\/\/.*@/, "//***@")}`);
  14. const conn = await amqp.connect(AMQP_URL);
  15. const ch = await conn.createChannel();
  16. await ch.assertQueue(AMQP_QUEUE, { durable: true });
  17. await ch.bindQueue(AMQP_QUEUE, AMQP_EXCHANGE, "");
  18. console.log(`Bound queue '${AMQP_QUEUE}' to exchange '${AMQP_EXCHANGE}'`);
  19. const transport = nodemailer.createTransport({
  20. host: SMTP_HOST,
  21. port: parseInt(SMTP_PORT),
  22. secure: false,
  23. tls: { rejectUnauthorized: false },
  24. });
  25. console.log(`SMTP: ${SMTP_HOST}:${SMTP_PORT}`);
  26. console.log(`Forwarding to: ${NOTIFY_TO}`);
  27. console.log("Waiting for messages...\n");
  28. ch.consume(AMQP_QUEUE, async (msg) => {
  29. if (!msg) return;
  30. try {
  31. const headers = {};
  32. for (const [k, v] of Object.entries(msg.properties.headers || {})) {
  33. headers[k] = Buffer.isBuffer(v) ? v.toString() : v;
  34. }
  35. const body = JSON.parse(msg.content.toString());
  36. const subjectType = headers.subjecttype || "unknown";
  37. const brandCode = body.brandCode || "unknown";
  38. const ref = body.transactionReference || "unknown";
  39. const subject = `[CloudEvent] ${subjectType} / ${brandCode} / ${ref}`;
  40. const html = `
  41. <h2>CloudEvent received</h2>
  42. <h3>Headers</h3>
  43. <pre>${JSON.stringify(headers, null, 2)}</pre>
  44. <h3>CDM Payload</h3>
  45. <pre>${JSON.stringify(body, null, 2)}</pre>
  46. <hr>
  47. <p><small>From: AH BE Task Connector → ${AMQP_EXCHANGE} → ${AMQP_QUEUE}</small></p>
  48. `;
  49. await transport.sendMail({
  50. from: NOTIFY_FROM,
  51. to: NOTIFY_TO,
  52. subject,
  53. html,
  54. });
  55. console.log(`Forwarded: ${subject} → ${NOTIFY_TO}`);
  56. ch.ack(msg);
  57. } catch (err) {
  58. console.error("Failed to process message:", err.message);
  59. ch.nack(msg, false, true);
  60. }
  61. });
  62. }
  63. main().catch((err) => {
  64. console.error("Fatal:", err);
  65. process.exit(1);
  66. });