#### Background: ####
Existing maximal clique algorithms are serial, all based on the Bron–Kerbosch algorithm.
#### Approach: ####
Spark GraphX ships a connected-components algorithm. Connected components and maximal cliques are both concepts on undirected graphs, and every maximal clique lies entirely within a single connected component.
So: use Spark GraphX to split the graph into connected components, then run the serial maximal clique algorithm inside each component. Components are processed in parallel, each one serially, hence "pseudo-parallel".
For densely connected graphs the resulting components can be very large, and the serial algorithm is still slow on them. Degree pruning (sketched right below) shrinks the input first, but for big components the gain is limited.
A genuinely parallel maximal clique algorithm is still wanted.
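The pruning rule rests on one invariant: every vertex of a clique with `count` vertices has at least `count - 1` neighbors, so lower-degree vertices can never belong to such a clique and may be dropped, which in turn lowers their neighbors' degrees. A minimal sketch of one pass on plain Scala collections (`pruneOnce` is a hypothetical helper; the Spark job below does the same with `flatMap`/`reduceByKey`/`filter` on RDDs):

```scala
// One pruning pass: drop every vertex whose degree is below count - 1,
// together with all of its incident edges.
def pruneOnce(edges: Set[(Long, Long)], count: Int): Set[(Long, Long)] = {
  val degree = edges.toSeq
    .flatMap { case (a, b) => Seq(a, b) }   // each endpoint counts once per edge
    .groupBy(identity)
    .mapValues(_.size)
  val keep = degree.collect { case (v, d) if d >= count - 1 => v }.toSet
  edges.filter { case (a, b) => keep(a) && keep(b) }
}
```

Iterating this pass until the vertex count stops shrinking by more than a given percentage is exactly what the `numIter` and `percent` settings below control.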
#### Configuration: ####
- graph_data_path=hdfs://localhost/graph_data
- out_path=hdfs://localhost/clique
- ck_path=hdfs://localhost/checkpoint
- numIter=50 (maximum number of pruning passes)
- count=3 (minimum number of vertices a reported clique must have)
- algorithm=2 (maximal clique algorithm: 1 = hand-rolled, 2 = jgrapht)
- percent=90 (stop pruning once a pass keeps at least this percentage of the previous vertex count; if 90% of the vertices survive a pass, pruning is no longer effective)
- spark.master=local
- spark.app.name=graph
- spark.serializer=org.apache.spark.serializer.KryoSerializer
- spark.yarn.executor.memoryOverhead=20480
- spark.yarn.driver.memoryOverhead=20480
- spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
- spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
- spark.driver.maxResultSize=10g
- spark.default.parallelism=60
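For reference, this is how the `spark.*` keys above reach Spark (the same pattern appears in the full listing further down, shown here in isolation): every property whose name starts with `spark` is copied verbatim into the `SparkConf`, while the remaining keys are read individually.

```scala
import java.util.Properties
import org.apache.spark.SparkConf
import scala.collection.JavaConverters._

val prop = new Properties()
prop.load(getClass.getResourceAsStream("/config.properties"))

val conf = new SparkConf()
prop.stringPropertyNames().asScala
  .filter(_.startsWith("spark"))            // spark.master, spark.app.name, ...
  .foreach(k => conf.set(k, prop.getProperty(k)))
```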
#### Sample data: ####
```
{"src":"0","dst":"1"}
{"src":"0","dst":"2"}
{"src":"0","dst":"3"}
{"src":"1","dst":"0"}
{"src":"2","dst":"1"}
{"src":"3","dst":"5"}
{"src":"4","dst":"6"}
{"src":"5","dst":"4"}
{"src":"6","dst":"5"}
{"src":"3","dst":"2"}
{"src":"2","dst":"3"}
{"src":"6","dst":"4"}
{"src":"3","dst":"4"}
{"src":"4","dst":"3"}
{"src":"2","dst":"6"}
{"src":"6","dst":"2"}
{"src":"6","dst":"7"}
{"src":"7","dst":"6"}
```
#### Sample graph: ####
(Figure omitted: the undirected graph on vertices 0–7 defined by the edges above.)
#### Output: ####
Each line is one maximal clique with at least `count` vertices:
```
0,1,2
0,2,3
3,4,5
4,5,6
```
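A quick way to check this output by hand is to run jgrapht's `BronKerboschCliqueFinder` directly on the twelve distinct undirected edges of the sample graph (a standalone snippet, separate from the Spark job):

```scala
import org.jgrapht.alg.BronKerboschCliqueFinder
import org.jgrapht.graph.{DefaultEdge, SimpleGraph}
import scala.collection.JavaConverters._

object CliqueCheck {
  def main(args: Array[String]): Unit = {
    val g = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
    val edges = Seq(("0","1"), ("0","2"), ("0","3"), ("1","2"), ("2","3"), ("2","6"),
                    ("3","4"), ("3","5"), ("4","5"), ("4","6"), ("5","6"), ("6","7"))
    edges.foreach { case (a, b) =>
      g.addVertex(a); g.addVertex(b); g.addEdge(a, b)
    }
    val finder = new BronKerboschCliqueFinder(g)
    finder.getAllMaximalCliques.asScala
      .filter(_.size >= 3)                                   // count = 3
      .foreach(c => println(c.asScala.toList.sorted.mkString(",")))
  }
}
```

This prints the four cliques above (in some order); maximal cliques of size 2 such as {6,7} are filtered out by `count=3`.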
#### Implementation: ####
```scala
import java.util
import java.util.Properties

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.jgrapht.alg.BronKerboschCliqueFinder
import org.jgrapht.graph.{DefaultEdge, SimpleGraph}

import scala.collection.JavaConverters._
import scala.collection.mutable

object ApplicationTitan {
  def main(args: Array[String]) {
    val prop = new Properties()
    prop.load(getClass.getResourceAsStream("/config.properties"))

    val graph_data_path = prop.getProperty("graph_data_path")
    val out_path = prop.getProperty("out_path")
    val ck_path = prop.getProperty("ck_path")
    val count = Integer.parseInt(prop.getProperty("count"))
    val numIter = Integer.parseInt(prop.getProperty("numIter"))
    val algorithm = Integer.parseInt(prop.getProperty("algorithm"))
    val percent = Integer.parseInt(prop.getProperty("percent"))
    val conf = new SparkConf()

    try {
      // Clear any previous output so saveAsTextFile does not fail.
      Runtime.getRuntime.exec("hdfs dfs -rm -r " + out_path)
      // Runtime.getRuntime.exec("cmd.exe /C rd /s /q " + out_path)
    } catch {
      case ex: Exception =>
        ex.printStackTrace(System.out)
    }

    // Copy every spark.* property verbatim into the SparkConf.
    prop.stringPropertyNames().asScala.foreach(s => {
      if (s.startsWith("spark")) {
        conf.set(s, prop.getProperty(s))
      }
    })
    conf.registerKryoClasses(Array(getClass))

    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    sc.setCheckpointDir(ck_path)
    val sqlc = new SQLContext(sc)
    try {
      val e_df = sqlc.read
        // .json(graph_data_path)
        .parquet(graph_data_path)

      // Normalize every directed edge to an undirected (min, max) pair and deduplicate.
      var e_rdd = e_df
        .mapPartitions(it => {
          it.map({
            case Row(dst: String, src: String) => // column order follows the schema
              val src_long = src.toLong
              val dst_long = dst.toLong
              if (src_long < dst_long) (src_long, dst_long) else (dst_long, src_long)
          })
        }).distinct()
      e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

      var bc: Broadcast[Set[Long]] = null
      var iter = 0
      var bc_size = 0
      // Pruning: drop vertices whose degree is below count - 1; they cannot be
      // part of a clique with `count` vertices. Stop once a pass keeps at least
      // `percent`% of the previous vertex count.
      while (iter <= numIter) {
        val temp = e_rdd
          .flatMap(x => List((x._1, 1), (x._2, 1)))
          .reduceByKey((x, y) => x + y)
          .filter(x => x._2 >= count - 1)
          .mapPartitions(it => it.map(x => x._1))
        val bc_value = temp.collect().toSet
        bc = sc.broadcast(bc_value)
        e_rdd = e_rdd.filter(x => bc.value.contains(x._1) && bc.value.contains(x._2))
        e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
        iter += 1
        if (bc_size != 0 && bc_value.size >= bc_size * percent / 100) {
          println("total iter: " + iter)
          iter = Int.MaxValue // pruning no longer pays off, leave the loop
        }
        bc_size = bc_value.size
      }

      // Build the graph.
      val edge: RDD[Edge[Long]] = e_rdd.mapPartitions(it => it.map(x => Edge(x._1, x._2)))
      val graph = Graph.fromEdges(edge, 0,
        StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)

      // Connected components: maps every vertex id to its component id.
      val cc = graph.connectedComponents().vertices
      cc.persist(StorageLevel.MEMORY_AND_DISK_SER)

      cc.join(e_rdd)
        // Salt the component id with a random digit so one huge component is
        // spread over up to ten tasks (see the note after this listing).
        .mapPartitions(it => it.map(x =>
          ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2))))
        .aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2)
        // Strip the salt digit and merge the partial edge lists per component.
        .mapPartitions(it => it.map(x => (x._1.substring(1), x._2)))
        .aggregateByKey(List[(Long, Long)]())((list1, list2) => list1 ::: list2, (list3, list4) => list3 ::: list4)
        .filter(x => x._2.size >= count - 1)
        .flatMap(x => {
          if (algorithm == 1)
            find(x, count)
          else
            find2(x, count)
        })
        // Render each clique as a comma-separated line.
        .mapPartitions(it => it.map(set => set.asScala.mkString(",")))
        // .coalesce(1)
        .saveAsTextFile(out_path)
    } catch {
      case ex: Exception =>
        ex.printStackTrace(System.out)
    }
    sc.stop()
  }

  // Maximal cliques via the hand-rolled CliqueFinder (listed below).
  def find(x: (String, List[(Long, Long)]), count: Int): mutable.Set[util.Set[String]] = {
    println(x._1 + "|s|" + x._2.size)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    // Build the symmetric adjacency map, then hand it to the finder.
    val neighbors = new util.HashMap[String, util.Set[String]]
    x._2.foreach(r => {
      val v1 = r._1.toString
      val v2 = r._2.toString
      if (neighbors.containsKey(v1)) {
        neighbors.get(v1).add(v2)
      } else {
        val temp = new util.HashSet[String]()
        temp.add(v2)
        neighbors.put(v1, temp)
      }
      if (neighbors.containsKey(v2)) {
        neighbors.get(v2).add(v1)
      } else {
        val temp = new util.HashSet[String]()
        temp.add(v1)
        neighbors.put(v2, temp)
      }
    })
    val finder = new CliqueFinder(neighbors, count)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    finder.findMaxCliques().asScala
  }

  // Maximal cliques via jgrapht's BronKerboschCliqueFinder.
  def find2(x: (String, List[(Long, Long)]), count: Int): Set[util.Set[String]] = {
    println(x._1 + "|s|" + x._2.size)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    val to_clique = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
    x._2.foreach(r => {
      val v1 = r._1.toString
      val v2 = r._2.toString
      to_clique.addVertex(v1)
      to_clique.addVertex(v2)
      to_clique.addEdge(v1, v2)
    })
    val finder = new BronKerboschCliqueFinder(to_clique)
    val list = finder.getAllMaximalCliques.asScala
    var result = Set[util.Set[String]]()
    // Keep only cliques with at least `count` vertices.
    list.foreach(x => {
      if (x.size() >= count)
        result = result + x
    })
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    result
  }
}
```
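One detail in the listing deserves a note: before the per-component edge lists are built, the component id is prefixed ("salted") with a random decimal digit. Without the salt, all edges of one giant component would be pulled into a single `aggregateByKey` task; with it, the work is first split across up to ten partial lists, and the digit is then stripped so the partials can be merged. The same idea on plain collections (hypothetical names, not the RDD code):

```scala
import scala.util.Random

// (componentId, edge) pairs, as produced by cc.join(e_rdd) in the listing.
val pairs: Seq[(Long, (Long, Long))] = Seq((0L, (0L, 1L)), (0L, (0L, 2L)), (0L, (1L, 2L)))

val salted = pairs.map { case (cc, e) => (Random.nextInt(10).toString + cc.toString, e) }
val partials = salted.groupBy(_._1).mapValues(_.map(_._2))   // first aggregation (salted key)
val merged = partials.toSeq
  .map { case (k, es) => (k.substring(1), es) }               // drop the salt digit
  .groupBy(_._1).mapValues(_.flatMap(_._2))                   // second aggregation (real key)
// merged: Map("0" -> all edges of component 0)
```

This trades one extra shuffle for better load balance, which only pays off when component sizes are heavily skewed.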
The hand-rolled maximal clique algorithm (Java):

```java
import java.util.*;

/**
 * @author mopspecial@gmail.com
 * @date 2017/7/31
 */
public class CliqueFinder {
    private Map<String, Set<String>> neighbors;
    private Set<String> nodes;
    private Set<Set<String>> maxCliques = new HashSet<>();
    private Integer minSize;

    public CliqueFinder(Map<String, Set<String>> neighbors, Integer minSize) {
        this.neighbors = neighbors;
        this.nodes = neighbors.keySet();
        this.minSize = minSize;
    }

    // Outer level of Bron–Kerbosch: visit vertices in degeneracy order and
    // recurse with the pivoted variant (bk2) on each neighborhood.
    private void bk3(Set<String> clique, List<String> candidates, List<String> excluded) {
        if (candidates.isEmpty() && excluded.isEmpty()) {
            if (!clique.isEmpty() && clique.size() >= minSize) {
                maxCliques.add(clique);
            }
            return;
        }
        for (String s : degeneracy_order(candidates)) {
            List<String> new_candidates = new ArrayList<>(candidates);
            new_candidates.retainAll(neighbors.get(s));
            List<String> new_excluded = new ArrayList<>(excluded);
            new_excluded.retainAll(neighbors.get(s));
            Set<String> nextClique = new HashSet<>(clique);
            nextClique.add(s);
            bk2(nextClique, new_candidates, new_excluded);
            candidates.remove(s);
            excluded.add(s);
        }
    }

    // Bron–Kerbosch with pivoting: only candidates outside the pivot's
    // neighborhood need to be branched on.
    private void bk2(Set<String> clique, List<String> candidates, List<String> excluded) {
        if (candidates.isEmpty() && excluded.isEmpty()) {
            if (!clique.isEmpty() && clique.size() >= minSize) {
                maxCliques.add(clique);
            }
            return;
        }
        String pivot = pick_random(candidates);
        if (pivot == null) {
            pivot = pick_random(excluded);
        }
        List<String> tempc = new ArrayList<>(candidates);
        tempc.removeAll(neighbors.get(pivot));
        for (String s : tempc) {
            List<String> new_candidates = new ArrayList<>(candidates);
            new_candidates.retainAll(neighbors.get(s));
            List<String> new_excluded = new ArrayList<>(excluded);
            new_excluded.retainAll(neighbors.get(s));
            Set<String> nextClique = new HashSet<>(clique);
            nextClique.add(s);
            bk2(nextClique, new_candidates, new_excluded);
            candidates.remove(s);
            excluded.add(s);
        }
    }

    // Repeatedly remove the vertex of smallest degree; the removal order is a
    // degeneracy ordering of the given vertices.
    private List<String> degeneracy_order(List<String> innerNodes) {
        List<String> result = new ArrayList<>();
        Map<String, Integer> deg = new HashMap<>();
        for (String node : innerNodes) {
            deg.put(node, neighbors.get(node).size());
        }
        while (!deg.isEmpty()) {
            Integer min = Collections.min(deg.values());
            String minKey = null;
            for (String key : deg.keySet()) {
                if (deg.get(key).equals(min)) {
                    minKey = key;
                    break;
                }
            }
            result.add(minKey);
            deg.remove(minKey);
            for (String k : neighbors.get(minKey)) {
                if (deg.containsKey(k)) {
                    deg.put(k, deg.get(k) - 1);
                }
            }
        }
        return result;
    }

    // Despite the name, deterministically picks the first element (or null).
    private String pick_random(List<String> random) {
        if (random != null && !random.isEmpty()) {
            return random.get(0);
        } else {
            return null;
        }
    }

    public Set<Set<String>> findMaxCliques() {
        this.bk3(new HashSet<>(), new ArrayList<>(nodes), new ArrayList<>());
        return maxCliques;
    }

    // Small self-test on the sample graph from above; expects the four cliques
    // 0,1,2 / 0,2,3 / 3,4,5 / 4,5,6.
    public static void main(String[] args) {
        Map<String, Set<String>> neighbors = new HashMap<>();
        neighbors.put("0", new HashSet<>(Arrays.asList("1", "2", "3")));
        neighbors.put("1", new HashSet<>(Arrays.asList("0", "2")));
        neighbors.put("2", new HashSet<>(Arrays.asList("0", "1", "3", "6")));
        neighbors.put("3", new HashSet<>(Arrays.asList("0", "2", "4", "5")));
        neighbors.put("4", new HashSet<>(Arrays.asList("3", "5", "6")));
        neighbors.put("5", new HashSet<>(Arrays.asList("3", "4", "6")));
        neighbors.put("6", new HashSet<>(Arrays.asList("2", "4", "5", "7")));
        neighbors.put("7", new HashSet<>(Arrays.asList("6")));
        CliqueFinder finder = new CliqueFinder(neighbors, 3);
        System.out.println(finder.findMaxCliques());
    }
}
```