Monday, June 30, 2014

How to Implement Audit Functionality In PostgreSQL

As I can remember 6 years back in 2008 Dec. we were researching on a better audit module to audit all transactions (Create, Modify, Delete) in PostgreSQL 8.0.3. It was a Friday which I started to implement this and I spent couple of days including Saturday and Sunday to get this completed. By Monday Morning I was able to implement the audit module to the existing product. Since then the time I'm writing this Blog still the audit functionality is working perfectly without any issues even with upgrade of the PostgreSQL versions.

So I though share this experience with you'll as when we eager to do something and also when we have the determination and courage, we are very much keen on to get things done whether it is weekend or 1 - 2 am in the morning. I know most of the Software Development enthusiastic would agree with me.

While I was researching on this, I found a way to implement the audit module with PL/Tcl language.
PL/Tcl is a loadable procedural language for the PostgreSQL database system that enables the Tcl language to be used to write functions and trigger procedures.

Please allow me to explain how we can get this implemented easily. Steps 1 - 3 has to be implemented only once. So thereafter if you need to implement audit then follow steps 4 - 5 (Create the Audit Table and Trigger). Pretty easy ha. Let's start...

  • Step 1 - In order to implement the audit module install the PL/TCL language.
 
createlang -U postgres pltcl <DB-Name>
  • Step 2 - Create the Function -  log_to_audit_table
Function          Arguments     Returns   Programming language
log_to_audit_table             "trigger"   pltcl
Definition
 
#This function crated to audit the data for all the modules
#Created By : Kosala De Silva
#Created On : 12 - Dec - 2008

spi_exec "SELECT CURRENT_USER AS tguser"
spi_exec "SELECT c.relname AS tgname,n.nspname AS schema_name
           FROM pg_class c , pg_namespace n
           WHERE n.oid = c.relnamespace
           AND relfilenode = $TG_relid"

 
if {[string equal -nocase $tgname audit_table]} { return OK }

set pk_name ""

 spi_exec "SELECT a.attname AS pk_name FROM pg_class c, pg_attribute a, pg_index i
 WHERE c.relname = '$tgname'
 AND c.oid=i.indrelid
 AND a.attnum > 0
 AND a.attrelid = i.indexrelid
 AND i.indisprimary='t'"

  spi_exec " select substring('$schema_name',0,instr('$schema_name','_', 1))||'_admin'
  AS main_schema"

 spi_exec "SELECT audit_table_name AS m_aud_tbl_name
 FROM $main_schema.audit_table_mapping
 where schema_name = '$schema_name'
 and tabel_name = '$tgname'"

switch $TG_op {
INSERT {
  set pk_value ""

  #get PK value
  foreach field $TG_relatts {
    if {[string equal -nocase [lindex [array get NEW $field] 0] $pk_name]} {
      set pk_value [lindex [array get NEW $field] 1]
      break;
    }
  }
  #log inserted row values
  foreach field $TG_relatts {
    if {! [string equal -nocase [lindex [array get NEW $field] 0] $pk_name]} {
      set modified_field [lindex [array get NEW $field] 0]
      set current_value [lindex [array get NEW $field] 1]
      spi_exec -array C "INSERT INTO $schema_name.$m_aud_tbl_name
         (time_stamp,pg_user,tbl_name,fld_name,primarykey_name,primarykey_value,modify_type,old_value,new_value)
        VALUES (CURRENT_TIMESTAMP, '$tguser', '$tgname', '$modified_field', '$pk_name', '$pk_value', '$TG_op', NULL, '$current_value')"
    }
  }
}
UPDATE {
  set pk_value ""

  #get PK value
  foreach field $TG_relatts {
    if {[string equal -nocase [lindex [array get NEW $field] 0] $pk_name]} {
      set pk_value [lindex [array get NEW $field] 1]
      break;
    }
  }
  #log inserted row values
  foreach field $TG_relatts {
    #check changed fields
    if {[string equal -nocase [array get NEW $field] [array get OLD $field]] == 0} {
      set modified_field [lindex [array get OLD $field] 0]
      if {[string compare $modified_field ""] == 0} {
        set modified_field [lindex  [array get NEW $field] 0]
      }
      set previous_value [lindex [array get OLD $field] 1]
      set current_value  [lindex [array get NEW $field] 1]
      spi_exec -array C "INSERT INTO $schema_name.$m_aud_tbl_name
        (time_stamp,pg_user,tbl_name,fld_name,primarykey_name,primarykey_value,modify_type,old_value,new_value)
        VALUES (CURRENT_TIMESTAMP, '$tguser', '$tgname', '$modified_field', '$pk_name', '$pk_value', '$TG_op', '$previous_value', '$current_value')"
    }
  }
}
DELETE {
  set pk_value ""

  #get PK value
  foreach field $TG_relatts {
    if {[string equal -nocase [lindex [array get OLD $field] 0] $pk_name]} {
      set pk_value [lindex [array get OLD $field] 1]
      break;
    }
  }
  #log inserted row values
  foreach field $TG_relatts {
    if {! [string equal -nocase [lindex [array get OLD $field] 0] $pk_name]} {
      set modified_field [lindex [array get OLD $field] 0]
      set previous_value [lindex [array get OLD $field] 1]
      spi_exec -array C "INSERT INTO $schema_name.$m_aud_tbl_name
        (time_stamp,pg_user,tbl_name,fld_name,primarykey_name,primarykey_value,modify_type,old_value,new_value)
        VALUES (CURRENT_TIMESTAMP, '$tguser', '$tgname', '$modified_field', '$pk_name', '$pk_value', '$TG_op', '$previous_value', NULL)"
    }
  }
}
}
return OK


Note : 
Main function should be created in the public schema. This function can be used for any system audit tracking. ex. "public"."log_to_audit_table" () 
This should be executed only once per DB server.

  • Step 3 - Create a table called audit_table_mapping
CREATE TABLE "<Schema Name>"."audit_table_mapping" (
  "id" INTEGER NOT NULL, 
  "schema_name" VARCHAR(100), 
  "tabel_name" VARCHAR(200), 
  "audit_table_name" VARCHAR(200), 
  CONSTRAINT "audit_table_mapping_pkey" PRIMARY KEY("id")
);
Note : 
When you implement the audit for each table there should be a mapping record in this table as one audit mapping table can be used to audit multiple tables.
DDL script should be executed only once.
A new record should be inserted for each audit table.


  • Step 4 - Create the Audit table which you need to audit data

  • CREATE TABLE "<Schema Name>"."<audit_Table Name>" (
      "time_stamp" TIMESTAMP WITH TIME ZONE,
      "pg_user" VARCHAR(30),
      "tbl_name" VARCHAR(30),
      "fld_name" VARCHAR(30),
      "primarykey_name" VARCHAR(30),
      "primarykey_value" VARCHAR(40),
      "modify_type" CHAR(6),
      "old_value" TEXT,
      "new_value" TEXT
    ) ;
    Note:
    You can use the same DDL script to create other audit tables as well. What you have to do is just change only the audit table name. ex. audit_[table name]

    • Step 5 - Execute the Trigger
     
    CREATE TRIGGER "trig_audit_<audit_Table Name>"
    AFTER INSERT OR DELETE OR UPDATE ON "<Schema Name>"."<Table Name>"
    FOR EACH ROW
    EXECUTE PROCEDURE "log_to_audit_table" ();
    
    Note:
    Trigger name should be created as follows. trig_audit_[table name]

    Once above steps are done you are good to log data. So when you try to insert / modify / delete that will be audited in the audit table. The advantage of using this functionality is that the audit table will have only the modified data with the old value and the new value. 
    
    

    No comments:

    Post a Comment

    Note: Only a member of this blog may post a comment.