diff options
Diffstat (limited to 'tools/import.cpp')
-rw-r--r-- | tools/import.cpp | 133 |
1 files changed, 111 insertions, 22 deletions
diff --git a/tools/import.cpp b/tools/import.cpp index e34e73d..6335e59 100644 --- a/tools/import.cpp +++ b/tools/import.cpp @@ -16,11 +16,12 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "stdafx.h" +#include "pch.h" #include "client/dbclient.h" #include "db/json.h" #include "tool.h" +#include "../util/text.h" #include <fstream> #include <iostream> @@ -39,6 +40,10 @@ class Import : public Tool { const char * _sep; bool _ignoreBlanks; bool _headerLine; + bool _upsert; + bool _doimport; + bool _jsonArray; + vector<string> _upsertFields; void _append( BSONObjBuilder& b , const string& fieldName , const string& data ){ if ( b.appendAsNumber( fieldName , data ) ) @@ -48,10 +53,12 @@ class Import : public Tool { return; // TODO: other types? - b.append( fieldName.c_str() , data ); + b.append( fieldName , data ); } BSONObj parseLine( char * line ){ + uassert(13289, "Invalid UTF8 character detected", isValidUTF8(line)); + if ( _type == JSON ){ char * end = ( line + strlen( line ) ) - 1; while ( isspace(*end) ){ @@ -137,11 +144,21 @@ public: ("file",po::value<string>() , "file to import from; if not specified stdin is used" ) ("drop", "drop collection first " ) ("headerline","CSV,TSV only - use first line as headers") + ("upsert", "insert or update objects that already exist" ) + ("upsertFields", po::value<string>(), "comma-separated fields for the query part of the upsert. You should make sure this is indexed" ) + ("stopOnError", "stop importing at first error rather than continuing" ) + ("jsonArray", "load a json array, not one item per line. Currently limited to 4MB." ) + ; + add_hidden_options() + ("noimport", "don't actually import. useful for benchmarking parser" ) ; addPositionArg( "file" , 1 ); _type = JSON; _ignoreBlanks = false; _headerLine = false; + _upsert = false; + _doimport = true; + _jsonArray = false; } int run(){ @@ -183,6 +200,21 @@ public: _ignoreBlanks = true; } + if ( hasParam( "upsert" ) ){ + _upsert = true; + + string uf = getParam("upsertFields"); + if (uf.empty()){ + _upsertFields.push_back("_id"); + } else { + StringSplitter(uf.c_str(), ",").split(_upsertFields); + } + } + + if ( hasParam( "noimport" ) ){ + _doimport = false; + } + if ( hasParam( "type" ) ){ string type = getParam( "type" ); if ( type == "json" ) @@ -207,6 +239,10 @@ public: needFields(); } + if (_type == JSON && hasParam("jsonArray")){ + _jsonArray = true; + } + int errors = 0; int num = 0; @@ -217,38 +253,91 @@ public: ProgressMeter pm( fileSize ); const int BUF_SIZE = 1024 * 1024 * 4; boost::scoped_array<char> line(new char[BUF_SIZE+2]); - while ( *in ){ - char * buf = line.get(); - in->getline( buf , BUF_SIZE ); - uassert( 10263 , "unknown error reading file" , ( in->rdstate() & ios_base::badbit ) == 0 ); - log(1) << "got line:" << buf << endl; + char * buf = line.get(); + while ( _jsonArray || in->rdstate() == 0 ){ + if (_jsonArray){ + if (buf == line.get()){ //first pass + in->read(buf, BUF_SIZE); + uassert(13295, "JSONArray file too large", (in->rdstate() & ios_base::eofbit)); + buf[ in->gcount() ] = '\0'; + } + } else { + buf = line.get(); + in->getline( buf , BUF_SIZE ); + log(1) << "got line:" << buf << endl; + } + uassert( 10263 , "unknown error reading file" , + (!(in->rdstate() & ios_base::badbit)) && + (!(in->rdstate() & ios_base::failbit) || (in->rdstate() & ios_base::eofbit)) ); - while( isspace( buf[0] ) ) buf++; - - int len = strlen( buf ); - if ( ! len ) - continue; - - buf[len+1] = 0; + int len = 0; + if (strncmp("\xEF\xBB\xBF", buf, 3) == 0){ // UTF-8 BOM (notepad is stupid) + buf += 3; + len += 3; + } - if ( in->rdstate() == ios_base::eofbit ) - break; - assert( in->rdstate() == 0 ); + if (_jsonArray){ + while (buf[0] != '{' && buf[0] != '\0') { + len++; + buf++; + } + if (buf[0] == '\0') + break; + } else { + while (isspace( buf[0] )){ + len++; + buf++; + } + if (buf[0] == '\0') + continue; + len += strlen( buf ); + } try { - BSONObj o = parseLine( buf ); - if ( _headerLine ) + BSONObj o; + if (_jsonArray){ + int jslen; + o = fromjson(buf, &jslen); + len += jslen; + buf += jslen; + } else { + o = parseLine( buf ); + } + + if ( _headerLine ){ _headerLine = false; - else - conn().insert( ns.c_str() , o ); + } else if (_doimport) { + bool doUpsert = _upsert; + BSONObjBuilder b; + if (_upsert){ + for (vector<string>::const_iterator it=_upsertFields.begin(), end=_upsertFields.end(); it!=end; ++it){ + BSONElement e = o.getFieldDotted(it->c_str()); + if (e.eoo()){ + doUpsert = false; + break; + } + b.appendAs(e, *it); + } + } + + if (doUpsert){ + conn().update(ns, Query(b.obj()), o, true); + } else { + conn().insert( ns.c_str() , o ); + } + } + + num++; } catch ( std::exception& e ){ cout << "exception:" << e.what() << endl; cout << buf << endl; errors++; + + if (hasParam("stopOnError") || _jsonArray) + break; } - num++; if ( pm.hit( len + 1 ) ){ cout << "\t\t\t" << num << "\t" << ( num / ( time(0) - start ) ) << "/second" << endl; } |