var imports = require('soop').imports(); var EventEmitter= imports.EventEmitter || require('events').EventEmitter; /* * Emits * 'networkChange' * when network layout has change (new/lost peers, etc) * * 'data' * when an unknown data type arrives * * Provides * send(toPeerIds, {data}, cb?) * */ function Network(opts) { var self = this; opts = opts || {}; this.peerId = opts.peerId; this.apiKey = opts.apiKey || 'lwjd5qra8257b9'; this.debug = opts.debug || 3; this.maxPeers = opts.maxPeers || 10; this.opts = { key: opts.key }; // For using your own peerJs server ['port', 'host', 'path', 'debug'].forEach(function(k) { if (opts[k]) self.opts[k]=opts[k]; }); this.connectedPeers = []; this.started = false; } Network.parent=EventEmitter; // Array helpers Network._arrayDiff = function(a, b) { var seen = []; var diff = []; for (var i = 0; i < b.length; i++) seen[b[i]] = true; for (var j = 0; j < a.length; j++) if (!seen[a[j]]) diff.push(a[j]); return diff; }; Network._inArray = function(el, array) { return array.indexOf(el) > -1; }; Network._arrayPushOnce = function(el, array) { var ret = false; if (!Network._inArray(el, array)) { array.push(el); ret = true; } return ret; }; Network._arrayRemove = function(el, array) { var pos = array.indexOf(el); if (pos >= 0) array.splice(pos, 1); return array; }; Network.prototype._onClose = function(peerId) { this.connectedPeers = Network._arrayRemove(peerId, this.connectedPeers); console.log('on close peers:'+this.connectedPeers); this._notifyNetworkChange(); }; Network.prototype._connectToPeers = function(peerIds) { var self = this; var ret = false; var arrayDiff1= Network._arrayDiff(peerIds, this.connectedPeers); var arrayDiff = Network._arrayDiff(arrayDiff1, [this.peerId]); arrayDiff.forEach(function(peerId) { console.log('### CONNECTING TO:', peerId); self.connectTo(peerId); ret = true; }); return ret; }; Network.prototype._onData = function(data, isInbound) { var obj; try { obj = JSON.parse(data); } catch (e) { console.log('### ERROR ON DATA: "%s" ', data, isInbound, e); return; }; console.log('### RECEIVED TYPE: %s FROM %s', obj.data.type, obj.sender, obj.data); switch(obj.data.type) { case 'peerList': this._connectToPeers(obj.data.peers); this._notifyNetworkChange(); break; case 'disconnect': this._onClose(obj.sender); break; case 'walletId': this.emit('walletId', obj.data); break; default: this.emit('data', obj.sender, obj.data, isInbound); } }; Network.prototype._sendPeers = function(peerIds) { console.log('#### SENDING PEER LIST: ', this.connectedPeers, ' TO ', peerIds?peerIds: 'ALL'); this.send(peerIds, { type: 'peerList', peers: this.connectedPeers, }); }; Network.prototype._addPeer = function(peerId, isInbound) { var hasChanged = Network._arrayPushOnce(peerId, this.connectedPeers); if (isInbound && hasChanged) { this._sendPeers(); //broadcast peer list } else { if (isInbound) { this._sendPeers(peerId); } } }; Network.prototype._checkAnyPeer = function() { if (!this.connectedPeers.length) { console.log('EMIT openError: no more peers, not even you!'); this.emit('openError'); } } Network.prototype._setupConnectionHandlers = function(dataConn, isInbound) { var self=this; dataConn.on('open', function() { if (!Network._inArray(dataConn.peer, self.connectedPeers)) { console.log('### DATA CONNECTION READY TO: ADDING PEER: %s (inbound: %s)', dataConn.peer, isInbound); self._addPeer(dataConn.peer, isInbound); self._notifyNetworkChange( isInbound ? dataConn.peer : null); this.emit('open'); } }); dataConn.on('data', function(data) { self._onData(data, isInbound); }); dataConn.on('error', function(e) { console.log('### DATA ERROR',e ); //TODO self.emit('dataError'); }); dataConn.on('close', function() { if (self.closing) return; console.log('### CLOSE RECV FROM:', dataConn.peer); self._onClose(dataConn.peer); self._checkAnyPeer(); }); }; Network.prototype._notifyNetworkChange = function(newPeer) { console.log('[WebRTC.js.164:_notifyNetworkChange:]', newPeer); //TODO this.emit('networkChange', newPeer); }; Network.prototype._setupPeerHandlers = function(openCallback) { var self=this; var p = this.peer; p.on('open', function(peerId) { console.log('setup peer handlers open'+peerId); self.peerId = peerId; self.connectedPeers = [peerId]; return openCallback(peerId); }); p.on('error', function(err) { console.log('### PEER ERROR:', err); //self.disconnect(null, true); // force disconnect self._checkAnyPeer(); }); p.on('connection', function(dataConn) { console.log('### NEW INBOUND CONNECTION %d/%d', self.connectedPeers.length, self.maxPeers); if (self.connectedPeers.length >= self.maxPeers) { console.log('### PEER REJECTED. PEER MAX LIMIT REACHED'); dataConn.on('open', function() { console.log('### CLOSING CONN FROM:' + dataConn.peer); dataConn.close(); }); } else { self._setupConnectionHandlers(dataConn, true); } }); }; Network.prototype.start = function(openCallback, opts) { console.log('start start'); opts = opts || {}; // Start PeerJS Peer var self = this; if (this.started) { // network already started, restarting network layer console.log('Restarting network layer'); opts.connectedPeers = this.connectedPeers; Network._arrayRemove(this.peerId, opts.connectedPeers); this.disconnect(function() { console.log('restart disconnect finished'); self.start(openCallback, opts); }, true); // fast disconnect return; } opts = opts || {}; opts.connectedPeers = opts.connectedPeers || []; this.peerId = this.peerId || opts.peerId; console.log('setting up fresh network with id'+this.peerId); this.peer = new Peer(this.peerId, this.opts); this._setupPeerHandlers(openCallback); console.log('connected peers'+opts.connectedPeers); for (var i = 0; i