-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexercise4.java
More file actions
73 lines (55 loc) · 2.75 KB
/
exercise4.java
File metadata and controls
73 lines (55 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package exercise_4;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.graphframes.GraphFrame;
import org.graphframes.lib.PageRank;
/**
 * Exercise 4: builds a graph from the Wikipedia vertex and edge files and prints the
 * ten articles with the highest PageRank score.
 */
public class Exercise_4 {

    /** Tab-separated vertex file; each line is loaded into the {@code id} and {@code name} columns. */
    private static final String VERTICES_PATH = "src/main/resources/wiki-vertices.txt";

    /** Tab-separated edge file; each line is loaded into the {@code src} and {@code dst} columns. */
    private static final String EDGES_PATH = "src/main/resources/wiki-edges.txt";

    /**
     * Loads the vertex and edge files, assembles them into a {@link GraphFrame}, runs
     * PageRank (reset probability 0.15, 10 iterations) and prints the ten highest-ranked
     * vertices to standard output.
     *
     * @param ctx    Spark context used to read the two text files
     * @param sqlCtx SQL context used to turn the parsed rows into DataFrames
     */
    public static void wikipedia(JavaSparkContext ctx, SQLContext sqlCtx) {
        Dataset<Row> vertices = loadTabSeparated(ctx, sqlCtx, VERTICES_PATH, "id", "name");
        Dataset<Row> edges = loadTabSeparated(ctx, sqlCtx, EDGES_PATH, "src", "dst");

        // Create graph
        GraphFrame graph = GraphFrame.apply(vertices, edges);
        System.out.println(graph);

        // Apply PageRank algorithm to the graph
        System.out.println("Top 10 most relevant Wikipedia articles:");
        System.out.println("*****");
        PageRank pageRank = graph.pageRank().resetProbability(0.15).maxIter(10);
        Dataset<Row> ranks = pageRank.run().vertices().select("id", "name", "pageRank");
        // Sort by score, highest first: without an explicit ordering show(10) prints
        // whichever ten rows happen to come first, not the top-ranked ones.
        ranks.orderBy(ranks.col("pageRank").desc()).show(10);
        System.out.println("*****");
        System.out.println("PageRank successfully finished");
    }

    /**
     * Reads a text file, splits every line on tab characters and exposes the result as a
     * DataFrame with two nullable string columns.
     *
     * @param ctx          Spark context used to read the file
     * @param sqlCtx       SQL context used to create the DataFrame
     * @param path         path of the text file to load
     * @param firstColumn  name given to the first column
     * @param secondColumn name given to the second column
     * @return DataFrame with the columns {@code firstColumn} and {@code secondColumn}
     */
    private static Dataset<Row> loadTabSeparated(JavaSparkContext ctx, SQLContext sqlCtx, String path,
                                                 String firstColumn, String secondColumn) {
        JavaRDD<String> lines = ctx.textFile(path);
        // The String[] is handed over as the varargs array itself, so every field becomes
        // its own column; the cast states that intent and silences the inexact-varargs warning.
        JavaRDD<Row> rows = lines.map(line -> RowFactory.create((Object[]) line.split("\t")));

        StructType schema = new StructType(new StructField[]{
            new StructField(firstColumn, DataTypes.StringType, true, new MetadataBuilder().build()),
            new StructField(secondColumn, DataTypes.StringType, true, new MetadataBuilder().build())
        });
        return sqlCtx.createDataFrame(rows, schema);
    }
}