`
sunbin
  • 浏览: 341626 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

spark 统计黑名单之外的数据次数

 
阅读更多

此处代码还有三个功能

1、spark流式文件处理

2、窗口统计(reduceByKeyAndWindow:每10秒统计一次最近20秒内的数据)

3、广播变量

以下代码可运行

 

package com.sunbin.stream;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;
/**
 * Counts the words received during the last 20 seconds from a socket text stream, skipping every
 * word that appears on a broadcast blacklist.
 *
 * <p>Lines are read from {@code localhost:9999} and are expected to hold comma-separated words,
 * e.g. {@code a,b,c}. Every 10 seconds the per-word counts of the preceding 20-second window are
 * printed to the console.
 *
 * <p>The example also shows:
 * <ul>
 * <li>sharing a read-only blacklist with the workers through a broadcast variable;</li>
 * <li>incremental window counting with {@code reduceByKeyAndWindow} plus an inverse function,
 * which requires checkpointing to be enabled.</li>
 * </ul>
 *
 * @author root
 */
public class WindowOnStreaming {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WindowOnStreaming");
		// One micro-batch every 5 seconds.
		JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
		// Mandatory: reduceByKeyAndWindow with an inverse function keeps state that must be checkpointed.
		jsc.checkpoint("checkpoint");
		JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9999);

		// Words that must never be counted. A Set gives constant-time lookups per word.
		Set<String> blacklist = new HashSet<String>();
		blacklist.add("zhangsan2");
		blacklist.add("lisi2");
		// Broadcast the blacklist so it is shipped to the workers once instead of with every task.
		final Broadcast<Set<String>> blacklistBroadcast = jsc.sparkContext().broadcast(blacklist);

		// Split each line into words, drop blank tokens and blacklisted words, and emit (word, 1).
		// The blacklist is applied per word, not per line, so "zhangsan2,a" only counts "a".
		JavaPairDStream<String, Integer> words = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {

			private static final long serialVersionUID = 1L;

			public Iterable<Tuple2<String, Integer>> call(String line) throws Exception {
				Set<String> banned = blacklistBroadcast.getValue();
				List<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
				for (String token : line.split(",")) {
					// Tolerate input such as "a, b" and ignore empty tokens (e.g. an empty line).
					String word = token.trim();
					if (!word.isEmpty() && !banned.contains(word)) {
						pairs.add(new Tuple2<String, Integer>(word, 1));
					}
				}
				System.out.println("------读取了一次-----");
				return pairs;
			}
		});

		/*
		 * Sliding-window count: a 20-second window recomputed every 10 seconds.
		 * The inverse function lets Spark update the previous window incrementally (add the batches
		 * that enter, subtract the batches that leave) instead of re-reducing the whole window;
		 * this is the variant that requires the checkpoint directory configured above.
		 */
		JavaPairDStream<String, Integer> reduceByKeyAndWindow = words.reduceByKeyAndWindow(
				new Function2<Integer, Integer, Integer>() {

					private static final long serialVersionUID = 1L;

					public Integer call(Integer v1, Integer v2) throws Exception {
						return v1 + v2;
					}
				},
				new Function2<Integer, Integer, Integer>() {

					private static final long serialVersionUID = 1L;

					public Integer call(Integer v1, Integer v2) throws Exception {
						return v1 - v2;
					}
				},
				Durations.seconds(20),
				Durations.seconds(10),
				// Same partition count the shorter overload uses by default.
				jsc.sparkContext().defaultParallelism(),
				// Drop words whose count fell to 0; otherwise words that have left the window
				// linger in the state and keep being printed as (word,0).
				new Function<Tuple2<String, Integer>, Boolean>() {

					private static final long serialVersionUID = 1L;

					public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
						return wordCount._2() > 0;
					}
				});

		/*
		 * Manual test (window 20 s, slide 10 s, batch 5 s):
		 *   1st 5 s batch: type a,b,c
		 *   2nd 5 s batch: type d,e,f
		 *   3rd 5 s batch: type nothing
		 *   4th 5 s batch: type nothing
		 * Then check the output: by the 30-second mark none of the earlier words should remain.
		 */
		reduceByKeyAndWindow.print();
		// To write each window to timestamped files instead of the console, use:
		// String stamp = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss").format(new Date());
		// reduceByKeyAndWindow.dstream().saveAsTextFiles("savedata/prefix" + stamp + "--", "txt");

		jsc.start();
		jsc.awaitTermination();
		jsc.close();
	}
}

 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.sunbin</groupId>
  <artifactId>stream</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>stream</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
 
 
	<repositories>
		<repository>
			<!-- Maven's built-in central repository uses the id "central"; declaring another
			     repository with that same id would override the central repository's configuration. -->
			<id>cdh</id>
			<name>cdh</name>
			<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
			<layout>default</layout>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
	</repositories>
		
	<dependencies>
	
	    <dependency>
	      <groupId>junit</groupId>
	      <artifactId>junit</artifactId>
	      <version>3.8.1</version>
	      <scope>test</scope>
	    </dependency>
		<dependency>
		    <groupId>org.apache.kafka</groupId>
		    <artifactId>kafka_2.10</artifactId>
		    <version>0.9.0.0</version>
		</dependency>
		
		<dependency>
		    <groupId>com.twitter</groupId>
		    <artifactId>parquet-hadoop</artifactId>
		    <version>1.5.0-cdh5.8.3</version>
		</dependency>
		
		<dependency>
			<groupId>mysql</groupId>
    		<artifactId>mysql-connector-java</artifactId>
    		<version>5.1.8</version>
		</dependency>
		
		<dependency>
			<groupId>org.apache.spark</groupId>
		    <artifactId>spark-assembly_2.10</artifactId>
		    <version>1.6.0-cdh5.8.3</version>
		    <scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>1.6.0-cdh5.8.3</version>
			<scope>provided</scope>
		</dependency>
	</dependencies>
	
	<build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Entry point of the fat jar: must be a class that exists in this project. -->
                            <mainClass>com.sunbin.stream.WindowOnStreaming</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>  
</project>

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics