基于上一篇文章中,Canal实现 MySQL binlog解析,我们继续探讨,将mysql
中变化的数据,直接异构到ElasticSearch中。
简单的demo代码实现,es7的数据增删改。
<?php
require_once __DIR__ . "/vendor/autoload.php";
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use Com\Alibaba\Otter\Canal\Protocol\Column;
use Com\Alibaba\Otter\Canal\Protocol\Entry;
use Com\Alibaba\Otter\Canal\Protocol\EntryType;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use Com\Alibaba\Otter\Canal\Protocol\RowData;
class Sync
{
public static $esObj;
/**
* @param Entry $entry
* @throws \Exception
*/
public static function println($entry)
{
switch ($entry->getEntryType()) {
case EntryType::TRANSACTIONBEGIN:
case EntryType::TRANSACTIONEND:
return;
break;
}
$rowChange = new RowChange();
$rowChange->mergeFromString($entry->getStoreValue());
$evenType = $rowChange->getEventType();
$header = $entry->getHeader();
echo sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", $header->getLogfileName(), $header->getLogfileOffset(), $header->getSchemaName(), $header->getTableName(), $header->getEventType()), PHP_EOL;
echo $rowChange->getSql(), PHP_EOL;
/** @var RowData $rowData */
foreach ($rowChange->getRowDatas() as $rowData) {
switch ($evenType) {
case EventType::DELETE:
self::ptColumn($rowData->getBeforeColumns());
self::delete($header->getSchemaName() . '_' . $header->getTableName(), self::ptColumn($rowData->getBeforeColumns()));
break;
case EventType::INSERT:
self::insert($header->getSchemaName() . '_' . $header->getTableName(), self::ptColumn($rowData->getAfterColumns()));
break;
default:
echo '-------> before', PHP_EOL;
self::ptColumn($rowData->getBeforeColumns());
echo '-------> after', PHP_EOL;
self::ptColumn($rowData->getAfterColumns());
self::update($header->getSchemaName() . '_' . $header->getTableName(), self::ptColumn($rowData->getAfterColumns()));
break;
}
}
}
private static function ptColumn($columns)
{
$arr = [];
/** @var Column $column */
foreach ($columns as $column) {
$arr[$column->getName()] = $column->getValue();
// echo sprintf("%s : %s update= %s", $column->getName(), $column->getValue(), var_export($column->getUpdated(), true)), PHP_EOL;
}
var_dump($arr);
return $arr;
}
public static function insert(string $table, array $data)
{
$params = [
'index' => $table,
'id' => 0,
'body' => []
];
$params['id'] = $data['id'];
$params['body'] = $data;
try {
$res = self::getEs()->index($params);
} catch (Throwable $t) {
var_dump($t->getMessage());
}
}
public static function delete(string $table, array $data)
{
$params = [
'index' => $table,
'id' => $data["id"],
];
try {
$res = self::getEs()->delete($params);
} catch (Throwable $t) {
var_dump($t->getMessage());
}
}
public static function update(string $table, array $data)
{
$params = [
'index' => $table,
'id' => $data["id"],
'body' => [
'doc' => $data
]
];
try {
$res = self::getEs()->update($params);
} catch (Throwable $t) {
var_dump($t->getMessage());
}
}
public static function getEs()
{
if (!self::$esObj) {
try {
$hosts = [
'127.0.0.1:9200',
];
self::$esObj = \Elasticsearch\ClientBuilder::create()->setHosts($hosts)->build();
} catch (Throwable $e) {
var_dump(__LINE__, $e->getMessage());
die;
}
}
return self::$esObj;
}
}
try {
$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
$client->connect("127.0.0.1", 11111);
$client->checkValid();
$client->subscribe("1001", "test", ".*\\..*");
# $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤
while (true) {
$message = $client->get(1);
if ($entries = $message->getEntries()) {
foreach ($entries as $entry) {
//Fmt::println($entry);
Sync::println($entry);
}
}
sleep(1);
}
$client->disConnect();
} catch (\Exception $e) {
echo $e->getMessage(), PHP_EOL;
}