-- SafeDispatch/UpdateHelper/generate_partitioning_of_an_existing_table_by_date_unix_time.sql
--
-- 253 lines
-- 10 KiB
-- PL/PgSQL

-- Remove any earlier definition that has exactly this argument list, so the
-- CREATE OR REPLACE that follows is free to change parameter names or defaults.
DROP FUNCTION IF EXISTS public.generate_partitioning_of_an_existing_table_by_date_unix_time(
    text,
    text,
    integer,
    integer,
    text,
    boolean
);
-- Rebuilds an existing table as a RANGE-partitioned table and swaps it in place.
--
-- Steps performed (all inside the caller's transaction):
--   1. Create "new_<tablename>" LIKE the source table, partitioned by RANGE(columnname),
--      plus a DEFAULT partition named "<tablename>_default".
--   2. Append the SQL returned by
--      get_query_partitioning_of_an_existing_table_by_date_unix_time(...)
--      (defined elsewhere; presumably the per-period partition DDL - verify there).
--   3. Copy every row from the source table into the new table.
--   4. Capture, from the catalogs, the statements needed to rename the old indexes and
--      constraints, recreate non-unique indexes, re-add outgoing foreign keys,
--      recreate triggers, and drop foreign keys in other tables that reference
--      the source table.
--   5. Either drop the source table (droptable = true) or rename it to
--      "<tablename>_old" (droptable = false), rename the new table to <tablename>,
--      add the composite primary key ("posID", columnname), and attach a
--      "<tablename>_posID_seq" sequence that continues after the old last_value.
--
-- Parameters:
--   tablename   name of the existing table. It is quoted as an identifier in all
--               generated SQL; catalog lookups match on the bare relation name only
--               (no schema filter), exactly as before.
--   columnname  partition key. Inserted VERBATIM into PARTITION BY RANGE (...) and
--               into the primary key column list, so the caller must quote it
--               if the column name requires quoting.
--   start_year, end_year, step
--               forwarded unchanged to the partition-query helper. If
--               start_year > end_year a NOTICE is raised and nothing is done.
--   droptable   true  -> DROP the source table with CASCADE;
--               false -> keep it, renamed to "<tablename>_old" (default).
--
-- Hard-coded assumptions: the surrogate key column is "posID" and its sequence
-- is named "<tablename>_posID_seq".
--
-- Generated statements are separated with CHR(13), as in the original
-- implementation, and the main batches are echoed via RAISE NOTICE before execution.
CREATE OR REPLACE FUNCTION public.generate_partitioning_of_an_existing_table_by_date_unix_time(
    tablename text,
    columnname text,
    start_year integer,
    end_year integer,
    step text,
    droptable boolean DEFAULT false)
    RETURNS void
    LANGUAGE 'plpgsql'
    VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
    nl                     CONSTANT text := CHR(13);       -- separator used in generated SQL
    primary_key            CONSTANT text := '"posID"';     -- already quoted; inserted verbatim
    sequence_id            CONSTANT text := '_posID_seq';  -- suffix of the id sequence name

    -- Local copy of the parameter: pg_indexes has a column called "tablename",
    -- so the parameter itself must never appear unqualified in those queries.
    src_table              text;
    new_tablename          text;
    old_table_name         text;
    seq_name               text;
    old_seq_name           text;

    seq_exists             boolean;
    seq_last_value         bigint;   -- sequences are bigint; integer could overflow

    sql_str                text := '';
    index_rename_sql       text;
    constraint_rename_sql  text;
    index_create_sql       text;
    fk_restore_sql         text;
    fk_drop_sql            text;
    trigger_restore_sql    text;
    add_pkey_sql           text;
    create_seq_sql         text;
    set_default_sql        text;
BEGIN
    IF start_year > end_year THEN
        RAISE NOTICE '%', 'start_year > end_year';
        RETURN;
    END IF;

    src_table      := tablename;
    new_tablename  := 'new_' || tablename;
    old_table_name := tablename || '_old';
    seq_name       := tablename || sequence_id;
    old_seq_name   := old_table_name || sequence_id;

    --===========================================================
    -- create the partitioned copy and load the data
    --===========================================================
    sql_str :=
        '-- create a new table having the same structure with specified table ' || nl ||
        format('CREATE TABLE %I (LIKE %I INCLUDING DEFAULTS INCLUDING CONSTRAINTS) PARTITION BY RANGE (%s);',
               new_tablename, src_table, columnname) || nl || nl ||
        '-- create partiton for all other records that dont match with the partition condition' || nl ||
        format('CREATE TABLE %I PARTITION OF %I DEFAULT;', src_table || '_default', new_tablename) || nl;

    -- The helper receives the raw (unquoted) names, exactly as before.
    sql_str := sql_str || nl ||
        get_query_partitioning_of_an_existing_table_by_date_unix_time(tablename, new_tablename, start_year, end_year, step);

    -- SELECT * is safe here: the target was created with LIKE, so column order matches.
    sql_str := sql_str || nl ||
        '-- copy data into the new table ' || nl ||
        format('INSERT INTO %I (SELECT * FROM %I);', new_tablename, src_table) || nl;

    RAISE NOTICE '%', sql_str;
    EXECUTE sql_str;

    --===========================================================
    -- build the commands to rename the indexes and constraints
    -- (only executed when the old table is kept)
    --===========================================================
    SELECT string_agg(
               format('ALTER INDEX IF EXISTS %I RENAME TO %I;', idx.indexname, 'old_' || idx.indexname),
               nl)
      INTO index_rename_sql
      FROM pg_indexes AS idx
     WHERE idx.tablename = src_table
       AND idx.indexname NOT LIKE '%pkey%';

    SELECT string_agg(
               CASE
                   WHEN con.contype = 'p' THEN  -- primary key: rename its backing index
                       format('ALTER INDEX IF EXISTS %I RENAME TO %I;', con.conname, 'old_' || con.conname)
                   ELSE
                       format('ALTER TABLE %I RENAME CONSTRAINT %I TO %I;', src_table, con.conname, 'old_' || con.conname)
               END,
               nl)
      INTO constraint_rename_sql
      FROM pg_catalog.pg_constraint AS con
     INNER JOIN pg_catalog.pg_class AS rel
        ON rel.oid = con.conrelid
     WHERE rel.relname = src_table;

    --======================================================================
    -- build the commands to recreate the indexes on the partitioned table
    --======================================================================
    -- indexdef is captured now, while it still carries the original index and
    -- table names; it is replayed after the new table has taken over that name.
    -- NOTE(review): primary_key contains literal double quotes, so the second
    -- NOT LIKE can only match an index whose name itself contains '"posID"'.
    -- Kept unchanged - confirm the intended filter.
    SELECT string_agg(idx.indexdef || ';', nl)
      INTO index_create_sql
      FROM pg_indexes AS idx
     WHERE idx.tablename = src_table
       AND idx.indexname NOT LIKE '%pkey%'
       AND idx.indexname NOT LIKE '%' || primary_key || '%'
       AND idx.indexdef NOT LIKE '%UNIQUE%';

    --===========================================================
    -- save the outgoing foreign keys of the source table
    --===========================================================
    -- pg_get_constraintdef() reproduces the full definition: multi-column keys
    -- in the right order plus ON DELETE / ON UPDATE / DEFERRABLE clauses.
    -- A trailing NOT VALID is stripped so the key is added validated, as the
    -- previous implementation always did.
    SELECT string_agg(
               format('ALTER TABLE IF EXISTS %I ADD CONSTRAINT %I %s;',
                      src_table,
                      con.conname,
                      regexp_replace(pg_get_constraintdef(con.oid), ' NOT VALID$', '')),
               nl)
      INTO fk_restore_sql
      FROM pg_catalog.pg_constraint AS con
     INNER JOIN pg_catalog.pg_class AS rel
        ON rel.oid = con.conrelid
     WHERE rel.relname = src_table
       AND con.contype = 'f';

    --===========================================================
    -- save the triggers of the source table
    --===========================================================
    -- information_schema.triggers has one row per (trigger, event); the events
    -- are folded back into "INSERT OR UPDATE ...". Row/statement orientation and
    -- the optional WHEN condition are carried over instead of being lost.
    SELECT string_agg(trg.command, nl)
      INTO trigger_restore_sql
      FROM (
            SELECT format('CREATE TRIGGER %I %s %s ON %I FOR EACH %s%s %s;',
                          t.trigger_name,
                          t.action_timing,
                          string_agg(t.event_manipulation, ' OR '),
                          t.event_object_table,
                          t.action_orientation,
                          COALESCE(' WHEN (' || t.action_condition || ')', ''),
                          t.action_statement) AS command
              FROM information_schema.triggers AS t
             WHERE t.event_object_table = src_table
             GROUP BY t.trigger_name,
                      t.action_timing,
                      t.event_object_table,
                      t.action_orientation,
                      t.action_condition,
                      t.action_statement
           ) AS trg;

    --===================================================================================================================
    -- save the drop commands for foreign keys in OTHER tables that reference the source table
    --===================================================================================================================
    SELECT string_agg(
               format('ALTER TABLE IF EXISTS %I DROP CONSTRAINT IF EXISTS %I;', fk.foreign_table, fk.constraint_name),
               nl)
      INTO fk_drop_sql
      FROM (
            SELECT kcu.table_name AS foreign_table,
                   kcu.constraint_name
              FROM information_schema.table_constraints AS tco
             INNER JOIN information_schema.key_column_usage AS kcu
                ON tco.constraint_schema = kcu.constraint_schema
               AND tco.constraint_name = kcu.constraint_name
             INNER JOIN information_schema.referential_constraints AS rco
                ON tco.constraint_schema = rco.constraint_schema
               AND tco.constraint_name = rco.constraint_name
             INNER JOIN information_schema.table_constraints AS rel_tco
                ON rco.unique_constraint_schema = rel_tco.constraint_schema
               AND rco.unique_constraint_name = rel_tco.constraint_name
             WHERE tco.constraint_type = 'FOREIGN KEY'
               AND rel_tco.table_name = src_table
             GROUP BY kcu.table_schema,
                      kcu.table_name,
                      rel_tco.table_name,
                      rel_tco.table_schema,
                      kcu.constraint_name
           ) AS fk;

    --===========================================================
    -- remember where the existing id sequence stopped
    --===========================================================
    seq_exists := EXISTS (
        SELECT 1
          FROM pg_catalog.pg_class AS c
         WHERE c.relname = seq_name
           AND c.relkind = 'S'
    );

    IF seq_exists THEN
        EXECUTE format('SELECT last_value FROM %I', seq_name) INTO seq_last_value;
    END IF;

    --===========================================================
    -- fragments shared by both swap strategies
    --===========================================================
    add_pkey_sql :=
        '-- create primary key ' || nl ||
        format('ALTER TABLE IF EXISTS %I ADD CONSTRAINT %I PRIMARY KEY (%s,%s);',
               src_table, src_table || '_pkey', primary_key, columnname);

    create_seq_sql :=
        '-- create sequence ' || nl ||
        format('CREATE SEQUENCE IF NOT EXISTS %I INCREMENT 1 START %s;',
               seq_name, COALESCE(seq_last_value, 0) + 1);

    set_default_sql :=
        '-- set id to be taken from sequence ' || nl ||
        format('ALTER TABLE IF EXISTS %I ALTER COLUMN %s SET DEFAULT nextval(%L);',
               src_table, primary_key, quote_ident(seq_name));

    IF droptable THEN
        sql_str :=
            '--====== dropTable = true ======-- ' || nl || nl ||
            '-- drop old table with cascade ' || nl ||
            format('DROP TABLE IF EXISTS %I CASCADE;', src_table) || nl ||
            '-- rename the new table to old table ' || nl ||
            format('ALTER TABLE IF EXISTS %I RENAME TO %I;', new_tablename, src_table) || nl ||
            add_pkey_sql || nl ||
            create_seq_sql || nl ||
            set_default_sql || nl;
    ELSE
        -- Free the original constraint/index names first so the new table can
        -- reuse them. Constraints go first: renaming a constraint also renames
        -- its backing index, and the index renames use IF EXISTS.
        EXECUTE COALESCE(constraint_rename_sql, '') || nl ||
                COALESCE(index_rename_sql, '') || nl;

        sql_str :=
            '--====== dropTable = false ( rename table )======-- ' || nl || nl ||
            '-- rename the current table to *_old ' || nl ||
            format('ALTER TABLE IF EXISTS %I RENAME TO %I;', src_table, old_table_name) || nl ||
            '-- rename the new table ' || nl ||
            format('ALTER TABLE IF EXISTS %I RENAME TO %I;', new_tablename, src_table) || nl ||
            add_pkey_sql || nl ||
            '-- rename existing sequence as old ' || nl ||
            format('ALTER SEQUENCE IF EXISTS %I RENAME TO %I;', seq_name, old_seq_name) || nl ||
            create_seq_sql || nl;

        -- Point the kept table at the renamed sequence only when that sequence
        -- really exists; otherwise nextval() would reference a missing relation
        -- and abort the whole batch.
        IF seq_exists THEN
            sql_str := sql_str ||
                '-- set id to be taken from sequence ' || nl ||
                format('ALTER TABLE IF EXISTS %I ALTER COLUMN %s SET DEFAULT nextval(%L);',
                       old_table_name, primary_key, quote_ident(old_seq_name)) || nl;
        END IF;

        sql_str := sql_str || set_default_sql || nl;
    END IF;

    -- Drop incoming foreign keys, then restore outgoing foreign keys and triggers.
    sql_str := sql_str ||
        COALESCE(fk_drop_sql, '') || nl ||
        COALESCE(fk_restore_sql, '') || nl || nl ||
        COALESCE(trigger_restore_sql, '');

    RAISE NOTICE '%', sql_str;
    EXECUTE sql_str;

    -- Finally recreate the non-unique secondary indexes on the partitioned table.
    sql_str := COALESCE(index_create_sql, '');
    RAISE NOTICE '%', sql_str;
    EXECUTE sql_str;
END;
$BODY$;