基于memcached for java 实现通用分布式缓存和集群分布式缓存
2016-11-14
本文参考借鉴:http://guazi.iteye.com/blog/107164
前提:在 memcached client for java 的基础上进行二次封装,实现缓存存储的两种模式:通用分布式缓存和集群分布式缓存。以下是对 memcached client for java 进行二次封装的UML图。
memcached 客户端的初始化在 CacheFactory 中通过读取配置文件 cacheConfig.xml 完成。通用分布式缓存只是一个简单的封装,利用 memcached client for java 提供的分布式支持来实现。这里主要说一下 ClusterCache 的实现思想:存入缓存时,先对缓存对象的 key 值进行一次 hash,找到对应的服务器存入,然后根据一定的规则再次进行 hash,找到另外一台不同的服务器再存入一份;取缓存时,先对要取的 key 值进行一次 hash,找到主服务器,如果获取失败或者获取到的值为 null,就对 key 再次进行 hash,找到其从服务器,从这台服务器取缓存结果(如果取到结果,就异步地更新到主服务器)。这样就形成了主从式集群缓存。其特点是:没有绝对的主节点和从节点,正常情况下所有服务器共同承担缓存服务,在一台服务器出现异常时,其他服务器共同承担增加的访问压力。
拓扑结构如下:
源代码
package com.yx.cache; public interface Cache<T> { /** * 获取缓存中的数据 * * @param key * @return */ T get(String key); /** * 把数据放入缓存 如果存在与key对应的值,则返回失败 * * @param key * @param value * @return */ boolean add(String key, T value); /** * 把数据放入缓存 如果存在与key对应的值,则覆盖原有的值 * * @param key * @param value * @return */ boolean set(String key, T value); /** * 缓存更新 如果不存在与key对应的缓存值,则不更新 * * @param key * @param value * @return */ boolean update(String key, T value); /** * 删除缓存 * * @param key * @return */ boolean delete(String key); }
package com.yx.cache;

import com.danga.MemCached.MemCachedClient;

/**
 * Thin {@link Cache} wrapper that forwards every operation to a
 * {@link MemCachedClient}, prefixing each key with the simple name of the
 * cached type.
 *
 * <p>Because the prefix is the <em>simple</em> class name, two value types
 * with the same simple name in different packages share one key space.
 *
 * @param <T> type of the cached values
 */
public class CommonCache<T> implements Cache<T> {

    // Held per instance: the previous static field was overwritten by every
    // constructor call, so all caches silently shared the last client passed in.
    private final MemCachedClient memCachedClient;

    /** Key prefix, {@code "<SimpleName>-"}. */
    private final String base;

    /**
     * @param t      class of the cached values; its simple name becomes the key prefix
     * @param client memcached client that performs the actual operations
     */
    CommonCache(Class<T> t, MemCachedClient client) {
        this.memCachedClient = client;
        this.base = t.getSimpleName() + "-";
    }

    /**
     * Reads the value stored under the prefixed key.
     *
     * @param key cache key (without prefix)
     * @return whatever the client returns for the prefixed key, cast to {@code T}
     */
    @Override
    @SuppressWarnings("unchecked") // the client API is untyped; callers store only T under this prefix
    public T get(String key) {
        return (T) memCachedClient.get(base + key);
    }

    /**
     * Delegates to the client's {@code set} with the prefixed key.
     *
     * @return the client's result
     */
    @Override
    public boolean set(String key, T value) {
        return memCachedClient.set(base + key, value);
    }

    /**
     * Delegates to the client's {@code replace} with the prefixed key.
     *
     * @return the client's result
     */
    @Override
    public boolean update(String key, T value) {
        return memCachedClient.replace(base + key, value);
    }

    /**
     * Delegates to the client's {@code delete} with the prefixed key.
     *
     * @return the client's result
     */
    @Override
    public boolean delete(String key) {
        return memCachedClient.delete(base + key);
    }

    /**
     * Delegates to the client's {@code add} with the prefixed key.
     *
     * @return the client's result
     */
    @Override
    public boolean add(String key, T value) {
        return memCachedClient.add(base + key, value);
    }
}
package com.yx.cache; import com.danga.MemCached.MemCachedClient; import com.schooner.MemCached.SchoonerSockIOPool; import com.yx.cache.util.HashCodeUtil; import com.yx.task.ThreadPoolManager; public class ClusterCache<T> implements Cache<T> { private static MemCachedClient memCachedClient = null; private static ThreadPoolManager taskManager = ThreadPoolManager .getInstance("cache"); private String base = null; private SchoonerSockIOPool pool = SchoonerSockIOPool.getInstance(); ClusterCache(Class<T> t, MemCachedClient client) { memCachedClient = client; base = "i-" + t.getSimpleName() + "-"; } @Override public T get(String key) { T value = null; if (key == null) { return null; } key = base + key; if (pool.getServers().length < 2) { value = (T) memCachedClient.get(key); } else { int hashCode = HashCodeUtil.getHash(key); value = (T) memCachedClient.get(key, hashCode); if (value == null) { hashCode = this.getRehashCode(key, hashCode); value = (T) memCachedClient.get(key, hashCode); if (value != null) {// 如果在另外一台服务器上取到了缓存,则恢复第一台服务器 UpdateTask task = new UpdateTask(key, value); taskManager.submit(task); } } } return value; } @Override public boolean set(String key, T value) { if (key == null) { return false; } key = base + key; boolean result = false; if (pool.getServers().length < 2) { result = memCachedClient.set(key, value); } else { int hashCode = HashCodeUtil.getHash(key); result = memCachedClient.set(key, value, hashCode); // if (result) { hashCode = getRehashCode(key, hashCode); memCachedClient.set(key, value, hashCode); // } } return result; } private int getRehashCode(String key, int oldHashcode) { String host = pool.getHost(key, oldHashcode); int rehashTries = 0; // if (result) { int hashCode = HashCodeUtil.getHash(rehashTries + key); while (host.equals(pool.getHost(key, hashCode))) { rehashTries++; hashCode = HashCodeUtil.getHash(rehashTries + key); } return hashCode; } @Override public boolean update(String key, T value) { if (key == null) { return false; } 
key = base + key; boolean result = false; if (pool.getServers().length < 2) { result = memCachedClient.replace(key, value); } else { int hashCode = HashCodeUtil.getHash(key); result = memCachedClient.replace(key, value, hashCode); // if (result) { hashCode = getRehashCode(key, hashCode); memCachedClient.replace(key, value, hashCode); // } } return result; } @Override public boolean delete(String key) { if (key == null) { return false; } key = base + key; boolean result = false; if (pool.getServers().length < 2) { result = memCachedClient.delete(key); } else { int hashCode = HashCodeUtil.getHash(key); result = memCachedClient.delete(key, hashCode, null); // if (result) { hashCode = this.getRehashCode(key, hashCode); memCachedClient.delete(key, hashCode, null); // } } return result; } @Override public boolean add(String key, T value) { if (key == null) { return false; } key = base + key; boolean result = false; if (pool.getServers().length < 2) { result = memCachedClient.add(key, value); } else { int hashCode = HashCodeUtil.getHash(key); result = memCachedClient.add(key, value, hashCode); // if (result) { hashCode = getRehashCode(key, hashCode); memCachedClient.add(key, value, hashCode); // } } return result; } static class UpdateTask implements Runnable { private String key; private Object value; UpdateTask(String key, Object value) { this.key = key; this.value = value; } @Override public void run() { memCachedClient.set(key, value, HashCodeUtil.getHash(key)); } } }
package com.yx.cache;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.danga.MemCached.MemCachedClient;
import com.danga.MemCached.SockIOPool;
import com.yx.cache.util.ConfigUtil;

/**
 * Creates and memoizes {@link Cache} instances per value class.
 *
 * <p>Loading this class configures the shared {@link SockIOPool} from the
 * values exposed by {@link ConfigUtil} and creates the single
 * {@link MemCachedClient} handed to every cache.
 */
public class CacheFactory {

    /**
     * Registry-key prefix for cluster caches. Class names cannot contain
     * {@code '-'}, so prefixed keys never collide with common-cache keys.
     */
    private static final String CLUSTER_KEY_PREFIX = "i-";

    private static final MemCachedClient memCachedClient;

    @SuppressWarnings("rawtypes")
    private static final Map<String, Cache> map = new ConcurrentHashMap<String, Cache>();

    static {
        List<String> servers = splitCsv(ConfigUtil.getConfigValue("servers", ""));
        if (servers.size() < 1) {
            throw new RuntimeException("cache 初始化失败!");
        }

        SockIOPool pool = SockIOPool.getInstance();
        pool.setServers(servers.toArray(new String[] {}));
        pool.setFailover(Boolean.valueOf(ConfigUtil.getConfigValue("failover", "true")));
        pool.setInitConn(Integer.valueOf(ConfigUtil.getConfigValue("initConn", "100")));
        pool.setMinConn(Integer.valueOf(ConfigUtil.getConfigValue("minConn", "25")));
        pool.setMaxConn(Integer.valueOf(ConfigUtil.getConfigValue("maxConn", "250")));
        pool.setMaintSleep(Integer.valueOf(ConfigUtil.getConfigValue("maintSleep", "30")));
        // Nagle's algorithm is off unless the configuration says otherwise.
        pool.setNagle(Boolean.valueOf(ConfigUtil.getConfigValue("nagle", "false")));
        pool.setSocketTO(Integer.valueOf(ConfigUtil.getConfigValue("socketTO", "3000")));
        pool.setAliveCheck(Boolean.valueOf(ConfigUtil.getConfigValue("aliveCheck", "true")));
        pool.setHashingAlg(Integer.valueOf(ConfigUtil.getConfigValue("hashingAlg", "0")));
        pool.setSocketConnectTO(Integer.valueOf(ConfigUtil.getConfigValue("socketConnectTO", "3000")));

        List<Integer> weights = new ArrayList<Integer>();
        for (String w : splitCsv(ConfigUtil.getConfigValue("weights", ""))) {
            weights.add(Integer.valueOf(w));
        }
        // Weights are applied only when there is exactly one per server;
        // otherwise they are ignored and the pool keeps its defaults.
        if (weights.size() == servers.size()) {
            pool.setWeights(weights.toArray(new Integer[] {}));
        }

        pool.initialize();
        memCachedClient = new MemCachedClient();
    }

    /**
     * Returns the common (single-copy) cache for the given value class,
     * creating it on first use.
     *
     * @param t class of the cached values
     * @return the cache registered for {@code t}
     */
    @SuppressWarnings("unchecked") // entries under this key are only ever created for T
    public static <T> Cache<T> getCommonCache(Class<T> t) {
        Cache<T> cache = map.get(t.getName());
        if (cache == null) {
            cache = createCommonCache(t);
        }
        return cache;
    }

    /**
     * Returns the cluster (two-copy) cache for the given value class,
     * creating it on first use.
     *
     * @param t class of the cached values
     * @return the cluster cache registered for {@code t}
     */
    @SuppressWarnings("unchecked") // entries under this key are only ever created for T
    public static <T> Cache<T> getClusterCache(Class<T> t) {
        Cache<T> cache = map.get(CLUSTER_KEY_PREFIX + t.getName());
        if (cache == null) {
            cache = createClusterCache(t);
        }
        return cache;
    }

    /** Creates and registers the common cache for {@code t} unless one is already registered. */
    @SuppressWarnings("unchecked")
    private static synchronized <T> Cache<T> createCommonCache(Class<T> t) {
        String name = t.getName();
        Cache<T> cache = map.get(name);
        if (cache == null) {
            cache = new CommonCache<T>(t, memCachedClient);
            map.put(name, cache);
        }
        return cache;
    }

    /**
     * Creates and registers the cluster cache for {@code t} unless one is
     * already registered.
     *
     * <p>Uses the same prefixed key as {@link #getClusterCache(Class)}. The
     * previous version read and wrote the un-prefixed class name here, so the
     * lookup in {@code getClusterCache} never hit and common and cluster
     * caches for one class shared (and returned) each other's registry entry.
     */
    @SuppressWarnings("unchecked")
    private static synchronized <T> Cache<T> createClusterCache(Class<T> t) {
        String name = CLUSTER_KEY_PREFIX + t.getName();
        Cache<T> cache = map.get(name);
        if (cache == null) {
            cache = new ClusterCache<T>(t, memCachedClient);
            map.put(name, cache);
        }
        return cache;
    }

    /** Splits a comma-separated string into its trimmed, non-empty items. */
    private static List<String> splitCsv(String csv) {
        List<String> items = new ArrayList<String>();
        for (String s : csv.split(",")) {
            s = s.trim();
            if (!"".equals(s)) {
                items.add(s);
            }
        }
        return items;
    }
}
ConfigUtil.java 和 HashCodeUtil.java
package com.yx.cache.util; import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; import org.dom4j.io.SAXReader; public class ConfigUtil { private static final String CONFILE = "cacheConfig.xml"; private static final Map<String, String> map = new HashMap<String, String>(); static { SAXReader saxReader = new SAXReader(); InputStream ins = ConfigUtil.class.getClassLoader() .getResourceAsStream(CONFILE); try { if (ins != null) { Document doc = saxReader.read(ins); Element root = doc.getRootElement(); Iterator<Element> iter = root.elementIterator(); while (iter.hasNext()) { Element e = iter.next(); map.put(e.getName(), e.getTextTrim()); } } } catch (DocumentException e) { // TODO Auto-generated catch block e.printStackTrace(); throw new RuntimeException("找不到配置文件:" + CONFILE); } finally { try { if (ins != null) { ins.close(); } else { throw new RuntimeException("找不到配置文件:" + CONFILE); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public static String getConfigValue(String key, String defaultValue) { String tmp = map.get(key); return isEmpty(tmp) ? defaultValue : tmp; } public static void main(String[] args) { System.out.println(map); } private static boolean isEmpty(String str) { if (str == null || "".equals(str)) { return true; } return false; } }
package com.yx.cache.util; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.zip.CRC32; import com.schooner.MemCached.SchoonerSockIOPool; public class HashCodeUtil { public static final int NATIVE_HASH = 0; // native String.hashCode(); public static final int OLD_COMPAT_HASH = 1; // original compatibility public static final int NEW_COMPAT_HASH = 2; // new CRC32 based public static final int CONSISTENT_HASH = 3; // MD5 Based -- Stops private static int hashingAlg = SchoonerSockIOPool.getInstance() .getHashingAlg(); /** * Returns a bucket to check for a given key. * * @param key * String key cache is stored under * @return int bucket */ public static final int getHash(String key) { switch (hashingAlg) { case NATIVE_HASH: return key.hashCode(); case OLD_COMPAT_HASH: return origCompatHashingAlg(key); case NEW_COMPAT_HASH: return newCompatHashingAlg(key); case CONSISTENT_HASH: return md5HashingAlg(key); default: // use the native hash as a default hashingAlg = NATIVE_HASH; return key.hashCode(); } } private static int origCompatHashingAlg(String key) { int hash = 0; char[] cArr = key.toCharArray(); for (int i = 0; i < cArr.length; ++i) { hash = (hash * 33) + cArr[i]; } return hash; } private static int newCompatHashingAlg(String key) { CRC32 checksum = new CRC32(); checksum.update(key.getBytes()); int crc = (int) checksum.getValue(); return (crc >> 16) & 0x7fff; } private static int md5HashingAlg(String key) { MessageDigest md5 = MD5.get(); md5.reset(); md5.update(key.getBytes()); byte[] bKey = md5.digest(); int res = ((bKey[3] & 0xFF) << 24) | ((bKey[2] & 0xFF) << 16) | ((bKey[1] & 0xFF) << 8) | (bKey[0] & 0xFF); return res; } private static ThreadLocal<MessageDigest> MD5 = new ThreadLocal<MessageDigest>() { @Override protected final MessageDigest initialValue() { try { return MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(" no md5 algorythm found"); } } }; }
package com.yx.task; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; /** * @author liuyuxiao * @Date 2011-5-30 下午04:34:16 */ public class ThreadPoolManager { private static final Map<String, ThreadPoolManager> map = new HashMap<String, ThreadPoolManager>(); final int CORE_SIZE = 5; private ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors .newFixedThreadPool(CORE_SIZE); public void submit(Runnable task) { executor.submit(task); } public boolean finished() { return executor.getCompletedTaskCount() == executor.getTaskCount(); } private ThreadPoolManager() { } public static synchronized ThreadPoolManager getInstance(String key) { ThreadPoolManager t = map.get(key); if (t == null) { t = new ThreadPoolManager(); map.put(key, t); } return t; } }
package com.yx.cache.test; import com.yx.cache.Cache; import com.yx.cache.CacheFactory; public class TestCommonCache { /** * @param args */ public static void main(String[] args) { Cache<String> cache = CacheFactory.getCommonCache(String.class); int count = 0; for (int i = 0; i < 100; i++) { // cache.set("" + i, "Hello!" + i); String result = cache.get("" + i); // System.out.println(String.format("set( %d ): %s", i, success)); if (result == null) { count++; } System.out.println(String.format("get( %d ): %s", i, result)); } System.out.println(count); // for (int i = 0; i < 500; i++) { // MemTask task = new MemTask(); // Thread t = new Thread(task); // t.start(); // } } }
package com.yx.cache.test; import com.yx.cache.Cache; import com.yx.cache.CacheFactory; public class TestClusterCache { public static void main(String[] args) { Cache<String> cache = CacheFactory.getClusterCache(String.class); int count = 0; for (int i = 0; i < 100; i++) { // cache.set("" + i, "Hello!" + i); String result = cache.get("" + i); // System.out.println(String.format("set( %d ): %s", i, success)); if (result == null) { count++; } System.out.println(String.format("get( %d ): %s", i, result)); } } }