12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- const amqp = require('amqplib');
- const cfg = require('../config/cfg.json');
- const fs = require('fs-extra');
- const path = require('path');
- const crypto = require('crypto');
- const filePiper = require('../util/filePiper.js');
- const stlGltfConverter = require('../util/stlGltfConverter.js');
- const producer = require('./producer.js');
- const chp = require('child_process');
- async function run() {
- await producer.init(); // !IMPORTANT!
- let connection = await amqp.connect(cfg.rabbitmq.url);
- const channel = await connection.createChannel();
- channel.prefetch(cfg.rabbitmq.consumer.prefetch);
- await channel.assertQueue(cfg.rabbitmq.consumer.queue);
- console.log('-------------------------------\n',
- 'Consumer started with prefetch: ' + cfg.rabbitmq.consumer.prefetch,
- '\n-------------------------------\n')
- channel.consume(
- cfg.rabbitmq.consumer.queue, async (msg) => {
- if (msg !== null) {
- let json = JSON.parse(msg.content.toString());
- let fileID = crypto.randomBytes(5).toString("hex");
- let filename = fileID + '.' + json.origin;
- let fileTarget = fileID + '.' + json.target;
- console.log("Downloading file: " + filename + ' ...');
- try {
- await filePiper.pipeToQueue(json.url, filename);
- console.log("Downloading complete!");
- if (json.target === 'glb') {
- console.log('Converting to glb...')
- await stlGltfConverter.convert(filename);
- console.log('File ' + filename + ' cleared!');
- await fs.remove(path.join(__dirname, "../queue/", filename));
- }
- if (!json.transfer) {
- await uploadToEndpoint(json.targetURL, fileTarget);
- await fs.remove(path.join(__dirname, "../ready/", fileTarget));
- producer.sendResponseToViewer(json.author, 'system', 'Ваш файл был удачно обработан!')
- } else {
- if (json.transfer === 'glbCompressor') {
- // let cmd = 'gltf-transform simplify ' + path.join(__dirname, "../ready/", fileTarget)
- // + " " + path.join(__dirname, "../ready/", fileTarget) + " --ratio" +
- // chp.execSync()
- console.log("transfered file " + fileTarget);
- producer.transferToGlbCompressor(fileTarget, json.targetURL, json.author);
- }
- }
- console.log('Done!')
- channel.ack(msg);
- } catch (e) {
- console.log('\n!-- ERROR OCCURRED --!\n\n',
- e + '\n');
- try {
- await fs.remove(path.join(__dirname, "../queue/", filename));
- console.log('File ' + filename + ' cleared!');
- } catch (e) {/* ignore */
- }
- try {
- await fs.remove(path.join(__dirname, "../ready/", fileTarget));
- console.log('File ' + fileTarget + ' cleared!');
- } catch (e) {/* ignore */
- }
- if (msg.fields.redelivered) {
- console.log('\n!-- MESSAGE FAILED TWICE AND WILL BE DESTROYED --!\n')
- channel.nack(msg, false, false);
- } else {
- console.log('\n!-- MESSAGE WILL BE RETURNED TO THE QUEUE ONE MORE TIME --!\n')
- channel.nack(msg, false, true);
- }
- producer.sendResponseToViewer(json.author, 'system', 'Ошибка при обпаботке вашего файла!')
- }
- } else {
- console.log('Consumer cancelled by server');
- }
- });
- }
- async function uploadToEndpoint(endpoint, filename) {
- let data = Buffer.from(fs.readFileSync(path.join(__dirname, "../ready/", filename)));
- await fetch(endpoint, {
- method: 'PUT',
- headers: {'Content-Type': 'application/octet'},
- body: data
- })
- }
- module.exports = {
- run
- }
|