consumer.js 4.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. const amqp = require('amqplib');
  2. const cfg = require('../config/cfg.json');
  3. const fs = require('fs-extra');
  4. const path = require('path');
  5. const crypto = require('crypto');
  6. const filePiper = require('../util/filePiper.js');
  7. const stlGltfConverter = require('../util/stlGltfConverter.js');
  8. const producer = require('./producer.js');
  9. const chp = require('child_process');
  // Extensions taken from a message end up inside file-system paths, so only
  // plain alphanumeric extensions are accepted (no separators, no "..").
  const JOB_FILE_EXTENSION = /^[A-Za-z0-9]+$/;

  /**
   * Parses a queue message and derives the working file names for the job.
   *
   * @param {{content: Buffer}} msg - message as delivered by the channel.
   * @returns {{json: object, filename: string, fileTarget: string}} the parsed
   *   payload, the name of the downloaded source file and the name of the
   *   result file; both names share one random 10-hex-digit id.
   * @throws {Error} if the body is not valid JSON, is not an object, or its
   *   `origin` / `target` fields are not plain file extensions.
   */
  function parseJob(msg) {
    const json = JSON.parse(msg.content.toString());
    if (json === null || typeof json !== 'object') {
      throw new Error('Job payload must be a JSON object');
    }
    for (const key of ['origin', 'target']) {
      if (typeof json[key] !== 'string' || !JOB_FILE_EXTENSION.test(json[key])) {
        throw new Error(`Job field "${key}" must be a plain file extension`);
      }
    }
    const fileID = crypto.randomBytes(5).toString('hex');
    return {
      json,
      filename: `${fileID}.${json.origin}`,
      fileTarget: `${fileID}.${json.target}`,
    };
  }

  /**
   * Best-effort removal of a working file after a failed job. A failure here
   * is logged instead of thrown so it cannot mask the original error.
   *
   * @param {string} dir - directory relative to this module ("../queue/" or "../ready/").
   * @param {string} name - file name inside that directory.
   * @returns {Promise<void>}
   */
  async function discardWorkFile(dir, name) {
    try {
      await fs.remove(path.join(__dirname, dir, name));
      console.log('File ' + name + ' cleared!');
    } catch (err) {
      console.log('Could not clear file ' + name + ': ' + err);
    }
  }

  /**
   * Sends a 'system' message to the job author. Failures are logged and not
   * propagated: a broken notification must not turn a finished job into a
   * failed one (which would nack the message and repeat the upload).
   *
   * @param {*} author - value of the job's `author` field, passed through as is.
   * @param {string} text - message text.
   * @returns {Promise<void>}
   */
  async function notifyViewer(author, text) {
    try {
      await producer.sendResponseToViewer(author, 'system', text);
    } catch (err) {
      console.log('Could not notify viewer: ' + err);
    }
  }

  /**
   * Processes one delivery: downloads the source file, converts it when the
   * target is 'glb', then either uploads the result or hands it over to the
   * glbCompressor, and finally acks or nacks the message.
   *
   * @param {object} channel - channel the message was delivered on.
   * @param {?object} msg - delivered message, or null when the consumer was cancelled.
   * @returns {Promise<void>}
   */
  async function handleMessage(channel, msg) {
    if (msg === null) {
      console.log('Consumer cancelled by server');
      return;
    }

    let job;
    try {
      job = parseJob(msg);
    } catch (err) {
      // A payload that cannot be parsed or validated will never succeed,
      // so it is dropped right away instead of being requeued.
      console.log('\n!-- INVALID MESSAGE DISCARDED --!\n\n', err + '\n');
      channel.nack(msg, false, false);
      return;
    }
    const { json, filename, fileTarget } = job;

    console.log('Downloading file: ' + filename + ' ...');
    try {
      await filePiper.pipeToQueue(json.url, filename);
      console.log('Downloading complete!');

      if (json.target === 'glb') {
        console.log('Converting to glb...');
        await stlGltfConverter.convert(filename);
        // The downloaded source is no longer needed once it has been converted.
        await fs.remove(path.join(__dirname, '../queue/', filename));
        console.log('File ' + filename + ' cleared!');
      }

      if (!json.transfer) {
        await uploadToEndpoint(json.targetURL, fileTarget);
        await fs.remove(path.join(__dirname, '../ready/', fileTarget));
        await notifyViewer(json.author, 'Ваш файл был удачно обработан!');
      } else if (json.transfer === 'glbCompressor') {
        // NOTE(review): an in-process `gltf-transform simplify` step was sketched
        // here but never finished; the file is handed to the compressor instead.
        console.log('transfered file ' + fileTarget);
        // Awaited so that a failed hand-off reaches the catch below instead of
        // being acked as if it had succeeded.
        await producer.transferToGlbCompressor(fileTarget, json.targetURL, json.author);
      } else {
        // Unknown transfer targets are acknowledged without further work.
        console.log('Unknown transfer target: ' + json.transfer);
      }

      console.log('Done!');
      channel.ack(msg);
    } catch (err) {
      console.log('\n!-- ERROR OCCURRED --!\n\n', err + '\n');
      await discardWorkFile('../queue/', filename);
      await discardWorkFile('../ready/', fileTarget);

      if (msg.fields.redelivered) {
        console.log('\n!-- MESSAGE FAILED TWICE AND WILL BE DESTROYED --!\n');
        channel.nack(msg, false, false);
      } else {
        console.log('\n!-- MESSAGE WILL BE RETURNED TO THE QUEUE ONE MORE TIME --!\n');
        channel.nack(msg, false, true);
      }
      await notifyViewer(json.author, 'Ошибка при обработке вашего файла!');
    }
  }

  /**
   * Starts the consumer: initialises the producer, connects to RabbitMQ using
   * the settings in `cfg.rabbitmq`, and registers the message handler on the
   * configured queue.
   *
   * @returns {Promise<void>} resolves once the consumer is registered; rejects
   *   if initialisation, connecting, or any channel setup step fails.
   */
  async function run() {
    // !IMPORTANT! - presumably because the handler sends through `producer`;
    // keep this before any message can be consumed.
    await producer.init();
    const connection = await amqp.connect(cfg.rabbitmq.url);
    const channel = await connection.createChannel();
    const { prefetch, queue } = cfg.rabbitmq.consumer;
    await channel.prefetch(prefetch);
    await channel.assertQueue(queue);
    console.log('-------------------------------\n',
      'Consumer started with prefetch: ' + prefetch,
      '\n-------------------------------\n');
    await channel.consume(queue, (msg) =>
      // Last line of defence: e.g. ack/nack on a closed channel must not
      // surface as an unhandled promise rejection.
      handleMessage(channel, msg).catch((err) => {
        console.log('\n!-- UNHANDLED CONSUMER ERROR --!\n\n', err + '\n');
      }));
  }
  /**
   * Uploads a file from the `../ready/` directory (relative to this module)
   * to `endpoint` with an HTTP PUT request.
   *
   * @param {string} endpoint - URL that receives the file.
   * @param {string} filename - name of the file inside `../ready/`.
   * @returns {Promise<void>}
   * @throws {Error} if the file cannot be read, the request itself fails, or
   *   the server answers with a non-2xx status.
   */
  async function uploadToEndpoint(endpoint, filename) {
    // Async read: a synchronous read would block the event loop for the whole
    // file while other messages may be in flight.
    const data = await fs.readFile(path.join(__dirname, '../ready/', filename));
    const response = await fetch(endpoint, {
      method: 'PUT',
      // NOTE(review): the registered MIME type is 'application/octet-stream';
      // kept as is because the receiving endpoint may depend on this exact value.
      headers: {'Content-Type': 'application/octet'},
      body: data,
    });
    // fetch() only rejects on network errors; HTTP error statuses must be
    // checked explicitly, otherwise a rejected upload is treated as success.
    if (!response.ok) {
      throw new Error(`Upload failed with HTTP ${response.status} ${response.statusText}`);
    }
  }
  86. module.exports = {
  87. run
  88. }