summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
committerAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
commit7645618fd3914cb8a20561625913c20d49504a49 (patch)
tree8370f846f58f6d71165b7a0e2eda04648584ec76 /tools
parent68c73c3c7608b4c87f07440dc3232801720b1168 (diff)
downloadmongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz
Imported Upstream version 1.6.0
Diffstat (limited to 'tools')
-rw-r--r--tools/bridge.cpp29
-rw-r--r--tools/bsondump.cpp132
-rw-r--r--tools/dump.cpp22
-rw-r--r--tools/export.cpp22
-rw-r--r--tools/files.cpp6
-rw-r--r--tools/import.cpp133
-rw-r--r--tools/restore.cpp112
-rw-r--r--tools/sniffer.cpp183
-rw-r--r--tools/stat.cpp115
-rw-r--r--tools/tool.cpp196
-rw-r--r--tools/tool.h32
11 files changed, 745 insertions, 237 deletions
diff --git a/tools/bridge.cpp b/tools/bridge.cpp
index 5535719..b0e1530 100644
--- a/tools/bridge.cpp
+++ b/tools/bridge.cpp
@@ -16,9 +16,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "../util/message.h"
#include "../client/dbclient.h"
+#include "../db/dbmessage.h"
using namespace mongo;
using namespace std;
@@ -44,11 +45,28 @@ public:
break;
}
- int oldId = m.data->id;
- if ( m.data->operation() == dbQuery || m.data->operation() == dbMsg || m.data->operation() == dbGetMore ) {
+ int oldId = m.header()->id;
+ if ( m.operation() == dbQuery || m.operation() == dbMsg || m.operation() == dbGetMore ) {
+ bool exhaust = false;
+ if ( m.operation() == dbQuery ) {
+ DbMessage d( m );
+ QueryMessage q( d );
+ exhaust = q.queryOptions & QueryOption_Exhaust;
+ }
Message response;
dest.port().call( m, response );
mp_.reply( m, response, oldId );
+ while ( exhaust ) {
+ MsgData *header = response.header();
+ QueryResult *qr = (QueryResult *) header;
+ if ( qr->cursorId ) {
+ response.reset();
+ dest.port().recv( response );
+ mp_.reply( m, response ); // m argument is ignored anyway
+ } else {
+ exhaust = false;
+ }
+ }
} else {
dest.port().say( m, oldId );
}
@@ -74,7 +92,7 @@ auto_ptr< MyListener > listener;
#if !defined(_WIN32)
void cleanup( int sig ) {
- close( listener->socket() );
+ ListeningSockets::get()->closeAll();
for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ )
(*i)->shutdown();
::exit( 0 );
@@ -125,8 +143,7 @@ int main( int argc, char **argv ) {
check( port != 0 && !destUri.empty() );
listener.reset( new MyListener( port ) );
- listener->init();
- listener->listen();
+ listener->initAndListen();
return 0;
}
diff --git a/tools/bsondump.cpp b/tools/bsondump.cpp
new file mode 100644
index 0000000..426b60e
--- /dev/null
+++ b/tools/bsondump.cpp
@@ -0,0 +1,132 @@
+// restore.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../pch.h"
+#include "../client/dbclient.h"
+#include "../util/mmap.h"
+#include "../util/text.h"
+#include "tool.h"
+
+#include <boost/program_options.hpp>
+
+#include <fcntl.h>
+
+using namespace mongo;
+
+namespace po = boost::program_options;
+
+class BSONDump : public BSONTool {
+
+ enum OutputType { JSON , DEBUG } _type;
+
+public:
+
+ BSONDump() : BSONTool( "bsondump" ){
+ add_options()
+ ("type" , po::value<string>()->default_value("json") , "type of output: json,debug" )
+ ;
+ add_hidden_options()
+ ("file" , po::value<string>() , ".bson file" )
+ ;
+ addPositionArg( "file" , 1 );
+ _noconnection = true;
+ }
+
+ virtual void printExtraHelp(ostream& out) {
+ out << "usage: " << _name << " [options] <bson filename>" << endl;
+ }
+
+ virtual int doRun(){
+ {
+ string t = getParam( "type" );
+ if ( t == "json" )
+ _type = JSON;
+ else if ( t == "debug" )
+ _type = DEBUG;
+ else {
+ cerr << "bad type: " << t << endl;
+ return 1;
+ }
+ }
+ processFile( getParam( "file" ) );
+ return 0;
+ }
+
+ bool debug( const BSONObj& o , int depth=0){
+ string prefix = "";
+ for ( int i=0; i<depth; i++ ){
+ prefix += "\t\t\t";
+ }
+
+ int read = 4;
+
+ try {
+ cout << prefix << "--- new object ---\n";
+ cout << prefix << "\t size : " << o.objsize() << "\n";
+ BSONObjIterator i(o);
+ while ( i.more() ){
+ BSONElement e = i.next();
+ cout << prefix << "\t\t " << e.fieldName() << "\n" << prefix << "\t\t\t type:" << setw(3) << e.type() << " size: " << e.size() << endl;
+ if ( ( read + e.size() ) > o.objsize() ){
+ cout << prefix << " SIZE DOES NOT WORK" << endl;
+ return false;
+ }
+ read += e.size();
+ try {
+ e.validate();
+ if ( e.isABSONObj() ){
+ if ( ! debug( e.Obj() , depth + 1 ) )
+ return false;
+ }
+ else if ( e.type() == String && ! isValidUTF8( e.valuestr() ) ){
+ cout << prefix << "\t\t\t" << "bad utf8 String!" << endl;
+ }
+ else if ( logLevel > 0 ){
+ cout << prefix << "\t\t\t" << e << endl;
+ }
+
+ }
+ catch ( std::exception& e ){
+ cout << prefix << "\t\t\t bad value: " << e.what() << endl;
+ }
+ }
+ }
+ catch ( std::exception& e ){
+ cout << prefix << "\t" << e.what() << endl;
+ }
+ return true;
+ }
+
+ virtual void gotObject( const BSONObj& o ){
+ switch ( _type ){
+ case JSON:
+ cout << o << endl;
+ break;
+ case DEBUG:
+ debug(o);
+ break;
+ default:
+ cerr << "bad type? : " << _type << endl;
+ }
+ }
+};
+
+int main( int argc , char ** argv ) {
+ BSONDump dump;
+ return dump.main( argc , argv );
+}
diff --git a/tools/dump.cpp b/tools/dump.cpp
index 52e95ce..7bb38ca 100644
--- a/tools/dump.cpp
+++ b/tools/dump.cpp
@@ -16,7 +16,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../stdafx.h"
+#include "../pch.h"
#include "../client/dbclient.h"
#include "tool.h"
@@ -31,6 +31,7 @@ public:
Dump() : Tool( "dump" , true , "*" ){
add_options()
("out,o", po::value<string>()->default_value("dump"), "output directory")
+ ("query,q", po::value<string>() , "json query" )
;
}
@@ -39,11 +40,17 @@ public:
ofstream out;
out.open( outputFile.string().c_str() , ios_base::out | ios_base::binary );
- ASSERT_STREAM_GOOD( 10262 , "couldn't open file" , out );
+ assertStreamGood( 10262 , "couldn't open file" , out );
ProgressMeter m( conn( true ).count( coll.c_str() , BSONObj() , QueryOption_SlaveOk ) );
- auto_ptr<DBClientCursor> cursor = conn( true ).query( coll.c_str() , Query().snapshot() , 0 , 0 , 0 , QueryOption_SlaveOk | QueryOption_NoCursorTimeout );
+ Query q;
+ if ( _query.isEmpty() )
+ q.snapshot();
+ else
+ q = _query;
+
+ auto_ptr<DBClientCursor> cursor = conn( true ).query( coll.c_str() , q , 0 , 0 , 0 , QueryOption_SlaveOk | QueryOption_NoCursorTimeout );
while ( cursor->more() ) {
BSONObj obj = cursor->next();
@@ -80,8 +87,14 @@ public:
}
}
-
+
int run(){
+
+ {
+ string q = getParam("query");
+ if ( q.size() )
+ _query = fromjson( q );
+ }
path root( getParam("out") );
string db = _db;
@@ -113,6 +126,7 @@ public:
return 0;
}
+ BSONObj _query;
};
int main( int argc , char ** argv ) {
diff --git a/tools/export.cpp b/tools/export.cpp
index aabebf3..5603823 100644
--- a/tools/export.cpp
+++ b/tools/export.cpp
@@ -16,7 +16,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "client/dbclient.h"
#include "db/json.h"
@@ -34,17 +34,20 @@ namespace po = boost::program_options;
class Export : public Tool {
public:
Export() : Tool( "export" ){
+ addFieldOptions();
add_options()
("query,q" , po::value<string>() , "query filter, as a JSON string" )
- ("fields,f" , po::value<string>() , "comma seperated list of field names e.g. -f name,age" )
("csv","export to csv instead of json")
("out,o", po::value<string>(), "output file; if not specified, stdout is used")
+ ("jsonArray", "output to a json array rather than one object per line")
;
+ _usesstdout = false;
}
int run(){
string ns;
const bool csv = hasParam( "csv" );
+ const bool jsonArray = hasParam( "jsonArray" );
ostream *outPtr = &cout;
string outfile = getParam( "out" );
auto_ptr<ofstream> fileStream;
@@ -76,7 +79,7 @@ public:
auth();
- if ( hasParam( "fields" ) ){
+ if ( hasParam( "fields" ) || csv ){
needFields();
fieldsToReturn = &_fieldsObj;
}
@@ -99,6 +102,9 @@ public:
out << endl;
}
+ if (jsonArray)
+ out << '[';
+
long long num = 0;
while ( cursor->more() ) {
num++;
@@ -115,10 +121,18 @@ public:
out << endl;
}
else {
- out << obj.jsonString() << endl;
+ if (jsonArray && num != 1)
+ out << ',';
+
+ out << obj.jsonString();
+
+ if (!jsonArray)
+ out << endl;
}
}
+ if (jsonArray)
+ out << ']' << endl;
cerr << "exported " << num << " records" << endl;
diff --git a/tools/files.cpp b/tools/files.cpp
index 2cbda12..5adcdae 100644
--- a/tools/files.cpp
+++ b/tools/files.cpp
@@ -16,7 +16,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "client/gridfs.h"
#include "client/dbclient.h"
@@ -140,14 +140,14 @@ public:
}
conn().getLastError();
- cout << "done!";
+ cout << "done!" << endl;
return 0;
}
if ( cmd == "delete" ){
g.removeFile(filename);
conn().getLastError();
- cout << "done!";
+ cout << "done!" << endl;
return 0;
}
diff --git a/tools/import.cpp b/tools/import.cpp
index e34e73d..6335e59 100644
--- a/tools/import.cpp
+++ b/tools/import.cpp
@@ -16,11 +16,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "client/dbclient.h"
#include "db/json.h"
#include "tool.h"
+#include "../util/text.h"
#include <fstream>
#include <iostream>
@@ -39,6 +40,10 @@ class Import : public Tool {
const char * _sep;
bool _ignoreBlanks;
bool _headerLine;
+ bool _upsert;
+ bool _doimport;
+ bool _jsonArray;
+ vector<string> _upsertFields;
void _append( BSONObjBuilder& b , const string& fieldName , const string& data ){
if ( b.appendAsNumber( fieldName , data ) )
@@ -48,10 +53,12 @@ class Import : public Tool {
return;
// TODO: other types?
- b.append( fieldName.c_str() , data );
+ b.append( fieldName , data );
}
BSONObj parseLine( char * line ){
+ uassert(13289, "Invalid UTF8 character detected", isValidUTF8(line));
+
if ( _type == JSON ){
char * end = ( line + strlen( line ) ) - 1;
while ( isspace(*end) ){
@@ -137,11 +144,21 @@ public:
("file",po::value<string>() , "file to import from; if not specified stdin is used" )
("drop", "drop collection first " )
("headerline","CSV,TSV only - use first line as headers")
+ ("upsert", "insert or update objects that already exist" )
+ ("upsertFields", po::value<string>(), "comma-separated fields for the query part of the upsert. You should make sure this is indexed" )
+ ("stopOnError", "stop importing at first error rather than continuing" )
+ ("jsonArray", "load a json array, not one item per line. Currently limited to 4MB." )
+ ;
+ add_hidden_options()
+ ("noimport", "don't actually import. useful for benchmarking parser" )
;
addPositionArg( "file" , 1 );
_type = JSON;
_ignoreBlanks = false;
_headerLine = false;
+ _upsert = false;
+ _doimport = true;
+ _jsonArray = false;
}
int run(){
@@ -183,6 +200,21 @@ public:
_ignoreBlanks = true;
}
+ if ( hasParam( "upsert" ) ){
+ _upsert = true;
+
+ string uf = getParam("upsertFields");
+ if (uf.empty()){
+ _upsertFields.push_back("_id");
+ } else {
+ StringSplitter(uf.c_str(), ",").split(_upsertFields);
+ }
+ }
+
+ if ( hasParam( "noimport" ) ){
+ _doimport = false;
+ }
+
if ( hasParam( "type" ) ){
string type = getParam( "type" );
if ( type == "json" )
@@ -207,6 +239,10 @@ public:
needFields();
}
+ if (_type == JSON && hasParam("jsonArray")){
+ _jsonArray = true;
+ }
+
int errors = 0;
int num = 0;
@@ -217,38 +253,91 @@ public:
ProgressMeter pm( fileSize );
const int BUF_SIZE = 1024 * 1024 * 4;
boost::scoped_array<char> line(new char[BUF_SIZE+2]);
- while ( *in ){
- char * buf = line.get();
- in->getline( buf , BUF_SIZE );
- uassert( 10263 , "unknown error reading file" , ( in->rdstate() & ios_base::badbit ) == 0 );
- log(1) << "got line:" << buf << endl;
+ char * buf = line.get();
+ while ( _jsonArray || in->rdstate() == 0 ){
+ if (_jsonArray){
+ if (buf == line.get()){ //first pass
+ in->read(buf, BUF_SIZE);
+ uassert(13295, "JSONArray file too large", (in->rdstate() & ios_base::eofbit));
+ buf[ in->gcount() ] = '\0';
+ }
+ } else {
+ buf = line.get();
+ in->getline( buf , BUF_SIZE );
+ log(1) << "got line:" << buf << endl;
+ }
+ uassert( 10263 , "unknown error reading file" ,
+ (!(in->rdstate() & ios_base::badbit)) &&
+ (!(in->rdstate() & ios_base::failbit) || (in->rdstate() & ios_base::eofbit)) );
- while( isspace( buf[0] ) ) buf++;
-
- int len = strlen( buf );
- if ( ! len )
- continue;
-
- buf[len+1] = 0;
+ int len = 0;
+ if (strncmp("\xEF\xBB\xBF", buf, 3) == 0){ // UTF-8 BOM (notepad is stupid)
+ buf += 3;
+ len += 3;
+ }
- if ( in->rdstate() == ios_base::eofbit )
- break;
- assert( in->rdstate() == 0 );
+ if (_jsonArray){
+ while (buf[0] != '{' && buf[0] != '\0') {
+ len++;
+ buf++;
+ }
+ if (buf[0] == '\0')
+ break;
+ } else {
+ while (isspace( buf[0] )){
+ len++;
+ buf++;
+ }
+ if (buf[0] == '\0')
+ continue;
+ len += strlen( buf );
+ }
try {
- BSONObj o = parseLine( buf );
- if ( _headerLine )
+ BSONObj o;
+ if (_jsonArray){
+ int jslen;
+ o = fromjson(buf, &jslen);
+ len += jslen;
+ buf += jslen;
+ } else {
+ o = parseLine( buf );
+ }
+
+ if ( _headerLine ){
_headerLine = false;
- else
- conn().insert( ns.c_str() , o );
+ } else if (_doimport) {
+ bool doUpsert = _upsert;
+ BSONObjBuilder b;
+ if (_upsert){
+ for (vector<string>::const_iterator it=_upsertFields.begin(), end=_upsertFields.end(); it!=end; ++it){
+ BSONElement e = o.getFieldDotted(it->c_str());
+ if (e.eoo()){
+ doUpsert = false;
+ break;
+ }
+ b.appendAs(e, *it);
+ }
+ }
+
+ if (doUpsert){
+ conn().update(ns, Query(b.obj()), o, true);
+ } else {
+ conn().insert( ns.c_str() , o );
+ }
+ }
+
+ num++;
}
catch ( std::exception& e ){
cout << "exception:" << e.what() << endl;
cout << buf << endl;
errors++;
+
+ if (hasParam("stopOnError") || _jsonArray)
+ break;
}
- num++;
if ( pm.hit( len + 1 ) ){
cout << "\t\t\t" << num << "\t" << ( num / ( time(0) - start ) ) << "/second" << endl;
}
diff --git a/tools/restore.cpp b/tools/restore.cpp
index 6fcf2d3..115297b 100644
--- a/tools/restore.cpp
+++ b/tools/restore.cpp
@@ -16,7 +16,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../stdafx.h"
+#include "../pch.h"
#include "../client/dbclient.h"
#include "../util/mmap.h"
#include "tool.h"
@@ -29,16 +29,17 @@ using namespace mongo;
namespace po = boost::program_options;
-class Restore : public Tool {
+class Restore : public BSONTool {
public:
bool _drop;
- bool _objcheck;
-
- Restore() : Tool( "restore" , true , "" , "" ) , _drop(false),_objcheck(false){
+ bool _indexesLast;
+ const char * _curns;
+
+ Restore() : BSONTool( "restore" ) , _drop(false){
add_options()
("drop" , "drop each collection before import" )
- ("objcheck" , "validate object before inserting" )
+ ("indexesLast" , "wait to add indexes (faster if data isn't inserted in index order)" )
;
add_hidden_options()
("dir", po::value<string>()->default_value("dump"), "directory to restore from")
@@ -50,11 +51,11 @@ public:
out << "usage: " << _name << " [options] [directory or filename to restore from]" << endl;
}
- int run(){
+ virtual int doRun(){
auth();
path root = getParam("dir");
_drop = hasParam( "drop" );
- _objcheck = hasParam( "objcheck" );
+ _indexesLast = hasParam("indexesLast");
/* If _db is not "" then the user specified a db name to restore as.
*
@@ -76,6 +77,7 @@ public:
if ( is_directory( root ) ) {
directory_iterator end;
directory_iterator i(root);
+ path indexes;
while ( i != end ) {
path p = *i;
i++;
@@ -98,8 +100,15 @@ public:
}
}
- drillDown(p, use_db, use_coll);
+ if ( _indexesLast && p.leaf() == "system.indexes.bson" )
+ indexes = p;
+ else
+ drillDown(p, use_db, use_coll);
}
+
+ if (!indexes.empty())
+ drillDown(indexes, use_db, use_coll);
+
return;
}
@@ -109,18 +118,29 @@ public:
return;
}
- out() << root.string() << endl;
+ log() << root.string() << endl;
+
+ if ( root.leaf() == "system.profile.bson" ){
+ log() << "\t skipping" << endl;
+ return;
+ }
string ns;
if (use_db) {
ns += _db;
- } else {
+ }
+ else {
string dir = root.branch_path().string();
if ( dir.find( "/" ) == string::npos )
ns += dir;
else
ns += dir.substr( dir.find_last_of( "/" ) + 1 );
+
+ if ( ns.size() == 0 )
+ ns = "test";
}
+
+ assert( ns.size() );
if (use_coll) {
ns += "." + _coll;
@@ -130,76 +150,22 @@ public:
ns += "." + l;
}
- long long fileLength = file_size( root );
-
- if ( fileLength == 0 ) {
- out() << "file " + root.native_file_string() + " empty, skipping" << endl;
- return;
- }
-
out() << "\t going into namespace [" << ns << "]" << endl;
if ( _drop ){
out() << "\t dropping" << endl;
conn().dropCollection( ns );
}
+
+ _curns = ns.c_str();
+ processFile( root );
+ }
- string fileString = root.string();
- ifstream file( fileString.c_str() , ios_base::in | ios_base::binary);
- if ( ! file.is_open() ){
- log() << "error opening file: " << fileString << endl;
- return;
- }
-
- log(1) << "\t file size: " << fileLength << endl;
-
- long long read = 0;
- long long num = 0;
-
- const int BUF_SIZE = 1024 * 1024 * 5;
- boost::scoped_array<char> buf_holder(new char[BUF_SIZE]);
- char * buf = buf_holder.get();
-
- ProgressMeter m( fileLength );
-
- while ( read < fileLength ) {
- file.read( buf , 4 );
- int size = ((int*)buf)[0];
- if ( size >= BUF_SIZE ){
- cerr << "got an object of size: " << size << " terminating..." << endl;
- }
- uassert( 10264 , "invalid object size" , size < BUF_SIZE );
-
- file.read( buf + 4 , size - 4 );
-
- BSONObj o( buf );
- if ( _objcheck && ! o.valid() ){
- cerr << "INVALID OBJECT - going try and pring out " << endl;
- cerr << "size: " << size << endl;
- BSONObjIterator i(o);
- while ( i.more() ){
- BSONElement e = i.next();
- try {
- e.validate();
- }
- catch ( ... ){
- cerr << "\t\t NEXT ONE IS INVALID" << endl;
- }
- cerr << "\t name : " << e.fieldName() << " " << e.type() << endl;
- cerr << "\t " << e << endl;
- }
- }
- conn().insert( ns.c_str() , o );
-
- read += o.objsize();
- num++;
-
- m.hit( o.objsize() );
- }
-
- uassert( 10265 , "counts don't match" , m.done() == fileLength );
- out() << "\t " << m.hits() << " objects" << endl;
+ virtual void gotObject( const BSONObj& obj ){
+ conn().insert( _curns , obj );
}
+
+
};
int main( int argc , char ** argv ) {
diff --git a/tools/sniffer.cpp b/tools/sniffer.cpp
index 14d32bd..abc2dc0 100644
--- a/tools/sniffer.cpp
+++ b/tools/sniffer.cpp
@@ -34,7 +34,7 @@
#undef max
#endif
-#include "../util/builder.h"
+#include "../bson/util/builder.h"
#include "../util/message.h"
#include "../util/mmap.h"
#include "../db/dbmessage.h"
@@ -74,6 +74,10 @@ using mongo::MemoryMappedFile;
int captureHeaderSize;
set<int> serverPorts;
string forwardAddress;
+bool objcheck = false;
+
+ostream *outPtr = &cout;
+ostream &out() { return *outPtr; }
/* IP header */
struct sniff_ip {
@@ -205,23 +209,23 @@ void got_packet(u_char *args, const struct pcap_pkthdr *header, const u_char *pa
if ( bytesRemainingInMessage[ c ] == 0 ) {
m.setData( (MsgData*)payload , false );
- if ( !m.data->valid() ) {
+ if ( !m.header()->valid() ) {
cerr << "Invalid message start, skipping packet." << endl;
return;
}
- if ( size_payload > m.data->len ) {
+ if ( size_payload > m.header()->len ) {
cerr << "Multiple messages in packet, skipping packet." << endl;
return;
}
- if ( size_payload < m.data->len ) {
- bytesRemainingInMessage[ c ] = m.data->len - size_payload;
+ if ( size_payload < m.header()->len ) {
+ bytesRemainingInMessage[ c ] = m.header()->len - size_payload;
messageBuilder[ c ].reset( new BufBuilder() );
- messageBuilder[ c ]->append( (void*)payload, size_payload );
+ messageBuilder[ c ]->appendBuf( (void*)payload, size_payload );
return;
}
} else {
bytesRemainingInMessage[ c ] -= size_payload;
- messageBuilder[ c ]->append( (void*)payload, size_payload );
+ messageBuilder[ c ]->appendBuf( (void*)payload, size_payload );
if ( bytesRemainingInMessage[ c ] < 0 ) {
cerr << "Received too many bytes to complete message, resetting buffer" << endl;
bytesRemainingInMessage[ c ] = 0;
@@ -237,84 +241,103 @@ void got_packet(u_char *args, const struct pcap_pkthdr *header, const u_char *pa
DbMessage d( m );
- cout << inet_ntoa(ip->ip_src) << ":" << ntohs( tcp->th_sport )
- << ( serverPorts.count( ntohs( tcp->th_dport ) ) ? " -->> " : " <<-- " )
- << inet_ntoa(ip->ip_dst) << ":" << ntohs( tcp->th_dport )
- << " " << d.getns()
- << " " << m.data->len << " bytes "
- << " id:" << hex << m.data->id << dec << "\t" << m.data->id;
+ out() << inet_ntoa(ip->ip_src) << ":" << ntohs( tcp->th_sport )
+ << ( serverPorts.count( ntohs( tcp->th_dport ) ) ? " -->> " : " <<-- " )
+ << inet_ntoa(ip->ip_dst) << ":" << ntohs( tcp->th_dport )
+ << " " << d.getns()
+ << " " << m.header()->len << " bytes "
+ << " id:" << hex << m.header()->id << dec << "\t" << m.header()->id;
processMessage( c , m );
}
-void processMessage( Connection& c , Message& m ){
- DbMessage d(m);
+class AuditingDbMessage : public DbMessage {
+public:
+ AuditingDbMessage( const Message &m ) : DbMessage( m ) {}
+ BSONObj nextJsObj( const char *context ) {
+ BSONObj ret = DbMessage::nextJsObj();
+ if ( objcheck && !ret.valid() ) {
+ // TODO provide more debugging info
+ cout << "invalid object in " << context << ": " << ret.hexDump() << endl;
+ }
+ return ret;
+ }
+};
- if ( m.data->operation() == mongo::opReply )
- cout << " - " << m.data->responseTo;
- cout << endl;
+void processMessage( Connection& c , Message& m ){
+ AuditingDbMessage d(m);
+
+ if ( m.operation() == mongo::opReply )
+ out() << " - " << (unsigned)m.header()->responseTo;
+ out() << endl;
- switch( m.data->operation() ){
- case mongo::opReply:{
- mongo::QueryResult* r = (mongo::QueryResult*)m.data;
- cout << "\treply" << " n:" << r->nReturned << " cursorId: " << r->cursorId << endl;
- if ( r->nReturned ){
- mongo::BSONObj o( r->data() , 0 );
- cout << "\t" << o << endl;
+ try {
+ switch( m.operation() ){
+ case mongo::opReply:{
+ mongo::QueryResult* r = (mongo::QueryResult*)m.singleData();
+ out() << "\treply" << " n:" << r->nReturned << " cursorId: " << r->cursorId << endl;
+ if ( r->nReturned ){
+ mongo::BSONObj o( r->data() , 0 );
+ out() << "\t" << o << endl;
+ }
+ break;
+ }
+ case mongo::dbQuery:{
+ mongo::QueryMessage q(d);
+ out() << "\tquery: " << q.query << " ntoreturn: " << q.ntoreturn << " ntoskip: " << q.ntoskip << endl;
+ break;
+ }
+ case mongo::dbUpdate:{
+ int flags = d.pullInt();
+ BSONObj q = d.nextJsObj( "update" );
+ BSONObj o = d.nextJsObj( "update" );
+ out() << "\tupdate flags:" << flags << " q:" << q << " o:" << o << endl;
+ break;
+ }
+ case mongo::dbInsert:{
+ out() << "\tinsert: " << d.nextJsObj( "insert" ) << endl;
+ while ( d.moreJSObjs() ) {
+ out() << "\t\t" << d.nextJsObj( "insert" ) << endl;
+ }
+ break;
+ }
+ case mongo::dbGetMore:{
+ int nToReturn = d.pullInt();
+ long long cursorId = d.pullInt64();
+ out() << "\tgetMore nToReturn: " << nToReturn << " cursorId: " << cursorId << endl;
+ break;
+ }
+ case mongo::dbDelete:{
+ int flags = d.pullInt();
+ BSONObj q = d.nextJsObj( "delete" );
+ out() << "\tdelete flags: " << flags << " q: " << q << endl;
+ break;
+ }
+ case mongo::dbKillCursors:{
+ int *x = (int *) m.singleData()->_data;
+ x++; // reserved
+ int n = *x;
+ out() << "\tkillCursors n: " << n << endl;
+ break;
+ }
+ default:
+ cerr << "*** CANNOT HANDLE TYPE: " << m.operation() << endl;
}
- break;
- }
- case mongo::dbQuery:{
- mongo::QueryMessage q(d);
- cout << "\tquery: " << q.query << " ntoreturn: " << q.ntoreturn << " ntoskip: " << q.ntoskip << endl;
- break;
- }
- case mongo::dbUpdate:{
- int flags = d.pullInt();
- BSONObj q = d.nextJsObj();
- BSONObj o = d.nextJsObj();
- cout << "\tupdate flags:" << flags << " q:" << q << " o:" << o << endl;
- break;
- }
- case mongo::dbInsert:{
- cout << "\tinsert: " << d.nextJsObj() << endl;
- while ( d.moreJSObjs() )
- cout << "\t\t" << d.nextJsObj() << endl;
- break;
- }
- case mongo::dbGetMore:{
- int nToReturn = d.pullInt();
- long long cursorId = d.pullInt64();
- cout << "\tgetMore nToReturn: " << nToReturn << " cursorId: " << cursorId << endl;
- break;
- }
- case mongo::dbDelete:{
- int flags = d.pullInt();
- BSONObj q = d.nextJsObj();
- cout << "\tdelete flags: " << flags << " q: " << q << endl;
- break;
- }
- case mongo::dbKillCursors:{
- int *x = (int *) m.data->_data;
- x++; // reserved
- int n = *x;
- cout << "\tkillCursors n: " << n << endl;
- break;
- }
- default:
- cerr << "*** CANNOT HANDLE TYPE: " << m.data->operation() << endl;
+ } catch ( ... ) {
+ cerr << "Error parsing message for operation: " << m.operation() << endl;
}
-
+
+
if ( !forwardAddress.empty() ) {
- if ( m.data->operation() != mongo::opReply ) {
+ if ( m.operation() != mongo::opReply ) {
boost::shared_ptr<DBClientConnection> conn = forwarder[ c ];
if ( !conn ) {
conn.reset(new DBClientConnection( true ));
conn->connect( forwardAddress );
forwarder[ c ] = conn;
}
- if ( m.data->operation() == mongo::dbQuery || m.data->operation() == mongo::dbGetMore ) {
- if ( m.data->operation() == mongo::dbGetMore ) {
+ if ( m.operation() == mongo::dbQuery || m.operation() == mongo::dbGetMore ) {
+ if ( m.operation() == mongo::dbGetMore ) {
DbMessage d( m );
d.pullInt();
long long &cId = d.pullInt64();
@@ -322,8 +345,8 @@ void processMessage( Connection& c , Message& m ){
}
Message response;
conn->port().call( m, response );
- QueryResult *qr = (QueryResult *) response.data;
- if ( !( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) ) {
+ QueryResult *qr = (QueryResult *) response.singleData();
+ if ( !( qr->resultFlags() & mongo::ResultFlag_CursorNotFound ) ) {
if ( qr->cursorId != 0 ) {
lastCursor[ c ] = qr->cursorId;
return;
@@ -336,9 +359,9 @@ void processMessage( Connection& c , Message& m ){
} else {
Connection r = c.reverse();
long long myCursor = lastCursor[ r ];
- QueryResult *qr = (QueryResult *) m.data;
+ QueryResult *qr = (QueryResult *) m.singleData();
long long yourCursor = qr->cursorId;
- if ( ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) )
+ if ( ( qr->resultFlags() & mongo::ResultFlag_CursorNotFound ) )
yourCursor = 0;
if ( myCursor && !yourCursor )
cerr << "Expected valid cursor in sniffed response, found none" << endl;
@@ -366,7 +389,7 @@ void processDiagLog( const char * file ){
long read = 0;
while ( read < length ){
Message m(pos,false);
- int len = m.data->len;
+ int len = m.header()->len;
DbMessage d(m);
cout << len << " " << d.getns() << endl;
@@ -389,6 +412,9 @@ void usage() {
" or a file containing output from mongod's --diaglog option.\n"
" If no source is specified, mongosniff will attempt to sniff\n"
" from one of the machine's network interfaces.\n"
+ "--objcheck Log hex representation of invalid BSON objects and nothing\n"
+ " else. Spurious messages about invalid objects may result\n"
+ " when there are dropped tcp packets.\n"
"<port0>... These parameters are used to filter sniffing. By default, \n"
" only port 27017 is sniffed.\n"
"--help Print this help message.\n"
@@ -397,6 +423,9 @@ void usage() {
int main(int argc, char **argv){
+ stringstream nullStream;
+ nullStream.clear(ios::failbit);
+
const char *dev = NULL;
char errbuf[PCAP_ERRBUF_SIZE];
pcap_t *handle;
@@ -435,6 +464,10 @@ int main(int argc, char **argv){
else
dev = args[ ++i ];
}
+ else if ( arg == string( "--objcheck" ) ) {
+ objcheck = true;
+ outPtr = &nullStream;
+ }
else {
serverPorts.insert( atoi( args[ i ] ) );
}
diff --git a/tools/stat.cpp b/tools/stat.cpp
index f66f3f1..05318a9 100644
--- a/tools/stat.cpp
+++ b/tools/stat.cpp
@@ -16,9 +16,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "client/dbclient.h"
#include "db/json.h"
+#include "../util/httpclient.h"
#include "tool.h"
@@ -38,6 +39,7 @@ namespace mongo {
_sleep = 1;
_rowNum = 0;
_showHeaders = true;
+ _http = false;
add_hidden_options()
( "sleep" , po::value<int>() , "time to sleep between calls" )
@@ -45,9 +47,12 @@ namespace mongo {
add_options()
("noheaders", "don't output column names")
("rowcount,n", po::value<int>()->default_value(0), "number of stats lines to print (0 for indefinite)")
+ ("http", "use http instead of raw db connection")
;
addPositionArg( "sleep" , 1 );
+
+ _autoreconnect = true;
}
virtual void printExtraHelp( ostream & out ){
@@ -55,7 +60,55 @@ namespace mongo {
out << "sleep time: time to wait (in seconds) between calls" << endl;
}
+ virtual void printExtraHelpAfter( ostream & out ){
+ out << "\n";
+ out << " Fields\n";
+ out << " inserts/s \t- # of inserts per second\n";
+ out << " query/s \t- # of queries per second\n";
+ out << " update/s \t- # of updates per second\n";
+ out << " delete/s \t- # of deletes per second\n";
+ out << " getmore/s \t- # of get mores (cursor batch) per second\n";
+ out << " command/s \t- # of commands per second\n";
+ out << " flushes/s \t- # of fsync flushes per second\n";
+ out << " mapped \t- amount of data mmaped (total data size) megabytes\n";
+ out << " visze \t- virtual size of process in megabytes\n";
+ out << " res \t- resident size of process in megabytes\n";
+ out << " faults/s \t- # of pages faults/sec (linux only)\n";
+ out << " locked \t- percent of time in global write lock\n";
+ out << " idx miss \t- percent of btree page misses (sampled)\n";
+ out << " q t|r|w \t- lock queue lengths (total|read|write)\n";
+ out << " conn \t- number of open connections\n";
+ }
+
+
BSONObj stats(){
+ if ( _http ){
+ HttpClient c;
+ HttpClient::Result r;
+
+ string url;
+ {
+ stringstream ss;
+ ss << "http://" << _host;
+ if ( _host.find( ":" ) == string::npos )
+ ss << ":28017";
+ ss << "/_status";
+ url = ss.str();
+ }
+
+ if ( c.get( url , &r ) != 200 ){
+ cout << "error (http): " << r.getEntireResponse() << endl;
+ return BSONObj();
+ }
+
+ BSONObj x = fromjson( r.getBody() );
+ BSONElement e = x["serverStatus"];
+ if ( e.type() != Object ){
+ cout << "BROKEN: " << x << endl;
+ return BSONObj();
+ }
+ return e.embeddedObjectUserCheck();
+ }
BSONObj out;
if ( ! conn().simpleCommand( _db , &out , "serverStatus" ) ){
cout << "error: " << out << endl;
@@ -77,7 +130,9 @@ namespace mongo {
double y = ( b.getFieldDotted( outof ).number() - a.getFieldDotted( outof ).number() );
if ( y == 0 )
return 0;
- return x / y;
+ double p = x / y;
+ p = (double)((int)(p * 1000)) / 10;
+ return p;
}
void cellstart( stringstream& ss , string name , unsigned& width ){
@@ -101,12 +156,10 @@ namespace mongo {
}
void cell( stringstream& ss , string name , unsigned width , const string& val ){
- assert( val.size() <= width );
cellstart( ss , name , width );
ss << setw(width) << val << " ";
}
-
string doRow( const BSONObj& a , const BSONObj& b ){
stringstream ss;
@@ -120,6 +173,13 @@ namespace mongo {
}
}
+ if ( b["backgroundFlushing"].type() == Object ){
+ BSONObj ax = a["backgroundFlushing"].embeddedObject();
+ BSONObj bx = b["backgroundFlushing"].embeddedObject();
+ BSONObjIterator i( bx );
+ cell( ss , "flushes/s" , 6 , (int)diff( "flushes" , ax , bx ) );
+ }
+
if ( b.getFieldDotted("mem.supported").trueValue() ){
BSONObj bx = b["mem"].embeddedObject();
BSONObjIterator i( bx );
@@ -127,10 +187,24 @@ namespace mongo {
cell( ss , "vsize" , 6 , bx["virtual"].numberInt() );
cell( ss , "res" , 6 , bx["resident"].numberInt() );
}
+
+ if ( b["extra_info"].type() == Object ){
+ BSONObj ax = a["extra_info"].embeddedObject();
+ BSONObj bx = b["extra_info"].embeddedObject();
+ if ( ax["page_faults"].type() || ax["page_faults"].type() )
+ cell( ss , "faults/s" , 6 , (int)diff( "page_faults" , ax , bx ) );
+ }
- cell( ss , "% locked" , 8 , percent( "globalLock.totalTime" , "globalLock.lockTime" , a , b ) );
- cell( ss , "% idx miss" , 8 , percent( "indexCounters.btree.accesses" , "indexCounters.btree.misses" , a , b ) );
+ cell( ss , "locked %" , 8 , percent( "globalLock.totalTime" , "globalLock.lockTime" , a , b ) );
+ cell( ss , "idx miss %" , 8 , percent( "indexCounters.btree.accesses" , "indexCounters.btree.misses" , a , b ) );
+ if ( b.getFieldDotted( "globalLock.currentQueue" ).type() == Object ){
+ int r = b.getFieldDotted( "globalLock.currentQueue.readers" ).numberInt();
+ int w = b.getFieldDotted( "globalLock.currentQueue.writers" ).numberInt();
+ stringstream temp;
+ temp << r+w << "|" << r << "|" << w;
+ cell( ss , "q t|r|w" , 10 , temp.str() );
+ }
cell( ss , "conn" , 5 , b.getFieldDotted( "connections.current" ).numberInt() );
{
@@ -142,7 +216,7 @@ namespace mongo {
<< setfill('0') << setw(2) << t.tm_min
<< ":"
<< setfill('0') << setw(2) << t.tm_sec;
- cell( ss , "time" , 8 , temp.str() );
+ cell( ss , "time" , 10 , temp.str() );
}
if ( _showHeaders && _rowNum % 20 == 0 ){
@@ -154,6 +228,13 @@ namespace mongo {
return ss.str();
}
+ virtual void preSetup(){
+ if ( hasParam( "http" ) ){
+ _http = true;
+ _noconnection = true;
+ }
+ }
+
int run(){
_sleep = getParam( "sleep" , _sleep );
if ( hasParam( "noheaders" ) ) {
@@ -167,11 +248,26 @@ namespace mongo {
while ( _rowCount == 0 || _rowNum < _rowCount ){
sleepsecs(_sleep);
- BSONObj now = stats();
+ BSONObj now;
+ try {
+ now = stats();
+ }
+ catch ( std::exception& e ){
+ cout << "can't get data: " << e.what() << endl;
+ continue;
+ }
+
if ( now.isEmpty() )
return -2;
- cout << doRow( prev , now ) << endl;
+ try {
+ cout << doRow( prev , now ) << endl;
+ }
+ catch ( AssertionException& e ){
+ cout << "\nerror: " << e.what() << "\n"
+ << now
+ << endl;
+ }
prev = now;
}
@@ -183,6 +279,7 @@ namespace mongo {
int _rowNum;
int _rowCount;
bool _showHeaders;
+ bool _http;
};
}
diff --git a/tools/tool.cpp b/tools/tool.cpp
index c9a2977..dbb3de1 100644
--- a/tools/tool.cpp
+++ b/tools/tool.cpp
@@ -24,6 +24,7 @@
#include <pcrecpp.h>
#include "util/file_allocator.h"
+#include "util/password.h"
using namespace std;
using namespace mongo;
@@ -34,24 +35,28 @@ namespace mongo {
CmdLine cmdLine;
- Tool::Tool( string name , bool localDBAllowed , string defaultDB , string defaultCollection ) :
- _name( name ) , _db( defaultDB ) , _coll( defaultCollection ) , _conn(0), _paired(false) {
-
+ Tool::Tool( string name , bool localDBAllowed , string defaultDB ,
+ string defaultCollection , bool usesstdout ) :
+ _name( name ) , _db( defaultDB ) , _coll( defaultCollection ) ,
+ _usesstdout(usesstdout), _noconnection(false), _autoreconnect(false), _conn(0), _paired(false) {
+
_options = new po::options_description( "options" );
_options->add_options()
("help","produce help message")
("verbose,v", "be more verbose (include multiple times for more verbosity e.g. -vvvvv)")
("host,h",po::value<string>(), "mongo host to connect to (\"left,right\" for pairs)" )
+ ("port",po::value<string>(), "server port. Can also use --host hostname:port" )
("db,d",po::value<string>(), "database to use" )
("collection,c",po::value<string>(), "collection to use (some commands)" )
("username,u",po::value<string>(), "username" )
- ("password,p",po::value<string>(), "password" )
+ ("password,p", new PasswordValue( &_password ), "password" )
+ ("ipv6", "enable IPv6 support (disabled by default)")
;
if ( localDBAllowed )
_options->add_options()
- ("dbpath",po::value<string>(), "directly access mongod data "
- "files in the given path, instead of connecting to a mongod "
- "instance - needs to lock the data directory, so cannot be "
+ ("dbpath",po::value<string>(), "directly access mongod database "
+ "files in the given path, instead of connecting to a mongod "
+ "server - needs to lock the data directory, so cannot be "
"used if a mongod is currently accessing the same path" )
("directoryperdb", "if dbpath specified, each db is in a separate directory" )
;
@@ -71,12 +76,10 @@ namespace mongo {
delete _conn;
}
- void Tool::printExtraHelp( ostream & out ){
- }
-
void Tool::printHelp(ostream &out) {
printExtraHelp(out);
_options->print(out);
+ printExtraHelpAfter(out);
}
int Tool::main( int argc , char ** argv ){
@@ -109,8 +112,18 @@ namespace mongo {
return EXIT_BADOPTIONS;
}
+ // hide password from ps output
+ for (int i=0; i < (argc-1); ++i){
+ if (!strcmp(argv[i], "-p") || !strcmp(argv[i], "--password")){
+ char* arg = argv[i+1];
+ while (*arg){
+ *arg++ = 'x';
+ }
+ }
+ }
+
if ( _params.count( "help" ) ){
- printHelp(cerr);
+ printHelp(cout);
return 0;
}
@@ -123,37 +136,39 @@ namespace mongo {
logLevel = s.length();
}
}
+
+ preSetup();
bool useDirectClient = hasParam( "dbpath" );
-
+
if ( ! useDirectClient ) {
_host = "127.0.0.1";
if ( _params.count( "host" ) )
_host = _params["host"].as<string>();
- if ( _host.find( "," ) == string::npos ){
- DBClientConnection * c = new DBClientConnection();
- _conn = c;
-
- string errmsg;
- if ( ! c->connect( _host , errmsg ) ){
- cerr << "couldn't connect to [" << _host << "] " << errmsg << endl;
- return -1;
- }
+ if ( _params.count( "port" ) )
+ _host += ':' + _params["port"].as<string>();
+
+ if ( _noconnection ){
+ // do nothing
}
else {
- log(1) << "using pairing" << endl;
- DBClientPaired * c = new DBClientPaired();
- _paired = true;
- _conn = c;
+ string errmsg;
- if ( ! c->connect( _host ) ){
- cerr << "couldn't connect to paired server: " << _host << endl;
+ ConnectionString cs = ConnectionString::parse( _host , errmsg );
+ if ( ! cs.isValid() ){
+ cerr << "invalid hostname [" << _host << "] " << errmsg << endl;
+ return -1;
+ }
+
+ _conn = cs.connect( errmsg );
+ if ( ! _conn ){
+ cerr << "couldn't connect to [" << _host << "] " << errmsg << endl;
return -1;
}
}
-
- cerr << "connected to: " << _host << endl;
+
+ (_usesstdout ? cout : cerr ) << "connected to: " << _host << endl;
}
else {
if ( _params.count( "directoryperdb" ) ) {
@@ -168,7 +183,7 @@ namespace mongo {
try {
acquirePathLock();
}
- catch ( DBException& e ){
+ catch ( DBException& ){
cerr << endl << "If you are running a mongod on the same "
"path you should connect to that instead of direct data "
"file access" << endl << endl;
@@ -188,8 +203,13 @@ namespace mongo {
if ( _params.count( "username" ) )
_username = _params["username"].as<string>();
- if ( _params.count( "password" ) )
- _password = _params["password"].as<string>();
+ if ( _params.count( "password" )
+ && ( _password.empty() ) ) {
+ _password = askPassword();
+ }
+
+ if (_params.count("ipv6"))
+ enableIPv6();
int ret = -1;
try {
@@ -209,14 +229,15 @@ namespace mongo {
}
DBClientBase& Tool::conn( bool slaveIfPaired ){
- if ( _paired && slaveIfPaired )
- return ((DBClientPaired*)_conn)->slaveConn();
+ // TODO: _paired is deprecated
+ if ( slaveIfPaired && _conn->type() == ConnectionString::SET )
+ return ((DBClientReplicaSet*)_conn)->slaveConn();
return *_conn;
}
void Tool::addFieldOptions(){
add_options()
- ("fields,f" , po::value<string>() , "comma seperated list of field names e.g. -f name,age" )
+ ("fields,f" , po::value<string>() , "comma separated list of field names e.g. -f name,age" )
("fieldFile" , po::value<string>() , "file with fields names - 1 per line" )
;
}
@@ -230,10 +251,10 @@ namespace mongo {
pcrecpp::StringPiece input(fields_arg);
string f;
- pcrecpp::RE re("([\\w\\.\\s]+),?" );
+ pcrecpp::RE re("([#\\w\\.\\s\\-]+),?" );
while ( re.Consume( &input, &f ) ){
_fields.push_back( f );
- b.append( f.c_str() , 1 );
+ b.append( f , 1 );
}
_fieldsObj = b.obj();
@@ -254,7 +275,7 @@ namespace mongo {
file.getline( line , BUF_SIZE );
const char * cur = line;
while ( isspace( cur[0] ) ) cur++;
- if ( strlen( cur ) == 0 )
+ if ( cur[0] == '\0' )
continue;
_fields.push_back( cur );
@@ -286,6 +307,105 @@ namespace mongo {
throw UserException( 9997 , (string)"auth failed: " + errmsg );
}
+ BSONTool::BSONTool( const char * name , bool objcheck )
+ : Tool( name , true , "" , "" ) , _objcheck( objcheck ){
+
+ add_options()
+ ("objcheck" , "validate object before inserting" )
+ ("filter" , po::value<string>() , "filter to apply before inserting" )
+ ;
+ }
+
+
+ int BSONTool::run(){
+ _objcheck = hasParam( "objcheck" );
+
+ if ( hasParam( "filter" ) )
+ _matcher.reset( new Matcher( fromjson( getParam( "filter" ) ) ) );
+
+ return doRun();
+ }
+
+ long long BSONTool::processFile( const path& root ){
+ string fileString = root.string();
+
+ long long fileLength = file_size( root );
+
+ if ( fileLength == 0 ) {
+ out() << "file " << fileString << " empty, skipping" << endl;
+ return 0;
+ }
+
+
+ FILE* file = fopen( fileString.c_str() , "rb" );
+ if ( ! file ){
+ log() << "error opening file: " << fileString << endl;
+ return 0;
+ }
+
+#if !defined(__sunos__) && defined(POSIX_FADV_SEQUENTIAL)
+ posix_fadvise(fileno(file), 0, fileLength, POSIX_FADV_SEQUENTIAL);
+#endif
+
+ log(1) << "\t file size: " << fileLength << endl;
+
+ long long read = 0;
+ long long num = 0;
+ long long processed = 0;
+
+ const int BUF_SIZE = 1024 * 1024 * 5;
+ boost::scoped_array<char> buf_holder(new char[BUF_SIZE]);
+ char * buf = buf_holder.get();
+
+ ProgressMeter m( fileLength );
+
+ while ( read < fileLength ) {
+ int readlen = fread(buf, 4, 1, file);
+ int size = ((int*)buf)[0];
+ if ( size >= BUF_SIZE ){
+ cerr << "got an object of size: " << size << " terminating..." << endl;
+ }
+ uassert( 10264 , "invalid object size" , size < BUF_SIZE );
+
+ readlen = fread(buf+4, size-4, 1, file);
+
+ BSONObj o( buf );
+ if ( _objcheck && ! o.valid() ){
+ cerr << "INVALID OBJECT - going try and pring out " << endl;
+ cerr << "size: " << size << endl;
+ BSONObjIterator i(o);
+ while ( i.more() ){
+ BSONElement e = i.next();
+ try {
+ e.validate();
+ }
+ catch ( ... ){
+ cerr << "\t\t NEXT ONE IS INVALID" << endl;
+ }
+ cerr << "\t name : " << e.fieldName() << " " << e.type() << endl;
+ cerr << "\t " << e << endl;
+ }
+ }
+
+ if ( _matcher.get() == 0 || _matcher->matches( o ) ){
+ gotObject( o );
+ processed++;
+ }
+
+ read += o.objsize();
+ num++;
+
+ m.hit( o.objsize() );
+ }
+
+ uassert( 10265 , "counts don't match" , m.done() == fileLength );
+ out() << "\t " << m.hits() << " objects found" << endl;
+ if ( _matcher.get() )
+ out() << "\t " << processed << " objects processed" << endl;
+ return processed;
+ }
+
+
void setupSignals(){}
}
diff --git a/tools/tool.h b/tools/tool.h
index 330fc2d..900c02f 100644
--- a/tools/tool.h
+++ b/tools/tool.h
@@ -35,7 +35,8 @@ namespace mongo {
class Tool {
public:
- Tool( string name , bool localDBAllowed=true, string defaultDB="test" , string defaultCollection="");
+ Tool( string name , bool localDBAllowed=true, string defaultDB="test" ,
+ string defaultCollection="", bool usesstdout=true );
virtual ~Tool();
int main( int argc , char ** argv );
@@ -71,12 +72,15 @@ namespace mongo {
}
return _db + "." + _coll;
}
+
+ virtual void preSetup(){}
virtual int run() = 0;
virtual void printHelp(ostream &out);
- virtual void printExtraHelp( ostream & out );
+ virtual void printExtraHelp( ostream & out ){}
+ virtual void printExtraHelpAfter( ostream & out ){}
protected:
@@ -90,6 +94,10 @@ namespace mongo {
string _username;
string _password;
+
+ bool _usesstdout;
+ bool _noconnection;
+ bool _autoreconnect;
void addFieldOptions();
void needFields();
@@ -98,8 +106,10 @@ namespace mongo {
BSONObj _fieldsObj;
- private:
string _host;
+
+ protected:
+
mongo::DBClientBase * _conn;
bool _paired;
@@ -111,4 +121,20 @@ namespace mongo {
};
+ class BSONTool : public Tool {
+ bool _objcheck;
+ auto_ptr<Matcher> _matcher;
+
+ public:
+ BSONTool( const char * name , bool objcheck = false );
+
+ virtual int doRun() = 0;
+ virtual void gotObject( const BSONObj& obj ) = 0;
+
+ virtual int run();
+
+ long long processFile( const path& file );
+
+ };
+
}