My Project
|
/* Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifndef RPL_RLI_PDB_H

#define RPL_RLI_PDB_H

#ifdef HAVE_REPLICATION

#include "sql_string.h"
#include "rpl_rli.h"
#include <my_sys.h>
#include <my_bitmap.h>
#include "rpl_slave.h"

/*
  Declarations for the multi-threaded slave: the database-to-Worker hash
  entry, the circular queues used by Coordinator and Workers, the group
  descriptor stored in the Group Assigned Queue (GAQ), and the Slave_worker
  class itself.
*/

/* Assigned Partition Hash (APH) entry */
typedef struct st_db_worker_hash_entry
{
  uint  db_len;
  const char *db;
  Slave_worker *worker;
  /*
    The number of transactions pending on this database.
    This should only be modified under the lock slave_worker_hash_lock.
  */
  long usage;
  /*
    The list of temp tables belonging to @c db database is
    attached to an assigned @c worker to become its thd->temporary_tables.
    The list is updated with every ddl incl CREATE, DROP.
    It is removed from the entry and merged to the coordinator's
    thd->temporary_tables in case of events: slave stops, APH oversize.
  */
  TABLE* volatile temporary_tables;

  /* todo: relax concurrency to mimic record-level locking.
     That is to augment the entry with mutex/cond pair
     pthread_mutex_t
     pthread_cond_t
     timestamp updated_at; */

} db_worker_hash_entry;

bool init_hash_workers(ulong slave_parallel_workers);
void destroy_hash_workers(Relay_log_info*);
Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
                               db_worker_hash_entry **ptr_entry,
                               bool need_temp_tables, Slave_worker *w);
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
int wait_for_workers_to_finish(Relay_log_info const *rli,
                               Slave_worker *ignore= NULL);

#define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray

#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2

/* A single job handed over through a queue; carries an opaque pointer. */
typedef struct slave_job_item
{
  void *data;
} Slave_job_item;

/*
  A queue of fixed capacity laid over a DYNAMIC_ARRAY and addressed
  circularly through the @c entry (head) and @c avail (next free slot)
  indexes.

  The value @c size serves as a sentinel for both indexes:
  @c entry == @c size means the queue is empty and @c avail == @c size
  means the queue is full (see empty() and full()).
*/
class circular_buffer_queue
{
public:

  DYNAMIC_ARRAY Q;
  ulong size;           // the Size of the queue in terms of element
  ulong avail;          // first Available index to append at (next to tail)
  ulong entry;          // the head index or the entry point to the queue.
  volatile ulong len;   // actual length
  bool inited_queue;

  /*
    Builds an empty queue of capacity @c max.

    @param el_size    size of one stored element
    @param max        capacity of the queue in elements
    @param alloc_inc  forwarded to my_init_dynamic_array()

    inited_queue is raised only when the underlying array was allocated;
    callers are expected to check it.
  */
  circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
    size(max), avail(0), entry(max), len(0), inited_queue(FALSE)
  {
    DBUG_ASSERT(size < (ulong) -1);
    if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
      inited_queue= TRUE;
  }
  /*
    Builds a not yet usable queue. The indexes are zeroed so that empty(),
    full() and in() never read indeterminate values before the owner has
    set the queue up; Q itself is left for the owner to initialize.
  */
  circular_buffer_queue () :
    size(0), avail(0), entry(0), len(0), inited_queue(FALSE)
  {}
  ~circular_buffer_queue ()
  {
    if (inited_queue)
      delete_dynamic(&Q);
  }

  /*
    Defined out of line. NOTE(review): presumably removes the head element
    and copies it into the argument - confirm against the implementation.
  */
  ulong de_queue(uchar *);
  /*
    Defined out of line. NOTE(review): presumably the same as de_queue()
    but for the tail element - confirm against the implementation.
  */
  ulong de_tail(uchar *val);

  /*
    Defined out of line. NOTE(review): presumably appends @c item at
    @c avail and returns the index it was stored at - confirm against the
    implementation.
  */
  ulong en_queue(void *item);
  /*
    Defined out of line. NOTE(review): presumably returns the head element
    without removing it - confirm against the implementation.
  */
  void* head_queue();
  bool   gt(ulong i, ulong k); // comparison of ordering of two entities
  /*
    Index is within the valid range: the queue is not empty and @c k lies
    in [entry, avail), taking the wrap-around into account when
    entry > avail.
  */
  bool   in(ulong k)
  {
    if (empty())
      return false;
    if (entry > avail)
      return k >= entry || k < avail;
    return k >= entry && k < avail;
  }
  bool   empty() { return entry == size; }
  bool   full()  { return avail == size; }
};

/* Descriptor of one group (transaction) kept in the GAQ. */
typedef struct st_slave_job_group
{
  char *group_master_log_name;   // (actually redundant)
  /*
    T-event log_pos filled by Worker for CheckPoint (CP)
  */
  my_off_t group_master_log_pos;

  /*
     When relay-log name changes allocates and fill in a new name of relay-log,
     otherwise it fills in NULL.
     Coordinator keeps track of each Worker has been notified on the updating
     to make sure the routine runs once per change.

     W checks the value at commit and memorizes a not-NULL.
     Freeing unless NULL is left to Coordinator at CP.
  */
  char     *group_relay_log_name; // The value is last seen relay-log
  my_off_t group_relay_log_pos;  // filled by W
  ulong worker_id;
  Slave_worker *worker;
  ulonglong total_seqno;

  my_off_t master_log_pos;       // B-event log_pos
  /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
  uint  checkpoint_seqno;
  my_off_t checkpoint_log_pos; // T-event log_pos filled by W for CheckPoint
  char*    checkpoint_log_name;
  my_off_t checkpoint_relay_log_pos; // T-event log_pos filled by W for CheckPoint
  char*    checkpoint_relay_log_name;
  volatile uchar done;  // Flag raised by W,  read and reset by Coordinator
  ulong    shifted;     // shift the last CP bitmap at receiving a new CP
  time_t   ts;          // Group's timestamp to update Seconds_behind_master
#ifndef DBUG_OFF
  bool     notified;    // to debug group_master_log_name change notification
#endif
  /*
    Coordinator fills the struct with defaults and options at starting of
    a group distribution.

    @param master_pos  stored into master_log_pos
    @param seqno       stored into total_seqno

    The members worker, shifted and ts are not touched here.
  */
  void reset(my_off_t master_pos, ulonglong seqno)
  {
    master_log_pos= master_pos;
    group_master_log_pos= group_relay_log_pos= 0;
    group_master_log_name= NULL; // todo: remove
    group_relay_log_name= NULL;
    worker_id= MTS_WORKER_UNDEF;
    total_seqno= seqno;
    checkpoint_log_name= NULL;
    checkpoint_log_pos= 0;
    checkpoint_relay_log_name= NULL;
    checkpoint_relay_log_pos= 0;
    checkpoint_seqno= (uint) -1;
    done= 0;
#ifndef DBUG_OFF
    notified= false;
#endif
  }
} Slave_job_group;

/*
  The Group Assigned Queue (GAQ): a circular queue of Slave_job_group
  descriptors together with the checkpoint low-water-mark and the
  per-Worker record of the last processed sequence number.
*/
class Slave_committed_queue : public circular_buffer_queue
{
public:

  bool inited;

  /* master's Rot-ev exec */
  void update_current_binlog(const char *post_rotate);

  /*
     The last checkpoint time Low-Water-Mark
  */
  Slave_job_group lwm;

  /* last time processed indexes for each worker */
  DYNAMIC_ARRAY last_done;

  /* the being assigned group index in GAQ */
  ulong assigned_group_index;

  /*
    Builds the queue and its auxiliary storage.

    @param log      not used by this constructor
    @param el_size  size of one queue element
    @param max      capacity of the queue
    @param n        number of Workers; last_done receives one zeroed slot
                    for each of them
    @param inc      forwarded to the base class constructor

    @c inited is raised only after every allocation has succeeded. When
    any step fails, whatever this constructor allocated so far is released
    and @c inited stays FALSE, so the destructor skips its cleanup and the
    caller can detect the failure by inspecting @c inited.
  */
  Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
                         uint inc= 0)
    : circular_buffer_queue(el_size, max, inc), inited(FALSE)
  {
    uint k;
    ulonglong l= 0;

    if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
      return;

    // my_init_dynamic_array() reports failure with a non-zero value
    if (my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0))
      return;

    for (k= 0; k < n; k++)
    {
      // empty for each Worker
      if (insert_dynamic(&last_done, (uchar*) &l))
      {
        delete_dynamic(&last_done);
        return;
      }
    }

    lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
    if (lwm.group_relay_log_name == NULL)
    {
      // do not dereference a failed allocation; undo last_done instead
      delete_dynamic(&last_done);
      return;
    }
    lwm.group_relay_log_name[0]= 0;

    inited= TRUE;
  }

  ~Slave_committed_queue ()
  {
    if (inited)
    {
      delete_dynamic(&last_done);
      my_free(lwm.group_relay_log_name);
      free_dynamic_items();  // free possibly left allocated strings in GAQ list
    }
  }

#ifndef DBUG_OFF
  bool count_done(Relay_log_info* rli);
#endif

  /* Checkpoint routine refreshes the queue */
  ulong move_queue_head(DYNAMIC_ARRAY *ws);
  /* Method is for slave shutdown time cleanup */
  void free_dynamic_items();
  /*
     returns a pointer to Slave_job_group struct instance as indexed by arg
     in the circular buffer dyn-array
  */
  Slave_job_group* get_job_group(ulong ind)
  {
    return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
  }

  /*
    Appends @c item through the base class method and remembers the value
    it returned in assigned_group_index.

    @return the value returned by circular_buffer_queue::en_queue()
  */
  ulong en_queue(void *item)
  {
    return assigned_group_index= circular_buffer_queue::en_queue(item);
  }

};

/* The per-Worker queue of jobs assigned by Coordinator. */
class Slave_jobs_queue : public circular_buffer_queue
{
public:

  /*
     Coordinator marks with true, Worker signals back at queue back to
     available
  */
  bool overfill;
  ulonglong waited_overfill;
};

/*
  A Worker of the multi-threaded slave. It extends Relay_log_info with the
  job queue, runtime statistics and the checkpoint coordinates of the
  Worker.
*/
class Slave_worker : public Relay_log_info
{
public:
  Slave_worker(Relay_log_info *rli
#ifdef HAVE_PSI_INTERFACE
               ,PSI_mutex_key *param_key_info_run_lock,
               PSI_mutex_key *param_key_info_data_lock,
               PSI_mutex_key *param_key_info_sleep_lock,
               PSI_mutex_key *param_key_info_data_cond,
               PSI_mutex_key *param_key_info_start_cond,
               PSI_mutex_key *param_key_info_stop_cond,
               PSI_mutex_key *param_key_info_sleep_cond
#endif
               , uint param_id
              );
  virtual ~Slave_worker();

  Slave_jobs_queue jobs;   // assignment queue containing events to execute
  mysql_mutex_t jobs_lock; // mutex for the jobs queue
  mysql_cond_t  jobs_cond; // condition variable for the jobs queue
  Relay_log_info *c_rli;   // pointer to Coordinator's rli
  DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions
  bool curr_group_seen_begin; // is set to TRUE with explicit B-event
  ulong id;                 // numeric identifier of the Worker

  /*
    Worker runtime statistics
  */
  // the index in GAQ of the last processed group by this Worker
  volatile ulong last_group_done_index;
  ulonglong last_groups_assigned_index; // index of previous group assigned to worker
  ulong wq_empty_waits;  // how many times got idle
  ulong events_done;     // how many events (statements) processed
  ulong groups_done;     // how many groups (transactions) processed
  volatile int curr_jobs; // number of active assignments
  // number of partitions allocated to the worker at point in time
  long usage_partition;
  // symmetric to rli->mts_end_group_sets_max_dbs
  bool end_group_sets_max_dbs;

  volatile bool relay_log_change_notified; // Coord sets and resets, W can read
  volatile bool checkpoint_notified; // Coord sets and resets, W can read
  volatile bool master_log_change_notified; // Coord sets and resets, W can read
  ulong bitmap_shifted;  // shift the last bitmap at receiving new CP
  // WQ current excess above the overrun level
  long wq_overrun_cnt;
  /*
    number of events starting from which Worker queue is regarded as
    close to full. The number of the excessive events yields a weight factor
    to compute Coordinator's nap.
  */
  ulong overrun_level;
  /*
     reverse to overrun: the number of events below which Worker is
     considered underrunning
  */
  ulong underrun_level;
  /*
    Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
    When WQ length is dropped below overrun the counter is reset.
  */
  ulong excess_cnt;
  /*
    Coordinates of the last CheckPoint (CP) this Worker has
    acknowledged; part of it is persistent data
  */
  char checkpoint_relay_log_name[FN_REFLEN];
  ulonglong checkpoint_relay_log_pos;
  char checkpoint_master_log_name[FN_REFLEN];
  ulonglong checkpoint_master_log_pos;
  MY_BITMAP group_executed; // bitmap describes groups executed after last CP
  MY_BITMAP group_shifted;  // temporary bitmap to compute group_executed
  ulong checkpoint_seqno;   // the most significant ON bit in group_executed
  enum en_running_state
  {
    NOT_RUNNING= 0,
    RUNNING= 1,
    ERROR_LEAVING= 2,         // is set by Worker
    STOP= 3,                  // is set by Coordinator upon receiving STOP
    STOP_ACCEPTED= 4          // is set by worker upon completing job when STOP SLAVE is issued
  };
  /*
    The running status is guarded by jobs_lock mutex that a writer
    Coordinator or Worker itself needs to hold when write a new value.
  */
  en_running_state volatile running_status;
  /*
    exit_incremented indicates whether worker has contributed to max updated index.
    By default it is set to false. When the worker contributes for the first time this
    variable is set to true.
  */
  bool exit_incremented;

  int init_worker(Relay_log_info*, ulong);
  int rli_init_info(bool);
  int flush_info(bool force= FALSE);
  static size_t get_number_worker_fields();
  void slave_worker_ends_group(Log_event*, int);
  const char *get_master_log_name();
  /* Returns the cached log_pos of the event being handled. */
  ulonglong get_master_log_pos() { return master_log_pos; }
  /*
    Caches an event's log_pos and returns the stored value.

    The argument is ulonglong, the type of the member it is stored in, so
    that a position is not narrowed on platforms where ulong is smaller
    than ulonglong.
  */
  ulonglong set_master_log_pos(ulonglong val) { return master_log_pos= val; }
  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
  /*
    When commit fails clear bitmap for executed worker group. Revert back the
    positions to the old positions that existed before commit using the checkpoint.

    @param Slave_job_group a pointer to Slave_job_group struct instance which
    holds group master log pos, group relay log pos and checkpoint positions.
  */
  void rollback_positions(Slave_job_group *ptr_g);
  bool reset_recovery_info();
  /*
    Installs @c fdle as the Worker's format description event.

    @param fdle  the new event or NULL

    A non-NULL value is accepted only while the Worker is RUNNING and has
    info_thd set (asserted), and the caller must own jobs_lock (asserted
    in debug builds). For a non-NULL value adapt_to_master_version() is
    invoked before the pointer is stored.
  */
  void set_rli_description_event(Format_description_log_event *fdle)
  {
    DBUG_ASSERT(!fdle || (running_status == Slave_worker::RUNNING && info_thd));
#ifndef DBUG_OFF
    if (fdle)
      mysql_mutex_assert_owner(&jobs_lock);
#endif

    if (fdle)
      adapt_to_master_version(fdle);
    rli_description_event= fdle;
  }

  /* Marks gaq_index as unset by storing the GAQ size, the sentinel value. */
  inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; }
  /* Stores @c val into gaq_index only if the latter is currently unset. */
  inline void set_gaq_index(ulong val)
  {
    if (gaq_index == c_rli->gaq->size)
      gaq_index= val;
  }

protected:

  virtual void do_report(loglevel level, int err_code,
                         const char *msg, va_list v_args) const;

private:
  ulong gaq_index;          // GAQ index of the current assignment
  ulonglong master_log_pos; // event's cached log_pos for possible error report
  void end_info();
  bool read_info(Rpl_info_handler *from);
  bool write_info(Rpl_info_handler *to);
  /* Declared private and not defined inline: copying is not offered. */
  Slave_worker& operator=(const Slave_worker& info);
  Slave_worker(const Slave_worker& info);
};

void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item);
bool set_max_updated_index_on_stop(Slave_worker *worker,
                                   Slave_job_item *job_item);

TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
#endif // HAVE_REPLICATION
#endif // RPL_RLI_PDB_H