import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import com.hbase.log.RecordParser;
public class HbaseInsertData {
public static class HbaseMapper
extends Mapper<LongWritable, Text, Text, Text>{
RecordParser parser = new RecordParser();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
parser.parse(value);
String phone = parser.getPhone();
int bloodPressure = parser.getBloodPressure();
if(bloodPressure > 150) {
context.write(new Text(phone), new Text(bloodPressure + ""));
}
}
}
public static class HbaseReducer
extends TableReducer<Text, Text, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
String value = values.iterator().next().toString();
Put putRow = new Put(key.getBytes());
putRow.add("f1".getBytes(), "qualifier".getBytes(), value.getBytes());
context.write(new ImmutableBytesWritable(key.getBytes()), putRow);
}
}
public static void main(String[] args) throws Exception{
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum.", "localhost"); //千万别忘记配置
Job job = new Job(conf, "count");
job.setJarByClass(HbaseInsertData.class);
job.setMapperClass(HbaseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
Path in = new Path("hdfs://localhost:9000/input");
FileInputFormat.addInputPath(job, in);
TableMapReduceUtil.initTableReducerJob("tab1", HbaseReducer.class, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
解析的classRecordParser
import org.apache.hadoop.io.Text;
public class RecordParser {
private String id;
private String phone;
private int bloodPressure;
public void parse(String record) {
String[] logs = record.split(",");
id = logs[1];
phone = logs[3];
bloodPressure = Integer.parseInt(logs[4]);
}
public void parse(Text record) {
this.parse(record.toString());
}
public String getId() {
return id;
}
public String getPhone() {
return phone;
}
public int getBloodPressure() {
return bloodPressure;
}
}
分享到:
相关推荐
HBase是建立在HDFS上的面上列的数据库。...hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。
5.3.1 使用MongoDB、HBase和Redis更新及修改数据 98 5.3.2 有限原子性和事务完整性 99 5.4 小结 100 第6章 查询NoSQL存储 101 6.1 SQL与MongoDB查询功能的相似点 101 6.1.1 加载MovieLens数据 103 6.1.2 ...
利用HBase提供的MapReduce接口,让'dbinfo'的所有条目与query计算tf-idf相似度,根据tf-idf值从小到大排序,结果取排在前面的若干数据。 流程:query => sift特征点检测及描述符提取 => vocabulary对特征点进行编码...
在向数据库中插入记录时,HBase和关系数据库一样,每次都是以"行"为单位把整条记录插入数据库 C.HBase数据库表可以设置该表任意列作为索引 D.HBase是一种NoSQL数据库(正确答案) 10. 10单选(2分)已知一张表student...
用户在对某电影评分时在MYSQL的评分表中插入一条数据,以此来收集用户评分信息(MySQL),每过一段时间就对该时段内的评分数据进行协同过滤算法的MapReduce计算,计算结果是存储在HDFS里的,所以要使用sqoop工具来对...
用户在对某电影评分时在MYSQL的评分表中插入一条数据,以此来收集用户评分信息(MySQL),每过一段时间就对该时段内的评分数据进行协同过滤算法的MapReduce计算,计算结果是存储在HDFS里的,所以要使用sqoop工具来对...
133_hbase的bulkload命令实现hbase集群之间数据的传输2 D6 d; F6 S8 x+ I/ I0 B0 @ 134_hive同hbase集成,统计hbase数据表信息% Q/ R! Z1 J3 J) k+ H! {6 D# M 135_使用TableInputFormat进行MR编程! m& C6 B/ v6 N" ...
它建立在Apache Hadoop(TM)之上,提供*工具,使数据提取/转换/加载(ETL)变得容易*一种将结构强加于各种数据格式的机制*访问直接存储在Apache HDFS(TM)中的文件)或其他数据存储系统(例如Apache HBase(TM)...
使用 Hadoop MapReduce 进行批量加载 可插入数据库接口 datacube 暂时只支持 HBase 数据库后端。 示例: IdService idService = new CachingIdService(5, new MapIdService()); ConcurrentMap,byte[]> ...
实时大数据分析小项目: Andriod 应用程序中的 MapReduce(wordCount 示例) 使用 Android 应用程序实现的 HBase 功能(创建、插入、查看、删除) 存储库定期更新! 密苏里大学堪萨斯城分校
默认值对存在默认值设定的业务表,添加全局索引,完美支持,此时,不要使用不可变索引(IMMUTABLE_ROWS=true)不可变索引只会进行插入数据,不会更新数
2.2 MapReduce数据分享效率低..................................................................................... 9 2.3 MapReduce进行迭代操作 ..............................................................