`
zhangbaoming815
  • 浏览: 147622 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

使用MapReduce往Hbase插入数据

阅读更多
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.hbase.log.RecordParser;

public class HbaseInsertData {
	
	public static class HbaseMapper 
		extends Mapper<LongWritable, Text, Text, Text>{
		
		RecordParser parser = new RecordParser();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			parser.parse(value);
			String phone = parser.getPhone();
			int bloodPressure = parser.getBloodPressure();
			if(bloodPressure > 150) {
				context.write(new Text(phone), new Text(bloodPressure + ""));
			}
		}
	}
	
	public static class HbaseReducer
		extends TableReducer<Text, Text, ImmutableBytesWritable> {

		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Context context)
				throws IOException, InterruptedException {
			String value = values.iterator().next().toString();
			Put putRow = new Put(key.getBytes());
			putRow.add("f1".getBytes(), "qualifier".getBytes(), value.getBytes());
			
			context.write(new ImmutableBytesWritable(key.getBytes()), putRow);
		}
	}
	
	public static void main(String[] args) throws Exception{
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum.", "localhost");  //千万别忘记配置

		Job job = new Job(conf, "count");
		
		job.setJarByClass(HbaseInsertData.class);
		job.setMapperClass(HbaseMapper.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		Path in = new Path("hdfs://localhost:9000/input");
		FileInputFormat.addInputPath(job, in);
		
		TableMapReduceUtil.initTableReducerJob("tab1", HbaseReducer.class, job);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

 

解析的classRecordParser

import org.apache.hadoop.io.Text;

public class RecordParser {
	
	private String id;
	private String phone;
	private int bloodPressure;
	
	public void parse(String record) {
		String[] logs = record.split(",");
		id = logs[1];
		phone = logs[3];
		bloodPressure = Integer.parseInt(logs[4]);
	}
	
	public void parse(Text record) {
		this.parse(record.toString());
	}

	public String getId() {
		return id;
	}

	public String getPhone() {
		return phone;
	}

	public int getBloodPressure() {
		return bloodPressure;
	}
}
 
分享到:
评论

相关推荐

    hive 整合 hbase

    HBase是建立在HDFS上的面上列的数据库。...hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。

    nosql 入门教程

    5.3.1 使用MongoDB、HBase和Redis更新及修改数据 98 5.3.2 有限原子性和事务完整性 99 5.4 小结 100 第6章 查询NoSQL存储 101 6.1 SQL与MongoDB查询功能的相似点 101 6.1.1 加载MovieLens数据 103 6.1.2 ...

    基于分布式数据库的图像检索系统

    利用HBase提供的MapReduce接口,让'dbinfo'的所有条目与query计算tf-idf相似度,根据tf-idf值从小到大排序,结果取排在前面的若干数据。 流程:query =&gt; sift特征点检测及描述符提取 =&gt; vocabulary对特征点进行编码...

    大数据技术原理与应用.docx

    在向数据库中插入记录时,HBase和关系数据库一样,每次都是以"行"为单位把整条记录插入数据库 C.HBase数据库表可以设置该表任意列作为索引 D.HBase是一种NoSQL数据库(正确答案) 10. 10单选(2分)已知一张表student...

    基于hadoop实现的电影推荐网站+源代码+文档说明

    用户在对某电影评分时在MYSQL的评分表中插入一条数据,以此来收集用户评分信息(MySQL),每过一段时间就对该时段内的评分数据进行协同过滤算法的MapReduce计算,计算结果是存储在HDFS里的,所以要使用sqoop工具来对...

    基于hadoop生态实现的的电影网站+源代码+文档说明

    用户在对某电影评分时在MYSQL的评分表中插入一条数据,以此来收集用户评分信息(MySQL),每过一段时间就对该时段内的评分数据进行协同过滤算法的MapReduce计算,计算结果是存储在HDFS里的,所以要使用sqoop工具来对...

    2017最新大数据架构师精英课程

    133_hbase的bulkload命令实现hbase集群之间数据的传输2 D6 d; F6 S8 x+ I/ I0 B0 @ 134_hive同hbase集成,统计hbase数据表信息% Q/ R! Z1 J3 J) k+ H! {6 D# M 135_使用TableInputFormat进行MR编程! m& C6 B/ v6 N" ...

    hive

    它建立在Apache Hadoop(TM)之上,提供*工具,使数据提取/转换/加载(ETL)变得容易*一种将结构强加于各种数据格式的机制*访问直接存储在Apache HDFS(TM)中的文件)或其他数据存储系统(例如Apache HBase(TM)...

    数据立方体datacube.zip

    使用 Hadoop MapReduce 进行批量加载 可插入数据库接口 datacube 暂时只支持 HBase 数据库后端。 示例: IdService idService = new CachingIdService(5, new MapIdService()); ConcurrentMap,byte[]&gt; ...

    Tweets-sentiemnt-analysis-on-IPhone:实时大数据分析

    实时大数据分析小项目: Andriod 应用程序中的 MapReduce(wordCount 示例) 使用 Android 应用程序实现的 HBase 功能(创建、插入、查看、删除) 存储库定期更新! 密苏里大学堪萨斯城分校

    zoeminghong#full-stack#phoenix_索引1

    默认值对存在默认值设定的业务表,添加全局索引,完美支持,此时,不要使用不可变索引(IMMUTABLE_ROWS=true)不可变索引只会进行插入数据,不会更新数

    大数据学习笔记.pdf

    2.2 MapReduce数据分享效率低..................................................................................... 9 2.3 MapReduce进行迭代操作 ..............................................................

Global site tag (gtag.js) - Google Analytics