Source: sync/chrono-sync2013.js

/**
 * This class represents the digest tree for chrono-sync2013.
 * Copyright (C) 2014-2016 Regents of the University of California.
 * @author: Zhehao Wang, based on Jeff T.'s implementation in ndn-cpp
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 * A copy of the GNU Lesser General Public License is in the file COPYING.
 */

/** @ignore */
var DigestTree = require('./digest-tree.js').DigestTree; /** @ignore */
var Interest = require('../interest.js').Interest; /** @ignore */
var Data = require('../data.js').Data; /** @ignore */
var Name = require('../name.js').Name; /** @ignore */
var Blob = require('../util/blob.js').Blob; /** @ignore */
var MemoryContentCache = require('../util/memory-content-cache.js').MemoryContentCache; /** @ignore */
var SyncStateProto = require('./sync-state.js').SyncStateProto; /** @ignore */
var NdnCommon = require('../util/ndn-common.js').NdnCommon;

/**
 * ChronoSync2013 implements the NDN ChronoSync protocol as described in the
 * 2013 paper "Let's ChronoSync: Decentralized Dataset State Synchronization in
 * Named Data Networking". http://named-data.net/publications/chronosync .
 * @note The support for ChronoSync is experimental and the API is not finalized.
 * See the API docs for more detail at
 * http://named-data.net/doc/ndn-ccl-api/chrono-sync2013.html .
 *
 * Create a new ChronoSync2013 to communicate using the given face. Initialize
 * the digest log with a digest of "00" and and empty content. Register the
 * applicationBroadcastPrefix to receive interests for sync state messages and
 * express an interest for the initial root digest "00".
 * @param {function} onReceivedSyncState When ChronoSync receives a sync state message,
 * this calls onReceivedSyncState(syncStates, isRecovery) where syncStates is the
 * list of SyncState messages and isRecovery is true if this is the initial
 * list of SyncState messages or from a recovery interest. (For example, if
 * isRecovery is true, a chat application would not want to re-display all
 * the associated chat messages.) The callback should send interests to fetch
 * the application data for the sequence numbers in the sync state.
 * NOTE: The library will log any exceptions thrown by this callback, but for
 * better error handling the callback should catch and properly handle any
 * exceptions.
 * @param {function} onInitialized This calls onInitialized() when the first sync data
 * is received (or the interest times out because there are no other
 * publishers yet).
 * NOTE: The library will log any exceptions thrown by this callback, but for
 * better error handling the callback should catch and properly handle any
 * exceptions.
 * @param {Name} applicationDataPrefix The prefix used by this application instance
 * for application data. For example, "/my/local/prefix/ndnchat4/0K4wChff2v".
 * This is used when sending a sync message for a new sequence number.
 * In the sync message, this uses applicationDataPrefix.toUri().
 * @param {Name} applicationBroadcastPrefix The broadcast name prefix including the
 * application name. For example, "/ndn/broadcast/ChronoChat-0.3/ndnchat1".
 * This makes a copy of the name.
 * @param {int} sessionNo The session number used with the applicationDataPrefix in
 * sync state messages.
 * @param {Face} face The Face for calling registerPrefix and expressInterest. The
 * Face object must remain valid for the life of this ChronoSync2013 object.
 * @param {KeyChain} keyChain To sign a data packet containing a sync state message, this
 * calls keyChain.sign(data, certificateName).
 * @param {Name} certificateName The certificate name of the key to use for signing a
 * data packet containing a sync state message.
 * @param {Milliseconds} syncLifetime The interest lifetime in milliseconds for sending
 * sync interests.
 * @param {function} onRegisterFailed If failed to register the prefix to receive
 * interests for the applicationBroadcastPrefix, this calls
 * onRegisterFailed(applicationBroadcastPrefix).
 * NOTE: The library will log any exceptions thrown by this callback, but for
 * better error handling the callback should catch and properly handle any
 * exceptions.
 * @constructor
 */
var ChronoSync2013 = function ChronoSync2013
  (onReceivedSyncState, onInitialized, applicationDataPrefix,
   applicationBroadcastPrefix, sessionNo, face, keyChain, certificateName,
   syncLifetime, onRegisterFailed)
{
  // assigning function pointers
  this.onReceivedSyncState = onReceivedSyncState;
  this.onInitialized = onInitialized;
  this.applicationDataPrefixUri = applicationDataPrefix.toUri();
  this.applicationBroadcastPrefix = applicationBroadcastPrefix;
  this.session = sessionNo;
  this.face = face;
  this.keyChain = keyChain;
  this.certificateName = certificateName;
  this.sync_lifetime = syncLifetime;
  this.usrseq = -1;

  this.digest_tree = new DigestTree();
  this.contentCache = new MemoryContentCache(face);

  this.digest_log = new Array();
  this.digest_log.push(new ChronoSync2013.DigestLogEntry("00",[]));

  this.contentCache.registerPrefix
    (this.applicationBroadcastPrefix, onRegisterFailed,
     this.onInterest.bind(this));
  this.enabled = true;

  var interest = new Interest(this.applicationBroadcastPrefix);
  interest.getName().append("00");

  interest.setInterestLifetimeMilliseconds(1000);

  var Sync;
  try {
    // Using protobuf.min.js in the browser.
    Sync = dcodeIO.ProtoBuf.newBuilder().import(SyncStateProto).build("Sync");
  }
  catch (ex) {
    // Using protobufjs in node.
    Sync = require("protobufjs").newBuilder().import(SyncStateProto).build("Sync");
  }
  this.SyncStateMsg = Sync.SyncStateMsg;
  this.SyncState = Sync.SyncState;

  this.face.expressInterest(interest, this.onData.bind(this), this.initialTimeOut.bind(this));
};

exports.ChronoSync2013 = ChronoSync2013;

ChronoSync2013.prototype.getProducerSequenceNo = function(dataPrefix, sessionNo)
{
  var index = this.digest_tree.find(dataPrefix, sessionNo);
  if (index < 0)
    return -1;
  else
    return this.digest_tree.get(index).getSequenceNo();
};

/**
 * Increment the sequence number, create a sync message with the new sequence number,
 * and publish a data packet where the name is applicationBroadcastPrefix + root
 * digest of current digest tree. Then add the sync message to digest tree and digest
 * log which creates a new root digest. Finally, express an interest for the next sync
 * update with the name applicationBroadcastPrefix + the new root digest.
 * After this, application should publish the content for the new sequence number.
 * Get the new sequence number with getSequenceNo().
 */
ChronoSync2013.prototype.publishNextSequenceNo = function()
{
  this.usrseq ++;
  var content = [new this.SyncState({ name:this.applicationDataPrefixUri,
                                 type:'UPDATE',
                                 seqno:{
                                   seq:this.usrseq,
                                   session:this.session
                                  }
                                })];
  var content_t = new this.SyncStateMsg({ss:content});
  this.broadcastSyncState(this.digest_tree.getRoot(), content_t);

  if (!this.update(content))
    console.log("Warning: ChronoSync: update did not create a new digest log entry");

  var interest = new Interest(this.applicationBroadcastPrefix);
  interest.getName().append(this.digest_tree.getRoot());
  interest.setInterestLifetimeMilliseconds(this.sync_lifetime);

  this.face.expressInterest(interest, this.onData.bind(this), this.syncTimeout.bind(this));
};

/**
 * Get the sequence number of the latest data published by this application instance.
 * @return {int} the sequence number
 */
ChronoSync2013.prototype.getSequenceNo = function()
{
  return this.usrseq;
};

// DigestLogEntry class

ChronoSync2013.DigestLogEntry = function ChronoSync2013DisgestLogEntry(digest, data)
{
  this.digest = digest;
  this.data = data;
};

ChronoSync2013.DigestLogEntry.prototype.getDigest = function()
{
  return this.digest;
};

ChronoSync2013.DigestLogEntry.prototype.getData = function()
{
  return this.data;
};

/**
 * Unregister callbacks so that this does not respond to interests anymore.
 * If you will dispose this ChronoSync2013 object while your application is
 * still running, you should call shutdown() first.  After calling this, you
 * should not call publishNextSequenceNo() again since the behavior will be
 * undefined.
 */
ChronoSync2013.prototype.shutdown = function()
{
  this.enabled = false;
  this.contentCache.unregisterAll();
};

// SyncState class
/**
 * A SyncState holds the values of a sync state message which is passed to the
 * onReceivedSyncState callback which was given to the ChronoSyn2013
 * constructor. Note: this has the same info as the Protobuf class
 * Sync::SyncState, but we make a separate class so that we don't need the
 * Protobuf definition in the ChronoSync API.
 */
ChronoSync2013.SyncState = function ChronoSync2013SyncState(dataPrefixUri, sessionNo, sequenceNo)
{
  this.dataPrefixUri_ = dataPrefixUri;
  this.sessionNo_ = sessionNo;
  this.sequenceNo_ = sequenceNo;
};

/**
 * Get the application data prefix for this sync state message.
 * @return The application data prefix as a Name URI string.
 */
ChronoSync2013.SyncState.prototype.getDataPrefix = function()
{
  return this.dataPrefixUri_;
}

/**
 * Get the session number associated with the application data prefix for
 * this sync state message.
 * @return The session number.
 */
ChronoSync2013.SyncState.prototype.getSessionNo = function()
{
  return this.sessionNo_;
}

/**
 * Get the sequence number for this sync state message.
 * @return The sequence number.
 */
ChronoSync2013.SyncState.prototype.getSequenceNo = function()
{
  return this.sequenceNo_;
}

// Private methods for ChronoSync2013 class,
/**
 * Make a data packet with the syncMessage and with name applicationBroadcastPrefix_ + digest.
 * Sign and send.
 * @param {string} The root digest as a hex string for the data packet name.
 * @param {SyncStateMsg} The syncMessage updates the digest tree state with the given digest.
 */
ChronoSync2013.prototype.broadcastSyncState = function(digest, syncMessage)
{
  var array = new Uint8Array(syncMessage.toArrayBuffer());
  var data = new Data(this.applicationBroadcastPrefix);
  data.getName().append(digest);
  data.setContent(new Blob(array, false));
  var thisChronoSync = this;
  this.keyChain.sign(data, this.certificateName, function() {
    thisChronoSync.contentCache.add(data);
  });
};

/**
 * Update the digest tree with the messages in content. If the digest tree root is not in
 * the digest log, also add a log entry with the content.
 * @param {SyncStates[]} The sync state messages
 * @return {bool} True if added a digest log entry (because the updated digest tree root
 * was not in the log), false if didn't add a log entry.
 */
 // Whatever's received by ondata, is pushed into digest log as its data directly
ChronoSync2013.prototype.update = function(content)
{
  for (var i = 0; i < content.length; i++) {
    if (content[i].type == 0) {
      if (this.digest_tree.update(content[i].name, content[i].seqno.session, content[i].seqno.seq)) {
        if (this.applicationDataPrefixUri == content[i].name)
          this.usrseq = content[i].seqno.seq;
      }
    }
  }

  if (this.logfind(this.digest_tree.getRoot()) == -1) {
    var newlog = new ChronoSync2013.DigestLogEntry(this.digest_tree.getRoot(), content);
    this.digest_log.push(newlog);
    return true;
  }
  else
    return false;
};

ChronoSync2013.prototype.logfind = function(digest)
{
  for (var i = 0; i < this.digest_log.length; i++) {
    if(digest == this.digest_log[i].digest)
      return i;
  }
  return -1;
};

/**
 * Process the sync interest from the applicationBroadcastPrefix. If we can't
 * satisfy the interest, add it to the pending interest table in
 * this.contentCache so that a future call to contentCacheAdd may satisfy it.
 */
ChronoSync2013.prototype.onInterest = function
  (prefix, interest, face, interestFilterId, filter)
{
  if (!this.enabled)
    // Ignore callbacks after the application calls shutdown().
    return;

  //search if the digest is already exist in the digest log

  var syncdigest = interest.getName().get(this.applicationBroadcastPrefix.size()).toEscapedString();
  if (interest.getName().size() == this.applicationBroadcastPrefix.size() + 2) {
    syncdigest = interest.getName().get(this.applicationBroadcastPrefix.size() + 1).toEscapedString();
  }
  if (interest.getName().size() == this.applicationBroadcastPrefix.size() + 2 || syncdigest == "00") {
    this.processRecoveryInst(interest, syncdigest, face);
  }
  else {
    this.contentCache.storePendingInterest(interest, face);

    if (syncdigest != this.digest_tree.getRoot()) {
      var index = this.logfind(syncdigest);
      var content = [];
      if(index == -1) {
        var self = this;
        // Are we sure that using a "/local/timeout" interest is the best future call approach?
        var timeout = new Interest(new Name("/local/timeout"));
        timeout.setInterestLifetimeMilliseconds(2000);
        this.face.expressInterest
          (timeout, this.dummyOnData,
           this.judgeRecovery.bind(this, timeout, syncdigest, face));
      }
      else {
        //common interest processing
        this.processSyncInst(index, syncdigest, face);
      }
    }
  }
};

/**
 * Process sync/recovery data.
 * @param {Interest}
 * @param {Data}
 */
ChronoSync2013.prototype.onData = function(interest, co)
{
  if (!this.enabled)
    // Ignore callbacks after the application calls shutdown().
    return;

  var arr = new Uint8Array(co.getContent().size());
  arr.set(co.getContent().buf());
  var content_t = this.SyncStateMsg.decode(arr.buffer);
  var content = content_t.ss;

  var isRecovery = false;

  if (this.digest_tree.getRoot() == "00") {
    isRecovery = true;
    this.initialOndata(content);
  }
  else {
    this.update(content);
    if (interest.getName().size() == this.applicationBroadcastPrefix.size() + 2)
      // Assume this is a recovery interest.
      isRecovery = true;
    else
      isRecovery = false;
  }

  var syncStates = [];

  for (var i = 0; i < content.length; i++) {
    if (content[i].type == 0) {
      syncStates.push(new ChronoSync2013.SyncState
        (content[i].name, content[i].seqno.session, content[i].seqno.seq));
    }
  }

  // Instead of using Protobuf, use our own definition of SyncStates to pass to onReceivedSyncState.
  try {
    this.onReceivedSyncState(syncStates, isRecovery);
  } catch (ex) {
    console.log("Error in onReceivedSyncState: " + NdnCommon.getErrorWithStackTrace(ex));
  }

  var n = new Name(this.applicationBroadcastPrefix);
  n.append(this.digest_tree.getRoot());

  var interest = new Interest(n);
  interest.setInterestLifetimeMilliseconds(this.sync_lifetime);

  this.face.expressInterest(interest, this.onData.bind(this), this.syncTimeout.bind(this));
};

/**
 * Interest variable not actually in use here
 */
ChronoSync2013.prototype.initialTimeOut = function(interest)
{
  if (!this.enabled)
    // Ignore callbacks after the application calls shutdown().
    return;

  console.log("no other people");

  this.usrseq++;
  try {
    this.onInitialized();
  } catch (ex) {
    console.log("Error in onInitialized: " + NdnCommon.getErrorWithStackTrace(ex));
  }
  var content = [new this.SyncState({ name:this.applicationDataPrefixUri,
                                 type:'UPDATE',
                                 seqno: {
                                   seq:this.usrseq,
                                   session:this.session
                                 }
                               })];
  this.update(content);
  var n = new Name(this.applicationBroadcastPrefix);
  n.append(this.digest_tree.getRoot());
  var retryInterest = new Interest(n);
  retryInterest.setInterestLifetimeMilliseconds(this.sync_lifetime);

  this.face.expressInterest(retryInterest, this.onData.bind(this), this.syncTimeout.bind(this));
};

ChronoSync2013.prototype.processRecoveryInst = function(interest, syncdigest, face)
{
  if (this.logfind(syncdigest) != -1) {
    var content = [];

    for(var i = 0; i < this.digest_tree.digestnode.length; i++) {
      content[i] = new this.SyncState({ name:this.digest_tree.digestnode[i].getDataPrefix(),
                                   type:'UPDATE',
                                   seqno:{
                                     seq:this.digest_tree.digestnode[i].getSequenceNo(),
                                     session:this.digest_tree.digestnode[i].getSessionNo()
                                    }
                                 });
    }

    if (content.length != 0) {
      var content_t = new this.SyncStateMsg({ss:content});
      var str = new Uint8Array(content_t.toArrayBuffer());
      var co = new Data(interest.getName());
      co.setContent(new Blob(str, false));
      if (interest.getName().get(-1).toEscapedString() == "00")
        // Limit the lifetime of replies to interest for "00" since they can be different.
        co.getMetaInfo().setFreshnessPeriod(1000);

      this.keyChain.sign(co, this.certificateName, function() {
        try {
          face.putData(co);
        } catch (e) {
          console.log(e.toString());
        }
      });
    }
  }
};

/**
 * Common interest processing, using digest log to find the difference after syncdigest_t
 * @return True if sent a data packet to satisfy the interest.
 */
ChronoSync2013.prototype.processSyncInst = function(index, syncdigest_t, face)
{
  var content = [];
  var data_name = [];
  var data_seq = [];
  var data_ses = [];

  for (var j = index + 1; j < this.digest_log.length; j++) {
    var temp = this.digest_log[j].getData();
    for (var i = 0 ; i < temp.length ; i++) {
      if (temp[i].type != 0) {
        continue;
      }
      if (this.digest_tree.find(temp[i].name, temp[i].seqno.session) != -1) {
        var n = data_name.indexOf(temp[i].name);
        if (n == -1) {
          data_name.push(temp[i].name);
          data_seq.push(temp[i].seqno.seq);
          data_ses.push(temp[i].seqno.session);
        }
        else {
          data_seq[n] = temp[i].seqno.seq;
          data_ses[n] = temp[i].seqno.session;
        }
      }
    }
  }

  for(var i = 0; i < data_name.length; i++) {
    content[i] = new this.SyncState({ name:data_name[i],
                                 type:'UPDATE',
                                 seqno: {
                                   seq:data_seq[i],
                                   session:data_ses[i]
                                 }
                               });
  }
  if (content.length != 0) {
    var content_t = new this.SyncStateMsg({ss:content});
    var str = new Uint8Array(content_t.toArrayBuffer());
    var n = new Name(this.prefix)
    n.append(this.chatroom).append(syncdigest_t);

    var co = new Data(n);
    co.setContent(new Blob(str, false));
    this.keyChain.sign(co, this.certificateName, function() {
      try {
        face.putData(co);
      }
      catch (e) {
        console.log(e.toString());
      }
    });
  }
};

/**
 * Send recovery interset.
 * @param {string} syncdigest_t
 */
ChronoSync2013.prototype.sendRecovery = function(syncdigest_t)
{
  var n = new Name(this.applicationBroadcastPrefix);
  n.append("recovery").append(syncdigest_t);

  var interest = new Interest(n);

  interest.setInterestLifetimeMilliseconds(this.sync_lifetime);

  this.face.expressInterest(interest, this.onData.bind(this), this.syncTimeout.bind(this));
};

/**
 * This is called by onInterest after a timeout to check if a recovery is needed.
 * This method has an interest argument because we use it as the onTimeout for
 * Face.expressInterest.
 * @param {Interest}
 * @param {string}
 * @param {Face}
 */
ChronoSync2013.prototype.judgeRecovery = function(interest, syncdigest_t, face)
{
  //console.log("*** judgeRecovery interest " + interest.getName().toUri() + " times out. Digest: " + syncdigest_t + " ***");
  var index = this.logfind(syncdigest_t);
  if (index != -1) {
    if (syncdigest_t != this.digest_tree.root)
      this.processSyncInst(index, syncdigest_t, face);
  }
  else
    this.sendRecovery(syncdigest_t);
};

ChronoSync2013.prototype.syncTimeout = function(interest)
{
  if (!this.enabled)
    // Ignore callbacks after the application calls shutdown().
    return;

  var component = interest.getName().get(4).toEscapedString();
  if (component == this.digest_tree.root) {
    var n = new Name(interest.getName());
    var newInterest = new Interest(n);

    interest.setInterestLifetimeMilliseconds(this.sync_lifetime);
    this.face.expressInterest(newInterest, this.onData.bind(this), this.syncTimeout.bind(this));
  }
};

ChronoSync2013.prototype.initialOndata = function(content)
{
  this.update(content);

  var digest_t = this.digest_tree.getRoot();
  for (var i = 0; i < content.length; i++) {
    if (content[i].name == this.applicationDataPrefixUri && content[i].seqno.session == this.session) {
      //if the user was an old comer, after add the static log he need to increase his seqno by 1
      var content_t = [new this.SyncState({ name:this.applicationDataPrefixUri,
                                       type:'UPDATE',
                                       seqno: {
                                         seq:content[i].seqno.seq + 1,
                                         session:this.session
                                       }
                                     })];
      if (this.update(content_t)) {
        var newlog = new ChronoSync2013.DigestLogEntry(this.digest_tree.getRoot(), content_t);
        this.digest_log.push(newlog);
        try {
          this.onInitialized();
        } catch (ex) {
          console.log("Error in onInitialized: " + NdnCommon.getErrorWithStackTrace(ex));
        }
      }
    }
  }

  var content_t;
  if (this.usrseq >= 0) {
    //send the data packet with new seqno back
    content_t = new this.SyncState({ name:this.applicationDataPrefixUri,
                                   type:'UPDATE',
                                   seqno: {
                                     seq:this.usrseq,
                                     session:this.session
                                   }
                                 });
  }
  else
    content_t = new this.SyncState({ name:this.applicationDataPrefixUri,
                                   type:'UPDATE',
                                   seqno: {
                                     seq:0,
                                     session:this.session
                                   }
                                 });
  var content_tt = new this.SyncStateMsg({ss:content_t});
  this.broadcastSyncState(digest_t, content_tt);

  if (this.digest_tree.find(this.applicationDataPrefixUri, this.session) == -1) {
    //the user haven't put himself in the digest tree
    this.usrseq++;
    var content = [new this.SyncState({ name:this.applicationDataPrefixUri,
                                   type:'UPDATE',
                                   seqno: {
                                     seq:this.usrseq,
                                     session:this.session
                                   }
                                 })];
    if (this.update(content)) {
      try {
        this.onInitialized();
      } catch (ex) {
        console.log("Error in onInitialized: " + NdnCommon.getErrorWithStackTrace(ex));
      }
    }
  }
};

ChronoSync2013.prototype.dummyOnData = function(interest, data)
{
  console.log("*** dummyOnData called. ***");
};