Dc10

Changing the HBase default LoadBalancer.

Posted by jmspaggi on 10/12/12  ~  Posted in: Uncategorized

Having a VERY heterogeneous cluster (a.k.a. a Frankencluster), the default HBase load balancer, even if very efficient, may not be the best fit for my needs.

By default, HBase balances the regions evenly over all the regionservers. However, on my cluster I have P4 CPUs, a few CPUs with 2 cores, some with 4 and one with 8. Even between computers with the same number of cores, performance varies a lot.

I looked for a way to better balance the load between all my nodes, and the best approach I have found so far is to write my own custom load balancer and to increase the odds of having tasks assigned locally (mapred.fairscheduler.locality.delay).
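
The locality part is just a Hadoop FairScheduler setting; as an illustration (the 30000 ms value is only an example, pick what makes sense for your cluster, and my understanding is that it goes in mapred-site.xml):

<property>
  <name>mapred.fairscheduler.locality.delay</name>
  <value>30000</value>
</property>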

Before those modifications, about 20% of the tasks were running with non-local data. After balancing with the LoadBalancer below and updating the locality delay, 100% of the tasks were running locally! On the performance side, I have only tested with RowCounter so far, but with this MR job it initially took over 10 minutes to count 101M lines, and it is under 5 minutes with the new balancing.

Before going further, there are a few things you need to understand about my cluster and the balancing. First, take a look at Mike’s message on the HBase list: http://www.mail-archive.com/user@hbase.apache.org/msg20722.html The default HBase balancing and Hadoop will do all the work for you on a “normal” production cluster. My situation here is very specific. It’s a dev cluster, not a production one, so I have more flexibility to play with it and re-arrange the regions the way I want. I can also stop/start it as often as I want. On a production cluster, you might be better off simply removing the slowest nodes or upgrading them.

Also, regarding the code below, please note that it’s just a POC. There are most probably many things missing (just take a look at the DefaultLoadBalancer and you will see what I mean...). A full solution would read the JMX results over time and slowly adjust the load to balance it over all the nodes. My solution “simply” takes the performance index you provide and uses it to share the load. In this version, the performance index is hardcoded, which means I need to redeploy my code each time I add a node or change something. But you could also read it from a file, and ask all the regionservers to populate that file. That way you would have a dynamic balancer. Also, with the hardcoded version, if you add a RegionServer to your cluster, no regions will be assigned to it until it is added to the class.
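
As a minimal sketch of that file-based variant (the file name and the properties format are my own assumptions, not something HBase provides), the hardcoded map could be replaced by a small loader along these lines:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PerformanceIndexLoader
{
	// Hypothetical helper: each line of the file is expected to look like "node1=2844".
	public static Map<String, Integer> load(String fileName) throws IOException
	{
		Properties props = new Properties();
		FileInputStream in = new FileInputStream(fileName);
		try
		{
			props.load(in);
		} finally
		{
			in.close();
		}

		Map<String, Integer> performances = new HashMap<String, Integer>();
		for (String host : props.stringPropertyNames())
		{
			// The value is whatever performance index your benchmark produced for that host.
			performances.put(host, Integer.parseInt(props.getProperty(host).trim()));
		}
		return performances;
	}
}

balanceCluster could then call this loader at the beginning of each run instead of using the static map, so adding a RegionServer would only require a new line in the file.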

One last thing. Don’t use the CPU speed as the performance index. There are many other limitations on your regionserver: disk access, memory, etc. For my own tests, I have based the performance index on the time each regionserver takes to build the SAME Linux kernel with the SAME configuration file. Another good comparison would be to build HBase on all the regionservers, run the HBase tests on them, and compare the nodes based on the time they take for that.

That being said, and now that the goal of this hack is clear, here is the code and some extracts of the results.

Most of this code is “simply” a cut & paste from the default load balancer. Only the balanceCluster method has been modified.
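
To try it, my understanding is that the compiled class only needs to be on the master's classpath and to be declared in hbase-site.xml through the pluggable balancer property (double-check the property name against your HBase version):

<property>
  <name>hbase.master.loadbalancer.class</name>
  <value>RegionServerPerformanceBalancer</value>
</property>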

Example of balancing with this code:

Region Server            Region Count
http://node4:60030/      15
http://phenom:60030/     20
http://node5:60030/       7
http://node2:60030/       8
http://node3:60030/      65
http://node1:60030/       7
http://node6:60030/       7
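
Assuming this run used the hardcoded performance indexes shown in the code, the numbers match the formula used in balanceCluster: there are 129 regions in total and the indexes sum to 50199, so node3 (index 25000) is expected to hold round(129 * 25000 / 50199) = 64 regions, and the single region left over once every server has received its share is handed to the most powerful server, which is node3 again, hence the 65 above.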

(You will need to ignore the last line of the code below.)
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.common.base.Joiner;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Sets;

public class RegionServerPerformanceBalancer implements LoadBalancer
{

	protected static Map<String, Integer> performances = new HashMap<String, Integer>()
	{
		{
			put("node1", 2844);
			put("node2", 3000);
			put("node3", 25000);
			put("node4", 5789);
			put("node5", 2820);
			put("node6", 2894);
			put("phenom", 7852);
		}
	};

	private static final Log LOG = LogFactory.getLog(LoadBalancer.class);
	private static final Random RANDOM = new Random(System.currentTimeMillis());
	private Configuration config;
	private ClusterStatus status;
	private MasterServices services;

	public void setClusterStatus(ClusterStatus st)
	{
		this.status = st;
	}

	public void setMasterServices(MasterServices masterServices)
	{
		this.services = masterServices;
	}

	@Override
	public void setConf(Configuration conf)
	{
		this.config = conf;
	}

	@Override
	public Configuration getConf()
	{
		return this.config;
	}

	/**
	 * Generate a global load balancing plan according to the specified map of
	 * server information to the most loaded regions of each server.
	 * 
	 * Each server will be assigned regions based on its performance index.
	 * Performance indexes are passed to the balancer and can be changed at any
	 * time. If at the end some regions still cannot be assigned to an
	 * under-loaded server, they are moved to the most powerful server.
	 * 
	 * @param clusterState
	 *            Map of regionservers and their load/region information to a
	 *            list of their most loaded regions
	 * @return a list of regions to be moved, including source and destination,
	 *         or null if cluster is already balanced
	 */
	public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)
	{
		LOG.debug("**************** Balancing ************");

		LOG.info(performances.toString());

		int numServers = clusterState.size();
		if (numServers == 0)
		{
			LOG.debug("numServers=0 so skipping load balancing");
			return null;
		}

		List<RegionPlan> result = new ArrayList<RegionPlan>();

		// Compute total performance
		int totalPerformances = 0;
		for (int performance : performances.values())
			totalPerformances += performance;

		int numRegions = 0;

		byte[] tableName = null;
		// Count the total number of regions we have on this table, and capture
		// the table name.
		for (Map.Entry<ServerName, List<HRegionInfo>> regionsInServer : clusterState
				.entrySet())
		{
			List<HRegionInfo> regions = regionsInServer.getValue();

			for (HRegionInfo region : regions)
			{
				if (tableName == null)
					tableName = region.getTableName();
				numRegions++;
			}
		}

		LOG.info("LoadBalancing " + Bytes.toString(tableName) + " (" + numRegions
				+ " regions)");

		// regionsToMove contains all the regions that need to be removed from
		// overloaded servers.
		Map<HRegionInfo, ServerName> regionsToMove = new HashMap<HRegionInfo, ServerName>();

		// ServersToLoad contains the list of under-loaded servers with the
		// number of extra regions they can take.
		Map<ServerName, Integer> serversToLoad = new HashMap<ServerName, Integer>();

		// We need to keep track of the most powerful server to assign it all
		// the remaining regions (should be 1 max) at the end of the process.
		// This is not strictly required. Worst case, one region will stay where
		// it is.
		ServerName bigestServerName = null;
		int bigestServerLoad = 0;

		for (Map.Entry<ServerName, List<HRegionInfo>> entryServer : clusterState
				.entrySet())
		{
			ServerName serverName = entryServer.getKey();
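			// Expected load = total regions * (this server's performance index / sum of all indexes),
			// e.g. round(129 * 25000 / 50199) = 64 regions for node3 with the hardcoded indexes above.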
			int expectedLoad = Math
					.round((float) numRegions
							* ((float) performances.get(serverName.getHostname()) / (float) totalPerformances));
			if (expectedLoad >= bigestServerLoad)
			{
				bigestServerName = serverName;
				bigestServerLoad = expectedLoad;
			}

			LOG.debug("For server " + serverName + " and table "
					+ Bytes.toString(tableName) + " we have "
					+ entryServer.getValue().size()
					+ " regions asigned and we should have " + expectedLoad);
			// Here we have 3 options.
			// 1) The server is under-loaded
			// 2) The server is over-loaded
			// 3) The server has the right number of regions. (Do nothing)
			if (expectedLoad > entryServer.getValue().size()) 
			{
				// Add this server to the list of servers that can take more load.
				serversToLoad.put(serverName, expectedLoad
						- entryServer.getValue().size());
			} else if (expectedLoad < entryServer.getValue().size()) 
			{
				// Put the extra regions into the moving queue. We will find a
				// target for them later.
				for (int i = expectedLoad; i < entryServer.getValue().size(); i++)
				{
					regionsToMove.put(entryServer.getValue().get(i), serverName);
				}
			}
		}

		LOG.debug("Most powerfull server is " + bigestServerName);

		LOG.debug("There is int total " + regionsToMove.size() + " regions to move.");
		int places = 0;
		for (Map.Entry<ServerName, Integer> serverToLoad : serversToLoad.entrySet())
		{
			places += serverToLoad.getValue();
			LOG.debug("Need to move " + serverToLoad.getValue() + " resiong on "
					+ serverToLoad.getKey());
		}
		LOG.debug("Total is " + places + " regions to be moved");

		for (Map.Entry<HRegionInfo, ServerName> regionToMoveEntry : regionsToMove
				.entrySet())
		{
			ServerName destServerName = null;
			// First, do that only if there is any room to move a region.
			if (serversToLoad.size() > 0)
			{
				// Find a place where to move the region.
				int serverIndex = (int) Math.floor(Math.random() * serversToLoad.size());

				// Get the destination server and remove its acceptable load by
				// 1.
				destServerName = ((Map.Entry<ServerName, Integer>) (serversToLoad
						.entrySet().toArray()[serverIndex])).getKey();
				if (serversToLoad.get(destServerName) == 1)
					serversToLoad.remove(destServerName);
				else
					serversToLoad.put(destServerName,
							serversToLoad.get(destServerName) - 1);
			} else
			{
				// If we don't have any available server to move this region,
				// simply move it to the most powerful server
				destServerName = bigestServerName;
			}

			if (!destServerName.equals(regionToMoveEntry.getValue()))
			{
				RegionPlan regionPlan = new RegionPlan(regionToMoveEntry.getKey(),
						regionToMoveEntry.getValue(), destServerName);
				LOG.debug("Moving " + regionToMoveEntry.getKey().getRegionNameAsString()
						+ " from " + regionToMoveEntry.getValue() + " to "
						+ destServerName);
				result.add(regionPlan);
			}
		}

		LOG.debug(result.toString());

		LOG.debug("**************** Fin ******************");

		return result;
	}

	/**
	 * 
	 * Generates a bulk assignment plan to be used on cluster startup using a
	 * simple round-robin assignment.
	 * <p>
	 * Takes a list of all the regions and all the servers in the cluster and
	 * returns a map of each server to the regions that it should be assigned.
	 * <p>
	 * Currently implemented as a round-robin assignment. Same invariant as load
	 * balancing, all servers holding floor(avg) or ceiling(avg).
	 * 
	 * TODO: Use block locations from HDFS to place regions with their blocks
	 * 
	 * @param regions
	 *            all regions
	 * @param servers
	 *            all servers
	 * @return map of server to the regions it should take, or null if no
	 *         assignment is possible (ie. no regions or no servers)
	 */
	public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
			List<HRegionInfo> regions, List<ServerName> servers)
	{
		if (regions.isEmpty() || servers.isEmpty())
		{
			return null;
		}
		Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
		int numRegions = regions.size();
		int numServers = servers.size();
		int max = (int) Math.ceil((float) numRegions / numServers);
		int serverIdx = 0;
		if (numServers > 1)
		{
			serverIdx = RANDOM.nextInt(numServers);
		}
		int regionIdx = 0;
		for (int j = 0; j < numServers; j++)
		{
			ServerName server = servers.get((j + serverIdx) % numServers);
			List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
			for (int i = regionIdx; i < numRegions; i += numServers)
			{
				serverRegions.add(regions.get(i % numRegions));
			}
			assignments.put(server, serverRegions);
			regionIdx++;
		}
		return assignments;
	}

	/**
	 * Generates a bulk assignment startup plan, attempting to reuse the
	 * existing assignment information from META, but adjusting for the
	 * specified list of available/online servers available for assignment.
	 * <p>
	 * Takes a map of all regions to their existing assignment from META. Also
	 * takes a list of online servers for regions to be assigned to. Attempts to
	 * retain all assignment, so in some instances initial assignment will not
	 * be completely balanced.
	 * <p>
	 * Any leftover regions without an existing server to be assigned to will be
	 * assigned randomly to available servers.
	 * 
	 * @param regions
	 *            regions and existing assignment from meta
	 * @param servers
	 *            available servers
	 * @return map of servers and regions to be assigned to them
	 */
	public Map<ServerName, List<HRegionInfo>> retainAssignment(
			Map<HRegionInfo, ServerName> regions, List<ServerName> servers)
	{
		// Group all of the old assignments by their hostname.
		// We can't group directly by ServerName since the servers all have
		// new start-codes.

		// Group the servers by their hostname. It's possible we have multiple
		// servers on the same host on different ports.
		ArrayListMultimap<String, ServerName> serversByHostname = ArrayListMultimap
				.create();
		for (ServerName server : servers)
		{
			serversByHostname.put(server.getHostname(), server);
		}

		// Now come up with new assignments
		Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();

		for (ServerName server : servers)
		{
			assignments.put(server, new ArrayList<HRegionInfo>());
		}

		// Collection of the hostnames that used to have regions
		// assigned, but for which we no longer have any RS running
		// after the cluster restart.
		Set<String> oldHostsNoLongerPresent = Sets.newTreeSet();

		int numRandomAssignments = 0;
		int numRetainedAssigments = 0;
		for (Map.Entry<HRegionInfo, ServerName> entry : regions.entrySet())
		{
			HRegionInfo region = entry.getKey();
			ServerName oldServerName = entry.getValue();
			List<ServerName> localServers = new ArrayList<ServerName>();
			if (oldServerName != null)
			{
				localServers = serversByHostname.get(oldServerName.getHostname());
			}
			if (localServers.isEmpty())
			{
				// No servers on the new cluster match up with this hostname,
				// assign randomly.
				ServerName randomServer = servers.get(RANDOM.nextInt(servers.size()));
				assignments.get(randomServer).add(region);
				numRandomAssignments++;
				if (oldServerName != null)
					oldHostsNoLongerPresent.add(oldServerName.getHostname());
			} else if (localServers.size() == 1)
			{
				// the usual case - one new server on same host
				assignments.get(localServers.get(0)).add(region);
				numRetainedAssigments++;
			} else
			{
				// multiple new servers in the cluster on this same host
				int size = localServers.size();
				ServerName target = localServers.get(RANDOM.nextInt(size));
				assignments.get(target).add(region);
				numRetainedAssigments++;
			}
		}

		String randomAssignMsg = "";
		if (numRandomAssignments > 0)
		{
			randomAssignMsg = numRandomAssignments + " regions were assigned "
					+ "to random hosts, since the old hosts for these regions are no "
					+ "longer present in the cluster. These hosts were:\n  "
					+ Joiner.on("\n  ").join(oldHostsNoLongerPresent);
		}

		LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
				+ " retained the pre-restart assignment. " + randomAssignMsg);
		return assignments;
	}

	/**
	 * Returns an ordered list of hosts that are hosting the blocks for this
	 * region. The weight of each host is the sum of the block lengths of all
	 * files on that host, so the first host in the list is the server which
	 * holds the most bytes of the given region's HFiles.
	 * 
	 * @param fs
	 *            the filesystem
	 * @param region
	 *            region
	 * @return ordered list of hosts holding blocks of the specified region
	 */
	@SuppressWarnings("unused")
	private List<ServerName> getTopBlockLocations(FileSystem fs, HRegionInfo region)
	{
		List<ServerName> topServerNames = null;
		try
		{
			HTableDescriptor tableDescriptor = getTableDescriptor(region.getTableName());
			if (tableDescriptor != null)
			{
				HDFSBlocksDistribution blocksDistribution = HRegion
						.computeHDFSBlocksDistribution(config, tableDescriptor,
								region.getEncodedName());
				List<String> topHosts = blocksDistribution.getTopHosts();
				topServerNames = mapHostNameToServerName(topHosts);
			}
		} catch (IOException ioe)
		{
			LOG.debug("IOException during HDFSBlocksDistribution computation. for "
					+ "region = " + region.getEncodedName(), ioe);
		}

		return topServerNames;
	}

	/**
	 * return HTableDescriptor for a given tableName
	 * 
	 * @param tableName
	 *            the table name
	 * @return HTableDescriptor
	 * @throws IOException
	 */
	private HTableDescriptor getTableDescriptor(byte[] tableName) throws IOException
	{
		HTableDescriptor tableDescriptor = null;
		try
		{
			if (this.services != null)
			{
				tableDescriptor = this.services.getTableDescriptors().get(
						Bytes.toString(tableName));
			}
		} catch (FileNotFoundException fnfe)
		{
			LOG.debug("FileNotFoundException during getTableDescriptors."
					+ " Current table name = " + tableName, fnfe);
		}

		return tableDescriptor;
	}

	/**
	 * Map hostname to ServerName, The output ServerName list will have the same
	 * order as input hosts.
	 * 
	 * @param hosts
	 *            the list of hosts
	 * @return ServerName list
	 */
	private List<ServerName> mapHostNameToServerName(List<String> hosts)
	{
		if (hosts == null || status == null)
		{
			return null;
		}

		List<ServerName> topServerNames = new ArrayList<ServerName>();
		Collection<ServerName> regionServers = status.getServers();

		// create a mapping from hostname to ServerName for fast lookup
		HashMap<String, ServerName> hostToServerName = new HashMap<String, ServerName>();
		for (ServerName sn : regionServers)
		{
			hostToServerName.put(sn.getHostname(), sn);
		}

		for (String host : hosts)
		{
			ServerName sn = hostToServerName.get(host);
			// it is possible that HDFS is up ( thus host is valid ),
			// but RS is down ( thus sn is null )
			if (sn != null)
			{
				topServerNames.add(sn);
			}
		}
		return topServerNames;
	}

	/**
	 * Generates an immediate assignment plan to be used by a new master for
	 * regions in transition that do not have an already known destination.
	 * 
	 * Takes a list of regions that need immediate assignment and a list of all
	 * available servers. Returns a map of regions to the server they should be
	 * assigned to.
	 * 
	 * This method will return quickly and does not do any intelligent
	 * balancing. The goal is to make a fast decision not the best decision
	 * possible.
	 * 
	 * Currently this is random.
	 * 
	 * @param regions
	 * @param servers
	 * @return map of regions to the server it should be assigned to
	 */
	public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
			List<ServerName> servers)
	{
		Map<HRegionInfo, ServerName> assignments = new TreeMap<HRegionInfo, ServerName>();
		for (HRegionInfo region : regions)
		{
			assignments.put(region, servers.get(RANDOM.nextInt(servers.size())));
		}
		return assignments;
	}

	public ServerName randomAssignment(List<ServerName> servers)
	{
		if (servers == null || servers.isEmpty())
		{
			LOG.warn("Wanted to do random assignment but no servers to assign to");
			return null;
		}
		return servers.get(RANDOM.nextInt(servers.size()));
	}
}