MapReduce: Scanning an HBase Table to Build a Solr Index
Published: 2019-06-21

This article is about 4,769 characters long and takes roughly 15 minutes to read.


The driver class disables speculative map execution (so the same rows are not indexed twice), sets the scanner caching, and submits one map-only indexing job per table:

package com.hbase.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RebuildHbaseIndex {

    public static final Logger LOG = LoggerFactory.getLogger(RebuildHbaseIndex.class);

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        // Speculative execution would push duplicate documents to Solr
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        // Fetch 100 rows per scanner round trip
        conf.setInt("hbase.client.scanner.caching", 100);
        String[] tbNames = {"Suggest"};
        // Submit one indexing job per table and report its row counter
        for (int i = 0; i < tbNames.length; i++) {
            Job job = SolrIndexerMapper.createSubmittableJob(conf, tbNames[i]);
            if (!job.waitForCompletion(true)) {
                System.exit(-1);
            }
            Counter counter = job.getCounters().findCounter(SolrIndexerMapper.Counters.ROWS);
            LOG.info(tbNames[i] + ": " + counter.getValue() + " rows indexed");
        }
    }
}
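Note that solr.add() only transfers documents to the server; they do not become searchable until the core commits, either through an autoCommit setting in solrconfig.xml or an explicit commit. Below is a minimal sketch of an explicit commit after the jobs finish, reusing the core URL from the mapper further down; the CommitAfterIndexing class is illustrative and assumes the core does not autoCommit:

package com.hbase.index;

import java.io.IOException;

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;

// Illustrative helper, not part of the original post: issue one explicit
// commit after all indexing jobs finish so the added documents become visible.
public class CommitAfterIndexing {
    public static void main(String[] args) throws IOException, SolrServerException {
        SolrServer solr = new HttpSolrServer("http://192.168.1.79:8983/solr/IK_shard1_replica1");
        solr.commit();   // make all previously added documents searchable
        solr.shutdown(); // release the underlying HTTP client
    }
}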
The mapper builds one SolrInputDocument per HBase row, buffers them, and sends them to Solr in batches; createSubmittableJob wires the mapper into a map-only job with no output:

package com.hbase.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolrIndexerMapper extends TableMapper<Text, Text> {

    public static final Logger LOG = LoggerFactory.getLogger(SolrIndexerMapper.class);

    // Job counter for the number of rows read
    public static enum Counters { ROWS }

    // Create only one SolrServer instance per mapper
    private SolrServer solr;
    public String solrURL = "http://192.168.1.79:8983/solr/IK_shard1_replica1";
    private int commitSize;
    private final List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();

    // Called once when the task starts
    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        solr = new HttpSolrServer(solrURL);
        // Number of documents to buffer before sending a batch
        commitSize = conf.getInt("solr.commit.size", 1000);
    }

    @Override
    protected void map(ImmutableBytesWritable row, Result values, Context context)
            throws IOException, InterruptedException {
        SolrInputDocument solrDoc = new SolrInputDocument();
        String rowkey = Bytes.toString(values.getRow());
        String id = Bytes.toString(values.getRow());
        String tableName = "Suggest";
        solrDoc.addField("id", id);
        solrDoc.addField("rowkey", rowkey);
        // The Solr schema needs an extra tableName field for HBase rows
        solrDoc.addField("tableName", tableName);
        // Copy every qualifier/value pair of the row into the document
        for (KeyValue kv : values.list()) {
            String fieldName = Bytes.toString(kv.getQualifier());
            String fieldValue = Bytes.toString(kv.getValue());
            solrDoc.addField(fieldName, fieldValue);
        }
        docs.add(solrDoc);
        if (docs.size() >= commitSize) {
            try {
                LOG.info("Adding " + Integer.toString(docs.size()) + " documents");
                solr.add(docs); // index the buffered documents
            } catch (final SolrServerException e) {
                final IOException ioe = new IOException();
                ioe.initCause(e);
                throw ioe;
            }
            docs.clear();
        }
        context.getCounter(Counters.ROWS).increment(1);
    }

    // Called once when the task ends: flush whatever is left in the buffer
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            if (!docs.isEmpty()) {
                LOG.info("Flushing " + Integer.toString(docs.size()) + " remaining documents");
                solr.add(docs);
                docs.clear();
            }
        } catch (final SolrServerException e) {
            final IOException ioe = new IOException();
            ioe.initCause(e);
            throw ioe;
        }
    }

    public static Job createSubmittableJob(Configuration conf, String tableName) throws IOException {
        Job job = Job.getInstance(conf, "SolrIndex_" + tableName);
        job.setJarByClass(SolrIndexerMapper.class);
        Scan scan = new Scan();
        // One-off scan: do not pollute the block cache
        scan.setCacheBlocks(false);
        job.setOutputFormatClass(NullOutputFormat.class);
        // The mapper emits nothing, so output key/value classes are null
        TableMapReduceUtil.initTableMapperJob(tableName, scan, SolrIndexerMapper.class, null, null, job);
        job.setNumReduceTasks(0); // map-only job, no reducers
        return job;
    }
}
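To check the result, the core can be queried directly with SolrJ. A minimal sketch, assuming the schema produced by the mapper above (id, rowkey, tableName plus one field per HBase qualifier); the VerifySolrIndex class name and the sample size are illustrative:

package com.hbase.index;

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

// Illustrative check, not part of the original post: count and sample
// the documents that the MapReduce job wrote for the Suggest table.
public class VerifySolrIndex {
    public static void main(String[] args) throws SolrServerException {
        SolrServer solr = new HttpSolrServer("http://192.168.1.79:8983/solr/IK_shard1_replica1");
        // All documents indexed from the Suggest table (tableName is set in the mapper)
        SolrQuery query = new SolrQuery("tableName:Suggest");
        query.setRows(5); // fetch only a small sample
        QueryResponse rsp = solr.query(query);
        System.out.println("Indexed documents: " + rsp.getResults().getNumFound());
        for (SolrDocument doc : rsp.getResults()) {
            System.out.println(doc.getFieldValue("rowkey"));
        }
        solr.shutdown();
    }
}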

Reposted from: https://my.oschina.net/u/2293326/blog/524814
