此处代码还有三个功能
1、spark流式文件处理
2、全局统计
3、广播变量
以下代码可运行
package com.sunbin.stream; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; /** * 模拟统计最近20秒内 读取的单词的个数 * * * @author root * */ public class WindowOnStreaming { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WindowOnStreaming"); JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5)); jsc.checkpoint("checkpoint"); JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9999); List<Tuple2<String, Boolean>> blacklist = new ArrayList<Tuple2<String, Boolean>>(); blacklist.add(new Tuple2<String, Boolean>("zhangsan2", true)); blacklist.add(new Tuple2<String, Boolean>("lisi2", true)); /** * 黑名单放入广播变量 */ final Broadcast<List<Tuple2<String, Boolean>>> blacklistRDD = jsc.sparkContext().broadcast(blacklist); /** * 广播变量在transform中处理 */ JavaDStream<String> filterlines = lines.transform(new Function<JavaRDD<String>, JavaRDD<String>>() { public JavaRDD<String> call(JavaRDD<String> str) throws Exception { JavaRDD<String> strretu = str.filter(new Function<String, Boolean>() { public Boolean call(String arg0) throws Exception { return !blacklistRDD.getValue().contains(new Tuple2<String, Boolean>(arg0, true)); } }); return strretu; } }); JavaPairDStream<String, Integer> words = filterlines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { /** * */ private static final long serialVersionUID = 1L; public Iterable<Tuple2<String, Integer>> call(String lines)throws Exception { ArrayList<Tuple2<String, Integer>> tuplelist = new ArrayList<Tuple2<String,Integer>>(); String[] split = lines.split(","); for(String word : split){ tuplelist.add(new Tuple2<String, Integer>(word, 1)); } System.out.println("------读取了一次-----"); return tuplelist; } }); /** * 全局统计,使用reduceByKeyAndWindow,此次必须配合checkpoint使用 */ JavaPairDStream<String, Integer> reduceByKeyAndWindow = words.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { /** * */ private static final long serialVersionUID = 1L; public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }, new Function2<Integer, Integer, Integer>() { /** * */ private static final long serialVersionUID = 1L; public Integer call(Integer v1, Integer v2) throws Exception { return v1-v2; } /** * 每10秒计算最30秒的值 */ }, Durations.seconds(20), Durations.seconds(10)); /** * 为了方便测试: * 窗口宽度:20秒 * 窗口滑动:10秒 * 我们输入数据的时候: * 第一次5秒输入:a,b,c * 第二次5秒输入:d,e,f * 第三次5秒不输入 * 第四次5秒不输入 * * 然后看文件,观察在第30秒的时候是不是文件中的数据重新都没有 * * 下面我们将结果写入文件,方便查看 */ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); String format = sdf.format(new Date()); reduceByKeyAndWindow.print(); // reduceByKeyAndWindow.dstream().saveAsTextFiles("savedata/prefix"+format+"--", "txt"); jsc.start(); jsc.awaitTermination(); jsc.close(); } }
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sunbin</groupId> <artifactId>stream</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>stream</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <repositories> <repository> <!-- Maven 自带的中央仓库使用的Id为central 如果其他的仓库声明也是用该Id就会覆盖中央仓库的配置 --> <id>cdh</id> <name>cdh</name> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> <layout>default</layout> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.9.0.0</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>parquet-hadoop</artifactId> <version>1.5.0-cdh5.8.3</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.8</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-assembly_2.10</artifactId> <version>1.6.0-cdh5.8.3</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.0-cdh5.8.3</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>sparksql.dataframe.CreateDataFrameFromHive</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
相关推荐
统计本地文件中单词出现次数 二.操作流程 1.读取外部文件创建JavaRDD; 2.通过flatMap转化操作切分字符串,获取单词新JavaRDD; 3.通过mapToPair,以key为单词,value统一为1的键值JavaPairRDD; 4.通过reduceByKey...
基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的离线统计移动端数据分析源码.zip基于Java+spark的...
Spark统计电影评分数据:movies.dat,retings.dat,users.dat
Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据Spark 案例测试数据...
该系统利用Spark的微批处理能力,通过TCP套接字接收实时数据流,并根据预定义的黑名单对数据进行过滤。在设计中,黑名单以广播变量的形式在集群中分发,以减少数据传输并提高处理效率。 系统的核心在于使用`...
该项目是大三下学期的课程设计,使用的数据集来自知名数据网站 Kaggle 的 tmdb-movie-metadata 电影数据集,以Python为编程语言,使用大数据框架Spark对数据进行了预处理,然后分别从多个方面对数据进行了分类和分析...
Spark 快速大数据分析Spark 快速大数据分析Spark 快速大数据分析Spark 快速大数据分析Spark 快速大数据分析Spark 快速大数据分析
Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...
该项目是大三下学期的课程设计,选取了共541909条数据,以Python为编程语言,使用大数据框架Spark对数据进行了预处理,然后分别从多个方面对数据进行了分类和分析,并对分析结果进行可视化。里面包含我的课程设计...
java spark淘宝大数据分析可视化系统(源码+数据+报告)
spark——大型集群快速和通用数据处理 对与当前大数据的学习很有参考价值
本次作业要完成在Hadoop平台搭建完成的基础上,利用Spark组件完成文本词频统计的任务,目标是学习Scala语言,理解Spark编程思想,基于Spark 思想,使用IDEA编写SparkWordCount程序,并能够在spark-shell中执行代码和...
sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失
Spark是一个通用的并行分布式计算框架,...第6章介绍如何用Spark分布式处理你的数据。第7章介绍如何设置Shark,将Hive查询集成到你的Spark作业中来。第8章介绍如何测试Spark作业。第九章介绍如何提升Spark任务的性能。
该项目是我大三下学期的课程设计,它是以和鲸社区的信用卡评分模型构建数据为数据集,以Python为编程语言,使用大数据框架Spark对数据进行处理分析,并对分析结果进行可视化。里面包含我的课程设计报告和完整的代码...
Spark的共享单车数据存储-Spark的共享单车数据存储系统-Spark的共享单车数据存储系统源码-Spark的共享单车数据存储管理系统-Spark的共享单车数据存储管理系统java代码-Spark的共享单车数据存储系统设计与实现-基于...
[毕业设计]基于Spark网易云音乐数据分析 .zip 完整代码,可运行
2. Spark读取文件系统的数据 (1) 在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数; 图3 spark统计行数 (2) 在spark-shell中读取HDFS系统文件“/user/hadoop/test.txt”...
使用spark读取hbase中的数据,并插入到mysql中