Перемещаем данные между разными инстансами Oracle используя DataPump

Задача:

Есть БД приложения, таблицы в которой чистятся самим приложением по расписанию. Это необходимо, и отключить нельзя. Надо хранить где-то копию данных из этих таблиц, с накоплением — ничего потеряться не должно. Данные не должны дублироваться. У некоторых таблиц нет primary key. В исходной базе таблицы хранятся в разных схемах, в итоговой должны храниться в одной для облегчения работы аналитиков.

Решение:

Пишем скрипт, который будет жить в CRON`e и работать по следующему алгоритму (для каждой таблицы свой вызов скрипта в свое время, если нужно подключить новую таблицу, заводим еще одну задачу в кроне):

  • Запускается на сервере аналитической базы (новой), и оттуда коннектится к боевой (старой).
  • Из боевой базы вытаскивает нужные данные из нужной таблицы, используя Data Pump (схему и название таблицы берет из входного параметра) и сохраняет дамп-файл
  • Проверяет, существует ли уже в новой базе таблица с таким именем и таблица с таким именем и префиксом tmp_
  • Если такой таблицы не существует, импортирует метаданные из дамп-файла, создает таблицу и временную её копию
  • В любом случае производит truncate временной таблицы
  • Импортирует данные из дамп-файла во временную таблицу
  • Производит слияние временной и основной таблицы (по полю id или другому, указанному для конкретного случая).
  • И разумеется, пишет все, что можно, в лог-файлы по каждому запуску


Вперед!

Для начала, нам придется указать путь к нашей боевой базе. В файле

vim $ORACLE_HOME/network/admin/tnsnames.ora

Добавим запись такого вида (меняем DISTANT_DB_HOST на имя или ip нашего боевого сервера, а SERVICE_NAME на global_name нашего боевого инстанса )

DISTANT_DB =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = DISTANT_DB_HOST)(PORT = 1521))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = DISTANT_DB)
    )
  )

Далее создаем в самой аналитической базе публичный database link. Он нам понадобится для эспорта

CREATE PUBLIC DATABASE LINK "SOURCE_DB"
CONNECT TO USERNAME IDENTIFIED BY PASSWORD 
USING 'DISTANT_DB';

После этого создадим директорию в операционной системе, куда будем закидывать дамп-файлы и писать логи. (В данном случае я создаю, mkdir /u01/app/oracle/oradata/export/ ) и регистрируем её в нашей БД (SQL запросом

CREATE DIRECTORY export_dir AS '/u01/app/oracle/oradata/export';

Все это потребуется позднее.

Теперь приступаем к работе непосредственно над скриптом.

Так как он будет принимать в качестве входного параметра имя_схемы.название_таблицы (  # script schema.table ), надо предусмотреть ситуацию, когда параметр не указан

#!/bin/bash

if [ "$1" == "" ]
then
    echo "USING: export.sh scheme.table_name"
    exit 1
fi

curr_table="$1"

Так как мы будем запускать его из крона, нам потребуется установить переменные окружения. Здесь O_SID = $ORACLE_SID (надо продублировать явно)

export NLS_LANG=AMERICAN_AMERICA.AL32UTF8
export LD_LIBRARY_PATH=/u01/app/oracle/product/11.2.0/dbhome_1/lib
export ORACLE_SID=O_SID
export ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1
export PATH=$PATH:$ORACLE_HOME/bin

Вынесем настройки в начало скрипта

conn_str="oracle/oracle@O_SID" #строка для подключения к местной, аналитической базе 
network_link="SOURCE_DB" #Database Link что мы создали ранее для подключения к боевой базе
export_dir="export_dir" # директория для выгрузки/загрузки (см. выше)
exclude_objects="SEQUENCE,PROCEDURE,PACKAGE,TRIGGER,FUNCTION,SYNONYM,VIEW,TYPE,INDEX,GRANT,CONSTRAINT,STATISTICS,POST_INSTANCE" #Объекты, которые мы копировать не хотим - оставим только таблицы
remap_tablespace="WMSDB_ARC_TS:WMS_TS,WMSDB_ARC_IDX:WMS_TS,WMSDB_TS:WMS_TS,WMSDB_IDX:WMS_TS" #На боевой БД используются разные tablespaces, на аналитике достаточно одного
local_scheme="ORACLE" # Все схемы будем переводить в одну, нашу

Вообще, финальную полную версию скрипта можно найти в конце статьи, здесь же я постараюсь прояснить основные функции. Их, помимо прочего, можно использовать и в других скриптах.

Функция export_table.

На входе функция принимает два параметра — две строки. Первая — это имя_схемы.название_таблицы в боевой базе. Вторая — либо «data_only» либо «metadata_only». По хорошему, должна быть либо «» либо «meta», но тогда это не пришло мне в голову. «metadata_only» означает, что мы хотим только описание таблицы, с её полями, констрейнтами,  и т.п., но импортировать данные не собираемся — экономим место и время. «data_only», соответственно, выгружает из боевой таблицы только данные. При вызове функции удаляется файл дампа, и создается новый. Если что-то пойдет не так, функция возвратит ошибку, и мы её перехватим при вызове.

# table_name, data_only|metadata_only
export_table() {
    table_name="$1"
    dt=`date +"%F_%H.%M.%S.%N"`
    log_name="$table_name.$dt.export.log"

    if [ -f /u01/app/oracle/oradata/export/"$table_name.dmp" ]
    then
        rm /u01/app/oracle/oradata/export/"$table_name.dmp"
    fi

    expdp "$conn_str" full=N content=$2 tables="$table_name" network_link="$network_link" directory="$export_dir" dumpfile="$table_name.dmp" logfile="$log_name" exclude="$exclude_objects" > /dev/null
    echo $?
}

функция import_table.

Принимает две строки. Первая — это имя_схемы.название_таблицы в боевой базе. Во время работы разбивает эту строку на схему и таблицу, и при импорте конвертирует схему в нашу, аналитическую. Второй параметр — это префикс вновь создаваемой строки. Либо префикса нет — и таблица создается с нуля, либо префикс «tmp_» — для импорта во временную таблицу и последующего слияния. При импорте во временную таблицу выполняется её truncate.

#table_name, ""|tmp_
import_table() {
    table_name="$1"
    dt=`date +"%F_%H.%M.%S.%N"`
    real_table_name=$(extract_table_name $1)    
    real_scheme=$(extract_scheme $1)    
    log_name="$table_name.$dt.import.log"

    impdp "$conn_str" tables="$table_name" directory=export_dir dumpfile="$table_name.dmp" logfile="$log_name" TABLE_EXISTS_ACTION=TRUNCATE  REMAP_SCHEMA=$real_scheme:$local_scheme REMAP_TABLESPACE="$remap_tablespace" REMAP_TABLE="$table_name":"$2$real_table_name"
    echo $?
}

Функция merge_tables.

Слияние временной и основной таблиц — добавляет в основную таблицу данные, добавленные после предыдущего импорта. Думаю, конструкция SQL merge в данном случае подошла бы больше, но зато так работает ощутимо шустро.

merge_tables() {

    rel_key_name='id'
    t_name=$(extract_table_name $curr_table)
 
    case "$t_name" in
        "conv_stat" ) rel_key_name='pick_id';;
        "loc_types" ) rel_key_name='name';;
        "rst_min_af_sku_qty" ) rel_key_name='sku_id';;
        "rst_min_fp_sku_qty" ) rel_key_name='sku_id';;
        "work_type" ) rel_key_name='name';;
        * ) rel_key_name='id';;
    esac

    echo "insert into $t_name select * from tmp_$t_name T2 where not exists (select 1 from $t_name T1 WHERE T1.$rel_key_name=T2.$rel_key_name)"
}

Вспомогательные функции

функции extract_*

Разделяют строку вида «schema.table_name» на схему и название таблицы соответственно

extract_table_name() {
    echo $1 | cut -d'.' -f 2
}

extract_scheme () {
    echo $1 | cut -d'.' -f 1
}

Функция sql_query.

Выполняем запросы к локальной, аналитической базе, используя sqlplus.

sql_query() {
    q="$1"
    res=`sqlplus -s "$conn_str" << EOF
        set pages 0
        set head off
        set feed off
        $q;
EOF`
    echo $res
}

Функция clear_tmp_table.

Очистка времнной таблицы после импорта данных — для экономии места в датафайлах.

clear_tmp_table() {
    t_name=$(extract_table_name $curr_table)
    echo "truncate table tmp_$t_name"
}

Ну и непосредственно кусок, отвечающий за логику работы и вызов всех этих функций:

#Проверяем, существует ли уже таблица:

echo "Check if exists table as well as tmp_table..."
table_exists=$(sql_query "SELECT if_table_exists('$curr_table',0) FROM DUAL") # сама таблица
tmp_table_exists=$(sql_query "SELECT if_table_exists('$curr_table',1) FROM DUAL") # Её временная копия
echo "table_exists: $table_exists, tmp_table_exists $tmp_table_exists"

tables=$(($table_exists+$tmp_table_exists))

#One or both tables are not exist
if [ "$table_exists" == 0 -o "$tmp_table_exists" == 0 ]
then
    echo "Some of them do now exist, trying to export table metadata"
    meta=$(export_table $curr_table "metadata_only") # Одной из них не существует, возьмем свежие метаданные из боевой базы
    if [ $meta == 0 ] # Экпорт метаданных прошел успешно
    then
        if [ $table_exists == 0 ] # Не существует постоянной таблицы, надо создать
        then
            echo "importing metadata for main table..."
            res=$(import_table "$curr_table" "")
            if [ $res == 0 ] # успешно создали постоянную таблицу
            then
                tables=$(($tables+1))
            else
                echo "Could't create main table"
            fi
        fi

        if [ $tmp_table_exists == 0 ] # не существует временной таблицы, надо создать, с новым префиксом
        then
            echo "importing metadata for tmp_table..."
            res=$(import_table "$curr_table" "tmp_")
            if [ $res == 0 ]
            then
                tables=$(($tables+1))
            else
                echo "Couldn't create tmp_table"
            fi
        fi
    fi
fi

#обе таблицы существуют
if [ $tables == 2 ] 
then
    echo "Everything is OK, let's update"
    echo "Trying to export data for $curr_table"
    res=$(export_table "$curr_table" "data_only") # импортируем свежие данные
    if [ $res == 0 ]
    then
        echo "Exporting data went well, now let's import it"
        res=$(import_table $curr_table "tmp_") # импортируем во временную таблицу
        if [ $res == 0 ]
        then # импорт прошел успешно
            # echo "import went OK, now merge it"
            r=$(sql_query "$(merge_tables)") # производим слияние таблиц
            if [ "$r" == "" ]
            then # слияние прошло успешно
                echo "Merge went OK, clearing tmp table"
                tr=$(sql_query "$(clear_tmp_table)") # очищаем временную тсблицу
                if [ "$tr" == "" ]
                then
                    echo "tmp table cleared, finishing"
                else
                    echo "Couldn't truncate tmp table, what a pity.. '$tr'"
                fi
            else # слияние прошло неуспешно
                echo "Couldn't merge: $r" 
            fi
        else # не удалось импортировать
            echo "import failed: $res"
        fi
    else # не удалось выгрузить данные
        echo "Couldn't export data: $res"        
    fi
fi

Теперь, когда скрипт готов, добавляем задачи в крон, и смотрим, как заполняются наши таблицы.

crontab -e

0 8 * * * /home/oracle/scripts/export.sh wms_archive.oldpick_detail
0 7 * * * /home/oracle/scripts/export.sh analyst.conv_stat
0 6 * * * /home/oracle/scripts/export.sh analyst.ptl_stat
0 5 * * * /home/oracle/scripts/export.sh wms.loc_types
0 4 * * * /home/oracle/scripts/export.sh wms_archive.oldpick
0 3 * * * /home/oracle/scripts/export.sh wms.work_type
10 3 * * * /home/oracle/scripts/export.sh analyst.rst_min_af_sku_qty
20 3 * * * /home/oracle/scripts/export.sh analyst.rst_min_fp_sku_qty
0 2 * * * /home/oracle/scripts/export.sh wms_archive.oldcounting_diff
0 9 * * * /home/oracle/scripts/export.sh analyst.r_workers_shifts
10 9 * * * /home/oracle/scripts/export.sh analyst.rst_multi_info
20 9 * * * /home/oracle/scripts/export.sh analyst.rst_printman_info
35 19 * * * /home/oracle/scripts/export.sh wms_archive.oldprint_jobs

Вот и всё!

О том, как отправлять результаты работы на почту, выводить в виджеты, и других способах мониторинга, читайте в других статьях.

Вот итоговая версия скрипта:

#!/bin/bash

if [ "$1" == "" ]
then
    echo "USING: export.sh scheme.table_name"
    exit 1
fi

export NLS_LANG=AMERICAN_AMERICA.AL32UTF8
export LD_LIBRARY_PATH=/u01/app/oracle/product/11.2.0/dbhome_1/lib
export ORACLE_SID=mfass
export ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1
export PATH=$PATH:$ORACLE_HOME/bin

curr_table="$1"

conn_str="oracle/oracle@mfass" #analytic database connect data
network_link="WMS" #remote database TNS record
export_dir="export_dir" # directory created in analytic DB for import/export
exclude_objects="SEQUENCE,PROCEDURE,PACKAGE,TRIGGER,FUNCTION,SYNONYM,VIEW,TYPE,INDEX,GRANT,CONSTRAINT,STATISTICS,POST_INSTANCE"
remap_tablespace="WMSDB_ARC_TS:WMS_TS,WMSDB_ARC_IDX:WMS_TS,WMSDB_TS:WMS_TS,WMSDB_IDX:WMS_TS" #move to local tablespaces
local_scheme="ORACLE"

# table_name, data_only|metadata_only
export_table() {
    table_name="$1"
    dt=`date +"%F_%H.%M.%S.%N"`
    log_name="$table_name.$dt.export.log"

    if [ -f /u01/app/oracle/oradata/export/"$table_name.dmp" ]
    then
        rm /u01/app/oracle/oradata/export/"$table_name.dmp"
    fi

    expdp "$conn_str" full=N content=$2 tables="$table_name" network_link="$network_link" directory="$export_dir" dumpfile="$table_name.dmp" logfile="$log_name" exclude="$exclude_objects" > /dev/null
    echo $?
}

#table_name, ""|tmp_
import_table() {
    table_name="$1"
    dt=`date +"%F_%H.%M.%S.%N"`
    real_table_name=$(extract_table_name $1)    
    real_scheme=$(extract_scheme $1)    
    log_name="$table_name.$dt.import.log"

    impdp "$conn_str" tables="$table_name" directory=export_dir dumpfile="$table_name.dmp" logfile="$log_name" TABLE_EXISTS_ACTION=TRUNCATE  REMAP_SCHEMA=$real_scheme:$local_scheme REMAP_TABLESPACE="$remap_tablespace" REMAP_TABLE="$table_name":"$2$real_table_name"
    echo $?
}

extract_table_name() {
    echo $1 | cut -d'.' -f 2
}

extract_scheme () {
    echo $1 | cut -d'.' -f 1
}

sql_query() {
    q="$1"
    res=`sqlplus -s "$conn_str" << EOF
        set pages 0
        set head off
        set feed off
        $q;
EOF`
    echo $res
}

merge_tables() {

    rel_key_name='id'
    t_name=$(extract_table_name $curr_table)
    #  usually we merge tables, considering they have unique ID field, but some tables don't;
    #  therefore we have to point out another unique field for them.     
    case "$t_name" in
        "conv_stat" ) rel_key_name='pick_id';;
        "loc_types" ) rel_key_name='name';;
        "rst_min_af_sku_qty" ) rel_key_name='sku_id';;
        "rst_min_fp_sku_qty" ) rel_key_name='sku_id';;
        "work_type" ) rel_key_name='name';;
        * ) rel_key_name='id';;
    esac

    echo "insert into $t_name select * from tmp_$t_name T2 where not exists (select 1 from $t_name T1 WHERE T1.$rel_key_name=T2.$rel_key_name)"
}

clear_tmp_table() {
    t_name=$(extract_table_name $curr_table)
    echo "truncate table tmp_$t_name"
}

# check if tables are not exist in the analytics

echo "Check if exists table as well as tmp_table..."
table_exists=$(sql_query "SELECT if_table_exists('$curr_table',0) FROM DUAL")
tmp_table_exists=$(sql_query "SELECT if_table_exists('$curr_table',1) FROM DUAL")
echo "table_exists: $table_exists, tmp_table_exists $tmp_table_exists"

tables=$(($table_exists+$tmp_table_exists))

if [ "$table_exists" == 0 -o "$tmp_table_exists" == 0 ]
then
    echo "Some of them do now exist, trying to export table metadata"
    meta=$(export_table $curr_table "metadata_only")
    if [ $meta == 0 ]
    then
        if [ $table_exists == 0 ]
        then
            echo "importing metadata for main table..."
            res=$(import_table "$curr_table" "")
            if [ $res == 0 ]
            then
                tables=$(($tables+1))
            else
                echo "Could't create main table"
            fi
        fi

        if [ $tmp_table_exists == 0 ]
        then
            echo "importing metadata for tmp_table..."
            res=$(import_table "$curr_table" "tmp_")
            if [ $res == 0 ]
            then
                tables=$(($tables+1))
            else
                echo "Couldn't create tmp_table"
            fi
        fi
    fi
fi

if [ $tables == 2 ]
then
    echo "Everything is OK, let's update"
    echo "Trying to export data for $curr_table"
    res=$(export_table "$curr_table" "data_only")
    if [ $res == 0 ]
    then
        echo "Exporting data went well, now let's import it"
        res=$(import_table $curr_table "tmp_")
        if [ $res == 0 ]
        then
            echo "import went OK, now merge it"
            r=$(sql_query "$(merge_tables)")
            if [ "$r" == "" ]
            then
                echo "Merge went OK, clearing tmp table"
                tr=$(sql_query "$(clear_tmp_table)")
                if [ "$tr" == "" ]
                then
                    echo "tmp table cleared, finishing"
                else
                    echo "Couldn't truncate tmp table, what a pity.. '$tr'"
                fi
            else
                echo "Couldn't merge: $r" 
            fi
        else
            echo "import failed: $res"
        fi
    else
        echo "Couldn't export data: $res"        
    fi
fi
Категория: Программирование
Последнее изменение:


Крипто-кошельки для помощи и благодарности проекту:

Bitcoin адрес проекта: [[address]]

Перевод на сумму [[value]] BTC получен. Спасибо!.
[[error]]

Ethereum адрес проекта: [[address]]



Комментарии
Пожалуйста, авторизуйтесь, что бы оставить свой комментарий