diff options
Diffstat (limited to 's/writeback_listener.cpp')
-rw-r--r-- | s/writeback_listener.cpp | 254 |
1 files changed, 254 insertions, 0 deletions
diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp new file mode 100644 index 0000000..21d59d0 --- /dev/null +++ b/s/writeback_listener.cpp @@ -0,0 +1,254 @@ +// @file writeback_listener.cpp + +/** +* Copyright (C) 2010 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" + +#include "../util/timer.h" + +#include "config.h" +#include "grid.h" +#include "request.h" +#include "server.h" +#include "shard.h" +#include "util.h" +#include "client.h" + +#include "writeback_listener.h" + +namespace mongo { + + map<string,WriteBackListener*> WriteBackListener::_cache; + set<string> WriteBackListener::_seenSets; + mongo::mutex WriteBackListener::_cacheLock("WriteBackListener"); + + map<ConnectionId,WriteBackListener::WBStatus> WriteBackListener::_seenWritebacks; + mongo::mutex WriteBackListener::_seenWritebacksLock("WriteBackListener::seen"); + + WriteBackListener::WriteBackListener( const string& addr ) : _addr( addr ) { + log() << "creating WriteBackListener for: " << addr << endl; + } + + /* static */ + void WriteBackListener::init( DBClientBase& conn ) { + + if ( conn.type() == ConnectionString::SYNC ) { + // don't want write back listeners for config servers + return; + } + + if ( conn.type() != ConnectionString::SET ) { + init( conn.getServerAddress() ); + return; + } + + + { + scoped_lock lk( _cacheLock ); + if ( _seenSets.count( conn.getServerAddress() ) ) + return; + } + + // we want to do writebacks on all rs nodes + string errmsg; + ConnectionString cs = ConnectionString::parse( conn.getServerAddress() , errmsg ); + uassert( 13641 , str::stream() << "can't parse host [" << conn.getServerAddress() << "]" , cs.isValid() ); + + vector<HostAndPort> hosts = cs.getServers(); + + for ( unsigned i=0; i<hosts.size(); i++ ) + init( hosts[i].toString() ); + + } + + /* static */ + void WriteBackListener::init( const string& host ) { + scoped_lock lk( _cacheLock ); + WriteBackListener*& l = _cache[host]; + if ( l ) + return; + l = new WriteBackListener( host ); + l->go(); + } + + /* static */ + BSONObj WriteBackListener::waitFor( ConnectionId connectionId, const OID& oid ) { + Timer t; + for ( int i=0; i<5000; i++ ) { + { + scoped_lock lk( _seenWritebacksLock ); + WBStatus s = _seenWritebacks[connectionId]; + if ( oid < s.id ) { + // this means we're waiting for a GLE that already passed. + // it should be impossible becauseonce we call GLE, no other + // writebacks should happen with that connection id + msgasserted( 13633 , str::stream() << "got writeback waitfor for older id " << + " oid: " << oid << " s.id: " << s.id << " connectionId: " << connectionId ); + } + else if ( oid == s.id ) { + return s.gle; + } + + } + sleepmillis( 10 ); + } + uasserted( 13403 , str::stream() << "didn't get writeback for: " << oid << " after: " << t.millis() << " ms" ); + throw 1; // never gets here + } + + void WriteBackListener::run() { + int secsToSleep = 0; + while ( ! inShutdown() ) { + + if ( ! Shard::isAShardNode( _addr ) ) { + log(1) << _addr << " is not a shard node" << endl; + sleepsecs( 60 ); + continue; + } + + try { + ScopedDbConnection conn( _addr ); + + BSONObj result; + + { + BSONObjBuilder cmd; + cmd.appendOID( "writebacklisten" , &serverID ); // Command will block for data + if ( ! conn->runCommand( "admin" , cmd.obj() , result ) ) { + log() << "writebacklisten command failed! " << result << endl; + conn.done(); + continue; + } + + } + + log(1) << "writebacklisten result: " << result << endl; + + BSONObj data = result.getObjectField( "data" ); + if ( data.getBoolField( "writeBack" ) ) { + string ns = data["ns"].valuestrsafe(); + + ConnectionId cid = 0; + OID wid; + if ( data["connectionId"].isNumber() && data["id"].type() == jstOID ) { + cid = data["connectionId"].numberLong(); + wid = data["id"].OID(); + } + else { + warning() << "mongos/mongod version mismatch (1.7.5 is the split)" << endl; + } + + int len; // not used, but needed for next call + Message m( (void*)data["msg"].binData( len ) , false ); + massert( 10427 , "invalid writeback message" , m.header()->valid() ); + + DBConfigPtr db = grid.getDBConfig( ns ); + ShardChunkVersion needVersion( data["version"] ); + + log(1) << "connectionId: " << cid << " writebackId: " << wid << " needVersion : " << needVersion.toString() + << " mine : " << db->getChunkManager( ns )->getVersion().toString() << endl;// TODO change to log(3) + + if ( logLevel ) log(1) << debugString( m ) << endl; + + if ( needVersion.isSet() && needVersion <= db->getChunkManager( ns )->getVersion() ) { + // this means when the write went originally, the version was old + // if we're here, it means we've already updated the config, so don't need to do again + //db->getChunkManager( ns , true ); // SERVER-1349 + } + else { + // we received a writeback object that was sent to a previous version of a shard + // the actual shard may not have the object the writeback operation is for + // we need to reload the chunk manager and get the new shard versions + db->getChunkManager( ns , true ); + } + + // do request and then call getLastError + // we have to call getLastError so we can return the right fields to the user if they decide to call getLastError + + BSONObj gle; + try { + + Request r( m , 0 ); + r.init(); + + ClientInfo * ci = r.getClientInfo(); + ci->noAutoSplit(); + + r.process(); + + ci->newRequest(); // this so we flip prev and cur shards + + BSONObjBuilder b; + if ( ! ci->getLastError( BSON( "getLastError" << 1 ) , b , true ) ) { + b.appendBool( "commandFailed" , true ); + } + gle = b.obj(); + + ci->clearSinceLastGetError(); + } + catch ( DBException& e ) { + error() << "error processing writeback: " << e << endl; + BSONObjBuilder b; + b.append( "err" , e.toString() ); + e.getInfo().append( b ); + gle = b.obj(); + } + + { + scoped_lock lk( _seenWritebacksLock ); + WBStatus& s = _seenWritebacks[cid]; + s.id = wid; + s.gle = gle; + } + } + else if ( result["noop"].trueValue() ) { + // no-op + } + else { + log() << "unknown writeBack result: " << result << endl; + } + + conn.done(); + secsToSleep = 0; + continue; + } + catch ( std::exception e ) { + + if ( inShutdown() ) { + // we're shutting down, so just clean up + return; + } + + log() << "WriteBackListener exception : " << e.what() << endl; + + // It's possible this shard was removed + Shard::reloadShardInfo(); + } + catch ( ... ) { + log() << "WriteBackListener uncaught exception!" << endl; + } + secsToSleep++; + sleepsecs(secsToSleep); + if ( secsToSleep > 10 ) + secsToSleep = 0; + } + + log() << "WriteBackListener exiting : address no longer in cluster " << _addr; + + } + +} // namespace mongo |