My Project
rpl_handler.h
00001 /* Copyright (c) 2008, 2014, Oracle and/or its affiliates. All rights reserved.
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software Foundation,
00014    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
00015 
00016 #ifndef RPL_HANDLER_H
00017 #define RPL_HANDLER_H
00018 
00019 #include "sql_priv.h"
00020 #include "rpl_gtid.h"
00021 #include "rpl_mi.h"
00022 #include "rpl_rli.h"
00023 #include "sql_plugin.h"
00024 #include "replication.h"
00025 
00026 class Observer_info {
00027 public:
00028   void *observer;
00029   st_plugin_int *plugin_int;
00030   plugin_ref plugin;
00031 
00032   Observer_info(void *ob, st_plugin_int *p)
00033     :observer(ob), plugin_int(p)
00034   {
00035     plugin= plugin_int_to_ref(plugin_int);
00036   }
00037 };
00038 
00039 class Delegate {
00040 public:
00041   typedef List<Observer_info> Observer_info_list;
00042   typedef List_iterator<Observer_info> Observer_info_iterator;
00043   
00044   int add_observer(void *observer, st_plugin_int *plugin)
00045   {
00046     int ret= FALSE;
00047     if (!inited)
00048       return TRUE;
00049     write_lock();
00050     Observer_info_iterator iter(observer_info_list);
00051     Observer_info *info= iter++;
00052     while (info && info->observer != observer)
00053       info= iter++;
00054     if (!info)
00055     {
00056       info= new Observer_info(observer, plugin);
00057       if (!info || observer_info_list.push_back(info, &memroot))
00058         ret= TRUE;
00059     }
00060     else
00061       ret= TRUE;
00062     unlock();
00063     return ret;
00064   }
00065   
00066   int remove_observer(void *observer, st_plugin_int *plugin)
00067   {
00068     int ret= FALSE;
00069     if (!inited)
00070       return TRUE;
00071     write_lock();
00072     Observer_info_iterator iter(observer_info_list);
00073     Observer_info *info= iter++;
00074     while (info && info->observer != observer)
00075       info= iter++;
00076     if (info)
00077     {
00078       iter.remove();
00079       delete info;
00080     }
00081     else
00082       ret= TRUE;
00083     unlock();
00084     return ret;
00085   }
00086 
00087   inline Observer_info_iterator observer_info_iter()
00088   {
00089     return Observer_info_iterator(observer_info_list);
00090   }
00091 
00092   inline bool is_empty()
00093   {
00094     DBUG_PRINT("debug", ("is_empty: %d", observer_info_list.is_empty()));
00095     return observer_info_list.is_empty();
00096   }
00097 
00098   inline int read_lock()
00099   {
00100     if (!inited)
00101       return TRUE;
00102     return mysql_rwlock_rdlock(&lock);
00103   }
00104 
00105   inline int write_lock()
00106   {
00107     if (!inited)
00108       return TRUE;
00109     return mysql_rwlock_wrlock(&lock);
00110   }
00111 
00112   inline int unlock()
00113   {
00114     if (!inited)
00115       return TRUE;
00116     return mysql_rwlock_unlock(&lock);
00117   }
00118 
00119   inline bool is_inited()
00120   {
00121     return inited;
00122   }
00123 
00124   Delegate(
00125 #ifdef HAVE_PSI_INTERFACE
00126            PSI_rwlock_key key
00127 #endif
00128            )
00129   {
00130     inited= FALSE;
00131 #ifdef HAVE_PSI_INTERFACE
00132     if (mysql_rwlock_init(key, &lock))
00133       return;
00134 #else
00135     if (mysql_rwlock_init(0, &lock))
00136       return;
00137 #endif
00138     init_sql_alloc(&memroot, 1024, 0);
00139     inited= TRUE;
00140   }
00141   ~Delegate()
00142   {
00143     inited= FALSE;
00144     mysql_rwlock_destroy(&lock);
00145     free_root(&memroot, MYF(0));
00146   }
00147 
00148 private:
00149   Observer_info_list observer_info_list;
00150   mysql_rwlock_t lock;
00151   MEM_ROOT memroot;
00152   bool inited;
00153 };
00154 
00155 #ifdef HAVE_PSI_INTERFACE
00156 extern PSI_rwlock_key key_rwlock_Trans_delegate_lock;
00157 #endif
00158 
00159 class Trans_delegate
00160   :public Delegate {
00161 public:
00162 
00163   Trans_delegate()
00164   : Delegate(
00165 #ifdef HAVE_PSI_INTERFACE
00166              key_rwlock_Trans_delegate_lock
00167 #endif
00168              )
00169   {}
00170 
00171   typedef Trans_observer Observer;
00172   int before_commit(THD *thd, bool all);
00173   int before_rollback(THD *thd, bool all);
00174   int after_commit(THD *thd, bool all);
00175   int after_rollback(THD *thd, bool all);
00176 };
00177 
00178 #ifdef HAVE_PSI_INTERFACE
00179 extern PSI_rwlock_key key_rwlock_Binlog_storage_delegate_lock;
00180 #endif
00181 
00182 class Binlog_storage_delegate
00183   :public Delegate {
00184 public:
00185 
00186   Binlog_storage_delegate()
00187   : Delegate(
00188 #ifdef HAVE_PSI_INTERFACE
00189              key_rwlock_Binlog_storage_delegate_lock
00190 #endif
00191              )
00192   {}
00193 
00194   typedef Binlog_storage_observer Observer;
00195   int after_flush(THD *thd, const char *log_file,
00196                   my_off_t log_pos);
00197 };
00198 
00199 #ifdef HAVE_REPLICATION
00200 #ifdef HAVE_PSI_INTERFACE
00201 extern PSI_rwlock_key key_rwlock_Binlog_transmit_delegate_lock;
00202 #endif
00203 
00204 class Binlog_transmit_delegate
00205   :public Delegate {
00206 public:
00207 
00208   Binlog_transmit_delegate()
00209   : Delegate(
00210 #ifdef HAVE_PSI_INTERFACE
00211              key_rwlock_Binlog_transmit_delegate_lock
00212 #endif
00213              )
00214   {}
00215 
00216   typedef Binlog_transmit_observer Observer;
00217   int transmit_start(THD *thd, ushort flags,
00218                      const char *log_file, my_off_t log_pos,
00219                      bool *observe_transmission);
00220   int transmit_stop(THD *thd, ushort flags);
00221   int reserve_header(THD *thd, ushort flags, String *packet);
00222   int before_send_event(THD *thd, ushort flags,
00223                         String *packet, const
00224                         char *log_file, my_off_t log_pos );
00225   int after_send_event(THD *thd, ushort flags,
00226                        String *packet, const char *skipped_log_file,
00227                        my_off_t skipped_log_pos);
00228   int after_reset_master(THD *thd, ushort flags);
00229 };
00230 
00231 #ifdef HAVE_PSI_INTERFACE
00232 extern PSI_rwlock_key key_rwlock_Binlog_relay_IO_delegate_lock;
00233 #endif
00234 
00235 class Binlog_relay_IO_delegate
00236   :public Delegate {
00237 public:
00238 
00239   Binlog_relay_IO_delegate()
00240   : Delegate(
00241 #ifdef HAVE_PSI_INTERFACE
00242              key_rwlock_Binlog_relay_IO_delegate_lock
00243 #endif
00244              )
00245   {}
00246 
00247   typedef Binlog_relay_IO_observer Observer;
00248   int thread_start(THD *thd, Master_info *mi);
00249   int thread_stop(THD *thd, Master_info *mi);
00250   int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
00251   int after_read_event(THD *thd, Master_info *mi,
00252                        const char *packet, ulong len,
00253                        const char **event_buf, ulong *event_len);
00254   int after_queue_event(THD *thd, Master_info *mi,
00255                         const char *event_buf, ulong event_len,
00256                         bool synced);
00257   int after_reset_slave(THD *thd, Master_info *mi);
00258 private:
00259   void init_param(Binlog_relay_IO_param *param, Master_info *mi);
00260 };
00261 #endif /* HAVE_REPLICATION */
00262 
00263 int delegates_init();
00264 void delegates_destroy();
00265 
00266 extern Trans_delegate *transaction_delegate;
00267 extern Binlog_storage_delegate *binlog_storage_delegate;
00268 #ifdef HAVE_REPLICATION
00269 extern Binlog_transmit_delegate *binlog_transmit_delegate;
00270 extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
00271 #endif /* HAVE_REPLICATION */
00272 
/*
  Call `hook` with the parenthesized argument list `args` on the global
  delegate named <group>_delegate.

  If the delegate has no observers, the hook is not called at all and the
  whole expression evaluates to 0.
*/
#define RUN_HOOK(group, hook, args)             \
  (!group ##_delegate->is_empty() ?             \
   group ##_delegate->hook args : 0)
00280 
00281 #endif /* RPL_HANDLER_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines