利用JavaCompiler动态编译代码

Java Compiler API

在Java 6中javax.tools提供了标准的包来操作Java编译器,使得我们可以在程序运行期中动态加载Java类,来实现个性化的操作。实际应用可以在一些业务变化多的服务中,将共同操作抽象出来后,再针对不同业务场景动态加载编写的有业务属性的逻辑。

以一个简单的实际例子:

新建抽象基础类:JavaDynamicScript

1
2
3
4
5
public abstract class JavaDynamicScript {

public abstract Object script(Map<String, Object> context);

}

编写动态类代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import com.jinzl.compiler.dto.JavaDynamicScript;
import java.util.Map;
public class HelloWorldScript extends JavaDynamicScript {

private String content;

@Override
public Object script(Map<String, Object> context) {
System.out.println("脚本开始执行");
System.out.println("内部变量content: " + content);
System.out.println("执行上下文context: " + context);
System.out.println("脚本结束执行");
return content;
}
}

包装JavaCompilier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class JavaCodeCompiler {

public JavaCodeCompiler() throws MalformedURLException {}

private JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();

private final String libDir = StringUtils.substringBeforeLast(ResourceUtils.extractArchiveURL(JavaDynamicScript.class.getResource("")).getPath(), "/");
private final String classPath = System.getProperty("java.class.path");
private static final String JAVA_CODE_FILE_PATH = "string";

private static final StringJavaFileManager STRING_JAVA_FILE_MANAGER = CompilerConfig.standardFileManager();


public boolean compiler(String fullClassName, String sourceCode, DiagnosticCollector<JavaFileObject> diagnosticCollector){
long startTime = System.currentTimeMillis();
long compilerTakeTime;

JavaFileObject javaFileObject = new StringJavaFileObject(JAVA_CODE_FILE_PATH, fullClassName, sourceCode);
Iterable<String> options = Arrays.asList("-classpath", classPath, "-extdirs", libDir);

JavaCompiler.CompilationTask task = compiler.getTask(null, STRING_JAVA_FILE_MANAGER, diagnosticCollector, options, null, Arrays.asList(javaFileObject));
compilerTakeTime = System.currentTimeMillis() - startTime;

System.out.println("script compile success, class=" + fullClassName + ", time=" + compilerTakeTime);
return task.call();
}

}

使用 ToolProvider.getSystemJavaCompiler() 拿到JavaCompiler接口的实例。
这里还需要提供文件管理服务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class StringJavaFileObject extends SimpleJavaFileObject {

private String contents;

public StringJavaFileObject(String path, String className, String contents){
super(URI.create(path + ":///" + className.replaceAll("\\.", "/") + Kind.SOURCE.extension), Kind.SOURCE);
this.contents = contents;
}

@Override
public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
return contents;
}
}

评论

Idea - 设置Maven项目JDK版本

修改Maven的settings.xml文件,加入以下内容

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- 让idea的maven项目-pom.xml文件变成jdk1.8的配置 -->
<profile>
<id>jdk-1.8</id>
<activation>
<activeByDefault>true</activeByDefault>
<jdk>1.8</jdk>
</activation>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
</properties>
</profile>

然后在Maven Projects > Profiles 中选中jdk-1.8

评论

Java - Aviator

Aviator

Aviator是一个高性能、轻量级的java语言实现的表达式引擎,它动态地将表达式编译成字节码并运行,Aviator非常小,Aviator直接将表达式编译成JVM字节码,交给JVM运行。

特性

  1. 支持绝大多数运算操作符,包括算术操作符,关系运算符,逻辑操作符,位运算符,正则匹配操作符(=~),三元表达式
  2. 支持操作符优先级和括号强制设定优先级
  3. 逻辑运算符支持短路运算
  4. 支持丰富类型,例如nil,整数和浮点数,字符串,正则表达式,日期,变量等, 支持自动类型转换
  5. 内置一套强大的常用函数库
  6. 可自定义函数,易于扩展
  7. 可重载操作符
  8. 支持大数运算(BigDecimal)和高精度运算(BigDecimal)
  9. 性能优秀

Maven

1
2
3
4
5
<dependency>
<groupId>com.googlecode.aviator</groupId>
<artifactId>aviator</artifactId>
<version>3.3.0</version>
</dependency>

常用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void generalMethodTest(){

System.out.println(AviatorEvaluator.execute("1 + 2 + 3"));

String name = "jinzl";
Map<String, Object> params = new HashMap<>();
params.put("name", name);
System.out.println(AviatorEvaluator.execute("'Hello, ' + name", params));
// Aviator 2.2之后新增exec方法,可以方便的传入变量
System.out.println(AviatorEvaluator.exec("'Hello, ' + name", name));

params.put("a", 3);
params.put("b", 4);
System.out.println(AviatorEvaluator.execute("a + b / 3.0", params));

}

自定义函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void customMethodTest(){
AviatorEvaluator.addFunction(new AddFunction());
System.out.println(AviatorEvaluator.execute("add(1, 2)"));
System.out.println(AviatorEvaluator.execute("add(add(1, 2), 100)"));
}

class AddFunction extends AbstractFunction{

@Override
public AviatorObject call(Map<String, Object> env, AviatorObject arg1, AviatorObject arg2) {
Number left = FunctionUtils.getNumberValue(arg1, env);
Number right = FunctionUtils.getNumberValue(arg2, env);
return new AviatorDouble(left.doubleValue() + right.doubleValue());
}

@Override
public String getName() {
return "add";
}
}

利用自定义函数可以各种强大的逻辑

编译表达式

上面的例子大都是Aviator做了编译并执行的工作,其实可以先编译表达式,返回一个编译的结果,在传入不同的env来复用编译结果,提高性能,推荐使用这一种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void cacheCompiledTest(){
String expression = "a - (b - c) > 200";
// 编译表达式
// Expression compiledExp = AviatorEvaluator.compile(expression);
Expression compiledExp = AviatorEvaluator.compile(expression, true);
Map<String, Object> env = new HashMap<>();
env.put("a", 200);
env.put("b", 300);
env.put("c", 400);
// 执行表达式
Boolean result = (Boolean)compiledExp.execute(env);
System.out.println(result);
}

可以通过

1
Expression compiledExp = AviatorEvaluator.compile(expression, true);

将cached设置为true,下次编译同一个表达式的时候直接将返回上一次编译的结果

常用的内置函数

  • 求长度:count(list)
  • 求和:reduce(list, + , 0),reduce函数接受三个参数,1) seq,2)聚合函数,3) 初始值
  • 过滤:filter(list, seq.gt(9)),过滤出list中所有大于9的元素并返回集合
  • 判断元素在不在集合里:include(list, 10)
  • 排序:sort(list)
  • 遍历整个集合: map(list, println),map接收的第二个函数将作用于集合中的每个元素

其他设置

两种运行模式

  • 默认AviatorEvaluator以执行速度有限:

    1
    AviatorEvaluator.setOptimize(AviatorEvaluator.EVAL);
  • 修改为编译速度有限,这样不会做编译优化

    1
    AviatorEvaluator.setOptimize(AviatorEvaluator.COMPILE);
评论

Netty - 知识点

共享handler

Netty的逻辑是,每次有连接到来的时候,都会调用ChannelInitializer的initChannel()方法,然后pipeline中的所有handler都会被new一次,但是这里大部分的handler内部都是没有成员变量的,也就是说无状态的,我们可以使用单例模式,即调用pipeline().addLast()方法的时候,都直接使用单例,不需要每次都new,提高效率也避免了创建很多小的对象,注意:如果一个handler要被多个channel进行共享,必须要加上@ChannelHandler.Sharable显示地标明这个handler是支持多个channel共享的,否则会报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 1. 加上注解标识,表明该 handler 是可以多个 channel 共享的
@ChannelHandler.Sharable
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {

// 2. 构造单例
public static final LoginRequestHandler INSTANCE = new LoginRequestHandler();

protected LoginRequestHandler() {
}

}
...
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// ...单例模式,多个 channel 共享同一个 handler
ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
// ...
}
});

压缩handler

利用MessageToMessageCodec

使用它可以让我们的编解码操作放到一个类里面去实现,它是一个无状态的handler,因此可以使用单例模式实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@ChannelHandler.Sharable
public class PacketCodecHandler extends MessageToMessageCodec<ByteBuf, Packet> {
public static final PacketCodecHandler INSTANCE = new PacketCodecHandler();

private PacketCodecHandler() {

}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) {
out.add(PacketCodec.INSTANCE.decode(byteBuf));
}

@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) {
ByteBuf byteBuf = ctx.channel().alloc().ioBuffer();
PacketCodec.INSTANCE.encode(byteBuf, packet);
out.add(byteBuf);
}
}

缩短事件传播路径

压缩handler - 合并平行handler

平行handler即一个消息只会有一个handler触发处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@ChannelHandler.Sharable
public class IMHandler extends SimpleChannelInboundHandler<Packet> {
public static final IMHandler INSTANCE = new IMHandler();

private Map<Byte, SimpleChannelInboundHandler<? extends Packet>> handlerMap;

private IMHandler() {
handlerMap = new HashMap<>();

handlerMap.put(MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);
handlerMap.put(CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE);
handlerMap.put(JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE);
handlerMap.put(QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE);
handlerMap.put(LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE);
handlerMap.put(GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE);
handlerMap.put(LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE);
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {
handlerMap.get(packet.getCommand()).channelRead(ctx, packet);
}
}

更改事件传播源

如果outBound类型的handler较多,在写数据的时候能用ctx.writeAndFlush()就用这个方法
ctx.writeAndFlush()是从pipeline链中的当前节点开始往前找到第一个outBound类型的handler把对象往前进行传播,如果这个对象确认不需要经过其他outBound处理,就使用这个方法
ctx.channel().writeAndFlush()是从pipeline链中的最后一个outBound类型的handler开始,把对象往前进行传播,如果确认这个对象需要经过后面的outBound类型的handler,就使用这个方法

减少阻塞主线程的操作

默认情况下,Netty在启动的时候会开启2倍的CPU核数个NIO线程,而通常情况下我们单机会有几万或者十几万的连接,因此一条NIO线程会管理着几千或几万个连接,在传播事件的过程中,单条NIO线程的处理逻辑可以抽象成以下逻辑:

1
2
3
4
5
6
List<Channel> channelList = 已有数据可读的 channel
for (Channel channel in channelist) {
for (ChannelHandler handler in channel.pipeline()) {
handler.channelRead0();
}
}

只要有一个handler中的channelRead0()方法阻塞了NIO线程,都会拖慢绑定在该NIO线程上的所有的channel,我们需要把耗时的操作丢到我们的业务线程池中处理:

1
2
3
4
5
6
7
8
9
10
ThreadPool threadPool = xxx;

protected void channelRead0(ChannelHandlerContext ctx, T packet) {
threadPool.submit(new Runnable() {
// 1. balabala 一些逻辑
// 2. 数据库或者网络等一些耗时的操作
// 3. writeAndFlush()
// 4. balabala 其他的逻辑
})
}

准确统计处理时长

writeAndFlush()这个方法如果在非NIO线程中执行,它是一个异步的操作,调用之后是会立即返回的,剩下的所有操作都是Netty内部有一个任务队列异步执行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void channelRead0(ChannelHandlerContext ctx, T packet) {
threadPool.submit(new Runnable() {
long begin = System.currentTimeMillis();
// 1. balabala 一些逻辑
// 2. 数据库或者网络等一些耗时的操作

// 3. writeAndFlush
xxx.writeAndFlush().addListener(future -> {
if (future.isDone()) {
// 4. balabala 其他的逻辑
long time = System.currentTimeMillis() - begin;
}
});
})
}

心跳与定时检测

服务端空闲检测

服务端对于连接假死的应对策略就是空闲检测,Netty自带的IdelStateHandler就可以实现这个功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class IMIdleStateHandler extends IdleStateHandler {

private static final int READER_IDLE_TIME = 15;

public IMIdleStateHandler() {
super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}

@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接");
ctx.channel().close();
}
}

调用了父类IdleStateHandler的构造函数,有四个参数:

  1. 读空闲时间
  2. 写空闲时间
  3. 读写空闲时间,表示我们不关心这两类条件
  4. 时间单位

    客户端定时发送心跳

    服务端在一段时间内没有收到客户单数据,产生的原因可以分为以下两种:
  5. 连接假死
  6. 非假死状态下确实没有发送数据
    我们需要排除掉第二种可能性,那么连接自然就是假死的,我们可以在客户端定期发送数据到服务端,通常称为心跳数据包
1
2
3
4
5
6
7
8
9
10
11
12
13
public class HeartBeatTimerHandler extends ChannelInboundHandlerAdapter {

private static final int HEARTBEAT_INTERVAL = 5;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.executor().scheduleAtFixedRate(() -> {
ctx.writeAndFlush(new HeartBeatRequestPacket());
}, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);

super.channelActive(ctx);
}
}

ctx.executor()返回的是当前的channel绑定的NIO线程,scheduleAtFixedRate(),类型JDK的定时任务机制,可以每隔一段时间执行一个任务

服务端回复心跳与客户端空闲检测

客户端空闲检测与服务端一样,在客户端pipeline的最前方插入IdleStateHandler
为了排除是否是因为服务端非假死状态下确实没有发送数据,服务端也要定期发送心跳给客户端,服务端只要在收到心跳之后回复客户端,给客户端发送一个心跳响应包即可

评论

大数据学习 - MapReduce

概述

MapReduce是一个分布式运算程序的编程框架,是用户开发 基于Hadoop的数据分析应用 的核心框架。MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

为什么要MapReduce

  1. 海量数据在单机上处理因为硬件资源限制,无法胜任
  2. 而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
  3. 引入MapReduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理

    编程规范

  4. 用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
  5. Mapper的输入数据是KV对的形式(类型可自定义)
  6. Mapper的输出数据是KV对的形式(类型可自定义)
  7. Mapper中业务逻辑写在map()方法中
  8. map()方法(maptask进程)对每一个KV调用一次
  9. Reducer的输出数据对应Mapper的输出数据类型,也是KV
  10. Reducer的业务逻辑写在reduce()方法中
  11. reduce()方法对每一组相同k的KV组调用一次
  12. 用户自定义的Mapper和Reducer都要继承各自的父类
  13. 整个程序需要一个Driver来进行提交,提交的是一个描述了各种必要信息的job对象

    MapTask并行度决定机制

    maptask的并行度决定map阶段的任务处理并发度,进而影响整个job的处理速度,那么,maptask并行实例是否越多越好?其并行度又是如何决定的呢?

    maptask并行度的决定机制

    一个job的map阶段并行度由客户端在提交job时决定,而客户端对map阶段并行度的规划的基本逻辑为:

将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个maptask并行实例处理

评论

Netty架构剖析

Netty逻辑架构

Netty采用了典型的三层网络架构进行设计和开发,如图:
Netty逻辑架构图

Reactor通信调度层

有一系列辅助类完成,包括Reactor线程NioEventLoop及其父类,NioSocketChannel/NioServerSocketChannel及其父类,ByteBuffer以及由其衍生出来的各种Buffer,Unsafe以及其衍生出的各种内部类等。该层的主要职责就是监听网络的读写和连接操作,负责将网络层的数据读取到内存缓冲区中,然后出发各种网络事件,例如连接创建、连接急活、读事件、写事件等,将这些事件出发到Pipeline中,有Pipeline管理的职责链来进行后续的处理。

职责链ChannelPipeline

负责事件在职责链中的有序传播,同时负责动态的编排职责链。职责链可以选择监听和处理自己关心的事件,它可以拦截处理和向后/向前传播事件。不同应用的Handler节点的功能也不同,通常情况下,往往会开发编解码Handler用于消息的编解码,它可以将外部的协议消息转换成内部的POJO对象,这样上层业务则只需要关系处理业务逻辑即可,不需要感知底层的协议差异和线程模型差异,实现了架构层面的分层隔离。

业务逻辑编排层(Service ChannelHandler)

业务逻辑编排层通常有两类:一类是 纯粹的业务逻辑编排,还有一类是其他的应用层协议插件,用于特定协议相关的会话和链路管理。例如CMPP协议,用于管理和中国移动短信系统的对接。
架构的不同层面,需要关心和处理的对象都不同,通常情况下,对于业务开发者,只需要关心职责链的拦截和业务Handler的编排。因为应用层协议栈往往是开发一次,到处运行,所以实际上对于业务开发者来说,只需要关心服务层的业务逻辑开发即可。各种应用协议以插件的形式提供,只有协议开发人员需要关注协议插件,对于其他业务开发人员来说,只需关系业务逻辑定制。这种分层架构设计理念实现了NIO框架各层之间的解耦,便于上层业务协议栈的开发和业务逻辑的定制。

Netty的高性能

  1. 采用异步非阻塞的I/O类库,基于Reactor模式实现,解决了传统同步阻塞I/O模式下一个服务端无法平滑地处理线性增长的客户端的问题。
  2. TCP接收和发送缓冲区使用直接内存代替堆内存,避免了内存复制,提升了I/O读取和写入的性能。
  3. 支持通过内存池的方式循环利用ByteBuf,避免了频繁创建和销毁ByteBuf带来的性能损耗
  4. 可配置的I/O线程数、TCP参数等,为不同的用户场景提供定制化的调优参数,满足不同的性能场景
  5. 采用环形数组缓冲区实现无锁化并发编程,代替传统的线程安全容器或者锁
  6. 合理地使用线程安全容器、原子类等,提升系统的并发处理能力
  7. 关键资源的处理使用单线程串行化的方式,避免多线程并发访问带来的锁竞争和额外的CPU资源消耗问题
  8. 通过引用计数器及时地申请释放不在被引用的对象,细粒度的内存管理降低了GC的频率,减少了频繁GC带来的时延增大和CPU损耗。

Netty的高可靠

  1. 链路有效性检测。利用心跳机制周期性地进行链路检测,Netty提供了两种链路空闲检测机制:
  • 读空闲超时机制:当连续周期T没有消息可读时,触发超时Handler,用户可以基于读空闲超时发送心跳消息,进行链路检测,如果连续N个周期仍然没有读取到心跳消息,可以主动关闭链路
  • 写空闲超时机制:当连续周期T没有消息要发送时,触发超时Handler,用户可以基于写空闲超时发送心跳消息,进行链路检测,如果连续N个周期仍然没有接收到对方的心跳消息,可以主动关闭链路
  1. 内存保护机制。包括以下几个方面:
  • 通过对象引用计数器对Netty的ByteBuf等内置对象进行细粒度的内存申请和释放,对非法的对象引用进行检测和保护
  • 通过内存池来重用ByteBuf,节省内存
  • 可设置的内存容量上限,包括ByteBuf、线程池线程数等
  1. 优雅停机。当系统退出时,JVM通过注册的ShutdownHook拦截到退出信号量,然后执行退出操作,释放相关模块的资源占用,将缓冲区的消息处理完成或者清空,将待刷新的数据持久化的磁盘或者数据库中,等到资源回收和缓冲区消息处理完成之后,再退出
  2. 可定制性。主要体现在一下几点:
  • 责任链模式:ChannelPipeline基于责任链模式开发,便于业务逻辑的拦截、定时和扩展
  • 基于接口的开发:关键的类库都提供了接口或者抽象类,如果Netty自身的实现无法满足用户的需求,可以由用户自定义实现相关接口
  • 提供了大量工厂类,通过重载这些工厂类可以按需创建出用户实现的对象
  • 提供了大量的系统参数供用户按需设置,增强系统的场景定制性
  1. 可扩展性。基于Netty的基础NIO框架,可以方便地进行应用层协议定制,例如HTTP协议栈、Thrift协议栈、FTP协议栈等。这些扩展不需要修改Netty的源码,直接基于Netty的二进制类库即可实现协议的扩展和定制。
评论

大数据学习 - HDFS

HDFS概述

  1. HDFS集群分为两大角色:NameNode,DataNode,Secondary NameNode
  2. NameNode负责管理整个文件系统的元数据
  3. DataNode负责管理用户的文件数据块
  4. 文件会按照固定的大小(blocksize)切成若干块后分布式存储在若干台datanode上
  5. 每一个文件块可以有多个副本,并存放在不同的datanode上
  6. datanode会定期想namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量
  7. HDFS的内部工作机制对客户端保持透明,客户端请求HDFS都是通过向namenode申请来进行

HDFS写数据流程

概述

客户端要向HDFS写数据,首先要跟namenode通信以确认可以写文件并获得接收文件block的datanode,然后客户端按顺序将文件逐个block传递给相应datanode,并由接收到block的datanode负责向其他datanode复制block的副本

流程

  1. 向namenode请求上传文件,namemode检查目标文件是否已存在,父目录是否存在
  2. namenode响应可以上传
  3. rpc请求上传一个block(0~128M)以获取datanode
  4. 返回datanode列表dn1,dn2,dn3(见备注1)
  5. client请求dn1建立block传输通道channel
  • dn1请求dn3建立通道
  • dn3请求dn4建立通道
  • dn4应答dn3成功
  • dn3应答dn1成功
  • dn1应答client成功
  1. client上传block,以packet为单位
  2. dn1接收block数据,将数据分为小数据块(packet)放入缓冲区(ByteBuf),每传一个packet会放入一个应答队列等待应答
  3. 再将packet传输给dn3、dn3同样操作传输给dn4
  4. 如果还有block未传输,则重复1~8

备注1:上传数据时,datanode的选择策略

  1. 第一个副本先考虑跟client最近的(同机架)
  2. 第二个副本在考虑跨机架挑选一个datanode,增加副本的可靠性
  3. 第三个副本就在第一个副本同机架另外挑选一台datanode存放

HDFS读数据流程

概述

客户端将要读取的文件路径发送给namenode,namenode获取文件的元信息(主要时block的存放位置信息)返回给客户端,客户端根据返回的信息找到相应datanode逐个获取文件的block并在客户端本地进行数据追加合并从而获取整个文件

流程

  1. 跟namenode通信查询元数据,找到文件块所在的datanode服务器
  2. 挑选一台datanode(就近原则,然后随机),请求建立socket流
  3. datanode开始发送数据(从磁盘里面读取数据放入流,以packet为单位来做校验)
  4. 客户端以packet为单位接收,先在本地缓存,然后写入目标文件

NameNode工作机制

职责

  • 负责客户端请求的响应
  • 元数据的管理(查询、修改)

    元数据管理

    namenode对数据的管理采用了三种存储形式:
  • 内存元数据(NameSystem)
  • 磁盘元数据镜像文件
  • 数据操作日志文件(可通过日志运算出元数据)

    元数据存储机制

  1. 内存中有一份完整的元数据(内存metadata)
  2. 磁盘有一个“准完整”的元数据镜像(fsimage)文件,文件再namenode的工作目录中
  3. 用于衔接内存metadata和持久化元数据镜像fsimage之间的操作日志(edits文件)
    注:当客户端对hdfs中的文件进行新增或者修改操作,操作记录首先被记入edits日志文件中,当客户端操作成功后,响应的元数据会更新到内存metadata中

    元数据手动查看

    可通过hdfs的一个工具来查看edits中的信息
1
2
bin/hdfs oev -i edits -o edits.xml
bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml

元数据的checkpoint

每隔一段时间,会由secondary namenode将namenode上积累的所有edits和一个最新fsimage下载到本地,并加载到内存进行merge,这个过程称为checkpoint

checkpoint流程

checkpoint流程

checkpoint操作的触发条件配置参数

1
2
3
4
5
6
7
8
dfs.namenode.checkpoint.check.period=60  #检查触发条件是否满足的频率,60秒
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
#以上两个参数做checkpoint操作时,secondary namenode的本地工作目录
dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}

dfs.namenode.checkpoint.max-retries=3 #最大重试次数
dfs.namenode.checkpoint.period=3600 #两次checkpoint之间的时间间隔3600秒
dfs.namenode.checkpoint.txns=1000000 #两次checkpoint之间最大的操作记录

checkpoint的附带作用

namenode和secondary namenode的工作目录存储结构完全相同,所以,当namenode故障退出需要重新恢复时,可以从secondary namenode的工作目录中将fsimage拷贝到namenode的工作目录,以恢复namenode的元数据

思考问题

  • namenode如果宕机,hdfs服务是否能够正常提供?
    不能
  • 如果namenode的硬盘损坏,元数据是否还能恢复?如果能恢复,如何恢复?
    可以恢复绝大部分,将sencondary namenode的工作目录拷贝到namenode上
  • 通过以上思考,我们再配置namenode工作目录参数时,有什么注意点?
    namenode的工作目录可以配置到多个磁盘下(配置hdfs-site.xml文件)

DataNode工作机制

职责

  • 存储管理用户的文件块数据
  • 定期向namenode汇报自身所持有的block信息(通过心跳信息上报),当集群中发生某些block副本失效时,集群如何恢复block初始副本数量的问题
1
2
3
4
5
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>3600000</value>
<description>Determines block reporting interval in milliseconds.</description>
</property>

DataNode掉线判断时限参数

datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把节点判定为死亡,要经过一段时间,这段时间称为超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则计算公式为

1
timeout=2*heartbeat.recheck.interval+10*dfs.heartbeat.interval

默认的heartbeat.recheck.interval为5分钟,dfs.heartbeat.interval为3秒
可于hdfs-site.xml文件中配置:

1
2
3
4
5
6
7
8
9
10
<property>
<!-- 单位毫秒 -->
<name>heartbeat.recheck.interval</name>
<value>2000</value>
</property>
<property>
<!-- 单位秒 -->
<name>dfs.heartbeat.interval</name>
<value>1</value>
</property>
评论

大数据学习 - hadoop

什么是hadoop

  1. hadoop是apache旗下的一套开源软件平台
  2. hadoop提供的功能,利用服务器集群,根据用户的自定义业务逻辑,对海量数据进行分布式处理
  3. hadoop的核心组件有:
  • HDFS(分布式文件系统)
  • YARN(运算资源调度系统)
  • MapReduce(分布式运算编程框架)
  1. 广义上,hadoop通常是指一个更广泛的概念–hadoop生态圈

常用命令参数介绍

  • ls

功能:显示目录信息

示例:hadoop fs -ls /wordcount/input

  • mkdir

功能:创建目录

示例:hadoop fs -mkdir -p /aaa/bbb/cc

  • moveFromLocal

功能:从本地剪切粘贴到hdfs

示例:hadoop fs -moveFromLocal /home/hadoop/a.txt /aaa/bbb/cc

  • moveToLocal

功能:从hdfs剪切粘贴到本地

示例:hadoop fs -moveToLocal /aaa/bbb/cc /home/hadoop/a.txt

  • appendToFile

功能:追加一个文件到已经存在的文件末尾

示例:hadoop fs -appendToFile ./hello.txt /hello.txt

  • cat

功能:查看文件内容

示例:hadoop fs -cat /hello.txt

  • tail

功能:查看文件尾部几行

示例:hadoop fs tailf /hello.txt | more

  • text

功能:以字符形式打印一个文件的内容

示例:hadoop fs -text /hello.txt

  • chgrp chmod chown

功能:与linux文件系统的用法一样,操作文件所属权限

示例:hadoop fs -chmod 666 /hello.txt

  • copyFromLocal copyToLocal

功能:用法与moveToLocal moveFromLocal 用法一致

  • cp

功能:从hdfs的一个路径拷贝到另一个路径

示例:hadoop fs -cp /aaa/from.txt /bbb/to.txt

  • mv

功能:从hdfs的一个路径移动另一个路径

示例:hadoop fs -mv /aaa/from.txt /

  • get

功能:等同于copyToLocal,就是从hdfs下载文件到本地

示例:hadoop fs -get /aaa/from.txt

  • getmerge

功能:合并下载多个文件

示例:hadoop fs -getmerge /aaa/log.*

  • put

功能:等同于copyFromLocal

示例:hadoop fs -put /aaa/from.txt /bbb/to.txt

  • rm

功能:删除文件或文件夹

示例: hadoop fs -rm -r /aaa/bbb/

  • rmdir

功能:删除kongmulu

示例:hadoop fs -rmdir /aaa/bbb

  • df

功能:统计文件系统的可用空间信息

示例:hadoop fs -df -h /

  • du

功能:统计文件夹的大小信息

示例:hadoop fs -du -s -h /

  • count

功能:统计一个指定目录下的文件节点数量

示例:hadoop fs -count /

  • setrep

功能:设置hdfs中文件的副本数量

示例: hadoop fs -setrep 3 /aaa/hello.txt

评论

大数据学习 - IO和netty

Linux网络I/O模型简介

根据UNIX网络编程对I/O模型的分类,unix提供了5种I/O模型:

  1. 阻塞I/O模型:最常用的I/O模型就是阻塞I/O模型,缺省情况下,所有文件操作都是阻塞的。以套接字接口为例来讲解此模型:在进程空间中调用recvfrom,其系统调用会直到数据包到达且被复制到应用进程的缓冲区中或者发生错误时才返回,在此期间一直会等待,进程在从调用recvfrom开始到它返回的整段时间内都是被阻塞的。阻塞IO模型
  2. 非阻塞I/O模型:recvfrom从应用层到内核的时候,如果该缓冲区没有数据的话,就直接返回一个EWOULDBLOCK错误,一般对非阻塞I/O模型进行轮询检查这个状态,看内核是不是有数据到来非阻塞IO模型
  3. I/O复用模型:Linux提供select/poll,进程通过一个或多个fd(文件描述符)传递给select或poll系统调用,阻塞在select操作上,这样select/poll可以帮我们侦测多个fd是否处于就绪状态。select/poll是顺序扫描fd是否就绪,而且支持的fd数量有限,因此它的使用受到了一些制约。Linux还提供了一个epoll系统调用,epoll使用基于事件驱动方式代替顺序扫描,因此性能更高IO复用模型
  4. 信号驱动I/O模型:首先开启套接口信号驱动I/O功能,并铜鼓哦系统调用sigaction执行一个信号处理函数(此系统调用立即返回,进程继续工作,它是非阻塞的)。当数据准备就绪时,就为该进程生成一个SIGIO信号,通过信号回调通知应用程序调用recvfrom来读取数据,并通知主循环函数处理数据信号驱动IO模型
  5. 异步I/O:告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核复制到用户自己的缓冲区)通知我们。这种模型与信号驱动模型的主要区别是:信号驱动I/O由内核通知我们核是可以开始一个I/O操作;异步I/O模型由内核通知我们I/O操作何时已经完成异步IO模型

    Java中BIO存在的问题

    每当有一个新的客户端请求接入时,服务端必须创建一个新的线程处理新介入的客户端链路,一个线程只能处理一个客户端连接。在高性能服务器应用领域,往往需要面向成千上万个客户端的并发连接,这种模型显然无法满足高性能、高并发接入的场景。为了改进一线程一连接模型,后来又演进除了一种通过线程池或者消息队列实现1或者多个线程处理N个客户端的模型,由于它的底层通信机制依然使用同步阻塞I/O,所以被称为“伪异步”。

    Java中NIO概念

  6. 缓冲区Buffer:在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的。在写入数据时,写入到缓冲区中。任何时候访问NIO的数据,都是通过缓冲区进行操作。缓冲区实质上是一个数组,通常是一个字节数组,也可以是使用其他种类的数组,它又不仅仅是一个数组,缓冲区提供了对数据的结构化访问以及维护读写位置等信息。
  7. 通道Channel:Channel时一个通道,网络数据通过Channel读取和写入,它与流的不同之处在于通道是双向的,流只是在一个方向上移动(一个流必须是InputStream或者OutputStream的子类),而通过可以用于读、写或者二者同时进行。
  8. 多路复用器Selector:Selector对于NIO编程至关重要,多路复用器提供选择已经就绪的任务的能力,Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写时间,这个Channel就出于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合。一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用了epoll()代替传统的select实现,所以它并没有最大连接句柄1024/2018的限制,也就意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。NIO服务端通信序列图NIO客户端通信序列图

    NIO优势

  9. 客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECT等到后续结果,不需要像之前的客户端那样被同步阻塞
  10. SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信线程就可以处理其他的链路,不需要同步等到这个链路可用
  11. 线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制(只限制于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个Selector线程可以同时处理N个客户端连接,而且性能不会随着客户端的增加而线性下降,因此非常适合做高性能、高负载的网络服务器。

    netty的一个demo

    client

    EchoClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class EchoClient {

private String host;
private Integer port;
private NioEventLoopGroup nioEventLoopGroup = null;

public EchoClient(String host, Integer port){
this.host = host;
this.port = port;
}

public void start() throws Exception{
try {
// 客户端引导类
Bootstrap bootstrap = new Bootstrap();
// EventLoopGroup可以理解为是一个线程池, 这个线程池用来处理连接、接收数据
// 发送数据
nioEventLoopGroup = new NioEventLoopGroup();
bootstrap.group(nioEventLoopGroup) // 多线程处理
.channel(NioSocketChannel.class) // 制定通道类型为NioServerSocketChannel
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler()); // 注册handler
}
});
// 连接服务器
ChannelFuture channelFuture = bootstrap.connect().sync();
channelFuture.channel().closeFuture().sync();
}finally {
nioEventLoopGroup.shutdownGracefully().sync();
}

}

public static void main(String[] args) throws Exception {
EchoClient echoClient = new EchoClient("localhost", 20000);
echoClient.start();
}

}

EchoClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

// 客户端连接服务器后被调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端连接服务器, 开始发送数据......");
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuf firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
ctx.writeAndFlush(firstMessage);
}

// 从服务器接收到数据后调用
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception {
System.out.println("client 读取server数据......");
// 服务器返回消息后
byte[] resp = new byte[msg.readableBytes()];
msg.readBytes(resp);
String body = new String(resp, "UTF-8");
System.out.println("服务端数据为: " + body);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptioncaught");
ctx.close();
}
}

server

EchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class EchoServer {

private final Integer port;

public EchoServer(Integer port){
this.port = port;
}

public void start() throws Exception{
EventLoopGroup eventExecutors = null;
try {
// server端引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 连接池处理数据
eventExecutors = new NioEventLoopGroup();
// 装配serverBootstrap
serverBootstrap.group(eventExecutors)
.channel(NioServerSocketChannel.class)
.localAddress("localhost", port)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
// 可添加多个InHandler OutHandler
// 添加顺序为: IN-1 OUT-1 OUT-2 IN-2
// 执行顺序为: IN-1 IN-2 OUT-2 OUT-1
// WARNING: OutHandler不能放在最后, 最后只能是InHandler
channel.pipeline().addLast(new EchoServerHandler());
}
});
// 最后绑定服务器等待直到绑定完成, 调用sync方法会阻塞直到服务器完成绑定
ChannelFuture channelFuture = serverBootstrap.bind().sync();
System.out.println("开始监听, 端口: " + channelFuture.channel());
channelFuture.channel().closeFuture().sync();
}finally {
if(eventExecutors != null){
eventExecutors.shutdownGracefully().sync();
}
}
}

public static void main(String[] args) throws Exception {
EchoServer echoServer = new EchoServer(20000);
echoServer.start();
}

}

EchoServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server 读取数据");
// 读取数据
ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("接收client数据: " + body);
// 向客户端写数据
System.out.println("server向client发送数据");
String currentTime = new Date(System.currentTimeMillis()).toString();
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
// 执行下一个handler
// ctx.fireChannelRead(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("server 读取数据完毕");
ctx.flush(); // 刷新后才将数据发出到SocketChannel
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught");
ctx.close();
}
}

netty总结

  • ChannelInboundHandler之间的传递,需通过调用ctx.fireChannelRead(msg)实现,调用ctx.write(msg)将传递到ChannelOutboundHandler
  • ctx.write()方法执行后,需要调用flush()方法只能令它立即执行
  • ChannelOutboundHandler在pipeline注册的时候需要放在最后一个ChannelInboundHandler之前,否则将无法传递到ChannelOutboundHandler。
  • Handler的消费处理放在最后一个处理
评论

大数据学习 - RPC

什么是RPC

RPC(Remote procedure Call Protocol)–远程过程调用协议,它是一种通过网络从远程计算机程序中请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP和UDP,为通信程序之间携带信息数据。在OST网络通信模型中,RPC跨域了传输层和应用曾,RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
(未完待续)

评论