Commit 7647fe45 authored by David Baker's avatar David Baker
Browse files

Branch to test bulk decryption of events

A bunch of terrible hacks to test how much speed we would gain at
startup by decrypting events in a batch. That is:
 * Trying to decrypt events once before putting them in the timeline
   to avoid an Event.decrypted for every encrypted event we load from
   disk at startup.
 * When we load batches of events from the store at startup, fetch the
   keys from indexeddb in batches too rather than a separate txn for
   every event (and don't fetch the same session multiple times for
   messages in the same session and the same batch).

Further optimisations that could be applied:
 * Re-use inbound group sessions for multiple messages within the same
   session rather than unpickling a separate one for each message.
 * Not decrypting events until we actually need them
 * Long standing store issues, ie. not storing message data as one
   giant monolithic blob that we have to load at startup.
parent a4a7097c
......@@ -5282,6 +5282,7 @@ function _resolve(callback, resolve, res) {
function _PojoToMatrixEventMapper(client, options) {
const preventReEmit = Boolean(options && options.preventReEmit);
const decrypt = Boolean(options && (options.decrypt || options.decrypt === undefined));
function mapper(plainOldJsObject) {
const event = new MatrixEvent(plainOldJsObject);
if (event.isEncrypted()) {
......@@ -5290,7 +5291,7 @@ function _PojoToMatrixEventMapper(client, options) {
"Event.decrypted",
]);
}
event.attemptDecryption(client._crypto);
if (decrypt) event.attemptDecryption(client._crypto);
}
const room = client.getRoom(event.getRoomId());
if (room && !preventReEmit) {
......
......@@ -980,6 +980,32 @@ OlmDevice.prototype._getInboundGroupSession = function(
);
};
OlmDevice.prototype._getInboundGroupSessions = function(
roomId, senderKeys, sessionIds, txn, func,
) {
this._cryptoStore.getEndToEndInboundGroupSession(
senderKey, sessionId, txn, (sessionData, withheld) => {
if (sessionData === null) {
func(null, null, withheld);
return;
}
// if we were given a room ID, check that the it matches the original one for the session. This stops
// the HS pretending a message was targeting a different room.
if (roomId !== null && roomId !== sessionData.room_id) {
throw new Error(
"Mismatched room_id for inbound group session (expected " +
sessionData.room_id + ", was " + roomId + ")",
);
}
this._unpickleInboundGroupSession(sessionData, (session) => {
func(session, sessionData, withheld);
});
},
);
};
/**
* Add an inbound group session to the session store
*
......@@ -1236,6 +1262,73 @@ OlmDevice.prototype.decryptGroupMessage = async function(
return result;
};
OlmDevice.prototype.decryptGroupMessages = async function(events) {
const tuples = events.map(e => {
return [e.getWireContent().sender_key, e.getWireContent().session_id];
});
const results = [];
for (let i = 0; i < events.length; ++i) results[i] = null;
await this._cryptoStore.doTxn(
'readonly', [
IndexedDBCryptoStore.STORE_INBOUND_GROUP_SESSIONS,
], (txn) => {
this._cryptoStore.getEndToEndInboundGroupSessions(tuples, txn, (sessionResults) => {
for (let i = 0; i < events.length; ++i) {
const ev = events[i];
const content = ev.getWireContent();
const sessionData = sessionResults[content.sender_key][content.session_id];
if (sessionData) {
const session = new global.Olm.InboundGroupSession();
session.unpickle(this._pickleKey, sessionData.session.session);
try {
const decryptRes = session.decrypt(content.ciphertext);
const result = {
/*return {
clearEvent: payload,
senderCurve25519Key: res.senderKey,
claimedEd25519Key: res.keysClaimed.ed25519,
forwardingCurve25519KeyChain: res.forwardingCurve25519KeyChain,
};*/
clearEvent: JSON.parse(decryptRes.plaintext),
keysClaimed: sessionData.keysClaimed || {},
senderKey: content.sender_key,
forwardingCurve25519KeyChain: (
sessionData.forwardingCurve25519KeyChain || []
),
};
results[i] = {status: 'fulfilled', value: result};
} catch (e) {
results[i] = {status: 'rejected', reason: e};
} finally {
session.free();
}
}
}
},
);
});
for (let i = 0; i < results.length; ++i) {
if (results[i] === null) {
results[i] = {
status: 'rejected',
reason: new algorithms.DecryptionError(
"MEGOLM_UNKNOWN_INBOUND_SESSION_ID",
"The sender's device has not sent us the keys for this message.",
{
//session: content.sender_key + '|' + content.session_id,
},
),
};
}
}
return results;
};
/**
* Determine if we have the keys for a given megolm session
*
......
......@@ -1204,6 +1204,10 @@ MegolmDecryption.prototype.decryptEvent = async function(event) {
};
};
MegolmDecryption.prototype.decryptEvents = async function(events) {
return this._olmDevice.decryptGroupMessages(events);
}
MegolmDecryption.prototype._requestKeysForEvent = function(event) {
const wireContent = event.getWireContent();
......
......@@ -254,6 +254,12 @@ OlmDecryption.prototype.decryptEvent = async function(event) {
};
};
OlmDecryption.prototype.decryptEvents = async function(events) {
return Promise.allSettled(events.map(ev => {
return this.decryptEvent(ev);
}));
};
/**
* Attempt to decrypt an Olm message
*
......
......@@ -2764,6 +2764,33 @@ Crypto.prototype.decryptEvent = function(event) {
return alg.decryptEvent(event);
};
Crypto.prototype.decryptEvents = function(events) {
if (events.length === 0) return Promise.resolve([]);
const byRoomAlg = {};
for (const ev of events) {
const content = ev.getWireContent();
if (content.algorithm === undefined) continue;
const roomAlg = ev.getRoomId() + '-' + content.algorithm;
if(byRoomAlg[roomAlg] === undefined) byRoomAlg[roomAlg] = [];
byRoomAlg[roomAlg].push(ev);
}
return Promise.all(Object.values(byRoomAlg).map(evs => {
const content = evs[0].getWireContent();
return this._getRoomDecryptor(evs[0].getRoomId(), content.algorithm).decryptEvents(evs);
})).then(results => {
for (let i = 0; i < Object.values(byRoomAlg).length; ++i) {
const evs = Object.values(byRoomAlg)[i];
const out = results[i];
for (let j = 0; j < evs.length; ++j) {
evs[j].setEncryptionResult(out[j]);
}
}
});
};
/**
* Handle the notification from /sync or /keys/changes that device lists have
* been changed.
......
......@@ -572,6 +572,38 @@ export class Backend {
};
}
getEndToEndInboundGroupSessions(tuples, txn, func) {
const results = {};
const objectStore = txn.objectStore("inbound_group_sessions");
let i = 0;
const getNext = () => {
if (i === tuples.length) {
func(results);
} else {
if (results[tuples[i][0]] && results[tuples[i][0]][tuples[i][1]] !== undefined) {
++i;
getNext();
} else {
const getReq = objectStore.get(tuples[i]);
getReq.onsuccess = function() {
try {
results[tuples[i][0]] = results[tuples[i][0]] || {};
results[tuples[i][0]][tuples[i][1]] = getReq.result;
++i;
getNext();
} catch (e) {
abortWithException(txn, e);
}
};
}
}
};
getNext();
}
getAllEndToEndInboundGroupSessions(txn, func) {
const objectStore = txn.objectStore("inbound_group_sessions");
const getReq = objectStore.openCursor();
......
......@@ -444,6 +444,12 @@ export class IndexedDBCryptoStore {
);
}
getEndToEndInboundGroupSessions(tuples, txn, func) {
this._backend.getEndToEndInboundGroupSessions(
tuples, txn, func,
);
}
/**
* Fetches all inbound group sessions in the store
* @param {*} txn An active transaction. See doTxn().
......
......@@ -395,7 +395,7 @@ utils.extend(MatrixEvent.prototype, {
* @returns {Promise} promise which resolves (to undefined) when the decryption
* attempt is completed.
*/
attemptDecryption: async function(crypto, isRetry) {
attemptDecryption: async function(crypto, isRetry, emit) {
// start with a couple of sanity checks.
if (!this.isEncrypted()) {
throw new Error("Attempt to decrypt event which isn't encrypted");
......@@ -425,7 +425,7 @@ utils.extend(MatrixEvent.prototype, {
return this._decryptionPromise;
}
this._decryptionPromise = this._decryptionLoop(crypto, isRetry);
this._decryptionPromise = this._decryptionLoop(crypto, isRetry, emit);
return this._decryptionPromise;
},
......@@ -470,7 +470,9 @@ utils.extend(MatrixEvent.prototype, {
return recipients;
},
_decryptionLoop: async function(crypto, isRetry) {
_decryptionLoop: async function(crypto, isRetry, emit) {
if (emit === undefined) emit = true;
// make sure that this method never runs completely synchronously.
// (doing so would mean that we would clear _decryptionPromise *before*
// it is set in attemptDecryption - and hence end up with a stuck
......@@ -567,6 +569,29 @@ utils.extend(MatrixEvent.prototype, {
}
},
setEncryptionResult: function(res) {
if (res.status === 'fulfilled') {
this._decryptionPromise = null;
this._retryDecryption = false;
if (res.value === undefined) {
console.log("sxcfhj");
}
this._setClearData(res.value);
// Before we emit the event, clear the push actions so that they can be recalculated
// by relevant code. We do this because the clear event has now changed, making it
// so that existing rules can be re-run over the applicable properties. Stuff like
// highlighting when the user's name is mentioned rely on this happening. We also want
// to set the push actions before emitting so that any notification listeners don't
// pick up the wrong contents.
this.setPushActions(null);
//this.emit("Event.decrypted", this, err);
} else {
this._setClearData(this._badEncryptedMessage(res.reason.message));
}
},
_badEncryptedMessage: function(reason) {
return {
clearEvent: {
......
......@@ -1151,10 +1151,10 @@ SyncApi.prototype._processSyncResponse = async function(
// Handle joins
await utils.promiseMapSeries(joinRooms, async function(joinObj) {
const room = joinObj.room;
const stateEvents = self._mapSyncEventsFormat(joinObj.state, room);
const timelineEvents = self._mapSyncEventsFormat(joinObj.timeline, room);
const ephemeralEvents = self._mapSyncEventsFormat(joinObj.ephemeral);
const accountDataEvents = self._mapSyncEventsFormat(joinObj.account_data);
const stateEvents = await self._mapSyncEventsFormat(joinObj.state, room, true);
const timelineEvents = await self._mapSyncEventsFormat(joinObj.timeline, room, true);
const ephemeralEvents = await self._mapSyncEventsFormat(joinObj.ephemeral, null, true);
const accountDataEvents = await self._mapSyncEventsFormat(joinObj.account_data, null, true);
// we do this first so it's correct when any of the events fire
if (joinObj.unread_notifications) {
......@@ -1502,17 +1502,37 @@ SyncApi.prototype._mapSyncResponseToRoomArray = function(obj) {
* @param {Room} room
* @return {MatrixEvent[]}
*/
SyncApi.prototype._mapSyncEventsFormat = function(obj, room) {
SyncApi.prototype._mapSyncEventsFormat = function(obj, room, async) {
if (!obj || !utils.isArray(obj.events)) {
return [];
}
const mapper = this.client.getEventMapper();
return obj.events.map(function(e) {
const mappedEvents = obj.events.map(function(e) {
if (room) {
e.room_id = room.roomId;
}
return mapper(e);
return mapper(e, {decrypt: false});
});
const encryptedEvents = mappedEvents.filter(e => e.isEncrypted());
/*const decryptionPromises = encryptedEvents.map(e => e.attemptDecryption(this.opts.crypto, false, false));
let allDecryptionsPromise = Promise.resolve();
if (decryptionPromises.length > 0) {
allDecryptionsPromise = Promise.all(decryptionPromises).then(() => {
this.client.emit("Events.decrypted", encryptedEvents);
});
}*/
//if (async) return allDecryptionsPromise.then(() => {
if (async) return this.opts.crypto.decryptEvents(encryptedEvents).then(() => {
return mappedEvents;
});
return mappedEvents;
};
/**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment