My Project
|
00001 /* Copyright (c) 2008, 2014, Oracle and/or its affiliates. All rights reserved. 00002 00003 This program is free software; you can redistribute it and/or modify 00004 it under the terms of the GNU General Public License as published by 00005 the Free Software Foundation; version 2 of the License. 00006 00007 This program is distributed in the hope that it will be useful, 00008 but WITHOUT ANY WARRANTY; without even the implied warranty of 00009 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00010 GNU General Public License for more details. 00011 00012 You should have received a copy of the GNU General Public License 00013 along with this program; if not, write to the Free Software Foundation, 00014 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ 00015 00016 #ifndef RPL_HANDLER_H 00017 #define RPL_HANDLER_H 00018 00019 #include "sql_priv.h" 00020 #include "rpl_gtid.h" 00021 #include "rpl_mi.h" 00022 #include "rpl_rli.h" 00023 #include "sql_plugin.h" 00024 #include "replication.h" 00025 00026 class Observer_info { 00027 public: 00028 void *observer; 00029 st_plugin_int *plugin_int; 00030 plugin_ref plugin; 00031 00032 Observer_info(void *ob, st_plugin_int *p) 00033 :observer(ob), plugin_int(p) 00034 { 00035 plugin= plugin_int_to_ref(plugin_int); 00036 } 00037 }; 00038 00039 class Delegate { 00040 public: 00041 typedef List<Observer_info> Observer_info_list; 00042 typedef List_iterator<Observer_info> Observer_info_iterator; 00043 00044 int add_observer(void *observer, st_plugin_int *plugin) 00045 { 00046 int ret= FALSE; 00047 if (!inited) 00048 return TRUE; 00049 write_lock(); 00050 Observer_info_iterator iter(observer_info_list); 00051 Observer_info *info= iter++; 00052 while (info && info->observer != observer) 00053 info= iter++; 00054 if (!info) 00055 { 00056 info= new Observer_info(observer, plugin); 00057 if (!info || observer_info_list.push_back(info, &memroot)) 00058 ret= TRUE; 00059 } 00060 else 00061 ret= TRUE; 00062 unlock(); 00063 return ret; 00064 } 00065 00066 int remove_observer(void *observer, st_plugin_int *plugin) 00067 { 00068 int ret= FALSE; 00069 if (!inited) 00070 return TRUE; 00071 write_lock(); 00072 Observer_info_iterator iter(observer_info_list); 00073 Observer_info *info= iter++; 00074 while (info && info->observer != observer) 00075 info= iter++; 00076 if (info) 00077 { 00078 iter.remove(); 00079 delete info; 00080 } 00081 else 00082 ret= TRUE; 00083 unlock(); 00084 return ret; 00085 } 00086 00087 inline Observer_info_iterator observer_info_iter() 00088 { 00089 return Observer_info_iterator(observer_info_list); 00090 } 00091 00092 inline bool is_empty() 00093 { 00094 DBUG_PRINT("debug", ("is_empty: %d", observer_info_list.is_empty())); 00095 return observer_info_list.is_empty(); 00096 } 00097 00098 inline int read_lock() 00099 { 00100 if (!inited) 00101 return TRUE; 00102 return mysql_rwlock_rdlock(&lock); 00103 } 00104 00105 inline int write_lock() 00106 { 00107 if (!inited) 00108 return TRUE; 00109 return mysql_rwlock_wrlock(&lock); 00110 } 00111 00112 inline int unlock() 00113 { 00114 if (!inited) 00115 return TRUE; 00116 return mysql_rwlock_unlock(&lock); 00117 } 00118 00119 inline bool is_inited() 00120 { 00121 return inited; 00122 } 00123 00124 Delegate( 00125 #ifdef HAVE_PSI_INTERFACE 00126 PSI_rwlock_key key 00127 #endif 00128 ) 00129 { 00130 inited= FALSE; 00131 #ifdef HAVE_PSI_INTERFACE 00132 if (mysql_rwlock_init(key, &lock)) 00133 return; 00134 #else 00135 if (mysql_rwlock_init(0, &lock)) 00136 return; 00137 #endif 00138 init_sql_alloc(&memroot, 1024, 0); 00139 inited= TRUE; 00140 } 00141 ~Delegate() 00142 { 00143 inited= FALSE; 00144 mysql_rwlock_destroy(&lock); 00145 free_root(&memroot, MYF(0)); 00146 } 00147 00148 private: 00149 Observer_info_list observer_info_list; 00150 mysql_rwlock_t lock; 00151 MEM_ROOT memroot; 00152 bool inited; 00153 }; 00154 00155 #ifdef HAVE_PSI_INTERFACE 00156 extern PSI_rwlock_key key_rwlock_Trans_delegate_lock; 00157 #endif 00158 00159 class Trans_delegate 00160 :public Delegate { 00161 public: 00162 00163 Trans_delegate() 00164 : Delegate( 00165 #ifdef HAVE_PSI_INTERFACE 00166 key_rwlock_Trans_delegate_lock 00167 #endif 00168 ) 00169 {} 00170 00171 typedef Trans_observer Observer; 00172 int before_commit(THD *thd, bool all); 00173 int before_rollback(THD *thd, bool all); 00174 int after_commit(THD *thd, bool all); 00175 int after_rollback(THD *thd, bool all); 00176 }; 00177 00178 #ifdef HAVE_PSI_INTERFACE 00179 extern PSI_rwlock_key key_rwlock_Binlog_storage_delegate_lock; 00180 #endif 00181 00182 class Binlog_storage_delegate 00183 :public Delegate { 00184 public: 00185 00186 Binlog_storage_delegate() 00187 : Delegate( 00188 #ifdef HAVE_PSI_INTERFACE 00189 key_rwlock_Binlog_storage_delegate_lock 00190 #endif 00191 ) 00192 {} 00193 00194 typedef Binlog_storage_observer Observer; 00195 int after_flush(THD *thd, const char *log_file, 00196 my_off_t log_pos); 00197 }; 00198 00199 #ifdef HAVE_REPLICATION 00200 #ifdef HAVE_PSI_INTERFACE 00201 extern PSI_rwlock_key key_rwlock_Binlog_transmit_delegate_lock; 00202 #endif 00203 00204 class Binlog_transmit_delegate 00205 :public Delegate { 00206 public: 00207 00208 Binlog_transmit_delegate() 00209 : Delegate( 00210 #ifdef HAVE_PSI_INTERFACE 00211 key_rwlock_Binlog_transmit_delegate_lock 00212 #endif 00213 ) 00214 {} 00215 00216 typedef Binlog_transmit_observer Observer; 00217 int transmit_start(THD *thd, ushort flags, 00218 const char *log_file, my_off_t log_pos, 00219 bool *observe_transmission); 00220 int transmit_stop(THD *thd, ushort flags); 00221 int reserve_header(THD *thd, ushort flags, String *packet); 00222 int before_send_event(THD *thd, ushort flags, 00223 String *packet, const 00224 char *log_file, my_off_t log_pos ); 00225 int after_send_event(THD *thd, ushort flags, 00226 String *packet, const char *skipped_log_file, 00227 my_off_t skipped_log_pos); 00228 int after_reset_master(THD *thd, ushort flags); 00229 }; 00230 00231 #ifdef HAVE_PSI_INTERFACE 00232 extern PSI_rwlock_key key_rwlock_Binlog_relay_IO_delegate_lock; 00233 #endif 00234 00235 class Binlog_relay_IO_delegate 00236 :public Delegate { 00237 public: 00238 00239 Binlog_relay_IO_delegate() 00240 : Delegate( 00241 #ifdef HAVE_PSI_INTERFACE 00242 key_rwlock_Binlog_relay_IO_delegate_lock 00243 #endif 00244 ) 00245 {} 00246 00247 typedef Binlog_relay_IO_observer Observer; 00248 int thread_start(THD *thd, Master_info *mi); 00249 int thread_stop(THD *thd, Master_info *mi); 00250 int before_request_transmit(THD *thd, Master_info *mi, ushort flags); 00251 int after_read_event(THD *thd, Master_info *mi, 00252 const char *packet, ulong len, 00253 const char **event_buf, ulong *event_len); 00254 int after_queue_event(THD *thd, Master_info *mi, 00255 const char *event_buf, ulong event_len, 00256 bool synced); 00257 int after_reset_slave(THD *thd, Master_info *mi); 00258 private: 00259 void init_param(Binlog_relay_IO_param *param, Master_info *mi); 00260 }; 00261 #endif /* HAVE_REPLICATION */ 00262 00263 int delegates_init(); 00264 void delegates_destroy(); 00265 00266 extern Trans_delegate *transaction_delegate; 00267 extern Binlog_storage_delegate *binlog_storage_delegate; 00268 #ifdef HAVE_REPLICATION 00269 extern Binlog_transmit_delegate *binlog_transmit_delegate; 00270 extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; 00271 #endif /* HAVE_REPLICATION */ 00272 00273 /* 00274 if there is no observers in the delegate, we can return 0 00275 immediately. 00276 */ 00277 #define RUN_HOOK(group, hook, args) \ 00278 (group ##_delegate->is_empty() ? \ 00279 0 : group ##_delegate->hook args) 00280 00281 #endif /* RPL_HANDLER_H */