datax使用,mongodbreader 插件源码解读

datax:https://github.com/alibaba/DataX
测试:https://github.com/linkingli/dataxmysql
由于服务器最近被自己玩iptables搞炸了。所以这里是个localhost的测试
如果想进行远程测试,去mysql授权就可以,

mongodbreader插件源码

自定义异常类
public enum MongoDBReaderErrorCode implements ErrorCode {

    ILLEGAL_VALUE("ILLEGAL_PARAMETER_VALUE","参数不合法"),
    ILLEGAL_ADDRESS("ILLEGAL_ADDRESS","不合法的Mongo地址"),
    UNEXCEPT_EXCEPTION("UNEXCEPT_EXCEPTION","未知异常");

    private final String code;

    private final String description;

    private MongoDBReaderErrorCode(String code,String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return code;
    }

    @Override
    public String getDescription() {
        return description;
    }
}

mg的初始化属性封装

package com.alibaba.datax.plugin.reader.mongodbreader;

//mg的链接属性,封装为一个class
public class KeyConstant {
   /**
    * 数组类型
    */
   public static final String ARRAY_TYPE = "array";
   /**
    * mongodb 的 host 地址
    */
   public static final String MONGO_ADDRESS = "address";
   /**
    * mongodb 的用户名
    */
   public static final String MONGO_USER_NAME = "userName";
   /**
    * mongodb 密码
    */
   public static final String MONGO_USER_PASSWORD = "userPassword";
   /**
    * mongodb 数据库名
    */
   public static final String MONGO_DB_NAME = "dbName";
   /**
    * mongodb 集合名
    */
   public static final String MONGO_COLLECTION_NAME = "collectionName";
   /**
    * mongodb 查询条件
    */
   public static final String MONGO_QUERY = "query";
   /**
    * mongodb 的列
    */
   public static final String MONGO_COLUMN = "column";
   /**
    * 每个列的名字
    */
   public static final String COLUMN_NAME = "name";
   /**
    * 每个列的类型
    */
   public static final String COLUMN_TYPE = "type";
   /**
    * 列分隔符
    */
   public static final String COLUMN_SPLITTER = "splitter";
   /**
    * 跳过的列数
    */
   public static final String SKIP_COUNT = "skipCount";
   /**
    * 批量获取的记录数
    */
   public static final String BATCH_SIZE = "batchSize";
   /**
    * MongoDB的idmeta
    */
   public static final String MONGO_PRIMIARY_ID_META = "_id";
   /**
    * 判断是否为数组类型
    * @param type 数据类型
    * @return
    */
   public static boolean isArrayType(String type) {
       return ARRAY_TYPE.equals(type);
   }
}

mg的reader类

package com.alibaba.datax.plugin.reader.mongodbreader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.mongodbreader.util.CollectionSplitUtil;
import com.alibaba.datax.plugin.reader.mongodbreader.util.MongoUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.mongodb.*;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.Document;

import java.util.*;


/*
*
* public abstract class Reader extends BaseObject
* {
* public static abstract class Job extends AbstractJobPlugin
*   {
*   public abstract List<Configuration> split(int adviceNumber);
*   }
*
*   public static abstract class Task extends AbstractTaskPlugin {
        public abstract void startRead(RecordSender recordSender);
    }
* }
*
*public abstract class AbstractTaskPlugin extends AbstractPlugin
*
*public abstract class AbstractPlugin extends BaseObject implements Pluginable
*
*public interface Pluginable{
* Configuration getPluginJobConf();
* }
*
* public class BaseObject
* {
* 重写了hashcode(),equals(),tostring();
* }
* */
public class MongoDBReader extends Reader {

    public static class Job extends Reader.Job {
        //Configuration 提供多级JSON配置信息无损存储
        private Configuration originalConfig = null;
        //mongodb_java_driver
        private MongoClient mongoClient;
        //username,passwd
        private String userName = null;
        private String password = null;
        //extends  com.alibaba.datax.common.spi;
        //adviceNumber是框架建议插件切分的任务数,调用自定义utils,CollectionSplitUtil
        @Override
        public List<Configuration> split(int adviceNumber) {
            return CollectionSplitUtil.doSplit(originalConfig,adviceNumber,mongoClient);
        }

        //KeyConstant获取user.passwd,db,config,并且建立client
        @Override
            public void init() {


                        /*

                                super.getPluginJobConf
                        * public interface Pluginable{
                            Configuration getPluginJobConf();
                                    }
                        * */
            this.originalConfig = super.getPluginJobConf();
                this.userName = originalConfig.getString(KeyConstant.MONGO_USER_NAME);
                this.password = originalConfig.getString(KeyConstant.MONGO_USER_PASSWORD);
                String database =  originalConfig.getString(KeyConstant.MONGO_DB_NAME);
            if(!Strings.isNullOrEmpty(this.userName) && !Strings.isNullOrEmpty(this.password)) {
                this.mongoClient = MongoUtil.initCredentialMongoClient(originalConfig,userName,password,database);
            } else {
                this.mongoClient = MongoUtil.initMongoClient(originalConfig);
            }
        }
/       //distory
        @Override
        public void destroy() {

        }
    }



    /*
    *在Task中重新获取链接属性
    * Job和Task之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。
    * 两者之间只能通过配置文件进行依赖。*/

    public static class Task extends Reader.Task {
        //Configuration 提供多级JSON配置信息无损存
        private Configuration readerSliceConfig;

        private MongoClient mongoClient;

        private String userName = null;
        private String password = null;

        private String database = null;
        private String collection = null;

        private String query = null;

        private JSONArray mongodbColumnMeta = null;
        /**
         * 批量获取的记录数
         */
        private Long batchSize = null;
        /**
         * 用来控制每个task取值的offset
         */
        private Long skipCount = null;
        /**
         * 每页数据的大小
         */
        private int pageSize = 1000;
        /*
        * 读取数据,进行分页处理
        *
        * */
        @Override
        public void startRead(RecordSender recordSender) {
            /*
            * 校验,调用自定义异常MongoDBReaderErrorCode
            * */
            if(batchSize == null ||
                             mongoClient == null || database == null ||
                             collection == null  || mongodbColumnMeta == null) {
                throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                        MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
            }
            MongoDatabase db = mongoClient.getDatabase(database);
            MongoCollection col = db.getCollection(this.collection);
            BsonDocument sort = new BsonDocument();
            sort.append(KeyConstant.MONGO_PRIMIARY_ID_META, new BsonInt32(1));

            long pageCount = batchSize / pageSize;
            int modCount = (int)(batchSize % pageSize);

            for(int i = 0; i <= pageCount; i++) {
                if(modCount == 0 && i == pageCount) {
                    break;
                }
                if (i == pageCount) {
                    pageSize = modCount;
                }
                MongoCursor<Document> dbCursor = null;
                if(!Strings.isNullOrEmpty(query)) {
                    dbCursor = col.find(BsonDocument.parse(query)).sort(sort)
                            .skip(skipCount.intValue()).limit(pageSize).iterator();
                } else {
                    dbCursor = col.find().sort(sort)
                            .skip(skipCount.intValue()).limit(pageSize).iterator();
                }
                while (dbCursor.hasNext()) {
                    Document item = dbCursor.next();
                    Record record = recordSender.createRecord();
                    Iterator columnItera = mongodbColumnMeta.iterator();
                    while (columnItera.hasNext()) {
                        JSONObject column = (JSONObject)columnItera.next();
                        Object tempCol = item.get(column.getString(KeyConstant.COLUMN_NAME));
                        if (tempCol == null) {
                            continue;
                        }
                        if (tempCol instanceof Double) {
                            record.addColumn(new DoubleColumn((Double) tempCol));
                        } else if (tempCol instanceof Boolean) {
                            record.addColumn(new BoolColumn((Boolean) tempCol));
                        } else if (tempCol instanceof Date) {
                            record.addColumn(new DateColumn((Date) tempCol));
                        } else if (tempCol instanceof Integer) {
                            record.addColumn(new LongColumn((Integer) tempCol));
                        }else if (tempCol instanceof Long) {
                            record.addColumn(new LongColumn((Long) tempCol));
                        } else {
                            if(KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
                                String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
                                if(Strings.isNullOrEmpty(splitter)) {
                                    throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                                            MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
                                } else {
                                    ArrayList array = (ArrayList)tempCol;
                                    String tempArrayStr = Joiner.on(splitter).join(array);
                                    record.addColumn(new StringColumn(tempArrayStr));
                                }
                            } else {
                                record.addColumn(new StringColumn(tempCol.toString()));
                            }
                        }
                    }
                    recordSender.sendToWriter(record);
                }
                skipCount += pageSize;
            }
        }
        /*
        * 初始化
        * */

        @Override
        public void init() {

            /*

                                super.getPluginJobConf
                        * public interface Pluginable{
                            Configuration getPluginJobConf();
                                    }
                        * */
            this.readerSliceConfig = super.getPluginJobConf();
                this.userName = readerSliceConfig.getString(KeyConstant.MONGO_USER_NAME);
                this.password = readerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD);
                this.database = readerSliceConfig.getString(KeyConstant.MONGO_DB_NAME);
            if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
                /*
                * 调用自定义utils,
                * MongoUtil.initCredentialMongoClient
                *
                * MongoUtil.initMongoClient
                *
                * */
                mongoClient = MongoUtil.initCredentialMongoClient(readerSliceConfig,userName,password,database);
            } else {
                mongoClient = MongoUtil.initMongoClient(readerSliceConfig);
            }
            
            this.collection = readerSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);
            this.query = readerSliceConfig.getString(KeyConstant.MONGO_QUERY);
            this.mongodbColumnMeta = JSON.parseArray(readerSliceConfig.getString(KeyConstant.MONGO_COLUMN));
            this.batchSize = readerSliceConfig.getLong(KeyConstant.BATCH_SIZE);
            this.skipCount = readerSliceConfig.getLong(KeyConstant.SKIP_COUNT);
        }
        /*
        * destroy
        * */
        @Override
        public void destroy() {

        }
    }
}

2个工具类,主要是mg的链接校验,mg传输切片处理

package com.alibaba.datax.plugin.reader.mongodbreader.util;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.mongodbreader.KeyConstant;
import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReaderErrorCode;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MongoUtil {
//mg初始化检测
    public static MongoClient initMongoClient(Configuration conf) {

        List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
        if(addressList == null || addressList.size() <= 0) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
        }
        try {
            return new MongoClient(parseServerAddress(addressList));
        } catch (UnknownHostException e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
        } catch (NumberFormatException e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
        } catch (Exception e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
        }
    }
    //mg初始化客户端链接检验
    public static MongoClient initCredentialMongoClient(Configuration conf,String userName,String password,String database) {

        List<Object> addressList = conf.getList(KeyConstant.MONGO_ADDRESS);
        if(!isHostPortPattern(addressList)) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
        }
        try {
            MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray());
            return new MongoClient(parseServerAddress(addressList), Arrays.asList(credential));

        } catch (UnknownHostException e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS,"不合法的地址");
        } catch (NumberFormatException e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,"不合法参数");
        } catch (Exception e) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.UNEXCEPT_EXCEPTION,"未知异常");
        }
    }
    /**
     * 判断地址类型是否符合要求
     * @param addressList
     * @return
     */
    private static boolean isHostPortPattern(List<Object> addressList) {
        for(Object address : addressList) {
            String regex = "(\\S+):([0-9]+)";
            if(!((String)address).matches(regex)) {
                return false;
            }
        }
        return true;
    }
    /**
     * 转换为mongo地址协议
     * @param rawAddressList
     * @return
     */
    private static List<ServerAddress> parseServerAddress(List<Object> rawAddressList) throws UnknownHostException{
        List<ServerAddress> addressList = new ArrayList<ServerAddress>();
        for(Object address : rawAddressList) {
            String[] tempAddress = ((String)address).split(":");
            try {
                ServerAddress sa = new ServerAddress(tempAddress[0],Integer.valueOf(tempAddress[1]));
                addressList.add(sa);
            } catch (Exception e) {
                throw new UnknownHostException();
            }
        }
        return addressList;
    }

    public static void main(String[] args) {
        try {
            ArrayList hostAddress = new ArrayList();
            hostAddress.add("127.0.0.1:27017");
            System.out.println(MongoUtil.isHostPortPattern(hostAddress));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


package com.alibaba.datax.plugin.reader.mongodbreader.util;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.reader.mongodbreader.KeyConstant;
import com.alibaba.datax.plugin.reader.mongodbreader.MongoDBReaderErrorCode;
import com.google.common.base.Strings;
import com.mongodb.*;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.BsonDocument;

import java.util.ArrayList;
import java.util.List;

public class CollectionSplitUtil {

    //切分任务
    public static List<Configuration> doSplit(
            Configuration originalSliceConfig,int adviceNumber,MongoClient mongoClient) {

        List<Configuration> confList = new ArrayList<Configuration>();

        String dbName = originalSliceConfig.getString(KeyConstant.MONGO_DB_NAME);

        String collectionName = originalSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);

        if(Strings.isNullOrEmpty(dbName) || Strings.isNullOrEmpty(collectionName) || mongoClient == null) {
            throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                    MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
        }

        String query = originalSliceConfig.getString(KeyConstant.MONGO_QUERY);

        MongoDatabase db = mongoClient.getDatabase(dbName);
        MongoCollection collection = db.getCollection(collectionName);

        List<Entry> countInterval = doSplitInterval(adviceNumber, collection, query);
        for(Entry interval : countInterval) {
            Configuration conf = originalSliceConfig.clone();
            conf.set(KeyConstant.SKIP_COUNT,interval.interval);
            conf.set(KeyConstant.BATCH_SIZE,interval.batchSize);
            confList.add(conf);
        }
        return confList;
    }

    //间隔切分

    private static List<Entry> doSplitInterval(int adviceNumber, MongoCollection collection, String query) {

        List<Entry> intervalCountList = new ArrayList<Entry>();

        long totalCount = 0;
        if (!Strings.isNullOrEmpty(query)) {
            totalCount = collection.count(BsonDocument.parse(query));
        } else {
            totalCount = collection.count();
        }
        if(totalCount < 0) {
            return intervalCountList;
        }
        // 100 6 => 16 mod 4
        long batchSize = totalCount/adviceNumber;
        for(int i = 0; i < adviceNumber; i++) {
            Entry entry = new Entry();
            /**
             * 这个判断确认不会丢失最后一页数据,
             * 因为 totalCount/adviceNumber 不整除时,如果不做判断会丢失最后一页
             */
            if(i == (adviceNumber - 1)) {
                entry.batchSize = batchSize + adviceNumber;
            } else {
                entry.batchSize = batchSize;
            }
            entry.interval = batchSize * i;
            intervalCountList.add(entry);
        }
        return intervalCountList;
    }

}

class Entry {
    Long interval;
    Long batchSize;
}


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容

  • afinalAfinal是一个android的ioc,orm框架 https://github.com/yangf...
    passiontim阅读 15,360评论 2 44
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,067评论 25 707
  • 好人应该都快死绝了吧?我这里定义的好人并不包含那些看上去人畜无害,实则好吃懒做,浪费社会资源,甚至影响身边一群人之...
    鸡仔说阅读 647评论 0 0
  • 昨天几乎一夜没睡。 早上的晨间思几乎是闭着眼睛在说话。之所以没睡好一方面是在担心老公赶夜车回上海路上是否安全,另一...
    Super_Luna阅读 177评论 0 0