neo4jmapper src/transaction.js

    Source

    // # Transaction
    // Can be used to send one or more cypher statements as a single transaction
    // see: [http://docs.neo4j.org/chunked/preview/rest-api-transactional.html](http://docs.neo4j.org/chunked/preview/rest-api-transactional.html)
    
    var __initTransaction__ = function(neo4jrestful) {
    
      // A single cypher statement that belongs to a transaction.
      // Holds the query text, its parameters and the bookkeeping fields
      // (status, position, results, errors) declared on the prototype below.
      var Statement = function Statement(transaction, cypher, parameters) {
        var self = this;
        self._transaction_ = transaction;
        self.statement = cypher;
        self.parameters = parameters;
      };

      // Prototype defaults, so every field exists before it is assigned.
      (function(proto) {
        proto._transaction_ = null;
        proto.statement = '';
        proto.parameters = null;
        proto.status = null; // 'sending', 'sended'
        proto.position = null;
        proto.results = null;
        proto.errors = null;
      })(Statement.prototype);

      // Returns a plain-object snapshot of this statement.
      // Note that `parameters` is emitted as a JSON string, not as an object.
      Statement.prototype.toObject = function() {
        var plain = {};
        plain.statement = this.statement;
        plain.parameters = JSON.stringify(this.parameters);
        plain.status = this.status;
        plain.position = this.position;
        plain.errors = this.errors;
        plain.results = this.results;
        return plain;
      };
    
      // Transaction constructor.
      // Grabs the shared neo4jrestful client via `singleton()` and immediately
      // calls `begin()` with the given arguments, so passing a callback here
      // forwards it to `begin()`.
      var Transaction = function Transaction(cypher, parameters, cb) {
        this.neo4jrestful = neo4jrestful.singleton();
        this.begin(cypher, parameters, cb);
      };

      // expose the Statement constructor
      Transaction.Statement = Statement;

      // Prototype defaults for all instance fields
      Transaction.prototype.statements = null; // queue of Statement objects
      Transaction.prototype._response_ = null; // last raw response passed to the post callback
      Transaction.prototype.neo4jrestful = null;
      // new|creating|adding|open|committing|committed
      // (on a failed request the status is overwritten with the error or its status)
      Transaction.prototype.status = '';
      Transaction.prototype.id = null;
      Transaction.prototype.uri = null;
      Transaction.prototype.expires = null;
      Transaction.prototype.results = null;
      // declared for consistency: begin() resets `errors` like `results`
      Transaction.prototype.errors = null;
      Transaction.prototype._concurrentTransmissions_ = 0; // requests currently in flight
      Transaction.prototype._responseError_ = null; //will contain response Error
      Transaction.prototype._resortResults_ = true; // replace `{ row: … }` entries with the row itself
    
      // (Re)starts the transaction: clears the statement queue, results, errors
      // and id, sets the status to 'new' and passes the arguments on to `add()`.
      // Returns whatever `add()` returns.
      Transaction.prototype.begin = function(cypher, parameters, cb) {
        var tx = this;
        // wipe everything collected so far
        tx.statements = [];
        tx.results = [];
        tx.errors = [];
        tx.id = null;
        tx.status = 'new';
        var added = tx.add(cypher, parameters, cb);
        return added;
      };
    
      // Queues one or more statements on this transaction.
      //
      // Arguments are normalized by `Transaction._sortTransactionArguments`, so
      // the callback may be passed in the `parameters` slot.
      // * without a callback the statements are only collected; returns `this`
      // * with a callback the queue is sent right away via `exec()`
      // * on a committed transaction the error goes to the callback if there
      //   is one, otherwise it is thrown
      Transaction.prototype.add = function(cypher, parameters, cb) {
        var args = Transaction._sortTransactionArguments(cypher, parameters, cb);
        // refuse new statements once the transaction is committed
        if (this.status === 'committed') {
          var err = Error("You can't add statements after transaction is committed");
          if (typeof args.cb === 'function') {
            // use the normalized callback: the raw `cb` argument is undefined
            // when the callback was handed over as second argument
            args.cb(err, null);
          } else {
            throw err;
          }
          return this;
        }
        this.addStatementsToQueue(args.statements);
        if (!args.cb) {
          // no callback yet: keep collecting statements
          return this;
        }
        return this.exec(args.cb);
      };
    
      // Sends all statements that have not been transmitted yet.
      //
      // The target url depends on the current state:
      // * status 'committing'  -> commit url (with or without transaction id)
      // * no id yet            -> begins a new transaction
      // * status 'open' or 'adding' (an earlier request still in flight)
      //                        -> adds to the existing transaction
      // * status 'committed'   -> `cb` receives an error, nothing is sent
      // * anything else        -> throws
      //
      // `cb(err, transaction, debug)` is stored as `onResponse` and called once
      // no request is in flight and no statement is left to send.
      // Does nothing if `cb` is not a function. Returns `this`.
      Transaction.prototype.exec = function(cb) {
        var self = this;
        // stop here if there is no callback attached
        if (typeof cb !== 'function') {
          return this;
        }
        self.onResponse = cb;

        var url = '';
        var untransmittedStatements = this.untransmittedStatements();

        if (this.status === 'committing') {
          // commit transaction
          url = (this.id) ? '/transaction/'+this.id+'/commit' : '/transaction/commit';
        } else if (!this.id) {
          // begin a transaction
          this.status = 'creating';
          url = '/transaction';
        } else if ((this.status === 'open') || (this.status === 'adding')) {
          // add to transaction
          this.status = 'adding';
          url = '/transaction/'+this.id;
        } else if (this.status === 'committed') {
          // compare (not assign) the status and do not send anything
          cb(Error('Transaction is committed. Create a new transaction instead.'), null, null);
          return this;
        } else {
          throw Error('Transaction has a unknown status. Possible are: creating|open|committing|committed');
        }

        var statements = [];
        untransmittedStatements.forEach(function(statement) {
          // flag the statement object itself; its index in this filtered list
          // is not its position in `self.statements`
          statement.status = 'sending';
          statements.push({ statement: statement.statement, parameters: statement.parameters });
        });
        this._concurrentTransmissions_++;
        this.neo4jrestful.post(url, { data: { statements: statements } }, function(err, response, debug) {
          self._response_ = response;
          self._concurrentTransmissions_--;
          self._applyResponse(err, response, debug, untransmittedStatements);

          untransmittedStatements.forEach(function(statement) {
            statement.status = 'sended';
          });

          // statements that were queued while this request was in flight
          var pendingStatements = self.untransmittedStatements();

          if ((pendingStatements.length > 0) && (!err)) {
            // re call exec() until all statements are transmitted
            // TODO: set a limit to avoid endless loop
            return self.exec(cb);
          }
          // TODO: sort and populate resultset, but currently no good way to detect result objects
          if ((self._concurrentTransmissions_ === 0) && (typeof self.onResponse === 'function')) {
            // named differently from `cb` on purpose: a `var cb` in here would
            // be hoisted and shadow the callback used for the re-exec above
            var finalCallback = self.onResponse;
            // release onResponse for (optional) next cb
            self.onResponse = null;
            if (self.status === 'committing')
              self.status = 'committed';
            // call final callback
            finalCallback(self._responseError_, self, debug);
            return self;
          }
        });

        return this;
      };
    
      // Appends statements to the internal queue.
      // `statements` is expected to be a non-empty array of
      // `{ statement, parameters }` objects; anything else is ignored.
      // Entries without a `statement` value are skipped. Each queued Statement
      // gets its index in the queue as `position`. Returns `this`.
      Transaction.prototype.addStatementsToQueue = function(statements) {
        var tx = this;
        var isFilledArray = Boolean(statements) && (statements.constructor === Array) && (statements.length > 0);
        if (!isFilledArray) {
          return tx;
        }
        // attach all statements
        statements.forEach(function(entry) {
          if (!entry.statement) {
            return;
          }
          var queued = new Statement(tx, entry.statement, entry.parameters);
          queued.position = tx.statements.length;
          tx.statements.push(queued);
        });
        return tx;
      };
    
      // Merges the outcome of one request into the transaction.
      //
      // err                      error reported by the request (if any)
      // response                 response body; may be missing on failure
      // debug                    not used here
      // untransmittedStatements  the statements that were sent with the request;
      //                          errors/results are matched to them by index
      //
      // Side effects: updates `status`, collects `errors` and `results`,
      // accumulates request errors in `_responseError_` (always an array) and
      // keeps `Transaction.__open_transactions__` in sync.
      Transaction.prototype._applyResponse = function(err, response, debug, untransmittedStatements) {
        var self = this;
        if (self.status !== 'committing')
          self.status = 'open';
        // if error on request/response
        if (err) {
          self.status = err.status || err;
        }
        // a failed request may come without a body, so never read from it directly
        var responseErrors = (response && response.errors) || [];
        var responseResults = (response && response.results) || [];
        untransmittedStatements.forEach(function(statement, i) {
          if (responseErrors[i]) {
            statement.error = responseErrors[i];
            self.errors.push(responseErrors[i]);
          }
          var result = responseResults[i];
          if (result) {
            if ((self._resortResults_) && (Array.isArray(result.data))) {
              // move row property one level above
              // { rows: [ {}, {} ]} -> { [ {}, {} ]}
              result.data.forEach(function(data, j) {
                if ((data) && (data.row)) {
                  result.data[j] = data.row;
                }
              });
            }
            self.results.push(result);
          }
        });
        if ((err)||(!response)) {
          // push() returns the new length, so its result must never be assigned
          // back to the error list
          if (!Array.isArray(self._responseError_))
            self._responseError_ = [];
          self._responseError_.push(err || Error('No response received'));
        } else {
          self.populateWithDataFromResponse(response);
          // keep track of open transactions
          if (self.status === 'open')
            Transaction.__open_transactions__[self.id] = self;
          else
            delete Transaction.__open_transactions__[self.id];
        }
      };
    
      // Returns a plain-object snapshot of the transaction
      // (id, status, expires, uri) including a `toObject()` snapshot of every
      // queued statement.
      Transaction.prototype.toObject = function() {
        var serialized = this.statements.reduce(function(list, stat) {
          list.push(stat.toObject());
          return list;
        }, []);
        var plain = {};
        plain.id = this.id;
        plain.status = this.status;
        plain.statements = serialized;
        plain.expires = this.expires;
        plain.uri = this.uri;
        return plain;
      };
    
      // Copies transaction metadata from a response body onto this instance:
      // * `data.transaction.expires` -> `this.expires` (as Date)
      // * `data.commit`              -> `this.id` and `this.uri`, parsed from
      //                                 the `…/transaction/<id>/commit` url
      // A missing body or a commit value that does not match that pattern
      // leaves the instance untouched.
      Transaction.prototype.populateWithDataFromResponse = function(data) {
        if (!data) {
          return;
        }
        if ((data.transaction) && (data.transaction.expires))
          this.expires = new Date(data.transaction.expires);
        // exists only on POST a new transaction
        if (typeof data.commit === 'string') {
          var match = data.commit.match(/^(.+?\/transaction\/(\d+))\/commit$/);
          // match() yields null for an unexpected url
          if (match) {
            this.id = Number(match[2]);
            this.uri = match[1];
          }
        }
      };
    
      // Returns a new array with all queued statements that have no status yet,
      // i.e. that are neither being sent nor already sent.
      Transaction.prototype.untransmittedStatements = function() {
        return this.statements.filter(function(candidate) {
          return Boolean(candidate) && !candidate.status;
        });
      };
    
      // Commits the transaction, optionally queueing final statement(s) first.
      // Accepts either just a callback or the same arguments as `add()`.
      // A callback is mandatory; an Error is thrown if none is given.
      // Sets the status to 'committing' and returns the result of `exec()`.
      Transaction.prototype.commit = function(cypher, parameters, cb) {
        var callback = cypher;
        if (typeof cypher !== 'function') {
          // statement(s) were handed over as well: queue them first
          var sorted = Transaction._sortTransactionArguments(cypher, parameters, cb);
          this.addStatementsToQueue(sorted.statements);
          callback = sorted.cb;
        }
        if (typeof callback !== 'function') {
          throw Error('You need to attach a callback an a commit/close operation');
        }
        this.onResponse = callback;
        this.status = 'committing';
        return this.exec(callback);
      };

      // `close` is an alias for `commit`
      Transaction.prototype.close = Transaction.prototype.commit;
    
      // Factory: builds a Transaction from statement(s) and an optional callback.
      // Note: `Transaction.create` is assigned again further down in this file
      // (as an alias of `Transaction.begin`).
      Transaction.create = function(cypher, parameters, cb) {
        var created = new Transaction(cypher, parameters, cb);
        return created;
      };

      // Factory without a callback: the constructor receives only the
      // statement(s) and parameters.
      Transaction.new = function(cypher, parameters) {
        var fresh = new Transaction(cypher, parameters);
        return fresh;
      };

      // callback invoked by exec() once all transmissions are done
      Transaction.prototype.onResponse = null;
    
      // Normalizes the flexible `(cypher, parameters, cb)` arguments.
      //
      // `cypher` may be
      // * a query object offering `toQuery()` or `statementsToString()`
      //   (its own `parameters()` are used)
      // * a cypher string (combined with `parameters`)
      // * an array of `{ statement, parameters }` objects
      // * a single `{ statement, parameters }` object
      // The callback may be passed as second or as third argument.
      //
      // Returns `{ statements: Array|null, cb: Function|null }`; `statements`
      // is null if `cypher` is none of the supported kinds.
      Transaction._sortTransactionArguments = function(cypher, parameters, cb) {
        var statements = null;
        // a function in the `parameters` slot is always the callback
        if (typeof parameters === 'function') {
          cb = parameters;
          parameters = {};
        }
        // we might have a Graph or CypherQuery Object
        // (`typeof null` is 'object' as well, so check for a value first)
        if ((cypher) && (typeof cypher === 'object') && ((typeof cypher.toQuery === 'function') || (typeof cypher.statementsToString === 'function'))) {
          if (cypher.toQuery) {
            statements = [ { statement: cypher.toQuery().statementsToString(), parameters: cypher.parameters() }];
          } else {
            statements = [ { statement: cypher.statementsToString(), parameters: cypher.parameters() }];
          }
        } else if (typeof cypher === 'string') {
          statements = [ { statement: cypher, parameters: parameters || {} } ];
        } else if ((cypher) && (cypher.constructor === Array)) {
          statements = cypher;
        } else if ((cypher) && (cypher.statement)) {
          statements = [ cypher ];
        }
        return {
          statements: statements,
          cb: cb || null
        };
      };
    
      // Rolls the transaction back by sending a DELETE for `/transaction/<id>`.
      //
      // `cb(err, res, debug)` is optional. It receives an error right away if
      // the transaction has no id yet or is already committed, because there
      // is nothing left to roll back. On success the transaction is removed
      // from `Transaction.__open_transactions__`. Returns `this`.
      Transaction.prototype.rollback = function(cb) {
        var self = this;
        // tolerate a missing callback instead of failing with a TypeError
        var done = (typeof cb === 'function') ? cb : function() {};
        // 'finalized' is kept for backward compatibility; 'committed' is the
        // status exec() assigns after a commit
        var isFinished = (this.status === 'finalized') || (this.status === 'committed');
        if ((this.id) && (!isFinished)) {
          this.neo4jrestful.delete('/transaction/'+this.id, function(err, res, debug) {
            // remove from open_transactions
            if (!err)
              delete Transaction.__open_transactions__[self.id];
            done(err, res, debug);
          });
        } else {
          done(Error('You can only perform a rollback on an open transaction.'), null);
        }
        return this;
      };

      // aliases for rollback
      Transaction.prototype.undo   = Transaction.prototype.rollback;
      Transaction.prototype.delete = Transaction.prototype.rollback;
    
      // Static shortcut: constructs a new Transaction from the given
      // statement(s), parameters and optional callback.
      Transaction.begin = function(cypher, parameters, cb) {
        var started = new Transaction(cypher, parameters, cb);
        return started;
      };

      // `create` and `open` are aliases for `begin`
      Transaction.create = Transaction.begin;
      Transaction.open = Transaction.begin;
    
      // Static shortcut: creates an empty transaction and commits the given
      // statement(s) on it right away.
      Transaction.commit = function(cypher, parameters, cb) {
        var tx = new Transaction();
        return tx.commit(cypher, parameters, cb);
      };
    
      // Runs `action` on every transaction in `Transaction.__open_transactions__`.
      //
      // cb      called once as `cb(errors|null, null, debugs)` after every
      //         transaction has reported back; called immediately if there is
      //         no open transaction at all
      // action  name of the method to invoke, 'commit' (default) or 'rollback'
      //
      // Returns `this`.
      Transaction.executeAllOpenTransactions = function(cb, action) {
        // action can be commit|rollback
        if (typeof action === 'undefined')
          action = 'commit';
        // work on a snapshot: the registry is modified while the
        // transactions finish
        var openTransactions = Object.keys(Transaction.__open_transactions__).map(function(id) {
          return Transaction.__open_transactions__[id];
        });
        var count = openTransactions.length;
        var errors = [];
        var debugs = [];

        if (count === 0) {
          // nothing to do, but the caller still has to hear back
          if (typeof cb === 'function')
            cb(null, null, debugs);
          return this;
        }

        var _onDone_ = function(err, res, debug) {
          count--;
          if (err)
            errors.push(err);
          debugs.push(debug);
          if ((count === 0) && (typeof cb === 'function'))
            cb( ((errors.length > 0) ? errors : null), null, debugs );
        };

        openTransactions.forEach(function(transaction) {
          transaction[action](_onDone_);
        });

        return this;
      };
    
      // Commits every open transaction; `cb` is passed on to
      // `executeAllOpenTransactions`. Refers to `Transaction` directly so it
      // also works when the function is passed around without its receiver.
      Transaction.commitAll   = function(cb) {
        return Transaction.executeAllOpenTransactions(cb, 'commit');
      };
      Transaction.closeAll    = Transaction.commitAll;

      // Rolls back every open transaction; returns the result of
      // `executeAllOpenTransactions`, just like `commitAll`.
      Transaction.rollbackAll = function(cb) {
        return Transaction.executeAllOpenTransactions(cb, 'rollback');
      };

      Transaction.deleteAll   = Transaction.rollbackAll;
      Transaction.undoAll     = Transaction.rollbackAll;

      // all open transactions
      Transaction.__open_transactions__ = {};
    
      return neo4jrestful.Transaction = Transaction;
    
    }
    
    if (typeof window !== 'object') {
      module.exports = exports = {
        init: __initTransaction__
      };
    } else {
      window.Neo4jMapper.initTransaction = __initTransaction__;
    }