皮皮网

【王村源码路灯】【鲸卡源码】【HLJ指标源码】reducer源码

2024-11-20 00:24:44 来源:波段指标源码教学

1.如何使用Python为Hadoop编写一个简单的MapReduce程序
2.Rematch 源码系列四、Third-Party plugins
3.如何用63行代码写一个NgRx Store
4.重读Redux源码的感悟
5.如何在MaxCompute上运行HadoopMR作业
6.Redux(4.0.4)源码解析

reducer源码

如何使用Python为Hadoop编写一个简单的MapReduce程序

       MichaelG.Noll在他的Blog中提到如何在Hadoop中用Python编写MapReduce程序,韩国的gogamza在其Bolg中也提到如何用C编写MapReduce程序(我稍微修改了一下原程序,因为他的Map对单词切分使用tab键)。我合并他们两人的文章,也让国内的Hadoop用户能够使用别的语言来编写MapReduce程序。  首先您得配好您的Hadoop集群,这方面的介绍网上比较多,这儿给个链接(Hadoop学习笔记二安装部署)。HadoopStreaming帮助我们用非Java的编程语言使用MapReduce,Streaming用STDIN(标准输入)和STDOUT(标准输出)来和我们编写的Map和Reduce进行数据的交换数据。任何能够使用STDIN和STDOUT都可以用来编写MapReduce程序,比如我们用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。  我们还是使用Hadoop的例子WordCount来做示范如何编写MapReduce,在WordCount的例子中我们要解决计算在一批文档中每一个单词的出现频率。首先我们在Map程序中会接受到这批文档每一行的数据,然后我们编写的Map程序把这一行按空格切开成一个数组。并对这个数组遍历按"1"用标准的输出输出来,代表这个单词出现了一次。在Reduce中我们来统计单词的出现频率。    PythonCode  Map:mapper.py  #!/usr/bin/envpythonimportsys#mapswordstotheircountsword2count={ }#inputcomesfromSTDIN(standardinput)forlineinsys.stdin:#removeleadingandtrailingwhitespaceline=line.strip()#splitthelineintowordswhileremovinganyemptystringswords=filter(lambdaword:word,line.split())#increasecountersforwordinwords:#writetheresultstoSTDOUT(standardoutput);#whatweoutputherewillbetheinputforthe#Reducestep,i.e.theinputforreducer.py##tab-delimited;thetrivialwordcountis1print'%s\t%s'%(word,1)  Reduce:reducer.py  #!/usr/bin/envpythonfromoperatorimportitemgetterimportsys#mapswordstotheircountsword2count={ }#inputcomesfromSTDINforlineinsys.stdin:#removeleadingandtrailingwhitespaceline=line.strip()#parsetheinputwegotfrommapper.pyword,count=line.split()#convertcount(currentlyastring)tointtry:count=int(count)word2count[word]=word2count.get(word,0)+countexceptValueError:#countwasnotanumber,sosilently#ignore/discardthislinepass#sortthewordslexigraphically;##thisstepisNOTrequired,wejustdoitsothatour#finaloutputwilllookmoreliketheofficialHadoop#wordcountexamplessorted_word2count=sorted(word2count.items(),key=itemgetter(0))#writetheresultstoSTDOUT(standardoutput)forword,countinsorted_word2count:print'%s\t%s'%(word,count)  CCode  Map:Mapper.c  #include#include#include#include#defineBUF_SIZE#defineDELIM"\n"intmain(intargc,char*argv[]){ charbuffer[BUF_SIZE];while(fgets(buffer,BUF_SIZE-1,stdin)){ intlen=strlen(buffer);if(buffer[len-1]=='\n')buffer[len-1]=0;char*querys=index(buffer,'');char*query=NULL;if(querys==NULL)continue;querys+=1;/*nottoinclude'\t'*/query=strtok(buffer,"");while(query){ printf("%s\t1\n",query);query=strtok(NULL,"");}}return0;}h>h>h>h>  Reduce:Reducer.c  #include#include#include#include#defineBUFFER_SIZE#defineDELIM"\t"intmain(intargc,char*argv[]){ charstrLastKey[BUFFER_SIZE];charstrLine[BUFFER_SIZE];intcount=0;*strLastKey='\0';*strLine='\0';while(fgets(strLine,BUFFER_SIZE-1,stdin)){ char*strCurrKey=NULL;char*strCurrNum=NULL;strCurrKey=strtok(strLine,DELIM);strCurrNum=strtok(NULL,DELIM);/*necessarytocheckerrorbut.*/if(strLastKey[0]=='\0'){ strcpy(strLastKey,strCurrKey);}if(strcmp(strCurrKey,strLastKey)){ printf("%s\t%d\n",strLastKey,count);count=atoi(strCurrNum);}else{ count+=atoi(strCurrNum);}strcpy(strLastKey,strCurrKey);}printf("%s\t%d\n",strLastKey,count);/*flushthecount*/return0;}h>h>h>h>  首先我们调试一下源码:  chmod+xmapper.pychmod+xreducer.pyecho"foofooquuxlabsfoobarquux"|./mapper.py|./reducer.pybar1foo3labs1quux2g++Mapper.c-oMapperg++Reducer.c-oReducerchmod+xMapperchmod+xReducerecho"foofooquuxlabsfoobarquux"|./Mapper|./Reducerbar1foo2labs1quux1foo1quux1  你可能看到C的输出和Python的不一样,因为Python是把他放在词典里了.我们在Hadoop时,会对这进行排序,然后相同的单词会连续在标准输出中输出.  在Hadoop中运行程序  首先我们要下载我们的测试文档wget页面中摘下的用php编写的MapReduce程序,供php程序员参考:Map:mapper.php  #!/usr/bin/php$word2count=array();//inputcomesfromSTDIN(standardinput)while(($line=fgets(STDIN))!==false){ //removeleadingandtrailingwhitespaceandlowercase$line=strtolower(trim($line));//splitthelineintowordswhileremovinganyemptystring$words=preg_split('/\W/',$line,0,PREG_SPLIT_NO_EMPTY);//increasecountersforeach($wordsas$word){ $word2count[$word]+=1;}}//writetheresultstoSTDOUT(standardoutput)//whatweoutputherewillbetheinputforthe//Reducestep,i.e.theinputforreducer.pyforeach($word2countas$word=>$count){ //tab-delimitedecho$word,chr(9),$count,PHP_EOL;}?>  Reduce:mapper.php  #!/usr/bin/php$word2count=array();//inputcomesfromSTDINwhile(($line=fgets(STDIN))!==false){ //removeleadingandtrailingwhitespace$line=trim($line);//parsetheinputwegotfrommapper.phplist($word,$count)=explode(chr(9),$line);//convertcount(currentlyastring)toint$count=intval($count);//sumcountsif($count>0)$word2count[$word]+=$count;}//sortthewordslexigraphically////thissetisNOTrequired,wejustdoitsothatour//finaloutputwilllookmoreliketheofficialHadoop//wordcountexamplesksort($word2count);//writetheresultstoSTDOUT(standardoutput)foreach($word2countas$word=>$count){ echo$word,chr(9),$count,PHP_EOL;}?>  作者:马士华发表于:--

Rematch 源码系列四、Third-Party plugins

       本文深入探讨了rematch的两个常用第三方插件:immer与loading。immer插件旨在简化state的修改过程,通过引入immerjs,允许开发者在reducer中使用mutable状态,王村源码路灯进而生成immutable状态,简化了常规操作。immer插件的实现相对简单,只需将常规reducer包裹一层,使之通过immerjs处理即可。

       immer插件的核心在于其对reducer的封装,通过immer.produce方法处理draft状态,简化了mutable状态的管理,避免了复杂的clone和赋值操作。当状态为简单数据类型时,不会使用immer.produce,以保持代码的简洁性。更多关于immer.produce和combineReducers的使用和原理可参考官方文档。

       然而,immer插件的设计存在缺陷,即许多reducer配置若不能以数组形式存储,而是被替换,则可能导致插件配置失效。鲸卡源码rematch v2版本通过引入更细粒度的plugin hooks(如onReducer)解决了这一问题,提升了配置的灵活性。

       紧接着是loading插件,专注于管理异步操作的状态,包括网络请求等。其核心在于onModel钩子的使用,定义了全局和模型级别的loading状态,并为特定操作定义了show和hide两个reducer,动态跟踪和控制加载状态。

       loading插件的实现通过初始化代码定义了全局和模型级别的loading状态,并使用onModel钩子处理模型操作,对特定的effect动作进行管理,包装原始动作以实现状态控制。两个reducer,show和hide,分别用于增加和减少操作状态的计数,以此实现对加载状态的动态更新。

       本文综述了rematch的immer和loading插件的实现原理、使用场景及优化策略,为开发者提供了深入理解这些工具的框架。后续文章将探讨rematch v1升级到v2的设计变化以及TypeScript支持的实现,期待与开发者共同探索rematch的最新进展和优化。

如何用行代码写一个NgRx Store

       深入解析 NgRx Store 的HLJ指标源码内部运作机制,通过精简的行代码实现一个基础版本的 StoreService,探索 NgRx Store 如何通过 RxJS 进行状态管理。本文旨在为开发者提供一个简化版的 NgRx Store 实现,以深入理解其核心原理。

       通过一个简单的 Angular NgRx-Seed app,我们可以学习 NgRx Store 的基础组件和工作流程。本文章将提供一个超简化的 StoreService,包含 dispatching action、accumulating state、以及使用 selector 订阅更新状态的核心功能。

       构建一个与 NgRx 非常相似但高度简化的 StoreService,代码覆盖了基本的 Store 功能,包括创建行为主题、调度 action、以及实现状态的积累与更新。此 StoreService 实现仅供学习和理解 NgRx Store 的内部构造,不可用于实际项目。

       关注 queueScheduler 的使用,确保 action 以初始化顺序同步接收,避免因重新进入而导致的内存溢出问题。action$ 和 reducer$ 的融合通过 withLatestFrom 操作符完成,确保了状态更新的正确执行。

       reducerFactory 是E宠源码 NgRx Store 的复杂部分,通过闭包实现状态的融合。简化版本的 StoreService 中,忽略了对 meta reducers 的处理,使用 combineReducers 作为默认工厂函数,用于创建一个可作为 StoreService 的源的 reducer 融合函数。

       在扫描操作符(scan)的作用下,action$ 和 reducer$ 被混合以创建一个具有状态记忆能力的 stream。实现的累计函数 reduceState 实现了状态的更新与累积,以响应 action 和 reducer 的变化。

       对于 select 和 createSelector 的实现,本文简化了类型安全功能,直接提供基础的实现,以展示如何从 StoreService 中获取状态。通过一个闭包和 map 操作符,select 函数实现了从 StoreService 获取数据并应用到模板中的逻辑。

       StoreService 实现中的 createSelector 提供了一个从所有 selectors 的结果中分离特定 selector 的工具,简化了状态的获取与展示。

       在实际应用中,将 StoreService 注入到 Angular app 的组件中,通过 ngOnInit 生命周期钩子获取状态并将其结果显示在模板中。组件中包含 dispatch 功能,实现与 NgRx Store API 类似的操作。

       本文源代码已提供,小财鱼源码欢迎阅读与学习。如有任何问题或建议,欢迎直接联系作者。

重读Redux源码的感悟

       大道至简的createStore

       创造理解的%在createStore.js中体现,剩下%涉及中间件,整体来看软件开发追求高内聚,内耦合,以简洁面世。Redux源码由9个文件构成,包含中间件的代码。整体而言,Redux的深层含义超出了源码大小所能体现,业界常言“Redux是百行代码千行文档”,强调其复杂性。

       回到createStore.js,剥离中间件影响,仅留下核心代码骨架。最终返回的对象即store,提供了常用API。通过观察者模式或发布/订阅模式理解此框架,但要认识到Redux并非仅此,它结合现代前端开发与函数式编程,带来限制与便利,如纯函数要求、测试便利性、功能解耦及性能优化。

       实现撤销功能(undo)示例,通过高阶reducer存储过往状态值,结合Redux实现撤销与重做。函数式编程的FP特性,使实现变得可能。

       combineReducer利用闭包概念,接收多个reducer,生成单个reducer,可遍历执行所有reducer。若两个reducer同时处理相同type的action,它们都会执行更新状态。此特性可能带来冲突,需合理命名以避免问题。

       使用CLI工具搭建开发环境可能耗时,codesandbox.io提供多种框架支持及快速加载依赖,适合灵感突发时快速测试代码。

       在命名Action时,采用namespace前缀(如/或@)可避免重复,有助于清晰管理状态与减少冲突。

       compose方法实现多个方法串联执行,功能强大,易于实现并用于中间件处理。在Redux中,中间件处理Action,与服务器端处理request、response的Koa或Express不同,但核心原理相似,利用compose方法串联功能。

       中间件本质为方法代理,通过增强原方法执行前后添加操作,实现AOP。在Redux中,中间件位于store.dispatch之前,通过代理dispatch实现场景扩展与功能增强。理解中间件需关注enhancer参数及createStore方法传递,最后实现store与中间件串联。

       以redux-thunk为例,底层参数接收中间件API,只传递store的getState和dispatch方法,遵循特定逻辑处理action,提供方法执行选择与状态管理。中间件使用时需阅读文档,理解其规范与实现细节。

       综上,Redux源码展示了现代前端开发与函数式编程的结合,从createStore、combineReducer到中间件,提供了高效状态管理与功能扩展。理解其核心概念与实现机制,有助于深入应用与开发。

如何在MaxCompute上运行HadoopMR作业

       MaxCompute(原ODPS)有一套自己的MapReduce编程模型和接口,简单说来,这套接口的输入输出都是MaxCompute中的Table,处理的数据是以Record为组织形式的,它可以很好地描述Table中的数据处理过程,然而与社区的Hadoop相比,编程接口差异较大。Hadoop用户如果要将原来的Hadoop MR作业迁移到MaxCompute的MR执行,需要重写MR的代码,使用MaxCompute的接口进行编译和调试,运行正常后再打成一个Jar包才能放到MaxCompute的平台来运行。这个过程十分繁琐,需要耗费很多的开发和测试人力。如果能够完全不改或者少量地修改原来的Hadoop MR代码就能在MaxCompute平台上跑起来,将是一个比较理想的方式。

       çŽ°åœ¨MaxCompute平台提供了一个HadoopMR到MaxCompute MR的适配工具,已经在一定程度上实现了Hadoop MR作业的二进制级别的兼容,即用户可以在不改代码的情况下通过指定一些配置,就能将原来在Hadoop上运行的MR jar包拿过来直接跑在MaxCompute上。目前该插件处于测试阶段,暂时还不能支持用户自定义comparator和自定义key类型,下面将以WordCount程序为例,介绍一下这个插件的基本使用方式。

       ä½¿ç”¨è¯¥æ’件在MaxCompute平台跑一个HadoopMR作业的基本步骤如下:

       1. 下载HadoopMR的插件

       ä¸‹è½½æ’件,包名为hadoop2openmr-1.0.jar,注意,这个jar里面已经包含hadoop-2.7.2版本的相关依赖,在作业的jar包中请不要携带hadoop的依赖,避免版本冲突。

       2. 准备好HadoopMR的程序jar包

       ç¼–译导出WordCount的jar包:wordcount_test.jar ,wordcount程序的源码如下:

       package com.aliyun.odps.mapred.example.hadoop;

       import org.apache.hadoop.conf.Configuration;

       import org.apache.hadoop.fs.Path;

       import org.apache.hadoop.io.IntWritable;

       import org.apache.hadoop.io.Text;

       import org.apache.hadoop.mapreduce.Job;

       import org.apache.hadoop.mapreduce.Mapper;

       import org.apache.hadoop.mapreduce.Reducer;

       import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

       import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

       import java.io.IOException;

       import java.util.StringTokenizer;

       public class WordCount {

       public static class TokenizerMapper

       extends Mapper<Object, Text, Text, IntWritable>{

       private final static IntWritable one = new IntWritable(1);

       private Text word = new Text();

       public void map(Object key, Text value, Context context

       ) throws IOException, InterruptedException {

       StringTokenizer itr = new StringTokenizer(value.toString());

       while (itr.hasMoreTokens()) {

       word.set(itr.nextToken());

       context.write(word, one);

       }

       }

       }

       public static class IntSumReducer

       extends Reducer<Text,IntWritable,Text,IntWritable> {

       private IntWritable result = new IntWritable();

       public void reduce(Text key, Iterable<IntWritable> values,

       Context context

       ) throws IOException, InterruptedException {

       int sum = 0;

       for (IntWritable val : values) {

       sum += val.get();

       }

       result.set(sum);

       context.write(key, result);

       }

       }

       public static void main(String[] args) throws Exception {

       Configuration conf = new Configuration();

       Job job = Job.getInstance(conf, "word count");

       job.setJarByClass(WordCount.class);

       job.setMapperClass(TokenizerMapper.class);

       job.setCombinerClass(IntSumReducer.class);

       job.setReducerClass(IntSumReducer.class);

       job.setOutputKeyClass(Text.class);

       job.setOutputValueClass(IntWritable.class);

       FileInputFormat.addInputPath(job, new Path(args[0]));

       FileOutputFormat.setOutputPath(job, new Path(args[1]));

       System.exit(job.waitForCompletion(true) ? 0 : 1);

       }

       }

       3. 测试数据准备

       åˆ›å»ºè¾“入表和输出表

       create table if not exists wc_in(line string);

       create table if not exists wc_out(key string, cnt bigint);

       é€šè¿‡tunnel将数据导入输入表中

       å¾…导入文本文件data.txt的数据内容如下:

       hello maxcompute

       hello mapreduce

       ä¾‹å¦‚可以通过如下命令将data.txt的数据导入wc_in中,

       tunnel upload data.txt wc_in;

       4. 准备好表与hdfs文件路径的映射关系配置

       é…ç½®æ–‡ä»¶å‘½åä¸ºï¼šwordcount-table-res.conf

       {

       "file:/foo": {

       "resolver": {

       "resolver": "c.TextFileResolver",

       "properties": {

       "text.resolver.columns.combine.enable": "true",

       "text.resolver.seperator": "\t"

       }

       },

       "tableInfos": [

       {

       "tblName": "wc_in",

       "partSpec": { },

       "label": "__default__"

       }

       ],

       "matchMode": "exact"

       },

       "file:/bar": {

       "resolver": {

       "resolver": "openmr.resolver.BinaryFileResolver",

       "properties": {

       "binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",

       "binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"

       }

       },

       "tableInfos": [

       {

       "tblName": "wc_out",

       "partSpec": { },

       "label": "__default__"

       }

       ],

       "matchMode": "fuzzy"

       }

       }

Redux(4.0.4)源码解析

       Redux源码解析

       Redux源代码解析旨在清晰展示其核心组件及工作流程,力求用最简洁的语言阐述每个关键部分的功能。Redux提供了一个状态管理库,以管理应用的全局状态。以下是Redux核心组件的主要解析:

       createStore.js

       export default function createStore(reducer, preloadedState, enhancer)

       createStore函数是Redux的核心,负责创建一个状态存储对象。它可以接受三个参数:reducer(减少操作函数)、预加载状态(初始状态)和增强器(可选参数,用于添加额外功能)。

       getState

       获取当前状态,操作简单直接。

       subscribe

       向监听列表中添加监听函数,返回取消监听函数。在调用dispatch时订阅或取消订阅,不会影响正在进行的dispatch。下一次dispatch时,将使用订阅列表的最新快照。

       dispatch

       执行reducer获取最新状态,并依次执行监听队列中的函数。

       replaceReducer

       替换当前的reducer。执行后,dispatch一次更新状态。一般不常用。

       observable

       未见实际应用,可能用于特定场景。使用了symbol-observable包,对于熟悉该包的开发者来说,此部分可能有更多探索空间。

       utils

       包括actionTypes.js、isPlainObject.js、warning.js等辅助函数。actionTypes.js定义了Redux保留的私有操作类型,用于确保操作的正确处理。isPlainObject.js用于判断action对象是否为原生对象。warning.js用于抛出错误,保持代码质量。

       applyMiddleware.js

       通过createStore(reducer,applyMiddleware(...middleware))执行,返回带有中间件增强的dispatch。精简后,代码更加清晰。

       compose.js

       实现中间件的串联,依次增强dispatch流程。使用函数式编程技巧,代码简洁高效。

       bindActionCreators.js

       将单个或多个ActionCreator转化为dispatch(action)的函数集合,简化Action的使用方式。

       combineReducers.js

       将多个reducer整合为一个,调整state结构,便于管理和操作。

       整体而言,Redux的源码解析展示了其如何通过一系列核心组件实现状态管理的流程,从创建store到管理state、执行reducer、中间件串联,直至整合多个reducer,提供了一套高效、模块化的状态管理方案。理解这些组件及其功能是掌握Redux并能灵活应用的关键。