Skip to content

Commit

Permalink
Integrate new message-rpc prototype into core messaging API (extensions)
Browse files Browse the repository at this point in the history
Refactors and improves the prototype of a faster JSON-RPC protocol initially contributed by @tsmaeder (See also eclipse-theia#10781).
The encoding approach used in the initial POC has some performance drawbacks when encoding plain JSON objects. We refactored the protocol to improve the performance for JSON objects whilst maintaining the excellent performance for encoding objects that contain binary data. 

Integrates the new message-rpc prototype into the core messaging API (replacing vscode-ws-jsonrpc).
This has major impacts on the Messaging API as we no longer expose a  `Connection` object (which was provided by vscode-ws-jsonrpc) and directly rely on a generic transport `Channel` implementation instead. 

- Introduce `Channel` as the main transport concept for messages (instead of the dedicated `Connection` from vscode-jsonrpc)
- Replace usage of  `vscode-ws-jsonrpc` with a custom binary RPC protocol.
- Remove 'vscode-ws-jsonrpc' depdency from "@theia/core/shared".
- Refactor all connection providers to use the new binary protocol.
- Ensure that the `RemoteFileSystemProvider` API uses  `Uint8Arrays` over plain number arrays. This enables direct serialization as buffers and reduces the overhead  of unnecessarily converting from and to `Uint8Arrays`.
- Refactor terminal widget and terminal backend contribution so that the widgets communicates with the underlying terminal process using the new rpc protocol.
- Rework the IPC bootstrap protocol so that it uses a binary pipe for message transmission instead of the `ipc` pipe which only supports string encoding.
- Extend the `JsonRpcProxyFactory` with an optional `RpcConnectionFactory` that enables adopter to creates proxies with a that use a custom `RpcProtocol`/`RpcConnection`.

The plugin API still uses its own RPC protocol implementation. This will be addressed in a follow-up PR. (See eclipse-theia#11093)

Contributed on behalf of STMicroelectronics.
Closes eclipse-theia#10684
Co-authored-by: Thomas Mäder <tmader@redhat.com>
  • Loading branch information
tortmayr committed May 17, 2022
1 parent 26b7891 commit df23ae2
Show file tree
Hide file tree
Showing 48 changed files with 2,018 additions and 620 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@

- [plugin] Introduce `DebugSession#workspaceFolder` [#11090](https://github.com/eclipse-theia/theia/pull/11090) - Contributed on behalf of STMicroelectronics

<a name="breaking_changes_1.26.0">[Breaking Changes:](#breaking_changes_1.26.0)</a>

- [core] Refactored the core messaging API. Replaced `vscode-ws-jsonrpc` with a custom RPC protocol that is better suited for handling binary data and enables message tunneling.
This impacts all main concepts of the messaging API. The API no longer exposes a `Connection` object and uses a generic `Channel` implementation instead.
- Replaces usage of `vscode-json-rpc`'s `Connection` with the new generic `Channel`. Affects `AbstractConnectionProvider`, `MessagingService`, `IPCConnectionProvider`, `ElectronMessagingService`
- `MessagingService`: No longer offers the `listen` and `forward` method. Use `wsChannel` instead.
- `RemoteFileSystemServer`: Use `UInt8Array` instead of plain number arrays for all arguments and return type that store binary data
- `DebugAdapter`: Replaced the debug-service internal `Channel` implementation with the newly introduced generic `Channel`.
[#11011](https://github.com/eclipse-theia/theia/pull/11011) - Contributed on behalf of STMicroelectronics.


## v1.25.0 - 4/28/2022

[1.25.0 Milestone](https://github.com/eclipse-theia/theia/milestone/35)
Expand Down
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"**/@types/node": "12"
},
"devDependencies": {
"@types/chai": "4.3.0",
"@types/chai-spies": "1.0.3",
"@types/chai-string": "^1.4.0",
"@types/jsdom": "^11.0.4",
"@types/node": "12",
Expand All @@ -20,6 +22,8 @@
"@typescript-eslint/eslint-plugin": "^4.8.1",
"@typescript-eslint/eslint-plugin-tslint": "^4.8.1",
"@typescript-eslint/parser": "^4.8.1",
"chai": "4.3.4",
"chai-spies": "1.0.0",
"chai-string": "^1.4.0",
"chalk": "4.0.0",
"concurrently": "^3.5.0",
Expand Down
1 change: 0 additions & 1 deletion packages/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ export class SomeClass {
- `react-virtualized` (from [`react-virtualized@^9.20.0`](https://www.npmjs.com/package/react-virtualized))
- `vscode-languageserver-protocol` (from [`vscode-languageserver-protocol@~3.15.3`](https://www.npmjs.com/package/vscode-languageserver-protocol))
- `vscode-uri` (from [`vscode-uri@^2.1.1`](https://www.npmjs.com/package/vscode-uri))
- `vscode-ws-jsonrpc` (from [`vscode-ws-jsonrpc@^0.2.0`](https://www.npmjs.com/package/vscode-ws-jsonrpc))
- `dompurify` (from [`dompurify@^2.2.9`](https://www.npmjs.com/package/dompurify))
- `express` (from [`express@^4.16.3`](https://www.npmjs.com/package/express))
- `lodash.debounce` (from [`lodash.debounce@^4.0.8`](https://www.npmjs.com/package/lodash.debounce))
Expand Down
4 changes: 1 addition & 3 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
"uuid": "^8.3.2",
"vscode-languageserver-protocol": "~3.15.3",
"vscode-uri": "^2.1.1",
"vscode-ws-jsonrpc": "^0.2.0",
"ws": "^7.1.2",
"yargs": "^15.3.1"
},
Expand Down Expand Up @@ -108,8 +107,7 @@
"react-dom",
"react-virtualized",
"vscode-languageserver-protocol",
"vscode-uri",
"vscode-ws-jsonrpc"
"vscode-uri"
],
"export =": [
"dompurify as DOMPurify",
Expand Down
1 change: 0 additions & 1 deletion packages/core/shared/vscode-ws-jsonrpc/index.d.ts

This file was deleted.

1 change: 0 additions & 1 deletion packages/core/shared/vscode-ws-jsonrpc/index.js

This file was deleted.

53 changes: 28 additions & 25 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
// *****************************************************************************

import { injectable, interfaces, decorate, unmanaged } from 'inversify';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common';
import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event, Channel } from '../../common';
import { Endpoint } from '../endpoint';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { io, Socket } from 'socket.io-client';
import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel';

decorate(injectable(), JsonRpcProxyFactory);
decorate(unmanaged(), JsonRpcProxyFactory, 0);
Expand All @@ -35,6 +35,8 @@ export interface WebSocketOptions {
export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebSocketOptions> {

protected readonly onSocketDidOpenEmitter: Emitter<void> = new Emitter();
// Socket that is used by the main channel
protected socket: Socket;
get onSocketDidOpen(): Event<void> {
return this.onSocketDidOpenEmitter.event;
}
Expand All @@ -48,31 +50,23 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
return container.get(WebSocketConnectionProvider).createProxy<T>(path, arg);
}

protected readonly socket: Socket;

constructor() {
super();
protected createMainChannel(): Channel {
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
const channel = new WebSocketChannel(toIWebSocket(socket));
socket.on('connect', () => {
this.fireSocketDidOpen();
});
socket.on('disconnect', reason => {
for (const channel of [...this.channels.values()]) {
channel.close(undefined, reason);
}
this.fireSocketDidClose();
});
socket.on('message', data => {
this.handleIncomingRawMessage(data);
});
channel.onClose(() => this.fireSocketDidClose());
socket.connect();
this.socket = socket;

return channel;
}

override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void {
override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise<void> {
if (this.socket.connected) {
super.openChannel(path, handler, options);
return super.openChannel(path, handler, options);
} else {
const openChannel = () => {
this.socket.off('connect', openChannel);
Expand All @@ -82,14 +76,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
}
}

protected createChannel(id: number): WebSocketChannel {
return new WebSocketChannel(id, content => {
if (this.socket.connected) {
this.socket.send(content);
}
});
}

/**
* @param path The handler to reach in the backend.
*/
Expand Down Expand Up @@ -143,3 +129,20 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
this.onSocketDidCloseEmitter.fire(undefined);
}
}

function toIWebSocket(socket: Socket): IWebSocket {
return {
close: () => {
socket.removeAllListeners('disconnect');
socket.removeAllListeners('error');
socket.removeAllListeners('message');
socket.close();
},
isConnected: () => socket.connected,
onClose: cb => socket.on('disconnect', reason => cb(reason)),
onError: cb => socket.on('error', reason => cb(reason)),
onMessage: cb => socket.on('message', data => cb(data)),
send: message => socket.emit('message', message)
};
}

2 changes: 1 addition & 1 deletion packages/core/src/browser/progress-status-bar-item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// *****************************************************************************

import { injectable, inject } from 'inversify';
import { CancellationToken } from 'vscode-ws-jsonrpc';
import { CancellationToken } from '../../shared/vscode-languageserver-protocol';
import { ProgressClient, ProgressMessage, ProgressUpdate } from '../common';
import { StatusBar, StatusBarAlignment } from './status-bar';
import { Deferred } from '../common/promise-util';
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export * from './contribution-provider';
export * from './path';
export * from './logger';
export * from './messaging';
export * from './message-rpc';
export * from './message-service';
export * from './message-service-protocol';
export * from './progress-service';
Expand Down
89 changes: 89 additions & 0 deletions packages/core/src/common/message-rpc/channel.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// *****************************************************************************
// Copyright (C) 2021 Red Hat, Inc. and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { assert, expect, spy, use } from 'chai';
import * as spies from 'chai-spies';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';
import { ChannelMultiplexer, ForwardingChannel, MessageProvider } from './channel';

use(spies);

/**
* A pipe with two channels at each end for testing.
*/
export class ChannelPipe {
readonly left: ForwardingChannel = new ForwardingChannel('left', () => this.right.onCloseEmitter.fire({ reason: 'Left channel has been closed' }), () => {
const leftWrite = new Uint8ArrayWriteBuffer();
leftWrite.onCommit(buffer => {
this.right.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer));
});
return leftWrite;
});
readonly right: ForwardingChannel = new ForwardingChannel('right', () => this.left.onCloseEmitter.fire({ reason: 'Right channel has been closed' }), () => {
const rightWrite = new Uint8ArrayWriteBuffer();
rightWrite.onCommit(buffer => {
this.left.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer));
});
return rightWrite;
});
}
describe('Message Channel', () => {
describe('Channel multiplexer', () => {
it('should forward messages to intended target channel', async () => {
const pipe = new ChannelPipe();

const leftMultiplexer = new ChannelMultiplexer(pipe.left);
const rightMultiplexer = new ChannelMultiplexer(pipe.right);
const openChannelSpy = spy(() => {
});

rightMultiplexer.onDidOpenChannel(openChannelSpy);
leftMultiplexer.onDidOpenChannel(openChannelSpy);

const leftFirst = await leftMultiplexer.open('first');
const leftSecond = await leftMultiplexer.open('second');

const rightFirst = rightMultiplexer.getOpenChannel('first');
const rightSecond = rightMultiplexer.getOpenChannel('second');

assert.isNotNull(rightFirst);
assert.isNotNull(rightSecond);

const leftSecondSpy = spy((buf: MessageProvider) => {
const message = buf().readString();
expect(message).equal('message for second');
});

leftSecond.onMessage(leftSecondSpy);

const rightFirstSpy = spy((buf: MessageProvider) => {
const message = buf().readString();
expect(message).equal('message for first');
});

rightFirst!.onMessage(rightFirstSpy);

leftFirst.getWriteBuffer().writeString('message for first').commit();
rightSecond!.getWriteBuffer().writeString('message for second').commit();

expect(leftSecondSpy).to.be.called();
expect(rightFirstSpy).to.be.called();

expect(openChannelSpy).to.be.called.exactly(4);
});
});

}
Loading

0 comments on commit df23ae2

Please sign in to comment.