Diffstat (limited to 'tools/import.cpp')
-rw-r--r--  tools/import.cpp | 133
1 file changed, 111 insertions(+), 22 deletions(-)
diff --git a/tools/import.cpp b/tools/import.cpp
index e34e73d..6335e59 100644
--- a/tools/import.cpp
+++ b/tools/import.cpp
@@ -16,11 +16,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "client/dbclient.h"
#include "db/json.h"
#include "tool.h"
+#include "../util/text.h"
#include <fstream>
#include <iostream>
@@ -39,6 +40,10 @@ class Import : public Tool {
const char * _sep;
bool _ignoreBlanks;
bool _headerLine;
+ bool _upsert;
+ bool _doimport;
+ bool _jsonArray;
+ vector<string> _upsertFields;
void _append( BSONObjBuilder& b , const string& fieldName , const string& data ){
if ( b.appendAsNumber( fieldName , data ) )
@@ -48,10 +53,12 @@ class Import : public Tool {
return;
// TODO: other types?
- b.append( fieldName.c_str() , data );
+ b.append( fieldName , data );
}
BSONObj parseLine( char * line ){
+ uassert(13289, "Invalid UTF8 character detected", isValidUTF8(line));
+
if ( _type == JSON ){
char * end = ( line + strlen( line ) ) - 1;
while ( isspace(*end) ){
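The uassert above relies on isValidUTF8() from the newly included ../util/text.h. As a rough illustration only (a minimal standalone sketch with a hypothetical name, not the actual helper), a structural UTF-8 check looks like this:

    // Minimal sketch of a UTF-8 structural check, assuming standard encoding
    // rules; the real helper is isValidUTF8() in ../util/text.h. This version
    // only validates lead/continuation byte structure and does not reject
    // overlong encodings or surrogate code points.
    bool looksLikeValidUTF8( const char * s ){
        const unsigned char * p = (const unsigned char *)s;
        while ( *p ){
            int extra;
            if ( *p < 0x80 )                extra = 0; // single-byte ASCII
            else if ( (*p & 0xE0) == 0xC0 ) extra = 1; // 2-byte sequence
            else if ( (*p & 0xF0) == 0xE0 ) extra = 2; // 3-byte sequence
            else if ( (*p & 0xF8) == 0xF0 ) extra = 3; // 4-byte sequence
            else return false;                         // invalid lead byte
            for ( p++; extra > 0; extra--, p++ )
                if ( (*p & 0xC0) != 0x80 )             // must be 10xxxxxx
                    return false;
        }
        return true;
    }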
@@ -137,11 +144,21 @@ public:
("file",po::value<string>() , "file to import from; if not specified stdin is used" )
("drop", "drop collection first " )
("headerline","CSV,TSV only - use first line as headers")
+ ("upsert", "insert or update objects that already exist" )
+ ("upsertFields", po::value<string>(), "comma-separated fields for the query part of the upsert. You should make sure this is indexed" )
+ ("stopOnError", "stop importing at first error rather than continuing" )
+ ("jsonArray", "load a json array, not one item per line. Currently limited to 4MB." )
+ ;
+ add_hidden_options()
+ ("noimport", "don't actually import. useful for benchmarking parser" )
;
addPositionArg( "file" , 1 );
_type = JSON;
_ignoreBlanks = false;
_headerLine = false;
+ _upsert = false;
+ _doimport = true;
+ _jsonArray = false;
}
int run(){
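add_options() and add_hidden_options() here build boost::program_options descriptions; hidden options are parsed but left out of the --help listing. A self-contained sketch of that visible/hidden split (hypothetical option set, outside the Tool framework):

    // Sketch: visible vs. hidden boost::program_options sets, mirroring the
    // add_options()/add_hidden_options() pattern used by the Tool base class.
    #include <boost/program_options.hpp>
    #include <iostream>
    #include <string>

    namespace po = boost::program_options;

    int main( int argc , char ** argv ){
        po::options_description visible( "options" );
        visible.add_options()
            ( "upsert" , "insert or update objects that already exist" )
            ( "upsertFields" , po::value<std::string>() , "comma-separated upsert query fields" );

        po::options_description hidden;             // omitted from --help output
        hidden.add_options()
            ( "noimport" , "parse only, insert nothing" );

        po::options_description all;
        all.add( visible ).add( hidden );           // parse both sets together

        po::variables_map vm;
        po::store( po::parse_command_line( argc , argv , all ) , vm );
        po::notify( vm );

        if ( vm.count( "noimport" ) )
            std::cout << "benchmark mode: parsing only" << std::endl;
        std::cout << visible << std::endl;          // help text shows only the visible set
        return 0;
    }

This keeps debugging switches such as --noimport usable without advertising them to end users.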
@@ -183,6 +200,21 @@ public:
_ignoreBlanks = true;
}
+ if ( hasParam( "upsert" ) ){
+ _upsert = true;
+
+ string uf = getParam("upsertFields");
+ if (uf.empty()){
+ _upsertFields.push_back("_id");
+ } else {
+ StringSplitter(uf.c_str(), ",").split(_upsertFields);
+ }
+ }
+
+ if ( hasParam( "noimport" ) ){
+ _doimport = false;
+ }
+
if ( hasParam( "type" ) ){
string type = getParam( "type" );
if ( type == "json" )
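StringSplitter, also from ../util/text.h, tokenizes the --upsertFields value on commas. A plain-STL equivalent of that split, written as a hypothetical standalone helper:

    // Rough STL-only equivalent of StringSplitter(uf.c_str(), ",").split(fields).
    #include <string>
    #include <vector>

    void splitOnComma( const std::string& s , std::vector<std::string>& out ){
        std::string::size_type start = 0;
        while ( true ){
            std::string::size_type comma = s.find( ',' , start );
            if ( comma == std::string::npos ){
                out.push_back( s.substr( start ) );             // last (or only) field
                return;
            }
            out.push_back( s.substr( start , comma - start ) );
            start = comma + 1;
        }
    }

    // e.g. splitOnComma("name,address.zip", fields) yields { "name", "address.zip" };
    // dotted names are kept intact so they can be resolved with getFieldDotted() later.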
@@ -207,6 +239,10 @@ public:
needFields();
}
+ if (_type == JSON && hasParam("jsonArray")){
+ _jsonArray = true;
+ }
+
int errors = 0;
int num = 0;
@@ -217,38 +253,91 @@ public:
ProgressMeter pm( fileSize );
const int BUF_SIZE = 1024 * 1024 * 4;
boost::scoped_array<char> line(new char[BUF_SIZE+2]);
- while ( *in ){
- char * buf = line.get();
- in->getline( buf , BUF_SIZE );
- uassert( 10263 , "unknown error reading file" , ( in->rdstate() & ios_base::badbit ) == 0 );
- log(1) << "got line:" << buf << endl;
+ char * buf = line.get();
+ while ( _jsonArray || in->rdstate() == 0 ){
+ if (_jsonArray){
+ if (buf == line.get()){ //first pass
+ in->read(buf, BUF_SIZE);
+ uassert(13295, "JSONArray file too large", (in->rdstate() & ios_base::eofbit));
+ buf[ in->gcount() ] = '\0';
+ }
+ } else {
+ buf = line.get();
+ in->getline( buf , BUF_SIZE );
+ log(1) << "got line:" << buf << endl;
+ }
+ uassert( 10263 , "unknown error reading file" ,
+ (!(in->rdstate() & ios_base::badbit)) &&
+ (!(in->rdstate() & ios_base::failbit) || (in->rdstate() & ios_base::eofbit)) );
- while( isspace( buf[0] ) ) buf++;
-
- int len = strlen( buf );
- if ( ! len )
- continue;
-
- buf[len+1] = 0;
+ int len = 0;
+ if (strncmp("\xEF\xBB\xBF", buf, 3) == 0){ // UTF-8 BOM (notepad is stupid)
+ buf += 3;
+ len += 3;
+ }
- if ( in->rdstate() == ios_base::eofbit )
- break;
- assert( in->rdstate() == 0 );
+ if (_jsonArray){
+ while (buf[0] != '{' && buf[0] != '\0') {
+ len++;
+ buf++;
+ }
+ if (buf[0] == '\0')
+ break;
+ } else {
+ while (isspace( buf[0] )){
+ len++;
+ buf++;
+ }
+ if (buf[0] == '\0')
+ continue;
+ len += strlen( buf );
+ }
try {
- BSONObj o = parseLine( buf );
- if ( _headerLine )
+ BSONObj o;
+ if (_jsonArray){
+ int jslen;
+ o = fromjson(buf, &jslen);
+ len += jslen;
+ buf += jslen;
+ } else {
+ o = parseLine( buf );
+ }
+
+ if ( _headerLine ){
_headerLine = false;
- else
- conn().insert( ns.c_str() , o );
+ } else if (_doimport) {
+ bool doUpsert = _upsert;
+ BSONObjBuilder b;
+ if (_upsert){
+ for (vector<string>::const_iterator it=_upsertFields.begin(), end=_upsertFields.end(); it!=end; ++it){
+ BSONElement e = o.getFieldDotted(it->c_str());
+ if (e.eoo()){
+ doUpsert = false;
+ break;
+ }
+ b.appendAs(e, *it);
+ }
+ }
+
+ if (doUpsert){
+ conn().update(ns, Query(b.obj()), o, true);
+ } else {
+ conn().insert( ns.c_str() , o );
+ }
+ }
+
+ num++;
}
catch ( std::exception& e ){
cout << "exception:" << e.what() << endl;
cout << buf << endl;
errors++;
+
+ if (hasParam("stopOnError") || _jsonArray)
+ break;
}
- num++;
if ( pm.hit( len + 1 ) ){
cout << "\t\t\t" << num << "\t" << ( num / ( time(0) - start ) ) << "/second" << endl;
}
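Distilled from the import loop above: for each --upsertFields name, the matching (possibly dotted) element is copied out of the imported object to form the upsert query; if any field is missing, the code falls back to a plain insert. A condensed sketch of that path against the same C++ driver calls (hypothetical free function):

    // Condensed sketch of the upsert path: build the query object from the
    // configured fields, fall back to a plain insert if any field is absent.
    #include "client/dbclient.h"

    using namespace mongo;

    void importOne( DBClientConnection& conn , const string& ns ,
                    const BSONObj& o , const vector<string>& upsertFields ){
        BSONObjBuilder b;
        for ( vector<string>::const_iterator it = upsertFields.begin();
              it != upsertFields.end(); ++it ){
            BSONElement e = o.getFieldDotted( it->c_str() );
            if ( e.eoo() ){                  // field absent: cannot build the query
                conn.insert( ns , o );
                return;
            }
            b.appendAs( e , *it );           // keep the dotted name as the query key
        }
        conn.update( ns , Query( b.obj() ) , o , /*upsert=*/true );
    }

As the --upsertFields help text notes, the query fields should be indexed; without an index each imported document costs a full collection scan on the server.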