// balancer_policy.cpp /** * Copyright (C) 2010 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "pch.h" #include "config.h" #include "../client/dbclient.h" #include "../util/stringutils.h" #include "../util/unittest.h" #include "balancer_policy.h" namespace mongo { // limits map fields BSONField LimitsFields::currSize( "currSize" ); BSONField LimitsFields::hasOpsQueued( "hasOpsQueued" ); BalancerPolicy::ChunkInfo* BalancerPolicy::balance( const string& ns, const ShardToLimitsMap& shardToLimitsMap, const ShardToChunksMap& shardToChunksMap, int balancedLastTime ) { pair min("",numeric_limits::max()); pair max("",0); vector drainingShards; bool maxOpsQueued = false; for (ShardToChunksIter i = shardToChunksMap.begin(); i!=shardToChunksMap.end(); ++i ) { // Find whether this shard's capacity or availability are exhausted const string& shard = i->first; BSONObj shardLimits; ShardToLimitsIter it = shardToLimitsMap.find( shard ); if ( it != shardToLimitsMap.end() ) shardLimits = it->second; const bool maxedOut = isSizeMaxed( shardLimits ); const bool draining = isDraining( shardLimits ); const bool opsQueued = hasOpsQueued( shardLimits ); // Is this shard a better chunk receiver then the current one? // Shards that would be bad receiver candidates: // + maxed out shards // + draining shards // + shards with operations queued for writeback const unsigned size = i->second.size(); if ( ! maxedOut && ! draining && ! opsQueued ) { if ( size < min.second ) { min = make_pair( shard , size ); } } else if ( opsQueued ) { LOG(1) << "won't send a chunk to: " << shard << " because it has ops queued" << endl; } else if ( maxedOut ) { LOG(1) << "won't send a chunk to: " << shard << " because it is maxedOut" << endl; } // Check whether this shard is a better chunk donor then the current one. // Draining shards take a lower priority than overloaded shards. if ( size > max.second ) { max = make_pair( shard , size ); maxOpsQueued = opsQueued; } if ( draining && (size > 0)) { drainingShards.push_back( shard ); } } // If there is no candidate chunk receiver -- they may have all been maxed out, // draining, ... -- there's not much that the policy can do. if ( min.second == numeric_limits::max() ) { log() << "no available shards to take chunks" << endl; return NULL; } if ( maxOpsQueued ) { log() << "biggest shard " << max.first << " has unprocessed writebacks, waiting for completion of migrate" << endl; return NULL; } LOG(1) << "collection : " << ns << endl; LOG(1) << "donor : " << max.second << " chunks on " << max.first << endl; LOG(1) << "receiver : " << min.second << " chunks on " << min.first << endl; if ( ! drainingShards.empty() ) { string drainingStr; joinStringDelim( drainingShards, &drainingStr, ',' ); LOG(1) << "draining : " << ! drainingShards.empty() << "(" << drainingShards.size() << ")" << endl; } // Solving imbalances takes a higher priority than draining shards. Many shards can // be draining at once but we choose only one of them to cater to per round. const int imbalance = max.second - min.second; const int threshold = balancedLastTime ? 2 : 8; string from, to; if ( imbalance >= threshold ) { from = max.first; to = min.first; } else if ( ! drainingShards.empty() ) { from = drainingShards[ rand() % drainingShards.size() ]; to = min.first; } else { // Everything is balanced here! return NULL; } const vector& chunksFrom = shardToChunksMap.find( from )->second; const vector& chunksTo = shardToChunksMap.find( to )->second; BSONObj chunkToMove = pickChunk( chunksFrom , chunksTo ); log() << "chose [" << from << "] to [" << to << "] " << chunkToMove << endl; return new ChunkInfo( ns, to, from, chunkToMove ); } BSONObj BalancerPolicy::pickChunk( const vector& from, const vector& to ) { // It is possible for a donor ('from') shard to have less chunks than a receiver one ('to') // if the donor is in draining mode. if ( to.size() == 0 ) return from[0]; if ( from[0]["min"].Obj().woCompare( to[to.size()-1]["max"].Obj() , BSONObj() , false ) == 0 ) return from[0]; if ( from[from.size()-1]["max"].Obj().woCompare( to[0]["min"].Obj() , BSONObj() , false ) == 0 ) return from[from.size()-1]; return from[0]; } bool BalancerPolicy::isSizeMaxed( BSONObj limits ) { // If there's no limit information for the shard, assume it can be a chunk receiver // (i.e., there's not bound on space utilization) if ( limits.isEmpty() ) { return false; } long long maxUsage = limits[ ShardFields::maxSize.name() ].Long(); if ( maxUsage == 0 ) { return false; } long long currUsage = limits[ LimitsFields::currSize.name() ].Long(); if ( currUsage < maxUsage ) { return false; } return true; } bool BalancerPolicy::isDraining( BSONObj limits ) { BSONElement draining = limits[ ShardFields::draining.name() ]; if ( draining.eoo() || ! draining.trueValue() ) { return false; } return true; } bool BalancerPolicy::hasOpsQueued( BSONObj limits ) { BSONElement opsQueued = limits[ LimitsFields::hasOpsQueued.name() ]; if ( opsQueued.eoo() || ! opsQueued.trueValue() ) { return false; } return true; } } // namespace mongo