博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka源码分析之MemoryRecords
阅读量:5923 次
发布时间:2019-06-19

本文共 6039 字,大约阅读时间需要 20 分钟。

        MemoryRecords是Kakfa中Record在内存中的实现形式,它基于Java NIO中ByteBuffer来实现。MemoryRecords中成员变量如下:

private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;    // the compressor used for appends-only    // 仅仅用于appends的压缩器Compressor实例compressor    private final Compressor compressor;    // the write limit for writable buffer, which may be smaller than the buffer capacity    // 可写缓冲区的写限制writeLimit,可能小于缓存区的装载能力    private final int writeLimit;    // the capacity of the initial buffer, which is only used for de-allocation of writable records    // 最初的缓冲区的装载能力initialCapacity,仅用于重新分配可写的记录    private final int initialCapacity;    // the underlying buffer used for read; while the records are still writable it is null    // 用于读的底层缓冲区buffer,java NIO的ByteBuffer类,此时记录仍可写    private ByteBuffer buffer;    // indicate if the memory records is writable or not (i.e. used for appends or read-only)    // 标志内存记录是否可写的状态位writable    private boolean writable;
        其中,compressor是仅仅用于appends的压缩器Compressor实例,writeLimit是可写缓冲区的写限制,它可能小于缓存区的装载能力,initialCapacity是最初的缓冲区的装载能力,仅用于重新分配可写的记录,buffer是java NIO的ByteBuffer实例,用于读的底层缓冲区,而writable是一个标志内存记录是否可写的状态位。

        再来看下MemoryRecords的构造函数,如下:

// Construct a writable memory records    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {            	// 根据入参赋值writable、writeLimit    	this.writable = writable;        this.writeLimit = writeLimit;                // initialCapacity取Java NIO中ByteBuffer的capacity        this.initialCapacity = buffer.capacity();                if (this.writable) {// 如果writable为true,即处于可写状态        	        	// buffer设置为null            this.buffer = null;                        // 利用入参ByteBuffer类型的buffer和CompressionType类型的type构造Compressor实例compressor            this.compressor = new Compressor(buffer, type);        } else {        	        	// buffer设置为入参ByteBuffer类型的buffer            this.buffer = buffer;                        // compressor设置为null            this.compressor = null;        }    }
        通过MemoryRecords的构造函数我们可以知道,MemoryRecords有两种基本的状态,一个是只写,一个是只读,当为只写时,标志位writable为true,此时MemoryRecords中ByteBuffer类型的成员变量buffer被设置为null,同时利用入参ByteBuffer类型的buffer和CompressionType类型的type构造Compressor实例compressor;当为只读时,buffer设置为入参ByteBuffer类型的buffer,compressor设置为null。

        MemoryRecords最主要的一个功能就是添加记录,而实现这一功能的方法就是append()方法,代码如下:

/**     * Append a new record and offset to the buffer     * 添加一条新的记录,并且在缓冲区buffer中记录偏移量offset     */    public void append(long offset, byte[] key, byte[] value) {    	    	// 首先判断MemoryRecords的可写标志位writable    	if (!writable)            throw new IllegalStateException("Memory records is not writable");    	// 根据key、value通过Record的recordSize()方法计算记录大小size        int size = Record.recordSize(key, value);                // 压缩器compressor中记录偏移量offset、记录大小size、记录key和value        compressor.putLong(offset);        compressor.putInt(size);        compressor.putRecord(key, value);                // 压缩器compressor中通过recordWritten()方法累加记录数numRecords和已写入未压缩总大小writtenUncompressed,        // 记录数numRecords为加1,        // 而已写入未压缩总大小writtenUncompresse是记录大小size再加上size所占大小和offset所占大小之和LOG_OVERHEAD,也就是额外的Int+Long为12        compressor.recordWritten(size + Records.LOG_OVERHEAD);    }
        上面为key、value形式的append,而还有一种Record形式的append,代码如下:

/**     * Append the given record and offset to the buffer     */    public void append(long offset, Record record) {            	// 首先判断MemoryRecords的可写标志位writable    	if (!writable)            throw new IllegalStateException("Memory records is not writable");    	// 获取记录大小size        int size = record.size();                // 压缩器compressor中记录偏移量offset、记录大小size、记录record的buffer        compressor.putLong(offset);        compressor.putInt(size);        compressor.put(record.buffer());                // 压缩器compressor中通过recordWritten()方法累加记录数numRecords和已写入未压缩总大小writtenUncompressed,        // 记录数numRecords为加1,        // 而已写入未压缩总大小writtenUncompresse是记录大小size再加上size所占大小和offset所占大小之和LOG_OVERHEAD,也就是额外的Int+Long为12        compressor.recordWritten(size + Records.LOG_OVERHEAD);                // 重绕此缓冲区,将位置设置为零并丢弃标记        record.buffer().rewind();    }
        MemoryRecords中还有一个针对指定Record的key、value来判断是否尚有余地的hasRoomFor()方法,代码如下:

/**     * Check if we have room for a new record containing the given key/value pair     *     * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be     * accurate if compression is really used. When this happens, the following append may cause dynamic buffer     * re-allocation in the underlying byte buffer stream.     *     * There is an exceptional case when appending a single message whose size is larger than the batch size, the     * capacity will be the message size which is larger than the write limit, i.e. the batch size. In this case     * the checking should be based on the capacity of the initialized buffer rather than the write limit in order     * to accept this single record.     */    public boolean hasRoomFor(byte[] key, byte[] value) {        return this.writable && this.compressor.numRecordsWritten() == 0 ?            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);    }
        hasRoomFor()方法中,需要首先判断writable是否为true,writable为false的话会直接返回false,直接通知调用者MemoryRecords已无余地存储Record。writable为true的话,还需要判断compressor的已写入数据大小numRecordsWritten是否为0,为0 的话,根据MemoryRecords的最初的缓冲区的装载能力initialCapacity是否大于key、value、offset所占大小、size所占大小之和来确定是否尚有足够空间容纳一个Record,不为0 的话,则根据可写缓冲区的写限制writeLimit是否大于compressor预估算的大小与key、value、offset所占大小、size所占大小之和来确定是否尚有足够空间容纳一个Record。compressor预估算的大小通过如下方法来判断:

public long estimatedBytesWritten() {        if (type == CompressionType.NONE) {            return bufferStream.buffer().position();        } else {            // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes            return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);        }    }
        

转载地址:http://posvx.baihongyu.com/

你可能感兴趣的文章
Centos的常用命令
查看>>
ListView侧滑菜单SwipeMenuListView的简单使用
查看>>
javascript之代理模式
查看>>
阿里云MaxCompute被Forrester评为全球云端数据仓库领导者
查看>>
ABAP下载的病毒扫描Virus Scan
查看>>
SAP标准培训课程C4C10学习笔记(二)第二单元
查看>>
观点 | 区块链十年,世界发生巨变
查看>>
BANCOR学习:如何开发自己的BANCOR去中心化交易平台?
查看>>
央行工作论文:区块链能做什么、不能做什么?
查看>>
Python库大全,建议收藏留用!
查看>>
百度翻译的代码实现,POST请求对接百度翻译接口获取Json数据
查看>>
Dubbo下一站:Apache顶级项目
查看>>
时序数据库连载系列:指标届的独角兽Prometheus
查看>>
读书笔记之Z-BLOG如何选择ASP和PHP程序以及数据库版本?
查看>>
抽象类和接口
查看>>
PageOffice V4.0 Excel常用的接口对象---Sheet类
查看>>
Java 常量池注意点
查看>>
Pod在多可用区worker节点上的高可用部署
查看>>
15 款免费好用的 Mac App
查看>>
带你了解SN74LVC245ADWR
查看>>