【转载】spark-streaming 连接kafka

Spark25ma.cn 发表了文章 • 0 个评论 • 82 次浏览 • 2018-11-08 09:18 • 来自相关话题

原文连接:https://mp.weixin.qq.com/s/Z01tn9Q1ZK4cnkXNTPtLQg
 推荐系统的在线部分往往使用spark-streaming实现,这是很重要的一个环节。

在线流程的实时数据一般有是从kafka 获取消息到spark streaming

spark连接kafka两种方式在面试中会经常被问到,说明这是重点~下面为大家介绍一下这两种方法:

第一种方式:Receiver模式 又称kafka高级api模式






效果:SparkStreaming中的Receivers,恰好Kafka有发布/订阅 ,然而:此种方式企业不常用,说明有BUG,不符合企业需求。因为:接收到的数据存储在Executor的内存,会出现数据漏处理或者多处理状况 

简单的理解就是kafka把消息全部分装好,提供给spark去调用,本来kafka的消息分布在不同的partition上面,相当于做了一步数据合并,在发送给spark,故spark可以设置excutor个数去消费这部分数据,效率相对慢一些

代码示例:

object ReceiverKafkaWordCount {

 Logger.getLogger("org").setLevel(Level.ERROR)

 def main(args: Array[String]): Unit = {

   val Array(brokers, topics) = Array(Conf.KAFKA_BROKER, Conf.TEST_TOPIC)

   // Create context with 2 second batch interval

   val conf = new SparkConf()

     .setMaster("local")

     .setAppName("OnlineStreamHobby") //设置本程序名称

//      .set("auto.offset.reset","smallest")

   val ssc = new StreamingContext(conf, Seconds(2))

   //    从kafka取数据

   val kafkaParams: Map[String, String] = Map[String, String](

//      "auto.offset.reset" -> "smallest", //自动将偏移重置为最早的偏移

           "zookeeper.connect" -> Conf.ZK_HOST,

//      "bootstrap.servers" -> Common.KAFKA_BROKER_LIST,

     "group.id" -> "test"

   )

   val numThreads = 1

   val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

   val fact_streaming = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2).map(_._2)

//    fact_streaming.print()

   val words = fact_streaming.flatMap(_.split(" "))

   val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

   wordCounts.print()

   ssc.checkpoint(".")

   //启动spark并设置执行时间

   ssc.start()

   ssc.awaitTermination()

 }

}

第二种方式:Direct模式 又称kafka低级api模式









效果:每次到Topic的每个partition依据偏移量进行获取数据,拉取数据以后进行处理,可以实现高可用

解释:在Spark 1.3中引入了这种新的无接收器“直接”方法,以确保更强大的端到端保证。这种方法不是使用接收器来接收数据,而是定期查询Kafka在每个topic+分partition中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围。当处理数据的作业启动时,Kafka简单的客户API用于读取Kafka中定义的偏移范围(类似于从文件系统读取文件)。请注意,此功能在Spark 1.3中为Scala和Java API引入

简单的理解就是spark直接从kafka底层中的partition直接获取消息,相对于Receiver模式少了一步,效率更快。但是这样一来spark中的executor的工作的个数就为kafka中的partition一致,设置再多的executor都不工作,同时偏移量也需要自己维护




代码示例:

object DirectTest {

 def main(args: Array[String]) {

   val conf = new SparkConf().setAppName("kafka direct test").setMaster("local")

   val sc = new SparkContext(conf)

   val ssc = new StreamingContext(sc,Seconds(10))

   //kafka基本参数,yourBrokers你的brokers集群

   val kafkaParams = Map("metadata.broker.list" -> Conf.KAFKA_BROKER)

   val topic = "test"

   val customGroup = "testGroup"

   //新建一个zkClient,zk是你的zk集群,和broker一样,也是"IP:端口,IP端口..."

   /**

     *如果你使用val zkClient = new ZKClient(zk)新建zk客户端,

     *在后边读取分区信息的文件数据时可能会出现错误

     *org.I0Itec.zkclient.exception.ZkMarshallingError:

     *  java.io.StreamCorruptedException: invalid stream header: 7B226A6D at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) ..

     *那么使用我的这个新建方法就可以了,指定读取数据时的序列化方式

     **/

   val zkClient = new ZkClient(Conf.ZK_HOST, Integer.MAX_VALUE, 10000,ZKStringSerializer)

   //获取zk下该消费者的offset存储路径,一般该路径是/consumers/test_spark_streaming_group/offsets/topic_name

   val topicDirs = new ZKGroupTopicDirs(customGroup, topic)

   val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")

   //设置第一批数据读取的起始位置

   var fromOffsets: Map[TopicAndPartition, Long] = Map()

   var directKafkaStream : InputDStream[(String,String)] = null

   //如果zk下有该消费者的offset信息,则从zk下保存的offset位置开始读取,否则从最新的数据开始读取(受auto.offset.reset设置影响,此处默认)

   if (children > 0) {

     //将zk下保存的该主题该消费者的每个分区的offset值添加到fromOffsets中

     for (i <- 0 until children) {

       val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/$i")

       val tp = TopicAndPartition(topic, i)

       //将不同 partition 对应的 offset 增加到 fromOffsets 中

       fromOffsets += (tp -> partitionOffset.toLong)

       println("@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")

       val messageHandler = (mmd: MessageAndMetadata[String, String]) =>  (mmd.topic,mmd.message())

       directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String,String)](ssc, kafkaParams, fromOffsets, messageHandler)

     }

   }else{

     directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))

   }

   /**

     *上边已经实现从zk上保存的值开始读取数据

     *下边就是数据处理后,再讲offset值写会到zk上

     */

   //用于保存当前offset范围

   var offsetRanges: Array[OffsetRange]  = Array.empty

   val directKafkaStream1 = directKafkaStream.transform { rdd =>

     //取出该批数据的offset值

     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

     rdd

   }.map(_._2)

   directKafkaStream1.foreachRDD(rdd=>{

     //数据处理完毕后,将offset值更新到zk集群

     for (o <- offsetRanges) {

       val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"

       ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)

     }

     rdd.foreach(println)

   })

   ssc.start()

   ssc.awaitTermination()

 }

}
 
如有不正确的地方,还请大家多多指教! 查看全部
原文连接:https://mp.weixin.qq.com/s/Z01tn9Q1ZK4cnkXNTPtLQg
 推荐系统的在线部分往往使用spark-streaming实现,这是很重要的一个环节。

在线流程的实时数据一般有是从kafka 获取消息到spark streaming

spark连接kafka两种方式在面试中会经常被问到,说明这是重点~下面为大家介绍一下这两种方法:

第一种方式:Receiver模式 又称kafka高级api模式

20181108091402.jpg


效果:SparkStreaming中的Receivers,恰好Kafka有发布/订阅 ,然而:此种方式企业不常用,说明有BUG,不符合企业需求。因为:接收到的数据存储在Executor的内存,会出现数据漏处理或者多处理状况 

简单的理解就是kafka把消息全部分装好,提供给spark去调用,本来kafka的消息分布在不同的partition上面,相当于做了一步数据合并,在发送给spark,故spark可以设置excutor个数去消费这部分数据,效率相对慢一些

代码示例:

object ReceiverKafkaWordCount {

 Logger.getLogger("org").setLevel(Level.ERROR)

 def main(args: Array[String]): Unit = {

   val Array(brokers, topics) = Array(Conf.KAFKA_BROKER, Conf.TEST_TOPIC)

   // Create context with 2 second batch interval

   val conf = new SparkConf()

     .setMaster("local")

     .setAppName("OnlineStreamHobby") //设置本程序名称

//      .set("auto.offset.reset","smallest")

   val ssc = new StreamingContext(conf, Seconds(2))

   //    从kafka取数据

   val kafkaParams: Map[String, String] = Map[String, String](

//      "auto.offset.reset" -> "smallest", //自动将偏移重置为最早的偏移

           "zookeeper.connect" -> Conf.ZK_HOST,

//      "bootstrap.servers" -> Common.KAFKA_BROKER_LIST,

     "group.id" -> "test"

   )

   val numThreads = 1

   val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

   val fact_streaming = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2).map(_._2)

//    fact_streaming.print()

   val words = fact_streaming.flatMap(_.split(" "))

   val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

   wordCounts.print()

   ssc.checkpoint(".")

   //启动spark并设置执行时间

   ssc.start()

   ssc.awaitTermination()

 }

}

第二种方式:Direct模式 又称kafka低级api模式


20181108091420.jpg




效果:每次到Topic的每个partition依据偏移量进行获取数据,拉取数据以后进行处理,可以实现高可用

解释:在Spark 1.3中引入了这种新的无接收器“直接”方法,以确保更强大的端到端保证。这种方法不是使用接收器来接收数据,而是定期查询Kafka在每个topic+分partition中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围。当处理数据的作业启动时,Kafka简单的客户API用于读取Kafka中定义的偏移范围(类似于从文件系统读取文件)。请注意,此功能在Spark 1.3中为Scala和Java API引入

简单的理解就是spark直接从kafka底层中的partition直接获取消息,相对于Receiver模式少了一步,效率更快。但是这样一来spark中的executor的工作的个数就为kafka中的partition一致,设置再多的executor都不工作,同时偏移量也需要自己维护




代码示例:

object DirectTest {

 def main(args: Array[String]) {

   val conf = new SparkConf().setAppName("kafka direct test").setMaster("local")

   val sc = new SparkContext(conf)

   val ssc = new StreamingContext(sc,Seconds(10))

   //kafka基本参数,yourBrokers你的brokers集群

   val kafkaParams = Map("metadata.broker.list" -> Conf.KAFKA_BROKER)

   val topic = "test"

   val customGroup = "testGroup"

   //新建一个zkClient,zk是你的zk集群,和broker一样,也是"IP:端口,IP端口..."

   /**

     *如果你使用val zkClient = new ZKClient(zk)新建zk客户端,

     *在后边读取分区信息的文件数据时可能会出现错误

     *org.I0Itec.zkclient.exception.ZkMarshallingError:

     *  java.io.StreamCorruptedException: invalid stream header: 7B226A6D at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) ..

     *那么使用我的这个新建方法就可以了,指定读取数据时的序列化方式

     **/

   val zkClient = new ZkClient(Conf.ZK_HOST, Integer.MAX_VALUE, 10000,ZKStringSerializer)

   //获取zk下该消费者的offset存储路径,一般该路径是/consumers/test_spark_streaming_group/offsets/topic_name

   val topicDirs = new ZKGroupTopicDirs(customGroup, topic)

   val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")

   //设置第一批数据读取的起始位置

   var fromOffsets: Map[TopicAndPartition, Long] = Map()

   var directKafkaStream : InputDStream[(String,String)] = null

   //如果zk下有该消费者的offset信息,则从zk下保存的offset位置开始读取,否则从最新的数据开始读取(受auto.offset.reset设置影响,此处默认)

   if (children > 0) {

     //将zk下保存的该主题该消费者的每个分区的offset值添加到fromOffsets中

     for (i <- 0 until children) {

       val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/$i")

       val tp = TopicAndPartition(topic, i)

       //将不同 partition 对应的 offset 增加到 fromOffsets 中

       fromOffsets += (tp -> partitionOffset.toLong)

       println("@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")

       val messageHandler = (mmd: MessageAndMetadata[String, String]) =>  (mmd.topic,mmd.message())

       directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String,String)](ssc, kafkaParams, fromOffsets, messageHandler)

     }

   }else{

     directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))

   }

   /**

     *上边已经实现从zk上保存的值开始读取数据

     *下边就是数据处理后,再讲offset值写会到zk上

     */

   //用于保存当前offset范围

   var offsetRanges: Array[OffsetRange]  = Array.empty

   val directKafkaStream1 = directKafkaStream.transform { rdd =>

     //取出该批数据的offset值

     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

     rdd

   }.map(_._2)

   directKafkaStream1.foreachRDD(rdd=>{

     //数据处理完毕后,将offset值更新到zk集群

     for (o <- offsetRanges) {

       val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"

       ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)

     }

     rdd.foreach(println)

   })

   ssc.start()

   ssc.awaitTermination()

 }

}
 
如有不正确的地方,还请大家多多指教!

Yii2实现动态修改组件配置信息

php25ma.cn 发表了文章 • 0 个评论 • 140 次浏览 • 2018-10-12 09:48 • 来自相关话题

本教程是在yii2.0 及以上版本进行试验,其他版本的大家可以根据自身配置修改,原理都是一样
我遇到的问题是在处理CDN组件配置的时候,默认是不适用cdn,配置如下:


然而我们在后台有准备开启CDN功能:


怎么办呢,我们可以直接在配置文件初始化的时候加上如下代码即可:
         



ps: 在使用合并yii::configure() 合并配置的时候,如果你只想合并某一个组件的配置,那第一个参数就是 yii::$app->组件名。如果需要合并多个配置的话 只需要自己组装一个数组。以componets为下标,然后二维数组就是组件名去配置就可以的
如果有不明白的欢迎加QQ 405108246  查看全部
本教程是在yii2.0 及以上版本进行试验,其他版本的大家可以根据自身配置修改,原理都是一样
  • 我遇到的问题是在处理CDN组件配置的时候,默认是不适用cdn,配置如下:
    QQ图片20181012093035.png
  • 然而我们在后台有准备开启CDN功能:
    QQ图片20181012093302.png
  • 怎么办呢,我们可以直接在配置文件初始化的时候加上如下代码即可:

         
QQ截图20181012094903.png

ps: 在使用合并yii::configure() 合并配置的时候,如果你只想合并某一个组件的配置,那第一个参数就是 yii::$app->组件名。如果需要合并多个配置的话 只需要自己组装一个数组。以componets为下标,然后二维数组就是组件名去配置就可以的
  • 如果有不明白的欢迎加QQ 405108246 

搭建 GIT 服务器教程2017.12.11

回复

Linux25ma.cn 发起了问题 • 1 人关注 • 0 个回复 • 778 次浏览 • 2017-12-11 10:54 • 来自相关话题

python学习笔记——字符串格式化

python基础25ma.cn 发表了文章 • 0 个评论 • 563 次浏览 • 2017-06-12 09:37 • 来自相关话题

我们在输出字符串的时候,如果想对输出的内容进行一些整理,比如把几段字符拼接起来,或者把一段字符插入到另一段字符中间,就需要用到字符串的格式化输出。
 
先从简单的开始,如果你想把两段字符连起来输出
str1 = 'www.'
str2 = '25ma.cn'

你可以
print str1 + str2







或者还可以把字符变量一个字符串相加
print 'very' + str1
print str1 + ' and ' + str2
 
但如果你想要把一个数字加到文字后面输出,比如这样
num = 18
print 'My age is' + num
程序就会报错。因为字符和数字不能直接用+相加。


一种解决方法是,用str()把数字转换成字符串
print 'My age is' + str(18)

num = 18
print 'My age is' + str(num)
 
 
还有一种方法,就是用%对字符串进行格式化
num = 18
print 'My age is %d' % num
输出的时候,%d会被%后面的值替换。输出
My age is 18


这里,%d只能用来替换整数。如果你想格式化的数值是小数,要用%f
print ‘Price is %f’ % 4.99
输出
Price is 4.990000


如果你想保留两位小数,需要在f前面加上条件:%.2f
print ‘Price is %.2f’ % 4.99
输出
Price is 4.99
 
另外,可以用%s来替换一段字符串
name = 'Crossin'
print '%s is a good teacher.' % name
输出
Crossin is a good teacher.


或者
print 'Today is %s.' % 'Friday' 
输出
Today is Friday.


注意区分:有引号的表示一段字符,没有引号的就是一个变量,这个变量可能是字符,也可能是数字,但一定要和%所表示的格式相一致。
 
 
现在,试试看用字符串格式化改进一下之前你写的小游戏。比如你输了一个数字72,程序会回答你
72 is too small.
或者
Bingo, 72 is the right answer! 查看全部
我们在输出字符串的时候,如果想对输出的内容进行一些整理,比如把几段字符拼接起来,或者把一段字符插入到另一段字符中间,就需要用到字符串的格式化输出。
 
先从简单的开始,如果你想把两段字符连起来输出
str1 = 'www.'
str2 = '25ma.cn'

你可以
print str1 + str2

QQ截图20170612093612.png



或者还可以把字符变量一个字符串相加
print 'very' + str1
print str1 + ' and ' + str2
 
但如果你想要把一个数字加到文字后面输出,比如这样
num = 18
print 'My age is' + num
程序就会报错。因为字符和数字不能直接用+相加。


一种解决方法是,用str()把数字转换成字符串
print 'My age is' + str(18)

num = 18
print 'My age is' + str(num)
 
 
还有一种方法,就是用%对字符串进行格式化
num = 18
print 'My age is %d' % num
输出的时候,%d会被%后面的值替换。输出
My age is 18


这里,%d只能用来替换整数。如果你想格式化的数值是小数,要用%f
print ‘Price is %f’ % 4.99
输出
Price is 4.990000


如果你想保留两位小数,需要在f前面加上条件:%.2f
print ‘Price is %.2f’ % 4.99
输出
Price is 4.99
 
另外,可以用%s来替换一段字符串
name = 'Crossin'
print '%s is a good teacher.' % name
输出
Crossin is a good teacher.


或者
print 'Today is %s.' % 'Friday' 
输出
Today is Friday.


注意区分:有引号的表示一段字符,没有引号的就是一个变量,这个变量可能是字符,也可能是数字,但一定要和%所表示的格式相一致。
 
 
现在,试试看用字符串格式化改进一下之前你写的小游戏。比如你输了一个数字72,程序会回答你
72 is too small.
或者
Bingo, 72 is the right answer!

python学习笔记——字符串

python基础25ma.cn 发表了文章 • 0 个评论 • 559 次浏览 • 2017-06-12 09:27 • 来自相关话题

字符串就是一组字符的序列(序列!又见序列!还记得我说过,range就是产生一组整数序列。今天仍然不去细说它。),它一向是编程中的常见问题。之前我们用过它,以后我们还要不停地用它。
 
python中最常用的字符串表示方式是单引号(‘’)和双引号("")。我还是要再说:一定得是英文字符!

'string'和“string”的效果是一样的。


可以直接输出一个字符串print ‘25ma.cn’


也可以用一个变量来保存字符串,然后输出str = ‘bad’print str


如果你想表示一段带有英文单引号或者双引号的文字,那么表示这个字符串的引号就要与内容区别开。


内容带有单引号,就用双引号表示"It's good"
反之亦然
‘You are a "BAD" man’
 
python中还有一种表示字符串的方法:三个引号(‘’‘)或者(""")


在三个引号中,你可以方便地使用单引号和双引号,并且可以直接换行
'''
"What's your name?" I asked.
"I'm Han Meimei."
'''
 
还有一种在字符串中表示引号的方法,就是用\,可以不受引号的限制


\'表示单引号,\"表示双引号
‘I\'m a \"good\" teacher’


\被称作转译字符,除了用来表示引号,还有比如用
\\表示字符串中的\
\n表示字符串中的换行


\还有个用处,就是用来在代码中换行,而不影响输出的结果:
"this is the\
same line"


这个字符串仍然只有一行,和
"this is thesame line"
是一样的,只是在代码中换了行。当你要写一行很长的代码时,这个会派上用场。
 
作业时间】用print输出以下文字:


1.
He said, "I'm yours!"


2.
\\_v_//


3.
Stay hungry,
stay foolish.
-- Steve Jobs


4.
*
***
*****
***
* 查看全部
字符串就是一组字符的序列(序列!又见序列!还记得我说过,range就是产生一组整数序列。今天仍然不去细说它。),它一向是编程中的常见问题。之前我们用过它,以后我们还要不停地用它。
 
python中最常用的字符串表示方式是单引号(‘’)和双引号("")。我还是要再说:一定得是英文字符!

'string'和“string”的效果是一样的。


可以直接输出一个字符串print ‘25ma.cn’


也可以用一个变量来保存字符串,然后输出str = ‘bad’print str


如果你想表示一段带有英文单引号或者双引号的文字,那么表示这个字符串的引号就要与内容区别开。


内容带有单引号,就用双引号表示"It's good"
反之亦然
‘You are a "BAD" man’
 
python中还有一种表示字符串的方法:三个引号(‘’‘)或者(""")


在三个引号中,你可以方便地使用单引号和双引号,并且可以直接换行
'''
"What's your name?" I asked.
"I'm Han Meimei."
'''
 
还有一种在字符串中表示引号的方法,就是用\,可以不受引号的限制


\'表示单引号,\"表示双引号
‘I\'m a \"good\" teacher’


\被称作转译字符,除了用来表示引号,还有比如用
\\表示字符串中的\
\n表示字符串中的换行


\还有个用处,就是用来在代码中换行,而不影响输出的结果:
"this is the\
same line"


这个字符串仍然只有一行,和
"this is thesame line"
是一样的,只是在代码中换了行。当你要写一行很长的代码时,这个会派上用场。
 
作业时间】用print输出以下文字:


1.
He said, "I'm yours!"


2.
\\_v_//


3.
Stay hungry,
stay foolish.
-- Steve Jobs


4.
*
***
*****
***
*

mapreduce执行详细流程

回复

MapReducewolverine 发起了问题 • 0 人关注 • 0 个回复 • 854 次浏览 • 2017-06-11 12:35 • 来自相关话题

hadoop开发问题答案汇总【持续更新ing】

Hadoop生态圈25ma.cn 发表了文章 • 0 个评论 • 723 次浏览 • 2017-06-08 00:35 • 来自相关话题

wordcount 对中文统计按空格,中文怎么统计,单个字吗?
使用结巴(jieba)python实现的中文分词来处理中文统计


1.mapreduce 的离线数据源来自哪呢?存储在哪?
数据源可以来自很多地方,比如日志系统,访客行为,或者采集的文本文件等等
数据进行处理过后可以存放到关系型数据库(mysql,oracle)或者非关系型数据库(mongoDB,Hbash)以及分布式列式存储数据库(HBase)等都可以。


2.tcp/udp 的区别?
TCP(Transmission Control Protocol,传输控制协议)是基于连接的协议,也就是说,在正式收发数据前,必须和对方建立可靠的连接。一个TCP连接必须要经过三次“对话”才能建立起来,其中的过程非常复杂,我们这里只做简单、形象的介绍,你只要做到能够理解这个过程即可。我们来看看这三次对话的简单过程:主机A向主机B发出连接请求数据包:“我想给你发数据,可以吗?”,这是第一次对话;主机B向主机A发送同意连接和要求同步(同步就是两台主机一个在发送,一个在接收,协调工作)的数据包:“可以,你什么时候发?”,这是第二次对话;主机A再发出一个数据包确认主机B的要求同步:“我现在就发,你接着吧!”,这是第三次对话。三次“对话”的目的是使数据包的发送和接收同步,经过三次“对话”之后,主机A才向主机B正式发送数据。

UDP(User Data Protocol,用户数据报协议)是与TCP相对应的协议。它是面向非连接的协议,它不与对方建立连接,而是直接就把数据包发送过去!
UDP适用于一次只传送少量数据、对可靠性要求不高的应用环境。比如,我们经常使用“ping”命令来测试两台主机之间TCP/IP通信是否正常,其实“ping”命令的原理就是向对方主机发送UDP数据包,然后对方主机确认收到数据包,如果数据包是否到达的消息及时反馈回来,那么网络就是通的。例如,在默认状态下,一次“ping”操作发送4个数据包(如图2所示)。大家可以看到,发送的数据包数量是4包,收到的也是4包(因为对方主机收到后会发回一个确认收到的数据包)。这充分说明了UDP协议是面向非连接的协议,没有建立连接的过程。正因为UDP协议没有连接的过程,所以它的通信效果高;但也正因为如此,它的可靠性不如TCP协议高。QQ就使用UDP发消息,因此有时会出现收不到消息的情况。
tcp协议和udp协议的差别
TCP UDP
是否连接 : 面向连接 面向非连接
传输可靠性: 可靠 不可靠
应用场合: 传输大量数据 少量数据
速度 : 慢 快

3.storm 和hadoop的区别?
Storm用于处理高速、大型数据流的分布式实时计算系统。为Hadoop添加了可靠的实时数据处理功能
Spark采用了内存计算。从多迭代批处理出发,允许将数据载入内存作反复查询,此外还融合数据仓库,流处理和图形计算等多种计算范式。Spark构建在HDFS上,能与Hadoop很好的结合。它的RDD是一个很大的特点。
Hadoop当前大数据管理标准之一,运用在当前很多商业应用系统。可以轻松地集成结构化、半结构化甚至非结构化数据集。

4.什么是延时,吞吐?
Latency,中文译作延迟。Throughput,中文译作吞吐量。它们是衡量软件系统的最常见的两个指标。
延迟一般包括单向延迟(One-way Latency)和往返延迟(Round Trip Latency),实际测量时一般取往返延迟。它的单位一般是ms、s、min、h等。
而吞吐量一般指相当一段时间内测量出来的系统单位时间处理的任务数或事务数(TPS)。注意“相当一段时间”,不是几秒,而可能是十几分钟、半个小时、一天、几周甚至几月。它的单位一般是TPS、每单位时间写入磁盘的字节数等。

5.storm 为什么不用python写?
其实storm可以用很多种语言写,只是根据自身的业务需求来选择使用什么语言来开发

展示大数据的结果用啥工具呀?
a. 开源大数据生态圈
Hadoop HDFS、Hadoop MapReduce, HBase、Hive 渐次诞生,早期Hadoop生态圈逐步形成。
开源生态圈活跃,并免费,但Hadoop对技术要求高,实时性稍差。
b. 商用大数据分析工具
一体机数据库/数据仓库
IBM PureData(Netezza), Oracle Exadata, SAP Hana等等。
数据仓库
Teradata AsterData, EMC GreenPlum, HP Vertica 等等。
数据集市
QlikView、 Tableau 、国内永洪科技Yonghong Data Mart 等等。
c.前端展现
用于展现分析的前端开源工具有JasperSoft,Pentaho, Spagobi, Openi, Birt等等。
用于展现分析商用分析工具有Cognos,BO, Microsoft, Oracle,Microstrategy,QlikView、 Tableau 、 国内永洪科技Yonghong Z-Suite等等。

6.blot的数量有限制嘛?
理论上没有,设置多了跟集群有关

7.spouts是进程还是线程?
线程 blot 也是线程

8.如果bolt挂掉 其他bolt能立即接管嘛?
可以接管,因为bolt是一个连接池的机制,当一个bolt挂掉之后,会重新分配一个bolt立即接管挂掉的bolt

9.啥是RPC?
RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。

10.可指定emit多个stream流?
待补充

11.标准输入流 只能有一个么?
一般都只有一个

12.什么时候用多个spout ?应用场景?
可以,需要根据自己的业务来调整,比如一个冷水管,和一个热水管,这两个水管可以同时供水。而且也可以根据需要将冷水和热水汇合

13.bolt输出结果可以返回给任一bolt吗?还是只会返给spout?
一般不要返回给spout,因为大部分场景spout直管出,不管入,当然也有特殊场景(不常用)

14.同一patition 到同一reduce中 4个reduce 最后还要进行一次reduce么 因为要全局排序?
待补充

15.mapreduce做的不是局部的排序吗,怎么保证全局排序?
待补充

16.storm只走内存,产生的结果怎么处理的?
直接存储到数据库,然后再对外展示 查看全部
 wordcount 对中文统计按空格,中文怎么统计,单个字吗?
使用结巴(jieba)python实现的中文分词来处理中文统计


1.mapreduce 的离线数据源来自哪呢?存储在哪?
数据源可以来自很多地方,比如日志系统,访客行为,或者采集的文本文件等等
数据进行处理过后可以存放到关系型数据库(mysql,oracle)或者非关系型数据库(mongoDB,Hbash)以及分布式列式存储数据库(HBase)等都可以。


2.tcp/udp 的区别?
TCP(Transmission Control Protocol,传输控制协议)是基于连接的协议,也就是说,在正式收发数据前,必须和对方建立可靠的连接。一个TCP连接必须要经过三次“对话”才能建立起来,其中的过程非常复杂,我们这里只做简单、形象的介绍,你只要做到能够理解这个过程即可。我们来看看这三次对话的简单过程:主机A向主机B发出连接请求数据包:“我想给你发数据,可以吗?”,这是第一次对话;主机B向主机A发送同意连接和要求同步(同步就是两台主机一个在发送,一个在接收,协调工作)的数据包:“可以,你什么时候发?”,这是第二次对话;主机A再发出一个数据包确认主机B的要求同步:“我现在就发,你接着吧!”,这是第三次对话。三次“对话”的目的是使数据包的发送和接收同步,经过三次“对话”之后,主机A才向主机B正式发送数据。

UDP(User Data Protocol,用户数据报协议)是与TCP相对应的协议。它是面向非连接的协议,它不与对方建立连接,而是直接就把数据包发送过去!
UDP适用于一次只传送少量数据、对可靠性要求不高的应用环境。比如,我们经常使用“ping”命令来测试两台主机之间TCP/IP通信是否正常,其实“ping”命令的原理就是向对方主机发送UDP数据包,然后对方主机确认收到数据包,如果数据包是否到达的消息及时反馈回来,那么网络就是通的。例如,在默认状态下,一次“ping”操作发送4个数据包(如图2所示)。大家可以看到,发送的数据包数量是4包,收到的也是4包(因为对方主机收到后会发回一个确认收到的数据包)。这充分说明了UDP协议是面向非连接的协议,没有建立连接的过程。正因为UDP协议没有连接的过程,所以它的通信效果高;但也正因为如此,它的可靠性不如TCP协议高。QQ就使用UDP发消息,因此有时会出现收不到消息的情况。
tcp协议和udp协议的差别
TCP UDP
是否连接 : 面向连接 面向非连接
传输可靠性: 可靠 不可靠
应用场合: 传输大量数据 少量数据
速度 : 慢 快

3.storm 和hadoop的区别?
Storm用于处理高速、大型数据流的分布式实时计算系统。为Hadoop添加了可靠的实时数据处理功能
Spark采用了内存计算。从多迭代批处理出发,允许将数据载入内存作反复查询,此外还融合数据仓库,流处理和图形计算等多种计算范式。Spark构建在HDFS上,能与Hadoop很好的结合。它的RDD是一个很大的特点。
Hadoop当前大数据管理标准之一,运用在当前很多商业应用系统。可以轻松地集成结构化、半结构化甚至非结构化数据集。

4.什么是延时,吞吐?
Latency,中文译作延迟。Throughput,中文译作吞吐量。它们是衡量软件系统的最常见的两个指标。
延迟一般包括单向延迟(One-way Latency)和往返延迟(Round Trip Latency),实际测量时一般取往返延迟。它的单位一般是ms、s、min、h等。
而吞吐量一般指相当一段时间内测量出来的系统单位时间处理的任务数或事务数(TPS)。注意“相当一段时间”,不是几秒,而可能是十几分钟、半个小时、一天、几周甚至几月。它的单位一般是TPS、每单位时间写入磁盘的字节数等。

5.storm 为什么不用python写?
其实storm可以用很多种语言写,只是根据自身的业务需求来选择使用什么语言来开发

展示大数据的结果用啥工具呀?
a. 开源大数据生态圈
Hadoop HDFS、Hadoop MapReduce, HBase、Hive 渐次诞生,早期Hadoop生态圈逐步形成。
开源生态圈活跃,并免费,但Hadoop对技术要求高,实时性稍差。
b. 商用大数据分析工具
一体机数据库/数据仓库
IBM PureData(Netezza), Oracle Exadata, SAP Hana等等。
数据仓库
Teradata AsterData, EMC GreenPlum, HP Vertica 等等。
数据集市
QlikView、 Tableau 、国内永洪科技Yonghong Data Mart 等等。
c.前端展现
用于展现分析的前端开源工具有JasperSoft,Pentaho, Spagobi, Openi, Birt等等。
用于展现分析商用分析工具有Cognos,BO, Microsoft, Oracle,Microstrategy,QlikView、 Tableau 、 国内永洪科技Yonghong Z-Suite等等。

6.blot的数量有限制嘛?
理论上没有,设置多了跟集群有关

7.spouts是进程还是线程?
线程 blot 也是线程

8.如果bolt挂掉 其他bolt能立即接管嘛?
可以接管,因为bolt是一个连接池的机制,当一个bolt挂掉之后,会重新分配一个bolt立即接管挂掉的bolt

9.啥是RPC?
RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。

10.可指定emit多个stream流?
待补充

11.标准输入流 只能有一个么?
一般都只有一个

12.什么时候用多个spout ?应用场景?
可以,需要根据自己的业务来调整,比如一个冷水管,和一个热水管,这两个水管可以同时供水。而且也可以根据需要将冷水和热水汇合

13.bolt输出结果可以返回给任一bolt吗?还是只会返给spout?
一般不要返回给spout,因为大部分场景spout直管出,不管入,当然也有特殊场景(不常用)

14.同一patition 到同一reduce中 4个reduce 最后还要进行一次reduce么 因为要全局排序?
待补充

15.mapreduce做的不是局部的排序吗,怎么保证全局排序?
待补充

16.storm只走内存,产生的结果怎么处理的?
直接存储到数据库,然后再对外展示

二期学员报道贴

MapReducelinchao 回复了问题 • 8 人关注 • 7 个回复 • 875 次浏览 • 2017-06-07 15:26 • 来自相关话题

hadoop_streaming开发要点_提交作业jobconf常用配置

MapReduceeverything 发表了文章 • 0 个评论 • 797 次浏览 • 2017-06-05 15:18 • 来自相关话题

Hadoop streaming是和hadoop一起发布的实用程序。它允许用户创建和执行使用任何程序或者脚本(例如: python,ruby,shell,php)编写的map或者reduce的mapreduce jobs。

# 脚本程序run.sh
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python reduce.py" \
-jobconf "mapred.reduce.tasks=2" \ # reduce task的数量由mapred.reduce.tasks这个参数设定,默认值是1。
-file ./map.py \
-file ./reduce.pyinput: 指定作业的输入文件的HDFS路径,支持使用*通配符,支持指定多个文件或目录,可多次使用

output: 指定作业的输出文件的HDFS路径,路径必须存在,执行作业用户必须有创建该目录的权限,只能使用一次

mapper: 用户自己写的mapper程序

reducer: 用户自己写的reduce程序

file: 允许用户设置task的文件和文件档案,类似的配置还有-cacheFile, -cacheArchive分别用于向计算节点分发HDFS文件和HDFS压缩文件
分发的文件有如下:
map和reduce的执行文件map和reduce要用到的数据文件、配置文件

jobconf: 提交作业的一些配置属性
mapred.map.tasks:map task数目mapred.reduce.tasks:reduce task数目stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目num.key.fields.for.partition指定对key分出来的前几部分做partition而不是整个key
 
# jobconf常用配置
 
mapred.job.name 作业名
mapred.job.priority 作业优先级
mapred.job.map.capacity 最多同时运行map任务数
mapred.job.reduce.capacity 最多同时运行reduce任务数
mapred.task.timeout 任务没有响应(输入输出)的最大时间
mapred.compress.map.output map的输出是否压缩
mapred.map.output.compression.codec map的输出压缩方式
mapred.output.compress reduce的输出是否压缩
mapred.output.compression.codec reduce的输出压缩方式
stream.map.output.field.separator map输出分隔符 查看全部


Hadoop streaming是和hadoop一起发布的实用程序。它允许用户创建和执行使用任何程序或者脚本(例如: python,ruby,shell,php)编写的map或者reduce的mapreduce jobs。


# 脚本程序run.sh
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python reduce.py" \
-jobconf "mapred.reduce.tasks=2" \ # reduce task的数量由mapred.reduce.tasks这个参数设定,默认值是1。
-file ./map.py \
-file ./reduce.py
input: 指定作业的输入文件的HDFS路径,支持使用*通配符,支持指定多个文件或目录,可多次使用

output: 指定作业的输出文件的HDFS路径,路径必须存在,执行作业用户必须有创建该目录的权限,只能使用一次

mapper: 用户自己写的mapper程序

reducer: 用户自己写的reduce程序

file: 允许用户设置task的文件和文件档案,类似的配置还有-cacheFile, -cacheArchive分别用于向计算节点分发HDFS文件和HDFS压缩文件
分发的文件有如下:
  • map和reduce的执行文件
  • map和reduce要用到的数据文件、配置文件


jobconf: 提交作业的一些配置属性
  • mapred.map.tasks:map task数目
  • mapred.reduce.tasks:reduce task数目
  • stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
  • num.key.fields.for.partition指定对key分出来的前几部分做partition而不是整个key

 
# jobconf常用配置
 
mapred.job.name 作业名
mapred.job.priority 作业优先级
mapred.job.map.capacity 最多同时运行map任务数
mapred.job.reduce.capacity 最多同时运行reduce任务数
mapred.task.timeout 任务没有响应(输入输出)的最大时间
mapred.compress.map.output map的输出是否压缩
mapred.map.output.compression.codec map的输出压缩方式
mapred.output.compress reduce的输出是否压缩
mapred.output.compression.codec reduce的输出压缩方式
stream.map.output.field.separator map输出分隔符

python学习笔记——for循环

python基础25ma.cn 发表了文章 • 0 个评论 • 609 次浏览 • 2017-06-05 12:03 • 来自相关话题

大家对while循环已经有点熟悉了吧?今天我们来讲另一种循环语句:


for ... in ...
 
同while一样,for循环可以用来重复做一件事情。在某些场景下,它比while更好用。
比如之前的一道习题:输出1到100。
我们用while来做,需要有一个值来记录已经做了多少次,还需要在while后面判断是不是到了100。
如果用for循环,则可以这么写:


for i in range(1, 101):
print i

解释一下,range(1, 101)表示从1开始,到101为止(不包括101),取其中所有的整数。
for i in range(1, 101)就是说,把这些数,依次赋值给变量i。
相当于一个一个循环过去,第一次i = 1,第二次i = 2,……,直到i = 100。
当i = 101时跳出循环。
所以,当你需要一个循环10次的循环,你就只需要写:


for i in range(1, 11)


或者


for i in range(0, 10)

区别在于前者i是从1到10,后者i是从0到9。当然,你也可以不用i这个变量名。
比如一个循环n次的循环:


for count in range(0, n)
for循环的本质是对一个序列中的元素进行递归。什么是序列,以后再说。先记住这个最简单的形式:


for i in range(a, b)


从a循环至b-1


现在,你可以用for循环来打印0-9的随机数。、










  查看全部
大家对while循环已经有点熟悉了吧?今天我们来讲另一种循环语句:


for ... in ...
 
同while一样,for循环可以用来重复做一件事情。在某些场景下,它比while更好用。
比如之前的一道习题:输出1到100。
我们用while来做,需要有一个值来记录已经做了多少次,还需要在while后面判断是不是到了100。
如果用for循环,则可以这么写:


for i in range(1, 101):
print i

解释一下,range(1, 101)表示从1开始,到101为止(不包括101),取其中所有的整数。
for i in range(1, 101)就是说,把这些数,依次赋值给变量i。
相当于一个一个循环过去,第一次i = 1,第二次i = 2,……,直到i = 100。
当i = 101时跳出循环。
所以,当你需要一个循环10次的循环,你就只需要写:


for i in range(1, 11)


或者


for i in range(0, 10)

区别在于前者i是从1到10,后者i是从0到9。当然,你也可以不用i这个变量名。
比如一个循环n次的循环:


for count in range(0, n)
for循环的本质是对一个序列中的元素进行递归。什么是序列,以后再说。先记住这个最简单的形式:


for i in range(a, b)


从a循环至b-1


现在,你可以用for循环来打印0-9的随机数。、

for1.png


for2.png