message_sender.js

// vim: ts=4:sw=4:expandtab

const Attachment = require('./attachment');
const OutgoingMessage = require('./outgoing_message');
const crypto = require('./crypto');
const eventing = require('./eventing');
const exchange = require('./exchange');
const hub = require('./hub');
const libsignal = require('libsignal');
const node_crypto = require('crypto');
const protobufs = require('./protobufs');
const queueAsync = require('./queue_async');
const storage = require('./storage');
const util = require('./util');
const uuid4 = require('uuid/v4');


/**
 * @event MessageSender#error
 * @type {module:eventing~Event}
 * @property {Error} error
 */

/**
 * @event MessageSender#keychange
 * @type {module:eventing~KeyChangeEvent}
 */

/**
 * @typedef {Object} Action
 * @property {string} title - Text to be displayed in the recipients client.
 * @property {string} action - The identifier included in responses.
 * @property {string} [color]
 */

/**
 * @typedef {Object} ActionOptions
 * @since 5.2.1
 * @property {boolean} allowMultiple=false - Enable to prevent the action buttons
 *                                           from disabling after use.
 */

/**
 * @typedef {Object} SendOptions
 * @property {Object} options
 * @property {TagExpression} options.to - Required unless {@link distribution} or {@addrs} are set.
 * @property {ResolvedTagExpression} options.distribution - This argument is required if {@link to} and {@ addrs} are omitted.
 * @property {string[]} options.addrs - This argument is required if {@link to} and {@distribution} are omitted.
 *                                                       
 * @property {string} options.text - Text to send as message body.
 * @property {string} [options.html] - HTML to send as message body.
 * @property {Object} [options.data] - Misc data properties. (ADVANCED)
 * @property {string} [options.threadId=uuid4()] - UUID of this thread
 * @property {string} [options.threadType=conversation]
 * @property {string} [options.messageType=content] - E.g. content, control, etc..
 * @property {string} [options.messageId=uuid4()]
 * @property {string} [options.messageRef] - UUID of message this message will be referencing (reply-to).
 * @property {string} [options.expiration] - Offset for when message should expire.
 * @property {Attachment[]} [options.attachments]
 * @property {number} [options.flags] - Low level flags. (ADVANCED)
 * @property {string} [options.userAgent=librelay]
 * @property {boolean} [options.noSync=false] - When true a sync message will not be sent to your other
 *                                              devices.  (ADVANCED)
 * @property {ActionOptions} [options.actionOptions]
 * @property {Action[]} [options.actions]
 */

/**
 * Primary interface for sending messages to peers (or your other devices).
 *
 * @fires MessageSender#error
 * @fires MessageSender#keychange
 */
class MessageSender extends eventing.EventTarget {

    /**
     * @param {Object} options
     * @param {string} options.addr - Your signal address (e.g. your account UUID)
     * @param {SignalClient} options.signal
     * @param {AtlasClient} options.atlas
     */
    constructor({addr, signal, atlas}) {
        super();
        this.addr = addr;
        this.signal = signal;
        this.atlas = atlas;
    }

    /**
     * Return a default instance.
     * @returns {MessageSender}
     */
    static async factory() {
        const addr = await storage.getState('addr');
        const signal = await hub.SignalClient.factory();
        const atlas = await hub.AtlasClient.factory();
        return new this({addr, signal, atlas});
    }

    async _makeAttachmentPointer(attachment) {
        if (!(attachment instanceof Attachment)) {
            throw TypeError("Expected `Attachment` type");
        }
        const key = node_crypto.randomBytes(64);
        const ptr = protobufs.AttachmentPointer.create({
            key,
            contentType: attachment.type
        });
        const iv = node_crypto.randomBytes(16);
        const encryptedBin = await crypto.encryptAttachment(attachment.buffer, key, iv);
        ptr.id = await this.signal.putAttachment(encryptedBin);
        return ptr;
    }

    /**
     * Send a message
     *
     * @param {SendOptions} options
     * @returns {OutgoingMessage}
     */
    async send({
        to=null, distribution=null, addrs=null,
        text=null, html=null,
        data={},
        threadId=uuid4(),
        threadType='conversation',
        threadTitle=undefined,
        messageType='content',
        messageId=uuid4(),
        messageRef=undefined,
        expiration=undefined,
        attachments=undefined,
        flags=undefined,
        userAgent='librelay',
        noSync=false,
        actions=undefined,
        actionOptions=undefined
    }) {
        const ex = exchange.create();
        if (!distribution) {
            if (to) {
                distribution = await this.atlas.resolveTags(to);
            } else if (!addrs) {
                throw TypeError("`to`, `distribution` or `addrs` required");
            }
        }
        if (distribution) {
            ex.setThreadExpression(distribution.universal);
        }
        if (text) {
            ex.setBody(text);
        }
        if (html) {
            ex.setBody(html, {html: true});
        }
        ex.setThreadId(threadId);
        ex.setThreadType(threadType);
        ex.setThreadTitle(threadTitle);
        ex.setMessageType(messageType);
        ex.setMessageId(messageId);
        ex.setMessageRef(messageRef);
        ex.setUserAgent(userAgent);
        ex.setSource(this.addr);
        ex.setExpiration(expiration);
        ex.setFlags(flags);
        if (actions && actions.length) {
            ex.setDataProperty('actions', actions);
            if (actionOptions) {
                ex.setDataProperty('actionOptions', actionOptions);
            }
        }
        for (const [k, v] of Object.entries(data)) {
            ex.setDataProperty(k, v);
        }
        if (attachments && attachments.length) {
            // TODO Port to exchange interfaces (TBD)
            ex.setAttachments(attachments.map(x => x.getMeta()));
        }
        const dataMessage = ex.encode();
        if (attachments) {
            // TODO Port to exchange interfaces (TBD)
            dataMessage.attachments = await Promise.all(attachments.map(x =>
                this._makeAttachmentPointer(x)));
        }
        const content = protobufs.Content.create({dataMessage});
        const ts = Date.now();
        const outMsg = this._send(content, ts, this._scrubSelf(addrs || distribution.userids));
        if (!noSync) {
            const syncOutMsg = this._sendSync(content, ts, threadId, expiration && Date.now());
            // Relay events from out message into the normal (non-sync) out-msg.  Even
            // if this message is just for us, it makes the interface consistent.
            syncOutMsg.on('sent', entry => outMsg._emitSentEntry(entry));
            syncOutMsg.on('error', entry => outMsg._emitErrorEntry(entry));
        }
        return outMsg;
    }

    _send(content, timestamp, addrs) {
        console.assert(addrs instanceof Array);
        const outmsg = new OutgoingMessage(this.signal, timestamp, content);
        outmsg.on('keychange', this._onKeyChange.bind(this));
        for (const addr of addrs) {
            queueAsync('message-send-job-' + addr.split('.')[0], () =>
                outmsg.sendToAddr(addr).catch(this._onError.bind(this)));
        }
        return outmsg;
    }

    async _onError(e) {
        const ev = new eventing.Event('error');
        ev.error = e;
        await this.dispatchEvent(ev);
    }

    async _onKeyChange(e) {
        await this.dispatchEvent(new eventing.KeyChangeEvent(e));
    }

    _sendSync(content, timestamp, threadId, expirationStartTimestamp) {
        const sentMessage = protobufs.SyncMessage.Sent.create({
            timestamp,
            message: content.dataMessage
        });
        if (threadId) {
            sentMessage.destination = threadId;
        }
        if (expirationStartTimestamp) {
            sentMessage.expirationStartTimestamp = expirationStartTimestamp;
        }
        const syncMessage = protobufs.SyncMessage.create({sent: sentMessage});
        const syncContent = protobufs.Content.create({syncMessage});
        return this._send(syncContent, timestamp, [this.addr]);
    }

    async syncReadMessages(reads) {
        if (!reads.length) {
            console.warn("No reads to sync");
        }
        const read = reads.map(r => protobufs.SyncMessage.Read.create({
            timestamp: r.timestamp,
            sender: r.sender
        }));
        const syncMessage = protobufs.SyncMessage.create({read});
        const content = protobufs.Content.create({syncMessage});
        return this._send(content, Date.now(), [this.addr]);
    }

    _scrubSelf(addrs) {
        const nset = new Set(addrs);
        nset.delete(this.addr);
        return Array.from(nset);
    }

    async closeSession(encodedAddr, options) {
        const [addr, deviceId] = util.unencodeAddr(encodedAddr);
        const deviceIds = deviceId ? [deviceId] :  await storage.getDeviceIds(addr);

        async function _closeOpenSessions() {
            await Promise.all(deviceIds.map(deviceId => {
                const address = new libsignal.ProtocolAddress(addr, deviceId);
                const sessionCipher = new libsignal.SessionCipher(storage, address);
                return sessionCipher.closeOpenSession();
            }));
        }

        await _closeOpenSessions();  // Clear before so endsession is a prekey bundle
        const outmsg = await this.send({
            addrs: [encodedAddr],
            noSync: true,
            flags: protobufs.DataMessage.Flags.END_SESSION,
            messageType: 'control',
            messageId: 'deadbeef-1111-2222-3333-000000000000',
            threadId: 'deadbeef-1111-2222-3333-000000000000',
            data: {
                control: 'closeSession',
                retransmit: options.retransmit
            }
        });
        try {
            await new Promise((resolve, reject) => {
                outmsg.on('sent', resolve);
                outmsg.on('error', reject);
            });
        } finally {
            await _closeOpenSessions();  // Clear after so don't use the reopened session from the end msg
        }
    }
}

module.exports = MessageSender;