author    | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200
committer | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200
commit    | 5d342a758c6095b4d30aba0750b54f13b8916f51 (patch)
tree      | 762e9aa84781f5e3b96db2c02d356c29cf0217c0 /tools/import.cpp
parent    | cbe2d992e9cd1ea66af9fa91df006106775d3073 (diff)
download  | mongodb-5d342a758c6095b4d30aba0750b54f13b8916f51.tar.gz
Imported Upstream version 2.0.0
Diffstat (limited to 'tools/import.cpp')
-rw-r--r-- | tools/import.cpp | 327 |
1 files changed, 206 insertions, 121 deletions
diff --git a/tools/import.cpp b/tools/import.cpp
index 6b59bdc..16980b0 100644
--- a/tools/import.cpp
+++ b/tools/import.cpp
@@ -27,6 +27,7 @@
 #include <iostream>
 
 #include <boost/program_options.hpp>
+#include <boost/algorithm/string.hpp>
 
 using namespace mongo;
 
@@ -44,100 +45,215 @@ class Import : public Tool {
     bool _doimport;
     bool _jsonArray;
     vector<string> _upsertFields;
+    static const int BUF_SIZE = 1024 * 1024 * 4;
+
+    string trimWhitespace(const string& str) {
+        if (str.size() == 0) {
+            return str;
+        }
+        size_t begin = 0;
+        size_t end = str.size() - 1;
+        while (begin < str.size() && isspace(str[begin])) { ++begin; } // Finds index of first non-whitespace character
+        while (end > 0 && isspace(str[end])) { --end; } // Finds index of last non-whitespace character
+        return str.substr(begin, end - begin + 1);
+    }
+
+    void csvTokenizeRow(const string& row, vector<string>& tokens) {
+        bool inQuotes = false;
+        bool prevWasQuote = false;
+        bool tokenQuoted = false;
+        string curtoken = "";
+        for (string::const_iterator it = row.begin(); it != row.end(); ++it) {
+            char element = *it;
+            if (element == '"') {
+                if (!inQuotes) {
+                    inQuotes = true;
+                    tokenQuoted = true;
+                    curtoken = "";
+                } else {
+                    if (prevWasQuote) {
+                        curtoken += "\"";
+                        prevWasQuote = false;
+                    } else {
+                        prevWasQuote = true;
+                    }
+                }
+            } else {
+                if (inQuotes && prevWasQuote) {
+                    inQuotes = false;
+                    prevWasQuote = false;
+                    tokens.push_back(curtoken);
+                }
+
+                if (element == ',' && !inQuotes) {
+                    if (!tokenQuoted) { // If token was quoted, it's already been added
+                        tokens.push_back(trimWhitespace(curtoken));
+                    }
+                    curtoken = "";
+                    tokenQuoted = false;
+                } else {
+                    curtoken += element;
+                }
+            }
+        }
+        if (!tokenQuoted || (inQuotes && prevWasQuote)) {
+            tokens.push_back(trimWhitespace(curtoken));
+        }
+    }
 
     void _append( BSONObjBuilder& b , const string& fieldName , const string& data ) {
-        if ( b.appendAsNumber( fieldName , data ) )
+        if ( _ignoreBlanks && data.size() == 0 )
             return;
 
-        if ( _ignoreBlanks && data.size() == 0 )
+        if ( b.appendAsNumber( fieldName , data ) )
             return;
 
         // TODO: other types?
-        b.append( fieldName , data );
+        b.append ( fieldName , data );
+    }
+
+    /*
+     * Reads one line from in into buf.
+     * Returns the number of bytes that should be skipped - the caller should
+     * increment buf by this amount.
+     */
+    int getLine(istream* in, char* buf) {
+        if (_jsonArray) {
+            in->read(buf, BUF_SIZE);
+            uassert(13295, "JSONArray file too large", (in->rdstate() & ios_base::eofbit));
+            buf[ in->gcount() ] = '\0';
+        }
+        else {
+            in->getline( buf , BUF_SIZE );
+            log(1) << "got line:" << buf << endl;
+        }
+        uassert( 10263 , "unknown error reading file" ,
+                 (!(in->rdstate() & ios_base::badbit)) &&
+                 (!(in->rdstate() & ios_base::failbit) || (in->rdstate() & ios_base::eofbit)) );
+
+        int numBytesSkipped = 0;
+        if (strncmp("\xEF\xBB\xBF", buf, 3) == 0) { // UTF-8 BOM (notepad is stupid)
+            buf += 3;
+            numBytesSkipped += 3;
+        }
+
+        uassert(13289, "Invalid UTF8 character detected", isValidUTF8(buf));
+        return numBytesSkipped;
     }
 
-    BSONObj parseLine( char * line ) {
-        uassert(13289, "Invalid UTF8 character detected", isValidUTF8(line));
+    /*
+     * Parses a BSON object out of a JSON array.
+     * Returns number of bytes processed on success and -1 on failure.
+     */
+    int parseJSONArray(char* buf, BSONObj& o) {
+        int len = 0;
+        while (buf[0] != '{' && buf[0] != '\0') {
+            len++;
+            buf++;
+        }
+        if (buf[0] == '\0')
+            return -1;
+
+        int jslen;
+        o = fromjson(buf, &jslen);
+        len += jslen;
 
-        if ( _type == JSON ) {
+        return len;
+    }
+
+    /*
+     * Parses one object from the input file. This usually corresponds to one line in the input
+     * file, unless the file is a CSV and contains a newline within a quoted string entry.
+     * Returns a true if a BSONObj was successfully created and false if not.
+     */
+    bool parseRow(istream* in, BSONObj& o, int& numBytesRead) {
+        boost::scoped_array<char> buffer(new char[BUF_SIZE+2]);
+        char* line = buffer.get();
+
+        numBytesRead = getLine(in, line);
+        line += numBytesRead;
+
+        if (line[0] == '\0') {
+            return false;
+        }
+        numBytesRead += strlen( line );
+
+        if (_type == JSON) {
+            // Strip out trailing whitespace
             char * end = ( line + strlen( line ) ) - 1;
-            while ( isspace(*end) ) {
+            while ( end >= line && isspace(*end) ) {
                 *end = 0;
                 end--;
             }
-            return fromjson( line );
+            o = fromjson( line );
+            return true;
         }
 
-        BSONObjBuilder b;
+        vector<string> tokens;
+        if (_type == CSV) {
+            string row;
+            bool inside_quotes = false;
+            size_t last_quote = 0;
+            while (true) {
+                string lineStr(line);
+                // Deal with line breaks in quoted strings
+                last_quote = lineStr.find_first_of('"');
+                while (last_quote != string::npos) {
+                    inside_quotes = !inside_quotes;
+                    last_quote = lineStr.find_first_of('"', last_quote+1);
+                }
 
-        unsigned int pos=0;
-        while ( line[0] ) {
-            string name;
-            if ( pos < _fields.size() ) {
-                name = _fields[pos];
+                row.append(lineStr);
+
+                if (inside_quotes) {
+                    row.append("\n");
+                    int num = getLine(in, line);
+                    line += num;
+                    numBytesRead += num;
+
+                    uassert (15854, "CSV file ends while inside quoted field", line[0] != '\0');
+                    numBytesRead += strlen( line );
+                } else {
+                    break;
+                }
             }
-            else {
-                stringstream ss;
-                ss << "field" << pos;
-                name = ss.str();
+            // now 'row' is string corresponding to one row of the CSV file
+            // (which may span multiple lines) and represents one BSONObj
+            csvTokenizeRow(row, tokens);
+        }
+        else { // _type == TSV
+            while (line[0] != '\t' && isspace(line[0])) { // Strip leading whitespace, but not tabs
+                line++;
             }
-            pos++;
-
-            bool done = false;
-            string data;
-            char * end;
-            if ( _type == CSV && line[0] == '"' ) {
-                line++; //skip first '"'
-
-                while (true) {
-                    end = strchr( line , '"' );
-                    if (!end) {
-                        data += line;
-                        done = true;
-                        break;
-                    }
-                    else if (end[1] == '"') {
-                        // two '"'s get appended as one
-                        data.append(line, end-line+1); //include '"'
-                        line = end+2; //skip both '"'s
-                    }
-                    else if (end[-1] == '\\') {
-                        // "\\\"" gets appended as '"'
-                        data.append(line, end-line-1); //exclude '\\'
-                        data.append("\"");
-                        line = end+1; //skip the '"'
-                    }
-                    else {
-                        data.append(line, end-line);
-                        line = end+2; //skip '"' and ','
-                        break;
-                    }
-                }
+
+            boost::split(tokens, line, boost::is_any_of(_sep));
+        }
+
+        // Now that the row is tokenized, create a BSONObj out of it.
+        BSONObjBuilder b;
+        unsigned int pos=0;
+        for (vector<string>::iterator it = tokens.begin(); it != tokens.end(); ++it) {
+            string token = *it;
+            if ( _headerLine ) {
+                _fields.push_back(token);
             }
             else {
-                end = strstr( line , _sep );
-                if ( ! end ) {
-                    done = true;
-                    data = string( line );
+                string name;
+                if ( pos < _fields.size() ) {
+                    name = _fields[pos];
                 }
                 else {
-                    data = string( line , end - line );
-                    line = end+1;
+                    stringstream ss;
+                    ss << "field" << pos;
+                    name = ss.str();
                 }
-            }
+                pos++;
 
-            if ( _headerLine ) {
-                while ( isspace( data[0] ) )
-                    data = data.substr( 1 );
-                _fields.push_back( data );
+                _append( b , name , token );
             }
-            else
-                _append( b , name , data );
-
-            if ( done )
-                break;
         }
-        return b.obj();
+        o = b.obj();
+        return true;
     }
 
 public:
@@ -255,68 +371,37 @@ public:
             _jsonArray = true;
         }
 
-        int errors = 0;
-
-        int num = 0;
-
         time_t start = time(0);
-
         log(1) << "filesize: " << fileSize << endl;
         ProgressMeter pm( fileSize );
-        const int BUF_SIZE = 1024 * 1024 * 4;
-        boost::scoped_array<char> line(new char[BUF_SIZE+2]);
-        char * buf = line.get();
-        while ( _jsonArray || in->rdstate() == 0 ) {
-            if (_jsonArray) {
-                if (buf == line.get()) { //first pass
-                    in->read(buf, BUF_SIZE);
-                    uassert(13295, "JSONArray file too large", (in->rdstate() & ios_base::eofbit));
-                    buf[ in->gcount() ] = '\0';
-                }
-            }
-            else {
-                buf = line.get();
-                in->getline( buf , BUF_SIZE );
-                log(1) << "got line:" << buf << endl;
-            }
-            uassert( 10263 , "unknown error reading file" ,
-                     (!(in->rdstate() & ios_base::badbit)) &&
-                     (!(in->rdstate() & ios_base::failbit) || (in->rdstate() & ios_base::eofbit)) );
-
-            int len = 0;
-            if (strncmp("\xEF\xBB\xBF", buf, 3) == 0) { // UTF-8 BOM (notepad is stupid)
-                buf += 3;
-                len += 3;
-            }
-
-            if (_jsonArray) {
-                while (buf[0] != '{' && buf[0] != '\0') {
-                    len++;
-                    buf++;
-                }
-                if (buf[0] == '\0')
-                    break;
-            }
-            else {
-                while (isspace( buf[0] )) {
-                    len++;
-                    buf++;
-                }
-                if (buf[0] == '\0')
-                    continue;
-                len += strlen( buf );
-            }
+        int num = 0;
+        int errors = 0;
+        int len = 0;
+        // buffer and line are only used when parsing a jsonArray
+        boost::scoped_array<char> buffer(new char[BUF_SIZE+2]);
+        char* line = buffer.get();
 
+        while ( _jsonArray || in->rdstate() == 0 ) {
             try {
                 BSONObj o;
                 if (_jsonArray) {
-                    int jslen;
-                    o = fromjson(buf, &jslen);
-                    len += jslen;
-                    buf += jslen;
+                    int bytesProcessed = 0;
+                    if (line == buffer.get()) { // Only read on first pass - the whole array must be on one line.
+                        bytesProcessed = getLine(in, line);
+                        line += bytesProcessed;
+                        len += bytesProcessed;
+                    }
+                    if ((bytesProcessed = parseJSONArray(line, o)) < 0) {
+                        len += bytesProcessed;
+                        break;
+                    }
+                    len += bytesProcessed;
+                    line += len;
                 }
                 else {
-                    o = parseLine( buf );
+                    if (!parseRow(in, o, len)) {
+                        continue;
+                    }
                 }
 
                 if ( _headerLine ) {
@@ -348,7 +433,7 @@ public:
             }
             catch ( std::exception& e ) {
                 cout << "exception:" << e.what() << endl;
-                cout << buf << endl;
+                cout << line << endl;
                 errors++;
 
                 if (hasParam("stopOnError") || _jsonArray)
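For readers skimming the patch, the quoted-field rules implemented by the new trimWhitespace/csvTokenizeRow pair (commas inside quotes do not split a field, a doubled quote inside a quoted field collapses to a literal quote, unquoted tokens are whitespace-trimmed) can be exercised outside of mongoimport. The sketch below lifts that tokenizing logic into a standalone program; the two functions mirror the ones added by the patch, while the main() harness, the sample row, and the expected-output comment are illustrative additions, not part of the patch.

```cpp
// Standalone sketch of the CSV tokenizing rules added by this patch.
// The sample input and main() are for illustration only.
#include <cctype>
#include <iostream>
#include <string>
#include <vector>

static std::string trimWhitespace(const std::string& str) {
    if (str.empty())
        return str;
    size_t begin = 0;
    size_t end = str.size() - 1;
    while (begin < str.size() && isspace(static_cast<unsigned char>(str[begin]))) ++begin;
    while (end > 0 && isspace(static_cast<unsigned char>(str[end]))) --end;
    return str.substr(begin, end - begin + 1);
}

static void csvTokenizeRow(const std::string& row, std::vector<std::string>& tokens) {
    bool inQuotes = false;
    bool prevWasQuote = false;
    bool tokenQuoted = false;
    std::string curtoken;
    for (std::string::const_iterator it = row.begin(); it != row.end(); ++it) {
        char element = *it;
        if (element == '"') {
            if (!inQuotes) {                // opening quote starts a quoted token
                inQuotes = true;
                tokenQuoted = true;
                curtoken = "";
            } else if (prevWasQuote) {      // "" inside a quoted field -> literal "
                curtoken += "\"";
                prevWasQuote = false;
            } else {
                prevWasQuote = true;        // may turn out to be the closing quote
            }
        } else {
            if (inQuotes && prevWasQuote) { // the previous quote really closed the field
                inQuotes = false;
                prevWasQuote = false;
                tokens.push_back(curtoken);
            }
            if (element == ',' && !inQuotes) {
                if (!tokenQuoted)           // a quoted token was already pushed above
                    tokens.push_back(trimWhitespace(curtoken));
                curtoken = "";
                tokenQuoted = false;
            } else {
                curtoken += element;
            }
        }
    }
    if (!tokenQuoted || (inQuotes && prevWasQuote))
        tokens.push_back(trimWhitespace(curtoken));
}

int main() {
    std::vector<std::string> tokens;
    csvTokenizeRow("plain,  padded  ,\"has, comma\",\"say \"\"hi\"\"\"", tokens);
    for (size_t i = 0; i < tokens.size(); ++i)
        std::cout << i << ": [" << tokens[i] << "]\n";
    // expected tokens: [plain] [padded] [has, comma] [say "hi"]
    return 0;
}
```

Compiled with any C++98-or-later compiler, this is expected to print four tokens, with the padded field trimmed, the embedded comma kept inside one field, and the doubled quotes collapsed to a single quote character.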